Skip to content
A publish/subscribe API similar to that in node_redis, minus the redis
JavaScript
Latest commit 5a01286 Sep 6, 2014 @danielstjules 1.0.1

README.md

node-internal-pubsub

A publish/subscribe API similar to that in node_redis, minus the redis. Built on pattern-emitter.

Build Status

Installation

Using npm, you can install node-internal-pubsub with npm install node-internal-pubsub. You can also require it as a dependency in your package.json file:

"dependencies": {
    "node-internal-pubsub": "1.0.x"
}

Overview

Quite a few SockJS and Socket.IO examples/tutorials create a redis client in subscriber mode per socket connection. They demonstrate code along the lines of:

sockjs.on('connection', function(conn) {
  var sub = redis.createClient();
  ...
}

or

io.sockets.on('connection', function(socket){
  var sub = redis.createClient();
  ...
}

While the node_redis pubsub API simplifies the management of subscriptions as opposed to listeners (e.g. using EventEmitter), it likely shouldn't be used for internal message routing as seen in the above example. This is mostly due to the possible duplication of data and unnecessary network IO.

As such, this library exists to simplify the transition to using a single redis client for all connections, in addition to an internal pubsub mechanism. Most of the API has been designed to resemble that in node_redis, though a few key differences exists:

  • Subscribers and publishers must be created using pubsub.createSubscriber and pubsub.createPublisher, respectively
  • Rather than offering glob-based pattern subscriptions, regular expressions are available
  • Subscribers do not exit subscriber mode when the subscription count reaches 0

Performance

A benchmark can be found at benchmarks/singleChannelMultiSubs.js. It tests the performance of sending 10,000,000 messages in a single channel using 2,000 redis clients/subscribers, compared to a single redis client and 2,000 node-internal-pubsub subscribers. Example results can be seen below:

$ node benchmarks/singleChannelMultiSubs.js
Setting up redis suite
Receiving 10000000 messages with 2000 redis subscribers
Setting up pubsub suite
Receiving 10000000 messages with 1 redis sub, 2000 pubsub subscribers

Redis subscribers
Running time: 29735 ms
Avg messages received per second: 336,304

Redis subscriber with pubsub subscribers
Running time: 1203 ms
Avg messages received per second: 8,312,551

This was ran with a local redis server on my Macbook Air. A ~24x improvement in performance can be seen in the above output by using the 2,000 internal subscribers as opposed to the same number of redis clients.

Examples

The following is a brief example showing how to setup the library with an existing websocket server:

var redisClient = redis.createClient();
var redisSub = redis.createClient();
var pub = pubsub.createPublisher();

// Subscribe to all incoming messages, and publish them to the internal pubsub
redisSub.psubscribe('*');
redisSub.on('pmessage', function(pattern, channel, msg) {
  pub.publish(channel, msg);
});

wsServer.on('connection', function(conn) {
  var sub = pubsub.createSubscriber();

  // Publish messages received from the user to redis
  conn.on('data', function(message) {
    redisClient.publish('chatmessages', message);
  });
});

Basic examples can be found in the examples directory.

Publisher

A publisher in the internal pub sub module. Publishes messages by invoking emit on an instance of PatternEmitter.

createPublisher()

Creates and returns a new Publisher instance.

var pubsub     = require('node-internal-pubsub');
var publisher  = pubsub.createPublisher();

publisher.publish(channel, message)

Publishes a message to the given channel. Channel is expected to be a string, though message can be any object.

publisher.publish('channel:1', 'A message to send to all channel subscribers');

Subscriber

A subscriber in the internal pub sub module. Each subscriber holds a count of the number of subscriptions it holds. It extends EventEmitter, but rather than emitting newListener and removeListener events, it emits events corresponding to those used in node_redis: message, pmessage, subscribe, psubscribe, unsubscribe, and punsubscribe.

createSubscriber()

Creates and returns a new Subscriber instance.

var pubsub     = require('node-internal-pubsub');
var subscriber = pubsub.createSubscriber();

subscriber.subscribe([channel1], [channel2], [...])

Subscribes to all messages published in the given channels. Accepts multiple channels as arguments, and emits a subscribe event for each newly subscribed channel. The event is passed the channel name, and the current subscription count. Ignores channels that are already subscribed to.

subscriber.subscribe('channel:1', 'channel:2');
subscriber.on('message', function(channel, message) {
  console.log('Received:', channel, '-', message);
});

publisher.publish('channel:1', 'Example message');
// 'Received: channel:1 - message'

subscriber.unsubscribe([...channels])

Unsubscribes from messages in each of the provided channels. Accepts multiple channels as arguments, and emits an unsubscribe event for each channel. The event is passed the channel name, and the current subscription count. Ignores channels that are not among the current subscriptions. If no arguments are passed, the subscriber is unsubscribed from all channels.

subscriber.subscribe('channel:1', 'channel:2');
subscriber.unsubscribe('channel:1', 'channel:2');

subscriber.psubscribe([pattern1], [pattern2], [...])

Subscribes to all messages published in channels matching the given regular expressions' patterns. Accepts multiple RegExp objects as arguments, and emits a psubscribe event for each newly subscribed pattern. The event is passed the RegExp, and the current subscription count. Any non RegExp instances passed to this function are cast to a string, and used to create a RegExp. Ignores patterns that are already subscribed to.

subscriber.psubscribe(/channel/);
subscriber.on('pmessage', function(pattern, channel, message) {
  console.log('Received:', pattern, '-', channel, '-', message);
});

publisher.publish('channel:1', 'Example message');
// 'Received: /channel/ - channel:1 - Example message'

subscriber.punsubscribe([...patterns])

subscriber.psubscribe(/channel/);
subscriber.punsubscribe(/channel/);

Unsubscribes from each of the provided regular expressions' patterns. Accepts multiple RegExp objects as arguments, and emits a punsubscribe event for each pattern. The event is passed the RegExp, and the current subscription count. Any non RegExp instances passed to this function are cast to a string, and used to create a RegExp. Ignores patterns that are not among the current subscriptions. If no arguments are passed, the subscriber is unsubscribed from all patterns.

Events

Each subscriber instance extends EventEmitter and, like node_redis subscriber clients, emit the following events: message, pmessage, subscribe, psubscribe, unsubscribe, and punsubscribe.

message

{string} channel, {*} message

Emitted when a message is received on a subscribed channel.

subscriber.subscribe('example');
subscriber.on('message', function(channel, message) {
  console.log(channel, message);
});
publisher.publish('example', 'message'); // 'example message'

pmessage

{RegExp} pattern, {string} channel, {*} message

Emitted when a message is received on a channel matching a pattern subscription.

subscriber.subscribe(/^example/i);
subscriber.on('message', function(pattern, channel, message) {
  console.log(pattern, channel, message);
});
publisher.publish('ExampleSub', 'message'); // '/^example/i ExampleSub message'

subscribe

{string} channel, {int} count

Emitted when a new channel is subscribed to. Both the channel name and the updated subscription count are passed to the listener.

subscriber.on('subscribe', function(channel, count) {
  console.log(channel, count);
});
subscriber.subscribe('example'); // 'example 1'

psubscribe

{RegExp} pattern, {int} count

Emitted when a new pattern is subscribed to. The pattern and the updated subscription count are passed to the listener.

subscriber.on('psubscribe', function(pattern, count) {
  console.log(pattern, count);
});
subscriber.psubscribe(/^\w+$/); // '/^\w+$/ 1'

unsubscribe

{string} channel, {int} count

Emitted in response to unsubscribing from a channel. Both the channel name and the updated subscription count are passed to the listener.

subscriber.subscribe('example');
subscriber.on('unsubscribe', function(channel, count) {
  console.log(channel, count);
});
subscriber.unsubscribe('example'); // 'example 0'

punsubscribe

{RegExp} pattern, {int} count

Emitted in response to unsubscribing from a pattern The pattern and the updated subscription count are passed to the listener.

subscriber.psubscribe(/\w+/);
subscriber.on('punsubscribe', function(pattern, count) {
  console.log(pattern, count);
});
subscriber.punsubscribe(/\w+/); // '/\w+/ 0'
Something went wrong with that request. Please try again.