Skip to content

Commit

Permalink
style
Browse files Browse the repository at this point in the history
  • Loading branch information
sjeropkipruto committed Jan 16, 2019
1 parent ff20842 commit b312a2f
Show file tree
Hide file tree
Showing 3 changed files with 41 additions and 39 deletions.
Expand Up @@ -83,10 +83,11 @@ public SingularityCheckingUpstreamsUpdate getLoadBalancerServiceStateForRequest(
final String loadBalancerStateUri = getLoadBalancerStateUri(singularityRequestId); final String loadBalancerStateUri = getLoadBalancerStateUri(singularityRequestId);
final BoundRequestBuilder requestBuilder = httpClient.prepareGet(loadBalancerStateUri); final BoundRequestBuilder requestBuilder = httpClient.prepareGet(loadBalancerStateUri);
final Request request = requestBuilder.build(); final Request request = requestBuilder.build();
LOG.trace("Sending LB {} request for {} to {}", request.getMethod(), singularityRequestId, request.getUrl()); LOG.trace("Sending load balancer {} request for {} to {}", request.getMethod(), singularityRequestId, request.getUrl());
ListenableFuture<Response> future = httpClient.executeRequest(request); ListenableFuture<Response> future = httpClient.executeRequest(request);
Response response = future.get(loadBalancerTimeoutMillis, TimeUnit.MILLISECONDS); Response response = future.get(loadBalancerTimeoutMillis, TimeUnit.MILLISECONDS);
LOG.trace("LB {} request {} returned with code {}", request.getMethod(), singularityRequestId, response.getStatusCode()); LOG.trace("Load balancer {} request {} returned with code {}", request.getMethod(), singularityRequestId, response.getStatusCode());
//TODO: maybe make optional depending on the status code
Optional<BaragonServiceState> maybeBaragonServiceState = Optional.fromNullable(objectMapper.readValue(response.getResponseBodyAsBytes(), BaragonServiceState.class)); Optional<BaragonServiceState> maybeBaragonServiceState = Optional.fromNullable(objectMapper.readValue(response.getResponseBodyAsBytes(), BaragonServiceState.class));
return new SingularityCheckingUpstreamsUpdate(maybeBaragonServiceState, singularityRequestId); return new SingularityCheckingUpstreamsUpdate(maybeBaragonServiceState, singularityRequestId);
} }
Expand Down
@@ -1,14 +1,11 @@
package com.hubspot.singularity.scheduler; package com.hubspot.singularity.scheduler;


import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
import java.util.Collections; import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors; import java.util.stream.Collectors;


import javax.inject.Inject; import javax.inject.Inject;
Expand Down Expand Up @@ -86,85 +83,89 @@ private Collection<UpstreamInfo> getUpstreamsFromActiveTasksForRequest(String si
} }


/** /**
* @param upstream * @return a collection of upstreams in the upstreams param that match with the upstream param on upstream and group.
* @param upstreams * We expect that the collection will have a maximum of one match, but we will keep it as a collection just in case.
* @return a collection of upstreams in the upstreams param that match with the upstream param on upstream, group and rackId
* We expect that the collection will have a maximum of one match, but we will keep it as a collection just in case
*/ */
private Collection<UpstreamInfo> getEqualUpstreams(UpstreamInfo upstream, Collection<UpstreamInfo> upstreams) { private Collection<UpstreamInfo> getEqualUpstreams(UpstreamInfo upstream, Collection<UpstreamInfo> upstreams) {
return upstreams.stream().filter(candidate -> UpstreamInfo.upstreamAndGroupMatches(candidate, upstream)).collect(Collectors.toList()); return upstreams.stream().filter(candidate -> UpstreamInfo.upstreamAndGroupMatches(candidate, upstream)).collect(Collectors.toList());
} }


private List<UpstreamInfo> getExtraUpstreams(Collection<UpstreamInfo> upstreamsInBaragonForRequest, Collection<UpstreamInfo> upstreamsInSingularityForRequest) { private List<UpstreamInfo> getExtraUpstreamsInLoadBalancer(Collection<UpstreamInfo> upstreamsInLoadBalancerForService, Collection<UpstreamInfo> upstreamsInSingularityForService) {
for (UpstreamInfo upstreamInSingularity : upstreamsInSingularityForRequest) { for (UpstreamInfo upstreamInSingularity : upstreamsInSingularityForService) {
final Collection<UpstreamInfo> matches = getEqualUpstreams(upstreamInSingularity, upstreamsInBaragonForRequest); final Collection<UpstreamInfo> matches = getEqualUpstreams(upstreamInSingularity, upstreamsInLoadBalancerForService);
upstreamsInBaragonForRequest.removeAll(matches); upstreamsInLoadBalancerForService.removeAll(matches);
} }
return new ArrayList<>(upstreamsInBaragonForRequest); return new ArrayList<>(upstreamsInLoadBalancerForService);
} }


public Collection<UpstreamInfo> getLoadBalancerUpstreamsForLoadBalancerRequest(SingularityCheckingUpstreamsUpdate singularityCheckingUpstreamsUpdate) throws InterruptedException, ExecutionException, TimeoutException, IOException { private Collection<UpstreamInfo> getLoadBalancerUpstreamsForServiceHelper(SingularityCheckingUpstreamsUpdate singularityCheckingUpstreamsUpdate) {
if (singularityCheckingUpstreamsUpdate.getBaragonServiceState().isPresent()){ if (singularityCheckingUpstreamsUpdate.getBaragonServiceState().isPresent()){
LOG.info("Baragon service state for service {} was present", singularityCheckingUpstreamsUpdate.getSingularityRequestId()); LOG.info("Baragon service state for service {} is present.", singularityCheckingUpstreamsUpdate.getSingularityRequestId());
BaragonServiceState baragonServiceState = singularityCheckingUpstreamsUpdate.getBaragonServiceState().get(); final BaragonServiceState baragonServiceState = singularityCheckingUpstreamsUpdate.getBaragonServiceState().get();
return baragonServiceState.getUpstreams(); return baragonServiceState.getUpstreams();
} }
LOG.info("Baragon service state for service {} was absent", singularityCheckingUpstreamsUpdate.getSingularityRequestId()); LOG.info("Baragon service state for service {} is absent.", singularityCheckingUpstreamsUpdate.getSingularityRequestId());
return Collections.emptyList(); return Collections.emptyList();
} }


private Collection<UpstreamInfo> getUpstreamsInLoadBalancer (String singularityRequestId) { private Collection<UpstreamInfo> getLoadBalancerUpstreamsForService(String singularityRequestId) {
LOG.info("Sent request to fetch upstream for service {}.", singularityRequestId); LOG.info("Sending request to get load balancer upstreams for service {}.", singularityRequestId);
try { try {
SingularityCheckingUpstreamsUpdate checkUpstreamsState = lbClient.getLoadBalancerServiceStateForRequest(singularityRequestId); final SingularityCheckingUpstreamsUpdate checkUpstreamsState = lbClient.getLoadBalancerServiceStateForRequest(singularityRequestId);
LOG.info("Getting LB upstreams for singularity request {} is {}.", singularityRequestId, checkUpstreamsState.toString()); LOG.info("Succeeded getting load balancer upstreams for singularity request {}. State is {}.", singularityRequestId, checkUpstreamsState.toString());
return getLoadBalancerUpstreamsForLoadBalancerRequest(checkUpstreamsState); return getLoadBalancerUpstreamsForServiceHelper(checkUpstreamsState);
} catch (Exception e) { } catch (Exception e) {
LOG.error("Could not get LB upstreams for singularity request {}. ", singularityRequestId, e); LOG.error("Failed getting load balancer upstreams for singularity request {}. ", singularityRequestId, e);
} }
return Collections.emptyList(); return Collections.emptyList();
} }


private SingularityLoadBalancerUpdate syncUpstreamsForService(SingularityRequest singularityRequest, SingularityDeploy deploy, Optional<String> loadBalancerUpstreamGroup){ private Optional<SingularityLoadBalancerUpdate> syncUpstreamsForServiceHelper(SingularityRequest singularityRequest, SingularityDeploy deploy, Optional<String> loadBalancerUpstreamGroup) {
LOG.info("Sending load balancer request to sync upstreams for service {}.", singularityRequest.getId());
final LoadBalancerRequestId loadBalancerRequestId = new LoadBalancerRequestId(String.format("%s-%s-%s", singularityRequest.getId(), deploy.getId(), System.currentTimeMillis()), LoadBalancerRequestType.REMOVE, Optional.absent()); final LoadBalancerRequestId loadBalancerRequestId = new LoadBalancerRequestId(String.format("%s-%s-%s", singularityRequest.getId(), deploy.getId(), System.currentTimeMillis()), LoadBalancerRequestType.REMOVE, Optional.absent());
final long start = System.currentTimeMillis(); final long start = System.currentTimeMillis();
try { try {
Collection<UpstreamInfo> upstreamsInLoadBalancerForRequest = getUpstreamsInLoadBalancer(singularityRequest.getId()); Collection<UpstreamInfo> upstreamsInLoadBalancerForRequest = getLoadBalancerUpstreamsForService(singularityRequest.getId());
LOG.info("Upstreams in load balancer for service {} are {}.", singularityRequest.getId(), upstreamsInLoadBalancerForRequest); LOG.info("Upstreams in load balancer for service {} are {}.", singularityRequest.getId(), upstreamsInLoadBalancerForRequest);
Collection<UpstreamInfo> upstreamsInSingularityForRequest = getUpstreamsFromActiveTasksForRequest(singularityRequest.getId(), loadBalancerUpstreamGroup); Collection<UpstreamInfo> upstreamsInSingularityForRequest = getUpstreamsFromActiveTasksForRequest(singularityRequest.getId(), loadBalancerUpstreamGroup);
LOG.info("Upstreams in singularity for service {} are {}.", singularityRequest.getId(), upstreamsInSingularityForRequest); LOG.info("Upstreams in singularity for service {} are {}.", singularityRequest.getId(), upstreamsInSingularityForRequest);
final List<UpstreamInfo> extraUpstreams = getExtraUpstreams(upstreamsInLoadBalancerForRequest, upstreamsInSingularityForRequest); final List<UpstreamInfo> extraUpstreams = getExtraUpstreamsInLoadBalancer(upstreamsInLoadBalancerForRequest, upstreamsInSingularityForRequest);
LOG.info("Syncing upstreams for service {}. Making and sending load balancer request {} to remove {} extra upstreams. The upstreams removed are: {}.", singularityRequest.getId(), loadBalancerRequestId, extraUpstreams.size(), extraUpstreams); if (extraUpstreams.size() > 0){
return lbClient.makeAndSendLoadBalancerRequest(loadBalancerRequestId, Collections.emptyList(), extraUpstreams, deploy, singularityRequest); LOG.info("Syncing upstreams for service {}. Making and sending load balancer request {} to remove {} extra upstreams. The upstreams removed are: {}.", singularityRequest.getId(), loadBalancerRequestId, extraUpstreams.size(), extraUpstreams);
return Optional.of(lbClient.makeAndSendLoadBalancerRequest(loadBalancerRequestId, Collections.emptyList(), extraUpstreams, deploy, singularityRequest));
}
LOG.info("No extra upstreams for service {}. No load balancer request sent.", singularityRequest.getId());
return Optional.absent();
} catch (Exception e) { } catch (Exception e) {
LOG.error("Could not sync for service {}. Load balancer request {} threw error: ", singularityRequest.getId(), loadBalancerRequestId, e); LOG.error("Could not sync for service {}. Load balancer request {} threw error: ", singularityRequest.getId(), loadBalancerRequestId, e);
return new SingularityLoadBalancerUpdate(BaragonRequestState.UNKNOWN, loadBalancerRequestId, Optional.of(String.format("Exception %s - %s", e.getClass().getSimpleName(), e.getMessage())), start, LoadBalancerMethod.CHECK_STATE, Optional.absent()); return Optional.of(new SingularityLoadBalancerUpdate(BaragonRequestState.UNKNOWN, loadBalancerRequestId, Optional.of(String.format("Exception %s - %s", e.getClass().getSimpleName(), e.getMessage())), start, LoadBalancerMethod.CHECK_STATE, Optional.absent()));
} }
} }


private boolean noPendingDeploy() { private boolean noPendingDeploy() {
return deployManager.getPendingDeploys().size() == 0; return deployManager.getPendingDeploys().size() == 0;
} }


private void doSyncUpstreamsForService(SingularityRequest singularityRequest) { private void syncUpstreamsForService(SingularityRequest singularityRequest) {
if (singularityRequest.isLoadBalanced() && noPendingDeploy()) { if (singularityRequest.isLoadBalanced() && noPendingDeploy()) {
final String singularityRequestId = singularityRequest.getId(); final String singularityRequestId = singularityRequest.getId();
LOG.info("Doing syncing of upstreams for service: {}.", singularityRequestId); LOG.info("Starting syncing of upstreams for service: {}.", singularityRequestId);
final Optional<String> maybeDeployId = deployManager.getInUseDeployId(singularityRequestId); final Optional<String> maybeDeployId = deployManager.getInUseDeployId(singularityRequestId);
if (maybeDeployId.isPresent()) { if (maybeDeployId.isPresent()) {
final String deployId = maybeDeployId.get(); final String deployId = maybeDeployId.get();
final Optional<SingularityDeploy> maybeDeploy = deployManager.getDeploy(singularityRequestId, deployId); final Optional<SingularityDeploy> maybeDeploy = deployManager.getDeploy(singularityRequestId, deployId);
if (maybeDeploy.isPresent()) { if (maybeDeploy.isPresent()) {
final SingularityDeploy deploy = maybeDeploy.get(); final Optional<SingularityLoadBalancerUpdate> maybeSyncUpstreamsUpdate = syncUpstreamsForServiceHelper(singularityRequest, maybeDeploy.get(), maybeDeploy.get().getLoadBalancerUpstreamGroup());
final Optional<String> loadBalancerUpstreamGroup = deploy.getLoadBalancerUpstreamGroup(); if (maybeSyncUpstreamsUpdate.isPresent()) {
final SingularityLoadBalancerUpdate syncUpstreamsUpdate = syncUpstreamsForService(singularityRequest, deploy, loadBalancerUpstreamGroup); checkSyncUpstreamsState(maybeSyncUpstreamsUpdate.get().getLoadBalancerRequestId(), singularityRequestId);
checkSyncUpstreamsState(syncUpstreamsUpdate.getLoadBalancerRequestId(), singularityRequestId); }
} }
} }
} }
} }


private void checkSyncUpstreamsState(LoadBalancerRequestId loadBalancerRequestId, String singularityRequestId) { private void checkSyncUpstreamsState(LoadBalancerRequestId loadBalancerRequestId, String singularityRequestId) {
LOG.info("Checking load balancer request to sync upstreams for service {} using a retryer until the request state is no longer waiting.", singularityRequestId);
Retryer<SingularityLoadBalancerUpdate> syncingRetryer = RetryerBuilder.<SingularityLoadBalancerUpdate>newBuilder() Retryer<SingularityLoadBalancerUpdate> syncingRetryer = RetryerBuilder.<SingularityLoadBalancerUpdate>newBuilder()
.retryIfException() .retryIfException()
.withWaitStrategy(WaitStrategies.fixedWait(1, TimeUnit.SECONDS)) .withWaitStrategy(WaitStrategies.fixedWait(1, TimeUnit.SECONDS))
Expand All @@ -185,7 +186,7 @@ private void checkSyncUpstreamsState(LoadBalancerRequestId loadBalancerRequestId
public void syncUpstreams() { public void syncUpstreams() {
for (SingularityRequestWithState singularityRequestWithState: requestManager.getActiveRequests()){ for (SingularityRequestWithState singularityRequestWithState: requestManager.getActiveRequests()){
final SingularityRequest singularityRequest = singularityRequestWithState.getRequest(); final SingularityRequest singularityRequest = singularityRequestWithState.getRequest();
lock.runWithRequestLock(() -> doSyncUpstreamsForService(singularityRequest), singularityRequest.getId(), getClass().getSimpleName()); lock.runWithRequestLock(() -> syncUpstreamsForService(singularityRequest), singularityRequest.getId(), getClass().getSimpleName());
} }
} }
} }
Expand Up @@ -28,10 +28,10 @@ public class SingularityUpstreamPoller extends SingularityLeaderOnlyPoller {
@Override @Override
public void runActionOnPoll() { public void runActionOnPoll() {
if (!disasterManager.isDisabled(SingularityAction.RUN_UPSTREAM_POLLER)) { if (!disasterManager.isDisabled(SingularityAction.RUN_UPSTREAM_POLLER)) {
LOG.info("Checking upstreams"); LOG.info("Running SingularityUpstreamPoller");
upstreamChecker.syncUpstreams(); upstreamChecker.syncUpstreams();
} else { } else {
LOG.warn("Upstream poller is currently disabled"); LOG.warn("SingularityUpstreamPoller is currently disabled");
} }
} }
} }

0 comments on commit b312a2f

Please sign in to comment.