-
Notifications
You must be signed in to change notification settings - Fork 10
/
message.clj
84 lines (66 loc) · 2.95 KB
/
message.clj
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
(ns flower.messaging.exchange.message
(:require [clojure.core.async :as async]
[flower.macros :as macros]
[flower.messaging.proto :as proto]
[flower.messaging.exchange.common :as common]))
;;
;; Private declarations
;;
;; Forward declaration: private-search-exchange-messages-inner is defined at the
;; end of this file but is referenced earlier by
;; private-search-exchange-messages-before-map.
(declare private-search-exchange-messages-inner)
;;
;; Public definitions
;;
;; Record holding one Exchange message: the message box it belongs to, its
;; source, its recipients, its title and its body. Implements
;; proto/MessageProto.
(defrecord ExchangeMessage [msg-box msg-source msg-recipients msg-title msg-body]
  proto/MessageProto
  ;; The accessors return the corresponding record field unchanged.
  (get-message-box [_] msg-box)
  (get-source [_] msg-source)
  (get-recipients [_] msg-recipients)
  (get-title [_] msg-title)
  (get-body [_] msg-body)
  ;; Sending is delegated to common/send-exchange-message-inner!, which
  ;; receives the record itself.
  (send-message! [this] (common/send-exchange-message-inner! this)))
;; NOTE(review): macros/public-definition is defined in flower.macros and is not
;; visible from this file. Presumably it generates the public
;; `search-exchange-messages` and `subscribe` entry points that delegate to the
;; matching `private-` functions below, with `cached` selecting a cached
;; variant -- confirm against the macro definition.
(macros/public-definition search-exchange-messages cached)
(macros/public-definition subscribe)
;;
;; Private definitions
;;
(defn- private-message-from-inner
  "Builds an ExchangeMessage for `message-box` from the underlying
  `message-inner` object.

  The source is the string form of `.getFrom`, the recipients are a lazy
  sequence of the string forms of the items in `.getToRecipients`, and the
  title is `.getConversationTopic`. The body is the string form of `.getBody`
  only when `load-body` is truthy; otherwise it is nil."
  [message-box load-body message-inner]
  (let [source (.toString (.getFrom message-inner))
        recipients (map #(.toString %)
                        (.getItems (.getToRecipients message-inner)))
        title (.getConversationTopic message-inner)
        body (when load-body
               (.toString (.getBody message-inner)))]
    (->ExchangeMessage message-box source recipients title body)))
(defn- private-search-exchange-messages-before-map
  "Runs the inner search for `message-box` with `params` and lazily converts
  every result into an ExchangeMessage. The `:load-body` entry of `params` is
  passed on to private-message-from-inner for each result."
  [message-box params]
  (let [{:keys [load-body]} params
        found (private-search-exchange-messages-inner message-box params)]
    (map (fn [message-inner]
           (private-message-from-inner message-box load-body message-inner))
         found)))
(defn- private-search-exchange-messages
  "Searches messages in `message-box` and lazily maps each one through the
  function stored under [:context :messages-map-function] of the message box
  component. When that path is absent the messages are returned unchanged."
  [message-box params]
  (let [component (proto/get-message-box-component message-box)
        transform (get-in component
                          [:context :messages-map-function]
                          identity)]
    (map transform
         (private-search-exchange-messages-before-map message-box params))))
(defn- private-subscribe
  "Subscribes to `message-box` and returns a core.async channel that yields
  ExchangeMessage records.

  An inner channel is obtained from common/subscribe-inner; a go-loop takes
  each inner message from it, converts it with private-message-from-inner
  (honouring the `:load-body` entry of `params`) and puts the result on the
  returned channel.

  The loop stops, and both channels end up closed, when either channel is
  found closed at the top of an iteration, or when the inner channel is closed
  while the loop is parked on a take."
  [message-box params]
  (let [{:keys [load-body]} params
        conn-inner (common/get-message-box-conn-inner message-box)
        channel (async/chan)
        channel-inner (common/subscribe-inner conn-inner params channel)]
    (async/go-loop []
      (if (or (.closed? channel-inner)
              (.closed? channel))
        (do (async/close! channel-inner)
            (async/close! channel))
        (if-let [message-inner (async/<! channel-inner)]
          (do (async/>! channel
                        (private-message-from-inner message-box
                                                    load-body
                                                    message-inner))
              (recur))
          ;; A nil take means the inner channel was closed while we were
          ;; parked on it. Close the outgoing channel as well, so consumers
          ;; see the end of the stream instead of parking forever.
          (async/close! channel))))
    channel))
(defn- private-search-exchange-messages-inner
  "Resolves the inner connection of `message-box` and passes it, together with
  `params`, to common/search-exchange-messages-inner, returning that result."
  [message-box params]
  (-> message-box
      common/get-message-box-conn-inner
      (common/search-exchange-messages-inner params)))