Skip to content

Commit

Permalink
[FLINK-1201] [gelly] changed mapVertices to return a Graph and simplified SSSP
Browse files Browse the repository at this point in the history
  • Loading branch information
vasia authored and StephanEwen committed Feb 11, 2015
1 parent 2815df3 commit f41b9ef
Show file tree
Hide file tree
Showing 5 changed files with 91 additions and 117 deletions.
Expand Up @@ -111,36 +111,35 @@ public DataSet<Edge<K, EV>> getEdges() {
/** /**
* Apply a function to the attribute of each vertex in the graph. * Apply a function to the attribute of each vertex in the graph.
* @param mapper * @param mapper
* @return * @return a new graph
*/ */
public <NV extends Serializable> DataSet<Vertex<K, NV>> mapVertices(final MapFunction<VV, NV> mapper) { public <NV extends Serializable> Graph<K, NV, EV> mapVertices(final MapFunction<Vertex<K, VV>, NV> mapper) {
return vertices.map(new ApplyMapperToVertexWithType<K, VV, NV>(mapper)); DataSet<Vertex<K, NV>> mappedVertices = vertices.map(new ApplyMapperToVertexWithType<K, VV, NV>(mapper));
return new Graph<K, NV, EV>(mappedVertices, this.getEdges(), this.context);
} }


private static final class ApplyMapperToVertexWithType<K extends Comparable<K> & Serializable, private static final class ApplyMapperToVertexWithType<K extends Comparable<K> & Serializable,
VV extends Serializable, NV extends Serializable> implements MapFunction VV extends Serializable, NV extends Serializable> implements MapFunction
<Vertex<K, VV>, Vertex<K, NV>>, ResultTypeQueryable<Vertex<K, NV>> { <Vertex<K, VV>, Vertex<K, NV>>, ResultTypeQueryable<Vertex<K, NV>> {


private MapFunction<VV, NV> innerMapper; private MapFunction<Vertex<K, VV>, NV> innerMapper;


public ApplyMapperToVertexWithType(MapFunction<VV, NV> theMapper) { public ApplyMapperToVertexWithType(MapFunction<Vertex<K, VV>, NV> theMapper) {
this.innerMapper = theMapper; this.innerMapper = theMapper;
} }


public Vertex<K, NV> map(Vertex<K, VV> value) throws Exception { public Vertex<K, NV> map(Vertex<K, VV> value) throws Exception {
return new Vertex<K, NV>(value.f0, innerMapper.map(value.f1)); return new Vertex<K, NV>(value.f0, innerMapper.map(value));
} }


@Override @Override
public TypeInformation<Vertex<K, NV>> getProducedType() { public TypeInformation<Vertex<K, NV>> getProducedType() {
@SuppressWarnings("unchecked") TypeInformation<Vertex<K, VV>> vertextypeInfo = new TupleTypeInfo<Vertex<K, VV>>(keyType, vertexValueType);
TypeInformation<NV> newVertexValueType = TypeExtractor.getMapReturnTypes(innerMapper, TypeInformation<NV> newVertexValueType = TypeExtractor.getMapReturnTypes(innerMapper, vertextypeInfo);
(TypeInformation<VV>)vertexValueType);

return new TupleTypeInfo<Vertex<K, NV>>(keyType, newVertexValueType); return new TupleTypeInfo<Vertex<K, NV>>(keyType, newVertexValueType);
} }
} }

/** /**
* Apply a function to the attribute of each edge in the graph. * Apply a function to the attribute of each edge in the graph.
* @param mapper * @param mapper
Expand Down
Expand Up @@ -3,14 +3,13 @@
import flink.graphs.Edge; import flink.graphs.Edge;
import flink.graphs.Graph; import flink.graphs.Graph;
import flink.graphs.Vertex; import flink.graphs.Vertex;
import flink.graphs.example.utils.ExampleUtils;
import flink.graphs.library.SingleSourceShortestPaths; import flink.graphs.library.SingleSourceShortestPaths;

import org.apache.flink.api.common.ProgramDescription; import org.apache.flink.api.common.ProgramDescription;
import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.ExecutionEnvironment;


import java.util.ArrayList;
import java.util.List;

public class SingleSourceShortestPathsExample implements ProgramDescription { public class SingleSourceShortestPathsExample implements ProgramDescription {


private static int maxIterations = 5; private static int maxIterations = 5;
Expand All @@ -19,54 +18,24 @@ public static void main (String [] args) throws Exception {


ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();


DataSet<Vertex<Long,Double>> vertices = getLongDoubleVertexData(env); DataSet<Vertex<Long, Double>> vertices = ExampleUtils.getLongDoubleVertexData(env);


DataSet<Edge<Long,Double>> edges = getLongDoubleEdgeData(env); DataSet<Edge<Long, Double>> edges = ExampleUtils.getLongDoubleEdgeData(env);


Long srcVertexId = 1L; Long srcVertexId = 1L;


Graph<Long, Double, Double> graph = Graph.create(vertices, edges, env); Graph<Long, Double, Double> graph = Graph.create(vertices, edges, env);


DataSet<Vertex<Long,Double>> singleSourceShortestPaths = DataSet<Vertex<Long,Double>> singleSourceShortestPaths =
graph.run(new SingleSourceShortestPaths(srcVertexId, maxIterations)).getVertices(); graph.run(new SingleSourceShortestPaths<Long>(srcVertexId, maxIterations)).getVertices();


singleSourceShortestPaths.print(); singleSourceShortestPaths.print();


env.execute(); env.execute();
} }



@Override @Override
public String getDescription() { public String getDescription() {
return "Single Source Shortest Paths"; return "Single Source Shortest Paths";
} }

@SuppressWarnings("serial")
public static final DataSet<Vertex<Long, Double>> getLongDoubleVertexData(
ExecutionEnvironment env) {
List<Vertex<Long, Double>> vertices = new ArrayList<Vertex<Long, Double>>();
vertices.add(new Vertex(1L, 1.0));
vertices.add(new Vertex(2L, 2.0));
vertices.add(new Vertex(3L, 3.0));
vertices.add(new Vertex(4L, 4.0));
vertices.add(new Vertex(5L, 5.0));

return env.fromCollection(vertices);
}

@SuppressWarnings("serial")
public static final DataSet<Edge<Long, Double>> getLongDoubleEdgeData(
ExecutionEnvironment env) {
List<Edge<Long, Double>> edges = new ArrayList<Edge<Long, Double>>();
edges.add(new Edge(1L, 2L, 12.0));
edges.add(new Edge(1L, 3L, 13.0));
edges.add(new Edge(2L, 3L, 23.0));
edges.add(new Edge(3L, 4L, 34.0));
edges.add(new Edge(3L, 5L, 35.0));
edges.add(new Edge(4L, 5L, 45.0));
edges.add(new Edge(5L, 1L, 51.0));

return env.fromCollection(edges);
}

} }
@@ -1,6 +1,8 @@
package flink.graphs.example.utils; package flink.graphs.example.utils;


import java.io.PrintStream; import java.io.PrintStream;
import java.util.ArrayList;
import java.util.List;


import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.functions.MapFunction;
Expand Down Expand Up @@ -103,5 +105,31 @@ public void flatMap(Long key, Collector<Edge<Long, NullValue>> out) throws Excep
} }
}); });
} }

/**
 * Builds the sample vertex data set used by the examples: five vertices with
 * ids 1 to 5, each carrying its own id as a double value (1.0 to 5.0).
 *
 * @param env the execution environment the data set is created from
 * @return a data set containing the five sample vertices
 */
public static final DataSet<Vertex<Long, Double>> getLongDoubleVertexData(
        ExecutionEnvironment env) {
    final long lastVertexId = 5L;
    List<Vertex<Long, Double>> sampleVertices = new ArrayList<Vertex<Long, Double>>();
    for (long id = 1L; id <= lastVertexId; id++) {
        // the value of every sample vertex is its id as a double (1 -> 1.0, 2 -> 2.0, ...)
        sampleVertices.add(new Vertex<Long, Double>(id, (double) id));
    }
    return env.fromCollection(sampleVertices);
}

/**
 * Builds the sample edge data set used by the examples: seven edges with
 * double values, connecting the vertex ids 1 to 5.
 *
 * @param env the execution environment the data set is created from
 * @return a data set containing the seven sample edges
 */
public static final DataSet<Edge<Long, Double>> getLongDoubleEdgeData(
        ExecutionEnvironment env) {
    // Position i of the three arrays describes edge i.
    // NOTE(review): assumes the Edge constructor takes (source id, target id, value) - confirm.
    final long[] sourceIds = {1L, 1L, 2L, 3L, 3L, 4L, 5L};
    final long[] targetIds = {2L, 3L, 3L, 4L, 5L, 5L, 1L};
    final double[] edgeValues = {12.0, 13.0, 23.0, 34.0, 35.0, 45.0, 51.0};

    List<Edge<Long, Double>> sampleEdges = new ArrayList<Edge<Long, Double>>();
    for (int i = 0; i < sourceIds.length; i++) {
        sampleEdges.add(new Edge<Long, Double>(sourceIds[i], targetIds[i], edgeValues[i]));
    }
    return env.fromCollection(sampleEdges);
}
} }


@@ -1,18 +1,16 @@
package flink.graphs.library; package flink.graphs.library;


import flink.graphs.*; import flink.graphs.*;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.spargel.java.MessageIterator; import org.apache.flink.spargel.java.MessageIterator;
import org.apache.flink.spargel.java.MessagingFunction; import org.apache.flink.spargel.java.MessagingFunction;
import org.apache.flink.spargel.java.OutgoingEdge; import org.apache.flink.spargel.java.OutgoingEdge;
import org.apache.flink.spargel.java.VertexUpdateFunction; import org.apache.flink.spargel.java.VertexUpdateFunction;


import java.io.Serializable; import java.io.Serializable;


@SuppressWarnings("serial")
public class SingleSourceShortestPaths<K extends Comparable<K> & Serializable> implements GraphAlgorithm<K, Double, Double> { public class SingleSourceShortestPaths<K extends Comparable<K> & Serializable> implements GraphAlgorithm<K, Double, Double> {


private final K srcVertexId; private final K srcVertexId;
Expand All @@ -25,29 +23,39 @@ public SingleSourceShortestPaths(K srcVertexId, Integer maxIterations) {


@Override @Override
public Graph<K, Double, Double> run(Graph<K, Double, Double> input) { public Graph<K, Double, Double> run(Graph<K, Double, Double> input) {
DataSet<Vertex<K, Double>> sourceVertex = input.getVertices().filter(
new SelectVertex<K>(srcVertexId));

DataSet<Vertex<K, Double>> verticesWithInitialDistance = sourceVertex.cross(input.getVertices())
.map(new InitSrcVertex<K>());

Graph<K, Double, Double> graph = Graph.create(verticesWithInitialDistance, input.getEdges(),
ExecutionEnvironment.getExecutionEnvironment());


return graph.runVertexCentricIteration( return input.mapVertices(new InitVerticesMapper<K>(srcVertexId))
.runVertexCentricIteration(
new VertexDistanceUpdater<K>(), new VertexDistanceUpdater<K>(),
new MinDistanceMessenger<K>(), new MinDistanceMessenger<K>(),
maxIterations maxIterations
); );
} }


public static final class InitVerticesMapper<K extends Comparable<K> & Serializable>
implements MapFunction<Vertex<K,Double>, Double> {

private K srcVertexId;

public InitVerticesMapper(K srcId) {
this.srcVertexId = srcId;
}

public Double map(Vertex<K, Double> value) {
if (value.f0.equals(srcVertexId)) {
return 0.0;
}
else {
return Double.MAX_VALUE;
}
}
}


/** /**
* Function that updates the value of a vertex by picking the minimum distance from all incoming messages. * Function that updates the value of a vertex by picking the minimum distance from all incoming messages.
* *
* @param <K> * @param <K>
*/ */
@SuppressWarnings("serial")
public static final class VertexDistanceUpdater<K extends Comparable<K> & Serializable> public static final class VertexDistanceUpdater<K extends Comparable<K> & Serializable>
extends VertexUpdateFunction<K, Double, Double> { extends VertexUpdateFunction<K, Double, Double> {


Expand Down Expand Up @@ -76,7 +84,6 @@ public void updateVertex(K vertexKey, Double vertexValue, MessageIterator<Double
* *
* @param <K> * @param <K>
*/ */
@SuppressWarnings("serial")
public static final class MinDistanceMessenger<K extends Comparable<K> & Serializable> public static final class MinDistanceMessenger<K extends Comparable<K> & Serializable>
extends MessagingFunction<K, Double, Double, Double> { extends MessagingFunction<K, Double, Double, Double> {


Expand All @@ -87,32 +94,4 @@ public void sendMessages(K vertexKey, Double newDistance) throws Exception {
} }
} }
} }

private static final class SelectVertex<K extends Comparable<K> & Serializable>
implements FilterFunction<Vertex<K, Double>> {
private K id;

public SelectVertex(K id) {
this.id = id;
}

@Override
public boolean filter(Vertex<K, Double> vertex) throws Exception {
return vertex.getId().equals(id);
}
}

private static final class InitSrcVertex<K extends Comparable<K> & Serializable>
implements MapFunction<Tuple2<Vertex<K, Double>, Vertex<K,Double>>, Vertex<K,Double>> {

@Override
public Vertex<K, Double> map(Tuple2<Vertex<K, Double>, Vertex<K, Double>> vertexVertexTuple2) throws Exception {
if(vertexVertexTuple2.f0.f0.equals(vertexVertexTuple2.f1.f0)) {
return new Vertex<>(vertexVertexTuple2.f0.f0, 0.0);
} else {
return new Vertex<>(vertexVertexTuple2.f1.f0, Double.MAX_VALUE);
}
}
}

} }
Expand Up @@ -75,11 +75,11 @@ public static String runProgram(int progId, String resultPath) throws Exception
Graph<Long, Long, Long> graph = Graph.create(TestGraphUtils.getLongLongVertexData(env), Graph<Long, Long, Long> graph = Graph.create(TestGraphUtils.getLongLongVertexData(env),
TestGraphUtils.getLongLongEdgeData(env)); TestGraphUtils.getLongLongEdgeData(env));


DataSet<Vertex<Long, Long>> mappedVertices = graph.mapVertices(new MapFunction<Long, Long>() { DataSet<Vertex<Long, Long>> mappedVertices = graph.mapVertices(new MapFunction<Vertex<Long, Long>, Long>() {
public Long map(Long value) throws Exception { public Long map(Vertex<Long, Long> value) throws Exception {
return value+1; return value.getValue()+1;
} }
}); }).getVertices();


mappedVertices.writeAsCsv(resultPath); mappedVertices.writeAsCsv(resultPath);
env.execute(); env.execute();
Expand All @@ -98,31 +98,30 @@ public Long map(Long value) throws Exception {
Graph<Long, Long, Long> graph = Graph.create(TestGraphUtils.getLongLongVertexData(env), Graph<Long, Long, Long> graph = Graph.create(TestGraphUtils.getLongLongVertexData(env),
TestGraphUtils.getLongLongEdgeData(env)); TestGraphUtils.getLongLongEdgeData(env));


DataSet<Vertex<Long, String>> mappedVertices = graph.mapVertices(new MapFunction<Long, String>() { DataSet<Vertex<Long, String>> mappedVertices = graph.mapVertices(new MapFunction<Vertex<Long, Long>, String>() {
public String map(Long value) throws Exception { public String map(Vertex<Long, Long> vertex) throws Exception {
String stringValue; String stringValue;
if (value == 1) { if (vertex.getValue() == 1) {
stringValue = "one"; stringValue = "one";
} }
else if (value == 2) { else if (vertex.getValue() == 2) {
stringValue = "two"; stringValue = "two";
} }
else if (value == 3) { else if (vertex.getValue() == 3) {
stringValue = "three"; stringValue = "three";
} }
else if (value == 4) { else if (vertex.getValue() == 4) {
stringValue = "four"; stringValue = "four";
} }
else if (value == 5) { else if (vertex.getValue() == 5) {
stringValue = "five"; stringValue = "five";
} }
else { else {
stringValue = ""; stringValue = "";
} }

return stringValue; return stringValue;
} }
}); }).getVertices();


mappedVertices.writeAsCsv(resultPath); mappedVertices.writeAsCsv(resultPath);
env.execute(); env.execute();
Expand All @@ -141,13 +140,13 @@ else if (value == 5) {
Graph<Long, Long, Long> graph = Graph.create(TestGraphUtils.getLongLongVertexData(env), Graph<Long, Long, Long> graph = Graph.create(TestGraphUtils.getLongLongVertexData(env),
TestGraphUtils.getLongLongEdgeData(env)); TestGraphUtils.getLongLongEdgeData(env));


DataSet<Vertex<Long, Tuple1<Long>>> mappedVertices = graph.mapVertices(new MapFunction<Long, Tuple1<Long>>() { DataSet<Vertex<Long, Tuple1<Long>>> mappedVertices = graph.mapVertices(new MapFunction<Vertex<Long, Long>, Tuple1<Long>>() {
public Tuple1<Long> map(Long value) throws Exception { public Tuple1<Long> map(Vertex<Long, Long> vertex) throws Exception {
Tuple1<Long> tupleValue = new Tuple1<Long>(); Tuple1<Long> tupleValue = new Tuple1<Long>();
tupleValue.setFields(value); tupleValue.setFields(vertex.getValue());
return tupleValue; return tupleValue;
} }
}); }).getVertices();


mappedVertices.writeAsCsv(resultPath); mappedVertices.writeAsCsv(resultPath);
env.execute(); env.execute();
Expand All @@ -166,13 +165,13 @@ public Tuple1<Long> map(Long value) throws Exception {
Graph<Long, Long, Long> graph = Graph.create(TestGraphUtils.getLongLongVertexData(env), Graph<Long, Long, Long> graph = Graph.create(TestGraphUtils.getLongLongVertexData(env),
TestGraphUtils.getLongLongEdgeData(env)); TestGraphUtils.getLongLongEdgeData(env));


DataSet<Vertex<Long, DummyCustomType>> mappedVertices = graph.mapVertices(new MapFunction<Long, DummyCustomType>() { DataSet<Vertex<Long, DummyCustomType>> mappedVertices = graph.mapVertices(new MapFunction<Vertex<Long, Long>, DummyCustomType>() {
public DummyCustomType map(Long value) throws Exception { public DummyCustomType map(Vertex<Long, Long> vertex) throws Exception {
DummyCustomType dummyValue = new DummyCustomType(); DummyCustomType dummyValue = new DummyCustomType();
dummyValue.setIntField(value.intValue()); dummyValue.setIntField(vertex.getValue().intValue());
return dummyValue; return dummyValue;
} }
}); }).getVertices();


mappedVertices.writeAsCsv(resultPath); mappedVertices.writeAsCsv(resultPath);
env.execute(); env.execute();
Expand All @@ -192,14 +191,14 @@ public DummyCustomType map(Long value) throws Exception {
TestGraphUtils.getLongLongEdgeData(env)); TestGraphUtils.getLongLongEdgeData(env));


DataSet<Vertex<Long, DummyCustomParameterizedType<Double>>> mappedVertices = graph.mapVertices( DataSet<Vertex<Long, DummyCustomParameterizedType<Double>>> mappedVertices = graph.mapVertices(
new MapFunction<Long, DummyCustomParameterizedType<Double>>() { new MapFunction<Vertex<Long, Long>, DummyCustomParameterizedType<Double>>() {
public DummyCustomParameterizedType<Double> map(Long value) throws Exception { public DummyCustomParameterizedType<Double> map(Vertex<Long, Long> vertex) throws Exception {
DummyCustomParameterizedType<Double> dummyValue = new DummyCustomParameterizedType<Double>(); DummyCustomParameterizedType<Double> dummyValue = new DummyCustomParameterizedType<Double>();
dummyValue.setIntField(value.intValue()); dummyValue.setIntField(vertex.getValue().intValue());
dummyValue.setTField(new Double(value)); dummyValue.setTField(new Double(vertex.getValue()));
return dummyValue; return dummyValue;
} }
}); }).getVertices();


mappedVertices.writeAsCsv(resultPath); mappedVertices.writeAsCsv(resultPath);
env.execute(); env.execute();
Expand All @@ -214,5 +213,5 @@ public DummyCustomParameterizedType<Double> map(Long value) throws Exception {
} }
} }
} }

} }

0 comments on commit f41b9ef

Please sign in to comment.