Skip to content

Commit

Permalink
refactor(session): use socket.io rooms for emitToUsers
Browse files Browse the repository at this point in the history
This refactor allows deployd to scale socket.io across multiple nodes by not tying emitToUsers to internal state, but instead tying sockets to their uids using socket.io room functionality.

It should also fix a small memory leak with an event emitter.

Also, it adds new functionality in the form of additional configuration options:

Example:
```javascript

var redisAdapter = require('socket.io-redis');

var server = deployd({
  port: process.env.PORT || 1337,
  ...
  socketIo: {
    options: {
      transports: ['websocket'],
    },
    adapter: redisAdapter({ host: 'localhost', port: 6379 }),
  },

});
```
  • Loading branch information
andreialecu committed Dec 22, 2015
1 parent 3ab6533 commit ee1fe6d
Show file tree
Hide file tree
Showing 3 changed files with 125 additions and 82 deletions.
15 changes: 11 additions & 4 deletions lib/server.js
Expand Up @@ -8,7 +8,8 @@ var http = require('http')
, io = require('socket.io')
, setupReqRes = require('./util/http').setup
, debug = require('debug')('server')
, config = require('./config-loader');
, config = require('./config-loader')
, _ = require('underscore');

function extend(origin, add) {
// don't do anything if add isn't an object
Expand Down Expand Up @@ -65,10 +66,16 @@ function Server(options) {
// back all memory stores with a db
this.db = db.create(options.db);

// use socket io for a session based realtime channel
this.sockets = io.listen(this, {
var socketServer = io.listen(this, _.extend({
'log level': 0
}).sockets;
}, (this.options.socketIo && this.options.socketIo.options) || {}));

// use socket io for a session based realtime channel
this.sockets = socketServer.sockets;

if (this.options.socketIo && this.options.socketIo.adapter) {
socketServer.adapter(this.options.socketIo.adapter);
}

// persist sessions in a store
this.sessions = new SessionStore('sessions', this.db, this.sockets, options.sessions);
Expand Down
171 changes: 94 additions & 77 deletions lib/session.js
Expand Up @@ -36,8 +36,8 @@ function SessionStore(namespace, db, sockets, options) {
// NOTE: we will get a warning otherwise when more than 10 users try to login
socketQueue.setMaxListeners(0);

if(sockets) {
sockets.on('connection', function (client) {
if (sockets) {
sockets.on('connection', function(client) {
// NOTE: do not use set here ever, the `Cookies` api is meant to get a req, res
// but we are just using it for a cookie parser
var cookies = new Cookies(client.handshake)
Expand All @@ -54,13 +54,13 @@ function SessionStore(namespace, db, sockets, options) {
});
};

if(sid) {
if (sid) {
getSession(sid, function(err, session) {
if (session) {
// index sockets against their session id
socketIndex[sid] = socketIndex[sid] || {};
socketIndex[sid][client.id] = client;
socketQueue.emit(sid, client);
socketQueue.emit('socket', client, session);
}
});
}
Expand All @@ -81,7 +81,7 @@ function SessionStore(namespace, db, sockets, options) {

socketIndex[sid] = socketIndex[sid] || {};
socketIndex[sid][client.id] = client;
socketQueue.emit(sid, client);
socketQueue.emit('socket', client, session);
}
});
});
Expand All @@ -93,22 +93,39 @@ function SessionStore(namespace, db, sockets, options) {
});
});
});

var drainQueue = function drainQueue(method, rawSocket, session) {
var key = '_' + method;
if (session.socket._bindQueue && session.socket._bindQueue[key] && session.socket._bindQueue[key].length) {
session.socket._bindQueue[key].forEach(function(args) {
rawSocket[method].apply(rawSocket, args);
});
}
};

// resolve queue once a socket is ready
socketQueue.on('socket', function(socket, session) {
drainQueue('on', socket, session);
drainQueue('emit', socket, session);
drainQueue('join', socket, session);
drainQueue('leave', socket, session);
});
}

Store.apply(this, arguments);

if (db) {
// Cleanup inactive sessions from the db
var store = this;
process.nextTick(function () {
process.nextTick(function() {
store.cleanupInactiveSessions();
});
}
}
util.inherits(SessionStore, Store);
exports.SessionStore = SessionStore;

SessionStore.prototype.cleanupInactiveSessions = function () {
SessionStore.prototype.cleanupInactiveSessions = function() {
var store = this;
var inactiveSessions = [];

Expand Down Expand Up @@ -143,15 +160,15 @@ SessionStore.prototype.cleanupInactiveSessions = function () {
{ lastActive: { $lt: Date.now() - this.options.maxAge } },
{ lastActive: { $exists: false } }
]
}, function (err, updated) {
}, function(err, updated) {
if (err) {
error("Error removing old sessions: " + err);
}
});
this.cleanupInactiveSessions.lastRun = Date.now();
};

SessionStore.prototype.createUniqueIdentifier = function () {
SessionStore.prototype.createUniqueIdentifier = function() {
return crypto.randomBytes(64).toString('hex');
};

Expand All @@ -166,13 +183,13 @@ SessionStore.prototype.createSession = function(sid, fn) {
var socketIndex = this.socketIndex
, store = this;

if(typeof sid == 'function') {
if (typeof sid == 'function') {
fn = sid;
sid = undefined;
}

if(sid) {
this.find({ id: sid }, function (err, s) {
if (sid) {
this.find({ id: sid }, function(err, s) {
if (err) return fn(err);
if (!s || s.lastActive < Date.now() - store.options.maxAge) {
s = { anonymous: true };
Expand All @@ -188,7 +205,7 @@ SessionStore.prototype.createSession = function(sid, fn) {
if (!sess.data.anonymous && (!sess.data.lastActive || sess.data.lastActive < Date.now() - 10 * 1000)) {
// update last active date at max once every 10 seconds
sess.data.lastActive = Date.now();
sess.save(function () {
sess.save(function() {
fn(null, sess);
});
} else {
Expand All @@ -201,7 +218,7 @@ SessionStore.prototype.createSession = function(sid, fn) {

// clean up inactive sessions once per minute
if (store.cleanupInactiveSessions.lastRun < Date.now() - 60 * 1000) {
process.nextTick(function () {
process.nextTick(function() {
store.cleanupInactiveSessions();
});
}
Expand All @@ -212,7 +229,7 @@ SessionStore.prototype.createSession = function(sid, fn) {
/**
* Get the already created session
*/
SessionStore.prototype.getSession = function (uid, sid) {
SessionStore.prototype.getSession = function(uid, sid) {
return userSessionIndex[uid][sid] || null;
};

Expand All @@ -238,89 +255,63 @@ function Session(data, store, sockets, rawSockets) {
this.data = _.clone(data);
if (!this.data.createdOn) this.data.createdOn = Date.now();
if (!this.data.lastActive) this.data.lastActive = Date.now();
if(data && data.id) this.sid = sid = data.id;
if (data && data.id) this.sid = sid = data.id;
this.store = store;
var self = this;

// create faux socket, to queue any events until
// a real socket is available
this.socket = {
on: function () {
function bindFauxSocket(method, queue) {
return function() {
var myArgs = arguments;
if (sockets[self.sid]) {
_.each(sockets[self.sid], function(s){
s.on.apply(s, myArgs);
// clear all queue arrays once socket is available, since they will no longer be needed
_.each(queue, function(val) {
val.length = 0;
});
} else {
// otherwise add to bind queue
var queue = this._bindQueue = this._bindQueue || [];
queue.push(myArgs);
}
},
emit: function (ev) {
var myArgs = arguments;
if (sockets[self.sid]){
_.each(sockets[self.sid], function(s){
// if we have a real socket, use it
if(s) {
s.emit.apply(s, myArgs);
}

_.each(sockets[self.sid], function(s) {
s[method].apply(s, myArgs);
});
} else {
// otherwise add to emit queue
var queue = this._emitQueue = this._emitQueue || [];
queue.push(myArgs);
// otherwise add to bind queue
var key = '_' + method;
queue[key] = queue[key] || [];
queue[key].push(myArgs);
}
}
};
}

// create faux socket, to queue any events until
// a real socket is available

this.socket = {
_bindQueue: []
};

this.socket.on = bindFauxSocket('on', this.socket._bindQueue);
this.socket.emit = bindFauxSocket('emit', this.socket._bindQueue);
this.socket.join = bindFauxSocket('join', this.socket._bindQueue);
this.socket.leave = bindFauxSocket('leave', this.socket._bindQueue);

if (data && data.uid) this.setUid(data.uid);

this.emitToUsers = function(collection, query, event, data) {
collection.get(query, function(users) {
var userSession;
if(users && users.id) {
if (users && users.id) {
users = [users]; // convert single item to array
}

users.forEach(function(u) {
var userSessions = userSessionIndex[u.id];
// emit to sessions online
for (var key in userSessions) {
userSession = userSessions[key];
if (userSession && userSession.socket) {
userSession.socket.emit(event, data);
}
}
rawSockets.to('dpd_uid:' + u.id).emit(event, data);
});
});
};

this.emitToAll = function() {
rawSockets.emit.apply(rawSockets, arguments);
};

this.bindSocketQueue();
}


Session.prototype.bindSocketQueue = function(){
if (!this.sid) return;
var self = this;
// resolve queue once a socket is ready
this.store.socketQueue.on(this.sid, function (socket) {
// drain bind queue
if(self.socket._bindQueue && self.socket._bindQueue.length) {
self.socket._bindQueue.forEach(function (args) {
socket.on.apply(socket, args);
});
}
// drain emit queue
if(self.socket._emitQueue && self.socket._emitQueue.length) {
self.socket._emitQueue.forEach(function (args) {
socket.emit.apply(socket, args);
});
}
});
};

/**
* Set properties on the in memory representation of a session.
*
Expand All @@ -335,6 +326,33 @@ Session.prototype.set = function(object) {
Object.keys(object).forEach(function(key) {
data[key] = object[key];
});

if (object && object.uid) {
session.setUid(object.uid);
}

return this;
};

/**
* Set the user id for this session.
*
* @param {String} uid
* @return {Session} this for chaining
*/

Session.prototype.setUid = function(uid) {
var session = this;
if (session.data.uid != uid) {
// remove from previous room
session.socket.leave('dpd_uid:' + session.data.uid);
}

if (uid) {
session.data.uid = uid;
session.socket.join('dpd_uid:' + uid);
}

return this;
};

Expand Down Expand Up @@ -365,7 +383,7 @@ Session.prototype.save = function(fn) {

// If anonymous, create a new session.
if (anonymous) {
session.store.insert(data, function (err, res) {
session.store.insert(data, function(err, res) {
if (!err) {
session.data = res;
sessionIndex[sid] = session;
Expand All @@ -375,15 +393,14 @@ Session.prototype.save = function(fn) {
userSessionIndex[res.uid][session.data.id] = session;
}
session.sid = res.id;
session.bindSocketQueue();
}
fn(err, res);
});
}
// If already authenticated and we have sid, update session.
else if (sid) {
delete data.id;
session.store.update({id: sid}, data, function (err) {
session.store.update({id: sid}, data, function(err) {
if (!err) {
data.id = sid;
session.data = data;
Expand Down Expand Up @@ -411,7 +428,7 @@ Session.prototype.save = function(fn) {

Session.prototype.fetch = function(fn) {
var session = this;
this.store.first({id: this.data.id}, function (err, data) {
this.store.first({id: this.data.id}, function(err, data) {
session.set(data);
fn(err, data);
});
Expand All @@ -433,7 +450,7 @@ Session.prototype.isAnonymous = function() {
* @return {Session} this for chaining
*/

Session.prototype.remove = function (data, fn) {
Session.prototype.remove = function(data, fn) {
if (typeof data === "function") {
fn = data;
data = this.data;
Expand Down

0 comments on commit ee1fe6d

Please sign in to comment.