Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MAPREDUCE-7407. Avoid stopContainer() on dead node #4779

Merged
merged 5 commits into from
Sep 15, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -379,27 +379,38 @@ class EventProcessor implements Runnable {

@Override
public void run() {
LOG.info("Processing the event " + event.toString());
LOG.info("Processing the event {}", event);

// Load ContainerManager tokens before creating a connection.
// TODO: Do it only once per NodeManager.
ContainerId containerID = event.getContainerID();

Container c = getContainer(event);
switch(event.getType()) {

case CONTAINER_REMOTE_LAUNCH:
ContainerRemoteLaunchEvent launchEvent
= (ContainerRemoteLaunchEvent) event;
c.launch(launchEvent);
getContainer(event).launch(launchEvent);
break;

case CONTAINER_REMOTE_CLEANUP:
c.kill(event.getDumpContainerThreads());
// If the container failed to launch earlier (due to dead node for example),
// it has been marked as FAILED and removed from containers during
// CONTAINER_REMOTE_LAUNCH event handling.
// Skip kill() such container during CONTAINER_REMOTE_CLEANUP as
// it is not necessary and could cost 15 minutes delay if the node is dead.
if (!containers.containsKey(containerID)) {
LOG.info("Skip cleanup of already-removed container {}", containerID);
// send killed event to task attempt regardless like in kill().
context.getEventHandler().handle(new TaskAttemptEvent(event.getTaskAttemptID(),
TaskAttemptEventType.TA_CONTAINER_CLEANED));
return;
}
getContainer(event).kill(event.getDumpContainerThreads());
break;

case CONTAINER_COMPLETED:
c.done();
getContainer(event).done();
break;

}
Expand Down
Expand Up @@ -209,14 +209,11 @@ public void testHandle() throws Exception {
ut.waitForPoolToIdle();

verify(mockCM).startContainers(any(StartContainersRequest.class));

LOG.info("inserting cleanup event");
ContainerLauncherEvent mockCleanupEvent =
mock(ContainerLauncherEvent.class);
when(mockCleanupEvent.getType())
.thenReturn(EventType.CONTAINER_REMOTE_CLEANUP);
when(mockCleanupEvent.getContainerID())
.thenReturn(contId);
ContainerLauncherEvent mockCleanupEvent = mock(ContainerLauncherEvent.class);
when(mockCleanupEvent.getType()).thenReturn(EventType.CONTAINER_REMOTE_CLEANUP);
when(mockCleanupEvent.getContainerID()).thenReturn(contId);
when(mockCleanupEvent.getTaskAttemptID()).thenReturn(taskAttemptId);
when(mockCleanupEvent.getContainerMgrAddress()).thenReturn(cmAddress);
ut.handle(mockCleanupEvent);
Expand Down Expand Up @@ -283,8 +280,21 @@ public void testOutOfOrder() throws Exception {
ut.handle(mockLaunchEvent);

ut.waitForPoolToIdle();

verify(mockCM, never()).startContainers(any(StartContainersRequest.class));

verify(mockCM).startContainers(any(StartContainersRequest.class));

LOG.info("inserting cleanup event");
ContainerLauncherEvent mockCleanupEvent2 = mock(ContainerLauncherEvent.class);
when(mockCleanupEvent2.getType()).thenReturn(EventType.CONTAINER_REMOTE_CLEANUP);
when(mockCleanupEvent2.getContainerID()).thenReturn(contId);
when(mockCleanupEvent2.getTaskAttemptID()).thenReturn(taskAttemptId);
when(mockCleanupEvent2.getContainerMgrAddress()).thenReturn(cmAddress);
ut.handle(mockCleanupEvent2);

ut.waitForPoolToIdle();

// Verifies stopContainers is called on existing container
verify(mockCM).stopContainers(any(StopContainersRequest.class));
} finally {
ut.stop();
}
Expand Down