Skip to content

Commit

Permalink
Merge pull request #62 from hapijs/each-socket
Browse files Browse the repository at this point in the history
Implement eachSocket iterator
  • Loading branch information
hueniverse committed Oct 21, 2015
2 parents ec2b58b + 2584d70 commit 451430a
Show file tree
Hide file tree
Showing 4 changed files with 154 additions and 0 deletions.
10 changes: 10 additions & 0 deletions API.md
Expand Up @@ -166,6 +166,16 @@ Sends a message to all the subscribed clients where:
- `message` - the message sent to the clients. Can be any type which can be safely converted to
string using `JSON.stringify()`.


### `server.eachSocket(each, options)`

Iterates over all connected sockets, optionally filtering on those that have subscribed to
a given subscription. This operation is synchronous.
- `each` - Iteration callback in the form `function(socket)`.
- `options` - Optional options object
- `subscription` - When set to a string path, limits the results to sockets that are
to that path.

## Socket

An object representing a client connection.
Expand Down
1 change: 1 addition & 0 deletions lib/index.js
Expand Up @@ -107,6 +107,7 @@ exports.register = function (server, options, next) {
server.decorate('server', 'broadcast', Listener.broadcast);
server.decorate('server', 'subscription', Listener.subscription);
server.decorate('server', 'publish', Listener.publish);
server.decorate('server', 'eachSocket', Listener.eachSocket);

return next();
};
Expand Down
34 changes: 34 additions & 0 deletions lib/listener.js
Expand Up @@ -353,6 +353,40 @@ internals.Listener.prototype._subscribe = function (path, socket, next) {
};


internals.Listener.eachSocket = function (each, options) {

options = options || {};

var connections = this.connections;
for (var i = 0, il = connections.length; i < il; ++i) {
var connection = connections[i];
if (connection.plugins.nes) {
connection.plugins.nes._listener._eachSocket(each, options);
}
}
};


internals.Listener.prototype._eachSocket = function (each, options) {

var subscription = options.subscription;
if (!subscription) {
return this._sockets.forEach(each);
}

var sub = this._router.route('sub', subscription);
if (sub.isBoom) {
return;
}

var route = sub.route;
route.subscribers.forEach(sub.paramsArray.length ? subscription : null, function (socket) { // Filter on path if has parameters

each(socket);
});
};


internals.Listener.prototype._generateId = function () {

var id = Date.now() + ':' + this._connection.info.id + ':' + this._socketCounter++;
Expand Down
109 changes: 109 additions & 0 deletions test/listener.js
Expand Up @@ -524,6 +524,115 @@ describe('Listener', function () {
});
});


describe('eachSocket()', function () {

it('returns connected sockets', function (done) {

var server = new Hapi.Server();
server.connection();

server.register({ register: Nes, options: { auth: false } }, function (err) {

expect(err).to.not.exist();

server.start(function (err) {

var client = new Nes.Client('http://localhost:' + server.info.port);
client.connect(function (err) {

expect(err).to.not.exist();
expect(countSockets(server)).to.equal(1);

server.stop(done);
});
});
});
});

it('returns sockets on a subscription', function (done) {

var server = new Hapi.Server();
server.connection();

server.register({ register: Nes, options: { auth: false } }, function (err) {

expect(err).to.not.exist();

server.subscription('/a/{id}');
server.subscription('/b');

server.start(function (err) {

var client = new Nes.Client('http://localhost:' + server.info.port);
client.connect(function (err) {

expect(err).to.not.exist();

client.subscribe('/b', function () { });

client = new Nes.Client('http://localhost:' + server.info.port);
client.connect(function (err) {

expect(err).to.not.exist();
client.subscribe('/a/b', function () { });

setTimeout(function () {

expect(countSockets(server)).to.equal(2);
expect(countSockets(server, { subscription: '/a/a' })).to.equal(0);
expect(countSockets(server, { subscription: '/a/b' })).to.equal(1);

expect(countSockets(server, { subscription: '/b' })).to.equal(1);

expect(countSockets(server, { subscription: '/foo' })).to.equal(0);

server.stop(done);
}, 10);
});
});
});
});
});

it('ignores not participating connections', function (done) {

var server = new Hapi.Server();
server.connection();

server.register({ register: Nes, options: { auth: false } }, function (err) {

expect(err).to.not.exist();

server.start(function (err) {

var client = new Nes.Client('http://localhost:' + server.info.port);
client.connect(function (err) {

expect(err).to.not.exist();

server.connection();
expect(countSockets(server)).to.equal(1);

server.stop(done);
});
});
});
});


var countSockets = function (server, options) {

var seen = 0;
server.eachSocket(function (socket) {

expect(socket).to.exist();
seen++;
}, options);
return seen;
};
});

describe('_generateId()', function () {

it('rolls over when reached max sockets per millisecond', function (done) {
Expand Down

0 comments on commit 451430a

Please sign in to comment.