
HADOOP-5881. Simplify memory monitoring and scheduling related configuration. Contributed by Vinod Kumar Vavilapalli.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/core/branches/branch-0.20@778700 13f79535-47bb-0310-9956-ffa450edef68
Commit fe2cd693c8b9550f748e72c6d5a0db745097b7a4 (1 parent: 4a33ac5), committed by Hemanth Yamijala on May 26, 2009
@@ -7,6 +7,9 @@ Release 0.20.1 - Unreleased
HADOOP-5726. Remove pre-emption from capacity scheduler code base.
(Rahul Kumar Singh via yhemanth)
+ HADOOP-5881. Simplify memory monitoring and scheduling related
+ configuration. (Vinod Kumar Vavilapalli via yhemanth)
+
NEW FEATURES
IMPROVEMENTS
@@ -56,34 +56,6 @@
account in scheduling decisions by default in a job queue.
</description>
</property>
-
- <property>
- <name>mapred.capacity-scheduler.task.default-pmem-percentage-in-vmem</name>
- <value>-1</value>
- <description>A percentage (float) of the default VM limit for jobs
- (mapred.task.default.maxvm). This is the default RAM task-limit
- associated with a task. Unless overridden by a job's setting, this
- number defines the RAM task-limit.
-
- If this property is missing, or set to an invalid value, scheduling
- based on physical memory, RAM, is disabled.
- </description>
- </property>
-
- <property>
- <name>mapred.capacity-scheduler.task.limit.maxpmem</name>
- <value>-1</value>
- <description>Configuration that provides an upper limit on the maximum
- physical memory that can be specified by a job. The job configuration
- mapred.task.maxpmem should be less than this value. If not, the job will
- be rejected by the scheduler.
-
- If it is set to -1, scheduler will not consider physical memory for
- scheduling even if virtual memory based scheduling is enabled(by setting
- valid values for both mapred.task.default.maxvmem and
- mapred.task.limit.maxvmem).
- </description>
- </property>
<property>
<name>mapred.capacity-scheduler.default-minimum-user-limit-percent</name>
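
The two properties deleted above have no direct replacement in this file; the caps move to cluster-level JobTracker settings that the scheduler reads itself (see the CapacityTaskScheduler changes below). A minimal sketch of the surviving intent, with purely illustrative numbers and a hypothetical per-job request value:

    // Hedged sketch, not scheduler code: the deleted caps survive as cluster-wide
    // limits; -1 (JobConf.DISABLED_MEMORY_LIMIT) still means "no limit / disabled".
    long requestedMapMemMB = 3072L;        // hypothetical per-map-task memory request from a job
    long limitMaxMemForMapTasks = 2048L;   // cluster-wide cap, now read from JobTracker configuration
    boolean rejectJob =
        limitMaxMemForMapTasks != JobConf.DISABLED_MEMORY_LIMIT
            && requestedMapMemMB > limitMaxMemForMapTasks;   // such a job cannot run on this cluster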
@@ -351,44 +351,4 @@ public void setMaxWorkerThreads(int poolSize) {
rmConf.setInt(
"mapred.capacity-scheduler.init-worker-threads", poolSize);
}
-
- /**
- * Get the upper limit on the maximum physical memory that can be specified by
- * a job.
- *
- * @return upper limit for max pmem for tasks.
- */
- public long getLimitMaxPmemForTasks() {
- return rmConf.getLong(UPPER_LIMIT_ON_TASK_PMEM_PROPERTY,
- JobConf.DISABLED_MEMORY_LIMIT);
- }
-
- /**
- * Get the upper limit on the maximum physical memory that can be specified by
- * a job.
- *
- * @param value
- */
- public void setLimitMaxPmemForTasks(long value) {
- rmConf.setLong(UPPER_LIMIT_ON_TASK_PMEM_PROPERTY, value);
- }
-
- /**
- * Get cluster-wide default percentage of pmem in vmem.
- *
- * @return cluster-wide default percentage of pmem in vmem.
- */
- public float getDefaultPercentOfPmemInVmem() {
- return rmConf.getFloat(DEFAULT_PERCENTAGE_OF_PMEM_IN_VMEM_PROPERTY,
- JobConf.DISABLED_MEMORY_LIMIT);
- }
-
- /**
- * Set cluster-wide default percentage of pmem in vmem.
- *
- * @param value
- */
- public void setDefaultPercentOfPmemInVmem(float value) {
- rmConf.setFloat(DEFAULT_PERCENTAGE_OF_PMEM_IN_VMEM_PROPERTY, value);
- }
}
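
Callers that used these removed accessors now take per-task-type values from the scheduler instead. A hedged sketch of the migration; the getter names are the ones added later in this diff, and "scheduler" is assumed to be a CapacityTaskScheduler reference:

    // Before (removed above):
    //   long pmemLimit = schedConf.getLimitMaxPmemForTasks();
    // After: per-type limits, sourced from JobTracker-level configuration.
    long mapTaskMemLimitMB    = scheduler.getLimitMaxMemForMapSlot();
    long reduceTaskMemLimitMB = scheduler.getLimitMaxMemForReduceSlot();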
@@ -278,11 +278,7 @@ LookUpStatus getLookUpStatus() {
/** our TaskScheduler object */
protected CapacityTaskScheduler scheduler;
- // can be replaced with a global type, if we have one
- protected static enum TYPE {
- MAP, REDUCE
- }
- protected TYPE type = null;
+ protected CapacityTaskScheduler.TYPE type = null;
abstract Task obtainNewTask(TaskTrackerStatus taskTracker,
JobInProgress job) throws IOException;
@@ -413,7 +409,8 @@ private TaskLookupResult getTaskFromQueue(TaskTrackerStatus taskTracker,
//If this job meets memory requirements. Ask the JobInProgress for
//a task to be scheduled on the task tracker.
//if we find a job then we pass it on.
- if (scheduler.memoryMatcher.matchesMemoryRequirements(j, taskTracker)) {
+ if (scheduler.memoryMatcher.matchesMemoryRequirements(j, type,
+ taskTracker)) {
// We found a suitable job. Get task from it.
Task t = obtainNewTask(taskTracker, j);
//if there is a task return it immediately.
@@ -422,6 +419,8 @@ private TaskLookupResult getTaskFromQueue(TaskTrackerStatus taskTracker,
return TaskLookupResult.getTaskFoundResult(t);
} else {
//skip to the next job in the queue.
+ LOG.debug("Job " + j.getJobID().toString()
+ + " returned no tasks of type " + type);
continue;
}
} else {
@@ -456,7 +455,8 @@ private TaskLookupResult getTaskFromQueue(TaskTrackerStatus taskTracker,
if (j.getStatus().getRunState() != JobStatus.RUNNING) {
continue;
}
- if (scheduler.memoryMatcher.matchesMemoryRequirements(j, taskTracker)) {
+ if (scheduler.memoryMatcher.matchesMemoryRequirements(j, type,
+ taskTracker)) {
// We found a suitable job. Get task from it.
Task t = obtainNewTask(taskTracker, j);
//if there is a task return it immediately.
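
Both call sites now tell the matcher which task type they are scheduling, so map and reduce requests can be checked against their own slot sizes. A rough sketch of the idea, not the actual MemoryMatcher implementation; requestedMemForTask is a placeholder for however the job's per-task request is looked up, and "scheduler" is the CapacityTaskScheduler field of TaskSchedulingMgr:

    // Hedged sketch of type-aware matching (illustrative only).
    boolean fits(JobInProgress j, CapacityTaskScheduler.TYPE type, long freeMemOnTrackerMB) {
      long slotSizeMB = (type == CapacityTaskScheduler.TYPE.MAP)
          ? scheduler.getMemSizeForMapSlot()
          : scheduler.getMemSizeForReduceSlot();
      long requestedMB = requestedMemForTask(j, type);   // placeholder helper
      long neededMB = (requestedMB == JobConf.DISABLED_MEMORY_LIMIT) ? slotSizeMB : requestedMB;
      return neededMB <= freeMemOnTrackerMB;
    }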
@@ -561,7 +561,7 @@ boolean hasSpeculativeTask(TaskInProgress[] tips, float progress,
private static class MapSchedulingMgr extends TaskSchedulingMgr {
MapSchedulingMgr(CapacityTaskScheduler dad) {
super(dad);
- type = TaskSchedulingMgr.TYPE.MAP;
+ type = CapacityTaskScheduler.TYPE.MAP;
queueComparator = mapComparator;
}
Task obtainNewTask(TaskTrackerStatus taskTracker, JobInProgress job)
@@ -603,7 +603,7 @@ boolean hasSpeculativeTask(JobInProgress job, TaskTrackerStatus tts) {
private static class ReduceSchedulingMgr extends TaskSchedulingMgr {
ReduceSchedulingMgr(CapacityTaskScheduler dad) {
super(dad);
- type = TaskSchedulingMgr.TYPE.REDUCE;
+ type = CapacityTaskScheduler.TYPE.REDUCE;
queueComparator = reduceComparator;
}
Task obtainNewTask(TaskTrackerStatus taskTracker, JobInProgress job)
@@ -664,13 +664,18 @@ long getTime() {
return System.currentTimeMillis();
}
}
+ // can be replaced with a global type, if we have one
+ protected static enum TYPE {
+ MAP, REDUCE
+ }
+
private Clock clock;
private JobInitializationPoller initializationPoller;
- long limitMaxVmemForTasks;
- long limitMaxPmemForTasks;
- long defaultMaxVmPerTask;
- float defaultPercentOfPmemInVmem;
+ private long memSizeForMapSlotOnJT;
+ private long memSizeForReduceSlotOnJT;
+ private long limitMaxMemForMapTasks;
+ private long limitMaxMemForReduceTasks;
public CapacityTaskScheduler() {
this(new Clock());
@@ -687,37 +692,45 @@ public void setResourceManagerConf(CapacitySchedulerConf conf) {
this.schedConf = conf;
}
- /**
- * Normalize the negative values in configuration
- *
- * @param val
- * @return normalized value
- */
- private long normalizeMemoryConfigValue(long val) {
- if (val < 0) {
- val = JobConf.DISABLED_MEMORY_LIMIT;
- }
- return val;
- }
-
private void initializeMemoryRelatedConf() {
- limitMaxVmemForTasks =
- normalizeMemoryConfigValue(conf.getLong(
- JobConf.UPPER_LIMIT_ON_TASK_VMEM_PROPERTY,
+ memSizeForMapSlotOnJT =
+ JobConf.normalizeMemoryConfigValue(conf.getLong(
+ JobTracker.MAPRED_CLUSTER_MAP_MEMORY_MB_PROPERTY,
JobConf.DISABLED_MEMORY_LIMIT));
+ memSizeForReduceSlotOnJT =
+ JobConf.normalizeMemoryConfigValue(conf.getLong(
+ JobTracker.MAPRED_CLUSTER_REDUCE_MEMORY_MB_PROPERTY,
+ JobConf.DISABLED_MEMORY_LIMIT));
+ limitMaxMemForMapTasks =
+ JobConf.normalizeMemoryConfigValue(conf.getLong(
+ JobTracker.MAPRED_CLUSTER_MAX_MAP_MEMORY_MB_PROPERTY,
+ JobConf.DISABLED_MEMORY_LIMIT));
+ limitMaxMemForReduceTasks =
+ JobConf.normalizeMemoryConfigValue(conf.getLong(
+ JobTracker.MAPRED_CLUSTER_MAX_REDUCE_MEMORY_MB_PROPERTY,
+ JobConf.DISABLED_MEMORY_LIMIT));
+ LOG.info(new StringBuilder().append("Scheduler configured with ").append(
+ "(memSizeForMapSlotOnJT, memSizeForReduceSlotOnJT,").append(
+ " limitMaxMemForMapTasks, limitMaxMemForReduceTasks)").append(
+ memSizeForMapSlotOnJT).append(", ").append(memSizeForReduceSlotOnJT)
+ .append(", ").append(limitMaxMemForMapTasks).append(", ").append(
+ limitMaxMemForReduceTasks).append(")"));
+ }
- limitMaxPmemForTasks =
- normalizeMemoryConfigValue(schedConf.getLimitMaxPmemForTasks());
+ long getMemSizeForMapSlot() {
+ return memSizeForMapSlotOnJT;
+ }
- defaultMaxVmPerTask =
- normalizeMemoryConfigValue(conf.getLong(
- JobConf.MAPRED_TASK_DEFAULT_MAXVMEM_PROPERTY,
- JobConf.DISABLED_MEMORY_LIMIT));
+ long getMemSizeForReduceSlot() {
+ return memSizeForReduceSlotOnJT;
+ }
- defaultPercentOfPmemInVmem = schedConf.getDefaultPercentOfPmemInVmem();
- if (defaultPercentOfPmemInVmem < 0) {
- defaultPercentOfPmemInVmem = JobConf.DISABLED_MEMORY_LIMIT;
- }
+ long getLimitMaxMemForMapSlot() {
+ return limitMaxMemForMapTasks;
+ }
+
+ long getLimitMaxMemForReduceSlot() {
+ return limitMaxMemForReduceTasks;
}
@Override
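
For reference, a sketch of how the four values read by initializeMemoryRelatedConf could be supplied. The property constants are the ones used above; the package declaration is only there so they resolve, and the numbers are purely illustrative:

    package org.apache.hadoop.mapred; // assumed, so the JobTracker constants are visible

    import org.apache.hadoop.conf.Configuration;

    public class MemoryConfSketch {
      public static Configuration clusterMemoryConf() {
        Configuration conf = new Configuration();
        conf.setLong(JobTracker.MAPRED_CLUSTER_MAP_MEMORY_MB_PROPERTY, 1024L);        // memory per map slot
        conf.setLong(JobTracker.MAPRED_CLUSTER_REDUCE_MEMORY_MB_PROPERTY, 2048L);     // memory per reduce slot
        conf.setLong(JobTracker.MAPRED_CLUSTER_MAX_MAP_MEMORY_MB_PROPERTY, 4096L);    // cap on a job's map-task request
        conf.setLong(JobTracker.MAPRED_CLUSTER_MAX_REDUCE_MEMORY_MB_PROPERTY, 8192L); // cap on a job's reduce-task request
        return conf;
      }
    }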
@@ -955,14 +968,12 @@ private synchronized void updateQSIObjects(int mapClusterCapacity,
// found a task; return
return Collections.singletonList(tlr.getTask());
}
- else if (TaskLookupResult.LookUpStatus.TASK_FAILING_MEMORY_REQUIREMENT ==
- tlr.getLookUpStatus()) {
- // return no task
- return null;
- }
// if we didn't get any, look at map tasks, if TT has space
- else if ((TaskLookupResult.LookUpStatus.NO_TASK_FOUND ==
- tlr.getLookUpStatus()) && (maxMapTasks > currentMapTasks)) {
+ else if ((TaskLookupResult.LookUpStatus.TASK_FAILING_MEMORY_REQUIREMENT
+ == tlr.getLookUpStatus() ||
+ TaskLookupResult.LookUpStatus.NO_TASK_FOUND
+ == tlr.getLookUpStatus())
+ && (maxMapTasks > currentMapTasks)) {
mapScheduler.updateCollectionOfQSIs();
tlr = mapScheduler.assignTasks(taskTracker);
if (TaskLookupResult.LookUpStatus.TASK_FOUND ==
@@ -980,13 +991,12 @@ else if ((TaskLookupResult.LookUpStatus.NO_TASK_FOUND ==
// found a task; return
return Collections.singletonList(tlr.getTask());
}
- else if (TaskLookupResult.LookUpStatus.TASK_FAILING_MEMORY_REQUIREMENT ==
- tlr.getLookUpStatus()) {
- return null;
- }
// if we didn't get any, look at reduce tasks, if TT has space
- else if ((TaskLookupResult.LookUpStatus.NO_TASK_FOUND ==
- tlr.getLookUpStatus()) && (maxReduceTasks > currentReduceTasks)) {
+ else if ((TaskLookupResult.LookUpStatus.TASK_FAILING_MEMORY_REQUIREMENT
+ == tlr.getLookUpStatus()
+ || TaskLookupResult.LookUpStatus.NO_TASK_FOUND
+ == tlr.getLookUpStatus())
+ && (maxReduceTasks > currentReduceTasks)) {
reduceScheduler.updateCollectionOfQSIs();
tlr = reduceScheduler.assignTasks(taskTracker);
if (TaskLookupResult.LookUpStatus.TASK_FOUND ==
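
The behavioral change in these two hunks: a memory-requirement failure for one task type no longer ends the heartbeat with no assignment; the scheduler now falls through and tries the other type if the tracker has free slots of it. Roughly, for the branch where reduces are tried first (lookupFailed is a stand-in for the two status checks):

    // Hedged pseudocode of the revised fallback, not the literal control flow.
    TaskLookupResult tlr = reduceScheduler.assignTasks(taskTracker);
    if (lookupFailed(tlr)                    // NO_TASK_FOUND or TASK_FAILING_MEMORY_REQUIREMENT
        && maxMapTasks > currentMapTasks) {  // tracker still has a free map slot
      mapScheduler.updateCollectionOfQSIs();
      tlr = mapScheduler.assignTasks(taskTracker);
    }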
@@ -999,38 +1009,6 @@ else if ((TaskLookupResult.LookUpStatus.NO_TASK_FOUND ==
return null;
}
- /**
- * Kill the job if it has invalid requirements and return why it is killed
- *
- * @param job
- * @return string mentioning why the job is killed. Null if the job has valid
- * requirements.
- */
- private String killJobIfInvalidRequirements(JobInProgress job) {
- if (!memoryMatcher.isSchedulingBasedOnVmemEnabled()) {
- return null;
- }
- if ((job.getMaxVirtualMemoryForTask() > limitMaxVmemForTasks)
- || (memoryMatcher.isSchedulingBasedOnPmemEnabled() && (job
- .getMaxPhysicalMemoryForTask() > limitMaxPmemForTasks))) {
- String msg =
- job.getJobID() + " (" + job.getMaxVirtualMemoryForTask() + "vmem, "
- + job.getMaxPhysicalMemoryForTask()
- + "pmem) exceeds the cluster's max-memory-limits ("
- + limitMaxVmemForTasks + "vmem, " + limitMaxPmemForTasks
- + "pmem). Cannot run in this cluster, so killing it.";
- LOG.warn(msg);
- try {
- taskTrackerManager.killJob(job.getJobID());
- return msg;
- } catch (IOException ioe) {
- LOG.warn("Failed to kill the job " + job.getJobID() + ". Reason : "
- + StringUtils.stringifyException(ioe));
- }
- }
- return null;
- }
-
// called when a job is added
synchronized void jobAdded(JobInProgress job) throws IOException {
QueueSchedulingInfo qsi =
@@ -1050,13 +1028,6 @@ synchronized void jobAdded(JobInProgress job) throws IOException {
qsi.numJobsByUser.put(job.getProfile().getUser(), i);
LOG.debug("Job " + job.getJobID().toString() + " is added under user "
+ job.getProfile().getUser() + ", user now has " + i + " jobs");
-
- // Kill the job if it cannot run in the cluster because of invalid
- // resource requirements.
- String statusMsg = killJobIfInvalidRequirements(job);
- if (statusMsg != null) {
- throw new IOException(statusMsg);
- }
}
// called when a job completes