diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java index 7854c54fac091..c6843e4fa4200 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java @@ -55,6 +55,7 @@ import org.apache.flink.graph.spargel.ScatterGatherConfiguration; import org.apache.flink.graph.spargel.ScatterGatherIteration; import org.apache.flink.graph.utils.EdgeToTuple3Map; +import org.apache.flink.graph.utils.Tuple2ToEdgeMap; import org.apache.flink.graph.utils.Tuple2ToVertexMap; import org.apache.flink.graph.utils.Tuple3ToEdgeMap; import org.apache.flink.graph.utils.VertexToTuple2Map; @@ -74,8 +75,7 @@ /** * Represents a Graph consisting of {@link Edge edges} and {@link Vertex * vertices}. - * - * + * * @see org.apache.flink.graph.Edge * @see org.apache.flink.graph.Vertex * @@ -176,17 +176,24 @@ public static Graph fromDataSet(DataSet> ve public static Graph fromDataSet( DataSet> edges, ExecutionEnvironment context) { - DataSet> vertices = edges.flatMap(new EmitSrcAndTarget()).distinct(); + DataSet> vertices = edges + .flatMap(new EmitSrcAndTarget()) + .name("Source and target IDs") + .distinct() + .name("IDs"); return new Graph<>(vertices, edges, context); } - private static final class EmitSrcAndTarget implements FlatMapFunction< - Edge, Vertex> { + private static final class EmitSrcAndTarget + implements FlatMapFunction, Vertex> { + private Vertex output = new Vertex<>(null, NullValue.getInstance()); public void flatMap(Edge edge, Collector> out) { - out.collect(new Vertex<>(edge.f0, NullValue.getInstance())); - out.collect(new Vertex<>(edge.f1, NullValue.getInstance())); + output.f0 = edge.f0; + out.collect(output); + output.f0 = edge.f1; + out.collect(output); } } @@ -214,22 +221,32 @@ public static Graph fromDataSet(DataSet> edge Vertex.class, keyType, valueType); DataSet> vertices = edges - .flatMap(new EmitSrcAndTargetAsTuple1()).distinct() - .map(new MapFunction, Vertex>() { - public Vertex map(Tuple1 value) throws Exception { - return new Vertex<>(value.f0, vertexValueInitializer.map(value.f0)); - } - }).returns(returnType).withForwardedFields("f0"); + .flatMap(new EmitSrcAndTargetAsTuple1()) + .name("Source and target IDs") + .distinct() + .name("IDs") + .map(new MapFunction, Vertex>() { + private Vertex output = new Vertex<>(); + + public Vertex map(Tuple1 value) throws Exception { + output.f0 = value.f0; + output.f1 = vertexValueInitializer.map(value.f0); + return output; + } + }).returns(returnType).withForwardedFields("f0").name("Initialize vertex values"); return new Graph<>(vertices, edges, context); } - private static final class EmitSrcAndTargetAsTuple1 implements FlatMapFunction< - Edge, Tuple1> { + private static final class EmitSrcAndTargetAsTuple1 + implements FlatMapFunction, Tuple1> { + private Tuple1 output = new Tuple1<>(); public void flatMap(Edge edge, Collector> out) { - out.collect(new Tuple1<>(edge.f0)); - out.collect(new Tuple1<>(edge.f1)); + output.f0 = edge.f0; + out.collect(output); + output.f0 = edge.f1; + out.collect(output); } } @@ -251,8 +268,14 @@ public void flatMap(Edge edge, Collector> out) { public static Graph fromTupleDataSet(DataSet> vertices, DataSet> edges, ExecutionEnvironment context) { - DataSet> vertexDataSet = vertices.map(new Tuple2ToVertexMap()); - DataSet> edgeDataSet = edges.map(new Tuple3ToEdgeMap()); + DataSet> vertexDataSet = vertices + .map(new Tuple2ToVertexMap()) + .name("Type conversion"); + + DataSet> edgeDataSet = edges + .map(new Tuple3ToEdgeMap()) + .name("Type conversion"); + return fromDataSet(vertexDataSet, edgeDataSet, context); } @@ -272,7 +295,10 @@ public static Graph fromTupleDataSet(DataSet Graph fromTupleDataSet(DataSet> edges, ExecutionEnvironment context) { - DataSet> edgeDataSet = edges.map(new Tuple3ToEdgeMap()); + DataSet> edgeDataSet = edges + .map(new Tuple3ToEdgeMap()) + .name("Type conversion"); + return fromDataSet(edgeDataSet, context); } @@ -295,7 +321,10 @@ public static Graph fromTupleDataSet(DataSet Graph fromTupleDataSet(DataSet> edges, final MapFunction vertexValueInitializer, ExecutionEnvironment context) { - DataSet> edgeDataSet = edges.map(new Tuple3ToEdgeMap()); + DataSet> edgeDataSet = edges + .map(new Tuple3ToEdgeMap()) + .name("Type conversion"); + return fromDataSet(edgeDataSet, vertexValueInitializer, context); } @@ -313,13 +342,10 @@ public static Graph fromTupleDataSet(DataSet Graph fromTuple2DataSet(DataSet> edges, ExecutionEnvironment context) { - DataSet> edgeDataSet = edges.map( - new MapFunction, Edge>() { + DataSet> edgeDataSet = edges + .map(new Tuple2ToEdgeMap()) + .name("To Edge"); - public Edge map(Tuple2 input) { - return new Edge<>(input.f0, input.f1, NullValue.getInstance()); - } - }).withForwardedFields("f0; f1"); return fromDataSet(edgeDataSet, context); } @@ -341,13 +367,10 @@ public Edge map(Tuple2 input) { public static Graph fromTuple2DataSet(DataSet> edges, final MapFunction vertexValueInitializer, ExecutionEnvironment context) { - DataSet> edgeDataSet = edges.map( - new MapFunction, Edge>() { + DataSet> edgeDataSet = edges + .map(new Tuple2ToEdgeMap()) + .name("To Edge"); - public Edge map(Tuple2 input) { - return new Edge<>(input.f0, input.f1, NullValue.getInstance()); - } - }).withForwardedFields("f0; f1"); return fromDataSet(edgeDataSet, vertexValueInitializer, context); } @@ -458,10 +481,13 @@ public DataSet> getEdgesAsTuple3() { * @return a triplet DataSet consisting of (srcVertexId, trgVertexId, srcVertexValue, trgVertexValue, edgeValue) */ public DataSet> getTriplets() { - return this.getVertices().join(this.getEdges()).where(0).equalTo(0) - .with(new ProjectEdgeWithSrcValue()) - .join(this.getVertices()).where(1).equalTo(0) - .with(new ProjectEdgeWithVertexValues()); + return this.getVertices() + .join(this.getEdges()).where(0).equalTo(0) + .with(new ProjectEdgeWithSrcValue()) + .name("Project edge with source value") + .join(this.getVertices()).where(1).equalTo(0) + .with(new ProjectEdgeWithVertexValues()) + .name("Project edge with vertex values"); } @ForwardedFieldsFirst("f1->f2") @@ -521,12 +547,17 @@ public Graph mapVertices(final MapFunction, NV> ma public Graph mapVertices(final MapFunction, NV> mapper, TypeInformation> returnType) { DataSet> mappedVertices = vertices.map( new MapFunction, Vertex>() { + private Vertex output = new Vertex<>(); + public Vertex map(Vertex value) throws Exception { - return new Vertex<>(value.f0, mapper.map(value)); + output.f0 = value.f0; + output.f1 = mapper.map(value); + return output; } }) .returns(returnType) - .withForwardedFields("f0"); + .withForwardedFields("f0") + .name("Map vertices"); return new Graph<>(mappedVertices, this.edges, this.context); } @@ -550,6 +581,32 @@ public Graph mapEdges(final MapFunction, NV> mapper) return mapEdges(mapper, returnType); } + /** + * Apply a function to the attribute of each edge in the graph. + * + * @param mapper the map function to apply. + * @param returnType the explicit return type. + * @return a new graph + */ + public Graph mapEdges(final MapFunction, NV> mapper, TypeInformation> returnType) { + DataSet> mappedEdges = edges.map( + new MapFunction, Edge>() { + private Edge output = new Edge<>(); + + public Edge map(Edge value) throws Exception { + output.f0 = value.f0; + output.f1 = value.f1; + output.f2 = mapper.map(value); + return output; + } + }) + .returns(returnType) + .withForwardedFields("f0; f1") + .name("Map edges"); + + return new Graph<>(this.vertices, mappedEdges, this.context); + } + /** * Translate {@link Vertex} and {@link Edge} IDs using the given {@link MapFunction}. * @@ -586,26 +643,6 @@ public Graph translateEdgeValues(TranslateFunction tr return run(new TranslateEdgeValues(translator)); } - /** - * Apply a function to the attribute of each edge in the graph. - * - * @param mapper the map function to apply. - * @param returnType the explicit return type. - * @return a new graph - */ - public Graph mapEdges(final MapFunction, NV> mapper, TypeInformation> returnType) { - DataSet> mappedEdges = edges.map( - new MapFunction, Edge>() { - public Edge map(Edge value) throws Exception { - return new Edge<>(value.f0, value.f1, mapper.map(value)); - } - }) - .returns(returnType) - .withForwardedFields("f0; f1"); - - return new Graph<>(this.vertices, mappedEdges, this.context); - } - /** * Joins the vertex DataSet of this graph with an input Tuple2 DataSet and applies * a user-defined transformation on the values of the matched records. @@ -627,7 +664,8 @@ public Graph joinWithVertices(DataSet> inputDataSet, DataSet> resultedVertices = this.getVertices() .coGroup(inputDataSet).where(0).equalTo(0) - .with(new ApplyCoGroupToVertexValues(vertexJoinFunction)); + .with(new ApplyCoGroupToVertexValues(vertexJoinFunction)) + .name("Join with vertices"); return new Graph<>(resultedVertices, this.edges, this.context); } @@ -680,12 +718,14 @@ public Graph joinWithEdges(DataSet> inputDataSet, DataSet> resultedEdges = this.getEdges() .coGroup(inputDataSet).where(0, 1).equalTo(0, 1) - .with(new ApplyCoGroupToEdgeValues(edgeJoinFunction)); + .with(new ApplyCoGroupToEdgeValues(edgeJoinFunction)) + .name("Join with edges"); return new Graph<>(this.vertices, resultedEdges, this.context); } private static final class ApplyCoGroupToEdgeValues - implements CoGroupFunction, Tuple3, Edge> { + implements CoGroupFunction, Tuple3, Edge> { + private Edge output = new Edge<>(); private EdgeJoinFunction edgeJoinFunction; @@ -704,9 +744,10 @@ public void coGroup(Iterable> edges, Iterable> input if (inputIterator.hasNext()) { final Tuple3 inputNext = inputIterator.next(); - collector.collect(new Edge<>(inputNext.f0, - inputNext.f1, edgeJoinFunction.edgeJoin( - edgesIterator.next().f2, inputNext.f2))); + output.f0 = inputNext.f0; + output.f1 = inputNext.f1; + output.f2 = edgeJoinFunction.edgeJoin(edgesIterator.next().f2, inputNext.f2); + collector.collect(output); } else { collector.collect(edgesIterator.next()); } @@ -734,13 +775,15 @@ public Graph joinWithEdgesOnSource(DataSet> inputDat DataSet> resultedEdges = this.getEdges() .coGroup(inputDataSet).where(0).equalTo(0) - .with(new ApplyCoGroupToEdgeValuesOnEitherSourceOrTarget(edgeJoinFunction)); + .with(new ApplyCoGroupToEdgeValuesOnEitherSourceOrTarget(edgeJoinFunction)) + .name("Join with edges on source"); return new Graph<>(this.vertices, resultedEdges, this.context); } private static final class ApplyCoGroupToEdgeValuesOnEitherSourceOrTarget - implements CoGroupFunction, Tuple2, Edge> { + implements CoGroupFunction, Tuple2, Edge> { + private Edge output = new Edge<>(); private EdgeJoinFunction edgeJoinFunction; @@ -749,8 +792,8 @@ public ApplyCoGroupToEdgeValuesOnEitherSourceOrTarget(EdgeJoinFunction ma } @Override - public void coGroup(Iterable> edges, - Iterable> input, Collector> collector) throws Exception { + public void coGroup(Iterable> edges, Iterable> input, + Collector> collector) throws Exception { final Iterator> edgesIterator = edges.iterator(); final Iterator> inputIterator = input.iterator(); @@ -761,8 +804,10 @@ public void coGroup(Iterable> edges, while (edgesIterator.hasNext()) { Edge edgesNext = edgesIterator.next(); - collector.collect(new Edge<>(edgesNext.f0, - edgesNext.f1, edgeJoinFunction.edgeJoin(edgesNext.f2, inputNext.f1))); + output.f0 = edgesNext.f0; + output.f1 = edgesNext.f1; + output.f2 = edgeJoinFunction.edgeJoin(edgesNext.f2, inputNext.f1); + collector.collect(output); } } else { @@ -793,7 +838,8 @@ public Graph joinWithEdgesOnTarget(DataSet> inputDat DataSet> resultedEdges = this.getEdges() .coGroup(inputDataSet).where(1).equalTo(0) - .with(new ApplyCoGroupToEdgeValuesOnEitherSourceOrTarget(edgeJoinFunction)); + .with(new ApplyCoGroupToEdgeValuesOnEitherSourceOrTarget(edgeJoinFunction)) + .name("Join with edges on target"); return new Graph<>(this.vertices, resultedEdges, this.context); } @@ -813,7 +859,7 @@ public Graph subgraph(FilterFunction> vertexFilter, Fil DataSet> remainingEdges = this.edges.join(filteredVertices) .where(0).equalTo(0).with(new ProjectEdge()) .join(filteredVertices).where(1).equalTo(0) - .with(new ProjectEdge()); + .with(new ProjectEdge()).name("Subgraph"); DataSet> filteredEdges = remainingEdges.filter(edgeFilter); @@ -834,7 +880,7 @@ public Graph filterOnVertices(FilterFunction> vertexFil DataSet> remainingEdges = this.edges.join(filteredVertices) .where(0).equalTo(0).with(new ProjectEdge()) .join(filteredVertices).where(1).equalTo(0) - .with(new ProjectEdge()); + .with(new ProjectEdge()).name("Filter on vertices"); return new Graph<>(filteredVertices, remainingEdges, this.context); } @@ -847,7 +893,7 @@ public Graph filterOnVertices(FilterFunction> vertexFil * @return the resulting sub-graph. */ public Graph filterOnEdges(FilterFunction> edgeFilter) { - DataSet> filteredEdges = this.edges.filter(edgeFilter); + DataSet> filteredEdges = this.edges.filter(edgeFilter).name("Filter on edges"); return new Graph<>(this.vertices, filteredEdges, this.context); } @@ -867,7 +913,8 @@ public void join(Edge first, Vertex second, Collector> */ public DataSet> outDegrees() { - return vertices.coGroup(edges).where(0).equalTo(0).with(new CountNeighborsCoGroup()); + return vertices.coGroup(edges).where(0).equalTo(0).with(new CountNeighborsCoGroup()) + .name("Out-degree"); } private static final class CountNeighborsCoGroup @@ -903,7 +950,8 @@ public void coGroup(Iterable> vertex, Iterable> outEdg */ public DataSet> inDegrees() { - return vertices.coGroup(edges).where(0).equalTo(1).with(new CountNeighborsCoGroup()); + return vertices.coGroup(edges).where(0).equalTo(1).with(new CountNeighborsCoGroup()) + .name("In-degree"); } /** @@ -912,7 +960,9 @@ public DataSet> inDegrees() { * @return A DataSet of {@code Tuple2} */ public DataSet> getDegrees() { - return outDegrees().union(inDegrees()).groupBy(0).sum(1); + return outDegrees() + .union(inDegrees()).name("In- and out-degree") + .groupBy(0).sum(1).name("Sum"); } /** @@ -922,7 +972,8 @@ public DataSet> getDegrees() { */ public Graph getUndirected() { - DataSet> undirectedEdges = edges.flatMap(new RegularAndReversedEdgesMap()); + DataSet> undirectedEdges = edges. + flatMap(new RegularAndReversedEdgesMap()).name("To undirected graph"); return new Graph<>(vertices, undirectedEdges, this.context); } @@ -946,13 +997,15 @@ public DataSet groupReduceOnEdges(EdgesFunctionWithVertexValue(edgesFunction)); + .with(new ApplyCoGroupFunction<>(edgesFunction)).name("GroupReduce on in-edges"); case OUT: return vertices.coGroup(edges).where(0).equalTo(0) - .with(new ApplyCoGroupFunction<>(edgesFunction)); + .with(new ApplyCoGroupFunction<>(edgesFunction)).name("GroupReduce on out-edges"); case ALL: - return vertices.coGroup(edges.flatMap(new EmitOneEdgePerNode())) - .where(0).equalTo(0).with(new ApplyCoGroupFunctionOnAllEdges<>(edgesFunction)); + return vertices.coGroup(edges.flatMap(new EmitOneEdgePerNode()) + .name("Emit edge")) + .where(0).equalTo(0).with(new ApplyCoGroupFunctionOnAllEdges<>(edgesFunction)) + .name("GroupReduce on in- and out-edges"); default: throw new IllegalArgumentException("Illegal edge direction"); } @@ -979,13 +1032,17 @@ public DataSet groupReduceOnEdges(EdgesFunctionWithVertexValue(edgesFunction)).returns(typeInfo); + .with(new ApplyCoGroupFunction<>(edgesFunction)) + .name("GroupReduce on in-edges").returns(typeInfo); case OUT: return vertices.coGroup(edges).where(0).equalTo(0) - .with(new ApplyCoGroupFunction<>(edgesFunction)).returns(typeInfo); + .with(new ApplyCoGroupFunction<>(edgesFunction)) + .name("GroupReduce on out-edges").returns(typeInfo); case ALL: - return vertices.coGroup(edges.flatMap(new EmitOneEdgePerNode())) - .where(0).equalTo(0).with(new ApplyCoGroupFunctionOnAllEdges<>(edgesFunction)).returns(typeInfo); + return vertices.coGroup(edges.flatMap(new EmitOneEdgePerNode()) + .name("Emit edge")) + .where(0).equalTo(0).with(new ApplyCoGroupFunctionOnAllEdges<>(edgesFunction)) + .name("GroupReduce on in- and out-edges").returns(typeInfo); default: throw new IllegalArgumentException("Illegal edge direction"); } @@ -1011,15 +1068,18 @@ public DataSet groupReduceOnEdges(EdgesFunction edgesFunction, switch (direction) { case IN: return edges.map(new ProjectVertexIdMap(1)) - .withForwardedFields("f1->f0") - .groupBy(0).reduceGroup(new ApplyGroupReduceFunction<>(edgesFunction)); + .withForwardedFields("f1->f0").name("Vertex ID") + .groupBy(0).reduceGroup(new ApplyGroupReduceFunction<>(edgesFunction)) + .name("GroupReduce on in-edges"); case OUT: return edges.map(new ProjectVertexIdMap(0)) - .withForwardedFields("f0") - .groupBy(0).reduceGroup(new ApplyGroupReduceFunction<>(edgesFunction)); + .withForwardedFields("f0").name("Vertex ID") + .groupBy(0).reduceGroup(new ApplyGroupReduceFunction<>(edgesFunction)) + .name("GroupReduce on out-edges"); case ALL: - return edges.flatMap(new EmitOneEdgePerNode()) - .groupBy(0).reduceGroup(new ApplyGroupReduceFunction<>(edgesFunction)); + return edges.flatMap(new EmitOneEdgePerNode()).name("Emit edge") + .groupBy(0).reduceGroup(new ApplyGroupReduceFunction<>(edgesFunction)) + .name("GroupReduce on in- and out-edges"); default: throw new IllegalArgumentException("Illegal edge direction"); } @@ -1045,16 +1105,19 @@ public DataSet groupReduceOnEdges(EdgesFunction edgesFunction, switch (direction) { case IN: - return edges.map(new ProjectVertexIdMap(1)) + return edges.map(new ProjectVertexIdMap(1)).name("Vertex ID") .withForwardedFields("f1->f0") - .groupBy(0).reduceGroup(new ApplyGroupReduceFunction<>(edgesFunction)).returns(typeInfo); + .groupBy(0).reduceGroup(new ApplyGroupReduceFunction<>(edgesFunction)) + .name("GroupReduce on in-edges").returns(typeInfo); case OUT: - return edges.map(new ProjectVertexIdMap(0)) + return edges.map(new ProjectVertexIdMap(0)).name("Vertex ID") .withForwardedFields("f0") - .groupBy(0).reduceGroup(new ApplyGroupReduceFunction<>(edgesFunction)).returns(typeInfo); + .groupBy(0).reduceGroup(new ApplyGroupReduceFunction<>(edgesFunction)) + .name("GroupReduce on out-edges").returns(typeInfo); case ALL: - return edges.flatMap(new EmitOneEdgePerNode()) - .groupBy(0).reduceGroup(new ApplyGroupReduceFunction<>(edgesFunction)).returns(typeInfo); + return edges.flatMap(new EmitOneEdgePerNode()).name("Emit edge") + .groupBy(0).reduceGroup(new ApplyGroupReduceFunction<>(edgesFunction)) + .name("GroupReduce on in- and out-edges").returns(typeInfo); default: throw new IllegalArgumentException("Illegal edge direction"); } @@ -1220,20 +1283,25 @@ public TypeInformation getProducedType() { @ForwardedFields("f0->f1; f1->f0; f2") private static final class ReverseEdgesMap - implements MapFunction, Edge> { + implements MapFunction, Edge> { + public Edge output = new Edge<>(); - public Edge map(Edge value) { - return new Edge<>(value.f1, value.f0, value.f2); + public Edge map(Edge edge) { + output.setFields(edge.f1, edge.f0, edge.f2); + return output; } } private static final class RegularAndReversedEdgesMap - implements FlatMapFunction, Edge> { + implements FlatMapFunction, Edge> { + public Edge output = new Edge<>(); @Override public void flatMap(Edge edge, Collector> out) throws Exception { - out.collect(new Edge<>(edge.f0, edge.f1, edge.f2)); - out.collect(new Edge<>(edge.f1, edge.f0, edge.f2)); + out.collect(edge); + + output.setFields(edge.f1, edge.f0, edge.f2); + out.collect(output); } } @@ -1244,7 +1312,7 @@ public void flatMap(Edge edge, Collector> out) throws Excepti * @throws UnsupportedOperationException */ public Graph reverse() throws UnsupportedOperationException { - DataSet> reversedEdges = edges.map(new ReverseEdgesMap()); + DataSet> reversedEdges = edges.map(new ReverseEdgesMap()).name("Reverse edges"); return new Graph<>(vertices, reversedEdges, this.context); } @@ -1266,7 +1334,7 @@ public long numberOfEdges() throws Exception { * @return The IDs of the vertices as DataSet */ public DataSet getVertexIds() { - return vertices.map(new ExtractVertexIDMapper()); + return vertices.map(new ExtractVertexIDMapper()).name("Vertex IDs"); } private static final class ExtractVertexIDMapper @@ -1281,7 +1349,7 @@ public K map(Vertex vertex) { * @return The IDs of the edges as DataSet */ public DataSet> getEdgeIds() { - return edges.map(new ExtractEdgeIDsMapper()); + return edges.map(new ExtractEdgeIDsMapper()).name("Edge IDs"); } @ForwardedFields("f0; f1") @@ -1317,7 +1385,7 @@ public Graph addVertex(final Vertex vertex) { public Graph addVertices(List> verticesToAdd) { // Add the vertices DataSet> newVertices = this.vertices.coGroup(this.context.fromCollection(verticesToAdd)) - .where(0).equalTo(0).with(new VerticesUnionCoGroup()); + .where(0).equalTo(0).with(new VerticesUnionCoGroup()).name("Add vertices"); return new Graph<>(newVertices, this.edges, this.context); } @@ -1371,9 +1439,9 @@ public Graph addEdges(List> newEdges) { DataSet> validNewEdges = this.getVertices().join(newEdgesDataSet) .where(0).equalTo(0) - .with(new JoinVerticesWithEdgesOnSrc()) + .with(new JoinVerticesWithEdgesOnSrc()).name("Join with source") .join(this.getVertices()).where(1).equalTo(0) - .with(new JoinWithVerticesOnTrg()); + .with(new JoinWithVerticesOnTrg()).name("Join with target"); return Graph.fromDataSet(this.vertices, this.edges.union(validNewEdges), this.context); } @@ -1435,14 +1503,14 @@ public Graph removeVertices(List> verticesToBeRemoved) private Graph removeVertices(DataSet> verticesToBeRemoved) { DataSet> newVertices = getVertices().coGroup(verticesToBeRemoved).where(0).equalTo(0) - .with(new VerticesRemovalCoGroup()); + .with(new VerticesRemovalCoGroup()).name("Remove vertices"); DataSet > newEdges = newVertices.join(getEdges()).where(0).equalTo(0) // if the edge source was removed, the edge will also be removed - .with(new ProjectEdgeToBeRemoved()) + .with(new ProjectEdgeToBeRemoved()).name("Edges to be removed") // if the edge target was removed, the edge will also be removed .join(newVertices).where(1).equalTo(0) - .with(new ProjectEdge()); + .with(new ProjectEdge()).name("Remove edges"); return new Graph<>(newVertices, newEdges, context); } @@ -1466,8 +1534,6 @@ public void coGroup(Iterable> vertex, Iterable> vert } } - - @ForwardedFieldsSecond("f0; f1; f2") private static final class ProjectEdgeToBeRemoved implements JoinFunction, Edge, Edge> { @Override @@ -1484,7 +1550,7 @@ public Edge join(Vertex vertex, Edge edge) throws Exception * the removed edges */ public Graph removeEdge(Edge edge) { - DataSet> newEdges = getEdges().filter(new EdgeRemovalEdgeFilter<>(edge)); + DataSet> newEdges = getEdges().filter(new EdgeRemovalEdgeFilter<>(edge)).name("Remove edge"); return new Graph<>(this.vertices, newEdges, this.context); } @@ -1512,7 +1578,7 @@ public boolean filter(Edge edge) { public Graph removeEdges(List> edgesToBeRemoved) { DataSet> newEdges = getEdges().coGroup(this.context.fromCollection(edgesToBeRemoved)) - .where(0,1).equalTo(0,1).with(new EdgeRemovalCoGroup()); + .where(0,1).equalTo(0,1).with(new EdgeRemovalCoGroup()).name("Remove edges"); return new Graph<>(this.vertices, newEdges, context); } @@ -1538,8 +1604,18 @@ public void coGroup(Iterable> edge, Iterable> edgeToBeRe * @return a new graph */ public Graph union(Graph graph) { - DataSet> unionedVertices = graph.getVertices().union(this.getVertices()).distinct(); - DataSet> unionedEdges = graph.getEdges().union(this.getEdges()); + DataSet> unionedVertices = graph + .getVertices() + .union(this.getVertices()) + .name("Vertices") + .distinct() + .name("Vertices"); + + DataSet> unionedEdges = graph + .getEdges() + .union(this.getEdges()) + .name("Edges"); + return new Graph<>(unionedVertices, unionedEdges, this.context); } @@ -1603,8 +1679,9 @@ private DataSet> getDistinctEdgeIntersection(DataSet> ed public Edge join(Edge first, Edge second) throws Exception { return first; } - }).withForwardedFieldsFirst("*") - .distinct(); + }).withForwardedFieldsFirst("*").name("Intersect edges") + .distinct() + .name("Edges"); } /** @@ -1619,7 +1696,8 @@ private DataSet> getPairwiseEdgeIntersection(DataSet> ed .coGroup(edges) .where(0, 1, 2) .equalTo(0, 1, 2) - .with(new MatchingEdgeReducer()); + .with(new MatchingEdgeReducer()) + .name("Intersect edges"); } /** @@ -1827,27 +1905,27 @@ public DataSet groupReduceOnNeighbors(NeighborsFunctionWithVertexValue pairs DataSet, Vertex>> edgesWithSources = edges - .join(this.vertices).where(0).equalTo(0); + .join(this.vertices).where(0).equalTo(0).name("Edge with source vertex"); return vertices.coGroup(edgesWithSources) .where(0).equalTo("f0.f1") - .with(new ApplyNeighborCoGroupFunction<>(neighborsFunction)); + .with(new ApplyNeighborCoGroupFunction<>(neighborsFunction)).name("Neighbors function"); case OUT: // create pairs DataSet, Vertex>> edgesWithTargets = edges - .join(this.vertices).where(1).equalTo(0); + .join(this.vertices).where(1).equalTo(0).name("Edge with target vertex"); return vertices.coGroup(edgesWithTargets) .where(0).equalTo("f0.f0") - .with(new ApplyNeighborCoGroupFunction<>(neighborsFunction)); + .with(new ApplyNeighborCoGroupFunction<>(neighborsFunction)).name("Neighbors function"); case ALL: // create pairs DataSet, Vertex>> edgesWithNeighbors = edges - .flatMap(new EmitOneEdgeWithNeighborPerNode()) + .flatMap(new EmitOneEdgeWithNeighborPerNode()).name("Forward and reverse edges") .join(this.vertices).where(1).equalTo(0) - .with(new ProjectEdgeWithNeighbor()); + .with(new ProjectEdgeWithNeighbor()).name("Edge with vertex"); return vertices.coGroup(edgesWithNeighbors) .where(0).equalTo(0) - .with(new ApplyCoGroupFunctionOnAllNeighbors<>(neighborsFunction)); + .with(new ApplyCoGroupFunctionOnAllNeighbors<>(neighborsFunction)).name("Neighbors function"); default: throw new IllegalArgumentException("Illegal edge direction"); } @@ -1875,27 +1953,30 @@ public DataSet groupReduceOnNeighbors(NeighborsFunctionWithVertexValue pairs DataSet, Vertex>> edgesWithSources = edges - .join(this.vertices).where(0).equalTo(0); + .join(this.vertices).where(0).equalTo(0).name("Edge with source vertex"); return vertices.coGroup(edgesWithSources) .where(0).equalTo("f0.f1") - .with(new ApplyNeighborCoGroupFunction<>(neighborsFunction)).returns(typeInfo); + .with(new ApplyNeighborCoGroupFunction<>(neighborsFunction)) + .name("Neighbors function").returns(typeInfo); case OUT: // create pairs DataSet, Vertex>> edgesWithTargets = edges - .join(this.vertices).where(1).equalTo(0); + .join(this.vertices).where(1).equalTo(0).name("Edge with target vertex"); return vertices.coGroup(edgesWithTargets) .where(0).equalTo("f0.f0") - .with(new ApplyNeighborCoGroupFunction<>(neighborsFunction)).returns(typeInfo); + .with(new ApplyNeighborCoGroupFunction<>(neighborsFunction)) + .name("Neighbors function").returns(typeInfo); case ALL: // create pairs DataSet, Vertex>> edgesWithNeighbors = edges - .flatMap(new EmitOneEdgeWithNeighborPerNode()) + .flatMap(new EmitOneEdgeWithNeighborPerNode()).name("Forward and reverse edges") .join(this.vertices).where(1).equalTo(0) - .with(new ProjectEdgeWithNeighbor()); + .with(new ProjectEdgeWithNeighbor()).name("Edge with vertex"); return vertices.coGroup(edgesWithNeighbors) .where(0).equalTo(0) - .with(new ApplyCoGroupFunctionOnAllNeighbors<>(neighborsFunction)).returns(typeInfo); + .with(new ApplyCoGroupFunctionOnAllNeighbors<>(neighborsFunction)) + .name("Neighbors function").returns(typeInfo); default: throw new IllegalArgumentException("Illegal edge direction"); } @@ -1924,26 +2005,26 @@ public DataSet groupReduceOnNeighbors(NeighborsFunction nei DataSet, Vertex>> edgesWithSources = edges .join(this.vertices).where(0).equalTo(0) .with(new ProjectVertexIdJoin(1)) - .withForwardedFieldsFirst("f1->f0"); + .withForwardedFieldsFirst("f1->f0").name("Edge with source vertex ID"); return edgesWithSources.groupBy(0).reduceGroup( - new ApplyNeighborGroupReduceFunction<>(neighborsFunction)); + new ApplyNeighborGroupReduceFunction<>(neighborsFunction)).name("Neighbors function"); case OUT: // create pairs DataSet, Vertex>> edgesWithTargets = edges .join(this.vertices).where(1).equalTo(0) .with(new ProjectVertexIdJoin(0)) - .withForwardedFieldsFirst("f0"); + .withForwardedFieldsFirst("f0").name("Edge with target vertex ID"); return edgesWithTargets.groupBy(0).reduceGroup( - new ApplyNeighborGroupReduceFunction<>(neighborsFunction)); + new ApplyNeighborGroupReduceFunction<>(neighborsFunction)).name("Neighbors function"); case ALL: // create pairs DataSet, Vertex>> edgesWithNeighbors = edges - .flatMap(new EmitOneEdgeWithNeighborPerNode()) + .flatMap(new EmitOneEdgeWithNeighborPerNode()).name("Forward and reverse edges") .join(this.vertices).where(1).equalTo(0) - .with(new ProjectEdgeWithNeighbor()); + .with(new ProjectEdgeWithNeighbor()).name("Edge with vertex ID"); return edgesWithNeighbors.groupBy(0).reduceGroup( - new ApplyNeighborGroupReduceFunction<>(neighborsFunction)); + new ApplyNeighborGroupReduceFunction<>(neighborsFunction)).name("Neighbors function"); default: throw new IllegalArgumentException("Illegal edge direction"); } @@ -1973,26 +2054,29 @@ public DataSet groupReduceOnNeighbors(NeighborsFunction nei DataSet, Vertex>> edgesWithSources = edges .join(this.vertices).where(0).equalTo(0) .with(new ProjectVertexIdJoin(1)) - .withForwardedFieldsFirst("f1->f0"); + .withForwardedFieldsFirst("f1->f0").name("Edge with source vertex ID"); return edgesWithSources.groupBy(0).reduceGroup( - new ApplyNeighborGroupReduceFunction<>(neighborsFunction)).returns(typeInfo); + new ApplyNeighborGroupReduceFunction<>(neighborsFunction)) + .name("Neighbors function").returns(typeInfo); case OUT: // create pairs DataSet, Vertex>> edgesWithTargets = edges .join(this.vertices).where(1).equalTo(0) .with(new ProjectVertexIdJoin(0)) - .withForwardedFieldsFirst("f0"); + .withForwardedFieldsFirst("f0").name("Edge with target vertex ID"); return edgesWithTargets.groupBy(0).reduceGroup( - new ApplyNeighborGroupReduceFunction<>(neighborsFunction)).returns(typeInfo); + new ApplyNeighborGroupReduceFunction<>(neighborsFunction)) + .name("Neighbors function").returns(typeInfo); case ALL: // create pairs DataSet, Vertex>> edgesWithNeighbors = edges .flatMap(new EmitOneEdgeWithNeighborPerNode()) .join(this.vertices).where(1).equalTo(0) - .with(new ProjectEdgeWithNeighbor()); + .with(new ProjectEdgeWithNeighbor()).name("Edge with vertex ID"); return edgesWithNeighbors.groupBy(0).reduceGroup( - new ApplyNeighborGroupReduceFunction<>(neighborsFunction)).returns(typeInfo); + new ApplyNeighborGroupReduceFunction<>(neighborsFunction)) + .name("Neighbors function").returns(typeInfo); default: throw new IllegalArgumentException("Illegal edge direction"); } @@ -2170,26 +2254,26 @@ public DataSet> reduceOnNeighbors(ReduceNeighborsFunction redu final DataSet> verticesWithSourceNeighborValues = edges .join(this.vertices).where(0).equalTo(0) .with(new ProjectVertexWithNeighborValueJoin(1)) - .withForwardedFieldsFirst("f1->f0"); + .withForwardedFieldsFirst("f1->f0").name("Vertex with in-neighbor value"); return verticesWithSourceNeighborValues.groupBy(0).reduce(new ApplyNeighborReduceFunction( - reduceNeighborsFunction)); + reduceNeighborsFunction)).name("Neighbors function"); case OUT: // create pairs DataSet> verticesWithTargetNeighborValues = edges .join(this.vertices).where(1).equalTo(0) .with(new ProjectVertexWithNeighborValueJoin(0)) - .withForwardedFieldsFirst("f0"); + .withForwardedFieldsFirst("f0").name("Vertex with out-neighbor value"); return verticesWithTargetNeighborValues.groupBy(0).reduce(new ApplyNeighborReduceFunction( - reduceNeighborsFunction)); + reduceNeighborsFunction)).name("Neighbors function"); case ALL: // create pairs DataSet> verticesWithNeighborValues = edges .flatMap(new EmitOneEdgeWithNeighborPerNode()) .join(this.vertices).where(1).equalTo(0) - .with(new ProjectNeighborValue()); + .with(new ProjectNeighborValue()).name("Vertex with neighbor value"); return verticesWithNeighborValues.groupBy(0).reduce(new ApplyNeighborReduceFunction( - reduceNeighborsFunction)); + reduceNeighborsFunction)).name("Neighbors function"); default: throw new IllegalArgumentException("Illegal edge direction"); } @@ -2231,15 +2315,21 @@ public DataSet> reduceOnEdges(ReduceEdgesFunction reduceEdgesF case IN: return edges.map(new ProjectVertexWithEdgeValueMap(1)) .withForwardedFields("f1->f0") - .groupBy(0).reduce(new ApplyReduceFunction(reduceEdgesFunction)); + .name("Vertex with in-edges") + .groupBy(0).reduce(new ApplyReduceFunction(reduceEdgesFunction)) + .name("Reduce on edges"); case OUT: return edges.map(new ProjectVertexWithEdgeValueMap(0)) .withForwardedFields("f0->f0") - .groupBy(0).reduce(new ApplyReduceFunction(reduceEdgesFunction)); + .name("Vertex with out-edges") + .groupBy(0).reduce(new ApplyReduceFunction(reduceEdgesFunction)) + .name("Reduce on edges"); case ALL: return edges.flatMap(new EmitOneVertexWithEdgeValuePerNode()) .withForwardedFields("f2->f1") - .groupBy(0).reduce(new ApplyReduceFunction(reduceEdgesFunction)); + .name("Vertex with all edges") + .groupBy(0).reduce(new ApplyReduceFunction(reduceEdgesFunction)) + .name("Reduce on edges"); default: throw new IllegalArgumentException("Illegal edge direction"); } diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/GraphCsvReader.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/GraphCsvReader.java index 4859e12f5f689..6547f9a8221c3 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/GraphCsvReader.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/GraphCsvReader.java @@ -20,12 +20,14 @@ import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.io.CsvReader; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.core.fs.Path; +import org.apache.flink.graph.utils.Tuple2ToEdgeMap; +import org.apache.flink.graph.utils.Tuple2ToVertexMap; import org.apache.flink.types.NullValue; -import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.util.Preconditions; /** @@ -44,9 +46,6 @@ public class GraphCsvReader { protected CsvReader edgeReader; protected CsvReader vertexReader; protected MapFunction mapper; - protected Class vertexKey; - protected Class vertexValue; - protected Class edgeValue; //-------------------------------------------------------------------------------------------------------------------- public GraphCsvReader(Path vertexPath, Path edgePath, ExecutionEnvironment context) { @@ -104,17 +103,18 @@ public GraphCsvReader(String edgePath, final MapFunction mapper, public Graph types(Class vertexKey, Class vertexValue, Class edgeValue) { - DataSet> vertices; - if (edgeReader == null) { - throw new RuntimeException("The edges input file cannot be null!"); + throw new RuntimeException("The edge input file cannot be null!"); } DataSet> edges = edgeReader.types(vertexKey, vertexKey, edgeValue); // the vertex value can be provided by an input file or a user-defined mapper if (vertexReader != null) { - vertices = vertexReader.types(vertexKey, vertexValue); + DataSet> vertices = vertexReader + .types(vertexKey, vertexValue) + .name(GraphCsvReader.class.getName()); + return Graph.fromTupleDataSet(vertices, edges, executionContext); } else if (mapper != null) { @@ -135,10 +135,12 @@ else if (mapper != null) { public Graph edgeTypes(Class vertexKey, Class edgeValue) { if (edgeReader == null) { - throw new RuntimeException("The edges input file cannot be null!"); + throw new RuntimeException("The edge input file cannot be null!"); } - DataSet> edges = edgeReader.types(vertexKey, vertexKey, edgeValue); + DataSet> edges = edgeReader + .types(vertexKey, vertexKey, edgeValue) + .name(GraphCsvReader.class.getName()); return Graph.fromTupleDataSet(edges, executionContext); } @@ -151,20 +153,16 @@ public Graph edgeTypes(Class vertexKey, Class e public Graph keyType(Class vertexKey) { if (edgeReader == null) { - throw new RuntimeException("The edges input file cannot be null!"); + throw new RuntimeException("The edge input file cannot be null!"); } - DataSet> edges = edgeReader.types(vertexKey, vertexKey) - .map(new MapFunction, Tuple3>() { - - private static final long serialVersionUID = -2981792951286476970L; + DataSet> edges = edgeReader + .types(vertexKey, vertexKey) + .name(GraphCsvReader.class.getName()) + .map(new Tuple2ToEdgeMap()) + .name("Type conversion"); - public Tuple3 map(Tuple2 edge) { - return new Tuple3<>(edge.f0, edge.f1, NullValue.getInstance()); - } - }).withForwardedFields("f0;f1"); - - return Graph.fromTupleDataSet(edges, executionContext); + return Graph.fromDataSet(edges, executionContext); } /** @@ -178,28 +176,29 @@ public Tuple3 map(Tuple2 edge) { */ @SuppressWarnings({ "serial", "unchecked" }) public Graph vertexTypes(Class vertexKey, Class vertexValue) { - - DataSet> vertices; if (edgeReader == null) { - throw new RuntimeException("The edges input file cannot be null!"); + throw new RuntimeException("The edge input file cannot be null!"); } - DataSet> edges = edgeReader.types(vertexKey, vertexKey) - .map(new MapFunction, Tuple3>() { - - public Tuple3 map(Tuple2 input) { - return new Tuple3<>(input.f0, input.f1, NullValue.getInstance()); - } - }).withForwardedFields("f0;f1"); + DataSet> edges = edgeReader + .types(vertexKey, vertexKey) + .name(GraphCsvReader.class.getName()) + .map(new Tuple2ToEdgeMap()) + .name("To Edge"); // the vertex value can be provided by an input file or a user-defined mapper if (vertexReader != null) { - vertices = vertexReader.types(vertexKey, vertexValue); - return Graph.fromTupleDataSet(vertices, edges, executionContext); + DataSet> vertices = vertexReader + .types(vertexKey, vertexValue) + .name(GraphCsvReader.class.getName()) + .map(new Tuple2ToVertexMap()) + .name("Type conversion"); + + return Graph.fromDataSet(vertices, edges, executionContext); } else if (mapper != null) { - return Graph.fromTupleDataSet(edges, (MapFunction) mapper, executionContext); + return Graph.fromDataSet(edges, (MapFunction) mapper, executionContext); } else { throw new RuntimeException("Vertex values have to be specified through a vertices input file" diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/Tuple2ToEdgeMap.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/Tuple2ToEdgeMap.java new file mode 100644 index 0000000000000..5eb828710d3cd --- /dev/null +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/Tuple2ToEdgeMap.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph.utils; + +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.graph.Edge; +import org.apache.flink.types.NullValue; + +/** + * Create an Edge from a Tuple2. + * + * The new edge's value is set to {@link NullValue}. + * + * @param edge ID type + */ +@ForwardedFields("f0; f1") +public class Tuple2ToEdgeMap implements MapFunction, Edge> { + + private static final long serialVersionUID = 1L; + + private Edge edge = new Edge<>(null, null, NullValue.getInstance()); + + @Override + public Edge map(Tuple2 tuple) { + edge.f0 = tuple.f0; + edge.f1 = tuple.f1; + return edge; + } + +}