@@ -52,6 +52,13 @@
;; <pre>"*3\r\n$3\r\nSET\r\n$5\r\nmykey\r\n$7\r\nmyvalue\r\n"</pre>
+;; These protocols were created to support pub/sub, in particular,
+;; subscribe. Subscribe is called in terms of both a connection-map
+;; and an open channel. This is not implemented via the macro because
+;; the semantics are so much different from that of the standard Redis
+;; calls. Since channels and subscriptions live longer than a single
+;; query, we needed a way to keep a connection open and close it later
+;; when we are finished.
(defprotocol ISubscribable
(subscribe [this channels])
#_(psubscribe [this channels]))
@@ -95,12 +102,18 @@
(let [length (Integer/parseInt (.readLine in))]
(doall (repeatedly length #(response in)))))
-(defn- socket [spec]
+(defn- socket
+ "Creates an initial socket for consumption attached to the proper
+ host and port. This socket is subject to modification later depending
+ on the request use case or if a timeout is set."
+ [spec]
(doto (Socket. (:host spec) (:port spec))
(.setTcpNoDelay true)
(.setKeepAlive true)))
(defn request
+ "Responsible for actually making the request to the Redis
+ server. Sets the timeout on the socket if one was specified."
[conn & query]
(with-open [socket (doto (socket conn)
(.setSoTimeout (:timeout conn)))
@@ -111,15 +124,22 @@
(doall (repeatedly (count query) #(response in)))
(response in))))
-(defn receive-message [channel-spec in]
+(defn receive-message
+ "Used in conjunction with an open channel to handle messages that
+ arrive. Takes a channel spec and a message"
+ [channel-spec in]
(let [next-message (response in)
channel-name (second next-message)]
(if-let [f (clojure.core/get @channel-spec channel-name)]
(f next-message))))
-(defn write-commands [out command channels]
+(defn write-commands
+ "Sends commands out to the specified channels "
+ [out command channels]
(.write out (.getBytes (apply query command (clojure.core/keys channels)))))
+;; This record implements the protocols defined above to provide the
+;; pub/sub infrastructure
(defrecord RedisChannel [channel-fns socket out]
(subscribe [this channels]
@@ -134,6 +154,9 @@
(.close socket)))
(defn open-channel
+ "Takes a connection and sets up a connection to a Redis channel with
+ the provided functions as callbacks to invoke when messages are
+ received"
[conn command channels]
(let [socket (socket conn)
in (.getInputStream socket)
@@ -150,6 +173,17 @@
(open-channel this "subscribe" channels)))
(defn connection-map
+ "Creates the initial connection spec. Options can be overridden by
+ passing in a map. The following keys are valid:
+ :host
+ :port
+ :password
+ :socket
+ :timeout
+ Although passing in your own socket is not recommended and will
+ probably cause more problems than it solves"
([] (connection-map {}))
(let [default {:host ""
@@ -366,5 +400,4 @@
(zscore [key member])
(zunionstore [destination numkeys set1 set2])
- (publish [channel message])
- )
+ (publish [channel message]))
