forked from share/sharedb-redis-pubsub
-
Notifications
You must be signed in to change notification settings - Fork 2
/
index.js
64 lines (54 loc) · 1.93 KB
/
index.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
var redis = require('redis');
var PubSub = require('@teamwork/sharedb').PubSub;
// Redis pubsub driver for ShareDB.
//
// The redis driver requires two redis clients (a single redis client can't do
// both pubsub and normal messaging). These clients will be created
// automatically if you don't provide them.
function RedisPubSub(options) {
if (!(this instanceof RedisPubSub)) return new RedisPubSub(options);
PubSub.call(this, options);
options || (options = {});
this.client = options.client || redis.createClient(options);
// Redis doesn't allow the same connection to both listen to channels and do
// operations. Make an extra redis connection for subscribing with the same
// options if not provided
this.observer = options.observer || redis.createClient(this.client.options);
var pubsub = this;
this.observer.on('message', function(channel, message) {
var data = JSON.parse(message);
pubsub._emit(channel, data);
});
}
module.exports = RedisPubSub;
RedisPubSub.prototype = Object.create(PubSub.prototype);
RedisPubSub.prototype.close = function(callback) {
if (!callback) {
callback = function(err) {
if (err) throw err;
};
}
var pubsub = this;
PubSub.prototype.close.call(this, function(err) {
if (err) return callback(err);
pubsub.client.quit(function(err) {
if (err) return callback(err);
pubsub.observer.quit(callback);
});
});
};
RedisPubSub.prototype._subscribe = function(channel, callback) {
this.observer.subscribe(channel, callback);
};
RedisPubSub.prototype._unsubscribe = function(channel, callback) {
this.observer.unsubscribe(channel, callback);
};
RedisPubSub.prototype._publish = function(channels, data, callback) {
var message = JSON.stringify(data);
var args = [PUBLISH_SCRIPT, 0, message].concat(channels);
this.client.eval(args, callback);
};
var PUBLISH_SCRIPT =
'for i = 2, #ARGV do ' +
'redis.call("publish", ARGV[i], ARGV[1]) ' +
'end';