Skip to content

Commit

Permalink
YARN-3868. Recovery support for container resizing. Contributed by Me…
Browse files Browse the repository at this point in the history
…ng Ding
  • Loading branch information
jian-he authored and wangdatan committed Sep 23, 2015
1 parent c3dc1af commit c57eac5
Show file tree
Hide file tree
Showing 9 changed files with 301 additions and 12 deletions.
2 changes: 2 additions & 0 deletions hadoop-yarn-project/CHANGES.txt
Expand Up @@ -218,6 +218,8 @@ Release 2.8.0 - UNRELEASED
YARN-1644. RM-NM protocol changes and NodeStatusUpdater implementation to YARN-1644. RM-NM protocol changes and NodeStatusUpdater implementation to
support container resizing. (Meng Ding via jianhe) support container resizing. (Meng Ding via jianhe)


YARN-3868. Recovery support for container resizing. (Meng Ding via jianhe)

IMPROVEMENTS IMPROVEMENTS


YARN-644. Basic null check is not performed on passed in arguments before YARN-644. Basic null check is not performed on passed in arguments before
Expand Down
Expand Up @@ -346,7 +346,7 @@ private void recoverContainer(RecoveredContainerState rcs)
Container container = new ContainerImpl(getConfig(), dispatcher, Container container = new ContainerImpl(getConfig(), dispatcher,
context.getNMStateStore(), req.getContainerLaunchContext(), context.getNMStateStore(), req.getContainerLaunchContext(),
credentials, metrics, token, rcs.getStatus(), rcs.getExitCode(), credentials, metrics, token, rcs.getStatus(), rcs.getExitCode(),
rcs.getDiagnostics(), rcs.getKilled()); rcs.getDiagnostics(), rcs.getKilled(), rcs.getCapability());
context.getContainers().put(containerId, container); context.getContainers().put(containerId, container);
dispatcher.getEventHandler().handle( dispatcher.getEventHandler().handle(
new ApplicationContainerInitEvent(container)); new ApplicationContainerInitEvent(container));
Expand Down Expand Up @@ -1101,6 +1101,9 @@ private void changeContainerResourceInternal(
this.readLock.lock(); this.readLock.lock();
try { try {
if (!serviceStopped) { if (!serviceStopped) {
// Persist container resource change for recovery
this.context.getNMStateStore().storeContainerResourceChanged(
containerId, targetResource);
getContainersMonitor().handle( getContainersMonitor().handle(
new ChangeMonitoringContainerResourceEvent( new ChangeMonitoringContainerResourceEvent(
containerId, targetResource)); containerId, targetResource));
Expand Down
Expand Up @@ -154,13 +154,19 @@ public ContainerImpl(Configuration conf, Dispatcher dispatcher,
Credentials creds, NodeManagerMetrics metrics, Credentials creds, NodeManagerMetrics metrics,
ContainerTokenIdentifier containerTokenIdentifier, ContainerTokenIdentifier containerTokenIdentifier,
RecoveredContainerStatus recoveredStatus, int exitCode, RecoveredContainerStatus recoveredStatus, int exitCode,
String diagnostics, boolean wasKilled) { String diagnostics, boolean wasKilled, Resource recoveredCapability) {
this(conf, dispatcher, stateStore, launchContext, creds, metrics, this(conf, dispatcher, stateStore, launchContext, creds, metrics,
containerTokenIdentifier); containerTokenIdentifier);
this.recoveredStatus = recoveredStatus; this.recoveredStatus = recoveredStatus;
this.exitCode = exitCode; this.exitCode = exitCode;
this.recoveredAsKilled = wasKilled; this.recoveredAsKilled = wasKilled;
this.diagnostics.append(diagnostics); this.diagnostics.append(diagnostics);
if (recoveredCapability != null
&& !this.resource.equals(recoveredCapability)) {
// resource capability had been updated before NM was down
this.resource = Resource.newInstance(recoveredCapability.getMemory(),
recoveredCapability.getVirtualCores());
}
} }


private static final ContainerDiagnosticsUpdateTransition UPDATE_DIAGNOSTICS_TRANSITION = private static final ContainerDiagnosticsUpdateTransition UPDATE_DIAGNOSTICS_TRANSITION =
Expand Down
Expand Up @@ -40,7 +40,10 @@
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.impl.pb.ResourcePBImpl;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.proto.YarnProtos.ResourceProto;
import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceProto; import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.MasterKeyProto; import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.MasterKeyProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.VersionProto; import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.VersionProto;
Expand Down Expand Up @@ -99,6 +102,8 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
private static final String CONTAINER_REQUEST_KEY_SUFFIX = "/request"; private static final String CONTAINER_REQUEST_KEY_SUFFIX = "/request";
private static final String CONTAINER_DIAGS_KEY_SUFFIX = "/diagnostics"; private static final String CONTAINER_DIAGS_KEY_SUFFIX = "/diagnostics";
private static final String CONTAINER_LAUNCHED_KEY_SUFFIX = "/launched"; private static final String CONTAINER_LAUNCHED_KEY_SUFFIX = "/launched";
private static final String CONTAINER_RESOURCE_CHANGED_KEY_SUFFIX =
"/resourceChanged";
private static final String CONTAINER_KILLED_KEY_SUFFIX = "/killed"; private static final String CONTAINER_KILLED_KEY_SUFFIX = "/killed";
private static final String CONTAINER_EXIT_CODE_KEY_SUFFIX = "/exitcode"; private static final String CONTAINER_EXIT_CODE_KEY_SUFFIX = "/exitcode";


Expand Down Expand Up @@ -230,6 +235,9 @@ private RecoveredContainerState loadContainerState(ContainerId containerId,
} else if (suffix.equals(CONTAINER_EXIT_CODE_KEY_SUFFIX)) { } else if (suffix.equals(CONTAINER_EXIT_CODE_KEY_SUFFIX)) {
rcs.status = RecoveredContainerStatus.COMPLETED; rcs.status = RecoveredContainerStatus.COMPLETED;
rcs.exitCode = Integer.parseInt(asString(entry.getValue())); rcs.exitCode = Integer.parseInt(asString(entry.getValue()));
} else if (suffix.equals(CONTAINER_RESOURCE_CHANGED_KEY_SUFFIX)) {
rcs.capability = new ResourcePBImpl(
ResourceProto.parseFrom(entry.getValue()));
} else { } else {
throw new IOException("Unexpected container state key: " + key); throw new IOException("Unexpected container state key: " + key);
} }
Expand Down Expand Up @@ -274,6 +282,20 @@ public void storeContainerLaunched(ContainerId containerId)
} }
} }


@Override
public void storeContainerResourceChanged(ContainerId containerId,
Resource capability) throws IOException {
String key = CONTAINERS_KEY_PREFIX + containerId.toString()
+ CONTAINER_RESOURCE_CHANGED_KEY_SUFFIX;
try {
// New value will overwrite old values for the same key
db.put(bytes(key),
((ResourcePBImpl) capability).getProto().toByteArray());
} catch (DBException e) {
throw new IOException(e);
}
}

@Override @Override
public void storeContainerKilled(ContainerId containerId) public void storeContainerKilled(ContainerId containerId)
throws IOException { throws IOException {
Expand Down
Expand Up @@ -27,6 +27,7 @@
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceProto; import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceProto;
import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.ContainerManagerApplicationProto; import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.ContainerManagerApplicationProto;
import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.DeletionServiceDeleteTaskProto; import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.DeletionServiceDeleteTaskProto;
Expand Down Expand Up @@ -87,6 +88,11 @@ public void storeContainerLaunched(ContainerId containerId)
throws IOException { throws IOException {
} }


@Override
public void storeContainerResourceChanged(ContainerId containerId,
Resource capability) throws IOException {
}

@Override @Override
public void storeContainerKilled(ContainerId containerId) public void storeContainerKilled(ContainerId containerId)
throws IOException { throws IOException {
Expand Down
Expand Up @@ -34,6 +34,7 @@
import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerExitStatus; import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceProto; import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceProto;
import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.ContainerManagerApplicationProto; import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.ContainerManagerApplicationProto;
import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.DeletionServiceDeleteTaskProto; import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.DeletionServiceDeleteTaskProto;
Expand Down Expand Up @@ -74,6 +75,7 @@ public static class RecoveredContainerState {
boolean killed = false; boolean killed = false;
String diagnostics = ""; String diagnostics = "";
StartContainerRequest startRequest; StartContainerRequest startRequest;
Resource capability;


public RecoveredContainerStatus getStatus() { public RecoveredContainerStatus getStatus() {
return status; return status;
Expand All @@ -94,6 +96,10 @@ public String getDiagnostics() {
public StartContainerRequest getStartRequest() { public StartContainerRequest getStartRequest() {
return startRequest; return startRequest;
} }

public Resource getCapability() {
return capability;
}
} }


public static class LocalResourceTrackerState { public static class LocalResourceTrackerState {
Expand Down Expand Up @@ -283,6 +289,15 @@ public abstract void storeContainer(ContainerId containerId,
public abstract void storeContainerLaunched(ContainerId containerId) public abstract void storeContainerLaunched(ContainerId containerId)
throws IOException; throws IOException;


/**
* Record that a container resource has been changed
* @param containerId the container ID
* @param capability the container resource capability
* @throws IOException
*/
public abstract void storeContainerResourceChanged(ContainerId containerId,
Resource capability) throws IOException;

/** /**
* Record that a container has completed * Record that a container has completed
* @param containerId the container ID * @param containerId the container ID
Expand Down

0 comments on commit c57eac5

Please sign in to comment.