Skip to content

Commit

Permalink
YARN-3985. Make ReservationSystem persist state using RMStateStore re…
Browse files Browse the repository at this point in the history
…servation APIs. (adhoot via asuresh)
  • Loading branch information
xslogic committed Oct 20, 2015
1 parent 6c8b6f3 commit 506d1b1
Show file tree
Hide file tree
Showing 15 changed files with 407 additions and 79 deletions.
2 changes: 2 additions & 0 deletions hadoop-yarn-project/CHANGES.txt
Expand Up @@ -522,6 +522,8 @@ Release 2.8.0 - UNRELEASED
YARN-4267. Add additional logging to container launch implementations in
container-executor. (Sidharta Seethana via vvasudev)

YARN-3985. Make ReservationSystem persist state using RMStateStore
reservation APIs. (adhoot via asuresh)

OPTIMIZATIONS

Expand Down
Expand Up @@ -349,7 +349,7 @@ protected Plan initializePlan(String planQueueName) throws YarnException {
getAgent(planQueuePath), totCap, planStepSize, rescCalc,
minAllocation, maxAllocation, planQueueName,
getReplanner(planQueuePath), getReservationSchedulerConfiguration()
.getMoveOnExpiry(planQueuePath));
.getMoveOnExpiry(planQueuePath), rmContext);
LOG.info("Intialized plan {0} based on reservable queue {1}",
plan.toString(), planQueueName);
return plan;
Expand Down
Expand Up @@ -32,6 +32,7 @@

import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.planning.Planner;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.planning.ReservationAgent;
Expand All @@ -53,6 +54,7 @@ public class InMemoryPlan implements Plan {
private static final Logger LOG = LoggerFactory.getLogger(InMemoryPlan.class);

private static final Resource ZERO_RESOURCE = Resource.newInstance(0, 0);
private final RMContext rmContext;

private TreeMap<ReservationInterval, Set<InMemoryReservationAllocation>> currentReservations =
new TreeMap<ReservationInterval, Set<InMemoryReservationAllocation>>();
Expand Down Expand Up @@ -85,15 +87,18 @@ public class InMemoryPlan implements Plan {
public InMemoryPlan(QueueMetrics queueMetrics, SharingPolicy policy,
ReservationAgent agent, Resource totalCapacity, long step,
ResourceCalculator resCalc, Resource minAlloc, Resource maxAlloc,
String queueName, Planner replanner, boolean getMoveOnExpiry) {
String queueName, Planner replanner, boolean getMoveOnExpiry,
RMContext rmContext) {
this(queueMetrics, policy, agent, totalCapacity, step, resCalc, minAlloc,
maxAlloc, queueName, replanner, getMoveOnExpiry, new UTCClock());
maxAlloc, queueName, replanner, getMoveOnExpiry, rmContext,
new UTCClock());
}

public InMemoryPlan(QueueMetrics queueMetrics, SharingPolicy policy,
ReservationAgent agent, Resource totalCapacity, long step,
ResourceCalculator resCalc, Resource minAlloc, Resource maxAlloc,
String queueName, Planner replanner, boolean getMoveOnExpiry, Clock clock) {
String queueName, Planner replanner, boolean getMoveOnExpiry,
RMContext rmContext, Clock clock) {
this.queueMetrics = queueMetrics;
this.policy = policy;
this.agent = agent;
Expand All @@ -107,6 +112,7 @@ public InMemoryPlan(QueueMetrics queueMetrics, SharingPolicy policy,
this.replanner = replanner;
this.getMoveOnExpiry = getMoveOnExpiry;
this.clock = clock;
this.rmContext = rmContext;
}

@Override
Expand Down Expand Up @@ -211,6 +217,9 @@ public boolean addReservation(ReservationAllocation reservation)
currentReservations.put(searchInterval, reservations);
reservationTable.put(inMemReservation.getReservationId(),
inMemReservation);
rmContext.getStateStore().storeNewReservation(
ReservationSystemUtil.buildStateProto(inMemReservation),
getQueueName(), inMemReservation.getReservationId().toString());
incrementAllocation(inMemReservation);
LOG.info("Sucessfully added reservation: {} to plan.",
inMemReservation.getReservationId());
Expand Down Expand Up @@ -289,6 +298,8 @@ private boolean removeReservation(ReservationAllocation reservation) {
throw new IllegalArgumentException(errMsg);
}
reservationTable.remove(reservation.getReservationId());
rmContext.getStateStore().removeReservation(
getQueueName(), reservation.getReservationId().toString());
decrementAllocation(reservation);
LOG.info("Sucessfully deleted reservation: {} in plan.",
reservation.getReservationId());
Expand Down
Expand Up @@ -41,6 +41,7 @@
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest;
import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest;
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationResponse;
Expand Down Expand Up @@ -815,6 +816,12 @@ public ApplicationReport getApplicationReport(ApplicationId appId)
return response.getApplicationReport();
}

/**
 * Forwards a reservation update to the client service returned by
 * {@code getClientRMService()}. The response of the update call is discarded.
 *
 * @param request the reservation update request to submit
 * @throws IOException declared for failures of the underlying update call
 * @throws YarnException declared for failures of the underlying update call
 */
public void updateReservationState(ReservationUpdateRequest request)
    throws IOException, YarnException {
  getClientRMService().updateReservation(request);
}

// Explicitly reset queue metrics for testing.
@SuppressWarnings("static-access")
public void clearQueueMetrics(RMApp app) {
Expand Down
Expand Up @@ -29,6 +29,8 @@
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.conf.HAUtil;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.DrainDispatcher;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.ZKRMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
Expand Down Expand Up @@ -106,8 +108,18 @@ protected MockAM launchAM(RMApp app, MockRM rm, MockNM nm)
}

protected void startRMs() throws IOException {
rm1 = new MockRM(confForRM1, null, false);
rm2 = new MockRM(confForRM2, null, false);
rm1 = new MockRM(confForRM1, null, false){
@Override
protected Dispatcher createDispatcher() {
return new DrainDispatcher();
}
};
rm2 = new MockRM(confForRM2, null, false){
@Override
protected Dispatcher createDispatcher() {
return new DrainDispatcher();
}
};
startRMs(rm1, confForRM1, rm2, confForRM2);

}
Expand Down
Expand Up @@ -32,7 +32,6 @@
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
Expand Down Expand Up @@ -103,8 +102,6 @@
import org.apache.hadoop.yarn.api.records.ReservationDefinition;
import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.api.records.ReservationRequest;
import org.apache.hadoop.yarn.api.records.ReservationRequestInterpreter;
import org.apache.hadoop.yarn.api.records.ReservationRequests;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
Expand Down Expand Up @@ -1115,7 +1112,8 @@ public void testReservationAPIs() {
long duration = 60000;
long deadline = (long) (arrival + 1.05 * duration);
ReservationSubmissionRequest sRequest =
createSimpleReservationRequest(4, arrival, deadline, duration);
ReservationSystemTestUtil.createSimpleReservationRequest(4, arrival,
deadline, duration);
ReservationSubmissionResponse sResponse = null;
try {
sResponse = clientService.submitReservation(sRequest);
Expand Down Expand Up @@ -1167,24 +1165,6 @@ public void testReservationAPIs() {
rm = null;
}

/**
 * Builds a submission request against
 * {@code ReservationSystemTestUtil.reservationQ} whose definition contains a
 * single atomic ask, interpreted with {@code R_ALL}.
 *
 * @param numContainers number of containers requested by the single ask
 * @param arrival arrival time passed to the reservation definition
 * @param deadline deadline passed to the reservation definition
 * @param duration duration passed to the single ask
 * @return the assembled reservation submission request
 */
private ReservationSubmissionRequest createSimpleReservationRequest(
    int numContainers, long arrival, long deadline, long duration) {
  // The one and only ask: numContainers of Resource(1024, 1) for duration.
  ReservationRequest singleAsk = ReservationRequest.newInstance(
      Resource.newInstance(1024, 1), numContainers, 1, duration);
  ReservationRequests asks = ReservationRequests.newInstance(
      Collections.singletonList(singleAsk),
      ReservationRequestInterpreter.R_ALL);
  return ReservationSubmissionRequest.newInstance(
      ReservationDefinition.newInstance(arrival, deadline, asks,
          "testClientRMService#reservation"),
      ReservationSystemTestUtil.reservationQ);
}

@Test
public void testGetNodeLabels() throws Exception {
MockRM rm = new MockRM() {
Expand Down
@@ -0,0 +1,212 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager;

import org.apache.hadoop.yarn.api.protocolrecords.ReservationDeleteRequest;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionRequest;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionResponse;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateRequest;
import org.apache.hadoop.yarn.api.records.ReservationDefinition;
import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.DrainDispatcher;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerRecoveryProtos.ReservationAllocationStateProto;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystemTestUtil;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
import org.apache.hadoop.yarn.util.Clock;
import org.apache.hadoop.yarn.util.UTCClock;
import org.junit.Assert;
import org.junit.Test;

import java.util.Map;

/**
 * Checks that reservations submitted, updated or deleted through {@code rm1}
 * are reflected in the state that {@code rm2}'s state store loads after an
 * explicit failover.
 */
public class TestReservationSystemWithRMHA extends RMHATestBase {

  /** Number of re-checks, after the first check, for plan capacity. */
  private static final int CAPACITY_CHECK_RETRIES = 10;

  /** Pause between two plan-capacity checks, in milliseconds. */
  private static final long CAPACITY_CHECK_INTERVAL_MS = 100;

  /**
   * Installs a capacity-scheduler configuration with the reservation system
   * enabled before delegating to the base class setup.
   */
  @Override
  public void setup() throws Exception {
    CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
    ReservationSystemTestUtil.setupQueueConfiguration(conf);
    conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
        ResourceScheduler.class);
    conf.setBoolean(YarnConfiguration.RM_RESERVATION_SYSTEM_ENABLE, true);
    configuration = conf;

    super.setup();
  }

  /** A submitted reservation must be present in the state after failover. */
  @Test
  public void testSubmitReservationAndCheckAfterFailover() throws Exception {
    startRMs();
    addNodeCapacityToPlan();

    ReservationId reservationID =
        submitReservation(createReservationSubmissionRequest());

    Map<ReservationId, ReservationAllocationStateProto> reservationStateMap =
        failoverAndLoadReservationState();
    Assert.assertNotNull(reservationStateMap);
    Assert.assertNotNull(reservationStateMap.get(reservationID));
  }

  /** An updated deadline must be visible in the state after failover. */
  @Test
  public void testUpdateReservationAndCheckAfterFailover() throws Exception {
    startRMs();
    addNodeCapacityToPlan();

    ReservationSubmissionRequest request = createReservationSubmissionRequest();
    ReservationId reservationID = submitReservation(request);

    // Change any field
    ReservationDefinition reservationDefinition =
        request.getReservationDefinition();
    long newDeadline = reservationDefinition.getDeadline() + 100;
    reservationDefinition.setDeadline(newDeadline);
    ReservationUpdateRequest updateRequest =
        ReservationUpdateRequest.newInstance(
            reservationDefinition, reservationID);
    rm1.updateReservationState(updateRequest);

    Map<ReservationId, ReservationAllocationStateProto> reservationStateMap =
        failoverAndLoadReservationState();
    Assert.assertNotNull(reservationStateMap);
    ReservationAllocationStateProto reservationState =
        reservationStateMap.get(reservationID);
    // Fail with a clear assertion instead of an NPE on the next line.
    Assert.assertNotNull(reservationState);
    Assert.assertEquals(newDeadline,
        reservationState.getReservationDefinition().getDeadline());
  }

  /** A deleted reservation must leave no state for its queue after failover. */
  @Test
  public void testDeleteReservationAndCheckAfterFailover() throws Exception {
    startRMs();
    addNodeCapacityToPlan();

    ReservationId reservationID =
        submitReservation(createReservationSubmissionRequest());

    // Delete the reservation
    ReservationDeleteRequest deleteRequest =
        ReservationDeleteRequest.newInstance(reservationID);
    rm1.getClientRMService().deleteReservation(deleteRequest);

    Assert.assertNull(failoverAndLoadReservationState());
  }

  /**
   * Submits the request through rm1's client service and checks that a
   * reservation id came back.
   *
   * Exceptions are propagated rather than converted with
   * {@code Assert.fail(e.getMessage())}, so the stack trace is kept.
   *
   * @param request the reservation submission request to send
   * @return the non-null id of the submitted reservation
   */
  private ReservationId submitReservation(ReservationSubmissionRequest request)
      throws Exception {
    ReservationSubmissionResponse response =
        rm1.getClientRMService().submitReservation(request);
    Assert.assertNotNull(response);
    ReservationId reservationID = response.getReservationId();
    Assert.assertNotNull(reservationID);
    LOG.info("Submit reservation response: " + reservationID);
    return reservationID;
  }

  /**
   * Performs the explicit failover, registers a node with rm2 and loads the
   * state from rm2's state store.
   *
   * @return the reservation state recorded for
   *         {@code ReservationSystemTestUtil.reservationQ}, or null if the
   *         loaded state has no entry for that queue
   */
  private Map<ReservationId, ReservationAllocationStateProto>
      failoverAndLoadReservationState() throws Exception {
    // Do the failover
    explicitFailover();

    rm2.registerNode("127.0.0.1:1", 102400, 100);

    RMState state = rm2.getRMContext().getStateStore().loadState();
    return state.getReservationState().get(
        ReservationSystemTestUtil.reservationQ);
  }

  /**
   * Registers a node with rm1 and waits until the plan for
   * {@code ReservationSystemTestUtil.reservationQ} reports non-zero memory
   * capacity, failing the test if that never happens.
   *
   * Success is tracked with an explicit flag: deriving it from the retry
   * counter reported a failure when capacity showed up on the final check.
   */
  private void addNodeCapacityToPlan() throws Exception {
    rm1.registerNode("127.0.0.1:1", 102400, 100);

    boolean capacityAdded = false;
    // One initial check plus CAPACITY_CHECK_RETRIES re-checks.
    for (int check = 0; check <= CAPACITY_CHECK_RETRIES; check++) {
      DrainDispatcher dispatcher =
          (DrainDispatcher) rm1.getRMContext().getDispatcher();
      dispatcher.await();
      rm1.getRMContext().getReservationSystem().synchronizePlan(
          ReservationSystemTestUtil.reservationQ);
      if (rm1.getRMContext().getReservationSystem()
          .getPlan(ReservationSystemTestUtil.reservationQ)
          .getTotalCapacity().getMemory() > 0) {
        capacityAdded = true;
        break;
      }
      LOG.info("Waiting for node capacity to be added to plan");
      Thread.sleep(CAPACITY_CHECK_INTERVAL_MS);
    }

    if (!capacityAdded) {
      Assert.fail("Exhausted attempts in checking if node capacity was "
          + "added to the plan");
    }
  }

  /**
   * Creates a simple reservation request for 4 containers that arrives now,
   * lasts 60000 and has a deadline of arrival + 1.05 * duration.
   */
  private ReservationSubmissionRequest createReservationSubmissionRequest() {
    Clock clock = new UTCClock();
    long arrival = clock.getTime();
    long duration = 60000;
    long deadline = (long) (arrival + 1.05 * duration);
    return ReservationSystemTestUtil.createSimpleReservationRequest(4, arrival,
        deadline, duration);
  }
}

0 comments on commit 506d1b1

Please sign in to comment.