-
Notifications
You must be signed in to change notification settings - Fork 0
/
core.clj
119 lines (101 loc) · 3.92 KB
/
core.clj
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
(ns capn-hook.core
"This namespace is the main user-facing API for the library that will send
all the webhook messages to the registered listeners. If one is never
listening, we'll keep trying to send to it for a very long time."
(:require [capn-hook.durable :refer [enqueue! process!] :as chd]
[clj-time.core :refer [now]]
[clojure.tools.logging :refer [infof warnf errorf]]
[capn-hook.logging :refer [log-execution-time!]]
[overtone.at-at :as aa]))
;;
;; Create a simple, single-process registration system so that folks can get
;; started without having to create a more complex, multi-box state and
;; registration system.
;;
(defonce registrations (atom {}))
(defn register!
"Function to add the url to the set of all targets for the supplied webhook.
This will be a set of urls, so there is no chance of duplication within the
registration."
[wh url]
(if (and (keyword? wh) (string? url))
(if (wh @registrations)
(swap! registrations update wh conj url)
(swap! registrations assoc wh #{url}))))
(defn deregister!
"Function to remove a url from *all* webhook registrations. This is a simple
way to remove a url without having to worry where it might have been
registered."
[url]
(when (string? url)
(doseq [k (keys @registrations)]
(swap! registrations update k disj url))
@registrations))
(defn targets
"Function to return a sequence of targets for a supplied 'webhook' name. The
system is capable of having any number of 'nameed' webhooks - imagine 'create',
'read', 'update', 'delete' for a standard editable service. The return value
is a sequence of urls to POST to:
[\"http://foo.com/hit/me\", \"http://bar.com/punch\"]
."
[hook]
(or (if (keyword? hook) (hook @registrations)) []))
;;
;; Functions to send the callbacks to the registered clients...
;;
(defn fire!
"Function to take a representation of a sequence of urls, and a message
to send to all registered targets of that particular webhook. The first
arg can be a function that will be expected to return a sequence of urls,
or it can be a sequence of urls, or a single url, or a keyword, in which
case, we'll look up the registration from the internal list. This will
actually just enqueue the complete message onto the durable queue of the
correct name, and then let the processing of that queue handle the rest."
[wh msg]
(let [tgts (cond
(keyword? wh) (targets wh)
(fn? wh) (wh)
(coll? wh) wh
(string? wh) [wh]
:else [])
base {:created (now)
:msg msg}]
(doseq [u tgts
:when (string? u)]
(enqueue! (assoc base :url u)))))
(log-execution-time! fire! {:level :debug})
(defn flush!
"Function to flush the queue of *all* pending callbacks so that anything
that was in the queue is now lost. This is a permanent operation!"
[]
(chd/flush!))
(log-execution-time! flush! {:level :debug})
;;
;; Let's now make a way to start a worker process that will send out the
;; callbacks to the registered listeners until the user want to shut it all
;; down.
;;
(defonce pool
(delay
(aa/mk-pool)))
(defonce worker (atom nil))
(def chill
"This is the interval time between restarts on the sending of the callbacks
to the registered listeners - in msec."
1000)
(defn start!
"Function to start a worker thread for sending the callbacks to the
registered lsteners, and to continue doing so until the process is
stopped or the `stop!` function is called."
[]
(if @worker
(errorf "There is already a callback worker sending messages!")
(if-let [wt (aa/interspaced chill (bound-fn [] (process!)) @pool)]
(reset! worker wt))))
(defn stop!
"Function to stop sending the callbacks to the recipients. This just
shuts down the sending thread and that's it."
[]
(when @worker
(aa/stop @worker)
(reset! worker nil)))