Skip to content

Commit

Permalink
Merge f012795 into d290281
Browse files Browse the repository at this point in the history
  • Loading branch information
theanirudhvyas committed Mar 19, 2020
2 parents d290281 + f012795 commit ad88ced
Show file tree
Hide file tree
Showing 8 changed files with 217 additions and 146 deletions.
2 changes: 1 addition & 1 deletion project.clj
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
(defproject tech.gojek/ziggurat "3.3.0-alpha.1"
(defproject tech.gojek/ziggurat "3.2.1"
:description "A stream processing framework to build stateless applications on kafka"
:url "https://github.com/gojektech/ziggurat"
:license {:name "Apache License, Version 2.0"
Expand Down
12 changes: 8 additions & 4 deletions src/ziggurat/mapper.clj
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,11 @@
(send-msg-to-channel channels message-payload return-code)
(metrics/multi-ns-increment-count multi-message-processing-namespaces success-metric additional-tags))))
(catch Throwable e
(producer/retry message-payload)
(sentry/report-error sentry-reporter e (str "Actor execution failed for " topic-entity-name))
(metrics/multi-ns-increment-count multi-message-processing-namespaces failure-metric additional-tags)))))))
(metrics/multi-ns-increment-count multi-message-processing-namespaces failure-metric additional-tags)
(case (:type (ex-data e))
:rabbitmq-publish-failure (throw e)
(producer/retry message-payload))))))))

(defn channel-mapper-func [mapper-fn channel]
(fn [{:keys [topic-entity message] :as message-payload}]
Expand Down Expand Up @@ -85,9 +87,11 @@
:block 'TODO
(throw (ex-info "Invalid mapper return code" {:code return-code}))))
(catch Throwable e
(producer/retry-for-channel message-payload channel)
(sentry/report-error sentry-reporter e (str "Channel execution failed for " topic-entity-name " and for channel " channel-name))
(metrics/multi-ns-increment-count multi-message-processing-namespaces failure-metric additional-tags)))))))
(metrics/multi-ns-increment-count multi-message-processing-namespaces failure-metric additional-tags)
(case (:type (ex-data e))
:rabbitmq-publish-failure (throw e)
(producer/retry-for-channel message-payload channel))))))))

(defrecord MessagePayload [message topic-entity])

Expand Down
4 changes: 3 additions & 1 deletion src/ziggurat/messaging/producer.clj
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,9 @@
(lb/publish ch exchange "" (nippy/freeze (dissoc message-payload :headers)) (properties-for-publish expiration (:headers message-payload)))))
(catch Throwable e
(sentry/report-error sentry-reporter e
"Pushing message to rabbitmq failed, data: " message-payload)))))
"Pushing message to rabbitmq failed, data: " message-payload)
(throw (ex-info "Pushing message to rabbitMQ failed after retries, data: " {:type :rabbitmq-publish-failure
:error e}))))))

(defn- retry-type []
(-> (ziggurat-config) :retry :type))
Expand Down
19 changes: 14 additions & 5 deletions src/ziggurat/streams.clj
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
:auto-offset-reset-config "latest"
:oldest-processed-message-in-s 604800
:changelog-topic-replication-factor 3
:session-timeout-ms-config 60000
:default-key-serde "org.apache.kafka.common.serialization.Serdes$ByteArraySerde"
:default-value-serde "org.apache.kafka.common.serialization.Serdes$ByteArraySerde"})

Expand Down Expand Up @@ -72,7 +73,8 @@
default-value-serde
key-deserializer-encoding
value-deserializer-encoding
deserializer-encoding]}]
deserializer-encoding
session-timeout-ms-config]}]
(validate-auto-offset-reset-config auto-offset-reset-config)
(doto (Properties.)
(.put StreamsConfig/APPLICATION_ID_CONFIG application-id)
Expand All @@ -85,6 +87,7 @@
(.put StreamsConfig/COMMIT_INTERVAL_MS_CONFIG commit-interval-ms)
(.put StreamsConfig/REPLICATION_FACTOR_CONFIG (int changelog-topic-replication-factor))
(.put ConsumerConfig/AUTO_OFFSET_RESET_CONFIG auto-offset-reset-config)
(.put ConsumerConfig/SESSION_TIMEOUT_MS_CONFIG (int session-timeout-ms-config))
(set-upgrade-from-config upgrade-from)
(set-encoding-config key-deserializer-encoding value-deserializer-encoding deserializer-encoding)))

Expand Down Expand Up @@ -132,6 +135,13 @@
(defn- header-transform-values [stream-builder]
(.transformValues stream-builder (header-transformer-supplier) (into-array [(.name (store-supplier-builder))])))

(declare stream)

(defn stop-streams [streams]
(log/debug "Stopping Kafka streams")
(doseq [stream streams]
(.close stream)))

(defn- traced-handler-fn [handler-fn channels message topic-entity]
(let [parent-ctx (TracingKafkaUtils/extractSpanContext (:headers message) tracer)
span (as-> tracer t
Expand All @@ -144,6 +154,9 @@
(.start t))]
(try
((mapper-func handler-fn channels) (assoc (->MessagePayload (:value message) topic-entity) :headers (:headers message)))
(catch Exception e
(log/error "Stopping Kafka Streams due to error: " e)
(stop-streams stream))
(finally
(.finish span)))))

Expand Down Expand Up @@ -181,10 +194,6 @@
[]
stream-routes)))

(defn stop-streams [streams]
(doseq [stream streams]
(.close stream)))

(defstate stream
:start (do (log/info "Starting Kafka stream")
(start-streams (:stream-routes (mount/args)) (ziggurat-config)))
Expand Down
42 changes: 40 additions & 2 deletions test/ziggurat/mapper_test.clj
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@
[ziggurat.mapper :refer :all]
[ziggurat.messaging.connection :refer [connection]]
[ziggurat.metrics :as metrics]
[ziggurat.util.rabbitmq :as rmq]))
[ziggurat.util.rabbitmq :as rmq]
[langohr.basic :as lb]))

(use-fixtures :once (join-fixtures [fix/init-rabbit-mq
fix/silence-logging]))
Expand Down Expand Up @@ -83,7 +84,7 @@
(is (= message-from-mq expected-message)))
(is @unsuccessfully-processed?)))))

(testing "message should raise exception"
(testing "reports error to sentry, publishes message to retry queue if mapper-fn raises exception"
(fix/with-queues stream-routes
(let [expected-message (assoc message-payload :retry-count (dec (:count (:retry (ziggurat-config)))))
sentry-report-fn-called? (atom false)
Expand All @@ -102,6 +103,24 @@
(is @unsuccessfully-processed?)
(is @sentry-report-fn-called?)))))

(testing "raises exception when rabbitMQ publishing fails"
(let [expected-message (assoc message-payload :retry-count (dec (:count (:retry (ziggurat-config)))))
sentry-report-fn-called? (atom false)
unsuccessfully-processed? (atom false)
expected-metric "failure"]
(with-redefs [sentry-report (fn [_ _ _ & _] (reset! sentry-report-fn-called? true))
metrics/increment-count (fn [metric-namespace metric additional-tags]
(when (and (or (= metric-namespace [service-name topic-entity expected-metric-namespace])
(= metric-namespace [expected-metric-namespace]))
(= metric expected-metric)
(= additional-tags expected-additional-tags))
(reset! unsuccessfully-processed? true)))
lb/publish (fn [_ _ _ _ _]
(throw (Exception. "publishing failure test")))]
(is (thrown? clojure.lang.ExceptionInfo ((mapper-func (constantly nil) []) message-payload)))
(is @unsuccessfully-processed?)
(is @sentry-report-fn-called?))))

(testing "reports execution time with topic prefix"
(let [reported-execution-time? (atom false)
expected-metric-namespace "handler-fn-execution-time"
Expand Down Expand Up @@ -174,6 +193,25 @@
(is (= message-from-mq expected-message)))
(is @unsuccessfully-processed?)
(is @sentry-report-fn-called?)))))

(testing "raises exception when rabbitMQ publishing fails"
(let [expected-message (assoc message-payload :retry-count (dec (:count (:retry (ziggurat-config)))))
sentry-report-fn-called? (atom false)
unsuccessfully-processed? (atom false)
expected-metric "failure"]
(with-redefs [sentry-report (fn [_ _ _ & _] (reset! sentry-report-fn-called? true))
metrics/increment-count (fn [metric-namespace metric additional-tags]
(when (and (or (= metric-namespace expected-increment-count-namespaces)
(= metric-namespace [increment-count-namespace]))
(= metric expected-metric)
(= additional-tags expected-additional-tags))
(reset! unsuccessfully-processed? true)))
lb/publish (fn [_ _ _ _ _]
(throw (Exception. "publishing failure test")))]
(is (thrown? clojure.lang.ExceptionInfo ((channel-mapper-func (fn [_] (throw (Exception. "test exception"))) channel) message-payload)))
(is @unsuccessfully-processed?)
(is @sentry-report-fn-called?))))

(testing "reports execution time with topic prefix"
(let [reported-execution-time? (atom false)
execution-time-namespace "execution-time"
Expand Down
10 changes: 7 additions & 3 deletions test/ziggurat/messaging/producer_test.clj
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@
[langohr.basic :as lb]
[ziggurat.config :as config]
[ziggurat.tracer :refer [tracer]]
[ziggurat.mapper :refer [->MessagePayload]])
[ziggurat.mapper :refer [->MessagePayload]]
[mount.core :as mount])
(:import [org.apache.kafka.common.header.internals RecordHeaders RecordHeader]))

(use-fixtures :once (join-fixtures [fix/init-rabbit-mq
Expand Down Expand Up @@ -146,7 +147,7 @@
(with-redefs [lb/publish (fn [_ _ _ _ props]
(swap! retry-count inc)
(throw (Exception. "some exception")))]
(producer/retry retry-message-payload)
(is (thrown? clojure.lang.ExceptionInfo (producer/retry retry-message-payload)))
(is (= 6 @retry-count))))))

(testing "message with no retry count will publish to delay queue"
Expand Down Expand Up @@ -507,7 +508,10 @@
(reset! publish-called? true)))]
(producer/publish-to-instant-queue retry-message-payload)
(is (true? @prefixed-queue-name-called?))
(is (true? @publish-called?))))))
(is (true? @publish-called?)))))
(testing "An exception is raised, if publishing to RabbitMQ fails even after retries"
(mount/stop #'ziggurat.messaging.connection/connection)
(is (thrown? clojure.lang.ExceptionInfo (producer/publish-to-instant-queue message-payload)))))

(deftest publish-to-delay-queue-test
(testing "creates a span when tracer is enabled"
Expand Down
Loading

0 comments on commit ad88ced

Please sign in to comment.