/
flow.clj
107 lines (99 loc) · 3.37 KB
/
flow.clj
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
(ns aleph.flow
(:require
[potemkin :as p]
[manifold
[deferred :as d]
[executor :as ex]])
(:import
[io.aleph.dirigiste
Pool
IPool
IPool$AcquireCallback
IPool$Controller
IPool$Generator
Executors
Executor
Executor$Controller
Pools
Stats
Stats$Metric]
[java.util
EnumSet]
[java.util.concurrent
SynchronousQueue
ArrayBlockingQueue
ThreadFactory
TimeUnit]))
(defn instrumented-pool
  "Builds a [Dirigiste](https://github.com/ztellman/dirigiste) object pool. The returned pool is
   used via `acquire`, `release`, and `dispose`.

   Takes a single map of options:

   | key | description |
   |:---|:----
   | `generate` | required; a single-arg function which takes a key, and returns an object which should be non-equal to any other generated object |
   | `destroy` | an optional two-arg function which takes a key and object, and releases any associated resources |
   | `stats-callback` | an optional function which will be invoked every `control-period` with a map of stringified keys onto associated statistics |
   | `max-queue-size` | the maximum number of pending acquires per key that are allowed before further acquires are rejected with a `java.util.concurrent.RejectedExecutionException`, defaults to `65536`. |
   | `sample-period` | the interval, in milliseconds, between sampling the state of the pool for resizing and gathering statistics, defaults to `10`. |
   | `control-period` | the interval, in milliseconds, between use of the controller to adjust the size of the pool, defaults to `10000`. |
   | `controller` | required; a Dirigiste controller that is used to guide the pool's size. |

   Leaving out `:controller` or `:generate` trips an assertion."
  [{:keys
    [generate
     destroy
     stats-callback
     max-queue-size
     sample-period
     control-period
     controller]
    :or {sample-period 10
         control-period 10000
         max-queue-size 65536}}]
  (assert controller "must specify :controller")
  (assert generate "must specify :generate")
  (let [^IPool$Controller delegate controller
        ;; Local aliases, so the reified methods below don't look like they call themselves.
        make-object generate
        destroy-object destroy
        generator (reify IPool$Generator
                    (generate [_ k]
                      (make-object k))
                    (destroy [_ k v]
                      ;; `:destroy` is optional; without it this is a no-op.
                      (when destroy-object
                        (destroy-object k v))))
        ;; Forwards every decision to the user-supplied controller, reporting the
        ;; stats to `stats-callback` (when given) before each adjustment.
        reporting-controller (reify IPool$Controller
                               (shouldIncrement [_ key objects-per-key total-objects]
                                 (.shouldIncrement delegate key objects-per-key total-objects))
                               (adjustment [_ key->stats]
                                 (when stats-callback
                                   (let [ks    (map str (keys key->stats))
                                         stats (map ex/stats->map (vals key->stats))]
                                     (stats-callback (zipmap ks stats))))
                                 (.adjustment delegate key->stats)))]
    (Pool.
      generator
      reporting-controller
      max-queue-size
      sample-period
      control-period
      TimeUnit/MILLISECONDS)))
(defn acquire
  "Acquires an object from the pool for key `k`, returning a deferred which yields the object.

   Anything thrown by the pool's `acquire` call - e.g. a
   `java.util.concurrent.RejectedExecutionException` when there are too many pending acquires -
   is caught and delivered as the error of the returned deferred rather than thrown to the caller.

   If the object cannot be delivered to the deferred (`success!` returns a falsey value), it is
   released straight back to the pool."
  [^IPool p k]
  (let [result (d/deferred nil)]
    (try
      (.acquire p k
        (reify IPool$AcquireCallback
          (handleObject [_ obj]
            ;; Nobody will receive `obj` if the deferred did not accept it,
            ;; so return it to the pool instead of leaking it.
            (when-not (d/success! result obj)
              (.release p k obj)))))
      (catch Throwable e
        ;; Surface failures through the deferred so callers have a single error channel.
        (d/error! result e)))
    result))
(defn release
  "Releases `obj`, held under key `k`, back to the pool `p`.

   Delegates directly to the pool's `release` method."
  [^IPool p k obj]
  (.release p k obj))
(defn dispose
  "Disposes of `obj`, a pooled object under key `k` in pool `p`, for use when the object is no
   longer valid.

   Delegates directly to the pool's `dispose` method."
  [^IPool p k obj]
  (.dispose p k obj))
;; Expose the executor constructors from `manifold.executor` through this
;; namespace as well, via Potemkin's `import-vars`.
(p/import-vars
[manifold.executor
instrumented-executor
utilization-executor
fixed-thread-executor])