MAPREDUCE-6143. add configuration for mapreduce speculative execution in MR2 (zxu via rkanter)
rkanter committed Feb 2, 2015
1 parent d085eb1 commit 8acc5e9
Showing 6 changed files with 130 additions and 25 deletions.
3 changes: 3 additions & 0 deletions hadoop-mapreduce-project/CHANGES.txt
@@ -268,6 +268,9 @@ Release 2.7.0 - UNRELEASED

MAPREDUCE-6151. Update document of GridMix (Masatake Iwasaki via aw)

MAPREDUCE-6143. add configuration for mapreduce speculative execution in
MR2 (zxu via rkanter)

OPTIMIZATIONS

MAPREDUCE-6169. MergeQueue should release reference to the current item
@@ -51,6 +51,7 @@
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.util.Clock;

import com.google.common.annotations.VisibleForTesting;

public class DefaultSpeculator extends AbstractService implements
Speculator {
@@ -62,12 +63,11 @@ public class DefaultSpeculator extends AbstractService implements
private static final long NOT_RUNNING = Long.MIN_VALUE + 4;
private static final long TOO_LATE_TO_SPECULATE = Long.MIN_VALUE + 5;

private static final long SOONEST_RETRY_AFTER_NO_SPECULATE = 1000L * 1L;
private static final long SOONEST_RETRY_AFTER_SPECULATE = 1000L * 15L;

private static final double PROPORTION_RUNNING_TASKS_SPECULATABLE = 0.1;
private static final double PROPORTION_TOTAL_TASKS_SPECULATABLE = 0.01;
private static final int MINIMUM_ALLOWED_SPECULATIVE_TASKS = 10;
private long soonestRetryAfterNoSpeculate;
private long soonestRetryAfterSpeculate;
private double proportionRunningTasksSpeculatable;
private double proportionTotalTasksSpeculatable;
private int minimumAllowedSpeculativeTasks;

private static final Log LOG = LogFactory.getLog(DefaultSpeculator.class);

@@ -163,6 +163,21 @@ public DefaultSpeculator(Configuration conf, AppContext context, Clock clock) {
this.estimator = estimator;
this.clock = clock;
this.eventHandler = context.getEventHandler();
this.soonestRetryAfterNoSpeculate =
conf.getLong(MRJobConfig.SPECULATIVE_RETRY_AFTER_NO_SPECULATE,
MRJobConfig.DEFAULT_SPECULATIVE_RETRY_AFTER_NO_SPECULATE);
this.soonestRetryAfterSpeculate =
conf.getLong(MRJobConfig.SPECULATIVE_RETRY_AFTER_SPECULATE,
MRJobConfig.DEFAULT_SPECULATIVE_RETRY_AFTER_SPECULATE);
this.proportionRunningTasksSpeculatable =
conf.getDouble(MRJobConfig.SPECULATIVECAP_RUNNING_TASKS,
MRJobConfig.DEFAULT_SPECULATIVECAP_RUNNING_TASKS);
this.proportionTotalTasksSpeculatable =
conf.getDouble(MRJobConfig.SPECULATIVECAP_TOTAL_TASKS,
MRJobConfig.DEFAULT_SPECULATIVECAP_TOTAL_TASKS);
this.minimumAllowedSpeculativeTasks =
conf.getInt(MRJobConfig.SPECULATIVE_MINIMUM_ALLOWED_TASKS,
MRJobConfig.DEFAULT_SPECULATIVE_MINIMUM_ALLOWED_TASKS);
}

/* ************************************************************* */
@@ -182,8 +197,8 @@ public void run() {
try {
int speculations = computeSpeculations();
long mininumRecomp
= speculations > 0 ? SOONEST_RETRY_AFTER_SPECULATE
: SOONEST_RETRY_AFTER_NO_SPECULATE;
= speculations > 0 ? soonestRetryAfterSpeculate
: soonestRetryAfterNoSpeculate;

long wait = Math.max(mininumRecomp,
clock.getTime() - backgroundRunStartTime);
@@ -497,8 +512,8 @@ private int maybeScheduleASpeculation(TaskType type) {
Map<TaskId, Task> tasks = job.getTasks(type);

int numberAllowedSpeculativeTasks
= (int) Math.max(MINIMUM_ALLOWED_SPECULATIVE_TASKS,
PROPORTION_TOTAL_TASKS_SPECULATABLE * tasks.size());
= (int) Math.max(minimumAllowedSpeculativeTasks,
proportionTotalTasksSpeculatable * tasks.size());

TaskId bestTaskID = null;
long bestSpeculationValue = -1L;
@@ -523,7 +538,7 @@ private int maybeScheduleASpeculation(TaskType type) {
}
numberAllowedSpeculativeTasks
= (int) Math.max(numberAllowedSpeculativeTasks,
PROPORTION_RUNNING_TASKS_SPECULATABLE * numberRunningTasks);
proportionRunningTasksSpeculatable * numberRunningTasks);

// If we found a speculation target, fire it off
if (bestTaskID != null
@@ -583,4 +598,29 @@ public void resetHeartBeatTime(long lastHeartBeatTime) {
this.lastHeartBeatTime = lastHeartBeatTime;
}
}

@VisibleForTesting
public long getSoonestRetryAfterNoSpeculate() {
return soonestRetryAfterNoSpeculate;
}

@VisibleForTesting
public long getSoonestRetryAfterSpeculate() {
return soonestRetryAfterSpeculate;
}

@VisibleForTesting
public double getProportionRunningTasksSpeculatable() {
return proportionRunningTasksSpeculatable;
}

@VisibleForTesting
public double getProportionTotalTasksSpeculatable() {
return proportionTotalTasksSpeculatable;
}

@VisibleForTesting
public int getMinimumAllowedSpeculativeTasks() {
return minimumAllowedSpeculativeTasks;
}
}
@@ -36,6 +36,7 @@
import org.apache.hadoop.mapred.TaskCompletionEvent;
import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.JobACL;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.v2.api.records.AMInfo;
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
import org.apache.hadoop.mapreduce.v2.api.records.JobReport;
@@ -137,7 +138,22 @@ public class TestRuntimeEstimators {

estimator.contextualize(conf, myAppContext);

conf.setLong(MRJobConfig.SPECULATIVE_RETRY_AFTER_NO_SPECULATE, 500L);
conf.setLong(MRJobConfig.SPECULATIVE_RETRY_AFTER_SPECULATE, 5000L);
conf.setDouble(MRJobConfig.SPECULATIVECAP_RUNNING_TASKS, 0.1);
conf.setDouble(MRJobConfig.SPECULATIVECAP_TOTAL_TASKS, 0.001);
conf.setInt(MRJobConfig.SPECULATIVE_MINIMUM_ALLOWED_TASKS, 5);
speculator = new DefaultSpeculator(conf, myAppContext, estimator, clock);
Assert.assertEquals("wrong SPECULATIVE_RETRY_AFTER_NO_SPECULATE value",
500L, speculator.getSoonestRetryAfterNoSpeculate());
Assert.assertEquals("wrong SPECULATIVE_RETRY_AFTER_SPECULATE value",
5000L, speculator.getSoonestRetryAfterSpeculate());
Assert.assertEquals(speculator.getProportionRunningTasksSpeculatable(),
0.1, 0.00001);
Assert.assertEquals(speculator.getProportionTotalTasksSpeculatable(),
0.001, 0.00001);
Assert.assertEquals("wrong SPECULATIVE_MINIMUM_ALLOWED_TASKS value",
5, speculator.getMinimumAllowedSpeculativeTasks());

dispatcher.register(Speculator.EventType.class, speculator);

@@ -86,12 +86,41 @@ public interface MRJobConfig {

public static final String SKIP_OUTDIR = "mapreduce.job.skip.outdir";

// SPECULATIVE_SLOWNODE_THRESHOLD is obsolete and will be deleted in the future
@Deprecated
public static final String SPECULATIVE_SLOWNODE_THRESHOLD = "mapreduce.job.speculative.slownodethreshold";

public static final String SPECULATIVE_SLOWTASK_THRESHOLD = "mapreduce.job.speculative.slowtaskthreshold";

// SPECULATIVECAP is obsolete and will be deleted in the future
@Deprecated
public static final String SPECULATIVECAP = "mapreduce.job.speculative.speculativecap";

public static final String SPECULATIVECAP_RUNNING_TASKS =
"mapreduce.job.speculative.speculative-cap-running-tasks";
public static final double DEFAULT_SPECULATIVECAP_RUNNING_TASKS =
0.1;

public static final String SPECULATIVECAP_TOTAL_TASKS =
"mapreduce.job.speculative.speculative-cap-total-tasks";
public static final double DEFAULT_SPECULATIVECAP_TOTAL_TASKS =
0.01;

public static final String SPECULATIVE_MINIMUM_ALLOWED_TASKS =
"mapreduce.job.speculative.minimum-allowed-tasks";
public static final int DEFAULT_SPECULATIVE_MINIMUM_ALLOWED_TASKS =
10;

public static final String SPECULATIVE_RETRY_AFTER_NO_SPECULATE =
"mapreduce.job.speculative.retry-after-no-speculate";
public static final long DEFAULT_SPECULATIVE_RETRY_AFTER_NO_SPECULATE =
1000L;

public static final String SPECULATIVE_RETRY_AFTER_SPECULATE =
"mapreduce.job.speculative.retry-after-speculate";
public static final long DEFAULT_SPECULATIVE_RETRY_AFTER_SPECULATE =
15000L;

public static final String JOB_LOCAL_DIR = "mapreduce.job.local.dir";

public static final String OUTPUT_KEY_CLASS = "mapreduce.job.output.key.class";
@@ -214,12 +214,10 @@ private static void addDeprecatedKeys() {
MRJobConfig.SKIP_RECORDS),
new DeprecationDelta("mapred.skip.out.dir",
MRJobConfig.SKIP_OUTDIR),
new DeprecationDelta("mapred.speculative.execution.slowNodeThreshold",
MRJobConfig.SPECULATIVE_SLOWNODE_THRESHOLD),
new DeprecationDelta("mapred.speculative.execution.slowTaskThreshold",
MRJobConfig.SPECULATIVE_SLOWTASK_THRESHOLD),
new DeprecationDelta("mapred.speculative.execution.speculativeCap",
MRJobConfig.SPECULATIVECAP),
MRJobConfig.SPECULATIVECAP_RUNNING_TASKS),
new DeprecationDelta("job.local.dir",
MRJobConfig.JOB_LOCAL_DIR),
new DeprecationDelta("mapreduce.inputformat.class",
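
Note (not part of the commit): the delta above routes the old mapred.speculative.execution.speculativeCap key to the new running-tasks cap. A minimal sketch of what that mapping implies for client code, assuming the MR deprecation table has been loaded (constructing a JobConf triggers ConfigUtil.loadResources()); the class name and the 0.2 value are illustrative only:

import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.MRJobConfig;

public class SpeculativeCapDeprecationExample {
  public static void main(String[] args) {
    // Constructing a JobConf registers the MR deprecation deltas shown above.
    JobConf conf = new JobConf();
    // Set the value under the deprecated key name.
    conf.set("mapred.speculative.execution.speculativeCap", "0.2");
    // Expected to print 0.2: the value is readable under the new key.
    System.out.println(conf.get(MRJobConfig.SPECULATIVECAP_RUNNING_TASKS));
  }
}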
@@ -490,13 +490,42 @@
<description>If true, then multiple instances of some reduce tasks
may be executed in parallel.</description>
</property>

<property>
<name>mapreduce.job.speculative.speculativecap</name>
<name>mapreduce.job.speculative.speculative-cap-running-tasks</name>
<value>0.1</value>
<description>The max percent (0-1) of running tasks that
can be speculatively re-executed at any time.</description>
</property>

<property>
<name>mapreduce.job.speculative.speculative-cap-total-tasks</name>
<value>0.01</value>
<description>The max percent (0-1) of all tasks that
can be speculatively re-executed at any time.</description>
</property>

<property>
<name>mapreduce.job.speculative.minimum-allowed-tasks</name>
<value>10</value>
<description>The minimum allowed tasks that
can be speculatively re-executed at any time.</description>
</property>

<property>
<name>mapreduce.job.speculative.retry-after-no-speculate</name>
<value>1000</value>
<description>The waiting time(ms) to do next round of speculation
if there is no task speculated in this round.</description>
</property>

<property>
<name>mapreduce.job.speculative.retry-after-speculate</name>
<value>15000</value>
<description>The waiting time(ms) to do next round of speculation
if there are tasks speculated in this round.</description>
</property>

<property>
<name>mapreduce.job.map.output.collector.class</name>
<value>org.apache.hadoop.mapred.MapTask$MapOutputBuffer</value>
@@ -516,16 +545,6 @@
</description>
</property>

<property>
<name>mapreduce.job.speculative.slownodethreshold</name>
<value>1.0</value>
<description>The number of standard deviations by which a Task
Tracker's average map and reduce progress-rates (finishTime-dispatchTime)
must be lower than the average of all successful map/reduce task's for
the NodeManager to be considered too slow to give a speculative task to.
</description>
</property>

<property>
<name>mapreduce.job.ubertask.enable</name>
<value>false</value>
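
Note (not part of the commit): the knobs added to mapred-default.xml above can also be tuned per job through the MRJobConfig constants introduced in this change. A minimal sketch under that assumption; the key names come straight from the diff, while the class name and the values chosen here are arbitrary examples:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.MRJobConfig;

public class SpeculationTuningExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Wait 2s before the next speculation round when nothing was speculated,
    // and 30s after a round that did launch a speculative attempt.
    conf.setLong(MRJobConfig.SPECULATIVE_RETRY_AFTER_NO_SPECULATE, 2000L);
    conf.setLong(MRJobConfig.SPECULATIVE_RETRY_AFTER_SPECULATE, 30000L);
    // Cap speculative attempts at 5% of running tasks and 1% of all tasks,
    // but always allow at least 10 speculative tasks.
    conf.setDouble(MRJobConfig.SPECULATIVECAP_RUNNING_TASKS, 0.05);
    conf.setDouble(MRJobConfig.SPECULATIVECAP_TOTAL_TASKS, 0.01);
    conf.setInt(MRJobConfig.SPECULATIVE_MINIMUM_ALLOWED_TASKS, 10);

    Job job = Job.getInstance(conf, "speculation-tuning-example");
    // ... set input/output formats, mapper, reducer, paths, then submit.
  }
}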
