From 91f1522820f3ac1c53594e943dfa8f3a2cde1a8f Mon Sep 17 00:00:00 2001 From: Jungtaek Lim Date: Thu, 1 Feb 2018 10:08:48 +0900 Subject: [PATCH] STORM-2853 Initialize tick tuple after initializing spouts/bolts * this prevents newly-initializing executor in deactivated topology to show high CPU usage * this is based on the fact that all the tasks in executor are for same component --- .../src/clj/org/apache/storm/daemon/executor.clj | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/storm-core/src/clj/org/apache/storm/daemon/executor.clj b/storm-core/src/clj/org/apache/storm/daemon/executor.clj index b9bcaae1d01..3940f1bdc13 100644 --- a/storm-core/src/clj/org/apache/storm/daemon/executor.clj +++ b/storm-core/src/clj/org/apache/storm/daemon/executor.clj @@ -350,7 +350,7 @@ (when (seq data-points) (task/send-unanchored task-data Constants/METRICS_STREAM_ID [task-info data-points])))) -(defn setup-ticks! [worker executor-data] +(defn setup-ticks! [executor-data] (let [storm-conf (:storm-conf executor-data) comp-id (:component-id executor-data) tick-time-secs (storm-conf TOPOLOGY-TICK-TUPLE-FREQ-SECS) @@ -362,7 +362,7 @@ (= :spout (:type executor-data)))) (log-message "Timeouts disabled for executor " comp-id ":" (:executor-id executor-data)) (schedule-recurring - (:user-timer worker) + (:user-timer (:worker executor-data)) tick-time-secs tick-time-secs (fn [] @@ -390,14 +390,13 @@ (.setLowWaterMark ((:storm-conf executor-data) BACKPRESSURE-DISRUPTOR-LOW-WATERMARK)) (.setEnableBackpressure ((:storm-conf executor-data) TOPOLOGY-BACKPRESSURE-ENABLE))) - ;; starting the batch-transfer->worker ensures that anything publishing to that queue + ;; starting the batch-transfer->worker ensures that anything publishing to that queue ;; doesn't block (because it's a single threaded queue and the caching/consumer started ;; trick isn't thread-safe) system-threads [(start-batch-transfer->worker-handler! worker executor-data)] handlers (with-error-reaction report-error-and-die (mk-threads executor-data task-datas initial-credentials)) - threads (concat handlers system-threads)] - (setup-ticks! worker executor-data) + threads (concat handlers system-threads)] (log-message "Finished loading executor " component-id ":" (pr-str executor-id)) ;; TODO: add method here to get rendered stats... have worker call that when heartbeating @@ -627,6 +626,7 @@ ))))) (reset! open-or-prepare-was-called? true) (log-message "Opened spout " component-id ":" (keys task-datas)) + (setup-ticks! executor-data) (setup-metrics! executor-data) (fn [] @@ -852,6 +852,7 @@ ))))) (reset! open-or-prepare-was-called? true) (log-message "Prepared bolt " component-id ":" (keys task-datas)) + (setup-ticks! executor-data) (setup-metrics! executor-data) (let [receive-queue (:receive-queue executor-data)