Skip to content
This repository has been archived by the owner on Oct 30, 2023. It is now read-only.

Commit

Permalink
[GIRAPH-1041] Generate primitive type specific code
Browse files Browse the repository at this point in the history
Summary:
- Use FreeMarker library to generate primitive type specific code.
Initially generating three sets of files:
{TYPE}Consumer, {TYPE}TypeOps and W{TYPE}ArrayList

Right now generation happens manually, and generated files are being committed.
In the future we can move those to a separate project, and have them generated
when maven is compiling and deploying.

Additionally to generation change, BasicArrayList is renamed to WArrayList and
directly extends fastutil implementation, to now serves two purposes:
- generic handling of efficient arrays through TypeOps
- extended fastutil class - to make it writtable, to add useful Java8 methods,
  or anything else we can think of. Since we are just extending it, and there is
  no efficiency penalty, we can always use WLongArrayList instead of LongArrayList.

There is additional WReusableLongArrayList, which when readFields is called,
doesn't size it to exact size, but reuses the old length.

Test Plan:
mvn clean install

There are no changes in logic in this diff. Will send a small separate diff
with some examples of what is now simpler.

Reviewers: sergey.edunov, dionysis.logothetis, spupyrev, maja.kabiljo

Differential Revision: https://reviews.facebook.net/D52515
  • Loading branch information
Igor Kabiljo committed Apr 12, 2016
1 parent bac93fa commit 77ae12e
Show file tree
Hide file tree
Showing 51 changed files with 3,681 additions and 1,052 deletions.
Expand Up @@ -35,7 +35,7 @@
import org.apache.giraph.reducers.ReduceOperation;
import org.apache.giraph.types.ops.PrimitiveTypeOps;
import org.apache.giraph.types.ops.TypeOpsUtils;
import org.apache.giraph.types.ops.collections.BasicArrayList;
import org.apache.giraph.types.ops.collections.array.WArrayList;
import org.apache.giraph.utils.WritableUtils;
import org.apache.giraph.worker.WorkerBroadcastUsage;
import org.apache.hadoop.io.Writable;
Expand All @@ -50,7 +50,7 @@
* @param <R> Reduced value type
*/
public class BasicArrayReduce<S, R extends Writable>
implements ReduceOperation<Pair<IntRef, S>, BasicArrayList<R>> {
implements ReduceOperation<Pair<IntRef, S>, WArrayList<R>> {
private int fixedSize;
private PrimitiveTypeOps<R> typeOps;
private ReduceOperation<S, R> elementReduceOp;
Expand Down Expand Up @@ -182,7 +182,7 @@ ReducerArrayHandle<S, R> createArrayHandles(
final int fixedSize, final PrimitiveTypeOps<R> typeOps,
ReduceOperation<S, R> elementReduceOp,
CreateReducerFunctionApi createFunction) {
final ReducerHandle<Pair<IntRef, S>, BasicArrayList<R>> reduceHandle =
final ReducerHandle<Pair<IntRef, S>, WArrayList<R>> reduceHandle =
createFunction.createReducer(
new BasicArrayReduce<>(fixedSize, typeOps, elementReduceOp));
final IntRef curIndex = new IntRef(0);
Expand All @@ -193,11 +193,11 @@ ReducerArrayHandle<S, R> createArrayHandles(
final ReducerHandle<S, R> elementReduceHandle = new ReducerHandle<S, R>() {
@Override
public R getReducedValue(MasterGlobalCommUsage master) {
BasicArrayList<R> result = reduceHandle.getReducedValue(master);
WArrayList<R> result = reduceHandle.getReducedValue(master);
if (fixedSize == -1 && curIndex.value >= result.size()) {
typeOps.set(reusableValue, initialValue);
} else {
result.getInto(curIndex.value, reusableValue);
result.getIntoW(curIndex.value, reusableValue);
}
return reusableValue;
}
Expand Down Expand Up @@ -238,19 +238,19 @@ public int getReducedSize(BlockMasterApi master) {

@Override
public BroadcastArrayHandle<R> broadcastValue(BlockMasterApi master) {
final BroadcastHandle<BasicArrayList<R>> broadcastHandle =
final BroadcastHandle<WArrayList<R>> broadcastHandle =
reduceHandle.broadcastValue(master);
final IntRef curIndex = new IntRef(0);
final R reusableValue = typeOps.create();
final BroadcastHandle<R>
elementBroadcastHandle = new BroadcastHandle<R>() {
@Override
public R getBroadcast(WorkerBroadcastUsage worker) {
BasicArrayList<R> result = broadcastHandle.getBroadcast(worker);
WArrayList<R> result = broadcastHandle.getBroadcast(worker);
if (fixedSize == -1 && curIndex.value >= result.size()) {
typeOps.set(reusableValue, initialValue);
} else {
result.getInto(curIndex.value, reusableValue);
result.getIntoW(curIndex.value, reusableValue);
}
return reusableValue;
}
Expand Down Expand Up @@ -288,17 +288,17 @@ private void init() {
}

@Override
public BasicArrayList<R> createInitialValue() {
public WArrayList<R> createInitialValue() {
if (fixedSize != -1) {
BasicArrayList<R> list = typeOps.createArrayList(fixedSize);
WArrayList<R> list = typeOps.createArrayList(fixedSize);
fill(list, fixedSize);
return list;
} else {
return typeOps.createArrayList(1);
}
}

private void fill(BasicArrayList<R> list, int newSize) {
private void fill(WArrayList<R> list, int newSize) {
if (fixedSize != -1 && newSize > fixedSize) {
throw new IllegalArgumentException(newSize + " larger then " + fixedSize);
}
Expand All @@ -307,30 +307,30 @@ private void fill(BasicArrayList<R> list, int newSize) {
list.setCapacity(newSize);
}
while (list.size() < newSize) {
list.add(initialElement);
list.addW(initialElement);
}
}

@Override
public BasicArrayList<R> reduce(
BasicArrayList<R> curValue, Pair<IntRef, S> valueToReduce) {
public WArrayList<R> reduce(
WArrayList<R> curValue, Pair<IntRef, S> valueToReduce) {
int index = valueToReduce.getLeft().value;
fill(curValue, index + 1);
curValue.getInto(index, reusable);
curValue.getIntoW(index, reusable);
R result = elementReduceOp.reduce(reusable, valueToReduce.getRight());
curValue.set(index, result);
curValue.setW(index, result);
return curValue;
}

@Override
public BasicArrayList<R> reduceMerge(
BasicArrayList<R> curValue, BasicArrayList<R> valueToReduce) {
public WArrayList<R> reduceMerge(
WArrayList<R> curValue, WArrayList<R> valueToReduce) {
fill(curValue, valueToReduce.size());
for (int i = 0; i < valueToReduce.size(); i++) {
valueToReduce.getInto(i, reusable2);
curValue.getInto(i, reusable);
valueToReduce.getIntoW(i, reusable2);
curValue.getIntoW(i, reusable);
R result = elementReduceOp.reduceMerge(reusable, reusable2);
curValue.set(i, result);
curValue.setW(i, result);
}

return curValue;
Expand Down
Expand Up @@ -36,7 +36,7 @@
import org.apache.giraph.reducers.ReduceOperation;
import org.apache.giraph.types.ops.PrimitiveTypeOps;
import org.apache.giraph.types.ops.TypeOpsUtils;
import org.apache.giraph.types.ops.collections.BasicArrayList;
import org.apache.giraph.types.ops.collections.array.WArrayList;
import org.apache.giraph.utils.ArrayWritable;
import org.apache.giraph.worker.WorkerBroadcastUsage;
import org.apache.hadoop.io.Writable;
Expand Down Expand Up @@ -300,30 +300,30 @@ public V getBroadcast(WorkerBroadcastUsage worker) {
Int2ObjFunction<BroadcastHandle<V>> getPrimitiveBroadcastHandleSupplier(
final Int2ObjFunction<V> valueSupplier, final PrimitiveTypeOps<V> typeOps,
final BlockMasterApi master, final ObjectStriping striping) {
final ArrayOfHandles<BroadcastHandle<BasicArrayList<V>>> arrayOfBroadcasts =
final ArrayOfHandles<BroadcastHandle<WArrayList<V>>> arrayOfBroadcasts =
new ArrayOfHandles<>(
striping.getSplits(),
new Int2ObjFunction<BroadcastHandle<BasicArrayList<V>>>() {
new Int2ObjFunction<BroadcastHandle<WArrayList<V>>>() {
@Override
public BroadcastHandle<BasicArrayList<V>> apply(int value) {
public BroadcastHandle<WArrayList<V>> apply(int value) {
int size = striping.getSplitSize(value);
int start = striping.getSplitStart(value);
BasicArrayList<V> array = typeOps.createArrayList(size);
WArrayList<V> array = typeOps.createArrayList(size);
for (int i = 0; i < size; i++) {
array.add(valueSupplier.apply(start + i));
array.addW(valueSupplier.apply(start + i));
}
return master.broadcast(array);
}
});

final IntRef insideIndex = new IntRef(-1);
final ObjectHolder<BroadcastHandle<BasicArrayList<V>>> handleHolder =
final ObjectHolder<BroadcastHandle<WArrayList<V>>> handleHolder =
new ObjectHolder<>();
final BroadcastHandle<V> reusableHandle = new BroadcastHandle<V>() {
private final V reusable = typeOps.create();
@Override
public V getBroadcast(WorkerBroadcastUsage worker) {
handleHolder.get().getBroadcast(worker).getInto(
handleHolder.get().getBroadcast(worker).getIntoW(
insideIndex.value, reusable);
return reusable;
}
Expand Down
Expand Up @@ -24,8 +24,8 @@
import org.apache.giraph.reducers.impl.KryoWrappedReduceOperation;
import org.apache.giraph.types.ops.PrimitiveTypeOps;
import org.apache.giraph.types.ops.TypeOpsUtils;
import org.apache.giraph.types.ops.collections.BasicArrayList;
import org.apache.giraph.types.ops.collections.ResettableIterator;
import org.apache.giraph.types.ops.collections.array.WArrayList;
import org.apache.giraph.utils.WritableUtils;

/**
Expand All @@ -34,7 +34,7 @@
* @param <S> Primitive Writable type, which has its type ops
*/
public class CollectPrimitiveReduceOperation<S>
extends KryoWrappedReduceOperation<S, BasicArrayList<S>> {
extends KryoWrappedReduceOperation<S, WArrayList<S>> {
/**
* Type ops if available, or null
*/
Expand All @@ -49,25 +49,25 @@ public CollectPrimitiveReduceOperation(PrimitiveTypeOps<S> typeOps) {
}

@Override
public BasicArrayList<S> createValue() {
public WArrayList<S> createValue() {
return createList();
}

@Override
public void reduce(BasicArrayList<S> reduceInto, S value) {
reduceInto.add(value);
public void reduce(WArrayList<S> reduceInto, S value) {
reduceInto.addW(value);
}

@Override
public void reduceMerge(BasicArrayList<S> reduceInto,
BasicArrayList<S> toReduce) {
ResettableIterator<S> iterator = toReduce.fastIterator();
public void reduceMerge(final WArrayList<S> reduceInto,
WArrayList<S> toReduce) {
ResettableIterator<S> iterator = toReduce.fastIteratorW();
while (iterator.hasNext()) {
reduceInto.add(iterator.next());
reduceInto.addW(iterator.next());
}
}

public BasicArrayList<S> createList() {
public WArrayList<S> createList() {
return typeOps.createArrayList();
}

Expand Down
Expand Up @@ -24,7 +24,7 @@
import org.apache.giraph.reducers.ReduceOperation;
import org.apache.giraph.types.ops.PrimitiveTypeOps;
import org.apache.giraph.types.ops.TypeOpsUtils;
import org.apache.giraph.types.ops.collections.BasicArrayList;
import org.apache.giraph.types.ops.collections.array.WArrayList;
import org.apache.giraph.worker.WorkerBroadcastUsage;
import org.apache.giraph.writable.kryo.KryoWritableWrapper;

Expand All @@ -35,7 +35,7 @@
* @param <S> Single value type
*/
public class CollectShardedPrimitiveReducerHandle<S>
extends ShardedReducerHandle<S, BasicArrayList<S>> {
extends ShardedReducerHandle<S, WArrayList<S>> {
/**
* Type ops if available, or null
*/
Expand All @@ -48,27 +48,27 @@ public CollectShardedPrimitiveReducerHandle(final CreateReducersApi reduceApi,
}

@Override
public ReduceOperation<S, KryoWritableWrapper<BasicArrayList<S>>>
public ReduceOperation<S, KryoWritableWrapper<WArrayList<S>>>
createReduceOperation() {
return new CollectPrimitiveReduceOperation<>(typeOps);
}

@Override
public BasicArrayList<S> createReduceResult(MasterGlobalCommUsage master) {
public WArrayList<S> createReduceResult(MasterGlobalCommUsage master) {
int size = 0;
for (int i = 0; i < REDUCER_COUNT; i++) {
size += reducers.get(i).getReducedValue(master).get().size();
}
return createList(size);
}

public BasicArrayList<S> createList(int size) {
public WArrayList<S> createList(int size) {
return typeOps.createArrayList(size);
}

@Override
public BroadcastHandle<BasicArrayList<S>> createBroadcastHandle(
BroadcastArrayHandle<KryoWritableWrapper<BasicArrayList<S>>> broadcasts) {
public BroadcastHandle<WArrayList<S>> createBroadcastHandle(
BroadcastArrayHandle<KryoWritableWrapper<WArrayList<S>>> broadcasts) {
return new CollectShardedPrimitiveBroadcastHandle(broadcasts);
}

Expand All @@ -78,13 +78,13 @@ public BroadcastHandle<BasicArrayList<S>> createBroadcastHandle(
public class CollectShardedPrimitiveBroadcastHandle
extends ShardedBroadcastHandle {
public CollectShardedPrimitiveBroadcastHandle(
BroadcastArrayHandle<KryoWritableWrapper<BasicArrayList<S>>>
BroadcastArrayHandle<KryoWritableWrapper<WArrayList<S>>>
broadcasts) {
super(broadcasts);
}

@Override
public BasicArrayList<S> createBroadcastResult(
public WArrayList<S> createBroadcastResult(
WorkerBroadcastUsage worker) {
int size = 0;
for (int i = 0; i < REDUCER_COUNT; i++) {
Expand Down
Expand Up @@ -28,7 +28,7 @@
import org.apache.giraph.reducers.ReduceOperation;
import org.apache.giraph.types.ops.PrimitiveTypeOps;
import org.apache.giraph.types.ops.TypeOpsUtils;
import org.apache.giraph.types.ops.collections.BasicArrayList;
import org.apache.giraph.types.ops.collections.array.WArrayList;
import org.apache.giraph.worker.WorkerBroadcastUsage;
import org.apache.giraph.writable.kryo.KryoWritableWrapper;

Expand All @@ -39,7 +39,7 @@
*/
@SuppressWarnings("unchecked")
public class CollectShardedTuplesOfPrimitivesReducerHandle
extends ShardedReducerHandle<List<Object>, List<BasicArrayList>> {
extends ShardedReducerHandle<List<Object>, List<WArrayList>> {
/**
* Type ops if available, or null
*/
Expand All @@ -64,12 +64,12 @@ public List<Object> createSingleValue() {

@Override
public ReduceOperation<List<Object>,
KryoWritableWrapper<List<BasicArrayList>>> createReduceOperation() {
KryoWritableWrapper<List<WArrayList>>> createReduceOperation() {
return new CollectTuplesOfPrimitivesReduceOperation(typeOpsList);
}

@Override
public List<BasicArrayList> createReduceResult(
public List<WArrayList> createReduceResult(
MasterGlobalCommUsage master) {
int size = 0;
for (int i = 0; i < REDUCER_COUNT; i++) {
Expand All @@ -78,17 +78,17 @@ public List<BasicArrayList> createReduceResult(
return createLists(size);
}

public List<BasicArrayList> createLists(int size) {
List<BasicArrayList> ret = new ArrayList<>();
public List<WArrayList> createLists(int size) {
List<WArrayList> ret = new ArrayList<>();
for (PrimitiveTypeOps typeOps : typeOpsList) {
ret.add(typeOps.createArrayList(size));
}
return ret;
}

@Override
public BroadcastHandle<List<BasicArrayList>> createBroadcastHandle(
BroadcastArrayHandle<KryoWritableWrapper<List<BasicArrayList>>>
public BroadcastHandle<List<WArrayList>> createBroadcastHandle(
BroadcastArrayHandle<KryoWritableWrapper<List<WArrayList>>>
broadcasts) {
return new CollectShardedTuplesOfPrimitivesBroadcastHandle(broadcasts);
}
Expand All @@ -99,13 +99,13 @@ public BroadcastHandle<List<BasicArrayList>> createBroadcastHandle(
public class CollectShardedTuplesOfPrimitivesBroadcastHandle
extends ShardedBroadcastHandle {
public CollectShardedTuplesOfPrimitivesBroadcastHandle(
BroadcastArrayHandle<KryoWritableWrapper<List<BasicArrayList>>>
BroadcastArrayHandle<KryoWritableWrapper<List<WArrayList>>>
broadcasts) {
super(broadcasts);
}

@Override
public List<BasicArrayList> createBroadcastResult(
public List<WArrayList> createBroadcastResult(
WorkerBroadcastUsage worker) {
int size = 0;
for (int i = 0; i < REDUCER_COUNT; i++) {
Expand All @@ -120,7 +120,7 @@ public List<BasicArrayList> createBroadcastResult(
*/
public static class CollectShardedTuplesOfPrimitivesReduceBroadcast {
private CollectShardedTuplesOfPrimitivesReducerHandle reducerHandle;
private BroadcastHandle<List<BasicArrayList>> broadcastHandle;
private BroadcastHandle<List<WArrayList>> broadcastHandle;

/** Set reducer handle to just registered handle */
public void registeredReducer(CreateReducersApi reduceApi,
Expand All @@ -139,7 +139,7 @@ public void reduce(List<Object> valueToReduce) {
}

/** Get reduced value */
public List<BasicArrayList> getReducedValue(MasterGlobalCommUsage master) {
public List<WArrayList> getReducedValue(MasterGlobalCommUsage master) {
return reducerHandle.getReducedValue(master);
}

Expand All @@ -151,7 +151,7 @@ public void broadcastValue(BlockMasterApi master) {
}

/** Get broadcasted value */
public List<BasicArrayList> getBroadcast(WorkerBroadcastUsage worker) {
public List<WArrayList> getBroadcast(WorkerBroadcastUsage worker) {
return broadcastHandle.getBroadcast(worker);
}
}
Expand Down

0 comments on commit 77ae12e

Please sign in to comment.