Support nosort dataflow #5

Open
Binglin Chang wants to merge 2 commits

1 participant

Binglin Chang

Hi, Yongqiang
I have ported the nosort patch for hadoop-0.20 to Facebook's Hadoop version.
You can find the original JIRA issue here:
https://issues.apache.org/jira/browse/MAPREDUCE-3397
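For context, the patch gates the whole dataflow behind one job setting, mapred.map.output.sort (default true), which is read by MapTask, BlockMapOutputBuffer, and ReduceTask in the diff below. Here is a minimal sketch of a job that opts out of map-output sorting, modeled on the TestNoSortLocal case added in this PR; the input/output paths are placeholders:

    JobConf conf = new JobConf();
    // Disable map-side sorting: map outputs stay grouped by partition only,
    // and the reduce side concatenates segments instead of merge-sorting them.
    conf.setBoolean("mapred.map.output.sort", false);
    conf.setInputFormat(KeyValueTextInputFormat.class);
    conf.setOutputFormat(TextOutputFormat.class);
    conf.setOutputKeyClass(Text.class);
    conf.setOutputValueClass(Text.class);
    KeyValueTextInputFormat.setInputPaths(conf, new Path("/tmp/nosort/input"));   // placeholder
    TextOutputFormat.setOutputPath(conf, new Path("/tmp/nosort/output"));         // placeholder
    conf.setMapperClass(IdentityMapper.class);
    conf.setReducerClass(IdentityReducer.class);
    JobClient.runJob(conf);

Note that combiners are rejected in this mode (MapOutputBuffer throws an IOException), since combining relies on sorted runs, and the grouping comparator is bypassed, so each record reaches the reducer as its own key group.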

Commits on Dec 15, 2011
  Binglin Chang (decster authored): Support nosort dataflow
Commits on Dec 19, 2011
  Binglin Chang
92 src/core/org/apache/hadoop/util/IndexedCountingSortable.java
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.util;
+
+/**
+ * Linear-time in-place counting sort
+ */
+public abstract class IndexedCountingSortable {
+ abstract public int getKey(int i);
+
+ abstract public int get(int i);
+
+ abstract public void put(int i, int v);
+
+ final int[] counts;
+ final int[] starts;
+ final int total;
+
+ public IndexedCountingSortable(int[] counts, int total) {
+ this.total = total;
+ this.counts = counts;
+ this.starts = new int[counts.length];
+ for (int i = 1; i < counts.length; i++) {
+ starts[i] = starts[i - 1] + counts[i - 1];
+ }
+ assert (starts[counts.length - 1] + counts[counts.length - 1] == total);
+ }
+
+ public void sort() {
+ int[] dest = new int[total];
+ for (int i = 0; i < total; i++) {
+ int p = getKey(i);
+ dest[starts[p]++] = get(i);
+ }
+ for (int i = 0; i < total; i++) {
+ put(i, dest[i]);
+ }
+ }
+
+ private int findSwapPosition(int partition) {
+ while (counts[partition] > 0) {
+ counts[partition]--;
+ int pos = starts[partition] + counts[partition];
+ int part = getKey(pos);
+ if (part != partition) {
+ return part;
+ }
+ }
+ return -1;
+ }
+
+ public void sortInplace() {
+ for (int i = 0; i < counts.length; i++) {
+ while (true) {
+ int part = findSwapPosition(i);
+ if (part < 0) {
+ break;
+ }
+ int hole = starts[i] + counts[i];
+ int tempOffset = get(hole);
+ while (true) {
+ int next = findSwapPosition(part);
+ int pos = starts[part] + counts[part];
+ int temp = get(pos);
+ put(pos, tempOffset);
+ tempOffset = temp;
+ if (i == next) {
+ put(hole, tempOffset);
+ break;
+ }
+ part = next;
+ }
+ }
+ }
+ }
+}
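The class above implements a linear-time counting sort over an index array: counts[] holds the number of records per key (here, per reduce partition), and sort()/sortInplace() reorder the indices so that records with equal keys become contiguous. A minimal usage sketch with hypothetical parallel keys/indexes arrays (the real subclasses in MapTask.sortPartitionsOnly and the unit test at the end of this PR follow the same pattern):

    final int[] keys = {2, 0, 1, 0, 2, 1};    // small integer keys, e.g. partition numbers
    final int[] indexes = {0, 1, 2, 3, 4, 5}; // record indices to reorder
    final int[] counts = new int[3];          // occurrences of each key
    for (int k : keys) {
      counts[k]++;
    }
    new IndexedCountingSortable(counts, keys.length) {
      @Override public int get(int i) { return indexes[i]; }
      @Override public void put(int i, int v) { indexes[i] = v; }
      @Override public int getKey(int i) { return keys[indexes[i]]; }
    }.sort();
    // indexes is now {1, 3, 2, 5, 0, 4}: all key-0 records, then key-1, then key-2

sort() allocates a scratch array of the same size, while sortInplace() permutes the indices in place by chasing swap cycles within each key's bucket, consuming counts[] as it goes.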
2  src/mapred/org/apache/hadoop/mapred/BasicReducePartition.java
@@ -135,7 +135,7 @@ public int getCollectedBytesSize() {
return collectedBytesSize;
}
- abstract void groupOrSort();
+ abstract void groupOrSort(boolean sort);
public abstract KeyValueSpillIterator getKeyValueSpillIterator();
110 src/mapred/org/apache/hadoop/mapred/BlockMapOutputBuffer.java
@@ -31,12 +31,14 @@
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.DefaultCodec;
import org.apache.hadoop.mapred.IFile.Writer;
import org.apache.hadoop.mapred.Merger.Segment;
import org.apache.hadoop.mapred.Task.TaskReporter;
+import org.apache.hadoop.util.Progress;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.ResourceCalculatorPlugin.ProcResourceValues;
@@ -57,6 +59,7 @@
// main output buffer
private byte[] kvbuffer;
private int kvBufferSize;
+ private boolean sort = true;
// spill accounting
private volatile int numSpills = 0;
@@ -103,6 +106,7 @@ public void configure(JobConf job) {
}
};
}
+ this.sort = job.getBoolean("mapred.map.output.sort", true);
rfs = ((LocalFileSystem) localFs).getRaw();
float spillper = job.getFloat("io.sort.spill.percent", (float) 0.9);
@@ -187,13 +191,12 @@ public void collect(K key, V value) throws IOException {
/*
* return the value of ProcResourceValues for later use
*/
- protected ProcResourceValues sortReduceParts() {
+ protected ProcResourceValues sortReduceParts(boolean sort) {
long sortStartMilli = System.currentTimeMillis();
ProcResourceValues sortStartProcVals =
task.getCurrentProcResourceValues();
- // sort
for (int i = 0; i < reducePartitions.length; i++) {
- reducePartitions[i].groupOrSort();
+ reducePartitions[i].groupOrSort(sort);
}
long sortEndMilli = System.currentTimeMillis();
ProcResourceValues sortEndProcVals =
@@ -205,7 +208,7 @@ protected ProcResourceValues sortReduceParts() {
@Override
public void sortAndSpill() throws IOException {
- ProcResourceValues sortEndProcVals = sortReduceParts();
+ ProcResourceValues sortEndProcVals = sortReduceParts(sort);
long sortEndMilli = System.currentTimeMillis();
// spill
FSDataOutputStream out = null;
@@ -262,7 +265,7 @@ public synchronized void flush() throws IOException, ClassNotFoundException,
if (numSpills > 0 && lastSpillInMem) {
// if there is already one spills, we can try to hold this last spill in
// memory.
- sortReduceParts();
+ sortReduceParts(sort);
for (int i = 0; i < partitions; i++) {
this.inMemorySegments[i] =
new Segment<K, V>(this.reducePartitions[i].getIReader(),
@@ -281,6 +284,61 @@ public synchronized void flush() throws IOException, ClassNotFoundException,
mergeEndMilli - mergeStartMilli);
}
+ private RawKeyValueIterator getNoSortRawKeyValueIterator(
+ final List<Segment<K, V>> segmentList)
+ throws IOException {
+ return new RawKeyValueIterator() {
+ private Progress progress = new Progress();
+ Segment<K,V> currentSegment = null;
+ int currentIndex = -1;
+ @Override
+ public boolean next() throws IOException {
+ while (true) {
+ if (currentSegment==null) {
+ currentIndex++;
+ if (currentIndex<segmentList.size()) {
+ currentSegment = segmentList.get(currentIndex);
+ currentSegment.init(null);
+ progress.set(currentIndex / (float) segmentList.size());
+ } else {
+ progress.set(1.0f);
+ return false;
+ }
+ } else {
+ if (currentSegment.next()) {
+ return true;
+ }
+ currentSegment = null;
+ }
+ }
+ }
+
+ @Override
+ public DataInputBuffer getKey() throws IOException {
+ return currentSegment.getKey();
+ }
+
+ @Override
+ public DataInputBuffer getValue() throws IOException {
+ return currentSegment.getValue();
+ }
+
+ @Override
+ public Progress getProgress() {
+ return progress;
+ }
+
+ @Override
+ public void close() throws IOException {
+ }
+
+ @Override
+ public long getTotalBytesProcessed() {
+ return 0;
+ }
+ };
+ }
+
private void mergeParts() throws IOException, InterruptedException,
ClassNotFoundException {
// get the approximate size of the final output/index files
@@ -377,25 +435,29 @@ private void mergeParts() throws IOException, InterruptedException,
segmentList.add(numSpills, this.inMemorySegments[parts]);
}
- // merge
- RawKeyValueIterator kvIter =
- Merger.merge(job, rfs, keyClass, valClass, codec,
- segmentList, job.getInt("io.sort.factor", 100),
- new Path(mapId.toString()), new RawComparator<K>() {
- @Override
- public int compare(byte[] b1, int s1, int l1,
- byte[] b2, int s2, int l2) {
- return LexicographicalComparerHolder.BEST_COMPARER
- .compareTo(b1, s1, l1, b2, s2, l2);
- }
-
- @Override
- public int compare(K o1, K o2) {
- return LexicographicalComparerHolder.BEST_COMPARER
- .compareTo(o1.getBytes(), 0, o1.getLength(),
- o2.getBytes(), 0, o2.getLength());
- }
- }, reporter, null, task.spilledRecordsCounter);
+ RawKeyValueIterator kvIter = null;
+ if (sort) {
+ // merge
+ kvIter = Merger.merge(job, rfs, keyClass, valClass, codec,
+ segmentList, job.getInt("io.sort.factor", 100),
+ new Path(mapId.toString()), new RawComparator<K>() {
+ @Override
+ public int compare(byte[] b1, int s1, int l1,
+ byte[] b2, int s2, int l2) {
+ return LexicographicalComparerHolder.BEST_COMPARER
+ .compareTo(b1, s1, l1, b2, s2, l2);
+ }
+
+ @Override
+ public int compare(K o1, K o2) {
+ return LexicographicalComparerHolder.BEST_COMPARER
+ .compareTo(o1.getBytes(), 0, o1.getLength(),
+ o2.getBytes(), 0, o2.getLength());
+ }
+ }, reporter, null, task.spilledRecordsCounter);
+ } else {
+ kvIter = getNoSortRawKeyValueIterator(segmentList);
+ }
// write merged output to disk
long segmentStart = finalOut.getPos();
134 src/mapred/org/apache/hadoop/mapred/MapTask.java
@@ -59,6 +59,7 @@
import org.apache.hadoop.mapred.Merger.Segment;
import org.apache.hadoop.mapred.SortedRanges.SkipRangeIterator;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.util.IndexedCountingSortable;
import org.apache.hadoop.util.IndexedSortable;
import org.apache.hadoop.util.IndexedSorter;
import org.apache.hadoop.util.Progress;
@@ -532,6 +533,7 @@ public void close(TaskAttemptContext context)
TaskUmbilicalProtocol umbilical,
TaskReporter reporter
) throws IOException, ClassNotFoundException {
+ boolean sort = job.getBoolean("mapred.map.output.sort", true);
collector = new MapOutputBuffer<K,V>(umbilical, job, reporter);
partitions = jobContext.getNumReduceTasks();
if (partitions > 0) {
@@ -840,6 +842,9 @@ private long getCPUVal(ProcResourceValues startProcVals,
private ArrayList<SpillRecord> indexCacheList;
private int totalIndexCacheMemory;
private static final int INDEX_CACHE_MEMORY_LIMIT = 1024 * 1024;
+
+ private final boolean sort;
+ private final int [] kvCounts;
@SuppressWarnings("unchecked")
public MapOutputBuffer(TaskUmbilicalProtocol umbilical, JobConf job,
@@ -856,6 +861,9 @@ public MapOutputBuffer(TaskUmbilicalProtocol umbilical, JobConf job,
spillSortCounters = new MapSpillSortCounters(reporter);
+ sort = job.getBoolean("mapred.map.output.sort", true);
+ kvCounts = sort ? null : new int[partitions];
+
//sanity checks
final float spillper = job.getFloat("io.sort.spill.percent",(float)0.8);
final float recper = job.getFloat("io.sort.record.percent",(float)0.05);
@@ -911,6 +919,9 @@ public MapOutputBuffer(TaskUmbilicalProtocol umbilical, JobConf job,
combineInputCounter,
reporter, null);
if (combinerRunner != null) {
+ if (!sort) {
+ throw new IOException("Combiner not supported in no sort mode");
+ }
combineCollector= new CombineOutputCollector<K,V>(combineOutputCounter);
} else {
combineCollector = null;
@@ -1015,6 +1026,9 @@ public synchronized void collect(K key, V value, int partition
kvindices[ind + KEYSTART] = keystart;
kvindices[ind + VALSTART] = valstart;
kvindex = kvnext;
+ if (!sort) {
+ kvCounts[partition]++;
+ }
} catch (MapBufferTooSmallException e) {
LOG.info("Record too large for in-memory buffer: " + e.getMessage());
spillSingleRecord(key, value, partition);
@@ -1044,6 +1058,12 @@ public int compare(int i, int j) {
kvindices[ij + KEYSTART],
kvindices[ij + VALSTART] - kvindices[ij + KEYSTART]);
}
+
+ public int comparePartition(int i, int j) {
+ final int ii = kvoffsets[i % kvoffsets.length];
+ final int ij = kvoffsets[j % kvoffsets.length];
+ return kvindices[ii + PARTITION] - kvindices[ij + PARTITION];
+ }
/**
* Swap logical indices st i, j MOD offset capacity.
@@ -1337,7 +1357,11 @@ private void sortAndSpill() throws IOException, ClassNotFoundException,
long sortStartMilli = System.currentTimeMillis();
ProcResourceValues sortStartProcVals = getCurrentProcResourceValues();
//do the sort
- sorter.sort(MapOutputBuffer.this, kvstart, endPosition, reporter);
+ if (sort) {
+ sorter.sort(MapOutputBuffer.this, kvstart, endPosition, reporter);
+ } else {
+ sortPartitionsOnly(kvstart, endPosition);
+ }
// get the cumulative resources used after the sort, and use the diff as
// resources/wallclock consumed by the sort.
long sortEndMilli = System.currentTimeMillis();
@@ -1573,6 +1597,95 @@ public long getTotalBytesProcessed() {
}
}
+ private void sortPartitionsOnly(final int l, final int r) {
+ new IndexedCountingSortable(kvCounts, r-l) {
+ @Override
+ public int get(int i) {
+ return kvoffsets[(l+i) % kvoffsets.length];
+ }
+ @Override
+ public void put(int i, int v) {
+ kvoffsets[(l+i) % kvoffsets.length] = v;
+ }
+ @Override
+ public int getKey(int i) {
+ return kvindices[get(i) + PARTITION];
+ }
+ }.sort();
+ // clear kvCounts for next sort&spill
+ for (int i = 0; i < kvCounts.length; i++) {
+ kvCounts[i] = 0;
+ }
+ }
+
+ private RawKeyValueIterator getNoSortRawKeyValueIterator(
+ final List<Segment<K, V>> segmentList)
+ throws IOException {
+ return new RawKeyValueIterator() {
+ private Progress progress = new Progress();
+ Segment<K,V> currentSegment = null;
+ int currentIndex = -1;
+ @Override
+ public boolean next() throws IOException {
+ while (true) {
+ if (currentSegment==null) {
+ currentIndex++;
+ if (currentIndex<segmentList.size()) {
+ currentSegment = segmentList.get(currentIndex);
+ currentSegment.init(null);
+ progress.set(currentIndex / (float) segmentList.size());
+ } else {
+ progress.set(1.0f);
+ return false;
+ }
+ } else {
+ if (currentSegment.next()) {
+ return true;
+ }
+ currentSegment = null;
+ }
+ }
+ }
+
+ @Override
+ public DataInputBuffer getKey() throws IOException {
+ return currentSegment.getKey();
+ }
+
+ @Override
+ public DataInputBuffer getValue() throws IOException {
+ return currentSegment.getValue();
+ }
+
+ @Override
+ public Progress getProgress() {
+ return progress;
+ }
+
+ @Override
+ public void close() throws IOException {
+ }
+
+ @Override
+ public long getTotalBytesProcessed() {
+ return 0;
+ }
+ };
+ }
+
+ @SuppressWarnings("unchecked")
+ protected RawKeyValueIterator getRawKeyValueIterator(
+ final List<Segment<K, V>> segmentList, TaskAttemptID mapId) throws IOException {
+ //merge
+ return Merger.merge(job, rfs,
+ keyClass, valClass, codec,
+ segmentList, job.getInt("io.sort.factor", 100),
+ new Path(mapId.toString()),
+ job.getOutputKeyComparator(), reporter,
+ null, spilledRecordsCounter);
+ }
+
+ @SuppressWarnings("unchecked")
private void mergeParts() throws IOException, InterruptedException,
ClassNotFoundException {
// get the approximate size of the final output/index files
@@ -1659,14 +1772,17 @@ private void mergeParts() throws IOException, InterruptedException,
}
}
- //merge
- @SuppressWarnings("unchecked")
- RawKeyValueIterator kvIter = Merger.merge(job, rfs,
- keyClass, valClass, codec,
- segmentList, job.getInt("io.sort.factor", 100),
- new Path(mapId.toString()),
- job.getOutputKeyComparator(), reporter,
- null, spilledRecordsCounter);
+ RawKeyValueIterator kvIter = null;
+ if (sort) {
+ kvIter = Merger.merge(job, rfs,
+ keyClass, valClass, codec,
+ segmentList, job.getInt("io.sort.factor", 100),
+ new Path(mapId.toString()),
+ job.getOutputKeyComparator(), reporter,
+ null, spilledRecordsCounter);
+ } else {
+ kvIter = getNoSortRawKeyValueIterator(segmentList);
+ }
//write merged output to disk
long segmentStart = finalOut.getPos();
2  src/mapred/org/apache/hadoop/mapred/Merger.java
@@ -200,7 +200,7 @@ public Segment(Reader<K, V> reader, boolean preserve) {
this.segmentLength = reader.getLength();
}
- private void init(Counters.Counter readsCounter) throws IOException {
+ void init(Counters.Counter readsCounter) throws IOException {
if (reader == null) {
FSDataInputStream in = fs.open(file);
in.seek(segmentOffset);
68 src/mapred/org/apache/hadoop/mapred/ReducePartition.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.mapred;
import java.io.IOException;
+import java.util.Iterator;
import java.util.List;
import org.apache.hadoop.fs.FSDataOutputStream;
@@ -34,6 +35,47 @@
class ReducePartition<K extends BytesWritable, V extends BytesWritable> extends
BasicReducePartition<K, V> {
+ class NoSortKeyValueSpillIterator implements KeyValueSpillIterator {
+ int totalRecordsNum = 0;
+ Iterator<MemoryBlock> blockIterator;
+ MemoryBlock currentBlock;
+ int recordIndex;
+ MemoryBlockIndex memoryBlockIndex;
+
+ public NoSortKeyValueSpillIterator(List<MemoryBlock> memBlks,
+ int collectedRecordsNum) {
+ totalRecordsNum = collectedRecordsNum;
+ blockIterator = memBlks.iterator();
+ memoryBlockIndex = new MemoryBlockIndex();
+ }
+
+ public void reset(List<MemoryBlock> memBlks, int collectedRecordsNum) {
+ totalRecordsNum = collectedRecordsNum;
+ blockIterator = memBlks.iterator();
+ currentBlock = null;
+ recordIndex = 0;
+ }
+
+ @Override
+ public int getRecordNumber() {
+ return totalRecordsNum;
+ }
+
+ @Override
+ public MemoryBlockIndex next() {
+ while (currentBlock == null || recordIndex>=currentBlock.currentPtr) {
+ if (!blockIterator.hasNext()) {
+ return null;
+ } else {
+ currentBlock = blockIterator.next();
+ recordIndex = 0;
+ }
+ }
+ memoryBlockIndex.setMemoryBlockIndex(currentBlock, recordIndex++);
+ return memoryBlockIndex;
+ }
+ }
+
class KeyValueSortedArray extends
PriorityQueue<KeyValuePairIterator> implements
KeyValueSpillIterator {
@@ -109,7 +151,7 @@ public int getRecordNumber() {
}
}
- protected KeyValueSortedArray keyValueSortArray;
+ protected KeyValueSpillIterator keyValueSpillIterator;
public ReducePartition(int reduceNum,
MemoryBlockAllocator memoryBlockAllocator, byte[] kvBuffer,
@@ -145,7 +187,7 @@ public int collect(K key, V value) throws IOException {
}
public KeyValueSpillIterator getKeyValueSpillIterator() {
- return keyValueSortArray;
+ return keyValueSpillIterator;
}
public IndexRecord spill(JobConf job, FSDataOutputStream out,
@@ -179,19 +221,29 @@ public IndexRecord spill(JobConf job, FSDataOutputStream out,
return rec;
}
- public void groupOrSort() {
+ public void groupOrSort(boolean sort) {
reporter.progress();
List<MemoryBlock> memBlks = snapShot();
for (int i = 0; i < memBlks.size(); i++) {
MemoryBlock memBlk = memBlks.get(i);
- sortMemBlock(memBlk);
+ if (sort) {
+ sortMemBlock(memBlk);
+ }
}
// now do a merge sort on the list of memory blocks
- if (keyValueSortArray == null) {
- keyValueSortArray = new KeyValueSortedArray(memBlks,
- getCollectedRecordsNum());
+ if (sort) {
+ if (keyValueSpillIterator == null) {
+ keyValueSpillIterator = new KeyValueSortedArray(memBlks,
+ getCollectedRecordsNum());
+ } else {
+ ((KeyValueSortedArray)keyValueSpillIterator).reset(memBlks, getCollectedRecordsNum());
+ }
} else {
- keyValueSortArray.reset(memBlks, getCollectedRecordsNum());
+ if (keyValueSpillIterator == null) {
+ keyValueSpillIterator = new NoSortKeyValueSpillIterator(memBlks, getCollectedRecordsNum());
+ } else {
+ ((NoSortKeyValueSpillIterator)keyValueSpillIterator).reset(memBlks, getCollectedRecordsNum());
+ }
}
this.collectedRecordsNum = 0;
this.collectedBytesSize = 0;
253 src/mapred/org/apache/hadoop/mapred/ReduceTask.java
@@ -102,6 +102,7 @@
private static final Log LOG = LogFactory.getLog(ReduceTask.class.getName());
private int numMaps;
private ReduceCopier reduceCopier;
+ private Exception reduceCopierException;
private CompressionCodec codec;
@@ -226,6 +227,87 @@ public void readFields(DataInput in) throws IOException {
return fileList.toArray(new Path[0]);
}
+ private class NoSortKVIterator<K,V> implements RawKeyValueIterator {
+ private Progress progress = new Progress();
+ List<Segment<K, V>> segments = new ArrayList<Merger.Segment<K,V>>();
+ int currentSegmentIndex = -1;
+ Segment<K, V> currentSegment = null;
+ int usedSegments=0;
+
+ public NoSortKVIterator() {
+ }
+
+ /**
+ * constructor for local runner
+ */
+ public NoSortKVIterator(FileSystem rfs, Path [] files) throws IOException {
+ for (Path file : files) {
+ segments.add(new Segment<K, V>(conf, rfs, file, codec, false));
+ }
+ }
+
+ protected boolean getNewSegments() throws IOException {
+ return false;
+ }
+
+ private boolean nextSegment() throws IOException {
+ currentSegmentIndex++;
+ if (currentSegmentIndex>=segments.size()) {
+ if (!getNewSegments()) {
+ return false;
+ }
+ currentSegmentIndex = 0;
+ }
+ currentSegment = segments.get(currentSegmentIndex);
+ currentSegment.init(spilledRecordsCounter);
+ usedSegments++;
+ progress.set(usedSegments/(float)numMaps);
+ return true;
+ }
+
+ @Override
+ public boolean next() throws IOException {
+ while (true) {
+ if (currentSegment==null) {
+ if (nextSegment()==false) {
+ return false;
+ }
+ } else {
+ if (currentSegment.next()) {
+ return true;
+ } else {
+ currentSegment.close();
+ currentSegment = null;
+ }
+ }
+ }
+ }
+
+ @Override
+ public DataInputBuffer getKey() throws IOException {
+ return currentSegment.getKey();
+ }
+
+ @Override
+ public DataInputBuffer getValue() throws IOException {
+ return currentSegment.getValue();
+ }
+
+ @Override
+ public Progress getProgress() {
+ return progress;
+ }
+
+ @Override
+ public void close() throws IOException {
+ }
+
+ @Override
+ public long getTotalBytesProcessed() {
+ return 0;
+ }
+ }
+
private class ReduceValuesIterator<KEY,VALUE>
extends ValuesIterator<KEY,VALUE> {
public ReduceValuesIterator (RawKeyValueIterator in,
@@ -351,6 +433,7 @@ public void run(JobConf job, final TaskUmbilicalProtocol umbilical)
throws IOException, InterruptedException, ClassNotFoundException {
this.umbilical = umbilical;
job.setBoolean("mapred.skip.on", isSkipping());
+ boolean sort = job.getBoolean("mapred.map.output.sort", true);
taskStartTime = System.currentTimeMillis();
if (isMapOrReduce()) {
@@ -386,12 +469,36 @@ public void run(JobConf job, final TaskUmbilicalProtocol umbilical)
ProcResourceValues copyStartProcVals = getCurrentProcResourceValues();
if (!isLocal) {
reduceCopier = new ReduceCopier(umbilical, job, reporter);
- if (!reduceCopier.fetchOutputs()) {
- if(reduceCopier.mergeThrowable instanceof FSError) {
- throw (FSError)reduceCopier.mergeThrowable;
+ if (sort) {
+ if (!reduceCopier.fetchOutputs()) {
+ if(reduceCopier.mergeThrowable instanceof FSError) {
+ throw (FSError)reduceCopier.mergeThrowable;
+ }
+ throw new IOException("Task: " + getTaskID() +
+ " - The reduce copier failed", reduceCopier.mergeThrowable);
}
- throw new IOException("Task: " + getTaskID() +
- " - The reduce copier failed", reduceCopier.mergeThrowable);
+ } else {
+ final Thread mainThread = Thread.currentThread();
+ Thread copierThread = new Thread(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ if (!reduceCopier.fetchOutputs()) {
+ if(reduceCopier.mergeThrowable instanceof FSError) {
+ throw (FSError)reduceCopier.mergeThrowable;
+ }
+ throw new IOException("Task: " + getTaskID() +
+ " - The reduce copier failed", reduceCopier.mergeThrowable);
+ }
+ } catch (Exception e) {
+ reduceCopierException = e;
+ LOG.warn(e);
+ mainThread.interrupt();
+ }
+ }
+ });
+ copierThread.setName("ReduceCopier");
+ copierThread.start();
}
}
long reducerCopyEndMilli = System.currentTimeMillis();
@@ -402,16 +509,24 @@ public void run(JobConf job, final TaskUmbilicalProtocol umbilical)
statusUpdate(umbilical);
final FileSystem rfs = FileSystem.getLocal(job).getRaw();
- RawKeyValueIterator rIter = isLocal
- ? Merger.merge(job, rfs, job.getMapOutputKeyClass(),
- job.getMapOutputValueClass(), codec, getMapFiles(rfs, true),
- !conf.getKeepFailedTaskFiles(), job.getInt("io.sort.factor", 100),
- new Path(getTaskID().toString()), job.getOutputKeyComparator(),
- reporter, spilledRecordsCounter, null)
- : reduceCopier.createKVIterator(job, rfs, reporter);
-
- // free up the data structures
- mapOutputFilesOnDisk.clear();
+ RawKeyValueIterator rIter = null;
+ if (sort) {
+ rIter = isLocal
+ ? Merger.merge(job, rfs, job.getMapOutputKeyClass(),
+ job.getMapOutputValueClass(), codec, getMapFiles(rfs, true),
+ !conf.getKeepFailedTaskFiles(), job.getInt("io.sort.factor", 100),
+ new Path(getTaskID().toString()), job.getOutputKeyComparator(),
+ reporter, spilledRecordsCounter, null)
+ : reduceCopier.createKVIterator(job, rfs, reporter);
+ // free up the data structures
+ mapOutputFilesOnDisk.clear();
+ } else {
+ rIter = isLocal ? new NoSortKVIterator(rfs, getMapFiles(rfs, true))
+ : reduceCopier.getNoSortRawKVIterator();
+ if (isLocal) {
+ mapOutputFilesOnDisk.clear();
+ }
+ }
long sortEndMilli = System.currentTimeMillis();
ProcResourceValues sortEndProcVals = getCurrentProcResourceValues();
@@ -421,7 +536,7 @@ public void run(JobConf job, final TaskUmbilicalProtocol umbilical)
statusUpdate(umbilical);
Class keyClass = job.getMapOutputKeyClass();
Class valueClass = job.getMapOutputValueClass();
- RawComparator comparator = job.getOutputValueGroupingComparator();
+ RawComparator comparator = sort ? job.getOutputValueGroupingComparator() : null;
if (useNewApi) {
runNewReducer(job, umbilical, reporter, rIter, comparator,
@@ -506,7 +621,7 @@ public void collect(OUTKEY key, OUTVALUE value)
comparator, keyClass, valueClass,
job, reporter, umbilical) :
new ReduceValuesIterator<INKEY,INVALUE>(rIter,
- job.getOutputValueGroupingComparator(), keyClass, valueClass,
+ comparator, keyClass, valueClass,
job, reporter);
values.informReduceProgress();
while (values.more()) {
@@ -651,6 +766,11 @@ public long getTotalBytesProcessed() {
private ReduceTask reduceTask;
/**
+ * whether the map outputs are sorted and need to be merged on the reduce side
+ */
+ private boolean sort;
+
+ /**
* the list of map outputs currently being copied
*/
private List<MapOutputLocation> scheduledCopies;
@@ -867,6 +987,11 @@ public long getTotalBytesProcessed() {
Collections.synchronizedList(new LinkedList<MapOutput>());
/**
+ * Condition for new data arriving; used by the no-sort code path
+ */
+ private Object newDataComming = new Object();
+
+ /**
* The map for (Hosts, List of MapIds from this Host) maintaining
* map output locations
*/
@@ -1184,10 +1309,13 @@ public void setNumCopiedMapOutputs(int numRequiredMapOutputs) {
}
public void close() {
- synchronized (dataAvailable) {
- closed = true;
- LOG.info("Closed ram manager");
- dataAvailable.notify();
+ synchronized (newDataComming) {
+ synchronized (dataAvailable) {
+ closed = true;
+ LOG.info("Closed ram manager");
+ dataAvailable.notify();
+ }
+ newDataComming.notify();
}
}
@@ -1406,7 +1534,10 @@ private long copyOutput(MapOutputLocation loc
// Process map-output
if (mapOutput.inMemory) {
// Save it in the synchronized list of map-outputs
- mapOutputsFilesInMemory.add(mapOutput);
+ synchronized (newDataComming) {
+ mapOutputsFilesInMemory.add(mapOutput);
+ newDataComming.notify();
+ }
} else {
// Rename the temporary file to the final file;
// ensure it is on the same partition
@@ -1419,8 +1550,11 @@ private long copyOutput(MapOutputLocation loc
tmpMapOutput + " to " + filename);
}
- synchronized (mapOutputFilesOnDisk) {
- addToMapOutputFilesOnDisk(localFileSys.getFileStatus(filename));
+ synchronized (newDataComming) {
+ synchronized (mapOutputFilesOnDisk) {
+ addToMapOutputFilesOnDisk(localFileSys.getFileStatus(filename));
+ newDataComming.notify();
+ }
}
}
@@ -1872,7 +2006,11 @@ private void configureClasspath(JobConf conf)
URLClassLoader loader = new URLClassLoader(urls, parent);
conf.setClassLoader(loader);
}
-
+
+ public RawKeyValueIterator getNoSortRawKVIterator() {
+ return new ShufflingNoSortKVIterator();
+ }
+
public ReduceCopier(TaskUmbilicalProtocol umbilical, JobConf conf,
TaskReporter reporter
)throws ClassNotFoundException, IOException {
@@ -1882,6 +2020,7 @@ public ReduceCopier(TaskUmbilicalProtocol umbilical, JobConf conf,
this.shuffleClientMetrics = new ShuffleClientMetrics(conf);
this.umbilical = umbilical;
this.reduceTask = ReduceTask.this;
+ this.sort = conf.getBoolean("mapred.map.output.sort", true);
this.scheduledCopies = new ArrayList<MapOutputLocation>(100);
this.copyResults = new ArrayList<CopyResult>(100);
@@ -1892,7 +2031,7 @@ public ReduceCopier(TaskUmbilicalProtocol umbilical, JobConf conf,
this.combinerRunner = CombinerRunner.create(conf, getTaskID(),
combineInputCounter,
reporter, null);
- if (combinerRunner != null) {
+ if (combinerRunner != null && sort) {
combineCollector =
new CombineOutputCollector(reduceCombineOutputCounter);
}
@@ -1970,12 +2109,14 @@ public boolean fetchOutputs() throws IOException {
copier.start();
}
- //start the on-disk-merge thread
- localFSMergerThread = new LocalFSMerger((LocalFileSystem)localFileSys);
- //start the in memory merger thread
- inMemFSMergeThread = new InMemFSMergeThread();
- localFSMergerThread.start();
- inMemFSMergeThread.start();
+ if (sort) {
+ //start the on-disk-merge thread
+ localFSMergerThread = new LocalFSMerger((LocalFileSystem)localFileSys);
+ //start the in memory merger thread
+ inMemFSMergeThread = new InMemFSMergeThread();
+ localFSMergerThread.start();
+ inMemFSMergeThread.start();
+ }
// start the map events thread
getMapEventsThread = new GetMapEventsThread();
@@ -2301,7 +2442,7 @@ public boolean fetchOutputs() throws IOException {
ramManager.close();
//Do a merge of in-memory files (if there are any)
- if (mergeThrowable == null) {
+ if (mergeThrowable == null && sort) {
try {
// Wait for the on-disk merge to complete
localFSMergerThread.join();
@@ -2547,7 +2688,51 @@ private void addToMapOutputFilesOnDisk(FileStatus status) {
}
}
-
+ private class ShufflingNoSortKVIterator extends NoSortKVIterator<K, V> {
+ public ShufflingNoSortKVIterator() {
+ super();
+ }
+
+ @Override
+ protected boolean getNewSegments() throws IOException {
+ synchronized (newDataComming) {
+ while (mapOutputsFilesInMemory.size() == 0
+ && mapOutputFilesOnDisk.size() == 0) {
+ if (ramManager.closed) {
+ return false;
+ } else {
+ try {
+ newDataComming.wait();
+ } catch (InterruptedException e) {
+ throw new IOException(e);
+ }
+ }
+ }
+ segments.clear();
+ synchronized (mapOutputsFilesInMemory) {
+ for (MapOutput mo : mapOutputsFilesInMemory) {
+ Reader<K, V> reader =
+ new InMemoryReader<K, V>(ramManager, mo.mapAttemptId,
+ mo.data, 0, mo.data.length);
+ Segment<K, V> segment = new Segment<K, V>(reader, true);
+ segments.add(segment);
+ }
+ mapOutputsFilesInMemory.clear();
+ }
+ ArrayList<Path> mapOutputFiles = new ArrayList<Path>();
+ synchronized (mapOutputFilesOnDisk) {
+ for (FileStatus st : mapOutputFilesOnDisk) {
+ mapOutputFiles.add(st.getPath());
+ }
+ mapOutputFilesOnDisk.clear();
+ }
+ for (Path file : mapOutputFiles) {
+ segments.add(new Segment<K, V>(conf, rfs, file, codec, false));
+ }
+ }
+ return segments.size()>0;
+ }
+ }
/** Starts merging the local copy (on disk) of the map's output so that
* most of the reducer's input is sorted i.e overlapping shuffle
2  src/mapred/org/apache/hadoop/mapred/Task.java
@@ -1113,7 +1113,7 @@ private void readNextKey() throws IOException {
DataInputBuffer nextKeyBytes = in.getKey();
keyIn.reset(nextKeyBytes.getData(), nextKeyBytes.getPosition(), nextKeyBytes.getLength());
nextKey = keyDeserializer.deserialize(nextKey);
- hasNext = key != null && (comparator.compare(key, nextKey) == 0);
+ hasNext = key != null && (comparator != null) && (comparator.compare(key, nextKey) == 0);
} else {
hasNext = false;
}
13 src/mapred/org/apache/hadoop/mapreduce/ReduceContext.java
@@ -117,12 +117,13 @@ public boolean nextKeyValue() throws IOException, InterruptedException {
hasMore = input.next();
if (hasMore) {
next = input.getKey();
- nextKeyIsSame = comparator.compare(currentRawKey.getBytes(), 0,
- currentRawKey.getLength(),
- next.getData(),
- next.getPosition(),
- next.getLength() - next.getPosition()
- ) == 0;
+ nextKeyIsSame = (comparator == null) ? false :
+ (comparator.compare(currentRawKey.getBytes(), 0,
+ currentRawKey.getLength(),
+ next.getData(),
+ next.getPosition(),
+ next.getLength() - next.getPosition()
+ ) == 0);
} else {
nextKeyIsSame = false;
}
77 src/test/org/apache/hadoop/mapred/TestNoSort.java
@@ -0,0 +1,77 @@
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.lib.IdentityMapper;
+import org.apache.hadoop.mapred.lib.IdentityReducer;
+
+public class TestNoSort extends HadoopTestCase {
+ public TestNoSort() throws IOException {
+ super(CLUSTER_MR, LOCAL_FS, 2, 1);
+ }
+
+ private static final Path ROOT_DIR = new Path("testing/nosort");
+ private static final Path IN_DIR = new Path(ROOT_DIR, "input");
+ private static final Path OUT_DIR = new Path(ROOT_DIR, "output");
+
+ private Path getDir(Path dir) {
+ // Hack for local FS that does not have the concept of a 'mounting point'
+ if (isLocalFS()) {
+ String localPathRoot = System.getProperty("test.build.data", "/tmp")
+ .replace(' ', '+');
+ dir = new Path(localPathRoot, dir);
+ }
+ return dir;
+ }
+
+ private JobConf conf;
+ private FileSystem fs;
+
+ @Override
+ protected void setUp() throws Exception {
+ super.setUp();
+ conf = createJobConf();
+ fs = FileSystem.get(conf);
+ }
+
+ @Override
+ protected void tearDown() throws Exception {
+ fs.delete(getDir(ROOT_DIR), true);
+ super.tearDown();
+ }
+
+ void prepareInput(int size, FileSystem fs) throws IOException {
+ FSDataOutputStream out = fs.create(getDir(new Path(IN_DIR, "part-00000")));
+ for (int i=0;i<size;i++) {
+ out.write(String.format("%09d\t%09d\n", i/4, i).getBytes());
+ }
+ out.close();
+ out = fs.create(getDir(new Path(IN_DIR, "part-00001")));
+ for (int i=0;i<size;i++) {
+ out.write(String.format("%09d\t%09d\n", i/4, i).getBytes());
+ }
+ out.close();
+ }
+
+ public void testNoSort() throws IOException {
+ FileSystem fs = FileSystem.get(conf);
+ prepareInput(10000, fs);
+ conf.setInt("io.sort.mb", 1);
+ conf.setBoolean("mapred.map.output.sort", false);
+ conf.setInputFormat(KeyValueTextInputFormat.class);
+ conf.setOutputFormat(TextOutputFormat.class);
+ conf.setOutputKeyClass(Text.class);
+ conf.setOutputValueClass(Text.class);
+ KeyValueTextInputFormat.setInputPaths(conf, getDir(IN_DIR));
+ TextOutputFormat.setOutputPath(conf, getDir(OUT_DIR));
+ conf.setNumReduceTasks(1);
+ conf.setMapperClass(IdentityMapper.class);
+ conf.setReducerClass(IdentityReducer.class);
+ JobClient.runJob(conf);
+ }
+
+}
69 src/test/org/apache/hadoop/mapred/TestNoSortLocal.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.lib.IdentityMapper;
+import org.apache.hadoop.mapred.lib.IdentityReducer;
+
+
+import junit.framework.TestCase;
+
+public class TestNoSortLocal extends TestCase {
+ private static String TEST_ROOT_DIR = System.getProperty("test.build.data", ".");
+
+ static String INPUTPATH = TEST_ROOT_DIR + "/testing/nosortlocal/input";
+ static String OUTPUTPATH = TEST_ROOT_DIR + "/testing/nosortlocal/output";
+
+ void prepareInput(int size, FileSystem fs) throws IOException {
+ FSDataOutputStream out = fs.create(new Path(new Path(INPUTPATH), "part-00000"));
+ for (int i=0;i<size;i++) {
+ out.write(String.format("%09d\t%09d\n", i/4, i).getBytes());
+ }
+ out.close();
+ out = fs.create(new Path(new Path(INPUTPATH), "part-00001"));
+ for (int i=0;i<size;i++) {
+ out.write(String.format("%09d\t%09d\n", i/4, i).getBytes());
+ }
+ out.close();
+ }
+
+ public void testNoSort() throws IOException {
+ JobConf conf = new JobConf();
+ FileSystem fs = FileSystem.get(conf);
+ prepareInput(10000, fs);
+ conf.setInt("io.sort.mb", 2);
+ conf.setBoolean("mapred.map.output.sort", false);
+ conf.setInputFormat(KeyValueTextInputFormat.class);
+ conf.setOutputFormat(TextOutputFormat.class);
+ conf.setOutputKeyClass(Text.class);
+ conf.setOutputValueClass(Text.class);
+ KeyValueTextInputFormat.setInputPaths(conf, new Path(INPUTPATH));
+ TextOutputFormat.setOutputPath(conf, new Path(OUTPUTPATH));
+ conf.setNumReduceTasks(1);
+ conf.setMapperClass(IdentityMapper.class);
+ conf.setReducerClass(IdentityReducer.class);
+ JobClient.runJob(conf);
+ }
+}
136 src/test/org/apache/hadoop/util/TestIndexedCountingSortable.java
@@ -0,0 +1,136 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.util;
+
+import java.util.Random;
+
+import junit.framework.TestCase;
+
+public class TestIndexedCountingSortable extends TestCase {
+ final static int INPUTSIZE = 2500000;
+ final static int PARTITION = 500;
+ final int[] values = new int[INPUTSIZE];
+ final int[] indexes = new int[INPUTSIZE];
+ final int[] counts = new int[PARTITION];
+
+ void makeInput() {
+ Random r = new Random();
+ r.setSeed(10);
+ java.util.Arrays.fill(counts, 0); // reset counts; makeInput is called before each sort run
+ for (int i = 0; i < values.length; ++i) {
+ indexes[i] = i;
+ values[i] = r.nextInt(PARTITION);
+ counts[values[i]]++;
+ }
+ }
+
+ void validate() {
+ int [] has = new int[INPUTSIZE];
+ for (int i=0;i<INPUTSIZE;i++) {
+ if (i>0) {
+ int l = values[indexes[i]];
+ int r = values[indexes[i-1]];
+ assertTrue(l>=r);
+ }
+ has[indexes[i]]++;
+ }
+ for (int i=0;i<INPUTSIZE;i++) {
+ assertTrue(has[i]==1);
+ }
+ }
+
+ public void runCountingSort() throws Exception {
+ makeInput();
+ IndexedCountingSortable sorter = new
+ IndexedCountingSortable(counts, INPUTSIZE) {
+ @Override
+ public int get(int i) {
+ return indexes[i];
+ }
+
+ @Override
+ public void put(int i, int v) {
+ indexes[i] = v;
+ }
+
+ @Override
+ public int getKey(int i) {
+ return values[indexes[i]];
+ }
+ };
+ long start = System.nanoTime();
+ sorter.sort();
+ long end = System.nanoTime();
+ System.out.printf("Counting sort: %.3f\n", (end - start) / 1000000000.0);
+ validate();
+ }
+
+ public void runInplaceCountingSort() throws Exception {
+ makeInput();
+ IndexedCountingSortable sorter = new
+ IndexedCountingSortable(counts, INPUTSIZE) {
+ @Override
+ public int get(int i) {
+ return indexes[i];
+ }
+
+ @Override
+ public void put(int i, int v) {
+ indexes[i] = v;
+ }
+
+ @Override
+ public int getKey(int i) {
+ return values[indexes[i]];
+ }
+ };
+ long start = System.nanoTime();
+ sorter.sortInplace();
+ long end = System.nanoTime();
+ System.out.printf("Inplcae counting sort: %.3f\n",
+ (end - start) / 1000000000.0);
+ validate();
+ }
+
+ public void runQuickSort() throws Exception {
+ makeInput();
+ IndexedSortable dest = new IndexedSortable() {
+ @Override
+ public void swap(int i, int j) {
+ int temp = indexes[i];
+ indexes[i] = indexes[j];
+ indexes[j] = temp;
+ }
+
+ @Override
+ public int compare(int i, int j) {
+ return values[indexes[i]] - values[indexes[j]];
+ }
+ };
+ QuickSort sorter = new QuickSort();
+ long start = System.nanoTime();
+ sorter.sort(dest, 0, INPUTSIZE, null);
+ long end = System.nanoTime();
+ System.out.printf("Quick sort: %.3f\n", (end - start) / 1000000000.0);
+ }
+
+ public void testSort() throws Exception {
+ runCountingSort();
+ runInplaceCountingSort();
+ runQuickSort();
+ }
+}