Skip to content

Commit

Permalink
Merge 1ec16ff into 565831f
Browse files Browse the repository at this point in the history
  • Loading branch information
anmol1vw13 committed Mar 2, 2021
2 parents 565831f + 1ec16ff commit 10750dc
Show file tree
Hide file tree
Showing 6 changed files with 48 additions and 83 deletions.
1 change: 1 addition & 0 deletions resources/config.test.ci.edn
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@
:bootstrap-servers "localhost:9092"
:max-poll-records [1000 :int]
:origin-topic "topic"
:commit-interval-ms [5000 :int]
:poll-timeout-ms-config [1000 :int]
:thread-count [2 :int]
:session-timeout-ms-config [60000 :int]
Expand Down
1 change: 1 addition & 0 deletions resources/config.test.edn
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@
:bootstrap-servers "localhost:9092"
:max-poll-records [1000 :int]
:origin-topic "topic"
:commit-interval-ms [5000 :int]
:poll-timeout-ms-config [1000 :int]
:thread-count [2 :int]
:session-timeout-ms-config [60000 :int]
Expand Down
41 changes: 27 additions & 14 deletions src/ziggurat/kafka_consumer/consumer.clj
Original file line number Diff line number Diff line change
@@ -1,28 +1,41 @@
(ns ziggurat.kafka-consumer.consumer
(:require [clojure.tools.logging :as log]
[ziggurat.kafka-consumer.consumer-handler :refer :all])
[ziggurat.kafka-consumer.consumer-handler :refer :all]
[ziggurat.util.map :as umap])
(:import (java.util Map Properties)
(org.apache.kafka.clients.consumer KafkaConsumer ConsumerConfig)
(java.util.regex Pattern)))

(def default-consumer-config
{:commit-interval-ms 15000
:max-poll-records 500
:session-timeout-ms-config 60000
:key-deserializer-class-config "org.apache.kafka.common.serialization.ByteArrayDeserializer"
:value-deserializer-class-config "org.apache.kafka.common.serialization.ByteArrayDeserializer"})

(defn- build-consumer-properties-map
[consumer-group-config]
[{:keys [bootstrap-servers
consumer-group-id
max-poll-records
session-timeout-ms-config
commit-interval-ms
key-deserializer-class-config
value-deserializer-class-config]}]
(doto (Properties.)
(.putAll {ConsumerConfig/BOOTSTRAP_SERVERS_CONFIG (:bootstrap-servers consumer-group-config)
ConsumerConfig/GROUP_ID_CONFIG (:consumer-group-id consumer-group-config)
ConsumerConfig/MAX_POLL_RECORDS_CONFIG (int (or (:max-poll-records consumer-group-config) 500))
ConsumerConfig/SESSION_TIMEOUT_MS_CONFIG (int (or (:session-timeout-ms-config consumer-group-config) 60000))
ConsumerConfig/ENABLE_AUTO_COMMIT_CONFIG false
ConsumerConfig/KEY_DESERIALIZER_CLASS_CONFIG (or (:key-deserializer-class-config consumer-group-config)
"org.apache.kafka.common.serialization.ByteArrayDeserializer")
ConsumerConfig/VALUE_DESERIALIZER_CLASS_CONFIG (or (:value-deserializer-class-config consumer-group-config)
"org.apache.kafka.common.serialization.ByteArrayDeserializer")})))

(.putAll {ConsumerConfig/BOOTSTRAP_SERVERS_CONFIG bootstrap-servers
ConsumerConfig/GROUP_ID_CONFIG consumer-group-id
ConsumerConfig/MAX_POLL_RECORDS_CONFIG (int max-poll-records)
ConsumerConfig/SESSION_TIMEOUT_MS_CONFIG (int session-timeout-ms-config)
ConsumerConfig/ENABLE_AUTO_COMMIT_CONFIG true
ConsumerConfig/AUTO_COMMIT_INTERVAL_MS_CONFIG (int commit-interval-ms)
ConsumerConfig/KEY_DESERIALIZER_CLASS_CONFIG key-deserializer-class-config
ConsumerConfig/VALUE_DESERIALIZER_CLASS_CONFIG value-deserializer-class-config})))
(defn create-consumer
[topic-entity consumer-group-config]
(try
(let [consumer (KafkaConsumer. ^Map (build-consumer-properties-map consumer-group-config))
topic-pattern (Pattern/compile (:origin-topic consumer-group-config))]
(let [merged-consumer-group-config (umap/deep-merge consumer-group-config default-consumer-config)
consumer (KafkaConsumer. ^Map (build-consumer-properties-map merged-consumer-group-config))
topic-pattern (Pattern/compile (:origin-topic merged-consumer-group-config))]
(.subscribe consumer topic-pattern)
consumer)
(catch Exception e
Expand Down
14 changes: 3 additions & 11 deletions src/ziggurat/kafka_consumer/consumer_handler.clj
Original file line number Diff line number Diff line change
Expand Up @@ -70,14 +70,6 @@
(log/errorf e "[Consumer Group: %s] Exception received while processing messages \n" topic-entity)
(retry batch-payload))))))

(defn- commit-offsets
[consumer topic-entity]
(try
(.commitSync consumer)
(catch Exception e
(metrics/increment-count batch-consumption-metric-ns "commit.failed.exception" 1 {:topic-entity (name topic-entity)})
(log/error "Exception while committing offsets:" e))))

(defn- create-batch-payload
[records topic-entity]
(let [key-value-pairs (map (fn [^ConsumerRecord m]
Expand All @@ -90,10 +82,10 @@
(loop [records []]
(when (not-empty records)
(let [batch-payload (create-batch-payload records topic-entity)]
(process handler-fn batch-payload))
(commit-offsets consumer topic-entity))
(process handler-fn batch-payload)))
(recur (seq (.poll consumer (Duration/ofMillis (or (:poll-timeout-ms-config consumer-config) DEFAULT_POLL_TIMEOUT_MS_CONFIG))))))
(catch WakeupException e)
(catch WakeupException e
(log/errorf e "WakeupException while polling for messages for: %s" topic-entity))
(catch Exception e
(log/errorf e "Exception while polling for messages for: %s" topic-entity))
(finally (do (log/info "Closing the Kafka Consumer for: " topic-entity)
Expand Down
58 changes: 1 addition & 57 deletions test/ziggurat/kafka_consumer/consumer_handler_test.clj
Original file line number Diff line number Diff line change
Expand Up @@ -22,64 +22,10 @@
(ConsumerRecords. map-of-partition-and-records)))

(deftest consumer-polling-test
(testing "should commit only if non-zero records are polled"
(let [expected-calls 2
actual-calls (atom 0)
commit-called (atom false)
kafka-consumer (reify Consumer
(^ConsumerRecords poll [_ ^Duration _]
dummy-consumer-records)
(commitSync [_] (reset! commit-called true))
(close [_]))]
(with-redefs [ch/process (fn [_ _]
(if (< @actual-calls 2)
(swap! actual-calls inc)
(throw (WakeupException.))))
metrics/increment-count (constantly nil)]
(ch/poll-for-messages kafka-consumer nil :random-consumer-id {:consumer-group-id "some-id" :poll-timeout-ms-config 1000})
(is (= expected-calls @actual-calls))
(is (true? @commit-called)))))
(testing "should not commit if no records are polled"
(let [expected-calls 0
actual-calls (atom 0)
commit-called (atom false)
kafka-consumer (reify Consumer
(^ConsumerRecords poll [_ ^Duration _]
[])
(commitSync [_] (reset! commit-called true))
(close [_]))]
(with-redefs [ch/process (fn [_ _]
(if (< @actual-calls 2)
(swap! actual-calls inc)
(throw (WakeupException.))))
metrics/increment-count (constantly nil)]
(ch/poll-for-messages kafka-consumer nil :random-consumer-id {:consumer-group-id "some-id" :poll-timeout-ms-config 1000})
(is (= expected-calls @actual-calls))
(is (false? @commit-called)))))
(testing "should keep on polling even if commitSync call on KafkaConsumer throws an exception and publishes the metrics"
(let [expected-calls 2
actual-calls (atom 0)
kafka-consumer (reify Consumer
(^ConsumerRecords poll [_ ^Duration _]
dummy-consumer-records)
(commitSync [_]
(throw (Exception. "Commit exception")))
(close [_]))]
(with-redefs [ch/process (fn [_ _]
(if (< @actual-calls 2)
(swap! actual-calls inc)
(throw (WakeupException.))))
metrics/increment-count (fn [metric-namespace metrics _ tags]
(is (= metric-namespace ["ziggurat.batch.consumption" "message.processed"]))
(is (= metrics "commit.failed.exception"))
(is (= "random-consumer-id" (:topic-entity tags))))]
(ch/poll-for-messages kafka-consumer nil :random-consumer-id {:consumer-group-id "some-id" :poll-timeout-ms-config 1000})
(is (= expected-calls @actual-calls)))))
(testing "Exceptions other than WakeupException are caught"
(let [kafka-consumer (reify Consumer
(^ConsumerRecords poll [_ ^Duration _]
dummy-consumer-records)
(commitSync [_])
ziggurat.kafka-consumer.consumer-handler-test/dummy-consumer-records)
(close [_]))]
(with-redefs [ch/process (fn [_ _] (throw (Exception.)))
metrics/increment-count (constantly nil)]
Expand All @@ -96,7 +42,6 @@
(^ConsumerRecords poll [_ ^Duration timeout]
(reset! actual-poll-timeout (.toMillis timeout))
records)
(commitSync [_])
(close [_]))]
(with-redefs [ch/process (fn [_ _]
(if (< @process-calls 1)
Expand All @@ -119,7 +64,6 @@
(swap! is-polled inc)
records)
(throw (WakeupException.))))
(commitSync [_])
(close [_]))]
(with-redefs [ch/process (fn [batch-handler message]
(when-not (empty? (:batch message))
Expand Down
16 changes: 15 additions & 1 deletion test/ziggurat/kafka_consumer/consumer_test.clj
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@
(:require [ziggurat.kafka-consumer.consumer :refer [create-consumer]]
[ziggurat.config :refer [ziggurat-config]]
[ziggurat.fixtures :as fix])
(:import (org.apache.kafka.clients.consumer KafkaConsumer)))
(:import (org.apache.kafka.clients.consumer KafkaConsumer)
(java.util Properties)))

(use-fixtures :once fix/mount-only-config)

Expand All @@ -14,6 +15,19 @@
(is (contains? (set (keys (.listTopics consumer))) expected-origin-topic))
(.unsubscribe consumer)
(.close consumer)))
(testing "uses the config supplied by the user while creating the consumer"
(let [ziggurat-consumer-config (get-in (ziggurat-config) [:batch-routes :consumer-1])
consumer-properties (#'ziggurat.kafka-consumer.consumer/build-consumer-properties-map ziggurat-consumer-config)]
(with-redefs [ziggurat.kafka-consumer.consumer/build-consumer-properties-map (fn [consumer-config] (is (= (:commit-interval-ms consumer-config) (:commit-interval-ms ziggurat-consumer-config)))
consumer-properties)]
(.close (create-consumer :consumer-1 ziggurat-consumer-config)))))
(testing "uses the default config for a specific property if the config supplied by the user does not have it"
(let [ziggurat-consumer-config (get-in (ziggurat-config) [:batch-routes :consumer-1])
consumer-config-without-commit-interval (dissoc ziggurat-consumer-config :commit-interval-ms)
consumer-properties (#'ziggurat.kafka-consumer.consumer/build-consumer-properties-map ziggurat-consumer-config)]
(with-redefs [ziggurat.kafka-consumer.consumer/build-consumer-properties-map (fn [consumer-config] (is (= (:commit-interval-ms consumer-config) (:commit-interval-ms ziggurat.kafka-consumer.consumer/default-consumer-config)))
consumer-properties)]
(.close (create-consumer :consumer-1 consumer-config-without-commit-interval)))))
(testing "returns nil when invalid configs are provided (KafkaConsumer throws Exception)"
(let [consumer-config (get-in (ziggurat-config) [:batch-routes :consumer-1])]
(is (= nil (create-consumer :consumer-1 (assoc-in consumer-config [:consumer-group-id] nil)))))))

0 comments on commit 10750dc

Please sign in to comment.