diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/ClusteringCoefficient.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/ClusteringCoefficient.java index e099e2b203d32..f4b1ecfe80969 100644 --- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/ClusteringCoefficient.java +++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/ClusteringCoefficient.java @@ -42,6 +42,8 @@ import java.text.NumberFormat; +import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT; + /** * Driver for the library implementations of Global and Local Clustering Coefficient. * @@ -89,12 +91,15 @@ public static void main(String[] args) throws Exception { env.getConfig().enableObjectReuse(); ParameterTool parameters = ParameterTool.fromArgs(args); + if (! parameters.has("directed")) { printUsage(); return; } boolean directedAlgorithm = parameters.getBoolean("directed"); + int little_parallelism = parameters.getInt("little_parallelism", PARALLELISM_DEFAULT); + // global and local clustering coefficient results GraphAnalytic gcc; DataSet lcc; @@ -120,14 +125,18 @@ public static void main(String[] args) throws Exception { if (directedAlgorithm) { gcc = graph - .run(new org.apache.flink.graph.library.clustering.directed.GlobalClusteringCoefficient()); + .run(new org.apache.flink.graph.library.clustering.directed.GlobalClusteringCoefficient() + .setLittleParallelism(little_parallelism)); lcc = graph - .run(new org.apache.flink.graph.library.clustering.directed.LocalClusteringCoefficient()); + .run(new org.apache.flink.graph.library.clustering.directed.LocalClusteringCoefficient() + .setLittleParallelism(little_parallelism)); } else { gcc = graph - .run(new org.apache.flink.graph.library.clustering.undirected.GlobalClusteringCoefficient()); + .run(new org.apache.flink.graph.library.clustering.undirected.GlobalClusteringCoefficient() + .setLittleParallelism(little_parallelism)); lcc = graph - .run(new org.apache.flink.graph.library.clustering.undirected.LocalClusteringCoefficient()); + .run(new org.apache.flink.graph.library.clustering.undirected.LocalClusteringCoefficient() + .setLittleParallelism(little_parallelism)); } } break; @@ -137,14 +146,18 @@ public static void main(String[] args) throws Exception { if (directedAlgorithm) { gcc = graph - .run(new org.apache.flink.graph.library.clustering.directed.GlobalClusteringCoefficient()); + .run(new org.apache.flink.graph.library.clustering.directed.GlobalClusteringCoefficient() + .setLittleParallelism(little_parallelism)); lcc = graph - .run(new org.apache.flink.graph.library.clustering.directed.LocalClusteringCoefficient()); + .run(new org.apache.flink.graph.library.clustering.directed.LocalClusteringCoefficient() + .setLittleParallelism(little_parallelism)); } else { gcc = graph - .run(new org.apache.flink.graph.library.clustering.undirected.GlobalClusteringCoefficient()); + .run(new org.apache.flink.graph.library.clustering.undirected.GlobalClusteringCoefficient() + .setLittleParallelism(little_parallelism)); lcc = graph - .run(new org.apache.flink.graph.library.clustering.undirected.LocalClusteringCoefficient()); + .run(new org.apache.flink.graph.library.clustering.undirected.LocalClusteringCoefficient() + .setLittleParallelism(little_parallelism)); } } break; @@ -164,51 +177,66 @@ public static void main(String[] args) throws Exception { long edgeCount = vertexCount * edgeFactor; Graph graph = new RMatGraph<>(env, rnd, vertexCount, edgeCount) + .setParallelism(little_parallelism) .generate(); if (directedAlgorithm) { if (scale > 32) { Graph newGraph = graph - .run(new org.apache.flink.graph.asm.simple.directed.Simplify()); + .run(new org.apache.flink.graph.asm.simple.directed.Simplify() + .setParallelism(little_parallelism)); gcc = newGraph - .run(new org.apache.flink.graph.library.clustering.directed.GlobalClusteringCoefficient()); + .run(new org.apache.flink.graph.library.clustering.directed.GlobalClusteringCoefficient() + .setLittleParallelism(little_parallelism)); lcc = newGraph .run(new org.apache.flink.graph.library.clustering.directed.LocalClusteringCoefficient() - .setIncludeZeroDegreeVertices(false)); + .setIncludeZeroDegreeVertices(false) + .setLittleParallelism(little_parallelism)); } else { Graph newGraph = graph - .run(new TranslateGraphIds(new LongValueToIntValue())) - .run(new org.apache.flink.graph.asm.simple.directed.Simplify()); + .run(new TranslateGraphIds(new LongValueToIntValue()) + .setParallelism(little_parallelism)) + .run(new org.apache.flink.graph.asm.simple.directed.Simplify() + .setParallelism(little_parallelism)); gcc = newGraph - .run(new org.apache.flink.graph.library.clustering.directed.GlobalClusteringCoefficient()); + .run(new org.apache.flink.graph.library.clustering.directed.GlobalClusteringCoefficient() + .setLittleParallelism(little_parallelism)); lcc = newGraph .run(new org.apache.flink.graph.library.clustering.directed.LocalClusteringCoefficient() - .setIncludeZeroDegreeVertices(false)); + .setIncludeZeroDegreeVertices(false) + .setLittleParallelism(little_parallelism)); } } else { boolean clipAndFlip = parameters.getBoolean("clip_and_flip", DEFAULT_CLIP_AND_FLIP); if (scale > 32) { Graph newGraph = graph - .run(new org.apache.flink.graph.asm.simple.undirected.Simplify(clipAndFlip)); + .run(new org.apache.flink.graph.asm.simple.undirected.Simplify(clipAndFlip) + .setParallelism(little_parallelism)); gcc = newGraph - .run(new org.apache.flink.graph.library.clustering.undirected.GlobalClusteringCoefficient()); + .run(new org.apache.flink.graph.library.clustering.undirected.GlobalClusteringCoefficient() + .setLittleParallelism(little_parallelism)); lcc = newGraph .run(new org.apache.flink.graph.library.clustering.undirected.LocalClusteringCoefficient() - .setIncludeZeroDegreeVertices(false)); + .setIncludeZeroDegreeVertices(false) + .setLittleParallelism(little_parallelism)); } else { Graph newGraph = graph - .run(new TranslateGraphIds(new LongValueToIntValue())) - .run(new org.apache.flink.graph.asm.simple.undirected.Simplify(clipAndFlip)); + .run(new TranslateGraphIds(new LongValueToIntValue()) + .setParallelism(little_parallelism)) + .run(new org.apache.flink.graph.asm.simple.undirected.Simplify(clipAndFlip) + .setParallelism(little_parallelism)); gcc = newGraph - .run(new org.apache.flink.graph.library.clustering.undirected.GlobalClusteringCoefficient()); + .run(new org.apache.flink.graph.library.clustering.undirected.GlobalClusteringCoefficient() + .setLittleParallelism(little_parallelism)); lcc = newGraph .run(new org.apache.flink.graph.library.clustering.undirected.LocalClusteringCoefficient() - .setIncludeZeroDegreeVertices(false)); + .setIncludeZeroDegreeVertices(false) + .setLittleParallelism(little_parallelism)); } } } break; diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/JaccardIndex.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/JaccardIndex.java index 824aab776e5f2..96f66aba1fea2 100644 --- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/JaccardIndex.java +++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/JaccardIndex.java @@ -43,6 +43,8 @@ import java.text.NumberFormat; +import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT; + /** * Driver for the library implementation of Jaccard Index. * @@ -87,6 +89,8 @@ public static void main(String[] args) throws Exception { ParameterTool parameters = ParameterTool.fromArgs(args); + int little_parallelism = parameters.getInt("little_parallelism", PARALLELISM_DEFAULT); + DataSet ji; switch (parameters.get("input", "")) { @@ -107,13 +111,15 @@ public static void main(String[] args) throws Exception { case "integer": { ji = reader .keyType(LongValue.class) - .run(new org.apache.flink.graph.library.similarity.JaccardIndex()); + .run(new org.apache.flink.graph.library.similarity.JaccardIndex() + .setLittleParallelism(little_parallelism)); } break; case "string": { ji = reader .keyType(StringValue.class) - .run(new org.apache.flink.graph.library.similarity.JaccardIndex()); + .run(new org.apache.flink.graph.library.similarity.JaccardIndex() + .setLittleParallelism(little_parallelism)); } break; default: @@ -131,20 +137,26 @@ public static void main(String[] args) throws Exception { long vertexCount = 1L << scale; long edgeCount = vertexCount * edgeFactor; - boolean clipAndFlip = parameters.getBoolean("clip_and_flip", DEFAULT_CLIP_AND_FLIP); - Graph graph = new RMatGraph<>(env, rnd, vertexCount, edgeCount) + .setParallelism(little_parallelism) .generate(); + boolean clipAndFlip = parameters.getBoolean("clip_and_flip", DEFAULT_CLIP_AND_FLIP); + if (scale > 32) { ji = graph - .run(new Simplify(clipAndFlip)) - .run(new org.apache.flink.graph.library.similarity.JaccardIndex()); + .run(new Simplify(clipAndFlip) + .setParallelism(little_parallelism)) + .run(new org.apache.flink.graph.library.similarity.JaccardIndex() + .setLittleParallelism(little_parallelism)); } else { ji = graph - .run(new TranslateGraphIds(new LongValueToIntValue())) - .run(new Simplify(clipAndFlip)) - .run(new org.apache.flink.graph.library.similarity.JaccardIndex()); + .run(new TranslateGraphIds(new LongValueToIntValue()) + .setParallelism(little_parallelism)) + .run(new Simplify(clipAndFlip) + .setParallelism(little_parallelism)) + .run(new org.apache.flink.graph.library.similarity.JaccardIndex() + .setLittleParallelism(little_parallelism)); } } break; diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeDegreesPair.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeDegreesPair.java index be19613168f79..909f1014db650 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeDegreesPair.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeDegreesPair.java @@ -73,7 +73,8 @@ protected boolean mergeConfiguration(GraphAlgorithmDelegatingDataSet other) { EdgeDegreesPair rhs = (EdgeDegreesPair) other; - parallelism = Math.min(parallelism, rhs.parallelism); + parallelism = (parallelism == PARALLELISM_DEFAULT) ? rhs.parallelism : + ((rhs.parallelism == PARALLELISM_DEFAULT) ? parallelism : Math.min(parallelism, rhs.parallelism)); return true; } diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeSourceDegrees.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeSourceDegrees.java index ee3175e88c563..e83bd38d1fd48 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeSourceDegrees.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeSourceDegrees.java @@ -72,7 +72,8 @@ protected boolean mergeConfiguration(GraphAlgorithmDelegatingDataSet other) { EdgeSourceDegrees rhs = (EdgeSourceDegrees) other; - parallelism = Math.min(parallelism, rhs.parallelism); + parallelism = (parallelism == PARALLELISM_DEFAULT) ? rhs.parallelism : + ((rhs.parallelism == PARALLELISM_DEFAULT) ? parallelism : Math.min(parallelism, rhs.parallelism)); return true; } diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeTargetDegrees.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeTargetDegrees.java index 6ba47f2dd4f14..0e37675c4cdee 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeTargetDegrees.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeTargetDegrees.java @@ -72,7 +72,8 @@ protected boolean mergeConfiguration(GraphAlgorithmDelegatingDataSet other) { EdgeTargetDegrees rhs = (EdgeTargetDegrees) other; - parallelism = Math.min(parallelism, rhs.parallelism); + parallelism = (parallelism == PARALLELISM_DEFAULT) ? rhs.parallelism : + ((rhs.parallelism == PARALLELISM_DEFAULT) ? parallelism : Math.min(parallelism, rhs.parallelism)); return true; } diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexDegrees.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexDegrees.java index 84873bc9c126a..413fdaa680b97 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexDegrees.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexDegrees.java @@ -108,7 +108,8 @@ protected boolean mergeConfiguration(GraphAlgorithmDelegatingDataSet other) { // merge configurations includeZeroDegreeVertices.mergeWith(rhs.includeZeroDegreeVertices); - parallelism = Math.min(parallelism, rhs.parallelism); + parallelism = (parallelism == PARALLELISM_DEFAULT) ? rhs.parallelism : + ((rhs.parallelism == PARALLELISM_DEFAULT) ? parallelism : Math.min(parallelism, rhs.parallelism)); return true; } @@ -141,7 +142,7 @@ public DataSet> runInternal(Graph input) .equalTo(0) .with(new JoinVertexWithVertexDegrees()) .setParallelism(parallelism) - .name("Join zero degree vertices"); + .name("Zero degree vertices"); } return vertexDegrees; diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexInDegree.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexInDegree.java index f7ac18b6054e1..16b52f6bc90e9 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexInDegree.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexInDegree.java @@ -101,7 +101,8 @@ protected boolean mergeConfiguration(GraphAlgorithmDelegatingDataSet other) { // merge configurations includeZeroDegreeVertices.mergeWith(rhs.includeZeroDegreeVertices); - parallelism = Math.min(parallelism, rhs.parallelism); + parallelism = (parallelism == PARALLELISM_DEFAULT) ? rhs.parallelism : + ((rhs.parallelism == PARALLELISM_DEFAULT) ? parallelism : Math.min(parallelism, rhs.parallelism)); return true; } @@ -114,7 +115,7 @@ public DataSet> runInternal(Graph input) .getEdges() .map(new MapEdgeToTargetId()) .setParallelism(parallelism) - .name("Map edge to target ID"); + .name("Edge to target ID"); // t, d(t) DataSet> targetDegree = targetIds @@ -131,7 +132,7 @@ public DataSet> runInternal(Graph input) .equalTo(0) .with(new JoinVertexWithVertexDegree()) .setParallelism(parallelism) - .name("Join zero degree vertices"); + .name("Zero degree vertices"); } return targetDegree; diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexOutDegree.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexOutDegree.java index e235f6aa3654c..ac736f98c2632 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexOutDegree.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexOutDegree.java @@ -101,7 +101,8 @@ protected boolean mergeConfiguration(GraphAlgorithmDelegatingDataSet other) { // merge configurations includeZeroDegreeVertices.mergeWith(rhs.includeZeroDegreeVertices); - parallelism = Math.min(parallelism, rhs.parallelism); + parallelism = (parallelism == PARALLELISM_DEFAULT) ? rhs.parallelism : + ((rhs.parallelism == PARALLELISM_DEFAULT) ? parallelism : Math.min(parallelism, rhs.parallelism)); return true; } @@ -114,7 +115,7 @@ public DataSet> runInternal(Graph input) .getEdges() .map(new MapEdgeToSourceId()) .setParallelism(parallelism) - .name("Map edge to source ID"); + .name("Edge to source ID"); // s, d(s) DataSet> sourceDegree = sourceIds @@ -131,7 +132,7 @@ public DataSet> runInternal(Graph input) .equalTo(0) .with(new JoinVertexWithVertexDegree()) .setParallelism(parallelism) - .name("Join zero degree vertices"); + .name("Zero degree vertices"); } return sourceDegree; diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeDegreePair.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeDegreePair.java index 1f78566d02822..71429ced71e50 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeDegreePair.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeDegreePair.java @@ -95,7 +95,8 @@ protected boolean mergeConfiguration(GraphAlgorithmDelegatingDataSet other) { EdgeDegreePair rhs = (EdgeDegreePair) other; reduceOnTargetId.mergeWith(rhs.reduceOnTargetId); - parallelism = Math.min(parallelism, rhs.parallelism); + parallelism = (parallelism == PARALLELISM_DEFAULT) ? rhs.parallelism : + ((rhs.parallelism == PARALLELISM_DEFAULT) ? parallelism : Math.min(parallelism, rhs.parallelism)); return true; } diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeSourceDegree.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeSourceDegree.java index 520723c6d29ae..d94357f039631 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeSourceDegree.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeSourceDegree.java @@ -93,7 +93,8 @@ protected boolean mergeConfiguration(GraphAlgorithmDelegatingDataSet other) { EdgeSourceDegree rhs = (EdgeSourceDegree) other; reduceOnTargetId.mergeWith(rhs.reduceOnTargetId); - parallelism = Math.min(parallelism, rhs.parallelism); + parallelism = (parallelism == PARALLELISM_DEFAULT) ? rhs.parallelism : + ((rhs.parallelism == PARALLELISM_DEFAULT) ? parallelism : Math.min(parallelism, rhs.parallelism)); return true; } diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeTargetDegree.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeTargetDegree.java index 123c1dc002ca8..26a287ab5a801 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeTargetDegree.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeTargetDegree.java @@ -93,7 +93,8 @@ protected boolean mergeConfiguration(GraphAlgorithmDelegatingDataSet other) { EdgeTargetDegree rhs = (EdgeTargetDegree) other; reduceOnSourceId.mergeWith(rhs.reduceOnSourceId); - parallelism = Math.min(parallelism, rhs.parallelism); + parallelism = (parallelism == PARALLELISM_DEFAULT) ? rhs.parallelism : + ((rhs.parallelism == PARALLELISM_DEFAULT) ? parallelism : Math.min(parallelism, rhs.parallelism)); return true; } diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/VertexDegree.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/VertexDegree.java index 42f084d7f872f..1974bba6fc93e 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/VertexDegree.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/VertexDegree.java @@ -122,7 +122,8 @@ protected boolean mergeConfiguration(GraphAlgorithmDelegatingDataSet other) { includeZeroDegreeVertices.mergeWith(rhs.includeZeroDegreeVertices); reduceOnTargetId.mergeWith(rhs.reduceOnTargetId); - parallelism = Math.min(parallelism, rhs.parallelism); + parallelism = (parallelism == PARALLELISM_DEFAULT) ? rhs.parallelism : + ((rhs.parallelism == PARALLELISM_DEFAULT) ? parallelism : Math.min(parallelism, rhs.parallelism)); return true; } @@ -138,7 +139,7 @@ public DataSet> runInternal(Graph input) .getEdges() .map(mapEdgeToId) .setParallelism(parallelism) - .name("Map edge to vertex ID"); + .name("Edge to vertex ID"); // v, deg(v) DataSet> degree = vertexIds @@ -156,7 +157,7 @@ public DataSet> runInternal(Graph input) .equalTo(0) .with(new JoinVertexWithVertexDegree()) .setParallelism(parallelism) - .name("Join zero degree vertices"); + .name("Zero degree vertices"); } return degree; diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/filter/undirected/MaximumDegree.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/filter/undirected/MaximumDegree.java index f9cfae9f93014..23c48580128c2 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/filter/undirected/MaximumDegree.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/filter/undirected/MaximumDegree.java @@ -139,7 +139,8 @@ protected boolean mergeConfiguration(GraphAlgorithmDelegatingGraph other) { reduceOnTargetId.mergeWith(rhs.reduceOnTargetId); broadcastHighDegreeVertices.mergeWith(rhs.broadcastHighDegreeVertices); - parallelism = Math.min(parallelism, rhs.parallelism); + parallelism = (parallelism == PARALLELISM_DEFAULT) ? rhs.parallelism : + ((rhs.parallelism == PARALLELISM_DEFAULT) ? parallelism : Math.min(parallelism, rhs.parallelism)); return true; } diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/simple/directed/Simplify.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/simple/directed/Simplify.java index 983dac9cf2eef..09d0a03e1681a 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/simple/directed/Simplify.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/simple/directed/Simplify.java @@ -71,7 +71,8 @@ protected boolean mergeConfiguration(GraphAlgorithmDelegatingGraph other) { Simplify rhs = (Simplify) other; - parallelism = Math.min(parallelism, rhs.parallelism); + parallelism = (parallelism == PARALLELISM_DEFAULT) ? rhs.parallelism : + ((rhs.parallelism == PARALLELISM_DEFAULT) ? parallelism : Math.min(parallelism, rhs.parallelism)); return true; } diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/simple/undirected/Simplify.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/simple/undirected/Simplify.java index ce78cfa8faf35..2b023cb12ad35 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/simple/undirected/Simplify.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/simple/undirected/Simplify.java @@ -97,7 +97,8 @@ protected boolean mergeConfiguration(GraphAlgorithmDelegatingGraph other) { // merge configurations - parallelism = Math.min(parallelism, rhs.parallelism); + parallelism = (parallelism == PARALLELISM_DEFAULT) ? rhs.parallelism : + ((rhs.parallelism == PARALLELISM_DEFAULT) ? parallelism : Math.min(parallelism, rhs.parallelism)); return true; } diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/TranslateEdgeValues.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/TranslateEdgeValues.java index 6003c9a67603f..941bf4b0379fb 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/TranslateEdgeValues.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/TranslateEdgeValues.java @@ -93,7 +93,8 @@ protected boolean mergeConfiguration(GraphAlgorithmDelegatingGraph other) { // merge configurations - parallelism = Math.min(parallelism, rhs.parallelism); + parallelism = (parallelism == PARALLELISM_DEFAULT) ? rhs.parallelism : + ((rhs.parallelism == PARALLELISM_DEFAULT) ? parallelism : Math.min(parallelism, rhs.parallelism)); return true; } diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/TranslateGraphIds.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/TranslateGraphIds.java index 6ea56eb2c412a..eb6affdbf3467 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/TranslateGraphIds.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/TranslateGraphIds.java @@ -95,7 +95,8 @@ protected boolean mergeConfiguration(GraphAlgorithmDelegatingGraph other) { // merge configurations - parallelism = Math.min(parallelism, rhs.parallelism); + parallelism = (parallelism == PARALLELISM_DEFAULT) ? rhs.parallelism : + ((rhs.parallelism == PARALLELISM_DEFAULT) ? parallelism : Math.min(parallelism, rhs.parallelism)); return true; } diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/TranslateVertexValues.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/TranslateVertexValues.java index 3a493248a4a7e..4420405d76883 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/TranslateVertexValues.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/TranslateVertexValues.java @@ -93,7 +93,8 @@ protected boolean mergeConfiguration(GraphAlgorithmDelegatingGraph other) { // merge configurations - parallelism = Math.min(parallelism, rhs.parallelism); + parallelism = (parallelism == PARALLELISM_DEFAULT) ? rhs.parallelism : + ((rhs.parallelism == PARALLELISM_DEFAULT) ? parallelism : Math.min(parallelism, rhs.parallelism)); return true; } diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/LocalClusteringCoefficient.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/LocalClusteringCoefficient.java index 22c8b41d940da..da16c7ee45df1 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/LocalClusteringCoefficient.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/LocalClusteringCoefficient.java @@ -118,7 +118,8 @@ protected boolean mergeConfiguration(GraphAlgorithmDelegatingDataSet other) { includeZeroDegreeVertices.mergeWith(rhs.includeZeroDegreeVertices); - littleParallelism = Math.min(littleParallelism, rhs.littleParallelism); + littleParallelism = (littleParallelism == PARALLELISM_DEFAULT) ? rhs.littleParallelism : + ((rhs.littleParallelism == PARALLELISM_DEFAULT) ? littleParallelism : Math.min(littleParallelism, rhs.littleParallelism)); return true; } diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/TriangleListing.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/TriangleListing.java index 14c731a2a34e6..8a15ff2c61fa5 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/TriangleListing.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/TriangleListing.java @@ -113,7 +113,8 @@ protected boolean mergeConfiguration(GraphAlgorithmDelegatingDataSet other) { TriangleListing rhs = (TriangleListing) other; sortTriangleVertices.mergeWith(rhs.sortTriangleVertices); - littleParallelism = Math.min(littleParallelism, rhs.littleParallelism); + littleParallelism = (littleParallelism == PARALLELISM_DEFAULT) ? rhs.littleParallelism : + ((rhs.littleParallelism == PARALLELISM_DEFAULT) ? littleParallelism : Math.min(littleParallelism, rhs.littleParallelism)); return true; } @@ -162,7 +163,6 @@ public DataSet> runInternal(Graph input) .groupBy(0) .sortGroup(1, Order.ASCENDING) .reduceGroup(new GenerateTriplets()) - .setParallelism(littleParallelism) .name("Generate triplets"); // u, v, w, bitmask where (u, v), (u, w), and (v, w) are edges in graph @@ -171,7 +171,6 @@ public DataSet> runInternal(Graph input) .where(1, 2) .equalTo(0, 1) .with(new ProjectTriangles()) - .setParallelism(littleParallelism) .name("Triangle listing"); if (sortTriangleVertices.get()) { diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/LocalClusteringCoefficient.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/LocalClusteringCoefficient.java index 4b4bf07866ff7..314b5222ba382 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/LocalClusteringCoefficient.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/LocalClusteringCoefficient.java @@ -118,7 +118,8 @@ protected boolean mergeConfiguration(GraphAlgorithmDelegatingDataSet other) { // merge configurations includeZeroDegreeVertices.mergeWith(rhs.includeZeroDegreeVertices); - littleParallelism = Math.min(littleParallelism, rhs.littleParallelism); + littleParallelism = (littleParallelism == PARALLELISM_DEFAULT) ? rhs.littleParallelism : + ((rhs.littleParallelism == PARALLELISM_DEFAULT) ? littleParallelism : Math.min(littleParallelism, rhs.littleParallelism)); return true; } diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/TriangleListing.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/TriangleListing.java index 89b86fec166f0..bf1111a0389bc 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/TriangleListing.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/TriangleListing.java @@ -114,7 +114,8 @@ protected boolean mergeConfiguration(GraphAlgorithmDelegatingDataSet other) { TriangleListing rhs = (TriangleListing) other; sortTriangleVertices.mergeWith(rhs.sortTriangleVertices); - littleParallelism = Math.min(littleParallelism, rhs.littleParallelism); + littleParallelism = (littleParallelism == PARALLELISM_DEFAULT) ? rhs.littleParallelism : + ((rhs.littleParallelism == PARALLELISM_DEFAULT) ? littleParallelism : Math.min(littleParallelism, rhs.littleParallelism)); return true; } @@ -155,7 +156,6 @@ public DataSet> runInternal(Graph input) .groupBy(0) .sortGroup(1, Order.ASCENDING) .reduceGroup(new GenerateTriplets()) - .setParallelism(littleParallelism) .name("Generate triplets"); // u, v, w where (u, v), (u, w), and (v, w) are edges in graph, v < w @@ -164,7 +164,6 @@ public DataSet> runInternal(Graph input) .where(1, 2) .equalTo(0, 1) .with(new ProjectTriangles()) - .setParallelism(littleParallelism) .name("Triangle listing"); if (sortTriangleVertices.get()) { diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/link_analysis/HITS.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/link_analysis/HITS.java index 60e99bd80d439..d4dfbaf0453d3 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/link_analysis/HITS.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/link_analysis/HITS.java @@ -148,7 +148,8 @@ protected boolean mergeConfiguration(GraphAlgorithmDelegatingDataSet other) { maxIterations = Math.max(maxIterations, rhs.maxIterations); convergenceThreshold = Math.min(convergenceThreshold, rhs.convergenceThreshold); - parallelism = Math.min(parallelism, rhs.parallelism); + parallelism = (parallelism == PARALLELISM_DEFAULT) ? rhs.parallelism : + ((rhs.parallelism == PARALLELISM_DEFAULT) ? parallelism : Math.min(parallelism, rhs.parallelism)); return true; } diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/AdamicAdar.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/AdamicAdar.java index 512a7a08f9459..cc76cd9fa69d8 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/AdamicAdar.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/AdamicAdar.java @@ -151,7 +151,8 @@ protected boolean mergeConfiguration(GraphAlgorithmDelegatingDataSet other) { // merge configurations - littleParallelism = Math.min(littleParallelism, rhs.littleParallelism); + littleParallelism = (littleParallelism == PARALLELISM_DEFAULT) ? rhs.littleParallelism : + ((rhs.littleParallelism == PARALLELISM_DEFAULT) ? littleParallelism : Math.min(littleParallelism, rhs.littleParallelism)); return true; } diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/JaccardIndex.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/JaccardIndex.java index 7783e6b4f8388..b629f11aa7de6 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/JaccardIndex.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/JaccardIndex.java @@ -181,7 +181,8 @@ protected boolean mergeConfiguration(GraphAlgorithmDelegatingDataSet other) { // merge configurations groupSize = Math.max(groupSize, rhs.groupSize); - littleParallelism = Math.min(littleParallelism, rhs.littleParallelism); + littleParallelism = (littleParallelism == PARALLELISM_DEFAULT) ? rhs.littleParallelism : + ((rhs.littleParallelism == PARALLELISM_DEFAULT) ? littleParallelism : Math.min(littleParallelism, rhs.littleParallelism)); return true; }