Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -97,10 +97,6 @@ public List<SingularityOfferHolder> checkOffers(final Collection<Protos.Offer> o
scheduler.checkForDecomissions(stateCache);
scheduler.drainPendingQueue(stateCache);

for (Protos.Offer offer : offers) {
slaveAndRackManager.checkOffer(offer);
}

final Map<String, SingularityTaskRequestHolder> pendingTaskIdToTaskRequest = getDueTaskRequestHolders();

final int numDueTasks = pendingTaskIdToTaskRequest.size();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.hubspot.singularity.mesos;

import java.util.ArrayList;
import java.util.List;
import java.util.Set;

Expand All @@ -20,6 +21,7 @@
import com.hubspot.singularity.SingularityAction;
import com.hubspot.singularity.config.SingularityConfiguration;
import com.hubspot.singularity.data.DisasterManager;
import com.hubspot.singularity.mesos.SingularitySlaveAndRackManager.CheckResult;

@Singleton
public class SingularityMesosScheduler implements Scheduler {
Expand Down Expand Up @@ -63,6 +65,7 @@ public void reregistered(SchedulerDriver driver, Protos.MasterInfo masterInfo) {
@Override
@Timed
public void resourceOffers(SchedulerDriver driver, List<Protos.Offer> offers) {
final long start = System.currentTimeMillis();
LOG.info("Received {} offer(s)", offers.size());
if (disasterManager.isDisabled(SingularityAction.PROCESS_OFFERS)) {
LOG.info("Processing offers is currently disabled, declining {} offers", offers.size());
Expand All @@ -79,18 +82,25 @@ public void resourceOffers(SchedulerDriver driver, List<Protos.Offer> offers) {
}
}

List<Protos.Offer> offersToCheck = new ArrayList<>(offers);

for (Offer offer : offers) {
String rolesInfo = MesosUtils.getRoles(offer).toString();
LOG.debug("Received offer ID {} with roles {} from {} ({}) for {} cpu(s), {} memory, {} ports, and {} disk", offer.getId().getValue(), rolesInfo, offer.getHostname(), offer.getSlaveId().getValue(), MesosUtils.getNumCpus(offer), MesosUtils.getMemory(offer),
MesosUtils.getNumPorts(offer), MesosUtils.getDisk(offer));
}

final long start = System.currentTimeMillis();
CheckResult checkResult = slaveAndRackManager.checkOffer(offer);
if (checkResult == CheckResult.NOT_ACCEPTING_TASKS) {
driver.declineOffer(offer.getId());
offersToCheck.remove(offer);
LOG.debug("Will decline offer {}, slave {} is not currently in a state to launch tasks", offer.getId().getValue(), offer.getHostname());
}
}

final Set<Protos.OfferID> acceptedOffers = Sets.newHashSetWithExpectedSize(offers.size());
final Set<Protos.OfferID> acceptedOffers = Sets.newHashSetWithExpectedSize(offersToCheck.size());

try {
List<SingularityOfferHolder> offerHolders = offerScheduler.checkOffers(offers, acceptedOffers);
List<SingularityOfferHolder> offerHolders = offerScheduler.checkOffers(offersToCheck, acceptedOffers);

for (SingularityOfferHolder offerHolder : offerHolders) {
if (!offerHolder.getAcceptedTasks().isEmpty()) {
Expand All @@ -104,7 +114,7 @@ public void resourceOffers(SchedulerDriver driver, List<Protos.Offer> offers) {
} catch (Throwable t) {
LOG.error("Received fatal error while handling offers - will decline all available offers", t);

for (Protos.Offer offer : offers) {
for (Protos.Offer offer : offersToCheck) {
if (acceptedOffers.contains(offer.getId())) {
continue;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ public SingularityOfferCache(SingularityConfiguration configuration, SchedulerDr
@Override
public void cacheOffer(SchedulerDriver driver, long timestamp, Offer offer) {
if (!useOfferCache.get()) {
driver.declineOffer(offer.getId());
return;
}
LOG.debug("Caching offer {} for {}", offer.getId().getValue(), JavaUtils.durationFromMillis(configuration.getCacheOffersForMillis()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -371,8 +371,8 @@ public void loadSlavesAndRacksFromMaster(MesosMasterStateObject state, boolean i
LOG.info("Found {} new racks ({} missing) and {} new slaves ({} missing)", racks, remainingActiveRacks.size(), slaves, activeSlavesById.size());
}

private enum CheckResult {
NEW, DECOMMISSIONING, ALREADY_ACTIVE;
public enum CheckResult {
NEW, NOT_ACCEPTING_TASKS, ALREADY_ACTIVE;
}

private <T extends SingularityMachineAbstraction<T>> CheckResult check(T object, AbstractMachineManager<T> manager) {
Expand All @@ -388,31 +388,33 @@ private <T extends SingularityMachineAbstraction<T>> CheckResult check(T object,

switch (currentState) {
case ACTIVE:
case FROZEN:
return CheckResult.ALREADY_ACTIVE;
case DEAD:
case MISSING_ON_STARTUP:
manager.changeState(object.getId(), MachineState.ACTIVE, Optional.<String> absent(), Optional.<String> absent());
return CheckResult.NEW;
case FROZEN:
case DECOMMISSIONED:
case DECOMMISSIONING:
case STARTING_DECOMMISSION:
return CheckResult.DECOMMISSIONING;
return CheckResult.NOT_ACCEPTING_TASKS;
}

throw new IllegalStateException(String.format("Invalid state %s for %s", currentState, object.getId()));
}

@Timed
public void checkOffer(Offer offer) {
public CheckResult checkOffer(Offer offer) {
final String slaveId = offer.getSlaveId().getValue();
final String rackId = slaveAndRackHelper.getRackIdOrDefault(offer);
final String host = slaveAndRackHelper.getMaybeTruncatedHost(offer);
final Map<String, String> textAttributes = slaveAndRackHelper.getTextAttributes(offer);

final SingularitySlave slave = new SingularitySlave(slaveId, host, rackId, textAttributes, Optional.<MesosResourcesObject>absent());

if (check(slave, slaveManager) == CheckResult.NEW) {
CheckResult result = check(slave, slaveManager);

if (result == CheckResult.NEW) {
if (inactiveSlaveManager.isInactive(slave.getHost())) {
LOG.info("Slave {} on inactive host {} attempted to rejoin. Marking as decommissioned.", slave, host);
slaveManager.changeState(slave, MachineState.STARTING_DECOMMISSION,
Expand All @@ -428,6 +430,8 @@ public void checkOffer(Offer offer) {
if (check(rack, rackManager) == CheckResult.NEW) {
LOG.info("Offer revealed a new rack {}", rack);
}

return result;
}

@Timed
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ public class SingularitySchedulerPoller extends SingularityLeaderOnlyPoller {
private final SchedulerDriverSupplier schedulerDriverSupplier;
private final SingularityMesosOfferScheduler offerScheduler;
private final DisasterManager disasterManager;
private final SingularityConfiguration configuration;

@Inject
SingularitySchedulerPoller(SingularityMesosOfferScheduler offerScheduler, OfferCache offerCache, SchedulerDriverSupplier schedulerDriverSupplier,
Expand All @@ -47,6 +48,7 @@ public class SingularitySchedulerPoller extends SingularityLeaderOnlyPoller {
this.offerScheduler = offerScheduler;
this.schedulerDriverSupplier = schedulerDriverSupplier;
this.disasterManager = disasterManager;
this.configuration = configuration;
}

@Override
Expand Down Expand Up @@ -98,4 +100,8 @@ public void runActionOnPoll() {
LOG.info("Launched {} tasks on {} cached offers (returned {}) in {}", launchedTasks, acceptedOffers, offerHolders.size() - acceptedOffers, JavaUtils.duration(start));
}

/**
 * Reports whether this poller is enabled.
 *
 * @return the value of {@code configuration.isCacheOffers()}
 */
@Override
protected boolean isEnabled() {
final boolean cacheOffersConfigured = configuration.isCacheOffers();
return cacheOffersConfigured;
}
}