From a63be3c8f40a98d3b6365df65d2fc09ebc482b65 Mon Sep 17 00:00:00 2001
From: Abhinay Nagpal
Date: Wed, 12 Sep 2012 13:07:11 -0700
Subject: [PATCH] Refactoring mapper logic out

---
 .../voldemort/store/readonly/mr/JobState.java |  73 ++++++++++
 .../mr/utils/KeyValuePartitioner.java         |  40 +++++
 .../mr/utils/MapperKeyValueWriter.java        | 137 ++++++++++++++++++
 3 files changed, 250 insertions(+)
 create mode 100644 contrib/hadoop-store-builder/src/java/voldemort/store/readonly/mr/JobState.java
 create mode 100644 contrib/hadoop-store-builder/src/java/voldemort/store/readonly/mr/utils/KeyValuePartitioner.java
 create mode 100644 contrib/hadoop-store-builder/src/java/voldemort/store/readonly/mr/utils/MapperKeyValueWriter.java

diff --git a/contrib/hadoop-store-builder/src/java/voldemort/store/readonly/mr/JobState.java b/contrib/hadoop-store-builder/src/java/voldemort/store/readonly/mr/JobState.java
new file mode 100644
index 0000000000..cbb5d6af77
--- /dev/null
+++ b/contrib/hadoop-store-builder/src/java/voldemort/store/readonly/mr/JobState.java
@@ -0,0 +1,73 @@
+package voldemort.store.readonly.mr;
+
+import java.io.IOException;
+import java.io.StringReader;
+import java.util.List;
+
+import org.apache.hadoop.mapred.JobConf;
+
+import voldemort.VoldemortException;
+import voldemort.cluster.Cluster;
+import voldemort.store.StoreDefinition;
+import voldemort.xml.ClusterMapper;
+import voldemort.xml.StoreDefinitionsMapper;
+
+public class JobState {
+
+    private int numChunks;
+    private Cluster cluster;
+    private StoreDefinition storeDef;
+    private boolean saveKeys;
+    private boolean reducerPerBucket;
+
+    public void configure(JobConf conf) {
+        this.cluster = new ClusterMapper().readCluster(new StringReader(conf.get("cluster.xml")));
+        List<StoreDefinition> storeDefs = new StoreDefinitionsMapper().readStoreList(new StringReader(conf.get("stores.xml")));
+        if(storeDefs.size() != 1)
+            throw new IllegalStateException("Expected to find only a single store, but found multiple!");
+        this.storeDef = storeDefs.get(0);
+
+        this.numChunks = conf.getInt("num.chunks", -1);
+        if(this.numChunks < 1)
+            throw new VoldemortException("num.chunks not specified in the job conf.");
+
+        this.saveKeys = conf.getBoolean("save.keys", false);
+        this.reducerPerBucket = conf.getBoolean("reducer.per.bucket", false);
+    }
+
+    @SuppressWarnings("unused")
+    public void close() throws IOException {}
+
+    public Cluster getCluster() {
+        checkNotNull(cluster);
+        return cluster;
+    }
+
+    public boolean getSaveKeys() {
+        return this.saveKeys;
+    }
+
+    public boolean getReducerPerBucket() {
+        return this.reducerPerBucket;
+    }
+
+    public StoreDefinition getStoreDef() {
+        checkNotNull(storeDef);
+        return storeDef;
+    }
+
+    public String getStoreName() {
+        checkNotNull(storeDef);
+        return storeDef.getName();
+    }
+
+    private final void checkNotNull(Object o) {
+        if(o == null)
+            throw new VoldemortException("Not configured yet!");
+    }
+
+    public int getNumChunks() {
+        return this.numChunks;
+    }
+
+}
diff --git a/contrib/hadoop-store-builder/src/java/voldemort/store/readonly/mr/utils/KeyValuePartitioner.java b/contrib/hadoop-store-builder/src/java/voldemort/store/readonly/mr/utils/KeyValuePartitioner.java
new file mode 100644
index 0000000000..900fe844ed
--- /dev/null
+++ b/contrib/hadoop-store-builder/src/java/voldemort/store/readonly/mr/utils/KeyValuePartitioner.java
@@ -0,0 +1,40 @@
+package voldemort.store.readonly.mr.utils;
+
+import voldemort.store.StoreDefinition;
+import voldemort.store.readonly.ReadOnlyUtils;
+import voldemort.utils.ByteUtils;
+
+public class KeyValuePartitioner {
+
+    public int getPartition(byte[] keyBytes,
+                            byte[] valueBytes,
+                            boolean saveKeys,
+                            boolean reducerPerBucket,
+                            StoreDefinition storeDef,
+                            int numChunks,
+                            int numReduceTasks) {
+        int partitionId = ByteUtils.readInt(valueBytes, ByteUtils.SIZE_OF_INT);
+        int chunkId = ReadOnlyUtils.chunk(keyBytes, numChunks);
+        if(saveKeys) {
+            int replicaType = (int) ByteUtils.readBytes(valueBytes,
+                                                        2 * ByteUtils.SIZE_OF_INT,
+                                                        ByteUtils.SIZE_OF_BYTE);
+            if(reducerPerBucket) {
+                return (partitionId * storeDef.getReplicationFactor() + replicaType)
+                       % numReduceTasks;
+            } else {
+                return ((partitionId * storeDef.getReplicationFactor() * numChunks)
+                        + (replicaType * numChunks) + chunkId)
+                       % numReduceTasks;
+            }
+        } else {
+            if(reducerPerBucket) {
+                return partitionId % numReduceTasks;
+            } else {
+                return (partitionId * numChunks + chunkId) % numReduceTasks;
+            }
+
+        }
+    }
+
+}
diff --git a/contrib/hadoop-store-builder/src/java/voldemort/store/readonly/mr/utils/MapperKeyValueWriter.java b/contrib/hadoop-store-builder/src/java/voldemort/store/readonly/mr/utils/MapperKeyValueWriter.java
new file mode 100644
index 0000000000..574eead8aa
--- /dev/null
+++ b/contrib/hadoop-store-builder/src/java/voldemort/store/readonly/mr/utils/MapperKeyValueWriter.java
@@ -0,0 +1,137 @@
+package voldemort.store.readonly.mr.utils;
+
+import java.io.IOException;
+import java.security.MessageDigest;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.io.BytesWritable;
+
+import voldemort.cluster.Node;
+import voldemort.routing.ConsistentRoutingStrategy;
+import voldemort.serialization.Serializer;
+import voldemort.serialization.SerializerDefinition;
+import voldemort.store.compress.CompressionStrategy;
+import voldemort.utils.ByteUtils;
+import voldemort.utils.Pair;
+
+public class MapperKeyValueWriter {
+
+    public BytesWritable getOutputKey() {
+        return outputKey;
+    }
+
+    public BytesWritable getOutputValue() {
+        return outputVal;
+    }
+
+    BytesWritable outputKey;
+    BytesWritable outputVal;
+
+    public List<Pair<BytesWritable, BytesWritable>> map(ConsistentRoutingStrategy routingStrategy,
+                                                        Serializer keySerializer,
+                                                        Serializer valueSerializer,
+                                                        CompressionStrategy valueCompressor,
+                                                        CompressionStrategy keyCompressor,
+                                                        SerializerDefinition keySerializerDefinition,
+                                                        SerializerDefinition valueSerializerDefinition,
+                                                        byte[] keyBytes,
+                                                        byte[] valBytes,
+                                                        boolean getSaveKeys,
+                                                        MessageDigest md5er) throws IOException {
+
+        List<Pair<BytesWritable, BytesWritable>> outputList = new ArrayList<Pair<BytesWritable, BytesWritable>>();
+        // Compress key and values if required
+        if(keySerializerDefinition.hasCompression()) {
+            keyBytes = keyCompressor.deflate(keyBytes);
+        }
+
+        if(valueSerializerDefinition.hasCompression()) {
+            valBytes = valueCompressor.deflate(valBytes);
+        }
+
+        // Get the output byte arrays ready to populate
+        byte[] outputValue;
+
+        // Leave initial offset for (a) node id (b) partition id
+        // since they are written later
+        int offsetTillNow = 2 * ByteUtils.SIZE_OF_INT;
+
+        if(getSaveKeys) {
+
+            // In order - 4 ( node id ) + 4 ( partition id ) + 1 (
+            // replica type - primary | secondary | tertiary... )
+            // + 4 ( key size ) + 4 ( value size )
+            // + key + value
+            outputValue = new byte[valBytes.length + keyBytes.length + ByteUtils.SIZE_OF_BYTE + 4
+                                   * ByteUtils.SIZE_OF_INT];
+
+            // Write key length - leave byte for replica type
+            offsetTillNow += ByteUtils.SIZE_OF_BYTE;
+            ByteUtils.writeInt(outputValue, keyBytes.length, offsetTillNow);
+
+            // Write value length
+            offsetTillNow += ByteUtils.SIZE_OF_INT;
+            ByteUtils.writeInt(outputValue, valBytes.length, offsetTillNow);
+
+            // Write key
+            offsetTillNow += ByteUtils.SIZE_OF_INT;
+            System.arraycopy(keyBytes, 0, outputValue, offsetTillNow, keyBytes.length);
+
+            // Write value
+            offsetTillNow += keyBytes.length;
+            System.arraycopy(valBytes, 0, outputValue, offsetTillNow, valBytes.length);
+
+            // Generate MR key - upper 8 bytes of 16 byte md5
+            outputKey = new BytesWritable(ByteUtils.copy(md5er.digest(keyBytes),
+                                                         0,
+                                                         2 * ByteUtils.SIZE_OF_INT));
+
+        } else {
+
+            // In order - 4 ( for node id ) + 4 ( partition id ) + value
+            outputValue = new byte[valBytes.length + 2 * ByteUtils.SIZE_OF_INT];
+
+            // Write value
+            System.arraycopy(valBytes, 0, outputValue, offsetTillNow, valBytes.length);
+
+            // Generate MR key - 16 byte md5
+            outputKey = new BytesWritable(md5er.digest(keyBytes));
+
+        }
+
+        // Generate partition and node list this key is destined for
+        List<Integer> partitionList = routingStrategy.getPartitionList(keyBytes);
+        Node[] partitionToNode = routingStrategy.getPartitionToNode();
+
+        for(int replicaType = 0; replicaType < partitionList.size(); replicaType++) {
+
+            // Node id
+            ByteUtils.writeInt(outputValue,
+                               partitionToNode[partitionList.get(replicaType)].getId(),
+                               0);
+
+            if(getSaveKeys) {
+                // Primary partition id
+                ByteUtils.writeInt(outputValue, partitionList.get(0), ByteUtils.SIZE_OF_INT);
+
+                // Replica type
+                ByteUtils.writeBytes(outputValue,
+                                     replicaType,
+                                     2 * ByteUtils.SIZE_OF_INT,
+                                     ByteUtils.SIZE_OF_BYTE);
+            } else {
+                // Partition id
+                ByteUtils.writeInt(outputValue,
+                                   partitionList.get(replicaType),
+                                   ByteUtils.SIZE_OF_INT);
+            }
+            outputVal = new BytesWritable(outputValue);
+            Pair<BytesWritable, BytesWritable> pair = Pair.create(outputKey, outputVal);
+            outputList.add(pair);
+        }
+
+        return outputList;
+
+    }
+}
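
Reviewer note (not part of the patch): a minimal sketch of how the extracted KeyValuePartitioner could be plugged into a Hadoop (old mapred API) partitioner, with JobState supplying the per-job configuration. The class name DelegatingStorePartitioner is hypothetical and only illustrates the intended wiring; the actual callers are updated in follow-up changes.

// Hypothetical usage sketch, not included in this patch.
package voldemort.store.readonly.mr;

import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.mapred.Partitioner;

import voldemort.store.readonly.mr.utils.KeyValuePartitioner;

public class DelegatingStorePartitioner extends JobState
        implements Partitioner<BytesWritable, BytesWritable> {

    // Stateless partitioning logic extracted by this patch
    private final KeyValuePartitioner partitioner = new KeyValuePartitioner();

    // JobConfigurable.configure(JobConf) is satisfied by JobState.configure()

    public int getPartition(BytesWritable key, BytesWritable value, int numReduceTasks) {
        // key/value carry the md5-based MR key and the packed node/partition
        // value produced by MapperKeyValueWriter.map(); get() returns the
        // backing buffers, which is sufficient here since only leading bytes
        // are read.
        return partitioner.getPartition(key.get(),
                                        value.get(),
                                        getSaveKeys(),
                                        getReducerPerBucket(),
                                        getStoreDef(),
                                        getNumChunks(),
                                        numReduceTasks);
    }
}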