Skip to content

Commit

Permalink
Merge pull request #89 from roobalimsab/open-tracing
Browse files Browse the repository at this point in the history
Adds open tracing support
  • Loading branch information
theanirudhvyas committed Oct 14, 2019
2 parents 4dd1cd8 + 855ab23 commit 43491f9
Show file tree
Hide file tree
Showing 24 changed files with 735 additions and 109 deletions.
30 changes: 30 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,36 @@ For publishing data using a producer which is defined for the stream router conf

`(send :default "test-topic" 1 "key" "value")`

## Tracing
[Open Tracing](https://opentracing.io/docs/overview/) enables to identify the amount of time spent in various stages of the work flow.

Currently, the execution of the handler function is traced. If the message consumed has the corresponding tracing headers, then the E2E life time of the message from the time of production till the time of consumption can be traced.

Tracing has been added to the following flows:

1. Normal basic consume
2. Retry via rabbitmq
3. Produce to rabbitmq channel
4. Produce to another kafka topic

By default, tracing is done via [Jaeger](https://www.jaegertracing.io/) based on the env configs. Please refer [Jaeger Configuration](https://github.com/jaegertracing/jaeger-client-java/tree/master/jaeger-core#configuration-via-environment)
and [Jaeger Architecture](https://www.jaegertracing.io/docs/1.13/architecture/) to set the respective env variables.
To enable custom tracer, a custom tracer provider function name can be set in `:custom-provider`. The corresponding function will be executed in runtime to create a tracer. In the event of any errors while executing the custom tracer provider, a Noop tracer will be created.

To enable tracing, the following config needs to be added to the `config.edn` under `:ziggurat` key.

```clojure
:tracer {:enabled [true :bool]
:custom-provider ""}
```

Example Jaeger Env Config:
```
JAEGER_SERVICE_NAME: "service-name"
JAEGER_AGENT_HOST: "localhost"
JAEGER_AGENT_PORT: 6831
```

## Configuration

All Ziggurat configs should be in your `clonfig` `config.edn` under the `:ziggurat` key.
Expand Down
10 changes: 9 additions & 1 deletion project.clj
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
(defproject tech.gojek/ziggurat "3.0.0"
(defproject tech.gojek/ziggurat "3.0.1-alpha.1"
:description "A stream processing framework to build stateless applications on kafka"
:url "https://github.com/gojektech/ziggurat"
:license {:name "Apache License, Version 2.0"
Expand All @@ -16,6 +16,14 @@
[io.dropwizard.metrics5/metrics-core "5.0.0-rc2" :scope "compile"]
[medley "1.2.0" :exclusions [org.clojure/clojure]]
[mount "0.1.16"]
[io.jaegertracing/jaeger-core "1.0.0"]
[io.jaegertracing/jaeger-client "1.0.0"]
[io.opentracing/opentracing-api "0.33.0"]
[io.opentracing/opentracing-mock "0.33.0"]
[io.opentracing/opentracing-noop "0.33.0"]
[io.opentracing.contrib/opentracing-kafka-streams "0.1.4" :exclusions [org.lz4/lz4-java com.github.luben/zstd-jni org.apache.kafka/kafka-streams org.slf4j/slf4j-api org.xerial.snappy/snappy-java]]
[io.opentracing.contrib/opentracing-kafka-client "0.1.4" :exclusions [org.lz4/lz4-java com.github.luben/zstd-jni org.slf4j/slf4j-api org.xerial.snappy/snappy-java]]
[io.opentracing.contrib/opentracing-rabbitmq-client "0.1.5" :exclusions [com.rabbitmq/amqp-client]]
[org.apache.httpcomponents/fluent-hc "4.5.4"]
[org.apache.kafka/kafka-streams "2.1.0" :exclusions [org.slf4j/slf4j-log4j12 log4j]]
[org.apache.logging.log4j/log4j-core "2.12.1"]
Expand Down
4 changes: 3 additions & 1 deletion resources/config.test.ci.edn
Original file line number Diff line number Diff line change
Expand Up @@ -49,5 +49,7 @@
:origin-topic "topic"
:channels {:channel-1 {:worker-count [10 :int]
:retry {:count [5 :int]
:enabled [true :bool]}}}}}}}
:enabled [true :bool]}}}}}
:tracer {:enabled [true :bool]
:custom-provider ""}}}

4 changes: 3 additions & 1 deletion resources/config.test.edn
Original file line number Diff line number Diff line change
Expand Up @@ -49,4 +49,6 @@
:origin-topic "topic"
:channels {:channel-1 {:worker-count [10 :int]
:retry {:count [5 :int]
:enabled [true :bool]}}}}}}}
:enabled [true :bool]}}}}}
:tracer {:enabled [true :bool]
:custom-provider ""}}}
13 changes: 13 additions & 0 deletions src/ziggurat/header_transformer.clj
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
(ns ziggurat.header-transformer
(:import [org.apache.kafka.streams.kstream ValueTransformer]
[org.apache.kafka.streams.processor ProcessorContext]))

(deftype HeaderTransformer [^{:volatile-mutable true} processor-context] ValueTransformer
(^void init [_ ^ProcessorContext context]
(do (set! processor-context context)
nil))
(transform [_ record-value] {:value record-value :headers (.headers processor-context)})
(close [_] nil))

(defn create []
(HeaderTransformer. nil))
9 changes: 6 additions & 3 deletions src/ziggurat/init.clj
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@
[ziggurat.sentry :refer [sentry-reporter]]
[ziggurat.server :as server]
[ziggurat.streams :as streams]
[ziggurat.util.java-util :as util])
[ziggurat.util.java-util :as util]
[ziggurat.tracer :as tracer])
(:gen-class
:name tech.gojek.ziggurat.internal.Init
:methods [^{:static true} [init [java.util.Map] void]]))
Expand Down Expand Up @@ -113,13 +114,15 @@
(start* #{#'config/config
#'statsd-reporter
#'sentry-reporter
#'nrepl-server/server}))
#'nrepl-server/server
#'tracer/tracer}))

(defn stop-common-states []
(mount/stop #'config/config
#'statsd-reporter
#'messaging-connection/connection
#'nrepl-server/server))
#'nrepl-server/server
#'tracer/tracer))

(defn start
"Starts up Ziggurat's config, reporters, actor fn, rabbitmq connection and then streams, server etc"
Expand Down
3 changes: 2 additions & 1 deletion src/ziggurat/mapper.clj
Original file line number Diff line number Diff line change
Expand Up @@ -85,4 +85,5 @@
(s/defschema message-payload-schema
{:message s/Any
:topic-entity s/Keyword
(s/optional-key :retry-count) s/Int})
(s/optional-key :retry-count) s/Int
(s/optional-key :headers) s/Any})
23 changes: 18 additions & 5 deletions src/ziggurat/messaging/connection.clj
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,12 @@
[sentry-clj.async :as sentry]
[ziggurat.config :refer [ziggurat-config]]
[ziggurat.sentry :refer [sentry-reporter]]
[ziggurat.channel :refer [get-keys-for-topic]])
(:import [com.rabbitmq.client ShutdownListener]
[java.util.concurrent Executors]))
[ziggurat.channel :refer [get-keys-for-topic]]
[ziggurat.tracer :refer [tracer]])
(:import [com.rabbitmq.client ShutdownListener Address ListAddressResolver]
[java.util.concurrent Executors ExecutorService]
[io.opentracing.contrib.rabbitmq TracingConnectionFactory]
[com.rabbitmq.client.impl DefaultCredentialsProvider]))

(defn is-connection-required? []
(let [stream-routes (:stream-routes (mount/args))
Expand All @@ -28,11 +31,19 @@
(reduce (fn [sum [_ route-config]]
(+ sum (channel-threads (:channels route-config)) worker-count)) 0 stream-routes)))

(defn create-connection [config tracer-enabled]
(if tracer-enabled
(let [connection-factory (TracingConnectionFactory. tracer)]
(.setCredentialsProvider connection-factory (DefaultCredentialsProvider. (:username config) (:password config)))
(.newConnection connection-factory ^ExecutorService (:executor config) ^ListAddressResolver (ListAddressResolver. (list (Address. (:host config) (:port config))))))

(rmq/connect config)))

(defn- start-connection []
(log/info "Connecting to RabbitMQ")
(when (is-connection-required?)
(try
(let [connection (rmq/connect (assoc (:rabbit-mq-connection (ziggurat-config)) :executor (Executors/newFixedThreadPool (total-thread-count))))]
(let [connection (create-connection (assoc (:rabbit-mq-connection (ziggurat-config)) :executor (Executors/newFixedThreadPool (total-thread-count))) (get-in (ziggurat-config) [:tracer :enabled]))]
(doto connection
(.addShutdownListener
(reify ShutdownListener
Expand All @@ -45,7 +56,9 @@

(defn- stop-connection [conn]
(when (is-connection-required?)
(rmq/close conn)
(if (get-in (ziggurat-config) [:tracer :enabled])
(.close conn)
(rmq/close conn))
(log/info "Disconnected from RabbitMQ")))

(defstate connection
Expand Down
13 changes: 10 additions & 3 deletions src/ziggurat/messaging/producer.clj
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,17 @@
(sentry/report-error sentry-reporter e "Error while declaring RabbitMQ queues")
(throw e)))))

(defn- record-headers->map [record-headers]
(reduce (fn [header-map record-header]
(assoc header-map (.key record-header) (String. (.value record-header))))
{}
record-headers))

(defn- properties-for-publish
[expiration]
[expiration headers]
(let [props {:content-type "application/octet-stream"
:persistent true}]
:persistent true
:headers (record-headers->map headers)}]
(if expiration
(assoc props :expiration (str expiration))
props)))
Expand All @@ -60,7 +67,7 @@
:wait 100
:on-failure #(log/error "publishing message to rabbitmq failed with error " (.getMessage %))}
(with-open [ch (lch/open connection)]
(lb/publish ch exchange "" (nippy/freeze message-payload) (properties-for-publish expiration))))
(lb/publish ch exchange "" (nippy/freeze (dissoc message-payload :headers)) (properties-for-publish expiration (:headers message-payload)))))
(catch Throwable e
(sentry/report-error sentry-reporter e
"Pushing message to rabbitmq failed, data: " message-payload)))))
Expand Down
8 changes: 5 additions & 3 deletions src/ziggurat/producer.clj
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,11 @@
(:require [ziggurat.config :refer [ziggurat-config]]
[clojure.tools.logging :as log]
[mount.core :refer [defstate]]
[ziggurat.tracer :refer [tracer]]
[ziggurat.util.java-util :refer [get-key]])
(:import (org.apache.kafka.clients.producer KafkaProducer ProducerRecord ProducerConfig)
(java.util Properties))
(:import (org.apache.kafka.clients.producer KafkaProducer ProducerRecord ProducerConfig)
(java.util Properties)
(io.opentracing.contrib.kafka TracingKafkaProducer))
(:gen-class
:name tech.gojek.ziggurat.internal.Producer
:methods [^{:static true} [send [String String Object Object] java.util.concurrent.Future]
Expand Down Expand Up @@ -85,7 +87,7 @@
(do (log/info "Starting Kafka producers ...")
(reduce (fn [producers [stream-config-key properties]]
(do (log/debug "Constructing Kafka producer associated with [" stream-config-key "] ")
(assoc producers stream-config-key (KafkaProducer. properties))))
(assoc producers stream-config-key (TracingKafkaProducer. (KafkaProducer. properties) tracer))))
{}
(seq (producer-properties-map))))
(log/info "No producers found. Can not initiate start."))
Expand Down
53 changes: 42 additions & 11 deletions src/ziggurat/streams.clj
Original file line number Diff line number Diff line change
Expand Up @@ -4,19 +4,25 @@
[sentry-clj.async :as sentry]
[ziggurat.channel :as chl]
[ziggurat.config :refer [ziggurat-config]]
[ziggurat.header-transformer :as header-transformer]
[ziggurat.mapper :refer [mapper-func ->MessagePayload]]
[ziggurat.metrics :as metrics]
[ziggurat.timestamp-transformer :as transformer]
[ziggurat.util.map :as umap])
[ziggurat.timestamp-transformer :as timestamp-transformer]
[ziggurat.util.map :as umap]
[ziggurat.tracer :refer [tracer]])
(:import [java.util Properties]
[java.util.regex Pattern]
[org.apache.kafka.clients.consumer ConsumerConfig]
[org.apache.kafka.common.serialization Serdes]
[org.apache.kafka.common.utils SystemTime]
[org.apache.kafka.streams KafkaStreams StreamsConfig StreamsBuilder Topology]
[org.apache.kafka.streams.kstream ValueMapper TransformerSupplier]
[org.apache.kafka.streams.kstream ValueMapper TransformerSupplier ValueTransformerSupplier]
[org.apache.kafka.streams.state.internals KeyValueStoreBuilder RocksDbKeyValueBytesStoreSupplier]
[ziggurat.timestamp_transformer IngestionTimeExtractor]))
[ziggurat.timestamp_transformer IngestionTimeExtractor]
[io.opentracing Tracer]
[io.opentracing.contrib.kafka.streams TracingKafkaClientSupplier]
[io.opentracing.contrib.kafka TracingKafkaUtils]
[io.opentracing.tag Tags]))

(def default-config-for-stream
{:buffered-records-per-partition 10000
Expand Down Expand Up @@ -81,31 +87,56 @@
(defn- map-values [mapper-fn stream-builder]
(.mapValues stream-builder (value-mapper mapper-fn)))

(defn- transformer-supplier
(defn- timestamp-transformer-supplier
[metric-namespace oldest-processed-message-in-s additional-tags]
(reify TransformerSupplier
(get [_] (transformer/create metric-namespace oldest-processed-message-in-s additional-tags))))
(get [_] (timestamp-transformer/create metric-namespace oldest-processed-message-in-s additional-tags))))

(defn- transform-values [topic-entity-name oldest-processed-message-in-s stream-builder]
(defn- header-transformer-supplier
[]
(reify ValueTransformerSupplier
(get [_] (header-transformer/create))))

(defn- timestamp-transform-values [topic-entity-name oldest-processed-message-in-s stream-builder]
(let [service-name (:app-name (ziggurat-config))
metric-namespace "message-received-delay-histogram"
additional-tags {:topic_name topic-entity-name}]
(.transform stream-builder (transformer-supplier metric-namespace oldest-processed-message-in-s additional-tags) (into-array [(.name (store-supplier-builder))]))))
(.transform stream-builder (timestamp-transformer-supplier metric-namespace oldest-processed-message-in-s additional-tags) (into-array [(.name (store-supplier-builder))]))))

(defn- header-transform-values [stream-builder]
(.transformValues stream-builder (header-transformer-supplier) (into-array [(.name (store-supplier-builder))])))

(defn- traced-handler-fn [handler-fn channels message topic-entity]
(let [parent-ctx (TracingKafkaUtils/extractSpanContext (:headers message) tracer)
span (as-> tracer t
(.buildSpan t "Message-Handler")
(.withTag t (.getKey Tags/SPAN_KIND) Tags/SPAN_KIND_CONSUMER)
(.withTag t (.getKey Tags/COMPONENT) "ziggurat")
(if (nil? parent-ctx)
t
(.asChildOf t parent-ctx))
(.start t))]
(try
((mapper-func handler-fn channels) (assoc (->MessagePayload (:value message) topic-entity) :headers (:headers message)))
(finally
(.finish span)))))

(defn- topology [handler-fn {:keys [origin-topic oldest-processed-message-in-s]} topic-entity channels]
(let [builder (StreamsBuilder.)
topic-entity-name (name topic-entity)
topic-pattern (Pattern/compile origin-topic)]
(.addStateStore builder (store-supplier-builder))
(->> (.stream builder topic-pattern)
(transform-values topic-entity-name oldest-processed-message-in-s)
(timestamp-transform-values topic-entity-name oldest-processed-message-in-s)
(header-transform-values)
(map-values #(log-and-report-metrics topic-entity-name %))
(map-values #((mapper-func handler-fn channels) (->MessagePayload % topic-entity))))
(map-values #(traced-handler-fn handler-fn channels % topic-entity)))
(.build builder)))

(defn- start-stream* [handler-fn stream-config topic-entity channels]
(KafkaStreams. ^Topology (topology handler-fn stream-config topic-entity channels)
^Properties (properties stream-config)))
^Properties (properties stream-config)
(new TracingKafkaClientSupplier tracer)))

(defn start-streams
([stream-routes]
Expand Down
Loading

0 comments on commit 43491f9

Please sign in to comment.