Skip to content

Commit

Permalink
[FLINK-1514] [gelly] Removed edge value type from ApplyFunction; Reus…
Browse files Browse the repository at this point in the history
…e the output vertex

This closes #408
  • Loading branch information
vasia committed Apr 26, 2015
1 parent 40f5f3a commit 6e24879
Show file tree
Hide file tree
Showing 6 changed files with 24 additions and 31 deletions.
Empty file modified flink-staging/flink-gelly/pom.xml 100755 → 100644
Empty file.
Expand Up @@ -214,7 +214,7 @@ public static <K extends Comparable<K> & Serializable, VV extends Serializable,
public Vertex<K, VV> map(Tuple1<K> value) throws Exception {
return new Vertex<K, VV>(value.f0, mapper.map(value.f0));
}
}).returns(returnType);
}).returns(returnType).withForwardedFields("f0");

return new Graph<K, VV, EV>(vertices, edges, context);
}
Expand Down Expand Up @@ -1213,7 +1213,7 @@ public <M> Graph<K, VV, EV> runVertexCentricIteration(
*/
public <M> Graph<K, VV, EV> runGatherSumApplyIteration(
GatherFunction<VV, EV, M> gatherFunction, SumFunction<VV, EV, M> sumFunction,
ApplyFunction<VV, EV, M> applyFunction, int maximumNumberOfIterations) {
ApplyFunction<K, VV, M> applyFunction, int maximumNumberOfIterations) {

GatherSumApplyIteration<K, VV, EV, M> iteration = GatherSumApplyIteration.withEdges(
edges, gatherFunction, sumFunction, applyFunction, maximumNumberOfIterations);
Expand Down
Expand Up @@ -102,7 +102,7 @@ public Long sum(Long newValue, Long currentValue) {
};

@SuppressWarnings("serial")
private static final class UpdateComponentId extends ApplyFunction<Long, NullValue, Long> {
private static final class UpdateComponentId extends ApplyFunction<Long, Long, Long> {

public void apply(Long summedValue, Long origValue) {
if (summedValue < origValue) {
Expand Down
Expand Up @@ -111,7 +111,7 @@ public Double sum(Double newValue, Double currentValue) {
};

@SuppressWarnings("serial")
private static final class UpdateDistance extends ApplyFunction<Double, Double, Double> {
private static final class UpdateDistance extends ApplyFunction<Long, Double, Double> {

public void apply(Double newDistance, Double oldDistance) {
if (newDistance < oldDistance) {
Expand Down
Expand Up @@ -19,22 +19,25 @@
package org.apache.flink.graph.gsa;

import org.apache.flink.api.common.functions.IterationRuntimeContext;
import org.apache.flink.graph.Vertex;
import org.apache.flink.util.Collector;

import java.io.Serializable;

@SuppressWarnings("serial")
public abstract class ApplyFunction<VV extends Serializable, EV extends Serializable, M> implements Serializable {
public abstract class ApplyFunction<K extends Comparable<K> & Serializable, VV extends Serializable, M>
implements Serializable {

public abstract void apply(M message, VV vertexValue);
public abstract void apply(M newValue, VV currentValue);

/**
* Sets the result for the apply function
*
* @param result the result of the apply phase
*/
public void setResult(VV result) {
out.collect(result);
outVal.f1 = result;
out.collect(outVal);
}

/**
Expand All @@ -58,14 +61,17 @@ public void setResult(VV result) {
@SuppressWarnings("unused")
private IterationRuntimeContext runtimeContext;

private Collector<VV> out;
private Collector<Vertex<K, VV>> out;

private Vertex<K, VV> outVal;

public void init(IterationRuntimeContext iterationRuntimeContext) {
this.runtimeContext = iterationRuntimeContext;
};

public void setOutput(Collector<VV> out) {
public void setOutput(Vertex<K, VV> vertex, Collector<Vertex<K, VV>> out) {
this.out = out;
this.outVal = vertex;
}

}
Expand Up @@ -58,13 +58,13 @@ public class GatherSumApplyIteration<K extends Comparable<K> & Serializable,

private final GatherFunction<VV, EV, M> gather;
private final SumFunction<VV, EV, M> sum;
private final ApplyFunction<VV, EV, M> apply;
private final ApplyFunction<K, VV, M> apply;
private final int maximumNumberOfIterations;

// ----------------------------------------------------------------------------------

private GatherSumApplyIteration(GatherFunction<VV, EV, M> gather, SumFunction<VV, EV, M> sum,
ApplyFunction<VV, EV, M> apply, DataSet<Edge<K, EV>> edges, int maximumNumberOfIterations) {
ApplyFunction<K, VV, M> apply, DataSet<Edge<K, EV>> edges, int maximumNumberOfIterations) {

Validate.notNull(gather);
Validate.notNull(sum);
Expand Down Expand Up @@ -161,7 +161,7 @@ public DataSet<Vertex<K, VV>> createResult() {
*/
public static final <K extends Comparable<K> & Serializable, VV extends Serializable, EV extends Serializable, M>
GatherSumApplyIteration<K, VV, EV, M> withEdges(DataSet<Edge<K, EV>> edges,
GatherFunction<VV, EV, M> gather, SumFunction<VV, EV, M> sum, ApplyFunction<VV, EV, M> apply,
GatherFunction<VV, EV, M> gather, SumFunction<VV, EV, M> sum, ApplyFunction<K, VV, M> apply,
int maximumNumberOfIterations) {
return new GatherSumApplyIteration<K, VV, EV, M>(gather, sum, apply, edges, maximumNumberOfIterations);
}
Expand Down Expand Up @@ -253,32 +253,19 @@ private static final class ApplyUdf<K extends Comparable<K> & Serializable,
VV extends Serializable, EV extends Serializable, M> extends RichFlatJoinFunction<Tuple2<K, M>,
Vertex<K, VV>, Vertex<K, VV>> implements ResultTypeQueryable<Vertex<K, VV>> {

private final ApplyFunction<VV, EV, M> applyFunction;
private final ApplyFunction<K, VV, M> applyFunction;
private transient TypeInformation<Vertex<K, VV>> resultType;

private ApplyUdf(ApplyFunction<VV, EV, M> applyFunction, TypeInformation<Vertex<K, VV>> resultType) {
private ApplyUdf(ApplyFunction<K, VV, M> applyFunction, TypeInformation<Vertex<K, VV>> resultType) {
this.applyFunction = applyFunction;
this.resultType = resultType;
}

@Override
public void join(Tuple2<K, M> arg0, Vertex<K, VV> arg1, final Collector<Vertex<K, VV>> out) throws Exception {

final K key = arg1.getId();
Collector<VV> userOut = new Collector<VV>() {
@Override
public void collect(VV record) {
out.collect(new Vertex<K, VV>(key, record));
}

@Override
public void close() {
out.close();
}
};

this.applyFunction.setOutput(userOut);
this.applyFunction.apply(arg0.f1, arg1.getValue());
public void join(Tuple2<K, M> newValue, final Vertex<K, VV> currentValue, final Collector<Vertex<K, VV>> out) throws Exception {

this.applyFunction.setOutput(currentValue, out);
this.applyFunction.apply(newValue.f1, currentValue.getValue());
}

@Override
Expand Down

0 comments on commit 6e24879

Please sign in to comment.