Skip to content
This repository has been archived by the owner on Jun 15, 2024. It is now read-only.

Commit

Permalink
add pipeline shutdown hook (#103)
Browse files Browse the repository at this point in the history
  • Loading branch information
flosell committed Apr 3, 2016
1 parent b9bbbef commit 4370af8
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 13 deletions.
30 changes: 23 additions & 7 deletions src/clj/lambdacd/core.clj
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,28 @@
[lambdacd.internal.execution :as execution]
[lambdacd.event-bus :as event-bus]
[lambdacd.internal.running-builds-tracking :as running-builds-tracking]
[lambdacd.internal.pipeline-state :as pipeline-state]))
[lambdacd.internal.pipeline-state :as pipeline-state]
[clojure.tools.logging :as log]
[lambdacd.runners :as runners]))

(defn- add-shutdown-sequence! [ctx]
(doto (Runtime/getRuntime)
(.addShutdownHook (fn []
((:shutdown-sequence (:config ctx)) ctx)))))

(def default-shutdown-sequence
(fn [ctx]
(runners/stop-runner ctx)
(execution/kill-all-pipelines ctx)
(pipeline-state/stop-pipeline-state-updater ctx)))

(def default-config
{:ms-to-wait-for-shutdown (* 10 1000)})
{:ms-to-wait-for-shutdown (* 10 1000)
:shutdown-sequence default-shutdown-sequence})

(defn- initialize-pipeline-state-updater [ctx]
(let [updater (pipeline-state/start-pipeline-state-updater (:pipeline-state-component ctx) ctx)]
(assoc ctx :pipeline-state-updater updater)))

(defn assemble-pipeline
([pipeline-def config]
Expand All @@ -16,12 +34,10 @@
(let [context (-> {:config (merge default-config config)}
(event-bus/initialize-event-bus)
(running-builds-tracking/initialize-running-builds-tracking)
(assoc :pipeline-state-component pipeline-state-component))
pipeline-state-updater (pipeline-state/start-pipeline-state-updater (:pipeline-state-component context) context)]

(assoc :pipeline-state-component pipeline-state-component)
(initialize-pipeline-state-updater))]
{:context context
:pipeline-def pipeline-def
:pipeline-state-updater pipeline-state-updater})))
:pipeline-def pipeline-def})))

(defn retrigger [pipeline context build-number step-id-to-retrigger]
(execution/retrigger-async pipeline context build-number step-id-to-retrigger))
Expand Down
4 changes: 2 additions & 2 deletions src/clj/lambdacd/internal/pipeline_state.clj
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@
(update pipeline-state build-number step-id step-result)
(recur)))))))

(defn stop-pipeline-state-updater [ctx start-pipeline-state-updater-result]
(defn stop-pipeline-state-updater [ctx]
(log/info "Shutting down pipeline state updater...")
(event-bus/publish ctx :stop-pipeline-state-updater {})
(async/<!! start-pipeline-state-updater-result)
(async/<!! (:pipeline-state-updater ctx))
(log/info "Pipeline state updater stopped"))
8 changes: 4 additions & 4 deletions test/clj/lambdacd/internal/pipeline_state_test.clj
Original file line number Diff line number Diff line change
Expand Up @@ -31,15 +31,15 @@
(update [_ _ _ _]
(throw (Exception. "no update expected"))))
updater-finished-ch (start-pipeline-state-updater pipeline-state ctx)]
(tu/call-with-timeout 1000 (stop-pipeline-state-updater ctx updater-finished-ch))
(tu/call-with-timeout 1000 (stop-pipeline-state-updater (assoc ctx :pipeline-state-updater updater-finished-ch)))
(is (not= {:status :timeout} (tu/get-or-timeout updater-finished-ch :timeout 1000)))))
(testing "that stopping is idempotent"
(let [ctx (some-ctx)
pipeline-state (reify PipelineStateComponent
(update [_ _ _ _]
(throw (Exception. "no update expected"))))
updater-finished-ch (start-pipeline-state-updater pipeline-state ctx)]
(tu/call-with-timeout 1000 (stop-pipeline-state-updater ctx updater-finished-ch))
(tu/call-with-timeout 1000 (stop-pipeline-state-updater ctx updater-finished-ch))
(tu/call-with-timeout 1000 (stop-pipeline-state-updater (assoc ctx :pipeline-state-updater updater-finished-ch)))
(tu/call-with-timeout 1000 (stop-pipeline-state-updater (assoc ctx :pipeline-state-updater updater-finished-ch)))
(is (not= {:status :timeout} (tu/get-or-timeout updater-finished-ch :timeout 1000)))
(tu/call-with-timeout 1000 (stop-pipeline-state-updater ctx updater-finished-ch))))))
(tu/call-with-timeout 1000 (stop-pipeline-state-updater (assoc ctx :pipeline-state-updater updater-finished-ch)))))))

0 comments on commit 4370af8

Please sign in to comment.