Skip to content

Commit

Permalink
Merge 1b3c8c5 into fd43d52
Browse files Browse the repository at this point in the history
  • Loading branch information
rmruano committed Apr 21, 2022
2 parents fd43d52 + 1b3c8c5 commit fcb69c0
Show file tree
Hide file tree
Showing 2 changed files with 120 additions and 21 deletions.
70 changes: 54 additions & 16 deletions lib/smpp.js
Original file line number Diff line number Diff line change
Expand Up @@ -63,21 +63,24 @@ function Session(options) {
self.remoteAddress = self.rootSocket().remoteAddress;
self.remotePort = self.rootSocket().remotePort;
self.debug("server.connected", "connected to server", {secure: options.tls});
self.emit('connect'); // @todo should emmit the session, but it would break BC
self.emitMetric("server.connected", 1);
self.emit('connect'); // @todo should emit the session, but it would break BC
if(self.options.auto_enquire_link_period) {
self._interval = setInterval(function() {
self.enquire_link();
}, self.options.auto_enquire_link_period);
}
}).bind(this));
this.socket.on('secureConnect', (function() {
self.emit('secureConnect'); // @todo should emmit the session, but it would break BC
self.emit('secureConnect'); // @todo should emit the session, but it would break BC
}).bind(this));
}
this.socket.on('readable', function() {
if ( (self.socket.bytesRead - self._prevBytesRead) > 0 ) {
var bytesRead = self.socket.bytesRead - self._prevBytesRead;
if ( bytesRead > 0 ) {
// on disconnections the readable event receives 0 bytes, we do not want to debug that
self.debug("socket.data.in", null, {bytes: self.socket.bytesRead - self._prevBytesRead});
self.debug("socket.data.in", null, {bytes: bytesRead});
self.emitMetric("socket.data.in", bytesRead, {bytes: bytesRead});
self._prevBytesRead = self.socket.bytesRead;
}
self._extractPDUs();
Expand All @@ -87,8 +90,10 @@ function Session(options) {
clearTimeout(connectTimeout);
if (self._mode === "server") {
self.debug("client.disconnected", "client has disconnected");
self.emitMetric("client.disconnected", 1);
} else {
self.debug("server.disconnected", "disconnected from server");
self.emitMetric("server.disconnected", 1);
}
self.emit('close');
if(self._interval) {
Expand All @@ -98,11 +103,12 @@ function Session(options) {
});
this.socket.on('error', function(e) {
clearTimeout(connectTimeout);
self.debug("socket.error", e.message, e);
if (self._interval) {
clearInterval(self._interval);
self._interval = 0;
}
self.debug("socket.error", e.message, e);
self.emitMetric("socket.error", 1, {error: e});
self.emit('error', e); // Emitted errors will kill the program if they're not captured.
});
this.rootSocket = (function() {
Expand All @@ -116,6 +122,17 @@ function Session(options) {

util.inherits(Session, EventEmitter);

Session.prototype.emitMetric = function(event, value, payload) {
this.emit('metrics', event || null, value || null, payload || {}, {
mode: this._mode || null,
remoteAddress: this.remoteAddress || null,
remotePort: this.remotePort || null,
remoteTls: this.options.tls || null,
sessionId: this._id || null,
session: this
});
}

Session.prototype.debug = function(type, msg, payload) {
if (type === undefined) type = null;
if (msg === undefined) msg = null;
Expand All @@ -129,7 +146,11 @@ Session.prototype.debug = function(type, msg, payload) {
"server.disconnected": "\x1b[1m\x1b[31m",
"pdu.command.in": "\x1b[36m",
"pdu.command.out": "\x1b[32m",
"socket.error": "\x1b[41m\x1b[30m"
"pdu.command.error": "\x1b[41m\x1b[30m",
"socket.error": "\x1b[41m\x1b[30m",
"socket.data.in": "\x1b[2m",
"socket.data.out": "\x1b[2m",
"metrics": "\x1b[2m",
}
var now = new Date();
var logBuffer = now.toISOString() +
Expand Down Expand Up @@ -173,8 +194,10 @@ Session.prototype._extractPDUs = function() {
break;
}
this.debug("pdu.command.in", pdu.command, pdu);
this.emitMetric("pdu.command.in", 1, pdu);
} catch (e) {
this.debug("pdu.command.error", e.message, e);
this.emitMetric("pdu.command.error", 1, {error: e});
this.emit('error', e);
return;
}
Expand All @@ -191,7 +214,12 @@ Session.prototype._extractPDUs = function() {

Session.prototype.send = function(pdu, responseCallback, sendCallback, failureCallback) {
if (!this.socket.writable) {
this.debug('socket.data.error', null, {error: 'Socket is not writable'});
var errorObject = {
error: 'Socket is not writable',
errorType: 'socket_not_writable'
}
this.debug('socket.data.error', null, errorObject);
this.emitMetric("socket.data.error", 1, errorObject);
if (failureCallback) {
pdu.command_status = defs.errors.ESME_RSUBMITFAIL;
failureCallback(pdu);
Expand All @@ -215,24 +243,33 @@ Session.prototype.send = function(pdu, responseCallback, sendCallback, failureCa
sendCallback = responseCallback;
}
this.debug('pdu.command.out', pdu.command, pdu);
this.emitMetric("pdu.command.out", 1, pdu);
var buffer = pdu.toBuffer();
this.socket.write(buffer, (function(err) {
if (err) {
this.debug('socket.data.error', null, {error:'Cannot write command ' + pdu.command + ' to socket'});
this.debug('socket.data.error', null, {
error:'Cannot write command ' + pdu.command + ' to socket',
errorType: 'socket_write_error'
});
this.emitMetric("socket.data.error", 1, {
error: err,
errorType: 'socket_write_error',
pdu: pdu
});
if (!pdu.isResponse() && this._callbacks[pdu.sequence_number]) {
delete this._callbacks[pdu.sequence_number];
}
if (failureCallback) {
pdu.command_status = defs.errors.ESME_RSUBMITFAIL;
failureCallback(pdu);
}
return;
}

this.debug("socket.data.out", null, {bytes: buffer.length});
this.emit('send', pdu);
if (sendCallback) {
sendCallback(pdu);
} else {
this.debug("socket.data.out", null, {bytes: buffer.length});
this.emitMetric("socket.data.out", buffer.length, {bytes: buffer.length});
this.emit('send', pdu);
if (sendCallback) {
sendCallback(pdu);
}
}
}).bind(this));
return true;
Expand Down Expand Up @@ -344,6 +381,7 @@ function Server(options, listener) {
self.sessions.splice(self.sessions.indexOf(session), 1);
});
self.emit('session', session);
session.emitMetric("client.connected", 1);
});

if (this.isProxiedServer) {
Expand Down Expand Up @@ -434,7 +472,7 @@ exports.connect = exports.createSession = function(options, listener) {
clientOptions.tls = options.protocol === 'ssmpp:';
}
}
if (clientOptions.tls && !clientOptions.hasOwnProperty("rejectUnauthorized")) {
if (clientOptions.tls && !clientOptions.hasOwnProperty("rejectUnauthorized")) {
clientOptions.rejectUnauthorized = false; // Allow self signed certificates by default
}
clientOptions.port = clientOptions.port || (clientOptions.tls ? 3550 : 2775);
Expand Down
71 changes: 66 additions & 5 deletions test/smpp.js
Original file line number Diff line number Diff line change
Expand Up @@ -260,15 +260,16 @@ describe('Session', function() {
});
});
});

});

describe('Client/Server simulations', function() {

describe('standard connection simulations', function() {
var server, port, debugBuffer = [], lastServerError;
var server, port, secure = {}, debugBuffer = [], lastServerError;

beforeEach(function (done) {
server = smpp.createServer({}, function (session) {
var sessionHandler = function (session) {
debugBuffer = [];
// We'll use the debug event to track what happened inside the server
session.on('debug', function(type, msg, payload) {
Expand Down Expand Up @@ -307,16 +308,29 @@ describe('Client/Server simulations', function() {
lastServerError = err;
session.close();
});
});
server.listen(0, done);
}
server = smpp.createServer({}, sessionHandler);
server.listen(0);
port = server.address().port;

secure.server = smpp.createServer({
key: fs.readFileSync(__dirname + '/fixtures/server.key'),
cert: fs.readFileSync(__dirname + '/fixtures/server.crt')
}, sessionHandler);
secure.server.listen(0, done);
secure.port = secure.server.address().port;
});

afterEach(function (done) {
server.sessions.forEach(function (session) {
session.close();
});
server.close(done);
server.close();

secure.server.sessions.forEach(function (session) {
session.close();
});
secure.server.close(done);
});

it('should successfully bind a transceiver with a hardcoded user/password', function (done) {
Expand Down Expand Up @@ -407,6 +421,53 @@ describe('Client/Server simulations', function() {
done();
});
});

it('should successfully emit every expected metric', function (done) {
var clientMetricsEmitted = [], serverMetricsEmitted = [], metricsEntry = null;
var session = smpp.connect({
port: secure.port,
tls: true,
rejectUnauthorized: false
}, function () {
session.bind_transceiver({
system_id: 'FAKE_USER',
password: 'FAKE_PASSWORD'
}, function (pdu) {
session.close(function() {
// Check client metrics
for (i = 0, metricsEntry = null; i < clientMetricsEmitted.length && metricsEntry === null; i++) if (clientMetricsEmitted[i].event === "server.connected") metricsEntry = clientMetricsEmitted[i];
assert.notEqual(metricsEntry.event, null, "server.connected entry not found in metrics");
for (i = 0, metricsEntry = null; i < clientMetricsEmitted.length && metricsEntry === null; i++) if (clientMetricsEmitted[i].event === "pdu.command.out") metricsEntry = clientMetricsEmitted[i];
assert.notEqual(metricsEntry.event, null, "pdu.command.out entry not found in metrics");
for (i = 0, metricsEntry = null; i < clientMetricsEmitted.length && metricsEntry === null; i++) if (clientMetricsEmitted[i].event === "pdu.command.in") metricsEntry = clientMetricsEmitted[i];
assert.notEqual(metricsEntry.event, null, "pdu.command.in entry not found in metrics");
for (i = 0, metricsEntry = null; i < clientMetricsEmitted.length && metricsEntry === null; i++) if (clientMetricsEmitted[i].event === "server.disconnected") metricsEntry = clientMetricsEmitted[i];
assert.notEqual(metricsEntry.event, null, "server.disconnected entry not found in metrics");
})
});
});
// Add metrics loggers
session.on("metrics", function(event, value, payload, context) {
clientMetricsEmitted.push({event: event, value: value, payload: payload});
});
secure.server.on("session", function(serverSession) {
serverSession.on("metrics", function(event, value, payload, context) {
serverMetricsEmitted.push({event: event, value: value, payload: payload});
})
serverSession.on("close", function() {
// Check server metrics
for (i = 0, metricsEntry = null; i < serverMetricsEmitted.length && metricsEntry === null; i++) if (serverMetricsEmitted[i].event === "client.connected") metricsEntry = serverMetricsEmitted[i];
assert.notEqual(metricsEntry.event, null, "client.connected entry not found in metrics");
for (i = 0, metricsEntry = null; i < serverMetricsEmitted.length && metricsEntry === null; i++) if (serverMetricsEmitted[i].event === "pdu.command.out") metricsEntry = serverMetricsEmitted[i];
assert.notEqual(metricsEntry.event, null, "pdu.command.out entry not found in metrics");
for (i = 0, metricsEntry = null; i < serverMetricsEmitted.length && metricsEntry === null; i++) if (serverMetricsEmitted[i].event === "pdu.command.in") metricsEntry = serverMetricsEmitted[i];
assert.notEqual(metricsEntry.event, null, "pdu.command.in entry not found in metrics");
for (i = 0, metricsEntry = null; i < serverMetricsEmitted.length && metricsEntry === null; i++) if (serverMetricsEmitted[i].event === "client.disconnected") metricsEntry = serverMetricsEmitted[i];
assert.notEqual(metricsEntry.event, null, "client.disconnected entry not found in metrics");
done();
})
});
});
});


Expand Down

0 comments on commit fcb69c0

Please sign in to comment.