Permalink
Browse files

pull in code from racer

  • Loading branch information...
1 parent 85206ea commit bd4915b9b734cb708314318269c081f4f0073a1b @nateps nateps committed Feb 28, 2012
Showing with 155 additions and 5 deletions.
  1. +2 −2 Makefile
  2. +4 −3 package.json
  3. +149 −0 src/index.coffee
View
@@ -1,8 +1,8 @@
compile:
- ./node_modules/coffee-script/bin/coffee -bw -o ./lib -c ./src
+ ./node_modules/racer/node_modules/coffee-script/bin/coffee -bw -o ./lib -c ./src
MOCHA_TESTS := $(shell find test/ -name '*.mocha.coffee')
-MOCHA := $(shell which mocha)
+MOCHA := ./node_modules/racer/node_modules/mocha/bin/mocha
OUT_FILE = "test-output.tmp"
g = "."
View
@@ -3,10 +3,11 @@
"description": "Redis PubSub plugin for Racer",
"version": "0.0.0",
"main": "./lib/index.js",
+ "dependencies": {
+ "redis": ">=0.5.11"
+ },
"devDependencies": {
- "mocha": ">=0.9.0",
- "expect.js": ">=0.1.2",
- "coffee-script": ">=1.1.2"
+ "racer": ">=0.1.8"
},
"engines": {
"node": ">=0.4.0"
View
@@ -0,0 +1,149 @@
+redis = require 'redis'
+transaction = null
+pathRegExp = null
+hasKeys = null
+
+module.exports = (racer) ->
+ {transaction} = racer
+ {regExp: pathRegExp} = racer.path
+ {hasKeys} = racer.util
+ racer.adapters.pubSub.Redis = PubSubRedis
+
+PubSubRedis = (options = {}) ->
+ self = this
+ {port, host, db, password} = options
+ namespace = (db || 0) + '.'
+ @_prefixWithNamespace = (path) -> namespace + path
+
+ @_pubClient = pubClient = redis.createClient port, host, options
+ @_subClient = subClient = redis.createClient port, host, options
+
+ if password
+ throwOnErr = (err) -> throw err if err
+ pubClient.auth password, throwOnErr
+ subClient.auth password, throwOnErr
+
+ @_subs = subs = {}
+ @_subscriberSubs = {}
+
+ if options.debug
+ for event in ['subscribe', 'unsubscribe', 'psubscribe', 'punsubscribe']
+ do (event) ->
+ subClient.on event, (path, count) ->
+ console.log "#{event.toUpperCase()} #{path} COUNT = #{count}"
+ subClient.on 'message', (channel, message) ->
+ console.log "MESSAGE #{channel} #{message}"
+ subClient.on 'pmessage', (pattern, channel, message) ->
+ console.log "PMESSAGE #{pattern} #{channel} #{message}"
+ @__publish = PubSubRedis::publish
+ @publish = (path, message) ->
+ console.log "PUBLISH #{@_prefixWithNamespace path} #{JSON.stringify message}"
+ @__publish path, message
+
+ subClient.on 'pmessage', (pattern, path, message) ->
+ # The pattern returned will have an extra * on the end
+ if pathSubs = subs[pattern.substr(0, pattern.length - 1)]
+ message = JSON.parse message
+ for subscriberId, re of pathSubs
+ self.onMessage subscriberId, message if re.test path
+
+ subClient.on 'message', (path, message) ->
+ if pathSubs = subs[path]
+ message = JSON.parse message
+ for subscriberId of pathSubs
+ self.onMessage subscriberId, message
+
+ # Redis doesn't support callbacks on subscribe or unsubscribe methods, so
+ # we call the callback after subscribe/unsubscribe events are published on
+ # each of the paths for a given call of subscribe/unsubscribe.
+ makeCallback = (queue, event) ->
+ subClient.on event, (path) ->
+ if pending = queue[path]
+ if callback = pending.shift()
+ callback() unless --callback.__count
+ makeCallback @_pendingPsubscribe = {}, 'psubscribe'
+ makeCallback @_pendingPunsubscribe = {}, 'punsubscribe'
+ makeCallback @_pendingSubscribe = {}, 'subscribe'
+ makeCallback @_pendingUnsubscribe = {}, 'unsubscribe'
+
+ return
+
+PubSubRedis:: =
+
+ onMessage: ->
+
+ disconnect: ->
+ @_pubClient.end()
+ @_subClient.end()
+
+ subscribe: (subscriberId, paths, callback, method = 'psubscribe') ->
+ return if subscriberId is undefined
+
+ subs = @_subs
+ subscriberSubs = @_subscriberSubs
+ toAdd = []
+ for path in paths
+ path = @_prefixWithNamespace path
+ unless pathSubs = subs[path]
+ subs[path] = pathSubs = {}
+ toAdd.push path
+ re = pathRegExp path
+ pathSubs[subscriberId] = re
+ ss = subscriberSubs[subscriberId] ||= {}
+ ss[path] = re
+
+ callbackQueue = switch method
+ when 'psubscribe' then @_pendingPsubscribe
+ when 'subscribe' then @_pendingSubscribe
+ handlePaths toAdd, callbackQueue, @_subClient, method, callback
+
+ publish: (path, message) ->
+ path = @_prefixWithNamespace path
+ @_pubClient.publish path, JSON.stringify message
+
+ unsubscribe: (subscriberId, paths, callback, method = 'punsubscribe') ->
+ return if subscriberId is undefined
+
+ # For signature: unsubscribe(subscriberId, callback)
+ if typeof paths is 'function'
+ callback = paths
+ paths = null
+
+ # For signature: unsubscribe(subscriberId[, callback])
+ subscriberSubs = @_subscriberSubs
+ paths ||= subscriberSubs[subscriberId] || []
+
+ # For signature: unsubscribe(subscriberId, paths[, callback])
+ subs = @_subs
+ toRemove = []
+ for path in paths
+ path = @_prefixWithNamespace path
+ if pathSubs = subs[path]
+ delete pathSubs[subscriberId]
+ toRemove.push path unless hasKeys pathSubs
+ delete ss[path] if ss = subscriberSubs[subscriberId]
+
+ callbackQueue = switch method
+ when 'punsubscribe' then @_pendingPunsubscribe
+ when 'unsubscribe' then @_pendingUnsubscribe
+ handlePaths toRemove, callbackQueue, @_subClient, method, callback
+
+ hasSubscriptions: (subscriberId) -> subscriberId of @_subscriberSubs
+
+ subscribedToTxn: (subscriberId, txn) ->
+ path = @_prefixWithNamespace transaction.path txn
+ for p, re of @_subscriberSubs[subscriberId]
+ return true if p == path || re.test path
+ return false
+
+handlePaths = (paths, queue, client, fn, callback) ->
+ if i = paths.length
+ callback.__count = i if callback
+ else
+ callback() if callback
+ while i--
+ path = paths[i]
+ path += '*' if fn == 'psubscribe'
+ client[fn] path
+ if callback
+ (queue[path] ||= []).push callback

0 comments on commit bd4915b

Please sign in to comment.