Skip to content

Commit

Permalink
replacing Runtime.halt() with Runtime.exit() with a special shutdown …
Browse files Browse the repository at this point in the history
…hook that allows 1 second for cleanup shutdown hooks and then sends kill -9 to process. Added shutdown hooks for supervisor and worker.
  • Loading branch information
Parth-Brahmbhatt committed Jun 16, 2014
1 parent ecac64f commit 2c2570f
Show file tree
Hide file tree
Showing 9 changed files with 63 additions and 29 deletions.
2 changes: 1 addition & 1 deletion storm-core/src/clj/backtype/storm/cluster.clj
Expand Up @@ -239,7 +239,7 @@
SUPERVISORS-ROOT (issue-callback! supervisors-callback)
STORMS-ROOT (issue-map-callback! storm-base-callback (first args))
;; this should never happen
(halt-process! 30 "Unknown callback for subtree " subtree args)))))]
(exit-process! 30 "Unknown callback for subtree " subtree args)))))]
(doseq [p [ASSIGNMENTS-SUBTREE STORMS-SUBTREE SUPERVISORS-SUBTREE WORKERBEATS-SUBTREE ERRORS-SUBTREE]]
(mkdirs cluster-state p))
(reify
Expand Down
2 changes: 1 addition & 1 deletion storm-core/src/clj/backtype/storm/daemon/common.clj
Expand Up @@ -104,7 +104,7 @@
(throw e#))
(catch Throwable t#
(log-error t# "Error on initialization of server " ~(str name))
(halt-process! 13 "Error on initialization")
(exit-process! 13 "Error on initialization")
)))))

(defn- validate-ids! [^StormTopology topology]
Expand Down
7 changes: 3 additions & 4 deletions storm-core/src/clj/backtype/storm/daemon/drpc.clj
Expand Up @@ -158,10 +158,9 @@
(.protocolFactory (TBinaryProtocol$Factory.))
(.processor
(DistributedRPCInvocations$Processor. service-handler))))]

(.addShutdownHook
(Runtime/getRuntime)
(Thread. (fn [] (.stop handler-server) (.stop invoke-server))))
(add-shutdown-hook-with-force-kill-in-1-sec (fn []
(.stop handler-server)
(.stop invoke-server)))
(log-message "Starting Distributed RPC servers...")
(future (.serve invoke-server))
(.serve handler-server))))
Expand Down
6 changes: 4 additions & 2 deletions storm-core/src/clj/backtype/storm/daemon/nimbus.clj
Expand Up @@ -73,7 +73,7 @@
:validator (new-instance (conf NIMBUS-TOPOLOGY-VALIDATOR))
:timer (mk-timer :kill-fn (fn [t]
(log-error t "Error when processing event")
(halt-process! 20 "Error when processing an event")
(exit-process! 20 "Error when processing an event")
))
:scheduler (mk-scheduler conf inimbus)
}))
Expand Down Expand Up @@ -1153,7 +1153,9 @@
(.processor (Nimbus$Processor. service-handler))
)
server (THsHaServer. (do (set! (. options maxReadBufferBytes)(conf NIMBUS-THRIFT-MAX-BUFFER-SIZE)) options))]
(.addShutdownHook (Runtime/getRuntime) (Thread. (fn [] (.shutdown service-handler) (.stop server))))
(add-shutdown-hook-with-force-kill-in-1-sec (fn []
(.shutdown service-handler)
(.stop server)))
(log-message "Starting Nimbus server...")
(.serve server)))
Expand Down
5 changes: 3 additions & 2 deletions storm-core/src/clj/backtype/storm/daemon/supervisor.clj
Expand Up @@ -197,7 +197,7 @@
:curr-assignment (atom nil) ;; used for reporting used ports when heartbeating
:timer (mk-timer :kill-fn (fn [t]
(log-error t "Error when processing event")
(halt-process! 20 "Error when processing an event")
(exit-process! 20 "Error when processing an event")
))
})

Expand Down Expand Up @@ -543,7 +543,8 @@
(defn -launch [supervisor]
(let [conf (read-storm-config)]
(validate-distributed-mode! conf)
(mk-supervisor conf nil supervisor)))
(let [supervisor (mk-supervisor conf nil supervisor)]
(add-shutdown-hook-with-force-kill-in-1-sec #(.shutdown supervisor)))))

(defn standalone-supervisor []
(let [conf-atom (atom nil)
Expand Down
11 changes: 6 additions & 5 deletions storm-core/src/clj/backtype/storm/daemon/worker.clj
Expand Up @@ -170,7 +170,7 @@
(defn mk-halting-timer [timer-name]
(mk-timer :kill-fn (fn [t]
(log-error t "Error when processing event")
(halt-process! 20 "Error when processing an event")
(exit-process! 20 "Error when processing an event")
)
:timer-name timer-name))

Expand Down Expand Up @@ -331,7 +331,7 @@
(:port worker)
(:transfer-local-fn worker)
(-> worker :storm-conf (get TOPOLOGY-RECEIVER-BUFFER-SIZE))
:kill-fn (fn [t] (halt-process! 11))))
:kill-fn (fn [t] (exit-process! 11))))

(defn- close-resources [worker]
(let [dr (:default-shared-resources worker)]
Expand Down Expand Up @@ -442,13 +442,14 @@

(defmethod mk-suicide-fn
:local [conf]
(fn [] (halt-process! 1 "Worker died")))
(fn [] (exit-process! 1 "Worker died")))

(defmethod mk-suicide-fn
:distributed [conf]
(fn [] (halt-process! 1 "Worker died")))
(fn [] (exit-process! 1 "Worker died")))

(defn -main [storm-id assignment-id port-str worker-id]
(let [conf (read-storm-config)]
(validate-distributed-mode! conf)
(mk-worker conf nil storm-id assignment-id (Integer/parseInt port-str) worker-id)))
(let [worker (mk-worker conf nil storm-id assignment-id (Integer/parseInt port-str) worker-id)]
(add-shutdown-hook-with-force-kill-in-1-sec #(.shutdown worker)))))
2 changes: 1 addition & 1 deletion storm-core/src/clj/backtype/storm/disruptor.clj
Expand Up @@ -89,7 +89,7 @@

(defnk consume-loop*
[^DisruptorQueue queue handler
:kill-fn (fn [error] (halt-process! 1 "Async loop died!"))]
:kill-fn (fn [error] (exit-process! 1 "Async loop died!"))]
(let [ret (async-loop
(fn [] (consume-batch-when-available queue handler) 0)
:kill-fn kill-fn
Expand Down
2 changes: 1 addition & 1 deletion storm-core/src/clj/backtype/storm/event.clj
Expand Up @@ -42,7 +42,7 @@
(log-message "Event manager interrupted"))
(catch Throwable t
(log-error t "Error when processing event")
(halt-process! 20 "Error when processing an event")))))]
(exit-process! 20 "Error when processing an event")))))]
(.setDaemon runner daemon?)
(.start runner)
(reify
Expand Down
55 changes: 43 additions & 12 deletions storm-core/src/clj/backtype/storm/util.clj
Expand Up @@ -312,10 +312,10 @@
[& vals]
(byte-array (map byte vals)))

(defn halt-process!
(defn exit-process!
[val & msg]
(log-message "Halting process: " msg)
(.halt (Runtime/getRuntime) val))
(.exit (Runtime/getRuntime) val))

(defn sum
[vals]
Expand Down Expand Up @@ -388,27 +388,58 @@
(catch IOException e
(log-message "Could not extract " dir " from " jarpath))))

(defn ensure-process-killed! [pid]
(defn sleep-secs
  "Sleeps for `secs` seconds via Time/sleep (which takes milliseconds).
  Does nothing and returns nil when `secs` is zero or negative."
  [secs]
  (if (pos? secs)
    (Time/sleep (* 1000 (long secs)))
    nil))

(defn sleep-until-secs
  "Converts `target-secs` (seconds) to milliseconds and hands it to
  Time/sleepUntil."
  [target-secs]
  (let [target-millis (* 1000 (long target-secs))]
    (Time/sleepUntil target-millis)))

;; Signal number passed as `kill -9` by send-signal-to-process; on Windows it
;; selects the forced `taskkill /f` variant instead.
(def ^:const sig-kill 9)

;; Signal number passed as `kill -15` by send-signal-to-process; on Windows any
;; signal other than sig-kill maps to a plain `taskkill /pid`.
(def ^:const sig-term 15)

(defn send-signal-to-process
  "Runs the platform's kill command against `pid`.
  On Windows this is `taskkill /f /pid` when `signum` equals sig-kill and
  `taskkill /pid` otherwise; elsewhere it is `kill -<signum>`.
  An ExecuteException from the command is logged and swallowed."
  [pid signum]
  (try-cause
    (let [command-prefix (cond
                           (not on-windows?) (str "kill -" signum " ")
                           (== signum sig-kill) "taskkill /f /pid "
                           :else "taskkill /pid ")]
      (exec-command! (str command-prefix pid)))
    (catch ExecuteException e
      (log-message "Error when trying to kill " pid ". Process is probably already dead."))))

;; Sends sig-kill (9) to `pid` through send-signal-to-process.
(defn force-kill-process
[pid]
(send-signal-to-process pid sig-kill))

;; Sends sig-term (15) to `pid` through send-signal-to-process.
(defn kill-process-with-sig-term
[pid]
(send-signal-to-process pid sig-term))

(defn ensure-process-killed!
[pid]
;; TODO: should probably do a ps ax of some sort to make sure it was killed
(try-cause
(exec-command! (str (if on-windows? "taskkill /f /pid " "kill -9 ") pid))
(kill-process-with-sig-term pid)
(catch ExecuteException e
(log-message "Error when trying to kill " pid ". Process is probably already dead."))))

(defn add-shutdown-hook-with-force-kill-in-1-sec
  "Registers the user supplied function as a JVM shutdown hook for cleanup.
  Also registers a second, independent shutdown hook that sleeps for one second
  and then sends kill -9 to this process, so the process cannot linger as a
  zombie if the cleanup function hangs.
  `func` must be a zero-argument function. Returns nil."
  [func]
  (let [runtime (Runtime/getRuntime)]
    ;; Use (fn [] ...) instead of #(...): the reader expands #((a) (b) (c)) to
    ;; (fn [] ((a) (b) (c))), which would invoke the return value of (a) as a
    ;; function instead of running the three forms in sequence.
    (.addShutdownHook runtime (Thread. (fn [] (func))))
    ;; Separate hook thread: the forced kill must still fire after one second
    ;; even when the cleanup hook above blocks forever.
    (.addShutdownHook runtime (Thread. (fn []
                                         (sleep-secs 1)
                                         (force-kill-process (process-pid)))))))
;; Starts an external process for `command` and returns the started Process.
;; Every key/value pair in the :environment keyword argument (default {}) is
;; put into the ProcessBuilder's environment before the process starts.
(defnk launch-process
  [command :environment {}]
  (let [pb (ProcessBuilder. command)
        env-map (.environment pb)]
    (doseq [[env-key env-val] environment]
      (.put env-map env-key env-val))
    (.start pb)))

(defn sleep-secs [secs]
(when (pos? secs)
(Time/sleep (* (long secs) 1000))))

(defn sleep-until-secs [target-secs]
(Time/sleepUntil (* (long target-secs) 1000)))

(defprotocol SmartThread
(start [this])
(join [this])
Expand All @@ -418,7 +449,7 @@
;; afn returns amount of time to sleep
(defnk async-loop [afn
:daemon false
:kill-fn (fn [error] (halt-process! 1 "Async loop died!"))
:kill-fn (fn [error] (exit-process! 1 "Async loop died!"))
:priority Thread/NORM_PRIORITY
:factory? false
:start true
Expand Down

0 comments on commit 2c2570f

Please sign in to comment.