Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

YARN-11272. Federation StateStore: Support storage/retrieval of Reservations With Zk. #4781

Merged
merged 5 commits into from
Aug 31, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,21 @@ public final class ZKFederationStateStoreOpDurations implements MetricsSource {
@Metric("Duration for a get PolicyConfigurations call")
private MutableRate getPoliciesConfigurations;

@Metric("Duration for a add reservation homeSubCluster call")
private MutableRate addReservationHomeSubCluster;

@Metric("Duration for a get reservation homeSubCluster call")
private MutableRate getReservationHomeSubCluster;

@Metric("Duration for a get reservations homeSubCluster call")
private MutableRate getReservationsHomeSubCluster;

@Metric("Duration for a delete reservation homeSubCluster call")
private MutableRate deleteReservationHomeSubCluster;

@Metric("Duration for a update reservation homeSubCluster call")
private MutableRate updateReservationHomeSubCluster;

protected static final MetricsInfo RECORD_INFO =
info("ZKFederationStateStoreOpDurations", "Durations of ZKFederationStateStore calls");

Expand Down Expand Up @@ -152,4 +167,24 @@ public void addSetPolicyConfigurationDuration(long startTime, long endTime) {
public void addGetPoliciesConfigurationsDuration(long startTime, long endTime) {
getPoliciesConfigurations.add(endTime - startTime);
}

public void addReservationHomeSubClusterDuration(long startTime, long endTime) {
addReservationHomeSubCluster.add(endTime - startTime);
}

public void addGetReservationHomeSubClusterDuration(long startTime, long endTime) {
getReservationHomeSubCluster.add(endTime - startTime);
}

public void addGetReservationsHomeSubClusterDuration(long startTime, long endTime) {
getReservationsHomeSubCluster.add(endTime - startTime);
}

public void addDeleteReservationHomeSubClusterDuration(long startTime, long endTime) {
deleteReservationHomeSubCluster.add(endTime - startTime);
}

public void addUpdateReservationHomeSubClusterDuration(long startTime, long endTime) {
updateReservationHomeSubCluster.add(endTime - startTime);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import java.util.TimeZone;

import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.commons.lang3.NotImplementedException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.curator.ZKCuratorManager;
import org.apache.hadoop.yarn.api.records.ApplicationId;
Expand Down Expand Up @@ -65,6 +64,7 @@
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterRegisterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterRegisterResponse;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterState;
import org.apache.hadoop.yarn.server.federation.store.records.ReservationHomeSubCluster;
import org.apache.hadoop.yarn.server.federation.store.records.UpdateApplicationHomeSubClusterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.UpdateApplicationHomeSubClusterResponse;
import org.apache.hadoop.yarn.server.federation.store.records.AddReservationHomeSubClusterRequest;
Expand All @@ -84,7 +84,9 @@
import org.apache.hadoop.yarn.server.federation.store.utils.FederationMembershipStateStoreInputValidator;
import org.apache.hadoop.yarn.server.federation.store.utils.FederationPolicyStoreInputValidator;
import org.apache.hadoop.yarn.server.federation.store.utils.FederationStateStoreUtils;
import org.apache.hadoop.yarn.server.federation.store.utils.FederationReservationHomeSubClusterStoreInputValidator;
import org.apache.hadoop.yarn.server.records.Version;
import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.util.Clock;
import org.apache.hadoop.yarn.util.SystemClock;
import org.apache.zookeeper.data.ACL;
Expand All @@ -105,8 +107,11 @@
* | |----- APP1
* | |----- APP2
* |--- POLICY
* |----- QUEUE1
* |----- QUEUE1
* | |----- QUEUE1
* | |----- QUEUE1
* |--- RESERVATION
* | |----- RESERVATION1
* | |----- RESERVATION2
*/
public class ZookeeperFederationStateStore implements FederationStateStore {

Expand All @@ -116,6 +121,7 @@ public class ZookeeperFederationStateStore implements FederationStateStore {
private final static String ROOT_ZNODE_NAME_MEMBERSHIP = "memberships";
private final static String ROOT_ZNODE_NAME_APPLICATION = "applications";
private final static String ROOT_ZNODE_NAME_POLICY = "policies";
private final static String ROOT_ZNODE_NAME_RESERVATION = "reservation";

/** Interface to Zookeeper. */
private ZKCuratorManager zkManager;
Expand All @@ -126,6 +132,7 @@ public class ZookeeperFederationStateStore implements FederationStateStore {
private String appsZNode;
private String membershipZNode;
private String policiesZNode;
private String reservationsZNode;

private volatile Clock clock = SystemClock.getInstance();

Expand All @@ -151,13 +158,15 @@ public void init(Configuration conf) throws YarnException {
membershipZNode = getNodePath(baseZNode, ROOT_ZNODE_NAME_MEMBERSHIP);
appsZNode = getNodePath(baseZNode, ROOT_ZNODE_NAME_APPLICATION);
policiesZNode = getNodePath(baseZNode, ROOT_ZNODE_NAME_POLICY);
reservationsZNode = getNodePath(baseZNode, ROOT_ZNODE_NAME_RESERVATION);

// Create base znode for each entity
try {
List<ACL> zkAcl = ZKCuratorManager.getZKAcls(conf);
zkManager.createRootDirRecursively(membershipZNode, zkAcl);
zkManager.createRootDirRecursively(appsZNode, zkAcl);
zkManager.createRootDirRecursively(policiesZNode, zkAcl);
zkManager.createRootDirRecursively(reservationsZNode, zkAcl);
} catch (Exception e) {
String errMsg = "Cannot create base directories: " + e.getMessage();
FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg);
Expand Down Expand Up @@ -686,6 +695,30 @@ private static long getCurrentTime() {
return cal.getTimeInMillis();
}

private void putReservation(final ReservationId reservationId,
final SubClusterId subClusterId, boolean update) throws YarnException {
String reservationZNode = getNodePath(reservationsZNode, reservationId.toString());
SubClusterIdProto proto = ((SubClusterIdPBImpl)subClusterId).getProto();
byte[] data = proto.toByteArray();
put(reservationZNode, data, update);
}

private SubClusterId getReservation(final ReservationId reservationId)
throws YarnException {
String reservationIdZNode = getNodePath(reservationsZNode, reservationId.toString());
SubClusterId subClusterId = null;
byte[] data = get(reservationIdZNode);
if (data != null) {
try {
subClusterId = new SubClusterIdPBImpl(SubClusterIdProto.parseFrom(data));
} catch (InvalidProtocolBufferException e) {
String errMsg = "Cannot parse reservation at " + reservationId;
FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg);
}
}
return subClusterId;
}

@VisibleForTesting
public ZKFederationStateStoreOpDurations getOpDurations() {
return opDurations;
Expand All @@ -694,30 +727,128 @@ public ZKFederationStateStoreOpDurations getOpDurations() {
@Override
public AddReservationHomeSubClusterResponse addReservationHomeSubCluster(
AddReservationHomeSubClusterRequest request) throws YarnException {
throw new NotImplementedException("Code is not implemented");

long start = clock.getTime();
FederationReservationHomeSubClusterStoreInputValidator.validate(request);
ReservationHomeSubCluster reservationHomeSubCluster = request.getReservationHomeSubCluster();
ReservationId reservationId = reservationHomeSubCluster.getReservationId();

// Try to write the subcluster
SubClusterId homeSubCluster = reservationHomeSubCluster.getHomeSubCluster();
try {
putReservation(reservationId, homeSubCluster, false);
} catch (Exception e) {
String errMsg = "Cannot add reservation home subcluster for " + reservationId;
FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg);
}

// Check for the actual subcluster
try {
homeSubCluster = getReservation(reservationId);
} catch (Exception e) {
String errMsg = "Cannot check app home subcluster for " + reservationId;
FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg);
}
long end = clock.getTime();
opDurations.addReservationHomeSubClusterDuration(start, end);
return AddReservationHomeSubClusterResponse.newInstance(homeSubCluster);
}

@Override
public GetReservationHomeSubClusterResponse getReservationHomeSubCluster(
GetReservationHomeSubClusterRequest request) throws YarnException {
throw new NotImplementedException("Code is not implemented");

long start = clock.getTime();
FederationReservationHomeSubClusterStoreInputValidator.validate(request);
ReservationId reservationId = request.getReservationId();
SubClusterId homeSubCluster = getReservation(reservationId);

if (homeSubCluster == null) {
String errMsg = "Reservation " + reservationId + " does not exist";
FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg);
}

ReservationHomeSubCluster reservationHomeSubCluster =
ReservationHomeSubCluster.newInstance(reservationId, homeSubCluster);
long end = clock.getTime();
opDurations.addGetReservationHomeSubClusterDuration(start, end);
return GetReservationHomeSubClusterResponse.newInstance(reservationHomeSubCluster);
}

@Override
public GetReservationsHomeSubClusterResponse getReservationsHomeSubCluster(
GetReservationsHomeSubClusterRequest request) throws YarnException {
throw new NotImplementedException("Code is not implemented");
long start = clock.getTime();
List<ReservationHomeSubCluster> result = new ArrayList<>();

try {
for (String child : zkManager.getChildren(reservationsZNode)) {
ReservationId reservationId = ReservationId.parseReservationId(child);
SubClusterId homeSubCluster = getReservation(reservationId);
ReservationHomeSubCluster app =
ReservationHomeSubCluster.newInstance(reservationId, homeSubCluster);
result.add(app);
}
} catch (Exception e) {
String errMsg = "Cannot get apps: " + e.getMessage();
FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg);
}
long end = clock.getTime();
opDurations.addGetReservationsHomeSubClusterDuration(start, end);
return GetReservationsHomeSubClusterResponse.newInstance(result);
}

@Override
public DeleteReservationHomeSubClusterResponse deleteReservationHomeSubCluster(
DeleteReservationHomeSubClusterRequest request) throws YarnException {
throw new NotImplementedException("Code is not implemented");
long start = clock.getTime();
FederationReservationHomeSubClusterStoreInputValidator.validate(request);
ReservationId reservationId = request.getReservationId();
String reservationZNode = getNodePath(reservationsZNode, reservationId.toString());

boolean exists = false;
try {
exists = zkManager.exists(reservationZNode);
} catch (Exception e) {
String errMsg = "Cannot check reservation: " + e.getMessage();
FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg);
}

if (!exists) {
String errMsg = "Reservation " + reservationId + " does not exist";
FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg);
}

try {
zkManager.delete(reservationZNode);
} catch (Exception e) {
String errMsg = "Cannot delete reservation: " + e.getMessage();
FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg);
}
long end = clock.getTime();
opDurations.addDeleteReservationHomeSubClusterDuration(start, end);
return DeleteReservationHomeSubClusterResponse.newInstance();
}

@Override
public UpdateReservationHomeSubClusterResponse updateReservationHomeSubCluster(
UpdateReservationHomeSubClusterRequest request) throws YarnException {
throw new NotImplementedException("Code is not implemented");

long start = clock.getTime();
FederationReservationHomeSubClusterStoreInputValidator.validate(request);
ReservationHomeSubCluster reservationHomeSubCluster = request.getReservationHomeSubCluster();
ReservationId reservationId = reservationHomeSubCluster.getReservationId();
SubClusterId homeSubCluster = getReservation(reservationId);

if (homeSubCluster == null) {
String errMsg = "Reservation " + reservationId + " does not exist";
FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg);
}

SubClusterId newSubClusterId = reservationHomeSubCluster.getHomeSubCluster();
putReservation(reservationId, newSubClusterId, true);
long end = clock.getTime();
opDurations.addUpdateReservationHomeSubClusterDuration(start, end);
return UpdateReservationHomeSubClusterResponse.newInstance();
}
}