Skip to content

Commit

Permalink
Merge c826c22 into 505ff6e
Browse files Browse the repository at this point in the history
  • Loading branch information
mjayprateek committed May 13, 2022
2 parents 505ff6e + c826c22 commit d0499bf
Show file tree
Hide file tree
Showing 3 changed files with 43 additions and 7 deletions.
2 changes: 1 addition & 1 deletion project.clj
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
(cemerick.pomegranate.aether/register-wagon-factory!
"http" #(org.apache.maven.wagon.providers.http.HttpWagon.))

(defproject tech.gojek/ziggurat "4.6.3"
(defproject tech.gojek/ziggurat "4.6.4"
:description "A stream processing framework to build stateless applications on kafka"
:url "https://github.com/gojektech/ziggurat"
:license {:name "Apache License, Version 2.0"
Expand Down
13 changes: 8 additions & 5 deletions src/ziggurat/messaging/consumer.clj
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
[ziggurat.util.error :refer [report-error]]
[cambium.core :as clog]))

(def DEFAULT_CHANNEL_PREFETCH_COUNT 20)

(defn- reject-message
[ch delivery-tag]
(lb/reject ch delivery-tag))
Expand Down Expand Up @@ -142,11 +144,12 @@
(let [channel-key (first channel)
channel-handler-fn (second channel)]
(dotimes [_ (get-in-config [:stream-router topic-entity :channels channel-key :worker-count])]
(start-subscriber* (lch/open consumer-connection)
1
(util/prefixed-channel-name topic-entity channel-key (get-in-config [:rabbit-mq :instant :queue-name]))
(mpr/channel-mapper-func channel-handler-fn channel-key)
topic-entity)))))
(let [channel-prefetch-count (get-in-config [:stream-router topic-entity :channels channel-key :prefetch-count] DEFAULT_CHANNEL_PREFETCH_COUNT)]
(start-subscriber* (lch/open consumer-connection)
channel-prefetch-count
(util/prefixed-channel-name topic-entity channel-key (get-in-config [:rabbit-mq :instant :queue-name]))
(mpr/channel-mapper-func channel-handler-fn channel-key)
topic-entity))))))

(defn start-subscribers
"Starts the subscriber to the instant queue of the rabbitmq"
Expand Down
35 changes: 34 additions & 1 deletion test/ziggurat/messaging/consumer_test.clj
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@
[ziggurat.messaging.util :refer [prefixed-queue-name]]
[ziggurat.tracer :refer [tracer]]
[ziggurat.util.error :refer [report-error]]
[ziggurat.util.rabbitmq :as util]))
[ziggurat.util.rabbitmq :as util])
(:import (com.rabbitmq.client Channel)))

(use-fixtures :once (join-fixtures [fix/init-rabbit-mq
fix/silence-logging
Expand Down Expand Up @@ -216,6 +217,38 @@
(is (= 1 @call-counter))
(util/close rmq-ch))))))

(deftest channel-prefetch-count-test
(testing "Default prefetch-count is used while creating channel subscribers if prefetch-count is not configured explicitly"
(let [prefetch-count-used (atom 0)
channel :channel-1
channel-fn (fn [_])
original-zig-config (ziggurat-config)
original-lb-qos lb/qos]
(fix/with-queues {topic-entity {:handler-fn #(constantly nil)
channel channel-fn}}
(with-redefs [ziggurat-config (fn [] (-> original-zig-config
(update-in [:stream-router topic-entity :channels channel :worker-count] (constantly 1))))
lb/qos (fn [^Channel _ ^long prefetch-count]
(reset! prefetch-count-used prefetch-count))]
(consumer/start-channels-subscriber {:channel-1 (fn [_])} topic-entity)
(is (= consumer/DEFAULT_CHANNEL_PREFETCH_COUNT @prefetch-count-used))))))

(testing "prefetch-count provided in configuration is used while creating channel subscribers"
(let [prefetch-count-used (atom 0)
expected-prefetch-count 50
channel :channel-1
channel-fn (fn [_])
original-zig-config (ziggurat-config)]
(fix/with-queues {topic-entity {:handler-fn #(constantly nil)
channel channel-fn}}
(with-redefs [ziggurat-config (fn [] (-> original-zig-config
(update-in [:stream-router topic-entity :channels channel :worker-count] (constantly 1))
(update-in [:stream-router topic-entity :channels channel :prefetch-count] (constantly expected-prefetch-count))))
lb/qos (fn [^Channel _ ^long prefetch-count]
(reset! prefetch-count-used prefetch-count))]
(consumer/start-channels-subscriber {channel channel-fn} topic-entity)
(is (= expected-prefetch-count @prefetch-count-used)))))))

(deftest start-retry-subscriber-test
(testing "creates a span when tracer is enabled"
(fix/with-queues {topic-entity {:handler-fn #(constantly nil)}}
Expand Down

0 comments on commit d0499bf

Please sign in to comment.