Skip to content

Commit

Permalink
store stats in ZK, expose in separate endpoint
Browse files Browse the repository at this point in the history
  • Loading branch information
tpetr committed Aug 29, 2016
1 parent 8aa4f5c commit f78c4a3
Show file tree
Hide file tree
Showing 5 changed files with 32 additions and 26 deletions.
Expand Up @@ -50,8 +50,6 @@ public class SingularityState {


private final Optional<Boolean> authDatastoreHealthy; private final Optional<Boolean> authDatastoreHealthy;


private final Optional<SingularityTaskReconciliationStatistics> lastTaskReconciliationStatistics;

@JsonCreator @JsonCreator
public SingularityState(@JsonProperty("activeTasks") int activeTasks, @JsonProperty("activeRequests") int activeRequests, @JsonProperty("cooldownRequests") int cooldownRequests, public SingularityState(@JsonProperty("activeTasks") int activeTasks, @JsonProperty("activeRequests") int activeRequests, @JsonProperty("cooldownRequests") int cooldownRequests,
@JsonProperty("pausedRequests") int pausedRequests, @JsonProperty("scheduledTasks") int scheduledTasks, @JsonProperty("pendingRequests") int pendingRequests, @JsonProperty("lbCleanupTasks") int lbCleanupTasks, @JsonProperty("pausedRequests") int pausedRequests, @JsonProperty("scheduledTasks") int scheduledTasks, @JsonProperty("pendingRequests") int pendingRequests, @JsonProperty("lbCleanupTasks") int lbCleanupTasks,
Expand All @@ -61,8 +59,7 @@ public SingularityState(@JsonProperty("activeTasks") int activeTasks, @JsonPrope
@JsonProperty("lateTasks") int lateTasks, @JsonProperty("futureTasks") int futureTasks, @JsonProperty("maxTaskLag") long maxTaskLag, @JsonProperty("generatedAt") long generatedAt, @JsonProperty("lateTasks") int lateTasks, @JsonProperty("futureTasks") int futureTasks, @JsonProperty("maxTaskLag") long maxTaskLag, @JsonProperty("generatedAt") long generatedAt,
@JsonProperty("overProvisionedRequestIds") List<String> overProvisionedRequestIds, @JsonProperty("underProvisionedRequestIds") List<String> underProvisionedRequestIds, @JsonProperty("overProvisionedRequestIds") List<String> overProvisionedRequestIds, @JsonProperty("underProvisionedRequestIds") List<String> underProvisionedRequestIds,
@JsonProperty("overProvisionedRequests") int overProvisionedRequests, @JsonProperty("underProvisionedRequests") int underProvisionedRequests, @JsonProperty("finishedRequests") int finishedRequests, @JsonProperty("overProvisionedRequests") int overProvisionedRequests, @JsonProperty("underProvisionedRequests") int underProvisionedRequests, @JsonProperty("finishedRequests") int finishedRequests,
@JsonProperty("unknownRacks") int unknownRacks, @JsonProperty("unknownSlaves") int unknownSlaves, @JsonProperty("authDatastoreHealthy") Optional<Boolean> authDatastoreHealthy, @JsonProperty("unknownRacks") int unknownRacks, @JsonProperty("unknownSlaves") int unknownSlaves, @JsonProperty("authDatastoreHealthy") Optional<Boolean> authDatastoreHealthy) {
@JsonProperty("lastTaskReconciliationStatistics") Optional<SingularityTaskReconciliationStatistics> lastTaskReconciliationStatistics) {
this.activeTasks = activeTasks; this.activeTasks = activeTasks;
this.activeRequests = activeRequests; this.activeRequests = activeRequests;
this.pausedRequests = pausedRequests; this.pausedRequests = pausedRequests;
Expand Down Expand Up @@ -94,7 +91,6 @@ public SingularityState(@JsonProperty("activeTasks") int activeTasks, @JsonPrope
this.overProvisionedRequestIds = overProvisionedRequestIds; this.overProvisionedRequestIds = overProvisionedRequestIds;
this.underProvisionedRequestIds = underProvisionedRequestIds; this.underProvisionedRequestIds = underProvisionedRequestIds;
this.authDatastoreHealthy = authDatastoreHealthy; this.authDatastoreHealthy = authDatastoreHealthy;
this.lastTaskReconciliationStatistics = lastTaskReconciliationStatistics;
} }


public int getFinishedRequests() { public int getFinishedRequests() {
Expand Down Expand Up @@ -235,10 +231,6 @@ public Optional<Boolean> getAuthDatastoreHealthy() {
return authDatastoreHealthy; return authDatastoreHealthy;
} }


public Optional<SingularityTaskReconciliationStatistics> getLastTaskReconciliationStatistics() {
return lastTaskReconciliationStatistics;
}

@Override @Override
public String toString() { public String toString() {
return "SingularityState [activeTasks=" + activeTasks + ", pausedRequests=" + pausedRequests + ", activeRequests=" + activeRequests + ", cooldownRequests=" + cooldownRequests + ", scheduledTasks=" + scheduledTasks return "SingularityState [activeTasks=" + activeTasks + ", pausedRequests=" + pausedRequests + ", activeRequests=" + activeRequests + ", cooldownRequests=" + cooldownRequests + ", scheduledTasks=" + scheduledTasks
Expand All @@ -247,7 +239,7 @@ public String toString() {
+ activeSlaves + ", deadSlaves=" + deadSlaves + ", decommissioningSlaves=" + decommissioningSlaves + ", unknownSlaves=" + unknownSlaves + ", activeRacks=" + activeRacks + ", deadRacks=" + activeSlaves + ", deadSlaves=" + deadSlaves + ", decommissioningSlaves=" + decommissioningSlaves + ", unknownSlaves=" + unknownSlaves + ", activeRacks=" + activeRacks + ", deadRacks="
+ deadRacks + ", decommissioningRacks=" + decommissioningRacks + ", unknownRacks=" + unknownRacks + ", oldestDeploy=" + oldestDeploy + ", numDeploys=" + numDeploys + ", generatedAt=" + deadRacks + ", decommissioningRacks=" + decommissioningRacks + ", unknownRacks=" + unknownRacks + ", oldestDeploy=" + oldestDeploy + ", numDeploys=" + numDeploys + ", generatedAt="
+ generatedAt + ", hostStates=" + hostStates + ", overProvisionedRequestIds=" + overProvisionedRequestIds + ", underProvisionedRequestIds=" + underProvisionedRequestIds + generatedAt + ", hostStates=" + hostStates + ", overProvisionedRequestIds=" + overProvisionedRequestIds + ", underProvisionedRequestIds=" + underProvisionedRequestIds
+ ", overProvisionedRequests=" + overProvisionedRequests + ", underProvisionedRequests=" + underProvisionedRequests + ", authDatastoreHealthy=" + authDatastoreHealthy + ", lastTaskReconciliationStatistics=" + lastTaskReconciliationStatistics + "]"; + ", overProvisionedRequests=" + overProvisionedRequests + ", underProvisionedRequests=" + underProvisionedRequests + ", authDatastoreHealthy=" + authDatastoreHealthy + "]";
} }


} }
Expand Up @@ -21,6 +21,7 @@
import com.google.inject.Inject; import com.google.inject.Inject;
import com.hubspot.mesos.CounterMap; import com.hubspot.mesos.CounterMap;
import com.hubspot.mesos.JavaUtils; import com.hubspot.mesos.JavaUtils;
import com.hubspot.singularity.SingularityCreateResult;
import com.hubspot.singularity.SingularityHostState; import com.hubspot.singularity.SingularityHostState;
import com.hubspot.singularity.SingularityPendingDeploy; import com.hubspot.singularity.SingularityPendingDeploy;
import com.hubspot.singularity.SingularityPendingTaskId; import com.hubspot.singularity.SingularityPendingTaskId;
Expand All @@ -44,6 +45,7 @@ public class StateManager extends CuratorManager {


private static final String ROOT_PATH = "/hosts"; private static final String ROOT_PATH = "/hosts";
private static final String STATE_PATH = "/STATE"; private static final String STATE_PATH = "/STATE";
private static final String TASK_RECONCILIATION_STATISTICS_PATH = STATE_PATH + "/taskReconciliation";


private final RequestManager requestManager; private final RequestManager requestManager;
private final TaskManager taskManager; private final TaskManager taskManager;
Expand All @@ -54,12 +56,12 @@ public class StateManager extends CuratorManager {
private final Transcoder<SingularityHostState> hostStateTranscoder; private final Transcoder<SingularityHostState> hostStateTranscoder;
private final SingularityConfiguration singularityConfiguration; private final SingularityConfiguration singularityConfiguration;
private final SingularityAuthDatastore authDatastore; private final SingularityAuthDatastore authDatastore;
private final SingularityTaskReconciliation taskReconciliation; private final Transcoder<SingularityTaskReconciliationStatistics> taskReconciliationStatisticsTranscoder;


@Inject @Inject
public StateManager(CuratorFramework curatorFramework, SingularityConfiguration configuration, MetricRegistry metricRegistry, RequestManager requestManager, TaskManager taskManager, public StateManager(CuratorFramework curatorFramework, SingularityConfiguration configuration, MetricRegistry metricRegistry, RequestManager requestManager, TaskManager taskManager,
DeployManager deployManager, SlaveManager slaveManager, RackManager rackManager, Transcoder<SingularityState> stateTranscoder, Transcoder<SingularityHostState> hostStateTranscoder, DeployManager deployManager, SlaveManager slaveManager, RackManager rackManager, Transcoder<SingularityState> stateTranscoder, Transcoder<SingularityHostState> hostStateTranscoder,
SingularityConfiguration singularityConfiguration, SingularityAuthDatastore authDatastore, SingularityTaskReconciliation taskReconciliation) { SingularityConfiguration singularityConfiguration, SingularityAuthDatastore authDatastore, Transcoder<SingularityTaskReconciliationStatistics> taskReconciliationStatisticsTranscoder) {
super(curatorFramework, configuration, metricRegistry); super(curatorFramework, configuration, metricRegistry);


this.requestManager = requestManager; this.requestManager = requestManager;
Expand All @@ -71,7 +73,15 @@ public StateManager(CuratorFramework curatorFramework, SingularityConfiguration
this.deployManager = deployManager; this.deployManager = deployManager;
this.singularityConfiguration = singularityConfiguration; this.singularityConfiguration = singularityConfiguration;
this.authDatastore = authDatastore; this.authDatastore = authDatastore;
this.taskReconciliation = taskReconciliation; this.taskReconciliationStatisticsTranscoder = taskReconciliationStatisticsTranscoder;
}

public SingularityCreateResult saveTaskReconciliationStatistics(SingularityTaskReconciliationStatistics taskReconciliationStatistics) {
return save(TASK_RECONCILIATION_STATISTICS_PATH, taskReconciliationStatistics, taskReconciliationStatisticsTranscoder);
}

public Optional<SingularityTaskReconciliationStatistics> getTaskReconciliationStatistics() {
return getData(TASK_RECONCILIATION_STATISTICS_PATH, taskReconciliationStatisticsTranscoder);
} }


public void save(SingularityHostState hostState) throws InterruptedException { public void save(SingularityHostState hostState) throws InterruptedException {
Expand Down Expand Up @@ -295,11 +305,9 @@ public SingularityState generateState(boolean includeRequestIds) {


final Optional<Boolean> authDatastoreHealthy = authDatastore.isHealthy(); final Optional<Boolean> authDatastoreHealthy = authDatastore.isHealthy();


final Optional<SingularityTaskReconciliationStatistics> lastTaskReconciliationStatistics = taskReconciliation.getLastTaskReconciliationStatistics();

return new SingularityState(activeTasks, numActiveRequests, cooldownRequests, numPausedRequests, scheduledTasks, pendingRequests, lbCleanupTasks, lbCleanupRequests, cleaningRequests, activeSlaves, return new SingularityState(activeTasks, numActiveRequests, cooldownRequests, numPausedRequests, scheduledTasks, pendingRequests, lbCleanupTasks, lbCleanupRequests, cleaningRequests, activeSlaves,
deadSlaves, decommissioningSlaves, activeRacks, deadRacks, decommissioningRacks, cleaningTasks, states, oldestDeploy, numDeploys, scheduledTasksInfo.getNumLateTasks(), deadSlaves, decommissioningSlaves, activeRacks, deadRacks, decommissioningRacks, cleaningTasks, states, oldestDeploy, numDeploys, scheduledTasksInfo.getNumLateTasks(),
scheduledTasksInfo.getNumFutureTasks(), scheduledTasksInfo.getMaxTaskLag(), System.currentTimeMillis(), includeRequestIds ? overProvisionedRequestIds : null, scheduledTasksInfo.getNumFutureTasks(), scheduledTasksInfo.getMaxTaskLag(), System.currentTimeMillis(), includeRequestIds ? overProvisionedRequestIds : null,
includeRequestIds ? underProvisionedRequestIds : null, overProvisionedRequestIds.size(), underProvisionedRequestIds.size(), numFinishedRequests, unknownRacks, unknownSlaves, authDatastoreHealthy, lastTaskReconciliationStatistics); includeRequestIds ? underProvisionedRequestIds : null, overProvisionedRequestIds.size(), underProvisionedRequestIds.size(), numFinishedRequests, unknownRacks, unknownSlaves, authDatastoreHealthy);
} }
} }
Expand Up @@ -36,6 +36,7 @@
import com.hubspot.singularity.SingularityTaskId; import com.hubspot.singularity.SingularityTaskId;
import com.hubspot.singularity.SingularityTaskDestroyFrameworkMessage; import com.hubspot.singularity.SingularityTaskDestroyFrameworkMessage;
import com.hubspot.singularity.SingularityTaskMetadata; import com.hubspot.singularity.SingularityTaskMetadata;
import com.hubspot.singularity.SingularityTaskReconciliationStatistics;
import com.hubspot.singularity.SingularityTaskShellCommandRequest; import com.hubspot.singularity.SingularityTaskShellCommandRequest;
import com.hubspot.singularity.SingularityTaskShellCommandUpdate; import com.hubspot.singularity.SingularityTaskShellCommandUpdate;
import com.hubspot.singularity.SingularityTaskStatusHolder; import com.hubspot.singularity.SingularityTaskStatusHolder;
Expand Down Expand Up @@ -82,6 +83,7 @@ public void configure(final Binder binder) {
bindTranscoder(binder).asJson(SingularityExpiringScale.class); bindTranscoder(binder).asJson(SingularityExpiringScale.class);
bindTranscoder(binder).asJson(SingularityExpiringSkipHealthchecks.class); bindTranscoder(binder).asJson(SingularityExpiringSkipHealthchecks.class);
bindTranscoder(binder).asJson(SingularityTaskDestroyFrameworkMessage.class); bindTranscoder(binder).asJson(SingularityTaskDestroyFrameworkMessage.class);
bindTranscoder(binder).asJson(SingularityTaskReconciliationStatistics.class);


bindTranscoder(binder).asCompressedJson(SingularityDeployHistory.class); bindTranscoder(binder).asCompressedJson(SingularityDeployHistory.class);
bindTranscoder(binder).asCompressedJson(SingularityDeploy.class); bindTranscoder(binder).asCompressedJson(SingularityDeploy.class);
Expand Down
Expand Up @@ -8,9 +8,11 @@
import javax.ws.rs.QueryParam; import javax.ws.rs.QueryParam;
import javax.ws.rs.core.MediaType; import javax.ws.rs.core.MediaType;


import com.google.common.base.Optional;
import com.google.inject.Inject; import com.google.inject.Inject;
import com.hubspot.singularity.SingularityService; import com.hubspot.singularity.SingularityService;
import com.hubspot.singularity.SingularityState; import com.hubspot.singularity.SingularityState;
import com.hubspot.singularity.SingularityTaskReconciliationStatistics;
import com.hubspot.singularity.data.StateManager; import com.hubspot.singularity.data.StateManager;
import com.wordnik.swagger.annotations.Api; import com.wordnik.swagger.annotations.Api;
import com.wordnik.swagger.annotations.ApiOperation; import com.wordnik.swagger.annotations.ApiOperation;
Expand Down Expand Up @@ -47,4 +49,11 @@ public List<String> getUnderProvisionedRequestIds(@QueryParam("skipCache") boole
public List<String> getOverProvisionedRequestIds(@QueryParam("skipCache") boolean skipCache) { public List<String> getOverProvisionedRequestIds(@QueryParam("skipCache") boolean skipCache) {
return stateManager.getState(skipCache, true).getOverProvisionedRequestIds(); return stateManager.getState(skipCache, true).getOverProvisionedRequestIds();
} }

@GET
@Path("/task-reconciliation")
@ApiOperation("Retrieve information about the most recent task reconciliation")
public Optional<SingularityTaskReconciliationStatistics> getTaskReconciliationStatistics() {
return stateManager.getTaskReconciliationStatistics();
}
} }
Expand Up @@ -7,7 +7,6 @@
import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;


import javax.inject.Singleton; import javax.inject.Singleton;


Expand All @@ -33,6 +32,7 @@
import com.hubspot.singularity.SingularityTaskReconciliationStatistics; import com.hubspot.singularity.SingularityTaskReconciliationStatistics;
import com.hubspot.singularity.SingularityTaskStatusHolder; import com.hubspot.singularity.SingularityTaskStatusHolder;
import com.hubspot.singularity.config.SingularityConfiguration; import com.hubspot.singularity.config.SingularityConfiguration;
import com.hubspot.singularity.data.StateManager;
import com.hubspot.singularity.data.TaskManager; import com.hubspot.singularity.data.TaskManager;
import com.hubspot.singularity.mesos.SchedulerDriverSupplier; import com.hubspot.singularity.mesos.SchedulerDriverSupplier;
import com.hubspot.singularity.sentry.SingularityExceptionNotifier; import com.hubspot.singularity.sentry.SingularityExceptionNotifier;
Expand All @@ -50,18 +50,19 @@ public class SingularityTaskReconciliation {
private final SingularityAbort abort; private final SingularityAbort abort;
private final SingularityExceptionNotifier exceptionNotifier; private final SingularityExceptionNotifier exceptionNotifier;
private final SchedulerDriverSupplier schedulerDriverSupplier; private final SchedulerDriverSupplier schedulerDriverSupplier;

private final StateManager stateManager;
private final AtomicReference<SingularityTaskReconciliationStatistics> taskReconciliationStatistics;


@Inject @Inject
public SingularityTaskReconciliation(SingularityManagedScheduledExecutorServiceFactory executorServiceFactory, public SingularityTaskReconciliation(SingularityManagedScheduledExecutorServiceFactory executorServiceFactory,
SingularityExceptionNotifier exceptionNotifier, SingularityExceptionNotifier exceptionNotifier,
TaskManager taskManager, TaskManager taskManager,
StateManager stateManager,
SingularityConfiguration configuration, SingularityConfiguration configuration,
@Named(SingularityMainModule.SERVER_ID_PROPERTY) String serverId, @Named(SingularityMainModule.SERVER_ID_PROPERTY) String serverId,
SingularityAbort abort, SingularityAbort abort,
SchedulerDriverSupplier schedulerDriverSupplier) { SchedulerDriverSupplier schedulerDriverSupplier) {
this.taskManager = taskManager; this.taskManager = taskManager;
this.stateManager = stateManager;
this.serverId = serverId; this.serverId = serverId;


this.exceptionNotifier = exceptionNotifier; this.exceptionNotifier = exceptionNotifier;
Expand All @@ -71,8 +72,6 @@ public SingularityTaskReconciliation(SingularityManagedScheduledExecutorServiceF


this.isRunningReconciliation = new AtomicBoolean(false); this.isRunningReconciliation = new AtomicBoolean(false);
this.executorService = executorServiceFactory.get(getClass().getSimpleName()); this.executorService = executorServiceFactory.get(getClass().getSimpleName());

this.taskReconciliationStatistics = new AtomicReference<>();
} }


enum ReconciliationState { enum ReconciliationState {
Expand All @@ -84,10 +83,6 @@ boolean isReconciliationRunning() {
return isRunningReconciliation.get(); return isRunningReconciliation.get();
} }


public Optional<SingularityTaskReconciliationStatistics> getLastTaskReconciliationStatistics() {
return Optional.fromNullable(taskReconciliationStatistics.get());
}

public ReconciliationState startReconciliation() { public ReconciliationState startReconciliation() {
final long taskReconciliationStartedAt = System.currentTimeMillis(); final long taskReconciliationStartedAt = System.currentTimeMillis();


Expand Down Expand Up @@ -165,7 +160,7 @@ private void checkReconciliation(final SchedulerDriver driver, final long reconc
if (taskStatuses.isEmpty()) { if (taskStatuses.isEmpty()) {
LOG.info("Task reconciliation ended after {} checks and {}", numTimes, JavaUtils.duration(reconciliationStart)); LOG.info("Task reconciliation ended after {} checks and {}", numTimes, JavaUtils.duration(reconciliationStart));


taskReconciliationStatistics.set(new SingularityTaskReconciliationStatistics(reconciliationStart, System.currentTimeMillis() - reconciliationStart, numTimes, remainingTaskCounts)); stateManager.saveTaskReconciliationStatistics(new SingularityTaskReconciliationStatistics(reconciliationStart, System.currentTimeMillis() - reconciliationStart, numTimes, remainingTaskCounts));


isRunningReconciliation.set(false); isRunningReconciliation.set(false);


Expand Down

0 comments on commit f78c4a3

Please sign in to comment.