diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java index faf4d0b7ba149..39187e4dfa2be 100644 --- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java +++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java @@ -18,11 +18,11 @@ package flink.graphs; + import org.apache.flink.api.common.functions.*; import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.functions.FunctionAnnotation.ConstantFields; import org.apache.flink.api.java.functions.FunctionAnnotation.ConstantFieldsFirst; -import org.apache.flink.api.java.aggregation.Aggregations; import org.apache.flink.api.java.operators.DeltaIteration; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.tuple.Tuple3; @@ -30,52 +30,44 @@ import org.apache.flink.api.java.typeutils.TupleTypeInfo; import org.apache.flink.api.java.typeutils.TypeExtractor; import org.apache.flink.api.java.io.CsvReader; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.spargel.java.VertexCentricIteration; -import org.apache.flink.util.Collector; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.util.Collector; import java.io.Serializable; -import java.util.Arrays; import java.util.Collection; -import java.util.List; @SuppressWarnings("serial") public class Graph & Serializable, VV extends Serializable, - EV extends Serializable> implements Serializable { - - private final ExecutionEnvironment context; + EV extends Serializable> implements Serializable{ private final DataSet> vertices; private final DataSet> edges; - /** a graph is directed by default */ - private boolean isUndirected = false; - - private static TypeInformation vertexKeyType; + private boolean isUndirected; + + private static TypeInformation keyType; private static TypeInformation vertexValueType; + private static TypeInformation edgeValueType; - public Graph(DataSet> vertices, DataSet> edges, ExecutionEnvironment context) { - this.vertices = vertices; - this.edges = edges; - this.context = context; - Graph.vertexKeyType = ((TupleTypeInfo) vertices.getType()).getTypeAt(0); - Graph.vertexValueType = ((TupleTypeInfo) vertices.getType()).getTypeAt(1); + public Graph(DataSet> vertices, DataSet> edges) { + + /** a graph is directed by default */ + this(vertices, edges, false); } - public Graph(DataSet> vertices, DataSet> edges, ExecutionEnvironment context, - boolean undirected) { + public Graph(DataSet> vertices, DataSet> edges, boolean undirected) { this.vertices = vertices; this.edges = edges; - this.context = context; this.isUndirected = undirected; - Graph.vertexKeyType = ((TupleTypeInfo) vertices.getType()).getTypeAt(0); + + Graph.keyType = ((TupleTypeInfo) vertices.getType()).getTypeAt(0); Graph.vertexValueType = ((TupleTypeInfo) vertices.getType()).getTypeAt(1); + Graph.edgeValueType = ((TupleTypeInfo) edges.getType()).getTypeAt(2); } public DataSet> getVertices() { @@ -114,7 +106,39 @@ public TypeInformation> getProducedType() { TypeInformation newVertexValueType = TypeExtractor.getMapReturnTypes(innerMapper, (TypeInformation)vertexValueType); - return new TupleTypeInfo>(vertexKeyType, newVertexValueType); + return new TupleTypeInfo>(keyType, newVertexValueType); + } + } + + /** + * Apply a function to the attribute of each edge in the graph. + * @param mapper + * @return + */ + public DataSet> mapEdges(final MapFunction mapper) { + return edges.map(new ApplyMapperToEdgeWithType(mapper)); + } + + private static final class ApplyMapperToEdgeWithType implements MapFunction + , Tuple3>, ResultTypeQueryable> { + + private MapFunction innerMapper; + + public ApplyMapperToEdgeWithType(MapFunction theMapper) { + this.innerMapper = theMapper; + } + + public Tuple3 map(Tuple3 value) throws Exception { + return new Tuple3(value.f0, value.f1, innerMapper.map(value.f2)); + } + + @Override + public TypeInformation> getProducedType() { + @SuppressWarnings("unchecked") + TypeInformation newEdgeValueType = TypeExtractor.getMapReturnTypes(innerMapper, + (TypeInformation)edgeValueType); + + return new TupleTypeInfo>(keyType, keyType, newEdgeValueType); } } @@ -140,7 +164,7 @@ public Graph subgraph(FilterFunction vertexFilter, FilterFunction DataSet> filteredEdges = remainingEdges.filter( new ApplyEdgeFilter(edgeFilter)); - return new Graph(filteredVertices, filteredEdges, this.context); + return new Graph(filteredVertices, filteredEdges); } @ConstantFieldsFirst("0->0;1->1;2->2") @@ -222,7 +246,7 @@ public Graph pga(CoGroupFunction, Tuple3> result = iteration.closeWith(a, a); - return new Graph<>(result, this.edges, this.context); + return new Graph<>(result, this.edges); } /** @@ -237,7 +261,7 @@ public Graph getUndirected() throws UnsupportedOperationException { else { DataSet> undirectedEdges = edges.union(edges.map(new ReverseEdgesMap())); - return new Graph(vertices, undirectedEdges, this.context, true); + return new Graph(vertices, undirectedEdges, true); } } @@ -261,14 +285,14 @@ public Graph reverse() throws UnsupportedOperationException { } else { DataSet> undirectedEdges = edges.map(new ReverseEdgesMap()); - return new Graph(vertices, (DataSet>) undirectedEdges, this.context, true); + return new Graph(vertices, (DataSet>) undirectedEdges, true); } } public static & Serializable, VV extends Serializable, EV extends Serializable> Graph - create(DataSet> vertices, DataSet> edges, ExecutionEnvironment context) { - return new Graph(vertices, edges, context); + create(DataSet> vertices, DataSet> edges) { + return new Graph(vertices, edges); } /** @@ -336,8 +360,7 @@ public Tuple3 map(Tuple3 value) throws Exception { public static & Serializable, VV extends Serializable, EV extends Serializable> Graph readGraphFromCsvFile(ExecutionEnvironment env, String Tuple2Filepath, char Tuple2Delimiter, String edgeFilepath, char edgeDelimiter, - Class Tuple2IdClass, Class Tuple2ValueClass, Class edgeValueClass, - ExecutionEnvironment context) { + Class Tuple2IdClass, Class Tuple2ValueClass, Class edgeValueClass) { CsvReader Tuple2Reader = new CsvReader(Tuple2Filepath, env); DataSet> vertices = Tuple2Reader.fieldDelimiter(Tuple2Delimiter) @@ -359,215 +382,23 @@ public Tuple3 map(Tuple3 value) throws Exception { } }); - return Graph.create(vertices, edges, context); - } - - /** - * @return Singleton DataSet containing the vertex count - */ - public DataSet numberOfVertices () { - return GraphUtils.count(vertices); - } - - /** - * - * @return Singleton DataSet containing the edge count - */ - public DataSet numberOfEdges () { - return GraphUtils.count(edges); - } - - /** - * - * @return The IDs of the vertices as DataSet - */ - public DataSet getVertexIds () { - return vertices.map(new ExtractVertexIDMapper()); - } - - private static final class ExtractVertexIDMapper implements MapFunction, K> { - @Override - public K map(Tuple2 vertex) throws Exception { - return vertex.f0; - } - } - - public DataSet> getEdgeIds () { - return edges.map(new ExtractEdgeIDsMapper()); - } - - private static final class ExtractEdgeIDsMapper implements MapFunction, Tuple2> { - @Override - public Tuple2 map(Tuple3 edge) throws Exception { - return new Tuple2(edge.f0, edge.f1); - } - } - - /** - * Checks the weak connectivity of a graph. - * @param maxIterations the maximum number of iterations for the inner delta iteration - * @return true if the graph is weakly connected. - */ - public DataSet isWeaklyConnected (int maxIterations) { - Graph graph; - - if (!(this.isUndirected)) { - // first, convert to an undirected graph - graph = this.getUndirected(); - } - else { - graph = this; - } - - DataSet vertexIds = graph.getVertexIds(); - DataSet> verticesWithInitialIds = vertexIds - .map(new DuplicateVertexIDMapper()); - - DataSet> edgeIds = graph.getEdgeIds(); - - DeltaIteration, Tuple2> iteration = verticesWithInitialIds - .iterateDelta(verticesWithInitialIds, maxIterations, 0); - - DataSet> changes = iteration.getWorkset() - .join(edgeIds).where(0).equalTo(0) - .with(new FindNeighborsJoin()) - .groupBy(0) - .aggregate(Aggregations.MIN, 1) - .join(iteration.getSolutionSet()).where(0).equalTo(0) - .with(new VertexWithNewComponentJoin()); - - DataSet> components = iteration.closeWith(changes, changes); - DataSet result = GraphUtils.count(components.groupBy(1).reduceGroup( - new EmitFirstReducer())).map(new CheckIfOneComponentMapper()); - return result; - } - - private static final class DuplicateVertexIDMapper implements MapFunction> { - @Override - public Tuple2 map(K k) { - return new Tuple2(k, k); - } - } - - private static final class FindNeighborsJoin implements JoinFunction, Tuple2, - Tuple2> { - @Override - public Tuple2 join(Tuple2 vertexWithComponent, Tuple2 edge) { - return new Tuple2(edge.f1, vertexWithComponent.f1); - } + return Graph.create(vertices, edges); } - private static final class VertexWithNewComponentJoin> - implements FlatJoinFunction, Tuple2, Tuple2> { - @Override - public void join(Tuple2 candidate, Tuple2 old, Collector> out) { - if (candidate.f1.compareTo(old.f1) < 0) { - out.collect(candidate); - } - } - } - - private static final class EmitFirstReducer implements - GroupReduceFunction, Tuple2> { - public void reduce(Iterable> values, Collector> out) { - out.collect(values.iterator().next()); - } - } - - private static final class CheckIfOneComponentMapper implements MapFunction { - @Override - public Boolean map(Integer n) { - return (n == 1); - } + /** + * Creates a graph from the given vertex and edge collections + * @param env + * @param v the collection of vertices + * @param e the collection of edges + * @return a new graph formed from the set of edges and vertices + */ + public static & Serializable, VV extends Serializable, + EV extends Serializable> Graph fromCollection(ExecutionEnvironment env, Collection> v, + Collection> e) throws Exception { + DataSet> vertices = env.fromCollection(v); + DataSet> edges = env.fromCollection(e); + + return Graph.create(vertices, edges); } - - public Graph fromCollection (Collection> vertices, Collection> edges) { - - DataSet> v = context.fromCollection(vertices); - DataSet> e = context.fromCollection(edges); - - return new Graph(v, e, context); - } - - //TODO kostas add functionality - public DataSet> fromCollection (Collection> vertices) { - return null; - } - - public Graph addVertex (Tuple2 vertex, List> edges) { - Graph newVertex = this.fromCollection(Arrays.asList(vertex), edges); - return this.union(newVertex); - } - public Graph removeVertex (Tuple2 vertex) { - - DataSet> vertexToRemove = context.fromCollection(Arrays.asList(vertex)); - - DataSet> newVertices = getVertices().filter( - new RemoveVertexFilter()).withBroadcastSet( - vertexToRemove, "vertexToRemove"); - - DataSet> newEdges = getEdges().filter( - new RemoveEdgeFilter()).withBroadcastSet( - vertexToRemove, "vertexToRemove"); - - return new Graph(newVertices, newEdges, this.context); - } - - private static final class RemoveVertexFilter extends RichFilterFunction> { - - private Tuple2 vertexToRemove; - - @SuppressWarnings("unchecked") - @Override - public void open(Configuration parameters) throws Exception { - super.open(parameters); - this.vertexToRemove = (Tuple2) getRuntimeContext().getBroadcastVariable("vertexToRemove").get(0); - } - - @Override - public boolean filter(Tuple2 vertex) throws Exception { - return !vertex.f0.equals(vertexToRemove.f0); - } - } - - private static final class RemoveEdgeFilter extends RichFilterFunction> { - - private Tuple2 vertexToRemove; - - @SuppressWarnings("unchecked") - @Override - public void open(Configuration parameters) throws Exception { - super.open(parameters); - this.vertexToRemove = (Tuple2) getRuntimeContext().getBroadcastVariable("vertexToRemove").get(0); - } - - @Override - public boolean filter(Tuple3 edge) throws Exception { - - if (edge.f0.equals(vertexToRemove.f0)) { - return false; - } - if (edge.f1.equals(vertexToRemove.f0)) { - return false; - } - return true; - } - } - - public Graph addEdge (Tuple3 edge, Tuple2 source, Tuple2 target) { - Graph newEdges = this.fromCollection(Arrays.asList(source, target), Arrays.asList(edge)); - return this.union(newEdges); - } - - public Graph union (Graph graph) { - DataSet> unionedVertices = graph.getVertices().union(this.getVertices()); - DataSet> unionedEdges = graph.getEdges().union(this.getEdges()); - return new Graph(unionedVertices, unionedEdges, this.context); - } - - public Graph passMessages (VertexCentricIteration iteration) { - DataSet> newVertices = iteration.createResult(); - return new Graph(newVertices, edges, this.context); - } } diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestGraphOperations.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestGraphOperations.java index 971364baab910..f12bbdfd493c3 100644 --- a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestGraphOperations.java +++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestGraphOperations.java @@ -2,15 +2,11 @@ import java.io.FileNotFoundException; import java.io.IOException; -import java.util.ArrayList; import java.util.Collection; import java.util.LinkedList; -import java.util.List; import org.apache.flink.api.common.functions.FilterFunction; import org.apache.flink.api.java.ExecutionEnvironment; -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.configuration.Configuration; import org.apache.flink.test.util.JavaProgramTestBase; import org.junit.runner.RunWith; @@ -20,7 +16,7 @@ @RunWith(Parameterized.class) public class TestGraphOperations extends JavaProgramTestBase { - private static int NUM_PROGRAMS = 8; + private static int NUM_PROGRAMS = 5; private int curProgId = config.getInteger("ProgramId", -1); private String resultPath; @@ -145,172 +141,22 @@ public boolean filter(Long value) throws Exception { } case 5: { /* - * Test numberOfVertices() - */ - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - Graph graph = Graph.create(TestGraphUtils.getLongLongVertexData(env), - TestGraphUtils.getLongLongEdgeData(env), env); - graph.numberOfVertices().writeAsText(resultPath); - - env.execute(); - return "5"; - } - case 6: { - /* - * Test numberOfEdges() - */ - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - Graph graph = Graph.create(TestGraphUtils.getLongLongVertexData(env), - TestGraphUtils.getLongLongEdgeData(env), env); - graph.numberOfEdges().writeAsText(resultPath); - - env.execute(); - return "7"; - } - case 7: { - /* - * Test getVertexIds() - */ - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - Graph graph = Graph.create(TestGraphUtils.getLongLongVertexData(env), - TestGraphUtils.getLongLongEdgeData(env), env); - graph.getVertexIds().writeAsText(resultPath); - - env.execute(); - return "1\n2\n3\n4\n5\n"; - } - case 8: { - /* - * Test getEdgeIds() - */ - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - Graph graph = Graph.create(TestGraphUtils.getLongLongVertexData(env), - TestGraphUtils.getLongLongEdgeData(env), env); - graph.getEdgeIds().writeAsCsv(resultPath); - - env.execute(); - return "1,2\n" + "1,3\n" + - "2,3\n" + "3,4\n" + - "3,5\n" + "4,5\n" + - "5,1\n"; - } - case 9: { - /* - * Test addVertex() - */ - + * Test fromCollection: + */ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - Graph graph = Graph.create(TestGraphUtils.getLongLongVertexData(env), - TestGraphUtils.getLongLongEdgeData(env), env); - - List> edges = new ArrayList>(); - edges.add(new Tuple3(6L, 1L, 61L)); - - graph = graph.addVertex(new Tuple2(6L, 6L), edges); - - graph.getEdges().writeAsCsv(resultPath); - - env.execute(); - - return "1,2,12\n" + - "1,3,13\n" + - "2,3,23\n" + - "3,4,34\n" + - "3,5,35\n" + - "4,5,45\n" + - "5,1,51\n" + - "6,1,61\n"; - - } - case 10: { - /* - * Test removeVertex() - */ - - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - Graph graph = Graph.create(TestGraphUtils.getLongLongVertexData(env), - TestGraphUtils.getLongLongEdgeData(env), env); - - graph = graph.removeVertex(new Tuple2(5L, 5L)); - - graph.getEdges().writeAsCsv(resultPath); - - env.execute(); - - return "1,2,12\n" + - "1,3,13\n" + - "2,3,23\n" + - "3,4,34\n"; - } - case 11: { - /* - * Test addEdge() - */ - - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - Graph graph = Graph.create(TestGraphUtils.getLongLongVertexData(env), - TestGraphUtils.getLongLongEdgeData(env), env); + Graph graph = Graph.fromCollection(env, TestGraphUtils.getLongLongVertices(env), + TestGraphUtils.getLongLongEdges(env)); - List> vertices = new ArrayList>(); - List> edges = new ArrayList>(); - - vertices.add(new Tuple2(6L, 6L)); - edges.add(new Tuple3(6L, 1L, 61L)); - - graph = graph.union(graph.fromCollection(vertices, edges)); - graph.getEdges().writeAsCsv(resultPath); - env.execute(); - return "1,2,12\n" + "1,3,13\n" + "2,3,23\n" + "3,4,34\n" + "3,5,35\n" + "4,5,45\n" + - "5,1,51\n" + - "6,1,61\n"; - } - case 12: { - /* - * Test union() - */ - - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - Graph graph = Graph.create(TestGraphUtils.getLongLongVertexData(env), - TestGraphUtils.getLongLongEdgeData(env), env); - - - graph = graph.addEdge(new Tuple3(6L, 1L, 61L), - new Tuple2(6L, 6L), new Tuple2(1L, 1L)); - - graph.getEdges().writeAsCsv(resultPath); - - env.execute(); - - return "1,2,12\n" + - "1,3,13\n" + - "2,3,23\n" + - "3,4,34\n" + - "3,5,35\n" + - "4,5,45\n" + - "5,1,51\n" + - "6,1,61\n"; - } - case 13: { - /* - * Test passMessages() - */ - } + "5,1,51\n"; + } default: throw new IllegalArgumentException("Invalid program id"); } diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestGraphUtils.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestGraphUtils.java index a4d50c26efa93..a2da77f8e4644 100644 --- a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestGraphUtils.java +++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestGraphUtils.java @@ -36,18 +36,39 @@ public static final DataSet> getLongLongEdgeData( return env.fromCollection(edges); } - - public static final DataSet> getDisconnectedLongLongEdgeData( + + /** + * Function that produces an ArrayList of vertices + */ + public static final List> getLongLongVertices( + ExecutionEnvironment env) { + List> vertices = new ArrayList<>(); + vertices.add(new Tuple2(1L, 1L)); + vertices.add(new Tuple2(2L, 2L)); + vertices.add(new Tuple2(3L, 3L)); + vertices.add(new Tuple2(4L, 4L)); + vertices.add(new Tuple2(5L, 5L)); + + return vertices; + } + + /** + * Function that produces an ArrayList of edges + */ + public static final List> getLongLongEdges( ExecutionEnvironment env) { List> edges = new ArrayList>(); edges.add(new Tuple3(1L, 2L, 12L)); edges.add(new Tuple3(1L, 3L, 13L)); edges.add(new Tuple3(2L, 3L, 23L)); + edges.add(new Tuple3(3L, 4L, 34L)); + edges.add(new Tuple3(3L, 5L, 35L)); edges.add(new Tuple3(4L, 5L, 45L)); - - return env.fromCollection(edges); + edges.add(new Tuple3(5L, 1L, 51L)); + + return edges; } - + public static class DummyCustomType implements Serializable { private static final long serialVersionUID = 1L;