From 55f18e6b4c4708761370164e9e800db401613ae9 Mon Sep 17 00:00:00 2001
From: Mansheng Yang
Date: Wed, 19 Nov 2014 20:39:03 +0800
Subject: [PATCH 01/10] Fix calls to submit-mocked-assignment: executor id
 should be of the form [first-task last-task]

---
 .../clj/backtype/storm/supervisor_test.clj | 22 +++++++++----------
 1 file changed, 11 insertions(+), 11 deletions(-)

diff --git a/storm-core/test/clj/backtype/storm/supervisor_test.clj b/storm-core/test/clj/backtype/storm/supervisor_test.clj
index a3594a3ccc2..189ca04f14c 100644
--- a/storm-core/test/clj/backtype/storm/supervisor_test.clj
+++ b/storm-core/test/clj/backtype/storm/supervisor_test.clj
@@ -96,10 +96,10 @@
       2 "1"
       3 "1"
       4 "1"}
-     {[1] ["sup1" 1]
-      [2] ["sup1" 2]
-      [3] ["sup1" 3]
-      [4] ["sup1" 3]
+     {[1 1] ["sup1" 1]
+      [2 2] ["sup1" 2]
+      [3 3] ["sup1" 3]
+      [4 4] ["sup1" 3]
       })
    (advance-cluster-time cluster 2)
    (heartbeat-workers cluster "sup1" [1 2 3])
@@ -168,9 +168,9 @@
      {1 "1"
       2 "1"
       3 "1"}
-     {[1] ["sup1" 3]
-      [2] ["sup1" 3]
-      [3] ["sup2" 2]
+     {[1 1] ["sup1" 3]
+      [2 2] ["sup1" 3]
+      [3 3] ["sup2" 2]
       })
    (advance-cluster-time cluster 2)
    (heartbeat-workers cluster "sup1" [3])
@@ -607,8 +607,8 @@
      topology1
      {1 "1"
       2 "1"}
-     {[1] ["sup1" 1]
-      [2] ["sup1" 2]
+     {[1 1] ["sup1" 1]
+      [2 2] ["sup1" 2]
       })
    (submit-mocked-assignment
     (:nimbus cluster)
@@ -617,8 +617,8 @@
      topology2
      {1 "1"
       2 "1"}
-     {[1] ["sup1" 1]
-      [2] ["sup1" 2]
+     {[1 1] ["sup1" 1]
+      [2 2] ["sup1" 2]
       })
    (advance-cluster-time cluster 10)
    ))

From 032e082dbe8c5925f70b33f9f5a66e60a2a8c84d Mon Sep 17 00:00:00 2001
From: Mansheng Yang
Date: Wed, 19 Nov 2014 20:41:21 +0800
Subject: [PATCH 02/10] Ignore exceptions when reading worker heartbeats: a
 worker may be shut down while we are iterating over all the workers

---
 storm-core/src/clj/backtype/storm/daemon/supervisor.clj | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/storm-core/src/clj/backtype/storm/daemon/supervisor.clj b/storm-core/src/clj/backtype/storm/daemon/supervisor.clj
index d82fd12fb56..a9878349906 100644
--- a/storm-core/src/clj/backtype/storm/daemon/supervisor.clj
+++ b/storm-core/src/clj/backtype/storm/daemon/supervisor.clj
@@ -97,7 +97,9 @@
 (defn read-worker-heartbeat [conf id]
   (let [local-state (worker-state conf id)]
-    (.get local-state LS-WORKER-HEARTBEAT)
+    (try
+      (.get local-state LS-WORKER-HEARTBEAT)
+      (catch Exception e nil))
    ))

From 2593627bae6e853e062abdd2defe30355041ff80 Mon Sep 17 00:00:00 2001
From: Mansheng Yang
Date: Mon, 13 Oct 2014 21:28:27 +0800
Subject: [PATCH 03/10] keep workers during rebalance

---
 .../src/clj/backtype/storm/daemon/common.clj |   2 +-
 .../clj/backtype/storm/daemon/executor.clj   |   2 +-
 .../clj/backtype/storm/daemon/supervisor.clj |   4 +-
 .../src/clj/backtype/storm/daemon/task.clj   |   2 +-
 .../src/clj/backtype/storm/daemon/worker.clj | 207 +++++++++++-------
 .../clj/backtype/storm/supervisor_test.clj   |   2 +-
 6 files changed, 130 insertions(+), 89 deletions(-)

diff --git a/storm-core/src/clj/backtype/storm/daemon/common.clj b/storm-core/src/clj/backtype/storm/daemon/common.clj
index f091dfb3406..7c9671bda03 100644
--- a/storm-core/src/clj/backtype/storm/daemon/common.clj
+++ b/storm-core/src/clj/backtype/storm/daemon/common.clj
@@ -345,7 +345,7 @@
     (supervisor-stormdist-root (:conf worker) (:storm-id worker)))
    (worker-pids-root (:conf worker) (:worker-id worker))
    (:port worker)
-   (:task-ids worker)
+   (seq @(:task-ids worker))
    (:default-shared-resources worker)
    (:user-shared-resources worker)
    ))
diff --git a/storm-core/src/clj/backtype/storm/daemon/executor.clj
b/storm-core/src/clj/backtype/storm/daemon/executor.clj index be364cc1ad7..cd6b7e755c1 100644 --- a/storm-core/src/clj/backtype/storm/daemon/executor.clj +++ b/storm-core/src/clj/backtype/storm/daemon/executor.clj @@ -224,7 +224,7 @@ :component-id component-id :open-or-prepare-was-called? (atom false) :storm-conf storm-conf - :receive-queue ((:executor-receive-queue-map worker) executor-id) + :receive-queue (@(:executor-receive-queue-map worker) executor-id) :storm-id (:storm-id worker) :conf (:conf worker) :shared-executor-data (HashMap.) diff --git a/storm-core/src/clj/backtype/storm/daemon/supervisor.clj b/storm-core/src/clj/backtype/storm/daemon/supervisor.clj index a9878349906..c11daaf4835 100644 --- a/storm-core/src/clj/backtype/storm/daemon/supervisor.clj +++ b/storm-core/src/clj/backtype/storm/daemon/supervisor.clj @@ -119,9 +119,7 @@ (defn matches-an-assignment? [worker-heartbeat assigned-executors] (let [local-assignment (assigned-executors (:port worker-heartbeat))] (and local-assignment - (= (:storm-id worker-heartbeat) (:storm-id local-assignment)) - (= (disj (set (:executors worker-heartbeat)) Constants/SYSTEM_EXECUTOR_ID) - (set (:executors local-assignment)))))) + (= (:storm-id worker-heartbeat) (:storm-id local-assignment))))) (let [dead-workers (atom #{})] (defn get-dead-workers [] diff --git a/storm-core/src/clj/backtype/storm/daemon/task.clj b/storm-core/src/clj/backtype/storm/daemon/task.clj index 6a61cea6fad..3d4beed5379 100644 --- a/storm-core/src/clj/backtype/storm/daemon/task.clj +++ b/storm-core/src/clj/backtype/storm/daemon/task.clj @@ -40,7 +40,7 @@ (worker-pids-root conf (:worker-id worker)) (int %) (:port worker) - (:task-ids worker) + (seq @(:task-ids worker)) (:default-shared-resources worker) (:user-shared-resources worker) (:shared-executor-data executor-data) diff --git a/storm-core/src/clj/backtype/storm/daemon/worker.clj b/storm-core/src/clj/backtype/storm/daemon/worker.clj index e717ce4112b..0a382b6a03b 100644 --- a/storm-core/src/clj/backtype/storm/daemon/worker.clj +++ b/storm-core/src/clj/backtype/storm/daemon/worker.clj @@ -31,9 +31,9 @@ (defmulti mk-suicide-fn cluster-mode) -(defn read-worker-executors [storm-conf storm-cluster-state storm-id assignment-id port assignment-versions] +(defn read-worker-executors [assignment-info assignment-id port] (log-message "Reading Assignments.") - (let [assignment (:executor->node+port (.assignment-info storm-cluster-state storm-id nil))] + (let [assignment (:executor->node+port assignment-info)] (doall (concat [Constants/SYSTEM_EXECUTOR_ID] @@ -46,7 +46,7 @@ (defnk do-executor-heartbeats [worker :executors nil] ;; stats is how we know what executors are assigned to this worker (let [stats (if-not executors - (into {} (map (fn [e] {e nil}) (:executors worker))) + (into {} (map (fn [e] {e nil}) @(:executors worker))) (->> executors (map (fn [e] {(executor/get-executor-id e) (executor/render-stats e)})) (apply merge))) @@ -64,7 +64,7 @@ hb (WorkerHeartbeat. 
(current-time-secs) (:storm-id worker) - (:executors worker) + @(:executors worker) (:port worker)) state (worker-state conf (:worker-id worker))] (log-debug "Doing heartbeat " (pr-str hb)) @@ -90,7 +90,7 @@ vals (map keys) (apply concat))) - (:task-ids worker))] + @(:task-ids worker))] (-> worker :task->component reverse-map @@ -102,14 +102,14 @@ (defn mk-transfer-local-fn [worker] (let [short-executor-receive-queue-map (:short-executor-receive-queue-map worker) task->short-executor (:task->short-executor worker) - task-getter (comp #(get task->short-executor %) fast-first)] + task-getter (comp #(get @task->short-executor %) fast-first)] (fn [tuple-batch] (let [grouped (fast-group-by task-getter tuple-batch)] (fast-map-iter [[short-executor pairs] grouped] - (let [q (short-executor-receive-queue-map short-executor)] + (let [q (@short-executor-receive-queue-map short-executor)] (if q (disruptor/publish q pairs) - (log-warn "Received invalid messages for unknown tasks. Dropping... ") + (log-warn "Received invalid messages for unknown tasks " short-executor ". Dropping... ") ))))))) (defn- assert-can-serialize [^KryoTupleSerializer serializer tuple-batch] @@ -118,7 +118,7 @@ (.serialize serializer tuple))) (defn mk-transfer-fn [worker] - (let [local-tasks (-> worker :task-ids set) + (let [local-tasks (:task-ids worker) local-transfer (:transfer-local-fn worker) ^DisruptorQueue transfer-queue (:transfer-queue worker) task->node+port (:cached-task->node+port worker) @@ -128,8 +128,8 @@ (let [local (ArrayList.) remoteMap (HashMap.)] (fast-list-iter [[task tuple :as pair] tuple-batch] - (if (local-tasks task) - (.add local pair) + (if (@local-tasks task) + (.add local pair) ;;Using java objects directly to avoid performance issues in java code (let [node+port (get @task->node+port task)] @@ -137,7 +137,7 @@ (.put remoteMap node+port (ArrayList.))) (let [remote (.get remoteMap node+port)] (.add remote (TaskMessage. task (.serialize serializer tuple))) - )))) + )))) (local-transfer local) (disruptor/publish transfer-queue remoteMap) ))] @@ -193,14 +193,8 @@ (defn worker-data [conf mq-context storm-id assignment-id port worker-id storm-conf cluster-state storm-cluster-state] (let [assignment-versions (atom {}) - executors (set (read-worker-executors storm-conf storm-cluster-state storm-id assignment-id port assignment-versions)) transfer-queue (disruptor/disruptor-queue "worker-transfer-queue" (storm-conf TOPOLOGY-TRANSFER-BUFFER-SIZE) :wait-strategy (storm-conf TOPOLOGY-DISRUPTOR-WAIT-STRATEGY)) - executor-receive-queue-map (mk-receive-queue-map storm-conf executors) - - receive-queue-map (->> executor-receive-queue-map - (mapcat (fn [[e queue]] (for [t (executor-id->tasks e)] [t queue]))) - (into {})) topology (read-supervisor-topology conf storm-id) mq-context (if mq-context @@ -218,8 +212,9 @@ :cluster-state cluster-state :storm-cluster-state storm-cluster-state :storm-active-atom (atom false) - :executors executors - :task-ids (->> receive-queue-map keys (map int) sort) + :executors (atom nil) + :task-ids (atom nil) + :outbound-tasks (atom nil) :storm-conf storm-conf :topology topology :system-topology (system-topology! 
storm-conf topology) @@ -236,12 +231,9 @@ :cached-node+port->socket (atom {}) :cached-task->node+port (atom {}) :transfer-queue transfer-queue - :executor-receive-queue-map executor-receive-queue-map - :short-executor-receive-queue-map (map-key first executor-receive-queue-map) - :task->short-executor (->> executors - (mapcat (fn [e] (for [t (executor-id->tasks e)] [t (first e)]))) - (into {}) - (HashMap.)) + :executor-receive-queue-map (atom {}) + :short-executor-receive-queue-map (atom {}) + :task->short-executor (atom {}) :suicide-fn (mk-suicide-fn conf) :uptime (uptime-computer) :default-shared-resources (mk-default-resources <>) @@ -260,63 +252,117 @@ [node (Integer/valueOf port-str)] )) -(defn mk-refresh-connections [worker] - (let [outbound-tasks (worker-outbound-tasks worker) +(defn mk-sync-executors [worker executors credentials] + (let [conf (:conf worker) + storm-cluster-state (:storm-cluster-state worker) + storm-id (:storm-id worker) + assignment-versions (:assignment-versions worker) + assignment-id (:assignment-id worker) + port (:port worker)] + (fn [] + (let [old-executor-ids @(:executors worker) + assignment-info (:data (@assignment-versions storm-id)) + new-executor-ids (set (read-worker-executors assignment-info assignment-id port))] + (if (not= new-executor-ids old-executor-ids) + (let [executors-to-launch (set/difference new-executor-ids old-executor-ids) + executors-to-kill (set/difference old-executor-ids new-executor-ids) + executor-receive-queue-map (merge + (mk-receive-queue-map conf executors-to-launch) + (apply dissoc @(:executor-receive-queue-map worker) executors-to-kill)) + receive-queue-map (->> executor-receive-queue-map + (mapcat (fn [[e queue]] (for [t (executor-id->tasks e)] [t queue]))) + (into {})) + task-ids (->> receive-queue-map keys (map int) sort) + update-executors-fn (fn [executors] + (doall + (concat + (mapcat (fn [e] + (let [id (executor/get-executor-id e)] + (if (new-executor-ids id) + [e] + (do + (log-message "Shutting down executor " id) + (.shutdown e))))) + executors) + (dofor [e executors-to-launch] + (executor/mk-executor worker e @credentials)))))] + (reset! (:executors worker) new-executor-ids) + (reset! (:executor-receive-queue-map worker) executor-receive-queue-map) + (reset! (:short-executor-receive-queue-map worker) (map-key first executor-receive-queue-map)) + (reset! (:task-ids worker) (set task-ids)) + (reset! (:task->short-executor worker) (->> new-executor-ids + (mapcat (fn [e] (for [t (executor-id->tasks e)] [t (first e)]))) + (into {}) + (HashMap.))) + (reset! (:outbound-tasks worker) (worker-outbound-tasks worker)) + #(swap! executors update-executors-fn))))))) + +(defn mk-refresh-connections [worker executors credentials] + (let [outbound-tasks (:outbound-tasks worker) conf (:conf worker) storm-cluster-state (:storm-cluster-state worker) - storm-id (:storm-id worker)] + storm-id (:storm-id worker) + assignment-versions (:assignment-versions worker) + sync-executors (mk-sync-executors worker executors credentials)] (fn this ([] (this (fn [& ignored] (schedule (:refresh-connections-timer worker) 0 this)))) ([callback] (let [version (.assignment-version storm-cluster-state storm-id callback) - assignment (if (= version (:version (get @(:assignment-versions worker) storm-id))) - (:data (get @(:assignment-versions worker) storm-id)) + old-version (:version (get @assignment-versions storm-id)) + version-changed? (not= version old-version) + assignment (if (not version-changed?) 
+ (:data (get @assignment-versions storm-id)) (let [new-assignment (.assignment-info-with-version storm-cluster-state storm-id callback)] - (swap! (:assignment-versions worker) assoc storm-id new-assignment) + (swap! assignment-versions assoc storm-id new-assignment) (:data new-assignment))) - my-assignment (-> assignment - :executor->node+port - to-task->node+port - (select-keys outbound-tasks) - (#(map-val endpoint->string %))) - ;; we dont need a connection for the local tasks anymore - needed-assignment (->> my-assignment - (filter-key (complement (-> worker :task-ids set)))) - needed-connections (-> needed-assignment vals set) - needed-tasks (-> needed-assignment keys) - - current-connections (set (keys @(:cached-node+port->socket worker))) - new-connections (set/difference needed-connections current-connections) - remove-connections (set/difference current-connections needed-connections)] - (swap! (:cached-node+port->socket worker) - #(HashMap. (merge (into {} %1) %2)) - (into {} - (dofor [endpoint-str new-connections - :let [[node port] (string->endpoint endpoint-str)]] - [endpoint-str - (.connect - ^IContext (:mq-context worker) - storm-id - ((:node->host assignment) node) - port) - ] - ))) - (write-locked (:endpoint-socket-lock worker) - (reset! (:cached-task->node+port worker) - (HashMap. my-assignment))) - (doseq [endpoint remove-connections] - (.close (get @(:cached-node+port->socket worker) endpoint))) - (apply swap! - (:cached-node+port->socket worker) - #(HashMap. (apply dissoc (into {} %1) %&)) - remove-connections) - - (let [missing-tasks (->> needed-tasks - (filter (complement my-assignment)))] - (when-not (empty? missing-tasks) - (log-warn "Missing assignment for following tasks: " (pr-str missing-tasks)) - ))))))) + update-executors (if version-changed? (sync-executors) (fn [])) + + my-assignment (-> assignment + :executor->node+port + to-task->node+port + (select-keys @outbound-tasks) + (#(map-val endpoint->string %))) + ;; we dont need a connection for the local tasks anymore + needed-assignment (->> my-assignment + (filter-key (complement @(:task-ids worker)))) + needed-connections (-> needed-assignment vals set) + needed-tasks (-> needed-assignment keys) + + current-connections (set (keys @(:cached-node+port->socket worker))) + new-connections (set/difference needed-connections current-connections) + remove-connections (set/difference current-connections needed-connections)] + (log-message "refresh-connections " needed-connections) + (swap! (:cached-node+port->socket worker) + #(HashMap. (merge (into {} %1) %2)) + (into {} + (dofor [endpoint-str new-connections + :let [[node port] (string->endpoint endpoint-str)]] + [endpoint-str + (.connect + ^IContext (:mq-context worker) + storm-id + ((:node->host assignment) node) + port) + ] + ))) + (write-locked (:endpoint-socket-lock worker) + (reset! (:cached-task->node+port worker) + (HashMap. my-assignment))) + (doseq [endpoint remove-connections] + (.close (get @(:cached-node+port->socket worker) endpoint))) + (apply swap! + (:cached-node+port->socket worker) + #(HashMap. (apply dissoc (into {} %1) %&)) + remove-connections) + + (let [missing-tasks (->> needed-tasks + (filter (complement my-assignment)))] + (when-not (empty? 
missing-tasks) + (log-warn "Missing assignment for following tasks: " (pr-str missing-tasks)))) + + ;; this needs to be done last as executor initialization may require connections + (update-executors)))))) (defn refresh-storm-active ([worker] @@ -331,12 +377,9 @@ ;; TODO: consider having a max batch size besides what disruptor does automagically to prevent latency issues (defn mk-transfer-tuples-handler [worker] - (let [^DisruptorQueue transfer-queue (:transfer-queue worker) - drainer (TransferDrainer.) + (let [drainer (TransferDrainer.) node+port->socket (:cached-node+port->socket worker) - task->node+port (:cached-task->node+port worker) - endpoint-socket-lock (:endpoint-socket-lock worker) - ] + endpoint-socket-lock (:endpoint-socket-lock worker)] (disruptor/clojure-handler (fn [packets _ batch-end?] (.add drainer packets) @@ -400,18 +443,19 @@ ;; it's important that worker heartbeat to supervisor ASAP when launching so that the supervisor knows it's running (and can move on) _ (heartbeat-fn) + credentials (atom initial-credentials) executors (atom nil) ;; launch heartbeat threads immediately so that slow-loading tasks don't cause the worker to timeout ;; to the supervisor _ (schedule-recurring (:heartbeat-timer worker) 0 (conf WORKER-HEARTBEAT-FREQUENCY-SECS) heartbeat-fn) _ (schedule-recurring (:executor-heartbeat-timer worker) 0 (conf TASK-HEARTBEAT-FREQUENCY-SECS) #(do-executor-heartbeats worker :executors @executors)) - refresh-connections (mk-refresh-connections worker) + refresh-connections (mk-refresh-connections worker executors credentials) _ (refresh-connections nil) _ (refresh-storm-active worker nil) - _ (reset! executors (dofor [e (:executors worker)] (executor/mk-executor worker e initial-credentials))) + receive-thread-shutdown (launch-receive-thread worker) transfer-tuples (mk-transfer-tuples-handler worker) @@ -472,7 +516,6 @@ (timer-waiting? 
(:user-timer worker)) )) ) - credentials (atom initial-credentials) check-credentials-changed (fn [] (let [new-creds (.credentials (:storm-cluster-state worker) storm-id nil)] (when-not (= new-creds @credentials) ;;This does not have to be atomic, worst case we update when one is not needed diff --git a/storm-core/test/clj/backtype/storm/supervisor_test.clj b/storm-core/test/clj/backtype/storm/supervisor_test.clj index 189ca04f14c..9f4714b8c17 100644 --- a/storm-core/test/clj/backtype/storm/supervisor_test.clj +++ b/storm-core/test/clj/backtype/storm/supervisor_test.clj @@ -54,7 +54,7 @@ (worker/do-heartbeat {:conf conf :port port :storm-id storm-id - :executors executors + :executors (atom executors) :worker-id (find-worker-id conf port)}))) (defn heartbeat-workers [cluster supervisor-id ports] From 9f6930f16566a160866255ad22a31e135c2a8c8c Mon Sep 17 00:00:00 2001 From: Mansheng Yang Date: Tue, 10 Feb 2015 20:47:50 +0800 Subject: [PATCH 04/10] add logging --- storm-core/src/clj/backtype/storm/daemon/worker.clj | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/storm-core/src/clj/backtype/storm/daemon/worker.clj b/storm-core/src/clj/backtype/storm/daemon/worker.clj index 0a382b6a03b..ecb17fc8568 100644 --- a/storm-core/src/clj/backtype/storm/daemon/worker.clj +++ b/storm-core/src/clj/backtype/storm/daemon/worker.clj @@ -264,7 +264,8 @@ assignment-info (:data (@assignment-versions storm-id)) new-executor-ids (set (read-worker-executors assignment-info assignment-id port))] (if (not= new-executor-ids old-executor-ids) - (let [executors-to-launch (set/difference new-executor-ids old-executor-ids) + (let [_ (log-message "Syncing executors " old-executor-ids " -> " new-executor-ids) + executors-to-launch (set/difference new-executor-ids old-executor-ids) executors-to-kill (set/difference old-executor-ids new-executor-ids) executor-receive-queue-map (merge (mk-receive-queue-map conf executors-to-launch) From 482c715c15089b4cfc099fb9c124ea0336f38604 Mon Sep 17 00:00:00 2001 From: Mansheng Yang Date: Tue, 10 Mar 2015 14:13:38 +0800 Subject: [PATCH 05/10] add MigrationScheduler --- conf/defaults.yaml | 2 + .../src/clj/backtype/storm/daemon/worker.clj | 2 +- .../storm/scheduler/MigrationScheduler.clj | 121 ++++++++++++++++++ 3 files changed, 124 insertions(+), 1 deletion(-) create mode 100644 storm-core/src/clj/backtype/storm/scheduler/MigrationScheduler.clj diff --git a/conf/defaults.yaml b/conf/defaults.yaml index 2601beeda63..3eeffc880c9 100644 --- a/conf/defaults.yaml +++ b/conf/defaults.yaml @@ -198,3 +198,5 @@ topology.classpath: null topology.environment: null dev.zookeeper.path: "/tmp/dev-storm-zookeeper" + +storm.scheduler: "backtype.storm.scheduler.MigrationScheduler" diff --git a/storm-core/src/clj/backtype/storm/daemon/worker.clj b/storm-core/src/clj/backtype/storm/daemon/worker.clj index ecb17fc8568..eecb2c09c02 100644 --- a/storm-core/src/clj/backtype/storm/daemon/worker.clj +++ b/storm-core/src/clj/backtype/storm/daemon/worker.clj @@ -333,7 +333,7 @@ current-connections (set (keys @(:cached-node+port->socket worker))) new-connections (set/difference needed-connections current-connections) remove-connections (set/difference current-connections needed-connections)] - (log-message "refresh-connections " needed-connections) + (log-message "refresh-connections " current-connections " -> " needed-connections) (swap! (:cached-node+port->socket worker) #(HashMap. 
(merge (into {} %1) %2)) (into {} diff --git a/storm-core/src/clj/backtype/storm/scheduler/MigrationScheduler.clj b/storm-core/src/clj/backtype/storm/scheduler/MigrationScheduler.clj new file mode 100644 index 00000000000..b0b28410153 --- /dev/null +++ b/storm-core/src/clj/backtype/storm/scheduler/MigrationScheduler.clj @@ -0,0 +1,121 @@ +(ns backtype.storm.scheduler.MigrationScheduler + (:use [backtype.storm util config log]) + (:require [backtype.storm.scheduler.DefaultScheduler :as DefaultScheduler]) + (:require [backtype.storm [zookeeper :as zk] [cluster :as cluster]]) + (:require [clojure.set :as set]) + (:import [backtype.storm.utils Utils]) + (:import [java.util HashSet Set List LinkedList ArrayList Map HashMap]) + (:import [backtype.storm.scheduler IScheduler Topologies + Cluster TopologyDetails WorkerSlot SchedulerAssignment + EvenScheduler ExecutorDetails]) + (:gen-class + :init init + :constructors {[] []} + :state state + :implements [backtype.storm.scheduler.IScheduler])) + +(defn -init [] + [[] (atom {})]) + +(defn -prepare [this conf] + (let [zk (zk/mk-client conf + (conf STORM-ZOOKEEPER-SERVERS) + (conf STORM-ZOOKEEPER-PORT) + :root (conf STORM-ZOOKEEPER-ROOT))] + (swap! (.state this) into {:conf conf :zk zk}))) + +(defn get-zk-assignment [cluster topology-id zk] + (let [assignment-data (zk/get-data zk (cluster/assignment-path topology-id) false)] + (if assignment-data (Utils/deserialize assignment-data)))) + +(defn get-alive-assigned-node+port->executors [existing-assignment] + (let [executor->node+port (:executor->node+port existing-assignment) + alive-assigned (map-val shuffle (reverse-map executor->node+port))] + alive-assigned)) + +(defn sort-slots [all-slots] + (let [split-up (sort-by count > (vals (group-by first all-slots)))] + (apply interleave-all split-up))) + +(defn reverse-slot->executors [slot->executors] + (->> slot->executors + (map (fn [[k v]] (map #(vector % k) v))) + (apply concat) + (into {}))) + +;; TODO: executors may change, slots may die +(defn schedule-scale-up [^TopologyDetails topology ^Cluster cluster alive-assigned diff] + (let [all-executors (->> topology + .getExecutors + (map #(vector (.getStartTask %) (.getEndTask %))) + set) + available-slots (->> (.getAvailableSlots cluster) + (map #(vector (.getNodeId %) (.getPort %))) + set) + alive-assigned (filter-key available-slots alive-assigned) + new-slots (->> available-slots + (#(set/difference % (set (keys alive-assigned)))) + sort-slots + (take diff)) + _ (log-message "available-slots " (vec available-slots)) + _ (log-message "alive-assigned: " alive-assigned) + _ (log-message "new-slots: " (vec new-slots)) + total-slots-to-use (+ (count alive-assigned) (count new-slots)) + num-executors (count all-executors) + avg (quot num-executors total-slots-to-use) + remainder (rem num-executors total-slots-to-use) + _ (log-message "diff: " diff " " total-slots-to-use " " num-executors " " avg " " remainder) + keep-assignment (->> (concat (repeat remainder (+ avg 1)) (repeat avg)) + (map (fn [[k v] n] [k (take n v)]) alive-assigned) + reverse-slot->executors) + _ (log-message "keep-assignment: " keep-assignment) + reassign-executors (sort (set/difference all-executors + (set (keys keep-assignment)))) + reassignment (->> (repeat-seq (count reassign-executors) new-slots) + (map vector reassign-executors) + (into {})) + _ (log-message "reassignment: " reassignment)] + (into keep-assignment reassignment))) + +(defn schedule-scale-down [^TopologyDetails topology ^Cluster cluster alive-assigned diff] + (let 
[slots (set (keys alive-assigned))
+        keep-slots (set (take (.getNumWorkers topology) slots))
+        _ (log-message "keep-slots: " keep-slots)
+        keep-assignment (reverse-slot->executors (filter-key keep-slots alive-assigned))
+        _ (log-message "keep-assignment: " keep-assignment)
+        drop-slots (set/difference slots keep-slots)
+        _ (log-message "drop-slots: " drop-slots)
+        reassign-executors (apply concat (map alive-assigned drop-slots))
+        _ (log-message "reassign-executors " (vec reassign-executors))
+        reassignment (->> (repeat-seq (count reassign-executors) keep-slots)
+                          (map vector reassign-executors)
+                          (into {}))
+        _ (log-message "reassignment " reassignment)]
+    (into keep-assignment reassignment)))
+
+(defn schedule-migration [^TopologyDetails topology ^Cluster cluster zk]
+  (let [topology-id (.getId topology)
+        existing-assignment (get-zk-assignment cluster topology-id zk)]
+    ;; only schedules migrating topologies: present in zk but missing in cluster
+    (if (and existing-assignment (not (.getAssignmentById cluster topology-id)))
+      (let [alive-assigned (get-alive-assigned-node+port->executors existing-assignment)
+            slots-diff (- (.getNumWorkers topology) (count alive-assigned))]
+        (if (> slots-diff 0)
+          (schedule-scale-up topology cluster alive-assigned slots-diff)
+          (schedule-scale-down topology cluster alive-assigned slots-diff))))))
+
+(defn -schedule [this ^Topologies topologies ^Cluster cluster]
+  (let [state @(.state this)
+        needs-scheduling-topologies (.needsSchedulingTopologies cluster topologies)]
+    (doseq [^TopologyDetails topology needs-scheduling-topologies
+            :let [topology-id (.getId topology)
+                  new-assignment (schedule-migration topology cluster (state :zk))]]
+      (if new-assignment
+        (do
+          (log-message "new-assignment: " (reverse-map new-assignment))
+          (doseq [[node+port executors] (reverse-map new-assignment)
+                  :let [^WorkerSlot slot (WorkerSlot. (first node+port) (last node+port))
+                        executors (for [[start-task end-task] executors]
+                                    (ExecutorDetails. start-task end-task))]]
+            (.assign cluster slot topology-id executors)))
+        (DefaultScheduler/default-schedule (Topologies. {topology-id topology}) cluster)))))

From cf1b782118bbb0d0b8471a12cd92ea8dcf4845a9 Mon Sep 17 00:00:00 2001
From: Mansheng Yang
Date: Tue, 10 Mar 2015 14:16:12 +0800
Subject: [PATCH 06/10] netty client fix

---
 .../src/clj/backtype/storm/daemon/worker.clj   | 13 +++++++------
 .../backtype/storm/messaging/netty/Client.java | 17 ++++++++++-------
 2 files changed, 17 insertions(+), 13 deletions(-)

diff --git a/storm-core/src/clj/backtype/storm/daemon/worker.clj b/storm-core/src/clj/backtype/storm/daemon/worker.clj
index eecb2c09c02..d43bf823359 100644
--- a/storm-core/src/clj/backtype/storm/daemon/worker.clj
+++ b/storm-core/src/clj/backtype/storm/daemon/worker.clj
@@ -350,12 +350,13 @@
       (write-locked (:endpoint-socket-lock worker)
         (reset! (:cached-task->node+port worker)
                 (HashMap. my-assignment)))
-      (doseq [endpoint remove-connections]
-        (.close (get @(:cached-node+port->socket worker) endpoint)))
-      (apply swap!
-             (:cached-node+port->socket worker)
-             #(HashMap. (apply dissoc (into {} %1) %&))
-             remove-connections)
+      (write-locked (:endpoint-socket-lock worker)
+        (doseq [endpoint remove-connections]
+          (.close (get @(:cached-node+port->socket worker) endpoint)))
+        (apply swap!
+               (:cached-node+port->socket worker)
+               #(HashMap.
(apply dissoc (into {} %1) %&)) + remove-connections)) (let [missing-tasks (->> needed-tasks (filter (complement my-assignment)))] diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java b/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java index d7704810497..d447d66eaec 100644 --- a/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java +++ b/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java @@ -147,10 +147,13 @@ public void run() { scheduler.scheduleWithFixedDelay(flusher, initialDelay, flushCheckInterval, TimeUnit.MILLISECONDS); } + private synchronized void connect() { + connect(max_retries); + } /** * We will retry connection with exponential back-off policy */ - private synchronized void connect() { + private synchronized void connect(int max_retries) { try { Channel channel = channelRef.get(); @@ -184,7 +187,7 @@ private synchronized void connect() { if (null != channel) { LOG.info("connection established to a remote host " + name() + ", " + channel.toString()); channelRef.set(channel); - } else { + } else if (max_retries > 0) { close(); throw new RuntimeException("Remote address is not reachable. We will close this client " + name()); } @@ -209,13 +212,13 @@ synchronized public void send(Iterator msgs) { Channel channel = channelRef.get(); if (null == channel) { - connect(); + connect(0); channel = channelRef.get(); } while (msgs.hasNext()) { - if (!channel.isConnected()) { - connect(); + if (channel == null || !channel.isConnected()) { + connect(0); channel = channelRef.get(); } TaskMessage message = msgs.next(); @@ -224,7 +227,7 @@ synchronized public void send(Iterator msgs) { } messageBatch.add(message); - if (messageBatch.isFull()) { + if (channel != null && messageBatch.isFull()) { MessageBatch toBeFlushed = messageBatch; flushRequest(channel, toBeFlushed); messageBatch = null; @@ -232,7 +235,7 @@ synchronized public void send(Iterator msgs) { } if (null != messageBatch && !messageBatch.isEmpty()) { - if (channel.isWritable()) { + if (channel != null && channel.isWritable()) { flushCheckTimer.set(Long.MAX_VALUE); // Flush as fast as we can to reduce the latency From 624402a5238cb04065f12f01d6d26ca6d6902882 Mon Sep 17 00:00:00 2001 From: Mansheng Yang Date: Tue, 10 Mar 2015 21:41:06 +0800 Subject: [PATCH 07/10] executor: do not process tuples when shutting down --- .../src/clj/backtype/storm/daemon/executor.clj | 13 ++++++++----- storm-core/src/clj/backtype/storm/daemon/worker.clj | 2 +- 2 files changed, 9 insertions(+), 6 deletions(-) diff --git a/storm-core/src/clj/backtype/storm/daemon/executor.clj b/storm-core/src/clj/backtype/storm/daemon/executor.clj index cd6b7e755c1..f89160488ce 100644 --- a/storm-core/src/clj/backtype/storm/daemon/executor.clj +++ b/storm-core/src/clj/backtype/storm/daemon/executor.clj @@ -246,6 +246,7 @@ ((:suicide-fn <>))) :deserializer (KryoTupleDeserializer. storm-conf worker-context) :sampler (mk-stats-sampler storm-conf) + :shutting-down (atom false) ;; TODO: add in the executor-specific stuff in a :specific... or make a spout-data, bolt-data function? ))) @@ -361,6 +362,7 @@ (shutdown [this] (log-message "Shutting down executor " component-id ":" (pr-str executor-id)) + (reset! (:shutting-down executor-data) true) (disruptor/halt-with-interrupt! (:receive-queue executor-data)) (disruptor/halt-with-interrupt! (:batch-transfer-queue executor-data)) (doseq [t threads] @@ -412,12 +414,13 @@ (fast-list-iter [[task-id msg] tuple-batch] (let [^TupleImpl tuple (if (instance? 
Tuple msg) msg (.deserialize deserializer msg))] (when debug? (log-message "Processing received message FOR " task-id " TUPLE: " tuple)) - (if task-id - (tuple-action-fn task-id tuple) - ;; null task ids are broadcast tuples - (fast-list-iter [task-id task-ids] + (if (not @(:shutting-down executor-data)) + (if task-id (tuple-action-fn task-id tuple) - )) + ;; null task ids are broadcast tuples + (fast-list-iter [task-id task-ids] + (tuple-action-fn task-id tuple) + ))) )))))) (defn executor-max-spout-pending [storm-conf num-tasks] diff --git a/storm-core/src/clj/backtype/storm/daemon/worker.clj b/storm-core/src/clj/backtype/storm/daemon/worker.clj index d43bf823359..bbe6e5079c4 100644 --- a/storm-core/src/clj/backtype/storm/daemon/worker.clj +++ b/storm-core/src/clj/backtype/storm/daemon/worker.clj @@ -228,7 +228,7 @@ :component->stream->fields (component->stream->fields (:system-topology <>)) :component->sorted-tasks (->> (:task->component <>) reverse-map (map-val sort)) :endpoint-socket-lock (mk-rw-lock) - :cached-node+port->socket (atom {}) + :cached-node+port->socket (atom (HashMap.)) :cached-task->node+port (atom {}) :transfer-queue transfer-queue :executor-receive-queue-map (atom {}) From 484e6c942131a6d7e2eb647aa57196f6df14ccf9 Mon Sep 17 00:00:00 2001 From: Mansheng Yang Date: Wed, 11 Mar 2015 10:09:50 +0800 Subject: [PATCH 08/10] add missing zk callbacks --- storm-core/src/clj/backtype/storm/cluster.clj | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/storm-core/src/clj/backtype/storm/cluster.clj b/storm-core/src/clj/backtype/storm/cluster.clj index 8ead7108879..2045c6d4972 100644 --- a/storm-core/src/clj/backtype/storm/cluster.clj +++ b/storm-core/src/clj/backtype/storm/cluster.clj @@ -271,7 +271,11 @@ (condp = subtree ASSIGNMENTS-ROOT (if (empty? args) (issue-callback! assignments-callback) - (issue-map-callback! assignment-info-callback (first args))) + (do + (issue-map-callback! assignment-info-callback (first args)) + (issue-map-callback! assignment-version-callback (first args)) + (issue-map-callback! assignment-info-with-version-callback (first args)) + )) SUPERVISORS-ROOT (issue-callback! supervisors-callback) STORMS-ROOT (issue-map-callback! storm-base-callback (first args)) CREDENTIALS-ROOT (issue-map-callback! credentials-callback (first args)) From e820f78a39101c65547fe93809c44cc75cc6d278 Mon Sep 17 00:00:00 2001 From: Mansheng Yang Date: Wed, 11 Mar 2015 11:48:10 +0800 Subject: [PATCH 09/10] timer fix --- storm-core/src/clj/backtype/storm/timer.clj | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/storm-core/src/clj/backtype/storm/timer.clj b/storm-core/src/clj/backtype/storm/timer.clj index 6f00a5c7508..e01cf8b3ab3 100644 --- a/storm-core/src/clj/backtype/storm/timer.clj +++ b/storm-core/src/clj/backtype/storm/timer.clj @@ -54,13 +54,14 @@ ;; are scheduled then we will always go ;; through this branch, sleeping only the ;; exact necessary amount of time. - (Time/sleep (- time-millis (current-time-millis))) + ; (Time/sleep (- time-millis (current-time-millis))) + (Time/sleep 300) ;; Otherwise poll to see if any new event ;; was scheduled. This is, in essence, the ;; response time for detecting any new event ;; schedulings when there are no scheduled ;; events. - (Time/sleep 1000)))) + (Time/sleep 300)))) (catch Throwable t ;; Because the interrupted exception can be ;; wrapped in a RuntimeException. 
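
The timer fix above trades the exact-duration sleep for a fixed 300 ms poll: a task scheduled after the thread has already gone to sleep is now noticed within roughly 300 ms, instead of only after the sleep until the previously observed head-of-queue time expires. A minimal sketch of the loop shape this produces (illustrative only; `peek-earliest` and `run-earliest!` are stand-ins for the real queue accessors in timer.clj):

    ;; Sketch of the polling loop after PATCH 09; not the actual timer.clj code.
    ;; `peek-earliest` returns [time-millis task] for the earliest scheduled
    ;; task, or nil when the queue is empty; `run-earliest!` pops and runs it.
    (defn timer-loop-sketch [peek-earliest run-earliest!]
      (loop []
        (if-let [[time-millis _task] (peek-earliest)]
          (if (>= (System/currentTimeMillis) time-millis)
            (run-earliest!)     ;; head task is due: pop and execute it now
            (Thread/sleep 300)) ;; due later: fixed poll instead of exact sleep
          (Thread/sleep 300))   ;; nothing scheduled: poll for new schedulings
        (recur)))

The cost is that the timer thread now wakes about three times per second even when idle; the benefit is a bounded detection latency for tasks scheduled while it sleeps.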
From 9e6266a934b79c8b2a778fe40532f7bf2e24b275 Mon Sep 17 00:00:00 2001
From: Mansheng Yang
Date: Thu, 12 Mar 2015 22:51:58 +0800
Subject: [PATCH 10/10] logging

---
 storm-core/src/clj/backtype/storm/daemon/worker.clj | 5 ++++-
 1 file changed, 4 insertions(+), 1 deletion(-)

diff --git a/storm-core/src/clj/backtype/storm/daemon/worker.clj b/storm-core/src/clj/backtype/storm/daemon/worker.clj
index bbe6e5079c4..6a288d53075 100644
--- a/storm-core/src/clj/backtype/storm/daemon/worker.clj
+++ b/storm-core/src/clj/backtype/storm/daemon/worker.clj
@@ -307,7 +307,9 @@
         sync-executors (mk-sync-executors worker executors credentials)]
     (fn this
       ([]
-        (this (fn [& ignored] (schedule (:refresh-connections-timer worker) 0 this))))
+        (this (fn [& ignored]
+                (log-message "refresh-connections callback fired")
+                (schedule (:refresh-connections-timer worker) 0 this))))
       ([callback]
         (let [version (.assignment-version storm-cluster-state storm-id callback)
               old-version (:version (get @assignment-versions storm-id))
@@ -384,6 +386,7 @@
         endpoint-socket-lock (:endpoint-socket-lock worker)]
     (disruptor/clojure-handler
       (fn [packets _ batch-end?]
+        (log-message "transferring outbound " packets)
        (.add drainer packets)
        (when batch-end?
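
Taken together, PATCH 08 and PATCH 10 make the assignment-watch path observable end to end: the ZooKeeper watcher now also fires the version callbacks, and the worker logs each time the callback re-schedules refresh-connections. The callback re-arms itself in a loop: the watch fires, the callback schedules refresh-connections on the timer, and the refresh re-registers the callback when it reads the assignment. A minimal sketch of that pattern (illustrative only; `schedule-fn`, `read-assignment-with-watch`, and `apply-assignment!` stand in for storm's timer `schedule`, the zk assignment read that registers a one-shot watcher, and the connection sync):

    ;; Self-re-arming watch loop behind refresh-connections; a sketch, not
    ;; the worker.clj code. Calling the returned `refresh` once starts it.
    (defn mk-refresh-loop [schedule-fn read-assignment-with-watch apply-assignment!]
      (letfn [(callback [& _]
                ;; the watch fired: log it, then run the refresh on the timer
                (println "refresh-connections callback fired")
                (schedule-fn 0 refresh))
              (refresh []
                ;; the read re-registers `callback`, re-arming the watch
                (apply-assignment! (read-assignment-with-watch callback)))]
        refresh))

Without PATCH 08, an assignment change fired only the plain info callback, so a worker waiting on the version or info-with-version callbacks (as the reworked refresh-connections does) would never be woken by the change.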