Skip to content

Commit

Permalink
Merge pull request #185 from mistrys47/batch_consumers_fix
Browse files Browse the repository at this point in the history
Batch consumers fix
  • Loading branch information
mjayprateek committed Oct 5, 2020
2 parents d76666d + 79191ca commit 87004b2
Show file tree
Hide file tree
Showing 9 changed files with 59 additions and 28 deletions.
5 changes: 5 additions & 0 deletions CHANGELOG.md
Expand Up @@ -4,6 +4,11 @@ All notable changes to this project will be documented in this file. This change

## Unreleased Changes

## 3.5.1
- Fixed publishing of metrics for batch consumption
- Fixed the startup logic for batch consumption - only routes provided in Ziggurat init-args will be started
- Standardized naming for Kafka Consumer API configs

## 3.5.0

- Adds support for consuming Kafka messages in batches using Kafka Consumer API
Expand Down
2 changes: 1 addition & 1 deletion project.clj
@@ -1,4 +1,4 @@
(defproject tech.gojek/ziggurat "3.5.0"
(defproject tech.gojek/ziggurat "3.5.1"
:description "A stream processing framework to build stateless applications on kafka"
:url "https://github.com/gojektech/ziggurat"
:license {:name "Apache License, Version 2.0"
Expand Down
8 changes: 4 additions & 4 deletions resources/config.test.ci.edn
Expand Up @@ -70,16 +70,16 @@
:poll-timeout-ms-config [1000 :int]
:thread-count [2 :int]
:session-timeout-ms-config [60000 :int]
:key_deserializer_class_config "org.apache.kafka.common.serialization.ByteArrayDeserializer"
:value_deserializer_class_config "org.apache.kafka.common.serialization.ByteArrayDeserializer"}
:key-deserializer-class-config "org.apache.kafka.common.serialization.ByteArrayDeserializer"
:value-deserializer-class-config "org.apache.kafka.common.serialization.ByteArrayDeserializer"}
:consumer-2 {:consumer-group-id "test-consumer-2002"
:bootstrap-servers "localhost:9092"
:max-poll-records [2000 :int]
:origin-topic "topic"
:poll-timeout-ms-config [1000 :int]
:thread-count [4 :int]
:session-timeout-ms-config [60000 :int]
:key_deserializer_class_config "org.apache.kafka.common.serialization.ByteArrayDeserializer"
:value_deserializer_class_config "org.apache.kafka.common.serialization.ByteArrayDeserializer"}}
:key-deserializer-class-config "org.apache.kafka.common.serialization.ByteArrayDeserializer"
:value-deserializer-class-config "org.apache.kafka.common.serialization.ByteArrayDeserializer"}}
:tracer {:enabled [true :bool]
:custom-provider ""}}}
8 changes: 4 additions & 4 deletions resources/config.test.edn
Expand Up @@ -74,16 +74,16 @@
:poll-timeout-ms-config [1000 :int]
:thread-count [2 :int]
:session-timeout-ms-config [60000 :int]
:key_deserializer_class_config "org.apache.kafka.common.serialization.ByteArrayDeserializer"
:value_deserializer_class_config "org.apache.kafka.common.serialization.ByteArrayDeserializer"}
:key-deserializer-class-config "org.apache.kafka.common.serialization.ByteArrayDeserializer"
:value-deserializer-class-config "org.apache.kafka.common.serialization.ByteArrayDeserializer"}
:consumer-2 {:consumer-group-id "test-consumer-2002"
:bootstrap-servers "localhost:9092"
:max-poll-records [2000 :int]
:origin-topic "topic"
:poll-timeout-ms-config [1000 :int]
:thread-count [4 :int]
:session-timeout-ms-config [60000 :int]
:key_deserializer_class_config "org.apache.kafka.common.serialization.ByteArrayDeserializer"
:value_deserializer_class_config "org.apache.kafka.common.serialization.ByteArrayDeserializer"}}
:key-deserializer-class-config "org.apache.kafka.common.serialization.ByteArrayDeserializer"
:value-deserializer-class-config "org.apache.kafka.common.serialization.ByteArrayDeserializer"}}
:tracer {:enabled [true :bool]
:custom-provider ""}}}
4 changes: 2 additions & 2 deletions src/ziggurat/kafka_consumer/consumer.clj
Expand Up @@ -13,9 +13,9 @@
ConsumerConfig/MAX_POLL_RECORDS_CONFIG (int (or (:max-poll-records consumer-group-config) 500))
ConsumerConfig/SESSION_TIMEOUT_MS_CONFIG (int (or (:session-timeout-ms-config consumer-group-config) 60000))
ConsumerConfig/ENABLE_AUTO_COMMIT_CONFIG false
ConsumerConfig/KEY_DESERIALIZER_CLASS_CONFIG (or (:key_deserializer_class_config consumer-group-config)
ConsumerConfig/KEY_DESERIALIZER_CLASS_CONFIG (or (:key-deserializer-class-config consumer-group-config)
"org.apache.kafka.common.serialization.ByteArrayDeserializer")
ConsumerConfig/VALUE_DESERIALIZER_CLASS_CONFIG (or (:value_deserializer_class_config consumer-group-config)
ConsumerConfig/VALUE_DESERIALIZER_CLASS_CONFIG (or (:value-deserializer-class-config consumer-group-config)
"org.apache.kafka.common.serialization.ByteArrayDeserializer")})))

(defn create-consumer
Expand Down
11 changes: 6 additions & 5 deletions src/ziggurat/kafka_consumer/consumer_driver.clj
Expand Up @@ -17,7 +17,7 @@
(try
(.submit ^ExecutorService thread-pool ^Runnable message-poller)
(catch RejectedExecutionException e
(metrics/increment-count ["ziggurat.batch.consumption"] "thread-pool.task.rejected" 1 {:topic-entity topic-entity})
(metrics/increment-count ["ziggurat.batch.consumption"] "thread-pool.task.rejected" 1 {:topic-entity (name topic-entity)})
(log/error "message polling task was rejected by the threadpool" e))))))

(defn- start-consumers-per-group
Expand All @@ -33,11 +33,12 @@

(defn- start-consumers [consumer-configs init-args]
(log/info "Starting consumers")
(reduce (fn [consumer-groups [topic-entity consumer-config]]
(let [init-arg-for-the-consumer-group (get init-args topic-entity)]
(assoc consumer-groups topic-entity (start-consumers-per-group topic-entity consumer-config init-arg-for-the-consumer-group))))
(reduce (fn [consumer-groups [topic-entity init-arg]]
(if-some [consumer-config (get consumer-configs topic-entity)]
(assoc consumer-groups topic-entity (start-consumers-per-group topic-entity consumer-config init-arg))
consumer-groups))
{}
consumer-configs))
init-args))

(defn- stop-consumers [consumer-groups]
(do (log/info "stopping consumers")
Expand Down
6 changes: 3 additions & 3 deletions src/ziggurat/kafka_consumer/consumer_handler.clj
Expand Up @@ -13,7 +13,7 @@

(defn- publish-batch-process-metrics
[topic-entity batch-size success-count skip-count retry-count time-taken-in-millis]
(let [topic-entity-tag {:topic-entity topic-entity}]
(let [topic-entity-tag {:topic-entity (name topic-entity)}]
(metrics/increment-count batch-consumption-metric-ns "total" batch-size topic-entity-tag)
(metrics/increment-count batch-consumption-metric-ns "success" success-count topic-entity-tag)
(metrics/increment-count batch-consumption-metric-ns "skip" skip-count topic-entity-tag)
Expand Down Expand Up @@ -53,7 +53,7 @@
(retry messages-to-be-retried current-retry-count topic-entity)))))
(catch Exception e
(do
(metrics/increment-count batch-consumption-metric-ns "exception" 1 {:topic-entity topic-entity})
(metrics/increment-count batch-consumption-metric-ns "exception" 1 {:topic-entity (name topic-entity)})
(log/errorf e "[Consumer Group: %s] Exception received while processing messages \n" topic-entity)
(retry batch-payload))))))

Expand All @@ -62,7 +62,7 @@
(try
(.commitSync consumer)
(catch Exception e
(metrics/increment-count batch-consumption-metric-ns "commit.failed.exception" 1 {:topic-entity topic-entity})
(metrics/increment-count batch-consumption-metric-ns "commit.failed.exception" 1 {:topic-entity (name topic-entity)})
(log/error "Exception while committing offsets:" e))))

(defn- create-batch-payload
Expand Down
30 changes: 26 additions & 4 deletions test/ziggurat/kafka_consumer/consumer_driver_test.clj
Expand Up @@ -48,6 +48,27 @@
(is (= @stopped-consumer-count 6))
(is (= @stopped-consumer-groups-count 2))))))

(deftest should-create-consumers-only-for-topic-entities-provided-in-batch-routes
(testing "should create consumers only for those topic-entities which have been provided in init-args"
(let [valid-topic-entities #{:consumer-1}]
(with-redefs [ct/create-consumer (fn [topic-entity consumer-config]
(is (contains? valid-topic-entities topic-entity))
(is (not (nil? consumer-config)))
                                                       ;; returning dummy data instead of a consumer
{:dummy-key :dummy-value})
cast (constantly nil)
cd/stop-consumers (constantly nil)]
(-> (mount/only [#'consumer-groups])
(mount/with-args {:consumer-1 {:handler-fn dummy-handler-fn}
:invalid-batch-route {:handler-fn dummy-handler-fn}})
(mount/start))
(is (= 1 (count consumer-groups)))
(is (= 2 (count (:consumer-1 consumer-groups))))
(is (nil? (:consumer-2 consumer-groups)))
(is (nil? (:invalid-batch-route consumer-groups)))
(-> (mount/only [#'consumer-groups])
(mount/stop))))))

(deftest create-consumers-per-group-with-default-thread-count-test
(testing "should create the consumers according to default thread count when thread count config is not available"
(let [create-consumer-called (atom 0)]
Expand Down Expand Up @@ -117,13 +138,14 @@
log/error (fn [str e]
(is (= "message polling task was rejected by the threadpool" str))
(is (= RejectedExecutionHandler (class e))))
metrics/increment-count (fn [metric-namespace metrics val _]
metrics/increment-count (fn [metric-namespace metrics val tags]
(is (= metric-namespace ["ziggurat.batch.consumption"]))
(is (= metrics "thread-pool.task.rejected"))
(is (>= val 0)))]
(is (>= val 0))
(is (= "dummy-consumer-group-1" (:topic-entity tags))))]
(.shutdown ^ExecutorService thread-pool)
(-> (mount/only [#'consumer-groups])
(mount/with-args {:consumer-1 {:handler-fn dummy-handler-fn}})
(mount/with-args {:dummy-consumer-group-1 {:handler-fn dummy-handler-fn}})
(mount/start))
(is (= false @polling-started))
(-> (mount/only [#'consumer-groups])
Expand All @@ -140,7 +162,7 @@
(swap! consumers-shut-down inc))))
ch/poll-for-messages (constantly nil)]
(-> (mount/only [#'consumer-groups])
(mount/with-args {})
(mount/with-args {:dummy-consumer-group-1 {:handler-fn dummy-handler-fn}})
(mount/start))
(-> (mount/only [#'consumer-groups])
(mount/stop))
Expand Down
13 changes: 8 additions & 5 deletions test/ziggurat/kafka_consumer/consumer_handler_test.clj
Expand Up @@ -33,9 +33,10 @@
(if (< @actual-calls 2)
(swap! actual-calls inc)
(throw (WakeupException.))))
metrics/increment-count (fn [metric-namespace metrics _ _]
metrics/increment-count (fn [metric-namespace metrics _ tags]
(is (= metric-namespace ["ziggurat.batch.consumption" "message.processed"]))
(is (= metrics "commit.failed.exception")))]
(is (= metrics "commit.failed.exception"))
(is (= "random-consumer-id" (:topic-entity tags))))]
(ch/poll-for-messages kafka-consumer nil :random-consumer-id {:consumer-group-id "some-id" :poll-timeout-ms-config 1000})
(is (= expected-calls @actual-calls)))))
(testing "Exceptions other than WakeupException are caught"
Expand Down Expand Up @@ -103,7 +104,8 @@
expected-skip-count 2
expected-retry-count 2
batch-handler (fn [_] {:skip (vec (replicate expected-skip-count 0)) :retry (vec (replicate expected-retry-count 0))})]
(with-redefs [metrics/increment-count (fn [_ metrics count _]
(with-redefs [metrics/increment-count (fn [_ metrics count tags]
(is (= "consumer-1" (:topic-entity tags)))
(cond
(= metrics "total")
(is (= expected-batch-size count))
Expand All @@ -123,9 +125,10 @@
(testing "should publish metrics for exception in process message"
(let [expected-batch-size 10
batch-handler (fn [_] (throw (Exception. "exception in batch-handler")))]
(with-redefs [metrics/increment-count (fn [metric-namespace metrics _ _]
(with-redefs [metrics/increment-count (fn [metric-namespace metrics _ tags]
(is (= metric-namespace ["ziggurat.batch.consumption" "message.processed"]))
(is (= metrics "exception")))
(is (= metrics "exception"))
(is (= "consumer-1" (:topic-entity tags))))
ch/retry (fn [message]
(is (= message (mp/map->MessagePayload {:message (vec (replicate expected-batch-size 0)) :topic-entity :consumer-1 :retry-count nil}))))]
(ch/process batch-handler (mp/map->MessagePayload {:message (vec (replicate expected-batch-size 0)) :topic-entity :consumer-1 :retry-count nil})))))
Expand Down

0 comments on commit 87004b2

Please sign in to comment.