Skip to content

Commit

Permalink
[FLINK-1514] [gelly] renamed fields in Neighbor class; added forwarde…
Browse files Browse the repository at this point in the history
…d fields hint in GSAIteration
  • Loading branch information
vasia committed Apr 25, 2015
1 parent 4e3165e commit 66d72ac
Show file tree
Hide file tree
Showing 4 changed files with 11 additions and 7 deletions.
Expand Up @@ -89,7 +89,7 @@ public Long map(Long vertexId) {
private static final class GatherNeighborIds extends GatherFunction<Long, NullValue, Long> { private static final class GatherNeighborIds extends GatherFunction<Long, NullValue, Long> {


public Long gather(Neighbor<Long, NullValue> neighbor) { public Long gather(Neighbor<Long, NullValue> neighbor) {
return neighbor.getSrcVertexValue(); return neighbor.getNeighborValue();
} }
}; };


Expand Down
Expand Up @@ -97,8 +97,8 @@ public Double map(Long id) {
@SuppressWarnings("serial") @SuppressWarnings("serial")
private static final class CalculateDistances extends GatherFunction<Double, Double, Double> { private static final class CalculateDistances extends GatherFunction<Double, Double, Double> {


public Double gather(Neighbor<Double, Double> richEdge) { public Double gather(Neighbor<Double, Double> neighbor) {
return richEdge.getSrcVertexValue() + richEdge.getEdgeValue(); return neighbor.getNeighborValue() + neighbor.getEdgeValue();
} }
}; };


Expand Down
Expand Up @@ -26,6 +26,7 @@
import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.operators.CustomUnaryOperation; import org.apache.flink.api.java.operators.CustomUnaryOperation;
import org.apache.flink.api.java.operators.DeltaIteration; import org.apache.flink.api.java.operators.DeltaIteration;
import org.apache.flink.api.java.operators.JoinOperator;
import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable; import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.api.java.typeutils.TupleTypeInfo; import org.apache.flink.api.java.typeutils.TupleTypeInfo;
Expand Down Expand Up @@ -127,12 +128,15 @@ public DataSet<Vertex<K, VV>> createResult() {
// Gather, sum and apply // Gather, sum and apply
DataSet<Tuple2<K, M>> gatheredSet = neighbors.map(gatherUdf); DataSet<Tuple2<K, M>> gatheredSet = neighbors.map(gatherUdf);
DataSet<Tuple2<K, M>> summedSet = gatheredSet.groupBy(0).reduce(sumUdf); DataSet<Tuple2<K, M>> summedSet = gatheredSet.groupBy(0).reduce(sumUdf);
DataSet<Vertex<K, VV>> appliedSet = summedSet JoinOperator<?, ?, Vertex<K, VV>> appliedSet = summedSet
.join(iteration.getSolutionSet()) .join(iteration.getSolutionSet())
.where(0) .where(0)
.equalTo(0) .equalTo(0)
.with(applyUdf); .with(applyUdf);


// let the operator know that we preserve the key field
appliedSet.withForwardedFieldsFirst("0").withForwardedFieldsSecond("0");

return iteration.closeWith(appliedSet, appliedSet); return iteration.closeWith(appliedSet, appliedSet);
} }


Expand Down
Expand Up @@ -34,11 +34,11 @@ public class Neighbor<VV extends Serializable, EV extends Serializable>


public Neighbor() {} public Neighbor() {}


public Neighbor(VV src, EV edge) { public Neighbor(VV neighborValue, EV edgeValue) {
super(src, edge); super(neighborValue, edgeValue);
} }


public VV getSrcVertexValue() { public VV getNeighborValue() {
return this.f0; return this.f0;
} }


Expand Down

0 comments on commit 66d72ac

Please sign in to comment.