Skip to content

Commit

Permalink
YARN-11272. Federation StateStore: Support storage/retrieval of Reser…
Browse files Browse the repository at this point in the history
…vations With Zk. (#4781)
  • Loading branch information
slfan1989 committed Aug 31, 2022
1 parent 19830c9 commit 33edbed
Show file tree
Hide file tree
Showing 5 changed files with 386 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,21 @@ public final class ZKFederationStateStoreOpDurations implements MetricsSource {
@Metric("Duration for a get PolicyConfigurations call")
private MutableRate getPoliciesConfigurations;

@Metric("Duration for a add reservation homeSubCluster call")
private MutableRate addReservationHomeSubCluster;

@Metric("Duration for a get reservation homeSubCluster call")
private MutableRate getReservationHomeSubCluster;

@Metric("Duration for a get reservations homeSubCluster call")
private MutableRate getReservationsHomeSubCluster;

@Metric("Duration for a delete reservation homeSubCluster call")
private MutableRate deleteReservationHomeSubCluster;

@Metric("Duration for a update reservation homeSubCluster call")
private MutableRate updateReservationHomeSubCluster;

protected static final MetricsInfo RECORD_INFO =
info("ZKFederationStateStoreOpDurations", "Durations of ZKFederationStateStore calls");

Expand Down Expand Up @@ -152,4 +167,24 @@ public void addSetPolicyConfigurationDuration(long startTime, long endTime) {
public void addGetPoliciesConfigurationsDuration(long startTime, long endTime) {
getPoliciesConfigurations.add(endTime - startTime);
}

public void addReservationHomeSubClusterDuration(long startTime, long endTime) {
addReservationHomeSubCluster.add(endTime - startTime);
}

public void addGetReservationHomeSubClusterDuration(long startTime, long endTime) {
getReservationHomeSubCluster.add(endTime - startTime);
}

public void addGetReservationsHomeSubClusterDuration(long startTime, long endTime) {
getReservationsHomeSubCluster.add(endTime - startTime);
}

public void addDeleteReservationHomeSubClusterDuration(long startTime, long endTime) {
deleteReservationHomeSubCluster.add(endTime - startTime);
}

public void addUpdateReservationHomeSubClusterDuration(long startTime, long endTime) {
updateReservationHomeSubCluster.add(endTime - startTime);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import java.util.TimeZone;

import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.commons.lang3.NotImplementedException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.curator.ZKCuratorManager;
import org.apache.hadoop.yarn.api.records.ApplicationId;
Expand Down Expand Up @@ -65,6 +64,7 @@
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterRegisterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterRegisterResponse;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterState;
import org.apache.hadoop.yarn.server.federation.store.records.ReservationHomeSubCluster;
import org.apache.hadoop.yarn.server.federation.store.records.UpdateApplicationHomeSubClusterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.UpdateApplicationHomeSubClusterResponse;
import org.apache.hadoop.yarn.server.federation.store.records.AddReservationHomeSubClusterRequest;
Expand All @@ -84,7 +84,9 @@
import org.apache.hadoop.yarn.server.federation.store.utils.FederationMembershipStateStoreInputValidator;
import org.apache.hadoop.yarn.server.federation.store.utils.FederationPolicyStoreInputValidator;
import org.apache.hadoop.yarn.server.federation.store.utils.FederationStateStoreUtils;
import org.apache.hadoop.yarn.server.federation.store.utils.FederationReservationHomeSubClusterStoreInputValidator;
import org.apache.hadoop.yarn.server.records.Version;
import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.util.Clock;
import org.apache.hadoop.yarn.util.SystemClock;
import org.apache.zookeeper.data.ACL;
Expand All @@ -105,8 +107,11 @@
* | |----- APP1
* | |----- APP2
* |--- POLICY
* |----- QUEUE1
* |----- QUEUE1
* | |----- QUEUE1
* | |----- QUEUE1
* |--- RESERVATION
* | |----- RESERVATION1
* | |----- RESERVATION2
*/
public class ZookeeperFederationStateStore implements FederationStateStore {

Expand All @@ -116,6 +121,7 @@ public class ZookeeperFederationStateStore implements FederationStateStore {
private final static String ROOT_ZNODE_NAME_MEMBERSHIP = "memberships";
private final static String ROOT_ZNODE_NAME_APPLICATION = "applications";
private final static String ROOT_ZNODE_NAME_POLICY = "policies";
private final static String ROOT_ZNODE_NAME_RESERVATION = "reservation";

/** Interface to Zookeeper. */
private ZKCuratorManager zkManager;
Expand All @@ -126,6 +132,7 @@ public class ZookeeperFederationStateStore implements FederationStateStore {
private String appsZNode;
private String membershipZNode;
private String policiesZNode;
private String reservationsZNode;

private volatile Clock clock = SystemClock.getInstance();

Expand All @@ -151,13 +158,15 @@ public void init(Configuration conf) throws YarnException {
membershipZNode = getNodePath(baseZNode, ROOT_ZNODE_NAME_MEMBERSHIP);
appsZNode = getNodePath(baseZNode, ROOT_ZNODE_NAME_APPLICATION);
policiesZNode = getNodePath(baseZNode, ROOT_ZNODE_NAME_POLICY);
reservationsZNode = getNodePath(baseZNode, ROOT_ZNODE_NAME_RESERVATION);

// Create base znode for each entity
try {
List<ACL> zkAcl = ZKCuratorManager.getZKAcls(conf);
zkManager.createRootDirRecursively(membershipZNode, zkAcl);
zkManager.createRootDirRecursively(appsZNode, zkAcl);
zkManager.createRootDirRecursively(policiesZNode, zkAcl);
zkManager.createRootDirRecursively(reservationsZNode, zkAcl);
} catch (Exception e) {
String errMsg = "Cannot create base directories: " + e.getMessage();
FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg);
Expand Down Expand Up @@ -686,6 +695,30 @@ private static long getCurrentTime() {
return cal.getTimeInMillis();
}

private void putReservation(final ReservationId reservationId,
final SubClusterId subClusterId, boolean update) throws YarnException {
String reservationZNode = getNodePath(reservationsZNode, reservationId.toString());
SubClusterIdProto proto = ((SubClusterIdPBImpl)subClusterId).getProto();
byte[] data = proto.toByteArray();
put(reservationZNode, data, update);
}

private SubClusterId getReservation(final ReservationId reservationId)
throws YarnException {
String reservationIdZNode = getNodePath(reservationsZNode, reservationId.toString());
SubClusterId subClusterId = null;
byte[] data = get(reservationIdZNode);
if (data != null) {
try {
subClusterId = new SubClusterIdPBImpl(SubClusterIdProto.parseFrom(data));
} catch (InvalidProtocolBufferException e) {
String errMsg = "Cannot parse reservation at " + reservationId;
FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg);
}
}
return subClusterId;
}

@VisibleForTesting
public ZKFederationStateStoreOpDurations getOpDurations() {
return opDurations;
Expand All @@ -694,30 +727,128 @@ public ZKFederationStateStoreOpDurations getOpDurations() {
@Override
public AddReservationHomeSubClusterResponse addReservationHomeSubCluster(
AddReservationHomeSubClusterRequest request) throws YarnException {
throw new NotImplementedException("Code is not implemented");

long start = clock.getTime();
FederationReservationHomeSubClusterStoreInputValidator.validate(request);
ReservationHomeSubCluster reservationHomeSubCluster = request.getReservationHomeSubCluster();
ReservationId reservationId = reservationHomeSubCluster.getReservationId();

// Try to write the subcluster
SubClusterId homeSubCluster = reservationHomeSubCluster.getHomeSubCluster();
try {
putReservation(reservationId, homeSubCluster, false);
} catch (Exception e) {
String errMsg = "Cannot add reservation home subcluster for " + reservationId;
FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg);
}

// Check for the actual subcluster
try {
homeSubCluster = getReservation(reservationId);
} catch (Exception e) {
String errMsg = "Cannot check app home subcluster for " + reservationId;
FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg);
}
long end = clock.getTime();
opDurations.addReservationHomeSubClusterDuration(start, end);
return AddReservationHomeSubClusterResponse.newInstance(homeSubCluster);
}

@Override
public GetReservationHomeSubClusterResponse getReservationHomeSubCluster(
GetReservationHomeSubClusterRequest request) throws YarnException {
throw new NotImplementedException("Code is not implemented");

long start = clock.getTime();
FederationReservationHomeSubClusterStoreInputValidator.validate(request);
ReservationId reservationId = request.getReservationId();
SubClusterId homeSubCluster = getReservation(reservationId);

if (homeSubCluster == null) {
String errMsg = "Reservation " + reservationId + " does not exist";
FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg);
}

ReservationHomeSubCluster reservationHomeSubCluster =
ReservationHomeSubCluster.newInstance(reservationId, homeSubCluster);
long end = clock.getTime();
opDurations.addGetReservationHomeSubClusterDuration(start, end);
return GetReservationHomeSubClusterResponse.newInstance(reservationHomeSubCluster);
}

@Override
public GetReservationsHomeSubClusterResponse getReservationsHomeSubCluster(
GetReservationsHomeSubClusterRequest request) throws YarnException {
throw new NotImplementedException("Code is not implemented");
long start = clock.getTime();
List<ReservationHomeSubCluster> result = new ArrayList<>();

try {
for (String child : zkManager.getChildren(reservationsZNode)) {
ReservationId reservationId = ReservationId.parseReservationId(child);
SubClusterId homeSubCluster = getReservation(reservationId);
ReservationHomeSubCluster app =
ReservationHomeSubCluster.newInstance(reservationId, homeSubCluster);
result.add(app);
}
} catch (Exception e) {
String errMsg = "Cannot get apps: " + e.getMessage();
FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg);
}
long end = clock.getTime();
opDurations.addGetReservationsHomeSubClusterDuration(start, end);
return GetReservationsHomeSubClusterResponse.newInstance(result);
}

@Override
public DeleteReservationHomeSubClusterResponse deleteReservationHomeSubCluster(
DeleteReservationHomeSubClusterRequest request) throws YarnException {
throw new NotImplementedException("Code is not implemented");
long start = clock.getTime();
FederationReservationHomeSubClusterStoreInputValidator.validate(request);
ReservationId reservationId = request.getReservationId();
String reservationZNode = getNodePath(reservationsZNode, reservationId.toString());

boolean exists = false;
try {
exists = zkManager.exists(reservationZNode);
} catch (Exception e) {
String errMsg = "Cannot check reservation: " + e.getMessage();
FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg);
}

if (!exists) {
String errMsg = "Reservation " + reservationId + " does not exist";
FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg);
}

try {
zkManager.delete(reservationZNode);
} catch (Exception e) {
String errMsg = "Cannot delete reservation: " + e.getMessage();
FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg);
}
long end = clock.getTime();
opDurations.addDeleteReservationHomeSubClusterDuration(start, end);
return DeleteReservationHomeSubClusterResponse.newInstance();
}

@Override
public UpdateReservationHomeSubClusterResponse updateReservationHomeSubCluster(
UpdateReservationHomeSubClusterRequest request) throws YarnException {
throw new NotImplementedException("Code is not implemented");

long start = clock.getTime();
FederationReservationHomeSubClusterStoreInputValidator.validate(request);
ReservationHomeSubCluster reservationHomeSubCluster = request.getReservationHomeSubCluster();
ReservationId reservationId = reservationHomeSubCluster.getReservationId();
SubClusterId homeSubCluster = getReservation(reservationId);

if (homeSubCluster == null) {
String errMsg = "Reservation " + reservationId + " does not exist";
FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg);
}

SubClusterId newSubClusterId = reservationHomeSubCluster.getHomeSubCluster();
putReservation(reservationId, newSubClusterId, true);
long end = clock.getTime();
opDurations.addUpdateReservationHomeSubClusterDuration(start, end);
return UpdateReservationHomeSubClusterResponse.newInstance();
}
}

0 comments on commit 33edbed

Please sign in to comment.