From f48db52b816d75510c6f9ab49eb3ae5e67732c7d Mon Sep 17 00:00:00 2001 From: Yoseph Maguire Date: Thu, 14 May 2020 12:39:13 -0700 Subject: [PATCH] refactor(multipe packages): c2d states no longer using booleans (#797) This moves the logic of checking if C2D is enabled down to the transport level, since it is not necessarily accurate to represent it at the device client level. There is not enough information at that level to make totally accurate judgements of if it is connected or not. --- .../mqtt/devdoc/mqtt_base_requirements.md | 6 +- common/transport/mqtt/src/mqtt_base.ts | 2 + common/transport/mqtt/test/_mqtt_base_test.js | 6 +- device/core/src/device_client.ts | 71 ++++++++----------- device/core/test/_device_client_test.js | 9 +-- .../amqp/devdoc/device_amqp_requirements.md | 7 +- device/transport/amqp/src/amqp.ts | 50 ++++++++----- device/transport/amqp/test/_amqp_test.js | 33 ++++++++- .../mqtt/devdoc/mqtt_requirements.md | 12 ++++ device/transport/mqtt/src/mqtt.ts | 55 +++++++++++--- device/transport/mqtt/test/_mqtt_test.js | 38 +++++++++- e2etests/test/c2d_disconnect.js | 4 +- 12 files changed, 210 insertions(+), 83 deletions(-) diff --git a/common/transport/mqtt/devdoc/mqtt_base_requirements.md b/common/transport/mqtt/devdoc/mqtt_base_requirements.md index b0aef60f5..abc4a3037 100644 --- a/common/transport/mqtt/devdoc/mqtt_base_requirements.md +++ b/common/transport/mqtt/devdoc/mqtt_base_requirements.md @@ -107,4 +107,8 @@ The `disconnect` method closes the connection to the server. **SRS_NODE_COMMON_MQTT_BASE_16_035: [** The `updateSharedAccessSignature` method shall call the `callback` argument with no parameters if the operation succeeds. **]** -**SRS_NODE_COMMON_MQTT_BASE_16_036: [** The `updateSharedAccessSignature` method shall call the `callback` argument with an `Error` if the operation fails. **]** \ No newline at end of file +**SRS_NODE_COMMON_MQTT_BASE_16_036: [** The `updateSharedAccessSignature` method shall call the `callback` argument with an `Error` if the operation fails. **]** + +**SRS_NODE_COMMON_MQTT_BASE_41_002: [** The `updateSharedAccessSignature` method shall trigger a forced disconnect if after 30 seconds the mqtt client has failed to complete a non-forced disconnect. **]** + +**SRS_NODE_COMMON_MQTT_BASE_41_003: [** The `updateSharedAccessSignature` method shall call the `callback` argument with an `Error` if the operation fails after timing out. **]** diff --git a/common/transport/mqtt/src/mqtt_base.ts b/common/transport/mqtt/src/mqtt_base.ts index e7f66c999..25d9f1e1f 100644 --- a/common/transport/mqtt/src/mqtt_base.ts +++ b/common/transport/mqtt/src/mqtt_base.ts @@ -123,6 +123,8 @@ export class MqttBase extends EventEmitter { /*Codes_SRS_NODE_COMMON_MQTT_BASE_16_035: [The `updateSharedAccessSignature` method shall call the `callback` argument with no parameters if the operation succeeds.]*/ /*Codes_SRS_NODE_COMMON_MQTT_BASE_16_036: [The `updateSharedAccessSignature` method shall call the `callback` argument with an `Error` if the operation fails.]*/ let switched = false; + /*Codes_SRS_NODE_COMMON_MQTT_BASE_41_002: [The `updateSharedAccessSignature` method shall trigger a forced disconnect if after 30 seconds the mqtt client has failed to complete a non-forced disconnect.]*/ + /*Codes_SRS_NODE_COMMON_MQTT_BASE_41_003: [The `updateSharedAccessSignature` method shall call the `callback` argument with an `Error` if the operation fails after timing out.]*/ const disconnectTimeout = setTimeout(() => { debug('disconnecting mqtt client timed out. Force disconnecting.'); switched = true; diff --git a/common/transport/mqtt/test/_mqtt_base_test.js b/common/transport/mqtt/test/_mqtt_base_test.js index 50a9af3f6..bc753a02b 100644 --- a/common/transport/mqtt/test/_mqtt_base_test.js +++ b/common/transport/mqtt/test/_mqtt_base_test.js @@ -463,7 +463,6 @@ describe('MqttBase', function () { fakeMqtt.emit('connect'); }); - /*Tests_SRS_NODE_COMMON_MQTT_BASE_41_XXX: [The `updateSharedAccessSignature` method shall trigger a forced disconnect if after 30 seconds the mqtt client has failed to complete a non-forced disconnect.]*/ it('force disconnects the client on disconnect timeout during reconnecting', function (testCallback) { this.clock = sinon.useFakeTimers(); var newSas = 'newsas'; @@ -476,6 +475,7 @@ describe('MqttBase', function () { }).bind(this); fakeMqtt.end.onSecondCall().callsFake((force, callback) => { + /*Tests_SRS_NODE_COMMON_MQTT_BASE_41_002: [The `updateSharedAccessSignature` method shall trigger a forced disconnect if after 30 seconds the mqtt client has failed to complete a non-forced disconnect.]*/ assert.isTrue(force); callback(); }).bind(this); @@ -484,7 +484,6 @@ describe('MqttBase', function () { assert.isTrue(fakeMqtt.connect.calledOnce); assert.strictEqual(fakeMqtt.connect.firstCall.args[1].password, fakeConfig.sharedAccessSignature); transport.updateSharedAccessSignature(newSas, function (err) { - /*Tests_SRS_NODE_COMMON_MQTT_BASE_16_035: [The `updateSharedAccessSignature` method shall call the `callback` argument with no parameters if the operation succeeds.]*/ assert.notExists(err); assert.isTrue(fakeMqtt.end.calledTwice); assert.isTrue(fakeMqtt.connect.calledTwice); @@ -516,7 +515,7 @@ describe('MqttBase', function () { fakeMqtt.emit('connect'); }); - /*Tests_SRS_NODE_COMMON_MQTT_BASE_41_XXX: [The `updateSharedAccessSignature` method shall trigger a forced disconnect if after 30 seconds the mqtt client has failed to complete a non-forced disconnect.]*/ + /*Tests_SRS_NODE_COMMON_MQTT_BASE_41_003: [The `updateSharedAccessSignature` method shall call the `callback` argument with an `Error` if the operation fails after timing out.]*/ it('calls the callback with an error if it fails to reconnect the mqtt client (force disconnect)', function (testCallback) { this.clock = sinon.useFakeTimers(); var fakeError = new Error('fake failed to reconnect'); @@ -532,7 +531,6 @@ describe('MqttBase', function () { var transport = new MqttBase(fakeMqtt); transport.connect(fakeConfig, function () { transport.updateSharedAccessSignature('newSas', function (err) { - /*Tests_SRS_NODE_COMMON_MQTT_BASE_16_035: [The `updateSharedAccessSignature` method shall call the `callback` argument with no parameters if the operation succeeds.]*/ assert.isTrue(fakeMqtt.end.calledTwice); assert.isTrue(fakeMqtt.connect.calledTwice); assert.strictEqual(err, fakeError); diff --git a/device/core/src/device_client.ts b/device/core/src/device_client.ts index b5d67d62b..21fa262e5 100644 --- a/device/core/src/device_client.ts +++ b/device/core/src/device_client.ts @@ -28,7 +28,7 @@ function safeCallback(callback?: (err?: Error, result?: any) => void, error?: Er * to create an IoT Hub device client. */ export class Client extends InternalClient { - private _c2dEnabled: boolean; + private _c2dFeature: boolean; private _deviceDisconnectHandler: (err?: Error, result?: any) => void; private _blobUploadClient: BlobUploadClient; private _fileUploadApi: FileUploadInterface; @@ -45,7 +45,7 @@ export class Client extends InternalClient { constructor(transport: DeviceTransport, connStr?: string, blobUploadClient?: BlobUploadClient, fileUploadApi?: FileUploadInterface) { super(transport, connStr); this._blobUploadClient = blobUploadClient; - this._c2dEnabled = false; + this._c2dFeature = false; this._fileUploadApi = fileUploadApi; this.on('removeListener', (eventName) => { @@ -57,6 +57,7 @@ export class Client extends InternalClient { debug('in removeListener, error disabling C2D.'); this.emit('error', err); } else { + this._c2dFeature = false; debug('removeListener successfully disabled C2D.'); } }); @@ -72,6 +73,7 @@ export class Client extends InternalClient { debug('in newListener, error enabling C2D.'); this.emit('error', err); } else { + this._c2dFeature = true; debug('in newListener, successfully enabled C2D'); } }); @@ -89,10 +91,9 @@ export class Client extends InternalClient { if (err && this._retryPolicy.shouldRetry(err)) { debug('reconnect policy specifies a reconnect on error'); /*Codes_SRS_NODE_DEVICE_CLIENT_16_097: [If the transport emits a `disconnect` event while the client is subscribed to c2d messages the retry policy shall be used to reconnect and re-enable the feature using the transport `enableC2D` method.]*/ - debug('_c2dEnabled is: ' + this._c2dEnabled); - if (this._c2dEnabled) { - this._c2dEnabled = false; - debug('re-enabling C2D link.'); + if (this._c2dFeature) { + // turn on C2D + debug('disconnectHandler re-enabling C2D'); this._enableC2D((err) => { if (err) { /*Codes_SRS_NODE_DEVICE_CLIENT_16_102: [If the retry policy fails to reestablish the C2D functionality a `disconnect` event shall be emitted with a `results.Disconnected` object.]*/ @@ -103,7 +104,7 @@ export class Client extends InternalClient { } }); } else { - debug('During _deviceDisconnectHandler, _c2dEnabled is false'); + debug('C2D has not been enabled on the device'); } } }; @@ -251,44 +252,30 @@ export class Client extends InternalClient { private _enableC2D(callback: (err?: Error) => void): void { - debug('_c2dEnabled is: ' + this._c2dEnabled); - if (!this._c2dEnabled) { - debug('enabling C2D'); - const retryOp = new RetryOperation(this._retryPolicy, this._maxOperationTimeout); - retryOp.retry((opCallback) => { - this._transport.enableC2D(opCallback); - }, (err) => { - if (!err) { - debug('enabled C2D. Setting this._c2dEnabled to true.'); - this._c2dEnabled = true; - } else { - debug('Error while enabling C2D.'); - } - callback(err); - }); - } else { - debug('this._c2dEnable is true. Not enabling C2D.'); - callback(); - } + debug('enabling C2D'); + const retryOp = new RetryOperation(this._retryPolicy, this._maxOperationTimeout); + retryOp.retry((opCallback) => { + this._transport.enableC2D(opCallback); + }, (err) => { + if (!err) { + debug('enabled C2D'); + } else { + debug('Error while enabling C2D.'); + } + callback(err); + }); } private _disableC2D(callback: (err?: Error) => void): void { - debug('_c2dEnabled is: ' + this._c2dEnabled); - if (this._c2dEnabled) { - debug('disabling C2D'); - this._transport.disableC2D((err) => { - if (!err) { - debug('disabled C2D. Setting this._c2dEnabled to false.'); - this._c2dEnabled = false; - } else { - debug('Error while disabling C2D.'); - } - callback(err); - }); - } else { - debug('this._c2dEnable is false. Not disabling C2D.'); - callback(); - } + debug('disabling C2D'); + this._transport.disableC2D((err) => { + if (!err) { + debug('disabled C2D'); + } else { + debug('Error while disabling C2D.'); + } + callback(err); + }); } /** diff --git a/device/core/test/_device_client_test.js b/device/core/test/_device_client_test.js index dd8365576..a17bcaf32 100644 --- a/device/core/test/_device_client_test.js +++ b/device/core/test/_device_client_test.js @@ -324,11 +324,12 @@ describe('Device Client', function () { sinon.spy(fakeTransport, 'enableC2D'); var client = new Client(fakeTransport); - // Calling 'on' twice to make sure it's called only once on the receiver. - // It works because the test will fail if the test callback is called multiple times, and it's called for every time the 'message' event is subscribed on the receiver. + // Calling 'on' thrice to make sure it's called each time on the receiver. + // It should work because enableC2D does not check that it has not been called once before, instead leaving that job up to the transport level. client.on('message', function () { }); client.on('message', function () { }); - assert.isTrue(fakeTransport.enableC2D.calledOnce); + client.on('message', function () { }); + assert.isTrue(fakeTransport.enableC2D.calledThrice, 'client called enabledC2D the incorrect number of times'); }); /*Tests_SRS_NODE_DEVICE_CLIENT_16_005: [The client shall stop listening for messages from the service whenever the last listener unsubscribes from the ‘message’ event.]*/ @@ -348,7 +349,7 @@ describe('Device Client', function () { client.removeListener('message', listener1); assert.isTrue(fakeTransport.disableC2D.notCalled); client.removeListener('message', listener2); - assert(fakeTransport.disableC2D.calledOnce); + assert.strictEqual(fakeTransport.disableC2D.callCount, 1, 'disableC2D not called once'); testCallback(); }); }); diff --git a/device/transport/amqp/devdoc/device_amqp_requirements.md b/device/transport/amqp/devdoc/device_amqp_requirements.md index e05394d7e..bc86aa4e0 100644 --- a/device/transport/amqp/devdoc/device_amqp_requirements.md +++ b/device/transport/amqp/devdoc/device_amqp_requirements.md @@ -159,7 +159,7 @@ This method is deprecated. The `AmqpReceiver` object and pattern is going away a **SRS_NODE_DEVICE_AMQP_06_013: [** The authentication providers `setTokenRenewalValues` method shall be invoked with the values provided in the tokenRenewal option. **]** - + **SRS_NODE_DEVICE_AMQP_13_001: [** The `setOptions` method shall save the options passed in. **]** @@ -202,6 +202,8 @@ This method is deprecated. The `AmqpReceiver` object and pattern is going away a **SRS_NODE_DEVICE_AMQP_16_033: [** The `enableC2D` method shall call its `callback` with an `Error` if the transport fails to connect, authenticate or attach link. **]** +**SRS_NODE_DEVICE_AMQP_41_003: [** The `enableC2D` method shall attach the C2D link only if it is not already attached. **]** + **SRS_NODE_DEVICE_AMQP_16_034: [** Any `error` event received on the C2D link shall trigger the emission of an `error` event by the transport, with an argument that is a `CloudToDeviceDetachedError` object with the `innerError` property set to that error. **]** ### disableC2D(callback) @@ -212,6 +214,9 @@ This method is deprecated. The `AmqpReceiver` object and pattern is going away a **SRS_NODE_DEVICE_AMQP_16_037: [** The `disableC2D` method shall call its `callback` immediately if the transport is already disconnected. **]** +**SRS_NODE_DEVICE_AMQP_41_004: [** The `disableC2D` method shall detach the C2D link only if it is already attached. **]** + + ### enableInputMessages(callback: (err?: Error) => void): void; **SRS_NODE_DEVICE_AMQP_18_010: [** The `enableInputMessages` method shall enable C2D messages **]** diff --git a/device/transport/amqp/src/amqp.ts b/device/transport/amqp/src/amqp.ts index f30676065..6c698201a 100644 --- a/device/transport/amqp/src/amqp.ts +++ b/device/transport/amqp/src/amqp.ts @@ -414,33 +414,47 @@ export class Amqp extends EventEmitter implements DeviceTransport { /*Codes_SRS_NODE_DEVICE_AMQP_16_079: [The `disableTwinDesiredPropertiesUpdates` method shall call its callback no arguments if the call to `AmqpTwinClient.disableTwinDesiredPropertiesUpdates` succeeds.]*/ disableTwinDesiredPropertiesUpdates: (callback) => this._twinClient.disableTwinDesiredPropertiesUpdates(handleResult('could not disable twin desired properties updates', callback)), enableC2D: (callback) => { - debug('attaching C2D link'); - this._amqp.attachReceiverLink(this._c2dEndpoint, null, (err, receiverLink) => { - if (err) { - debug('error creating a C2D link: ' + err.toString()); - /*Codes_SRS_NODE_DEVICE_AMQP_16_033: [The `enableC2D` method shall call its `callback` with an `Error` if the transport fails to connect, authenticate or attach link.]*/ - handleResult('AMQP Transport: Could not attach link', callback)(err); - } else { - /*Codes_SRS_NODE_DEVICE_AMQP_16_032: [The `enableC2D` method shall attach the C2D link and call its `callback` once it is successfully attached.]*/ - debug('C2D link created and attached successfully'); - this._c2dLink = receiverLink; - this._c2dLink.on('error', this._c2dErrorListener); - this._c2dLink.on('message', this._c2dMessageListener); - callback(); - } - }); + /*Codes_SRS_NODE_DEVICE_AMQP_41_003: [The `enableC2D` method shall attach the C2D link only if it is not already attached.] */ + if (!this._c2dLink) { + debug('attaching C2D link'); + this._amqp.attachReceiverLink(this._c2dEndpoint, null, (err, receiverLink) => { + if (err) { + debug('error creating a C2D link: ' + err.toString()); + /*Codes_SRS_NODE_DEVICE_AMQP_16_033: [The `enableC2D` method shall call its `callback` with an `Error` if the transport fails to connect, authenticate or attach link.]*/ + handleResult('AMQP Transport: Could not attach link', callback)(err); + } else { + /*Codes_SRS_NODE_DEVICE_AMQP_16_032: [The `enableC2D` method shall attach the C2D link and call its `callback` once it is successfully attached.]*/ + debug('C2D link created and attached successfully'); + this._c2dLink = receiverLink; + this._c2dLink.on('error', this._c2dErrorListener); + this._c2dLink.on('message', this._c2dMessageListener); + callback(); + } + }); + } else { + debug('C2D link already attached, doing nothing....'); + callback(); + } }, disableC2D: (callback) => { - /*Codes_SRS_NODE_DEVICE_AMQP_16_035: [The `disableC2D` method shall call `detach` on the C2D link and call its callback when it is successfully detached.]*/ - /*Codes_SRS_NODE_DEVICE_AMQP_16_036: [The `disableC2D` method shall call its `callback` with an `Error` if it fails to detach the C2D link.]*/ - this._stopC2DListener(undefined, callback); + /*Codes_SRS_NODE_DEVICE_AMQP_41_004: [The `disableC2D` method shall detach the C2D link only if it is already attached.] */ + if (this._c2dLink) { + /*Codes_SRS_NODE_DEVICE_AMQP_16_035: [The `disableC2D` method shall call `detach` on the C2D link and call its callback when it is successfully detached.]*/ + /*Codes_SRS_NODE_DEVICE_AMQP_16_036: [The `disableC2D` method shall call its `callback` with an `Error` if it fails to detach the C2D link.]*/ + this._stopC2DListener(undefined, callback); + } else { + debug('C2D link already detached, doing nothing...'); + callback(); + } }, enableMethods: (callback) => { + // deviceMethodClient already checks if the link is attached or not, so we do not need to check that here like we do with C2D /*Codes_SRS_NODE_DEVICE_AMQP_16_039: [The `enableMethods` method shall attach the method links and call its `callback` once these are successfully attached.]*/ /*Codes_SRS_NODE_DEVICE_AMQP_16_040: [The `enableMethods` method shall call its `callback` with an `Error` if the transport fails to connect, authenticate or attach method links.]*/ this._deviceMethodClient.attach(callback); }, disableMethods: (callback) => { + // deviceMethodClient already checks if the link is attached or not, so we do not need to check that here like we do with C2D /*Codes_SRS_NODE_DEVICE_AMQP_16_042: [The `disableMethods` method shall call `detach` on the device method links and call its callback when these are successfully detached.]*/ /*Codes_SRS_NODE_DEVICE_AMQP_16_043: [The `disableMethods` method shall call its `callback` with an `Error` if it fails to detach the device method links.]*/ this._deviceMethodClient.detach(callback); diff --git a/device/transport/amqp/test/_amqp_test.js b/device/transport/amqp/test/_amqp_test.js index 6b56fff42..1bf2b2bf5 100644 --- a/device/transport/amqp/test/_amqp_test.js +++ b/device/transport/amqp/test/_amqp_test.js @@ -100,7 +100,7 @@ describe('Amqp', function () { }); describe('#constructor', function () { - /*Codes_SRS_NODE_DEVICE_AMQP_16_056: [If the `authenticationProvider` object passed to the `Amqp` constructor has a `type` property which value is set to `AuthenticationType.Token` the `Amqp` constructor shall subscribe to the `newTokenAvailable` event of the `authenticationProvider` object.]*/ + /*Tests_SRS_NODE_DEVICE_AMQP_16_056: [If the `authenticationProvider` object passed to the `Amqp` constructor has a `type` property which value is set to `AuthenticationType.Token` the `Amqp` constructor shall subscribe to the `newTokenAvailable` event of the `authenticationProvider` object.]*/ it('subscribes to the newTokenAvailable event if the AuthenticationProvider has its type property set to AuthenticationType.Token', function () { assert.isTrue(fakeTokenAuthenticationProvider.on.calledOnce); }); @@ -1269,6 +1269,20 @@ describe('Amqp', function () { }); }); + /*Tests_SRS_NODE_DEVICE_AMQP_41_003: [The `enableC2D` method shall attach the C2D link only if it is not already attached.] */ + it('does not attach the C2D link if the link is already attached', function (testCallback) { + transport.connect(function () { + assert(fakeBaseClient.attachReceiverLink.notCalled); + transport[testConfig.enableFunc](function () { + assert(fakeBaseClient.attachReceiverLink.calledWith(transport._c2dEndpoint)); + transport[testConfig.enableFunc](function () { + assert(fakeBaseClient.attachReceiverLink.calledOnce, 'attachReceiverLink called multiple times, when it should have only been called once.'); + testCallback(); + }); + }); + }); + }); + /*Tests_SRS_NODE_DEVICE_AMQP_16_033: [The `enableC2D` method shall call its `callback` with an `Error` if the transport fails to connect, authenticate or attach link.]*/ it('calls its callback with an Error if connecting the transport fails', function (testCallback) { fakeBaseClient.connect = sinon.stub().callsArgWith(1, new Error('fake failed to connect')); @@ -1351,6 +1365,23 @@ describe('Amqp', function () { }); }); + /*Tests_SRS_NODE_DEVICE_AMQP_41_004: [The `disableC2D` method shall detach the C2D link only if it is already attached.] */ + it('does not detach the C2D link if the link is already detached', function (testCallback) { + transport.connect(function () { + assert(fakeBaseClient.attachReceiverLink.notCalled); + transport[testConfig.enableFunc](function () { + assert(fakeBaseClient.attachReceiverLink.calledWith(transport._c2dEndpoint)); + transport[testConfig.disableFunc](function () { + assert(receiver.detach.calledOnce); + transport[testConfig.disableFunc](function () { + assert(receiver.detach.calledOnce); + testCallback(); + }); + }); + }); + }); + }); + /*Tests_SRS_NODE_DEVICE_AMQP_16_036: [The `disableC2D` method shall call its `callback` with an `Error` if it fails to detach the C2D link.]*/ it('calls its callback with an Error if an error happens while detaching the C2D link', function (testCallback) { receiver.detach = sinon.stub().callsArgWith(0, new Error('fake detach error')); diff --git a/device/transport/mqtt/devdoc/mqtt_requirements.md b/device/transport/mqtt/devdoc/mqtt_requirements.md index 6ded314bb..6336b233f 100644 --- a/device/transport/mqtt/devdoc/mqtt_requirements.md +++ b/device/transport/mqtt/devdoc/mqtt_requirements.md @@ -294,6 +294,8 @@ The `updateTwinReportedProperties` method is used to retrieve the device twin. **SRS_NODE_DEVICE_MQTT_16_052: [** `enableC2D` shall call its callback with an `Error` if subscribing to the topic fails. **]** +**SRS_NODE_DEVICE_MQTT_41_008: [** `enableC2D` shall not subscribe multiple times if already subscribed. **]** + ### enableMethods **SRS_NODE_DEVICE_MQTT_16_038: [** `enableMethods` shall connect the MQTT connection if it is disconnected. **]** @@ -306,6 +308,8 @@ The `updateTwinReportedProperties` method is used to retrieve the device twin. **SRS_NODE_DEVICE_MQTT_16_053: [** `enableMethods` shall call its callback with an `Error` if subscribing to the topic fails. **]** +**SRS_NODE_DEVICE_MQTT_41_009: [** `enableMethods` shall not subscribe multiple times if already subscribed. **]** + ### enableTwinDesiredPropertiesUpdates **SRS_NODE_DEVICE_MQTT_16_057: [** `enableTwinDesiredPropertiesUpdates` shall connect the MQTT connection if it is disconnected. **]** @@ -330,6 +334,8 @@ The `updateTwinReportedProperties` method is used to retrieve the device twin. **SRS_NODE_DEVICE_MQTT_18_063: [** `enableInputMessages` shall call its callback with an `Error` if subscribing to the topic fails. **]** +**SRS_NODE_DEVICE_MQTT_41_010: [** `enableInputMessages` shall not subscribe multiple times if already subscribed. **]** + ### disableC2D **SRS_NODE_DEVICE_MQTT_16_041: [** `disableC2D` shall call its callback immediately if the MQTT connection is already disconnected. **]** @@ -340,6 +346,8 @@ The `updateTwinReportedProperties` method is used to retrieve the device twin. **SRS_NODE_DEVICE_MQTT_16_043: [** `disableC2D` shall call its callback with an `Error` if an error is received while unsubscribing. **]** +**SRS_NODE_DEVICE_MQTT_41_011: [** `disableC2D` shall unsubscribe from the topic for C2D messages only if it is currently subscribed. **]** + ### disableMethods **SRS_NODE_DEVICE_MQTT_16_044: [** `disableMethods` shall call its callback immediately if the MQTT connection is already disconnected. **]** @@ -350,6 +358,8 @@ The `updateTwinReportedProperties` method is used to retrieve the device twin. **SRS_NODE_DEVICE_MQTT_16_046: [** `disableMethods` shall call its callback with an `Error` if an error is received while unsubscribing. **]** +**SRS_NODE_DEVICE_MQTT_41_012: [** `disableMethods` shall unsubscribe from the topic for direct methods only if it is currently subscribed. **]** + ### disableTwinDesiredPropertiesUpdates **SRS_NODE_DEVICE_MQTT_16_062: [** `disableTwinDesiredPropertiesUpdates` shall call its callback immediately if the MQTT connection is already disconnected. **]** @@ -370,6 +380,8 @@ The `updateTwinReportedProperties` method is used to retrieve the device twin. **SRS_NODE_DEVICE_MQTT_18_067: [** `disableInputMessages` shall call its callback with an `Error` if an error is received while unsubscribing. **]** +**SRS_NODE_DEVICE_MQTT_41_013: [** `disableInputMessages` shall unsubscribe from the topic for inputMessages only if it is currently subscribed. **]** + ### message Event **SRS_NODE_DEVICE_MQTT_RECEIVER_16_004: [** If there is a listener for the `message` event, a `message` event shall be emitted for each message received. **]** diff --git a/device/transport/mqtt/src/mqtt.ts b/device/transport/mqtt/src/mqtt.ts index 91c395a7b..1fb482c49 100644 --- a/device/transport/mqtt/src/mqtt.ts +++ b/device/transport/mqtt/src/mqtt.ts @@ -317,20 +317,59 @@ export class Mqtt extends EventEmitter implements DeviceTransport { callback(!!err ? translateError(err) : null); }); }, + /* Codes_SRS_NODE_DEVICE_MQTT_41_008: [`enableC2D` shall not subscribe multiple times if already subscribed.]*/ enableC2D: (callback) => { - this._setupSubscription(this._topics.message, 1, callback); + if (this._topics.message && this._topics.message.subscribed) { + debug('already subscribed to `message`, doing nothing...'); + callback(); + } else { + this._setupSubscription(this._topics.message, 1, callback); + } }, + /* Codes_SRS_NODE_DEVICE_MQTT_41_009: [`enableMethods` shall not subscribe multiple times if already subscribed.]*/ enableMethods: (callback) => { - this._setupSubscription(this._topics.method, 0, callback); + if (this._topics.method && this._topics.method.subscribed) { + debug('already subscribed to `method`, doing nothing...'); + callback(); + } else { + this._setupSubscription(this._topics.method, 0, callback); + } }, + /* Codes_SRS_NODE_DEVICE_MQTT_41_010: [`enableInputMessages` shall not subscribe multiple times if already subscribed.]*/ enableInputMessages: (callback) => { - this._setupSubscription(this._topics.inputMessage, 1, callback); + if (this._topics.inputMessage && this._topics.inputMessage.subscribed) { + debug('already subscribed to `inputMessages`, doing nothing...'); + callback(); + } else { + this._setupSubscription(this._topics.inputMessage, 1, callback); + } }, + /* Codes_SRS_NODE_DEVICE_MQTT_41_011: [`disableC2D` shall unsubscribe from the topic for C2D messages only if it is currently subscribed.]*/ disableC2D: (callback) => { - this._removeSubscription(this._topics.message, callback); + if (this._topics.message && this._topics.message.subscribed) { + this._removeSubscription(this._topics.message, callback); + } else { + debug('not subscribed to `message`, so doing nothing...'); + callback(); + } }, + /* Codes_SRS_NODE_DEVICE_MQTT_41_012: [`disableMethods` shall unsubscribe from the topic for direct methods only if it is currently subscribed.]*/ disableMethods: (callback) => { - this._removeSubscription(this._topics.method, callback); + if (this._topics.method && this._topics.method.subscribed) { + this._removeSubscription(this._topics.method, callback); + } else { + debug('not subscribed to `method`, so doing nothing...'); + callback(); + } + }, + /* Codes_SRS_NODE_DEVICE_MQTT_41_013: [`disableInputMessages` shall unsubscribe from the topic for inputMessages only if it is currently subscribed.]*/ + disableInputMessages: (callback) => { + if (this._topics.inputMessage && this._topics.inputMessage.subscribed) { + this._removeSubscription(this._topics.inputMessage, callback); + } else { + debug('not subscribed to `method`, so doing nothing...'); + callback(); + } }, /*Codes_SRS_NODE_DEVICE_MQTT_16_077: [`getTwin` shall call the `getTwin` method on the `MqttTwinClient` object and pass it its callback.]*/ getTwin: (callback) => this._twinClient.getTwin(callback), @@ -340,9 +379,6 @@ export class Mqtt extends EventEmitter implements DeviceTransport { enableTwinDesiredPropertiesUpdates: (callback) => this._twinClient.enableTwinDesiredPropertiesUpdates(callback), /*Codes_SRS_NODE_DEVICE_MQTT_16_083: [`disableTwinDesiredPropertiesUpdates` shall call the `disableTwinDesiredPropertiesUpdates` on the `MqttTwinClient` object created by the constructor and pass it its callback.]*/ disableTwinDesiredPropertiesUpdates: (callback) => this._twinClient.disableTwinDesiredPropertiesUpdates(callback), - disableInputMessages: (callback) => { - this._removeSubscription(this._topics.inputMessage, callback); - }, }, disconnecting: { _onEnter: (disconnectCallback, err) => { @@ -775,7 +811,7 @@ export class Mqtt extends EventEmitter implements DeviceTransport { /*Codes_SRS_NODE_DEVICE_MQTT_16_045: [`disableMethods` shall unsubscribe from the topic for direct methods.]*/ /*Codes_SRS_NODE_DEVICE_MQTT_18_065: [`disableInputMessages` shall unsubscribe from the topic for inputMessages. ]*/ this._mqtt.unsubscribe(topic.name, (err) => { - topic.subscribed = !err; + topic.subscribed = !!err; // this sets the topic.subscribed to false if the unsubscribe is successful. /*Codes_SRS_NODE_DEVICE_MQTT_16_054: [`disableC2D` shall call its callback with no arguments when the `UNSUBACK` packet is received.]*/ /*Codes_SRS_NODE_DEVICE_MQTT_16_055: [`disableMethods` shall call its callback with no arguments when the `UNSUBACK` packet is received.]*/ /*Codes_SRS_NODE_DEVICE_MQTT_18_066: [`disableInputMessages` shall call its callback with no arguments when the `UNSUBACK` packet is received. ]*/ @@ -971,7 +1007,6 @@ export class Mqtt extends EventEmitter implements DeviceTransport { }); } } - } diff --git a/device/transport/mqtt/test/_mqtt_test.js b/device/transport/mqtt/test/_mqtt_test.js index 9cb632c44..e992e1b3f 100644 --- a/device/transport/mqtt/test/_mqtt_test.js +++ b/device/transport/mqtt/test/_mqtt_test.js @@ -1021,6 +1021,23 @@ describe('Mqtt', function () { }); }); }); + + it('will not subscribe multiple times to the same topic', function (testCallback) { + const transport = new Mqtt(fakeAuthenticationProvider, fakeMqttBase); + transport[testConfig.methodName](function (err) { + assert.isTrue(fakeMqttBase.connect.calledOnce); + assert.isUndefined(err); + assert.equal(fakeMqttBase.subscribe.firstCall.args[0], testConfig.topicName); + assert.strictEqual(fakeMqttBase.subscribe.firstCall.args[1].qos, testConfig.qos); + transport[testConfig.methodName](function (err) { + /* Tests_SRS_NODE_DEVICE_MQTT_41_008: [`enableC2D` shall not subscribe multiple times if already subscribed.]*/ + /* Tests_SRS_NODE_DEVICE_MQTT_41_009: [`enableMethods` shall not subscribe multiple times if already subscribed.]*/ + /* Tests_SRS_NODE_DEVICE_MQTT_41_010: [`enableInputMessages` shall not subscribe multiple times if already subscribed.]*/ + assert(fakeMqttBase.subscribe.calledOnce, 'subscribe called multiple times when it should have been only called once'); + testCallback(); + }); + }); + }); }); }); @@ -1039,7 +1056,7 @@ describe('Mqtt', function () { /* Tests_SRS_NODE_DEVICE_MQTT_16_044: [`disableMethods` shall call its callback immediately if the MQTT connection is already disconnected.]*/ /* Tests_SRS_NODE_DEVICE_MQTT_16_062: [`disableTwinDesiredPropertiesUpdates` shall call its callback immediately if the MQTT connection is already disconnected.]*/ /* Tests_SRS_NODE_DEVICE_MQTT_18_064: [ `disableInputMessages` shall call its callback immediately if the MQTT connection is already disconnected. ]*/ - it('immediately calls its callback if the disconnected', function (testCallback) { + it('immediately calls its callback if the connection is already disconnected', function (testCallback) { const mqtt = new Mqtt(fakeAuthenticationProvider, fakeMqttBase); mqtt[testConfig.disableFeatureMethod](function () { assert.isTrue(fakeMqttBase.connect.notCalled); @@ -1089,6 +1106,25 @@ describe('Mqtt', function () { }); }); }); + + it('calls its callback without unsubscribing if already unsubscribed', function (testCallback) { + const transport = new Mqtt(fakeAuthenticationProvider, fakeMqttBase); + transport.connect(function () { + transport[testConfig.enableFeatureMethod](function () { + transport[testConfig.disableFeatureMethod](function (err) { + assert.isTrue(fakeMqttBase.unsubscribe.called); + assert.isUndefined(err); + transport[testConfig.disableFeatureMethod](function (err) { + /* Tests_SRS_NODE_DEVICE_MQTT_41_011: [`disableC2D` shall unsubscribe from the topic for C2D messages only if it is currently subscribed.]*/ + /* Tests_SRS_NODE_DEVICE_MQTT_41_012: [`disableMethods` shall unsubscribe from the topic for direct methods only if it is currently subscribed.]*/ + /* Tests_SRS_NODE_DEVICE_MQTT_41_013: [`disableInputMessages` shall unsubscribe from the topic for inputMessages only if it is currently subscribed.]*/ + assert.isTrue(fakeMqttBase.unsubscribe.calledOnce); + testCallback(); + }); + }); + }); + }); + }); }); }); diff --git a/e2etests/test/c2d_disconnect.js b/e2etests/test/c2d_disconnect.js index 75993324a..66a702c65 100644 --- a/e2etests/test/c2d_disconnect.js +++ b/e2etests/test/c2d_disconnect.js @@ -32,7 +32,8 @@ var protocolAndTermination = [ transport: deviceAmqp.Amqp, operationType: 'KillTcp', closeReason: ' severs the TCP connection ', - delayInSeconds: 2 + delayInSeconds: 2, + durationInSeconds: 20 }, { testEnabled: true, @@ -202,6 +203,7 @@ protocolAndTermination.forEach( function (testConfiguration) { terminateMessage.properties.add('AzIoTHub_FaultOperationType', testConfiguration.operationType); terminateMessage.properties.add('AzIoTHub_FaultOperationCloseReason', testConfiguration.closeReason); terminateMessage.properties.add('AzIoTHub_FaultOperationDelayInSecs', testConfiguration.delayInSeconds); + terminateMessage.properties.add('AzIoTHub_FaultOperationDurationInSecs', testConfiguration.durationInSeconds); deviceClient.sendEvent(terminateMessage, function (sendErr) { debug('at the callback for the fault injection send, err is:' + sendErr); });