Skip to content

Commit

Permalink
fix es to startupDelay
Browse files Browse the repository at this point in the history
  • Loading branch information
ssalinas committed Oct 13, 2016
1 parent 660a43f commit 82924a3
Show file tree
Hide file tree
Showing 5 changed files with 29 additions and 15 deletions.
Expand Up @@ -124,7 +124,7 @@ public class SingularityConfiguration extends Configuration {

private int healthcheckTimeoutSeconds = 5;

private int startupDelaySeconds = 0;
private Optional<Integer> startupDelaySeconds = Optional.absent();

private int startupTimeoutSeconds = 60;

Expand Down Expand Up @@ -975,11 +975,11 @@ public void setZooKeeperConfiguration(ZooKeeperConfiguration zooKeeperConfigurat
this.zooKeeperConfiguration = zooKeeperConfiguration;
}

public int getStartupDelaySeconds() {
public Optional<Integer> getStartupDelaySeconds() {
return startupDelaySeconds;
}

public void setStartupDelaySeconds(int startupDelaySeconds) {
public void setStartupDelaySeconds(Optional<Integer> startupDelaySeconds) {
this.startupDelaySeconds = startupDelaySeconds;
}

Expand Down
Expand Up @@ -147,7 +147,7 @@ private void checkDeploy(final SingularityPendingDeploy pendingDeploy, final Lis
final List<SingularityTaskId> allOtherMatchingTasks = Lists.newArrayList(Iterables.filter(activeTasks, Predicates.not(SingularityTaskId.matchingDeploy(pendingDeployMarker.getDeployId()))));

SingularityDeployResult deployResult =
getDeployResult(request, cancelRequest, pendingDeploy, updatePendingDeployRequest, deployKey, deploy, deployMatchingTasks, allOtherMatchingTasks, inactiveDeployMatchingTasks);
getDeployResult(request, cancelRequest, pendingDeploy, updatePendingDeployRequest, deploy, deployMatchingTasks, allOtherMatchingTasks, inactiveDeployMatchingTasks);

LOG.info("Deploy {} had result {} after {}", pendingDeployMarker, deployResult, JavaUtils.durationFromMillis(System.currentTimeMillis() - pendingDeployMarker.getTimestamp()));

Expand Down Expand Up @@ -431,8 +431,7 @@ private LoadBalancerRequestId getLoadBalancerRequestId(SingularityPendingDeploy
}

private SingularityDeployResult getDeployResult(final SingularityRequest request, final Optional<SingularityDeployMarker> cancelRequest, final SingularityPendingDeploy pendingDeploy,
final Optional<SingularityUpdatePendingDeployRequest> updatePendingDeployRequest,
final SingularityDeployKey deployKey, final Optional<SingularityDeploy> deploy, final Collection<SingularityTaskId> deployActiveTasks, final Collection<SingularityTaskId> otherActiveTasks,
final Optional<SingularityUpdatePendingDeployRequest> updatePendingDeployRequest, final Optional<SingularityDeploy> deploy, final Collection<SingularityTaskId> deployActiveTasks, final Collection<SingularityTaskId> otherActiveTasks,
final Collection<SingularityTaskId> inactiveDeployMatchingTasks) {
if (!request.isDeployable()) {
LOG.info("Succeeding a deploy {} because the request {} was not deployable", pendingDeploy, request);
Expand Down
Expand Up @@ -88,7 +88,7 @@ public void saveResult(Optional<Integer> statusCode, Optional<String> responseBo
return;
}

healthchecker.enqueueHealthcheck(task, true, inStartup);
healthchecker.enqueueHealthcheck(task, true, inStartup, false);
} else {
healthchecker.markHealthcheckFinished(task.getTaskId().getId());

Expand Down
Expand Up @@ -27,6 +27,7 @@
import com.hubspot.singularity.SingularityRequestWithState;
import com.hubspot.singularity.SingularityTask;
import com.hubspot.singularity.SingularityTaskHealthcheckResult;
import com.hubspot.singularity.SingularityTaskId;
import com.hubspot.singularity.config.SingularityConfiguration;
import com.hubspot.singularity.data.TaskManager;
import com.hubspot.singularity.sentry.SingularityExceptionNotifier;
Expand Down Expand Up @@ -69,7 +70,7 @@ public SingularityHealthchecker(@Named(SingularityMainModule.HEALTHCHECK_THREADP
this.executorService = executorService;
}

public void enqueueHealthcheck(SingularityTask task, boolean ignoreExisting, boolean inStartup) {
public void enqueueHealthcheck(SingularityTask task, boolean ignoreExisting, boolean inStartup, boolean isFirstCheck) {
HealthcheckOptions options = task.getTaskRequest().getDeploy().getHealthcheck().get();
final Optional<Integer> healthcheckMaxRetries = options.getMaxRetries().or(configuration.getHealthcheckMaxRetries());

Expand All @@ -78,9 +79,7 @@ public void enqueueHealthcheck(SingularityTask task, boolean ignoreExisting, boo
return;
}

int delaySeconds = inStartup ? options.getStartupIntervalSeconds().or(configuration.getStartupIntervalSeconds()) : options.getIntervalSeconds().or(configuration.getHealthcheckIntervalSeconds());

ScheduledFuture<?> future = enqueueHealthcheckWithDelay(task, delaySeconds, inStartup);
ScheduledFuture<?> future = enqueueHealthcheckWithDelay(task, getDelaySeconds(task.getTaskId(), options, inStartup, isFirstCheck), inStartup);

ScheduledFuture<?> existing = taskIdToHealthcheck.put(task.getTaskId().getId(), future);

Expand All @@ -92,21 +91,37 @@ public void enqueueHealthcheck(SingularityTask task, boolean ignoreExisting, boo
}
}

private int getDelaySeconds(SingularityTaskId taskId, HealthcheckOptions options, boolean inStartup, boolean isFirstCheck) {
if (isFirstCheck && options.getStartupDelaySeconds().or(configuration.getStartupDelaySeconds()).isPresent()) {
int delaySeconds = options.getStartupDelaySeconds().or(configuration.getStartupDelaySeconds()).get();
LOG.trace("Delaying first healthcheck %s seconds for task {}", delaySeconds, taskId);
return delaySeconds;
} else if (inStartup) {
return options.getStartupIntervalSeconds().or(configuration.getStartupIntervalSeconds());
} else {
return options.getIntervalSeconds().or(configuration.getHealthcheckIntervalSeconds());
}
}

@Timed
public boolean enqueueHealthcheck(SingularityTask task, Optional<SingularityPendingDeploy> pendingDeploy, Optional<SingularityRequestWithState> request) {
if (!shouldHealthcheck(task, request, pendingDeploy)) {
return false;
}

enqueueHealthcheck(task, true, true);
Optional<SingularityTaskHealthcheckResult> lastHealthcheck = taskManager.getLastHealthcheck(task.getTaskId());

enqueueHealthcheck(task, true, true, !lastHealthcheck.isPresent());

return true;
}

public void checkHealthcheck(SingularityTask task) {
if (!taskIdToHealthcheck.containsKey(task.getTaskId().getId())) {
LOG.info("Enqueueing expected healthcheck for task {}", task.getTaskId());
enqueueHealthcheck(task, false, true);

Optional<SingularityTaskHealthcheckResult> lastHealthcheck = taskManager.getLastHealthcheck(task.getTaskId());
enqueueHealthcheck(task, false, true, !lastHealthcheck.isPresent());
}
}

Expand Down Expand Up @@ -155,7 +170,7 @@ public void run() {

public void reEnqueueOrAbort(SingularityTask task, boolean inStartup) {
try {
enqueueHealthcheck(task, true, inStartup);
enqueueHealthcheck(task, true, inStartup, false);
} catch (Throwable t) {
LOG.error("Caught throwable while re-enqueuing health check for {}, aborting", task.getTaskId(), t);
exceptionNotifier.notify(String.format("Caught throwable while re-enqueuing health check (%s)", t.getMessage()), t, ImmutableMap.of("taskId", task.getTaskId().toString()));
Expand Down
Expand Up @@ -109,7 +109,7 @@ private int getDelaySeconds(SingularityTask task, Optional<SingularityRequestWit
int delaySeconds = configuration.getNewTaskCheckerBaseDelaySeconds();

if (hasHealthcheck(task, requestWithState)) {
delaySeconds += task.getTaskRequest().getDeploy().getHealthcheck().get().getStartupDelaySeconds().or(configuration.getStartupDelaySeconds());
delaySeconds += task.getTaskRequest().getDeploy().getHealthcheck().get().getStartupDelaySeconds().or(configuration.getStartupDelaySeconds()).or(0);
} else if (task.getTaskRequest().getRequest().isLoadBalanced()) {
return delaySeconds;
}
Expand Down

0 comments on commit 82924a3

Please sign in to comment.