Permalink
Browse files

[#951] add degree distribution and average degree to sampling statistics (#962)

fixes #951
  • Loading branch information...
matthiastaeschner authored and ChrizZz110 committed Aug 30, 2018
1 parent 9df800b commit 4ff3ba0762b05b5895bcd8980be68f26e3f38d14
Showing with 687 additions and 1 deletion.
  1. +69 −0 gradoop-examples/src/main/java/org/gradoop/utils/sampling/statistics/AverageDegreeRunner.java
  2. +70 −0 ...oop-examples/src/main/java/org/gradoop/utils/sampling/statistics/AverageIncomingDegreeRunner.java
  3. +70 −0 ...oop-examples/src/main/java/org/gradoop/utils/sampling/statistics/AverageOutgoingDegreeRunner.java
  4. +4 −1 gradoop-examples/src/main/java/org/gradoop/utils/sampling/statistics/GraphDensityRunner.java
  5. +11 −0 gradoop-examples/src/main/java/org/gradoop/utils/sampling/statistics/SamplingStatisticsRunner.java
  6. +58 −0 ...flink/src/main/java/org/gradoop/flink/model/impl/operators/sampling/statistics/AverageDegree.java
  7. +58 −0 ...c/main/java/org/gradoop/flink/model/impl/operators/sampling/statistics/AverageIncomingDegree.java
  8. +58 −0 ...c/main/java/org/gradoop/flink/model/impl/operators/sampling/statistics/AverageOutgoingDegree.java
  9. +36 −0 .../java/org/gradoop/flink/model/impl/operators/sampling/statistics/SamplingEvaluationConstants.java
  10. +55 −0 ...ink/model/impl/operators/sampling/statistics/functions/AddSumDegreesToGraphHeadCrossFunction.java
  11. +57 −0 .../org/gradoop/flink/model/impl/operators/sampling/statistics/functions/CalculateAverageDegree.java
  12. +47 −0 ...k/src/test/java/org/gradoop/flink/model/impl/operators/sampling/statistics/AverageDegreeTest.java
  13. +47 −0 ...st/java/org/gradoop/flink/model/impl/operators/sampling/statistics/AverageIncomingDegreeTest.java
  14. +47 −0 ...st/java/org/gradoop/flink/model/impl/operators/sampling/statistics/AverageOutgoingDegreeTest.java
@@ -0,0 +1,69 @@
/*
* Copyright © 2014 - 2018 Leipzig University (Database Research Group)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.gradoop.utils.sampling.statistics;
import org.apache.flink.api.common.ProgramDescription;
import org.apache.flink.api.java.DataSet;
import org.gradoop.examples.AbstractRunner;
import org.gradoop.flink.model.api.epgm.LogicalGraph;
import org.gradoop.flink.model.impl.functions.tuple.ObjectTo1;
import org.gradoop.flink.model.impl.operators.sampling.statistics.AverageDegree;
import org.gradoop.flink.model.impl.operators.sampling.statistics.SamplingEvaluationConstants;
import org.gradoop.flink.model.impl.operators.statistics.writer.StatisticWriter;
/**
* Calls the average degree computation for a logical graph. Writes the result to a csv-file
* named {@value SamplingEvaluationConstants#FILE_AVERAGE_DEGREE}
* in the output directory, containing a single line with the average degree value, e.g.:
* <pre>
* BOF
* 4
* EOF
* </pre>
*/
public class AverageDegreeRunner extends AbstractRunner implements ProgramDescription {

  /**
   * Calls the average degree computation for the graph.
   *
   * <pre>
   * args[0] - path to graph
   * args[1] - format of graph (csv, json, indexed)
   * args[2] - output path
   * </pre>
   *
   * @param args command line arguments
   * @throws IllegalArgumentException if fewer than three arguments are given
   * @throws Exception in case of read/write failure
   */
  public static void main(String[] args) throws Exception {
    // Fail fast with a usage hint instead of an ArrayIndexOutOfBoundsException that would
    // otherwise only appear once the missing argument is actually accessed.
    if (args == null || args.length < 3) {
      throw new IllegalArgumentException(
        "Expected 3 arguments: <path to graph> <format of graph> <output path>");
    }

    LogicalGraph graph = readLogicalGraph(args[0], args[1]);

    // The operator stores its result as a property on the graph head; read it back as a long.
    DataSet<Long> averageDegree = graph.callForGraph(new AverageDegree()).getGraphHead()
      .map(gh -> gh.getPropertyValue(SamplingEvaluationConstants.PROPERTY_KEY_AVERAGE_DEGREE)
        .getLong());

    // writeCSV is given a tuple data set, hence the wrapping via ObjectTo1.
    StatisticWriter.writeCSV(averageDegree.map(new ObjectTo1<>()),
      appendSeparator(args[2]) + SamplingEvaluationConstants.FILE_AVERAGE_DEGREE);

    getExecutionEnvironment().execute("Sampling Statistics: Average degree");
  }

  /**
   * Returns the fully qualified class name of this runner as its description.
   *
   * @return program description
   */
  @Override
  public String getDescription() {
    return AverageDegreeRunner.class.getName();
  }
}
@@ -0,0 +1,70 @@
/*
* Copyright © 2014 - 2018 Leipzig University (Database Research Group)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.gradoop.utils.sampling.statistics;
import org.apache.flink.api.common.ProgramDescription;
import org.apache.flink.api.java.DataSet;
import org.gradoop.examples.AbstractRunner;
import org.gradoop.flink.model.api.epgm.LogicalGraph;
import org.gradoop.flink.model.impl.functions.tuple.ObjectTo1;
import org.gradoop.flink.model.impl.operators.sampling.statistics.AverageIncomingDegree;
import org.gradoop.flink.model.impl.operators.sampling.statistics.SamplingEvaluationConstants;
import org.gradoop.flink.model.impl.operators.statistics.writer.StatisticWriter;
/**
* Calls the average incoming degree computation for a logical graph. Writes the result to a
* csv-file named {@value SamplingEvaluationConstants#FILE_AVERAGE_INCOMING_DEGREE}
* in the output directory, containing a single line with the average incoming degree value, e.g.:
* <pre>
* BOF
* 4
* EOF
* </pre>
*/
public class AverageIncomingDegreeRunner extends AbstractRunner implements ProgramDescription {

  /**
   * Calls the average incoming degree computation for the graph.
   *
   * <pre>
   * args[0] - path to graph
   * args[1] - format of graph (csv, json, indexed)
   * args[2] - output path
   * </pre>
   *
   * @param args command line arguments
   * @throws IllegalArgumentException if fewer than three arguments are given
   * @throws Exception in case of read/write failure
   */
  public static void main(String[] args) throws Exception {
    // Fail fast with a usage hint instead of an ArrayIndexOutOfBoundsException that would
    // otherwise only appear once the missing argument is actually accessed.
    if (args == null || args.length < 3) {
      throw new IllegalArgumentException(
        "Expected 3 arguments: <path to graph> <format of graph> <output path>");
    }

    LogicalGraph graph = readLogicalGraph(args[0], args[1]);

    // The operator stores its result as a property on the graph head; read it back as a long.
    DataSet<Long> averageIncomingDegree = graph.callForGraph(new AverageIncomingDegree())
      .getGraphHead()
      .map(gh -> gh.getPropertyValue(
        SamplingEvaluationConstants.PROPERTY_KEY_AVERAGE_INCOMING_DEGREE).getLong());

    // writeCSV is given a tuple data set, hence the wrapping via ObjectTo1.
    StatisticWriter.writeCSV(averageIncomingDegree.map(new ObjectTo1<>()),
      appendSeparator(args[2]) + SamplingEvaluationConstants.FILE_AVERAGE_INCOMING_DEGREE);

    getExecutionEnvironment().execute("Sampling Statistics: Average incoming degree");
  }

  /**
   * Returns the fully qualified class name of this runner as its description.
   *
   * @return program description
   */
  @Override
  public String getDescription() {
    return AverageIncomingDegreeRunner.class.getName();
  }
}
@@ -0,0 +1,70 @@
/*
* Copyright © 2014 - 2018 Leipzig University (Database Research Group)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.gradoop.utils.sampling.statistics;
import org.apache.flink.api.common.ProgramDescription;
import org.apache.flink.api.java.DataSet;
import org.gradoop.examples.AbstractRunner;
import org.gradoop.flink.model.api.epgm.LogicalGraph;
import org.gradoop.flink.model.impl.functions.tuple.ObjectTo1;
import org.gradoop.flink.model.impl.operators.sampling.statistics.AverageOutgoingDegree;
import org.gradoop.flink.model.impl.operators.sampling.statistics.SamplingEvaluationConstants;
import org.gradoop.flink.model.impl.operators.statistics.writer.StatisticWriter;
/**
* Calls the average outgoing degree computation for a logical graph. Writes the result to a
* csv-file named {@value SamplingEvaluationConstants#FILE_AVERAGE_OUTGOING_DEGREE}
* in the output directory, containing a single line with the average outgoing degree value, e.g.:
* <pre>
* BOF
* 4
* EOF
* </pre>
*/
public class AverageOutgoingDegreeRunner extends AbstractRunner implements ProgramDescription {

  /**
   * Calls the average outgoing degree computation for the graph.
   *
   * <pre>
   * args[0] - path to graph
   * args[1] - format of graph (csv, json, indexed)
   * args[2] - output path
   * </pre>
   *
   * @param args command line arguments
   * @throws IllegalArgumentException if fewer than three arguments are given
   * @throws Exception in case of read/write failure
   */
  public static void main(String[] args) throws Exception {
    // Fail fast with a usage hint instead of an ArrayIndexOutOfBoundsException that would
    // otherwise only appear once the missing argument is actually accessed.
    if (args == null || args.length < 3) {
      throw new IllegalArgumentException(
        "Expected 3 arguments: <path to graph> <format of graph> <output path>");
    }

    LogicalGraph graph = readLogicalGraph(args[0], args[1]);

    // The operator stores its result as a property on the graph head; read it back as a long.
    DataSet<Long> averageOutgoingDegree = graph.callForGraph(new AverageOutgoingDegree())
      .getGraphHead()
      .map(gh -> gh.getPropertyValue(
        SamplingEvaluationConstants.PROPERTY_KEY_AVERAGE_OUTGOING_DEGREE).getLong());

    // writeCSV is given a tuple data set, hence the wrapping via ObjectTo1.
    StatisticWriter.writeCSV(averageOutgoingDegree.map(new ObjectTo1<>()),
      appendSeparator(args[2]) + SamplingEvaluationConstants.FILE_AVERAGE_OUTGOING_DEGREE);

    getExecutionEnvironment().execute("Sampling Statistics: Average outgoing degree");
  }

  /**
   * Returns the fully qualified class name of this runner as its description.
   *
   * @return program description
   */
  @Override
  public String getDescription() {
    return AverageOutgoingDegreeRunner.class.getName();
  }
}
@@ -28,19 +28,22 @@
* Calls the graph density computation for a logical graph.
* Writes the result to a csv-file named {@value SamplingEvaluationConstants#FILE_DENSITY} in
* the output directory, containing a single line with the graph density value, e.g.:
*
* <pre>
* BOF
* 0.281
* EOF
* </pre>
*/
public class GraphDensityRunner extends AbstractRunner implements ProgramDescription {
/**
* Calls the graph density computation for the graph.
*
* <pre>
* args[0] - path to graph
* args[1] - format of graph (csv, json, indexed)
* args[2] - output path
* </pre>
*
* @param args command line arguments
* @throws Exception in case of read/write failure
@@ -17,6 +17,9 @@
import org.apache.flink.api.common.ProgramDescription;
import org.gradoop.examples.AbstractRunner;
import org.gradoop.utils.statistics.VertexDegreeDistributionRunner;
import org.gradoop.utils.statistics.VertexIncomingDegreeDistributionRunner;
import org.gradoop.utils.statistics.VertexOutgoingDegreeDistributionRunner;
/**
* Calls the computation of all given graph properties for a logical graph. Results are written
@@ -28,15 +31,23 @@
* Calls the computation of all given graph properties.
* (List of called graph properties will be extended over time)
*
* <pre>
* args[0] - path to graph
* args[1] - format of graph (csv, json, indexed)
* args[2] - output path
* </pre>
*
* @param args command line arguments
* @throws Exception if something goes wrong
*/
public static void main(String[] args) throws Exception {
// Every runner below receives the same, unmodified argument array. The calls run strictly
// one after another, so an exception thrown by one runner skips all runners after it.
// Graph density.
GraphDensityRunner.main(args);
// Vertex degree distributions: total, outgoing, incoming.
VertexDegreeDistributionRunner.main(args);
VertexOutgoingDegreeDistributionRunner.main(args);
VertexIncomingDegreeDistributionRunner.main(args);
// Average degrees: total, incoming, outgoing.
AverageDegreeRunner.main(args);
AverageIncomingDegreeRunner.main(args);
AverageOutgoingDegreeRunner.main(args);
}
@Override
@@ -0,0 +1,58 @@
/*
* Copyright © 2014 - 2018 Leipzig University (Database Research Group)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.gradoop.flink.model.impl.operators.sampling.statistics;
import org.apache.flink.api.java.DataSet;
import org.gradoop.common.model.impl.pojo.GraphHead;
import org.gradoop.flink.model.api.epgm.LogicalGraph;
import org.gradoop.flink.model.api.operators.UnaryGraphToGraphOperator;
import org.gradoop.flink.model.impl.operators.aggregation.functions.count.VertexCount;
import org.gradoop.flink.model.impl.operators.sampling.statistics.functions.AddSumDegreesToGraphHeadCrossFunction;
import org.gradoop.flink.model.impl.operators.sampling.statistics.functions.CalculateAverageDegree;
import org.gradoop.flink.model.impl.operators.statistics.VertexDegrees;
/**
* Calculates the average degree of a graph and writes it to the graph head.
* Uses: {@code ceiling( sum(vertex degrees) / |vertices| )}
*/
public class AverageDegree implements UnaryGraphToGraphOperator {

  /**
   * Builds a new graph whose graph head carries the average degree of the input graph.
   * Vertices and edges are passed through from the vertex-count-aggregated graph.
   *
   * @param graph the input graph
   * @return LogicalGraph with the average degree value written to the graph head.
   */
  @Override
  public LogicalGraph execute(LogicalGraph graph) {
    // Aggregate the vertex count first; the aggregated graph is used for everything below.
    LogicalGraph countedGraph = graph.aggregate(new VertexCount());

    // Step 1 helper: attaches the summed degrees to the graph head under the sum property key.
    AddSumDegreesToGraphHeadCrossFunction attachDegreeSum =
      new AddSumDegreesToGraphHeadCrossFunction(
        SamplingEvaluationConstants.PROPERTY_KEY_SUM_DEGREES);

    // Step 2 helper: maps the enriched head, configured with the average degree property key.
    CalculateAverageDegree computeAverage =
      new CalculateAverageDegree(SamplingEvaluationConstants.PROPERTY_KEY_AVERAGE_DEGREE);

    DataSet<GraphHead> countedHead = countedGraph.getGraphHead().first(1);

    // Sum field 1 of the per-vertex degree records and cross the result with the single head.
    DataSet<GraphHead> averagedHead = new VertexDegrees().execute(countedGraph)
      .sum(1)
      .crossWithTiny(countedHead)
      .with(attachDegreeSum)
      .map(computeAverage);

    return countedGraph.getConfig().getLogicalGraphFactory()
      .fromDataSets(averagedHead, countedGraph.getVertices(), countedGraph.getEdges());
  }

  /**
   * Returns the fully qualified class name of this operator.
   *
   * @return operator name
   */
  @Override
  public String getName() {
    return AverageDegree.class.getName();
  }
}
@@ -0,0 +1,58 @@
/*
* Copyright © 2014 - 2018 Leipzig University (Database Research Group)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.gradoop.flink.model.impl.operators.sampling.statistics;
import org.apache.flink.api.java.DataSet;
import org.gradoop.common.model.impl.pojo.GraphHead;
import org.gradoop.flink.model.api.epgm.LogicalGraph;
import org.gradoop.flink.model.api.operators.UnaryGraphToGraphOperator;
import org.gradoop.flink.model.impl.operators.aggregation.functions.count.VertexCount;
import org.gradoop.flink.model.impl.operators.sampling.statistics.functions.AddSumDegreesToGraphHeadCrossFunction;
import org.gradoop.flink.model.impl.operators.sampling.statistics.functions.CalculateAverageDegree;
import org.gradoop.flink.model.impl.operators.statistics.IncomingVertexDegrees;
/**
* Calculates the average incoming degree of a graph and writes it to the graph head.
* Uses: {@code ceiling( sum(vertex incoming degrees) / |vertices| )}
*/
public class AverageIncomingDegree implements UnaryGraphToGraphOperator {

  /**
   * Builds a new graph whose graph head carries the average incoming degree of the input graph.
   * Vertices and edges are passed through from the vertex-count-aggregated graph.
   *
   * @param graph the input graph
   * @return LogicalGraph with the average incoming degree value written to the graph head.
   */
  @Override
  public LogicalGraph execute(LogicalGraph graph) {
    // Aggregate the vertex count first; the aggregated graph is used for everything below.
    LogicalGraph countedGraph = graph.aggregate(new VertexCount());

    // Step 1 helper: attaches the summed degrees to the graph head under the sum property key.
    AddSumDegreesToGraphHeadCrossFunction attachDegreeSum =
      new AddSumDegreesToGraphHeadCrossFunction(
        SamplingEvaluationConstants.PROPERTY_KEY_SUM_DEGREES);

    // Step 2 helper: maps the enriched head, configured with the incoming-average property key.
    CalculateAverageDegree computeAverage = new CalculateAverageDegree(
      SamplingEvaluationConstants.PROPERTY_KEY_AVERAGE_INCOMING_DEGREE);

    DataSet<GraphHead> countedHead = countedGraph.getGraphHead().first(1);

    // Sum field 1 of the per-vertex incoming degree records and cross with the single head.
    DataSet<GraphHead> averagedHead = new IncomingVertexDegrees().execute(countedGraph)
      .sum(1)
      .crossWithTiny(countedHead)
      .with(attachDegreeSum)
      .map(computeAverage);

    return countedGraph.getConfig().getLogicalGraphFactory()
      .fromDataSets(averagedHead, countedGraph.getVertices(), countedGraph.getEdges());
  }

  /**
   * Returns the fully qualified class name of this operator.
   *
   * @return operator name
   */
  @Override
  public String getName() {
    return AverageIncomingDegree.class.getName();
  }
}
Oops, something went wrong.

0 comments on commit 4ff3ba0

Please sign in to comment.