From 446cec49e1aa75c4258c07994d0c5ae18ea0c128 Mon Sep 17 00:00:00 2001 From: "mengji.fy" Date: Fri, 17 Mar 2017 18:33:34 +0800 Subject: [PATCH 001/253] [FLINK-6058] fix read DEFAULT_PARALLELISM from ContextEnvironment --- .../flink/client/program/ContextEnvironment.java | 11 +++++++++++ .../api/environment/StreamContextEnvironment.java | 8 +------- 2 files changed, 12 insertions(+), 7 deletions(-) diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java b/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java index 1ef94cedf2032..91815e1673a33 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java +++ b/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java @@ -23,6 +23,8 @@ import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.Plan; import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.GlobalConfiguration; import org.apache.flink.optimizer.plan.OptimizedPlan; import org.apache.flink.optimizer.plandump.PlanJSONDumpGenerator; import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings; @@ -44,6 +46,8 @@ public class ContextEnvironment extends ExecutionEnvironment { protected final ClassLoader userCodeClassLoader; protected final SavepointRestoreSettings savepointSettings; + + private final int defaultParallelism; public ContextEnvironment(ClusterClient remoteConnection, List jarFiles, List classpaths, ClassLoader userCodeClassLoader, SavepointRestoreSettings savepointSettings) { @@ -52,6 +56,9 @@ public ContextEnvironment(ClusterClient remoteConnection, List jarFiles, Li this.classpathsToAttach = classpaths; this.userCodeClassLoader = userCodeClassLoader; this.savepointSettings = savepointSettings; + this.defaultParallelism = GlobalConfiguration.loadConfiguration().getInteger( + ConfigConstants.DEFAULT_PARALLELISM_KEY, + ConfigConstants.DEFAULT_PARALLELISM); } @Override @@ -104,6 +111,10 @@ public SavepointRestoreSettings getSavepointRestoreSettings() { return savepointSettings; } + public int getDefaultParallelism() { + return this.defaultParallelism; + } + // -------------------------------------------------------------------------------------------- static void setAsContext(ContextEnvironmentFactory factory) { diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java index 49c53478c2d9c..55418133496f0 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java @@ -21,12 +21,8 @@ import org.apache.flink.api.common.JobExecutionResult; import org.apache.flink.client.program.ContextEnvironment; import org.apache.flink.client.program.DetachedEnvironment; - -import org.apache.flink.configuration.ConfigConstants; -import org.apache.flink.configuration.GlobalConfiguration; import org.apache.flink.streaming.api.graph.StreamGraph; import org.apache.flink.util.Preconditions; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -42,9 +38,7 @@ protected StreamContextEnvironment(ContextEnvironment ctx) { if (ctx.getParallelism() > 0) { setParallelism(ctx.getParallelism()); } else { - 
setParallelism(GlobalConfiguration.loadConfiguration().getInteger( - ConfigConstants.DEFAULT_PARALLELISM_KEY, - ConfigConstants.DEFAULT_PARALLELISM)); + setParallelism(ctx.getDefaultParallelism()); } } From 598536ec75411b5afc547d42779658f76e1ba34b Mon Sep 17 00:00:00 2001 From: WangTaoTheTonic Date: Sat, 25 Feb 2017 12:19:43 +0800 Subject: [PATCH 002/253] [FLINK-5904] Make jobmanager.heap.mb and taskmanager.heap.mb work in YARN mode This closes #3414. --- .../apache/flink/configuration/ConfigConstants.java | 2 +- .../flink/configuration/JobManagerOptions.java | 5 +++++ .../flink/configuration/TaskManagerOptions.java | 5 +++++ .../flink/yarn/AbstractYarnClusterDescriptor.java | 13 +++++++++++-- 4 files changed, 22 insertions(+), 3 deletions(-) diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java index de06b59411fa8..a035beb9a2dc7 100644 --- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java +++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java @@ -113,7 +113,7 @@ public final class ConfigConstants { public static final String EXECUTION_RETRY_DELAY_KEY = "execution-retries.delay"; // -------------------------------- Runtime ------------------------------- - + /** * The config parameter defining the network address to connect to * for communication with the job manager. diff --git a/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java index d9c9d1815485a..2534ddf75cc11 100644 --- a/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java +++ b/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java @@ -57,6 +57,11 @@ public class JobManagerOptions { key("jobmanager.rpc.port") .defaultValue(6123); + /** JVM heap size (in megabytes) for the JobManager */ + public static final ConfigOption JOB_MANAGER_HEAP_MEMORY = + key("jobmanager.heap.mb") + .defaultValue(1024); + /** * The maximum number of prior execution attempts kept in history. */ diff --git a/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java index adfc8e96288e9..9a00a0ff5b60a 100644 --- a/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java +++ b/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java @@ -39,6 +39,11 @@ public class TaskManagerOptions { key("taskmanager.jvm-exit-on-oom") .defaultValue(false); + /** JVM heap size (in megabytes) for the TaskManagers */ + public static final ConfigOption TASK_MANAGER_HEAP_MEMORY = + key("taskmanager.heap.mb") + .defaultValue(1024); + /** Size of memory buffers used by the network stack and the memory manager (in bytes). 
*/ public static final ConfigOption MEMORY_SEGMENT_SIZE = key("taskmanager.memory.segment-size") diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java index b383b59c1de5e..9cb80aa21d5c1 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java @@ -26,10 +26,12 @@ import org.apache.flink.configuration.IllegalConfigurationException; import org.apache.flink.configuration.SecurityOptions; import org.apache.flink.configuration.HighAvailabilityOptions; +import org.apache.flink.configuration.TaskManagerOptions; import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.clusterframework.BootstrapTools; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobmanager.HighAvailabilityMode; +import org.apache.flink.runtime.jobmanager.JobManagerOptions; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -113,9 +115,9 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor */ private int slots = -1; - private int jobManagerMemoryMb = 1024; + private int jobManagerMemoryMb = JobManagerOptions.JOB_MANAGER_HEAP_MEMORY.defaultValue(); - private int taskManagerMemoryMb = 1024; + private int taskManagerMemoryMb = TaskManagerOptions.TASK_MANAGER_HEAP_MEMORY.defaultValue(); private int taskManagerCount = 1; @@ -166,6 +168,13 @@ public AbstractYarnClusterDescriptor() { flinkConfigurationPath = new Path(confFile.getAbsolutePath()); slots = flinkConfiguration.getInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 1); + + if (flinkConfiguration.containsKey(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY.key())) { + jobManagerMemoryMb = flinkConfiguration.getInteger(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY); + } + if (flinkConfiguration.containsKey(TaskManagerOptions.TASK_MANAGER_HEAP_MEMORY.key())) { + taskManagerMemoryMb = flinkConfiguration.getInteger(TaskManagerOptions.TASK_MANAGER_HEAP_MEMORY); + } } catch (Exception e) { LOG.debug("Config couldn't be loaded from environment variable."); } From 3b5dc9dbec8f2542929de6b9eafb7053a1786531 Mon Sep 17 00:00:00 2001 From: Till Rohrmann Date: Tue, 18 Apr 2017 17:24:29 +0200 Subject: [PATCH 003/253] [FLINK-5904] Use proper ConfigOption syntax to retrieve config values --- .../flink/configuration/JobManagerOptions.java | 4 +++- .../flink/yarn/AbstractYarnClusterDescriptor.java | 15 ++++++--------- 2 files changed, 9 insertions(+), 10 deletions(-) diff --git a/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java index 2534ddf75cc11..d129405e06a60 100644 --- a/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java +++ b/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java @@ -57,7 +57,9 @@ public class JobManagerOptions { key("jobmanager.rpc.port") .defaultValue(6123); - /** JVM heap size (in megabytes) for the JobManager */ + /** + * JVM heap size (in megabytes) for the JobManager + */ public static final ConfigOption JOB_MANAGER_HEAP_MEMORY = key("jobmanager.heap.mb") .defaultValue(1024); diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java 
b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java index 9cb80aa21d5c1..ec7af5a811eae 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java @@ -24,6 +24,7 @@ import org.apache.flink.configuration.CoreOptions; import org.apache.flink.configuration.GlobalConfiguration; import org.apache.flink.configuration.IllegalConfigurationException; +import org.apache.flink.configuration.JobManagerOptions; import org.apache.flink.configuration.SecurityOptions; import org.apache.flink.configuration.HighAvailabilityOptions; import org.apache.flink.configuration.TaskManagerOptions; @@ -31,7 +32,6 @@ import org.apache.flink.runtime.clusterframework.BootstrapTools; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobmanager.HighAvailabilityMode; -import org.apache.flink.runtime.jobmanager.JobManagerOptions; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -168,15 +168,12 @@ public AbstractYarnClusterDescriptor() { flinkConfigurationPath = new Path(confFile.getAbsolutePath()); slots = flinkConfiguration.getInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 1); - - if (flinkConfiguration.containsKey(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY.key())) { - jobManagerMemoryMb = flinkConfiguration.getInteger(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY); - } - if (flinkConfiguration.containsKey(TaskManagerOptions.TASK_MANAGER_HEAP_MEMORY.key())) { - taskManagerMemoryMb = flinkConfiguration.getInteger(TaskManagerOptions.TASK_MANAGER_HEAP_MEMORY); - } + + jobManagerMemoryMb = flinkConfiguration.getInteger(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY); + taskManagerMemoryMb = flinkConfiguration.getInteger(TaskManagerOptions.TASK_MANAGER_HEAP_MEMORY); + } catch (Exception e) { - LOG.debug("Config couldn't be loaded from environment variable."); + LOG.debug("Config couldn't be loaded from environment variable.", e); } } From f27bfe37d6e519e6fd6c3c601b26d774028ec2cd Mon Sep 17 00:00:00 2001 From: Greg Hogan Date: Thu, 13 Apr 2017 14:07:29 -0400 Subject: [PATCH 004/253] [hotfix] [gelly] Driver updates - refactor SimpleDriver to call internal plan method - add CLI parameters for RMatGraph, AdamicAdar, JaccardIndex - remove unused data from VertexDegrees - JaccardIndex now filters on > rather than >= - handle null in ValueArrayTypeInfo - add NonForwardingIdentityMapper to GraphUtils --- .../java/org/apache/flink/graph/Runner.java | 14 +++-- .../flink/graph/drivers/AdamicAdar.java | 20 ++++-- .../graph/drivers/ClusteringCoefficient.java | 30 +++++---- .../graph/drivers/ConnectedComponents.java | 7 +-- .../apache/flink/graph/drivers/EdgeList.java | 5 +- .../org/apache/flink/graph/drivers/HITS.java | 9 +-- .../flink/graph/drivers/JaccardIndex.java | 27 ++++++-- .../apache/flink/graph/drivers/PageRank.java | 9 +-- .../flink/graph/drivers/SimpleDriver.java | 63 ++++++++++++++++--- .../flink/graph/drivers/TriangleListing.java | 32 ++++++---- .../flink/graph/drivers/input/RMatGraph.java | 3 + .../annotate/directed/VertexDegrees.java | 30 ++++----- .../library/link_analysis/Functions.java | 4 +- .../graph/library/link_analysis/PageRank.java | 5 ++ .../library/similarity/JaccardIndex.java | 7 ++- .../types/valuearray/ValueArrayTypeInfo.java | 13 +++- .../valuearray/ValueArrayTypeInfoFactory.java | 5 +- .../apache/flink/graph/utils/GraphUtils.java | 27 +++++++- 
.../library/similarity/JaccardIndexTest.java | 2 + 19 files changed, 224 insertions(+), 88 deletions(-) diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/Runner.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/Runner.java index 4b6cf42112188..5ffe681cd0fc8 100644 --- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/Runner.java +++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/Runner.java @@ -20,6 +20,7 @@ import org.apache.commons.lang3.StringEscapeUtils; import org.apache.commons.lang3.text.StrBuilder; +import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.io.CsvOutputFormat; import org.apache.flink.api.java.utils.ParameterTool; @@ -188,20 +189,21 @@ private static String getAlgorithmUsage(String algorithmName) { public static void main(String[] args) throws Exception { // Set up the execution environment final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + ExecutionConfig config = env.getConfig(); // should not have any non-Flink data types - env.getConfig().disableAutoTypeRegistration(); - env.getConfig().disableForceAvro(); - env.getConfig().disableForceKryo(); + config.disableAutoTypeRegistration(); + config.disableForceAvro(); + config.disableForceKryo(); ParameterTool parameters = ParameterTool.fromArgs(args); - env.getConfig().setGlobalJobParameters(parameters); + config.setGlobalJobParameters(parameters); // integration tests run with with object reuse both disabled and enabled if (parameters.has("__disable_object_reuse")) { - env.getConfig().disableObjectReuse(); + config.disableObjectReuse(); } else { - env.getConfig().enableObjectReuse(); + config.enableObjectReuse(); } // Usage diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/AdamicAdar.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/AdamicAdar.java index 742c1ded1e1de..8bf9268c5a871 100644 --- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/AdamicAdar.java +++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/AdamicAdar.java @@ -20,9 +20,11 @@ import org.apache.commons.lang3.text.StrBuilder; import org.apache.commons.lang3.text.WordUtils; +import org.apache.flink.api.java.DataSet; import org.apache.flink.graph.Graph; import org.apache.flink.graph.drivers.output.CSV; import org.apache.flink.graph.drivers.output.Print; +import org.apache.flink.graph.drivers.parameter.DoubleParameter; import org.apache.flink.graph.drivers.parameter.LongParameter; import org.apache.flink.graph.library.similarity.AdamicAdar.Result; import org.apache.flink.types.CopyableValue; @@ -33,8 +35,16 @@ * Driver for {@link org.apache.flink.graph.library.similarity.AdamicAdar}. 
*/ public class AdamicAdar, VV, EV> -extends SimpleDriver> -implements Driver, CSV, Print { +extends SimpleDriver> +implements CSV, Print { + + private DoubleParameter minRatio = new DoubleParameter(this, "minimum_ratio") + .setDefaultValue(0.0) + .setMinimumValue(0.0, true); + + private DoubleParameter minScore = new DoubleParameter(this, "minimum_score") + .setDefaultValue(0.0) + .setMinimumValue(0.0, true); private LongParameter littleParallelism = new LongParameter(this, "little_parallelism") .setDefaultValue(PARALLELISM_DEFAULT); @@ -61,11 +71,13 @@ public String getLongDescription() { } @Override - public void plan(Graph graph) throws Exception { + protected DataSet> simplePlan(Graph graph) throws Exception { int lp = littleParallelism.getValue().intValue(); - result = graph + return graph .run(new org.apache.flink.graph.library.similarity.AdamicAdar() + .setMinimumRatio(minRatio.getValue().floatValue()) + .setMinimumScore(minScore.getValue().floatValue()) .setLittleParallelism(lp)); } } diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/ClusteringCoefficient.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/ClusteringCoefficient.java index c463c0a2d4585..4958b5afd8ebb 100644 --- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/ClusteringCoefficient.java +++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/ClusteringCoefficient.java @@ -20,6 +20,7 @@ import org.apache.commons.lang3.text.StrBuilder; import org.apache.commons.lang3.text.WordUtils; +import org.apache.flink.api.java.DataSet; import org.apache.flink.graph.Graph; import org.apache.flink.graph.GraphAnalytic; import org.apache.flink.graph.asm.result.PrintableResult; @@ -43,8 +44,8 @@ * @see org.apache.flink.graph.library.clustering.undirected.LocalClusteringCoefficient */ public class ClusteringCoefficient & CopyableValue, VV, EV> -extends SimpleDriver -implements Driver, CSV, Hash, Print { +extends SimpleDriver +implements CSV, Hash, Print { private static final String DIRECTED = "directed"; @@ -85,15 +86,11 @@ public String getLongDescription() { } @Override - public void plan(Graph graph) throws Exception { + protected DataSet simplePlan(Graph graph) throws Exception { int lp = littleParallelism.getValue().intValue(); switch (order.getValue()) { case DIRECTED: - result = graph - .run(new org.apache.flink.graph.library.clustering.directed.LocalClusteringCoefficient() - .setLittleParallelism(lp)); - globalClusteringCoefficient = graph .run(new org.apache.flink.graph.library.clustering.directed.GlobalClusteringCoefficient() .setLittleParallelism(lp)); @@ -101,13 +98,14 @@ public void plan(Graph graph) throws Exception { averageClusteringCoefficient = graph .run(new org.apache.flink.graph.library.clustering.directed.AverageClusteringCoefficient() .setLittleParallelism(lp)); - break; - case UNDIRECTED: - result = graph - .run(new org.apache.flink.graph.library.clustering.undirected.LocalClusteringCoefficient() + @SuppressWarnings("unchecked") + DataSet directedResult = (DataSet) (DataSet) graph + .run(new org.apache.flink.graph.library.clustering.directed.LocalClusteringCoefficient() .setLittleParallelism(lp)); + return directedResult; + case UNDIRECTED: globalClusteringCoefficient = graph .run(new org.apache.flink.graph.library.clustering.undirected.GlobalClusteringCoefficient() .setLittleParallelism(lp)); @@ -115,7 +113,15 @@ public void plan(Graph graph) throws Exception { 
averageClusteringCoefficient = graph .run(new org.apache.flink.graph.library.clustering.undirected.AverageClusteringCoefficient() .setLittleParallelism(lp)); - break; + + @SuppressWarnings("unchecked") + DataSet undirectedResult = (DataSet) (DataSet) graph + .run(new org.apache.flink.graph.library.clustering.undirected.LocalClusteringCoefficient() + .setLittleParallelism(lp)); + return undirectedResult; + + default: + throw new RuntimeException("Unknown order: " + order); } } diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/ConnectedComponents.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/ConnectedComponents.java index 32263cfe64188..95904d82a319d 100644 --- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/ConnectedComponents.java +++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/ConnectedComponents.java @@ -78,12 +78,9 @@ public void hash(String executionName) throws Exception { @Override public void print(String executionName) throws Exception { - Collect> collector = new Collect<>(); + List> results = new Collect>().run(components).execute(executionName); - // Refactored due to openjdk7 compile error: https://travis-ci.org/greghogan/flink/builds/200487761 - List> records = collector.run(components).execute(executionName); - - for (Vertex result : records) { + for (Vertex result : results) { System.out.println(result); } } diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/EdgeList.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/EdgeList.java index 524e70ff55837..5da02847f0968 100644 --- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/EdgeList.java +++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/EdgeList.java @@ -82,10 +82,7 @@ public void hash(String executionName) throws Exception { @Override public void print(String executionName) throws Exception { - Collect> collector = new Collect<>(); - - // Refactored due to openjdk7 compile error: https://travis-ci.org/greghogan/flink/builds/200487761 - List> records = collector.run(edges).execute(executionName); + List> records = new Collect>().run(edges).execute(executionName); if (hasNullValueEdges(edges)) { for (Edge result : records) { diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/HITS.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/HITS.java index 6081fea757b80..209cddf5add82 100644 --- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/HITS.java +++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/HITS.java @@ -20,6 +20,7 @@ import org.apache.commons.lang3.text.StrBuilder; import org.apache.commons.lang3.text.WordUtils; +import org.apache.flink.api.java.DataSet; import org.apache.flink.graph.Graph; import org.apache.flink.graph.drivers.output.CSV; import org.apache.flink.graph.drivers.output.Print; @@ -30,8 +31,8 @@ * Driver for {@link org.apache.flink.graph.library.link_analysis.HITS}. 
*/ public class HITS -extends SimpleDriver> -implements Driver, CSV, Print { +extends SimpleDriver> +implements CSV, Print { private static final int DEFAULT_ITERATIONS = 10; @@ -59,8 +60,8 @@ public String getLongDescription() { } @Override - public void plan(Graph graph) throws Exception { - result = graph + protected DataSet> simplePlan(Graph graph) throws Exception { + return graph .run(new org.apache.flink.graph.library.link_analysis.HITS( iterationConvergence.getValue().iterations, iterationConvergence.getValue().convergenceThreshold)); diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/JaccardIndex.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/JaccardIndex.java index 1c836ea9e063e..ae0d5f8ccd0ee 100644 --- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/JaccardIndex.java +++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/JaccardIndex.java @@ -20,6 +20,7 @@ import org.apache.commons.lang3.text.StrBuilder; import org.apache.commons.lang3.text.WordUtils; +import org.apache.flink.api.java.DataSet; import org.apache.flink.graph.Graph; import org.apache.flink.graph.drivers.output.CSV; import org.apache.flink.graph.drivers.output.Hash; @@ -34,8 +35,24 @@ * Driver for {@link org.apache.flink.graph.library.similarity.JaccardIndex}. */ public class JaccardIndex, VV, EV> -extends SimpleDriver> -implements Driver, CSV, Hash, Print { +extends SimpleDriver> +implements CSV, Hash, Print { + + private LongParameter minNumerator = new LongParameter(this, "minimum_numerator") + .setDefaultValue(0) + .setMinimumValue(0); + + private LongParameter minDenominator = new LongParameter(this, "minimum_denominator") + .setDefaultValue(1) + .setMinimumValue(1); + + private LongParameter maxNumerator = new LongParameter(this, "maximum_numerator") + .setDefaultValue(1) + .setMinimumValue(0); + + private LongParameter maxDenominator = new LongParameter(this, "maximum_denominator") + .setDefaultValue(1) + .setMinimumValue(1); private LongParameter littleParallelism = new LongParameter(this, "little_parallelism") .setDefaultValue(PARALLELISM_DEFAULT); @@ -64,11 +81,13 @@ public String getLongDescription() { } @Override - public void plan(Graph graph) throws Exception { + protected DataSet> simplePlan(Graph graph) throws Exception { int lp = littleParallelism.getValue().intValue(); - result = graph + return graph .run(new org.apache.flink.graph.library.similarity.JaccardIndex() + .setMinimumScore(minNumerator.getValue().intValue(), minDenominator.getValue().intValue()) + .setMaximumScore(maxNumerator.getValue().intValue(), maxDenominator.getValue().intValue()) .setLittleParallelism(lp)); } } diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/PageRank.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/PageRank.java index 8cef0772370eb..5d74bdbb8e3b2 100644 --- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/PageRank.java +++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/PageRank.java @@ -19,6 +19,7 @@ package org.apache.flink.graph.drivers; import org.apache.commons.lang3.text.StrBuilder; +import org.apache.flink.api.java.DataSet; import org.apache.flink.graph.Graph; import org.apache.flink.graph.drivers.output.CSV; import org.apache.flink.graph.drivers.output.Print; @@ -30,8 +31,8 @@ * @see 
org.apache.flink.graph.library.link_analysis.PageRank */ public class PageRank -extends SimpleDriver> -implements Driver, CSV, Print { +extends SimpleDriver> +implements CSV, Print { private static final int DEFAULT_ITERATIONS = 10; @@ -64,8 +65,8 @@ public String getLongDescription() { } @Override - public void plan(Graph graph) throws Exception { - result = graph + protected DataSet> simplePlan(Graph graph) throws Exception { + return graph .run(new org.apache.flink.graph.library.link_analysis.PageRank( dampingFactor.getValue(), iterationConvergence.getValue().iterations, diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/SimpleDriver.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/SimpleDriver.java index 98bdfc5374b13..5cecca1d9eb1e 100644 --- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/SimpleDriver.java +++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/SimpleDriver.java @@ -19,6 +19,7 @@ package org.apache.flink.graph.drivers; import org.apache.flink.api.java.DataSet; +import org.apache.flink.graph.Graph; import org.apache.flink.graph.asm.dataset.ChecksumHashCode; import org.apache.flink.graph.asm.dataset.ChecksumHashCode.Checksum; import org.apache.flink.graph.asm.dataset.Collect; @@ -33,30 +34,74 @@ * * @param algorithm's result type */ -public abstract class SimpleDriver -extends ParameterizedBase { +public abstract class SimpleDriver +extends ParameterizedBase +implements Driver { - protected DataSet result; + private DataSet result; + protected DataSet getResult() { + return result; + } + + /** + * Plan the algorithm and return the result {@link DataSet}. + * + * @param graph input graph + * @return driver output + * @throws Exception on error + */ + protected abstract DataSet simplePlan(Graph graph) throws Exception; + + @Override + public void plan(Graph graph) throws Exception { + result = simplePlan(graph); + } + + /** + * Print hash of execution results. + * + * Does *not* implement/override {@code Hash} since {@link Driver} + * implementations designate the appropriate outputs. + * + * @param executionName job name + * @throws Exception on error + */ public void hash(String executionName) throws Exception { Checksum checksum = new ChecksumHashCode() - .run((DataSet) result) + .run(result) .execute(executionName); System.out.println(checksum); } + /** + * Print execution results. + * + * Does *not* implement/override {@code Print} since {@link Driver} + * implementations designate the appropriate outputs. + * + * @param executionName job name + * @throws Exception on error + */ public void print(String executionName) throws Exception { - Collect collector = new Collect<>(); - - // Refactored due to openjdk7 compile error: https://travis-ci.org/greghogan/flink/builds/200487761 - List records = collector.run((DataSet) result).execute(executionName); + List results = new Collect().run(result).execute(executionName); - for (R result : records) { + for (R result : results) { System.out.println(result.toPrintableString()); } } + /** + * Write execution results to file using CSV format. + * + * Does *not* implement/override {@code CSV} since {@link Driver} + * implementations designate the appropriate outputs. 
+ * + * @param filename output filename + * @param lineDelimiter CSV delimiter between lines + * @param fieldDelimiter CSV delimiter between fields + */ public void writeCSV(String filename, String lineDelimiter, String fieldDelimiter) { result .writeAsCsv(filename, lineDelimiter, fieldDelimiter) diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/TriangleListing.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/TriangleListing.java index ca0c1674066e5..5157b8e04c109 100644 --- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/TriangleListing.java +++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/TriangleListing.java @@ -20,6 +20,7 @@ import org.apache.commons.lang3.text.StrBuilder; import org.apache.commons.lang3.text.WordUtils; +import org.apache.flink.api.java.DataSet; import org.apache.flink.graph.Graph; import org.apache.flink.graph.GraphAnalytic; import org.apache.flink.graph.asm.result.PrintableResult; @@ -42,8 +43,8 @@ * @see org.apache.flink.graph.library.clustering.undirected.TriadicCensus */ public class TriangleListing & CopyableValue, VV, EV> -extends SimpleDriver -implements Driver, CSV, Hash, Print { +extends SimpleDriver +implements CSV, Hash, Print { private static final String DIRECTED = "directed"; @@ -83,35 +84,40 @@ public String getLongDescription() { } @Override - public void plan(Graph graph) throws Exception { + protected DataSet simplePlan(Graph graph) throws Exception { int lp = littleParallelism.getValue().intValue(); switch (order.getValue()) { case DIRECTED: - result = graph - .run(new org.apache.flink.graph.library.clustering.directed.TriangleListing() - .setSortTriangleVertices(sortTriangleVertices.getValue()) - .setLittleParallelism(lp)); - if (computeTriadicCensus.getValue()) { triadicCensus = graph .run(new org.apache.flink.graph.library.clustering.directed.TriadicCensus() .setLittleParallelism(lp)); } - break; - case UNDIRECTED: - result = graph - .run(new org.apache.flink.graph.library.clustering.undirected.TriangleListing() + @SuppressWarnings("unchecked") + DataSet directedResult = (DataSet) (DataSet) graph + .run(new org.apache.flink.graph.library.clustering.directed.TriangleListing() .setSortTriangleVertices(sortTriangleVertices.getValue()) .setLittleParallelism(lp)); + return directedResult; + case UNDIRECTED: if (computeTriadicCensus.getValue()) { triadicCensus = graph .run(new org.apache.flink.graph.library.clustering.undirected.TriadicCensus() .setLittleParallelism(lp)); } - break; + + @SuppressWarnings("unchecked") + DataSet undirectedResult = (DataSet) (DataSet) graph + .run(new org.apache.flink.graph.library.clustering.undirected.TriangleListing() + .setSortTriangleVertices(sortTriangleVertices.getValue()) + .setLittleParallelism(lp)); + return undirectedResult; + + default: + throw new RuntimeException("Unknown order: " + order); } } diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/RMatGraph.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/RMatGraph.java index e4e6a4cec2a1a..d64534bec3ac8 100644 --- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/RMatGraph.java +++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/RMatGraph.java @@ -96,6 +96,9 @@ public class RMatGraph> .setMinimumValue(0.0, true) 
.setMaximumValue(2.0, true); + private LongParameter seed = new LongParameter(this, "seed") + .setDefaultValue(JDKRandomGeneratorFactory.DEFAULT_SEED); + private Simplify simplify = new Simplify(this); private LongParameter littleParallelism = new LongParameter(this, "little_parallelism") diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexDegrees.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexDegrees.java index a27ca29067c1e..f73d37bc6cca3 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexDegrees.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexDegrees.java @@ -21,11 +21,11 @@ import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.common.functions.GroupReduceFunction; import org.apache.flink.api.common.functions.JoinFunction; -import org.apache.flink.api.common.operators.Order; import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields; import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFieldsFirst; import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFieldsSecond; +import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.graph.Edge; import org.apache.flink.graph.EdgeOrder; @@ -118,7 +118,7 @@ protected boolean mergeConfiguration(GraphAlgorithmWrappingDataSet other) { public DataSet> runInternal(Graph input) throws Exception { // s, t, bitmask - DataSet> edgesWithOrder = input.getEdges() + DataSet> vertexWithEdgeOrder = input.getEdges() .flatMap(new EmitAndFlipEdge()) .setParallelism(parallelism) .name("Emit and flip edge") @@ -128,9 +128,8 @@ public DataSet> runInternal(Graph input) .name("Reduce bitmask"); // s, d(s) - DataSet> vertexDegrees = edgesWithOrder + DataSet> vertexDegrees = vertexWithEdgeOrder .groupBy(0) - .sortGroup(1, Order.ASCENDING) .reduceGroup(new DegreeCount()) .setParallelism(parallelism) .name("Degree count"); @@ -178,22 +177,22 @@ public void flatMap(Edge value, Collector> out) * * @param ID type */ - @ForwardedFields("0; 1") + @ForwardedFields("0") private static final class ReduceBitmask - implements GroupReduceFunction, Tuple3> { + implements GroupReduceFunction, Tuple2> { + private Tuple2 output = new Tuple2<>(null, new ByteValue()); + @Override - public void reduce(Iterable> values, Collector> out) + public void reduce(Iterable> values, Collector> out) throws Exception { - Tuple3 output = null; - byte bitmask = 0; for (Tuple3 value: values) { - output = value; + output.f0 = value.f0; bitmask |= value.f2.getValue(); } - output.f2.setValue(bitmask); + output.f1.setValue(bitmask); out.collect(output); } } @@ -203,21 +202,22 @@ public void reduce(Iterable> values, Collector ID type */ + @ForwardedFields("0") private static class DegreeCount - implements GroupReduceFunction, Vertex> { + implements GroupReduceFunction, Vertex> { private Vertex output = new Vertex<>(null, new Degrees()); @Override - public void reduce(Iterable> values, Collector> out) + public void reduce(Iterable> values, Collector> out) throws Exception { long degree = 0; long outDegree = 0; long inDegree = 0; - for (Tuple3 edge : values) { + for (Tuple2 edge : values) { output.f0 = edge.f0; - byte bitmask = edge.f2.getValue(); + byte bitmask = edge.f1.getValue(); 
degree++; diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/link_analysis/Functions.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/link_analysis/Functions.java index 5bb2f4c338577..a7d6ef1ab11f9 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/link_analysis/Functions.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/link_analysis/Functions.java @@ -25,13 +25,15 @@ class Functions { + private Functions() {} + /** * Sum vertices' scores. * * @param ID type */ @ForwardedFields("0") - static class SumScore + protected static final class SumScore implements ReduceFunction> { @Override public Tuple2 reduce(Tuple2 left, Tuple2 right) diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/link_analysis/PageRank.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/link_analysis/PageRank.java index 747735eba273c..c5c4178275396 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/link_analysis/PageRank.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/link_analysis/PageRank.java @@ -45,6 +45,7 @@ import org.apache.flink.graph.library.link_analysis.Functions.SumScore; import org.apache.flink.graph.library.link_analysis.PageRank.Result; import org.apache.flink.graph.utils.GraphUtils; +import org.apache.flink.graph.utils.GraphUtils.IdentityMapper; import org.apache.flink.graph.utils.Murmur3_32; import org.apache.flink.graph.utils.proxy.GraphAlgorithmWrappingDataSet; import org.apache.flink.types.DoubleValue; @@ -175,6 +176,10 @@ public DataSet> runInternal(Graph input) .run(new VertexDegrees() .setParallelism(parallelism)); + // prevent Exception "The dam has been closed." in TempBarrier + // for a simplified Graph as in PageRankITCase (see FLINK-5623) + vertexDegree = vertexDegree.map(new IdentityMapper>()); + // vertex count DataSet vertexCount = GraphUtils.count(vertexDegree); diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/JaccardIndex.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/JaccardIndex.java index 2f4516a92be25..bc3cb86c20658 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/JaccardIndex.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/JaccardIndex.java @@ -78,7 +78,7 @@ public class JaccardIndex, VV, EV> private int maximumScoreNumerator = 1; - private int maximumScoreDenominator = 0; + private int maximumScoreDenominator = 1; private int littleParallelism = PARALLELISM_DEFAULT; @@ -121,7 +121,7 @@ public JaccardIndex setMinimumScore(int numerator, int denominator) { } /** - * Filter out Jaccard Index scores greater than or equal to the given maximum fraction. + * Filter out Jaccard Index scores greater than the given maximum fraction. * * @param numerator numerator of the maximum score * @param denominator denominator of the maximum score @@ -253,6 +253,7 @@ public DataSet> runInternal(Graph input) * number of groups and {@link GenerateGroups} emits each edge into each group. 
* * @param ID type + * @param edge value type */ @ForwardedFields("0->1; 1->2") private static class GenerateGroupSpans @@ -439,7 +440,7 @@ public void reduce(Iterable> values, Collector> if (unboundedScores || (count * minimumScoreDenominator >= distinctNeighbors * minimumScoreNumerator - && count * maximumScoreDenominator < distinctNeighbors * maximumScoreNumerator)) { + && count * maximumScoreDenominator <= distinctNeighbors * maximumScoreNumerator)) { output.f0 = edge.f0; output.f1 = edge.f1; output.f2.setValue(count); diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/ValueArrayTypeInfo.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/ValueArrayTypeInfo.java index ee9b7708bb9fd..4ba8e394fdd99 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/ValueArrayTypeInfo.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/ValueArrayTypeInfo.java @@ -30,6 +30,7 @@ import org.apache.flink.types.NullValue; import org.apache.flink.types.StringValue; import org.apache.flink.types.Value; +import org.apache.flink.util.Preconditions; import java.util.HashMap; import java.util.Map; @@ -54,7 +55,7 @@ public class ValueArrayTypeInfo extends TypeInformation> implem public ValueArrayTypeInfo(TypeInformation valueType) { this.valueType = valueType; - this.type = valueType.getTypeClass(); + this.type = valueType == null ? null : valueType.getTypeClass(); } @Override @@ -85,12 +86,16 @@ public boolean isTupleType() { @Override public boolean isKeyType() { + Preconditions.checkNotNull(type, "TypeInformation type class is required"); + return Comparable.class.isAssignableFrom(type); } @Override @SuppressWarnings("unchecked") public TypeSerializer> createSerializer(ExecutionConfig executionConfig) { + Preconditions.checkNotNull(type, "TypeInformation type class is required"); + if (IntValue.class.isAssignableFrom(type)) { return (TypeSerializer>) (TypeSerializer) new IntValueArraySerializer(); } else if (LongValue.class.isAssignableFrom(type)) { @@ -107,6 +112,8 @@ public TypeSerializer> createSerializer(ExecutionConfig executionC @SuppressWarnings({ "unchecked", "rawtypes" }) @Override public TypeComparator> createComparator(boolean sortOrderAscending, ExecutionConfig executionConfig) { + Preconditions.checkNotNull(type, "TypeInformation type class is required"); + if (IntValue.class.isAssignableFrom(type)) { return (TypeComparator>) (TypeComparator) new IntValueArrayComparator(sortOrderAscending); } else if (LongValue.class.isAssignableFrom(type)) { @@ -131,6 +138,8 @@ public Map> getGenericParameters() { @Override public int hashCode() { + Preconditions.checkNotNull(type, "TypeInformation type class is required"); + return type.hashCode(); } @@ -154,6 +163,8 @@ public boolean canEqual(Object obj) { @Override public String toString() { + Preconditions.checkNotNull(type, "TypeInformation type class is required"); + return "ValueArrayType<" + type.getSimpleName() + ">"; } } diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/ValueArrayTypeInfoFactory.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/ValueArrayTypeInfoFactory.java index 2145c3dc530c8..1cfb41594f281 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/ValueArrayTypeInfoFactory.java +++ 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/ValueArrayTypeInfoFactory.java @@ -36,6 +36,9 @@ public class ValueArrayTypeInfoFactory extends TypeInfoFactory> @Override public TypeInformation> createTypeInfo(Type t, Map> genericParameters) { - return new ValueArrayTypeInfo(genericParameters.get("T")); + @SuppressWarnings("unchecked") + TypeInformation> typeInfo = new ValueArrayTypeInfo(genericParameters.get("T")); + + return typeInfo; } } diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/GraphUtils.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/GraphUtils.java index 78fb37888435a..52927518a94c0 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/GraphUtils.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/GraphUtils.java @@ -22,8 +22,10 @@ import org.apache.flink.api.common.functions.ReduceFunction; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields; import org.apache.flink.api.java.typeutils.ResultTypeQueryable; import org.apache.flink.api.java.typeutils.TypeExtractor; +import org.apache.flink.graph.asm.translate.TranslateFunction; import org.apache.flink.types.LongValue; import static org.apache.flink.api.java.typeutils.ValueTypeInfo.LONG_VALUE_TYPE_INFO; @@ -51,6 +53,7 @@ public static DataSet count(DataSet input) { * * @param element type */ + @ForwardedFields("*") public static final class IdentityMapper implements MapFunction { public T map(T value) { @@ -58,6 +61,20 @@ public T map(T value) { } } + /** + * The identity mapper returns the input as output. + * + * This does not forward fields and is used to break an operator chain. + * + * @param element type + */ + public static final class NonForwardingIdentityMapper + implements MapFunction { + public T map(T value) { + return value; + } + } + /** * Map each element to a value. * @@ -65,7 +82,7 @@ public T map(T value) { * @param output type */ public static class MapTo - implements MapFunction, ResultTypeQueryable { + implements MapFunction, ResultTypeQueryable, TranslateFunction { private final O value; /** @@ -78,7 +95,13 @@ public MapTo(O value) { } @Override - public O map(I o) throws Exception { + public O map(I input) throws Exception { + return value; + } + + @Override + public O translate(I input, O reuse) + throws Exception { return value; } diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/similarity/JaccardIndexTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/similarity/JaccardIndexTest.java index 128ee70d0e5c0..2443359dbefa7 100644 --- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/similarity/JaccardIndexTest.java +++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/similarity/JaccardIndexTest.java @@ -81,6 +81,8 @@ public void testSimpleGraphWithMaximumScore() String expectedResult = "(0,1,1,4)\n" + "(0,2,1,4)\n" + + "(0,3,2,4)\n" + + "(1,2,2,4)\n" + "(1,3,1,6)\n" + "(1,4,1,3)\n" + "(1,5,1,3)\n" + From e8c1693cfe0607c9c550a996f534edfb69012cc5 Mon Sep 17 00:00:00 2001 From: godfreyhe Date: Wed, 19 Apr 2017 19:43:25 +0800 Subject: [PATCH 005/253] [FLINK-6327] [table] Bug in CommonCalc's estimateRowCount() method This closes #3740. 
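Note on the fix below: with a filter present, the old code estimated `(rowCnt * 0.75).min(1.0)`, which caps the result at a single row instead of flooring it there; `.max(1.0)` keeps the intended 25% reduction. A small worked example with an illustrative input estimate (the numbers are not from the patch; Java's Math.min/max behaves the same as Scala's Double.min/max used here):

    double rowCnt = 1000.0;          // assumed estimated input row count
    Math.min(rowCnt * 0.75, 1.0);    // = 1.0   -> old code: every filtered Calc is estimated at one row
    Math.max(rowCnt * 0.75, 1.0);    // = 750.0 -> fixed code: 25% reduction, floored at one row
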
--- .../scala/org/apache/flink/table/plan/nodes/CommonCalc.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCalc.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCalc.scala index bc25140df725e..96a74704e6edb 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCalc.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCalc.scala @@ -177,7 +177,7 @@ trait CommonCalc { if (calcProgram.getCondition != null) { // we reduce the result card to push filters down - (rowCnt * 0.75).min(1.0) + (rowCnt * 0.75).max(1.0) } else { rowCnt } From bcc233a32ce37ee19c3e51de1f3823c98d24f658 Mon Sep 17 00:00:00 2001 From: Stephan Ewen Date: Wed, 19 Apr 2017 18:01:29 +0200 Subject: [PATCH 006/253] [hotfix] [tests] Stabilize SystemProcessingTimeServiceTest --- .../tasks/SystemProcessingTimeServiceTest.java | 14 ++++++++++++-- 1 file changed, 12 insertions(+), 2 deletions(-) diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeServiceTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeServiceTest.java index 50e438c796a96..fb4f0878cb8f7 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeServiceTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeServiceTest.java @@ -20,10 +20,11 @@ import org.apache.flink.core.testutils.OneShotLatch; import org.apache.flink.streaming.runtime.operators.TestProcessingTimeServiceTest.ReferenceSettingExceptionHandler; - import org.apache.flink.util.TestLogger; + import org.junit.Test; +import java.util.concurrent.CancellationException; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.atomic.AtomicBoolean; @@ -178,7 +179,16 @@ public void onProcessingTime(long timestamp) throws Exception { // this should cancel our future timer.quiesceAndAwaitPending(); - assertTrue(scheduledFuture.isCancelled()); + // it may be that the cancelled status is not immediately visible after the + // termination (not necessary a volatile update), so we need to "get()" the cancellation + // exception to be on the safe side + try { + scheduledFuture.get(); + fail("scheduled future is not cancelled"); + } + catch (CancellationException ignored) { + // expected + } scheduledFuture = timer.scheduleAtFixedRate(new ProcessingTimeCallback() { @Override From 220ef9af0b6fe5046ed1d9abadb1fd5abbb46d4c Mon Sep 17 00:00:00 2001 From: Stephan Ewen Date: Wed, 19 Apr 2017 18:21:43 +0200 Subject: [PATCH 007/253] [hotfix] [tests] Stabilize AsyncCallsTest by ensuring delayed messages are not executed before their time --- .../rpc/akka/AkkaInvocationHandler.java | 9 ++--- .../flink/runtime/rpc/akka/AkkaRpcActor.java | 33 +++++++++++-------- .../runtime/rpc/akka/messages/RunAsync.java | 23 ++++++------- .../flink/runtime/rpc/AsyncCallsTest.java | 6 ++-- 4 files changed, 37 insertions(+), 34 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java index 56505f9770c65..c21accf2e5223 100644 --- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java @@ -177,12 +177,13 @@ public void runAsync(Runnable runnable) { } @Override - public void scheduleRunAsync(Runnable runnable, long delay) { + public void scheduleRunAsync(Runnable runnable, long delayMillis) { checkNotNull(runnable, "runnable"); - checkArgument(delay >= 0, "delay must be zero or greater"); - + checkArgument(delayMillis >= 0, "delay must be zero or greater"); + if (isLocal) { - rpcEndpoint.tell(new RunAsync(runnable, delay), ActorRef.noSender()); + long atTimeNanos = delayMillis == 0 ? 0 : System.nanoTime() + (delayMillis * 1_000_000); + rpcEndpoint.tell(new RunAsync(runnable, atTimeNanos), ActorRef.noSender()); } else { throw new RuntimeException("Trying to send a Runnable to a remote actor at " + rpcEndpoint.path() + ". This is not supported."); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java index 99f8211a8c1d1..86cd83eabe17e 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java @@ -268,22 +268,27 @@ private void handleRunAsync(RunAsync runAsync) { runAsync.getClass().getName(), runAsync.getClass().getName()); } - else if (runAsync.getDelay() == 0) { - // run immediately - try { - runAsync.getRunnable().run(); - } catch (Throwable t) { - LOG.error("Caught exception while executing runnable in main thread.", t); - ExceptionUtils.rethrowIfFatalErrorOrOOM(t); - } - } else { - // schedule for later. send a new message after the delay, which will then be immediately executed - FiniteDuration delay = new FiniteDuration(runAsync.getDelay(), TimeUnit.MILLISECONDS); - RunAsync message = new RunAsync(runAsync.getRunnable(), 0); + final long timeToRun = runAsync.getTimeNanos(); + final long delayNanos; + + if (timeToRun == 0 || (delayNanos = timeToRun - System.nanoTime()) <= 0) { + // run immediately + try { + runAsync.getRunnable().run(); + } catch (Throwable t) { + LOG.error("Caught exception while executing runnable in main thread.", t); + ExceptionUtils.rethrowIfFatalErrorOrOOM(t); + } + } + else { + // schedule for later. 
send a new message after the delay, which will then be immediately executed + FiniteDuration delay = new FiniteDuration(delayNanos, TimeUnit.NANOSECONDS); + RunAsync message = new RunAsync(runAsync.getRunnable(), timeToRun); - getContext().system().scheduler().scheduleOnce(delay, getSelf(), message, - getContext().dispatcher(), ActorRef.noSender()); + getContext().system().scheduler().scheduleOnce(delay, getSelf(), message, + getContext().dispatcher(), ActorRef.noSender()); + } } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/RunAsync.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/RunAsync.java index ce4f9d6ca9065..4b8a0b4c6542a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/RunAsync.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/RunAsync.java @@ -18,40 +18,37 @@ package org.apache.flink.runtime.rpc.akka.messages; -import java.io.Serializable; - import static org.apache.flink.util.Preconditions.checkNotNull; import static org.apache.flink.util.Preconditions.checkArgument; /** * Message for asynchronous runnable invocations */ -public final class RunAsync implements Serializable { - private static final long serialVersionUID = -3080595100695371036L; +public final class RunAsync { /** The runnable to be executed. Transient, so it gets lost upon serialization */ - private final transient Runnable runnable; + private final Runnable runnable; /** The delay after which the runnable should be called */ - private final long delay; + private final long atTimeNanos; /** * Creates a new {@code RunAsync} message. * - * @param runnable The Runnable to run. - * @param delay The delay in milliseconds. Zero indicates immediate execution. + * @param runnable The Runnable to run. + * @param atTimeNanos The time (as for System.nanoTime()) when to execute the runnable. 
*/ - public RunAsync(Runnable runnable, long delay) { - checkArgument(delay >= 0); + public RunAsync(Runnable runnable, long atTimeNanos) { + checkArgument(atTimeNanos >= 0); this.runnable = checkNotNull(runnable); - this.delay = delay; + this.atTimeNanos = atTimeNanos; } public Runnable getRunnable() { return runnable; } - public long getDelay() { - return delay; + public long getTimeNanos() { + return atTimeNanos; } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/AsyncCallsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/AsyncCallsTest.java index 7affdb942777b..e636d6c7af448 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/AsyncCallsTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/AsyncCallsTest.java @@ -23,10 +23,10 @@ import org.apache.flink.api.common.time.Time; import org.apache.flink.core.testutils.OneShotLatch; import org.apache.flink.runtime.akka.AkkaUtils; - import org.apache.flink.runtime.concurrent.Future; import org.apache.flink.runtime.rpc.akka.AkkaRpcService; import org.apache.flink.util.TestLogger; + import org.junit.AfterClass; import org.junit.Test; @@ -121,7 +121,7 @@ public void testScheduleWithDelay() throws Exception { final AtomicBoolean concurrentAccess = new AtomicBoolean(false); final OneShotLatch latch = new OneShotLatch(); - final long delay = 200; + final long delay = 100; TestEndpoint testEndpoint = new TestEndpoint(akkaRpcService, lock); testEndpoint.start(); @@ -161,7 +161,7 @@ public void run() { assertFalse("Rpc Endpoint had concurrent access", testEndpoint.hasConcurrentAccess()); assertFalse("Rpc Endpoint had concurrent access", concurrentAccess.get()); - assertTrue("call was not properly delayed", ((stop - start) / 1000000) >= delay); + assertTrue("call was not properly delayed", ((stop - start) / 1_000_000) >= delay); } // ------------------------------------------------------------------------ From f03c4b46aabf4d9790748dd2d611ee4cd6974f03 Mon Sep 17 00:00:00 2001 From: hamstah Date: Sat, 15 Apr 2017 15:34:14 +0100 Subject: [PATCH 008/253] [FLINK-5646] [docs] Document JAR upload with the REST API This closes #3722 --- docs/monitoring/rest_api.md | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/docs/monitoring/rest_api.md b/docs/monitoring/rest_api.md index a53067d26cad2..c5efcc214a083 100644 --- a/docs/monitoring/rest_api.md +++ b/docs/monitoring/rest_api.md @@ -660,6 +660,19 @@ The `savepointPath` points to the external path of the savepoint, which can be u It is possible to upload, run, and list Flink programs via the REST APIs and web frontend. +#### Upload a new JAR file + +Send a `POST` request to `/jars/upload` with your jar file sent as multi-part data under the `jarfile` file. +Also make sure that the multi-part data includes the `Content-Type` of the file itself, some http libraries do not add the header by default. + +The multi-part payload should start like + +``` +------BoundaryXXXX +Content-Disposition: form-data; name="jarfile"; filename="YourFileName.jar" +Content-Type: application/x-java-archive +``` + #### Run a Program (POST) Send a `POST` request to `/jars/:jarid/run`. The `jarid` parameter is the file name of the program JAR in the configured web frontend upload directory (configuration key `jobmanager.web.upload.dir`). 
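A note on the JAR-upload documentation added in the patch above: the sketch below shows one way to perform such an upload using only the JDK's HttpURLConnection, setting the part's own Content-Type explicitly, which is exactly the pitfall the new paragraph warns about. The host, port, and jar path are assumptions (the web frontend commonly listens on 8081 by default); this is illustrative only and is not part of the patch or of rest_api.md.

    import java.io.DataOutputStream;
    import java.io.IOException;
    import java.net.HttpURLConnection;
    import java.net.URL;
    import java.nio.file.Files;
    import java.nio.file.Path;
    import java.nio.file.Paths;

    /** Illustrative client: uploads a job jar to the web frontend's /jars/upload endpoint. */
    public class JarUploadExample {

        public static void main(String[] args) throws IOException {
            URL url = new URL("http://localhost:8081/jars/upload");   // assumed web frontend address
            Path jar = Paths.get("target/YourFileName.jar");          // placeholder jar path
            String boundary = "----JarUploadBoundary1234";

            HttpURLConnection conn = (HttpURLConnection) url.openConnection();
            conn.setDoOutput(true);
            conn.setRequestMethod("POST");
            conn.setRequestProperty("Content-Type", "multipart/form-data; boundary=" + boundary);

            try (DataOutputStream out = new DataOutputStream(conn.getOutputStream())) {
                out.writeBytes("--" + boundary + "\r\n");
                out.writeBytes("Content-Disposition: form-data; name=\"jarfile\"; filename=\""
                        + jar.getFileName() + "\"\r\n");
                // Set the part's own Content-Type; some HTTP libraries omit it by default.
                out.writeBytes("Content-Type: application/x-java-archive\r\n\r\n");
                out.write(Files.readAllBytes(jar));
                out.writeBytes("\r\n--" + boundary + "--\r\n");
            }

            System.out.println("Upload returned HTTP " + conn.getResponseCode());
        }
    }

With curl, the equivalent is a multipart POST that places the file in a form field named jarfile with the content type set explicitly.
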
From f8dc2b3cd5c93f0233e73f19ebf093cd6baa4997 Mon Sep 17 00:00:00 2001 From: tonycox Date: Fri, 27 Jan 2017 12:06:09 +0400 Subject: [PATCH 009/253] [FLINK-5481] [core] Add utility method to easily generate RowTypeInfos This closes #3127 --- .../flink/api/java/typeutils/Types.java | 72 +++++++++++++++++++ .../org/apache/flink/table/api/Types.scala | 50 +++++++++---- .../stringexpr/CalcStringExpressionTest.scala | 4 +- .../table/expressions/ArrayTypeTest.scala | 2 +- .../expressions/ScalarFunctionsTest.scala | 6 +- .../table/expressions/TemporalTypesTest.scala | 46 ++++++------ .../UserDefinedScalarFunctionTest.scala | 8 +-- .../utils/UserDefinedScalarFunctions.scala | 2 +- 8 files changed, 142 insertions(+), 48 deletions(-) create mode 100644 flink-core/src/main/java/org/apache/flink/api/java/typeutils/Types.java diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/Types.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/Types.java new file mode 100644 index 0000000000000..5159cde1ede19 --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/Types.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.java.typeutils; + +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeinfo.SqlTimeTypeInfo; +import org.apache.flink.api.common.typeinfo.TypeInformation; + +import java.math.BigDecimal; +import java.sql.Date; +import java.sql.Time; +import java.sql.Timestamp; + +/** + * This class enumerates all supported types of + * the BasicTypeInfo, SqlTimeTypeInfo and RowTypeInfo for creation simplifying + */ +public class Types { + + public static final BasicTypeInfo STRING = BasicTypeInfo.STRING_TYPE_INFO; + public static final BasicTypeInfo BOOLEAN = BasicTypeInfo.BOOLEAN_TYPE_INFO; + public static final BasicTypeInfo BYTE = BasicTypeInfo.BYTE_TYPE_INFO; + public static final BasicTypeInfo SHORT = BasicTypeInfo.SHORT_TYPE_INFO; + public static final BasicTypeInfo INT = BasicTypeInfo.INT_TYPE_INFO; + public static final BasicTypeInfo LONG = BasicTypeInfo.LONG_TYPE_INFO; + public static final BasicTypeInfo FLOAT = BasicTypeInfo.FLOAT_TYPE_INFO; + public static final BasicTypeInfo DOUBLE = BasicTypeInfo.DOUBLE_TYPE_INFO; + public static final BasicTypeInfo DECIMAL = BasicTypeInfo.BIG_DEC_TYPE_INFO; + + public static final SqlTimeTypeInfo SQL_DATE = SqlTimeTypeInfo.DATE; + public static final SqlTimeTypeInfo