Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(multipe packages): c2d states no longer using booleans #797

Merged
merged 17 commits into from
May 14, 2020
31 changes: 14 additions & 17 deletions device/core/src/device_client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ function safeCallback(callback?: (err?: Error, result?: any) => void, error?: Er
* to create an IoT Hub device client.
*/
export class Client extends InternalClient {
private _c2dEnabled: boolean;
private _c2dFeature: boolean;
private _deviceDisconnectHandler: (err?: Error, result?: any) => void;
private _blobUploadClient: BlobUploadClient;
private _fileUploadApi: FileUploadInterface;
Expand All @@ -45,13 +45,14 @@ export class Client extends InternalClient {
constructor(transport: DeviceTransport, connStr?: string, blobUploadClient?: BlobUploadClient, fileUploadApi?: FileUploadInterface) {
super(transport, connStr);
this._blobUploadClient = blobUploadClient;
this._c2dEnabled = false;
this._c2dFeature = false;
this._fileUploadApi = fileUploadApi;

this.on('removeListener', (eventName) => {
if (eventName === 'message' && this.listeners('message').length === 0) {
/*Codes_SRS_NODE_DEVICE_CLIENT_16_005: [The client shall stop listening for messages from the service whenever the last listener unsubscribes from the `message` event.]*/
debug('in removeListener, disabling C2D.');
this._c2dFeature = false;
this._disableC2D((err) => {
if (err) {
debug('in removeListener, error disabling C2D.');
Expand All @@ -67,6 +68,7 @@ export class Client extends InternalClient {
if (eventName === 'message') {
/*Codes_SRS_NODE_DEVICE_CLIENT_16_004: [The client shall start listening for messages from the service whenever there is a listener subscribed to the `message` event.]*/
debug('in newListener, enabling C2D.');
this._c2dFeature = true;
this._enableC2D((err) => {
if (err) {
debug('in newListener, error enabling C2D.');
Expand All @@ -89,10 +91,9 @@ export class Client extends InternalClient {
if (err && this._retryPolicy.shouldRetry(err)) {
debug('reconnect policy specifies a reconnect on error');
/*Codes_SRS_NODE_DEVICE_CLIENT_16_097: [If the transport emits a `disconnect` event while the client is subscribed to c2d messages the retry policy shall be used to reconnect and re-enable the feature using the transport `enableC2D` method.]*/
debug('_c2dEnabled is: ' + this._c2dEnabled);
if (this._c2dEnabled) {
this._c2dEnabled = false;
debug('re-enabling C2D link.');
if (this._c2dFeature) {
// turn on C2D
debug('disconnectHandler re-enabling C2D');
this._enableC2D((err) => {
if (err) {
/*Codes_SRS_NODE_DEVICE_CLIENT_16_102: [If the retry policy fails to reestablish the C2D functionality a `disconnect` event shall be emitted with a `results.Disconnected` object.]*/
Expand All @@ -103,7 +104,7 @@ export class Client extends InternalClient {
}
});
} else {
debug('During _deviceDisconnectHandler, _c2dEnabled is false');
debug('C2D has not been enabled on the device');
}
}
};
Expand Down Expand Up @@ -251,42 +252,38 @@ export class Client extends InternalClient {


private _enableC2D(callback: (err?: Error) => void): void {
debug('_c2dEnabled is: ' + this._c2dEnabled);
if (!this._c2dEnabled) {
if (this._c2dFeature) {
debug('enabling C2D');
const retryOp = new RetryOperation(this._retryPolicy, this._maxOperationTimeout);
retryOp.retry((opCallback) => {
this._transport.enableC2D(opCallback);
}, (err) => {
if (!err) {
debug('enabled C2D. Setting this._c2dEnabled to true.');
this._c2dEnabled = true;
debug('enabled C2D');
} else {
debug('Error while enabling C2D.');
}
callback(err);
});
} else {
debug('this._c2dEnable is true. Not enabling C2D.');
debug('C2D is not enabled as a feature, so not enabling on device');
callback();
}
}

private _disableC2D(callback: (err?: Error) => void): void {
debug('_c2dEnabled is: ' + this._c2dEnabled);
if (this._c2dEnabled) {
if (this._c2dFeature) {
debug('disabling C2D');
this._transport.disableC2D((err) => {
if (!err) {
debug('disabled C2D. Setting this._c2dEnabled to false.');
this._c2dEnabled = false;
debug('disabled C2D');
} else {
debug('Error while disabling C2D.');
}
callback(err);
});
} else {
debug('this._c2dEnable is false. Not disabling C2D.');
debug('C2D is not enabled as a feature, so there is nothing to do here');
callback();
}
}
Expand Down
57 changes: 39 additions & 18 deletions device/transport/amqp/src/amqp.ts
Original file line number Diff line number Diff line change
Expand Up @@ -414,26 +414,47 @@ export class Amqp extends EventEmitter implements DeviceTransport {
/*Codes_SRS_NODE_DEVICE_AMQP_16_079: [The `disableTwinDesiredPropertiesUpdates` method shall call its callback no arguments if the call to `AmqpTwinClient.disableTwinDesiredPropertiesUpdates` succeeds.]*/
disableTwinDesiredPropertiesUpdates: (callback) => this._twinClient.disableTwinDesiredPropertiesUpdates(handleResult('could not disable twin desired properties updates', callback)),
enableC2D: (callback) => {
debug('attaching C2D link');
this._amqp.attachReceiverLink(this._c2dEndpoint, null, (err, receiverLink) => {
if (err) {
debug('error creating a C2D link: ' + err.toString());
/*Codes_SRS_NODE_DEVICE_AMQP_16_033: [The `enableC2D` method shall call its `callback` with an `Error` if the transport fails to connect, authenticate or attach link.]*/
handleResult('AMQP Transport: Could not attach link', callback)(err);
} else {
/*Codes_SRS_NODE_DEVICE_AMQP_16_032: [The `enableC2D` method shall attach the C2D link and call its `callback` once it is successfully attached.]*/
debug('C2D link created and attached successfully');
this._c2dLink = receiverLink;
this._c2dLink.on('error', this._c2dErrorListener);
this._c2dLink.on('message', this._c2dMessageListener);
callback();
}
});

// let listenerCounts = [0, 0];
// if (this._c2dLink) {
// listenerCounts[0] = this._c2dLink.listenerCount('message');
// listenerCounts[1] = this._c2dLink.listenerCount('error');
// }
// if (listenerCounts[0] === 1 && listenerCounts[1] === 1) {
// callback(true);
// } else {
// callback(false);
// }
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure what is going on with the commented code.

if (!this._c2dLink) {
debug('attaching C2D link');
this._amqp.attachReceiverLink(this._c2dEndpoint, null, (err, receiverLink) => {
if (err) {
debug('error creating a C2D link: ' + err.toString());
/*Codes_SRS_NODE_DEVICE_AMQP_16_033: [The `enableC2D` method shall call its `callback` with an `Error` if the transport fails to connect, authenticate or attach link.]*/
handleResult('AMQP Transport: Could not attach link', callback)(err);
} else {
/*Codes_SRS_NODE_DEVICE_AMQP_16_032: [The `enableC2D` method shall attach the C2D link and call its `callback` once it is successfully attached.]*/
debug('C2D link created and attached successfully');
this._c2dLink = receiverLink;
this._c2dLink.on('error', this._c2dErrorListener);
this._c2dLink.on('message', this._c2dMessageListener);
callback();
}
});
} else {
debug('C2D link already attached, doing nothing....');
callback();
}
},
disableC2D: (callback) => {
/*Codes_SRS_NODE_DEVICE_AMQP_16_035: [The `disableC2D` method shall call `detach` on the C2D link and call its callback when it is successfully detached.]*/
/*Codes_SRS_NODE_DEVICE_AMQP_16_036: [The `disableC2D` method shall call its `callback` with an `Error` if it fails to detach the C2D link.]*/
this._stopC2DListener(undefined, callback);
if (this._c2dLink) {
/*Codes_SRS_NODE_DEVICE_AMQP_16_035: [The `disableC2D` method shall call `detach` on the C2D link and call its callback when it is successfully detached.]*/
/*Codes_SRS_NODE_DEVICE_AMQP_16_036: [The `disableC2D` method shall call its `callback` with an `Error` if it fails to detach the C2D link.]*/
this._stopC2DListener(undefined, callback);
} else {
debug('C2D link already detached, doing nothing...');
callback();
}
},
enableMethods: (callback) => {
/*Codes_SRS_NODE_DEVICE_AMQP_16_039: [The `enableMethods` method shall attach the method links and call its `callback` once these are successfully attached.]*/
Expand Down
17 changes: 14 additions & 3 deletions device/transport/mqtt/src/mqtt.ts
Original file line number Diff line number Diff line change
Expand Up @@ -318,7 +318,13 @@ export class Mqtt extends EventEmitter implements DeviceTransport {
});
},
enableC2D: (callback) => {
this._setupSubscription(this._topics.message, 1, callback);
if (this._topics.message.subscribed) {
debug('C2D already enabled, doing nothing...');
callback();
} else {
debug('setting up C2D subscription');
this._setupSubscription(this._topics.message, 1, callback);
}
},
enableMethods: (callback) => {
this._setupSubscription(this._topics.method, 0, callback);
Expand All @@ -327,7 +333,13 @@ export class Mqtt extends EventEmitter implements DeviceTransport {
this._setupSubscription(this._topics.inputMessage, 1, callback);
},
disableC2D: (callback) => {
this._removeSubscription(this._topics.message, callback);
if (this._topics.message.subscribed) {
debug('removing C2D subscription');
this._removeSubscription(this._topics.message, callback);
} else {
debug('C2D already disabled, doing nothing...');
callback();
}
},
disableMethods: (callback) => {
this._removeSubscription(this._topics.method, callback);
Expand Down Expand Up @@ -971,7 +983,6 @@ export class Mqtt extends EventEmitter implements DeviceTransport {
});
}
}

}


Expand Down
4 changes: 3 additions & 1 deletion e2etests/test/c2d_disconnect.js
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@ var protocolAndTermination = [
transport: deviceAmqp.Amqp,
operationType: 'KillTcp',
closeReason: ' severs the TCP connection ',
delayInSeconds: 2
delayInSeconds: 2,
durationInSeconds: 20
},
{
testEnabled: true,
Expand Down Expand Up @@ -202,6 +203,7 @@ protocolAndTermination.forEach( function (testConfiguration) {
terminateMessage.properties.add('AzIoTHub_FaultOperationType', testConfiguration.operationType);
terminateMessage.properties.add('AzIoTHub_FaultOperationCloseReason', testConfiguration.closeReason);
terminateMessage.properties.add('AzIoTHub_FaultOperationDelayInSecs', testConfiguration.delayInSeconds);
terminateMessage.properties.add('AzIoTHub_FaultOperationDurationInSecs', testConfiguration.durationInSeconds);
deviceClient.sendEvent(terminateMessage, function (sendErr) {
debug('at the callback for the fault injection send, err is:' + sendErr);
});
Expand Down