Skip to content
Permalink
Browse files
JIRA-1148
closes #39
  • Loading branch information
Maja Kabiljo committed Jun 2, 2017
1 parent 02e73b9 commit e2f82b25d19faa263bbdbdf074948c6537d85e3a
Show file tree
Hide file tree
Showing 2 changed files with 98 additions and 3 deletions.
@@ -352,10 +352,15 @@ Block calculateConnectedComponentSizes(
Pair<LongWritable, LongWritable> componentToReducePair = Pair.of(
new LongWritable(), new LongWritable(1));
LongWritable reusableLong = new LongWritable();
return Pieces.reduceAndBroadcast(
"CalcConnectedComponentSizes",
// This reduce operation is stateless so we can use a single instance
BasicMapReduce<LongWritable, LongWritable, LongWritable> reduceOperation =
new BasicMapReduce<>(
LongTypeOps.INSTANCE, LongTypeOps.INSTANCE, SumReduce.LONG),
LongTypeOps.INSTANCE, LongTypeOps.INSTANCE, SumReduce.LONG);
return Pieces.reduceAndBroadcastWithArrayOfHandles(
"CalcConnectedComponentSizes",
3137, /* Just using some large prime number */
() -> reduceOperation,
vertex -> getComponent.get(vertex).get(),
(Vertex<LongWritable, V, Writable> vertex) -> {
componentToReducePair.getLeft().set(getComponent.get(vertex).get());
return componentToReducePair;
@@ -17,7 +17,9 @@
*/
package org.apache.giraph.block_app.library;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

import org.apache.giraph.block_app.framework.api.BlockMasterApi;
import org.apache.giraph.block_app.framework.api.BlockWorkerReceiveApi;
@@ -26,13 +28,16 @@
import org.apache.giraph.block_app.framework.piece.Piece;
import org.apache.giraph.block_app.framework.piece.global_comm.ReducerAndBroadcastWrapperHandle;
import org.apache.giraph.block_app.framework.piece.global_comm.ReducerHandle;
import org.apache.giraph.block_app.framework.piece.global_comm.array.BroadcastArrayHandle;
import org.apache.giraph.block_app.framework.piece.interfaces.VertexReceiver;
import org.apache.giraph.block_app.framework.piece.interfaces.VertexSender;
import org.apache.giraph.block_app.library.internal.SendMessagePiece;
import org.apache.giraph.block_app.library.internal.SendMessageWithCombinerPiece;
import org.apache.giraph.block_app.reducers.array.ArrayOfHandles;
import org.apache.giraph.combiner.MessageCombiner;
import org.apache.giraph.function.Consumer;
import org.apache.giraph.function.PairConsumer;
import org.apache.giraph.function.Supplier;
import org.apache.giraph.function.vertex.ConsumerWithVertex;
import org.apache.giraph.function.vertex.SupplierFromVertex;
import org.apache.giraph.graph.Vertex;
@@ -319,6 +324,91 @@ public String toString() {
};
}

/**
* Like reduceAndBroadcast, but uses array of handles for reducers and
* broadcasts, to make it feasible and performant when values are large.
* Each supplied value to reduce will be reduced in the handle defined by
* handleHashSupplier%numHandles
*
* @param <S> Single value type, objects passed on workers
* @param <R> Reduced value type
* @param <I> Vertex id type
* @param <V> Vertex value type
* @param <E> Edge value type
*/
public static
<S, R extends Writable, I extends WritableComparable, V extends Writable,
E extends Writable>
Piece<I, V, E, NoMessage, Object> reduceAndBroadcastWithArrayOfHandles(
final String name,
final int numHandles,
final Supplier<ReduceOperation<S, R>> reduceOp,
final SupplierFromVertex<I, V, E, Long> handleHashSupplier,
final SupplierFromVertex<I, V, E, S> valueSupplier,
final ConsumerWithVertex<I, V, E, R> reducedValueConsumer) {
return new Piece<I, V, E, NoMessage, Object>() {
protected ArrayOfHandles.ArrayOfReducers<S, R> reducers;
protected BroadcastArrayHandle<R> broadcasts;

private int getHandleIndex(Vertex<I, V, E> vertex) {
return (int) Math.abs(handleHashSupplier.get(vertex) % numHandles);
}

@Override
public void registerReducers(
final CreateReducersApi reduceApi, Object executionStage) {
reducers = new ArrayOfHandles.ArrayOfReducers<>(
numHandles,
new Supplier<ReducerHandle<S, R>>() {
@Override
public ReducerHandle<S, R> get() {
return reduceApi.createLocalReducer(reduceOp.get());
}
});
}

@Override
public VertexSender<I, V, E> getVertexSender(
BlockWorkerSendApi<I, V, E, NoMessage> workerApi,
Object executionStage) {
return new InnerVertexSender() {
@Override
public void vertexSend(Vertex<I, V, E> vertex) {
reducers.get(getHandleIndex(vertex)).reduce(
valueSupplier.get(vertex));
}
};
}

@Override
public void masterCompute(BlockMasterApi master, Object executionStage) {
broadcasts = reducers.broadcastValue(master);
}

@Override
public VertexReceiver<I, V, E, NoMessage> getVertexReceiver(
BlockWorkerReceiveApi<I> workerApi, Object executionStage) {
final List<R> values = new ArrayList<>();
for (int i = 0; i < numHandles; i++) {
values.add(broadcasts.get(i).getBroadcast(workerApi));
}
return new InnerVertexReceiver() {
@Override
public void vertexReceive(
Vertex<I, V, E> vertex, Iterable<NoMessage> messages) {
reducedValueConsumer.apply(
vertex, values.get(getHandleIndex(vertex)));
}
};
}

@Override
public String toString() {
return name;
}
};
}

/**
* Creates Piece that for each vertex, sends message provided by
* messageSupplier to all targets provided by targetsSupplier.

0 comments on commit e2f82b2

Please sign in to comment.