Skip to content

Commit

Permalink
adding SingularityCheckingUpstreamsUpdate object to handle request an…
Browse files Browse the repository at this point in the history
…d service state
  • Loading branch information
sjeropkipruto committed Jan 15, 2019
1 parent c42ed3b commit 53c762a
Show file tree
Hide file tree
Showing 4 changed files with 73 additions and 33 deletions.
@@ -1,7 +1,6 @@
package com.hubspot.singularity.hooks; package com.hubspot.singularity.hooks;


import java.io.IOException; import java.io.IOException;
import java.util.Collection;
import java.util.List; import java.util.List;
import java.util.Set; import java.util.Set;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
Expand All @@ -14,6 +13,7 @@
import com.hubspot.singularity.SingularityLoadBalancerUpdate; import com.hubspot.singularity.SingularityLoadBalancerUpdate;
import com.hubspot.singularity.SingularityRequest; import com.hubspot.singularity.SingularityRequest;
import com.hubspot.singularity.SingularityTask; import com.hubspot.singularity.SingularityTask;
import com.hubspot.singularity.hooks.LoadBalancerClientImpl.SingularityCheckingUpstreamsUpdate;


public interface LoadBalancerClient { public interface LoadBalancerClient {


Expand All @@ -25,7 +25,7 @@ public interface LoadBalancerClient {


SingularityLoadBalancerUpdate delete(LoadBalancerRequestId loadBalancerRequestId, String requestId, Set<String> loadBalancerGroups, String serviceBasePath); SingularityLoadBalancerUpdate delete(LoadBalancerRequestId loadBalancerRequestId, String requestId, Set<String> loadBalancerGroups, String serviceBasePath);


Collection<UpstreamInfo> getLoadBalancerUpstreamsForLoadBalancerRequest(LoadBalancerRequestId requestId) throws InterruptedException, ExecutionException, TimeoutException, IOException; SingularityCheckingUpstreamsUpdate getLoadBalancerServiceStateForLoadBalancerRequest(LoadBalancerRequestId loadBalancerRequestId) throws IOException, InterruptedException, ExecutionException, TimeoutException;


List<UpstreamInfo> getUpstreamsForTasks(List<SingularityTask> tasks, String requestId, Optional<String> loadBalancerUpstreamGroup); List<UpstreamInfo> getUpstreamsForTasks(List<SingularityTask> tasks, String requestId, Optional<String> loadBalancerUpstreamGroup);


Expand Down
Expand Up @@ -80,28 +80,52 @@ private String getLoadBalancerStateUri(LoadBalancerRequestId loadBalancerRequest
return String.format(OPERATION_URI, getStateUriFromRequestUri(), loadBalancerRequestId); return String.format(OPERATION_URI, getStateUriFromRequestUri(), loadBalancerRequestId);
} }


private Optional<BaragonServiceState> getLoadBalancerServiceStateForLoadBalancerRequest(LoadBalancerRequestId loadBalancerRequestId) throws IOException, InterruptedException, ExecutionException, TimeoutException { public class SingularityCheckingUpstreamsUpdate {
private final BaragonRequestState baragonRequestState;
private final Optional<BaragonServiceState> baragonServiceState;
private final LoadBalancerRequestId loadBalancerRequestId;

public SingularityCheckingUpstreamsUpdate (BaragonRequestState baragonRequestState,Optional<BaragonServiceState> baragonServiceState, LoadBalancerRequestId loadBalancerRequestId) {
this.baragonRequestState = baragonRequestState;
this.baragonServiceState = baragonServiceState;
this.loadBalancerRequestId = loadBalancerRequestId;
}

public BaragonRequestState getBaragonRequestState() {
return baragonRequestState;
}

public Optional<BaragonServiceState> getBaragonServiceState() {
return baragonServiceState;
}

public LoadBalancerRequestId getLoadBalancerRequestId() {
return loadBalancerRequestId;
}

@Override
public String toString() {
return "SingularityCheckingUpstreamsUpdate{" +
"baragonRequestState=" + baragonRequestState +
", baragonServiceState=" + baragonServiceState +
", loadBalancerRequestId=" + loadBalancerRequestId +
'}';
}
}

public SingularityCheckingUpstreamsUpdate getLoadBalancerServiceStateForLoadBalancerRequest(LoadBalancerRequestId loadBalancerRequestId) throws IOException, InterruptedException, ExecutionException, TimeoutException {
final String loadBalancerStateUri = getLoadBalancerStateUri(loadBalancerRequestId); final String loadBalancerStateUri = getLoadBalancerStateUri(loadBalancerRequestId);
final BoundRequestBuilder requestBuilder = httpClient.prepareGet(loadBalancerStateUri); final BoundRequestBuilder requestBuilder = httpClient.prepareGet(loadBalancerStateUri);
if (loadBalancerQueryParams.isPresent()) {
addAllQueryParams(requestBuilder, loadBalancerQueryParams.get());
}
final Request request = requestBuilder.build(); final Request request = requestBuilder.build();
LOG.trace("Sending LB {} request for {} to {}", request.getMethod(), loadBalancerRequestId, request.getUrl()); LOG.trace("Sending LB {} request for {} to {}", request.getMethod(), loadBalancerRequestId, request.getUrl());
ListenableFuture<Response> future = httpClient.executeRequest(request); ListenableFuture<Response> future = httpClient.executeRequest(request);
Response response = future.get(loadBalancerTimeoutMillis, TimeUnit.MILLISECONDS); Response response = future.get(loadBalancerTimeoutMillis, TimeUnit.MILLISECONDS);
LOG.trace("LB {} request {} returned with code {}", request.getMethod(), loadBalancerRequestId, response.getStatusCode()); LOG.trace("LB {} request {} returned with code {}", request.getMethod(), loadBalancerRequestId, response.getStatusCode());
return objectMapper.readValue(response.getResponseBodyAsBytes(), new TypeReference<BaragonServiceState>() {}); BaragonResponse lbResponse = readResponse(response);
Optional<BaragonServiceState> maybeBaragonServiceState = objectMapper.readValue(response.getResponseBodyAsBytes(), new TypeReference<BaragonServiceState>() {});
return new SingularityCheckingUpstreamsUpdate(lbResponse.getLoadBalancerState(), maybeBaragonServiceState, loadBalancerRequestId);
} }


public Collection<UpstreamInfo> getLoadBalancerUpstreamsForLoadBalancerRequest(LoadBalancerRequestId loadBalancerRequestId) throws InterruptedException, ExecutionException, TimeoutException, IOException {
Optional<BaragonServiceState> maybeBaragonServiceState = getLoadBalancerServiceStateForLoadBalancerRequest(loadBalancerRequestId);
if (maybeBaragonServiceState.isPresent()){
BaragonServiceState baragonServiceState = maybeBaragonServiceState.get();
return baragonServiceState.getUpstreams();
}
return Collections.emptyList();
}


private String getLoadBalancerUri(LoadBalancerRequestId loadBalancerRequestId) { private String getLoadBalancerUri(LoadBalancerRequestId loadBalancerRequestId) {
return String.format(OPERATION_URI, loadBalancerUri, loadBalancerRequestId); return String.format(OPERATION_URI, loadBalancerUri, loadBalancerRequestId);
Expand Down
@@ -1,11 +1,14 @@
package com.hubspot.singularity.scheduler; package com.hubspot.singularity.scheduler;


import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
import java.util.Collections; import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors; import java.util.stream.Collectors;


import javax.inject.Inject; import javax.inject.Inject;
Expand All @@ -17,6 +20,7 @@
import com.google.common.base.Optional; import com.google.common.base.Optional;
import com.google.common.base.Predicate; import com.google.common.base.Predicate;
import com.hubspot.baragon.models.BaragonRequestState; import com.hubspot.baragon.models.BaragonRequestState;
import com.hubspot.baragon.models.BaragonServiceState;
import com.hubspot.baragon.models.UpstreamInfo; import com.hubspot.baragon.models.UpstreamInfo;
import com.hubspot.singularity.LoadBalancerRequestType; import com.hubspot.singularity.LoadBalancerRequestType;
import com.hubspot.singularity.LoadBalancerRequestType.LoadBalancerRequestId; import com.hubspot.singularity.LoadBalancerRequestType.LoadBalancerRequestId;
Expand All @@ -33,6 +37,7 @@
import com.hubspot.singularity.data.TaskManager; import com.hubspot.singularity.data.TaskManager;
import com.hubspot.singularity.helpers.RequestHelper; import com.hubspot.singularity.helpers.RequestHelper;
import com.hubspot.singularity.hooks.LoadBalancerClient; import com.hubspot.singularity.hooks.LoadBalancerClient;
import com.hubspot.singularity.hooks.LoadBalancerClientImpl.SingularityCheckingUpstreamsUpdate;
import com.hubspot.singularity.mesos.SingularitySchedulerLock; import com.hubspot.singularity.mesos.SingularitySchedulerLock;
import com.github.rholder.retry.RetryerBuilder; import com.github.rholder.retry.RetryerBuilder;
import com.github.rholder.retry.WaitStrategies; import com.github.rholder.retry.WaitStrategies;
Expand All @@ -42,6 +47,7 @@ public class SingularityUpstreamChecker {


private static final Logger LOG = LoggerFactory.getLogger(SingularityUpstreamChecker.class); private static final Logger LOG = LoggerFactory.getLogger(SingularityUpstreamChecker.class);
private static final Predicate<SingularityLoadBalancerUpdate> IS_WAITING_STATE = singularityLoadBalancerUpdate -> singularityLoadBalancerUpdate.getLoadBalancerState() == BaragonRequestState.WAITING; private static final Predicate<SingularityLoadBalancerUpdate> IS_WAITING_STATE = singularityLoadBalancerUpdate -> singularityLoadBalancerUpdate.getLoadBalancerState() == BaragonRequestState.WAITING;
private static final Predicate<SingularityCheckingUpstreamsUpdate> CHECKING_IS_WAITING_STATE = singularityCheckingUpstreamsUpdate -> singularityCheckingUpstreamsUpdate.getBaragonRequestState() == BaragonRequestState.WAITING;


private final LoadBalancerClient lbClient; private final LoadBalancerClient lbClient;
private final TaskManager taskManager; private final TaskManager taskManager;
Expand Down Expand Up @@ -104,24 +110,30 @@ private List<UpstreamInfo> getExtraUpstreams(Collection<UpstreamInfo> upstreamsI
return new ArrayList<>(upstreamsInBaragonForRequest); return new ArrayList<>(upstreamsInBaragonForRequest);
} }


public Collection<UpstreamInfo> getLoadBalancerUpstreamsForLoadBalancerRequest(SingularityCheckingUpstreamsUpdate singularityCheckingUpstreamsUpdate) throws InterruptedException, ExecutionException, TimeoutException, IOException {
if (singularityCheckingUpstreamsUpdate.getBaragonServiceState().isPresent()){
LOG.info("Baragon service state for LB request {} was present", singularityCheckingUpstreamsUpdate.getLoadBalancerRequestId());
BaragonServiceState baragonServiceState = singularityCheckingUpstreamsUpdate.getBaragonServiceState().get();
return baragonServiceState.getUpstreams();
}
LOG.info("Baragon service state for LB request {} was absent", singularityCheckingUpstreamsUpdate.getLoadBalancerRequestId());
return Collections.emptyList();
}

private Collection<UpstreamInfo> getUpstreamsInLoadBalancer (SingularityRequest singularityRequest, SingularityDeploy deploy) { private Collection<UpstreamInfo> getUpstreamsInLoadBalancer (SingularityRequest singularityRequest, SingularityDeploy deploy) {
final LoadBalancerRequestId checkUpstreamsId = new LoadBalancerRequestId(String.format("%s-%s-%s", singularityRequest.getId(), deploy.getId(), System.currentTimeMillis()), LoadBalancerRequestType.REMOVE, Optional.absent()); final LoadBalancerRequestId checkUpstreamsId = new LoadBalancerRequestId(String.format("%s-%s-%s", singularityRequest.getId(), deploy.getId(), System.currentTimeMillis()), LoadBalancerRequestType.REMOVE, Optional.absent());
SingularityLoadBalancerUpdate checkUpstreamsState = lbClient.enqueue(checkUpstreamsId, singularityRequest, deploy, Collections.emptyList(), Collections.emptyList()); SingularityLoadBalancerUpdate singularityLoadBalancerUpdate = lbClient.enqueue(checkUpstreamsId, singularityRequest, deploy, Collections.emptyList(), Collections.emptyList());
LOG.info("Sent LB request {} to fetch upstreams", singularityLoadBalancerUpdate.getLoadBalancerRequestId());
try { try {
if (checkUpstreamsState.getLoadBalancerState() == BaragonRequestState.WAITING) { Retryer<SingularityCheckingUpstreamsUpdate> getLoadBalancerUpstreamsRetryer = RetryerBuilder.<SingularityCheckingUpstreamsUpdate>newBuilder()
Retryer<SingularityLoadBalancerUpdate> getLoadBalancerUpstreamsRetryer = RetryerBuilder.<SingularityLoadBalancerUpdate>newBuilder() .retryIfException()
.retryIfException() .withWaitStrategy(WaitStrategies.fixedWait(1, TimeUnit.SECONDS))
.withWaitStrategy(WaitStrategies.fixedWait(1, TimeUnit.SECONDS)) .retryIfResult(CHECKING_IS_WAITING_STATE)
.retryIfResult(IS_WAITING_STATE) .build();
.build(); SingularityCheckingUpstreamsUpdate checkUpstreamsState = getLoadBalancerUpstreamsRetryer.call(() -> lbClient.getLoadBalancerServiceStateForLoadBalancerRequest(checkUpstreamsId));
checkUpstreamsState = getLoadBalancerUpstreamsRetryer.call(() -> lbClient.getState(checkUpstreamsId)); LOG.info("Getting LB upstreams for singularity request {} through LB request {} is: {}.", singularityRequest.getId(), checkUpstreamsId, checkUpstreamsState.toString());
} if (checkUpstreamsState.getBaragonRequestState() == BaragonRequestState.SUCCESS){

return getLoadBalancerUpstreamsForLoadBalancerRequest(checkUpstreamsState);
if (checkUpstreamsState.getLoadBalancerState() == BaragonRequestState.SUCCESS){
LOG.info("Getting LB upstreams for singularity request {} through LB request {} is {}.", singularityRequest.getId(), checkUpstreamsId, checkUpstreamsState.toString());
return lbClient.getLoadBalancerUpstreamsForLoadBalancerRequest(checkUpstreamsId);
} else {
LOG.error("Getting LB upstreams for singularity request {} throught LB request {} is {}.", singularityRequest.getId(), checkUpstreamsId, checkUpstreamsState.toString());
} }
} catch (Exception e) { } catch (Exception e) {
LOG.error("Could not get LB upstreams for singularity request {} through LB request {}. ", singularityRequest.getId(), checkUpstreamsId, e); LOG.error("Could not get LB upstreams for singularity request {} through LB request {}. ", singularityRequest.getId(), checkUpstreamsId, e);
Expand Down
@@ -1,9 +1,11 @@
package com.hubspot.singularity.scheduler; package com.hubspot.singularity.scheduler;


import java.util.Collection; import java.io.IOException;
import java.util.Collections; import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.Set; import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;


import com.google.common.base.Optional; import com.google.common.base.Optional;
import com.hubspot.baragon.models.BaragonRequestState; import com.hubspot.baragon.models.BaragonRequestState;
Expand All @@ -15,6 +17,7 @@
import com.hubspot.singularity.SingularityRequest; import com.hubspot.singularity.SingularityRequest;
import com.hubspot.singularity.SingularityTask; import com.hubspot.singularity.SingularityTask;
import com.hubspot.singularity.hooks.LoadBalancerClient; import com.hubspot.singularity.hooks.LoadBalancerClient;
import com.hubspot.singularity.hooks.LoadBalancerClientImpl.SingularityCheckingUpstreamsUpdate;


public class TestingLoadBalancerClient implements LoadBalancerClient { public class TestingLoadBalancerClient implements LoadBalancerClient {


Expand Down Expand Up @@ -53,10 +56,11 @@ public SingularityLoadBalancerUpdate delete(LoadBalancerRequestId loadBalancerRe
} }


@Override @Override
public Collection<UpstreamInfo> getLoadBalancerUpstreamsForLoadBalancerRequest(LoadBalancerRequestId requestId) { public SingularityCheckingUpstreamsUpdate getLoadBalancerServiceStateForLoadBalancerRequest(LoadBalancerRequestId loadBalancerRequestId) throws IOException, InterruptedException, ExecutionException, TimeoutException {
return Collections.emptyList(); return null; //TODO: handle this
} }



@Override @Override
public List<UpstreamInfo> getUpstreamsForTasks(List<SingularityTask> tasks, String requestId, Optional<String> loadBalancerUpstreamGroup) { public List<UpstreamInfo> getUpstreamsForTasks(List<SingularityTask> tasks, String requestId, Optional<String> loadBalancerUpstreamGroup) {
return Collections.emptyList(); return Collections.emptyList();
Expand Down

0 comments on commit 53c762a

Please sign in to comment.