Skip to content

Commit

Permalink
[FLINK-1632][gelly] Removed bcast var in GraphMetrics and PageRank
Browse files Browse the repository at this point in the history
This closes #462
  • Loading branch information
andralungu authored and vasia committed Mar 17, 2015
1 parent 9db170f commit a1daadc
Show file tree
Hide file tree
Showing 7 changed files with 26 additions and 62 deletions.
6 changes: 3 additions & 3 deletions docs/gelly_guide.md
Expand Up @@ -160,10 +160,10 @@ DataSet<Tuple2<K, Long>> outDegrees()
DataSet<Tuple2<K, Long>> getDegrees() DataSet<Tuple2<K, Long>> getDegrees()


// get the number of vertices // get the number of vertices
DataSet<Integer> numberOfVertices() long numberOfVertices()


// get the number of edges // get the number of edges
DataSet<Integer> numberOfEdges() long numberOfEdges()


{% endhighlight %} {% endhighlight %}


Expand Down Expand Up @@ -386,7 +386,7 @@ List<Vertex<Long, Long>> vertices = ...
// create a list of edges with IDs = {(1, 2) (1, 3), (2, 4), (5, 6)} // create a list of edges with IDs = {(1, 2) (1, 3), (2, 4), (5, 6)}
List<Edge<Long, Long>> edges = ... List<Edge<Long, Long>> edges = ...


Graph<Long, Long, Long> graph = Graph.fromcollection(vertices, edges, env); Graph<Long, Long, Long> graph = Graph.fromCollection(vertices, edges, env);


// will return false: 6 is an invalid ID // will return false: 6 is an invalid ID
graph.validate(new InvalidVertexIdsValidator<Long, Long, Long>()); graph.validate(new InvalidVertexIdsValidator<Long, Long, Long>());
Expand Down
Expand Up @@ -926,7 +926,7 @@ public Tuple2<K, K> map(Edge<K, EV> edge) throws Exception {
* the maximum number of iterations for the inner delta iteration * the maximum number of iterations for the inner delta iteration
* @return true if the graph is weakly connected. * @return true if the graph is weakly connected.
*/ */
public Boolean isWeaklyConnected(int maxIterations) throws Exception { public boolean isWeaklyConnected(int maxIterations) throws Exception {
// first, convert to an undirected graph // first, convert to an undirected graph
Graph<K, VV, EV> graph = this.getUndirected(); Graph<K, VV, EV> graph = this.getUndirected();


Expand Down Expand Up @@ -1145,7 +1145,7 @@ public <M> Graph<K, VV, EV> runVertexCentricIteration(
return new Graph<K, VV, EV>(newVertices, this.edges, this.context); return new Graph<K, VV, EV>(newVertices, this.edges, this.context);
} }


public Graph<K, VV, EV> run(GraphAlgorithm<K, VV, EV> algorithm) { public Graph<K, VV, EV> run(GraphAlgorithm<K, VV, EV> algorithm) throws Exception {
return algorithm.run(this); return algorithm.run(this);
} }


Expand Down
Expand Up @@ -27,5 +27,5 @@
*/ */
public interface GraphAlgorithm<K extends Comparable<K> & Serializable, VV extends Serializable, EV extends Serializable> { public interface GraphAlgorithm<K extends Comparable<K> & Serializable, VV extends Serializable, EV extends Serializable> {


public Graph<K, VV, EV> run(Graph<K, VV, EV> input); public Graph<K, VV, EV> run(Graph<K, VV, EV> input) throws Exception;
} }
Expand Up @@ -18,16 +18,12 @@


package org.apache.flink.graph.example; package org.apache.flink.graph.example;


import java.util.Collection;

import org.apache.flink.api.common.ProgramDescription; import org.apache.flink.api.common.ProgramDescription;
import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.aggregation.Aggregations; import org.apache.flink.api.java.aggregation.Aggregations;
import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.graph.Edge; import org.apache.flink.graph.Edge;
import org.apache.flink.graph.Graph; import org.apache.flink.graph.Graph;
import org.apache.flink.graph.example.utils.ExampleUtils; import org.apache.flink.graph.example.utils.ExampleUtils;
Expand Down Expand Up @@ -72,8 +68,7 @@ public static void main(String[] args) throws Exception {
DataSet<Tuple2<Long, Long>> verticesWithDegrees = graph.getDegrees(); DataSet<Tuple2<Long, Long>> verticesWithDegrees = graph.getDegrees();


DataSet<Double> avgNodeDegree = verticesWithDegrees DataSet<Double> avgNodeDegree = verticesWithDegrees
.aggregate(Aggregations.SUM, 1).map(new AvgNodeDegreeMapper()) .aggregate(Aggregations.SUM, 1).map(new AvgNodeDegreeMapper(numVertices));
.withBroadcastSet(env.fromElements(numVertices), "numberOfVertices");


/** find the vertex with the maximum in-degree **/ /** find the vertex with the maximum in-degree **/
DataSet<Long> maxInDegreeVertex = graph.inDegrees().maxBy(1).map(new ProjectVertexId()); DataSet<Long> maxInDegreeVertex = graph.inDegrees().maxBy(1).map(new ProjectVertexId());
Expand All @@ -100,17 +95,14 @@ public static void main(String[] args) throws Exception {
} }


@SuppressWarnings("serial") @SuppressWarnings("serial")
private static final class AvgNodeDegreeMapper extends RichMapFunction<Tuple2<Long, Long>, Double> { private static final class AvgNodeDegreeMapper implements MapFunction<Tuple2<Long, Long>, Double> {


private int numberOfVertices; private long numberOfVertices;


@Override public AvgNodeDegreeMapper(long numberOfVertices) {
public void open(Configuration parameters) throws Exception { this.numberOfVertices = numberOfVertices;
Collection<Integer> bCastSet = getRuntimeContext()
.getBroadcastVariable("numberOfVertices");
numberOfVertices = bCastSet.iterator().next();
} }

public Double map(Tuple2<Long, Long> sumTuple) { public Double map(Tuple2<Long, Long> sumTuple) {
return (double) (sumTuple.f1 / numberOfVertices) ; return (double) (sumTuple.f1 / numberOfVertices) ;
} }
Expand Down
Expand Up @@ -20,11 +20,6 @@


import java.io.Serializable; import java.io.Serializable;


<<<<<<< HEAD
import org.apache.flink.api.java.DataSet;
=======
import org.apache.flink.api.java.ExecutionEnvironment;
>>>>>>> [FLINK-1632][gelly] Deleted GraphUtils and made Gelly methods use DS.count()
import org.apache.flink.graph.Edge; import org.apache.flink.graph.Edge;
import org.apache.flink.graph.Graph; import org.apache.flink.graph.Graph;
import org.apache.flink.graph.GraphAlgorithm; import org.apache.flink.graph.GraphAlgorithm;
Expand All @@ -45,25 +40,13 @@ public PageRank(double beta, int maxIterations) {
} }


@Override @Override
public Graph<K, Double, Double> run(Graph<K, Double, Double> network) { public Graph<K, Double, Double> run(Graph<K, Double, Double> network) throws Exception {


DataSet<Integer> numberOfVertices = network.numberOfVertices(); final long numberOfVertices = network.numberOfVertices();


VertexCentricIteration<K, Double, Double, Double> iteration = network.createVertexCentricIteration( VertexCentricIteration<K, Double, Double, Double> iteration = network.createVertexCentricIteration(
new VertexRankUpdater<K>(beta), new RankMessenger<K>(), maxIterations); new VertexRankUpdater<K>(beta, numberOfVertices), new RankMessenger<K>(numberOfVertices),
<<<<<<< HEAD maxIterations);

iteration.addBroadcastSetForMessagingFunction("numberOfVertices", numberOfVertices);
iteration.addBroadcastSetForUpdateFunction("numberOfVertices", numberOfVertices);

=======
try {
iteration.addBroadcastSetForUpdateFunction("numberOfVertices",
ExecutionEnvironment.getExecutionEnvironment().fromElements(network.numberOfVertices()));
} catch (Exception e) {
e.printStackTrace();
}
>>>>>>> [FLINK-1632][gelly] Deleted GraphUtils and made Gelly methods use DS.count()
return network.runVertexCentricIteration(iteration); return network.runVertexCentricIteration(iteration);
} }


Expand All @@ -76,15 +59,11 @@ public static final class VertexRankUpdater<K extends Comparable<K> & Serializab
extends VertexUpdateFunction<K, Double, Double> { extends VertexUpdateFunction<K, Double, Double> {


private final double beta; private final double beta;
private int numVertices; private final long numVertices;


public VertexRankUpdater(double beta) { public VertexRankUpdater(double beta, long numberOfVertices) {
this.beta = beta; this.beta = beta;
} this.numVertices = numberOfVertices;

@Override
public void preSuperstep(){
numVertices = (Integer) getBroadcastSet("numberOfVertices").iterator().next();
} }


@Override @Override
Expand All @@ -110,11 +89,10 @@ public void updateVertex(K vertexKey, Double vertexValue,
public static final class RankMessenger<K extends Comparable<K> & Serializable> public static final class RankMessenger<K extends Comparable<K> & Serializable>
extends MessagingFunction<K, Double, Double, Double> { extends MessagingFunction<K, Double, Double, Double> {


private int numVertices; private final long numVertices;


@Override public RankMessenger(long numberOfVertices) {
public void preSuperstep(){ this.numVertices = numberOfVertices;
numVertices = (Integer) getBroadcastSet("numberOfVertices").iterator().next();
} }


@Override @Override
Expand Down
Expand Up @@ -33,6 +33,6 @@
public abstract class GraphValidator<K extends Comparable<K> & Serializable, VV extends Serializable, EV extends Serializable> public abstract class GraphValidator<K extends Comparable<K> & Serializable, VV extends Serializable, EV extends Serializable>
implements Serializable { implements Serializable {


public abstract Boolean validate(Graph<K, VV, EV> graph) throws Exception; public abstract boolean validate(Graph<K, VV, EV> graph) throws Exception;


} }
Expand Up @@ -38,11 +38,11 @@ public class InvalidVertexIdsValidator<K extends Comparable<K> & Serializable, V
* Checks that the edge set input contains valid vertex Ids, i.e. that they * Checks that the edge set input contains valid vertex Ids, i.e. that they
* also exist in the vertex input set. * also exist in the vertex input set.
* *
* @return a Boolean stating whether a graph is valid * @return a boolean stating whether a graph is valid
* with respect to its vertex ids. * with respect to its vertex ids.
*/ */
@Override @Override
public Boolean validate(Graph<K, VV, EV> graph) throws Exception { public boolean validate(Graph<K, VV, EV> graph) throws Exception {
DataSet<Tuple1<K>> edgeIds = graph.getEdges() DataSet<Tuple1<K>> edgeIds = graph.getEdges()
.flatMap(new MapEdgeIds<K, EV>()).distinct(); .flatMap(new MapEdgeIds<K, EV>()).distinct();
DataSet<K> invalidIds = graph.getVertices().coGroup(edgeIds).where(0) DataSet<K> invalidIds = graph.getVertices().coGroup(edgeIds).where(0)
Expand Down Expand Up @@ -76,10 +76,4 @@ public Tuple1<K> map(K key) throws Exception {
} }
} }


private static final class InvalidIdsMap implements MapFunction<Integer, Boolean> {
public Boolean map(Integer numberOfInvalidIds) throws Exception {
return numberOfInvalidIds == 0;
}
}

} }

0 comments on commit a1daadc

Please sign in to comment.