Skip to content

Commit

Permalink
Fix linting
Browse files Browse the repository at this point in the history
  • Loading branch information
Anmol Vijaywargiya committed Sep 1, 2021
1 parent 773c253 commit 61f3250
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 14 deletions.
25 changes: 12 additions & 13 deletions src/ziggurat/kafka_consumer/consumer_handler.clj
Original file line number Diff line number Diff line change
Expand Up @@ -82,17 +82,16 @@
(defn poll-for-messages
[^Consumer consumer handler-fn topic-entity consumer-config]
(clog/with-logging-context {:consumer-group topic-entity}
(try

(loop [records []]
(when (not-empty records)
(let [batch-payload (create-batch-payload records topic-entity)]
(process handler-fn batch-payload)))
(recur (seq (.poll consumer (Duration/ofMillis (or (:poll-timeout-ms-config consumer-config) DEFAULT_POLL_TIMEOUT_MS_CONFIG))))))
(catch WakeupException e
(log/errorf e "WakeupException while polling for messages for: %s" topic-entity))
(catch Exception e
(log/errorf e "Exception while polling for messages for: %s" topic-entity))
(finally (do (log/info "Closing the Kafka Consumer for: " topic-entity)
(.close consumer))))))
(try
(loop [records []]
(when (not-empty records)
(let [batch-payload (create-batch-payload records topic-entity)]
(process handler-fn batch-payload)))
(recur (seq (.poll consumer (Duration/ofMillis (or (:poll-timeout-ms-config consumer-config) DEFAULT_POLL_TIMEOUT_MS_CONFIG))))))
(catch WakeupException e
(log/errorf e "WakeupException while polling for messages for: %s" topic-entity))
(catch Exception e
(log/errorf e "Exception while polling for messages for: %s" topic-entity))
(finally (do (log/info "Closing the Kafka Consumer for: " topic-entity)
(.close consumer))))))

2 changes: 1 addition & 1 deletion src/ziggurat/streams.clj
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,7 @@
(^StreamsUncaughtExceptionHandler$StreamThreadExceptionResponse handle [_ ^Throwable error]
(handle-uncaught-exception (get stream-config :stream-thread-exception-response :shutdown-client) error))))
(clog/with-logging-context {:consumer-group topic-entity}
(.start stream))
(.start stream))
(assoc streams topic-entity stream))
streams)))
{}
Expand Down

0 comments on commit 61f3250

Please sign in to comment.