Skip to content

Commit

Permalink
uses get-connection instead of the atom to fetch the connection object
Browse files Browse the repository at this point in the history
  • Loading branch information
shubhang.balkundi committed Jun 28, 2020
1 parent 31dd382 commit d7f9b4c
Show file tree
Hide file tree
Showing 5 changed files with 38 additions and 37 deletions.
20 changes: 11 additions & 9 deletions src/ziggurat/messaging/rabbitmq_wrapper.clj
Original file line number Diff line number Diff line change
Expand Up @@ -10,42 +10,44 @@
[ziggurat.messaging.rabbitmq.consumer :as rmq-consumer]
[ziggurat.messaging.messaging-interface :refer [MessagingProtocol]]))

(def connection (atom nil))
(def connection (atom nil))

(defn get-connection [] @connection)

(defn start-connection [config stream-routes]
(when (and (is-connection-required? (:ziggurat config) stream-routes)
(nil? @connection))
(nil? (get-connection)))
(reset! connection (rmq-connection/start-connection config))))

(defn stop-connection [config stream-routes]
(when (and (is-connection-required? (:ziggurat config) stream-routes)
(not (nil? @connection)))
(rmq-connection/stop-connection @connection config)
(not (nil? (get-connection))))
(rmq-connection/stop-connection (get-connection) config)
(reset! connection nil)))

(defn publish
([exchange message-payload]
(publish exchange message-payload nil))
([exchange message-payload expiration]
(rmq-producer/publish @connection exchange message-payload expiration)))
(rmq-producer/publish (get-connection) exchange message-payload expiration)))

(defn create-and-bind-queue
([queue-name exchange-name]
(create-and-bind-queue queue-name exchange-name nil))
([queue-name exchange-name dead-letter-exchange]
(rmq-producer/create-and-bind-queue @connection queue-name exchange-name dead-letter-exchange)))
(rmq-producer/create-and-bind-queue (get-connection) queue-name exchange-name dead-letter-exchange)))

(defn get-messages-from-queue
([queue-name ack?]
(get-messages-from-queue queue-name ack? 1))
([queue-name ack? count]
(rmq-consumer/get-messages-from-queue @connection queue-name ack? count)))
(rmq-consumer/get-messages-from-queue (get-connection) queue-name ack? count)))

(defn process-messages-from-queue [queue-name count processing-fn]
(rmq-consumer/process-messages-from-queue @connection queue-name count processing-fn))
(rmq-consumer/process-messages-from-queue (get-connection) queue-name count processing-fn))

(defn start-subscriber [prefetch-count wrapped-mapper-fn queue-name]
(rmq-consumer/start-subscriber @connection prefetch-count wrapped-mapper-fn queue-name))
(rmq-consumer/start-subscriber (get-connection) prefetch-count wrapped-mapper-fn queue-name))

(defn consume-message [ch meta payload ack?]
(rmq-consumer/consume-message ch meta payload ack?))
Expand Down
18 changes: 9 additions & 9 deletions test/ziggurat/messaging/consumer_test.clj
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
[langohr.channel :as lch]
[ziggurat.config :refer [ziggurat-config rabbitmq-config]]
[ziggurat.fixtures :as fix]
[ziggurat.messaging.rabbitmq-wrapper :refer [connection]]
[ziggurat.messaging.rabbitmq-wrapper :as rmqw]
[ziggurat.messaging.producer :as producer]
[ziggurat.retry :as retry]
[ziggurat.tracer :refer [tracer]]
Expand Down Expand Up @@ -103,7 +103,7 @@
retry-count 5
message-payload (assoc (gen-message-payload topic-entity) :retry-count 2)
original-zig-config (ziggurat-config)
rmq-ch (lch/open @connection)]
rmq-ch (lch/open (rmqw/get-connection))]

(with-redefs [ziggurat-config (fn [] (-> original-zig-config
(update-in [:retry :count] (constantly retry-count))
Expand Down Expand Up @@ -132,7 +132,7 @@
retry-count 5
message-payload (assoc (assoc-in (gen-message-payload topic-entity) [:message :msg] "skip") :retry-count 2)
original-zig-config (ziggurat-config)
rmq-ch (lch/open @connection)]
rmq-ch (lch/open (rmqw/get-connection))]

(with-redefs [ziggurat-config (fn [] (-> original-zig-config
(update-in [:retry :count] (constantly retry-count))
Expand Down Expand Up @@ -160,7 +160,7 @@
retry-count 5
no-of-msgs 2
original-zig-config (ziggurat-config)
rmq-ch (lch/open @connection)]
rmq-ch (lch/open (rmqw/get-connection))]

(with-redefs [ziggurat-config (fn [] (-> original-zig-config
(update-in [:retry :count] (constantly retry-count))
Expand Down Expand Up @@ -188,7 +188,7 @@
(fix/with-queues {topic-entity {:handler-fn #(constantly nil)}}
(let [no-of-workers 3
original-zig-config (ziggurat-config)
ch (lch/open @connection)
ch (lch/open (rmqw/get-connection))
counter (atom 0)]

(with-redefs [ziggurat-config (fn [] (-> original-zig-config
Expand All @@ -204,7 +204,7 @@
(testing "start subscribers should call start-subscriber* according to the product of worker and mapper-fns in stream-routes"
(let [no-of-workers 3
original-zig-config (ziggurat-config)
ch (lch/open @connection)
ch (lch/open (rmqw/get-connection))
counter (atom 0)
stream-routes {topic-entity {:handler-fn #(constantly nil)}
:test {:handler-fn #(constantly nil)}}]
Expand All @@ -229,7 +229,7 @@
:retry-limit 2
:success-promise success-promise})
original-zig-config (ziggurat-config)
rmq-ch (lch/open @connection)]
rmq-ch (lch/open (rmqw/get-connection))]
(fix/with-queues {topic-entity {:handler-fn #(constantly nil)
channel channel-fn}}
(with-redefs [ziggurat-config (fn [] (-> original-zig-config
Expand All @@ -256,7 +256,7 @@
:retry-limit 2
:success-promise success-promise})
original-zig-config (ziggurat-config)
rmq-ch (lch/open @connection)]
rmq-ch (lch/open (rmqw/get-connection))]
(fix/with-queues {topic-entity {:handler-fn #(constantly nil)
channel channel-fn}}
(with-redefs [ziggurat-config (fn [] (-> original-zig-config
Expand All @@ -277,7 +277,7 @@
retry-count 3
message-payload (assoc (gen-message-payload topic-entity) :retry-count 3)
original-zig-config (ziggurat-config)
rmq-ch (lch/open @connection)]
rmq-ch (lch/open (rmqw/get-connection))]
(.reset tracer)
(with-redefs [ziggurat-config (fn [] (-> original-zig-config
(update-in [:retry :count] (constantly retry-count))
Expand Down
8 changes: 4 additions & 4 deletions test/ziggurat/messaging/producer_test.clj
Original file line number Diff line number Diff line change
Expand Up @@ -265,7 +265,7 @@

(deftest ^:integration make-queues-integration-tests
(testing "it creates queues with topic entity from stream routes"
(with-open [ch (lch/open @rmqw/connection)]
(with-open [ch (lch/open (rmqw/get-connection))]
(let [stream-routes {:default {:handler-fn #(constantly :success)}}

instant-queue-name (util/prefixed-queue-name "default" (:queue-name (:instant (rabbitmq-config))))
Expand Down Expand Up @@ -293,7 +293,7 @@
(le/delete ch dead-exchange-name))))

(testing "it creates queues with suffixes in the range [1, retry-count] when exponential backoff is enabled"
(with-open [ch (lch/open @rmqw/connection)]
(with-open [ch (lch/open (rmqw/get-connection))]
(let [stream-routes {:default {:handler-fn #(constantly :success)}}
retry-count (get-in (ziggurat-config) [:retry :count])
instant-queue-name (util/prefixed-queue-name "default" (:queue-name (:instant (rabbitmq-config))))
Expand Down Expand Up @@ -334,7 +334,7 @@
(is (= 0 @counter)))))

(testing "it creates queues with topic entity for channels only"
(with-open [ch (lch/open @rmqw/connection)]
(with-open [ch (lch/open (rmqw/get-connection))]
(let [stream-routes {:default {:handler-fn #(constantly :success) :channel-1 #(constantly :success)}}
instant-queue-suffix (:queue-name (:instant (rabbitmq-config)))
instant-exchange-suffix (:exchange-name (:instant (rabbitmq-config)))
Expand Down Expand Up @@ -369,7 +369,7 @@
:stream-router {:default {:channels {:channel-1 {:retry {:enabled false}}}}}))]

(testing "it creates instant queues with topic entity for channels only"
(with-open [ch (lch/open @rmqw/connection)]
(with-open [ch (lch/open (rmqw/get-connection))]
(let [stream-routes {:default {:handler-fn #(constantly :success) :channel-1 #(constantly :success)}}
instant-queue-suffix (:queue-name (:instant (rabbitmq-config)))
instant-exchange-suffix (:exchange-name (:instant (rabbitmq-config)))
Expand Down
12 changes: 6 additions & 6 deletions test/ziggurat/messaging/rabbitmq/consumer_test.clj
Original file line number Diff line number Diff line change
Expand Up @@ -140,8 +140,8 @@
mock-mapper-fn (fn [message]
(when (= message-payload message)
(reset! is-mocked-mpr-fn-called? true)))]
(rmq-producer/create-and-bind-queue @rmqw/connection queue-name exchange-name false)
(rmq-producer/publish @rmqw/connection exchange-name message-payload nil)
(rmq-producer/create-and-bind-queue (rmqw/get-connection) queue-name exchange-name false)
(rmq-producer/publish (rmqw/get-connection) exchange-name message-payload nil)
(rmqw/start-subscriber 1 mock-mapper-fn queue-name)
(Thread/sleep 5000)
(is (true? @is-mocked-mpr-fn-called?)))))
Expand All @@ -154,7 +154,7 @@
(is (= message-arg message)))
topic-entity-name (name topic-entity)]
(producer/publish-to-dead-queue message)
(with-open [ch (lch/open @rmqw/connection)]
(with-open [ch (lch/open (rmqw/get-connection))]
(let [queue-name (get-in (rabbitmq-config) [:dead-letter :queue-name])
prefixed-queue-name (str topic-entity-name "_" queue-name)
[meta payload] (lb/get ch prefixed-queue-name false)
Expand All @@ -172,7 +172,7 @@
topic-entity-name (name topic-entity)]
(producer/publish-to-dead-queue message)
(with-redefs [consumer/read-messages-from-queue (fn [_ _ _ _] nil)]
(with-open [ch (lch/open @rmqw/connection)]
(with-open [ch (lch/open (rmqw/get-connection))]
(let [queue-name (get-in (rabbitmq-config) [:dead-letter :queue-name])
prefixed-queue-name (str topic-entity-name "_" queue-name)
[meta payload] (lb/get ch prefixed-queue-name false)
Expand All @@ -189,7 +189,7 @@
(throw (Exception. "exception message")))
topic-entity-name (name topic-entity)]
(producer/publish-to-dead-queue message)
(with-open [ch (lch/open @rmqw/connection)]
(with-open [ch (lch/open (rmqw/get-connection))]
(let [queue-name (get-in (rabbitmq-config) [:dead-letter :queue-name])
prefixed-queue-name (str topic-entity-name "_" queue-name)
[meta payload] (lb/get ch prefixed-queue-name false)
Expand All @@ -203,7 +203,7 @@
processing-fn (fn [_] ())
topic-entity-name (name topic-entity)]
(producer/publish-to-dead-queue message)
(with-open [ch (lch/open @rmqw/connection)]
(with-open [ch (lch/open (rmqw/get-connection))]
(with-redefs [ziggurat.messaging.consumer/convert-to-message-payload (fn [] (throw (Exception. "exception message")))]
(let [queue-name (get-in (rabbitmq-config) [:dead-letter :queue-name])
prefixed-queue-name (str topic-entity-name "_" queue-name)
Expand Down
17 changes: 8 additions & 9 deletions test/ziggurat/messaging/rabbitmq_wrapper_test.clj
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
(with-redefs [rmq-connection/start-connection (fn [_] (reset! start-connection-called-count true) {:foo "bar"})]
(rmqw/start-connection config stream-routes)
(is (= true @start-connection-called-count))
(is (= {:foo "bar"} @rmqw/connection)))
(is (= {:foo "bar"} (rmqw/get-connection))))
(reset-connection-atom)))

(testing "start-connection should not call `rmq-connection/start-connection` function if retries are disabled and connection atom is nil"
Expand All @@ -36,7 +36,7 @@
(with-redefs [rmq-connection/start-connection (fn [_] (reset! start-connection-called? true))]
(rmqw/start-connection config stream-routes)
(is (= false @start-connection-called?))
(is (= @rmqw/connection nil))))
(is (= (rmqw/get-connection) nil))))
(reset-connection-atom)))

(deftest stop-connection-test
Expand All @@ -47,7 +47,7 @@
config (assoc default-config
:ziggurat {:retry {:enabled true}})]
(with-redefs [rmq-connection/stop-connection (fn [_ _] (reset! stop-connection-called? true))
rmqw/connection (atom {:foo "bar"})]
rmqw/get-connection (constantly {:foo "bar"})]
(rmqw/stop-connection config stream-routes)
(is (= true @stop-connection-called?))))
(reset-connection-atom))
Expand All @@ -59,7 +59,7 @@
config (assoc default-config
:ziggurat {:retry {:enabled true}})]
(with-redefs [rmq-connection/stop-connection (fn [_ _] (reset! stop-connection-called? true))
rmqw/connection (atom nil)]
rmqw/get-connection (constantly nil)]
(rmqw/stop-connection config stream-routes)
(is (= false @stop-connection-called?))))
(reset-connection-atom)))
Expand All @@ -77,23 +77,22 @@
(rmqw/start-connection config stream-routes)
(rmqw/start-connection config stream-routes)
(is (= true @start-connection-called-count))
(is (= mock-object @rmqw/connection)))
(is (= mock-object (rmqw/get-connection))))
(reset-connection-atom))))

(deftest stop-connection-idempotency-test
(testing "It should not set the connection atom if it has already been set"
(testing "It should not reset the connection atom if connection has already been stopped"
(let [default-config config/config
stop-connection-called-count (atom 0)
stream-routes {:default {:handler-fn (constantly nil)}}
config (assoc default-config
:ziggurat {:retry {:enabled true}})]
(with-redefs [rmqw/connection (atom {:foo "bar"})
(with-redefs [rmqw/get-connection (constantly nil)
rmq-connection/stop-connection (fn [_ _] (swap! stop-connection-called-count inc))]
(rmqw/stop-connection config stream-routes)
(rmqw/stop-connection config stream-routes)
(rmqw/stop-connection config stream-routes)
(is (= 1 @stop-connection-called-count))
(is (= nil @rmqw/connection)))
(is (= 0 @stop-connection-called-count)))
(reset-connection-atom))))

(deftest create-and-bind-queue-test
Expand Down

0 comments on commit d7f9b4c

Please sign in to comment.