From b27b8c4fa015f19db6a087c3a565a703b39dc517 Mon Sep 17 00:00:00 2001 From: Kyle Nusbaum Date: Thu, 26 Jun 2014 21:13:56 +0000 Subject: [PATCH 1/6] Demo conservative ZK assignments (lots of logging) --- storm-core/src/clj/backtype/storm/cluster.clj | 31 ++++++++++++- .../src/clj/backtype/storm/daemon/common.clj | 1 + .../clj/backtype/storm/daemon/supervisor.clj | 46 +++++++++++++++---- .../src/clj/backtype/storm/zookeeper.clj | 25 ++++++++++ 4 files changed, 93 insertions(+), 10 deletions(-) diff --git a/storm-core/src/clj/backtype/storm/cluster.clj b/storm-core/src/clj/backtype/storm/cluster.clj index d3682acae21..c5e6f16d077 100644 --- a/storm-core/src/clj/backtype/storm/cluster.clj +++ b/storm-core/src/clj/backtype/storm/cluster.clj @@ -29,6 +29,8 @@ ;; if node does not exist, create persistent with this data (set-data [this path data]) (get-data [this path watch?]) + (get-version [this path watch?]) + (get-data-with-version [this path watch?]) (get-children [this path watch?]) (mkdirs [this path]) (close [this]) @@ -56,7 +58,6 @@ (callback type path))))))] (reify ClusterState - (register [this callback] (let [id (uuid)] @@ -100,6 +101,14 @@ [this path watch?] (zk/get-data zk path watch?)) + (get-data-with-version + [this path watch?] + (zk/get-data-with-version zk path watch?)) + + (get-version + [this path watch?] + (zk/get-version zk path watch?)) + (get-children [this path watch?] (zk/get-children zk path watch?)) @@ -113,9 +122,12 @@ (reset! active false) (.close zk))))) + (defprotocol StormClusterState (assignments [this callback]) (assignment-info [this storm-id callback]) + (assignment-info-with-version [this storm-id callback]) + (assignment-version [this storm-id callback]) (active-storms [this]) (storm-base [this storm-id callback]) (get-worker-heartbeat [this storm-id node port]) @@ -225,6 +237,8 @@ [false cluster-state-spec] [true (mk-distributed-cluster-state cluster-state-spec)]) assignment-info-callback (atom {}) + assignment-info-with-version-callback (atom {}) + assignment-version-callback (atom {}) supervisors-callback (atom nil) assignments-callback (atom nil) storm-base-callback (atom {}) @@ -257,6 +271,21 @@ (swap! assignment-info-callback assoc storm-id callback)) (maybe-deserialize (get-data cluster-state (assignment-path storm-id) (not-nil? callback)))) + (assignment-info-with-version + [this storm-id callback] + (when callback + (swap! assignment-info-with-version-callback assoc storm-id callback)) + (let [{data :data version :version} + (get-data-with-version cluster-state (assignment-path storm-id) (not-nil? callback))] + {:data (maybe-deserialize data) + :version version})) + + (assignment-version + [this storm-id callback] + (when callback + (swap! assignment-version-callback assoc storm-id callback)) + (get-version cluster-state (assignment-path storm-id) (not-nil? callback))) + (active-storms [this] (get-children cluster-state STORMS-SUBTREE false)) diff --git a/storm-core/src/clj/backtype/storm/daemon/common.clj b/storm-core/src/clj/backtype/storm/daemon/common.clj index 43746b3be8d..f2ad7ce858b 100644 --- a/storm-core/src/clj/backtype/storm/daemon/common.clj +++ b/storm-core/src/clj/backtype/storm/daemon/common.clj @@ -61,6 +61,7 @@ (def LS-ID "supervisor-id") (def LS-LOCAL-ASSIGNMENTS "local-assignments") (def LS-APPROVED-WORKERS "approved-workers") +(def LS-ASSIGNMENT-VERSIONS "local-assignment-versions") diff --git a/storm-core/src/clj/backtype/storm/daemon/supervisor.clj b/storm-core/src/clj/backtype/storm/daemon/supervisor.clj index 414d894e131..3b2a6b57fcc 100644 --- a/storm-core/src/clj/backtype/storm/daemon/supervisor.clj +++ b/storm-core/src/clj/backtype/storm/daemon/supervisor.clj @@ -35,13 +35,33 @@ (shutdown-all-workers [this]) ) -(defn- assignments-snapshot [storm-cluster-state callback] +(defn- assignments-snapshot [storm-cluster-state callback existing-assignment assignment-versions] + (log-message (str "Recalculating assignments with old: " assignment-versions)) (let [storm-ids (.assignments storm-cluster-state callback)] - (->> (dofor [sid storm-ids] {sid (.assignment-info storm-cluster-state sid callback)}) - (apply merge) - (filter-val not-nil?) - ))) - + (let [new-assignments + (->> + (dofor [sid storm-ids] + (let [recorded-version (:version (get assignment-versions sid))] + (if-let [assignment-version (.assignment-version storm-cluster-state sid callback)] + (do + (log-message (str "Version: " assignment-version " || Recorded Version: " recorded-version)) + (if (= assignment-version recorded-version) + (do + (log-message "Using Existing assignment.") + {sid (get assignment-versions sid)}) + (do + (log-message "Getting new Assignments.") + (let [assignments (.assignment-info-with-version storm-cluster-state sid callback)] + (log-message (str "Assignments: " assignments)) + {sid assignments})))) + {sid nil}))) + (apply merge) + (filter-val not-nil?))] + + {:assignments (into {} (for [[k v] new-assignments] [k (:data v)])) + :versions new-assignments}))) + + (defn- read-my-executors [assignments-snapshot storm-id assignment-id] (let [assignment (get assignments-snapshot storm-id) my-executors (filter (fn [[_ [node _]]] (= node assignment-id)) @@ -297,7 +317,13 @@ ^ISupervisor isupervisor (:isupervisor supervisor) ^LocalState local-state (:local-state supervisor) sync-callback (fn [& ignored] (.add event-manager this)) - assignments-snapshot (assignments-snapshot storm-cluster-state sync-callback) + existing-assignment (.get local-state LS-LOCAL-ASSIGNMENTS) + assignment-versions (.get local-state LS-ASSIGNMENT-VERSIONS) + {assignments-snapshot :assignments versions :versions} (assignments-snapshot + storm-cluster-state sync-callback + existing-assignment assignment-versions) + _ (log-message (str "Got Assignments: " assignments-snapshot + " || And Versions: " (pr-str versions))) storm-code-map (read-storm-code-locations assignments-snapshot) downloaded-storm-ids (set (read-downloaded-storm-ids conf)) all-assignment (read-assignments @@ -305,8 +331,7 @@ (:assignment-id supervisor)) new-assignment (->> all-assignment (filter-key #(.confirmAssigned isupervisor %))) - assigned-storm-ids (assigned-storm-ids-from-port-assignments new-assignment) - existing-assignment (.get local-state LS-LOCAL-ASSIGNMENTS)] + assigned-storm-ids (assigned-storm-ids-from-port-assignments new-assignment)] (log-debug "Synchronizing supervisor") (log-debug "Storm code map: " storm-code-map) (log-debug "Downloaded storm ids: " downloaded-storm-ids) @@ -340,6 +365,9 @@ (.put local-state LS-LOCAL-ASSIGNMENTS new-assignment) + (.put local-state + LS-ASSIGNMENT-VERSIONS + versions) (reset! (:curr-assignment supervisor) new-assignment) ;; remove any downloaded code that's no longer assigned or active ;; important that this happens after setting the local assignment so that diff --git a/storm-core/src/clj/backtype/storm/zookeeper.clj b/storm-core/src/clj/backtype/storm/zookeeper.clj index 46d1c69dba3..8a97be89872 100644 --- a/storm-core/src/clj/backtype/storm/zookeeper.clj +++ b/storm-core/src/clj/backtype/storm/zookeeper.clj @@ -133,6 +133,31 @@ nil ) (catch Exception e (throw (wrap-in-runtime e)))))) +(defn get-data-with-version + [^CuratorFramework zk ^String path watch?] + (let [stats (org.apache.zookeeper.data.Stat. ) + path (normalize-path path)] + (try-cause + (if-let [data + (if (exists-node? zk path watch?) + (if watch? + (.. zk (getData) (watched) (storingStatIn stats) (forPath path)) + (.. zk (getData) (storingStatIn stats) (forPath path))))] + {:data data + :version (.getVersion stats)}) + (catch KeeperException$NoNodeException e + ;; this is fine b/c we still have a watch from the successful exists call + nil )))) + +(defn get-version +[^CuratorFramework zk ^String path watch?] + (if-let [stats + (if watch? + (.. zk (checkExists) (watched) (forPath (normalize-path path))) + (.. zk (checkExists) (forPath (normalize-path path))))] + (.getVersion stats) + nil)) + (defn get-children [^CuratorFramework zk ^String path watch?] (try From 41e5b91a71a609818c8ca8b0291ea292710dd468 Mon Sep 17 00:00:00 2001 From: Kyle Nusbaum Date: Fri, 27 Jun 2014 17:52:52 +0000 Subject: [PATCH 2/6] Added smart assignment-refreshing for workers. --- .../src/clj/backtype/storm/daemon/worker.clj | 23 ++++++++++++++++--- 1 file changed, 20 insertions(+), 3 deletions(-) diff --git a/storm-core/src/clj/backtype/storm/daemon/worker.clj b/storm-core/src/clj/backtype/storm/daemon/worker.clj index 74475ee873d..0ec9bda8852 100644 --- a/storm-core/src/clj/backtype/storm/daemon/worker.clj +++ b/storm-core/src/clj/backtype/storm/daemon/worker.clj @@ -28,7 +28,8 @@ (defmulti mk-suicide-fn cluster-mode) -(defn read-worker-executors [storm-conf storm-cluster-state storm-id assignment-id port] +(defn read-worker-executors [storm-conf storm-cluster-state storm-id assignment-id port assignment-versions] + (log-message "Reading Assignments.") (let [assignment (:executor->node+port (.assignment-info storm-cluster-state storm-id nil))] (doall (concat @@ -174,11 +175,15 @@ ) :timer-name timer-name)) +(def assignment-versions (atom {})) + (defn worker-data [conf mq-context storm-id assignment-id port worker-id] (let [cluster-state (cluster/mk-distributed-cluster-state conf) storm-cluster-state (cluster/mk-storm-cluster-state cluster-state) storm-conf (read-supervisor-storm-conf conf storm-id) - executors (set (read-worker-executors storm-conf storm-cluster-state storm-id assignment-id port)) + _temp (read-worker-executors storm-conf storm-cluster-state storm-id assignment-id port assignment-versions) + _ (log-message (str "Worker assignments: " _temp)) + executors (set (read-worker-executors storm-conf storm-cluster-state storm-id assignment-id port assignment-versions)) transfer-queue (disruptor/disruptor-queue "worker-transfer-queue" (storm-conf TOPOLOGY-TRANSFER-BUFFER-SIZE) :wait-strategy (storm-conf TOPOLOGY-DISRUPTOR-WAIT-STRATEGY)) executor-receive-queue-map (mk-receive-queue-map storm-conf executors) @@ -240,6 +245,8 @@ [node (Integer/valueOf port-str)] )) +(def assignment-versions (atom {})) + (defn mk-refresh-connections [worker] (let [outbound-tasks (worker-outbound-tasks worker) conf (:conf worker) @@ -249,7 +256,17 @@ ([] (this (fn [& ignored] (schedule (:refresh-connections-timer worker) 0 this)))) ([callback] - (let [assignment (.assignment-info storm-cluster-state storm-id callback) + (log-message "Refreshing Assignments") + (let [version (.assignment-version storm-cluster-state storm-id callback) +; _ (log-message (str "Assignments are: " @assignment-versions)) + assignment (if (= version (:version (get @assignment-versions storm-id))) + (do + (log-message "Keeping old Assignments.") + (:data (get @assignment-versions storm-id))) + (let [new-assignment (.assignment-info-with-version storm-cluster-state storm-id callback)] + (log-message "Getting new Assignments.") + (swap! assignment-versions assoc storm-id new-assignment) + (:data new-assignment))) my-assignment (-> assignment :executor->node+port to-task->node+port From 310e09e1a594ef44785e7aeb4b2fd5e4299a303b Mon Sep 17 00:00:00 2001 From: Kyle Nusbaum Date: Thu, 26 Jun 2014 21:40:29 +0000 Subject: [PATCH 3/6] Cleaned up and ready for pull. --- .../clj/backtype/storm/daemon/supervisor.clj | 28 ++++++------------- 1 file changed, 8 insertions(+), 20 deletions(-) diff --git a/storm-core/src/clj/backtype/storm/daemon/supervisor.clj b/storm-core/src/clj/backtype/storm/daemon/supervisor.clj index 3b2a6b57fcc..927446ea025 100644 --- a/storm-core/src/clj/backtype/storm/daemon/supervisor.clj +++ b/storm-core/src/clj/backtype/storm/daemon/supervisor.clj @@ -35,32 +35,22 @@ (shutdown-all-workers [this]) ) -(defn- assignments-snapshot [storm-cluster-state callback existing-assignment assignment-versions] - (log-message (str "Recalculating assignments with old: " assignment-versions)) +(defn- assignments-snapshot [storm-cluster-state callback assignment-versions] (let [storm-ids (.assignments storm-cluster-state callback)] (let [new-assignments (->> (dofor [sid storm-ids] (let [recorded-version (:version (get assignment-versions sid))] (if-let [assignment-version (.assignment-version storm-cluster-state sid callback)] - (do - (log-message (str "Version: " assignment-version " || Recorded Version: " recorded-version)) - (if (= assignment-version recorded-version) - (do - (log-message "Using Existing assignment.") - {sid (get assignment-versions sid)}) - (do - (log-message "Getting new Assignments.") - (let [assignments (.assignment-info-with-version storm-cluster-state sid callback)] - (log-message (str "Assignments: " assignments)) - {sid assignments})))) + (if (= assignment-version recorded-version) + {sid (get assignment-versions sid)} + {sid (.assignment-info-with-version storm-cluster-state sid callback)}) {sid nil}))) (apply merge) (filter-val not-nil?))] - + {:assignments (into {} (for [[k v] new-assignments] [k (:data v)])) :versions new-assignments}))) - (defn- read-my-executors [assignments-snapshot storm-id assignment-id] (let [assignment (get assignments-snapshot storm-id) @@ -317,13 +307,10 @@ ^ISupervisor isupervisor (:isupervisor supervisor) ^LocalState local-state (:local-state supervisor) sync-callback (fn [& ignored] (.add event-manager this)) - existing-assignment (.get local-state LS-LOCAL-ASSIGNMENTS) assignment-versions (.get local-state LS-ASSIGNMENT-VERSIONS) {assignments-snapshot :assignments versions :versions} (assignments-snapshot storm-cluster-state sync-callback - existing-assignment assignment-versions) - _ (log-message (str "Got Assignments: " assignments-snapshot - " || And Versions: " (pr-str versions))) + assignment-versions) storm-code-map (read-storm-code-locations assignments-snapshot) downloaded-storm-ids (set (read-downloaded-storm-ids conf)) all-assignment (read-assignments @@ -331,7 +318,8 @@ (:assignment-id supervisor)) new-assignment (->> all-assignment (filter-key #(.confirmAssigned isupervisor %))) - assigned-storm-ids (assigned-storm-ids-from-port-assignments new-assignment)] + assigned-storm-ids (assigned-storm-ids-from-port-assignments new-assignment) + existing-assignment (.get local-state LS-LOCAL-ASSIGNMENTS)] (log-debug "Synchronizing supervisor") (log-debug "Storm code map: " storm-code-map) (log-debug "Downloaded storm ids: " downloaded-storm-ids) From 6d560cb6b7c1b120f30d026a50042c768ab90692 Mon Sep 17 00:00:00 2001 From: Kyle Nusbaum Date: Fri, 27 Jun 2014 18:29:03 +0000 Subject: [PATCH 4/6] Cleanup for pull --- storm-core/src/clj/backtype/storm/cluster.clj | 2 +- storm-core/src/clj/backtype/storm/daemon/worker.clj | 9 +-------- 2 files changed, 2 insertions(+), 9 deletions(-) diff --git a/storm-core/src/clj/backtype/storm/cluster.clj b/storm-core/src/clj/backtype/storm/cluster.clj index c5e6f16d077..8ff5a2cde6c 100644 --- a/storm-core/src/clj/backtype/storm/cluster.clj +++ b/storm-core/src/clj/backtype/storm/cluster.clj @@ -58,6 +58,7 @@ (callback type path))))))] (reify ClusterState + (register [this callback] (let [id (uuid)] @@ -122,7 +123,6 @@ (reset! active false) (.close zk))))) - (defprotocol StormClusterState (assignments [this callback]) (assignment-info [this storm-id callback]) diff --git a/storm-core/src/clj/backtype/storm/daemon/worker.clj b/storm-core/src/clj/backtype/storm/daemon/worker.clj index 0ec9bda8852..8ad72d7e7cd 100644 --- a/storm-core/src/clj/backtype/storm/daemon/worker.clj +++ b/storm-core/src/clj/backtype/storm/daemon/worker.clj @@ -245,8 +245,6 @@ [node (Integer/valueOf port-str)] )) -(def assignment-versions (atom {})) - (defn mk-refresh-connections [worker] (let [outbound-tasks (worker-outbound-tasks worker) conf (:conf worker) @@ -256,15 +254,10 @@ ([] (this (fn [& ignored] (schedule (:refresh-connections-timer worker) 0 this)))) ([callback] - (log-message "Refreshing Assignments") (let [version (.assignment-version storm-cluster-state storm-id callback) -; _ (log-message (str "Assignments are: " @assignment-versions)) assignment (if (= version (:version (get @assignment-versions storm-id))) - (do - (log-message "Keeping old Assignments.") - (:data (get @assignment-versions storm-id))) + (:data (get @assignment-versions storm-id)) (let [new-assignment (.assignment-info-with-version storm-cluster-state storm-id callback)] - (log-message "Getting new Assignments.") (swap! assignment-versions assoc storm-id new-assignment) (:data new-assignment))) my-assignment (-> assignment From f4e6c1933ff3f2e8ded52b9ceed389362ad41796 Mon Sep 17 00:00:00 2001 From: Kyle Nusbaum Date: Fri, 27 Jun 2014 18:31:07 +0000 Subject: [PATCH 5/6] One more bit of cleanup --- storm-core/src/clj/backtype/storm/daemon/worker.clj | 2 -- 1 file changed, 2 deletions(-) diff --git a/storm-core/src/clj/backtype/storm/daemon/worker.clj b/storm-core/src/clj/backtype/storm/daemon/worker.clj index 8ad72d7e7cd..605bc49ed96 100644 --- a/storm-core/src/clj/backtype/storm/daemon/worker.clj +++ b/storm-core/src/clj/backtype/storm/daemon/worker.clj @@ -181,8 +181,6 @@ (let [cluster-state (cluster/mk-distributed-cluster-state conf) storm-cluster-state (cluster/mk-storm-cluster-state cluster-state) storm-conf (read-supervisor-storm-conf conf storm-id) - _temp (read-worker-executors storm-conf storm-cluster-state storm-id assignment-id port assignment-versions) - _ (log-message (str "Worker assignments: " _temp)) executors (set (read-worker-executors storm-conf storm-cluster-state storm-id assignment-id port assignment-versions)) transfer-queue (disruptor/disruptor-queue "worker-transfer-queue" (storm-conf TOPOLOGY-TRANSFER-BUFFER-SIZE) :wait-strategy (storm-conf TOPOLOGY-DISRUPTOR-WAIT-STRATEGY)) From f6b826b91d2e299e3286dac78c4866cbded165ce Mon Sep 17 00:00:00 2001 From: Kyle Nusbaum Date: Tue, 1 Jul 2014 16:06:13 +0000 Subject: [PATCH 6/6] Keeping version data only in memory, as part of supervisor and worker structures. --- storm-core/src/clj/backtype/storm/daemon/common.clj | 1 - .../src/clj/backtype/storm/daemon/supervisor.clj | 7 +++---- storm-core/src/clj/backtype/storm/daemon/worker.clj | 12 ++++++------ 3 files changed, 9 insertions(+), 11 deletions(-) diff --git a/storm-core/src/clj/backtype/storm/daemon/common.clj b/storm-core/src/clj/backtype/storm/daemon/common.clj index f2ad7ce858b..43746b3be8d 100644 --- a/storm-core/src/clj/backtype/storm/daemon/common.clj +++ b/storm-core/src/clj/backtype/storm/daemon/common.clj @@ -61,7 +61,6 @@ (def LS-ID "supervisor-id") (def LS-LOCAL-ASSIGNMENTS "local-assignments") (def LS-APPROVED-WORKERS "approved-workers") -(def LS-ASSIGNMENT-VERSIONS "local-assignment-versions") diff --git a/storm-core/src/clj/backtype/storm/daemon/supervisor.clj b/storm-core/src/clj/backtype/storm/daemon/supervisor.clj index 927446ea025..7ba1c6971cf 100644 --- a/storm-core/src/clj/backtype/storm/daemon/supervisor.clj +++ b/storm-core/src/clj/backtype/storm/daemon/supervisor.clj @@ -211,6 +211,7 @@ (log-error t "Error when processing event") (exit-process! 20 "Error when processing an event") )) + :assignment-versions (atom {}) }) (defn sync-processes [supervisor] @@ -307,7 +308,7 @@ ^ISupervisor isupervisor (:isupervisor supervisor) ^LocalState local-state (:local-state supervisor) sync-callback (fn [& ignored] (.add event-manager this)) - assignment-versions (.get local-state LS-ASSIGNMENT-VERSIONS) + assignment-versions @(:assignment-versions supervisor) {assignments-snapshot :assignments versions :versions} (assignments-snapshot storm-cluster-state sync-callback assignment-versions) @@ -353,9 +354,7 @@ (.put local-state LS-LOCAL-ASSIGNMENTS new-assignment) - (.put local-state - LS-ASSIGNMENT-VERSIONS - versions) + (swap! (:assignment-versions supervisor) versions) (reset! (:curr-assignment supervisor) new-assignment) ;; remove any downloaded code that's no longer assigned or active ;; important that this happens after setting the local assignment so that diff --git a/storm-core/src/clj/backtype/storm/daemon/worker.clj b/storm-core/src/clj/backtype/storm/daemon/worker.clj index 605bc49ed96..aeabdf6fd41 100644 --- a/storm-core/src/clj/backtype/storm/daemon/worker.clj +++ b/storm-core/src/clj/backtype/storm/daemon/worker.clj @@ -175,10 +175,9 @@ ) :timer-name timer-name)) -(def assignment-versions (atom {})) - (defn worker-data [conf mq-context storm-id assignment-id port worker-id] - (let [cluster-state (cluster/mk-distributed-cluster-state conf) + (let [assignment-versions (atom {}) + cluster-state (cluster/mk-distributed-cluster-state conf) storm-cluster-state (cluster/mk-storm-cluster-state cluster-state) storm-conf (read-supervisor-storm-conf conf storm-id) executors (set (read-worker-executors storm-conf storm-cluster-state storm-id assignment-id port assignment-versions)) @@ -233,6 +232,7 @@ :transfer-local-fn (mk-transfer-local-fn <>) :receiver-thread-count (get storm-conf WORKER-RECEIVER-THREAD-COUNT) :transfer-fn (mk-transfer-fn <>) + :assignment-versions assignment-versions ))) (defn- endpoint->string [[node port]] @@ -253,10 +253,10 @@ (this (fn [& ignored] (schedule (:refresh-connections-timer worker) 0 this)))) ([callback] (let [version (.assignment-version storm-cluster-state storm-id callback) - assignment (if (= version (:version (get @assignment-versions storm-id))) - (:data (get @assignment-versions storm-id)) + assignment (if (= version (:version (get @(:assignment-versions worker) storm-id))) + (:data (get @(:assignment-versions worker) storm-id)) (let [new-assignment (.assignment-info-with-version storm-cluster-state storm-id callback)] - (swap! assignment-versions assoc storm-id new-assignment) + (swap! (:assignment-versions worker) assoc storm-id new-assignment) (:data new-assignment))) my-assignment (-> assignment :executor->node+port