Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(multipe packages): c2d states no longer using booleans #797

Merged
merged 17 commits into from
May 14, 2020
6 changes: 5 additions & 1 deletion common/transport/mqtt/devdoc/mqtt_base_requirements.md
Original file line number Diff line number Diff line change
Expand Up @@ -107,4 +107,8 @@ The `disconnect` method closes the connection to the server.

**SRS_NODE_COMMON_MQTT_BASE_16_035: [** The `updateSharedAccessSignature` method shall call the `callback` argument with no parameters if the operation succeeds. **]**

**SRS_NODE_COMMON_MQTT_BASE_16_036: [** The `updateSharedAccessSignature` method shall call the `callback` argument with an `Error` if the operation fails. **]**
**SRS_NODE_COMMON_MQTT_BASE_16_036: [** The `updateSharedAccessSignature` method shall call the `callback` argument with an `Error` if the operation fails. **]**

**SRS_NODE_COMMON_MQTT_BASE_41_002: [** The `updateSharedAccessSignature` method shall trigger a forced disconnect if after 30 seconds the mqtt client has failed to complete a non-forced disconnect. **]**

**SRS_NODE_COMMON_MQTT_BASE_41_003: [** The `updateSharedAccessSignature` method shall call the `callback` argument with an `Error` if the operation fails after timing out. **]**
2 changes: 2 additions & 0 deletions common/transport/mqtt/src/mqtt_base.ts
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,8 @@ export class MqttBase extends EventEmitter {
/*Codes_SRS_NODE_COMMON_MQTT_BASE_16_035: [The `updateSharedAccessSignature` method shall call the `callback` argument with no parameters if the operation succeeds.]*/
/*Codes_SRS_NODE_COMMON_MQTT_BASE_16_036: [The `updateSharedAccessSignature` method shall call the `callback` argument with an `Error` if the operation fails.]*/
let switched = false;
/*Codes_SRS_NODE_COMMON_MQTT_BASE_41_002: [The `updateSharedAccessSignature` method shall trigger a forced disconnect if after 30 seconds the mqtt client has failed to complete a non-forced disconnect.]*/
/*Codes_SRS_NODE_COMMON_MQTT_BASE_41_003: [The `updateSharedAccessSignature` method shall call the `callback` argument with an `Error` if the operation fails after timing out.]*/
const disconnectTimeout = setTimeout(() => {
debug('disconnecting mqtt client timed out. Force disconnecting.');
switched = true;
Expand Down
6 changes: 2 additions & 4 deletions common/transport/mqtt/test/_mqtt_base_test.js
Original file line number Diff line number Diff line change
Expand Up @@ -463,7 +463,6 @@ describe('MqttBase', function () {
fakeMqtt.emit('connect');
});

/*Tests_SRS_NODE_COMMON_MQTT_BASE_41_XXX: [The `updateSharedAccessSignature` method shall trigger a forced disconnect if after 30 seconds the mqtt client has failed to complete a non-forced disconnect.]*/
it('force disconnects the client on disconnect timeout during reconnecting', function (testCallback) {
this.clock = sinon.useFakeTimers();
var newSas = 'newsas';
Expand All @@ -476,6 +475,7 @@ describe('MqttBase', function () {
}).bind(this);

fakeMqtt.end.onSecondCall().callsFake((force, callback) => {
/*Tests_SRS_NODE_COMMON_MQTT_BASE_41_002: [The `updateSharedAccessSignature` method shall trigger a forced disconnect if after 30 seconds the mqtt client has failed to complete a non-forced disconnect.]*/
assert.isTrue(force);
callback();
}).bind(this);
Expand All @@ -484,7 +484,6 @@ describe('MqttBase', function () {
assert.isTrue(fakeMqtt.connect.calledOnce);
assert.strictEqual(fakeMqtt.connect.firstCall.args[1].password, fakeConfig.sharedAccessSignature);
transport.updateSharedAccessSignature(newSas, function (err) {
/*Tests_SRS_NODE_COMMON_MQTT_BASE_16_035: [The `updateSharedAccessSignature` method shall call the `callback` argument with no parameters if the operation succeeds.]*/
assert.notExists(err);
assert.isTrue(fakeMqtt.end.calledTwice);
assert.isTrue(fakeMqtt.connect.calledTwice);
Expand Down Expand Up @@ -516,7 +515,7 @@ describe('MqttBase', function () {
fakeMqtt.emit('connect');
});

/*Tests_SRS_NODE_COMMON_MQTT_BASE_41_XXX: [The `updateSharedAccessSignature` method shall trigger a forced disconnect if after 30 seconds the mqtt client has failed to complete a non-forced disconnect.]*/
/*Tests_SRS_NODE_COMMON_MQTT_BASE_41_003: [The `updateSharedAccessSignature` method shall call the `callback` argument with an `Error` if the operation fails after timing out.]*/
it('calls the callback with an error if it fails to reconnect the mqtt client (force disconnect)', function (testCallback) {
this.clock = sinon.useFakeTimers();
var fakeError = new Error('fake failed to reconnect');
Expand All @@ -532,7 +531,6 @@ describe('MqttBase', function () {
var transport = new MqttBase(fakeMqtt);
transport.connect(fakeConfig, function () {
transport.updateSharedAccessSignature('newSas', function (err) {
/*Tests_SRS_NODE_COMMON_MQTT_BASE_16_035: [The `updateSharedAccessSignature` method shall call the `callback` argument with no parameters if the operation succeeds.]*/
assert.isTrue(fakeMqtt.end.calledTwice);
assert.isTrue(fakeMqtt.connect.calledTwice);
assert.strictEqual(err, fakeError);
Expand Down
71 changes: 29 additions & 42 deletions device/core/src/device_client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ function safeCallback(callback?: (err?: Error, result?: any) => void, error?: Er
* to create an IoT Hub device client.
*/
export class Client extends InternalClient {
private _c2dEnabled: boolean;
private _c2dFeature: boolean;
private _deviceDisconnectHandler: (err?: Error, result?: any) => void;
private _blobUploadClient: BlobUploadClient;
private _fileUploadApi: FileUploadInterface;
Expand All @@ -45,7 +45,7 @@ export class Client extends InternalClient {
constructor(transport: DeviceTransport, connStr?: string, blobUploadClient?: BlobUploadClient, fileUploadApi?: FileUploadInterface) {
super(transport, connStr);
this._blobUploadClient = blobUploadClient;
this._c2dEnabled = false;
this._c2dFeature = false;
this._fileUploadApi = fileUploadApi;

this.on('removeListener', (eventName) => {
Expand All @@ -57,6 +57,7 @@ export class Client extends InternalClient {
debug('in removeListener, error disabling C2D.');
this.emit('error', err);
} else {
this._c2dFeature = false;
debug('removeListener successfully disabled C2D.');
}
});
Expand All @@ -72,6 +73,7 @@ export class Client extends InternalClient {
debug('in newListener, error enabling C2D.');
this.emit('error', err);
} else {
this._c2dFeature = true;
debug('in newListener, successfully enabled C2D');
}
});
Expand All @@ -89,10 +91,9 @@ export class Client extends InternalClient {
if (err && this._retryPolicy.shouldRetry(err)) {
debug('reconnect policy specifies a reconnect on error');
/*Codes_SRS_NODE_DEVICE_CLIENT_16_097: [If the transport emits a `disconnect` event while the client is subscribed to c2d messages the retry policy shall be used to reconnect and re-enable the feature using the transport `enableC2D` method.]*/
debug('_c2dEnabled is: ' + this._c2dEnabled);
if (this._c2dEnabled) {
this._c2dEnabled = false;
debug('re-enabling C2D link.');
if (this._c2dFeature) {
// turn on C2D
debug('disconnectHandler re-enabling C2D');
this._enableC2D((err) => {
if (err) {
/*Codes_SRS_NODE_DEVICE_CLIENT_16_102: [If the retry policy fails to reestablish the C2D functionality a `disconnect` event shall be emitted with a `results.Disconnected` object.]*/
Expand All @@ -103,7 +104,7 @@ export class Client extends InternalClient {
}
});
} else {
debug('During _deviceDisconnectHandler, _c2dEnabled is false');
debug('C2D has not been enabled on the device');
}
}
};
Expand Down Expand Up @@ -251,44 +252,30 @@ export class Client extends InternalClient {


private _enableC2D(callback: (err?: Error) => void): void {
debug('_c2dEnabled is: ' + this._c2dEnabled);
if (!this._c2dEnabled) {
debug('enabling C2D');
const retryOp = new RetryOperation(this._retryPolicy, this._maxOperationTimeout);
retryOp.retry((opCallback) => {
this._transport.enableC2D(opCallback);
}, (err) => {
if (!err) {
debug('enabled C2D. Setting this._c2dEnabled to true.');
this._c2dEnabled = true;
} else {
debug('Error while enabling C2D.');
}
callback(err);
});
} else {
debug('this._c2dEnable is true. Not enabling C2D.');
callback();
}
debug('enabling C2D');
const retryOp = new RetryOperation(this._retryPolicy, this._maxOperationTimeout);
retryOp.retry((opCallback) => {
this._transport.enableC2D(opCallback);
}, (err) => {
if (!err) {
debug('enabled C2D');
} else {
debug('Error while enabling C2D.');
}
callback(err);
});
}

private _disableC2D(callback: (err?: Error) => void): void {
debug('_c2dEnabled is: ' + this._c2dEnabled);
if (this._c2dEnabled) {
debug('disabling C2D');
this._transport.disableC2D((err) => {
if (!err) {
debug('disabled C2D. Setting this._c2dEnabled to false.');
this._c2dEnabled = false;
} else {
debug('Error while disabling C2D.');
}
callback(err);
});
} else {
debug('this._c2dEnable is false. Not disabling C2D.');
callback();
}
debug('disabling C2D');
this._transport.disableC2D((err) => {
if (!err) {
debug('disabled C2D');
} else {
debug('Error while disabling C2D.');
}
callback(err);
});
}

/**
Expand Down
9 changes: 5 additions & 4 deletions device/core/test/_device_client_test.js
Original file line number Diff line number Diff line change
Expand Up @@ -324,11 +324,12 @@ describe('Device Client', function () {
sinon.spy(fakeTransport, 'enableC2D');
var client = new Client(fakeTransport);

// Calling 'on' twice to make sure it's called only once on the receiver.
// It works because the test will fail if the test callback is called multiple times, and it's called for every time the 'message' event is subscribed on the receiver.
// Calling 'on' thrice to make sure it's called each time on the receiver.
// It should work because enableC2D does not check that it has not been called once before, instead leaving that job up to the transport level.
client.on('message', function () { });
client.on('message', function () { });
assert.isTrue(fakeTransport.enableC2D.calledOnce);
client.on('message', function () { });
assert.isTrue(fakeTransport.enableC2D.calledThrice, 'client called enabledC2D the incorrect number of times');
});

/*Tests_SRS_NODE_DEVICE_CLIENT_16_005: [The client shall stop listening for messages from the service whenever the last listener unsubscribes from the ‘message’ event.]*/
Expand All @@ -348,7 +349,7 @@ describe('Device Client', function () {
client.removeListener('message', listener1);
assert.isTrue(fakeTransport.disableC2D.notCalled);
client.removeListener('message', listener2);
assert(fakeTransport.disableC2D.calledOnce);
assert.strictEqual(fakeTransport.disableC2D.callCount, 1, 'disableC2D not called once');
testCallback();
});
});
Expand Down
7 changes: 6 additions & 1 deletion device/transport/amqp/devdoc/device_amqp_requirements.md
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ This method is deprecated. The `AmqpReceiver` object and pattern is going away a

**SRS_NODE_DEVICE_AMQP_06_013: [** The authentication providers `setTokenRenewalValues` method shall be invoked with the values provided in the tokenRenewal option.
**]**

**SRS_NODE_DEVICE_AMQP_13_001: [** The `setOptions` method shall save the options passed in. **]**


Expand Down Expand Up @@ -202,6 +202,8 @@ This method is deprecated. The `AmqpReceiver` object and pattern is going away a

**SRS_NODE_DEVICE_AMQP_16_033: [** The `enableC2D` method shall call its `callback` with an `Error` if the transport fails to connect, authenticate or attach link. **]**

**SRS_NODE_DEVICE_AMQP_41_003: [** The `enableC2D` method shall attach the C2D link only if it is not already attached. **]**

**SRS_NODE_DEVICE_AMQP_16_034: [** Any `error` event received on the C2D link shall trigger the emission of an `error` event by the transport, with an argument that is a `CloudToDeviceDetachedError` object with the `innerError` property set to that error. **]**

### disableC2D(callback)
Expand All @@ -212,6 +214,9 @@ This method is deprecated. The `AmqpReceiver` object and pattern is going away a

**SRS_NODE_DEVICE_AMQP_16_037: [** The `disableC2D` method shall call its `callback` immediately if the transport is already disconnected. **]**

**SRS_NODE_DEVICE_AMQP_41_004: [** The `disableC2D` method shall detach the C2D link only if it is already attached. **]**


### enableInputMessages(callback: (err?: Error) => void): void;

**SRS_NODE_DEVICE_AMQP_18_010: [** The `enableInputMessages` method shall enable C2D messages **]**
Expand Down
50 changes: 32 additions & 18 deletions device/transport/amqp/src/amqp.ts
Original file line number Diff line number Diff line change
Expand Up @@ -414,33 +414,47 @@ export class Amqp extends EventEmitter implements DeviceTransport {
/*Codes_SRS_NODE_DEVICE_AMQP_16_079: [The `disableTwinDesiredPropertiesUpdates` method shall call its callback no arguments if the call to `AmqpTwinClient.disableTwinDesiredPropertiesUpdates` succeeds.]*/
disableTwinDesiredPropertiesUpdates: (callback) => this._twinClient.disableTwinDesiredPropertiesUpdates(handleResult('could not disable twin desired properties updates', callback)),
enableC2D: (callback) => {
debug('attaching C2D link');
this._amqp.attachReceiverLink(this._c2dEndpoint, null, (err, receiverLink) => {
if (err) {
debug('error creating a C2D link: ' + err.toString());
/*Codes_SRS_NODE_DEVICE_AMQP_16_033: [The `enableC2D` method shall call its `callback` with an `Error` if the transport fails to connect, authenticate or attach link.]*/
handleResult('AMQP Transport: Could not attach link', callback)(err);
} else {
/*Codes_SRS_NODE_DEVICE_AMQP_16_032: [The `enableC2D` method shall attach the C2D link and call its `callback` once it is successfully attached.]*/
debug('C2D link created and attached successfully');
this._c2dLink = receiverLink;
this._c2dLink.on('error', this._c2dErrorListener);
this._c2dLink.on('message', this._c2dMessageListener);
callback();
}
});
/*Codes_SRS_NODE_DEVICE_AMQP_41_003: [The `enableC2D` method shall attach the C2D link only if it is not already attached.] */
if (!this._c2dLink) {
debug('attaching C2D link');
this._amqp.attachReceiverLink(this._c2dEndpoint, null, (err, receiverLink) => {
if (err) {
debug('error creating a C2D link: ' + err.toString());
/*Codes_SRS_NODE_DEVICE_AMQP_16_033: [The `enableC2D` method shall call its `callback` with an `Error` if the transport fails to connect, authenticate or attach link.]*/
handleResult('AMQP Transport: Could not attach link', callback)(err);
} else {
/*Codes_SRS_NODE_DEVICE_AMQP_16_032: [The `enableC2D` method shall attach the C2D link and call its `callback` once it is successfully attached.]*/
debug('C2D link created and attached successfully');
this._c2dLink = receiverLink;
this._c2dLink.on('error', this._c2dErrorListener);
this._c2dLink.on('message', this._c2dMessageListener);
callback();
}
});
} else {
debug('C2D link already attached, doing nothing....');
callback();
}
},
disableC2D: (callback) => {
/*Codes_SRS_NODE_DEVICE_AMQP_16_035: [The `disableC2D` method shall call `detach` on the C2D link and call its callback when it is successfully detached.]*/
/*Codes_SRS_NODE_DEVICE_AMQP_16_036: [The `disableC2D` method shall call its `callback` with an `Error` if it fails to detach the C2D link.]*/
this._stopC2DListener(undefined, callback);
/*Codes_SRS_NODE_DEVICE_AMQP_41_004: [The `disableC2D` method shall detach the C2D link only if it is already attached.] */
if (this._c2dLink) {
/*Codes_SRS_NODE_DEVICE_AMQP_16_035: [The `disableC2D` method shall call `detach` on the C2D link and call its callback when it is successfully detached.]*/
/*Codes_SRS_NODE_DEVICE_AMQP_16_036: [The `disableC2D` method shall call its `callback` with an `Error` if it fails to detach the C2D link.]*/
this._stopC2DListener(undefined, callback);
} else {
debug('C2D link already detached, doing nothing...');
callback();
}
},
enableMethods: (callback) => {
// deviceMethodClient already checks if the link is attached or not, so we do not need to check that here like we do with C2D
/*Codes_SRS_NODE_DEVICE_AMQP_16_039: [The `enableMethods` method shall attach the method links and call its `callback` once these are successfully attached.]*/
/*Codes_SRS_NODE_DEVICE_AMQP_16_040: [The `enableMethods` method shall call its `callback` with an `Error` if the transport fails to connect, authenticate or attach method links.]*/
this._deviceMethodClient.attach(callback);
},
disableMethods: (callback) => {
// deviceMethodClient already checks if the link is attached or not, so we do not need to check that here like we do with C2D
/*Codes_SRS_NODE_DEVICE_AMQP_16_042: [The `disableMethods` method shall call `detach` on the device method links and call its callback when these are successfully detached.]*/
/*Codes_SRS_NODE_DEVICE_AMQP_16_043: [The `disableMethods` method shall call its `callback` with an `Error` if it fails to detach the device method links.]*/
this._deviceMethodClient.detach(callback);
Expand Down
Loading