Skip to content

Commit

Permalink
[FLINK-6382] [gelly] Support additional types for generated graphs in…
Browse files Browse the repository at this point in the history
… Gelly examples

The Gelly examples current support IntValue, LongValue, and StringValue
for RMatGraph. Allow transformations and tests for all generated graphs
for ByteValue, Byte, ShortValue, Short, CharValue, Character, Integer,
Long, and String.

This closes #3779
  • Loading branch information
greghogan committed May 1, 2017
1 parent d49efbd commit 3369578
Show file tree
Hide file tree
Showing 38 changed files with 2,054 additions and 463 deletions.
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import org.apache.flink.graph.drivers.output.Print; import org.apache.flink.graph.drivers.output.Print;
import org.apache.flink.graph.drivers.parameter.ChoiceParameter; import org.apache.flink.graph.drivers.parameter.ChoiceParameter;
import org.apache.flink.graph.drivers.parameter.ParameterizedBase; import org.apache.flink.graph.drivers.parameter.ParameterizedBase;
import org.apache.flink.types.CopyableValue;


/** /**
* Driver for directed and undirected graph metrics analytics. * Driver for directed and undirected graph metrics analytics.
Expand All @@ -36,7 +35,7 @@
* @see org.apache.flink.graph.library.metric.undirected.EdgeMetrics * @see org.apache.flink.graph.library.metric.undirected.EdgeMetrics
* @see org.apache.flink.graph.library.metric.undirected.VertexMetrics * @see org.apache.flink.graph.library.metric.undirected.VertexMetrics
*/ */
public class GraphMetrics<K extends Comparable<K> & CopyableValue<K>, VV, EV> public class GraphMetrics<K extends Comparable<K>, VV, EV>
extends ParameterizedBase extends ParameterizedBase
implements Driver<K, VV, EV>, Hash, Print { implements Driver<K, VV, EV>, Hash, Print {


Expand Down
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.flink.graph.drivers.output.CSV; import org.apache.flink.graph.drivers.output.CSV;
import org.apache.flink.graph.drivers.output.Hash; import org.apache.flink.graph.drivers.output.Hash;
import org.apache.flink.graph.drivers.output.Print; import org.apache.flink.graph.drivers.output.Print;
import org.apache.flink.graph.drivers.parameter.BooleanParameter;
import org.apache.flink.graph.drivers.parameter.LongParameter; import org.apache.flink.graph.drivers.parameter.LongParameter;
import org.apache.flink.graph.library.similarity.JaccardIndex.Result; import org.apache.flink.graph.library.similarity.JaccardIndex.Result;
import org.apache.flink.types.CopyableValue; import org.apache.flink.types.CopyableValue;
Expand Down Expand Up @@ -57,6 +58,8 @@ public class JaccardIndex<K extends CopyableValue<K>, VV, EV>
private LongParameter littleParallelism = new LongParameter(this, "little_parallelism") private LongParameter littleParallelism = new LongParameter(this, "little_parallelism")
.setDefaultValue(PARALLELISM_DEFAULT); .setDefaultValue(PARALLELISM_DEFAULT);


private BooleanParameter mirrorResults = new BooleanParameter(this, "mirror_results");

@Override @Override
public String getName() { public String getName() {
return this.getClass().getSimpleName(); return this.getClass().getSimpleName();
Expand Down Expand Up @@ -88,6 +91,7 @@ protected DataSet<Result<K>> simplePlan(Graph<K, VV, EV> graph) throws Exception
.run(new org.apache.flink.graph.library.similarity.JaccardIndex<K, VV, EV>() .run(new org.apache.flink.graph.library.similarity.JaccardIndex<K, VV, EV>()
.setMinimumScore(minNumerator.getValue().intValue(), minDenominator.getValue().intValue()) .setMinimumScore(minNumerator.getValue().intValue(), minDenominator.getValue().intValue())
.setMaximumScore(maxNumerator.getValue().intValue(), maxDenominator.getValue().intValue()) .setMaximumScore(maxNumerator.getValue().intValue(), maxDenominator.getValue().intValue())
.setMirrorResults(mirrorResults.getValue())
.setLittleParallelism(lp)); .setLittleParallelism(lp));
} }
} }
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.graph.Graph; import org.apache.flink.graph.Graph;
import org.apache.flink.graph.drivers.parameter.LongParameter; import org.apache.flink.graph.drivers.parameter.LongParameter;
import org.apache.flink.graph.drivers.parameter.ParameterizedBase;
import org.apache.flink.types.LongValue; import org.apache.flink.types.LongValue;
import org.apache.flink.types.NullValue; import org.apache.flink.types.NullValue;


Expand All @@ -32,8 +31,7 @@
* Generate a {@link org.apache.flink.graph.generator.CompleteGraph}. * Generate a {@link org.apache.flink.graph.generator.CompleteGraph}.
*/ */
public class CompleteGraph public class CompleteGraph
extends ParameterizedBase extends GeneratedGraph<LongValue> {
implements Input<LongValue, NullValue, NullValue> {


private LongParameter vertexCount = new LongParameter(this, "vertex_count") private LongParameter vertexCount = new LongParameter(this, "vertex_count")
.setMinimumValue(MINIMUM_VERTEX_COUNT); .setMinimumValue(MINIMUM_VERTEX_COUNT);
Expand All @@ -48,11 +46,16 @@ public String getName() {


@Override @Override
public String getIdentity() { public String getIdentity() {
return getName() + " (" + vertexCount.getValue() + ")"; return getTypeName() + " " + getName() + " (" + vertexCount.getValue() + ")";
} }


@Override @Override
public Graph<LongValue, NullValue, NullValue> create(ExecutionEnvironment env) { protected long vertexCount() {
return vertexCount.getValue();
}

@Override
protected Graph<LongValue, NullValue, NullValue> generate(ExecutionEnvironment env) throws Exception {
return new org.apache.flink.graph.generator.CompleteGraph(env, vertexCount.getValue()) return new org.apache.flink.graph.generator.CompleteGraph(env, vertexCount.getValue())
.setParallelism(littleParallelism.getValue().intValue()) .setParallelism(littleParallelism.getValue().intValue())
.generate(); .generate();
Expand Down
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.graph.Graph; import org.apache.flink.graph.Graph;
import org.apache.flink.graph.drivers.parameter.LongParameter; import org.apache.flink.graph.drivers.parameter.LongParameter;
import org.apache.flink.graph.drivers.parameter.ParameterizedBase;
import org.apache.flink.types.LongValue; import org.apache.flink.types.LongValue;
import org.apache.flink.types.NullValue; import org.apache.flink.types.NullValue;


Expand All @@ -32,8 +31,7 @@
* Generate a {@link org.apache.flink.graph.generator.CycleGraph}. * Generate a {@link org.apache.flink.graph.generator.CycleGraph}.
*/ */
public class CycleGraph public class CycleGraph
extends ParameterizedBase extends GeneratedGraph<LongValue> {
implements Input<LongValue, NullValue, NullValue> {


private LongParameter vertexCount = new LongParameter(this, "vertex_count") private LongParameter vertexCount = new LongParameter(this, "vertex_count")
.setMinimumValue(MINIMUM_VERTEX_COUNT); .setMinimumValue(MINIMUM_VERTEX_COUNT);
Expand All @@ -48,11 +46,16 @@ public String getName() {


@Override @Override
public String getIdentity() { public String getIdentity() {
return getName() + " (" + vertexCount + ")"; return getTypeName() + " " + getName() + " (" + vertexCount + ")";
} }


@Override @Override
public Graph<LongValue, NullValue, NullValue> create(ExecutionEnvironment env) { protected long vertexCount() {
return vertexCount.getValue();
}

@Override
public Graph<LongValue, NullValue, NullValue> generate(ExecutionEnvironment env) {
return new org.apache.flink.graph.generator.CycleGraph(env, vertexCount.getValue()) return new org.apache.flink.graph.generator.CycleGraph(env, vertexCount.getValue())
.setParallelism(littleParallelism.getValue().intValue()) .setParallelism(littleParallelism.getValue().intValue())
.generate(); .generate();
Expand Down
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.graph.Graph; import org.apache.flink.graph.Graph;
import org.apache.flink.graph.drivers.parameter.LongParameter; import org.apache.flink.graph.drivers.parameter.LongParameter;
import org.apache.flink.graph.drivers.parameter.ParameterizedBase;
import org.apache.flink.types.LongValue; import org.apache.flink.types.LongValue;
import org.apache.flink.types.NullValue; import org.apache.flink.types.NullValue;


Expand All @@ -31,8 +30,7 @@
* Generate an {@link org.apache.flink.graph.generator.EmptyGraph}. * Generate an {@link org.apache.flink.graph.generator.EmptyGraph}.
*/ */
public class EmptyGraph public class EmptyGraph
extends ParameterizedBase extends GeneratedGraph<LongValue> {
implements Input<LongValue, NullValue, NullValue> {


private LongParameter vertexCount = new LongParameter(this, "vertex_count") private LongParameter vertexCount = new LongParameter(this, "vertex_count")
.setMinimumValue(MINIMUM_VERTEX_COUNT); .setMinimumValue(MINIMUM_VERTEX_COUNT);
Expand All @@ -44,11 +42,16 @@ public String getName() {


@Override @Override
public String getIdentity() { public String getIdentity() {
return getName() + " (" + vertexCount + ")"; return getTypeName() + " " + getName() + " (" + vertexCount + ")";
} }


@Override @Override
public Graph<LongValue, NullValue, NullValue> create(ExecutionEnvironment env) { protected long vertexCount() {
return vertexCount.getValue();
}

@Override
public Graph<LongValue, NullValue, NullValue> generate(ExecutionEnvironment env) {
return new org.apache.flink.graph.generator.EmptyGraph(env, vertexCount.getValue()) return new org.apache.flink.graph.generator.EmptyGraph(env, vertexCount.getValue())
.generate(); .generate();
} }
Expand Down
Loading

0 comments on commit 3369578

Please sign in to comment.