Skip to content

Commit

Permalink
Merge 87c4515 into f47c522
Browse files Browse the repository at this point in the history
  • Loading branch information
theanirudhvyas committed Dec 2, 2019
2 parents f47c522 + 87c4515 commit f446f9f
Show file tree
Hide file tree
Showing 14 changed files with 177 additions and 291 deletions.
22 changes: 1 addition & 21 deletions resources/config.test.ci.edn
Original file line number Diff line number Diff line change
Expand Up @@ -35,27 +35,7 @@
:upgrade-from "1.1"
:channels {:channel-1 {:worker-count [10 :int]
:retry {:count [5 :int]
:enabled [true :bool]}}
:channel-delay {:worker-count [10 :int]
:retry {:count [5 :int]
:enabled [true :bool]}
:channel-delay-ms [1000 :int]}
:linear-retry {:worker-count [10 :int]
:retry {:count [5 :int]
:enabled [true :bool]
:queue-timeout-ms [2000 :int]}}
:exponential-retry {:worker-count [10 :int]
:retry {:count [5 :int]
:enabled [true :bool]
:queue-timeout-ms [1000 :int]
:exponential-backoff-enabled [true :bool]}}
:channel-exponential-retry {:worker-count [10 :int]
:retry {:count [5 :int]
:enabled [true :bool]
:exponential-backoff-enabled [true :bool]}}
:channel-no-retry-count {:worker-count [10 :int]
:retry {:enabled [true :bool]
:exponential-backoff-enabled [true :bool]}}}
:enabled [true :bool]}}}
:producer {:bootstrap-servers "localhost:9092"
:acks "all"
:retries-config 5
Expand Down
22 changes: 1 addition & 21 deletions resources/config.test.edn
Original file line number Diff line number Diff line change
Expand Up @@ -35,27 +35,7 @@
:upgrade-from "1.1"
:channels {:channel-1 {:worker-count [10 :int]
:retry {:count [5 :int]
:enabled [true :bool]}}
:channel-delay {:worker-count [10 :int]
:retry {:count [5 :int]
:enabled [true :bool]}
:channel-delay-ms [1000 :int]}
:linear-retry {:worker-count [10 :int]
:retry {:count [5 :int]
:enabled [true :bool]
:queue-timeout-ms [2000 :int]}}
:exponential-retry {:worker-count [10 :int]
:retry {:count [5 :int]
:enabled [true :bool]
:queue-timeout-ms [1000 :int]
:exponential-backoff-enabled [true :bool]}}
:channel-exponential-retry {:worker-count [10 :int]
:retry {:count [5 :int]
:enabled [true :bool]
:exponential-backoff-enabled [true :bool]}}
:channel-no-retry-count {:worker-count [10 :int]
:retry {:enabled [true :bool]
:exponential-backoff-enabled [true :bool]}}}
:enabled [true :bool]}}}
:producer {:bootstrap-servers "localhost:9092"
:acks "all"
:retries-config 5
Expand Down
14 changes: 8 additions & 6 deletions src/ziggurat/kafka_delay.clj
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,11 @@
[ziggurat.util.time :refer :all]))

(defn calculate-and-report-kafka-delay
([metric-namespace record-timestamp]
(calculate-and-report-kafka-delay metric-namespace record-timestamp nil))
([metric-namespace record-timestamp additional-tags]
(let [now-millis (get-current-time-in-millis)
delay (- now-millis record-timestamp)]
(metrics/report-histogram metric-namespace delay additional-tags))))
([metric-namespaces record-timestamp]
(calculate-and-report-kafka-delay metric-namespaces record-timestamp nil))
([metric-namespaces record-timestamp additional-tags]
(let [now-millis (get-current-time-in-millis)
delay (- now-millis record-timestamp)
default-namespace (last metric-namespaces)
multi-namespaces [metric-namespaces [default-namespace]]]
(metrics/multi-ns-report-histogram multi-namespaces delay additional-tags))))
96 changes: 50 additions & 46 deletions src/ziggurat/mapper.clj
Original file line number Diff line number Diff line change
Expand Up @@ -12,77 +12,81 @@
(defn- send-msg-to-channel [channels message-payload return-code]
(when-not (contains? (set channels) return-code)
(throw (ex-info "Invalid mapper return code" {:code return-code})))
(let [topic-entity (:topic-entity message-payload)
channel-delay-ms (get-in (ziggurat-config) [:stream-router topic-entity :channels return-code :channel-delay-ms])]
(if channel-delay-ms
(producer/publish-to-channel-custom-delay-queue return-code message-payload channel-delay-ms)
(producer/publish-to-channel-instant-queue return-code message-payload))))
(producer/publish-to-channel-instant-queue return-code message-payload))

(defn mapper-func [mapper-fn channels]
(fn [{:keys [topic-entity message] :as message-payload}]
(let [topic-entity-name (name topic-entity)
new-relic-transaction-name (str topic-entity-name ".handler-fn")
default-namespace "message-processing"
additional-tags {:topic_name topic-entity-name}
success-metric "success"
retry-metric "retry"
skip-metric "skip"
failure-metric "failure"]
(let [service-name (:app-name (ziggurat-config))
topic-entity-name (name topic-entity)
new-relic-transaction-name (str topic-entity-name ".handler-fn")
message-processing-namespace "message-processing"
message-processing-namespaces [service-name topic-entity-name message-processing-namespace]
additional-tags {:topic_name topic-entity-name}
success-metric "success"
retry-metric "retry"
skip-metric "skip"
failure-metric "failure"
multi-message-processing-namespaces [message-processing-namespaces [message-processing-namespace]]]
(nr/with-tracing "job" new-relic-transaction-name
(try
(let [start-time (.toEpochMilli (Instant/now))
return-code (mapper-fn message)
end-time (.toEpochMilli (Instant/now))
time-val (- end-time start-time)
execution-time-namespace "handler-fn-execution-time"]
(metrics/report-histogram execution-time-namespace time-val additional-tags)
(let [start-time (.toEpochMilli (Instant/now))
return-code (mapper-fn message)
end-time (.toEpochMilli (Instant/now))
time-val (- end-time start-time)
execution-time-namespace "handler-fn-execution-time"
multi-execution-time-namespaces [[service-name topic-entity-name execution-time-namespace]
[execution-time-namespace]]]
(metrics/multi-ns-report-histogram multi-execution-time-namespaces time-val additional-tags)
(case return-code
:success (metrics/increment-count default-namespace success-metric additional-tags)
:retry (do (metrics/increment-count default-namespace retry-metric additional-tags)
:success (metrics/multi-ns-increment-count multi-message-processing-namespaces success-metric additional-tags)
:retry (do (metrics/multi-ns-increment-count multi-message-processing-namespaces retry-metric additional-tags)
(producer/retry message-payload))
:skip (metrics/increment-count default-namespace skip-metric additional-tags)
:skip (metrics/multi-ns-increment-count multi-message-processing-namespaces skip-metric additional-tags)
:block 'TODO
(do
(send-msg-to-channel channels message-payload return-code)
(metrics/increment-count default-namespace success-metric additional-tags))))
(metrics/multi-ns-increment-count multi-message-processing-namespaces success-metric additional-tags))))
(catch Throwable e
(producer/retry message-payload)
(sentry/report-error sentry-reporter e (str "Actor execution failed for " topic-entity-name))
(metrics/increment-count default-namespace failure-metric additional-tags)))))))
(metrics/multi-ns-increment-count multi-message-processing-namespaces failure-metric additional-tags)))))))

(defn channel-mapper-func [mapper-fn channel]
(fn [{:keys [topic-entity message] :as message-payload}]
(let [service-name (:app-name (ziggurat-config))
topic-entity-name (name topic-entity)
channel-name (name channel)
default-namespace "message-processing"
base-namespaces [service-name topic-entity-name channel-name]
metric-namespaces (conj base-namespaces default-namespace)
additional-tags {:topic_name topic-entity-name :channel_name channel-name}
metric-namespace (str/join "." metric-namespaces)
success-metric "success"
retry-metric "retry"
skip-metric "skip"
failure-metric "failure"]
(let [service-name (:app-name (ziggurat-config))
topic-entity-name (name topic-entity)
channel-name (name channel)
message-processing-namespace "message-processing"
base-namespaces [service-name topic-entity-name channel-name]
message-processing-namespaces (conj base-namespaces message-processing-namespace)
additional-tags {:topic_name topic-entity-name :channel_name channel-name}
metric-namespace (str/join "." message-processing-namespaces)
success-metric "success"
retry-metric "retry"
skip-metric "skip"
failure-metric "failure"
multi-message-processing-namespaces [message-processing-namespaces [message-processing-namespace]]]
(nr/with-tracing "job" metric-namespace
(try
(let [start-time (.toEpochMilli (Instant/now))
return-code (mapper-fn message)
end-time (.toEpochMilli (Instant/now))
time-val (- end-time start-time)
execution-time-namespace "execution-time"]
(metrics/report-histogram execution-time-namespace time-val additional-tags)
(let [start-time (.toEpochMilli (Instant/now))
return-code (mapper-fn message)
end-time (.toEpochMilli (Instant/now))
time-val (- end-time start-time)
execution-time-namespace "execution-time"
multi-execution-time-namespace [(conj base-namespaces execution-time-namespace)
[execution-time-namespace]]]
(metrics/multi-ns-report-histogram multi-execution-time-namespace time-val additional-tags)
(case return-code
:success (metrics/increment-count default-namespace success-metric additional-tags)
:retry (do (metrics/increment-count default-namespace retry-metric additional-tags)
:success (metrics/multi-ns-increment-count multi-message-processing-namespaces success-metric additional-tags)
:retry (do (metrics/multi-ns-increment-count multi-message-processing-namespaces retry-metric additional-tags)
(producer/retry-for-channel message-payload channel))
:skip (metrics/increment-count default-namespace skip-metric additional-tags)
:skip (metrics/multi-ns-increment-count multi-message-processing-namespaces skip-metric additional-tags)
:block 'TODO
(throw (ex-info "Invalid mapper return code" {:code return-code}))))
(catch Throwable e
(producer/retry-for-channel message-payload channel)
(sentry/report-error sentry-reporter e (str "Channel execution failed for " topic-entity-name " and for channel " channel-name))
(metrics/increment-count default-namespace failure-metric additional-tags)))))))
(metrics/multi-ns-increment-count multi-message-processing-namespaces failure-metric additional-tags)))))))

(defrecord MessagePayload [message topic-entity])

Expand Down
33 changes: 0 additions & 33 deletions src/ziggurat/messaging/producer.clj
Original file line number Diff line number Diff line change
Expand Up @@ -96,41 +96,8 @@
(defn- get-channel-retry-count [topic-entity channel]
(-> (ziggurat-config) :stream-router topic-entity :channels channel :retry :count))

(defn- get-channel-retry-queue-timeout-ms [topic-entity channel]
(-> (ziggurat-config) :stream-router topic-entity :channels channel :retry :queue-timeout-ms))

(defn- channel-retries-exponential-backoff-enabled [topic-entity channel]
"Get exponential backoff enabled for specific channel from config."
(-> (ziggurat-config) :stream-router topic-entity :channels channel :retry :exponential-backoff-enabled))

(defn- get-exponential-backoff [retry-count message-retry-count queue-timeout-ms]
"Get queue timeout value when exponential backoff is enabled."
(int (* (dec (Math/pow 2 (- retry-count message-retry-count))) queue-timeout-ms)))

(defn get-queue-timeout-ms [topic-entity channel message-payload]
"Get queue timeout from config. It will use channel `queue-timeout-ms` if defined, otherwise it will use rabbitmq delay `queue-timeout-ms`.
If `exponential-backoff-enabled` is true, `queue-timeout-ms` will be exponential with formula `(2^n)-1`, where `n` is message retry-count.
_NOTE: Exponential backoff for channel retries is an experimental feature. It should not be used until released in a stable version._"
(let [queue-timeout-ms (-> (rabbitmq-config) :delay :queue-timeout-ms)
channel-queue-timeout-ms (get-channel-retry-queue-timeout-ms topic-entity channel)
exponential-backoff-enabled (channel-retries-exponential-backoff-enabled topic-entity channel)]
(if exponential-backoff-enabled
(let [retry-count (-> (ziggurat-config) :retry :count)
channel-retry-count (get-channel-retry-count topic-entity channel)
message-retry-count (:retry-count message-payload)]
(get-exponential-backoff (or channel-retry-count retry-count) message-retry-count (or channel-queue-timeout-ms queue-timeout-ms)))
(or channel-queue-timeout-ms queue-timeout-ms))))

(defn publish-to-channel-delay-queue [channel message-payload]
(let [{:keys [exchange-name queue-timeout-ms]} (:delay (rabbitmq-config))
topic-entity (:topic-entity message-payload)
exchange-name (prefixed-channel-name topic-entity channel exchange-name)
queue-timeout-ms (get-queue-timeout-ms topic-entity channel message-payload)]
(publish exchange-name message-payload queue-timeout-ms)))

(defn publish-to-channel-custom-delay-queue [channel message-payload queue-timeout-ms]
(let [{:keys [exchange-name]} (:delay (rabbitmq-config))
topic-entity (:topic-entity message-payload)
exchange-name (prefixed-channel-name topic-entity channel exchange-name)]
(publish exchange-name message-payload queue-timeout-ms)))
Expand Down
Loading

0 comments on commit f446f9f

Please sign in to comment.