MAPREDUCE-4049. Experimental api to allow for alternate shuffle plugins. Contributed by Avner BenHanoch.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1418173 13f79535-47bb-0310-9956-ffa450edef68
1 parent 2d1fbda · commit ab9bf12f9b5a892c52a8b79497dc8b50135aab14 · acmurthy committed Dec 7, 2012
@@ -11,6 +11,9 @@ Trunk (Unreleased)
MAPREDUCE-2669. Add new examples for Mean, Median, and Standard Deviation.
(Plamen Jeliazkov via shv)
+ MAPREDUCE-4049. Experimental api to allow for alternate shuffle plugins.
+ (Avner BenHanoch via acmurthy)
+
IMPROVEMENTS
MAPREDUCE-3787. [Gridmix] Optimize job monitoring and STRESS mode for
@@ -340,6 +340,7 @@ public void run(JobConf job, final TaskUmbilicalProtocol umbilical)
// Initialize the codec
codec = initCodec();
RawKeyValueIterator rIter = null;
+ ShuffleConsumerPlugin shuffleConsumerPlugin = null;
boolean isLocal = false;
// local if
@@ -358,8 +359,14 @@ public void run(JobConf job, final TaskUmbilicalProtocol umbilical)
(null != combinerClass) ?
new CombineOutputCollector(reduceCombineOutputCounter, reporter, conf) : null;
- Shuffle shuffle =
- new Shuffle(getTaskID(), job, FileSystem.getLocal(job), umbilical,
+ Class<? extends ShuffleConsumerPlugin> clazz =
+ job.getClass(MRConfig.SHUFFLE_CONSUMER_PLUGIN, Shuffle.class, ShuffleConsumerPlugin.class);
+
+ shuffleConsumerPlugin = ReflectionUtils.newInstance(clazz, job);
+ LOG.info("Using ShuffleConsumerPlugin: " + shuffleConsumerPlugin);
+
+ ShuffleConsumerPlugin.Context shuffleContext =
+ new ShuffleConsumerPlugin.Context(getTaskID(), job, FileSystem.getLocal(job), umbilical,
super.lDirAlloc, reporter, codec,
combinerClass, combineCollector,
spilledRecordsCounter, reduceCombineInputCounter,
@@ -368,7 +375,8 @@ public void run(JobConf job, final TaskUmbilicalProtocol umbilical)
mergedMapOutputsCounter,
taskStatus, copyPhase, sortPhase, this,
mapOutputFile);
- rIter = shuffle.run();
+ shuffleConsumerPlugin.init(shuffleContext);
+ rIter = shuffleConsumerPlugin.run();
} else {
// local job runner doesn't have a copy phase
copyPhase.complete();
@@ -399,6 +407,10 @@ public void run(JobConf job, final TaskUmbilicalProtocol umbilical)
runOldReducer(job, umbilical, reporter, rIter, comparator,
keyClass, valueClass);
}
+
+ if (shuffleConsumerPlugin != null) {
+ shuffleConsumerPlugin.close();
+ }
done(umbilical, reporter);
}
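
Taken together, the ReduceTask changes replace the hard-wired Shuffle with a three-step plugin lifecycle: resolve the configured class, init() it with a Context, run() it to obtain the merged iterator, then close() it once the reducer is done. A condensed sketch of that calling sequence (a restatement of the diff above, with context construction and error handling elided):

    // Resolve the plugin class, defaulting to the built-in Shuffle.
    Class<? extends ShuffleConsumerPlugin> clazz =
        job.getClass(MRConfig.SHUFFLE_CONSUMER_PLUGIN, Shuffle.class,
                     ShuffleConsumerPlugin.class);
    ShuffleConsumerPlugin plugin = ReflectionUtils.newInstance(clazz, job);

    plugin.init(shuffleContext);               // hand over conf, counters, phases
    RawKeyValueIterator rIter = plugin.run();  // fetch and merge map outputs
    // ... run the reducer over rIter ...
    plugin.close();                            // let the plugin release resources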
@@ -0,0 +1,168 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+import org.apache.hadoop.mapred.Task.CombineOutputCollector;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalDirAllocator;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.util.Progress;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * ShuffleConsumerPlugin for serving Reducers. It may shuffle map output
+ * files (MOFs) from either the built-in ShuffleHandler or a 3rd-party
+ * AuxiliaryService.
+ */
+@InterfaceAudience.LimitedPrivate("mapreduce")
+@InterfaceStability.Unstable
+public interface ShuffleConsumerPlugin<K, V> {
+
+ /** Initialize the plugin; the Context carries everything run() needs. */
+ public void init(Context<K, V> context);
+
+ /** Fetch and merge map outputs; return an iterator over the merged data. */
+ public RawKeyValueIterator run() throws IOException, InterruptedException;
+
+ /** Release any resources the plugin still holds. */
+ public void close();
+
+ @InterfaceAudience.LimitedPrivate("mapreduce")
+ @InterfaceStability.Unstable
+ public static class Context<K,V> {
+ private final org.apache.hadoop.mapreduce.TaskAttemptID reduceId;
+ private final JobConf jobConf;
+ private final FileSystem localFS;
+ private final TaskUmbilicalProtocol umbilical;
+ private final LocalDirAllocator localDirAllocator;
+ private final Reporter reporter;
+ private final CompressionCodec codec;
+ private final Class<? extends Reducer> combinerClass;
+ private final CombineOutputCollector<K, V> combineCollector;
+ private final Counters.Counter spilledRecordsCounter;
+ private final Counters.Counter reduceCombineInputCounter;
+ private final Counters.Counter shuffledMapsCounter;
+ private final Counters.Counter reduceShuffleBytes;
+ private final Counters.Counter failedShuffleCounter;
+ private final Counters.Counter mergedMapOutputsCounter;
+ private final TaskStatus status;
+ private final Progress copyPhase;
+ private final Progress mergePhase;
+ private final Task reduceTask;
+ private final MapOutputFile mapOutputFile;
+
+ public Context(org.apache.hadoop.mapreduce.TaskAttemptID reduceId,
+ JobConf jobConf, FileSystem localFS,
+ TaskUmbilicalProtocol umbilical,
+ LocalDirAllocator localDirAllocator,
+ Reporter reporter, CompressionCodec codec,
+ Class<? extends Reducer> combinerClass,
+ CombineOutputCollector<K,V> combineCollector,
+ Counters.Counter spilledRecordsCounter,
+ Counters.Counter reduceCombineInputCounter,
+ Counters.Counter shuffledMapsCounter,
+ Counters.Counter reduceShuffleBytes,
+ Counters.Counter failedShuffleCounter,
+ Counters.Counter mergedMapOutputsCounter,
+ TaskStatus status, Progress copyPhase, Progress mergePhase,
+ Task reduceTask, MapOutputFile mapOutputFile) {
+ this.reduceId = reduceId;
+ this.jobConf = jobConf;
+ this.localFS = localFS;
+ this.umbilical = umbilical;
+ this.localDirAllocator = localDirAllocator;
+ this.reporter = reporter;
+ this.codec = codec;
+ this.combinerClass = combinerClass;
+ this.combineCollector = combineCollector;
+ this.spilledRecordsCounter = spilledRecordsCounter;
+ this.reduceCombineInputCounter = reduceCombineInputCounter;
+ this.shuffledMapsCounter = shuffledMapsCounter;
+ this.reduceShuffleBytes = reduceShuffleBytes;
+ this.failedShuffleCounter = failedShuffleCounter;
+ this.mergedMapOutputsCounter = mergedMapOutputsCounter;
+ this.status = status;
+ this.copyPhase = copyPhase;
+ this.mergePhase = mergePhase;
+ this.reduceTask = reduceTask;
+ this.mapOutputFile = mapOutputFile;
+ }
+
+ public org.apache.hadoop.mapreduce.TaskAttemptID getReduceId() {
+ return reduceId;
+ }
+ public JobConf getJobConf() {
+ return jobConf;
+ }
+ public FileSystem getLocalFS() {
+ return localFS;
+ }
+ public TaskUmbilicalProtocol getUmbilical() {
+ return umbilical;
+ }
+ public LocalDirAllocator getLocalDirAllocator() {
+ return localDirAllocator;
+ }
+ public Reporter getReporter() {
+ return reporter;
+ }
+ public CompressionCodec getCodec() {
+ return codec;
+ }
+ public Class<? extends Reducer> getCombinerClass() {
+ return combinerClass;
+ }
+ public CombineOutputCollector<K, V> getCombineCollector() {
+ return combineCollector;
+ }
+ public Counters.Counter getSpilledRecordsCounter() {
+ return spilledRecordsCounter;
+ }
+ public Counters.Counter getReduceCombineInputCounter() {
+ return reduceCombineInputCounter;
+ }
+ public Counters.Counter getShuffledMapsCounter() {
+ return shuffledMapsCounter;
+ }
+ public Counters.Counter getReduceShuffleBytes() {
+ return reduceShuffleBytes;
+ }
+ public Counters.Counter getFailedShuffleCounter() {
+ return failedShuffleCounter;
+ }
+ public Counters.Counter getMergedMapOutputsCounter() {
+ return mergedMapOutputsCounter;
+ }
+ public TaskStatus getStatus() {
+ return status;
+ }
+ public Progress getCopyPhase() {
+ return copyPhase;
+ }
+ public Progress getMergePhase() {
+ return mergePhase;
+ }
+ public Task getReduceTask() {
+ return reduceTask;
+ }
+ public MapOutputFile getMapOutputFile() {
+ return mapOutputFile;
+ }
+ } // end of public static class Context<K,V>
+
+}
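
For illustration, a third-party plugin would implement the three lifecycle methods above and keep the Context around for run(). A minimal, hypothetical skeleton (the package and class names are invented for this sketch; only the interface contract comes from the file above):

    package com.example.shuffle;  // hypothetical third-party package

    import java.io.IOException;
    import org.apache.hadoop.mapred.RawKeyValueIterator;
    import org.apache.hadoop.mapred.ShuffleConsumerPlugin;

    public class MyShufflePlugin<K, V> implements ShuffleConsumerPlugin<K, V> {
      private Context<K, V> context;

      @Override
      public void init(Context<K, V> context) {
        this.context = context;  // retain conf, counters, and phases for run()
      }

      @Override
      public RawKeyValueIterator run() throws IOException, InterruptedException {
        // Fetch map outputs over the alternate transport, merge them, and
        // return an iterator over the merged stream; left abstract here.
        throw new UnsupportedOperationException("sketch only");
      }

      @Override
      public void close() {
        // Release transport connections and buffers.
      }
    }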
@@ -85,6 +85,9 @@
public static final boolean SHUFFLE_SSL_ENABLED_DEFAULT = false;
+ public static final String SHUFFLE_CONSUMER_PLUGIN =
+ "mapreduce.job.reduce.shuffle.consumer.plugin.class";
+
/**
* Configuration key to enable/disable IFile readahead.
*/
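
With the key defined in MRConfig, a job can opt into an alternate implementation programmatically; a sketch, reusing the hypothetical MyShufflePlugin from above:

    // Per-job plugin selection; unset, ReduceTask falls back to Shuffle.class.
    JobConf job = new JobConf();
    job.setClass(MRConfig.SHUFFLE_CONSUMER_PLUGIN,
                 MyShufflePlugin.class,         // hypothetical implementation
                 ShuffleConsumerPlugin.class);  // interface it must implement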
@@ -34,73 +34,63 @@
import org.apache.hadoop.mapred.Task.CombineOutputCollector;
import org.apache.hadoop.mapred.TaskStatus;
import org.apache.hadoop.mapred.TaskUmbilicalProtocol;
+import org.apache.hadoop.mapred.ShuffleConsumerPlugin;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.util.Progress;
-@InterfaceAudience.Private
+@InterfaceAudience.LimitedPrivate("mapreduce")
@InterfaceStability.Unstable
@SuppressWarnings({"unchecked", "rawtypes"})
-public class Shuffle<K, V> implements ExceptionReporter {
+public class Shuffle<K, V> implements ShuffleConsumerPlugin<K, V>, ExceptionReporter {
private static final int PROGRESS_FREQUENCY = 2000;
private static final int MAX_EVENTS_TO_FETCH = 10000;
private static final int MIN_EVENTS_TO_FETCH = 100;
private static final int MAX_RPC_OUTSTANDING_EVENTS = 3000000;
- private final TaskAttemptID reduceId;
- private final JobConf jobConf;
- private final Reporter reporter;
- private final ShuffleClientMetrics metrics;
- private final TaskUmbilicalProtocol umbilical;
+ private ShuffleConsumerPlugin.Context context;
+
+ private TaskAttemptID reduceId;
+ private JobConf jobConf;
+ private Reporter reporter;
+ private ShuffleClientMetrics metrics;
+ private TaskUmbilicalProtocol umbilical;
- private final ShuffleScheduler<K,V> scheduler;
- private final MergeManager<K, V> merger;
+ private ShuffleScheduler<K,V> scheduler;
+ private MergeManager<K, V> merger;
private Throwable throwable = null;
private String throwingThreadName = null;
- private final Progress copyPhase;
- private final TaskStatus taskStatus;
- private final Task reduceTask; //Used for status updates
-
- public Shuffle(TaskAttemptID reduceId, JobConf jobConf, FileSystem localFS,
- TaskUmbilicalProtocol umbilical,
- LocalDirAllocator localDirAllocator,
- Reporter reporter,
- CompressionCodec codec,
- Class<? extends Reducer> combinerClass,
- CombineOutputCollector<K,V> combineCollector,
- Counters.Counter spilledRecordsCounter,
- Counters.Counter reduceCombineInputCounter,
- Counters.Counter shuffledMapsCounter,
- Counters.Counter reduceShuffleBytes,
- Counters.Counter failedShuffleCounter,
- Counters.Counter mergedMapOutputsCounter,
- TaskStatus status,
- Progress copyPhase,
- Progress mergePhase,
- Task reduceTask,
- MapOutputFile mapOutputFile) {
- this.reduceId = reduceId;
- this.jobConf = jobConf;
- this.umbilical = umbilical;
- this.reporter = reporter;
+ private Progress copyPhase;
+ private TaskStatus taskStatus;
+ private Task reduceTask; //Used for status updates
+
+ @Override
+ public void init(ShuffleConsumerPlugin.Context context) {
+ this.context = context;
+
+ this.reduceId = context.getReduceId();
+ this.jobConf = context.getJobConf();
+ this.umbilical = context.getUmbilical();
+ this.reporter = context.getReporter();
this.metrics = new ShuffleClientMetrics(reduceId, jobConf);
- this.copyPhase = copyPhase;
- this.taskStatus = status;
- this.reduceTask = reduceTask;
+ this.copyPhase = context.getCopyPhase();
+ this.taskStatus = context.getStatus();
+ this.reduceTask = context.getReduceTask();
scheduler =
- new ShuffleScheduler<K,V>(jobConf, status, this, copyPhase,
- shuffledMapsCounter,
- reduceShuffleBytes, failedShuffleCounter);
- merger = new MergeManager<K, V>(reduceId, jobConf, localFS,
- localDirAllocator, reporter, codec,
- combinerClass, combineCollector,
- spilledRecordsCounter,
- reduceCombineInputCounter,
- mergedMapOutputsCounter,
- this, mergePhase, mapOutputFile);
+ new ShuffleScheduler<K,V>(jobConf, taskStatus, this, copyPhase,
+ context.getShuffledMapsCounter(),
+ context.getReduceShuffleBytes(), context.getFailedShuffleCounter());
+ merger = new MergeManager<K, V>(reduceId, jobConf, context.getLocalFS(),
+ context.getLocalDirAllocator(), reporter, context.getCodec(),
+ context.getCombinerClass(), context.getCombineCollector(),
+ context.getSpilledRecordsCounter(),
+ context.getReduceCombineInputCounter(),
+ context.getMergedMapOutputsCounter(),
+ this, context.getMergePhase(), context.getMapOutputFile());
}
+ @Override
public RawKeyValueIterator run() throws IOException, InterruptedException {
// Scale the maximum events we fetch per RPC call to mitigate OOM issues
// on the ApplicationMaster when a thundering herd of reducers fetch events
@@ -171,6 +161,10 @@ public RawKeyValueIterator run() throws IOException, InterruptedException {
return kvIter;
}
+ @Override
+ public void close() {
+ }
+
public synchronized void reportException(Throwable t) {
if (throwable == null) {
throwable = t;
@@ -748,6 +748,16 @@
</description>
</property>
+<property>
+ <name>mapreduce.job.reduce.shuffle.consumer.plugin.class</name>
+ <value>org.apache.hadoop.mapreduce.task.reduce.Shuffle</value>
+ <description>
+ Name of the class whose instance will be used
+ to send shuffle requests by reduce tasks of this job.
+ The class must implement org.apache.hadoop.mapred.ShuffleConsumerPlugin.
+ </description>
+</property>
+
<!-- MR YARN Application properties -->
<property>
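
The same property can also be overridden per job from the command line, assuming the job's driver runs through ToolRunner/GenericOptionsParser so -D options are applied (jar, driver, and path names below are placeholders):

    hadoop jar myjob.jar MyDriver \
      -D mapreduce.job.reduce.shuffle.consumer.plugin.class=com.example.shuffle.MyShufflePlugin \
      input output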