From d89addc0fd2e5e64de1667f0d701d901fa57c627 Mon Sep 17 00:00:00 2001 From: andralungu Date: Fri, 8 May 2015 19:01:53 +0200 Subject: [PATCH] [FLINK-1975][gelly] Graph getUndirected improvement --- .../src/main/java/org/apache/flink/graph/Graph.java | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java index 1c0052d5a8806..25c76e8355bf8 100755 --- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java +++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java @@ -706,7 +706,7 @@ public DataSet> getDegrees() { */ public Graph getUndirected() { - DataSet> undirectedEdges = edges.union(edges.map(new ReverseEdgesMap())); + DataSet> undirectedEdges = edges.flatMap(new RegularAndReversedEdgesMap()); return new Graph(vertices, undirectedEdges, this.context); } @@ -923,6 +923,16 @@ public Edge map(Edge value) { } } + private static final class RegularAndReversedEdgesMap & Serializable, EV extends Serializable> + implements FlatMapFunction, Edge> { + + @Override + public void flatMap(Edge edge, Collector> out) throws Exception { + out.collect(new Edge(edge.f0, edge.f1, edge.f2)); + out.collect(new Edge(edge.f1, edge.f0, edge.f2)); + } + } + /** * Reverse the direction of the edges in the graph *