Skip to content

Commit

Permalink
[FLINK-2012] [gelly] Added methods to remove/add multiple edges/vertices
Browse files Browse the repository at this point in the history
This squashes the following commits:

[gelly] Removed trailing comment

[gelly] Made remove methods use a coGroup fun
  • Loading branch information
andralungu authored and vasia committed May 26, 2015
1 parent b68049b commit edcfd8b
Show file tree
Hide file tree
Showing 3 changed files with 468 additions and 9 deletions.
12 changes: 12 additions & 0 deletions docs/libs/gelly_guide.md
Expand Up @@ -251,14 +251,26 @@ Gelly includes the following methods for adding and removing vertices and edges
// adds a Vertex and the given edges to the Graph. If the Vertex already exists, it will not be added again, but the given edges will.
Graph<K, VV, EV> addVertex(final Vertex<K, VV> vertex, List<Edge<K, EV>> edges)

// adds a data set of vertices and a list of edges to the Graph. If the vertices already exist in the graph, they will not be added once more, however the edges will.
Graph<K, VV, EV> addVertices(DataSet<Vertex<K, VV>> verticesToAdd, List<Edge<K, EV>> edges)

// adds an Edge to the Graph. If the source and target vertices do not exist in the graph, they will also be added.
Graph<K, VV, EV> addEdge(Vertex<K, VV> source, Vertex<K, VV> target, EV edgeValue)

// adds a data set of edges to the Graph. If the vertices already exist in the graph, they will not be added, however the edges will.
Graph<K, VV, EV> addEdges(DataSet<Edge<K, EV>> newEdges, DataSet<Vertex<K, VV>> newVertices)

// removes the given Vertex and its edges from the Graph.
Graph<K, VV, EV> removeVertex(Vertex<K, VV> vertex)

// removes the given data set of Vertices and their edges from the Graph.
Graph<K, VV, EV> removeVertices(DataSet<Vertex<K, VV>> verticesToBeRemoved)

// removes *all* edges that match the given Edge from the Graph.
Graph<K, VV, EV> removeEdge(Edge<K, EV> edge)

// removes *all* edges that match the edges in the given data set from the Graph.
Graph<K, VV, EV> removeEdges(DataSet<Edge<K, EV>> edgesToBeRemoved)
{% endhighlight %}

Neighborhood Methods
Expand Down
Expand Up @@ -27,6 +27,7 @@
import org.apache.flink.api.common.functions.CoGroupFunction;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.FlatJoinFunction;
import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.GroupReduceFunction;
Expand Down Expand Up @@ -1009,22 +1010,38 @@ public Tuple2<K, K> map(Edge<K, EV> edge) throws Exception {
* will.
*
* @param vertex the vertex to add to the graph
* @param edges a list of edges to add to the grap
* @return the new graph containing the existing and newly added vertices
* @param edges a list of edges to add to the graph
* @return the new graph containing the existing and newly added vertex
* and edges
*/
@SuppressWarnings("unchecked")
public Graph<K, VV, EV> addVertex(final Vertex<K, VV> vertex, List<Edge<K, EV>> edges) {
    // Wrap the single vertex in a data set and delegate to the bulk variant,
    // which also deals with an empty edge list.
    return addVertices(this.context.fromElements(vertex), edges);
}

/**
* Adds the data set of vertices and the edges, passed as input, to the graph.
* If the vertices already exist in the graph, they will not be added once more,
* however the edges will.
*
* @param verticesToAdd the data set of vertices to add
* @param edges a list of edges to add to the graph
* @return the new graph containing the existing and newly added vertices
* and edges
*/
@SuppressWarnings("unchecked")
public Graph<K, VV, EV> addVertices(DataSet<Vertex<K, VV>> verticesToAdd, List<Edge<K, EV>> edges) {

// Consider empty edge set
if (edges.isEmpty()) {
return new Graph<K, VV, EV>(this.vertices.union(newVertex)
return new Graph<K, VV, EV>(this.vertices.union(verticesToAdd)
.distinct(), this.edges, this.context);
}

// Add the vertex and its edges
DataSet<Vertex<K, VV>> newVertices = this.vertices.union(newVertex).distinct();
DataSet<Vertex<K, VV>> newVertices = this.vertices.union(verticesToAdd).distinct();
DataSet<Edge<K, EV>> newEdges = this.edges.union(context.fromCollection(edges));

return new Graph<K, VV, EV>(newVertices, newEdges, this.context);
Expand All @@ -1048,6 +1065,23 @@ public Graph<K, VV, EV> addEdge(Vertex<K, VV> source, Vertex<K, VV> target, EV e
return this.union(partialGraph);
}

/**
 * Adds the given data set of edges to the graph. If the vertices do not already exist in the
 * graph, they will also be added.
 *
 * <p>If the vertex values are not required during the computation, it is recommended to use
 * {@code addEdges(edges)} instead.
 *
 * @param newEdges the data set of edges to be added
 * @param newVertices their corresponding vertices and vertex values
 * @return a new graph containing the existing vertices and edges plus the newly added edges and vertices.
 */
@SuppressWarnings("unchecked")
public Graph<K, VV, EV> addEdges(DataSet<Edge<K, EV>> newEdges, DataSet<Vertex<K, VV>> newVertices) {
    // Build a graph out of the additions only, then merge it into this graph.
    return this.union(fromDataSet(newVertices, newEdges, context));
}

/**
* Removes the given vertex and its edges from the graph.
*
Expand Down Expand Up @@ -1100,6 +1134,55 @@ public boolean filter(Edge<K, EV> edge) throws Exception {
}

/**
 * Removes the given data set of vertices and their edges from the graph.
 *
 * @param verticesToBeRemoved the data set of vertices to be removed
 * @return the resulting graph containing the initial vertices and edges minus the vertices
 *         and edges removed.
 */
public Graph<K, VV, EV> removeVertices(DataSet<Vertex<K, VV>> verticesToBeRemoved) {

    // Vertices whose key (field 0) has no counterpart in verticesToBeRemoved.
    DataSet<Vertex<K, VV>> remainingVertices = getVertices()
            .coGroup(verticesToBeRemoved)
            .where(0).equalTo(0)
            .with(new VerticesRemovalCoGroup<K, VV>());

    // An edge is kept only if its source (field 0) is among the remaining vertices...
    DataSet<Edge<K, EV>> edgesWithRemainingSource = remainingVertices
            .join(getEdges())
            .where(0).equalTo(0)
            .with(new ProjectEdgeToBeRemoved<K, VV, EV>());

    // ...and its target (field 1) is among the remaining vertices as well.
    DataSet<Edge<K, EV>> remainingEdges = edgesWithRemainingSource
            .join(remainingVertices)
            .where(1).equalTo(0)
            .with(new ProjectEdge<K, VV, EV>());

    return new Graph<K, VV, EV>(remainingVertices, remainingEdges, context);
}

/**
 * Co-group function used when removing vertices: for each key, emits the first vertex of the
 * graph-side group when the group of vertices to be removed is empty, and nothing otherwise.
 */
private static final class VerticesRemovalCoGroup<K, VV>
        implements CoGroupFunction<Vertex<K, VV>, Vertex<K, VV>, Vertex<K, VV>> {

    @Override
    public void coGroup(Iterable<Vertex<K, VV>> vertex, Iterable<Vertex<K, VV>> vertexToBeRemoved,
            Collector<Vertex<K, VV>> out) throws Exception {

        final Iterator<Vertex<K, VV>> existing = vertex.iterator();
        final Iterator<Vertex<K, VV>> removals = vertexToBeRemoved.iterator();

        // Forward the vertex only when it exists and nothing asks for its removal.
        if (existing.hasNext() && !removals.hasNext()) {
            out.collect(existing.next());
        }
    }
}

/**
 * Join function that, for every joined (vertex, edge) pair, emits the edge unchanged.
 *
 * <p>Note: despite the class name, the edges emitted here are the ones that found a join
 * partner; removeVertices uses it to retain edges whose source vertex remains.
 */
@ForwardedFieldsSecond("f0; f1; f2")
private static final class ProjectEdgeToBeRemoved<K, VV, EV>
        implements JoinFunction<Vertex<K, VV>, Edge<K, EV>, Edge<K, EV>> {

    @Override
    public Edge<K, EV> join(final Vertex<K, VV> vertex, final Edge<K, EV> edge) throws Exception {
        // The vertex only serves as the join partner; the edge is passed through as is.
        return edge;
    }
}

/**
* Removes all edges that match the given edge from the graph.
*
* @param edge the edge to remove
Expand All @@ -1126,6 +1209,39 @@ public boolean filter(Edge<K, EV> edge) {
}
}

/**
 * Removes all the edges that match the edges in the given data set from the graph.
 * Edges are matched on their first two fields (fields 0 and 1).
 *
 * @param edgesToBeRemoved the data set of edges to be removed
 * @return a new graph where the edges have been removed and in which the vertices remained intact
 */
public Graph<K, VV, EV> removeEdges(DataSet<Edge<K, EV>> edgesToBeRemoved) {

    // Co-group existing edges with the removal set; the co-group function decides what is kept.
    DataSet<Edge<K, EV>> remainingEdges = getEdges()
            .coGroup(edgesToBeRemoved)
            .where(0, 1)
            .equalTo(0, 1)
            .with(new EdgeRemovalCoGroup<K, EV>());

    return new Graph<K, VV, EV>(this.vertices, remainingEdges, context);
}

/**
 * Co-group function used when removing edges: for each co-group key, emits every edge of the
 * graph-side group when the group of edges to be removed is empty, and nothing otherwise.
 *
 * <p>All edges of the group are forwarded, not just the first one, so that parallel edges
 * sharing the same key are preserved when they are not scheduled for removal.
 */
private static final class EdgeRemovalCoGroup<K, EV>
        implements CoGroupFunction<Edge<K, EV>, Edge<K, EV>, Edge<K, EV>> {

    @Override
    public void coGroup(Iterable<Edge<K, EV>> edge, Iterable<Edge<K, EV>> edgeToBeRemoved,
            Collector<Edge<K, EV>> out) throws Exception {

        // Any match in the removal set drops the whole group.
        if (edgeToBeRemoved.iterator().hasNext()) {
            return;
        }

        // No match: keep every edge of the group, including duplicates.
        for (Edge<K, EV> next : edge) {
            out.collect(next);
        }
    }
}

/**
* Performs union on the vertices and edges sets of the input graphs
* removing duplicate vertices but maintaining duplicate edges.
Expand Down

0 comments on commit edcfd8b

Please sign in to comment.