Permalink
Browse files

[#1045] add runner classes for clustering coefficient computation and…

… triangle counting (#1047)

fixes #1045
  • Loading branch information...
matthiastaeschner authored and galpha committed Nov 5, 2018
1 parent 704925d commit 77c75bcb2c9dbb51aa0ee86250f065a67e4414d4
Showing with 967 additions and 224 deletions.
  1. +75 −0 ...mples/src/main/java/org/gradoop/utils/sampling/statistics/AverageClusteringCoefficientRunner.java
  2. +74 −0 ...amples/src/main/java/org/gradoop/utils/sampling/statistics/GlobalClusteringCoefficientRunner.java
  3. +3 −0 gradoop-examples/src/main/java/org/gradoop/utils/sampling/statistics/SamplingStatisticsRunner.java
  4. +69 −0 gradoop-examples/src/main/java/org/gradoop/utils/sampling/statistics/TriangleCountingRunner.java
  5. +4 −10 ...main/java/org/gradoop/flink/algorithms/gelly/clusteringcoefficient/ClusteringCoefficientBase.java
  6. +10 −29 ...ficient/{GellyClusteringCoefficientDirected.java → GellyGlobalClusteringCoefficientDirected.java}
  7. +11 −29 ...ent/{GellyClusteringCoefficientUndirected.java → GellyGlobalClusteringCoefficientUndirected.java}
  8. +68 −0 ...gradoop/flink/algorithms/gelly/clusteringcoefficient/GellyLocalClusteringCoefficientDirected.java
  9. +69 −0 ...adoop/flink/algorithms/gelly/clusteringcoefficient/GellyLocalClusteringCoefficientUndirected.java
  10. +7 −1 ...adoop/flink/algorithms/gelly/clusteringcoefficient/functions/LocalDirectedCCResultToTupleMap.java
  11. +7 −1 ...oop/flink/algorithms/gelly/clusteringcoefficient/functions/LocalUndirectedCCResultToTupleMap.java
  12. +1 −1 .../main/java/org/gradoop/flink/model/impl/operators/sampling/RandomLimitedDegreeVertexSampling.java
  13. +1 −1 ...src/main/java/org/gradoop/flink/model/impl/operators/sampling/RandomNonUniformVertexSampling.java
  14. +1 −1 ...flink/src/main/java/org/gradoop/flink/model/impl/operators/sampling/RandomVertexEdgeSampling.java
  15. +1 −1 ...c/main/java/org/gradoop/flink/model/impl/operators/sampling/RandomVertexNeighborhoodSampling.java
  16. +3 −3 ...oop-flink/src/main/java/org/gradoop/flink/model/impl/operators/sampling/RandomVertexSampling.java
  17. +62 −0 ...java/org/gradoop/flink/model/impl/operators/sampling/statistics/AverageClusteringCoefficient.java
  18. +15 −0 .../java/org/gradoop/flink/model/impl/operators/sampling/statistics/SamplingEvaluationConstants.java
  19. +68 −0 ...oop/flink/model/impl/operators/sampling/statistics/functions/AddAverageCCValueToGraphHeadMap.java
  20. +27 −86 .../org/gradoop/flink/algorithms/gelly/clusteringcoefficient/GellyClusteringCoefficientTestBase.java
  21. +87 −0 ...op/flink/algorithms/gelly/clusteringcoefficient/GellyGlobalClusteringCoefficientDirectedTest.java
  22. +91 −0 .../flink/algorithms/gelly/clusteringcoefficient/GellyGlobalClusteringCoefficientUndirectedTest.java
  23. +39 −32 .../{GellyClusteringCoefficientDirectedTest.java → GellyLocalClusteringCoefficientDirectedTest.java}
  24. +37 −29 ...llyClusteringCoefficientUndirectedTest.java → GellyLocalClusteringCoefficientUndirectedTest.java}
  25. +137 −0 .../org/gradoop/flink/model/impl/operators/sampling/statistics/AverageClusteringCoefficientTest.java
@@ -0,0 +1,75 @@
/*
* Copyright © 2014 - 2018 Leipzig University (Database Research Group)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.gradoop.utils.sampling.statistics;
import org.apache.flink.api.common.ProgramDescription;
import org.apache.flink.api.java.DataSet;
import org.gradoop.examples.AbstractRunner;
import org.gradoop.flink.algorithms.gelly.clusteringcoefficient.GellyLocalClusteringCoefficientDirected;
import org.gradoop.flink.model.api.epgm.LogicalGraph;
import org.gradoop.flink.model.impl.functions.tuple.ObjectTo1;
import org.gradoop.flink.model.impl.operators.sampling.statistics.AverageClusteringCoefficient;
import org.gradoop.flink.model.impl.operators.sampling.statistics.SamplingEvaluationConstants;
import org.gradoop.flink.model.impl.operators.statistics.writer.StatisticWriter;
/**
 * Runner for the average clustering coefficient of a directed logical graph.
 * The value is computed by {@link AverageClusteringCoefficient}, which calls the Gradoop wrapper
 * {@link GellyLocalClusteringCoefficientDirected} around Flink's ClusteringCoefficient algorithm.
 * The resulting average is written to a csv file named
 * {@value SamplingEvaluationConstants#FILE_CLUSTERING_COEFFICIENT_AVERAGE} inside the output
 * directory, e.g.:
 * <pre>
 * BOF
 * 0.2916
 * EOF
 * </pre>
 */
public class AverageClusteringCoefficientRunner
  extends AbstractRunner implements ProgramDescription {

  /**
   * Reads the graph, computes its average clustering coefficient and writes the value as csv.
   *
   * <pre>
   * args[0] - path to graph
   * args[1] - format of graph (csv, json, indexed)
   * args[2] - output path
   * </pre>
   *
   * @param args command line arguments
   * @throws Exception in case of read/write failure
   */
  public static void main(String[] args) throws Exception {
    LogicalGraph inputGraph = readLogicalGraph(args[0], args[1]);
    LogicalGraph annotatedGraph = new AverageClusteringCoefficient().execute(inputGraph);

    // the operator stores its result as a property on the graph head
    DataSet<Double> averageValue = annotatedGraph.getGraphHead()
      .map(head -> head
        .getPropertyValue(AverageClusteringCoefficient.PROPERTY_KEY_AVERAGE)
        .getDouble());

    String targetFile = appendSeparator(args[2]) +
      SamplingEvaluationConstants.FILE_CLUSTERING_COEFFICIENT_AVERAGE;
    StatisticWriter.writeCSV(averageValue.map(new ObjectTo1<>()), targetFile);

    getExecutionEnvironment().execute("Sampling Statistics: Average clustering coefficient");
  }

  /**
   * Describes this program by the name of the runner class.
   *
   * @return name of this runner class
   */
  @Override
  public String getDescription() {
    return AverageClusteringCoefficientRunner.class.getName();
  }
}
@@ -0,0 +1,74 @@
/*
* Copyright © 2014 - 2018 Leipzig University (Database Research Group)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.gradoop.utils.sampling.statistics;
import org.apache.flink.api.common.ProgramDescription;
import org.apache.flink.api.java.DataSet;
import org.gradoop.examples.AbstractRunner;
import org.gradoop.flink.algorithms.gelly.clusteringcoefficient.ClusteringCoefficientBase;
import org.gradoop.flink.algorithms.gelly.clusteringcoefficient.GellyGlobalClusteringCoefficientDirected;
import org.gradoop.flink.model.api.epgm.LogicalGraph;
import org.gradoop.flink.model.impl.functions.tuple.ObjectTo1;
import org.gradoop.flink.model.impl.operators.sampling.statistics.SamplingEvaluationConstants;
import org.gradoop.flink.model.impl.operators.statistics.writer.StatisticWriter;
/**
 * Calls the computation of the global clustering coefficient for a directed logical graph.
 * Uses the Gradoop wrapper {@link GellyGlobalClusteringCoefficientDirected} of Flink's
 * ClusteringCoefficient algorithm. Writes the global value to a csv file named
 * {@value SamplingEvaluationConstants#FILE_CLUSTERING_COEFFICIENT_GLOBAL} in the output directory,
 * e.g.:
 * <pre>
 * BOF
 * 0.2916
 * EOF
 * </pre>
 */
public class GlobalClusteringCoefficientRunner
  extends AbstractRunner implements ProgramDescription {

  /**
   * Calls the computation of the global clustering coefficient for the graph.
   *
   * <pre>
   * args[0] - path to graph
   * args[1] - format of graph (csv, json, indexed)
   * args[2] - output path
   * </pre>
   *
   * @param args command line arguments
   * @throws Exception in case of read/write failure
   */
  public static void main(String[] args) throws Exception {
    LogicalGraph graph = readLogicalGraph(args[0], args[1]);
    graph = new GellyGlobalClusteringCoefficientDirected().execute(graph);

    // the wrapper stores the global value as a property on the graph head
    DataSet<Double> global = graph.getGraphHead()
      .map(gh -> gh.getPropertyValue(ClusteringCoefficientBase.PROPERTY_KEY_GLOBAL).getDouble());

    StatisticWriter.writeCSV(global.map(new ObjectTo1<>()), appendSeparator(args[2]) +
      SamplingEvaluationConstants.FILE_CLUSTERING_COEFFICIENT_GLOBAL);

    getExecutionEnvironment().execute("Sampling Statistics: Global clustering coefficient");
  }

  /**
   * Describes this program by the name of the runner class.
   *
   * @return name of this runner class
   */
  @Override
  public String getDescription() {
    return GlobalClusteringCoefficientRunner.class.getName();
  }
}
@@ -49,6 +49,9 @@ public static void main(String[] args) throws Exception {
AverageIncomingDegreeRunner.main(args);
AverageOutgoingDegreeRunner.main(args);
ConnectedComponentsDistributionRunner.main(args);
AverageClusteringCoefficientRunner.main(args);
GlobalClusteringCoefficientRunner.main(args);
TriangleCountingRunner.main(args);
}
@Override
@@ -0,0 +1,69 @@
/*
* Copyright © 2014 - 2018 Leipzig University (Database Research Group)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.gradoop.utils.sampling.statistics;
import org.apache.flink.api.common.ProgramDescription;
import org.apache.flink.api.java.DataSet;
import org.gradoop.examples.AbstractRunner;
import org.gradoop.flink.algorithms.gelly.trianglecounting.GellyTriangleCounting;
import org.gradoop.flink.model.api.epgm.LogicalGraph;
import org.gradoop.flink.model.impl.functions.tuple.ObjectTo1;
import org.gradoop.flink.model.impl.operators.sampling.statistics.SamplingEvaluationConstants;
import org.gradoop.flink.model.impl.operators.statistics.writer.StatisticWriter;
/**
 * Runner for the triangle count (closed triplets) of a logical graph.
 * The count is computed by the Gradoop wrapper {@link GellyTriangleCounting} around Flink's
 * TriangleEnumerator algorithm. The resulting value is written to a csv file named
 * {@value SamplingEvaluationConstants#FILE_TRIANGLE_COUNT} inside the output directory, e.g.:
 * <pre>
 * BOF
 * 8
 * EOF
 * </pre>
 */
public class TriangleCountingRunner extends AbstractRunner implements ProgramDescription {

  /**
   * Reads the graph, counts its triangles (closed triplets) and writes the count as csv.
   *
   * <pre>
   * args[0] - path to graph
   * args[1] - format of graph (csv, json, indexed)
   * args[2] - output path
   * </pre>
   *
   * @param args command line arguments
   * @throws Exception in case of read/write failure
   */
  public static void main(String[] args) throws Exception {
    LogicalGraph inputGraph = readLogicalGraph(args[0], args[1]);

    // the wrapper stores the count as a property on the graph head
    DataSet<Long> triangles = new GellyTriangleCounting()
      .execute(inputGraph)
      .getGraphHead()
      .map(head -> head
        .getPropertyValue(GellyTriangleCounting.PROPERTY_KEY_TRIANGLES)
        .getLong());

    String targetFile = appendSeparator(args[2]) + SamplingEvaluationConstants.FILE_TRIANGLE_COUNT;
    StatisticWriter.writeCSV(triangles.map(new ObjectTo1<>()), targetFile);

    getExecutionEnvironment().execute("Sampling Statistics: Triangle count");
  }

  /**
   * Describes this program by the name of the runner class.
   *
   * @return name of this runner class
   */
  @Override
  public String getDescription() {
    return TriangleCountingRunner.class.getName();
  }
}
@@ -25,12 +25,11 @@
/**
* Base class for Gradoop EPGM model wrapper for Flink Gellys implementation of the clustering
* coefficient algorithm. Implementations compute the local, average and global clustering
* coefficient of a graph, where:
* coefficient algorithm. Implementations compute the local and global clustering coefficient of
* a graph, where:
* <pre>
* local - connectedness of a single vertex regarding the connections of its neighborhood, with
* value between 0.0 (no edges between neighbors) and 1.0 (neighbors fully connected)
* average - mean over all local values
* global - connectedness of the graph as ratio from closed triplets (triangles) to all triplets
* with value between 0.0 (no closed triplets) and 1.0 (all triplets closed)
* </pre>
@@ -42,11 +41,6 @@
*/
public static final String PROPERTY_KEY_LOCAL = "clustering_coefficient_local";
/**
* Property key to access the average clustering coefficient value stored in the graph head
*/
public static final String PROPERTY_KEY_AVERAGE = "clustering_coefficient_average";
/**
* Property key to access the global clustering coefficient value stored in the graph head
*/
@@ -71,8 +65,8 @@ protected LogicalGraph executeInGelly(Graph<GradoopId, NullValue, NullValue> gra
* Executes the computation of the clustering coefficient.
*
* @param gellyGraph Gelly graph with initialized vertices
* @return {@link LogicalGraph} with local values written to the vertices, average and global
* value written to the graph head
* @return {@link LogicalGraph} with local values written to the vertices or global value
* written to the graph head
* @throws Exception Thrown if the gelly algorithm fails
*/
protected abstract LogicalGraph executeInternal(Graph<GradoopId, NullValue, NullValue> gellyGraph)
@@ -17,75 +17,56 @@
import org.apache.flink.api.java.DataSet;
import org.apache.flink.graph.Graph;
import org.apache.flink.graph.library.clustering.directed.AverageClusteringCoefficient;
import org.apache.flink.graph.library.clustering.directed.GlobalClusteringCoefficient;
import org.apache.flink.types.NullValue;
import org.gradoop.common.model.impl.id.GradoopId;
import org.gradoop.common.model.impl.pojo.GraphHead;
import org.gradoop.common.model.impl.pojo.Vertex;
import org.gradoop.common.model.impl.properties.PropertyValue;
import org.gradoop.flink.algorithms.gelly.clusteringcoefficient.functions.LocalCCResultTupleToVertexJoin;
import org.gradoop.flink.algorithms.gelly.clusteringcoefficient.functions.LocalDirectedCCResultToTupleMap;
import org.gradoop.flink.algorithms.gelly.functions.WritePropertyToGraphHeadMap;
import org.gradoop.flink.model.api.epgm.LogicalGraph;
import org.gradoop.flink.model.impl.functions.epgm.Id;
/**
* Gradoop EPGM model wrapper for Flink Gellys implementation of the clustering coefficient
* algorithm for directed graphs {@link org.apache.flink.graph.library.clustering.directed}.
* Returns the initial {@link LogicalGraph} with local values written to the vertices, average
* and global value written to the graph head.
* Gradoop EPGM model wrapper for Flink Gellys implementation of the global clustering coefficient
* algorithm for directed graphs
* {@link org.apache.flink.graph.library.clustering.directed.GlobalClusteringCoefficient}.
* Returns the initial {@link LogicalGraph} with global value written to the graph head.
*/
public class GellyClusteringCoefficientDirected extends ClusteringCoefficientBase {
public class GellyGlobalClusteringCoefficientDirected extends ClusteringCoefficientBase {
/**
* Creates an instance of the GellyClusteringCoefficientDirected wrapper class.
* Creates an instance of the GellyGlobalClusteringCoefficientDirected wrapper class.
* Calls constructor of super class {@link ClusteringCoefficientBase}.
*/
public GellyClusteringCoefficientDirected() {
public GellyGlobalClusteringCoefficientDirected() {
super();
}
/**
* {@inheritDoc}
*
* Calls Flink Gelly algorithms to compute the local, average and global clustering coefficient
* for a directed graph.
* Calls Flink Gelly algorithms to compute the global clustering coefficient for a directed graph.
*/
@Override
protected LogicalGraph executeInternal(Graph<GradoopId, NullValue, NullValue> gellyGraph)
throws Exception {
DataSet<Vertex> resultVertices = new org.apache.flink.graph.library.clustering.directed
.LocalClusteringCoefficient<GradoopId, NullValue, NullValue>().run(gellyGraph)
.map(new LocalDirectedCCResultToTupleMap())
.join(currentGraph.getVertices())
.where(0).equalTo(new Id<>())
.with(new LocalCCResultTupleToVertexJoin());
AverageClusteringCoefficient average = new org.apache.flink.graph.library.clustering.directed
.AverageClusteringCoefficient<GradoopId, NullValue, NullValue>().run(gellyGraph);
GlobalClusteringCoefficient global = new org.apache.flink.graph.library.clustering.directed
.GlobalClusteringCoefficient<GradoopId, NullValue, NullValue>().run(gellyGraph);
currentGraph.getConfig().getExecutionEnvironment().execute();
double averageValue = average.getResult().getAverageClusteringCoefficient();
double globalValue = global.getResult().getGlobalClusteringCoefficientScore();
DataSet<GraphHead> resultHead = currentGraph.getGraphHead()
.map(new WritePropertyToGraphHeadMap(ClusteringCoefficientBase.PROPERTY_KEY_AVERAGE,
PropertyValue.create(averageValue)))
.map(new WritePropertyToGraphHeadMap(ClusteringCoefficientBase.PROPERTY_KEY_GLOBAL,
PropertyValue.create(globalValue)));
return currentGraph.getConfig().getLogicalGraphFactory().fromDataSets(
resultHead, resultVertices, currentGraph.getEdges());
resultHead, currentGraph.getVertices(), currentGraph.getEdges());
}
@Override
public String getName() {
return GellyClusteringCoefficientDirected.class.getName();
return GellyGlobalClusteringCoefficientDirected.class.getName();
}
}
Oops, something went wrong.

0 comments on commit 77c75bc

Please sign in to comment.