Leader cache everywhere #1594

Merged (6 commits, Jul 25, 2017)

Changes from all commits:
File: AbstractMachineManager

```diff
@@ -63,6 +63,10 @@ public int getNumObjectsAtState(MachineState state) {
     return getObjectsFiltered(state).size();
   }
 
+  public int getNumActive() {
+    return getNumObjectsAtState(MachineState.ACTIVE);
+  }
+
   public Map<String, T> getObjectsByIdForState(MachineState state) {
     List<T> filteredObjects = getObjectsFiltered(state);
 
@@ -207,10 +211,10 @@ public SingularityDeleteResult deleteObject(String objectId) {
     return delete(getObjectPath(objectId));
   }
 
-  public SingularityCreateResult saveObject(T object) {
+  public void saveObject(T object) {
     saveHistoryUpdate(object.getCurrentState());
 
-    return save(getObjectPath(object.getId()), object, transcoder);
+    save(getObjectPath(object.getId()), object, transcoder);
   }
 
   private String getExpiringPath(String machineId) {
```
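The managers below read and write through a `SingularityLeaderCache`, whose implementation is not part of this diff. A minimal sketch of the surface they rely on: the class and method names come from the calls in this PR, but the storage, seeding, and lifecycle details here are assumptions, and only the rack methods are shown (the slave and task-ID methods would be analogous).

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import com.google.common.base.Optional;
import com.hubspot.singularity.SingularityRack;

// Hypothetical sketch, not the real SingularityLeaderCache.
public class SingularityLeaderCache {

  private final Map<String, SingularityRack> racks = new ConcurrentHashMap<>();
  private volatile boolean active = false;

  // Called on leader election (see activateLeaderCache() below),
  // seeding the cache with a full read of ZooKeeper state.
  public void cacheRacks(Collection<SingularityRack> allRacks) {
    for (SingularityRack rack : allRacks) {
      racks.put(rack.getId(), rack);
    }
    active = true; // readers only trust the cache once it is fully seeded
  }

  public boolean active() {
    return active;
  }

  public Optional<SingularityRack> getRack(String rackName) {
    return Optional.fromNullable(racks.get(rackName));
  }

  public Collection<SingularityRack> getRacks() {
    return new ArrayList<>(racks.values());
  }

  public void putRack(SingularityRack rack) {
    racks.put(rack.getId(), rack);
  }
}
```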
File: RackManager

```diff
@@ -3,28 +3,69 @@
 import org.apache.curator.framework.CuratorFramework;
 
 import com.codahale.metrics.MetricRegistry;
+import com.google.common.base.Optional;
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
+import com.hubspot.singularity.MachineState;
 import com.hubspot.singularity.SingularityMachineStateHistoryUpdate;
 import com.hubspot.singularity.SingularityRack;
 import com.hubspot.singularity.config.SingularityConfiguration;
 import com.hubspot.singularity.data.transcoders.Transcoder;
 import com.hubspot.singularity.expiring.SingularityExpiringMachineState;
+import com.hubspot.singularity.scheduler.SingularityLeaderCache;
 
 @Singleton
 public class RackManager extends AbstractMachineManager<SingularityRack> {
 
   private static final String RACK_ROOT = "/racks";
+  private final SingularityLeaderCache leaderCache;
 
   @Inject
-  public RackManager(CuratorFramework curator, SingularityConfiguration configuration, MetricRegistry metricRegistry,Transcoder<SingularityRack> rackTranscoder,
-                     Transcoder<SingularityMachineStateHistoryUpdate> stateHistoryTranscoder, Transcoder<SingularityExpiringMachineState> expiringMachineStateTranscoder) {
+  public RackManager(CuratorFramework curator,
+                     SingularityConfiguration configuration,
+                     MetricRegistry metricRegistry,
+                     Transcoder<SingularityRack> rackTranscoder,
+                     Transcoder<SingularityMachineStateHistoryUpdate> stateHistoryTranscoder,
+                     Transcoder<SingularityExpiringMachineState> expiringMachineStateTranscoder,
+                     SingularityLeaderCache leaderCache) {
     super(curator, configuration, metricRegistry, rackTranscoder, stateHistoryTranscoder, expiringMachineStateTranscoder);
 
+    this.leaderCache = leaderCache;
   }
 
   @Override
   protected String getRoot() {
     return RACK_ROOT;
   }
 
+  public void activateLeaderCache() {
+    leaderCache.cacheRacks(getObjects());
+  }
+
+  public Optional<SingularityRack> getRack(String rackName) {
+    if (leaderCache.active()) {
+      return leaderCache.getRack(rackName);
+    }
+
+    return getObject(rackName);
+  }
+
+  @Override
+  public int getNumActive() {
+    if (leaderCache.active()) {
+      return Math.toIntExact(leaderCache.getRacks().stream().filter(x -> x.getCurrentState().getState().equals(MachineState.ACTIVE)).count());
+    }
+
+    return super.getNumActive();
+  }
+
+  @Override
+  public void saveObject(SingularityRack rack) {
+    if (leaderCache.active()) {
+      leaderCache.putRack(rack);
+    }
+
+    super.saveObject(rack);
+  }
+
 }
```
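A hypothetical caller, to make the read-through behavior concrete (the rack ID is illustrative, not from this PR):

```java
// Leader: an in-memory map lookup, no ZooKeeper round trip.
// Non-leader (cache inactive): falls through to getObject(), i.e. a ZK read.
Optional<SingularityRack> maybeRack = rackManager.getRack("rack-1a");
if (maybeRack.isPresent()) {
  System.out.println(maybeRack.get().getCurrentState().getState());
}
```

Note the write path: `saveObject` updates the cache before the ZooKeeper write, so if the ZK write fails the cache can briefly run ahead of the persisted state until it is reseeded.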
File: SlaveManager

```diff
@@ -3,28 +3,70 @@
 import org.apache.curator.framework.CuratorFramework;
 
 import com.codahale.metrics.MetricRegistry;
+import com.google.common.base.Optional;
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
+import com.hubspot.singularity.MachineState;
 import com.hubspot.singularity.SingularityMachineStateHistoryUpdate;
 import com.hubspot.singularity.SingularitySlave;
 import com.hubspot.singularity.config.SingularityConfiguration;
 import com.hubspot.singularity.data.transcoders.Transcoder;
 import com.hubspot.singularity.expiring.SingularityExpiringMachineState;
+import com.hubspot.singularity.scheduler.SingularityLeaderCache;
 
 @Singleton
 public class SlaveManager extends AbstractMachineManager<SingularitySlave> {
 
   private static final String SLAVE_ROOT = "/slaves";
+  private final SingularityLeaderCache leaderCache;
 
   @Inject
-  public SlaveManager(CuratorFramework curator, SingularityConfiguration configuration, MetricRegistry metricRegistry, Transcoder<SingularitySlave> slaveTranscoder,
-                      Transcoder<SingularityMachineStateHistoryUpdate> stateHistoryTranscoder, Transcoder<SingularityExpiringMachineState> expiringMachineStateTranscoder) {
+  public SlaveManager(CuratorFramework curator,
+                      SingularityConfiguration configuration,
+                      MetricRegistry metricRegistry,
+                      Transcoder<SingularitySlave> slaveTranscoder,
+                      Transcoder<SingularityMachineStateHistoryUpdate> stateHistoryTranscoder,
+                      Transcoder<SingularityExpiringMachineState> expiringMachineStateTranscoder,
+                      SingularityLeaderCache leaderCache) {
     super(curator, configuration, metricRegistry, slaveTranscoder, stateHistoryTranscoder, expiringMachineStateTranscoder);
+    this.leaderCache = leaderCache;
   }
 
   @Override
   protected String getRoot() {
     return SLAVE_ROOT;
   }
 
+  public void activateLeaderCache() {
+    leaderCache.cacheSlaves(getObjects());
+  }
+
+  public Optional<SingularitySlave> getSlave(String slaveId) {
+    if (leaderCache.active()) {
+      return leaderCache.getSlave(slaveId);
+    }
+
+    return getObject(slaveId);
+  }
+
+  @Override
+  public int getNumActive() {
+    if (leaderCache.active()) {
+      return Math.toIntExact(leaderCache.getSlaves().stream().filter(x -> x.getCurrentState().getState().equals(MachineState.ACTIVE)).count());
+    }
+
+    return super.getNumActive();
+  }
+
+  @Override
+  public void saveObject(SingularitySlave slave) {
+    if (leaderCache.active()) {
+      leaderCache.putSlave(slave);
+    }
+
+    super.saveObject(slave);
+  }
+
 }
```
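Both managers count active machines with the same stream idiom; the `Math.toIntExact` wrapper is there because `Stream.count()` returns a `long`. A standalone illustration, where `slaves` stands in for `leaderCache.getSlaves()`:

```java
// toIntExact throws ArithmeticException on overflow instead of silently
// truncating — a safe narrowing, since a machine count always fits an int.
long activeCount = slaves.stream()
    .filter(s -> s.getCurrentState().getState().equals(MachineState.ACTIVE))
    .count();
int numActive = Math.toIntExact(activeCount);
```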
File: TaskManager

```diff
@@ -888,6 +888,7 @@ public Map<SingularityTaskId, SingularityTask> getTasks(Iterable<SingularityTask
   }
 
   private void createTaskAndDeletePendingTaskPrivate(SingularityTask task) throws Exception {
+    // TODO: Should more of the below be done within a transaction?
    deletePendingTask(task.getTaskRequest().getPendingTask().getPendingTaskId());
 
    final long now = System.currentTimeMillis();
```
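The new TODO asks whether the pending-task delete and the task create should happen atomically. Curator does expose ZooKeeper multi-ops, so a rough sketch of what that could look like; the variables and paths here are placeholders, since TaskManager builds the real paths internally:

```java
// Hypothetical sketch using Curator's transaction API (Curator 2.x style).
// Both operations are sent as one ZooKeeper multi-op: either both apply
// or neither does.
curator.inTransaction()
    .delete().forPath("/pending/" + pendingTaskId)     // placeholder path
    .and()
    .create().forPath("/active/" + taskId, taskBytes)  // placeholder path
    .and()
    .commit();
```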
File: SingularityMesosOfferScheduler

```diff
@@ -19,7 +19,6 @@
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Optional;
 import com.google.inject.Inject;
-import com.google.inject.Provider;
 import com.hubspot.mesos.MesosUtils;
 import com.hubspot.mesos.Resources;
 import com.hubspot.singularity.RequestType;
@@ -37,8 +36,8 @@
 import com.hubspot.singularity.data.DisasterManager;
 import com.hubspot.singularity.data.TaskManager;
 import com.hubspot.singularity.data.UsageManager;
+import com.hubspot.singularity.scheduler.SingularityLeaderCache;
 import com.hubspot.singularity.scheduler.SingularityScheduler;
-import com.hubspot.singularity.scheduler.SingularitySchedulerStateCache;
 
 @Singleton
 public class SingularityMesosOfferScheduler {
@@ -60,7 +59,7 @@ public class SingularityMesosOfferScheduler {
   private final DeployManager deployManager;
 
 
-  private final Provider<SingularitySchedulerStateCache> stateCacheProvider;
+  private final SingularityLeaderCache leaderCache;
 
   @Inject
   public SingularityMesosOfferScheduler(MesosConfiguration mesosConfiguration,
@@ -73,7 +72,7 @@ public SingularityMesosOfferScheduler(MesosConfiguration mesosConfiguration,
                                         SingularitySlaveAndRackManager slaveAndRackManager,
                                         SingularityTaskSizeOptimizer taskSizeOptimizer,
                                         SingularitySlaveAndRackHelper slaveAndRackHelper,
-                                        Provider<SingularitySchedulerStateCache> stateCacheProvider,
+                                        SingularityLeaderCache leaderCache,
                                         DisasterManager disasterManager,
                                         UsageManager usageManager,
                                         DeployManager deployManager) {
@@ -85,7 +84,7 @@ public SingularityMesosOfferScheduler(MesosConfiguration mesosConfiguration,
     this.mesosTaskBuilder = mesosTaskBuilder;
     this.slaveAndRackManager = slaveAndRackManager;
     this.taskSizeOptimizer = taskSizeOptimizer;
-    this.stateCacheProvider = stateCacheProvider;
+    this.leaderCache = leaderCache;
     this.slaveAndRackHelper = slaveAndRackHelper;
     this.disasterManager = disasterManager;
     this.taskPrioritizer = taskPrioritizer;
@@ -96,10 +95,9 @@ public SingularityMesosOfferScheduler(MesosConfiguration mesosConfiguration,
   public List<SingularityOfferHolder> checkOffers(final Collection<Protos.Offer> offers) {
     boolean useTaskCredits = disasterManager.isTaskCreditEnabled();
     int taskCredits = useTaskCredits ? disasterManager.getUpdatedCreditCount() : -1;
-    final SingularitySchedulerStateCache stateCache = stateCacheProvider.get();
 
-    scheduler.checkForDecomissions(stateCache);
-    scheduler.drainPendingQueue(stateCache);
+    scheduler.checkForDecomissions();
+    scheduler.drainPendingQueue();
 
     final Map<String, SingularityTaskRequestHolder> pendingTaskIdToTaskRequest = getDueTaskRequestHolders();
 
@@ -151,7 +149,7 @@ public List<SingularityOfferHolder> checkOffers(final Collection<Protos.Offer> o
         }
 
         Optional<SingularitySlaveUsageWithId> maybeSlaveUsage = getSlaveUsage(currentSlaveUsages, offerHolder.getSlaveId());
-        double score = score(offerHolder, stateCache, tasksPerOfferPerRequest, taskRequestHolder, maybeSlaveUsage);
+        double score = score(offerHolder, tasksPerOfferPerRequest, taskRequestHolder, maybeSlaveUsage);
         LOG.trace("Scored {} | Task {} | Offer - mem {} - cpu {} | Slave {} | maybeSlaveUsage - {}", score, taskRequestHolder.getTaskRequest().getPendingTask().getPendingTaskId().getId(),
             MesosUtils.getMemory(offerHolder.getCurrentResources(), Optional.absent()), MesosUtils.getNumCpus(offerHolder.getCurrentResources(), Optional.absent()), offerHolder.getHostname(), maybeSlaveUsage);
 
@@ -164,7 +162,7 @@ public List<SingularityOfferHolder> checkOffers(final Collection<Protos.Offer> o
       SingularityOfferHolder bestOffer = Collections.max(scorePerOffer.entrySet(), Map.Entry.comparingByValue()).getKey();
       LOG.info("Best offer {}/1 is on {}", scorePerOffer.get(bestOffer), bestOffer.getSanitizedHost());
 
-      SingularityTask task = acceptTask(bestOffer, stateCache, tasksPerOfferPerRequest, taskRequestHolder);
+      SingularityTask task = acceptTask(bestOffer, tasksPerOfferPerRequest, taskRequestHolder);
 
       tasksScheduled++;
      if (useTaskCredits) {
@@ -240,7 +238,7 @@ private Optional<SingularitySlaveUsageWithId> getSlaveUsage(List<SingularitySlav
     return filteredSlaveUsages.size() == 1 ? Optional.of(filteredSlaveUsages.get(0)) : Optional.absent();
   }
 
-  private double score(SingularityOfferHolder offerHolder, SingularitySchedulerStateCache stateCache, Map<String, Map<String, Integer>> tasksPerOfferHostPerRequest,
+  private double score(SingularityOfferHolder offerHolder, Map<String, Map<String, Integer>> tasksPerOfferHostPerRequest,
                        SingularityTaskRequestHolder taskRequestHolder, Optional<SingularitySlaveUsageWithId> maybeSlaveUsage) {
 
     final SingularityTaskRequest taskRequest = taskRequestHolder.getTaskRequest();
@@ -255,7 +253,7 @@ private double score(SingularityOfferHolder offerHolder, SingularitySchedulerSta
       return 0;
     }
 
-    if (isTooManyInstancesForRequest(taskRequest, stateCache)) {
+    if (isTooManyInstancesForRequest(taskRequest)) {
       LOG.debug("Skipping pending task {}, too many instances already running", pendingTaskId);
       return 0;
     }
@@ -268,7 +266,7 @@ private double score(SingularityOfferHolder offerHolder, SingularitySchedulerSta
 
     final boolean matchesResources = MesosUtils.doesOfferMatchResources(taskRequest.getRequest().getRequiredRole(),
         taskRequestHolder.getTotalResources(), offerHolder.getCurrentResources(), taskRequestHolder.getRequestedPorts());
-    final SlaveMatchState slaveMatchState = slaveAndRackManager.doesOfferMatch(offerHolder, taskRequest, stateCache);
+    final SlaveMatchState slaveMatchState = slaveAndRackManager.doesOfferMatch(offerHolder, taskRequest);
 
     if (matchesResources && slaveMatchState.isMatchAllowed()) {
       return score(offerHolder.getHostname(), taskRequest, maybeSlaveUsage);
@@ -354,7 +352,7 @@ private double calculateScore(double longRunningMemUsedScore, double memFreeScor
     return score;
   }
 
-  private SingularityTask acceptTask(SingularityOfferHolder offerHolder, SingularitySchedulerStateCache stateCache, Map<String, Map<String, Integer>> tasksPerOfferPerRequest, SingularityTaskRequestHolder taskRequestHolder) {
+  private SingularityTask acceptTask(SingularityOfferHolder offerHolder, Map<String, Map<String, Integer>> tasksPerOfferPerRequest, SingularityTaskRequestHolder taskRequestHolder) {
     final SingularityTaskRequest taskRequest = taskRequestHolder.getTaskRequest();
     final SingularityTask task = mesosTaskBuilder.buildTask(offerHolder, offerHolder.getCurrentResources(), taskRequest, taskRequestHolder.getTaskResources(), taskRequestHolder.getExecutorResources());
 
@@ -368,10 +366,7 @@ private SingularityTask acceptTask(SingularityOfferHolder offerHolder, Singulari
 
     taskManager.createTaskAndDeletePendingTask(zkTask);
 
-    stateCache.getActiveTaskIds().add(task.getTaskId());
-    stateCache.getActiveTaskIdsForRequest(task.getTaskRequest().getRequest().getId()).add(task.getTaskId());
     addRequestToMapByOfferHost(tasksPerOfferPerRequest, offerHolder.getHostname(), taskRequest.getRequest().getId());
-    stateCache.getScheduledTasks().remove(taskRequest.getPendingTask());
 
     return task;
   }
@@ -402,11 +397,11 @@ private boolean tooManyTasksPerOfferHostForRequest(Map<String, Map<String, Integ
     return maxPerOfferPerRequest > 0 && tasksPerOfferHostPerRequest.get(hostname).get(taskRequest.getRequest().getId()) > maxPerOfferPerRequest;
   }
 
-  private boolean isTooManyInstancesForRequest(SingularityTaskRequest taskRequest, SingularitySchedulerStateCache stateCache) {
+  private boolean isTooManyInstancesForRequest(SingularityTaskRequest taskRequest) {
     if (taskRequest.getRequest().getRequestType() == RequestType.ON_DEMAND) {
       int maxActiveOnDemandTasks = taskRequest.getRequest().getInstances().or(configuration.getMaxActiveOnDemandTasksPerRequest());
       if (maxActiveOnDemandTasks > 0) {
-        int activeTasksForRequest = stateCache.getActiveTaskIdsForRequest(taskRequest.getRequest().getId()).size();
+        int activeTasksForRequest = leaderCache.getActiveTaskIdsForRequest(taskRequest.getRequest().getId()).size();
         LOG.debug("Running {} instances for request {}. Max is {}", activeTasksForRequest, taskRequest.getRequest().getId(), maxActiveOnDemandTasks);
         if (activeTasksForRequest >= maxActiveOnDemandTasks) {
           return true;
```
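The pattern across this file: every `stateCache` parameter disappears, because callers no longer thread a per-pass snapshot through the call graph; the scheduler reads the injected singleton directly. A hypothetical Guice fragment to contrast the two lifecycles (the module name and binding details are illustrative, not from this PR):

```java
import com.google.inject.AbstractModule;
import com.google.inject.Scopes;

// Hypothetical module, for illustration only.
public class SchedulerCacheModule extends AbstractModule {
  @Override
  protected void configure() {
    // Before: injection sites took a Provider<SingularitySchedulerStateCache>
    // and called .get() at the top of each scheduling pass, rebuilding a
    // snapshot of ZK state every time and passing it down the call graph.
    bind(SingularitySchedulerStateCache.class);

    // After: one long-lived cache, seeded on leader election via
    // activateLeaderCache() and kept current by write-through saves.
    bind(SingularityLeaderCache.class).in(Scopes.SINGLETON);
  }
}
```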
File: SingularityMesosStatusUpdateHandler

```diff
@@ -15,7 +15,6 @@
 import com.google.common.base.Strings;
 import com.google.common.collect.Multiset;
 import com.google.inject.Inject;
-import com.google.inject.Provider;
 import com.google.inject.Singleton;
 import com.google.inject.name.Named;
 import com.hubspot.mesos.JavaUtils;
@@ -36,9 +35,9 @@
 import com.hubspot.singularity.data.transcoders.IdTranscoder;
 import com.hubspot.singularity.data.transcoders.SingularityTranscoderException;
 import com.hubspot.singularity.scheduler.SingularityHealthchecker;
+import com.hubspot.singularity.scheduler.SingularityLeaderCache;
 import com.hubspot.singularity.scheduler.SingularityNewTaskChecker;
 import com.hubspot.singularity.scheduler.SingularityScheduler;
-import com.hubspot.singularity.scheduler.SingularitySchedulerStateCache;
 import com.hubspot.singularity.sentry.SingularityExceptionNotifier;
 
 @Singleton
@@ -55,7 +54,7 @@ public class SingularityMesosStatusUpdateHandler {
   private final SingularitySlaveAndRackManager slaveAndRackManager;
   private final SingularityMesosExecutorInfoSupport logSupport;
   private final SingularityScheduler scheduler;
-  private final Provider<SingularitySchedulerStateCache> stateCacheProvider;
+  private final SingularityLeaderCache leaderCache;
   private final String serverId;
   private final SchedulerDriverSupplier schedulerDriverSupplier;
   private final SingularitySchedulerLock schedulerLock;
@@ -68,7 +67,7 @@ public class SingularityMesosStatusUpdateHandler {
   public SingularityMesosStatusUpdateHandler(TaskManager taskManager, DeployManager deployManager, RequestManager requestManager,
       IdTranscoder<SingularityTaskId> taskIdTranscoder, SingularityExceptionNotifier exceptionNotifier, SingularityHealthchecker healthchecker,
       SingularityNewTaskChecker newTaskChecker, SingularitySlaveAndRackManager slaveAndRackManager, SingularityMesosExecutorInfoSupport logSupport, SingularityScheduler scheduler,
-      Provider<SingularitySchedulerStateCache> stateCacheProvider, @Named(SingularityMainModule.SERVER_ID_PROPERTY) String serverId,
+      SingularityLeaderCache leaderCache, @Named(SingularityMainModule.SERVER_ID_PROPERTY) String serverId,
       SchedulerDriverSupplier schedulerDriverSupplier,
       SingularitySchedulerLock schedulerLock,
       SingularityConfiguration configuration,
@@ -85,7 +84,7 @@ public SingularityMesosStatusUpdateHandler(TaskManager taskManager, DeployManage
     this.slaveAndRackManager = slaveAndRackManager;
     this.logSupport = logSupport;
     this.scheduler = scheduler;
-    this.stateCacheProvider = stateCacheProvider;
+    this.leaderCache = leaderCache;
     this.serverId = serverId;
     this.schedulerDriverSupplier = schedulerDriverSupplier;
     this.schedulerLock = schedulerLock;
@@ -243,11 +242,9 @@ private void unsafeProcessStatusUpdate(Protos.TaskStatus status) {
 
       taskManager.deleteKilledRecord(taskIdObj);
 
-      SingularitySchedulerStateCache stateCache = stateCacheProvider.get();
-
-      slaveAndRackManager.checkStateAfterFinishedTask(taskIdObj, status.getSlaveId().getValue(), stateCache);
+      slaveAndRackManager.checkStateAfterFinishedTask(taskIdObj, status.getSlaveId().getValue(), leaderCache);
 
-      scheduler.handleCompletedTask(task, taskIdObj, isActiveTask, timestamp, taskState, taskHistoryUpdateCreateResult, stateCache, status);
+      scheduler.handleCompletedTask(task, taskIdObj, isActiveTask, timestamp, taskState, taskHistoryUpdateCreateResult, status);
     }
 
     saveNewTaskStatusHolder(taskIdObj, newTaskStatusHolder, taskState);
```