Permalink
Browse files

rebase mapred

  • Loading branch information...
1 parent 26fc9ba commit c47715a20251380f3c1783dc37efc4894eda37b7 heyongqiang committed Jun 12, 2012
Showing with 7,254 additions and 1,643 deletions.
  1. +157 −0 src/mapred/org/apache/hadoop/mapred/BasicReducePartition.java
  2. +520 −0 src/mapred/org/apache/hadoop/mapred/BlockMapOutputBuffer.java
  3. +28 −0 src/mapred/org/apache/hadoop/mapred/BlockMapOutputCollector.java
  4. +4 −19 src/mapred/org/apache/hadoop/mapred/Child.java
  5. +33 −0 src/mapred/org/apache/hadoop/mapred/ChildMemoryBlock.java
  6. +8 −1 src/mapred/org/apache/hadoop/mapred/CompositeTaskTrackerInstrumentation.java
  7. +19 −0 src/mapred/org/apache/hadoop/mapred/Counters.java
  8. +37 −0 src/mapred/org/apache/hadoop/mapred/DirectTaskUmbilical.java
  9. +116 −0 src/mapred/org/apache/hadoop/mapred/ExpireUnusedFilesInCache.java
  10. +13 −0 src/mapred/org/apache/hadoop/mapred/FileInputFormat.java
  11. +9 −0 src/mapred/org/apache/hadoop/mapred/FileOutputFormat.java
  12. +27 −2 src/mapred/org/apache/hadoop/mapred/IFile.java
  13. +138 −100 src/mapred/org/apache/hadoop/mapred/JSPUtil.java
  14. +239 −212 src/mapred/org/apache/hadoop/mapred/JobClient.java
  15. +5 −0 src/mapred/org/apache/hadoop/mapred/JobConf.java
  16. +120 −37 src/mapred/org/apache/hadoop/mapred/JobInProgress.java
  17. +3 −0 src/mapred/org/apache/hadoop/mapred/JobInProgressTraits.java
  18. +438 −0 src/mapred/org/apache/hadoop/mapred/JobStats.java
  19. +65 −108 src/mapred/org/apache/hadoop/mapred/JobTracker.java
  20. +71 −12 src/mapred/org/apache/hadoop/mapred/JobTrackerInstrumentation.java
  21. +142 −124 src/mapred/org/apache/hadoop/mapred/JobTrackerMetricsInst.java
  22. +38 −8 src/mapred/org/apache/hadoop/mapred/JobTrackerReconfigurable.java
  23. +23 −0 src/mapred/org/apache/hadoop/mapred/KeyValueSpillIterator.java
  24. +239 −0 src/mapred/org/apache/hadoop/mapred/LexicographicalComparerHolder.java
  25. +110 −0 src/mapred/org/apache/hadoop/mapred/MapSpillSortCounters.java
  26. +12 −3 src/mapred/org/apache/hadoop/mapred/MapTask.java
  27. +44 −0 src/mapred/org/apache/hadoop/mapred/MapTaskStatus.java
  28. +135 −0 src/mapred/org/apache/hadoop/mapred/MapperWaitThread.java
  29. +195 −0 src/mapred/org/apache/hadoop/mapred/MemoryBlock.java
  30. +383 −0 src/mapred/org/apache/hadoop/mapred/MemoryBlockAllocator.java
  31. +25 −0 src/mapred/org/apache/hadoop/mapred/MemoryBlockHolder.java
  32. +46 −0 src/mapred/org/apache/hadoop/mapred/MemoryBlockIndex.java
  33. +33 −0 src/mapred/org/apache/hadoop/mapred/MemoryBlockTooSmallException.java
  34. +2 −2 src/mapred/org/apache/hadoop/mapred/Merger.java
  35. +53 −0 src/mapred/org/apache/hadoop/mapred/NettyMapOutputAttributes.java
  36. +329 −0 src/mapred/org/apache/hadoop/mapred/PoolFairnessCalculator.java
  37. +89 −0 src/mapred/org/apache/hadoop/mapred/PoolMetadata.java
  38. +280 −0 src/mapred/org/apache/hadoop/mapred/ReducePartition.java
  39. +564 −479 src/mapred/org/apache/hadoop/mapred/ReduceTask.java
  40. +105 −1 src/mapred/org/apache/hadoop/mapred/ReduceTaskStatus.java
  41. +123 −0 src/mapred/org/apache/hadoop/mapred/ResourceMetadata.java
  42. +43 −4 src/mapred/org/apache/hadoop/mapred/SequenceFileRecordReader.java
  43. +283 −0 src/mapred/org/apache/hadoop/mapred/ShuffleHandler.java
  44. +301 −0 src/mapred/org/apache/hadoop/mapred/SimulatedTaskRunner.java
  45. +52 −2 src/mapred/org/apache/hadoop/mapred/Task.java
  46. +4 −2 src/mapred/org/apache/hadoop/mapred/TaskCompletionEvent.java
  47. +217 −11 src/mapred/org/apache/hadoop/mapred/TaskInProgress.java
  48. +11 −2 src/mapred/org/apache/hadoop/mapred/TaskLog.java
  49. +1 −0 src/mapred/org/apache/hadoop/mapred/TaskLogServlet.java
  50. +3 −1 src/mapred/org/apache/hadoop/mapred/TaskLogsMonitor.java
  51. +39 −14 src/mapred/org/apache/hadoop/mapred/TaskMemoryManagerThread.java
  52. +101 −65 src/mapred/org/apache/hadoop/mapred/TaskRunner.java
  53. +13 −4 src/mapred/org/apache/hadoop/mapred/TaskScheduler.java
  54. +24 −4 src/mapred/org/apache/hadoop/mapred/TaskStatus.java
  55. +1,053 −415 src/mapred/org/apache/hadoop/mapred/TaskTracker.java
  56. +27 −0 src/mapred/org/apache/hadoop/mapred/TaskTrackerInstrumentation.java
  57. +62 −11 src/mapred/org/apache/hadoop/mapred/TaskTrackerMetricsInst.java
@@ -0,0 +1,157 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.mapred.Counters.Counter;
+import org.apache.hadoop.mapred.Task.TaskReporter;
+import org.apache.hadoop.util.IndexedSortable;
+import org.apache.hadoop.util.QuickSort;
+
+public abstract class BasicReducePartition<K extends BytesWritable, V extends BytesWritable>
+ implements MemoryBlockHolder {
+
+ class OffsetSortable implements IndexedSortable {
+
+ private int[] offsets;
+ private int[] keyLenArray;
+ private int[] valLenArray;
+ private byte[] kvbuffer;
+
+ public OffsetSortable(int[] offsets, int[] keyLenArray, int[] valLenArray,
+ byte[] kvbuffer) {
+ this.offsets = offsets;
+ this.keyLenArray = keyLenArray;
+ this.valLenArray = valLenArray;
+ this.kvbuffer = kvbuffer;
+ }
+
+ public OffsetSortable(MemoryBlock memBlock, byte[] kvbuffer) {
+ this.offsets = memBlock.getOffsets();
+ this.keyLenArray = memBlock.getKeyLenArray();
+ this.valLenArray = memBlock.getValueLenArray();
+ this.kvbuffer = kvbuffer;
+ }
+
+ @Override
+ public int compare(int i, int j) {
+ return LexicographicalComparerHolder.compareBytes(kvbuffer, offsets[i],
+ keyLenArray[i], offsets[j], keyLenArray[j]);
+ }
+
+ @Override
+ public void swap(int i, int j) {
+ swapElement(offsets, i, j);
+ swapElement(keyLenArray, i, j);
+ swapElement(valLenArray, i, j);
+ }
+
+ private void swapElement(int[] array, int i, int j) {
+ int tmp = array[i];
+ array[i] = array[j];
+ array[j] = tmp;
+ }
+ }
+
+ protected MemoryBlockAllocator memoryBlockAllocator;
+ protected byte[] kvbuffer;
+ protected final TaskReporter reporter;
+ protected BlockMapOutputCollector<K, V> collector;
+
+ protected int partition;
+
+ protected int collectedBytesSize;
+ protected int collectedRecordsNum;
+
+ protected MemoryBlock currentBlock;
+ protected List<MemoryBlock> memoryBlocks;
+
+
+ public BasicReducePartition(int reduceNum,
+ MemoryBlockAllocator memoryBlockAllocator, byte[] kvBuffer,
+ BlockMapOutputCollector<K, V> collector, TaskReporter reporter)
+ throws IOException {
+ this.partition = reduceNum;
+ this.collectedBytesSize = 0;
+ this.collectedRecordsNum = 0;
+ this.memoryBlockAllocator = memoryBlockAllocator;
+ this.kvbuffer = kvBuffer;
+ this.collector = collector;
+ this.reporter = reporter;
+ this.memoryBlockAllocator.registerMemoryBlockHolder(this);
+ initMemoryBlocks();
+ }
+
+ protected void initMemoryBlocks() {
+ memoryBlocks = new ArrayList<MemoryBlock>();
+ }
+
+ protected void sortMemBlock(MemoryBlock memBlock) {
+ if (memBlock.currentPtr <= 0) {
+ return;
+ }
+ // quick sort the offsets
+ OffsetSortable sortableObj = new OffsetSortable(memBlock, kvbuffer);
+ QuickSort quickSort = new QuickSort();
+ quickSort.sort(sortableObj, 0, memBlock.currentPtr);
+ }
+
+ protected void sortIndividualMemoryBlock(List<MemoryBlock> memBlks) {
+ if (memBlks == null) {
+ return;
+ }
+ for (MemoryBlock memBlk : memBlks) {
+ if (memBlk != null) {
+ sortMemBlock(memBlk);
+ }
+ }
+ }
+
+ public int getCollectedRecordsNum() {
+ return collectedRecordsNum;
+ }
+
+ public int getCollectedBytesSize() {
+ return collectedBytesSize;
+ }
+
+ abstract void groupOrSort();
+
+ public abstract KeyValueSpillIterator getKeyValueSpillIterator();
+
+ public abstract IndexRecord spill(JobConf job, FSDataOutputStream out,
+ Class<K> keyClass, Class<V> valClass, CompressionCodec codec,
+ Counter spillCounter) throws IOException;
+
+ public abstract int collect(K key, V value) throws IOException;
+
+ @Override
+ public MemoryBlock getCurrentOpenMemoryBlock() {
+ return currentBlock;
+ }
+
+ @Override
+ public List<MemoryBlock> getClosedMemoryBlocks() {
+ return memoryBlocks;
+ }
+}
Oops, something went wrong.

0 comments on commit c47715a

Please sign in to comment.