From 11f5bb821c9d8b1e7beef72d0e6df7dc0fb9db72 Mon Sep 17 00:00:00 2001 From: vasia Date: Thu, 24 Sep 2015 22:08:10 +0200 Subject: [PATCH 1/3] [FLINK-2561] [gelly] add missing methods to Graph: add-remove edges/vertices, difference, graph creation methods, validate, getTriplets. Add missing utility mappers. --- .../org/apache/flink/graph/scala/Graph.scala | 151 +++++++++++++++++- .../graph/scala/utils/Tuple2ToVertexMap.scala | 31 ++++ .../graph/scala/utils/Tuple3ToEdgeMap.scala | 31 ++++ 3 files changed, 212 insertions(+), 1 deletion(-) create mode 100644 flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/utils/Tuple2ToVertexMap.scala create mode 100644 flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/utils/Tuple3ToEdgeMap.scala diff --git a/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/Graph.scala b/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/Graph.scala index 73e175eb92306..ed58ffd53ec85 100644 --- a/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/Graph.scala +++ b/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/Graph.scala @@ -23,26 +23,108 @@ import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.api.java.{tuple => jtuple} import org.apache.flink.api.scala._ import org.apache.flink.graph._ +import org.apache.flink.graph.validation.GraphValidator import org.apache.flink.graph.gsa.{ApplyFunction, GSAConfiguration, GatherFunction, SumFunction} import org.apache.flink.graph.spargel.{MessagingFunction, VertexCentricConfiguration, VertexUpdateFunction} import org.apache.flink.{graph => jg} - import _root_.scala.collection.JavaConverters._ import _root_.scala.reflect.ClassTag +import org.apache.flink.types.NullValue object Graph { + + /** + * Creates a Graph from a DataSet of vertices and a DataSet of edges. + */ def fromDataSet[K: TypeInformation : ClassTag, VV: TypeInformation : ClassTag, EV: TypeInformation : ClassTag](vertices: DataSet[Vertex[K, VV]], edges: DataSet[Edge[K, EV]], env: ExecutionEnvironment): Graph[K, VV, EV] = { wrapGraph(jg.Graph.fromDataSet[K, VV, EV](vertices.javaSet, edges.javaSet, env.getJavaEnv)) } + /** + * Creates a Graph from a DataSet of edges. + * Vertices are created automatically and their values are set to NullValue. + */ + def fromDataSet[K: TypeInformation : ClassTag, EV: TypeInformation : ClassTag] + (edges: DataSet[Edge[K, EV]], env: ExecutionEnvironment): Graph[K, NullValue, EV] = { + wrapGraph(jg.Graph.fromDataSet[K, EV](edges.javaSet, env.getJavaEnv)) + } + + /** + * Creates a graph from a DataSet of edges. + * Vertices are created automatically and their values are set by applying the provided + * map function to the vertex ids. + */ + def fromDataSet[K: TypeInformation : ClassTag, VV: TypeInformation : ClassTag, EV: + TypeInformation : ClassTag](edges: DataSet[Edge[K, EV]], env: ExecutionEnvironment, + mapper: MapFunction[K, VV]): Graph[K, VV, EV] = { + wrapGraph(jg.Graph.fromDataSet[K, VV, EV](edges.javaSet, mapper, env.getJavaEnv)) + } + + /** + * Creates a Graph from a Seq of vertices and a Seq of edges. + */ def fromCollection[K: TypeInformation : ClassTag, VV: TypeInformation : ClassTag, EV: TypeInformation : ClassTag](vertices: Seq[Vertex[K, VV]], edges: Seq[Edge[K, EV]], env: ExecutionEnvironment): Graph[K, VV, EV] = { wrapGraph(jg.Graph.fromCollection[K, VV, EV](vertices.asJavaCollection, edges .asJavaCollection, env.getJavaEnv)) } + + /** + * Creates a Graph from a Seq of edges. + * Vertices are created automatically and their values are set to NullValue. + */ + def fromCollection[K: TypeInformation : ClassTag, EV: TypeInformation : ClassTag] + (edges: Seq[Edge[K, EV]], env: ExecutionEnvironment): Graph[K, NullValue, EV] = { + wrapGraph(jg.Graph.fromCollection[K, EV](edges.asJavaCollection, env.getJavaEnv)) + } + + /** + * Creates a graph from a Seq of edges. + * Vertices are created automatically and their values are set by applying the provided + * map function to the vertex ids. + */ + def fromCollection[K: TypeInformation : ClassTag, VV: TypeInformation : ClassTag, EV: + TypeInformation : ClassTag](edges: Seq[Edge[K, EV]], env: ExecutionEnvironment, + mapper: MapFunction[K, VV]): Graph[K, VV, EV] = { + wrapGraph(jg.Graph.fromCollection[K, VV, EV](edges.asJavaCollection, mapper, env.getJavaEnv)) + } + + /** + * Creates a Graph from a DataSets of Tuples. + */ + def fromTupleDataSet[K: TypeInformation : ClassTag, VV: TypeInformation : ClassTag, EV: + TypeInformation : ClassTag](vertices: DataSet[(K, VV)], edges: DataSet[(K, K, EV)], + env: ExecutionEnvironment): Graph[K, VV, EV] = { + val javaTupleVertices = vertices.map(v => new jtuple.Tuple2(v._1, v._2)).javaSet + val javaTupleEdges = edges.map(v => new jtuple.Tuple3(v._1, v._2, v._3)).javaSet + wrapGraph(jg.Graph.fromTupleDataSet[K, VV, EV](javaTupleVertices, javaTupleEdges, env.getJavaEnv)) + } + + /** + * Creates a Graph from a DataSet of Tuples representing the edges. + * Vertices are created automatically and their values are set to NullValue. + */ + def fromTupleDataSet[K: TypeInformation : ClassTag, EV: TypeInformation : ClassTag] + (edges: DataSet[(K, K, EV)], env: ExecutionEnvironment): Graph[K, NullValue, EV] = { + val javaTupleEdges = edges.map(v => new jtuple.Tuple3(v._1, v._2, v._3)).javaSet + wrapGraph(jg.Graph.fromTupleDataSet[K, EV](javaTupleEdges, env.getJavaEnv)) + } + + /** + * Creates a Graph from a DataSet of Tuples representing the edges. + * Vertices are created automatically and their values are set by applying the provided + * map function to the vertex ids. + */ + def fromTupleDataSet[K: TypeInformation : ClassTag, VV: TypeInformation : ClassTag, EV: + TypeInformation : ClassTag](edges: DataSet[(K, K, EV)], env: ExecutionEnvironment, + mapper: MapFunction[K, VV]): Graph[K, VV, EV] = { + val javaTupleEdges = edges.map(v => new jtuple.Tuple3(v._1, v._2, v._3)).javaSet + wrapGraph(jg.Graph.fromTupleDataSet[K, VV, EV](javaTupleEdges, mapper, env.getJavaEnv)) + } + } /** @@ -92,6 +174,14 @@ TypeInformation : ClassTag](jgraph: jg.Graph[K, VV, EV]) { wrap(jgraph.getEdgesAsTuple3).map(jtuple => (jtuple.f0, jtuple.f1, jtuple.f2)) } + /** + * @return a DataSet of Triplets, + * consisting of (srcVertexId, trgVertexId, srcVertexValue, trgVertexValue, edgeValue) + */ + def getTriplets(): DataSet[Triplet[K, VV, EV]] = { + wrap(jgraph.getTriplets()) + } + /** * Apply a function to the attribute of each vertex in the graph. * @@ -575,6 +665,29 @@ TypeInformation : ClassTag](jgraph: jg.Graph[K, VV, EV]) { } /** + * Adds the list of vertices, passed as input, to the graph. + * If the vertices already exist in the graph, they will not be added once more. + * + * @param verticesToAdd the list of vertices to add + * @return the new graph containing the existing and newly added vertices + */ + def addVertices(vertices: List[Vertex[K, VV]]): Graph[K, VV, EV] = { + wrapGraph(jgraph.addVertices(vertices.asJava)) + } + + /** + * Adds the given list edges to the graph. + * + * When adding an edge for a non-existing set of vertices, the edge is considered invalid and ignored. + * + * @param newEdges the data set of edges to be added + * @return a new graph containing the existing edges plus the newly added edges. + */ + def addEdges(edges: List[Edge[K, EV]]): Graph[K, VV, EV] = { + wrapGraph(jgraph.addEdges(edges.asJava)) + } + + /** * Adds the given edge to the graph. If the source and target vertices do * not exist in the graph, they will also be added. * @@ -599,6 +712,17 @@ TypeInformation : ClassTag](jgraph: jg.Graph[K, VV, EV]) { wrapGraph(jgraph.removeVertex(vertex)) } + /** + * Removes the given vertex and its edges from the graph. + * + * @param vertex the vertex to remove + * @return the new graph containing the existing vertices and edges without + * the removed vertex and its edges + */ + def removeVertices(vertices: List[Vertex[K, VV]]): Graph[K, VV, EV] = { + wrapGraph(jgraph.removeVertices(vertices.asJava)) + } + /** * Removes all edges that match the given edge from the graph. * @@ -610,6 +734,16 @@ TypeInformation : ClassTag](jgraph: jg.Graph[K, VV, EV]) { wrapGraph(jgraph.removeEdge(edge)) } + /** + * Removes all the edges that match the edges in the given data set from the graph. + * + * @param edgesToBeRemoved the list of edges to be removed + * @return a new graph where the edges have been removed and in which the vertices remained intact + */ + def removeEdges(edges: List[Edge[K, EV]]): Graph[K, VV, EV] = { + wrapGraph(jgraph.removeEdges(edges.asJava)) + } + /** * Performs union on the vertices and edges sets of the input graphs * removing duplicate vertices but maintaining duplicate edges. @@ -621,6 +755,16 @@ TypeInformation : ClassTag](jgraph: jg.Graph[K, VV, EV]) { wrapGraph(jgraph.union(graph.getWrappedGraph)) } + /** + * Performs Difference on the vertex and edge sets of the input graphs + * removes common vertices and edges. If a source/target vertex is removed, its corresponding edge will also be removed + * @param graph the graph to perform difference with + * @return a new graph where the common vertices and edges have been removed + */ + def difference(graph: Graph[K, VV, EV]) = { + wrapGraph(jgraph.difference(graph.getWrappedGraph)) + } + /** * Compute an aggregate over the neighbor values of each * vertex. @@ -732,4 +876,9 @@ TypeInformation : ClassTag](jgraph: jg.Graph[K, VV, EV]) { wrapGraph(jgraph.runGatherSumApplyIteration(gatherFunction, sumFunction, applyFunction, maxIterations, parameters)) } + + def validate(validator: GraphValidator[K, VV, EV]): Boolean = { + jgraph.validate(validator) + } + } diff --git a/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/utils/Tuple2ToVertexMap.scala b/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/utils/Tuple2ToVertexMap.scala new file mode 100644 index 0000000000000..f2b1133b005fa --- /dev/null +++ b/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/utils/Tuple2ToVertexMap.scala @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph.scala.utils + +import org.apache.flink.api.common.functions.MapFunction +import org.apache.flink.graph.Vertex + +class Tuple2ToVertexMap[K, VV] extends MapFunction[(K, VV), Vertex[K, VV]] { + + private val serialVersionUID: Long = 1L + + override def map(value: (K, VV)): Vertex[K, VV] = { + new Vertex(value._1, value._2) + } +} diff --git a/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/utils/Tuple3ToEdgeMap.scala b/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/utils/Tuple3ToEdgeMap.scala new file mode 100644 index 0000000000000..00cb074f1b304 --- /dev/null +++ b/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/utils/Tuple3ToEdgeMap.scala @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph.scala.utils + +import org.apache.flink.api.common.functions.MapFunction +import org.apache.flink.graph.Edge + +class Tuple3ToEdgeMap[K, EV] extends MapFunction[(K, K, EV), Edge[K, EV]] { + + private val serialVersionUID: Long = 1L + + override def map(value: (K, K, EV)): Edge[K, EV] = { + new Edge(value._1, value._2, value._3) + } +} From 9d87747663c424e29a7c236e187cdf8e80076d4f Mon Sep 17 00:00:00 2001 From: vasia Date: Thu, 24 Sep 2015 23:31:38 +0200 Subject: [PATCH 2/3] [FLINK-2561] [gelly] convert existing tests to use collect instead of files; add tests for newly added operations. Add completeness test: fromCsvReader method is missing. --- flink-staging/flink-gelly-scala/pom.xml | 8 +- .../org/apache/flink/graph/scala/Graph.scala | 9 +- .../test/GellyScalaAPICompletenessTest.scala | 45 +++++ .../scala/test/operations/DegreesITCase.scala | 39 ++--- .../operations/GraphMutationsITCase.scala | 165 ++++++++++++++---- .../operations/GraphOperationsITCase.scala | 151 ++++++++++------ .../test/operations/JoinWithEdgesITCase.scala | 45 ++--- .../operations/JoinWithVerticesITCase.scala | 29 +-- .../test/operations/MapEdgesITCase.scala | 33 +--- .../test/operations/MapVerticesITCase.scala | 33 +--- .../ReduceOnEdgesMethodsITCase.scala | 63 +++---- .../ReduceOnNeighborMethodsITCase.scala | 46 ++--- 12 files changed, 355 insertions(+), 311 deletions(-) create mode 100644 flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/GellyScalaAPICompletenessTest.scala diff --git a/flink-staging/flink-gelly-scala/pom.xml b/flink-staging/flink-gelly-scala/pom.xml index a1f0da7deea2f..edcb865738da0 100644 --- a/flink-staging/flink-gelly-scala/pom.xml +++ b/flink-staging/flink-gelly-scala/pom.xml @@ -48,7 +48,13 @@ under the License. flink-gelly ${project.version} - + + org.apache.flink + flink-tests + ${project.version} + test + test-jar + org.apache.flink flink-test-utils diff --git a/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/Graph.scala b/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/Graph.scala index ed58ffd53ec85..35af1edc23b6d 100644 --- a/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/Graph.scala +++ b/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/Graph.scala @@ -100,7 +100,8 @@ object Graph { env: ExecutionEnvironment): Graph[K, VV, EV] = { val javaTupleVertices = vertices.map(v => new jtuple.Tuple2(v._1, v._2)).javaSet val javaTupleEdges = edges.map(v => new jtuple.Tuple3(v._1, v._2, v._3)).javaSet - wrapGraph(jg.Graph.fromTupleDataSet[K, VV, EV](javaTupleVertices, javaTupleEdges, env.getJavaEnv)) + wrapGraph(jg.Graph.fromTupleDataSet[K, VV, EV](javaTupleVertices, javaTupleEdges, + env.getJavaEnv)) } /** @@ -678,7 +679,8 @@ TypeInformation : ClassTag](jgraph: jg.Graph[K, VV, EV]) { /** * Adds the given list edges to the graph. * - * When adding an edge for a non-existing set of vertices, the edge is considered invalid and ignored. + * When adding an edge for a non-existing set of vertices, + * the edge is considered invalid and ignored. * * @param newEdges the data set of edges to be added * @return a new graph containing the existing edges plus the newly added edges. @@ -757,7 +759,8 @@ TypeInformation : ClassTag](jgraph: jg.Graph[K, VV, EV]) { /** * Performs Difference on the vertex and edge sets of the input graphs - * removes common vertices and edges. If a source/target vertex is removed, its corresponding edge will also be removed + * removes common vertices and edges. If a source/target vertex is removed, + * its corresponding edge will also be removed * @param graph the graph to perform difference with * @return a new graph where the common vertices and edges have been removed */ diff --git a/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/GellyScalaAPICompletenessTest.scala b/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/GellyScalaAPICompletenessTest.scala new file mode 100644 index 0000000000000..c63c4f8909398 --- /dev/null +++ b/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/GellyScalaAPICompletenessTest.scala @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.api.scala + +import java.lang.reflect.Method +import org.apache.flink.graph.scala._ +import org.apache.flink.api.scala.completeness.ScalaAPICompletenessTestBase +import org.apache.flink.graph.{Graph => JavaGraph} +import scala.language.existentials +import org.junit.Test + +/** + * This checks whether the Gelly Scala API is up to feature parity with the Java API. + * Implements the {@link ScalaAPICompletenessTest} for Gelly. + */ +class GellyScalaAPICompletenessTest extends ScalaAPICompletenessTestBase { + + override def isExcludedByName(method: Method): Boolean = { + val name = method.getDeclaringClass.getName + "." + method.getName + val excludedNames = Seq("org.apache.flink.graph.Graph.getContext", + // NOTE: until fromCsvReader() is added to to the Scala API Graph + "org.apache.flink.graph.Graph.fromCsvReader") + excludedNames.contains(name) + } + + @Test + override def testCompleteness(): Unit = { + checkMethods("Graph", "Graph", classOf[JavaGraph[_, _, _]], classOf[Graph[_, _, _]]) + } +} diff --git a/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/DegreesITCase.scala b/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/DegreesITCase.scala index 98dbbe9254e7c..6196f995562bc 100644 --- a/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/DegreesITCase.scala +++ b/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/DegreesITCase.scala @@ -26,42 +26,23 @@ import org.junit.rules.TemporaryFolder import org.junit.runner.RunWith import org.junit.runners.Parameterized import org.junit.{After, Before, Rule, Test} +import _root_.scala.collection.JavaConverters._ @RunWith(classOf[Parameterized]) class DegreesITCase(mode: MultipleProgramsTestBase.TestExecutionMode) extends MultipleProgramsTestBase(mode) { - private var resultPath: String = null private var expectedResult: String = null - var tempFolder: TemporaryFolder = new TemporaryFolder() - - @Rule - def getFolder(): TemporaryFolder = { - tempFolder; - } - - @Before - @throws(classOf[Exception]) - def before { - resultPath = tempFolder.newFile.toURI.toString - } - - @After - @throws(classOf[Exception]) - def after { - TestBaseUtils.compareResultsByLinesInMemory(expectedResult, resultPath) - } - @Test @throws(classOf[Exception]) def testInDegrees { val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env) - graph.inDegrees().writeAsCsv(resultPath) - env.execute - expectedResult = "1,1\n" + "2,1\n" + "3,2\n" + "4,1\n" + "5,2\n" + val res = graph.inDegrees().collect.toList + expectedResult = "(1,1)\n" + "(2,1)\n" + "(3,2)\n" + "(4,1)\n" + "(5,2)\n" + TestBaseUtils.compareResultAsText(res.asJava, expectedResult) } @Test @@ -70,9 +51,9 @@ MultipleProgramsTestBase(mode) { val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env) - graph.outDegrees().writeAsCsv(resultPath) - env.execute - expectedResult = "1,2\n" + "2,1\n" + "3,2\n" + "4,1\n" + "5,1\n" + val res = graph.outDegrees().collect.toList + expectedResult = "(1,2)\n" + "(2,1)\n" + "(3,2)\n" + "(4,1)\n" + "(5,1)\n" + TestBaseUtils.compareResultAsText(res.asJava, expectedResult) } @Test @@ -81,8 +62,8 @@ MultipleProgramsTestBase(mode) { val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env) - graph.getDegrees().writeAsCsv(resultPath) - env.execute - expectedResult = "1,3\n" + "2,2\n" + "3,4\n" + "4,2\n" + "5,3\n" + val res = graph.getDegrees().collect.toList + expectedResult = "(1,3)\n" + "(2,2)\n" + "(3,4)\n" + "(4,2)\n" + "(5,3)\n" + TestBaseUtils.compareResultAsText(res.asJava, expectedResult) } } diff --git a/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/GraphMutationsITCase.scala b/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/GraphMutationsITCase.scala index 687b0a7581f8a..3cb92c4f2cef0 100644 --- a/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/GraphMutationsITCase.scala +++ b/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/GraphMutationsITCase.scala @@ -27,33 +27,14 @@ import org.junit.rules.TemporaryFolder import org.junit.runner.RunWith import org.junit.runners.Parameterized import org.junit.{After, Before, Rule, Test} +import _root_.scala.collection.JavaConverters._ @RunWith(classOf[Parameterized]) class GraphMutationsITCase(mode: MultipleProgramsTestBase.TestExecutionMode) extends MultipleProgramsTestBase(mode) { - private var resultPath: String = null private var expectedResult: String = null - var tempFolder: TemporaryFolder = new TemporaryFolder() - - @Rule - def getFolder(): TemporaryFolder = { - tempFolder; - } - - @Before - @throws(classOf[Exception]) - def before { - resultPath = tempFolder.newFile.toURI.toString - } - - @After - @throws(classOf[Exception]) - def after { - TestBaseUtils.compareResultsByLinesInMemory(expectedResult, resultPath) - } - @Test @throws(classOf[Exception]) def testAddVertex { @@ -62,9 +43,9 @@ MultipleProgramsTestBase(mode) { .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env) val newgraph = graph.addVertex(new Vertex[Long, Long](6L, 6L)) - newgraph.getVerticesAsTuple2.writeAsCsv(resultPath) - env.execute + val res = newgraph.getVertices.collect().toList expectedResult = "1,1\n" + "2,2\n" + "3,3\n" + "4,4\n" + "5,5\n" + "6,6\n" + TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult) } @Test @@ -74,9 +55,9 @@ MultipleProgramsTestBase(mode) { val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env) val newgraph = graph.addVertex(new Vertex[Long, Long](1L, 1L)) - newgraph.getVerticesAsTuple2.writeAsCsv(resultPath) - env.execute + val res = newgraph.getVertices.collect().toList expectedResult = "1,1\n" + "2,2\n" + "3,3\n" + "4,4\n" + "5,5\n" + TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult) } @Test @@ -86,9 +67,37 @@ MultipleProgramsTestBase(mode) { val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env) val newgraph = graph.addVertex(new Vertex[Long, Long](6L, 6L)) - newgraph.getVerticesAsTuple2.writeAsCsv(resultPath) - env.execute + val res = newgraph.getVertices.collect().toList + expectedResult = "1,1\n" + "2,2\n" + "3,3\n" + "4,4\n" + "5,5\n" + "6,6\n" + TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult) + } + + @Test + @throws(classOf[Exception]) + def testAddVertices { + val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment + val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils + .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env) + + val newgraph = graph.addVertices(List[Vertex[Long, Long]](new Vertex[Long, Long](6L, 6L), + new Vertex[Long, Long](7L, 7L))) + val res = newgraph.getVertices.collect().toList + expectedResult = "1,1\n" + "2,2\n" + "3,3\n" + "4,4\n" + "5,5\n" + "6,6\n" + "7,7\n" + TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult) + } + + @Test + @throws(classOf[Exception]) + def testAddVerticesExisting { + val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment + val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils + .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env) + + val newgraph = graph.addVertices(List[Vertex[Long, Long]](new Vertex[Long, Long](5L, 5L), + new Vertex[Long, Long](6L, 6L))) + val res = newgraph.getVertices.collect().toList expectedResult = "1,1\n" + "2,2\n" + "3,3\n" + "4,4\n" + "5,5\n" + "6,6\n" + TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult) } @Test @@ -98,9 +107,9 @@ MultipleProgramsTestBase(mode) { val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env) val newgraph = graph.removeVertex(new Vertex[Long, Long](5L, 5L)) - newgraph.getEdgesAsTuple3.writeAsCsv(resultPath) - env.execute + val res = newgraph.getEdges.collect().toList expectedResult = "1,2,12\n" + "1,3,13\n" + "2,3,23\n" + "3,4,34\n" + TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult) } @Test @@ -110,10 +119,36 @@ MultipleProgramsTestBase(mode) { val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env) val newgraph = graph.removeVertex(new Vertex[Long, Long](6L, 6L)) - newgraph.getEdgesAsTuple3.writeAsCsv(resultPath) - env.execute + val res = newgraph.getEdges.collect.toList expectedResult = "1,2,12\n" + "1,3,13\n" + "2,3,23\n" + "3,4,34\n" + "3,5,35\n" + "4,5," + "45\n" + "5,1,51\n" + TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult) + } + + @Test + @throws(classOf[Exception]) + def testRemoveVertices { + val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment + val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils + .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env) + val newgraph = graph.removeVertices(List[Vertex[Long, Long]](new Vertex[Long, Long](1L, 1L), + new Vertex[Long, Long](2L, 2L))) + val res = newgraph.getEdges.collect().toList + expectedResult = "3,4,34\n" + "3,5,35\n" + "4,5,45\n" + TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult) + } + + @Test + @throws(classOf[Exception]) + def testRemoveValidAndInvalidVertex { + val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment + val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils + .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env) + val newgraph = graph.removeVertices(List[Vertex[Long, Long]](new Vertex[Long, Long](1L, 1L), + new Vertex[Long, Long](6L, 6L))) + val res = newgraph.getEdges.collect.toList + expectedResult = "2,3,23\n" + "3,4,34\n" + "3,5,35\n" + "4,5,45\n" + TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult) } @Test @@ -124,10 +159,38 @@ MultipleProgramsTestBase(mode) { .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env) val newgraph = graph.addEdge(new Vertex[Long, Long](6L, 6L), new Vertex[Long, Long](1L, 1L), 61L) - newgraph.getEdgesAsTuple3.writeAsCsv(resultPath) - env.execute + val res = newgraph.getEdges.collect.toList expectedResult = "1,2,12\n" + "1,3,13\n" + "2,3,23\n" + "3,4,34\n" + "3,5,35\n" + "4,5," + "45\n" + "5,1,51\n" + "6,1,61\n" + TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult) + } + + @Test + @throws(classOf[Exception]) + def testAddEdges { + val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment + val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils + .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env) + val newgraph = graph.addEdges(List[Edge[Long, Long]](new Edge(2L, 4L, 24L), + new Edge(4L, 1L, 41L), new Edge(4L, 3L, 43L))) + val res = newgraph.getEdges.collect().toList + expectedResult = "1,2,12\n" + "1,3,13\n" + "2,3,23\n" + "2,4,24\n" + "3,4,34\n" + "3,5," + + "35\n" + "4,1,41\n" + "4,3,43\n" + "4,5,45\n" + "5,1,51\n" + TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult) + } + + @Test + @throws(classOf[Exception]) + def testAddEdgesInvalidVertices { + val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment + val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils + .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env) + val newgraph = graph.addEdges(List[Edge[Long, Long]](new Edge(6L, 1L, 61L), + new Edge(7L, 8L, 78L))) + val res = newgraph.getEdges.collect().toList + expectedResult = "1,2,12\n" + "1,3,13\n" + "2,3,23\n" + "3,4,34\n" + "3,5," + + "35\n" + "4,5,45\n" + "5,1,51\n" + TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult) } @Test @@ -138,10 +201,10 @@ MultipleProgramsTestBase(mode) { .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env) val newgraph = graph.addEdge(new Vertex[Long, Long](1L, 1L), new Vertex[Long, Long](2L, 2L), 12L) - newgraph.getEdgesAsTuple3.writeAsCsv(resultPath) - env.execute + val res = newgraph.getEdges.collect.toList expectedResult = "1,2,12\n" + "1,2,12\n" + "1,3,13\n" + "2,3,23\n" + "3,4,34\n" + "3,5," + "35\n" + "4,5,45\n" + "5,1,51\n" + TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult) } @Test @@ -151,9 +214,9 @@ MultipleProgramsTestBase(mode) { val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env) val newgraph = graph.removeEdge(new Edge[Long, Long](5L, 1L, 51L)) - newgraph.getEdgesAsTuple3.writeAsCsv(resultPath) - env.execute + val res = newgraph.getEdges.collect.toList expectedResult = "1,2,12\n" + "1,3,13\n" + "2,3,23\n" + "3,4,34\n" + "3,5,35\n" + "4,5,45\n" + TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult) } @Test @@ -163,9 +226,35 @@ MultipleProgramsTestBase(mode) { val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env) val newgraph = graph.removeEdge(new Edge[Long, Long](6L, 1L, 61L)) - newgraph.getEdgesAsTuple3.writeAsCsv(resultPath) - env.execute + val res = newgraph.getEdges.collect.toList expectedResult = "1,2,12\n" + "1,3,13\n" + "2,3,23\n" + "3,4,34\n" + "3,5,35\n" + "4,5," + "45\n" + "5,1,51\n" + TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult) + } + + @Test + @throws(classOf[Exception]) + def testRemoveEdges { + val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment + val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils + .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env) + val newgraph = graph.removeEdges(List[Edge[Long, Long]](new Edge(1L, 2L, 12L), + new Edge(4L, 5L, 45L))) + val res = newgraph.getEdges.collect().toList + expectedResult = "1,3,13\n" + "2,3,23\n" + "3,4,34\n" + "3,5,35\n" + "5,1,51\n" + TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult) + } + + @Test + @throws(classOf[Exception]) + def testRemoveSameEdgeTwiceEdges { + val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment + val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils + .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env) + val newgraph = graph.removeEdges(List[Edge[Long, Long]](new Edge(1L, 2L, 12L), + new Edge(1L, 2L, 12L))) + val res = newgraph.getEdges.collect().toList + expectedResult = "1,3,13\n" + "2,3,23\n" + "3,4,34\n" + "3,5,35\n" + "4,5,45\n" + "5,1,51\n" + TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult) } } diff --git a/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/GraphOperationsITCase.scala b/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/GraphOperationsITCase.scala index 713eb8d3ea15f..7f7ebc0bdc75d 100644 --- a/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/GraphOperationsITCase.scala +++ b/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/GraphOperationsITCase.scala @@ -27,44 +27,26 @@ import org.junit.rules.TemporaryFolder import org.junit.runner.RunWith import org.junit.runners.Parameterized import org.junit.{After, Before, Rule, Test} +import _root_.scala.collection.JavaConverters._ @RunWith(classOf[Parameterized]) class GraphOperationsITCase(mode: MultipleProgramsTestBase.TestExecutionMode) extends MultipleProgramsTestBase(mode) { - private var resultPath: String = null private var expectedResult: String = null - - var tempFolder: TemporaryFolder = new TemporaryFolder() - - @Rule - def getFolder(): TemporaryFolder = { - tempFolder; - } - - @Before - @throws(classOf[Exception]) - def before { - resultPath = tempFolder.newFile.toURI.toString - } - - @After - @throws(classOf[Exception]) - def after { - TestBaseUtils.compareResultsByLinesInMemory(expectedResult, resultPath) - } - + @Test @throws(classOf[Exception]) def testUndirected { val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env) - graph.getUndirected().getEdgesAsTuple3().writeAsCsv(resultPath) - env.execute + val res = graph.getUndirected.getEdges.collect().toList; + expectedResult = "1,2,12\n" + "2,1,12\n" + "1,3,13\n" + "3,1,13\n" + "2,3,23\n" + "3,2," + "23\n" + "3,4,34\n" + "4,3,34\n" + "3,5,35\n" + "5,3,35\n" + "4,5,45\n" + "5,4,45\n" + "5,1,51\n" + "1,5,51\n" + TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult) } @Test @@ -73,10 +55,11 @@ MultipleProgramsTestBase(mode) { val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env) - graph.reverse().getEdgesAsTuple3().writeAsCsv(resultPath) - env.execute + val res = graph.reverse().getEdges.collect().toList; + expectedResult = "2,1,12\n" + "3,1,13\n" + "3,2,23\n" + "4,3,34\n" + "5,3,35\n" + "5,4," + "45\n" + "1,5,51\n" + TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult) } @Test @@ -85,7 +68,7 @@ MultipleProgramsTestBase(mode) { val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env) - graph.subgraph(new FilterFunction[Vertex[Long, Long]] { + val res = graph.subgraph(new FilterFunction[Vertex[Long, Long]] { @throws(classOf[Exception]) def filter(vertex: Vertex[Long, Long]): Boolean = { return (vertex.getValue > 2) @@ -96,9 +79,10 @@ MultipleProgramsTestBase(mode) { override def filter(edge: Edge[Long, Long]): Boolean = { return (edge.getValue > 34) } - }).getEdgesAsTuple3().writeAsCsv(resultPath) - env.execute + }).getEdges.collect().toList; + expectedResult = "3,5,35\n" + "4,5,45\n" + TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult) } @Test @@ -107,12 +91,13 @@ MultipleProgramsTestBase(mode) { val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env) - graph.subgraph( + val res = graph.subgraph( vertex => vertex.getValue > 2, edge => edge.getValue > 34 - ).getEdgesAsTuple3().writeAsCsv(resultPath) - env.execute + ).getEdges.collect().toList; + expectedResult = "3,5,35\n" + "4,5,45\n" + TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult) } @Test @@ -121,14 +106,15 @@ MultipleProgramsTestBase(mode) { val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env) - graph.filterOnVertices(new FilterFunction[Vertex[Long, Long]] { + val res = graph.filterOnVertices(new FilterFunction[Vertex[Long, Long]] { @throws(classOf[Exception]) def filter(vertex: Vertex[Long, Long]): Boolean = { vertex.getValue > 2 } - }).getEdgesAsTuple3().writeAsCsv(resultPath) - env.execute + }).getEdges.collect().toList; + expectedResult = "3,4,34\n" + "3,5,35\n" + "4,5,45\n" + TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult) } @Test @@ -137,11 +123,12 @@ MultipleProgramsTestBase(mode) { val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env) - graph.filterOnVertices( + val res = graph.filterOnVertices( vertex => vertex.getValue > 2 - ).getEdgesAsTuple3().writeAsCsv(resultPath) - env.execute + ).getEdges.collect().toList; + expectedResult = "3,4,34\n" + "3,5,35\n" + "4,5,45\n" + TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult) } @Test @@ -150,14 +137,15 @@ MultipleProgramsTestBase(mode) { val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env) - graph.filterOnEdges(new FilterFunction[Edge[Long, Long]] { + val res = graph.filterOnEdges(new FilterFunction[Edge[Long, Long]] { @throws(classOf[Exception]) def filter(edge: Edge[Long, Long]): Boolean = { edge.getValue > 34 } - }).getEdgesAsTuple3().writeAsCsv(resultPath) - env.execute + }).getEdges.collect().toList; + expectedResult = "3,5,35\n" + "4,5,45\n" + "5,1,51\n" + TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult) } @Test @@ -166,11 +154,12 @@ MultipleProgramsTestBase(mode) { val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env) - graph.filterOnEdges( + val res = graph.filterOnEdges( edge => edge.getValue > 34 - ).getEdgesAsTuple3().writeAsCsv(resultPath) - env.execute + ).getEdges.collect().toList; + expectedResult = "3,5,35\n" + "4,5,45\n" + "5,1,51\n" + TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult) } @Test @@ -179,9 +168,9 @@ MultipleProgramsTestBase(mode) { val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env) - env.fromElements(graph.numberOfVertices).writeAsText(resultPath) - env.execute + val res = env.fromElements(graph.numberOfVertices).collect().toList expectedResult = "5" + TestBaseUtils.compareResultAsText(res.asJava, expectedResult) } @Test @@ -190,9 +179,9 @@ MultipleProgramsTestBase(mode) { val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env) - env.fromElements(graph.numberOfEdges).writeAsText(resultPath) - env.execute + val res = env.fromElements(graph.numberOfEdges).collect().toList expectedResult = "7" + TestBaseUtils.compareResultAsText(res.asJava, expectedResult) } @Test @@ -201,9 +190,9 @@ MultipleProgramsTestBase(mode) { val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env) - graph.getVertexIds.writeAsText(resultPath) - env.execute + val res = graph.getVertexIds.collect().toList expectedResult = "1\n2\n3\n4\n5\n" + TestBaseUtils.compareResultAsText(res.asJava, expectedResult) } @Test @@ -212,9 +201,10 @@ MultipleProgramsTestBase(mode) { val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env) - graph.getEdgeIds.writeAsCsv(resultPath) - env.execute - expectedResult = "1,2\n" + "1,3\n" + "2,3\n" + "3,4\n" + "3,5\n" + "4,5\n" + "5,1\n" + val res = graph.getEdgeIds.collect().toList + expectedResult = "(1,2)\n" + "(1,3)\n" + "(2,3)\n" + "(3,4)\n" + "(3,5)\n" + "(4,5)\n" + + "(5,1)\n" + TestBaseUtils.compareResultAsText(res.asJava, expectedResult) } @Test @@ -231,9 +221,62 @@ MultipleProgramsTestBase(mode) { ) val newgraph = graph.union(Graph.fromCollection(vertices, edges, env)) - newgraph.getEdgesAsTuple3.writeAsCsv(resultPath) - env.execute + val res = newgraph.getEdges.collect().toList expectedResult = "1,2,12\n" + "1,3,13\n" + "2,3,23\n" + "3,4,34\n" + "3,5,35\n" + "4,5," + "45\n" + "5,1,51\n" + "6,1,61\n" + TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult) + } + + @Test + @throws(classOf[Exception]) + def testDifference { + val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment + val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils + .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env) + val vertices: List[Vertex[Long, Long]] = List[Vertex[Long, Long]]( + new Vertex[Long, Long](1L, 1L), new Vertex[Long, Long](3L, 3L), + new Vertex[Long, Long](6L, 6L) + ) + val edges: List[Edge[Long, Long]] = List[Edge[Long, Long]]( + new Edge[Long, Long](1L, 3L, 13L), new Edge[Long, Long](1L, 6L, 16L), + new Edge[Long, Long](6L, 3L, 63L) + ) + + val newgraph = graph.difference(Graph.fromCollection(vertices, edges, env)) + val res = newgraph.getEdges.collect().toList + expectedResult = "4,5,45\n" + TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult) + } + + @Test + @throws(classOf[Exception]) + def testDifferenceNoCommonVertices { + val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment + val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils + .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env) + val vertices: List[Vertex[Long, Long]] = List[Vertex[Long, Long]]( + new Vertex[Long, Long](6L, 6L) + ) + val edges: List[Edge[Long, Long]] = List[Edge[Long, Long]]( + new Edge[Long, Long](6L, 6L, 66L) + ) + + val newgraph = graph.difference(Graph.fromCollection(vertices, edges, env)) + val res = newgraph.getEdges.collect().toList + expectedResult = "1,2,12\n" + "1,3,13\n" + "2,3,23\n" + "3,4,34\n" + "3,5,35\n" + "4,5," + + "45\n" + "5,1,51\n" + TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult) + } + + @Test + @throws(classOf[Exception]) + def testTriplets { + val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment + val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils + .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env) + val res = graph.getTriplets.collect().toList + expectedResult = "1,2,1,2,12\n" + "1,3,1,3,13\n" + "2,3,2,3,23\n" + "3,4,3,4,34\n" + + "3,5,3,5,35\n" + "4,5,4,5,45\n" + "5,1,5,1,51\n" + TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult) } } diff --git a/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/JoinWithEdgesITCase.scala b/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/JoinWithEdgesITCase.scala index e19463e0b3b05..eae8bd55f0c4e 100644 --- a/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/JoinWithEdgesITCase.scala +++ b/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/JoinWithEdgesITCase.scala @@ -29,33 +29,14 @@ import org.junit.rules.TemporaryFolder import org.junit.runner.RunWith import org.junit.runners.Parameterized import org.junit.{After, Before, Rule, Test} +import _root_.scala.collection.JavaConverters._ @RunWith(classOf[Parameterized]) class JoinWithEdgesITCase(mode: MultipleProgramsTestBase.TestExecutionMode) extends MultipleProgramsTestBase(mode) { - private var resultPath: String = null private var expectedResult: String = null - var tempFolder: TemporaryFolder = new TemporaryFolder() - - @Rule - def getFolder(): TemporaryFolder = { - tempFolder; - } - - @Before - @throws(classOf[Exception]) - def before { - resultPath = tempFolder.newFile.toURI.toString - } - - @After - @throws(classOf[Exception]) - def after { - TestBaseUtils.compareResultsByLinesInMemory(expectedResult, resultPath) - } - @Test @throws(classOf[Exception]) def testWithEdgesInputDataset { @@ -64,10 +45,10 @@ MultipleProgramsTestBase(mode) { .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env) val result: Graph[Long, Long, Long] = graph.joinWithEdges(graph.getEdges.map(new EdgeToTuple3Map[Long, Long]), new AddValuesMapper) - result.getEdgesAsTuple3().writeAsCsv(resultPath) - env.execute + val res = result.getEdges.collect.toList expectedResult = "1,2,24\n" + "1,3,26\n" + "2,3,46\n" + "3,4,68\n" + "3,5,70\n" + "4,5," + "90\n" + "5,1,102\n" + TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult) } @Test @@ -79,10 +60,10 @@ MultipleProgramsTestBase(mode) { val result: Graph[Long, Long, Long] = graph.joinWithEdges(graph.getEdges.map(new EdgeToTuple3Map[Long, Long]), (originalValue: Long, tupleValue: Long) => originalValue + tupleValue) - result.getEdgesAsTuple3().writeAsCsv(resultPath) - env.execute + val res = result.getEdges.collect.toList expectedResult = "1,2,24\n" + "1,3,26\n" + "2,3,46\n" + "3,4,68\n" + "3,5,70\n" + "4,5," + "90\n" + "5,1,102\n" + TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult) } @Test @@ -94,10 +75,10 @@ MultipleProgramsTestBase(mode) { val result: Graph[Long, Long, Long] = graph.joinWithEdgesOnSource[Long](graph.getEdges .map(new ProjectSourceAndValueMapper), (originalValue: Long, tupleValue: Long) => originalValue + tupleValue) - result.getEdgesAsTuple3().writeAsCsv(resultPath) - env.execute + val res = result.getEdges.collect.toList expectedResult = "1,2,24\n" + "1,3,25\n" + "2,3,46\n" + "3,4,68\n" + "3,5,69\n" + "4,5," + "90\n" + "5,1,102\n" + TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult) } @Test @@ -109,10 +90,10 @@ MultipleProgramsTestBase(mode) { val result: Graph[Long, Long, Long] = graph.joinWithEdgesOnSource[Long](graph.getEdges .map(new ProjectSourceAndValueMapper), (originalValue: Long, tupleValue: Long) => originalValue + tupleValue) - result.getEdgesAsTuple3().writeAsCsv(resultPath) - env.execute + val res = result.getEdges.collect.toList expectedResult = "1,2,24\n" + "1,3,25\n" + "2,3,46\n" + "3,4,68\n" + "3,5,69\n" + "4,5," + "90\n" + "5,1,102\n" + TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult) } @Test @@ -124,10 +105,10 @@ MultipleProgramsTestBase(mode) { val result: Graph[Long, Long, Long] = graph.joinWithEdgesOnTarget[Long](graph.getEdges .map(new ProjectTargetAndValueMapper), (originalValue: Long, tupleValue: Long) => originalValue + tupleValue) - result.getEdgesAsTuple3().writeAsCsv(resultPath) - env.execute + val res = result.getEdges.collect.toList expectedResult = "1,2,24\n" + "1,3,26\n" + "2,3,36\n" + "3,4,68\n" + "3,5,70\n" + "4,5," + "80\n" + "5,1,102\n" + TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult) } @Test @@ -139,10 +120,10 @@ MultipleProgramsTestBase(mode) { val result: Graph[Long, Long, Long] = graph.joinWithEdgesOnTarget[Long](graph.getEdges .map(new ProjectTargetAndValueMapper), (originalValue: Long, tupleValue: Long) => originalValue + tupleValue) - result.getEdgesAsTuple3().writeAsCsv(resultPath) - env.execute + val res = result.getEdges.collect.toList expectedResult = "1,2,24\n" + "1,3,26\n" + "2,3,36\n" + "3,4,68\n" + "3,5,70\n" + "4,5," + "80\n" + "5,1,102\n" + TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult) } diff --git a/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/JoinWithVerticesITCase.scala b/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/JoinWithVerticesITCase.scala index 4b8f3542455f1..8d18d5889849c 100644 --- a/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/JoinWithVerticesITCase.scala +++ b/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/JoinWithVerticesITCase.scala @@ -28,33 +28,14 @@ import org.junit.rules.TemporaryFolder import org.junit.runner.RunWith import org.junit.runners.Parameterized import org.junit.{After, Before, Rule, Test} +import _root_.scala.collection.JavaConverters._ @RunWith(classOf[Parameterized]) class JoinWithVerticesITCase(mode: MultipleProgramsTestBase.TestExecutionMode) extends MultipleProgramsTestBase(mode) { - private var resultPath: String = null private var expectedResult: String = null - var tempFolder: TemporaryFolder = new TemporaryFolder() - - @Rule - def getFolder(): TemporaryFolder = { - tempFolder; - } - - @Before - @throws(classOf[Exception]) - def before { - resultPath = tempFolder.newFile.toURI.toString - } - - @After - @throws(classOf[Exception]) - def after { - TestBaseUtils.compareResultsByLinesInMemory(expectedResult, resultPath) - } - @Test @throws(classOf[Exception]) def testJoinWithVertexSet { @@ -63,9 +44,9 @@ MultipleProgramsTestBase(mode) { .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env) val result: Graph[Long, Long, Long] = graph.joinWithVertices(graph.getVertices.map(new VertexToTuple2Map[Long, Long]), new AddValuesMapper) - result.getVerticesAsTuple2().writeAsCsv(resultPath) - env.execute + val res = result.getVertices.collect.toList expectedResult = "1,2\n" + "2,4\n" + "3,6\n" + "4,8\n" + "5,10\n" + TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult) } @Test @@ -77,9 +58,9 @@ MultipleProgramsTestBase(mode) { val tupleSet = graph.getVertices.map(new VertexToTuple2Map[Long, Long]) val result: Graph[Long, Long, Long] = graph.joinWithVertices[Long](tupleSet, (originalvalue: Long, tuplevalue: Long) => originalvalue + tuplevalue) - result.getVerticesAsTuple2().writeAsCsv(resultPath) - env.execute + val res = result.getVertices.collect.toList expectedResult = "1,2\n" + "2,4\n" + "3,6\n" + "4,8\n" + "5,10\n" + TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult) } diff --git a/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/MapEdgesITCase.scala b/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/MapEdgesITCase.scala index 7e5ad14659de8..0fa8d2bbaf21a 100644 --- a/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/MapEdgesITCase.scala +++ b/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/MapEdgesITCase.scala @@ -18,7 +18,6 @@ package org.apache.flink.graph.scala.test.operations - import org.apache.flink.api.common.functions.MapFunction import org.apache.flink.api.scala._ import org.apache.flink.graph.Edge @@ -29,42 +28,21 @@ import org.junit.rules.TemporaryFolder import org.junit.runner.RunWith import org.junit.runners.Parameterized import org.junit.{After, Before, Rule, Test} +import _root_.scala.collection.JavaConverters._ @RunWith(classOf[Parameterized]) class MapEdgesITCase(mode: MultipleProgramsTestBase.TestExecutionMode) extends MultipleProgramsTestBase(mode) { - private var resultPath: String = null private var expectedResult: String = null - var tempFolder: TemporaryFolder = new TemporaryFolder() - - @Rule - def getFolder(): TemporaryFolder = { - tempFolder; - } - - @Before - @throws(classOf[Exception]) - def before { - resultPath = tempFolder.newFile.toURI.toString - } - - @After - @throws(classOf[Exception]) - def after { - TestBaseUtils.compareResultsByLinesInMemory(expectedResult, resultPath) - } - @Test @throws(classOf[Exception]) def testWithSameValue { val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env) - graph.mapEdges(new AddOneMapper) - .getEdgesAsTuple3().writeAsCsv(resultPath) - env.execute + val res = graph.mapEdges(new AddOneMapper).getEdges.collect.toList expectedResult = "1,2,13\n" + "1,3,14\n" + "" + "2,3,24\n" + @@ -72,6 +50,7 @@ MultipleProgramsTestBase(mode) { "3,5,36\n" + "4,5,46\n" + "5,1,52\n" + TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult) } @Test @@ -80,9 +59,8 @@ MultipleProgramsTestBase(mode) { val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env) - graph.mapEdges(edge => edge.getValue + 1) - .getEdgesAsTuple3().writeAsCsv(resultPath) - env.execute + val res = graph.mapEdges(edge => edge.getValue + 1) + .getEdges.collect.toList expectedResult = "1,2,13\n" + "1,3,14\n" + "" + "2,3,24\n" + @@ -90,6 +68,7 @@ MultipleProgramsTestBase(mode) { "3,5,36\n" + "4,5,46\n" + "5,1,52\n" + TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult) } final class AddOneMapper extends MapFunction[Edge[Long, Long], Long] { diff --git a/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/MapVerticesITCase.scala b/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/MapVerticesITCase.scala index a22cfbd3f93ca..c1ab3eaecf3af 100644 --- a/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/MapVerticesITCase.scala +++ b/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/MapVerticesITCase.scala @@ -28,48 +28,27 @@ import org.junit.rules.TemporaryFolder import org.junit.runner.RunWith import org.junit.runners.Parameterized import org.junit.{After, Before, Rule, Test} +import _root_.scala.collection.JavaConverters._ @RunWith(classOf[Parameterized]) class MapVerticesITCase(mode: MultipleProgramsTestBase.TestExecutionMode) extends MultipleProgramsTestBase(mode) { - private var resultPath: String = null private var expectedResult: String = null - var tempFolder: TemporaryFolder = new TemporaryFolder() - - @Rule - def getFolder(): TemporaryFolder = { - tempFolder; - } - - @Before - @throws(classOf[Exception]) - def before { - resultPath = tempFolder.newFile.toURI.toString - } - - @After - @throws(classOf[Exception]) - def after { - TestBaseUtils.compareResultsByLinesInMemory(expectedResult, resultPath) - } - @Test @throws(classOf[Exception]) def testWithSameValue { val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env) - graph.mapVertices(new AddOneMapper) - .getVerticesAsTuple2().writeAsCsv(resultPath) - env.execute - + val res = graph.mapVertices(new AddOneMapper).getVertices.collect.toList expectedResult = "1,2\n" + "2,3\n" + "3,4\n" + "4,5\n" + "5,6\n"; + TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult) } @Test @@ -78,15 +57,13 @@ MultipleProgramsTestBase(mode) { val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env) - graph.mapVertices(vertex => vertex.getValue + 1) - .getVerticesAsTuple2().writeAsCsv(resultPath) - env.execute - + val res = graph.mapVertices(vertex => vertex.getValue + 1).getVertices.collect.toList expectedResult = "1,2\n" + "2,3\n" + "3,4\n" + "4,5\n" + "5,6\n"; + TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult) } final class AddOneMapper extends MapFunction[Vertex[Long, Long], Long] { diff --git a/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/ReduceOnEdgesMethodsITCase.scala b/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/ReduceOnEdgesMethodsITCase.scala index 6ed383aae8a8a..695f74a1c8a8f 100644 --- a/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/ReduceOnEdgesMethodsITCase.scala +++ b/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/ReduceOnEdgesMethodsITCase.scala @@ -28,46 +28,24 @@ import org.junit.rules.TemporaryFolder import org.junit.runner.RunWith import org.junit.runners.Parameterized import org.junit.{After, Before, Rule, Test} +import _root_.scala.collection.JavaConverters._ @RunWith(classOf[Parameterized]) class ReduceOnEdgesMethodsITCase(mode: MultipleProgramsTestBase.TestExecutionMode) extends MultipleProgramsTestBase(mode) { - private var resultPath: String = null private var expectedResult: String = null - var tempFolder: TemporaryFolder = new TemporaryFolder() - - @Rule - def getFolder(): TemporaryFolder = { - tempFolder; - } - - @Before - @throws(classOf[Exception]) - def before { - resultPath = tempFolder.newFile.toURI.toString - } - - @After - @throws(classOf[Exception]) - def after { - TestBaseUtils.compareResultsByLinesInMemory(expectedResult, resultPath) - } - @Test @throws(classOf[Exception]) def testAllNeighborsWithValueGreaterThanFour { val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env) - val result = graph.groupReduceOnEdges(new SelectNeighborsValueGreaterThanFour, - EdgeDirection.ALL) - result.writeAsCsv(resultPath) - env.execute - - - expectedResult = "5,1\n" + "5,3\n" + "5,4" + val res = graph.groupReduceOnEdges(new SelectNeighborsValueGreaterThanFour, + EdgeDirection.ALL).collect.toList + expectedResult = "(5,1)\n" + "(5,3)\n" + "(5,4)" + TestBaseUtils.compareResultAsText(res.asJava, expectedResult) } @@ -77,13 +55,12 @@ class ReduceOnEdgesMethodsITCase(mode: MultipleProgramsTestBase.TestExecutionMod val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env) - val result = graph.groupReduceOnEdges(new SelectNeighbors, EdgeDirection.ALL) - result.writeAsCsv(resultPath) - env.execute - - - expectedResult = "1,2\n" + "1,3\n" + "1,5\n" + "2,1\n" + "2,3\n" + "3,1\n" + "3,2\n" + - "3,4\n" + "3,5\n" + "4,3\n" + "4,5\n" + "5,1\n" + "5,3\n" + "5,4" + val res = graph.groupReduceOnEdges(new SelectNeighbors, EdgeDirection.ALL) + .collect.toList + expectedResult = "(1,2)\n" + "(1,3)\n" + "(1,5)\n" + "(2,1)\n" + "(2,3)\n" + + "(3,1)\n" + "(3,2)\n" + "(3,4)\n" + "(3,5)\n" + "(4,3)\n" + "(4,5)\n" + + "(5,1)\n" + "(5,3)\n" + "(5,4)" + TestBaseUtils.compareResultAsText(res.asJava, expectedResult) } @Test @@ -94,9 +71,9 @@ class ReduceOnEdgesMethodsITCase(mode: MultipleProgramsTestBase.TestExecutionMod .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env) val verticesWithLowestOutNeighbor: DataSet[(Long, Long)] = graph.reduceOnEdges(new SelectMinWeightNeighborNoValue, EdgeDirection.OUT) - verticesWithLowestOutNeighbor.writeAsCsv(resultPath) - env.execute - expectedResult = "1,12\n" + "2,23\n" + "3,34\n" + "4,45\n" + "5,51\n" + val res = verticesWithLowestOutNeighbor.collect.toList + expectedResult = "(1,12)\n" + "(2,23)\n" + "(3,34)\n" + "(4,45)\n" + "(5,51)\n" + TestBaseUtils.compareResultAsText(res.asJava, expectedResult) } @Test @@ -107,9 +84,9 @@ class ReduceOnEdgesMethodsITCase(mode: MultipleProgramsTestBase.TestExecutionMod .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env) val verticesWithLowestOutNeighbor: DataSet[(Long, Long)] = graph.reduceOnEdges(new SelectMinWeightNeighborNoValue, EdgeDirection.IN) - verticesWithLowestOutNeighbor.writeAsCsv(resultPath) - env.execute - expectedResult = "1,51\n" + "2,12\n" + "3,13\n" + "4,34\n" + "5,35\n" + val res = verticesWithLowestOutNeighbor.collect.toList + expectedResult = "(1,51)\n" + "(2,12)\n" + "(3,13)\n" + "(4,34)\n" + "(5,35)\n" + TestBaseUtils.compareResultAsText(res.asJava, expectedResult) } @Test @@ -120,9 +97,9 @@ class ReduceOnEdgesMethodsITCase(mode: MultipleProgramsTestBase.TestExecutionMod .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env) val verticesWithMaxEdgeWeight: DataSet[(Long, Long)] = graph.reduceOnEdges(new SelectMaxWeightNeighborNoValue, EdgeDirection.ALL) - verticesWithMaxEdgeWeight.writeAsCsv(resultPath) - env.execute - expectedResult = "1,51\n" + "2,23\n" + "3,35\n" + "4,45\n" + "5,51\n" + val res = verticesWithMaxEdgeWeight.collect.toList + expectedResult = "(1,51)\n" + "(2,23)\n" + "(3,35)\n" + "(4,45)\n" + "(5,51)\n" + TestBaseUtils.compareResultAsText(res.asJava, expectedResult) } final class SelectNeighborsValueGreaterThanFour extends EdgesFunctionWithVertexValue[Long, diff --git a/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/ReduceOnNeighborMethodsITCase.scala b/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/ReduceOnNeighborMethodsITCase.scala index 52e6d7a66868a..b01e7508571b1 100644 --- a/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/ReduceOnNeighborMethodsITCase.scala +++ b/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/ReduceOnNeighborMethodsITCase.scala @@ -28,42 +28,24 @@ import org.junit.rules.TemporaryFolder import org.junit.runner.RunWith import org.junit.runners.Parameterized import org.junit.{After, Before, Rule, Test} +import _root_.scala.collection.JavaConverters._ @RunWith(classOf[Parameterized]) class ReduceOnNeighborMethodsITCase(mode: MultipleProgramsTestBase.TestExecutionMode) extends MultipleProgramsTestBase(mode) { - private var resultPath: String = null private var expectedResult: String = null - var tempFolder: TemporaryFolder = new TemporaryFolder() - - @Rule - def getFolder(): TemporaryFolder = { - tempFolder; - } - - @Before - @throws(classOf[Exception]) - def before { - resultPath = tempFolder.newFile.toURI.toString - } - - @After - @throws(classOf[Exception]) - def after { - TestBaseUtils.compareResultsByLinesInMemory(expectedResult, resultPath) - } - @Test @throws(classOf[Exception]) def testSumOfAllNeighborsNoValue { val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env) - graph.reduceOnNeighbors(new SumNeighbors, EdgeDirection.ALL).writeAsCsv(resultPath) - env.execute - expectedResult = "1,10\n" + "2,4\n" + "3,12\n" + "4,8\n" + "5,8\n" + val res = graph.reduceOnNeighbors(new SumNeighbors, EdgeDirection.ALL) + .collect.toList + expectedResult = "(1,10)\n" + "(2,4)\n" + "(3,12)\n" + "(4,8)\n" + "(5,8)\n" + TestBaseUtils.compareResultAsText(res.asJava, expectedResult) } @Test @@ -72,9 +54,9 @@ class ReduceOnNeighborMethodsITCase(mode: MultipleProgramsTestBase.TestExecution val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env) - graph.reduceOnNeighbors(new SumNeighbors, EdgeDirection.OUT).writeAsCsv(resultPath) - env.execute - expectedResult = "1,5\n" + "2,3\n" + "3,9\n" + "4,5\n" + "5,1\n" + val res = graph.reduceOnNeighbors(new SumNeighbors, EdgeDirection.OUT).collect.toList + expectedResult = "(1,5)\n" + "(2,3)\n" + "(3,9)\n" + "(4,5)\n" + "(5,1)\n" + TestBaseUtils.compareResultAsText(res.asJava, expectedResult) } @Test @@ -84,9 +66,9 @@ class ReduceOnNeighborMethodsITCase(mode: MultipleProgramsTestBase.TestExecution val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env) val result = graph.groupReduceOnNeighbors(new SumAllNeighbors, EdgeDirection.ALL) - result.writeAsCsv(resultPath) - env.execute - expectedResult = "1,11\n" + "2,6\n" + "3,15\n" + "4,12\n" + "5,13\n" + val res = result.collect.toList + expectedResult = "(1,11)\n" + "(2,6)\n" + "(3,15)\n" + "(4,12)\n" + "(5,13)\n" + TestBaseUtils.compareResultAsText(res.asJava, expectedResult) } @Test @@ -97,9 +79,9 @@ class ReduceOnNeighborMethodsITCase(mode: MultipleProgramsTestBase.TestExecution .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env) val result = graph.groupReduceOnNeighbors(new SumInNeighborsNoValueMultipliedByTwoIdGreaterThanTwo, EdgeDirection.IN) - result.writeAsCsv(resultPath) - env.execute - expectedResult = "3,59\n" + "3,118\n" + "4,204\n" + "4,102\n" + "5,570\n" + "5,285" + val res = result.collect.toList + expectedResult = "(3,59)\n" + "(3,118)\n" + "(4,204)\n" + "(4,102)\n" + "(5,570)\n" + "(5,285)" + TestBaseUtils.compareResultAsText(res.asJava, expectedResult) } final class SumNeighbors extends ReduceNeighborsFunction[Long] { From 6b552545b1ee72cd19d91381792c8e817e78be8b Mon Sep 17 00:00:00 2001 From: vasia Date: Fri, 25 Sep 2015 11:20:15 +0200 Subject: [PATCH 3/3] [FLINK-2561] [gelly] add GraphMetrics Scala example --- .../graph/scala/example/GraphMetrics.scala | 129 ++++++++++++++++++ .../graph/scala/utils/EdgeToTuple3Map.scala | 3 +- .../graph/scala/utils/Tuple2ToVertexMap.scala | 3 +- .../graph/scala/utils/Tuple3ToEdgeMap.scala | 3 +- .../graph/scala/utils/VertexToTuple2Map.scala | 3 +- .../scala/test/operations/DegreesITCase.scala | 6 +- .../operations/GraphMutationsITCase.scala | 12 +- .../test/operations/JoinWithEdgesITCase.scala | 12 +- .../operations/JoinWithVerticesITCase.scala | 4 +- .../test/operations/MapEdgesITCase.scala | 4 +- .../test/operations/MapVerticesITCase.scala | 4 +- .../ReduceOnEdgesMethodsITCase.scala | 10 +- .../ReduceOnNeighborMethodsITCase.scala | 8 +- .../flink/graph/example/GraphMetrics.java | 2 +- 14 files changed, 164 insertions(+), 39 deletions(-) create mode 100644 flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/example/GraphMetrics.scala diff --git a/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/example/GraphMetrics.scala b/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/example/GraphMetrics.scala new file mode 100644 index 0000000000000..68d9285beeb17 --- /dev/null +++ b/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/example/GraphMetrics.scala @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.graph.scala.example + +import org.apache.flink.api.scala._ +import org.apache.flink.graph.scala._ +import org.apache.flink.types.NullValue +import org.apache.flink.graph.Edge +import org.apache.flink.util.Collector + +/** + * This example illustrates how to use Gelly metrics methods and get simple statistics + * from the input graph. + * + * The program creates a random graph and computes and prints + * the following metrics: + * - number of vertices + * - number of edges + * - average node degree + * - the vertex ids with the max/min in- and out-degrees + * + * The input file is expected to contain one edge per line, + * with long IDs and no values, in the following format: + * {{{ + * \t + * }}} + * If no arguments are provided, the example runs with a random graph of 100 vertices. + * + */ +object GraphMetrics { + def main(args: Array[String]) { + if (!parseParameters(args)) { + return + } + + val env = ExecutionEnvironment.getExecutionEnvironment + /** create the graph **/ + val graph: Graph[Long, NullValue, NullValue] = Graph.fromDataSet(getEdgeDataSet(env), env) + + /** get the number of vertices **/ + val numVertices = graph.numberOfVertices; + + /** get the number of edges **/ + val numEdges = graph.numberOfEdges; + + /** compute the average node degree **/ + val verticesWithDegrees = graph.getDegrees; + val avgDegree = verticesWithDegrees.sum(1).map(in => (in._2 / numVertices).toDouble) + + /** find the vertex with the maximum in-degree **/ + val maxInDegreeVertex = graph.inDegrees.max(1).map(in => in._1) + + /** find the vertex with the minimum in-degree **/ + val minInDegreeVertex = graph.inDegrees.min(1).map(in => in._1) + + /** find the vertex with the maximum out-degree **/ + val maxOutDegreeVertex = graph.outDegrees.max(1).map(in => in._1) + + /** find the vertex with the minimum out-degree **/ + val minOutDegreeVertex = graph.outDegrees.min(1).map(in => in._1) + + /** print the results **/ + env.fromElements(numVertices).printOnTaskManager("Total number of vertices") + env.fromElements(numEdges).printOnTaskManager("Total number of edges") + avgDegree.printOnTaskManager("Average node degree") + maxInDegreeVertex.printOnTaskManager("Vertex with Max in-degree") + minInDegreeVertex.printOnTaskManager("Vertex with Max in-degree") + maxOutDegreeVertex.printOnTaskManager("Vertex with Max out-degree") + minOutDegreeVertex.printOnTaskManager("Vertex with Max out-degree") + + } + + private def parseParameters(args: Array[String]): Boolean = { + if (args.length > 0) { + fileOutput = true + if (args.length == 1) { + edgesPath = args(0) + true + } else { + System.err.println("Usage: GraphMetrics ") + false + } + } else { + System.out.println("Executing GraphMetrics example with built-in default data.") + System.out.println(" Provide parameters to read input data from a file.") + System.out.println(" Usage: GraphMetrics ") + true + } + } + + private def getEdgeDataSet(env: ExecutionEnvironment): DataSet[Edge[Long, NullValue]] = { + if (fileOutput) { + env.readCsvFile[(Long, Long)]( + edgesPath, + fieldDelimiter = "\t").map( + in => new Edge[Long, NullValue](in._1, in._2, NullValue.getInstance())) + } + else { + env.generateSequence(1, numVertices).flatMap[Edge[Long, NullValue]]( + (key: Long, out: Collector[Edge[Long, NullValue]]) => { + val numOutEdges: Int = (Math.random() * (numVertices / 2)).toInt + for ( i <- 0 to numOutEdges ) { + var target: Long = ((Math.random() * numVertices) + 1).toLong + new Edge[Long, NullValue](key, target, NullValue.getInstance()) + } + }) + } + } + + private var fileOutput: Boolean = false + private var edgesPath: String = null + private var outputPath: String = null + private val numVertices = 100 +} diff --git a/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/utils/EdgeToTuple3Map.scala b/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/utils/EdgeToTuple3Map.scala index 0d7d2afffc08b..909dbb49aaf7d 100644 --- a/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/utils/EdgeToTuple3Map.scala +++ b/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/utils/EdgeToTuple3Map.scala @@ -21,10 +21,9 @@ package org.apache.flink.graph.scala.utils import org.apache.flink.api.common.functions.MapFunction import org.apache.flink.graph.Edge +@SerialVersionUID(1L) class EdgeToTuple3Map[K, EV] extends MapFunction[Edge[K, EV], (K, K, EV)] { - private val serialVersionUID: Long = 1L - override def map(value: Edge[K, EV]): (K, K, EV) = { (value.getSource, value.getTarget, value.getValue) } diff --git a/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/utils/Tuple2ToVertexMap.scala b/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/utils/Tuple2ToVertexMap.scala index f2b1133b005fa..fd6b8c535bdad 100644 --- a/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/utils/Tuple2ToVertexMap.scala +++ b/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/utils/Tuple2ToVertexMap.scala @@ -21,10 +21,9 @@ package org.apache.flink.graph.scala.utils import org.apache.flink.api.common.functions.MapFunction import org.apache.flink.graph.Vertex +@SerialVersionUID(1L) class Tuple2ToVertexMap[K, VV] extends MapFunction[(K, VV), Vertex[K, VV]] { - private val serialVersionUID: Long = 1L - override def map(value: (K, VV)): Vertex[K, VV] = { new Vertex(value._1, value._2) } diff --git a/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/utils/Tuple3ToEdgeMap.scala b/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/utils/Tuple3ToEdgeMap.scala index 00cb074f1b304..d0e07cc8c5857 100644 --- a/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/utils/Tuple3ToEdgeMap.scala +++ b/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/utils/Tuple3ToEdgeMap.scala @@ -21,10 +21,9 @@ package org.apache.flink.graph.scala.utils import org.apache.flink.api.common.functions.MapFunction import org.apache.flink.graph.Edge +@SerialVersionUID(1L) class Tuple3ToEdgeMap[K, EV] extends MapFunction[(K, K, EV), Edge[K, EV]] { - private val serialVersionUID: Long = 1L - override def map(value: (K, K, EV)): Edge[K, EV] = { new Edge(value._1, value._2, value._3) } diff --git a/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/utils/VertexToTuple2Map.scala b/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/utils/VertexToTuple2Map.scala index de77832ec67f1..faf4e102a20c4 100644 --- a/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/utils/VertexToTuple2Map.scala +++ b/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/utils/VertexToTuple2Map.scala @@ -21,10 +21,9 @@ package org.apache.flink.graph.scala.utils import org.apache.flink.api.common.functions.MapFunction import org.apache.flink.graph.Vertex +@SerialVersionUID(1L) class VertexToTuple2Map[K, VV] extends MapFunction[Vertex[K, VV], (K, VV)] { - private val serialVersionUID: Long = 1L - override def map(value: Vertex[K, VV]): (K, VV) = { (value.getId, value.getValue) } diff --git a/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/DegreesITCase.scala b/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/DegreesITCase.scala index 6196f995562bc..b34704957f6db 100644 --- a/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/DegreesITCase.scala +++ b/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/DegreesITCase.scala @@ -40,7 +40,7 @@ MultipleProgramsTestBase(mode) { val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env) - val res = graph.inDegrees().collect.toList + val res = graph.inDegrees.collect().toList expectedResult = "(1,1)\n" + "(2,1)\n" + "(3,2)\n" + "(4,1)\n" + "(5,2)\n" TestBaseUtils.compareResultAsText(res.asJava, expectedResult) } @@ -51,7 +51,7 @@ MultipleProgramsTestBase(mode) { val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env) - val res = graph.outDegrees().collect.toList + val res = graph.outDegrees.collect().toList expectedResult = "(1,2)\n" + "(2,1)\n" + "(3,2)\n" + "(4,1)\n" + "(5,1)\n" TestBaseUtils.compareResultAsText(res.asJava, expectedResult) } @@ -62,7 +62,7 @@ MultipleProgramsTestBase(mode) { val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env) - val res = graph.getDegrees().collect.toList + val res = graph.getDegrees.collect().toList expectedResult = "(1,3)\n" + "(2,2)\n" + "(3,4)\n" + "(4,2)\n" + "(5,3)\n" TestBaseUtils.compareResultAsText(res.asJava, expectedResult) } diff --git a/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/GraphMutationsITCase.scala b/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/GraphMutationsITCase.scala index 3cb92c4f2cef0..4b776e2d6fa77 100644 --- a/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/GraphMutationsITCase.scala +++ b/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/GraphMutationsITCase.scala @@ -119,7 +119,7 @@ MultipleProgramsTestBase(mode) { val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env) val newgraph = graph.removeVertex(new Vertex[Long, Long](6L, 6L)) - val res = newgraph.getEdges.collect.toList + val res = newgraph.getEdges.collect().toList expectedResult = "1,2,12\n" + "1,3,13\n" + "2,3,23\n" + "3,4,34\n" + "3,5,35\n" + "4,5," + "45\n" + "5,1,51\n" TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult) @@ -146,7 +146,7 @@ MultipleProgramsTestBase(mode) { .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env) val newgraph = graph.removeVertices(List[Vertex[Long, Long]](new Vertex[Long, Long](1L, 1L), new Vertex[Long, Long](6L, 6L))) - val res = newgraph.getEdges.collect.toList + val res = newgraph.getEdges.collect().toList expectedResult = "2,3,23\n" + "3,4,34\n" + "3,5,35\n" + "4,5,45\n" TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult) } @@ -159,7 +159,7 @@ MultipleProgramsTestBase(mode) { .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env) val newgraph = graph.addEdge(new Vertex[Long, Long](6L, 6L), new Vertex[Long, Long](1L, 1L), 61L) - val res = newgraph.getEdges.collect.toList + val res = newgraph.getEdges.collect().toList expectedResult = "1,2,12\n" + "1,3,13\n" + "2,3,23\n" + "3,4,34\n" + "3,5,35\n" + "4,5," + "45\n" + "5,1,51\n" + "6,1,61\n" TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult) @@ -201,7 +201,7 @@ MultipleProgramsTestBase(mode) { .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env) val newgraph = graph.addEdge(new Vertex[Long, Long](1L, 1L), new Vertex[Long, Long](2L, 2L), 12L) - val res = newgraph.getEdges.collect.toList + val res = newgraph.getEdges.collect().toList expectedResult = "1,2,12\n" + "1,2,12\n" + "1,3,13\n" + "2,3,23\n" + "3,4,34\n" + "3,5," + "35\n" + "4,5,45\n" + "5,1,51\n" TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult) @@ -214,7 +214,7 @@ MultipleProgramsTestBase(mode) { val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env) val newgraph = graph.removeEdge(new Edge[Long, Long](5L, 1L, 51L)) - val res = newgraph.getEdges.collect.toList + val res = newgraph.getEdges.collect().toList expectedResult = "1,2,12\n" + "1,3,13\n" + "2,3,23\n" + "3,4,34\n" + "3,5,35\n" + "4,5,45\n" TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult) } @@ -226,7 +226,7 @@ MultipleProgramsTestBase(mode) { val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env) val newgraph = graph.removeEdge(new Edge[Long, Long](6L, 1L, 61L)) - val res = newgraph.getEdges.collect.toList + val res = newgraph.getEdges.collect().toList expectedResult = "1,2,12\n" + "1,3,13\n" + "2,3,23\n" + "3,4,34\n" + "3,5,35\n" + "4,5," + "45\n" + "5,1,51\n" TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult) diff --git a/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/JoinWithEdgesITCase.scala b/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/JoinWithEdgesITCase.scala index eae8bd55f0c4e..3dc90fc1f5665 100644 --- a/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/JoinWithEdgesITCase.scala +++ b/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/JoinWithEdgesITCase.scala @@ -45,7 +45,7 @@ MultipleProgramsTestBase(mode) { .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env) val result: Graph[Long, Long, Long] = graph.joinWithEdges(graph.getEdges.map(new EdgeToTuple3Map[Long, Long]), new AddValuesMapper) - val res = result.getEdges.collect.toList + val res = result.getEdges.collect().toList expectedResult = "1,2,24\n" + "1,3,26\n" + "2,3,46\n" + "3,4,68\n" + "3,5,70\n" + "4,5," + "90\n" + "5,1,102\n" TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult) @@ -60,7 +60,7 @@ MultipleProgramsTestBase(mode) { val result: Graph[Long, Long, Long] = graph.joinWithEdges(graph.getEdges.map(new EdgeToTuple3Map[Long, Long]), (originalValue: Long, tupleValue: Long) => originalValue + tupleValue) - val res = result.getEdges.collect.toList + val res = result.getEdges.collect().toList expectedResult = "1,2,24\n" + "1,3,26\n" + "2,3,46\n" + "3,4,68\n" + "3,5,70\n" + "4,5," + "90\n" + "5,1,102\n" TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult) @@ -75,7 +75,7 @@ MultipleProgramsTestBase(mode) { val result: Graph[Long, Long, Long] = graph.joinWithEdgesOnSource[Long](graph.getEdges .map(new ProjectSourceAndValueMapper), (originalValue: Long, tupleValue: Long) => originalValue + tupleValue) - val res = result.getEdges.collect.toList + val res = result.getEdges.collect().toList expectedResult = "1,2,24\n" + "1,3,25\n" + "2,3,46\n" + "3,4,68\n" + "3,5,69\n" + "4,5," + "90\n" + "5,1,102\n" TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult) @@ -90,7 +90,7 @@ MultipleProgramsTestBase(mode) { val result: Graph[Long, Long, Long] = graph.joinWithEdgesOnSource[Long](graph.getEdges .map(new ProjectSourceAndValueMapper), (originalValue: Long, tupleValue: Long) => originalValue + tupleValue) - val res = result.getEdges.collect.toList + val res = result.getEdges.collect().toList expectedResult = "1,2,24\n" + "1,3,25\n" + "2,3,46\n" + "3,4,68\n" + "3,5,69\n" + "4,5," + "90\n" + "5,1,102\n" TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult) @@ -105,7 +105,7 @@ MultipleProgramsTestBase(mode) { val result: Graph[Long, Long, Long] = graph.joinWithEdgesOnTarget[Long](graph.getEdges .map(new ProjectTargetAndValueMapper), (originalValue: Long, tupleValue: Long) => originalValue + tupleValue) - val res = result.getEdges.collect.toList + val res = result.getEdges.collect().toList expectedResult = "1,2,24\n" + "1,3,26\n" + "2,3,36\n" + "3,4,68\n" + "3,5,70\n" + "4,5," + "80\n" + "5,1,102\n" TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult) @@ -120,7 +120,7 @@ MultipleProgramsTestBase(mode) { val result: Graph[Long, Long, Long] = graph.joinWithEdgesOnTarget[Long](graph.getEdges .map(new ProjectTargetAndValueMapper), (originalValue: Long, tupleValue: Long) => originalValue + tupleValue) - val res = result.getEdges.collect.toList + val res = result.getEdges.collect().toList expectedResult = "1,2,24\n" + "1,3,26\n" + "2,3,36\n" + "3,4,68\n" + "3,5,70\n" + "4,5," + "80\n" + "5,1,102\n" TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult) diff --git a/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/JoinWithVerticesITCase.scala b/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/JoinWithVerticesITCase.scala index 8d18d5889849c..98ee8b6a32c8f 100644 --- a/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/JoinWithVerticesITCase.scala +++ b/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/JoinWithVerticesITCase.scala @@ -44,7 +44,7 @@ MultipleProgramsTestBase(mode) { .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env) val result: Graph[Long, Long, Long] = graph.joinWithVertices(graph.getVertices.map(new VertexToTuple2Map[Long, Long]), new AddValuesMapper) - val res = result.getVertices.collect.toList + val res = result.getVertices.collect().toList expectedResult = "1,2\n" + "2,4\n" + "3,6\n" + "4,8\n" + "5,10\n" TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult) } @@ -58,7 +58,7 @@ MultipleProgramsTestBase(mode) { val tupleSet = graph.getVertices.map(new VertexToTuple2Map[Long, Long]) val result: Graph[Long, Long, Long] = graph.joinWithVertices[Long](tupleSet, (originalvalue: Long, tuplevalue: Long) => originalvalue + tuplevalue) - val res = result.getVertices.collect.toList + val res = result.getVertices.collect().toList expectedResult = "1,2\n" + "2,4\n" + "3,6\n" + "4,8\n" + "5,10\n" TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult) } diff --git a/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/MapEdgesITCase.scala b/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/MapEdgesITCase.scala index 0fa8d2bbaf21a..bdfd569c1af2a 100644 --- a/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/MapEdgesITCase.scala +++ b/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/MapEdgesITCase.scala @@ -42,7 +42,7 @@ MultipleProgramsTestBase(mode) { val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env) - val res = graph.mapEdges(new AddOneMapper).getEdges.collect.toList + val res = graph.mapEdges(new AddOneMapper).getEdges.collect().toList expectedResult = "1,2,13\n" + "1,3,14\n" + "" + "2,3,24\n" + @@ -60,7 +60,7 @@ MultipleProgramsTestBase(mode) { val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env) val res = graph.mapEdges(edge => edge.getValue + 1) - .getEdges.collect.toList + .getEdges.collect().toList expectedResult = "1,2,13\n" + "1,3,14\n" + "" + "2,3,24\n" + diff --git a/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/MapVerticesITCase.scala b/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/MapVerticesITCase.scala index c1ab3eaecf3af..2e51d90cd834a 100644 --- a/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/MapVerticesITCase.scala +++ b/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/MapVerticesITCase.scala @@ -42,7 +42,7 @@ MultipleProgramsTestBase(mode) { val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env) - val res = graph.mapVertices(new AddOneMapper).getVertices.collect.toList + val res = graph.mapVertices(new AddOneMapper).getVertices.collect().toList expectedResult = "1,2\n" + "2,3\n" + "3,4\n" + @@ -57,7 +57,7 @@ MultipleProgramsTestBase(mode) { val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env) - val res = graph.mapVertices(vertex => vertex.getValue + 1).getVertices.collect.toList + val res = graph.mapVertices(vertex => vertex.getValue + 1).getVertices.collect().toList expectedResult = "1,2\n" + "2,3\n" + "3,4\n" + diff --git a/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/ReduceOnEdgesMethodsITCase.scala b/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/ReduceOnEdgesMethodsITCase.scala index 695f74a1c8a8f..dcd1deb0985a0 100644 --- a/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/ReduceOnEdgesMethodsITCase.scala +++ b/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/ReduceOnEdgesMethodsITCase.scala @@ -43,7 +43,7 @@ class ReduceOnEdgesMethodsITCase(mode: MultipleProgramsTestBase.TestExecutionMod val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env) val res = graph.groupReduceOnEdges(new SelectNeighborsValueGreaterThanFour, - EdgeDirection.ALL).collect.toList + EdgeDirection.ALL).collect().toList expectedResult = "(5,1)\n" + "(5,3)\n" + "(5,4)" TestBaseUtils.compareResultAsText(res.asJava, expectedResult) } @@ -56,7 +56,7 @@ class ReduceOnEdgesMethodsITCase(mode: MultipleProgramsTestBase.TestExecutionMod val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env) val res = graph.groupReduceOnEdges(new SelectNeighbors, EdgeDirection.ALL) - .collect.toList + .collect().toList expectedResult = "(1,2)\n" + "(1,3)\n" + "(1,5)\n" + "(2,1)\n" + "(2,3)\n" + "(3,1)\n" + "(3,2)\n" + "(3,4)\n" + "(3,5)\n" + "(4,3)\n" + "(4,5)\n" + "(5,1)\n" + "(5,3)\n" + "(5,4)" @@ -71,7 +71,7 @@ class ReduceOnEdgesMethodsITCase(mode: MultipleProgramsTestBase.TestExecutionMod .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env) val verticesWithLowestOutNeighbor: DataSet[(Long, Long)] = graph.reduceOnEdges(new SelectMinWeightNeighborNoValue, EdgeDirection.OUT) - val res = verticesWithLowestOutNeighbor.collect.toList + val res = verticesWithLowestOutNeighbor.collect().toList expectedResult = "(1,12)\n" + "(2,23)\n" + "(3,34)\n" + "(4,45)\n" + "(5,51)\n" TestBaseUtils.compareResultAsText(res.asJava, expectedResult) } @@ -84,7 +84,7 @@ class ReduceOnEdgesMethodsITCase(mode: MultipleProgramsTestBase.TestExecutionMod .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env) val verticesWithLowestOutNeighbor: DataSet[(Long, Long)] = graph.reduceOnEdges(new SelectMinWeightNeighborNoValue, EdgeDirection.IN) - val res = verticesWithLowestOutNeighbor.collect.toList + val res = verticesWithLowestOutNeighbor.collect().toList expectedResult = "(1,51)\n" + "(2,12)\n" + "(3,13)\n" + "(4,34)\n" + "(5,35)\n" TestBaseUtils.compareResultAsText(res.asJava, expectedResult) } @@ -97,7 +97,7 @@ class ReduceOnEdgesMethodsITCase(mode: MultipleProgramsTestBase.TestExecutionMod .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env) val verticesWithMaxEdgeWeight: DataSet[(Long, Long)] = graph.reduceOnEdges(new SelectMaxWeightNeighborNoValue, EdgeDirection.ALL) - val res = verticesWithMaxEdgeWeight.collect.toList + val res = verticesWithMaxEdgeWeight.collect().toList expectedResult = "(1,51)\n" + "(2,23)\n" + "(3,35)\n" + "(4,45)\n" + "(5,51)\n" TestBaseUtils.compareResultAsText(res.asJava, expectedResult) } diff --git a/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/ReduceOnNeighborMethodsITCase.scala b/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/ReduceOnNeighborMethodsITCase.scala index b01e7508571b1..aef5493aea93b 100644 --- a/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/ReduceOnNeighborMethodsITCase.scala +++ b/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/ReduceOnNeighborMethodsITCase.scala @@ -43,7 +43,7 @@ class ReduceOnNeighborMethodsITCase(mode: MultipleProgramsTestBase.TestExecution val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env) val res = graph.reduceOnNeighbors(new SumNeighbors, EdgeDirection.ALL) - .collect.toList + .collect().toList expectedResult = "(1,10)\n" + "(2,4)\n" + "(3,12)\n" + "(4,8)\n" + "(5,8)\n" TestBaseUtils.compareResultAsText(res.asJava, expectedResult) } @@ -54,7 +54,7 @@ class ReduceOnNeighborMethodsITCase(mode: MultipleProgramsTestBase.TestExecution val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env) - val res = graph.reduceOnNeighbors(new SumNeighbors, EdgeDirection.OUT).collect.toList + val res = graph.reduceOnNeighbors(new SumNeighbors, EdgeDirection.OUT).collect().toList expectedResult = "(1,5)\n" + "(2,3)\n" + "(3,9)\n" + "(4,5)\n" + "(5,1)\n" TestBaseUtils.compareResultAsText(res.asJava, expectedResult) } @@ -66,7 +66,7 @@ class ReduceOnNeighborMethodsITCase(mode: MultipleProgramsTestBase.TestExecution val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env) val result = graph.groupReduceOnNeighbors(new SumAllNeighbors, EdgeDirection.ALL) - val res = result.collect.toList + val res = result.collect().toList expectedResult = "(1,11)\n" + "(2,6)\n" + "(3,15)\n" + "(4,12)\n" + "(5,13)\n" TestBaseUtils.compareResultAsText(res.asJava, expectedResult) } @@ -79,7 +79,7 @@ class ReduceOnNeighborMethodsITCase(mode: MultipleProgramsTestBase.TestExecution .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env) val result = graph.groupReduceOnNeighbors(new SumInNeighborsNoValueMultipliedByTwoIdGreaterThanTwo, EdgeDirection.IN) - val res = result.collect.toList + val res = result.collect().toList expectedResult = "(3,59)\n" + "(3,118)\n" + "(4,204)\n" + "(4,102)\n" + "(5,570)\n" + "(5,285)" TestBaseUtils.compareResultAsText(res.asJava, expectedResult) } diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GraphMetrics.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GraphMetrics.java index 591ed269cef34..b808e7617e9d4 100644 --- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GraphMetrics.java +++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GraphMetrics.java @@ -30,7 +30,7 @@ import org.apache.flink.types.NullValue; /** - * This example illustrate how to use Gelly metrics methods and get simple statistics + * This example illustrates how to use Gelly metrics methods and get simple statistics * from the input graph. * * The program creates a random graph and computes and prints