From 72dca89bd385612e711623a6156eda33bd75bee8 Mon Sep 17 00:00:00 2001 From: timtay-microsoft Date: Fri, 18 Jun 2021 11:25:04 -0700 Subject: [PATCH] feat(provisioning-device, prov-amqp, prov-mqtt, prov-https): Add support for timespan timeouts to provisioning device client As discussed in #2036, AMQP provisioning device clients have no way to configure the timeout given to the AMQP library for operations like opening links. This adds overloads to the existing provisioning device client's registerAsync methods that allow for users to configure this timespan timeout. --- .../device/src/ProvisioningDeviceClient.cs | 56 +++++++++++--- .../src/ProvisioningTransportHandler.cs | 13 ++++ .../src/ProvisioningTransportHandlerAmqp.cs | 74 ++++++++++++++----- .../src/ProvisioningTransportHandlerHttp.cs | 14 ++++ .../src/ProvisioningTransportHandlerMqtt.cs | 14 ++++ 5 files changed, 139 insertions(+), 32 deletions(-) diff --git a/provisioning/device/src/ProvisioningDeviceClient.cs b/provisioning/device/src/ProvisioningDeviceClient.cs index 3c7fbb3f66..68528e5dcd 100644 --- a/provisioning/device/src/ProvisioningDeviceClient.cs +++ b/provisioning/device/src/ProvisioningDeviceClient.cs @@ -3,6 +3,7 @@ using Microsoft.Azure.Devices.Provisioning.Client.Transport; using Microsoft.Azure.Devices.Shared; +using System; using System.Threading; using System.Threading.Tasks; @@ -63,45 +64,75 @@ public class ProvisioningDeviceClient /// /// Registers the current device using the Device Provisioning Service and assigns it to an IoT Hub. /// + /// The maximum amount of time to allow this operation to run for before timing out. + /// + /// Due to the AMQP library used by this library uses not accepting cancellation tokens, this overload and + /// are the only overloads for this method that allow for a specified timeout to be respected in the middle of an AMQP operation such as opening + /// the AMQP connection. MQTT and HTTPS connections do not share that same limitation, though. + /// /// The registration result. - public Task RegisterAsync() + public Task RegisterAsync(TimeSpan timeout) { - return RegisterAsync(CancellationToken.None); + Logging.RegisterAsync(this, _globalDeviceEndpoint, _idScope, _transport, _security); + + return RegisterAsync(null, timeout); } /// /// Registers the current device using the Device Provisioning Service and assigns it to an IoT Hub. /// /// The optional additional data. + /// The maximum amount of time to allow this operation to run for before timing out. + /// + /// Due to the AMQP library used by this library uses not accepting cancellation tokens, this overload and + /// are the only overloads for this method that allow for a specified timeout to be respected in the middle of an AMQP operation such as opening + /// the AMQP connection. MQTT and HTTPS connections do not share that same limitation, though. + /// /// The registration result. - public Task RegisterAsync(ProvisioningRegistrationAdditionalData data) + public Task RegisterAsync(ProvisioningRegistrationAdditionalData data, TimeSpan timeout) { - return RegisterAsync(data, CancellationToken.None); + Logging.RegisterAsync(this, _globalDeviceEndpoint, _idScope, _transport, _security); + + var request = new ProvisioningTransportRegisterMessage(_globalDeviceEndpoint, _idScope, _security, data?.JsonData) + { + ProductInfo = ProductInfo, + }; + + return _transport.RegisterAsync(request, timeout); } /// /// Registers the current device using the Device Provisioning Service and assigns it to an IoT Hub. /// /// The cancellation token. + /// + /// Due to the AMQP library used by this library uses not accepting cancellation tokens, the provided cancellation token will only be checked + /// for cancellation in between AMQP operations, and not during. In order to have a timeout for this operation that is checked during AMQP operations + /// (such as opening the connection), you must use instead. MQTT and HTTPS connections do not have the same + /// behavior as AMQP connections in this regard. MQTT and HTTPS connections will check this cancellation token for cancellation during their protocol level operations. + /// /// The registration result. - public Task RegisterAsync(CancellationToken cancellationToken) + public Task RegisterAsync(CancellationToken cancellationToken = default) { Logging.RegisterAsync(this, _globalDeviceEndpoint, _idScope, _transport, _security); - var request = new ProvisioningTransportRegisterMessage(_globalDeviceEndpoint, _idScope, _security) - { - ProductInfo = ProductInfo - }; - return _transport.RegisterAsync(request, cancellationToken); + return RegisterAsync(null, cancellationToken); } /// /// Registers the current device using the Device Provisioning Service and assigns it to an IoT Hub. /// - /// The custom content. + /// The optional additional data. /// The cancellation token. + /// + /// Due to the AMQP library used by this library uses not accepting cancellation tokens, the provided cancellation token will only be checked + /// for cancellation in between AMQP operations, and not during. In order to have a timeout for this operation that is checked during AMQP operations + /// (such as opening the connection), you must use this overload instead. + /// MQTT and HTTPS connections do not have the same behavior as AMQP connections in this regard. MQTT and HTTPS connections will check this cancellation + /// token for cancellation during their protocol level operations. + /// /// The registration result. - public Task RegisterAsync(ProvisioningRegistrationAdditionalData data, CancellationToken cancellationToken) + public Task RegisterAsync(ProvisioningRegistrationAdditionalData data, CancellationToken cancellationToken = default) { Logging.RegisterAsync(this, _globalDeviceEndpoint, _idScope, _transport, _security); @@ -109,6 +140,7 @@ public Task RegisterAsync(ProvisioningRegistrationAddi { ProductInfo = ProductInfo, }; + return _transport.RegisterAsync(request, cancellationToken); } } diff --git a/provisioning/device/src/ProvisioningTransportHandler.cs b/provisioning/device/src/ProvisioningTransportHandler.cs index 8208932cad..1ddc085cfb 100644 --- a/provisioning/device/src/ProvisioningTransportHandler.cs +++ b/provisioning/device/src/ProvisioningTransportHandler.cs @@ -85,6 +85,19 @@ public int Port return _innerHandler.RegisterAsync(message, cancellationToken); } + /// + /// Registers a device described by the message. + /// + /// The provisioning message. + /// The maximum amount of time to allow this operation to run for before timing out. + /// The registration result. + public virtual Task RegisterAsync( + ProvisioningTransportRegisterMessage message, + TimeSpan timeout) + { + return _innerHandler.RegisterAsync(message, timeout); + } + /// /// Releases the unmanaged resources and disposes of the managed resources used by the invoker. /// diff --git a/provisioning/transport/amqp/src/ProvisioningTransportHandlerAmqp.cs b/provisioning/transport/amqp/src/ProvisioningTransportHandlerAmqp.cs index 740d4265a2..c55025c7d7 100644 --- a/provisioning/transport/amqp/src/ProvisioningTransportHandlerAmqp.cs +++ b/provisioning/transport/amqp/src/ProvisioningTransportHandlerAmqp.cs @@ -22,7 +22,7 @@ namespace Microsoft.Azure.Devices.Provisioning.Client.Transport /// public class ProvisioningTransportHandlerAmqp : ProvisioningTransportHandler { - private static readonly TimeSpan s_defaultOperationPoolingInterval = TimeSpan.FromSeconds(2); + private static readonly TimeSpan s_defaultOperationPollingInterval = TimeSpan.FromSeconds(2); private static readonly TimeSpan s_timeoutConstant = TimeSpan.FromMinutes(1); /// @@ -43,6 +43,19 @@ public class ProvisioningTransportHandlerAmqp : ProvisioningTransportHandler Proxy = DefaultWebProxySettings.Instance; } + /// + /// Registers a device described by the message. + /// + /// The provisioning message. + /// The maximum amount of time to allow this operation to run for before timing out. + /// The registration result. + public override async Task RegisterAsync( + ProvisioningTransportRegisterMessage message, + TimeSpan timeout) + { + return await RegisterAsync(message, timeout, CancellationToken.None).ConfigureAwait(false); + } + /// /// Registers a device described by the message. /// @@ -52,6 +65,22 @@ public class ProvisioningTransportHandlerAmqp : ProvisioningTransportHandler public override async Task RegisterAsync( ProvisioningTransportRegisterMessage message, CancellationToken cancellationToken) + { + return await RegisterAsync(message, s_timeoutConstant, cancellationToken).ConfigureAwait(false); + } + + /// + /// Registers a device described by the message. Because the AMQP library does not accept cancellation tokens, the provided cancellation token + /// will only be checked for cancellation between AMQP operations. The timeout will be respected during the AMQP operations. + /// + /// The provisioning message. + /// The maximum amount of time to allow this operation to run for before timing out. + /// The cancellation token. + /// The registration result. + private async Task RegisterAsync( + ProvisioningTransportRegisterMessage message, + TimeSpan timeout, + CancellationToken cancellationToken) { if (Logging.IsEnabled) { @@ -106,19 +135,21 @@ public class ProvisioningTransportHandlerAmqp : ProvisioningTransportHandler string linkEndpoint = $"{message.IdScope}/registrations/{registrationId}"; using AmqpClientConnection connection = authStrategy.CreateConnection(builder.Uri, message.IdScope); - await authStrategy.OpenConnectionAsync(connection, s_timeoutConstant, useWebSocket, Proxy, RemoteCertificateValidationCallback).ConfigureAwait(false); + await authStrategy.OpenConnectionAsync(connection, timeout, useWebSocket, Proxy, RemoteCertificateValidationCallback).ConfigureAwait(false); cancellationToken.ThrowIfCancellationRequested(); await CreateLinksAsync( connection, linkEndpoint, - message.ProductInfo).ConfigureAwait(false); + message.ProductInfo, + timeout).ConfigureAwait(false); + cancellationToken.ThrowIfCancellationRequested(); string correlationId = Guid.NewGuid().ToString(); DeviceRegistration deviceRegistration = (message.Payload != null && message.Payload.Length > 0) ? new DeviceRegistration { Payload = new JRaw(message.Payload) } : null; - RegistrationOperationStatus operation = await RegisterDeviceAsync(connection, correlationId, deviceRegistration).ConfigureAwait(false); + RegistrationOperationStatus operation = await RegisterDeviceAsync(connection, correlationId, deviceRegistration, timeout).ConfigureAwait(false); // Poll with operationId until registration complete. int attempts = 0; @@ -131,7 +162,7 @@ public class ProvisioningTransportHandlerAmqp : ProvisioningTransportHandler cancellationToken.ThrowIfCancellationRequested(); await Task.Delay( - operation.RetryAfter ?? RetryJitter.GenerateDelayWithJitterForRetry(s_defaultOperationPoolingInterval), + operation.RetryAfter ?? RetryJitter.GenerateDelayWithJitterForRetry(s_defaultOperationPollingInterval), cancellationToken).ConfigureAwait(false); try @@ -139,7 +170,8 @@ public class ProvisioningTransportHandlerAmqp : ProvisioningTransportHandler operation = await OperationStatusLookupAsync( connection, operationId, - correlationId).ConfigureAwait(false); + correlationId, + timeout).ConfigureAwait(false); } catch (ProvisioningTransportException e) when (e.ErrorDetails is ProvisioningErrorDetailsAmqp amqp && e.IsTransient) { @@ -154,7 +186,7 @@ public class ProvisioningTransportHandlerAmqp : ProvisioningTransportHandler authStrategy.SaveCredentials(operation); } - await connection.CloseAsync(s_timeoutConstant).ConfigureAwait(false); + await connection.CloseAsync(timeout).ConfigureAwait(false); return ConvertToProvisioningRegistrationResult(operation.RegistrationState); } @@ -179,30 +211,31 @@ public class ProvisioningTransportHandlerAmqp : ProvisioningTransportHandler } } - private static async Task CreateLinksAsync(AmqpClientConnection connection, string linkEndpoint, string productInfo) + private static async Task CreateLinksAsync(AmqpClientConnection connection, string linkEndpoint, string productInfo, TimeSpan timeout) { AmqpClientSession amqpDeviceSession = connection.CreateSession(); - await amqpDeviceSession.OpenAsync(s_timeoutConstant).ConfigureAwait(false); + await amqpDeviceSession.OpenAsync(timeout).ConfigureAwait(false); AmqpClientLink amqpReceivingLink = amqpDeviceSession.CreateReceivingLink(linkEndpoint); amqpReceivingLink.AddClientVersion(productInfo); amqpReceivingLink.AddApiVersion(ClientApiVersionHelper.ApiVersion); - await amqpReceivingLink.OpenAsync(s_timeoutConstant).ConfigureAwait(false); + await amqpReceivingLink.OpenAsync(timeout).ConfigureAwait(false); AmqpClientLink amqpSendingLink = amqpDeviceSession.CreateSendingLink(linkEndpoint); amqpSendingLink.AddClientVersion(productInfo); amqpSendingLink.AddApiVersion(ClientApiVersionHelper.ApiVersion); - await amqpSendingLink.OpenAsync(s_timeoutConstant).ConfigureAwait(false); + await amqpSendingLink.OpenAsync(timeout).ConfigureAwait(false); } private async Task RegisterDeviceAsync( AmqpClientConnection client, string correlationId, - DeviceRegistration deviceRegistration) + DeviceRegistration deviceRegistration, + TimeSpan timeout) { AmqpMessage amqpMessage = null; @@ -226,11 +259,11 @@ private static async Task CreateLinksAsync(AmqpClientConnection connection, stri .SendMessageAsync( amqpMessage, new ArraySegment(Guid.NewGuid().ToByteArray()), - s_timeoutConstant) + timeout) .ConfigureAwait(false); ValidateOutcome(outcome); - AmqpMessage amqpResponse = await client.AmqpSession.ReceivingLink.ReceiveMessageAsync(s_timeoutConstant).ConfigureAwait(false); + AmqpMessage amqpResponse = await client.AmqpSession.ReceivingLink.ReceiveMessageAsync(timeout).ConfigureAwait(false); client.AmqpSession.ReceivingLink.AcceptMessage(amqpResponse); using var streamReader = new StreamReader(amqpResponse.BodyStream); @@ -238,7 +271,7 @@ private static async Task CreateLinksAsync(AmqpClientConnection connection, stri .ReadToEndAsync() .ConfigureAwait(false); RegistrationOperationStatus status = JsonConvert.DeserializeObject(jsonResponse); - status.RetryAfter = ProvisioningErrorDetailsAmqp.GetRetryAfterFromApplicationProperties(amqpResponse, s_defaultOperationPoolingInterval); + status.RetryAfter = ProvisioningErrorDetailsAmqp.GetRetryAfterFromApplicationProperties(amqpResponse, s_defaultOperationPollingInterval); return status; } finally @@ -250,7 +283,8 @@ private static async Task CreateLinksAsync(AmqpClientConnection connection, stri private async Task OperationStatusLookupAsync( AmqpClientConnection client, string operationId, - string correlationId) + string correlationId, + TimeSpan timeout) { using var amqpMessage = AmqpMessage.Create(new AmqpValue { Value = DeviceOperations.GetOperationStatus }); @@ -263,12 +297,12 @@ private static async Task CreateLinksAsync(AmqpClientConnection connection, stri .SendMessageAsync( amqpMessage, new ArraySegment(Guid.NewGuid().ToByteArray()), - s_timeoutConstant) + timeout) .ConfigureAwait(false); ValidateOutcome(outcome); - AmqpMessage amqpResponse = await client.AmqpSession.ReceivingLink.ReceiveMessageAsync(s_timeoutConstant) + AmqpMessage amqpResponse = await client.AmqpSession.ReceivingLink.ReceiveMessageAsync(timeout) .ConfigureAwait(false); client.AmqpSession.ReceivingLink.AcceptMessage(amqpResponse); @@ -277,7 +311,7 @@ private static async Task CreateLinksAsync(AmqpClientConnection connection, stri string jsonResponse = await streamReader.ReadToEndAsync().ConfigureAwait(false); RegistrationOperationStatus status = JsonConvert.DeserializeObject(jsonResponse); - status.RetryAfter = ProvisioningErrorDetailsAmqp.GetRetryAfterFromApplicationProperties(amqpResponse, s_defaultOperationPoolingInterval); + status.RetryAfter = ProvisioningErrorDetailsAmqp.GetRetryAfterFromApplicationProperties(amqpResponse, s_defaultOperationPollingInterval); return status; } @@ -314,7 +348,7 @@ private void ValidateOutcome(Outcome outcome) bool isTransient = statusCode >= (int)HttpStatusCode.InternalServerError || statusCode == 429; if (isTransient) { - errorDetails.RetryAfter = ProvisioningErrorDetailsAmqp.GetRetryAfterFromRejection(rejected, s_defaultOperationPoolingInterval); + errorDetails.RetryAfter = ProvisioningErrorDetailsAmqp.GetRetryAfterFromRejection(rejected, s_defaultOperationPollingInterval); } throw new ProvisioningTransportException( diff --git a/provisioning/transport/http/src/ProvisioningTransportHandlerHttp.cs b/provisioning/transport/http/src/ProvisioningTransportHandlerHttp.cs index 59b3af6723..cdbd2bec00 100644 --- a/provisioning/transport/http/src/ProvisioningTransportHandlerHttp.cs +++ b/provisioning/transport/http/src/ProvisioningTransportHandlerHttp.cs @@ -32,6 +32,20 @@ public ProvisioningTransportHandlerHttp() Proxy = DefaultWebProxySettings.Instance; } + /// + /// Registers a device described by the message. + /// + /// The provisioning message. + /// The maximum amount of time to allow this operation to run for before timing out. + /// The registration result. + public override async Task RegisterAsync( + ProvisioningTransportRegisterMessage message, + TimeSpan timeout) + { + using var cts = new CancellationTokenSource(timeout); + return await RegisterAsync(message, cts.Token).ConfigureAwait(false); + } + /// /// Registers a device described by the message. /// diff --git a/provisioning/transport/mqtt/src/ProvisioningTransportHandlerMqtt.cs b/provisioning/transport/mqtt/src/ProvisioningTransportHandlerMqtt.cs index d62ee25bd4..99b596d641 100644 --- a/provisioning/transport/mqtt/src/ProvisioningTransportHandlerMqtt.cs +++ b/provisioning/transport/mqtt/src/ProvisioningTransportHandlerMqtt.cs @@ -63,6 +63,20 @@ public class ProvisioningTransportHandlerMqtt : ProvisioningTransportHandler Proxy = DefaultWebProxySettings.Instance; } + /// + /// Registers a device described by the message. + /// + /// The provisioning message. + /// The maximum amount of time to allow this operation to run for before timing out. + /// The registration result. + public override async Task RegisterAsync( + ProvisioningTransportRegisterMessage message, + TimeSpan timeout) + { + using var cts = new CancellationTokenSource(timeout); + return await RegisterAsync(message, cts.Token).ConfigureAwait(false); + } + /// /// Registers a device described by the message. ///