From a26f81187c3ea54e05584d31b5eedb66d9600a17 Mon Sep 17 00:00:00 2001
From: Kishor Patil
Date: Thu, 18 Feb 2016 13:25:17 -0600
Subject: [PATCH 1/2] Supervisor should kill/restart if existing worker has changed assignments

---
 .../src/clj/org/apache/storm/daemon/supervisor.clj | 10 +++++++++-
 1 file changed, 9 insertions(+), 1 deletion(-)

diff --git a/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj b/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj
index ad9db760143..d057a01d312 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj
@@ -578,7 +578,11 @@
           assigned-storm-ids (assigned-storm-ids-from-port-assignments new-assignment)
           localizer (:localizer supervisor)
           checked-downloaded-storm-ids (set (verify-downloaded-files conf localizer assigned-storm-ids all-downloaded-storm-ids))
-          downloaded-storm-ids (set/difference all-downloaded-storm-ids checked-downloaded-storm-ids)]
+          downloaded-storm-ids (set/difference all-downloaded-storm-ids checked-downloaded-storm-ids)
+          assigned-executors (or (ls-local-assignments local-state) {})
+          allocated (read-allocated-workers supervisor assigned-executors (Time/currentTimeSecs))
+          valid-allocated (filter-val (fn [[state _]] (= state :valid)) allocated)
+          port->worker-id (clojure.set/map-invert (map-val #((nth % 1) :port) valid-allocated))]
 
       (log-debug "Synchronizing supervisor")
       (log-debug "Storm code map: " storm-code-map)
@@ -611,6 +615,10 @@
       (doseq [p (set/difference (set (keys existing-assignment))
                                 (set (keys new-assignment)))]
         (.killedWorker isupervisor (int p)))
+      (doseq [p (set/intersection (set (keys existing-assignment))
+                                  (set (keys new-assignment)))]
+        (if (not= (:executors (existing-assignment p)) (:executors (new-assignment p)))
+          (shutdown-worker supervisor (port->worker-id p))))
       (.assigned isupervisor (keys new-assignment))
       (ls-local-assignments! local-state
             new-assignment)

From bbdad03967962dfd017b9ecfb3f83f1d53df7595 Mon Sep 17 00:00:00 2001
From: Kishor Patil
Date: Fri, 19 Feb 2016 13:52:16 -0600
Subject: [PATCH 2/2] Refactoring kill workers method

Move the kill logic into kill-existing-workers-with-change-in-components.
While here, compare executors as sets and skip ports that have no valid
worker instead of passing a nil worker id to shutdown-worker.

---
 .../org/apache/storm/daemon/supervisor.clj | 31 ++++++++++++++++++++++---------
 1 file changed, 22 insertions(+), 9 deletions(-)

diff --git a/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj b/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj
index d057a01d312..18fec2d6abf 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj
@@ -553,6 +553,26 @@
           (rm-topo-files conf storm-id localizer false)
           storm-id)))))
 
+(defn kill-existing-workers-with-change-in-components
+  "Shuts down the :valid worker on every port that appears in both
+  existing-assignment and new-assignment but whose :executors differ.
+  Ports without a :valid allocated worker are skipped."
+  [supervisor existing-assignment new-assignment]
+  (let [assigned-executors (or (ls-local-assignments (:local-state supervisor)) {})
+        allocated (read-allocated-workers supervisor assigned-executors (Time/currentTimeSecs))
+        valid-allocated (filter-val (fn [[state _]] (= state :valid)) allocated)
+        port->worker-id (set/map-invert (map-val #((nth % 1) :port) valid-allocated))]
+    (doseq [p (set/intersection (set (keys existing-assignment))
+                                (set (keys new-assignment)))]
+      ;; Compare as sets so that a difference in ordering alone is not treated as a change.
+      ;; NOTE(review): assumes duplicates in :executors carry no meaning - confirm.
+      (when (not= (set (:executors (existing-assignment p)))
+                  (set (:executors (new-assignment p))))
+        ;; port->worker-id only covers workers whose state is :valid, so a port
+        ;; may have no entry; skip it rather than pass nil to shutdown-worker.
+        (when-let [worker-id (port->worker-id p)]
+          (shutdown-worker supervisor worker-id))))))
+
 (defn mk-synchronize-supervisor [supervisor sync-processes event-manager processes-event-manager]
   (fn this []
     (let [conf (:conf supervisor)
@@ -578,11 +598,7 @@
           assigned-storm-ids (assigned-storm-ids-from-port-assignments new-assignment)
           localizer (:localizer supervisor)
           checked-downloaded-storm-ids (set (verify-downloaded-files conf localizer assigned-storm-ids all-downloaded-storm-ids))
-          downloaded-storm-ids (set/difference all-downloaded-storm-ids checked-downloaded-storm-ids)
-          assigned-executors (or (ls-local-assignments local-state) {})
-          allocated (read-allocated-workers supervisor assigned-executors (Time/currentTimeSecs))
-          valid-allocated (filter-val (fn [[state _]] (= state :valid)) allocated)
-          port->worker-id (clojure.set/map-invert (map-val #((nth % 1) :port) valid-allocated))]
+          downloaded-storm-ids (set/difference all-downloaded-storm-ids checked-downloaded-storm-ids)]
 
       (log-debug "Synchronizing supervisor")
       (log-debug "Storm code map: " storm-code-map)
@@ -615,10 +631,7 @@
       (doseq [p (set/difference (set (keys existing-assignment))
                                 (set (keys new-assignment)))]
         (.killedWorker isupervisor (int p)))
-      (doseq [p (set/intersection (set (keys existing-assignment))
-                                  (set (keys new-assignment)))]
-        (if (not= (:executors (existing-assignment p)) (:executors (new-assignment p)))
-          (shutdown-worker supervisor (port->worker-id p))))
+      (kill-existing-workers-with-change-in-components supervisor existing-assignment new-assignment)
       (.assigned isupervisor (keys new-assignment))
       (ls-local-assignments! local-state
             new-assignment)