Skip to content

Commit

Permalink
Merge 6ad5b59 into c8343fb
Browse files Browse the repository at this point in the history
  • Loading branch information
anmol1vw13 committed Aug 31, 2021
2 parents c8343fb + 6ad5b59 commit b7c2441
Show file tree
Hide file tree
Showing 10 changed files with 116 additions and 61 deletions.
13 changes: 9 additions & 4 deletions project.clj
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,6 @@
[io.opentracing.contrib/opentracing-rabbitmq-client "0.1.11" :exclusions [com.rabbitmq/amqp-client]]
[org.apache.httpcomponents/fluent-hc "4.5.13"]
[org.apache.kafka/kafka-streams "2.8.0" :exclusions [org.slf4j/slf4j-log4j12 log4j]]
[org.apache.logging.log4j/log4j-core "2.14.1"]
[org.apache.logging.log4j/log4j-slf4j-impl "2.14.1"]
[org.clojure/clojure "1.10.3"]
[org.clojure/tools.logging "1.1.0"]
[nrepl/nrepl "0.8.3"]
Expand All @@ -53,7 +51,14 @@
com.fasterxml.jackson.core/jackson-core
com.fasterxml.jackson.dataformat/jackson-dataformat-smile
com.fasterxml.jackson.dataformat/jackson-dataformat-cbor]]
[metosin/ring-swagger-ui "3.46.0"]]
[metosin/ring-swagger-ui "3.46.0"]
[cambium/cambium.core "1.1.0"]
[cambium/cambium.codec-cheshire "1.0.0"]
[cambium/cambium.logback.json "0.4.4"]
[ch.qos.logback/logback-classic "1.2.3"]
[ch.qos.logback.contrib/logback-json-classic "0.1.5"]
[ch.qos.logback.contrib/logback-jackson "0.1.5"]
[net.logstash.logback/logstash-logback-encoder "6.6"]]
:deploy-repositories [["clojars" {:url "https://clojars.org/repo"
:username :env/clojars_username
:password :env/clojars_password
Expand All @@ -67,7 +72,7 @@
:global-vars {*warn-on-reflection* true}
:pedantic? :abort}
:test {:java-source-paths ["src/com" "test/com"]
:jvm-opts ["-Dlog4j.configurationFile=resources/log4j2.test.xml"]
:jvm-opts ["-Dlogback.configurationFile=resources/logback.test.xml"]
:dependencies [[com.google.protobuf/protobuf-java "3.17.0"]
[junit/junit "4.13.2"]
[org.hamcrest/hamcrest-core "2.2"]
Expand Down
21 changes: 0 additions & 21 deletions resources/log4j2.test.xml

This file was deleted.

37 changes: 37 additions & 0 deletions resources/logback.test.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
<configuration>
<property name="type" value="${ZIGGURAT_LOG_FORMAT:-text}" />

<if condition = 'property("type").equals("json")'>
<then>
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<encoder class="ch.qos.logback.core.encoder.LayoutWrappingEncoder">
<layout class="cambium.logback.json.FlatJsonLayout">
<jsonFormatter class="ch.qos.logback.contrib.jackson.JacksonJsonFormatter">
<prettyPrint>false</prettyPrint>
</jsonFormatter>
<!-- <context>api</context> -->
<timestampFormat>yyyy-MM-dd'T'HH:mm:ss.SSS'Z'</timestampFormat>
<timestampFormatTimezoneId>UTC</timestampFormatTimezoneId>
<appendLineSeparator>true</appendLineSeparator>
</layout>
</encoder>
</appender>
</then>
</if>

<if condition = 'property("type").equals("text")'>
<then>
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<layout class="ch.qos.logback.classic.PatternLayout">
<pattern>
%d{HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg { %mdc }%n
</pattern>
</layout>
</appender>
</then>
</if>

<root level="info">
<appender-ref ref="STDOUT"/>
</root>
</configuration>
9 changes: 8 additions & 1 deletion src/ziggurat/init.clj
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@
[ziggurat.tracer :as tracer]
[ziggurat.util.java-util :as util]
[ziggurat.kafka-consumer.executor-service :as executor-service]
[ziggurat.kafka-consumer.consumer-driver :as consumer-driver])
[ziggurat.kafka-consumer.consumer-driver :as consumer-driver]
[cambium.codec :as codec]
[cambium.logback.json.flat-layout :as flat])
(:gen-class
:methods [^{:static true} [init [java.util.Map] void]]
:name tech.gojek.ziggurat.internal.Init))
Expand All @@ -44,6 +46,10 @@
(start-rabbitmq-connection args)
(messaging-producer/make-queues (event-routes args)))

(defn- set-properties-for-structured-logging []
(if (= (:log-format (ziggurat-config)) "json")
(flat/set-decoder! codec/destringify-val)))

(defn start-kafka-producers []
(start* #{#'kafka-producers}))

Expand Down Expand Up @@ -247,6 +253,7 @@
(try
(let [derived-modes (validate-modes modes stream-routes batch-routes actor-routes)]
(initialize-config)
(set-properties-for-structured-logging)
(validate-routes stream-routes batch-routes derived-modes)
(add-shutdown-hook stop-fn derived-modes)
(start start-fn stream-routes batch-routes actor-routes derived-modes))
Expand Down
6 changes: 4 additions & 2 deletions src/ziggurat/kafka_consumer/consumer_driver.clj
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,14 @@
[clojure.tools.logging :as log]
[ziggurat.kafka-consumer.executor-service :refer :all]
[mount.core :as mount]
[ziggurat.metrics :as metrics])
[ziggurat.metrics :as metrics]
[cambium.core :as clog])
(:import (java.util.concurrent ExecutorService RejectedExecutionException)
(org.apache.kafka.clients.consumer Consumer)))

(defn- start-polling-with-consumer
[consumer init-arg topic-entity consumer-config]

(let [message-poller (cast Runnable #(ch/poll-for-messages consumer (:handler-fn init-arg) topic-entity consumer-config))]
(when message-poller
(try
Expand Down Expand Up @@ -43,7 +45,7 @@
(defn- stop-consumers [consumer-groups]
(do (log/info "stopping consumers")
(doseq [[topic-entity consumers] consumer-groups]
(log/info "Stopping threads for consumer group: " topic-entity)
(clog/info {:consumer-group topic-entity} (str "Stopping threads for consumer group: " topic-entity))
(doseq [consumer consumers]
(.wakeup ^Consumer consumer)))))

Expand Down
37 changes: 21 additions & 16 deletions src/ziggurat/kafka_consumer/consumer_handler.clj
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@
[ziggurat.config :refer :all]
[ziggurat.messaging.producer :as producer]
[ziggurat.message-payload :refer [map->MessagePayload]]
[ziggurat.metrics :as metrics])
[ziggurat.metrics :as metrics]
[cambium.core :as clog])
(:import (org.apache.kafka.common.errors WakeupException)
(java.time Duration Instant)
(tech.gojek.ziggurat.internal InvalidReturnTypeException)
Expand Down Expand Up @@ -49,7 +50,8 @@
batch-size (count batch)]
(try
(when (not-empty batch)
(log/infof "[Consumer Group: %s] Processing the batch with %d messages" topic-entity batch-size)
(clog/info {:batch-size batch-size} "Processing the batch")
;(log/infof "[Consumer Group: %s] Processing the batch with %d messages" topic-entity batch-size)
(let [start-time (Instant/now)
result (batch-handler batch)
time-taken-in-millis (.toMillis (Duration/between start-time (Instant/now)))]
Expand All @@ -58,8 +60,9 @@
to-be-retried-count (count messages-to-be-retried)
skip-count (count (:skip result))
success-count (- batch-size (+ to-be-retried-count skip-count))]
(log/infof "[Consumer Group: %s] Processed the batch with success: [%d], skip: [%d] and retries: [%d] \n"
topic-entity success-count skip-count to-be-retried-count)
(clog/info {:messages-successfully-processed success-count :messages-skipped skip-count :messages-to-be-retried to-be-retried-count} "Batch processing complete")
;(log/infof "[Consumer Group: %s] Processed the batch with success: [%d], skip: [%d] and retries: [%d] \n"
; topic-entity success-count skip-count to-be-retried-count)
(publish-batch-process-metrics topic-entity batch-size success-count skip-count to-be-retried-count time-taken-in-millis)
(retry messages-to-be-retried current-retry-count topic-entity))))
(catch InvalidReturnTypeException e
Expand All @@ -78,16 +81,18 @@

(defn poll-for-messages
[^Consumer consumer handler-fn topic-entity consumer-config]
(try
(loop [records []]
(when (not-empty records)
(let [batch-payload (create-batch-payload records topic-entity)]
(process handler-fn batch-payload)))
(recur (seq (.poll consumer (Duration/ofMillis (or (:poll-timeout-ms-config consumer-config) DEFAULT_POLL_TIMEOUT_MS_CONFIG))))))
(catch WakeupException e
(log/errorf e "WakeupException while polling for messages for: %s" topic-entity))
(catch Exception e
(log/errorf e "Exception while polling for messages for: %s" topic-entity))
(finally (do (log/info "Closing the Kafka Consumer for: " topic-entity)
(.close consumer)))))
(clog/with-logging-context {:consumer-group topic-entity}
(try

(loop [records []]
(when (not-empty records)
(let [batch-payload (create-batch-payload records topic-entity)]
(process handler-fn batch-payload)))
(recur (seq (.poll consumer (Duration/ofMillis (or (:poll-timeout-ms-config consumer-config) DEFAULT_POLL_TIMEOUT_MS_CONFIG))))))
(catch WakeupException e
(log/errorf e "WakeupException while polling for messages for: %s" topic-entity))
(catch Exception e
(log/errorf e "Exception while polling for messages for: %s" topic-entity))
(finally (do (log/info "Closing the Kafka Consumer for: " topic-entity)
(.close consumer))))))

6 changes: 4 additions & 2 deletions src/ziggurat/streams.clj
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@
[ziggurat.metrics :as metrics]
[ziggurat.timestamp-transformer :as timestamp-transformer]
[ziggurat.tracer :refer [tracer]]
[ziggurat.util.map :as umap])
[ziggurat.util.map :as umap]
[cambium.core :as clog])
(:import [io.opentracing.contrib.kafka TracingKafkaUtils]
[io.opentracing.contrib.kafka.streams TracingKafkaClientSupplier]
[io.opentracing.tag Tags]
Expand Down Expand Up @@ -247,7 +248,8 @@
(reify StreamsUncaughtExceptionHandler
(^StreamsUncaughtExceptionHandler$StreamThreadExceptionResponse handle [_ ^Throwable error]
(handle-uncaught-exception (get stream-config :stream-thread-exception-response :shutdown-client) error))))
(.start stream)
(clog/with-logging-context {:consumer-group topic-entity}
(.start stream))
(assoc streams topic-entity stream))
streams)))
{}
Expand Down
8 changes: 2 additions & 6 deletions src/ziggurat/util/logging.clj
Original file line number Diff line number Diff line change
@@ -1,11 +1,7 @@
(ns ziggurat.util.logging
(:require [clojure.tools.logging :as log])
(:import (org.apache.logging.log4j ThreadContext)))
(:require [cambium.core :as clog]))

(defmacro with-context
[context-map & body]
`(try
(doseq [[k# v#] ~context-map] (ThreadContext/put (name k#) (str v#)))
~@body
(finally (doseq [[k# _#] ~context-map] (ThreadContext/remove (name k#))))))
`(clog/with-logging-context ~context-map ~@body))

26 changes: 24 additions & 2 deletions test/ziggurat/init_test.clj
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,12 @@
[ziggurat.streams :as streams :refer [stream]]
[ziggurat.server.test-utils :as tu]
[ziggurat.tracer :as tracer]
[ziggurat.fixtures :refer [with-config]])
[ziggurat.fixtures :refer [with-config]]
[cambium.logback.json.flat-layout :as flat]
[cambium.codec :as codec]
[cambium.core :as clog]
[clojure.tools.logging :as log])

(:import (io.opentracing.mock MockTracer)))

(def valid-modes-count 4)
Expand Down Expand Up @@ -105,7 +110,24 @@
(swap! start-was-called not)
(is (= expected-stream-routes stream-router)))]
(init/main #() #() expected-stream-routes)
(is @start-was-called)))))
(is @start-was-called))))
(testing "Flat Json Layout decoder is set if log format is json"
(let [start-was-called (atom false)
decoder-was-set (atom false)
expected-stream-routes {:default {:handler-fn #(constantly nil)}}
config config/default-config]
(with-redefs [init/add-shutdown-hook (fn [_ _] (constantly nil))
config/ziggurat-config (fn [] (assoc config :log-format "json"))
init/initialize-config (constantly nil)
init/validate-routes-against-config (constantly nil)
init/start (fn [_ stream-router _ _ _]
(swap! start-was-called not)
(is (= expected-stream-routes stream-router)))
flat/set-decoder! (fn [decoder] (is (= decoder codec/destringify-val))
(reset! decoder-was-set true))]
(init/main #() #() expected-stream-routes)
(is @start-was-called)
(is @decoder-was-set)))))

(def mock-modes {:api-server {:start-fn (constantly nil) :stop-fn (constantly nil)}
:stream-worker {:start-fn (constantly nil) :stop-fn (constantly nil)}
Expand Down
14 changes: 7 additions & 7 deletions test/ziggurat/util/logging_test.clj
Original file line number Diff line number Diff line change
@@ -1,20 +1,20 @@
(ns ziggurat.util.logging-test
(:require [clojure.test :refer :all]
[ziggurat.util.logging :as zlog])
(:import (org.apache.logging.log4j ThreadContext)))
(:import (org.slf4j MDC)))

(deftest with-context-test
(let [capture-context #(reset! % (into {} (ThreadContext/getImmutableContext)))]
(testing "sets thread local log context params within body"
(let [capture-context #(reset! % (into {} (MDC/getCopyOfContextMap)))]
(testing "sets mdc context params within body"
(let [context-map (atom {})]
(zlog/with-context {:param-1 "string-value", :param-2 123}
(zlog/with-context {:param-1 "string-value" :param-2 123}
(capture-context context-map))
(is (= "string-value" (get @context-map "param-1")))
(is (= "\"string-value\"" (get @context-map "param-1")))
(is (= "123" (get @context-map "param-2")))))

(testing "clears thread local log context params when exits"
(testing "clears mdc context params when exits"
(let [context-map (atom {})]
(zlog/with-context {:param-1 "string-value", :param-2 123}
(zlog/with-context {:param-1 "string-value" :param-2 123}
(constantly nil))
(capture-context context-map)
(is (empty? @context-map))))))

0 comments on commit b7c2441

Please sign in to comment.