forked from abedra/accession
-
Notifications
You must be signed in to change notification settings - Fork 0
/
core.clj
469 lines (406 loc) · 14.6 KB
/
core.clj
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
; Copyright (c) Aaron Bedra. All rights reserved.
; The use and distribution terms for this software are covered by the
; Eclipse Public License 1.0 (http://opensource.org/licenses/eclipse-1.0.php)
; which can be found in the file epl-v10.html at the root of this distribution.
; By using this software in any fashion, you are agreeing to be bound by
; the terms of this license. You must not remove this notice, or any other,
; from this software.
(ns accession.core
(:refer-clojure :exclude [get set keys type sync sort])
(:require [clojure.java.io :as io]
[clojure.string :as str])
(:import (java.net Socket)
(java.io BufferedInputStream DataInputStream)))
(defn query
"The new [unified protocol][up] was introduced in Redis 1.2, but it became
the standard way for talking with the Redis server in Redis 2.0.
In the unified protocol all the arguments sent to the Redis server
are binary safe. This is the general form:
*<number of arguments> CR LF
$<number of bytes of argument 1> CR LF
<argument data> CR LF
...
$<number of bytes of argument N> CR LF
<argument data> CR LF
See the following example:
*3
$3
SET
$5
mykey
$7
myvalue
This is how the above command looks as a quoted string, so that it
is possible to see the exact value of every byte in the query:
[up]: http://redis.io/topics/protocol
"
[name & args]
(str "*"
(+ 1 (count args)) "\r\n"
"$" (count name) "\r\n"
(str/upper-case name) "\r\n"
(str/join "\r\n"
(map (fn [a] (str "$" (count (.getBytes (str a))) "\r\n" a))
args))
"\r\n"))
;; <pre>"*3\r\n$3\r\nSET\r\n$5\r\nmykey\r\n$7\r\nmyvalue\r\n"</pre>
;; These protocols were created to support pub/sub, in particular,
;; subscribe. Subscribe is called in terms of both a connection-map
;; and an open channel. This is not implemented via the macro because
;; the semantics are so much different from that of the standard Redis
;; calls. Since channels and subscriptions live longer than a single
;; query, we needed a way to keep a connection open and close it later
;; when we are finished.
(defprotocol ISubscribable
(subscribe [this channels])
#_(psubscribe [this channels]))
(defprotocol IMonitorable
(monitor [this f]))
(defprotocol IUnsubscribable
(unsubscribe [this channels])
#_(punsubscribe [this channels]))
(defprotocol IUnmonitorable
(unmonitor [this]))
(defprotocol IRedisClosable
(close [this]))
(defmulti response
"Redis will reply to commands with different kinds of replies. It is
possible to check the kind of reply from the first byte sent by the
server:
* With a single line reply the first byte of the reply will be `+`
* With an error message the first byte of the reply will be `-`
* With an integer number the first byte of the reply will be `:`
* With bulk reply the first byte of the reply will be `$`
* With multi-bulk reply the first byte of the reply will be `*`"
(fn [in] (char (.readByte in))))
(defmethod response \- [in]
(.readLine in))
(defmethod response \+ [in]
(.readLine in))
(defmethod response \$ [in]
(let [length (Integer/parseInt (.readLine in))]
(when (not= length -1)
(let [content (byte-array (+ 2 length))]
(.read in content)
(String. content 0 length)))))
(defmethod response \: [in]
(Long/parseLong (.readLine in)))
(defmethod response \* [in]
(let [length (Integer/parseInt (.readLine in))]
(doall (repeatedly length #(response in)))))
(defn- socket
"Creates an initial socket for consumption attached to the proper
host and port. This socket is subject to modification later depending
on the request use case or if a timeout is set."
[spec]
(doto (Socket. (:host spec) (:port spec))
(.setTcpNoDelay true)
(.setKeepAlive true)))
(defn request
"Responsible for actually making the request to the Redis
server. Sets the timeout on the socket if one was specified."
[conn & query]
(with-open [socket (doto (socket conn)
(.setSoTimeout (:timeout conn)))
in (DataInputStream. (BufferedInputStream. (.getInputStream socket)))
out (.getOutputStream socket)]
(.write out (.getBytes (apply str query)))
(if (next query)
(doall (repeatedly (count query) #(response in)))
(response in))))
(defn receive-publish
"Used in conjunction with an open channel to handle messages that
arrive. Takes a channel spec and a message"
[channel-spec in]
(let [next-message (response in)
channel-name (second next-message)]
(if-let [f (clojure.core/get @channel-spec channel-name)]
(f next-message))))
(defn receive-monitor
"Used in conjunction with an open monitor to handles commands that
arrive."
[channel-spec in]
(if-let [f (clojure.core/get @channel-spec "monitor")]
(f (response in))))
(defn write-command
[out command]
(.write out (.getBytes command)))
(defn channel-commands
"Builds commands to specified channels"
[command channels]
(apply query command (clojure.core/keys channels)))
;; This record implements the protocols defined above to provide the
;; pub/sub infrastructure
(defrecord RedisChannel [channel-fns socket out]
ISubscribable
(subscribe [this channels]
(do (swap! channel-fns merge channels)
(write-command out (channel-commands "subscribe" channels))))
IUnsubscribable
(unsubscribe [this channel]
(do (swap! channel-fns dissoc channel)
(write-command out (query "unsubscribe" channel))))
IRedisClosable
(close [this]
(.close socket)))
;; This record implements the protocols defined above to enable monitor
;; functionality
(defrecord RedisMonitor [monitor-fn socket out]
IMonitorable
(monitor [this f]
(do (reset! monitor-fn {"monitor" f})
(write-command out (query "monitor"))))
IUnmonitorable
(unmonitor [this]
(do (reset! monitor-fn {})
(write-command out (query "quit"))))
IRedisClosable
(close [this]
(.close socket)))
(defmulti stream-record :type)
(defmethod stream-record :channel
[args]
(let [{:keys [callbacks socket output]} args]
(RedisChannel. callbacks socket output)))
(defmethod stream-record :monitor
[args]
(let [{:keys [callbacks socket output]} args]
(RedisMonitor. callbacks socket output)))
(defn open-stream
"Takes a connection and sets up a connection to a stream record
(either a Redis Channel or Redis Monitor) with the provided functions
as callbacks to invoke when messages are received"
[type conn command receive-fn callbacks]
(let [socket (socket conn)
in (.getInputStream socket)
out (.getOutputStream socket)
in (DataInputStream. (BufferedInputStream. in))
callback-fns (atom callbacks)]
(write-command out command)
(future (doall (repeatedly #(receive-fn callback-fns in))))
(stream-record {:type type
:callbacks callback-fns
:socket socket
:output out})))
(defn open-channel
[conn command channels]
(open-stream :channel conn command receive-publish channels))
(defn open-monitor
[conn command f]
(open-stream :monitor conn command receive-monitor {"monitor" f}))
(extend-type clojure.lang.PersistentArrayMap
ISubscribable
(subscribe [this channels]
(open-channel this (channel-commands "subscribe" channels) channels))
IMonitorable
(monitor [this f]
(open-monitor this (query "monitor") f)))
(defn connection-map
"Creates the initial connection spec. Options can be overridden by
passing in a map. The following keys are valid:
:host
:port
:password
:socket
:timeout
Although passing in your own socket is not recommended and will
probably cause more problems than it solves"
([] (connection-map {}))
([spec]
(let [default {:host "127.0.0.1"
:port 6379
:password nil
:socket nil
:timeout 0}]
(merge default spec))))
(defn with-connection [spec & body]
(apply request spec body))
;; We would like to create one function for each command which Redis
;; supports. The set function would look something like this:
;;
;; (defn set [key value]
;; (query "set" key value))
;;
;; Similarly, the get function would be:
;;
;; (defn get [key]
;; (query "get" key))
;;
;; Because each of these functions has the same pattern, we can use a
;; macro to create them and save a lot of typing.
(defn parameters
"This function enables vararg style definitions in the queries. For
example you can write:
(mget [key & keys])
and the defquery macro will properly expand out into a variable
argument function"
[params]
(let [[args varargs] (split-with #(not= '& %) params)]
(conj (vec args) (last varargs))))
(defmacro defquery
"Given a redis command and a parameter list, create a function of
the form:
(defn <name> <parameter-list>
(query <command> <p1> <p2> ... <pN>))
The name which is passed is a symbol and is first used as a symbol
for the function name. We convert this symbol to a string and use
that string as the command name.
params is a list of N symbols which represent the parameters to the
function. We use this list as the parameter-list when we create the
function. Each symbol in this list will be an argument to query
after the command. We use unquote splicing (~@) to insert these
arguments after the command string."
[name params]
(let [command (str name)
p (parameters params)]
`(defn ~name ~params
(apply query ~command ~@p))))
;; A call to:
;;
;; <pre><code>(defqueries (set [key value]))</code></pre>
;;
;; will expand to:
;;
;; <pre><code>(do (defn set [key value] (query "set" key value)))</pre></code>
;;
(defmacro defqueries
"Given any number of redis commands and argument lists, convert them
to function definitions.
This is an interesting use of unquote splicing. Unquote splicing
works on a sequence and that sequence can be the result of a
function call as it is here. The function which produces this
sequence maps each argument passed to this macro over a function
which takes each list like (set [key value]), binds it to q, and
uses unquote splicing again to create a call to defquery which
looks like (defquery set [key value]).
Finally, a macro can only return a single form, so we wrap all of
the produced expressions in a do special form."
[& queries]
`(do ~@(map (fn [q] `(defquery ~@q)) queries)))
(defqueries
(auth [password])
(quit [])
(select [index])
(bgwriteaof [])
(bgsave [])
(flushdb [])
(flushall [])
(dbsize [])
(info [])
(save [])
(sync [])
(lastsave [])
(shutdown [])
(slaveof [host port])
(slowlog [command argument])
;; (config-get [parameter])
;; (config-set [parameter value])
;; (config-resetstat [])
;; (debug-object [key])
;; (debug-segfault)
(echo [message])
(ping [])
(discard [])
(exec [])
(multi [])
(object [subcommand & arguments])
(set [key value])
(setex [key seconds value])
(setnx [key value])
(setbit [key offset value])
(mset [key value & pairs])
(msetnx [key value & pairs])
(setrange [key offset value])
(get [key])
(mget [key & keys])
(getbit [key offset])
(getset [key value])
(getrange [key start end])
(append [key value])
(keys [pattern])
(exists [key])
(randomkey [])
(type [key])
(move [key db])
(rename [key newkey])
(renamenx [key newkey])
(strlen [key])
(watch [key & keys])
(unwatch [])
(del [key & keys])
(sort [key & options])
(incr [key])
(incrby [key increment])
(decr [key])
(decrby [key increment])
(expire [key seconds])
(expireat [key seconds])
(persist [key])
(ttl [key])
(llen [key])
(lpop [key])
(lpush [key value])
(lpushx [key value & values])
(lset [key value index])
(linsert [key before-after pivot value])
(lrem [key count value])
(ltrim [key start stop])
(lrange [key start end])
(lindex [key index])
(rpop [key])
(rpush [key value & values])
(rpushx [key value])
(rpoplpush [source destination])
(blpop [key timeout])
(brpop [key timeout])
(brpoplpush [source destination timeout])
(hset [key field value])
(hmset [key field value & pairs])
(hsetnx [key field value])
(hget [key field])
(hmget [key field & fields])
(hexists [key field])
(hdel [key field & fields])
(hgetall [key])
(hincrby [key field increment])
(hkeys [key])
(hvals [key])
(hlen [key])
(sadd [key member & members])
(srem [key member & members])
(sismember [key member])
(smembers [key])
(sunion [key & keys])
(sunionstore [destination key & keys])
(sdiff [key & keys])
(sdiffstore [destination set1 set2])
(sinter [key & keys])
(sinterstore [destination key & keys])
(scard [key])
(smove [source desination member])
(spop [key])
(srandmember [key])
(zadd [key score member & more])
(zrange [key start end])
(zrangebyscore [key min max & more])
(zrevrange [key start stop & more])
(zrevrangebyscore [key max min & more])
(zcard [key])
(zcount [key min max])
(zincrby [key increment member])
(zrange [key start stop])
(zrank [member])
(zrem [key member & members])
(zremrangebyrank [key start stop])
(zremrangebyscore [key min max])
(zrevrank [key member])
(zscore [key member])
(publish [channel message]))
(defn zinterstore
[dest-key source-keys & options]
(apply query "zinterstore" dest-key
(count source-keys) (concat source-keys options)))
(defn zunionstore
[dest-key source-keys & options]
(apply query "zunionstore" dest-key
(count source-keys) (concat source-keys options)))