Skip to content
This repository has been archived by the owner. It is now read-only.
Permalink
Browse files
Prune updates that have no surviving job keys in the TaskStore.
We are running into a situation where many short-lived ad-hoc services are launched and their updates stick around for 30 days, even though their tasks are garbage collected much sooner. This change detects those orphaned updates and prunes them as soon as the tasks are gone.

Reviewed at https://reviews.apache.org/r/68071/
  • Loading branch information
DavidMcLaughlin committed Jul 26, 2018
1 parent 3738c3e commit d8cecba05b2eb3b1961b784d524365c03f3fbd09
Showing 2 changed files with 27 additions and 14 deletions.
@@ -108,7 +108,6 @@ void runForTest() {
@Override
protected void runOneIteration() {
storage.write((NoResult.Quiet) storeProvider -> {

List<IJobUpdateSummary> completedUpdates = storeProvider.getJobUpdateStore()
.fetchJobUpdates(IJobUpdateQuery.build(
new JobUpdateQuery().setUpdateStatuses(TERMINAL_STATES)))
@@ -120,12 +119,16 @@ protected void runOneIteration() {
Predicate<IJobUpdateSummary> expiredFilter =
s -> s.getState().getCreatedTimestampMs() < cutoff;

// Set up a predicate to detect updates that have no tasks left in the store.
Set<IJobKey> currentJobs = storeProvider.getTaskStore().getJobKeys();
Predicate<IJobUpdateSummary> orphanFilter = u -> !currentJobs.contains(u.getKey().getJob());

ImmutableSet.Builder<IJobUpdateKey> pruneBuilder = ImmutableSet.builder();

// Gather updates based on time threshold.
// Gather updates based on time threshold or ones that have been orphaned.
pruneBuilder.addAll(completedUpdates
.stream()
.filter(expiredFilter)
.filter(expiredFilter.or(orphanFilter))
.map(IJobUpdateSummary::getKey)
.collect(Collectors.toList()));

@@ -38,6 +38,7 @@
import org.apache.aurora.scheduler.storage.JobUpdateStore;
import org.apache.aurora.scheduler.storage.Storage;
import org.apache.aurora.scheduler.storage.Storage.MutateWork.NoResult;
import org.apache.aurora.scheduler.storage.TaskStore;
import org.apache.aurora.scheduler.storage.entities.IJobKey;
import org.apache.aurora.scheduler.storage.entities.IJobUpdateDetails;
import org.apache.aurora.scheduler.storage.entities.IJobUpdateKey;
@@ -53,6 +54,7 @@
import static org.apache.aurora.gen.JobUpdateStatus.ROLLING_BACK;
import static org.apache.aurora.gen.JobUpdateStatus.ROLLING_FORWARD;
import static org.apache.aurora.scheduler.base.TaskTestUtil.JOB;
import static org.apache.aurora.scheduler.base.TaskTestUtil.makeTask;
import static org.junit.Assert.assertEquals;

public class JobUpdateHistoryPrunerTest {
@@ -67,26 +69,28 @@ public void setUp() {
@Test
public void testPruneHistory() {
IJobKey job2 = JobKeys.from("testRole2", "testEnv2", "job2");
IJobKey missingKey = JobKeys.from("missing-role", "missing-env", "missing-name");

IJobUpdateDetails update1 = makeAndSave(makeKey("u1"), ROLLING_BACK, 123L, 123L);
IJobUpdateDetails update2 = makeAndSave(makeKey("u2"), ABORTED, 124L, 124L);
IJobUpdateDetails update3 = makeAndSave(makeKey("u3"), ROLLED_BACK, 125L, 125L);
IJobUpdateDetails update4 = makeAndSave(makeKey("u4"), FAILED, 126L, 126L);
IJobUpdateDetails update5 = makeAndSave(makeKey(job2, "u5"), ERROR, 123L, 123L);
IJobUpdateDetails update6 = makeAndSave(makeKey(job2, "u6"), FAILED, 125L, 125L);
IJobUpdateDetails update7 = makeAndSave(makeKey(job2, "u7"), ROLLING_FORWARD, 126L, 126L);
IJobUpdateDetails update1 = makeAndSave(makeKey("u1"), ROLLING_BACK, 123L, 123L, true);
IJobUpdateDetails update2 = makeAndSave(makeKey("u2"), ABORTED, 124L, 124L, true);
IJobUpdateDetails update3 = makeAndSave(makeKey("u3"), ROLLED_BACK, 125L, 125L, true);
IJobUpdateDetails update4 = makeAndSave(makeKey("u4"), FAILED, 126L, 126L, true);
IJobUpdateDetails update5 = makeAndSave(makeKey(job2, "u5"), ERROR, 123L, 123L, true);
IJobUpdateDetails update6 = makeAndSave(makeKey(job2, "u6"), FAILED, 125L, 125L, true);
IJobUpdateDetails update7 = makeAndSave(makeKey(job2, "u7"), ROLLING_FORWARD, 126L, 126L, true);
makeAndSave(makeKey(missingKey, "u1"), ERROR, 123L, 123L, false);

long pruningThreshold = 120;

// No updates pruned.
// Only orphaned updates pruned.
pruneHistory(3, pruningThreshold);
assertRetainedUpdates(update1, update2, update3, update4, update5, update6, update7);

// 1 update pruned.
// 2 updates pruned.
pruneHistory(2, pruningThreshold);
assertRetainedUpdates(update1, update3, update4, update5, update6, update7);

// 2 updates pruned.
// 3 updates pruned.
pruneHistory(1, pruningThreshold);
assertRetainedUpdates(update1, update4, update6, update7);

@@ -137,7 +141,8 @@ private IJobUpdateDetails makeAndSave(
IJobUpdateKey key,
JobUpdateStatus status,
long createdMs,
long lastMs) {
long lastMs,
boolean hasTasks) {

IJobUpdateDetails update = IJobUpdateDetails.build(new JobUpdateDetails()
.setUpdateEvents(ImmutableList.of(
@@ -163,6 +168,11 @@ private IJobUpdateDetails makeAndSave(
store.saveJobUpdate(update.getUpdate());
update.getUpdateEvents().forEach(event -> store.saveJobUpdateEvent(key, event));
update.getInstanceEvents().forEach(event -> store.saveJobInstanceUpdateEvent(key, event));

TaskStore.Mutable taskStore = storeProvider.getUnsafeTaskStore();
if (hasTasks) {
taskStore.saveTasks(ImmutableSet.of(makeTask("test", key.getJob())));
}
});
return update;
}

0 comments on commit d8cecba

Please sign in to comment.