-
Notifications
You must be signed in to change notification settings - Fork 64
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Adds open tracing support #89
Conversation
@roobalimsab Can you add some details to the PR description, to give us some context? |
11242ec
to
a0f425f
Compare
src/ziggurat/mapper.clj
Outdated
|
||
(defn mapper-func [mapper-fn channels] | ||
(defn mapper-func [mapper-fn channels headers] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@roobalimsab Why can't we have headers in message-payload?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
added
|
||
To enable tracing, the following config needs to be added to the `config.edn` under `:ziggurat` key. | ||
|
||
```clojure |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this is not clojure format, edn format :p
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the configuration below also has the same format. thats why used it. should i change it to :edn?
src/ziggurat/streams.clj
Outdated
(.start))] | ||
(try | ||
(.activate (.scopeManager tracer) span) | ||
((mapper-func handler-fn channels (:headers message-payload)) (->MessagePayload (:value message-payload) topic-entity)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@roobalimsab You could have put (:headers message-payload)
in MessagePayload
itself. That way you would not have needed to add an extra headers
parameter everywhere in the code.
@theanirudhvyas WDYT?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we did not want to mess up with the existing message payload. we were not sure if the message payload will already have any headers. that is why kept it separate. if you don't see any issues, then we could merge it
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the message-payload does not have any headers, you can merge it. Also, instead of calling it headers why not call it what it exactly is - tracing-ids
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
those are the kafka headers that we are passing across. hence the name
src/ziggurat/messaging/consumer.clj
Outdated
@@ -95,7 +95,7 @@ | |||
(start-subscriber* (lch/open connection) | |||
(get-in-config [:jobs :instant :prefetch-count]) | |||
(prefixed-queue-name topic-entity (get-in-config [:rabbit-mq :instant :queue-name])) | |||
(mpr/mapper-func mapper-fn channels) | |||
(mpr/mapper-func mapper-fn channels {}) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If we are adding trace information to the rabbitmq headers, why are we not passing those to the mapper-func?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it only needs to be added when the message is put in rabbit the very first time. from there on it gets carried forward. so it is not needed in the subscriber. the producer will take care of it
4f6c4a8
to
6da4206
Compare
@@ -80,7 +80,7 @@ | |||
(sentry/report-error sentry-reporter e (str "Channel execution failed for " topic-entity-name " and for channel " channel-name)) | |||
(metrics/increment-count default-namespace failure-metric additional-tags))))))) | |||
|
|||
(defrecord MessagePayload [message topic-entity]) | |||
(defrecord MessagePayload [message topic-entity headers]) | |||
|
|||
(s/defschema message-payload-schema |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
since you are adding headers to messagePayload, the schema will also change
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the schema is used to validate the message at the consumer end right? we are not passing headers anywhere in the consumers. thats why did not add it here
src/ziggurat/streams.clj
Outdated
(.activate (.scopeManager tracer) span) | ||
((mapper-func handler-fn channels) (->MessagePayload (:value message-payload) topic-entity (:headers message-payload))) | ||
(catch Exception e | ||
(log/error "Exception while executing handler function " e)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why is there a generic exception catch here? This will lead to skipping the message and data loss right?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
oh ya. this was added for testing the error scenario. will remove the catch
src/ziggurat/streams.clj
Outdated
(.buildSpan "Message-Handler") | ||
(.asChildOf parent-ctx) | ||
(.withTag (.getKey Tags/SPAN_KIND) Tags/SPAN_KIND_CONSUMER) | ||
(.withTag (.getKey Tags/COMPONENT) "lambda") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
what does this tag signify?? shouldn't it be ziggurat
instead of lambda
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this is just to classify the consumers. yes it should be ziggurat.
src/ziggurat/streams.clj
Outdated
(.start))] | ||
(try | ||
(.activate (.scopeManager tracer) span) | ||
((mapper-func handler-fn channels) (->MessagePayload (:value message-payload) topic-entity (:headers message-payload))) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
instead of (:value message-payload)
it should be (:value message)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the same with headers
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
renamed
src/ziggurat/streams.clj
Outdated
@@ -92,6 +97,22 @@ | |||
additional-tags {:topic_name topic-entity-name}] | |||
(.transform stream-builder (transformer-supplier metric-namespace oldest-processed-message-in-s additional-tags) (into-array [(.name (store-supplier-builder))])))) | |||
|
|||
(defn- traced-handler-fn [handler-fn channels message-payload topic-entity] | |||
(let [parent-ctx (TracingKafkaUtils/extractSpanContext (:headers message-payload) tracer) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
right now the headers contain only the tracing headers. Will this still work when other information is added to the kafka headers?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yes this will work with all kafka headers
@@ -80,7 +80,7 @@ | |||
(sentry/report-error sentry-reporter e (str "Channel execution failed for " topic-entity-name " and for channel " channel-name)) | |||
(metrics/increment-count default-namespace failure-metric additional-tags))))))) | |||
|
|||
(defrecord MessagePayload [message topic-entity]) | |||
(defrecord MessagePayload [message topic-entity headers]) | |||
|
|||
(s/defschema message-payload-schema | |||
{:message s/Any |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
you'll also need to change the defschema as you changed the MessagePayload
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is it mandatory? because we dont set it in consumers
src/ziggurat/messaging/consumer.clj
Outdated
@@ -21,7 +21,7 @@ | |||
(catch Exception e | |||
(log/info "old message format read, converting to message-payload: " message) | |||
(let [retry-count (:retry-count message) | |||
message-payload (mpr/->MessagePayload (dissoc message :retry-count) topic-entity)] | |||
message-payload (mpr/->MessagePayload (dissoc message :retry-count) topic-entity {})] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why are we passing empty {} here? why not have headers as an optional parameter?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ok have added that as optional param to the schema
src/ziggurat/messaging/producer.clj
Outdated
@@ -99,7 +106,7 @@ | |||
(let [{:keys [exchange-name]} (:instant (rabbitmq-config)) | |||
topic-entity (:topic-entity message-payload) | |||
exchange-name (prefixed-channel-name topic-entity channel exchange-name)] | |||
(publish exchange-name message-payload))) | |||
(publish exchange-name message-payload nil))) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
instead of passing nil, make it a multi-arity function
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this was already there. removed the nil
@@ -25,7 +25,7 @@ | |||
(let [message-time (.timestamp processor-context)] | |||
(when (message-to-process? message-time oldest-processed-message-in-s) | |||
(calculate-and-report-kafka-delay metric-namespace message-time additional-tags) | |||
(KeyValue/pair record-key record-value)))) | |||
(KeyValue/pair record-key {:value record-value :headers (.headers processor-context)})))) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
IMO this message transformation should happen in streams.clj and not in timestamp_transformer
. It does not make sense to me.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
have extracted out a separate header_transformer.
@@ -31,6 +33,17 @@ | |||
(f) | |||
(mount/stop)) | |||
|
|||
(defn mount-tracer [] | |||
(with-redefs [tracer/create-tracer (fn [] (MockTracer.))] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
instead of a mockTracer why can we not have a noOp tracer here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
mockTracer provides utility methods to check the number of spans completed in the test and so on. hence it is used to test the spans and corresponding tags
9ca8a45
to
848ef89
Compare
848ef89
to
855ab23
Compare
This PR is to add open tracing support to Ziggurat. Open tracing enables to identify the time spent at various phases of the program flow.
Hence if the env variables are set for tracer, the entire behaviour of the handler function will be traced.
Tracing has been added to 3 flows as part of this PR: