From a0c80a9f59fe42a96a65d545f75a10d8c3a543f7 Mon Sep 17 00:00:00 2001 From: kpdecker Date: Mon, 19 Oct 2015 00:07:29 -0500 Subject: [PATCH 1/2] Implement eachSocket iterator Provides a formal API for consumers to interact with currently connected clients. Fixes #48 --- API.md | 10 +++++ lib/index.js | 1 + lib/listener.js | 34 +++++++++++++++ test/listener.js | 109 +++++++++++++++++++++++++++++++++++++++++++++++ 4 files changed, 154 insertions(+) diff --git a/API.md b/API.md index e084a53..840aa88 100755 --- a/API.md +++ b/API.md @@ -158,6 +158,16 @@ Sends a message to all the subscribed clients where: - `message` - the message sent to the clients. Can be any type which can be safely converted to string using `JSON.stringify()`. + +### `server.eachSocket(callback, options)` + +Iterates over all connected sockets, optionally filtering on those that have subscribed to +a given subscription. This operation is synchronous. +- `callback` - Iteration callback in the form `function(socket)`. +- `options` - Optional options object + - `subscription` - When set to a string path, limits the results to sockets that are + to that path. + ## Socket An object representing a client connection. diff --git a/lib/index.js b/lib/index.js index 509ad40..5a01fb9 100755 --- a/lib/index.js +++ b/lib/index.js @@ -107,6 +107,7 @@ exports.register = function (server, options, next) { server.decorate('server', 'broadcast', Listener.broadcast); server.decorate('server', 'subscription', Listener.subscription); server.decorate('server', 'publish', Listener.publish); + server.decorate('server', 'eachSocket', Listener.eachSocket); return next(); }; diff --git a/lib/listener.js b/lib/listener.js index ce638c1..8ae7389 100755 --- a/lib/listener.js +++ b/lib/listener.js @@ -351,6 +351,40 @@ internals.Listener.prototype._subscribe = function (path, socket, next) { }; +internals.Listener.eachSocket = function (callback, options) { + + options = options || {}; + + var connections = this.connections; + for (var i = 0, il = connections.length; i < il; ++i) { + var connection = connections[i]; + if (connection.plugins.nes) { + connection.plugins.nes._listener._eachSocket(callback, options); + } + } +}; + + +internals.Listener.prototype._eachSocket = function (callback, options) { + + var subscription = options.subscription; + if (!subscription) { + return this._sockets.forEach(callback); + } + + var sub = this._router.route('sub', subscription); + if (sub.isBoom) { + return; + } + + var route = sub.route; + route.subscribers.forEach(sub.paramsArray.length ? subscription : null, function (socket) { // Filter on path if has parameters + + callback(socket); + }); +}; + + internals.Listener.prototype._generateId = function () { var id = Date.now() + ':' + this._connection.info.id + ':' + this._socketCounter++; diff --git a/test/listener.js b/test/listener.js index a34c742..19469c8 100755 --- a/test/listener.js +++ b/test/listener.js @@ -448,6 +448,115 @@ describe('Listener', function () { }); }); + + describe('eachSocket()', function () { + + it('returns connected sockets', function (done) { + + var server = new Hapi.Server(); + server.connection(); + + server.register({ register: Nes, options: { auth: false } }, function (err) { + + expect(err).to.not.exist(); + + server.start(function (err) { + + var client = new Nes.Client('http://localhost:' + server.info.port); + client.connect(function (err) { + + expect(err).to.not.exist(); + expect(countSockets(server)).to.equal(1); + + server.stop(done); + }); + }); + }); + }); + + it('returns sockets on a subscription', function (done) { + + var server = new Hapi.Server(); + server.connection(); + + server.register({ register: Nes, options: { auth: false } }, function (err) { + + expect(err).to.not.exist(); + + server.subscription('/a/{id}'); + server.subscription('/b'); + + server.start(function (err) { + + var client = new Nes.Client('http://localhost:' + server.info.port); + client.connect(function (err) { + + expect(err).to.not.exist(); + + client.subscribe('/b', function () { }); + + client = new Nes.Client('http://localhost:' + server.info.port); + client.connect(function (err) { + + expect(err).to.not.exist(); + client.subscribe('/a/b', function () { }); + + setTimeout(function () { + + expect(countSockets(server)).to.equal(2); + expect(countSockets(server, { subscription: '/a/a' })).to.equal(0); + expect(countSockets(server, { subscription: '/a/b' })).to.equal(1); + + expect(countSockets(server, { subscription: '/b' })).to.equal(1); + + expect(countSockets(server, { subscription: '/foo' })).to.equal(0); + + server.stop(done); + }, 10); + }); + }); + }); + }); + }); + + it('ignores not participating connections', function (done) { + + var server = new Hapi.Server(); + server.connection(); + + server.register({ register: Nes, options: { auth: false } }, function (err) { + + expect(err).to.not.exist(); + + server.start(function (err) { + + var client = new Nes.Client('http://localhost:' + server.info.port); + client.connect(function (err) { + + expect(err).to.not.exist(); + + server.connection(); + expect(countSockets(server)).to.equal(1); + + server.stop(done); + }); + }); + }); + }); + + + var countSockets = function (server, options) { + + var seen = 0; + server.eachSocket(function (socket) { + + expect(socket).to.exist(); + seen++; + }, options); + return seen; + }; + }); + describe('_generateId()', function () { it('rolls over when reached max sockets per millisecond', function (done) { From 2584d70be0ee2e066743f7212731df087ddc1548 Mon Sep 17 00:00:00 2001 From: kpdecker Date: Wed, 21 Oct 2015 13:33:46 -0500 Subject: [PATCH 2/2] Rename callback parameter to each --- API.md | 4 ++-- lib/listener.js | 10 +++++----- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/API.md b/API.md index 840aa88..8ce9e1f 100755 --- a/API.md +++ b/API.md @@ -159,11 +159,11 @@ Sends a message to all the subscribed clients where: string using `JSON.stringify()`. -### `server.eachSocket(callback, options)` +### `server.eachSocket(each, options)` Iterates over all connected sockets, optionally filtering on those that have subscribed to a given subscription. This operation is synchronous. -- `callback` - Iteration callback in the form `function(socket)`. +- `each` - Iteration callback in the form `function(socket)`. - `options` - Optional options object - `subscription` - When set to a string path, limits the results to sockets that are to that path. diff --git a/lib/listener.js b/lib/listener.js index 8ae7389..95a6750 100755 --- a/lib/listener.js +++ b/lib/listener.js @@ -351,7 +351,7 @@ internals.Listener.prototype._subscribe = function (path, socket, next) { }; -internals.Listener.eachSocket = function (callback, options) { +internals.Listener.eachSocket = function (each, options) { options = options || {}; @@ -359,17 +359,17 @@ internals.Listener.eachSocket = function (callback, options) { for (var i = 0, il = connections.length; i < il; ++i) { var connection = connections[i]; if (connection.plugins.nes) { - connection.plugins.nes._listener._eachSocket(callback, options); + connection.plugins.nes._listener._eachSocket(each, options); } } }; -internals.Listener.prototype._eachSocket = function (callback, options) { +internals.Listener.prototype._eachSocket = function (each, options) { var subscription = options.subscription; if (!subscription) { - return this._sockets.forEach(callback); + return this._sockets.forEach(each); } var sub = this._router.route('sub', subscription); @@ -380,7 +380,7 @@ internals.Listener.prototype._eachSocket = function (callback, options) { var route = sub.route; route.subscribers.forEach(sub.paramsArray.length ? subscription : null, function (socket) { // Filter on path if has parameters - callback(socket); + each(socket); }); };