Skip to content

Commit

Permalink
YARN-3508. Prevent processing preemption events on the main RM dispatcher. (Varun Saxena via wangda)
Browse files Browse the repository at this point in the history
  • Loading branch information
wangdatan committed Jul 2, 2015
1 parent 152e5df commit 0e4b066
Show file tree
Hide file tree
Showing 12 changed files with 161 additions and 102 deletions.
3 changes: 3 additions & 0 deletions hadoop-yarn-project/CHANGES.txt
Expand Up @@ -601,6 +601,9 @@ Release 2.7.2 - UNRELEASED
YARN-3793. Several NPEs when deleting local files on NM recovery (Varun
Saxena via jlowe)

YARN-3508. Prevent processing preemption events on the main RM dispatcher.
(Varun Saxena via wangda)

Release 2.7.1 - UNRELEASED

INCOMPATIBLE CHANGES
Expand Down
Expand Up @@ -76,7 +76,6 @@
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType;
Expand Down Expand Up @@ -614,9 +613,6 @@ protected void createPolicyMonitors() {
YarnConfiguration.RM_SCHEDULER_MONITOR_POLICIES,
SchedulingEditPolicy.class);
if (policies.size() > 0) {
rmDispatcher.register(ContainerPreemptEventType.class,
new RMContainerPreemptEventDispatcher(
(PreemptableResourceScheduler) scheduler));
for (SchedulingEditPolicy policy : policies) {
LOG.info("LOADING SchedulingEditPolicy:" + policy.getPolicyName());
// periodically check whether we need to take action to guarantee
Expand Down Expand Up @@ -786,36 +782,6 @@ public void handle(RMAppEvent event) {
}
}

/**
 * Event handler that routes {@link ContainerPreemptEvent}s to a
 * {@link PreemptableResourceScheduler}.
 */
@Private
public static final class
RMContainerPreemptEventDispatcher
implements EventHandler<ContainerPreemptEvent> {

// Scheduler that receives every drop-reservation, preempt and kill request.
private final PreemptableResourceScheduler scheduler;

/**
 * @param scheduler the scheduler that handled events are forwarded to
 */
public RMContainerPreemptEventDispatcher(
PreemptableResourceScheduler scheduler) {
this.scheduler = scheduler;
}

/**
 * Forwards the event to the scheduler call that matches its type.
 *
 * @param event supplies the application attempt id, the container and
 *              the event type
 */
@Override
public void handle(ContainerPreemptEvent event) {
ApplicationAttemptId aid = event.getAppId();
RMContainer container = event.getContainer();
switch (event.getType()) {
case DROP_RESERVATION:
scheduler.dropContainerReservation(container);
break;
case PREEMPT_CONTAINER:
// Only this branch passes the application attempt id along.
scheduler.preemptContainer(aid, container);
break;
case KILL_CONTAINER:
scheduler.killContainer(container);
break;
// No default branch: any other event type results in no scheduler call.
}
}
}

@Private
public static final class ApplicationAttemptEventDispatcher implements
EventHandler<RMAppAttemptEvent> {
Expand Down
Expand Up @@ -18,14 +18,12 @@
package org.apache.hadoop.yarn.server.resourcemanager.monitor;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.PreemptableResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerPreemptEvent;

public interface SchedulingEditPolicy {

public void init(Configuration config,
EventHandler<ContainerPreemptEvent> dispatcher,
public void init(Configuration config, RMContext context,
PreemptableResourceScheduler scheduler);

/**
Expand Down
Expand Up @@ -54,9 +54,8 @@ public synchronized SchedulingEditPolicy getSchedulingEditPolicy() {
return scheduleEditPolicy;
}

@SuppressWarnings("unchecked")
public void serviceInit(Configuration conf) throws Exception {
scheduleEditPolicy.init(conf, rmContext.getDispatcher().getEventHandler(),
scheduleEditPolicy.init(conf, rmContext,
(PreemptableResourceScheduler) rmContext.getScheduler());
this.monitorInterval = scheduleEditPolicy.getMonitoringInterval();
super.serviceInit(conf);
Expand Down
Expand Up @@ -38,20 +38,20 @@
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.monitor.SchedulingEditPolicy;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerPreemptEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerPreemptEventType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.PreemptableResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueueCapacities;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType;
import org.apache.hadoop.yarn.util.Clock;
import org.apache.hadoop.yarn.util.SystemClock;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
Expand Down Expand Up @@ -118,8 +118,7 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic
public static final String NATURAL_TERMINATION_FACTOR =
"yarn.resourcemanager.monitor.capacity.preemption.natural_termination_factor";

// the dispatcher to send preempt and kill events
public EventHandler<ContainerPreemptEvent> dispatcher;
private RMContext rmContext;

private final Clock clock;
private double maxIgnoredOverCapacity;
Expand All @@ -141,20 +140,17 @@ public ProportionalCapacityPreemptionPolicy() {
}

public ProportionalCapacityPreemptionPolicy(Configuration config,
EventHandler<ContainerPreemptEvent> dispatcher,
CapacityScheduler scheduler) {
this(config, dispatcher, scheduler, new SystemClock());
RMContext context, CapacityScheduler scheduler) {
this(config, context, scheduler, new SystemClock());
}

public ProportionalCapacityPreemptionPolicy(Configuration config,
EventHandler<ContainerPreemptEvent> dispatcher,
CapacityScheduler scheduler, Clock clock) {
init(config, dispatcher, scheduler);
RMContext context, CapacityScheduler scheduler, Clock clock) {
init(config, context, scheduler);
this.clock = clock;
}

public void init(Configuration config,
EventHandler<ContainerPreemptEvent> disp,
public void init(Configuration config, RMContext context,
PreemptableResourceScheduler sched) {
LOG.info("Preemption monitor:" + this.getClass().getCanonicalName());
assert null == scheduler : "Unexpected duplicate call to init";
Expand All @@ -163,7 +159,7 @@ public void init(Configuration config,
sched.getClass().getCanonicalName() + " not instance of " +
CapacityScheduler.class.getCanonicalName());
}
dispatcher = disp;
rmContext = context;
scheduler = (CapacityScheduler) sched;
maxIgnoredOverCapacity = config.getDouble(MAX_IGNORED_OVER_CAPACITY, 0.1);
naturalTerminationFactor =
Expand Down Expand Up @@ -196,6 +192,7 @@ public void editSchedule() {
* @param root the root of the CapacityScheduler queue hierarchy
* @param clusterResources the total amount of resources in the cluster
*/
@SuppressWarnings("unchecked")
private void containerBasedPreemptOrKill(CSQueue root,
Resource clusterResources) {
// All partitions to look at
Expand Down Expand Up @@ -248,22 +245,25 @@ private void containerBasedPreemptOrKill(CSQueue root,
// preempt (or kill) the selected containers
for (Map.Entry<ApplicationAttemptId,Set<RMContainer>> e
: toPreempt.entrySet()) {
ApplicationAttemptId appAttemptId = e.getKey();
if (LOG.isDebugEnabled()) {
LOG.debug("Send to scheduler: in app=" + e.getKey()
LOG.debug("Send to scheduler: in app=" + appAttemptId
+ " #containers-to-be-preempted=" + e.getValue().size());
}
for (RMContainer container : e.getValue()) {
// if we tried to preempt this for more than maxWaitTime
if (preempted.get(container) != null &&
preempted.get(container) + maxWaitTime < clock.getTime()) {
// kill it
dispatcher.handle(new ContainerPreemptEvent(e.getKey(), container,
ContainerPreemptEventType.KILL_CONTAINER));
rmContext.getDispatcher().getEventHandler().handle(
new ContainerPreemptEvent(appAttemptId, container,
SchedulerEventType.KILL_CONTAINER));
preempted.remove(container);
} else {
//otherwise just send preemption events
dispatcher.handle(new ContainerPreemptEvent(e.getKey(), container,
ContainerPreemptEventType.PREEMPT_CONTAINER));
rmContext.getDispatcher().getEventHandler().handle(
new ContainerPreemptEvent(appAttemptId, container,
SchedulerEventType.PREEMPT_CONTAINER));
if (preempted.get(container) == null) {
preempted.put(container, clock.getTime());
}
Expand Down Expand Up @@ -735,6 +735,7 @@ private void preemptAMContainers(Resource clusterResource,
* Given a target preemption for a specific application, select containers
* to preempt (after unreserving all reservation for that app).
*/
@SuppressWarnings("unchecked")
private void preemptFrom(FiCaSchedulerApp app,
Resource clusterResource, Map<String, Resource> resToObtainByPartition,
List<RMContainer> skippedAMContainerlist, Resource skippedAMSize,
Expand All @@ -758,8 +759,9 @@ private void preemptFrom(FiCaSchedulerApp app,
clusterResource, preemptMap);

if (!observeOnly) {
dispatcher.handle(new ContainerPreemptEvent(appId, c,
ContainerPreemptEventType.DROP_RESERVATION));
rmContext.getDispatcher().getEventHandler().handle(
new ContainerPreemptEvent(
appId, c, SchedulerEventType.DROP_RESERVATION));
}
}

Expand Down
Expand Up @@ -19,20 +19,20 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler;

import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.event.AbstractEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType;

/**
* Simple event class used to communicate container unreservation, preemption, and killing.
*/
public class ContainerPreemptEvent
extends AbstractEvent<ContainerPreemptEventType> {
public class ContainerPreemptEvent extends SchedulerEvent {

private final ApplicationAttemptId aid;
private final RMContainer container;

public ContainerPreemptEvent(ApplicationAttemptId aid, RMContainer container,
ContainerPreemptEventType type) {
SchedulerEventType type) {
super(type);
this.aid = aid;
this.container = container;
Expand Down

This file was deleted.

Expand Up @@ -86,6 +86,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerPreemptEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.PreemptableResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
Expand Down Expand Up @@ -1346,6 +1347,29 @@ public void handle(SchedulerEvent event) {
RMContainerEventType.EXPIRE);
}
break;
case DROP_RESERVATION:
{
ContainerPreemptEvent dropReservationEvent = (ContainerPreemptEvent)event;
RMContainer container = dropReservationEvent.getContainer();
dropContainerReservation(container);
}
break;
case PREEMPT_CONTAINER:
{
ContainerPreemptEvent preemptContainerEvent =
(ContainerPreemptEvent)event;
ApplicationAttemptId aid = preemptContainerEvent.getAppId();
RMContainer containerToBePreempted = preemptContainerEvent.getContainer();
preemptContainer(aid, containerToBePreempted);
}
break;
case KILL_CONTAINER:
{
ContainerPreemptEvent killContainerEvent = (ContainerPreemptEvent)event;
RMContainer containerToBeKilled = killContainerEvent.getContainer();
killContainer(containerToBeKilled);
}
break;
default:
LOG.error("Invalid eventtype " + event.getType() + ". Ignoring!");
}
Expand Down
Expand Up @@ -36,5 +36,10 @@ public enum SchedulerEventType {
APP_ATTEMPT_REMOVED,

// Source: ContainerAllocationExpirer
CONTAINER_EXPIRED
CONTAINER_EXPIRED,

// Source: SchedulingEditPolicy
DROP_RESERVATION,
PREEMPT_CONTAINER,
KILL_CONTAINER
}

0 comments on commit 0e4b066

Please sign in to comment.