Skip to content

Commit

Permalink
[#691] Implement Connected Components via Gelly
Browse files Browse the repository at this point in the history
* Implement Connected Components via Gelly.
* fixes #691
  • Loading branch information
p-f authored and s1ck committed Oct 16, 2017
1 parent 9d391b1 commit a67c091
Show file tree
Hide file tree
Showing 11 changed files with 400 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,6 @@
files="GraphStatistics.java"
lines="0-9999"/>
<suppress checks="IllegalCatch"
files="PageRank.java"
lines="50-70"/>
files="GellyAlgorithm.java"
lines="60-70"/>
</suppressions>
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,11 @@ protected GellyAlgorithm(VertexToGellyVertex<E> vertexValue, EdgeToGellyEdge<F>
@Override
public LogicalGraph execute(LogicalGraph graph) {
// Keep a reference to the input graph in the currentGraph field before the algorithm runs.
currentGraph = graph;
// NOTE(review): this unguarded return looks like the pre-change line of the diff hunk and the
// try/catch below like its replacement - confirm against the committed file.
return executeInGelly(transformToGelly(graph));
try {
return executeInGelly(transformToGelly(graph));
} catch (Exception e) {
// Rethrow as an unchecked exception, keeping the original exception as the cause.
throw new RuntimeException(e);
}
}

/**
Expand All @@ -85,5 +89,5 @@ protected Graph<GradoopId, E, F> transformToGelly(LogicalGraph graph) {
* @param graph The Gelly graph.
* @return The Gradoop graph.
*/
protected abstract LogicalGraph executeInGelly(Graph<GradoopId, E, F> graph);
protected abstract LogicalGraph executeInGelly(Graph<GradoopId, E, F> graph) throws Exception;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
/**
* Copyright © 2014 - 2017 Leipzig University (Database Research Group)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.gradoop.flink.algorithms.gelly.connectedcomponents;

import org.apache.flink.api.java.DataSet;
import org.apache.flink.graph.Graph;
import org.apache.flink.types.NullValue;
import org.gradoop.common.model.impl.id.GradoopId;
import org.gradoop.common.model.impl.pojo.Vertex;
import org.gradoop.flink.algorithms.gelly.GellyAlgorithm;
import org.gradoop.flink.algorithms.gelly.connectedcomponents.functions.ConnectedComponentToAttribute;
import org.gradoop.flink.algorithms.gelly.functions.EdgeToGellyEdgeWithNullValue;
import org.gradoop.flink.algorithms.gelly.functions.VertexToGellyVertexWithGradoopId;
import org.gradoop.flink.model.api.epgm.LogicalGraph;
import org.gradoop.flink.model.impl.functions.epgm.Id;

/**
* A gradoop operator wrapping {@link org.apache.flink.graph.library.ConnectedComponents}.
* The result will be the same {@link LogicalGraph} with a component id assigned to each vertex
* as a Property.
*/
public class AnnotateWeaklyConnectedComponents extends GellyAlgorithm<GradoopId, NullValue> {

/**
* Property key to store the component id in.
*/
private final String propertyKey;

/**
* Maximum number of iterations.
*/
private final int maxIterations;

/**
* Constructor for connected components with a maximum number of iterations.
*
* @param propertyKey Property key to store the component id in.
* @param maxIterations The maximum number of iterations.
*/
public AnnotateWeaklyConnectedComponents(String propertyKey, int maxIterations) {
super(new VertexToGellyVertexWithGradoopId(), new EdgeToGellyEdgeWithNullValue());
this.propertyKey = propertyKey;
this.maxIterations = maxIterations;
}

@Override
protected LogicalGraph executeInGelly(Graph<GradoopId, GradoopId, NullValue> graph)
throws Exception {
DataSet<Vertex> newVertices = new org.apache.flink.graph.library.ConnectedComponents<GradoopId,
GradoopId, NullValue>(maxIterations)
.run(graph)
.join(currentGraph.getVertices())
.where(0)
.equalTo(new Id<>())
.with(new ConnectedComponentToAttribute(propertyKey));
return currentGraph.getConfig().getLogicalGraphFactory().fromDataSets(newVertices,
currentGraph.getEdges());
}

@Override
public String getName() {
return getClass().getName();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
/**
* Copyright © 2014 - 2017 Leipzig University (Database Research Group)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.gradoop.flink.algorithms.gelly.connectedcomponents;

import org.apache.flink.api.java.DataSet;
import org.gradoop.common.model.impl.pojo.Vertex;
import org.gradoop.flink.model.api.epgm.GraphCollection;
import org.gradoop.flink.model.api.epgm.LogicalGraph;
import org.gradoop.flink.model.api.operators.UnaryGraphToCollectionOperator;
import org.gradoop.flink.model.impl.functions.epgm.PropertyRemover;

/**
 * Splits a {@link LogicalGraph} into a {@link GraphCollection} of its weakly connected
 * components.
 */
public class WeaklyConnectedComponents implements UnaryGraphToCollectionOperator {

  /**
   * Property key used for the temporary component id when none is given.
   */
  private static final String DEFAULT_PROPERTY_KEY = "_wcc_component_id";

  /**
   * Maximum number of iterations for {@link AnnotateWeaklyConnectedComponents}.
   */
  private final int maxIterations;

  /**
   * Property key under which the component id is stored temporarily.
   */
  private final String propertyKey;

  /**
   * Creates the operator with the default property key.
   *
   * @param maxIterations Maximum number of iterations for
   *                      {@link AnnotateWeaklyConnectedComponents}.
   */
  public WeaklyConnectedComponents(int maxIterations) {
    this(DEFAULT_PROPERTY_KEY, maxIterations);
  }

  /**
   * Creates the operator with a custom property key.
   *
   * @param propertyKey   Property key under which the component id is stored temporarily.
   * @param maxIterations Maximum number of iterations for
   *                      {@link AnnotateWeaklyConnectedComponents}.
   */
  public WeaklyConnectedComponents(String propertyKey, int maxIterations) {
    this.propertyKey = propertyKey;
    this.maxIterations = maxIterations;
  }

  /**
   * Annotates the vertices with their component id, splits the graph by that property and
   * removes the property from the vertices of the resulting collection.
   *
   * @param graph The input graph.
   * @return The collection created by splitting the annotated graph.
   */
  @Override
  public GraphCollection execute(LogicalGraph graph) {
    AnnotateWeaklyConnectedComponents annotator =
      new AnnotateWeaklyConnectedComponents(propertyKey, maxIterations);
    GraphCollection components = graph.callForGraph(annotator).splitBy(propertyKey);

    // The component id is only needed for the split; strip it from the vertices again.
    DataSet<Vertex> cleanedVertices = components.getVertices()
      .map(new PropertyRemover<>(propertyKey));

    return graph.getConfig()
      .getGraphCollectionFactory()
      .fromDataSets(components.getGraphHeads(), cleanedVertices, components.getEdges());
  }

  @Override
  public String getName() {
    return WeaklyConnectedComponents.class.getName();
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/**
* Copyright © 2014 - 2017 Leipzig University (Database Research Group)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.gradoop.flink.algorithms.gelly.connectedcomponents.functions;

import org.apache.flink.api.common.functions.JoinFunction;
import org.gradoop.common.model.impl.id.GradoopId;
import org.gradoop.common.model.impl.pojo.Vertex;

/**
 * Join function that copies the component id (the {@link GradoopId} held as the value of the
 * Gelly vertex) into a property of the joined Gradoop vertex.
 */
public class ConnectedComponentToAttribute
  implements JoinFunction<org.apache.flink.graph.Vertex<GradoopId, GradoopId>, Vertex, Vertex> {

  /**
   * Key of the property that receives the component id.
   */
  private final String componentProperty;

  /**
   * Creates the join function.
   *
   * @param componentProperty Key of the property that receives the component id.
   */
  public ConnectedComponentToAttribute(String componentProperty) {
    this.componentProperty = componentProperty;
  }

  /**
   * Sets the value of the Gelly vertex as a property on the Gradoop vertex.
   *
   * @param gellyVertex   Gelly vertex whose value is the component id.
   * @param gradoopVertex Gradoop vertex to annotate.
   * @return The same Gradoop vertex instance, now carrying the property.
   */
  @Override
  public Vertex join(org.apache.flink.graph.Vertex<GradoopId, GradoopId> gellyVertex,
    Vertex gradoopVertex) {
    GradoopId componentId = gellyVertex.getValue();
    gradoopVertex.setProperty(componentProperty, componentId);
    return gradoopVertex;
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
/**
* Copyright © 2014 - 2017 Leipzig University (Database Research Group)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
* Contains functions related to the Connected Components algorithm.
*/
package org.gradoop.flink.algorithms.gelly.connectedcomponents.functions;
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
/**
* Copyright © 2014 - 2017 Leipzig University (Database Research Group)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
* Contains classes related to the Connected Components algorithm.
*/
package org.gradoop.flink.algorithms.gelly.connectedcomponents;
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/**
* Copyright © 2014 - 2017 Leipzig University (Database Research Group)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.gradoop.flink.algorithms.gelly.functions;

import org.apache.flink.api.java.functions.FunctionAnnotation;
import org.gradoop.common.model.impl.id.GradoopId;
import org.gradoop.common.model.impl.pojo.Vertex;

/**
 * Converts an EPGM vertex into a Gelly vertex that uses the {@link GradoopId} of the EPGM
 * vertex both as its id and as its value.
 */
@FunctionAnnotation.ForwardedFields("id->f0;id->f1")
public class VertexToGellyVertexWithGradoopId implements VertexToGellyVertex<GradoopId> {

  /**
   * Single Gelly vertex instance that is refilled and returned on every call to reduce
   * object instantiations.
   */
  private final org.apache.flink.graph.Vertex<GradoopId, GradoopId> reuseVertex =
    new org.apache.flink.graph.Vertex<>();

  /**
   * Constructor.
   */
  public VertexToGellyVertexWithGradoopId() {
  }

  /**
   * Writes the id of the EPGM vertex into both fields of the reused Gelly vertex.
   *
   * @param epgmVertex The EPGM vertex.
   * @return The reused Gelly vertex with id and value set to the EPGM vertex id.
   */
  @Override
  public org.apache.flink.graph.Vertex<GradoopId, GradoopId> map(Vertex epgmVertex) {
    final GradoopId vertexId = epgmVertex.getId();
    reuseVertex.setValue(vertexId);
    reuseVertex.setId(vertexId);
    return reuseVertex;
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -62,22 +62,7 @@ public PageRank(String propertyKey, double dampingFactor, int iterations) {
}

@Override
public LogicalGraph executeInGelly(Graph<GradoopId, NullValue, NullValue> graph) {
try {
return executeInternal(graph);
} catch (Exception e) {
throw new RuntimeException(e);
}
}

/**
* Wrapping the {@link PageRank#execute(LogicalGraph)} functionality to handle exceptions.
*
* @param graph The input graph.
* @return The output graph.
* @throws Exception unhandled Exception from Gelly.
*/
private LogicalGraph executeInternal(Graph<GradoopId, NullValue, NullValue> graph)
protected LogicalGraph executeInGelly(Graph<GradoopId, NullValue, NullValue> graph)
throws Exception {
DataSet<Vertex> newVertices =
new org.apache.flink.graph.library.link_analysis.PageRank<GradoopId, NullValue, NullValue>(
Expand Down
Loading

0 comments on commit a67c091

Please sign in to comment.