Misc. performance fixes: avoid encoding and decoding strings in the benchmark, avoid recopying byte arrays in the Hadoop job, and make the JVM use -d64 in the startup scripts so that we can mmap large chunks.
1 parent f77aff6 · commit 59685b39b560e23ad2fb49fad357934077895863 · jkreps committed May 19, 2009
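The -d64 flag forces a 64-bit JVM, which matters because the read-only store maps its chunk files into memory, and a 32-bit process runs out of address space well before several multi-gigabyte chunks can be mapped at once. A minimal sketch of the kind of mapping involved, assuming a hypothetical chunk file path that is not taken from this commit:

import java.io.File;
import java.io.RandomAccessFile;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;

public class MmapSketch {
    public static void main(String[] args) throws Exception {
        // Hypothetical chunk file path, invented for illustration.
        File chunk = new File("/tmp/0_0.data");
        RandomAccessFile file = new RandomAccessFile(chunk, "r");
        try {
            // A single MappedByteBuffer tops out at Integer.MAX_VALUE bytes, and a
            // 32-bit JVM exhausts its address space long before several multi-GB
            // chunks are mapped at once, which is why the scripts force -d64.
            long length = Math.min(file.length(), Integer.MAX_VALUE);
            MappedByteBuffer buffer = file.getChannel()
                                          .map(FileChannel.MapMode.READ_ONLY, 0, length);
            System.out.println("Mapped " + buffer.capacity() + " bytes of " + chunk);
        } finally {
            file.close();
        }
    }
}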
@@ -36,7 +36,8 @@
<classpathentry kind="lib" path="lib/libthrift-20080411p1.jar"/>
<classpathentry kind="lib" path="lib/google-collect-snapshot-20090211.jar"/>
<classpathentry kind="lib" path="contrib/mongodb/lib/mongo-xjdm.jar"/>
- <classpathentry kind="lib" path="contrib/hadoop-store-builder/lib/hadoop-0.18.1-core.jar"/>
<classpathentry kind="lib" path="lib/je-3.3.75.jar"/>
+ <classpathentry kind="lib" path="contrib/hadoop-store-builder/lib/commons-cli-2.0-SNAPSHOT.jar"/>
+ <classpathentry kind="lib" path="contrib/hadoop-store-builder/lib/hadoop-0.18.1-core.jar"/>
<classpathentry kind="output" path="classes"/>
</classpath>
@@ -16,9 +16,9 @@
# limitations under the License.
#
-if [ $# != 7 ];
+if [ $# != 8 ];
then
- echo 'USAGE: bin/build-readonly-store.sh cluster.xml store_definitions.xml store_name sort_obj_buffer_size input_data output_dir num_threads'
+ echo 'USAGE: bin/build-readonly-store.sh cluster.xml store_definitions.xml store_name sort_obj_buffer_size input_data output_dir num_threads num_chunks'
exit 1
fi
@@ -36,4 +36,4 @@ done
CLASSPATH=$CLASSPATH:$base_dir/dist/resources
-java -Xmx2G -server -cp $CLASSPATH voldemort.store.readonly.JsonStoreBuilder ${1} ${2} ${3} ${4} ${5} ${6} ${7}
+java -Xmx2G -server -d64 -cp $CLASSPATH voldemort.store.readonly.JsonStoreBuilder ${1} ${2} ${3} ${4} ${5} ${6} ${7} ${8}
@@ -35,4 +35,4 @@ done
CLASSPATH=$CLASSPATH:$base_dir/dist/resources
export CLASSPATH
-java -Xmx2G -server -Dcom.sun.management.jmxremote -cp $CLASSPATH ${1} ${2} ${3} ${4} ${5} ${6} ${7}
+java -Xmx2G -d64 -server -Dcom.sun.management.jmxremote -cp $CLASSPATH ${1} ${2} ${3} ${4} ${5} ${6} ${7}
@@ -35,4 +35,4 @@ do
done
CLASSPATH=$CLASSPATH:$base_dir/dist/resources
-java -agentlib:hprof=cpu=samples,depth=10 -Xmx2G -server -cp $CLASSPATH voldemort.store.readonly.StringSorter ${1} ${2} ${3}
+java -d64 -Xmx2G -server -cp $CLASSPATH voldemort.store.readonly.StringSorter ${1} ${2} ${3}
@@ -36,4 +36,4 @@ done
CLASSPATH=$CLASSPATH:$base_dir/dist/resources
-java -Xmx2G -server -cp $CLASSPATH -Dcom.sun.management.jmxremote voldemort.server.VoldemortServer ${1}
+java -d64 -Xmx2G -server -cp $CLASSPATH -Dcom.sun.management.jmxremote voldemort.server.VoldemortServer ${1}
@@ -6,7 +6,7 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.mapred.SequenceFileInputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
@@ -15,6 +15,7 @@
import voldemort.store.StoreDefinition;
import voldemort.store.readonly.mr.AbstractHadoopStoreBuilderMapper;
import voldemort.store.readonly.mr.HadoopStoreBuilder;
+import voldemort.utils.ByteUtils;
import voldemort.utils.Utils;
import voldemort.xml.ClusterMapper;
import voldemort.xml.StoreDefinitionsMapper;
@@ -51,30 +52,39 @@ public int run(String[] args) throws Exception {
Configuration config = this.getConf();
config.set("mapred.job.name", "test-store-builder");
+ config.setInt("mapred.min.split.size", 1024 * 1024 * 1024);
HadoopStoreBuilder builder = new HadoopStoreBuilder(config,
BuildTestStoreMapper.class,
SequenceFileInputFormat.class,
cluster,
def,
2,
- 512 * 1024 * 1024,
+ (long) (1.5 * 1024 * 1024 * 1024),
new Path(tempDir),
new Path(outputDir),
new Path(inputDir));
builder.build();
return 0;
}
- public static class BuildTestStoreMapper extends AbstractHadoopStoreBuilderMapper<Text, Text> {
+ public static class BuildTestStoreMapper extends
+ AbstractHadoopStoreBuilderMapper<BytesWritable, BytesWritable> {
@Override
- public Object makeKey(Text key, Text value) {
- return key.toString();
+ public Object makeKey(BytesWritable key, BytesWritable value) {
+ return getValid(key);
}
@Override
- public Object makeValue(Text key, Text value) {
- return value.toString();
+ public Object makeValue(BytesWritable key, BytesWritable value) {
+ return value.get();
+ }
+
+ private byte[] getValid(BytesWritable writable) {
+ if(writable.getSize() == writable.getCapacity())
+ return writable.get();
+ else
+ return ByteUtils.copy(writable.get(), 0, writable.getSize());
}
}
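The getValid helper exists because BytesWritable.get() hands back the whole backing buffer, which can be longer than the valid data when the writable has been reused; a copy is only needed when the buffer is padded. A small standalone illustration, assuming the pre-0.21 BytesWritable API (get()/getSize()) used in the diff:

import org.apache.hadoop.io.BytesWritable;

public class BytesWritableTrimDemo {
    public static void main(String[] args) {
        BytesWritable w = new BytesWritable();
        w.set(new byte[] { 1, 2, 3, 4, 5 }, 0, 5);
        // Reusing the writable keeps the earlier, larger capacity; only the
        // first two bytes are valid now, so get() returns a padded buffer.
        w.set(new byte[] { 9, 9 }, 0, 2);
        System.out.println("size = " + w.getSize() + ", capacity = " + w.getCapacity());
    }
}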
@@ -6,6 +6,7 @@
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
@@ -55,8 +56,8 @@ public int run(String[] args) throws Exception {
conf.setInputFormat(TextInputFormat.class);
conf.setOutputFormat(SequenceFileOutputFormat.class);
- conf.setOutputKeyClass(Text.class);
- conf.setOutputValueClass(Text.class);
+ conf.setOutputKeyClass(BytesWritable.class);
+ conf.setOutputValueClass(BytesWritable.class);
Path inputPath = new Path(args[0]);
FileInputFormat.setInputPaths(conf, inputPath);
@@ -73,15 +74,15 @@ public int run(String[] args) throws Exception {
}
public static class GenerateDataMapper extends MapReduceBase implements
- Mapper<LongWritable, Text, Text, Text> {
+ Mapper<LongWritable, Text, BytesWritable, BytesWritable> {
- private String string;
+ private BytesWritable value;
public void map(LongWritable lineNumber,
Text line,
- OutputCollector<Text, Text> collector,
+ OutputCollector<BytesWritable, BytesWritable> collector,
Reporter reporter) throws IOException {
- collector.collect(line, new Text(string));
+ collector.collect(new BytesWritable(line.getBytes()), value);
}
@Override
@@ -90,7 +91,7 @@ public void configure(JobConf job) {
int size = job.getInt("value.size", -1);
for(int i = 0; i < size; i++)
builder.append('a');
- this.string = builder.toString();
+ this.value = new BytesWritable(builder.toString().getBytes());
}
}
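The benchmark mapper previously re-encoded the same padding string into a fresh Text for every record; building one BytesWritable in configure() and reusing it removes that per-record UTF-8 encode and copy. A rough, self-contained illustration of the trade-off the change targets (the record loop itself is omitted):

import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;

public class EncodeOnceSketch {
    public static void main(String[] args) {
        StringBuilder builder = new StringBuilder();
        for(int i = 0; i < 1024; i++)
            builder.append('a');
        String padding = builder.toString();

        // Old pattern: every record pays for a new UTF-8 encode and allocation.
        Text perRecord = new Text(padding);

        // New pattern: encode once up front and hand the same writable to each collect().
        BytesWritable reused = new BytesWritable(padding.getBytes());
        System.out.println(perRecord.getLength() + " vs " + reused.getSize());
    }
}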
@@ -14,6 +14,7 @@
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.SequenceFileOutputFormat;
+import org.apache.log4j.Logger;
import voldemort.VoldemortException;
import voldemort.cluster.Cluster;
@@ -31,14 +32,19 @@
*/
public class HadoopStoreBuilder {
+ public static final long MIN_CHUNK_SIZE = 1L;
+ public static final long MAX_CHUNK_SIZE = (long) (1.9 * 1024 * 1024 * 1024);
+
+ private static final Logger logger = Logger.getLogger(HadoopStoreBuilder.class);
+
private final Configuration config;
private final Class<? extends AbstractHadoopStoreBuilderMapper<?, ?>> mapperClass;
@SuppressWarnings("unchecked")
private final Class<? extends InputFormat> inputFormatClass;
private final Cluster cluster;
private final StoreDefinition storeDef;
private final int replicationFactor;
- private final int chunkSizeBytes;
+ private final long chunkSizeBytes;
private final Path inputPath;
private final Path outputDir;
private final Path tempDir;
@@ -64,7 +70,7 @@ public HadoopStoreBuilder(Configuration conf,
Cluster cluster,
StoreDefinition storeDef,
int replicationFactor,
- int chunkSizeBytes,
+ long chunkSizeBytes,
Path tempDir,
Path outputDir,
Path inputPath) {
@@ -79,6 +85,9 @@ public HadoopStoreBuilder(Configuration conf,
this.chunkSizeBytes = chunkSizeBytes;
this.tempDir = tempDir;
this.outputDir = Utils.notNull(outputDir);
+ if(chunkSizeBytes > MAX_CHUNK_SIZE || chunkSizeBytes < MIN_CHUNK_SIZE)
+ throw new VoldemortException("Invalid chunk size, chunk size must be in the range "
+ + MIN_CHUNK_SIZE + "..." + MAX_CHUNK_SIZE);
}
public void build() {
@@ -108,14 +117,34 @@ public void build() {
FileSystem fs = tempDir.getFileSystem(conf);
fs.delete(tempDir, true);
- FileStatus status = fs.getFileStatus(inputPath);
- int numChunks = Math.max((int) status.getLen() / chunkSizeBytes, 1);
+ long size = sizeOfPath(fs, inputPath);
+ int numChunks = Math.max((int) (storeDef.getReplicationFactor() * size
+ / cluster.getNumberOfNodes() / chunkSizeBytes), 1);
+ logger.info("Data size = " + size + ", replication factor = "
+ + storeDef.getReplicationFactor() + ", numNodes = "
+ + cluster.getNumberOfNodes() + ", chunk size = " + chunkSizeBytes
+ + ", num.chunks = " + numChunks);
conf.setInt("num.chunks", numChunks);
conf.setNumReduceTasks(cluster.getNumberOfNodes() * numChunks);
+ logger.info("Building store...");
JobClient.runJob(conf);
} catch(IOException e) {
throw new VoldemortException(e);
}
}
+
+ private long sizeOfPath(FileSystem fs, Path path) throws IOException {
+ long size = 0;
+ FileStatus[] statuses = fs.listStatus(path);
+ if(statuses != null) {
+ for(FileStatus status: statuses) {
+ if(status.isDir())
+ size += sizeOfPath(fs, status.getPath());
+ else
+ size += status.getLen();
+ }
+ }
+ return size;
+ }
}
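With the chunk size now a long and the chunk count derived from the total input size, the number of chunks is roughly (replication factor x data size) / (nodes x chunk size), floored by integer division and clamped to at least 1; the ~1.9 GB ceiling presumably keeps each chunk under the 2 GB limit of a single memory mapping. A worked example under assumed inputs (none of these numbers come from the commit):

public class NumChunksExample {
    public static void main(String[] args) {
        long size = 100L * 1024 * 1024 * 1024;               // assume 100 GB of input data
        int replicationFactor = 2;                            // assumed store replication factor
        int numNodes = 10;                                    // assumed cluster size
        long chunkSizeBytes = (long) (1.5 * 1024 * 1024 * 1024);

        // Same arithmetic as HadoopStoreBuilder.build(): per-node replicated data
        // divided by the target chunk size, with a floor of one chunk.
        int numChunks = Math.max((int) (replicationFactor * size / numNodes / chunkSizeBytes), 1);
        int reduceTasks = numNodes * numChunks;
        System.out.println("numChunks = " + numChunks + ", reduce tasks = " + reduceTasks);
        // Prints numChunks = 13, reduce tasks = 130 for these inputs.
    }
}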
@@ -14,6 +14,7 @@
public class HadoopStoreBuilderBase {
+ private int numChunks;
private Cluster cluster;
private StoreDefinition storeDef;
@@ -23,6 +24,9 @@ public void configure(JobConf conf) {
if(storeDefs.size() != 1)
throw new IllegalStateException("Expected to find only a single store, but found multiple!");
this.storeDef = storeDefs.get(0);
+ this.numChunks = conf.getInt("num.chunks", -1);
+ if(this.numChunks < 1)
+ throw new VoldemortException("num.chunks not specified in the job conf.");
}
@SuppressWarnings("unused")
@@ -48,4 +52,8 @@ private final void checkNotNull(Object o) {
throw new VoldemortException("Not configured yet!");
}
+ public int getNumChunks() {
+ return this.numChunks;
+ }
+
}
@@ -1,10 +1,8 @@
package voldemort.store.readonly.mr;
import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Partitioner;
-import voldemort.VoldemortException;
import voldemort.store.readonly.ReadOnlyUtils;
import voldemort.utils.ByteUtils;
@@ -15,23 +13,13 @@
* @author bbansal, jay
*
*/
-public class HadoopStoreBuilderPartitioner implements Partitioner<BytesWritable, BytesWritable> {
-
- private int numChunks;
+public class HadoopStoreBuilderPartitioner extends HadoopStoreBuilderBase implements
+ Partitioner<BytesWritable, BytesWritable> {
public int getPartition(BytesWritable key, BytesWritable value, int numReduceTasks) {
int nodeId = ByteUtils.readInt(value.get(), 0);
- int chunkId = ReadOnlyUtils.chunk(key.get(), numChunks);
- System.out.println("nodeId = " + nodeId + ", chunkId = " + chunkId + ", partition = "
- + (chunkId * numChunks + nodeId) % numReduceTasks
- + ", numReduceTasks = " + numReduceTasks);
- return (chunkId * numChunks + nodeId) % numReduceTasks;
- }
-
- public void configure(JobConf job) {
- this.numChunks = job.getInt("num.chunks", -1);
- if(this.numChunks < 1)
- throw new VoldemortException("num.chunks not specified in the job conf.");
+ int chunkId = ReadOnlyUtils.chunk(key.get(), getNumChunks());
+ return (nodeId * getNumChunks() + chunkId % numReduceTasks);
}
}
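The rewritten getPartition drops the per-record System.out logging and pulls numChunks from the shared base class. Note the grouping in the returned expression: the modulo binds only to chunkId, which happens to give the same answer when numReduceTasks equals numNodes * numChunks (as the builder configures it), but it reads as if the whole sum were reduced modulo numReduceTasks. A hedged sketch of that conventional form, offered as an assumption about the intent rather than what this commit ships:

// Conventional layout: the reducer index cycles over every (nodeId, chunkId) pair.
// This is an assumed variant for illustration; the committed code applies the
// modulo to chunkId alone.
public int getPartition(BytesWritable key, BytesWritable value, int numReduceTasks) {
    int nodeId = ByteUtils.readInt(value.get(), 0);
    int chunkId = ReadOnlyUtils.chunk(key.get(), getNumChunks());
    return (nodeId * getNumChunks() + chunkId) % numReduceTasks;
}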
@@ -62,18 +62,16 @@ public void reduce(BytesWritable key,
if(this.chunkId == -1)
this.chunkId = ReadOnlyUtils.chunk(keyBytes, this.numChunks);
- // read all but the first 4 bytes, which contain the node id
- byte[] value = ByteUtils.copy(valueBytes, 4, writable.getSize());
-
// Write key and position
this.indexFileStream.write(keyBytes);
this.indexFileStream.writeInt(this.position);
// Write length and value
- this.valueFileStream.writeInt(value.length);
- this.valueFileStream.write(value);
+ int valueLength = writable.getSize() - 4;
+ this.valueFileStream.writeInt(valueLength);
+ this.valueFileStream.write(valueBytes, 4, valueLength);
- this.position += 4 + value.length;
+ this.position += 4 + valueLength;
if(this.position < 0)
throw new VoldemortException("Chunk overflow exception: chunk " + chunkId
+ " has exceeded " + Integer.MAX_VALUE + " bytes.");
