Skip to content
This repository has been archived by the owner on Jun 15, 2024. It is now read-only.

Commit

Permalink
make pipeline state updater stoppable (#103)
Browse files Browse the repository at this point in the history
  • Loading branch information
flosell committed Apr 3, 2016
1 parent 3c60abb commit 0cc19df
Show file tree
Hide file tree
Showing 3 changed files with 41 additions and 21 deletions.
14 changes: 8 additions & 6 deletions src/clj/lambdacd/core.clj
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,14 @@
([pipeline-def config]
(assemble-pipeline pipeline-def config (default-pipeline-state/new-default-pipeline-state config)))
([pipeline-def config pipeline-state-component]
(let [context (-> {:config config}
(event-bus/initialize-event-bus)
(assoc :pipeline-state-component pipeline-state-component))]
(pipeline-state/start-pipeline-state-updater (:pipeline-state-component context) context)
{:context context
:pipeline-def pipeline-def})))
(let [context (-> {:config config}
(event-bus/initialize-event-bus)
(assoc :pipeline-state-component pipeline-state-component))
pipeline-state-updater (pipeline-state/start-pipeline-state-updater (:pipeline-state-component context) context)]

{:context context
:pipeline-def pipeline-def
:pipeline-state-updater pipeline-state-updater})))

(defn retrigger [pipeline context build-number step-id-to-retrigger]
(execution/retrigger-async pipeline context build-number step-id-to-retrigger))
Expand Down
31 changes: 21 additions & 10 deletions src/clj/lambdacd/internal/pipeline_state.clj
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
(ns lambdacd.internal.pipeline-state
(:require [clojure.core.async :as async]
[lambdacd.event-bus :as event-bus]
[lambdacd.util :as util])
(:refer-clojure :exclude [alias]))
[lambdacd.util :as util]
[clojure.tools.logging :as log])
(:refer-clojure :exclude [alias update]))


; TODO: move this protocol out of internal once the interface is more polished and stable
Expand All @@ -15,12 +16,22 @@
(next-build-number [self]))

(defn start-pipeline-state-updater [pipeline-state ctx]
(let [subscription (event-bus/subscribe ctx :step-result-updated)
step-updates-channel (util/buffered (event-bus/only-payload subscription))]
(let [step-updates-channel (util/buffered
(event-bus/only-payload
(event-bus/subscribe ctx :step-result-updated)))
stop-updater-channel (event-bus/only-payload
(event-bus/subscribe ctx :stop-pipeline-state-updater))]
(async/go-loop []
(if-let [step-result-update (async/<! step-updates-channel)]
(let [step-result (:step-result step-result-update)
build-number (:build-number step-result-update)
step-id (:step-id step-result-update)]
(update pipeline-state build-number step-id step-result)
(recur))))))
(if-let [[step-result-update ch] (async/alts! [step-updates-channel stop-updater-channel])]
(when (not= stop-updater-channel ch)
(let [step-result (:step-result step-result-update)
build-number (:build-number step-result-update)
step-id (:step-id step-result-update)]
(update pipeline-state build-number step-id step-result)
(recur)))))))

(defn stop-pipeline-state-updater [ctx start-pipeline-state-updater-result]
(log/info "Shutting down pipeline state updater...")
(event-bus/publish ctx :stop-pipeline-state-updater {})
(async/<!! start-pipeline-state-updater-result)
(log/info "Pipeline state updater stopped"))
17 changes: 12 additions & 5 deletions test/clj/lambdacd/internal/pipeline_state_test.clj
Original file line number Diff line number Diff line change
@@ -1,16 +1,15 @@
(ns lambdacd.internal.pipeline-state-test
(:use [lambdacd.testsupport.test-util])
(:refer-clojure :exclude [alias update])
(:require [clojure.test :refer :all]
[lambdacd.internal.pipeline-state :refer :all]
[lambdacd.testsupport.test-util :as tu]
[lambdacd.testsupport.data :refer [some-ctx-with some-ctx]]
[lambdacd.util :as util]
[lambdacd.event-bus :as event-bus]
[clojure.core.async :as async]))
[lambdacd.event-bus :as event-bus]))

(deftest pipeline-state-updater-test
(testing "that we tap into the event bus update the pipeline state with its information"
(let [updates (atom [])
(let [updates (atom [])
ctx (some-ctx)
pipeline-state (reify PipelineStateComponent
(update [self build-number step-id step-result]
Expand All @@ -24,4 +23,12 @@
(wait-for (= 3 (count @updates)))
(is (= [[1 [1 2] {:status :running}]
[2 [1 2] {:status :success}]
[1 [1 2] {:status :running :foo :bar}]] @updates)))))
[1 [1 2] {:status :running :foo :bar}]] @updates))))
(testing "that the pipeline-state-updater can be stopped with a message on the event bus"
(let [ctx (some-ctx)
pipeline-state (reify PipelineStateComponent
(update [_ _ _ _]
(throw (Exception. "no update expected"))))
updater-finished-ch (start-pipeline-state-updater pipeline-state ctx)]
(event-bus/publish ctx :stop-pipeline-state-updater {})
(is (not= {:status :timeout} (tu/get-or-timeout updater-finished-ch :timeout 1000))))))

0 comments on commit 0cc19df

Please sign in to comment.