Skip to content

Commit

Permalink
[FLINK-1514] [gelly] Fixed inconsistencies after merge
Browse files Browse the repository at this point in the history
  • Loading branch information
balidani authored and vasia committed Apr 25, 2015
1 parent 740d437 commit 837508d
Show file tree
Hide file tree
Showing 7 changed files with 21 additions and 60 deletions.
31 changes: 0 additions & 31 deletions flink-staging/flink-gelly/pom.xml 100644 → 100755
Expand Up @@ -57,35 +57,4 @@ under the License.
<version>${guava.version}</version> <version>${guava.version}</version>
</dependency> </dependency>
</dependencies> </dependencies>

<!-- See main pom.xml for explanation of profiles -->
<profiles>
<profile>
<id>hadoop-1</id>
<activation>
<property>
<!-- Please do not remove the 'hadoop1' comment. See ./tools/generate_specific_pom.sh -->
<!--hadoop1--><name>hadoop.profile</name><value>1</value>
</property>
</activation>
<dependencies>
<!-- Add this here, for hadoop-2 we don't need it since we get guava transitively -->
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>${guava.version}</version>
<scope>provided</scope>
</dependency>
</dependencies>
</profile>
<profile>
<id>hadoop-2</id>
<activation>
<property>
<!-- Please do not remove the 'hadoop2' comment. See ./tools/generate_specific_pom.sh -->
<!--hadoop2--><name>!hadoop.profile</name>
</property>
</activation>
</profile>
</profiles>
</project> </project>
Expand Up @@ -83,8 +83,7 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab
private final DataSet<Edge<K, EV>> edges; private final DataSet<Edge<K, EV>> edges;


/** /**
* Creates a graph from two DataSets: vertices and edges and allow setting * Creates a graph from two DataSets: vertices and edges
* the undirected property
* *
* @param vertices a DataSet of vertices. * @param vertices a DataSet of vertices.
* @param edges a DataSet of edges. * @param edges a DataSet of edges.
Expand Down Expand Up @@ -919,7 +918,7 @@ public long numberOfVertices() throws Exception {
} }


/** /**
* @return Singleton DataSet containing the edge count * @return a long integer representing the number of edges
*/ */
public long numberOfEdges() throws Exception { public long numberOfEdges() throws Exception {
return edges.count(); return edges.count();
Expand Down Expand Up @@ -1016,13 +1015,6 @@ public void reduce(Iterable<Tuple2<K, K>> values, Collector<Tuple2<K, K>> out) {
} }
} }


private static final class CheckIfOneComponentMapper implements MapFunction<Integer, Boolean> {
@Override
public Boolean map(Integer n) {
return (n == 1);
}
}

/** /**
* Adds the input vertex and edges to the graph. If the vertex already * Adds the input vertex and edges to the graph. If the vertex already
* exists in the graph, it will not be added again, but the given edges * exists in the graph, it will not be added again, but the given edges
Expand Down
Expand Up @@ -30,7 +30,7 @@
import org.apache.flink.graph.gsa.ApplyFunction; import org.apache.flink.graph.gsa.ApplyFunction;
import org.apache.flink.graph.gsa.GatherFunction; import org.apache.flink.graph.gsa.GatherFunction;
import org.apache.flink.graph.gsa.SumFunction; import org.apache.flink.graph.gsa.SumFunction;
import org.apache.flink.graph.gsa.RichEdge; import org.apache.flink.graph.gsa.Neighbor;
import org.apache.flink.types.NullValue; import org.apache.flink.types.NullValue;
import org.apache.flink.util.Collector; import org.apache.flink.util.Collector;


Expand Down Expand Up @@ -70,13 +70,13 @@ public static void main(String[] args) throws Exception {
graph.runGatherSumApplyIteration(gather, sum, apply, maxIterations); graph.runGatherSumApplyIteration(gather, sum, apply, maxIterations);


// Extract the vertices as the result // Extract the vertices as the result
DataSet<Vertex<Long, Long>> greedyGraphColoring = result.getVertices(); DataSet<Vertex<Long, Long>> connectedComponents = result.getVertices();


// emit result // emit result
if (fileOutput) { if (fileOutput) {
greedyGraphColoring.writeAsCsv(outputPath, "\n", " "); connectedComponents.writeAsCsv(outputPath, "\n", " ");
} else { } else {
greedyGraphColoring.print(); connectedComponents.print();
} }


env.execute("GSA Connected Components"); env.execute("GSA Connected Components");
Expand All @@ -99,7 +99,7 @@ public void flatMap(Edge<Long, NullValue> edge, Collector<Vertex<Long, Long>> ou
private static final class ConnectedComponentsGather private static final class ConnectedComponentsGather
extends GatherFunction<Long, NullValue, Long> { extends GatherFunction<Long, NullValue, Long> {
@Override @Override
public Long gather(RichEdge<Long, NullValue> richEdge) { public Long gather(Neighbor<Long, NullValue> richEdge) {


return richEdge.getSrcVertexValue(); return richEdge.getSrcVertexValue();
} }
Expand Down
Expand Up @@ -31,7 +31,7 @@
import org.apache.flink.graph.gsa.ApplyFunction; import org.apache.flink.graph.gsa.ApplyFunction;
import org.apache.flink.graph.gsa.GatherFunction; import org.apache.flink.graph.gsa.GatherFunction;
import org.apache.flink.graph.gsa.SumFunction; import org.apache.flink.graph.gsa.SumFunction;
import org.apache.flink.graph.gsa.RichEdge; import org.apache.flink.graph.gsa.Neighbor;
import org.apache.flink.util.Collector; import org.apache.flink.util.Collector;


/** /**
Expand Down Expand Up @@ -116,7 +116,7 @@ public void flatMap(Edge<Long, Double> edge, Collector<Vertex<Long, Double>> out
private static final class SingleSourceShortestPathGather private static final class SingleSourceShortestPathGather
extends GatherFunction<Double, Double, Double> { extends GatherFunction<Double, Double, Double> {
@Override @Override
public Double gather(RichEdge<Double, Double> richEdge) { public Double gather(Neighbor<Double, Double> richEdge) {
return richEdge.getSrcVertexValue() + richEdge.getEdgeValue(); return richEdge.getSrcVertexValue() + richEdge.getEdgeValue();
} }
}; };
Expand Down
Expand Up @@ -24,7 +24,7 @@


public abstract class GatherFunction<VV extends Serializable, EV extends Serializable, M> implements Serializable { public abstract class GatherFunction<VV extends Serializable, EV extends Serializable, M> implements Serializable {


public abstract M gather(RichEdge<VV, EV> richEdge); public abstract M gather(Neighbor<VV, EV> neighbor);


/** /**
* This method is executed once per superstep before the vertex update function is invoked for each vertex. * This method is executed once per superstep before the vertex update function is invoked for each vertex.
Expand Down
Expand Up @@ -117,15 +117,15 @@ public DataSet<Vertex<K, VV>> createResult() {
final DeltaIteration<Vertex<K, VV>, Vertex<K, VV>> iteration = final DeltaIteration<Vertex<K, VV>, Vertex<K, VV>> iteration =
vertexDataSet.iterateDelta(vertexDataSet, maximumNumberOfIterations, zeroKeyPos); vertexDataSet.iterateDelta(vertexDataSet, maximumNumberOfIterations, zeroKeyPos);


// Prepare the rich edges // Prepare the neighbors
DataSet<Tuple2<Vertex<K, VV>, Edge<K, EV>>> richEdges = iteration DataSet<Tuple2<Vertex<K, VV>, Edge<K, EV>>> neighbors = iteration
.getWorkset() .getWorkset()
.join(edgeDataSet) .join(edgeDataSet)
.where(0) .where(0)
.equalTo(0); .equalTo(0);


// Gather, sum and apply // Gather, sum and apply
DataSet<Tuple2<K, M>> gatheredSet = richEdges.map(gatherUdf); DataSet<Tuple2<K, M>> gatheredSet = neighbors.map(gatherUdf);
DataSet<Tuple2<K, M>> summedSet = gatheredSet.groupBy(0).reduce(sumUdf); DataSet<Tuple2<K, M>> summedSet = gatheredSet.groupBy(0).reduce(sumUdf);
DataSet<Vertex<K, VV>> appliedSet = summedSet DataSet<Vertex<K, VV>> appliedSet = summedSet
.join(iteration.getSolutionSet()) .join(iteration.getSolutionSet())
Expand Down Expand Up @@ -178,12 +178,12 @@ private GatherUdf(GatherFunction<VV, EV, M> gatherFunction, TypeInformation<Tupl
} }


@Override @Override
public Tuple2<K, M> map(Tuple2<Vertex<K, VV>, Edge<K, EV>> richEdge) throws Exception { public Tuple2<K, M> map(Tuple2<Vertex<K, VV>, Edge<K, EV>> neighborTuple) throws Exception {
RichEdge<VV, EV> userRichEdge = new RichEdge<VV, EV>(richEdge.f0.getValue(), Neighbor<VV, EV> neighbor = new Neighbor<VV, EV>(neighborTuple.f0.getValue(),
richEdge.f1.getValue()); neighborTuple.f1.getValue());


K key = richEdge.f1.getTarget(); K key = neighborTuple.f1.getTarget();
M result = this.gatherFunction.gather(userRichEdge); M result = this.gatherFunction.gather(neighbor);
return new Tuple2<K, M>(key, result); return new Tuple2<K, M>(key, result);
} }


Expand Down
Expand Up @@ -28,12 +28,12 @@
* @param <VV> the vertex value type * @param <VV> the vertex value type
* @param <EV> the edge value type * @param <EV> the edge value type
*/ */
public class RichEdge<VV extends Serializable, EV extends Serializable> public class Neighbor<VV extends Serializable, EV extends Serializable>
extends Tuple2<VV, EV> { extends Tuple2<VV, EV> {


public RichEdge() {} public Neighbor() {}


public RichEdge(VV src, EV edge) { public Neighbor(VV src, EV edge) {
super(src, edge); super(src, edge);
} }


Expand Down

0 comments on commit 837508d

Please sign in to comment.