Skip to content

Commit

Permalink
use Histogram for task reconciliation times
Browse files Browse the repository at this point in the history
  • Loading branch information
tpetr committed Aug 30, 2016
1 parent f78c4a3 commit f753fe2
Show file tree
Hide file tree
Showing 2 changed files with 97 additions and 17 deletions.
@@ -1,25 +1,53 @@
package com.hubspot.singularity; package com.hubspot.singularity;


import java.util.List;

import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonProperty;


public class SingularityTaskReconciliationStatistics { public class SingularityTaskReconciliationStatistics {
private final long taskReconciliationStartedAt; private final long taskReconciliationStartedAt;
private final long taskReconciliationDurationMillis; private final long taskReconciliationDurationMillis;
private final int taskReconciliationIterations; private final int taskReconciliationIterations;
private final List<Integer> taskReconciliationCounts; private final long taskReconciliationResponseCount;
private final long taskReconciliationResponseMax;
private final double taskReconciliationResponseMean;
private final long taskReconciliationResponseMin;
private final double taskReconciliationResponseP50;
private final double taskReconciliationResponseP75;
private final double taskReconciliationResponseP95;
private final double taskReconciliationResponseP98;
private final double taskReconciliationResponseP99;
private final double taskReconciliationResponseP999;
private final double taskReconciliationResponseStddev;


@JsonCreator @JsonCreator
public SingularityTaskReconciliationStatistics(@JsonProperty("taskReconciliationStartedAt") long taskReconciliationStartedAt, public SingularityTaskReconciliationStatistics(@JsonProperty("taskReconciliationStartedAt") long taskReconciliationStartedAt,
@JsonProperty("taskReconciliationDurationMillis") long taskReconciliationDurationMillis, @JsonProperty("taskReconciliationDurationMillis") long taskReconciliationDurationMillis,
@JsonProperty("taskReconciliationIterations") int taskReconciliationIterations, @JsonProperty("taskReconciliationIterations") int taskReconciliationIterations,
@JsonProperty("taskReconciliationCounts") List<Integer> taskReconciliationCounts) { @JsonProperty("taskReconciliationResponseCount") long taskReconciliationResponseCount,
@JsonProperty("taskReconciliationResponseMax") long taskReconciliationResponseMax,
@JsonProperty("taskReconciliationResponseMean") double taskReconciliationResponseMean,
@JsonProperty("taskReconciliationResponseMin") long taskReconciliationResponseMin,
@JsonProperty("taskReconciliationResponseP50") double taskReconciliationResponseP50,
@JsonProperty("taskReconciliationResponseP75") double taskReconciliationResponseP75,
@JsonProperty("taskReconciliationResponseP95") double taskReconciliationResponseP95,
@JsonProperty("taskReconciliationResponseP98") double taskReconciliationResponseP98,
@JsonProperty("taskReconciliationResponseP99") double taskReconciliationResponseP99,
@JsonProperty("taskReconciliationResponseP999") double taskReconciliationResponseP999,
@JsonProperty("taskReconciliationResponseStddev") double taskReconciliationResponseStddev) {
this.taskReconciliationStartedAt = taskReconciliationStartedAt; this.taskReconciliationStartedAt = taskReconciliationStartedAt;
this.taskReconciliationDurationMillis = taskReconciliationDurationMillis; this.taskReconciliationDurationMillis = taskReconciliationDurationMillis;
this.taskReconciliationIterations = taskReconciliationIterations; this.taskReconciliationIterations = taskReconciliationIterations;
this.taskReconciliationCounts = taskReconciliationCounts; this.taskReconciliationResponseCount = taskReconciliationResponseCount;
this.taskReconciliationResponseMax = taskReconciliationResponseMax;
this.taskReconciliationResponseMean = taskReconciliationResponseMean;
this.taskReconciliationResponseMin = taskReconciliationResponseMin;
this.taskReconciliationResponseP50 = taskReconciliationResponseP50;
this.taskReconciliationResponseP75 = taskReconciliationResponseP75;
this.taskReconciliationResponseP95 = taskReconciliationResponseP95;
this.taskReconciliationResponseP98 = taskReconciliationResponseP98;
this.taskReconciliationResponseP99 = taskReconciliationResponseP99;
this.taskReconciliationResponseP999 = taskReconciliationResponseP999;
this.taskReconciliationResponseStddev = taskReconciliationResponseStddev;
} }


public long getTaskReconciliationStartedAt() { public long getTaskReconciliationStartedAt() {
Expand All @@ -34,8 +62,48 @@ public int getTaskReconciliationIterations() {
return taskReconciliationIterations; return taskReconciliationIterations;
} }


public List<Integer> getTaskReconciliationCounts() { public long getTaskReconciliationResponseCount() {
return taskReconciliationCounts; return taskReconciliationResponseCount;
}

public long getTaskReconciliationResponseMax() {
return taskReconciliationResponseMax;
}

public double getTaskReconciliationResponseMean() {
return taskReconciliationResponseMean;
}

public long getTaskReconciliationResponseMin() {
return taskReconciliationResponseMin;
}

public double getTaskReconciliationResponseP50() {
return taskReconciliationResponseP50;
}

public double getTaskReconciliationResponseP75() {
return taskReconciliationResponseP75;
}

public double getTaskReconciliationResponseP95() {
return taskReconciliationResponseP95;
}

public double getTaskReconciliationResponseP98() {
return taskReconciliationResponseP98;
}

public double getTaskReconciliationResponseP99() {
return taskReconciliationResponseP99;
}

public double getTaskReconciliationResponseP999() {
return taskReconciliationResponseP999;
}

public double getTaskReconciliationResponseStddev() {
return taskReconciliationResponseStddev;
} }


@Override @Override
Expand All @@ -44,7 +112,17 @@ public String toString() {
"taskReconciliationStartedAt=" + taskReconciliationStartedAt + "taskReconciliationStartedAt=" + taskReconciliationStartedAt +
", taskReconciliationDurationMillis=" + taskReconciliationDurationMillis + ", taskReconciliationDurationMillis=" + taskReconciliationDurationMillis +
", taskReconciliationIterations=" + taskReconciliationIterations + ", taskReconciliationIterations=" + taskReconciliationIterations +
", taskReconciliationCounts=" + taskReconciliationCounts + ", taskReconciliationResponseCount=" + taskReconciliationResponseCount +
", taskReconciliationResponseMax=" + taskReconciliationResponseMax +
", taskReconciliationResponseMean=" + taskReconciliationResponseMean +
", taskReconciliationResponseMin=" + taskReconciliationResponseMin +
", taskReconciliationResponseP50=" + taskReconciliationResponseP50 +
", taskReconciliationResponseP75=" + taskReconciliationResponseP75 +
", taskReconciliationResponseP95=" + taskReconciliationResponseP95 +
", taskReconciliationResponseP98=" + taskReconciliationResponseP98 +
", taskReconciliationResponseP99=" + taskReconciliationResponseP99 +
", taskReconciliationResponseP999=" + taskReconciliationResponseP999 +
", taskReconciliationResponseStddev=" + taskReconciliationResponseStddev +
'}'; '}';
} }
} }
@@ -1,6 +1,5 @@
package com.hubspot.singularity.scheduler; package com.hubspot.singularity.scheduler;


import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
import java.util.Collections; import java.util.Collections;
import java.util.List; import java.util.List;
Expand All @@ -18,6 +17,9 @@
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;


import com.codahale.metrics.Histogram;
import com.codahale.metrics.Snapshot;
import com.codahale.metrics.UniformReservoir;
import com.google.common.annotations.VisibleForTesting; import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional; import com.google.common.base.Optional;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
Expand Down Expand Up @@ -106,22 +108,20 @@ public ReconciliationState startReconciliation() {
SchedulerDriver driver = schedulerDriver.get(); SchedulerDriver driver = schedulerDriver.get();
driver.reconcileTasks(Collections.<TaskStatus> emptyList()); driver.reconcileTasks(Collections.<TaskStatus> emptyList());


scheduleReconciliationCheck(driver, taskReconciliationStartedAt, activeTaskIds, 0, new ArrayList<Integer>()); scheduleReconciliationCheck(driver, taskReconciliationStartedAt, activeTaskIds, 0, new Histogram(new UniformReservoir()));


return ReconciliationState.STARTED; return ReconciliationState.STARTED;
} }


private void scheduleReconciliationCheck(final SchedulerDriver driver, final long reconciliationStart, final Collection<SingularityTaskId> remainingTaskIds, final int numTimes, final List<Integer> remainingTaskCounts) { private void scheduleReconciliationCheck(final SchedulerDriver driver, final long reconciliationStart, final Collection<SingularityTaskId> remainingTaskIds, final int numTimes, final Histogram histogram) {
LOG.info("Scheduling reconciliation check #{} - {} tasks left - waiting {}", numTimes + 1, remainingTaskIds.size(), JavaUtils.durationFromMillis(configuration.getCheckReconcileWhenRunningEveryMillis())); LOG.info("Scheduling reconciliation check #{} - {} tasks left - waiting {}", numTimes + 1, remainingTaskIds.size(), JavaUtils.durationFromMillis(configuration.getCheckReconcileWhenRunningEveryMillis()));


remainingTaskCounts.add(remainingTaskIds.size());

executorService.schedule(new Runnable() { executorService.schedule(new Runnable() {


@Override @Override
public void run() { public void run() {
try { try {
checkReconciliation(driver, reconciliationStart, remainingTaskIds, numTimes + 1, remainingTaskCounts); checkReconciliation(driver, reconciliationStart, remainingTaskIds, numTimes + 1, histogram);
} catch (Throwable t) { } catch (Throwable t) {
LOG.error("While checking for reconciliation tasks", t); LOG.error("While checking for reconciliation tasks", t);
exceptionNotifier.notify(t); exceptionNotifier.notify(t);
Expand All @@ -131,12 +131,13 @@ public void run() {
}, configuration.getCheckReconcileWhenRunningEveryMillis(), TimeUnit.MILLISECONDS); }, configuration.getCheckReconcileWhenRunningEveryMillis(), TimeUnit.MILLISECONDS);
} }


private void checkReconciliation(final SchedulerDriver driver, final long reconciliationStart, final Collection<SingularityTaskId> remainingTaskIds, final int numTimes, final List<Integer> remainingTaskCounts) { private void checkReconciliation(final SchedulerDriver driver, final long reconciliationStart, final Collection<SingularityTaskId> remainingTaskIds, final int numTimes, final Histogram histogram) {
final List<SingularityTaskStatusHolder> taskStatusHolders = taskManager.getLastActiveTaskStatusesFor(remainingTaskIds); final List<SingularityTaskStatusHolder> taskStatusHolders = taskManager.getLastActiveTaskStatusesFor(remainingTaskIds);
final List<TaskStatus> taskStatuses = Lists.newArrayListWithCapacity(taskStatusHolders.size()); final List<TaskStatus> taskStatuses = Lists.newArrayListWithCapacity(taskStatusHolders.size());


for (SingularityTaskStatusHolder taskStatusHolder : taskStatusHolders) { for (SingularityTaskStatusHolder taskStatusHolder : taskStatusHolders) {
if (taskStatusHolder.getServerId().equals(serverId) && taskStatusHolder.getServerTimestamp() > reconciliationStart) { if (taskStatusHolder.getServerId().equals(serverId) && taskStatusHolder.getServerTimestamp() > reconciliationStart) {
histogram.update(taskStatusHolder.getServerTimestamp() - reconciliationStart);
continue; continue;
} }


Expand All @@ -160,7 +161,8 @@ private void checkReconciliation(final SchedulerDriver driver, final long reconc
if (taskStatuses.isEmpty()) { if (taskStatuses.isEmpty()) {
LOG.info("Task reconciliation ended after {} checks and {}", numTimes, JavaUtils.duration(reconciliationStart)); LOG.info("Task reconciliation ended after {} checks and {}", numTimes, JavaUtils.duration(reconciliationStart));


stateManager.saveTaskReconciliationStatistics(new SingularityTaskReconciliationStatistics(reconciliationStart, System.currentTimeMillis() - reconciliationStart, numTimes, remainingTaskCounts)); final Snapshot snapshot = histogram.getSnapshot();
stateManager.saveTaskReconciliationStatistics(new SingularityTaskReconciliationStatistics(reconciliationStart, System.currentTimeMillis() - reconciliationStart, numTimes, histogram.getCount(), snapshot.getMax(), snapshot.getMean(), snapshot.getMin(), snapshot.getMedian(), snapshot.get75thPercentile(), snapshot.get95thPercentile(), snapshot.get98thPercentile(), snapshot.get99thPercentile(), snapshot.get999thPercentile(), snapshot.getStdDev()));


isRunningReconciliation.set(false); isRunningReconciliation.set(false);


Expand All @@ -171,6 +173,6 @@ private void checkReconciliation(final SchedulerDriver driver, final long reconc


driver.reconcileTasks(taskStatuses); driver.reconcileTasks(taskStatuses);


scheduleReconciliationCheck(driver, reconciliationStart, remainingTaskIds, numTimes, remainingTaskCounts); scheduleReconciliationCheck(driver, reconciliationStart, remainingTaskIds, numTimes, histogram);
} }
} }

0 comments on commit f753fe2

Please sign in to comment.