Skip to content

Commit

Permalink
AMQP: Detach links before disconnecting the transport (Fix for transi…
Browse files Browse the repository at this point in the history
…ent C2D test failure)
  • Loading branch information
pierreca committed Mar 13, 2017
1 parent 68cf52d commit 15dc3a3
Show file tree
Hide file tree
Showing 5 changed files with 189 additions and 79 deletions.
6 changes: 4 additions & 2 deletions common/transport/amqp/devdoc/amqp_requirements.md
Original file line number Diff line number Diff line change
Expand Up @@ -65,9 +65,11 @@ A done callback.
### disconnect(done)
Disconnects the application or device from the IoT Hub instance.
**SRS_NODE_COMMON_AMQP_16_004: [**The disconnect method shall call the `done` callback when the application/service has been successfully disconnected from the service**]**
**SRS_NODE_COMMON_AMQP_16_034: [** The `disconnect` method shall detach all open links before disconnecting the underlying AMQP client. **]**
**SRS_NODE_COMMON_AMQP_16_005: [**The disconnect method shall call the `done` callback and pass the error as a parameter if the disconnection is unsuccessful**]**
**SRS_NODE_COMMON_AMQP_16_004: [**The `disconnect` method shall call the `done` callback when the application/service has been successfully disconnected from the service**]**
**SRS_NODE_COMMON_AMQP_16_005: [**The `disconnect` method shall call the `done` callback and pass the error as a parameter if the disconnection is unsuccessful**]**
### send (message, endpoint, to, done)
Builds and sends an AMQP message with the body set to the message parameter to the IoT Hub service, using the endpoint and destination passed as arguments.
Expand Down
57 changes: 41 additions & 16 deletions common/transport/amqp/lib/amqp.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
'use strict';

var amqp10 = require('amqp10');
var Promise = require('bluebird');
var AmqpMessage = require('./amqp_message.js');
var AmqpReceiver = require('./amqp_receiver.js');
var errors = require('azure-iot-common').errors;
Expand Down Expand Up @@ -144,18 +145,38 @@ Amqp.prototype.setDisconnectHandler = function (disconnectCallback) {
* @param {Function} done Called when disconnected of if an error happened.
*/
Amqp.prototype.disconnect = function disconnect(done) {
this._amqp.disconnect()
.then(function (result) {
this._connected = false;
this._senders = {};
/*Codes_SRS_NODE_COMMON_AMQP_16_004: [The disconnect method shall call the done callback when the application/service has been successfully disconnected from the service] */
safeCallback(done, null, result);
return null;
}.bind(this))
.catch(function (err) {
/*SRS_NODE_COMMON_AMQP_16_005: [The disconnect method shall call the done callback and pass the error as a parameter if the disconnection is unsuccessful] */
safeCallback(done, err);
});
var self = this;

/*Codes_SRS_NODE_COMMON_AMQP_16_034: [The `disconnect` method shall detach all open links before disconnecting the underlying AMQP client.]*/
var openLinks = [];
for (var sender_endpoint in this._senders) {
if (this._senders.hasOwnProperty(sender_endpoint)) {
openLinks.push(this._senders[sender_endpoint]);
delete this._senders[sender_endpoint];
}
}
self._senders = {};

for (var receiver_endpoint in this._receivers) {
if (this._receivers.hasOwnProperty(receiver_endpoint)) {
openLinks.push(this._receivers[receiver_endpoint]._amqpReceiver);
delete this._receivers[receiver_endpoint];
}
}
self.receivers = {};

Promise.all(openLinks.map(function (link) { return link.detach(); }))
.then(this._amqp.disconnect.bind(this._amqp))
.then(function (result) {
self._connected = false;
/*Codes_SRS_NODE_COMMON_AMQP_16_004: [The disconnect method shall call the done callback when the application/service has been successfully disconnected from the service] */
safeCallback(done, null, result);
return null;
})
.catch(function (err) {
/*SRS_NODE_COMMON_AMQP_16_005: [The disconnect method shall call the done callback and pass the error as a parameter if the disconnection is unsuccessful] */
safeCallback(done, err);
});
};

/**
Expand Down Expand Up @@ -341,10 +362,14 @@ Amqp.prototype.detachReceiverLink = function detachReceiverLink(endpoint, detach
throw new ReferenceError('endpoint cannot be \'' + endpoint + '\'');
}
var self = this;
this._detachLink(this._receivers[endpoint], function(err) {
delete(self._receivers[endpoint]);
detachCallback(err);
});
if(!this._receivers[endpoint]) {
safeCallback(detachCallback);
} else {
this._detachLink(this._receivers[endpoint]._amqpReceiver, function(err) {
delete(self._receivers[endpoint]);
detachCallback(err);
});
}
};

/**
Expand Down
4 changes: 2 additions & 2 deletions common/transport/amqp/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,14 @@
"typings": "index.d.ts",
"dependencies": {
"amqp10": "3.5.0",
"bluebird": "^3.5.0",
"amqp10-transport-ws": "^0.0.5",
"azure-iot-common": "1.1.5",
"debug": "^2.6.0",
"sinon": "^1.17.7",
"sinon-as-promised": "^4.0.2"
},
"devDependencies": {
"bluebird": "^3.4.7",
"chai": "^3.5.0",
"istanbul": "^0.4.5",
"jshint": "^2.9.4",
Expand All @@ -32,7 +32,7 @@
"alltest": "istanbul cover node_modules/mocha/bin/_mocha -- --reporter spec test/_*_test*.js",
"ci": "npm -s run lint && npm -s run alltest-min && npm -s run check-cover",
"test": "npm -s run lint && npm -s run unittest",
"check-cover": "istanbul check-coverage --statements 95 --branches 83 --functions 92 --lines 94"
"check-cover": "istanbul check-coverage --statements 95 --branches 83 --functions 92 --lines 95"
},
"engines": {
"node": ">= 0.10"
Expand Down
198 changes: 141 additions & 57 deletions common/transport/amqp/test/_amqp_test.js
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,32 @@ describe('Amqp', function () {
});

describe('#disconnect', function() {
/*Tests_SRS_NODE_COMMON_AMQP_16_034: [The `disconnect` method shall detach all open links before disconnecting the underlying AMQP client.]*/
it('detaches existing links before disconnecting the client', function(testCallback) {
var amqp = new Amqp();
sinon.stub(amqp._amqp, 'disconnect').resolves();
var fakeSender = {
detach: sinon.stub().resolves()
};
var fakeReceiver = {
_amqpReceiver: {
detach: sinon.stub().resolves()
}
};
amqp._senders.fake_sender_endpoint = fakeSender;
amqp._receivers.fake_receiver_endpoint = fakeReceiver;

amqp.disconnect(function(err) {
if (err) {
testCallback(err);
} else {
assert(fakeSender.detach.calledOnce);
assert(fakeReceiver._amqpReceiver.detach.calledOnce);
testCallback();
}
});
});

/*Tests_SRS_NODE_COMMON_AMQP_16_004: [The disconnect method shall call the `done` callback when the application/service has been successfully disconnected from the service]*/
it('calls the done callback if disconnected successfully', function(testCallback) {
var amqp = new Amqp();
Expand Down Expand Up @@ -401,74 +427,132 @@ describe('Amqp', function () {
});
});

[
{amqpFunc: 'detachSenderLink', privateLinkArray: '_senders' },
{amqpFunc: 'detachReceiverLink', privateLinkArray: '_receivers' },
].forEach(function(testConfig) {
describe('#' + testConfig.amqpFunc, function() {
/*Tests_SRS_NODE_COMMON_AMQP_16_022: [The `detachSenderLink` method shall throw a ReferenceError if the `endpoint` argument is falsy.]*/
/*Tests_SRS_NODE_COMMON_AMQP_16_027: [The `detachReceiverLink` method shall throw a ReferenceError if the `endpoint` argument is falsy.]*/
[null, undefined, ''].forEach(function(badEndpoint) {
it('throws if the endpoint is \'' + badEndpoint + '\'', function() {
var amqp = new Amqp();
assert.throws(function() {
amqp[testConfig.amqpFunc](badEndpoint, null, function() {});
}, ReferenceError);
});
describe('#detachSenderLink', function() {
/*Tests_SRS_NODE_COMMON_AMQP_16_022: [The `detachSenderLink` method shall throw a ReferenceError if the `endpoint` argument is falsy.]*/
[null, undefined, ''].forEach(function(badEndpoint) {
it('throws if the endpoint is \'' + badEndpoint + '\'', function() {
var amqp = new Amqp();
assert.throws(function() {
amqp.detachSenderLink(badEndpoint, null, function() {});
}, ReferenceError);
});
});

/*Tests_SRS_NODE_COMMON_AMQP_16_023: [The `detachSenderLink` method shall call detach on the link object corresponding to the endpoint passed as argument.]*/
/*Tests_SRS_NODE_COMMON_AMQP_16_028: [The `detachReceiverLink` method shall call detach on the link object corresponding to the endpoint passed as argument.]*/
it('calls the \'detach\' method on the link object', function (testCallback) {
var fakeLink = {
detach: function () { return new Promise(function() {}); }
};
sinon.stub(fakeLink, 'detach').resolves();
/*Tests_SRS_NODE_COMMON_AMQP_16_023: [The `detachSenderLink` method shall call detach on the link object corresponding to the endpoint passed as argument.]*/
it('calls the \'detach\' method on the link object', function (testCallback) {
var fakeLink = {
detach: function () { return new Promise(function() {}); }
};
sinon.stub(fakeLink, 'detach').resolves();

var amqp = new Amqp();
amqp._senders[endpoint] = fakeLink;
/*Tests_SRS_NODE_COMMON_AMQP_16_024: [The `detachSenderLink` method shall call the `done` callback with no arguments if detaching the link succeeded.]*/
amqp.detachSenderLink(endpoint, function(err) {
assert.isUndefined(err);
assert.isUndefined(amqp._senders[endpoint]);
testCallback();
});
});

/*Tests_SRS_NODE_COMMON_AMQP_16_025: [The `detachSenderLink` method shall call the `done` callback with no arguments if the link for this endpoint doesn't exist.]*/
it('calls the callback immediately if there\'s no link attached', function (testCallback) {
var fakeLink = {
detach: function () { return new Promise(function() {}); }
};
sinon.stub(fakeLink, 'detach').resolves();

var amqp = new Amqp();
assert.isUndefined(amqp._senders[endpoint]);
amqp.detachSenderLink(endpoint, function(err) {
assert.isUndefined(err);
assert.isUndefined(amqp._senders[endpoint]);
testCallback();
});
});

/*Tests_SRS_NODE_COMMON_AMQP_16_026: [The `detachSenderLink` method shall call the `done` callback with an `Error` object if there was an error while detaching the link.]*/
it('calls the callback with an error if detaching the link causes an error', function (testCallback) {
var fakeError = new Error('failed to detach');
var fakeLink = {
detach: function () { return new Promise(function() {}); }
};
sinon.stub(fakeLink, 'detach').rejects(fakeError);

var amqp = new Amqp();
amqp._senders[endpoint] = fakeLink;
amqp.detachSenderLink(endpoint, function(err) {
assert.strictEqual(fakeError, err);
assert.isUndefined(amqp._senders[endpoint]);
testCallback();
});
});
});

describe('#detachReceiverLink', function() {
/*Tests_SRS_NODE_COMMON_AMQP_16_027: [The `detachReceiverLink` method shall throw a ReferenceError if the `endpoint` argument is falsy.]*/
[null, undefined, ''].forEach(function(badEndpoint) {
it('throws if the endpoint is \'' + badEndpoint + '\'', function() {
var amqp = new Amqp();
amqp[testConfig.privateLinkArray][endpoint] = fakeLink;
/*Tests_SRS_NODE_COMMON_AMQP_16_024: [The `detachSenderLink` method shall call the `done` callback with no arguments if detaching the link succeeded.]*/
/*Tests_SRS_NODE_COMMON_AMQP_16_029: [The `detachReceiverLink` method shall call the `done` callback with no arguments if detaching the link succeeded.]*/
amqp[testConfig.amqpFunc](endpoint, function(err) {
assert.isUndefined(err);
assert.isUndefined(amqp[testConfig.privateLinkArray][endpoint]);
testCallback();
});
assert.throws(function() {
amqp.detachReceiverLink(badEndpoint, null, function() {});
}, ReferenceError);
});
});

/*Tests_SRS_NODE_COMMON_AMQP_16_025: [The `detachSenderLink` method shall call the `done` callback with no arguments if the link for this endpoint doesn't exist.]*/
/*Tests_SRS_NODE_COMMON_AMQP_16_030: [The `detachReceiverLink` method shall call the `done` callback with no arguments if the link for this endpoint doesn't exist.]*/
it('calls the callback immediately if there\'s no link attached', function (testCallback) {
var fakeLink = {
/*Tests_SRS_NODE_COMMON_AMQP_16_028: [The `detachReceiverLink` method shall call detach on the link object corresponding to the endpoint passed as argument.]*/
it('calls the \'detach\' method on the link object', function (testCallback) {
var fakeLink = {
_amqpReceiver: {
detach: function () { return new Promise(function() {}); }
};
sinon.stub(fakeLink, 'detach').resolves();

var amqp = new Amqp();
assert.isUndefined(amqp[testConfig.privateLinkArray][endpoint]);
amqp[testConfig.amqpFunc](endpoint, function(err) {
assert.isUndefined(err);
assert.isUndefined(amqp[testConfig.privateLinkArray][endpoint]);
testCallback();
});
}
};
sinon.stub(fakeLink._amqpReceiver, 'detach').resolves();

var amqp = new Amqp();
amqp._receivers[endpoint] = fakeLink;
/*Tests_SRS_NODE_COMMON_AMQP_16_029: [The `detachReceiverLink` method shall call the `done` callback with no arguments if detaching the link succeeded.]*/
amqp.detachReceiverLink(endpoint, function(err) {
assert.isUndefined(err);
assert.isUndefined(amqp._receivers[endpoint]);
testCallback();
});
});

/*Tests_SRS_NODE_COMMON_AMQP_16_026: [The `detachSenderLink` method shall call the `done` callback with an `Error` object if there was an error while detaching the link.]*/
/*Tests_SRS_NODE_COMMON_AMQP_16_031: [The `detachReceiverLink` method shall call the `done` callback with an `Error` object if there was an error while detaching the link.]*/
it('calls the callback with an error if detaching the link causes an error', function (testCallback) {
var fakeError = new Error('failed to detach');
var fakeLink = {
/*Tests_SRS_NODE_COMMON_AMQP_16_030: [The `detachReceiverLink` method shall call the `done` callback with no arguments if the link for this endpoint doesn't exist.]*/
it('calls the callback immediately if there\'s no link attached', function (testCallback) {
var fakeLink = {
_amqpReceiver: {
detach: function () { return new Promise(function() {}); }
};
sinon.stub(fakeLink, 'detach').rejects(fakeError);
}
};
sinon.stub(fakeLink._amqpReceiver, 'detach').resolves();

var amqp = new Amqp();
assert.isUndefined(amqp._receivers[endpoint]);
amqp.detachReceiverLink(endpoint, function(err) {
assert.isUndefined(err);
assert.isUndefined(amqp._receivers[endpoint]);
testCallback();
});
});

var amqp = new Amqp();
amqp[testConfig.privateLinkArray][endpoint] = fakeLink;
amqp[testConfig.amqpFunc](endpoint, function(err) {
assert.strictEqual(fakeError, err);
assert.isUndefined(amqp[testConfig.privateLinkArray][endpoint]);
testCallback();
});
/*Tests_SRS_NODE_COMMON_AMQP_16_031: [The `detachReceiverLink` method shall call the `done` callback with an `Error` object if there was an error while detaching the link.]*/
it('calls the callback with an error if detaching the link causes an error', function (testCallback) {
var fakeError = new Error('failed to detach');
var fakeLink = {
_amqpReceiver: {
detach: function () { return new Promise(function() {}); }
}
};
sinon.stub(fakeLink._amqpReceiver, 'detach').rejects(fakeError);

var amqp = new Amqp();
amqp._receivers[endpoint] = fakeLink;
amqp.detachReceiverLink(endpoint, function(err) {
assert.strictEqual(fakeError, err);
assert.isUndefined(amqp._receivers[endpoint]);
testCallback();
});
});
});
Expand Down
3 changes: 1 addition & 2 deletions e2etests/test/device_acknowledge_tests.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ var uuid = require('uuid');

var runTests = function (hubConnectionString, deviceTransport, provisionedDevice) {
describe('Device utilizing ' + provisionedDevice.authenticationDescription + ' authentication, connected over ' + deviceTransport.name, function () {
this.timeout(60000);

var serviceClient, deviceClient;

Expand All @@ -26,7 +27,6 @@ var runTests = function (hubConnectionString, deviceTransport, provisionedDevice
});

it('Service sends 1 C2D message and it is re-sent until the device completes it', function (done) {
this.timeout(60000);
var guid = uuid.v4();

var abandonnedOnce = false;
Expand Down Expand Up @@ -87,7 +87,6 @@ var runTests = function (hubConnectionString, deviceTransport, provisionedDevice
});

it('Service sends 1 C2D message and it is re-sent until the device rejects it', function (done) {
this.timeout(60000);
var guid = uuid.v4();

var abandonnedOnce = false;
Expand Down

0 comments on commit 15dc3a3

Please sign in to comment.