Skip to content

Commit

Permalink
Merge branch 'STORM-2979-1.1.x-merge' into 1.1.x-branch
Browse files Browse the repository at this point in the history
  • Loading branch information
HeartSaVioR committed Apr 11, 2018
2 parents ded3d08 + f50c8b7 commit 2f15048
Showing 1 changed file with 20 additions and 9 deletions.
29 changes: 20 additions & 9 deletions storm-core/src/clj/org/apache/storm/daemon/worker.clj
Original file line number Diff line number Diff line change
Expand Up @@ -285,6 +285,7 @@
:executors executors
:task-ids (->> receive-queue-map keys (map int) sort)
:storm-conf storm-conf
:deserialized-worker-hooks (java.util.ArrayList.)
:topology topology
:system-topology (system-topology! storm-conf topology)
:heartbeat-timer (mk-halting-timer "heartbeat-timer")
Expand Down Expand Up @@ -560,23 +561,29 @@
(reset! latest-log-config new-log-configs)
(log-debug "New merged log config is " @latest-log-config))))

(defn run-worker-start-hooks [worker]
(defn deserialize-worker-hooks [worker]
(let [topology (:topology worker)
topo-conf (:storm-conf worker)
worker-topology-context (worker-context worker)
hooks (.get_worker_hooks topology)]
hooks (.get_worker_hooks topology)
deserialized-worker-hooks (:deserialized-worker-hooks worker)]
(dofor [hook hooks]
(let [hook-bytes (Utils/toByteArray hook)
deser-hook (Utils/javaDeserialize hook-bytes BaseWorkerHook)]
(.start deser-hook topo-conf worker-topology-context)))))
(.add deserialized-worker-hooks deser-hook)))))

(defn run-worker-shutdown-hooks [worker]
(defn run-worker-start-hooks [worker]
(let [topology (:topology worker)
hooks (.get_worker_hooks topology)]
(dofor [hook hooks]
(let [hook-bytes (Utils/toByteArray hook)
deser-hook (Utils/javaDeserialize hook-bytes BaseWorkerHook)]
(.shutdown deser-hook)))))
topo-conf (:storm-conf worker)
worker-topology-context (worker-context worker)
deserialized-worker-hooks (:deserialized-worker-hooks worker)]
(dofor [hook deserialized-worker-hooks]
(.start hook topo-conf worker-topology-context))))

(defn run-worker-shutdown-hooks [worker]
(let [deserialized-worker-hooks (:deserialized-worker-hooks worker)]
(dofor [hook deserialized-worker-hooks]
(.shutdown hook))))

;; TODO: should worker even take the storm-id as input? this should be
;; deducable from cluster state (by searching through assignments)
Expand All @@ -585,6 +592,8 @@
(defserverfn mk-worker [conf shared-mq-context storm-id assignment-id port worker-id]
(log-message "Launching worker for " storm-id " on " assignment-id ":" port " with id " worker-id
" and conf " conf)
;; create an empty list to store deserialized hooks
(def deserialized-hooks (java.util.ArrayList.))
(if-not (local-mode? conf)
(redirect-stdio-to-slf4j!))
;; because in local mode, its not a separate
Expand Down Expand Up @@ -633,6 +642,8 @@
_ (activate-worker-when-all-connections-ready worker)

_ (refresh-storm-active worker nil)

_ (deserialize-worker-hooks worker)

_ (run-worker-start-hooks worker)

Expand Down

0 comments on commit 2f15048

Please sign in to comment.