From b42ab109c8aad437f9659cbdc3609ace29854771 Mon Sep 17 00:00:00 2001 From: "prateek.khatri" Date: Mon, 9 May 2022 16:48:38 +0530 Subject: [PATCH 1/3] using channel prefetch count while creating subscribers --- src/ziggurat/messaging/consumer.clj | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/src/ziggurat/messaging/consumer.clj b/src/ziggurat/messaging/consumer.clj index f2e5f035..97b93c95 100644 --- a/src/ziggurat/messaging/consumer.clj +++ b/src/ziggurat/messaging/consumer.clj @@ -142,11 +142,12 @@ (let [channel-key (first channel) channel-handler-fn (second channel)] (dotimes [_ (get-in-config [:stream-router topic-entity :channels channel-key :worker-count])] - (start-subscriber* (lch/open consumer-connection) - 1 - (util/prefixed-channel-name topic-entity channel-key (get-in-config [:rabbit-mq :instant :queue-name])) - (mpr/channel-mapper-func channel-handler-fn channel-key) - topic-entity))))) + (let [channel-prefetch-count (get-in-config [:stream-router topic-entity :channels channel-key :prefetch-count])] + (start-subscriber* (lch/open consumer-connection) + channel-prefetch-count + (util/prefixed-channel-name topic-entity channel-key (get-in-config [:rabbit-mq :instant :queue-name])) + (mpr/channel-mapper-func channel-handler-fn channel-key) + topic-entity)))))) (defn start-subscribers "Starts the subscriber to the instant queue of the rabbitmq" From 4847b1c217eba0a906f791184110db0ec42dada6 Mon Sep 17 00:00:00 2001 From: "prateek.khatri" Date: Thu, 12 May 2022 15:38:14 +0530 Subject: [PATCH 2/3] using user provided channel prefetch count while creating channel subscribers --- project.clj | 2 +- src/ziggurat/messaging/consumer.clj | 4 ++- test/ziggurat/messaging/consumer_test.clj | 35 ++++++++++++++++++++++- 3 files changed, 38 insertions(+), 3 deletions(-) diff --git a/project.clj b/project.clj index effa5d5f..63929eb1 100644 --- a/project.clj +++ b/project.clj @@ -2,7 +2,7 @@ (cemerick.pomegranate.aether/register-wagon-factory! "http" #(org.apache.maven.wagon.providers.http.HttpWagon.)) -(defproject tech.gojek/ziggurat "4.6.3" +(defproject tech.gojek/ziggurat "4.6.4" :description "A stream processing framework to build stateless applications on kafka" :url "https://github.com/gojektech/ziggurat" :license {:name "Apache License, Version 2.0" diff --git a/src/ziggurat/messaging/consumer.clj b/src/ziggurat/messaging/consumer.clj index 97b93c95..c1ee53d0 100644 --- a/src/ziggurat/messaging/consumer.clj +++ b/src/ziggurat/messaging/consumer.clj @@ -14,6 +14,8 @@ [ziggurat.util.error :refer [report-error]] [cambium.core :as clog])) +(def DEFAULT_CHANNEL_PREFETCH_COUNT 20) + (defn- reject-message [ch delivery-tag] (lb/reject ch delivery-tag)) @@ -142,7 +144,7 @@ (let [channel-key (first channel) channel-handler-fn (second channel)] (dotimes [_ (get-in-config [:stream-router topic-entity :channels channel-key :worker-count])] - (let [channel-prefetch-count (get-in-config [:stream-router topic-entity :channels channel-key :prefetch-count])] + (let [channel-prefetch-count (get-in-config [:stream-router topic-entity :channels channel-key :prefetch-count] DEFAULT_CHANNEL_PREFETCH_COUNT)] (start-subscriber* (lch/open consumer-connection) channel-prefetch-count (util/prefixed-channel-name topic-entity channel-key (get-in-config [:rabbit-mq :instant :queue-name])) diff --git a/test/ziggurat/messaging/consumer_test.clj b/test/ziggurat/messaging/consumer_test.clj index 83cdcc25..e3f0c979 100644 --- a/test/ziggurat/messaging/consumer_test.clj +++ b/test/ziggurat/messaging/consumer_test.clj @@ -11,7 +11,8 @@ [ziggurat.messaging.util :refer [prefixed-queue-name]] [ziggurat.tracer :refer [tracer]] [ziggurat.util.error :refer [report-error]] - [ziggurat.util.rabbitmq :as util])) + [ziggurat.util.rabbitmq :as util]) + (:import (com.rabbitmq.client Channel))) (use-fixtures :once (join-fixtures [fix/init-rabbit-mq fix/silence-logging @@ -216,6 +217,38 @@ (is (= 1 @call-counter)) (util/close rmq-ch)))))) +(deftest channel-prefetch-count-test + (testing "Default prefetch-count is used while creating channel subscribers if prefetch-count is not configured explicitly" + (let [prefetch-count-used (atom 0) + channel :channel-1 + channel-fn (fn [_]) + original-zig-config (ziggurat-config) + original-lb-qos lb/qos] + (fix/with-queues {topic-entity {:handler-fn #(constantly nil) + channel channel-fn}} + (with-redefs [ziggurat-config (fn [] (-> original-zig-config + (update-in [:stream-router topic-entity :channels channel :worker-count] (constantly 1)))) + lb/qos (fn [^Channel _ ^long prefetch-count] + (reset! prefetch-count-used prefetch-count))] + (consumer/start-channels-subscriber {:channel-1 (fn [_])} topic-entity) + (is (= consumer/DEFAULT_CHANNEL_PREFETCH_COUNT @prefetch-count-used)))))) + + (testing "prefetch-count provided in configuration is used while creating channel subscribers" + (let [prefetch-count-used (atom 0) + expected-prefetch-count 50 + channel :channel-1 + channel-fn (fn [_]) + original-zig-config (ziggurat-config)] + (fix/with-queues {topic-entity {:handler-fn #(constantly nil) + channel channel-fn}} + (with-redefs [ziggurat-config (fn [] (-> original-zig-config + (update-in [:stream-router topic-entity :channels channel :worker-count] (constantly 1)) + (update-in [:stream-router topic-entity :channels channel :prefetch-count] (constantly expected-prefetch-count)))) + lb/qos (fn [^Channel _ ^long prefetch-count] + (reset! prefetch-count-used prefetch-count))] + (consumer/start-channels-subscriber {channel channel-fn} topic-entity) + (is (= expected-prefetch-count @prefetch-count-used))))))) + (deftest start-retry-subscriber-test (testing "creates a span when tracer is enabled" (fix/with-queues {topic-entity {:handler-fn #(constantly nil)}} From c826c2220793ce6176a1855483d1b0002e1ead77 Mon Sep 17 00:00:00 2001 From: "prateek.khatri" Date: Fri, 13 May 2022 10:22:51 +0530 Subject: [PATCH 3/3] linting fixes --- test/ziggurat/messaging/consumer_test.clj | 26 +++++++++++------------ 1 file changed, 13 insertions(+), 13 deletions(-) diff --git a/test/ziggurat/messaging/consumer_test.clj b/test/ziggurat/messaging/consumer_test.clj index e3f0c979..92a5cf57 100644 --- a/test/ziggurat/messaging/consumer_test.clj +++ b/test/ziggurat/messaging/consumer_test.clj @@ -226,12 +226,12 @@ original-lb-qos lb/qos] (fix/with-queues {topic-entity {:handler-fn #(constantly nil) channel channel-fn}} - (with-redefs [ziggurat-config (fn [] (-> original-zig-config - (update-in [:stream-router topic-entity :channels channel :worker-count] (constantly 1)))) - lb/qos (fn [^Channel _ ^long prefetch-count] - (reset! prefetch-count-used prefetch-count))] - (consumer/start-channels-subscriber {:channel-1 (fn [_])} topic-entity) - (is (= consumer/DEFAULT_CHANNEL_PREFETCH_COUNT @prefetch-count-used)))))) + (with-redefs [ziggurat-config (fn [] (-> original-zig-config + (update-in [:stream-router topic-entity :channels channel :worker-count] (constantly 1)))) + lb/qos (fn [^Channel _ ^long prefetch-count] + (reset! prefetch-count-used prefetch-count))] + (consumer/start-channels-subscriber {:channel-1 (fn [_])} topic-entity) + (is (= consumer/DEFAULT_CHANNEL_PREFETCH_COUNT @prefetch-count-used)))))) (testing "prefetch-count provided in configuration is used while creating channel subscribers" (let [prefetch-count-used (atom 0) @@ -241,13 +241,13 @@ original-zig-config (ziggurat-config)] (fix/with-queues {topic-entity {:handler-fn #(constantly nil) channel channel-fn}} - (with-redefs [ziggurat-config (fn [] (-> original-zig-config - (update-in [:stream-router topic-entity :channels channel :worker-count] (constantly 1)) - (update-in [:stream-router topic-entity :channels channel :prefetch-count] (constantly expected-prefetch-count)))) - lb/qos (fn [^Channel _ ^long prefetch-count] - (reset! prefetch-count-used prefetch-count))] - (consumer/start-channels-subscriber {channel channel-fn} topic-entity) - (is (= expected-prefetch-count @prefetch-count-used))))))) + (with-redefs [ziggurat-config (fn [] (-> original-zig-config + (update-in [:stream-router topic-entity :channels channel :worker-count] (constantly 1)) + (update-in [:stream-router topic-entity :channels channel :prefetch-count] (constantly expected-prefetch-count)))) + lb/qos (fn [^Channel _ ^long prefetch-count] + (reset! prefetch-count-used prefetch-count))] + (consumer/start-channels-subscriber {channel channel-fn} topic-entity) + (is (= expected-prefetch-count @prefetch-count-used))))))) (deftest start-retry-subscriber-test (testing "creates a span when tracer is enabled"