Skip to content

Commit

Permalink
Merge pull request #192 from mjayprateek/offset_commit_only_on_atleas…
Browse files Browse the repository at this point in the history
…t_one_records

commit offset only when non-zero  records are polled
  • Loading branch information
mjayprateek committed Nov 23, 2020
2 parents 063eff9 + cd13ad0 commit 06d5193
Show file tree
Hide file tree
Showing 4 changed files with 54 additions and 18 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Expand Up @@ -4,6 +4,10 @@ All notable changes to this project will be documented in this file. This change

## Unreleased Changes

## 3.6.1
- Changed the logic for committing offsets to only commit only when non-zero records are polled while
consuming via Kafka Consumer API

## 3.6.0
- Error reporting done to newrelic along with sentry.

Expand Down
2 changes: 1 addition & 1 deletion project.clj
@@ -1,4 +1,4 @@
(defproject tech.gojek/ziggurat "3.6.0"
(defproject tech.gojek/ziggurat "3.6.1"
:description "A stream processing framework to build stateless applications on kafka"
:url "https://github.com/gojektech/ziggurat"
:license {:name "Apache License, Version 2.0"
Expand Down
7 changes: 4 additions & 3 deletions src/ziggurat/kafka_consumer/consumer_handler.clj
Expand Up @@ -75,9 +75,10 @@
[^Consumer consumer handler-fn topic-entity consumer-config]
(try
(loop [records []]
(let [batch-payload (create-batch-payload records topic-entity)]
(process handler-fn batch-payload))
(commit-offsets consumer topic-entity)
(when (not-empty records)
(let [batch-payload (create-batch-payload records topic-entity)]
(process handler-fn batch-payload))
(commit-offsets consumer topic-entity))
(recur (seq (.poll consumer (Duration/ofMillis (or (:poll-timeout-ms-config consumer-config) DEFAULT_POLL_TIMEOUT_MS_CONFIG))))))
(catch WakeupException e)
(catch Exception e
Expand Down
59 changes: 45 additions & 14 deletions test/ziggurat/kafka_consumer/consumer_handler_test.clj
Expand Up @@ -14,18 +14,54 @@

(use-fixtures :once fix/mount-only-config)

(def dummy-consumer-records
(let [topic-partition (TopicPartition. "string" 1)
individual-consumer-record (ConsumerRecord. "topic" 1 2 "hello" "world")
list-of-consumer-records (doto (ArrayList.) (.add individual-consumer-record))
map-of-partition-and-records (doto (HashMap.) (.put topic-partition list-of-consumer-records))]
(ConsumerRecords. map-of-partition-and-records)))

(deftest consumer-polling-test
(testing "should commit only if non-zero records are polled"
(let [expected-calls 2
actual-calls (atom 0)
commit-called (atom false)
kafka-consumer (reify Consumer
(^ConsumerRecords poll [_ ^Duration _]
dummy-consumer-records)
(commitSync [_] (reset! commit-called true))
(close [_]))]
(with-redefs [ch/process (fn [_ _]
(if (< @actual-calls 2)
(swap! actual-calls inc)
(throw (WakeupException.))))
metrics/increment-count (constantly nil)]
(ch/poll-for-messages kafka-consumer nil :random-consumer-id {:consumer-group-id "some-id" :poll-timeout-ms-config 1000})
(is (= expected-calls @actual-calls))
(is (true? @commit-called)))))
(testing "should not commit if no records are polled"
(let [expected-calls 0
actual-calls (atom 0)
commit-called (atom false)
kafka-consumer (reify Consumer
(^ConsumerRecords poll [_ ^Duration _]
[])
(commitSync [_] (reset! commit-called true))
(close [_]))]
(with-redefs [ch/process (fn [_ _]
(if (< @actual-calls 2)
(swap! actual-calls inc)
(throw (WakeupException.))))
metrics/increment-count (constantly nil)]
(ch/poll-for-messages kafka-consumer nil :random-consumer-id {:consumer-group-id "some-id" :poll-timeout-ms-config 1000})
(is (= expected-calls @actual-calls))
(is (false? @commit-called)))))
(testing "should keep on polling even if commitSync call on KafkaConsumer throws an exception and publishes the metrics"
(let [topic-partition (TopicPartition. "string" 1)
individual-consumer-record (ConsumerRecord. "topic" 1 2 "hello" "world")
list-of-consumer-records (doto (ArrayList.) (.add individual-consumer-record))
map-of-partition-and-records (doto (HashMap.) (.put topic-partition list-of-consumer-records))
records (ConsumerRecords. map-of-partition-and-records)
expected-calls 2
(let [expected-calls 2
actual-calls (atom 0)
kafka-consumer (reify Consumer
(^ConsumerRecords poll [_ ^Duration _]
records)
dummy-consumer-records)
(commitSync [_]
(throw (Exception. "Commit exception")))
(close [_]))]
Expand All @@ -40,14 +76,9 @@
(ch/poll-for-messages kafka-consumer nil :random-consumer-id {:consumer-group-id "some-id" :poll-timeout-ms-config 1000})
(is (= expected-calls @actual-calls)))))
(testing "Exceptions other than WakeupException are caught"
(let [topic-partition (TopicPartition. "string" 1)
individual-consumer-record (ConsumerRecord. "topic" 1 2 "hello" "world")
list-of-consumer-records (doto (ArrayList.) (.add individual-consumer-record))
map-of-partition-and-records (doto (HashMap.) (.put topic-partition list-of-consumer-records))
records (ConsumerRecords. map-of-partition-and-records)
kafka-consumer (reify Consumer
(let [kafka-consumer (reify Consumer
(^ConsumerRecords poll [_ ^Duration _]
records)
dummy-consumer-records)
(commitSync [_])
(close [_]))]
(with-redefs [ch/process (fn [_ _] (throw (Exception.)))
Expand Down

0 comments on commit 06d5193

Please sign in to comment.