Skip to content

Commit

Permalink
Handles scenario when input message does not have trace headers
Browse files Browse the repository at this point in the history
  • Loading branch information
roobalimsab committed Oct 8, 2019
1 parent f8ad7ab commit 848ef89
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 19 deletions.
1 change: 0 additions & 1 deletion project.clj
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
(defproject tech.gojek/ziggurat "3.0.0"
(defproject tech.gojek/ziggurat "3.0.1-alpha.1"
:description "A stream processing framework to build stateless applications on kafka"
:url "https://github.com/gojektech/ziggurat"
Expand Down
15 changes: 8 additions & 7 deletions src/ziggurat/streams.clj
Original file line number Diff line number Diff line change
Expand Up @@ -108,14 +108,15 @@

(defn- traced-handler-fn [handler-fn channels message topic-entity]
(let [parent-ctx (TracingKafkaUtils/extractSpanContext (:headers message) tracer)
span (-> tracer
(.buildSpan "Message-Handler")
(.asChildOf parent-ctx)
(.withTag (.getKey Tags/SPAN_KIND) Tags/SPAN_KIND_CONSUMER)
(.withTag (.getKey Tags/COMPONENT) "ziggurat")
(.start))]
span (as-> tracer t
(.buildSpan t "Message-Handler")
(.withTag t (.getKey Tags/SPAN_KIND) Tags/SPAN_KIND_CONSUMER)
(.withTag t (.getKey Tags/COMPONENT) "ziggurat")
(if (nil? parent-ctx)
t
(.asChildOf t parent-ctx))
(.start t))]
(try
(.activate (.scopeManager tracer) span)
((mapper-func handler-fn channels) (assoc (->MessagePayload (:value message) topic-entity) :headers (:headers message)))
(finally
(.finish span)))))
Expand Down
32 changes: 21 additions & 11 deletions test/ziggurat/init_test.clj
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
(ns ziggurat.init-test
(:require [clojure.test :refer :all]
[mount.core :refer [defstate] :as mount]
[ziggurat.config :as config]
[ziggurat.init :as init]
[ziggurat.messaging.connection :as rmqc]
[ziggurat.messaging.consumer :as messaging-consumer]
[ziggurat.messaging.producer :as messaging-producer]
[ziggurat.streams :as streams :refer [stream]]
[mount.core :refer [defstate]]
[ziggurat.server.test-utils :as tu]
[mount.core :as mount]))
[ziggurat.tracer :as tracer])
(:import (io.opentracing.mock MockTracer)))

(deftest start-calls-actor-start-fn-test
(testing "The actor start fn starts before the ziggurat state and can read config"
Expand All @@ -17,7 +18,8 @@
streams/stop-streams (constantly nil)
rmqc/start-connection (fn [] (reset! result (* @result 2)))
rmqc/stop-connection (constantly nil)
config/config-file "config.test.edn"]
config/config-file "config.test.edn"
tracer/create-tracer (fn [] (MockTracer.))]
(init/start #(reset! result (+ @result 3)) {} [] nil)
(init/stop #() nil)
(is (= 16 @result))))))
Expand All @@ -27,7 +29,8 @@
(let [result (atom 1)]
(with-redefs [streams/start-streams (constantly nil)
streams/stop-streams (fn [_] (reset! result (* @result 2)))
config/config-file "config.test.edn"]
config/config-file "config.test.edn"
tracer/create-tracer (fn [] (MockTracer.))]
(init/start #() {} [] nil)
(init/stop #(reset! result (+ @result 3)) nil)
(is (= 8 @result))))))
Expand All @@ -38,7 +41,8 @@
(with-redefs [streams/start-streams (constantly nil)
streams/stop-streams (constantly nil)
rmqc/stop-connection (fn [_] (reset! result (* @result 2)))
config/config-file "config.test.edn"]
config/config-file "config.test.edn"
tracer/create-tracer (fn [] (MockTracer.))]
(init/start #() {} [] nil)
(init/stop #(reset! result (+ @result 3)) nil)
(is (= 8 @result))))))
Expand All @@ -53,7 +57,8 @@
(swap! make-queues-called + 1)
(is (= stream-routes expected-stream-routes)))
messaging-consumer/start-subscribers (constantly nil)
config/config-file "config.test.edn"]
config/config-file "config.test.edn"
tracer/create-tracer (fn [] (MockTracer.))]
(init/start #() expected-stream-routes [] nil)
(init/stop #() nil)
(is (= 2 @make-queues-called))))))
Expand All @@ -68,7 +73,8 @@
(swap! start-subscriber-called + 1)
(is (= stream-routes expected-stream-routes)))
messaging-producer/make-queues (constantly nil)
config/config-file "config.test.edn"]
config/config-file "config.test.edn"
tracer/create-tracer (fn [] (MockTracer.))]
(init/start #() expected-stream-routes [] nil)
(init/stop #() nil)
(is (= 1 @start-subscriber-called))))))
Expand Down Expand Up @@ -118,7 +124,8 @@
(testing "The routes added by actor should be served along with ziggurat-routes"
(with-redefs [streams/start-streams (constantly nil)
streams/stop-streams (constantly nil)
config/config-file "config.test.edn"]
config/config-file "config.test.edn"
tracer/create-tracer (fn [] (MockTracer.))]
(init/start #() {} [["test-ping" (fn [_request] {:status 200
:body "pong"})]] nil)
(let [{:keys [status]} (tu/get (-> (config/ziggurat-config) :http-server :port) "/test-ping" true false)
Expand All @@ -131,7 +138,8 @@
(testing "Deadset management and server api modes should run both actor and deadset management routes"
(with-redefs [streams/start-streams (constantly nil)
streams/stop-streams (constantly nil)
config/config-file "config.test.edn"]
config/config-file "config.test.edn"
tracer/create-tracer (fn [] (MockTracer.))]
(init/start #() {} [["test-ping" (fn [_request] {:status 200
:body "pong"})]] [:management-api :api-server])
(let [{:keys [status]} (tu/get (-> (config/ziggurat-config) :http-server :port) "/test-ping" true false)
Expand All @@ -144,7 +152,8 @@
(testing "The routes not added by actor should return 404"
(with-redefs [streams/start-streams (constantly nil)
streams/stop-streams (constantly nil)
config/config-file "config.test.edn"]
config/config-file "config.test.edn"
tracer/create-tracer (fn [] (MockTracer.))]
(init/start #() {} [] nil)
(let [{:keys [status]} (tu/get (-> (config/ziggurat-config) :http-server :port) "/test-ping" true false)]
(init/stop #() nil)
Expand All @@ -153,7 +162,8 @@
(testing "The ziggurat routes should work fine when actor routes are not provided"
(with-redefs [streams/start-streams (constantly nil)
streams/stop-streams (constantly nil)
config/config-file "config.test.edn"]
config/config-file "config.test.edn"
tracer/create-tracer (fn [] (MockTracer.))]
(init/start #() {} [] nil)
(let [{:keys [status]} (tu/get (-> (config/ziggurat-config) :http-server :port) "/ping" true false)]
(init/stop #() nil)
Expand Down

0 comments on commit 848ef89

Please sign in to comment.