initial commit

0 parents commit 3f0c0278159a60762284d34a1dc4aac9232257f2 @ahadrana committed Nov 11, 2010
Showing with 8,800 additions and 0 deletions.
  1. +27 −0 README
  2. +151 −0 bin/launcher.sh
  3. +2 −0 build.properties
  4. +161 −0 build.xml
  5. +7 −0 conf/commons-logging.properties
  6. +84 −0 conf/log4j.properties
  7. BIN lib/commons-io-1.4.jar
  8. BIN lib/commons-logging-1.0.4.jar
  9. BIN lib/commons-logging-api-1.0.4.jar
  10. BIN lib/junit-4.3.1.jar
  11. BIN lib/log4j-1.2.15.jar
  12. +18 −0 src/org/commoncrawl/hadoop/mergeutils/KeyValuePairComparator.java
  13. +1,240 −0 src/org/commoncrawl/hadoop/mergeutils/MergeSortSpillWriter.java
  14. +396 −0 src/org/commoncrawl/hadoop/mergeutils/MergeSortSpillWriterUnitTest.java
  15. +299 −0 src/org/commoncrawl/hadoop/mergeutils/OptimizedKeyGeneratorAndComparator.java
  16. +31 −0 src/org/commoncrawl/hadoop/mergeutils/RawDataSpillWriter.java
  17. +39 −0 src/org/commoncrawl/hadoop/mergeutils/RawKeyValueComparator.java
  18. +33 −0 src/org/commoncrawl/hadoop/mergeutils/SequenceFileIndexWriter.java
  19. +652 −0 src/org/commoncrawl/hadoop/mergeutils/SequenceFileMerger.java
  20. +119 −0 src/org/commoncrawl/hadoop/mergeutils/SequenceFileReader.java
  21. +429 −0 src/org/commoncrawl/hadoop/mergeutils/SequenceFileSpillWriter.java
  22. +18 −0 src/org/commoncrawl/hadoop/mergeutils/SpillValueCombiner.java
  23. +22 −0 src/org/commoncrawl/hadoop/mergeutils/SpillWriter.java
  24. +46 −0 src/org/commoncrawl/util/shared/CCStringUtils.java
  25. +87 −0 src/org/commoncrawl/util/shared/FileUtils.java
  26. +364 −0 src/org/commoncrawl/util/shared/FlexBuffer.java
  27. +391 −0 src/org/commoncrawl/util/shared/IntrusiveList.java
  28. +18 −0 src/src/org/commoncrawl/hadoop/mergeutils/KeyValuePairComparator.java
  29. +1,240 −0 src/src/org/commoncrawl/hadoop/mergeutils/MergeSortSpillWriter.java
  30. +396 −0 src/src/org/commoncrawl/hadoop/mergeutils/MergeSortSpillWriterUnitTest.java
  31. +299 −0 src/src/org/commoncrawl/hadoop/mergeutils/OptimizedKeyGeneratorAndComparator.java
  32. +31 −0 src/src/org/commoncrawl/hadoop/mergeutils/RawDataSpillWriter.java
  33. +39 −0 src/src/org/commoncrawl/hadoop/mergeutils/RawKeyValueComparator.java
  34. +33 −0 src/src/org/commoncrawl/hadoop/mergeutils/SequenceFileIndexWriter.java
  35. +652 −0 src/src/org/commoncrawl/hadoop/mergeutils/SequenceFileMerger.java
  36. +119 −0 src/src/org/commoncrawl/hadoop/mergeutils/SequenceFileReader.java
  37. +429 −0 src/src/org/commoncrawl/hadoop/mergeutils/SequenceFileSpillWriter.java
  38. +18 −0 src/src/org/commoncrawl/hadoop/mergeutils/SpillValueCombiner.java
  39. +22 −0 src/src/org/commoncrawl/hadoop/mergeutils/SpillWriter.java
  40. +46 −0 src/src/org/commoncrawl/util/shared/CCStringUtils.java
  41. +87 −0 src/src/org/commoncrawl/util/shared/FileUtils.java
  42. +364 −0 src/src/org/commoncrawl/util/shared/FlexBuffer.java
  43. +391 −0 src/src/org/commoncrawl/util/shared/IntrusiveList.java
27 README
@@ -0,0 +1,27 @@
+The beginnings of a library to perform sorting of large datasets outside the scope of a map-reduce job.
+
+Please set hadoop.version and hadoop.path in build.properties to point to your version of
+hadoop.
+
+Once commoncrawl-mergeutils.jar has been built, you can execute org.commoncrawl.hadoop.mergeutils.MergeSortSpillWriterUnitTest via the bin/launcher.sh script
+(e.g., bin/launcher.sh org.commoncrawl.hadoop.mergeutils.MergeSortSpillWriterUnitTest).
+
+The launcher runs the command in the background. You can monitor progress via either ./logs/<<ClassName>>.log for LOG output, or ./logs/<<ClassName>>_run.log for stdout output.
+
+The main classes of interest are MergeSortSpillWriter and SequenceFileMerger. MergeSortSpillWriter can be fed unsorted records via the spillRecord call. It will
+internally buffer records until a configurable threshold is reached, then sort the buffered records and spill them to a temp sequence file. This continues
+until the close method is called. Close triggers the class to spill the final set of records and then feed the part files to SequenceFileMerger, which performs a merge
+sort of the records and spills them to a configurable output SpillWriter. To optimize the sort, specify a RawKeyValueComparator, or, to squeeze out even more performance,
+use the OptimizedKeyGeneratorAndComparator class to generate a long key (or a long key plus a secondary buffer key) from each key/value pair.
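+
+For illustration, a minimal usage sketch (assuming the usual org.apache.hadoop imports; the Text key/value
+types and the myComparator RawKeyValueComparator instance are placeholders for this example, not part of the library):
+
+  Configuration conf = new Configuration();
+  FileSystem fs = FileSystem.getLocal(conf);
+  SequenceFileSpillWriter<Text,Text> output = new SequenceFileSpillWriter<Text,Text>(
+      fs, conf, new Path("/tmp/sorted-output"), Text.class, Text.class, null, false);
+  MergeSortSpillWriter<Text,Text> sorter = new MergeSortSpillWriter<Text,Text>(
+      conf, output, fs, new Path("/tmp/merge-tmp"),
+      null /* combiner -- avoid for now */, myComparator, Text.class, Text.class, false, null);
+  sorter.spillRecord(new Text("key"), new Text("value")); // repeat for each unsorted record
+  sorter.close(); // sorts and spills the final batch, then runs the merge sort
+  output.close();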
+
+The MergeSortSpillWriter and SequenceFileMerger classes have been used in production to sort very large recordsets (hundreds of millions of records), but the library is a work in
+progress. The combiner code in SequenceFileMerger should be avoided for now.
+
+Upcoming features include:
+
+1. The ability to spill Raw Records via MergeSortSpillWriter.
+2. Native (C++) quick-sort support when an optimized comparator/generator is used.
+3. TFileReader and TFileSpillWriter class additions.
+4. Fleshing out the Combiner/Reducer implementation in the SequenceFileMerger class.
+5. Making the quick-sort multi-threaded, and parallelizing the merge-sort.
+
+
151 bin/launcher.sh
@@ -0,0 +1,151 @@
+#!/usr/bin/env bash
+
+# resolve links - $0 may be a softlink
+
+this="$0"
+while [ -h "$this" ]; do
+ ls=`ls -ld "$this"`
+ link=`expr "$ls" : '.*-> \(.*\)$'`
+ if expr "$link" : '.*/.*' > /dev/null; then
+ this="$link"
+ else
+ this=`dirname "$this"`/"$link"
+ fi
+done
+
+
+# convert relative path to absolute path
+bin=`dirname "$this"`
+script=`basename "$this"`
+bin=`cd "$bin"; pwd`
+this="$bin/$script"
+
+# the root of the app installation
+export CCAPP_HOME=`dirname "$this"`/..
+export CCAPP_CONF_DIR=$CCAPP_HOME/conf
+export CCAPP_LOG_DIR=$CCAPP_HOME/logs
+export CCAPP_LIB_DIR=$CCAPP_HOME/lib
+
+echo "CCAPP_HOME:"$CCAPP_HOME
+echo "CCAPP_CONF_DIR:$CCAPP_CONF_DIR"
+echo "CCAPP_LOG_DIR:$CCAPP_LOG_DIR"
+
+if ! [ -e $CCAPP_HOME/build/commoncrawl-*.jar ]; then
+ echo "Please build commoncrawl jar"
+ exit 1
+else
+ CCAPP_JAR=`basename $CCAPP_HOME/build/commoncrawl*.jar`
+ CCAPP_JAR_PATH=$CCAPP_HOME/build
+ echo "CCAPP_JAR:"$CCAPP_JAR
+ echo "CCAPP_JAR_PATH:"$CCAPP_JAR_PATH
+fi
+
+if [ "$JAVA_HOME" = "" ]; then
+ echo "Error: JAVA_HOME is not set."
+ exit 1
+else
+ echo "JAVA_HOME:$JAVA_HOME"
+fi
+
+
+if [ "$HADOOP_HOME" = "" ]; then
+ echo "HADOOP_HOME not defined. Attempting to locate via ~/build.properties"
+ HADOOP_HOME=`cat ~/build.properties | grep "hadoop.path" | sed 's/.*=\(.*\)$/\1/'`
+
+ if ! [ "$HADOOP_HOME" = "" ]; then
+ echo "Derived HADOOP_HOME from build.properties to be:$HADOOP_HOME"
+ else
+ echo "Failed to extract HADOOP_HOME from build.properties. Please set HADOOP_HOME to point to Hadoop Distribution"
+ exit 1
+ fi
+fi
+
+# Locate the hadoop core jar, preferring one in the build directory ...
+if ls $HADOOP_HOME/build/hadoop-*-core.jar > /dev/null 2>&1; then
+ HADOOP_JAR=`ls $HADOOP_HOME/build/hadoop-*-core.jar`
+else
+ HADOOP_JAR=`ls $HADOOP_HOME/hadoop-*-core.jar`
+fi
+
+if [ "$HADOOP_CONF_DIR" = "" ]; then
+ HADOOP_CONF_DIR="$HADOOP_HOME/conf"
+fi
+
+echo "HADOOP_JAR:$HADOOP_JAR"
+echo "HADOOP_CONF_DIR:$HADOOP_CONF_DIR"
+
+# CLASSPATH initially contains CCAPP_CONF:HADOOP_CONF_DIR
+CLASSPATH=${CCAPP_CONF_DIR}
+CLASSPATH=${CLASSPATH}:${HADOOP_CONF_DIR}
+# and add in test path ...
+CLASSPATH=${CLASSPATH}:${CCAPP_HOME}/tests
+# next add tools.jar
+CLASSPATH=${CLASSPATH}:$JAVA_HOME/lib/tools.jar
+# next add commoncrawl jar FIRST ...
+CLASSPATH=${CLASSPATH}:${CCAPP_JAR_PATH}/${CCAPP_JAR}
+# then add bundled libraries from the lib directory
+for f in ${CCAPP_HOME}/lib/*.jar; do
+ CLASSPATH=${CLASSPATH}:$f;
+done
+#next add hadoop jar path
+CLASSPATH=${CLASSPATH}:${HADOOP_JAR}
+# add hadoop libs to CLASSPATH
+for f in $HADOOP_HOME/lib/*.jar; do
+ CLASSPATH=${CLASSPATH}:$f;
+done
+# and add jetty libs ...
+for f in $HADOOP_HOME/lib/jetty-ext/*.jar; do
+ CLASSPATH=${CLASSPATH}:$f;
+done
+
+echo "";
+echo "CLASSPATH:$CLASSPATH"
+echo "";
+
+CCAPP_CLASS_NAME=$1
+
+if [ "$CCAPP_CLASS_NAME" = "" ]; then
+ echo "No Main Class Specified!"
+ exit 1;
+fi
+# consume the class name; remaining arguments are passed through to the main class
+shift
+
+echo "CCAPP_CLASS_NAME:$CCAPP_CLASS_NAME"
+CCAPP_NAME=`echo $CCAPP_CLASS_NAME | sed 's/.*\.\(.*\)$/\1/'`
+echo "CCAPP_NAME:$CCAPP_NAME"
+CCAPP_LOG_FILE=$CCAPP_NAME.log
+
+if [ "$JAVA_HEAP_MAX" = "" ]; then
+ JAVA_HEAP_MAX=-Xmx2000m
+fi
+
+JAVA="$JAVA_HOME/bin/java"
+
+#establish hadoop platform name string
+JAVA_PLATFORM=`CLASSPATH=${CLASSPATH} ${JAVA} org.apache.hadoop.util.PlatformName | sed -e 's/ /_/g'`
+echo Platform Name is:${JAVA_PLATFORM}
+#setup commoncrawl library paths
+JAVA_LIBRARY_PATH=${CCAPP_LIB_DIR}:${CCAPP_LIB_DIR}/native/${JAVA_PLATFORM}
+#setup execution path
+export PATH=${CCAPP_LIB_DIR}/native/${JAVA_PLATFORM}:$PATH
+#and ld_library path
+#export LD_LIBRARY_PATH=${CCAPP_LIB_DIR}/native/${JAVA_PLATFORM}:$LD_LIBRARY_PATH
+
+CCAPP_VMARGS="$CCAPP_VMARGS -Dcommoncrawl.log.dir=$CCAPP_LOG_DIR"
+CCAPP_VMARGS="$CCAPP_VMARGS -Dcommoncrawl.log.file=$CCAPP_LOG_FILE"
+CCAPP_VMARGS="$CCAPP_VMARGS -Dhadoop.home.dir=$HADOOP_HOME"
+CCAPP_VMARGS="$CCAPP_VMARGS -Dcommoncrawl.root.logger=${CCAPP_ROOT_LOGGER:-INFO,DRFA}"
+CCAPP_VMARGS="$CCAPP_VMARGS $JAVA_HEAP_MAX"
+CCAPP_VMARGS="$CCAPP_VMARGS -XX:+UseParNewGC -XX:ParallelGCThreads=8 -XX:NewSize=200m -XX:+PrintGCDetails"
+CCAPP_VMARGS="$CCAPP_VMARGS -Djava.library.path=${JAVA_LIBRARY_PATH}"
+CCAPP_VMARGS="$CCAPP_VMARGS -Dcc.native.lib.path=${CCAPP_LIB_DIR}/native/${JAVA_PLATFORM}"
+
+
+CCAPP_CMD_LINE="$JAVA $CCAPP_VMARGS -classpath $CLASSPATH $CCAPP_CLASS_NAME"
+CCAPP_RUN_LOG=$CCAPP_LOG_DIR/${CCAPP_NAME}_run.log
+echo "CCAPP_CMD_LINE:$CCAPP_CMD_LINE"
+nohup $CCAPP_CMD_LINE "$@" > $CCAPP_RUN_LOG 2>&1 < /dev/null &
+echo $! > "/tmp/${CCAPP_NAME}.pid"
+echo "Process PID Is:"$! " StdOut,StdErr logged to:" $CCAPP_RUN_LOG
+sleep 1; head "$CCAPP_RUN_LOG"
+
+
+
2 build.properties
@@ -0,0 +1,2 @@
+hadoop.version=hadoop-0.20.2+320
+hadoop.path=./lib/hadoop-0.20.2+320
161 build.xml
@@ -0,0 +1,161 @@
+<?xml version="1.0"?>
+
+<project name="commoncrawl-mergeutils" default="compile">
+
+ <!-- Load all the default properties, and any the user wants -->
+ <!-- to contribute (without having to type -D or edit this file) -->
+ <property file="${user.home}/build.properties" />
+ <property file="${basedir}/build.properties" />
+ <property name="Name" value="commoncrawl-mergeutils"/>
+ <property name="name" value="commoncrawl-mergeutils"/>
+ <property name="version" value="0.1"/>
+ <property name="final.name" value="${name}-${version}"/>
+
+ <fail message="Please define Hadoop Version via hadoop.version in your build.properties file">
+ <condition>
+ <not>
+ <isset property="hadoop.version"/>
+ </not>
+ </condition>
+ </fail>
+ <fail message="Please define Hadoop Base Path via hadoop.path in your build.properties file">
+ <condition>
+ <not>
+ <isset property="hadoop.path"/>
+ </not>
+ </condition>
+ </fail>
+
+ <property name="src.dir" value="${basedir}/src"/>
+ <property name="amazon.src.dir" value="${basedir}/lib/third_party/amazon"/>
+ <property name="lib.dir" value="${basedir}/lib"/>
+ <property name="conf.dir" value="${basedir}/conf"/>
+
+ <property name="build.dir" value="${basedir}/build"/>
+ <property name="build.classes" value="${build.dir}/classes"/>
+ <property name="build.src" value="${build.dir}/src"/>
+ <property name="build.webapps" value="${build.dir}/webapps"/>
+ <property name="build.anttasks" value="${build.dir}/ant"/>
+
+ <!-- convert spaces to _ so that mac os doesn't break things -->
+ <exec executable="sed" inputstring="${os.name}"
+ outputproperty="nonspace.os">
+ <arg value="s/ /_/g"/>
+ </exec>
+
+ <property name="build.platform"
+ value="${nonspace.os}-${os.arch}-${sun.arch.data.model}"/>
+
+
+ <property name="build.encoding" value="ISO-8859-1"/>
+ <property name="dist.dir" value="${build.dir}/${final.name}"/>
+
+ <property name="javac.debug" value="on"/>
+ <property name="javac.optimize" value="on"/>
+ <property name="javac.deprecation" value="off"/>
+ <property name="javac.version" value="1.5"/>
+ <property name="javac.args" value=""/>
+ <property name="javac.args.warnings" value=""/>
+
+
+ <!-- the normal classpath -->
+ <echo message="Processing Class Path"/>
+ <path id="classpath">
+ <pathelement location="${build.classes}"/>
+ <fileset dir="${lib.dir}">
+ <include name="*.jar" />
+ <include name="**/*.jar" />
+ </fileset>
+ <fileset dir="${hadoop.path}">
+ <include name="lib/**/*.jar"/>
+ <include name="hadoop-*-core.jar"/>
+ </fileset>
+ </path>
+
+ <echo message="classpath:${classpath}" />
+ <!-- ====================================================== -->
+ <!-- Stuff needed by all targets -->
+ <!-- ====================================================== -->
+ <echo message="Processing Init Target"/>
+ <target name="init">
+ <echo message="Executing Init Target"/>
+
+ <mkdir dir="${build.dir}"/>
+ <mkdir dir="${build.classes}"/>
+ <mkdir dir="${build.src}"/>
+ <mkdir dir="${build.anttasks}"/>
+
+ </target>
+
+ <!-- ====================================================== -->
+ <!-- Compile the Java files -->
+ <!-- ====================================================== -->
+ <echo message="Processing Compile Core Classes Target"/>
+ <target name="compile-core-classes" depends="init">
+
+ <!-- Compile Java files (excluding JSPs) checking warnings -->
+ <javac
+ encoding="${build.encoding}"
+ srcdir="${src.dir};${build.src}"
+ includes="org/commoncrawl/**/*.java"
+ excludes="org/commoncrawl/**/OneService.java"
+ destdir="${build.classes}"
+ debug="${javac.debug}"
+ optimize="${javac.optimize}"
+ target="${javac.version}"
+ source="${javac.version}"
+ deprecation="${javac.deprecation}" >
+ <compilerarg line="${javac.args} ${javac.args.warnings}" />
+ <classpath refid="classpath"/>
+ </javac>
+ </target>
+
+ <echo message="Processing Compile Core Target"/>
+ <target name="compile-core" depends="compile-core-classes">
+ </target>
+
+ <echo message="Processing Compile Target"/>
+ <target name="compile" depends="compile-core,jar">
+ </target>
+
+ <!-- ================================================================== -->
+ <!-- Make commoncrawl.jar -->
+ <!-- ================================================================== -->
+ <!-- -->
+ <!-- ================================================================== -->
+ <echo message="Processing Jar Target"/>
+ <target name="jar" depends="compile-core">
+ <jar jarfile="${build.dir}/${final.name}.jar" basedir="${build.classes}" duplicate="preserve" >
+
+ <manifest>
+ <section name="org/commoncrawl">
+ <attribute name="Implementation-Title" value="commoncrawl"/>
+ <attribute name="Implementation-Version" value="${version}"/>
+ <attribute name="Implementation-Vendor" value="CommonCrawl"/>
+ </section>
+ </manifest>
+ <zipfileset dir="${conf.dir}" prefix="conf" >
+ <include name="*" />
+ </zipfileset>
+ <zipfileset dir="${lib.dir}" prefix="lib" >
+ <include name="*.jar" />
+ </zipfileset>
+ <zipfileset dir="${hadoop.path}/lib" prefix="lib" >
+ <include name="**/*.jar" />
+ </zipfileset>
+ </jar>
+ </target>
+
+ <!-- ================================================================== -->
+ <!-- Clean. Delete the build files, and their directories -->
+ <!-- ================================================================== -->
+
+ <echo message="Processing Clean Target"/>
+ <target name="clean">
+ <delete dir="${build.dir}"/>
+ <delete>
+ <fileset dir="${src.dir}" includes="**\/gen.stamp" />
+ </delete>
+ </target>
+ <echo message="Done Processing Targets"/>
+</project>
7 conf/commons-logging.properties
@@ -0,0 +1,7 @@
+#Logging Implementation
+
+#Log4J
+org.apache.commons.logging.Log=org.apache.commons.logging.impl.Log4JLogger
+
+#JDK Logger
+#org.apache.commons.logging.Log=org.apache.commons.logging.impl.Jdk14Logger
84 conf/log4j.properties
@@ -0,0 +1,84 @@
+# Define some default values that can be overridden by system properties
+commoncrawl.root.logger=INFO,console,DRFA
+commoncrawl.log.dir=./logs
+commoncrawl.log.file=commoncrawl.log
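+# For example, pass -Dcommoncrawl.root.logger=DEBUG,console on the java command line to override.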
+
+# Define the root logger to the system property "commoncrawl.root.logger".
+log4j.rootLogger=${commoncrawl.root.logger}, EventCounter
+
+# Logging Threshold
+log4j.threshold=ALL
+
+#
+# Daily Rolling File Appender
+#
+
+log4j.appender.DRFA=org.apache.log4j.DailyRollingFileAppender
+log4j.appender.DRFA.File=${commoncrawl.log.dir}/${commoncrawl.log.file}
+
+# Rollover at midnight
+log4j.appender.DRFA.DatePattern=.yyyy-MM-dd
+
+# 30-day backup
+#log4j.appender.DRFA.MaxBackupIndex=30
+log4j.appender.DRFA.layout=org.apache.log4j.PatternLayout
+
+# Pattern format: Date LogLevel LoggerName LogMessage
+log4j.appender.DRFA.layout.ConversionPattern=%d{ISO8601} %p %c: %m%n
+# Debugging Pattern format
+#log4j.appender.DRFA.layout.ConversionPattern=%d{ISO8601} %-5p %c{2} (%F:%M(%L)) - %m%n
+
+
+#
+# console
+# Add "console" to rootlogger above if you want to use this
+#
+
+log4j.appender.console=org.apache.log4j.ConsoleAppender
+log4j.appender.console.target=System.err
+log4j.appender.console.layout=org.apache.log4j.PatternLayout
+log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{2}: %m%n
+
+#
+# TaskLog Appender
+#
+
+#Default values
+hadoop.tasklog.taskid=null
+hadoop.tasklog.noKeepSplits=4
+hadoop.tasklog.totalLogFileSize=100
+hadoop.tasklog.purgeLogSplits=true
+hadoop.tasklog.logsRetainHours=12
+
+log4j.appender.TLA=org.apache.hadoop.mapred.TaskLogAppender
+log4j.appender.TLA.taskId=${hadoop.tasklog.taskid}
+log4j.appender.TLA.totalLogFileSize=${hadoop.tasklog.totalLogFileSize}
+
+log4j.appender.TLA.layout=org.apache.log4j.PatternLayout
+log4j.appender.TLA.layout.ConversionPattern=%d{ISO8601} %p %c: %m%n
+
+#
+# Rolling File Appender
+#
+
+#log4j.appender.RFA=org.apache.log4j.RollingFileAppender
+#log4j.appender.RFA.File=${hadoop.log.dir}/${hadoop.log.file}
+
+# Logfile size and 30-day backups
+#log4j.appender.RFA.MaxFileSize=1MB
+#log4j.appender.RFA.MaxBackupIndex=30
+
+#log4j.appender.RFA.layout=org.apache.log4j.PatternLayout
+#log4j.appender.RFA.layout.ConversionPattern=%d{ISO8601} %-5p %c{2} - %m%n
+#log4j.appender.RFA.layout.ConversionPattern=%d{ISO8601} %-5p %c{2} (%F:%M(%L)) - %m%n
+
+# Custom Logging levels
+
+#log4j.logger.org.apache.hadoop.mapred.JobTracker=DEBUG
+#log4j.logger.org.apache.hadoop.mapred.TaskTracker=DEBUG
+
+#
+# Event Counter Appender
+# Sends counts of logging messages at different severity levels to Hadoop Metrics.
+#
+log4j.appender.EventCounter=org.apache.hadoop.metrics.jvm.EventCounter
BIN lib/commons-io-1.4.jar
Binary file not shown.
BIN lib/commons-logging-1.0.4.jar
Binary file not shown.
BIN lib/commons-logging-api-1.0.4.jar
Binary file not shown.
BIN lib/junit-4.3.1.jar
Binary file not shown.
BIN lib/log4j-1.2.15.jar
Binary file not shown.
18 src/org/commoncrawl/hadoop/mergeutils/KeyValuePairComparator.java
@@ -0,0 +1,18 @@
+package org.commoncrawl.hadoop.mergeutils;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+/**
+ * The basic Comparator interface. Note: You can compare on Typed keys and/or values
+ *
+ * @author rana
+ *
+ * @param <KeyType>
+ * @param <ValueType>
+ */
+public interface KeyValuePairComparator<KeyType extends WritableComparable,ValueType extends Writable> {
+
+ int compare(KeyType key1,ValueType value1,KeyType key2,ValueType value2);
+
+}
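+
+// For illustration only (not part of this commit), a comparator that orders on the
+// typed key and ignores values might look like this (assuming org.apache.hadoop.io.Text):
+//
+// public class TextKeyComparator implements KeyValuePairComparator<Text,Text> {
+//   @Override
+//   public int compare(Text key1, Text value1, Text key2, Text value2) {
+//     return key1.compareTo(key2);
+//   }
+// }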
1,240 src/org/commoncrawl/hadoop/mergeutils/MergeSortSpillWriter.java
@@ -0,0 +1,1240 @@
+package org.commoncrawl.hadoop.mergeutils;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.BufferOverflowException;
+import java.nio.ByteBuffer;
+import java.text.NumberFormat;
+import java.util.Vector;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.mapred.Reporter;
+import org.commoncrawl.hadoop.mergeutils.OptimizedKeyGeneratorAndComparator.OptimizedKey;
+import org.commoncrawl.util.shared.CCStringUtils;
+import org.commoncrawl.util.shared.FileUtils;
+/**
+ *
+ * The workhorse of the library. This class (currently) implements SpillWriter and thus can accept
+ * a very large number of <<unsorted>> Key/Value pairs, which it then sorts in batches, stores in a
+ * temp directory, and finally merges back into one sorted output via a merge sort operation.
+ *
+ * This class provides two constructors, one that takes a RawComparator and one that takes an
+ * OptimizedKeyGeneratorAndComparator instance. The former is used to compare Raw Key/Value pairs,
+ * while the latter is used to create an optimized key representation which can then be used for
+ * sorting within the segment as well as during the final merge sort.
+ *
+ * NOTE: IGNORE THE COMBINER OPTION FOR NOW. IT NEEDS MORE VETTING.
+ *
+ * @author rana
+ *
+ * @param <KeyType> WritableComparable Type you will be using to represent a key
+ * @param <ValueType> Writable Type you will be using to represent a value
+ */
+
+public class MergeSortSpillWriter<KeyType extends WritableComparable,ValueType extends Writable>
+ implements SpillWriter<KeyType,ValueType> {
+
+ public static final Log LOG = LogFactory.getLog(MergeSortSpillWriter.class);
+
+
+ private static int DEFAULT_SPILL_INDEX_BUFFER_SIZE = 1000000; // 1M index entries * 4 bytes = ~4MB // memory usage
+ private static int SPILL_DATA_ITEM_AVG_SIZE = 400;
+ private static int DEFAULT_SPILL_DATA_BUFFER_SIZE = DEFAULT_SPILL_INDEX_BUFFER_SIZE * SPILL_DATA_ITEM_AVG_SIZE; // 10^6 entries * 400 bytes avg = ~400MB // memory usage
+
+ // the number of index items we can store in ram
+ public static final String SPILL_INDEX_BUFFER_SIZE_PARAM = "commoncrawl.spill.indexbuffer.size";
+ // the size of the data buffer used to accumulate key value pairs during the sort / spill process
+ public static final String SPILL_DATA_BUFFER_SIZE_PARAM = "commoncrawl.spill.databuffer.size";
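+
+ // Record layout in the spill data buffer (as written by spillRecord and consumed by sortAndSpill below):
+ // [optional optimized-key header][key length (int)][key bytes][value length (int)][value bytes]
+ // [optional optimized-key data buffer appended at the end of the record]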
+
+ FileSystem _tempDataFileSystem;
+ Configuration _conf;
+
+ int _spillIndexBuffer[];
+ ByteBuffer _spillDataBuffer;
+ byte[] _spillDataBufferBytes;
+ int _spillItemCount = 0;
+ private NumberFormat NUMBER_FORMAT = NumberFormat.getInstance();
+
+ Vector<Path> _mergeSegements = new Vector<Path>();
+ RawKeyValueComparator<KeyType,ValueType> _comparator = null;
+ OptimizedKeyGeneratorAndComparator<KeyType, ValueType> _optimizedKeyGenerator = null;
+ OptimizedKey _optimizedKey = null;
+ SpillValueCombiner<KeyType,ValueType> _optCombiner = null;
+
+
+ RawDataSpillWriter<KeyType,ValueType> _outputSpillWriter = null;
+ Path _temporaryDirectoryPath;
+ Class<KeyType> _keyClass;
+ Class<ValueType> _valueClass;
+ DataOutputStream _outputStream;
+ DataInputStream _inputStream;
+ boolean _compressOutput = false;
+ Reporter _reporter;
+ int _spillIndexBufferSize = -1;
+ int _spillDataBufferSize = -1;
+
+
+ /**
+ * use this constructor for creating a merger that can use an optional combiner
+ * but one that does not use optimized generated keys
+ *
+ * @param conf
+ * @param outputSpillWriter
+ * @param tempDataFileSystem
+ * @param tempDirectoryPath
+ * @param comparator
+ * @param optCombiner
+ * @param keyClass
+ * @param valueClass
+ * @param compressOutput
+ * @param reporter
+ * @throws IOException
+ */
+ public MergeSortSpillWriter(
+ Configuration conf,
+ RawDataSpillWriter<KeyType,ValueType> outputSpillWriter,
+ FileSystem tempDataFileSystem,
+ Path tempDirectoryPath,
+ SpillValueCombiner<KeyType,ValueType> optionalCombiner,
+ RawKeyValueComparator<KeyType,ValueType> comparator,
+ Class<KeyType> keyClass,
+ Class<ValueType> valueClass,
+ boolean compressOutput,
+ Reporter reporter
+ ) throws IOException {
+
+ init(conf,outputSpillWriter,tempDataFileSystem,tempDirectoryPath,comparator,null,optionalCombiner,keyClass,valueClass,compressOutput,reporter);
+ }
+
+ /**
+ * use this constructor for creating a merger that uses an optimized key generator
+ *
+ *
+ * @param conf
+ * @param outputSpillWriter
+ * @param tempDataFileSystem
+ * @param tempDirectoryPath
+ * @param keyGenerator
+ * @param keyClass
+ * @param valueClass
+ * @param compressOutput
+ * @param reporter
+ * @throws IOException
+ */
+ public MergeSortSpillWriter(
+ Configuration conf,
+ RawDataSpillWriter<KeyType,ValueType> outputSpillWriter,
+ FileSystem tempDataFileSystem,
+ Path tempDirectoryPath,
+ OptimizedKeyGeneratorAndComparator<KeyType, ValueType> keyGenerator,
+ Class<KeyType> keyClass,
+ Class<ValueType> valueClass,
+ boolean compressOutput,
+ Reporter reporter
+ ) throws IOException {
+
+ init(conf,outputSpillWriter,tempDataFileSystem,tempDirectoryPath,null,keyGenerator,null,keyClass,valueClass,compressOutput,reporter);
+ }
+
+
+ private void init(
+ Configuration conf,
+ RawDataSpillWriter<KeyType,ValueType> outputSpillWriter,
+ FileSystem tempDataFileSystem,
+ Path tempDirectoryPath,
+ RawKeyValueComparator<KeyType,ValueType> comparator,
+ OptimizedKeyGeneratorAndComparator<KeyType, ValueType> optionalGenerator,
+ SpillValueCombiner<KeyType,ValueType> optCombiner,
+ Class<KeyType> keyClass,
+ Class<ValueType> valueClass,
+ boolean compressOutput,
+ Reporter reporter
+ ) throws IOException {
+
+ _tempDataFileSystem = tempDataFileSystem;
+ _conf = conf;
+ _comparator = comparator;
+ _optimizedKeyGenerator = optionalGenerator;
+ if (_optimizedKeyGenerator != null) {
+ _optimizedKey = new OptimizedKey(_optimizedKeyGenerator.getGeneratedKeyType());
+ }
+ _optCombiner = optCombiner;
+ _outputSpillWriter = outputSpillWriter;
+ _keyClass = keyClass;
+ _valueClass = valueClass;
+ _temporaryDirectoryPath = new Path(tempDirectoryPath,Long.toString(Thread.currentThread().getId()) + "-" + System.currentTimeMillis());
+ _tempDataFileSystem.delete(_temporaryDirectoryPath,true);
+ _tempDataFileSystem.mkdirs(_temporaryDirectoryPath);
+ NUMBER_FORMAT.setMinimumIntegerDigits(5);
+ NUMBER_FORMAT.setGroupingUsed(false);
+ _compressOutput = compressOutput;
+ _reporter = reporter;
+ // ok look up buffer sizes
+ _spillIndexBufferSize = _conf.getInt(SPILL_INDEX_BUFFER_SIZE_PARAM, DEFAULT_SPILL_INDEX_BUFFER_SIZE);
+ LOG.info("SpillIndexBufferSize:" + _spillIndexBufferSize);
+ _spillDataBufferSize = _conf.getInt(SPILL_DATA_BUFFER_SIZE_PARAM, DEFAULT_SPILL_DATA_BUFFER_SIZE);
+ LOG.info("SpillDataBufferSize:" + _spillDataBufferSize);
+ // now allocate memory ...
+ _spillIndexBuffer = new int[_spillIndexBufferSize];
+ _spillDataBuffer = ByteBuffer.allocate(_spillDataBufferSize);
+ _spillDataBufferBytes = _spillDataBuffer.array();
+ _spillDataBuffer.clear();
+ // initialize streams ...
+ _outputStream = new DataOutputStream(newOutputStream(_spillDataBuffer));
+ _inputStream = new DataInputStream(newInputStream(_spillDataBuffer));
+ }
+
+ private void freeMemory() {
+ _spillDataBuffer = null;
+ _spillIndexBuffer = null;
+ }
+
+ @Override
+ public void close() throws IOException {
+
+ LOG.info("Entering flushAndClose");
+ if (_spillItemCount == 0 && _mergeSegements.size() == 0) {
+ LOG.info("No Data to Merge. Exiting Prematurely");
+
+ freeMemory();
+ // nothing to do.. return to caller..
+ return;
+ }
+
+ // first check to see if anything left to sort
+ if (_spillItemCount != 0) {
+
+ LOG.info("Trailing Spill Data Items of Count:" + _spillItemCount);
+
+ // if no other merge segments ... spill directly to outer spill writer ...
+ if (_mergeSegements.size() == 0) {
+ LOG.info("Merge Segment Count is zero. Sorting and Spilling to output file directly");
+ // go ahead and sort and then spill the buffered data
+ sortAndSpill(_outputSpillWriter);
+ // free memory ...
+ freeMemory();
+ // and return
+ return;
+ }
+ // otherwise do the normal spill to temporary file ...
+ else {
+ LOG.info("Merge Segment Count non-zero. Sorting and Spilling to temporary file");
+ sortAndSpill(null);
+ }
+
+ _spillItemCount = 0;
+
+ }
+
+ if (_mergeSegements.size() != 0) {
+ // now check to see how many spill files we have ...
+ if (_mergeSegements.size() == 1) {
+ // this is an error, since the check above should have flushed
+ // directly to to output spill writer for the single segment case
+ throw new IOException("Improper merge segment count. Expected >1 got 1");
+ }
+
+ LOG.info("Merging " + _mergeSegements.size() + " Segments");
+
+
+ // do the final merge sort of the resulting paths ...
+ SequenceFileMerger<KeyType,ValueType> merger = null;
+
+ if (_optimizedKeyGenerator != null) {
+ merger = new SequenceFileMerger<KeyType,ValueType>(
+ _tempDataFileSystem,
+ _conf,
+ _mergeSegements,
+ _outputSpillWriter,
+ _keyClass,
+ _valueClass,
+ _optimizedKeyGenerator
+ );
+ }
+ else {
+ merger = new SequenceFileMerger<KeyType,ValueType>(
+ _tempDataFileSystem,
+ _conf,
+ _mergeSegements,
+ _outputSpillWriter,
+ _keyClass,
+ _valueClass,
+ _optCombiner,
+ _comparator
+ );
+ }
+
+ try {
+ long mergeStartTime = System.currentTimeMillis();
+ LOG.info("Starting Final Merge");
+ // do the final merge and spill
+ merger.mergeAndSpill(_reporter);
+ long mergeEndTime = System.currentTimeMillis();
+
+ LOG.info("Final Merge took:" + (mergeEndTime - mergeStartTime));
+ }
+ catch (IOException e) {
+ LOG.error(CCStringUtils.stringifyException(e));
+ throw e;
+ }
+ finally {
+ try {
+ merger.close();
+ }
+ catch (IOException e) {
+ LOG.error(CCStringUtils.stringifyException(e));
+ throw e;
+ }
+ finally {
+ LOG.info("Deleting temporary files");
+ for (Path path : _mergeSegements) {
+ // and delete temp directory ...
+ FileUtils.recursivelyDeleteFile(new File(path.toString()));
+ }
+
+ _tempDataFileSystem.delete(_temporaryDirectoryPath,true);
+
+ _mergeSegements.clear();
+ }
+ }
+
+ }
+ }
+
+ private Path getNextSpillFilePath() {
+ return new Path(_temporaryDirectoryPath,"part-" + NUMBER_FORMAT.format(_mergeSegements.size()));
+ }
+
+
+ /** sort the current set of buffered records and spill **/
+ private void sortAndSpill(RawDataSpillWriter<KeyType, ValueType> optionalWriter) throws IOException {
+
+ // get byte pointer
+ byte[] bufferAsBytes = _spillDataBuffer.array();
+
+ long sortAndSpillTime = System.currentTimeMillis();
+ long sortTimeStart = System.currentTimeMillis();
+ // merge items in buffer
+ if (_optimizedKeyGenerator != null) {
+
+ if (_optimizedKey.getKeyType() == OptimizedKey.KEY_TYPE_LONG) {
+ LOG.info("Sorting:" + _spillItemCount + " Items Using Optimized Long Comparator");
+ sortUsingOptimizedLongComparator(_spillIndexBuffer, 0,_spillItemCount);
+ }
+ else if (_optimizedKey.getKeyType() == OptimizedKey.KEY_TYPE_BUFFER) {
+ LOG.info("Sorting:" + _spillItemCount + " Items Using Optimized Buffer Comparator");
+ sortUsingOptimizedBufferKeys(bufferAsBytes,_spillIndexBuffer,0,_spillItemCount);
+ }
+ else if (_optimizedKey.getKeyType() == OptimizedKey.KEY_TYPE_LONG_AND_BUFFER) {
+ LOG.info("Sorting:" + _spillItemCount + " Items Using Optimized Long And Buffer Only Comparator");
+ sortUsingOptimizedLongAndBufferKeys(bufferAsBytes,_spillIndexBuffer,0,_spillItemCount);
+ }
+ else {
+ throw new IOException("Unknown Optimized Key Type!");
+ }
+ }
+ else {
+ LOG.info("Sorting:" + _spillItemCount + " Items Using Raw Comparator");
+ sortUsingRawComparator(bufferAsBytes,_spillIndexBuffer, 0, _spillItemCount);
+ }
+
+ LOG.info("Sort Took:" + (System.currentTimeMillis() - sortTimeStart));
+
+ Path spillFilePath = getNextSpillFilePath();
+
+ // figure out if we are writing to temporary spill writer or passed in spill writer ...
+ RawDataSpillWriter spillWriter = null;
+
+ boolean directSpillOutput = false;
+
+ if (optionalWriter != null) {
+ LOG.info("Writing spill output directory to final spill writer");
+ spillWriter = optionalWriter;
+ directSpillOutput = true;
+ }
+ else {
+ LOG.info("Writing spill output to temporary spill file:" + spillFilePath);
+ spillWriter = new SequenceFileSpillWriter<KeyType,ValueType>(_tempDataFileSystem,_conf,spillFilePath,_keyClass,_valueClass,null,_compressOutput);
+ }
+
+ long spillTimeStart = System.currentTimeMillis();
+
+ try {
+
+ // iterate over sorted items ...
+ int i=0;
+ int dataOffset=0;
+ int keyLen = 0;
+ int keyPos = 0;
+ int valueLen = 0;
+ int valuePosition = 0;
+ int optimizedBufferLen = 0;
+
+
+ for (i=0;i<_spillItemCount;++i) {
+ try {
+ dataOffset = _spillIndexBuffer[i];
+ _spillDataBuffer.position(dataOffset);
+
+
+ // if optimized key ... we need to write optimized key value as well as regular key value ...
+ if (_optimizedKeyGenerator != null) {
+ // init optimized key length
+ _optimizedKey.readHeader(_inputStream);
+ optimizedBufferLen = _optimizedKey.getDataBufferSize();
+ }
+ else {
+ optimizedBufferLen = 0;
+ }
+ // now read in key length
+ keyLen = _spillDataBuffer.getInt();
+ // mark key position
+ keyPos = _spillDataBuffer.position();
+ // now skip past key length
+ _spillDataBuffer.position(keyPos + keyLen);
+ // read value length
+ valueLen = _spillDataBuffer.getInt();
+ // mark value position
+ valuePosition = _spillDataBuffer.position();
+ // now skip past it (and optimized key data)...
+ _spillDataBuffer.position(valuePosition + valueLen + optimizedBufferLen);
+
+ //LOG.info("Spilling Raw Record: startPos:" + dataOffset + " optKeyBufferSize:" + optimizedBufferLen + " keySize:" + keyLen + " keyDataPos:" + keyPos+ " valueSize:" + valueLen + " valuePos:" + valuePosition);
+
+
+ // now write this out to the sequence file ...
+
+ // if direct spill, we don't write lengths and optimized bits to stream ...
+ if (directSpillOutput || _optimizedKeyGenerator == null) {
+ spillWriter.spillRawRecord(
+ bufferAsBytes,
+ keyPos,
+ keyLen,
+ bufferAsBytes,
+ valuePosition,
+ valueLen);
+ }
+ // otherwise ... in the optimized key case ...
+ else {
+ spillWriter.spillRawRecord(
+ bufferAsBytes,
+ dataOffset ,
+ keyLen + _optimizedKey.getHeaderSize() + 4,
+ bufferAsBytes,
+ valuePosition,
+ valueLen + optimizedBufferLen);
+ }
+ // increment progress...
+ if (i % 100000 == 0) {
+ LOG.info("Spilled " + i + " Records");
+ if (_reporter != null) {
+ _reporter.progress();
+ }
+ }
+ }
+ catch (Exception e) {
+ LOG.error("Error in Iteration. "+
+ " DataOffset:" + dataOffset +
+ " index:" + i +
+ " keyLen:" + keyLen +
+ " keyPos:" + keyPos+
+ " valueLen:" + valueLen+
+ " valuePos:" + valuePosition+
+ " spillDataBufferSize:" + _spillDataBuffer.capacity()+
+ " spillDataPosition:" + _spillDataBuffer.position()+
+ " spillDataRemaining:" + _spillDataBuffer.remaining());
+
+ CCStringUtils.stringifyException(e);
+ throw new IOException(e);
+ }
+ }
+
+ }
+ finally {
+ if (spillWriter != optionalWriter) {
+ //LOG.info("Closing Spill File");
+ spillWriter.close();
+ }
+ }
+ LOG.info("Spill Took:" + (System.currentTimeMillis() - spillTimeStart));
+
+ if (spillWriter != optionalWriter) {
+ //LOG.info("Adding spill file to merge segment list");
+ // add to spill file vector
+ _mergeSegements.add(spillFilePath);
+ }
+
+ LOG.info("Spill and Sort for:" + _spillItemCount + " Took:" + (System.currentTimeMillis() - sortAndSpillTime));
+ // reset spill data buffer
+ _spillDataBuffer.position(0);
+ // reset spill item count
+ _spillItemCount = 0;
+ }
+
+
+ static final int getIntB(byte[] bb,int offset) {
+ return (int)((((bb[offset + 0] & 0xff) << 24) |
+ ((bb[offset + 1] & 0xff) << 16) |
+ ((bb[offset + 2] & 0xff) << 8) |
+ ((bb[offset + 3] & 0xff) << 0)));
+ }
+
+ private static final int compareUsingRawComparator(RawKeyValueComparator comparator,byte[] keyValueData1,int offset1,byte[] keyValueData2,int offset2) throws IOException {
+
+ int key1Length = getIntB(keyValueData1,offset1);
+ int key1Offset = offset1 + 4;
+ int value1Offset = key1Offset + key1Length + 4;
+
+ int value1Length = getIntB(keyValueData1,key1Offset + key1Length);
+
+
+ int key2Length = getIntB(keyValueData2,offset2);
+ int key2Offset = offset2 + 4;
+ int value2Offset = key2Offset + key2Length + 4;
+ int value2Length = getIntB(keyValueData2,key2Offset + key2Length);
+
+ return comparator.compareRaw(keyValueData1, key1Offset, key1Length, keyValueData2, key2Offset, key2Length, keyValueData1, value1Offset, value1Length, keyValueData2, value2Offset, value2Length);
+ }
+
+ /**
+ * Returns the index of the median of the three indexed longs.
+ */
+ private final int med3UsingOptimizedComparatorWithLongs(int x[], int a, int b, int c)throws IOException {
+
+ long aValue = _spillDataBuffer.getLong(x[a]);
+ long bValue = _spillDataBuffer.getLong(x[b]);
+ long cValue = _spillDataBuffer.getLong(x[c]);
+
+ return (aValue < bValue ?
+ (bValue < cValue ? b : aValue < cValue ? c : a) :
+ (bValue > cValue ? b : aValue > cValue ? c : a));
+ }
+
+ /**
+ * Returns the index of the median of the three elements using raw key comparator.
+ */
+ private final int med3UsingRawComparator(byte[] spillData,int x[], int a, int b, int c)throws IOException {
+
+ return (
+ compareUsingRawComparator(_comparator,spillData,x[a], spillData, x[b]) < 0 ? // x[a] < x[b] ?
+ ((compareUsingRawComparator(_comparator,spillData,x[b], spillData, x[c]) < 0) ? b :
+ (compareUsingRawComparator(_comparator,spillData,x[a], spillData, x[c]) < 0) ? c: a) : // (x[b] < x[c] ? b : x[a] < x[c] ? c : a) :
+ ((compareUsingRawComparator(_comparator,spillData,x[b], spillData, x[c]) > 0) ? b :
+ (compareUsingRawComparator(_comparator,spillData,x[a], spillData, x[c]) > 0) ? c : a)); // (x[b] > x[c] ? b : x[a] > x[c] ? c : a));
+
+ }
+
+ /**
+ * Returns the index of the median of the three elements using optimized long and buffer key comparator.
+ */
+ private final int med3UsingOptimizedComparatorWithLongsAndBuffer(byte[] spillData,int x[], int a, int b, int c)throws IOException {
+
+ return (
+ compareUsingOptimizedRawLongAndBufferValues(spillData,x[a], x[b]) < 0 ? // x[a] < x[b] ?
+ ((compareUsingOptimizedRawLongAndBufferValues(spillData,x[b], x[c]) < 0) ? b :
+ (compareUsingOptimizedRawLongAndBufferValues(spillData,x[a], x[c]) < 0) ? c: a) : // (x[b] < x[c] ? b : x[a] < x[c] ? c : a) :
+ ((compareUsingOptimizedRawLongAndBufferValues(spillData,x[b], x[c]) > 0) ? b :
+ (compareUsingOptimizedRawLongAndBufferValues(spillData,x[a], x[c]) > 0) ? c : a)); // (x[b] > x[c] ? b : x[a] > x[c] ? c : a));
+
+ }
+
+ /**
+ * Returns the index of the median of the three elements using optimized buffer key comparator.
+ */
+ private final int med3UsingOptimizedComparatorWithBuffer(byte[] spillData,int x[], int a, int b, int c)throws IOException {
+
+ return (
+ compareUsingOptimizedRawBufferValues(spillData,x[a], x[b]) < 0 ? // x[a] < x[b] ?
+ ((compareUsingOptimizedRawBufferValues(spillData,x[b], x[c]) < 0) ? b :
+ (compareUsingOptimizedRawBufferValues(spillData,x[a], x[c]) < 0) ? c: a) : // (x[b] < x[c] ? b : x[a] < x[c] ? c : a) :
+ ((compareUsingOptimizedRawBufferValues(spillData,x[b], x[c]) > 0) ? b :
+ (compareUsingOptimizedRawBufferValues(spillData,x[a], x[c]) > 0) ? c : a)); // (x[b] > x[c] ? b : x[a] > x[c] ? c : a));
+
+ }
+
+
+ /**
+ * Sorts the specified sub-array of record index entries into ascending order
+ * using the raw key/value comparator.
+ *
+ * borrowed from the Arrays implementation
+ *
+ * The sorting algorithm is a tuned quicksort, adapted from Jon
+ * L. Bentley and M. Douglas McIlroy's "Engineering a Sort Function",
+ * Software-Practice and Experience, Vol. 23(11) P. 1249-1265 (November
+ * 1993). This algorithm offers n*log(n) performance on many data sets
+ * that cause other quicksorts to degrade to quadratic performance.
+ *
+ */
+ private void sortUsingRawComparator(byte[] dataBytes,int x[], int off, int len) throws IOException {
+ byte spillData[] = _spillDataBuffer.array();
+
+// Insertion sort on smallest arrays
+ if (len < 7) {
+ for (int i=off; i<len+off; i++)
+ // for (int j=i; j>off && x[j-1]>x[j]; j--)
+ for (int j=i; j>off && compareUsingRawComparator(_comparator,spillData,x[j-1], spillData, x[j]) > 0; j--)
+ swap(x, j, j-1);
+ return;
+ }
+
+ // choose a partition element, v
+ int m = off + (len >> 1); // for small arrays, take middle element
+ if (len > 7) {
+ int l = off;
+ int n = off + len - 1;
+ if (len > 40) { // for big arrays, take pseudo-median of 9
+ int s = len/8;
+ l = med3UsingRawComparator(dataBytes,x, l, l+s, l+2*s);
+ m = med3UsingRawComparator(dataBytes,x, m-s, m, m+s);
+ n = med3UsingRawComparator(dataBytes,x, n-2*s, n-s, n);
+ }
+ m = med3UsingRawComparator(dataBytes,x, l, m, n); // for mid-sized arrays, take median of 3
+ }
+ int vOffset = x[m];
+
+ // Establish Invariant: v* (<v)* (>v)* v*
+ int a = off, b = a, c = off + len - 1, d = c;
+ while(true) {
+ // while (b <= c && x[b] <= v) {
+ while (b <= c && compareUsingRawComparator(_comparator,spillData,x[b], spillData, vOffset) <=0) {
+ // if (x[b] == v)
+ if (compareUsingRawComparator(_comparator,spillData,x[b], spillData, vOffset) == 0)
+ swap(x, a++, b);
+ b++;
+ }
+ // while (c >= b && x[c] >= v) {
+ while (c >= b && compareUsingRawComparator(_comparator,spillData,x[c], spillData, vOffset) >= 0) {
+ // if (x[c] == v)
+ if (compareUsingRawComparator(_comparator,spillData,x[c], spillData, vOffset) == 0)
+ swap(x, c, d--);
+ c--;
+ }
+ if (b > c)
+ break;
+ swap(x, b++, c--);
+ }
+
+ // Swap partition elements back to middle
+ int s, n = off + len;
+ s = Math.min(a-off, b-a ); vecswap(x, off, b-s, s);
+ s = Math.min(d-c, n-d-1); vecswap(x, b, n-s, s);
+
+ // Recursively sort non-partition-elements
+ if ((s = b-a) > 1)
+ sortUsingRawComparator(dataBytes,x, off, s);
+ if ((s = d-c) > 1)
+ sortUsingRawComparator(dataBytes,x, n-s, s);
+ }
+
+ /**
+ * Sorts the specified sub-array of longs into ascending order.
+ *
+ * borrowed from the Arrays implementation
+ *
+ * The sorting algorithm is a tuned quicksort, adapted from Jon
+ * L. Bentley and M. Douglas McIlroy's "Engineering a Sort Function",
+ * Software-Practice and Experience, Vol. 23(11) P. 1249-1265 (November
+ * 1993). This algorithm offers n*log(n) performance on many data sets
+ * that cause other quicksorts to degrade to quadratic performance.
+ *
+ */
+ private void sortUsingOptimizedLongComparator(int x[], int off, int len) throws IOException {
+
+// Insertion sort on smallest arrays
+ if (len < 7) {
+ for (int i=off; i<len+off; i++)
+ // for (int j=i; j>off && x[j-1]>x[j]; j--)
+ for (int j=i; j>off && _spillDataBuffer.getLong(x[j-1]) > _spillDataBuffer.getLong(x[j]); j--)
+ swap(x, j, j-1);
+ return;
+ }
+
+// Choose a partition element, v
+ int m = off + (len >> 1); // Small arrays, middle element
+ if (len > 7) {
+ int l = off;
+ int n = off + len - 1;
+ if (len > 40) { // Big arrays, pseudomedian of 9
+ int s = len/8;
+ l = med3UsingOptimizedComparatorWithLongs(x, l, l+s, l+2*s);
+ m = med3UsingOptimizedComparatorWithLongs(x, m-s, m, m+s);
+ n = med3UsingOptimizedComparatorWithLongs(x, n-2*s, n-s, n);
+ }
+ m = med3UsingOptimizedComparatorWithLongs(x, l, m, n); // Mid-size, med of 3
+ }
+
+ long v = _spillDataBuffer.getLong(x[m]);
+ // LOG.info("Debug:x[" + m + "]=" + v);
+
+// Establish Invariant: v* (<v)* (>v)* v*
+ int a = off, b = a, c = off + len - 1, d = c;
+ while(true) {
+ // while (b <= c && x[b] <= v) {
+
+ while (b <= c && _spillDataBuffer.getLong(x[b]) <= v) {
+ //LOG.info("Debug:x[" + b + "]=" + _spillDataBuffer.getLong(x[b]));
+ // if (x[b] == v)
+ if (_spillDataBuffer.getLong(x[b]) == v)
+ swap(x, a++, b);
+ b++;
+ }
+ // while (c >= b && x[c] >= v) {
+ while (c >= b && _spillDataBuffer.getLong(x[c]) >= v) {
+ //LOG.info("Debug:x[" + c + "]=" + _spillDataBuffer.getLong(x[c]));
+ // if (x[c] == v)
+ if (_spillDataBuffer.getLong(x[c]) == v)
+ swap(x, c, d--);
+ c--;
+ }
+ if (b > c)
+ break;
+ swap(x, b++, c--);
+ }
+
+// Swap partition elements back to middle
+ int s, n = off + len;
+ s = Math.min(a-off, b-a ); vecswap(x, off, b-s, s);
+ s = Math.min(d-c, n-d-1); vecswap(x, b, n-s, s);
+
+// Recursively sort non-partition-elements
+ if ((s = b-a) > 1)
+ sortUsingOptimizedLongComparator(x, off, s);
+ if ((s = d-c) > 1)
+ sortUsingOptimizedLongComparator(x, n-s, s);
+ }
+
+ // reusable input buffers
+ DataInputBuffer _reader1 = new DataInputBuffer();
+ DataInputBuffer _reader2 = new DataInputBuffer();
+
+
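+ // Layout assumed here for an optimized LONG_AND_BUFFER key, relative to the record's start
+ // offset: [long key (8 bytes)][buffer key length (int)][buffer key offset (int)], with the
+ // buffer key bytes located at (record start offset + buffer key offset).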
+ private final int compareUsingOptimizedRawLongAndBufferValues(byte[] dataAsBytes,int lValueDataOffset,int rValueDataOffset)throws IOException {
+ final long lValue = _spillDataBuffer.getLong(lValueDataOffset);
+ final long rValue = _spillDataBuffer.getLong(rValueDataOffset);
+ int buffer1Len = _spillDataBuffer.getInt(lValueDataOffset + 8);
+ int buffer2Len = _spillDataBuffer.getInt(rValueDataOffset + 8);
+ int buffer1Offset = _spillDataBuffer.getInt(lValueDataOffset + 12);
+ int buffer2Offset= _spillDataBuffer.getInt(rValueDataOffset + 12);
+
+ int result = (lValue > rValue) ? 1 : (lValue < rValue)
+ ? -1 : 0;
+ if (result == 0) {
+ return _optimizedKeyGenerator.compareOptimizedBufferKeys(dataAsBytes, lValueDataOffset + buffer1Offset, buffer1Len, dataAsBytes, rValueDataOffset + buffer2Offset, buffer2Len);
+ }
+ return result;
+ }
+
+ private final int compareUsingOptimizedRawBufferValues(byte[] dataAsBytes,int lValueDataOffset,int rValueDataOffset)throws IOException {
+ int buffer1Len = _spillDataBuffer.getInt(lValueDataOffset);
+ int buffer2Len = _spillDataBuffer.getInt(rValueDataOffset);
+ int buffer1Offset = _spillDataBuffer.getInt(lValueDataOffset + 4);
+ int buffer2Offset= _spillDataBuffer.getInt(rValueDataOffset + 4);
+
+ return _optimizedKeyGenerator.compareOptimizedBufferKeys(dataAsBytes, lValueDataOffset + buffer1Offset, buffer1Len, dataAsBytes, rValueDataOffset + buffer2Offset, buffer2Len);
+ }
+
+ /**
+ * sort optimized long and buffer keys
+ * @param dataBytes
+ * @param x
+ * @param off
+ * @param len
+ * @throws IOException
+ */
+ private void sortUsingOptimizedLongAndBufferKeys(byte[] dataBytes,int x[], int off, int len) throws IOException {
+
+ byte spillData[] = _spillDataBuffer.array();
+
+// Insertion sort on smallest arrays
+ if (len < 7) {
+ for (int i=off; i<len+off; i++)
+ // for (int j=i; j>off && x[j-1]>x[j]; j--)
+ for (int j=i; j>off && compareUsingOptimizedRawLongAndBufferValues(spillData,x[j-1], x[j]) > 0; j--)
+ swap(x, j, j-1);
+ return;
+ }
+
+ // choose a partition element, v
+ int m = off + (len >> 1); // for small arrays, take middle element
+ if (len > 7) {
+ int l = off;
+ int n = off + len - 1;
+ if (len > 40) { // for big arrays, take pseudo-median of 9
+ int s = len/8;
+ l = med3UsingOptimizedComparatorWithLongsAndBuffer(dataBytes,x, l, l+s, l+2*s);
+ m = med3UsingOptimizedComparatorWithLongsAndBuffer(dataBytes,x, m-s, m, m+s);
+ n = med3UsingOptimizedComparatorWithLongsAndBuffer(dataBytes,x, n-2*s, n-s, n);
+ }
+ m = med3UsingOptimizedComparatorWithLongsAndBuffer(dataBytes,x, l, m, n); // for mid-sized arrays, take median of 3
+ }
+ int vOffset = x[m];
+
+ // Establish Invariant: v* (<v)* (>v)* v*
+ int a = off, b = a, c = off + len - 1, d = c;
+ while(true) {
+ // while (b <= c && x[b] <= v) {
+ while (b <= c && compareUsingOptimizedRawLongAndBufferValues(spillData,x[b], vOffset) <=0) {
+ // if (x[b] == v)
+ if (compareUsingOptimizedRawLongAndBufferValues(spillData,x[b],vOffset) == 0)
+ swap(x, a++, b);
+ b++;
+ }
+ // while (c >= b && x[c] >= v) {
+ while (c >= b && compareUsingOptimizedRawLongAndBufferValues(spillData,x[c],vOffset) >= 0) {
+ // if (x[c] == v)
+ if (compareUsingOptimizedRawLongAndBufferValues(spillData,x[c], vOffset) == 0)
+ swap(x, c, d--);
+ c--;
+ }
+ if (b > c)
+ break;
+ swap(x, b++, c--);
+ }
+
+ // Swap partition elements back to middle
+ int s, n = off + len;
+ s = Math.min(a-off, b-a ); vecswap(x, off, b-s, s);
+ s = Math.min(d-c, n-d-1); vecswap(x, b, n-s, s);
+
+ // Recursively sort non-partition-elements
+ if ((s = b-a) > 1)
+ sortUsingOptimizedLongAndBufferKeys(dataBytes,x, off, s);
+ if ((s = d-c) > 1)
+ sortUsingOptimizedLongAndBufferKeys(dataBytes,x, n-s, s);
+ }
+
+
+ /**
+ * sort optimized buffer keys
+ * @param dataBytes
+ * @param x
+ * @param off
+ * @param len
+ * @throws IOException
+ */
+ private void sortUsingOptimizedBufferKeys(byte[] dataBytes,int x[], int off, int len) throws IOException {
+
+ byte spillData[] = _spillDataBuffer.array();
+
+// Insertion sort on smallest arrays
+ if (len < 7) {
+ for (int i=off; i<len+off; i++)
+ // for (int j=i; j>off && x[j-1]>x[j]; j--)
+ for (int j=i; j>off && compareUsingOptimizedRawBufferValues(spillData,x[j-1], x[j]) > 0; j--)
+ swap(x, j, j-1);
+ return;
+ }
+
+ // choose a partition element, v
+ int m = off + (len >> 1); // for small arrays, take middle element
+ if (len > 7) {
+ int l = off;
+ int n = off + len - 1;
+ if (len > 40) { // for big arrays, take pseudo-median of 9
+ int s = len/8;
+ l = med3UsingOptimizedComparatorWithBuffer(dataBytes,x, l, l+s, l+2*s);
+ m = med3UsingOptimizedComparatorWithBuffer(dataBytes,x, m-s, m, m+s);
+ n = med3UsingOptimizedComparatorWithBuffer(dataBytes,x, n-2*s, n-s, n);
+ }
+ m = med3UsingOptimizedComparatorWithBuffer(dataBytes,x, l, m, n); // for mid-sized arrays, take median of 3
+ }
+ int vOffset = x[m];
+
+ // Establish Invariant: v* (<v)* (>v)* v*
+ int a = off, b = a, c = off + len - 1, d = c;
+ while(true) {
+ // while (b <= c && x[b] <= v) {
+ while (b <= c && compareUsingOptimizedRawBufferValues(spillData,x[b], vOffset) <=0) {
+ // if (x[b] == v)
+ if (compareUsingOptimizedRawBufferValues(spillData,x[b],vOffset) == 0)
+ swap(x, a++, b);
+ b++;
+ }
+ // while (c >= b && x[c] >= v) {
+ while (c >= b && compareUsingOptimizedRawBufferValues(spillData,x[c],vOffset) >= 0) {
+ // if (x[c] == v)
+ if (compareUsingOptimizedRawBufferValues(spillData,x[c], vOffset) == 0)
+ swap(x, c, d--);
+ c--;
+ }
+ if (b > c)
+ break;
+ swap(x, b++, c--);
+ }
+
+ // Swap partition elements back to middle
+ int s, n = off + len;
+ s = Math.min(a-off, b-a ); vecswap(x, off, b-s, s);
+ s = Math.min(d-c, n-d-1); vecswap(x, b, n-s, s);
+
+ // Recursively sort non-partition-elements
+ if ((s = b-a) > 1)
+ sortUsingOptimizedBufferKeys(dataBytes,x, off, s);
+ if ((s = d-c) > 1)
+ sortUsingOptimizedBufferKeys(dataBytes,x, n-s, s);
+ }
+
+
+ /**
+ * Swaps x[a] with x[b].
+ */
+ private static void swap(int x[], int a, int b) {
+ //LOG.info("Swapping [" + a + "][" + _spillDataBuffer.getLong(x[a]) +"] and [" + b + "][" + _spillDataBuffer.getLong(x[b]) + "]");
+ int t = x[a];
+ x[a] = x[b];
+ x[b] = t;
+ }
+
+ /**
+ * Swaps x[a .. (a+n-1)] with x[b .. (b+n-1)].
+ */
+ private void vecswap(int x[], int a, int b, int n) {
+ for (int i=0; i<n; i++, a++, b++)
+ swap(x, a, b);
+ }
+
+ /**
+ * Spill a Raw Record (support for TFile)
+ */
+ /*
+ //DISABLE THIS CODE UNTIL WE CAN DEVELOP A UNIT TEST TO VALIDATE IT
+ public void spillRawRecord(DataInputStream keyStream,DataInputStream valueStream) throws IOException {
+ // if index is full , trigger merge as well
+ if (_spillItemCount == _spillIndexBufferSize) {
+ //LOG.info("Spill Item Count == " + SPILL_INDEX_BUFFER_SIZE + ". Flushing");
+ sortAndSpill(null);
+ }
+
+
+ boolean done = false;
+
+ // ok, writing from a stream is a little trickier, since we could overflow at any moment
+ // and (in the case of TFile) in some cases the input stream is not rewindable
+ boolean wroteKey = false;
+ int totalKeyBytesWritten = 0;
+ int keySizePos = 0;
+ boolean wroteValue = false;
+ int totalValueBytesWritten = 0;
+ int valueSizePos = 0;
+
+
+ while (!done) {
+
+ // mark buffer position ...
+ int startPositon = _spillDataBuffer.position();
+ //LOG.info("Buffer start position:" + startPositon);
+
+ boolean overflow = false;
+ try {
+ int optimizedKeyBufferSize = 0;
+ // if optimized comparator is available ...
+ if (_optimizedKeyGenerator != null) {
+ // we don't have access to key, value bits yet, so leave space for header bytes
+ _spillDataBuffer.position(startPositon + OptimizedKey.FIXED_HEADER_SIZE);
+ }
+ // save key size position
+ keySizePos = _spillDataBuffer.position();
+ // LOG.info("keySizePos:" + keySizePos);
+
+ // skip past key length
+ _spillDataBuffer.position(keySizePos + 4);
+ // and skip past key bytes already written (via overflow condition)..
+ _spillDataBuffer.position(_spillDataBuffer.position() + totalKeyBytesWritten);
+
+ // now if we did not finish writing key ...
+ if (!wroteKey) {
+ // write key into remaining bytes ...
+ int bufferBytesAvailable = _spillDataBuffer.remaining();
+ int keyBytesWritten = keyStream.read(_spillDataBufferBytes, _spillDataBuffer.position(), bufferBytesAvailable);
+ totalKeyBytesWritten += keyBytesWritten;
+ if (bufferBytesAvailable == keyBytesWritten) {
+ // ok we reached an overflow condition !!!
+ throw new BufferOverflowException();
+ }
+ else {
+ // advance cursor
+ _spillDataBuffer.position(_spillDataBuffer.position() + keyBytesWritten);
+ // ok we are done writing key ...
+ wroteKey = true;
+ }
+ }
+ // now save value size position
+ valueSizePos = _spillDataBuffer.position();
+ // and calculate key size
+ int keySize = valueSizePos - keySizePos - 4;
+ // reseek back
+ _spillDataBuffer.position(keySizePos);
+ // write out real key size
+ _spillDataBuffer.putInt(keySize);
+ // skip to value size position + 4
+ _spillDataBuffer.position(valueSizePos + 4);
+ // and skip past already written data (via overflow condition)
+ _spillDataBuffer.position(_spillDataBuffer.position() + totalValueBytesWritten);
+ // now if we did not finish writing value ...
+ if (!wroteValue) {
+ // write value into remaining bytes ...
+ int bufferBytesAvailable = _spillDataBuffer.remaining();
+ int valueBytesWritten = valueStream.read(_spillDataBufferBytes, _spillDataBuffer.position(), bufferBytesAvailable);
+ totalValueBytesWritten += valueBytesWritten;
+ if (bufferBytesAvailable == valueBytesWritten) {
+ // ok we reached an overflow condition !!!
+ throw new BufferOverflowException();
+ }
+ else {
+ // advance cursor
+ _spillDataBuffer.position(_spillDataBuffer.position() + valueBytesWritten);
+ // ok we are done writing value bytes ...
+ wroteValue = true;
+ }
+ }
+ // save end position
+ int endPosition = _spillDataBuffer.position();
+ // calculate value size
+ int valueSize = endPosition - valueSizePos - 4;
+ // reseek back to value size pos
+ _spillDataBuffer.position(valueSizePos);
+ // write value size
+ _spillDataBuffer.putInt(valueSize);
+ // ok now ... if optimized key generation is required ...
+ if (_optimizedKeyGenerator != null) {
+ // seek back to start position
+ _spillDataBuffer.position(startPositon);
+ // gen optimized key
+ _optimizedKeyGenerator.generateOptimizedKeyForRawPair(
+ _spillDataBufferBytes,
+ keySizePos+4,
+ totalKeyBytesWritten,
+ _spillDataBufferBytes,
+ valueSizePos+4,
+ totalValueBytesWritten,
+ _optimizedKey);
+ // write out optimized key header
+ // and calc trailing key buffer size
+ optimizedKeyBufferSize = _optimizedKey.writeHeaderToStream(_outputStream);
+ }
+ // seek forward to end position
+ _spillDataBuffer.position(endPosition);
+ // and now if there is optional optimized key buffer data ...
+ // append it at end
+ if (optimizedKeyBufferSize != 0) {
+ _optimizedKey.writeBufferToStream(_outputStream);
+ }
+ // store start position in index buffer
+ _spillIndexBuffer[_spillItemCount] = startPositon;
+ // increment ...
+ _spillItemCount++;
+
+ //LOG.info("startPos:" + startPositon + " optKeySize:" + optimizedKeySize + " keySizePos:" + keySizePos + " keySize:" + keySize + " valueSizePos:" + valueSizePos + " valueSize:" + valueSize);
+
+ done = true;
+ }
+ // trap for buffer overflow specifically
+ catch (IllegalArgumentException e) {
+ //LOG.info("IllegalArgumentException - Buffer Overflow detected while writing to Spill Data Buffer. Flushing");
+ overflow = true;
+ }
+ catch (BufferOverflowException e) {
+ //LOG.info("BufferOverflowException - Buffer Overflow detected while writing to Spill Data Buffer. Flushing");
+ overflow = true;
+ }
+ if (overflow) {
+ // if overflow happened with start position at zero... this is bad news.
+ // it means that key + value + optimized bits > spill data buffer size!
+ if (startPositon == 0) {
+ throw new IOException("FATAL: KeySize:" + totalKeyBytesWritten + " + ValueSize:" + totalValueBytesWritten + " GT SpillBufferSize:" + _spillDataBuffer.capacity());
+ }
+ // otherwise ..
+ // overflow in the stream case is a little tricky ...
+ // first rewind buffer position ...
+ _spillDataBuffer.position(startPositon);
+ // sort and spill whatever we did fit into the buffer ...
+ sortAndSpill(null);
+ // now, we need to reconstitute the buffer back to previous state
+ if (totalKeyBytesWritten != 0) {
+ // calculate offset ...
+ int newKeyBytesOffset = 4;
+ // if optimized ..
+ if (_optimizedKeyGenerator != null) {
+ newKeyBytesOffset += OptimizedKey.FIXED_HEADER_SIZE;
+ }
+ // copy bits
+ System.arraycopy(_spillDataBufferBytes, keySizePos + 4,_spillDataBufferBytes, newKeyBytesOffset, totalKeyBytesWritten);
+
+ // and copy value bits potentially
+ if (totalValueBytesWritten != 0) {
+ int newValueBytesOffset = newKeyBytesOffset + totalKeyBytesWritten + 4;
+ // copy bits
+ System.arraycopy(_spillDataBufferBytes, valueSizePos + 4,_spillDataBufferBytes, newValueBytesOffset, totalValueBytesWritten);
+ }
+ }
+
+ }
+ }
+ }
+ */
+
+ /**
+ * Spill a Typed Record
+ */
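+ // Record layout written below (length fields are backpatched once the actual
+ // serialized sizes are known):
+ //   [optimized key header (optional)][keySize:int][key bytes]
+ //   [valueSize:int][value bytes][optimized key data buffer (optional)]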
+ @Override
+ public void spillRecord(KeyType key, ValueType value) throws IOException {
+
+ // if the index buffer is full, trigger a sort and spill before accepting more records
+ if (_spillItemCount == _spillIndexBufferSize) {
+ //LOG.info("Spill Item Count == " + SPILL_INDEX_BUFFER_SIZE + ". Flushing");
+ sortAndSpill(null);
+ }
+
+
+ boolean done = false;
+
+ while (!done) {
+
+ // mark buffer position ...
+ int startPosition = _spillDataBuffer.position();
+ //LOG.info("Buffer start position:" + startPosition);
+
+ boolean overflow = false;
+ try {
+ // if optimized comparator is available ...
+ if (_optimizedKeyGenerator != null) {
+ // gen optimized key
+ _optimizedKeyGenerator.generateOptimizedKeyForPair(key, value,_optimizedKey);
+ // ok skip header size for now ...
+ _spillDataBuffer.position(_spillDataBuffer.position() + _optimizedKey.getHeaderSize());
+ }
+ // save key size position
+ int keySizePos = _spillDataBuffer.position();
+ // LOG.info("keySizePos:" + keySizePos);
+ // skip past key length
+ _spillDataBuffer.position(keySizePos + 4);
+ // next write key and value
+ key.write(_outputStream);
+ // now save value size position
+ int valueSizePos = _spillDataBuffer.position();
+ // and calculate key size
+ int keySize = valueSizePos - keySizePos - 4;
+ // reseek back
+ _spillDataBuffer.position(keySizePos);
+ // write out real key size
+ _spillDataBuffer.putInt(keySize);
+ // skip to value size position + 4
+ _spillDataBuffer.position(valueSizePos + 4);
+ // write out actual value
+ value.write(_outputStream);
+ // save end position
+ int endPosition = _spillDataBuffer.position();
+ // calculate value size
+ int valueSize = endPosition - valueSizePos - 4;
+ // reseek back to value size pos
+ _spillDataBuffer.position(valueSizePos);
+ // write value size
+ _spillDataBuffer.putInt(valueSize);
+ // seek forward to end position
+ _spillDataBuffer.position(endPosition);
+ // and now if there is optional optimized key buffer data ...
+ // append optimized key data buffer at end
+
+ if (_optimizedKeyGenerator != null) {
+ if (_optimizedKey.getDataBufferSize() != 0) {
+ // update relative position of data buffer in key
+ _optimizedKey.setDataBufferOffset(_spillDataBuffer.position() - startPosition);
+ // write buffer to spill buffer
+ _optimizedKey.writeBufferToStream(_outputStream);
+ }
+ // ok now record final data position
+ int nextItemPosition = _spillDataBuffer.position();
+ // seek back to beginning ...
+ _spillDataBuffer.position(startPosition);
+ // rewrite header
+ _optimizedKey.writeHeaderToStream(_outputStream);
+ // now back to next item position
+ _spillDataBuffer.position(nextItemPosition);
+ }
+ // store start position in index buffer
+ _spillIndexBuffer[_spillItemCount] = startPosition;
+ // increment ...
+ _spillItemCount++;
+
+ //LOG.info("startPos:" + startPositon + " optKeyBufferSize:" + optimizedKeyBufferSize + " keySizePos:" + keySizePos + " keySize:" + keySize + " valueSizePos:" + valueSizePos + " valueSize:" + valueSize);
+
+ done = true;
+ }
+ // trap for buffer overflow specifically
+ catch (IllegalArgumentException e) {
+ //LOG.info("IllegalArgumentException - Buffer Overflow detected while writing to Spill Data Buffer. Flushing");
+ overflow = true;
+ }
+ catch (BufferOverflowException e) {
+ //LOG.info("BufferOverflowException - Buffer Overflow detected while writing to Spill Data Buffer. Flushing");
+ overflow = true;
+ }
+ if (overflow) {
+ // if overflow happened with the buffer empty, this single record can never
+ // fit, so fail fast instead of retrying forever (mirrors the raw-path check)
+ if (startPosition == 0) {
+ throw new IOException("FATAL: Record size GT SpillBufferSize:" + _spillDataBuffer.capacity());
+ }
+ // reset to start position
+ _spillDataBuffer.position(startPosition);
+ // sort and spill
+ sortAndSpill(null);
+ }
+ }
+ }
+
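+ // Adapters exposing a ByteBuffer through stream interfaces. The OutputStream
+ // variant lets ByteBuffer.put() throw BufferOverflowException when the buffer
+ // fills, which spillRecord() traps above to trigger a sort and spill.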
+ private static OutputStream newOutputStream(final ByteBuffer buf) {
+ return new OutputStream() {
+
+ @Override
+ public void write(int b) throws IOException {
+ buf.put((byte) (b & 0xff));
+ }
+
+ @Override
+ public void write(byte src[], int off, int len) throws IOException {
+ buf.put(src, off, len);
+ }
+ };
+ }
+
+ public static InputStream newInputStream(final ByteBuffer buf) {
+ return new InputStream() {
+ @Override
+ public synchronized int read() throws IOException {
+ if (!buf.hasRemaining()) {
+ return -1;
+ }
+ return buf.get() & 0xff;
+ }
+
+ @Override
+ public synchronized int read(byte[] bytes, int off, int len) throws IOException {
+ // Read only what's left
+ len = Math.min(len, buf.remaining());
+ buf.get(bytes, off, len);
+ return len;
+ }
+ };
+ }
+
+}
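The typed spillRecord() above reduces to a write-or-spill retry loop: serialize into the buffer, trap the overflow, spill what already fit, and retry. A minimal standalone sketch of that pattern (the class and method names here are illustrative, not part of this commit):

    import java.io.IOException;
    import java.nio.BufferOverflowException;
    import java.nio.ByteBuffer;

    class WriteOrSpillLoop {
      private final ByteBuffer buffer = ByteBuffer.allocate(64 * 1024);

      void writeRecord(byte[] record) throws IOException {
        boolean done = false;
        while (!done) {
          int start = buffer.position();
          try {
            buffer.putInt(record.length); // length prefix
            buffer.put(record);           // record payload
            done = true;
          } catch (BufferOverflowException e) {
            buffer.position(start);       // roll back the partial write
            if (start == 0) {             // record can never fit
              throw new IOException("record larger than spill buffer");
            }
            flush();                      // stand-in for sortAndSpill(null)
          }
        }
      }

      private void flush() {
        // drain the buffered records to downstream storage here, then reset
        buffer.clear();
      }
    }

As in the library's raw spill path, overflowing an empty buffer is treated as a hard failure, since no amount of spilling will make a single oversized record fit.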
396 src/org/commoncrawl/hadoop/mergeutils/MergeSortSpillWriterUnitTest.java
@@ -0,0 +1,396 @@
+package org.commoncrawl.hadoop.mergeutils;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Random;
+import java.util.TreeMap;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.commoncrawl.util.shared.CCStringUtils;
+
+/**
+ * A bunch of unit tests covering possible combinations of comparators.
+ *
+ * @author rana
+ *
+ */
+public class MergeSortSpillWriterUnitTest {
+
+ public static final Log LOG = LogFactory.getLog(MergeSortSpillWriterUnitTest.class);
+
+
+ static abstract class BaseTest {
+
+ TreeMap<Integer,Text> originalKeyValueMap = new TreeMap<Integer,Text>();
+ int index[];
+ String _testName = null;
+ int _keySetSize = -1;
+ int _indexBufferSize = -1;
+ int _dataBufferSize = -1;
+ int _spillBufferSize = -1;
+
+ public BaseTest(String testName,int keySetSize,int indexBufferSize,int dataBufferSize,int spillBufferSize) {
+ _testName = testName;
+ _keySetSize = keySetSize;
+ _indexBufferSize = indexBufferSize;
+ _dataBufferSize = dataBufferSize;
+ _spillBufferSize = spillBufferSize;
+ }
+
+ public void runTest() throws IOException {
+ LOG.info("*************** STARTING TEST:" + _testName);
+ LOG.info("Set Size:" + _keySetSize);
+ LOG.info("Index Buffer Size:" + _indexBufferSize);
+ LOG.info("Data Buffer Size:" + _dataBufferSize);
+ LOG.info("Spill Buffer Size:" + _spillBufferSize);
+ LOG.info("");
+
+ long testStartTime = System.currentTimeMillis();
+
+ // initialization here
+ // create an array of keys and an index into them ...
+ index= new int[_keySetSize];
+ for (int i=0;i<_keySetSize;++i) {
+ index[i] =i;
+ originalKeyValueMap.put(i, new Text(keyForNumber(i)));
+ }
+ // randomly shuffle the index
+ Random r = new Random();
+ // Shuffle array
+ for (int i=index.length; i>1; i--)
+ swap(index, i-1, r.nextInt(i));
+
+ // ok create a spill writer that validates position and value
+ RawDataSpillWriter<IntWritable,Text> validatingSpillWriter = new RawDataSpillWriter<IntWritable,Text> () {
+
+ int closeCount = 0;
+
+ // initial spill count to zero
+ int spilledKeyCount = 0;
+
+ @Override
+ public void close() throws IOException {
+ if (++closeCount > 1) {
+ throw new IOException("Close Called One Too Many Times!");
+ }
+ if (spilledKeyCount != index.length) {
+ throw new IOException("Spilled Key Count:" + spilledKeyCount + " Excpected Key Count:" + index.length);
+ }
+ }
+
+ @Override
+ public void spillRecord(IntWritable key, Text value) throws IOException {
+ //LOG.info("Got Key:" + key.get() + " Value:"+ value);
+ // if keys don't match ...
+ if (key.get() != spilledKeyCount) {
+ throw new IOException("Got Key:" + key.get() + " Expected Key:" + spilledKeyCount);
+ }
+ // ok keys match... check that values match ...
+ Text expectedValue = originalKeyValueMap.get(spilledKeyCount);
+ // ok validate expected value ..
+ if (expectedValue == null || value == null) {
+ throw new IOException("Null Expected or Incoming Value");
+ }
+ else {
+ if (expectedValue.compareTo(value) != 0) {
+ throw new IOException("Expected Value:" + expectedValue + " @index:" + spilledKeyCount + " differs from resulting value:" + value);
+ }
+ }
+ spilledKeyCount++;
+ }
+
+ DataInputBuffer keyReader = new DataInputBuffer();
+ DataInputBuffer valueReader = new DataInputBuffer();
+ IntWritable keyObject = new IntWritable();
+ Text valueObject = new Text();
+
+ @Override
+ public void spillRawRecord(byte[] keyData, int keyOffset,int keyLength, byte[] valueData, int valueOffset, int valueLength) throws IOException {
+ //LOG.info("Got Raw Record");
+ // initialize key / value readers .
+ keyReader.reset(keyData, keyOffset,keyLength);
+ valueReader.reset(valueData,valueOffset,valueLength);
+ keyObject.readFields(keyReader);
+ valueObject.readFields(valueReader);
+ this.spillRecord(keyObject, valueObject);
+ }
+ };
+
+ // create a configuration; the local file system is obtained from it below
+ Configuration conf = new Configuration();
+
+ // create a raw comparator
+ RawKeyValueComparator<IntWritable,Text> comparator = new RawKeyValueComparator<IntWritable,Text>() {
+
+ DataInputBuffer keyReader1 = new DataInputBuffer();
+ DataInputBuffer keyReader2 = new DataInputBuffer();
+
+ @Override
+ public int compareRaw(byte[] key1Data, int key1Offset, int key1Length,
+ byte[] key2Data, int key2Offset, int key2Length, byte[] value1Data,
+ int value1Offset, int value1Length, byte[] value2Data,
+ int value2Offset, int value2Length) throws IOException {
+
+ keyReader1.reset(key1Data,key1Offset,key1Length);
+ keyReader2.reset(key2Data,key2Offset,key2Length);
+
+ return ((Integer)keyReader1.readInt()).compareTo(keyReader2.readInt());
+ }
+
+ @Override
+ public int compare(IntWritable key1, Text value1, IntWritable key2,Text value2) {
+ return key1.compareTo(key2);
+ }
+ };
+
+ // setup conf
+
+ // number of records to store in RAM before doing an intermediate sort
+ conf.setInt(MergeSortSpillWriter.SPILL_INDEX_BUFFER_SIZE_PARAM,_indexBufferSize);
+ // size of intermediate buffer key value buffer ...
+ conf.setInt(MergeSortSpillWriter.SPILL_DATA_BUFFER_SIZE_PARAM,_dataBufferSize);
+ // set spill write buffer size ...
+ conf.setInt(SequenceFileSpillWriter.SPILL_WRITER_BUFFER_SIZE_PARAM,_spillBufferSize);
+
+ // ok create the spill writer
+ MergeSortSpillWriter<IntWritable, Text> merger =
+ constructMerger(conf,validatingSpillWriter,FileSystem.getLocal(conf),new Path("/tmp"),comparator,IntWritable.class,Text.class);
+
+ // and finally ... spill the records in random order
+ for (int i=0;i<index.length;++i) {
+ merger.spillRecord(new IntWritable(index[i]), originalKeyValueMap.get(index[i]));
+ }
+ // ok close merger ...
+ merger.close();
+ // now close the external spill writer ...
+ validatingSpillWriter.close();
+
+ LOG.info("*************** ENDING TEST:" + _testName + " TOOK:" + (System.currentTimeMillis() - testStartTime));
+ }
+
+ protected abstract MergeSortSpillWriter<IntWritable, Text> constructMerger(
+ Configuration conf,
+ RawDataSpillWriter<IntWritable, Text> writer,
+ FileSystem tempFileSystem,
+ Path tempFilePath,
+ RawKeyValueComparator<IntWritable, Text> comparator,Class keyClass,Class valueClass) throws IOException;
+
+ private static final void swap(int[] arr, int i, int j) {
+ int tmp = arr[i];
+ arr[i] = arr[j];
+ arr[j] = tmp;
+ }
+
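+ // generate a deterministic, variable-length value string for a key: a
+ // rotating alphabet pattern, e.g. keyForNumber(3) yields "DEFG"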
+ private static final String keyForNumber(int number) {
+ // establish pattern start location
+ int patternStartIdx = number % 26;
+ // establish pattern size ...
+ int patternSize = (number % 100) + 1;
+ // preallocate buffer
+ StringBuffer buffer = new StringBuffer(patternSize);
+ // build pattern
+ int currPatternIdx = patternStartIdx;
+ for (int i=0;i<patternSize;++i) {
+ buffer.append((char)('A' + currPatternIdx));
+ currPatternIdx = (currPatternIdx+1) % 26;
+ }
+ return buffer.toString();
+ }
+ }
+
+
+ public static class BasicTest extends BaseTest {
+
+ public BasicTest() {
+ super("Basic RawKeyValueComparator Test",1000000,10000,10000 * 200,1000000);
+ }
+
+ @Override
+ protected MergeSortSpillWriter<IntWritable, Text> constructMerger(
+ Configuration conf,
+ RawDataSpillWriter<IntWritable, Text> writer,
+ FileSystem tempFileSystem,
+ Path tempFilePath,
+ RawKeyValueComparator<IntWritable, Text> comparator,Class keyClass,Class valueClass) throws IOException {
+
+ return new MergeSortSpillWriter<IntWritable, Text>(
+ conf,
+ writer,
+ tempFileSystem,tempFilePath, null,comparator, keyClass, valueClass,false,null);
+ }
+ }
+
+ public static class BasicOptimizedTest extends BaseTest {
+
+ public BasicOptimizedTest() {
+ super("OptimizedKeyGenerator - using Long ONLY Keys Test",1000000,10000,10000 * 200,1000000);
+ }
+
+
+ @Override
+ protected MergeSortSpillWriter<IntWritable, Text> constructMerger(
+ Configuration conf,
+ RawDataSpillWriter<IntWritable, Text> writer,
+ FileSystem tempFileSystem,
+ Path tempFilePath,
+ RawKeyValueComparator<IntWritable, Text> comparator,Class keyClass,Class valueClass) throws IOException {
+
+ return new MergeSortSpillWriter<IntWritable, Text>(
+ conf,
+ writer,
+ tempFileSystem,tempFilePath,
+ new OptimizedKeyGeneratorAndComparator<IntWritable, Text>() {
+
+ @Override
+ public void generateOptimizedKeyForPair(IntWritable key, Text value,org.commoncrawl.hadoop.mergeutils.OptimizedKeyGeneratorAndComparator.OptimizedKey optimizedKeyOut)throws IOException {
+ optimizedKeyOut.setLongKeyValue(key.get());
+ }
+
+ @Override
+ public int getGeneratedKeyType() {
+ return OptimizedKey.KEY_TYPE_LONG;
+ }
+
+ }
+ , keyClass, valueClass,false,null);
+ }
+ }
+
+ public static class BasicOptimizedWithLongAndBufferTest extends BaseTest {
+
+ public BasicOptimizedWithLongAndBufferTest() {
+ super("OptimizedKeyGenerator - using Long AND Buffer Keys Test",1000000,10000,10000 * 200,1000000);
+ }
+
+
+ @Override
+ protected MergeSortSpillWriter<IntWritable, Text> constructMerger(
+ Configuration conf,
+ RawDataSpillWriter<IntWritable, Text> writer,
+ FileSystem tempFileSystem,
+ Path tempFilePath,
+ RawKeyValueComparator<IntWritable, Text> comparator,Class keyClass,Class valueClass) throws IOException {
+
+ return new MergeSortSpillWriter<IntWritable, Text>(
+ conf,
+ writer,
+ tempFileSystem,tempFilePath,
+ new OptimizedKeyGeneratorAndComparator<IntWritable, Text>() {
+
+ @Override
+ public void generateOptimizedKeyForPair(IntWritable key, Text value,org.commoncrawl.hadoop.mergeutils.OptimizedKeyGeneratorAndComparator.OptimizedKey optimizedKeyOut)throws IOException {
+ // set the long key to a dummy value to force the secondary (buffer) comparator to trigger
+ optimizedKeyOut.setLongKeyValue(0);
+ // and set the buffer value by first obtaining an output stream from the key object
+ DataOutputStream bufferOutput = optimizedKeyOut.getBufferKeyValueStream();
+ // and then writing into it
+ bufferOutput.writeLong(key.get());
+ // and finally committing it by calling close
+ bufferOutput.close();
+ }
+
+ @Override
+ public int getGeneratedKeyType() {
+ return OptimizedKey.KEY_TYPE_LONG_AND_BUFFER;
+ }
+
+ DataInputBuffer key1ReaderStream = new DataInputBuffer();
+ DataInputBuffer key2ReaderStream = new DataInputBuffer();
+
+ @Override
+ public int compareOptimizedBufferKeys(byte[] key1Data,
+ int key1Offset, int key1Length, byte[] key2Data,
+ int key2Offset, int key2Length) throws IOException {
+
+ key1ReaderStream.reset(key1Data, key1Offset,key1Length);
+ key2ReaderStream.reset(key2Data, key2Offset,key2Length);
+ // compare explicitly - subtracting longs and casting to int can overflow
+ long key1 = key1ReaderStream.readLong();
+ long key2 = key2ReaderStream.readLong();
+ return (key1 < key2) ? -1 : ((key1 == key2) ? 0 : 1);
+
+ }
+
+ }
+ , keyClass, valueClass,false,null);
+ }
+ }
+
+ public static class BasicOptimizedWithBufferOnlyTest extends BaseTest {
+
+
+ public BasicOptimizedWithBufferOnlyTest(int keySetSize,int indexBufferSize,int dataBufferSize,int spillBufferSize) {
+ super("OptimizedKeyGenerator - using Buffer ONLY Keys Test",keySetSize,indexBufferSize,dataBufferSize,spillBufferSize);
+ }
+
+
+ @Override
+ protected MergeSortSpillWriter<IntWritable, Text> constructMerger(
+ Configuration conf,
+ RawDataSpillWriter<IntWritable, Text> writer,
+ FileSystem tempFileSystem,
+ Path tempFilePath,
+ RawKeyValueComparator<IntWritable, Text> comparator,Class keyClass,Class valueClass) throws IOException {
+
+ return new MergeSortSpillWriter<IntWritable, Text>(
+ conf,
+ writer,
+ tempFileSystem,tempFilePath,
+ new OptimizedKeyGeneratorAndComparator<IntWritable, Text>() {
+
+ @Override
+ public void generateOptimizedKeyForPair(IntWritable key, Text value,org.commoncrawl.hadoop.mergeutils.OptimizedKeyGeneratorAndComparator.OptimizedKey optimizedKeyOut)throws IOException {
+ // set the buffer value by first obtaining an output stream from the key object
+ DataOutputStream bufferOutput = optimizedKeyOut.getBufferKeyValueStream();
+ // and then writing into it
+ bufferOutput.writeLong(key.get());
+ // and finally committing it by calling close
+ bufferOutput.close();
+ }
+
+ @Override
+ public int getGeneratedKeyType() {
+ return OptimizedKey.KEY_TYPE_BUFFER;
+ }
+
+ DataInputBuffer key1ReaderStream = new DataInputBuffer();
+ DataInputBuffer key2ReaderStream = new DataInputBuffer();
+
+ @Override
+ public int compareOptimizedBufferKeys(byte[] key1Data,
+ int key1Offset, int key1Length, byte[] key2Data,
+ int key2Offset, int key2Length) throws IOException {
+
+ key1ReaderStream.reset(key1Data, key1Offset,key1Length);
+ key2ReaderStream.reset(key2Data, key2Offset,key2Length);
+ // compare explicitly - subtracting longs and casting to int can overflow
+ long key1 = key1ReaderStream.readLong();
+ long key2 = key2ReaderStream.readLong();
+ return (key1 < key2) ? -1 : ((key1 == key2) ? 0 : 1);
+
+ }
+
+ }
+ , keyClass, valueClass,false,null);
+ }
+ }
+
+
+ public static void main(String[] args) {
+ try {
+ new BasicTest().runTest();
+ new BasicOptimizedTest().runTest();
+ new BasicOptimizedWithLongAndBufferTest().runTest();
+ new BasicOptimizedWithBufferOnlyTest(1000000,10000,10000*200,1000000).runTest();
+ new BasicOptimizedWithBufferOnlyTest(1000000,1000000,1000000*200,1000000).runTest();
+ //new BasicOptimizedWithBufferOnlyTest(10000000,1000000,1000000*200,1000000).runTest();
+ } catch (IOException e) {
+ LOG.error(CCStringUtils.stringifyException(e));
+ }
+ }
+}
299 src/org/commoncrawl/hadoop/mergeutils/OptimizedKeyGeneratorAndComparator.java
@@ -0,0 +1,299 @@
+package org.commoncrawl.hadoop.mergeutils;
+
+import java.io.DataInputStream;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.WritableUtils;
+import org.commoncrawl.util.shared.FlexBuffer;
+
+/**
+ * Used to generate optimized representations of complex key/value pairs. Trades a small
+ * increase in per-record memory footprint for better sort performance. See
+ * MergeSortSpillWriterUnitTest for example generator implementations.
+ *
+ * @author rana
+ *
+ * @param <KeyType>
+ * @param <ValueType>
+ */
+public abstract class OptimizedKeyGeneratorAndComparator<KeyType extends Writable,ValueType extends Writable> {
+
+
+ /**
+ * OptimizedKey - used to encapsulate optimized key data generated via the generator
+ *
+ * @author rana
+ *
+ */
+ public static final class OptimizedKey {
+
+ // key types ...
+
+ // a key that has a long component
+ public static final int KEY_TYPE_LONG = 1 << 0;
+ // a key that has a buffer component
+ public static final int KEY_TYPE_BUFFER = 1 << 1;
+ // a key that has a long and buffer component
+ public static final int KEY_TYPE_LONG_AND_BUFFER = KEY_TYPE_LONG | KEY_TYPE_BUFFER;
+
+
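+ // e.g. a KEY_TYPE_LONG_AND_BUFFER key reserves 8 + 8 = 16 header bytes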
+ public OptimizedKey(int keyType) {
+ _keyType = keyType;
+ _headerSize = 0;
+ if ((_keyType & KEY_TYPE_LONG) != 0)
+ _headerSize += 8;
+ if ((_keyType & KEY_TYPE_BUFFER) != 0)
+ _headerSize += 8;
+ }
+
+ // key type
+ private int _keyType;
+ // header size (based on key type)
+ private int _headerSize = 0;
+
+ // long value if optimized key is a long
+ private long _longKeyValue = 0;
+ // serialized buffer size
+ private int _dataBufferSize = 0;
+ // serialized buffer offset
+ private int _dataBufferOffset = 0;
+ // data buffer
+ private FlexBuffer _dataBuffer = new FlexBuffer();
+
+
+ /**
+ *
+ * @return key type
+ */
+ public int getKeyType() {
+ return _keyType;
+ }
+
+ /** get the header size in bytes
+ *
+ */
+ public int getHeaderSize() {
+ return _headerSize;
+ }
+
+ /** get the serialized optimized-key data buffer size in bytes
+ *
+ */
+ public int getDataBufferSize() {
+ return _dataBufferSize;
+ }
+
+ public int getDataBufferOffset() {
+ return _dataBufferOffset;
+ }
+
+ public void setDataBufferOffset(int dataBufferOffset) {
+ _dataBufferOffset= dataBufferOffset;
+ }
+
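+ // Serialized header layout, matching _headerSize computed in the constructor:
+ //   [longKeyValue: 8 bytes, if KEY_TYPE_LONG]
+ //   [dataBufferLength: int][dataBufferOffset: int, if KEY_TYPE_BUFFER]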
+ public int writeHeaderToStream(DataOutputStream outputStream) throws IOException {
+ if ((_keyType & KEY_TYPE_LONG) != 0) {
+ outputStream.writeLong(_longKeyValue);
+ }
+ if ((_keyType & KEY_TYPE_BUFFER) != 0) {
+ outputStream.writeInt(_dataBuffer.getCount());
+ outputStream.writeInt(_dataBufferOffset);
+ }
+ return _headerSize;
+ }
+
+ /**
+ * write the optional buffer component of the key to the stream
+ *
+ * @param outputStream stream to append the buffer data to
+ * @return the number of buffer bytes written
+ * @throws IOException
+ */
+ public int writeBufferToStream(DataOutputStream outputStream)throws IOException {
+ _dataBufferSize = 0;
+ if ((_keyType & KEY_TYPE_BUFFER) != 0) {
+ outputStream.write(_dataBuffer.get(),_dataBuffer.getOffset(),_dataBuffer.getCount());
+ _dataBufferSize += _dataBuffer.getCount();
+ }
+ return _dataBufferSize;
+ }
+
+ /**
+ *
+ * @return a DataOutputStream - write into this stream and then close it to commit data
+ * @throws IOException
+ */
+ public DataOutputStream getBufferKeyValueStream()throws IOException {
+ _outputStream.reset();
+ return _outputStream;
+ }
+
+ /**
+ *
+ * @return the long key value
+ */
+ public long getLongKeyValue() {
+ return _longKeyValue;
+ }
+
+ /**
+ * set the long key value
+ */
+ public void setLongKeyValue(long value) {
+ _longKeyValue = value;
+ }
+
+ /**
+ *
+ * @return the buffer key value
+ */
+ public FlexBuffer getBufferKeyValue() {
+ return _dataBuffer;
+ }
+
+ /**
+ * read the header fields back from the given stream
+ *
+ * @return the header size in bytes
+ */
+ public int readHeader(DataInputStream stream) throws IOException {
+ // read header ...
+ if ((_keyType & KEY_TYPE_LONG) != 0) {
+ _longKeyValue = stream.readLong();
+ }
+ if ((_keyType & KEY_TYPE_BUFFER) != 0) {
+ _dataBufferSize = stream.readInt();
+ _dataBufferOffset = stream.readInt();
+ }
+ return _headerSize;
+ }
+
+ /**
+ * initialize the optimized key object from the passed in key/value data
+ *
+ * @param data
+ * @param offset
+ * @param length