From d012f6c323d29a48ded29ff2ef86d77970a5d080 Mon Sep 17 00:00:00 2001 From: "Robert (Bobby) Evans" Date: Thu, 4 Oct 2018 15:12:18 -0500 Subject: [PATCH] STORM-3245: Don't blow up if empty log dirs exist --- .../logviewer/utils/DirectoryCleaner.java | 6 +-- .../daemon/logviewer/utils/LogCleaner.java | 9 +--- .../daemon/logviewer/utils/WorkerLogs.java | 47 +++++++++++-------- .../logviewer/utils/LogCleanerTest.java | 24 ++++++---- .../logviewer/utils/WorkerLogsTest.java | 7 ++- 5 files changed, 53 insertions(+), 40 deletions(-) diff --git a/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/utils/DirectoryCleaner.java b/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/utils/DirectoryCleaner.java index 0b17c84d86d..2509fc81f0d 100644 --- a/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/utils/DirectoryCleaner.java +++ b/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/utils/DirectoryCleaner.java @@ -86,7 +86,7 @@ public DirectoryStream getStreamForDirectory(File dir) throws IOException * @return number of files deleted */ public DeletionMeta deleteOldestWhileTooLarge(List dirs, - long quota, boolean forPerDir, Set activeDirs) throws IOException { + long quota, boolean forPerDir, Set activeDirs) throws IOException { long totalSize = 0; for (File dir : dirs) { try (DirectoryStream stream = getStreamForDirectory(dir)) { @@ -171,12 +171,12 @@ public DeletionMeta deleteOldestWhileTooLarge(List dirs, return new DeletionMeta(deletedSize, deletedFiles); } - private boolean isFileEligibleToSkipDelete(boolean forPerDir, Set activeDirs, File dir, File file) throws IOException { + private boolean isFileEligibleToSkipDelete(boolean forPerDir, Set activeDirs, File dir, File file) throws IOException { if (forPerDir) { return ACTIVE_LOG_PATTERN.matcher(file.getName()).matches(); } else { // for global cleanup // for an active worker's dir, make sure for the last "/" - return activeDirs.contains(dir.getCanonicalPath()) ? ACTIVE_LOG_PATTERN.matcher(file.getName()).matches() : + return activeDirs.contains(dir) ? ACTIVE_LOG_PATTERN.matcher(file.getName()).matches() : META_LOG_PATTERN.matcher(file.getName()).matches(); } } diff --git a/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/utils/LogCleaner.java b/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/utils/LogCleaner.java index 035fe3b6012..036d224d3ee 100644 --- a/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/utils/LogCleaner.java +++ b/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/utils/LogCleaner.java @@ -210,7 +210,7 @@ List perWorkerDirCleanup(long size) { @VisibleForTesting DeletionMeta globalLogCleanup(long size) throws Exception { List workerDirs = new ArrayList<>(workerLogs.getAllWorkerDirs()); - Set aliveWorkerDirs = new HashSet<>(workerLogs.getAliveWorkerDirs()); + Set aliveWorkerDirs = workerLogs.getAliveWorkerDirs(); return directoryCleaner.deleteOldestWhileTooLarge(workerDirs, size, false, aliveWorkerDirs); } @@ -235,12 +235,7 @@ SortedSet getDeadWorkerDirs(int nowSecs, Set logDirs) throws Excepti return new TreeSet<>(); } else { Set aliveIds = workerLogs.getAliveIds(nowSecs); - Map idToDir = workerLogs.identifyWorkerLogDirs(logDirs); - - return idToDir.entrySet().stream() - .filter(entry -> !aliveIds.contains(entry.getKey())) - .map(Map.Entry::getValue) - .collect(toCollection(TreeSet::new)); + return workerLogs.getLogDirs(logDirs, (wid) -> !aliveIds.contains(wid)); } } diff --git a/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/utils/WorkerLogs.java b/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/utils/WorkerLogs.java index 8d9934c6eef..3cd59213fbd 100644 --- a/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/utils/WorkerLogs.java +++ b/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/utils/WorkerLogs.java @@ -38,6 +38,7 @@ import java.util.Set; import java.util.SortedSet; import java.util.TreeSet; +import java.util.function.Predicate; import java.util.stream.Stream; import org.apache.storm.daemon.supervisor.ClientSupervisorUtils; @@ -48,7 +49,6 @@ import org.apache.storm.utils.Time; import org.apache.storm.utils.Utils; import org.jooq.lambda.Unchecked; -import org.jooq.lambda.tuple.Tuple2; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -56,7 +56,7 @@ * A class that knows about how to operate with worker log directory. */ public class WorkerLogs { - private static final Logger LOG = LoggerFactory.getLogger(LogCleaner.class); + private static final Logger LOG = LoggerFactory.getLogger(WorkerLogs.class); public static final String WORKER_YAML = "worker.yaml"; @@ -140,15 +140,10 @@ public Set getAllWorkerDirs() { /** * Return a sorted set of java.io.Files that were written by workers that are now active. */ - public SortedSet getAliveWorkerDirs() { + public SortedSet getAliveWorkerDirs() { Set aliveIds = getAliveIds(Time.currentTimeSecs()); Set logDirs = getAllWorkerDirs(); - Map idToDir = identifyWorkerLogDirs(logDirs); - - return idToDir.entrySet().stream() - .filter(entry -> aliveIds.contains(entry.getKey())) - .map(Unchecked.function(entry -> entry.getValue().getCanonicalPath())) - .collect(toCollection(TreeSet::new)); + return getLogDirs(logDirs, (wid) -> aliveIds.contains(wid)); } /** @@ -172,7 +167,7 @@ public Optional getMetadataFileForWorkerLogDir(File logDir) throws IOExcep */ public String getWorkerIdFromMetadataFile(String metaFile) { Map map = (Map) Utils.readYamlFile(metaFile); - return ObjectReader.getString(map.get("worker-id"), null); + return ObjectReader.getString(map == null ? null : map.get("worker-id"), null); } /** @@ -199,19 +194,33 @@ public Set getAliveIds(int nowSecs) { } /** - * Finds a worker ID for each directory in set and return it as map. + * Finds directories for specific worker ids that can be cleaned up. * * @param logDirs directories to check whether they're worker directories or not - * @return the pair of worker ID, directory. worker ID will be an empty string if the directory is not a worker directory. + * @param predicate a check on a worker id to see if the log dir should be included + * @return directories that can be cleaned up. */ - public Map identifyWorkerLogDirs(Set logDirs) { + public SortedSet getLogDirs(Set logDirs, Predicate predicate) { // we could also make this static, but not to do it due to mock - return logDirs.stream().map(Unchecked.function(logDir -> { - Optional metaFile = getMetadataFileForWorkerLogDir(logDir); - - return metaFile.map(Unchecked.function(m -> new Tuple2<>(getWorkerIdFromMetadataFile(m.getCanonicalPath()), logDir))) - .orElse(new Tuple2<>("", logDir)); - })).collect(toMap(Tuple2::v1, Tuple2::v2)); + TreeSet ret = new TreeSet<>(); + for (File logDir: logDirs) { + String workerId = ""; + try { + Optional metaFile = getMetadataFileForWorkerLogDir(logDir); + if (metaFile.isPresent()) { + workerId = getWorkerIdFromMetadataFile(metaFile.get().getCanonicalPath()); + if (workerId == null) { + workerId = ""; + } + } + } catch (IOException e) { + LOG.warn("Error trying to find worker.yaml in {}", logDir, e); + } + if (predicate.test(workerId)) { + ret.add(logDir); + } + } + return ret; } /** diff --git a/storm-webapp/src/test/java/org/apache/storm/daemon/logviewer/utils/LogCleanerTest.java b/storm-webapp/src/test/java/org/apache/storm/daemon/logviewer/utils/LogCleanerTest.java index d85edea32e4..92c5a97f59c 100644 --- a/storm-webapp/src/test/java/org/apache/storm/daemon/logviewer/utils/LogCleanerTest.java +++ b/storm-webapp/src/test/java/org/apache/storm/daemon/logviewer/utils/LogCleanerTest.java @@ -51,6 +51,7 @@ import java.util.SortedSet; import java.util.TreeSet; +import java.util.function.Predicate; import org.apache.storm.daemon.logviewer.testsupport.MockDirectoryBuilder; import org.apache.storm.daemon.logviewer.testsupport.MockRemovableFileBuilder; import org.apache.storm.daemon.supervisor.SupervisorUtils; @@ -187,7 +188,7 @@ public void testGlobalLogCleanup() throws Exception { List paths = Arrays.stream(file.listFiles()).map(f -> mkMockPath(f)).collect(toList()); return mkDirectoryStream(paths); }); - when(mockDirectoryCleaner.deleteOldestWhileTooLarge(anyListOf(File.class), anyLong(), anyBoolean(), anySetOf(String.class))) + when(mockDirectoryCleaner.deleteOldestWhileTooLarge(anyListOf(File.class), anyLong(), anyBoolean(), anySetOf(File.class))) .thenCallRealMethod(); long nowMillis = Time.currentTimeMillis(); @@ -225,8 +226,8 @@ public void testGlobalLogCleanup() throws Exception { StormMetricsRegistry metricRegistry = new StormMetricsRegistry(); WorkerLogs stubbedWorkerLogs = new WorkerLogs(conf, rootDir, metricRegistry) { @Override - public SortedSet getAliveWorkerDirs() { - return new TreeSet<>(Collections.singletonList("/workers-artifacts/topo1/port1")); + public SortedSet getAliveWorkerDirs() { + return new TreeSet<>(Collections.singletonList(new File("/workers-artifacts/topo1/port1"))); } }; @@ -264,12 +265,17 @@ public void testGetDeadWorkerDirs() throws Exception { StormMetricsRegistry metricRegistry = new StormMetricsRegistry(); WorkerLogs stubbedWorkerLogs = new WorkerLogs(conf, null, metricRegistry) { @Override - public Map identifyWorkerLogDirs(Set logDirs) { - Map ret = new HashMap<>(); - ret.put("42", unexpectedDir1); - ret.put("007", expectedDir2); - // this tests a directory with no yaml file thus no worker id - ret.put("", expectedDir3); + public SortedSet getLogDirs(Set logDirs, Predicate predicate) { + TreeSet ret = new TreeSet<>(); + if (predicate.test("42")) { + ret.add(unexpectedDir1); + } + if (predicate.test("007")) { + ret.add(expectedDir2); + } + if(predicate.test("")) { + ret.add(expectedDir3); + } return ret; } diff --git a/storm-webapp/src/test/java/org/apache/storm/daemon/logviewer/utils/WorkerLogsTest.java b/storm-webapp/src/test/java/org/apache/storm/daemon/logviewer/utils/WorkerLogsTest.java index c7ca3ddf7f8..3b3a72fed7f 100644 --- a/storm-webapp/src/test/java/org/apache/storm/daemon/logviewer/utils/WorkerLogsTest.java +++ b/storm-webapp/src/test/java/org/apache/storm/daemon/logviewer/utils/WorkerLogsTest.java @@ -29,6 +29,8 @@ import java.util.Map; import java.util.Optional; +import java.util.SortedSet; +import java.util.TreeSet; import org.apache.storm.daemon.logviewer.testsupport.MockDirectoryBuilder; import org.apache.storm.daemon.logviewer.testsupport.MockFileBuilder; import org.apache.storm.daemon.supervisor.SupervisorUtils; @@ -47,7 +49,8 @@ public void testIdentifyWorkerLogDirs() throws Exception { File mockMetaFile = new MockFileBuilder().setFileName("worker.yaml").build(); String expId = "id12345"; - Map expected = Collections.singletonMap(expId, port1Dir); + SortedSet expected = new TreeSet<>(); + expected.add(port1Dir); try { SupervisorUtils mockedSupervisorUtils = mock(SupervisorUtils.class); @@ -67,7 +70,7 @@ public String getWorkerIdFromMetadataFile(String metaFile) { }; when(mockedSupervisorUtils.readWorkerHeartbeatsImpl(anyMapOf(String.class, Object.class))).thenReturn(null); - assertEquals(expected, workerLogs.identifyWorkerLogDirs(Collections.singleton(port1Dir))); + assertEquals(expected, workerLogs.getLogDirs(Collections.singleton(port1Dir), (wid) -> true)); } finally { SupervisorUtils.resetInstance(); }