-
Notifications
You must be signed in to change notification settings - Fork 64
/
producer.clj
365 lines (317 loc) · 19.6 KB
/
producer.clj
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
(ns ziggurat.messaging.producer
(:require [clojure.tools.logging :as log]
[langohr.basic :as lb]
[langohr.channel :as lch]
[langohr.exchange :as le]
[langohr.http :as lh]
[langohr.queue :as lq]
[ziggurat.messaging.channel_pool :as cpool]
[taoensso.nippy :as nippy]
[ziggurat.config :refer [config ziggurat-config rabbitmq-config channel-retry-config]]
[ziggurat.messaging.connection :refer [producer-connection is-connection-required?]]
[ziggurat.messaging.util :as util]
[ziggurat.metrics :as metrics])
(:import (com.rabbitmq.client AlreadyClosedException Channel)
(java.io IOException)
(org.apache.commons.pool2.impl GenericObjectPool)))
(def MAX_EXPONENTIAL_RETRIES 25)
(defn delay-queue-name [topic-entity queue-name]
(util/prefixed-queue-name topic-entity queue-name))
(defn get-replica-count [host-count]
(int (Math/ceil (/ host-count 2))))
(defn get-default-ha-policy [rmq-config replica-count]
(let [ha-mode (get rmq-config :ha-mode "exactly")
ha-params (get rmq-config :ha-params replica-count)
ha-sync-mode (get rmq-config :ha-sync-mode "automatic")]
(if (= "all" ha-mode)
{:ha-mode "all" :ha-sync-mode ha-sync-mode}
{:ha-mode ha-mode :ha-sync-mode ha-sync-mode :ha-params ha-params})))
(defn set-ha-policy-on-host [host-endpoint username password ha-policy-body exchange-name queue-name]
(try
(binding [lh/*endpoint* host-endpoint
lh/*username* username
lh/*password* password]
(log/info "applying HA policies to queue: " queue-name)
(log/info "applying HA policies to exchange: " exchange-name)
(lh/set-policy "/" (str queue-name "_ha_policy")
{:apply-to "all"
:pattern (str "^" queue-name "|" exchange-name "$")
:definition ha-policy-body}))
(catch Exception e
(log/error "error setting ha-policies" (.getMessage e))
nil)))
(defn set-ha-policy [queue-name exchange-name rmq-config]
(let [username (:username rmq-config)
password (:password rmq-config)
hosts-vec (util/list-of-hosts rmq-config)
ha-policy-body (get-default-ha-policy rmq-config (get-replica-count (count hosts-vec)))]
(loop [hosts hosts-vec]
(let [host-endpoint (str "http://" (first hosts) ":" (get rmq-config :admin-port 15672))
resp (set-ha-policy-on-host host-endpoint username password ha-policy-body exchange-name queue-name)
remaining-hosts (rest hosts)]
(when (and (nil? resp) (pos? (count remaining-hosts)))
(recur remaining-hosts))))))
(defn- declare-exchange [ch exchange]
(le/declare ch exchange "fanout" {:durable true :auto-delete false})
(log/info "Declared exchange - " exchange))
(defn- create-queue [queue props ch]
(lq/declare ch queue {:durable true :arguments props :auto-delete false})
(log/info "Created queue - " queue))
(defn- bind-queue-to-exchange [ch queue exchange]
(lq/bind ch queue exchange)
(log/infof "Bound queue %s to exchange %s" queue exchange))
(defn- create-and-bind-queue
([queue-name exchange]
(create-and-bind-queue queue-name exchange nil))
([queue-name exchange-name dead-letter-exchange]
(try
(let [props (if dead-letter-exchange
{"x-dead-letter-exchange" dead-letter-exchange}
{})]
(with-open [ch (lch/open producer-connection)]
(create-queue queue-name props ch)
(declare-exchange ch exchange-name)
(bind-queue-to-exchange ch queue-name exchange-name)
(set-ha-policy queue-name exchange-name (get-in config [:ziggurat :rabbit-mq-connection])) ;TODO remove this
))
(catch Exception e
(log/error e "Error while declaring RabbitMQ queues")
(throw e)))))
(defn- record-headers->map [record-headers]
(reduce (fn [header-map record-header]
(assoc header-map (.key record-header) (String. (.value record-header))))
{}
record-headers))
(defn- properties-for-publish
[expiration headers]
(let [props {:content-type "application/octet-stream"
:persistent true
:headers (record-headers->map headers)}]
(if expiration
(assoc props :expiration (str expiration))
props)))
(defn- handle-network-exception
[e message-payload]
(log/error e "Exception was encountered while publishing to RabbitMQ")
(metrics/increment-count ["rabbitmq" "publish" "network"] "exception" {:topic-entity (name (:topic-entity message-payload))})
true)
(defn return-to-pool [^GenericObjectPool pool ^Channel ch]
(.returnObject pool ch))
(defn- publish-internal
[exchange message-payload expiration]
(try
(let [ch (.borrowObject cpool/channel-pool)]
(try
(lb/publish ch exchange "" (nippy/freeze (dissoc message-payload :headers))
(properties-for-publish expiration (:headers message-payload)))
false
(catch AlreadyClosedException e
(handle-network-exception e message-payload))
(catch IOException e
(handle-network-exception e message-payload))
(catch Exception e
(log/error e "Exception was encountered while publishing to RabbitMQ")
(metrics/increment-count ["rabbitmq" "publish"] "exception" {:topic-entity (name (:topic-entity message-payload))})
false)
(finally (return-to-pool cpool/channel-pool ch))))
(catch Exception e
(log/error "Exception occurred while borrowing a channel from the pool")
(metrics/increment-count ["rabbitmq" "publish" "channel_borrow"] {:topic-entity (name (:topic-entity message-payload))})
true ;TODO Evaluate whether message needs to be retried in case of borrow exceptions
)))
(defn publish
([exchange message-payload]
(publish exchange message-payload nil))
([exchange message-payload expiration]
(when (publish-internal exchange message-payload expiration)
(Thread/sleep 5000)
(log/info "Retrying publishing the message to " exchange)
(recur exchange message-payload expiration))))
(defn- retry-type []
(-> (ziggurat-config) :retry :type))
(defn- channel-retries-enabled [topic-entity channel]
(:enabled (channel-retry-config topic-entity channel)))
(defn- channel-retry-type [topic-entity channel]
(:type (channel-retry-config topic-entity channel)))
(defn- get-channel-retry-count [topic-entity channel]
(:count (channel-retry-config topic-entity channel)))
(defn- get-channel-queue-timeout-or-default-timeout [topic-entity channel]
(let [channel-queue-timeout-ms (:queue-timeout-ms (channel-retry-config topic-entity channel))
queue-timeout-ms (get-in (rabbitmq-config) [:delay :queue-timeout-ms])]
(or channel-queue-timeout-ms queue-timeout-ms)))
(defn- get-backoff-exponent
"Calculates the exponent using the formula `retry-count` and `message-retry-count`, where `retry-count` is the total retries
possible and `message-retry-count` is the count of retries available for the message.
Caps the value of `retry-count` to MAX_EXPONENTIAL_RETRIES.
Returns 1, if `message-retry-count` is higher than `max(MAX_EXPONENTIAL_RETRIES, retry-count)`."
[retry-count message-retry-count]
(let [exponent (- (min MAX_EXPONENTIAL_RETRIES retry-count) message-retry-count)]
(max 1 exponent)))
(defn- get-exponential-backoff-timeout-ms "Calculates the exponential timeout value from the number of max retries possible (`retry-count`),
the number of retries available for a message (`message-retry-count`) and base timeout value (`queue-timeout-ms`).
It uses this formula `((2^n)-1)*queue-timeout-ms`, where `n` is the current message retry-count.
Sample config to use exponential backoff:
{:ziggurat {:retry {:enabled true
:count 5
:type :exponential}}}
Sample config to use exponential backoff when using channel flow:
{:ziggurat {:stream-router {topic-entity {:channels {channel {:retry {:count 5
:enabled true
:queue-timeout-ms 1000
:type :exponential}}}}}}}
_NOTE: Exponential backoff for channel retries is an experimental feature. It should not be used until released in a stable version._"
[retry-count message-retry-count queue-timeout-ms]
(let [exponential-backoff (get-backoff-exponent retry-count message-retry-count)]
(long (* (dec (Math/pow 2 exponential-backoff)) queue-timeout-ms))))
(defn get-queue-timeout-ms
"Calculate queue timeout for delay queue. Uses the value from [[get-exponential-backoff-timeout-ms]] if exponential backoff enabled."
[message-payload]
(let [queue-timeout-ms (-> (rabbitmq-config) :delay :queue-timeout-ms)
retry-count (-> (ziggurat-config) :retry :count)
message-retry-count (:retry-count message-payload)]
(if (= :exponential (-> (ziggurat-config) :retry :type))
(get-exponential-backoff-timeout-ms retry-count message-retry-count queue-timeout-ms)
queue-timeout-ms)))
(defn get-channel-queue-timeout-ms
"Calculate queue timeout for channel delay queue. Uses the value from [[get-exponential-backoff-timeout-ms]] if exponential backoff enabled."
[topic-entity channel message-payload]
(let [channel-queue-timeout-ms (get-channel-queue-timeout-or-default-timeout topic-entity channel)
message-retry-count (:retry-count message-payload)
channel-retry-count (get-channel-retry-count topic-entity channel)]
(if (= :exponential (channel-retry-type topic-entity channel))
(get-exponential-backoff-timeout-ms channel-retry-count message-retry-count channel-queue-timeout-ms)
channel-queue-timeout-ms)))
(defn get-delay-exchange-name
"This function return delay exchange name for retry when using flow without channel. It will return exchange name with retry count as suffix if exponential backoff enabled."
[topic-entity message-payload]
(let [{:keys [exchange-name]} (:delay (rabbitmq-config))
exchange-name (util/prefixed-queue-name topic-entity exchange-name)
retry-count (-> (ziggurat-config) :retry :count)]
(if (= :exponential (-> (ziggurat-config) :retry :type))
(let [message-retry-count (:retry-count message-payload)
backoff-exponent (get-backoff-exponent retry-count message-retry-count)]
(util/prefixed-queue-name exchange-name backoff-exponent))
exchange-name)))
(defn get-channel-delay-exchange-name
"This function return delay exchange name for retry when using channel flow. It will return exchange name with retry count as suffix if exponential backoff enabled."
[topic-entity channel message-payload]
(let [{:keys [exchange-name]} (:delay (rabbitmq-config))
exchange-name (util/prefixed-channel-name topic-entity channel exchange-name)
channel-retry-count (get-channel-retry-count topic-entity channel)]
(if (= :exponential (channel-retry-type topic-entity channel))
(let [message-retry-count (:retry-count message-payload)
exponential-backoff (get-backoff-exponent channel-retry-count message-retry-count)]
(str (name exchange-name) "_" exponential-backoff))
exchange-name)))
(defn publish-to-delay-queue [message-payload]
(let [topic-entity (:topic-entity message-payload)
exchange-name (get-delay-exchange-name topic-entity message-payload)
queue-timeout-ms (get-queue-timeout-ms message-payload)]
(publish exchange-name message-payload queue-timeout-ms)))
(defn publish-to-dead-queue [message-payload]
(let [{:keys [exchange-name]} (:dead-letter (rabbitmq-config))
topic-entity (:topic-entity message-payload)
exchange-name (util/prefixed-queue-name topic-entity exchange-name)]
(publish exchange-name message-payload)))
(defn publish-to-instant-queue [message-payload]
(let [{:keys [exchange-name]} (:instant (rabbitmq-config))
topic-entity (:topic-entity message-payload)
exchange-name (util/prefixed-queue-name topic-entity exchange-name)]
(publish exchange-name message-payload)))
(defn publish-to-channel-delay-queue [channel message-payload]
(let [topic-entity (:topic-entity message-payload)
exchange-name (get-channel-delay-exchange-name topic-entity channel message-payload)
queue-timeout-ms (get-channel-queue-timeout-ms topic-entity channel message-payload)]
(publish exchange-name message-payload queue-timeout-ms)))
(defn publish-to-channel-dead-queue [channel message-payload]
(let [{:keys [exchange-name]} (:dead-letter (rabbitmq-config))
topic-entity (:topic-entity message-payload)
exchange-name (util/prefixed-channel-name topic-entity channel exchange-name)]
(publish exchange-name message-payload)))
(defn publish-to-channel-instant-queue [channel message-payload]
(let [{:keys [exchange-name]} (:instant (rabbitmq-config))
topic-entity (:topic-entity message-payload)
exchange-name (util/prefixed-channel-name topic-entity channel exchange-name)]
(publish exchange-name message-payload)))
(defn retry [{:keys [retry-count] :as message-payload}]
(when (-> (ziggurat-config) :retry :enabled)
(cond
(nil? retry-count) (publish-to-delay-queue (assoc message-payload :retry-count (dec (-> (ziggurat-config) :retry :count))))
(pos? retry-count) (publish-to-delay-queue (assoc message-payload :retry-count (dec retry-count)))
(zero? retry-count) (publish-to-dead-queue (assoc message-payload :retry-count (-> (ziggurat-config) :retry :count))))))
(defn retry-for-channel [{:keys [retry-count topic-entity] :as message-payload} channel]
(when (channel-retries-enabled topic-entity channel)
(cond
(nil? retry-count) (publish-to-channel-delay-queue channel (assoc message-payload :retry-count (dec (get-channel-retry-count topic-entity channel))))
(pos? retry-count) (publish-to-channel-delay-queue channel (assoc message-payload :retry-count (dec retry-count)))
(zero? retry-count) (publish-to-channel-dead-queue channel (assoc message-payload :retry-count (get-channel-retry-count topic-entity channel))))))
(defn- make-delay-queue [topic-entity]
(let [{:keys [queue-name exchange-name dead-letter-exchange]} (:delay (rabbitmq-config))
queue-name (delay-queue-name topic-entity queue-name)
exchange-name (util/prefixed-queue-name topic-entity exchange-name)
dead-letter-exchange-name (util/prefixed-queue-name topic-entity dead-letter-exchange)]
(create-and-bind-queue queue-name exchange-name dead-letter-exchange-name)))
(defn- make-delay-queue-with-retry-count [topic-entity retry-count]
(let [{:keys [queue-name exchange-name dead-letter-exchange]} (:delay (rabbitmq-config))
queue-name (delay-queue-name topic-entity queue-name)
exchange-name (util/prefixed-queue-name topic-entity exchange-name)
dead-letter-exchange-name (util/prefixed-queue-name topic-entity dead-letter-exchange)
sequence (min MAX_EXPONENTIAL_RETRIES (inc retry-count))]
(doseq [s (range 1 sequence)]
(create-and-bind-queue (util/prefixed-queue-name queue-name s) (util/prefixed-queue-name exchange-name s) dead-letter-exchange-name))))
(defn- make-channel-delay-queue-with-retry-count [topic-entity channel retry-count]
(make-delay-queue-with-retry-count (util/with-channel-name topic-entity channel) retry-count))
(defn- make-channel-delay-queue [topic-entity channel]
(make-delay-queue (util/with-channel-name topic-entity channel)))
(defn- make-queue [topic-identifier queue-type]
(let [{:keys [queue-name exchange-name]} (queue-type (rabbitmq-config))
queue-name (util/prefixed-queue-name topic-identifier queue-name)
exchange-name (util/prefixed-queue-name topic-identifier exchange-name)]
(create-and-bind-queue queue-name exchange-name)))
(defn- make-channel-queue [topic-entity channel-name queue-type]
(make-queue (util/with-channel-name topic-entity channel-name) queue-type))
(defn- make-channel-queues [channels topic-entity]
(doseq [channel channels]
(make-channel-queue topic-entity channel :instant)
(when (channel-retries-enabled topic-entity channel)
(make-channel-queue topic-entity channel :dead-letter)
(let [channel-retry-type (channel-retry-type topic-entity channel)]
(cond
(= :exponential channel-retry-type) (do
(log/warn "[Alpha Feature]: Exponential backoff based retries is an alpha feature."
"Please use it only after understanding its risks and implications."
"Its contract can change in the future releases of Ziggurat.")
(make-channel-delay-queue-with-retry-count topic-entity channel (get-channel-retry-count topic-entity channel)))
(= :linear channel-retry-type) (make-channel-delay-queue topic-entity channel)
(nil? channel-retry-type) (do
(log/warn "[Deprecation Notice]: Please note that the configuration for channel retries has changed."
"Please look at the upgrade guide for details: https://github.com/gojek/ziggurat/wiki/Upgrade-guide"
"Use :type to specify the type of retry mechanism in the channel config.")
(make-channel-delay-queue topic-entity channel))
:else (do
(log/warn "Incorrect keyword for type passed, falling back to linear backoff for channel: " channel)
(make-channel-delay-queue topic-entity channel)))))))
(defn make-queues [routes]
(when (is-connection-required?)
(doseq [topic-entity (keys routes)]
(let [channels (util/get-channel-names routes topic-entity)
retry-type (retry-type)]
(make-channel-queues channels topic-entity)
(when (-> (ziggurat-config) :retry :enabled)
(make-queue topic-entity :instant)
(make-queue topic-entity :dead-letter)
(cond
(= :exponential retry-type) (do
(log/warn "[Alpha Feature]: Exponential backoff based retries is an alpha feature."
"Please use it only after understanding its risks and implications."
"Its contract can change in the future releases of Ziggurat.")
(make-delay-queue-with-retry-count topic-entity (-> (ziggurat-config) :retry :count)))
(= :linear retry-type) (make-delay-queue topic-entity)
(nil? retry-type) (do
(log/warn "[Deprecation Notice]: Please note that the configuration for retries has changed."
"Please look at the upgrade guide for details: https://github.com/gojek/ziggurat/wiki/Upgrade-guide"
"Use :type to specify the type of retry mechanism in the config.")
(make-delay-queue topic-entity))
:else (do
(log/warn "Incorrect keyword for type passed, falling back to linear backoff for topic Entity: " topic-entity)
(make-delay-queue topic-entity))))))))