Skip to content

Commit

Permalink
feat(provisioning-device, prov-amqp, prov-mqtt, prov-https): Add supp…
Browse files Browse the repository at this point in the history
…ort for timespan timeouts to provisioning device client

As discussed in #2036, AMQP provisioning device clients have no way to configure the timeout given to the AMQP library for operations like opening links. This adds overloads to the existing provisioning device client's registerAsync methods that allow for users to configure this timespan timeout.
  • Loading branch information
timtay-microsoft committed Jun 18, 2021
1 parent 4357995 commit 72dca89
Show file tree
Hide file tree
Showing 5 changed files with 139 additions and 32 deletions.
56 changes: 44 additions & 12 deletions provisioning/device/src/ProvisioningDeviceClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

using Microsoft.Azure.Devices.Provisioning.Client.Transport;
using Microsoft.Azure.Devices.Shared;
using System;
using System.Threading;
using System.Threading.Tasks;

Expand Down Expand Up @@ -63,52 +64,83 @@ public class ProvisioningDeviceClient
/// <summary>
/// Registers the current device using the Device Provisioning Service and assigns it to an IoT Hub.
/// </summary>
/// <param name="timeout">The maximum amount of time to allow this operation to run for before timing out.</param>
/// <remarks>
/// Due to the AMQP library used by this library uses not accepting cancellation tokens, this overload and <see cref="RegisterAsync(ProvisioningRegistrationAdditionalData, TimeSpan)"/>
/// are the only overloads for this method that allow for a specified timeout to be respected in the middle of an AMQP operation such as opening
/// the AMQP connection. MQTT and HTTPS connections do not share that same limitation, though.
/// </remarks>
/// <returns>The registration result.</returns>
public Task<DeviceRegistrationResult> RegisterAsync()
public Task<DeviceRegistrationResult> RegisterAsync(TimeSpan timeout)
{
return RegisterAsync(CancellationToken.None);
Logging.RegisterAsync(this, _globalDeviceEndpoint, _idScope, _transport, _security);

return RegisterAsync(null, timeout);
}

/// <summary>
/// Registers the current device using the Device Provisioning Service and assigns it to an IoT Hub.
/// </summary>
/// <param name="data">The optional additional data.</param>
/// <param name="timeout">The maximum amount of time to allow this operation to run for before timing out.</param>
/// <remarks>
/// Due to the AMQP library used by this library uses not accepting cancellation tokens, this overload and <see cref="RegisterAsync(TimeSpan)"/>
/// are the only overloads for this method that allow for a specified timeout to be respected in the middle of an AMQP operation such as opening
/// the AMQP connection. MQTT and HTTPS connections do not share that same limitation, though.
/// </remarks>
/// <returns>The registration result.</returns>
public Task<DeviceRegistrationResult> RegisterAsync(ProvisioningRegistrationAdditionalData data)
public Task<DeviceRegistrationResult> RegisterAsync(ProvisioningRegistrationAdditionalData data, TimeSpan timeout)
{
return RegisterAsync(data, CancellationToken.None);
Logging.RegisterAsync(this, _globalDeviceEndpoint, _idScope, _transport, _security);

var request = new ProvisioningTransportRegisterMessage(_globalDeviceEndpoint, _idScope, _security, data?.JsonData)
{
ProductInfo = ProductInfo,
};

return _transport.RegisterAsync(request, timeout);
}

/// <summary>
/// Registers the current device using the Device Provisioning Service and assigns it to an IoT Hub.
/// </summary>
/// <param name="cancellationToken">The cancellation token.</param>
/// <remarks>
/// Due to the AMQP library used by this library uses not accepting cancellation tokens, the provided cancellation token will only be checked
/// for cancellation in between AMQP operations, and not during. In order to have a timeout for this operation that is checked during AMQP operations
/// (such as opening the connection), you must use <see cref="RegisterAsync(TimeSpan)"/> instead. MQTT and HTTPS connections do not have the same
/// behavior as AMQP connections in this regard. MQTT and HTTPS connections will check this cancellation token for cancellation during their protocol level operations.
/// </remarks>
/// <returns>The registration result.</returns>
public Task<DeviceRegistrationResult> RegisterAsync(CancellationToken cancellationToken)
public Task<DeviceRegistrationResult> RegisterAsync(CancellationToken cancellationToken = default)
{
Logging.RegisterAsync(this, _globalDeviceEndpoint, _idScope, _transport, _security);

var request = new ProvisioningTransportRegisterMessage(_globalDeviceEndpoint, _idScope, _security)
{
ProductInfo = ProductInfo
};
return _transport.RegisterAsync(request, cancellationToken);
return RegisterAsync(null, cancellationToken);
}

/// <summary>
/// Registers the current device using the Device Provisioning Service and assigns it to an IoT Hub.
/// </summary>
/// <param name="data">The custom content.</param>
/// <param name="data">The optional additional data.</param>
/// <param name="cancellationToken">The cancellation token.</param>
/// <remarks>
/// Due to the AMQP library used by this library uses not accepting cancellation tokens, the provided cancellation token will only be checked
/// for cancellation in between AMQP operations, and not during. In order to have a timeout for this operation that is checked during AMQP operations
/// (such as opening the connection), you must use <see cref="RegisterAsync(ProvisioningRegistrationAdditionalData, TimeSpan)">this overload</see> instead.
/// MQTT and HTTPS connections do not have the same behavior as AMQP connections in this regard. MQTT and HTTPS connections will check this cancellation
/// token for cancellation during their protocol level operations.
/// </remarks>
/// <returns>The registration result.</returns>
public Task<DeviceRegistrationResult> RegisterAsync(ProvisioningRegistrationAdditionalData data, CancellationToken cancellationToken)
public Task<DeviceRegistrationResult> RegisterAsync(ProvisioningRegistrationAdditionalData data, CancellationToken cancellationToken = default)
{
Logging.RegisterAsync(this, _globalDeviceEndpoint, _idScope, _transport, _security);

var request = new ProvisioningTransportRegisterMessage(_globalDeviceEndpoint, _idScope, _security, data?.JsonData)
{
ProductInfo = ProductInfo,
};

return _transport.RegisterAsync(request, cancellationToken);
}
}
Expand Down
13 changes: 13 additions & 0 deletions provisioning/device/src/ProvisioningTransportHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,19 @@ public int Port
return _innerHandler.RegisterAsync(message, cancellationToken);
}

/// <summary>
/// Registers a device described by the message.
/// </summary>
/// <param name="message">The provisioning message.</param>
/// <param name="timeout">The maximum amount of time to allow this operation to run for before timing out.</param>
/// <returns>The registration result.</returns>
public virtual Task<DeviceRegistrationResult> RegisterAsync(
ProvisioningTransportRegisterMessage message,
TimeSpan timeout)
{
return _innerHandler.RegisterAsync(message, timeout);
}

/// <summary>
/// Releases the unmanaged resources and disposes of the managed resources used by the invoker.
/// </summary>
Expand Down
74 changes: 54 additions & 20 deletions provisioning/transport/amqp/src/ProvisioningTransportHandlerAmqp.cs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ namespace Microsoft.Azure.Devices.Provisioning.Client.Transport
/// </summary>
public class ProvisioningTransportHandlerAmqp : ProvisioningTransportHandler
{
private static readonly TimeSpan s_defaultOperationPoolingInterval = TimeSpan.FromSeconds(2);
private static readonly TimeSpan s_defaultOperationPollingInterval = TimeSpan.FromSeconds(2);
private static readonly TimeSpan s_timeoutConstant = TimeSpan.FromMinutes(1);

/// <summary>
Expand All @@ -43,6 +43,19 @@ public class ProvisioningTransportHandlerAmqp : ProvisioningTransportHandler
Proxy = DefaultWebProxySettings.Instance;
}

/// <summary>
/// Registers a device described by the message.
/// </summary>
/// <param name="message">The provisioning message.</param>
/// <param name="timeout">The maximum amount of time to allow this operation to run for before timing out.</param>
/// <returns>The registration result.</returns>
public override async Task<DeviceRegistrationResult> RegisterAsync(
ProvisioningTransportRegisterMessage message,
TimeSpan timeout)
{
return await RegisterAsync(message, timeout, CancellationToken.None).ConfigureAwait(false);
}

/// <summary>
/// Registers a device described by the message.
/// </summary>
Expand All @@ -52,6 +65,22 @@ public class ProvisioningTransportHandlerAmqp : ProvisioningTransportHandler
public override async Task<DeviceRegistrationResult> RegisterAsync(
ProvisioningTransportRegisterMessage message,
CancellationToken cancellationToken)
{
return await RegisterAsync(message, s_timeoutConstant, cancellationToken).ConfigureAwait(false);
}

/// <summary>
/// Registers a device described by the message. Because the AMQP library does not accept cancellation tokens, the provided cancellation token
/// will only be checked for cancellation between AMQP operations. The timeout will be respected during the AMQP operations.
/// </summary>
/// <param name="message">The provisioning message.</param>
/// <param name="timeout">The maximum amount of time to allow this operation to run for before timing out.</param>
/// <param name="cancellationToken">The cancellation token.</param>
/// <returns>The registration result.</returns>
private async Task<DeviceRegistrationResult> RegisterAsync(
ProvisioningTransportRegisterMessage message,
TimeSpan timeout,
CancellationToken cancellationToken)
{
if (Logging.IsEnabled)
{
Expand Down Expand Up @@ -106,19 +135,21 @@ public class ProvisioningTransportHandlerAmqp : ProvisioningTransportHandler
string linkEndpoint = $"{message.IdScope}/registrations/{registrationId}";

using AmqpClientConnection connection = authStrategy.CreateConnection(builder.Uri, message.IdScope);
await authStrategy.OpenConnectionAsync(connection, s_timeoutConstant, useWebSocket, Proxy, RemoteCertificateValidationCallback).ConfigureAwait(false);
await authStrategy.OpenConnectionAsync(connection, timeout, useWebSocket, Proxy, RemoteCertificateValidationCallback).ConfigureAwait(false);
cancellationToken.ThrowIfCancellationRequested();

await CreateLinksAsync(
connection,
linkEndpoint,
message.ProductInfo).ConfigureAwait(false);
message.ProductInfo,
timeout).ConfigureAwait(false);

cancellationToken.ThrowIfCancellationRequested();

string correlationId = Guid.NewGuid().ToString();
DeviceRegistration deviceRegistration = (message.Payload != null && message.Payload.Length > 0) ? new DeviceRegistration { Payload = new JRaw(message.Payload) } : null;

RegistrationOperationStatus operation = await RegisterDeviceAsync(connection, correlationId, deviceRegistration).ConfigureAwait(false);
RegistrationOperationStatus operation = await RegisterDeviceAsync(connection, correlationId, deviceRegistration, timeout).ConfigureAwait(false);

// Poll with operationId until registration complete.
int attempts = 0;
Expand All @@ -131,15 +162,16 @@ public class ProvisioningTransportHandlerAmqp : ProvisioningTransportHandler
cancellationToken.ThrowIfCancellationRequested();

await Task.Delay(
operation.RetryAfter ?? RetryJitter.GenerateDelayWithJitterForRetry(s_defaultOperationPoolingInterval),
operation.RetryAfter ?? RetryJitter.GenerateDelayWithJitterForRetry(s_defaultOperationPollingInterval),
cancellationToken).ConfigureAwait(false);

try
{
operation = await OperationStatusLookupAsync(
connection,
operationId,
correlationId).ConfigureAwait(false);
correlationId,
timeout).ConfigureAwait(false);
}
catch (ProvisioningTransportException e) when (e.ErrorDetails is ProvisioningErrorDetailsAmqp amqp && e.IsTransient)
{
Expand All @@ -154,7 +186,7 @@ public class ProvisioningTransportHandlerAmqp : ProvisioningTransportHandler
authStrategy.SaveCredentials(operation);
}

await connection.CloseAsync(s_timeoutConstant).ConfigureAwait(false);
await connection.CloseAsync(timeout).ConfigureAwait(false);

return ConvertToProvisioningRegistrationResult(operation.RegistrationState);
}
Expand All @@ -179,30 +211,31 @@ public class ProvisioningTransportHandlerAmqp : ProvisioningTransportHandler
}
}

private static async Task CreateLinksAsync(AmqpClientConnection connection, string linkEndpoint, string productInfo)
private static async Task CreateLinksAsync(AmqpClientConnection connection, string linkEndpoint, string productInfo, TimeSpan timeout)
{
AmqpClientSession amqpDeviceSession = connection.CreateSession();
await amqpDeviceSession.OpenAsync(s_timeoutConstant).ConfigureAwait(false);
await amqpDeviceSession.OpenAsync(timeout).ConfigureAwait(false);

AmqpClientLink amqpReceivingLink = amqpDeviceSession.CreateReceivingLink(linkEndpoint);

amqpReceivingLink.AddClientVersion(productInfo);
amqpReceivingLink.AddApiVersion(ClientApiVersionHelper.ApiVersion);

await amqpReceivingLink.OpenAsync(s_timeoutConstant).ConfigureAwait(false);
await amqpReceivingLink.OpenAsync(timeout).ConfigureAwait(false);

AmqpClientLink amqpSendingLink = amqpDeviceSession.CreateSendingLink(linkEndpoint);

amqpSendingLink.AddClientVersion(productInfo);
amqpSendingLink.AddApiVersion(ClientApiVersionHelper.ApiVersion);

await amqpSendingLink.OpenAsync(s_timeoutConstant).ConfigureAwait(false);
await amqpSendingLink.OpenAsync(timeout).ConfigureAwait(false);
}

private async Task<RegistrationOperationStatus> RegisterDeviceAsync(
AmqpClientConnection client,
string correlationId,
DeviceRegistration deviceRegistration)
DeviceRegistration deviceRegistration,
TimeSpan timeout)
{
AmqpMessage amqpMessage = null;

Expand All @@ -226,19 +259,19 @@ private static async Task CreateLinksAsync(AmqpClientConnection connection, stri
.SendMessageAsync(
amqpMessage,
new ArraySegment<byte>(Guid.NewGuid().ToByteArray()),
s_timeoutConstant)
timeout)
.ConfigureAwait(false);
ValidateOutcome(outcome);

AmqpMessage amqpResponse = await client.AmqpSession.ReceivingLink.ReceiveMessageAsync(s_timeoutConstant).ConfigureAwait(false);
AmqpMessage amqpResponse = await client.AmqpSession.ReceivingLink.ReceiveMessageAsync(timeout).ConfigureAwait(false);
client.AmqpSession.ReceivingLink.AcceptMessage(amqpResponse);

using var streamReader = new StreamReader(amqpResponse.BodyStream);
string jsonResponse = await streamReader
.ReadToEndAsync()
.ConfigureAwait(false);
RegistrationOperationStatus status = JsonConvert.DeserializeObject<RegistrationOperationStatus>(jsonResponse);
status.RetryAfter = ProvisioningErrorDetailsAmqp.GetRetryAfterFromApplicationProperties(amqpResponse, s_defaultOperationPoolingInterval);
status.RetryAfter = ProvisioningErrorDetailsAmqp.GetRetryAfterFromApplicationProperties(amqpResponse, s_defaultOperationPollingInterval);
return status;
}
finally
Expand All @@ -250,7 +283,8 @@ private static async Task CreateLinksAsync(AmqpClientConnection connection, stri
private async Task<RegistrationOperationStatus> OperationStatusLookupAsync(
AmqpClientConnection client,
string operationId,
string correlationId)
string correlationId,
TimeSpan timeout)
{
using var amqpMessage = AmqpMessage.Create(new AmqpValue { Value = DeviceOperations.GetOperationStatus });

Expand All @@ -263,12 +297,12 @@ private static async Task CreateLinksAsync(AmqpClientConnection connection, stri
.SendMessageAsync(
amqpMessage,
new ArraySegment<byte>(Guid.NewGuid().ToByteArray()),
s_timeoutConstant)
timeout)
.ConfigureAwait(false);

ValidateOutcome(outcome);

AmqpMessage amqpResponse = await client.AmqpSession.ReceivingLink.ReceiveMessageAsync(s_timeoutConstant)
AmqpMessage amqpResponse = await client.AmqpSession.ReceivingLink.ReceiveMessageAsync(timeout)
.ConfigureAwait(false);

client.AmqpSession.ReceivingLink.AcceptMessage(amqpResponse);
Expand All @@ -277,7 +311,7 @@ private static async Task CreateLinksAsync(AmqpClientConnection connection, stri

string jsonResponse = await streamReader.ReadToEndAsync().ConfigureAwait(false);
RegistrationOperationStatus status = JsonConvert.DeserializeObject<RegistrationOperationStatus>(jsonResponse);
status.RetryAfter = ProvisioningErrorDetailsAmqp.GetRetryAfterFromApplicationProperties(amqpResponse, s_defaultOperationPoolingInterval);
status.RetryAfter = ProvisioningErrorDetailsAmqp.GetRetryAfterFromApplicationProperties(amqpResponse, s_defaultOperationPollingInterval);

return status;
}
Expand Down Expand Up @@ -314,7 +348,7 @@ private void ValidateOutcome(Outcome outcome)
bool isTransient = statusCode >= (int)HttpStatusCode.InternalServerError || statusCode == 429;
if (isTransient)
{
errorDetails.RetryAfter = ProvisioningErrorDetailsAmqp.GetRetryAfterFromRejection(rejected, s_defaultOperationPoolingInterval);
errorDetails.RetryAfter = ProvisioningErrorDetailsAmqp.GetRetryAfterFromRejection(rejected, s_defaultOperationPollingInterval);
}

throw new ProvisioningTransportException(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,20 @@ public ProvisioningTransportHandlerHttp()
Proxy = DefaultWebProxySettings.Instance;
}

/// <summary>
/// Registers a device described by the message.
/// </summary>
/// <param name="message">The provisioning message.</param>
/// <param name="timeout">The maximum amount of time to allow this operation to run for before timing out.</param>
/// <returns>The registration result.</returns>
public override async Task<DeviceRegistrationResult> RegisterAsync(
ProvisioningTransportRegisterMessage message,
TimeSpan timeout)
{
using var cts = new CancellationTokenSource(timeout);
return await RegisterAsync(message, cts.Token).ConfigureAwait(false);
}

/// <summary>
/// Registers a device described by the message.
/// </summary>
Expand Down

0 comments on commit 72dca89

Please sign in to comment.