From 12b60c93fab14f2615bba60874d6117cea9978c1 Mon Sep 17 00:00:00 2001 From: Arun Mahadevan Date: Mon, 13 Mar 2017 18:11:13 +0530 Subject: [PATCH] STORM-2412: Nimbus isLeader check while waiting for max replication While using local FS blob store, nimbus goes into a state where it indefinitely waits for max replication (with max replication wait time set to -1). At this time its also observed that the nimbus that received the topology submission is no longer the leader. While waiting for max replication, nimbus can also do the `isLeader` check and fail the topology submission if its no longer the leader. Also added some debug logs to better troubleshoot the race conditions when it happens. --- storm-core/src/clj/org/apache/storm/daemon/nimbus.clj | 6 ++++-- .../src/jvm/org/apache/storm/blobstore/BlobStoreUtils.java | 1 + .../jvm/org/apache/storm/blobstore/BlobSynchronizer.java | 1 + 3 files changed, 6 insertions(+), 2 deletions(-) diff --git a/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj b/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj index d9d71a14a50..9b2d04b000c 100644 --- a/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj +++ b/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj @@ -505,8 +505,10 @@ (or (neg? max-replication-wait-time) (< @total-wait-time max-replication-wait-time))) (sleep-secs 1) - (log-debug "waiting for desired replication to be achieved. - min-replication-count = " min-replication-count " max-replication-wait-time = " max-replication-wait-time + (log-debug "Checking if I am still the leader") + (is-leader nimbus) + (log-debug "waiting for desired replication to be achieved for storm-id = " storm-id + " min-replication-count = " min-replication-count " max-replication-wait-time = " max-replication-wait-time (if (not (local-mode? conf))"current-replication-count for jar key = " @current-replication-count-jar) "current-replication-count for code key = " @current-replication-count-code "current-replication-count for conf key = " @current-replication-count-conf diff --git a/storm-core/src/jvm/org/apache/storm/blobstore/BlobStoreUtils.java b/storm-core/src/jvm/org/apache/storm/blobstore/BlobStoreUtils.java index bd96b86a4bf..d62a71be28e 100644 --- a/storm-core/src/jvm/org/apache/storm/blobstore/BlobStoreUtils.java +++ b/storm-core/src/jvm/org/apache/storm/blobstore/BlobStoreUtils.java @@ -127,6 +127,7 @@ public static boolean downloadMissingBlob(Map conf, BlobStore blobStore, String if(isSuccess) { break; } + LOG.debug("Download blob key: {}, NimbusInfo {}", key, nimbusInfo); try(NimbusClient client = new NimbusClient(conf, nimbusInfo.getHost(), nimbusInfo.getPort(), null)) { rbm = client.getClient().getBlobMeta(key); remoteBlobStore = new NimbusBlobStore(); diff --git a/storm-core/src/jvm/org/apache/storm/blobstore/BlobSynchronizer.java b/storm-core/src/jvm/org/apache/storm/blobstore/BlobSynchronizer.java index 3321bcf329a..f035709792d 100644 --- a/storm-core/src/jvm/org/apache/storm/blobstore/BlobSynchronizer.java +++ b/storm-core/src/jvm/org/apache/storm/blobstore/BlobSynchronizer.java @@ -81,6 +81,7 @@ public synchronized void syncBlobs() { for (String key : keySetToDownload) { try { Set nimbusInfoSet = BlobStoreUtils.getNimbodesWithLatestSequenceNumberOfBlob(zkClient, key); + LOG.debug("syncBlobs, key: {}, nimbusInfoSet: {}", key, nimbusInfoSet); if (BlobStoreUtils.downloadMissingBlob(conf, blobStore, key, nimbusInfoSet)) { BlobStoreUtils.createStateInZookeeper(conf, key, nimbusInfo); }