Skip to content

Commit

Permalink
[FLINK-3277] [gelly] Use Value types in Gelly API
Browse files Browse the repository at this point in the history
This closes #1671
  • Loading branch information
greghogan committed Jun 28, 2016
1 parent 10898a9 commit 40749dd
Show file tree
Hide file tree
Showing 18 changed files with 138 additions and 130 deletions.
24 changes: 12 additions & 12 deletions docs/apis/batch/libs/gelly.md
Expand Up @@ -346,13 +346,13 @@ DataSet<K> getVertexIds()
DataSet<Tuple2<K, K>> getEdgeIds() DataSet<Tuple2<K, K>> getEdgeIds()


// get a DataSet of <vertex ID, in-degree> pairs for all vertices // get a DataSet of <vertex ID, in-degree> pairs for all vertices
DataSet<Tuple2<K, Long>> inDegrees() DataSet<Tuple2<K, LongValue>> inDegrees()


// get a DataSet of <vertex ID, out-degree> pairs for all vertices // get a DataSet of <vertex ID, out-degree> pairs for all vertices
DataSet<Tuple2<K, Long>> outDegrees() DataSet<Tuple2<K, LongValue>> outDegrees()


// get a DataSet of <vertex ID, degree> pairs for all vertices, where degree is the sum of in- and out- degrees // get a DataSet of <vertex ID, degree> pairs for all vertices, where degree is the sum of in- and out- degrees
DataSet<Tuple2<K, Long>> getDegrees() DataSet<Tuple2<K, LongValue>> getDegrees()


// get the number of vertices // get the number of vertices
long numberOfVertices() long numberOfVertices()
Expand Down Expand Up @@ -381,13 +381,13 @@ getVertexIds: DataSet[K]
getEdgeIds: DataSet[(K, K)] getEdgeIds: DataSet[(K, K)]


// get a DataSet of <vertex ID, in-degree> pairs for all vertices // get a DataSet of <vertex ID, in-degree> pairs for all vertices
inDegrees: DataSet[(K, Long)] inDegrees: DataSet[(K, LongValue)]


// get a DataSet of <vertex ID, out-degree> pairs for all vertices // get a DataSet of <vertex ID, out-degree> pairs for all vertices
outDegrees: DataSet[(K, Long)] outDegrees: DataSet[(K, LongValue)]


// get a DataSet of <vertex ID, degree> pairs for all vertices, where degree is the sum of in- and out- degrees // get a DataSet of <vertex ID, degree> pairs for all vertices, where degree is the sum of in- and out- degrees
getDegrees: DataSet[(K, Long)] getDegrees: DataSet[(K, LongValue)]


// get the number of vertices // get the number of vertices
numberOfVertices: Long numberOfVertices: Long
Expand Down Expand Up @@ -519,13 +519,13 @@ Note that if the input dataset contains a key multiple times, all Gelly join met
{% highlight java %} {% highlight java %}
Graph<Long, Double, Double> network = ... Graph<Long, Double, Double> network = ...


DataSet<Tuple2<Long, Long>> vertexOutDegrees = network.outDegrees(); DataSet<Tuple2<Long, LongValue>> vertexOutDegrees = network.outDegrees();


// assign the transition probabilities as the edge weights // assign the transition probabilities as the edge weights
Graph<Long, Double, Double> networkWithWeights = network.joinWithEdgesOnSource(vertexOutDegrees, Graph<Long, Double, Double> networkWithWeights = network.joinWithEdgesOnSource(vertexOutDegrees,
new VertexJoinFunction<Double, Long>() { new VertexJoinFunction<Double, LongValue>() {
public Double vertexJoin(Double vertexValue, Long inputValue) { public Double vertexJoin(Double vertexValue, LongValue inputValue) {
return vertexValue / inputValue; return vertexValue / inputValue.getValue();
} }
}); });
{% endhighlight %} {% endhighlight %}
Expand All @@ -535,10 +535,10 @@ Graph<Long, Double, Double> networkWithWeights = network.joinWithEdgesOnSource(v
{% highlight scala %} {% highlight scala %}
val network: Graph[Long, Double, Double] = ... val network: Graph[Long, Double, Double] = ...


val vertexOutDegrees: DataSet[(Long, Long)] = network.outDegrees val vertexOutDegrees: DataSet[(Long, LongValue)] = network.outDegrees


// assign the transition probabilities as the edge weights // assign the transition probabilities as the edge weights
val networkWithWeights = network.joinWithEdgesOnSource(vertexOutDegrees, (v1: Double, v2: Long) => v1 / v2) val networkWithWeights = network.joinWithEdgesOnSource(vertexOutDegrees, (v1: Double, v2: LongValue) => v1 / v2.getValue)
{% endhighlight %} {% endhighlight %}
</div> </div>
</div> </div>
Expand Down
Expand Up @@ -18,7 +18,6 @@


package org.apache.flink.graph.examples; package org.apache.flink.graph.examples;


import org.apache.flink.graph.examples.utils.ExampleUtils;
import org.apache.flink.api.common.ProgramDescription; import org.apache.flink.api.common.ProgramDescription;
import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.DataSet;
Expand All @@ -27,6 +26,8 @@
import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.graph.Edge; import org.apache.flink.graph.Edge;
import org.apache.flink.graph.Graph; import org.apache.flink.graph.Graph;
import org.apache.flink.graph.examples.utils.ExampleUtils;
import org.apache.flink.types.LongValue;
import org.apache.flink.types.NullValue; import org.apache.flink.types.NullValue;


/** /**
Expand Down Expand Up @@ -66,7 +67,7 @@ public static void main(String[] args) throws Exception {
long numEdges = graph.numberOfEdges(); long numEdges = graph.numberOfEdges();


/** compute the average node degree **/ /** compute the average node degree **/
DataSet<Tuple2<Long, Long>> verticesWithDegrees = graph.getDegrees(); DataSet<Tuple2<Long, LongValue>> verticesWithDegrees = graph.getDegrees();


DataSet<Double> avgNodeDegree = verticesWithDegrees DataSet<Double> avgNodeDegree = verticesWithDegrees
.aggregate(Aggregations.SUM, 1).map(new AvgNodeDegreeMapper(numVertices)); .aggregate(Aggregations.SUM, 1).map(new AvgNodeDegreeMapper(numVertices));
Expand Down Expand Up @@ -96,22 +97,22 @@ public static void main(String[] args) throws Exception {
} }


@SuppressWarnings("serial") @SuppressWarnings("serial")
private static final class AvgNodeDegreeMapper implements MapFunction<Tuple2<Long, Long>, Double> { private static final class AvgNodeDegreeMapper implements MapFunction<Tuple2<Long, LongValue>, Double> {


private long numberOfVertices; private long numberOfVertices;


public AvgNodeDegreeMapper(long numberOfVertices) { public AvgNodeDegreeMapper(long numberOfVertices) {
this.numberOfVertices = numberOfVertices; this.numberOfVertices = numberOfVertices;
} }


public Double map(Tuple2<Long, Long> sumTuple) { public Double map(Tuple2<Long, LongValue> sumTuple) {
return (double) (sumTuple.f1 / numberOfVertices) ; return (double) (sumTuple.f1.getValue() / numberOfVertices) ;
} }
} }


@SuppressWarnings("serial") @SuppressWarnings("serial")
private static final class ProjectVertexId implements MapFunction<Tuple2<Long,Long>, Long> { private static final class ProjectVertexId implements MapFunction<Tuple2<Long, LongValue>, Long> {
public Long map(Tuple2<Long, Long> value) { return value.f0; } public Long map(Tuple2<Long, LongValue> value) { return value.f0; }
} }


@Override @Override
Expand Down
Expand Up @@ -61,7 +61,7 @@ object GraphMetrics {


/** compute the average node degree **/ /** compute the average node degree **/
val verticesWithDegrees = graph.getDegrees val verticesWithDegrees = graph.getDegrees
val avgDegree = verticesWithDegrees.sum(1).map(in => (in._2 / numVertices).toDouble) val avgDegree = verticesWithDegrees.sum(1).map(in => (in._2.getValue / numVertices).toDouble)


/** find the vertex with the maximum in-degree **/ /** find the vertex with the maximum in-degree **/
val maxInDegreeVertex = graph.inDegrees.max(1).map(in => in._1) val maxInDegreeVertex = graph.inDegrees.max(1).map(in => in._1)
Expand Down
Expand Up @@ -18,24 +18,22 @@


package org.apache.flink.graph.scala package org.apache.flink.graph.scala


import org.apache.flink.util.Preconditions
import org.apache.flink.api.common.functions.{FilterFunction, MapFunction} import org.apache.flink.api.common.functions.{FilterFunction, MapFunction}
import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.{tuple => jtuple} import org.apache.flink.api.java.{tuple => jtuple}
import org.apache.flink.api.scala._ import org.apache.flink.api.scala._
import org.apache.flink.graph._ import org.apache.flink.graph._
import org.apache.flink.graph.asm.translate.TranslateFunction import org.apache.flink.graph.asm.translate.TranslateFunction
import org.apache.flink.graph.validation.GraphValidator
import org.apache.flink.graph.gsa.{ApplyFunction, GSAConfiguration, GatherFunction, SumFunction} import org.apache.flink.graph.gsa.{ApplyFunction, GSAConfiguration, GatherFunction, SumFunction}
import org.apache.flink.graph.pregel.{ComputeFunction, MessageCombiner, VertexCentricConfiguration}
import org.apache.flink.graph.spargel.{MessagingFunction, ScatterGatherConfiguration, VertexUpdateFunction} import org.apache.flink.graph.spargel.{MessagingFunction, ScatterGatherConfiguration, VertexUpdateFunction}
import org.apache.flink.graph.validation.GraphValidator
import org.apache.flink.types.{LongValue, NullValue}
import org.apache.flink.util.Preconditions
import org.apache.flink.{graph => jg} import org.apache.flink.{graph => jg}


import _root_.scala.collection.JavaConverters._ import _root_.scala.collection.JavaConverters._
import _root_.scala.reflect.ClassTag import _root_.scala.reflect.ClassTag
import org.apache.flink.types.NullValue
import org.apache.flink.graph.pregel.ComputeFunction
import org.apache.flink.graph.pregel.MessageCombiner
import org.apache.flink.graph.pregel.VertexCentricConfiguration


object Graph { object Graph {


Expand Down Expand Up @@ -803,26 +801,26 @@ TypeInformation : ClassTag](jgraph: jg.Graph[K, VV, EV]) {
* *
* @return A DataSet of Tuple2<vertexId, inDegree> * @return A DataSet of Tuple2<vertexId, inDegree>
*/ */
def inDegrees(): DataSet[(K, Long)] = { def inDegrees(): DataSet[(K, LongValue)] = {
wrap(jgraph.inDegrees).map(javatuple => (javatuple.f0, javatuple.f1.longValue())) wrap(jgraph.inDegrees).map(javatuple => (javatuple.f0, javatuple.f1))
} }


/** /**
* Return the out-degree of all vertices in the graph * Return the out-degree of all vertices in the graph
* *
* @return A DataSet of Tuple2<vertexId, outDegree> * @return A DataSet of Tuple2<vertexId, outDegree>
*/ */
def outDegrees(): DataSet[(K, Long)] = { def outDegrees(): DataSet[(K, LongValue)] = {
wrap(jgraph.outDegrees).map(javatuple => (javatuple.f0, javatuple.f1.longValue())) wrap(jgraph.outDegrees).map(javatuple => (javatuple.f0, javatuple.f1))
} }


/** /**
* Return the degree of all vertices in the graph * Return the degree of all vertices in the graph
* *
* @return A DataSet of Tuple2<vertexId, degree> * @return A DataSet of Tuple2<vertexId, degree>
*/ */
def getDegrees(): DataSet[(K, Long)] = { def getDegrees(): DataSet[(K, LongValue)] = {
wrap(jgraph.getDegrees).map(javatuple => (javatuple.f0, javatuple.f1.longValue())) wrap(jgraph.getDegrees).map(javatuple => (javatuple.f0, javatuple.f1))
} }


/** /**
Expand Down
Expand Up @@ -61,6 +61,7 @@
import org.apache.flink.graph.utils.Tuple3ToEdgeMap; import org.apache.flink.graph.utils.Tuple3ToEdgeMap;
import org.apache.flink.graph.utils.VertexToTuple2Map; import org.apache.flink.graph.utils.VertexToTuple2Map;
import org.apache.flink.graph.validation.GraphValidator; import org.apache.flink.graph.validation.GraphValidator;
import org.apache.flink.types.LongValue;
import org.apache.flink.types.NullValue; import org.apache.flink.types.NullValue;
import org.apache.flink.util.Collector; import org.apache.flink.util.Collector;


Expand Down Expand Up @@ -867,25 +868,31 @@ public void join(Edge<K, EV> first, Vertex<K, VV> second, Collector<Edge<K, EV>>
* *
* @return A DataSet of {@code Tuple2<vertexId, outDegree>} * @return A DataSet of {@code Tuple2<vertexId, outDegree>}
*/ */
public DataSet<Tuple2<K, Long>> outDegrees() { public DataSet<Tuple2<K, LongValue>> outDegrees() {


return vertices.coGroup(edges).where(0).equalTo(0).with(new CountNeighborsCoGroup<K, VV, EV>()); return vertices.coGroup(edges).where(0).equalTo(0).with(new CountNeighborsCoGroup<K, VV, EV>());
} }


private static final class CountNeighborsCoGroup<K, VV, EV> private static final class CountNeighborsCoGroup<K, VV, EV>
implements CoGroupFunction<Vertex<K, VV>, Edge<K, EV>, Tuple2<K, Long>> { implements CoGroupFunction<Vertex<K, VV>, Edge<K, EV>, Tuple2<K, LongValue>> {
private LongValue degree = new LongValue();

private Tuple2<K, LongValue> vertexDegree = new Tuple2<>(null, degree);

@SuppressWarnings("unused") @SuppressWarnings("unused")
public void coGroup(Iterable<Vertex<K, VV>> vertex, Iterable<Edge<K, EV>> outEdges, public void coGroup(Iterable<Vertex<K, VV>> vertex, Iterable<Edge<K, EV>> outEdges,
Collector<Tuple2<K, Long>> out) { Collector<Tuple2<K, LongValue>> out) {
long count = 0; long count = 0;
for (Edge<K, EV> edge : outEdges) { for (Edge<K, EV> edge : outEdges) {
count++; count++;
} }
degree.setValue(count);


Iterator<Vertex<K, VV>> vertexIterator = vertex.iterator(); Iterator<Vertex<K, VV>> vertexIterator = vertex.iterator();


if(vertexIterator.hasNext()) { if(vertexIterator.hasNext()) {
out.collect(new Tuple2<K, Long>(vertexIterator.next().f0, count)); vertexDegree.f0 = vertexIterator.next().f0;
out.collect(vertexDegree);
} else { } else {
throw new NoSuchElementException("The edge src/trg id could not be found within the vertexIds"); throw new NoSuchElementException("The edge src/trg id could not be found within the vertexIds");
} }
Expand All @@ -897,7 +904,7 @@ public void coGroup(Iterable<Vertex<K, VV>> vertex, Iterable<Edge<K, EV>> outEdg
* *
* @return A DataSet of {@code Tuple2<vertexId, inDegree>} * @return A DataSet of {@code Tuple2<vertexId, inDegree>}
*/ */
public DataSet<Tuple2<K, Long>> inDegrees() { public DataSet<Tuple2<K, LongValue>> inDegrees() {


return vertices.coGroup(edges).where(0).equalTo(1).with(new CountNeighborsCoGroup<K, VV, EV>()); return vertices.coGroup(edges).where(0).equalTo(1).with(new CountNeighborsCoGroup<K, VV, EV>());
} }
Expand All @@ -907,7 +914,7 @@ public DataSet<Tuple2<K, Long>> inDegrees() {
* *
* @return A DataSet of {@code Tuple2<vertexId, degree>} * @return A DataSet of {@code Tuple2<vertexId, degree>}
*/ */
public DataSet<Tuple2<K, Long>> getDegrees() { public DataSet<Tuple2<K, LongValue>> getDegrees() {
return outDegrees().union(inDegrees()).groupBy(0).sum(1); return outDegrees().union(inDegrees()).groupBy(0).sum(1);
} }


Expand Down
Expand Up @@ -29,6 +29,7 @@
import org.apache.flink.graph.gsa.GatherFunction; import org.apache.flink.graph.gsa.GatherFunction;
import org.apache.flink.graph.gsa.Neighbor; import org.apache.flink.graph.gsa.Neighbor;
import org.apache.flink.graph.gsa.SumFunction; import org.apache.flink.graph.gsa.SumFunction;
import org.apache.flink.types.LongValue;


/** /**
* This is an implementation of a simple PageRank algorithm, using a gather-sum-apply iteration. * This is an implementation of a simple PageRank algorithm, using a gather-sum-apply iteration.
Expand Down Expand Up @@ -56,8 +57,7 @@ public GSAPageRank(double beta, int maxIterations) {


@Override @Override
public DataSet<Vertex<K, Double>> run(Graph<K, Double, Double> network) throws Exception { public DataSet<Vertex<K, Double>> run(Graph<K, Double, Double> network) throws Exception {

DataSet<Tuple2<K, LongValue>> vertexOutDegrees = network.outDegrees();
DataSet<Tuple2<K, Long>> vertexOutDegrees = network.outDegrees();


Graph<K, Double, Double> networkWithWeights = network Graph<K, Double, Double> networkWithWeights = network
.joinWithEdgesOnSource(vertexOutDegrees, new InitWeights()); .joinWithEdgesOnSource(vertexOutDegrees, new InitWeights());
Expand Down Expand Up @@ -114,10 +114,10 @@ public void apply(Double rankSum, Double currentValue) {
} }


@SuppressWarnings("serial") @SuppressWarnings("serial")
private static final class InitWeights implements EdgeJoinFunction<Double, Long> { private static final class InitWeights implements EdgeJoinFunction<Double, LongValue> {


public Double edgeJoin(Double edgeValue, Long inputValue) { public Double edgeJoin(Double edgeValue, LongValue inputValue) {
return edgeValue / (double) inputValue; return edgeValue / (double) inputValue.getValue();
} }
} }
} }
Expand Up @@ -29,6 +29,7 @@
import org.apache.flink.graph.spargel.MessagingFunction; import org.apache.flink.graph.spargel.MessagingFunction;
import org.apache.flink.graph.spargel.ScatterGatherConfiguration; import org.apache.flink.graph.spargel.ScatterGatherConfiguration;
import org.apache.flink.graph.spargel.VertexUpdateFunction; import org.apache.flink.graph.spargel.VertexUpdateFunction;
import org.apache.flink.types.LongValue;


/** /**
* This is an implementation of a simple PageRank algorithm, using a scatter-gather iteration. * This is an implementation of a simple PageRank algorithm, using a scatter-gather iteration.
Expand Down Expand Up @@ -56,8 +57,7 @@ public PageRank(double beta, int maxIterations) {


@Override @Override
public DataSet<Vertex<K, Double>> run(Graph<K, Double, Double> network) throws Exception { public DataSet<Vertex<K, Double>> run(Graph<K, Double, Double> network) throws Exception {

DataSet<Tuple2<K, LongValue>> vertexOutDegrees = network.outDegrees();
DataSet<Tuple2<K, Long>> vertexOutDegrees = network.outDegrees();


Graph<K, Double, Double> networkWithWeights = network Graph<K, Double, Double> networkWithWeights = network
.joinWithEdgesOnSource(vertexOutDegrees, new InitWeights()); .joinWithEdgesOnSource(vertexOutDegrees, new InitWeights());
Expand Down Expand Up @@ -118,10 +118,10 @@ public void sendMessages(Vertex<K, Double> vertex) {
} }


@SuppressWarnings("serial") @SuppressWarnings("serial")
private static final class InitWeights implements EdgeJoinFunction<Double, Long> { private static final class InitWeights implements EdgeJoinFunction<Double, LongValue> {


public Double edgeJoin(Double edgeValue, Long inputValue) { public Double edgeJoin(Double edgeValue, LongValue inputValue) {
return edgeValue / (double) inputValue; return edgeValue / (double) inputValue.getValue();
} }
} }


Expand Down

0 comments on commit 40749dd

Please sign in to comment.