From 653b095f12e89dfc2599bb550b000a9b5f21b1bf Mon Sep 17 00:00:00 2001 From: Greg Hogan Date: Thu, 6 Apr 2017 08:52:32 -0400 Subject: [PATCH] [FLINK-6195] [build] Move gelly-examples jar from opt to examples The opt directory should be reserved for Flink JARs which users may optionally move to lib to be loaded by the runtime. flink-gelly-examples is a user program so is being moved to the examples folder. --- docs/dev/libs/gelly/index.md | 26 +++++---- flink-dist/pom.xml | 20 ++++--- flink-dist/src/main/assemblies/bin.xml | 8 +++ flink-dist/src/main/assemblies/opt.xml | 7 --- .../java/org/apache/flink/graph/Runner.java | 4 +- .../apache/flink/graph/drivers/EdgeList.java | 53 ++++++++++++++++--- .../flink/graph/utils/EdgeToTuple2Map.java | 45 ++++++++++++++++ .../apache/flink/graph/asm/AsmTestBase.java | 10 ++-- 8 files changed, 132 insertions(+), 41 deletions(-) create mode 100644 flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/EdgeToTuple2Map.java diff --git a/docs/dev/libs/gelly/index.md b/docs/dev/libs/gelly/index.md index 40018e84f58a2..193ba866fcec0 100644 --- a/docs/dev/libs/gelly/index.md +++ b/docs/dev/libs/gelly/index.md @@ -71,24 +71,22 @@ The remaining sections provide a description of available methods and present se Running Gelly Examples ---------------------- -The Gelly library and examples jars are provided in the [Flink distribution](https://flink.apache.org/downloads.html "Apache Flink: Downloads") -in the folder **opt** (for versions older than Flink 1.2 these can be manually downloaded from -[Maven Central](http://search.maven.org/#search|ga|1|flink%20gelly)). - -To run the Gelly examples the **flink-gelly** (for Java) or **flink-gelly-scala** (for Scala) jar must be copied to -Flink's **lib** directory. +The Gelly library jars are provided in the [Flink distribution](https://flink.apache.org/downloads.html "Apache Flink: Downloads") +in the **opt** directory (for versions older than Flink 1.2 these can be manually downloaded from +[Maven Central](http://search.maven.org/#search|ga|1|flink%20gelly)). To run the Gelly examples the **flink-gelly** (for +Java) or **flink-gelly-scala** (for Scala) jar must be copied to Flink's **lib** directory. ~~~bash cp opt/flink-gelly_*.jar lib/ cp opt/flink-gelly-scala_*.jar lib/ ~~~ -Gelly's examples jar includes drivers for each of the library methods. After configuring and starting the cluster, list -the available algorithm classes: +Gelly's examples jar includes drivers for each of the library methods and is provided in the **examples** directory. +After configuring and starting the cluster, list the available algorithm classes: ~~~bash ./bin/start-cluster.sh -./bin/flink run opt/flink-gelly-examples_*.jar +./bin/flink run examples/flink-gelly-examples_*.jar ~~~ The Gelly drivers can generate graph data or read the edge list from a CSV file (each node in a cluster must have access @@ -96,13 +94,13 @@ to the input file). The algorithm description, available inputs and outputs, and algorithm is selected. Print usage for [JaccardIndex](./library_methods.html#jaccard-index): ~~~bash -./bin/flink run opt/flink-gelly-examples_*.jar --algorithm JaccardIndex +./bin/flink run examples/flink-gelly-examples_*.jar --algorithm JaccardIndex ~~~ Display [graph metrics](./library_methods.html#metric) for a million vertex graph: ~~~bash -./bin/flink run opt/flink-gelly-examples_*.jar \ +./bin/flink run examples/flink-gelly-examples_*.jar \ --algorithm GraphMetrics --order directed \ --input RMatGraph --type integer --scale 20 --simplify directed \ --output print @@ -119,17 +117,17 @@ Run a few algorithms and monitor the job progress in Flink's Web UI: ~~~bash wget -O - http://snap.stanford.edu/data/bigdata/communities/com-lj.ungraph.txt.gz | gunzip -c > com-lj.ungraph.txt -./bin/flink run -q opt/flink-gelly-examples_*.jar \ +./bin/flink run -q examples/flink-gelly-examples_*.jar \ --algorithm GraphMetrics --order undirected \ --input CSV --type integer --simplify undirected --input_filename com-lj.ungraph.txt --input_field_delimiter $'\t' \ --output print -./bin/flink run -q opt/flink-gelly-examples_*.jar \ +./bin/flink run -q examples/flink-gelly-examples_*.jar \ --algorithm ClusteringCoefficient --order undirected \ --input CSV --type integer --simplify undirected --input_filename com-lj.ungraph.txt --input_field_delimiter $'\t' \ --output hash -./bin/flink run -q opt/flink-gelly-examples_*.jar \ +./bin/flink run -q examples/flink-gelly-examples_*.jar \ --algorithm JaccardIndex \ --input CSV --type integer --simplify undirected --input_filename com-lj.ungraph.txt --input_field_delimiter $'\t' \ --output hash diff --git a/flink-dist/pom.xml b/flink-dist/pom.xml index 0eea0aa31ee19..e42aea6124795 100644 --- a/flink-dist/pom.xml +++ b/flink-dist/pom.xml @@ -155,6 +155,19 @@ under the License. compile + + + + org.apache.flink + flink-gelly-examples_2.10 + ${project.version} + provided + + + + ../flink-libraries/flink-gelly-examples/target/flink-gelly-examples_2.10-${project.version}.jar + examples/ + flink-gelly-examples_2.10-${project.version}.jar + 0644 + diff --git a/flink-dist/src/main/assemblies/opt.xml b/flink-dist/src/main/assemblies/opt.xml index c6dc30709c994..3622ece29d348 100644 --- a/flink-dist/src/main/assemblies/opt.xml +++ b/flink-dist/src/main/assemblies/opt.xml @@ -52,13 +52,6 @@ 0644 - - ../flink-libraries/flink-gelly-examples/target/flink-gelly-examples_2.10-${project.version}.jar - opt/ - flink-gelly-examples_2.10-${project.version}.jar - 0644 - - ../flink-libraries/flink-gelly-scala/target/flink-gelly-scala_2.10-${project.version}-jar-with-dependencies.jar opt/ diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/Runner.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/Runner.java index 03248141221a8..4b6cf42112188 100644 --- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/Runner.java +++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/Runner.java @@ -108,7 +108,7 @@ private static String getAlgorithmsListing() { strBuilder .appendNewLine() - .appendln("Select an algorithm to view usage: flink run opt/flink-gelly-examples_.jar --algorithm ") + .appendln("Select an algorithm to view usage: flink run examples/flink-gelly-examples_.jar --algorithm ") .appendNewLine() .appendln("Available algorithms:"); @@ -139,7 +139,7 @@ private static String getAlgorithmUsage(String algorithmName) { .appendNewLine() .appendln(algorithm.getLongDescription()) .appendNewLine() - .append("usage: flink run opt/flink-gelly-examples_.jar --algorithm ") + .append("usage: flink run examples/flink-gelly-examples_.jar --algorithm ") .append(algorithmName) .append(" [algorithm options] --input [input options] --output [output options]") .appendNewLine() diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/EdgeList.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/EdgeList.java index 85f32c3c164c2..524e70ff55837 100644 --- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/EdgeList.java +++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/EdgeList.java @@ -18,7 +18,11 @@ package org.apache.flink.graph.drivers; +import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.tuple.Tuple3; +import org.apache.flink.api.java.typeutils.TupleTypeInfo; +import org.apache.flink.api.java.typeutils.ValueTypeInfo; import org.apache.flink.graph.Edge; import org.apache.flink.graph.Graph; import org.apache.flink.graph.asm.dataset.ChecksumHashCode; @@ -28,11 +32,17 @@ import org.apache.flink.graph.drivers.output.Hash; import org.apache.flink.graph.drivers.output.Print; import org.apache.flink.graph.drivers.parameter.ParameterizedBase; +import org.apache.flink.graph.utils.EdgeToTuple2Map; +import org.apache.flink.types.NullValue; import java.util.List; /** - * Convert a {@link Graph} to the {@link DataSet} of {@link Edge}s. + * Convert a {@link Graph} to the {@link DataSet} of {@link Edge}. + * + * @param graph ID type + * @param vertex value type + * @param edge value type */ public class EdgeList extends ParameterizedBase @@ -77,16 +87,45 @@ public void print(String executionName) throws Exception { // Refactored due to openjdk7 compile error: https://travis-ci.org/greghogan/flink/builds/200487761 List> records = collector.run(edges).execute(executionName); - for (Edge result : records) { - System.out.println(result); + if (hasNullValueEdges(edges)) { + for (Edge result : records) { + System.out.println("(" + result.f0 + "," + result.f1 + ")"); + } + } else { + for (Edge result : records) { + System.out.println(result); + } } - } @Override public void writeCSV(String filename, String lineDelimiter, String fieldDelimiter) { - edges - .writeAsCsv(filename, lineDelimiter, fieldDelimiter) - .name("CSV: " + filename); + if (hasNullValueEdges(edges)) { + edges + .map(new EdgeToTuple2Map()) + .name("Edge to Tuple2") + .writeAsCsv(filename, lineDelimiter, fieldDelimiter) + .name("CSV: " + filename); + } else { + edges + .writeAsCsv(filename, lineDelimiter, fieldDelimiter) + .name("CSV: " + filename); + } + } + + /** + * Check whether the edge type of the {@link DataSet} is {@link NullValue}. + * + * @param edges data set for introspection + * @param graph ID type + * @param edge value type + * @return whether the edge type of the {@link DataSet} is {@link NullValue} + */ + private static boolean hasNullValueEdges(DataSet> edges) { + TypeInformation genericTypeInfo = edges.getType(); + @SuppressWarnings("unchecked") + TupleTypeInfo> tupleTypeInfo = (TupleTypeInfo>) genericTypeInfo; + + return tupleTypeInfo.getTypeAt(2).equals(ValueTypeInfo.NULL_VALUE_TYPE_INFO); } } diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/EdgeToTuple2Map.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/EdgeToTuple2Map.java new file mode 100644 index 0000000000000..1e500eafbefc4 --- /dev/null +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/EdgeToTuple2Map.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph.utils; + +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.graph.Edge; + +/** + * Create a Tuple2 DataSet from the vertices of an Edge DataSet + * + * @param edge ID type + * @param edge value type + */ +@ForwardedFields("f0; f1") +public class EdgeToTuple2Map implements MapFunction, Tuple2> { + + private static final long serialVersionUID = 1L; + + private Tuple2 output = new Tuple2<>(); + + @Override + public Tuple2 map(Edge edge) { + output.f0 = edge.f0; + output.f1 = edge.f1; + return output; + } +} diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/AsmTestBase.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/AsmTestBase.java index b057121a6823e..cd057573298b6 100644 --- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/AsmTestBase.java +++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/AsmTestBase.java @@ -102,15 +102,17 @@ public void setup() .generate(); /* - ./bin/flink run -c org.apache.flink.graph.drivers.Graph500 flink-gelly-examples_2.10-1.2-SNAPSHOT.jar \ - --directed true --simplify true --scale 10 --edge_factor 16 --output csv --filename directedRMatGraph.csv + ./bin/flink run examples/flink-gelly-examples_*.jar --algorithm EdgeList \ + --input RMatGraph --type long --simplify directed --scale 10 --edge_factor 16 \ + --output csv --output_filename directedRMatGraph.csv */ directedRMatGraph = rmatGraph .run(new org.apache.flink.graph.asm.simple.directed.Simplify()); /* - ./bin/flink run -c org.apache.flink.graph.drivers.Graph500 flink-gelly-examples_2.10-1.2-SNAPSHOT.jar \ - --directed false --simplify true --scale 10 --edge_factor 16 --output csv --filename undirectedRMatGraph.csv + ./bin/flink run examples/flink-gelly-examples_*.jar --algorithm EdgeList \ + --input RMatGraph --type long --simplify undirected --scale 10 --edge_factor 16 \ + --output csv --output_filename undirectedRMatGraph.csv */ undirectedRMatGraph = rmatGraph .run(new org.apache.flink.graph.asm.simple.undirected.Simplify(false));