From 99f47570750266c12d31eaa11ef912e92575b095 Mon Sep 17 00:00:00 2001 From: Aaron Gresch Date: Mon, 21 May 2018 14:52:58 -0500 Subject: [PATCH 1/2] STORM-3053 prevent race deleting blobstores before topologies can be submitted --- .../test/clj/org/apache/storm/nimbus_test.clj | 10 ++-- .../java/org/apache/storm/DaemonConfig.java | 6 ++ .../apache/storm/daemon/nimbus/Nimbus.java | 55 +++++++++++++++++-- .../storm/daemon/nimbus/NimbusTest.java | 30 ++++++++++ 4 files changed, 92 insertions(+), 9 deletions(-) diff --git a/storm-core/test/clj/org/apache/storm/nimbus_test.clj b/storm-core/test/clj/org/apache/storm/nimbus_test.clj index a9742316865..93246d35f26 100644 --- a/storm-core/test/clj/org/apache/storm/nimbus_test.clj +++ b/storm-core/test/clj/org/apache/storm/nimbus_test.clj @@ -1856,7 +1856,7 @@ (let [mock-state (mock-cluster-state (list "topo1") (list "topo1" "topo2" "topo3")) store (Mockito/mock BlobStore)] (.thenReturn (Mockito/when (.storedTopoIds store)) #{}) - (is (= (Nimbus/topoIdsToClean mock-state store) #{"topo2" "topo3"})))) + (is (= (Nimbus/topoIdsToClean mock-state store (new java.util.HashMap)) #{"topo2" "topo3"})))) (deftest cleanup-storm-ids-performs-union-of-storm-ids-with-active-znodes (let [active-topos (list "hb1" "e2" "bp3") @@ -1866,7 +1866,7 @@ mock-state (mock-cluster-state active-topos hb-topos error-topos bp-topos) store (Mockito/mock BlobStore)] (.thenReturn (Mockito/when (.storedTopoIds store)) #{}) - (is (= (Nimbus/topoIdsToClean mock-state store) + (is (= (Nimbus/topoIdsToClean mock-state store (new java.util.HashMap)) #{"hb2" "hb3" "e1" "e3" "bp1" "bp2"})))) (deftest cleanup-storm-ids-returns-empty-set-when-all-topos-are-active @@ -1877,7 +1877,7 @@ mock-state (mock-cluster-state active-topos hb-topos error-topos bp-topos) store (Mockito/mock BlobStore)] (.thenReturn (Mockito/when (.storedTopoIds store)) #{}) - (is (= (Nimbus/topoIdsToClean mock-state store) + (is (= (Nimbus/topoIdsToClean mock-state store (new 
java.util.HashMap)) #{})))) (deftest do-cleanup-removes-inactive-znodes @@ -1885,7 +1885,7 @@ hb-cache (into {}(map vector inactive-topos '(nil nil))) mock-state (mock-cluster-state) mock-blob-store (Mockito/mock BlobStore) - conf {NIMBUS-MONITOR-FREQ-SECS 10}] + conf {NIMBUS-MONITOR-FREQ-SECS 10 NIMBUS-TOPOLOGY-BLOBSTORE-DELETION-DELAY-MSEC 0}] (with-open [_ (MockedZookeeper. (proxy [Zookeeper] [] (zkLeaderElectorImpl [conf zk blob-store tc cluster-state acls] (MockLeaderElector. ))))] (let [nimbus (Mockito/spy (Nimbus. conf nil mock-state nil mock-blob-store nil nil))] @@ -1920,6 +1920,7 @@ ;; remove topos from heartbeat cache (is (= (count (.get (.getHeartbeatsCache nimbus))) 0))))))) + (deftest do-cleanup-does-not-teardown-active-topos (let [inactive-topos () hb-cache {"topo1" nil "topo2" nil} @@ -1988,3 +1989,4 @@ (is (= (list "topo1" "authorized") supervisor-topologies)) (is (= #{"authorized"} user-topologies))))) + diff --git a/storm-server/src/main/java/org/apache/storm/DaemonConfig.java b/storm-server/src/main/java/org/apache/storm/DaemonConfig.java index 2487414912a..3f04d83d842 100644 --- a/storm-server/src/main/java/org/apache/storm/DaemonConfig.java +++ b/storm-server/src/main/java/org/apache/storm/DaemonConfig.java @@ -279,6 +279,12 @@ public class DaemonConfig implements Validated { @isString public static final String NIMBUS_WORKER_HEARTBEATS_RECOVERY_STRATEGY_CLASS = "nimbus.worker.heartbeats.recovery.strategy.class"; + /** + * This controls the number of milliseconds nimbus will wait before deleting a topology blobstore once detected it is able to delete. + */ + @isInteger + public static final String NIMBUS_TOPOLOGY_BLOBSTORE_DELETION_DELAY_MSEC = "nimbus.topology.blobstore.deletion.delay.msec"; + /** * Storm UI binds to this host/interface. 
*/ diff --git a/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java b/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java index 48e8c210a2e..9e172d07067 100644 --- a/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java +++ b/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java @@ -187,6 +187,7 @@ import org.apache.storm.utils.LocalState; import org.apache.storm.utils.ObjectReader; import org.apache.storm.utils.ReflectionUtils; +import org.apache.storm.utils.RotatingMap; import org.apache.storm.utils.ServerConfigUtils; import org.apache.storm.utils.ServerUtils; import org.apache.storm.utils.SimpleVersion; @@ -371,6 +372,8 @@ public static List getNimbusAcls(Map conf) { private static final List EMPTY_STRING_LIST = Collections.unmodifiableList(Collections.emptyList()); private static final Set EMPTY_STRING_SET = Collections.unmodifiableSet(Collections.emptySet()); private static final Pattern TOPOLOGY_NAME_REGEX = Pattern.compile("^[^/.:\\\\]+$"); + private static final RotatingMap topologyCleanupDetected = new RotatingMap(2); + private static long topologyCleanupRotationTime = 0L; // END TOPOLOGY STATE TRANSITIONS @@ -828,12 +831,54 @@ private static Map tryReadTopoConf(String topoId, TopoCache tc) } } + private static void rotateTopologyCleanupMap(long deletionDelay) { + if (Time.currentTimeMillis() - topologyCleanupRotationTime > deletionDelay) { + topologyCleanupDetected.rotate(); + topologyCleanupRotationTime = Time.currentTimeMillis(); + } + } + + private static long getTopologyCleanupDetectedTime(String topologyId) { + Long firstDetectedForDeletion = topologyCleanupDetected.get(topologyId); + if (firstDetectedForDeletion == null) { + firstDetectedForDeletion = Time.currentTimeMillis(); + topologyCleanupDetected.put(topologyId, firstDetectedForDeletion); + } + return firstDetectedForDeletion; + } + + /** + * Finds blobstore entries with no matching topology. 
Waits NIMBUS_TOPOLOGY_BLOBSTORE_DELETION_DELAY_MSEC + * before reporting the topologies found. The delay is to prevent a race condition between when a blobstore + * is created and when the topology is submitted. It is possible the Nimbus cleanup timer task will find + * entries to delete between these two events. + * + * @param store blobstore to search + * @param conf the nimbus conf + * @return a set of + */ + static Set getIdleTopologyIds(BlobStore store, Map conf) { + Set toposToClean = store.storedTopoIds(); + Set idleTopologies = new HashSet<>(); + long topologyDeletionDelay = ObjectReader.getInt( + conf.get(DaemonConfig.NIMBUS_TOPOLOGY_BLOBSTORE_DELETION_DELAY_MSEC), 5 * 60 * 1000); + for (String topologyId : toposToClean) { + if (Time.currentTimeMillis() - getTopologyCleanupDetectedTime(topologyId) >= topologyDeletionDelay) { + idleTopologies.add(topologyId); + } + } + + rotateTopologyCleanupMap(topologyDeletionDelay); + + return idleTopologies; + } + @VisibleForTesting - public static Set topoIdsToClean(IStormClusterState state, BlobStore store) { + public static Set topoIdsToClean(IStormClusterState state, BlobStore store, Map conf) { Set ret = new HashSet<>(); ret.addAll(Utils.OR(state.heartbeatStorms(), EMPTY_STRING_LIST)); ret.addAll(Utils.OR(state.errorTopologies(), EMPTY_STRING_LIST)); - ret.addAll(Utils.OR(store.storedTopoIds(), EMPTY_STRING_SET)); + ret.addAll(Utils.OR(getIdleTopologyIds(store, conf), EMPTY_STRING_SET)); ret.addAll(Utils.OR(state.backpressureTopologies(), EMPTY_STRING_LIST)); ret.addAll(Utils.OR(state.idsOfTopologiesWithPrivateWorkerKeys(), EMPTY_STRING_SET)); ret.removeAll(Utils.OR(state.activeStorms(), EMPTY_STRING_LIST)); @@ -2377,7 +2422,7 @@ public void doCleanup() throws Exception { IStormClusterState state = stormClusterState; Set toClean; synchronized (submitLock) { - toClean = topoIdsToClean(state, blobStore); + toClean = topoIdsToClean(state, blobStore, this.conf); } if (toClean != null) { for (String topoId : toClean) { 
@@ -3306,7 +3351,7 @@ public String beginCreateBlob(String key, SettableBlobMeta meta) try { String sessionId = Utils.uuid(); blobUploaders.put(sessionId, blobStore.createBlob(key, meta, getSubject())); - LOG.info("Created blob for {}", key); + LOG.info("Created blob {} for session {}", key, sessionId); return sessionId; } catch (Exception e) { LOG.warn("begin create blob exception.", e); @@ -3437,7 +3482,7 @@ public BeginDownloadResult beginBlobDownload(String key) (int) conf .getOrDefault(Config.STORM_BLOBSTORE_INPUTSTREAM_BUFFER_SIZE_BYTES, 65536))); - LOG.info("Created download session for {}", key); + LOG.info("Created download session {} for {}", sessionId, key); return ret; } catch (Exception e) { LOG.warn("begin blob download exception.", e); diff --git a/storm-server/src/test/java/org/apache/storm/daemon/nimbus/NimbusTest.java b/storm-server/src/test/java/org/apache/storm/daemon/nimbus/NimbusTest.java index 47484ac7728..bad75f0a201 100644 --- a/storm-server/src/test/java/org/apache/storm/daemon/nimbus/NimbusTest.java +++ b/storm-server/src/test/java/org/apache/storm/daemon/nimbus/NimbusTest.java @@ -18,14 +18,23 @@ package org.apache.storm.daemon.nimbus; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + import org.apache.storm.Config; import org.apache.storm.DaemonConfig; +import org.apache.storm.blobstore.BlobStore; import org.apache.storm.generated.StormTopology; import org.apache.storm.scheduler.resource.strategies.priority.DefaultSchedulingPriorityStrategy; import org.apache.storm.scheduler.resource.strategies.scheduling.DefaultResourceAwareStrategy; import org.apache.storm.testing.TestWordSpout; import org.apache.storm.topology.TopologyBuilder; +import org.apache.storm.utils.Time; +import org.junit.Assert; import org.junit.Test; +import org.mockito.Mockito; import static org.junit.Assert.fail; @@ -55,4 +64,25 @@ public void testMemoryLoadLargerThanMaxHeapSize() throws Exception { //Expected... 
} } + + @Test + public void uploadedBlobPersistsMinimumTime() { + BlobStore store = Mockito.mock(BlobStore.class); + Set idleTopologies = new HashSet<>(); + idleTopologies.add("topology1"); + Mockito.when(store.storedTopoIds()).thenReturn(idleTopologies); + Map conf = new HashMap<>(); + + try (Time.SimulatedTime t = new Time.SimulatedTime(null)) { + Set toDelete = Nimbus.getIdleTopologyIds(store, conf); + Assert.assertTrue(toDelete.isEmpty()); + + Time.advanceTime(10 * 60 * 1000L); + + toDelete = Nimbus.getIdleTopologyIds(store, conf); + Assert.assertTrue(toDelete.contains("topology1")); + Assert.assertEquals(1, toDelete.size()); + + } + } } From 787a4340a8d2d3291e5e4ecef47c8b85ff9a3b74 Mon Sep 17 00:00:00 2001 From: Aaron Gresch Date: Wed, 30 May 2018 10:40:23 -0500 Subject: [PATCH 2/2] review rework --- conf/defaults.yaml | 2 ++ .../test/clj/org/apache/storm/nimbus_test.clj | 2 +- .../java/org/apache/storm/DaemonConfig.java | 2 +- .../apache/storm/daemon/nimbus/Nimbus.java | 24 ++++++++++--------- .../storm/daemon/nimbus/NimbusTest.java | 5 ++-- 5 files changed, 20 insertions(+), 15 deletions(-) diff --git a/conf/defaults.yaml b/conf/defaults.yaml index 0e67957f158..a304aa5ed62 100644 --- a/conf/defaults.yaml +++ b/conf/defaults.yaml @@ -50,6 +50,7 @@ storm.nimbus.retry.interval.millis: 2000 storm.nimbus.retry.intervalceiling.millis: 60000 storm.nimbus.zookeeper.acls.check: true storm.nimbus.zookeeper.acls.fixup: true + storm.auth.simple-white-list.users: [] storm.cluster.state.store: "org.apache.storm.cluster.ZKStateStorageFactory" storm.meta.serialization.delegate: "org.apache.storm.serialization.GzipThriftSerializationDelegate" @@ -83,6 +84,7 @@ nimbus.local.assignments.backend.class: "org.apache.storm.assignments.InMemoryAs nimbus.assignments.service.threads: 10 nimbus.assignments.service.thread.queue.size: 100 nimbus.worker.heartbeats.recovery.strategy.class: "org.apache.storm.nimbus.TimeOutWorkerHeartbeatsRecoveryStrategy" 
+nimbus.topology.blobstore.deletion.delay.ms: 300000 ### ui.* configs are for the master ui.host: 0.0.0.0 diff --git a/storm-core/test/clj/org/apache/storm/nimbus_test.clj b/storm-core/test/clj/org/apache/storm/nimbus_test.clj index 93246d35f26..77cbc98557b 100644 --- a/storm-core/test/clj/org/apache/storm/nimbus_test.clj +++ b/storm-core/test/clj/org/apache/storm/nimbus_test.clj @@ -1885,7 +1885,7 @@ hb-cache (into {}(map vector inactive-topos '(nil nil))) mock-state (mock-cluster-state) mock-blob-store (Mockito/mock BlobStore) - conf {NIMBUS-MONITOR-FREQ-SECS 10 NIMBUS-TOPOLOGY-BLOBSTORE-DELETION-DELAY-MSEC 0}] + conf {NIMBUS-MONITOR-FREQ-SECS 10 NIMBUS-TOPOLOGY-BLOBSTORE-DELETION-DELAY-MS 0}] (with-open [_ (MockedZookeeper. (proxy [Zookeeper] [] (zkLeaderElectorImpl [conf zk blob-store tc cluster-state acls] (MockLeaderElector. ))))] (let [nimbus (Mockito/spy (Nimbus. conf nil mock-state nil mock-blob-store nil nil))] diff --git a/storm-server/src/main/java/org/apache/storm/DaemonConfig.java b/storm-server/src/main/java/org/apache/storm/DaemonConfig.java index 3f04d83d842..f17477335c8 100644 --- a/storm-server/src/main/java/org/apache/storm/DaemonConfig.java +++ b/storm-server/src/main/java/org/apache/storm/DaemonConfig.java @@ -283,7 +283,7 @@ public class DaemonConfig implements Validated { * This controls the number of milliseconds nimbus will wait before deleting a topology blobstore once detected it is able to delete. */ @isInteger - public static final String NIMBUS_TOPOLOGY_BLOBSTORE_DELETION_DELAY_MSEC = "nimbus.topology.blobstore.deletion.delay.msec"; + public static final String NIMBUS_TOPOLOGY_BLOBSTORE_DELETION_DELAY_MS = "nimbus.topology.blobstore.deletion.delay.ms"; /** * Storm UI binds to this host/interface. 
diff --git a/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java b/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java index 9e172d07067..4bd2f4a1ce2 100644 --- a/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java +++ b/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java @@ -372,7 +372,7 @@ public static List getNimbusAcls(Map conf) { private static final List EMPTY_STRING_LIST = Collections.unmodifiableList(Collections.emptyList()); private static final Set EMPTY_STRING_SET = Collections.unmodifiableSet(Collections.emptySet()); private static final Pattern TOPOLOGY_NAME_REGEX = Pattern.compile("^[^/.:\\\\]+$"); - private static final RotatingMap topologyCleanupDetected = new RotatingMap(2); + private static final RotatingMap topologyCleanupDetected = new RotatingMap<>(2); private static long topologyCleanupRotationTime = 0L; // END TOPOLOGY STATE TRANSITIONS @@ -848,20 +848,22 @@ private static long getTopologyCleanupDetectedTime(String topologyId) { } /** - * Finds blobstore entries with no matching topology. Waits NIMBUS_TOPOLOGY_BLOBSTORE_DELETION_DELAY_MSEC - * before reporting the topologies found. The delay is to prevent a race condition between when a blobstore - * is created and when the topology is submitted. It is possible the Nimbus cleanup timer task will find - * entries to delete between these two events. + * Finds blobstore entries with no matching topology. Blobstore entries first detected less than + * NIMBUS_TOPOLOGY_BLOBSTORE_DELETION_DELAY_MS ago are ignored. The delay is to prevent a race condition + * between when a blobstore is created and when the topology is submitted. It is possible the Nimbus cleanup + * timer task will find entries to delete between these two events. * - * @param store blobstore to search - * @param conf the nimbus conf - * @return a set of + * Tracked blobstore entries are rotated out of the stored map periodically. 
+ * + * @param store blobstore to search + * @param conf the nimbus conf + * @return the set of topology ids stored in the blobstore that were first detected at least the configured delay ago */ - static Set getIdleTopologyIds(BlobStore store, Map conf) { + static Set getExpiredTopologyIds(BlobStore store, Map conf) { Set toposToClean = store.storedTopoIds(); Set idleTopologies = new HashSet<>(); long topologyDeletionDelay = ObjectReader.getInt( - conf.get(DaemonConfig.NIMBUS_TOPOLOGY_BLOBSTORE_DELETION_DELAY_MSEC), 5 * 60 * 1000); + conf.get(DaemonConfig.NIMBUS_TOPOLOGY_BLOBSTORE_DELETION_DELAY_MS), 5 * 60 * 1000); for (String topologyId : toposToClean) { if (Time.currentTimeMillis() - getTopologyCleanupDetectedTime(topologyId) >= topologyDeletionDelay) { idleTopologies.add(topologyId); @@ -878,7 +880,7 @@ public static Set topoIdsToClean(IStormClusterState state, BlobStore sto Set ret = new HashSet<>(); ret.addAll(Utils.OR(state.heartbeatStorms(), EMPTY_STRING_LIST)); ret.addAll(Utils.OR(state.errorTopologies(), EMPTY_STRING_LIST)); - ret.addAll(Utils.OR(getIdleTopologyIds(store, conf), EMPTY_STRING_SET)); + ret.addAll(Utils.OR(getExpiredTopologyIds(store, conf), EMPTY_STRING_SET)); ret.addAll(Utils.OR(state.backpressureTopologies(), EMPTY_STRING_LIST)); ret.addAll(Utils.OR(state.idsOfTopologiesWithPrivateWorkerKeys(), EMPTY_STRING_SET)); ret.removeAll(Utils.OR(state.activeStorms(), EMPTY_STRING_LIST)); diff --git a/storm-server/src/test/java/org/apache/storm/daemon/nimbus/NimbusTest.java b/storm-server/src/test/java/org/apache/storm/daemon/nimbus/NimbusTest.java index bad75f0a201..8018a81b13e 100644 --- a/storm-server/src/test/java/org/apache/storm/daemon/nimbus/NimbusTest.java +++ b/storm-server/src/test/java/org/apache/storm/daemon/nimbus/NimbusTest.java @@ -72,14 +72,15 @@ public void uploadedBlobPersistsMinimumTime() { idleTopologies.add("topology1"); Mockito.when(store.storedTopoIds()).thenReturn(idleTopologies); Map conf = new HashMap<>(); + conf.put(DaemonConfig.NIMBUS_TOPOLOGY_BLOBSTORE_DELETION_DELAY_MS, 
300000); try (Time.SimulatedTime t = new Time.SimulatedTime(null)) { - Set toDelete = Nimbus.getIdleTopologyIds(store, conf); + Set toDelete = Nimbus.getExpiredTopologyIds(store, conf); Assert.assertTrue(toDelete.isEmpty()); Time.advanceTime(10 * 60 * 1000L); - toDelete = Nimbus.getIdleTopologyIds(store, conf); + toDelete = Nimbus.getExpiredTopologyIds(store, conf); Assert.assertTrue(toDelete.contains("topology1")); Assert.assertEquals(1, toDelete.size());