Skip to content

Commit

Permalink
YARN-11226. Improve Code Style.
Browse files Browse the repository at this point in the history
  • Loading branch information
slfan1989 committed Sep 16, 2022
1 parent 4a3dc94 commit 6e8af50
Show file tree
Hide file tree
Showing 5 changed files with 206 additions and 140 deletions.
Expand Up @@ -2018,4 +2018,9 @@ private SubClusterInfo getHomeSubClusterInfoByReservationId(String resId)
public LRUCacheHashMap<RouterAppInfoCacheKey, AppsInfo> getAppInfosCaches() {
return appInfosCaches;
}

@VisibleForTesting
public Map<SubClusterId, DefaultRequestInterceptorREST> getInterceptors() {
return interceptors;
}
}
Expand Up @@ -32,6 +32,7 @@
import org.apache.hadoop.security.authorize.AuthorizationException;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ActivitiesInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppActivitiesInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppAttemptsInfo;
Expand Down Expand Up @@ -74,6 +75,13 @@ public abstract class BaseRouterWebServicesTest {
private Router router;
public final static int TEST_MAX_CACHE_SIZE = 10;

public static final String QUEUE_DEFAULT = "default";
public static final String QUEUE_DEFAULT_FULL = CapacitySchedulerConfiguration.ROOT +
CapacitySchedulerConfiguration.DOT + QUEUE_DEFAULT;
public static final String QUEUE_DEDICATED = "dedicated";
public static final String QUEUE_DEDICATED_FULL = CapacitySchedulerConfiguration.ROOT +
CapacitySchedulerConfiguration.DOT + QUEUE_DEDICATED;

private RouterWebServices routerWebService;

@Before
Expand Down
Expand Up @@ -40,10 +40,11 @@

import org.apache.commons.lang3.EnumUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.security.authorize.AuthorizationException;
import org.apache.hadoop.util.Sets;
import org.apache.hadoop.util.Time;
import org.apache.hadoop.yarn.api.protocolrecords.*;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.api.records.Resource;
Expand All @@ -66,19 +67,13 @@
import org.apache.hadoop.yarn.api.records.ReservationRequest;
import org.apache.hadoop.yarn.api.records.ReservationRequests;
import org.apache.hadoop.yarn.api.records.ReservationDefinition;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionRequest;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationListRequest;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationListResponse;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateRequest;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
import org.apache.hadoop.yarn.server.resourcemanager.ClientRMService;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystem;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystemTestUtil;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
Expand All @@ -88,44 +83,12 @@
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivityDiagnosticConstant;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivityState;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivityLevel;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestUtils;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.NodeIDsInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodeLabelsInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.LabelsToNodesInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppState;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ApplicationSubmissionContextInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppsInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ClusterMetricsInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NewApplication;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodeInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodesInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ResourceInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ResourceOptionInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodeToLabelsInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodeLabelInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppAttemptsInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppTimeoutInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppTimeoutsInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppPriority;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppQueue;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.StatisticsItemInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ApplicationStatisticsInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppActivitiesInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NewReservation;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationSubmissionRequestInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationUpdateRequestInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationDeleteRequestInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationListInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationDefinitionInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationRequestInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationRequestsInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationUpdateResponseInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.*;
import org.apache.hadoop.yarn.server.router.RouterServerUtil;
import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
import org.apache.hadoop.yarn.server.webapp.dao.AppAttemptInfo;
Expand All @@ -140,6 +103,7 @@
import org.slf4j.LoggerFactory;

import static org.mockito.Mockito.mock;
import static org.apache.hadoop.yarn.server.router.webapp.BaseRouterWebServicesTest.QUEUE_DEDICATED_FULL;

/**
* This class mocks the RESTRequestInterceptor.
Expand All @@ -157,13 +121,6 @@ public class MockDefaultRequestInterceptorREST
private Map<ApplicationId, ApplicationReport> applicationMap = new HashMap<>();
public static final String APP_STATE_RUNNING = "RUNNING";

private static final String QUEUE_DEFAULT = "default";
private static final String QUEUE_DEFAULT_FULL = CapacitySchedulerConfiguration.ROOT +
CapacitySchedulerConfiguration.DOT + QUEUE_DEFAULT;
private static final String QUEUE_DEDICATED = "dedicated";
public static final String QUEUE_DEDICATED_FULL = CapacitySchedulerConfiguration.ROOT +
CapacitySchedulerConfiguration.DOT + QUEUE_DEDICATED;

// duration(milliseconds), 1mins
public static final long DURATION = 60*1000;

Expand All @@ -180,16 +137,6 @@ private void validateRunning() throws ConnectException {
}
}

public MockDefaultRequestInterceptorREST() {
try {
if (mockRM == null) {
mockRM = setupResourceManager();
}
} catch (Exception e) {
LOG.error("setupResourceManager failed.", e);
}
}

@Override
public Response createNewApplication(HttpServletRequest hsr)
throws AuthorizationException, IOException, InterruptedException {
Expand Down Expand Up @@ -861,8 +808,11 @@ public Response listReservation(String queue, String reservationId, long startTi
" Please try again with a valid reservable queue.");
}

if (mockRM == null) {
mockRM = setupResourceManager();
ReservationId reservationID =
ReservationId.parseReservationId(reservationId);

if (!reservationMap.containsKey(reservationID)) {
throw new NotFoundException("reservationId with id: " + reservationId + " not found");
}

ClientRMService clientService = mockRM.getClientRMService();
Expand All @@ -877,33 +827,6 @@ public Response listReservation(String queue, String reservationId, long startTi
return Response.status(Status.OK).entity(resResponse).build();
}

private MockRM setupResourceManager() throws IOException {
try {
DefaultMetricsSystem.setMiniClusterMode(true);
CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();

// Define default queue
conf.setCapacity(QUEUE_DEFAULT_FULL, 20);
// Define dedicated queues
conf.setQueues(CapacitySchedulerConfiguration.ROOT,
new String[]{QUEUE_DEFAULT, QUEUE_DEDICATED});
conf.setCapacity(QUEUE_DEDICATED_FULL, 80);
conf.setReservable(QUEUE_DEDICATED_FULL, true);

conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class, ResourceScheduler.class);
conf.setBoolean(YarnConfiguration.RM_RESERVATION_SYSTEM_ENABLE, true);
conf.setBoolean(YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_ENABLED, false);

MockRM rm = new MockRM(conf);
rm.start();
rm.registerNode("127.0.0.1:5678", 100 * 1024, 100);
return rm;
} catch (Exception e) {
LOG.error("setupResourceManager failed.", e);
throw new IOException(e);
}
}

@Override
public Response createNewReservation(HttpServletRequest hsr)
throws AuthorizationException, IOException, InterruptedException {
Expand Down Expand Up @@ -945,10 +868,6 @@ public Response submitReservation(ReservationSubmissionRequestInfo resContext,
private void submitReservation(ReservationSubmissionRequest request) {
try {

if (mockRM == null) {
mockRM = setupResourceManager();
}

// synchronize plan
ReservationSystem reservationSystem = mockRM.getReservationSystem();
reservationSystem.synchronizePlan(QUEUE_DEDICATED_FULL, true);
Expand Down Expand Up @@ -987,10 +906,6 @@ public Response updateReservation(ReservationUpdateRequestInfo resContext,
private void updateReservation(ReservationUpdateRequestInfo resContext) {
try {

if (mockRM == null) {
mockRM = setupResourceManager();
}

if (resContext == null) {
throw new BadRequestException("Input ReservationSubmissionContext should not be null");
}
Expand All @@ -1002,9 +917,9 @@ private void updateReservation(ReservationUpdateRequestInfo resContext) {

ReservationRequestsInfo resReqsInfo = resInfo.getReservationRequests();
if (resReqsInfo == null || resReqsInfo.getReservationRequest() == null
|| resReqsInfo.getReservationRequest().size() == 0) {
|| resReqsInfo.getReservationRequest().size() == 0) {
throw new BadRequestException("The ReservationDefinition should " +
"contain at least one ReservationRequest");
"contain at least one ReservationRequest");
}

if (resContext.getReservationId() == null) {
Expand All @@ -1023,7 +938,7 @@ private void updateReservation(ReservationUpdateRequestInfo resContext) {
int minConcurrency = resReqInfo.getMinConcurrency();
long duration = resReqInfo.getDuration();
ReservationRequest rr =
ReservationRequest.newInstance(capability, numContainers, minConcurrency, duration);
ReservationRequest.newInstance(capability, numContainers, minConcurrency, duration);
list.add(rr);
}

Expand All @@ -1033,7 +948,7 @@ private void updateReservation(ReservationUpdateRequestInfo resContext) {
resInfo.getReservationName(), resInfo.getRecurrenceExpression(),
Priority.newInstance(resInfo.getPriority()));
ReservationUpdateRequest request = ReservationUpdateRequest.newInstance(
rDef, ReservationId.parseReservationId(resContext.getReservationId()));
rDef, ReservationId.parseReservationId(resContext.getReservationId()));

ClientRMService clientService = mockRM.getClientRMService();
clientService.updateReservation(request);
Expand All @@ -1042,4 +957,43 @@ private void updateReservation(ReservationUpdateRequestInfo resContext) {
throw new RuntimeException(ex);
}
}

@Override
public Response deleteReservation(ReservationDeleteRequestInfo resContext, HttpServletRequest hsr)
throws AuthorizationException, IOException, InterruptedException {
if (!isRunning) {
throw new RuntimeException("RM is stopped");
}

try {
String resId = resContext.getReservationId();
ReservationId reservationId = ReservationId.parseReservationId(resId);

if (!reservationMap.containsKey(reservationId)) {
throw new NotFoundException("reservationId with id: " + reservationId + " not found");
}

ReservationDeleteRequest reservationDeleteRequest =
ReservationDeleteRequest.newInstance(reservationId);
ClientRMService clientService = mockRM.getClientRMService();
clientService.deleteReservation(reservationDeleteRequest);

ReservationDeleteResponseInfo resRespInfo = new ReservationDeleteResponseInfo();
reservationMap.remove(reservationId);

return Response.status(Status.OK).entity(resRespInfo).build();
} catch (YarnException e) {
throw new RuntimeException(e);
}
}

@VisibleForTesting
public MockRM getMockRM() {
return mockRM;
}

@VisibleForTesting
public void setMockRM(MockRM mockRM) {
this.mockRM = mockRM;
}
}

0 comments on commit 6e8af50

Please sign in to comment.