Skip to content

Commit

Permalink
Fix AMQP disconnection logic that was causing "cannot read property w…
Browse files Browse the repository at this point in the history
…rite of undefined" errors
  • Loading branch information
Pierre Cauchois committed Mar 20, 2018
1 parent 79abf4b commit 75f23f8
Show file tree
Hide file tree
Showing 5 changed files with 105 additions and 22 deletions.
7 changes: 6 additions & 1 deletion common/transport/amqp/src/amqp.ts
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,12 @@ export class Amqp {
};

this._amqp.on('disconnected', () => {
this._fsm.handle('amqpDisconnected');
// deferring this is necessary because in some instances
// the amqp10 library might want to send a close frame - and we don't want
// to trigger anything (especially not reconnection) before it has done so.
process.nextTick(() => {
this._fsm.handle('amqpDisconnected');
});
});

this._fsm = new machina.Fsm({
Expand Down
8 changes: 8 additions & 0 deletions device/transport/amqp/devdoc/device_amqp_requirements.md
Original file line number Diff line number Diff line change
Expand Up @@ -244,3 +244,11 @@ This method is deprecated. The `AmqpReceiver` object and pattern is going away a
**SRS_NODE_DEVICE_AMQP_16_078: [** The `disableTwinDesiredPropertiesUpdates` method shall call its callback with and error if the call to `AmqpTwinClient.disableTwinDesiredPropertiesUpdates` fails. **]**

**SRS_NODE_DEVICE_AMQP_16_079: [** The `disableTwinDesiredPropertiesUpdates` method shall call its callback no arguments if the call to `AmqpTwinClient.disableTwinDesiredPropertiesUpdates` succeeds. **]**

### Errors

**SRS_NODE_DEVICE_AMQP_16_080: [** if the handler specified in the `setDisconnectHandler` call is called while the `Amqp` object is disconnected, the call shall be ignored. **]**

**SRS_NODE_DEVICE_AMQP_16_081: [** if the handler specified in the `setDisconnectHandler` call is called while the `Amqp` object is connecting or authenticating, the connection shall be stopped and an `disconnect` event shall be emitted with the error translated to a transport-agnostic error. **]**

**SRS_NODE_DEVICE_AMQP_16_082: [** if the handler specified in the `setDisconnectHandler` call is called while the `Amqp` object is connected, the connection shall be disconnected and an `disconnect` event shall be emitted with the error translated to a transport-agnostic error. **]**
2 changes: 1 addition & 1 deletion device/transport/amqp/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@
"alltest": "istanbul cover node_modules/mocha/bin/_mocha -- --reporter spec test/_*_test*.js",
"ci": "npm -s run lint && npm -s run build && npm -s run alltest-min && npm -s run check-cover",
"test": "npm -s run lint && npm -s run build && npm -s run unittest",
"check-cover": "istanbul check-coverage --statements 93 --branches 83 --lines 94 --functions 92"
"check-cover": "istanbul check-coverage --statements 94 --branches 84 --lines 95 --functions 93"
},
"engines": {
"node": ">= 0.10"
Expand Down
49 changes: 30 additions & 19 deletions device/transport/amqp/src/amqp.ts
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ export class Amqp extends EventEmitter implements Client.Transport {
this._amqp = baseClient || new BaseAmqpClient(false, 'azure-iot-device/' + packageJson.version);
this._amqp.setDisconnectHandler((err) => {
debug('disconnected event handler: ' + (err ? err.toString() : 'no error'));
this._fsm.handle('disconnect', () => {
this._fsm.handle('disconnectError', err, () => {
this.emit('disconnect', getTranslatedError(err, 'AMQP client disconnected'));
});
});
Expand Down Expand Up @@ -137,7 +137,7 @@ export class Amqp extends EventEmitter implements Client.Transport {
initialState: 'disconnected',
states: {
disconnected: {
_onEnter: (callback, err) => {
_onEnter: (err, callback) => {
if (callback) {
if (err) {
callback(err);
Expand Down Expand Up @@ -236,6 +236,11 @@ export class Amqp extends EventEmitter implements Client.Transport {
disableMethods: (callback) => {
// if we are disconnected the C2D link is already detached.
callback();
},
disconnectError: (err, callback) => {
/*Codes_SRS_NODE_DEVICE_AMQP_16_080: [if the handler specified in the `setDisconnectHandler` call is called while the `Amqp` object is disconnected, the call shall be ignored.]*/
debug('ignoring disconnectError because already disconnected: ' + err.toString());
callback();
}
},
connecting: {
Expand All @@ -244,51 +249,52 @@ export class Amqp extends EventEmitter implements Client.Transport {
this._authenticationProvider.getDeviceCredentials((err, credentials) => {
if (err) {
/*Codes_SRS_NODE_DEVICE_AMQP_16_055: [The `connect` method shall call its callback with an error if the callback passed to the `getDeviceCredentials` method is called with an error.]*/
this._fsm.transition('disconnected', connectCallback, translateError('AMQP Transport: Could not get credentials', err));
this._fsm.transition('disconnected', translateError('AMQP Transport: Could not get credentials', err), connectCallback);
} else {
this._c2dEndpoint = endpoint.messagePath(encodeURIComponent(credentials.deviceId));
this._d2cEndpoint = endpoint.eventPath(credentials.deviceId);

const uri = this._getConnectionUri(credentials.host);
this._amqp.connect(uri, credentials.x509, (err, connectResult) => {
if (err) {
this._fsm.transition('disconnected', connectCallback, translateError('AMQP Transport: Could not connect', err));
this._fsm.transition('disconnected', translateError('AMQP Transport: Could not connect', err), connectCallback);
} else {
this._fsm.transition('authenticating', connectCallback, connectResult);
this._fsm.transition('authenticating', connectResult, connectCallback);
}
});
}

});
},
disconnect: (disconnectCallback, err) => this._fsm.transition('disconnecting', disconnectCallback, err),
disconnect: (disconnectCallback) => this._fsm.transition('disconnecting', null, disconnectCallback),
updateSharedAccessSignature: (token, callback) => {
callback(null, new results.SharedAccessSignatureUpdated(false));
},
/*Codes_SRS_NODE_DEVICE_AMQP_16_081: [if the handler specified in the `setDisconnectHandler` call is called while the `Amqp` object is connecting or authenticating, the connection shall be stopped and an `disconnect` event shall be emitted with the error translated to a transport-agnostic error.]*/
disconnectError: (err, callback) => this._fsm.transition('disconnecting', err, callback),
'*': () => this._fsm.deferUntilTransition()
},
authenticating: {
_onEnter: (connectCallback, connectResult) => {
_onEnter: (connectResult, connectCallback) => {
if (this._authenticationProvider.type === AuthenticationType.X509) {
/*Codes_SRS_NODE_DEVICE_AMQP_06_005: [If x509 authentication is NOT being utilized then `initializeCBS` shall be invoked.]*/
this._fsm.transition('authenticated', connectCallback, connectResult);
this._fsm.transition('authenticated', connectResult, connectCallback);
} else {
this._amqp.initializeCBS((err) => {
if (err) {
/*Codes_SRS_NODE_DEVICE_AMQP_06_008: [If `initializeCBS` is not successful then the client will be disconnected.]*/
this._fsm.transition('disconnecting', connectCallback, getTranslatedError(err, 'AMQP Transport: Could not initialize CBS'));
this._fsm.transition('disconnecting', getTranslatedError(err, 'AMQP Transport: Could not initialize CBS'), connectCallback);
} else {
this._authenticationProvider.getDeviceCredentials((err, credentials) => {
if (err) {
this._fsm.transition('disconnecting', connectCallback, getTranslatedError(err, 'AMQP Transport: Could not get credentials from AuthenticationProvider'));
this._fsm.transition('disconnecting', getTranslatedError(err, 'AMQP Transport: Could not get credentials from AuthenticationProvider'), connectCallback);
} else {
/*Codes_SRS_NODE_DEVICE_AMQP_06_006: [If `initializeCBS` is successful, `putToken` shall be invoked If `initializeCBS` is successful, `putToken` shall be invoked with the first parameter `audience`, created from the `sr` of the shared access signature, the actual shared access signature, and a callback.]*/
this._amqp.putToken(SharedAccessSignature.parse(credentials.sharedAccessSignature, ['sr', 'sig', 'se']).sr, credentials.sharedAccessSignature, (err) => {
if (err) {
/*Codes_SRS_NODE_DEVICE_AMQP_06_009: [If `putToken` is not successful then the client will be disconnected.]*/
this._fsm.transition('disconnecting', connectCallback, getTranslatedError(err, 'AMQP Transport: Could not authorize with puttoken'));
this._fsm.transition('disconnecting', getTranslatedError(err, 'AMQP Transport: Could not authorize with puttoken'), connectCallback);
} else {
this._fsm.transition('authenticated', connectCallback, connectResult);
this._fsm.transition('authenticated', connectResult, connectCallback);
}
});
}
Expand All @@ -297,15 +303,17 @@ export class Amqp extends EventEmitter implements Client.Transport {
});
}
},
disconnect: (disconnectCallback) => this._fsm.transition('disconnecting', disconnectCallback),
disconnect: (disconnectCallback) => this._fsm.transition('disconnecting', null, disconnectCallback),
/*Codes_SRS_NODE_DEVICE_AMQP_16_081: [if the handler specified in the `setDisconnectHandler` call is called while the `Amqp` object is connecting or authenticating, the connection shall be stopped and an `disconnect` event shall be emitted with the error translated to a transport-agnostic error.]*/
disconnectError: (err, callback) => this._fsm.transition('disconnecting', err, callback),
'*': () => this._fsm.deferUntilTransition()
},
authenticated: {
_onEnter: (connectCallback, connectResult) => {
_onEnter: (connectResult, connectCallback) => {
connectCallback(null, connectResult);
},
connect: (connectCallback) => connectCallback(null, new results.Connected()),
disconnect: (disconnectCallback) => this._fsm.transition('disconnecting', disconnectCallback),
disconnect: (disconnectCallback) => this._fsm.transition('disconnecting', null, disconnectCallback),
sendEvent: (message, sendCallback) => {
let amqpMessage = AmqpMessage.fromMessage(message);
amqpMessage.properties.to = this._d2cEndpoint;
Expand Down Expand Up @@ -387,10 +395,12 @@ export class Amqp extends EventEmitter implements Client.Transport {
/*Codes_SRS_NODE_DEVICE_AMQP_16_042: [The `disableMethods` method shall call `detach` on the device method links and call its callback when these are successfully detached.]*/
/*Codes_SRS_NODE_DEVICE_AMQP_16_043: [The `disableMethods` method shall call its `callback` with an `Error` if it fails to detach the device method links.]*/
this._deviceMethodClient.detach(callback);
}
},
/*Codes_SRS_NODE_DEVICE_AMQP_16_082: [if the handler specified in the `setDisconnectHandler` call is called while the `Amqp` object is connected, the connection shall be disconnected and an `disconnect` event shall be emitted with the error translated to a transport-agnostic error.]*/
disconnectError: (err, callback) => this._fsm.transition('disconnecting', err, callback)
},
disconnecting: {
_onEnter: (disconnectCallback, err) => {
_onEnter: (err, disconnectCallback) => {
let finalError = err;
async.series([
(callback) => {
Expand Down Expand Up @@ -480,7 +490,7 @@ export class Amqp extends EventEmitter implements Client.Transport {
], () => {
/*Codes_SRS_NODE_DEVICE_AMQP_16_010: [The `done` callback method passed in argument shall be called when disconnected.]*/
/*Codes_SRS_NODE_DEVICE_AMQP_16_011: [The `done` callback method passed in argument shall be called with an error object if disconnecting fails.]*/
this._fsm.transition('disconnected', disconnectCallback, finalError);
this._fsm.transition('disconnected', finalError, disconnectCallback);
});
},
'*': (connectCallback) => this._fsm.deferUntilTransition()
Expand Down Expand Up @@ -750,6 +760,7 @@ export class Amqp extends EventEmitter implements Client.Transport {
// detaching listeners and getting rid of the object anyway.
tmpC2DLink.removeListener('error', this._c2dErrorListener);
tmpC2DLink.removeListener('message', this._c2dMessageListener);
callback();
} else {
tmpC2DLink.detach((err) => {
if (err) {
Expand Down
61 changes: 60 additions & 1 deletion device/transport/amqp/test/_amqp_test.js
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ describe('Amqp', function () {
var receiver = null;
var sender = null;
var fakeBaseClient = null;
var disconnectHandler = null;
var fakeTokenAuthenticationProvider = null;
var fakeX509AuthenticationProvider = null;

Expand Down Expand Up @@ -59,7 +60,7 @@ describe('Amqp', function () {
attachReceiverLink: sinon.stub().callsArgWith(2, null, receiver),
detachSenderLink: sinon.stub().callsArg(1),
detachReceiverLink: sinon.stub().callsArg(1),
setDisconnectHandler: sinon.stub(),
setDisconnectHandler: sinon.stub().callsFake(function (handler) { disconnectHandler = handler; }),
send: sinon.stub().callsArgWith(3, null, new results.MessageEnqueued())
};

Expand Down Expand Up @@ -831,6 +832,64 @@ describe('Amqp', function () {
}, errors.InvalidOperationError);
});
});

describe('on(\'disconnect\')', function () {
/*Tests_SRS_NODE_DEVICE_AMQP_16_080: [if the handler specified in the `setDisconnectHandler` call is called while the `Amqp` object is disconnected, the call shall be ignored.]*/
it('ignores the event if already disconnected', function () {
transport.on('error', function () {
assert.fail();
});

disconnectHandler(new Error());
});

/*Tests_SRS_NODE_DEVICE_AMQP_16_081: [if the handler specified in the `setDisconnectHandler` call is called while the `Amqp` object is connecting or authenticating, the connection shall be stopped and an `disconnect` event shall be emitted with the error translated to a transport-agnostic error.]*/
it('emits a disconnect event if called while connecting', function (testCallback) {
var fakeError = new Error('disconnected');
fakeBaseClient.connect = sinon.stub();

transport.on('disconnect', function (err) {
assert.strictEqual(err.amqpError, fakeError);
assert(fakeBaseClient.connect.calledOnce);
assert(fakeBaseClient.disconnect.calledOnce);
testCallback();
});
transport.connect(function () {});

disconnectHandler(fakeError);
});

/*Tests_SRS_NODE_DEVICE_AMQP_16_081: [if the handler specified in the `setDisconnectHandler` call is called while the `Amqp` object is connecting or authenticating, the connection shall be stopped and an `disconnect` event shall be emitted with the error translated to a transport-agnostic error.]*/
it('emits an error event if called while authenticating', function (testCallback) {
var fakeError = new Error('disconnected');
fakeBaseClient.putToken = sinon.stub();

transport.on('disconnect', function (err) {
assert.strictEqual(err.amqpError, fakeError);
assert(fakeBaseClient.connect.calledOnce);
assert(fakeBaseClient.disconnect.calledOnce);
testCallback();
});
transport.connect(function () {});

disconnectHandler(fakeError);
});

/*Tests_SRS_NODE_DEVICE_AMQP_16_082: [if the handler specified in the `setDisconnectHandler` call is called while the `Amqp` object is connected, the connection shall be disconnected and an `disconnect` event shall be emitted with the error translated to a transport-agnostic error.]*/
it('emits an error event if called while connected and authenticated', function (testCallback) {
var fakeError = new Error('disconnected');
transport.on('disconnect', function (err) {
assert.strictEqual(err.amqpError, fakeError);
assert(fakeBaseClient.connect.calledOnce);
assert(fakeBaseClient.disconnect.calledOnce);
testCallback();
});

transport.connect(function () {});

disconnectHandler(fakeError);
});
});
});

describe('D2C', function () {
Expand Down

0 comments on commit 75f23f8

Please sign in to comment.