From 8d58ac97da66e1be143ad732173891add3fffa4e Mon Sep 17 00:00:00 2001 From: FlorianFan Date: Thu, 27 Apr 2017 20:41:53 +0800 Subject: [PATCH 01/13] [FLINK-6393] [gelly] Add Evenly Graph Generator to Flink Gelly --- .../flink/graph/generator/EvenlyGraph.java | 145 ++++++++++++++++++ .../graph/generator/EvenlyGraphTest.java | 82 ++++++++++ 2 files changed, 227 insertions(+) create mode 100644 flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/EvenlyGraph.java create mode 100644 flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/EvenlyGraphTest.java diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/EvenlyGraph.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/EvenlyGraph.java new file mode 100644 index 0000000000000..9eb16ffa94e80 --- /dev/null +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/EvenlyGraph.java @@ -0,0 +1,145 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph.generator; + +import org.apache.flink.api.common.functions.FlatMapFunction; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields; +import org.apache.flink.graph.Edge; +import org.apache.flink.graph.Graph; +import org.apache.flink.graph.Vertex; +import org.apache.flink.types.LongValue; +import org.apache.flink.types.NullValue; +import org.apache.flink.util.Collector; +import org.apache.flink.util.LongValueSequenceIterator; +import org.apache.flink.util.Preconditions; + +/** + * Evenly graph means every {@link Vertex} in the {@link Graph} has the same degree. + * when vertex degree is 0, {@link EmptyGraph} will be generated. + * when vertex degree is vertex count - 1, {@link CompleteGraph} will be generated. + */ +public class EvenlyGraph +extends AbstractGraphGenerator { + + public static final int MINIMUM_VERTEX_COUNT = 1; + + public static final int MINIMUM_VERTEX_DEGREE = 0; + + // Required to create the DataSource + private final ExecutionEnvironment env; + + // Required configuration + private long vertexCount; + + private long vertexDegree; + + /** + * An undirected {@link Graph} whose vertices have the same degree. + * + * @param env the Flink execution environment + * @param vertexCount number of vertices + * @param vertexDegree degree of vertices + */ + public EvenlyGraph(ExecutionEnvironment env, long vertexCount, long vertexDegree) { + Preconditions.checkArgument(vertexCount >= MINIMUM_VERTEX_COUNT, + "Vertex count must be at least " + MINIMUM_VERTEX_COUNT); + Preconditions.checkArgument(vertexDegree >= MINIMUM_VERTEX_DEGREE, + "Vertex degree must be at least " + MINIMUM_VERTEX_DEGREE); + Preconditions.checkArgument(vertexCount % 2 == 0 || vertexDegree % 2 == 0, + "Vertex degree must be even when vertex count is odd number"); + + this.env = env; + this.vertexCount = vertexCount; + this.vertexDegree = vertexDegree; + } + + @Override + public Graph generate() { + // Vertices + DataSet> vertices = GraphGeneratorUtils.vertexSequence(env, parallelism, vertexCount); + + // Edges + LongValueSequenceIterator iterator = new LongValueSequenceIterator(0, this.vertexCount - 1); + + DataSet> edges = env + .fromParallelCollection(iterator, LongValue.class) + .setParallelism(parallelism) + .name("Edge iterators") + .flatMap(new LinkVertexToOpposite(vertexCount, vertexDegree)) + .setParallelism(parallelism) + .name("Evenly graph edges"); + + // Graph + return Graph.fromDataSet(vertices, edges, env); + } + + @ForwardedFields("*->f0") + public class LinkVertexToOpposite + implements FlatMapFunction> { + + private final long vertexCount; + + private final long vertexDegree; + + private LongValue target = new LongValue(); + + private Edge edge = new Edge<>(null, target, NullValue.getInstance()); + + public LinkVertexToOpposite(long vertex_count, long vertexDegree) { + this.vertexCount = vertex_count; + this.vertexDegree = vertexDegree; + } + + @Override + public void flatMap(LongValue source, Collector> out) + throws Exception { + // empty graph + if (vertexDegree == 0) { + return; + } + + edge.f0 = source; + + // get opposite vertex's id + long opposite = (source.getValue() + vertexCount / 2) % vertexCount; + + // link to opposite vertex if possible + if (vertexCount % 2 == 0 && vertexDegree % 2 == 1) { + target.setValue(opposite); + out.collect(edge); + } + + // link to vertices on both sides of opposite + long initialOffset = (vertexCount + 1) % 2; + long oneSideVertexCount = (vertexDegree - vertexCount % 2) / 2; + for (long i = initialOffset; i <= oneSideVertexCount; i++) { + long previous = (opposite - i + vertexCount) % vertexCount; + long next = (opposite + i + vertexCount % 2) % vertexCount; + + target.setValue(previous); + out.collect(edge); + + target.setValue(next); + out.collect(edge); + } + } + } +} diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/EvenlyGraphTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/EvenlyGraphTest.java new file mode 100644 index 0000000000000..95fe177e0115e --- /dev/null +++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/EvenlyGraphTest.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph.generator; + +import static org.junit.Assert.assertEquals; + +import org.apache.flink.api.java.io.DiscardingOutputFormat; +import org.apache.flink.graph.Edge; +import org.apache.flink.graph.Graph; +import org.apache.flink.graph.Vertex; +import org.apache.flink.types.LongValue; +import org.apache.flink.types.NullValue; +import org.junit.Test; + +public class EvenlyGraphTest +extends AbstractGraphTest { + + @Test + public void testGraph() + throws Exception { + Graph graph = new EvenlyGraph(env, 10, 3) + .generate(); + + String vertices = "0; 1; 2; 3; 4; 5; 6; 7; 8; 9"; + String edges = "0,4; 0,5; 0,6; 1,5; 1,6; 1,7; 2,6;" + + "2,7; 2,8; 3,7; 3,8; 3,9; 4,0; 4,8; 4,9;" + + "5,0; 5,1; 5,9; 6,0; 6,1; 6,2; 7,1; 7,2; 7,3;" + + "8,2; 8,3; 8,4; 9,3; 9,4; 9,5"; + + TestUtils.compareGraph(graph, vertices, edges); + } + + @Test + public void testGraphMetrics() + throws Exception { + int vertexCount = 10; + int vertexDegree = 3; + + Graph graph = new EvenlyGraph(env, vertexCount, vertexDegree) + .generate(); + + assertEquals(vertexCount, graph.numberOfVertices()); + assertEquals(vertexCount * vertexDegree, graph.numberOfEdges()); + + long maxInDegree = graph.inDegrees().max(1).collect().get(0).f1.getValue(); + long maxOutDegree = graph.outDegrees().max(1).collect().get(0).f1.getValue(); + + assertEquals(vertexDegree, maxInDegree); + assertEquals(vertexDegree, maxOutDegree); + } + + @Test + public void testParallelism() + throws Exception { + int parallelism = 2; + + Graph graph = new EvenlyGraph(env, 10, 3) + .setParallelism(parallelism) + .generate(); + + graph.getVertices().output(new DiscardingOutputFormat>()); + graph.getEdges().output(new DiscardingOutputFormat>()); + + TestUtils.verifyParallelism(env, parallelism); + } +} From 1c45d8cc362e06e4c15bb1c21d648de2af95a189 Mon Sep 17 00:00:00 2001 From: FlorianFan Date: Sun, 30 Apr 2017 18:05:56 +0800 Subject: [PATCH 02/13] [FLINK-6393] [gelly] Add Circulant Graph Generator to Flink Gelly --- .../flink/graph/generator/CirculantGraph.java | 141 ++++++++++++++++++ .../graph/generator/CirculantGraphTest.java | 83 +++++++++++ 2 files changed, 224 insertions(+) create mode 100644 flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/CirculantGraph.java create mode 100644 flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/CirculantGraphTest.java diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/CirculantGraph.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/CirculantGraph.java new file mode 100644 index 0000000000000..579b4bd2f4746 --- /dev/null +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/CirculantGraph.java @@ -0,0 +1,141 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph.generator; + +import java.util.*; + +import org.apache.flink.api.common.functions.FlatMapFunction; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.functions.FunctionAnnotation; +import org.apache.flink.graph.Edge; +import org.apache.flink.graph.Graph; +import org.apache.flink.graph.Vertex; +import org.apache.flink.types.LongValue; +import org.apache.flink.types.NullValue; +import org.apache.flink.util.Collector; +import org.apache.flink.util.LongValueSequenceIterator; +import org.apache.flink.util.Preconditions; + +/* + * @see Circulant Graph at Wolfram MathWorld + */ +public class CirculantGraph +extends AbstractGraphGenerator { + + public static final int MINIMUM_VERTEX_COUNT = 1; + + public static final int MINIMUM_OFFSET = 1; + + // Required to create the DataSource + private final ExecutionEnvironment env; + + // Required configuration + private long vertexCount; + + private List signedOffsetList = new ArrayList(); + + /** + * The {@link Graph} containing no edges. + * + * @param env the Flink execution environment + * @param vertexCount number of vertices + */ + public CirculantGraph(ExecutionEnvironment env, long vertexCount, List offsetList) { + Preconditions.checkArgument(vertexCount >= MINIMUM_VERTEX_COUNT, + "Vertex count must be at least " + MINIMUM_VERTEX_COUNT); + + if (offsetList != null && !offsetList.isEmpty()) { + Preconditions.checkArgument(new HashSet<>(offsetList).size() == offsetList.size(), + "Offset must not be duplicated"); + + long maxOffset = vertexCount / 2; + for (long offset : offsetList) { + Preconditions.checkArgument(offset >= MINIMUM_OFFSET, + "Offset must be at least " + MINIMUM_OFFSET); + Preconditions.checkArgument(offset <= maxOffset, + "Offset must be at most " + maxOffset); + + // add sign, ignore negative max offset when vertex count is even + signedOffsetList.add(offset); + if (!(vertexCount % 2 == 0 && offset == maxOffset)) { + signedOffsetList.add(-offset); + } + } + } + + this.env = env; + this.vertexCount = vertexCount; + } + + @Override + public Graph generate() { + // Vertices + DataSet> vertices = GraphGeneratorUtils.vertexSequence(env, parallelism, vertexCount); + + // Edges + LongValueSequenceIterator iterator = new LongValueSequenceIterator(0, this.vertexCount - 1); + + DataSet> edges = env + .fromParallelCollection(iterator, LongValue.class) + .setParallelism(parallelism) + .name("Edge iterators") + .flatMap(new LinkVertexToOffset(vertexCount, signedOffsetList)) + .setParallelism(parallelism) + .name("Circulant graph edges"); + + // Graph + return Graph.fromDataSet(vertices, edges, env); + } + + @FunctionAnnotation.ForwardedFields("*->f0") + public class LinkVertexToOffset + implements FlatMapFunction> { + + private final long vertexCount; + + private final List offsets; + + private LongValue target = new LongValue(); + + private Edge edge = new Edge<>(null, target, NullValue.getInstance()); + + public LinkVertexToOffset(long vertexCount, List offsets) { + this.vertexCount = vertexCount; + this.offsets = offsets; + } + + @Override + public void flatMap(LongValue source, Collector> out) + throws Exception { + // empty graph + if (offsets == null || offsets.isEmpty()) { + return; + } + + edge.f0 = source; + + // link to offset vertex + for (long offset : offsets) { + target.setValue((source.getValue() + offset + vertexCount) % vertexCount); + out.collect(edge); + } + } + } +} diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/CirculantGraphTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/CirculantGraphTest.java new file mode 100644 index 0000000000000..6bcd971cccba0 --- /dev/null +++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/CirculantGraphTest.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph.generator; + +import static org.junit.Assert.assertEquals; + +import org.apache.flink.api.java.io.DiscardingOutputFormat; +import org.apache.flink.graph.Edge; +import org.apache.flink.graph.Graph; +import org.apache.flink.graph.Vertex; +import org.apache.flink.types.LongValue; +import org.apache.flink.types.NullValue; +import org.junit.Test; + +import java.util.Arrays; + +public class CirculantGraphTest +extends AbstractGraphTest { + + @Test + public void testGraph() + throws Exception { + Graph graph = new CirculantGraph(env, 10, Arrays.asList(4L, 5L)) + .generate(); + + String vertices = "0; 1; 2; 3; 4; 5; 6; 7; 8; 9"; + String edges = "0,4; 0,5; 0,6; 1,5; 1,6; 1,7; 2,6;" + + "2,7; 2,8; 3,7; 3,8; 3,9; 4,0; 4,8; 4,9;" + + "5,0; 5,1; 5,9; 6,0; 6,1; 6,2; 7,1; 7,2; 7,3;" + + "8,2; 8,3; 8,4; 9,3; 9,4; 9,5"; + + TestUtils.compareGraph(graph, vertices, edges); + } + + @Test + public void testGraphMetrics() + throws Exception { + int vertexCount = 10; + + Graph graph = new CirculantGraph(env, vertexCount, Arrays.asList(4L, 5L)) + .generate(); + + assertEquals(vertexCount, graph.numberOfVertices()); + assertEquals(vertexCount * 3, graph.numberOfEdges()); + + long maxInDegree = graph.inDegrees().max(1).collect().get(0).f1.getValue(); + long maxOutDegree = graph.outDegrees().max(1).collect().get(0).f1.getValue(); + + assertEquals(3, maxInDegree); + assertEquals(3, maxOutDegree); + } + + @Test + public void testParallelism() + throws Exception { + int parallelism = 2; + + Graph graph = new CirculantGraph(env, 10, Arrays.asList(4L, 5L)) + .setParallelism(parallelism) + .generate(); + + graph.getVertices().output(new DiscardingOutputFormat>()); + graph.getEdges().output(new DiscardingOutputFormat>()); + + TestUtils.verifyParallelism(env, parallelism); + } +} From f78fd99a97e5fcf09914fc5238a2d975d47fc5fa Mon Sep 17 00:00:00 2001 From: FlorianFan Date: Sun, 30 Apr 2017 18:06:22 +0800 Subject: [PATCH 03/13] [FLINK-6393] [gelly] Rewrite implementation of CompleteGraph and EvenlyGraph with CirculantGraph Generator --- .../flink/graph/generator/CompleteGraph.java | 63 ++----------- .../flink/graph/generator/EvenlyGraph.java | 91 ++++--------------- 2 files changed, 30 insertions(+), 124 deletions(-) diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/CompleteGraph.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/CompleteGraph.java index dfa7eb267aef5..3a329b2b7c8f3 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/CompleteGraph.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/CompleteGraph.java @@ -18,19 +18,14 @@ package org.apache.flink.graph.generator; -import org.apache.flink.api.common.functions.FlatMapFunction; -import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; -import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields; -import org.apache.flink.graph.Edge; import org.apache.flink.graph.Graph; -import org.apache.flink.graph.Vertex; import org.apache.flink.types.LongValue; import org.apache.flink.types.NullValue; -import org.apache.flink.util.Collector; -import org.apache.flink.util.LongValueSequenceIterator; import org.apache.flink.util.Preconditions; +import java.util.ArrayList; + /* * @see Complete Graph at Wolfram MathWorld */ @@ -61,54 +56,16 @@ public CompleteGraph(ExecutionEnvironment env, long vertexCount) { @Override public Graph generate() { - // Vertices - DataSet> vertices = GraphGeneratorUtils.vertexSequence(env, parallelism, vertexCount); - - // Edges - LongValueSequenceIterator iterator = new LongValueSequenceIterator(0, this.vertexCount - 1); - - DataSet> edges = env - .fromParallelCollection(iterator, LongValue.class) - .setParallelism(parallelism) - .name("Edge iterators") - .flatMap(new LinkVertexToAll(vertexCount)) - .setParallelism(parallelism) - .name("Complete graph edges"); - - // Graph - return Graph.fromDataSet(vertices, edges, env); - } - - @ForwardedFields("*->f0") - public class LinkVertexToAll - implements FlatMapFunction> { + ArrayList offsetList = new ArrayList(); + long maxOffset = vertexCount / 2; - private final long vertexCount; - - private LongValue target = new LongValue(); - - private Edge edge = new Edge<>(null, target, NullValue.getInstance()); - - public LinkVertexToAll(long vertex_count) { - this.vertexCount = vertex_count; + // add all the offset + for (long i = 0; i < maxOffset; i++) { + offsetList.add(maxOffset - i); } - @Override - public void flatMap(LongValue source, Collector> out) - throws Exception { - edge.f0 = source; - - long s = source.getValue(); - long t = (s + 1) % vertexCount; - - while (s != t) { - target.setValue(t); - out.collect(edge); - - if (++t == vertexCount) { - t = 0; - } - } - } + return new CirculantGraph(env, vertexCount, offsetList) + .setParallelism(parallelism) + .generate(); } } diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/EvenlyGraph.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/EvenlyGraph.java index 9eb16ffa94e80..ebfeb34a0fae4 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/EvenlyGraph.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/EvenlyGraph.java @@ -18,21 +18,20 @@ package org.apache.flink.graph.generator; -import org.apache.flink.api.common.functions.FlatMapFunction; -import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; -import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields; -import org.apache.flink.graph.Edge; import org.apache.flink.graph.Graph; import org.apache.flink.graph.Vertex; import org.apache.flink.types.LongValue; import org.apache.flink.types.NullValue; -import org.apache.flink.util.Collector; -import org.apache.flink.util.LongValueSequenceIterator; import org.apache.flink.util.Preconditions; +import java.util.ArrayList; + /** - * Evenly graph means every {@link Vertex} in the {@link Graph} has the same degree. + * Every {@link Vertex} in the {@link EvenlyGraph} has the same degree. + * there may exist multiple cases satisfy the condition above, so further vertices + * are chose to be linked. + * {@link EvenlyGraph} is a specific case of {@link CirculantGraph}. * when vertex degree is 0, {@link EmptyGraph} will be generated. * when vertex degree is vertex count - 1, {@link CompleteGraph} will be generated. */ @@ -63,6 +62,8 @@ public EvenlyGraph(ExecutionEnvironment env, long vertexCount, long vertexDegre "Vertex count must be at least " + MINIMUM_VERTEX_COUNT); Preconditions.checkArgument(vertexDegree >= MINIMUM_VERTEX_DEGREE, "Vertex degree must be at least " + MINIMUM_VERTEX_DEGREE); + Preconditions.checkArgument(vertexDegree <= vertexCount - 1, + "Vertex degree must be at most " + (vertexCount - 1)); Preconditions.checkArgument(vertexCount % 2 == 0 || vertexDegree % 2 == 0, "Vertex degree must be even when vertex count is odd number"); @@ -73,73 +74,21 @@ public EvenlyGraph(ExecutionEnvironment env, long vertexCount, long vertexDegre @Override public Graph generate() { - // Vertices - DataSet> vertices = GraphGeneratorUtils.vertexSequence(env, parallelism, vertexCount); - - // Edges - LongValueSequenceIterator iterator = new LongValueSequenceIterator(0, this.vertexCount - 1); - - DataSet> edges = env - .fromParallelCollection(iterator, LongValue.class) - .setParallelism(parallelism) - .name("Edge iterators") - .flatMap(new LinkVertexToOpposite(vertexCount, vertexDegree)) - .setParallelism(parallelism) - .name("Evenly graph edges"); - - // Graph - return Graph.fromDataSet(vertices, edges, env); - } - - @ForwardedFields("*->f0") - public class LinkVertexToOpposite - implements FlatMapFunction> { - - private final long vertexCount; - - private final long vertexDegree; + ArrayList offsetList = new ArrayList(); + long maxOffset = vertexCount / 2; - private LongValue target = new LongValue(); - - private Edge edge = new Edge<>(null, target, NullValue.getInstance()); - - public LinkVertexToOpposite(long vertex_count, long vertexDegree) { - this.vertexCount = vertex_count; - this.vertexDegree = vertexDegree; + // add max offset when vertex degree is even and vertex count is odd + if (vertexDegree % 2 == 1 && vertexCount % 2 == 0) { + offsetList.add(maxOffset); } - @Override - public void flatMap(LongValue source, Collector> out) - throws Exception { - // empty graph - if (vertexDegree == 0) { - return; - } - - edge.f0 = source; - - // get opposite vertex's id - long opposite = (source.getValue() + vertexCount / 2) % vertexCount; - - // link to opposite vertex if possible - if (vertexCount % 2 == 0 && vertexDegree % 2 == 1) { - target.setValue(opposite); - out.collect(edge); - } - - // link to vertices on both sides of opposite - long initialOffset = (vertexCount + 1) % 2; - long oneSideVertexCount = (vertexDegree - vertexCount % 2) / 2; - for (long i = initialOffset; i <= oneSideVertexCount; i++) { - long previous = (opposite - i + vertexCount) % vertexCount; - long next = (opposite + i + vertexCount % 2) % vertexCount; - - target.setValue(previous); - out.collect(edge); - - target.setValue(next); - out.collect(edge); - } + // add other offset nearby max offset + for (long i = 0; i < vertexDegree / 2; i++) { + offsetList.add(maxOffset - i - (vertexCount + 1) % 2); } + + return new CirculantGraph(env, vertexCount, offsetList) + .setParallelism(parallelism) + .generate(); } } From 53a11768a3e92ccba9fca5407aa3d16f4da40707 Mon Sep 17 00:00:00 2001 From: FlorianFan Date: Sun, 30 Apr 2017 21:32:40 +0800 Subject: [PATCH 04/13] [FLINK-6393] [gelly] Add addOffset method to CirculantGraph and fix errors --- .../flink/graph/generator/CirculantGraph.java | 121 +++++++++--------- .../flink/graph/generator/CompleteGraph.java | 8 +- .../flink/graph/generator/EvenlyGraph.java | 10 +- .../graph/generator/CirculantGraphTest.java | 20 +-- 4 files changed, 80 insertions(+), 79 deletions(-) diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/CirculantGraph.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/CirculantGraph.java index 579b4bd2f4746..39766e1cf0363 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/CirculantGraph.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/CirculantGraph.java @@ -18,8 +18,6 @@ package org.apache.flink.graph.generator; -import java.util.*; - import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; @@ -33,6 +31,9 @@ import org.apache.flink.util.LongValueSequenceIterator; import org.apache.flink.util.Preconditions; +import java.util.HashSet; +import java.util.Set; + /* * @see Circulant Graph at Wolfram MathWorld */ @@ -49,93 +50,93 @@ public class CirculantGraph // Required configuration private long vertexCount; - private List signedOffsetList = new ArrayList(); + private Set signedOffsets = new HashSet<>(); /** - * The {@link Graph} containing no edges. + * An undirected {@link Graph} whose {@link Vertex} connects to targets appointed by an offset list. * * @param env the Flink execution environment * @param vertexCount number of vertices */ - public CirculantGraph(ExecutionEnvironment env, long vertexCount, List offsetList) { + public CirculantGraph(ExecutionEnvironment env, long vertexCount) { Preconditions.checkArgument(vertexCount >= MINIMUM_VERTEX_COUNT, "Vertex count must be at least " + MINIMUM_VERTEX_COUNT); - if (offsetList != null && !offsetList.isEmpty()) { - Preconditions.checkArgument(new HashSet<>(offsetList).size() == offsetList.size(), - "Offset must not be duplicated"); - - long maxOffset = vertexCount / 2; - for (long offset : offsetList) { - Preconditions.checkArgument(offset >= MINIMUM_OFFSET, - "Offset must be at least " + MINIMUM_OFFSET); - Preconditions.checkArgument(offset <= maxOffset, - "Offset must be at most " + maxOffset); - - // add sign, ignore negative max offset when vertex count is even - signedOffsetList.add(offset); - if (!(vertexCount % 2 == 0 && offset == maxOffset)) { - signedOffsetList.add(-offset); - } - } - } - this.env = env; this.vertexCount = vertexCount; } + /** + * Required configuration for each offset of the graph. + * + * @param offset number of vertices; dimensions of size 1 are prohibited due to having no effect + * on the generated graph + * @return this + */ + public CirculantGraph addOffset(long offset) { + long maxOffset = vertexCount / 2; + Preconditions.checkArgument(offset >= MINIMUM_OFFSET, + "Offset must be at least " + MINIMUM_OFFSET); + Preconditions.checkArgument(offset <= maxOffset, + "Offset must be at most " + maxOffset); + + // add sign, ignore negative max offset when vertex count is even + signedOffsets.add(offset); + if (!(vertexCount % 2 == 0 && offset == maxOffset)) { + signedOffsets.add(-offset); + } + + return this; + } + @Override public Graph generate() { // Vertices DataSet> vertices = GraphGeneratorUtils.vertexSequence(env, parallelism, vertexCount); - // Edges - LongValueSequenceIterator iterator = new LongValueSequenceIterator(0, this.vertexCount - 1); + // Edges + LongValueSequenceIterator iterator = new LongValueSequenceIterator(0, this.vertexCount - 1); - DataSet> edges = env - .fromParallelCollection(iterator, LongValue.class) - .setParallelism(parallelism) - .name("Edge iterators") - .flatMap(new LinkVertexToOffset(vertexCount, signedOffsetList)) - .setParallelism(parallelism) - .name("Circulant graph edges"); + DataSet> edges = env + .fromParallelCollection(iterator, LongValue.class) + .setParallelism(parallelism) + .name("Edge iterators") + .flatMap(new LinkVertexToOffsets(vertexCount, signedOffsets)) + .setParallelism(parallelism) + .name("Circulant graph edges"); // Graph return Graph.fromDataSet(vertices, edges, env); } - @FunctionAnnotation.ForwardedFields("*->f0") - public class LinkVertexToOffset - implements FlatMapFunction> { + @FunctionAnnotation.ForwardedFields("*->f0") + private static class LinkVertexToOffsets + implements FlatMapFunction> { - private final long vertexCount; + private final long vertexCount; - private final List offsets; + private final Set offsets; - private LongValue target = new LongValue(); + private LongValue target = new LongValue(); - private Edge edge = new Edge<>(null, target, NullValue.getInstance()); + private Edge edge = new Edge<>(null, target, NullValue.getInstance()); - public LinkVertexToOffset(long vertexCount, List offsets) { - this.vertexCount = vertexCount; - this.offsets = offsets; - } + public LinkVertexToOffsets(long vertexCount, Set offsets) { + this.vertexCount = vertexCount; + this.offsets = offsets; + } - @Override - public void flatMap(LongValue source, Collector> out) - throws Exception { - // empty graph - if (offsets == null || offsets.isEmpty()) { - return; - } + @Override + public void flatMap(LongValue source, Collector> out) + throws Exception { + edge.f0 = source; - edge.f0 = source; - - // link to offset vertex - for (long offset : offsets) { - target.setValue((source.getValue() + offset + vertexCount) % vertexCount); - out.collect(edge); - } - } - } + // link to offset vertex + long index = source.getValue(); + for (long offset : offsets) { + target.setValue((index + offset + vertexCount) % vertexCount); + out.collect(edge); + } + } + } } diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/CompleteGraph.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/CompleteGraph.java index 3a329b2b7c8f3..f9057bee3ab2e 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/CompleteGraph.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/CompleteGraph.java @@ -24,8 +24,6 @@ import org.apache.flink.types.NullValue; import org.apache.flink.util.Preconditions; -import java.util.ArrayList; - /* * @see Complete Graph at Wolfram MathWorld */ @@ -56,15 +54,15 @@ public CompleteGraph(ExecutionEnvironment env, long vertexCount) { @Override public Graph generate() { - ArrayList offsetList = new ArrayList(); + CirculantGraph circulantGraph = new CirculantGraph(env, vertexCount); long maxOffset = vertexCount / 2; // add all the offset for (long i = 0; i < maxOffset; i++) { - offsetList.add(maxOffset - i); + circulantGraph.addOffset(maxOffset - i); } - return new CirculantGraph(env, vertexCount, offsetList) + return circulantGraph .setParallelism(parallelism) .generate(); } diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/EvenlyGraph.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/EvenlyGraph.java index ebfeb34a0fae4..cb6fbce6cce10 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/EvenlyGraph.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/EvenlyGraph.java @@ -25,8 +25,6 @@ import org.apache.flink.types.NullValue; import org.apache.flink.util.Preconditions; -import java.util.ArrayList; - /** * Every {@link Vertex} in the {@link EvenlyGraph} has the same degree. * there may exist multiple cases satisfy the condition above, so further vertices @@ -74,20 +72,20 @@ public EvenlyGraph(ExecutionEnvironment env, long vertexCount, long vertexDegre @Override public Graph generate() { - ArrayList offsetList = new ArrayList(); + CirculantGraph circulantGraph = new CirculantGraph(env, vertexCount); long maxOffset = vertexCount / 2; // add max offset when vertex degree is even and vertex count is odd if (vertexDegree % 2 == 1 && vertexCount % 2 == 0) { - offsetList.add(maxOffset); + circulantGraph.addOffset(maxOffset); } // add other offset nearby max offset for (long i = 0; i < vertexDegree / 2; i++) { - offsetList.add(maxOffset - i - (vertexCount + 1) % 2); + circulantGraph.addOffset(maxOffset - i - (vertexCount + 1) % 2); } - return new CirculantGraph(env, vertexCount, offsetList) + return circulantGraph .setParallelism(parallelism) .generate(); } diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/CirculantGraphTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/CirculantGraphTest.java index 6bcd971cccba0..18fbce34e90bf 100644 --- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/CirculantGraphTest.java +++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/CirculantGraphTest.java @@ -28,15 +28,15 @@ import org.apache.flink.types.NullValue; import org.junit.Test; -import java.util.Arrays; - public class CirculantGraphTest extends AbstractGraphTest { @Test public void testGraph() throws Exception { - Graph graph = new CirculantGraph(env, 10, Arrays.asList(4L, 5L)) + Graph graph = new CirculantGraph(env, 10) + .addOffset(4) + .addOffset(5) .generate(); String vertices = "0; 1; 2; 3; 4; 5; 6; 7; 8; 9"; @@ -53,8 +53,10 @@ public void testGraphMetrics() throws Exception { int vertexCount = 10; - Graph graph = new CirculantGraph(env, vertexCount, Arrays.asList(4L, 5L)) - .generate(); + Graph graph = new CirculantGraph(env, 10) + .addOffset(4) + .addOffset(5) + .generate(); assertEquals(vertexCount, graph.numberOfVertices()); assertEquals(vertexCount * 3, graph.numberOfEdges()); @@ -71,9 +73,11 @@ public void testParallelism() throws Exception { int parallelism = 2; - Graph graph = new CirculantGraph(env, 10, Arrays.asList(4L, 5L)) - .setParallelism(parallelism) - .generate(); + Graph graph = new CirculantGraph(env, 10) + .addOffset(4) + .addOffset(5) + .setParallelism(parallelism) + .generate(); graph.getVertices().output(new DiscardingOutputFormat>()); graph.getEdges().output(new DiscardingOutputFormat>()); From 12f5fa10fd2cb8cbc41a891bb4b7a1b8ffb9b44b Mon Sep 17 00:00:00 2001 From: FlorianFan Date: Sun, 30 Apr 2017 21:36:55 +0800 Subject: [PATCH 05/13] [FLINK-6393] [gelly] Update addOffset description --- .../java/org/apache/flink/graph/generator/CirculantGraph.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/CirculantGraph.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/CirculantGraph.java index 39766e1cf0363..cdc4b51657a04 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/CirculantGraph.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/CirculantGraph.java @@ -69,8 +69,7 @@ public CirculantGraph(ExecutionEnvironment env, long vertexCount) { /** * Required configuration for each offset of the graph. * - * @param offset number of vertices; dimensions of size 1 are prohibited due to having no effect - * on the generated graph + * @param offset appoint the vertices' position should be linked by any vertex * @return this */ public CirculantGraph addOffset(long offset) { From c3101d73c429d5b8f9877ce13c25a05b113eb32a Mon Sep 17 00:00:00 2001 From: FlorianFan Date: Mon, 1 May 2017 11:32:12 +0800 Subject: [PATCH 06/13] [FLINK-6393] [gelly] Add offsets with a range --- .../flink/graph/generator/CirculantGraph.java | 26 +++++++++++-------- .../flink/graph/generator/CompleteGraph.java | 4 +-- .../flink/graph/generator/EvenlyGraph.java | 8 +++--- .../graph/generator/CirculantGraphTest.java | 9 +++---- 4 files changed, 23 insertions(+), 24 deletions(-) diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/CirculantGraph.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/CirculantGraph.java index cdc4b51657a04..9fa81fc1aaac4 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/CirculantGraph.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/CirculantGraph.java @@ -69,20 +69,24 @@ public CirculantGraph(ExecutionEnvironment env, long vertexCount) { /** * Required configuration for each offset of the graph. * - * @param offset appoint the vertices' position should be linked by any vertex + * @param startOffset first offset appointing the vertices' position should be linked by any vertex + * @param length offset in [startOffset, startOffset + length) will be added * @return this */ - public CirculantGraph addOffset(long offset) { + public CirculantGraph addOffsets(long startOffset, long length) { long maxOffset = vertexCount / 2; - Preconditions.checkArgument(offset >= MINIMUM_OFFSET, - "Offset must be at least " + MINIMUM_OFFSET); - Preconditions.checkArgument(offset <= maxOffset, - "Offset must be at most " + maxOffset); - - // add sign, ignore negative max offset when vertex count is even - signedOffsets.add(offset); - if (!(vertexCount % 2 == 0 && offset == maxOffset)) { - signedOffsets.add(-offset); + for (int i = 0; i < length; i++) { + long offset = startOffset + i; + Preconditions.checkArgument(offset >= MINIMUM_OFFSET, + "Offset must be at least " + MINIMUM_OFFSET); + Preconditions.checkArgument(offset <= maxOffset, + "Offset must be at most " + maxOffset); + + // add sign, ignore negative max offset when vertex count is even + signedOffsets.add(offset); + if (!(vertexCount % 2 == 0 && offset == maxOffset)) { + signedOffsets.add(-offset); + } } return this; diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/CompleteGraph.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/CompleteGraph.java index f9057bee3ab2e..5ca42d530f5e2 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/CompleteGraph.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/CompleteGraph.java @@ -58,9 +58,7 @@ public Graph generate() { long maxOffset = vertexCount / 2; // add all the offset - for (long i = 0; i < maxOffset; i++) { - circulantGraph.addOffset(maxOffset - i); - } + circulantGraph.addOffsets(1, maxOffset); return circulantGraph .setParallelism(parallelism) diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/EvenlyGraph.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/EvenlyGraph.java index cb6fbce6cce10..f5bed4daee5a5 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/EvenlyGraph.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/EvenlyGraph.java @@ -77,13 +77,13 @@ public Graph generate() { // add max offset when vertex degree is even and vertex count is odd if (vertexDegree % 2 == 1 && vertexCount % 2 == 0) { - circulantGraph.addOffset(maxOffset); + circulantGraph.addOffsets(maxOffset, 1); } // add other offset nearby max offset - for (long i = 0; i < vertexDegree / 2; i++) { - circulantGraph.addOffset(maxOffset - i - (vertexCount + 1) % 2); - } + long length = vertexDegree / 2; + final long startOffset = maxOffset - length + vertexCount % 2; + circulantGraph.addOffsets(startOffset, length); return circulantGraph .setParallelism(parallelism) diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/CirculantGraphTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/CirculantGraphTest.java index 18fbce34e90bf..5c9049a4df1c7 100644 --- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/CirculantGraphTest.java +++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/CirculantGraphTest.java @@ -35,8 +35,7 @@ public class CirculantGraphTest public void testGraph() throws Exception { Graph graph = new CirculantGraph(env, 10) - .addOffset(4) - .addOffset(5) + .addOffsets(4, 2) .generate(); String vertices = "0; 1; 2; 3; 4; 5; 6; 7; 8; 9"; @@ -54,8 +53,7 @@ public void testGraphMetrics() int vertexCount = 10; Graph graph = new CirculantGraph(env, 10) - .addOffset(4) - .addOffset(5) + .addOffsets(4, 2) .generate(); assertEquals(vertexCount, graph.numberOfVertices()); @@ -74,8 +72,7 @@ public void testParallelism() int parallelism = 2; Graph graph = new CirculantGraph(env, 10) - .addOffset(4) - .addOffset(5) + .addOffsets(4, 2) .setParallelism(parallelism) .generate(); From b8ab15ad95b424d5e72bc5ba86f7e93eb5d625c3 Mon Sep 17 00:00:00 2001 From: FlorianFan Date: Tue, 2 May 2017 01:24:24 +0800 Subject: [PATCH 07/13] FLINK-6393] [gelly] Move offset parser to flatMap --- .../flink/graph/generator/CirculantGraph.java | 42 +++++++++++++------ 1 file changed, 29 insertions(+), 13 deletions(-) diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/CirculantGraph.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/CirculantGraph.java index 9fa81fc1aaac4..3f4af0c387b91 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/CirculantGraph.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/CirculantGraph.java @@ -22,6 +22,7 @@ import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.functions.FunctionAnnotation; +import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.graph.Edge; import org.apache.flink.graph.Graph; import org.apache.flink.graph.Vertex; @@ -31,8 +32,8 @@ import org.apache.flink.util.LongValueSequenceIterator; import org.apache.flink.util.Preconditions; -import java.util.HashSet; -import java.util.Set; +import java.util.ArrayList; +import java.util.List; /* * @see Circulant Graph at Wolfram MathWorld @@ -50,7 +51,7 @@ public class CirculantGraph // Required configuration private long vertexCount; - private Set signedOffsets = new HashSet<>(); + private List> startOffsetPairs = new ArrayList<>(); /** * An undirected {@link Graph} whose {@link Vertex} connects to targets appointed by an offset list. @@ -81,14 +82,11 @@ public CirculantGraph addOffsets(long startOffset, long length) { "Offset must be at least " + MINIMUM_OFFSET); Preconditions.checkArgument(offset <= maxOffset, "Offset must be at most " + maxOffset); - - // add sign, ignore negative max offset when vertex count is even - signedOffsets.add(offset); - if (!(vertexCount % 2 == 0 && offset == maxOffset)) { - signedOffsets.add(-offset); - } } + // save startOffset and length pair + startOffsetPairs.add(new Tuple2<>(startOffset, length)); + return this; } @@ -104,7 +102,7 @@ public Graph generate() { .fromParallelCollection(iterator, LongValue.class) .setParallelism(parallelism) .name("Edge iterators") - .flatMap(new LinkVertexToOffsets(vertexCount, signedOffsets)) + .flatMap(new LinkVertexToOffsets(vertexCount, startOffsetPairs)) .setParallelism(parallelism) .name("Circulant graph edges"); @@ -118,15 +116,15 @@ private static class LinkVertexToOffsets private final long vertexCount; - private final Set offsets; + private final List> startOffsetPairs; private LongValue target = new LongValue(); private Edge edge = new Edge<>(null, target, NullValue.getInstance()); - public LinkVertexToOffsets(long vertexCount, Set offsets) { + public LinkVertexToOffsets(long vertexCount, List> startOffsetPairs) { this.vertexCount = vertexCount; - this.offsets = offsets; + this.startOffsetPairs = startOffsetPairs; } @Override @@ -134,6 +132,24 @@ public void flatMap(LongValue source, Collector> out) throws Exception { edge.f0 = source; + // parse startOffsetPairs to offsets + List offsets = new ArrayList<>(); + long maxOffset = vertexCount / 2; + for (Tuple2 offsetPair : startOffsetPairs) { + Long startOffset = offsetPair.f0; + Long length = offsetPair.f1; + + for (int i = 0; i < length; i++) { + long offset = startOffset + i; + + // add sign, ignore negative max offset when vertex count is even + offsets.add(offset); + if (!(vertexCount % 2 == 0 && offset == maxOffset)) { + offsets.add(-offset); + } + } + } + // link to offset vertex long index = source.getValue(); for (long offset : offsets) { From fdb9732f5d3cbed7a8b5e25f7be6c2842d625886 Mon Sep 17 00:00:00 2001 From: FlorianFan Date: Wed, 3 May 2017 11:40:51 +0800 Subject: [PATCH 08/13] [FLINK-6393] [gelly] Optimize addOffsets in CirculantGraph --- .../flink/graph/generator/CirculantGraph.java | 33 ++++++++----------- 1 file changed, 14 insertions(+), 19 deletions(-) diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/CirculantGraph.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/CirculantGraph.java index 3f4af0c387b91..9047cbb7f0509 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/CirculantGraph.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/CirculantGraph.java @@ -76,13 +76,11 @@ public CirculantGraph(ExecutionEnvironment env, long vertexCount) { */ public CirculantGraph addOffsets(long startOffset, long length) { long maxOffset = vertexCount / 2; - for (int i = 0; i < length; i++) { - long offset = startOffset + i; - Preconditions.checkArgument(offset >= MINIMUM_OFFSET, - "Offset must be at least " + MINIMUM_OFFSET); - Preconditions.checkArgument(offset <= maxOffset, - "Offset must be at most " + maxOffset); - } + + Preconditions.checkArgument(startOffset >= MINIMUM_OFFSET, + "Offset must be at least " + MINIMUM_OFFSET); + Preconditions.checkArgument(startOffset + length - 1 <= maxOffset, + "Offset must be at most " + maxOffset); // save startOffset and length pair startOffsetPairs.add(new Tuple2<>(startOffset, length)); @@ -132,9 +130,9 @@ public void flatMap(LongValue source, Collector> out) throws Exception { edge.f0 = source; - // parse startOffsetPairs to offsets - List offsets = new ArrayList<>(); + long index = source.getValue(); long maxOffset = vertexCount / 2; + for (Tuple2 offsetPair : startOffsetPairs) { Long startOffset = offsetPair.f0; Long length = offsetPair.f1; @@ -142,20 +140,17 @@ public void flatMap(LongValue source, Collector> out) for (int i = 0; i < length; i++) { long offset = startOffset + i; - // add sign, ignore negative max offset when vertex count is even - offsets.add(offset); + // add positive offset + target.setValue((index + offset + vertexCount) % vertexCount); + out.collect(edge); + + // add negative offset, ignore negative max offset when vertex count is even if (!(vertexCount % 2 == 0 && offset == maxOffset)) { - offsets.add(-offset); + target.setValue((index - offset + vertexCount) % vertexCount); + out.collect(edge); } } } - - // link to offset vertex - long index = source.getValue(); - for (long offset : offsets) { - target.setValue((index + offset + vertexCount) % vertexCount); - out.collect(edge); - } } } } From f20b969c62e05cda461a7afb8718461b27451e06 Mon Sep 17 00:00:00 2001 From: FlorianFan Date: Thu, 4 May 2017 11:01:12 +0800 Subject: [PATCH 09/13] [FLINK-6393] [gelly] Rename EvenlyGraph to EchoGraph --- .../generator/{EvenlyGraph.java => EchoGraph.java} | 13 +++++-------- .../{EvenlyGraphTest.java => EchoGraphTest.java} | 8 ++++---- 2 files changed, 9 insertions(+), 12 deletions(-) rename flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/{EvenlyGraph.java => EchoGraph.java} (85%) rename flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/{EvenlyGraphTest.java => EchoGraphTest.java} (89%) diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/EvenlyGraph.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/EchoGraph.java similarity index 85% rename from flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/EvenlyGraph.java rename to flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/EchoGraph.java index f5bed4daee5a5..45f59e3ffb633 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/EvenlyGraph.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/EchoGraph.java @@ -26,14 +26,11 @@ import org.apache.flink.util.Preconditions; /** - * Every {@link Vertex} in the {@link EvenlyGraph} has the same degree. - * there may exist multiple cases satisfy the condition above, so further vertices - * are chose to be linked. - * {@link EvenlyGraph} is a specific case of {@link CirculantGraph}. - * when vertex degree is 0, {@link EmptyGraph} will be generated. - * when vertex degree is vertex count - 1, {@link CompleteGraph} will be generated. + * Every {@link Vertex} in the {@link EchoGraph} has the same degree. + * and further vertices are chose to be linked. + * {@link EchoGraph} is a specific case of {@link CirculantGraph}. */ -public class EvenlyGraph +public class EchoGraph extends AbstractGraphGenerator { public static final int MINIMUM_VERTEX_COUNT = 1; @@ -55,7 +52,7 @@ public class EvenlyGraph * @param vertexCount number of vertices * @param vertexDegree degree of vertices */ - public EvenlyGraph(ExecutionEnvironment env, long vertexCount, long vertexDegree) { + public EchoGraph(ExecutionEnvironment env, long vertexCount, long vertexDegree) { Preconditions.checkArgument(vertexCount >= MINIMUM_VERTEX_COUNT, "Vertex count must be at least " + MINIMUM_VERTEX_COUNT); Preconditions.checkArgument(vertexDegree >= MINIMUM_VERTEX_DEGREE, diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/EvenlyGraphTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/EchoGraphTest.java similarity index 89% rename from flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/EvenlyGraphTest.java rename to flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/EchoGraphTest.java index 95fe177e0115e..8dd41fcc9be12 100644 --- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/EvenlyGraphTest.java +++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/EchoGraphTest.java @@ -28,13 +28,13 @@ import org.apache.flink.types.NullValue; import org.junit.Test; -public class EvenlyGraphTest +public class EchoGraphTest extends AbstractGraphTest { @Test public void testGraph() throws Exception { - Graph graph = new EvenlyGraph(env, 10, 3) + Graph graph = new EchoGraph(env, 10, 3) .generate(); String vertices = "0; 1; 2; 3; 4; 5; 6; 7; 8; 9"; @@ -52,7 +52,7 @@ public void testGraphMetrics() int vertexCount = 10; int vertexDegree = 3; - Graph graph = new EvenlyGraph(env, vertexCount, vertexDegree) + Graph graph = new EchoGraph(env, vertexCount, vertexDegree) .generate(); assertEquals(vertexCount, graph.numberOfVertices()); @@ -70,7 +70,7 @@ public void testParallelism() throws Exception { int parallelism = 2; - Graph graph = new EvenlyGraph(env, 10, 3) + Graph graph = new EchoGraph(env, 10, 3) .setParallelism(parallelism) .generate(); From d2094291f38fee050656e63bfbdb915d595f1e64 Mon Sep 17 00:00:00 2001 From: FlorianFan Date: Thu, 4 May 2017 15:44:13 +0800 Subject: [PATCH 10/13] [FLINK-6393] [gelly] Rename addOffsets method --- .../graph/drivers/input/CirculantGraph.java | 159 ++++++++++++++++++ .../flink/graph/drivers/input/EchoGraph.java | 63 +++++++ .../flink/graph/generator/CirculantGraph.java | 38 ++--- .../flink/graph/generator/CompleteGraph.java | 2 +- .../flink/graph/generator/EchoGraph.java | 4 +- .../graph/generator/CirculantGraphTest.java | 6 +- 6 files changed, 247 insertions(+), 25 deletions(-) create mode 100644 flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/CirculantGraph.java create mode 100644 flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/EchoGraph.java diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/CirculantGraph.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/CirculantGraph.java new file mode 100644 index 0000000000000..2c1e3ae4aa74f --- /dev/null +++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/CirculantGraph.java @@ -0,0 +1,159 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph.drivers.input; + +import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT; + +import java.math.BigInteger; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.TreeMap; + +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.utils.ParameterTool; +import org.apache.flink.client.program.ProgramParametrizationException; +import org.apache.flink.graph.Graph; +import org.apache.flink.graph.drivers.parameter.LongParameter; +import org.apache.flink.types.LongValue; +import org.apache.flink.types.NullValue; + +/** + * Generate a {@link org.apache.flink.graph.generator.GridGraph}. + */ +public class CirculantGraph +extends GeneratedGraph { + + private static final String PREFIX = "dim"; + + private List dimensions = new ArrayList<>(); + + private LongParameter littleParallelism = new LongParameter(this, "little_parallelism") + .setDefaultValue(PARALLELISM_DEFAULT); + + @Override + public String getName() { + return CirculantGraph.class.getSimpleName(); + } + + @Override + public String getUsage() { + return "--dim0 size:wrap_endpoints [--dim1 size:wrap_endpoints [--dim2 ...]]" + super.getUsage(); + } + + @Override + public void configure(ParameterTool parameterTool) throws ProgramParametrizationException { + super.configure(parameterTool); + + // add dimensions as ordered by dimension ID (dim0, dim1, dim2, ...) + + Map dimensionMap = new TreeMap<>(); + + // first parse all dimensions into a sorted map + for (String key : parameterTool.toMap().keySet()) { + if (key.startsWith(PREFIX)) { + int dimensionId = Integer.parseInt(key.substring(PREFIX.length())); + dimensionMap.put(dimensionId, parameterTool.get(key)); + } + } + + // then store dimensions in order + for (String field : dimensionMap.values()) { + dimensions.add(new Dimension(field)); + } + } + + @Override + public String getIdentity() { + return getTypeName() + " " + getName() + " (" + dimensions + ")"; + } + + @Override + protected long vertexCount() { + // in Java 8 use Math.multiplyExact(long, long) + BigInteger vertexCount = BigInteger.ONE; + for (Dimension dimension : dimensions) { + vertexCount = vertexCount.multiply(BigInteger.valueOf(dimension.size)); + } + + if (vertexCount.compareTo(BigInteger.valueOf(Long.MAX_VALUE)) > 0) { + throw new ProgramParametrizationException("Number of vertices in grid graph '" + vertexCount + + "' is greater than Long.MAX_VALUE."); + } + + return vertexCount.longValue(); + } + + @Override + public Graph generate(ExecutionEnvironment env) { + org.apache.flink.graph.generator.GridGraph graph = new org.apache.flink.graph.generator.GridGraph(env); + + for (Dimension dimension : dimensions) { + graph.addDimension(dimension.size, dimension.wrapEndpoints); + } + + return graph + .setParallelism(littleParallelism.getValue().intValue()) + .generate(); + } + + /** + * Stores and parses the size and endpoint wrapping configuration for a + * {@link org.apache.flink.graph.generator.GridGraph} dimension. + */ + private static class Dimension { + private long size; + + private boolean wrapEndpoints; + + /** + * Configuration string to be parsed. The size integer and endpoint + * wrapping boolean must be separated by a colon. + * + * @param field configuration string + */ + public Dimension(String field) { + ProgramParametrizationException exception = new ProgramParametrizationException("Grid dimension must use " + + "a colon to separate the integer size and boolean indicating whether the dimension endpoints are " + + "connected: '" + field + "'"); + + if (! field.contains(":")) { + throw exception; + } + + String[] parts = field.split(":"); + + if (parts.length != 2) { + throw exception; + } + + try { + size = Long.parseLong(parts[0]); + wrapEndpoints = Boolean.parseBoolean(parts[1]); + } catch(NumberFormatException ex) { + throw exception; + } + } + + @Override + public String toString() { + return Long.toString(size) + (wrapEndpoints ? "+" : "⊞"); + } + } +} diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/EchoGraph.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/EchoGraph.java new file mode 100644 index 0000000000000..2ed949307e91f --- /dev/null +++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/EchoGraph.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph.drivers.input; + +import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT; +import static org.apache.flink.graph.generator.CompleteGraph.MINIMUM_VERTEX_COUNT; + +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.graph.Graph; +import org.apache.flink.graph.drivers.parameter.LongParameter; +import org.apache.flink.types.LongValue; +import org.apache.flink.types.NullValue; + +/** + * Generate a {@link org.apache.flink.graph.generator.CompleteGraph}. + */ +public class EchoGraph +extends GeneratedGraph { + + private LongParameter vertexCount = new LongParameter(this, "vertex_count") + .setMinimumValue(MINIMUM_VERTEX_COUNT); + + private LongParameter littleParallelism = new LongParameter(this, "little_parallelism") + .setDefaultValue(PARALLELISM_DEFAULT); + + @Override + public String getName() { + return EchoGraph.class.getSimpleName(); + } + + @Override + public String getIdentity() { + return getTypeName() + " " + getName() + " (" + vertexCount.getValue() + ")"; + } + + @Override + protected long vertexCount() { + return vertexCount.getValue(); + } + + @Override + protected Graph generate(ExecutionEnvironment env) throws Exception { + return new org.apache.flink.graph.generator.CompleteGraph(env, vertexCount.getValue()) + .setParallelism(littleParallelism.getValue().intValue()) + .generate(); + } +} diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/CirculantGraph.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/CirculantGraph.java index 9047cbb7f0509..95cf147b14987 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/CirculantGraph.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/CirculantGraph.java @@ -51,7 +51,7 @@ public class CirculantGraph // Required configuration private long vertexCount; - private List> startOffsetPairs = new ArrayList<>(); + private List> offsetRanges = new ArrayList<>(); /** * An undirected {@link Graph} whose {@link Vertex} connects to targets appointed by an offset list. @@ -70,20 +70,20 @@ public CirculantGraph(ExecutionEnvironment env, long vertexCount) { /** * Required configuration for each offset of the graph. * - * @param startOffset first offset appointing the vertices' position should be linked by any vertex - * @param length offset in [startOffset, startOffset + length) will be added + * @param offset first offset appointing the vertices' position should be linked by any vertex + * @param length offset in [offset, offset + length) will be added * @return this */ - public CirculantGraph addOffsets(long startOffset, long length) { + public CirculantGraph addOffsetRange(long offset, long length) { long maxOffset = vertexCount / 2; - Preconditions.checkArgument(startOffset >= MINIMUM_OFFSET, + Preconditions.checkArgument(offset >= MINIMUM_OFFSET, "Offset must be at least " + MINIMUM_OFFSET); - Preconditions.checkArgument(startOffset + length - 1 <= maxOffset, + Preconditions.checkArgument(offset + length - 1 <= maxOffset, "Offset must be at most " + maxOffset); - // save startOffset and length pair - startOffsetPairs.add(new Tuple2<>(startOffset, length)); + // save offset range + offsetRanges.add(new Tuple2<>(offset, length)); return this; } @@ -100,7 +100,7 @@ public Graph generate() { .fromParallelCollection(iterator, LongValue.class) .setParallelism(parallelism) .name("Edge iterators") - .flatMap(new LinkVertexToOffsets(vertexCount, startOffsetPairs)) + .flatMap(new LinkVertexToOffsets(vertexCount, offsetRanges)) .setParallelism(parallelism) .name("Circulant graph edges"); @@ -114,15 +114,15 @@ private static class LinkVertexToOffsets private final long vertexCount; - private final List> startOffsetPairs; + private final List> offsetRanges; private LongValue target = new LongValue(); private Edge edge = new Edge<>(null, target, NullValue.getInstance()); - public LinkVertexToOffsets(long vertexCount, List> startOffsetPairs) { + public LinkVertexToOffsets(long vertexCount, List> offsetRanges) { this.vertexCount = vertexCount; - this.startOffsetPairs = startOffsetPairs; + this.offsetRanges = offsetRanges; } @Override @@ -133,20 +133,20 @@ public void flatMap(LongValue source, Collector> out) long index = source.getValue(); long maxOffset = vertexCount / 2; - for (Tuple2 offsetPair : startOffsetPairs) { - Long startOffset = offsetPair.f0; - Long length = offsetPair.f1; + for (Tuple2 offsetRange : offsetRanges) { + Long offset = offsetRange.f0; + Long length = offsetRange.f1; for (int i = 0; i < length; i++) { - long offset = startOffset + i; + long curOffset = offset + i; // add positive offset - target.setValue((index + offset + vertexCount) % vertexCount); + target.setValue((index + curOffset + vertexCount) % vertexCount); out.collect(edge); // add negative offset, ignore negative max offset when vertex count is even - if (!(vertexCount % 2 == 0 && offset == maxOffset)) { - target.setValue((index - offset + vertexCount) % vertexCount); + if (!(vertexCount % 2 == 0 && curOffset == maxOffset)) { + target.setValue((index - curOffset + vertexCount) % vertexCount); out.collect(edge); } } diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/CompleteGraph.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/CompleteGraph.java index 5ca42d530f5e2..a63d11a59d9dd 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/CompleteGraph.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/CompleteGraph.java @@ -58,7 +58,7 @@ public Graph generate() { long maxOffset = vertexCount / 2; // add all the offset - circulantGraph.addOffsets(1, maxOffset); + circulantGraph.addOffsetRange(1, maxOffset); return circulantGraph .setParallelism(parallelism) diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/EchoGraph.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/EchoGraph.java index 45f59e3ffb633..1baa508a9d356 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/EchoGraph.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/EchoGraph.java @@ -74,13 +74,13 @@ public Graph generate() { // add max offset when vertex degree is even and vertex count is odd if (vertexDegree % 2 == 1 && vertexCount % 2 == 0) { - circulantGraph.addOffsets(maxOffset, 1); + circulantGraph.addOffsetRange(maxOffset, 1); } // add other offset nearby max offset long length = vertexDegree / 2; final long startOffset = maxOffset - length + vertexCount % 2; - circulantGraph.addOffsets(startOffset, length); + circulantGraph.addOffsetRange(startOffset, length); return circulantGraph .setParallelism(parallelism) diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/CirculantGraphTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/CirculantGraphTest.java index 5c9049a4df1c7..728482935a420 100644 --- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/CirculantGraphTest.java +++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/CirculantGraphTest.java @@ -35,7 +35,7 @@ public class CirculantGraphTest public void testGraph() throws Exception { Graph graph = new CirculantGraph(env, 10) - .addOffsets(4, 2) + .addOffsetRange(4, 2) .generate(); String vertices = "0; 1; 2; 3; 4; 5; 6; 7; 8; 9"; @@ -53,7 +53,7 @@ public void testGraphMetrics() int vertexCount = 10; Graph graph = new CirculantGraph(env, 10) - .addOffsets(4, 2) + .addOffsetRange(4, 2) .generate(); assertEquals(vertexCount, graph.numberOfVertices()); @@ -72,7 +72,7 @@ public void testParallelism() int parallelism = 2; Graph graph = new CirculantGraph(env, 10) - .addOffsets(4, 2) + .addOffsetRange(4, 2) .setParallelism(parallelism) .generate(); From 61d51233a3720c7c1ea7d497e228be60994a92b3 Mon Sep 17 00:00:00 2001 From: FlorianFan Date: Thu, 4 May 2017 16:13:51 +0800 Subject: [PATCH 11/13] [FLINK-6393] [gelly] Rename addOffsets method --- .../flink/graph/generator/CirculantGraph.java | 38 +++++++++---------- .../flink/graph/generator/CompleteGraph.java | 2 +- .../flink/graph/generator/EchoGraph.java | 4 +- .../graph/generator/CirculantGraphTest.java | 6 +-- 4 files changed, 25 insertions(+), 25 deletions(-) diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/CirculantGraph.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/CirculantGraph.java index 9047cbb7f0509..95cf147b14987 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/CirculantGraph.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/CirculantGraph.java @@ -51,7 +51,7 @@ public class CirculantGraph // Required configuration private long vertexCount; - private List> startOffsetPairs = new ArrayList<>(); + private List> offsetRanges = new ArrayList<>(); /** * An undirected {@link Graph} whose {@link Vertex} connects to targets appointed by an offset list. @@ -70,20 +70,20 @@ public CirculantGraph(ExecutionEnvironment env, long vertexCount) { /** * Required configuration for each offset of the graph. * - * @param startOffset first offset appointing the vertices' position should be linked by any vertex - * @param length offset in [startOffset, startOffset + length) will be added + * @param offset first offset appointing the vertices' position should be linked by any vertex + * @param length offset in [offset, offset + length) will be added * @return this */ - public CirculantGraph addOffsets(long startOffset, long length) { + public CirculantGraph addOffsetRange(long offset, long length) { long maxOffset = vertexCount / 2; - Preconditions.checkArgument(startOffset >= MINIMUM_OFFSET, + Preconditions.checkArgument(offset >= MINIMUM_OFFSET, "Offset must be at least " + MINIMUM_OFFSET); - Preconditions.checkArgument(startOffset + length - 1 <= maxOffset, + Preconditions.checkArgument(offset + length - 1 <= maxOffset, "Offset must be at most " + maxOffset); - // save startOffset and length pair - startOffsetPairs.add(new Tuple2<>(startOffset, length)); + // save offset range + offsetRanges.add(new Tuple2<>(offset, length)); return this; } @@ -100,7 +100,7 @@ public Graph generate() { .fromParallelCollection(iterator, LongValue.class) .setParallelism(parallelism) .name("Edge iterators") - .flatMap(new LinkVertexToOffsets(vertexCount, startOffsetPairs)) + .flatMap(new LinkVertexToOffsets(vertexCount, offsetRanges)) .setParallelism(parallelism) .name("Circulant graph edges"); @@ -114,15 +114,15 @@ private static class LinkVertexToOffsets private final long vertexCount; - private final List> startOffsetPairs; + private final List> offsetRanges; private LongValue target = new LongValue(); private Edge edge = new Edge<>(null, target, NullValue.getInstance()); - public LinkVertexToOffsets(long vertexCount, List> startOffsetPairs) { + public LinkVertexToOffsets(long vertexCount, List> offsetRanges) { this.vertexCount = vertexCount; - this.startOffsetPairs = startOffsetPairs; + this.offsetRanges = offsetRanges; } @Override @@ -133,20 +133,20 @@ public void flatMap(LongValue source, Collector> out) long index = source.getValue(); long maxOffset = vertexCount / 2; - for (Tuple2 offsetPair : startOffsetPairs) { - Long startOffset = offsetPair.f0; - Long length = offsetPair.f1; + for (Tuple2 offsetRange : offsetRanges) { + Long offset = offsetRange.f0; + Long length = offsetRange.f1; for (int i = 0; i < length; i++) { - long offset = startOffset + i; + long curOffset = offset + i; // add positive offset - target.setValue((index + offset + vertexCount) % vertexCount); + target.setValue((index + curOffset + vertexCount) % vertexCount); out.collect(edge); // add negative offset, ignore negative max offset when vertex count is even - if (!(vertexCount % 2 == 0 && offset == maxOffset)) { - target.setValue((index - offset + vertexCount) % vertexCount); + if (!(vertexCount % 2 == 0 && curOffset == maxOffset)) { + target.setValue((index - curOffset + vertexCount) % vertexCount); out.collect(edge); } } diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/CompleteGraph.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/CompleteGraph.java index 5ca42d530f5e2..a63d11a59d9dd 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/CompleteGraph.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/CompleteGraph.java @@ -58,7 +58,7 @@ public Graph generate() { long maxOffset = vertexCount / 2; // add all the offset - circulantGraph.addOffsets(1, maxOffset); + circulantGraph.addOffsetRange(1, maxOffset); return circulantGraph .setParallelism(parallelism) diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/EchoGraph.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/EchoGraph.java index 45f59e3ffb633..1baa508a9d356 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/EchoGraph.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/EchoGraph.java @@ -74,13 +74,13 @@ public Graph generate() { // add max offset when vertex degree is even and vertex count is odd if (vertexDegree % 2 == 1 && vertexCount % 2 == 0) { - circulantGraph.addOffsets(maxOffset, 1); + circulantGraph.addOffsetRange(maxOffset, 1); } // add other offset nearby max offset long length = vertexDegree / 2; final long startOffset = maxOffset - length + vertexCount % 2; - circulantGraph.addOffsets(startOffset, length); + circulantGraph.addOffsetRange(startOffset, length); return circulantGraph .setParallelism(parallelism) diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/CirculantGraphTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/CirculantGraphTest.java index 5c9049a4df1c7..728482935a420 100644 --- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/CirculantGraphTest.java +++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/CirculantGraphTest.java @@ -35,7 +35,7 @@ public class CirculantGraphTest public void testGraph() throws Exception { Graph graph = new CirculantGraph(env, 10) - .addOffsets(4, 2) + .addOffsetRange(4, 2) .generate(); String vertices = "0; 1; 2; 3; 4; 5; 6; 7; 8; 9"; @@ -53,7 +53,7 @@ public void testGraphMetrics() int vertexCount = 10; Graph graph = new CirculantGraph(env, 10) - .addOffsets(4, 2) + .addOffsetRange(4, 2) .generate(); assertEquals(vertexCount, graph.numberOfVertices()); @@ -72,7 +72,7 @@ public void testParallelism() int parallelism = 2; Graph graph = new CirculantGraph(env, 10) - .addOffsets(4, 2) + .addOffsetRange(4, 2) .setParallelism(parallelism) .generate(); From fde1ae20be2327b43a800843647941e843b730e1 Mon Sep 17 00:00:00 2001 From: FlorianFan Date: Thu, 4 May 2017 16:20:15 +0800 Subject: [PATCH 12/13] [FLINK-6393] [gelly] Add CirculantGraph and EchoGraph to Runner --- .../java/org/apache/flink/graph/Runner.java | 4 + .../graph/drivers/input/CirculantGraph.java | 77 ++++++++----------- .../flink/graph/drivers/input/EchoGraph.java | 10 ++- 3 files changed, 45 insertions(+), 46 deletions(-) diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/Runner.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/Runner.java index 5ffe681cd0fc8..61511f6459c2e 100644 --- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/Runner.java +++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/Runner.java @@ -35,8 +35,10 @@ import org.apache.flink.graph.drivers.JaccardIndex; import org.apache.flink.graph.drivers.PageRank; import org.apache.flink.graph.drivers.TriangleListing; +import org.apache.flink.graph.drivers.input.CirculantGraph; import org.apache.flink.graph.drivers.input.CompleteGraph; import org.apache.flink.graph.drivers.input.CycleGraph; +import org.apache.flink.graph.drivers.input.EchoGraph; import org.apache.flink.graph.drivers.input.EmptyGraph; import org.apache.flink.graph.drivers.input.GridGraph; import org.apache.flink.graph.drivers.input.HypercubeGraph; @@ -76,6 +78,8 @@ public class Runner { private static final String OUTPUT = "output"; private static ParameterizedFactory inputFactory = new ParameterizedFactory() + .addClass(EchoGraph.class) + .addClass(CirculantGraph.class) .addClass(CompleteGraph.class) .addClass(org.apache.flink.graph.drivers.input.CSV.class) .addClass(CycleGraph.class) diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/CirculantGraph.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/CirculantGraph.java index 2c1e3ae4aa74f..85ad76fa0fdf3 100644 --- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/CirculantGraph.java +++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/CirculantGraph.java @@ -19,8 +19,8 @@ package org.apache.flink.graph.drivers.input; import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT; +import static org.apache.flink.graph.generator.CirculantGraph.MINIMUM_VERTEX_COUNT; -import java.math.BigInteger; import java.util.ArrayList; import java.util.List; import java.util.Map; @@ -35,14 +35,17 @@ import org.apache.flink.types.NullValue; /** - * Generate a {@link org.apache.flink.graph.generator.GridGraph}. + * Generate a {@link org.apache.flink.graph.generator.CirculantGraph}. */ public class CirculantGraph extends GeneratedGraph { - private static final String PREFIX = "dim"; + private static final String PREFIX = "off"; - private List dimensions = new ArrayList<>(); + private LongParameter vertexCount = new LongParameter(this, "vertex_count") + .setMinimumValue(MINIMUM_VERTEX_COUNT); + + private List offsetRanges = new ArrayList<>(); private LongParameter littleParallelism = new LongParameter(this, "little_parallelism") .setDefaultValue(PARALLELISM_DEFAULT); @@ -54,58 +57,47 @@ public String getName() { @Override public String getUsage() { - return "--dim0 size:wrap_endpoints [--dim1 size:wrap_endpoints [--dim2 ...]]" + super.getUsage(); + return "--off0 offset:length [--off1 offset:length [--off2 ...]]" + super.getUsage(); } @Override public void configure(ParameterTool parameterTool) throws ProgramParametrizationException { super.configure(parameterTool); - // add dimensions as ordered by dimension ID (dim0, dim1, dim2, ...) + // add offset ranges as ordered by offset ID (off0, off1, off2, ...) - Map dimensionMap = new TreeMap<>(); + Map offsetRangeMap = new TreeMap<>(); - // first parse all dimensions into a sorted map + // first parse all offset ranges into a sorted map for (String key : parameterTool.toMap().keySet()) { if (key.startsWith(PREFIX)) { - int dimensionId = Integer.parseInt(key.substring(PREFIX.length())); - dimensionMap.put(dimensionId, parameterTool.get(key)); + int offsetId = Integer.parseInt(key.substring(PREFIX.length())); + offsetRangeMap.put(offsetId, parameterTool.get(key)); } } - // then store dimensions in order - for (String field : dimensionMap.values()) { - dimensions.add(new Dimension(field)); + // then store offset ranges in order + for (String field : offsetRangeMap.values()) { + offsetRanges.add(new OffsetRange(field)); } } @Override public String getIdentity() { - return getTypeName() + " " + getName() + " (" + dimensions + ")"; + return getTypeName() + " " + getName() + " (" + offsetRanges + ")"; } @Override protected long vertexCount() { - // in Java 8 use Math.multiplyExact(long, long) - BigInteger vertexCount = BigInteger.ONE; - for (Dimension dimension : dimensions) { - vertexCount = vertexCount.multiply(BigInteger.valueOf(dimension.size)); - } - - if (vertexCount.compareTo(BigInteger.valueOf(Long.MAX_VALUE)) > 0) { - throw new ProgramParametrizationException("Number of vertices in grid graph '" + vertexCount + - "' is greater than Long.MAX_VALUE."); - } - - return vertexCount.longValue(); + return vertexCount.getValue(); } @Override public Graph generate(ExecutionEnvironment env) { - org.apache.flink.graph.generator.GridGraph graph = new org.apache.flink.graph.generator.GridGraph(env); + org.apache.flink.graph.generator.CirculantGraph graph = new org.apache.flink.graph.generator.CirculantGraph(env, vertexCount.getValue()); - for (Dimension dimension : dimensions) { - graph.addDimension(dimension.size, dimension.wrapEndpoints); + for (OffsetRange offsetRange : offsetRanges) { + graph.addOffsetRange(offsetRange.offset, offsetRange.length); } return graph @@ -114,24 +106,23 @@ public Graph generate(ExecutionEnvironment env) } /** - * Stores and parses the size and endpoint wrapping configuration for a - * {@link org.apache.flink.graph.generator.GridGraph} dimension. + * Stores and parses the start offset and length configuration for a + * {@link org.apache.flink.graph.generator.CirculantGraph} offset range. */ - private static class Dimension { - private long size; + private static class OffsetRange { + private long offset; - private boolean wrapEndpoints; + private long length; /** - * Configuration string to be parsed. The size integer and endpoint - * wrapping boolean must be separated by a colon. + * Configuration string to be parsed. The offset integer and length integer + * length integer must be separated by a colon. * * @param field configuration string */ - public Dimension(String field) { - ProgramParametrizationException exception = new ProgramParametrizationException("Grid dimension must use " + - "a colon to separate the integer size and boolean indicating whether the dimension endpoints are " + - "connected: '" + field + "'"); + public OffsetRange(String field) { + ProgramParametrizationException exception = new ProgramParametrizationException("Circulant offset range must use " + + "a colon to separate the integer offset and integer length:" + field + "'"); if (! field.contains(":")) { throw exception; @@ -144,8 +135,8 @@ public Dimension(String field) { } try { - size = Long.parseLong(parts[0]); - wrapEndpoints = Boolean.parseBoolean(parts[1]); + offset = Long.parseLong(parts[0]); + length = Long.parseLong(parts[1]); } catch(NumberFormatException ex) { throw exception; } @@ -153,7 +144,7 @@ public Dimension(String field) { @Override public String toString() { - return Long.toString(size) + (wrapEndpoints ? "+" : "⊞"); + return Long.toString(offset) + ":" + Long.toString(length); } } } diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/EchoGraph.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/EchoGraph.java index 2ed949307e91f..ab7efdfba3bac 100644 --- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/EchoGraph.java +++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/EchoGraph.java @@ -19,7 +19,8 @@ package org.apache.flink.graph.drivers.input; import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT; -import static org.apache.flink.graph.generator.CompleteGraph.MINIMUM_VERTEX_COUNT; +import static org.apache.flink.graph.generator.EchoGraph.MINIMUM_VERTEX_COUNT; +import static org.apache.flink.graph.generator.EchoGraph.MINIMUM_VERTEX_DEGREE; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.graph.Graph; @@ -28,7 +29,7 @@ import org.apache.flink.types.NullValue; /** - * Generate a {@link org.apache.flink.graph.generator.CompleteGraph}. + * Generate a {@link org.apache.flink.graph.generator.EchoGraph}. */ public class EchoGraph extends GeneratedGraph { @@ -36,6 +37,9 @@ public class EchoGraph private LongParameter vertexCount = new LongParameter(this, "vertex_count") .setMinimumValue(MINIMUM_VERTEX_COUNT); + private LongParameter vertexDegree = new LongParameter(this, "vertex_degree") + .setMinimumValue(MINIMUM_VERTEX_DEGREE); + private LongParameter littleParallelism = new LongParameter(this, "little_parallelism") .setDefaultValue(PARALLELISM_DEFAULT); @@ -56,7 +60,7 @@ protected long vertexCount() { @Override protected Graph generate(ExecutionEnvironment env) throws Exception { - return new org.apache.flink.graph.generator.CompleteGraph(env, vertexCount.getValue()) + return new org.apache.flink.graph.generator.EchoGraph(env, vertexCount.getValue(), vertexDegree.getValue()) .setParallelism(littleParallelism.getValue().intValue()) .generate(); } From d219446b14be912ac9a33bd09ab824ff7df11464 Mon Sep 17 00:00:00 2001 From: FlorianFan Date: Thu, 4 May 2017 17:09:21 +0800 Subject: [PATCH 13/13] [FLINK-6393] [gelly] Update graph_generators.md document --- docs/dev/libs/gelly/graph_generators.md | 130 ++++++++++++++++++ .../flink/graph/generator/EchoGraph.java | 2 +- 2 files changed, 131 insertions(+), 1 deletion(-) diff --git a/docs/dev/libs/gelly/graph_generators.md b/docs/dev/libs/gelly/graph_generators.md index 5598d83d9f778..c0fc9e3186e80 100644 --- a/docs/dev/libs/gelly/graph_generators.md +++ b/docs/dev/libs/gelly/graph_generators.md @@ -652,3 +652,133 @@ val graph = new StarGraph(env.getJavaEnv, vertexCount).generate() {% top %} + +## Echo Graph + +An undirected graph that every vertex has the same degree, +and vertices as far as possible are chose to be linked. + +
+
+{% highlight java %} +ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + +long vertexCount = 5; +long vertexDegree = 2; + +Graph graph = new EchoGraph(env, vertexCount, vertexDegree) + .generate(); +{% endhighlight %} +
+ +
+{% highlight scala %} +import org.apache.flink.api.scala._ +import org.apache.flink.graph.generator.EchoGraph + +val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment + +val vertexCount = 5 +val vertexDegree = 2 + +val graph = new EchoGraph(env.getJavaEnv, vertexCount, vertexDegree).generate() +{% endhighlight %} +
+
+ + + + + + + + + + + + + 0 + + + 1 + + + 2 + + + 3 + + + 4 + + +## Circulant Graph + +A circulant graph is an undirected graph of n graph vertices +in which the ith graph vertex is adjacent to the (i+j)th and (i-j)th +graph vertices for each j in a list l. The offset j is at least 1 and +at most n/2, offset range is used to add continuous offsets, For example: +`addOffsetRange(2, 3)` would add offset `2`, `3` and `4`. + +
+
+{% highlight java %} +ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + +long vertexCount = 5; + +Graph graph = new CirculantGraph(env, vertexCount) + .addOffsetRange(1, 2) + .generate(); +{% endhighlight %} +
+ +
+{% highlight scala %} +import org.apache.flink.api.scala._ +import org.apache.flink.graph.generator.CirculantGraph + +val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment + +val vertexCount = 5 + +val graph = new CirculantGraph(env.getJavaEnv, vertexCount).addOffsetRange(1, 2).generate() +{% endhighlight %} +
+
+ + + + + + + + + + + + + + + + + + + 0 + + + 1 + + + 2 + + + 3 + + + 4 + diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/EchoGraph.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/EchoGraph.java index 1baa508a9d356..9a0a173cd8d7b 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/EchoGraph.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/EchoGraph.java @@ -27,7 +27,7 @@ /** * Every {@link Vertex} in the {@link EchoGraph} has the same degree. - * and further vertices are chose to be linked. + * and vertices as far as possible are chose to be linked. * {@link EchoGraph} is a specific case of {@link CirculantGraph}. */ public class EchoGraph