
rebase mapred

commit c47715a20251380f3c1783dc37efc4894eda37b7 1 parent 26fc9ba
authored June 11, 2012

Showing 57 changed files with 7,254 additions and 1,643 deletions.

  1. 157  src/mapred/org/apache/hadoop/mapred/BasicReducePartition.java
  2. 520  src/mapred/org/apache/hadoop/mapred/BlockMapOutputBuffer.java
  3. 28  src/mapred/org/apache/hadoop/mapred/BlockMapOutputCollector.java
  4. 23  src/mapred/org/apache/hadoop/mapred/Child.java
  5. 33  src/mapred/org/apache/hadoop/mapred/ChildMemoryBlock.java
  6. 9  src/mapred/org/apache/hadoop/mapred/CompositeTaskTrackerInstrumentation.java
  7. 19  src/mapred/org/apache/hadoop/mapred/Counters.java
  8. 37  src/mapred/org/apache/hadoop/mapred/DirectTaskUmbilical.java
  9. 116  src/mapred/org/apache/hadoop/mapred/ExpireUnusedFilesInCache.java
  10. 13  src/mapred/org/apache/hadoop/mapred/FileInputFormat.java
  11. 9  src/mapred/org/apache/hadoop/mapred/FileOutputFormat.java
  12. 29  src/mapred/org/apache/hadoop/mapred/IFile.java
  13. 238  src/mapred/org/apache/hadoop/mapred/JSPUtil.java
  14. 451  src/mapred/org/apache/hadoop/mapred/JobClient.java
  15. 5  src/mapred/org/apache/hadoop/mapred/JobConf.java
  16. 157  src/mapred/org/apache/hadoop/mapred/JobInProgress.java
  17. 3  src/mapred/org/apache/hadoop/mapred/JobInProgressTraits.java
  18. 438  src/mapred/org/apache/hadoop/mapred/JobStats.java
  19. 173  src/mapred/org/apache/hadoop/mapred/JobTracker.java
  20. 83  src/mapred/org/apache/hadoop/mapred/JobTrackerInstrumentation.java
  21. 266  src/mapred/org/apache/hadoop/mapred/JobTrackerMetricsInst.java
  22. 46  src/mapred/org/apache/hadoop/mapred/JobTrackerReconfigurable.java
  23. 23  src/mapred/org/apache/hadoop/mapred/KeyValueSpillIterator.java
  24. 239  src/mapred/org/apache/hadoop/mapred/LexicographicalComparerHolder.java
  25. 110  src/mapred/org/apache/hadoop/mapred/MapSpillSortCounters.java
  26. 15  src/mapred/org/apache/hadoop/mapred/MapTask.java
  27. 44  src/mapred/org/apache/hadoop/mapred/MapTaskStatus.java
  28. 135  src/mapred/org/apache/hadoop/mapred/MapperWaitThread.java
  29. 195  src/mapred/org/apache/hadoop/mapred/MemoryBlock.java
  30. 383  src/mapred/org/apache/hadoop/mapred/MemoryBlockAllocator.java
  31. 25  src/mapred/org/apache/hadoop/mapred/MemoryBlockHolder.java
  32. 46  src/mapred/org/apache/hadoop/mapred/MemoryBlockIndex.java
  33. 33  src/mapred/org/apache/hadoop/mapred/MemoryBlockTooSmallException.java
  34. 4  src/mapred/org/apache/hadoop/mapred/Merger.java
  35. 53  src/mapred/org/apache/hadoop/mapred/NettyMapOutputAttributes.java
  36. 329  src/mapred/org/apache/hadoop/mapred/PoolFairnessCalculator.java
  37. 89  src/mapred/org/apache/hadoop/mapred/PoolMetadata.java
  38. 280  src/mapred/org/apache/hadoop/mapred/ReducePartition.java
  39. 1,043  src/mapred/org/apache/hadoop/mapred/ReduceTask.java
  40. 106  src/mapred/org/apache/hadoop/mapred/ReduceTaskStatus.java
  41. 123  src/mapred/org/apache/hadoop/mapred/ResourceMetadata.java
  42. 47  src/mapred/org/apache/hadoop/mapred/SequenceFileRecordReader.java
  43. 283  src/mapred/org/apache/hadoop/mapred/ShuffleHandler.java
  44. 301  src/mapred/org/apache/hadoop/mapred/SimulatedTaskRunner.java
  45. 54  src/mapred/org/apache/hadoop/mapred/Task.java
  46. 6  src/mapred/org/apache/hadoop/mapred/TaskCompletionEvent.java
  47. 228  src/mapred/org/apache/hadoop/mapred/TaskInProgress.java
  48. 13  src/mapred/org/apache/hadoop/mapred/TaskLog.java
  49. 1  src/mapred/org/apache/hadoop/mapred/TaskLogServlet.java
  50. 4  src/mapred/org/apache/hadoop/mapred/TaskLogsMonitor.java
  51. 53  src/mapred/org/apache/hadoop/mapred/TaskMemoryManagerThread.java
  52. 166  src/mapred/org/apache/hadoop/mapred/TaskRunner.java
  53. 17  src/mapred/org/apache/hadoop/mapred/TaskScheduler.java
  54. 28  src/mapred/org/apache/hadoop/mapred/TaskStatus.java
  55. 1,468  src/mapred/org/apache/hadoop/mapred/TaskTracker.java
  56. 27  src/mapred/org/apache/hadoop/mapred/TaskTrackerInstrumentation.java
  57. 73  src/mapred/org/apache/hadoop/mapred/TaskTrackerMetricsInst.java
157  src/mapred/org/apache/hadoop/mapred/BasicReducePartition.java
@@ -0,0 +1,157 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.mapred.Counters.Counter;
+import org.apache.hadoop.mapred.Task.TaskReporter;
+import org.apache.hadoop.util.IndexedSortable;
+import org.apache.hadoop.util.QuickSort;
+
+public abstract class BasicReducePartition<K extends BytesWritable, V extends BytesWritable>
+    implements MemoryBlockHolder {
+
+  class OffsetSortable implements IndexedSortable {
+
+    private int[] offsets;
+    private int[] keyLenArray;
+    private int[] valLenArray;
+    private byte[] kvbuffer;
+
+    public OffsetSortable(int[] offsets, int[] keyLenArray, int[] valLenArray,
+        byte[] kvbuffer) {
+      this.offsets = offsets;
+      this.keyLenArray = keyLenArray;
+      this.valLenArray = valLenArray;
+      this.kvbuffer = kvbuffer;
+    }
+
+    public OffsetSortable(MemoryBlock memBlock, byte[] kvbuffer) {
+      this.offsets = memBlock.getOffsets();
+      this.keyLenArray = memBlock.getKeyLenArray();
+      this.valLenArray = memBlock.getValueLenArray();
+      this.kvbuffer = kvbuffer;
+    }
+
+    @Override
+    public int compare(int i, int j) {
+      return LexicographicalComparerHolder.compareBytes(kvbuffer, offsets[i],
+          keyLenArray[i], offsets[j], keyLenArray[j]);
+    }
+
+    @Override
+    public void swap(int i, int j) {
+      swapElement(offsets, i, j);
+      swapElement(keyLenArray, i, j);
+      swapElement(valLenArray, i, j);
+    }
+
+    private void swapElement(int[] array, int i, int j) {
+      int tmp = array[i];
+      array[i] = array[j];
+      array[j] = tmp;
+    }
+  }
+
+  protected MemoryBlockAllocator memoryBlockAllocator;
+  protected byte[] kvbuffer;
+  protected final TaskReporter reporter;
+  protected BlockMapOutputCollector<K, V> collector;
+
+  protected int partition;
+
+  protected int collectedBytesSize;
+  protected int collectedRecordsNum;
+
+  protected MemoryBlock currentBlock;
+  protected List<MemoryBlock> memoryBlocks;
+
+
+  public BasicReducePartition(int reduceNum,
+      MemoryBlockAllocator memoryBlockAllocator, byte[] kvBuffer,
+      BlockMapOutputCollector<K, V> collector, TaskReporter reporter)
+      throws IOException {
+    this.partition = reduceNum;
+    this.collectedBytesSize = 0;
+    this.collectedRecordsNum = 0;
+    this.memoryBlockAllocator = memoryBlockAllocator;
+    this.kvbuffer = kvBuffer;
+    this.collector = collector;
+    this.reporter = reporter;
+    this.memoryBlockAllocator.registerMemoryBlockHolder(this);
+    initMemoryBlocks();
+  }
+
+  protected void initMemoryBlocks() {
+    memoryBlocks = new ArrayList<MemoryBlock>();
+  }
+
+  protected void sortMemBlock(MemoryBlock memBlock) {
+    if (memBlock.currentPtr <= 0) {
+      return;
+    }
+    // quick sort the offsets
+    OffsetSortable sortableObj = new OffsetSortable(memBlock, kvbuffer);
+    QuickSort quickSort = new QuickSort();
+    quickSort.sort(sortableObj, 0, memBlock.currentPtr);
+  }
+
+  protected void sortIndividualMemoryBlock(List<MemoryBlock> memBlks) {
+    if (memBlks == null) {
+      return;
+    }
+    for (MemoryBlock memBlk : memBlks) {
+      if (memBlk != null) {
+        sortMemBlock(memBlk);
+      }
+    }
+  }
+
+  public int getCollectedRecordsNum() {
+    return collectedRecordsNum;
+  }
+
+  public int getCollectedBytesSize() {
+    return collectedBytesSize;
+  }
+
+  abstract void groupOrSort();
+
+  public abstract KeyValueSpillIterator getKeyValueSpillIterator();
+
+  public abstract IndexRecord spill(JobConf job, FSDataOutputStream out,
+      Class<K> keyClass, Class<V> valClass, CompressionCodec codec,
+      Counter spillCounter) throws IOException;
+
+  public abstract int collect(K key, V value) throws IOException;
+
+  @Override
+  public MemoryBlock getCurrentOpenMemoryBlock() {
+    return currentBlock;
+  }
+
+  @Override
+  public List<MemoryBlock> getClosedMemoryBlocks() {
+    return memoryBlocks;
+  }
+}
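
The class above never moves serialized records: keys and values stay where they were written in kvbuffer, and the quick sort permutes only the parallel index arrays (offsets, key lengths, value lengths). A minimal self-contained sketch of that technique (hypothetical class, with a plain selection sort standing in for Hadoop's QuickSort/IndexedSortable):

public class OffsetSortSketch {
  static byte[] buf = "bananaapplecherry".getBytes();
  static int[] offsets = {0, 6, 11};  // record start positions in buf
  static int[] keyLens = {6, 5, 6};   // key lengths, parallel to offsets

  // Compare two records lexicographically by their bytes in buf.
  static int compare(int i, int j) {
    for (int a = 0, n = Math.min(keyLens[i], keyLens[j]); a < n; a++) {
      int d = (buf[offsets[i] + a] & 0xff) - (buf[offsets[j] + a] & 0xff);
      if (d != 0) return d;
    }
    return keyLens[i] - keyLens[j];
  }

  // Swap only the index entries, never the record bytes.
  static void swap(int i, int j) {
    int t = offsets[i]; offsets[i] = offsets[j]; offsets[j] = t;
    t = keyLens[i]; keyLens[i] = keyLens[j]; keyLens[j] = t;
  }

  public static void main(String[] args) {
    for (int i = 0; i < offsets.length; i++)
      for (int j = i + 1; j < offsets.length; j++)
        if (compare(j, i) < 0) swap(i, j);
    for (int k = 0; k < offsets.length; k++)
      System.out.println(new String(buf, offsets[k], keyLens[k]));
    // prints: apple, banana, cherry
  }
}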
520  src/mapred/org/apache/hadoop/mapred/BlockMapOutputBuffer.java
@@ -0,0 +1,520 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import static org.apache.hadoop.mapred.Task.Counter.MAP_OUTPUT_BYTES;
+import static org.apache.hadoop.mapred.Task.Counter.MAP_OUTPUT_RECORDS;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.RawComparator;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.DefaultCodec;
+import org.apache.hadoop.mapred.IFile.Writer;
+import org.apache.hadoop.mapred.Merger.Segment;
+import org.apache.hadoop.mapred.Task.TaskReporter;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.util.ResourceCalculatorPlugin.ProcResourceValues;
+
+public class BlockMapOutputBuffer<K extends BytesWritable, V extends BytesWritable>
+    implements BlockMapOutputCollector<K, V> {
+
+  private static final Log LOG = LogFactory.getLog(BlockMapOutputBuffer.class.getName());
+
+  private final Partitioner<K, V> partitioner;
+  private final int partitions;
+  private final JobConf job;
+  private final TaskReporter reporter;
+  private final Class<K> keyClass;
+  private final Class<V> valClass;
+  private final int softBufferLimit;
+  // Compression for map-outputs
+  private CompressionCodec codec = null;
+  // main output buffer
+  private byte[] kvbuffer;
+  private int kvBufferSize;
+  // spill accounting
+  private volatile int numSpills = 0;
+  // number of spills for big records
+  private volatile int numBigRecordsSpills = 0;
+  private volatile int numBigRecordsWarnThreshold = 500;
+
+  private final FileSystem localFs;
+  private final FileSystem rfs;
+  private final Counters.Counter mapOutputByteCounter;
+  private final Counters.Counter mapOutputRecordCounter;
+  private MapSpillSortCounters mapSpillSortCounter;
+
+  private MapTask task;
+  private ReducePartition<K, V>[] reducePartitions;
+  private ArrayList<SpillRecord> indexCacheList;
+  // an array of memory segments, one for each reduce partition.
+  private Segment<K,V>[] inMemorySegments;
+  private boolean hasInMemorySpill;
+  private boolean lastSpillInMem;
+
+  private int totalIndexCacheMemory;
+  private static final int INDEX_CACHE_MEMORY_LIMIT = 2 * 1024 * 1024;
+  private final MemoryBlockAllocator memoryBlockAllocator;
+
+  @SuppressWarnings( { "unchecked", "deprecation" })
+  public BlockMapOutputBuffer(TaskUmbilicalProtocol umbilical, JobConf job,
+      TaskReporter reporter, MapTask task) throws IOException,
+      ClassNotFoundException {
+    this.task = task;
+    this.job = job;
+    this.reporter = reporter;
+    localFs = FileSystem.getLocal(job);
+    partitions = job.getNumReduceTasks();
+    indexCacheList = new ArrayList<SpillRecord>();
+    if (partitions > 0) {
+      partitioner = (Partitioner<K, V>) ReflectionUtils.newInstance(job
+          .getPartitionerClass(), job);
+    } else {
+      partitioner = new Partitioner() {
+        @Override
+        public int getPartition(Object key, Object value, int numPartitions) {
+          return -1;
+        }
+
+        @Override
+        public void configure(JobConf job) {
+        }
+      };
+    }
+    rfs = ((LocalFileSystem) localFs).getRaw();
+
+    float spillper = job.getFloat("io.sort.spill.percent", (float) 0.9);
+    if (spillper > (float) 1.0 || spillper < (float) 0.0) {
+      LOG.error("Invalid \"io.sort.spill.percent\": " + spillper);
+      spillper = 0.8f;
+    }
+
+    lastSpillInMem = job.getBoolean("mapred.map.lastspill.memory", true);
+    numBigRecordsWarnThreshold =
+        job.getInt("mapred.map.bigrecord.spill.warn.threshold", 500);
+
+    int sortmb = job.getInt("io.sort.mb", 100);
+    boolean localMode = job.get("mapred.job.tracker", "local").equals("local");
+    if (localMode) {
+      sortmb = job.getInt("io.sort.mb.localmode", 100);
+    }
+    if ((sortmb & 0x7FF) != sortmb) {
+      throw new IOException("Invalid \"io.sort.mb\": " + sortmb);
+    }
+    LOG.info("io.sort.mb = " + sortmb);
+    // buffers and accounting
+    kvBufferSize = sortmb << 20;
+    kvbuffer = new byte[kvBufferSize];
+    softBufferLimit = (int) (kvbuffer.length * spillper);
+    // k/v serialization
+    keyClass = (Class<K>) job.getMapOutputKeyClass();
+    valClass = (Class<V>) job.getMapOutputValueClass();
+    if (!BytesWritable.class.isAssignableFrom(keyClass)
+        || !BytesWritable.class.isAssignableFrom(valClass)) {
+      throw new IOException(this.getClass().getName()
+          + "  only support " + BytesWritable.class.getName()
+          + " as key and value classes, MapOutputKeyClass is "
+          + keyClass.getName() + ", MapOutputValueClass is "
+          + valClass.getName());
+    }
+
+    int numMappers = job.getNumMapTasks();
+    memoryBlockAllocator =
+        new MemoryBlockAllocator(kvBufferSize, softBufferLimit, numMappers,
+            partitions, this);
+
+    // counters
+    mapOutputByteCounter = reporter.getCounter(MAP_OUTPUT_BYTES);
+    mapOutputRecordCounter = reporter.getCounter(MAP_OUTPUT_RECORDS);
+    mapSpillSortCounter = new MapSpillSortCounters(reporter);
+
+    reducePartitions = new ReducePartition[partitions];
+    inMemorySegments = new Segment[partitions];
+    for (int i = 0; i < partitions; i++) {
+      reducePartitions[i] = new ReducePartition(i, this.memoryBlockAllocator,
+          this.kvbuffer, this, this.reporter);
+    }
+    // compression
+    if (job.getCompressMapOutput()) {
+      Class<? extends CompressionCodec> codecClass = job
+          .getMapOutputCompressorClass(DefaultCodec.class);
+      codec = ReflectionUtils.newInstance(codecClass, job);
+    }
+  }
+
+  private TaskAttemptID getTaskID() {
+    return task.getTaskID();
+  }
+
+  public void collect(K key, V value, int partition) throws IOException {
+    reporter.progress();
+    if (key.getClass() != keyClass) {
+      throw new IOException("Type mismatch in key from map: expected "
+          + keyClass.getName() + ", recieved " + key.getClass().getName());
+    }
+    if (value.getClass() != valClass) {
+      throw new IOException("Type mismatch in value from map: expected "
+          + valClass.getName() + ", recieved " + value.getClass().getName());
+    }
+    int collected = reducePartitions[partition].collect(key, value);
+    mapOutputRecordCounter.increment(1);
+    mapOutputByteCounter.increment(collected);
+  }
+
+  @SuppressWarnings("deprecation")
+  @Override
+  public void collect(K key, V value) throws IOException {
+    collect(key, value, partitioner.getPartition(key, value,
+        partitions));
+  }
+
+  /*
+   * return the value of ProcResourceValues for later use
+   */
+  protected ProcResourceValues sortReduceParts() {
+    long sortStartMilli = System.currentTimeMillis();
+    ProcResourceValues sortStartProcVals =
+        task.getCurrentProcResourceValues();
+    // sort
+    for (int i = 0; i < reducePartitions.length; i++) {
+      reducePartitions[i].groupOrSort();
+    }
+    long sortEndMilli = System.currentTimeMillis();
+    ProcResourceValues sortEndProcVals =
+        task.getCurrentProcResourceValues();
+    mapSpillSortCounter.incCountersPerSort(sortStartProcVals,
+        sortEndProcVals, sortEndMilli - sortStartMilli);
+    return sortEndProcVals;
+  }
+
+  @Override
+  public void sortAndSpill() throws IOException {
+    ProcResourceValues sortEndProcVals = sortReduceParts();
+    long sortEndMilli = System.currentTimeMillis();
+    // spill
+    FSDataOutputStream out = null;
+    long spillBytes = 0;
+    try {
+      // create spill file
+      final SpillRecord spillRec = new SpillRecord(partitions);
+      final Path filename =
+          task.mapOutputFile
+              .getSpillFileForWrite(getTaskID(), numSpills,
+                  this.memoryBlockAllocator.getEstimatedSize());
+      out = rfs.create(filename);
+      for (int i = 0; i < partitions; ++i) {
+        IndexRecord rec =
+            reducePartitions[i].spill(job, out, keyClass, valClass,
+                codec, task.spilledRecordsCounter);
+        // record offsets
+        spillBytes += rec.partLength;
+        spillRec.putIndex(rec, i);
+      }
+
+      if (totalIndexCacheMemory >= INDEX_CACHE_MEMORY_LIMIT) {
+        // create spill index file
+        Path indexFilename =
+            task.mapOutputFile.getSpillIndexFileForWrite(getTaskID(),
+                numSpills, partitions
+                    * MapTask.MAP_OUTPUT_INDEX_RECORD_LENGTH);
+        spillRec.writeToFile(indexFilename, job);
+      } else {
+        indexCacheList.add(spillRec);
+        totalIndexCacheMemory +=
+            spillRec.size() * MapTask.MAP_OUTPUT_INDEX_RECORD_LENGTH;
+      }
+      LOG.info("Finished spill " + numSpills);
+      ++numSpills;
+    } finally {
+      if (out != null)
+        out.close();
+    }
+
+    long spillEndMilli = System.currentTimeMillis();
+    ProcResourceValues spillEndProcVals =
+        task.getCurrentProcResourceValues();
+    mapSpillSortCounter.incCountersPerSpill(sortEndProcVals,
+        spillEndProcVals, spillEndMilli - sortEndMilli, spillBytes);
+  }
+
+  public void spillSingleRecord(K key, V value, int part)
+      throws IOException {
+
+    ProcResourceValues spillStartProcVals =
+        task.getCurrentProcResourceValues();
+    long spillStartMilli = System.currentTimeMillis();
+    // spill
+    FSDataOutputStream out = null;
+    long spillBytes = 0;
+    try {
+      // create spill file
+      final SpillRecord spillRec = new SpillRecord(partitions);
+      final Path filename =
+          task.mapOutputFile.getSpillFileForWrite(getTaskID(),
+              numSpills, key.getLength() + value.getLength());
+      out = rfs.create(filename);
+      IndexRecord rec = new IndexRecord();
+      for (int i = 0; i < partitions; ++i) {
+        IFile.Writer<K, V> writer = null;
+        try {
+          long segmentStart = out.getPos();
+          // Create a new codec, don't care!
+          writer =
+              new IFile.Writer<K, V>(job, out, keyClass, valClass,
+                  codec, task.spilledRecordsCounter);
+          if (i == part) {
+            final long recordStart = out.getPos();
+            writer.append(key, value);
+            // Note that our map byte count will not be accurate with
+            // compression
+            mapOutputByteCounter
+                .increment(out.getPos() - recordStart);
+          }
+          writer.close();
+
+          // record offsets
+          rec.startOffset = segmentStart;
+          rec.rawLength = writer.getRawLength();
+          rec.partLength = writer.getCompressedLength();
+          spillBytes += writer.getCompressedLength();
+          spillRec.putIndex(rec, i);
+          writer = null;
+        } catch (IOException e) {
+          if (null != writer)
+            writer.close();
+          throw e;
+        }
+      }
+
+      if (totalIndexCacheMemory >= INDEX_CACHE_MEMORY_LIMIT) {
+        // create spill index file
+        Path indexFilename =
+            task.mapOutputFile.getSpillIndexFileForWrite(getTaskID(),
+                numSpills, partitions
+                    * MapTask.MAP_OUTPUT_INDEX_RECORD_LENGTH);
+        spillRec.writeToFile(indexFilename, job);
+      } else {
+        indexCacheList.add(spillRec);
+        totalIndexCacheMemory +=
+            spillRec.size() * MapTask.MAP_OUTPUT_INDEX_RECORD_LENGTH;
+      }
+
+      LOG.info("Finished spill big record " + numBigRecordsSpills);
+      ++numBigRecordsSpills;
+      ++numSpills;
+    } finally {
+      if (out != null)
+        out.close();
+    }
+
+    long spillEndMilli = System.currentTimeMillis();
+    ProcResourceValues spillEndProcVals =
+        task.getCurrentProcResourceValues();
+    mapSpillSortCounter.incCountersPerSpill(spillStartProcVals,
+        spillEndProcVals, spillEndMilli - spillStartMilli, spillBytes);
+    mapSpillSortCounter.incSpillSingleRecord();
+  }
+
+  public synchronized void flush() throws IOException, ClassNotFoundException,
+      InterruptedException {
+    if (numSpills > 0 && lastSpillInMem) {
+      // if there is already one spills, we can try to hold this last spill in
+      // memory.
+      sortReduceParts();
+      for (int i = 0; i < partitions; i++) {
+        this.inMemorySegments[i] =
+            new Segment<K, V>(this.reducePartitions[i].getIReader(),
+                true);
+      }
+      hasInMemorySpill=true;
+    } else {
+      sortAndSpill();
+    }
+    long mergeStartMilli = System.currentTimeMillis();
+    ProcResourceValues mergeStartProcVals = task.getCurrentProcResourceValues();
+    mergeParts();
+    long mergeEndMilli = System.currentTimeMillis();
+    ProcResourceValues mergeEndProcVals = task.getCurrentProcResourceValues();
+    mapSpillSortCounter.incMergeCounters(mergeStartProcVals, mergeEndProcVals,
+        mergeEndMilli - mergeStartMilli);
+  }
+
+  private void mergeParts() throws IOException, InterruptedException,
+      ClassNotFoundException {
+    // get the approximate size of the final output/index files
+    long finalOutFileSize = 0;
+    long finalIndexFileSize = 0;
+    final Path[] filename = new Path[numSpills];
+    final TaskAttemptID mapId = getTaskID();
+
+    for (int i = 0; i < numSpills; i++) {
+      filename[i] = task.mapOutputFile.getSpillFile(mapId, i);
+      finalOutFileSize += rfs.getFileStatus(filename[i]).getLen();
+    }
+
+    for (Segment<K, V> segement : this.inMemorySegments) {
+      if(segement != null) {
+        finalOutFileSize += segement.getLength();
+      }
+    }
+
+    // the spill is the final output
+    if (numSpills == 1 && !hasInMemorySpill) {
+      Path outFile = new Path(filename[0].getParent(), "file.out");
+      rfs.rename(filename[0], outFile);
+      if (indexCacheList.size() == 0) {
+        rfs.rename(task.mapOutputFile.getSpillIndexFile(mapId, 0), new Path(
+            filename[0].getParent(), "file.out.index"));
+      } else {
+        indexCacheList.get(0).writeToFile(
+            new Path(filename[0].getParent(), "file.out.index"), job);
+      }
+      return;
+    }
+
+    // read in paged indices
+    for (int i = indexCacheList.size(); i < numSpills; ++i) {
+      Path indexFileName = task.mapOutputFile.getSpillIndexFile(mapId, i);
+      indexCacheList.add(new SpillRecord(indexFileName, job));
+    }
+
+    // make correction in the length to include the file header
+    // lengths for each partition
+    finalOutFileSize += partitions * MapTask.APPROX_HEADER_LENGTH;
+    finalIndexFileSize = partitions * MapTask.MAP_OUTPUT_INDEX_RECORD_LENGTH;
+    Path finalOutputFile = task.mapOutputFile.getOutputFileForWrite(mapId,
+        finalOutFileSize);
+    Path finalIndexFile = task.mapOutputFile.getOutputIndexFileForWrite(mapId,
+        finalIndexFileSize);
+
+    // The output stream for the final single output file
+    FSDataOutputStream finalOut = rfs.create(finalOutputFile, true, 4096);
+
+    if (numSpills == 0) {
+      // create dummy files
+      IndexRecord rec = new IndexRecord();
+      SpillRecord sr = new SpillRecord(partitions);
+      try {
+        for (int i = 0; i < partitions; i++) {
+          long segmentStart = finalOut.getPos();
+          Writer<K, V> writer = new Writer<K, V>(job, finalOut, keyClass,
+              valClass, codec, null);
+          writer.close();
+          rec.startOffset = segmentStart;
+          rec.rawLength = writer.getRawLength();
+          rec.partLength = writer.getCompressedLength();
+          sr.putIndex(rec, i);
+        }
+        sr.writeToFile(finalIndexFile, job);
+      } finally {
+        finalOut.close();
+      }
+      return;
+    }
+    {
+      IndexRecord rec = new IndexRecord();
+      final SpillRecord spillRec = new SpillRecord(partitions);
+      for (int parts = 0; parts < partitions; parts++) {
+        // create the segments to be merged
+        List<Segment<K, V>> segmentList = new ArrayList<Segment<K, V>>(
+            numSpills + this.inMemorySegments.length);
+        for (int i = 0; i < numSpills; i++) {
+          IndexRecord indexRecord = indexCacheList.get(i).getIndex(parts);
+          Segment<K, V> s = new Segment<K, V>(job, rfs, filename[i],
+              indexRecord.startOffset, indexRecord.partLength, codec, true);
+          segmentList.add(i, s);
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("MapId=" + mapId + " Reducer=" + parts + "Spill =" + i
+                + "(" + indexRecord.startOffset + "," + indexRecord.rawLength
+                + ", " + indexRecord.partLength + ")");
+          }
+        }
+
+        if(this.inMemorySegments[parts] != null) {
+          // add the in memory spill to the end of segmentList
+          segmentList.add(numSpills, this.inMemorySegments[parts]);
+        }
+
+        // merge
+        RawKeyValueIterator kvIter =
+            Merger.merge(job, rfs, keyClass, valClass, codec,
+                segmentList, job.getInt("io.sort.factor", 100),
+                new Path(mapId.toString()), new RawComparator<K>() {
+                  @Override
+                  public int compare(byte[] b1, int s1, int l1,
+                      byte[] b2, int s2, int l2) {
+                    return LexicographicalComparerHolder.BEST_COMPARER
+                        .compareTo(
+                            b1,
+                            s1 + WritableUtils.INT_LENGTH_BYTES,
+                            l1 - WritableUtils.INT_LENGTH_BYTES,
+                            b2,
+                            s2 + WritableUtils.INT_LENGTH_BYTES,
+                            l2 - WritableUtils.INT_LENGTH_BYTES
+                            );
+                  }
+
+                  @Override
+                  public int compare(K o1, K o2) {
+                    return LexicographicalComparerHolder.BEST_COMPARER
+                        .compareTo( o1.getBytes(), 0, o1.getLength(),
+                            o2.getBytes(), 0, o2.getLength());
+                  }
+                },  reporter, null,
+                task.spilledRecordsCounter);
+
+        // write merged output to disk
+        long segmentStart = finalOut.getPos();
+        Writer<K, V> writer = new Writer<K, V>(job, finalOut, keyClass,
+            valClass, codec, task.spilledRecordsCounter);
+        Merger.writeFile(kvIter, writer, reporter, job);
+        // close
+        writer.close();
+        // record offsets
+        rec.startOffset = segmentStart;
+        rec.rawLength = writer.getRawLength();
+        rec.partLength = writer.getCompressedLength();
+        spillRec.putIndex(rec, parts);
+      }
+      spillRec.writeToFile(finalIndexFile, job);
+      finalOut.close();
+      for (int i = 0; i < numSpills; i++) {
+        rfs.delete(filename[i], true);
+      }
+    }
+  }
+
+  public void close() {
+    this.mapSpillSortCounter.finalCounterUpdate();
+    if(numBigRecordsSpills > numBigRecordsWarnThreshold) {
+      LOG.warn("Spilled a large number of big records: "
+          + numBigRecordsSpills);
+    }
+  }
+}
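
For reference, these are the configuration keys the constructor above reads; a hedged tuning sketch (values illustrative, not recommendations). Note that io.sort.mb must pass the (sortmb & 0x7FF) check, i.e. be at most 2047:

JobConf conf = new JobConf();
conf.setInt("io.sort.mb", 200);                       // kvbuffer = 200 << 20 bytes; must be <= 2047
conf.setFloat("io.sort.spill.percent", 0.9f);         // soft buffer limit before a spill triggers
conf.setBoolean("mapred.map.lastspill.memory", true); // try to hold the last spill in memory
conf.setInt("mapred.map.bigrecord.spill.warn.threshold", 500);
conf.setInt("io.sort.factor", 100);                   // merge fan-in used by mergeParts()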
28  src/mapred/org/apache/hadoop/mapred/BlockMapOutputCollector.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+
+import org.apache.hadoop.mapred.MapTask.MapOutputCollector;
+
+public interface BlockMapOutputCollector<K, V> extends
+    MapOutputCollector<K, V>, OutputCollector<K, V> {
+  public void spillSingleRecord(K key, V value, int part) throws IOException;
+
+  public void sortAndSpill() throws IOException;
+}
23  src/mapred/org/apache/hadoop/mapred/Child.java
@@ -218,26 +218,11 @@ private static TaskUmbilicalProtocol convertToDirectUmbilicalIfNecessary(
      String jtHost = hostPortPair[0];
      int jtPort = Integer.parseInt(hostPortPair[1]);
      InetSocketAddress addr = new InetSocketAddress(jtHost, jtPort);
-      umbilical = createDirectUmbilical(umbilical, addr, job);
+      DirectTaskUmbilical d = DirectTaskUmbilical.createDirectUmbilical(
+        umbilical, addr, job);
+      proxiesCreated.addAll(d.getCreatedProxies());
+      return d;
    }
    return umbilical;
  }
-
-  private static TaskUmbilicalProtocol createDirectUmbilical(
-      TaskUmbilicalProtocol taskTracker,
-      InetSocketAddress jobTrackerAddress, JobConf conf) throws IOException {
-
-    LOG.info("Creating direct umbilical to " + jobTrackerAddress.toString());
-    long jtConnectTimeoutMsec = conf.getLong(
-        "corona.jobtracker.connect.timeout.msec", 60000L);
-
-    InterTrackerProtocol jobClient =
-        (InterTrackerProtocol) RPC.waitForProtocolProxy(
-        InterTrackerProtocol.class,
-        InterTrackerProtocol.versionID,
-        jobTrackerAddress, conf, jtConnectTimeoutMsec).getProxy();
-
-    proxiesCreated.add(jobClient);
-    return new DirectTaskUmbilical(taskTracker, jobClient);
-  }
 }
33  src/mapred/org/apache/hadoop/mapred/ChildMemoryBlock.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+class ChildMemoryBlock extends MemoryBlock {
+
+  private MemoryBlock parentMemoryBlock;
+
+  public ChildMemoryBlock(int startOffset, int allocateSize,
+      MemoryBlockAllocator memoryBlockAllocator, int elemNum,
+      MemoryBlock blk) {
+    super(startOffset, allocateSize, memoryBlockAllocator, elemNum);
+    this.parentMemoryBlock = blk;
+  }
+
+  public MemoryBlock getParentMemoryBlock() {
+    return parentMemoryBlock;
+  }
+}
9  src/mapred/org/apache/hadoop/mapred/CompositeTaskTrackerInstrumentation.java
@@ -28,7 +28,7 @@
 */
class CompositeTaskTrackerInstrumentation extends TaskTrackerInstrumentation {

-  private List<TaskTrackerInstrumentation> instrumentations;
+  private final List<TaskTrackerInstrumentation> instrumentations;

  public CompositeTaskTrackerInstrumentation(TaskTracker tt,
      List<TaskTrackerInstrumentation> instrumentations) {
@@ -82,4 +82,11 @@ public void statusUpdate(Task task, TaskStatus taskStatus) {
      tti.statusUpdate(task, taskStatus);
    }
  }
+
+  @Override
+  public void unaccountedMemory(long memory) {
+    for (TaskTrackerInstrumentation tti: instrumentations) {
+      tti.unaccountedMemory(memory);
+    }
+  }
 }
19  src/mapred/org/apache/hadoop/mapred/Counters.java
@@ -38,6 +38,7 @@
 import org.apache.hadoop.io.WritableUtils;
 import org.apache.hadoop.mapreduce.CounterNames;
 import org.apache.hadoop.util.StringUtils;
+import org.mortbay.util.ajax.JSON;

 /**
  * A set of named counters.
@@ -560,6 +561,24 @@ public synchronized String makeCompactString() {
    }
    return buffer.toString();
  }
+
+  /**
+   * Convert a counters object into a JSON string
+   */
+  public synchronized String makeJsonString() {
+
+    Map<String, Map<String, Long>> data =
+        new HashMap<String, Map<String, Long>>();
+
+    for (Group group : this) {
+      Map<String, Long> groupData = new HashMap<String, Long>();
+      data.put(group.getDisplayName(), groupData);
+      for (Counter counter : group) {
+        groupData.put(counter.getDisplayName(), counter.getCounter());
+      }
+    }
+    return JSON.toString(data);
+  }

  /**
   * Represent the counter in a textual format that can be converted back to
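
A hypothetical usage sketch of the new method; the nested-map layout mirrors the loop above (group display name to counter display name to value), with the exact string produced by Jetty's JSON.toString:

Counters counters = new Counters();
counters.incrCounter("MyGroup", "records", 42L);
String json = counters.makeJsonString();
// json is e.g. {"MyGroup":{"records":42}}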
37  src/mapred/org/apache/hadoop/mapred/DirectTaskUmbilical.java
@@ -3,9 +3,16 @@
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Collections;
+import java.net.InetSocketAddress;

+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.ipc.ProtocolSignature;
+import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.ipc.RPC.VersionIncompatible;
+import org.apache.hadoop.ipc.VersionedProtocol;
 import org.apache.hadoop.mapred.SortedRanges.Range;

 /**
@@ -14,12 +21,42 @@
  */
 class DirectTaskUmbilical implements TaskUmbilicalProtocol {

+  public static final Log LOG = LogFactory.getLog(DirectTaskUmbilical.class);
+
   final private TaskUmbilicalProtocol taskTrackerUmbilical;
   final private InterTrackerProtocol jobTracker;
   final private List<TaskCompletionEvent> mapEventFetched;
   private int totalEventsFetched = 0;
   static final String MAPRED_DIRECT_TASK_UMBILICAL_ADDRESS = "mapred.direct.task.umbilical.address";

+  public static DirectTaskUmbilical createDirectUmbilical(
+    TaskUmbilicalProtocol taskTracker,
+    InetSocketAddress jobTrackerAddress, JobConf conf) throws IOException {
+
+    LOG.info("Creating direct umbilical to " + jobTrackerAddress.toString());
+    long jtConnectTimeoutMsec = conf.getLong(
+      "corona.jobtracker.connect.timeout.msec", 60000L);
+    int rpcTimeout = (int) jtConnectTimeoutMsec;
+
+    InterTrackerProtocol jobClient = RPC.waitForProxy(
+      InterTrackerProtocol.class,
+      InterTrackerProtocol.versionID,
+      jobTrackerAddress,
+      conf,
+      jtConnectTimeoutMsec,
+      rpcTimeout);
+
+    return new DirectTaskUmbilical(taskTracker, jobClient);
+  }
+
+  public List<VersionedProtocol> getCreatedProxies() {
+    return Collections.singletonList((VersionedProtocol)jobTracker);
+  }
+
+  public void close() {
+    RPC.stopProxy(jobTracker);
+  }
+
   DirectTaskUmbilical(TaskUmbilicalProtocol taskTrackerUmbilical,
       InterTrackerProtocol jobTracker) {
     this.taskTrackerUmbilical = taskTrackerUmbilical;
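
A sketch of the refactored lifecycle that Child.java now follows (the umbilical and conf variables are assumed to be in scope, as they are in convertToDirectUmbilicalIfNecessary; the address is a placeholder):

InetSocketAddress jtAddr = new InetSocketAddress("jobtracker.example.com", 9290);
DirectTaskUmbilical direct =
    DirectTaskUmbilical.createDirectUmbilical(umbilical, jtAddr, conf);
try {
  // ... the task reports directly to the job tracker through `direct` ...
} finally {
  direct.close(); // stops the RPC proxy the factory created
}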
116  src/mapred/org/apache/hadoop/mapred/ExpireUnusedFilesInCache.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import java.io.IOException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Used to expire files in cache that hasn't been accessed for a while
+ */
+public class ExpireUnusedFilesInCache implements Runnable {
+  /** Logger. */
+  private static final Log LOG =
+    LogFactory.getLog(ExpireUnusedFilesInCache.class);
+
+  /** Configuration. */
+  private final Configuration conf;
+  /** Clock. */
+  private final Clock clock;
+  /** The directories to purge. */
+  private final Path[] cachePath;
+  /** The filesystem to use. */
+  private final FileSystem fs;
+  /** Expire threshold in milliseconds. */
+  private final long expireCacheThreshold;
+
+  /**
+   * Constructor.
+   * @param conf The configuration.
+   * @param clock The clock.
+   * @param systemDir The system directory.
+   * @param fs The filesystem.
+   */
+  public ExpireUnusedFilesInCache(
+    Configuration conf, Clock clock, Path systemDir, FileSystem fs) {
+    this.conf = conf;
+    this.clock = clock;
+    this.fs = fs;
+
+    Path sharedPath = new Path(systemDir, JobSubmissionProtocol.CAR);
+    sharedPath = sharedPath.makeQualified(fs);
+    this.cachePath = new Path[3];
+    this.cachePath[0] = new Path(sharedPath, "files");
+    this.cachePath[1] = new Path(sharedPath, "archives");
+    this.cachePath[2] = new Path(sharedPath, "libjars");
+
+
+    long clearCacheInterval = conf.getLong(
+      "mapred.cache.shared.check_interval",
+      24 * 60 * 60 * 1000);
+
+    expireCacheThreshold =
+      conf.getLong("mapred.cache.shared.expire_threshold",
+        24 * 60 * 60 * 1000);
+    Executors.newScheduledThreadPool(1).scheduleAtFixedRate(
+      this,
+      clearCacheInterval,
+      clearCacheInterval,
+      TimeUnit.MILLISECONDS);
+
+    LOG.info("ExpireUnusedFilesInCache created with " +
+      " sharedPath = " + sharedPath +
+      " clearCacheInterval = " + clearCacheInterval +
+      " expireCacheThreshold = " + expireCacheThreshold);
+  }
+
+  @Override
+  public void run() {
+    long currentTime = clock.getTime();
+
+    for (int i = 0; i < cachePath.length; i++) {
+      try {
+        if (!fs.exists(cachePath[i])) continue;
+
+        FileStatus[] fStatus = fs.listStatus(cachePath[i]);
+
+        for (int j = 0; j < fStatus.length; j++) {
+          if (!fStatus[j].isDir()) {
+            long atime = fStatus[j].getAccessTime();
+
+            if (currentTime - atime > expireCacheThreshold) {
+              fs.delete(fStatus[j].getPath(), false);
+            }
+          }
+        }
+      } catch (IOException ioe) {
+        LOG.error("IOException when clearing cache");
+      }
+    }
+  }
+}
+
+
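
A hedged construction sketch; the two property names and their 24-hour millisecond defaults come from the constructor above, while the system directory path is a placeholder and Clock refers to the package-private mapred clock, so this would need to run from within the org.apache.hadoop.mapred package:

Configuration conf = new Configuration();
conf.setLong("mapred.cache.shared.check_interval", 6 * 60 * 60 * 1000L);    // scan every 6h
conf.setLong("mapred.cache.shared.expire_threshold", 12 * 60 * 60 * 1000L); // delete after 12h idle
FileSystem fs = FileSystem.get(conf);
Path systemDir = new Path("/mapred/system"); // placeholder system directory
// The constructor schedules itself via scheduleAtFixedRate; no further calls are needed.
new ExpireUnusedFilesInCache(conf, new Clock(), systemDir, fs);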
13  src/mapred/org/apache/hadoop/mapred/FileInputFormat.java