Skip to content


Subversion checkout URL

You can clone with
Download ZIP


Add monitor support #15

wants to merge 1 commit into from

2 participants



This commit adds support for the streaming 'monitor' command. A few things to note:

  1. I generalized 'open-channel' to something like 'open-stream' - since both monitoring and subscribing to channels need to open a connection that listens for an unbounded list of responses and pass them to their corresponding callback functions. This uses a multi-method 'stream-record' to build the record based on the type (:monitor or :channel).
  2. I tried renaming "RedisChannel" to "RedisStream" and implementing the monitor and pub/sub protocols, but having one record that represented both a pub/sub metaphor (where you can add and remove subscriptions) and a monitor metaphor (where you can start and stop the monitor) made interaction with the record more confusing. I chose to break these concepts apart instead.
  3. I renamed "IRedisChannel" to "IRedisClosable" since both a monitor and channel record should implement close functionality.

Feedback is much appreciated, thanks!



Some notes from our discussion:

  • Push the polymorphism up into the protocols
  • Genericise the receive/open semantics
  • Docstrings/Documentation for the marginalia docs
@abedra abedra closed this
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Commits on Apr 21, 2012
  1. @blakesmith
This page is out of date. Refresh to see the latest.
Showing with 85 additions and 20 deletions.
  1. +77 −20 src/accession/core.clj
  2. +8 −0 test/accession/test/core.clj
97 src/accession/core.clj
@@ -63,11 +63,17 @@
(subscribe [this channels])
#_(psubscribe [this channels]))
+(defprotocol IMonitorable
+ (monitor [this f]))
(defprotocol IUnsubscribable
(unsubscribe [this channels])
#_(punsubscribe [this channels]))
-(defprotocol IRedisChannel
+(defprotocol IUnmonitorable
+ (unmonitor [this]))
+(defprotocol IRedisClosable
(close [this]))
(defmulti response
@@ -124,7 +130,7 @@
(doall (repeatedly (count query) #(response in)))
(response in))))
-(defn receive-message
+(defn receive-publish
"Used in conjunction with an open channel to handle messages that
arrive. Takes a channel spec and a message"
[channel-spec in]
@@ -133,10 +139,21 @@
(if-let [f (clojure.core/get @channel-spec channel-name)]
(f next-message))))
-(defn write-commands
- "Sends commands out to the specified channels "
- [out command channels]
- (.write out (.getBytes (apply query command (clojure.core/keys channels)))))
+(defn receive-monitor
+ "Used in conjunction with an open monitor to handles commands that
+ arrive."
+ [channel-spec in]
+ (if-let [f (clojure.core/get @channel-spec "monitor")]
+ (f (response in))))
+(defn write-command
+ [out command]
+ (.write out (.getBytes command)))
+(defn channel-commands
+ "Builds commands to specified channels"
+ [command channels]
+ (apply query command (clojure.core/keys channels)))
;; This record implements the protocols defined above to provide the
;; pub/sub infrastructure
@@ -144,33 +161,74 @@
(subscribe [this channels]
(do (swap! channel-fns merge channels)
- (write-commands out "subscribe" channels)))
+ (write-command out (channel-commands "subscribe" channels))))
(unsubscribe [this channel]
(do (swap! channel-fns dissoc channel)
- (.write out (.getBytes (query "unsubscribe" channel)))))
- IRedisChannel
+ (write-command out (query "unsubscribe" channel))))
+ IRedisClosable
(close [this]
(.close socket)))
-(defn open-channel
- "Takes a connection and sets up a connection to a Redis channel with
- the provided functions as callbacks to invoke when messages are
- received"
- [conn command channels]
+;; This record implements the protocols defined above to enable monitor
+;; functionality
+(defrecord RedisMonitor [monitor-fn socket out]
+ IMonitorable
+ (monitor [this f]
+ (do (reset! monitor-fn {"monitor" f})
+ (write-command out (query "monitor"))))
+ IUnmonitorable
+ (unmonitor [this]
+ (do (reset! monitor-fn {})
+ (write-command out (query "quit"))))
+ IRedisClosable
+ (close [this]
+ (.close socket)))
+(defmulti stream-record :type)
+(defmethod stream-record :channel
+ [args]
+ (let [{:keys [callbacks socket output]} args]
+ (RedisChannel. callbacks socket output)))
+(defmethod stream-record :monitor
+ [args]
+ (let [{:keys [callbacks socket output]} args]
+ (RedisMonitor. callbacks socket output)))
+(defn open-stream
+ "Takes a connection and sets up a connection to a stream record
+ (either a Redis Channel or Redis Monitor) with the provided functions
+ as callbacks to invoke when messages are received"
+ [type conn command receive-fn callbacks]
(let [socket (socket conn)
in (.getInputStream socket)
out (.getOutputStream socket)
in (DataInputStream. (BufferedInputStream. in))
- channel-fns (atom channels)]
- (write-commands out command channels)
- (future (doall (repeatedly #(receive-message channel-fns in))))
- (RedisChannel. channel-fns socket out)))
+ callback-fns (atom callbacks)]
+ (write-command out command)
+ (future (doall (repeatedly #(receive-fn callback-fns in))))
+ (stream-record {:type type
+ :callbacks callback-fns
+ :socket socket
+ :output out})))
+(defn open-channel
+ [conn command channels]
+ (open-stream :channel conn command receive-publish channels))
+(defn open-monitor
+ [conn command f]
+ (open-stream :monitor conn command receive-monitor {"monitor" f}))
(extend-type clojure.lang.PersistentArrayMap
(subscribe [this channels]
- (open-channel this "subscribe" channels)))
+ (open-channel this (channel-commands "subscribe" channels) channels))
+ IMonitorable
+ (monitor [this f]
+ (open-monitor this (query "monitor") f)))
(defn connection-map
"Creates the initial connection spec. Options can be overridden by
@@ -296,7 +354,6 @@
(ping [])
(discard [])
(exec [])
- (monitor [])
(multi [])
(object [subcommand & arguments])
8 test/accession/test/core.clj
@@ -193,6 +193,14 @@
(redis/lrange "children" "0" "3")
(redis/get "favorite:child")))))
+(deftest test-monitor
+ (let [received (atom (str))
+ monitor (redis/monitor c #(reset! received %))]
+ (redis/with-connection c
+ (redis/ping))
+ (is (= "PING" (re-find #"PING" @received)))
+ (redis/close monitor)))
(deftest test-pubsub
(let [received (atom [])
channel (redis/subscribe c {"ps-foo" #(swap! received conj %)})]
Something went wrong with that request. Please try again.