Skip to content

Commit

Permalink
Fixes a bug and updates the test cases
Browse files Browse the repository at this point in the history
  • Loading branch information
Anmol Vijaywargiya committed Sep 14, 2022
1 parent 95f0f14 commit 2e06282
Show file tree
Hide file tree
Showing 2 changed files with 5 additions and 6 deletions.
6 changes: 3 additions & 3 deletions src/ziggurat/messaging/consumer.clj
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@
topic-entity
channel-key)]
(conj consumer-tags consumer-tag))) [] (range (get-in-config [:stream-router topic-entity :channels channel-key :worker-count])))]
(update channel-consumer-tags-map channel-key consumer-tags-for-channel)))
(assoc channel-consumer-tags-map channel-key consumer-tags-for-channel)))
{} channels))

(defn start-subscribers
Expand All @@ -161,15 +161,15 @@
channel-consumer-tags-map (start-channels-subscriber ch channels topic-entity)
retry-consumer-tags (start-retry-subscriber* ch (mpr/mapper-func handler (keys channels)) topic-entity)]

(update subscriber-map topic-entity {:retry retry-consumer-tags :channels channel-consumer-tags-map})))
(assoc subscriber-map topic-entity {:retry retry-consumer-tags :channels channel-consumer-tags-map})))
{} stream-routes)

batch-consumers (reduce (fn [subscriber-map batch-route]
(let [topic-entity (first batch-route)
handler (-> batch-route second :handler-fn)
consumer-tags (start-retry-subscriber* ch (fn [message] (ch/process handler message)) topic-entity)]

(update subscriber-map topic-entity {:retry consumer-tags}))) {} batch-routes)
(assoc subscriber-map topic-entity {:retry consumer-tags}))) {} batch-routes)
data {:rabbitmq-channel ch :stream-consumers stream-consumers :batch-consumers batch-consumers}]
data)))

Expand Down
5 changes: 2 additions & 3 deletions test/ziggurat/messaging/consumer_test.clj
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,7 @@

(deftest stop-consumers

(testing "when valid consumer-tags are passed, it stops the consumers"
(testing "when consumer is valid, it stops the consumers"
(fix/with-queues {topic-entity {:handler-fn #(constantly nil)}}
(let [original-zig-config (ziggurat-config)]
(with-redefs [ziggurat-config (fn [] (-> original-zig-config
Expand All @@ -230,8 +230,7 @@

(-> (mount/with-args {:stream-routes {topic-entity {:handler-fn #(constantly nil)}}})
(mount/start #'consumer/consumers))

(is (-> (:consumer-tags consumer/consumers) empty? not))
(is (-> (get-in consumer/consumers [:stream-consumers topic-entity :retry]) empty? not))
(mount/stop #'consumer/consumers)
(is (= (type consumer/consumers) mount.core.NotStartedState)))))))

Expand Down

0 comments on commit 2e06282

Please sign in to comment.