[FLINK-1201] [gelly] added execution environment to create calls in test
vasia authored and StephanEwen committed Feb 11, 2015
1 parent c018c9f commit 65d54c1
Showing 3 changed files with 225 additions and 56 deletions.
@@ -18,54 +18,53 @@
 package flink.graphs;
 
 import org.apache.flink.api.common.functions.*;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.functions.FunctionAnnotation.ConstantFields;
 import org.apache.flink.api.java.functions.FunctionAnnotation.ConstantFieldsFirst;
+import org.apache.flink.api.java.aggregation.Aggregations;
 import org.apache.flink.api.java.operators.DeltaIteration;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
-import org.apache.flink.api.java.typeutils.TupleTypeInfo;
-import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.api.java.io.CsvReader;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.spargel.java.VertexCentricIteration;
+import org.apache.flink.util.Collector;
 import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.util.Collector;
 
 import java.io.Serializable;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
 
 @SuppressWarnings("serial")
 public class Graph<K extends Comparable<K> & Serializable, VV extends Serializable,
         EV extends Serializable> implements Serializable{
 
+    private final ExecutionEnvironment context;
+
     private final DataSet<Tuple2<K, VV>> vertices;
 
     private final DataSet<Tuple3<K, K, EV>> edges;
 
     /** a graph is directed by default */
     private boolean isUndirected = false;
 
-    private static TypeInformation<?> vertexKeyType;
-    private static TypeInformation<?> vertexValueType;
 
-    public Graph(DataSet<Tuple2<K, VV>> vertices, DataSet<Tuple3<K, K, EV>> edges) {
+    public Graph(DataSet<Tuple2<K, VV>> vertices, DataSet<Tuple3<K, K, EV>> edges, ExecutionEnvironment context) {
         this.vertices = vertices;
         this.edges = edges;
-        Graph.vertexKeyType = ((TupleTypeInfo<?>) vertices.getType()).getTypeAt(0);
-        Graph.vertexValueType = ((TupleTypeInfo<?>) vertices.getType()).getTypeAt(1);
+        this.context = context;
     }
 
-    public Graph(DataSet<Tuple2<K, VV>> vertices, DataSet<Tuple3<K, K, EV>> edges, boolean undirected) {
+    public Graph(DataSet<Tuple2<K, VV>> vertices, DataSet<Tuple3<K, K, EV>> edges, ExecutionEnvironment context,
+            boolean undirected) {
         this.vertices = vertices;
         this.edges = edges;
+        this.context = context;
         this.isUndirected = undirected;
-        Graph.vertexKeyType = ((TupleTypeInfo<?>) vertices.getType()).getTypeAt(0);
-        Graph.vertexValueType = ((TupleTypeInfo<?>) vertices.getType()).getTypeAt(1);
     }
 
     public DataSet<Tuple2<K, VV>> getVertices() {
@@ -75,37 +74,29 @@ public DataSet<Tuple2<K, VV>> getVertices() {
     public DataSet<Tuple3<K, K, EV>> getEdges() {
         return edges;
     }
 
     /**
-     * Apply a function to the attribute of each vertex in the graph.
-     * @param mapper
-     * @return
+     * Apply a function to the attribute of each Tuple2 in the graph
+     * @param mapper A function that transforms the attribute of each Tuple2
+     * @return A DataSet of Tuple2 which contains the new values of all vertices
      */
-    public <NV extends Serializable> DataSet<Tuple2<K, NV>> mapVertices(final MapFunction<VV, NV> mapper) {
-        return vertices.map(new ApplyMapperToVertexWithType<K, VV, NV>(mapper));
+    //TODO: support changing the vertex value type
+    public DataSet<Tuple2<K, VV>> mapVertices(final MapFunction<VV, VV> mapper) {
+        return vertices.map(new ApplyMapperToVertex<K, VV>(mapper));
     }
 
-    private static final class ApplyMapperToVertexWithType<K, VV, NV> implements MapFunction
-        <Tuple2<K, VV>, Tuple2<K, NV>>, ResultTypeQueryable<Tuple2<K, NV>> {
+    private static final class ApplyMapperToVertex<K, VV> implements MapFunction
+        <Tuple2<K, VV>, Tuple2<K, VV>> {
 
-        private MapFunction<VV, NV> innerMapper;
+        private MapFunction<VV, VV> innerMapper;
 
-        public ApplyMapperToVertexWithType(MapFunction<VV, NV> theMapper) {
+        public ApplyMapperToVertex(MapFunction<VV, VV> theMapper) {
             this.innerMapper = theMapper;
         }
 
-        public Tuple2<K, NV> map(Tuple2<K, VV> value) throws Exception {
-            return new Tuple2<K, NV>(value.f0, innerMapper.map(value.f1));
+        public Tuple2<K, VV> map(Tuple2<K, VV> value) throws Exception {
+            return new Tuple2<K, VV>(value.f0, innerMapper.map(value.f1));
         }
-
-        @Override
-        public TypeInformation<Tuple2<K, NV>> getProducedType() {
-            @SuppressWarnings("unchecked")
-            TypeInformation<NV> newVertexValueType = TypeExtractor.getMapReturnTypes(innerMapper,
-                    (TypeInformation<VV>) vertexValueType);
-
-            return new TupleTypeInfo<Tuple2<K, NV>>(vertexKeyType, newVertexValueType);
-        }
     }
 
     /**
@@ -130,7 +121,7 @@ public Graph<K, VV, EV> subgraph(FilterFunction<VV> vertexFilter, FilterFunction
         DataSet<Tuple3<K, K, EV>> filteredEdges = remainingEdges.filter(
                 new ApplyEdgeFilter<K, EV>(edgeFilter));
 
-        return new Graph<K, VV, EV>(filteredVertices, filteredEdges);
+        return new Graph<K, VV, EV>(filteredVertices, filteredEdges, this.context);
     }
 
     @ConstantFieldsFirst("0->0;1->1;2->2")
@@ -212,7 +203,7 @@ public <MsgT> Graph<K, VV, EV> pga(CoGroupFunction<Tuple2<K, VV>, Tuple3<K, K, E
 
         DataSet<Tuple2<K, VV>> result = iteration.closeWith(a, a);
 
-        return new Graph<>(result, this.edges);
+        return new Graph<>(result, this.edges, this.context);
     }
 
     /**
@@ -227,7 +218,7 @@ public Graph<K, VV, EV> getUndirected() throws UnsupportedOperationException {
         else {
             DataSet<Tuple3<K, K, EV>> undirectedEdges =
                     edges.union(edges.map(new ReverseEdgesMap<K, EV>()));
-            return new Graph<K, VV, EV>(vertices, undirectedEdges, true);
+            return new Graph<K, VV, EV>(vertices, undirectedEdges, this.context, true);
         }
     }
 
@@ -251,14 +242,14 @@ public Graph<K, VV, EV> reverse() throws UnsupportedOperationException {
         }
         else {
             DataSet<Tuple3<K, K, EV>> undirectedEdges = edges.map(new ReverseEdgesMap<K, EV>());
-            return new Graph<K, VV, EV>(vertices, (DataSet<Tuple3<K, K, EV>>) undirectedEdges, true);
+            return new Graph<K, VV, EV>(vertices, (DataSet<Tuple3<K, K, EV>>) undirectedEdges, this.context, true);
         }
     }
 
     public static <K extends Comparable<K> & Serializable, VV extends Serializable,
         EV extends Serializable> Graph<K, VV, EV>
-        create(DataSet<Tuple2<K, VV>> vertices, DataSet<Tuple3<K, K, EV>> edges) {
-        return new Graph<K, VV, EV>(vertices, edges);
+        create(DataSet<Tuple2<K, VV>> vertices, DataSet<Tuple3<K, K, EV>> edges, ExecutionEnvironment context) {
+        return new Graph<K, VV, EV>(vertices, edges, context);
     }
 
     /**
@@ -326,7 +317,8 @@ public Tuple3<K, K, EV> map(Tuple3<K, K, EV> value) throws Exception {
     public static <K extends Comparable<K> & Serializable, VV extends Serializable,
         EV extends Serializable> Graph<K, VV, EV> readGraphFromCsvFile(ExecutionEnvironment env,
             String Tuple2Filepath, char Tuple2Delimiter, String edgeFilepath, char edgeDelimiter,
-            Class<K> Tuple2IdClass, Class<VV> Tuple2ValueClass, Class<EV> edgeValueClass) {
+            Class<K> Tuple2IdClass, Class<VV> Tuple2ValueClass, Class<EV> edgeValueClass,
+            ExecutionEnvironment context) {
 
         CsvReader Tuple2Reader = new CsvReader(Tuple2Filepath, env);
         DataSet<Tuple2<K, VV>> vertices = Tuple2Reader.fieldDelimiter(Tuple2Delimiter)
@@ -348,7 +340,183 @@ public Tuple3<K, K, EV> map(Tuple3<K, K, EV> value) throws Exception {
                 }
             });
 
-        return Graph.create(vertices, edges);
+        return Graph.create(vertices, edges, context);
     }
 
+    /**
+     * @return Singleton DataSet containing the vertex count
+     */
+    @SuppressWarnings("unchecked")
+    public DataSet<Integer> numberOfVertices () {
+        return GraphUtils.count((DataSet<Object>) (DataSet<?>) vertices);
+    }
+
+    /**
+     *
+     * @return Singleton DataSet containing the edge count
+     */
+    @SuppressWarnings("unchecked")
+    public DataSet<Integer> numberOfEdges () {
+        return GraphUtils.count((DataSet<Object>) (DataSet<?>) edges);
+    }
+
+    /**
+     *
+     * @return The IDs of the vertices as DataSet
+     */
+    public DataSet<K> getVertexIds () {
+        return vertices.map(new MapFunction<Tuple2<K, VV>, K>() {
+            @Override
+            public K map(Tuple2<K, VV> vertex) throws Exception {
+                return vertex.f0;
+            }
+        });
+    }
+
+    public DataSet<Tuple2<K, K>> getEdgeIds () {
+        return edges.map(new MapFunction<Tuple3<K, K, EV>, Tuple2<K, K>>() {
+            @Override
+            public Tuple2<K, K> map(Tuple3<K, K, EV> edge) throws Exception {
+                return new Tuple2<K, K>(edge.f0, edge.f1);
+            }
+        });
+    }
+
+    @SuppressWarnings("unchecked")
+    public DataSet<Boolean> isWeaklyConnected () {
+
+        DataSet<K> vertexIds = this.getVertexIds();
+        DataSet<Tuple2<K, K>> verticesWithInitialIds = vertexIds
+                .map(new MapFunction<K, Tuple2<K, K>>() {
+                    @Override
+                    public Tuple2<K, K> map(K k) throws Exception {
+                        return new Tuple2<K, K>(k, k);
+                    }
+                });
+
+        DataSet<Tuple2<K, K>> edgeIds = this.getEdgeIds();
+
+        DeltaIteration<Tuple2<K, K>, Tuple2<K, K>> iteration = verticesWithInitialIds
+                .iterateDelta(verticesWithInitialIds, 10, 0);
+
+        DataSet<Tuple2<K, K>> changes = iteration.getWorkset()
+                .join(edgeIds).where(0).equalTo(0)
+                .with(new JoinFunction<Tuple2<K, K>, Tuple2<K, K>, Tuple2<K, K>>() {
+                    @Override
+                    public Tuple2<K, K> join(Tuple2<K, K> vertexWithComponent, Tuple2<K, K> edge) throws Exception {
+                        return new Tuple2<K, K>(edge.f1, vertexWithComponent.f1);
+                    }
+                })
+                .groupBy(0)
+                .aggregate(Aggregations.MIN, 1)
+                .join(iteration.getSolutionSet()).where(0).equalTo(0)
+                .with(new FlatJoinFunction<Tuple2<K, K>, Tuple2<K, K>, Tuple2<K, K>>() {
+                    @Override
+                    public void join(Tuple2<K, K> candidate, Tuple2<K, K> old, Collector<Tuple2<K, K>> out) throws Exception {
+                        if (candidate.f1.compareTo(old.f1) < 0) {
+                            out.collect(candidate);
+                        }
+                    }
+                });
+
+        DataSet<Tuple2<K, K>> components = iteration.closeWith(changes, changes);
+
+        DataSet<Boolean> result = GraphUtils.count((DataSet<Object>) (DataSet<?>) components)
+                .map(new MapFunction<Integer, Boolean>() {
+                    @Override
+                    public Boolean map(Integer n) throws Exception {
+                        if (n == 1)
+                            return false;
+                        else
+                            return true;
+                    }
+                });
+
+        return result;
+    }
+
+    //TODO kostas add functionality
+    public Graph<K, VV, EV> fromCollection (Collection<Tuple2<K, VV>> vertices, Collection<Tuple3<K, K, EV>> edges) {
+        return null;
+    }
+
+    //TODO kostas add functionality
+    public DataSet<Tuple2<K, VV>> fromCollection (Collection<Tuple2<K, VV>> vertices) {
+        return null;
+    }
+
+    public Graph<K, VV, EV> addVertex (Tuple2<K, VV> vertex, List<Tuple3<K, K, EV>> edges) {
+        Graph<K, VV, EV> newVertex = this.fromCollection(Arrays.asList(vertex), edges);
+        return this.union(newVertex);
+    }
+
+    @SuppressWarnings("unchecked")
+    public Graph<K, VV, EV> removeVertex (Tuple2<K, VV> vertex) {
+
+        DataSet<Tuple2<K, VV>> vertexToRemove = fromCollection(Arrays.asList(vertex));
+
+        DataSet<Tuple2<K, VV>> newVertices = getVertices()
+                .filter(new RichFilterFunction<Tuple2<K, VV>>() {
+                    private Tuple2<K, VV> vertexToRemove;
+
+                    @Override
+                    public void open(Configuration parameters) throws Exception {
+                        super.open(parameters);
+                        this.vertexToRemove = (Tuple2<K, VV>) getRuntimeContext().getBroadcastVariable("vertexToRemove").get(0);
+                    }
+
+                    @Override
+                    public boolean filter(Tuple2<K, VV> vertex) throws Exception {
+                        if (vertex.f0.equals(vertexToRemove.f0)) {
+                            return false;
+                        } else {
+                            return true;
+                        }
+                    }
+                }).withBroadcastSet(vertexToRemove, "vertexToRemove");
+
+        DataSet<Tuple3<K, K, EV>> newEdges = getEdges()
+                .filter(new RichFilterFunction<Tuple3<K, K, EV>>() {
+                    private Tuple2<K, VV> vertexToRemove;
+
+                    @Override
+                    public void open(Configuration parameters) throws Exception {
+                        super.open(parameters);
+                        this.vertexToRemove = (Tuple2<K, VV>) getRuntimeContext().getBroadcastVariable("vertexToRemove").get(0);
+                    }
+
+                    @Override
+                    public boolean filter(Tuple3<K, K, EV> edge) throws Exception {
+                        if (edge.f0.equals(vertexToRemove.f0)) {
+                            return false;
+                        }
+                        if (edge.f1.equals(vertexToRemove.f0)) {
+                            return false;
+                        }
+                        return true;
+                    }
+                }).withBroadcastSet(vertexToRemove, "vertexToRemove");
+
+        return new Graph<K, VV, EV>(newVertices, newEdges, this.context);
+    }
+
+    public Graph<K, VV, EV> addEdge (Tuple3<K, K, EV> edge, Tuple2<K, VV> source, Tuple2<K, VV> target) {
+        Graph<K, VV, EV> newEdges = this.fromCollection(Arrays.asList(source, target), Arrays.asList(edge));
+        return this.union(newEdges);
+    }
+
+    public Graph<K, VV, EV> union (Graph<K, VV, EV> graph) {
+        DataSet<Tuple2<K, VV>> unionedVertices = graph.getVertices().union(this.getVertices());
+        DataSet<Tuple3<K, K, EV>> unionedEdges = graph.getEdges().union(this.getEdges());
+        return new Graph<K, VV, EV>(unionedVertices, unionedEdges, this.context);
+    }
+
+    public Graph<K, VV, EV> passMessages (VertexCentricIteration<K, VV, ?, EV> iteration) {
+        DataSet<Tuple2<K, VV>> newVertices = iteration.createResult();
+        return new Graph<K, VV, EV>(newVertices, edges, this.context);
+    }
+
 }
@@ -7,7 +7,8 @@
 
 public class GraphUtils {
 
+    @SuppressWarnings("serial")
     public static DataSet<Integer> count (DataSet<Object> set) {
         return set
             .map(new MapFunction<Object, Integer>() {
                 @Override
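For context, and not part of the commit itself: a minimal sketch of how a test might build a graph against the changed API, where the ExecutionEnvironment is now passed explicitly to the constructors and to Graph.create(). The toy Long-typed vertices and edges, the class name GraphCreateExample, and the assumption of a 0.8-era Flink runtime are illustrative only.

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;

import flink.graphs.Graph;

public class GraphCreateExample {

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Toy input: three vertices as (id, value) and two edges as (source, target, value).
        DataSet<Tuple2<Long, Long>> vertices = env.fromElements(
                new Tuple2<Long, Long>(1L, 1L),
                new Tuple2<Long, Long>(2L, 2L),
                new Tuple2<Long, Long>(3L, 3L));
        DataSet<Tuple3<Long, Long, Long>> edges = env.fromElements(
                new Tuple3<Long, Long, Long>(1L, 2L, 12L),
                new Tuple3<Long, Long, Long>(2L, 3L, 23L));

        // After this commit, the environment is an explicit argument of create().
        Graph<Long, Long, Long> graph = Graph.create(vertices, edges, env);

        // The newly added utility methods return DataSets, e.g. a singleton vertex count.
        DataSet<Integer> vertexCount = graph.numberOfVertices();
        vertexCount.print();

        // On the 0.8-era API assumed here, print() registers a sink and execute() runs the plan.
        env.execute();
    }
}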
