-
Notifications
You must be signed in to change notification settings - Fork 64
/
connection.clj
55 lines (49 loc) · 2.2 KB
/
connection.clj
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
(ns ziggurat.messaging.connection
(:require [clojure.tools.logging :as log]
[langohr.core :as rmq]
[mount.core :as mount :refer [defstate start]]
[sentry-clj.async :as sentry]
[ziggurat.config :refer [ziggurat-config]]
[ziggurat.sentry :refer [sentry-reporter]]
[ziggurat.channel :refer [get-keys-for-topic]])
(:import [com.rabbitmq.client ShutdownListener]
[java.util.concurrent Executors]))
(defn is-connection-required? []
(let [stream-routes (:stream-routes (mount/args))
all-channels (reduce (fn [all-channel-vec [topic-entity _]]
(concat all-channel-vec (get-keys-for-topic stream-routes topic-entity)))
[]
stream-routes)]
(or (pos? (count all-channels))
(-> (ziggurat-config) :retry :enabled))))
(defn- channel-threads
[channels]
(reduce (fn [sum [_ channel-config]]
(+ sum (:worker-count channel-config))) 0 channels))
(defn- total-thread-count
[]
(let [stream-routes (:stream-router (ziggurat-config))
worker-count (get-in (ziggurat-config) [:jobs :instant :worker-count])]
(reduce (fn [sum [_ route-config]]
(+ sum (channel-threads (:channels route-config)) worker-count)) 0 stream-routes)))
(defn- start-connection []
(log/info "Connecting to RabbitMQ")
(when (is-connection-required?)
(try
(let [connection (rmq/connect (assoc (:rabbit-mq-connection (ziggurat-config)) :executor (Executors/newFixedThreadPool (total-thread-count))))]
(doto connection
(.addShutdownListener
(reify ShutdownListener
(shutdownCompleted [_ cause]
(when-not (.isInitiatedByApplication cause)
(log/error cause "RabbitMQ connection shut down due to error")))))))
(catch Exception e
(sentry/report-error sentry-reporter e "Error while starting RabbitMQ connection")
(throw e)))))
(defn- stop-connection [conn]
(when (is-connection-required?)
(rmq/close conn)
(log/info "Disconnected from RabbitMQ")))
(defstate connection
:start (start-connection)
:stop (stop-connection connection))