Skip to content

Commit

Permalink
[FLINK-1201] [gelly] changed spargel classes to work with Vertex and …
Browse files Browse the repository at this point in the history
…Edge types
  • Loading branch information
vasia authored and StephanEwen committed Feb 11, 2015
1 parent b0b1295 commit d45c049
Show file tree
Hide file tree
Showing 7 changed files with 83 additions and 341 deletions.
Expand Up @@ -53,7 +53,6 @@
import flink.graphs.spargel.VertexCentricIteration; import flink.graphs.spargel.VertexCentricIteration;
import flink.graphs.spargel.VertexUpdateFunction; import flink.graphs.spargel.VertexUpdateFunction;
import flink.graphs.utils.GraphUtils; import flink.graphs.utils.GraphUtils;
import flink.graphs.utils.Tuple2ToVertexMap;
import flink.graphs.validation.GraphValidator; import flink.graphs.validation.GraphValidator;


/** /**
Expand Down Expand Up @@ -1058,15 +1057,12 @@ public Graph<K, VV, EV> union (Graph<K, VV, EV> graph) {
* @param maximumNumberOfIterations maximum number of iterations to perform * @param maximumNumberOfIterations maximum number of iterations to perform
* @return * @return
*/ */
@SuppressWarnings("unchecked")
public <M>Graph<K, VV, EV> runVertexCentricIteration(VertexUpdateFunction<K, VV, M> vertexUpdateFunction, public <M>Graph<K, VV, EV> runVertexCentricIteration(VertexUpdateFunction<K, VV, M> vertexUpdateFunction,
MessagingFunction<K, VV, M, EV> messagingFunction, int maximumNumberOfIterations) { MessagingFunction<K, VV, M, EV> messagingFunction, int maximumNumberOfIterations) {
DataSet<Tuple2<K, VV>> tupleVertices = (DataSet<Tuple2<K, VV>>) (DataSet<?>) vertices; DataSet<Vertex<K, VV>> newVertices = vertices.runOperation(
DataSet<Tuple3<K, K, EV>> tupleEdges = (DataSet<Tuple3<K, K, EV>>) (DataSet<?>) edges; VertexCentricIteration.withEdges(edges,
DataSet<Tuple2<K, VV>> newVertices = tupleVertices.runOperation(
VertexCentricIteration.withValuedEdges(tupleEdges,
vertexUpdateFunction, messagingFunction, maximumNumberOfIterations)); vertexUpdateFunction, messagingFunction, maximumNumberOfIterations));
return new Graph<K, VV, EV>(newVertices.map(new Tuple2ToVertexMap<K, VV>()), edges, context); return new Graph<K, VV, EV>(newVertices, edges, context);
} }


/** /**
Expand Down
@@ -1,11 +1,10 @@
package flink.graphs.library; package flink.graphs.library;



import flink.graphs.Edge;
import flink.graphs.Graph; import flink.graphs.Graph;
import flink.graphs.GraphAlgorithm; import flink.graphs.GraphAlgorithm;
import flink.graphs.spargel.MessageIterator; import flink.graphs.spargel.MessageIterator;
import flink.graphs.spargel.MessagingFunction; import flink.graphs.spargel.MessagingFunction;
import flink.graphs.spargel.OutgoingEdge;
import flink.graphs.spargel.VertexUpdateFunction; import flink.graphs.spargel.VertexUpdateFunction;


import java.io.Serializable; import java.io.Serializable;
Expand Down Expand Up @@ -69,8 +68,8 @@ public static final class RankMessenger<K extends Comparable<K> & Serializable>


@Override @Override
public void sendMessages(K vertexId, Double newRank) { public void sendMessages(K vertexId, Double newRank) {
for (OutgoingEdge<K, Double> edge : getOutgoingEdges()) { for (Edge<K, Double> edge : getOutgoingEdges()) {
sendMessageTo(edge.target(), newRank * edge.edgeValue()); sendMessageTo(edge.getTarget(), newRank * edge.getValue());
} }
} }
} }
Expand Down
Expand Up @@ -3,7 +3,6 @@
import flink.graphs.*; import flink.graphs.*;
import flink.graphs.spargel.MessageIterator; import flink.graphs.spargel.MessageIterator;
import flink.graphs.spargel.MessagingFunction; import flink.graphs.spargel.MessagingFunction;
import flink.graphs.spargel.OutgoingEdge;
import flink.graphs.spargel.VertexUpdateFunction; import flink.graphs.spargel.VertexUpdateFunction;


import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.functions.MapFunction;
Expand Down Expand Up @@ -87,8 +86,8 @@ public static final class MinDistanceMessenger<K extends Comparable<K> & Seriali


@Override @Override
public void sendMessages(K vertexKey, Double newDistance) throws Exception { public void sendMessages(K vertexKey, Double newDistance) throws Exception {
for (OutgoingEdge<K, Double> edge : getOutgoingEdges()) { for (Edge<K, Double> edge : getOutgoingEdges()) {
sendMessageTo(edge.target(), newDistance + edge.edgeValue()); sendMessageTo(edge.getTarget(), newDistance + edge.getValue());
} }
} }
} }
Expand Down
Expand Up @@ -26,10 +26,11 @@
import org.apache.flink.api.common.functions.IterationRuntimeContext; import org.apache.flink.api.common.functions.IterationRuntimeContext;
import org.apache.flink.api.java.tuple.Tuple; import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.types.Value; import org.apache.flink.types.Value;
import org.apache.flink.util.Collector; import org.apache.flink.util.Collector;


import flink.graphs.Edge;

/** /**
* The base class for functions that produce messages between vertices as a part of a {@link VertexCentricIteration}. * The base class for functions that produce messages between vertices as a part of a {@link VertexCentricIteration}.
* *
Expand All @@ -38,7 +39,8 @@
* @param <Message> The type of the message sent between vertices along the edges. * @param <Message> The type of the message sent between vertices along the edges.
* @param <EdgeValue> The type of the values that are associated with the edges. * @param <EdgeValue> The type of the values that are associated with the edges.
*/ */
public abstract class MessagingFunction<VertexKey extends Comparable<VertexKey>, VertexValue, Message, EdgeValue> implements Serializable { public abstract class MessagingFunction<VertexKey extends Comparable<VertexKey> & Serializable,
VertexValue extends Serializable, Message, EdgeValue extends Serializable> implements Serializable {


private static final long serialVersionUID = 1L; private static final long serialVersionUID = 1L;


Expand Down Expand Up @@ -79,19 +81,13 @@ public void postSuperstep() throws Exception {}
* @return An iterator with all outgoing edges. * @return An iterator with all outgoing edges.
*/ */
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public Iterable<OutgoingEdge<VertexKey, EdgeValue>> getOutgoingEdges() { public Iterable<Edge<VertexKey, EdgeValue>> getOutgoingEdges() {
if (edgesUsed) { if (edgesUsed) {
throw new IllegalStateException("Can use either 'getOutgoingEdges()' or 'sendMessageToAllTargets()' exactly once."); throw new IllegalStateException("Can use either 'getOutgoingEdges()' or 'sendMessageToAllTargets()' exactly once.");
} }
edgesUsed = true; edgesUsed = true;

this.edgeIterator.set((Iterator<Edge<VertexKey, EdgeValue>>) edges);
if (this.edgeWithValueIter != null) { return this.edgeIterator;
this.edgeWithValueIter.set((Iterator<Tuple3<VertexKey, VertexKey, EdgeValue>>) edges);
return this.edgeWithValueIter;
} else {
this.edgeNoValueIter.set((Iterator<Tuple2<VertexKey, VertexKey>>) edges);
return this.edgeNoValueIter;
}
} }


/** /**
Expand Down Expand Up @@ -186,22 +182,15 @@ public <T> Collection<T> getBroadcastSet(String name) {


private Collector<Tuple2<VertexKey, Message>> out; private Collector<Tuple2<VertexKey, Message>> out;


private EdgesIteratorNoEdgeValue<VertexKey, EdgeValue> edgeNoValueIter; private EdgesIterator<VertexKey, EdgeValue> edgeIterator;

private EdgesIteratorWithEdgeValue<VertexKey, EdgeValue> edgeWithValueIter;


private boolean edgesUsed; private boolean edgesUsed;




void init(IterationRuntimeContext context, boolean hasEdgeValue) { void init(IterationRuntimeContext context) {
this.runtimeContext = context; this.runtimeContext = context;
this.outValue = new Tuple2<VertexKey, Message>(); this.outValue = new Tuple2<VertexKey, Message>();

this.edgeIterator = new EdgesIterator<VertexKey, EdgeValue>();
if (hasEdgeValue) {
this.edgeWithValueIter = new EdgesIteratorWithEdgeValue<VertexKey, EdgeValue>();
} else {
this.edgeNoValueIter = new EdgesIteratorNoEdgeValue<VertexKey, EdgeValue>();
}
} }


void set(Iterator<?> edges, Collector<Tuple2<VertexKey, Message>> out) { void set(Iterator<?> edges, Collector<Tuple2<VertexKey, Message>> out) {
Expand All @@ -210,52 +199,15 @@ void set(Iterator<?> edges, Collector<Tuple2<VertexKey, Message>> out) {
this.edgesUsed = false; this.edgesUsed = false;
} }



private static final class EdgesIterator<VertexKey extends Comparable<VertexKey> & Serializable,

EdgeValue extends Serializable>
private static final class EdgesIteratorNoEdgeValue<VertexKey extends Comparable<VertexKey>, EdgeValue> implements Iterator<Edge<VertexKey, EdgeValue>>, Iterable<Edge<VertexKey, EdgeValue>>
implements Iterator<OutgoingEdge<VertexKey, EdgeValue>>, Iterable<OutgoingEdge<VertexKey, EdgeValue>>
{
private Iterator<Tuple2<VertexKey, VertexKey>> input;

private OutgoingEdge<VertexKey, EdgeValue> edge = new OutgoingEdge<VertexKey, EdgeValue>();


void set(Iterator<Tuple2<VertexKey, VertexKey>> input) {
this.input = input;
}

@Override
public boolean hasNext() {
return input.hasNext();
}

@Override
public OutgoingEdge<VertexKey, EdgeValue> next() {
Tuple2<VertexKey, VertexKey> next = input.next();
edge.set(next.f1, null);
return edge;
}

@Override
public void remove() {
throw new UnsupportedOperationException();
}

@Override
public Iterator<OutgoingEdge<VertexKey, EdgeValue>> iterator() {
return this;
}
}


private static final class EdgesIteratorWithEdgeValue<VertexKey extends Comparable<VertexKey>, EdgeValue>
implements Iterator<OutgoingEdge<VertexKey, EdgeValue>>, Iterable<OutgoingEdge<VertexKey, EdgeValue>>
{ {
private Iterator<Tuple3<VertexKey, VertexKey, EdgeValue>> input; private Iterator<Edge<VertexKey, EdgeValue>> input;


private OutgoingEdge<VertexKey, EdgeValue> edge = new OutgoingEdge<VertexKey, EdgeValue>(); private Edge<VertexKey, EdgeValue> edge = new Edge<VertexKey, EdgeValue>();


void set(Iterator<Tuple3<VertexKey, VertexKey, EdgeValue>> input) { void set(Iterator<Edge<VertexKey, EdgeValue>> input) {
this.input = input; this.input = input;
} }


Expand All @@ -265,9 +217,10 @@ public boolean hasNext() {
} }


@Override @Override
public OutgoingEdge<VertexKey, EdgeValue> next() { public Edge<VertexKey, EdgeValue> next() {
Tuple3<VertexKey, VertexKey, EdgeValue> next = input.next(); Edge<VertexKey, EdgeValue> next = input.next();
edge.set(next.f1, next.f2); edge.setTarget(next.f1);
edge.setValue(next.f2);
return edge; return edge;
} }


Expand All @@ -276,7 +229,7 @@ public void remove() {
throw new UnsupportedOperationException(); throw new UnsupportedOperationException();
} }
@Override @Override
public Iterator<OutgoingEdge<VertexKey, EdgeValue>> iterator() { public Iterator<Edge<VertexKey, EdgeValue>> iterator() {
return this; return this;
} }
} }
Expand Down

This file was deleted.

0 comments on commit d45c049

Please sign in to comment.