Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ public class SingularityState {

private final Optional<Double> minimumPriorityLevel;

private final long avgStatusUpdateDelayMs;

@JsonCreator
public SingularityState(@JsonProperty("activeTasks") int activeTasks, @JsonProperty("launchingTasks") int launchingTasks, @JsonProperty("activeRequests") int activeRequests, @JsonProperty("cooldownRequests") int cooldownRequests,
@JsonProperty("pausedRequests") int pausedRequests, @JsonProperty("scheduledTasks") int scheduledTasks, @JsonProperty("pendingRequests") int pendingRequests, @JsonProperty("lbCleanupTasks") int lbCleanupTasks,
Expand All @@ -62,7 +64,8 @@ public SingularityState(@JsonProperty("activeTasks") int activeTasks, @JsonPrope
@JsonProperty("lateTasks") int lateTasks, @JsonProperty("futureTasks") int futureTasks, @JsonProperty("maxTaskLag") long maxTaskLag, @JsonProperty("generatedAt") long generatedAt,
@JsonProperty("overProvisionedRequestIds") List<String> overProvisionedRequestIds, @JsonProperty("underProvisionedRequestIds") List<String> underProvisionedRequestIds,
@JsonProperty("overProvisionedRequests") int overProvisionedRequests, @JsonProperty("underProvisionedRequests") int underProvisionedRequests, @JsonProperty("finishedRequests") int finishedRequests,
@JsonProperty("unknownRacks") int unknownRacks, @JsonProperty("unknownSlaves") int unknownSlaves, @JsonProperty("authDatastoreHealthy") Optional<Boolean> authDatastoreHealthy, @JsonProperty("minimumPriorityLevel") Optional<Double> minimumPriorityLevel) {
@JsonProperty("unknownRacks") int unknownRacks, @JsonProperty("unknownSlaves") int unknownSlaves, @JsonProperty("authDatastoreHealthy") Optional<Boolean> authDatastoreHealthy, @JsonProperty("minimumPriorityLevel") Optional<Double> minimumPriorityLevel,
@JsonProperty("avgStatusUpdateDelayMs") long avgStatusUpdateDelayMs) {
this.activeTasks = activeTasks;
this.launchingTasks = launchingTasks;
this.activeRequests = activeRequests;
Expand Down Expand Up @@ -96,6 +99,7 @@ public SingularityState(@JsonProperty("activeTasks") int activeTasks, @JsonPrope
this.underProvisionedRequestIds = underProvisionedRequestIds;
this.authDatastoreHealthy = authDatastoreHealthy;
this.minimumPriorityLevel = minimumPriorityLevel;
this.avgStatusUpdateDelayMs = avgStatusUpdateDelayMs;
}

public int getFinishedRequests() {
Expand Down Expand Up @@ -244,6 +248,10 @@ public Optional<Double> getMinimumPriorityLevel() {
return minimumPriorityLevel;
}

/**
 * Returns the average status update delay that was passed to the constructor
 * via the {@code avgStatusUpdateDelayMs} JSON property (milliseconds, per the
 * field name).
 *
 * @return the stored {@code avgStatusUpdateDelayMs} value
 */
public long getAvgStatusUpdateDelayMs() {
return avgStatusUpdateDelayMs;
}

@Override
public String toString() {
return "SingularityState{" +
Expand Down Expand Up @@ -280,6 +288,7 @@ public String toString() {
", underProvisionedRequests=" + underProvisionedRequests +
", authDatastoreHealthy=" + authDatastoreHealthy +
", minimumPriorityLevel=" + minimumPriorityLevel +
", avgStatusUpdateDelayMs=" + avgStatusUpdateDelayMs +
'}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,9 @@
import java.net.UnknownHostException;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicLong;

import javax.inject.Inject;
import javax.inject.Provider;
Expand All @@ -23,7 +25,6 @@
import com.amazonaws.services.s3.AmazonS3Client;
import com.codahale.metrics.Meter;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.Timer;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Optional;
import com.google.common.collect.ImmutableList;
Expand Down Expand Up @@ -106,7 +107,8 @@ public class SingularityMainModule implements Module {

public static final String LOST_TASKS_METER = "singularity.lost.tasks.meter";

public static final String STATUS_UPDATE_DELTA_TIMER = "singularity.status.update.delta.timer";
// @Named key for the shared AtomicLong that holds the average status update delta.
// NOTE(review): the constant name says "30S" but the key string says "minute" — confirm which window is intended.
public static final String STATUS_UPDATE_DELTA_30S_AVERAGE = "singularity.status.update.delta.minute.average";
// @Named key for the shared ConcurrentHashMap<Long, Long> of recorded status update deltas.
public static final String STATUS_UPDATE_DELTAS = "singularity.status.update.deltas";

private final SingularityConfiguration configuration;

Expand Down Expand Up @@ -365,8 +367,15 @@ public Meter providesLostTasksMeter(MetricRegistry registry) {

@Provides
@Singleton
@Named(STATUS_UPDATE_DELTA_TIMER)
public Timer providesStatusUpdateDeltaMeter(MetricRegistry registry) {
return registry.timer("com.hubspot.singularity.statusUpdateDelta");
@Named(STATUS_UPDATE_DELTA_30S_AVERAGE)
public AtomicLong provideDeltasMap() {
return new AtomicLong(0);
}

/**
 * Provides the singleton map bound under the {@code STATUS_UPDATE_DELTAS} name.
 *
 * <p>The map starts empty. SingularityMesosStatusUpdateHandler puts an entry
 * while processing a status update: the key is {@code System.currentTimeMillis()}
 * at processing time and the value is that time minus the update's timestamp.
 *
 * <p>NOTE(review): because the key is a millisecond timestamp, two updates
 * processed within the same millisecond share a key and the later {@code put}
 * replaces the earlier one — confirm this is acceptable for the consumer of
 * this map.
 *
 * @return a new, empty {@code ConcurrentHashMap}
 */
@Provides
@Singleton
@Named(STATUS_UPDATE_DELTAS)
public ConcurrentHashMap<Long, Long> provideUpdateDeltasMap() {
return new ConcurrentHashMap<>();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -328,6 +328,10 @@ public class SingularityConfiguration extends Configuration {

private int maxDecommissioningSlaves = 2;

// Threshold read by SingularityMesosScheduler: while the average status update
// delta is greater than this value (and the flag below is true), incoming
// offers are declined. Defaults to 15000.
private long delayPollersWhenDeltaOverMs = 15000;

// Turns on the status-update-delta check in SingularityMesosScheduler.resourceOffers.
// Defaults to true.
private boolean delayOfferProcessingForLargeStatusUpdateDelta = true;

public long getAskDriverToKillTasksAgainAfterMillis() {
return askDriverToKillTasksAgainAfterMillis;
}
Expand Down Expand Up @@ -1374,4 +1378,20 @@ public int getMaxDecommissioningSlaves() {
public void setMaxDecommissioningSlaves(int maxDecommissioningSlaves) {
this.maxDecommissioningSlaves = maxDecommissioningSlaves;
}

/**
 * Returns the status update delta threshold (default 15000). In the code
 * visible here it is read by SingularityMesosScheduler, which declines offers
 * while the average delta is above this value.
 *
 * @return the configured {@code delayPollersWhenDeltaOverMs} value
 */
public long getDelayPollersWhenDeltaOverMs() {
return delayPollersWhenDeltaOverMs;
}

/**
 * Overrides the status update delta threshold. The value is stored as given;
 * no range validation is performed.
 *
 * @param delayPollersWhenDeltaOverMs the new threshold value
 */
public void setDelayPollersWhenDeltaOverMs(long delayPollersWhenDeltaOverMs) {
this.delayPollersWhenDeltaOverMs = delayPollersWhenDeltaOverMs;
}

/**
 * Reports whether offer processing should be held back when the average
 * status update delta exceeds the configured threshold (default true).
 *
 * @return the configured {@code delayOfferProcessingForLargeStatusUpdateDelta} flag
 */
public boolean isDelayOfferProcessingForLargeStatusUpdateDelta() {
return delayOfferProcessingForLargeStatusUpdateDelta;
}

/**
 * Enables or disables the status-update-delta check applied to offer processing.
 *
 * @param delayOfferProcessingForLargeStatusUpdateDelta the new flag value
 */
public void setDelayOfferProcessingForLargeStatusUpdateDelta(boolean delayOfferProcessingForLargeStatusUpdateDelta) {
this.delayOfferProcessingForLargeStatusUpdateDelta = delayOfferProcessingForLargeStatusUpdateDelta;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;

import javax.inject.Singleton;

Expand All @@ -19,10 +20,12 @@
import com.google.common.base.Throwables;
import com.google.common.collect.Lists;
import com.google.inject.Inject;
import com.google.inject.name.Named;
import com.hubspot.mesos.CounterMap;
import com.hubspot.mesos.JavaUtils;
import com.hubspot.singularity.SingularityCreateResult;
import com.hubspot.singularity.SingularityHostState;
import com.hubspot.singularity.SingularityMainModule;
import com.hubspot.singularity.SingularityPendingDeploy;
import com.hubspot.singularity.SingularityPendingTaskId;
import com.hubspot.singularity.SingularityPriorityFreezeParent;
Expand Down Expand Up @@ -58,11 +61,13 @@ public class StateManager extends CuratorManager {
private final SingularityAuthDatastore authDatastore;
private final Transcoder<SingularityTaskReconciliationStatistics> taskReconciliationStatisticsTranscoder;
private final PriorityManager priorityManager;
private final AtomicLong statusUpdateDeltaAvg;

@Inject
public StateManager(CuratorFramework curatorFramework, SingularityConfiguration configuration, MetricRegistry metricRegistry, RequestManager requestManager, TaskManager taskManager,
DeployManager deployManager, SlaveManager slaveManager, RackManager rackManager, Transcoder<SingularityState> stateTranscoder, Transcoder<SingularityHostState> hostStateTranscoder,
SingularityConfiguration singularityConfiguration, SingularityAuthDatastore authDatastore, PriorityManager priorityManager, Transcoder<SingularityTaskReconciliationStatistics> taskReconciliationStatisticsTranscoder) {
SingularityConfiguration singularityConfiguration, SingularityAuthDatastore authDatastore, PriorityManager priorityManager, Transcoder<SingularityTaskReconciliationStatistics> taskReconciliationStatisticsTranscoder,
@Named(SingularityMainModule.STATUS_UPDATE_DELTA_30S_AVERAGE) AtomicLong statusUpdateDeltaAvg) {
super(curatorFramework, configuration, metricRegistry);

this.requestManager = requestManager;
Expand All @@ -76,6 +81,7 @@ public StateManager(CuratorFramework curatorFramework, SingularityConfiguration
this.authDatastore = authDatastore;
this.priorityManager = priorityManager;
this.taskReconciliationStatisticsTranscoder = taskReconciliationStatisticsTranscoder;
this.statusUpdateDeltaAvg = statusUpdateDeltaAvg;
}

public SingularityCreateResult saveTaskReconciliationStatistics(SingularityTaskReconciliationStatistics taskReconciliationStatistics) {
Expand Down Expand Up @@ -320,6 +326,7 @@ public SingularityState generateState(boolean includeRequestIds) {
return new SingularityState(activeTasks, launchingTasks, numActiveRequests, cooldownRequests, numPausedRequests, scheduledTasks, pendingRequests, lbCleanupTasks, lbCleanupRequests, cleaningRequests, activeSlaves,
deadSlaves, decommissioningSlaves, activeRacks, deadRacks, decommissioningRacks, cleaningTasks, states, oldestDeploy, numDeploys, scheduledTasksInfo.getNumLateTasks(),
scheduledTasksInfo.getNumFutureTasks(), scheduledTasksInfo.getMaxTaskLag(), System.currentTimeMillis(), includeRequestIds ? overProvisionedRequestIds : null,
includeRequestIds ? underProvisionedRequestIds : null, overProvisionedRequestIds.size(), underProvisionedRequestIds.size(), numFinishedRequests, unknownRacks, unknownSlaves, authDatastoreHealthy, minimumPriorityLevel);
includeRequestIds ? underProvisionedRequestIds : null, overProvisionedRequestIds.size(), underProvisionedRequestIds.size(), numFinishedRequests, unknownRacks, unknownSlaves, authDatastoreHealthy, minimumPriorityLevel,
statusUpdateDeltaAvg.get());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

import javax.inject.Singleton;

Expand Down Expand Up @@ -89,7 +88,7 @@ private Map<String, SingularityTaskRequestHolder> getDueTaskRequestHolders() {
return taskRequestHolders;
}

public List<SingularityOfferHolder> checkOffers(final Collection<Protos.Offer> offers, final Set<Protos.OfferID> acceptedOffers) {
public List<SingularityOfferHolder> checkOffers(final Collection<Protos.Offer> offers) {
boolean useTaskCredits = disasterManager.isTaskCreditEnabled();
int taskCredits = useTaskCredits ? disasterManager.getUpdatedCreditCount() : -1;
final SingularitySchedulerStateCache stateCache = stateCacheProvider.get();
Expand All @@ -101,6 +100,11 @@ public List<SingularityOfferHolder> checkOffers(final Collection<Protos.Offer> o

final int numDueTasks = pendingTaskIdToTaskRequest.size();

if (offers.isEmpty()) {
LOG.debug("No offers to check");
return Collections.emptyList();
}

final List<SingularityOfferHolder> offerHolders = Lists.newArrayListWithCapacity(offers.size());
final Map<String, Map<String, Integer>> tasksPerOfferPerRequest = new HashMap<>();
for (Protos.Offer offer : offers) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;

import javax.inject.Singleton;

Expand All @@ -16,9 +17,11 @@
import com.codahale.metrics.annotation.Timed;
import com.google.common.collect.Sets;
import com.google.inject.Inject;
import com.google.inject.name.Named;
import com.hubspot.mesos.JavaUtils;
import com.hubspot.mesos.MesosUtils;
import com.hubspot.singularity.SingularityAction;
import com.hubspot.singularity.SingularityMainModule;
import com.hubspot.singularity.config.SingularityConfiguration;
import com.hubspot.singularity.data.DisasterManager;
import com.hubspot.singularity.mesos.SingularitySlaveAndRackManager.CheckResult;
Expand All @@ -36,10 +39,14 @@ public class SingularityMesosScheduler implements Scheduler {
private final SingularityMesosOfferScheduler offerScheduler;
private final SingularityMesosStatusUpdateHandler statusUpdateHandler;
private final boolean offerCacheEnabled;
private final boolean delayWhenStatusUpdateDeltaTooLarge;
private final long delayWhenDeltaOverMs;
private final AtomicLong statusUpdateDeltaAvg;

@Inject
public SingularityMesosScheduler(SingularityMesosFrameworkMessageHandler messageHandler, SingularitySlaveAndRackManager slaveAndRackManager, SchedulerDriverSupplier schedulerDriverSupplier,
OfferCache offerCache, SingularityMesosOfferScheduler offerScheduler, SingularityMesosStatusUpdateHandler statusUpdateHandler, DisasterManager disasterManager, SingularityConfiguration configuration) {
OfferCache offerCache, SingularityMesosOfferScheduler offerScheduler, SingularityMesosStatusUpdateHandler statusUpdateHandler, DisasterManager disasterManager, SingularityConfiguration configuration,
@Named(SingularityMainModule.STATUS_UPDATE_DELTA_30S_AVERAGE) AtomicLong statusUpdateDeltaAvg) {
this.messageHandler = messageHandler;
this.slaveAndRackManager = slaveAndRackManager;
this.schedulerDriverSupplier = schedulerDriverSupplier;
Expand All @@ -48,6 +55,9 @@ public SingularityMesosScheduler(SingularityMesosFrameworkMessageHandler message
this.offerScheduler = offerScheduler;
this.statusUpdateHandler = statusUpdateHandler;
this.offerCacheEnabled = configuration.isCacheOffers();
this.delayWhenStatusUpdateDeltaTooLarge = configuration.isDelayOfferProcessingForLargeStatusUpdateDelta();
this.delayWhenDeltaOverMs = configuration.getDelayPollersWhenDeltaOverMs();
this.statusUpdateDeltaAvg = statusUpdateDeltaAvg;
}

@Override
Expand All @@ -67,13 +77,23 @@ public void reregistered(SchedulerDriver driver, Protos.MasterInfo masterInfo) {
public void resourceOffers(SchedulerDriver driver, List<Protos.Offer> offers) {
final long start = System.currentTimeMillis();
LOG.info("Received {} offer(s)", offers.size());
boolean delclineImmediately = false;
if (disasterManager.isDisabled(SingularityAction.PROCESS_OFFERS)) {
LOG.info("Processing offers is currently disabled, declining {} offers", offers.size());
delclineImmediately = true;
}
if (delayWhenStatusUpdateDeltaTooLarge && statusUpdateDeltaAvg.get() > delayWhenDeltaOverMs) {
LOG.info("Status update delta is too large ({}), declining offers while status updates catch up", statusUpdateDeltaAvg.get());
delclineImmediately = true;
}

if (delclineImmediately) {
for (Protos.Offer offer : offers) {
driver.declineOffer(offer.getId());
}
return;
}

if (offerCacheEnabled) {
if (disasterManager.isDisabled(SingularityAction.CACHE_OFFERS)) {
offerCache.disableOfferCache();
Expand All @@ -100,7 +120,7 @@ public void resourceOffers(SchedulerDriver driver, List<Protos.Offer> offers) {
final Set<Protos.OfferID> acceptedOffers = Sets.newHashSetWithExpectedSize(offersToCheck.size());

try {
List<SingularityOfferHolder> offerHolders = offerScheduler.checkOffers(offersToCheck, acceptedOffers);
List<SingularityOfferHolder> offerHolders = offerScheduler.checkOffers(offers);

for (SingularityOfferHolder offerHolder : offerHolders) {
if (!offerHolder.getAcceptedTasks().isEmpty()) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package com.hubspot.singularity.mesos;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.mesos.Protos;
import org.apache.mesos.Protos.TaskState;
Expand All @@ -10,7 +10,6 @@
import org.slf4j.LoggerFactory;

import com.codahale.metrics.Meter;
import com.codahale.metrics.Timer;
import com.codahale.metrics.annotation.Timed;
import com.google.common.base.Optional;
import com.google.common.base.Strings;
Expand Down Expand Up @@ -63,7 +62,7 @@ public class SingularityMesosStatusUpdateHandler {
private final SingularityConfiguration configuration;
private final Multiset<Protos.TaskStatus.Reason> taskLostReasons;
private final Meter lostTasksMeter;
private final Timer statusUpdateDeltaTimer;
private final ConcurrentHashMap<Long, Long> statusUpdateDeltas;

@Inject
public SingularityMesosStatusUpdateHandler(TaskManager taskManager, DeployManager deployManager, RequestManager requestManager,
Expand All @@ -75,7 +74,7 @@ public SingularityMesosStatusUpdateHandler(TaskManager taskManager, DeployManage
SingularityConfiguration configuration,
@Named(SingularityMesosModule.TASK_LOST_REASONS_COUNTER) Multiset<Protos.TaskStatus.Reason> taskLostReasons,
@Named(SingularityMainModule.LOST_TASKS_METER) Meter lostTasksMeter,
@Named(SingularityMainModule.STATUS_UPDATE_DELTA_TIMER) Timer statusUpdateDeltaTimer) {
@Named(SingularityMainModule.STATUS_UPDATE_DELTAS) ConcurrentHashMap<Long, Long> statusUpdateDeltas) {
this.taskManager = taskManager;
this.deployManager = deployManager;
this.requestManager = requestManager;
Expand All @@ -93,7 +92,7 @@ public SingularityMesosStatusUpdateHandler(TaskManager taskManager, DeployManage
this.configuration = configuration;
this.taskLostReasons = taskLostReasons;
this.lostTasksMeter = lostTasksMeter;
this.statusUpdateDeltaTimer = statusUpdateDeltaTimer;
this.statusUpdateDeltas = statusUpdateDeltas;
}

/**
Expand Down Expand Up @@ -171,10 +170,11 @@ private void unsafeProcessStatusUpdate(Protos.TaskStatus status) {
timestamp = (long) (status.getTimestamp() * 1000);
}

long delta = System.currentTimeMillis() - timestamp;
long now = System.currentTimeMillis();
long delta = now - timestamp;

LOG.debug("Update: task {} is now {} ({}) at {} (delta: {})", taskId, status.getState(), status.getMessage(), timestamp, JavaUtils.durationFromMillis(delta));
statusUpdateDeltaTimer.update(delta, TimeUnit.MILLISECONDS);
statusUpdateDeltas.put(now, delta);

final Optional<SingularityTaskId> maybeTaskId = getTaskId(taskId);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -508,15 +508,16 @@ private void cleanupRequestData(SingularityRequestCleanup requestCleanup) {
LOG.trace("Deleted stale request data for {}", requestCleanup.getRequestId());
}

public void drainCleanupQueue() {
public int drainCleanupQueue() {
drainRequestCleanupQueue();
drainTaskCleanupQueue();
int cleanupTasks = drainTaskCleanupQueue();

final List<SingularityTaskId> lbCleanupTasks = taskManager.getLBCleanupTasks();
drainLBTaskCleanupQueue(lbCleanupTasks);
drainLBRequestCleanupQueue(lbCleanupTasks);

checkKilledTaskIdRecords();
return cleanupTasks;
}

private boolean isValidTask(SingularityTaskCleanup cleanupTask) {
Expand Down Expand Up @@ -568,14 +569,14 @@ private void checkKilledTaskIdRecords() {
LOG.info("{} obsolete, {} waiting, {} rekilled tasks based on {} killedTaskIdRecords", obsolete, waiting, rekilled, killedTaskIdRecords.size());
}

private void drainTaskCleanupQueue() {
private int drainTaskCleanupQueue() {
final long start = System.currentTimeMillis();

final List<SingularityTaskCleanup> cleanupTasks = taskManager.getCleanupTasks();

if (cleanupTasks.isEmpty()) {
LOG.trace("Task cleanup queue is empty");
return;
return 0;
}

final Multiset<SingularityDeployKey> incrementalCleaningTasks = HashMultiset.create(cleanupTasks.size());
Expand Down Expand Up @@ -631,6 +632,7 @@ private void drainTaskCleanupQueue() {
}

LOG.info("Killed {} tasks in {}", killedTasks, JavaUtils.duration(start));
return cleanupTasks.size();
}

private void updateRequestToTaskMap(SingularityTaskCleanup cleanupTask, Map<String, List<String>> requestIdToTaskIds) {
Expand Down
Loading