
Commit

Refactored partitioner and mapper
abh1nay committed Oct 4, 2012
1 parent 29d370b commit d5f9c9c
Showing 6 changed files with 126 additions and 469 deletions.
@@ -26,14 +26,14 @@
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;

import voldemort.cluster.Node;
import voldemort.routing.ConsistentRoutingStrategy;
import voldemort.serialization.DefaultSerializerFactory;
import voldemort.serialization.Serializer;
import voldemort.serialization.SerializerDefinition;
import voldemort.serialization.SerializerFactory;
import voldemort.store.compress.CompressionStrategy;
import voldemort.store.compress.CompressionStrategyFactory;
import voldemort.store.readonly.mr.utils.MapperKeyValueWriter;
import voldemort.utils.ByteUtils;

/**
@@ -79,96 +79,28 @@ public void map(K key,
byte[] keyBytes = keySerializer.toBytes(makeKey(key, value));
byte[] valBytes = valueSerializer.toBytes(makeValue(key, value));

// Compress key and values if required
if(keySerializerDefinition.hasCompression()) {
keyBytes = keyCompressor.deflate(keyBytes);
}

if(valueSerializerDefinition.hasCompression()) {
valBytes = valueCompressor.deflate(valBytes);
}

// Get the output byte arrays ready to populate
byte[] outputValue;
BytesWritable outputKey;

// Leave initial offset for (a) node id (b) partition id
// since they are written later
int offsetTillNow = 2 * ByteUtils.SIZE_OF_INT;

if(getSaveKeys()) {

// In order - 4 ( node id ) + 4 ( partition id ) + 1 ( replica type -
// primary | secondary | tertiary... ) + 4 ( key size ) + 4 ( value size )
// + key + value
outputValue = new byte[valBytes.length + keyBytes.length + ByteUtils.SIZE_OF_BYTE + 4
* ByteUtils.SIZE_OF_INT];

// Write key length - leave byte for replica type
offsetTillNow += ByteUtils.SIZE_OF_BYTE;
ByteUtils.writeInt(outputValue, keyBytes.length, offsetTillNow);

// Write value length
offsetTillNow += ByteUtils.SIZE_OF_INT;
ByteUtils.writeInt(outputValue, valBytes.length, offsetTillNow);

// Write key
offsetTillNow += ByteUtils.SIZE_OF_INT;
System.arraycopy(keyBytes, 0, outputValue, offsetTillNow, keyBytes.length);

// Write value
offsetTillNow += keyBytes.length;
System.arraycopy(valBytes, 0, outputValue, offsetTillNow, valBytes.length);

// Generate MR key - upper 8 bytes of 16 byte md5
outputKey = new BytesWritable(ByteUtils.copy(md5er.digest(keyBytes),
0,
2 * ByteUtils.SIZE_OF_INT));

} else {

// In order - 4 ( for node id ) + 4 ( partition id ) + value
outputValue = new byte[valBytes.length + 2 * ByteUtils.SIZE_OF_INT];

// Write value
System.arraycopy(valBytes, 0, outputValue, offsetTillNow, valBytes.length);

// Generate MR key - 16 byte md5
outputKey = new BytesWritable(md5er.digest(keyBytes));

}

// Generate partition and node list this key is destined for
List<Integer> partitionList = routingStrategy.getPartitionList(keyBytes);
Node[] partitionToNode = routingStrategy.getPartitionToNode();

for(int replicaType = 0; replicaType < partitionList.size(); replicaType++) {

// Node id
ByteUtils.writeInt(outputValue,
partitionToNode[partitionList.get(replicaType)].getId(),
0);

if(getSaveKeys()) {
// Primary partition id
ByteUtils.writeInt(outputValue, partitionList.get(0), ByteUtils.SIZE_OF_INT);

// Replica type
ByteUtils.writeBytes(outputValue,
replicaType,
2 * ByteUtils.SIZE_OF_INT,
ByteUtils.SIZE_OF_BYTE);
} else {
// Partition id
ByteUtils.writeInt(outputValue,
partitionList.get(replicaType),
ByteUtils.SIZE_OF_INT);
}
BytesWritable outputVal = new BytesWritable(outputValue);
MapperKeyValueWriter mapWriter = new MapperKeyValueWriter();

List mapperList = mapWriter.map(routingStrategy,
keySerializer,
valueSerializer,
valueCompressor,
keyCompressor,
keySerializerDefinition,
valueSerializerDefinition,
keyBytes,
valBytes,
getSaveKeys(),
md5er);

for(int i = 0; i < mapperList.size(); i++) {
voldemort.utils.Pair<BytesWritable, BytesWritable> pair = (voldemort.utils.Pair<BytesWritable, BytesWritable>) mapperList.get(i);
BytesWritable outputKey = pair.getFirst();
BytesWritable outputVal = pair.getSecond();

output.collect(outputKey, outputVal);

}

md5er.reset();
}

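The block removed above packed the routing metadata and payload into a single byte array and keyed it by an MD5 digest of the serialized key; the MapperKeyValueWriter call that replaces it presumably encapsulates the same logic so that this mapper and the Avro mapper below can share one copy of it. The following self-contained sketch (plain JDK only, no Voldemort or Hadoop types) restates that layout as reconstructed from the removed lines; the class name, method name, and parameters are illustrative and are not part of the commit.

import java.nio.ByteBuffer;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;

/**
 * Illustration of the record layout built by the removed mapper code:
 *
 *   saveKeys == true  : [node id][partition id][replica type][key size][value size][key][value]
 *   saveKeys == false : [node id][partition id][value]
 *
 * The MR key is the upper 8 bytes of the 16-byte MD5 of the serialized key
 * when keys are saved, and the full 16-byte MD5 otherwise.
 */
public class PackedRecordSketch {

    /** Build one (MR key, packed value) pair per replica, mirroring the removed loop. */
    public static List<Map.Entry<byte[], byte[]>> pack(byte[] keyBytes,
                                                       byte[] valBytes,
                                                       int[] nodeIds,       // node id per replica type
                                                       int[] partitionIds,  // partition id per replica type
                                                       boolean saveKeys) throws NoSuchAlgorithmException {
        byte[] md5 = MessageDigest.getInstance("md5").digest(keyBytes);
        byte[] mrKey = saveKeys ? Arrays.copyOf(md5, 8) : md5;

        List<Map.Entry<byte[], byte[]>> out = new ArrayList<>();
        for (int replicaType = 0; replicaType < partitionIds.length; replicaType++) {
            ByteBuffer buf;
            if (saveKeys) {
                // 4 ( node id ) + 4 ( partition id ) + 1 ( replica type ) +
                // 4 ( key size ) + 4 ( value size ) + key + value
                buf = ByteBuffer.allocate(4 + 4 + 1 + 4 + 4 + keyBytes.length + valBytes.length);
                buf.putInt(nodeIds[replicaType]);
                buf.putInt(partitionIds[0]);      // primary partition id
                buf.put((byte) replicaType);      // primary | secondary | tertiary ...
                buf.putInt(keyBytes.length);
                buf.putInt(valBytes.length);
                buf.put(keyBytes);
                buf.put(valBytes);
            } else {
                // 4 ( node id ) + 4 ( partition id ) + value
                buf = ByteBuffer.allocate(4 + 4 + valBytes.length);
                buf.putInt(nodeIds[replicaType]);
                buf.putInt(partitionIds[replicaType]);   // this replica's partition id
                buf.put(valBytes);
            }
            out.add(new SimpleEntry<>(mrKey, buf.array()));
        }
        return out;
    }
}

Under this layout the packed value is keyBytes.length + valBytes.length + 17 bytes when keys are saved and valBytes.length + 8 bytes otherwise, which matches the array allocations in the removed code.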
@@ -10,15 +10,13 @@
import org.apache.avro.mapred.AvroCollector;
import org.apache.avro.mapred.AvroMapper;
import org.apache.avro.mapred.Pair;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.JobConfigurable;
import org.apache.hadoop.mapred.Reporter;

import voldemort.VoldemortException;
import voldemort.cluster.Cluster;
import voldemort.cluster.Node;
import voldemort.routing.ConsistentRoutingStrategy;
import voldemort.serialization.DefaultSerializerFactory;
import voldemort.serialization.SerializerDefinition;
@@ -27,17 +25,17 @@
import voldemort.store.StoreDefinition;
import voldemort.store.compress.CompressionStrategy;
import voldemort.store.compress.CompressionStrategyFactory;
import voldemort.store.readonly.mr.azkaban.StoreBuilderTransformation;
import voldemort.store.readonly.mr.utils.HadoopUtils;
import voldemort.store.readonly.mr.utils.MapperKeyValueWriter;
import voldemort.utils.ByteUtils;
import voldemort.xml.ClusterMapper;
import voldemort.xml.StoreDefinitionsMapper;
import azkaban.common.utils.Props;
import azkaban.common.utils.Utils;

// Avro container files are not sequence input format files
// they contain records instead of k/v pairs
// to consume these files we use the AvroMapper
/*
 * Avro container files are not sequence input format files; they contain
 * records instead of k/v pairs. To consume these files we use the AvroMapper.
 */
public class AvroStoreBuilderMapper extends
AvroMapper<GenericData.Record, Pair<ByteBuffer, ByteBuffer>> implements JobConfigurable {

@@ -52,28 +50,19 @@ public class AvroStoreBuilderMapper extends
private String keyField;
private String valField;

private String _keySelection;
private String _valSelection;

private StoreBuilderTransformation _keyTrans;
private StoreBuilderTransformation _valTrans;

private CompressionStrategy valueCompressor;
private CompressionStrategy keyCompressor;
private SerializerDefinition keySerializerDefinition;
private SerializerDefinition valueSerializerDefinition;

// Path path = new Path(fileName);
FSDataOutputStream outputStream;

/**
* Create the voldemort key and value from the input key and value and map
* it out for each of the responsible voldemort nodes
* Create the voldemort key and value from the input Avro record by
* extracting the key and value and map it out for each of the responsible
* voldemort nodes
*
*
* The output key is the md5 of the serialized key returned by makeKey().
* The output value is the node_id & partition_id of the responsible node
* followed by serialized value returned by makeValue() OR if we have
* setKeys flag on the serialized key and serialized value
* followed by serialized value
*/
@Override
public void map(GenericData.Record record,
@@ -83,103 +72,33 @@ public void map(GenericData.Record record,
byte[] keyBytes = keySerializer.toBytes(record.get(keyField));
byte[] valBytes = valueSerializer.toBytes(record.get(valField));

// Compress key and values if required
if(keySerializerDefinition.hasCompression()) {
keyBytes = keyCompressor.deflate(keyBytes);
}

if(valueSerializerDefinition.hasCompression()) {
valBytes = valueCompressor.deflate(valBytes);
}

// Get the output byte arrays ready to populate
byte[] outputValue;
BytesWritable outputKey;

// Leave initial offset for (a) node id (b) partition id
// since they are written later
int offsetTillNow = 2 * ByteUtils.SIZE_OF_INT;

if(getSaveKeys()) {

// In order - 4 ( node id ) + 4 ( partition id ) + 1 ( replica type -
// primary | secondary | tertiary... ) + 4 ( key size ) + 4 ( value size )
// + key + value
outputValue = new byte[valBytes.length + keyBytes.length + ByteUtils.SIZE_OF_BYTE + 4
* ByteUtils.SIZE_OF_INT];

// Write key length - leave byte for replica type
offsetTillNow += ByteUtils.SIZE_OF_BYTE;
ByteUtils.writeInt(outputValue, keyBytes.length, offsetTillNow);

// Write value length
offsetTillNow += ByteUtils.SIZE_OF_INT;
ByteUtils.writeInt(outputValue, valBytes.length, offsetTillNow);

// Write key
offsetTillNow += ByteUtils.SIZE_OF_INT;
System.arraycopy(keyBytes, 0, outputValue, offsetTillNow, keyBytes.length);
MapperKeyValueWriter mapWriter = new MapperKeyValueWriter();

List mapperList = mapWriter.map(routingStrategy,
keySerializer,
valueSerializer,
valueCompressor,
keyCompressor,
keySerializerDefinition,
valueSerializerDefinition,
keyBytes,
valBytes,
getSaveKeys(),
md5er);

for(int i = 0; i < mapperList.size(); i++) {
voldemort.utils.Pair<BytesWritable, BytesWritable> pair = (voldemort.utils.Pair<BytesWritable, BytesWritable>) mapperList.get(i);
BytesWritable outputKey = pair.getFirst();
BytesWritable outputVal = pair.getSecond();

// Write value
offsetTillNow += keyBytes.length;
System.arraycopy(valBytes, 0, outputValue, offsetTillNow, valBytes.length);

// Generate MR key - upper 8 bytes of 16 byte md5
outputKey = new BytesWritable(ByteUtils.copy(md5er.digest(keyBytes),
0,
2 * ByteUtils.SIZE_OF_INT));

} else {

// In order - 4 ( for node id ) + 4 ( partition id ) + value
outputValue = new byte[valBytes.length + 2 * ByteUtils.SIZE_OF_INT];

// Write value
System.arraycopy(valBytes, 0, outputValue, offsetTillNow, valBytes.length);

// Generate MR key - 16 byte md5
outputKey = new BytesWritable(md5er.digest(keyBytes));

}

// Generate partition and node list this key is destined for
List<Integer> partitionList = routingStrategy.getPartitionList(keyBytes);
Node[] partitionToNode = routingStrategy.getPartitionToNode();

for(int replicaType = 0; replicaType < partitionList.size(); replicaType++) {

// Node id
ByteUtils.writeInt(outputValue,
partitionToNode[partitionList.get(replicaType)].getId(),
0);

if(getSaveKeys()) {
// Primary partition id
ByteUtils.writeInt(outputValue, partitionList.get(0), ByteUtils.SIZE_OF_INT);

// Replica type
ByteUtils.writeBytes(outputValue,
replicaType,
2 * ByteUtils.SIZE_OF_INT,
ByteUtils.SIZE_OF_BYTE);
} else {
// Partition id
ByteUtils.writeInt(outputValue,
partitionList.get(replicaType),
ByteUtils.SIZE_OF_INT);
}
BytesWritable outputVal = new BytesWritable(outputValue);

// System.out.println("collect length (K/V): "+
// outputKey.getLength()+ " , " + outputVal.getLength());
ByteBuffer keyBuffer = null, valueBuffer = null;

byte[] md5KeyBytes = outputKey.getBytes();
keyBuffer = ByteBuffer.allocate(md5KeyBytes.length);
keyBuffer.put(md5KeyBytes);
keyBuffer.rewind();

byte[] outputValue = outputVal.getBytes();
valueBuffer = ByteBuffer.allocate(outputValue.length);
valueBuffer.put(outputValue);
valueBuffer.rewind();
@@ -189,6 +108,7 @@ public void map(GenericData.Record record,

collector.collect(p);
}

md5er.reset();
}
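The added lines above copy each BytesWritable produced by MapperKeyValueWriter into a fresh ByteBuffer and rewind it before wrapping the pair for the Avro collector. A minimal sketch of that conversion is below; the helper class and method name are hypothetical and not part of the commit, and bounding the copy by getLength() is a defensive choice (the backing array returned by getBytes() can be longer than the valid data), not a claim that the committed version misbehaves here.

import java.nio.ByteBuffer;

import org.apache.hadoop.io.BytesWritable;

public final class ByteBufferConversion {

    private ByteBufferConversion() {}

    /**
     * Copy a BytesWritable into a ByteBuffer positioned at 0, the form the
     * Avro Pair/collector expects for "bytes"-typed fields.
     */
    public static ByteBuffer toByteBuffer(BytesWritable writable) {
        ByteBuffer buffer = ByteBuffer.allocate(writable.getLength());
        buffer.put(writable.getBytes(), 0, writable.getLength());
        buffer.rewind();
        return buffer;
    }
}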

@@ -246,16 +166,6 @@ public void configure(JobConf conf) {
// /
Props props = HadoopUtils.getPropsFromJob(conf);

_keySelection = props.getString("key.selection", null);
_valSelection = props.getString("value.selection", null);

String _keyTransClass = props.getString("key.transformation.class", null);
String _valueTransClass = props.getString("value.transformation.class", null);

if(_keyTransClass != null)
_keyTrans = (StoreBuilderTransformation) Utils.callConstructor(_keyTransClass);
if(_valueTransClass != null)
_valTrans = (StoreBuilderTransformation) Utils.callConstructor(_valueTransClass);
}

private int numChunks;

