Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,18 @@ public static OpportunisticSchedulerMetrics getMetrics() {
return INSTANCE;
}

@VisibleForTesting
public static void resetMetrics() {
synchronized (OpportunisticSchedulerMetrics.class) {
isInitialized.set(false);
INSTANCE = null;
MetricsSystem ms = DefaultMetricsSystem.instance();
if (ms != null) {
ms.unregisterSource("OpportunisticSchedulerMetrics");
}
}
}

private static void registerMetrics() {
registry = new MetricsRegistry(RECORD_INFO);
registry.tag(RECORD_INFO, "ResourceManager");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -584,8 +584,14 @@ public void recoverContainersOnNode(List<NMContainerStatus> containerReports,
rmContainer);

// recover scheduler attempt
schedulerAttempt.recoverContainer(schedulerNode, rmContainer);
final boolean recovered = schedulerAttempt.recoverContainer(
schedulerNode, rmContainer);

if (recovered && rmContainer.getExecutionType() ==
ExecutionType.OPPORTUNISTIC) {
OpportunisticSchedulerMetrics.getMetrics()
.incrAllocatedOppContainers(1);
}
// set master container for the current running AMContainer for this
// attempt.
RMAppAttempt appAttempt = rmApp.getCurrentAppAttempt();
Expand Down Expand Up @@ -720,7 +726,10 @@ public void completedContainer(RMContainer rmContainer,
SchedulerApplicationAttempt schedulerAttempt =
getCurrentAttemptForContainer(containerId);
if (schedulerAttempt != null) {
schedulerAttempt.removeRMContainer(containerId);
if (schedulerAttempt.removeRMContainer(containerId)) {
OpportunisticSchedulerMetrics.getMetrics()
.incrReleasedOppContainers(1);
}
}
LOG.debug("Completed container: {} in state: {} event:{}",
rmContainer.getContainerId(), rmContainer.getState(), event);
Expand All @@ -729,7 +738,6 @@ public void completedContainer(RMContainer rmContainer,
if (node != null) {
node.releaseContainer(rmContainer.getContainerId(), false);
}
OpportunisticSchedulerMetrics.getMetrics().incrReleasedOppContainers(1);
}

// If the container is getting killed in ACQUIRED state, the requester (AM
Expand Down Expand Up @@ -1411,6 +1419,8 @@ private void handleDecreaseRequests(SchedulerApplicationAttempt appAttempt,
RMContainer demotedRMContainer =
createDemotedRMContainer(appAttempt, oppCntxt, rmContainer);
if (demotedRMContainer != null) {
OpportunisticSchedulerMetrics.getMetrics()
.incrAllocatedOppContainers(1);
appAttempt.addToNewlyDemotedContainers(
uReq.getContainerId(), demotedRMContainer);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -402,7 +402,13 @@ public void addRMContainer(
}
}

public void removeRMContainer(ContainerId containerId) {
/**
* Removes an RM container from the map of live containers
* related to this application attempt.
* @param containerId The container ID of the RMContainer to remove
* @return true if the container is in the map
*/
public boolean removeRMContainer(ContainerId containerId) {
writeLock.lock();
try {
RMContainer rmContainer = liveContainers.remove(containerId);
Expand All @@ -415,7 +421,11 @@ public void removeRMContainer(ContainerId containerId) {
this.attemptResourceUsageAllocatedRemotely
.decUsed(rmContainer.getAllocatedResource());
}

return true;
}

return false;
} finally {
writeLock.unlock();
}
Expand Down Expand Up @@ -1226,15 +1236,15 @@ public void move(Queue newQueue) {
}
}

public void recoverContainer(SchedulerNode node,
public boolean recoverContainer(SchedulerNode node,
RMContainer rmContainer) {
writeLock.lock();
try {
// recover app scheduling info
appSchedulingInfo.recoverContainer(rmContainer, node.getPartition());

if (rmContainer.getState().equals(RMContainerState.COMPLETED)) {
return;
return false;
}
LOG.info("SchedulerAttempt " + getApplicationAttemptId()
+ " is recovering container " + rmContainer.getContainerId());
Expand All @@ -1244,6 +1254,8 @@ public void recoverContainer(SchedulerNode node,
rmContainer.getContainer().getResource());
}

return true;

// resourceLimit: updated when LeafQueue#recoverContainer#allocateResource
// is called.
// newlyAllocatedContainers.add(rmContainer);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -666,11 +666,11 @@ private Container createContainer(FSSchedulerNode node, Resource capability,
}

@Override
public synchronized void recoverContainer(SchedulerNode node,
public synchronized boolean recoverContainer(SchedulerNode node,
RMContainer rmContainer) {
writeLock.lock();
try {
super.recoverContainer(node, rmContainer);
final boolean recovered = super.recoverContainer(node, rmContainer);

if (!rmContainer.getState().equals(RMContainerState.COMPLETED)) {
getQueue().incUsedResource(rmContainer.getContainer().getResource());
Expand All @@ -685,6 +685,8 @@ public synchronized void recoverContainer(SchedulerNode node,
getQueue().addAMResourceUsage(resource);
setAmRunning(true);
}

return recovered;
} finally {
writeLock.unlock();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -534,6 +534,17 @@ public MockNM registerNode(String nodeIdStr, int memory, int vCores,
return nm;
}

public MockNM registerNode(String nodeIdStr, int memory, int vCores,
List<ApplicationId> runningApplications,
List<NMContainerStatus> containerStatuses) throws Exception {
MockNM nm =
new MockNM(nodeIdStr, memory, vCores, getResourceTrackerService(),
YarnVersionInfo.getVersion());
nm.registerNode(containerStatuses, runningApplications);
drainEventsImplicitly();
return nm;
}

public MockNM registerNode(String nodeIdStr, Resource nodeCapability)
throws Exception {
MockNM nm = new MockNM(nodeIdStr, nodeCapability,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.DistributedSchedulingAllocateRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.DistributedSchedulingAllocateResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterDistributedSchedulingAMResponse;
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
Expand All @@ -72,6 +73,7 @@
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.RegisterDistributedSchedulingAMResponsePBImpl;
import org.apache.hadoop.yarn.server.api.records.OpportunisticContainersStatus;
import org.apache.hadoop.yarn.server.metrics.OpportunisticSchedulerMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AMLivelinessMonitor;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
Expand Down Expand Up @@ -105,6 +107,7 @@
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;

Expand Down Expand Up @@ -132,6 +135,11 @@ public void createAndStartRM() {
YarnConfiguration.OPPORTUNISTIC_CONTAINER_ALLOCATION_ENABLED, true);
conf.setInt(
YarnConfiguration.NM_CONTAINER_QUEUING_SORTING_NODES_INTERVAL_MS, 100);
conf.setBoolean(YarnConfiguration.RECOVERY_ENABLED, true);
conf.setBoolean(
YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_ENABLED, true);
conf.set(
YarnConfiguration.RM_STORE, MemoryRMStateStore.class.getName());
startRM(conf);
}

Expand Down Expand Up @@ -165,6 +173,8 @@ public void stopRM() {
if (rm != null) {
rm.stop();
}

OpportunisticSchedulerMetrics.resetMetrics();
}

@Test(timeout = 600000)
Expand Down Expand Up @@ -817,6 +827,130 @@ public void testOpportunisticSchedulerMetrics() throws Exception {
metrics.getAggregatedReleasedContainers());
}

/**
* Tests that, if a node has running opportunistic containers when the RM
* is down, RM is able to reflect the opportunistic containers
* in its metrics upon RM recovery.
*/
@Test
public void testMetricsRetainsAllocatedOpportunisticAfterRMRestart()
throws Exception {
final MockRMAppSubmissionData appSubmissionData =
MockRMAppSubmissionData.Builder.createWithMemory(GB, rm)
.withAppName("app")
.withUser("user")
.withAcls(null)
.withQueue("default")
.build();

MockNM nm1 = new MockNM("h:1234", 4096, rm.getResourceTrackerService());
nm1.registerNode();

final RMApp app = MockRMAppSubmitter.submit(rm, appSubmissionData);

final ApplicationAttemptId appAttemptId =
app.getCurrentAppAttempt().getAppAttemptId();

MockRM.launchAndRegisterAM(app, rm, nm1);

final OpportunisticSchedulerMetrics metrics =
OpportunisticSchedulerMetrics.getMetrics();

// We start with ID 2, since AMContainer is ID 1
final ContainerId recoverOContainerId2 = ContainerId.newContainerId(
appAttemptId, 2);

final Resource fakeResource = Resource.newInstance(1024, 1);
final String fakeDiagnostics = "recover container";
final Priority fakePriority = Priority.newInstance(0);

final NMContainerStatus recoverOContainerReport1 =
NMContainerStatus.newInstance(
recoverOContainerId2, 0, ContainerState.RUNNING,
fakeResource, fakeDiagnostics, 0,
fakePriority, 0, null,
ExecutionType.OPPORTUNISTIC, -1);

// Make sure that numbers start with 0
Assert.assertEquals(0, metrics.getAllocatedContainers());

// Recover one OContainer only
rm.registerNode("h2:1234", 4096, 1,
Collections.singletonList(
appAttemptId.getApplicationId()),
Collections.singletonList(recoverOContainerReport1));

Assert.assertEquals(1, metrics.getAllocatedContainers());

// Recover two OContainers at once
final ContainerId recoverOContainerId3 = ContainerId.newContainerId(
appAttemptId, 3);

final ContainerId recoverOContainerId4 = ContainerId.newContainerId(
appAttemptId, 4);

final NMContainerStatus recoverOContainerReport2 =
NMContainerStatus.newInstance(
recoverOContainerId2, 0, ContainerState.RUNNING,
fakeResource, fakeDiagnostics, 0,
fakePriority, 0, null,
ExecutionType.OPPORTUNISTIC, -1);

final NMContainerStatus recoverOContainerReport3 =
NMContainerStatus.newInstance(
recoverOContainerId3, 0, ContainerState.RUNNING,
fakeResource, fakeDiagnostics, 0,
fakePriority, 0, null,
ExecutionType.OPPORTUNISTIC, -1);

rm.registerNode(
"h3:1234", 4096, 10,
Collections.singletonList(
appAttemptId.getApplicationId()),
Arrays.asList(recoverOContainerReport2, recoverOContainerReport3));

Assert.assertEquals(3, metrics.getAllocatedContainers());

// Make sure that the recovered GContainer
// does not increment OContainer count
final ContainerId recoverGContainerId = ContainerId.newContainerId(
appAttemptId, 5);

final NMContainerStatus recoverGContainerReport =
NMContainerStatus.newInstance(
recoverGContainerId, 0, ContainerState.RUNNING,
fakeResource, fakeDiagnostics, 0,
fakePriority, 0, null,
ExecutionType.GUARANTEED, -1);

rm.registerNode(
"h4:1234", 4096, 10,
Collections.singletonList(
appAttemptId.getApplicationId()),
Collections.singletonList(recoverGContainerReport));

Assert.assertEquals(3, metrics.getAllocatedContainers());

final ContainerId completedOContainerId = ContainerId.newContainerId(
appAttemptId, 6);

final NMContainerStatus completedOContainerReport =
NMContainerStatus.newInstance(
completedOContainerId, 0, ContainerState.COMPLETE,
fakeResource, fakeDiagnostics, 0,
fakePriority, 0, null,
ExecutionType.OPPORTUNISTIC, -1);

// Tests that completed containers are not recorded
rm.registerNode(
"h5:1234", 4096, 10,
Collections.singletonList(
appAttemptId.getApplicationId()),
Collections.singletonList(completedOContainerReport));

Assert.assertEquals(3, metrics.getAllocatedContainers());
}

@Test(timeout = 60000)
public void testAMCrashDuringAllocate() throws Exception {
MockNM nm = new MockNM("h:1234", 4096, rm.getResourceTrackerService());
Expand Down