Skip to content

Commit

Permalink
Merge pull request #1 from evrythng/patch
Browse files Browse the repository at this point in the history
tng-push patches
  • Loading branch information
verakruhliakova committed Jul 8, 2016
2 parents f96e48e + 238abd7 commit d844aef
Show file tree
Hide file tree
Showing 4 changed files with 97 additions and 7 deletions.
27 changes: 23 additions & 4 deletions lib/client.js
Expand Up @@ -25,7 +25,10 @@ OTHER DEALINGS IN THE SOFTWARE.
"use strict";

var async = require("async"),
uuid = require("uuid");
uuid = require("uuid"),
util = require("util");

var EventEmitter = require("events").EventEmitter;

var retimer = require('retimer');

Expand Down Expand Up @@ -55,6 +58,8 @@ function Client(conn, server) {
this._setup();
}

util.inherits(Client, EventEmitter);

/**
* Sets up all the handlers, to not be called directly.
*
Expand All @@ -63,6 +68,11 @@ function Client(conn, server) {
Client.prototype._setup = function() {
var that = this, client = that.connection;

this.on("error", function (err) {
that.logger.warn(err);
that.onNonDisconnectClose();
});

this._buildForward();

client.on("error", nop);
Expand All @@ -80,6 +90,7 @@ Client.prototype._setup = function() {

that.logger.info("client connected");
that.server.emit("clientConnected", that);
that.emit("clientConnected");

// packets will be forward only if client.clean is false
that.server.forwardOfflinePackets(that);
Expand Down Expand Up @@ -160,6 +171,10 @@ Client.prototype.setUpTimer = function() {
}

var timeout = this.keepalive * 1000 * 3 / 2;
if (this.server.opts.minKeepaliveTimeout > 0) {
timeout = Math.max(this.server.opts.minKeepaliveTimeout, timeout);
}

var that = this;

this.logger.debug({ timeout: timeout }, "setting keepalive timeout");
Expand All @@ -185,12 +200,12 @@ Client.prototype._buildForward = function() {

function doForward(err, packet) {
if (err) {
return that.client && that.client.emit('error', err);
return that.emit('error', err);
}

that.server.authorizeForward(that, packet, function(err, authorized) {
if (err) {
return that.client && that.client.emit('error', err);
return that.emit('error', err);
}

if (!authorized) {
Expand Down Expand Up @@ -364,6 +379,7 @@ Client.prototype.handleConnect = function(packet, completeConnection) {
that.clean = packet.clean;

if (that.id in that.server.clients){
logger.info("close duplicate client");
that.server.clients[that.id].close(completeConnection);
} else {
completeConnection();
Expand Down Expand Up @@ -397,7 +413,7 @@ Client.prototype.handlePuback = function(packet) {
delete this.inflight[packet.messageId];
this.server.deleteOfflinePacket(this, packet.messageId, function(err) {
if (err) {
return that.client && that.client.emit("error", err);
return that.emit("error", err);
}
logger.debug({ packet: packet }, "cleaned offline packet");
});
Expand Down Expand Up @@ -595,6 +611,9 @@ Client.prototype.close = function(callback) {
that.server.emit("clientDisconnected", that);

callback();

// unsubscribe to prevent closure memory leak
callback = null;
};

that._closing = true;
Expand Down
45 changes: 45 additions & 0 deletions lib/interfaces.js
Expand Up @@ -38,11 +38,13 @@ module.exports = {
serverFactory: serverFactory,

mqttFactory: mqttFactory,
mqttLimitFactory: mqttLimitFactory,
mqttsFactory: mqttsFactory,
httpFactory: httpFactory,
httpsFactory: httpsFactory,

buildWrap: buildWrap,
buildLimitWrap: buildLimitWrap,
buildServe: buildServe,
};

Expand All @@ -58,6 +60,7 @@ module.exports = {
function serverFactory(iface, fallback, mosca) {
var factories = {
"mqtt": mqttFactory,
"mqttLimit" : mqttLimitFactory,
"mqtts": mqttsFactory,
"http": httpFactory,
"https": httpsFactory,
Expand All @@ -72,6 +75,10 @@ function mqttFactory(iface, fallback, mosca) {
return net.createServer(buildWrap(mosca));
}

function mqttLimitFactory(iface, fallback, mosca) {
return net.createServer(buildLimitWrap(mosca, iface.ongoingConnectionLimit || 0));
}

function mqttsFactory(iface, fallback, mosca) {
var credentials = iface.credentials || fallback.credentials;
if (credentials === undefined) {
Expand Down Expand Up @@ -146,6 +153,44 @@ function buildWrap(mosca) {
};
}

/**
* Create the wrapper, but refuse new connections if we reached the limit
* on ongoing connections
* @param mosca
* @param limit : 0 mean no limit
* @returns {Function}
*/
function buildLimitWrap(mosca, limit) {
var current = 0;

return function wrap(stream) {
if (limit && current >= limit) {
// refuse new connection
stream.end();
} else {
current++;
var connectionTerminated = false;

var connection = new Connection(stream);
stream.setNoDelay(true);

var client = new Client(connection, mosca);
client.once('clientConnected', function() {
if (!connectionTerminated) {
current--;
connectionTerminated = true;
}
});
stream.once('finish', function() {
if (!connectionTerminated) {
current--;
connectionTerminated = true;
}
});
}
};
}

/**
* Create the serve logic for http server.
*
Expand Down
29 changes: 27 additions & 2 deletions lib/persistence/redis.js
Expand Up @@ -25,7 +25,6 @@ OTHER DEALINGS IN THE SOFTWARE.
"use strict";

var AbstractPersistence = require("./abstract");
var redis = require("redis");
var util = require("util");
var Matcher = require("./matcher");
var async = require("async");
Expand Down Expand Up @@ -74,6 +73,7 @@ function RedisPersistence(options, callback) {

this._subMatcher = new Matcher();

this.options.redis = this.options.redis || require('redis');
this._client = this._buildClient();
this._pubSubClient = this._buildClient();
this._id = shortid.generate();
Expand Down Expand Up @@ -175,7 +175,7 @@ util.inherits(RedisPersistence, AbstractPersistence);

RedisPersistence.prototype._buildClient = function() {
var options = this.options;
var client = redis.createClient(
var client = options.redis.createClient(
options.port || 6379,
options.host || "127.0.0.1",
options.redisOptions);
Expand All @@ -188,6 +188,12 @@ RedisPersistence.prototype._buildClient = function() {
client.auth(options.password);
}

var that = this;

client.on('error', function (err) {
that.emit('error', err);
});

return client;
};

Expand Down Expand Up @@ -227,6 +233,11 @@ RedisPersistence.prototype.lookupRetained = function(pattern, done) {
matcher.add(pattern, true);

this._client.hkeys("retained", function(err, topics) {
if (err) {
done(err);
return;
}

topics.sort();
topics = topics.filter(function(topic) {
return matcher.match(topic).length > 0;
Expand Down Expand Up @@ -300,6 +311,11 @@ RedisPersistence.prototype._cleanClient = function(client, done) {
var key = "client:sub:" + client.id;

this._client.get(key, function(err, subs) {
if (err) {
done(err);
return;
}

subs = JSON.parse(subs) || {};

Object.keys(subs).forEach(function(sub) {
Expand Down Expand Up @@ -336,6 +352,11 @@ RedisPersistence.prototype.lookupSubscriptions = function(client, cb) {
var that = this;

multi.get(key, function(err, result) {
if (err) {
cb(err);
return;
}

subscriptions = JSON.parse(result) || {};
});

Expand Down Expand Up @@ -384,6 +405,10 @@ RedisPersistence.prototype.streamOfflinePackets = function(client, cb, done) {
listKey = "packets:" + client.id;

that._client.lrange(listKey, 0, 10000, function(err, results) {
if (err) {
cb(err);
return;
}

var total = results.length;

Expand Down
3 changes: 2 additions & 1 deletion lib/server.js
Expand Up @@ -261,7 +261,8 @@ function Server(opts, callback) {
clientId = payload;

if(that.clients[clientId] && serverId !== that.id) {
that.clients[clientId].close();
that.clients[clientId].logger.info({server: serverId}, "close duplicate client from other server");
that.clients[clientId].close();
}
}
);
Expand Down

0 comments on commit d844aef

Please sign in to comment.