Skip to content
This repository has been archived by the owner on Jun 15, 2024. It is now read-only.

Commit

Permalink
move start-pipeline-state-updater to pipeline-state and make it publi…
Browse files Browse the repository at this point in the history
…c so it can be reused by other persistence components
  • Loading branch information
flosell committed Jul 25, 2015
1 parent 48d50be commit 2a0167e
Show file tree
Hide file tree
Showing 6 changed files with 58 additions and 20 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,12 @@ However, as this is still an experimental library, breaking changes may occur wi

The official release will have a defined and more stable API. If you are already relying on a particular API, please let me know.

## 0.4.4

* Improvements:
* Extracted common functions from `internal.default-pipeline-state` so they can be reused in other persistence components


## 0.4.3

Housekeeping release: Contains mostly cleanup under the hood and changes to APIs for advanced users.
Expand Down
21 changes: 4 additions & 17 deletions src/clj/lambdacd/internal/default_pipeline_state.clj
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,7 @@
i.e. what's currently running, what are the results of each step, ..."
(:require [lambdacd.internal.default-pipeline-state-persistence :as persistence]
[clj-time.core :as t]
[clojure.core.async :as async]
[lambdacd.internal.pipeline-state :as pipeline-state-protocol]
[lambdacd.event-bus :as event-bus]
[lambdacd.internal.pipeline-state :as pipeline-state]
[lambdacd.util :as util]))

(def clean-pipeline-state {})
Expand Down Expand Up @@ -41,7 +39,7 @@


(defrecord DefaultPipelineState [state-atom home-dir]
pipeline-state-protocol/PipelineStateComponent
pipeline-state/PipelineStateComponent
(update [self build-number step-id step-result]
(update-legacy build-number step-id step-result home-dir state-atom))
(get-all [self]
Expand All @@ -51,19 +49,8 @@
(next-build-number [self]
(next-build-number-legacy state-atom)))

(defn- start-pipeline-state-updater [instance ctx] ; TODO could be generalized for others to use
(let [subscription (event-bus/subscribe ctx :step-result-updated)
step-updates-channel (util/buffered (event-bus/only-payload subscription))]
(async/go-loop []
(if-let [step-result-update (async/<! step-updates-channel)]
(let [step-result (:step-result step-result-update)
build-number (:build-number step-result-update)
step-id (:step-id step-result-update)]
(pipeline-state-protocol/update instance build-number step-id step-result)
(recur))))))

(defn new-default-pipeline-state [state-atom config ctx]
(let [home-dir (:home-dir config)
instance (->DefaultPipelineState state-atom home-dir)]
(start-pipeline-state-updater instance ctx)
instance))
(pipeline-state/start-pipeline-state-updater instance ctx)
instance))
16 changes: 15 additions & 1 deletion src/clj/lambdacd/internal/pipeline_state.clj
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
(ns lambdacd.internal.pipeline-state)
(ns lambdacd.internal.pipeline-state
(:require [clojure.core.async :as async]
[lambdacd.event-bus :as event-bus]
[lambdacd.util :as util]))


; TODO: move this protocol out of internal once the interface is more polished and stable
Expand All @@ -9,3 +12,14 @@
(get-all [self]) ;; should in the future be replaced with more detailed accessor functions
(get-internal-state [self]) ;; FIXME: temporary, hack until runners are rewritten to use step-results-channel
(next-build-number [self]))

(defn start-pipeline-state-updater [pipeline-state ctx]
(let [subscription (event-bus/subscribe ctx :step-result-updated)
step-updates-channel (util/buffered (event-bus/only-payload subscription))]
(async/go-loop []
(if-let [step-result-update (async/<! step-updates-channel)]
(let [step-result (:step-result step-result-update)
build-number (:build-number step-result-update)
step-id (:step-id step-result-update)]
(update pipeline-state build-number step-id step-result)
(recur))))))
2 changes: 1 addition & 1 deletion test/clj/lambdacd/internal/default_pipeline_state_test.clj
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@
"most-recent-update-at" "1970-01-01T00:00:00.000Z"
"first-updated-at" "1970-01-01T00:00:00.000Z"}}] (json/read-str (slurp (str home-dir "/build-10/pipeline-state.json"))))))))

(deftest initialize-pipeline-persistence-test
(deftest pipeline-state-integration-test
(testing "that we tap into the event bus update the pipeline state with its information"
(let [ctx (some-ctx)
pipeline-state (new-default-pipeline-state (atom {}) {:home-dir (util/create-temp-dir)} ctx)]
Expand Down
27 changes: 27 additions & 0 deletions test/clj/lambdacd/internal/pipeline_state_test.clj
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
(ns lambdacd.internal.pipeline-state-test
(:use [lambdacd.testsupport.test-util])
(:require [clojure.test :refer :all]
[lambdacd.internal.pipeline-state :refer :all]
[lambdacd.testsupport.test-util :as tu]
[lambdacd.testsupport.data :refer [some-ctx-with some-ctx]]
[lambdacd.util :as util]
[lambdacd.event-bus :as event-bus]
[clojure.core.async :as async]))

(deftest pipeline-state-updater-test
(testing "that we tap into the event bus update the pipeline state with its information"
(let [updates (atom [])
ctx (some-ctx)
pipeline-state (reify PipelineStateComponent
(update [self build-number step-id step-result]
(swap! updates #(conj %1 [build-number step-id step-result]))))]
(start-pipeline-state-updater pipeline-state ctx)

(event-bus/publish ctx :step-result-updated {:build-number 1 :step-id [1 2] :step-result {:status :running}})
(event-bus/publish ctx :step-result-updated {:build-number 2 :step-id [1 2] :step-result {:status :success}})
(event-bus/publish ctx :step-result-updated {:build-number 1 :step-id [1 2] :step-result {:status :running :foo :bar}})

(wait-for (= 3 (count @updates)))
(is (= [[1 [1 2] {:status :running}]
[2 [1 2] {:status :success}]
[1 [1 2] {:status :running :foo :bar}]] @updates)))))
6 changes: 5 additions & 1 deletion test/clj/lambdacd/testsupport/test_util.clj
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,11 @@
(async/timeout timeout) {:status :timeout}))



(defmacro wait-for [predicate]
`(loop []
(if (not ~predicate)
(Thread/sleep 50)
(recur))))

(defmacro start-waiting-for [body]
`(async/go
Expand Down

0 comments on commit 2a0167e

Please sign in to comment.