Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions Docs/reference/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -124,8 +124,11 @@ These settings are less likely to be changed, but were included in the configura
| newTaskCheckerBaseDelaySeconds | 1 | Added to the the amount of deploy to wait before checking a new task | long |
| allowTestResourceCalls | false | If true, allows calls to be made to the test resource, which can test internal methods | boolean |
| deleteDeploysFromZkWhenNoDatabaseAfterHours | 336 (14 days) | Delete deploys from zk when they are older than this if we are not using a database | long |
| maxStaleDeploysPerRequestInZkWhenNoDatabase | infinite (disabled) | Delete oldest deploys from zk when there are more than this number for a given request, if we're not already persisting them to a database | int |
| deleteStaleRequestsFromZkWhenNoDatabaseAfterHours | 336 (14 days) | Delete stale requests after this amount of time if we are not using a database | long |
| maxRequestsWithHistoryInZkWhenNoDatabase | infinite (disabled) | Delete history of oldest requests from zk when there are more than this number of requests, if we're not already persisting them to a database | int |
| deleteTasksFromZkWhenNoDatabaseAfterHours | 168 (7 days) | Delete old tasks from zk after this amount of time if we are not using a database | long |
| maxStaleTasksPerRequestInZkWhenNoDatabase | infinite (disabled) | Delete oldest tasks from zk when there are more than this number for a given request, if we're not already persisting them to a database | int |
| deleteDeadSlavesAfterHours | 168 (7 days) | Remove dead slaves from the list after this amount of time | long |
| deleteUndeliverableWebhooksAfterHours | 168 (7 days) | Delete (and stop retrying) failed webhooks after this amount of time | long |
| waitForListeners | true | If true, the event system waits for all listeners having processed an event. | boolean |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
import com.google.common.base.Optional;
import com.google.common.collect.ComparisonChain;

public class SingularityRequestHistory implements Comparable<SingularityRequestHistory> {
public class SingularityRequestHistory implements Comparable<SingularityRequestHistory>, SingularityHistoryItem {

private final long createdAt;
private final Optional<String> user;
Expand Down Expand Up @@ -86,6 +86,12 @@ public Optional<String> getMessage() {
return message;
}

@Override
@JsonIgnore
public long getCreateTimestampForCalculatingHistoryAge() {
return createdAt;
}

@Override
public String toString() {
return "SingularityRequestHistory [createdAt=" + createdAt + ", user=" + user + ", eventType=" + eventType + ", request=" + request + ", message=" + message + "]";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,15 @@ public int compare(SingularityTaskId o1, SingularityTaskId o2) {

};

public static Comparator<SingularityTaskId> STARTED_AT_COMPARATOR_DESC = new Comparator<SingularityTaskId>() {

@Override
public int compare(SingularityTaskId o1, SingularityTaskId o2) {
return Long.compare(o2.startedAt, o1.startedAt);
}

};

public static Predicate<SingularityTaskId> notIn(Collection<SingularityTaskId> exclude) {
return Predicates.not(Predicates.in(exclude));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,12 +83,18 @@ public class SingularityConfiguration extends Configuration {

private long deleteDeploysFromZkWhenNoDatabaseAfterHours = TimeUnit.DAYS.toHours(14);

private Optional<Integer> maxStaleDeploysPerRequestInZkWhenNoDatabase = Optional.absent();

private long deleteDeadSlavesAfterHours = TimeUnit.DAYS.toHours(7);

private long deleteStaleRequestsFromZkWhenNoDatabaseAfterHours = TimeUnit.DAYS.toHours(14);

private Optional<Integer> maxRequestsWithHistoryInZkWhenNoDatabase = Optional.absent();

private long deleteTasksFromZkWhenNoDatabaseAfterHours = TimeUnit.DAYS.toHours(7);

private Optional<Integer> maxStaleTasksPerRequestInZkWhenNoDatabase = Optional.absent();

private long deleteUndeliverableWebhooksAfterHours = TimeUnit.DAYS.toHours(7);

private long deltaAfterWhichTasksAreLateMillis = TimeUnit.SECONDS.toMillis(30);
Expand Down Expand Up @@ -925,4 +931,28 @@ public boolean isDeleteRemovedRequestsFromLoadBalancer() {
public void setDeleteRemovedRequestsFromLoadBalancer(boolean deleteRemovedRequestsFromLoadBalancer) {
this.deleteRemovedRequestsFromLoadBalancer = deleteRemovedRequestsFromLoadBalancer;
}

public Optional<Integer> getMaxStaleDeploysPerRequestInZkWhenNoDatabase() {
return maxStaleDeploysPerRequestInZkWhenNoDatabase;
}

public void setMaxStaleDeploysPerRequestInZkWhenNoDatabase(Optional<Integer> maxStaleDeploysPerRequestInZkWhenNoDatabase) {
this.maxStaleDeploysPerRequestInZkWhenNoDatabase = maxStaleDeploysPerRequestInZkWhenNoDatabase;
}

public Optional<Integer> getMaxRequestsWithHistoryInZkWhenNoDatabase() {
return maxRequestsWithHistoryInZkWhenNoDatabase;
}

public void setMaxRequestsWithHistoryInZkWhenNoDatabase(Optional<Integer> maxRequestsWithHistoryInZkWhenNoDatabase) {
this.maxRequestsWithHistoryInZkWhenNoDatabase = maxRequestsWithHistoryInZkWhenNoDatabase;
}

public Optional<Integer> getMaxStaleTasksPerRequestInZkWhenNoDatabase() {
return maxStaleTasksPerRequestInZkWhenNoDatabase;
}

public void setMaxStaleTasksPerRequestInZkWhenNoDatabase(Optional<Integer> maxStaleTasksPerRequestInZkWhenNoDatabase) {
this.maxStaleTasksPerRequestInZkWhenNoDatabase = maxStaleTasksPerRequestInZkWhenNoDatabase;
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.hubspot.singularity.data.history;

import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
Expand All @@ -10,6 +11,7 @@
import org.slf4j.LoggerFactory;

import com.google.common.base.Optional;
import com.google.common.collect.TreeMultimap;
import com.google.inject.Inject;
import com.hubspot.mesos.JavaUtils;
import com.hubspot.singularity.SingularityDeleteResult;
Expand Down Expand Up @@ -43,6 +45,7 @@ public void runActionOnPoll() {

final List<SingularityDeployKey> allDeployIds = deployManager.getAllDeployIds();
final Map<String, SingularityRequestDeployState> byRequestId = deployManager.getAllRequestDeployStatesByRequestId();
final TreeMultimap<String, SingularityDeployHistory> deployHistoryByRequestId = TreeMultimap.create();

int numTotal = 0;
int numTransferred = 0;
Expand All @@ -56,16 +59,22 @@ public void runActionOnPoll() {

Optional<SingularityDeployHistory> deployHistory = deployManager.getDeployHistory(deployKey.getRequestId(), deployKey.getDeployId(), true);

if (!deployHistory.isPresent()) {
if (deployHistory.isPresent()) {
deployHistoryByRequestId.put(deployKey.getRequestId(), deployHistory.get());
} else {
LOG.info("Deploy history for key {} not found", deployKey);
continue;
}
}

if (moveToHistoryOrCheckForPurge(deployHistory.get())) {
numTransferred++;
}
for (Collection<SingularityDeployHistory> deployHistoryForRequest : deployHistoryByRequestId.asMap().values()) {
int i=0;
for (SingularityDeployHistory deployHistory : deployHistoryForRequest) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need to sort the Collection<SingularityDeployHistory> first here? I saw where we were sorting for tasks and for requests, but did not see any sort for deploy history.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The values in a TreeMultimap<> are sorted at insertion time

if (moveToHistoryOrCheckForPurge(deployHistory, i++)) {
numTransferred++;
}

numTotal++;
numTotal++;
}
}

LOG.info("Transferred {} out of {} deploys in {}", numTransferred, numTotal, JavaUtils.duration(start));
Expand All @@ -76,6 +85,11 @@ protected long getMaxAgeInMillisOfItem() {
return TimeUnit.HOURS.toMillis(configuration.getDeleteDeploysFromZkWhenNoDatabaseAfterHours());
}

@Override
protected Optional<Integer> getMaxNumberOfItems() {
return configuration.getMaxStaleDeploysPerRequestInZkWhenNoDatabase();
}

private boolean shouldTransferDeploy(SingularityRequestDeployState deployState, SingularityDeployKey deployKey) {
if (deployState == null) {
LOG.warn("Missing request deploy state for deployKey {}", deployKey);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.base.Optional;
import com.hubspot.mesos.JavaUtils;
import com.hubspot.singularity.SingularityDeleteResult;
import com.hubspot.singularity.SingularityHistoryItem;
Expand Down Expand Up @@ -34,19 +35,21 @@ protected boolean persistsHistoryInsteadOfPurging() {

@Override
protected boolean isEnabled() {
return persistsHistoryInsteadOfPurging() || getMaxAgeInMillisOfItem() > 0;
return persistsHistoryInsteadOfPurging() || getMaxAgeInMillisOfItem() > 0 || getMaxNumberOfItems().isPresent();
}

protected abstract long getMaxAgeInMillisOfItem();

protected abstract Optional<Integer> getMaxNumberOfItems();

protected abstract boolean moveToHistory(T object);

protected abstract SingularityDeleteResult purgeFromZk(T object);

protected boolean moveToHistoryOrCheckForPurge(T object) {
protected boolean moveToHistoryOrCheckForPurge(T object, int index) {
final long start = System.currentTimeMillis();

if (moveToHistoryOrCheckForPurgeAndShouldDelete(object)) {
if (moveToHistoryOrCheckForPurgeAndShouldDelete(object, index)) {
SingularityDeleteResult deleteResult = purgeFromZk(object);
LOG.debug("{} {} (deleted: {}) in {}", persistsHistoryInsteadOfPurging() ? "Persisted" : "Purged", object, deleteResult, JavaUtils.duration(start));
return true;
Expand All @@ -55,7 +58,7 @@ protected boolean moveToHistoryOrCheckForPurge(T object) {
return false;
}

private boolean moveToHistoryOrCheckForPurgeAndShouldDelete(T object) {
private boolean moveToHistoryOrCheckForPurgeAndShouldDelete(T object, int index) {
if (persistsHistoryInsteadOfPurging()) {
return moveToHistory(object);
}
Expand All @@ -67,6 +70,11 @@ private boolean moveToHistoryOrCheckForPurgeAndShouldDelete(T object) {
return true;
}

if (getMaxNumberOfItems().isPresent() && index >= getMaxNumberOfItems().get()) {
LOG.trace("Deleting {} because it is item number {} (max: {})", object, index, getMaxNumberOfItems().get());
return true;
}

return false;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,13 +1,18 @@
package com.hubspot.singularity.data.history;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.TimeUnit;

import javax.inject.Singleton;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.base.Optional;
import com.google.common.primitives.Longs;
import com.google.inject.Inject;
import com.hubspot.mesos.JavaUtils;
import com.hubspot.singularity.SingularityDeleteResult;
Expand All @@ -33,7 +38,7 @@ public SingularityRequestHistoryPersister(SingularityConfiguration configuration
this.historyManager = historyManager;
}

public static class SingularityRequestHistoryParent implements SingularityHistoryItem {
public static class SingularityRequestHistoryParent implements SingularityHistoryItem, Comparable<SingularityRequestHistoryParent> {

private final List<SingularityRequestHistory> history;
private final String requestId;
Expand All @@ -59,6 +64,38 @@ public long getCreateTimestampForCalculatingHistoryAge() {
return createTime;
}

@Override
public int compareTo(SingularityRequestHistoryParent o) {
return Longs.compare(this.getCreateTimestampForCalculatingHistoryAge(), o.getCreateTimestampForCalculatingHistoryAge());
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
SingularityRequestHistoryParent that = (SingularityRequestHistoryParent) o;
return createTime == that.createTime &&
Objects.equals(history, that.history) &&
Objects.equals(requestId, that.requestId);
}

@Override
public int hashCode() {
return Objects.hash(history, requestId, createTime);
}

@Override
public String toString() {
return "SingularityRequestHistoryParent[" +
"history=" + history +
", requestId='" + requestId + '\'' +
", createTime=" + createTime +
']';
}
}

@Override
Expand All @@ -67,30 +104,36 @@ public void runActionOnPoll() {

final long start = System.currentTimeMillis();

final List<String> requestIdsWithHistory = requestManager.getRequestIdsWithHistory();
final List<SingularityRequestHistoryParent> requestHistoryParents = new ArrayList();

int numHistoryTransferred = 0;
int numRequests = 0;

for (String requestId : requestIdsWithHistory) {
numRequests++;
for (String requestId : requestManager.getRequestIdsWithHistory()) {
requestHistoryParents.add(new SingularityRequestHistoryParent(requestManager.getRequestHistory(requestId), requestId));
}

List<SingularityRequestHistory> historyForRequestId = requestManager.getRequestHistory(requestId);
SingularityRequestHistoryParent requestHistoryParent = new SingularityRequestHistoryParent(historyForRequestId, requestId);
Collections.sort(requestHistoryParents, Collections.<SingularityRequestHistoryParent>reverseOrder()); // createdAt descending

if (moveToHistoryOrCheckForPurge(requestHistoryParent)) {
int i=0;
for (SingularityRequestHistoryParent requestHistoryParent : requestHistoryParents) {
if (moveToHistoryOrCheckForPurge(requestHistoryParent, i++)) {
numHistoryTransferred += requestHistoryParent.history.size();
}
}

LOG.info("Transferred {} history updates for {} requests in {}", numHistoryTransferred, numRequests, JavaUtils.duration(start));
LOG.info("Transferred {} history updates for {} requests in {}", numHistoryTransferred, requestHistoryParents.size(), JavaUtils.duration(start));
}

@Override
protected long getMaxAgeInMillisOfItem() {
return TimeUnit.HOURS.toMillis(configuration.getDeleteStaleRequestsFromZkWhenNoDatabaseAfterHours());
}

@Override
protected Optional<Integer> getMaxNumberOfItems() {
return configuration.getMaxRequestsWithHistoryInZkWhenNoDatabase();
}

@Override
protected boolean moveToHistory(SingularityRequestHistoryParent object) {
for (SingularityRequestHistory requestHistory : object.history) {
Expand Down
Loading