-
Notifications
You must be signed in to change notification settings - Fork 47
/
buffer.clj
138 lines (98 loc) · 3.05 KB
/
buffer.clj
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
(ns ^{:author "Bruno Bonacci (@BrunoBonacci)"
:doc "
Logging library designed to log data events instead of plain words.
This namespace contains the implementation of a ring-buffer and a
wrapper agent used to buffer the events before they are published
to the downstream systems by the publishers.
"}
com.brunobonacci.mulog.buffer
(:require [amalloy.ring-buffer :as rb])
(:import [java.util.concurrent ScheduledThreadPoolExecutor
TimeUnit ScheduledFuture Future ThreadFactory]))
(defprotocol PRingBuffer
"RingBuffer protocol"
(enqueue [this item]
"Adds an item to the ring buffer.")
(dequeue [this offset]
"Removes all the items in the ring buffer up to and including the given offset.")
(clear [this]
"Removes all the items in the ring buffer.")
(items [this]
"Returns a sequence of pairs [<offset> <item>].")
)
(defn- pop-while
  "Like `drop-while`, but for collections that support `peek`/`pop`
  (such as an amalloy/ring-buffer).

  Keeps popping the element returned by `peek` for as long as that
  element is non-nil and `(pred element)` is truthy, then returns the
  remaining collection. An empty collection (whose `peek` is nil) is
  returned unchanged."
  [pred buffer]
  (loop [remaining buffer]
    (let [head (peek remaining)]
      ;; a nil head means there is nothing left to inspect
      (if (and (some? head) (pred head))
        (recur (pop remaining))
        remaining))))
(deftype RingBuffer [counter buffer]
  ;; Immutable wrapper around an amalloy/ring-buffer.
  ;;
  ;; `counter` is the offset given to the most recently enqueued item
  ;; and `buffer` holds the entries. Every entry is stored as a tuple
  ;; `[<offset> <item>]` where the offset is a monotonically increasing
  ;; number. The offset can be used as a high-water-mark to dequeue
  ;; the items which have already been processed.

  Object
  (toString [_]
    (pr-str buffer))

  clojure.lang.Counted
  (count [_]
    (count buffer))

  PRingBuffer
  (enqueue [_ item]
    ;; the new entry takes the next offset, which also becomes the
    ;; counter of the returned buffer
    (let [next-offset (inc counter)
          entry       [next-offset item]]
      (RingBuffer. next-offset (conj buffer entry))))

  (dequeue [_ offset]
    ;; a nil offset is treated as 0
    (let [high-water-mark (or offset 0)
          processed?      (fn [entry] (<= (first entry) high-water-mark))]
      (RingBuffer. counter (pop-while processed? buffer))))

  (clear [_]
    ;; the counter is retained so offsets keep increasing after a clear
    (RingBuffer. counter (empty buffer)))

  (items [_]
    buffer))
(defn ring-buffer
  "Creates an empty `RingBuffer` backed by an amalloy/ring-buffer of the
  given `capacity`. The offset counter starts at 0.
  `capacity` must be greater than zero (enforced by a precondition)."
  [capacity]
  {:pre [(> capacity 0)]}
  (let [storage (rb/ring-buffer capacity)]
    (->RingBuffer 0 storage)))
(defn scheduled-thread-pool
  "Creates a `ScheduledThreadPoolExecutor` with `core-pool-size` core
  threads. Every thread produced by its thread factory is a daemon
  thread named `μ/log-task-<thread-id>`."
  [core-pool-size]
  (let [daemon-factory
        (reify ThreadFactory
          (^Thread newThread [_ ^Runnable runnable]
            (let [thread (Thread. runnable)]
              (doto thread
                (.setName (str "μ/log-task-" (.getId thread)))
                (.setDaemon true)))))]
    (ScheduledThreadPoolExecutor.
      ^int core-pool-size
      ^ThreadFactory daemon-factory)))
(defmacro ignore
  "Evaluates `body` and returns its value. If the body throws an
  `Exception`, the exception is swallowed and the exception object
  itself is returned instead."
  {:no-doc true :private true}
  [& body]
  (let [caught (gensym "x")]
    `(try
       ~@body
       (catch Exception ~caught
         ~caught))))
;; Scheduled executor with a core pool size of 2, built by
;; `scheduled-thread-pool`. `defonce` keeps the existing pool when the
;; namespace is reloaded instead of creating a new one.
(defonce timer-pool
(scheduled-thread-pool 2))
(defn recurring-task
  "Schedules the zero-argument function `task` on `timer-pool` at a
  fixed rate: the first run happens after `delay-millis` milliseconds
  and subsequent runs every `delay-millis` milliseconds.

  Any `Exception` thrown by `task` is caught; when `error-logger` is
  provided it is called with the exception, and exceptions raised by
  the logger itself are ignored.

  Returns a zero-argument function which cancels the scheduled task
  (interrupting it if running)."
  ([delay-millis task]
   (recurring-task delay-millis task nil))
  ([delay-millis task error-logger]
   (let [^Runnable guarded-task
         (fn []
           (try
             (task)
             (catch Exception x
               (when error-logger
                 ;; ignore errors during logging
                 (ignore
                   (error-logger x))))))

         ^ScheduledFuture scheduled
         (.scheduleAtFixedRate
           ^ScheduledThreadPoolExecutor timer-pool
           guarded-task
           delay-millis delay-millis TimeUnit/MILLISECONDS)]
     (fn [] (.cancel scheduled true)))))
(defn agent-buffer
  "Returns an agent whose initial state is an empty ring buffer of the
  given `capacity`. The agent is created with `:error-mode :continue`."
  [capacity]
  (let [initial-state (ring-buffer capacity)]
    (agent initial-state
           :error-mode :continue)))