diff --git a/SingularityBase/src/main/java/com/hubspot/singularity/SingularityState.java b/SingularityBase/src/main/java/com/hubspot/singularity/SingularityState.java index 9fd22c5450..577a797757 100644 --- a/SingularityBase/src/main/java/com/hubspot/singularity/SingularityState.java +++ b/SingularityBase/src/main/java/com/hubspot/singularity/SingularityState.java @@ -53,6 +53,8 @@ public class SingularityState { private final Optional minimumPriorityLevel; + private final long avgStatusUpdateDelayMs; + @JsonCreator public SingularityState(@JsonProperty("activeTasks") int activeTasks, @JsonProperty("launchingTasks") int launchingTasks, @JsonProperty("activeRequests") int activeRequests, @JsonProperty("cooldownRequests") int cooldownRequests, @JsonProperty("pausedRequests") int pausedRequests, @JsonProperty("scheduledTasks") int scheduledTasks, @JsonProperty("pendingRequests") int pendingRequests, @JsonProperty("lbCleanupTasks") int lbCleanupTasks, @@ -62,7 +64,8 @@ public SingularityState(@JsonProperty("activeTasks") int activeTasks, @JsonPrope @JsonProperty("lateTasks") int lateTasks, @JsonProperty("futureTasks") int futureTasks, @JsonProperty("maxTaskLag") long maxTaskLag, @JsonProperty("generatedAt") long generatedAt, @JsonProperty("overProvisionedRequestIds") List overProvisionedRequestIds, @JsonProperty("underProvisionedRequestIds") List underProvisionedRequestIds, @JsonProperty("overProvisionedRequests") int overProvisionedRequests, @JsonProperty("underProvisionedRequests") int underProvisionedRequests, @JsonProperty("finishedRequests") int finishedRequests, - @JsonProperty("unknownRacks") int unknownRacks, @JsonProperty("unknownSlaves") int unknownSlaves, @JsonProperty("authDatastoreHealthy") Optional authDatastoreHealthy, @JsonProperty("minimumPriorityLevel") Optional minimumPriorityLevel) { + @JsonProperty("unknownRacks") int unknownRacks, @JsonProperty("unknownSlaves") int unknownSlaves, @JsonProperty("authDatastoreHealthy") Optional authDatastoreHealthy, 
@JsonProperty("minimumPriorityLevel") Optional minimumPriorityLevel, + @JsonProperty("avgStatusUpdateDelayMs") long avgStatusUpdateDelayMs) { this.activeTasks = activeTasks; this.launchingTasks = launchingTasks; this.activeRequests = activeRequests; @@ -96,6 +99,7 @@ public SingularityState(@JsonProperty("activeTasks") int activeTasks, @JsonPrope this.underProvisionedRequestIds = underProvisionedRequestIds; this.authDatastoreHealthy = authDatastoreHealthy; this.minimumPriorityLevel = minimumPriorityLevel; + this.avgStatusUpdateDelayMs = avgStatusUpdateDelayMs; } public int getFinishedRequests() { @@ -244,6 +248,10 @@ public Optional getMinimumPriorityLevel() { return minimumPriorityLevel; } + public long getAvgStatusUpdateDelayMs() { + return avgStatusUpdateDelayMs; + } + @Override public String toString() { return "SingularityState{" + @@ -280,6 +288,7 @@ public String toString() { ", underProvisionedRequests=" + underProvisionedRequests + ", authDatastoreHealthy=" + authDatastoreHealthy + ", minimumPriorityLevel=" + minimumPriorityLevel + + ", avgStatusUpdateDelayMs=" + avgStatusUpdateDelayMs + '}'; } } diff --git a/SingularityService/src/main/java/com/hubspot/singularity/SingularityMainModule.java b/SingularityService/src/main/java/com/hubspot/singularity/SingularityMainModule.java index c2bea1c500..a71c31c6ed 100644 --- a/SingularityService/src/main/java/com/hubspot/singularity/SingularityMainModule.java +++ b/SingularityService/src/main/java/com/hubspot/singularity/SingularityMainModule.java @@ -8,7 +8,9 @@ import java.net.UnknownHostException; import java.util.Map; import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.atomic.AtomicLong; import javax.inject.Inject; import javax.inject.Provider; @@ -23,7 +25,6 @@ import com.amazonaws.services.s3.AmazonS3Client; import com.codahale.metrics.Meter; import com.codahale.metrics.MetricRegistry; -import 
com.codahale.metrics.Timer; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Optional; import com.google.common.collect.ImmutableList; @@ -106,7 +107,8 @@ public class SingularityMainModule implements Module { public static final String LOST_TASKS_METER = "singularity.lost.tasks.meter"; - public static final String STATUS_UPDATE_DELTA_TIMER = "singularity.status.update.delta.timer"; + public static final String STATUS_UPDATE_DELTA_30S_AVERAGE = "singularity.status.update.delta.30s.average"; /* injection name of the shared AtomicLong holding the averaged status update delta */ + public static final String STATUS_UPDATE_DELTAS = "singularity.status.update.deltas"; /* injection name of the shared map of recorded status update deltas */ private final SingularityConfiguration configuration; @@ -365,8 +367,15 @@ public Meter providesLostTasksMeter(MetricRegistry registry) { @Provides @Singleton - @Named(STATUS_UPDATE_DELTA_TIMER) - public Timer providesStatusUpdateDeltaMeter(MetricRegistry registry) { - return registry.timer("com.hubspot.singularity.statusUpdateDelta"); + @Named(STATUS_UPDATE_DELTA_30S_AVERAGE) + public AtomicLong provideStatusUpdateDeltaAverage() { + return new AtomicLong(0); + } + + @Provides + @Singleton + @Named(STATUS_UPDATE_DELTAS) + public ConcurrentHashMap provideUpdateDeltasMap() { + return new ConcurrentHashMap<>(); } } diff --git a/SingularityService/src/main/java/com/hubspot/singularity/config/SingularityConfiguration.java b/SingularityService/src/main/java/com/hubspot/singularity/config/SingularityConfiguration.java index 24c75463a0..125e693445 100644 --- a/SingularityService/src/main/java/com/hubspot/singularity/config/SingularityConfiguration.java +++ b/SingularityService/src/main/java/com/hubspot/singularity/config/SingularityConfiguration.java @@ -328,6 +328,10 @@ public class SingularityConfiguration extends Configuration { private int maxDecommissioningSlaves = 2; + private long delayPollersWhenDeltaOverMs = 15000; + + private boolean delayOfferProcessingForLargeStatusUpdateDelta = true; + public long getAskDriverToKillTasksAgainAfterMillis() { return 
askDriverToKillTasksAgainAfterMillis; } @@ -1374,4 +1378,20 @@ public int getMaxDecommissioningSlaves() { public void setMaxDecommissioningSlaves(int maxDecommissioningSlaves) { this.maxDecommissioningSlaves = maxDecommissioningSlaves; } + + public long getDelayPollersWhenDeltaOverMs() { + return delayPollersWhenDeltaOverMs; + } + + public void setDelayPollersWhenDeltaOverMs(long delayPollersWhenDeltaOverMs) { + this.delayPollersWhenDeltaOverMs = delayPollersWhenDeltaOverMs; + } + + public boolean isDelayOfferProcessingForLargeStatusUpdateDelta() { + return delayOfferProcessingForLargeStatusUpdateDelta; + } + + public void setDelayOfferProcessingForLargeStatusUpdateDelta(boolean delayOfferProcessingForLargeStatusUpdateDelta) { + this.delayOfferProcessingForLargeStatusUpdateDelta = delayOfferProcessingForLargeStatusUpdateDelta; + } } diff --git a/SingularityService/src/main/java/com/hubspot/singularity/data/StateManager.java b/SingularityService/src/main/java/com/hubspot/singularity/data/StateManager.java index 5cfc272bb7..ab8e97bc46 100644 --- a/SingularityService/src/main/java/com/hubspot/singularity/data/StateManager.java +++ b/SingularityService/src/main/java/com/hubspot/singularity/data/StateManager.java @@ -3,6 +3,7 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; +import java.util.concurrent.atomic.AtomicLong; import javax.inject.Singleton; @@ -19,10 +20,12 @@ import com.google.common.base.Throwables; import com.google.common.collect.Lists; import com.google.inject.Inject; +import com.google.inject.name.Named; import com.hubspot.mesos.CounterMap; import com.hubspot.mesos.JavaUtils; import com.hubspot.singularity.SingularityCreateResult; import com.hubspot.singularity.SingularityHostState; +import com.hubspot.singularity.SingularityMainModule; import com.hubspot.singularity.SingularityPendingDeploy; import com.hubspot.singularity.SingularityPendingTaskId; import com.hubspot.singularity.SingularityPriorityFreezeParent; @@ -58,11 
+61,13 @@ public class StateManager extends CuratorManager { private final SingularityAuthDatastore authDatastore; private final Transcoder taskReconciliationStatisticsTranscoder; private final PriorityManager priorityManager; + private final AtomicLong statusUpdateDeltaAvg; @Inject public StateManager(CuratorFramework curatorFramework, SingularityConfiguration configuration, MetricRegistry metricRegistry, RequestManager requestManager, TaskManager taskManager, DeployManager deployManager, SlaveManager slaveManager, RackManager rackManager, Transcoder stateTranscoder, Transcoder hostStateTranscoder, - SingularityConfiguration singularityConfiguration, SingularityAuthDatastore authDatastore, PriorityManager priorityManager, Transcoder taskReconciliationStatisticsTranscoder) { + SingularityConfiguration singularityConfiguration, SingularityAuthDatastore authDatastore, PriorityManager priorityManager, Transcoder taskReconciliationStatisticsTranscoder, + @Named(SingularityMainModule.STATUS_UPDATE_DELTA_30S_AVERAGE) AtomicLong statusUpdateDeltaAvg) { super(curatorFramework, configuration, metricRegistry); this.requestManager = requestManager; @@ -76,6 +81,7 @@ public StateManager(CuratorFramework curatorFramework, SingularityConfiguration this.authDatastore = authDatastore; this.priorityManager = priorityManager; this.taskReconciliationStatisticsTranscoder = taskReconciliationStatisticsTranscoder; + this.statusUpdateDeltaAvg = statusUpdateDeltaAvg; } public SingularityCreateResult saveTaskReconciliationStatistics(SingularityTaskReconciliationStatistics taskReconciliationStatistics) { @@ -320,6 +326,7 @@ public SingularityState generateState(boolean includeRequestIds) { return new SingularityState(activeTasks, launchingTasks, numActiveRequests, cooldownRequests, numPausedRequests, scheduledTasks, pendingRequests, lbCleanupTasks, lbCleanupRequests, cleaningRequests, activeSlaves, deadSlaves, decommissioningSlaves, activeRacks, deadRacks, decommissioningRacks, 
cleaningTasks, states, oldestDeploy, numDeploys, scheduledTasksInfo.getNumLateTasks(), scheduledTasksInfo.getNumFutureTasks(), scheduledTasksInfo.getMaxTaskLag(), System.currentTimeMillis(), includeRequestIds ? overProvisionedRequestIds : null, - includeRequestIds ? underProvisionedRequestIds : null, overProvisionedRequestIds.size(), underProvisionedRequestIds.size(), numFinishedRequests, unknownRacks, unknownSlaves, authDatastoreHealthy, minimumPriorityLevel); + includeRequestIds ? underProvisionedRequestIds : null, overProvisionedRequestIds.size(), underProvisionedRequestIds.size(), numFinishedRequests, unknownRacks, unknownSlaves, authDatastoreHealthy, minimumPriorityLevel, + statusUpdateDeltaAvg.get()); } } diff --git a/SingularityService/src/main/java/com/hubspot/singularity/mesos/SingularityMesosOfferScheduler.java b/SingularityService/src/main/java/com/hubspot/singularity/mesos/SingularityMesosOfferScheduler.java index a1630033d6..59235c0416 100644 --- a/SingularityService/src/main/java/com/hubspot/singularity/mesos/SingularityMesosOfferScheduler.java +++ b/SingularityService/src/main/java/com/hubspot/singularity/mesos/SingularityMesosOfferScheduler.java @@ -5,7 +5,6 @@ import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.Set; import javax.inject.Singleton; @@ -89,7 +88,7 @@ private Map getDueTaskRequestHolders() { return taskRequestHolders; } - public List checkOffers(final Collection offers, final Set acceptedOffers) { + public List checkOffers(final Collection offers) { boolean useTaskCredits = disasterManager.isTaskCreditEnabled(); int taskCredits = useTaskCredits ? 
disasterManager.getUpdatedCreditCount() : -1; final SingularitySchedulerStateCache stateCache = stateCacheProvider.get(); @@ -101,6 +100,11 @@ public List checkOffers(final Collection o final int numDueTasks = pendingTaskIdToTaskRequest.size(); + if (offers.isEmpty()) { + LOG.debug("No offers to check"); + return Collections.emptyList(); + } + final List offerHolders = Lists.newArrayListWithCapacity(offers.size()); final Map> tasksPerOfferPerRequest = new HashMap<>(); for (Protos.Offer offer : offers) { diff --git a/SingularityService/src/main/java/com/hubspot/singularity/mesos/SingularityMesosScheduler.java b/SingularityService/src/main/java/com/hubspot/singularity/mesos/SingularityMesosScheduler.java index 5018bf6131..32824bdc4c 100644 --- a/SingularityService/src/main/java/com/hubspot/singularity/mesos/SingularityMesosScheduler.java +++ b/SingularityService/src/main/java/com/hubspot/singularity/mesos/SingularityMesosScheduler.java @@ -3,6 +3,7 @@ import java.util.ArrayList; import java.util.List; import java.util.Set; +import java.util.concurrent.atomic.AtomicLong; import javax.inject.Singleton; @@ -16,9 +17,11 @@ import com.codahale.metrics.annotation.Timed; import com.google.common.collect.Sets; import com.google.inject.Inject; +import com.google.inject.name.Named; import com.hubspot.mesos.JavaUtils; import com.hubspot.mesos.MesosUtils; import com.hubspot.singularity.SingularityAction; +import com.hubspot.singularity.SingularityMainModule; import com.hubspot.singularity.config.SingularityConfiguration; import com.hubspot.singularity.data.DisasterManager; import com.hubspot.singularity.mesos.SingularitySlaveAndRackManager.CheckResult; @@ -36,10 +39,14 @@ public class SingularityMesosScheduler implements Scheduler { private final SingularityMesosOfferScheduler offerScheduler; private final SingularityMesosStatusUpdateHandler statusUpdateHandler; private final boolean offerCacheEnabled; + private final boolean delayWhenStatusUpdateDeltaTooLarge; + private final 
long delayWhenDeltaOverMs; + private final AtomicLong statusUpdateDeltaAvg; @Inject public SingularityMesosScheduler(SingularityMesosFrameworkMessageHandler messageHandler, SingularitySlaveAndRackManager slaveAndRackManager, SchedulerDriverSupplier schedulerDriverSupplier, - OfferCache offerCache, SingularityMesosOfferScheduler offerScheduler, SingularityMesosStatusUpdateHandler statusUpdateHandler, DisasterManager disasterManager, SingularityConfiguration configuration) { + OfferCache offerCache, SingularityMesosOfferScheduler offerScheduler, SingularityMesosStatusUpdateHandler statusUpdateHandler, DisasterManager disasterManager, SingularityConfiguration configuration, + @Named(SingularityMainModule.STATUS_UPDATE_DELTA_30S_AVERAGE) AtomicLong statusUpdateDeltaAvg) { this.messageHandler = messageHandler; this.slaveAndRackManager = slaveAndRackManager; this.schedulerDriverSupplier = schedulerDriverSupplier; @@ -48,6 +55,9 @@ public SingularityMesosScheduler(SingularityMesosFrameworkMessageHandler message this.offerScheduler = offerScheduler; this.statusUpdateHandler = statusUpdateHandler; this.offerCacheEnabled = configuration.isCacheOffers(); + this.delayWhenStatusUpdateDeltaTooLarge = configuration.isDelayOfferProcessingForLargeStatusUpdateDelta(); + this.delayWhenDeltaOverMs = configuration.getDelayPollersWhenDeltaOverMs(); + this.statusUpdateDeltaAvg = statusUpdateDeltaAvg; } @Override @@ -67,13 +77,23 @@ public void reregistered(SchedulerDriver driver, Protos.MasterInfo masterInfo) { public void resourceOffers(SchedulerDriver driver, List offers) { final long start = System.currentTimeMillis(); LOG.info("Received {} offer(s)", offers.size()); + boolean declineImmediately = false; /* set when any guard below says this whole batch must be declined */ if (disasterManager.isDisabled(SingularityAction.PROCESS_OFFERS)) { LOG.info("Processing offers is currently disabled, declining {} offers", offers.size()); + declineImmediately = true; + } + if (delayWhenStatusUpdateDeltaTooLarge && statusUpdateDeltaAvg.get() > 
delayWhenDeltaOverMs) { + LOG.info("Status update delta is too large ({}), declining offers while status updates catch up", statusUpdateDeltaAvg.get()); + declineImmediately = true; + } + + if (declineImmediately) { for (Protos.Offer offer : offers) { driver.declineOffer(offer.getId()); } return; } + if (offerCacheEnabled) { if (disasterManager.isDisabled(SingularityAction.CACHE_OFFERS)) { offerCache.disableOfferCache(); @@ -100,7 +120,7 @@ public void resourceOffers(SchedulerDriver driver, List offers) { final Set acceptedOffers = Sets.newHashSetWithExpectedSize(offersToCheck.size()); try { - List offerHolders = offerScheduler.checkOffers(offersToCheck, acceptedOffers); + List offerHolders = offerScheduler.checkOffers(offersToCheck); /* NOTE(review): passes the same collection as the replaced call; confirm offersToCheck is the filtered offer list */ for (SingularityOfferHolder offerHolder : offerHolders) { if (!offerHolder.getAcceptedTasks().isEmpty()) { diff --git a/SingularityService/src/main/java/com/hubspot/singularity/mesos/SingularityMesosStatusUpdateHandler.java b/SingularityService/src/main/java/com/hubspot/singularity/mesos/SingularityMesosStatusUpdateHandler.java index 2c5cda7be9..3d3891c071 100644 --- a/SingularityService/src/main/java/com/hubspot/singularity/mesos/SingularityMesosStatusUpdateHandler.java +++ b/SingularityService/src/main/java/com/hubspot/singularity/mesos/SingularityMesosStatusUpdateHandler.java @@ -1,6 +1,6 @@ package com.hubspot.singularity.mesos; -import java.util.concurrent.TimeUnit; +import java.util.concurrent.ConcurrentHashMap; import org.apache.mesos.Protos; import org.apache.mesos.Protos.TaskState; @@ -10,7 +10,6 @@ import org.slf4j.LoggerFactory; import com.codahale.metrics.Meter; -import com.codahale.metrics.Timer; import com.codahale.metrics.annotation.Timed; import com.google.common.base.Optional; import com.google.common.base.Strings; @@ -63,7 +62,7 @@ public class SingularityMesosStatusUpdateHandler { private final SingularityConfiguration configuration; private final Multiset taskLostReasons; private final Meter lostTasksMeter; - private 
final Timer statusUpdateDeltaTimer; + private final ConcurrentHashMap statusUpdateDeltas; @Inject public SingularityMesosStatusUpdateHandler(TaskManager taskManager, DeployManager deployManager, RequestManager requestManager, @@ -75,7 +74,7 @@ public SingularityMesosStatusUpdateHandler(TaskManager taskManager, DeployManage SingularityConfiguration configuration, @Named(SingularityMesosModule.TASK_LOST_REASONS_COUNTER) Multiset taskLostReasons, @Named(SingularityMainModule.LOST_TASKS_METER) Meter lostTasksMeter, - @Named(SingularityMainModule.STATUS_UPDATE_DELTA_TIMER) Timer statusUpdateDeltaTimer) { + @Named(SingularityMainModule.STATUS_UPDATE_DELTAS) ConcurrentHashMap statusUpdateDeltas) { this.taskManager = taskManager; this.deployManager = deployManager; this.requestManager = requestManager; @@ -93,7 +92,7 @@ public SingularityMesosStatusUpdateHandler(TaskManager taskManager, DeployManage this.configuration = configuration; this.taskLostReasons = taskLostReasons; this.lostTasksMeter = lostTasksMeter; - this.statusUpdateDeltaTimer = statusUpdateDeltaTimer; + this.statusUpdateDeltas = statusUpdateDeltas; } /** @@ -171,10 +170,11 @@ private void unsafeProcessStatusUpdate(Protos.TaskStatus status) { timestamp = (long) (status.getTimestamp() * 1000); } - long delta = System.currentTimeMillis() - timestamp; + long now = System.currentTimeMillis(); + long delta = now - timestamp; LOG.debug("Update: task {} is now {} ({}) at {} (delta: {})", taskId, status.getState(), status.getMessage(), timestamp, JavaUtils.durationFromMillis(delta)); - statusUpdateDeltaTimer.update(delta, TimeUnit.MILLISECONDS); + statusUpdateDeltas.put(now, delta); final Optional maybeTaskId = getTaskId(taskId); diff --git a/SingularityService/src/main/java/com/hubspot/singularity/scheduler/SingularityCleaner.java b/SingularityService/src/main/java/com/hubspot/singularity/scheduler/SingularityCleaner.java index 89660d9c89..9a719fa8e0 100644 --- 
a/SingularityService/src/main/java/com/hubspot/singularity/scheduler/SingularityCleaner.java +++ b/SingularityService/src/main/java/com/hubspot/singularity/scheduler/SingularityCleaner.java @@ -508,15 +508,16 @@ private void cleanupRequestData(SingularityRequestCleanup requestCleanup) { LOG.trace("Deleted stale request data for {}", requestCleanup.getRequestId()); } - public void drainCleanupQueue() { + public int drainCleanupQueue() { drainRequestCleanupQueue(); - drainTaskCleanupQueue(); + int cleanupTasks = drainTaskCleanupQueue(); final List lbCleanupTasks = taskManager.getLBCleanupTasks(); drainLBTaskCleanupQueue(lbCleanupTasks); drainLBRequestCleanupQueue(lbCleanupTasks); checkKilledTaskIdRecords(); + return cleanupTasks; } private boolean isValidTask(SingularityTaskCleanup cleanupTask) { @@ -568,14 +569,14 @@ private void checkKilledTaskIdRecords() { LOG.info("{} obsolete, {} waiting, {} rekilled tasks based on {} killedTaskIdRecords", obsolete, waiting, rekilled, killedTaskIdRecords.size()); } - private void drainTaskCleanupQueue() { + private int drainTaskCleanupQueue() { final long start = System.currentTimeMillis(); final List cleanupTasks = taskManager.getCleanupTasks(); if (cleanupTasks.isEmpty()) { LOG.trace("Task cleanup queue is empty"); - return; + return 0; } final Multiset incrementalCleaningTasks = HashMultiset.create(cleanupTasks.size()); @@ -631,6 +632,7 @@ private void drainTaskCleanupQueue() { } LOG.info("Killed {} tasks in {}", killedTasks, JavaUtils.duration(start)); + return cleanupTasks.size(); } private void updateRequestToTaskMap(SingularityTaskCleanup cleanupTask, Map> requestIdToTaskIds) { diff --git a/SingularityService/src/main/java/com/hubspot/singularity/scheduler/SingularityCleanupPoller.java b/SingularityService/src/main/java/com/hubspot/singularity/scheduler/SingularityCleanupPoller.java index 179c2a10c8..e3cd60f631 100644 --- a/SingularityService/src/main/java/com/hubspot/singularity/scheduler/SingularityCleanupPoller.java 
+++ b/SingularityService/src/main/java/com/hubspot/singularity/scheduler/SingularityCleanupPoller.java @@ -8,10 +8,10 @@ import org.slf4j.LoggerFactory; import com.google.inject.Inject; -import com.hubspot.singularity.config.SingularityConfiguration; -import com.hubspot.singularity.mesos.SingularitySchedulerLock; import com.hubspot.singularity.SingularityAction; +import com.hubspot.singularity.config.SingularityConfiguration; import com.hubspot.singularity.data.DisasterManager; +import com.hubspot.singularity.mesos.SingularitySchedulerLock; @Singleton public class SingularityCleanupPoller extends SingularityLeaderOnlyPoller { @@ -23,7 +23,7 @@ public class SingularityCleanupPoller extends SingularityLeaderOnlyPoller { @Inject SingularityCleanupPoller(SingularityConfiguration configuration, SingularityCleaner cleaner, SingularitySchedulerLock lock, DisasterManager disasterManager) { - super(configuration.getCleanupEverySeconds(), TimeUnit.SECONDS, lock); + super(configuration.getCleanupEverySeconds(), TimeUnit.SECONDS, lock, true); this.cleaner = cleaner; this.disasterManager = disasterManager; diff --git a/SingularityService/src/main/java/com/hubspot/singularity/scheduler/SingularityCooldownPoller.java b/SingularityService/src/main/java/com/hubspot/singularity/scheduler/SingularityCooldownPoller.java index 1fa3381a3f..04db9f6f61 100644 --- a/SingularityService/src/main/java/com/hubspot/singularity/scheduler/SingularityCooldownPoller.java +++ b/SingularityService/src/main/java/com/hubspot/singularity/scheduler/SingularityCooldownPoller.java @@ -15,7 +15,7 @@ public class SingularityCooldownPoller extends SingularityLeaderOnlyPoller { @Inject SingularityCooldownPoller(SingularityConfiguration configuration, SingularityCooldownChecker checker, SingularitySchedulerLock lock) { - super(TimeUnit.MINUTES.toMillis(configuration.getCooldownExpiresAfterMinutes()) / 2, TimeUnit.MILLISECONDS, lock); + 
super(TimeUnit.MINUTES.toMillis(configuration.getCooldownExpiresAfterMinutes()) / 2, TimeUnit.MILLISECONDS, lock, true); this.checker = checker; } diff --git a/SingularityService/src/main/java/com/hubspot/singularity/scheduler/SingularityDeployPoller.java b/SingularityService/src/main/java/com/hubspot/singularity/scheduler/SingularityDeployPoller.java index 7fa151c952..2e8d51d33b 100644 --- a/SingularityService/src/main/java/com/hubspot/singularity/scheduler/SingularityDeployPoller.java +++ b/SingularityService/src/main/java/com/hubspot/singularity/scheduler/SingularityDeployPoller.java @@ -24,7 +24,7 @@ public class SingularityDeployPoller extends SingularityLeaderOnlyPoller { @Inject SingularityDeployPoller(SingularityDeployChecker deployChecker, SingularityConfiguration configuration, SingularitySchedulerLock lock, DisasterManager disasterManager) { - super(configuration.getCheckDeploysEverySeconds(), TimeUnit.SECONDS, lock); + super(configuration.getCheckDeploysEverySeconds(), TimeUnit.SECONDS, lock, true); this.deployChecker = deployChecker; this.disasterManager = disasterManager; diff --git a/SingularityService/src/main/java/com/hubspot/singularity/scheduler/SingularityExpiringUserActionPoller.java b/SingularityService/src/main/java/com/hubspot/singularity/scheduler/SingularityExpiringUserActionPoller.java index fd7e080803..234d3efd0c 100644 --- a/SingularityService/src/main/java/com/hubspot/singularity/scheduler/SingularityExpiringUserActionPoller.java +++ b/SingularityService/src/main/java/com/hubspot/singularity/scheduler/SingularityExpiringUserActionPoller.java @@ -72,7 +72,7 @@ public class SingularityExpiringUserActionPoller extends SingularityLeaderOnlyPo @Inject SingularityExpiringUserActionPoller(SingularityConfiguration configuration, RequestManager requestManager, DeployManager deployManager, TaskManager taskManager, SlaveManager slaveManager, RackManager rackManager, SingularitySchedulerLock lock, RequestHelper requestHelper, SingularityMailer 
mailer, DisasterManager disasterManager) { - super(configuration.getCheckExpiringUserActionEveryMillis(), TimeUnit.MILLISECONDS, lock); + super(configuration.getCheckExpiringUserActionEveryMillis(), TimeUnit.MILLISECONDS, lock, true); this.deployManager = deployManager; this.requestManager = requestManager; diff --git a/SingularityService/src/main/java/com/hubspot/singularity/scheduler/SingularityLeaderOnlyPoller.java b/SingularityService/src/main/java/com/hubspot/singularity/scheduler/SingularityLeaderOnlyPoller.java index 1218ecadcd..30f7bc6031 100644 --- a/SingularityService/src/main/java/com/hubspot/singularity/scheduler/SingularityLeaderOnlyPoller.java +++ b/SingularityService/src/main/java/com/hubspot/singularity/scheduler/SingularityLeaderOnlyPoller.java @@ -4,6 +4,7 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; import org.apache.curator.framework.recipes.leader.LeaderLatch; import org.slf4j.Logger; @@ -11,10 +12,13 @@ import com.google.common.base.Optional; import com.google.inject.Inject; +import com.google.inject.name.Named; import com.hubspot.mesos.JavaUtils; import com.hubspot.singularity.SingularityAbort; import com.hubspot.singularity.SingularityAbort.AbortReason; +import com.hubspot.singularity.SingularityMainModule; import com.hubspot.singularity.SingularityManagedScheduledExecutorServiceFactory; +import com.hubspot.singularity.config.SingularityConfiguration; import com.hubspot.singularity.mesos.SingularityMesosSchedulerDelegator; import com.hubspot.singularity.mesos.SingularitySchedulerLock; import com.hubspot.singularity.sentry.SingularityExceptionNotifier; @@ -28,25 +32,29 @@ public abstract class SingularityLeaderOnlyPoller implements Managed { private final long pollDelay; private final TimeUnit pollTimeUnit; private final Optional lockHolder; + private final boolean delayWhenLargeStatusUpdateDelta; private ScheduledExecutorService executorService; 
private LeaderLatch leaderLatch; private SingularityExceptionNotifier exceptionNotifier; private SingularityAbort abort; private SingularityMesosSchedulerDelegator mesosScheduler; + private long delayPollersWhenDeltaOverMs; + private AtomicLong statusUpdateDelta30sAverage; protected SingularityLeaderOnlyPoller(long pollDelay, TimeUnit pollTimeUnit) { - this(pollDelay, pollTimeUnit, Optional. absent()); + this(pollDelay, pollTimeUnit, Optional. absent(), false); } - protected SingularityLeaderOnlyPoller(long pollDelay, TimeUnit pollTimeUnit, SingularitySchedulerLock lock) { - this(pollDelay, pollTimeUnit, Optional.of(lock)); + protected SingularityLeaderOnlyPoller(long pollDelay, TimeUnit pollTimeUnit, SingularitySchedulerLock lock, boolean delayWhenLargeStatusUpdateDelta) { + this(pollDelay, pollTimeUnit, Optional.of(lock), delayWhenLargeStatusUpdateDelta); } - private SingularityLeaderOnlyPoller(long pollDelay, TimeUnit pollTimeUnit, Optional lockHolder) { + private SingularityLeaderOnlyPoller(long pollDelay, TimeUnit pollTimeUnit, Optional lockHolder, boolean delayWhenLargeStatusUpdateDelta) { this.pollDelay = pollDelay; this.pollTimeUnit = pollTimeUnit; this.lockHolder = lockHolder; + this.delayWhenLargeStatusUpdateDelta = delayWhenLargeStatusUpdateDelta; } @Inject @@ -54,12 +62,16 @@ void injectPollerDependencies(SingularityManagedScheduledExecutorServiceFactory LeaderLatch leaderLatch, SingularityExceptionNotifier exceptionNotifier, SingularityAbort abort, - SingularityMesosSchedulerDelegator mesosScheduler) { + SingularityMesosSchedulerDelegator mesosScheduler, + SingularityConfiguration configuration, + @Named(SingularityMainModule.STATUS_UPDATE_DELTA_30S_AVERAGE) AtomicLong statusUpdateDelta30sAverage) { this.executorService = executorServiceFactory.get(getClass().getSimpleName()); this.leaderLatch = checkNotNull(leaderLatch, "leaderLatch is null"); this.exceptionNotifier = checkNotNull(exceptionNotifier, "exceptionNotifier is null"); this.abort = 
checkNotNull(abort, "abort is null"); this.mesosScheduler = checkNotNull(mesosScheduler, "mesosScheduler is null"); + this.delayPollersWhenDeltaOverMs = configuration.getDelayPollersWhenDeltaOverMs(); + this.statusUpdateDelta30sAverage = checkNotNull(statusUpdateDelta30sAverage, "statusUpdateDeltaAverage is null"); } @Override @@ -96,6 +108,11 @@ private void runActionIfLeaderAndMesosIsRunning() { return; } + if (delayWhenLargeStatusUpdateDelta && statusUpdateDelta30sAverage.get() > delayPollersWhenDeltaOverMs) { + LOG.info("Delaying run of {} until status updates have caught up", getClass().getSimpleName()); + return; + } + LOG.trace("Running {} (period: {})", getClass().getSimpleName(), JavaUtils.durationFromMillis(pollTimeUnit.toMillis(pollDelay))); long start = System.currentTimeMillis(); diff --git a/SingularityService/src/main/java/com/hubspot/singularity/scheduler/SingularitySchedulerModule.java b/SingularityService/src/main/java/com/hubspot/singularity/scheduler/SingularitySchedulerModule.java index b0ef31c37c..6712bcb523 100644 --- a/SingularityService/src/main/java/com/hubspot/singularity/scheduler/SingularitySchedulerModule.java +++ b/SingularityService/src/main/java/com/hubspot/singularity/scheduler/SingularitySchedulerModule.java @@ -13,6 +13,7 @@ protected void configure() { bind(SingularityHealthchecker.class).in(Scopes.SINGLETON); bind(SingularityNewTaskChecker.class).in(Scopes.SINGLETON); bind(SingularityCleanupPoller.class).in(Scopes.SINGLETON); + bind(SingularityStatusUpdateDeltaPoller.class).in(Scopes.SINGLETON); bind(SingularityExpiringUserActionPoller.class).in(Scopes.SINGLETON); bind(SingularityHistoryPurger.class).in(Scopes.SINGLETON); bind(SingularitySlaveReconciliationPoller.class).in(Scopes.SINGLETON); diff --git a/SingularityService/src/main/java/com/hubspot/singularity/scheduler/SingularitySchedulerPoller.java b/SingularityService/src/main/java/com/hubspot/singularity/scheduler/SingularitySchedulerPoller.java index 
c87665e85f..28ebeac2d4 100644 --- a/SingularityService/src/main/java/com/hubspot/singularity/scheduler/SingularitySchedulerPoller.java +++ b/SingularityService/src/main/java/com/hubspot/singularity/scheduler/SingularitySchedulerPoller.java @@ -9,13 +9,11 @@ import javax.inject.Singleton; import org.apache.mesos.Protos.Offer; -import org.apache.mesos.Protos.OfferID; import org.apache.mesos.SchedulerDriver; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.google.common.base.Optional; -import com.google.common.collect.Sets; import com.google.inject.Inject; import com.hubspot.mesos.JavaUtils; import com.hubspot.singularity.SingularityAction; @@ -42,7 +40,7 @@ public class SingularitySchedulerPoller extends SingularityLeaderOnlyPoller { @Inject SingularitySchedulerPoller(SingularityMesosOfferScheduler offerScheduler, OfferCache offerCache, SchedulerDriverSupplier schedulerDriverSupplier, SingularityConfiguration configuration, SingularitySchedulerLock lock, DisasterManager disasterManager) { - super(configuration.getCheckSchedulerEverySeconds(), TimeUnit.SECONDS, lock); + super(configuration.getCheckSchedulerEverySeconds(), TimeUnit.SECONDS, lock, true); this.offerCache = offerCache; this.offerScheduler = offerScheduler; @@ -68,7 +66,7 @@ public void runActionOnPoll() { offers.add(cachedOffer.getOffer()); } - List offerHolders = offerScheduler.checkOffers(offers, Sets. 
newHashSet()); + List offerHolders = offerScheduler.checkOffers(offers); if (offerHolders.isEmpty()) { return; @@ -99,9 +97,4 @@ public void runActionOnPoll() { LOG.info("Launched {} tasks on {} cached offers (returned {}) in {}", launchedTasks, acceptedOffers, offerHolders.size() - acceptedOffers, JavaUtils.duration(start)); } - - @Override - protected boolean isEnabled() { - return configuration.isCacheOffers(); - } } diff --git a/SingularityService/src/main/java/com/hubspot/singularity/scheduler/SingularityStatusUpdateDeltaPoller.java b/SingularityService/src/main/java/com/hubspot/singularity/scheduler/SingularityStatusUpdateDeltaPoller.java new file mode 100644 index 0000000000..5b6589f493 --- /dev/null +++ b/SingularityService/src/main/java/com/hubspot/singularity/scheduler/SingularityStatusUpdateDeltaPoller.java @@ -0,0 +1,39 @@ +package com.hubspot.singularity.scheduler; + +import java.util.List; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; +import java.util.stream.Collectors; + +import com.google.common.math.DoubleMath; +import com.google.inject.Inject; +import com.google.inject.name.Named; +import com.hubspot.singularity.SingularityMainModule; + +public class SingularityStatusUpdateDeltaPoller extends SingularityLeaderOnlyPoller { + private final ConcurrentHashMap statusUpdateDeltas; + private final AtomicLong statusUpdateDelta30sAverage; + + @Inject + public SingularityStatusUpdateDeltaPoller(@Named(SingularityMainModule.STATUS_UPDATE_DELTAS) ConcurrentHashMap statusUpdateDeltas, + @Named(SingularityMainModule.STATUS_UPDATE_DELTA_30S_AVERAGE) AtomicLong statusUpdateDelta30sAverage) { + super(5L, TimeUnit.SECONDS); + this.statusUpdateDeltas = statusUpdateDeltas; + this.statusUpdateDelta30sAverage = statusUpdateDelta30sAverage; + } + + @Override + public void runActionOnPoll() { + long now = System.currentTimeMillis(); + List toRemove = 
statusUpdateDeltas.keySet().stream() + .filter((e) -> e < now - 30000) + .collect(Collectors.toList()); + toRemove.forEach(statusUpdateDeltas::remove); + if (statusUpdateDeltas.isEmpty()) { + statusUpdateDelta30sAverage.set(0L); + } else { + statusUpdateDelta30sAverage.set((long) DoubleMath.mean(statusUpdateDeltas.values())); + } + } +}