Skip to content

Commit

Permalink
Merge pull request #1369 from HubSpot/history_purge_thresholds
Browse files Browse the repository at this point in the history
Additional threshold for deleting task history row
  • Loading branch information
ssalinas committed Dec 14, 2016
2 parents c5818cc + 868bbae commit c1a4f22
Show file tree
Hide file tree
Showing 9 changed files with 152 additions and 91 deletions.

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package com.hubspot.singularity.config;

import com.google.common.base.Optional;

public class HistoryPurgeRequestSettings {
private Optional<Integer> deleteTaskHistoryAfterDays = Optional.absent();
private Optional<Integer> deleteTaskHistoryAfterTasksPerRequest = Optional.absent();
private Optional<Integer> deleteTaskHistoryBytesAfterDays = Optional.absent();
private Optional<Integer> deleteTaskHistoryBytesAfterTasksPerRequest = Optional.absent();

public Optional<Integer> getDeleteTaskHistoryAfterDays() {
return deleteTaskHistoryAfterDays;
}

public HistoryPurgeRequestSettings setDeleteTaskHistoryAfterDays(Optional<Integer> deleteTaskHistoryAfterDays) {
this.deleteTaskHistoryAfterDays = deleteTaskHistoryAfterDays;
return this;
}

public Optional<Integer> getDeleteTaskHistoryAfterTasksPerRequest() {
return deleteTaskHistoryAfterTasksPerRequest;
}

public void setDeleteTaskHistoryAfterTasksPerRequest(Optional<Integer> deleteTaskHistoryAfterTasksPerRequest) {
this.deleteTaskHistoryAfterTasksPerRequest = deleteTaskHistoryAfterTasksPerRequest;
}

public Optional<Integer> getDeleteTaskHistoryBytesAfterDays() {
return deleteTaskHistoryBytesAfterDays;
}

public void setDeleteTaskHistoryBytesAfterDays(Optional<Integer> deleteTaskHistoryBytesAfterDays) {
this.deleteTaskHistoryBytesAfterDays = deleteTaskHistoryBytesAfterDays;
}

public Optional<Integer> getDeleteTaskHistoryBytesAfterTasksPerRequest() {
return deleteTaskHistoryBytesAfterTasksPerRequest;
}

public void setDeleteTaskHistoryBytesAfterTasksPerRequest(Optional<Integer> deleteTaskHistoryBytesAfterTasksPerRequest) {
this.deleteTaskHistoryBytesAfterTasksPerRequest = deleteTaskHistoryBytesAfterTasksPerRequest;
}

@Override
public String toString() {
return "HistoryPurgeRequestSettings{" +
"deleteTaskHistoryAfterDays=" + deleteTaskHistoryAfterDays +
", deleteTaskHistoryAfterTasksPerRequest=" + deleteTaskHistoryAfterTasksPerRequest +
", deleteTaskHistoryBytesAfterDays=" + deleteTaskHistoryBytesAfterDays +
", deleteTaskHistoryBytesAfterTasksPerRequest=" + deleteTaskHistoryBytesAfterTasksPerRequest +
'}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,17 +7,21 @@

public class HistoryPurgingConfiguration {

private int deleteTaskHistoryAfterDays = 365;
private int deleteTaskHistoryAfterDays = 0;

private int deleteTaskHistoryAfterTasksPerRequest = 10000;
private int deleteTaskHistoryAfterTasksPerRequest = 0;

private boolean deleteTaskHistoryBytesInsteadOfEntireRow = true;
private int deleteTaskHistoryBytesAfterDays = 365;

private int deleteTaskHistoryBytesAfterTasksPerRequest = 10000;

private int checkTaskHistoryEveryHours = 24;

private boolean enabled = false;

private Map<String, HistoryPurgeRequestOverride> requestOverrides = Collections.emptyMap();
private int purgeLimitPerQuery = 25000;

private Map<String, HistoryPurgeRequestSettings> requestOverrides = Collections.emptyMap();

private Optional<Integer> absentIfNotOverOne(int value) {
if (value < 1) {
Expand All @@ -42,12 +46,20 @@ public void setDeleteTaskHistoryAfterTasksPerRequest(int deleteTaskHistoryAfterT
this.deleteTaskHistoryAfterTasksPerRequest = deleteTaskHistoryAfterTasksPerRequest;
}

public boolean isDeleteTaskHistoryBytesInsteadOfEntireRow() {
return deleteTaskHistoryBytesInsteadOfEntireRow;
public Optional<Integer> getDeleteTaskHistoryBytesAfterDays() {
return absentIfNotOverOne(deleteTaskHistoryBytesAfterDays);
}

public void setDeleteTaskHistoryBytesAfterDays(int deleteTaskHistoryBytesAfterDays) {
this.deleteTaskHistoryBytesAfterDays = deleteTaskHistoryBytesAfterDays;
}

public Optional<Integer> getDeleteTaskHistoryBytesAfterTasksPerRequest() {
return absentIfNotOverOne(deleteTaskHistoryBytesAfterTasksPerRequest);
}

public void setDeleteTaskHistoryBytesInsteadOfEntireRow(boolean deleteTaskHistoryBytesInsteadOfEntireRow) {
this.deleteTaskHistoryBytesInsteadOfEntireRow = deleteTaskHistoryBytesInsteadOfEntireRow;
public void setDeleteTaskHistoryBytesAfterTasksPerRequest(int deleteTaskHistoryBytesAfterTasksPerRequest) {
this.deleteTaskHistoryBytesAfterTasksPerRequest = deleteTaskHistoryBytesAfterTasksPerRequest;
}

public int getCheckTaskHistoryEveryHours() {
Expand All @@ -66,11 +78,19 @@ public void setEnabled(boolean enabled) {
this.enabled = enabled;
}

public Map<String, HistoryPurgeRequestOverride> getRequestOverrides() {
public Map<String, HistoryPurgeRequestSettings> getRequestOverrides() {
return requestOverrides;
}

public void setRequestOverrides(Map<String, HistoryPurgeRequestOverride> requestOverrides) {
public void setRequestOverrides(Map<String, HistoryPurgeRequestSettings> requestOverrides) {
this.requestOverrides = requestOverrides;
}

public int getPurgeLimitPerQuery() {
return purgeLimitPerQuery;
}

public void setPurgeLimitPerQuery(int purgeLimitPerQuery) {
this.purgeLimitPerQuery = purgeLimitPerQuery;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -68,11 +68,11 @@ abstract void insertTaskHistory(@Bind("requestId") String requestId, @Bind("task
@SqlQuery("SELECT MIN(updatedAt) from (SELECT updatedAt FROM taskHistory WHERE requestId = :requestId ORDER BY updatedAt DESC LIMIT :limit) as alias")
abstract Date getMinUpdatedAtWithLimitForRequest(@Bind("requestId") String requestId, @Bind("limit") Integer limit);

@SqlUpdate("UPDATE taskHistory SET bytes = '', purged = true WHERE requestId = :requestId AND purged = false AND updatedAt \\< :updatedAtBefore")
abstract void updateTaskHistoryNullBytesForRequestBefore(@Bind("requestId") String requestId, @Bind("updatedAtBefore") Date updatedAtBefore);
@SqlUpdate("UPDATE taskHistory SET bytes = '', purged = true WHERE requestId = :requestId AND purged = false AND updatedAt \\< :updatedAtBefore LIMIT :purgeLimitPerQuery")
abstract void updateTaskHistoryNullBytesForRequestBefore(@Bind("requestId") String requestId, @Bind("updatedAtBefore") Date updatedAtBefore, @Bind("purgeLimitPerQuery") Integer purgeLimitPerQuery);

@SqlUpdate("DELETE FROM taskHistory WHERE requestId = :requestId AND updatedAt \\< :updatedAtBefore")
abstract void deleteTaskHistoryForRequestBefore(@Bind("requestId") String requestId, @Bind("updatedAtBefore") Date updatedAtBefore);
@SqlUpdate("DELETE FROM taskHistory WHERE requestId = :requestId AND updatedAt \\< :updatedAtBefore LIMIT :purgeLimitPerQuery")
abstract void deleteTaskHistoryForRequestBefore(@Bind("requestId") String requestId, @Bind("updatedAtBefore") Date updatedAtBefore, @Bind("purgeLimitPerQuery") Integer purgeLimitPerQuery);

@SqlQuery("SELECT DISTINCT requestId FROM taskHistory")
abstract List<String> getRequestIdsInTaskHistory();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,6 @@ int getTaskIdHistoryCount(Optional<String> requestId, Optional<String> deployId,

int getUnpurgedTaskHistoryCountByRequestBefore(String requestId, Date before);

void purgeTaskHistory(String requestId, int count, Optional<Integer> limit, Optional<Date> purgeBefore, boolean deleteRowInsteadOfUpdate);
void purgeTaskHistory(String requestId, int count, Optional<Integer> limit, Optional<Date> purgeBefore, boolean deleteRowInsteadOfUpdate, Integer maxPurgeCount);

}
Original file line number Diff line number Diff line change
Expand Up @@ -186,30 +186,30 @@ public int getUnpurgedTaskHistoryCountByRequestBefore(String requestId, Date bef
}

@Override
public void purgeTaskHistory(String requestId, int count, Optional<Integer> limit, Optional<Date> purgeBefore, boolean deleteRowInsteadOfUpdate) {
public void purgeTaskHistory(String requestId, int count, Optional<Integer> limit, Optional<Date> purgeBefore, boolean deleteRowInsteadOfUpdate, Integer maxPurgeCount) {
if (limit.isPresent() && count > limit.get()) {
Date beforeBasedOnLimit = history.getMinUpdatedAtWithLimitForRequest(requestId, limit.get());

if (deleteRowInsteadOfUpdate) {
LOG.debug("Deleting task history for {} above {} items (before {})", requestId, limit.get(), beforeBasedOnLimit);

history.deleteTaskHistoryForRequestBefore(requestId, beforeBasedOnLimit);
history.deleteTaskHistoryForRequestBefore(requestId, beforeBasedOnLimit, maxPurgeCount);
} else {
LOG.debug("Purging task history bytes for {} above {} items (before {})", requestId, limit.get(), beforeBasedOnLimit);

history.updateTaskHistoryNullBytesForRequestBefore(requestId, beforeBasedOnLimit);
history.updateTaskHistoryNullBytesForRequestBefore(requestId, beforeBasedOnLimit, maxPurgeCount);
}
}

if (purgeBefore.isPresent()) {
if (deleteRowInsteadOfUpdate) {
LOG.debug("Deleting task history for {} before {}", requestId, purgeBefore.get());

history.deleteTaskHistoryForRequestBefore(requestId, purgeBefore.get());
history.deleteTaskHistoryForRequestBefore(requestId, purgeBefore.get(), maxPurgeCount);
} else {
LOG.debug("Purging task history bytes for {} before {}", requestId, purgeBefore.get());

history.updateTaskHistoryNullBytesForRequestBefore(requestId, purgeBefore.get());
history.updateTaskHistoryNullBytesForRequestBefore(requestId, purgeBefore.get(), maxPurgeCount);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ public int getUnpurgedTaskHistoryCountByRequestBefore(String requestId, Date bef
}

@Override
public void purgeTaskHistory(String requestId, int count, Optional<Integer> limit, Optional<Date> purgeBefore, boolean deleteRowInsteadOfUpdate) {
public void purgeTaskHistory(String requestId, int count, Optional<Integer> limit, Optional<Date> purgeBefore, boolean deleteRowInsteadOfUpdate, Integer maxPurgeCount) {
throw new UnsupportedOperationException("NoopHistoryManager can not update/delete");
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@
import com.google.common.base.Optional;
import com.google.inject.Inject;
import com.hubspot.mesos.JavaUtils;
import com.hubspot.singularity.config.HistoryPurgeRequestOverride;
import com.hubspot.singularity.ExtendedTaskState;
import com.hubspot.singularity.config.HistoryPurgeRequestSettings;
import com.hubspot.singularity.config.HistoryPurgingConfiguration;
import com.hubspot.singularity.scheduler.SingularityLeaderOnlyPoller;

Expand Down Expand Up @@ -40,46 +41,71 @@ protected boolean isEnabled() {
public void runActionOnPoll() {
final long start = System.currentTimeMillis();
for (String requestId : historyManager.getRequestIdsInTaskHistory()) {
Optional<Integer> deleteTaskHistoryAfterDays;
Optional<Integer> deleteTaskHistoryAfterTasksPerRequest;
boolean deleteTaskHistoryBytesInsteadOfEntireRow;
if (historyPurgingConfiguration.getRequestOverrides().containsKey(requestId)) {
HistoryPurgeRequestOverride override = historyPurgingConfiguration.getRequestOverrides().get(requestId);
deleteTaskHistoryAfterDays = override.getDeleteTaskHistoryAfterDays().or(historyPurgingConfiguration.getDeleteTaskHistoryAfterDays());
deleteTaskHistoryAfterTasksPerRequest = override.getDeleteTaskHistoryAfterTasksPerRequest().or(historyPurgingConfiguration.getDeleteTaskHistoryAfterTasksPerRequest());
deleteTaskHistoryBytesInsteadOfEntireRow = override.getDeleteTaskHistoryBytesInsteadOfEntireRow().or(historyPurgingConfiguration.isDeleteTaskHistoryBytesInsteadOfEntireRow());
} else {
deleteTaskHistoryAfterDays = historyPurgingConfiguration.getDeleteTaskHistoryAfterDays();
deleteTaskHistoryAfterTasksPerRequest = historyPurgingConfiguration.getDeleteTaskHistoryAfterTasksPerRequest();
deleteTaskHistoryBytesInsteadOfEntireRow = historyPurgingConfiguration.isDeleteTaskHistoryBytesInsteadOfEntireRow();
}
HistoryPurgeRequestSettings settings = getRequestPurgeSettings(requestId);

LOG.debug("Attempting to purge tasks for {}, using purge settings {}", requestId, settings);
purge(requestId, start, settings.getDeleteTaskHistoryAfterTasksPerRequest(), settings.getDeleteTaskHistoryAfterDays(), true);
purge(requestId, start, settings.getDeleteTaskHistoryBytesAfterTasksPerRequest(), settings.getDeleteTaskHistoryBytesAfterDays(), false);
}
}

Optional<Date> purgeBefore = Optional.absent();
Date checkBefore = new Date();
private void purge(String requestId, long start, Optional<Integer> afterTasksPerRequest, Optional<Integer> afterDays, boolean deleteRow) {
Optional<Date> purgeBefore = Optional.absent();
Date checkBefore = new Date();

if (deleteTaskHistoryAfterDays.isPresent()) {
purgeBefore = Optional.of(new Date(start - TimeUnit.DAYS.toMillis(deleteTaskHistoryAfterDays.get().longValue())));
if (afterDays.isPresent()) {
purgeBefore = Optional.of(new Date(start - TimeUnit.DAYS.toMillis(afterDays.get().longValue())));

if (!deleteTaskHistoryAfterTasksPerRequest.isPresent()) {
checkBefore = purgeBefore.get();
}
if (!afterTasksPerRequest.isPresent()) {
checkBefore = purgeBefore.get();
}
}

LOG.info("Finding taskHistory counts before {} (purging tasks over limit of {} or created before {}) for request {}", checkBefore, deleteTaskHistoryAfterTasksPerRequest, purgeBefore, requestId);
LOG.info("Finding taskHistory counts before {} (purging tasks over limit of {} or created before {}) for request {}", checkBefore, afterTasksPerRequest, purgeBefore, requestId);

int unpurgedCount = historyManager.getUnpurgedTaskHistoryCountByRequestBefore(requestId, checkBefore);
int unpurgedCount;
if (deleteRow) {
unpurgedCount = historyManager.getTaskIdHistoryCount(Optional.of(requestId), Optional.<String>absent(), Optional.<String>absent(), Optional.<String>absent(), Optional.<ExtendedTaskState>absent(), Optional.<Long>absent(), Optional.<Long>absent(), Optional.<Long>absent(), Optional.<Long>absent());
} else {
unpurgedCount = historyManager.getUnpurgedTaskHistoryCountByRequestBefore(requestId, checkBefore);
}

if (!deleteTaskHistoryAfterDays.isPresent() && deleteTaskHistoryAfterTasksPerRequest.isPresent() &&
unpurgedCount < deleteTaskHistoryAfterTasksPerRequest.get()) {
LOG.debug("Not purging old taskHistory for {} - {} count is less than {}", requestId, unpurgedCount, deleteTaskHistoryAfterTasksPerRequest.get());
continue;
}
if (!afterDays.isPresent() && afterTasksPerRequest.isPresent() &&
unpurgedCount < afterTasksPerRequest.get()) {
LOG.debug("Not purging old taskHistory for {} - {} count is less than {}", requestId, unpurgedCount, afterTasksPerRequest.get());
return;
}

final long startRequestId = System.currentTimeMillis();

final long startRequestId = System.currentTimeMillis();
historyManager.purgeTaskHistory(requestId, unpurgedCount, afterTasksPerRequest, purgeBefore, deleteRow, historyPurgingConfiguration.getPurgeLimitPerQuery());

historyManager.purgeTaskHistory(requestId, unpurgedCount, deleteTaskHistoryAfterTasksPerRequest, purgeBefore, !deleteTaskHistoryBytesInsteadOfEntireRow);
LOG.info("Purged old taskHistory for {} ({} count) in {} (deleteRows: {})", requestId, unpurgedCount, JavaUtils.duration(startRequestId), deleteRow);
}

LOG.info("Purged old taskHistory for {} ({} count) in {}", requestId, unpurgedCount, JavaUtils.duration(startRequestId));
private HistoryPurgeRequestSettings getRequestPurgeSettings(String requestId) {
if (historyPurgingConfiguration.getRequestOverrides().containsKey(requestId)) {
HistoryPurgeRequestSettings override = historyPurgingConfiguration.getRequestOverrides().get(requestId);
if (!override.getDeleteTaskHistoryAfterDays().isPresent()) {
override.setDeleteTaskHistoryAfterDays(historyPurgingConfiguration.getDeleteTaskHistoryAfterDays());
}
if (!override.getDeleteTaskHistoryAfterTasksPerRequest().isPresent()) {
override.setDeleteTaskHistoryAfterTasksPerRequest(historyPurgingConfiguration.getDeleteTaskHistoryAfterTasksPerRequest());
}
if (!override.getDeleteTaskHistoryBytesAfterDays().isPresent()) {
override.setDeleteTaskHistoryBytesAfterDays(historyPurgingConfiguration.getDeleteTaskHistoryBytesAfterDays());
}
if (!override.getDeleteTaskHistoryBytesAfterTasksPerRequest().isPresent()) {
override.setDeleteTaskHistoryBytesAfterTasksPerRequest(historyPurgingConfiguration.getDeleteTaskHistoryBytesAfterTasksPerRequest());
}
return override;
} else {
HistoryPurgeRequestSettings settings = new HistoryPurgeRequestSettings();
settings.setDeleteTaskHistoryAfterDays(historyPurgingConfiguration.getDeleteTaskHistoryAfterDays());
settings.setDeleteTaskHistoryAfterTasksPerRequest(historyPurgingConfiguration.getDeleteTaskHistoryAfterTasksPerRequest());
settings.setDeleteTaskHistoryBytesAfterDays(historyPurgingConfiguration.getDeleteTaskHistoryBytesAfterDays());
settings.setDeleteTaskHistoryBytesAfterTasksPerRequest(historyPurgingConfiguration.getDeleteTaskHistoryBytesAfterTasksPerRequest());
return settings;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,8 +130,7 @@ public void historyUpdaterTest() {

HistoryPurgingConfiguration historyPurgingConfiguration = new HistoryPurgingConfiguration();
historyPurgingConfiguration.setEnabled(true);
historyPurgingConfiguration.setDeleteTaskHistoryBytesInsteadOfEntireRow(true);
historyPurgingConfiguration.setDeleteTaskHistoryAfterDays(1);
historyPurgingConfiguration.setDeleteTaskHistoryBytesAfterDays(1);

SingularityTaskHistory taskHistory = buildTask(System.currentTimeMillis() - TimeUnit.DAYS.toMillis(3));

Expand Down Expand Up @@ -161,7 +160,6 @@ public void historyPurgerTest() {

HistoryPurgingConfiguration historyPurgingConfiguration = new HistoryPurgingConfiguration();
historyPurgingConfiguration.setEnabled(true);
historyPurgingConfiguration.setDeleteTaskHistoryBytesInsteadOfEntireRow(false);
historyPurgingConfiguration.setDeleteTaskHistoryAfterDays(10);

SingularityHistoryPurger purger = new SingularityHistoryPurger(historyPurgingConfiguration, historyManager);
Expand Down

0 comments on commit c1a4f22

Please sign in to comment.