Skip to content

Commit

Permalink
[Feature/EV-38] Remove opentracing code (#275)
Browse files Browse the repository at this point in the history
* [EV-38][Indra] Remove modules tracer and tracer-test

* [EV-38][Indra] Remove opentracing tests from consumer-connection test

* [EV-38][Indra] Remove opentracing from connection_helper

* [EV-38][Indra] Remove opentracing from ziggurat.producer

* [EV-38][Indra] Remove opentracing tests from ziggurat.messaging.producer-test

* [EV-38][Indra] Remove opentracing init-test

* [EV-38][Indra] Remove tracer-enabled arg to connection helper from producer connection helper test

* [EV-38][Indra] Remove tracer related test from consumer_test

* [EV-38][Indra] Remove opentracing related dependencies from project.clj and fixtures

* [EV-38][Indra] Remove opentracing content from Readme

* [EV-38][Indra] Remove opentracing from streams

* [EV-38][Indra] Fix linting

* [EV-38][Indra] Remove ziggurat.tracer references

* [EV-38][Indra] Bump the version to v4.9.4
  • Loading branch information
indrajithi committed Oct 20, 2023
1 parent a10c481 commit 900d010
Show file tree
Hide file tree
Showing 18 changed files with 46 additions and 496 deletions.
32 changes: 0 additions & 32 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -425,38 +425,6 @@ and different timeout values.
:enable [true :bool]}}}}}
```

## Tracing

[Open Tracing](https://opentracing.io/docs/overview/) enables to identify the amount of time spent in various stages of the work flow.

Currently, the execution of the handler function is traced. If the message consumed has the corresponding tracing headers, then the E2E life time of the message from the time of production till the time of consumption can be traced.

Tracing has been added to the following flows:

1. Normal basic consume
2. Retry via rabbitmq
3. Produce to rabbitmq channel
4. Produce to another kafka topic

By default, tracing is done via [Jaeger](https://www.jaegertracing.io/) based on the env configs. Please refer [Jaeger Configuration](https://github.com/jaegertracing/jaeger-client-java/tree/master/jaeger-core#configuration-via-environment)
and [Jaeger Architecture](https://www.jaegertracing.io/docs/1.13/architecture/) to set the respective env variables.
To enable custom tracer, a custom tracer provider function name can be set in `:custom-provider`. The corresponding function will be executed in runtime to create a tracer. In the event of any errors while executing the custom tracer provider, a Noop tracer will be created.

To enable tracing, the following config needs to be added to the `config.edn` under `:ziggurat` key.

```clojure
:tracer {:enabled [true :bool]
:custom-provider ""}
```

Example Jaeger Env Config:

```
JAEGER_SERVICE_NAME: "service-name"
JAEGER_AGENT_HOST: "localhost"
JAEGER_AGENT_PORT: 6831
```

## Deprecation Notice
* Sentry has been deprecated.

Expand Down
8 changes: 1 addition & 7 deletions project.clj
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
(cemerick.pomegranate.aether/register-wagon-factory!
"http" #(org.apache.maven.wagon.providers.http.HttpWagon.))

(defproject tech.gojek/ziggurat "4.9.3"
(defproject tech.gojek/ziggurat "4.9.4"
:description "A stream processing framework to build stateless applications on kafka"
:url "https://github.com/gojektech/ziggurat"
:license {:name "Apache License, Version 2.0"
Expand All @@ -22,12 +22,6 @@
[mount "0.1.16"]
[io.jaegertracing/jaeger-core "1.6.0"]
[io.jaegertracing/jaeger-client "1.6.0"]
[io.opentracing/opentracing-api "0.33.0"]
[io.opentracing/opentracing-mock "0.33.0"]
[io.opentracing/opentracing-noop "0.33.0"]
[io.opentracing.contrib/opentracing-kafka-streams "0.1.15" :exclusions [org.lz4/lz4-java com.github.luben/zstd-jni org.apache.kafka/kafka-streams org.slf4j/slf4j-api org.xerial.snappy/snappy-java]]
[io.opentracing.contrib/opentracing-kafka-client "0.1.15" :exclusions [org.lz4/lz4-java com.github.luben/zstd-jni org.slf4j/slf4j-api org.xerial.snappy/snappy-java]]
[io.opentracing.contrib/opentracing-rabbitmq-client "0.1.11" :exclusions [com.rabbitmq/amqp-client]]
[org.apache.httpcomponents/fluent-hc "4.5.13"]
[org.apache.kafka/kafka-clients "2.8.2" :exclusions [org.slf4j/slf4j-log4j12 log4j]]
[org.apache.kafka/kafka-streams "2.8.2" :exclusions [org.slf4j/slf4j-log4j12 log4j]]
Expand Down
2 changes: 1 addition & 1 deletion resources/config.test.edn
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@
:default-api-timeout-ms-config [60000 :int]
:key-deserializer-class-config "org.apache.kafka.common.serialization.ByteArrayDeserializer"
:value-deserializer-class-config "org.apache.kafka.common.serialization.ByteArrayDeserializer"}}
:tracer {:enabled [true :bool]
:tracer {:enabled [false :bool]
:custom-provider ""}
:new-relic {:report-errors false}
:log-format "text"}}
7 changes: 2 additions & 5 deletions src/ziggurat/init.clj
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
[ziggurat.sentry :refer [sentry-reporter]]
[ziggurat.server :as server]
[ziggurat.streams :as streams]
[ziggurat.tracer :as tracer]
[ziggurat.util.java-util :as util])
(:gen-class
:methods [^{:static true} [init [java.util.Map] void]]
Expand Down Expand Up @@ -150,15 +149,13 @@
(defn start-common-states []
(start* #{#'metrics/statsd-reporter
#'sentry-reporter
#'nrepl-server/server
#'tracer/tracer}))
#'nrepl-server/server}))

(defn stop-common-states []
(mount/stop #'config/config
#'sentry-reporter
#'metrics/statsd-reporter
#'nrepl-server/server
#'tracer/tracer))
#'nrepl-server/server))

(defn start
"Starts up Ziggurat's config, reporters, actor fn, rabbitmq connection and then streams, server etc"
Expand Down
11 changes: 3 additions & 8 deletions src/ziggurat/messaging/connection_helper.clj
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,10 @@
[ziggurat.config :refer [ziggurat-config]]
[ziggurat.sentry :refer [sentry-reporter]]
[ziggurat.channel :refer [get-keys-for-topic]]
[ziggurat.tracer :refer [tracer]]
[ziggurat.messaging.util :as util]
[ziggurat.util.error :refer [report-error]])
(:import [com.rabbitmq.client ShutdownListener ConnectionFactory AddressResolver]
[java.util.concurrent Executors ExecutorService]
[io.opentracing.contrib.rabbitmq TracingConnectionFactory]
[com.rabbitmq.client.impl DefaultCredentialsProvider]))

(defn is-connection-required? []
Expand Down Expand Up @@ -48,10 +46,8 @@
(util/create-address-resolver rabbitmq-config)
(:connection-name rabbitmq-config))))

(defn create-connection [config tracer-enabled]
(if tracer-enabled
(create-rmq-connection (TracingConnectionFactory. tracer) config)
(create-rmq-connection (ConnectionFactory.) config)))
(defn create-connection [config]
(create-rmq-connection (ConnectionFactory.) config))

(defn- get-connection-config
[is-producer?]
Expand All @@ -70,8 +66,7 @@
(when (is-connection-required?)
(try
(let
[is-tracer-enabled? (get-in (ziggurat-config) [:tracer :enabled])
connection (create-connection (get-connection-config is-producer?) is-tracer-enabled?)]
[connection (create-connection (get-connection-config is-producer?))]
(log/info "Connection created " connection)
(doto connection
(.addShutdownListener
Expand Down
11 changes: 4 additions & 7 deletions src/ziggurat/producer.clj
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,8 @@
(:require [clojure.tools.logging :as log]
[mount.core :refer [defstate]]
[ziggurat.config :refer [build-producer-config-properties ziggurat-config]]
[ziggurat.tracer :refer [tracer]]
[ziggurat.util.java-util :refer [get-key]])
(:import (io.opentracing.contrib.kafka TracingKafkaProducer)
(org.apache.kafka.clients.producer KafkaProducer ProducerRecord))
(:import (org.apache.kafka.clients.producer KafkaProducer ProducerRecord))
(:gen-class
:methods [^{:static true} [send [String String Object Object] java.util.concurrent.Future]
^{:static true} [send [String String int Object Object] java.util.concurrent.Future]]
Expand All @@ -70,9 +68,8 @@
(do (log/info "Starting Kafka producers ...")
(reduce (fn [producers [stream-config-key properties]]
(log/debug "Constructing Kafka producer associated with [" stream-config-key "] ")
(let [kp (KafkaProducer. properties)
tkp (TracingKafkaProducer. kp tracer)]
(assoc producers stream-config-key tkp)))
(let [kp (KafkaProducer. properties)]
(assoc producers stream-config-key kp)))
{}
(seq (producer-properties-map))))
(log/info "No producers found. Can not initiate start."))
Expand All @@ -85,7 +82,7 @@
(.flush)
(.close)))
(seq kafka-producers))))
(log/info "No producers found.n Can not initiate stop.")))
(log/info "No producers found. Can not initiate stop.")))

(defn send
"A wrapper around `org.apache.kafka.clients.producer.KafkaProducer#send` which enables
Expand Down
36 changes: 11 additions & 25 deletions src/ziggurat/streams.clj
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,9 @@
[ziggurat.message-payload :refer [->MessagePayload]]
[ziggurat.metrics :as metrics]
[ziggurat.timestamp-transformer :as timestamp-transformer]
[ziggurat.tracer :refer [tracer]]
[ziggurat.util.map :as umap]
[cambium.core :as clog])
(:import [io.opentracing.contrib.kafka TracingKafkaUtils]
[io.opentracing.contrib.kafka.streams TracingKafkaClientSupplier]
[io.opentracing.tag Tags]
[java.time Duration]
(:import [java.time Duration]
[java.util Properties]
[java.util.regex Pattern]
[org.apache.kafka.common.errors TimeoutException]
Expand Down Expand Up @@ -126,22 +122,13 @@
(doseq [[topic-entity stream] streams]
(close-stream topic-entity stream)))

(defn- traced-handler-fn [handler-fn channels message topic-entity]
(let [parent-ctx (TracingKafkaUtils/extractSpanContext (:headers message) tracer)
span (as-> tracer t
(.buildSpan t "Message-Handler")
(.withTag t (.getKey Tags/SPAN_KIND) Tags/SPAN_KIND_CONSUMER)
(.withTag t (.getKey Tags/COMPONENT) "ziggurat")
(if (nil? parent-ctx)
t
(.asChildOf t parent-ctx))
(.start t))]
(try
((mapper-func handler-fn channels) (-> (->MessagePayload (:value message) topic-entity)
(assoc :headers (:headers message))
(assoc :metadata (:metadata message))))
(finally
(.finish span)))))
(defn- mapped-handler-fn [handler-fn channels message topic-entity]
(try
((mapper-func handler-fn channels)
(-> (->MessagePayload (:value message) topic-entity)
(assoc :headers (:headers message))
(assoc :metadata (:metadata message))))
(finally)))

(defn- join-streams
[oldest-processed-message-in-s topic-entity stream-1 stream-2]
Expand Down Expand Up @@ -187,7 +174,7 @@
{stream :stream} (reduce (partial join-streams oldest-processed-message-in-s topic-entity) stream-map)]
(->> stream
(header-transform-values)
(map-values #(traced-handler-fn handler-fn channels % topic-entity)))
(map-values #(mapped-handler-fn handler-fn channels % topic-entity)))
(.build builder))))

(defn- topology [handler-fn {:keys [origin-topic oldest-processed-message-in-s]} topic-entity channels]
Expand All @@ -198,7 +185,7 @@
(timestamp-transform-values topic-entity-name oldest-processed-message-in-s)
(header-transform-values)
(map-values #(log-and-report-metrics topic-entity-name %))
(map-values #(traced-handler-fn handler-fn channels % topic-entity)))
(map-values #(mapped-handler-fn handler-fn channels % topic-entity)))
(.build builder)))

(defn- start-stream* [handler-fn stream-config topic-entity channels]
Expand All @@ -209,8 +196,7 @@

(when-not (nil? top)
(KafkaStreams. ^Topology top
^Properties (properties stream-config)
(new TracingKafkaClientSupplier tracer)))))
^Properties (properties stream-config)))))

(defn- merge-consumer-type-config
[config]
Expand Down
92 changes: 0 additions & 92 deletions src/ziggurat/tracer.clj

This file was deleted.

15 changes: 2 additions & 13 deletions test/ziggurat/fixtures.clj
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,8 @@
[ziggurat.messaging.util :as util]
[ziggurat.metrics :as metrics]
[ziggurat.producer :as producer]
[ziggurat.server :refer [server]]
[ziggurat.tracer :as tracer])
(:import (io.opentracing.mock MockTracer)
(java.util Properties)
[ziggurat.server :refer [server]])
(:import (java.util Properties)
(org.apache.kafka.clients.consumer ConsumerConfig)
(org.apache.kafka.clients.producer ProducerConfig))
(:gen-class
Expand Down Expand Up @@ -67,14 +65,8 @@
(f)
(mount/stop #'metrics/statsd-reporter))

(defn mount-tracer []
(with-redefs [tracer/create-tracer (fn [] (MockTracer.))]
(-> (mount/only [#'tracer/tracer])
(mount/start))))

(defn mount-config-with-tracer [f]
(mount-config)
(mount-tracer)
(f)
(mount/stop))

Expand Down Expand Up @@ -119,8 +111,6 @@
(let [stream-routes {:default {:handler-fn #(constantly nil)
:channel-1 #(constantly nil)}}]
(mount-config)
(mount-tracer)

(->
(mount/only [#'producer-connection #'consumer-connection #'channel-pool])
(mount/with-args {:stream-routes stream-routes})
Expand Down Expand Up @@ -163,7 +153,6 @@

(defn mount-producer-with-config-and-tracer [f]
(mount-config)
(mount-tracer)
(mount-producer)
(binding [*bootstrap-servers* (get-in (config/ziggurat-config) [:stream-router :default :bootstrap-servers])]
(binding [*consumer-properties* (doto (Properties.)
Expand Down

0 comments on commit 900d010

Please sign in to comment.