diff --git a/src/ziggurat/kafka_consumer/consumer_driver.clj b/src/ziggurat/kafka_consumer/consumer_driver.clj index 445fcf9e..78037fb5 100644 --- a/src/ziggurat/kafka_consumer/consumer_driver.clj +++ b/src/ziggurat/kafka_consumer/consumer_driver.clj @@ -6,12 +6,14 @@ [clojure.tools.logging :as log] [ziggurat.kafka-consumer.executor-service :refer :all] [mount.core :as mount] - [ziggurat.metrics :as metrics]) + [ziggurat.metrics :as metrics] + [cambium.core :as clog]) (:import (java.util.concurrent ExecutorService RejectedExecutionException) (org.apache.kafka.clients.consumer Consumer))) (defn- start-polling-with-consumer [consumer init-arg topic-entity consumer-config] + (let [message-poller (cast Runnable #(ch/poll-for-messages consumer (:handler-fn init-arg) topic-entity consumer-config))] (when message-poller (try @@ -43,7 +45,7 @@ (defn- stop-consumers [consumer-groups] (do (log/info "stopping consumers") (doseq [[topic-entity consumers] consumer-groups] - (log/info "Stopping threads for consumer group: " topic-entity) + (clog/info {:consumer-group topic-entity} (str "Stopping threads for consumer group: " topic-entity)) (doseq [consumer consumers] (.wakeup ^Consumer consumer))))) diff --git a/src/ziggurat/kafka_consumer/consumer_handler.clj b/src/ziggurat/kafka_consumer/consumer_handler.clj index 2b28d73b..cb34ed37 100644 --- a/src/ziggurat/kafka_consumer/consumer_handler.clj +++ b/src/ziggurat/kafka_consumer/consumer_handler.clj @@ -3,7 +3,8 @@ [ziggurat.config :refer :all] [ziggurat.messaging.producer :as producer] [ziggurat.message-payload :refer [map->MessagePayload]] - [ziggurat.metrics :as metrics]) + [ziggurat.metrics :as metrics] + [cambium.core :as clog]) (:import (org.apache.kafka.common.errors WakeupException) (java.time Duration Instant) (tech.gojek.ziggurat.internal InvalidReturnTypeException) @@ -49,7 +50,8 @@ batch-size (count batch)] (try (when (not-empty batch) - (log/infof "[Consumer Group: %s] Processing the batch with %d messages" topic-entity batch-size) + (clog/info {:batch-size batch-size} "Processing the batch") + ;(log/infof "[Consumer Group: %s] Processing the batch with %d messages" topic-entity batch-size) (let [start-time (Instant/now) result (batch-handler batch) time-taken-in-millis (.toMillis (Duration/between start-time (Instant/now)))] @@ -58,7 +60,9 @@ to-be-retried-count (count messages-to-be-retried) skip-count (count (:skip result)) success-count (- batch-size (+ to-be-retried-count skip-count))] - (log/infof "[Consumer Group: %s] Processed the batch with success: [%d], skip: [%d] and retries: [%d] \n" topic-entity success-count skip-count to-be-retried-count) + + (clog/info {:messages-successfully-processed success-count :messages-skipped skip-count :messages-to-be-retried to-be-retried-count} "Batch processing complete") + ;(log/infof "[Consumer Group: %s] Processed the batch with success: [%d], skip: [%d] and retries: [%d] \n" topic-entity success-count skip-count to-be-retried-count) (publish-batch-process-metrics topic-entity batch-size success-count skip-count to-be-retried-count time-taken-in-millis) (retry messages-to-be-retried current-retry-count topic-entity)))) (catch InvalidReturnTypeException e @@ -77,16 +81,18 @@ (defn poll-for-messages [^Consumer consumer handler-fn topic-entity consumer-config] - (try - (loop [records []] - (when (not-empty records) - (let [batch-payload (create-batch-payload records topic-entity)] - (process handler-fn batch-payload))) - (recur (seq (.poll consumer (Duration/ofMillis (or (:poll-timeout-ms-config consumer-config) DEFAULT_POLL_TIMEOUT_MS_CONFIG)))))) - (catch WakeupException e - (log/errorf e "WakeupException while polling for messages for: %s" topic-entity)) - (catch Exception e - (log/errorf e "Exception while polling for messages for: %s" topic-entity)) - (finally (do (log/info "Closing the Kafka Consumer for: " topic-entity) - (.close consumer))))) + (clog/with-logging-context {:consumer-group topic-entity} + (try + + (loop [records []] + (when (not-empty records) + (let [batch-payload (create-batch-payload records topic-entity)] + (process handler-fn batch-payload))) + (recur (seq (.poll consumer (Duration/ofMillis (or (:poll-timeout-ms-config consumer-config) DEFAULT_POLL_TIMEOUT_MS_CONFIG)))))) + (catch WakeupException e + (log/errorf e "WakeupException while polling for messages for: %s" topic-entity)) + (catch Exception e + (log/errorf e "Exception while polling for messages for: %s" topic-entity)) + (finally (do (log/info "Closing the Kafka Consumer for: " topic-entity) + (.close consumer)))))) diff --git a/src/ziggurat/streams.clj b/src/ziggurat/streams.clj index ed3c949a..f79006af 100644 --- a/src/ziggurat/streams.clj +++ b/src/ziggurat/streams.clj @@ -9,7 +9,8 @@ [ziggurat.metrics :as metrics] [ziggurat.timestamp-transformer :as timestamp-transformer] [ziggurat.tracer :refer [tracer]] - [ziggurat.util.map :as umap]) + [ziggurat.util.map :as umap] + [cambium.core :as clog]) (:import [io.opentracing.contrib.kafka TracingKafkaUtils] [io.opentracing.contrib.kafka.streams TracingKafkaClientSupplier] [io.opentracing.tag Tags] @@ -244,7 +245,7 @@ (.setUncaughtExceptionHandler stream (reify StreamsUncaughtExceptionHandler (^StreamsUncaughtExceptionHandler$StreamThreadExceptionResponse handle [_ ^Throwable error] (handle-uncaught-exception (get stream-config :stream-thread-exception-response :shutdown-client) error)))) - (.start stream) + (clog/with-logging-context {:consumer-group topic-entity} (.start stream)) (assoc streams topic-entity stream)) streams))) {} diff --git a/test/ziggurat/init_test.clj b/test/ziggurat/init_test.clj index b9d2b8fc..d0c8fc72 100644 --- a/test/ziggurat/init_test.clj +++ b/test/ziggurat/init_test.clj @@ -11,7 +11,10 @@ [ziggurat.tracer :as tracer] [ziggurat.fixtures :refer [with-config]] [cambium.logback.json.flat-layout :as flat] - [cambium.codec :as codec]) + [cambium.codec :as codec] + [cambium.core :as clog] + [clojure.tools.logging :as log]) + (:import (io.opentracing.mock MockTracer))) (def valid-modes-count 4)