Skip to content

Commit

Permalink
[FLINK-1514] [gelly] Removed GGC example, added Connected Components …
Browse files Browse the repository at this point in the history
…instead
  • Loading branch information
balidani authored and vasia committed Apr 25, 2015
1 parent 722719f commit e1f56e9
Show file tree
Hide file tree
Showing 3 changed files with 69 additions and 81 deletions.
Expand Up @@ -24,7 +24,6 @@
import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.graph.Edge; import org.apache.flink.graph.Edge;
import org.apache.flink.graph.Graph; import org.apache.flink.graph.Graph;
import org.apache.flink.graph.Vertex; import org.apache.flink.graph.Vertex;
Expand All @@ -33,14 +32,13 @@
import org.apache.flink.graph.gsa.GatherSumApplyIteration; import org.apache.flink.graph.gsa.GatherSumApplyIteration;
import org.apache.flink.graph.gsa.SumFunction; import org.apache.flink.graph.gsa.SumFunction;
import org.apache.flink.graph.gsa.RichEdge; import org.apache.flink.graph.gsa.RichEdge;
import org.apache.flink.types.NullValue;
import org.apache.flink.util.Collector; import org.apache.flink.util.Collector;


import java.util.HashSet;

/** /**
* This is an implementation of the Greedy Graph Coloring algorithm, using a gather-sum-apply iteration * This is an implementation of the connected components algorithm, using a gather-sum-apply iteration
*/ */
public class GSAGreedyGraphColoringExample implements ProgramDescription { public class GSAConnectedComponentsExample implements ProgramDescription {


// -------------------------------------------------------------------------------------------- // --------------------------------------------------------------------------------------------
// Program // Program
Expand All @@ -54,27 +52,27 @@ public static void main(String[] args) throws Exception {


ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();


DataSet<Vertex<Long, Double>> vertices = getVertexDataSet(env); DataSet<Vertex<Long, Long>> vertices = getVertexDataSet(env);
DataSet<Edge<Long, Double>> edges = getEdgeDataSet(env); DataSet<Edge<Long, NullValue>> edges = getEdgeDataSet(env);


Graph<Long, Double, Double> graph = Graph.fromDataSet(vertices, edges, env); Graph<Long, Long, NullValue> graph = Graph.fromDataSet(vertices, edges, env);


// Gather the target vertices into a one-element set // Simply return the vertex value of each vertex
GatherFunction<Double, Double, HashSet<Double>> gather = new GreedyGraphColoringGather(); GatherFunction<Long, NullValue, Long> gather = new ConnectedComponentsGather();


// Merge the sets between neighbors // Select the lower value among neighbors
SumFunction<Double, Double, HashSet<Double>> sum = new GreedyGraphColoringSum(); SumFunction<Long, NullValue, Long> sum = new ConnectedComponentsSum();


// Find the minimum vertex id in the set which will be propagated // Set the lower value for each vertex
ApplyFunction<Double, Double, HashSet<Double>> apply = new GreedyGraphColoringApply(); ApplyFunction<Long, NullValue, Long> apply = new ConnectedComponentsApply();


// Execute the GSA iteration // Execute the GSA iteration
GatherSumApplyIteration<Long, Double, Double, HashSet<Double>> iteration = GatherSumApplyIteration<Long, Long, NullValue, Long> iteration =
graph.createGatherSumApplyIteration(gather, sum, apply, maxIterations); graph.createGatherSumApplyIteration(gather, sum, apply, maxIterations);
Graph<Long, Double, Double> result = graph.runGatherSumApplyIteration(iteration); Graph<Long, Long, NullValue> result = graph.runGatherSumApplyIteration(iteration);


// Extract the vertices as the result // Extract the vertices as the result
DataSet<Vertex<Long, Double>> greedyGraphColoring = result.getVertices(); DataSet<Vertex<Long, Long>> greedyGraphColoring = result.getVertices();


// emit result // emit result
if (fileOutput) { if (fileOutput) {
Expand All @@ -83,52 +81,38 @@ public static void main(String[] args) throws Exception {
greedyGraphColoring.print(); greedyGraphColoring.print();
} }


env.execute("GSA Greedy Graph Coloring"); env.execute("GSA Connected Components");
} }


// -------------------------------------------------------------------------------------------- // --------------------------------------------------------------------------------------------
// Greedy Graph Coloring UDFs // Connected Components UDFs
// -------------------------------------------------------------------------------------------- // --------------------------------------------------------------------------------------------


private static final class GreedyGraphColoringGather private static final class ConnectedComponentsGather
extends GatherFunction<Double, Double, HashSet<Double>> { extends GatherFunction<Long, NullValue, Long> {
@Override @Override
public HashSet<Double> gather(RichEdge<Double, Double> richEdge) { public Long gather(RichEdge<Long, NullValue> richEdge) {

HashSet<Double> result = new HashSet<Double>();
result.add(richEdge.getSrcVertexValue());


return result; return richEdge.getSrcVertexValue();
} }
}; };


private static final class GreedyGraphColoringSum private static final class ConnectedComponentsSum
extends SumFunction<Double, Double, HashSet<Double>> { extends SumFunction<Long, NullValue, Long> {
@Override @Override
public HashSet<Double> sum(HashSet<Double> newValue, HashSet<Double> currentValue) { public Long sum(Long newValue, Long currentValue) {


HashSet<Double> result = new HashSet<Double>(); return Math.min(newValue, currentValue);
result.addAll(newValue);
result.addAll(currentValue);

return result;
} }
}; };


private static final class GreedyGraphColoringApply private static final class ConnectedComponentsApply
extends ApplyFunction<Double, Double, HashSet<Double>> { extends ApplyFunction<Long, NullValue, Long> {
@Override @Override
public void apply(HashSet<Double> set, Double src) { public void apply(Long summedValue, Long origValue) {
double minValue = src;
for (Double d : set) {
if (d < minValue) {
minValue = d;
}
}


// This is the condition that enables the termination of the iteration if (summedValue < origValue) {
if (minValue < src) { setResult(summedValue);
setResult(minValue);
} }
} }
}; };
Expand All @@ -151,7 +135,7 @@ private static boolean parseParameters(String[] args) {
fileOutput = true; fileOutput = true;


if(args.length != 4) { if(args.length != 4) {
System.err.println("Usage: GSAGreedyGraphColoringExample <vertex path> <edge path> " + System.err.println("Usage: GSAConnectedComponentsExample <vertex path> <edge path> " +
"<result path> <max iterations>"); "<result path> <max iterations>");
return false; return false;
} }
Expand All @@ -161,57 +145,57 @@ private static boolean parseParameters(String[] args) {
outputPath = args[2]; outputPath = args[2];
maxIterations = Integer.parseInt(args[3]); maxIterations = Integer.parseInt(args[3]);
} else { } else {
System.out.println("Executing GSA Greedy Graph Coloring example with built-in default data."); System.out.println("Executing GSA Connected Components example with built-in default data.");
System.out.println(" Provide parameters to read input data from files."); System.out.println(" Provide parameters to read input data from files.");
System.out.println(" See the documentation for the correct format of input files."); System.out.println(" See the documentation for the correct format of input files.");
System.out.println(" Usage: GSAGreedyGraphColoringExample <vertex path> <edge path> " System.out.println(" Usage: GSAConnectedComponentsExample <vertex path> <edge path> "
+ "<result path> <max iterations>"); + "<result path> <max iterations>");
} }
return true; return true;
} }


private static DataSet<Vertex<Long, Double>> getVertexDataSet(ExecutionEnvironment env) { private static DataSet<Vertex<Long, Long>> getVertexDataSet(ExecutionEnvironment env) {
if(fileOutput) { if(fileOutput) {
return env return env
.readCsvFile(vertexInputPath) .readCsvFile(vertexInputPath)
.fieldDelimiter(" ") .fieldDelimiter(" ")
.lineDelimiter("\n") .lineDelimiter("\n")
.types(Long.class, Double.class) .types(Long.class, Long.class)
.map(new MapFunction<Tuple2<Long, Double>, Vertex<Long, Double>>() { .map(new MapFunction<Tuple2<Long, Long>, Vertex<Long, Long>>() {
@Override @Override
public Vertex<Long, Double> map(Tuple2<Long, Double> value) throws Exception { public Vertex<Long, Long> map(Tuple2<Long, Long> value) throws Exception {
return new Vertex<Long, Double>(value.f0, value.f1); return new Vertex<Long, Long>(value.f0, value.f1);
} }
}); });
} }


return env.generateSequence(0, 5).map(new MapFunction<Long, Vertex<Long, Double>>() { return env.generateSequence(0, 5).map(new MapFunction<Long, Vertex<Long, Long>>() {
@Override @Override
public Vertex<Long, Double> map(Long value) throws Exception { public Vertex<Long, Long> map(Long value) throws Exception {
return new Vertex<Long, Double>(value, (double) value); return new Vertex<Long, Long>(value, value);
} }
}); });
} }


private static DataSet<Edge<Long, Double>> getEdgeDataSet(ExecutionEnvironment env) { private static DataSet<Edge<Long, NullValue>> getEdgeDataSet(ExecutionEnvironment env) {
if(fileOutput) { if(fileOutput) {
return env.readCsvFile(edgeInputPath) return env.readCsvFile(edgeInputPath)
.fieldDelimiter(" ") .fieldDelimiter(" ")
.lineDelimiter("\n") .lineDelimiter("\n")
.types(Long.class, Long.class, Double.class) .types(Long.class, Long.class)
.map(new MapFunction<Tuple3<Long, Long, Double>, Edge<Long, Double>>() { .map(new MapFunction<Tuple2<Long, Long>, Edge<Long, NullValue>>() {
@Override @Override
public Edge<Long, Double> map(Tuple3<Long, Long, Double> value) throws Exception { public Edge<Long, NullValue> map(Tuple2<Long, Long> value) throws Exception {
return new Edge<Long, Double>(value.f0, value.f1, value.f2); return new Edge<Long, NullValue>(value.f0, value.f1, NullValue.getInstance());
} }
}); });
} }


return env.generateSequence(0, 5).flatMap(new FlatMapFunction<Long, Edge<Long, Double>>() { // Generates 3 components of size 2
return env.generateSequence(0, 2).flatMap(new FlatMapFunction<Long, Edge<Long, NullValue>>() {
@Override @Override
public void flatMap(Long value, Collector<Edge<Long, Double>> out) throws Exception { public void flatMap(Long value, Collector<Edge<Long, NullValue>> out) throws Exception {
out.collect(new Edge<Long, Double>(value, (value + 1) % 6, 0.0)); out.collect(new Edge<Long, NullValue>(value, value + 3, NullValue.getInstance()));
out.collect(new Edge<Long, Double>(value, (value + 2) % 6, 0.0));
} }
}); });
} }
Expand Down
Expand Up @@ -23,7 +23,8 @@
import java.io.Serializable; import java.io.Serializable;


/** /**
* A wrapper around Tuple3<VV, EV, VV> for convenience in the GatherFunction * This class represents a <sourceVertex, edge> pair
* This is a wrapper around Tuple2<VV, EV> for convenience in the GatherFunction
* @param <VV> the vertex value type * @param <VV> the vertex value type
* @param <EV> the edge value type * @param <EV> the edge value type
*/ */
Expand Down
Expand Up @@ -20,7 +20,7 @@


import com.google.common.base.Charsets; import com.google.common.base.Charsets;
import com.google.common.io.Files; import com.google.common.io.Files;
import org.apache.flink.graph.example.GSAGreedyGraphColoringExample; import org.apache.flink.graph.example.GSAConnectedComponentsExample;
import org.apache.flink.graph.example.GSASingleSourceShortestPathsExample; import org.apache.flink.graph.example.GSASingleSourceShortestPathsExample;
import org.apache.flink.test.util.MultipleProgramsTestBase; import org.apache.flink.test.util.MultipleProgramsTestBase;
import org.junit.After; import org.junit.After;
Expand Down Expand Up @@ -68,17 +68,18 @@ public void after() throws Exception{
} }


// -------------------------------------------------------------------------------------------- // --------------------------------------------------------------------------------------------
// Greedy Graph Coloring Test // Connected Components Test
// -------------------------------------------------------------------------------------------- // --------------------------------------------------------------------------------------------


@Test @Test
public void testGreedyGraphColoring() throws Exception { public void testGreedyGraphColoring() throws Exception {
GSAGreedyGraphColoringExample.main(new String[] {verticesPath, edgesPath, resultPath, "16"}); GSAConnectedComponentsExample.main(new String[]{verticesPath, edgesPath, resultPath, "16"});
expectedResult = "1 1.0\n" + expectedResult = "1 1\n" +
"2 1.0\n" + "2 1\n" +
"3 1.0\n" + "3 1\n" +
"4 1.0\n" + "4 1\n" +
"5 1.0\n"; "5 1\n" +
"6 6\n";


} }


Expand All @@ -93,19 +94,21 @@ public void testSingleSourceShortestPath() throws Exception {
"2 12.0\n" + "2 12.0\n" +
"3 13.0\n" + "3 13.0\n" +
"4 47.0\n" + "4 47.0\n" +
"5 48.0\n"; "5 48.0\n" +
"6 Infinity\n";
} }




// -------------------------------------------------------------------------------------------- // --------------------------------------------------------------------------------------------
// Sample data // Sample data
// -------------------------------------------------------------------------------------------- // --------------------------------------------------------------------------------------------


private static final String VERTICES = "1 1.0\n" + private static final String VERTICES = "1 1\n" +
"2 2.0\n" + "2 2\n" +
"3 3.0\n" + "3 3\n" +
"4 4.0\n" + "4 4\n" +
"5 5.0\n"; "5 5\n" +
"6 6\n";


private static final String EDGES = "1 2 12.0\n" + private static final String EDGES = "1 2 12.0\n" +
"1 3 13.0\n" + "1 3 13.0\n" +
Expand Down

0 comments on commit e1f56e9

Please sign in to comment.