Permalink
Browse files

[#689] Add Gelly Page Rank and generalize Gelly algorithm handling

* Remove type argument from gradoop to gelly mapper interfaces.

Type arguments are not valid here, because flink fails to detect the
actual types.
Interfaces will be replaced with 2 interfaces and 4 implementations:
- a common super interface for each vertices and edges
- an implementation mapping to NullValue
- an implementation mapping to PropertyValue

* Add missing implementations of gradoop to gelly mapper interfaces.
* Implement PageRank from Gelly + Checkstyle formatting
* Fixes #689
  • Loading branch information...
p-f authored and s1ck committed Oct 3, 2017
1 parent 9495551 commit 3c767954ebf982b92524e07b8f7cbb69273d4352
Showing with 508 additions and 48 deletions.
  1. +1 −1 gradoop-examples/src/main/java/org/gradoop/examples/grouping/Communities.java
  2. +1 −1 gradoop-examples/src/main/java/org/gradoop/examples/sna/SNABenchmark2.java
  3. +89 −0 gradoop-flink/src/main/java/org/gradoop/flink/algorithms/gelly/GellyAlgorithm.java
  4. +28 −0 gradoop-flink/src/main/java/org/gradoop/flink/algorithms/gelly/functions/EdgeToGellyEdge.java
  5. +4 −6 ...agation/functions/EdgeToGellyEdgeMapper.java → gelly/functions/EdgeToGellyEdgeWithNullValue.java}
  6. +58 −0 .../src/main/java/org/gradoop/flink/algorithms/gelly/functions/EdgeToGellyEdgeWithPropertyValue.java
  7. +29 −0 gradoop-flink/src/main/java/org/gradoop/flink/algorithms/gelly/functions/VertexToGellyVertex.java
  8. +46 −0 .../src/main/java/org/gradoop/flink/algorithms/gelly/functions/VertexToGellyVertexWithNullValue.java
  9. +4 −7 ...tions/VertexToGellyVertexMapper.java → gelly/functions/VertexToGellyVertexWithPropertyValue.java}
  10. +19 −0 gradoop-flink/src/main/java/org/gradoop/flink/algorithms/gelly/functions/package-info.java
  11. +1 −1 ...src/main/java/org/gradoop/flink/algorithms/{ → gelly}/labelpropagation/GellyLabelPropagation.java
  12. +3 −3 ...c/main/java/org/gradoop/flink/algorithms/{ → gelly}/labelpropagation/GradoopLabelPropagation.java
  13. +13 −23 ...link/src/main/java/org/gradoop/flink/algorithms/{ → gelly}/labelpropagation/LabelPropagation.java
  14. +1 −1 ...in/java/org/gradoop/flink/algorithms/{ → gelly}/labelpropagation/functions/LPMessageFunction.java
  15. +1 −1 ...ain/java/org/gradoop/flink/algorithms/{ → gelly}/labelpropagation/functions/LPUpdateFunction.java
  16. +1 −1 ...rc/main/java/org/gradoop/flink/algorithms/{ → gelly}/labelpropagation/functions/LPVertexJoin.java
  17. +1 −1 ...rc/main/java/org/gradoop/flink/algorithms/{ → gelly}/labelpropagation/functions/package-info.java
  18. +1 −1 ...op-flink/src/main/java/org/gradoop/flink/algorithms/{ → gelly}/labelpropagation/package-info.java
  19. +19 −0 gradoop-flink/src/main/java/org/gradoop/flink/algorithms/gelly/package-info.java
  20. +101 −0 gradoop-flink/src/main/java/org/gradoop/flink/algorithms/gelly/pagerank/PageRank.java
  21. +48 −0 ...link/src/main/java/org/gradoop/flink/algorithms/gelly/pagerank/functions/PageRankToAttribute.java
  22. +19 −0 gradoop-flink/src/main/java/org/gradoop/flink/algorithms/gelly/pagerank/functions/package-info.java
  23. +19 −0 gradoop-flink/src/main/java/org/gradoop/flink/algorithms/gelly/pagerank/package-info.java
  24. +1 −1 ...st/java/org/gradoop/flink/algorithms/{ → gelly}/labelpropagation/GradoopLabelPropagationTest.java
@@ -16,7 +16,7 @@
package org.gradoop.examples.grouping;
import org.gradoop.examples.AbstractRunner;
import org.gradoop.flink.algorithms.labelpropagation.GellyLabelPropagation;
import org.gradoop.flink.algorithms.gelly.labelpropagation.GellyLabelPropagation;
import org.gradoop.flink.io.api.DataSink;
import org.gradoop.flink.io.api.DataSource;
import org.gradoop.flink.io.impl.csv.CSVDataSink;
@@ -22,7 +22,7 @@
import org.apache.flink.api.java.ExecutionEnvironment;
import org.gradoop.examples.AbstractRunner;
import org.gradoop.examples.utils.ExampleOutput;
import org.gradoop.flink.algorithms.labelpropagation.GellyLabelPropagation;
import org.gradoop.flink.algorithms.gelly.labelpropagation.GellyLabelPropagation;
import org.gradoop.flink.model.api.epgm.LogicalGraph;
import org.gradoop.flink.model.api.functions.TransformationFunction;
import org.gradoop.flink.model.impl.operators.aggregation.ApplyAggregation;
@@ -0,0 +1,89 @@
/**
* Copyright © 2014 - 2017 Leipzig University (Database Research Group)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.gradoop.flink.algorithms.gelly;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.graph.Edge;
import org.apache.flink.graph.Graph;
import org.apache.flink.graph.Vertex;
import org.gradoop.common.model.impl.id.GradoopId;
import org.gradoop.flink.algorithms.gelly.functions.EdgeToGellyEdge;
import org.gradoop.flink.algorithms.gelly.functions.VertexToGellyVertex;
import org.gradoop.flink.model.api.epgm.LogicalGraph;
import org.gradoop.flink.model.api.operators.UnaryGraphToGraphOperator;
/**
* Base class for Algorithms executed in Flink Gelly.
*
* @param <E> Value type for vertices.
* @param <F> Value type for edges.
*/
public abstract class GellyAlgorithm<E, F> implements UnaryGraphToGraphOperator {
/**
* The graph used in {@link GellyAlgorithm#execute(LogicalGraph)}.
*/
protected LogicalGraph currentGraph;
/**
* Function mapping to edge to gelly edge.
*/
private final EdgeToGellyEdge<F> toGellyEdge;
/**
* Function mapping vertex to gelly vertex.
*/
private final VertexToGellyVertex<E> toGellyVertex;
/**
* Base constructor, only setting the mapper functions.
*
* @param vertexValue Function mapping vertices from Gradoop to Gelly.
* @param edgeValue function mapping edges from Gradoop to Gelly.
*/
protected GellyAlgorithm(VertexToGellyVertex<E> vertexValue, EdgeToGellyEdge<F> edgeValue) {
this.toGellyVertex = vertexValue;
this.toGellyEdge = edgeValue;
}
@Override
public LogicalGraph execute(LogicalGraph graph) {
currentGraph = graph;
return executeInGelly(transformToGelly(graph));
}
/**
* Default transformation from a Gradoop Graph to a Gelly Graph.
*
* @param graph Gradoop Graph.
* @return Gelly Graph.
*/
protected Graph<GradoopId, E, F> transformToGelly(LogicalGraph graph) {
DataSet<Vertex<GradoopId, E>> gellyVertices = graph.getVertices().map(toGellyVertex);
DataSet<Edge<GradoopId, F>> gellyEdges = graph.getEdges().map(toGellyEdge);
return Graph.fromDataSet(gellyVertices, gellyEdges,
graph.getConfig().getExecutionEnvironment());
}
/**
* Perform some operation in Gelly and transform the Gelly graph back to a Gradoop
* {@link LogicalGraph}.
*
* @param graph The Gelly graph.
* @return The Gradoop graph.
*/
protected abstract LogicalGraph executeInGelly(Graph<GradoopId, E, F> graph);
}
@@ -0,0 +1,28 @@
/**
* Copyright © 2014 - 2017 Leipzig University (Database Research Group)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.gradoop.flink.algorithms.gelly.functions;
import org.apache.flink.api.common.functions.MapFunction;
import org.gradoop.common.model.impl.id.GradoopId;
import org.gradoop.common.model.impl.pojo.Edge;
/**
* Convert a Gradoop {@link Edge} to a Gelly Edge.
*
* @param <E> Type of the output Gelly Edge.
*/
public interface EdgeToGellyEdge<E> extends MapFunction<Edge, org.apache.flink.graph.Edge<GradoopId, E>> {
}
@@ -13,9 +13,8 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.gradoop.flink.algorithms.labelpropagation.functions;
package org.gradoop.flink.algorithms.gelly.functions;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.FunctionAnnotation;
import org.apache.flink.types.NullValue;
import org.gradoop.common.model.impl.id.GradoopId;
@@ -26,17 +25,16 @@
* identifier and {@link NullValue} as edge value.
*/
@FunctionAnnotation.ForwardedFields("sourceId->f0;targetId->f1")
public class EdgeToGellyEdgeMapper implements
MapFunction<Edge, org.apache.flink.graph.Edge<GradoopId, NullValue>> {
public class EdgeToGellyEdgeWithNullValue implements EdgeToGellyEdge<NullValue> {
/**
* Reduce object instantiations
*/
private final org.apache.flink.graph.Edge<GradoopId, NullValue> reuseEdge;
/**
* Constructor
* Constructor.
*/
public EdgeToGellyEdgeMapper() {
public EdgeToGellyEdgeWithNullValue() {
reuseEdge = new org.apache.flink.graph.Edge<>();
reuseEdge.setValue(NullValue.getInstance());
}
@@ -0,0 +1,58 @@
/**
* Copyright © 2014 - 2017 Leipzig University (Database Research Group)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.gradoop.flink.algorithms.gelly.functions;
import org.apache.flink.api.java.functions.FunctionAnnotation;
import org.gradoop.common.model.impl.id.GradoopId;
import org.gradoop.common.model.impl.pojo.Edge;
import org.gradoop.common.model.impl.properties.PropertyValue;
/**
* Maps EPGM edge to a Gelly edge consisting of EPGM source and target
* identifier and {@link PropertyValue} as edge value.
*/
@FunctionAnnotation.ForwardedFields("sourceId->f0;targetId->f1")
@FunctionAnnotation.ReadFields("properties")
public class EdgeToGellyEdgeWithPropertyValue implements EdgeToGellyEdge<PropertyValue> {
/**
* Property key to get the value for.
*/
private final String propertyKey;
/**
* Reduce object instantiations.
*/
private final org.apache.flink.graph.Edge<GradoopId, PropertyValue>
reuseEdge;
/**
* Constructor.
*
* @param propertyKey property key for get property value
*/
public EdgeToGellyEdgeWithPropertyValue(String propertyKey) {
this.propertyKey = propertyKey;
this.reuseEdge = new org.apache.flink.graph.Edge<>();
}
@Override
public org.apache.flink.graph.Edge<GradoopId, PropertyValue> map(Edge epgmEdge) {
reuseEdge.setSource(epgmEdge.getSourceId());
reuseEdge.setTarget(epgmEdge.getTargetId());
reuseEdge.setValue(epgmEdge.getPropertyValue(propertyKey));
return reuseEdge;
}
}
@@ -0,0 +1,29 @@
/**
* Copyright © 2014 - 2017 Leipzig University (Database Research Group)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.gradoop.flink.algorithms.gelly.functions;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.graph.Vertex;
import org.gradoop.common.model.impl.id.GradoopId;
/**
* Convert a Gradoop {@link Vertex} to a Gelly Vertex.
*
* @param <E> Type of the output Gelly Vertex.
*/
public interface VertexToGellyVertex<E>
extends MapFunction<org.gradoop.common.model.impl.pojo.Vertex, Vertex<GradoopId, E>> {
}
@@ -0,0 +1,46 @@
/**
* Copyright © 2014 - 2017 Leipzig University (Database Research Group)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.gradoop.flink.algorithms.gelly.functions;
import org.apache.flink.api.java.functions.FunctionAnnotation;
import org.apache.flink.types.NullValue;
import org.gradoop.common.model.impl.id.GradoopId;
import org.gradoop.common.model.impl.pojo.Vertex;
/**
* Maps EPGM vertex to a Gelly vertex with the {@link GradoopId} as its id.
*/
@FunctionAnnotation.ForwardedFields("id->f0")
public class VertexToGellyVertexWithNullValue implements VertexToGellyVertex<NullValue> {
/**
* Reduce object instantiations
*/
private final org.apache.flink.graph.Vertex<GradoopId, NullValue> reuseVertex;
/**
* Constructor.
*/
public VertexToGellyVertexWithNullValue() {
reuseVertex = new org.apache.flink.graph.Vertex<>();
reuseVertex.setValue(NullValue.getInstance());
}
@Override
public org.apache.flink.graph.Vertex<GradoopId, NullValue> map(Vertex epgmVertex) {
reuseVertex.setId(epgmVertex.getId());
return reuseVertex;
}
}
@@ -13,22 +13,19 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.gradoop.flink.algorithms.labelpropagation.functions;
package org.gradoop.flink.algorithms.gelly.functions;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.FunctionAnnotation;
import org.gradoop.common.model.impl.id.GradoopId;
import org.gradoop.common.model.impl.pojo.Vertex;
import org.gradoop.common.model.impl.properties.PropertyValue;
/**
* Maps EPGM vertex to a Gelly vertex consisting of the EPGM identifier and the
* label propagation value.
* Maps EPGM vertex to a Gelly vertex consisting of the EPGM identifier and a {@link PropertyValue}.
*/
@FunctionAnnotation.ForwardedFields("id->f0")
@FunctionAnnotation.ReadFields("properties")
public class VertexToGellyVertexMapper implements
MapFunction<Vertex, org.apache.flink.graph.Vertex<GradoopId, PropertyValue>> {
public class VertexToGellyVertexWithPropertyValue implements VertexToGellyVertex<PropertyValue> {
/**
* Property key to access the label value which will be propagated
*/
@@ -45,7 +42,7 @@
*
* @param propertyKey property key for get property value
*/
public VertexToGellyVertexMapper(String propertyKey) {
public VertexToGellyVertexWithPropertyValue(String propertyKey) {
this.propertyKey = propertyKey;
this.reuseVertex = new org.apache.flink.graph.Vertex<>();
}
@@ -0,0 +1,19 @@
/**
* Copyright © 2014 - 2017 Leipzig University (Database Research Group)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
* Contains common functions used to map Gradoop to Gelly graphs.
*/
package org.gradoop.flink.algorithms.gelly.functions;
@@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.gradoop.flink.algorithms.labelpropagation;
package org.gradoop.flink.algorithms.gelly.labelpropagation;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.graph.Graph;
@@ -13,15 +13,15 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.gradoop.flink.algorithms.labelpropagation;
package org.gradoop.flink.algorithms.gelly.labelpropagation;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.graph.Graph;
import org.apache.flink.types.NullValue;
import org.gradoop.flink.algorithms.labelpropagation.functions.LPMessageFunction;
import org.gradoop.flink.algorithms.labelpropagation.functions.LPUpdateFunction;
import org.gradoop.common.model.impl.id.GradoopId;
import org.gradoop.common.model.impl.properties.PropertyValue;
import org.gradoop.flink.algorithms.gelly.labelpropagation.functions.LPMessageFunction;
import org.gradoop.flink.algorithms.gelly.labelpropagation.functions.LPUpdateFunction;
/**
* Executes the label propagation integrated in Gradoop.
Oops, something went wrong.

0 comments on commit 3c76795

Please sign in to comment.