/
AverageIncomingDegreeRunner.java
70 lines (62 loc) · 2.59 KB
/
AverageIncomingDegreeRunner.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
/*
* Copyright © 2014 - 2019 Leipzig University (Database Research Group)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.gradoop.utils.statistics;
import org.apache.flink.api.common.ProgramDescription;
import org.apache.flink.api.java.DataSet;
import org.gradoop.examples.AbstractRunner;
import org.gradoop.flink.model.impl.epgm.LogicalGraph;
import org.gradoop.flink.model.impl.functions.tuple.ObjectTo1;
import org.gradoop.flink.model.impl.operators.statistics.AverageIncomingDegree;
import org.gradoop.flink.model.impl.operators.sampling.common.SamplingEvaluationConstants;
import org.gradoop.flink.model.impl.operators.statistics.writer.StatisticWriter;
/**
* Calls the average incoming degree computation for a logical graph. Writes the result to a
* csv-file named {@value SamplingEvaluationConstants#FILE_AVERAGE_INCOMING_DEGREE}
* in the output directory, containing a single line with the average incoming degree value, e.g.:
* <pre>
* BOF
* 4
* EOF
* </pre>
*/
public class AverageIncomingDegreeRunner extends AbstractRunner implements ProgramDescription {

  /**
   * Number of positional command line arguments read by {@link #main(String[])}.
   */
  private static final int REQUIRED_ARGUMENT_COUNT = 3;

  /**
   * Calls the average incoming degree computation for the graph.
   *
   * <pre>
   * args[0] - path to graph
   * args[1] - format of graph (csv, json, indexed)
   * args[2] - output path
   * </pre>
   *
   * The graph is read from {@code args[0]}, the {@link AverageIncomingDegree} operator is applied,
   * the resulting value is taken from the graph head property
   * {@link SamplingEvaluationConstants#PROPERTY_KEY_AVERAGE_INCOMING_DEGREE} and handed to
   * {@link StatisticWriter#writeCSV} with a target file located below {@code args[2]}.
   * Arguments beyond the third one are ignored.
   *
   * @param args command line arguments
   * @throws IllegalArgumentException if fewer than three arguments are supplied
   * @throws Exception in case of read/write failure
   */
  public static void main(String[] args) throws Exception {
    // Fail early with a usage hint instead of an ArrayIndexOutOfBoundsException further down.
    if (args == null || args.length < REQUIRED_ARGUMENT_COUNT) {
      int given = args == null ? 0 : args.length;
      throw new IllegalArgumentException(
        "Usage: " + AverageIncomingDegreeRunner.class.getSimpleName() +
          " <path to graph> <format of graph> <output path> (expected " +
          REQUIRED_ARGUMENT_COUNT + " arguments, got " + given + ")");
    }

    LogicalGraph graph = readLogicalGraph(args[0], args[1]);

    // The operator result is read back from the graph head as a long property value.
    DataSet<Long> averageIncomingDegree = graph.callForGraph(new AverageIncomingDegree())
      .getGraphHead()
      .map(gh -> gh.getPropertyValue(
        SamplingEvaluationConstants.PROPERTY_KEY_AVERAGE_INCOMING_DEGREE).getLong());

    // Wrap the value via ObjectTo1 before passing it to the CSV writer.
    StatisticWriter.writeCSV(averageIncomingDegree.map(new ObjectTo1<>()),
      appendSeparator(args[2]) + SamplingEvaluationConstants.FILE_AVERAGE_INCOMING_DEGREE);

    getExecutionEnvironment().execute("Sampling Statistics: Average incoming degree");
  }

  /**
   * Returns the description of this program, which is the fully qualified class name.
   *
   * @return fully qualified name of this runner class
   */
  @Override
  public String getDescription() {
    return AverageIncomingDegreeRunner.class.getName();
  }
}