/
tap.clj
104 lines (82 loc) · 2.67 KB
/
tap.clj
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
(ns ken.tap
"Observability instrumentation using a global 'tap' which can have events
reported to it from any location in the code. This draws inspiration from
Clojure's `tap>`, but is more directly targeted at instrumentation."
(:refer-clojure :exclude [send])
(:import
(java.util.concurrent
ArrayBlockingQueue
BlockingQueue
TimeUnit)))
;; ## Subscriptions
;; Registry of tap subscribers, held in an atom so it can be updated with
;; `swap!`. `subscribe!` stores each function under its keyword key, and the
;; publisher thread dereferences this map once per event it delivers.
(def ^:private subscriptions
"An atom containing the map of subscribed functions."
(atom {}))
(defn subscribe!
  "Subscribe a function to be called with all events sent to the tap. Returns
  the key `k`.

  The key must be a keyword. It uniquely identifies the function and will
  replace any existing subscription; it may also be used to remove it later
  with `unsubscribe!`.

  `f` may be any invokable value (anything satisfying `ifn?`), so vars and
  multimethods can be subscribed as well as plain functions. It is called with
  the event as its only argument."
  [k f]
  ;; `ifn?` rather than `fn?`: vars and multimethods are callable but are not
  ;; `fn?`, so the stricter check would reject them.
  {:pre [(keyword? k) (ifn? f)]}
  (swap! subscriptions assoc k f)
  k)
(defn unsubscribe!
  "Drop the subscription registered under key `k`, if there is one. Always
  returns nil."
  [k]
  (swap! subscriptions (fn [registered] (dissoc registered k)))
  nil)
(defn clear!
  "Forget every subscription currently registered with the tap. Always returns
  nil."
  []
  (swap! subscriptions (fn [registered] (empty registered)))
  nil)
;; ## Event Publishing
;; Bounded buffer between callers of `send` and the publisher thread. The
;; capacity is fixed at 1024 entries; `send` adds with `.offer` and the
;; publisher removes with `.take`.
(def ^:private ^BlockingQueue event-queue
"A queue of events which have been sent but not published yet."
(ArrayBlockingQueue. 1024))
(def ^:private publisher
  "Delay wrapping the background publishing thread. Forcing the delay creates
  and starts a daemon thread named \"ken.tap/publisher\" which repeatedly takes
  one event from the queue and hands it to every subscribed function. `send`
  forces it, so the thread is not started until an event has been sent."
  (delay
    (let [deliver! (fn deliver!
                     [event]
                     ;; Snapshot the subscriptions once per event.
                     (doseq [[_k f] @subscriptions]
                       (try
                         (f event)
                         (catch Throwable _
                           ;; Swallow unhandled error in subscriber so one
                           ;; failing function cannot stop the others.
                           ;; TODO: how to notify the user?
                           nil))))
          worker (Thread.
                   (fn send-loop
                     []
                     ;; Blocks until an event is available.
                     (deliver! (.take event-queue))
                     (recur))
                   "ken.tap/publisher")]
      (.setDaemon worker true)
      (.start worker)
      worker)))
(defn send
  "Offer `event` to the tap for delivery to every subscribed function.

  Yields true when the event was queued and false when it was dropped. A nil
  or false `event` is ignored and yields nil.

  The one-argument form never waits. Supplying `timeout-ms` makes the call
  wait up to that many milliseconds for the event to be accepted."
  ([event]
   (send event nil))
  ([event timeout-ms]
   (when event
     ;; Dereferencing the delay starts the publisher thread on first use.
     @publisher
     (if-not timeout-ms
       (.offer event-queue event)
       (.offer event-queue event timeout-ms TimeUnit/MILLISECONDS)))))
(defn queue-size
  "Report how many sent events are still waiting in the queue."
  []
  (. event-queue (size)))
(defn drain!
  "Discard every event still waiting in the queue. Intended mainly for tests.
  Always returns nil."
  []
  (. event-queue (clear))
  nil)