Skip to content

Commit

Permalink
Add structured logs to ziggurat logs
Browse files Browse the repository at this point in the history
  • Loading branch information
Anmol Vijaywargiya committed Sep 1, 2021
1 parent 8444384 commit a95fcf0
Show file tree
Hide file tree
Showing 4 changed files with 32 additions and 20 deletions.
6 changes: 4 additions & 2 deletions src/ziggurat/kafka_consumer/consumer_driver.clj
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,14 @@
[clojure.tools.logging :as log]
[ziggurat.kafka-consumer.executor-service :refer :all]
[mount.core :as mount]
[ziggurat.metrics :as metrics])
[ziggurat.metrics :as metrics]
[cambium.core :as clog])
(:import (java.util.concurrent ExecutorService RejectedExecutionException)
(org.apache.kafka.clients.consumer Consumer)))

(defn- start-polling-with-consumer
[consumer init-arg topic-entity consumer-config]

(let [message-poller (cast Runnable #(ch/poll-for-messages consumer (:handler-fn init-arg) topic-entity consumer-config))]
(when message-poller
(try
Expand Down Expand Up @@ -43,7 +45,7 @@
(defn- stop-consumers [consumer-groups]
(do (log/info "stopping consumers")
(doseq [[topic-entity consumers] consumer-groups]
(log/info "Stopping threads for consumer group: " topic-entity)
(clog/info {:consumer-group topic-entity} (str "Stopping threads for consumer group: " topic-entity))
(doseq [consumer consumers]
(.wakeup ^Consumer consumer)))))

Expand Down
36 changes: 21 additions & 15 deletions src/ziggurat/kafka_consumer/consumer_handler.clj
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@
[ziggurat.config :refer :all]
[ziggurat.messaging.producer :as producer]
[ziggurat.message-payload :refer [map->MessagePayload]]
[ziggurat.metrics :as metrics])
[ziggurat.metrics :as metrics]
[cambium.core :as clog])
(:import (org.apache.kafka.common.errors WakeupException)
(java.time Duration Instant)
(tech.gojek.ziggurat.internal InvalidReturnTypeException)
Expand Down Expand Up @@ -49,7 +50,8 @@
batch-size (count batch)]
(try
(when (not-empty batch)
(log/infof "[Consumer Group: %s] Processing the batch with %d messages" topic-entity batch-size)
(clog/info {:batch-size batch-size} "Processing the batch")
;(log/infof "[Consumer Group: %s] Processing the batch with %d messages" topic-entity batch-size)
(let [start-time (Instant/now)
result (batch-handler batch)
time-taken-in-millis (.toMillis (Duration/between start-time (Instant/now)))]
Expand All @@ -58,7 +60,9 @@
to-be-retried-count (count messages-to-be-retried)
skip-count (count (:skip result))
success-count (- batch-size (+ to-be-retried-count skip-count))]
(log/infof "[Consumer Group: %s] Processed the batch with success: [%d], skip: [%d] and retries: [%d] \n" topic-entity success-count skip-count to-be-retried-count)

(clog/info {:messages-successfully-processed success-count :messages-skipped skip-count :messages-to-be-retried to-be-retried-count} "Batch processing complete")
;(log/infof "[Consumer Group: %s] Processed the batch with success: [%d], skip: [%d] and retries: [%d] \n" topic-entity success-count skip-count to-be-retried-count)
(publish-batch-process-metrics topic-entity batch-size success-count skip-count to-be-retried-count time-taken-in-millis)
(retry messages-to-be-retried current-retry-count topic-entity))))
(catch InvalidReturnTypeException e
Expand All @@ -77,16 +81,18 @@

(defn poll-for-messages
[^Consumer consumer handler-fn topic-entity consumer-config]
(try
(loop [records []]
(when (not-empty records)
(let [batch-payload (create-batch-payload records topic-entity)]
(process handler-fn batch-payload)))
(recur (seq (.poll consumer (Duration/ofMillis (or (:poll-timeout-ms-config consumer-config) DEFAULT_POLL_TIMEOUT_MS_CONFIG))))))
(catch WakeupException e
(log/errorf e "WakeupException while polling for messages for: %s" topic-entity))
(catch Exception e
(log/errorf e "Exception while polling for messages for: %s" topic-entity))
(finally (do (log/info "Closing the Kafka Consumer for: " topic-entity)
(.close consumer)))))
(clog/with-logging-context {:consumer-group topic-entity}
(try

(loop [records []]
(when (not-empty records)
(let [batch-payload (create-batch-payload records topic-entity)]
(process handler-fn batch-payload)))
(recur (seq (.poll consumer (Duration/ofMillis (or (:poll-timeout-ms-config consumer-config) DEFAULT_POLL_TIMEOUT_MS_CONFIG))))))
(catch WakeupException e
(log/errorf e "WakeupException while polling for messages for: %s" topic-entity))
(catch Exception e
(log/errorf e "Exception while polling for messages for: %s" topic-entity))
(finally (do (log/info "Closing the Kafka Consumer for: " topic-entity)
(.close consumer))))))

5 changes: 3 additions & 2 deletions src/ziggurat/streams.clj
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@
[ziggurat.metrics :as metrics]
[ziggurat.timestamp-transformer :as timestamp-transformer]
[ziggurat.tracer :refer [tracer]]
[ziggurat.util.map :as umap])
[ziggurat.util.map :as umap]
[cambium.core :as clog])
(:import [io.opentracing.contrib.kafka TracingKafkaUtils]
[io.opentracing.contrib.kafka.streams TracingKafkaClientSupplier]
[io.opentracing.tag Tags]
Expand Down Expand Up @@ -244,7 +245,7 @@
(.setUncaughtExceptionHandler stream
(reify StreamsUncaughtExceptionHandler
(^StreamsUncaughtExceptionHandler$StreamThreadExceptionResponse handle [_ ^Throwable error] (handle-uncaught-exception (get stream-config :stream-thread-exception-response :shutdown-client) error))))
(.start stream)
(clog/with-logging-context {:consumer-group topic-entity} (.start stream))
(assoc streams topic-entity stream))
streams)))
{}
Expand Down
5 changes: 4 additions & 1 deletion test/ziggurat/init_test.clj
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,10 @@
[ziggurat.tracer :as tracer]
[ziggurat.fixtures :refer [with-config]]
[cambium.logback.json.flat-layout :as flat]
[cambium.codec :as codec])
[cambium.codec :as codec]
[cambium.core :as clog]
[clojure.tools.logging :as log])

(:import (io.opentracing.mock MockTracer)))

(def valid-modes-count 4)
Expand Down

0 comments on commit a95fcf0

Please sign in to comment.