From 77f8a075ccc029cb608a382f5deb7cc0b27b02e5 Mon Sep 17 00:00:00 2001 From: Igor Kabiljo Date: Wed, 17 Jun 2015 12:47:52 -0700 Subject: [PATCH] [GIRAPH-1013] Adding reducer handle utilities Summary: And more functional interfaces, and PairWritable Test Plan: mvn clean install Reviewers: dionysis.logothetis, sergey.edunov, maja.kabiljo Reviewed By: maja.kabiljo Differential Revision: https://reviews.facebook.net/D40269 --- giraph-block-app/pom.xml | 4 + .../reducers/array/ArrayOfHandles.java | 127 ++++++ .../block_app/reducers/array/ArrayReduce.java | 211 +++++++++ .../reducers/array/BasicArrayReduce.java | 353 +++++++++++++++ .../reducers/array/HugeArrayUtils.java | 404 ++++++++++++++++++ .../reducers/array/package-info.java | 21 + .../CollectPrimitiveReduceOperation.java | 84 ++++ .../collect/CollectReduceOperation.java | 50 +++ .../CollectShardedPrimitiveReducerHandle.java | 96 +++++ .../collect/CollectShardedReducerHandle.java | 85 ++++ ...hardedTuplesOfPrimitivesReducerHandle.java | 158 +++++++ ...lectTuplesOfPrimitivesReduceOperation.java | 96 +++++ .../collect/ShardedReducerHandle.java | 123 ++++++ .../reducers/collect/package-info.java | 21 + .../reducers/map/BasicMapReduce.java | 276 ++++++++++++ .../block_app/reducers/map/package-info.java | 21 + .../block_app/reducers/package-info.java | 21 + .../giraph/function/TripleFunction.java | 41 ++ .../primitive/Obj2DoubleFunction.java | 33 ++ .../function/primitive/Obj2FloatFunction.java | 33 ++ .../function/primitive/Obj2LongFunction.java | 33 ++ .../reducers/array/ObjectStripingTest.java | 58 +++ giraph-core/pom.xml | 4 + .../impl/KryoWrappedReduceOperation.java | 86 ++++ .../writable/tuple/DoubleDoubleWritable.java | 39 ++ .../writable/tuple/IntDoubleWritable.java | 40 ++ .../giraph/writable/tuple/IntIntWritable.java | 39 ++ .../writable/tuple/IntLongWritable.java | 40 ++ .../writable/tuple/LongDoubleWritable.java | 40 ++ .../writable/tuple/LongIntWritable.java | 40 ++ .../writable/tuple/LongLongWritable.java | 39 ++ .../giraph/writable/tuple/PairWritable.java | 113 +++++ .../giraph/writable/tuple/package-info.java | 21 + pom.xml | 6 + 34 files changed, 2856 insertions(+) create mode 100644 giraph-block-app/src/main/java/org/apache/giraph/block_app/reducers/array/ArrayOfHandles.java create mode 100644 giraph-block-app/src/main/java/org/apache/giraph/block_app/reducers/array/ArrayReduce.java create mode 100644 giraph-block-app/src/main/java/org/apache/giraph/block_app/reducers/array/BasicArrayReduce.java create mode 100644 giraph-block-app/src/main/java/org/apache/giraph/block_app/reducers/array/HugeArrayUtils.java create mode 100644 giraph-block-app/src/main/java/org/apache/giraph/block_app/reducers/array/package-info.java create mode 100644 giraph-block-app/src/main/java/org/apache/giraph/block_app/reducers/collect/CollectPrimitiveReduceOperation.java create mode 100644 giraph-block-app/src/main/java/org/apache/giraph/block_app/reducers/collect/CollectReduceOperation.java create mode 100644 giraph-block-app/src/main/java/org/apache/giraph/block_app/reducers/collect/CollectShardedPrimitiveReducerHandle.java create mode 100644 giraph-block-app/src/main/java/org/apache/giraph/block_app/reducers/collect/CollectShardedReducerHandle.java create mode 100644 giraph-block-app/src/main/java/org/apache/giraph/block_app/reducers/collect/CollectShardedTuplesOfPrimitivesReducerHandle.java create mode 100644 giraph-block-app/src/main/java/org/apache/giraph/block_app/reducers/collect/CollectTuplesOfPrimitivesReduceOperation.java create mode 100644 giraph-block-app/src/main/java/org/apache/giraph/block_app/reducers/collect/ShardedReducerHandle.java create mode 100644 giraph-block-app/src/main/java/org/apache/giraph/block_app/reducers/collect/package-info.java create mode 100644 giraph-block-app/src/main/java/org/apache/giraph/block_app/reducers/map/BasicMapReduce.java create mode 100644 giraph-block-app/src/main/java/org/apache/giraph/block_app/reducers/map/package-info.java create mode 100644 giraph-block-app/src/main/java/org/apache/giraph/block_app/reducers/package-info.java create mode 100644 giraph-block-app/src/main/java/org/apache/giraph/function/TripleFunction.java create mode 100644 giraph-block-app/src/main/java/org/apache/giraph/function/primitive/Obj2DoubleFunction.java create mode 100644 giraph-block-app/src/main/java/org/apache/giraph/function/primitive/Obj2FloatFunction.java create mode 100644 giraph-block-app/src/main/java/org/apache/giraph/function/primitive/Obj2LongFunction.java create mode 100644 giraph-block-app/src/test/java/org/apache/giraph/block_app/reducers/array/ObjectStripingTest.java create mode 100644 giraph-core/src/main/java/org/apache/giraph/reducers/impl/KryoWrappedReduceOperation.java create mode 100644 giraph-core/src/main/java/org/apache/giraph/writable/tuple/DoubleDoubleWritable.java create mode 100644 giraph-core/src/main/java/org/apache/giraph/writable/tuple/IntDoubleWritable.java create mode 100644 giraph-core/src/main/java/org/apache/giraph/writable/tuple/IntIntWritable.java create mode 100644 giraph-core/src/main/java/org/apache/giraph/writable/tuple/IntLongWritable.java create mode 100644 giraph-core/src/main/java/org/apache/giraph/writable/tuple/LongDoubleWritable.java create mode 100644 giraph-core/src/main/java/org/apache/giraph/writable/tuple/LongIntWritable.java create mode 100644 giraph-core/src/main/java/org/apache/giraph/writable/tuple/LongLongWritable.java create mode 100644 giraph-core/src/main/java/org/apache/giraph/writable/tuple/PairWritable.java create mode 100644 giraph-core/src/main/java/org/apache/giraph/writable/tuple/package-info.java diff --git a/giraph-block-app/pom.xml b/giraph-block-app/pom.xml index 1f653bb0b..a05c1c592 100644 --- a/giraph-block-app/pom.xml +++ b/giraph-block-app/pom.xml @@ -86,6 +86,10 @@ under the License. + + org.apache.commons + commons-lang3 + it.unimi.dsi fastutil diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/reducers/array/ArrayOfHandles.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/reducers/array/ArrayOfHandles.java new file mode 100644 index 000000000..053fd61e9 --- /dev/null +++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/reducers/array/ArrayOfHandles.java @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.giraph.block_app.reducers.array; + +import java.util.ArrayList; + +import org.apache.giraph.block_app.framework.api.BlockMasterApi; +import org.apache.giraph.block_app.framework.piece.global_comm.BroadcastHandle; +import org.apache.giraph.block_app.framework.piece.global_comm.ReducerHandle; +import org.apache.giraph.block_app.framework.piece.global_comm.array.ArrayHandle; +import org.apache.giraph.block_app.framework.piece.global_comm.array.BroadcastArrayHandle; +import org.apache.giraph.block_app.framework.piece.global_comm.array.ReducerArrayHandle; +import org.apache.giraph.function.Supplier; +import org.apache.giraph.function.primitive.Int2ObjFunction; +import org.apache.giraph.worker.WorkerBroadcastUsage; + +/** + * ArrayHandle implemented as an array of individual handles. + * + * @param Handle type + */ +public class ArrayOfHandles implements ArrayHandle { + protected final ArrayList handles; + + public ArrayOfHandles(int count, Supplier reduceHandleFactory) { + handles = new ArrayList<>(); + for (int i = 0; i < count; i++) { + handles.add(reduceHandleFactory.get()); + } + } + + public ArrayOfHandles(int count, Int2ObjFunction reduceHandleFactory) { + handles = new ArrayList<>(); + for (int i = 0; i < count; i++) { + handles.add(reduceHandleFactory.apply(i)); + } + } + + @Override + public H get(int index) { + return handles.get(index); + } + + @Override + public int getStaticSize() { + return handles.size(); + } + + /** + * ReducerArrayHandle implemented as an array of separate reducer handles. + * + * @param Handle type + */ + public static class ArrayOfReducers + extends ArrayOfHandles> + implements ReducerArrayHandle { + + public ArrayOfReducers( + int count, Supplier> reduceHandleFactory) { + super(count, reduceHandleFactory); + } + + public ArrayOfReducers( + int count, Int2ObjFunction> reduceHandleFactory) { + super(count, reduceHandleFactory); + } + + @Override + public int getReducedSize(BlockMasterApi master) { + return getStaticSize(); + } + + @Override + public BroadcastArrayHandle broadcastValue(final BlockMasterApi master) { + return new ArrayOfBroadcasts<>( + getStaticSize(), + new Int2ObjFunction>() { + @Override + public BroadcastHandle apply(int index) { + return get(index).broadcastValue(master); + } + }); + } + } + + /** + * BroadcastArrayHandle implemented as an array of separate broadcast handles. + * + * @param Handle type + */ + public static class ArrayOfBroadcasts + extends ArrayOfHandles> + implements BroadcastArrayHandle { + + public ArrayOfBroadcasts( + int count, + Int2ObjFunction> broadcastHandleFactory) { + super(count, broadcastHandleFactory); + } + + public ArrayOfBroadcasts( + int count, + Supplier> broadcastHandleFactory) { + super(count, broadcastHandleFactory); + } + + @Override + public int getBroadcastedSize(WorkerBroadcastUsage worker) { + return getStaticSize(); + } + } +} diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/reducers/array/ArrayReduce.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/reducers/array/ArrayReduce.java new file mode 100644 index 000000000..f2cdf8c22 --- /dev/null +++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/reducers/array/ArrayReduce.java @@ -0,0 +1,211 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.giraph.block_app.reducers.array; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.lang.reflect.Array; + +import org.apache.commons.lang3.tuple.MutablePair; +import org.apache.commons.lang3.tuple.Pair; +import org.apache.giraph.block_app.framework.api.BlockMasterApi; +import org.apache.giraph.block_app.framework.api.CreateReducersApi.CreateReducerFunctionApi; +import org.apache.giraph.block_app.framework.piece.global_comm.BroadcastHandle; +import org.apache.giraph.block_app.framework.piece.global_comm.ReducerHandle; +import org.apache.giraph.block_app.framework.piece.global_comm.array.BroadcastArrayHandle; +import org.apache.giraph.block_app.framework.piece.global_comm.array.ReducerArrayHandle; +import org.apache.giraph.function.primitive.PrimitiveRefs.IntRef; +import org.apache.giraph.master.MasterGlobalCommUsage; +import org.apache.giraph.reducers.ReduceOperation; +import org.apache.giraph.utils.ArrayWritable; +import org.apache.giraph.utils.WritableUtils; +import org.apache.giraph.worker.WorkerBroadcastUsage; +import org.apache.hadoop.io.Writable; + +/** + * One reducer representing reduction of array of individual values. + * Elements are represented as object, and so BasicArrayReduce should be + * used instead when elements are primitive types. + * + * @param Single value type, objects passed on workers + * @param Reduced value type + */ +public class ArrayReduce + implements ReduceOperation, ArrayWritable> { + private int fixedSize; + private ReduceOperation elementReduceOp; + private Class elementClass; + + public ArrayReduce() { + } + + /** + * Create ReduceOperation that reduces arrays by reducing individual + * elements. + * + * @param fixedSize Number of elements + * @param elementReduceOp ReduceOperation for individual elements + */ + public ArrayReduce(int fixedSize, ReduceOperation elementReduceOp) { + this.fixedSize = fixedSize; + this.elementReduceOp = elementReduceOp; + init(); + } + + /** + * Registers one new reducer, that will reduce array of objects, + * by reducing individual elements using {@code elementReduceOp}. + * + * This function will return ReducerArrayHandle to it, by which + * individual elements can be manipulated separately. + * + * @param fixedSize Number of elements + * @param elementReduceOp ReduceOperation for individual elements + * @param createFunction Function for creating a reducer + * @return Created ReducerArrayHandle + */ + public static + ReducerArrayHandle createArrayHandles( + final int fixedSize, ReduceOperation elementReduceOp, + CreateReducerFunctionApi createFunction) { + final ReducerHandle, ArrayWritable> reduceHandle = + createFunction.createReducer( + new ArrayReduce<>(fixedSize, elementReduceOp)); + + final IntRef curIndex = new IntRef(0); + final MutablePair reusablePair = + MutablePair.of(new IntRef(0), null); + final ReducerHandle elementReduceHandle = new ReducerHandle() { + @Override + public T getReducedValue(MasterGlobalCommUsage master) { + ArrayWritable result = reduceHandle.getReducedValue(master); + return result.get()[curIndex.value]; + } + + @Override + public void reduce(S valueToReduce) { + reusablePair.getLeft().value = curIndex.value; + reusablePair.setRight(valueToReduce); + reduceHandle.reduce(reusablePair); + } + + @Override + public BroadcastHandle broadcastValue(BlockMasterApi master) { + throw new UnsupportedOperationException(); + } + }; + + return new ReducerArrayHandle() { + @Override + public ReducerHandle get(int index) { + curIndex.value = index; + return elementReduceHandle; + } + + @Override + public int getStaticSize() { + return fixedSize; + } + + @Override + public int getReducedSize(BlockMasterApi master) { + return getStaticSize(); + } + + @Override + public BroadcastArrayHandle broadcastValue(BlockMasterApi master) { + final BroadcastHandle> broadcastHandle = + reduceHandle.broadcastValue(master); + final IntRef curIndex = new IntRef(0); + final BroadcastHandle + elementBroadcastHandle = new BroadcastHandle() { + @Override + public T getBroadcast(WorkerBroadcastUsage worker) { + ArrayWritable result = broadcastHandle.getBroadcast(worker); + return result.get()[curIndex.value]; + } + }; + return new BroadcastArrayHandle() { + @Override + public BroadcastHandle get(int index) { + curIndex.value = index; + return elementBroadcastHandle; + } + + @Override + public int getStaticSize() { + return fixedSize; + } + + @Override + public int getBroadcastedSize(WorkerBroadcastUsage worker) { + return getStaticSize(); + } + }; + } + }; + } + + private void init() { + elementClass = (Class) elementReduceOp.createInitialValue().getClass(); + } + + @Override + public ArrayWritable createInitialValue() { + R[] values = (R[]) Array.newInstance(elementClass, fixedSize); + for (int i = 0; i < fixedSize; i++) { + values[i] = elementReduceOp.createInitialValue(); + } + return new ArrayWritable<>(elementClass, values); + } + + @Override + public ArrayWritable reduce( + ArrayWritable curValue, Pair valueToReduce) { + int index = valueToReduce.getLeft().value; + curValue.get()[index] = + elementReduceOp.reduce(curValue.get()[index], valueToReduce.getRight()); + return curValue; + } + + @Override + public ArrayWritable reduceMerge( + ArrayWritable curValue, ArrayWritable valueToReduce) { + for (int i = 0; i < fixedSize; i++) { + curValue.get()[i] = + elementReduceOp.reduceMerge( + curValue.get()[i], valueToReduce.get()[i]); + } + return curValue; + } + + @Override + public void write(DataOutput out) throws IOException { + out.writeInt(fixedSize); + WritableUtils.writeWritableObject(elementReduceOp, out); + } + + @Override + public void readFields(DataInput in) throws IOException { + fixedSize = in.readInt(); + elementReduceOp = WritableUtils.readWritableObject(in, null); + init(); + } + +} diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/reducers/array/BasicArrayReduce.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/reducers/array/BasicArrayReduce.java new file mode 100644 index 000000000..91ced16e4 --- /dev/null +++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/reducers/array/BasicArrayReduce.java @@ -0,0 +1,353 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.giraph.block_app.reducers.array; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; + +import org.apache.commons.lang3.tuple.MutablePair; +import org.apache.commons.lang3.tuple.Pair; +import org.apache.giraph.block_app.framework.api.BlockMasterApi; +import org.apache.giraph.block_app.framework.api.CreateReducersApi; +import org.apache.giraph.block_app.framework.api.CreateReducersApi.CreateReducerFunctionApi; +import org.apache.giraph.block_app.framework.piece.global_comm.BroadcastHandle; +import org.apache.giraph.block_app.framework.piece.global_comm.ReducerHandle; +import org.apache.giraph.block_app.framework.piece.global_comm.array.BroadcastArrayHandle; +import org.apache.giraph.block_app.framework.piece.global_comm.array.ReducerArrayHandle; +import org.apache.giraph.function.primitive.PrimitiveRefs.IntRef; +import org.apache.giraph.master.MasterGlobalCommUsage; +import org.apache.giraph.reducers.ReduceOperation; +import org.apache.giraph.types.ops.PrimitiveTypeOps; +import org.apache.giraph.types.ops.TypeOpsUtils; +import org.apache.giraph.types.ops.collections.BasicArrayList; +import org.apache.giraph.utils.WritableUtils; +import org.apache.giraph.worker.WorkerBroadcastUsage; +import org.apache.hadoop.io.Writable; + +/** + * Efficient generic primitive array reduce operation. + * + * Allows two modes - fixed size, and infinite size + * (with keeping only actually used elements and resizing) + * + * @param Single value type + * @param Reduced value type + */ +public class BasicArrayReduce + implements ReduceOperation, BasicArrayList> { + private int fixedSize; + private PrimitiveTypeOps typeOps; + private ReduceOperation elementReduceOp; + private R initialElement; + private R reusable; + private R reusable2; + + public BasicArrayReduce() { + } + + + /** + * Create ReduceOperation that reduces BasicArrays by reducing individual + * elements, with predefined size. + * + * @param fixedSize Number of elements + * @param typeOps TypeOps of individual elements + * @param elementReduceOp ReduceOperation for individual elements + */ + public BasicArrayReduce( + int fixedSize, + PrimitiveTypeOps typeOps, + ReduceOperation elementReduceOp) { + this.fixedSize = fixedSize; + this.typeOps = typeOps; + this.elementReduceOp = elementReduceOp; + init(); + } + + + /** + * Create ReduceOperation that reduces BasicArrays by reducing individual + * elements, with unbounded size. + * + * @param typeOps TypeOps of individual elements + * @param elementReduceOp ReduceOperation for individual elements + */ + public BasicArrayReduce( + PrimitiveTypeOps typeOps, ReduceOperation elementReduceOp) { + this(-1, typeOps, elementReduceOp); + } + + + /** + * Registers one new local reducer, that will reduce BasicArray, + * by reducing individual elements using {@code elementReduceOp}, + * with unbounded size. + * + * This function will return ReducerArrayHandle, by which + * individual elements can be manipulated separately. + * + * @param typeOps TypeOps of individual elements + * @param elementReduceOp ReduceOperation for individual elements + * @param reduceApi API for creating reducers + * @return Created ReducerArrayHandle + */ + public static + ReducerArrayHandle createLocalArrayHandles( + PrimitiveTypeOps typeOps, ReduceOperation elementReduceOp, + CreateReducersApi reduceApi) { + return createLocalArrayHandles(-1, typeOps, elementReduceOp, reduceApi); + } + + /** + * Registers one new local reducer, that will reduce BasicArray, + * by reducing individual elements using {@code elementReduceOp}, + * with predefined size. + * + * This function will return ReducerArrayHandle, by which + * individual elements can be manipulated separately. + * + * @param fixedSize Number of elements + * @param typeOps TypeOps of individual elements + * @param elementReduceOp ReduceOperation for individual elements + * @param reduceApi API for creating reducers + * @return Created ReducerArrayHandle + */ + public static + ReducerArrayHandle createLocalArrayHandles( + int fixedSize, PrimitiveTypeOps typeOps, + ReduceOperation elementReduceOp, + final CreateReducersApi reduceApi) { + return createArrayHandles(fixedSize, typeOps, elementReduceOp, + new CreateReducerFunctionApi() { + @Override + public ReducerHandle createReducer( + ReduceOperation reduceOp) { + return reduceApi.createLocalReducer(reduceOp); + } + }); + } + + /** + * Registers one new reducer, that will reduce BasicArray, + * by reducing individual elements using {@code elementReduceOp}, + * with unbounded size. + * + * This function will return ReducerArrayHandle, by which + * individual elements can be manipulated separately. + * + * @param typeOps TypeOps of individual elements + * @param elementReduceOp ReduceOperation for individual elements + * @param createFunction Function for creating a reducer + * @return Created ReducerArrayHandle + */ + public static + ReducerArrayHandle createArrayHandles( + PrimitiveTypeOps typeOps, ReduceOperation elementReduceOp, + CreateReducerFunctionApi createFunction) { + return createArrayHandles(-1, typeOps, elementReduceOp, createFunction); + } + + /** + * Registers one new reducer, that will reduce BasicArray, + * by reducing individual elements using {@code elementReduceOp}, + * with predefined size. + * + * This function will return ReducerArrayHandle, by which + * individual elements can be manipulated separately. + * + * @param fixedSize Number of elements + * @param typeOps TypeOps of individual elements + * @param elementReduceOp ReduceOperation for individual elements + * @param createFunction Function for creating a reducer + * @return Created ReducerArrayHandle + */ + public static + ReducerArrayHandle createArrayHandles( + final int fixedSize, final PrimitiveTypeOps typeOps, + ReduceOperation elementReduceOp, + CreateReducerFunctionApi createFunction) { + final ReducerHandle, BasicArrayList> reduceHandle = + createFunction.createReducer( + new BasicArrayReduce<>(fixedSize, typeOps, elementReduceOp)); + final IntRef curIndex = new IntRef(0); + final R reusableValue = typeOps.create(); + final R initialValue = elementReduceOp.createInitialValue(); + final MutablePair reusablePair = + MutablePair.of(new IntRef(0), null); + final ReducerHandle elementReduceHandle = new ReducerHandle() { + @Override + public R getReducedValue(MasterGlobalCommUsage master) { + BasicArrayList result = reduceHandle.getReducedValue(master); + if (fixedSize == -1 && curIndex.value >= result.size()) { + typeOps.set(reusableValue, initialValue); + } else { + result.getInto(curIndex.value, reusableValue); + } + return reusableValue; + } + + @Override + public void reduce(S valueToReduce) { + reusablePair.getLeft().value = curIndex.value; + reusablePair.setRight(valueToReduce); + reduceHandle.reduce(reusablePair); + } + + @Override + public BroadcastHandle broadcastValue(BlockMasterApi master) { + throw new UnsupportedOperationException(); + } + }; + + return new ReducerArrayHandle() { + @Override + public ReducerHandle get(int index) { + curIndex.value = index; + return elementReduceHandle; + } + + @Override + public int getStaticSize() { + if (fixedSize == -1) { + throw new UnsupportedOperationException( + "Cannot call size, when one is not specified upfront"); + } + return fixedSize; + } + + @Override + public int getReducedSize(BlockMasterApi master) { + return reduceHandle.getReducedValue(master).size(); + } + + @Override + public BroadcastArrayHandle broadcastValue(BlockMasterApi master) { + final BroadcastHandle> broadcastHandle = + reduceHandle.broadcastValue(master); + final IntRef curIndex = new IntRef(0); + final R reusableValue = typeOps.create(); + final BroadcastHandle + elementBroadcastHandle = new BroadcastHandle() { + @Override + public R getBroadcast(WorkerBroadcastUsage worker) { + BasicArrayList result = broadcastHandle.getBroadcast(worker); + if (fixedSize == -1 && curIndex.value >= result.size()) { + typeOps.set(reusableValue, initialValue); + } else { + result.getInto(curIndex.value, reusableValue); + } + return reusableValue; + } + }; + return new BroadcastArrayHandle() { + @Override + public BroadcastHandle get(int index) { + curIndex.value = index; + return elementBroadcastHandle; + } + + @Override + public int getStaticSize() { + if (fixedSize == -1) { + throw new UnsupportedOperationException( + "Cannot call size, when one is not specified upfront"); + } + return fixedSize; + } + + @Override + public int getBroadcastedSize(WorkerBroadcastUsage worker) { + return broadcastHandle.getBroadcast(worker).size(); + } + }; + } + }; + } + + + private void init() { + initialElement = elementReduceOp.createInitialValue(); + reusable = typeOps.create(); + reusable2 = typeOps.create(); + } + + @Override + public BasicArrayList createInitialValue() { + if (fixedSize != -1) { + BasicArrayList list = typeOps.createArrayList(fixedSize); + fill(list, fixedSize); + return list; + } else { + return typeOps.createArrayList(1); + } + } + + private void fill(BasicArrayList list, int newSize) { + if (fixedSize != -1 && newSize > fixedSize) { + throw new IllegalArgumentException(newSize + " larger then " + fixedSize); + } + + if (list.capacity() < newSize) { + list.setCapacity(newSize); + } + while (list.size() < newSize) { + list.add(initialElement); + } + } + + @Override + public BasicArrayList reduce( + BasicArrayList curValue, Pair valueToReduce) { + int index = valueToReduce.getLeft().value; + fill(curValue, index + 1); + curValue.getInto(index, reusable); + R result = elementReduceOp.reduce(reusable, valueToReduce.getRight()); + curValue.set(index, result); + return curValue; + } + + @Override + public BasicArrayList reduceMerge( + BasicArrayList curValue, BasicArrayList valueToReduce) { + fill(curValue, valueToReduce.size()); + for (int i = 0; i < valueToReduce.size(); i++) { + valueToReduce.getInto(i, reusable2); + curValue.getInto(i, reusable); + R result = elementReduceOp.reduceMerge(reusable, reusable2); + curValue.set(i, result); + } + + return curValue; + } + + @Override + public void write(DataOutput out) throws IOException { + out.writeInt(fixedSize); + TypeOpsUtils.writeTypeOps(typeOps, out); + WritableUtils.writeWritableObject(elementReduceOp, out); + } + + @Override + public void readFields(DataInput in) throws IOException { + fixedSize = in.readInt(); + typeOps = TypeOpsUtils.readTypeOps(in); + elementReduceOp = WritableUtils.readWritableObject(in, null); + init(); + } +} diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/reducers/array/HugeArrayUtils.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/reducers/array/HugeArrayUtils.java new file mode 100644 index 000000000..be5d4fe94 --- /dev/null +++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/reducers/array/HugeArrayUtils.java @@ -0,0 +1,404 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.giraph.block_app.reducers.array; + +import java.util.ArrayList; + +import org.apache.giraph.block_app.framework.api.BlockMasterApi; +import org.apache.giraph.block_app.framework.api.CreateReducersApi; +import org.apache.giraph.block_app.framework.api.CreateReducersApi.CreateReducerFunctionApi; +import org.apache.giraph.block_app.framework.piece.global_comm.BroadcastHandle; +import org.apache.giraph.block_app.framework.piece.global_comm.ReducerHandle; +import org.apache.giraph.block_app.framework.piece.global_comm.array.BroadcastArrayHandle; +import org.apache.giraph.block_app.framework.piece.global_comm.array.ReducerArrayHandle; +import org.apache.giraph.block_app.reducers.array.ArrayOfHandles.ArrayOfBroadcasts; +import org.apache.giraph.block_app.reducers.array.ArrayOfHandles.ArrayOfReducers; +import org.apache.giraph.conf.IntConfOption; +import org.apache.giraph.function.ObjectHolder; +import org.apache.giraph.function.Supplier; +import org.apache.giraph.function.primitive.Int2ObjFunction; +import org.apache.giraph.function.primitive.PrimitiveRefs.IntRef; +import org.apache.giraph.reducers.ReduceOperation; +import org.apache.giraph.types.ops.PrimitiveTypeOps; +import org.apache.giraph.types.ops.TypeOpsUtils; +import org.apache.giraph.types.ops.collections.BasicArrayList; +import org.apache.giraph.utils.ArrayWritable; +import org.apache.giraph.worker.WorkerBroadcastUsage; +import org.apache.hadoop.io.Writable; + +/** + * Utility class when we are dealing with huge arrays (i.e. large number of + * elements) within reducing/broadcasting. + * + * In Giraph, for each reducer there is a worker machine which is it's owner, + * which does partial aggregation for it. So if we have only single huge + * reducer - other workers will have to wait, while that single worker is doing + * huge reducing operation. + * On the other hand, each reducer has a meaningful overhead, so we should try + * to keep number of reducers as low as possible (in total less then 10k is a + * good number). + * What we want is to split such huge reducers into slightly more then number + * of worker reducers, and NUM_REDUCERS = 50000 is used here as a good middle + * ground. + * + * So when we have huge array, we don't want one reducer/broadcast for each + * element, but we also don't want one reducer/broadcast for the whole array. + * + * This class allows transparent split into reasonable number of reducers + * (~50000), which solves both of the above issues. + */ +public class HugeArrayUtils { + // Striping perfectly reducers of up to 25GB (i.e. 500KB * NUM_STRIPES). + private static final IntConfOption NUM_STRIPES = new IntConfOption( + "giraph.reducers.HugeArrayUtils.num_stripes", 50000, + "Number of distict reducers to create. If array is smaller then this" + + "number, each element will be it's own reducer"); + + private HugeArrayUtils() { } + + /** + * Create global array of reducers, by splitting the huge array + * into NUM_STRIPES number of parts. + * + * @param fixedSize Number of elements + * @param elementReduceOp ReduceOperation for individual elements + * @param reduceApi Api for creating reducers + * @return Created ReducerArrayHandle + */ + public static + ReducerArrayHandle createGlobalReducerArrayHandle( + final int fixedSize, final ReduceOperation elementReduceOp, + final CreateReducersApi reduceApi) { + return createGlobalReducerArrayHandle( + fixedSize, elementReduceOp, reduceApi, + NUM_STRIPES.get(reduceApi.getConf())); + } + + /** + * Create global array of reducers, by splitting the huge array + * into {@code maxNumStripes} number of parts. + * + * @param fixedSize Number of elements + * @param elementReduceOp ReduceOperation for individual elements + * @param reduceApi Api for creating reducers + * @param maxNumStripes Maximal number of reducers to create. + * @return Created ReducerArrayHandle + */ + public static + ReducerArrayHandle createGlobalReducerArrayHandle( + final int fixedSize, final ReduceOperation elementReduceOp, + final CreateReducersApi reduceApi, int maxNumStripes) { + PrimitiveTypeOps typeOps = TypeOpsUtils.getPrimitiveTypeOpsOrNull( + (Class) elementReduceOp.createInitialValue().getClass()); + + final CreateReducerFunctionApi + createReducer = new CreateReducerFunctionApi() { + @Override + public ReducerHandle createReducer( + ReduceOperation reduceOp) { + return reduceApi.createGlobalReducer(reduceOp); + } + }; + + if (fixedSize < maxNumStripes) { + return new ArrayOfReducers<>( + fixedSize, + new Supplier>() { + @Override + public ReducerHandle get() { + return createReducer.createReducer(elementReduceOp); + } + }); + } else { + final ObjectStriping striping = + new ObjectStriping(fixedSize, maxNumStripes); + + final ArrayList> handles = + new ArrayList<>(striping.getSplits()); + for (int i = 0; i < striping.getSplits(); i++) { + if (typeOps != null) { + handles.add(BasicArrayReduce.createArrayHandles( + striping.getSplitSize(i), typeOps, + elementReduceOp, createReducer)); + } else { + handles.add(ArrayReduce.createArrayHandles( + striping.getSplitSize(i), elementReduceOp, createReducer)); + } + } + + return new ReducerArrayHandle() { + @Override + public ReducerHandle get(int index) { + if ((index >= fixedSize) || (index < 0)) { + throw new RuntimeException( + "Reducer Access out of bounds: requested : " + + index + " from array of size : " + fixedSize); + } + int reducerIndex = striping.getSplitIndex(index); + int insideIndex = striping.getInsideIndex(index); + return handles.get(reducerIndex).get(insideIndex); + } + + @Override + public int getStaticSize() { + return fixedSize; + } + + @Override + public int getReducedSize(BlockMasterApi master) { + return getStaticSize(); + } + + @Override + public BroadcastArrayHandle broadcastValue(BlockMasterApi master) { + throw new UnsupportedOperationException("for now not supported"); + } + }; + } + } + + /** + * Broadcast a huge array, by splitting into NUM_STRIPES number of parts. + * + * @param count Number of elements + * @param valueSupplier Supplier of value to be broadcasted for a given index + * @param master Master API + * @return Created BroadcastArrayHandle + */ + public static BroadcastArrayHandle broadcast( + final int count, + final Int2ObjFunction valueSupplier, + final BlockMasterApi master) { + return broadcast(count, valueSupplier, null, master); + } + + /** + * Broadcast a huge array, by splitting into NUM_STRIPES number of parts. + * Efficient for primitive types, using BasicArray underneath. + * + * @param count Number of elements + * @param valueSupplier Supplier of value to be broadcasted for a given index + * @param typeOps Element TypeOps + * @param master Master API + * @return Created BroadcastArrayHandle + */ + public static BroadcastArrayHandle broadcast( + final int count, + final Int2ObjFunction valueSupplier, + final PrimitiveTypeOps typeOps, + final BlockMasterApi master) { + int numStripes = NUM_STRIPES.get(master.getConf()); + if (count < numStripes) { + return new ArrayOfBroadcasts<>( + count, + new Int2ObjFunction>() { + @Override + public BroadcastHandle apply(int i) { + // We create a copy because the valueSupplier might return a + // reusable obj. This function is NOT safe if typeOps is null + // & valueSupplier returns reusable + return master.broadcast( + typeOps != null ? + typeOps.createCopy(valueSupplier.apply(i)) : + valueSupplier.apply(i)); + } + }); + } else { + ObjectStriping striping = new ObjectStriping(count, numStripes); + final Int2ObjFunction> handleSupplier; + + if (typeOps != null) { + handleSupplier = getPrimitiveBroadcastHandleSupplier( + valueSupplier, typeOps, master, striping); + } else { + handleSupplier = getObjectBroadcastHandleSupplier( + valueSupplier, master, striping); + } + return new BroadcastArrayHandle() { + @Override + public BroadcastHandle get(int index) { + if (index >= count || index < 0) { + throw new RuntimeException( + "Broadcast Access out of bounds: requested: " + + index + " from array of size : " + count); + } + return handleSupplier.apply(index); + } + + @Override + public int getBroadcastedSize(WorkerBroadcastUsage worker) { + return count; + } + + @Override + public int getStaticSize() { + return count; + } + }; + } + } + + private static + Int2ObjFunction> getObjectBroadcastHandleSupplier( + final Int2ObjFunction valueSupplier, + final BlockMasterApi master, final ObjectStriping striping) { + final ObjectHolder> elementClass = new ObjectHolder<>(); + final ArrayOfHandles>> arrayOfBroadcasts = + new ArrayOfHandles<>( + striping.getSplits(), + new Int2ObjFunction>>() { + @Override + public BroadcastHandle> apply(int value) { + int size = striping.getSplitSize(value); + int start = striping.getSplitStart(value); + V[] array = (V[]) new Writable[size]; + for (int i = 0; i < size; i++) { + array[i] = valueSupplier.apply(start + i); + if (elementClass.get() == null) { + elementClass.apply((Class) array[i].getClass()); + } + } + return master.broadcast( + new ArrayWritable<>(elementClass.get(), array)); + } + }); + + final IntRef insideIndex = new IntRef(-1); + final ObjectHolder>> handleHolder = + new ObjectHolder<>(); + + final BroadcastHandle reusableHandle = new BroadcastHandle() { + @Override + public V getBroadcast(WorkerBroadcastUsage worker) { + return handleHolder.get().getBroadcast(worker).get()[insideIndex.value]; + } + }; + + return createBroadcastHandleSupplier( + striping, arrayOfBroadcasts, insideIndex, handleHolder, + reusableHandle); + } + + private static + Int2ObjFunction> getPrimitiveBroadcastHandleSupplier( + final Int2ObjFunction valueSupplier, final PrimitiveTypeOps typeOps, + final BlockMasterApi master, final ObjectStriping striping) { + final ArrayOfHandles>> arrayOfBroadcasts = + new ArrayOfHandles<>( + striping.getSplits(), + new Int2ObjFunction>>() { + @Override + public BroadcastHandle> apply(int value) { + int size = striping.getSplitSize(value); + int start = striping.getSplitStart(value); + BasicArrayList array = typeOps.createArrayList(size); + for (int i = 0; i < size; i++) { + array.add(valueSupplier.apply(start + i)); + } + return master.broadcast(array); + } + }); + + final IntRef insideIndex = new IntRef(-1); + final ObjectHolder>> handleHolder = + new ObjectHolder<>(); + final BroadcastHandle reusableHandle = new BroadcastHandle() { + private final V reusable = typeOps.create(); + @Override + public V getBroadcast(WorkerBroadcastUsage worker) { + handleHolder.get().getBroadcast(worker).getInto( + insideIndex.value, reusable); + return reusable; + } + }; + + return createBroadcastHandleSupplier( + striping, arrayOfBroadcasts, insideIndex, handleHolder, + reusableHandle); + } + + private static + Int2ObjFunction> createBroadcastHandleSupplier( + final ObjectStriping striping, + final ArrayOfHandles> arrayOfBroadcasts, + final IntRef insideIndex, + final ObjectHolder> handleHolder, + final BroadcastHandle reusableHandle) { + final Int2ObjFunction> handleProvider = + new Int2ObjFunction>() { + @Override + public BroadcastHandle apply(int index) { + int broadcastIndex = striping.getSplitIndex(index); + insideIndex.value = striping.getInsideIndex(index); + handleHolder.apply(arrayOfBroadcasts.get(broadcastIndex)); + return reusableHandle; + } + }; + return handleProvider; + } + + /** + * Handles indices calculations when spliting one range into smaller number + * of splits, where indices stay consecutive. + */ + static class ObjectStriping { + private final int splits; + private final int indicesPerObject; + private final int overflowNum; + private final int beforeOverflow; + + public ObjectStriping(int size, int splits) { + this.splits = splits; + this.indicesPerObject = size / splits; + this.overflowNum = size % splits; + this.beforeOverflow = overflowNum * (indicesPerObject + 1); + } + + public int getSplits() { + return splits; + } + + public int getSplitSize(int splitIndex) { + return indicesPerObject + (splitIndex < overflowNum ? 1 : 0); + } + + public int getSplitStart(int splitIndex) { + if (splitIndex < overflowNum) { + return splitIndex * (indicesPerObject + 1); + } else { + return beforeOverflow + (splitIndex - overflowNum) * indicesPerObject; + } + } + + public int getSplitIndex(int objectIndex) { + if (objectIndex < beforeOverflow) { + return objectIndex / (indicesPerObject + 1); + } else { + return (objectIndex - beforeOverflow) / indicesPerObject + overflowNum; + } + } + + public int getInsideIndex(int objectIndex) { + if (objectIndex < beforeOverflow) { + return objectIndex % (indicesPerObject + 1); + } else { + return (objectIndex - beforeOverflow) % indicesPerObject; + } + } + } +} diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/reducers/array/package-info.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/reducers/array/package-info.java new file mode 100644 index 000000000..33f8a24ab --- /dev/null +++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/reducers/array/package-info.java @@ -0,0 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/** + * Reducers for collecting arrays of objects. + */ +package org.apache.giraph.block_app.reducers.array; diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/reducers/collect/CollectPrimitiveReduceOperation.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/reducers/collect/CollectPrimitiveReduceOperation.java new file mode 100644 index 000000000..13dd15350 --- /dev/null +++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/reducers/collect/CollectPrimitiveReduceOperation.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.giraph.block_app.reducers.collect; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; + +import org.apache.giraph.reducers.impl.KryoWrappedReduceOperation; +import org.apache.giraph.types.ops.PrimitiveTypeOps; +import org.apache.giraph.types.ops.TypeOpsUtils; +import org.apache.giraph.types.ops.collections.BasicArrayList; +import org.apache.giraph.types.ops.collections.ResettableIterator; +import org.apache.giraph.utils.WritableUtils; + +/** + * Collect primitive values reduce operation + * + * @param Primitive Writable type, which has its type ops + */ +public class CollectPrimitiveReduceOperation + extends KryoWrappedReduceOperation> { + /** + * Type ops if available, or null + */ + private PrimitiveTypeOps typeOps; + + /** For reflection only */ + public CollectPrimitiveReduceOperation() { + } + + public CollectPrimitiveReduceOperation(PrimitiveTypeOps typeOps) { + this.typeOps = typeOps; + } + + @Override + public BasicArrayList createValue() { + return createList(); + } + + @Override + public void reduce(BasicArrayList reduceInto, S value) { + reduceInto.add(value); + } + + @Override + public void reduceMerge(BasicArrayList reduceInto, + BasicArrayList toReduce) { + ResettableIterator iterator = toReduce.fastIterator(); + while (iterator.hasNext()) { + reduceInto.add(iterator.next()); + } + } + + public BasicArrayList createList() { + return typeOps.createArrayList(); + } + + @Override + public void write(DataOutput out) throws IOException { + WritableUtils.writeClass(typeOps.getTypeClass(), out); + } + + @Override + public void readFields(DataInput in) throws IOException { + typeOps = TypeOpsUtils.getPrimitiveTypeOps( + WritableUtils.readClass(in)); + } +} diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/reducers/collect/CollectReduceOperation.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/reducers/collect/CollectReduceOperation.java new file mode 100644 index 000000000..304ac4724 --- /dev/null +++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/reducers/collect/CollectReduceOperation.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.giraph.block_app.reducers.collect; + +import java.util.ArrayList; +import java.util.List; + +import org.apache.giraph.reducers.impl.KryoWrappedReduceOperation; + +/** + * Collect values reduce operation + * + * @param Type of values to collect + */ +public class CollectReduceOperation + extends KryoWrappedReduceOperation> { + @Override + public List createValue() { + return createList(); + } + + @Override + public void reduce(List reduceInto, S value) { + reduceInto.add(value); + } + + @Override + public void reduceMerge(List reduceInto, List toReduce) { + reduceInto.addAll(toReduce); + } + + public List createList() { + return new ArrayList<>(); + } +} diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/reducers/collect/CollectShardedPrimitiveReducerHandle.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/reducers/collect/CollectShardedPrimitiveReducerHandle.java new file mode 100644 index 000000000..b29b297d6 --- /dev/null +++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/reducers/collect/CollectShardedPrimitiveReducerHandle.java @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.giraph.block_app.reducers.collect; + +import org.apache.giraph.block_app.framework.api.CreateReducersApi; +import org.apache.giraph.block_app.framework.piece.global_comm.BroadcastHandle; +import org.apache.giraph.block_app.framework.piece.global_comm.array.BroadcastArrayHandle; +import org.apache.giraph.master.MasterGlobalCommUsage; +import org.apache.giraph.reducers.ReduceOperation; +import org.apache.giraph.types.ops.PrimitiveTypeOps; +import org.apache.giraph.types.ops.TypeOpsUtils; +import org.apache.giraph.types.ops.collections.BasicArrayList; +import org.apache.giraph.worker.WorkerBroadcastUsage; +import org.apache.giraph.writable.kryo.KryoWritableWrapper; + +/** + * ShardedReducerHandle where we keep a list of reduced values, + * when primitives are used + * + * @param Single value type + */ +public class CollectShardedPrimitiveReducerHandle + extends ShardedReducerHandle> { + /** + * Type ops if available, or null + */ + private final PrimitiveTypeOps typeOps; + + public CollectShardedPrimitiveReducerHandle(final CreateReducersApi reduceApi, + Class valueClass) { + typeOps = TypeOpsUtils.getPrimitiveTypeOps(valueClass); + register(reduceApi); + } + + @Override + public ReduceOperation>> + createReduceOperation() { + return new CollectPrimitiveReduceOperation<>(typeOps); + } + + @Override + public BasicArrayList createReduceResult(MasterGlobalCommUsage master) { + int size = 0; + for (int i = 0; i < REDUCER_COUNT; i++) { + size += reducers.get(i).getReducedValue(master).get().size(); + } + return createList(size); + } + + public BasicArrayList createList(int size) { + return typeOps.createArrayList(size); + } + + @Override + public BroadcastHandle> createBroadcastHandle( + BroadcastArrayHandle>> broadcasts) { + return new CollectShardedPrimitiveBroadcastHandle(broadcasts); + } + + /** + * Broadcast handle for CollectShardedPrimitiveReducerHandle + */ + public class CollectShardedPrimitiveBroadcastHandle + extends ShardedBroadcastHandle { + public CollectShardedPrimitiveBroadcastHandle( + BroadcastArrayHandle>> + broadcasts) { + super(broadcasts); + } + + @Override + public BasicArrayList createBroadcastResult( + WorkerBroadcastUsage worker) { + int size = 0; + for (int i = 0; i < REDUCER_COUNT; i++) { + size += broadcasts.get(i).getBroadcast(worker).get().size(); + } + return createList(size); + } + } +} diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/reducers/collect/CollectShardedReducerHandle.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/reducers/collect/CollectShardedReducerHandle.java new file mode 100644 index 000000000..5132ecf38 --- /dev/null +++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/reducers/collect/CollectShardedReducerHandle.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.giraph.block_app.reducers.collect; + +import java.util.ArrayList; +import java.util.List; + +import org.apache.giraph.block_app.framework.api.CreateReducersApi; +import org.apache.giraph.block_app.framework.piece.global_comm.BroadcastHandle; +import org.apache.giraph.block_app.framework.piece.global_comm.array.BroadcastArrayHandle; +import org.apache.giraph.master.MasterGlobalCommUsage; +import org.apache.giraph.reducers.ReduceOperation; +import org.apache.giraph.worker.WorkerBroadcastUsage; +import org.apache.giraph.writable.kryo.KryoWritableWrapper; + +/** + * ShardedReducerHandle where we keep a list of reduced values + * + * @param Single value type + */ +public class CollectShardedReducerHandle + extends ShardedReducerHandle> { + public CollectShardedReducerHandle(CreateReducersApi reduceApi) { + register(reduceApi); + } + + @Override + public ReduceOperation>> + createReduceOperation() { + return new CollectReduceOperation<>(); + } + + @Override + public List createReduceResult(MasterGlobalCommUsage master) { + int size = 0; + for (int i = 0; i < REDUCER_COUNT; i++) { + size += reducers.get(i).getReducedValue(master).get().size(); + } + return createList(size); + } + + public List createList(int size) { + return new ArrayList(size); + } + + @Override + public BroadcastHandle> createBroadcastHandle( + BroadcastArrayHandle>> broadcasts) { + return new CollectShardedBroadcastHandle(broadcasts); + } + + /** + * BroadcastHandle for CollectShardedReducerHandle + */ + public class CollectShardedBroadcastHandle extends ShardedBroadcastHandle { + public CollectShardedBroadcastHandle( + BroadcastArrayHandle>> broadcasts) { + super(broadcasts); + } + + @Override + public List createBroadcastResult(WorkerBroadcastUsage worker) { + int size = 0; + for (int i = 0; i < REDUCER_COUNT; i++) { + size += broadcasts.get(i).getBroadcast(worker).get().size(); + } + return createList(size); + } + } +} diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/reducers/collect/CollectShardedTuplesOfPrimitivesReducerHandle.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/reducers/collect/CollectShardedTuplesOfPrimitivesReducerHandle.java new file mode 100644 index 000000000..3222c1774 --- /dev/null +++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/reducers/collect/CollectShardedTuplesOfPrimitivesReducerHandle.java @@ -0,0 +1,158 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.giraph.block_app.reducers.collect; + +import java.util.ArrayList; +import java.util.List; + +import org.apache.giraph.block_app.framework.api.BlockMasterApi; +import org.apache.giraph.block_app.framework.api.CreateReducersApi; +import org.apache.giraph.block_app.framework.piece.global_comm.BroadcastHandle; +import org.apache.giraph.block_app.framework.piece.global_comm.array.BroadcastArrayHandle; +import org.apache.giraph.master.MasterGlobalCommUsage; +import org.apache.giraph.reducers.ReduceOperation; +import org.apache.giraph.types.ops.PrimitiveTypeOps; +import org.apache.giraph.types.ops.TypeOpsUtils; +import org.apache.giraph.types.ops.collections.BasicArrayList; +import org.apache.giraph.worker.WorkerBroadcastUsage; +import org.apache.giraph.writable.kryo.KryoWritableWrapper; + +/** + * ShardedReducerHandle where we keep a list of reduced values, + * and values consist of multiple primitives, so we keep one primitive + * list for each + */ +@SuppressWarnings("unchecked") +public class CollectShardedTuplesOfPrimitivesReducerHandle +extends ShardedReducerHandle, List> { + /** + * Type ops if available, or null + */ + private final List typeOpsList; + + public CollectShardedTuplesOfPrimitivesReducerHandle( + final CreateReducersApi reduceApi, Class... valueClasses) { + typeOpsList = new ArrayList<>(); + for (Class valueClass : valueClasses) { + typeOpsList.add(TypeOpsUtils.getPrimitiveTypeOps(valueClass)); + } + register(reduceApi); + } + + public List createSingleValue() { + List ret = new ArrayList<>(); + for (PrimitiveTypeOps typeOps : typeOpsList) { + ret.add(typeOps.create()); + } + return ret; + } + + @Override + public ReduceOperation, + KryoWritableWrapper>> createReduceOperation() { + return new CollectTuplesOfPrimitivesReduceOperation(typeOpsList); + } + + @Override + public List createReduceResult( + MasterGlobalCommUsage master) { + int size = 0; + for (int i = 0; i < REDUCER_COUNT; i++) { + size += reducers.get(i).getReducedValue(master).get().get(0).size(); + } + return createLists(size); + } + + public List createLists(int size) { + List ret = new ArrayList<>(); + for (PrimitiveTypeOps typeOps : typeOpsList) { + ret.add(typeOps.createArrayList(size)); + } + return ret; + } + + @Override + public BroadcastHandle> createBroadcastHandle( + BroadcastArrayHandle>> + broadcasts) { + return new CollectShardedTuplesOfPrimitivesBroadcastHandle(broadcasts); + } + + /** + * BroadcastHandle for CollectShardedTuplesOfPrimitivesReducerHandle + */ + public class CollectShardedTuplesOfPrimitivesBroadcastHandle + extends ShardedBroadcastHandle { + public CollectShardedTuplesOfPrimitivesBroadcastHandle( + BroadcastArrayHandle>> + broadcasts) { + super(broadcasts); + } + + @Override + public List createBroadcastResult( + WorkerBroadcastUsage worker) { + int size = 0; + for (int i = 0; i < REDUCER_COUNT; i++) { + size += broadcasts.get(i).getBroadcast(worker).get().size(); + } + return createLists(size); + } + } + + /** + * Reduce broadcast wrapper + */ + public static class CollectShardedTuplesOfPrimitivesReduceBroadcast { + private CollectShardedTuplesOfPrimitivesReducerHandle reducerHandle; + private BroadcastHandle> broadcastHandle; + + /** Set reducer handle to just registered handle */ + public void registeredReducer(CreateReducersApi reduceApi, + Class... valueClasses) { + this.reducerHandle = new CollectShardedTuplesOfPrimitivesReducerHandle( + reduceApi, valueClasses); + } + + public List createSingleValue() { + return reducerHandle.createSingleValue(); + } + + /** Reduce single value */ + public void reduce(List valueToReduce) { + reducerHandle.reduce(valueToReduce); + } + + /** Get reduced value */ + public List getReducedValue(MasterGlobalCommUsage master) { + return reducerHandle.getReducedValue(master); + } + + /** + * Broadcast reduced value from master + */ + public void broadcastValue(BlockMasterApi master) { + broadcastHandle = reducerHandle.broadcastValue(master); + } + + /** Get broadcasted value */ + public List getBroadcast(WorkerBroadcastUsage worker) { + return broadcastHandle.getBroadcast(worker); + } + } +} diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/reducers/collect/CollectTuplesOfPrimitivesReduceOperation.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/reducers/collect/CollectTuplesOfPrimitivesReduceOperation.java new file mode 100644 index 000000000..afaba7aec --- /dev/null +++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/reducers/collect/CollectTuplesOfPrimitivesReduceOperation.java @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.giraph.block_app.reducers.collect; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import org.apache.giraph.reducers.impl.KryoWrappedReduceOperation; +import org.apache.giraph.types.ops.PrimitiveTypeOps; +import org.apache.giraph.types.ops.TypeOpsUtils; +import org.apache.giraph.types.ops.collections.BasicArrayList; +import org.apache.giraph.types.ops.collections.ResettableIterator; +import org.apache.giraph.utils.WritableUtils; + +/** + * Collect tuples of primitive values reduce operation + */ +public class CollectTuplesOfPrimitivesReduceOperation + extends KryoWrappedReduceOperation, List> { + /** + * Type ops if available, or null + */ + private List typeOpsList; + + /** For reflection only */ + public CollectTuplesOfPrimitivesReduceOperation() { + } + + public CollectTuplesOfPrimitivesReduceOperation( + List typeOpsList) { + this.typeOpsList = typeOpsList; + } + + @Override + public List createValue() { + List ret = new ArrayList<>(typeOpsList.size()); + for (PrimitiveTypeOps typeOps : typeOpsList) { + ret.add(typeOps.createArrayList()); + } + return ret; + } + + @Override + public void reduce(List reduceInto, List value) { + for (int i = 0; i < reduceInto.size(); i++) { + reduceInto.get(i).add(value.get(i)); + } + } + + @Override + public void reduceMerge(List reduceInto, + List toReduce) { + for (int i = 0; i < reduceInto.size(); i++) { + ResettableIterator iterator = toReduce.get(i).fastIterator(); + while (iterator.hasNext()) { + reduceInto.get(i).add(iterator.next()); + } + } + } + + @Override + public void write(DataOutput out) throws IOException { + out.writeInt(typeOpsList.size()); + for (PrimitiveTypeOps typeOps : typeOpsList) { + WritableUtils.writeClass(typeOps.getTypeClass(), out); + } + } + + @Override + public void readFields(DataInput in) throws IOException { + int size = in.readInt(); + typeOpsList = new ArrayList<>(size); + for (int i = 0; i < size; i++) { + typeOpsList.add(TypeOpsUtils.getPrimitiveTypeOps( + WritableUtils.readClass(in))); + } + } +} diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/reducers/collect/ShardedReducerHandle.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/reducers/collect/ShardedReducerHandle.java new file mode 100644 index 000000000..0c1721603 --- /dev/null +++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/reducers/collect/ShardedReducerHandle.java @@ -0,0 +1,123 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.giraph.block_app.reducers.collect; + +import org.apache.giraph.block_app.framework.api.BlockMasterApi; +import org.apache.giraph.block_app.framework.api.CreateReducersApi; +import org.apache.giraph.block_app.framework.piece.global_comm.BroadcastHandle; +import org.apache.giraph.block_app.framework.piece.global_comm.ReducerHandle; +import org.apache.giraph.block_app.framework.piece.global_comm.array.BroadcastArrayHandle; +import org.apache.giraph.block_app.reducers.array.ArrayOfHandles; +import org.apache.giraph.function.Supplier; +import org.apache.giraph.master.MasterGlobalCommUsage; +import org.apache.giraph.reducers.ReduceOperation; +import org.apache.giraph.worker.WorkerBroadcastUsage; +import org.apache.giraph.writable.kryo.KryoWritableWrapper; +import org.apache.giraph.writable.kryo.TransientRandom; + +/** + * Reducing values into a list of reducers, randomly, + * and getting the results of all reducers together + * + * @param Single value type + * @param Reduced value type + */ +public abstract class ShardedReducerHandle + implements ReducerHandle { + // Use a prime number for number of reducers, large enough to make sure + // request sizes are within expected size (0.5MB) + protected static final int REDUCER_COUNT = 39989; + + protected final TransientRandom random = new TransientRandom(); + + protected ArrayOfHandles.ArrayOfReducers> reducers; + + public final void register(final CreateReducersApi reduceApi) { + reducers = new ArrayOfHandles.ArrayOfReducers<>(REDUCER_COUNT, + new Supplier>>() { + @Override + public ReducerHandle> get() { + return reduceApi.createLocalReducer(createReduceOperation()); + } + }); + } + + @Override + public final void reduce(S value) { + reducers.get(random.nextInt(REDUCER_COUNT)).reduce(value); + } + + @Override + public final R getReducedValue(MasterGlobalCommUsage master) { + KryoWritableWrapper ret = new KryoWritableWrapper<>( + createReduceResult(master)); + ReduceOperation> reduceOperation = + createReduceOperation(); + for (int i = 0; i < REDUCER_COUNT; i++) { + reduceOperation.reduceMerge(ret, + reducers.get(i).getReducedValue(master)); + } + return ret.get(); + } + + public abstract ReduceOperation> + createReduceOperation(); + + public R createReduceResult(MasterGlobalCommUsage master) { + return createReduceOperation().createInitialValue().get(); + } + + public BroadcastHandle createBroadcastHandle( + BroadcastArrayHandle> broadcasts) { + return new ShardedBroadcastHandle(broadcasts); + } + + @Override + public final BroadcastHandle broadcastValue(BlockMasterApi masterApi) { + return createBroadcastHandle(reducers.broadcastValue(masterApi)); + } + + /** + * Broadcast for ShardedReducerHandle + */ + public class ShardedBroadcastHandle implements BroadcastHandle { + protected final BroadcastArrayHandle> broadcasts; + + public ShardedBroadcastHandle( + BroadcastArrayHandle> broadcasts) { + this.broadcasts = broadcasts; + } + + public R createBroadcastResult(WorkerBroadcastUsage worker) { + return createReduceOperation().createInitialValue().get(); + } + + @Override + public final R getBroadcast(WorkerBroadcastUsage worker) { + KryoWritableWrapper ret = new KryoWritableWrapper<>( + createBroadcastResult(worker)); + ReduceOperation> reduceOperation = + createReduceOperation(); + for (int i = 0; i < REDUCER_COUNT; i++) { + reduceOperation.reduceMerge(ret, + broadcasts.get(i).getBroadcast(worker)); + } + return ret.get(); + } + } +} diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/reducers/collect/package-info.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/reducers/collect/package-info.java new file mode 100644 index 000000000..dc640f73c --- /dev/null +++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/reducers/collect/package-info.java @@ -0,0 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/** + * Reducers for distributed collection of objects. + */ +package org.apache.giraph.block_app.reducers.collect; diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/reducers/map/BasicMapReduce.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/reducers/map/BasicMapReduce.java new file mode 100644 index 000000000..0e1e1131d --- /dev/null +++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/reducers/map/BasicMapReduce.java @@ -0,0 +1,276 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.giraph.block_app.reducers.map; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.util.Iterator; + +import org.apache.commons.lang3.tuple.MutablePair; +import org.apache.commons.lang3.tuple.Pair; +import org.apache.giraph.block_app.framework.api.BlockMasterApi; +import org.apache.giraph.block_app.framework.api.CreateReducersApi; +import org.apache.giraph.block_app.framework.api.CreateReducersApi.CreateReducerFunctionApi; +import org.apache.giraph.block_app.framework.piece.global_comm.BroadcastHandle; +import org.apache.giraph.block_app.framework.piece.global_comm.ReducerHandle; +import org.apache.giraph.block_app.framework.piece.global_comm.map.BroadcastMapHandle; +import org.apache.giraph.block_app.framework.piece.global_comm.map.ReducerMapHandle; +import org.apache.giraph.master.MasterGlobalCommUsage; +import org.apache.giraph.reducers.ReduceOperation; +import org.apache.giraph.types.ops.PrimitiveIdTypeOps; +import org.apache.giraph.types.ops.PrimitiveTypeOps; +import org.apache.giraph.types.ops.TypeOpsUtils; +import org.apache.giraph.types.ops.collections.Basic2ObjectMap; +import org.apache.giraph.types.ops.collections.WritableWriter; +import org.apache.giraph.utils.WritableUtils; +import org.apache.giraph.worker.WorkerBroadcastUsage; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableComparable; + + +/** + * Efficient generic primitive map of values reduce operation. + * (it is BasicMap Reduce, not to be confused with MapReduce) + * + * @param Key type + * @param Single value type + * @param Reduced value type + */ +public class BasicMapReduce + implements ReduceOperation, Basic2ObjectMap> { + private PrimitiveIdTypeOps keyTypeOps; + private PrimitiveTypeOps typeOps; + private ReduceOperation elementReduceOp; + private WritableWriter writer; + + public BasicMapReduce() { + } + + /** + * Create ReduceOperation that reduces BasicMaps by reducing individual + * elements corresponding to the same key. + * + * @param keyTypeOps TypeOps of keys + * @param typeOps TypeOps of individual elements + * @param elementReduceOp ReduceOperation for individual elements + */ + public BasicMapReduce( + PrimitiveIdTypeOps keyTypeOps, PrimitiveTypeOps typeOps, + ReduceOperation elementReduceOp) { + this.keyTypeOps = keyTypeOps; + this.typeOps = typeOps; + this.elementReduceOp = elementReduceOp; + init(); + } + + /** + * Registers one new local reducer, that will reduce BasicMap, + * by reducing individual elements corresponding to the same key + * using {@code elementReduceOp}. + * + * This function will return ReducerMapHandle, by which + * individual elements can be manipulated separately. + * + * @param keyTypeOps TypeOps of keys + * @param typeOps TypeOps of individual elements + * @param elementReduceOp ReduceOperation for individual elements + * @param reduceApi API for creating reducers + * @return Created ReducerMapHandle + */ + public static + ReducerMapHandle createLocalMapHandles( + PrimitiveIdTypeOps keyTypeOps, PrimitiveTypeOps typeOps, + ReduceOperation elementReduceOp, + final CreateReducersApi reduceApi) { + return createMapHandles( + keyTypeOps, typeOps, elementReduceOp, + new CreateReducerFunctionApi() { + @Override + public ReducerHandle createReducer( + ReduceOperation reduceOp) { + return reduceApi.createLocalReducer(reduceOp); + } + }); + } + + /** + * Registers one new reducer, that will reduce BasicMap, + * by reducing individual elements corresponding to the same key + * using {@code elementReduceOp}. + * + * This function will return ReducerMapHandle, by which + * individual elements can be manipulated separately. + * + * @param keyTypeOps TypeOps of keys + * @param typeOps TypeOps of individual elements + * @param elementReduceOp ReduceOperation for individual elements + * @param createFunction Function for creating a reducer + * @return Created ReducerMapHandle + */ + public static + ReducerMapHandle createMapHandles( + final PrimitiveIdTypeOps keyTypeOps, final PrimitiveTypeOps typeOps, + ReduceOperation elementReduceOp, + CreateReducerFunctionApi createFunction) { + final ReducerHandle, Basic2ObjectMap> reduceHandle = + createFunction.createReducer( + new BasicMapReduce<>(keyTypeOps, typeOps, elementReduceOp)); + final K curIndex = keyTypeOps.create(); + final R reusableValue = typeOps.create(); + final R initialValue = elementReduceOp.createInitialValue(); + final MutablePair reusablePair = MutablePair.of(null, null); + final ReducerHandle elementReduceHandle = new ReducerHandle() { + @Override + public R getReducedValue(MasterGlobalCommUsage master) { + Basic2ObjectMap result = reduceHandle.getReducedValue(master); + R value = result.get(curIndex); + if (value == null) { + typeOps.set(reusableValue, initialValue); + } else { + typeOps.set(reusableValue, value); + } + return reusableValue; + } + + @Override + public void reduce(S valueToReduce) { + reusablePair.setLeft(curIndex); + reusablePair.setRight(valueToReduce); + reduceHandle.reduce(reusablePair); + } + + @Override + public BroadcastHandle broadcastValue(BlockMasterApi master) { + throw new UnsupportedOperationException(); + } + }; + + return new ReducerMapHandle() { + @Override + public ReducerHandle get(K key) { + keyTypeOps.set(curIndex, key); + return elementReduceHandle; + } + + @Override + public int getReducedSize(BlockMasterApi master) { + return reduceHandle.getReducedValue(master).size(); + } + + @Override + public BroadcastMapHandle broadcastValue(BlockMasterApi master) { + final BroadcastHandle> broadcastHandle = + reduceHandle.broadcastValue(master); + final K curIndex = keyTypeOps.create(); + final R reusableValue = typeOps.create(); + final BroadcastHandle + elementBroadcastHandle = new BroadcastHandle() { + @Override + public R getBroadcast(WorkerBroadcastUsage worker) { + Basic2ObjectMap result = broadcastHandle.getBroadcast(worker); + R value = result.get(curIndex); + if (value == null) { + typeOps.set(reusableValue, initialValue); + } else { + typeOps.set(reusableValue, value); + } + return reusableValue; + } + }; + return new BroadcastMapHandle() { + @Override + public BroadcastHandle get(K key) { + keyTypeOps.set(curIndex, key); + return elementBroadcastHandle; + } + + @Override + public int getBroadcastedSize(WorkerBroadcastUsage worker) { + return broadcastHandle.getBroadcast(worker).size(); + } + }; + } + }; + } + + private void init() { + writer = new WritableWriter() { + @Override + public void write(DataOutput out, R value) throws IOException { + value.write(out); + } + + @Override + public R readFields(DataInput in) throws IOException { + R result = typeOps.create(); + result.readFields(in); + return result; + } + }; + } + + @Override + public Basic2ObjectMap createInitialValue() { + return keyTypeOps.create2ObjectOpenHashMap(writer); + } + + @Override + public Basic2ObjectMap reduce( + Basic2ObjectMap curValue, Pair valueToReduce) { + R result = curValue.get(valueToReduce.getLeft()); + if (result == null) { + result = typeOps.create(); + } + result = elementReduceOp.reduce(result, valueToReduce.getRight()); + curValue.put(valueToReduce.getLeft(), result); + return curValue; + } + + @Override + public Basic2ObjectMap reduceMerge( + Basic2ObjectMap curValue, Basic2ObjectMap valueToReduce) { + for (Iterator iter = valueToReduce.fastKeyIterator(); iter.hasNext();) { + K key = iter.next(); + + R result = curValue.get(key); + if (result == null) { + result = typeOps.create(); + } + result = elementReduceOp.reduceMerge(result, valueToReduce.get(key)); + curValue.put(key, result); + } + return curValue; + } + + @Override + public void write(DataOutput out) throws IOException { + TypeOpsUtils.writeTypeOps(keyTypeOps, out); + TypeOpsUtils.writeTypeOps(typeOps, out); + WritableUtils.writeWritableObject(elementReduceOp, out); + } + + @Override + public void readFields(DataInput in) throws IOException { + keyTypeOps = TypeOpsUtils.readTypeOps(in); + typeOps = TypeOpsUtils.readTypeOps(in); + elementReduceOp = WritableUtils.readWritableObject(in, null); + init(); + } +} diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/reducers/map/package-info.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/reducers/map/package-info.java new file mode 100644 index 000000000..295ef8c6b --- /dev/null +++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/reducers/map/package-info.java @@ -0,0 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/** + * Reducers for collecting map of objects. + */ +package org.apache.giraph.block_app.reducers.map; diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/reducers/package-info.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/reducers/package-info.java new file mode 100644 index 000000000..145f3dc28 --- /dev/null +++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/reducers/package-info.java @@ -0,0 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/** + * Common reducer utilities for Block Applications + */ +package org.apache.giraph.block_app.reducers; diff --git a/giraph-block-app/src/main/java/org/apache/giraph/function/TripleFunction.java b/giraph-block-app/src/main/java/org/apache/giraph/function/TripleFunction.java new file mode 100644 index 000000000..b7609ca76 --- /dev/null +++ b/giraph-block-app/src/main/java/org/apache/giraph/function/TripleFunction.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.giraph.function; + +import java.io.Serializable; + +/** + * Function: + * (F1, F2, F3) -> T + * + * @param First argument type + * @param Second argument type + * @param Third argument type + * @param Result type + */ +public interface TripleFunction extends Serializable { + + /** + * Returns the result of applying this function to given + * {@code input1}, {@code input2} and {@code input3}. + * + * The returned object may or may not be a new instance, + * depending on the implementation. + */ + T apply(F1 input1, F2 input2, F3 input3); +} diff --git a/giraph-block-app/src/main/java/org/apache/giraph/function/primitive/Obj2DoubleFunction.java b/giraph-block-app/src/main/java/org/apache/giraph/function/primitive/Obj2DoubleFunction.java new file mode 100644 index 000000000..ad1ca8eff --- /dev/null +++ b/giraph-block-app/src/main/java/org/apache/giraph/function/primitive/Obj2DoubleFunction.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.giraph.function.primitive; + +import java.io.Serializable; + +/** + * Primitive specialization of Function: + * (F) -> double + * + * @param Argument type + */ +public interface Obj2DoubleFunction extends Serializable { + /** + * Returns the result of applying this function to given {@code input}. + */ + double apply(T value); +} diff --git a/giraph-block-app/src/main/java/org/apache/giraph/function/primitive/Obj2FloatFunction.java b/giraph-block-app/src/main/java/org/apache/giraph/function/primitive/Obj2FloatFunction.java new file mode 100644 index 000000000..b1a653f3b --- /dev/null +++ b/giraph-block-app/src/main/java/org/apache/giraph/function/primitive/Obj2FloatFunction.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.giraph.function.primitive; + +import java.io.Serializable; + +/** + * Primitive specialization of Function: + * (F) -> float + * + * @param Argument type + */ +public interface Obj2FloatFunction extends Serializable { + /** + * Returns the result of applying this function to given {@code input}. + */ + float apply(T input); +} diff --git a/giraph-block-app/src/main/java/org/apache/giraph/function/primitive/Obj2LongFunction.java b/giraph-block-app/src/main/java/org/apache/giraph/function/primitive/Obj2LongFunction.java new file mode 100644 index 000000000..5300250a4 --- /dev/null +++ b/giraph-block-app/src/main/java/org/apache/giraph/function/primitive/Obj2LongFunction.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.giraph.function.primitive; + +import java.io.Serializable; + +/** + * Primitive specialization of Function: + * (F) -> long + * + * @param Argument type + */ +public interface Obj2LongFunction extends Serializable { + /** + * Returns the result of applying this function to given {@code input}. + */ + long apply(T input); +} diff --git a/giraph-block-app/src/test/java/org/apache/giraph/block_app/reducers/array/ObjectStripingTest.java b/giraph-block-app/src/test/java/org/apache/giraph/block_app/reducers/array/ObjectStripingTest.java new file mode 100644 index 000000000..5e4eb1198 --- /dev/null +++ b/giraph-block-app/src/test/java/org/apache/giraph/block_app/reducers/array/ObjectStripingTest.java @@ -0,0 +1,58 @@ +package org.apache.giraph.block_app.reducers.array; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; + +import org.apache.giraph.block_app.reducers.array.HugeArrayUtils.ObjectStriping; +import org.junit.Test; + +public class ObjectStripingTest { + + private void testStriping(int size, int splits) { + ObjectStriping striping = new ObjectStriping(size, splits); + + int numPerSplit = size / splits; + + int prevSplitIndex = 0; + int prevInsideIndex = -1; + + assertEquals(0, striping.getSplitStart(0)); + + for (int i = 0; i < size; i++) { + int splitIndex = striping.getSplitIndex(i); + int insideIndex = striping.getInsideIndex(i); + + + if (prevInsideIndex + 1 == striping.getSplitSize(prevSplitIndex)) { + assertEquals(i, striping.getSplitStart(splitIndex)); + assertEquals(splitIndex, prevSplitIndex + 1); + assertEquals(insideIndex, 0); + } else { + assertEquals(splitIndex, prevSplitIndex); + assertEquals(insideIndex, prevInsideIndex + 1); + } + + int splitSize = striping.getSplitSize(splitIndex); + if (splitSize != numPerSplit && splitSize != numPerSplit + 1) { + fail(splitSize + " " + numPerSplit); + } + prevSplitIndex = splitIndex; + prevInsideIndex = insideIndex; + } + + assertEquals(prevSplitIndex + 1, splits); + assertEquals(prevInsideIndex + 1, striping.getSplitSize(prevSplitIndex)); + } + + @Test + public void test() { + testStriping(5, 5); + testStriping(6, 5); + testStriping(7, 5); + testStriping(9, 5); + testStriping(10, 5); + testStriping(100, 5); + testStriping(101, 5); + testStriping(104, 5); + } +} diff --git a/giraph-core/pom.xml b/giraph-core/pom.xml index 4719a5d4a..e98741e26 100644 --- a/giraph-core/pom.xml +++ b/giraph-core/pom.xml @@ -502,6 +502,10 @@ under the License. commons-io commons-io + + org.apache.commons + commons-lang3 + log4j log4j diff --git a/giraph-core/src/main/java/org/apache/giraph/reducers/impl/KryoWrappedReduceOperation.java b/giraph-core/src/main/java/org/apache/giraph/reducers/impl/KryoWrappedReduceOperation.java new file mode 100644 index 000000000..7dd9bc044 --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/reducers/impl/KryoWrappedReduceOperation.java @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.giraph.reducers.impl; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; + +import org.apache.giraph.reducers.ReduceOperation; +import org.apache.giraph.writable.kryo.KryoWritableWrapper; + +/** + * Reduce operation which wraps reduced value in KryoWritableWrapper, + * so we don't need to worry about it being writable + * + * @param Single value type + * @param Reduced value type + */ +public abstract class KryoWrappedReduceOperation + implements ReduceOperation> { + /** + * Look at ReduceOperation.reduce. + * + * @param reduceInto Partial value into which to reduce and store the result + * @param valueToReduce Single value to be reduced + */ + public abstract void reduce(R reduceInto, S valueToReduce); + + /** + * Look at ReduceOperation.reduceMerge. + * + * @param reduceInto Partial value into which to reduce and store the result + * @param valueToReduce Partial value to be reduced + */ + public abstract void reduceMerge(R reduceInto, R valueToReduce); + + /** + * Look at ReduceOperation.createValue. + * + * @return Neutral value + */ + public abstract R createValue(); + + @Override + public final KryoWritableWrapper createInitialValue() { + return new KryoWritableWrapper<>(createValue()); + } + + @Override + public final KryoWritableWrapper reduce( + KryoWritableWrapper wrapper, S value) { + reduce(wrapper.get(), value); + return wrapper; + } + + @Override + public final KryoWritableWrapper reduceMerge( + KryoWritableWrapper wrapper, + KryoWritableWrapper wrapperToReduce) { + reduceMerge(wrapper.get(), wrapperToReduce.get()); + return wrapper; + } + + @Override + public void write(DataOutput dataOutput) throws IOException { + } + + @Override + public void readFields(DataInput dataInput) throws IOException { + } +} diff --git a/giraph-core/src/main/java/org/apache/giraph/writable/tuple/DoubleDoubleWritable.java b/giraph-core/src/main/java/org/apache/giraph/writable/tuple/DoubleDoubleWritable.java new file mode 100644 index 000000000..04336429a --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/writable/tuple/DoubleDoubleWritable.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.giraph.writable.tuple; + +import org.apache.hadoop.io.DoubleWritable; + +/** Double-double Pair Writable */ +public class DoubleDoubleWritable + extends PairWritable { + /** Constructor */ + public DoubleDoubleWritable() { + super(new DoubleWritable(), new DoubleWritable()); + } + + /** + * Constructor + * + * @param left the left value + * @param right the right value + */ + public DoubleDoubleWritable(double left, double right) { + super(new DoubleWritable(left), new DoubleWritable(right)); + } +} diff --git a/giraph-core/src/main/java/org/apache/giraph/writable/tuple/IntDoubleWritable.java b/giraph-core/src/main/java/org/apache/giraph/writable/tuple/IntDoubleWritable.java new file mode 100644 index 000000000..36ae908e7 --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/writable/tuple/IntDoubleWritable.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.giraph.writable.tuple; + +import org.apache.hadoop.io.DoubleWritable; +import org.apache.hadoop.io.IntWritable; + +/** Int-Double Pair Writable */ +public class IntDoubleWritable + extends PairWritable { + /** Constructor */ + public IntDoubleWritable() { + super(new IntWritable(), new DoubleWritable()); + } + + /** + * Constructor + * + * @param left the left value + * @param right the right value + */ + public IntDoubleWritable(int left, double right) { + super(new IntWritable(left), new DoubleWritable(right)); + } +} diff --git a/giraph-core/src/main/java/org/apache/giraph/writable/tuple/IntIntWritable.java b/giraph-core/src/main/java/org/apache/giraph/writable/tuple/IntIntWritable.java new file mode 100644 index 000000000..c41812b7b --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/writable/tuple/IntIntWritable.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.giraph.writable.tuple; + +import org.apache.hadoop.io.IntWritable; + +/** Int-Int Pair Writable */ +public class IntIntWritable + extends PairWritable { + /** Constructor */ + public IntIntWritable() { + super(new IntWritable(), new IntWritable()); + } + + /** + * Constructor + * + * @param left the left value + * @param right the right value + */ + public IntIntWritable(int left, int right) { + super(new IntWritable(left), new IntWritable(right)); + } +} diff --git a/giraph-core/src/main/java/org/apache/giraph/writable/tuple/IntLongWritable.java b/giraph-core/src/main/java/org/apache/giraph/writable/tuple/IntLongWritable.java new file mode 100644 index 000000000..2d8d1df39 --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/writable/tuple/IntLongWritable.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.giraph.writable.tuple; + +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.LongWritable; + +/** Int-Long Pair Writable */ +public class IntLongWritable + extends PairWritable { + /** Constructor */ + public IntLongWritable() { + super(new IntWritable(), new LongWritable()); + } + + /** + * Constructor + * + * @param left the left value + * @param right the right value + */ + public IntLongWritable(int left, long right) { + super(new IntWritable(left), new LongWritable(right)); + } +} diff --git a/giraph-core/src/main/java/org/apache/giraph/writable/tuple/LongDoubleWritable.java b/giraph-core/src/main/java/org/apache/giraph/writable/tuple/LongDoubleWritable.java new file mode 100644 index 000000000..2a86b1c0c --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/writable/tuple/LongDoubleWritable.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.giraph.writable.tuple; + +import org.apache.hadoop.io.DoubleWritable; +import org.apache.hadoop.io.LongWritable; + +/** Long-Double Pair Writable */ +public class LongDoubleWritable + extends PairWritable { + /** Constructor */ + public LongDoubleWritable() { + super(new LongWritable(), new DoubleWritable()); + } + + /** + * Constructor + * + * @param left the left value + * @param right the right value + */ + public LongDoubleWritable(long left, double right) { + super(new LongWritable(left), new DoubleWritable(right)); + } +} diff --git a/giraph-core/src/main/java/org/apache/giraph/writable/tuple/LongIntWritable.java b/giraph-core/src/main/java/org/apache/giraph/writable/tuple/LongIntWritable.java new file mode 100644 index 000000000..e5b923cce --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/writable/tuple/LongIntWritable.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.giraph.writable.tuple; + +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.LongWritable; + +/** Long-Int Pair Writable */ +public class LongIntWritable + extends PairWritable { + /** Constructor */ + public LongIntWritable() { + super(new LongWritable(), new IntWritable()); + } + + /** + * Constructor + * + * @param left the left value + * @param right the right value + */ + public LongIntWritable(long left, int right) { + super(new LongWritable(left), new IntWritable(right)); + } +} diff --git a/giraph-core/src/main/java/org/apache/giraph/writable/tuple/LongLongWritable.java b/giraph-core/src/main/java/org/apache/giraph/writable/tuple/LongLongWritable.java new file mode 100644 index 000000000..37105d8f0 --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/writable/tuple/LongLongWritable.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.giraph.writable.tuple; + +import org.apache.hadoop.io.LongWritable; + +/** Long-Long Pair Writable */ +public class LongLongWritable + extends PairWritable { + /** Constructor */ + public LongLongWritable() { + super(new LongWritable(), new LongWritable()); + } + + /** + * Constructor + * + * @param left the left value + * @param right the right value + */ + public LongLongWritable(long left, long right) { + super(new LongWritable(left), new LongWritable(right)); + } +} diff --git a/giraph-core/src/main/java/org/apache/giraph/writable/tuple/PairWritable.java b/giraph-core/src/main/java/org/apache/giraph/writable/tuple/PairWritable.java new file mode 100644 index 000000000..62f294ff6 --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/writable/tuple/PairWritable.java @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.giraph.writable.tuple; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; + +import org.apache.commons.lang3.tuple.Pair; +import org.apache.hadoop.io.Writable; + +/** + * Pair Writable class, that knows types upfront and can deserialize itself. + * + * PairWritable knows types, as instances are passed through constructor, and + * are references are immutable (values themselves are mutable). + * + * Child classes specify no-arg constructor that passes concrete types in. + * + * Extends Pair, not ImmutablePair, since later class is final. Code is + * copied from it. + * + * @param Type of the left element + * @param Type of the right element + */ +public class PairWritable + extends Pair implements Writable { + /** Left object */ + private final L left; + /** Right object */ + private final R right; + + /** + * Create a new pair instance. + * + * @param left the left value + * @param right the right value + */ + public PairWritable(L left, R right) { + this.left = left; + this.right = right; + } + + /** + *

+ * Obtains an immutable pair of from two objects inferring + * the generic types.

+ * + *

This factory allows the pair to be created using inference to + * obtain the generic types.

+ * + * @param the left element type + * @param the right element type + * @param left the left element, may be null + * @param right the right element, may be null + * @return a pair formed from the two parameters, not null + */ + public static + PairWritable of(L left, R right) { + return new PairWritable(left, right); + } + + @Override + public final L getLeft() { + return left; + } + + @Override + public final R getRight() { + return right; + } + + /** + *

Throws {@code UnsupportedOperationException}.

+ * + *

This pair is immutable, so this operation is not supported.

+ * + * @param value the value to set + * @return never + * @throws UnsupportedOperationException as this operation is not supported + */ + @Override + public final R setValue(R value) { + throw new UnsupportedOperationException(); + } + + @Override + public final void write(DataOutput out) throws IOException { + left.write(out); + right.write(out); + } + + @Override + public final void readFields(DataInput in) throws IOException { + left.readFields(in); + right.readFields(in); + } +} diff --git a/giraph-core/src/main/java/org/apache/giraph/writable/tuple/package-info.java b/giraph-core/src/main/java/org/apache/giraph/writable/tuple/package-info.java new file mode 100644 index 000000000..e44efd856 --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/writable/tuple/package-info.java @@ -0,0 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/** + * Writable tuple utilities + */ +package org.apache.giraph.writable.tuple; diff --git a/pom.xml b/pom.xml index eb4f2f721..ceca21901 100644 --- a/pom.xml +++ b/pom.xml @@ -293,6 +293,7 @@ under the License. 1.1.1 2.1 3.1 + 3.4 0.14.0 2.1.2 6.5.4 @@ -1617,6 +1618,11 @@ under the License. commons-logging ${dep.commons-logging.version}
+ + org.apache.commons + commons-lang3 + ${dep.commons-lang3.version} + com.facebook.thirdparty.yourkit-api yjp-controller-api-redist