Skip to content

Commit

Permalink
[FLINK-1201] [gelly] fixed subGraph() and added test
Browse files Browse the repository at this point in the history
  • Loading branch information
vasia authored and StephanEwen committed Feb 11, 2015
1 parent 35d88bc commit 830a47f
Show file tree
Hide file tree
Showing 2 changed files with 74 additions and 24 deletions.
Expand Up @@ -22,12 +22,14 @@
import org.apache.flink.api.common.functions.*; import org.apache.flink.api.common.functions.*;
import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.functions.FunctionAnnotation.ConstantFields; import org.apache.flink.api.java.functions.FunctionAnnotation.ConstantFields;
import org.apache.flink.api.java.functions.FunctionAnnotation.ConstantFieldsFirst;
import org.apache.flink.api.java.operators.DeltaIteration; import org.apache.flink.api.java.operators.DeltaIteration;
import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.io.CsvReader; import org.apache.flink.api.java.io.CsvReader;
import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.util.Collector;


import java.io.Serializable; import java.io.Serializable;


Expand Down Expand Up @@ -88,35 +90,64 @@ public Tuple2<K, VV> map(Tuple2<K, VV> value) throws Exception {
} }


/** /**
* Apply filtering functions to the graph and return a sub-graph that satisfies * Apply value-based filtering functions to the graph
* the predicates * and return a sub-graph that satisfies the predicates
* @param Tuple2Filter * for both vertex values and edge values.
* @param vertexFilter
* @param edgeFilter * @param edgeFilter
* @return * @return
*/ */
// TODO(thvasilo): Add proper edge filtering functionality public Graph<K, VV, EV> subgraph(FilterFunction<VV> vertexFilter, FilterFunction<EV> edgeFilter) {
public Graph<K, VV, EV> subgraph(final FilterFunction<VV> Tuple2Filter, final FilterFunction<EV> edgeFilter) {

DataSet<Tuple2<K, VV>> filteredVertices = this.vertices.filter(
DataSet<Tuple2<K, VV>> filteredVertices = this.vertices.filter(new FilterFunction<Tuple2<K, VV>>() { new ApplyVertexFilter<K, VV>(vertexFilter));
@Override
public boolean filter(Tuple2<K, VV> kvvTuple2) throws Exception { DataSet<Tuple3<K, K, EV>> remainingEdges = this.edges.join(filteredVertices)
return Tuple2Filter.filter(kvvTuple2.f1); .where(0).equalTo(0)
} .with(new ProjectEdge<K, VV, EV>())
}); .join(filteredVertices).where(1).equalTo(0)

.with(new ProjectEdge<K, VV, EV>());
// Should combine with Tuple2 filter function as well, so that only
// edges that satisfy edge filter *and* connect vertices that satisfy Tuple2 DataSet<Tuple3<K, K, EV>> filteredEdges = remainingEdges.filter(
// filter are returned new ApplyEdgeFilter<K, EV>(edgeFilter));
DataSet<Tuple3<K, K, EV>> filteredEdges = this.edges.filter(new FilterFunction<Tuple3<K, K, EV>>() {
@Override
public boolean filter(Tuple3<K, K, EV> kevEdge) throws Exception {
return edgeFilter.filter(kevEdge.f2);
}
});


return new Graph<K, VV, EV>(filteredVertices, filteredEdges); return new Graph<K, VV, EV>(filteredVertices, filteredEdges);
} }

@ConstantFieldsFirst("0->0;1->1;2->2")
private static final class ProjectEdge<K, VV, EV> implements FlatJoinFunction<Tuple3<K,K,EV>, Tuple2<K,VV>,
Tuple3<K,K,EV>> {
public void join(Tuple3<K, K, EV> first,
Tuple2<K, VV> second, Collector<Tuple3<K, K, EV>> out) {
out.collect(first);
}
}

private static final class ApplyVertexFilter<K, VV> implements FilterFunction<Tuple2<K, VV>> {


private FilterFunction<VV> innerFilter;

public ApplyVertexFilter(FilterFunction<VV> theFilter) {
this.innerFilter = theFilter;
}

public boolean filter(Tuple2<K, VV> value) throws Exception {
return innerFilter.filter(value.f1);
}

}

private static final class ApplyEdgeFilter<K, EV> implements FilterFunction<Tuple3<K, K, EV>> {

private FilterFunction<EV> innerFilter;

public ApplyEdgeFilter(FilterFunction<EV> theFilter) {
this.innerFilter = theFilter;
}
public boolean filter(Tuple3<K, K, EV> value) throws Exception {
return innerFilter.filter(value.f2);
}
}


/** /**
* Return the out-degree of all vertices in the graph * Return the out-degree of all vertices in the graph
Expand Down
Expand Up @@ -5,6 +5,7 @@
import java.util.Collection; import java.util.Collection;
import java.util.LinkedList; import java.util.LinkedList;


import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.ExecutionEnvironment;
Expand All @@ -18,7 +19,7 @@
@RunWith(Parameterized.class) @RunWith(Parameterized.class)
public class TestGraphOperations extends JavaProgramTestBase { public class TestGraphOperations extends JavaProgramTestBase {


private static int NUM_PROGRAMS = 4; private static int NUM_PROGRAMS = 5;


private int curProgId = config.getInteger("ProgramId", -1); private int curProgId = config.getInteger("ProgramId", -1);
private String resultPath; private String resultPath;
Expand Down Expand Up @@ -143,8 +144,26 @@ public Long map(Long value) throws Exception {
} }
case 5: { case 5: {
/* /*
* Test subgraph: * Test subgraph:
*/ */
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

Graph<Long, Long, Long> graph = Graph.create(TestGraphUtils.getLongLongVertexData(env),
TestGraphUtils.getLongLongEdgeData(env));
graph.subgraph(new FilterFunction<Long>() {
public boolean filter(Long value) throws Exception {
return (value > 2);
}
},
new FilterFunction<Long>() {
public boolean filter(Long value) throws Exception {
return (value > 34);
}
}).getEdges().writeAsCsv(resultPath);

env.execute();
return "3,5,35\n" +
"4,5,45\n";
} }
default: default:
throw new IllegalArgumentException("Invalid program id"); throw new IllegalArgumentException("Invalid program id");
Expand Down

0 comments on commit 830a47f

Please sign in to comment.