Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

YARN-11226. [Federation] Add createNewReservation, submitReservation, updateReservation, deleteReservation REST APIs for Router. #4892

Closed
wants to merge 41 commits into from
Closed
Show file tree
Hide file tree
Changes from 9 commits
Commits
Show all changes
41 commits
Select commit Hold shift + click to select a range
e54c767
YARN-11226. YARN-11226. [Federation] Add createNewReservation, submit…
Sep 15, 2022
4a3dc94
YARN-11226. Fix CheckStyle.
Sep 15, 2022
6e8af50
YARN-11226. Improve Code Style.
Sep 16, 2022
d7209e6
YARN-11226. Fix CheckStyle.
Sep 16, 2022
7a73686
YARN-11226. Fix CheckStyle.
Sep 16, 2022
bb738ae
YARN-11226. Fix CheckStyle.
Sep 16, 2022
12d5811
YARN-11226. Fix CheckStyle.
Sep 16, 2022
001a2fe
YARN-11226. Fix CheckStyle.
Sep 17, 2022
7617b48
Merge branch 'apache:trunk' into YARN-11226-V2
slfan1989 Sep 21, 2022
27d310c
YARN-11226. Fix CheckStyle.
Sep 22, 2022
f569b26
YARN-11226. Fix CheckStyle And Add Junit Test.
Sep 23, 2022
f324c59
YARN-11226. Fix CheckStyle.
Sep 23, 2022
b9d33b4
Merge branch 'apache:trunk' into YARN-11226-V2
slfan1989 Sep 26, 2022
a0fc7bb
YARN-11226. Fix CheckStyle.
Sep 29, 2022
ad614d5
YARN-11226. Fix CheckStyle.
Oct 2, 2022
c1dbec5
Merge branch 'trunk' into YARN-11226-V2
slfan1989 Oct 4, 2022
1e96bfb
YARN-11226. Fix CheckStyle.
Oct 5, 2022
07a3133
Merge branch 'apache:trunk' into YARN-11226-V2
slfan1989 Oct 9, 2022
7f7426d
Merge branch 'apache:trunk' into YARN-11226-V2
slfan1989 Oct 13, 2022
bbaeebc
Merge branch 'trunk' into YARN-11226-V2
slfan1989 Oct 14, 2022
60ee8a3
YARN-11226. Fix CheckStyle.
Oct 14, 2022
2352c50
YARN-11226. Fix CheckStyle.
Oct 14, 2022
21bc868
YARN-11226. Fix CheckStyle.
Oct 14, 2022
6ad7d6a
Merge branch 'apache:trunk' into YARN-11226-V2
slfan1989 Oct 17, 2022
8ed08bb
Merge branch 'apache:trunk' into YARN-11226-V2
slfan1989 Oct 19, 2022
43bb877
Merge branch 'apache:trunk' into YARN-11226-V2
slfan1989 Oct 20, 2022
8d8492e
Merge branch 'trunk' into YARN-11226-V2
slfan1989 Oct 21, 2022
f32ed12
Merge branch 'apache:trunk' into YARN-11226-V2
slfan1989 Oct 27, 2022
b89b320
YARN-11226. Fix CheckStyle.
Oct 27, 2022
e64f06d
Merge branch 'trunk' into YARN-11226-V2
slfan1989 Nov 1, 2022
be63f62
YARN-11226. Fix CheckStyle.
Nov 1, 2022
bb39541
YARN-11226. Fix CheckStyle.
Nov 2, 2022
5abeb24
YARN-11226. Fix CheckStyle.
Nov 2, 2022
0dfeb5c
Merge branch 'trunk' into YARN-11226-V2
slfan1989 Nov 7, 2022
5d535b5
YARN-11226. Fix CheckStyle.
Nov 7, 2022
a73a258
Merge branch 'apache:trunk' into YARN-11226-V2
slfan1989 Nov 10, 2022
b62ec21
Merge branch 'apache:trunk' into YARN-11226-V2
slfan1989 Nov 11, 2022
61f9165
Merge branch 'apache:trunk' into YARN-11226-V2
slfan1989 Nov 14, 2022
a23e080
Merge branch 'apache:trunk' into YARN-11226-V2
slfan1989 Nov 16, 2022
c0f381d
Merge branch 'trunk' into YARN-11226-V2
slfan1989 Nov 29, 2022
5011e12
YARN-11226. Fix CheckStyle.
Nov 29, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -24,6 +24,20 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.api.records.ReservationRequest;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.api.records.ReservationRequestInterpreter;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ReservationRequests;
import org.apache.hadoop.yarn.api.records.ReservationDefinition;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
import org.apache.hadoop.yarn.server.federation.store.records.ReservationHomeSubCluster;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationRequestsInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationDefinitionInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationRequestInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ResourceInfo;
import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.slf4j.Logger;
Expand Down Expand Up @@ -222,4 +236,115 @@ public static void logAndThrowRunTimeException(Throwable t, String errMsgFormat,
throw new RuntimeException(msg);
}
}

/**
* Save Reservation And HomeSubCluster Mapping.
*
* @param federationFacade federation facade
* @param reservationId reservationId
* @param homeSubCluster homeSubCluster
* @throws YarnException on failure
*/
public static void addReservationHomeSubCluster(FederationStateStoreFacade federationFacade,
ReservationId reservationId, ReservationHomeSubCluster homeSubCluster) throws YarnException {
try {
// persist the mapping of reservationId and the subClusterId which has
// been selected as its home
federationFacade.addReservationHomeSubCluster(homeSubCluster);
} catch (YarnException e) {
RouterServerUtil.logAndThrowException(e,
"Unable to insert the ReservationId %s into the FederationStateStore.", reservationId);
}
}

/**
* Update Reservation And HomeSubCluster Mapping.
*
* @param federationFacade federation facade
* @param subClusterId subClusterId
* @param reservationId reservationId
* @param homeSubCluster homeSubCluster
* @throws YarnException on failure
*/
public static void updateReservationHomeSubCluster(FederationStateStoreFacade federationFacade,
SubClusterId subClusterId, ReservationId reservationId,
ReservationHomeSubCluster homeSubCluster) throws YarnException {
try {
// update the mapping of reservationId and the home subClusterId to
// the new subClusterId we have selected
federationFacade.updateReservationHomeSubCluster(homeSubCluster);
} catch (YarnException e) {
SubClusterId subClusterIdInStateStore =
federationFacade.getReservationHomeSubCluster(reservationId);
if (subClusterId == subClusterIdInStateStore) {
LOG.info("Reservation {} already submitted on SubCluster {}.", reservationId, subClusterId);
} else {
RouterServerUtil.logAndThrowException(e,
"Unable to update the ReservationId %s into the FederationStateStore.", reservationId);
}
}
}

/**
* Exists ReservationHomeSubCluster Mapping.
*
* @param federationFacade federation facade
* @param reservationId reservationId
* @return true - exist, false - not exist
*/
public static Boolean existsReservationHomeSubCluster(FederationStateStoreFacade federationFacade,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can this be the native boolean type?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for your help reviewing the code, I will modify the code.

ReservationId reservationId) {
try {
SubClusterId subClusterId = federationFacade.getReservationHomeSubCluster(reservationId);
if (subClusterId != null) {
return true;
}
} catch (YarnException e) {
LOG.warn("get homeSubCluster by reservationId = {} error.", reservationId, e);
}
return false;
}

public static ReservationDefinition convertReservationDefinition(
goiri marked this conversation as resolved.
Show resolved Hide resolved
ReservationDefinitionInfo definitionInfo) {

// basic variable
long arrival = definitionInfo.getArrival();
long deadline = definitionInfo.getDeadline();

// ReservationRequests reservationRequests
String name = definitionInfo.getReservationName();
String recurrenceExpression = definitionInfo.getRecurrenceExpression();
Priority priority = Priority.newInstance(definitionInfo.getPriority());

// reservation requests info
List<ReservationRequest> reservationRequestList = new ArrayList<>();

ReservationRequestsInfo reservationRequestsInfo = definitionInfo.getReservationRequests();

List<ReservationRequestInfo> reservationRequestInfos =
reservationRequestsInfo.getReservationRequest();

for (ReservationRequestInfo resRequestInfo : reservationRequestInfos) {
ResourceInfo resourceInfo = resRequestInfo.getCapability();
Resource capability =
Resource.newInstance(resourceInfo.getMemorySize(), resourceInfo.getvCores());
ReservationRequest reservationRequest = ReservationRequest.newInstance(capability,
resRequestInfo.getNumContainers(), resRequestInfo.getMinConcurrency(),
resRequestInfo.getDuration());
reservationRequestList.add(reservationRequest);
}

ReservationRequestInterpreter[] values = ReservationRequestInterpreter.values();
ReservationRequestInterpreter reservationRequestInterpreter =
values[reservationRequestsInfo.getReservationRequestsInterpreter()];
ReservationRequests reservationRequests =
ReservationRequests.newInstance(reservationRequestList, reservationRequestInterpreter);

ReservationDefinition definition =
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

First two lines into one line.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will fix it.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This?

ReservationDefinition.newInstance(
arrival, deadline, reservationRequests, name, recurrenceExpression, priority);

return definition;
}
}
Expand Up @@ -49,11 +49,13 @@
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.Sets;
import org.apache.hadoop.util.concurrent.HadoopExecutors;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionRequest;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.NodeLabel;
import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.api.records.ReservationDefinition;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
Expand All @@ -62,6 +64,7 @@
import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException;
import org.apache.hadoop.yarn.server.federation.resolver.SubClusterResolver;
import org.apache.hadoop.yarn.server.federation.store.records.ApplicationHomeSubCluster;
import org.apache.hadoop.yarn.server.federation.store.records.ReservationHomeSubCluster;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade;
Expand Down Expand Up @@ -98,6 +101,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.BulkActivitiesInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.SchedulerTypeInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodeLabelInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationDefinitionInfo;
import org.apache.hadoop.yarn.server.router.RouterMetrics;
import org.apache.hadoop.yarn.server.router.RouterServerUtil;
import org.apache.hadoop.yarn.server.router.clientrm.ClientMethod;
Expand Down Expand Up @@ -1456,28 +1460,177 @@ public Response cancelDelegationToken(HttpServletRequest hsr)
@Override
public Response createNewReservation(HttpServletRequest hsr)
throws AuthorizationException, IOException, InterruptedException {
throw new NotImplementedException("Code is not implemented");
long startTime = clock.getTime();

Map<SubClusterId, SubClusterInfo> subClustersActive;
try {
subClustersActive = federationFacade.getSubClusters(true);
} catch (YarnException e) {
routerMetrics.incrGetNewReservationFailedRetrieved();
return Response.status(Status.INTERNAL_SERVER_ERROR).
entity(e.getLocalizedMessage()).build();
}

List<SubClusterId> blacklist = new ArrayList<>();

for (int i = 0; i < numSubmitRetries; ++i) {
SubClusterId subClusterId = null;
try {
subClusterId = getRandomActiveSubCluster(subClustersActive, blacklist);
SubClusterInfo subClusterInfo = subClustersActive.get(subClusterId);
DefaultRequestInterceptorREST interceptor = getOrCreateInterceptorForSubCluster(
subClusterId, subClusterInfo.getRMWebServiceAddress());
Response response = interceptor.createNewReservation(hsr);
LOG.info("createNewReservation try #{} on SubCluster {}.", i, subClusterId);
if (response != null && response.getStatus() == HttpServletResponse.SC_OK) {
long stopTime = clock.getTime();
routerMetrics.succeededGetNewReservationRetrieved(stopTime - startTime);
return response;
} else {
// Empty response from the ResourceManager.
// Blacklist this subCluster for this request.
blacklist.add(subClusterId);
}
} catch (YarnException e) {
routerMetrics.incrGetNewReservationFailedRetrieved();
goiri marked this conversation as resolved.
Show resolved Hide resolved
LOG.error("createNewReservation try #{} on SubCluster {} error.", i, subClusterId, e);
}
}

// return Status.SERVICE_UNAVAILABLE
return Response.status(Status.SERVICE_UNAVAILABLE).entity("createNewReservation error").build();
}

@Override
public Response submitReservation(ReservationSubmissionRequestInfo resContext,
HttpServletRequest hsr)
throws AuthorizationException, IOException, InterruptedException {
throw new NotImplementedException("Code is not implemented");

if (resContext == null || resContext.getReservationId() == null
|| resContext.getReservationDefinition() == null || resContext.getQueue() == null) {
routerMetrics.incrSubmitReservationFailedRetrieved();
String errMsg = "Missing submitReservation resContext or reservationId " +
"or reservation definition or queue.";
return Response.status(Status.BAD_REQUEST).entity(errMsg).build();
}
String resId = resContext.getReservationId();

long startTime = clock.getTime();
for (int i = 0; i < numSubmitRetries; i++) {
try {
ReservationId reservationId = ReservationId.parseReservationId(resId);
ReservationDefinitionInfo definitionInfo = resContext.getReservationDefinition();
ReservationDefinition definition =
RouterServerUtil.convertReservationDefinition(definitionInfo);

// First, Get SubClusterId according to specific strategy.
ReservationSubmissionRequest request =
ReservationSubmissionRequest.newInstance(definition,
resContext.getQueue(), reservationId);
SubClusterId subClusterId = policyFacade.getReservationHomeSubCluster(request);

LOG.info("submitReservation ReservationId {} try #{} on SubCluster {}.",
reservationId, i, subClusterId);

ReservationHomeSubCluster reservationHomeSubCluster =
ReservationHomeSubCluster.newInstance(reservationId, subClusterId);

// Second, determine whether the current ReservationId has a corresponding subCluster.
// If it does not exist, add it. If it exists, update it.
Boolean exists = RouterServerUtil.existsReservationHomeSubCluster(
federationFacade, reservationId);
if (!exists) {
RouterServerUtil.addReservationHomeSubCluster(federationFacade,
reservationId, reservationHomeSubCluster);
} else {
RouterServerUtil.updateReservationHomeSubCluster(federationFacade,
subClusterId, reservationId, reservationHomeSubCluster);
}

// Third, Submit a Reservation request to the subCluster
SubClusterInfo subClusterInfo = federationFacade.getSubCluster(subClusterId);
DefaultRequestInterceptorREST interceptor = getOrCreateInterceptorForSubCluster(
subClusterInfo.getSubClusterId(), subClusterInfo.getRMWebServiceAddress());
HttpServletRequest hsrCopy = clone(hsr);
Response response = interceptor.submitReservation(resContext, hsrCopy);
if (response != null) {
LOG.info("Reservation {} submitted on subCluster {}.", reservationId, subClusterId);
long stopTime = clock.getTime();
routerMetrics.succeededSubmitReservationRetrieved(stopTime - startTime);
return response;
}
} catch (Exception e) {
LOG.warn("Unable to submit(try #{}) the Reservation {}.", i, resId, e);
}
}

routerMetrics.incrSubmitReservationFailedRetrieved();
String msg = String.format("Reservation %s failed to be submitted.", resId);
throw new YarnRuntimeException(msg);
}

@Override
public Response updateReservation(ReservationUpdateRequestInfo resContext,
HttpServletRequest hsr)
throws AuthorizationException, IOException, InterruptedException {
throw new NotImplementedException("Code is not implemented");

// parameter verification
if (resContext == null || resContext.getReservationId() == null
|| resContext.getReservationDefinition() == null) {
routerMetrics.incrUpdateReservationFailedRetrieved();
String errMsg = "Missing updateReservation resContext or reservationId " +
"or reservation definition.";
return Response.status(Status.BAD_REQUEST).entity(errMsg).build();
}

// get ReservationId
String reservationId = resContext.getReservationId();
try {
SubClusterInfo subClusterInfo = getHomeSubClusterInfoByReservationId(reservationId);
DefaultRequestInterceptorREST interceptor = getOrCreateInterceptorForSubCluster(
subClusterInfo.getSubClusterId(), subClusterInfo.getRMWebServiceAddress());
HttpServletRequest hsrCopy = clone(hsr);
Response response = interceptor.updateReservation(resContext, hsrCopy);
if (response != null) {
return response;
}
} catch (Exception e) {
RouterServerUtil.logAndThrowRunTimeException("updateReservation Failed.", e);
}

// throw an exception
throw new YarnRuntimeException("updateReservation Failed, reservationId = " + reservationId);
}

@Override
public Response deleteReservation(ReservationDeleteRequestInfo resContext,
HttpServletRequest hsr)
throws AuthorizationException, IOException, InterruptedException {
throw new NotImplementedException("Code is not implemented");

// parameter verification
if (resContext == null || resContext.getReservationId() == null) {
routerMetrics.incrDeleteReservationFailedRetrieved();
String errMsg = "Missing deleteReservation request or reservationId.";
return Response.status(Status.BAD_REQUEST).entity(errMsg).build();
}

// get ReservationId
String reservationId = resContext.getReservationId();
try {
SubClusterInfo subClusterInfo = getHomeSubClusterInfoByReservationId(reservationId);
DefaultRequestInterceptorREST interceptor = getOrCreateInterceptorForSubCluster(
subClusterInfo.getSubClusterId(), subClusterInfo.getRMWebServiceAddress());
HttpServletRequest hsrCopy = clone(hsr);
Response response = interceptor.deleteReservation(resContext, hsrCopy);
if (response != null) {
return response;
}
} catch (Exception e) {
RouterServerUtil.logAndThrowRunTimeException("deleteReservation Failed.", e);
}

// throw an exception
throw new YarnRuntimeException("deleteReservation Failed, reservationId = " + reservationId);
}

@Override
Expand Down Expand Up @@ -1865,4 +2018,9 @@ private SubClusterInfo getHomeSubClusterInfoByReservationId(String resId)
public LRUCacheHashMap<RouterAppInfoCacheKey, AppsInfo> getAppInfosCaches() {
return appInfosCaches;
}

@VisibleForTesting
public Map<SubClusterId, DefaultRequestInterceptorREST> getInterceptors() {
return interceptors;
}
}
Expand Up @@ -32,6 +32,7 @@
import org.apache.hadoop.security.authorize.AuthorizationException;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ActivitiesInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppActivitiesInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppAttemptsInfo;
Expand Down Expand Up @@ -74,10 +75,17 @@ public abstract class BaseRouterWebServicesTest {
private Router router;
public final static int TEST_MAX_CACHE_SIZE = 10;

public static final String QUEUE_DEFAULT = "default";
public static final String QUEUE_DEFAULT_FULL = CapacitySchedulerConfiguration.ROOT +
CapacitySchedulerConfiguration.DOT + QUEUE_DEFAULT;
public static final String QUEUE_DEDICATED = "dedicated";
public static final String QUEUE_DEDICATED_FULL = CapacitySchedulerConfiguration.ROOT +
CapacitySchedulerConfiguration.DOT + QUEUE_DEDICATED;

private RouterWebServices routerWebService;

@Before
public void setUp() {
public void setUp() throws YarnException, IOException {
this.conf = createConfiguration();

router = spy(new Router());
Expand Down