Skip to content

Commit

Permalink
YARN-6744. Recover component information on YARN native services AM restart. Contributed by Billie Rinaldi
Browse files Browse the repository at this point in the history
  • Loading branch information
jian-he committed Nov 6, 2017
1 parent c723021 commit b8a7ef1
Show file tree
Hide file tree
Showing 7 changed files with 211 additions and 59 deletions.
Expand Up @@ -157,6 +157,11 @@
<artifactId>hadoop-yarn-common</artifactId>
</dependency>

<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-server-common</artifactId>
</dependency>

<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
Expand Down
Expand Up @@ -29,7 +29,7 @@
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.registry.client.api.RegistryOperations;
import org.apache.hadoop.registry.client.api.RegistryOperationsFactory;
import org.apache.hadoop.registry.client.binding.RegistryTypeUtils;
import org.apache.hadoop.registry.client.binding.RegistryPathUtils;
import org.apache.hadoop.registry.client.binding.RegistryUtils;
import org.apache.hadoop.registry.client.types.ServiceRecord;
import org.apache.hadoop.registry.client.types.yarn.PersistencePolicies;
Expand Down Expand Up @@ -237,11 +237,6 @@ public void serviceStop() throws Exception {
serviceTimelinePublisher
.serviceAttemptUnregistered(context, diagnostics.toString());
}
// Cleanup each component instance. no need to release containers as
// they will be automatically released by RM
for (ComponentInstance instance : liveInstances.values()) {
instance.cleanupRegistryAndCompHdfsDir();
}
String msg = diagnostics.toString()
+ "Navigate to the failed component for more details.";
amRMClient
Expand All @@ -266,11 +261,67 @@ public void serviceStart() throws Exception {
}
registerServiceInstance(context.attemptId, app);

//TODO handle containers recover
}
// recover components based on containers sent from RM
recoverComponents(response);

/**
 * Sends an initial FLEX event to every registered component whose
 * dependencies are already ready. The event carries the number of containers
 * taken from the component's spec.
 */
private void recover() {
  for (Component candidate : componentsById.values()) {
    if (!candidate.areDependenciesReady()) {
      // Nothing is triggered here for components still waiting on others.
      continue;
    }
    // Trigger initial evaluation of components
    final String compName = candidate.getName();
    LOG.info("Triggering initial evaluation of component {}", compName);
    ComponentEvent flexEvent = new ComponentEvent(compName, FLEX)
        .setDesired(candidate.getComponentSpec().getNumberOfContainers());
    candidate.handle(flexEvent);
  }
}

/**
 * Recovers component instances for the containers that the RM reports as
 * carried over from previous application attempts.
 *
 * <p>The component entries are listed and read from the registry on a
 * best-effort basis. Every carried-over container whose encoded container ID
 * has a registry record is forwarded to its owning component as a
 * CONTAINER_RECOVERED event. A container without a record, or whose
 * allocation request ID matches no known component, is released.
 *
 * @param response AM registration response holding the containers from
 *                 previous attempts
 */
private void recoverComponents(RegisterApplicationMasterResponse response) {
  List<Container> recoveredContainers = response
      .getContainersFromPreviousAttempts();
  LOG.info("Received {} containers from previous attempt.",
      recoveredContainers.size());

  Map<String, ServiceRecord> existingRecords = new HashMap<>();
  List<String> existingComps = null;
  try {
    existingComps = yarnRegistryOperations.listComponents();
    LOG.info("Found {} containers from ZK registry: {}", existingComps.size(),
        existingComps);
  } catch (Exception e) {
    // Deliberately best effort: without the listing no record is found
    // below, so every carried-over container ends up being released.
    LOG.info("Could not read component paths: {}", e.getMessage());
  }
  if (existingComps != null) {
    for (String existingComp : existingComps) {
      try {
        ServiceRecord record =
            yarnRegistryOperations.getComponent(existingComp);
        existingRecords.put(existingComp, record);
      } catch (Exception e) {
        // The throwable is passed as the trailing argument with no
        // placeholder of its own, so the logger reports it as the exception
        // rather than formatting it into the message text.
        LOG.warn("Could not resolve record for component {}", existingComp,
            e);
      }
    }
  }

  for (Container container : recoveredContainers) {
    LOG.info("Handling container {} from previous attempt",
        container.getId());
    ServiceRecord record = existingRecords.get(RegistryPathUtils
        .encodeYarnID(container.getId().toString()));
    if (record == null) {
      LOG.info("Record not found in registry for container {} from previous" +
          " attempt, releasing", container.getId());
      amRMClient.releaseAssignedContainer(container.getId());
      continue;
    }

    Component comp = componentsById.get(container.getAllocationRequestId());
    if (comp == null) {
      // Without this guard a single unmatched container would raise a
      // NullPointerException and abort recovery of all remaining containers.
      LOG.warn("No component found for allocation request id {} of container"
          + " {} from previous attempt, releasing",
          container.getAllocationRequestId(), container.getId());
      amRMClient.releaseAssignedContainer(container.getId());
      continue;
    }

    ComponentEvent event =
        new ComponentEvent(comp.getName(), CONTAINER_RECOVERED)
            .setContainer(container)
            .setInstance(comp.getComponentInstance(record.description));
    comp.handle(event);
    // do not remove requests in this case because we do not know if they
    // have already been removed
  }
}

private void initGlobalTokensForSubstitute(ServiceContext context) {
Expand Down Expand Up @@ -353,7 +404,7 @@ private void registerServiceInstance(ApplicationAttemptId attemptId,
executorService.submit(new Runnable() {
@Override public void run() {
try {
yarnRegistryOperations.registerSelf(serviceRecord, true);
yarnRegistryOperations.registerSelf(serviceRecord, false);
LOG.info("Registered service under {}; absolute path {}",
yarnRegistryOperations.getSelfRegistrationPath(),
yarnRegistryOperations.getAbsoluteSelfRegistrationPath());
Expand Down Expand Up @@ -398,13 +449,6 @@ private void createAllComponents() {
componentsById.put(allocateId, component);
componentsByName.put(component.getName(), component);
allocateId++;

// Trigger the component without dependencies
if (component.areDependenciesReady()) {
ComponentEvent event = new ComponentEvent(compSpec.getName(), FLEX)
.setDesired(compSpec.getNumberOfContainers());
component.handle(event);
}
}
}

Expand Down Expand Up @@ -458,17 +502,17 @@ public void onContainersAllocated(List<Container> containers) {
new ComponentEvent(comp.getName(), CONTAINER_ALLOCATED)
.setContainer(container);
dispatcher.getEventHandler().handle(event);
Collection<AMRMClient.ContainerRequest> requests = amRMClient
.getMatchingRequests(container.getAllocationRequestId());
LOG.info("[COMPONENT {}]: {} outstanding container requests.",
comp.getName(),
amRMClient.getMatchingRequests(container.getAllocationRequestId()).size());
comp.getName(), requests.size());
// remove the corresponding request
Collection<AMRMClient.ContainerRequest> collection = amRMClient
.getMatchingRequests(container.getAllocationRequestId());
if (collection.iterator().hasNext()) {
AMRMClient.ContainerRequest request = collection.iterator().next();
if (requests.iterator().hasNext()) {
LOG.info("[COMPONENT {}]: removing one container request.", comp
.getName());
AMRMClient.ContainerRequest request = requests.iterator().next();
amRMClient.removeContainerRequest(request);
}

}
}

Expand All @@ -478,7 +522,7 @@ public void onContainersCompleted(List<ContainerStatus> statuses) {
ContainerId containerId = status.getContainerId();
ComponentInstance instance = liveInstances.get(status.getContainerId());
if (instance == null) {
LOG.error(
LOG.warn(
"Container {} Completed. No component instance exists. exitStatus={}. diagnostics={} ",
containerId, status.getExitStatus(), status.getDiagnostics());
return;
Expand Down
Expand Up @@ -33,6 +33,7 @@
import org.apache.hadoop.yarn.service.ServiceScheduler;
import org.apache.hadoop.yarn.service.component.instance.ComponentInstanceEvent;
import org.apache.hadoop.yarn.service.ServiceMetrics;
import org.apache.hadoop.yarn.service.provider.ProviderUtils;
import org.apache.hadoop.yarn.state.InvalidStateTransitionException;
import org.apache.hadoop.yarn.state.MultipleArcTransition;
import org.apache.hadoop.yarn.state.SingleArcTransition;
Expand Down Expand Up @@ -78,7 +79,7 @@ public class Component implements EventHandler<ComponentEvent> {
private ServiceContext context;
private AMRMClientAsync<ContainerRequest> amrmClient;
private AtomicLong instanceIdCounter = new AtomicLong();
private Map<ComponentInstanceId, ComponentInstance> compInstances =
private Map<String, ComponentInstance> compInstances =
new ConcurrentHashMap<>();
// component instances to be assigned with a container
private List<ComponentInstance> pendingInstances = new LinkedList<>();
Expand All @@ -101,6 +102,9 @@ public class Component implements EventHandler<ComponentEvent> {
// INIT will only go to STABLE or FLEXING
.addTransition(INIT, EnumSet.of(STABLE, FLEXING),
FLEX, new FlexComponentTransition())
// container recovered on AM restart
.addTransition(INIT, INIT, CONTAINER_RECOVERED,
new ContainerRecoveredTransition())

// container allocated by RM
.addTransition(FLEXING, FLEXING, CONTAINER_ALLOCATED,
Expand Down Expand Up @@ -165,7 +169,7 @@ private void createOneCompInstance() {
new ComponentInstanceId(instanceIdCounter.getAndIncrement(),
componentSpec.getName());
ComponentInstance instance = new ComponentInstance(this, id);
compInstances.put(id, instance);
compInstances.put(instance.getCompInstanceName(), instance);
pendingInstances.add(instance);
}

Expand All @@ -186,8 +190,8 @@ public ComponentState transition(Component component,
// This happens on init
LOG.info("[INIT COMPONENT " + component.getName() + "]: " + event
.getDesired() + " instances.");
component.requestContainers(event.getDesired());
return FLEXING;
component.requestContainers(component.pendingInstances.size());
return checkIfStable(component);
}
long before = component.getComponentSpec().getNumberOfContainers();
long delta = event.getDesired() - before;
Expand All @@ -205,14 +209,14 @@ public ComponentState transition(Component component,
LOG.info("[FLEX DOWN COMPONENT " + component.getName()
+ "]: scaling down from " + before + " to " + event.getDesired());
List<ComponentInstance> list =
new ArrayList<>(component.compInstances.values());
new ArrayList<>(component.getAllComponentInstances());

// sort in Most recent -> oldest order, destroy most recent ones.
Collections.sort(list, Collections.reverseOrder());
for (int i = 0; i < delta; i++) {
ComponentInstance instance = list.get(i);
// remove the instance
component.compInstances.remove(instance.getCompInstanceId());
component.compInstances.remove(instance.getCompInstanceName());
component.pendingInstances.remove(instance);
component.componentMetrics.containersFailed.incr();
component.componentMetrics.containersRunning.decr();
Expand All @@ -236,6 +240,46 @@ public void transition(Component component, ComponentEvent event) {
}
}

/**
 * Transition for CONTAINER_RECOVERED events. Binds the container carried by
 * the event to the component instance named in the event and marks that
 * instance as started. The container is released instead when the event names
 * no instance or the instance already holds a container.
 */
private static class ContainerRecoveredTransition extends BaseTransition {
  @Override
  public void transition(Component component, ComponentEvent event) {
    final ComponentInstance target = event.getInstance();
    final Container recovered = event.getContainer();

    // Guard: the event does not say which instance the container belongs to.
    if (target == null) {
      LOG.info("[COMPONENT {}]: Trying to recover {} but event did not "
              + "specify component instance",
          component.getName(), recovered.getId());
      component.releaseContainer(recovered);
      return;
    }

    // Guard: the instance is already bound, so this container is surplus.
    if (target.hasContainer()) {
      LOG.info("[COMPONENT {}]: Instance {} already has container, "
              + "release surplus container {}",
          target.getCompName(), target.getCompInstanceId(),
          recovered.getId());
      component.releaseContainer(recovered);
      return;
    }

    // The instance no longer waits for a container; the size logged below is
    // the size after this removal.
    component.pendingInstances.remove(target);
    LOG.info("[COMPONENT {}]: Recovered {} for component instance {} on "
            + "host {}, num pending component instances reduced to {} ",
        component.getName(), recovered.getId(),
        target.getCompInstanceName(), recovered.getNodeId(),
        component.pendingInstances.size());

    // Bind the container before anything that reads it from the instance.
    target.setContainer(recovered);
    ProviderUtils.initCompInstanceDir(component.getContext().fs, target);
    component.getScheduler().addLiveCompInstance(recovered.getId(), target);

    LOG.info("[COMPONENT {}]: Marking {} as started for component "
            + "instance {}",
        component.getName(), recovered.getId(), target.getCompInstanceId());
    component.compInstanceDispatcher.getEventHandler().handle(
        new ComponentInstanceEvent(target.getContainerId(), START));
    component.incRunningContainers();
  }
}

private static class ContainerStartedTransition implements
MultipleArcTransition<Component,ComponentEvent,ComponentState> {

Expand Down Expand Up @@ -280,14 +324,18 @@ public ServiceMetrics getCompMetrics () {
return componentMetrics;
}

/**
 * Releases the given container through the scheduler's AM-RM client and
 * counts it as surplus in both the component metrics and the service metrics.
 *
 * @param container the container to give back
 */
private void releaseContainer(Container container) {
  // Release first; the counters are only bumped once the call has returned.
  this.scheduler.getAmRMClient()
      .releaseAssignedContainer(container.getId());
  this.componentMetrics.surplusContainers.incr();
  this.scheduler.getServiceMetrics().surplusContainers.incr();
}

private void assignContainerToCompInstance(Container container) {
if (pendingInstances.size() == 0) {
LOG.info(
"[COMPONENT {}]: No pending component instance left, release surplus container {}",
getName(), container.getId());
scheduler.getAmRMClient().releaseAssignedContainer(container.getId());
componentMetrics.surplusContainers.incr();
scheduler.getServiceMetrics().surplusContainers.incr();
releaseContainer(container);
return;
}
ComponentInstance instance = pendingInstances.remove(0);
Expand Down Expand Up @@ -397,7 +445,7 @@ public Map<String, String> getDependencyHostIpTokens() {
}
for (String dependency : dependencies) {
Collection<ComponentInstance> instances = scheduler.getAllComponents()
.get(dependency).getAllComponentInstances().values();
.get(dependency).getAllComponentInstances();
for (ComponentInstance instance : instances) {
if (instance.getContainerStatus() == null) {
continue;
Expand Down Expand Up @@ -447,8 +495,12 @@ public int getNumDesiredInstances() {
return componentMetrics.containersDesired.value();
}

public Map<ComponentInstanceId, ComponentInstance> getAllComponentInstances() {
return compInstances;
/**
 * Looks up a component instance by name.
 *
 * @param componentInstanceName the component instance name used as map key
 * @return the instance stored under that name, or {@code null} if there is
 *         none
 */
public ComponentInstance getComponentInstance(String componentInstanceName) {
  final ComponentInstance found =
      this.compInstances.get(componentInstanceName);
  return found;
}

/**
 * Returns the instances currently tracked by this component.
 *
 * @return the values view of the internal instance map (not a copy)
 */
public Collection<ComponentInstance> getAllComponentInstances() {
  return this.compInstances.values();
}

public org.apache.hadoop.yarn.service.api.records.Component getComponentSpec() {
Expand Down
Expand Up @@ -21,6 +21,7 @@
/**
 * Types of events handled by a component's state machine.
 */
public enum ComponentEventType {
// Request to evaluate or change the desired number of instances.
FLEX,
// A container was allocated by the RM for this component.
CONTAINER_ALLOCATED,
// A container from a previous attempt was recovered on AM restart.
CONTAINER_RECOVERED,
// NOTE(review): presumably sent once a container of this component has
// started - confirm against the state machine.
CONTAINER_STARTED,
// NOTE(review): presumably sent once a container of this component has
// completed - confirm against the state machine.
CONTAINER_COMPLETED
}
Expand Up @@ -23,7 +23,6 @@
import org.apache.hadoop.registry.client.binding.RegistryPathUtils;
import org.apache.hadoop.registry.client.types.ServiceRecord;
import org.apache.hadoop.registry.client.types.yarn.PersistencePolicies;
import org.apache.hadoop.registry.client.types.yarn.YarnRegistryAttributes;
import org.apache.hadoop.util.ExitUtil;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.api.records.Container;
Expand All @@ -35,6 +34,8 @@
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.service.ServiceScheduler;
import org.apache.hadoop.yarn.service.api.records.ContainerState;
import org.apache.hadoop.yarn.service.component.Component;
Expand Down Expand Up @@ -143,10 +144,19 @@ private static class ContainerStartedTransition extends BaseTransition {
compInstance.getContainerId(), compInstance), 0, 1,
TimeUnit.SECONDS);

long containerStartTime = System.currentTimeMillis();
try {
ContainerTokenIdentifier containerTokenIdentifier = BuilderUtils
.newContainerTokenIdentifier(compInstance.getContainer()
.getContainerToken());
containerStartTime = containerTokenIdentifier.getCreationTime();
} catch (Exception e) {
LOG.info("Could not get container creation time, using current time");
}
org.apache.hadoop.yarn.service.api.records.Container container =
new org.apache.hadoop.yarn.service.api.records.Container();
container.setId(compInstance.getContainerId().toString());
container.setLaunchTime(new Date());
container.setLaunchTime(new Date(containerStartTime));
container.setState(ContainerState.RUNNING_BUT_UNREADY);
container.setBareHost(compInstance.container.getNodeId().getHost());
container.setComponentName(compInstance.getCompInstanceName());
Expand All @@ -156,7 +166,7 @@ private static class ContainerStartedTransition extends BaseTransition {
}
compInstance.containerSpec = container;
compInstance.getCompSpec().addContainer(container);
compInstance.containerStartedTime = System.currentTimeMillis();
compInstance.containerStartedTime = containerStartTime;

if (compInstance.timelineServiceEnabled) {
compInstance.serviceTimelinePublisher
Expand Down Expand Up @@ -243,6 +253,8 @@ public void transition(ComponentInstance compInstance,
}
ExitUtil.terminate(-1);
}

compInstance.removeContainer();
}
}

Expand Down Expand Up @@ -276,6 +288,15 @@ public void handle(ComponentInstanceEvent event) {
}
}

/**
 * Tells whether a container is currently set on this instance.
 *
 * @return {@code true} if the container reference is non-null
 */
public boolean hasContainer() {
  return container != null;
}

/**
 * Detaches the container from this instance: clears the container reference
 * and resets the container ID held by the instance ID.
 */
public void removeContainer() {
  container = null;
  // Keep the instance id consistent with the cleared container reference.
  compInstanceId.setContainerId(null);
}

public void setContainer(Container container) {
this.container = container;
this.compInstanceId.setContainerId(container.getId());
Expand Down

0 comments on commit b8a7ef1

Please sign in to comment.