Skip to content

Commit

Permalink
A unit test to ensure only topic-entities which are provided in init-…
Browse files Browse the repository at this point in the history
…args are started by consumer_driver
  • Loading branch information
mjayprateek committed Oct 5, 2020
1 parent b1d98bb commit 623061e
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 2 deletions.
5 changes: 3 additions & 2 deletions src/ziggurat/kafka_consumer/consumer_driver.clj
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,9 @@
(defn- start-consumers [consumer-configs init-args]
(log/info "Starting consumers")
(reduce (fn [consumer-groups [topic-entity init-arg]]
(let [consumer-config (get consumer-configs topic-entity)]
(assoc consumer-groups topic-entity (start-consumers-per-group topic-entity consumer-config init-arg))))
(if-some [consumer-config (get consumer-configs topic-entity)]
(assoc consumer-groups topic-entity (start-consumers-per-group topic-entity consumer-config init-arg))
consumer-groups))
{}
init-args))

Expand Down
21 changes: 21 additions & 0 deletions test/ziggurat/kafka_consumer/consumer_driver_test.clj
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,27 @@
(is (= @stopped-consumer-count 6))
(is (= @stopped-consumer-groups-count 2))))))

(deftest should-create-consumers-only-for-topic-entities-provided-in-batch-routes
(testing "should create consumers only for those topic-entities which have been provided in init-args"
(let [valid-topic-entities #{:consumer-1}]
(with-redefs [ct/create-consumer (fn [topic-entity consumer-config]
(is (contains? valid-topic-entities topic-entity))
(is (not (nil? consumer-config)))
;; returning a dummy data instead of a consumer
{:dummy-key :dummy-value})
cast (constantly nil)
cd/stop-consumers (constantly nil)]
(-> (mount/only [#'consumer-groups])
(mount/with-args {:consumer-1 {:handler-fn dummy-handler-fn}
:invalid-batch-route {:handler-fn dummy-handler-fn}})
(mount/start))
(is (= 1 (count consumer-groups)))
(is (= 2 (count (:consumer-1 consumer-groups))))
(is (nil? (:consumer-2 consumer-groups)))
(is (nil? (:invalid-batch-route consumer-groups)))
(-> (mount/only [#'consumer-groups])
(mount/stop))))))

(deftest create-consumers-per-group-with-default-thread-count-test
(testing "should create the consumers according to default thread count when thread count config is not available"
(let [create-consumer-called (atom 0)]
Expand Down

0 comments on commit 623061e

Please sign in to comment.