diff --git a/contrib/hadoop-store-builder/src/java/voldemort/store/readonly/mr/AvroStoreBuilderPartitioner.java b/contrib/hadoop-store-builder/src/java/voldemort/store/readonly/mr/AvroStoreBuilderPartitioner.java
index 573de222c6..510ca688c9 100644
--- a/contrib/hadoop-store-builder/src/java/voldemort/store/readonly/mr/AvroStoreBuilderPartitioner.java
+++ b/contrib/hadoop-store-builder/src/java/voldemort/store/readonly/mr/AvroStoreBuilderPartitioner.java
@@ -29,7 +29,8 @@
 
 import voldemort.VoldemortException;
 import voldemort.cluster.Cluster;
 import voldemort.store.StoreDefinition;
-import voldemort.store.readonly.mr.utils.KeyValuePartitioner;
+import voldemort.store.readonly.ReadOnlyUtils;
+import voldemort.utils.ByteUtils;
 import voldemort.xml.ClusterMapper;
 import voldemort.xml.StoreDefinitionsMapper;
@@ -46,9 +47,17 @@ public int getPartition(AvroKey key, AvroValue value, in
 
         byte[] keyBytes = null, valueBytes;
 
+        if(!key.datum().hasRemaining()) {
+            // System.out.println("empty key");
+            return 0;
+        }
         keyBytes = new byte[key.datum().remaining()];
         key.datum().get(keyBytes);
 
+        if(!value.datum().hasRemaining()) {
+            // System.out.println("empty value");
+            return 0;
+        }
         valueBytes = new byte[value.datum().remaining()];
         value.datum().get(valueBytes);
 
@@ -65,16 +74,28 @@ public int getPartition(AvroKey key, AvroValue value, in
         key.datum(keyBuffer);
         value.datum(valueBuffer);
 
-        KeyValuePartitioner partitioner = new KeyValuePartitioner();
-
-        return partitioner.getPartition(keyBytes,
-                                        valueBytes,
-                                        saveKeys,
-                                        reducerPerBucket,
-                                        storeDef,
-                                        numReduceTasks,
-                                        numReduceTasks);
-
+        int partitionId = ByteUtils.readInt(valueBytes, ByteUtils.SIZE_OF_INT);
+        int chunkId = ReadOnlyUtils.chunk(keyBytes, getNumChunks());
+        if(getSaveKeys()) {
+            int replicaType = (int) ByteUtils.readBytes(valueBytes,
+                                                        2 * ByteUtils.SIZE_OF_INT,
+                                                        ByteUtils.SIZE_OF_BYTE);
+            if(getReducerPerBucket()) {
+                return (partitionId * getStoreDef().getReplicationFactor() + replicaType)
+                       % numReduceTasks;
+            } else {
+                return ((partitionId * getStoreDef().getReplicationFactor() * getNumChunks())
+                        + (replicaType * getNumChunks()) + chunkId)
+                       % numReduceTasks;
+            }
+        } else {
+            if(getReducerPerBucket()) {
+                return partitionId % numReduceTasks;
+            } else {
+                return (partitionId * getNumChunks() + chunkId) % numReduceTasks;
+            }
+
+        }
     }
 
     private int numChunks;
diff --git a/contrib/hadoop-store-builder/src/java/voldemort/store/readonly/mr/HadoopStoreBuilderPartitioner.java b/contrib/hadoop-store-builder/src/java/voldemort/store/readonly/mr/HadoopStoreBuilderPartitioner.java
index 2e7f31c1dc..6d40611280 100644
--- a/contrib/hadoop-store-builder/src/java/voldemort/store/readonly/mr/HadoopStoreBuilderPartitioner.java
+++ b/contrib/hadoop-store-builder/src/java/voldemort/store/readonly/mr/HadoopStoreBuilderPartitioner.java
@@ -19,7 +19,8 @@
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.mapred.Partitioner;
 
-import voldemort.store.readonly.mr.utils.KeyValuePartitioner;
+import voldemort.store.readonly.ReadOnlyUtils;
+import voldemort.utils.ByteUtils;
 
 /**
  * A Partitioner that splits data so that all data for the same nodeId, chunkId
@@ -30,17 +31,27 @@ public class HadoopStoreBuilderPartitioner extends AbstractStoreBuilderConfigura
         Partitioner<BytesWritable, BytesWritable> {
 
     public int getPartition(BytesWritable key, BytesWritable value, int numReduceTasks) {
+        int partitionId = ByteUtils.readInt(value.get(), ByteUtils.SIZE_OF_INT);
+        int chunkId = ReadOnlyUtils.chunk(key.get(), getNumChunks());
+        if(getSaveKeys()) {
+            int replicaType = (int) ByteUtils.readBytes(value.get(),
+                                                        2 * ByteUtils.SIZE_OF_INT,
+                                                        ByteUtils.SIZE_OF_BYTE);
+            if(getReducerPerBucket()) {
+                return (partitionId * getStoreDef().getReplicationFactor() + replicaType)
+                       % numReduceTasks;
+            } else {
+                return ((partitionId * getStoreDef().getReplicationFactor() * getNumChunks())
+                        + (replicaType * getNumChunks()) + chunkId)
+                       % numReduceTasks;
+            }
+        } else {
+            if(getReducerPerBucket()) {
+                return partitionId % numReduceTasks;
+            } else {
+                return (partitionId * getNumChunks() + chunkId) % numReduceTasks;
+            }
-
-        byte[] keyBytes = key.get();
-        byte[] valueBytes = value.get();
-        KeyValuePartitioner partitioner = new KeyValuePartitioner();
-        return partitioner.getPartition(keyBytes,
-                                        valueBytes,
-                                        getSaveKeys(),
-                                        getReducerPerBucket(),
-                                        getStoreDef(),
-                                        numReduceTasks,
-                                        numReduceTasks);
-
+
+        }
     }
 }
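
Note on the inlined logic (commentary, not part of the patch): both files now carry the same routing arithmetic, in line with the javadoc above ("all data for the same nodeId, chunkId" must land on one reducer). A minimal standalone sketch of that formula follows; the class PartitionSketch, the method reducerFor, and its scalar parameters are hypothetical stand-ins for the values the partitioners read via getStoreDef().getReplicationFactor(), getNumChunks(), getSaveKeys() and getReducerPerBucket().

// PartitionSketch.java -- hypothetical distillation of the partitioning
// arithmetic inlined in both partitioners above.
public final class PartitionSketch {

    // Maps a (partition, replica, chunk) coordinate to a reducer index.
    static int reducerFor(int partitionId, int replicaType, int chunkId,
                          int replicationFactor, int numChunks,
                          boolean saveKeys, boolean reducerPerBucket,
                          int numReduceTasks) {
        if(saveKeys) {
            if(reducerPerBucket) {
                // One bucket per (partition, replica); all chunks of a bucket
                // share one reducer.
                return (partitionId * replicationFactor + replicaType) % numReduceTasks;
            }
            // One bucket per (partition, replica, chunk).
            return ((partitionId * replicationFactor * numChunks)
                    + (replicaType * numChunks) + chunkId) % numReduceTasks;
        }
        if(reducerPerBucket) {
            // Without saved keys there is no replica dimension.
            return partitionId % numReduceTasks;
        }
        return (partitionId * numChunks + chunkId) % numReduceTasks;
    }

    public static void main(String[] args) {
        // Worked example: partition 5, replica 1, chunk 2, replication factor 2,
        // 4 chunks, 16 reducers, saveKeys=true, reducerPerBucket=false:
        // (5 * 2 * 4) + (1 * 4) + 2 = 46, and 46 % 16 = 14.
        System.out.println(reducerFor(5, 1, 2, 2, 4, true, false, 16)); // prints 14
    }
}

Keeping this arithmetic identical in the Avro and BytesWritable partitioners is what guarantees the two build paths bucket identically; the empty-key/empty-value early returns in the Avro variant only short-circuit records that carry no datum bytes.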