Added support for specifying the key and value field in the avro record
abh1nay committed Oct 4, 2012
1 parent 9a78082 commit 65e8bee
Showing 37 changed files with 6,826 additions and 0 deletions.
@@ -0,0 +1,321 @@
package voldemort.store.readonly.disk;

import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.StringReader;
import java.util.Iterator;
import java.util.List;

import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Reporter;
import org.apache.log4j.Logger;

import voldemort.VoldemortException;
import voldemort.cluster.Cluster;
import voldemort.store.StoreDefinition;
import voldemort.store.readonly.ReadOnlyUtils;
import voldemort.store.readonly.checksum.CheckSum;
import voldemort.store.readonly.checksum.CheckSum.CheckSumType;
import voldemort.utils.ByteUtils;
import voldemort.xml.ClusterMapper;
import voldemort.xml.StoreDefinitionsMapper;

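/**
 * Writes a single Voldemort read-only store chunk on the Hadoop side of the
 * store build. Each incoming key is appended to a per-task ".index" file
 * together with its offset into the matching ".data" file, while the
 * (possibly collided) values are appended to the ".data" file. On close()
 * the per-task files are renamed into the final node directory and optional
 * checksum files are written next to them.
 */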
public class HadoopStoreWriter implements KeyValueWriter<BytesWritable, BytesWritable> {

private static final Logger logger = Logger.getLogger(HadoopStoreWriter.class);

private DataOutputStream indexFileStream = null;
private DataOutputStream valueFileStream = null;
private int position;
private String taskId = null;

private int nodeId = -1;
private int partitionId = -1;
private int chunkId = -1;
private int replicaType = -1;

private Path taskIndexFileName;
private Path taskValueFileName;

private JobConf conf;
private CheckSumType checkSumType;
private CheckSum checkSumDigestIndex;
private CheckSum checkSumDigestValue;

private String outputDir;

private FileSystem fs;

private int numChunks;
private Cluster cluster;
private StoreDefinition storeDef;
private boolean saveKeys;
private boolean reducerPerBucket;

public Cluster getCluster() {
checkNotNull(cluster);
return cluster;
}

public boolean getSaveKeys() {
return this.saveKeys;
}

public boolean getReducerPerBucket() {
return this.reducerPerBucket;
}

public StoreDefinition getStoreDef() {
checkNotNull(storeDef);
return storeDef;
}

public String getStoreName() {
checkNotNull(storeDef);
return storeDef.getName();
}

private final void checkNotNull(Object o) {
if(o == null)
throw new VoldemortException("Not configured yet!");
}

public int getNumChunks() {
return this.numChunks;
}

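/**
 * Pulls the store-build settings out of the JobConf: the cluster and store
 * definitions (cluster.xml / stores.xml), the number of chunks, the
 * save-keys and reducer-per-bucket flags and the checksum type, and then
 * opens the per-task ".index" and ".data" streams under the job's output
 * path.
 */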
@Override
public void conf(JobConf job) {

conf = job;
try {

this.cluster = new ClusterMapper().readCluster(new StringReader(conf.get("cluster.xml")));
List<StoreDefinition> storeDefs = new StoreDefinitionsMapper().readStoreList(new StringReader(conf.get("stores.xml")));
if(storeDefs.size() != 1)
throw new IllegalStateException("Expected to find exactly one store, but found "
+ storeDefs.size());
this.storeDef = storeDefs.get(0);

this.numChunks = conf.getInt("num.chunks", -1);
if(this.numChunks < 1)
throw new VoldemortException("num.chunks not specified in the job conf.");
this.saveKeys = conf.getBoolean("save.keys", false);
this.reducerPerBucket = conf.getBoolean("reducer.per.bucket", false);
this.conf = job;
this.position = 0;
this.outputDir = job.get("final.output.dir");
this.taskId = job.get("mapred.task.id");
this.checkSumType = CheckSum.fromString(job.get("checksum.type"));
this.checkSumDigestIndex = CheckSum.getInstance(checkSumType);
this.checkSumDigestValue = CheckSum.getInstance(checkSumType);

this.taskIndexFileName = new Path(FileOutputFormat.getOutputPath(job), getStoreName()
+ "."
+ this.taskId
+ ".index");
this.taskValueFileName = new Path(FileOutputFormat.getOutputPath(job), getStoreName()
+ "."
+ this.taskId
+ ".data");

if(this.fs == null)
this.fs = this.taskIndexFileName.getFileSystem(job);

this.indexFileStream = fs.create(this.taskIndexFileName);
this.valueFileStream = fs.create(this.taskValueFileName);

logger.info("Opening " + this.taskIndexFileName + " and " + this.taskValueFileName
+ " for writing.");

} catch(IOException e) {
throw new RuntimeException("Failed to open Input/OutputStream", e);
}

}

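/**
 * Writes one key and all of its values. The key bytes and the current data
 * file offset go into the index file; the values (prefixed with a tuple
 * count when keys are saved) are appended to the data file. The node id,
 * partition id and, optionally, replica type prefixed to each incoming
 * value are stripped off before the value bytes are persisted.
 */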
@Override
public void write(BytesWritable key, Iterator<BytesWritable> iterator, Reporter reporter)
throws IOException {

// Write key and position
this.indexFileStream.write(key.get(), 0, key.getSize());
this.indexFileStream.writeInt(this.position);

// Run key through checksum digest
if(this.checkSumDigestIndex != null) {
this.checkSumDigestIndex.update(key.get(), 0, key.getSize());
this.checkSumDigestIndex.update(this.position);
}

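// Buffer all values for this key in memory first, so the number of
// collided tuples is known before anything is appended to the data file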
short numTuples = 0;
ByteArrayOutputStream stream = new ByteArrayOutputStream();
DataOutputStream valueStream = new DataOutputStream(stream);

while(iterator.hasNext()) {
BytesWritable writable = iterator.next();
byte[] valueBytes = writable.get();
int offsetTillNow = 0;

// Read node Id
if(this.nodeId == -1)
this.nodeId = ByteUtils.readInt(valueBytes, offsetTillNow);
offsetTillNow += ByteUtils.SIZE_OF_INT;

// Read partition id
if(this.partitionId == -1)
this.partitionId = ByteUtils.readInt(valueBytes, offsetTillNow);
offsetTillNow += ByteUtils.SIZE_OF_INT;

// Read chunk id
if(this.chunkId == -1)
this.chunkId = ReadOnlyUtils.chunk(key.get(), getNumChunks());

// Read replica type
if(getSaveKeys()) {
if(this.replicaType == -1)
this.replicaType = (int) ByteUtils.readBytes(valueBytes,
offsetTillNow,
ByteUtils.SIZE_OF_BYTE);
offsetTillNow += ByteUtils.SIZE_OF_BYTE;
}

int valueLength = writable.getSize() - offsetTillNow;
if(getSaveKeys()) {
// The remaining bytes already contain ( key_length, value_length,
// key, value ), so copy them as-is
valueStream.write(valueBytes, offsetTillNow, valueLength);
} else {
// Write ( value_length, value )
valueStream.writeInt(valueLength);
valueStream.write(valueBytes, offsetTillNow, valueLength);
}

numTuples++;

// Multiple values for the same md5 sum mean a collision. When we are
// not saving keys, throw an exception: either the data itself has
// duplicate keys, there are trillions of keys, or someone is
// attempting something malicious. (Collisions are expected when keys
// are saved.)
if(!getSaveKeys() && numTuples > 1)
throw new VoldemortException("Duplicate keys detected for md5 sum "
+ ByteUtils.toHexString(ByteUtils.copy(key.get(),
0,
key.getSize())));

}

if(numTuples < 0) {
// Overflow
throw new VoldemortException("Found too many collisions: chunk " + chunkId
+ " has exceeded " + Short.MAX_VALUE + " collisions.");
} else if(numTuples > 1) {
// Update number of collisions + max keys per collision
reporter.incrCounter(CollisionCounter.NUM_COLLISIONS, 1);

long numCollisions = reporter.getCounter(CollisionCounter.MAX_COLLISIONS).getCounter();
if(numTuples > numCollisions) {
reporter.incrCounter(CollisionCounter.MAX_COLLISIONS, numTuples - numCollisions);
}
}

// Flush the value
valueStream.flush();
byte[] value = stream.toByteArray();

// Start writing to the data file now.
// First, if the save-keys flag is set, write the number of collided tuples
if(getSaveKeys()) {

this.valueFileStream.writeShort(numTuples);
this.position += ByteUtils.SIZE_OF_SHORT;

if(this.checkSumDigestValue != null) {
this.checkSumDigestValue.update(numTuples);
}
}

this.valueFileStream.write(value);
this.position += value.length;

if(this.checkSumDigestValue != null) {
this.checkSumDigestValue.update(value);
}

if(this.position < 0)
throw new VoldemortException("Chunk overflow exception: chunk " + chunkId
+ " has exceeded " + Integer.MAX_VALUE + " bytes.");
}

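/**
 * Closes the streams and moves the per-task files into their final
 * location, {@code node-<nodeId>/<partitionId>_[<replicaType>_]<chunkId>}
 * with ".index" and ".data" extensions, plus the matching ".checksum"
 * files when a checksum type is configured. If the reducer received no
 * data at all, no output is produced (see Issue 258 below).
 */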
@Override
public void close() throws IOException {

this.indexFileStream.close();
this.valueFileStream.close();

if(this.nodeId == -1 || this.chunkId == -1 || this.partitionId == -1) {
// Issue 258 - No data was read in the reduce phase, do not create
// any output
return;
}

// If the replica type was never read correctly, fail loudly
if(getSaveKeys() && this.replicaType == -1) {
throw new RuntimeException("Could not read the replica type correctly for node "
+ nodeId + " ( partition - " + this.partitionId + " )");
}

String fileNamePrefix = null;
if(getSaveKeys()) {
fileNamePrefix = Integer.toString(this.partitionId) + "_"
+ Integer.toString(this.replicaType) + "_"
+ Integer.toString(this.chunkId);
} else {
fileNamePrefix = Integer.toString(this.partitionId) + "_"
+ Integer.toString(this.chunkId);
}

// Initialize the node directory
Path nodeDir = new Path(this.outputDir, "node-" + this.nodeId);

// Create output directory, if it doesn't exist
FileSystem outputFs = nodeDir.getFileSystem(this.conf);
outputFs.mkdirs(nodeDir);

// Write the checksum and output files
if(this.checkSumType != CheckSumType.NONE) {

if(this.checkSumDigestIndex != null && this.checkSumDigestValue != null) {
Path checkSumIndexFile = new Path(nodeDir, fileNamePrefix + ".index.checksum");
Path checkSumValueFile = new Path(nodeDir, fileNamePrefix + ".data.checksum");

FSDataOutputStream output = outputFs.create(checkSumIndexFile);
output.write(this.checkSumDigestIndex.getCheckSum());
output.close();

output = outputFs.create(checkSumValueFile);
output.write(this.checkSumDigestValue.getCheckSum());
output.close();
} else {
throw new RuntimeException("Failed to open checksum digest for node " + nodeId
+ " ( partition - " + this.partitionId + ", chunk - "
+ chunkId + " )");
}
}

// Generate the final chunk files
Path indexFile = new Path(nodeDir, fileNamePrefix + ".index");
Path valueFile = new Path(nodeDir, fileNamePrefix + ".data");

logger.info("Moving " + this.taskIndexFileName + " to " + indexFile);
outputFs.rename(taskIndexFileName, indexFile);
logger.info("Moving " + this.taskValueFileName + " to " + valueFile);
outputFs.rename(this.taskValueFileName, valueFile);
}

}

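For reference, conf() above only succeeds if the driver has already populated the JobConf with the entries it reads. The sketch below shows one way that setup might look; the variable names, XML strings and paths are illustrative assumptions, and only the configuration keys themselves (cluster.xml, stores.xml, num.chunks, save.keys, reducer.per.bucket, checksum.type, final.output.dir and the output path) come from the code in this commit.

// Hypothetical driver-side setup, not part of this commit.
JobConf job = new JobConf();
job.set("cluster.xml", clusterXmlString);    // assumed to hold the cluster.xml contents
job.set("stores.xml", storesXmlString);      // assumed stores.xml with exactly one store definition
job.setInt("num.chunks", 2);                 // conf() rejects anything below 1
job.setBoolean("save.keys", true);
job.setBoolean("reducer.per.bucket", false);
job.set("checksum.type", "md5");             // any value understood by CheckSum.fromString()
job.set("final.output.dir", "/tmp/final-store-output");
FileOutputFormat.setOutputPath(job, new Path("/tmp/task-output"));
// "mapred.task.id" is filled in by the Hadoop framework at runtime.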