Skip to content

Commit

Permalink
migration to clean old zk nodes, namespace active tasks, namespace pending tasks
Browse files Browse the repository at this point in the history
  • Loading branch information
ssalinas committed Apr 22, 2019
1 parent d332fc3 commit d9c4dbf
Show file tree
Hide file tree
Showing 17 changed files with 291 additions and 77 deletions.
Expand Up @@ -21,7 +21,6 @@


import com.codahale.metrics.MetricRegistry; import com.codahale.metrics.MetricRegistry;
import com.google.common.base.Optional; import com.google.common.base.Optional;
import com.google.common.base.Throwables;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import com.google.common.collect.Maps; import com.google.common.collect.Maps;
import com.hubspot.singularity.SingularityId; import com.hubspot.singularity.SingularityId;
Expand Down Expand Up @@ -155,7 +154,7 @@ protected <T extends SingularityId> List<T> getChildrenAsIdsForParents(final Str
try { try {
return getChildrenAsIdsForParentsThrows(pathNameforLogs, parents, idTranscoder); return getChildrenAsIdsForParentsThrows(pathNameforLogs, parents, idTranscoder);
} catch (Throwable t) { } catch (Throwable t) {
throw Throwables.propagate(t); throw new RuntimeException(t);
} }
} }


Expand Down Expand Up @@ -193,7 +192,7 @@ protected <T extends SingularityId> List<T> exists(final String pathNameForLogs,
try { try {
return existsThrows(pathNameForLogs, paths, idTranscoder); return existsThrows(pathNameForLogs, paths, idTranscoder);
} catch (Throwable t) { } catch (Throwable t) {
throw Throwables.propagate(t); throw new RuntimeException(t);
} }
} }


Expand Down Expand Up @@ -227,39 +226,81 @@ protected <T extends SingularityId> List<T> notExists(final String pathNameForLo
try { try {
return notExistsThrows(pathNameForLogs, pathsMap); return notExistsThrows(pathNameForLogs, pathsMap);
} catch (Throwable t) { } catch (Throwable t) {
throw Throwables.propagate(t); throw new RuntimeException(t);
} }
} }


protected <T> List<T> getAsync(final String pathNameForLogs, final Collection<String> paths, final Transcoder<T> transcoder, final ZkCache<T> cache) { protected <T> List<T> getAsync(final String pathNameForLogs, final Collection<String> paths, final Transcoder<T> transcoder, final ZkCache<T> cache) {
try { try {
return new ArrayList<>(getAsyncThrows(pathNameForLogs, paths, transcoder, Optional.of(cache)).values()); return new ArrayList<>(getAsyncThrows(pathNameForLogs, paths, transcoder, Optional.of(cache)).values());
} catch (Throwable t) { } catch (Throwable t) {
throw Throwables.propagate(t); throw new RuntimeException(t);
} }
} }


protected <T> List<T> getAsync(final String pathNameForLogs, final Collection<String> paths, final Transcoder<T> transcoder) { protected <T> List<T> getAsync(final String pathNameForLogs, final Collection<String> paths, final Transcoder<T> transcoder) {
try { try {
return new ArrayList<>(getAsyncThrows(pathNameForLogs, paths, transcoder, Optional.<ZkCache<T>> absent()).values()); return new ArrayList<>(getAsyncThrows(pathNameForLogs, paths, transcoder, Optional.<ZkCache<T>> absent()).values());
} catch (Throwable t) { } catch (Throwable t) {
throw Throwables.propagate(t); throw new RuntimeException(t);
} }
} }


protected <T> Map<String, T> getAsyncWithPath(final String pathNameForLogs, final Collection<String> paths, final Transcoder<T> transcoder) { protected <T> Map<String, T> getAsyncWithPath(final String pathNameForLogs, final Collection<String> paths, final Transcoder<T> transcoder) {
try { try {
return getAsyncThrows(pathNameForLogs, paths, transcoder, Optional.<ZkCache<T>> absent()); return getAsyncThrows(pathNameForLogs, paths, transcoder, Optional.<ZkCache<T>> absent());
} catch (Throwable t) { } catch (Throwable t) {
throw Throwables.propagate(t); throw new RuntimeException(t);
} }
} }


protected <T> List<T> getAsyncChildren(final String parent, final Transcoder<T> transcoder) { protected <T> List<T> getAsyncChildren(final String parent, final Transcoder<T> transcoder) {
try { try {
return getAsyncChildrenThrows(parent, transcoder); return getAsyncChildrenThrows(parent, transcoder);
} catch (Throwable t) { } catch (Throwable t) {
throw Throwables.propagate(t); throw new RuntimeException(t);
}
}

/**
 * Reads and decodes the data of every child node found directly under each of the given parents.
 *
 * <p>The children of each parent are listed first; their full paths are then handed to
 * {@code queryAndReturnResultsThrows} together with a callback that decodes each returned
 * payload with {@code transcoder}.
 *
 * @param pathNameForLogs label forwarded to {@code queryAndReturnResultsThrows}
 * @param parentPaths parent nodes whose direct children should be read
 * @param transcoder converts a node's raw bytes into a {@code T}
 * @return whatever {@code queryAndReturnResultsThrows} returns for the collected results
 * @throws Exception if the underlying query fails
 */
protected <T> List<T> getAsyncNestedChildrenAsListThrows(final String pathNameForLogs, final List<String> parentPaths, final Transcoder<T> transcoder) throws Exception {
  final List<String> childPaths = new ArrayList<>();
  parentPaths.forEach(parent ->
      getChildren(parent).forEach(child -> childPaths.add(ZKPaths.makePath(parent, child))));

  final List<T> decoded = new ArrayList<>();
  // One count per path: the callback counts down exactly once per event, decoded or not.
  final CountDownLatch remaining = new CountDownLatch(childPaths.size());
  final AtomicInteger totalBytes = new AtomicInteger();

  final BackgroundCallback onData = (client, event) -> {
    try {
      final byte[] payload = event.getData();
      if (payload != null && payload.length > 0) {
        // Size is recorded before decoding, so it is counted even if decoding throws.
        totalBytes.getAndAdd(payload.length);
        decoded.add(transcoder.fromBytes(payload));
      } else {
        LOG.trace("Expected active node {} but it wasn't there", event.getPath());
      }
    } finally {
      remaining.countDown();
    }
  };

  return queryAndReturnResultsThrows(decoded, childPaths, onData, remaining, pathNameForLogs, totalBytes, CuratorQueryMethod.GET_DATA);
}

/**
 * Unchecked variant of {@code getAsyncNestedChildrenAsListThrows}.
 *
 * <p>Delegates with the same arguments and rethrows anything the delegate throws
 * (any {@code Throwable}) wrapped as the cause of a new {@code RuntimeException}.
 *
 * @param pathNameForLogs passed through to the delegate
 * @param parentPaths passed through to the delegate
 * @param transcoder passed through to the delegate
 * @return the list returned by the delegate
 */
protected <T> List<T> getAsyncNestedChildrenAsList(final String pathNameForLogs, final List<String> parentPaths, final Transcoder<T> transcoder) {
try {
return getAsyncNestedChildrenAsListThrows(pathNameForLogs, parentPaths, transcoder);
} catch (Throwable t) {
// Wraps every failure, including ones that are already unchecked.
throw new RuntimeException(t);
} }
} }


Expand Down Expand Up @@ -304,7 +345,43 @@ protected <T, Q> Map<T, List<Q>> getAsyncNestedChildDataAsMap(final String pathN
try { try {
return getAsyncNestedChildDataAsMapThrows(pathNameForLogs, parentPathsMap, subpath, transcoder); return getAsyncNestedChildDataAsMapThrows(pathNameForLogs, parentPathsMap, subpath, transcoder);
} catch (Throwable t) { } catch (Throwable t) {
throw Throwables.propagate(t); throw new RuntimeException(t);
}
}

/**
 * Collects the ids found two levels below {@code parentPath}: for every direct child of
 * {@code parentPath}, that child's own children are listed and each name is decoded into a
 * {@code T} with {@code transcoder}.
 *
 * <p>A child of {@code parentPath} that has no children when its listing comes back (including
 * the case where the listing is absent altogether, e.g. the node was removed after the first
 * listing) contributes nothing to the result instead of failing the callback.
 *
 * @param pathNameForLogs label forwarded to {@code queryAndReturnResultsThrows}
 * @param parentPath node whose grandchildren's names are the ids to decode
 * @param transcoder converts a child node name into a {@code T}
 * @return whatever {@code queryAndReturnResultsThrows} returns for the collected ids
 * @throws Exception if the underlying query fails
 */
protected <T extends SingularityId> List<T> getAsyncNestedChildIdsAsListThrows(final String pathNameForLogs, final String parentPath, final IdTranscoder<T> transcoder) throws Exception {
  final List<String> allPaths = new ArrayList<>();
  for (String child : getChildren(parentPath)) {
    allPaths.add(ZKPaths.makePath(parentPath, child));
  }

  final List<T> results = new ArrayList<>();
  // One count per queried path; the callback counts down exactly once per event.
  final CountDownLatch latch = new CountDownLatch(allPaths.size());
  final AtomicInteger bytes = new AtomicInteger();
  final BackgroundCallback callback = new BackgroundCallback() {

    @Override
    public void processResult(CuratorFramework client, CuratorEvent event) throws Exception {
      try {
        final List<String> children = event.getChildren();
        // The event carries no child list when the lookup did not succeed; skip rather than NPE.
        if (children == null || children.isEmpty()) {
          LOG.trace("Expected children for node {} but found none", event.getPath());
          return;
        }
        for (String child : children) {
          final T object = transcoder.fromString(child);
          // Explicit charset so the byte count does not depend on the JVM's platform default.
          bytes.getAndAdd(child.getBytes(java.nio.charset.StandardCharsets.UTF_8).length);
          results.add(object);
        }
      } finally {
        latch.countDown();
      }
    }
  };

  return queryAndReturnResultsThrows(results, allPaths, callback, latch, pathNameForLogs, bytes, CuratorQueryMethod.GET_CHILDREN);
}

/**
 * Unchecked variant of {@code getAsyncNestedChildIdsAsListThrows}.
 *
 * <p>Delegates with the same arguments and rethrows anything the delegate throws
 * (any {@code Throwable}) wrapped as the cause of a new {@code RuntimeException}.
 *
 * @param pathNameForLogs passed through to the delegate
 * @param parentPath passed through to the delegate
 * @param transcoder passed through to the delegate
 * @return the list returned by the delegate
 */
protected <T extends SingularityId> List<T> getAsyncNestedChildIdsAsList(final String pathNameForLogs, final String parentPath, final IdTranscoder<T> transcoder) {
try {
return getAsyncNestedChildIdsAsListThrows(pathNameForLogs, parentPath, transcoder);
} catch (Throwable t) {
// Wraps every failure, including ones that are already unchecked.
throw new RuntimeException(t);
} }
} }


Expand Down
Expand Up @@ -69,7 +69,6 @@ public class TaskManager extends CuratorAsyncManager {


private static final String TASKS_ROOT = "/tasks"; private static final String TASKS_ROOT = "/tasks";


private static final String ACTIVE_PATH_ROOT = TASKS_ROOT + "/active";
private static final String LAST_ACTIVE_TASK_STATUSES_PATH_ROOT = TASKS_ROOT + "/statuses"; private static final String LAST_ACTIVE_TASK_STATUSES_PATH_ROOT = TASKS_ROOT + "/statuses";
private static final String PENDING_PATH_ROOT = TASKS_ROOT + "/scheduled"; private static final String PENDING_PATH_ROOT = TASKS_ROOT + "/scheduled";
private static final String CLEANUP_PATH_ROOT = TASKS_ROOT + "/cleanup"; private static final String CLEANUP_PATH_ROOT = TASKS_ROOT + "/cleanup";
Expand Down Expand Up @@ -158,7 +157,7 @@ public TaskManager(CuratorFramework curator, SingularityConfiguration configurat
// since we can't use creatingParentsIfNeeded in transactions // since we can't use creatingParentsIfNeeded in transactions
public void createRequiredParents() { public void createRequiredParents() {
create(HISTORY_PATH_ROOT); create(HISTORY_PATH_ROOT);
create(ACTIVE_PATH_ROOT); create(LAST_ACTIVE_TASK_STATUSES_PATH_ROOT);
} }


private String getLastHealthcheckPath(SingularityTaskId taskId) { private String getLastHealthcheckPath(SingularityTaskId taskId) {
Expand All @@ -182,7 +181,11 @@ private String getHealthchecksFinishedPath(SingularityTaskId taskId) {
} }


private String getLastActiveTaskStatusPath(SingularityTaskId taskId) { private String getLastActiveTaskStatusPath(SingularityTaskId taskId) {
return ZKPaths.makePath(LAST_ACTIVE_TASK_STATUSES_PATH_ROOT, taskId.getId()); return ZKPaths.makePath(LAST_ACTIVE_TASK_STATUSES_PATH_ROOT, taskId.getRequestId(), taskId.getId());
}

private String getLastActiveTaskParent(String requestId) {
return ZKPaths.makePath(LAST_ACTIVE_TASK_STATUSES_PATH_ROOT, requestId);
} }


private String getHealthcheckPath(SingularityTaskHealthcheckResult healthcheck) { private String getHealthcheckPath(SingularityTaskHealthcheckResult healthcheck) {
Expand Down Expand Up @@ -253,12 +256,12 @@ private String getHistoryPath(SingularityTaskId taskId) {
return ZKPaths.makePath(getRequestPath(taskId.getRequestId()), taskId.getId()); return ZKPaths.makePath(getRequestPath(taskId.getRequestId()), taskId.getId());
} }


private String getActivePath(String taskId) { private String getPendingPath(SingularityPendingTaskId pendingTaskId) {
return ZKPaths.makePath(ACTIVE_PATH_ROOT, taskId); return ZKPaths.makePath(PENDING_PATH_ROOT, pendingTaskId.getRequestId(), pendingTaskId.getId());
} }


private String getPendingPath(SingularityPendingTaskId pendingTaskId) { private String getPendingForRequestPath(String requestId) {
return ZKPaths.makePath(PENDING_PATH_ROOT, pendingTaskId.getId()); return ZKPaths.makePath(PENDING_PATH_ROOT, requestId);
} }


private String getPendingTasksToDeletePath(SingularityPendingTaskId pendingTaskId) { return ZKPaths.makePath(PENDING_TASKS_TO_DELETE_PATH_ROOT, pendingTaskId.getId()); } private String getPendingTasksToDeletePath(SingularityPendingTaskId pendingTaskId) { return ZKPaths.makePath(PENDING_TASKS_TO_DELETE_PATH_ROOT, pendingTaskId.getId()); }
Expand Down Expand Up @@ -287,14 +290,22 @@ public int getNumActiveTasks() {
if (leaderCache.active()) { if (leaderCache.active()) {
return leaderCache.getNumActiveTasks(); return leaderCache.getNumActiveTasks();
} }
return getNumChildren(ACTIVE_PATH_ROOT); int total = 0;
for (String requestId : getChildren(LAST_ACTIVE_TASK_STATUSES_PATH_ROOT)) {
total += getNumChildren(getLastActiveTaskParent(requestId));
}
return total;
} }


public int getNumScheduledTasks() { public int getNumScheduledTasks() {
if (leaderCache.active()) { if (leaderCache.active()) {
return leaderCache.getNumPendingTasks(); return leaderCache.getNumPendingTasks();
} }
return getNumChildren(PENDING_PATH_ROOT); int total = 0;
for (String requestId : getChildren(PENDING_PATH_ROOT)) {
total += getNumChildren(getPendingForRequestPath(requestId));
}
return total;
} }


public void saveLoadBalancerState(SingularityTaskId taskId, LoadBalancerRequestType requestType, SingularityLoadBalancerUpdate lbUpdate) { public void saveLoadBalancerState(SingularityTaskId taskId, LoadBalancerRequestType requestType, SingularityLoadBalancerUpdate lbUpdate) {
Expand Down Expand Up @@ -370,7 +381,14 @@ public List<String> getActiveTaskIdsAsStrings() {
if (leaderCache.active()) { if (leaderCache.active()) {
return leaderCache.getActiveTaskIdsAsStrings(); return leaderCache.getActiveTaskIdsAsStrings();
} }
return getChildren(ACTIVE_PATH_ROOT);
List<String> results = new ArrayList<>();

for (String requestId : getChildren(LAST_ACTIVE_TASK_STATUSES_PATH_ROOT)) {
results.addAll(getChildren(getLastActiveTaskParent(requestId)));
}

return results;
} }


public List<SingularityTaskId> getActiveTaskIds() { public List<SingularityTaskId> getActiveTaskIds() {
Expand All @@ -386,7 +404,7 @@ public List<SingularityTaskId> getActiveTaskIds(boolean useWebCache) {
return webCache.getActiveTaskIds(); return webCache.getActiveTaskIds();
} }


return getTaskIds(ACTIVE_PATH_ROOT); return getAsyncNestedChildIdsAsList(LAST_ACTIVE_TASK_STATUSES_PATH_ROOT, LAST_ACTIVE_TASK_STATUSES_PATH_ROOT, taskIdTranscoder);
} }


public List<SingularityTaskId> getCleanupTaskIds() { public List<SingularityTaskId> getCleanupTaskIds() {
Expand Down Expand Up @@ -609,12 +627,12 @@ public SingularityDeleteResult deleteTaskHistoryUpdate(SingularityTaskId taskId,
return delete(getUpdatePath(taskId, state)); return delete(getUpdatePath(taskId, state));
} }


public boolean isActiveTask(String taskId) { public boolean isActiveTask(SingularityTaskId taskId) {
if (leaderCache.active()) { if (leaderCache.active()) {
return leaderCache.isActiveTask(taskId); return leaderCache.isActiveTask(taskId);
} }


return exists(getActivePath(taskId)); return exists(getLastActiveTaskStatusPath(taskId));
} }


public SingularityCreateResult markUnhealthyKill(SingularityTaskId taskId) { public SingularityCreateResult markUnhealthyKill(SingularityTaskId taskId) {
Expand Down Expand Up @@ -669,7 +687,7 @@ public List<SingularityTaskId> filterActiveTaskIds(List<SingularityTaskId> taskI
final List<String> paths = Lists.newArrayListWithCapacity(taskIds.size()); final List<String> paths = Lists.newArrayListWithCapacity(taskIds.size());


for (SingularityTaskId taskId : taskIds) { for (SingularityTaskId taskId : taskIds) {
paths.add(getActivePath(taskId.getId())); paths.add(getLastActiveTaskStatusPath(taskId));
} }


return exists("filterActiveTaskIds", paths, taskIdTranscoder); return exists("filterActiveTaskIds", paths, taskIdTranscoder);
Expand All @@ -695,7 +713,7 @@ public List<SingularityTaskId> filterInactiveTaskIds(List<SingularityTaskId> tas
final Map<String, SingularityTaskId> pathsMap = Maps.newHashMap(); final Map<String, SingularityTaskId> pathsMap = Maps.newHashMap();


for (SingularityTaskId taskId : taskIds) { for (SingularityTaskId taskId : taskIds) {
pathsMap.put(getActivePath(taskId.getId()), taskId); pathsMap.put(getLastActiveTaskStatusPath(taskId), taskId);
} }


return notExists("filterInactiveTaskIds", pathsMap); return notExists("filterInactiveTaskIds", pathsMap);
Expand Down Expand Up @@ -830,14 +848,14 @@ public boolean taskExistsInZk(SingularityTaskId taskId) {
public void activateLeaderCache() { public void activateLeaderCache() {
leaderCache.cachePendingTasks(fetchPendingTasks()); leaderCache.cachePendingTasks(fetchPendingTasks());
leaderCache.cachePendingTasksToDelete(getPendingTasksMarkedForDeletion()); leaderCache.cachePendingTasksToDelete(getPendingTasksMarkedForDeletion());
leaderCache.cacheActiveTaskIds(getTaskIds(ACTIVE_PATH_ROOT)); leaderCache.cacheActiveTaskIds(getActiveTaskIds(false));
leaderCache.cacheCleanupTasks(fetchCleanupTasks()); leaderCache.cacheCleanupTasks(fetchCleanupTasks());
leaderCache.cacheKilledTasks(fetchKilledTaskIdRecords()); leaderCache.cacheKilledTasks(fetchKilledTaskIdRecords());
leaderCache.cacheTaskHistoryUpdates(getAllTaskHistoryUpdates()); leaderCache.cacheTaskHistoryUpdates(getAllTaskHistoryUpdates());
} }


private List<SingularityPendingTask> fetchPendingTasks() { private List<SingularityPendingTask> fetchPendingTasks() {
return getAsyncChildren(PENDING_PATH_ROOT, pendingTaskTranscoder); return getAsyncNestedChildrenAsList(PENDING_PATH_ROOT, getChildren(PENDING_PATH_ROOT), pendingTaskTranscoder);
} }


public List<SingularityPendingTaskId> getPendingTaskIds() { public List<SingularityPendingTaskId> getPendingTaskIds() {
Expand All @@ -853,22 +871,26 @@ public List<SingularityPendingTaskId> getPendingTaskIds(boolean useWebCache) {
return webCache.getPendingTaskIds(); return webCache.getPendingTaskIds();
} }


return getChildrenAsIds(PENDING_PATH_ROOT, pendingTaskIdTranscoder); return getAsyncNestedChildIdsAsList(PENDING_PATH_ROOT, PENDING_PATH_ROOT, pendingTaskIdTranscoder);
} }


public List<SingularityPendingTaskId> getPendingTaskIdsForRequest(final String requestId) { public List<SingularityPendingTaskId> getPendingTaskIdsForRequest(final String requestId) {
List<SingularityPendingTaskId> pendingTaskIds = getPendingTaskIds(); return getChildrenAsIds(getPendingForRequestPath(requestId), pendingTaskIdTranscoder);
return pendingTaskIds.stream()
.filter(pendingTaskId -> pendingTaskId.getRequestId().equals(requestId))
.collect(Collectors.collectingAndThen(Collectors.toList(), ImmutableList::copyOf));
} }


public List<SingularityPendingTask> getPendingTasksForRequest(final String requestId) { public List<SingularityPendingTask> getPendingTasksForRequest(final String requestId, boolean useWebCache) {
return getAsync( if (leaderCache.active()) {
PENDING_PATH_ROOT, return leaderCache.getPendingTasks().stream()
getPendingTaskIdsForRequest(requestId).stream().map(this::getPendingPath).collect(Collectors.toList()), .filter((p) -> p.getPendingTaskId().getRequestId().equals(requestId))
pendingTaskTranscoder .collect(Collectors.toList());
); }

if (useWebCache && webCache.useCachedPendingTasks()) {
return webCache.getPendingTasks().stream()
.filter((p) -> p.getPendingTaskId().getRequestId().equals(requestId))
.collect(Collectors.toList());
}
return getAsyncChildren(getPendingForRequestPath(requestId), pendingTaskTranscoder);
} }


public List<SingularityPendingTask> getPendingTasks() { public List<SingularityPendingTask> getPendingTasks() {
Expand Down Expand Up @@ -935,7 +957,7 @@ private void createTaskAndDeletePendingTaskPrivate(SingularityTask task) throws


CuratorTransactionFinal transaction = curator.inTransaction().create().forPath(path, taskTranscoder.toBytes(task)).and(); CuratorTransactionFinal transaction = curator.inTransaction().create().forPath(path, taskTranscoder.toBytes(task)).and();


transaction.create().forPath(getActivePath(task.getTaskId().getId())).and().commit(); transaction.create().forPath(getLastActiveTaskStatusPath(task.getTaskId())).and().commit();


leaderCache.putActiveTask(task); leaderCache.putActiveTask(task);
taskCache.set(path, task); taskCache.set(path, task);
Expand Down Expand Up @@ -991,6 +1013,7 @@ public SingularityDeleteResult deleteKilledRecord(SingularityTaskId taskId) {


@Timed @Timed
public SingularityDeleteResult deleteLastActiveTaskStatus(SingularityTaskId taskId) { public SingularityDeleteResult deleteLastActiveTaskStatus(SingularityTaskId taskId) {
leaderCache.deleteActiveTaskId(taskId);
return delete(getLastActiveTaskStatusPath(taskId)); return delete(getLastActiveTaskStatusPath(taskId));
} }


Expand Down Expand Up @@ -1083,11 +1106,6 @@ public SingularityCreateResult createTaskCleanup(SingularityTaskCleanup cleanup)
return result; return result;
} }


public void deleteActiveTask(String taskId) {
leaderCache.deleteActiveTaskId(taskId);
delete(getActivePath(taskId));
}

public void deletePendingTask(SingularityPendingTaskId pendingTaskId) { public void deletePendingTask(SingularityPendingTaskId pendingTaskId) {
leaderCache.deletePendingTask(pendingTaskId); leaderCache.deletePendingTask(pendingTaskId);
delete(getPendingPath(pendingTaskId)); delete(getPendingPath(pendingTaskId));
Expand Down

0 comments on commit d9c4dbf

Please sign in to comment.