Skip to content

Commit

Permalink
[FLINK-1201] [gelly] typeinfo in reverse and getOutdegrees
Browse files Browse the repository at this point in the history
  • Loading branch information
vasia authored and StephanEwen committed Feb 11, 2015
1 parent edcb55a commit bc19f4e
Showing 1 changed file with 52 additions and 42 deletions.
Expand Up @@ -21,6 +21,7 @@

import org.apache.flink.api.common.functions.*;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.aggregation.Aggregations;
import org.apache.flink.api.java.functions.FunctionAnnotation.ConstantFields;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.operators.DeltaIteration;
Expand All @@ -34,6 +35,7 @@
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.hadoop.mapred.MapFileOutputFormat;

import java.io.Serializable;

Expand All @@ -46,6 +48,12 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab

private final DataSet<Tuple3<K, K, EV>> edges;

private final TypeInformation<K> vertexKeyType;

private final TypeInformation<VV> vertexValueType;

private final TypeInformation<EV> edgeValueType;

private final TypeInformation<Tuple2<K, VV>> verticesType;

private final TypeInformation<Tuple3<K, K, EV>> edgesType;
Expand All @@ -58,29 +66,31 @@ public Graph(DataSet<Tuple2<K, VV>> vertices, DataSet<Tuple3<K, K, EV>> edges) {
this.vertices = vertices;
this.edges = edges;

TypeInformation<K> keyType = ((TupleTypeInfo<?>) vertices.getType()).getTypeAt(0);
TypeInformation<VV> vertexValueType = ((TupleTypeInfo<?>) vertices.getType()).getTypeAt(1);
TypeInformation<EV> edgeValueType = ((TupleTypeInfo<?>) edges.getType()).getTypeAt(2);
this.vertexKeyType = ((TupleTypeInfo<?>) vertices.getType()).getTypeAt(0);
this.vertexValueType = ((TupleTypeInfo<?>) vertices.getType()).getTypeAt(1);
this.edgeValueType = ((TupleTypeInfo<?>) edges.getType()).getTypeAt(2);

TypeInformation<?>[] vertexTypes = {(BasicTypeInfo<?>)keyType, vertexValueType};
TypeInformation<?>[] vertexTypes = {(BasicTypeInfo<?>)vertexKeyType, vertexValueType};
this.verticesType = new TupleTypeInfo<Tuple2<K, VV>>(vertexTypes);

TypeInformation<?>[] edgeTypes = {(BasicTypeInfo<?>)keyType, (BasicTypeInfo<?>)keyType, edgeValueType};
TypeInformation<?>[] edgeTypes = {(BasicTypeInfo<?>)vertexKeyType, (BasicTypeInfo<?>)vertexKeyType,
edgeValueType};
this.edgesType = new TupleTypeInfo<Tuple3<K, K, EV>>(edgeTypes);
}

public Graph(DataSet<Tuple2<K, VV>> vertices, DataSet<Tuple3<K, K, EV>> edges, boolean undirected) {
this.vertices = vertices;
this.edges = edges;

TypeInformation<K> keyType = ((TupleTypeInfo<?>) vertices.getType()).getTypeAt(0);
TypeInformation<VV> vertexValueType = ((TupleTypeInfo<?>) vertices.getType()).getTypeAt(1);
TypeInformation<EV> edgeValueType = ((TupleTypeInfo<?>) edges.getType()).getTypeAt(2);
this.vertexKeyType = ((TupleTypeInfo<?>) vertices.getType()).getTypeAt(0);
this.vertexValueType = ((TupleTypeInfo<?>) vertices.getType()).getTypeAt(1);
this.edgeValueType = ((TupleTypeInfo<?>) edges.getType()).getTypeAt(2);

TypeInformation<?>[] vertexTypes = {(BasicTypeInfo<?>)keyType, (BasicTypeInfo<?>)keyType, vertexValueType};
TypeInformation<?>[] vertexTypes = {(BasicTypeInfo<?>)vertexKeyType, vertexValueType};
this.verticesType = new TupleTypeInfo<Tuple2<K, VV>>(vertexTypes);

TypeInformation<?>[] edgeTypes = {(BasicTypeInfo<?>)keyType, (BasicTypeInfo<?>)keyType, edgeValueType};
TypeInformation<?>[] edgeTypes = {(BasicTypeInfo<?>)vertexKeyType, (BasicTypeInfo<?>)vertexKeyType,
edgeValueType};
this.edgesType = new TupleTypeInfo<Tuple3<K, K, EV>>(edgeTypes);
this.isUndirected = undirected;
}
Expand Down Expand Up @@ -143,31 +153,37 @@ public boolean filter(Tuple3<K, K, EV> kevEdge) throws Exception {

/**
* Return the out-degree of all vertices in the graph
* @return A DataSet of Tuple2 containing the out-degrees of the vertices in the graph
* @return A DataSet of Tuple2<vertexId, outDegree>
*/
public DataSet<Tuple2<K, Integer>> outDegrees() {
return this.edges
.groupBy(new KeySelector<Tuple3<K, K, EV>, K>() {
@Override
public K getKey(Tuple3<K, K, EV> kevEdge) throws Exception {
return kevEdge.f0;
}
})
.reduceGroup(new GroupReduceFunction<Tuple3<K, K, EV>, Tuple2<K, Integer>>() {
@Override
public void reduce(Iterable<Tuple3<K, K, EV>> edges, Collector<Tuple2<K, Integer>> integerCollector)
throws Exception {

int count = 0;
for (Tuple3<K, K, EV> edge : edges) {
count++;
}

integerCollector.collect(new Tuple2<K, Integer>(edges.iterator().next().f0, count));
}
});
}
public DataSet<Tuple2<K, Long>> outDegrees() {

TypeInformation<?>[] types = {(BasicTypeInfo<?>)vertexKeyType, BasicTypeInfo.LONG_TYPE_INFO};

return vertices.join(edges).where(0).equalTo(0).map(new VertexKeyWithOne<K, EV, VV>(
new TupleTypeInfo<Tuple2<K,Long>>(types)))
.groupBy(0).sum(1);
}

private static final class VertexKeyWithOne<K, EV, VV> implements
MapFunction<Tuple2<Tuple2<K, VV>, Tuple3<K, K, EV>>, Tuple2<K, Long>>,
ResultTypeQueryable<Tuple2<K, Long>> {

private transient TypeInformation<Tuple2<K, Long>> resultType;

private VertexKeyWithOne(TypeInformation<Tuple2<K, Long>> resultType) {
this.resultType = resultType;
}

public Tuple2<K, Long> map(
Tuple2<Tuple2<K, VV>, Tuple3<K, K, EV>> value) {
return new Tuple2<K, Long>(value.f0.f0, 1L);
}

@Override
public TypeInformation<Tuple2<K, Long>> getProducedType() {
return this.resultType;
}
}
/**
* Push-Gather-Apply model of graph computation
* @param cog
Expand Down Expand Up @@ -203,7 +219,7 @@ public <MsgT> Graph<K, VV, EV> pga(CoGroupFunction<Tuple2<K, VV>, Tuple3<K, K, E
*/
public Graph<K, VV, EV> getUndirected() throws UnsupportedOperationException {
if (this.isUndirected) {
throw new UnsupportedOperationException("");
throw new UnsupportedOperationException("The graph is already undirected.");
}
else {
DataSet<Tuple3<K, K, EV>> undirectedEdges =
Expand Down Expand Up @@ -241,15 +257,10 @@ public Tuple3<K, K, EV> map(Tuple3<K, K, EV> value) {
*/
public Graph<K, VV, EV> reverse() throws UnsupportedOperationException {
if (this.isUndirected) {
throw new UnsupportedOperationException("");
throw new UnsupportedOperationException("The graph is already undirected.");
}
else {
DataSet<Tuple3<K, K, EV>> undirectedEdges = edges.map(new MapFunction<Tuple3<K, K, EV>,
Tuple3<K, K, EV>>() {
public Tuple3<K, K, EV> map(Tuple3<K, K, EV> edge){
return new Tuple3<K, K, EV>(edge.f1, edge.f0, edge.f2);
}
});
DataSet<Tuple3<K, K, EV>> undirectedEdges = edges.map(new ReverseEdgesMap<>(edgesType));
return new Graph<K, VV, EV>(vertices, (DataSet<Tuple3<K, K, EV>>) undirectedEdges, true);
}
}
Expand All @@ -258,7 +269,6 @@ public Tuple3<K, K, EV> map(Tuple3<K, K, EV> edge){
EV extends Serializable> Graph<K, VV, EV>
create(DataSet<Tuple2<K, VV>> vertices, DataSet<Tuple3<K, K, EV>> edges) {
return new Graph<K, VV, EV>(vertices, edges);

}

/**
Expand Down

0 comments on commit bc19f4e

Please sign in to comment.