From 4e6f552ae55c670c75541c133540193647ad4d4d Mon Sep 17 00:00:00 2001 From: Greg Hogan Date: Tue, 15 Mar 2016 16:57:37 -0400 Subject: [PATCH 1/4] [FLINK-3589] Allow setting Operator parallelism to default value This is a temporary fix in place of [FLINK-3589]. --- .../main/java/org/apache/flink/api/java/operators/Operator.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/Operator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/Operator.java index 971cba8e42c12..3b660320fbc6e 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/operators/Operator.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/Operator.java @@ -91,7 +91,7 @@ public O name(String newName) { * @return The operator with set parallelism. */ public O setParallelism(int parallelism) { - if(parallelism < 1) { + if(parallelism < 1 && parallelism != -1) { throw new IllegalArgumentException("The parallelism of an operator must be at least 1."); } this.parallelism = parallelism; From 8a96fbf9193df6239586691e2196927a8c5f43b5 Mon Sep 17 00:00:00 2001 From: Greg Hogan Date: Tue, 15 Mar 2016 16:59:02 -0400 Subject: [PATCH 2/4] [FLINK-2909] [Gelly] Graph Generators Initial set of scale-free graph generators: - Complete graph - Cycle graph - Empty graph - Grid graph - Hypercube graph - Path graph - RMat graph - Singleton edge graph - Star graph --- .../flink/util/LongValueSequenceIterator.java | 193 +++++++++++ .../util/LongValueSequenceIteratorTest.java | 92 +++++ .../java/org/apache/flink/api/java/Utils.java | 2 +- .../apache/flink/graph/examples/Graph500.java | 97 ++++++ .../generator/AbstractGraphGenerator.java | 33 ++ .../flink/graph/generator/CompleteGraph.java | 112 +++++++ .../flink/graph/generator/CycleGraph.java | 60 ++++ .../flink/graph/generator/EmptyGraph.java | 79 +++++ .../flink/graph/generator/GraphGenerator.java | 51 +++ .../graph/generator/GraphGeneratorUtils.java | 119 +++++++ .../flink/graph/generator/GridGraph.java | 165 +++++++++ .../flink/graph/generator/HypercubeGraph.java | 65 ++++ .../flink/graph/generator/PathGraph.java | 60 ++++ .../flink/graph/generator/RMatGraph.java | 316 ++++++++++++++++++ .../graph/generator/SingletonEdgeGraph.java | 107 ++++++ .../flink/graph/generator/StarGraph.java | 100 ++++++ .../random/AbstractGeneratorFactory.java | 72 ++++ .../graph/generator/random/BlockInfo.java | 82 +++++ .../random/JDKRandomGeneratorFactory.java | 72 ++++ .../random/MersenneTwisterFactory.java | 72 ++++ .../generator/random/RandomGenerable.java | 41 +++ .../random/RandomGenerableFactory.java | 57 ++++ .../graph/generator/AbstractGraphTest.java | 33 ++ .../graph/generator/CompleteGraphTest.java | 84 +++++ .../flink/graph/generator/CycleGraphTest.java | 83 +++++ .../flink/graph/generator/EmptyGraphTest.java | 78 +++++ .../flink/graph/generator/GridGraphTest.java | 93 ++++++ .../graph/generator/HypercubeGraphTest.java | 85 +++++ .../flink/graph/generator/PathGraphTest.java | 83 +++++ .../flink/graph/generator/RMatGraphTest.java | 70 ++++ .../generator/SingletonEdgeGraphTest.java | 84 +++++ .../flink/graph/generator/StarGraphTest.java | 85 +++++ .../flink/graph/generator/TestUtils.java | 103 ++++++ 33 files changed, 2927 insertions(+), 1 deletion(-) create mode 100644 flink-core/src/main/java/org/apache/flink/util/LongValueSequenceIterator.java create mode 100644 flink-core/src/test/java/org/apache/flink/util/LongValueSequenceIteratorTest.java create mode 100644 flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/Graph500.java create mode 100644 flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/AbstractGraphGenerator.java create mode 100644 flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/CompleteGraph.java create mode 100644 flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/CycleGraph.java create mode 100644 flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/EmptyGraph.java create mode 100644 flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/GraphGenerator.java create mode 100644 flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/GraphGeneratorUtils.java create mode 100644 flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/GridGraph.java create mode 100644 flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/HypercubeGraph.java create mode 100644 flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/PathGraph.java create mode 100644 flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/RMatGraph.java create mode 100644 flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/SingletonEdgeGraph.java create mode 100644 flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/StarGraph.java create mode 100644 flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/random/AbstractGeneratorFactory.java create mode 100644 flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/random/BlockInfo.java create mode 100644 flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/random/JDKRandomGeneratorFactory.java create mode 100644 flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/random/MersenneTwisterFactory.java create mode 100644 flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/random/RandomGenerable.java create mode 100644 flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/random/RandomGenerableFactory.java create mode 100644 flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/AbstractGraphTest.java create mode 100644 flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/CompleteGraphTest.java create mode 100644 flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/CycleGraphTest.java create mode 100644 flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/EmptyGraphTest.java create mode 100644 flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/GridGraphTest.java create mode 100644 flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/HypercubeGraphTest.java create mode 100644 flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/PathGraphTest.java create mode 100644 flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/RMatGraphTest.java create mode 100644 flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/SingletonEdgeGraphTest.java create mode 100644 flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/StarGraphTest.java create mode 100644 flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/TestUtils.java diff --git a/flink-core/src/main/java/org/apache/flink/util/LongValueSequenceIterator.java b/flink-core/src/main/java/org/apache/flink/util/LongValueSequenceIterator.java new file mode 100644 index 0000000000000..86a8ce641bfc9 --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/util/LongValueSequenceIterator.java @@ -0,0 +1,193 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.util; + +import org.apache.flink.annotation.Public; +import org.apache.flink.types.LongValue; + +import java.util.NoSuchElementException; + +/** + * The {@code LongValueSequenceIterator} is an iterator that returns a sequence of numbers (as {@code LongValue})s. + * The iterator is splittable (as defined by {@link SplittableIterator}, i.e., it can be divided into multiple + * iterators that each return a subsequence of the number sequence. + */ +@Public +public class LongValueSequenceIterator extends SplittableIterator { + + private static final long serialVersionUID = 1L; + + /** The last number returned by the iterator */ + private final long to; + + /** The next number to be returned */ + private long current; + + /** The next value to be returned */ + private LongValue currentValue = new LongValue(); + + /** + * Creates a new splittable iterator, returning the range [from, to]. + * Both boundaries of the interval are inclusive. + * + * @param from The first number returned by the iterator. + * @param to The last number returned by the iterator. + */ + public LongValueSequenceIterator(long from, long to) { + if (from > to) { + throw new IllegalArgumentException("The 'to' value must not be smaller than the 'from' value."); + } + + this.current = from; + this.to = to; + } + + + /** + * Internal constructor to allow for empty iterators. + * + * @param from The first number returned by the iterator. + * @param to The last number returned by the iterator. + * @param unused A dummy parameter to disambiguate the constructor. + */ + private LongValueSequenceIterator(long from, long to, boolean unused) { + this.current = from; + this.to = to; + } + + public long getCurrent() { + return this.current; + } + + public long getTo() { + return this.to; + } + + @Override + public boolean hasNext() { + return current <= to; + } + + @Override + public LongValue next() { + if (current <= to) { + currentValue.setValue(current++); + return currentValue; + } else { + throw new NoSuchElementException(); + } + } + + @Override + public void remove() { + throw new UnsupportedOperationException(); + } + + @Override + public LongValueSequenceIterator[] split(int numPartitions) { + if (numPartitions < 1) { + throw new IllegalArgumentException("The number of partitions must be at least 1."); + } + + if (numPartitions == 1) { + return new LongValueSequenceIterator[] { new LongValueSequenceIterator(current, to) }; + } + + // here, numPartitions >= 2 !!! + + long elementsPerSplit; + + if (to - current + 1 >= 0) { + elementsPerSplit = (to - current + 1) / numPartitions; + } + else { + // long overflow of the range. + // we compute based on half the distance, to prevent the overflow. + // in most cases it holds that: current < 0 and to > 0, except for: to == 0 and current == Long.MIN_VALUE + // the later needs a special case + final long halfDiff; // must be positive + + if (current == Long.MIN_VALUE) { + // this means to >= 0 + halfDiff = (Long.MAX_VALUE/2+1) + to/2; + } else { + long posFrom = -current; + if (posFrom > to) { + halfDiff = to + ((posFrom - to) / 2); + } else { + halfDiff = posFrom + ((to - posFrom) / 2); + } + } + elementsPerSplit = halfDiff / numPartitions * 2; + } + + if (elementsPerSplit < Long.MAX_VALUE) { + // figure out how many get one in addition + long numWithExtra = -(elementsPerSplit * numPartitions) + to - current + 1; + + // based on rounding errors, we may have lost one) + if (numWithExtra > numPartitions) { + elementsPerSplit++; + numWithExtra -= numPartitions; + + if (numWithExtra > numPartitions) { + throw new RuntimeException("Bug in splitting logic. To much rounding loss."); + } + } + + LongValueSequenceIterator[] iters = new LongValueSequenceIterator[numPartitions]; + long curr = current; + int i = 0; + for (; i < numWithExtra; i++) { + long next = curr + elementsPerSplit + 1; + iters[i] = new LongValueSequenceIterator(curr, next-1); + curr = next; + } + for (; i < numPartitions; i++) { + long next = curr + elementsPerSplit; + iters[i] = new LongValueSequenceIterator(curr, next-1, true); + curr = next; + } + + return iters; + } + else { + // this can only be the case when there are two partitions + if (numPartitions != 2) { + throw new RuntimeException("Bug in splitting logic."); + } + + return new LongValueSequenceIterator[] { + new LongValueSequenceIterator(current, current + elementsPerSplit), + new LongValueSequenceIterator(current + elementsPerSplit, to) + }; + } + } + + + @Override + public int getMaximumNumberOfSplits() { + if (to >= Integer.MAX_VALUE || current <= Integer.MIN_VALUE || to-current+1 >= Integer.MAX_VALUE) { + return Integer.MAX_VALUE; + } + else { + return (int) (to-current+1); + } + } +} diff --git a/flink-core/src/test/java/org/apache/flink/util/LongValueSequenceIteratorTest.java b/flink-core/src/test/java/org/apache/flink/util/LongValueSequenceIteratorTest.java new file mode 100644 index 0000000000000..3407690921798 --- /dev/null +++ b/flink-core/src/test/java/org/apache/flink/util/LongValueSequenceIteratorTest.java @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.util; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import org.junit.Test; + + +public class LongValueSequenceIteratorTest extends TestLogger { + + @Test + public void testSplitRegular() { + testSplitting(new org.apache.flink.util.LongValueSequenceIterator(0, 10), 2); + testSplitting(new org.apache.flink.util.LongValueSequenceIterator(100, 100000), 7); + testSplitting(new org.apache.flink.util.LongValueSequenceIterator(-100, 0), 5); + testSplitting(new org.apache.flink.util.LongValueSequenceIterator(-100, 100), 3); + } + + @Test + public void testSplittingLargeRangesBy2() { + testSplitting(new org.apache.flink.util.LongValueSequenceIterator(0, Long.MAX_VALUE), 2); + testSplitting(new org.apache.flink.util.LongValueSequenceIterator(-1000000000L, Long.MAX_VALUE), 2); + testSplitting(new org.apache.flink.util.LongValueSequenceIterator(Long.MIN_VALUE, Long.MAX_VALUE), 2); + } + + @Test + public void testSplittingTooSmallRanges() { + testSplitting(new org.apache.flink.util.LongValueSequenceIterator(0, 0), 2); + testSplitting(new org.apache.flink.util.LongValueSequenceIterator(-5, -5), 2); + testSplitting(new org.apache.flink.util.LongValueSequenceIterator(-5, -4), 3); + testSplitting(new org.apache.flink.util.LongValueSequenceIterator(10, 15), 10); + } + + private static final void testSplitting(org.apache.flink.util.LongValueSequenceIterator iter, int numSplits) { + org.apache.flink.util.LongValueSequenceIterator[] splits = iter.split(numSplits); + + assertEquals(numSplits, splits.length); + + // test start and end of range + assertEquals(iter.getCurrent(), splits[0].getCurrent()); + assertEquals(iter.getTo(), splits[numSplits-1].getTo()); + + // test continuous range + for (int i = 1; i < splits.length; i++) { + assertEquals(splits[i-1].getTo() + 1, splits[i].getCurrent()); + } + + testMaxSplitDiff(splits); + } + + + private static final void testMaxSplitDiff(org.apache.flink.util.LongValueSequenceIterator[] iters) { + long minSplitSize = Long.MAX_VALUE; + long maxSplitSize = Long.MIN_VALUE; + + for (LongValueSequenceIterator iter : iters) { + long diff; + if (iter.getTo() < iter.getCurrent()) { + diff = 0; + } else { + diff = iter.getTo() - iter.getCurrent(); + } + if (diff < 0) { + diff = Long.MAX_VALUE; + } + + minSplitSize = Math.min(minSplitSize, diff); + maxSplitSize = Math.max(maxSplitSize, diff); + } + + assertTrue(maxSplitSize == minSplitSize || maxSplitSize-1 == minSplitSize); + } + +} diff --git a/flink-java/src/main/java/org/apache/flink/api/java/Utils.java b/flink-java/src/main/java/org/apache/flink/api/java/Utils.java index f284b90107bc4..89b6d17960aef 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/Utils.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/Utils.java @@ -209,7 +209,7 @@ public int hashCode() { @Override public String toString() { - return "ChecksumHashCode " + this.checksum + ", count " + this.count; + return String.format("ChecksumHashCode 0x%016x, count %d", this.checksum, this.count); } } diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/Graph500.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/Graph500.java new file mode 100644 index 0000000000000..9892d168cd57a --- /dev/null +++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/Graph500.java @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph.examples; + +import org.apache.commons.math3.random.JDKRandomGenerator; +import org.apache.flink.api.common.JobExecutionResult; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.io.CsvOutputFormat; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.utils.DataSetUtils; +import org.apache.flink.api.java.utils.ParameterTool; +import org.apache.flink.graph.generator.RMatGraph; +import org.apache.flink.graph.generator.random.JDKRandomGeneratorFactory; +import org.apache.flink.graph.generator.random.RandomGenerableFactory; +import org.apache.flink.types.LongValue; + +import java.text.NumberFormat; + +/** + * Generate an RMat graph for Graph 500. + * + * Note that this does not yet implement permutation of vertex labels or edges. + + * @see Graph 500 + */ +public class Graph500 { + + public static final int DEFAULT_SCALE = 10; + + public static final int DEFAULT_EDGE_FACTOR = 16; + + public static void main(String[] args) throws Exception { + // Set up the execution environment + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + env.getConfig().enableObjectReuse(); + + ParameterTool parameters = ParameterTool.fromArgs(args); + + // Generate RMat graph + int scale = parameters.getInt("scale", DEFAULT_SCALE); + int edgeFactor = parameters.getInt("edge_factor", DEFAULT_EDGE_FACTOR); + + RandomGenerableFactory rnd = new JDKRandomGeneratorFactory(); + + long vertexCount = 1 << scale; + long edgeCount = vertexCount * edgeFactor; + + DataSet> edges = new RMatGraph<>(env, rnd, vertexCount, edgeCount) + .generate() + .getEdges() + .project(0, 1); + + // Print, hash, or write RMat graph to disk + switch (parameters.get("output", "hash")) { + case "print": + edges.print(); + break; + + case "hash": + System.out.println(DataSetUtils.checksumHashCode(edges)); + break; + + case "csv": + String filename = parameters.get("filename"); + + String row_delimiter = parameters.get("row_delimiter", CsvOutputFormat.DEFAULT_LINE_DELIMITER); + String field_delimiter = parameters.get("field_delimiter", CsvOutputFormat.DEFAULT_FIELD_DELIMITER); + + edges.writeAsCsv(filename, row_delimiter, field_delimiter); + + env.execute(); + break; + } + + JobExecutionResult result = env.getLastJobExecutionResult(); + + NumberFormat nf = NumberFormat.getInstance(); + System.out.println("Execution runtime: " + nf.format(result.getNetRuntime()) + " ms"); + } +} diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/AbstractGraphGenerator.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/AbstractGraphGenerator.java new file mode 100644 index 0000000000000..99336f6f5e225 --- /dev/null +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/AbstractGraphGenerator.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph.generator; + +public abstract class AbstractGraphGenerator +implements GraphGenerator { + + // Optional configuration + protected int parallelism = -1; + + @Override + public AbstractGraphGenerator setParallelism(int parallelism) { + this.parallelism = parallelism; + + return this; + } +} diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/CompleteGraph.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/CompleteGraph.java new file mode 100644 index 0000000000000..31999c77d44d4 --- /dev/null +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/CompleteGraph.java @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph.generator; + +import org.apache.flink.api.common.functions.FlatMapFunction; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields; +import org.apache.flink.graph.Edge; +import org.apache.flink.graph.Graph; +import org.apache.flink.graph.Vertex; +import org.apache.flink.types.LongValue; +import org.apache.flink.types.NullValue; +import org.apache.flink.util.Collector; +import org.apache.flink.util.LongValueSequenceIterator; + +/* + * @see Complete Graph at Wolfram MathWorld + */ +public class CompleteGraph +extends AbstractGraphGenerator { + + // Required to create the DataSource + private final ExecutionEnvironment env; + + // Required configuration + private long vertexCount; + + /** + * A {@link Graph} connecting every distinct pair of vertices. + * + * @param env the Flink execution environment + * @param vertexCount number of vertices + */ + public CompleteGraph(ExecutionEnvironment env, long vertexCount) { + if (vertexCount <= 0) { + throw new IllegalArgumentException("Vertex count must be greater than zero"); + } + + this.env = env; + this.vertexCount = vertexCount; + } + + @Override + public Graph generate() { + // Vertices + DataSet> vertices = GraphGeneratorUtils.vertexSequence(env, parallelism, vertexCount); + + // Edges + LongValueSequenceIterator iterator = new LongValueSequenceIterator(0, this.vertexCount - 1); + + DataSet> edges = env + .fromParallelCollection(iterator, LongValue.class) + .setParallelism(parallelism) + .name("Edge iterators") + .flatMap(new LinkVertexToAll(vertexCount)) + .setParallelism(parallelism) + .name("Complete graph edges"); + + // Graph + return Graph.fromDataSet(vertices, edges, env); + } + + @ForwardedFields("*->f0") + public class LinkVertexToAll + implements FlatMapFunction> { + + private final long vertexCount; + + private LongValue target = new LongValue(); + + private Edge edge = new Edge<>(null, target, NullValue.getInstance()); + + public LinkVertexToAll(long vertex_count) { + this.vertexCount = vertex_count; + } + + @Override + public void flatMap(LongValue source, Collector> out) + throws Exception { + edge.f0 = source; + + long s = source.getValue(); + long t = (s + 1) % vertexCount; + + while (s != t) { + target.setValue(t); + out.collect(edge); + + if (++t == vertexCount) { + t = 0; + } + } + } + } +} diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/CycleGraph.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/CycleGraph.java new file mode 100644 index 0000000000000..365c8f45bc0e0 --- /dev/null +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/CycleGraph.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph.generator; + +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.graph.Graph; +import org.apache.flink.types.LongValue; +import org.apache.flink.types.NullValue; + +/* + * @see Cycle Graph at Wolfram MathWorld + */ +public class CycleGraph +extends AbstractGraphGenerator { + + // Required to create the DataSource + private final ExecutionEnvironment env; + + // Required configuration + private int vertexCount; + + /** + * An undirected {@link Graph} where all edges form a single cycle. + * + * @param env the Flink execution environment + * @param vertexCount number of vertices + */ + public CycleGraph(ExecutionEnvironment env, int vertexCount) { + if (vertexCount <= 0) { + throw new IllegalArgumentException("Vertex count must be greater than zero"); + } + + this.env = env; + this.vertexCount = vertexCount; + } + + @Override + public Graph generate() { + return new GridGraph(env) + .addDimension(vertexCount, true) + .setParallelism(parallelism) + .generate(); + } +} diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/EmptyGraph.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/EmptyGraph.java new file mode 100644 index 0000000000000..d444ed717184c --- /dev/null +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/EmptyGraph.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph.generator; + +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.operators.DataSource; +import org.apache.flink.api.java.typeutils.TupleTypeInfo; +import org.apache.flink.api.java.typeutils.ValueTypeInfo; +import org.apache.flink.graph.Edge; +import org.apache.flink.graph.Graph; +import org.apache.flink.graph.Vertex; +import org.apache.flink.types.LongValue; +import org.apache.flink.types.NullValue; + +import java.util.Collections; + +/* + * @see Empty Graph at Wolfram MathWorld + */ +public class EmptyGraph +extends AbstractGraphGenerator { + + // Required to create the DataSource + private final ExecutionEnvironment env; + + // Required configuration + private long vertexCount; + + /** + * The empty {@link Graph} containing no edges. + * + * @param env the Flink execution environment + * @param vertexCount number of vertices + */ + public EmptyGraph(ExecutionEnvironment env, long vertexCount) { + if (vertexCount <= 0) { + throw new IllegalArgumentException("Vertex count must be greater than zero"); + } + + this.env = env; + this.vertexCount = vertexCount; + } + + @Override + public Graph generate() { + // Vertices + DataSet> vertices = GraphGeneratorUtils.vertexSequence(env, parallelism, vertexCount); + + // Edges + TypeInformation> typeInformation = new TupleTypeInfo<>( + ValueTypeInfo.LONG_VALUE_TYPE_INFO, ValueTypeInfo.LONG_VALUE_TYPE_INFO, ValueTypeInfo.NULL_VALUE_TYPE_INFO); + + DataSource> edges = env + .fromCollection(Collections.>emptyList(), typeInformation) + .setParallelism(parallelism) + .name("Empty edge set"); + + // Graph + return Graph.fromDataSet(vertices, edges, env); + } +} diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/GraphGenerator.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/GraphGenerator.java new file mode 100644 index 0000000000000..b9e3be7fdf3e4 --- /dev/null +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/GraphGenerator.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph.generator; + +import org.apache.flink.graph.Graph; + +/** + * Graph generators shall be + * - parallelizable, in order to create large datasets + * - scale-free, generating the same graph regardless of parallelism + * - thrifty, using as few operators as possible + * + * Graph generators should prefer to emit edges sorted by the source label. + * + * @param the key type for edge and vertex identifiers + * @param the value type for vertices + * @param the value type for edges + */ +public interface GraphGenerator { + + /** + * Generates the configured graph. + * + * @return generated graph + */ + Graph generate(); + + /** + * Override the operator parallelism. + * + * @param parallelism operator parallelism + * @return this + */ + GraphGenerator setParallelism(int parallelism); +} diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/GraphGeneratorUtils.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/GraphGeneratorUtils.java new file mode 100644 index 0000000000000..a7b5ce9ecdfbe --- /dev/null +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/GraphGeneratorUtils.java @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph.generator; + +import org.apache.flink.api.common.functions.FlatMapFunction; +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields; +import org.apache.flink.api.java.operators.DataSource; +import org.apache.flink.graph.Edge; +import org.apache.flink.graph.Graph; +import org.apache.flink.graph.Vertex; +import org.apache.flink.types.LongValue; +import org.apache.flink.types.NullValue; +import org.apache.flink.util.Collector; +import org.apache.flink.util.LongValueSequenceIterator; + +public class GraphGeneratorUtils { + + /** + * Generates {@link Vertex Vertices} with sequential, numerical labels. + * + * @param env the Flink execution environment. + * @param parallelism operator parallelism + * @param vertexCount number of sequential vertex labels + * @return {@link DataSet} of sequentially labeled {@link Vertex Vertices} + */ + public static DataSet> vertexSequence(ExecutionEnvironment env, int parallelism, long vertexCount) { + LongValueSequenceIterator iterator = new LongValueSequenceIterator(0, vertexCount-1); + + DataSource vertexLabels = env + .fromParallelCollection(iterator, LongValue.class) + .setParallelism(parallelism) + .name("Vertex iterators"); + + DataSet> vertexSequence = vertexLabels + .map(new CreateVertex()) + .setParallelism(parallelism) + .name("Vertex sequence"); + + return vertexSequence; + } + + @ForwardedFields("*->f0") + private static class CreateVertex + implements MapFunction> { + + private Vertex vertex = new Vertex<>(null, NullValue.getInstance()); + + @Override + public Vertex map(LongValue value) + throws Exception { + vertex.f0 = value; + + return vertex; + } + } + + /**************************************************************************/ + + /** + * Generates {@link Vertex Vertices} present in the given set of {@link Edge}s. + * + * @param edges source {@link DataSet} of {@link Edge}s + * @param parallelism operator parallelism + * @param label type + * @param edge value type + * @return {@link DataSet} of discovered {@link Vertex Vertices} + * + * @see {@link Graph#fromDataSet(DataSet, DataSet, ExecutionEnvironment)} + */ + public static DataSet> vertexSet(DataSet> edges, int parallelism) { + DataSet> vertexSet = edges + .flatMap(new EmitSrcAndTarget()) + .setParallelism(parallelism) + .name("Emit source and target labels"); + + DataSet> distinctVertexSet = vertexSet + .distinct() + .setParallelism(parallelism) + .name("Emit vertex labels"); + + return distinctVertexSet; + } + + /** + * @see {@link Graph.EmitSrcAndTarget} + */ + private static final class EmitSrcAndTarget + implements FlatMapFunction, Vertex> { + + private Vertex output = new Vertex<>(null, new NullValue()); + + @Override + public void flatMap(Edge value, Collector> out) throws Exception { + output.f0 = value.f0; + out.collect(output); + output.f0 = value.f1; + out.collect(output); + } + } +} diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/GridGraph.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/GridGraph.java new file mode 100644 index 0000000000000..e598cf9d53432 --- /dev/null +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/GridGraph.java @@ -0,0 +1,165 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph.generator; + +import org.apache.flink.api.common.functions.FlatMapFunction; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.graph.Edge; +import org.apache.flink.graph.Graph; +import org.apache.flink.graph.Vertex; +import org.apache.flink.types.LongValue; +import org.apache.flink.types.NullValue; +import org.apache.flink.util.Collector; +import org.apache.flink.util.LongValueSequenceIterator; + +import java.util.ArrayList; +import java.util.List; + +/* + * @see Grid Graph at Wolfram MathWorld + */ +public class GridGraph +extends AbstractGraphGenerator { + + // Required to create the DataSource + private final ExecutionEnvironment env; + + // Required configuration + private List> dimensions = new ArrayList<>(); + + private long vertexCount = 1; + + /** + * A {@link Graph} connecting vertices in a regular tiling in one or more dimensions. + * + * @param env the Flink execution environment + */ + public GridGraph(ExecutionEnvironment env) { + this.env = env; + } + + /** + * Required configuration for each dimension of the graph. + * + * @param size number of vertices; dimensions of size 1 are prohibited due to having no effect + * on the generated graph + * @param wrapEndpoints whether to connect first and last vertices; this has no effect on + * dimensions of size 2 + * @return this + */ + public GridGraph addDimension(long size, boolean wrapEndpoints) { + if (size <= 1) { + throw new IllegalArgumentException("Dimension size must be greater than 1"); + } + + vertexCount *= size; + + // prevent duplicate edges + if (size == 2) { + wrapEndpoints = false; + } + + dimensions.add(new Tuple2<>(size, wrapEndpoints)); + + return this; + } + + @Override + public Graph generate() { + if (dimensions.isEmpty()) { + throw new RuntimeException("No dimensions added to GridGraph"); + } + + // Vertices + DataSet> vertices = GraphGeneratorUtils.vertexSequence(env, parallelism, vertexCount); + + // Edges + LongValueSequenceIterator iterator = new LongValueSequenceIterator(0, this.vertexCount - 1); + + DataSet> edges = env + .fromParallelCollection(iterator, LongValue.class) + .setParallelism(parallelism) + .name("Edge iterators") + .flatMap(new LinkVertexToNeighbors(vertexCount, dimensions)) + .setParallelism(parallelism) + .name("Grid graph edges"); + + // Graph + return Graph.fromDataSet(vertices, edges, env); + } + + @ForwardedFields("*->f0") + public class LinkVertexToNeighbors + implements FlatMapFunction> { + + private long vertexCount; + + private List> dimensions; + + private LongValue target = new LongValue(); + + private Edge edge = new Edge<>(null, target, NullValue.getInstance()); + + public LinkVertexToNeighbors(long vertexCount, List> dimensions) { + this.vertexCount = vertexCount; + this.dimensions = dimensions; + } + + @Override + public void flatMap(LongValue source, Collector> out) + throws Exception { + edge.f0 = source; + long val = source.getValue(); + + // the distance between neighbors in a given iteration + long increment = vertexCount; + + // the value in the remaining dimensions + long remainder = val; + + for (Tuple2 dimension : dimensions) { + increment /= dimension.f0; + + // the index within this dimension + long index = remainder / increment; + + if (index > 0) { + target.setValue(val - increment); + out.collect(edge); + } else if (dimension.f1) { + target.setValue(val + increment * (dimension.f0 - 1)); + out.collect(edge); + } + + if (index < dimension.f0 - 1) { + target.setValue(val + increment); + out.collect(edge); + } else if (dimension.f1) { + target.setValue(val - increment * (dimension.f0 - 1)); + out.collect(edge); + } + + remainder %= increment; + } + } + } +} diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/HypercubeGraph.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/HypercubeGraph.java new file mode 100644 index 0000000000000..40968a0876a08 --- /dev/null +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/HypercubeGraph.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph.generator; + +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.graph.Graph; +import org.apache.flink.types.LongValue; +import org.apache.flink.types.NullValue; + +/* + * @see Hypercube Graph at Wolfram MathWorld + */ +public class HypercubeGraph +extends AbstractGraphGenerator { + + // Required to create the DataSource + private final ExecutionEnvironment env; + + // Required configuration + private long dimensions; + + /** + * An undirected {@link Graph} where edges form an n-dimensional hypercube. + * + * @param env the Flink execution environment + * @param dimensions number of dimensions + */ + public HypercubeGraph(ExecutionEnvironment env, long dimensions) { + if (dimensions <= 0) { + throw new IllegalArgumentException("Number of dimensions must be greater than zero"); + } + + this.env = env; + this.dimensions = dimensions; + } + + @Override + public Graph generate() { + GridGraph graph = new GridGraph(env); + + for (int i = 0; i < dimensions; i++) { + graph.addDimension(2, false); + } + + return graph + .setParallelism(parallelism) + .generate(); + } +} diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/PathGraph.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/PathGraph.java new file mode 100644 index 0000000000000..1aec7237999f6 --- /dev/null +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/PathGraph.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph.generator; + +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.graph.Graph; +import org.apache.flink.types.LongValue; +import org.apache.flink.types.NullValue; + +/* + * @see Path Graph at Wolfram MathWorld + */ +public class PathGraph +extends AbstractGraphGenerator { + + // Required to create the DataSource + private final ExecutionEnvironment env; + + // Required configuration + private long vertexCount; + + /** + * An undirected {@link Graph} where all edges form a single path. + * + * @param env the Flink execution environment + * @param vertexCount number of vertices + */ + public PathGraph(ExecutionEnvironment env, long vertexCount) { + if (vertexCount <= 0) { + throw new IllegalArgumentException("Vertex count must be greater than zero"); + } + + this.env = env; + this.vertexCount = vertexCount; + } + + @Override + public Graph generate() { + return new GridGraph(env) + .addDimension(vertexCount, false) + .setParallelism(parallelism) + .generate(); + } +} diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/RMatGraph.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/RMatGraph.java new file mode 100644 index 0000000000000..c68f78a7eaf3f --- /dev/null +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/RMatGraph.java @@ -0,0 +1,316 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph.generator; + +import org.apache.commons.math3.random.RandomGenerator; +import org.apache.flink.api.common.functions.FlatMapFunction; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.graph.Edge; +import org.apache.flink.graph.Graph; +import org.apache.flink.graph.Vertex; +import org.apache.flink.graph.generator.random.BlockInfo; +import org.apache.flink.graph.generator.random.RandomGenerableFactory; +import org.apache.flink.types.LongValue; +import org.apache.flink.types.NullValue; +import org.apache.flink.util.Collector; + +import java.util.List; + +/* + * @see R-MAT: A Recursive Model for Graph Mining + */ +public class RMatGraph +extends AbstractGraphGenerator { + + // Default RMat constants + public static final float DEFAULT_A = 0.57f; + + public static final float DEFAULT_B = 0.19f; + + public static final float DEFAULT_C = 0.19f; + + public static final float DEFAULT_NOISE = 0.10f; + + // Required to create the DataSource + private ExecutionEnvironment env; + + // Required configuration + private final RandomGenerableFactory randomGenerableFactory; + + private final long vertexCount; + + private final long edgeCount; + + // Optional configuration + private float A = DEFAULT_A; + + private float B = DEFAULT_B; + + private float C = DEFAULT_C; + + private boolean noiseEnabled = false; + + private float noise = DEFAULT_NOISE; + + private boolean simpleGraph = false; + + private boolean clipAndFlip = false; + + /** + * Generate a directed or undirected power-law {@link Graph} using the + * Recusrive Matrix (R-Mat) model. + * + * @param env the Flink execution environment + * @param randomGeneratorFactory source of randomness + * @param vertexCount number of vertices + * @param edgeCount number of edges + */ + public RMatGraph(ExecutionEnvironment env, RandomGenerableFactory randomGeneratorFactory, long vertexCount, long edgeCount) { + if (vertexCount <= 0) { + throw new IllegalArgumentException("Vertex count must be greater than zero"); + } + + if (edgeCount <= 0) { + throw new IllegalArgumentException("Edge count must be greater than zero"); + } + + this.env = env; + this.randomGenerableFactory = randomGeneratorFactory; + this.vertexCount = vertexCount; + this.edgeCount = edgeCount; + } + + /** + * The parameters for recursively subdividing the adjacency matrix. + * + * Setting A = B = C = 0.25 emulates the Erdős–Rényi model. + * + * Graph500 uses A = 0.57, B = C = 0.19. + * + * @param A likelihood of source bit = 0, target bit = 0 + * @param B likelihood of source bit = 0, target bit = 1 + * @param C likelihood of source bit = 1, target bit = 0 + * @return this + */ + public RMatGraph setConstants(float A, float B, float C) { + if (A < 0.0f || B < 0.0f || C < 0.0f || A + B + C > 1.0f) { + throw new RuntimeException("RMat parameters A, B, and C must be non-negative and sum to less than or equal to one"); + } + + this.A = A; + this.B = B; + this.C = C; + + return this; + } + + /** + * Enable and configure noise. Each edge is generated independently, but + * when noise is enabled the parameters A, B, and C are randomly increased + * or decreased, then normalized, by a fraction of the noise factor during + * the computation of each bit. + * + * @param noiseEnabled whether to enable noise perturbation + * @param noise strength of noise perturbation + * @return this + */ + public RMatGraph setNoise(boolean noiseEnabled, float noise) { + if (noise < 0.0f || noise > 2.0f) { + throw new RuntimeException("RMat parameter noise must be non-negative and less than or equal to 2.0"); + } + + this.noiseEnabled = noiseEnabled; + this.noise = noise; + + return this; + } + + /** + * When configured for a simple graph duplicate edges and self-loops will + * be removed. The clip-and-flip method removes edges where source < target + * before symmetrizing the graph. + * + * @param simpleGraph whether to generate a simple graph + * @param clipAndFlip method for generating simple graph + * @return this + */ + public RMatGraph setSimpleGraph(boolean simpleGraph, boolean clipAndFlip) { + this.simpleGraph = simpleGraph; + this.clipAndFlip = clipAndFlip; + + return this; + } + + @Override + public Graph generate() { + int scale = Long.SIZE - Long.numberOfLeadingZeros(vertexCount - 1); + + // Edges + int cyclesPerEdge = noiseEnabled ? 5 * scale : scale; + + List> generatorBlocks = randomGenerableFactory + .getRandomGenerables(edgeCount, cyclesPerEdge); + + DataSet> generatedEdges = env + .fromCollection(generatorBlocks) + .name("Random generators") + .rebalance() + .setParallelism(parallelism) + .name("Rebalance") + .flatMap(new GenerateEdges(vertexCount, scale, A, B, C, noiseEnabled, noise, simpleGraph, clipAndFlip)) + .setParallelism(parallelism) + .name("RMat graph edges"); + + DataSet> edges; + + if (simpleGraph) { + edges = generatedEdges + .distinct(1, 0) + .setParallelism(parallelism) + .name("Distinct"); + } else { + edges = generatedEdges; + } + + // Vertices + DataSet> vertices = GraphGeneratorUtils.vertexSet(edges, parallelism); + + // Graph + return Graph.fromDataSet(vertices, edges, env); + } + + private static final class GenerateEdges + implements FlatMapFunction, Edge> { + + // Configuration + private final long vertexCount; + + private final int scale; + + private final float A; + + private final float B; + + private final float C; + + private final float D; + + private final boolean noiseEnabled; + + private final float noise; + + private final boolean simpleGraph; + + private final boolean clipAndFlip; + + // Output + private LongValue source = new LongValue(); + + private LongValue target = new LongValue(); + + private Edge sourceToTarget = new Edge<>(source, target, NullValue.getInstance()); + + private Edge targetToSource = new Edge<>(target, source, NullValue.getInstance()); + + public GenerateEdges(long vertexCount, int scale, float A, float B, float C, boolean noiseEnabled, float noise, boolean simpleGraph, boolean clipAndFlip) { + this.vertexCount = vertexCount; + this.scale = scale; + this.A = A; + this.B = B; + this.C = C; + this.D = 1.0f - A - B - C; + this.noiseEnabled = noiseEnabled; + this.noise = noise; + this.simpleGraph = simpleGraph; + this.clipAndFlip = clipAndFlip; + } + + @Override + public void flatMap(BlockInfo blockInfo, Collector> out) + throws Exception { + RandomGenerator rng = blockInfo.getRandomGenerable().generator(); + long edgesToGenerate = blockInfo.getElementCount(); + + while (edgesToGenerate > 0) { + long x = 0; + long y = 0; + + // matrix constants are reset for each edge + float a = A; + float b = B; + float c = C; + float d = D; + + for (int bit = 0; bit < scale; bit++) { + // generated next bit for source and target + x <<= 1; + y <<= 1; + + float random = rng.nextFloat(); + + if (random <= a) { + } else if (random <= a + b) { + y += 1; + } else if (random <= a + b + c) { + x += 1; + } else { + x += 1; + y += 1; + } + + if (noiseEnabled) { + // noise is bounded such that all parameters remain non-negative + a *= 1.0 - noise / 2 + rng.nextFloat() * noise; + b *= 1.0 - noise / 2 + rng.nextFloat() * noise; + c *= 1.0 - noise / 2 + rng.nextFloat() * noise; + d *= 1.0 - noise / 2 + rng.nextFloat() * noise; + + // normalize back to a + b + c + d = 1.0 + float norm = 1.0f / (a + b + c + d); + + a *= norm; + b *= norm; + c *= norm; + + // could multiply by norm, but subtract to minimize rounding error + d = 1.0f - a - b - c; + } + } + + // if vertexCount is not a power-of-2 then discard edges outside the vertex range + if (x < vertexCount && y < vertexCount) { + source.setValue(x); + target.setValue(y); + + if (simpleGraph) { + if ((clipAndFlip && x > y) || (!clipAndFlip && x != y)) { + out.collect(sourceToTarget); + out.collect(targetToSource); + } + } else { + out.collect(sourceToTarget); + } + + edgesToGenerate--; + } + } + } + } +} diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/SingletonEdgeGraph.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/SingletonEdgeGraph.java new file mode 100644 index 0000000000000..a714a29ee93a6 --- /dev/null +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/SingletonEdgeGraph.java @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph.generator; + +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields; +import org.apache.flink.graph.Edge; +import org.apache.flink.graph.Graph; +import org.apache.flink.graph.Vertex; +import org.apache.flink.types.LongValue; +import org.apache.flink.types.NullValue; +import org.apache.flink.util.LongValueSequenceIterator; + +/** + * A singleton-edge {@link Graph} contains one or more isolated two-paths. The in- and out-degree + * of every vertex is 1. For {@code n} vertices there are {@code n/2} components. + */ +public class SingletonEdgeGraph +extends AbstractGraphGenerator { + + // Required to create the DataSource + private final ExecutionEnvironment env; + + // Required configuration + private long vertexPairCount; + + /** + * An undirected {@link Graph} containing isolated two-paths. + * + * @param env the Flink execution environment + * @param vertexPairCount number of pairs of vertices + */ + public SingletonEdgeGraph(ExecutionEnvironment env, long vertexPairCount) { + if (vertexPairCount <= 0) { + throw new IllegalArgumentException("Vertex pair count must be greater than zero"); + } + + this.env = env; + this.vertexPairCount = vertexPairCount; + } + + @Override + public Graph generate() { + // Vertices + long vertexCount = 2 * this.vertexPairCount; + + DataSet> vertices = GraphGeneratorUtils.vertexSequence(env, parallelism, vertexCount); + + // Edges + LongValueSequenceIterator iterator = new LongValueSequenceIterator(0, vertexCount - 1); + + DataSet> edges = env + .fromParallelCollection(iterator, LongValue.class) + .setParallelism(parallelism) + .name("Edge iterators") + .map(new LinkVertexToSingletonNeighbor()) + .setParallelism(parallelism) + .name("Complete graph edges"); + + // Graph + return Graph.fromDataSet(vertices, edges, env); + } + + @ForwardedFields("*->f0") + private class LinkVertexToSingletonNeighbor + implements MapFunction> { + + private LongValue source = new LongValue(); + + private LongValue target = new LongValue(); + + private Edge edge = new Edge<>(source, target, NullValue.getInstance()); + + @Override + public Edge map(LongValue value) throws Exception { + long val = value.getValue(); + + source.setValue(val); + + if (val % 2 == 0) { + target.setValue(val + 1); + } else { + target.setValue(val - 1); + } + + return edge; + } + } +} diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/StarGraph.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/StarGraph.java new file mode 100644 index 0000000000000..cb99f3014714d --- /dev/null +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/StarGraph.java @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph.generator; + +import org.apache.flink.api.common.functions.FlatMapFunction; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields; +import org.apache.flink.graph.Edge; +import org.apache.flink.graph.Graph; +import org.apache.flink.graph.Vertex; +import org.apache.flink.types.LongValue; +import org.apache.flink.types.NullValue; +import org.apache.flink.util.Collector; +import org.apache.flink.util.LongValueSequenceIterator; + +/* + * @see Star Graph at Wolfram MathWorld + */ +public class StarGraph +extends AbstractGraphGenerator { + + // Required to create the DataSource + private final ExecutionEnvironment env; + + // Required configuration + private long vertexCount; + + /** + * An undirected {@link Graph} containing a single central {@link Vertex} connected to all other leaf vertices. + * + * @param env the Flink execution environment + * @param vertexCount number of vertices + */ + public StarGraph(ExecutionEnvironment env, long vertexCount) { + if (vertexCount <= 0) { + throw new IllegalArgumentException("Vertex count must be greater than zero"); + } + + this.env = env; + this.vertexCount = vertexCount; + } + + @Override + public Graph generate() { + // Vertices + DataSet> vertices = GraphGeneratorUtils.vertexSequence(env, parallelism, vertexCount); + + // Edges + LongValueSequenceIterator iterator = new LongValueSequenceIterator(1, this.vertexCount - 1); + + DataSet> edges = env + .fromParallelCollection(iterator, LongValue.class) + .setParallelism(parallelism) + .name("Edge iterators") + .flatMap(new LinkVertexToCenter()) + .setParallelism(parallelism) + .name("Star graph edges"); + + // Graph + return Graph.fromDataSet(vertices, edges, env); + } + + @ForwardedFields("*->f0") + public class LinkVertexToCenter + implements FlatMapFunction> { + + private LongValue center = new LongValue(0); + + private Edge center_to_leaf = new Edge<>(center, null, NullValue.getInstance()); + + private Edge leaf_to_center = new Edge<>(null, center, NullValue.getInstance()); + + @Override + public void flatMap(LongValue leaf, Collector> out) + throws Exception { + center_to_leaf.f1 = leaf; + out.collect(center_to_leaf); + + leaf_to_center.f0 = leaf; + out.collect(leaf_to_center); + } + } +} diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/random/AbstractGeneratorFactory.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/random/AbstractGeneratorFactory.java new file mode 100644 index 0000000000000..3bb904e0b4331 --- /dev/null +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/random/AbstractGeneratorFactory.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph.generator.random; + +import org.apache.commons.math3.random.RandomGenerator; + +import java.util.ArrayList; +import java.util.List; + +/** + * This base class handles the task of dividing the requested work into the + * appropriate number of blocks of near-equal size. + * + * @param the type of the {@code RandomGenerator} + */ +public abstract class AbstractGeneratorFactory +implements RandomGenerableFactory { + + // A large computation will run in parallel but blocks are generated on + // and distributed from a single node. This limit should be greater + // than the maximum expected parallelism. + public static final int MAXIMUM_BLOCK_COUNT = 1 << 20; + + // This should be sufficiently large relative to the cost of instantiating + // and initializing the random generator and sufficiently small relative to + // the cost of generating random values. + protected abstract int getMinimumCyclesPerBlock(); + + protected abstract RandomGenerable next(); + + @Override + public List> getRandomGenerables(long elementCount, int cyclesPerElement) { + long cycles = elementCount * cyclesPerElement; + int blockCount = Math.min((int) Math.ceil(cycles / (float) getMinimumCyclesPerBlock()), MAXIMUM_BLOCK_COUNT); + + long elementsPerBlock = elementCount / blockCount; + long elementRemainder = elementCount % blockCount; + + List> blocks = new ArrayList<>(blockCount); + long blockStart = 0; + + for (int blockIndex = 0 ; blockIndex < blockCount ; blockIndex++) { + if (blockIndex == blockCount - elementRemainder) { + elementsPerBlock++; + } + + RandomGenerable randomGenerable = next(); + + blocks.add(new BlockInfo<>(randomGenerable, blockIndex, blockCount, blockStart, elementsPerBlock)); + + blockStart += elementsPerBlock; + } + + return blocks; + } +} diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/random/BlockInfo.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/random/BlockInfo.java new file mode 100644 index 0000000000000..5e30a3f6bd8ea --- /dev/null +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/random/BlockInfo.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph.generator.random; + +import org.apache.commons.math3.random.RandomGenerator; + +/** + * Defines a source of randomness and a unit of work. + * + * @param the type of the {@code RandomGenerator} + */ +public class BlockInfo { + + private final RandomGenerable randomGenerable; + + private final int blockIndex; + + private final int blockCount; + + private final long firstElement; + + private final long elementCount; + + public BlockInfo(RandomGenerable randomGenerable, int blockIndex, int blockCount, long firstElement, long elementCount) { + this.randomGenerable = randomGenerable; + this.blockIndex = blockIndex; + this.blockCount = blockCount; + this.firstElement = firstElement; + this.elementCount = elementCount; + } + + /** + * @return the source of randomness + */ + public RandomGenerable getRandomGenerable() { + return randomGenerable; + } + + /** + * @return the index of this block within the list of blocks + */ + public int getBlockIndex() { + return blockIndex; + } + + /** + * @return the total number of blocks + */ + public int getBlockCount() { + return blockCount; + } + + /** + * @return the index of the first element in this block + */ + public long getFirstElement() { + return firstElement; + } + + /** + * @return the total number of elements across all blocks + */ + public long getElementCount() { + return elementCount; + } +} diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/random/JDKRandomGeneratorFactory.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/random/JDKRandomGeneratorFactory.java new file mode 100644 index 0000000000000..2024cae186c7e --- /dev/null +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/random/JDKRandomGeneratorFactory.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph.generator.random; + +import org.apache.commons.math3.random.JDKRandomGenerator; + +/** + * Uses a seeded {@link JDKRandomGenerator} to generate seeds for the + * distributed collection of {@link JDKRandomGenerator}. + */ +public class JDKRandomGeneratorFactory +extends AbstractGeneratorFactory { + + public static final long DEFAULT_SEED = 0x4b6f7e18198de7a4L; + + public static final int MINIMUM_CYCLES_PER_BLOCK = 1 << 20; + + private final JDKRandomGenerator random = new JDKRandomGenerator(); + + public JDKRandomGeneratorFactory() { + this(DEFAULT_SEED); + } + + public JDKRandomGeneratorFactory(long seed) { + random.setSeed(seed); + } + + @Override + protected int getMinimumCyclesPerBlock() { + return MINIMUM_CYCLES_PER_BLOCK; + } + + @Override + protected JDKRandomGenerable next() { + return new JDKRandomGenerable(random.nextLong()); + } + + private static class JDKRandomGenerable + implements RandomGenerable { + + private final long seed; + + public JDKRandomGenerable(long seed) { + this.seed = seed; + } + + @Override + public JDKRandomGenerator generator() { + JDKRandomGenerator random = new JDKRandomGenerator(); + + random.setSeed(seed); + + return random; + } + } +} diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/random/MersenneTwisterFactory.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/random/MersenneTwisterFactory.java new file mode 100644 index 0000000000000..22a7b04db7a3e --- /dev/null +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/random/MersenneTwisterFactory.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph.generator.random; + +import org.apache.commons.math3.random.MersenneTwister; + +/** + * Uses a seeded {@link MersenneTwister} to generate seeds for the + * distributed collection of {@link MersenneTwister}. + */ +public class MersenneTwisterFactory +extends AbstractGeneratorFactory { + + public static final long DEFAULT_SEED = 0x74c8cc8a58a9ceb9L; + + public static final int MINIMUM_CYCLES_PER_BLOCK = 1 << 20; + + private final MersenneTwister random = new MersenneTwister(); + + public MersenneTwisterFactory() { + this(DEFAULT_SEED); + } + + public MersenneTwisterFactory(long seed) { + random.setSeed(seed); + } + + @Override + protected int getMinimumCyclesPerBlock() { + return MINIMUM_CYCLES_PER_BLOCK; + } + + @Override + protected MersenneTwisterGenerable next() { + return new MersenneTwisterGenerable(random.nextLong()); + } + + private static class MersenneTwisterGenerable + implements RandomGenerable { + + private final long seed; + + public MersenneTwisterGenerable(long seed) { + this.seed = seed; + } + + @Override + public MersenneTwister generator() { + MersenneTwister random = new MersenneTwister(); + + random.setSeed(seed); + + return random; + } + } +} diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/random/RandomGenerable.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/random/RandomGenerable.java new file mode 100644 index 0000000000000..318b50815cc98 --- /dev/null +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/random/RandomGenerable.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph.generator.random; + +import org.apache.commons.math3.random.RandomGenerator; + +/** + * A RandomGenerable provides deferred instantiation and initialization of a + * RandomGenerator. This allows pre-processing or discovery to be distributed + * and performed in parallel by Flink tasks. + * + * A distributed PRNG is described by Matsumoto and Takuji in + * "Dynamic Creation of Pseudorandom Number Generators". + * + * @param the type of the {@code RandomGenerator} + */ +public interface RandomGenerable { + + /** + * Returns an initialized {@link RandomGenerator}. + * + * @return a {@code RandomGenerator} of type {@code T} + */ + T generator(); +} diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/random/RandomGenerableFactory.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/random/RandomGenerableFactory.java new file mode 100644 index 0000000000000..ead29fc82a83d --- /dev/null +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/random/RandomGenerableFactory.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph.generator.random; + +import org.apache.commons.math3.random.RandomGenerator; + +import java.util.List; + +/** + * A {@code RandomGenerableFactory} returns a scale-free collection of sources + * of pseudorandomness which can be used to perform repeatable parallel + * computation regardless of parallelism. + * + *
+ * {@code
+ * RandomGenerableFactory factory = new JDKRandomGeneratorFactory()
+ *
+ * List> generatorBlocks = factory
+ *     .getRandomGenerables(elementCount, cyclesPerElement);
+ *
+ * DataSet<...> generatedEdges = env
+ *     .fromCollection(generatorBlocks)
+ *         .name("Random generators")
+ *     .flatMap(...
+ * }
+ * 
+ * + * @param the type of the {@code RandomGenerator} + */ +public interface RandomGenerableFactory { + + /** + * The amount of work ({@code elementCount * cyclerPerElement}) is used to + * generate a list of blocks of work of near-equal size. + * + * @param elementCount number of elements, as indexed in the {@code BlockInfo} + * @param cyclesPerElement number of cycles of the PRNG per element + * @return the list of configuration blocks + */ + List> getRandomGenerables(long elementCount, int cyclesPerElement); +} diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/AbstractGraphTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/AbstractGraphTest.java new file mode 100644 index 0000000000000..1cac80baacce3 --- /dev/null +++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/AbstractGraphTest.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph.generator; + +import org.apache.flink.api.java.ExecutionEnvironment; +import org.junit.Before; + +public class AbstractGraphTest { + + protected ExecutionEnvironment env; + + @Before + public void setup() { + env = ExecutionEnvironment.createCollectionsEnvironment(); + env.getConfig().disableSysoutLogging(); + } +} diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/CompleteGraphTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/CompleteGraphTest.java new file mode 100644 index 0000000000000..af47fdc41d7d1 --- /dev/null +++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/CompleteGraphTest.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph.generator; + +import org.apache.flink.api.java.io.DiscardingOutputFormat; +import org.apache.flink.graph.Edge; +import org.apache.flink.graph.Graph; +import org.apache.flink.graph.Vertex; +import org.apache.flink.types.LongValue; +import org.apache.flink.types.NullValue; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; + +public class CompleteGraphTest +extends AbstractGraphTest { + + @Test + public void testGraph() + throws Exception { + int vertexCount = 4; + + Graph graph = new CompleteGraph(env, vertexCount) + .generate(); + + String vertices = "0; 1; 2; 3"; + String edges = "0,1; 0,2; 0,3; 1,0; 1,2; 1,3; 2,0; 2,1; 2,3; 3,0; 3,1; 3,2"; + + TestUtils.compareGraph(graph, vertices, edges); + } + + @Test + public void testGraphMetrics() + throws Exception { + int vertexCount = 10; + + Graph graph = new CompleteGraph(env, vertexCount) + .generate(); + + assertEquals(vertexCount, graph.numberOfVertices()); + assertEquals(vertexCount*(vertexCount-1), graph.numberOfEdges()); + + long minInDegree = graph.inDegrees().min(1).collect().get(0).f1; + long minOutDegree = graph.outDegrees().min(1).collect().get(0).f1; + long maxInDegree = graph.inDegrees().max(1).collect().get(0).f1; + long maxOutDegree = graph.outDegrees().max(1).collect().get(0).f1; + + assertEquals(vertexCount - 1, minInDegree); + assertEquals(vertexCount - 1, minOutDegree); + assertEquals(vertexCount - 1, maxInDegree); + assertEquals(vertexCount - 1, maxOutDegree); + } + + @Test + public void testParallelism() + throws Exception { + int parallelism = 2; + + Graph graph = new CompleteGraph(env, 10) + .setParallelism(parallelism) + .generate(); + + graph.getVertices().output(new DiscardingOutputFormat>()); + graph.getEdges().output(new DiscardingOutputFormat>()); + + TestUtils.verifyParallelism(env, parallelism); + } +} diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/CycleGraphTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/CycleGraphTest.java new file mode 100644 index 0000000000000..fb6799b939691 --- /dev/null +++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/CycleGraphTest.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph.generator; + +import org.apache.flink.api.java.io.DiscardingOutputFormat; +import org.apache.flink.graph.Edge; +import org.apache.flink.graph.Graph; +import org.apache.flink.graph.Vertex; +import org.apache.flink.types.LongValue; +import org.apache.flink.types.NullValue; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; + +public class CycleGraphTest +extends AbstractGraphTest { + + @Test + public void testGraph() + throws Exception { + Graph graph = new CycleGraph(env, 10) + .generate(); + + String vertices = "0; 1; 2; 3; 4; 5; 6; 7; 8; 9"; + String edges = "0,1; 1,0; 1,2; 2,1; 2,3; 3,2; 3,4; 4,3; 4,5; 5,4;" + + "5,6; 6,5; 6,7; 7,6; 7,8; 8,7; 8,9; 9,8; 9,0; 0,9"; + + TestUtils.compareGraph(graph, vertices, edges); + } + + @Test + public void testGraphMetrics() + throws Exception { + int vertexCount = 100; + + Graph graph = new CycleGraph(env, vertexCount) + .generate(); + + assertEquals(vertexCount, graph.numberOfVertices()); + assertEquals(2 * vertexCount, graph.numberOfEdges()); + + long minInDegree = graph.inDegrees().min(1).collect().get(0).f1; + long minOutDegree = graph.outDegrees().min(1).collect().get(0).f1; + long maxInDegree = graph.inDegrees().max(1).collect().get(0).f1; + long maxOutDegree = graph.outDegrees().max(1).collect().get(0).f1; + + assertEquals(2, minInDegree); + assertEquals(2, minOutDegree); + assertEquals(2, maxInDegree); + assertEquals(2, maxOutDegree); + } + + @Test + public void testParallelism() + throws Exception { + int parallelism = 2; + + Graph graph = new CycleGraph(env, 100) + .setParallelism(parallelism) + .generate(); + + graph.getVertices().output(new DiscardingOutputFormat>()); + graph.getEdges().output(new DiscardingOutputFormat>()); + + TestUtils.verifyParallelism(env, parallelism); + } +} diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/EmptyGraphTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/EmptyGraphTest.java new file mode 100644 index 0000000000000..bc1ef7782a239 --- /dev/null +++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/EmptyGraphTest.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph.generator; + +import org.apache.flink.api.java.io.DiscardingOutputFormat; +import org.apache.flink.graph.Edge; +import org.apache.flink.graph.Graph; +import org.apache.flink.graph.Vertex; +import org.apache.flink.types.LongValue; +import org.apache.flink.types.NullValue; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; + +public class EmptyGraphTest +extends AbstractGraphTest { + + @Test + public void testGraph() + throws Exception { + Graph graph = new EmptyGraph(env, 10) + .generate(); + + String vertices = "0; 1; 2; 3; 4; 5; 6; 7; 8; 9"; + String edges = null; + + TestUtils.compareGraph(graph, vertices, edges); + } + + @Test + public void testGraphMetrics() + throws Exception { + int vertexCount = 100; + + Graph graph = new EmptyGraph(env, vertexCount) + .generate(); + + assertEquals(vertexCount, graph.numberOfVertices()); + assertEquals(0, graph.numberOfEdges()); + + long maxInDegree = graph.inDegrees().max(1).collect().get(0).f1; + long maxOutDegree = graph.outDegrees().max(1).collect().get(0).f1; + + assertEquals(0, maxInDegree); + assertEquals(0, maxOutDegree); + } + + @Test + public void testParallelism() + throws Exception { + int parallelism = 2; + + Graph graph = new EmptyGraph(env, 100) + .setParallelism(parallelism) + .generate(); + + graph.getVertices().output(new DiscardingOutputFormat>()); + graph.getEdges().output(new DiscardingOutputFormat>()); + + TestUtils.verifyParallelism(env, parallelism); + } +} diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/GridGraphTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/GridGraphTest.java new file mode 100644 index 0000000000000..b7b262fe6938a --- /dev/null +++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/GridGraphTest.java @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph.generator; + +import org.apache.flink.api.java.io.DiscardingOutputFormat; +import org.apache.flink.graph.Edge; +import org.apache.flink.graph.Graph; +import org.apache.flink.graph.Vertex; +import org.apache.flink.types.LongValue; +import org.apache.flink.types.NullValue; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; + +public class GridGraphTest +extends AbstractGraphTest { + + @Test + public void testGraph() + throws Exception { + Graph graph = new GridGraph(this.env) + .addDimension(2, false) + .addDimension(3, false) + .generate(); + + // 0 1 2 + // 3 4 5 + String vertices = "0; 1; 2; 3; 4; 5"; + String edges = "0,1; 0,3; 1,0; 1,2; 1,4; 2,1; 2,5; 3,0; 3,4; 4,1;" + + "4,3; 4,5; 5,2; 5,4"; + + TestUtils.compareGraph(graph, vertices, edges); + } + + @Test + public void testGraphMetrics() + throws Exception { + Graph graph = new GridGraph(this.env) + .addDimension(2, true) + .addDimension(3, true) + .addDimension(5, true) + .addDimension(7, true) + .generate(); + + // Each vertex is the source of one edge in the first dimension of size 2, + // and the source of two edges in each dimension of size greater than 2. + assertEquals(2*3*5*7, graph.numberOfVertices()); + assertEquals(7 * 2*3*5*7, graph.numberOfEdges()); + + long minInDegree = graph.inDegrees().min(1).collect().get(0).f1; + long minOutDegree = graph.outDegrees().min(1).collect().get(0).f1; + long maxInDegree = graph.inDegrees().max(1).collect().get(0).f1; + long maxOutDegree = graph.outDegrees().max(1).collect().get(0).f1; + + assertEquals(7, minInDegree); + assertEquals(7, minOutDegree); + assertEquals(7, maxInDegree); + assertEquals(7, maxOutDegree); + } + + @Test + public void testParallelism() + throws Exception { + int parallelism = 2; + + Graph graph = new GridGraph(env) + .addDimension(3, false) + .addDimension(5, false) + .setParallelism(parallelism) + .generate(); + + graph.getVertices().output(new DiscardingOutputFormat>()); + graph.getEdges().output(new DiscardingOutputFormat>()); + + TestUtils.verifyParallelism(env, parallelism); + } +} diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/HypercubeGraphTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/HypercubeGraphTest.java new file mode 100644 index 0000000000000..6d35484c62afd --- /dev/null +++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/HypercubeGraphTest.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph.generator; + +import org.apache.flink.api.java.io.DiscardingOutputFormat; +import org.apache.flink.graph.Edge; +import org.apache.flink.graph.Graph; +import org.apache.flink.graph.Vertex; +import org.apache.flink.types.LongValue; +import org.apache.flink.types.NullValue; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; + +public class HypercubeGraphTest +extends AbstractGraphTest { + + @Test + public void testGraph() + throws Exception { + int dimensions = 3; + + Graph graph = new HypercubeGraph(this.env, dimensions) + .generate(); + + String vertices = "0; 1; 2; 3; 4; 5; 6; 7"; + String edges = "0,1; 0,2; 0,4; 1,0; 1,3; 1,5; 2,0; 2,3; 2,6; 3,1; 3,2; 3,7;" + + "4,0; 4,5; 4,6; 5,1; 5,4; 5,7; 6,2; 6,4; 6,7; 7,3; 7,6; 7,5"; + + TestUtils.compareGraph(graph, vertices, edges); + } + + @Test + public void testGraphMetrics() + throws Exception { + int dimensions = 10; + + Graph graph = new HypercubeGraph(this.env, dimensions) + .generate(); + + assertEquals(1 << dimensions, graph.numberOfVertices()); + assertEquals(dimensions * (1 << dimensions), graph.numberOfEdges()); + + long minInDegree = graph.inDegrees().min(1).collect().get(0).f1; + long minOutDegree = graph.outDegrees().min(1).collect().get(0).f1; + long maxInDegree = graph.inDegrees().max(1).collect().get(0).f1; + long maxOutDegree = graph.outDegrees().max(1).collect().get(0).f1; + + assertEquals(dimensions, minInDegree); + assertEquals(dimensions, minOutDegree); + assertEquals(dimensions, maxInDegree); + assertEquals(dimensions, maxOutDegree); + } + + @Test + public void testParallelism() + throws Exception { + int parallelism = 2; + + Graph graph = new HypercubeGraph(env, 4) + .setParallelism(parallelism) + .generate(); + + graph.getVertices().output(new DiscardingOutputFormat>()); + graph.getEdges().output(new DiscardingOutputFormat>()); + + TestUtils.verifyParallelism(env, parallelism); + } +} diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/PathGraphTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/PathGraphTest.java new file mode 100644 index 0000000000000..b8a409f961359 --- /dev/null +++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/PathGraphTest.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph.generator; + +import org.apache.flink.api.java.io.DiscardingOutputFormat; +import org.apache.flink.graph.Edge; +import org.apache.flink.graph.Graph; +import org.apache.flink.graph.Vertex; +import org.apache.flink.types.LongValue; +import org.apache.flink.types.NullValue; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; + +public class PathGraphTest +extends AbstractGraphTest { + + @Test + public void testGraph() + throws Exception { + Graph graph = new PathGraph(env, 10) + .generate(); + + String vertices = "0; 1; 2; 3; 4; 5; 6; 7; 8; 9"; + String edges = "0,1; 1,0; 1,2; 2,1; 2,3; 3,2; 3,4; 4,3; 4,5; 5,4;" + + "5,6; 6,5; 6,7; 7,6; 7,8; 8,7; 8,9; 9,8"; + + TestUtils.compareGraph(graph, vertices, edges); + } + + @Test + public void testGraphMetrics() + throws Exception { + int vertexCount = 100; + + Graph graph = new PathGraph(env, vertexCount) + .generate(); + + assertEquals(vertexCount, graph.numberOfVertices()); + assertEquals(2 * (vertexCount - 1), graph.numberOfEdges()); + + long minInDegree = graph.inDegrees().min(1).collect().get(0).f1; + long minOutDegree = graph.outDegrees().min(1).collect().get(0).f1; + long maxInDegree = graph.inDegrees().max(1).collect().get(0).f1; + long maxOutDegree = graph.outDegrees().max(1).collect().get(0).f1; + + assertEquals(1, minInDegree); + assertEquals(1, minOutDegree); + assertEquals(2, maxInDegree); + assertEquals(2, maxOutDegree); + } + + @Test + public void testParallelism() + throws Exception { + int parallelism = 2; + + Graph graph = new PathGraph(env, 100) + .setParallelism(parallelism) + .generate(); + + graph.getVertices().output(new DiscardingOutputFormat>()); + graph.getEdges().output(new DiscardingOutputFormat>()); + + TestUtils.verifyParallelism(env, parallelism); + } +} diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/RMatGraphTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/RMatGraphTest.java new file mode 100644 index 0000000000000..a06c63fd06101 --- /dev/null +++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/RMatGraphTest.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph.generator; + +import org.apache.commons.math3.random.JDKRandomGenerator; +import org.apache.flink.api.java.io.DiscardingOutputFormat; +import org.apache.flink.graph.Edge; +import org.apache.flink.graph.Graph; +import org.apache.flink.graph.Vertex; +import org.apache.flink.graph.generator.random.JDKRandomGeneratorFactory; +import org.apache.flink.graph.generator.random.RandomGenerableFactory; +import org.apache.flink.types.LongValue; +import org.apache.flink.types.NullValue; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +public class RMatGraphTest +extends AbstractGraphTest { + + @Test + public void testGraphMetrics() + throws Exception { + long vertexCount = 100; + + long edgeCount = 1000; + + RandomGenerableFactory rnd = new JDKRandomGeneratorFactory(); + + Graph graph = new RMatGraph<>(env, rnd, vertexCount, edgeCount) + .generate(); + + assertTrue(vertexCount >= graph.numberOfVertices()); + assertEquals(edgeCount, graph.numberOfEdges()); + } + + @Test + public void testParallelism() + throws Exception { + int parallelism = 2; + + RandomGenerableFactory rnd = new JDKRandomGeneratorFactory(); + + Graph graph = new RMatGraph<>(env, rnd, 100, 1000) + .setParallelism(parallelism) + .generate(); + + graph.getVertices().output(new DiscardingOutputFormat>()); + graph.getEdges().output(new DiscardingOutputFormat>()); + + TestUtils.verifyParallelism(env, parallelism); + } +} diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/SingletonEdgeGraphTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/SingletonEdgeGraphTest.java new file mode 100644 index 0000000000000..3877717f506c2 --- /dev/null +++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/SingletonEdgeGraphTest.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph.generator; + +import org.apache.flink.api.java.io.DiscardingOutputFormat; +import org.apache.flink.graph.Edge; +import org.apache.flink.graph.Graph; +import org.apache.flink.graph.Vertex; +import org.apache.flink.types.LongValue; +import org.apache.flink.types.NullValue; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; + +public class SingletonEdgeGraphTest +extends AbstractGraphTest { + + @Test + public void testGraph() + throws Exception { + int vertexPairCount = 5; + + Graph graph = new SingletonEdgeGraph(env, vertexPairCount) + .generate(); + + String vertices = "0; 1; 2; 3; 4; 5; 6; 7; 8; 9"; + String edges = "0,1; 1,0; 2,3; 3,2; 4,5; 5,4; 6,7; 7,6; 8,9; 9,8"; + + TestUtils.compareGraph(graph, vertices, edges); + } + + @Test + public void testGraphMetrics() + throws Exception { + int vertexPairCount = 10; + + Graph graph = new SingletonEdgeGraph(env, vertexPairCount) + .generate(); + + assertEquals(2 * vertexPairCount, graph.numberOfVertices()); + assertEquals(2 * vertexPairCount, graph.numberOfEdges()); + + long minInDegree = graph.inDegrees().min(1).collect().get(0).f1; + long minOutDegree = graph.outDegrees().min(1).collect().get(0).f1; + long maxInDegree = graph.inDegrees().max(1).collect().get(0).f1; + long maxOutDegree = graph.outDegrees().max(1).collect().get(0).f1; + + assertEquals(1, minInDegree); + assertEquals(1, minOutDegree); + assertEquals(1, maxInDegree); + assertEquals(1, maxOutDegree); + } + + @Test + public void testParallelism() + throws Exception { + int parallelism = 2; + + Graph graph = new SingletonEdgeGraph(env, 10) + .setParallelism(parallelism) + .generate(); + + graph.getVertices().output(new DiscardingOutputFormat>()); + graph.getEdges().output(new DiscardingOutputFormat>()); + + TestUtils.verifyParallelism(env, parallelism); + } +} diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/StarGraphTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/StarGraphTest.java new file mode 100644 index 0000000000000..2b090db85df79 --- /dev/null +++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/StarGraphTest.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph.generator; + +import org.apache.flink.api.java.io.DiscardingOutputFormat; +import org.apache.flink.graph.Edge; +import org.apache.flink.graph.Graph; +import org.apache.flink.graph.Vertex; +import org.apache.flink.types.LongValue; +import org.apache.flink.types.NullValue; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; + +public class StarGraphTest +extends AbstractGraphTest { + + @Test + public void testGraph() + throws Exception { + int vertexCount = 10; + + Graph graph = new StarGraph(env, vertexCount) + .generate(); + + String vertices = "0; 1; 2; 3; 4; 5; 6; 7; 8; 9"; + String edges = "0,1; 1,0; 0,2; 2,0; 0,3; 3,0; 0,4; 4,0; 0,5; 5,0;" + + "0,6; 6,0; 0,7; 7,0; 0,8; 8,0; 0,9; 9,0"; + + TestUtils.compareGraph(graph, vertices, edges); + } + + @Test + public void testGraphMetrics() + throws Exception { + int vertexCount = 100; + + Graph graph = new StarGraph(env, vertexCount) + .generate(); + + assertEquals(vertexCount, graph.numberOfVertices()); + assertEquals(2 * (vertexCount - 1), graph.numberOfEdges()); + + long minInDegree = graph.inDegrees().min(1).collect().get(0).f1; + long minOutDegree = graph.outDegrees().min(1).collect().get(0).f1; + long maxInDegree = graph.inDegrees().max(1).collect().get(0).f1; + long maxOutDegree = graph.outDegrees().max(1).collect().get(0).f1; + + assertEquals(1, minInDegree); + assertEquals(1, minOutDegree); + assertEquals(vertexCount - 1, maxInDegree); + assertEquals(vertexCount - 1, maxOutDegree); + } + + @Test + public void testParallelism() + throws Exception { + int parallelism = 2; + + Graph graph = new StarGraph(env, 100) + .setParallelism(parallelism) + .generate(); + + graph.getVertices().output(new DiscardingOutputFormat>()); + graph.getEdges().output(new DiscardingOutputFormat>()); + + TestUtils.verifyParallelism(env, parallelism); + } +} diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/TestUtils.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/TestUtils.java new file mode 100644 index 0000000000000..3ea5a449cdd01 --- /dev/null +++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/TestUtils.java @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph.generator; + +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.graph.Edge; +import org.apache.flink.graph.Graph; +import org.apache.flink.graph.Vertex; +import org.apache.flink.optimizer.Optimizer; +import org.apache.flink.optimizer.costs.DefaultCostEstimator; +import org.apache.flink.optimizer.plan.Channel; +import org.apache.flink.optimizer.plan.OptimizedPlan; +import org.apache.flink.optimizer.plan.PlanNode; +import org.apache.flink.test.util.TestBaseUtils; + +import java.util.ArrayList; +import java.util.List; + +import static org.junit.Assert.assertTrue; + +public final class TestUtils { + + /** + * Compare graph vertices and edges against expected values. + * + * @param graph graph under test + * @param expectedVertices vertex labels separated by semi-colons; whitespace is ignored + * @param expectedEdges edges of the form "source,target" separated by semi-colons; whitespace is ignored + * @param the key type for edge and vertex identifiers + * @param the value type for vertices + * @param the value type for edges + * @throws Exception + */ + public static void compareGraph(Graph graph, String expectedVertices, String expectedEdges) + throws Exception { + // Vertices + if (expectedVertices != null) { + List resultVertices = new ArrayList<>(); + + for (Vertex vertex : graph.getVertices().collect()) { + resultVertices.add(vertex.f0.toString()); + } + + TestBaseUtils.compareResultAsText(resultVertices, expectedVertices.replaceAll("\\s","").replace(";", "\n")); + } + + // Edges + if (expectedEdges != null) { + List resultEdges = new ArrayList<>(); + + for (Edge edge : graph.getEdges().collect()) { + resultEdges.add(edge.f0.toString() + "," + edge.f1.toString()); + } + + TestBaseUtils.compareResultAsText(resultEdges, expectedEdges.replaceAll("\\s","").replace(";", "\n")); + } + } + + /** + * Verify operator parallelism. + * + * @param env the Flink execution environment. + * @param expectedParallelism expected operator parallelism + */ + public static void verifyParallelism(ExecutionEnvironment env, int expectedParallelism) { + env.setParallelism(2 * expectedParallelism); + + Optimizer compiler = new Optimizer(null, new DefaultCostEstimator(), new Configuration()); + OptimizedPlan optimizedPlan = compiler.compile(env.createProgramPlan()); + + List queue = new ArrayList<>(); + queue.addAll(optimizedPlan.getDataSinks()); + + while (queue.size() > 0) { + PlanNode node = queue.remove(queue.size() - 1); + + // Data sources may have parallelism of 1, so simply check that the node + // parallelism has not been increased by setting the default parallelism + assertTrue("Wrong parallelism for " + node.toString(), node.getParallelism() <= expectedParallelism); + + for (Channel channel : node.getInputs()) { + queue.add(channel.getSource()); + } + } + } +} From 018a780fa20a1b14cd6607e96c365c88aacfb2e3 Mon Sep 17 00:00:00 2001 From: Greg Hogan Date: Thu, 7 Apr 2016 17:00:03 -0400 Subject: [PATCH 3/4] Website documentation. --- docs/apis/batch/libs/gelly.md | 544 ++++++++++++++++++ docs/page/css/flink.css | 21 + .../flink/graph/generator/CompleteGraph.java | 2 +- .../flink/graph/generator/EmptyGraph.java | 2 +- .../flink/graph/generator/GridGraph.java | 2 +- .../flink/graph/generator/RMatGraph.java | 2 +- .../flink/graph/generator/GridGraphTest.java | 4 +- .../graph/generator/HypercubeGraphTest.java | 4 +- 8 files changed, 573 insertions(+), 8 deletions(-) diff --git a/docs/apis/batch/libs/gelly.md b/docs/apis/batch/libs/gelly.md index b22e362d37e3d..d915089b81e80 100644 --- a/docs/apis/batch/libs/gelly.md +++ b/docs/apis/batch/libs/gelly.md @@ -1734,3 +1734,547 @@ vertex represents a group of vertices and each edge represents a group of edges vertex and edge in the output graph stores the common group value and the number of represented elements. {% top %} + +Graph Generators +----------- + +Gelly provides a collection of scalable graph generators. Each generator is + +* parallelizable, in order to create large datasets +* scale-free, generating the same graph regardless of parallelism +* thrifty, using as few operators as possible + +### Complete Graph + +An undirected graph connecting every distinct pair of vertices. + +
+
+{% highlight java %} +ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + +Graph graph = new CompleteGraph(env, 5) + .generate(); +{% endhighlight %} +
+ +
+{% highlight scala %} +import org.apache.flink.api.scala._ +import org.apache.flink.graph.generator.CompleteGraph + +val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment + +val graph = new CompleteGraph(env.getJavaEnv, 5).generate() +{% endhighlight %} +
+
+ + + + + + + + + + + + + + + + + + + 0 + + + 1 + + + 2 + + + 3 + + + 4 + + +### Cycle Graph + +An undirected graph where all edges form a single cycle. + +
+
+{% highlight java %} +ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + +Graph graph = new CycleGraph(env, 5) + .generate(); +{% endhighlight %} +
+ +
+{% highlight scala %} +import org.apache.flink.api.scala._ +import org.apache.flink.graph.generator.CycleGraph + +val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment + +val graph = new CycleGraph(env.getJavaEnv, 5).generate() +{% endhighlight %} +
+
+ + + + + + + + + + + 0 + + + 1 + + + 2 + + + 3 + + + 4 + + +### Empty Graph + +The graph containing no edges. + +
+
+{% highlight java %} +ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + +Graph graph = new EmptyGraph(env, 5) + .generate(); +{% endhighlight %} +
+ +
+{% highlight scala %} +import org.apache.flink.api.scala._ +import org.apache.flink.graph.generator.EmptyGraph + +val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment + +val graph = new EmptyGraph(env.getJavaEnv, 5).generate() +{% endhighlight %} +
+
+ + + + + 0 + + + 1 + + + 2 + + + 3 + + + 4 + + +### Grid Graph + +An undirected graph connecting vertices in a regular tiling in one or more dimensions. + +
+
+{% highlight java %} +ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + +Graph graph = new GridGraph(env) + .addDimension(2, false) + .addDimension(4, false) + .generate(); +{% endhighlight %} +
+ +
+{% highlight scala %} +import org.apache.flink.api.scala._ +import org.apache.flink.graph.generator.GridGraph + +val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment + +val graph = new GridGraph(env.getJavaEnv).addDimension(2, false).addDimension(4, false).generate() +{% endhighlight %} +
+
+ + + + + + + + + + + + + 0 + + + 1 + + + 2 + + + 3 + + + 4 + + + 5 + + + 6 + + + 7 + + +### Hypercube Graph + +An undirected graph where edges form an n-dimensional hypercube. + +
+
+{% highlight java %} +ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + +Graph graph = new HypercubeGraph(env, 2) + .generate(); +{% endhighlight %} +
+ +
+{% highlight scala %} +import org.apache.flink.api.scala._ +import org.apache.flink.graph.generator.HypercubeGraph + +val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment + +// note: configured with the number of vertex pairs +val graph = new HypercubeGraph(env.getJavaEnv, 4).generate() +{% endhighlight %} +
+
+ + + + + + + + + + + + + + + + + + + + 0 + + + 1 + + + 2 + + + 3 + + + 4 + + + 5 + + + 6 + + + 7 + + +### Path Graph + +An undirected Graph where all edges form a single path. + +
+
+{% highlight java %} +ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + +Graph graph = new PathGraph(env, 5) + .generate(); +{% endhighlight %} +
+ +
+{% highlight scala %} +import org.apache.flink.api.scala._ +import org.apache.flink.graph.generator.PathGraph + +val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment + +val graph = new PathGraph(env.getJavaEnv, 5).generate() +{% endhighlight %} +
+
+ + + + + + + 0 + + + 1 + + + 2 + + + 3 + + + 4 + + +### RMat Graph + +A directed or undirected power-law graph generated using the [Recursive Matrix (R-Mat)] +(http://www.cs.cmu.edu/~christos/PUBLICATIONS/siam04.pdf) model. RMat is a stochastic +generator configured with a source of randomness. + +
+
+{% highlight java %} +ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + +RandomGenerableFactory rnd = new JDKRandomGeneratorFactory(); + +int vertexCount = 1 << scale; +int edgeCount = edgeFactor * vertexCount; + +Graph graph = new RMatGraph<>(env, rnd, vertexCount, edgeCount) + .generate(); +{% endhighlight %} +
+ +
+{% highlight scala %} +import org.apache.flink.api.scala._ +import org.apache.flink.graph.generator.RMatGraph + +val env = ExecutionEnvironment.getExecutionEnvironment + +val vertexCount = 1 << scale +val edgeCount = edgeFactor * vertexCount + +val graph = new RMatGraph(env.getJavaEnv, rnd, vertexCount, edgeCount).generate() +{% endhighlight %} +
+
+ +Manipulating the RMat constants and noise affects the degree-skew. The RMat generator can be +configured to produce a simple graph by removing self-loops and duplicate edges, with undirected +edges produced by a "clip-and-flip" throwing away the half matrix above the diagonal or a full +"flip" preserving and mirroring all edges. + +
+
+{% highlight java %} +ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + +RandomGenerableFactory rnd = new JDKRandomGeneratorFactory(); + +int vertexCount = 1 << scale; +int edgeCount = edgeFactor * vertexCount; + +Graph graph = new RMatGraph<>(env, rnd, vertexCount, edgeCount) + .setConstants(0.57f, 0.19f, 0.19f) + .setNoise(true, 0.10f) + .setSimpleGraph(true, false) + .generate(); +{% endhighlight %} +
+ +
+{% highlight scala %} +import org.apache.flink.api.scala._ +import org.apache.flink.graph.generator.RMatGraph + +val env = ExecutionEnvironment.getExecutionEnvironment + +val vertexCount = 1 << scale +val edgeCount = edgeFactor * vertexCount + +val graph = new RMatGraph(env.getJavaEnv, rnd, vertexCount, edgeCount).setConstants(0.57f, 0.19f, 0.19f).setNoise(true, 0.10f).setSimpleGraph(true, false).generate() +{% endhighlight %} +
+
+ +### Singleton Edge Graph + +An undirected graph containing isolated two-paths. + +
+
+{% highlight java %} +ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + +// note: configured with the number of vertex pairs +Graph graph = new SingletonEdgeGraph(env, 4) + .generate(); +{% endhighlight %} +
+ +
+{% highlight scala %} +import org.apache.flink.api.scala._ +import org.apache.flink.graph.generator.SingletonEdgeGraph + +val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment + +// note: configured with the number of vertex pairs +val graph = new SingletonEdgeGraph(env.getJavaEnv, 4).generate() +{% endhighlight %} +
+
+ + + + + + + + + + 0 + + + 1 + + + 2 + + + 3 + + + 4 + + + 5 + + + 6 + + + 7 + + +### Star Graph + +An undirected graph containing a single central vertex connected to all other leaf vertices. + +
+
+{% highlight java %} +ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + +Graph graph = new StarGraph(env, 6) + .generate(); +{% endhighlight %} +
+ +
+{% highlight scala %} +import org.apache.flink.api.scala._ +import org.apache.flink.graph.generator.StarGraph + +val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment + +val graph = new StarGraph(env.getJavaEnv, 6).generate() +{% endhighlight %} +
+
+ + + + + + + + + + + 0 + + + 1 + + + 2 + + + 3 + + + 4 + + + 5 + + +{% top %} diff --git a/docs/page/css/flink.css b/docs/page/css/flink.css index 98d0979474a90..b0d0a237b6389 100644 --- a/docs/page/css/flink.css +++ b/docs/page/css/flink.css @@ -236,3 +236,24 @@ td { *:hover > .anchorjs-link { transition: color .25s linear; } + +/*============================================================================= + Gelly Graphs +=============================================================================*/ + +svg.graph line { + stroke: rgb(255,0,0); + stroke-width: 4; +} + +svg.graph circle { + fill: red; + stroke: black; + stroke-width: 3; +} + +svg.graph text { + dominant-baseline: central; + font-size: 32px; + text-anchor: middle; +} diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/CompleteGraph.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/CompleteGraph.java index 31999c77d44d4..5339b14298dab 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/CompleteGraph.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/CompleteGraph.java @@ -43,7 +43,7 @@ public class CompleteGraph private long vertexCount; /** - * A {@link Graph} connecting every distinct pair of vertices. + * An undirected {@link Graph} connecting every distinct pair of vertices. * * @param env the Flink execution environment * @param vertexCount number of vertices diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/EmptyGraph.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/EmptyGraph.java index d444ed717184c..05bfd89383b13 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/EmptyGraph.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/EmptyGraph.java @@ -45,7 +45,7 @@ public class EmptyGraph private long vertexCount; /** - * The empty {@link Graph} containing no edges. + * The {@link Graph} containing no edges. * * @param env the Flink execution environment * @param vertexCount number of vertices diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/GridGraph.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/GridGraph.java index e598cf9d53432..399a2f923740b 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/GridGraph.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/GridGraph.java @@ -49,7 +49,7 @@ public class GridGraph private long vertexCount = 1; /** - * A {@link Graph} connecting vertices in a regular tiling in one or more dimensions. + * An undirected {@link Graph} connecting vertices in a regular tiling in one or more dimensions. * * @param env the Flink execution environment */ diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/RMatGraph.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/RMatGraph.java index c68f78a7eaf3f..246d8bb344879 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/RMatGraph.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/RMatGraph.java @@ -75,7 +75,7 @@ public class RMatGraph /** * Generate a directed or undirected power-law {@link Graph} using the - * Recusrive Matrix (R-Mat) model. + * Recursive Matrix (R-Mat) model. * * @param env the Flink execution environment * @param randomGeneratorFactory source of randomness diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/GridGraphTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/GridGraphTest.java index b7b262fe6938a..f3fa7db746672 100644 --- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/GridGraphTest.java +++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/GridGraphTest.java @@ -34,7 +34,7 @@ public class GridGraphTest @Test public void testGraph() throws Exception { - Graph graph = new GridGraph(this.env) + Graph graph = new GridGraph(env) .addDimension(2, false) .addDimension(3, false) .generate(); @@ -51,7 +51,7 @@ public void testGraph() @Test public void testGraphMetrics() throws Exception { - Graph graph = new GridGraph(this.env) + Graph graph = new GridGraph(env) .addDimension(2, true) .addDimension(3, true) .addDimension(5, true) diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/HypercubeGraphTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/HypercubeGraphTest.java index 6d35484c62afd..77eed89452858 100644 --- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/HypercubeGraphTest.java +++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/HypercubeGraphTest.java @@ -36,7 +36,7 @@ public void testGraph() throws Exception { int dimensions = 3; - Graph graph = new HypercubeGraph(this.env, dimensions) + Graph graph = new HypercubeGraph(env, dimensions) .generate(); String vertices = "0; 1; 2; 3; 4; 5; 6; 7"; @@ -51,7 +51,7 @@ public void testGraphMetrics() throws Exception { int dimensions = 10; - Graph graph = new HypercubeGraph(this.env, dimensions) + Graph graph = new HypercubeGraph(env, dimensions) .generate(); assertEquals(1 << dimensions, graph.numberOfVertices()); From 70015f4baea6f6ab05e68a6c78ab42f827112b01 Mon Sep 17 00:00:00 2001 From: Greg Hogan Date: Fri, 8 Apr 2016 14:22:56 -0400 Subject: [PATCH 4/4] Updates per Vasia's recommendations. --- docs/apis/batch/libs/gelly.md | 156 ++++++++++++++---- .../apache/flink/graph/examples/Graph500.java | 28 +++- .../generator/AbstractGraphGenerator.java | 2 +- .../flink/graph/generator/CycleGraph.java | 4 +- .../flink/graph/generator/GraphGenerator.java | 2 +- 5 files changed, 158 insertions(+), 34 deletions(-) diff --git a/docs/apis/batch/libs/gelly.md b/docs/apis/batch/libs/gelly.md index d915089b81e80..8c691f8836efe 100644 --- a/docs/apis/batch/libs/gelly.md +++ b/docs/apis/batch/libs/gelly.md @@ -1744,6 +1744,59 @@ Gelly provides a collection of scalable graph generators. Each generator is * scale-free, generating the same graph regardless of parallelism * thrifty, using as few operators as possible +Graph generators are configured using the builder pattern. The parallelism of generator +operators can be set explicitly by calling `setParallelism(parallelism)`. Lowering the +parallelism will reduce the allocation of memory and network buffers. + +Graph-specific configuration must be called first, then configuration common to all +generators, and lastly the call to `generate()`. The following example configures a +grid graph with two dimensions, configures the parallelism, and generates the graph. + +
+
+{% highlight java %} +ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + +boolean wrapEndpoints = false; + +int parallelism = 4; + +Graph graph = new GridGraph(env) + .addDimension(2, wrapEndpoints) + .addDimension(4, wrapEndpoints) + .setParallelism(parallelism) + .generate(); +{% endhighlight %} +
+ +
+{% highlight scala %} +import org.apache.flink.api.scala._ +import org.apache.flink.graph.generator.GridGraph + +val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment + +wrapEndpoints = false + +val parallelism = 4 + +val graph = new GridGraph(env.getJavaEnv).addDimension(2, wrapEndpoints).addDimension(4, wrapEndpoints).setParallelism(parallelism).generate() +{% endhighlight %} +
+
+ +### Provided graph generators + +* [Complete Graph](#complete-graph) +* [Cycle Graph](#cycle-graph) +* [Empty Graph](#empty-graph) +* [Grid Graph](#grid-graph) +* [Hypercube Graph](#hypercube-graph) +* [Path Graph](#path-graph) +* [RMat Graph](#rmat-graph) +* [Singleton Edge Graph](#singleton-edge-graph) +* [Star Graph](#star-graph) + ### Complete Graph An undirected graph connecting every distinct pair of vertices. @@ -1753,7 +1806,9 @@ An undirected graph connecting every distinct pair of vertices. {% highlight java %} ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); -Graph graph = new CompleteGraph(env, 5) +long vertexCount = 5; + +Graph graph = new CompleteGraph(env, vertexCount) .generate(); {% endhighlight %} @@ -1765,7 +1820,9 @@ import org.apache.flink.graph.generator.CompleteGraph val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment -val graph = new CompleteGraph(env.getJavaEnv, 5).generate() +val vertexCount = 5 + +val graph = new CompleteGraph(env.getJavaEnv, vertexCount).generate() {% endhighlight %} @@ -1813,7 +1870,9 @@ An undirected graph where all edges form a single cycle. {% highlight java %} ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); -Graph graph = new CycleGraph(env, 5) +long vertexCount = 5; + +Graph graph = new CycleGraph(env, vertexCount) .generate(); {% endhighlight %} @@ -1825,7 +1884,9 @@ import org.apache.flink.graph.generator.CycleGraph val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment -val graph = new CycleGraph(env.getJavaEnv, 5).generate() +val vertexCount = 5 + +val graph = new CycleGraph(env.getJavaEnv, vertexCount).generate() {% endhighlight %} @@ -1865,7 +1926,9 @@ The graph containing no edges. {% highlight java %} ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); -Graph graph = new EmptyGraph(env, 5) +long vertexCount = 5; + +Graph graph = new EmptyGraph(env, vertexCount) .generate(); {% endhighlight %} @@ -1877,7 +1940,9 @@ import org.apache.flink.graph.generator.EmptyGraph val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment -val graph = new EmptyGraph(env.getJavaEnv, 5).generate() +val vertexCount = 5 + +val graph = new EmptyGraph(env.getJavaEnv, vertexCount).generate() {% endhighlight %} @@ -1905,15 +1970,20 @@ val graph = new EmptyGraph(env.getJavaEnv, 5).generate() ### Grid Graph An undirected graph connecting vertices in a regular tiling in one or more dimensions. +Each dimension is configured separately. When the dimension size is at least three the +endpoints are optionally connected by setting `wrapEndpoints`. Changing the following +example to `addDimension(4, true)` would connect `0` to `3` and `4` to `7`.
{% highlight java %} ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); +boolean wrapEndpoints = false; + Graph graph = new GridGraph(env) - .addDimension(2, false) - .addDimension(4, false) + .addDimension(2, wrapEndpoints) + .addDimension(4, wrapEndpoints) .generate(); {% endhighlight %}
@@ -1925,7 +1995,9 @@ import org.apache.flink.graph.generator.GridGraph val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment -val graph = new GridGraph(env.getJavaEnv).addDimension(2, false).addDimension(4, false).generate() +val wrapEndpoints = false + +val graph = new GridGraph(env.getJavaEnv).addDimension(2, wrapEndpoints).addDimension(4, wrapEndpoints).generate() {% endhighlight %}
@@ -1969,14 +2041,17 @@ val graph = new GridGraph(env.getJavaEnv).addDimension(2, false).addDimension(4, ### Hypercube Graph -An undirected graph where edges form an n-dimensional hypercube. +An undirected graph where edges form an n-dimensional hypercube. Each vertex +in a hypercube connects to one other vertex in each dimension.
{% highlight java %} ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); -Graph graph = new HypercubeGraph(env, 2) +long dimensions = 3; + +Graph graph = new HypercubeGraph(env, dimensions) .generate(); {% endhighlight %}
@@ -1988,8 +2063,9 @@ import org.apache.flink.graph.generator.HypercubeGraph val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment -// note: configured with the number of vertex pairs -val graph = new HypercubeGraph(env.getJavaEnv, 4).generate() +val dimensions = 3 + +val graph = new HypercubeGraph(env.getJavaEnv, dimensions).generate() {% endhighlight %}
@@ -2047,7 +2123,9 @@ An undirected Graph where all edges form a single path. {% highlight java %} ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); -Graph graph = new PathGraph(env, 5) +long vertexCount = 5 + +Graph graph = new PathGraph(env, vertexCount) .generate(); {% endhighlight %} @@ -2059,7 +2137,9 @@ import org.apache.flink.graph.generator.PathGraph val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment -val graph = new PathGraph(env.getJavaEnv, 5).generate() +val vertexCount = 5 + +val graph = new PathGraph(env.getJavaEnv, vertexCount).generate() {% endhighlight %} @@ -2088,9 +2168,13 @@ val graph = new PathGraph(env.getJavaEnv, 5).generate() ### RMat Graph -A directed or undirected power-law graph generated using the [Recursive Matrix (R-Mat)] -(http://www.cs.cmu.edu/~christos/PUBLICATIONS/siam04.pdf) model. RMat is a stochastic -generator configured with a source of randomness. +A directed or undirected power-law graph generated using the +[Recursive Matrix (R-Mat)](http://www.cs.cmu.edu/~christos/PUBLICATIONS/siam04.pdf) model. + +RMat is a stochastic generator configured with a source of randomness implementing the +`RandomGenerableFactory` interface. Provided implemenations are `JDKRandomGeneratorFactory` +and `MersenneTwisterFactory`. These generate an initial sequence of random values which are +then used as seeds for generating the edges.
@@ -2122,10 +2206,14 @@ val graph = new RMatGraph(env.getJavaEnv, rnd, vertexCount, edgeCount).generate(
-Manipulating the RMat constants and noise affects the degree-skew. The RMat generator can be -configured to produce a simple graph by removing self-loops and duplicate edges, with undirected -edges produced by a "clip-and-flip" throwing away the half matrix above the diagonal or a full -"flip" preserving and mirroring all edges. +The default RMat contants can be overridden as shown in the following example. +The contants define the interdependence of bits from each generated edge's source +and target labels. The RMat noise can be enabled and progressively perturbs the +contants while generating each edge. + +The RMat generator can be configured to produce a simple graph by removing self-loops +and duplicate edges. Symmetrization is performed either by a "clip-and-flip" throwing away +the half matrix above the diagonal or a full "flip" preserving and mirroring all edges.
@@ -2137,10 +2225,12 @@ RandomGenerableFactory rnd = new JDKRandomGeneratorFactory() int vertexCount = 1 << scale; int edgeCount = edgeFactor * vertexCount; +boolean clipAndFlip = false; + Graph graph = new RMatGraph<>(env, rnd, vertexCount, edgeCount) .setConstants(0.57f, 0.19f, 0.19f) .setNoise(true, 0.10f) - .setSimpleGraph(true, false) + .setSimpleGraph(true, clipAndFlip) .generate(); {% endhighlight %}
@@ -2155,7 +2245,9 @@ val env = ExecutionEnvironment.getExecutionEnvironment val vertexCount = 1 << scale val edgeCount = edgeFactor * vertexCount -val graph = new RMatGraph(env.getJavaEnv, rnd, vertexCount, edgeCount).setConstants(0.57f, 0.19f, 0.19f).setNoise(true, 0.10f).setSimpleGraph(true, false).generate() +clipAndFlip = false + +val graph = new RMatGraph(env.getJavaEnv, rnd, vertexCount, edgeCount).setConstants(0.57f, 0.19f, 0.19f).setNoise(true, 0.10f).setSimpleGraph(true, clipAndFlip).generate() {% endhighlight %}
@@ -2169,8 +2261,10 @@ An undirected graph containing isolated two-paths. {% highlight java %} ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); +long vertexPairCount = 4 + // note: configured with the number of vertex pairs -Graph graph = new SingletonEdgeGraph(env, 4) +Graph graph = new SingletonEdgeGraph(env, vertexPairCount) .generate(); {% endhighlight %} @@ -2182,8 +2276,10 @@ import org.apache.flink.graph.generator.SingletonEdgeGraph val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment +val vertexPairCount = 4 + // note: configured with the number of vertex pairs -val graph = new SingletonEdgeGraph(env.getJavaEnv, 4).generate() +val graph = new SingletonEdgeGraph(env.getJavaEnv, vertexPairCount).generate() {% endhighlight %} @@ -2231,7 +2327,9 @@ An undirected graph containing a single central vertex connected to all other le {% highlight java %} ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); -Graph graph = new StarGraph(env, 6) +long vertexCount = 6; + +Graph graph = new StarGraph(env, vertexCount) .generate(); {% endhighlight %} @@ -2243,7 +2341,9 @@ import org.apache.flink.graph.generator.StarGraph val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment -val graph = new StarGraph(env.getJavaEnv, 6).generate() +val vertexCount = 6 + +val graph = new StarGraph(env.getJavaEnv, vertexCount).generate() {% endhighlight %} diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/Graph500.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/Graph500.java index 9892d168cd57a..b9d6fbd841a7e 100644 --- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/Graph500.java +++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/Graph500.java @@ -37,7 +37,7 @@ * Generate an RMat graph for Graph 500. * * Note that this does not yet implement permutation of vertex labels or edges. - + * * @see Graph 500 */ public class Graph500 { @@ -46,6 +46,10 @@ public class Graph500 { public static final int DEFAULT_EDGE_FACTOR = 16; + public static final boolean DEFAULT_SIMPLIFY = false; + + public static final boolean DEFAULT_CLIP_AND_FLIP = true; + public static void main(String[] args) throws Exception { // Set up the execution environment final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); @@ -62,13 +66,17 @@ public static void main(String[] args) throws Exception { long vertexCount = 1 << scale; long edgeCount = vertexCount * edgeFactor; + boolean simplify = parameters.getBoolean("simplify", DEFAULT_SIMPLIFY); + boolean clipAndFlip = parameters.getBoolean("clip_and_flip", DEFAULT_CLIP_AND_FLIP); + DataSet> edges = new RMatGraph<>(env, rnd, vertexCount, edgeCount) + .setSimpleGraph(simplify, clipAndFlip) .generate() .getEdges() .project(0, 1); // Print, hash, or write RMat graph to disk - switch (parameters.get("output", "hash")) { + switch (parameters.get("output", "")) { case "print": edges.print(); break; @@ -87,6 +95,22 @@ public static void main(String[] args) throws Exception { env.execute(); break; + default: + System.out.println("A Graph500 generator using the Recursive Matrix (RMat) graph generator."); + System.out.println(); + System.out.println("The graph matrix contains 2^scale vertices although not every vertex will"); + System.out.println("be represented in an edge. The number of edges is edge_factor * 2^scale edges"); + System.out.println("although some edges may be duplicates."); + System.out.println(); + System.out.println("Note: this does not yet implement permutation of vertex labels or edges."); + System.out.println(); + System.out.println("usage:"); + System.out.println(" Graph500 [--scale SCALE] [--edge_factor EDGE_FACTOR] --output print"); + System.out.println(" Graph500 [--scale SCALE] [--edge_factor EDGE_FACTOR] --output hash"); + System.out.println(" Graph500 [--scale SCALE] [--edge_factor EDGE_FACTOR] --output csv" + + " --filename FILENAME [--row_delimiter ROW_DELIMITER] [--field_delimiter FIELD_DELIMITER]"); + + return; } JobExecutionResult result = env.getLastJobExecutionResult(); diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/AbstractGraphGenerator.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/AbstractGraphGenerator.java index 99336f6f5e225..2eee6d78ff9b8 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/AbstractGraphGenerator.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/AbstractGraphGenerator.java @@ -25,7 +25,7 @@ public abstract class AbstractGraphGenerator protected int parallelism = -1; @Override - public AbstractGraphGenerator setParallelism(int parallelism) { + public GraphGenerator setParallelism(int parallelism) { this.parallelism = parallelism; return this; diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/CycleGraph.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/CycleGraph.java index 365c8f45bc0e0..2671efece23f7 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/CycleGraph.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/CycleGraph.java @@ -33,7 +33,7 @@ public class CycleGraph private final ExecutionEnvironment env; // Required configuration - private int vertexCount; + private long vertexCount; /** * An undirected {@link Graph} where all edges form a single cycle. @@ -41,7 +41,7 @@ public class CycleGraph * @param env the Flink execution environment * @param vertexCount number of vertices */ - public CycleGraph(ExecutionEnvironment env, int vertexCount) { + public CycleGraph(ExecutionEnvironment env, long vertexCount) { if (vertexCount <= 0) { throw new IllegalArgumentException("Vertex count must be greater than zero"); } diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/GraphGenerator.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/GraphGenerator.java index b9e3be7fdf3e4..8fece815c8358 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/GraphGenerator.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/GraphGenerator.java @@ -39,7 +39,7 @@ public interface GraphGenerator { * * @return generated graph */ - Graph generate(); + Graph generate(); /** * Override the operator parallelism.