Skip to content

Commit

Permalink
more consolidation in CuratorAsyncManager
Browse files Browse the repository at this point in the history
  • Loading branch information
ssalinas committed Mar 3, 2016
1 parent d281f39 commit 0d6de9a
Showing 1 changed file with 19 additions and 19 deletions.
Expand Up @@ -37,6 +37,10 @@ public CuratorAsyncManager(CuratorFramework curator, SingularityConfiguration co
super(curator, configuration, metricRegistry);
}

private enum CuratorQueryMethod {
GET_DATA, CHECK_EXISTS, GET_CHILDREN
}

private <T> List<T> getAsyncChildrenThrows(final String parent, final Transcoder<T> transcoder) throws Exception {
final List<String> children = getChildren(parent);
final List<String> paths = Lists.newArrayListWithCapacity(children.size());
Expand Down Expand Up @@ -95,7 +99,7 @@ public void processResult(CuratorFramework client, CuratorEvent event) throws Ex
}
};

return getDataAndReturnResultsThrows(objects, paths, callback, latch, pathNameForLogs, bytes);
return queryAndReturnResultsThrows(objects, paths, callback, latch, pathNameForLogs, bytes, CuratorQueryMethod.GET_DATA);
}

private void checkLatch(CountDownLatch latch, String path) throws InterruptedException {
Expand Down Expand Up @@ -131,19 +135,7 @@ public void processResult(CuratorFramework client, CuratorEvent event) throws Ex
}
};

final long start = System.currentTimeMillis();

try {
for (String parent : parents) {
curator.getChildren().inBackground(callback).forPath(parent);
}

checkLatch(latch, pathNameforLogs);
} finally {
log(OperationType.READ, Optional.<Integer> of(objects.size()), Optional.<Integer> absent(), start, pathNameforLogs);
}

return objects;
return queryAndReturnResultsThrows(objects, parents, callback, latch, pathNameforLogs, new AtomicInteger(), CuratorQueryMethod.GET_CHILDREN);
}

protected <T extends SingularityId> List<T> getChildrenAsIdsForParents(final String pathNameforLogs, final Collection<String> parents, final IdTranscoder<T> idTranscoder) {
Expand Down Expand Up @@ -183,7 +175,7 @@ public void processResult(CuratorFramework client, CuratorEvent event) throws Ex
}
};

return getDataAndReturnResultsThrows(objects, paths, callback, latch, pathNameforLogs, new AtomicInteger());
return queryAndReturnResultsThrows(objects, paths, callback, latch, pathNameforLogs, new AtomicInteger(), CuratorQueryMethod.CHECK_EXISTS);
}

protected <T extends SingularityId> List<T> exists(final String pathNameForLogs, final Collection<String> paths, final IdTranscoder<T> idTranscoder) {
Expand Down Expand Up @@ -215,7 +207,7 @@ public void processResult(CuratorFramework client, CuratorEvent event) throws Ex
}
};

return getDataAndReturnResultsThrows(objects, pathsMap.keySet(), callback, latch, pathNameforLogs, new AtomicInteger());
return queryAndReturnResultsThrows(objects, pathsMap.keySet(), callback, latch, pathNameforLogs, new AtomicInteger(), CuratorQueryMethod.CHECK_EXISTS);
}

protected <T extends SingularityId> List<T> notExists(final String pathNameForLogs, final Map<String, T> pathsMap) {
Expand Down Expand Up @@ -285,7 +277,7 @@ public void processResult(CuratorFramework client, CuratorEvent event) throws Ex
}
};

return getDataAndReturnResultsThrows(resultsMap, allPathsMap.keySet(), callback, latch, pathNameForLogs, bytes);
return queryAndReturnResultsThrows(resultsMap, allPathsMap.keySet(), callback, latch, pathNameForLogs, bytes, CuratorQueryMethod.GET_DATA);
}

protected <T, Q> Map<T, List<Q>> getAsyncAsMap(final String pathNameForLogs, final Map<String, T> parentPathsMap, final String subpath, final Transcoder<Q> transcoder) {
Expand All @@ -296,12 +288,20 @@ protected <T, Q> Map<T, List<Q>> getAsyncAsMap(final String pathNameForLogs, fin
}
}

private <T> T getDataAndReturnResultsThrows(T results, Collection<String> paths, BackgroundCallback callback, CountDownLatch latch, String pathNameForLogs, AtomicInteger bytes) throws Exception {
private <T> T queryAndReturnResultsThrows(T results, Collection<String> paths, BackgroundCallback callback, CountDownLatch latch, String pathNameForLogs, AtomicInteger bytes, CuratorQueryMethod method) throws Exception {
final long start = System.currentTimeMillis();

try {
for (String path : paths) {
curator.getData().inBackground(callback).forPath(path);
switch (method) {
case GET_DATA:
curator.getData().inBackground(callback).forPath(path);
case GET_CHILDREN:
curator.getChildren().inBackground(callback).forPath(path);
case CHECK_EXISTS:
default:
curator.checkExists().inBackground(callback).forPath(path);
}
}

checkLatch(latch, pathNameForLogs);
Expand Down

0 comments on commit 0d6de9a

Please sign in to comment.