diff --git a/provisioning/device/src/ProvisioningDeviceClient.cs b/provisioning/device/src/ProvisioningDeviceClient.cs index 3c7fbb3f66..68528e5dcd 100644 --- a/provisioning/device/src/ProvisioningDeviceClient.cs +++ b/provisioning/device/src/ProvisioningDeviceClient.cs @@ -3,6 +3,7 @@ using Microsoft.Azure.Devices.Provisioning.Client.Transport; using Microsoft.Azure.Devices.Shared; +using System; using System.Threading; using System.Threading.Tasks; @@ -63,45 +64,75 @@ public class ProvisioningDeviceClient /// /// Registers the current device using the Device Provisioning Service and assigns it to an IoT Hub. /// + /// The maximum amount of time to allow this operation to run for before timing out. + /// + /// Due to the AMQP library used by this library uses not accepting cancellation tokens, this overload and + /// are the only overloads for this method that allow for a specified timeout to be respected in the middle of an AMQP operation such as opening + /// the AMQP connection. MQTT and HTTPS connections do not share that same limitation, though. + /// /// The registration result. - public Task RegisterAsync() + public Task RegisterAsync(TimeSpan timeout) { - return RegisterAsync(CancellationToken.None); + Logging.RegisterAsync(this, _globalDeviceEndpoint, _idScope, _transport, _security); + + return RegisterAsync(null, timeout); } /// /// Registers the current device using the Device Provisioning Service and assigns it to an IoT Hub. /// /// The optional additional data. + /// The maximum amount of time to allow this operation to run for before timing out. + /// + /// Due to the AMQP library used by this library uses not accepting cancellation tokens, this overload and + /// are the only overloads for this method that allow for a specified timeout to be respected in the middle of an AMQP operation such as opening + /// the AMQP connection. MQTT and HTTPS connections do not share that same limitation, though. + /// /// The registration result. - public Task RegisterAsync(ProvisioningRegistrationAdditionalData data) + public Task RegisterAsync(ProvisioningRegistrationAdditionalData data, TimeSpan timeout) { - return RegisterAsync(data, CancellationToken.None); + Logging.RegisterAsync(this, _globalDeviceEndpoint, _idScope, _transport, _security); + + var request = new ProvisioningTransportRegisterMessage(_globalDeviceEndpoint, _idScope, _security, data?.JsonData) + { + ProductInfo = ProductInfo, + }; + + return _transport.RegisterAsync(request, timeout); } /// /// Registers the current device using the Device Provisioning Service and assigns it to an IoT Hub. /// /// The cancellation token. + /// + /// Due to the AMQP library used by this library uses not accepting cancellation tokens, the provided cancellation token will only be checked + /// for cancellation in between AMQP operations, and not during. In order to have a timeout for this operation that is checked during AMQP operations + /// (such as opening the connection), you must use instead. MQTT and HTTPS connections do not have the same + /// behavior as AMQP connections in this regard. MQTT and HTTPS connections will check this cancellation token for cancellation during their protocol level operations. + /// /// The registration result. - public Task RegisterAsync(CancellationToken cancellationToken) + public Task RegisterAsync(CancellationToken cancellationToken = default) { Logging.RegisterAsync(this, _globalDeviceEndpoint, _idScope, _transport, _security); - var request = new ProvisioningTransportRegisterMessage(_globalDeviceEndpoint, _idScope, _security) - { - ProductInfo = ProductInfo - }; - return _transport.RegisterAsync(request, cancellationToken); + return RegisterAsync(null, cancellationToken); } /// /// Registers the current device using the Device Provisioning Service and assigns it to an IoT Hub. /// - /// The custom content. + /// The optional additional data. /// The cancellation token. + /// + /// Due to the AMQP library used by this library uses not accepting cancellation tokens, the provided cancellation token will only be checked + /// for cancellation in between AMQP operations, and not during. In order to have a timeout for this operation that is checked during AMQP operations + /// (such as opening the connection), you must use this overload instead. + /// MQTT and HTTPS connections do not have the same behavior as AMQP connections in this regard. MQTT and HTTPS connections will check this cancellation + /// token for cancellation during their protocol level operations. + /// /// The registration result. - public Task RegisterAsync(ProvisioningRegistrationAdditionalData data, CancellationToken cancellationToken) + public Task RegisterAsync(ProvisioningRegistrationAdditionalData data, CancellationToken cancellationToken = default) { Logging.RegisterAsync(this, _globalDeviceEndpoint, _idScope, _transport, _security); @@ -109,6 +140,7 @@ public Task RegisterAsync(ProvisioningRegistrationAddi { ProductInfo = ProductInfo, }; + return _transport.RegisterAsync(request, cancellationToken); } } diff --git a/provisioning/device/src/ProvisioningTransportHandler.cs b/provisioning/device/src/ProvisioningTransportHandler.cs index 8208932cad..1ddc085cfb 100644 --- a/provisioning/device/src/ProvisioningTransportHandler.cs +++ b/provisioning/device/src/ProvisioningTransportHandler.cs @@ -85,6 +85,19 @@ public int Port return _innerHandler.RegisterAsync(message, cancellationToken); } + /// + /// Registers a device described by the message. + /// + /// The provisioning message. + /// The maximum amount of time to allow this operation to run for before timing out. + /// The registration result. + public virtual Task RegisterAsync( + ProvisioningTransportRegisterMessage message, + TimeSpan timeout) + { + return _innerHandler.RegisterAsync(message, timeout); + } + /// /// Releases the unmanaged resources and disposes of the managed resources used by the invoker. /// diff --git a/provisioning/transport/amqp/src/ProvisioningTransportHandlerAmqp.cs b/provisioning/transport/amqp/src/ProvisioningTransportHandlerAmqp.cs index 740d4265a2..c55025c7d7 100644 --- a/provisioning/transport/amqp/src/ProvisioningTransportHandlerAmqp.cs +++ b/provisioning/transport/amqp/src/ProvisioningTransportHandlerAmqp.cs @@ -22,7 +22,7 @@ namespace Microsoft.Azure.Devices.Provisioning.Client.Transport /// public class ProvisioningTransportHandlerAmqp : ProvisioningTransportHandler { - private static readonly TimeSpan s_defaultOperationPoolingInterval = TimeSpan.FromSeconds(2); + private static readonly TimeSpan s_defaultOperationPollingInterval = TimeSpan.FromSeconds(2); private static readonly TimeSpan s_timeoutConstant = TimeSpan.FromMinutes(1); /// @@ -43,6 +43,19 @@ public class ProvisioningTransportHandlerAmqp : ProvisioningTransportHandler Proxy = DefaultWebProxySettings.Instance; } + /// + /// Registers a device described by the message. + /// + /// The provisioning message. + /// The maximum amount of time to allow this operation to run for before timing out. + /// The registration result. + public override async Task RegisterAsync( + ProvisioningTransportRegisterMessage message, + TimeSpan timeout) + { + return await RegisterAsync(message, timeout, CancellationToken.None).ConfigureAwait(false); + } + /// /// Registers a device described by the message. /// @@ -52,6 +65,22 @@ public class ProvisioningTransportHandlerAmqp : ProvisioningTransportHandler public override async Task RegisterAsync( ProvisioningTransportRegisterMessage message, CancellationToken cancellationToken) + { + return await RegisterAsync(message, s_timeoutConstant, cancellationToken).ConfigureAwait(false); + } + + /// + /// Registers a device described by the message. Because the AMQP library does not accept cancellation tokens, the provided cancellation token + /// will only be checked for cancellation between AMQP operations. The timeout will be respected during the AMQP operations. + /// + /// The provisioning message. + /// The maximum amount of time to allow this operation to run for before timing out. + /// The cancellation token. + /// The registration result. + private async Task RegisterAsync( + ProvisioningTransportRegisterMessage message, + TimeSpan timeout, + CancellationToken cancellationToken) { if (Logging.IsEnabled) { @@ -106,19 +135,21 @@ public class ProvisioningTransportHandlerAmqp : ProvisioningTransportHandler string linkEndpoint = $"{message.IdScope}/registrations/{registrationId}"; using AmqpClientConnection connection = authStrategy.CreateConnection(builder.Uri, message.IdScope); - await authStrategy.OpenConnectionAsync(connection, s_timeoutConstant, useWebSocket, Proxy, RemoteCertificateValidationCallback).ConfigureAwait(false); + await authStrategy.OpenConnectionAsync(connection, timeout, useWebSocket, Proxy, RemoteCertificateValidationCallback).ConfigureAwait(false); cancellationToken.ThrowIfCancellationRequested(); await CreateLinksAsync( connection, linkEndpoint, - message.ProductInfo).ConfigureAwait(false); + message.ProductInfo, + timeout).ConfigureAwait(false); + cancellationToken.ThrowIfCancellationRequested(); string correlationId = Guid.NewGuid().ToString(); DeviceRegistration deviceRegistration = (message.Payload != null && message.Payload.Length > 0) ? new DeviceRegistration { Payload = new JRaw(message.Payload) } : null; - RegistrationOperationStatus operation = await RegisterDeviceAsync(connection, correlationId, deviceRegistration).ConfigureAwait(false); + RegistrationOperationStatus operation = await RegisterDeviceAsync(connection, correlationId, deviceRegistration, timeout).ConfigureAwait(false); // Poll with operationId until registration complete. int attempts = 0; @@ -131,7 +162,7 @@ public class ProvisioningTransportHandlerAmqp : ProvisioningTransportHandler cancellationToken.ThrowIfCancellationRequested(); await Task.Delay( - operation.RetryAfter ?? RetryJitter.GenerateDelayWithJitterForRetry(s_defaultOperationPoolingInterval), + operation.RetryAfter ?? RetryJitter.GenerateDelayWithJitterForRetry(s_defaultOperationPollingInterval), cancellationToken).ConfigureAwait(false); try @@ -139,7 +170,8 @@ public class ProvisioningTransportHandlerAmqp : ProvisioningTransportHandler operation = await OperationStatusLookupAsync( connection, operationId, - correlationId).ConfigureAwait(false); + correlationId, + timeout).ConfigureAwait(false); } catch (ProvisioningTransportException e) when (e.ErrorDetails is ProvisioningErrorDetailsAmqp amqp && e.IsTransient) { @@ -154,7 +186,7 @@ public class ProvisioningTransportHandlerAmqp : ProvisioningTransportHandler authStrategy.SaveCredentials(operation); } - await connection.CloseAsync(s_timeoutConstant).ConfigureAwait(false); + await connection.CloseAsync(timeout).ConfigureAwait(false); return ConvertToProvisioningRegistrationResult(operation.RegistrationState); } @@ -179,30 +211,31 @@ public class ProvisioningTransportHandlerAmqp : ProvisioningTransportHandler } } - private static async Task CreateLinksAsync(AmqpClientConnection connection, string linkEndpoint, string productInfo) + private static async Task CreateLinksAsync(AmqpClientConnection connection, string linkEndpoint, string productInfo, TimeSpan timeout) { AmqpClientSession amqpDeviceSession = connection.CreateSession(); - await amqpDeviceSession.OpenAsync(s_timeoutConstant).ConfigureAwait(false); + await amqpDeviceSession.OpenAsync(timeout).ConfigureAwait(false); AmqpClientLink amqpReceivingLink = amqpDeviceSession.CreateReceivingLink(linkEndpoint); amqpReceivingLink.AddClientVersion(productInfo); amqpReceivingLink.AddApiVersion(ClientApiVersionHelper.ApiVersion); - await amqpReceivingLink.OpenAsync(s_timeoutConstant).ConfigureAwait(false); + await amqpReceivingLink.OpenAsync(timeout).ConfigureAwait(false); AmqpClientLink amqpSendingLink = amqpDeviceSession.CreateSendingLink(linkEndpoint); amqpSendingLink.AddClientVersion(productInfo); amqpSendingLink.AddApiVersion(ClientApiVersionHelper.ApiVersion); - await amqpSendingLink.OpenAsync(s_timeoutConstant).ConfigureAwait(false); + await amqpSendingLink.OpenAsync(timeout).ConfigureAwait(false); } private async Task RegisterDeviceAsync( AmqpClientConnection client, string correlationId, - DeviceRegistration deviceRegistration) + DeviceRegistration deviceRegistration, + TimeSpan timeout) { AmqpMessage amqpMessage = null; @@ -226,11 +259,11 @@ private static async Task CreateLinksAsync(AmqpClientConnection connection, stri .SendMessageAsync( amqpMessage, new ArraySegment(Guid.NewGuid().ToByteArray()), - s_timeoutConstant) + timeout) .ConfigureAwait(false); ValidateOutcome(outcome); - AmqpMessage amqpResponse = await client.AmqpSession.ReceivingLink.ReceiveMessageAsync(s_timeoutConstant).ConfigureAwait(false); + AmqpMessage amqpResponse = await client.AmqpSession.ReceivingLink.ReceiveMessageAsync(timeout).ConfigureAwait(false); client.AmqpSession.ReceivingLink.AcceptMessage(amqpResponse); using var streamReader = new StreamReader(amqpResponse.BodyStream); @@ -238,7 +271,7 @@ private static async Task CreateLinksAsync(AmqpClientConnection connection, stri .ReadToEndAsync() .ConfigureAwait(false); RegistrationOperationStatus status = JsonConvert.DeserializeObject(jsonResponse); - status.RetryAfter = ProvisioningErrorDetailsAmqp.GetRetryAfterFromApplicationProperties(amqpResponse, s_defaultOperationPoolingInterval); + status.RetryAfter = ProvisioningErrorDetailsAmqp.GetRetryAfterFromApplicationProperties(amqpResponse, s_defaultOperationPollingInterval); return status; } finally @@ -250,7 +283,8 @@ private static async Task CreateLinksAsync(AmqpClientConnection connection, stri private async Task OperationStatusLookupAsync( AmqpClientConnection client, string operationId, - string correlationId) + string correlationId, + TimeSpan timeout) { using var amqpMessage = AmqpMessage.Create(new AmqpValue { Value = DeviceOperations.GetOperationStatus }); @@ -263,12 +297,12 @@ private static async Task CreateLinksAsync(AmqpClientConnection connection, stri .SendMessageAsync( amqpMessage, new ArraySegment(Guid.NewGuid().ToByteArray()), - s_timeoutConstant) + timeout) .ConfigureAwait(false); ValidateOutcome(outcome); - AmqpMessage amqpResponse = await client.AmqpSession.ReceivingLink.ReceiveMessageAsync(s_timeoutConstant) + AmqpMessage amqpResponse = await client.AmqpSession.ReceivingLink.ReceiveMessageAsync(timeout) .ConfigureAwait(false); client.AmqpSession.ReceivingLink.AcceptMessage(amqpResponse); @@ -277,7 +311,7 @@ private static async Task CreateLinksAsync(AmqpClientConnection connection, stri string jsonResponse = await streamReader.ReadToEndAsync().ConfigureAwait(false); RegistrationOperationStatus status = JsonConvert.DeserializeObject(jsonResponse); - status.RetryAfter = ProvisioningErrorDetailsAmqp.GetRetryAfterFromApplicationProperties(amqpResponse, s_defaultOperationPoolingInterval); + status.RetryAfter = ProvisioningErrorDetailsAmqp.GetRetryAfterFromApplicationProperties(amqpResponse, s_defaultOperationPollingInterval); return status; } @@ -314,7 +348,7 @@ private void ValidateOutcome(Outcome outcome) bool isTransient = statusCode >= (int)HttpStatusCode.InternalServerError || statusCode == 429; if (isTransient) { - errorDetails.RetryAfter = ProvisioningErrorDetailsAmqp.GetRetryAfterFromRejection(rejected, s_defaultOperationPoolingInterval); + errorDetails.RetryAfter = ProvisioningErrorDetailsAmqp.GetRetryAfterFromRejection(rejected, s_defaultOperationPollingInterval); } throw new ProvisioningTransportException( diff --git a/provisioning/transport/http/src/ProvisioningTransportHandlerHttp.cs b/provisioning/transport/http/src/ProvisioningTransportHandlerHttp.cs index 59b3af6723..cdbd2bec00 100644 --- a/provisioning/transport/http/src/ProvisioningTransportHandlerHttp.cs +++ b/provisioning/transport/http/src/ProvisioningTransportHandlerHttp.cs @@ -32,6 +32,20 @@ public ProvisioningTransportHandlerHttp() Proxy = DefaultWebProxySettings.Instance; } + /// + /// Registers a device described by the message. + /// + /// The provisioning message. + /// The maximum amount of time to allow this operation to run for before timing out. + /// The registration result. + public override async Task RegisterAsync( + ProvisioningTransportRegisterMessage message, + TimeSpan timeout) + { + using var cts = new CancellationTokenSource(timeout); + return await RegisterAsync(message, cts.Token).ConfigureAwait(false); + } + /// /// Registers a device described by the message. /// diff --git a/provisioning/transport/mqtt/src/ProvisioningTransportHandlerMqtt.cs b/provisioning/transport/mqtt/src/ProvisioningTransportHandlerMqtt.cs index d62ee25bd4..99b596d641 100644 --- a/provisioning/transport/mqtt/src/ProvisioningTransportHandlerMqtt.cs +++ b/provisioning/transport/mqtt/src/ProvisioningTransportHandlerMqtt.cs @@ -63,6 +63,20 @@ public class ProvisioningTransportHandlerMqtt : ProvisioningTransportHandler Proxy = DefaultWebProxySettings.Instance; } + /// + /// Registers a device described by the message. + /// + /// The provisioning message. + /// The maximum amount of time to allow this operation to run for before timing out. + /// The registration result. + public override async Task RegisterAsync( + ProvisioningTransportRegisterMessage message, + TimeSpan timeout) + { + using var cts = new CancellationTokenSource(timeout); + return await RegisterAsync(message, cts.Token).ConfigureAwait(false); + } + /// /// Registers a device described by the message. ///