From f50c8b7d195eafd73816167f4634ca9b7151c11e Mon Sep 17 00:00:00 2001 From: michelo Date: Tue, 3 Apr 2018 23:35:38 +0200 Subject: [PATCH] STORM-2979 WorkerHooks EOFException during run_worker_shutdown_hooks * commits squashed by Jungtaek Lim --- .../clj/org/apache/storm/daemon/worker.clj | 29 +++++++++++++------ 1 file changed, 20 insertions(+), 9 deletions(-) diff --git a/storm-core/src/clj/org/apache/storm/daemon/worker.clj b/storm-core/src/clj/org/apache/storm/daemon/worker.clj index 6626272a540..03826cd93df 100644 --- a/storm-core/src/clj/org/apache/storm/daemon/worker.clj +++ b/storm-core/src/clj/org/apache/storm/daemon/worker.clj @@ -285,6 +285,7 @@ :executors executors :task-ids (->> receive-queue-map keys (map int) sort) :storm-conf storm-conf + :deserialized-worker-hooks (java.util.ArrayList.) :topology topology :system-topology (system-topology! storm-conf topology) :heartbeat-timer (mk-halting-timer "heartbeat-timer") @@ -560,23 +561,29 @@ (reset! latest-log-config new-log-configs) (log-debug "New merged log config is " @latest-log-config)))) -(defn run-worker-start-hooks [worker] +(defn deserialize-worker-hooks [worker] (let [topology (:topology worker) topo-conf (:storm-conf worker) worker-topology-context (worker-context worker) - hooks (.get_worker_hooks topology)] + hooks (.get_worker_hooks topology) + deserialized-worker-hooks (:deserialized-worker-hooks worker)] (dofor [hook hooks] (let [hook-bytes (Utils/toByteArray hook) deser-hook (Utils/javaDeserialize hook-bytes BaseWorkerHook)] - (.start deser-hook topo-conf worker-topology-context))))) + (.add deserialized-worker-hooks deser-hook))))) -(defn run-worker-shutdown-hooks [worker] +(defn run-worker-start-hooks [worker] (let [topology (:topology worker) - hooks (.get_worker_hooks topology)] - (dofor [hook hooks] - (let [hook-bytes (Utils/toByteArray hook) - deser-hook (Utils/javaDeserialize hook-bytes BaseWorkerHook)] - (.shutdown deser-hook))))) + topo-conf (:storm-conf worker) + worker-topology-context (worker-context worker) + deserialized-worker-hooks (:deserialized-worker-hooks worker)] + (dofor [hook deserialized-worker-hooks] + (.start hook topo-conf worker-topology-context)))) + +(defn run-worker-shutdown-hooks [worker] + (let [deserialized-worker-hooks (:deserialized-worker-hooks worker)] + (dofor [hook deserialized-worker-hooks] + (.shutdown hook)))) ;; TODO: should worker even take the storm-id as input? this should be ;; deducable from cluster state (by searching through assignments) @@ -585,6 +592,8 @@ (defserverfn mk-worker [conf shared-mq-context storm-id assignment-id port worker-id] (log-message "Launching worker for " storm-id " on " assignment-id ":" port " with id " worker-id " and conf " conf) + ;; create an empty list to store deserialized hooks + (def deserialized-hooks (java.util.ArrayList.)) (if-not (local-mode? conf) (redirect-stdio-to-slf4j!)) ;; because in local mode, its not a separate @@ -633,6 +642,8 @@ _ (activate-worker-when-all-connections-ready worker) _ (refresh-storm-active worker nil) + + _ (deserialize-worker-hooks worker) _ (run-worker-start-hooks worker)