Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@

import java.text.NumberFormat;

import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;

/**
* Driver for the library implementations of Global and Local Clustering Coefficient.
*
Expand Down Expand Up @@ -89,12 +91,15 @@ public static void main(String[] args) throws Exception {
env.getConfig().enableObjectReuse();

ParameterTool parameters = ParameterTool.fromArgs(args);

if (! parameters.has("directed")) {
printUsage();
return;
}
boolean directedAlgorithm = parameters.getBoolean("directed");

int little_parallelism = parameters.getInt("little_parallelism", PARALLELISM_DEFAULT);

// global and local clustering coefficient results
GraphAnalytic gcc;
DataSet lcc;
Expand All @@ -120,14 +125,18 @@ public static void main(String[] args) throws Exception {

if (directedAlgorithm) {
gcc = graph
.run(new org.apache.flink.graph.library.clustering.directed.GlobalClusteringCoefficient<LongValue, NullValue, NullValue>());
.run(new org.apache.flink.graph.library.clustering.directed.GlobalClusteringCoefficient<LongValue, NullValue, NullValue>()
.setLittleParallelism(little_parallelism));
lcc = graph
.run(new org.apache.flink.graph.library.clustering.directed.LocalClusteringCoefficient<LongValue, NullValue, NullValue>());
.run(new org.apache.flink.graph.library.clustering.directed.LocalClusteringCoefficient<LongValue, NullValue, NullValue>()
.setLittleParallelism(little_parallelism));
} else {
gcc = graph
.run(new org.apache.flink.graph.library.clustering.undirected.GlobalClusteringCoefficient<LongValue, NullValue, NullValue>());
.run(new org.apache.flink.graph.library.clustering.undirected.GlobalClusteringCoefficient<LongValue, NullValue, NullValue>()
.setLittleParallelism(little_parallelism));
lcc = graph
.run(new org.apache.flink.graph.library.clustering.undirected.LocalClusteringCoefficient<LongValue, NullValue, NullValue>());
.run(new org.apache.flink.graph.library.clustering.undirected.LocalClusteringCoefficient<LongValue, NullValue, NullValue>()
.setLittleParallelism(little_parallelism));
}
} break;

Expand All @@ -137,14 +146,18 @@ public static void main(String[] args) throws Exception {

if (directedAlgorithm) {
gcc = graph
.run(new org.apache.flink.graph.library.clustering.directed.GlobalClusteringCoefficient<StringValue, NullValue, NullValue>());
.run(new org.apache.flink.graph.library.clustering.directed.GlobalClusteringCoefficient<StringValue, NullValue, NullValue>()
.setLittleParallelism(little_parallelism));
lcc = graph
.run(new org.apache.flink.graph.library.clustering.directed.LocalClusteringCoefficient<StringValue, NullValue, NullValue>());
.run(new org.apache.flink.graph.library.clustering.directed.LocalClusteringCoefficient<StringValue, NullValue, NullValue>()
.setLittleParallelism(little_parallelism));
} else {
gcc = graph
.run(new org.apache.flink.graph.library.clustering.undirected.GlobalClusteringCoefficient<StringValue, NullValue, NullValue>());
.run(new org.apache.flink.graph.library.clustering.undirected.GlobalClusteringCoefficient<StringValue, NullValue, NullValue>()
.setLittleParallelism(little_parallelism));
lcc = graph
.run(new org.apache.flink.graph.library.clustering.undirected.LocalClusteringCoefficient<StringValue, NullValue, NullValue>());
.run(new org.apache.flink.graph.library.clustering.undirected.LocalClusteringCoefficient<StringValue, NullValue, NullValue>()
.setLittleParallelism(little_parallelism));
}
} break;

Expand All @@ -164,51 +177,66 @@ public static void main(String[] args) throws Exception {
long edgeCount = vertexCount * edgeFactor;

Graph<LongValue, NullValue, NullValue> graph = new RMatGraph<>(env, rnd, vertexCount, edgeCount)
.setParallelism(little_parallelism)
.generate();

if (directedAlgorithm) {
if (scale > 32) {
Graph<LongValue, NullValue, NullValue> newGraph = graph
.run(new org.apache.flink.graph.asm.simple.directed.Simplify<LongValue, NullValue, NullValue>());
.run(new org.apache.flink.graph.asm.simple.directed.Simplify<LongValue, NullValue, NullValue>()
.setParallelism(little_parallelism));

gcc = newGraph
.run(new org.apache.flink.graph.library.clustering.directed.GlobalClusteringCoefficient<LongValue, NullValue, NullValue>());
.run(new org.apache.flink.graph.library.clustering.directed.GlobalClusteringCoefficient<LongValue, NullValue, NullValue>()
.setLittleParallelism(little_parallelism));
lcc = newGraph
.run(new org.apache.flink.graph.library.clustering.directed.LocalClusteringCoefficient<LongValue, NullValue, NullValue>()
.setIncludeZeroDegreeVertices(false));
.setIncludeZeroDegreeVertices(false)
.setLittleParallelism(little_parallelism));
} else {
Graph<IntValue, NullValue, NullValue> newGraph = graph
.run(new TranslateGraphIds<LongValue, IntValue, NullValue, NullValue>(new LongValueToIntValue()))
.run(new org.apache.flink.graph.asm.simple.directed.Simplify<IntValue, NullValue, NullValue>());
.run(new TranslateGraphIds<LongValue, IntValue, NullValue, NullValue>(new LongValueToIntValue())
.setParallelism(little_parallelism))
.run(new org.apache.flink.graph.asm.simple.directed.Simplify<IntValue, NullValue, NullValue>()
.setParallelism(little_parallelism));

gcc = newGraph
.run(new org.apache.flink.graph.library.clustering.directed.GlobalClusteringCoefficient<IntValue, NullValue, NullValue>());
.run(new org.apache.flink.graph.library.clustering.directed.GlobalClusteringCoefficient<IntValue, NullValue, NullValue>()
.setLittleParallelism(little_parallelism));
lcc = newGraph
.run(new org.apache.flink.graph.library.clustering.directed.LocalClusteringCoefficient<IntValue, NullValue, NullValue>()
.setIncludeZeroDegreeVertices(false));
.setIncludeZeroDegreeVertices(false)
.setLittleParallelism(little_parallelism));
}
} else {
boolean clipAndFlip = parameters.getBoolean("clip_and_flip", DEFAULT_CLIP_AND_FLIP);

if (scale > 32) {
Graph<LongValue, NullValue, NullValue> newGraph = graph
.run(new org.apache.flink.graph.asm.simple.undirected.Simplify<LongValue, NullValue, NullValue>(clipAndFlip));
.run(new org.apache.flink.graph.asm.simple.undirected.Simplify<LongValue, NullValue, NullValue>(clipAndFlip)
.setParallelism(little_parallelism));

gcc = newGraph
.run(new org.apache.flink.graph.library.clustering.undirected.GlobalClusteringCoefficient<LongValue, NullValue, NullValue>());
.run(new org.apache.flink.graph.library.clustering.undirected.GlobalClusteringCoefficient<LongValue, NullValue, NullValue>()
.setLittleParallelism(little_parallelism));
lcc = newGraph
.run(new org.apache.flink.graph.library.clustering.undirected.LocalClusteringCoefficient<LongValue, NullValue, NullValue>()
.setIncludeZeroDegreeVertices(false));
.setIncludeZeroDegreeVertices(false)
.setLittleParallelism(little_parallelism));
} else {
Graph<IntValue, NullValue, NullValue> newGraph = graph
.run(new TranslateGraphIds<LongValue, IntValue, NullValue, NullValue>(new LongValueToIntValue()))
.run(new org.apache.flink.graph.asm.simple.undirected.Simplify<IntValue, NullValue, NullValue>(clipAndFlip));
.run(new TranslateGraphIds<LongValue, IntValue, NullValue, NullValue>(new LongValueToIntValue())
.setParallelism(little_parallelism))
.run(new org.apache.flink.graph.asm.simple.undirected.Simplify<IntValue, NullValue, NullValue>(clipAndFlip)
.setParallelism(little_parallelism));

gcc = newGraph
.run(new org.apache.flink.graph.library.clustering.undirected.GlobalClusteringCoefficient<IntValue, NullValue, NullValue>());
.run(new org.apache.flink.graph.library.clustering.undirected.GlobalClusteringCoefficient<IntValue, NullValue, NullValue>()
.setLittleParallelism(little_parallelism));
lcc = newGraph
.run(new org.apache.flink.graph.library.clustering.undirected.LocalClusteringCoefficient<IntValue, NullValue, NullValue>()
.setIncludeZeroDegreeVertices(false));
.setIncludeZeroDegreeVertices(false)
.setLittleParallelism(little_parallelism));
}
}
} break;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@

import java.text.NumberFormat;

import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;

/**
* Driver for the library implementation of Jaccard Index.
*
Expand Down Expand Up @@ -87,6 +89,8 @@ public static void main(String[] args) throws Exception {

ParameterTool parameters = ParameterTool.fromArgs(args);

int little_parallelism = parameters.getInt("little_parallelism", PARALLELISM_DEFAULT);

DataSet ji;

switch (parameters.get("input", "")) {
Expand All @@ -107,13 +111,15 @@ public static void main(String[] args) throws Exception {
case "integer": {
ji = reader
.keyType(LongValue.class)
.run(new org.apache.flink.graph.library.similarity.JaccardIndex<LongValue, NullValue, NullValue>());
.run(new org.apache.flink.graph.library.similarity.JaccardIndex<LongValue, NullValue, NullValue>()
.setLittleParallelism(little_parallelism));
} break;

case "string": {
ji = reader
.keyType(StringValue.class)
.run(new org.apache.flink.graph.library.similarity.JaccardIndex<StringValue, NullValue, NullValue>());
.run(new org.apache.flink.graph.library.similarity.JaccardIndex<StringValue, NullValue, NullValue>()
.setLittleParallelism(little_parallelism));
} break;

default:
Expand All @@ -131,20 +137,26 @@ public static void main(String[] args) throws Exception {
long vertexCount = 1L << scale;
long edgeCount = vertexCount * edgeFactor;

boolean clipAndFlip = parameters.getBoolean("clip_and_flip", DEFAULT_CLIP_AND_FLIP);

Graph<LongValue, NullValue, NullValue> graph = new RMatGraph<>(env, rnd, vertexCount, edgeCount)
.setParallelism(little_parallelism)
.generate();

boolean clipAndFlip = parameters.getBoolean("clip_and_flip", DEFAULT_CLIP_AND_FLIP);

if (scale > 32) {
ji = graph
.run(new Simplify<LongValue, NullValue, NullValue>(clipAndFlip))
.run(new org.apache.flink.graph.library.similarity.JaccardIndex<LongValue, NullValue, NullValue>());
.run(new Simplify<LongValue, NullValue, NullValue>(clipAndFlip)
.setParallelism(little_parallelism))
.run(new org.apache.flink.graph.library.similarity.JaccardIndex<LongValue, NullValue, NullValue>()
.setLittleParallelism(little_parallelism));
} else {
ji = graph
.run(new TranslateGraphIds<LongValue, IntValue, NullValue, NullValue>(new LongValueToIntValue()))
.run(new Simplify<IntValue, NullValue, NullValue>(clipAndFlip))
.run(new org.apache.flink.graph.library.similarity.JaccardIndex<IntValue, NullValue, NullValue>());
.run(new TranslateGraphIds<LongValue, IntValue, NullValue, NullValue>(new LongValueToIntValue())
.setParallelism(little_parallelism))
.run(new Simplify<IntValue, NullValue, NullValue>(clipAndFlip)
.setParallelism(little_parallelism))
.run(new org.apache.flink.graph.library.similarity.JaccardIndex<IntValue, NullValue, NullValue>()
.setLittleParallelism(little_parallelism));
}
} break;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,8 @@ protected boolean mergeConfiguration(GraphAlgorithmDelegatingDataSet other) {

EdgeDegreesPair rhs = (EdgeDegreesPair) other;

parallelism = Math.min(parallelism, rhs.parallelism);
parallelism = (parallelism == PARALLELISM_DEFAULT) ? rhs.parallelism :
((rhs.parallelism == PARALLELISM_DEFAULT) ? parallelism : Math.min(parallelism, rhs.parallelism));

return true;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,8 @@ protected boolean mergeConfiguration(GraphAlgorithmDelegatingDataSet other) {

EdgeSourceDegrees rhs = (EdgeSourceDegrees) other;

parallelism = Math.min(parallelism, rhs.parallelism);
parallelism = (parallelism == PARALLELISM_DEFAULT) ? rhs.parallelism :
((rhs.parallelism == PARALLELISM_DEFAULT) ? parallelism : Math.min(parallelism, rhs.parallelism));

return true;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,8 @@ protected boolean mergeConfiguration(GraphAlgorithmDelegatingDataSet other) {

EdgeTargetDegrees rhs = (EdgeTargetDegrees) other;

parallelism = Math.min(parallelism, rhs.parallelism);
parallelism = (parallelism == PARALLELISM_DEFAULT) ? rhs.parallelism :
((rhs.parallelism == PARALLELISM_DEFAULT) ? parallelism : Math.min(parallelism, rhs.parallelism));

return true;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,8 @@ protected boolean mergeConfiguration(GraphAlgorithmDelegatingDataSet other) {
// merge configurations

includeZeroDegreeVertices.mergeWith(rhs.includeZeroDegreeVertices);
parallelism = Math.min(parallelism, rhs.parallelism);
parallelism = (parallelism == PARALLELISM_DEFAULT) ? rhs.parallelism :
((rhs.parallelism == PARALLELISM_DEFAULT) ? parallelism : Math.min(parallelism, rhs.parallelism));

return true;
}
Expand Down Expand Up @@ -141,7 +142,7 @@ public DataSet<Vertex<K, Degrees>> runInternal(Graph<K, VV, EV> input)
.equalTo(0)
.with(new JoinVertexWithVertexDegrees<K, VV>())
.setParallelism(parallelism)
.name("Join zero degree vertices");
.name("Zero degree vertices");
}

return vertexDegrees;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,8 @@ protected boolean mergeConfiguration(GraphAlgorithmDelegatingDataSet other) {
// merge configurations

includeZeroDegreeVertices.mergeWith(rhs.includeZeroDegreeVertices);
parallelism = Math.min(parallelism, rhs.parallelism);
parallelism = (parallelism == PARALLELISM_DEFAULT) ? rhs.parallelism :
((rhs.parallelism == PARALLELISM_DEFAULT) ? parallelism : Math.min(parallelism, rhs.parallelism));

return true;
}
Expand All @@ -114,7 +115,7 @@ public DataSet<Vertex<K, LongValue>> runInternal(Graph<K, VV, EV> input)
.getEdges()
.map(new MapEdgeToTargetId<K, EV>())
.setParallelism(parallelism)
.name("Map edge to target ID");
.name("Edge to target ID");

// t, d(t)
DataSet<Vertex<K, LongValue>> targetDegree = targetIds
Expand All @@ -131,7 +132,7 @@ public DataSet<Vertex<K, LongValue>> runInternal(Graph<K, VV, EV> input)
.equalTo(0)
.with(new JoinVertexWithVertexDegree<K, VV>())
.setParallelism(parallelism)
.name("Join zero degree vertices");
.name("Zero degree vertices");
}

return targetDegree;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,8 @@ protected boolean mergeConfiguration(GraphAlgorithmDelegatingDataSet other) {
// merge configurations

includeZeroDegreeVertices.mergeWith(rhs.includeZeroDegreeVertices);
parallelism = Math.min(parallelism, rhs.parallelism);
parallelism = (parallelism == PARALLELISM_DEFAULT) ? rhs.parallelism :
((rhs.parallelism == PARALLELISM_DEFAULT) ? parallelism : Math.min(parallelism, rhs.parallelism));

return true;
}
Expand All @@ -114,7 +115,7 @@ public DataSet<Vertex<K, LongValue>> runInternal(Graph<K, VV, EV> input)
.getEdges()
.map(new MapEdgeToSourceId<K, EV>())
.setParallelism(parallelism)
.name("Map edge to source ID");
.name("Edge to source ID");

// s, d(s)
DataSet<Vertex<K, LongValue>> sourceDegree = sourceIds
Expand All @@ -131,7 +132,7 @@ public DataSet<Vertex<K, LongValue>> runInternal(Graph<K, VV, EV> input)
.equalTo(0)
.with(new JoinVertexWithVertexDegree<K, VV>())
.setParallelism(parallelism)
.name("Join zero degree vertices");
.name("Zero degree vertices");
}

return sourceDegree;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,8 @@ protected boolean mergeConfiguration(GraphAlgorithmDelegatingDataSet other) {
EdgeDegreePair rhs = (EdgeDegreePair) other;

reduceOnTargetId.mergeWith(rhs.reduceOnTargetId);
parallelism = Math.min(parallelism, rhs.parallelism);
parallelism = (parallelism == PARALLELISM_DEFAULT) ? rhs.parallelism :
((rhs.parallelism == PARALLELISM_DEFAULT) ? parallelism : Math.min(parallelism, rhs.parallelism));

return true;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,8 @@ protected boolean mergeConfiguration(GraphAlgorithmDelegatingDataSet other) {
EdgeSourceDegree rhs = (EdgeSourceDegree) other;

reduceOnTargetId.mergeWith(rhs.reduceOnTargetId);
parallelism = Math.min(parallelism, rhs.parallelism);
parallelism = (parallelism == PARALLELISM_DEFAULT) ? rhs.parallelism :
((rhs.parallelism == PARALLELISM_DEFAULT) ? parallelism : Math.min(parallelism, rhs.parallelism));

return true;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,8 @@ protected boolean mergeConfiguration(GraphAlgorithmDelegatingDataSet other) {
EdgeTargetDegree rhs = (EdgeTargetDegree) other;

reduceOnSourceId.mergeWith(rhs.reduceOnSourceId);
parallelism = Math.min(parallelism, rhs.parallelism);
parallelism = (parallelism == PARALLELISM_DEFAULT) ? rhs.parallelism :
((rhs.parallelism == PARALLELISM_DEFAULT) ? parallelism : Math.min(parallelism, rhs.parallelism));

return true;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,8 @@ protected boolean mergeConfiguration(GraphAlgorithmDelegatingDataSet other) {

includeZeroDegreeVertices.mergeWith(rhs.includeZeroDegreeVertices);
reduceOnTargetId.mergeWith(rhs.reduceOnTargetId);
parallelism = Math.min(parallelism, rhs.parallelism);
parallelism = (parallelism == PARALLELISM_DEFAULT) ? rhs.parallelism :
((rhs.parallelism == PARALLELISM_DEFAULT) ? parallelism : Math.min(parallelism, rhs.parallelism));

return true;
}
Expand All @@ -138,7 +139,7 @@ public DataSet<Vertex<K, LongValue>> runInternal(Graph<K, VV, EV> input)
.getEdges()
.map(mapEdgeToId)
.setParallelism(parallelism)
.name("Map edge to vertex ID");
.name("Edge to vertex ID");

// v, deg(v)
DataSet<Vertex<K, LongValue>> degree = vertexIds
Expand All @@ -156,7 +157,7 @@ public DataSet<Vertex<K, LongValue>> runInternal(Graph<K, VV, EV> input)
.equalTo(0)
.with(new JoinVertexWithVertexDegree<K, VV>())
.setParallelism(parallelism)
.name("Join zero degree vertices");
.name("Zero degree vertices");
}

return degree;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,8 @@ protected boolean mergeConfiguration(GraphAlgorithmDelegatingGraph other) {

reduceOnTargetId.mergeWith(rhs.reduceOnTargetId);
broadcastHighDegreeVertices.mergeWith(rhs.broadcastHighDegreeVertices);
parallelism = Math.min(parallelism, rhs.parallelism);
parallelism = (parallelism == PARALLELISM_DEFAULT) ? rhs.parallelism :
((rhs.parallelism == PARALLELISM_DEFAULT) ? parallelism : Math.min(parallelism, rhs.parallelism));

return true;
}
Expand Down
Loading