forked from voldemort/voldemort
-
Notifications
You must be signed in to change notification settings - Fork 0
/
KeyValuePartitioner.java
40 lines (35 loc) · 1.52 KB
/
KeyValuePartitioner.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
package voldemort.store.readonly.mr.utils;
import voldemort.store.StoreDefinition;
import voldemort.store.readonly.ReadOnlyUtils;
import voldemort.utils.ByteUtils;
/**
 * Maps a key/value record onto the index of the reduce task that should
 * process it.
 */
public class KeyValuePartitioner {

    /**
     * Computes the reduce-task index for a single record.
     *
     * <p>A "bucket" id is derived first: it is the partition id on its own,
     * or, when {@code saveKeys} is set, the partition id scaled by the store's
     * replication factor plus the replica type. If {@code reducerPerBucket}
     * is set the bucket id is used directly; otherwise it is further scaled
     * by {@code numChunks} and offset by the chunk id of the key. The result
     * is that value modulo {@code numReduceTasks}.
     *
     * @param keyBytes         key bytes, handed to
     *                         {@code ReadOnlyUtils.chunk} to obtain the chunk id
     * @param valueBytes       value bytes; an int is read at
     *                         {@code ByteUtils.SIZE_OF_INT} (used as the
     *                         partition id) and, when {@code saveKeys} is set,
     *                         {@code ByteUtils.SIZE_OF_BYTE} byte(s) are read at
     *                         {@code 2 * ByteUtils.SIZE_OF_INT} (used as the
     *                         replica type) -- presumably these are offsets
     *                         into a fixed header; confirm against the writer
     * @param saveKeys         whether the replica type takes part in the
     *                         computation
     * @param reducerPerBucket when {@code true} the chunk id is left out of
     *                         the computation
     * @param storeDef         store definition; only its replication factor is
     *                         consulted, and only when {@code saveKeys} is set
     * @param numChunks        number of chunks, used for the chunk id and as a
     *                         multiplier when {@code reducerPerBucket} is false
     * @param numReduceTasks   modulus applied to the final value
     * @return the computed value modulo {@code numReduceTasks}
     */
    public int getPartition(byte[] keyBytes,
                            byte[] valueBytes,
                            boolean saveKeys,
                            boolean reducerPerBucket,
                            StoreDefinition storeDef,
                            int numChunks,
                            int numReduceTasks) {
        final int partitionId = ByteUtils.readInt(valueBytes, ByteUtils.SIZE_OF_INT);
        // Evaluated unconditionally (even if unused below) so that any failure
        // inside chunk() surfaces regardless of the flags passed in.
        final int chunkId = ReadOnlyUtils.chunk(keyBytes, numChunks);

        int bucketId = partitionId;
        if(saveKeys) {
            final int replicaType = (int) ByteUtils.readBytes(valueBytes,
                                                              2 * ByteUtils.SIZE_OF_INT,
                                                              ByteUtils.SIZE_OF_BYTE);
            final int replicationFactor = storeDef.getReplicationFactor();
            bucketId = partitionId * replicationFactor + replicaType;
        }

        if(reducerPerBucket) {
            return bucketId % numReduceTasks;
        }

        // (p * rf * chunks) + (replica * chunks) + chunk is the same value as
        // (p * rf + replica) * chunks + chunk, including under int wrap-around.
        final int chunkSlot = bucketId * numChunks + chunkId;
        return chunkSlot % numReduceTasks;
    }
}