Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove paused requests from LB #694

Merged
merged 10 commits into from
Oct 13, 2015
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

public enum LoadBalancerRequestType {

ADD, REMOVE, DEPLOY;
ADD, REMOVE, DEPLOY, DELETE;

public static class LoadBalancerRequestId {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ public class SingularityLoadBalancerUpdate {
private final LoadBalancerRequestId loadBalancerRequestId;

public enum LoadBalancerMethod {
PRE_ENQUEUE, ENQUEUE, CHECK_STATE, CANCEL;
PRE_ENQUEUE, ENQUEUE, CHECK_STATE, CANCEL, DELETE;
}

@JsonCreator
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ public class SingularityState {
private final int futureTasks;
private final int cleaningTasks;
private final int lbCleanupTasks;
private final int lbCleanupRequests;

private final long maxTaskLag;

Expand Down Expand Up @@ -52,7 +53,7 @@ public class SingularityState {
@JsonCreator
public SingularityState(@JsonProperty("activeTasks") int activeTasks, @JsonProperty("activeRequests") int activeRequests, @JsonProperty("cooldownRequests") int cooldownRequests,
@JsonProperty("pausedRequests") int pausedRequests, @JsonProperty("scheduledTasks") int scheduledTasks, @JsonProperty("pendingRequests") int pendingRequests, @JsonProperty("lbCleanupTasks") int lbCleanupTasks,
@JsonProperty("cleaningRequests") int cleaningRequests, @JsonProperty("activeSlaves") int activeSlaves, @JsonProperty("deadSlaves") int deadSlaves,
@JsonProperty("lbCleanupRequests") int lbCleanupRequests, @JsonProperty("cleaningRequests") int cleaningRequests, @JsonProperty("activeSlaves") int activeSlaves, @JsonProperty("deadSlaves") int deadSlaves,
@JsonProperty("decommissioningSlaves") int decommissioningSlaves, @JsonProperty("activeRacks") int activeRacks, @JsonProperty("deadRacks") int deadRacks, @JsonProperty("decommissioningRacks") int decommissioningRacks,
@JsonProperty("cleaningTasks") int cleaningTasks, @JsonProperty("hostStates") List<SingularityHostState> hostStates, @JsonProperty("oldestDeploy") long oldestDeploy, @JsonProperty("numDeploys") int numDeploys,
@JsonProperty("lateTasks") int lateTasks, @JsonProperty("futureTasks") int futureTasks, @JsonProperty("maxTaskLag") long maxTaskLag, @JsonProperty("generatedAt") long generatedAt,
Expand Down Expand Up @@ -84,6 +85,7 @@ public SingularityState(@JsonProperty("activeTasks") int activeTasks, @JsonPrope
this.oldestDeploy = oldestDeploy;
this.numDeploys = numDeploys;
this.lbCleanupTasks = lbCleanupTasks;
this.lbCleanupRequests = lbCleanupRequests;
this.underProvisionedRequests = underProvisionedRequests;
this.overProvisionedRequests = overProvisionedRequests;
this.overProvisionedRequestIds = overProvisionedRequestIds;
Expand Down Expand Up @@ -197,6 +199,10 @@ public int getLbCleanupTasks() {
return lbCleanupTasks;
}

public int getLbCleanupRequests() {
return lbCleanupRequests;
}

public List<String> getOverProvisionedRequestIds() {
return overProvisionedRequestIds;
}
Expand Down Expand Up @@ -227,8 +233,8 @@ public Optional<Boolean> getAuthDatastoreHealthy() {

@Override
public String toString() {
return "SingularityState [activeTasks=" + activeTasks + ", pausedRequests=" + pausedRequests + ", activeRequests=" + activeRequests + ", cooldownRequests=" + cooldownRequests
+ ", scheduledTasks=" + scheduledTasks + ", lateTasks=" + lateTasks + ", futureTasks=" + futureTasks + ", cleaningTasks=" + cleaningTasks + ", lbCleanupTasks=" + lbCleanupTasks
return "SingularityState [activeTasks=" + activeTasks + ", pausedRequests=" + pausedRequests + ", activeRequests=" + activeRequests + ", cooldownRequests=" + cooldownRequests + ", scheduledTasks=" + scheduledTasks
+ ", lateTasks=" + lateTasks + ", futureTasks=" + futureTasks + ", cleaningTasks=" + cleaningTasks + ", lbCleanupTasks=" + lbCleanupTasks + ", lbCleanupRequests=" + lbCleanupRequests
+ ", maxTaskLag=" + maxTaskLag + ", pendingRequests=" + pendingRequests + ", cleaningRequests=" + cleaningRequests + ", finishedRequests=" + finishedRequests + ", activeSlaves="
+ activeSlaves + ", deadSlaves=" + deadSlaves + ", decommissioningSlaves=" + decommissioningSlaves + ", unknownSlaves=" + unknownSlaves + ", activeRacks=" + activeRacks + ", deadRacks="
+ deadRacks + ", decommissioningRacks=" + decommissioningRacks + ", unknownRacks=" + unknownRacks + ", oldestDeploy=" + oldestDeploy + ", numDeploys=" + numDeploys + ", generatedAt="
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,8 @@ public class SingularityConfiguration extends Configuration {

private String loadBalancerUri;

private boolean deletePausedRequestsFromLoadBalancer = true;

private int logFetchMaxThreads = 15;

private int maxDeployIdSize = 50;
Expand Down Expand Up @@ -795,4 +797,12 @@ public GraphiteConfiguration getGraphiteConfiguration() {
public void setGraphiteConfiguration(GraphiteConfiguration graphiteConfiguration) {
this.graphiteConfiguration = graphiteConfiguration;
}

public boolean isDeletePausedRequestsFromLoadBalancer() {
return deletePausedRequestsFromLoadBalancer;
}

public void setDeletePausedRequestsFromLoadBalancer(boolean deletePausedRequestsFromLoadBalancer) {
this.deletePausedRequestsFromLoadBalancer = deletePausedRequestsFromLoadBalancer;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import java.util.Collection;
import java.util.List;

import com.hubspot.singularity.SingularityLoadBalancerUpdate;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.utils.ZKPaths;
import org.slf4j.Logger;
Expand Down Expand Up @@ -40,6 +41,7 @@ public class RequestManager extends CuratorAsyncManager {
private final Transcoder<SingularityPendingRequest> pendingRequestTranscoder;
private final Transcoder<SingularityRequestCleanup> requestCleanupTranscoder;
private final Transcoder<SingularityRequestHistory> requestHistoryTranscoder;
private final Transcoder<SingularityLoadBalancerUpdate> loadBalancerUpdateTranscoder;

private final SingularityEventListener singularityEventListener;

Expand All @@ -49,18 +51,19 @@ public class RequestManager extends CuratorAsyncManager {
private static final String PENDING_PATH_ROOT = REQUEST_ROOT + "/pending";
private static final String CLEANUP_PATH_ROOT = REQUEST_ROOT + "/cleanup";
private static final String HISTORY_PATH_ROOT = REQUEST_ROOT + "/history";
private static final String LB_CLEANUP_PATH_ROOT = REQUEST_ROOT + "/lbCleanup";

@Inject
public RequestManager(CuratorFramework curator, SingularityConfiguration configuration, MetricRegistry metricRegistry, SingularityEventListener singularityEventListener,
Transcoder<SingularityRequestCleanup> requestCleanupTranscoder, Transcoder<SingularityRequestWithState> requestTranscoder,
Transcoder<SingularityRequestCleanup> requestCleanupTranscoder, Transcoder<SingularityRequestWithState> requestTranscoder, Transcoder<SingularityLoadBalancerUpdate> loadBalancerUpdateTranscoder,
Transcoder<SingularityPendingRequest> pendingRequestTranscoder, Transcoder<SingularityRequestHistory> requestHistoryTranscoder) {
super(curator, configuration, metricRegistry);

this.requestTranscoder = requestTranscoder;
this.requestCleanupTranscoder = requestCleanupTranscoder;
this.pendingRequestTranscoder = pendingRequestTranscoder;
this.requestHistoryTranscoder = requestHistoryTranscoder;
this.singularityEventListener = singularityEventListener;
this.loadBalancerUpdateTranscoder = loadBalancerUpdateTranscoder;
}

private String getRequestPath(String requestId) {
Expand Down Expand Up @@ -91,6 +94,10 @@ public int getSizeOfCleanupQueue() {
return getNumChildren(CLEANUP_PATH_ROOT);
}

public int getNumLbCleanupRequests() {
return getNumChildren(LB_CLEANUP_PATH_ROOT);
}

public int getNumRequests() {
return getNumChildren(NORMAL_PATH_ROOT);
}
Expand Down Expand Up @@ -271,4 +278,27 @@ public void deleteRequest(SingularityRequest request, Optional<String> user) {
delete(getRequestPath(request.getId()));
}

public List<String> getLBCleanupRequestIds() {
return getChildren(LB_CLEANUP_PATH_ROOT);
}

private String getLBCleanupPath(String requestId) {
return ZKPaths.makePath(LB_CLEANUP_PATH_ROOT, requestId);
}

public SingularityCreateResult createLBCleanupRequest(String requestId) {
return create(getLBCleanupPath(requestId));
}

public SingularityDeleteResult deleteLBCleanupRequest(String requestId) {
return delete(getLBCleanupPath(requestId));
}

public Optional<SingularityLoadBalancerUpdate> getLoadBalancerState(String requestId) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what happens if getLoadBalancer() state is called after createLBCleanupRequest(), but no saveLoadBalancerState()?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

From what I interpreted it would return an Optional.absent, which would tell the cleanup method that no request has been sent yet and the cleanup still needs to be started

return getData(getLBCleanupPath(requestId), loadBalancerUpdateTranscoder);
}

public void saveLoadBalancerState(String requestId, SingularityLoadBalancerUpdate lbUpdate) {
save(getLBCleanupPath(requestId), lbUpdate, loadBalancerUpdateTranscoder);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,7 @@ public SingularityState generateState(boolean includeRequestIds) {
final int scheduledTasks = taskManager.getNumScheduledTasks();
final int cleaningTasks = taskManager.getNumCleanupTasks();
final int lbCleanupTasks = taskManager.getNumLbCleanupTasks();
final int lbCleanupRequests = requestManager.getNumLbCleanupRequests();

final SingularityScheduledTasksInfo scheduledTasksInfo = SingularityScheduledTasksInfo.getInfo(taskManager.getPendingTasks(), singularityConfiguration.getDeltaAfterWhichTasksAreLateMillis());

Expand Down Expand Up @@ -290,7 +291,7 @@ public SingularityState generateState(boolean includeRequestIds) {

final Optional<Boolean> authDatastoreHealthy = authDatastore.isHealthy();

return new SingularityState(activeTasks, numActiveRequests, cooldownRequests, numPausedRequests, scheduledTasks, pendingRequests, lbCleanupTasks, cleaningRequests, activeSlaves,
return new SingularityState(activeTasks, numActiveRequests, cooldownRequests, numPausedRequests, scheduledTasks, pendingRequests, lbCleanupTasks, lbCleanupRequests, cleaningRequests, activeSlaves,
deadSlaves, decommissioningSlaves, activeRacks, deadRacks, decommissioningRacks, cleaningTasks, states, oldestDeploy, numDeploys, scheduledTasksInfo.getNumLateTasks(),
scheduledTasksInfo.getNumFutureTasks(), scheduledTasksInfo.getMaxTaskLag(), System.currentTimeMillis(), includeRequestIds ? overProvisionedRequestIds : null,
includeRequestIds ? underProvisionedRequestIds : null, overProvisionedRequestIds.size(), underProvisionedRequestIds.size(), numFinishedRequests, unknownRacks, unknownSlaves, authDatastoreHealthy);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,4 +16,5 @@ public interface LoadBalancerClient {

SingularityLoadBalancerUpdate cancel(LoadBalancerRequestId loadBalancerRequestId);

SingularityLoadBalancerUpdate delete(LoadBalancerRequestId loadBalancerRequestId, SingularityRequest request, SingularityDeploy deploy);
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import javax.ws.rs.HEAD;

import com.hubspot.baragon.models.RequestAction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -116,6 +119,24 @@ public String toString() {

}

private SingularityLoadBalancerUpdate sendBaragonRequest(LoadBalancerRequestId loadBalancerRequestId, BaragonRequest loadBalancerRequest, LoadBalancerMethod method) {
try {
LOG.trace("Preparing to send request {}", loadBalancerRequest);

final BoundRequestBuilder requestBuilder = httpClient.preparePost(loadBalancerUri)
.addHeader(HEADER_CONTENT_TYPE, CONTENT_TYPE_JSON)
.setBody(objectMapper.writeValueAsBytes(loadBalancerRequest));

if (loadBalancerQueryParams.isPresent()) {
addAllQueryParams(requestBuilder, loadBalancerQueryParams.get());
}

return sendRequestWrapper(loadBalancerRequestId, method, requestBuilder.build(), BaragonRequestState.FAILED);
} catch (IOException e) {
return new SingularityLoadBalancerUpdate(BaragonRequestState.UNKNOWN, loadBalancerRequestId, Optional.of(e.getMessage()), System.currentTimeMillis(), method, Optional.of(loadBalancerUri));
}
}

private LoadBalancerUpdateHolder sendRequest(LoadBalancerRequestId loadBalancerRequestId, Request request, BaragonRequestState onFailure) {
try {
LOG.trace("Sending LB {} request for {} to {}", request.getMethod(), loadBalancerRequestId, request.getUrl());
Expand Down Expand Up @@ -156,21 +177,7 @@ public SingularityLoadBalancerUpdate enqueue(LoadBalancerRequestId loadBalancerR

final BaragonRequest loadBalancerRequest = new BaragonRequest(loadBalancerRequestId.toString(), lbService, addUpstreams, removeUpstreams);

try {
LOG.trace("Deploy {} is preparing to send {}", deploy.getId(), loadBalancerRequest);

final BoundRequestBuilder requestBuilder = httpClient.preparePost(loadBalancerUri)
.addHeader(HEADER_CONTENT_TYPE, CONTENT_TYPE_JSON)
.setBody(objectMapper.writeValueAsBytes(loadBalancerRequest));

if (loadBalancerQueryParams.isPresent()) {
addAllQueryParams(requestBuilder, loadBalancerQueryParams.get());
}

return sendRequestWrapper(loadBalancerRequestId, LoadBalancerMethod.ENQUEUE, requestBuilder.build(), BaragonRequestState.FAILED);
} catch (IOException e) {
return new SingularityLoadBalancerUpdate(BaragonRequestState.UNKNOWN, loadBalancerRequestId, Optional.of(e.getMessage()), System.currentTimeMillis(), LoadBalancerMethod.ENQUEUE, Optional.of(loadBalancerUri));
}
return sendBaragonRequest(loadBalancerRequestId, loadBalancerRequest, LoadBalancerMethod.ENQUEUE);
}

private List<UpstreamInfo> tasksToUpstreams(List<SingularityTask> tasks, String requestId) {
Expand Down Expand Up @@ -202,4 +209,15 @@ public SingularityLoadBalancerUpdate cancel(LoadBalancerRequestId loadBalancerRe

return sendRequestWrapper(loadBalancerRequestId, LoadBalancerMethod.CANCEL, requestBuilder.build(), BaragonRequestState.UNKNOWN);
}

@Override
public SingularityLoadBalancerUpdate delete(LoadBalancerRequestId loadBalancerRequestId, SingularityRequest request, SingularityDeploy deploy) {
final List<String> serviceOwners = request.getOwners().or(Collections.<String> emptyList());
final Set<String> loadBalancerGroups = deploy.getLoadBalancerGroups().or(Collections.<String> emptySet());
final BaragonService lbService = new BaragonService(request.getId(), serviceOwners, deploy.getServiceBasePath().get(), loadBalancerGroups, deploy.getLoadBalancerOptions().orNull());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does the service not take optionals ? Seems awkward to have to transition between Optionals and empty collections here.


final BaragonRequest loadBalancerRequest = new BaragonRequest(loadBalancerRequestId.toString(), lbService, Collections.<UpstreamInfo>emptyList(), Collections.<UpstreamInfo>emptyList(), Collections.<UpstreamInfo>emptyList(), Optional.<String>absent(), Optional.of(RequestAction.DELETE));

return sendBaragonRequest(loadBalancerRequestId, loadBalancerRequest, LoadBalancerMethod.DELETE);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -476,4 +476,12 @@ public SingularityRequest updateInstances(@ApiParam("The Request ID to scale") @
return newRequest;
}

@GET
@PropertyFiltering
@Path("/lbcleanup")
@ApiOperation("Retrieve the list of tasks being cleaned from load balancers.")
public Iterable<String> getLbCleanupRequests() {
return authorizationHelper.filterAuthorizedRequestIds(user, requestManager.getLBCleanupRequestIds());
}

}
Loading