Skip to content

Commit

Permalink
application rabbitmq autorecovery from connection failure (#42)
Browse files Browse the repository at this point in the history
* dont close the channel on shutdown listener. it is already closed when connection is broken. this prevents topology recovery

* fix linting errors

* catch message production exception in rabbitmq publisher
  • Loading branch information
shubham7saxena authored and kartik7153 committed May 2, 2019
1 parent d2f0efd commit 5a74210
Show file tree
Hide file tree
Showing 4 changed files with 28 additions and 12 deletions.
6 changes: 3 additions & 3 deletions src/ziggurat/messaging/consumer.clj
Original file line number Diff line number Diff line change
Expand Up @@ -77,9 +77,9 @@
queue-name
(message-handler wrapped-mapper-fn)
{:handle-shutdown-signal-fn (fn [consumer_tag reason]
(log/info "Closing channel with consumer tag - " consumer_tag)
(close ch))})]
(log/infof "starting consumer for %s with consumer tag %s " queue-name consumer-tag)))
(log/infof "channel closed with consumer tag: %s, reason: %s " consumer_tag, reason))
:handle-consume-ok-fn (fn [consumer_tag]
(log/infof "consumer started for %s with consumer tag %s " queue-name consumer_tag))})]))

(defn start-retry-subscriber* [mapper-fn topic-entity channels]
(when (get-in-config [:retry :enabled])
Expand Down
17 changes: 10 additions & 7 deletions src/ziggurat/messaging/producer.clj
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
(let [props (if dead-letter-exchange
{"x-dead-letter-exchange" dead-letter-exchange}
{})]
(with-open [ch (lch/open connection)]
(let [ch (lch/open connection)]
(create-queue queue-name props ch)
(declare-exchange ch exchange-name)
(bind-queue-to-exchange ch queue-name exchange-name)))
Expand All @@ -55,12 +55,15 @@
([exchange message]
(publish exchange message nil))
([exchange message expiration]
(with-retry {:count 3
:wait 50
:on-failure #(sentry/report-error sentry-reporter %
"Pushing message to rabbitmq failed")}
(with-open [ch (lch/open connection)]
(lb/publish ch exchange "" (nippy/freeze message) (properties-for-publish expiration))))))
(try
(with-retry {:count 5
:wait 100
:on-failure #(log/error "publishing message to rabbitmq failed with error " (.getMessage %))}
(with-open [ch (lch/open connection)]
(lb/publish ch exchange "" (nippy/freeze message) (properties-for-publish expiration))))
(catch Throwable e
(sentry/report-error sentry-reporter e
"Pushing message to rabbitmq failed, data: " message)))))

(defn publish-to-delay-queue [topic-entity message]
(let [{:keys [exchange-name queue-timeout-ms]} (:delay (rabbitmq-config))
Expand Down
2 changes: 1 addition & 1 deletion src/ziggurat/timestamp_transformer.clj
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
(deftype IngestionTimeExtractor [] TimestampExtractor
(extract [_ record _]
(let [ingestion-time (get-timestamp-from-record record)]
(if (< ingestion-time 0)
(if (neg? ingestion-time)
(get-current-time-in-millis)
ingestion-time))))

Expand Down
15 changes: 14 additions & 1 deletion test/ziggurat/messaging/producer_test.clj
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,20 @@
topic-entity :default]
(producer/retry message topic-entity)
(let [message-from-mq (rmq/get-msg-from-dead-queue "default")]
(is (= expected-message message-from-mq))))))
(is (= expected-message message-from-mq)))))

(testing "it will retry publishing message six times when unable to publish to rabbitmq"
(fix/with-queues
{:default {:handler-fn #(constantly nil)}}
(let [retry-count (atom 0)
message {:foo "bar" :retry-count 5}
expected-message {:foo "bar" :retry-count 4}
topic-entity :default]
(with-redefs [lb/publish (fn [_ _ _ _ props]
(swap! retry-count inc)
(throw (Exception. "some exception")))]
(producer/retry message topic-entity)
(is (= 6 @retry-count)))))))

(testing "message with no retry count will publish to delay queue"
(fix/with-queues
Expand Down

0 comments on commit 5a74210

Please sign in to comment.