-
Notifications
You must be signed in to change notification settings - Fork 5
/
events.clj
158 lines (113 loc) · 5.79 KB
/
events.clj
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
(ns bacure.events
(:require [bacure.coerce :as c]
[bacure.state :as state]
[bacure.util :as util])
(:import com.serotonin.bacnet4j.event.DeviceEventAdapter))
;; Process identifier used by the COV cache accessors in this namespace
;; whenever the caller does not supply one explicitly.
(def default-cov-process-id 1)
;; Public methods for accessing cache
(defn cached-remote-devices
  "Returns the value stored under `:remote-devices` for the given local
  device. The zero-argument arity passes `nil` as the local device id."
  ([]
   (cached-remote-devices nil))
  ([local-device-id]
   (let [cache-path [:remote-devices]]
     (state/get-in-local-device local-device-id cache-path))))
(defn cached-remote-objects
  "Returns the value stored under `:remote-objects` for the given local
  device. The zero-argument arity passes `nil` as the local device id."
  ([]
   (cached-remote-objects nil))
  ([local-device-id]
   (let [cache-path [:remote-objects]]
     (state/get-in-local-device local-device-id cache-path))))
(defn all-cached-cov-events
  "Returns everything stored under `:cov-events` for the given local device,
  i.e. the cached events for every process identifier. The zero-argument
  arity passes `nil` as the local device id."
  ([]
   (all-cached-cov-events nil))
  ([local-device-id]
   (let [cache-path [:cov-events]]
     (state/get-in-local-device local-device-id cache-path))))
(defn cached-cov-events
  "Returns the COV events cached under `[:cov-events process-identifier]`
  for the given local device.

  Arities:
    []                                    nil device id, `default-cov-process-id`
    [process-identifier]                  nil device id
    [local-device-id process-identifier]  both explicit"
  ([]
   (cached-cov-events nil default-cov-process-id))
  ([process-identifier]
   (cached-cov-events nil process-identifier))
  ([local-device-id process-identifier]
   (let [cache-path (conj [:cov-events] process-identifier)]
     (state/get-in-local-device local-device-id cache-path))))
(defn cached-cov-events-by-object
  "Returns the cached COV events for a process identifier, grouped by each
  event's `:monitored-object-identifier`.

  The result is a map from monitored object identifier to a vector of the
  events for that object, in the order they appear in the cache. An empty
  (or missing) cache yields `{}`.

  Arities:
    []                                    nil device id, `default-cov-process-id`
    [process-identifier]                  nil device id
    [local-device-id process-identifier]  both explicit"
  ([] (cached-cov-events-by-object nil default-cov-process-id))
  ([process-identifier] (cached-cov-events-by-object nil process-identifier))
  ([local-device-id process-identifier]
   ;; group-by builds exactly the key -> vector-of-items map that the previous
   ;; hand-written reduce/update/conj produced, including {} for nil input.
   (group-by :monitored-object-identifier
             (cached-cov-events local-device-id process-identifier))))
(defn cached-cov-events-for-object
  "Returns the cached COV events whose monitored object is
  `object-identifier`, or nil when none are cached for it.

  Arities:
    [object-identifier]
        nil device id, `default-cov-process-id`
    [process-identifier object-identifier]
        nil device id
    [local-device-id process-identifier object-identifier]
        everything explicit"
  ([object-identifier]
   (cached-cov-events-for-object nil default-cov-process-id object-identifier))
  ([process-identifier object-identifier]
   (cached-cov-events-for-object nil process-identifier object-identifier))
  ([local-device-id process-identifier object-identifier]
   (let [events-by-object (cached-cov-events-by-object local-device-id
                                                       process-identifier)]
     (get events-by-object object-identifier))))
;; Public methods for manipulating cache
(defn clear-cached-remote-devices!
  "Replaces the `:remote-devices` entry of the given local device with an
  empty map. The zero-argument arity passes `nil` as the local device id."
  ([]
   (clear-cached-remote-devices! nil))
  ([local-device-id]
   (let [empty-cache {}]
     (state/assoc-in-local-device! local-device-id [:remote-devices] empty-cache))))
(defn clear-cached-remote-objects!
  "Replaces the `:remote-objects` entry of the given local device with an
  empty map. The zero-argument arity passes `nil` as the local device id."
  ([]
   (clear-cached-remote-objects! nil))
  ([local-device-id]
   (let [empty-cache {}]
     (state/assoc-in-local-device! local-device-id [:remote-objects] empty-cache))))
(defn clear-all-cached-cov-events!
  "Replaces the whole `:cov-events` entry of the given local device with an
  empty map, dropping the cached events of every process identifier. The
  zero-argument arity passes `nil` as the local device id."
  ([]
   (clear-all-cached-cov-events! nil))
  ([local-device-id]
   (let [empty-cache {}]
     (state/assoc-in-local-device! local-device-id [:cov-events] empty-cache))))
(defn clear-cached-cov-events!
  "Replaces the events cached under `[:cov-events process-identifier]` for
  the given local device with an empty vector.

  Arities:
    []                                    nil device id, `default-cov-process-id`
    [process-identifier]                  nil device id
    [local-device-id process-identifier]  both explicit"
  ([]
   (clear-cached-cov-events! nil default-cov-process-id))
  ([process-identifier]
   (clear-cached-cov-events! nil process-identifier))
  ([local-device-id process-identifier]
   (let [cache-path (conj [:cov-events] process-identifier)]
     (state/assoc-in-local-device! local-device-id cache-path []))))
;; Event handlers
(defn- add-remote-device-to-cache!
  "Stores `remote-device` in the cache of the given local device, under
  `[:remote-devices <instance number of remote-device>]`.

  The entry is written through its own nested key path (the same way
  `add-remote-object-to-cache!` writes remote objects) rather than by reading
  the whole `:remote-devices` map, modifying a copy and writing the whole map
  back. That read-modify-write could silently discard a device added by
  another IAm notification handled in between the read and the write."
  [local-device-id remote-device]
  (let [remote-device-id (.getInstanceNumber remote-device)]
    ;; NOTE(review): assumes state/assoc-in-local-device! has assoc-in
    ;; semantics for nested paths, as its use for :remote-objects suggests.
    (state/assoc-in-local-device! local-device-id
                                  [:remote-devices remote-device-id]
                                  remote-device)))
(defn- add-remote-object-to-cache!
  "Converts `remote-object` with `c/bacnet->clojure` and stores the result in
  the cache of the given local device, under
  `[:remote-objects <instance number of remote-device> <:object-identifier>]`."
  [local-device-id remote-device remote-object]
  (let [device-instance (.getInstanceNumber remote-device)
        converted       (c/bacnet->clojure remote-object)
        cache-path      [:remote-objects
                         device-instance
                         (:object-identifier converted)]]
    (state/assoc-in-local-device! local-device-id cache-path converted)))
;; Appends one COV notification to the events cached under
;; [:cov-events <subscriber-process-identifier>] for the given local device.
;; Every argument after local-device-id is first converted with
;; c/bacnet->clojure.
(defn- add-cov-event-to-cache!
[local-device-id subscriber-process-identifier initiating-device-identifier
monitored-object-identifier time-remaining list-of-values]
(let [subscriber-process-identifier (c/bacnet->clojure subscriber-process-identifier)
initiating-device-identifier (c/bacnet->clojure initiating-device-identifier)
monitored-object-identifier (c/bacnet->clojure monitored-object-identifier)
time-remaining (c/bacnet->clojure time-remaining)
;; The converted list of values is poured into a map.
values (->> (c/bacnet->clojure list-of-values)
(into {}))
events-cache-ks [:cov-events subscriber-process-identifier]
;; NOTE(review): util/mapify presumably builds a map keyed by the names of
;; the symbols it receives (readers of this cache destructure
;; :monitored-object-identifier from each event). If so, renaming these
;; locals changes the keys of the cached event — confirm in bacure.util.
event (util/mapify initiating-device-identifier monitored-object-identifier
time-remaining values)
;; The trailing [] is presumably a not-found default, so the first event for
;; a process identifier starts a fresh vector — confirm that
;; state/get-in-local-device accepts this third argument.
events (state/get-in-local-device local-device-id events-cache-ks [])]
;; NOTE(review): this reads the vector, conj's locally and writes it back;
;; nothing visible here makes the read and the write a single atomic step,
;; and nothing visible here bounds the size of the vector.
(state/assoc-in-local-device! local-device-id events-cache-ks (conj events event))))
(defn unconfirmed-event-listener
  "Builds a DeviceEventAdapter proxy that records incoming IAm, IHave and COV
  notifications in the cache of the local device identified by
  `local-device-id`."
  [local-device-id]
  (proxy [DeviceEventAdapter] []
    ;; IAm: remember the announcing remote device.
    (iAmReceived [device]
      (add-remote-device-to-cache! local-device-id device))

    ;; IHave: remember the announced object under its remote device.
    (iHaveReceived [device object]
      (add-remote-object-to-cache! local-device-id device object))

    ;; This was useful for debugging, but it's not needed for any features so far
    ;;(requestReceived [from service]
    ;;  (apply prn [(c/bacnet->clojure from) (c/bacnet->clojure service)]))

    ;; COV notification: append the event to the cache of its process identifier.
    (covNotificationReceived [process-id initiating-device monitored-object
                              remaining notified-values]
      (add-cov-event-to-cache! local-device-id
                               process-id
                               initiating-device
                               monitored-object
                               remaining
                               notified-values))))
; ;; Local device setup
; (defn add-listener!
; [local-device-object local-device-id]
; (let [listener (unconfirmed-event-listener local-device-id)]
; (-> local-device-object
; (.getEventHandler)
; (.addListener listener))))