Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

Interrupt all worker threads when a task fails

  • Loading branch information...
commit 3a1b632cdb7943e57a51e6923c07814971d1e510 1 parent 729e075
@fbrubacher authored
View
8 src/clj/backtype/storm/daemon/task.clj
@@ -169,7 +169,7 @@
ACKER-ACK-STREAM-ID))
)))
-(defn mk-task [conf storm-conf topology-context storm-id mq-context cluster-state storm-active-atom transfer-fn]
+(defn mk-task [conf storm-conf topology-context storm-id mq-context cluster-state storm-active-atom transfer-fn all-threads]
(let [task-id (.getThisTaskId topology-context)
component-id (.getThisComponentId topology-context)
task-info (.getTaskToComponent topology-context)
@@ -186,9 +186,9 @@
report-error-and-die (fn [error]
(report-error error)
- (when (not= "local" (storm-conf STORM-CLUSTER-MODE))
- (halt-process! 1 "Task died")))
-
+ (if (not= "local" (storm-conf STORM-CLUSTER-MODE))
+ (halt-process! 1 "Task died")
+ (doseq [t all-threads] (.interrupt t) (.join t))))
;; heartbeat ASAP so nimbus doesn't reassign
heartbeat-thread (async-loop
(fn []
View
2  src/clj/backtype/storm/daemon/worker.clj
@@ -169,7 +169,6 @@
(when @active (heartbeat-fn) (conf WORKER-HEARTBEAT-FREQUENCY-SECS))
)
:priority Thread/MAX_PRIORITY)
- tasks (dofor [tid task-ids] (task/mk-task conf storm-conf (mk-topology-context tid) storm-id mq-context cluster-state storm-active-atom transfer-fn))
threads [(async-loop
(fn []
(.add event-manager refresh-connections)
@@ -194,6 +193,7 @@
0 )
:args-fn (fn [] [(ArrayList.) (TupleSerializer. storm-conf)]))
heartbeat-thread]
+ tasks (dofor [tid task-ids] (task/mk-task conf storm-conf (mk-topology-context tid) storm-id mq-context cluster-state storm-active-atom transfer-fn threads))
virtual-port-shutdown (when (local-mode-zmq? conf)
(log-message "Launching virtual port for " supervisor-id ":" port)
(msg-loader/launch-virtual-port!
Please sign in to comment.
Something went wrong with that request. Please try again.