Skip to content
This repository has been archived by the owner on Jun 15, 2024. It is now read-only.

Commit

Permalink
Remove use of go-blocks for blocking, long running tasks and remove u…
Browse files Browse the repository at this point in the history
…se of blocking >git add -A within go-blocks to fix thread pool exhaustion in big setups (fixes #110)
  • Loading branch information
flosell committed May 14, 2016
1 parent 09345e7 commit 87cbdc3
Show file tree
Hide file tree
Showing 5 changed files with 38 additions and 20 deletions.
8 changes: 8 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,14 @@ However, as this is still an experimental library, breaking changes may occur wi

The official release will have a defined and more stable API. If you are already relying on a particular API, please let me know.

## 0.9.1

* Bug fixes:
* Fixed bug that led to stuck pipelines in scenarios where a lot of pipelines live in the same project/process (#110)
* API Changes:
* `lambdacd.event-bus/publish` is now deprecated in favor of `lambdacd.event-bus/publish!!` and `lambdacd.event-bus/publish!`
(to be able to properly publish from within a go-block)

## 0.9.0

* Improvements:
Expand Down
10 changes: 9 additions & 1 deletion src/clj/lambdacd/event_bus.clj
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,16 @@
(assoc ctx :event-publisher publisher-ch
:event-publication publication)))


(defmacro publish! [ctx topic payload]
`(async/>! (:event-publisher ~ctx) {:topic ~topic :payload ~payload}))

(defmacro publish!! [ctx topic payload]
`(async/>!! (:event-publisher ~ctx) {:topic ~topic :payload ~payload}))

(defn publish [ctx topic payload]
(async/>!! (:event-publisher ctx) {:topic topic :payload payload}))
"DEPRECATED, will be removed in subsequent versions. use publish!! or publish! instead"
(publish!! ctx topic payload))

(defn subscribe [ctx topic]
(let [result-ch (async/chan)]
Expand Down
28 changes: 15 additions & 13 deletions src/clj/lambdacd/internal/execution.clj
Original file line number Diff line number Diff line change
Expand Up @@ -22,23 +22,25 @@
(assoc result :has-been-waiting true)
result))

(defn- send-step-result [{step-id :step-id build-number :build-number :as ctx } step-result]
(defn- send-step-result!! [{step-id :step-id build-number :build-number :as ctx} step-result]
(let [payload {:build-number build-number
:step-id step-id :step-result step-result}]
(event-bus/publish ctx :step-result-updated payload)))
:step-id step-id
:step-result step-result}]
(event-bus/publish!! ctx :step-result-updated payload)))

(defn process-channel-result-async [c ctx]
(async/go
(loop [cur-result {:status :running}]
(defn process-channel-result-async [c {step-id :step-id build-number :build-number :as ctx}]
(async/go-loop [cur-result {:status :running}]
(let [[key value] (async/<! c)
new-result (-> cur-result
(assoc key value)
(attach-wait-indicator-if-necessary key value))]
(if (and (nil? key) (nil? value))
cur-result
(do
(send-step-result ctx new-result)
(recur new-result)))))))
(event-bus/publish! ctx :step-result-updated {:build-number build-number
:step-id step-id
:step-result new-result})
(recur new-result))))))

(defmacro with-err-str
[& body]
Expand Down Expand Up @@ -86,7 +88,7 @@
(build-number-to-kill? build-number kill-payload))
(do
(reset! is-killed true)
(async/>!! (:result-channel ctx) [:received-kill true]))
(async/>! (:result-channel ctx) [:received-kill true]))
(recur))))
subscription))

Expand All @@ -102,7 +104,7 @@
(:retriggered-step-id ctx)))}))

(defn- report-step-started [ctx]
(send-step-result ctx {:status :running})
(send-step-result!! ctx {:status :running})
(event-bus/publish ctx :step-started {:step-id (:step-id ctx)
:build-number (:build-number ctx)}))

Expand All @@ -125,7 +127,7 @@
(log/debug (str "executed step " step-id complete-step-result))
(clean-up-kill-handling ctx-for-child kill-subscription)
(remove-watch parent-kill-switch watch-key)
(send-step-result ctx complete-step-result)
(send-step-result!! ctx complete-step-result)
(report-step-finished ctx complete-step-result)
(step-output step-id complete-step-result)))

Expand Down Expand Up @@ -163,7 +165,7 @@
old-unified (unify-status-fn (vals statuses))
new-unified (unify-status-fn (vals new-statuses))]
(if (not= old-unified new-unified)
(async/>!! out-ch [:status new-unified]))
(async/>! out-ch [:status new-unified]))
(recur new-statuses))
(async/close! out-ch))))
out-ch))
Expand Down Expand Up @@ -219,7 +221,7 @@
(defn- publish-child-step-results [ctx retriggered-build-number original-build-result]
(->> original-build-result
(filter #(step-id/parent-of? (:step-id ctx) (first %)))
(map #(send-step-result (assoc ctx :step-id (first %)) (assoc (second %) :retrigger-mock-for-build-number retriggered-build-number)))
(map #(send-step-result!! (assoc ctx :step-id (first %)) (assoc (second %) :retrigger-mock-for-build-number retriggered-build-number)))
(doall)))

(defn retrigger-mock-step [retriggered-build-number]
Expand Down
2 changes: 1 addition & 1 deletion src/clj/lambdacd/steps/control_flow.clj
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
(async/<!! filtered-by-success)))

(defn- step-producer-returning-with-first-successful [args steps-and-ids]
(let [step-result-channels (map #(async/go (core/execute-step args %)) steps-and-ids)
(let [step-result-channels (map #(async/thread (core/execute-step args %)) steps-and-ids)
result (wait-for-finished-on step-result-channels)]
(if (nil? result)
[{:status :failure}]
Expand Down
10 changes: 5 additions & 5 deletions test/clj/lambdacd/event_bus_test.clj
Original file line number Diff line number Diff line change
Expand Up @@ -11,19 +11,19 @@
(testing "that we can publish and subscribe to events"
(let [ctx (initialize-event-bus (some-ctx))]
(let [subscription (subscribe ctx :test-messages)]
(publish ctx :test-messages {:message-number 1})
(publish ctx :other-topic {:other-message "hello"})
(publish!! ctx :test-messages {:message-number 1})
(publish!! ctx :other-topic {:other-message "hello"})
(unsubscribe ctx :test-messages subscription)
(publish ctx :test-messages {:message-number 2})
(publish!! ctx :test-messages {:message-number 2})
(is (= [{:message-number 1}] (slurp-chan-with-size 1 (only-payload subscription)))))))
(testing "that messages get delivered to all subscribers if more than one subscribes to the same topic"
(let [ctx (initialize-event-bus (some-ctx))
subscription-1 (subscribe ctx :test-messages)
subscription-2 (subscribe ctx :test-messages)
payloads-1 (buffered (only-payload subscription-1))
payloads-2 (buffered (only-payload subscription-2))]
(publish ctx :test-messages {:message-number 1})
(publish ctx :test-messages {:message-number 2})
(publish!! ctx :test-messages {:message-number 1})
(publish!! ctx :test-messages {:message-number 2})

(is (= [{:message-number 1} {:message-number 2}] (slurp-chan-with-size 2 payloads-1)))
(is (= [{:message-number 1} {:message-number 2}] (slurp-chan-with-size 2 payloads-2))))))

0 comments on commit 87cbdc3

Please sign in to comment.