Skip to content
This repository has been archived by the owner on Jun 15, 2024. It is now read-only.

Commit

Permalink
wait for pipelines to complete when killing all pipelines (#103)
Browse files Browse the repository at this point in the history
  • Loading branch information
flosell committed Apr 3, 2016
1 parent 8bb4a15 commit c470c53
Show file tree
Hide file tree
Showing 7 changed files with 94 additions and 13 deletions.
5 changes: 3 additions & 2 deletions src/clj/lambdacd/core.clj
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,17 @@
(:require [lambdacd.internal.default-pipeline-state :as default-pipeline-state]
[lambdacd.internal.execution :as execution]
[lambdacd.event-bus :as event-bus]
[clojure.core.async :as async]
[lambdacd.internal.running-builds-tracking :as running-builds-tracking]
[lambdacd.internal.pipeline-state :as pipeline-state]))


(defn assemble-pipeline
([pipeline-def config]
(assemble-pipeline pipeline-def config (default-pipeline-state/new-default-pipeline-state config)))
([pipeline-def config pipeline-state-component]
(let [context (-> {:config config}
(let [context (-> {:config config}
(event-bus/initialize-event-bus)
(running-builds-tracking/initialize-running-builds-tracking)
(assoc :pipeline-state-component pipeline-state-component))
pipeline-state-updater (pipeline-state/start-pipeline-state-updater (:pipeline-state-component context) context)]

Expand Down
14 changes: 12 additions & 2 deletions src/clj/lambdacd/internal/execution.clj
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,10 @@
(and (:retriggered-build-number ctx)
(:retriggered-step-id ctx)))}))

(defn- report-step-started [ctx]
(send-step-result ctx {:status :running})
(event-bus/publish ctx :step-started {:step-id (:step-id ctx)
:build-number (:build-number ctx)}))

(defn execute-step [args [ctx step]]
(let [step-id (:step-id ctx)
Expand All @@ -114,7 +118,7 @@
:is-killed child-kill-switch)
processed-async-result-ch (process-channel-result-async result-ch ctx)
kill-subscription (kill-step-handling ctx-for-child)
_ (send-step-result ctx {:status :running})
_ (report-step-started ctx)
immediate-step-result (execute-or-catch step args ctx-for-child)
processed-async-result (async/<!! processed-async-result-ch)
complete-step-result (merge processed-async-result immediate-step-result)]
Expand Down Expand Up @@ -297,7 +301,13 @@
(event-bus/publish ctx :kill-step {:step-id step-id
:build-number build-number}))

(defn- wait-for-pipelines-to-complete [ctx]
(while (not-empty @(:started-steps ctx))
(log/debug "Waiting for steps to complete:" @(:started-steps ctx))
(Thread/sleep 100)))

(defn kill-all-pipelines [ctx]
(log/info "Killing all running pipelines...")
(event-bus/publish ctx :kill-step {:step-id :any-root
:build-number :any}))
:build-number :any})
(wait-for-pipelines-to-complete ctx))
26 changes: 26 additions & 0 deletions src/clj/lambdacd/internal/running_builds_tracking.clj
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
(ns lambdacd.internal.running-builds-tracking
(:require [lambdacd.event-bus :as event-bus]
[clojure.core.async :as async]))

(defn running-step-record [payload]
{:step-id (:step-id payload)
:build-number (:build-number payload)})

(defn initialize-running-builds-tracking [ctx]
(let [steps-started-subscription (event-bus/subscribe ctx :step-started)
steps-started-payload (event-bus/only-payload steps-started-subscription)

steps-finished-subscription (event-bus/subscribe ctx :step-finished)
steps-finished-payload (event-bus/only-payload steps-finished-subscription)
started-steps (atom #{})]
(async/go-loop []
(if-let [payload (async/<! steps-started-payload)]
(do
(swap! started-steps #(conj % (running-step-record payload)))
(recur))))
(async/go-loop []
(if-let [payload (async/<! steps-finished-payload)]
(do
(swap! started-steps #(disj % (running-step-record payload)))
(recur))))
(assoc ctx :started-steps started-steps)))
1 change: 1 addition & 0 deletions src/clj/lambdacd/ui/api.clj
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
(ns lambdacd.ui.api
(:require [lambdacd.util :as util]
[lambdacd.presentation.unified :as unified]
[lambdacd.runners :as runners]
[ring.util.response :as resp]
[clojure.string :as string]
[ring.middleware.json :as ring-json]
Expand Down
35 changes: 27 additions & 8 deletions test/clj/lambdacd/internal/execution_test.clj
Original file line number Diff line number Diff line change
Expand Up @@ -115,16 +115,20 @@
(defn some-pipeline-state []
(atom {}))

(defn- events-for [k ctx]
(-> (event-bus/subscribe ctx k)
(event-bus/only-payload)
(buffered))
)

(defn- step-finished-events-for [ctx]
(-> (event-bus/subscribe ctx :step-finished)
(event-bus/only-payload)
(buffered)))
(events-for :step-finished ctx))

(defn step-result-updates-for [ctx]
(-> (event-bus/subscribe ctx :step-result-updated)
(event-bus/only-payload)
(buffered)))
(defn- step-started-events-for [ctx]
(events-for :step-started ctx))

(defn- step-result-updates-for [ctx]
(events-for :step-result-updated ctx))

(deftest execute-step-test
(testing "that executing returns the step result added to the input args"
Expand Down Expand Up @@ -208,6 +212,13 @@
:step-id [1 2 3]
:final-result {:status :success :foo :baz}
:rerun-for-retrigger true}] (slurp-chan step-finished-events)))))
(testing "that the event bus is notified when a step starts"
(let [ctx (some-ctx-with :build-number 3
:step-id [1 2 3])
step-started-events (step-started-events-for ctx)]
(execute-step {} [ctx some-other-step])
(is (= [{:build-number 3
:step-id [1 2 3]}] (slurp-chan step-started-events)))))
(testing "that a running step can be killed"
(let [is-killed (atom false)
ctx (some-ctx-with :step-id [3 2 1]
Expand Down Expand Up @@ -417,4 +428,12 @@
(kill-all-pipelines ctx)
(kill-all-pipelines ctx)
(is (map-containing {:status :killed} (get-or-timeout future-step-result :timeout 1000)))
(kill-all-pipelines ctx))))
(kill-all-pipelines ctx)))
(testing "that it waits until all running pipelines are done"
(let [started-steps (atom #{:foo})
ctx (some-ctx-with :step-id [3 1]
:started-steps started-steps)]
; still running, should time out
(is (map-containing {:status :timeout} (call-with-timeout 300 (kill-all-pipelines ctx))))
(reset! started-steps #{})
(is (not= {:status :timeout} (call-with-timeout 500 (kill-all-pipelines ctx)))))))
23 changes: 23 additions & 0 deletions test/clj/lambdacd/internal/running_builds_tracking_test.clj
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
(ns lambdacd.internal.running-builds-tracking-test
(:require [clojure.test :refer :all]
[lambdacd.internal.running-builds-tracking :refer :all]
[lambdacd.testsupport.data :refer [some-ctx some-ctx-with]]
[lambdacd.testsupport.test-util :refer [wait-for]]
[lambdacd.event-bus :as event-bus]))

(deftest running-builds-tracker
(testing "that it adds running builds and step-ids when steps start runnning and removes them if they stop"
(let [ctx (initialize-running-builds-tracking (some-ctx))]

(event-bus/publish ctx :step-started {:step-id [1] :build-number 1})
(event-bus/publish ctx :step-started {:step-id [2 2] :build-number 2})

(wait-for (= 2 (count @(:started-steps ctx))))
(is (= #{{:step-id [1] :build-number 1}
{:step-id [2 2] :build-number 2}} @(:started-steps ctx)))

(event-bus/publish ctx :step-finished {:step-id [2 2] :build-number 2 :final-result {:foo :bar}})

(wait-for (= 1 (count @(:started-steps ctx))))

(is (= #{{:step-id [1] :build-number 1}} @(:started-steps ctx))))))
3 changes: 2 additions & 1 deletion test/clj/lambdacd/testsupport/data.clj
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@
:pipeline-state-component nil ;; set later
:config config
:is-killed (atom false)
:_out-acc (atom "")}
:_out-acc (atom "")
:started-steps (atom #{})}
(event-bus/initialize-event-bus))
))

Expand Down

0 comments on commit c470c53

Please sign in to comment.