Skip to content

Commit

Permalink
draft of lb paused request cleanup queue
Browse files Browse the repository at this point in the history
  • Loading branch information
ssalinas committed Sep 17, 2015
1 parent b0c1fea commit 4cbeb2c
Show file tree
Hide file tree
Showing 4 changed files with 154 additions and 7 deletions.
Expand Up @@ -6,7 +6,7 @@

public enum LoadBalancerRequestType {

ADD, REMOVE, DEPLOY;
ADD, REMOVE, DEPLOY, DELETE;

public static class LoadBalancerRequestId {

Expand Down
Expand Up @@ -3,6 +3,8 @@
import java.util.Collection;
import java.util.List;

import com.hubspot.singularity.SingularityLoadBalancerUpdate;
import com.hubspot.singularity.SingularityTaskId;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.utils.ZKPaths;
import org.slf4j.Logger;
Expand Down Expand Up @@ -39,6 +41,7 @@ public class RequestManager extends CuratorAsyncManager {
private final Transcoder<SingularityPendingRequest> pendingRequestTranscoder;
private final Transcoder<SingularityRequestCleanup> requestCleanupTranscoder;
private final Transcoder<SingularityRequestHistory> requestHistoryTranscoder;
private final Transcoder<SingularityLoadBalancerUpdate> loadBalancerUpdateTranscoder;

private final SingularityEventListener singularityEventListener;

Expand All @@ -48,17 +51,19 @@ public class RequestManager extends CuratorAsyncManager {
private static final String PENDING_PATH_ROOT = REQUEST_ROOT + "/pending";
private static final String CLEANUP_PATH_ROOT = REQUEST_ROOT + "/cleanup";
private static final String HISTORY_PATH_ROOT = REQUEST_ROOT + "/history";
private static final String LB_CLEANUP_PATH_ROOT = REQUEST_ROOT + "/lbCleanup";

/**
 * Injection constructor. Passes the curator client and the configured ZooKeeper async timeout to the
 * superclass, then stores each supplied transcoder and the event listener in its matching field.
 */
@Inject
public RequestManager(SingularityConfiguration configuration, CuratorFramework curator, SingularityEventListener singularityEventListener, Transcoder<SingularityRequestCleanup> requestCleanupTranscoder,
Transcoder<SingularityRequestWithState> requestTranscoder, Transcoder<SingularityPendingRequest> pendingRequestTranscoder, Transcoder<SingularityRequestHistory> requestHistoryTranscoder) {
Transcoder<SingularityRequestWithState> requestTranscoder, Transcoder<SingularityPendingRequest> pendingRequestTranscoder, Transcoder<SingularityRequestHistory> requestHistoryTranscoder, Transcoder<SingularityLoadBalancerUpdate> loadBalancerHistoryUpdateTranscoder) {
super(curator, configuration.getZookeeperAsyncTimeout());

this.requestTranscoder = requestTranscoder;
this.requestCleanupTranscoder = requestCleanupTranscoder;
this.pendingRequestTranscoder = pendingRequestTranscoder;
this.requestHistoryTranscoder = requestHistoryTranscoder;
this.singularityEventListener = singularityEventListener;
// The parameter is named loadBalancerHistoryUpdateTranscoder but feeds the loadBalancerUpdateTranscoder field.
this.loadBalancerUpdateTranscoder = loadBalancerHistoryUpdateTranscoder;
}

private String getRequestPath(String requestId) {
Expand Down Expand Up @@ -89,6 +94,10 @@ public int getSizeOfCleanupQueue() {
return getNumChildren(CLEANUP_PATH_ROOT);
}

/**
 * Reports how many child nodes currently sit under {@code LB_CLEANUP_PATH_ROOT}.
 *
 * @return the child count reported by {@code getNumChildren} for the LB cleanup root
 */
public int getNumLbCleanupRequests() {
  final int queuedLbCleanups = getNumChildren(LB_CLEANUP_PATH_ROOT);
  return queuedLbCleanups;
}

/**
 * Reports how many child nodes currently sit under {@code NORMAL_PATH_ROOT}.
 *
 * @return the child count reported by {@code getNumChildren} for the normal request root
 */
public int getNumRequests() {
  final int requestCount = getNumChildren(NORMAL_PATH_ROOT);
  return requestCount;
}
Expand Down Expand Up @@ -269,4 +278,27 @@ public void deleteRequest(SingularityRequest request, Optional<String> user) {
delete(getRequestPath(request.getId()));
}

/**
 * Lists the names of the child nodes under {@code LB_CLEANUP_PATH_ROOT}; each name is the id that was
 * passed to {@link #createLBCleanupRequest(String)}.
 *
 * @return the children of the LB cleanup root as returned by {@code getChildren}
 */
public List<String> getLBCleanupRequestIds() {
  final List<String> queuedRequestIds = getChildren(LB_CLEANUP_PATH_ROOT);
  return queuedRequestIds;
}

/**
 * Builds the node path for one request's entry in the LB cleanup queue.
 *
 * @param requestId id used as the child name under {@code LB_CLEANUP_PATH_ROOT}
 * @return the result of joining the LB cleanup root and {@code requestId} with {@code ZKPaths.makePath}
 */
private String getLBCleanupPath(String requestId) {
  final String cleanupNodePath = ZKPaths.makePath(LB_CLEANUP_PATH_ROOT, requestId);
  return cleanupNodePath;
}

/**
 * Queues a request for load balancer cleanup by creating its node under the LB cleanup root.
 *
 * @param requestId id of the request to queue
 * @return whatever {@code create} reports for the node path
 */
public SingularityCreateResult createLBCleanupRequest(String requestId) {
  final String cleanupNodePath = getLBCleanupPath(requestId);
  return create(cleanupNodePath);
}

/**
 * Removes a request's node from the LB cleanup queue.
 *
 * @param requestId id of the request whose cleanup node should be deleted
 * @return whatever {@code delete} reports for the node path
 */
public SingularityDeleteResult deleteLBCleanupRequest(String requestId) {
  final String cleanupNodePath = getLBCleanupPath(requestId);
  return delete(cleanupNodePath);
}

/**
 * Reads the load balancer update stored as the data of a request's LB cleanup node.
 *
 * @param requestId id of the request whose cleanup node is read
 * @return the result of {@code getData} on that node, decoded with the load balancer update transcoder
 */
public Optional<SingularityLoadBalancerUpdate> getLoadBalancerState(String requestId) {
  final String cleanupNodePath = getLBCleanupPath(requestId);
  return getData(cleanupNodePath, loadBalancerUpdateTranscoder);
}

/**
 * Stores a load balancer update as the data of a request's LB cleanup node.
 *
 * @param requestId id of the request whose cleanup node is written
 * @param lbUpdate update to encode with the load balancer update transcoder and save
 */
public void saveLoadBalancerState(String requestId, SingularityLoadBalancerUpdate lbUpdate) {
  final String cleanupNodePath = getLBCleanupPath(requestId);
  save(cleanupNodePath, lbUpdate, loadBalancerUpdateTranscoder);
}
}
Expand Up @@ -2,6 +2,7 @@

import java.util.List;

import com.google.common.base.Optional;
import com.hubspot.singularity.LoadBalancerRequestType.LoadBalancerRequestId;
import com.hubspot.singularity.SingularityDeploy;
import com.hubspot.singularity.SingularityLoadBalancerUpdate;
Expand Down
Expand Up @@ -208,9 +208,6 @@ private void drainRequestCleanupQueue() {
LOG.info("Ignoring {}, because {} still existed", requestCleanup, requestCleanup.getRequestId());
} else {
cleanupDeployState(requestCleanup);
if (requestWithState.isPresent() && requestWithState.get().getRequest().isLoadBalanced()) {
deleteFromLoadBalancer = true;
}
}
break;
case BOUNCE:
Expand Down Expand Up @@ -240,6 +237,10 @@ private void drainRequestCleanupQueue() {
}
}

if (deleteFromLoadBalancer) {
requestManager.createLBCleanupRequest(requestId);
}

requestManager.deleteCleanRequest(requestId, requestCleanup.getCleanupType());
}

Expand Down Expand Up @@ -271,7 +272,8 @@ private void cleanupDeployState(SingularityRequestCleanup requestCleanup) {
/**
 * Entry point for one cleanup cycle: invokes each drain/check pass listed in the body, one after
 * another, in the order written.
 */
public void drainCleanupQueue() {
drainRequestCleanupQueue();
drainTaskCleanupQueue();
drainLBCleanupQueue();
drainLBTaskCleanupQueue();
drainLBRequestCleanupQueue();
checkKilledTaskIdRecords();
}

Expand Down Expand Up @@ -490,7 +492,7 @@ private CheckLBState checkLbState(SingularityTaskId taskId) {
return CheckLBState.WAITING;
}

private void drainLBCleanupQueue() {
private void drainLBTaskCleanupQueue() {
final long start = System.currentTimeMillis();

final List<SingularityTaskId> lbCleanupTasks = taskManager.getLBCleanupTasks();
Expand Down Expand Up @@ -531,4 +533,116 @@ private void drainLBCleanupQueue() {
LOG.info("LB cleaned {} tasks ({} left, {} obsolete) in {}", cleanedTasks, lbCleanupTasks.size() - (ignoredTasks + cleanedTasks), ignoredTasks, JavaUtils.duration(start));
}

/**
 * Walks the queue of request ids awaiting load balancer cleanup, asks {@code checkRequestLbState}
 * where each one stands, and removes every entry whose check came back in a terminal state.
 * Entries reported as WAITING or RETRY are left queued for a later pass.
 */
private void drainLBRequestCleanupQueue() {
  final long drainStartMillis = System.currentTimeMillis();

  final List<String> queuedRequestIds = requestManager.getLBCleanupRequestIds();

  if (queuedRequestIds.isEmpty()) {
    LOG.trace("LB request cleanup queue is empty");
    return;
  }

  LOG.info("LB request cleanup queue had {} requests", queuedRequestIds.size());

  int cleanedCount = 0;
  int obsoleteCount = 0;

  for (final String queuedRequestId : queuedRequestIds) {
    final long checkStartMillis = System.currentTimeMillis();

    final CheckLBState lbState = checkRequestLbState(queuedRequestId);

    LOG.debug("LB cleanup for request {} had state {} after {}", queuedRequestId, lbState, JavaUtils.duration(checkStartMillis));

    // Still in flight (or to be retried): keep the entry queued and move on.
    if (lbState == CheckLBState.WAITING || lbState == CheckLBState.RETRY) {
      continue;
    }

    if (lbState == CheckLBState.DONE || lbState == CheckLBState.MISSING_TASK) {
      cleanedCount++;
    } else if (lbState == CheckLBState.NOT_LOAD_BALANCED || lbState == CheckLBState.LOAD_BALANCE_FAILED) {
      obsoleteCount++;
    }

    // Any state that reaches this point is terminal for the queue entry.
    requestManager.deleteLBCleanupRequest(queuedRequestId);
  }

  final int remainingCount = queuedRequestIds.size() - (obsoleteCount + cleanedCount);
  LOG.info("LB cleaned {} requests ({} left, {} obsolete) in {}", cleanedCount, remainingCount, obsoleteCount, JavaUtils.duration(drainStartMillis));
}

/**
 * Advances the load balancer delete for one queued request and reports where it stands.
 *
 * <p>Flow:
 * <ul>
 *   <li>If the request data cannot be loaded, a notification is sent and RETRY is returned.</li>
 *   <li>If no update is stored yet, or the stored update is FAILED or CANCELED, a delete is issued
 *       through {@code lbClient.delete} using the id from {@code getLoadBalancerRequestId} (which
 *       carries an incremented attempt number after FAILED/CANCELED), and the response is saved.</li>
 *   <li>If the stored update is WAITING or CANCELING, the current state is fetched with
 *       {@code lbClient.getState} and saved.</li>
 *   <li>Otherwise the stored update is used as is.</li>
 * </ul>
 *
 * <p>Previously a stored FAILED/CANCELED update was reused on every pass without issuing a new delete,
 * so the entry stayed in the queue and produced a notification each cycle.
 *
 * @param requestId id of the request whose load balancer entry is being deleted
 * @return DONE on SUCCESS; RETRY when the request or deploy is missing or the delete FAILED or was
 *     CANCELED; LOAD_BALANCE_FAILED on INVALID_REQUEST_NOOP; WAITING otherwise
 */
private CheckLBState checkRequestLbState(String requestId) {
  Optional<SingularityLoadBalancerUpdate> maybeDeleteUpdate = requestManager.getLoadBalancerState(requestId);

  Optional<SingularityRequestWithState> maybeRequestWithState = requestManager.getRequest(requestId);

  if (!maybeRequestWithState.isPresent()) {
    exceptionNotifier.notify(String.format("LB delete failed for paused request %s, request data not found", requestId), ImmutableMap.of("requestId", requestId));
    return CheckLBState.RETRY;
  }

  final LoadBalancerRequestId loadBalancerRequestId = getLoadBalancerRequestId(requestId, maybeDeleteUpdate);
  SingularityLoadBalancerUpdate lbDeleteUpdate;

  // A stored FAILED/CANCELED update means the previous attempt is over; the id computed above already
  // carries the next attempt number, so a fresh delete has to be sent for the retry to happen at all.
  final boolean previousAttemptEnded = maybeDeleteUpdate.isPresent()
      && (maybeDeleteUpdate.get().getLoadBalancerState() == BaragonRequestState.FAILED
          || maybeDeleteUpdate.get().getLoadBalancerState() == BaragonRequestState.CANCELED);

  if (!maybeDeleteUpdate.isPresent() || previousAttemptEnded) {
    Optional<String> maybeCurrentDeployId = deployManager.getInUseDeployId(requestId);
    Optional<SingularityDeploy> maybeDeploy = Optional.absent();
    if (maybeCurrentDeployId.isPresent()) {
      maybeDeploy = deployManager.getDeploy(requestId, maybeCurrentDeployId.get());
    }
    if (maybeDeploy.isPresent()) {
      lbDeleteUpdate = lbClient.delete(loadBalancerRequestId, maybeRequestWithState.get().getRequest(), maybeDeploy.get());
      requestManager.saveLoadBalancerState(requestId, lbDeleteUpdate);
    } else {
      exceptionNotifier.notify(String.format("LB delete failed for paused request %s, current deploy not found", requestId),
          ImmutableMap.of("requestId", requestId, "currentDeployId", maybeCurrentDeployId.toString(), "currentDeploy", maybeDeploy.toString()));
      return CheckLBState.RETRY;
    }
  } else if (maybeDeleteUpdate.get().getLoadBalancerState() == BaragonRequestState.WAITING || maybeDeleteUpdate.get().getLoadBalancerState() == BaragonRequestState.CANCELING) {
    // In-flight delete: refresh the stored state from the load balancer.
    lbDeleteUpdate = lbClient.getState(loadBalancerRequestId);
    requestManager.saveLoadBalancerState(requestId, lbDeleteUpdate);
  } else {
    // NOTE(review): a stored UNKNOWN state also lands here and is never refreshed, so it keeps
    // reporting WAITING - confirm whether UNKNOWN should be polled or re-issued as well.
    lbDeleteUpdate = maybeDeleteUpdate.get();
  }

  switch (lbDeleteUpdate.getLoadBalancerState()) {
    case SUCCESS:
      return CheckLBState.DONE;
    case FAILED:
    case CANCELED:
      LOG.error("LB delete request {} ({}) got unexpected response {}", lbDeleteUpdate, loadBalancerRequestId, lbDeleteUpdate.getLoadBalancerState());
      exceptionNotifier.notify(String.format("LB delete failed for %s", lbDeleteUpdate.getLoadBalancerRequestId().toString()),
          ImmutableMap.of("state", lbDeleteUpdate.getLoadBalancerState().name(), "loadBalancerRequestId", loadBalancerRequestId.toString(), "addUpdate", lbDeleteUpdate.toString()));
      return CheckLBState.RETRY;
    case UNKNOWN:
    case CANCELING:
    case WAITING:
      LOG.trace("Waiting on LB delete request {} in state {}", loadBalancerRequestId, lbDeleteUpdate.getLoadBalancerState());
      break;
    case INVALID_REQUEST_NOOP:
      exceptionNotifier.notify(String.format("LB delete failed for %s", lbDeleteUpdate.getLoadBalancerRequestId().toString()),
          ImmutableMap.of("state", lbDeleteUpdate.getLoadBalancerState().name(), "loadBalancerRequestId", loadBalancerRequestId.toString(), "addUpdate", lbDeleteUpdate.toString()));
      return CheckLBState.LOAD_BALANCE_FAILED;
  }

  return CheckLBState.WAITING;
}

/**
 * Chooses the load balancer request id to use for a request's DELETE call.
 *
 * <p>With no previous update, a new DELETE id with an absent attempt number is created. When the
 * previous update is FAILED or CANCELED, a new DELETE id is created with the previous attempt number
 * plus one. For any other state the previous update's id is returned unchanged.
 *
 * @param requestId id of the request being removed from the load balancer
 * @param lbDeleteUpdate previously stored delete update, if any
 * @return the id to pass to the load balancer client
 */
private LoadBalancerRequestId getLoadBalancerRequestId(String requestId, Optional<SingularityLoadBalancerUpdate> lbDeleteUpdate) {
  if (lbDeleteUpdate.isPresent()) {
    final SingularityLoadBalancerUpdate previousUpdate = lbDeleteUpdate.get();

    switch (previousUpdate.getLoadBalancerState()) {
      case FAILED:
      case CANCELED: {
        final Optional<Integer> nextAttempt = Optional.of(previousUpdate.getLoadBalancerRequestId().getAttemptNumber() + 1);
        return new LoadBalancerRequestId(requestId, LoadBalancerRequestType.DELETE, nextAttempt);
      }
      default:
        return previousUpdate.getLoadBalancerRequestId();
    }
  }

  final Optional<Integer> noAttemptNumber = Optional.<Integer> absent();
  return new LoadBalancerRequestId(requestId, LoadBalancerRequestType.DELETE, noAttemptNumber);
}

}

0 comments on commit 4cbeb2c

Please sign in to comment.