From 65e8bee4949583de0cefe4874af0e7cb2edd3e93 Mon Sep 17 00:00:00 2001
From: Abhinay Nagpal
Date: Fri, 7 Sep 2012 09:47:02 -0700
Subject: [PATCH] Added support for specifying the key and value field in the
 avro record
---
 .../readonly/disk/HadoopStoreWriter.java        | 321 ++
 .../disk/HadoopStoreWriterPerBucket.java        | 342 ++
 .../store/readonly/disk/KeyValueWriter.java     |  23 +
 .../readonly/mr/AvroStoreBuilderMapper.java     | 296 ++
 .../mr/AvroStoreBuilderPartitioner.java         | 154 +++
 .../readonly/mr/AvroStoreBuilderReducer.java    |  92 ++
 .../mr/AvroStoreBuilderReducerPerBucket.java    |  94 ++
 .../store/readonly/mr/IdentityJsonMapper.java   |  20 +
 .../readonly/mr/IdentityJsonReducer.java        |  22 +
 .../mr/VoldemortStoreBuilderMapper.java         |  73 ++
 .../mr/azkaban/AbstractHadoopJob.java           | 268 +++
 .../AbstractVoldemortBatchCopyJob.java          | 116 +++
 .../store/readonly/mr/azkaban/Job.java          |  59 ++
 .../azkaban/StoreBuilderTransformation.java     |  14 +
 .../azkaban/UndefinedPropertyException.java     |  11 +
 .../mr/azkaban/VoldemortBatchIndexJob.java      | 405 +++
 .../mr/azkaban/VoldemortBuildAndPushJob.java    | 792 ++++++++
 .../VoldemortMultiStoreBuildAndPushJob.java     | 834 +++++++++
 .../mr/azkaban/VoldemortRollbackJob.java        | 116 +++
 .../mr/azkaban/VoldemortStoreBuilderJob.java    | 450 ++++
 .../readonly/mr/azkaban/VoldemortSwapJob.java   | 200 ++++
 .../mr/azkaban/VoldemortSwapperUtils.java       |  72 ++
 .../mr/serialization/JsonConfigurable.java      |  76 ++
 .../JsonDeserializerComparator.java             | 118 +++
 .../readonly/mr/serialization/JsonMapper.java   |  58 ++
 .../mr/serialization/JsonOutputCollector.java   |  44 +
 .../mr/serialization/JsonReducer.java           |  89 ++
 .../JsonSequenceFileInputFormat.java            | 110 ++
 .../JsonSequenceFileOutputFormat.java           | 101 ++
 .../store/readonly/mr/utils/AvroUtils.java      | 105 ++
 .../store/readonly/mr/utils/EmailMessage.java   | 186 ++++
 .../store/readonly/mr/utils/HadoopUtils.java    | 985 ++++++++++++
 .../store/readonly/mr/utils/JsonSchema.java     |  49 +
 .../readonly/mr/utils/VoldemortUtils.java       | 131 +++
 lib/azkaban-common-0.05.jar                     | Bin 0 -> 58108 bytes
 lib/joda-time-1.6.jar                           | Bin 0 -> 534827 bytes
 lib/mail-1.4.1.jar                              | Bin 0 -> 371264 bytes
 37 files changed, 6826 insertions(+)
 create mode 100644 contrib/hadoop-store-builder/src/java/voldemort/store/readonly/disk/HadoopStoreWriter.java
 create mode 100644 contrib/hadoop-store-builder/src/java/voldemort/store/readonly/disk/HadoopStoreWriterPerBucket.java
 create mode 100644 contrib/hadoop-store-builder/src/java/voldemort/store/readonly/disk/KeyValueWriter.java
 create mode 100644 contrib/hadoop-store-builder/src/java/voldemort/store/readonly/mr/AvroStoreBuilderMapper.java
 create mode 100644 contrib/hadoop-store-builder/src/java/voldemort/store/readonly/mr/AvroStoreBuilderPartitioner.java
 create mode 100644 contrib/hadoop-store-builder/src/java/voldemort/store/readonly/mr/AvroStoreBuilderReducer.java
 create mode 100644 contrib/hadoop-store-builder/src/java/voldemort/store/readonly/mr/AvroStoreBuilderReducerPerBucket.java
 create mode 100644 contrib/hadoop-store-builder/src/java/voldemort/store/readonly/mr/IdentityJsonMapper.java
 create mode 100644 contrib/hadoop-store-builder/src/java/voldemort/store/readonly/mr/IdentityJsonReducer.java
 create mode 100644 contrib/hadoop-store-builder/src/java/voldemort/store/readonly/mr/VoldemortStoreBuilderMapper.java
 create mode 100644 contrib/hadoop-store-builder/src/java/voldemort/store/readonly/mr/azkaban/AbstractHadoopJob.java
 create mode 100644
contrib/hadoop-store-builder/src/java/voldemort/store/readonly/mr/azkaban/AbstractVoldemortBatchCopyJob.java create mode 100644 contrib/hadoop-store-builder/src/java/voldemort/store/readonly/mr/azkaban/Job.java create mode 100644 contrib/hadoop-store-builder/src/java/voldemort/store/readonly/mr/azkaban/StoreBuilderTransformation.java create mode 100644 contrib/hadoop-store-builder/src/java/voldemort/store/readonly/mr/azkaban/UndefinedPropertyException.java create mode 100644 contrib/hadoop-store-builder/src/java/voldemort/store/readonly/mr/azkaban/VoldemortBatchIndexJob.java create mode 100644 contrib/hadoop-store-builder/src/java/voldemort/store/readonly/mr/azkaban/VoldemortBuildAndPushJob.java create mode 100644 contrib/hadoop-store-builder/src/java/voldemort/store/readonly/mr/azkaban/VoldemortMultiStoreBuildAndPushJob.java create mode 100644 contrib/hadoop-store-builder/src/java/voldemort/store/readonly/mr/azkaban/VoldemortRollbackJob.java create mode 100644 contrib/hadoop-store-builder/src/java/voldemort/store/readonly/mr/azkaban/VoldemortStoreBuilderJob.java create mode 100644 contrib/hadoop-store-builder/src/java/voldemort/store/readonly/mr/azkaban/VoldemortSwapJob.java create mode 100644 contrib/hadoop-store-builder/src/java/voldemort/store/readonly/mr/azkaban/VoldemortSwapperUtils.java create mode 100644 contrib/hadoop-store-builder/src/java/voldemort/store/readonly/mr/serialization/JsonConfigurable.java create mode 100644 contrib/hadoop-store-builder/src/java/voldemort/store/readonly/mr/serialization/JsonDeserializerComparator.java create mode 100644 contrib/hadoop-store-builder/src/java/voldemort/store/readonly/mr/serialization/JsonMapper.java create mode 100644 contrib/hadoop-store-builder/src/java/voldemort/store/readonly/mr/serialization/JsonOutputCollector.java create mode 100644 contrib/hadoop-store-builder/src/java/voldemort/store/readonly/mr/serialization/JsonReducer.java create mode 100644 contrib/hadoop-store-builder/src/java/voldemort/store/readonly/mr/serialization/JsonSequenceFileInputFormat.java create mode 100644 contrib/hadoop-store-builder/src/java/voldemort/store/readonly/mr/serialization/JsonSequenceFileOutputFormat.java create mode 100644 contrib/hadoop-store-builder/src/java/voldemort/store/readonly/mr/utils/AvroUtils.java create mode 100644 contrib/hadoop-store-builder/src/java/voldemort/store/readonly/mr/utils/EmailMessage.java create mode 100644 contrib/hadoop-store-builder/src/java/voldemort/store/readonly/mr/utils/HadoopUtils.java create mode 100644 contrib/hadoop-store-builder/src/java/voldemort/store/readonly/mr/utils/JsonSchema.java create mode 100644 contrib/hadoop-store-builder/src/java/voldemort/store/readonly/mr/utils/VoldemortUtils.java create mode 100644 lib/azkaban-common-0.05.jar create mode 100644 lib/joda-time-1.6.jar create mode 100644 lib/mail-1.4.1.jar diff --git a/contrib/hadoop-store-builder/src/java/voldemort/store/readonly/disk/HadoopStoreWriter.java b/contrib/hadoop-store-builder/src/java/voldemort/store/readonly/disk/HadoopStoreWriter.java new file mode 100644 index 0000000000..b0cfb63479 --- /dev/null +++ b/contrib/hadoop-store-builder/src/java/voldemort/store/readonly/disk/HadoopStoreWriter.java @@ -0,0 +1,321 @@ +package voldemort.store.readonly.disk; + +import java.io.ByteArrayOutputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.io.StringReader; +import java.util.Iterator; +import java.util.List; + +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileSystem; 
+import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.BytesWritable; +import org.apache.hadoop.mapred.FileOutputFormat; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.Reporter; +import org.apache.log4j.Logger; + +import voldemort.VoldemortException; +import voldemort.cluster.Cluster; +import voldemort.store.StoreDefinition; +import voldemort.store.readonly.ReadOnlyUtils; +import voldemort.store.readonly.checksum.CheckSum; +import voldemort.store.readonly.checksum.CheckSum.CheckSumType; +import voldemort.utils.ByteUtils; +import voldemort.xml.ClusterMapper; +import voldemort.xml.StoreDefinitionsMapper; + +public class HadoopStoreWriter implements KeyValueWriter { + + private static final Logger logger = Logger.getLogger(HadoopStoreWriter.class); + + private DataOutputStream indexFileStream = null; + private DataOutputStream valueFileStream = null; + private int position; + private String taskId = null; + + private int nodeId = -1; + private int partitionId = -1; + private int chunkId = -1; + private int replicaType = -1; + + private Path taskIndexFileName; + private Path taskValueFileName; + + private JobConf conf; + private CheckSumType checkSumType; + private CheckSum checkSumDigestIndex; + private CheckSum checkSumDigestValue; + + private String outputDir; + + private FileSystem fs; + + private int numChunks; + private Cluster cluster; + private StoreDefinition storeDef; + private boolean saveKeys; + private boolean reducerPerBucket; + + public Cluster getCluster() { + checkNotNull(cluster); + return cluster; + } + + public boolean getSaveKeys() { + return this.saveKeys; + } + + public boolean getReducerPerBucket() { + return this.reducerPerBucket; + } + + public StoreDefinition getStoreDef() { + checkNotNull(storeDef); + return storeDef; + } + + public String getStoreName() { + checkNotNull(storeDef); + return storeDef.getName(); + } + + private final void checkNotNull(Object o) { + if(o == null) + throw new VoldemortException("Not configured yet!"); + } + + public int getNumChunks() { + return this.numChunks; + } + + @Override + public void conf(JobConf job) { + + conf = job; + try { + + this.cluster = new ClusterMapper().readCluster(new StringReader(conf.get("cluster.xml"))); + List storeDefs = new StoreDefinitionsMapper().readStoreList(new StringReader(conf.get("stores.xml"))); + if(storeDefs.size() != 1) + throw new IllegalStateException("Expected to find only a single store, but found multiple!"); + this.storeDef = storeDefs.get(0); + + this.numChunks = conf.getInt("num.chunks", -1); + if(this.numChunks < 1) + throw new VoldemortException("num.chunks not specified in the job conf."); + this.saveKeys = conf.getBoolean("save.keys", false); + this.reducerPerBucket = conf.getBoolean("reducer.per.bucket", false); + this.conf = job; + this.position = 0; + this.outputDir = job.get("final.output.dir"); + this.taskId = job.get("mapred.task.id"); + this.checkSumType = CheckSum.fromString(job.get("checksum.type")); + this.checkSumDigestIndex = CheckSum.getInstance(checkSumType); + this.checkSumDigestValue = CheckSum.getInstance(checkSumType); + + this.taskIndexFileName = new Path(FileOutputFormat.getOutputPath(job), getStoreName() + + "." + + this.taskId + + ".index"); + this.taskValueFileName = new Path(FileOutputFormat.getOutputPath(job), getStoreName() + + "." 
+ + this.taskId + + ".data"); + + if(this.fs == null) + this.fs = this.taskIndexFileName.getFileSystem(job); + + this.indexFileStream = fs.create(this.taskIndexFileName); + this.valueFileStream = fs.create(this.taskValueFileName); + + logger.info("Opening " + this.taskIndexFileName + " and " + this.taskValueFileName + + " for writing."); + + } catch(IOException e) { + throw new RuntimeException("Failed to open Input/OutputStream", e); + } + + } + + @Override + public void write(BytesWritable key, Iterator iterator, Reporter reporter) + throws IOException { + + // Write key and position + this.indexFileStream.write(key.get(), 0, key.getSize()); + this.indexFileStream.writeInt(this.position); + + // Run key through checksum digest + if(this.checkSumDigestIndex != null) { + this.checkSumDigestIndex.update(key.get(), 0, key.getSize()); + this.checkSumDigestIndex.update(this.position); + } + + short numTuples = 0; + ByteArrayOutputStream stream = new ByteArrayOutputStream(); + DataOutputStream valueStream = new DataOutputStream(stream); + + while(iterator.hasNext()) { + BytesWritable writable = iterator.next(); + byte[] valueBytes = writable.get(); + int offsetTillNow = 0; + + // Read node Id + if(this.nodeId == -1) + this.nodeId = ByteUtils.readInt(valueBytes, offsetTillNow); + offsetTillNow += ByteUtils.SIZE_OF_INT; + + // Read partition id + if(this.partitionId == -1) + this.partitionId = ByteUtils.readInt(valueBytes, offsetTillNow); + offsetTillNow += ByteUtils.SIZE_OF_INT; + + // Read chunk id + if(this.chunkId == -1) + this.chunkId = ReadOnlyUtils.chunk(key.get(), getNumChunks()); + + // Read replica type + if(getSaveKeys()) { + if(this.replicaType == -1) + this.replicaType = (int) ByteUtils.readBytes(valueBytes, + offsetTillNow, + ByteUtils.SIZE_OF_BYTE); + offsetTillNow += ByteUtils.SIZE_OF_BYTE; + } + + int valueLength = writable.getSize() - offsetTillNow; + if(getSaveKeys()) { + // Write ( key_length, value_length, key, + // value ) + valueStream.write(valueBytes, offsetTillNow, valueLength); + } else { + // Write (value_length + value) + valueStream.writeInt(valueLength); + valueStream.write(valueBytes, offsetTillNow, valueLength); + } + + numTuples++; + + // If we have multiple values for this md5 that is a collision, + // throw an exception--either the data itself has duplicates, there + // are trillions of keys, or someone is attempting something + // malicious ( We obviously expect collisions when we save keys ) + if(!getSaveKeys() && numTuples > 1) + throw new VoldemortException("Duplicate keys detected for md5 sum " + + ByteUtils.toHexString(ByteUtils.copy(key.get(), + 0, + key.getSize()))); + + } + + if(numTuples < 0) { + // Overflow + throw new VoldemortException("Found too many collisions: chunk " + chunkId + + " has exceeded " + Short.MAX_VALUE + " collisions."); + } else if(numTuples > 1) { + // Update number of collisions + max keys per collision + reporter.incrCounter(CollisionCounter.NUM_COLLISIONS, 1); + + long numCollisions = reporter.getCounter(CollisionCounter.MAX_COLLISIONS).getCounter(); + if(numTuples > numCollisions) { + reporter.incrCounter(CollisionCounter.MAX_COLLISIONS, numTuples - numCollisions); + } + } + + // Flush the value + valueStream.flush(); + byte[] value = stream.toByteArray(); + + // Start writing to file now + // First, if save keys flag set the number of keys + if(getSaveKeys()) { + + this.valueFileStream.writeShort(numTuples); + this.position += ByteUtils.SIZE_OF_SHORT; + + if(this.checkSumDigestValue != null) { + 
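+                // Keep the .data checksum in step with the bytes written to the data
+                // file: the 2-byte tuple count written just above, followed by the
+                // concatenated tuples appended below.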
this.checkSumDigestValue.update(numTuples); + } + } + + this.valueFileStream.write(value); + this.position += value.length; + + if(this.checkSumDigestValue != null) { + this.checkSumDigestValue.update(value); + } + + if(this.position < 0) + throw new VoldemortException("Chunk overflow exception: chunk " + chunkId + + " has exceeded " + Integer.MAX_VALUE + " bytes."); + } + + @Override + public void close() throws IOException { + + this.indexFileStream.close(); + this.valueFileStream.close(); + + if(this.nodeId == -1 || this.chunkId == -1 || this.partitionId == -1) { + // Issue 258 - No data was read in the reduce phase, do not create + // any output + return; + } + + // If the replica type read was not valid, shout out + if(getSaveKeys() && this.replicaType == -1) { + throw new RuntimeException("Could not read the replica type correctly for node " + + nodeId + " ( partition - " + this.partitionId + " )"); + } + + String fileNamePrefix = null; + if(getSaveKeys()) { + fileNamePrefix = new String(Integer.toString(this.partitionId) + "_" + + Integer.toString(this.replicaType) + "_" + + Integer.toString(this.chunkId)); + } else { + fileNamePrefix = new String(Integer.toString(this.partitionId) + "_" + + Integer.toString(this.chunkId)); + } + + // Initialize the node directory + Path nodeDir = new Path(this.outputDir, "node-" + this.nodeId); + + // Create output directory, if it doesn't exist + FileSystem outputFs = nodeDir.getFileSystem(this.conf); + outputFs.mkdirs(nodeDir); + + // Write the checksum and output files + if(this.checkSumType != CheckSumType.NONE) { + + if(this.checkSumDigestIndex != null && this.checkSumDigestValue != null) { + Path checkSumIndexFile = new Path(nodeDir, fileNamePrefix + ".index.checksum"); + Path checkSumValueFile = new Path(nodeDir, fileNamePrefix + ".data.checksum"); + + FSDataOutputStream output = outputFs.create(checkSumIndexFile); + output.write(this.checkSumDigestIndex.getCheckSum()); + output.close(); + + output = outputFs.create(checkSumValueFile); + output.write(this.checkSumDigestValue.getCheckSum()); + output.close(); + } else { + throw new RuntimeException("Failed to open checksum digest for node " + nodeId + + " ( partition - " + this.partitionId + ", chunk - " + + chunkId + " )"); + } + } + + // Generate the final chunk files + Path indexFile = new Path(nodeDir, fileNamePrefix + ".index"); + Path valueFile = new Path(nodeDir, fileNamePrefix + ".data"); + + logger.info("Moving " + this.taskIndexFileName + " to " + indexFile); + outputFs.rename(taskIndexFileName, indexFile); + logger.info("Moving " + this.taskValueFileName + " to " + valueFile); + outputFs.rename(this.taskValueFileName, valueFile); + } + +} diff --git a/contrib/hadoop-store-builder/src/java/voldemort/store/readonly/disk/HadoopStoreWriterPerBucket.java b/contrib/hadoop-store-builder/src/java/voldemort/store/readonly/disk/HadoopStoreWriterPerBucket.java new file mode 100644 index 0000000000..3df92b85ba --- /dev/null +++ b/contrib/hadoop-store-builder/src/java/voldemort/store/readonly/disk/HadoopStoreWriterPerBucket.java @@ -0,0 +1,342 @@ +package voldemort.store.readonly.disk; + +import java.io.ByteArrayOutputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.io.StringReader; +import java.util.Iterator; +import java.util.List; + +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.BytesWritable; +import org.apache.hadoop.mapred.FileOutputFormat; +import 
org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.Reporter; +import org.apache.log4j.Logger; + +import voldemort.VoldemortException; +import voldemort.cluster.Cluster; +import voldemort.store.StoreDefinition; +import voldemort.store.readonly.ReadOnlyUtils; +import voldemort.store.readonly.checksum.CheckSum; +import voldemort.store.readonly.checksum.CheckSum.CheckSumType; +import voldemort.utils.ByteUtils; +import voldemort.xml.ClusterMapper; +import voldemort.xml.StoreDefinitionsMapper; + +public class HadoopStoreWriterPerBucket implements KeyValueWriter { + + private static final Logger logger = Logger.getLogger(HadoopStoreWriterPerBucket.class); + + private DataOutputStream[] indexFileStream = null; + private DataOutputStream[] valueFileStream = null; + private int[] position; + private String taskId = null; + + private int nodeId = -1; + private int partitionId = -1; + private int replicaType = -1; + + private Path[] taskIndexFileName; + private Path[] taskValueFileName; + + private JobConf conf; + private CheckSumType checkSumType; + private CheckSum[] checkSumDigestIndex; + private CheckSum[] checkSumDigestValue; + + private String outputDir; + + private FileSystem fs; + + @Override + public void conf(JobConf job) { + + JobConf conf = job; + try { + + this.cluster = new ClusterMapper().readCluster(new StringReader(conf.get("cluster.xml"))); + List storeDefs = new StoreDefinitionsMapper().readStoreList(new StringReader(conf.get("stores.xml"))); + if(storeDefs.size() != 1) + throw new IllegalStateException("Expected to find only a single store, but found multiple!"); + this.storeDef = storeDefs.get(0); + + this.numChunks = conf.getInt("num.chunks", -1); + if(this.numChunks < 1) + throw new VoldemortException("num.chunks not specified in the job conf."); + + this.saveKeys = conf.getBoolean("save.keys", false); + this.reducerPerBucket = conf.getBoolean("reducer.per.bucket", false); + this.conf = job; + this.outputDir = job.get("final.output.dir"); + this.taskId = job.get("mapred.task.id"); + this.checkSumType = CheckSum.fromString(job.get("checksum.type")); + + this.checkSumDigestIndex = new CheckSum[getNumChunks()]; + this.checkSumDigestValue = new CheckSum[getNumChunks()]; + this.position = new int[getNumChunks()]; + this.taskIndexFileName = new Path[getNumChunks()]; + this.taskValueFileName = new Path[getNumChunks()]; + this.indexFileStream = new DataOutputStream[getNumChunks()]; + this.valueFileStream = new DataOutputStream[getNumChunks()]; + + for(int chunkId = 0; chunkId < getNumChunks(); chunkId++) { + + this.checkSumDigestIndex[chunkId] = CheckSum.getInstance(checkSumType); + this.checkSumDigestValue[chunkId] = CheckSum.getInstance(checkSumType); + this.position[chunkId] = 0; + + this.taskIndexFileName[chunkId] = new Path(FileOutputFormat.getOutputPath(job), + getStoreName() + "." + + Integer.toString(chunkId) + + "_" + this.taskId + ".index"); + this.taskValueFileName[chunkId] = new Path(FileOutputFormat.getOutputPath(job), + getStoreName() + "." 
+ + Integer.toString(chunkId) + + "_" + this.taskId + ".data"); + + if(this.fs == null) + this.fs = this.taskIndexFileName[chunkId].getFileSystem(job); + + this.indexFileStream[chunkId] = fs.create(this.taskIndexFileName[chunkId]); + this.valueFileStream[chunkId] = fs.create(this.taskValueFileName[chunkId]); + + logger.info("Opening " + this.taskIndexFileName[chunkId] + " and " + + this.taskValueFileName[chunkId] + " for writing."); + } + + } catch(IOException e) { + // throw new RuntimeException("Failed to open Input/OutputStream", + // e); + e.printStackTrace(); + } + + } + + @Override + public void write(BytesWritable key, Iterator iterator, Reporter reporter) + throws IOException { + + // Read chunk id + int chunkId = ReadOnlyUtils.chunk(key.get(), getNumChunks()); + + // Write key and position + this.indexFileStream[chunkId].write(key.get(), 0, key.getSize()); + this.indexFileStream[chunkId].writeInt(this.position[chunkId]); + + // Run key through checksum digest + if(this.checkSumDigestIndex[chunkId] != null) { + this.checkSumDigestIndex[chunkId].update(key.get(), 0, key.getSize()); + this.checkSumDigestIndex[chunkId].update(this.position[chunkId]); + } + + short numTuples = 0; + ByteArrayOutputStream stream = new ByteArrayOutputStream(); + DataOutputStream valueStream = new DataOutputStream(stream); + + while(iterator.hasNext()) { + BytesWritable writable = iterator.next(); + byte[] valueBytes = writable.get(); + int offsetTillNow = 0; + + // Read node Id + if(this.nodeId == -1) + this.nodeId = ByteUtils.readInt(valueBytes, offsetTillNow); + offsetTillNow += ByteUtils.SIZE_OF_INT; + + // Read partition id + if(this.partitionId == -1) + this.partitionId = ByteUtils.readInt(valueBytes, offsetTillNow); + offsetTillNow += ByteUtils.SIZE_OF_INT; + + // Read replica type + if(getSaveKeys()) { + if(this.replicaType == -1) + this.replicaType = (int) ByteUtils.readBytes(valueBytes, + offsetTillNow, + ByteUtils.SIZE_OF_BYTE); + offsetTillNow += ByteUtils.SIZE_OF_BYTE; + } + + int valueLength = writable.getSize() - offsetTillNow; + if(getSaveKeys()) { + // Write ( key_length, value_length, key, + // value ) + valueStream.write(valueBytes, offsetTillNow, valueLength); + } else { + // Write (value_length + value) + valueStream.writeInt(valueLength); + valueStream.write(valueBytes, offsetTillNow, valueLength); + } + + numTuples++; + + // If we have multiple values for this md5 that is a collision, + // throw an exception--either the data itself has duplicates, there + // are trillions of keys, or someone is attempting something + // malicious ( We obviously expect collisions when we save keys ) + if(!getSaveKeys() && numTuples > 1) + throw new VoldemortException("Duplicate keys detected for md5 sum " + + ByteUtils.toHexString(ByteUtils.copy(key.get(), + 0, + key.getSize()))); + + } + + if(numTuples < 0) { + // Overflow + throw new VoldemortException("Found too many collisions: chunk " + chunkId + + " has exceeded " + Short.MAX_VALUE + " collisions."); + } else if(numTuples > 1) { + // Update number of collisions + max keys per collision + reporter.incrCounter(CollisionCounter.NUM_COLLISIONS, 1); + + long numCollisions = reporter.getCounter(CollisionCounter.MAX_COLLISIONS).getCounter(); + if(numTuples > numCollisions) { + reporter.incrCounter(CollisionCounter.MAX_COLLISIONS, numTuples - numCollisions); + } + } + + // Flush the value + valueStream.flush(); + byte[] value = stream.toByteArray(); + + // Start writing to file now + // First, if save keys flag set the number of keys + if(getSaveKeys()) 
{ + + this.valueFileStream[chunkId].writeShort(numTuples); + this.position[chunkId] += ByteUtils.SIZE_OF_SHORT; + + if(this.checkSumDigestValue[chunkId] != null) { + this.checkSumDigestValue[chunkId].update(numTuples); + } + } + + this.valueFileStream[chunkId].write(value); + this.position[chunkId] += value.length; + + if(this.checkSumDigestValue[chunkId] != null) { + this.checkSumDigestValue[chunkId].update(value); + } + + if(this.position[chunkId] < 0) + throw new VoldemortException("Chunk overflow exception: chunk " + chunkId + + " has exceeded " + Integer.MAX_VALUE + " bytes."); + + } + + @Override + public void close() throws IOException { + + for(int chunkId = 0; chunkId < getNumChunks(); chunkId++) { + this.indexFileStream[chunkId].close(); + this.valueFileStream[chunkId].close(); + } + + if(this.nodeId == -1 || this.partitionId == -1) { + // Issue 258 - No data was read in the reduce phase, do not create + // any output + return; + } + + // If the replica type read was not valid, shout out + if(getSaveKeys() && this.replicaType == -1) { + throw new RuntimeException("Could not read the replica type correctly for node " + + nodeId + " ( partition - " + this.partitionId + " )"); + } + + String fileNamePrefix = null; + if(getSaveKeys()) { + fileNamePrefix = new String(Integer.toString(this.partitionId) + "_" + + Integer.toString(this.replicaType) + "_"); + } else { + fileNamePrefix = new String(Integer.toString(this.partitionId) + "_"); + } + + // Initialize the node directory + Path nodeDir = new Path(this.outputDir, "node-" + this.nodeId); + + // Create output directory, if it doesn't exist + FileSystem outputFs = nodeDir.getFileSystem(this.conf); + outputFs.mkdirs(nodeDir); + + // Write the checksum and output files + for(int chunkId = 0; chunkId < getNumChunks(); chunkId++) { + + String chunkFileName = fileNamePrefix + Integer.toString(chunkId); + if(this.checkSumType != CheckSumType.NONE) { + + if(this.checkSumDigestIndex[chunkId] != null + && this.checkSumDigestValue[chunkId] != null) { + Path checkSumIndexFile = new Path(nodeDir, chunkFileName + ".index.checksum"); + Path checkSumValueFile = new Path(nodeDir, chunkFileName + ".data.checksum"); + + FSDataOutputStream output = outputFs.create(checkSumIndexFile); + output.write(this.checkSumDigestIndex[chunkId].getCheckSum()); + output.close(); + + output = outputFs.create(checkSumValueFile); + output.write(this.checkSumDigestValue[chunkId].getCheckSum()); + output.close(); + } else { + throw new RuntimeException("Failed to open checksum digest for node " + nodeId + + " ( partition - " + this.partitionId + + ", chunk - " + chunkId + " )"); + } + } + + // Generate the final chunk files + Path indexFile = new Path(nodeDir, chunkFileName + ".index"); + Path valueFile = new Path(nodeDir, chunkFileName + ".data"); + + logger.info("Moving " + this.taskIndexFileName[chunkId] + " to " + indexFile); + fs.rename(taskIndexFileName[chunkId], indexFile); + logger.info("Moving " + this.taskValueFileName[chunkId] + " to " + valueFile); + fs.rename(this.taskValueFileName[chunkId], valueFile); + + } + + } + + private int numChunks; + private Cluster cluster; + private StoreDefinition storeDef; + private boolean saveKeys; + private boolean reducerPerBucket; + + public Cluster getCluster() { + checkNotNull(cluster); + return cluster; + } + + public boolean getSaveKeys() { + return this.saveKeys; + } + + public boolean getReducerPerBucket() { + return this.reducerPerBucket; + } + + public StoreDefinition getStoreDef() { + checkNotNull(storeDef); + 
return storeDef; + } + + public String getStoreName() { + checkNotNull(storeDef); + return storeDef.getName(); + } + + private final void checkNotNull(Object o) { + if(o == null) + throw new VoldemortException("Not configured yet!"); + } + + public int getNumChunks() { + return this.numChunks; + } + +} diff --git a/contrib/hadoop-store-builder/src/java/voldemort/store/readonly/disk/KeyValueWriter.java b/contrib/hadoop-store-builder/src/java/voldemort/store/readonly/disk/KeyValueWriter.java new file mode 100644 index 0000000000..ddc11c69e2 --- /dev/null +++ b/contrib/hadoop-store-builder/src/java/voldemort/store/readonly/disk/KeyValueWriter.java @@ -0,0 +1,23 @@ +package voldemort.store.readonly.disk; + +import java.io.IOException; +import java.util.Iterator; + +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.Reporter; + +public interface KeyValueWriter { + + public static enum CollisionCounter { + + NUM_COLLISIONS, + MAX_COLLISIONS; + } + + public void conf(JobConf job); + + public void write(K key, Iterator iterator, Reporter reporter) throws IOException; + + public void close() throws IOException; + +} diff --git a/contrib/hadoop-store-builder/src/java/voldemort/store/readonly/mr/AvroStoreBuilderMapper.java b/contrib/hadoop-store-builder/src/java/voldemort/store/readonly/mr/AvroStoreBuilderMapper.java new file mode 100644 index 0000000000..05b14179e5 --- /dev/null +++ b/contrib/hadoop-store-builder/src/java/voldemort/store/readonly/mr/AvroStoreBuilderMapper.java @@ -0,0 +1,296 @@ +package voldemort.store.readonly.mr; + +import java.io.IOException; +import java.io.StringReader; +import java.nio.ByteBuffer; +import java.security.MessageDigest; +import java.util.List; + +import org.apache.avro.generic.GenericData; +import org.apache.avro.mapred.AvroCollector; +import org.apache.avro.mapred.AvroMapper; +import org.apache.avro.mapred.Pair; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.io.BytesWritable; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.JobConfigurable; +import org.apache.hadoop.mapred.Reporter; + +import voldemort.VoldemortException; +import voldemort.cluster.Cluster; +import voldemort.cluster.Node; +import voldemort.routing.ConsistentRoutingStrategy; +import voldemort.serialization.DefaultSerializerFactory; +import voldemort.serialization.SerializerDefinition; +import voldemort.serialization.SerializerFactory; +import voldemort.serialization.avro.AvroGenericSerializer; +import voldemort.store.StoreDefinition; +import voldemort.store.compress.CompressionStrategy; +import voldemort.store.compress.CompressionStrategyFactory; +import voldemort.store.readonly.mr.azkaban.StoreBuilderTransformation; +import voldemort.store.readonly.mr.utils.HadoopUtils; +import voldemort.utils.ByteUtils; +import voldemort.xml.ClusterMapper; +import voldemort.xml.StoreDefinitionsMapper; +import azkaban.common.utils.Props; +import azkaban.common.utils.Utils; + +public class AvroStoreBuilderMapper extends + AvroMapper> implements JobConfigurable { + + protected MessageDigest md5er; + protected ConsistentRoutingStrategy routingStrategy; + protected AvroGenericSerializer keySerializer; + protected AvroGenericSerializer valueSerializer; + + private String keySchema; + private String valSchema; + + private String keyField; + private String valField; + + private String _keySelection; + private String _valSelection; + + private StoreBuilderTransformation _keyTrans; + private StoreBuilderTransformation _valTrans; + + private 
CompressionStrategy valueCompressor; + private CompressionStrategy keyCompressor; + private SerializerDefinition keySerializerDefinition; + private SerializerDefinition valueSerializerDefinition; + + // Path path = new Path(fileName); + FSDataOutputStream outputStream; + + /** + * Create the voldemort key and value from the input key and value and map + * it out for each of the responsible voldemort nodes + * + * The output key is the md5 of the serialized key returned by makeKey(). + * The output value is the node_id & partition_id of the responsible node + * followed by serialized value returned by makeValue() OR if we have + * setKeys flag on the serialized key and serialized value + */ + @Override + public void map(GenericData.Record record, + AvroCollector> collector, + Reporter reporter) throws IOException { + + byte[] keyBytes = keySerializer.toBytes(record.get(keyField)); + byte[] valBytes = valueSerializer.toBytes(record.get(valField)); + + // Compress key and values if required + if(keySerializerDefinition.hasCompression()) { + keyBytes = keyCompressor.deflate(keyBytes); + } + + if(valueSerializerDefinition.hasCompression()) { + valBytes = valueCompressor.deflate(valBytes); + } + + // Get the output byte arrays ready to populate + byte[] outputValue; + BytesWritable outputKey; + + // Leave initial offset for (a) node id (b) partition id + // since they are written later + int offsetTillNow = 2 * ByteUtils.SIZE_OF_INT; + + if(getSaveKeys()) { + + // In order - 4 ( for node id ) + 4 ( partition id ) + 1 ( + // replica + // type - primary | secondary | tertiary... ] + 4 ( key size ) + // size ) + 4 ( value size ) + key + value + outputValue = new byte[valBytes.length + keyBytes.length + ByteUtils.SIZE_OF_BYTE + 4 + * ByteUtils.SIZE_OF_INT]; + + // Write key length - leave byte for replica type + offsetTillNow += ByteUtils.SIZE_OF_BYTE; + ByteUtils.writeInt(outputValue, keyBytes.length, offsetTillNow); + + // Write value length + offsetTillNow += ByteUtils.SIZE_OF_INT; + ByteUtils.writeInt(outputValue, valBytes.length, offsetTillNow); + + // Write key + offsetTillNow += ByteUtils.SIZE_OF_INT; + System.arraycopy(keyBytes, 0, outputValue, offsetTillNow, keyBytes.length); + + // Write value + offsetTillNow += keyBytes.length; + System.arraycopy(valBytes, 0, outputValue, offsetTillNow, valBytes.length); + + // Generate MR key - upper 8 bytes of 16 byte md5 + outputKey = new BytesWritable(ByteUtils.copy(md5er.digest(keyBytes), + 0, + 2 * ByteUtils.SIZE_OF_INT)); + + } else { + + // In order - 4 ( for node id ) + 4 ( partition id ) + value + outputValue = new byte[valBytes.length + 2 * ByteUtils.SIZE_OF_INT]; + + // Write value + System.arraycopy(valBytes, 0, outputValue, offsetTillNow, valBytes.length); + + // Generate MR key - 16 byte md5 + outputKey = new BytesWritable(md5er.digest(keyBytes)); + + } + + // Generate partition and node list this key is destined for + List partitionList = routingStrategy.getPartitionList(keyBytes); + Node[] partitionToNode = routingStrategy.getPartitionToNode(); + + for(int replicaType = 0; replicaType < partitionList.size(); replicaType++) { + + // Node id + ByteUtils.writeInt(outputValue, + partitionToNode[partitionList.get(replicaType)].getId(), + 0); + + if(getSaveKeys()) { + // Primary partition id + ByteUtils.writeInt(outputValue, partitionList.get(0), ByteUtils.SIZE_OF_INT); + + // Replica type + ByteUtils.writeBytes(outputValue, + replicaType, + 2 * ByteUtils.SIZE_OF_INT, + ByteUtils.SIZE_OF_BYTE); + } else { + // Partition id + 
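+                // Resulting output-value layout without save.keys:
+                //   [node id (4)] [partition id (4)] [serialized value]
+                // and with save.keys (branch above):
+                //   [node id (4)] [primary partition id (4)] [replica type (1)]
+                //   [key length (4)] [value length (4)] [key bytes] [value bytes]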
ByteUtils.writeInt(outputValue, + partitionList.get(replicaType), + ByteUtils.SIZE_OF_INT); + } + BytesWritable outputVal = new BytesWritable(outputValue); + + // System.out.println("collect length (K/V): "+ + // outputKey.getLength()+ " , " + outputVal.getLength()); + ByteBuffer keyBuffer = null, valueBuffer = null; + + byte[] md5KeyBytes = outputKey.getBytes(); + keyBuffer = ByteBuffer.allocate(md5KeyBytes.length); + keyBuffer.put(md5KeyBytes); + keyBuffer.rewind(); + + valueBuffer = ByteBuffer.allocate(outputValue.length); + valueBuffer.put(outputValue); + valueBuffer.rewind(); + + Pair p = new Pair(keyBuffer, + valueBuffer); + + collector.collect(p); + } + md5er.reset(); + } + + @Override + public void configure(JobConf conf) { + + super.setConf(conf); + // from parent code + + md5er = ByteUtils.getDigest("md5"); + + this.cluster = new ClusterMapper().readCluster(new StringReader(conf.get("cluster.xml"))); + List storeDefs = new StoreDefinitionsMapper().readStoreList(new StringReader(conf.get("stores.xml"))); + if(storeDefs.size() != 1) + throw new IllegalStateException("Expected to find only a single store, but found multiple!"); + this.storeDef = storeDefs.get(0); + + this.numChunks = conf.getInt("num.chunks", -1); + if(this.numChunks < 1) + throw new VoldemortException("num.chunks not specified in the job conf."); + + this.saveKeys = conf.getBoolean("save.keys", true); + this.reducerPerBucket = conf.getBoolean("reducer.per.bucket", false); + + keySerializerDefinition = getStoreDef().getKeySerializer(); + valueSerializerDefinition = getStoreDef().getValueSerializer(); + + try { + SerializerFactory factory = new DefaultSerializerFactory(); + + if(conf.get("serializer.factory") != null) { + factory = (SerializerFactory) Class.forName(conf.get("serializer.factory")) + .newInstance(); + } + + keyField = conf.get("avro.key.field"); + valField = conf.get("avro.value.field"); + + keySchema = conf.get("avro.key.schema"); + valSchema = conf.get("avro.val.schema"); + + // hadoop.job.valueSchema + keySerializer = new AvroGenericSerializer(keySchema); + valueSerializer = new AvroGenericSerializer(valSchema); + } catch(Exception e) { + throw new RuntimeException(e); + } + + keyCompressor = new CompressionStrategyFactory().get(keySerializerDefinition.getCompression()); + valueCompressor = new CompressionStrategyFactory().get(valueSerializerDefinition.getCompression()); + + routingStrategy = new ConsistentRoutingStrategy(getCluster().getNodes(), + getStoreDef().getReplicationFactor()); + + // / + Props props = HadoopUtils.getPropsFromJob(conf); + + _keySelection = props.getString("key.selection", null); + _valSelection = props.getString("value.selection", null); + + String _keyTransClass = props.getString("key.transformation.class", null); + String _valueTransClass = props.getString("value.transformation.class", null); + + if(_keyTransClass != null) + _keyTrans = (StoreBuilderTransformation) Utils.callConstructor(_keyTransClass); + if(_valueTransClass != null) + _valTrans = (StoreBuilderTransformation) Utils.callConstructor(_valueTransClass); + } + + private int numChunks; + private Cluster cluster; + private StoreDefinition storeDef; + private boolean saveKeys; + private boolean reducerPerBucket; + + public Cluster getCluster() { + checkNotNull(cluster); + return cluster; + } + + public boolean getSaveKeys() { + return this.saveKeys; + } + + public boolean getReducerPerBucket() { + return this.reducerPerBucket; + } + + public StoreDefinition getStoreDef() { + checkNotNull(storeDef); + return 
storeDef; + } + + public String getStoreName() { + checkNotNull(storeDef); + return storeDef.getName(); + } + + private final void checkNotNull(Object o) { + if(o == null) + throw new VoldemortException("Not configured yet!"); + } + + public int getNumChunks() { + return this.numChunks; + } + +} \ No newline at end of file diff --git a/contrib/hadoop-store-builder/src/java/voldemort/store/readonly/mr/AvroStoreBuilderPartitioner.java b/contrib/hadoop-store-builder/src/java/voldemort/store/readonly/mr/AvroStoreBuilderPartitioner.java new file mode 100644 index 0000000000..6c1c5a9dc2 --- /dev/null +++ b/contrib/hadoop-store-builder/src/java/voldemort/store/readonly/mr/AvroStoreBuilderPartitioner.java @@ -0,0 +1,154 @@ +package voldemort.store.readonly.mr; + +/* + * Copyright 2008-2009 LinkedIn, Inc + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +import java.io.IOException; +import java.io.StringReader; +import java.nio.ByteBuffer; +import java.util.List; + +import org.apache.avro.mapred.AvroKey; +import org.apache.avro.mapred.AvroValue; +import org.apache.hadoop.io.BytesWritable; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.Partitioner; + +import voldemort.VoldemortException; +import voldemort.cluster.Cluster; +import voldemort.store.StoreDefinition; +import voldemort.store.readonly.ReadOnlyUtils; +import voldemort.utils.ByteUtils; +import voldemort.xml.ClusterMapper; +import voldemort.xml.StoreDefinitionsMapper; + +/** + * A Partitioner that splits data so that all data for the same nodeId, chunkId + * combination ends up in the same reduce (and hence in the same store chunk) + */ +@SuppressWarnings("deprecation") +public class AvroStoreBuilderPartitioner implements + Partitioner, AvroValue> { + + @Override + public int getPartition(AvroKey key, AvroValue value, int numReduceTasks) { + + byte[] keyBytes = null, valueBytes; + + keyBytes = new byte[key.datum().remaining()]; + key.datum().get(keyBytes); + + valueBytes = new byte[value.datum().remaining()]; + value.datum().get(valueBytes); + + BytesWritable outputKey = new BytesWritable(keyBytes); + BytesWritable outputVal = new BytesWritable(valueBytes); + + ByteBuffer keyBuffer = null, valueBuffer = null; + + keyBuffer = ByteBuffer.allocate(keyBytes.length); + keyBuffer.put(keyBytes); + keyBuffer.rewind(); + + valueBuffer = ByteBuffer.allocate(valueBytes.length); + valueBuffer.put(valueBytes); + valueBuffer.rewind(); + + key.datum(keyBuffer); + value.datum(valueBuffer); + + int partitionId = ByteUtils.readInt(valueBytes, ByteUtils.SIZE_OF_INT); + int chunkId = ReadOnlyUtils.chunk(keyBytes, getNumChunks()); + if(getSaveKeys()) { + int replicaType = (int) ByteUtils.readBytes(valueBytes, + 2 * ByteUtils.SIZE_OF_INT, + ByteUtils.SIZE_OF_BYTE); + if(getReducerPerBucket()) { + return (partitionId * getStoreDef().getReplicationFactor() + replicaType) + % numReduceTasks; + } else { + return ((partitionId * getStoreDef().getReplicationFactor() * getNumChunks()) + + (replicaType * getNumChunks()) + chunkId) 
+ % numReduceTasks; + } + } else { + if(getReducerPerBucket()) { + return partitionId % numReduceTasks; + } else { + return (partitionId * getNumChunks() + chunkId) % numReduceTasks; + } + + } + } + + private int numChunks; + private Cluster cluster; + private StoreDefinition storeDef; + private boolean saveKeys; + private boolean reducerPerBucket; + + @Override + public void configure(JobConf conf) { + this.cluster = new ClusterMapper().readCluster(new StringReader(conf.get("cluster.xml"))); + List storeDefs = new StoreDefinitionsMapper().readStoreList(new StringReader(conf.get("stores.xml"))); + if(storeDefs.size() != 1) + throw new IllegalStateException("Expected to find only a single store, but found multiple!"); + this.storeDef = storeDefs.get(0); + + this.numChunks = conf.getInt("num.chunks", -1); + if(this.numChunks < 1) + throw new VoldemortException("num.chunks not specified in the job conf."); + + this.saveKeys = conf.getBoolean("save.keys", false); + this.reducerPerBucket = conf.getBoolean("reducer.per.bucket", false); + } + + @SuppressWarnings("unused") + public void close() throws IOException {} + + public Cluster getCluster() { + checkNotNull(cluster); + return cluster; + } + + public boolean getSaveKeys() { + return this.saveKeys; + } + + public boolean getReducerPerBucket() { + return this.reducerPerBucket; + } + + public StoreDefinition getStoreDef() { + checkNotNull(storeDef); + return storeDef; + } + + public String getStoreName() { + checkNotNull(storeDef); + return storeDef.getName(); + } + + private final void checkNotNull(Object o) { + if(o == null) + throw new VoldemortException("Not configured yet!"); + } + + public int getNumChunks() { + return this.numChunks; + } + +} diff --git a/contrib/hadoop-store-builder/src/java/voldemort/store/readonly/mr/AvroStoreBuilderReducer.java b/contrib/hadoop-store-builder/src/java/voldemort/store/readonly/mr/AvroStoreBuilderReducer.java new file mode 100644 index 0000000000..6a8be8662b --- /dev/null +++ b/contrib/hadoop-store-builder/src/java/voldemort/store/readonly/mr/AvroStoreBuilderReducer.java @@ -0,0 +1,92 @@ +package voldemort.store.readonly.mr; + +import java.io.Closeable; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Iterator; + +import org.apache.avro.mapred.AvroKey; +import org.apache.avro.mapred.AvroValue; +import org.apache.hadoop.io.BytesWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.JobConfigurable; +import org.apache.hadoop.mapred.OutputCollector; +import org.apache.hadoop.mapred.Reducer; +import org.apache.hadoop.mapred.Reporter; + +import voldemort.store.readonly.disk.HadoopStoreWriter; +import voldemort.store.readonly.disk.KeyValueWriter; +import azkaban.common.utils.Utils; + +public class AvroStoreBuilderReducer implements + Reducer, AvroValue, Text, Text>, JobConfigurable, Closeable { + + String keyValueWriterClass; + @SuppressWarnings("rawtypes") + KeyValueWriter writer; + + @SuppressWarnings("unchecked") + @Override + public void reduce(AvroKey keyAvro, + Iterator> iterator, + OutputCollector collector, + Reporter reporter) throws IOException { + + ByteBuffer keyBuffer = keyAvro.datum(); + keyBuffer.rewind(); + + byte[] keyBytes = null, valueBytes; + + keyBytes = new byte[keyBuffer.remaining()]; + keyBuffer.get(keyBytes); + + BytesWritable key = new BytesWritable(keyBytes); + + ArrayList valueList = new ArrayList(); + + while(iterator.hasNext()) { + ByteBuffer writable = 
iterator.next().datum(); + writable.rewind(); + // BytesWritable writable = iterator.next(); + valueBytes = null; + valueBytes = new byte[writable.remaining()]; + writable.get(valueBytes); + + BytesWritable value = new BytesWritable(valueBytes); + valueList.add(value); + + } + + writer.write(key, valueList.iterator(), reporter); + + } + + @Override + public void configure(JobConf job) { + + JobConf conf = job; + try { + + keyValueWriterClass = conf.get("writer.class"); + if(keyValueWriterClass != null) + writer = (KeyValueWriter) Utils.callConstructor(keyValueWriterClass); + else + writer = new HadoopStoreWriter(); + + writer.conf(job); + + } catch(Exception e) { + // throw new RuntimeException("Failed to open Input/OutputStream", + // e); + e.printStackTrace(); + } + } + + @Override + public void close() throws IOException { + + writer.close(); + } +} diff --git a/contrib/hadoop-store-builder/src/java/voldemort/store/readonly/mr/AvroStoreBuilderReducerPerBucket.java b/contrib/hadoop-store-builder/src/java/voldemort/store/readonly/mr/AvroStoreBuilderReducerPerBucket.java new file mode 100644 index 0000000000..d1f53d678b --- /dev/null +++ b/contrib/hadoop-store-builder/src/java/voldemort/store/readonly/mr/AvroStoreBuilderReducerPerBucket.java @@ -0,0 +1,94 @@ +package voldemort.store.readonly.mr; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Iterator; + +import org.apache.avro.mapred.AvroKey; +import org.apache.avro.mapred.AvroValue; +import org.apache.hadoop.io.BytesWritable; +import org.apache.hadoop.io.Closeable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.JobConfigurable; +import org.apache.hadoop.mapred.OutputCollector; +import org.apache.hadoop.mapred.Reducer; +import org.apache.hadoop.mapred.Reporter; +import org.apache.log4j.Logger; + +import voldemort.store.readonly.disk.HadoopStoreWriterPerBucket; +import voldemort.store.readonly.disk.KeyValueWriter; +import azkaban.common.utils.Utils; + +public class AvroStoreBuilderReducerPerBucket implements + Reducer, AvroValue, Text, Text>, JobConfigurable, Closeable { + + private static final Logger logger = Logger.getLogger(AvroStoreBuilderReducerPerBucket.class); + + String keyValueWriterClass; + @SuppressWarnings("rawtypes") + KeyValueWriter writer; + + @Override + public void reduce(AvroKey keyAvro, + Iterator> iterator, + OutputCollector collector, + Reporter reporter) throws IOException { + + ByteBuffer keyBuffer = keyAvro.datum(); + keyBuffer.rewind(); + + byte[] keyBytes = null, valueBytes; + + keyBytes = new byte[keyBuffer.remaining()]; + keyBuffer.get(keyBytes); + + BytesWritable key = new BytesWritable(keyBytes); + + ArrayList valueList = new ArrayList(); + + while(iterator.hasNext()) { + ByteBuffer writable = iterator.next().datum(); + writable.rewind(); + // BytesWritable writable = iterator.next(); + valueBytes = null; + valueBytes = new byte[writable.remaining()]; + writable.get(valueBytes); + + BytesWritable value = new BytesWritable(valueBytes); + valueList.add(value); + + } + + writer.write(key, valueList.iterator(), reporter); + + } + + @Override + public void configure(JobConf job) { + + JobConf conf = job; + try { + + keyValueWriterClass = conf.get("writer.class"); + if(keyValueWriterClass != null) + writer = (KeyValueWriter) Utils.callConstructor(keyValueWriterClass); + else + writer = new HadoopStoreWriterPerBucket(); + + writer.conf(job); + + } catch(Exception e) { + // throw new 
RuntimeException("Failed to open Input/OutputStream", + // e); + e.printStackTrace(); + } + } + + @Override + public void close() throws IOException { + + writer.close(); + } +} diff --git a/contrib/hadoop-store-builder/src/java/voldemort/store/readonly/mr/IdentityJsonMapper.java b/contrib/hadoop-store-builder/src/java/voldemort/store/readonly/mr/IdentityJsonMapper.java new file mode 100644 index 0000000000..3ea7783ca7 --- /dev/null +++ b/contrib/hadoop-store-builder/src/java/voldemort/store/readonly/mr/IdentityJsonMapper.java @@ -0,0 +1,20 @@ +package voldemort.store.readonly.mr; + +import java.io.IOException; + +import org.apache.hadoop.mapred.OutputCollector; +import org.apache.hadoop.mapred.Reporter; + +import voldemort.store.readonly.mr.serialization.JsonMapper; + +public class IdentityJsonMapper extends JsonMapper { + + @Override + public void mapObjects(Object key, + Object value, + OutputCollector output, + Reporter reporter) throws IOException { + output.collect(key, value); + } + +} \ No newline at end of file diff --git a/contrib/hadoop-store-builder/src/java/voldemort/store/readonly/mr/IdentityJsonReducer.java b/contrib/hadoop-store-builder/src/java/voldemort/store/readonly/mr/IdentityJsonReducer.java new file mode 100644 index 0000000000..e4c7f3a21e --- /dev/null +++ b/contrib/hadoop-store-builder/src/java/voldemort/store/readonly/mr/IdentityJsonReducer.java @@ -0,0 +1,22 @@ +package voldemort.store.readonly.mr; + +import java.io.IOException; +import java.util.Iterator; + +import org.apache.hadoop.mapred.OutputCollector; +import org.apache.hadoop.mapred.Reporter; + +import voldemort.store.readonly.mr.serialization.JsonReducer; + +public class IdentityJsonReducer extends JsonReducer { + + @Override + public void reduceObjects(Object key, + Iterator values, + OutputCollector collector, + Reporter reporter) throws IOException { + while(values.hasNext()) { + collector.collect(key, values.next()); + } + } +} diff --git a/contrib/hadoop-store-builder/src/java/voldemort/store/readonly/mr/VoldemortStoreBuilderMapper.java b/contrib/hadoop-store-builder/src/java/voldemort/store/readonly/mr/VoldemortStoreBuilderMapper.java new file mode 100644 index 0000000000..d0473d4bfa --- /dev/null +++ b/contrib/hadoop-store-builder/src/java/voldemort/store/readonly/mr/VoldemortStoreBuilderMapper.java @@ -0,0 +1,73 @@ +package voldemort.store.readonly.mr; + +import java.util.Map; + +import org.apache.hadoop.io.BytesWritable; +import org.apache.hadoop.mapred.JobConf; + +import voldemort.serialization.json.JsonTypeSerializer; +import voldemort.store.readonly.mr.azkaban.StoreBuilderTransformation; +import voldemort.store.readonly.mr.utils.HadoopUtils; +import azkaban.common.utils.Props; +import azkaban.common.utils.Utils; + +public class VoldemortStoreBuilderMapper extends AbstractHadoopStoreBuilderMapper { + + private String _keySelection; + private String _valSelection; + private JsonTypeSerializer _inputKeySerializer; + private JsonTypeSerializer _inputValueSerializer; + private StoreBuilderTransformation _keyTrans; + private StoreBuilderTransformation _valTrans; + + @Override + public Object makeKey(Object key, Object value) { + return makeResult((BytesWritable) key, _inputKeySerializer, _keySelection, _keyTrans); + } + + @Override + public Object makeValue(Object key, Object value) { + return makeResult((BytesWritable) value, _inputValueSerializer, _valSelection, _valTrans); + } + + private Object makeResult(BytesWritable writable, + JsonTypeSerializer serializer, + String selection, + 
StoreBuilderTransformation trans) { + Object obj = serializer.toObject(writable.get()); + if(selection != null) { + Map m = (Map) obj; + obj = m.get(selection); + } + + if(trans != null) + obj = trans.transform(obj); + + return obj; + } + + @Override + public void configure(JobConf conf) { + super.configure(conf); + Props props = HadoopUtils.getPropsFromJob(conf); + + _keySelection = props.getString("key.selection", null); + _valSelection = props.getString("value.selection", null); + _inputKeySerializer = getSchemaFromJob(conf, "mapper.input.key.schema"); + _inputValueSerializer = getSchemaFromJob(conf, "mapper.input.value.schema"); + String _keyTransClass = props.getString("key.transformation.class", null); + String _valueTransClass = props.getString("value.transformation.class", null); + + if(_keyTransClass != null) + _keyTrans = (StoreBuilderTransformation) Utils.callConstructor(_keyTransClass); + if(_valueTransClass != null) + _valTrans = (StoreBuilderTransformation) Utils.callConstructor(_valueTransClass); + } + + protected JsonTypeSerializer getSchemaFromJob(JobConf conf, String key) { + if(conf.get(key) == null) + throw new IllegalArgumentException("Missing required parameter '" + key + "' on job."); + return new JsonTypeSerializer(conf.get(key)); + } + +} diff --git a/contrib/hadoop-store-builder/src/java/voldemort/store/readonly/mr/azkaban/AbstractHadoopJob.java b/contrib/hadoop-store-builder/src/java/voldemort/store/readonly/mr/azkaban/AbstractHadoopJob.java new file mode 100644 index 0000000000..afd8bc34e3 --- /dev/null +++ b/contrib/hadoop-store-builder/src/java/voldemort/store/readonly/mr/azkaban/AbstractHadoopJob.java @@ -0,0 +1,268 @@ +package voldemort.store.readonly.mr.azkaban; + +import java.io.IOException; +import java.net.URI; +import java.net.URISyntaxException; +import java.util.Arrays; +import java.util.List; + +import org.apache.hadoop.filecache.DistributedCache; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.PathFilter; +import org.apache.hadoop.mapred.Counters; +import org.apache.hadoop.mapred.Counters.Counter; +import org.apache.hadoop.mapred.FileOutputFormat; +import org.apache.hadoop.mapred.JobClient; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.Mapper; +import org.apache.hadoop.mapred.Reducer; +import org.apache.hadoop.mapred.RunningJob; +import org.joda.time.DateTime; +import org.joda.time.format.DateTimeFormat; +import org.joda.time.format.DateTimeFormatter; + +import voldemort.store.readonly.mr.IdentityJsonReducer; +import voldemort.store.readonly.mr.utils.HadoopUtils; +import azkaban.common.jobs.AbstractJob; +import azkaban.common.utils.Props; + +/** + * An abstract Base class for Hadoop Jobs + * + * @author bbansal + * + */ +public abstract class AbstractHadoopJob extends AbstractJob { + + public static String COMMON_FILE_DATE_PATTERN = "yyyy-MM-dd-HH-mm"; + public static final String HADOOP_PREFIX = "hadoop-conf."; + public static final String LATEST_SUFFIX = "#LATEST"; + public static final String CURRENT_SUFFIX = "#CURRENT"; + private final Props _props; + private RunningJob _runningJob; + + public AbstractHadoopJob(String name, Props props) { + super(name); + this._props = props; + } + + public void run(JobConf conf) throws Exception { + _runningJob = new JobClient(conf).submitJob(conf); + info("See " + _runningJob.getTrackingURL() + " for details."); + _runningJob.waitForCompletion(); + + if(!_runningJob.isSuccessful()) { + 
throw new Exception("Hadoop job:" + getId() + " failed!"); + } + + // dump all counters + Counters counters = _runningJob.getCounters(); + for(String groupName: counters.getGroupNames()) { + Counters.Group group = counters.getGroup(groupName); + info("Group: " + group.getDisplayName()); + for(Counter counter: group) + info(counter.getDisplayName() + ":\t" + counter.getValue()); + } + } + + public JobConf createJobConf(Class mapperClass) throws IOException, + URISyntaxException { + JobConf conf = createJobConf(mapperClass, IdentityJsonReducer.class); + conf.setNumReduceTasks(0); + + return conf; + } + + public JobConf createJobConf(Class mapperClass, + Class reducerClass, + Class combinerClass) throws IOException, + URISyntaxException { + JobConf conf = createJobConf(mapperClass, reducerClass); + conf.setCombinerClass(combinerClass); + + return conf; + } + + public JobConf createJobConf(Class mapperClass, + Class reducerClass) throws IOException, + URISyntaxException { + JobConf conf = new JobConf(); + // set custom class loader with custom find resource strategy. + + conf.setJobName(getId()); + conf.setMapperClass(mapperClass); + conf.setReducerClass(reducerClass); + + String hadoop_ugi = _props.getString("hadoop.job.ugi", null); + if(hadoop_ugi != null) { + conf.set("hadoop.job.ugi", hadoop_ugi); + } + + if(_props.getBoolean("is.local", false)) { + conf.set("mapred.job.tracker", "local"); + conf.set("fs.default.name", "file:///"); + conf.set("mapred.local.dir", "/tmp/map-red"); + + info("Running locally, no hadoop jar set."); + } else { + setClassLoaderAndJar(conf, getClass()); + info("Setting hadoop jar file for class:" + getClass() + " to " + conf.getJar()); + info("*************************************************************************"); + info(" Running on Real Hadoop Cluster(" + conf.get("mapred.job.tracker") + + ") "); + info("*************************************************************************"); + } + + // set JVM options if present + if(_props.containsKey("mapred.child.java.opts")) { + conf.set("mapred.child.java.opts", _props.getString("mapred.child.java.opts")); + info("mapred.child.java.opts set to " + _props.getString("mapred.child.java.opts")); + } + + // set input and output paths if they are present + if(_props.containsKey("input.paths")) { + List inputPaths = _props.getStringList("input.paths"); + if(inputPaths.size() == 0) + throw new IllegalArgumentException("Must specify at least one value for property 'input.paths'"); + for(String path: inputPaths) { + // Implied stuff, but good implied stuff + if(path.endsWith(LATEST_SUFFIX)) { + FileSystem fs = FileSystem.get(conf); + + PathFilter filter = new PathFilter() { + + @Override + public boolean accept(Path arg0) { + return !arg0.getName().startsWith("_") + && !arg0.getName().startsWith("."); + } + }; + + String latestPath = path.substring(0, path.length() - LATEST_SUFFIX.length()); + FileStatus[] statuses = fs.listStatus(new Path(latestPath), filter); + + Arrays.sort(statuses); + + path = statuses[statuses.length - 1].getPath().toString(); + System.out.println("Using latest folder: " + path); + } + HadoopUtils.addAllSubPaths(conf, new Path(path)); + } + } + + if(_props.containsKey("output.path")) { + String location = _props.get("output.path"); + if(location.endsWith("#CURRENT")) { + DateTimeFormatter format = DateTimeFormat.forPattern(COMMON_FILE_DATE_PATTERN); + String destPath = format.print(new DateTime()); + location = location.substring(0, location.length() - "#CURRENT".length()) + + destPath; + 
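+                // Example: output.path=/data/my-store#CURRENT resolves to
+                // /data/my-store2012-09-07-09-47 under the yyyy-MM-dd-HH-mm
+                // pattern above (path and timestamp are illustrative only).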
System.out.println("Store location set to " + location); + } + + FileOutputFormat.setOutputPath(conf, new Path(location)); + // For testing purpose only remove output file if exists + if(_props.getBoolean("force.output.overwrite", false)) { + FileSystem fs = FileOutputFormat.getOutputPath(conf).getFileSystem(conf); + fs.delete(FileOutputFormat.getOutputPath(conf), true); + } + } + + // Adds External jars to hadoop classpath + String externalJarList = _props.getString("hadoop.external.jarFiles", null); + if(externalJarList != null) { + String[] jarFiles = externalJarList.split(","); + for(String jarFile: jarFiles) { + info("Adding extenral jar File:" + jarFile); + DistributedCache.addFileToClassPath(new Path(jarFile), conf); + } + } + + // Adds distributed cache files + String cacheFileList = _props.getString("hadoop.cache.files", null); + if(cacheFileList != null) { + String[] cacheFiles = cacheFileList.split(","); + for(String cacheFile: cacheFiles) { + info("Adding Distributed Cache File:" + cacheFile); + DistributedCache.addCacheFile(new URI(cacheFile), conf); + } + } + + // Adds distributed cache files + String archiveFileList = _props.getString("hadoop.cache.archives", null); + if(archiveFileList != null) { + String[] archiveFiles = archiveFileList.split(","); + for(String archiveFile: archiveFiles) { + info("Adding Distributed Cache Archive File:" + archiveFile); + DistributedCache.addCacheArchive(new URI(archiveFile), conf); + } + } + + String hadoopCacheJarDir = _props.getString("hdfs.default.classpath.dir", null); + if(hadoopCacheJarDir != null) { + FileSystem fs = FileSystem.get(conf); + if(fs != null) { + FileStatus[] status = fs.listStatus(new Path(hadoopCacheJarDir)); + + if(status != null) { + for(int i = 0; i < status.length; ++i) { + if(!status[i].isDir()) { + Path path = new Path(hadoopCacheJarDir, status[i].getPath().getName()); + info("Adding Jar to Distributed Cache Archive File:" + path); + + DistributedCache.addFileToClassPath(path, conf); + } + } + } else { + info("hdfs.default.classpath.dir " + hadoopCacheJarDir + " is empty."); + } + } else { + info("hdfs.default.classpath.dir " + hadoopCacheJarDir + + " filesystem doesn't exist"); + } + } + + // May want to add this to HadoopUtils, but will await refactoring + for(String key: getProps().keySet()) { + String lowerCase = key.toLowerCase(); + if(lowerCase.startsWith(HADOOP_PREFIX)) { + String newKey = key.substring(HADOOP_PREFIX.length()); + conf.set(newKey, getProps().get(key)); + } + } + + HadoopUtils.setPropsInJob(conf, getProps()); + return conf; + } + + public Props getProps() { + return this._props; + } + + public void cancel() throws Exception { + if(_runningJob != null) + _runningJob.killJob(); + } + + public double getProgress() throws IOException { + if(_runningJob == null) + return 0.0; + else + return (double) (_runningJob.mapProgress() + _runningJob.reduceProgress()) / 2.0d; + } + + public Counters getCounters() throws IOException { + return _runningJob.getCounters(); + } + + public static void setClassLoaderAndJar(JobConf conf, Class jobClass) { + conf.setClassLoader(Thread.currentThread().getContextClassLoader()); + String jar = HadoopUtils.findContainingJar(jobClass, Thread.currentThread() + .getContextClassLoader()); + if(jar != null) { + conf.setJar(jar); + } + } +} diff --git a/contrib/hadoop-store-builder/src/java/voldemort/store/readonly/mr/azkaban/AbstractVoldemortBatchCopyJob.java b/contrib/hadoop-store-builder/src/java/voldemort/store/readonly/mr/azkaban/AbstractVoldemortBatchCopyJob.java new 
file mode 100644 index 0000000000..3e6860ed04 --- /dev/null +++ b/contrib/hadoop-store-builder/src/java/voldemort/store/readonly/mr/azkaban/AbstractVoldemortBatchCopyJob.java @@ -0,0 +1,116 @@ +package voldemort.store.readonly.mr.azkaban; + +import java.io.IOException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Semaphore; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.mapred.JobConf; + +import voldemort.cluster.Cluster; +import voldemort.cluster.Node; +import voldemort.store.readonly.mr.utils.HadoopUtils; +import azkaban.common.jobs.AbstractJob; +import azkaban.common.utils.Props; + +/** + * A test job that throws an exception + * + * @author bbansal Required Properties + *
<ul>
+ * <li>voldemort.cluster.file</li>
+ * <li>voldemort.store.name</li>
+ * <li>input.path</li>
+ * <li>dest.path</li>
+ * <li>source.host</li>
+ * <li>dest.host</li>
+ * </ul>
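+ * <p>
+ * A minimal sketch of the corresponding job properties (host names and paths
+ * below are purely illustrative, not defaults shipped with this job):
+ * 
+ * <pre>
+ * voldemort.cluster.file=/path/to/cluster.xml
+ * voldemort.store.name=my-store
+ * input.path=/tmp/my-store-build
+ * dest.path=/var/voldemort/read-only/my-store
+ * source.host=build-host.example.com
+ * dest.host=voldemort-node.example.com
+ * </pre>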
+ */ +public abstract class AbstractVoldemortBatchCopyJob extends AbstractJob { + + private final Props _props; + + public AbstractVoldemortBatchCopyJob(String name, Props props) throws IOException { + super(name); + _props = props; + } + + public void run() throws Exception { + JobConf conf = new JobConf(); + HadoopUtils.copyInAllProps(_props, conf); + + Cluster cluster = HadoopUtils.readCluster(_props.get("voldemort.cluster.file"), conf); + final String storeName = _props.get("voldemort.store.name"); + final Path inputDir = new Path(_props.get("input.path")); + + ExecutorService executors = Executors.newFixedThreadPool(cluster.getNumberOfNodes()); + final Semaphore semaphore = new Semaphore(0, false); + final AtomicInteger countSuccess = new AtomicInteger(0); + final boolean[] succeeded = new boolean[cluster.getNumberOfNodes()]; + final String destinationDir = _props.get("dest.path"); + final String sourceHost = _props.getString("src.host", "localhost"); + + for(final Node node: cluster.getNodes()) { + + executors.execute(new Runnable() { + + public void run() { + int id = node.getId(); + String indexFile = inputDir + "/" + storeName + ".index" + "_" + + Integer.toString(id); + String dataFile = inputDir + "/" + storeName + ".data" + "_" + + Integer.toString(id); + + String host = node.getHost(); + try { + // copyFileToLocal(sourceHost, + // indexFile, + // host, + // VoldemortSwapperUtils.getIndexDestinationFile(node.getId(), + // destinationDir)); + // copyFileToLocal(sourceHost, + // dataFile, + // host, + // VoldemortSwapperUtils.getDataDestinationFile(node.getId(), + // destinationDir)); + + succeeded[node.getId()] = true; + countSuccess.incrementAndGet(); + } catch(Exception e) { + error("copy to Remote node failed for node:" + node.getId(), e); + } + + semaphore.release(); + } + }); + } + + // wait for all operations to complete + semaphore.acquire(cluster.getNumberOfNodes()); + + try { + if(countSuccess.get() == cluster.getNumberOfNodes() + || _props.getBoolean("swap.partial.index", false)) { + int counter = 0; + // lets try to swap only the successful nodes + for(Node node: cluster.getNodes()) { + // data refresh succeeded + if(succeeded[node.getId()]) { + VoldemortSwapperUtils.doSwap(storeName, node, destinationDir); + counter++; + } + } + info(counter + " node out of " + cluster.getNumberOfNodes() + + " refreshed with fresh index/data for store '" + storeName + "'"); + } else { + error("Failed to copy Index Files for the entire cluster."); + } + } finally { + // stop all executors Now + executors.shutdown(); + } + } + +} diff --git a/contrib/hadoop-store-builder/src/java/voldemort/store/readonly/mr/azkaban/Job.java b/contrib/hadoop-store-builder/src/java/voldemort/store/readonly/mr/azkaban/Job.java new file mode 100644 index 0000000000..91ba74370e --- /dev/null +++ b/contrib/hadoop-store-builder/src/java/voldemort/store/readonly/mr/azkaban/Job.java @@ -0,0 +1,59 @@ +package voldemort.store.readonly.mr.azkaban; + +import java.util.Properties; + +/** + * This interface defines a Raw Job interface. Each job defines + *
<ul>
+ * <li>Job Type : {HADOOP, UNIX, JAVA, SUCCESS_TEST, CONTROLLER}</li>
+ * <li>Job ID/Name : {String}</li>
+ * <li>Arguments: Key/Value Map for Strings</li>
+ * </ul>
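+ * <p>
+ * A bare-bones implementation might look like the following sketch (the class
+ * name and its behaviour are illustrative only):
+ * 
+ * <pre>
+ * public class NoOpJob implements Job {
+ * 
+ *     private final String id;
+ *     private volatile boolean canceled = false;
+ * 
+ *     public NoOpJob(String id, azkaban.common.utils.Props props) {
+ *         this.id = id;
+ *     }
+ * 
+ *     public String getId() { return id; }
+ * 
+ *     public void run() throws Exception { } // nothing to do
+ * 
+ *     public void cancel() throws Exception { canceled = true; }
+ * 
+ *     public double getProgress() { return 1.0; }
+ * 
+ *     public Properties getJobGeneratedProperties() { return new Properties(); }
+ * 
+ *     public boolean isCanceled() { return canceled; }
+ * }
+ * </pre>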
+ * + * A job is required to have a constructor Job(String jobId, Props props) + */ + +public interface Job { + + /** + * Returns a unique(should be checked in xml) string name/id for the Job. + * + * @return + */ + public String getId(); + + /** + * Run the job. In general this method can only be run once. Must either + * succeed or throw an exception. + */ + public void run() throws Exception; + + /** + * Best effort attempt to cancel the job. + * + * @throws Exception If cancel fails + */ + public void cancel() throws Exception; + + /** + * Returns a progress report between [0 - 1.0] to indicate the percentage + * complete + * + * @throws Exception If getting progress fails + */ + public double getProgress() throws Exception; + + /** + * Get the generated properties from this job. + * + * @return + */ + public Properties getJobGeneratedProperties(); + + /** + * Determine if the job was cancelled. + * + * @return + */ + public boolean isCanceled(); +} \ No newline at end of file diff --git a/contrib/hadoop-store-builder/src/java/voldemort/store/readonly/mr/azkaban/StoreBuilderTransformation.java b/contrib/hadoop-store-builder/src/java/voldemort/store/readonly/mr/azkaban/StoreBuilderTransformation.java new file mode 100644 index 0000000000..6703f6879a --- /dev/null +++ b/contrib/hadoop-store-builder/src/java/voldemort/store/readonly/mr/azkaban/StoreBuilderTransformation.java @@ -0,0 +1,14 @@ +package voldemort.store.readonly.mr.azkaban; + +/** + * An interface to use for processing rows in the voldemort store builder + * + * @author jkreps + * + */ +public interface StoreBuilderTransformation +{ + + public Object transform(Object obj); + +} diff --git a/contrib/hadoop-store-builder/src/java/voldemort/store/readonly/mr/azkaban/UndefinedPropertyException.java b/contrib/hadoop-store-builder/src/java/voldemort/store/readonly/mr/azkaban/UndefinedPropertyException.java new file mode 100644 index 0000000000..cbfe09504b --- /dev/null +++ b/contrib/hadoop-store-builder/src/java/voldemort/store/readonly/mr/azkaban/UndefinedPropertyException.java @@ -0,0 +1,11 @@ +package voldemort.store.readonly.mr.azkaban; + +public class UndefinedPropertyException extends RuntimeException { + + private static final long serialVersionUID = 1; + + public UndefinedPropertyException(String message) { + super(message); + } + +} diff --git a/contrib/hadoop-store-builder/src/java/voldemort/store/readonly/mr/azkaban/VoldemortBatchIndexJob.java b/contrib/hadoop-store-builder/src/java/voldemort/store/readonly/mr/azkaban/VoldemortBatchIndexJob.java new file mode 100644 index 0000000000..cbe1d28f78 --- /dev/null +++ b/contrib/hadoop-store-builder/src/java/voldemort/store/readonly/mr/azkaban/VoldemortBatchIndexJob.java @@ -0,0 +1,405 @@ +package voldemort.store.readonly.mr.azkaban; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.net.URISyntaxException; +import java.util.Iterator; +import java.util.List; + +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.BytesWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapred.FileInputFormat; +import org.apache.hadoop.mapred.FileOutputFormat; +import org.apache.hadoop.mapred.JobClient; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.Mapper; +import org.apache.hadoop.mapred.OutputCollector; +import 
org.apache.hadoop.mapred.Reducer; +import org.apache.hadoop.mapred.Reporter; +import org.apache.hadoop.mapred.SequenceFileInputFormat; +import org.apache.hadoop.mapred.SequenceFileOutputFormat; +import org.apache.hadoop.mapred.lib.HashPartitioner; +import org.apache.log4j.Logger; + +import voldemort.cluster.Cluster; +import voldemort.cluster.Node; +import voldemort.routing.ConsistentRoutingStrategy; +import voldemort.serialization.DefaultSerializerFactory; +import voldemort.serialization.Serializer; +import voldemort.store.StoreDefinition; +import voldemort.store.readonly.mr.serialization.JsonConfigurable; +import voldemort.store.readonly.mr.utils.HadoopUtils; +import voldemort.utils.ByteUtils; +import azkaban.common.utils.Props; + +/** + * Creates Index and value files using Voldemort hash keys for easy batch + * update. + *
<p>
+ * Creates two files + *