From 240e8895c6e1d5ea6f67370d276cb58fd0ecddb8 Mon Sep 17 00:00:00 2001 From: Pieter-Jan Van Aeken Date: Mon, 10 Aug 2015 14:06:52 +0200 Subject: [PATCH] [FLINK-1962] Add Gelly Scala API This closes #1004 --- .../api/scala/ExecutionEnvironment.scala | 4 + flink-staging/flink-gelly-scala/pom.xml | 198 +++++ .../flink/graph/scala/EdgesFunction.scala | 35 + .../scala/EdgesFunctionWithVertexValue.scala | 33 + .../org/apache/flink/graph/scala/Graph.scala | 734 ++++++++++++++++++ .../flink/graph/scala/NeighborsFunction.scala | 37 + .../NeighborsFunctionWithVertexValue.scala | 40 + .../apache/flink/graph/scala/package.scala | 30 + .../graph/scala/utils/EdgeToTuple3Map.scala | 31 + .../graph/scala/utils/VertexToTuple2Map.scala | 31 + .../graph/scala/test/TestGraphUtils.scala | 55 ++ .../scala/test/operations/DegreesITCase.scala | 88 +++ .../operations/GraphMutationsITCase.scala | 171 ++++ .../operations/GraphOperationsITCase.scala | 238 ++++++ .../test/operations/JoinWithEdgesITCase.scala | 170 ++++ .../operations/JoinWithVerticesITCase.scala | 93 +++ .../test/operations/MapEdgesITCase.scala | 102 +++ .../test/operations/MapVerticesITCase.scala | 99 +++ .../ReduceOnEdgesMethodsITCase.scala | 173 +++++ .../ReduceOnNeighborMethodsITCase.scala | 144 ++++ .../java/org/apache/flink/graph/Graph.java | 179 +++++ flink-staging/pom.xml | 1 + 22 files changed, 2686 insertions(+) create mode 100644 flink-staging/flink-gelly-scala/pom.xml create mode 100644 flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/EdgesFunction.scala create mode 100644 flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/EdgesFunctionWithVertexValue.scala create mode 100644 flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/Graph.scala create mode 100644 flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/NeighborsFunction.scala create mode 100644 
flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/NeighborsFunctionWithVertexValue.scala create mode 100644 flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/package.scala create mode 100644 flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/utils/EdgeToTuple3Map.scala create mode 100644 flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/utils/VertexToTuple2Map.scala create mode 100644 flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/TestGraphUtils.scala create mode 100644 flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/DegreesITCase.scala create mode 100644 flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/GraphMutationsITCase.scala create mode 100644 flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/GraphOperationsITCase.scala create mode 100644 flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/JoinWithEdgesITCase.scala create mode 100644 flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/JoinWithVerticesITCase.scala create mode 100644 flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/MapEdgesITCase.scala create mode 100644 flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/MapVerticesITCase.scala create mode 100644 flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/ReduceOnEdgesMethodsITCase.scala create mode 100644 flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/ReduceOnNeighborMethodsITCase.scala diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala 
b/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala index d53c54cdaae6c..17311e9f9253a 100644 --- a/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala +++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala @@ -64,6 +64,10 @@ import scala.reflect.ClassTag */ class ExecutionEnvironment(javaEnv: JavaEnv) { + /** + * @return the Java Execution environment. + */ + def getJavaEnv: JavaEnv = javaEnv /** * Gets the config object. */ diff --git a/flink-staging/flink-gelly-scala/pom.xml b/flink-staging/flink-gelly-scala/pom.xml new file mode 100644 index 0000000000000..390dbb84a9bbd --- /dev/null +++ b/flink-staging/flink-gelly-scala/pom.xml @@ -0,0 +1,198 @@ + + + + + flink-staging + org.apache.flink + 0.10-SNAPSHOT + .. + + 4.0.0 + + flink-gelly-scala + + jar + + + + org.apache.flink + flink-scala + ${project.version} + + + org.apache.flink + flink-clients + ${project.version} + + + org.apache.flink + flink-gelly + ${project.version} + + + + org.apache.flink + flink-test-utils + ${project.version} + test + + + com.google.guava + guava + ${guava.version} + + + + + + + + net.alchim31.maven + scala-maven-plugin + 3.1.4 + + + + scala-compile-first + process-resources + + compile + + + + + + scala-test-compile + process-test-resources + + testCompile + + + + + + -Xms128m + -Xmx512m + + + + org.scalamacros + paradise_${scala.version} + ${scala.macros.version} + + + + + + + + org.apache.maven.plugins + maven-eclipse-plugin + 2.8 + + true + + org.scala-ide.sdt.core.scalanature + org.eclipse.jdt.core.javanature + + + org.scala-ide.sdt.core.scalabuilder + + + org.scala-ide.sdt.launching.SCALA_CONTAINER + org.eclipse.jdt.launching.JRE_CONTAINER + + + org.scala-lang:scala-library + org.scala-lang:scala-compiler + + + **/*.scala + **/*.java + + + + + + + org.codehaus.mojo + build-helper-maven-plugin + 1.7 + + + + add-source + generate-sources + + add-source + + + + src/main/scala + + + + + + 
add-test-source + generate-test-sources + + add-test-source + + + + src/test/scala + + + + + + + + org.scalastyle + scalastyle-maven-plugin + 0.5.0 + + + + check + + + + + false + true + true + false + ${basedir}/src/main/scala + ${basedir}/src/test/scala + ${project.basedir}/../../tools/maven/scalastyle-config.xml + ${project.basedir}/scalastyle-output.xml + UTF-8 + + + + + + \ No newline at end of file diff --git a/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/EdgesFunction.scala b/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/EdgesFunction.scala new file mode 100644 index 0000000000000..70a5fdf181ad3 --- /dev/null +++ b/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/EdgesFunction.scala @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.flink.graph.scala
+
+import org.apache.flink.api.java.tuple.Tuple2
+import org.apache.flink.graph.Edge
+import org.apache.flink.util.Collector
+
+/**
+ * Scala-facing variant of the Java Gelly `EdgesFunction`. Implementors work with a Scala
+ * `Iterable` of Scala tuples; the Java-typed entry point is bridged to it by this class.
+ *
+ * @tparam K the type of the first tuple component (presumably the vertex key -- confirm
+ *           against the Java `EdgesFunction` contract)
+ * @tparam EV the edge value type
+ * @tparam T the type of the records emitted through the collector
+ */
+abstract class EdgesFunction[K, EV, T] extends org.apache.flink.graph.EdgesFunction[K, EV, T] {
+
+  /**
+   * User logic, expressed against Scala collections.
+   *
+   * @param edges the pairs handed over by the Java API, converted to Scala tuples
+   * @param out the collector used to emit results
+   */
+  def iterateEdges(edges: Iterable[(K, Edge[K, EV])], out: Collector[T]): Unit
+
+  /**
+   * Bridge called through the Java interface: turns every Flink `Tuple2` into a Scala tuple
+   * and delegates to the Scala overload.
+   */
+  override def iterateEdges(edges: java.lang.Iterable[Tuple2[K, Edge[K, EV]]], out:
+  Collector[T]): Unit = {
+    val scalaIterable = scala.collection.JavaConversions.iterableAsScalaIterable(edges)
+      .map(jtuple => (jtuple.f0, jtuple.f1))
+    iterateEdges(scalaIterable, out)
+  }
+}
diff --git a/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/EdgesFunctionWithVertexValue.scala b/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/EdgesFunctionWithVertexValue.scala
new file mode 100644
index 0000000000000..82589b6fbf95d
--- /dev/null
+++ b/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/EdgesFunctionWithVertexValue.scala
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.scala
+
+import org.apache.flink.graph.{Edge, Vertex}
+import org.apache.flink.util.Collector
+
+/**
+ * Scala-facing variant of the Java Gelly `EdgesFunctionWithVertexValue`. Implementors receive
+ * the edges as a Scala `Iterable`; the Java-typed entry point is bridged to it by this class.
+ *
+ * @tparam K the vertex key type
+ * @tparam VV the vertex value type
+ * @tparam EV the edge value type
+ * @tparam T the type of the records emitted through the collector
+ */
+abstract class EdgesFunctionWithVertexValue[K, VV, EV, T] extends org.apache.flink.graph
+.EdgesFunctionWithVertexValue[K, VV, EV, T] {
+
+  /**
+   * User logic, expressed against Scala collections.
+   *
+   * @param v the vertex passed in by the Java API
+   * @param edges the edges passed in by the Java API, viewed as a Scala `Iterable`
+   * @param out the collector used to emit results
+   */
+  @throws(classOf[Exception])
+  def iterateEdges(v: Vertex[K, VV], edges: Iterable[Edge[K, EV]], out: Collector[T]): Unit
+
+  /**
+   * Bridge called through the Java interface: wraps the Java iterable and delegates to the
+   * Scala overload. The result type is spelled out because this method calls its own overload.
+   */
+  override def iterateEdges(v: Vertex[K, VV], edges: java.lang.Iterable[Edge[K, EV]], out:
+  Collector[T]): Unit = {
+    iterateEdges(v, scala.collection.JavaConversions.iterableAsScalaIterable(edges), out)
+  }
+}
diff --git a/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/Graph.scala b/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/Graph.scala
new file mode 100644
index 0000000000000..738fd903def97
--- /dev/null
+++ b/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/Graph.scala
@@ -0,0 +1,734 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.scala
+
+import org.apache.flink.api.common.functions.{FilterFunction, MapFunction}
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.{tuple => jtuple}
+import org.apache.flink.api.scala._
+import org.apache.flink.graph._
+import org.apache.flink.graph.gsa.{ApplyFunction, GSAConfiguration, GatherFunction, SumFunction}
+import org.apache.flink.graph.spargel.{MessagingFunction, VertexCentricConfiguration, VertexUpdateFunction}
+import org.apache.flink.{graph => jg}
+
+import _root_.scala.collection.JavaConverters._
+import _root_.scala.reflect.ClassTag
+
+/**
+ * Factory methods that build a Scala [[Graph]] by delegating to the Java Gelly `Graph`
+ * factories and wrapping the result.
+ */
+object Graph {
+
+  /**
+   * Creates a graph from a DataSet of vertices and a DataSet of edges.
+   *
+   * @param vertices the vertex DataSet
+   * @param edges the edge DataSet
+   * @param env the Scala execution environment; its Java environment is handed to the Java API
+   * @return the wrapped graph
+   */
+  def fromDataSet[K: TypeInformation : ClassTag, VV: TypeInformation : ClassTag, EV:
+  TypeInformation : ClassTag](vertices: DataSet[Vertex[K, VV]], edges: DataSet[Edge[K, EV]],
+                              env: ExecutionEnvironment): Graph[K, VV, EV] = {
+    val javaGraph =
+      jg.Graph.fromDataSet[K, VV, EV](vertices.javaSet, edges.javaSet, env.getJavaEnv)
+    wrapGraph(javaGraph)
+  }
+
+  /**
+   * Creates a graph from in-memory sequences of vertices and edges.
+   *
+   * @param vertices the vertices, passed to the Java API as a Java collection view
+   * @param edges the edges, passed to the Java API as a Java collection view
+   * @param env the Scala execution environment; its Java environment is handed to the Java API
+   * @return the wrapped graph
+   */
+  def fromCollection[K: TypeInformation : ClassTag, VV: TypeInformation : ClassTag, EV:
+  TypeInformation : ClassTag](vertices: Seq[Vertex[K, VV]], edges: Seq[Edge[K, EV]], env:
+  ExecutionEnvironment): Graph[K, VV, EV] = {
+    val javaVertices = vertices.asJavaCollection
+    val javaEdges = edges.asJavaCollection
+    wrapGraph(jg.Graph.fromCollection[K, VV, EV](javaVertices, javaEdges, env.getJavaEnv))
+  }
+}
+
+/**
+ * Represents a graph consisting of {@link Edge edges} and {@link Vertex vertices}.
+ * @param jgraph the underlying java api Graph.
+ * @tparam K the key type for vertex and edge identifiers
+ * @tparam VV the value type for vertices
+ * @tparam EV the value type for edges
+ * @see org.apache.flink.graph.Edge
+ * @see org.apache.flink.graph.Vertex
+ */
+final class Graph[K: TypeInformation : ClassTag, VV: TypeInformation : ClassTag, EV:
+TypeInformation : ClassTag](jgraph: jg.Graph[K, VV, EV]) {
+
+  /**
+   * @return the wrapped Java API graph.
+   */
+  private[flink] def getWrappedGraph: jg.Graph[K, VV, EV] = jgraph
+
+  /**
+   * Prepares a user function for shipping: runs the closure cleaner when it is enabled in the
+   * execution config of the wrapped graph, then verifies serializability.
+   *
+   * @param f the function object to clean
+   * @param checkSerializable whether serializability must be verified; when false the
+   *                          explicit serializability check is skipped as well
+   * @return the same function object `f`
+   */
+  private[flink] def clean[F <: AnyRef](f: F, checkSerializable: Boolean = true): F = {
+    if (jgraph.getContext.getConfig.isClosureCleanerEnabled) {
+      ClosureCleaner.clean(f, checkSerializable)
+    }
+    // Honour the caller's flag instead of checking unconditionally.
+    if (checkSerializable) {
+      ClosureCleaner.ensureSerializable(f)
+    }
+    f
+  }
+
+  /**
+   * @return the vertex DataSet.
+   */
+  def getVertices: DataSet[Vertex[K, VV]] = wrap(jgraph.getVertices)
+
+  /**
+   * @return the edge DataSet.
+   */
+  def getEdges: DataSet[Edge[K, EV]] = wrap(jgraph.getEdges)
+
+  /**
+   * @return the vertex DataSet as Tuple2.
+   */
+  def getVerticesAsTuple2(): DataSet[(K, VV)] = {
+    // The parameter is not called `jtuple` so that it does not shadow the import alias.
+    wrap(jgraph.getVerticesAsTuple2).map(javatuple => (javatuple.f0, javatuple.f1))
+  }
+
+  /**
+   * @return the edge DataSet as Tuple3.
+   */
+  def getEdgesAsTuple3(): DataSet[(K, K, EV)] = {
+    wrap(jgraph.getEdgesAsTuple3).map(javatuple => (javatuple.f0, javatuple.f1, javatuple.f2))
+  }
+
+  /**
+   * Apply a function to the attribute of each vertex in the graph.
+   *
+   * @param mapper the map function to apply.
+   * @return a new graph
+   */
+  def mapVertices[NV: TypeInformation : ClassTag](mapper: MapFunction[Vertex[K, VV], NV]):
+  Graph[K, NV, EV] = {
+    new Graph[K, NV, EV](jgraph.mapVertices[NV](
+      mapper,
+      createTypeInformation[Vertex[K, NV]]
+    ))
+  }
+
+  /**
+   * Apply a function to the attribute of each vertex in the graph.
+   *
+   * @param fun the map function to apply.
+   * @return a new graph
+   */
+  def mapVertices[NV: TypeInformation : ClassTag](fun: Vertex[K, VV] => NV): Graph[K, NV, EV] = {
+    val mapper: MapFunction[Vertex[K, VV], NV] = new MapFunction[Vertex[K, VV], NV] {
+      // Evaluated once, while this function object is constructed; map() only invokes the
+      // value returned by clean(fun).
+      val cleanFun = clean(fun)
+
+      def map(in: Vertex[K, VV]): NV = cleanFun(in)
+    }
+    new Graph[K, NV, EV](jgraph.mapVertices[NV](mapper, createTypeInformation[Vertex[K, NV]]))
+  }
+
+  /**
+   * Apply a function to the attribute of each edge in the graph.
+   *
+   * @param mapper the map function to apply.
+   * @tparam NV the new edge value type
+   * @return a new graph
+   */
+  def mapEdges[NV: TypeInformation : ClassTag](mapper: MapFunction[Edge[K, EV], NV]): Graph[K,
+    VV, NV] = {
+    new Graph[K, VV, NV](jgraph.mapEdges[NV](
+      mapper,
+      createTypeInformation[Edge[K, NV]]
+    ))
+  }
+
+  /**
+   * Apply a function to the attribute of each edge in the graph.
+   *
+   * @param fun the map function to apply.
+   * @tparam NV the new edge value type
+   * @return a new graph
+   */
+  def mapEdges[NV: TypeInformation : ClassTag](fun: Edge[K, EV] => NV): Graph[K, VV, NV] = {
+    val mapper: MapFunction[Edge[K, EV], NV] = new MapFunction[Edge[K, EV], NV] {
+      // Evaluated once, while this function object is constructed; map() only invokes the
+      // value returned by clean(fun).
+      val cleanFun = clean(fun)
+
+      def map(in: Edge[K, EV]): NV = cleanFun(in)
+    }
+    new Graph[K, VV, NV](jgraph.mapEdges[NV](mapper, createTypeInformation[Edge[K, NV]]))
+  }
+
+  /**
+   * Joins the vertex DataSet of this graph with an input DataSet and applies
+   * a UDF on the resulting values.
+   *
+   * @param inputDataSet the DataSet to join with.
+   * @param mapper the UDF map function to apply.
+   * @return a new graph where the vertex values have been updated.
+   */
+  def joinWithVertices[T: TypeInformation](inputDataSet: DataSet[(K, T)], mapper: MapFunction[
+    (VV, T), VV]): Graph[K, VV, EV] = {
+    // Adapter: the Java API passes a Flink Tuple2, the user's mapper expects a Scala tuple.
+    val newmapper = new MapFunction[jtuple.Tuple2[VV, T], VV]() {
+      override def map(value: jtuple.Tuple2[VV, T]): VV = {
+        mapper.map((value.f0, value.f1))
+      }
+    }
+    // Re-wrap the Scala tuples as Flink Java tuples before handing the set to the Java API.
+    val javaTupleSet = inputDataSet.map(scalatuple => new jtuple.Tuple2(scalatuple._1,
+      scalatuple._2)).javaSet
+    wrapGraph(jgraph.joinWithVertices[T](javaTupleSet, newmapper))
+  }
+
+  /**
+   * Joins the vertex DataSet of this graph with an input DataSet and applies
+   * a UDF on the resulting values.
+   *
+   * @param inputDataSet the DataSet to join with.
+   * @param fun the UDF map function to apply.
+   * @tparam T the type of the values in the input DataSet
+   * @return a new graph where the vertex values have been updated.
+   */
+  def joinWithVertices[T: TypeInformation](inputDataSet: DataSet[(K, T)], fun: (VV, T) => VV):
+  Graph[K, VV, EV] = {
+    val newmapper = new MapFunction[jtuple.Tuple2[VV, T], VV]() {
+      // Evaluated once, while this function object is constructed.
+      val cleanFun = clean(fun)
+
+      override def map(value: jtuple.Tuple2[VV, T]): VV = {
+        cleanFun(value.f0, value.f1)
+      }
+    }
+    // Re-wrap the Scala tuples as Flink Java tuples before handing the set to the Java API.
+    val javaTupleSet = inputDataSet.map(scalatuple => new jtuple.Tuple2(scalatuple._1,
+      scalatuple._2)).javaSet
+    wrapGraph(jgraph.joinWithVertices[T](javaTupleSet, newmapper))
+  }
+
+  /**
+   * Joins the edge DataSet with an input DataSet on a composite key of both
+   * source and target and applies a UDF on the resulting values.
+   *
+   * @param inputDataSet the DataSet to join with.
+   * @param mapper the UDF map function to apply.
+   * @tparam T the type of the values in the input DataSet
+   * @return a new graph where the edge values have been updated.
+   */
+  def joinWithEdges[T: TypeInformation](inputDataSet: DataSet[(K, K, T)], mapper: MapFunction[
+    (EV, T), EV]): Graph[K, VV, EV] = {
+    // Adapter: the Java API passes a Flink Tuple2, the user's mapper expects a Scala tuple.
+    val newmapper = new MapFunction[jtuple.Tuple2[EV, T], EV]() {
+      override def map(value: jtuple.Tuple2[EV, T]): EV = {
+        mapper.map((value.f0, value.f1))
+      }
+    }
+    // Re-wrap the Scala triples as Flink Java Tuple3s before handing the set to the Java API.
+    val javaTupleSet = inputDataSet.map(scalatuple => new jtuple.Tuple3(scalatuple._1,
+      scalatuple._2, scalatuple._3)).javaSet
+    wrapGraph(jgraph.joinWithEdges[T](javaTupleSet, newmapper))
+  }
+
+  /**
+   * Joins the edge DataSet with an input DataSet on a composite key of both
+   * source and target and applies a UDF on the resulting values.
+   *
+   * @param inputDataSet the DataSet to join with.
+   * @param fun the UDF map function to apply.
+   * @tparam T the type of the values in the input DataSet
+   * @return a new graph where the edge values have been updated.
+   */
+  def joinWithEdges[T: TypeInformation](inputDataSet: DataSet[(K, K, T)], fun: (EV, T) => EV):
+  Graph[K, VV, EV] = {
+    val newmapper = new MapFunction[jtuple.Tuple2[EV, T], EV]() {
+      // Evaluated once, while this function object is constructed.
+      val cleanFun = clean(fun)
+
+      override def map(value: jtuple.Tuple2[EV, T]): EV = {
+        cleanFun(value.f0, value.f1)
+      }
+    }
+    // Re-wrap the Scala triples as Flink Java Tuple3s before handing the set to the Java API.
+    val javaTupleSet = inputDataSet.map(scalatuple => new jtuple.Tuple3(scalatuple._1,
+      scalatuple._2, scalatuple._3)).javaSet
+    wrapGraph(jgraph.joinWithEdges[T](javaTupleSet, newmapper))
+  }
+
+  /**
+   * Joins the edge DataSet with an input DataSet on the source key of the
+   * edges and the first attribute of the input DataSet and applies a UDF on
+   * the resulting values. In case the inputDataSet contains the same key more
+   * than once, only the first value will be considered.
+   *
+   * @param inputDataSet the DataSet to join with.
+   * @param mapper the UDF map function to apply.
+   * @tparam T the type of the values in the input DataSet
+   * @return a new graph where the edge values have been updated.
+   */
+  def joinWithEdgesOnSource[T: TypeInformation](inputDataSet: DataSet[(K, T)], mapper:
+  MapFunction[(EV, T), EV]): Graph[K, VV, EV] = {
+    // Adapter: the Java API passes a Flink Tuple2, the user's mapper expects a Scala tuple.
+    val newmapper = new MapFunction[jtuple.Tuple2[EV, T], EV]() {
+      override def map(value: jtuple.Tuple2[EV, T]): EV = {
+        mapper.map((value.f0, value.f1))
+      }
+    }
+    // Re-wrap the Scala tuples as Flink Java tuples before handing the set to the Java API.
+    val javaTupleSet = inputDataSet.map(scalatuple => new jtuple.Tuple2(scalatuple._1,
+      scalatuple._2)).javaSet
+    wrapGraph(jgraph.joinWithEdgesOnSource[T](javaTupleSet, newmapper))
+  }
+
+  /**
+   * Joins the edge DataSet with an input DataSet on the source key of the
+   * edges and the first attribute of the input DataSet and applies a UDF on
+   * the resulting values. In case the inputDataSet contains the same key more
+   * than once, only the first value will be considered.
+   *
+   * @param inputDataSet the DataSet to join with.
+   * @param fun the UDF map function to apply.
+   * @tparam T the type of the values in the input DataSet
+   * @return a new graph where the edge values have been updated.
+   */
+  def joinWithEdgesOnSource[T: TypeInformation](inputDataSet: DataSet[(K, T)], fun: (EV, T) =>
+    EV): Graph[K, VV, EV] = {
+    val newmapper = new MapFunction[jtuple.Tuple2[EV, T], EV]() {
+      // Evaluated once, while this function object is constructed.
+      val cleanFun = clean(fun)
+
+      override def map(value: jtuple.Tuple2[EV, T]): EV = {
+        cleanFun(value.f0, value.f1)
+      }
+    }
+    // Re-wrap the Scala tuples as Flink Java tuples before handing the set to the Java API.
+    val javaTupleSet = inputDataSet.map(scalatuple => new jtuple.Tuple2(scalatuple._1,
+      scalatuple._2)).javaSet
+    wrapGraph(jgraph.joinWithEdgesOnSource[T](javaTupleSet, newmapper))
+  }
+
+  /**
+   * Joins the edge DataSet with an input DataSet on the target key of the
+   * edges and the first attribute of the input DataSet and applies a UDF on
+   * the resulting values. Should the inputDataSet contain the same key more
+   * than once, only the first value will be considered.
+   *
+   * @param inputDataSet the DataSet to join with.
+   * @param mapper the UDF map function to apply.
+   * @tparam T the type of the values in the input DataSet
+   * @return a new graph where the edge values have been updated.
+   */
+  def joinWithEdgesOnTarget[T: TypeInformation](inputDataSet: DataSet[(K, T)], mapper:
+  MapFunction[(EV, T), EV]): Graph[K, VV, EV] = {
+    // Adapter: the Java API passes a Flink Tuple2, the user's mapper expects a Scala tuple.
+    val newmapper = new MapFunction[jtuple.Tuple2[EV, T], EV]() {
+      override def map(value: jtuple.Tuple2[EV, T]): EV = {
+        mapper.map((value.f0, value.f1))
+      }
+    }
+    // Re-wrap the Scala tuples as Flink Java tuples before handing the set to the Java API.
+    val javaTupleSet = inputDataSet.map(scalatuple => new jtuple.Tuple2(scalatuple._1,
+      scalatuple._2)).javaSet
+    wrapGraph(jgraph.joinWithEdgesOnTarget[T](javaTupleSet, newmapper))
+  }
+
+  /**
+   * Joins the edge DataSet with an input DataSet on the target key of the
+   * edges and the first attribute of the input DataSet and applies a UDF on
+   * the resulting values. Should the inputDataSet contain the same key more
+   * than once, only the first value will be considered.
+   *
+   * @param inputDataSet the DataSet to join with.
+   * @param fun the UDF map function to apply.
+   * @tparam T the type of the values in the input DataSet
+   * @return a new graph where the edge values have been updated.
+   */
+  def joinWithEdgesOnTarget[T: TypeInformation](inputDataSet: DataSet[(K, T)], fun: (EV, T) =>
+    EV): Graph[K, VV, EV] = {
+    val newmapper = new MapFunction[jtuple.Tuple2[EV, T], EV]() {
+      // Evaluated once, while this function object is constructed.
+      val cleanFun = clean(fun)
+
+      override def map(value: jtuple.Tuple2[EV, T]): EV = {
+        cleanFun(value.f0, value.f1)
+      }
+    }
+    // Re-wrap the Scala tuples as Flink Java tuples before handing the set to the Java API.
+    val javaTupleSet = inputDataSet.map(scalatuple => new jtuple.Tuple2(scalatuple._1,
+      scalatuple._2)).javaSet
+    wrapGraph(jgraph.joinWithEdgesOnTarget[T](javaTupleSet, newmapper))
+  }
+
+  /**
+   * Apply filtering functions to the graph and return a sub-graph that
+   * satisfies the predicates for both vertices and edges.
+   *
+   * @param vertexFilter the filter function for vertices.
+   * @param edgeFilter the filter function for edges.
+   * @return the resulting sub-graph.
+   */
+  def subgraph(vertexFilter: FilterFunction[Vertex[K, VV]], edgeFilter: FilterFunction[Edge[K,
+    EV]]): Graph[K, VV, EV] = {
+    wrapGraph(jgraph.subgraph(vertexFilter, edgeFilter))
+  }
+
+  /**
+   * Apply filtering functions to the graph and return a sub-graph that
+   * satisfies the predicates for both vertices and edges.
+   *
+   * @param vertexFilterFun the filter function for vertices.
+   * @param edgeFilterFun the filter function for edges.
+   * @return the resulting sub-graph.
+   */
+  def subgraph(vertexFilterFun: Vertex[K, VV] => Boolean, edgeFilterFun: Edge[K, EV] =>
+    Boolean): Graph[K, VV, EV] = {
+    val vertexFilter = new FilterFunction[Vertex[K, VV]] {
+      // Evaluated once, while this function object is constructed.
+      val cleanVertexFun = clean(vertexFilterFun)
+
+      override def filter(value: Vertex[K, VV]): Boolean = cleanVertexFun(value)
+    }
+
+    val edgeFilter = new FilterFunction[Edge[K, EV]] {
+      // Evaluated once, while this function object is constructed.
+      val cleanEdgeFun = clean(edgeFilterFun)
+
+      override def filter(value: Edge[K, EV]): Boolean = cleanEdgeFun(value)
+    }
+
+    wrapGraph(jgraph.subgraph(vertexFilter, edgeFilter))
+  }
+
+  /**
+   * Apply a filtering function to the graph and return a sub-graph that
+   * satisfies the predicates only for the vertices.
+   *
+   * @param vertexFilter the filter function for vertices.
+   * @return the resulting sub-graph.
+   */
+  def filterOnVertices(vertexFilter: FilterFunction[Vertex[K, VV]]): Graph[K, VV, EV] = {
+    wrapGraph(jgraph.filterOnVertices(vertexFilter))
+  }
+
+  /**
+   * Apply a filtering function to the graph and return a sub-graph that
+   * satisfies the predicates only for the vertices.
+   *
+   * @param vertexFilterFun the filter function for vertices.
+   * @return the resulting sub-graph.
+   */
+  def filterOnVertices(vertexFilterFun: Vertex[K, VV] => Boolean): Graph[K, VV, EV] = {
+    val vertexFilter = new FilterFunction[Vertex[K, VV]] {
+      // Evaluated once, while this function object is constructed.
+      val cleanVertexFun = clean(vertexFilterFun)
+
+      override def filter(value: Vertex[K, VV]): Boolean = cleanVertexFun(value)
+    }
+
+    wrapGraph(jgraph.filterOnVertices(vertexFilter))
+  }
+
+  /**
+   * Apply a filtering function to the graph and return a sub-graph that
+   * satisfies the predicates only for the edges.
+   *
+   * @param edgeFilter the filter function for edges.
+   * @return the resulting sub-graph.
+   */
+  def filterOnEdges(edgeFilter: FilterFunction[Edge[K, EV]]): Graph[K, VV, EV] = {
+    wrapGraph(jgraph.filterOnEdges(edgeFilter))
+  }
+
+  /**
+   * Apply a filtering function to the graph and return a sub-graph that
+   * satisfies the predicates only for the edges.
+   *
+   * @param edgeFilterFun the filter function for edges.
+   * @return the resulting sub-graph.
+   */
+  def filterOnEdges(edgeFilterFun: Edge[K, EV] => Boolean): Graph[K, VV, EV] = {
+    val edgeFilter = new FilterFunction[Edge[K, EV]] {
+      // Evaluated once, while this function object is constructed.
+      val cleanEdgeFun = clean(edgeFilterFun)
+
+      override def filter(value: Edge[K, EV]): Boolean = cleanEdgeFun(value)
+    }
+
+    wrapGraph(jgraph.filterOnEdges(edgeFilter))
+  }
+
+  /**
+   * Return the in-degree of all vertices in the graph
+   *
+   * @return A DataSet of Tuple2
+   */
+  def inDegrees(): DataSet[(K, Long)] = {
+    wrap(jgraph.inDegrees).map(javatuple => (javatuple.f0, javatuple.f1))
+  }
+
+  /**
+   * Return the out-degree of all vertices in the graph
+   *
+   * @return A DataSet of Tuple2
+   */
+  def outDegrees(): DataSet[(K, Long)] = {
+    wrap(jgraph.outDegrees).map(javatuple => (javatuple.f0, javatuple.f1))
+  }
+
+  /**
+   * Return the degree of all vertices in the graph
+   *
+   * @return A DataSet of Tuple2
+   */
+  def getDegrees(): DataSet[(K, Long)] = {
+    wrap(jgraph.getDegrees).map(javatuple => (javatuple.f0, javatuple.f1))
+  }
+
+  /**
+   * This operation adds all inverse-direction edges to the graph.
+   *
+   * @return the undirected graph.
+   */
+  def getUndirected(): Graph[K, VV, EV] = {
+    new Graph(jgraph.getUndirected)
+  }
+
+  /**
+   * Reverse the direction of the edges in the graph
+   *
+   * @return a new graph with all edges reversed
+   * @throws UnsupportedOperationException
+   */
+  def reverse(): Graph[K, VV, EV] = {
+    new Graph(jgraph.reverse())
+  }
+
+  /**
+   * Compute an aggregate over the edges of each vertex. The function applied
+   * on the edges has access to the vertex value.
+   *
+   * @param edgesFunction the function to apply to the neighborhood
+   * @param direction the edge direction (in-, out-, all-)
+   * @tparam T the output type
+   * @return a dataset of a T
+   */
+  def groupReduceOnEdges[T: TypeInformation : ClassTag](edgesFunction:
+    EdgesFunctionWithVertexValue[K, VV, EV,
+      T], direction: EdgeDirection):
+  DataSet[T] = {
+    wrap(jgraph.groupReduceOnEdges(edgesFunction, direction, createTypeInformation[T]))
+  }
+
+  /**
+   * Compute an aggregate over the edges of each vertex. Unlike the
+   * EdgesFunctionWithVertexValue overload, the function applied on the edges
+   * is not handed the vertex value.
+   *
+   * @param edgesFunction the function to apply to the neighborhood
+   * @param direction the edge direction (in-, out-, all-)
+   * @tparam T the output type
+   * @return a dataset of a T
+   */
+  def groupReduceOnEdges[T: TypeInformation : ClassTag](edgesFunction: EdgesFunction[K, EV, T],
+    direction: EdgeDirection): DataSet[T] = {
+    wrap(jgraph.groupReduceOnEdges(edgesFunction, direction, createTypeInformation[T]))
+  }
+
+  /**
+   * Compute an aggregate over the neighbors (edges and vertices) of each
+   * vertex. The function applied on the neighbors has access to the vertex
+   * value.
+   *
+   * @param neighborsFunction the function to apply to the neighborhood
+   * @param direction the edge direction (in-, out-, all-)
+   * @tparam T the output type
+   * @return a dataset of a T
+   */
+  def groupReduceOnNeighbors[T: TypeInformation : ClassTag](
+      neighborsFunction: NeighborsFunctionWithVertexValue[K, VV, EV, T],
+      direction: EdgeDirection): DataSet[T] = {
+    wrap(jgraph.groupReduceOnNeighbors(neighborsFunction, direction, createTypeInformation[T]))
+  }
+
+  /**
+   * Compute an aggregate over the neighbors (edges and vertices) of each
+   * vertex.
+   *
+   * @param neighborsFunction the function to apply to the neighborhood
+   * @param direction the edge direction (in-, out-, all-)
+   * @tparam T the output type
+   * @return a dataset of a T
+   */
+  def groupReduceOnNeighbors[T: TypeInformation : ClassTag](
+      neighborsFunction: NeighborsFunction[K, VV, EV, T],
+      direction: EdgeDirection): DataSet[T] = {
+    wrap(jgraph.groupReduceOnNeighbors(neighborsFunction, direction, createTypeInformation[T]))
+  }
+
+  /**
+   * @return a long integer representing the number of vertices
+   */
+  def numberOfVertices(): Long = {
+    jgraph.numberOfVertices()
+  }
+
+  /**
+   * @return a long integer representing the number of edges
+   */
+  def numberOfEdges(): Long = {
+    jgraph.numberOfEdges()
+  }
+
+  /**
+   * @return The IDs of the vertices as DataSet
+   */
+  def getVertexIds(): DataSet[K] = {
+    wrap(jgraph.getVertexIds)
+  }
+
+  /**
+   * @return The IDs of the edges as DataSet
+   */
+  def getEdgeIds(): DataSet[(K, K)] = {
+    // The parameter is not called `jtuple` so that it does not shadow the import alias.
+    wrap(jgraph.getEdgeIds).map(javatuple => (javatuple.f0, javatuple.f1))
+  }
+
+  /**
+   * Adds the input vertex to the graph. If the vertex already
+   * exists in the graph, it will not be added again.
+   *
+   * @param vertex the vertex to be added
+   * @return the new graph containing the existing vertices as well as the one just added
+   */
+  def addVertex(vertex: Vertex[K, VV]): Graph[K, VV, EV] = {
+    wrapGraph(jgraph.addVertex(vertex))
+  }
+
+  /**
+   * Adds the given edge to the graph.
If the source and target vertices do
+   * not exist in the graph, they will also be added.
+   *
+   * @param source the source vertex of the edge
+   * @param target the target vertex of the edge
+   * @param edgeValue the edge value
+   * @return the new graph containing the existing vertices and edges plus the
+   *         newly added edge
+   */
+  def addEdge(source: Vertex[K, VV], target: Vertex[K, VV], edgeValue: EV): Graph[K, VV, EV] = {
+    wrapGraph(jgraph.addEdge(source, target, edgeValue))
+  }
+
+  /**
+   * Removes the given vertex and its edges from the graph.
+   *
+   * @param vertex the vertex to remove
+   * @return the new graph containing the existing vertices and edges without
+   *         the removed vertex and its edges
+   */
+  def removeVertex(vertex: Vertex[K, VV]): Graph[K, VV, EV] = {
+    wrapGraph(jgraph.removeVertex(vertex))
+  }
+
+  /**
+   * Removes all edges that match the given edge from the graph.
+   *
+   * @param edge the edge to remove
+   * @return the new graph containing the existing vertices and edges without
+   *         the removed edges
+   */
+  def removeEdge(edge: Edge[K, EV]): Graph[K, VV, EV] = {
+    wrapGraph(jgraph.removeEdge(edge))
+  }
+
+  /**
+   * Performs union on the vertices and edges sets of the input graphs
+   * removing duplicate vertices but maintaining duplicate edges.
+   *
+   * @param graph the graph to perform union with
+   * @return a new graph
+   */
+  def union(graph: Graph[K, VV, EV]): Graph[K, VV, EV] = {
+    wrapGraph(jgraph.union(graph.getWrappedGraph))
+  }
+
+  /**
+   * Compute an aggregate over the neighbor values of each
+   * vertex.
+   *
+   * @param reduceNeighborsFunction the function to apply to the neighborhood
+   * @param direction the edge direction (in-, out-, all-)
+   * @return a Dataset containing one value per vertex (vertex id, aggregate vertex value)
+   */
+  def reduceOnNeighbors(reduceNeighborsFunction: ReduceNeighborsFunction[VV], direction:
+  EdgeDirection): DataSet[(K, VV)] = {
+    // The parameter is not called `jtuple` so that it does not shadow the import alias.
+    wrap(jgraph.reduceOnNeighbors(reduceNeighborsFunction, direction))
+      .map(javatuple => (javatuple.f0, javatuple.f1))
+  }
+
+  /**
+   * Compute an aggregate over the edge values of each vertex.
+   *
+   * @param reduceEdgesFunction the function to apply to the neighborhood
+   * @param direction the edge direction (in-, out-, all-)
+   * @return a Dataset containing one value per vertex (vertex key, aggregate edge value)
+   * @throws IllegalArgumentException
+   */
+  def reduceOnEdges(reduceEdgesFunction: ReduceEdgesFunction[EV], direction: EdgeDirection):
+  DataSet[(K, EV)] = {
+    wrap(jgraph.reduceOnEdges(reduceEdgesFunction, direction))
+      .map(javatuple => (javatuple.f0, javatuple.f1))
+  }
+
+  /**
+   * Runs the given graph algorithm on the wrapped Java graph and wraps the result.
+   *
+   * @param algorithm the algorithm to run
+   * @return the graph produced by the algorithm
+   */
+  def run(algorithm: GraphAlgorithm[K, VV, EV]): Graph[K, VV, EV] = {
+    wrapGraph(jgraph.run(algorithm))
+  }
+
+  /**
+   * Runs a Vertex-Centric iteration on the graph.
+   * No configuration options are provided.
+   *
+   * @param vertexUpdateFunction the vertex update function
+   * @param messagingFunction the messaging function
+   * @param maxIterations maximum number of iterations to perform
+   * @tparam M the message type shared by the update and messaging functions
+   *
+   * @return the updated Graph after the vertex-centric iteration has converged or
+   *         after maxIterations.
+   */
+  def runVertexCentricIteration[M](vertexUpdateFunction: VertexUpdateFunction[K, VV, M],
+                                   messagingFunction: MessagingFunction[K, VV, M, EV],
+                                   maxIterations: Int): Graph[K, VV, EV] = {
+    wrapGraph(jgraph.runVertexCentricIteration(vertexUpdateFunction, messagingFunction,
+      maxIterations))
+  }
+
+  /**
+   * Runs a Vertex-Centric iteration on the graph with configuration options.
+ * + * @param vertexUpdateFunction the vertex update function + * @param messagingFunction the messaging function + * @param maxIterations maximum number of iterations to perform + * @param parameters the iteration configuration parameters + * + * @return the updated Graph after the vertex-centric iteration has converged or + * after maximumNumberOfIterations. + */ + def runVertexCentricIteration[M](vertexUpdateFunction: VertexUpdateFunction[K, VV, M], + messagingFunction: MessagingFunction[K, VV, M, EV], + maxIterations: Int, parameters: VertexCentricConfiguration): + Graph[K, VV, EV] = { + wrapGraph(jgraph.runVertexCentricIteration(vertexUpdateFunction, messagingFunction, + maxIterations, parameters)) + } + + /** + * Runs a Gather-Sum-Apply iteration on the graph. + * No configuration options are provided. + * + * @param gatherFunction the gather function collects information about adjacent + * vertices and edges + * @param sumFunction the sum function aggregates the gathered information + * @param applyFunction the apply function updates the vertex values with the aggregates + * @param maxIterations maximum number of iterations to perform + * @tparam M the intermediate type used between gather, sum and apply + * + * @return the updated Graph after the gather-sum-apply iteration has converged or + * after maximumNumberOfIterations. + */ + def runGatherSumApplyIteration[M](gatherFunction: GatherFunction[VV, EV, M], sumFunction: + SumFunction[VV, EV, M], applyFunction: ApplyFunction[K, VV, M], maxIterations: Int): Graph[K, + VV, EV] = { + wrapGraph(jgraph.runGatherSumApplyIteration(gatherFunction, sumFunction, applyFunction, + maxIterations)) + } + + /** + * Runs a Gather-Sum-Apply iteration on the graph with configuration options. 
+ * + * @param gatherFunction the gather function collects information about adjacent + * vertices and edges + * @param sumFunction the sum function aggregates the gathered information + * @param applyFunction the apply function updates the vertex values with the aggregates + * @param maxIterations maximum number of iterations to perform + * @param parameters the iteration configuration parameters + * @tparam M the intermediate type used between gather, sum and apply + * + * @return the updated Graph after the gather-sum-apply iteration has converged or + * after maximumNumberOfIterations. + */ + def runGatherSumApplyIteration[M](gatherFunction: GatherFunction[VV, EV, M], sumFunction: + SumFunction[VV, EV, M], applyFunction: ApplyFunction[K, VV, M], maxIterations: Int, + parameters: GSAConfiguration): Graph[K, VV, EV] = { + wrapGraph(jgraph.runGatherSumApplyIteration(gatherFunction, sumFunction, applyFunction, + maxIterations, parameters)) + } +} diff --git a/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/NeighborsFunction.scala b/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/NeighborsFunction.scala new file mode 100644 index 0000000000000..ca15dabdccace --- /dev/null +++ b/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/NeighborsFunction.scala @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.scala
+
+import org.apache.flink.api.java.tuple.Tuple3
+import org.apache.flink.graph.{Edge, Vertex}
+import org.apache.flink.util.Collector
+
+
+/**
+ * Scala adapter for the Java `org.apache.flink.graph.NeighborsFunction`.
+ *
+ * Implementations provide the Scala-typed `iterateNeighbors`; the Java-typed
+ * variant is implemented here and forwards to it after converting every Java
+ * `Tuple3` into a Scala tuple.
+ */
+abstract class NeighborsFunction[K, VV, EV, T]
+  extends org.apache.flink.graph.NeighborsFunction[K, VV, EV, T] {
+
+  /**
+   * Processes the neighbors handed over by the Java API, exposed as Scala tuples.
+   *
+   * @param neighbors the converted `(f0, f1, f2)` triples of the Java tuples
+   * @param out the collector that receives the results
+   */
+  def iterateNeighbors(neighbors: Iterable[(K, Edge[K, EV], Vertex[K, VV])],
+                       out: Collector[T]): Unit
+
+  /**
+   * Java entry point: converts the Java tuples and delegates to the Scala variant.
+   *
+   * @param neighbors the neighbors as Java tuples
+   * @param out the collector that receives the results
+   */
+  override def iterateNeighbors(
+      neighbors: java.lang.Iterable[Tuple3[K, Edge[K, EV], Vertex[K, VV]]],
+      out: Collector[T]): Unit = {
+    // NOTE(review): `map` on a non-view Iterable is strict, so the neighbors are
+    // presumably materialized here - confirm this is acceptable for large neighborhoods.
+    val scalaIterable = scala.collection.JavaConversions.iterableAsScalaIterable(neighbors)
+      .map(jtuple => (jtuple.f0, jtuple.f1, jtuple.f2))
+    iterateNeighbors(scalaIterable, out)
+  }
+}
diff --git a/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/NeighborsFunctionWithVertexValue.scala b/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/NeighborsFunctionWithVertexValue.scala
new file mode 100644
index 0000000000000..cefc277c00ad2
--- /dev/null
+++ b/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/NeighborsFunctionWithVertexValue.scala
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.scala
+
+import java.lang
+
+import org.apache.flink.api.java.tuple.Tuple2
+import org.apache.flink.graph.{Edge, Vertex}
+import org.apache.flink.util.Collector
+
+
+/**
+ * Scala adapter for the Java `org.apache.flink.graph.NeighborsFunctionWithVertexValue`.
+ *
+ * Subclasses implement the Scala-typed `iterateNeighbors`; the Java-typed
+ * variant below turns every Java `Tuple2` into a Scala pair and forwards to it.
+ */
+abstract class NeighborsFunctionWithVertexValue[K, VV, EV, T]
+  extends org.apache.flink.graph.NeighborsFunctionWithVertexValue[K, VV, EV, T] {
+
+  /**
+   * Processes a vertex together with its neighbors, exposed as Scala pairs.
+   *
+   * @param vertex the vertex passed in by the Java API
+   * @param neighbors the converted `(f0, f1)` pairs of the Java tuples
+   * @param out the collector that receives the results
+   */
+  def iterateNeighbors(vertex: Vertex[K, VV],
+                       neighbors: Iterable[(Edge[K, EV], Vertex[K, VV])],
+                       out: Collector[T]): Unit
+
+  /**
+   * Java entry point: converts the Java tuples and delegates to the Scala variant.
+   */
+  override def iterateNeighbors(vertex: Vertex[K, VV],
+                                neighbors: lang.Iterable[Tuple2[Edge[K, EV], Vertex[K, VV]]],
+                                out: Collector[T]): Unit = {
+    val wrapped = scala.collection.JavaConversions.iterableAsScalaIterable(neighbors)
+    val pairs = wrapped.map(pair => (pair.f0, pair.f1))
+    iterateNeighbors(vertex, pairs, out)
+  }
+}
diff --git a/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/package.scala b/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/package.scala
new file mode 100644
index 0000000000000..159a1003554fe
--- /dev/null
+++ b/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/package.scala
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.graph.{Graph => JGraph}
+
+import _root_.scala.reflect.ClassTag
+
+
+package object scala {
+
+  /**
+   * Wraps a Java graph into the Scala `Graph` of this package.
+   *
+   * @param javagraph the Java graph to wrap
+   * @return a Scala graph constructed from `javagraph`
+   */
+  private[flink] def wrapGraph[
+      K: TypeInformation : ClassTag,
+      VV: TypeInformation : ClassTag,
+      EV: TypeInformation : ClassTag](javagraph: JGraph[K, VV, EV]) = {
+    val scalaGraph = new Graph[K, VV, EV](javagraph)
+    scalaGraph
+  }
+}
diff --git a/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/utils/EdgeToTuple3Map.scala b/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/utils/EdgeToTuple3Map.scala
new file mode 100644
index 0000000000000..0d7d2afffc08b
--- /dev/null
+++ b/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/utils/EdgeToTuple3Map.scala
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.scala.utils
+
+import org.apache.flink.api.common.functions.MapFunction
+import org.apache.flink.graph.Edge
+
+/**
+ * Maps an `Edge` to the Scala tuple `(source, target, value)`.
+ *
+ * The serialization version is declared with the `@SerialVersionUID` annotation:
+ * a `private val serialVersionUID` is only an instance field in Scala and is not
+ * picked up as the class's serial version.
+ */
+@SerialVersionUID(1L)
+class EdgeToTuple3Map[K, EV] extends MapFunction[Edge[K, EV], (K, K, EV)] {
+
+  /**
+   * @param value the edge to convert
+   * @return the tuple `(value.getSource, value.getTarget, value.getValue)`
+   */
+  override def map(value: Edge[K, EV]): (K, K, EV) = {
+    (value.getSource, value.getTarget, value.getValue)
+  }
+}
diff --git a/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/utils/VertexToTuple2Map.scala b/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/utils/VertexToTuple2Map.scala
new file mode 100644
index 0000000000000..de77832ec67f1
--- /dev/null
+++ b/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/utils/VertexToTuple2Map.scala
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.scala.utils
+
+import org.apache.flink.api.common.functions.MapFunction
+import org.apache.flink.graph.Vertex
+
+/**
+ * Maps a `Vertex` to the Scala tuple `(id, value)`.
+ *
+ * The serialization version is declared with the `@SerialVersionUID` annotation:
+ * a `private val serialVersionUID` is only an instance field in Scala and is not
+ * picked up as the class's serial version.
+ */
+@SerialVersionUID(1L)
+class VertexToTuple2Map[K, VV] extends MapFunction[Vertex[K, VV], (K, VV)] {
+
+  /**
+   * @param value the vertex to convert
+   * @return the tuple `(value.getId, value.getValue)`
+   */
+  override def map(value: Vertex[K, VV]): (K, VV) = {
+    (value.getId, value.getValue)
+  }
+}
diff --git a/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/TestGraphUtils.scala b/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/TestGraphUtils.scala
new file mode 100644
index 0000000000000..1c2cf54d33a56
--- /dev/null
+++ b/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/TestGraphUtils.scala
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.scala.test
+
+import org.apache.flink.api.scala._
+import org.apache.flink.graph.{Edge, Vertex}
+
+/**
+ * Fixture data shared by the Gelly Scala tests: five vertices (ids 1 to 5, each
+ * with a value equal to its id) and seven edges between them.
+ */
+object TestGraphUtils {
+
+  /** Returns the fixture vertices as a DataSet of the given environment. */
+  def getLongLongVertexData(env: ExecutionEnvironment): DataSet[Vertex[Long, Long]] =
+    env.fromCollection(getLongLongVertices)
+
+  /** Returns the fixture edges as a DataSet of the given environment. */
+  def getLongLongEdgeData(env: ExecutionEnvironment): DataSet[Edge[Long, Long]] =
+    env.fromCollection(getLongLongEdges)
+
+  /** Builds the vertices (1,1) to (5,5) in ascending id order. */
+  def getLongLongVertices: List[Vertex[Long, Long]] =
+    (1L to 5L).map(id => new Vertex[Long, Long](id, id)).toList
+
+  /** Builds the fixture edges from (source, target, value) triples. */
+  def getLongLongEdges: List[Edge[Long, Long]] = {
+    val triples = List(
+      (1L, 2L, 12L),
+      (1L, 3L, 13L),
+      (2L, 3L, 23L),
+      (3L, 4L, 34L),
+      (3L, 5L, 35L),
+      (4L, 5L, 45L),
+      (5L, 1L, 51L)
+    )
+    triples.map { case (src, trg, value) => new Edge[Long, Long](src, trg, value) }
+  }
+}
diff --git a/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/DegreesITCase.scala b/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/DegreesITCase.scala
new file mode 100644
index 0000000000000..98dbbe9254e7c
--- /dev/null
+++ b/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/DegreesITCase.scala
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.scala.test.operations
+
+import org.apache.flink.api.scala._
+import org.apache.flink.graph.scala._
+import org.apache.flink.graph.scala.test.TestGraphUtils
+import org.apache.flink.test.util.{MultipleProgramsTestBase, TestBaseUtils}
+import org.junit.rules.TemporaryFolder
+import org.junit.runner.RunWith
+import org.junit.runners.Parameterized
+import org.junit.{After, Before, Rule, Test}
+
+/**
+ * Tests the degree methods of the Scala graph on the `TestGraphUtils` fixture.
+ *
+ * Every test writes its result to `resultPath` and records `expectedResult`;
+ * the `@After` hook then compares the two.
+ */
+@RunWith(classOf[Parameterized])
+class DegreesITCase(mode: MultipleProgramsTestBase.TestExecutionMode)
+  extends MultipleProgramsTestBase(mode) {
+
+  private var resultPath: String = _
+  private var expectedResult: String = _
+
+  var tempFolder: TemporaryFolder = new TemporaryFolder()
+
+  @Rule
+  def getFolder(): TemporaryFolder = tempFolder
+
+  @Before
+  @throws(classOf[Exception])
+  def before(): Unit = {
+    resultPath = tempFolder.newFile.toURI.toString
+  }
+
+  @After
+  @throws(classOf[Exception])
+  def after(): Unit = {
+    TestBaseUtils.compareResultsByLinesInMemory(expectedResult, resultPath)
+  }
+
+  /** Builds the fixture graph in the given environment. */
+  private def loadGraph(env: ExecutionEnvironment): Graph[Long, Long, Long] = {
+    val vertexData = TestGraphUtils.getLongLongVertexData(env)
+    val edgeData = TestGraphUtils.getLongLongEdgeData(env)
+    Graph.fromDataSet(vertexData, edgeData, env)
+  }
+
+  @Test
+  @throws(classOf[Exception])
+  def testInDegrees(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    loadGraph(env).inDegrees().writeAsCsv(resultPath)
+    env.execute
+    expectedResult = "1,1\n" + "2,1\n" + "3,2\n" + "4,1\n" + "5,2\n"
+  }
+
+  @Test
+  @throws(classOf[Exception])
+  def testOutDegrees(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    loadGraph(env).outDegrees().writeAsCsv(resultPath)
+    env.execute
+    expectedResult = "1,2\n" + "2,1\n" + "3,2\n" + "4,1\n" + "5,1\n"
+  }
+
+  @Test
+  @throws(classOf[Exception])
+  def testGetDegrees(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    loadGraph(env).getDegrees().writeAsCsv(resultPath)
+    env.execute
+    expectedResult = "1,3\n" + "2,2\n" + "3,4\n" + "4,2\n" + "5,3\n"
+  }
+}
diff --git a/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/GraphMutationsITCase.scala b/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/GraphMutationsITCase.scala
new file mode 100644
index 0000000000000..687b0a7581f8a
--- /dev/null
+++ b/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/GraphMutationsITCase.scala
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.scala.test.operations
+
+import org.apache.flink.api.scala._
+import org.apache.flink.graph.scala._
+import org.apache.flink.graph.scala.test.TestGraphUtils
+import org.apache.flink.graph.{Edge, Vertex}
+import org.apache.flink.test.util.{MultipleProgramsTestBase, TestBaseUtils}
+import org.junit.rules.TemporaryFolder
+import org.junit.runner.RunWith
+import org.junit.runners.Parameterized
+import org.junit.{After, Before, Rule, Test}
+
+/**
+ * Tests adding and removing vertices and edges on the `TestGraphUtils` fixture graph.
+ *
+ * Every test writes its result to `resultPath` and records `expectedResult`;
+ * the `@After` hook then compares the two.
+ */
+@RunWith(classOf[Parameterized])
+class GraphMutationsITCase(mode: MultipleProgramsTestBase.TestExecutionMode)
+  extends MultipleProgramsTestBase(mode) {
+
+  private var resultPath: String = _
+  private var expectedResult: String = _
+
+  var tempFolder: TemporaryFolder = new TemporaryFolder()
+
+  @Rule
+  def getFolder(): TemporaryFolder = tempFolder
+
+  @Before
+  @throws(classOf[Exception])
+  def before(): Unit = {
+    resultPath = tempFolder.newFile.toURI.toString
+  }
+
+  @After
+  @throws(classOf[Exception])
+  def after(): Unit = {
+    TestBaseUtils.compareResultsByLinesInMemory(expectedResult, resultPath)
+  }
+
+  /** Builds the fixture graph in the given environment. */
+  private def loadGraph(env: ExecutionEnvironment): Graph[Long, Long, Long] = {
+    val vertexData = TestGraphUtils.getLongLongVertexData(env)
+    val edgeData = TestGraphUtils.getLongLongEdgeData(env)
+    Graph.fromDataSet(vertexData, edgeData, env)
+  }
+
+  @Test
+  @throws(classOf[Exception])
+  def testAddVertex(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val extended = loadGraph(env).addVertex(new Vertex[Long, Long](6L, 6L))
+    extended.getVerticesAsTuple2.writeAsCsv(resultPath)
+    env.execute
+    expectedResult = "1,1\n" + "2,2\n" + "3,3\n" + "4,4\n" + "5,5\n" + "6,6\n"
+  }
+
+  @Test
+  @throws(classOf[Exception])
+  def testAddVertexExisting(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val extended = loadGraph(env).addVertex(new Vertex[Long, Long](1L, 1L))
+    extended.getVerticesAsTuple2.writeAsCsv(resultPath)
+    env.execute
+    expectedResult = "1,1\n" + "2,2\n" + "3,3\n" + "4,4\n" + "5,5\n"
+  }
+
+  // NOTE(review): this body is identical to testAddVertex; presumably it was meant
+  // to cover a different "no edges" scenario - confirm the intended setup.
+  @Test
+  @throws(classOf[Exception])
+  def testAddVertexNoEdges(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val extended = loadGraph(env).addVertex(new Vertex[Long, Long](6L, 6L))
+    extended.getVerticesAsTuple2.writeAsCsv(resultPath)
+    env.execute
+    expectedResult = "1,1\n" + "2,2\n" + "3,3\n" + "4,4\n" + "5,5\n" + "6,6\n"
+  }
+
+  @Test
+  @throws(classOf[Exception])
+  def testRemoveVertex(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val reduced = loadGraph(env).removeVertex(new Vertex[Long, Long](5L, 5L))
+    reduced.getEdgesAsTuple3.writeAsCsv(resultPath)
+    env.execute
+    expectedResult = "1,2,12\n" + "1,3,13\n" + "2,3,23\n" + "3,4,34\n"
+  }
+
+  @Test
+  @throws(classOf[Exception])
+  def testRemoveInvalidVertex(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val reduced = loadGraph(env).removeVertex(new Vertex[Long, Long](6L, 6L))
+    reduced.getEdgesAsTuple3.writeAsCsv(resultPath)
+    env.execute
+    expectedResult = "1,2,12\n" + "1,3,13\n" + "2,3,23\n" + "3,4,34\n" + "3,5,35\n" +
+      "4,5,45\n" + "5,1,51\n"
+  }
+
+  @Test
+  @throws(classOf[Exception])
+  def testAddEdge(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val source = new Vertex[Long, Long](6L, 6L)
+    val target = new Vertex[Long, Long](1L, 1L)
+    val extended = loadGraph(env).addEdge(source, target, 61L)
+    extended.getEdgesAsTuple3.writeAsCsv(resultPath)
+    env.execute
+    expectedResult = "1,2,12\n" + "1,3,13\n" + "2,3,23\n" + "3,4,34\n" + "3,5,35\n" +
+      "4,5,45\n" + "5,1,51\n" + "6,1,61\n"
+  }
+
+  @Test
+  @throws(classOf[Exception])
+  def testAddExistingEdge(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val source = new Vertex[Long, Long](1L, 1L)
+    val target = new Vertex[Long, Long](2L, 2L)
+    val extended = loadGraph(env).addEdge(source, target, 12L)
+    extended.getEdgesAsTuple3.writeAsCsv(resultPath)
+    env.execute
+    expectedResult = "1,2,12\n" + "1,2,12\n" + "1,3,13\n" + "2,3,23\n" + "3,4,34\n" +
+      "3,5,35\n" + "4,5,45\n" + "5,1,51\n"
+  }
+
+  @Test
+  @throws(classOf[Exception])
+  def testRemoveEdge(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val reduced = loadGraph(env).removeEdge(new Edge[Long, Long](5L, 1L, 51L))
+    reduced.getEdgesAsTuple3.writeAsCsv(resultPath)
+    env.execute
+    expectedResult = "1,2,12\n" + "1,3,13\n" + "2,3,23\n" + "3,4,34\n" + "3,5,35\n" + "4,5,45\n"
+  }
+
+  @Test
+  @throws(classOf[Exception])
+  def testRemoveInvalidEdge(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val reduced = loadGraph(env).removeEdge(new Edge[Long, Long](6L, 1L, 61L))
+    reduced.getEdgesAsTuple3.writeAsCsv(resultPath)
+    env.execute
+    expectedResult = "1,2,12\n" + "1,3,13\n" + "2,3,23\n" + "3,4,34\n" + "3,5,35\n" +
+      "4,5,45\n" + "5,1,51\n"
+  }
+}
diff --git a/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/GraphOperationsITCase.scala b/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/GraphOperationsITCase.scala
new file mode 100644
index 0000000000000..d49e56559c777
--- /dev/null
+++ b/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/GraphOperationsITCase.scala
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.scala.test.operations
+
+import org.apache.flink.api.common.functions.FilterFunction
+import org.apache.flink.api.scala._
+import org.apache.flink.graph.scala._
+import org.apache.flink.graph.scala.test.TestGraphUtils
+import org.apache.flink.graph.{Edge, Vertex}
+import org.apache.flink.test.util.{MultipleProgramsTestBase, TestBaseUtils}
+import org.junit.rules.TemporaryFolder
+import org.junit.runner.RunWith
+import org.junit.runners.Parameterized
+import org.junit.{After, Before, Rule, Test}
+
+/**
+ * Tests the structural graph operations (undirected, reverse, subgraph, filters,
+ * counts, ids, union) on the `TestGraphUtils` fixture graph.
+ *
+ * Every test writes its result to `resultPath` and records `expectedResult`;
+ * the `@After` hook then compares the two.
+ */
+@RunWith(classOf[Parameterized])
+class GraphOperationsITCase(mode: MultipleProgramsTestBase.TestExecutionMode) extends
+MultipleProgramsTestBase(mode) {
+
+  private var resultPath: String = null
+  private var expectedResult: String = null
+
+  var tempFolder: TemporaryFolder = new TemporaryFolder()
+
+  @Rule
+  def getFolder(): TemporaryFolder = {
+    tempFolder
+  }
+
+  @Before
+  @throws(classOf[Exception])
+  def before(): Unit = {
+    resultPath = tempFolder.newFile.toURI.toString
+  }
+
+  @After
+  @throws(classOf[Exception])
+  def after(): Unit = {
+    TestBaseUtils.compareResultsByLinesInMemory(expectedResult, resultPath)
+  }
+
+  @Test
+  @throws(classOf[Exception])
+  def testUndirected(): Unit = {
+    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+    val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
+      .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
+    graph.getUndirected().getEdgesAsTuple3().writeAsCsv(resultPath)
+    env.execute
+    expectedResult = "1,2,12\n" + "2,1,12\n" + "1,3,13\n" + "3,1,13\n" + "2,3,23\n" +
+      "3,2,23\n" + "3,4,34\n" + "4,3,34\n" + "3,5,35\n" + "5,3,35\n" + "4,5,45\n" +
+      "5,4,45\n" + "5,1,51\n" + "1,5,51\n"
+  }
+
+  @Test
+  @throws(classOf[Exception])
+  def testReverse(): Unit = {
+    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+    val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
+      .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
+    graph.reverse().getEdgesAsTuple3().writeAsCsv(resultPath)
+    env.execute
+    expectedResult = "2,1,12\n" + "3,1,13\n" + "3,2,23\n" + "4,3,34\n" + "5,3,35\n" +
+      "5,4,45\n" + "1,5,51\n"
+  }
+
+  @Test
+  @throws(classOf[Exception])
+  def testSubGraph(): Unit = {
+    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+    val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
+      .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
+    graph.subgraph(new FilterFunction[Vertex[Long, Long]] {
+      @throws(classOf[Exception])
+      override def filter(vertex: Vertex[Long, Long]): Boolean = {
+        vertex.getValue > 2
+      }
+    }, new FilterFunction[Edge[Long, Long]] {
+      @throws(classOf[Exception])
+      override def filter(edge: Edge[Long, Long]): Boolean = {
+        edge.getValue > 34
+      }
+    }).getEdgesAsTuple3().writeAsCsv(resultPath)
+    env.execute
+    expectedResult = "3,5,35\n" + "4,5,45\n"
+  }
+
+  @Test
+  @throws(classOf[Exception])
+  def testSubGraphSugar(): Unit = {
+    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+    val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
+      .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
+    graph.subgraph(
+      vertex => vertex.getValue > 2,
+      edge => edge.getValue > 34
+    ).getEdgesAsTuple3().writeAsCsv(resultPath)
+    env.execute
+    expectedResult = "3,5,35\n" + "4,5,45\n"
+  }
+
+  @Test
+  @throws(classOf[Exception])
+  def testFilterOnVertices(): Unit = {
+    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+    val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
+      .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
+    graph.filterOnVertices(new FilterFunction[Vertex[Long, Long]] {
+      @throws(classOf[Exception])
+      override def filter(vertex: Vertex[Long, Long]): Boolean = {
+        vertex.getValue > 2
+      }
+    }).getEdgesAsTuple3().writeAsCsv(resultPath)
+    env.execute
+    expectedResult = "3,4,34\n" + "3,5,35\n" + "4,5,45\n"
+  }
+
+  @Test
+  @throws(classOf[Exception])
+  def testFilterOnVerticesSugar(): Unit = {
+    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+    val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
+      .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
+    graph.filterOnVertices(
+      vertex => vertex.getValue > 2
+    ).getEdgesAsTuple3().writeAsCsv(resultPath)
+    env.execute
+    expectedResult = "3,4,34\n" + "3,5,35\n" + "4,5,45\n"
+  }
+
+  @Test
+  @throws(classOf[Exception])
+  def testFilterOnEdges(): Unit = {
+    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+    val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
+      .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
+    graph.filterOnEdges(new FilterFunction[Edge[Long, Long]] {
+      @throws(classOf[Exception])
+      override def filter(edge: Edge[Long, Long]): Boolean = {
+        edge.getValue > 34
+      }
+    }).getEdgesAsTuple3().writeAsCsv(resultPath)
+    env.execute
+    expectedResult = "3,5,35\n" + "4,5,45\n" + "5,1,51\n"
+  }
+
+  @Test
+  @throws(classOf[Exception])
+  def testFilterOnEdgesSugar(): Unit = {
+    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+    val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
+      .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
+    graph.filterOnEdges(
+      edge => edge.getValue > 34
+    ).getEdgesAsTuple3().writeAsCsv(resultPath)
+    env.execute
+    expectedResult = "3,5,35\n" + "4,5,45\n" + "5,1,51\n"
+  }
+
+  @Test
+  @throws(classOf[Exception])
+  def testNumberOfVertices(): Unit = {
+    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+    val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
+      .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
+    env.fromElements(graph.numberOfVertices).writeAsText(resultPath)
+    env.execute
+    expectedResult = "5"
+  }
+
+  @Test
+  @throws(classOf[Exception])
+  def testNumberOfEdges(): Unit = {
+    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+    val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
+      .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
+    env.fromElements(graph.numberOfEdges).writeAsText(resultPath)
+    env.execute
+    expectedResult = "7"
+  }
+
+  @Test
+  @throws(classOf[Exception])
+  def testVertexIds(): Unit = {
+    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+    val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
+      .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
+    graph.getVertexIds.writeAsText(resultPath)
+    env.execute
+    expectedResult = "1\n2\n3\n4\n5\n"
+  }
+
+  @Test
+  @throws(classOf[Exception])
+  def testEdgesIds(): Unit = {
+    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+    val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
+      .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
+    graph.getEdgeIds.writeAsCsv(resultPath)
+    env.execute
+    expectedResult = "1,2\n" + "1,3\n" + "2,3\n" + "3,4\n" + "3,5\n" + "4,5\n" + "5,1\n"
+  }
+
+  @Test
+  @throws(classOf[Exception])
+  def testUnion(): Unit = {
+    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+    val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
+      .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
+    val vertices: List[Vertex[Long, Long]] = List[Vertex[Long, Long]](
+      new Vertex[Long, Long](6L, 6L)
+    )
+    val edges: List[Edge[Long, Long]] = List[Edge[Long, Long]](
+      new Edge[Long, Long](6L, 1L, 61L)
+    )
+
+    val newgraph = graph.union(Graph.fromCollection(vertices, edges, env))
+    newgraph.getEdgesAsTuple3.writeAsCsv(resultPath)
+    env.execute
+    expectedResult = "1,2,12\n" + "1,3,13\n" + "2,3,23\n" + "3,4,34\n" + "3,5,35\n" +
+      "4,5,45\n" + "5,1,51\n" + "6,1,61\n"
+  }
+}
diff --git
a/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/JoinWithEdgesITCase.scala b/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/JoinWithEdgesITCase.scala new file mode 100644 index 0000000000000..e19463e0b3b05 --- /dev/null +++ b/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/JoinWithEdgesITCase.scala @@ -0,0 +1,170 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
package org.apache.flink.graph.scala.test.operations

import org.apache.flink.api.common.functions.MapFunction
import org.apache.flink.api.scala._
import org.apache.flink.graph.Edge
import org.apache.flink.graph.scala._
import org.apache.flink.graph.scala.test.TestGraphUtils
import org.apache.flink.graph.scala.utils.EdgeToTuple3Map
import org.apache.flink.test.util.{MultipleProgramsTestBase, TestBaseUtils}
import org.junit.rules.TemporaryFolder
import org.junit.runner.RunWith
import org.junit.runners.Parameterized
import org.junit.{After, Before, Rule, Test}

/**
 * Integration tests for `joinWithEdges`, `joinWithEdgesOnSource` and `joinWithEdgesOnTarget`
 * of the Scala `Graph`.
 *
 * Each operation is covered twice: once with a `MapFunction` instance and once (the `Sugar`
 * variant) with a plain Scala function. A test writes its result as CSV to `resultPath` and
 * records the expected text in `expectedResult`; the comparison itself runs in `after`.
 */
@RunWith(classOf[Parameterized])
class JoinWithEdgesITCase(mode: MultipleProgramsTestBase.TestExecutionMode) extends
MultipleProgramsTestBase(mode) {

  private var resultPath: String = null
  private var expectedResult: String = null

  var tempFolder: TemporaryFolder = new TemporaryFolder()

  /** Exposes the temporary folder to JUnit as a rule. */
  @Rule
  def getFolder(): TemporaryFolder = {
    tempFolder
  }

  /** Creates a fresh output file for the test and remembers its URI. */
  @Before
  @throws(classOf[Exception])
  def before(): Unit = {
    resultPath = tempFolder.newFile.toURI.toString
  }

  /** Hands the expected text and the output location to the line-based comparison helper. */
  @After
  @throws(classOf[Exception])
  def after(): Unit = {
    TestBaseUtils.compareResultsByLinesInMemory(expectedResult, resultPath)
  }

  /** Joins the edges with their own (source, target, value) tuples through a `MapFunction`. */
  @Test
  @throws(classOf[Exception])
  def testWithEdgesInputDataset(): Unit = {
    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
    val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
      .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
    val result: Graph[Long, Long, Long] = graph.joinWithEdges(graph.getEdges.map(new
        EdgeToTuple3Map[Long, Long]), new AddValuesMapper)
    result.getEdgesAsTuple3().writeAsCsv(resultPath)
    env.execute
    expectedResult = "1,2,24\n" + "1,3,26\n" + "2,3,46\n" + "3,4,68\n" + "3,5,70\n" +
      "4,5,90\n" + "5,1,102\n"
  }

  /** Same join as `testWithEdgesInputDataset`, using a Scala function instead. */
  @Test
  @throws(classOf[Exception])
  def testWithEdgesInputDatasetSugar(): Unit = {
    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
    val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
      .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
    val result: Graph[Long, Long, Long] = graph.joinWithEdges(graph.getEdges.map(new
        EdgeToTuple3Map[Long, Long]), (originalValue: Long, tupleValue: Long) =>
      originalValue + tupleValue)
    result.getEdgesAsTuple3().writeAsCsv(resultPath)
    env.execute
    expectedResult = "1,2,24\n" + "1,3,26\n" + "2,3,46\n" + "3,4,68\n" + "3,5,70\n" +
      "4,5,90\n" + "5,1,102\n"
  }

  /**
   * Joins on the edge source id through a `MapFunction`.
   *
   * This test used to pass the same Scala lambda as `testWithEdgesOnSourceSugar`, which left
   * the `MapFunction` variant untested.
   * NOTE(review): assumes `joinWithEdgesOnSource` offers a `MapFunction[(EV, T), EV]` overload
   * like `joinWithEdges` does above -- confirm against Graph.scala.
   */
  @Test
  @throws(classOf[Exception])
  def testWithEdgesOnSource(): Unit = {
    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
    val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
      .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
    val result: Graph[Long, Long, Long] = graph.joinWithEdgesOnSource[Long](graph.getEdges
      .map(new ProjectSourceAndValueMapper), new AddValuesMapper)
    result.getEdgesAsTuple3().writeAsCsv(resultPath)
    env.execute
    expectedResult = "1,2,24\n" + "1,3,25\n" + "2,3,46\n" + "3,4,68\n" + "3,5,69\n" +
      "4,5,90\n" + "5,1,102\n"
  }

  /** Joins on the edge source id with a Scala function. */
  @Test
  @throws(classOf[Exception])
  def testWithEdgesOnSourceSugar(): Unit = {
    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
    val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
      .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
    val result: Graph[Long, Long, Long] = graph.joinWithEdgesOnSource[Long](graph.getEdges
      .map(new ProjectSourceAndValueMapper), (originalValue: Long, tupleValue: Long) =>
      originalValue + tupleValue)
    result.getEdgesAsTuple3().writeAsCsv(resultPath)
    env.execute
    expectedResult = "1,2,24\n" + "1,3,25\n" + "2,3,46\n" + "3,4,68\n" + "3,5,69\n" +
      "4,5,90\n" + "5,1,102\n"
  }

  /**
   * Joins on the edge target id through a `MapFunction`.
   *
   * This test used to be a copy of `testWithEdgesOnTargetSugar`.
   * NOTE(review): assumes `joinWithEdgesOnTarget` offers a `MapFunction[(EV, T), EV]` overload
   * -- confirm against Graph.scala.
   */
  @Test
  @throws(classOf[Exception])
  def testWithEdgesOnTarget(): Unit = {
    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
    val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
      .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
    val result: Graph[Long, Long, Long] = graph.joinWithEdgesOnTarget[Long](graph.getEdges
      .map(new ProjectTargetAndValueMapper), new AddValuesMapper)
    result.getEdgesAsTuple3().writeAsCsv(resultPath)
    env.execute
    expectedResult = "1,2,24\n" + "1,3,26\n" + "2,3,36\n" + "3,4,68\n" + "3,5,70\n" +
      "4,5,80\n" + "5,1,102\n"
  }

  /** Joins on the edge target id with a Scala function. */
  @Test
  @throws(classOf[Exception])
  def testWithEdgesOnTargetSugar(): Unit = {
    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
    val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
      .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
    val result: Graph[Long, Long, Long] = graph.joinWithEdgesOnTarget[Long](graph.getEdges
      .map(new ProjectTargetAndValueMapper), (originalValue: Long, tupleValue: Long) =>
      originalValue + tupleValue)
    result.getEdgesAsTuple3().writeAsCsv(resultPath)
    env.execute
    expectedResult = "1,2,24\n" + "1,3,26\n" + "2,3,36\n" + "3,4,68\n" + "3,5,70\n" +
      "4,5,80\n" + "5,1,102\n"
  }


  /** Adds the two components of the pair it receives. */
  final class AddValuesMapper extends MapFunction[(Long, Long), Long] {
    @throws(classOf[Exception])
    def map(tuple: (Long, Long)): Long = {
      tuple._1 + tuple._2
    }
  }

  /** Projects an edge to the pair (source id, edge value). */
  final class ProjectSourceAndValueMapper extends MapFunction[Edge[Long, Long], (Long, Long)] {
    @throws(classOf[Exception])
    def map(edge: Edge[Long, Long]): (Long, Long) = {
      (edge.getSource, edge.getValue)
    }
  }

  /** Projects an edge to the pair (target id, edge value). */
  final class ProjectTargetAndValueMapper extends MapFunction[Edge[Long, Long], (Long, Long)] {
    @throws(classOf[Exception])
    def map(edge: Edge[Long, Long]): (Long, Long) = {
      (edge.getTarget, edge.getValue)
    }
  }

}
a/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/JoinWithVerticesITCase.scala b/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/JoinWithVerticesITCase.scala new file mode 100644 index 0000000000000..4b8f3542455f1 --- /dev/null +++ b/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/JoinWithVerticesITCase.scala @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
package org.apache.flink.graph.scala.test.operations

import org.apache.flink.api.common.functions.MapFunction
import org.apache.flink.api.scala._
import org.apache.flink.graph.scala._
import org.apache.flink.graph.scala.test.TestGraphUtils
import org.apache.flink.graph.scala.utils.VertexToTuple2Map
import org.apache.flink.test.util.{MultipleProgramsTestBase, TestBaseUtils}
import org.junit.rules.TemporaryFolder
import org.junit.runner.RunWith
import org.junit.runners.Parameterized
import org.junit.{After, Before, Rule, Test}

/**
 * Integration tests for `Graph.joinWithVertices`, once with a `MapFunction` and once with a
 * Scala function. Results are written as CSV to `resultPath`; the expected text is stored in
 * `expectedResult` and compared in `after`.
 */
@RunWith(classOf[Parameterized])
class JoinWithVerticesITCase(mode: MultipleProgramsTestBase.TestExecutionMode)
  extends MultipleProgramsTestBase(mode) {

  private var resultPath: String = null
  private var expectedResult: String = null

  var tempFolder: TemporaryFolder = new TemporaryFolder()

  /** Exposes the temporary folder to JUnit as a rule. */
  @Rule
  def getFolder(): TemporaryFolder = tempFolder

  /** Allocates the output file of the current test. */
  @Before
  @throws(classOf[Exception])
  def before(): Unit = {
    val outputFile = tempFolder.newFile
    resultPath = outputFile.toURI.toString
  }

  /** Passes the expected text and the output location to the comparison helper. */
  @After
  @throws(classOf[Exception])
  def after(): Unit =
    TestBaseUtils.compareResultsByLinesInMemory(expectedResult, resultPath)

  /** Joins every vertex with its own (id, value) pair using `AddValuesMapper`. */
  @Test
  @throws(classOf[Exception])
  def testJoinWithVertexSet(): Unit = {
    val environment: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
    val input = longLongGraph(environment)
    val idValuePairs = input.getVertices.map(new VertexToTuple2Map[Long, Long])
    val joined: Graph[Long, Long, Long] =
      input.joinWithVertices(idValuePairs, new AddValuesMapper)

    joined.getVerticesAsTuple2().writeAsCsv(resultPath)
    environment.execute()

    expectedResult = "1,2\n" +
      "2,4\n" +
      "3,6\n" +
      "4,8\n" +
      "5,10\n"
  }

  /** Same join, expressed with a Scala function. */
  @Test
  @throws(classOf[Exception])
  def testJoinWithVertexSetSugar(): Unit = {
    val environment: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
    val input = longLongGraph(environment)
    val idValuePairs = input.getVertices.map(new VertexToTuple2Map[Long, Long])
    val joined: Graph[Long, Long, Long] = input.joinWithVertices[Long](
      idValuePairs,
      (current: Long, incoming: Long) => current + incoming)

    joined.getVerticesAsTuple2().writeAsCsv(resultPath)
    environment.execute()

    expectedResult = "1,2\n" +
      "2,4\n" +
      "3,6\n" +
      "4,8\n" +
      "5,10\n"
  }

  /** Builds the graph used by both tests from the `TestGraphUtils` data sets. */
  private def longLongGraph(environment: ExecutionEnvironment): Graph[Long, Long, Long] = {
    val vertexData = TestGraphUtils.getLongLongVertexData(environment)
    val edgeData = TestGraphUtils.getLongLongEdgeData(environment)
    Graph.fromDataSet(vertexData, edgeData, environment)
  }

  /** Adds the two components of the pair it receives. */
  final class AddValuesMapper extends MapFunction[(Long, Long), Long] {
    @throws(classOf[Exception])
    def map(tuple: (Long, Long)): Long = {
      val (left, right) = tuple
      left + right
    }
  }

}
package org.apache.flink.graph.scala.test.operations


import org.apache.flink.api.common.functions.MapFunction
import org.apache.flink.api.scala._
import org.apache.flink.graph.Edge
import org.apache.flink.graph.scala._
import org.apache.flink.graph.scala.test.TestGraphUtils
import org.apache.flink.test.util.{MultipleProgramsTestBase, TestBaseUtils}
import org.junit.rules.TemporaryFolder
import org.junit.runner.RunWith
import org.junit.runners.Parameterized
import org.junit.{After, Before, Rule, Test}

/**
 * Integration tests for `Graph.mapEdges`: every edge value is incremented by one, first via a
 * `MapFunction`, then via a Scala function. The CSV output at `resultPath` is compared with
 * `expectedResult` in `after`.
 */
@RunWith(classOf[Parameterized])
class MapEdgesITCase(mode: MultipleProgramsTestBase.TestExecutionMode)
  extends MultipleProgramsTestBase(mode) {

  private var resultPath: String = null
  private var expectedResult: String = null

  var tempFolder: TemporaryFolder = new TemporaryFolder()

  /** Exposes the temporary folder to JUnit as a rule. */
  @Rule
  def getFolder(): TemporaryFolder = tempFolder

  /** Allocates the output file of the current test. */
  @Before
  @throws(classOf[Exception])
  def before(): Unit = {
    val outputFile = tempFolder.newFile
    resultPath = outputFile.toURI.toString
  }

  /** Passes the expected text and the output location to the comparison helper. */
  @After
  @throws(classOf[Exception])
  def after(): Unit =
    TestBaseUtils.compareResultsByLinesInMemory(expectedResult, resultPath)

  /** Maps the edge values with `AddOneMapper`. */
  @Test
  @throws(classOf[Exception])
  def testWithSameValue(): Unit = {
    val environment: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
    val mapped = longLongGraph(environment).mapEdges(new AddOneMapper)

    mapped.getEdgesAsTuple3().writeAsCsv(resultPath)
    environment.execute()

    expectedResult = "1,2,13\n" +
      "1,3,14\n" +
      "2,3,24\n" +
      "3,4,35\n" +
      "3,5,36\n" +
      "4,5,46\n" +
      "5,1,52\n"
  }

  /** Maps the edge values with a Scala function. */
  @Test
  @throws(classOf[Exception])
  def testWithSameValueSugar(): Unit = {
    val environment: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
    val mapped = longLongGraph(environment).mapEdges(e => e.getValue + 1)

    mapped.getEdgesAsTuple3().writeAsCsv(resultPath)
    environment.execute()

    expectedResult = "1,2,13\n" +
      "1,3,14\n" +
      "2,3,24\n" +
      "3,4,35\n" +
      "3,5,36\n" +
      "4,5,46\n" +
      "5,1,52\n"
  }

  /** Builds the graph used by both tests from the `TestGraphUtils` data sets. */
  private def longLongGraph(environment: ExecutionEnvironment): Graph[Long, Long, Long] = {
    val vertexData = TestGraphUtils.getLongLongVertexData(environment)
    val edgeData = TestGraphUtils.getLongLongEdgeData(environment)
    Graph.fromDataSet(vertexData, edgeData, environment)
  }

  /** Returns the value of the edge plus one. */
  final class AddOneMapper extends MapFunction[Edge[Long, Long], Long] {
    @throws(classOf[Exception])
    def map(edge: Edge[Long, Long]): Long = {
      val current: Long = edge.getValue
      current + 1
    }
  }

}
package org.apache.flink.graph.scala.test.operations

import org.apache.flink.api.common.functions.MapFunction
import org.apache.flink.api.scala._
import org.apache.flink.graph.Vertex
import org.apache.flink.graph.scala._
import org.apache.flink.graph.scala.test.TestGraphUtils
import org.apache.flink.test.util.{MultipleProgramsTestBase, TestBaseUtils}
import org.junit.rules.TemporaryFolder
import org.junit.runner.RunWith
import org.junit.runners.Parameterized
import org.junit.{After, Before, Rule, Test}

/**
 * Integration tests for `Graph.mapVertices`: every vertex value is incremented by one, first
 * via a `MapFunction`, then via a Scala function. The CSV output at `resultPath` is compared
 * with `expectedResult` in `after`.
 */
@RunWith(classOf[Parameterized])
class MapVerticesITCase(mode: MultipleProgramsTestBase.TestExecutionMode)
  extends MultipleProgramsTestBase(mode) {

  private var resultPath: String = null
  private var expectedResult: String = null

  var tempFolder: TemporaryFolder = new TemporaryFolder()

  /** Exposes the temporary folder to JUnit as a rule. */
  @Rule
  def getFolder(): TemporaryFolder = tempFolder

  /** Allocates the output file of the current test. */
  @Before
  @throws(classOf[Exception])
  def before(): Unit = {
    val outputFile = tempFolder.newFile
    resultPath = outputFile.toURI.toString
  }

  /** Passes the expected text and the output location to the comparison helper. */
  @After
  @throws(classOf[Exception])
  def after(): Unit =
    TestBaseUtils.compareResultsByLinesInMemory(expectedResult, resultPath)

  /** Maps the vertex values with `AddOneMapper`. */
  @Test
  @throws(classOf[Exception])
  def testWithSameValue(): Unit = {
    val environment: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
    val mapped = longLongGraph(environment).mapVertices(new AddOneMapper)

    mapped.getVerticesAsTuple2().writeAsCsv(resultPath)
    environment.execute()

    expectedResult = "1,2\n" + "2,3\n" + "3,4\n" + "4,5\n" + "5,6\n"
  }

  /** Maps the vertex values with a Scala function. */
  @Test
  @throws(classOf[Exception])
  def testWithSameValueSugar(): Unit = {
    val environment: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
    val mapped = longLongGraph(environment).mapVertices(v => v.getValue + 1)

    mapped.getVerticesAsTuple2().writeAsCsv(resultPath)
    environment.execute()

    expectedResult = "1,2\n" + "2,3\n" + "3,4\n" + "4,5\n" + "5,6\n"
  }

  /** Builds the graph used by both tests from the `TestGraphUtils` data sets. */
  private def longLongGraph(environment: ExecutionEnvironment): Graph[Long, Long, Long] = {
    val vertexData = TestGraphUtils.getLongLongVertexData(environment)
    val edgeData = TestGraphUtils.getLongLongEdgeData(environment)
    Graph.fromDataSet(vertexData, edgeData, environment)
  }

  /** Returns the value of the vertex plus one. */
  final class AddOneMapper extends MapFunction[Vertex[Long, Long], Long] {
    @throws(classOf[Exception])
    def map(vertex: Vertex[Long, Long]): Long = {
      val current: Long = vertex.getValue
      current + 1
    }
  }

}
package org.apache.flink.graph.scala.test.operations

import org.apache.flink.api.scala._
import org.apache.flink.graph._
import org.apache.flink.graph.scala.test.TestGraphUtils
import org.apache.flink.graph.scala.{EdgesFunction, EdgesFunctionWithVertexValue, Graph}
import org.apache.flink.test.util.{MultipleProgramsTestBase, TestBaseUtils}
import org.apache.flink.util.Collector
import org.junit.rules.TemporaryFolder
import org.junit.runner.RunWith
import org.junit.runners.Parameterized
import org.junit.{After, Before, Rule, Test}

/**
 * Integration tests for `Graph.groupReduceOnEdges` and `Graph.reduceOnEdges`. Each test writes
 * its result as CSV to `resultPath` and records the expected text in `expectedResult`; the
 * comparison runs in `after`.
 */
@RunWith(classOf[Parameterized])
class ReduceOnEdgesMethodsITCase(mode: MultipleProgramsTestBase.TestExecutionMode)
  extends MultipleProgramsTestBase(mode) {

  private var resultPath: String = null
  private var expectedResult: String = null

  var tempFolder: TemporaryFolder = new TemporaryFolder()

  /** Exposes the temporary folder to JUnit as a rule. */
  @Rule
  def getFolder(): TemporaryFolder = tempFolder

  /** Allocates the output file of the current test. */
  @Before
  @throws(classOf[Exception])
  def before(): Unit = {
    val outputFile = tempFolder.newFile
    resultPath = outputFile.toURI.toString
  }

  /** Passes the expected text and the output location to the comparison helper. */
  @After
  @throws(classOf[Exception])
  def after(): Unit =
    TestBaseUtils.compareResultsByLinesInMemory(expectedResult, resultPath)

  /** Emits (vertex, neighbor) pairs, in both directions, for vertices whose value exceeds 4. */
  @Test
  @throws(classOf[Exception])
  def testAllNeighborsWithValueGreaterThanFour(): Unit = {
    val environment: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
    val input = longLongGraph(environment)
    val neighborPairs =
      input.groupReduceOnEdges(new SelectNeighborsValueGreaterThanFour, EdgeDirection.ALL)

    neighborPairs.writeAsCsv(resultPath)
    environment.execute()

    expectedResult = "5,1\n" + "5,3\n" + "5,4"
  }

  /** Emits (vertex, neighbor) pairs, in both directions, for every vertex. */
  @Test
  @throws(classOf[Exception])
  def testAllNeighbors(): Unit = {
    val environment: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
    val input = longLongGraph(environment)
    val neighborPairs = input.groupReduceOnEdges(new SelectNeighbors, EdgeDirection.ALL)

    neighborPairs.writeAsCsv(resultPath)
    environment.execute()

    expectedResult = "1,2\n" + "1,3\n" + "1,5\n" +
      "2,1\n" + "2,3\n" +
      "3,1\n" + "3,2\n" + "3,4\n" + "3,5\n" +
      "4,3\n" + "4,5\n" +
      "5,1\n" + "5,3\n" + "5,4"
  }

  /** Reduces the out-edges of every vertex to their minimum value. */
  @Test
  @throws(classOf[Exception])
  def testLowestWeightOutNeighborNoValue(): Unit = {
    val environment: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
    val input = longLongGraph(environment)
    val minOutWeights: DataSet[(Long, Long)] =
      input.reduceOnEdges(new SelectMinWeightNeighborNoValue, EdgeDirection.OUT)

    minOutWeights.writeAsCsv(resultPath)
    environment.execute()

    expectedResult = "1,12\n" + "2,23\n" + "3,34\n" + "4,45\n" + "5,51\n"
  }

  /** Reduces the in-edges of every vertex to their minimum value. */
  @Test
  @throws(classOf[Exception])
  def testLowestWeightInNeighborNoValue(): Unit = {
    val environment: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
    val input = longLongGraph(environment)
    val minInWeights: DataSet[(Long, Long)] =
      input.reduceOnEdges(new SelectMinWeightNeighborNoValue, EdgeDirection.IN)

    minInWeights.writeAsCsv(resultPath)
    environment.execute()

    expectedResult = "1,51\n" + "2,12\n" + "3,13\n" + "4,34\n" + "5,35\n"
  }

  /** Reduces all edges of every vertex to their maximum value. */
  @Test
  @throws(classOf[Exception])
  def testMaxWeightAllNeighbors(): Unit = {
    val environment: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
    val input = longLongGraph(environment)
    val maxWeights: DataSet[(Long, Long)] =
      input.reduceOnEdges(new SelectMaxWeightNeighborNoValue, EdgeDirection.ALL)

    maxWeights.writeAsCsv(resultPath)
    environment.execute()

    expectedResult = "1,51\n" + "2,23\n" + "3,35\n" + "4,45\n" + "5,51\n"
  }

  /** Builds the graph used by all tests from the `TestGraphUtils` data sets. */
  private def longLongGraph(environment: ExecutionEnvironment): Graph[Long, Long, Long] = {
    val vertexData = TestGraphUtils.getLongLongVertexData(environment)
    val edgeData = TestGraphUtils.getLongLongEdgeData(environment)
    Graph.fromDataSet(vertexData, edgeData, environment)
  }

  /**
   * For a vertex whose value is greater than 4, emits one (vertex id, other endpoint) pair per
   * edge; emits nothing for any other vertex.
   */
  final class SelectNeighborsValueGreaterThanFour extends EdgesFunctionWithVertexValue[Long,
    Long, Long, (Long, Long)] {
    @throws(classOf[Exception])
    override def iterateEdges(v: Vertex[Long, Long], edges: Iterable[Edge[Long, Long]], out:
    Collector[(Long, Long)]): Unit = {
      edges.foreach { e =>
        if (v.getValue > 4) {
          // The endpoint that is not the vertex itself is the neighbor.
          val neighborId = if (v.getId == e.getTarget) e.getSource else e.getTarget
          out.collect((v.getId, neighborId))
        }
      }
    }
  }

  /** Emits one (vertex id, other endpoint) pair per (vertex id, edge) entry. */
  final class SelectNeighbors extends EdgesFunction[Long, Long, (Long, Long)] {
    @throws(classOf[Exception])
    override def iterateEdges(edges: Iterable[(Long, Edge[Long, Long])], out: Collector[
      (Long, Long)]) {
      edges.foreach { case (vertexId, e) =>
        val neighborId = if (vertexId == e.getTarget) e.getSource else e.getTarget
        out.collect((vertexId, neighborId))
      }
    }
  }

  /** Keeps the smaller of two edge values. */
  final class SelectMinWeightNeighborNoValue extends ReduceEdgesFunction[Long] {
    override def reduceEdges(firstEdgeValue: Long, secondEdgeValue: Long): Long =
      firstEdgeValue min secondEdgeValue
  }

  /** Keeps the larger of two edge values. */
  final class SelectMaxWeightNeighborNoValue extends ReduceEdgesFunction[Long] {
    override def reduceEdges(firstEdgeValue: Long, secondEdgeValue: Long): Long =
      firstEdgeValue max secondEdgeValue
  }

}
package org.apache.flink.graph.scala.test.operations

import org.apache.flink.api.scala._
import org.apache.flink.graph.scala.test.TestGraphUtils
import org.apache.flink.graph.scala.{NeighborsFunctionWithVertexValue, _}
import org.apache.flink.graph.{Edge, EdgeDirection, ReduceNeighborsFunction, Vertex}
import org.apache.flink.test.util.{MultipleProgramsTestBase, TestBaseUtils}
import org.apache.flink.util.Collector
import org.junit.rules.TemporaryFolder
import org.junit.runner.RunWith
import org.junit.runners.Parameterized
import org.junit.{After, Before, Rule, Test}

/**
 * Integration tests for `Graph.reduceOnNeighbors` and `Graph.groupReduceOnNeighbors`. Each
 * test writes its result as CSV to `resultPath` and records the expected text in
 * `expectedResult`; the comparison runs in `after`.
 */
@RunWith(classOf[Parameterized])
class ReduceOnNeighborMethodsITCase(mode: MultipleProgramsTestBase.TestExecutionMode)
  extends MultipleProgramsTestBase(mode) {

  private var resultPath: String = null
  private var expectedResult: String = null

  var tempFolder: TemporaryFolder = new TemporaryFolder()

  /** Exposes the temporary folder to JUnit as a rule. */
  @Rule
  def getFolder(): TemporaryFolder = {
    tempFolder
  }

  /** Creates a fresh output file for the test and remembers its URI. */
  @Before
  @throws(classOf[Exception])
  def before(): Unit = {
    resultPath = tempFolder.newFile.toURI.toString
  }

  /** Hands the expected text and the output location to the line-based comparison helper. */
  @After
  @throws(classOf[Exception])
  def after(): Unit = {
    TestBaseUtils.compareResultsByLinesInMemory(expectedResult, resultPath)
  }

  /** Sums the neighbor values of every vertex over edges in both directions. */
  @Test
  @throws(classOf[Exception])
  def testSumOfAllNeighborsNoValue(): Unit = {
    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
    val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
      .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
    graph.reduceOnNeighbors(new SumNeighbors, EdgeDirection.ALL).writeAsCsv(resultPath)
    env.execute
    expectedResult = "1,10\n" + "2,4\n" + "3,12\n" + "4,8\n" + "5,8\n"
  }

  /** Sums the neighbor values of every vertex over its out-edges. */
  @Test
  @throws(classOf[Exception])
  def testSumOfOutNeighborsNoValue(): Unit = {
    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
    val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
      .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
    graph.reduceOnNeighbors(new SumNeighbors, EdgeDirection.OUT).writeAsCsv(resultPath)
    env.execute
    expectedResult = "1,5\n" + "2,3\n" + "3,9\n" + "4,5\n" + "5,1\n"
  }

  /** Sums, per vertex, the values of all neighbors plus the vertex's own value. */
  @Test
  @throws(classOf[Exception])
  def testSumOfAllNeighbors(): Unit = {
    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
    val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
      .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
    val result = graph.groupReduceOnNeighbors(new SumAllNeighbors, EdgeDirection.ALL)
    result.writeAsCsv(resultPath)
    env.execute
    expectedResult = "1,11\n" + "2,6\n" + "3,15\n" + "4,12\n" + "5,13\n"
  }

  /**
   * For vertices with id greater than 2, emits the weighted in-neighbor sum and its double.
   * Declared like its siblings; it previously used `= { ... }` with an inferred result type.
   */
  @Test
  @throws(classOf[Exception])
  def testSumOfInNeighborsNoValueMultipliedByTwoIdGreaterThanTwo(): Unit = {
    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
    val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
      .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
    val result = graph.groupReduceOnNeighbors(new
        SumInNeighborsNoValueMultipliedByTwoIdGreaterThanTwo, EdgeDirection.IN)
    result.writeAsCsv(resultPath)
    env.execute
    expectedResult = "3,59\n" + "3,118\n" + "4,204\n" + "4,102\n" + "5,570\n" + "5,285"
  }

  /** Adds two neighbor values. */
  final class SumNeighbors extends ReduceNeighborsFunction[Long] {
    override def reduceNeighbors(firstNeighbor: Long, secondNeighbor: Long): Long = {
      firstNeighbor + secondNeighbor
    }
  }

  /** Emits (vertex id, sum of all neighbor values + the vertex's own value). */
  final class SumAllNeighbors extends NeighborsFunctionWithVertexValue[Long, Long, Long, (Long,
    Long)] {
    @throws(classOf[Exception])
    def iterateNeighbors(vertex: Vertex[Long, Long], neighbors: Iterable[(Edge[Long, Long],
      Vertex[Long, Long])], out: Collector[(Long, Long)]) {
      var sum: Long = 0
      for (neighbor <- neighbors) {
        sum += neighbor._2.getValue
      }
      out.collect((vertex.getId, sum + vertex.getValue))
    }
  }

  /**
   * Sums `neighbor value * edge value` over all entries and, when the vertex id of the last
   * entry is greater than 2, emits (id, sum) followed by (id, 2 * sum).
   */
  final class SumInNeighborsNoValueMultipliedByTwoIdGreaterThanTwo extends
  NeighborsFunction[Long, Long, Long, (Long, Long)] {
    @throws(classOf[Exception])
    def iterateNeighbors(neighbors: Iterable[(Long, Edge[Long, Long], Vertex[Long, Long])],
                         out: Collector[(Long, Long)]) {
      var sum: Long = 0
      var next: (Long, Edge[Long, Long], Vertex[Long, Long]) = null
      val neighborsIterator: Iterator[(Long, Edge[Long, Long], Vertex[Long, Long])] =
        neighbors.iterator
      while (neighborsIterator.hasNext) {
        next = neighborsIterator.next
        sum += next._3.getValue * next._2.getValue
      }
      // `next` stays null when no entry was seen; the guard avoids a NullPointerException then.
      if (next != null && next._1 > 2) {
        out.collect(new Tuple2[Long, Long](next._1, sum))
        out.collect(new Tuple2[Long, Long](next._1, sum * 2))
      }
    }
  }

}
attribute of each vertex in the graph. + * + * @param mapper the map function to apply. + * @param returnType the explicit return type. + * @return a new graph + */ + public Graph mapVertices(final MapFunction, NV> mapper, TypeInformation> returnType) { DataSet> mappedVertices = vertices.map( new MapFunction, Vertex>() { public Vertex map(Vertex value) throws Exception { @@ -411,6 +422,18 @@ public Graph mapEdges(final MapFunction, NV> mapper) TypeInformation> returnType = (TypeInformation>) new TupleTypeInfo( Edge.class, keyType, keyType, valueType); + return mapEdges(mapper, returnType); + } + + /** + * Apply a function to the attribute of each edge in the graph. + * + * @param mapper the map function to apply. + * @param returnType the explicit return type. + * @return a new graph + */ + @SuppressWarnings({ "unchecked", "rawtypes" }) + public Graph mapEdges(final MapFunction, NV> mapper, TypeInformation> returnType) { DataSet> mappedEdges = edges.map( new MapFunction, Edge>() { public Edge map(Edge value) throws Exception { @@ -752,6 +775,38 @@ public DataSet groupReduceOnEdges(EdgesFunctionWithVertexValue + * the output type + * @param typeInfo the explicit return type. 
+ * @return a dataset of T + * @throws IllegalArgumentException + */ + public DataSet groupReduceOnEdges(EdgesFunctionWithVertexValue edgesFunction, + EdgeDirection direction, TypeInformation typeInfo) throws IllegalArgumentException { + + switch (direction) { + case IN: + return vertices.coGroup(edges).where(0).equalTo(1) + .with(new ApplyCoGroupFunction(edgesFunction)).returns(typeInfo); + case OUT: + return vertices.coGroup(edges).where(0).equalTo(0) + .with(new ApplyCoGroupFunction(edgesFunction)).returns(typeInfo); + case ALL: + return vertices.coGroup(edges.flatMap(new EmitOneEdgePerNode())) + .where(0).equalTo(0).with(new ApplyCoGroupFunctionOnAllEdges(edgesFunction)).returns(typeInfo); + default: + throw new IllegalArgumentException("Illegal edge direction"); + } + } + /** * Compute an aggregate over the edges of each vertex. The function applied * on the edges only has access to the vertex id (not the vertex value). @@ -785,6 +840,40 @@ public DataSet groupReduceOnEdges(EdgesFunction edgesFunction, } } + /** + * Compute an aggregate over the edges of each vertex. The function applied + * on the edges only has access to the vertex id (not the vertex value). + * + * @param edgesFunction + * the function to apply to the neighborhood + * @param direction + * the edge direction (in-, out-, all-) + * @param <T> + * the output type + * @param typeInfo the explicit return type. 
+ * @return a dataset of T + * @throws IllegalArgumentException + */ + public DataSet groupReduceOnEdges(EdgesFunction edgesFunction, + EdgeDirection direction, TypeInformation typeInfo) throws IllegalArgumentException { + + switch (direction) { + case IN: + return edges.map(new ProjectVertexIdMap(1)) + .withForwardedFields("f1->f0") + .groupBy(0).reduceGroup(new ApplyGroupReduceFunction(edgesFunction)).returns(typeInfo); + case OUT: + return edges.map(new ProjectVertexIdMap(0)) + .withForwardedFields("f0") + .groupBy(0).reduceGroup(new ApplyGroupReduceFunction(edgesFunction)).returns(typeInfo); + case ALL: + return edges.flatMap(new EmitOneEdgePerNode()) + .groupBy(0).reduceGroup(new ApplyGroupReduceFunction(edgesFunction)).returns(typeInfo); + default: + throw new IllegalArgumentException("Illegal edge direction"); + } + } + private static final class ProjectVertexIdMap implements MapFunction< Edge, Tuple2>> { @@ -1410,6 +1499,51 @@ public DataSet groupReduceOnNeighbors(NeighborsFunctionWithVertexValue the output type + * @param typeInfo the explicit return type. 
+ * @return a dataset of T + * @throws IllegalArgumentException + */ + public DataSet groupReduceOnNeighbors(NeighborsFunctionWithVertexValue neighborsFunction, + EdgeDirection direction, TypeInformation typeInfo) throws IllegalArgumentException { + switch (direction) { + case IN: + // create pairs + DataSet, Vertex>> edgesWithSources = edges + .join(this.vertices).where(0).equalTo(0); + return vertices.coGroup(edgesWithSources) + .where(0).equalTo("f0.f1") + .with(new ApplyNeighborCoGroupFunction(neighborsFunction)).returns(typeInfo); + case OUT: + // create pairs + DataSet, Vertex>> edgesWithTargets = edges + .join(this.vertices).where(1).equalTo(0); + return vertices.coGroup(edgesWithTargets) + .where(0).equalTo("f0.f0") + .with(new ApplyNeighborCoGroupFunction(neighborsFunction)).returns(typeInfo); + case ALL: + // create pairs + DataSet, Vertex>> edgesWithNeighbors = edges + .flatMap(new EmitOneEdgeWithNeighborPerNode()) + .join(this.vertices).where(1).equalTo(0) + .with(new ProjectEdgeWithNeighbor()); + + return vertices.coGroup(edgesWithNeighbors) + .where(0).equalTo(0) + .with(new ApplyCoGroupFunctionOnAllNeighbors(neighborsFunction)).returns(typeInfo); + default: + throw new IllegalArgumentException("Illegal edge direction"); + } + } + + /** * Compute an aggregate over the neighbors (edges and vertices) of each * vertex. The function applied on the neighbors only has access to the @@ -1454,6 +1588,51 @@ public DataSet groupReduceOnNeighbors(NeighborsFunction nei } } + /** + * Compute an aggregate over the neighbors (edges and vertices) of each + * vertex. The function applied on the neighbors only has access to the + * vertex id (not the vertex value). + * + * @param neighborsFunction the function to apply to the neighborhood + * @param direction the edge direction (in-, out-, all-) + * @param <T> the output type + * @param typeInfo the explicit return type. 
+ * @return a dataset of T + * @throws IllegalArgumentException + */ + public DataSet groupReduceOnNeighbors(NeighborsFunction neighborsFunction, + EdgeDirection direction, TypeInformation typeInfo) throws IllegalArgumentException { + switch (direction) { + case IN: + // create pairs + DataSet, Vertex>> edgesWithSources = edges + .join(this.vertices).where(0).equalTo(0) + .with(new ProjectVertexIdJoin(1)) + .withForwardedFieldsFirst("f1->f0"); + return edgesWithSources.groupBy(0).reduceGroup( + new ApplyNeighborGroupReduceFunction(neighborsFunction)).returns(typeInfo); + case OUT: + // create pairs + DataSet, Vertex>> edgesWithTargets = edges + .join(this.vertices).where(1).equalTo(0) + .with(new ProjectVertexIdJoin(0)) + .withForwardedFieldsFirst("f0"); + return edgesWithTargets.groupBy(0).reduceGroup( + new ApplyNeighborGroupReduceFunction(neighborsFunction)).returns(typeInfo); + case ALL: + // create pairs + DataSet, Vertex>> edgesWithNeighbors = edges + .flatMap(new EmitOneEdgeWithNeighborPerNode()) + .join(this.vertices).where(1).equalTo(0) + .with(new ProjectEdgeWithNeighbor()); + + return edgesWithNeighbors.groupBy(0).reduceGroup( + new ApplyNeighborGroupReduceFunction(neighborsFunction)).returns(typeInfo); + default: + throw new IllegalArgumentException("Illegal edge direction"); + } + } + private static final class ApplyNeighborGroupReduceFunction implements GroupReduceFunction, Vertex>, T>, ResultTypeQueryable { diff --git a/flink-staging/pom.xml b/flink-staging/pom.xml index b3aec14c87e8f..a0cda670054ed 100644 --- a/flink-staging/pom.xml +++ b/flink-staging/pom.xml @@ -46,6 +46,7 @@ under the License. flink-table flink-ml flink-language-binding + flink-gelly-scala