Skip to content
Permalink
Browse files

[#1200] sampling cleanup (#1210)

fixes #1200
  • Loading branch information...
galpha committed Apr 12, 2019
1 parent 2fb56bf commit 60e4b2b595814bb8e4930ac7872a97ffdf03cb71
Showing with 356 additions and 356 deletions.
  1. +1 −0 gradoop-examples/src/main/java/org/gradoop/benchmark/sampling/SamplingBenchmark.java
  2. +17 −1 gradoop-examples/src/main/java/org/gradoop/benchmark/sampling/SamplingBuilder.java
  3. +2 −2 gradoop-examples/src/main/java/org/gradoop/utils/sampling/RandomWalkSamplingRunner.java
  4. +10 −1 ...p-examples/src/main/java/org/gradoop/utils/sampling/{statistics → }/SamplingStatisticsRunner.java
  5. +3 −3 .../src/main/java/org/gradoop/utils/{sampling → }/statistics/AverageClusteringCoefficientRunner.java
  6. +3 −3 gradoop-examples/src/main/java/org/gradoop/utils/{sampling → }/statistics/AverageDegreeRunner.java
  7. +3 −3 ...xamples/src/main/java/org/gradoop/utils/{sampling → }/statistics/AverageIncomingDegreeRunner.java
  8. +3 −3 ...xamples/src/main/java/org/gradoop/utils/{sampling → }/statistics/AverageOutgoingDegreeRunner.java
  9. +3 −3 ...c/main/java/org/gradoop/utils/{sampling → }/statistics/ConnectedComponentsDistributionRunner.java
  10. +3 −3 ...oop-examples/src/main/java/org/gradoop/utils/{sampling → }/statistics/DegreeCentralityRunner.java
  11. +2 −2 ...s/src/main/java/org/gradoop/utils/{sampling → }/statistics/GlobalClusteringCoefficientRunner.java
  12. +3 −3 gradoop-examples/src/main/java/org/gradoop/utils/{sampling → }/statistics/GraphDensityRunner.java
  13. +2 −2 ...oop-examples/src/main/java/org/gradoop/utils/{sampling → }/statistics/TriangleCountingRunner.java
  14. +5 −5 gradoop-flink/src/main/java/org/gradoop/flink/algorithms/gelly/randomjump/KRandomJumpGellyVCI.java
  15. +17 −18 .../org/gradoop/flink/algorithms/gelly/vertexdegrees/functions/DistinctVertexDegreesToAttribute.java
  16. +10 −7 gradoop-flink/src/main/java/org/gradoop/flink/model/impl/operators/sampling/PageRankSampling.java
  17. +5 −4 gradoop-flink/src/main/java/org/gradoop/flink/model/impl/operators/sampling/RandomEdgeSampling.java
  18. +12 −11 .../main/java/org/gradoop/flink/model/impl/operators/sampling/RandomLimitedDegreeVertexSampling.java
  19. +19 −16 ...src/main/java/org/gradoop/flink/model/impl/operators/sampling/RandomNonUniformVertexSampling.java
  20. +4 −4 ...flink/src/main/java/org/gradoop/flink/model/impl/operators/sampling/RandomVertexEdgeSampling.java
  21. +8 −7 ...c/main/java/org/gradoop/flink/model/impl/operators/sampling/RandomVertexNeighborhoodSampling.java
  22. +4 −4 ...oop-flink/src/main/java/org/gradoop/flink/model/impl/operators/sampling/RandomVertexSampling.java
  23. +12 −11 gradoop-flink/src/main/java/org/gradoop/flink/model/impl/operators/sampling/RandomWalkSampling.java
  24. +2 −65 gradoop-flink/src/main/java/org/gradoop/flink/model/impl/operators/sampling/SamplingAlgorithm.java
  25. +93 −0 ...flink/src/main/java/org/gradoop/flink/model/impl/operators/sampling/common/SamplingConstants.java
  26. +7 −1 ...radoop/flink/model/impl/operators/sampling/{statistics → common}/SamplingEvaluationConstants.java
  27. +2 −2 ...oop-flink/src/main/java/org/gradoop/flink/model/impl/operators/sampling/common}/package-info.java
  28. +9 −9 ...gradoop/flink/model/impl/operators/sampling/functions/AddPageRankScoresToVertexCrossFunction.java
  29. +9 −9 ...gradoop/flink/model/impl/operators/sampling/functions/FilterVerticesWithDegreeOtherThanGiven.java
  30. +6 −6 .../java/org/gradoop/flink/model/impl/operators/sampling/functions/NonUniformVertexRandomFilter.java
  31. +3 −3 ...in/java/org/gradoop/flink/model/impl/operators/sampling/functions/PageRankResultVertexFilter.java
  32. +7 −7 ...op/flink/model/impl/operators/sampling/functions/{VertexRandomMarkedMap.java → RandomVertex.java}
  33. +0 −45 ...p-flink/src/main/java/org/gradoop/flink/model/impl/operators/sampling/functions/VertexWithId.java
  34. +0 −19 .../main/java/org/gradoop/flink/model/impl/operators/sampling/statistics/functions/package-info.java
  35. +0 −19 ...-flink/src/main/java/org/gradoop/flink/model/impl/operators/sampling/statistics/package-info.java
  36. +2 −2 ...org/gradoop/flink/model/impl/operators/{sampling → }/statistics/AverageClusteringCoefficient.java
  37. +4 −4 .../src/main/java/org/gradoop/flink/model/impl/operators/{sampling → }/statistics/AverageDegree.java
  38. +4 −4 ...n/java/org/gradoop/flink/model/impl/operators/{sampling → }/statistics/AverageIncomingDegree.java
  39. +4 −4 ...n/java/org/gradoop/flink/model/impl/operators/{sampling → }/statistics/AverageOutgoingDegree.java
  40. +5 −4 .../gradoop/flink/model/impl/operators/{sampling → }/statistics/ConnectedComponentsDistribution.java
  41. +3 −3 ...ueConnectedComponentsDistribution.java → statistics/ConnectedComponentsDistributionAsValues.java}
  42. +1 −3 ...c/main/java/org/gradoop/flink/model/impl/operators/{sampling → }/statistics/DegreeCentrality.java
  43. +1 −1 ...in/java/org/gradoop/flink/model/impl/operators/{sampling → }/statistics/DegreeCentralityBase.java
  44. +3 −2 ...k/src/main/java/org/gradoop/flink/model/impl/operators/{sampling → }/statistics/GraphDensity.java
  45. +1 −1 ...link/model/impl/operators/{sampling → }/statistics/functions/AddAverageCCValueToGraphHeadMap.java
  46. +1 −1 ...odel/impl/operators/{sampling → }/statistics/functions/AddSumDegreesToGraphHeadCrossFunction.java
  47. +1 −1 ...radoop/flink/model/impl/operators/{sampling → }/statistics/functions/AggregateListOfWccEdges.java
  48. +1 −1 ...oop/flink/model/impl/operators/{sampling → }/statistics/functions/AggregateListOfWccVertices.java
  49. +2 −2 ...gradoop/flink/model/impl/operators/{sampling → }/statistics/functions/CalculateAverageDegree.java
  50. +1 −1 ...a/org/gradoop/flink/model/impl/operators/{sampling → }/statistics/functions/CalculateDensity.java
  51. +1 −1 ...l/impl/operators/{sampling → }/statistics/functions/GetConnectedComponentDistributionFlatMap.java
  52. +11 −11 ...op-flink/src/test/java/org/gradoop/flink/algorithms/gelly/randomjump/KRandomJumpGellyVCITest.java
  53. +0 −1 gradoop-flink/src/test/java/org/gradoop/flink/io/impl/csv/CSVDataSinkTest.java
  54. +8 −7 ...oop-flink/src/test/java/org/gradoop/flink/model/impl/operators/sampling/PageRankSamplingTest.java
  55. +1 −0 .../org/gradoop/flink/model/impl/operators/sampling/statistics/AverageClusteringCoefficientTest.java
  56. +2 −0 ...k/src/test/java/org/gradoop/flink/model/impl/operators/sampling/statistics/AverageDegreeTest.java
  57. +2 −0 ...st/java/org/gradoop/flink/model/impl/operators/sampling/statistics/AverageIncomingDegreeTest.java
  58. +2 −0 ...st/java/org/gradoop/flink/model/impl/operators/sampling/statistics/AverageOutgoingDegreeTest.java
  59. +1 −0 ...g/gradoop/flink/model/impl/operators/sampling/statistics/ConnectedComponentsDistributionTest.java
  60. +1 −0 ...rc/test/java/org/gradoop/flink/model/impl/operators/sampling/statistics/DegreeCentralityTest.java
  61. +2 −0 ...nk/src/test/java/org/gradoop/flink/model/impl/operators/sampling/statistics/GraphDensityTest.java
  62. +2 −1 ...java/org/gradoop/flink/model/impl/operators/sampling/statistics/ValueConnectedComponentsTest.java
@@ -55,6 +55,7 @@
* 4 ---> RandomVertexEdgeSampling
* 5 ---> RandomVertexNeighborhoodSampling
* 6 ---> RandomVertexSampling
* 7 ---> RandomWalkSampling
*/
private static final String OPTION_SELECTED_ALGORITHM = "a";
/**
@@ -22,6 +22,7 @@
import org.gradoop.flink.model.impl.operators.sampling.RandomLimitedDegreeVertexSampling;
import org.gradoop.flink.model.impl.operators.sampling.RandomEdgeSampling;
import org.gradoop.flink.model.impl.operators.sampling.PageRankSampling;
import org.gradoop.flink.model.impl.operators.sampling.RandomWalkSampling;
import org.gradoop.flink.model.impl.operators.sampling.SamplingAlgorithm;

/**
@@ -47,7 +48,9 @@
/** RandomVertexNeighborhoodSampling enum constant */
RANDOM_VERTEX_NEIGHBORHOOD_SAMPLING(RandomVertexNeighborhoodSampling.class.getSimpleName()),
/** RandomVertexSampling enum constant */
RANDOM_VERTEX_SAMPLING(RandomVertexSampling.class.getSimpleName());
RANDOM_VERTEX_SAMPLING(RandomVertexSampling.class.getSimpleName()),
/** RandomWalkSampling enum constant */
RANDOM_WALK_SAMPLING(RandomWalkSampling.class.getSimpleName());

/** Property denoting the simple classname of a sampling algorithm */
private final String name;
@@ -160,6 +163,19 @@ static SamplingAlgorithm buildSelectedSamplingAlgorithm(int ordinal, String[] co
new String[]{"1", "2"}, constructorParams.length);
}

case RANDOM_WALK_SAMPLING:
  // RandomWalkSampling is instantiated from either 2 parameters (float, int)
  // or 4 parameters (float, int, float, int), parsed in that order.
  if (constructorParams.length == 2) {
    return new RandomWalkSampling(Float.parseFloat(constructorParams[0]),
      Integer.parseInt(constructorParams[1]));
  } else if (constructorParams.length == 4) {
    return new RandomWalkSampling(Float.parseFloat(constructorParams[0]),
      Integer.parseInt(constructorParams[1]), Float.parseFloat(constructorParams[2]),
      Integer.parseInt(constructorParams[3]));
  } else {
    // Report the parameter counts this branch actually accepts (2 or 4),
    // not the {"1", "2"} of the preceding algorithm's branch.
    throw createInstantiationException(Algorithm.RANDOM_WALK_SAMPLING.name,
      new String[]{"2", "4"}, constructorParams.length);
  }

default:
throw new IllegalArgumentException(
"Something went wrong. Please select an other sampling algorithm.");
@@ -46,8 +46,8 @@ public static void main(String[] args) throws Exception {
LogicalGraph graph = readLogicalGraph(args[0], args[1]);

LogicalGraph sampledGraph = graph.callForGraph(new RandomWalkSampling(
Double.parseDouble(args[2]), Integer.parseInt(args[3]),
Double.parseDouble(args[4]), Integer.parseInt(args[5])));
Float.parseFloat(args[2]), Integer.parseInt(args[3]),
Float.parseFloat(args[4]), Integer.parseInt(args[5])));

writeLogicalGraph(sampledGraph, args[6], args[7]);
}
@@ -13,10 +13,19 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.gradoop.utils.sampling.statistics;
package org.gradoop.utils.sampling;

import org.apache.flink.api.common.ProgramDescription;
import org.gradoop.examples.AbstractRunner;
import org.gradoop.utils.statistics.AverageClusteringCoefficientRunner;
import org.gradoop.utils.statistics.AverageDegreeRunner;
import org.gradoop.utils.statistics.AverageIncomingDegreeRunner;
import org.gradoop.utils.statistics.AverageOutgoingDegreeRunner;
import org.gradoop.utils.statistics.ConnectedComponentsDistributionRunner;
import org.gradoop.utils.statistics.DegreeCentralityRunner;
import org.gradoop.utils.statistics.GlobalClusteringCoefficientRunner;
import org.gradoop.utils.statistics.GraphDensityRunner;
import org.gradoop.utils.statistics.TriangleCountingRunner;
import org.gradoop.utils.statistics.VertexDegreeDistributionRunner;
import org.gradoop.utils.statistics.VertexIncomingDegreeDistributionRunner;
import org.gradoop.utils.statistics.VertexOutgoingDegreeDistributionRunner;
@@ -13,16 +13,16 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.gradoop.utils.sampling.statistics;
package org.gradoop.utils.statistics;

import org.apache.flink.api.common.ProgramDescription;
import org.apache.flink.api.java.DataSet;
import org.gradoop.examples.AbstractRunner;
import org.gradoop.flink.algorithms.gelly.clusteringcoefficient.GellyLocalClusteringCoefficientDirected;
import org.gradoop.flink.model.impl.epgm.LogicalGraph;
import org.gradoop.flink.model.impl.functions.tuple.ObjectTo1;
import org.gradoop.flink.model.impl.operators.sampling.statistics.AverageClusteringCoefficient;
import org.gradoop.flink.model.impl.operators.sampling.statistics.SamplingEvaluationConstants;
import org.gradoop.flink.model.impl.operators.statistics.AverageClusteringCoefficient;
import org.gradoop.flink.model.impl.operators.sampling.common.SamplingEvaluationConstants;
import org.gradoop.flink.model.impl.operators.statistics.writer.StatisticWriter;

/**
@@ -13,15 +13,15 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.gradoop.utils.sampling.statistics;
package org.gradoop.utils.statistics;

import org.apache.flink.api.common.ProgramDescription;
import org.apache.flink.api.java.DataSet;
import org.gradoop.examples.AbstractRunner;
import org.gradoop.flink.model.impl.epgm.LogicalGraph;
import org.gradoop.flink.model.impl.functions.tuple.ObjectTo1;
import org.gradoop.flink.model.impl.operators.sampling.statistics.AverageDegree;
import org.gradoop.flink.model.impl.operators.sampling.statistics.SamplingEvaluationConstants;
import org.gradoop.flink.model.impl.operators.statistics.AverageDegree;
import org.gradoop.flink.model.impl.operators.sampling.common.SamplingEvaluationConstants;
import org.gradoop.flink.model.impl.operators.statistics.writer.StatisticWriter;

/**
@@ -13,15 +13,15 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.gradoop.utils.sampling.statistics;
package org.gradoop.utils.statistics;

import org.apache.flink.api.common.ProgramDescription;
import org.apache.flink.api.java.DataSet;
import org.gradoop.examples.AbstractRunner;
import org.gradoop.flink.model.impl.epgm.LogicalGraph;
import org.gradoop.flink.model.impl.functions.tuple.ObjectTo1;
import org.gradoop.flink.model.impl.operators.sampling.statistics.AverageIncomingDegree;
import org.gradoop.flink.model.impl.operators.sampling.statistics.SamplingEvaluationConstants;
import org.gradoop.flink.model.impl.operators.statistics.AverageIncomingDegree;
import org.gradoop.flink.model.impl.operators.sampling.common.SamplingEvaluationConstants;
import org.gradoop.flink.model.impl.operators.statistics.writer.StatisticWriter;

/**
@@ -13,15 +13,15 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.gradoop.utils.sampling.statistics;
package org.gradoop.utils.statistics;

import org.apache.flink.api.common.ProgramDescription;
import org.apache.flink.api.java.DataSet;
import org.gradoop.examples.AbstractRunner;
import org.gradoop.flink.model.impl.epgm.LogicalGraph;
import org.gradoop.flink.model.impl.functions.tuple.ObjectTo1;
import org.gradoop.flink.model.impl.operators.sampling.statistics.AverageOutgoingDegree;
import org.gradoop.flink.model.impl.operators.sampling.statistics.SamplingEvaluationConstants;
import org.gradoop.flink.model.impl.operators.statistics.AverageOutgoingDegree;
import org.gradoop.flink.model.impl.operators.sampling.common.SamplingEvaluationConstants;
import org.gradoop.flink.model.impl.operators.statistics.writer.StatisticWriter;

/**
@@ -13,13 +13,13 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.gradoop.utils.sampling.statistics;
package org.gradoop.utils.statistics;

import org.apache.flink.api.common.ProgramDescription;
import org.gradoop.examples.AbstractRunner;
import org.gradoop.flink.model.impl.operators.sampling.statistics.ConnectedComponentsDistribution;
import org.gradoop.flink.model.impl.operators.statistics.ConnectedComponentsDistribution;
import org.gradoop.flink.model.impl.epgm.LogicalGraph;
import org.gradoop.flink.model.impl.operators.sampling.statistics.SamplingEvaluationConstants;
import org.gradoop.flink.model.impl.operators.sampling.common.SamplingEvaluationConstants;
import org.gradoop.flink.model.impl.operators.statistics.writer.StatisticWriter;

/**
@@ -13,16 +13,16 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.gradoop.utils.sampling.statistics;
package org.gradoop.utils.statistics;

import org.apache.flink.api.common.ProgramDescription;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.tuple.Tuple1;
import org.gradoop.examples.AbstractRunner;
import org.gradoop.flink.model.impl.epgm.LogicalGraph;
import org.gradoop.flink.model.impl.functions.tuple.ObjectTo1;
import org.gradoop.flink.model.impl.operators.sampling.statistics.SamplingEvaluationConstants;
import org.gradoop.flink.model.impl.operators.sampling.statistics.DegreeCentrality;
import org.gradoop.flink.model.impl.operators.sampling.common.SamplingEvaluationConstants;
import org.gradoop.flink.model.impl.operators.statistics.DegreeCentrality;
import org.gradoop.flink.model.impl.operators.statistics.writer.StatisticWriter;

/**
@@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.gradoop.utils.sampling.statistics;
package org.gradoop.utils.statistics;

import org.apache.flink.api.common.ProgramDescription;
import org.apache.flink.api.java.DataSet;
@@ -22,7 +22,7 @@
import org.gradoop.flink.algorithms.gelly.clusteringcoefficient.GellyGlobalClusteringCoefficientDirected;
import org.gradoop.flink.model.impl.epgm.LogicalGraph;
import org.gradoop.flink.model.impl.functions.tuple.ObjectTo1;
import org.gradoop.flink.model.impl.operators.sampling.statistics.SamplingEvaluationConstants;
import org.gradoop.flink.model.impl.operators.sampling.common.SamplingEvaluationConstants;
import org.gradoop.flink.model.impl.operators.statistics.writer.StatisticWriter;

/**
@@ -13,15 +13,15 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.gradoop.utils.sampling.statistics;
package org.gradoop.utils.statistics;

import org.apache.flink.api.common.ProgramDescription;
import org.apache.flink.api.java.DataSet;
import org.gradoop.examples.AbstractRunner;
import org.gradoop.flink.model.impl.epgm.LogicalGraph;
import org.gradoop.flink.model.impl.functions.tuple.ObjectTo1;
import org.gradoop.flink.model.impl.operators.sampling.statistics.GraphDensity;
import org.gradoop.flink.model.impl.operators.sampling.statistics.SamplingEvaluationConstants;
import org.gradoop.flink.model.impl.operators.statistics.GraphDensity;
import org.gradoop.flink.model.impl.operators.sampling.common.SamplingEvaluationConstants;
import org.gradoop.flink.model.impl.operators.statistics.writer.StatisticWriter;

/**
@@ -13,15 +13,15 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.gradoop.utils.sampling.statistics;
package org.gradoop.utils.statistics;

import org.apache.flink.api.common.ProgramDescription;
import org.apache.flink.api.java.DataSet;
import org.gradoop.examples.AbstractRunner;
import org.gradoop.flink.algorithms.gelly.trianglecounting.GellyTriangleCounting;
import org.gradoop.flink.model.impl.epgm.LogicalGraph;
import org.gradoop.flink.model.impl.functions.tuple.ObjectTo1;
import org.gradoop.flink.model.impl.operators.sampling.statistics.SamplingEvaluationConstants;
import org.gradoop.flink.model.impl.operators.sampling.common.SamplingEvaluationConstants;
import org.gradoop.flink.model.impl.operators.statistics.writer.StatisticWriter;

/**
@@ -42,7 +42,7 @@
import org.gradoop.flink.model.impl.functions.epgm.Id;
import org.gradoop.flink.model.impl.functions.epgm.SourceId;
import org.gradoop.flink.model.impl.functions.tuple.Value0Of2;
import org.gradoop.flink.model.impl.operators.sampling.SamplingAlgorithm;
import org.gradoop.flink.model.impl.operators.sampling.common.SamplingConstants;

import java.util.HashSet;
import java.util.Set;
@@ -198,10 +198,10 @@ public LogicalGraph executeInGelly(Graph<Long, VCIVertexValue, Long> gellyGraph)
DataSet<org.gradoop.common.model.impl.pojo.Edge> visitedEdges = currentGraph.getEdges()
.leftOuterJoin(visitedGellyEdgeIds)
.where(new Id<>()).equalTo("*")
.with(new EPGMEdgeWithGellyEdgeIdJoin(SamplingAlgorithm.PROPERTY_KEY_SAMPLED));
.with(new EPGMEdgeWithGellyEdgeIdJoin(SamplingConstants.PROPERTY_KEY_SAMPLED));

DataSet<GradoopId> visitedSourceTargetIds = visitedEdges
.flatMap(new GetVisitedSourceTargetIdsFlatMap(SamplingAlgorithm.PROPERTY_KEY_SAMPLED))
.flatMap(new GetVisitedSourceTargetIdsFlatMap(SamplingConstants.PROPERTY_KEY_SAMPLED))
.distinct();

// compute new visited vertices
@@ -211,11 +211,11 @@ public LogicalGraph executeInGelly(Graph<Long, VCIVertexValue, Long> gellyGraph)
.with(new GellyVertexWithLongIdToGradoopIdJoin())
.join(currentGraph.getVertices())
.where(0).equalTo(new Id<>())
.with(new GellyVertexWithEPGMVertexJoin(SamplingAlgorithm.PROPERTY_KEY_SAMPLED));
.with(new GellyVertexWithEPGMVertexJoin(SamplingConstants.PROPERTY_KEY_SAMPLED));

visitedVertices = visitedVertices.leftOuterJoin(visitedSourceTargetIds)
.where(new Id<>()).equalTo("*")
.with(new VertexWithVisitedSourceTargetIdJoin(SamplingAlgorithm.PROPERTY_KEY_SAMPLED));
.with(new VertexWithVisitedSourceTargetIdJoin(SamplingConstants.PROPERTY_KEY_SAMPLED));

// return graph
return currentGraph.getConfig().getLogicalGraphFactory().fromDataSets(
@@ -25,46 +25,45 @@
/**
* Stores the in-degree, out-degree and the sum of both as properties on the vertex.
*/
public class DistinctVertexDegreesToAttribute implements JoinFunction<org.apache.flink.graph.Vertex<GradoopId, VertexDegrees.Degrees>, Vertex, Vertex> {
public class DistinctVertexDegreesToAttribute
implements JoinFunction
<org.apache.flink.graph.Vertex<GradoopId, VertexDegrees.Degrees>, Vertex, Vertex> {

/**
* Property to store the sum of vertex degrees in.
*/
private final String vertexDegreesPropery;
private final String vertexDegreesProperty;
/**
* Property to store the vertex in-degree in.
*/
private final String vertexInDegreePropery;
private final String vertexInDegreeProperty;
/**
* Property to store the vertex out-degree in.
*/
private final String vertexOutDegreePropery;
private final String vertexOutDegreeProperty;

/**
* Stores the in, out and sum of in and out degrees of a vertex.
*
* @param vertexDegreesPropery property key to store sum degree
* @param vertexInDegreesPropery property key to store in degree
* @param vertexOutDegreesPropery property key to store out degree
* @param vertexDegreesProperty property key to store sum degree
* @param vertexInDegreesProperty property key to store in degree
* @param vertexOutDegreesProperty property key to store out degree
*/
public DistinctVertexDegreesToAttribute(String vertexDegreesPropery,
String vertexInDegreesPropery, String vertexOutDegreesPropery) {
this.vertexDegreesPropery = vertexDegreesPropery;
this.vertexInDegreePropery = vertexInDegreesPropery;
this.vertexOutDegreePropery = vertexOutDegreesPropery;
public DistinctVertexDegreesToAttribute(String vertexDegreesProperty,
String vertexInDegreesProperty, String vertexOutDegreesProperty) {
this.vertexDegreesProperty = vertexDegreesProperty;
this.vertexInDegreeProperty = vertexInDegreesProperty;
this.vertexOutDegreeProperty = vertexOutDegreesProperty;
}

@Override
public Vertex join(org.apache.flink.graph.Vertex<GradoopId, Degrees> degree, Vertex vertex)
throws Exception {
vertex.setProperty(
vertexDegreesPropery,
vertex.setProperty(vertexDegreesProperty,
PropertyValue.create(degree.getValue().getDegree().getValue()));
vertex.setProperty(
vertexInDegreePropery,
vertex.setProperty(vertexInDegreeProperty,
PropertyValue.create(degree.getValue().getInDegree().getValue()));
vertex.setProperty(
vertexOutDegreePropery,
vertex.setProperty(vertexOutDegreeProperty,
PropertyValue.create(degree.getValue().getOutDegree().getValue()));
return vertex;
}
@@ -28,11 +28,12 @@
import org.gradoop.flink.model.impl.operators.aggregation.functions.max.MaxVertexProperty;
import org.gradoop.flink.model.impl.operators.aggregation.functions.min.MinVertexProperty;
import org.gradoop.flink.model.impl.operators.aggregation.functions.sum.SumVertexProperty;
import org.gradoop.flink.model.impl.operators.sampling.common.SamplingConstants;
import org.gradoop.flink.model.impl.operators.sampling.functions.AddPageRankScoresToVertexCrossFunction;
import org.gradoop.flink.model.impl.operators.sampling.functions.PageRankResultVertexFilter;

/**
* Computes a PageRank-Sampling of the graph.
* Computes a PageRank-Sampling of the graph (new graph head will be generated).
*
* Uses the Gradoop-Wrapper of Flink's PageRank-algorithm {@link PageRank} with a dampening factor
* and a number of maximum iterations. It computes a per-vertex score which is the sum of the
@@ -113,15 +114,18 @@ public PageRankSampling(double dampeningFactor, int maxIteration, double thresho
public LogicalGraph sample(LogicalGraph graph) {

LogicalGraph pageRankGraph = new PageRank(
PAGE_RANK_SCORE_PROPERTY_KEY, dampeningFactor, maxIteration, true).execute(graph);
SamplingConstants.PAGE_RANK_SCORE_PROPERTY_KEY,
dampeningFactor,
maxIteration,
true).execute(graph);

graph = graph.getConfig().getLogicalGraphFactory().fromDataSets(
graph.getGraphHead(), pageRankGraph.getVertices(), pageRankGraph.getEdges());

graph = graph
.aggregate(new MinVertexProperty(PAGE_RANK_SCORE_PROPERTY_KEY),
new MaxVertexProperty(PAGE_RANK_SCORE_PROPERTY_KEY),
new SumVertexProperty(PAGE_RANK_SCORE_PROPERTY_KEY),
.aggregate(new MinVertexProperty(SamplingConstants.PAGE_RANK_SCORE_PROPERTY_KEY),
new MaxVertexProperty(SamplingConstants.PAGE_RANK_SCORE_PROPERTY_KEY),
new SumVertexProperty(SamplingConstants.PAGE_RANK_SCORE_PROPERTY_KEY),
new VertexCount());

DataSet<Vertex> scaledVertices = graph.getVertices()
@@ -138,8 +142,7 @@ public LogicalGraph sample(LogicalGraph graph) {
.where(new TargetId<>()).equalTo(new Id<>())
.with(new LeftSide<>());

graph = graph.getConfig().getLogicalGraphFactory().fromDataSets(
graph.getGraphHead(), scaledVertices, newEdges);
graph = graph.getFactory().fromDataSets(scaledVertices, newEdges);

return graph;
}
Oops, something went wrong.

0 comments on commit 60e4b2b

Please sign in to comment.
You can’t perform that action at this time.