Skip to content

Commit

Permalink
[FLINK-1201] [gelly] Fixed/added tests, based on Vasia's feedback
Browse files Browse the repository at this point in the history
  • Loading branch information
balidani authored and StephanEwen committed Feb 11, 2015
1 parent 16703c5 commit eabf2bb
Show file tree
Hide file tree
Showing 2 changed files with 128 additions and 60 deletions.
Expand Up @@ -30,14 +30,18 @@
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.api.java.io.CsvReader;
import org.apache.flink.api.java.io.LocalCollectionOutputFormat;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.spargel.java.VertexCentricIteration;
import org.apache.flink.util.Collector;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.io.OutputFormat;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.ExecutionEnvironment;

import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
Expand Down Expand Up @@ -494,14 +498,32 @@ public DataSet<Tuple2<K, VV>> fromCollection (Collection<Tuple2<K,VV>> vertices)
return null;
}

public Graph<K, VV, EV> addVertex (Tuple2<K,VV> vertex, List<Tuple3<K,K,EV>> edges) {
Graph<K,VV,EV> newVertex = this.fromCollection(Arrays.asList(vertex), edges);
return this.union(newVertex);
public Graph<K, VV, EV> addVertex (final Tuple2<K,VV> vertex, List<Tuple3<K,K,EV>> edges) {

DataSet<Tuple2<K, VV>> newVertex = this.context.fromCollection(Arrays.asList(vertex));

// Take care of empty edge set
if (edges.isEmpty()) {
return Graph.create(getVertices().union(newVertex), getEdges(), context);
}

// Do not add already existing vertices (and their edges)
DataSet<Tuple2<K, VV>> oldVertices = getVertices();
DataSet<Tuple2<K, VV>> newVertices = getVertices().union(newVertex).distinct();

if (oldVertices.equals(newVertices)) {
return this;
}

// Add the vertex and its edges
DataSet<Tuple3<K, K, EV>> newEdges = getEdges().union(context.fromCollection(edges));
return Graph.create(newVertices, newEdges, context);
}

public Graph<K, VV, EV> addEdge (Tuple3<K,K,EV> edge, Tuple2<K,VV> source, Tuple2<K,VV> target) {
Graph<K,VV,EV> newEdges = this.fromCollection(Arrays.asList(source, target), Arrays.asList(edge));
return this.union(newEdges);

Graph<K,VV,EV> partialGraph = this.fromCollection(Arrays.asList(source, target), Arrays.asList(edge));
return this.union(partialGraph);
}

public Graph<K, VV, EV> removeVertex (Tuple2<K,VV> vertex) {
Expand All @@ -513,25 +535,12 @@ public Graph<K, VV, EV> removeVertex (Tuple2<K,VV> vertex) {
vertexToRemove, "vertexToRemove");

DataSet<Tuple3<K, K, EV>> newEdges = getEdges().filter(
new RemoveEdgeFilter<K, VV, EV>()).withBroadcastSet(
new VertexRemovalEdgeFilter<K, VV, EV>()).withBroadcastSet(
vertexToRemove, "vertexToRemove");

return new Graph<K, VV, EV>(newVertices, newEdges, this.context);
}

public Graph<K, VV, EV> removeEdge (Tuple3<K,K,EV> edge) {

DataSet<Tuple3<K,K,EV>> edgeToRemove = context.fromCollection(Arrays.asList(edge));

DataSet<Tuple2<K, VV>> newVertices = getVertices();

DataSet<Tuple3<K, K, EV>> newEdges = getEdges().filter(
new RemoveEdgeFilter<K, VV, EV>()).withBroadcastSet(
edgeToRemove, "edgeToRemove");

return new Graph<K, VV, EV>(newVertices, newEdges, this.context);
}

private static final class RemoveVertexFilter<K, VV> extends RichFilterFunction<Tuple2<K, VV>> {

private Tuple2<K, VV> vertexToRemove;
Expand All @@ -549,56 +558,62 @@ public boolean filter(Tuple2<K, VV> vertex) throws Exception {
}
}

private static final class RemoveEdgeFilter<K, VV, EV> extends RichFilterFunction<Tuple3<K, K, EV>> {
private static final class VertexRemovalEdgeFilter<K, VV, EV> extends RichFilterFunction<Tuple3<K, K, EV>> {

private Tuple2<K, VV> vertexToRemove;
private Tuple3<K, K, EV> edgeToRemove;

@SuppressWarnings("unchecked")
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);


List<Tuple2<K, VV>> vertexVariable = null;
List<Tuple3<K, K, EV>> edgeVariable = null;

try {
vertexVariable = getRuntimeContext().getBroadcastVariable("vertexToRemove");
} catch (IllegalArgumentException ex) {}

try {
edgeVariable = getRuntimeContext().getBroadcastVariable("edgeToRemove");
} catch (IllegalArgumentException ex) {}

if (vertexVariable != null) {
vertexToRemove = vertexVariable.get(0);
vertexToRemove = (Tuple2<K, VV>) getRuntimeContext().getBroadcastVariable("vertexToRemove").get(0);
}

@Override
public boolean filter(Tuple3<K, K, EV> edge) throws Exception {

if (edge.f0.equals(vertexToRemove.f0)) {
return false;
}

if (edgeVariable != null) {
edgeToRemove = edgeVariable.get(0);
if (edge.f1.equals(vertexToRemove.f0)) {
return false;
}
return true;
}
}

public Graph<K, VV, EV> removeEdge (Tuple3<K,K,EV> edge) {

DataSet<Tuple3<K,K,EV>> edgeToRemove = context.fromCollection(Arrays.asList(edge));

DataSet<Tuple3<K, K, EV>> newEdges = getEdges().filter(
new EdgeRemovalEdgeFilter<K, VV, EV>()).withBroadcastSet(
edgeToRemove, "edgeToRemove");

return new Graph<K, VV, EV>(this.getVertices(), newEdges, this.context);
}

private static final class EdgeRemovalEdgeFilter<K, VV, EV> extends RichFilterFunction<Tuple3<K, K, EV>> {

private Tuple3<K, K, EV> edgeToRemove;

@SuppressWarnings("unchecked")
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);

edgeToRemove = (Tuple3<K, K, EV>) getRuntimeContext().getBroadcastVariable("edgeToRemove").get(0);
}

@Override
public boolean filter(Tuple3<K, K, EV> edge) throws Exception {

if (vertexToRemove != null) {
if (edge.f0.equals(vertexToRemove.f0)) {
return false;
}
if (edge.f1.equals(vertexToRemove.f0)) {
return false;
}
return true;
} else if (edgeToRemove != null) {
if (edge.f0.equals(edgeToRemove.f0)
&& edge.f1.equals(edgeToRemove.f1)) {
return false;
}
return true;
}
return true;
if (edge.f0.equals(edgeToRemove.f0)
&& edge.f1.equals(edgeToRemove.f1)) {
return false;
}
return true;
}
}

Expand Down
Expand Up @@ -20,7 +20,7 @@
@RunWith(Parameterized.class)
public class TestGraphOperations extends JavaProgramTestBase {

private static int NUM_PROGRAMS = 16;
private static int NUM_PROGRAMS = 18;

private int curProgId = config.getInteger("ProgramId", -1);
private String resultPath;
Expand Down Expand Up @@ -257,6 +257,32 @@ public boolean filter(Long value) throws Exception {

}
case 11: {
/*
* Test addVertex() -- add vertex with empty edge set
*/

final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

Graph<Long, Long, Long> graph = Graph.create(TestGraphUtils.getLongLongVertexData(env),
TestGraphUtils.getLongLongEdgeData(env), env);

List<Tuple3<Long, Long, Long>> edges = new ArrayList<Tuple3<Long, Long, Long>>();

graph = graph.addVertex(new Tuple2<Long, Long>(6L, 6L), edges);

graph.getVertices().writeAsCsv(resultPath);

env.execute();

return "1,1\n" +
"2,2\n" +
"3,3\n" +
"4,4\n" +
"5,5\n" +
"6,6\n";

}
case 12: {
/*
* Test removeVertex() -- simple case
*/
Expand All @@ -278,7 +304,7 @@ public boolean filter(Long value) throws Exception {
"3,4,34\n";

}
case 12: {
case 13: {
/*
* Test removeVertex() -- remove an invalid vertex
*/
Expand All @@ -302,7 +328,7 @@ public boolean filter(Long value) throws Exception {
"4,5,45\n" +
"5,1,51\n";
}
case 13: {
case 14: {
/*
* Test addEdge() -- simple case
*/
Expand All @@ -329,7 +355,34 @@ public boolean filter(Long value) throws Exception {
"5,1,51\n" +
"6,1,61\n";
}
case 14: {
case 15: {
/*
* Test addEdge() -- add already existing edge
*/

final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

Graph<Long, Long, Long> graph = Graph.create(TestGraphUtils.getLongLongVertexData(env),
TestGraphUtils.getLongLongEdgeData(env), env);


graph = graph.addEdge(new Tuple3<Long, Long, Long>(1L, 2L, 12L),
new Tuple2<Long, Long>(1L, 1L), new Tuple2<Long, Long>(2L, 2L));

graph.getEdges().writeAsCsv(resultPath);

env.execute();

return "1,2,12\n" +
"1,2,12\n" +
"1,3,13\n" +
"2,3,23\n" +
"3,4,34\n" +
"3,5,35\n" +
"4,5,45\n" +
"5,1,51\n";
}
case 16: {
/*
* Test removeEdge() -- simple case
*/
Expand All @@ -353,7 +406,7 @@ public boolean filter(Long value) throws Exception {
"4,5,45\n";

}
case 15: {
case 17: {
/*
* Test removeEdge() -- invalid edge
*/
Expand All @@ -378,7 +431,7 @@ public boolean filter(Long value) throws Exception {
"5,1,51\n";

}
case 16: {
case 18: {
/*
* Test union()
*/
Expand Down

0 comments on commit eabf2bb

Please sign in to comment.