-
Notifications
You must be signed in to change notification settings - Fork 47
/
core.clj
90 lines (65 loc) · 2.19 KB
/
core.clj
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
(ns ^{:author "Bruno Bonacci (@BrunoBonacci)"
:doc
"Logging library designed to log data events instead of plain words."}
com.brunobonacci.mulog.core
(:require [com.brunobonacci.mulog.buffer :as rb]
[com.brunobonacci.mulog.agents :as ag]
[com.brunobonacci.mulog.publisher :as p])
(:import [com.brunobonacci.mulog.publisher PPublisher]))
(defn dequeue!
  "Atomically replaces the value held by the `buffer` atom with the
  result of `rb/dequeue` applied to the current value and `offset`.
  Returns the new value of the atom."
  [buffer offset]
  (swap! buffer (fn [ring] (rb/dequeue ring offset))))
(defn enqueue!
  "Atomically replaces the value held by the `buffer` atom with the
  result of `rb/enqueue` applied to the current value and `value`.
  Returns the new value of the atom."
  [buffer value]
  (swap! buffer (fn [ring] (rb/enqueue ring value))))
;; Registry of the active publishers: an atom holding a set of
;; `[buffer id publisher]` tuples (see `register-publisher!`).
;; `defonce` keeps the registry intact when the namespace is reloaded.
(defonce publishers
  (atom (hash-set)))
(defn register-publisher!
  "Adds the `[buffer id publisher]` tuple to the `publishers` registry.
  Returns the updated registry set."
  [buffer id publisher]
  (let [registration [buffer id publisher]]
    (swap! publishers (fn [registry] (conj registry registration)))))
(defn- merge-pairs
  "Merges all the arguments into a single map. A sequential argument is
  read as a flat `k1 v1 k2 v2 ...` series (a trailing key without a
  value is dropped); any other argument is expected to be a collection
  of map entries, typically a map. Later entries win over earlier ones."
  [& pairs]
  (into {}
        (for [chunk pairs
              entry (if (sequential? chunk)
                      (map vec (partition 2 chunk))
                      chunk)]
          entry)))
;; Recurring task (scheduled via `ag/recurring-task` with a period of
;; 200 -- presumably milliseconds, confirm against `ag/recurring-task`)
;; which copies the events accumulated in every registered buffer into
;; the agent-buffer of each publisher registered for that buffer, and
;; then removes the copied events from the source buffer.
(defonce dispatch-publishers
  (ag/recurring-task
    200
    (fn []
      (try
        ;; registry entries are [buffer id publisher]; group them by buffer
        (doseq [[buf dests] (group-by first @publishers)] ;; for every buffer
          (let [items (rb/items @buf)]
            ;; Skip empty buffers: there is nothing to forward, and the
            ;; offset would be nil. Dequeuing with a nil offset is unsafe
            ;; if a producer enqueues between the deref above and the
            ;; `swap!` below.
            (when (seq items)
              (let [;; items are [offset event] pairs; the last one
                    ;; carries the highest offset seen in this snapshot
                    offset (-> items last first)
                    ;; computed once and shared by all destinations
                    events (->> items
                             (map second)
                             (map (partial apply merge-pairs)))]
                (doseq [[_ _ pub] dests] ;; and each destination
                  ;; send to the agent-buffer
                  (send (p/agent-buffer pub)
                    (partial reduce rb/enqueue)
                    events))
                ;; remove items up to the offset
                (swap! buf rb/dequeue offset)))))
        (catch Exception x
          ;; TODO:
          (.printStackTrace x))))))
(defn start-publisher!
  "Creates a publisher from `config` via `p/publisher-factory` and
  registers it against `buffer` under the id `(:type config)`.

  When the publisher reports a positive `p/publish-delay`, a recurring
  task is started with that period; on every tick it `send-off`s
  `p/publish` to the publisher's agent-buffer.

  Returns a zero-argument function which stops the publisher: it removes
  the registration from `publishers` and stops the recurring task, if
  one was started. It is safe to call when no task was started."
  [buffer config]
  (let [^PPublisher publisher (p/publisher-factory config)
        period (p/publish-delay publisher)
        id     (:type config)
        _      (register-publisher! buffer id publisher)
        ;; nil when the publisher has no positive publish delay
        stop   (when (and period (> period 0))
                 (ag/recurring-task
                   period
                   (fn []
                     (send-off (p/agent-buffer publisher)
                       (partial p/publish publisher)))))]
    (fn []
      ;; deregister, so the dispatcher stops feeding this publisher
      (swap! publishers disj [buffer id publisher])
      ;; `stop` is nil when no recurring task was started
      (when stop
        (stop)))))
(defmacro on-error
  "Evaluates `body` and returns its value. If the evaluation throws an
  `Exception`, the exception is discarded and the value of `default`
  is returned instead. `default` is only evaluated on failure."
  [default & body]
  (let [ignored (gensym "_")]
    `(try
       ~@body
       (catch Exception ~ignored
         ~default))))