Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

YARN-11226. [Federation] Add createNewReservation, submitReservation, updateReservation, deleteReservation REST APIs for Router. #4892

Closed
wants to merge 41 commits into from
Closed
Show file tree
Hide file tree
Changes from 38 commits
Commits
Show all changes
41 commits
Select commit Hold shift + click to select a range
e54c767
YARN-11226. YARN-11226. [Federation] Add createNewReservation, submit…
Sep 15, 2022
4a3dc94
YARN-11226. Fix CheckStyle.
Sep 15, 2022
6e8af50
YARN-11226. Improve Code Style.
Sep 16, 2022
d7209e6
YARN-11226. Fix CheckStyle.
Sep 16, 2022
7a73686
YARN-11226. Fix CheckStyle.
Sep 16, 2022
bb738ae
YARN-11226. Fix CheckStyle.
Sep 16, 2022
12d5811
YARN-11226. Fix CheckStyle.
Sep 16, 2022
001a2fe
YARN-11226. Fix CheckStyle.
Sep 17, 2022
7617b48
Merge branch 'apache:trunk' into YARN-11226-V2
slfan1989 Sep 21, 2022
27d310c
YARN-11226. Fix CheckStyle.
Sep 22, 2022
f569b26
YARN-11226. Fix CheckStyle And Add Junit Test.
Sep 23, 2022
f324c59
YARN-11226. Fix CheckStyle.
Sep 23, 2022
b9d33b4
Merge branch 'apache:trunk' into YARN-11226-V2
slfan1989 Sep 26, 2022
a0fc7bb
YARN-11226. Fix CheckStyle.
Sep 29, 2022
ad614d5
YARN-11226. Fix CheckStyle.
Oct 2, 2022
c1dbec5
Merge branch 'trunk' into YARN-11226-V2
slfan1989 Oct 4, 2022
1e96bfb
YARN-11226. Fix CheckStyle.
Oct 5, 2022
07a3133
Merge branch 'apache:trunk' into YARN-11226-V2
slfan1989 Oct 9, 2022
7f7426d
Merge branch 'apache:trunk' into YARN-11226-V2
slfan1989 Oct 13, 2022
bbaeebc
Merge branch 'trunk' into YARN-11226-V2
slfan1989 Oct 14, 2022
60ee8a3
YARN-11226. Fix CheckStyle.
Oct 14, 2022
2352c50
YARN-11226. Fix CheckStyle.
Oct 14, 2022
21bc868
YARN-11226. Fix CheckStyle.
Oct 14, 2022
6ad7d6a
Merge branch 'apache:trunk' into YARN-11226-V2
slfan1989 Oct 17, 2022
8ed08bb
Merge branch 'apache:trunk' into YARN-11226-V2
slfan1989 Oct 19, 2022
43bb877
Merge branch 'apache:trunk' into YARN-11226-V2
slfan1989 Oct 20, 2022
8d8492e
Merge branch 'trunk' into YARN-11226-V2
slfan1989 Oct 21, 2022
f32ed12
Merge branch 'apache:trunk' into YARN-11226-V2
slfan1989 Oct 27, 2022
b89b320
YARN-11226. Fix CheckStyle.
Oct 27, 2022
e64f06d
Merge branch 'trunk' into YARN-11226-V2
slfan1989 Nov 1, 2022
be63f62
YARN-11226. Fix CheckStyle.
Nov 1, 2022
bb39541
YARN-11226. Fix CheckStyle.
Nov 2, 2022
5abeb24
YARN-11226. Fix CheckStyle.
Nov 2, 2022
0dfeb5c
Merge branch 'trunk' into YARN-11226-V2
slfan1989 Nov 7, 2022
5d535b5
YARN-11226. Fix CheckStyle.
Nov 7, 2022
a73a258
Merge branch 'apache:trunk' into YARN-11226-V2
slfan1989 Nov 10, 2022
b62ec21
Merge branch 'apache:trunk' into YARN-11226-V2
slfan1989 Nov 11, 2022
61f9165
Merge branch 'apache:trunk' into YARN-11226-V2
slfan1989 Nov 14, 2022
a23e080
Merge branch 'apache:trunk' into YARN-11226-V2
slfan1989 Nov 16, 2022
c0f381d
Merge branch 'trunk' into YARN-11226-V2
slfan1989 Nov 29, 2022
5011e12
YARN-11226. Fix CheckStyle.
Nov 29, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -27,10 +27,23 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.api.records.ReservationRequest;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.api.records.ReservationRequestInterpreter;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ReservationRequests;
import org.apache.hadoop.yarn.api.records.ReservationDefinition;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
import org.apache.hadoop.yarn.server.federation.store.records.ReservationHomeSubCluster;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationRequestsInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationDefinitionInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationRequestInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ResourceInfo;
import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyUtils;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -58,6 +71,8 @@ public final class RouterServerUtil {

private static final String CONTAINER_PREFIX = "container_";

public static final String RESERVEIDSTR_PREFIX = "reservation_";

private static final String EPOCH_PREFIX = "e";

private static Random rand = new Random(System.currentTimeMillis());
Expand Down Expand Up @@ -263,6 +278,26 @@ public static void logAndThrowRunTimeException(Throwable t, String errMsgFormat,
}
}

/**
* Save Reservation And HomeSubCluster Mapping.
*
* @param federationFacade federation facade
* @param reservationId reservationId
* @param homeSubCluster homeSubCluster
* @throws YarnException on failure
*/
public static void addReservationHomeSubCluster(FederationStateStoreFacade federationFacade,
ReservationId reservationId, ReservationHomeSubCluster homeSubCluster) throws YarnException {
try {
// persist the mapping of reservationId and the subClusterId which has
// been selected as its home
federationFacade.addReservationHomeSubCluster(homeSubCluster);
} catch (YarnException e) {
RouterServerUtil.logAndThrowException(e,
"Unable to insert the ReservationId %s into the FederationStateStore.", reservationId);
}
}

/**
* Throws an RunTimeException due to an error.
*
Expand All @@ -286,6 +321,102 @@ public static RuntimeException logAndReturnRunTimeException(
}

/**
* Update Reservation And HomeSubCluster Mapping.
*
* @param federationFacade federation facade
* @param subClusterId subClusterId
* @param reservationId reservationId
* @param homeSubCluster homeSubCluster
* @throws YarnException on failure
*/
public static void updateReservationHomeSubCluster(FederationStateStoreFacade federationFacade,
SubClusterId subClusterId, ReservationId reservationId,
ReservationHomeSubCluster homeSubCluster) throws YarnException {
try {
// update the mapping of reservationId and the home subClusterId to
// the new subClusterId we have selected
federationFacade.updateReservationHomeSubCluster(homeSubCluster);
} catch (YarnException e) {
SubClusterId subClusterIdInStateStore =
federationFacade.getReservationHomeSubCluster(reservationId);
if (subClusterId == subClusterIdInStateStore) {
LOG.info("Reservation {} already submitted on SubCluster {}.", reservationId, subClusterId);
} else {
RouterServerUtil.logAndThrowException(e,
"Unable to update the ReservationId %s into the FederationStateStore.", reservationId);
}
}
}

/**
* Exists ReservationHomeSubCluster Mapping.
*
* @param federationFacade federation facade
* @param reservationId reservationId
* @return true - exist, false - not exist
*/
public static boolean existsReservationHomeSubCluster(FederationStateStoreFacade federationFacade,
ReservationId reservationId) {
try {
SubClusterId subClusterId = federationFacade.getReservationHomeSubCluster(reservationId);
if (subClusterId != null) {
return true;
}
} catch (YarnException e) {
LOG.warn("get homeSubCluster by reservationId = {} error.", reservationId, e);
}
return false;
}

public static ReservationDefinition convertReservationDefinition(
goiri marked this conversation as resolved.
Show resolved Hide resolved
ReservationDefinitionInfo definitionInfo) {

if (definitionInfo == null || definitionInfo.getReservationRequests() == null
|| definitionInfo.getReservationRequests().getReservationRequest() == null
|| definitionInfo.getReservationRequests().getReservationRequest().isEmpty()) {
throw new RuntimeException("definitionInfo Or ReservationRequests is Null.");
}

// basic variable
long arrival = definitionInfo.getArrival();
long deadline = definitionInfo.getDeadline();

// ReservationRequests reservationRequests
String name = definitionInfo.getReservationName();
String recurrenceExpression = definitionInfo.getRecurrenceExpression();
Priority priority = Priority.newInstance(definitionInfo.getPriority());

// reservation requests info
List<ReservationRequest> reservationRequestList = new ArrayList<>();

ReservationRequestsInfo reservationRequestsInfo = definitionInfo.getReservationRequests();

List<ReservationRequestInfo> reservationRequestInfos =
reservationRequestsInfo.getReservationRequest();

for (ReservationRequestInfo resRequestInfo : reservationRequestInfos) {
ResourceInfo resourceInfo = resRequestInfo.getCapability();
Resource capability =
Resource.newInstance(resourceInfo.getMemorySize(), resourceInfo.getvCores());
ReservationRequest reservationRequest = ReservationRequest.newInstance(capability,
resRequestInfo.getNumContainers(), resRequestInfo.getMinConcurrency(),
resRequestInfo.getDuration());
reservationRequestList.add(reservationRequest);
}

ReservationRequestInterpreter[] values = ReservationRequestInterpreter.values();
ReservationRequestInterpreter reservationRequestInterpreter =
values[reservationRequestsInfo.getReservationRequestsInterpreter()];
ReservationRequests reservationRequests = ReservationRequests.newInstance(
reservationRequestList, reservationRequestInterpreter);

ReservationDefinition definition = ReservationDefinition.newInstance(
arrival, deadline, reservationRequests, name, recurrenceExpression, priority);

return definition;
}

/*
* Throws an RunTimeException due to an error.
*
* @param errMsgFormat the error message format string.
Expand Down Expand Up @@ -456,6 +587,31 @@ public static void validateContainerId(String containerId)
}
}

@Public
@Unstable
public static void validateReservationId(String reservationId)
throws IllegalArgumentException {

if (reservationId == null || reservationId.isEmpty()) {
throw new IllegalArgumentException("Parameter error, the reservationId is empty or null.");
}

if (!reservationId.startsWith(RESERVEIDSTR_PREFIX)) {
throw new IllegalArgumentException("Invalid ReservationId: " + reservationId);
}

String[] resFields = reservationId.split("_");
if (resFields.length != 3) {
throw new IllegalArgumentException("Invalid ReservationId: " + reservationId);
}

String clusterTimestamp = resFields[1];
String id = resFields[2];
if (!NumberUtils.isDigits(id) || !NumberUtils.isDigits(clusterTimestamp)) {
throw new IllegalArgumentException("Invalid ReservationId: " + reservationId);
}
}

/**
* Randomly pick ActiveSubCluster.
* During the selection process, we will exclude SubClusters from the blacklist.
Expand Down