/
messaging.clj
67 lines (54 loc) · 3.22 KB
/
messaging.clj
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
(ns ziggurat.messaging.messaging
(:require [ziggurat.config :refer [ziggurat-config]]
[ziggurat.messaging.rabbitmq-wrapper :as rmqw]
[ziggurat.messaging.util :as util]
[ziggurat.messaging.messaging-interface :as messaging-interface]
[clojure.tools.logging :as log]))
(def messaging-impl (atom nil))
(defn get-implementation []
(if (nil? @messaging-impl)
(throw (Exception. "Messaging Library has not been initialized, please make sure the messaging library has been initialized"))
@messaging-impl))
(defn- get-messaging-implementor-constructor []
(if-let [configured-metrics-class-constructor (get-in (ziggurat-config) [:messaging :constructor])]
(let [configured-constructor-symbol (symbol configured-metrics-class-constructor)
constructor-namespace (namespace configured-constructor-symbol)
_ (require [(symbol constructor-namespace)])
metric-constructor (resolve configured-constructor-symbol)]
(if (nil? metric-constructor)
(throw (ex-info "Incorrect messaging_interface implementation constructor configured. Please fix it." {:constructor-configured configured-constructor-symbol}))
metric-constructor))
rmqw/->RabbitMQMessaging))
(defn initialise-messaging-library []
(let [messaging-impl-constructor (get-messaging-implementor-constructor)]
(reset! messaging-impl (messaging-impl-constructor))))
(defn start-connection [config stream-routes]
(initialise-messaging-library)
(when (util/is-connection-required? (:ziggurat config) stream-routes)
(log/info "Initialized Messaging Library")
(messaging-interface/start-connection (get-implementation) config stream-routes)))
(defn stop-connection [config stream-routes]
(when-not (nil? @messaging-impl)
(log/info "Stopping Messaging Library")
(messaging-interface/stop-connection (get-implementation) config stream-routes)))
(defn create-and-bind-queue
([queue-name exchange-name]
(messaging-interface/create-and-bind-queue (get-implementation) queue-name exchange-name))
([queue-name exchange-name dead-letter-exchange]
(messaging-interface/create-and-bind-queue (get-implementation) queue-name exchange-name dead-letter-exchange)))
(defn publish
([exchange message-payload]
(messaging-interface/publish (get-implementation) exchange message-payload))
([exchange message-payload expiration]
(messaging-interface/publish (get-implementation) exchange message-payload expiration)))
(defn get-messages-from-queue
([queue-name ack?]
(messaging-interface/get-messages-from-queue (get-implementation) queue-name ack?))
([queue-name ack? count]
(messaging-interface/get-messages-from-queue (get-implementation) queue-name ack? count)))
(defn process-messages-from-queue [queue-name count processing-fn]
(messaging-interface/process-messages-from-queue (get-implementation) queue-name count processing-fn))
(defn start-subscriber [prefetch-count wrapped-mapper-fn queue-name]
(messaging-interface/start-subscriber (get-implementation) prefetch-count wrapped-mapper-fn queue-name))
(defn consume-message [ch meta payload ack?]
(messaging-interface/consume-message (get-implementation) ch meta payload ack?))