diff --git a/common/transport/mqtt/devdoc/mqtt_base_requirements.md b/common/transport/mqtt/devdoc/mqtt_base_requirements.md index b0aef60f5..abc4a3037 100644 --- a/common/transport/mqtt/devdoc/mqtt_base_requirements.md +++ b/common/transport/mqtt/devdoc/mqtt_base_requirements.md @@ -107,4 +107,8 @@ The `disconnect` method closes the connection to the server. **SRS_NODE_COMMON_MQTT_BASE_16_035: [** The `updateSharedAccessSignature` method shall call the `callback` argument with no parameters if the operation succeeds. **]** -**SRS_NODE_COMMON_MQTT_BASE_16_036: [** The `updateSharedAccessSignature` method shall call the `callback` argument with an `Error` if the operation fails. **]** \ No newline at end of file +**SRS_NODE_COMMON_MQTT_BASE_16_036: [** The `updateSharedAccessSignature` method shall call the `callback` argument with an `Error` if the operation fails. **]** + +**SRS_NODE_COMMON_MQTT_BASE_41_002: [** The `updateSharedAccessSignature` method shall trigger a forced disconnect if after 30 seconds the mqtt client has failed to complete a non-forced disconnect. **]** + +**SRS_NODE_COMMON_MQTT_BASE_41_003: [** The `updateSharedAccessSignature` method shall call the `callback` argument with an `Error` if the operation fails after timing out. **]** diff --git a/common/transport/mqtt/src/mqtt_base.ts b/common/transport/mqtt/src/mqtt_base.ts index e7f66c999..25d9f1e1f 100644 --- a/common/transport/mqtt/src/mqtt_base.ts +++ b/common/transport/mqtt/src/mqtt_base.ts @@ -123,6 +123,8 @@ export class MqttBase extends EventEmitter { /*Codes_SRS_NODE_COMMON_MQTT_BASE_16_035: [The `updateSharedAccessSignature` method shall call the `callback` argument with no parameters if the operation succeeds.]*/ /*Codes_SRS_NODE_COMMON_MQTT_BASE_16_036: [The `updateSharedAccessSignature` method shall call the `callback` argument with an `Error` if the operation fails.]*/ let switched = false; + /*Codes_SRS_NODE_COMMON_MQTT_BASE_41_002: [The `updateSharedAccessSignature` method shall trigger a forced disconnect if after 30 seconds the mqtt client has failed to complete a non-forced disconnect.]*/ + /*Codes_SRS_NODE_COMMON_MQTT_BASE_41_003: [The `updateSharedAccessSignature` method shall call the `callback` argument with an `Error` if the operation fails after timing out.]*/ const disconnectTimeout = setTimeout(() => { debug('disconnecting mqtt client timed out. Force disconnecting.'); switched = true; diff --git a/common/transport/mqtt/test/_mqtt_base_test.js b/common/transport/mqtt/test/_mqtt_base_test.js index 50a9af3f6..bc753a02b 100644 --- a/common/transport/mqtt/test/_mqtt_base_test.js +++ b/common/transport/mqtt/test/_mqtt_base_test.js @@ -463,7 +463,6 @@ describe('MqttBase', function () { fakeMqtt.emit('connect'); }); - /*Tests_SRS_NODE_COMMON_MQTT_BASE_41_XXX: [The `updateSharedAccessSignature` method shall trigger a forced disconnect if after 30 seconds the mqtt client has failed to complete a non-forced disconnect.]*/ it('force disconnects the client on disconnect timeout during reconnecting', function (testCallback) { this.clock = sinon.useFakeTimers(); var newSas = 'newsas'; @@ -476,6 +475,7 @@ describe('MqttBase', function () { }).bind(this); fakeMqtt.end.onSecondCall().callsFake((force, callback) => { + /*Tests_SRS_NODE_COMMON_MQTT_BASE_41_002: [The `updateSharedAccessSignature` method shall trigger a forced disconnect if after 30 seconds the mqtt client has failed to complete a non-forced disconnect.]*/ assert.isTrue(force); callback(); }).bind(this); @@ -484,7 +484,6 @@ describe('MqttBase', function () { assert.isTrue(fakeMqtt.connect.calledOnce); assert.strictEqual(fakeMqtt.connect.firstCall.args[1].password, fakeConfig.sharedAccessSignature); transport.updateSharedAccessSignature(newSas, function (err) { - /*Tests_SRS_NODE_COMMON_MQTT_BASE_16_035: [The `updateSharedAccessSignature` method shall call the `callback` argument with no parameters if the operation succeeds.]*/ assert.notExists(err); assert.isTrue(fakeMqtt.end.calledTwice); assert.isTrue(fakeMqtt.connect.calledTwice); @@ -516,7 +515,7 @@ describe('MqttBase', function () { fakeMqtt.emit('connect'); }); - /*Tests_SRS_NODE_COMMON_MQTT_BASE_41_XXX: [The `updateSharedAccessSignature` method shall trigger a forced disconnect if after 30 seconds the mqtt client has failed to complete a non-forced disconnect.]*/ + /*Tests_SRS_NODE_COMMON_MQTT_BASE_41_003: [The `updateSharedAccessSignature` method shall call the `callback` argument with an `Error` if the operation fails after timing out.]*/ it('calls the callback with an error if it fails to reconnect the mqtt client (force disconnect)', function (testCallback) { this.clock = sinon.useFakeTimers(); var fakeError = new Error('fake failed to reconnect'); @@ -532,7 +531,6 @@ describe('MqttBase', function () { var transport = new MqttBase(fakeMqtt); transport.connect(fakeConfig, function () { transport.updateSharedAccessSignature('newSas', function (err) { - /*Tests_SRS_NODE_COMMON_MQTT_BASE_16_035: [The `updateSharedAccessSignature` method shall call the `callback` argument with no parameters if the operation succeeds.]*/ assert.isTrue(fakeMqtt.end.calledTwice); assert.isTrue(fakeMqtt.connect.calledTwice); assert.strictEqual(err, fakeError); diff --git a/device/core/src/device_client.ts b/device/core/src/device_client.ts index b5d67d62b..21fa262e5 100644 --- a/device/core/src/device_client.ts +++ b/device/core/src/device_client.ts @@ -28,7 +28,7 @@ function safeCallback(callback?: (err?: Error, result?: any) => void, error?: Er * to create an IoT Hub device client. */ export class Client extends InternalClient { - private _c2dEnabled: boolean; + private _c2dFeature: boolean; private _deviceDisconnectHandler: (err?: Error, result?: any) => void; private _blobUploadClient: BlobUploadClient; private _fileUploadApi: FileUploadInterface; @@ -45,7 +45,7 @@ export class Client extends InternalClient { constructor(transport: DeviceTransport, connStr?: string, blobUploadClient?: BlobUploadClient, fileUploadApi?: FileUploadInterface) { super(transport, connStr); this._blobUploadClient = blobUploadClient; - this._c2dEnabled = false; + this._c2dFeature = false; this._fileUploadApi = fileUploadApi; this.on('removeListener', (eventName) => { @@ -57,6 +57,7 @@ export class Client extends InternalClient { debug('in removeListener, error disabling C2D.'); this.emit('error', err); } else { + this._c2dFeature = false; debug('removeListener successfully disabled C2D.'); } }); @@ -72,6 +73,7 @@ export class Client extends InternalClient { debug('in newListener, error enabling C2D.'); this.emit('error', err); } else { + this._c2dFeature = true; debug('in newListener, successfully enabled C2D'); } }); @@ -89,10 +91,9 @@ export class Client extends InternalClient { if (err && this._retryPolicy.shouldRetry(err)) { debug('reconnect policy specifies a reconnect on error'); /*Codes_SRS_NODE_DEVICE_CLIENT_16_097: [If the transport emits a `disconnect` event while the client is subscribed to c2d messages the retry policy shall be used to reconnect and re-enable the feature using the transport `enableC2D` method.]*/ - debug('_c2dEnabled is: ' + this._c2dEnabled); - if (this._c2dEnabled) { - this._c2dEnabled = false; - debug('re-enabling C2D link.'); + if (this._c2dFeature) { + // turn on C2D + debug('disconnectHandler re-enabling C2D'); this._enableC2D((err) => { if (err) { /*Codes_SRS_NODE_DEVICE_CLIENT_16_102: [If the retry policy fails to reestablish the C2D functionality a `disconnect` event shall be emitted with a `results.Disconnected` object.]*/ @@ -103,7 +104,7 @@ export class Client extends InternalClient { } }); } else { - debug('During _deviceDisconnectHandler, _c2dEnabled is false'); + debug('C2D has not been enabled on the device'); } } }; @@ -251,44 +252,30 @@ export class Client extends InternalClient { private _enableC2D(callback: (err?: Error) => void): void { - debug('_c2dEnabled is: ' + this._c2dEnabled); - if (!this._c2dEnabled) { - debug('enabling C2D'); - const retryOp = new RetryOperation(this._retryPolicy, this._maxOperationTimeout); - retryOp.retry((opCallback) => { - this._transport.enableC2D(opCallback); - }, (err) => { - if (!err) { - debug('enabled C2D. Setting this._c2dEnabled to true.'); - this._c2dEnabled = true; - } else { - debug('Error while enabling C2D.'); - } - callback(err); - }); - } else { - debug('this._c2dEnable is true. Not enabling C2D.'); - callback(); - } + debug('enabling C2D'); + const retryOp = new RetryOperation(this._retryPolicy, this._maxOperationTimeout); + retryOp.retry((opCallback) => { + this._transport.enableC2D(opCallback); + }, (err) => { + if (!err) { + debug('enabled C2D'); + } else { + debug('Error while enabling C2D.'); + } + callback(err); + }); } private _disableC2D(callback: (err?: Error) => void): void { - debug('_c2dEnabled is: ' + this._c2dEnabled); - if (this._c2dEnabled) { - debug('disabling C2D'); - this._transport.disableC2D((err) => { - if (!err) { - debug('disabled C2D. Setting this._c2dEnabled to false.'); - this._c2dEnabled = false; - } else { - debug('Error while disabling C2D.'); - } - callback(err); - }); - } else { - debug('this._c2dEnable is false. Not disabling C2D.'); - callback(); - } + debug('disabling C2D'); + this._transport.disableC2D((err) => { + if (!err) { + debug('disabled C2D'); + } else { + debug('Error while disabling C2D.'); + } + callback(err); + }); } /** diff --git a/device/core/test/_device_client_test.js b/device/core/test/_device_client_test.js index dd8365576..a17bcaf32 100644 --- a/device/core/test/_device_client_test.js +++ b/device/core/test/_device_client_test.js @@ -324,11 +324,12 @@ describe('Device Client', function () { sinon.spy(fakeTransport, 'enableC2D'); var client = new Client(fakeTransport); - // Calling 'on' twice to make sure it's called only once on the receiver. - // It works because the test will fail if the test callback is called multiple times, and it's called for every time the 'message' event is subscribed on the receiver. + // Calling 'on' thrice to make sure it's called each time on the receiver. + // It should work because enableC2D does not check that it has not been called once before, instead leaving that job up to the transport level. client.on('message', function () { }); client.on('message', function () { }); - assert.isTrue(fakeTransport.enableC2D.calledOnce); + client.on('message', function () { }); + assert.isTrue(fakeTransport.enableC2D.calledThrice, 'client called enabledC2D the incorrect number of times'); }); /*Tests_SRS_NODE_DEVICE_CLIENT_16_005: [The client shall stop listening for messages from the service whenever the last listener unsubscribes from the ‘message’ event.]*/ @@ -348,7 +349,7 @@ describe('Device Client', function () { client.removeListener('message', listener1); assert.isTrue(fakeTransport.disableC2D.notCalled); client.removeListener('message', listener2); - assert(fakeTransport.disableC2D.calledOnce); + assert.strictEqual(fakeTransport.disableC2D.callCount, 1, 'disableC2D not called once'); testCallback(); }); }); diff --git a/device/transport/amqp/devdoc/device_amqp_requirements.md b/device/transport/amqp/devdoc/device_amqp_requirements.md index e05394d7e..bc86aa4e0 100644 --- a/device/transport/amqp/devdoc/device_amqp_requirements.md +++ b/device/transport/amqp/devdoc/device_amqp_requirements.md @@ -159,7 +159,7 @@ This method is deprecated. The `AmqpReceiver` object and pattern is going away a **SRS_NODE_DEVICE_AMQP_06_013: [** The authentication providers `setTokenRenewalValues` method shall be invoked with the values provided in the tokenRenewal option. **]** - + **SRS_NODE_DEVICE_AMQP_13_001: [** The `setOptions` method shall save the options passed in. **]** @@ -202,6 +202,8 @@ This method is deprecated. The `AmqpReceiver` object and pattern is going away a **SRS_NODE_DEVICE_AMQP_16_033: [** The `enableC2D` method shall call its `callback` with an `Error` if the transport fails to connect, authenticate or attach link. **]** +**SRS_NODE_DEVICE_AMQP_41_003: [** The `enableC2D` method shall attach the C2D link only if it is not already attached. **]** + **SRS_NODE_DEVICE_AMQP_16_034: [** Any `error` event received on the C2D link shall trigger the emission of an `error` event by the transport, with an argument that is a `CloudToDeviceDetachedError` object with the `innerError` property set to that error. **]** ### disableC2D(callback) @@ -212,6 +214,9 @@ This method is deprecated. The `AmqpReceiver` object and pattern is going away a **SRS_NODE_DEVICE_AMQP_16_037: [** The `disableC2D` method shall call its `callback` immediately if the transport is already disconnected. **]** +**SRS_NODE_DEVICE_AMQP_41_004: [** The `disableC2D` method shall detach the C2D link only if it is already attached. **]** + + ### enableInputMessages(callback: (err?: Error) => void): void; **SRS_NODE_DEVICE_AMQP_18_010: [** The `enableInputMessages` method shall enable C2D messages **]** diff --git a/device/transport/amqp/src/amqp.ts b/device/transport/amqp/src/amqp.ts index f30676065..6c698201a 100644 --- a/device/transport/amqp/src/amqp.ts +++ b/device/transport/amqp/src/amqp.ts @@ -414,33 +414,47 @@ export class Amqp extends EventEmitter implements DeviceTransport { /*Codes_SRS_NODE_DEVICE_AMQP_16_079: [The `disableTwinDesiredPropertiesUpdates` method shall call its callback no arguments if the call to `AmqpTwinClient.disableTwinDesiredPropertiesUpdates` succeeds.]*/ disableTwinDesiredPropertiesUpdates: (callback) => this._twinClient.disableTwinDesiredPropertiesUpdates(handleResult('could not disable twin desired properties updates', callback)), enableC2D: (callback) => { - debug('attaching C2D link'); - this._amqp.attachReceiverLink(this._c2dEndpoint, null, (err, receiverLink) => { - if (err) { - debug('error creating a C2D link: ' + err.toString()); - /*Codes_SRS_NODE_DEVICE_AMQP_16_033: [The `enableC2D` method shall call its `callback` with an `Error` if the transport fails to connect, authenticate or attach link.]*/ - handleResult('AMQP Transport: Could not attach link', callback)(err); - } else { - /*Codes_SRS_NODE_DEVICE_AMQP_16_032: [The `enableC2D` method shall attach the C2D link and call its `callback` once it is successfully attached.]*/ - debug('C2D link created and attached successfully'); - this._c2dLink = receiverLink; - this._c2dLink.on('error', this._c2dErrorListener); - this._c2dLink.on('message', this._c2dMessageListener); - callback(); - } - }); + /*Codes_SRS_NODE_DEVICE_AMQP_41_003: [The `enableC2D` method shall attach the C2D link only if it is not already attached.] */ + if (!this._c2dLink) { + debug('attaching C2D link'); + this._amqp.attachReceiverLink(this._c2dEndpoint, null, (err, receiverLink) => { + if (err) { + debug('error creating a C2D link: ' + err.toString()); + /*Codes_SRS_NODE_DEVICE_AMQP_16_033: [The `enableC2D` method shall call its `callback` with an `Error` if the transport fails to connect, authenticate or attach link.]*/ + handleResult('AMQP Transport: Could not attach link', callback)(err); + } else { + /*Codes_SRS_NODE_DEVICE_AMQP_16_032: [The `enableC2D` method shall attach the C2D link and call its `callback` once it is successfully attached.]*/ + debug('C2D link created and attached successfully'); + this._c2dLink = receiverLink; + this._c2dLink.on('error', this._c2dErrorListener); + this._c2dLink.on('message', this._c2dMessageListener); + callback(); + } + }); + } else { + debug('C2D link already attached, doing nothing....'); + callback(); + } }, disableC2D: (callback) => { - /*Codes_SRS_NODE_DEVICE_AMQP_16_035: [The `disableC2D` method shall call `detach` on the C2D link and call its callback when it is successfully detached.]*/ - /*Codes_SRS_NODE_DEVICE_AMQP_16_036: [The `disableC2D` method shall call its `callback` with an `Error` if it fails to detach the C2D link.]*/ - this._stopC2DListener(undefined, callback); + /*Codes_SRS_NODE_DEVICE_AMQP_41_004: [The `disableC2D` method shall detach the C2D link only if it is already attached.] */ + if (this._c2dLink) { + /*Codes_SRS_NODE_DEVICE_AMQP_16_035: [The `disableC2D` method shall call `detach` on the C2D link and call its callback when it is successfully detached.]*/ + /*Codes_SRS_NODE_DEVICE_AMQP_16_036: [The `disableC2D` method shall call its `callback` with an `Error` if it fails to detach the C2D link.]*/ + this._stopC2DListener(undefined, callback); + } else { + debug('C2D link already detached, doing nothing...'); + callback(); + } }, enableMethods: (callback) => { + // deviceMethodClient already checks if the link is attached or not, so we do not need to check that here like we do with C2D /*Codes_SRS_NODE_DEVICE_AMQP_16_039: [The `enableMethods` method shall attach the method links and call its `callback` once these are successfully attached.]*/ /*Codes_SRS_NODE_DEVICE_AMQP_16_040: [The `enableMethods` method shall call its `callback` with an `Error` if the transport fails to connect, authenticate or attach method links.]*/ this._deviceMethodClient.attach(callback); }, disableMethods: (callback) => { + // deviceMethodClient already checks if the link is attached or not, so we do not need to check that here like we do with C2D /*Codes_SRS_NODE_DEVICE_AMQP_16_042: [The `disableMethods` method shall call `detach` on the device method links and call its callback when these are successfully detached.]*/ /*Codes_SRS_NODE_DEVICE_AMQP_16_043: [The `disableMethods` method shall call its `callback` with an `Error` if it fails to detach the device method links.]*/ this._deviceMethodClient.detach(callback); diff --git a/device/transport/amqp/test/_amqp_test.js b/device/transport/amqp/test/_amqp_test.js index 6b56fff42..1bf2b2bf5 100644 --- a/device/transport/amqp/test/_amqp_test.js +++ b/device/transport/amqp/test/_amqp_test.js @@ -100,7 +100,7 @@ describe('Amqp', function () { }); describe('#constructor', function () { - /*Codes_SRS_NODE_DEVICE_AMQP_16_056: [If the `authenticationProvider` object passed to the `Amqp` constructor has a `type` property which value is set to `AuthenticationType.Token` the `Amqp` constructor shall subscribe to the `newTokenAvailable` event of the `authenticationProvider` object.]*/ + /*Tests_SRS_NODE_DEVICE_AMQP_16_056: [If the `authenticationProvider` object passed to the `Amqp` constructor has a `type` property which value is set to `AuthenticationType.Token` the `Amqp` constructor shall subscribe to the `newTokenAvailable` event of the `authenticationProvider` object.]*/ it('subscribes to the newTokenAvailable event if the AuthenticationProvider has its type property set to AuthenticationType.Token', function () { assert.isTrue(fakeTokenAuthenticationProvider.on.calledOnce); }); @@ -1269,6 +1269,20 @@ describe('Amqp', function () { }); }); + /*Tests_SRS_NODE_DEVICE_AMQP_41_003: [The `enableC2D` method shall attach the C2D link only if it is not already attached.] */ + it('does not attach the C2D link if the link is already attached', function (testCallback) { + transport.connect(function () { + assert(fakeBaseClient.attachReceiverLink.notCalled); + transport[testConfig.enableFunc](function () { + assert(fakeBaseClient.attachReceiverLink.calledWith(transport._c2dEndpoint)); + transport[testConfig.enableFunc](function () { + assert(fakeBaseClient.attachReceiverLink.calledOnce, 'attachReceiverLink called multiple times, when it should have only been called once.'); + testCallback(); + }); + }); + }); + }); + /*Tests_SRS_NODE_DEVICE_AMQP_16_033: [The `enableC2D` method shall call its `callback` with an `Error` if the transport fails to connect, authenticate or attach link.]*/ it('calls its callback with an Error if connecting the transport fails', function (testCallback) { fakeBaseClient.connect = sinon.stub().callsArgWith(1, new Error('fake failed to connect')); @@ -1351,6 +1365,23 @@ describe('Amqp', function () { }); }); + /*Tests_SRS_NODE_DEVICE_AMQP_41_004: [The `disableC2D` method shall detach the C2D link only if it is already attached.] */ + it('does not detach the C2D link if the link is already detached', function (testCallback) { + transport.connect(function () { + assert(fakeBaseClient.attachReceiverLink.notCalled); + transport[testConfig.enableFunc](function () { + assert(fakeBaseClient.attachReceiverLink.calledWith(transport._c2dEndpoint)); + transport[testConfig.disableFunc](function () { + assert(receiver.detach.calledOnce); + transport[testConfig.disableFunc](function () { + assert(receiver.detach.calledOnce); + testCallback(); + }); + }); + }); + }); + }); + /*Tests_SRS_NODE_DEVICE_AMQP_16_036: [The `disableC2D` method shall call its `callback` with an `Error` if it fails to detach the C2D link.]*/ it('calls its callback with an Error if an error happens while detaching the C2D link', function (testCallback) { receiver.detach = sinon.stub().callsArgWith(0, new Error('fake detach error')); diff --git a/device/transport/mqtt/devdoc/mqtt_requirements.md b/device/transport/mqtt/devdoc/mqtt_requirements.md index 6ded314bb..6336b233f 100644 --- a/device/transport/mqtt/devdoc/mqtt_requirements.md +++ b/device/transport/mqtt/devdoc/mqtt_requirements.md @@ -294,6 +294,8 @@ The `updateTwinReportedProperties` method is used to retrieve the device twin. **SRS_NODE_DEVICE_MQTT_16_052: [** `enableC2D` shall call its callback with an `Error` if subscribing to the topic fails. **]** +**SRS_NODE_DEVICE_MQTT_41_008: [** `enableC2D` shall not subscribe multiple times if already subscribed. **]** + ### enableMethods **SRS_NODE_DEVICE_MQTT_16_038: [** `enableMethods` shall connect the MQTT connection if it is disconnected. **]** @@ -306,6 +308,8 @@ The `updateTwinReportedProperties` method is used to retrieve the device twin. **SRS_NODE_DEVICE_MQTT_16_053: [** `enableMethods` shall call its callback with an `Error` if subscribing to the topic fails. **]** +**SRS_NODE_DEVICE_MQTT_41_009: [** `enableMethods` shall not subscribe multiple times if already subscribed. **]** + ### enableTwinDesiredPropertiesUpdates **SRS_NODE_DEVICE_MQTT_16_057: [** `enableTwinDesiredPropertiesUpdates` shall connect the MQTT connection if it is disconnected. **]** @@ -330,6 +334,8 @@ The `updateTwinReportedProperties` method is used to retrieve the device twin. **SRS_NODE_DEVICE_MQTT_18_063: [** `enableInputMessages` shall call its callback with an `Error` if subscribing to the topic fails. **]** +**SRS_NODE_DEVICE_MQTT_41_010: [** `enableInputMessages` shall not subscribe multiple times if already subscribed. **]** + ### disableC2D **SRS_NODE_DEVICE_MQTT_16_041: [** `disableC2D` shall call its callback immediately if the MQTT connection is already disconnected. **]** @@ -340,6 +346,8 @@ The `updateTwinReportedProperties` method is used to retrieve the device twin. **SRS_NODE_DEVICE_MQTT_16_043: [** `disableC2D` shall call its callback with an `Error` if an error is received while unsubscribing. **]** +**SRS_NODE_DEVICE_MQTT_41_011: [** `disableC2D` shall unsubscribe from the topic for C2D messages only if it is currently subscribed. **]** + ### disableMethods **SRS_NODE_DEVICE_MQTT_16_044: [** `disableMethods` shall call its callback immediately if the MQTT connection is already disconnected. **]** @@ -350,6 +358,8 @@ The `updateTwinReportedProperties` method is used to retrieve the device twin. **SRS_NODE_DEVICE_MQTT_16_046: [** `disableMethods` shall call its callback with an `Error` if an error is received while unsubscribing. **]** +**SRS_NODE_DEVICE_MQTT_41_012: [** `disableMethods` shall unsubscribe from the topic for direct methods only if it is currently subscribed. **]** + ### disableTwinDesiredPropertiesUpdates **SRS_NODE_DEVICE_MQTT_16_062: [** `disableTwinDesiredPropertiesUpdates` shall call its callback immediately if the MQTT connection is already disconnected. **]** @@ -370,6 +380,8 @@ The `updateTwinReportedProperties` method is used to retrieve the device twin. **SRS_NODE_DEVICE_MQTT_18_067: [** `disableInputMessages` shall call its callback with an `Error` if an error is received while unsubscribing. **]** +**SRS_NODE_DEVICE_MQTT_41_013: [** `disableInputMessages` shall unsubscribe from the topic for inputMessages only if it is currently subscribed. **]** + ### message Event **SRS_NODE_DEVICE_MQTT_RECEIVER_16_004: [** If there is a listener for the `message` event, a `message` event shall be emitted for each message received. **]** diff --git a/device/transport/mqtt/src/mqtt.ts b/device/transport/mqtt/src/mqtt.ts index 91c395a7b..1fb482c49 100644 --- a/device/transport/mqtt/src/mqtt.ts +++ b/device/transport/mqtt/src/mqtt.ts @@ -317,20 +317,59 @@ export class Mqtt extends EventEmitter implements DeviceTransport { callback(!!err ? translateError(err) : null); }); }, + /* Codes_SRS_NODE_DEVICE_MQTT_41_008: [`enableC2D` shall not subscribe multiple times if already subscribed.]*/ enableC2D: (callback) => { - this._setupSubscription(this._topics.message, 1, callback); + if (this._topics.message && this._topics.message.subscribed) { + debug('already subscribed to `message`, doing nothing...'); + callback(); + } else { + this._setupSubscription(this._topics.message, 1, callback); + } }, + /* Codes_SRS_NODE_DEVICE_MQTT_41_009: [`enableMethods` shall not subscribe multiple times if already subscribed.]*/ enableMethods: (callback) => { - this._setupSubscription(this._topics.method, 0, callback); + if (this._topics.method && this._topics.method.subscribed) { + debug('already subscribed to `method`, doing nothing...'); + callback(); + } else { + this._setupSubscription(this._topics.method, 0, callback); + } }, + /* Codes_SRS_NODE_DEVICE_MQTT_41_010: [`enableInputMessages` shall not subscribe multiple times if already subscribed.]*/ enableInputMessages: (callback) => { - this._setupSubscription(this._topics.inputMessage, 1, callback); + if (this._topics.inputMessage && this._topics.inputMessage.subscribed) { + debug('already subscribed to `inputMessages`, doing nothing...'); + callback(); + } else { + this._setupSubscription(this._topics.inputMessage, 1, callback); + } }, + /* Codes_SRS_NODE_DEVICE_MQTT_41_011: [`disableC2D` shall unsubscribe from the topic for C2D messages only if it is currently subscribed.]*/ disableC2D: (callback) => { - this._removeSubscription(this._topics.message, callback); + if (this._topics.message && this._topics.message.subscribed) { + this._removeSubscription(this._topics.message, callback); + } else { + debug('not subscribed to `message`, so doing nothing...'); + callback(); + } }, + /* Codes_SRS_NODE_DEVICE_MQTT_41_012: [`disableMethods` shall unsubscribe from the topic for direct methods only if it is currently subscribed.]*/ disableMethods: (callback) => { - this._removeSubscription(this._topics.method, callback); + if (this._topics.method && this._topics.method.subscribed) { + this._removeSubscription(this._topics.method, callback); + } else { + debug('not subscribed to `method`, so doing nothing...'); + callback(); + } + }, + /* Codes_SRS_NODE_DEVICE_MQTT_41_013: [`disableInputMessages` shall unsubscribe from the topic for inputMessages only if it is currently subscribed.]*/ + disableInputMessages: (callback) => { + if (this._topics.inputMessage && this._topics.inputMessage.subscribed) { + this._removeSubscription(this._topics.inputMessage, callback); + } else { + debug('not subscribed to `method`, so doing nothing...'); + callback(); + } }, /*Codes_SRS_NODE_DEVICE_MQTT_16_077: [`getTwin` shall call the `getTwin` method on the `MqttTwinClient` object and pass it its callback.]*/ getTwin: (callback) => this._twinClient.getTwin(callback), @@ -340,9 +379,6 @@ export class Mqtt extends EventEmitter implements DeviceTransport { enableTwinDesiredPropertiesUpdates: (callback) => this._twinClient.enableTwinDesiredPropertiesUpdates(callback), /*Codes_SRS_NODE_DEVICE_MQTT_16_083: [`disableTwinDesiredPropertiesUpdates` shall call the `disableTwinDesiredPropertiesUpdates` on the `MqttTwinClient` object created by the constructor and pass it its callback.]*/ disableTwinDesiredPropertiesUpdates: (callback) => this._twinClient.disableTwinDesiredPropertiesUpdates(callback), - disableInputMessages: (callback) => { - this._removeSubscription(this._topics.inputMessage, callback); - }, }, disconnecting: { _onEnter: (disconnectCallback, err) => { @@ -775,7 +811,7 @@ export class Mqtt extends EventEmitter implements DeviceTransport { /*Codes_SRS_NODE_DEVICE_MQTT_16_045: [`disableMethods` shall unsubscribe from the topic for direct methods.]*/ /*Codes_SRS_NODE_DEVICE_MQTT_18_065: [`disableInputMessages` shall unsubscribe from the topic for inputMessages. ]*/ this._mqtt.unsubscribe(topic.name, (err) => { - topic.subscribed = !err; + topic.subscribed = !!err; // this sets the topic.subscribed to false if the unsubscribe is successful. /*Codes_SRS_NODE_DEVICE_MQTT_16_054: [`disableC2D` shall call its callback with no arguments when the `UNSUBACK` packet is received.]*/ /*Codes_SRS_NODE_DEVICE_MQTT_16_055: [`disableMethods` shall call its callback with no arguments when the `UNSUBACK` packet is received.]*/ /*Codes_SRS_NODE_DEVICE_MQTT_18_066: [`disableInputMessages` shall call its callback with no arguments when the `UNSUBACK` packet is received. ]*/ @@ -971,7 +1007,6 @@ export class Mqtt extends EventEmitter implements DeviceTransport { }); } } - } diff --git a/device/transport/mqtt/test/_mqtt_test.js b/device/transport/mqtt/test/_mqtt_test.js index 9cb632c44..e992e1b3f 100644 --- a/device/transport/mqtt/test/_mqtt_test.js +++ b/device/transport/mqtt/test/_mqtt_test.js @@ -1021,6 +1021,23 @@ describe('Mqtt', function () { }); }); }); + + it('will not subscribe multiple times to the same topic', function (testCallback) { + const transport = new Mqtt(fakeAuthenticationProvider, fakeMqttBase); + transport[testConfig.methodName](function (err) { + assert.isTrue(fakeMqttBase.connect.calledOnce); + assert.isUndefined(err); + assert.equal(fakeMqttBase.subscribe.firstCall.args[0], testConfig.topicName); + assert.strictEqual(fakeMqttBase.subscribe.firstCall.args[1].qos, testConfig.qos); + transport[testConfig.methodName](function (err) { + /* Tests_SRS_NODE_DEVICE_MQTT_41_008: [`enableC2D` shall not subscribe multiple times if already subscribed.]*/ + /* Tests_SRS_NODE_DEVICE_MQTT_41_009: [`enableMethods` shall not subscribe multiple times if already subscribed.]*/ + /* Tests_SRS_NODE_DEVICE_MQTT_41_010: [`enableInputMessages` shall not subscribe multiple times if already subscribed.]*/ + assert(fakeMqttBase.subscribe.calledOnce, 'subscribe called multiple times when it should have been only called once'); + testCallback(); + }); + }); + }); }); }); @@ -1039,7 +1056,7 @@ describe('Mqtt', function () { /* Tests_SRS_NODE_DEVICE_MQTT_16_044: [`disableMethods` shall call its callback immediately if the MQTT connection is already disconnected.]*/ /* Tests_SRS_NODE_DEVICE_MQTT_16_062: [`disableTwinDesiredPropertiesUpdates` shall call its callback immediately if the MQTT connection is already disconnected.]*/ /* Tests_SRS_NODE_DEVICE_MQTT_18_064: [ `disableInputMessages` shall call its callback immediately if the MQTT connection is already disconnected. ]*/ - it('immediately calls its callback if the disconnected', function (testCallback) { + it('immediately calls its callback if the connection is already disconnected', function (testCallback) { const mqtt = new Mqtt(fakeAuthenticationProvider, fakeMqttBase); mqtt[testConfig.disableFeatureMethod](function () { assert.isTrue(fakeMqttBase.connect.notCalled); @@ -1089,6 +1106,25 @@ describe('Mqtt', function () { }); }); }); + + it('calls its callback without unsubscribing if already unsubscribed', function (testCallback) { + const transport = new Mqtt(fakeAuthenticationProvider, fakeMqttBase); + transport.connect(function () { + transport[testConfig.enableFeatureMethod](function () { + transport[testConfig.disableFeatureMethod](function (err) { + assert.isTrue(fakeMqttBase.unsubscribe.called); + assert.isUndefined(err); + transport[testConfig.disableFeatureMethod](function (err) { + /* Tests_SRS_NODE_DEVICE_MQTT_41_011: [`disableC2D` shall unsubscribe from the topic for C2D messages only if it is currently subscribed.]*/ + /* Tests_SRS_NODE_DEVICE_MQTT_41_012: [`disableMethods` shall unsubscribe from the topic for direct methods only if it is currently subscribed.]*/ + /* Tests_SRS_NODE_DEVICE_MQTT_41_013: [`disableInputMessages` shall unsubscribe from the topic for inputMessages only if it is currently subscribed.]*/ + assert.isTrue(fakeMqttBase.unsubscribe.calledOnce); + testCallback(); + }); + }); + }); + }); + }); }); }); diff --git a/e2etests/test/c2d_disconnect.js b/e2etests/test/c2d_disconnect.js index 75993324a..66a702c65 100644 --- a/e2etests/test/c2d_disconnect.js +++ b/e2etests/test/c2d_disconnect.js @@ -32,7 +32,8 @@ var protocolAndTermination = [ transport: deviceAmqp.Amqp, operationType: 'KillTcp', closeReason: ' severs the TCP connection ', - delayInSeconds: 2 + delayInSeconds: 2, + durationInSeconds: 20 }, { testEnabled: true, @@ -202,6 +203,7 @@ protocolAndTermination.forEach( function (testConfiguration) { terminateMessage.properties.add('AzIoTHub_FaultOperationType', testConfiguration.operationType); terminateMessage.properties.add('AzIoTHub_FaultOperationCloseReason', testConfiguration.closeReason); terminateMessage.properties.add('AzIoTHub_FaultOperationDelayInSecs', testConfiguration.delayInSeconds); + terminateMessage.properties.add('AzIoTHub_FaultOperationDurationInSecs', testConfiguration.durationInSeconds); deviceClient.sendEvent(terminateMessage, function (sendErr) { debug('at the callback for the fault injection send, err is:' + sendErr); });