Skip to content

Commit

Permalink
STORM-3168 prevent AsyncLocalizer cleanup from crashing
Browse files Browse the repository at this point in the history
  • Loading branch information
Aaron Gresch committed Aug 2, 2018
1 parent 146beff commit 5c6e102
Showing 1 changed file with 53 additions and 46 deletions.
Expand Up @@ -567,60 +567,67 @@ private void forEachTopologyDistDir(ConsumePathAndId consumer) throws IOExceptio

@VisibleForTesting
void cleanup() {
LocalizedResourceRetentionSet toClean = new LocalizedResourceRetentionSet(cacheTargetSize);
// need one large set of all and then clean via LRU
for (Map.Entry<String, ConcurrentHashMap<String, LocalizedResource>> t : userArchives.entrySet()) {
toClean.addResources(t.getValue());
LOG.debug("Resources to be cleaned after adding {} archives : {}", t.getKey(), toClean);
}
try {
LocalizedResourceRetentionSet toClean = new LocalizedResourceRetentionSet(cacheTargetSize);
// need one large set of all and then clean via LRU
for (Map.Entry<String, ConcurrentHashMap<String, LocalizedResource>> t : userArchives.entrySet()) {
toClean.addResources(t.getValue());
LOG.debug("Resources to be cleaned after adding {} archives : {}", t.getKey(), toClean);
}

for (Map.Entry<String, ConcurrentHashMap<String, LocalizedResource>> t : userFiles.entrySet()) {
toClean.addResources(t.getValue());
LOG.debug("Resources to be cleaned after adding {} files : {}", t.getKey(), toClean);
}
for (Map.Entry<String, ConcurrentHashMap<String, LocalizedResource>> t : userFiles.entrySet()) {
toClean.addResources(t.getValue());
LOG.debug("Resources to be cleaned after adding {} files : {}", t.getKey(), toClean);
}

toClean.addResources(topologyBlobs);
try (ClientBlobStore store = getClientBlobStore()) {
toClean.cleanup(store);
}
toClean.addResources(topologyBlobs);
try (ClientBlobStore store = getClientBlobStore()) {
toClean.cleanup(store);
}

HashSet<String> safeTopologyIds = new HashSet<>();
for (String blobKey : topologyBlobs.keySet()) {
safeTopologyIds.add(ConfigUtils.getIdFromBlobKey(blobKey));
}
HashSet<String> safeTopologyIds = new HashSet<>();
for (String blobKey : topologyBlobs.keySet()) {
safeTopologyIds.add(ConfigUtils.getIdFromBlobKey(blobKey));
}

//Deleting this early does not hurt anything
topologyBasicDownloaded.keySet().removeIf(topoId -> !safeTopologyIds.contains(topoId));
blobPending.keySet().removeIf(topoId -> !safeTopologyIds.contains(topoId));
//Deleting this early does not hurt anything
topologyBasicDownloaded.keySet().removeIf(topoId -> !safeTopologyIds.contains(topoId));
blobPending.keySet().removeIf(topoId -> !safeTopologyIds.contains(topoId));

try {
forEachTopologyDistDir((p, topologyId) -> {
if (!safeTopologyIds.contains(topologyId)) {
fsOps.deleteIfExists(p.toFile());
}
});
} catch (Exception e) {
LOG.error("Could not read topology directories for cleanup", e);
}
try {
forEachTopologyDistDir((p, topologyId) -> {
if (!safeTopologyIds.contains(topologyId)) {
fsOps.deleteIfExists(p.toFile());
}
});
} catch (Exception e) {
LOG.error("Could not read topology directories for cleanup", e);
}

LOG.debug("Resource cleanup: {}", toClean);
Set<String> allUsers = new HashSet<>(userArchives.keySet());
allUsers.addAll(userFiles.keySet());
for (String user : allUsers) {
ConcurrentMap<String, LocalizedResource> filesForUser = userFiles.get(user);
ConcurrentMap<String, LocalizedResource> archivesForUser = userArchives.get(user);
if ((filesForUser == null || filesForUser.size() == 0)
&& (archivesForUser == null || archivesForUser.size() == 0)) {

LOG.debug("removing empty set: {}", user);
try {
LocalizedResource.completelyRemoveUnusedUser(localBaseDir, user);
userFiles.remove(user);
userArchives.remove(user);
} catch (IOException e) {
LOG.error("Error trying to delete cached user files", e);
LOG.debug("Resource cleanup: {}", toClean);
Set<String> allUsers = new HashSet<>(userArchives.keySet());
allUsers.addAll(userFiles.keySet());
for (String user : allUsers) {
ConcurrentMap<String, LocalizedResource> filesForUser = userFiles.get(user);
ConcurrentMap<String, LocalizedResource> archivesForUser = userArchives.get(user);
if ((filesForUser == null || filesForUser.size() == 0)
&& (archivesForUser == null || archivesForUser.size() == 0)) {

LOG.debug("removing empty set: {}", user);
try {
LocalizedResource.completelyRemoveUnusedUser(localBaseDir, user);
userFiles.remove(user);
userArchives.remove(user);
} catch (IOException e) {
LOG.error("Error trying to delete cached user files", e);
}
}
}
} catch (Exception ex) {
LOG.error("AsyncLocalizer cleanup failure", ex);
} catch (Error error) {
LOG.error("AsyncLocalizer cleanup failure", error);
Utils.exitProcess(20, "AsyncLocalizer cleanup failure");
}
}

Expand Down

0 comments on commit 5c6e102

Please sign in to comment.