Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adds open tracing support #89

Merged
merged 19 commits into from
Oct 14, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 30 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,36 @@ For publishing data using a producer which is defined for the stream router conf

`(send :default "test-topic" 1 "key" "value")`

## Tracing
[Open Tracing](https://opentracing.io/docs/overview/) enables to identify the amount of time spent in various stages of the work flow.

Currently, the execution of the handler function is traced. If the message consumed has the corresponding tracing headers, then the E2E life time of the message from the time of production till the time of consumption can be traced.

Tracing has been added to the following flows:

1. Normal basic consume
2. Retry via rabbitmq
3. Produce to rabbitmq channel
4. Produce to another kafka topic
roobalimsab marked this conversation as resolved.
Show resolved Hide resolved

By default, tracing is done via [Jaeger](https://www.jaegertracing.io/) based on the env configs. Please refer [Jaeger Configuration](https://github.com/jaegertracing/jaeger-client-java/tree/master/jaeger-core#configuration-via-environment)
and [Jaeger Architecture](https://www.jaegertracing.io/docs/1.13/architecture/) to set the respective env variables.
To enable custom tracer, a custom tracer provider function name can be set in `:custom-provider`. The corresponding function will be executed in runtime to create a tracer. In the event of any errors while executing the custom tracer provider, a Noop tracer will be created.

To enable tracing, the following config needs to be added to the `config.edn` under `:ziggurat` key.

```clojure
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is not clojure format, edn format :p

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the configuration below also has the same format. thats why used it. should i change it to :edn?

:tracer {:enabled [true :bool]
:custom-provider ""}
```

Example Jaeger Env Config:
```
JAEGER_SERVICE_NAME: "service-name"
JAEGER_AGENT_HOST: "localhost"
JAEGER_AGENT_PORT: 6831
```

## Configuration

All Ziggurat configs should be in your `clonfig` `config.edn` under the `:ziggurat` key.
Expand Down
10 changes: 9 additions & 1 deletion project.clj
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
(defproject tech.gojek/ziggurat "3.0.0"
(defproject tech.gojek/ziggurat "3.0.1-alpha.1"
:description "A stream processing framework to build stateless applications on kafka"
:url "https://github.com/gojektech/ziggurat"
:license {:name "Apache License, Version 2.0"
Expand All @@ -16,6 +16,14 @@
[io.dropwizard.metrics5/metrics-core "5.0.0-rc2" :scope "compile"]
[medley "1.2.0" :exclusions [org.clojure/clojure]]
[mount "0.1.16"]
[io.jaegertracing/jaeger-core "1.0.0"]
[io.jaegertracing/jaeger-client "1.0.0"]
[io.opentracing/opentracing-api "0.33.0"]
[io.opentracing/opentracing-mock "0.33.0"]
theanirudhvyas marked this conversation as resolved.
Show resolved Hide resolved
[io.opentracing/opentracing-noop "0.33.0"]
[io.opentracing.contrib/opentracing-kafka-streams "0.1.4" :exclusions [org.lz4/lz4-java com.github.luben/zstd-jni org.apache.kafka/kafka-streams org.slf4j/slf4j-api org.xerial.snappy/snappy-java]]
[io.opentracing.contrib/opentracing-kafka-client "0.1.4" :exclusions [org.lz4/lz4-java com.github.luben/zstd-jni org.slf4j/slf4j-api org.xerial.snappy/snappy-java]]
[io.opentracing.contrib/opentracing-rabbitmq-client "0.1.5" :exclusions [com.rabbitmq/amqp-client]]
[org.apache.httpcomponents/fluent-hc "4.5.4"]
[org.apache.kafka/kafka-streams "2.1.0" :exclusions [org.slf4j/slf4j-log4j12 log4j]]
[org.apache.logging.log4j/log4j-core "2.12.1"]
Expand Down
4 changes: 3 additions & 1 deletion resources/config.test.ci.edn
Original file line number Diff line number Diff line change
Expand Up @@ -49,5 +49,7 @@
:origin-topic "topic"
:channels {:channel-1 {:worker-count [10 :int]
:retry {:count [5 :int]
:enabled [true :bool]}}}}}}}
:enabled [true :bool]}}}}}
:tracer {:enabled [true :bool]
:custom-provider ""}}}

4 changes: 3 additions & 1 deletion resources/config.test.edn
Original file line number Diff line number Diff line change
Expand Up @@ -49,4 +49,6 @@
:origin-topic "topic"
:channels {:channel-1 {:worker-count [10 :int]
:retry {:count [5 :int]
:enabled [true :bool]}}}}}}}
:enabled [true :bool]}}}}}
:tracer {:enabled [true :bool]
:custom-provider ""}}}
13 changes: 13 additions & 0 deletions src/ziggurat/header_transformer.clj
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
(ns ziggurat.header-transformer
(:import [org.apache.kafka.streams.kstream ValueTransformer]
[org.apache.kafka.streams.processor ProcessorContext]))

(deftype HeaderTransformer [^{:volatile-mutable true} processor-context] ValueTransformer
(^void init [_ ^ProcessorContext context]
(do (set! processor-context context)
nil))
(transform [_ record-value] {:value record-value :headers (.headers processor-context)})
(close [_] nil))

(defn create []
(HeaderTransformer. nil))
9 changes: 6 additions & 3 deletions src/ziggurat/init.clj
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@
[ziggurat.sentry :refer [sentry-reporter]]
[ziggurat.server :as server]
[ziggurat.streams :as streams]
[ziggurat.util.java-util :as util])
[ziggurat.util.java-util :as util]
[ziggurat.tracer :as tracer])
(:gen-class
:name tech.gojek.ziggurat.internal.Init
:methods [^{:static true} [init [java.util.Map] void]]))
Expand Down Expand Up @@ -113,13 +114,15 @@
(start* #{#'config/config
#'statsd-reporter
#'sentry-reporter
#'nrepl-server/server}))
#'nrepl-server/server
#'tracer/tracer}))

(defn stop-common-states []
(mount/stop #'config/config
#'statsd-reporter
#'messaging-connection/connection
#'nrepl-server/server))
#'nrepl-server/server
#'tracer/tracer))

(defn start
"Starts up Ziggurat's config, reporters, actor fn, rabbitmq connection and then streams, server etc"
Expand Down
3 changes: 2 additions & 1 deletion src/ziggurat/mapper.clj
Original file line number Diff line number Diff line change
Expand Up @@ -85,4 +85,5 @@
(s/defschema message-payload-schema
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

since you are adding headers to messagePayload, the schema will also change

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the schema is used to validate the message at the consumer end right? we are not passing headers anywhere in the consumers. thats why did not add it here

{:message s/Any
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you'll also need to change the defschema as you changed the MessagePayload

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is it mandatory? because we dont set it in consumers

:topic-entity s/Keyword
(s/optional-key :retry-count) s/Int})
(s/optional-key :retry-count) s/Int
(s/optional-key :headers) s/Any})
23 changes: 18 additions & 5 deletions src/ziggurat/messaging/connection.clj
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,12 @@
[sentry-clj.async :as sentry]
[ziggurat.config :refer [ziggurat-config]]
[ziggurat.sentry :refer [sentry-reporter]]
[ziggurat.channel :refer [get-keys-for-topic]])
(:import [com.rabbitmq.client ShutdownListener]
[java.util.concurrent Executors]))
[ziggurat.channel :refer [get-keys-for-topic]]
[ziggurat.tracer :refer [tracer]])
(:import [com.rabbitmq.client ShutdownListener Address ListAddressResolver]
[java.util.concurrent Executors ExecutorService]
[io.opentracing.contrib.rabbitmq TracingConnectionFactory]
[com.rabbitmq.client.impl DefaultCredentialsProvider]))

(defn is-connection-required? []
(let [stream-routes (:stream-routes (mount/args))
Expand All @@ -28,11 +31,19 @@
(reduce (fn [sum [_ route-config]]
(+ sum (channel-threads (:channels route-config)) worker-count)) 0 stream-routes)))

(defn create-connection [config tracer-enabled]
(if tracer-enabled
(let [connection-factory (TracingConnectionFactory. tracer)]
(.setCredentialsProvider connection-factory (DefaultCredentialsProvider. (:username config) (:password config)))
(.newConnection connection-factory ^ExecutorService (:executor config) ^ListAddressResolver (ListAddressResolver. (list (Address. (:host config) (:port config))))))

(rmq/connect config)))

(defn- start-connection []
(log/info "Connecting to RabbitMQ")
(when (is-connection-required?)
(try
(let [connection (rmq/connect (assoc (:rabbit-mq-connection (ziggurat-config)) :executor (Executors/newFixedThreadPool (total-thread-count))))]
(let [connection (create-connection (assoc (:rabbit-mq-connection (ziggurat-config)) :executor (Executors/newFixedThreadPool (total-thread-count))) (get-in (ziggurat-config) [:tracer :enabled]))]
(doto connection
(.addShutdownListener
(reify ShutdownListener
Expand All @@ -45,7 +56,9 @@

(defn- stop-connection [conn]
(when (is-connection-required?)
(rmq/close conn)
(if (get-in (ziggurat-config) [:tracer :enabled])
(.close conn)
(rmq/close conn))
(log/info "Disconnected from RabbitMQ")))

(defstate connection
Expand Down
13 changes: 10 additions & 3 deletions src/ziggurat/messaging/producer.clj
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,17 @@
(sentry/report-error sentry-reporter e "Error while declaring RabbitMQ queues")
(throw e)))))

(defn- record-headers->map [record-headers]
(reduce (fn [header-map record-header]
(assoc header-map (.key record-header) (String. (.value record-header))))
{}
record-headers))

(defn- properties-for-publish
[expiration]
[expiration headers]
(let [props {:content-type "application/octet-stream"
:persistent true}]
:persistent true
:headers (record-headers->map headers)}]
theanirudhvyas marked this conversation as resolved.
Show resolved Hide resolved
(if expiration
(assoc props :expiration (str expiration))
props)))
Expand All @@ -60,7 +67,7 @@
:wait 100
:on-failure #(log/error "publishing message to rabbitmq failed with error " (.getMessage %))}
(with-open [ch (lch/open connection)]
(lb/publish ch exchange "" (nippy/freeze message-payload) (properties-for-publish expiration))))
(lb/publish ch exchange "" (nippy/freeze (dissoc message-payload :headers)) (properties-for-publish expiration (:headers message-payload)))))
(catch Throwable e
(sentry/report-error sentry-reporter e
"Pushing message to rabbitmq failed, data: " message-payload)))))
Expand Down
8 changes: 5 additions & 3 deletions src/ziggurat/producer.clj
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,11 @@
(:require [ziggurat.config :refer [ziggurat-config]]
[clojure.tools.logging :as log]
[mount.core :refer [defstate]]
[ziggurat.tracer :refer [tracer]]
[ziggurat.util.java-util :refer [get-key]])
(:import (org.apache.kafka.clients.producer KafkaProducer ProducerRecord ProducerConfig)
(java.util Properties))
(:import (org.apache.kafka.clients.producer KafkaProducer ProducerRecord ProducerConfig)
(java.util Properties)
(io.opentracing.contrib.kafka TracingKafkaProducer))
(:gen-class
:name tech.gojek.ziggurat.internal.Producer
:methods [^{:static true} [send [String String Object Object] java.util.concurrent.Future]
Expand Down Expand Up @@ -85,7 +87,7 @@
(do (log/info "Starting Kafka producers ...")
(reduce (fn [producers [stream-config-key properties]]
(do (log/debug "Constructing Kafka producer associated with [" stream-config-key "] ")
(assoc producers stream-config-key (KafkaProducer. properties))))
(assoc producers stream-config-key (TracingKafkaProducer. (KafkaProducer. properties) tracer))))
{}
(seq (producer-properties-map))))
(log/info "No producers found. Can not initiate start."))
Expand Down
53 changes: 42 additions & 11 deletions src/ziggurat/streams.clj
Original file line number Diff line number Diff line change
Expand Up @@ -4,19 +4,25 @@
[sentry-clj.async :as sentry]
[ziggurat.channel :as chl]
[ziggurat.config :refer [ziggurat-config]]
[ziggurat.header-transformer :as header-transformer]
[ziggurat.mapper :refer [mapper-func ->MessagePayload]]
[ziggurat.metrics :as metrics]
[ziggurat.timestamp-transformer :as transformer]
[ziggurat.util.map :as umap])
[ziggurat.timestamp-transformer :as timestamp-transformer]
[ziggurat.util.map :as umap]
[ziggurat.tracer :refer [tracer]])
(:import [java.util Properties]
[java.util.regex Pattern]
[org.apache.kafka.clients.consumer ConsumerConfig]
[org.apache.kafka.common.serialization Serdes]
[org.apache.kafka.common.utils SystemTime]
[org.apache.kafka.streams KafkaStreams StreamsConfig StreamsBuilder Topology]
[org.apache.kafka.streams.kstream ValueMapper TransformerSupplier]
[org.apache.kafka.streams.kstream ValueMapper TransformerSupplier ValueTransformerSupplier]
[org.apache.kafka.streams.state.internals KeyValueStoreBuilder RocksDbKeyValueBytesStoreSupplier]
[ziggurat.timestamp_transformer IngestionTimeExtractor]))
[ziggurat.timestamp_transformer IngestionTimeExtractor]
[io.opentracing Tracer]
[io.opentracing.contrib.kafka.streams TracingKafkaClientSupplier]
[io.opentracing.contrib.kafka TracingKafkaUtils]
[io.opentracing.tag Tags]))

(def default-config-for-stream
{:buffered-records-per-partition 10000
Expand Down Expand Up @@ -81,31 +87,56 @@
(defn- map-values [mapper-fn stream-builder]
(.mapValues stream-builder (value-mapper mapper-fn)))

(defn- transformer-supplier
(defn- timestamp-transformer-supplier
[metric-namespace oldest-processed-message-in-s additional-tags]
(reify TransformerSupplier
(get [_] (transformer/create metric-namespace oldest-processed-message-in-s additional-tags))))
(get [_] (timestamp-transformer/create metric-namespace oldest-processed-message-in-s additional-tags))))

(defn- transform-values [topic-entity-name oldest-processed-message-in-s stream-builder]
(defn- header-transformer-supplier
[]
(reify ValueTransformerSupplier
(get [_] (header-transformer/create))))

(defn- timestamp-transform-values [topic-entity-name oldest-processed-message-in-s stream-builder]
(let [service-name (:app-name (ziggurat-config))
metric-namespace "message-received-delay-histogram"
additional-tags {:topic_name topic-entity-name}]
(.transform stream-builder (transformer-supplier metric-namespace oldest-processed-message-in-s additional-tags) (into-array [(.name (store-supplier-builder))]))))
(.transform stream-builder (timestamp-transformer-supplier metric-namespace oldest-processed-message-in-s additional-tags) (into-array [(.name (store-supplier-builder))]))))

(defn- header-transform-values [stream-builder]
(.transformValues stream-builder (header-transformer-supplier) (into-array [(.name (store-supplier-builder))])))

(defn- traced-handler-fn [handler-fn channels message topic-entity]
(let [parent-ctx (TracingKafkaUtils/extractSpanContext (:headers message) tracer)
span (as-> tracer t
(.buildSpan t "Message-Handler")
(.withTag t (.getKey Tags/SPAN_KIND) Tags/SPAN_KIND_CONSUMER)
(.withTag t (.getKey Tags/COMPONENT) "ziggurat")
(if (nil? parent-ctx)
t
(.asChildOf t parent-ctx))
(.start t))]
(try
((mapper-func handler-fn channels) (assoc (->MessagePayload (:value message) topic-entity) :headers (:headers message)))
(finally
(.finish span)))))

(defn- topology [handler-fn {:keys [origin-topic oldest-processed-message-in-s]} topic-entity channels]
(let [builder (StreamsBuilder.)
topic-entity-name (name topic-entity)
topic-pattern (Pattern/compile origin-topic)]
(.addStateStore builder (store-supplier-builder))
(->> (.stream builder topic-pattern)
(transform-values topic-entity-name oldest-processed-message-in-s)
(timestamp-transform-values topic-entity-name oldest-processed-message-in-s)
(header-transform-values)
(map-values #(log-and-report-metrics topic-entity-name %))
(map-values #((mapper-func handler-fn channels) (->MessagePayload % topic-entity))))
(map-values #(traced-handler-fn handler-fn channels % topic-entity)))
(.build builder)))

(defn- start-stream* [handler-fn stream-config topic-entity channels]
(KafkaStreams. ^Topology (topology handler-fn stream-config topic-entity channels)
^Properties (properties stream-config)))
^Properties (properties stream-config)
(new TracingKafkaClientSupplier tracer)))

(defn start-streams
([stream-routes]
Expand Down
Loading