
Commit

Fixed bug in partitioner
abh1nay committed Oct 4, 2012
1 parent a63be3c commit d74d185
Showing 2 changed files with 55 additions and 23 deletions.
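Both store-builder partitioners drop their delegation to KeyValuePartitioner and compute the reducer index inline from ReadOnlyUtils and ByteUtils; the removed calls passed numReduceTasks twice in a row, apparently once in place of the chunk count. The Avro-side getPartition also gains guards that send records with an empty key or value buffer straight to reducer 0, presumably because a zero-length key cannot yield a chunk id.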
@@ -29,7 +29,8 @@
 import voldemort.VoldemortException;
 import voldemort.cluster.Cluster;
 import voldemort.store.StoreDefinition;
-import voldemort.store.readonly.mr.utils.KeyValuePartitioner;
+import voldemort.store.readonly.ReadOnlyUtils;
+import voldemort.utils.ByteUtils;
 import voldemort.xml.ClusterMapper;
 import voldemort.xml.StoreDefinitionsMapper;

@@ -46,9 +47,17 @@ public int getPartition(AvroKey<ByteBuffer> key, AvroValue<ByteBuffer> value, int numReduceTasks)

         byte[] keyBytes = null, valueBytes;

+        if(!key.datum().hasRemaining()) {
+            // System.out.println("empty key");
+            return 0;
+        }
         keyBytes = new byte[key.datum().remaining()];
         key.datum().get(keyBytes);

+        if(!value.datum().hasRemaining()) {
+            // System.out.println("empty value");
+            return 0;
+        }
         valueBytes = new byte[value.datum().remaining()];
         value.datum().get(valueBytes);

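The two new guards return before any bytes are copied: ByteBuffer.hasRemaining() is false for a zero-length or fully drained buffer, so the get() calls and the later chunk computation never see an empty array. A tiny self-contained illustration (plain JDK, nothing from the commit):

    import java.nio.ByteBuffer;

    public class EmptyBufferDemo {
        public static void main(String[] args) {
            ByteBuffer empty = ByteBuffer.wrap(new byte[0]);
            System.out.println(empty.hasRemaining()); // false -> the guard returns 0

            ByteBuffer oneInt = ByteBuffer.allocate(4).putInt(42);
            oneInt.flip(); // make the written bytes readable
            System.out.println(oneInt.hasRemaining()); // true -> safe to copy out
        }
    }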
@@ -65,16 +74,28 @@ public int getPartition(AvroKey<ByteBuffer> key, AvroValue<ByteBuffer> value, int numReduceTasks)
         key.datum(keyBuffer);
         value.datum(valueBuffer);

-        KeyValuePartitioner partitioner = new KeyValuePartitioner();
-
-        return partitioner.getPartition(keyBytes,
-                                        valueBytes,
-                                        saveKeys,
-                                        reducerPerBucket,
-                                        storeDef,
-                                        numReduceTasks,
-                                        numReduceTasks);
-
+        int partitionId = ByteUtils.readInt(valueBytes, ByteUtils.SIZE_OF_INT);
+        int chunkId = ReadOnlyUtils.chunk(keyBytes, getNumChunks());
+        if(getSaveKeys()) {
+            int replicaType = (int) ByteUtils.readBytes(valueBytes,
+                                                        2 * ByteUtils.SIZE_OF_INT,
+                                                        ByteUtils.SIZE_OF_BYTE);
+            if(getReducerPerBucket()) {
+                return (partitionId * getStoreDef().getReplicationFactor() + replicaType)
+                       % numReduceTasks;
+            } else {
+                return ((partitionId * getStoreDef().getReplicationFactor() * getNumChunks())
+                        + (replicaType * getNumChunks()) + chunkId)
+                       % numReduceTasks;
+            }
+        } else {
+            if(getReducerPerBucket()) {
+                return partitionId % numReduceTasks;
+            } else {
+                return (partitionId * getNumChunks() + chunkId) % numReduceTasks;
+            }
+        }
+    }

     private int numChunks;
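The inline arithmetic added above gives every (partitionId, replicaType, chunkId) bucket its own reducer slot, wrapping modulo numReduceTasks. Below is a minimal standalone sketch of the saveKeys, multi-chunk branch; the replication factor, chunk count, and reducer count are hypothetical example values, not taken from the commit.

    // Standalone sketch of the bucket-to-reducer arithmetic in the diff.
    // All concrete numbers are hypothetical; only the formula mirrors the commit.
    public class PartitionMathSketch {

        // saveKeys == true, reducerPerBucket == false: every
        // (partition, replica, chunk) triple gets its own slot,
        // wrapped modulo the number of reducers.
        static int bucketToReducer(int partitionId, int replicaType, int chunkId,
                                   int replicationFactor, int numChunks,
                                   int numReduceTasks) {
            return ((partitionId * replicationFactor * numChunks)
                    + (replicaType * numChunks) + chunkId) % numReduceTasks;
        }

        public static void main(String[] args) {
            int replicationFactor = 2; // hypothetical store replication factor
            int numChunks = 2;         // hypothetical chunks per bucket
            int numReduceTasks = 8;    // hypothetical reducer count

            // Enumerate every bucket for two partitions: partitionId strides by
            // replicationFactor * numChunks, so each triple maps to a distinct slot.
            for (int partitionId = 0; partitionId < 2; partitionId++) {
                for (int replicaType = 0; replicaType < replicationFactor; replicaType++) {
                    for (int chunkId = 0; chunkId < numChunks; chunkId++) {
                        System.out.printf("partition=%d replica=%d chunk=%d -> reducer %d%n",
                                          partitionId, replicaType, chunkId,
                                          bucketToReducer(partitionId, replicaType, chunkId,
                                                          replicationFactor, numChunks,
                                                          numReduceTasks));
                    }
                }
            }
        }
    }

With 8 reducers the 2 x 2 x 2 buckets land on reducers 0 through 7 with no collisions; with fewer reducers the modulo wraps and some reducers simply receive more than one bucket, costing parallelism but not correctness.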
@@ -19,7 +19,8 @@
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.mapred.Partitioner;

-import voldemort.store.readonly.mr.utils.KeyValuePartitioner;
+import voldemort.store.readonly.ReadOnlyUtils;
+import voldemort.utils.ByteUtils;

 /**
  * A Partitioner that splits data so that all data for the same nodeId, chunkId
@@ -30,17 +31,27 @@ public class HadoopStoreBuilderPartitioner extends AbstractStoreBuilderConfigurable implements
         Partitioner<BytesWritable, BytesWritable> {

     public int getPartition(BytesWritable key, BytesWritable value, int numReduceTasks) {
+        int partitionId = ByteUtils.readInt(value.get(), ByteUtils.SIZE_OF_INT);
+        int chunkId = ReadOnlyUtils.chunk(key.get(), getNumChunks());
+        if(getSaveKeys()) {
+            int replicaType = (int) ByteUtils.readBytes(value.get(),
+                                                        2 * ByteUtils.SIZE_OF_INT,
+                                                        ByteUtils.SIZE_OF_BYTE);
+            if(getReducerPerBucket()) {
+                return (partitionId * getStoreDef().getReplicationFactor() + replicaType)
+                       % numReduceTasks;
+            } else {
+                return ((partitionId * getStoreDef().getReplicationFactor() * getNumChunks())
+                        + (replicaType * getNumChunks()) + chunkId)
+                       % numReduceTasks;
+            }
+        } else {
+            if(getReducerPerBucket()) {
+                return partitionId % numReduceTasks;
+            } else {
+                return (partitionId * getNumChunks() + chunkId) % numReduceTasks;
+            }
+        }
-
-        byte[] keyBytes = key.get();
-        byte[] valueBytes = value.get();
-        KeyValuePartitioner partitioner = new KeyValuePartitioner();
-        return partitioner.getPartition(keyBytes,
-                                        valueBytes,
-                                        getSaveKeys(),
-                                        getReducerPerBucket(),
-                                        getStoreDef(),
-                                        numReduceTasks,
-                                        numReduceTasks);
-
     }
 }

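Both partitioners read their routing metadata from a fixed prefix of the value bytes: the partition id from the int at offset ByteUtils.SIZE_OF_INT and, when keys are saved, a replica-type byte right after it. Going by the class javadoc ("all data for the same nodeId, chunkId"), the int at offset 0 is presumably the node id. A plain-JDK sketch of the decode under that assumed [nodeId][partitionId][replicaType] prefix; only the two offsets actually read in the diff are confirmed by the commit.

    import java.nio.ByteBuffer;

    // Decodes the assumed routing prefix of a store-builder value record.
    // Offsets 4 and 8 mirror the ByteUtils reads in the diff; treating the
    // int at offset 0 as the node id is an inference from the class javadoc.
    final class ValuePrefixSketch {
        final int nodeId;      // bytes 0-3 (inferred)
        final int partitionId; // bytes 4-7, read at ByteUtils.SIZE_OF_INT above
        final int replicaType; // byte 8, read at 2 * ByteUtils.SIZE_OF_INT above

        ValuePrefixSketch(byte[] valueBytes) {
            ByteBuffer buf = ByteBuffer.wrap(valueBytes); // big-endian, like ByteUtils
            this.nodeId = buf.getInt();
            this.partitionId = buf.getInt();
            this.replicaType = buf.get() & 0xFF; // one unsigned byte
        }
    }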