diff --git a/Docs/reference/configuration.md b/Docs/reference/configuration.md index db4ab85fe9..fc9ed54e3d 100644 --- a/Docs/reference/configuration.md +++ b/Docs/reference/configuration.md @@ -124,8 +124,11 @@ These settings are less likely to be changed, but were included in the configura | newTaskCheckerBaseDelaySeconds | 1 | Added to the the amount of deploy to wait before checking a new task | long | | allowTestResourceCalls | false | If true, allows calls to be made to the test resource, which can test internal methods | boolean | | deleteDeploysFromZkWhenNoDatabaseAfterHours | 336 (14 days) | Delete deploys from zk when they are older than this if we are not using a database | long | +| maxStaleDeploysPerRequestInZkWhenNoDatabase | infinite (disabled) | Delete oldest deploys from zk when there are more than this number for a given request, if we're not already persisting them to a database | int | | deleteStaleRequestsFromZkWhenNoDatabaseAfterHours | 336 (14 days) | Delete stale requests after this amount of time if we are not using a database | long | +| maxRequestsWithHistoryInZkWhenNoDatabase | infinite (disabled) | Delete history of oldest requests from zk when there are more than this number of requests, if we're not already persisting them to a database | int | | deleteTasksFromZkWhenNoDatabaseAfterHours | 168 (7 days) | Delete old tasks from zk after this amount of time if we are not using a database | long | +| maxStaleTasksPerRequestInZkWhenNoDatabase | infinite (disabled) | Delete oldest tasks from zk when there are more than this number for a given request, if we're not already persisting them to a database | int | | deleteDeadSlavesAfterHours | 168 (7 days) | Remove dead slaves from the list after this amount of time | long | | deleteUndeliverableWebhooksAfterHours | 168 (7 days) | Delete (and stop retrying) failed webhooks after this amount of time | long | | waitForListeners | true | If true, the event system waits for all listeners having processed an event. | boolean | diff --git a/SingularityBase/src/main/java/com/hubspot/singularity/SingularityRequestHistory.java b/SingularityBase/src/main/java/com/hubspot/singularity/SingularityRequestHistory.java index 3fc5f69f01..89577e15e0 100644 --- a/SingularityBase/src/main/java/com/hubspot/singularity/SingularityRequestHistory.java +++ b/SingularityBase/src/main/java/com/hubspot/singularity/SingularityRequestHistory.java @@ -7,7 +7,7 @@ import com.google.common.base.Optional; import com.google.common.collect.ComparisonChain; -public class SingularityRequestHistory implements Comparable { +public class SingularityRequestHistory implements Comparable, SingularityHistoryItem { private final long createdAt; private final Optional user; @@ -86,6 +86,12 @@ public Optional getMessage() { return message; } + @Override + @JsonIgnore + public long getCreateTimestampForCalculatingHistoryAge() { + return createdAt; + } + @Override public String toString() { return "SingularityRequestHistory [createdAt=" + createdAt + ", user=" + user + ", eventType=" + eventType + ", request=" + request + ", message=" + message + "]"; diff --git a/SingularityBase/src/main/java/com/hubspot/singularity/SingularityTaskId.java b/SingularityBase/src/main/java/com/hubspot/singularity/SingularityTaskId.java index 423435dfa9..8c02112628 100644 --- a/SingularityBase/src/main/java/com/hubspot/singularity/SingularityTaskId.java +++ b/SingularityBase/src/main/java/com/hubspot/singularity/SingularityTaskId.java @@ -65,6 +65,15 @@ public int compare(SingularityTaskId o1, SingularityTaskId o2) { }; + public static Comparator STARTED_AT_COMPARATOR_DESC = new Comparator() { + + @Override + public int compare(SingularityTaskId o1, SingularityTaskId o2) { + return Long.compare(o2.startedAt, o1.startedAt); + } + + }; + public static Predicate notIn(Collection exclude) { return Predicates.not(Predicates.in(exclude)); } diff --git a/SingularityService/src/main/java/com/hubspot/singularity/config/SingularityConfiguration.java b/SingularityService/src/main/java/com/hubspot/singularity/config/SingularityConfiguration.java index afd75db853..09457c8274 100644 --- a/SingularityService/src/main/java/com/hubspot/singularity/config/SingularityConfiguration.java +++ b/SingularityService/src/main/java/com/hubspot/singularity/config/SingularityConfiguration.java @@ -83,12 +83,18 @@ public class SingularityConfiguration extends Configuration { private long deleteDeploysFromZkWhenNoDatabaseAfterHours = TimeUnit.DAYS.toHours(14); + private Optional maxStaleDeploysPerRequestInZkWhenNoDatabase = Optional.absent(); + private long deleteDeadSlavesAfterHours = TimeUnit.DAYS.toHours(7); private long deleteStaleRequestsFromZkWhenNoDatabaseAfterHours = TimeUnit.DAYS.toHours(14); + private Optional maxRequestsWithHistoryInZkWhenNoDatabase = Optional.absent(); + private long deleteTasksFromZkWhenNoDatabaseAfterHours = TimeUnit.DAYS.toHours(7); + private Optional maxStaleTasksPerRequestInZkWhenNoDatabase = Optional.absent(); + private long deleteUndeliverableWebhooksAfterHours = TimeUnit.DAYS.toHours(7); private long deltaAfterWhichTasksAreLateMillis = TimeUnit.SECONDS.toMillis(30); @@ -925,4 +931,28 @@ public boolean isDeleteRemovedRequestsFromLoadBalancer() { public void setDeleteRemovedRequestsFromLoadBalancer(boolean deleteRemovedRequestsFromLoadBalancer) { this.deleteRemovedRequestsFromLoadBalancer = deleteRemovedRequestsFromLoadBalancer; } + + public Optional getMaxStaleDeploysPerRequestInZkWhenNoDatabase() { + return maxStaleDeploysPerRequestInZkWhenNoDatabase; + } + + public void setMaxStaleDeploysPerRequestInZkWhenNoDatabase(Optional maxStaleDeploysPerRequestInZkWhenNoDatabase) { + this.maxStaleDeploysPerRequestInZkWhenNoDatabase = maxStaleDeploysPerRequestInZkWhenNoDatabase; + } + + public Optional getMaxRequestsWithHistoryInZkWhenNoDatabase() { + return maxRequestsWithHistoryInZkWhenNoDatabase; + } + + public void setMaxRequestsWithHistoryInZkWhenNoDatabase(Optional maxRequestsWithHistoryInZkWhenNoDatabase) { + this.maxRequestsWithHistoryInZkWhenNoDatabase = maxRequestsWithHistoryInZkWhenNoDatabase; + } + + public Optional getMaxStaleTasksPerRequestInZkWhenNoDatabase() { + return maxStaleTasksPerRequestInZkWhenNoDatabase; + } + + public void setMaxStaleTasksPerRequestInZkWhenNoDatabase(Optional maxStaleTasksPerRequestInZkWhenNoDatabase) { + this.maxStaleTasksPerRequestInZkWhenNoDatabase = maxStaleTasksPerRequestInZkWhenNoDatabase; + } } diff --git a/SingularityService/src/main/java/com/hubspot/singularity/data/history/SingularityDeployHistoryPersister.java b/SingularityService/src/main/java/com/hubspot/singularity/data/history/SingularityDeployHistoryPersister.java index 7deed2a706..69595b15e7 100644 --- a/SingularityService/src/main/java/com/hubspot/singularity/data/history/SingularityDeployHistoryPersister.java +++ b/SingularityService/src/main/java/com/hubspot/singularity/data/history/SingularityDeployHistoryPersister.java @@ -1,5 +1,6 @@ package com.hubspot.singularity.data.history; +import java.util.Collection; import java.util.List; import java.util.Map; import java.util.concurrent.TimeUnit; @@ -10,6 +11,7 @@ import org.slf4j.LoggerFactory; import com.google.common.base.Optional; +import com.google.common.collect.TreeMultimap; import com.google.inject.Inject; import com.hubspot.mesos.JavaUtils; import com.hubspot.singularity.SingularityDeleteResult; @@ -43,6 +45,7 @@ public void runActionOnPoll() { final List allDeployIds = deployManager.getAllDeployIds(); final Map byRequestId = deployManager.getAllRequestDeployStatesByRequestId(); + final TreeMultimap deployHistoryByRequestId = TreeMultimap.create(); int numTotal = 0; int numTransferred = 0; @@ -56,16 +59,22 @@ public void runActionOnPoll() { Optional deployHistory = deployManager.getDeployHistory(deployKey.getRequestId(), deployKey.getDeployId(), true); - if (!deployHistory.isPresent()) { + if (deployHistory.isPresent()) { + deployHistoryByRequestId.put(deployKey.getRequestId(), deployHistory.get()); + } else { LOG.info("Deploy history for key {} not found", deployKey); - continue; } + } - if (moveToHistoryOrCheckForPurge(deployHistory.get())) { - numTransferred++; - } + for (Collection deployHistoryForRequest : deployHistoryByRequestId.asMap().values()) { + int i=0; + for (SingularityDeployHistory deployHistory : deployHistoryForRequest) { + if (moveToHistoryOrCheckForPurge(deployHistory, i++)) { + numTransferred++; + } - numTotal++; + numTotal++; + } } LOG.info("Transferred {} out of {} deploys in {}", numTransferred, numTotal, JavaUtils.duration(start)); @@ -76,6 +85,11 @@ protected long getMaxAgeInMillisOfItem() { return TimeUnit.HOURS.toMillis(configuration.getDeleteDeploysFromZkWhenNoDatabaseAfterHours()); } + @Override + protected Optional getMaxNumberOfItems() { + return configuration.getMaxStaleDeploysPerRequestInZkWhenNoDatabase(); + } + private boolean shouldTransferDeploy(SingularityRequestDeployState deployState, SingularityDeployKey deployKey) { if (deployState == null) { LOG.warn("Missing request deploy state for deployKey {}", deployKey); diff --git a/SingularityService/src/main/java/com/hubspot/singularity/data/history/SingularityHistoryPersister.java b/SingularityService/src/main/java/com/hubspot/singularity/data/history/SingularityHistoryPersister.java index 0942446481..59acb3fe12 100644 --- a/SingularityService/src/main/java/com/hubspot/singularity/data/history/SingularityHistoryPersister.java +++ b/SingularityService/src/main/java/com/hubspot/singularity/data/history/SingularityHistoryPersister.java @@ -5,6 +5,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.base.Optional; import com.hubspot.mesos.JavaUtils; import com.hubspot.singularity.SingularityDeleteResult; import com.hubspot.singularity.SingularityHistoryItem; @@ -34,19 +35,21 @@ protected boolean persistsHistoryInsteadOfPurging() { @Override protected boolean isEnabled() { - return persistsHistoryInsteadOfPurging() || getMaxAgeInMillisOfItem() > 0; + return persistsHistoryInsteadOfPurging() || getMaxAgeInMillisOfItem() > 0 || getMaxNumberOfItems().isPresent(); } protected abstract long getMaxAgeInMillisOfItem(); + protected abstract Optional getMaxNumberOfItems(); + protected abstract boolean moveToHistory(T object); protected abstract SingularityDeleteResult purgeFromZk(T object); - protected boolean moveToHistoryOrCheckForPurge(T object) { + protected boolean moveToHistoryOrCheckForPurge(T object, int index) { final long start = System.currentTimeMillis(); - if (moveToHistoryOrCheckForPurgeAndShouldDelete(object)) { + if (moveToHistoryOrCheckForPurgeAndShouldDelete(object, index)) { SingularityDeleteResult deleteResult = purgeFromZk(object); LOG.debug("{} {} (deleted: {}) in {}", persistsHistoryInsteadOfPurging() ? "Persisted" : "Purged", object, deleteResult, JavaUtils.duration(start)); return true; @@ -55,7 +58,7 @@ protected boolean moveToHistoryOrCheckForPurge(T object) { return false; } - private boolean moveToHistoryOrCheckForPurgeAndShouldDelete(T object) { + private boolean moveToHistoryOrCheckForPurgeAndShouldDelete(T object, int index) { if (persistsHistoryInsteadOfPurging()) { return moveToHistory(object); } @@ -67,6 +70,11 @@ private boolean moveToHistoryOrCheckForPurgeAndShouldDelete(T object) { return true; } + if (getMaxNumberOfItems().isPresent() && index >= getMaxNumberOfItems().get()) { + LOG.trace("Deleting {} because it is item number {} (max: {})", object, index, getMaxNumberOfItems().get()); + return true; + } + return false; } diff --git a/SingularityService/src/main/java/com/hubspot/singularity/data/history/SingularityRequestHistoryPersister.java b/SingularityService/src/main/java/com/hubspot/singularity/data/history/SingularityRequestHistoryPersister.java index 0c01a40c86..e07fc4be10 100644 --- a/SingularityService/src/main/java/com/hubspot/singularity/data/history/SingularityRequestHistoryPersister.java +++ b/SingularityService/src/main/java/com/hubspot/singularity/data/history/SingularityRequestHistoryPersister.java @@ -1,6 +1,9 @@ package com.hubspot.singularity.data.history; +import java.util.ArrayList; +import java.util.Collections; import java.util.List; +import java.util.Objects; import java.util.concurrent.TimeUnit; import javax.inject.Singleton; @@ -8,6 +11,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.base.Optional; +import com.google.common.primitives.Longs; import com.google.inject.Inject; import com.hubspot.mesos.JavaUtils; import com.hubspot.singularity.SingularityDeleteResult; @@ -33,7 +38,7 @@ public SingularityRequestHistoryPersister(SingularityConfiguration configuration this.historyManager = historyManager; } - public static class SingularityRequestHistoryParent implements SingularityHistoryItem { + public static class SingularityRequestHistoryParent implements SingularityHistoryItem, Comparable { private final List history; private final String requestId; @@ -59,6 +64,38 @@ public long getCreateTimestampForCalculatingHistoryAge() { return createTime; } + @Override + public int compareTo(SingularityRequestHistoryParent o) { + return Longs.compare(this.getCreateTimestampForCalculatingHistoryAge(), o.getCreateTimestampForCalculatingHistoryAge()); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + SingularityRequestHistoryParent that = (SingularityRequestHistoryParent) o; + return createTime == that.createTime && + Objects.equals(history, that.history) && + Objects.equals(requestId, that.requestId); + } + + @Override + public int hashCode() { + return Objects.hash(history, requestId, createTime); + } + + @Override + public String toString() { + return "SingularityRequestHistoryParent[" + + "history=" + history + + ", requestId='" + requestId + '\'' + + ", createTime=" + createTime + + ']'; + } } @Override @@ -67,23 +104,24 @@ public void runActionOnPoll() { final long start = System.currentTimeMillis(); - final List requestIdsWithHistory = requestManager.getRequestIdsWithHistory(); + final List requestHistoryParents = new ArrayList(); int numHistoryTransferred = 0; - int numRequests = 0; - for (String requestId : requestIdsWithHistory) { - numRequests++; + for (String requestId : requestManager.getRequestIdsWithHistory()) { + requestHistoryParents.add(new SingularityRequestHistoryParent(requestManager.getRequestHistory(requestId), requestId)); + } - List historyForRequestId = requestManager.getRequestHistory(requestId); - SingularityRequestHistoryParent requestHistoryParent = new SingularityRequestHistoryParent(historyForRequestId, requestId); + Collections.sort(requestHistoryParents, Collections.reverseOrder()); // createdAt descending - if (moveToHistoryOrCheckForPurge(requestHistoryParent)) { + int i=0; + for (SingularityRequestHistoryParent requestHistoryParent : requestHistoryParents) { + if (moveToHistoryOrCheckForPurge(requestHistoryParent, i++)) { numHistoryTransferred += requestHistoryParent.history.size(); } } - LOG.info("Transferred {} history updates for {} requests in {}", numHistoryTransferred, numRequests, JavaUtils.duration(start)); + LOG.info("Transferred {} history updates for {} requests in {}", numHistoryTransferred, requestHistoryParents.size(), JavaUtils.duration(start)); } @Override @@ -91,6 +129,11 @@ protected long getMaxAgeInMillisOfItem() { return TimeUnit.HOURS.toMillis(configuration.getDeleteStaleRequestsFromZkWhenNoDatabaseAfterHours()); } + @Override + protected Optional getMaxNumberOfItems() { + return configuration.getMaxRequestsWithHistoryInZkWhenNoDatabase(); + } + @Override protected boolean moveToHistory(SingularityRequestHistoryParent object) { for (SingularityRequestHistory requestHistory : object.history) { diff --git a/SingularityService/src/main/java/com/hubspot/singularity/data/history/SingularityTaskHistoryPersister.java b/SingularityService/src/main/java/com/hubspot/singularity/data/history/SingularityTaskHistoryPersister.java index 075ef36456..f5d874fbee 100644 --- a/SingularityService/src/main/java/com/hubspot/singularity/data/history/SingularityTaskHistoryPersister.java +++ b/SingularityService/src/main/java/com/hubspot/singularity/data/history/SingularityTaskHistoryPersister.java @@ -1,6 +1,8 @@ package com.hubspot.singularity.data.history; +import java.util.Collection; import java.util.List; +import java.util.Map; import java.util.Set; import java.util.concurrent.TimeUnit; @@ -10,7 +12,10 @@ import org.slf4j.LoggerFactory; import com.google.common.base.Optional; +import com.google.common.collect.Multimap; +import com.google.common.collect.Ordering; import com.google.common.collect.Sets; +import com.google.common.collect.TreeMultimap; import com.google.inject.Inject; import com.hubspot.mesos.JavaUtils; import com.hubspot.singularity.SingularityDeleteResult; @@ -54,30 +59,39 @@ public void runActionOnPoll() { int numTotal = 0; int numTransferred = 0; + final Multimap eligibleTaskIdByRequestId = TreeMultimap.create(Ordering.natural(), SingularityTaskId.STARTED_AT_COMPARATOR_DESC); + for (SingularityTaskId taskId : allTaskIds) { - if (activeTaskIds.contains(taskId) || lbCleaningTaskIds.contains(taskId) || isPartofPendingDeploy(pendingDeploys, taskId)) { + if (activeTaskIds.contains(taskId) || lbCleaningTaskIds.contains(taskId) || isPartOfPendingDeploy(pendingDeploys, taskId)) { continue; } - final long age = start - taskId.getStartedAt(); + eligibleTaskIdByRequestId.put(taskId.getRequestId(), taskId); + } - if (age < configuration.getTaskPersistAfterStartupBufferMillis()) { - LOG.debug("Not persisting {}, it has started up too recently {} (buffer: {}) - this prevents race conditions with ZK tx", taskId, JavaUtils.durationFromMillis(age), - JavaUtils.durationFromMillis(configuration.getTaskPersistAfterStartupBufferMillis())); - continue; - } + for (Map.Entry> entry : eligibleTaskIdByRequestId.asMap().entrySet()) { + int i = 0; + for (SingularityTaskId taskId : entry.getValue()) { + final long age = start - taskId.getStartedAt(); - if (moveToHistoryOrCheckForPurge(taskId)) { - numTransferred++; - } + if (age < configuration.getTaskPersistAfterStartupBufferMillis()) { + LOG.debug("Not persisting {}, it has started up too recently {} (buffer: {}) - this prevents race conditions with ZK tx", taskId, JavaUtils.durationFromMillis(age), + JavaUtils.durationFromMillis(configuration.getTaskPersistAfterStartupBufferMillis())); + continue; + } - numTotal++; + if (moveToHistoryOrCheckForPurge(taskId, i++)) { + numTransferred++; + } + + numTotal++; + } } LOG.info("Transferred {} out of {} inactive task ids (total {}) in {}", numTransferred, numTotal, allTaskIds.size(), JavaUtils.duration(start)); } - private boolean isPartofPendingDeploy(List pendingDeploys, SingularityTaskId taskId) { + private boolean isPartOfPendingDeploy(List pendingDeploys, SingularityTaskId taskId) { for (SingularityPendingDeploy pendingDeploy : pendingDeploys) { if (pendingDeploy.getDeployMarker().getDeployId().equals(taskId.getDeployId()) && pendingDeploy.getDeployMarker().getRequestId().equals(taskId.getRequestId())) { return true; @@ -92,6 +106,11 @@ protected long getMaxAgeInMillisOfItem() { return TimeUnit.HOURS.toMillis(configuration.getDeleteTasksFromZkWhenNoDatabaseAfterHours()); } + @Override + protected Optional getMaxNumberOfItems() { + return configuration.getMaxStaleTasksPerRequestInZkWhenNoDatabase(); + } + @Override protected boolean moveToHistory(SingularityTaskId object) { final Optional taskHistory = taskManager.getTaskHistory(object); diff --git a/SingularityService/src/test/java/com/hubspot/singularity/scheduler/HistoryPersisterTest.java b/SingularityService/src/test/java/com/hubspot/singularity/scheduler/HistoryPersisterTest.java index 091ba147db..9936e2ed27 100644 --- a/SingularityService/src/test/java/com/hubspot/singularity/scheduler/HistoryPersisterTest.java +++ b/SingularityService/src/test/java/com/hubspot/singularity/scheduler/HistoryPersisterTest.java @@ -1,5 +1,7 @@ package com.hubspot.singularity.scheduler; +import java.util.ArrayList; +import java.util.List; import java.util.concurrent.TimeUnit; import org.apache.mesos.Protos.TaskState; @@ -8,8 +10,14 @@ import com.google.common.base.Optional; import com.google.inject.Inject; +import com.hubspot.singularity.RequestType; +import com.hubspot.singularity.SingularityDeploy; +import com.hubspot.singularity.SingularityRequest; +import com.hubspot.singularity.SingularityRequestBuilder; import com.hubspot.singularity.SingularityRequestHistory.RequestHistoryType; import com.hubspot.singularity.SingularityTask; +import com.hubspot.singularity.SingularityTaskId; +import com.hubspot.singularity.data.history.SingularityDeployHistoryPersister; import com.hubspot.singularity.data.history.SingularityRequestHistoryPersister; import com.hubspot.singularity.data.history.SingularityTaskHistoryPersister; @@ -22,6 +30,8 @@ public class HistoryPersisterTest extends SingularitySchedulerTestBase { @Inject protected SingularityTaskHistoryPersister taskHistoryPersister; @Inject + protected SingularityDeployHistoryPersister deployHistoryPersister; + @Inject protected SingularityCleaner cleaner; public HistoryPersisterTest() { @@ -29,7 +39,7 @@ public HistoryPersisterTest() { } @Test - public void testRequestPurging() { + public void testRequestAgePurging() { initRequest(); configuration.setDeleteStaleRequestsFromZkWhenNoDatabaseAfterHours(7); @@ -57,7 +67,46 @@ public void testRequestPurging() { } @Test - public void testTaskPurging() { + public void testRequestCountPurging() { + final SingularityRequest requestOne = new SingularityRequestBuilder("request1", RequestType.WORKER).build(); + final SingularityRequest requestTwo = new SingularityRequestBuilder("request2", RequestType.WORKER).build(); + final SingularityRequest requestThree = new SingularityRequestBuilder("request3", RequestType.WORKER).build(); + + saveRequest(requestOne); + saveRequest(requestTwo); + saveRequest(requestThree); + + configuration.setMaxRequestsWithHistoryInZkWhenNoDatabase(Optional.of(2)); + configuration.setDeleteStaleRequestsFromZkWhenNoDatabaseAfterHours(7); + + requestManager.deleteRequest(requestOne, user, Optional.absent(), Optional.absent()); + requestManager.deleteHistoryParent(requestOne.getId()); + requestManager.activate(requestOne, RequestHistoryType.CREATED, System.currentTimeMillis() - TimeUnit.HOURS.toMillis(4), Optional. absent(), Optional. absent()); + requestManager.cooldown(requestOne, System.currentTimeMillis() - TimeUnit.HOURS.toMillis(3)); + + requestManager.deleteRequest(requestTwo, user, Optional.absent(), Optional.absent()); + requestManager.deleteHistoryParent(requestTwo.getId()); + requestManager.activate(requestTwo, RequestHistoryType.CREATED, System.currentTimeMillis() - TimeUnit.HOURS.toMillis(4), Optional. absent(), Optional. absent()); + requestManager.cooldown(requestTwo, System.currentTimeMillis() - TimeUnit.HOURS.toMillis(3)); + + requestManager.deleteRequest(requestThree, user, Optional.absent(), Optional.absent()); + requestManager.deleteHistoryParent(requestThree.getId()); + requestManager.activate(requestThree, RequestHistoryType.CREATED, System.currentTimeMillis() - TimeUnit.HOURS.toMillis(4), Optional. absent(), Optional. absent()); + requestManager.cooldown(requestThree, System.currentTimeMillis() - TimeUnit.HOURS.toMillis(3)); + + Assert.assertEquals(2, requestManager.getRequestHistory(requestOne.getId()).size()); + Assert.assertEquals(2, requestManager.getRequestHistory(requestTwo.getId()).size()); + Assert.assertEquals(2, requestManager.getRequestHistory(requestThree.getId()).size()); + + requestHistoryPersister.runActionOnPoll(); + + Assert.assertEquals(0, requestManager.getRequestHistory(requestOne.getId()).size()); + Assert.assertEquals(2, requestManager.getRequestHistory(requestTwo.getId()).size()); + Assert.assertEquals(2, requestManager.getRequestHistory(requestThree.getId()).size()); + } + + @Test + public void testTaskAgePurging() { initLoadBalancedRequest(); initFirstDeploy(); @@ -91,6 +140,59 @@ public void testTaskPurging() { Assert.assertTrue(taskManager.getTaskHistory(taskTwo.getTaskId()).isPresent()); } + @Test + public void testTaskCountPurging() { + initLoadBalancedRequest(); + initFirstDeploy(); + + configuration.setMaxStaleTasksPerRequestInZkWhenNoDatabase(Optional.of(2)); + final int tasksToLaunch = 10; + + final List taskIds = new ArrayList<>(); + + for (int i=0; i tasksBeforePurge = taskManager.getInactiveTaskIdsForDeploy(requestId, firstDeployId); + Assert.assertEquals(taskIds.size(), tasksBeforePurge.size()); + Assert.assertTrue(tasksBeforePurge.containsAll(taskIds)); + + taskHistoryPersister.runActionOnPoll(); + + final List tasksAfterPurge = taskManager.getInactiveTaskIdsForDeploy(requestId, firstDeployId); + Assert.assertEquals(configuration.getMaxStaleTasksPerRequestInZkWhenNoDatabase().get().intValue(), tasksAfterPurge.size()); + Assert.assertTrue(tasksAfterPurge.containsAll(taskIds.subList(tasksToLaunch-2, tasksToLaunch-1))); // we should just have the last 2 tasks + } + + @Test + public void testDeployCountPurging() { + SingularityRequest requestOne = buildRequest("request1"); + SingularityRequest requestTwo = buildRequest("request2"); + + SingularityDeploy requestOneDeployOne = initAndFinishDeploy(requestOne, "r1d1"); + SingularityDeploy requestOneDeployTwo = initAndFinishDeploy(requestOne, "r1d2"); + SingularityDeploy requestOneDeployThree = initAndFinishDeploy(requestOne, "r1d3"); + SingularityDeploy requestOneDeployFour = initAndFinishDeploy(requestOne, "r1d4"); // r1d4 is the active deploy, not eligible for purging + SingularityDeploy requestTwoDeployOne = initAndFinishDeploy(requestTwo, "r2d1"); + SingularityDeploy requestTwoDeployTwo = initAndFinishDeploy(requestTwo, "r2d2"); // r2d2 is the active deploy, not eligible for purging + + configuration.setMaxStaleDeploysPerRequestInZkWhenNoDatabase(Optional.of(2)); + + deployHistoryPersister.runActionOnPoll(); + + Assert.assertTrue(!deployManager.getDeployHistory(requestOneDeployOne.getRequestId(), requestOneDeployOne.getId(), true).isPresent()); + Assert.assertTrue(deployManager.getDeployHistory(requestOneDeployTwo.getRequestId(), requestOneDeployTwo.getId(), true).isPresent()); + Assert.assertTrue(deployManager.getDeployHistory(requestOneDeployThree.getRequestId(), requestOneDeployThree.getId(), true).isPresent()); + Assert.assertTrue(deployManager.getDeployHistory(requestOneDeployFour.getRequestId(), requestOneDeployFour.getId(), true).isPresent()); + Assert.assertTrue(deployManager.getDeployHistory(requestTwoDeployOne.getRequestId(), requestTwoDeployOne.getId(), true).isPresent()); + Assert.assertTrue(deployManager.getDeployHistory(requestTwoDeployTwo.getRequestId(), requestTwoDeployTwo.getId(), true).isPresent()); + } + @Test public void testPurgingDoesntApplyIfDatabasePresent() { initRequest();