diff --git a/API.md b/API.md index 6b45020..59213bd 100755 --- a/API.md +++ b/API.md @@ -166,6 +166,16 @@ Sends a message to all the subscribed clients where: - `message` - the message sent to the clients. Can be any type which can be safely converted to string using `JSON.stringify()`. + +### `server.eachSocket(each, options)` + +Iterates over all connected sockets, optionally filtering on those that have subscribed to +a given subscription. This operation is synchronous. +- `each` - Iteration callback in the form `function(socket)`. +- `options` - Optional options object + - `subscription` - When set to a string path, limits the results to sockets that are + to that path. + ## Socket An object representing a client connection. diff --git a/lib/index.js b/lib/index.js index 509ad40..5a01fb9 100755 --- a/lib/index.js +++ b/lib/index.js @@ -107,6 +107,7 @@ exports.register = function (server, options, next) { server.decorate('server', 'broadcast', Listener.broadcast); server.decorate('server', 'subscription', Listener.subscription); server.decorate('server', 'publish', Listener.publish); + server.decorate('server', 'eachSocket', Listener.eachSocket); return next(); }; diff --git a/lib/listener.js b/lib/listener.js index 539c82a..731627f 100755 --- a/lib/listener.js +++ b/lib/listener.js @@ -353,6 +353,40 @@ internals.Listener.prototype._subscribe = function (path, socket, next) { }; +internals.Listener.eachSocket = function (each, options) { + + options = options || {}; + + var connections = this.connections; + for (var i = 0, il = connections.length; i < il; ++i) { + var connection = connections[i]; + if (connection.plugins.nes) { + connection.plugins.nes._listener._eachSocket(each, options); + } + } +}; + + +internals.Listener.prototype._eachSocket = function (each, options) { + + var subscription = options.subscription; + if (!subscription) { + return this._sockets.forEach(each); + } + + var sub = this._router.route('sub', subscription); + if (sub.isBoom) { + return; + } + + var route = sub.route; + route.subscribers.forEach(sub.paramsArray.length ? subscription : null, function (socket) { // Filter on path if has parameters + + each(socket); + }); +}; + + internals.Listener.prototype._generateId = function () { var id = Date.now() + ':' + this._connection.info.id + ':' + this._socketCounter++; diff --git a/test/listener.js b/test/listener.js index 8e4e46b..fa4ba13 100755 --- a/test/listener.js +++ b/test/listener.js @@ -524,6 +524,115 @@ describe('Listener', function () { }); }); + + describe('eachSocket()', function () { + + it('returns connected sockets', function (done) { + + var server = new Hapi.Server(); + server.connection(); + + server.register({ register: Nes, options: { auth: false } }, function (err) { + + expect(err).to.not.exist(); + + server.start(function (err) { + + var client = new Nes.Client('http://localhost:' + server.info.port); + client.connect(function (err) { + + expect(err).to.not.exist(); + expect(countSockets(server)).to.equal(1); + + server.stop(done); + }); + }); + }); + }); + + it('returns sockets on a subscription', function (done) { + + var server = new Hapi.Server(); + server.connection(); + + server.register({ register: Nes, options: { auth: false } }, function (err) { + + expect(err).to.not.exist(); + + server.subscription('/a/{id}'); + server.subscription('/b'); + + server.start(function (err) { + + var client = new Nes.Client('http://localhost:' + server.info.port); + client.connect(function (err) { + + expect(err).to.not.exist(); + + client.subscribe('/b', function () { }); + + client = new Nes.Client('http://localhost:' + server.info.port); + client.connect(function (err) { + + expect(err).to.not.exist(); + client.subscribe('/a/b', function () { }); + + setTimeout(function () { + + expect(countSockets(server)).to.equal(2); + expect(countSockets(server, { subscription: '/a/a' })).to.equal(0); + expect(countSockets(server, { subscription: '/a/b' })).to.equal(1); + + expect(countSockets(server, { subscription: '/b' })).to.equal(1); + + expect(countSockets(server, { subscription: '/foo' })).to.equal(0); + + server.stop(done); + }, 10); + }); + }); + }); + }); + }); + + it('ignores not participating connections', function (done) { + + var server = new Hapi.Server(); + server.connection(); + + server.register({ register: Nes, options: { auth: false } }, function (err) { + + expect(err).to.not.exist(); + + server.start(function (err) { + + var client = new Nes.Client('http://localhost:' + server.info.port); + client.connect(function (err) { + + expect(err).to.not.exist(); + + server.connection(); + expect(countSockets(server)).to.equal(1); + + server.stop(done); + }); + }); + }); + }); + + + var countSockets = function (server, options) { + + var seen = 0; + server.eachSocket(function (socket) { + + expect(socket).to.exist(); + seen++; + }, options); + return seen; + }; + }); + describe('_generateId()', function () { it('rolls over when reached max sockets per millisecond', function (done) {