Skip to content

Commit

Permalink
[FLINK-1632][gelly] Deleted GraphUtils and made Gelly methods use DS.…
Browse files Browse the repository at this point in the history
…count()
  • Loading branch information
andralungu authored and vasia committed Mar 17, 2015
1 parent 9077a53 commit 9db170f
Show file tree
Hide file tree
Showing 9 changed files with 41 additions and 98 deletions.
Expand Up @@ -50,7 +50,6 @@
import org.apache.flink.graph.spargel.VertexCentricIteration;
import org.apache.flink.graph.spargel.VertexUpdateFunction;
import org.apache.flink.graph.utils.EdgeToTuple3Map;
import org.apache.flink.graph.utils.GraphUtils;
import org.apache.flink.graph.utils.Tuple2ToVertexMap;
import org.apache.flink.graph.utils.Tuple3ToEdgeMap;
import org.apache.flink.graph.utils.VertexToTuple2Map;
Expand Down Expand Up @@ -293,7 +292,7 @@ public ExecutionEnvironment getContext() {
*
* @return true if the Graph is valid.
*/
// Diff view: the next line is the signature this commit removes (result wrapped in a DataSet).
public DataSet<Boolean> validate(GraphValidator<K, VV, EV> validator) {
// Signature this commit adds: the result is a plain Boolean and the method may throw.
public Boolean validate(GraphValidator<K, VV, EV> validator) throws Exception {
// The check itself is delegated to the supplied validator, which receives this graph.
return validator.validate(this);
}

Expand Down Expand Up @@ -877,17 +876,17 @@ public Graph<K, VV, EV> reverse() throws UnsupportedOperationException {
}

/**
* Counts the vertices of this graph.
*
* (Diff view: the first @return line and the GraphUtils-based method below are the
* pre-change text; the second @return line and the count()-based method replace them.)
*
* @return Singleton DataSet containing the vertex count
* @return a long integer representing the number of vertices
*/
// Pre-change version: obtains the count through the GraphUtils helper as a DataSet.
public DataSet<Integer> numberOfVertices() {
return GraphUtils.count(vertices, context);
// Post-change version: returns the result of count() on the vertex set directly.
public long numberOfVertices() throws Exception {
return vertices.count();
}

/**
* Counts the edges of this graph.
*
* (Diff view: the first @return line and the GraphUtils-based method below are the
* pre-change text; the second @return line and the count()-based method replace them.)
*
* @return Singleton DataSet containing the edge count
* @return a long integer representing the number of edges
*/
// Pre-change version: obtains the count through the GraphUtils helper as a DataSet.
public DataSet<Integer> numberOfEdges() {
return GraphUtils.count(edges, context);
// Post-change version: returns the result of count() on the edge set directly.
public long numberOfEdges() throws Exception {
return edges.count();
}

/**
Expand Down Expand Up @@ -927,7 +926,7 @@ public Tuple2<K, K> map(Edge<K, EV> edge) throws Exception {
* the maximum number of iterations for the inner delta iteration
* @return true if the graph is weakly connected.
*/
public DataSet<Boolean> isWeaklyConnected(int maxIterations) {
public Boolean isWeaklyConnected(int maxIterations) throws Exception {
// first, convert to an undirected graph
Graph<K, VV, EV> graph = this.getUndirected();

Expand All @@ -948,9 +947,7 @@ public DataSet<Boolean> isWeaklyConnected(int maxIterations) {
.with(new VertexWithNewComponentJoin<K>());

DataSet<Tuple2<K, K>> components = iteration.closeWith(changes, changes);
DataSet<Boolean> result = GraphUtils.count(components.groupBy(1).reduceGroup(new EmitFirstReducer<K>()),
context).map(new CheckIfOneComponentMapper());
return result;
return components.groupBy(1).reduceGroup(new EmitFirstReducer<K>()).count() == 1;
}

private static final class DuplicateVertexIDMapper<K> implements MapFunction<K, Tuple2<K, K>> {
Expand Down Expand Up @@ -983,13 +980,6 @@ public void reduce(Iterable<Tuple2<K, K>> values, Collector<Tuple2<K, K>> out) {
}
}

/**
 * Maps a count to a flag that is {@code true} only when the count equals one.
 */
private static final class CheckIfOneComponentMapper implements MapFunction<Integer, Boolean> {
    @Override
    public Boolean map(Integer n) {
        // Unboxes the count and compares it against exactly one.
        final boolean exactlyOne = n.intValue() == 1;
        return Boolean.valueOf(exactlyOne);
    }
}

/**
* Adds the input vertex and edges to the graph. If the vertex already
* exists in the graph, it will not be added again, but the given edges
Expand Down
Expand Up @@ -63,17 +63,17 @@ public static void main(String[] args) throws Exception {
Graph<Long, NullValue, NullValue> graph = Graph.fromDataSet(getEdgesDataSet(env), env);

/** get the number of vertices **/
DataSet<Integer> numVertices = graph.numberOfVertices();
long numVertices = graph.numberOfVertices();

/** get the number of edges **/
DataSet<Integer> numEdges = graph.numberOfEdges();
long numEdges = graph.numberOfEdges();

/** compute the average node degree **/
DataSet<Tuple2<Long, Long>> verticesWithDegrees = graph.getDegrees();

DataSet<Double> avgNodeDegree = verticesWithDegrees
.aggregate(Aggregations.SUM, 1).map(new AvgNodeDegreeMapper())
.withBroadcastSet(numVertices, "numberOfVertices");
.withBroadcastSet(env.fromElements(numVertices), "numberOfVertices");

/** find the vertex with the maximum in-degree **/
DataSet<Long> maxInDegreeVertex = graph.inDegrees().maxBy(1).map(new ProjectVertexId());
Expand All @@ -88,8 +88,8 @@ public static void main(String[] args) throws Exception {
DataSet<Long> minOutDegreeVertex = graph.outDegrees().minBy(1).map(new ProjectVertexId());

/** print the results **/
ExampleUtils.printResult(numVertices, "Total number of vertices");
ExampleUtils.printResult(numEdges, "Total number of edges");
ExampleUtils.printResult(env.fromElements(numVertices), "Total number of vertices");
ExampleUtils.printResult(env.fromElements(numEdges), "Total number of edges");
ExampleUtils.printResult(avgNodeDegree, "Average node degree");
ExampleUtils.printResult(maxInDegreeVertex, "Vertex with Max in-degree");
ExampleUtils.printResult(minInDegreeVertex, "Vertex with Min in-degree");
Expand Down
Expand Up @@ -20,7 +20,11 @@

import java.io.Serializable;

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.graph.Edge;
import org.apache.flink.graph.Graph;
import org.apache.flink.graph.GraphAlgorithm;
Expand All @@ -47,10 +51,19 @@ public Graph<K, Double, Double> run(Graph<K, Double, Double> network) {

VertexCentricIteration<K, Double, Double, Double> iteration = network.createVertexCentricIteration(
new VertexRankUpdater<K>(beta), new RankMessenger<K>(), maxIterations);
<<<<<<< HEAD

iteration.addBroadcastSetForMessagingFunction("numberOfVertices", numberOfVertices);
iteration.addBroadcastSetForUpdateFunction("numberOfVertices", numberOfVertices);

=======
try {
iteration.addBroadcastSetForUpdateFunction("numberOfVertices",
ExecutionEnvironment.getExecutionEnvironment().fromElements(network.numberOfVertices()));
} catch (Exception e) {
e.printStackTrace();
}
>>>>>>> [FLINK-1632][gelly] Deleted GraphUtils and made Gelly methods use DS.count()
return network.runVertexCentricIteration(iteration);
}

Expand Down

This file was deleted.

Expand Up @@ -20,7 +20,6 @@

import java.io.Serializable;

import org.apache.flink.api.java.DataSet;
import org.apache.flink.graph.Graph;

/**
Expand All @@ -34,6 +33,6 @@
public abstract class GraphValidator<K extends Comparable<K> & Serializable, VV extends Serializable, EV extends Serializable>
implements Serializable {

// Diff view: declaration removed by this commit (result wrapped in a DataSet).
public abstract DataSet<Boolean> validate(Graph<K, VV, EV> graph);
// Declaration added by this commit: implementations return a plain Boolean for the
// given graph and are allowed to throw.
public abstract Boolean validate(Graph<K, VV, EV> graph) throws Exception;

}
Expand Up @@ -26,7 +26,6 @@
import org.apache.flink.graph.Edge;
import org.apache.flink.graph.Graph;
import org.apache.flink.graph.Vertex;
import org.apache.flink.graph.utils.GraphUtils;
import org.apache.flink.util.Collector;

import java.io.Serializable;
Expand All @@ -39,18 +38,17 @@ public class InvalidVertexIdsValidator<K extends Comparable<K> & Serializable, V
* Checks that the edge set input contains valid vertex Ids, i.e. that they
* also exist in the vertex input set.
*
* @return a singleton DataSet<Boolean> stating whether a graph is valid
* @return a Boolean stating whether a graph is valid
* with respect to its vertex ids.
*/
@Override
// Diff view: the next line is the pre-change signature; the one after it replaces it.
public DataSet<Boolean> validate(Graph<K, VV, EV> graph) {
public Boolean validate(Graph<K, VV, EV> graph) throws Exception {
// Distinct ids derived from the edges via MapEdgeIds (its body is not visible here;
// presumably it emits the endpoint ids of each edge — confirm).
DataSet<Tuple1<K>> edgeIds = graph.getEdges()
.flatMap(new MapEdgeIds<K, EV>()).distinct();
// Co-group vertices with the edge ids on field 0; GroupInvalidIds produces the ids
// considered invalid, and first(1) keeps at most one of them.
DataSet<K> invalidIds = graph.getVertices().coGroup(edgeIds).where(0)
.equalTo(0).with(new GroupInvalidIds<K, VV>()).first(1);

// Pre-change return: count via the GraphUtils helper, then map through InvalidIdsMap.
return GraphUtils.count(invalidIds.map(new KToTupleMap<K>()),
graph.getContext()).map(new InvalidIdsMap());
// Post-change return: the graph is reported valid when no invalid id was found.
return invalidIds.map(new KToTupleMap<K>()).count() == 0;
}

private static final class MapEdgeIds<K extends Comparable<K> & Serializable, EV extends Serializable>
Expand Down
Expand Up @@ -62,7 +62,7 @@ public void testWithConnectedDirected() throws Exception {
Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
TestGraphUtils.getLongLongEdgeData(env), env);

graph.isWeaklyConnected(10).writeAsText(resultPath);
env.fromElements(graph.isWeaklyConnected(10)).writeAsText(resultPath);

env.execute();
expectedResult = "true\n";
Expand All @@ -78,7 +78,7 @@ public void testWithDisconnectedDirected() throws Exception {
Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
TestGraphUtils.getDisconnectedLongLongEdgeData(env), env);

graph.isWeaklyConnected(10).writeAsText(resultPath);
env.fromElements(graph.isWeaklyConnected(10)).writeAsText(resultPath);

env.execute();
expectedResult = "false\n";
Expand All @@ -94,7 +94,7 @@ public void testWithConnectedUndirected() throws Exception {
Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
TestGraphUtils.getLongLongEdgeData(env), env).getUndirected();

graph.isWeaklyConnected(10).writeAsText(resultPath);
env.fromElements(graph.isWeaklyConnected(10)).writeAsText(resultPath);

env.execute();
expectedResult = "true\n";
Expand All @@ -110,7 +110,7 @@ public void testWithDisconnectedUndirected() throws Exception {
Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
TestGraphUtils.getDisconnectedLongLongEdgeData(env), env).getUndirected();

graph.isWeaklyConnected(10).writeAsText(resultPath);
env.fromElements(graph.isWeaklyConnected(10)).writeAsText(resultPath);

env.execute();
expectedResult = "false\n";
Expand Down
Expand Up @@ -123,9 +123,9 @@ public void testValidate() throws Exception {
DataSet<Edge<Long, Long>> edges = TestGraphUtils.getLongLongEdgeData(env);

Graph<Long, Long, Long> graph = Graph.fromDataSet(vertices, edges, env);
DataSet<Boolean> result = graph.validate(new InvalidVertexIdsValidator<Long, Long, Long>());
Boolean result = graph.validate(new InvalidVertexIdsValidator<Long, Long, Long>());

result.writeAsText(resultPath);
env.fromElements(result).writeAsText(resultPath);
env.execute();

expectedResult = "true\n";
Expand All @@ -141,8 +141,8 @@ public void testValidateWithInvalidIds() throws Exception {
DataSet<Edge<Long, Long>> edges = TestGraphUtils.getLongLongEdgeData(env);

Graph<Long, Long, Long> graph = Graph.fromDataSet(vertices, edges, env);
DataSet<Boolean> result = graph.validate(new InvalidVertexIdsValidator<Long, Long, Long>());
result.writeAsText(resultPath);
Boolean result = graph.validate(new InvalidVertexIdsValidator<Long, Long, Long>());
env.fromElements(result).writeAsText(resultPath);
env.execute();

expectedResult = "false\n";
Expand Down
Expand Up @@ -180,7 +180,7 @@ public void testNumberOfVertices() throws Exception {

Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
TestGraphUtils.getLongLongEdgeData(env), env);
graph.numberOfVertices().writeAsText(resultPath);
env.fromElements(graph.numberOfVertices()).writeAsText(resultPath);

env.execute();
expectedResult = "5";
Expand All @@ -195,7 +195,7 @@ public void testNumberOfEdges() throws Exception {

Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
TestGraphUtils.getLongLongEdgeData(env), env);
graph.numberOfEdges().writeAsText(resultPath);
env.fromElements(graph.numberOfEdges()).writeAsText(resultPath);

env.execute();
expectedResult = "7";
Expand Down

0 comments on commit 9db170f

Please sign in to comment.