The code fails to compile. Revert "rebase mapred".

This reverts commit c47715a.
commit 100771f65a6c040f44cfcf6d12a0f15cf312058d (1 parent: c47715a)
weiyanwang authored
Showing with 1,643 additions and 7,254 deletions.
  1. +0 −157 src/mapred/org/apache/hadoop/mapred/BasicReducePartition.java
  2. +0 −520 src/mapred/org/apache/hadoop/mapred/BlockMapOutputBuffer.java
  3. +0 −28 src/mapred/org/apache/hadoop/mapred/BlockMapOutputCollector.java
  4. +19 −4 src/mapred/org/apache/hadoop/mapred/Child.java
  5. +0 −33 src/mapred/org/apache/hadoop/mapred/ChildMemoryBlock.java
  6. +1 −8 src/mapred/org/apache/hadoop/mapred/CompositeTaskTrackerInstrumentation.java
  7. +0 −19 src/mapred/org/apache/hadoop/mapred/Counters.java
  8. +0 −37 src/mapred/org/apache/hadoop/mapred/DirectTaskUmbilical.java
  9. +0 −116 src/mapred/org/apache/hadoop/mapred/ExpireUnusedFilesInCache.java
  10. +0 −13 src/mapred/org/apache/hadoop/mapred/FileInputFormat.java
  11. +0 −9 src/mapred/org/apache/hadoop/mapred/FileOutputFormat.java
  12. +2 −27 src/mapred/org/apache/hadoop/mapred/IFile.java
  13. +100 −138 src/mapred/org/apache/hadoop/mapred/JSPUtil.java
  14. +212 −239 src/mapred/org/apache/hadoop/mapred/JobClient.java
  15. +0 −5 src/mapred/org/apache/hadoop/mapred/JobConf.java
  16. +37 −120 src/mapred/org/apache/hadoop/mapred/JobInProgress.java
  17. +0 −3  src/mapred/org/apache/hadoop/mapred/JobInProgressTraits.java
  18. +0 −438 src/mapred/org/apache/hadoop/mapred/JobStats.java
  19. +108 −65 src/mapred/org/apache/hadoop/mapred/JobTracker.java
  20. +12 −71 src/mapred/org/apache/hadoop/mapred/JobTrackerInstrumentation.java
  21. +124 −142 src/mapred/org/apache/hadoop/mapred/JobTrackerMetricsInst.java
  22. +8 −38 src/mapred/org/apache/hadoop/mapred/JobTrackerReconfigurable.java
  23. +0 −23 src/mapred/org/apache/hadoop/mapred/KeyValueSpillIterator.java
  24. +0 −239 src/mapred/org/apache/hadoop/mapred/LexicographicalComparerHolder.java
  25. +0 −110 src/mapred/org/apache/hadoop/mapred/MapSpillSortCounters.java
  26. +3 −12 src/mapred/org/apache/hadoop/mapred/MapTask.java
  27. +0 −44 src/mapred/org/apache/hadoop/mapred/MapTaskStatus.java
  28. +0 −135 src/mapred/org/apache/hadoop/mapred/MapperWaitThread.java
  29. +0 −195 src/mapred/org/apache/hadoop/mapred/MemoryBlock.java
  30. +0 −383 src/mapred/org/apache/hadoop/mapred/MemoryBlockAllocator.java
  31. +0 −25 src/mapred/org/apache/hadoop/mapred/MemoryBlockHolder.java
  32. +0 −46 src/mapred/org/apache/hadoop/mapred/MemoryBlockIndex.java
  33. +0 −33 src/mapred/org/apache/hadoop/mapred/MemoryBlockTooSmallException.java
  34. +2 −2 src/mapred/org/apache/hadoop/mapred/Merger.java
  35. +0 −53 src/mapred/org/apache/hadoop/mapred/NettyMapOutputAttributes.java
  36. +0 −329 src/mapred/org/apache/hadoop/mapred/PoolFairnessCalculator.java
  37. +0 −89 src/mapred/org/apache/hadoop/mapred/PoolMetadata.java
  38. +0 −280 src/mapred/org/apache/hadoop/mapred/ReducePartition.java
  39. +479 −564 src/mapred/org/apache/hadoop/mapred/ReduceTask.java
  40. +1 −105 src/mapred/org/apache/hadoop/mapred/ReduceTaskStatus.java
  41. +0 −123 src/mapred/org/apache/hadoop/mapred/ResourceMetadata.java
  42. +4 −43 src/mapred/org/apache/hadoop/mapred/SequenceFileRecordReader.java
  43. +0 −283 src/mapred/org/apache/hadoop/mapred/ShuffleHandler.java
  44. +0 −301 src/mapred/org/apache/hadoop/mapred/SimulatedTaskRunner.java
  45. +2 −52 src/mapred/org/apache/hadoop/mapred/Task.java
  46. +2 −4 src/mapred/org/apache/hadoop/mapred/TaskCompletionEvent.java
  47. +11 −217 src/mapred/org/apache/hadoop/mapred/TaskInProgress.java
  48. +2 −11 src/mapred/org/apache/hadoop/mapred/TaskLog.java
  49. +0 −1  src/mapred/org/apache/hadoop/mapred/TaskLogServlet.java
  50. +1 −3 src/mapred/org/apache/hadoop/mapred/TaskLogsMonitor.java
  51. +14 −39 src/mapred/org/apache/hadoop/mapred/TaskMemoryManagerThread.java
  52. +65 −101 src/mapred/org/apache/hadoop/mapred/TaskRunner.java
  53. +4 −13 src/mapred/org/apache/hadoop/mapred/TaskScheduler.java
  54. +4 −24 src/mapred/org/apache/hadoop/mapred/TaskStatus.java
  55. +415 −1,053 src/mapred/org/apache/hadoop/mapred/TaskTracker.java
  56. +0 −27 src/mapred/org/apache/hadoop/mapred/TaskTrackerInstrumentation.java
  57. +11 −62 src/mapred/org/apache/hadoop/mapred/TaskTrackerMetricsInst.java
157 src/mapred/org/apache/hadoop/mapred/BasicReducePartition.java
@@ -1,157 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.mapred;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.compress.CompressionCodec;
-import org.apache.hadoop.mapred.Counters.Counter;
-import org.apache.hadoop.mapred.Task.TaskReporter;
-import org.apache.hadoop.util.IndexedSortable;
-import org.apache.hadoop.util.QuickSort;
-
-public abstract class BasicReducePartition<K extends BytesWritable, V extends BytesWritable>
- implements MemoryBlockHolder {
-
- class OffsetSortable implements IndexedSortable {
-
- private int[] offsets;
- private int[] keyLenArray;
- private int[] valLenArray;
- private byte[] kvbuffer;
-
- public OffsetSortable(int[] offsets, int[] keyLenArray, int[] valLenArray,
- byte[] kvbuffer) {
- this.offsets = offsets;
- this.keyLenArray = keyLenArray;
- this.valLenArray = valLenArray;
- this.kvbuffer = kvbuffer;
- }
-
- public OffsetSortable(MemoryBlock memBlock, byte[] kvbuffer) {
- this.offsets = memBlock.getOffsets();
- this.keyLenArray = memBlock.getKeyLenArray();
- this.valLenArray = memBlock.getValueLenArray();
- this.kvbuffer = kvbuffer;
- }
-
- @Override
- public int compare(int i, int j) {
- return LexicographicalComparerHolder.compareBytes(kvbuffer, offsets[i],
- keyLenArray[i], offsets[j], keyLenArray[j]);
- }
-
- @Override
- public void swap(int i, int j) {
- swapElement(offsets, i, j);
- swapElement(keyLenArray, i, j);
- swapElement(valLenArray, i, j);
- }
-
- private void swapElement(int[] array, int i, int j) {
- int tmp = array[i];
- array[i] = array[j];
- array[j] = tmp;
- }
- }
-
- protected MemoryBlockAllocator memoryBlockAllocator;
- protected byte[] kvbuffer;
- protected final TaskReporter reporter;
- protected BlockMapOutputCollector<K, V> collector;
-
- protected int partition;
-
- protected int collectedBytesSize;
- protected int collectedRecordsNum;
-
- protected MemoryBlock currentBlock;
- protected List<MemoryBlock> memoryBlocks;
-
-
- public BasicReducePartition(int reduceNum,
- MemoryBlockAllocator memoryBlockAllocator, byte[] kvBuffer,
- BlockMapOutputCollector<K, V> collector, TaskReporter reporter)
- throws IOException {
- this.partition = reduceNum;
- this.collectedBytesSize = 0;
- this.collectedRecordsNum = 0;
- this.memoryBlockAllocator = memoryBlockAllocator;
- this.kvbuffer = kvBuffer;
- this.collector = collector;
- this.reporter = reporter;
- this.memoryBlockAllocator.registerMemoryBlockHolder(this);
- initMemoryBlocks();
- }
-
- protected void initMemoryBlocks() {
- memoryBlocks = new ArrayList<MemoryBlock>();
- }
-
- protected void sortMemBlock(MemoryBlock memBlock) {
- if (memBlock.currentPtr <= 0) {
- return;
- }
- // quick sort the offsets
- OffsetSortable sortableObj = new OffsetSortable(memBlock, kvbuffer);
- QuickSort quickSort = new QuickSort();
- quickSort.sort(sortableObj, 0, memBlock.currentPtr);
- }
-
- protected void sortIndividualMemoryBlock(List<MemoryBlock> memBlks) {
- if (memBlks == null) {
- return;
- }
- for (MemoryBlock memBlk : memBlks) {
- if (memBlk != null) {
- sortMemBlock(memBlk);
- }
- }
- }
-
- public int getCollectedRecordsNum() {
- return collectedRecordsNum;
- }
-
- public int getCollectedBytesSize() {
- return collectedBytesSize;
- }
-
- abstract void groupOrSort();
-
- public abstract KeyValueSpillIterator getKeyValueSpillIterator();
-
- public abstract IndexRecord spill(JobConf job, FSDataOutputStream out,
- Class<K> keyClass, Class<V> valClass, CompressionCodec codec,
- Counter spillCounter) throws IOException;
-
- public abstract int collect(K key, V value) throws IOException;
-
- @Override
- public MemoryBlock getCurrentOpenMemoryBlock() {
- return currentBlock;
- }
-
- @Override
- public List<MemoryBlock> getClosedMemoryBlocks() {
- return memoryBlocks;
- }
-}
520 src/mapred/org/apache/hadoop/mapred/BlockMapOutputBuffer.java
@@ -1,520 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.mapred;
-
-import static org.apache.hadoop.mapred.Task.Counter.MAP_OUTPUT_BYTES;
-import static org.apache.hadoop.mapred.Task.Counter.MAP_OUTPUT_RECORDS;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.LocalFileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.RawComparator;
-import org.apache.hadoop.io.WritableUtils;
-import org.apache.hadoop.io.compress.CompressionCodec;
-import org.apache.hadoop.io.compress.DefaultCodec;
-import org.apache.hadoop.mapred.IFile.Writer;
-import org.apache.hadoop.mapred.Merger.Segment;
-import org.apache.hadoop.mapred.Task.TaskReporter;
-import org.apache.hadoop.util.ReflectionUtils;
-import org.apache.hadoop.util.ResourceCalculatorPlugin.ProcResourceValues;
-
-public class BlockMapOutputBuffer<K extends BytesWritable, V extends BytesWritable>
- implements BlockMapOutputCollector<K, V> {
-
- private static final Log LOG = LogFactory.getLog(BlockMapOutputBuffer.class.getName());
-
- private final Partitioner<K, V> partitioner;
- private final int partitions;
- private final JobConf job;
- private final TaskReporter reporter;
- private final Class<K> keyClass;
- private final Class<V> valClass;
- private final int softBufferLimit;
- // Compression for map-outputs
- private CompressionCodec codec = null;
- // main output buffer
- private byte[] kvbuffer;
- private int kvBufferSize;
- // spill accounting
- private volatile int numSpills = 0;
- // number of spills for big records
- private volatile int numBigRecordsSpills = 0;
- private volatile int numBigRecordsWarnThreshold = 500;
-
- private final FileSystem localFs;
- private final FileSystem rfs;
- private final Counters.Counter mapOutputByteCounter;
- private final Counters.Counter mapOutputRecordCounter;
- private MapSpillSortCounters mapSpillSortCounter;
-
- private MapTask task;
- private ReducePartition<K, V>[] reducePartitions;
- private ArrayList<SpillRecord> indexCacheList;
- // an array of memory segments, one for each reduce partition.
- private Segment<K,V>[] inMemorySegments;
- private boolean hasInMemorySpill;
- private boolean lastSpillInMem;
-
- private int totalIndexCacheMemory;
- private static final int INDEX_CACHE_MEMORY_LIMIT = 2 * 1024 * 1024;
- private final MemoryBlockAllocator memoryBlockAllocator;
-
- @SuppressWarnings( { "unchecked", "deprecation" })
- public BlockMapOutputBuffer(TaskUmbilicalProtocol umbilical, JobConf job,
- TaskReporter reporter, MapTask task) throws IOException,
- ClassNotFoundException {
- this.task = task;
- this.job = job;
- this.reporter = reporter;
- localFs = FileSystem.getLocal(job);
- partitions = job.getNumReduceTasks();
- indexCacheList = new ArrayList<SpillRecord>();
- if (partitions > 0) {
- partitioner = (Partitioner<K, V>) ReflectionUtils.newInstance(job
- .getPartitionerClass(), job);
- } else {
- partitioner = new Partitioner() {
- @Override
- public int getPartition(Object key, Object value, int numPartitions) {
- return -1;
- }
-
- @Override
- public void configure(JobConf job) {
- }
- };
- }
- rfs = ((LocalFileSystem) localFs).getRaw();
-
- float spillper = job.getFloat("io.sort.spill.percent", (float) 0.9);
- if (spillper > (float) 1.0 || spillper < (float) 0.0) {
- LOG.error("Invalid \"io.sort.spill.percent\": " + spillper);
- spillper = 0.8f;
- }
-
- lastSpillInMem = job.getBoolean("mapred.map.lastspill.memory", true);
- numBigRecordsWarnThreshold =
- job.getInt("mapred.map.bigrecord.spill.warn.threshold", 500);
-
- int sortmb = job.getInt("io.sort.mb", 100);
- boolean localMode = job.get("mapred.job.tracker", "local").equals("local");
- if (localMode) {
- sortmb = job.getInt("io.sort.mb.localmode", 100);
- }
- if ((sortmb & 0x7FF) != sortmb) {
- throw new IOException("Invalid \"io.sort.mb\": " + sortmb);
- }
- LOG.info("io.sort.mb = " + sortmb);
- // buffers and accounting
- kvBufferSize = sortmb << 20;
- kvbuffer = new byte[kvBufferSize];
- softBufferLimit = (int) (kvbuffer.length * spillper);
- // k/v serialization
- keyClass = (Class<K>) job.getMapOutputKeyClass();
- valClass = (Class<V>) job.getMapOutputValueClass();
- if (!BytesWritable.class.isAssignableFrom(keyClass)
- || !BytesWritable.class.isAssignableFrom(valClass)) {
- throw new IOException(this.getClass().getName()
- + " only support " + BytesWritable.class.getName()
- + " as key and value classes, MapOutputKeyClass is "
- + keyClass.getName() + ", MapOutputValueClass is "
- + valClass.getName());
- }
-
- int numMappers = job.getNumMapTasks();
- memoryBlockAllocator =
- new MemoryBlockAllocator(kvBufferSize, softBufferLimit, numMappers,
- partitions, this);
-
- // counters
- mapOutputByteCounter = reporter.getCounter(MAP_OUTPUT_BYTES);
- mapOutputRecordCounter = reporter.getCounter(MAP_OUTPUT_RECORDS);
- mapSpillSortCounter = new MapSpillSortCounters(reporter);
-
- reducePartitions = new ReducePartition[partitions];
- inMemorySegments = new Segment[partitions];
- for (int i = 0; i < partitions; i++) {
- reducePartitions[i] = new ReducePartition(i, this.memoryBlockAllocator,
- this.kvbuffer, this, this.reporter);
- }
- // compression
- if (job.getCompressMapOutput()) {
- Class<? extends CompressionCodec> codecClass = job
- .getMapOutputCompressorClass(DefaultCodec.class);
- codec = ReflectionUtils.newInstance(codecClass, job);
- }
- }
-
- private TaskAttemptID getTaskID() {
- return task.getTaskID();
- }
-
- public void collect(K key, V value, int partition) throws IOException {
- reporter.progress();
- if (key.getClass() != keyClass) {
- throw new IOException("Type mismatch in key from map: expected "
- + keyClass.getName() + ", recieved " + key.getClass().getName());
- }
- if (value.getClass() != valClass) {
- throw new IOException("Type mismatch in value from map: expected "
- + valClass.getName() + ", recieved " + value.getClass().getName());
- }
- int collected = reducePartitions[partition].collect(key, value);
- mapOutputRecordCounter.increment(1);
- mapOutputByteCounter.increment(collected);
- }
-
- @SuppressWarnings("deprecation")
- @Override
- public void collect(K key, V value) throws IOException {
- collect(key, value, partitioner.getPartition(key, value,
- partitions));
- }
-
- /*
- * return the value of ProcResourceValues for later use
- */
- protected ProcResourceValues sortReduceParts() {
- long sortStartMilli = System.currentTimeMillis();
- ProcResourceValues sortStartProcVals =
- task.getCurrentProcResourceValues();
- // sort
- for (int i = 0; i < reducePartitions.length; i++) {
- reducePartitions[i].groupOrSort();
- }
- long sortEndMilli = System.currentTimeMillis();
- ProcResourceValues sortEndProcVals =
- task.getCurrentProcResourceValues();
- mapSpillSortCounter.incCountersPerSort(sortStartProcVals,
- sortEndProcVals, sortEndMilli - sortStartMilli);
- return sortEndProcVals;
- }
-
- @Override
- public void sortAndSpill() throws IOException {
- ProcResourceValues sortEndProcVals = sortReduceParts();
- long sortEndMilli = System.currentTimeMillis();
- // spill
- FSDataOutputStream out = null;
- long spillBytes = 0;
- try {
- // create spill file
- final SpillRecord spillRec = new SpillRecord(partitions);
- final Path filename =
- task.mapOutputFile
- .getSpillFileForWrite(getTaskID(), numSpills,
- this.memoryBlockAllocator.getEstimatedSize());
- out = rfs.create(filename);
- for (int i = 0; i < partitions; ++i) {
- IndexRecord rec =
- reducePartitions[i].spill(job, out, keyClass, valClass,
- codec, task.spilledRecordsCounter);
- // record offsets
- spillBytes += rec.partLength;
- spillRec.putIndex(rec, i);
- }
-
- if (totalIndexCacheMemory >= INDEX_CACHE_MEMORY_LIMIT) {
- // create spill index file
- Path indexFilename =
- task.mapOutputFile.getSpillIndexFileForWrite(getTaskID(),
- numSpills, partitions
- * MapTask.MAP_OUTPUT_INDEX_RECORD_LENGTH);
- spillRec.writeToFile(indexFilename, job);
- } else {
- indexCacheList.add(spillRec);
- totalIndexCacheMemory +=
- spillRec.size() * MapTask.MAP_OUTPUT_INDEX_RECORD_LENGTH;
- }
- LOG.info("Finished spill " + numSpills);
- ++numSpills;
- } finally {
- if (out != null)
- out.close();
- }
-
- long spillEndMilli = System.currentTimeMillis();
- ProcResourceValues spillEndProcVals =
- task.getCurrentProcResourceValues();
- mapSpillSortCounter.incCountersPerSpill(sortEndProcVals,
- spillEndProcVals, spillEndMilli - sortEndMilli, spillBytes);
- }
-
- public void spillSingleRecord(K key, V value, int part)
- throws IOException {
-
- ProcResourceValues spillStartProcVals =
- task.getCurrentProcResourceValues();
- long spillStartMilli = System.currentTimeMillis();
- // spill
- FSDataOutputStream out = null;
- long spillBytes = 0;
- try {
- // create spill file
- final SpillRecord spillRec = new SpillRecord(partitions);
- final Path filename =
- task.mapOutputFile.getSpillFileForWrite(getTaskID(),
- numSpills, key.getLength() + value.getLength());
- out = rfs.create(filename);
- IndexRecord rec = new IndexRecord();
- for (int i = 0; i < partitions; ++i) {
- IFile.Writer<K, V> writer = null;
- try {
- long segmentStart = out.getPos();
- // Create a new codec, don't care!
- writer =
- new IFile.Writer<K, V>(job, out, keyClass, valClass,
- codec, task.spilledRecordsCounter);
- if (i == part) {
- final long recordStart = out.getPos();
- writer.append(key, value);
- // Note that our map byte count will not be accurate with
- // compression
- mapOutputByteCounter
- .increment(out.getPos() - recordStart);
- }
- writer.close();
-
- // record offsets
- rec.startOffset = segmentStart;
- rec.rawLength = writer.getRawLength();
- rec.partLength = writer.getCompressedLength();
- spillBytes += writer.getCompressedLength();
- spillRec.putIndex(rec, i);
- writer = null;
- } catch (IOException e) {
- if (null != writer)
- writer.close();
- throw e;
- }
- }
-
- if (totalIndexCacheMemory >= INDEX_CACHE_MEMORY_LIMIT) {
- // create spill index file
- Path indexFilename =
- task.mapOutputFile.getSpillIndexFileForWrite(getTaskID(),
- numSpills, partitions
- * MapTask.MAP_OUTPUT_INDEX_RECORD_LENGTH);
- spillRec.writeToFile(indexFilename, job);
- } else {
- indexCacheList.add(spillRec);
- totalIndexCacheMemory +=
- spillRec.size() * MapTask.MAP_OUTPUT_INDEX_RECORD_LENGTH;
- }
-
- LOG.info("Finished spill big record " + numBigRecordsSpills);
- ++numBigRecordsSpills;
- ++numSpills;
- } finally {
- if (out != null)
- out.close();
- }
-
- long spillEndMilli = System.currentTimeMillis();
- ProcResourceValues spillEndProcVals =
- task.getCurrentProcResourceValues();
- mapSpillSortCounter.incCountersPerSpill(spillStartProcVals,
- spillEndProcVals, spillEndMilli - spillStartMilli, spillBytes);
- mapSpillSortCounter.incSpillSingleRecord();
- }
-
- public synchronized void flush() throws IOException, ClassNotFoundException,
- InterruptedException {
- if (numSpills > 0 && lastSpillInMem) {
- // if there is already one spills, we can try to hold this last spill in
- // memory.
- sortReduceParts();
- for (int i = 0; i < partitions; i++) {
- this.inMemorySegments[i] =
- new Segment<K, V>(this.reducePartitions[i].getIReader(),
- true);
- }
- hasInMemorySpill=true;
- } else {
- sortAndSpill();
- }
- long mergeStartMilli = System.currentTimeMillis();
- ProcResourceValues mergeStartProcVals = task.getCurrentProcResourceValues();
- mergeParts();
- long mergeEndMilli = System.currentTimeMillis();
- ProcResourceValues mergeEndProcVals = task.getCurrentProcResourceValues();
- mapSpillSortCounter.incMergeCounters(mergeStartProcVals, mergeEndProcVals,
- mergeEndMilli - mergeStartMilli);
- }
-
- private void mergeParts() throws IOException, InterruptedException,
- ClassNotFoundException {
- // get the approximate size of the final output/index files
- long finalOutFileSize = 0;
- long finalIndexFileSize = 0;
- final Path[] filename = new Path[numSpills];
- final TaskAttemptID mapId = getTaskID();
-
- for (int i = 0; i < numSpills; i++) {
- filename[i] = task.mapOutputFile.getSpillFile(mapId, i);
- finalOutFileSize += rfs.getFileStatus(filename[i]).getLen();
- }
-
- for (Segment<K, V> segement : this.inMemorySegments) {
- if(segement != null) {
- finalOutFileSize += segement.getLength();
- }
- }
-
- // the spill is the final output
- if (numSpills == 1 && !hasInMemorySpill) {
- Path outFile = new Path(filename[0].getParent(), "file.out");
- rfs.rename(filename[0], outFile);
- if (indexCacheList.size() == 0) {
- rfs.rename(task.mapOutputFile.getSpillIndexFile(mapId, 0), new Path(
- filename[0].getParent(), "file.out.index"));
- } else {
- indexCacheList.get(0).writeToFile(
- new Path(filename[0].getParent(), "file.out.index"), job);
- }
- return;
- }
-
- // read in paged indices
- for (int i = indexCacheList.size(); i < numSpills; ++i) {
- Path indexFileName = task.mapOutputFile.getSpillIndexFile(mapId, i);
- indexCacheList.add(new SpillRecord(indexFileName, job));
- }
-
- // make correction in the length to include the file header
- // lengths for each partition
- finalOutFileSize += partitions * MapTask.APPROX_HEADER_LENGTH;
- finalIndexFileSize = partitions * MapTask.MAP_OUTPUT_INDEX_RECORD_LENGTH;
- Path finalOutputFile = task.mapOutputFile.getOutputFileForWrite(mapId,
- finalOutFileSize);
- Path finalIndexFile = task.mapOutputFile.getOutputIndexFileForWrite(mapId,
- finalIndexFileSize);
-
- // The output stream for the final single output file
- FSDataOutputStream finalOut = rfs.create(finalOutputFile, true, 4096);
-
- if (numSpills == 0) {
- // create dummy files
- IndexRecord rec = new IndexRecord();
- SpillRecord sr = new SpillRecord(partitions);
- try {
- for (int i = 0; i < partitions; i++) {
- long segmentStart = finalOut.getPos();
- Writer<K, V> writer = new Writer<K, V>(job, finalOut, keyClass,
- valClass, codec, null);
- writer.close();
- rec.startOffset = segmentStart;
- rec.rawLength = writer.getRawLength();
- rec.partLength = writer.getCompressedLength();
- sr.putIndex(rec, i);
- }
- sr.writeToFile(finalIndexFile, job);
- } finally {
- finalOut.close();
- }
- return;
- }
- {
- IndexRecord rec = new IndexRecord();
- final SpillRecord spillRec = new SpillRecord(partitions);
- for (int parts = 0; parts < partitions; parts++) {
- // create the segments to be merged
- List<Segment<K, V>> segmentList = new ArrayList<Segment<K, V>>(
- numSpills + this.inMemorySegments.length);
- for (int i = 0; i < numSpills; i++) {
- IndexRecord indexRecord = indexCacheList.get(i).getIndex(parts);
- Segment<K, V> s = new Segment<K, V>(job, rfs, filename[i],
- indexRecord.startOffset, indexRecord.partLength, codec, true);
- segmentList.add(i, s);
- if (LOG.isDebugEnabled()) {
- LOG.debug("MapId=" + mapId + " Reducer=" + parts + "Spill =" + i
- + "(" + indexRecord.startOffset + "," + indexRecord.rawLength
- + ", " + indexRecord.partLength + ")");
- }
- }
-
- if(this.inMemorySegments[parts] != null) {
- // add the in memory spill to the end of segmentList
- segmentList.add(numSpills, this.inMemorySegments[parts]);
- }
-
- // merge
- RawKeyValueIterator kvIter =
- Merger.merge(job, rfs, keyClass, valClass, codec,
- segmentList, job.getInt("io.sort.factor", 100),
- new Path(mapId.toString()), new RawComparator<K>() {
- @Override
- public int compare(byte[] b1, int s1, int l1,
- byte[] b2, int s2, int l2) {
- return LexicographicalComparerHolder.BEST_COMPARER
- .compareTo(
- b1,
- s1 + WritableUtils.INT_LENGTH_BYTES,
- l1 - WritableUtils.INT_LENGTH_BYTES,
- b2,
- s2 + WritableUtils.INT_LENGTH_BYTES,
- l2 - WritableUtils.INT_LENGTH_BYTES
- );
- }
-
- @Override
- public int compare(K o1, K o2) {
- return LexicographicalComparerHolder.BEST_COMPARER
- .compareTo( o1.getBytes(), 0, o1.getLength(),
- o2.getBytes(), 0, o2.getLength());
- }
- }, reporter, null,
- task.spilledRecordsCounter);
-
- // write merged output to disk
- long segmentStart = finalOut.getPos();
- Writer<K, V> writer = new Writer<K, V>(job, finalOut, keyClass,
- valClass, codec, task.spilledRecordsCounter);
- Merger.writeFile(kvIter, writer, reporter, job);
- // close
- writer.close();
- // record offsets
- rec.startOffset = segmentStart;
- rec.rawLength = writer.getRawLength();
- rec.partLength = writer.getCompressedLength();
- spillRec.putIndex(rec, parts);
- }
- spillRec.writeToFile(finalIndexFile, job);
- finalOut.close();
- for (int i = 0; i < numSpills; i++) {
- rfs.delete(filename[i], true);
- }
- }
- }
-
- public void close() {
- this.mapSpillSortCounter.finalCounterUpdate();
- if(numBigRecordsSpills > numBigRecordsWarnThreshold) {
- LOG.warn("Spilled a large number of big records: "
- + numBigRecordsSpills);
- }
- }
-}
28 src/mapred/org/apache/hadoop/mapred/BlockMapOutputCollector.java
@@ -1,28 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.mapred;
-
-import java.io.IOException;
-
-import org.apache.hadoop.mapred.MapTask.MapOutputCollector;
-
-public interface BlockMapOutputCollector<K, V> extends
- MapOutputCollector<K, V>, OutputCollector<K, V> {
- public void spillSingleRecord(K key, V value, int part) throws IOException;
-
- public void sortAndSpill() throws IOException;
-}
23 src/mapred/org/apache/hadoop/mapred/Child.java
@@ -218,11 +218,26 @@ private static TaskUmbilicalProtocol convertToDirectUmbilicalIfNecessary(
String jtHost = hostPortPair[0];
int jtPort = Integer.parseInt(hostPortPair[1]);
InetSocketAddress addr = new InetSocketAddress(jtHost, jtPort);
- DirectTaskUmbilical d = DirectTaskUmbilical.createDirectUmbilical(
- umbilical, addr, job);
- proxiesCreated.addAll(d.getCreatedProxies());
- return d;
+ umbilical = createDirectUmbilical(umbilical, addr, job);
}
return umbilical;
}
+
+ private static TaskUmbilicalProtocol createDirectUmbilical(
+ TaskUmbilicalProtocol taskTracker,
+ InetSocketAddress jobTrackerAddress, JobConf conf) throws IOException {
+
+ LOG.info("Creating direct umbilical to " + jobTrackerAddress.toString());
+ long jtConnectTimeoutMsec = conf.getLong(
+ "corona.jobtracker.connect.timeout.msec", 60000L);
+
+ InterTrackerProtocol jobClient =
+ (InterTrackerProtocol) RPC.waitForProtocolProxy(
+ InterTrackerProtocol.class,
+ InterTrackerProtocol.versionID,
+ jobTrackerAddress, conf, jtConnectTimeoutMsec).getProxy();
+
+ proxiesCreated.add(jobClient);
+ return new DirectTaskUmbilical(taskTracker, jobClient);
+ }
}
33 src/mapred/org/apache/hadoop/mapred/ChildMemoryBlock.java
@@ -1,33 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.mapred;
-
-class ChildMemoryBlock extends MemoryBlock {
-
- private MemoryBlock parentMemoryBlock;
-
- public ChildMemoryBlock(int startOffset, int allocateSize,
- MemoryBlockAllocator memoryBlockAllocator, int elemNum,
- MemoryBlock blk) {
- super(startOffset, allocateSize, memoryBlockAllocator, elemNum);
- this.parentMemoryBlock = blk;
- }
-
- public MemoryBlock getParentMemoryBlock() {
- return parentMemoryBlock;
- }
-}
9 src/mapred/org/apache/hadoop/mapred/CompositeTaskTrackerInstrumentation.java
@@ -28,7 +28,7 @@
*/
class CompositeTaskTrackerInstrumentation extends TaskTrackerInstrumentation {
- private final List<TaskTrackerInstrumentation> instrumentations;
+ private List<TaskTrackerInstrumentation> instrumentations;
public CompositeTaskTrackerInstrumentation(TaskTracker tt,
List<TaskTrackerInstrumentation> instrumentations) {
@@ -82,11 +82,4 @@ public void statusUpdate(Task task, TaskStatus taskStatus) {
tti.statusUpdate(task, taskStatus);
}
}
-
- @Override
- public void unaccountedMemory(long memory) {
- for (TaskTrackerInstrumentation tti: instrumentations) {
- tti.unaccountedMemory(memory);
- }
- }
}
19 src/mapred/org/apache/hadoop/mapred/Counters.java
@@ -38,7 +38,6 @@
import org.apache.hadoop.io.WritableUtils;
import org.apache.hadoop.mapreduce.CounterNames;
import org.apache.hadoop.util.StringUtils;
-import org.mortbay.util.ajax.JSON;
/**
* A set of named counters.
@@ -561,24 +560,6 @@ public synchronized String makeCompactString() {
}
return buffer.toString();
}
-
- /**
- * Convert a counters object into a JSON string
- */
- public synchronized String makeJsonString() {
-
- Map<String, Map<String, Long>> data =
- new HashMap<String, Map<String, Long>>();
-
- for (Group group : this) {
- Map<String, Long> groupData = new HashMap<String, Long>();
- data.put(group.getDisplayName(), groupData);
- for (Counter counter : group) {
- groupData.put(counter.getDisplayName(), counter.getCounter());
- }
- }
- return JSON.toString(data);
- }
/**
* Represent the counter in a textual format that can be converted back to
37 src/mapred/org/apache/hadoop/mapred/DirectTaskUmbilical.java
@@ -3,16 +3,9 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
-import java.util.Collections;
-import java.net.InetSocketAddress;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.ProtocolSignature;
-import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.RPC.VersionIncompatible;
-import org.apache.hadoop.ipc.VersionedProtocol;
import org.apache.hadoop.mapred.SortedRanges.Range;
/**
@@ -21,42 +14,12 @@
*/
class DirectTaskUmbilical implements TaskUmbilicalProtocol {
- public static final Log LOG = LogFactory.getLog(DirectTaskUmbilical.class);
-
final private TaskUmbilicalProtocol taskTrackerUmbilical;
final private InterTrackerProtocol jobTracker;
final private List<TaskCompletionEvent> mapEventFetched;
private int totalEventsFetched = 0;
static final String MAPRED_DIRECT_TASK_UMBILICAL_ADDRESS = "mapred.direct.task.umbilical.address";
- public static DirectTaskUmbilical createDirectUmbilical(
- TaskUmbilicalProtocol taskTracker,
- InetSocketAddress jobTrackerAddress, JobConf conf) throws IOException {
-
- LOG.info("Creating direct umbilical to " + jobTrackerAddress.toString());
- long jtConnectTimeoutMsec = conf.getLong(
- "corona.jobtracker.connect.timeout.msec", 60000L);
- int rpcTimeout = (int) jtConnectTimeoutMsec;
-
- InterTrackerProtocol jobClient = RPC.waitForProxy(
- InterTrackerProtocol.class,
- InterTrackerProtocol.versionID,
- jobTrackerAddress,
- conf,
- jtConnectTimeoutMsec,
- rpcTimeout);
-
- return new DirectTaskUmbilical(taskTracker, jobClient);
- }
-
- public List<VersionedProtocol> getCreatedProxies() {
- return Collections.singletonList((VersionedProtocol)jobTracker);
- }
-
- public void close() {
- RPC.stopProxy(jobTracker);
- }
-
DirectTaskUmbilical(TaskUmbilicalProtocol taskTrackerUmbilical,
InterTrackerProtocol jobTracker) {
this.taskTrackerUmbilical = taskTrackerUmbilical;
116 src/mapred/org/apache/hadoop/mapred/ExpireUnusedFilesInCache.java
@@ -1,116 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.mapred;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-
-import java.io.IOException;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-
-/**
- * Used to expire files in cache that hasn't been accessed for a while
- */
-public class ExpireUnusedFilesInCache implements Runnable {
- /** Logger. */
- private static final Log LOG =
- LogFactory.getLog(ExpireUnusedFilesInCache.class);
-
- /** Configuration. */
- private final Configuration conf;
- /** Clock. */
- private final Clock clock;
- /** The directories to purge. */
- private final Path[] cachePath;
- /** The filesystem to use. */
- private final FileSystem fs;
- /** Expire threshold in milliseconds. */
- private final long expireCacheThreshold;
-
- /**
- * Constructor.
- * @param conf The configuration.
- * @param clock The clock.
- * @param systemDir The system directory.
- * @param fs The filesystem.
- */
- public ExpireUnusedFilesInCache(
- Configuration conf, Clock clock, Path systemDir, FileSystem fs) {
- this.conf = conf;
- this.clock = clock;
- this.fs = fs;
-
- Path sharedPath = new Path(systemDir, JobSubmissionProtocol.CAR);
- sharedPath = sharedPath.makeQualified(fs);
- this.cachePath = new Path[3];
- this.cachePath[0] = new Path(sharedPath, "files");
- this.cachePath[1] = new Path(sharedPath, "archives");
- this.cachePath[2] = new Path(sharedPath, "libjars");
-
-
- long clearCacheInterval = conf.getLong(
- "mapred.cache.shared.check_interval",
- 24 * 60 * 60 * 1000);
-
- expireCacheThreshold =
- conf.getLong("mapred.cache.shared.expire_threshold",
- 24 * 60 * 60 * 1000);
- Executors.newScheduledThreadPool(1).scheduleAtFixedRate(
- this,
- clearCacheInterval,
- clearCacheInterval,
- TimeUnit.MILLISECONDS);
-
- LOG.info("ExpireUnusedFilesInCache created with " +
- " sharedPath = " + sharedPath +
- " clearCacheInterval = " + clearCacheInterval +
- " expireCacheThreshold = " + expireCacheThreshold);
- }
-
- @Override
- public void run() {
- long currentTime = clock.getTime();
-
- for (int i = 0; i < cachePath.length; i++) {
- try {
- if (!fs.exists(cachePath[i])) continue;
-
- FileStatus[] fStatus = fs.listStatus(cachePath[i]);
-
- for (int j = 0; j < fStatus.length; j++) {
- if (!fStatus[j].isDir()) {
- long atime = fStatus[j].getAccessTime();
-
- if (currentTime - atime > expireCacheThreshold) {
- fs.delete(fStatus[j].getPath(), false);
- }
- }
- }
- } catch (IOException ioe) {
- LOG.error("IOException when clearing cache");
- }
- }
- }
-}
-
-
13 src/mapred/org/apache/hadoop/mapred/FileInputFormat.java
@@ -513,19 +513,10 @@ public static void addInputPaths(JobConf conf, String commaSeparatedPaths) {
* for the map-reduce job.
*/
public static void setInputPaths(JobConf conf, Path... inputPaths) {
- if (!inputPaths[0].isAbsolute()) {
- FileSystem.LogForCollect.info("set relative path to non absolute path: "
- + inputPaths[0]+ " working directory: " + conf.getWorkingDirectory());
- }
Path path = new Path(conf.getWorkingDirectory(), inputPaths[0]);
StringBuffer str = new StringBuffer(StringUtils.escapeString(path.toString()));
for(int i = 1; i < inputPaths.length;i++) {
str.append(StringUtils.COMMA_STR);
- if (!inputPaths[i].isAbsolute()) {
- FileSystem.LogForCollect.info("set input path to non absolute path: "
- + inputPaths[i] + " working directory: "
- + conf.getWorkingDirectory());
- }
path = new Path(conf.getWorkingDirectory(), inputPaths[i]);
str.append(StringUtils.escapeString(path.toString()));
}
@@ -540,10 +531,6 @@ public static void setInputPaths(JobConf conf, Path... inputPaths) {
* the map-reduce job.
*/
public static void addInputPath(JobConf conf, Path path ) {
- if (!path.isAbsolute()) {
- FileSystem.LogForCollect.info("set input path to relative path: " + path
- + " working directory: " + conf.getWorkingDirectory());
- }
path = new Path(conf.getWorkingDirectory(), path);
String dirStr = StringUtils.escapeString(path.toString());
String dirs = conf.get("mapred.input.dir");
9 src/mapred/org/apache/hadoop/mapred/FileOutputFormat.java
@@ -123,10 +123,6 @@ public void checkOutputSpecs(FileSystem ignored, JobConf job)
* the map-reduce job.
*/
public static void setOutputPath(JobConf conf, Path outputDir) {
- if (!outputDir.isAbsolute()) {
- FileSystem.LogForCollect.info("set output path to relative path: "
- + outputDir + " working directory: " + conf.getWorkingDirectory());
- }
outputDir = new Path(conf.getWorkingDirectory(), outputDir);
conf.set("mapred.output.dir", outputDir.toString());
}
@@ -143,11 +139,6 @@ public static void setOutputPath(JobConf conf, Path outputDir) {
*/
static void setWorkOutputPath(JobConf conf, Path outputDir) {
- if (!outputDir.isAbsolute()) {
- FileSystem.LogForCollect.info("set work output path to relative path: "
- + outputDir + " working directory: " + conf.getWorkingDirectory());
- }
-
outputDir = new Path(conf.getWorkingDirectory(), outputDir);
conf.set("mapred.work.output.dir", outputDir.toString());
}
29 src/mapred/org/apache/hadoop/mapred/IFile.java
@@ -46,7 +46,7 @@
* There is a <code>Writer</code> to write out map-outputs in this format and
* a <code>Reader</code> to read files of this format.
*/
-public class IFile {
+class IFile {
private static final int EOF_MARKER = -1;
@@ -117,7 +117,7 @@ public Writer(Configuration conf, FSDataOutputStream out,
}
public void close() throws IOException {
-
+
// Close the serializers
keySerializer.close();
valueSerializer.close();
@@ -224,31 +224,6 @@ public void append(DataInputBuffer key, DataInputBuffer value)
++numRecordsWritten;
}
- public void append(byte[] kvBuffer, int offset, int keyLength,
- int valueLength)
- throws IOException {
- int realKeyLen = keyLength + WritableUtils.INT_LENGTH_BYTES;
- int realValLen = valueLength + WritableUtils.INT_LENGTH_BYTES;
-
- WritableUtils.writeVInt(buffer, realKeyLen);
- WritableUtils.writeVInt(buffer, realValLen);
- //this is real key: keyLength + key
- buffer.writeInt(keyLength);
- buffer.write(kvBuffer, offset, keyLength);
- //this is real value:
- buffer.writeInt(valueLength);
- buffer.write(kvBuffer, offset + keyLength, valueLength);
-
- out.write(buffer.getData(), 0, buffer.getLength());
- buffer.reset();
-
- // Update bytes written
- decompressedBytesWritten += realKeyLen + realValLen
- + WritableUtils.getVIntSize(realKeyLen)
- + WritableUtils.getVIntSize(realValLen);
- ++numRecordsWritten;
- }
-
public long getRawLength() {
return decompressedBytesWritten;
}
238 src/mapred/org/apache/hadoop/mapred/JSPUtil.java
@@ -42,21 +42,21 @@
public class JSPUtil {
private static final String PRIVATE_ACTIONS_KEY = "webinterface.private.actions";
-
+
public static final Configuration conf = new Configuration();
//LRU based cache
- private static final Map<String, JobInfo> jobHistoryCache =
- new LinkedHashMap<String, JobInfo>();
+ private static final Map<String, JobInfo> jobHistoryCache =
+ new LinkedHashMap<String, JobInfo>();
- private static final int CACHE_SIZE =
+ private static final int CACHE_SIZE =
conf.getInt("mapred.job.tracker.jobhistory.lru.cache.size", 5);
private static final Log LOG = LogFactory.getLog(JSPUtil.class);
/**
- * Method used to process the request from the job page based on the
+ * Method used to process the request from the job page based on the
* request which it has received. For example like changing priority.
- *
+ *
* @param request HTTP request Object.
* @param response HTTP response object.
* @param tracker {@link JobTracker} instance
@@ -75,7 +75,7 @@ public static void processButtons(HttpServletRequest request,
}
}
- if (conf.getBoolean(PRIVATE_ACTIONS_KEY, false) &&
+ if (conf.getBoolean(PRIVATE_ACTIONS_KEY, false) &&
request.getParameter("changeJobPriority") != null) {
String[] jobs = request.getParameterValues("jobCheckBox");
@@ -92,7 +92,7 @@ public static void processButtons(HttpServletRequest request,
/**
* Method used to generate the Job table for Job pages.
- *
+ *
* @param label display heading to be used in the job table.
* @param jobs vector of jobs to be displayed in table.
* @param refresh refresh interval to be used in jobdetails page.
@@ -102,22 +102,23 @@ public static void processButtons(HttpServletRequest request,
*/
public static String generateJobTable(String label, Collection<JobInProgress> jobs
, int refresh, int rowId) throws IOException {
- boolean isRunning = label.equals("Running");
- boolean isModifiable =
- isRunning && conf.getBoolean(PRIVATE_ACTIONS_KEY, false);
- StringBuffer sb = new StringBuffer();
- sb.append("<table border=\"1\" cellpadding=\"5\" cellspacing=\"0\" class=\"tablesorter\">\n");
+ boolean isModifiable = label.equals("Running")
+ && conf.getBoolean(
+ PRIVATE_ACTIONS_KEY, false);
+ StringBuffer sb = new StringBuffer();
+
+ sb.append("<table border=\"1\" cellpadding=\"5\" cellspacing=\"0\">\n");
if (jobs.size() > 0) {
if (isModifiable) {
- sb.append("<thead><form action=\"/jobtracker.jsp\" onsubmit=\"return confirmAction();\" method=\"POST\">");
+ sb.append("<form action=\"/jobtracker.jsp\" onsubmit=\"return confirmAction();\" method=\"POST\">");
sb.append("<tr>");
sb.append("<td><input type=\"Button\" onclick=\"selectAll()\" " +
"value=\"Select All\" id=\"checkEm\"></td>");
sb.append("<td>");
sb.append("<input type=\"submit\" name=\"killJobs\" value=\"Kill Selected Jobs\">");
- sb.append("</td>");
+ sb.append("</td");
sb.append("<td><nobr>");
sb.append("<select name=\"setJobPriority\">");
@@ -133,46 +134,34 @@ public static String generateJobTable(String label, Collection<JobInProgress> jo
sb.append("</nobr></td>");
sb.append("<td colspan=\"10\">&nbsp;</td>");
sb.append("</tr>");
- sb.append("<th>&nbsp;</th>");
+ sb.append("<td>&nbsp;</td>");
} else {
- sb.append("<thead><tr>");
+ sb.append("<tr>");
}
int totalMaps = 0;
int comMaps = 0;
- int totalRunningMaps = 0;
int totalReduces = 0;
int comReduces = 0;
- int totalRunningReduces = 0;
for (Iterator<JobInProgress> it = jobs.iterator(); it.hasNext(); ) {
JobInProgress job = it.next();
totalMaps += job.desiredMaps();
totalReduces += job.desiredReduces();
comMaps += job.finishedMaps();
comReduces += job.finishedReduces();
- if (isRunning) {
- totalRunningMaps += job.runningMaps();
- totalRunningReduces += job.runningReduces();
- }
- }
-
- sb.append("<th><b>Jobid</b></th><th><b>Priority" +
- "</b></th><th><b>User</b></th>");
- sb.append("<th><b>Name</b></th>");
- sb.append("<th><b>Map % Complete</b></th>");
- sb.append("<th><b>Map Total " + totalMaps + "</b></th>");
- sb.append("<th><b>Maps Completed " + comMaps + "</b></th>");
- if (isRunning) {
- sb.append("<th><b>Maps Running " + totalRunningMaps + "</b></th>");
}
- sb.append("<th><b>Reduce % Complete</b></th>");
- sb.append("<th><b>Reduce Total " + totalReduces + "</b></th>");
- sb.append("<th><b>Reduces Completed " + comReduces + "</b></th>");
- if (isRunning) {
- sb.append("<th><b>Reduces Running " + totalRunningReduces + "</b></th>");
- }
- sb.append("<th><b>Job Scheduling Information</b></th>");
- sb.append("</tr></thead><tbody>\n");
+
+ sb.append("<td><b>Jobid</b></td><td><b>Priority" +
+ "</b></td><td><b>User</b></td>");
+ sb.append("<td><b>Name</b></td>");
+ sb.append("<td><b>Map % Complete</b></td>");
+ sb.append("<td><b>Map Total " + totalMaps + "</b></td>");
+ sb.append("<td><b>Maps Completed " + comMaps + "</b></td>");
+ sb.append("<td><b>Reduce % Complete</b></td>");
+ sb.append("<td><b>Reduce Total " + totalReduces + "</b></td>");
+ sb.append("<td><b>Reduces Completed " + comReduces + "</b></td>");
+ sb.append("<td><b>Job Scheduling Information</b></td>");
+ sb.append("</tr>\n");
for (Iterator<JobInProgress> it = jobs.iterator(); it.hasNext(); ++rowId) {
JobInProgress job = it.next();
JobProfile profile = job.getProfile();
@@ -183,11 +172,6 @@ public static String generateJobTable(String label, Collection<JobInProgress> jo
int desiredReduces = job.desiredReduces();
int completedMaps = job.finishedMaps();
int completedReduces = job.finishedReduces();
- String runningMapTableData =
- (isRunning) ? job.runningMaps() + "</td><td>" : "";
- String runningReduceTableData =
- (isRunning) ? job.runningReduces() + "</td><td>" : "";
-
String name = profile.getJobName();
String abbreviatedName
= (name.length() > 76 ? name.substring(0,76) + "..." : name);
@@ -214,28 +198,27 @@ public static String generateJobTable(String label, Collection<JobInProgress> jo
+ StringUtils.formatPercent(status.mapProgress(), 2)
+ ServletUtil.percentageGraph(status.mapProgress() * 100, 80)
+ "</td><td>" + desiredMaps + "</td><td>" + completedMaps
- + "</td><td>" + runningMapTableData
+ + "</td><td>"
+ StringUtils.formatPercent(status.reduceProgress(), 2)
+ ServletUtil.percentageGraph(status.reduceProgress() * 100, 80)
- + "</td><td>" + desiredReduces + "</td><td> " + completedReduces
- + "</td><td>" + runningReduceTableData + schedulingInfo
+ + "</td><td>" + desiredReduces + "</td><td> " + completedReduces
+ + "</td><td>" + schedulingInfo
+ "</td></tr>\n");
}
if (isModifiable) {
sb.append("</form>\n");
}
- sb.append("</tbody>");
} else {
sb.append("<tr><td align=\"center\" colspan=\"8\"><i>none</i>" +
"</td></tr>\n");
}
sb.append("</table>\n");
-
+
return sb.toString();
}
/**
- * Given jobId, resolve the link to jobdetailshistory.jsp
+ * Given jobId, resolve the link to jobdetailshistory.jsp
* @param tracker JobTracker
* @param jobId JobID
* @return the link to the page jobdetailshistory.jsp for the job
@@ -287,31 +270,31 @@ private static String getHistoryFileUrl(RetireJobInfo info) {
}
@SuppressWarnings("unchecked")
- public static String generateRetiredJobTable(JobTracker tracker, int rowId)
+ public static String generateRetiredJobTable(JobTracker tracker, int rowId)
throws IOException {
StringBuffer sb = new StringBuffer();
- sb.append("<table border=\"1\" cellpadding=\"5\" cellspacing=\"0\" class=\"tablesorter\">\n");
+ sb.append("<table border=\"1\" cellpadding=\"5\" cellspacing=\"0\">\n");
- Iterator<RetireJobInfo> iterator =
+ Iterator<RetireJobInfo> iterator =
tracker.retireJobs.getAll().descendingIterator();
if (!iterator.hasNext()) {
- sb.append("<tr><th align=\"center\" colspan=\"8\"><i>none</i>" +
- "</th></tr>\n");
+ sb.append("<tr><td align=\"center\" colspan=\"8\"><i>none</i>" +
+ "</td></tr>\n");
} else {
- sb.append("<thead><tr>");
-
- sb.append("<th><b>Jobid</b></th>");
- sb.append("<th><b>Priority</b></th>");
- sb.append("<th><b>User</b></th>");
- sb.append("<th><b>Name</b></th>");
- sb.append("<th><b>State</b></th>");
- sb.append("<th><b>Start Time</b></th>");
- sb.append("<th><b>Finish Time</b></th>");
- sb.append("<th><b>Map % Complete</b></th>");
- sb.append("<th><b>Reduce % Complete</b></th>");
- sb.append("<th><b>Job Scheduling Information</b></th>");
- sb.append("</tr></thead><tbody>\n");
+ sb.append("<tr>");
+
+ sb.append("<td><b>Jobid</b></td>");
+ sb.append("<td><b>Priority</b></td>");
+ sb.append("<td><b>User</b></td>");
+ sb.append("<td><b>Name</b></td>");
+ sb.append("<td><b>State</b></td>");
+ sb.append("<td><b>Start Time</b></td>");
+ sb.append("<td><b>Finish Time</b></td>");
+ sb.append("<td><b>Map % Complete</b></td>");
+ sb.append("<td><b>Reduce % Complete</b></td>");
+ sb.append("<td><b>Job Scheduling Information</b></td>");
+ sb.append("</tr>\n");
for (int i = 0; i < 100 && iterator.hasNext(); i++) {
RetireJobInfo info = iterator.next();
String historyFileUrl = getHistoryFileUrl(info);
@@ -322,46 +305,45 @@ public static String generateRetiredJobTable(JobTracker tracker, int rowId)
= (name.length() > 76 ? name.substring(0,76) + "..." : name);
sb.append(
- "<td id=\"job_" + rowId + "\">" +
-
+ "<td id=\"job_" + rowId + "\">" +
+
(historyFileUrl == null ? "" :
- "<a href=\"jobdetailshistory.jsp?jobid=" +
- info.status.getJobId() + "&logFile=" + historyFileUrl + "\">") +
-
+ "<a href=\"jobdetailshistory.jsp?jobid=" +
+ info.status.getJobId() + "&logFile=" + historyFileUrl + "\">") +
+
info.status.getJobId() + "</a></td>" +
-
- "<td id=\"priority_" + rowId + "\">" +
+
+ "<td id=\"priority_" + rowId + "\">" +
info.status.getJobPriority().toString() + "</td>" +
- "<td id=\"user_" + rowId + "\">" + info.profile.getUser()
+ "<td id=\"user_" + rowId + "\">" + info.profile.getUser()
+ "</td>" +
"<td id=\"name_" + rowId + "\">" + abbreviatedName
+ "</td>" +
- "<td>" + JobStatus.getJobRunState(info.status.getRunState())
+ "<td>" + JobStatus.getJobRunState(info.status.getRunState())
+ "</td>" +
"<td>" + new Date(info.status.getStartTime()) + "</td>" +
"<td>" + new Date(info.finishTime) + "</td>" +
-
+
"<td>" + StringUtils.formatPercent(info.status.mapProgress(), 2)
- + ServletUtil.percentageGraph(info.status.mapProgress() * 100, 80) +
+ + ServletUtil.percentageGraph(info.status.mapProgress() * 100, 80) +
"</td>" +
-
+
"<td>" + StringUtils.formatPercent(info.status.reduceProgress(), 2)
+ ServletUtil.percentageGraph(
- info.status.reduceProgress() * 100, 80) +
+ info.status.reduceProgress() * 100, 80) +
"</td>" +
-
+
"<td>" + info.status.getSchedulingInfo() + "</td>" +
-
+
"</tr>\n");
rowId++;
}
- sb.append("</tbody>");
}
sb.append("</table>\n");
return sb.toString();
}
- public static JobInfo getJobInfo(HttpServletRequest request, FileSystem fs)
+ public static JobInfo getJobInfo(HttpServletRequest request, FileSystem fs)
throws IOException {
String jobid = request.getParameter("jobid");
String logFile = request.getParameter("logFile");
@@ -371,11 +353,11 @@ public static JobInfo getJobInfo(HttpServletRequest request, FileSystem fs)
jobInfo = new JobHistory.JobInfo(jobid);
LOG.info("Loading Job History file "+jobid + ". Cache size is " +
jobHistoryCache.size());
- DefaultJobHistoryParser.parseJobTasks( logFile, jobInfo, fs) ;
+ DefaultJobHistoryParser.parseJobTasks( logFile, jobInfo, fs) ;
}
jobHistoryCache.put(jobid, jobInfo);
if (jobHistoryCache.size() > CACHE_SIZE) {
- Iterator<Map.Entry<String, JobInfo>> it =
+ Iterator<Map.Entry<String, JobInfo>> it =
jobHistoryCache.entrySet().iterator();
String removeJobId = it.next().getKey();
it.remove();
@@ -422,7 +404,7 @@ public static void generateRetiredJobXml(JspWriter out, JobTracker tracker, int
rowId++;
}
}
-
+
/**
* Method used to generate the cluster resource utilization table
*/
@@ -465,7 +447,7 @@ public static String generateClusterResTable(JobTracker tracker)
/**
* Method used to generate the Job table for Job pages with resource
* utilization information obtain from {@link ResourceReporter}.
- *
+ *
* @param label display heading to be used in the job table.
* @param jobs vector of jobs to be displayed in table.
* @param refresh refresh interval to be used in jobdetails page.
@@ -481,22 +463,21 @@ public static String generateJobTableWithResourceInfo(String label,
if (reporter == null) {
return generateJobTable(label, jobs, refresh, rowId);
}
- boolean isRunning = label.equals("Running");
- boolean isModifiable =
- isRunning && conf.getBoolean(PRIVATE_ACTIONS_KEY, false);
+ boolean isModifiable = label.equals("Running")
+ && conf.getBoolean(PRIVATE_ACTIONS_KEY, false);
StringBuffer sb = new StringBuffer();
- sb.append("<table border=\"1\" cellpadding=\"5\" cellspacing=\"0\" class=\"tablesorter\">\n");
+ sb.append("<table border=\"1\" cellpadding=\"5\" cellspacing=\"0\">\n");
if (jobs.size() > 0) {
if (isModifiable) {
sb.append("<form action=\"/jobtracker_hmon.jsp\" onsubmit=\"return confirmAction();\" method=\"POST\">");
- sb.append("<thead><tr>");
+ sb.append("<tr>");
sb.append("<td><input type=\"Button\" onclick=\"selectAll()\" " +
"value=\"Select All\" id=\"checkEm\"></td>");
sb.append("<td>");
sb.append("<input type=\"submit\" name=\"killJobs\" value=\"Kill Selected Jobs\">");
- sb.append("</td>");
+ sb.append("</td");
sb.append("<td><nobr>");
sb.append("<select name=\"setJobPriority\">");
@@ -512,49 +493,37 @@ public static String generateJobTableWithResourceInfo(String label,
sb.append("</nobr></td>");
sb.append("<td colspan=\"15\">&nbsp;</td>");
sb.append("</tr>");
- sb.append("<th>&nbsp;</th>");
+ sb.append("<td>&nbsp;</td>");
} else {
- sb.append("<thead><tr>");
+ sb.append("<tr>");
}
int totalMaps = 0;
int comMaps = 0;
- int totalRunningMaps = 0;
int totalReduces = 0;
int comReduces = 0;
- int totalRunningReduces = 0;
for (Iterator<JobInProgress> it = jobs.iterator(); it.hasNext(); ) {
JobInProgress job = it.next();
totalMaps += job.desiredMaps();
totalReduces += job.desiredReduces();
comMaps += job.finishedMaps();
comReduces += job.finishedReduces();
- if (isRunning) {
- totalRunningMaps += job.runningMaps();
- totalRunningReduces += job.runningReduces();
- }
- }
- sb.append("<th><b>Jobid</b></th><th><b>Priority" +
- "</b></th><th><b>User</b></th>");
- sb.append("<th><b>Name</b></th>");
- sb.append("<th><b>Map % Complete</b></th>");
- sb.append("<th><b>Map Total " + totalMaps + "</b></th>");
- sb.append("<th><b>Maps Completed " + comMaps + "</b></th>");
- if (isRunning) {
- sb.append("<th><b>Maps Running " + totalRunningMaps + "</b></th>");
}
- sb.append("<th><b>Reduce % Complete</b></th>");
- sb.append("<th><b>Reduce Total " + totalReduces + "</b></th>");
- sb.append("<th><b>Reduces Completed " + comReduces + "</b></th>");
- if (isRunning) {
- sb.append("<th><b>Reduces Running " + totalRunningReduces + "</b></th>");
- }
- sb.append("<th><b>CPU Now</b></th>");
- sb.append("<th><b>CPU Cumulated Cluster-sec</b></th>");
- sb.append("<th><b>MEM Now</b></a></th>");
- sb.append("<th><b>MEM Cumulated Cluster-sec</b></th>");
- sb.append("<th><b>MEM Max/Node</b></th>");
- sb.append("</tr></thead><tbody>\n");
+ sb.append("<td><b>Jobid</b></td><td><b>Priority" +
+ "</b></td><td><b>User</b></td>");
+ sb.append("<td><b>Name</b></td>");
+ sb.append("<td><b>Map % Complete</b></td>");
+ sb.append("<td><b>Map Total " + totalMaps + "</b></td>");
+ sb.append("<td><b>Maps Completed " + comMaps + "</b></td>");
+ sb.append("<td><b>Reduce % Complete</b></td>");
+ sb.append("<td><b>Reduce Total " + totalReduces + "</b></td>");
+ sb.append("<td><b>Reduces Completed " + comReduces + "</b></td>");
+ sb.append("<td><b>CPU Now</b></td>");
+ sb.append("<td><b>CPU Cumulated Cluster-sec</b></td>");
+ sb.append("<td><b>MEM Now</b></a></td>");
+ sb.append("<td><b>MEM Cumulated Cluster-sec</b></td>");
+ sb.append("<td><b>MEM Max/Node</b></td>");
+ sb.append("</tr>\n");
for (Iterator<JobInProgress> it = jobs.iterator(); it.hasNext(); ++rowId) {
JobInProgress job = it.next();
JobProfile profile = job.getProfile();
@@ -565,11 +534,6 @@ public static String generateJobTableWithResourceInfo(String label,
int desiredReduces = job.desiredReduces();
int completedMaps = job.finishedMaps();
int completedReduces = job.finishedReduces();
- String runningMapTableData =
- (isRunning) ? job.runningMaps() + "</td><td>" : "";
- String runningReduceTableData =
- (isRunning) ? "</td><td>" + job.runningReduces() : "";
-
String name = profile.getJobName();
String jobpri = job.getPriority().toString();
@@ -618,11 +582,10 @@ public static String generateJobTableWithResourceInfo(String label,
+ StringUtils.formatPercent(status.mapProgress(), 2)
+ ServletUtil.percentageGraph(status.mapProgress() * 100, 80)
+ "</td><td>" + desiredMaps + "</td><td>" + completedMaps
- + "</td><td>" + runningMapTableData
+ + "</td><td>"
+ StringUtils.formatPercent(status.reduceProgress(), 2)
+ ServletUtil.percentageGraph(status.reduceProgress() * 100, 80)
+ "</td><td>" + desiredReduces + "</td><td> " + completedReduces
- + runningReduceTableData
+ "</td><td id=\"cpu_" + rowId + "\">" + cpu + "</td>"
+ "<td id=\"cpuCost_" + rowId + "\">" + cpuCost + "</td>"
+ "<td id=\"mem_" + rowId + "\">" + mem + "</td>"
@@ -632,7 +595,6 @@ public static String generateJobTableWithResourceInfo(String label,
if (isModifiable) {
sb.append("</form>\n");
}
- sb.append("</tbody>");
} else {
sb.append("<tr><td align=\"center\" colspan=\"8\"><i>none</i>" +
"</td></tr>\n");
@@ -644,7 +606,7 @@ public static String generateJobTableWithResourceInfo(String label,
/**
* Method used to generate the txt based Job table for Job pages.
- *
+ *
* @param jobs vector of jobs to be displayed in table.
* @param colSeparator the char used to separate columns
* @param rowSeparator the char used to separate records
@@ -683,8 +645,8 @@ public static String generateTxtJobTable(Collection<JobInProgress> jobs,
"24.MEM_MS" + colSeparator +
"25.%CPU" + colSeparator +
"26.%CPU_MAX" + colSeparator +
- "27.CPU_MS" + rowSeparator);
-
+ "27.CPU_MS" + rowSeparator);
+
if (jobs.size() > 0) {
for (Iterator<JobInProgress> it = jobs.iterator(); it.hasNext();) {
JobInProgress job = it.next();
@@ -760,7 +722,7 @@ public static String generateTxtJobTable(Collection<JobInProgress> jobs,
memCost + colSeparator +
cpu + colSeparator +
cpuMax + colSeparator +
- cpuCost + rowSeparator);
+ cpuCost + rowSeparator);
}
}
return sb.toString();
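generateTxtJobTable emits one record per job, with the numbered columns listed above joined by colSeparator and each record terminated by rowSeparator, so a consumer can recover the fields by splitting on those characters. A small parsing sketch, assuming the caller chose tab and newline as the separators (the actual characters are supplied by the caller):

import java.util.Arrays;
import java.util.List;

// Hypothetical consumer of generateTxtJobTable output, assuming '\t' was
// used as the column separator and '\n' as the row separator.
public class TxtJobTableParser {
    public static void main(String[] args) {
        String table =
            "1.JOBID\t2.USER\t3.PRIORITY\n" +               // header row (abbreviated)
            "job_201201010000_0001\talice\tNORMAL\n";        // one job record

        List<String> rows = Arrays.asList(table.split("\n"));
        String[] header = rows.get(0).split("\t");
        for (String row : rows.subList(1, rows.size())) {
            String[] cols = row.split("\t");
            for (int i = 0; i < cols.length && i < header.length; i++) {
                System.out.println(header[i] + " = " + cols[i]);
            }
        }
    }
}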
451 src/mapred/org/apache/hadoop/mapred/JobClient.java
@@ -91,11 +91,11 @@
/**
* <code>JobClient</code> is the primary interface for the user-job to interact
* with the {@link JobTracker}.
- *
- * <code>JobClient</code> provides facilities to submit jobs, track their
+ *
+ * <code>JobClient</code> provides facilities to submit jobs, track their
* progress, access component-tasks' reports/logs, get the Map-Reduce cluster
* status information etc.
- *
+ *
* <p>The job submission process involves:
* <ol>
* <li>
@@ -105,59 +105,59 @@
* Computing the {@link InputSplit}s for the job.
* </li>
* <li>
- * Setup the requisite accounting information for the {@link DistributedCache}
+ * Setup the requisite accounting information for the {@link DistributedCache}
* of the job, if necessary.
* </li>
* <li>
- * Copying the job's jar and configuration to the map-reduce system directory
- * on the distributed file-system.
+ * Copying the job's jar and configuration to the map-reduce system directory
+ * on the distributed file-system.
* </li>
* <li>
* Submitting the job to the <code>JobTracker</code> and optionally monitoring
* it's status.
* </li>
* </ol></p>
- *
+ *
* Normally the user creates the application, describes various facets of the
- * job via {@link JobConf} and then uses the <code>JobClient</code> to submit
+ * job via {@link JobConf} and then uses the <code>JobClient</code> to submit
* the job and monitor its progress.
- *
+ *
* <p>Here is an example on how to use <code>JobClient</code>:</p>
* <p><blockquote><pre>
* // Create a new JobConf
* JobConf job = new JobConf(new Configuration(), MyJob.class);
- *
- * // Specify various job-specific parameters
+ *
+ * // Specify various job-specific parameters
* job.setJobName("myjob");
- *
+ *
* job.setInputPath(new Path("in"));
* job.setOutputPath(new Path("out"));
- *
+ *
* job.setMapperClass(MyJob.MyMapper.class);
* job.setReducerClass(MyJob.MyReducer.class);
*
* // Submit the job, then poll for progress until the job is complete
* JobClient.runJob(job);
* </pre></blockquote></p>
- *
+ *
* <h4 id="JobControl">Job Control</h4>
- *
- * <p>At times clients would chain map-reduce jobs to accomplish complex tasks
- * which cannot be done via a single map-reduce job. This is fairly easy since
- * the output of the job, typically, goes to distributed file-system and that
+ *
+ * <p>At times clients would chain map-reduce jobs to accomplish complex tasks
+ * which cannot be done via a single map-reduce job. This is fairly easy since
+ * the output of the job, typically, goes to distributed file-system and that
* can be used as the input for the next job.</p>
- *
- * <p>However, this also means that the onus on ensuring jobs are complete
- * (success/failure) lies squarely on the clients. In such situations the
+ *
+ * <p>However, this also means that the onus on ensuring jobs are complete
+ * (success/failure) lies squarely on the clients. In such situations the
* various job-control options are:
* <ol>
* <li>
- * {@link #runJob(JobConf)} : submits the job and returns only after
+ * {@link #runJob(JobConf)} : submits the job and returns only after
* the job has completed.
* </li>
* <li>
- * {@link #submitJob(JobConf)} : only submits the job, then poll the
- * returned handle to the {@link RunningJob} to query status and make
+ * {@link #submitJob(JobConf)} : only submits the job, then poll the
+ * returned handle to the {@link RunningJob} to query status and make
* scheduling decisions.
* </li>
* <li>
@@ -165,7 +165,7 @@
* on job-completion, thus avoiding polling.
* </li>
* </ol></p>
- *
+ *
* @see JobConf
* @see ClusterStatus
* @see Tool
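The Job Control notes above distinguish the blocking runJob(JobConf) call from submitJob(JobConf), which returns immediately and leaves polling to the client. A minimal sketch of the submit-then-poll option, assuming the 0.20-era mapred API shown in this file; the job name, input/output paths, and polling interval are illustrative only:

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RunningJob;

// Sketch of the "submit, then poll the RunningJob handle" job-control
// option described in the javadoc above.
public class SubmitAndPoll {
    public static void main(String[] args) throws IOException, InterruptedException {
        JobConf job = new JobConf(new Configuration());
        job.setJobName("myjob");
        FileInputFormat.setInputPaths(job, new Path("in"));
        FileOutputFormat.setOutputPath(job, new Path("out"));

        JobClient client = new JobClient(job);
        RunningJob running = client.submitJob(job);       // returns immediately

        while (!running.isComplete()) {                    // poll instead of blocking in runJob()
            System.out.println("map " + running.mapProgress() +
                               ", reduce " + running.reduceProgress());
            Thread.sleep(5000);
        }
        System.out.println(running.isSuccessful() ? "succeeded" : "failed");
    }
}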
@@ -174,7 +174,7 @@
public class JobClient extends Configured implements MRConstants, Tool {
private static final Log LOG = LogFactory.getLog(JobClient.class);
public static enum TaskStatusFilter { NONE, KILLED, FAILED, SUCCEEDED, ALL }
- private TaskStatusFilter taskOutputFilter = TaskStatusFilter.FAILED;
+ private TaskStatusFilter taskOutputFilter = TaskStatusFilter.FAILED;
private static final long MAX_JOBPROFILE_AGE = 1000 * 2;
private static Random r = new Random();
@@ -199,9 +199,6 @@
JobStatus status;
long statustime;
- // See comment for getJobStatus()
- private long MAX_STALE_STATUS_INTERVAL = 10 * 1000;
-
/**
* We store a JobProfile and a timestamp for when we last
* acquired the job profile. If the job is null, then we cannot
@@ -224,38 +221,14 @@ synchronized void ensureFreshStatus() throws IOException {
updateStatus();
}
}
-
+
/** Some methods need to update status immediately. So, refresh
* immediately
* @throws IOException
*/
synchronized void updateStatus() throws IOException {
- JobStatus status = jobSubmitClient.getJobStatus(profile.getJobID());
- long now = System.currentTimeMillis();
- // There is a bug where the JT occasionally returns a null status. The
- // null status will cause clients (inc. Hive) to fail. The null status
- // may be a transient error, so we are logging when this occurs and
- // instead keeping the last status value. This change should be reverted
- // following Task #917234
-
- if (status == null) {
- LOG.error("JT returned null status for job " + profile.getJobID());
- }
-
- // In case the JT continues to return the null status, that's probably the
- // right value
- if (status != null ||
- (now > this.statustime + MAX_STALE_STATUS_INTERVAL)) {
- if (status == null) {
- // If we could not get the status, assume failed.
- LOG.error("Creating failed status because getJobStatus() " +
- "returned null");
- status = new JobStatus(profile.getJobID(), 0.0f, 0.0f,
- 0.0f, JobStatus.FAILED);
- }
- this.status = status;
- this.statustime = now;
- }
+ this.status = jobSubmitClient.getJobStatus(profile.getJobID());
+ this.statustime = System.currentTimeMillis();
}
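The lines removed above describe a defensive update: tolerate a transient null status from the JobTracker by keeping the last known value, and only synthesize a FAILED status once the cached value has been stale longer than MAX_STALE_STATUS_INTERVAL. A standalone sketch of that pattern, with a generic status supplier standing in for the real jobSubmitClient call:

// Standalone sketch of the removed workaround: keep the last known status
// through transient nulls, and only give up (treating the job as failed)
// once the cached value is older than the cutoff. StatusSource and the
// "FAILED" string are hypothetical stand-ins.
public class StaleTolerantStatus {
    interface StatusSource { String fetch(); }            // may return null transiently

    private static final long MAX_STALE_STATUS_INTERVAL = 10 * 1000;

    private String status;
    private long statustime;

    synchronized void updateStatus(StatusSource source) {
        String fresh = source.fetch();
        long now = System.currentTimeMillis();
        if (fresh == null) {
            System.err.println("source returned null status");
        }
        // Accept a real status immediately; fall back to FAILED only after
        // the last good status has been stale for too long.
        if (fresh != null || now > statustime + MAX_STALE_STATUS_INTERVAL) {
            this.status = (fresh != null) ? fresh : "FAILED";
            this.statustime = now;
        }
    }
}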
/**
@@ -264,14 +237,14 @@ synchronized void updateStatus() throws IOException {
public JobID getID() {
return profile.getJobID();
}
-
- /** @deprecated This method is deprecated and will be removed. Applications should
+
+ /** @deprecated This method is deprecated and will be removed. Applications should
* rather use {@link #getID()}.*/
@Deprecated
public String getJobID() {
return profile.getJobID().toString();
}
-
+
/**
* The user-specified job name
*/
@@ -366,23 +339,23 @@ public synchronized int getJobState() throws IOException {
updateStatus();
return status.getRunState();
}
-
+
/**
* Tells the service to terminate the current job.
*/
public synchronized void killJob() throws IOException {
jobSubmitClient.killJob(getID());
}
-
-
+
+
/** Set the priority of the job.
- * @param priority new priority of the job.
+ * @param priority new priority of the job.
*/
- public synchronized void setJobPriority(String priority)
+ public synchronized void setJobPriority(String priority)
throws IOException {
jobSubmitClient.setJobPriority(getID(), priority);
}
-
+
/**
* Kill indicated task attempt.
* @param taskId the id of the task to kill.
@@ -398,14 +371,14 @@ public synchronized void killTask(TaskAttemptID taskId, boolean shouldFail) thro
public synchronized void killTask(String taskId, boolean shouldFail) throws IOException {
killTask(TaskAttemptID.forName(taskId), shouldFail);
}
-
+
/**
- * Fetch task completion events from jobtracker for this job.
+ * Fetch task completion events from jobtracker for this job.
*/
public synchronized TaskCompletionEvent[] getTaskCompletionEvents(
int startFrom) throws IOException{
return jobSubmitClient.getTaskCompletionEvents(
- getID(), startFrom, 10);
+ getID(), startFrom, 10);
}
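getTaskCompletionEvents asks the JobTracker for at most 10 events starting at the given offset, so callers page through the event stream by advancing startFrom. A caller-side sketch, assuming the 0.20-era RunningJob/TaskCompletionEvent API and an already-submitted job:

import java.io.IOException;

import org.apache.hadoop.mapred.RunningJob;
import org.apache.hadoop.mapred.TaskCompletionEvent;

// Page through completion events in the 10-event windows the method above
// requests, advancing the offset until a poll comes back empty.
public class CompletionEventPager {
    static void drainEvents(RunningJob job) throws IOException {
        int from = 0;
        while (true) {
            TaskCompletionEvent[] events = job.getTaskCompletionEvents(from);
            if (events.length == 0) {
                break;                          // nothing new yet
            }
            for (TaskCompletionEvent event : events) {
                System.out.println(event.getTaskAttemptId() + " -> " +
                                   event.getTaskStatus());
            }
            from += events.length;              // next poll starts after what we saw
        }
    }
}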
/**
@@ -417,20 +390,20 @@ public String toString() {
updateStatus();
} catch (IOException e) {
}
- return "Job: " + profile.getJobID() + "\n" +
- "file: " + profile.getJobFile() + "\n" +
- "tracking URL: " + profile.getURL() + "\n" +
- "map() completion: " + status.mapProgress() + "\n" +
+ return "Job: " + profile.getJobID() + "\n" +
+ "file: " + profile.getJobFile() + "\n" +
+ "tracking URL: " + profile.getURL() + "\n" +
+ "map() completion: " + status.mapProgress() + "\n" +
"reduce() completion: " + status.reduceProgress();
}
-
+
/**
* Returns the counters for this job
*/
public Counters getCounters() throws IOException {
return jobSubmitClient.getJobCounters(getID());
}
-
+
@Override
public String[] getTaskDiagnostics(TaskAttemptID id) throws IOException {
return jobSubmitClient.getTaskDiagnostics(id);
@@ -441,7 +414,7 @@ public Counters getCounters() throws IOException {
private boolean jobSubmitClientIsProxy = false;
private boolean isJobTrackerInProc = false;
private Path sysDir = null;
-
+
private FileSystem fs = null;
/**
@@ -449,11 +422,11 @@ public Counters getCounters() throws IOException {
*/
public JobClient() {
}
-
+
/**
- * Build a job client with the given {@link JobConf}, and connect to the
+ * Build a job client with the given {@link JobConf}, and connect to the
* default {@link JobTracker}.
- *
+ *
* @param conf the job configuration.
* @throws IOException
*/
@@ -512,18 +485,18 @@ public void init(JobConf conf) throws IOException {
private JobSubmissionProtocol createRPCProxy(InetSocketAddress addr,
Configuration conf) throws IOException {
- return RPC.getProxy(JobSubmissionProtocol.class,
+ return (JobSubmissionProtocol) RPC.getProxy(JobSubmissionProtocol.class,
JobSubmissionProtocol.versionID, addr, getUGI(conf), conf,
NetUtils.getSocketFactory(conf, JobSubmissionProtocol.class));
}
/**
* Build a job client, connect to the indicated job tracker.
- *
+ *
* @param jobTrackAddr the job tracker to connect to.
* @param conf configuration.
*/
- public JobClient(InetSocketAddress jobTrackAddr,
+ public JobClient(InetSocketAddress jobTrackAddr,
Configuration conf) throws IOException {
jobSubmitClient = createRPCProxy(jobTrackAddr, conf);
}
@@ -540,7 +513,7 @@ public synchronized void close() throws IOException {
/**
* Get a filesystem handle. We need this to prepare jobs
* for submission to the MapReduce system.
- *
+ *
* @return the filesystem handle.
*/
public synchronized FileSystem getFs() throws IOException {
@@ -550,7 +523,7 @@ public synchronized FileSystem getFs() throws IOException {
}
return fs;
}
-
+
/* see if two file systems are the same or not
*
*/
@@ -563,7 +536,7 @@ private boolean compareFs(FileSystem srcFs, FileSystem destFs) {
if (!srcUri.getScheme().equals(dstUri.getScheme())) {
return false;
}
- String srcHost = srcUri.getHost();
+ String srcHost = srcUri.getHost();
String dstHost = dstUri.getHost();
if ((srcHost != null) && (dstHost != null)) {
try {
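compareFs decides whether two filesystems are "the same" by comparing their URIs, scheme first and then host, rather than resolving names up front; a later comment in this file notes the same trade-off for copyRemoteFiles. A minimal URI-level sketch of that check (the real method continues with further checks inside the try block above, which are omitted here):

import java.net.URI;

// URI-level "same filesystem" check: compare scheme, host and port
// textually, without DNS resolution. This is only the first stage of the
// comparison the surrounding method performs.
public class SameFsCheck {
    static boolean sameFs(URI src, URI dst) {
        String srcScheme = src.getScheme();
        String dstScheme = dst.getScheme();
        if (srcScheme == null || dstScheme == null || !srcScheme.equals(dstScheme)) {
            return false;
        }
        String srcHost = src.getHost();
        String dstHost = dst.getHost();
        if (srcHost != null && dstHost != null && !srcHost.equalsIgnoreCase(dstHost)) {
            return false;
        }
        return src.getPort() == dst.getPort();
    }

    public static void main(String[] args) {
        System.out.println(sameFs(URI.create("hdfs://nn:8020/a"),
                                  URI.create("hdfs://nn:8020/b")));   // true
        System.out.println(sameFs(URI.create("hdfs://nn:8020/a"),
                                  URI.create("file:///tmp/b")));      // false
    }
}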
@@ -605,10 +578,10 @@ private Path copyRemoteFiles(FileSystem jtFs, Path parentDir,
throws IOException {
//check if we do not need to copy the files
// is jt using the same file system.
- // just checking for uri strings... doing no dns lookups
+ // just checking for uri strings... doing no dns lookups
// to see if the filesystems are the same. This is not optimal.
// but avoids name resolution.
-
+
FileSystem remoteFs = null;
remoteFs = originalPath.getFileSystem(job);
if (compareFs(remoteFs, jtFs)) {
@@ -672,7 +645,7 @@ private Path copyRemoteFiles(FileSystem jtFs, Path parentDir,
private final static long FCACHE_REFRESH_INTERVAL = 1000L * 60 * 60;
private void populateFileListings(FileSystem fs, Path[] f) {
-
+
long now = System.currentTimeMillis();
if (filesInCache != null &&
now - filesInCacheTs < FCACHE_REFRESH_INTERVAL) {
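populateFileListings guards an expensive directory listing with a timestamp: if the cached listing was refreshed within FCACHE_REFRESH_INTERVAL (one hour), it returns early. A standalone sketch of that refresh guard, with a hypothetical Lister interface standing in for the FileSystem calls:

import java.util.HashSet;
import java.util.Set;

// Time-based refresh guard: rebuild the cached listing only when it is
// missing or older than the refresh interval.
public class CachedListing {
    interface Lister { Set<String> listDirectory(); }

    private static final long FCACHE_REFRESH_INTERVAL = 1000L * 60 * 60;   // one hour

    private Set<String> filesInCache;
    private long filesInCacheTs;

    synchronized Set<String> get(Lister lister) {
        long now = System.currentTimeMillis();
        if (filesInCache != null && now - filesInCacheTs < FCACHE_REFRESH_INTERVAL) {
            return filesInCache;               // still fresh, skip the expensive listing
        }
        filesInCache = new HashSet<String>(lister.listDirectory());
        filesInCacheTs = now;
        return filesInCache;
    }
}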
@@ -717,11 +690,11 @@ public FileInfo(String md5, long fileLength, long timeStamp) {
}
Map<URI, FileInfo> fileInfo;
-
+
/**
* Create symlinks for the files needed for the jobs in current directory
* @param job
- * @throws IOException
+ * @throws IOException
*/
private void symLinkAndConfigureFiles(JobConf job) throws IOException {
if (!(job.getBoolean("mapred.used.genericoptionsparser", false))) {
@@ -729,7 +702,7 @@ private void symLinkAndConfigureFiles(JobConf job) throws IOException {
"Applications should implement Tool for the same.");
}
- // get all the command line arguments into the
+ // get all the command line arguments into the
// jobconf passed in by the user conf
String files = job.get("tmpfiles");
String archives = job.get("tmparchives");
@@ -754,7 +727,7 @@ private void symLinkAndConfigureFiles(JobConf job) throws IOException {
}
// Configure job name
String originalJar = job.getJar();
- if (originalJar != null) {
+ if (originalJar != null) {
// use jar name if job is not named.
if ("".equals(job.getJobName())) {
job.setJobName(new Path(originalJar).getName());
@@ -776,30 +749,30 @@ private void splitAndAdd(String files, List<String> filesToSymLink) {
}
/**
- * configure the jobconf of the user with the command line options of
+ * configure the jobconf of the user with the command line options of
* -libjars, -files, -archives
* @param conf
* @throws IOException
*/
private void copyAndConfigureFiles(JobConf job, Path uploadFileDir,
- boolean shared)
+ boolean shared)
throws IOException {
-
+
if (!(job.getBoolean("mapred.used.genericoptionsparser", false))) {
LOG.warn("Use GenericOptionsParser for parsing the arguments. " +
"Applications should implement Tool for the same.");
}
- // get all the command line arguments into the
+ // get all the command line arguments into the
// jobconf passed in by the user conf
String files = job.get("tmpfiles");
String libjars = job.get("tmpjars");
String archives = job.get("tmparchives");
-
+
//
// Figure out what fs the JobTracker is using. Copy the
// job to it, under a temporary name. This allows DFS to work,
- // and under the local fs also provides UNIX-like object loading
+ // and under the local fs also provides UNIX-like object loading
// semantics. (that is, if the job file is deleted right after
// submission, we can still run the submission to completion)
//
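The comment above explains why the job file is copied to the JobTracker's filesystem under a temporary name: the submission keeps working even if the caller deletes the original right away. A small sketch of that staging idea using plain java.nio instead of the Hadoop FileSystem API; the path layout is illustrative only:

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

// Copy the submitted file to a system-owned location under a temporary
// name, so later steps read the staged copy rather than the original.
public class StageJobFile {
    static Path stage(Path original, Path systemDir) throws IOException {
        Files.createDirectories(systemDir);
        Path staged = systemDir.resolve("job_" + System.nanoTime() + ".xml");
        Files.copy(original, staged, StandardCopyOption.REPLACE_EXISTING);
        return staged;                         // safe to read after 'original' is gone
    }
}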
@@ -821,7 +794,7 @@ private void copyAndConfigureFiles(JobConf job, Path uploadFileDir,
short replication = (short)job.getInt("mapred.submit.replication", 10);
if (shared) {
- populateFileListings(fs,
+ populateFileListings(fs,
new Path[] { filesDir, archivesDir, libjarsDir});
}
@@ -829,7 +802,7 @@ private void copyAndConfigureFiles(JobConf job, Path uploadFileDir,
String originalJar = job.getJar();
- if (originalJar != null) {
+ if (originalJar != null) {
// use jar name if job is not named.
if ("".equals(job.getJobName())) {
job.setJobName(new Path(originalJar).getName());
@@ -883,7 +856,7 @@ private void copyAndConfigureFiles(JobConf job, Path uploadFileDir,
}
// add all the command line files/ jars and archive
- // first copy them to jobtrackers filesystem
+ // first copy them to jobtrackers filesystem
if (files != null) {
if (!fs.exists(filesDir)) {
@@ -926,7 +899,7 @@ private void copyAndConfigureFiles(JobConf job, Path uploadFileDir,
}
} catch(URISyntaxException ue) {
- //should not throw a uri exception
+ //should not throw a uri exception
throw new IOException("Failed to create uri for " + tmpFile);
}
DistributedCache.createSymlink(job);
@@ -967,10 +940,10 @@ private void copyAndConfigureFiles(JobConf job, Path uploadFileDir,
}
}
}
-
+
if (archives != null) {
if (!fs.exists(archivesDir)) {
- FileSystem.mkdirs(fs, archivesDir, mapredSysPerms);
+ FileSystem.mkdirs(fs, archivesDir, mapredSysPerms);
}
String[] archivesArr = archives.split(",");
@@ -1013,7 +986,7 @@ private void copyAndConfigureFiles(JobConf job, Path uploadFileDir,
DistributedCache.createSymlink(job);
}
}
-
+
// set the timestamps and md5 of the archives and files
URI[] tarchives = DistributedCache.getSharedCacheArchives(job);
if (tarchives != null) {
@@ -1123,7 +1096,7 @@ private void configureUserName(JobConf job) throws IOException {
job.set("group.name", ugi.getGroupNames()[0]);
}
if (job.getWorkingDirectory() == null) {
- job.setWorkingDirectory(fs.getWorkingDirectory());
+ job.setWorkingDirectory(fs.getWorkingDirectory());
}
}
@@ -1137,13 +1110,13 @@ private static UnixUserGroupInformation getUGI(Configuration job) throws IOExcep
}
return ugi;
}
-
+
/**
* Submit a job to the MR system.
- *
+ *
* This returns a handle to the {@link RunningJob} which can be used to track
* the running-job.
- *
+ *
* @param jobFile the job configuration.
* @return a handle to the {@link RunningJob} which can be used to track the
* running-job.
@@ -1151,27 +1124,27 @@ private static UnixUserGroupInformation getUGI(Configuration job) throws IOExcep
* @throws InvalidJobConfException
* @throws IOException
*/
- public RunningJob submitJob(String jobFile) throws FileNotFoundException,
- InvalidJobConfException,
+ public RunningJob submitJob(String jobFile) throws FileNotFoundException,
+ InvalidJobConfException,
IOException {
// Load in the submitted job details
JobConf job = new JobConf(jobFile);
return submitJob(job);
}
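The String overload above simply loads a JobConf from the given job file and delegates to submitJob(JobConf). A caller-side sketch, assuming a job configuration XML already exists at a hypothetical path:

import java.io.IOException;

import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RunningJob;

// Submit a job described by an existing job.xml; the path is illustrative.
public class SubmitFromFile {
    public static void main(String[] args) throws IOException {
        JobClient client = new JobClient(new JobConf());
        RunningJob job = client.submitJob("/tmp/myjob/job.xml");
        System.out.println("Submitted " + job.getID());
    }
}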
-
+
// job files are world-wide readable and owner writable