Skip to content

Commit

Permalink
Add retry policies into DPS device (#2926)
Browse files Browse the repository at this point in the history
  • Loading branch information
brycewang-microsoft committed Oct 28, 2022
1 parent 5f1a2c5 commit 6448bff
Show file tree
Hide file tree
Showing 22 changed files with 770 additions and 12 deletions.
4 changes: 3 additions & 1 deletion SDK v2 migration guide.md
Original file line number Diff line number Diff line change
Expand Up @@ -266,7 +266,7 @@ What was a loose affiliation of separate clients is now a consolidated client wi
#### Notable additions

- The library now includes `IIotHubServiceRetryPolicy` implementations: `IotHubServiceExponentialBackoffRetryPolicy`, `IotHubServiceFixedDelayRetryPolicy`, `IotHubServiceIncrementalDelayRetryPolicy` and `IotHubServiceNoRetry`,
which can be set by calling `IotHubServiceClientOptions.RetryPolicy`.
which can be set via `IotHubServiceClientOptions.RetryPolicy`.

#### API mapping

Expand Down Expand Up @@ -328,6 +328,8 @@ What was a loose affiliation of separate clients is now a consolidated client wi
- Added support for setting a client web socket instance in the client options so that users can have better control over AMQP web socket connections.
- Added support for setting the web socket level keep alive interval for AMQP web socket connections.
- Added support for setting the remote certificate validation callback for AMQP TCP connections.
- The library now includes `IProvisioningClientRetryPolicy` implementations: `ProvisioningClientExponentialBackoffRetryPolicy`, `ProvisioningClientFixedDelayRetryPolicy`, `ProvisioningClientIncrementalDelayRetryPolicy` and `ProvisioningClientNoRetry`,
which can be set via `ProvisioningClientOptions.RetryPolicy`.

#### API mapping

Expand Down
14 changes: 10 additions & 4 deletions e2e/test/provisioning/ProvisioningE2ETests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -731,6 +731,9 @@ public async Task DPS_Registration_AmqpWs_X509_IndividualEnrollment_InvalidGloba
string groupId)
{
ProvisioningClientOptions clientOptions = CreateProvisioningClientOptionsFromName(transportSettings);
// Disable retries for the provisioning client rather than letting it retry indefinitely, so that the
// expected ProvisioningClientException is thrown before the cancellation token is signaled.
clientOptions.RetryPolicy = new ProvisioningClientNoRetry();
AuthenticationProvider auth = null;

try
Expand All @@ -754,7 +757,7 @@ public async Task DPS_Registration_AmqpWs_X509_IndividualEnrollment_InvalidGloba
using var cts = new CancellationTokenSource(FailingTimeoutMiliseconds);
Func<Task> act = async () => await provClient.RegisterAsync(cts.Token);
var exception = await act.Should().ThrowAsync<ProvisioningClientException>().ConfigureAwait(false);
VerboseTestLogger.WriteLine($"Exception: {exception}");
VerboseTestLogger.WriteLine($"Exception: {exception.And.Message}");
}
finally
{
Expand Down Expand Up @@ -790,6 +793,9 @@ public async Task DPS_Registration_AmqpWs_X509_IndividualEnrollment_InvalidGloba
string groupId = "")
{
ProvisioningClientOptions clientOptions = CreateProvisioningClientOptionsFromName(transportSettings);
// Disable retries for the provisioning client rather than letting it retry indefinitely, so that the
// expected ProvisioningClientException is thrown before the cancellation token is signaled.
clientOptions.RetryPolicy = new ProvisioningClientNoRetry();
AuthenticationProvider auth = null;

try
Expand Down Expand Up @@ -817,7 +823,7 @@ public async Task DPS_Registration_AmqpWs_X509_IndividualEnrollment_InvalidGloba
Func<Task> act = async () => await provClient.RegisterAsync(cts.Token);
var exception = await act.Should().ThrowAsync<ProvisioningClientException>().ConfigureAwait(false);

VerboseTestLogger.WriteLine($"Exception: {exception}");
VerboseTestLogger.WriteLine($"Exception: {exception.And.Message}");
}
finally
{
Expand Down Expand Up @@ -858,8 +864,8 @@ public static ProvisioningClientOptions CreateProvisioningClientOptionsFromName(
if (transportSettings is IotHubClientMqttSettings)
{
return transportSettings.Protocol == IotHubClientTransportProtocol.Tcp
? new ProvisioningClientOptions(new ProvisioningClientAmqpSettings(ProvisioningClientTransportProtocol.Tcp))
: new ProvisioningClientOptions(new ProvisioningClientAmqpSettings(ProvisioningClientTransportProtocol.WebSocket));
? new ProvisioningClientOptions(new ProvisioningClientMqttSettings(ProvisioningClientTransportProtocol.Tcp))
: new ProvisioningClientOptions(new ProvisioningClientMqttSettings(ProvisioningClientTransportProtocol.WebSocket));
}

throw new NotSupportedException($"Unknown transport: '{transportSettings}'.");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ public void IncrementalDelayRetryPolicy_IncrementsInSteps()
retryPolicy.ShouldRetry(i, new IotHubClientException("", true), out TimeSpan retryInterval);
retryInterval.TotalSeconds.Should().Be(step.TotalSeconds * i);
}

}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ public void IncrementalDelayRetryPolicy_IncrementsInSteps()
retryPolicy.ShouldRetry(i, new IotHubServiceException("") { IsTransient = true }, out TimeSpan retryInterval);
retryInterval.TotalSeconds.Should().Be(step.TotalSeconds * i);
}

}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ public void RetryPolicyBase_ObservesInifiniteRetries()
[TestMethod]
[DataRow(true)]
[DataRow(false)]
public void RetryPolicyBase_IotHubException_ReturnsTrueWhenTransient(bool isTransient)
public void RetryPolicyBase_IotHubServiceException_ReturnsTrueWhenTransient(bool isTransient)
{
// arrange
var retryPolicy = new IotHubServiceTestRetryPolicy(0);
Expand Down
11 changes: 11 additions & 0 deletions provisioning/device/src/ProvisioningClientOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,16 @@ public string AdditionalUserAgentInfo
set => UserAgentInfo.Extra = value;
}

/// <summary>
/// Sets the retry policy used in the operation retries.
/// </summary>
/// <remarks>
/// Defaults to a nearly infinite exponential backoff. If set to null, will use <see cref="ProvisioningClientNoRetry"/> to perform no retries.
/// Can be set to any of the built in retry policies such as <see cref="ProvisioningClientFixedDelayRetryPolicy"/> or <see cref="ProvisioningClientIncrementalDelayRetryPolicy"/>
/// or a custom one by implementing <see cref="IProvisioningClientRetryPolicy"/>.
/// </remarks>
public IProvisioningClientRetryPolicy RetryPolicy { get; set; } = new ProvisioningClientExponentialBackoffRetryPolicy(0, TimeSpan.FromHours(12), true);

internal ProductInfo UserAgentInfo { get; } = new();

internal ProvisioningClientOptions Clone()
Expand All @@ -61,6 +71,7 @@ internal ProvisioningClientOptions Clone()
return new ProvisioningClientOptions(transport)
{
AdditionalUserAgentInfo = AdditionalUserAgentInfo,
RetryPolicy = RetryPolicy,
};
}
}
Expand Down
19 changes: 17 additions & 2 deletions provisioning/device/src/ProvisioningDeviceClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ public class ProvisioningDeviceClient
private readonly AuthenticationProvider _authentication;
private readonly ProvisioningClientOptions _options;
private readonly ProvisioningTransportHandler _provisioningTransportHandler;
private readonly IProvisioningClientRetryPolicy _retryPolicy;
private readonly RetryHandler _retryHandler;

/// <summary>
/// Creates an instance of this class.
Expand Down Expand Up @@ -47,6 +49,8 @@ public class ProvisioningDeviceClient
_globalDeviceEndpoint = globalDeviceEndpoint;
_idScope = idScope;
_authentication = authenticationProvider;
_retryPolicy = _options.RetryPolicy ?? new ProvisioningClientNoRetry();
_retryHandler = new RetryHandler(_retryPolicy);

Logging.Associate(this, _authentication);
Logging.Associate(this, _options);
Expand All @@ -73,13 +77,24 @@ public Task<DeviceRegistrationResult> RegisterAsync(CancellationToken cancellati
/// </param>
/// <param name="cancellationToken">The cancellation token.</param>
/// <returns>The registration result.</returns>
public async Task<DeviceRegistrationResult> RegisterAsync(RegistrationRequestPayload data, CancellationToken cancellationToken = default)
{
    Logging.RegisterAsync(this, _globalDeviceEndpoint, _idScope, _options, _authentication);

    // A null payload is allowed; in that case no JSON data is attached to the request.
    var request = new ProvisioningTransportRegisterRequest(_globalDeviceEndpoint, _idScope, _authentication, data?.JsonData);

    // Captured by the lambda below; it is assigned by whichever attempt succeeds.
    DeviceRegistrationResult result = null;

    // Each attempt goes through the retry handler, which was built from the retry policy
    // configured on the client options.
    await _retryHandler
        .RunWithRetryAsync(
            async () =>
            {
                // Library code: do not resume on the caller's synchronization context,
                // matching the outer await.
                result = await _provisioningTransportHandler
                    .RegisterAsync(request, cancellationToken)
                    .ConfigureAwait(false);
            },
            cancellationToken)
        .ConfigureAwait(false);

    return result;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.

using System;

namespace Microsoft.Azure.Devices.Provisioning.Client
{
/// <summary>
/// Represents a retry policy for the DPS device client.
/// </summary>
/// <remarks>
/// An implementation is supplied to the client through <see cref="ProvisioningClientOptions.RetryPolicy"/>.
/// </remarks>
public interface IProvisioningClientRetryPolicy
{
/// <summary>
/// Method called by the client prior to a retry.
/// </summary>
/// <param name="currentRetryCount">The number of times the current operation has been attempted.</param>
/// <param name="lastException">The exception that caused this retry policy check.</param>
/// <param name="retryDelay">
/// Set this to the desired delay prior to the next attempt. As an <c>out</c> parameter it must be assigned
/// on every code path, including when the method returns false.
/// </param>
/// <returns>True if the operation should be retried, otherwise false.</returns>
bool ShouldRetry(uint currentRetryCount, Exception lastException, out TimeSpan retryDelay);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.

using System;

namespace Microsoft.Azure.Devices.Provisioning.Client
{
/// <summary>
/// Represents a retry policy that performs a specified number of retries, using an exponential back-off scheme, with optional jitter,
/// to determine the interval between retries.
/// </summary>
/// <remarks>
/// Jitter can change the delay from 95% to 105% of the calculated time.
/// </remarks>
public class ProvisioningClientExponentialBackoffRetryPolicy : ProvisioningClientRetryPolicyBase
{
    // If we start with an exponent of 1 to calculate the number of millisecond delay, it starts too low and takes too long to get over 1 second.
    // So we always add 6 to the retry count to start at 2^7=128 milliseconds, and exceed 1 second delay on retry #4.
    private const uint MinExponent = 6u;

    // Upper bound for the exponent: the computed delay never exceeds 2^32 milliseconds (~49.7 days) before jitter.
    private const uint MaxExponent = 32u;

    private readonly TimeSpan _maxDelay;
    private readonly bool _useJitter;

    /// <summary>
    /// Creates an instance of this class.
    /// </summary>
    /// <param name="maxRetries">The maximum number of retry attempts; use 0 for infinite retries.</param>
    /// <param name="maxWait">
    /// The maximum amount of time to wait between retries. Regardless of this value, the computed delay
    /// (before jitter) will not exceed 2^32 milliseconds, which is roughly 49.7 days.
    /// </param>
    /// <param name="useJitter">Whether to add a small, random adjustment to the retry delay to avoid synchronicity in clients retrying.</param>
    /// <exception cref="ArgumentOutOfRangeException">Thrown if the value of <paramref name="maxWait"/> is negative.</exception>
    public ProvisioningClientExponentialBackoffRetryPolicy(uint maxRetries, TimeSpan maxWait, bool useJitter = true)
        : base(maxRetries)
    {
        Argument.AssertNotNegativeValue(maxWait, nameof(maxWait));

        _maxDelay = maxWait;
        _useJitter = useJitter;
    }

    /// <summary>
    /// Returns true if, based on the parameters, the operation should be retried.
    /// </summary>
    /// <param name="currentRetryCount">How many times the operation has been retried.</param>
    /// <param name="lastException">Operation exception.</param>
    /// <param name="retryInterval">Next retry should be performed after this time interval.</param>
    /// <returns>True if the operation should be retried, false otherwise.</returns>
    public override bool ShouldRetry(uint currentRetryCount, Exception lastException, out TimeSpan retryInterval)
    {
        // The base policy decides whether a retry is allowed at all and assigns retryInterval when it is not.
        if (!base.ShouldRetry(currentRetryCount, lastException, out retryInterval))
        {
            return false;
        }

        // Clamp BEFORE adding the offset. Adding first is unsigned arithmetic that wraps around
        // (or throws in a checked build) when currentRetryCount is within MinExponent of
        // uint.MaxValue, which would collapse the back-off to a tiny delay.
        uint exponent = currentRetryCount >= MaxExponent - MinExponent
            ? MaxExponent
            : currentRetryCount + MinExponent;

        // 2 to the power of the (offset) retry count gives us exponential back-off, in milliseconds.
        double exponentialIntervalMs = Math.Pow(2.0, exponent);

        // Never wait longer than the caller-configured maximum.
        double clampedWaitMs = Math.Min(exponentialIntervalMs, _maxDelay.TotalMilliseconds);

        retryInterval = _useJitter
            ? UpdateWithJitter(clampedWaitMs)
            : TimeSpan.FromMilliseconds(clampedWaitMs);

        return true;
    }
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.

using System;

namespace Microsoft.Azure.Devices.Provisioning.Client
{
/// <summary>
/// A retry policy that retries up to a specified number of times, waiting the same delay before
/// each attempt, optionally adjusted by jitter.
/// </summary>
/// <remarks>
/// Jitter can change the delay from 95% to 105% of the calculated time.
/// </remarks>
public class ProvisioningClientFixedDelayRetryPolicy : ProvisioningClientRetryPolicyBase
{
    private readonly TimeSpan _delay;
    private readonly bool _applyJitter;

    /// <summary>
    /// Creates an instance of this class.
    /// </summary>
    /// <param name="maxRetries">The maximum number of retry attempts; use 0 for infinite retries.</param>
    /// <param name="fixedDelay">The constant delay to wait between retries.</param>
    /// <param name="useJitter">When true, a small random adjustment is applied to the delay so that retrying clients do not synchronize.</param>
    /// <exception cref="ArgumentOutOfRangeException">Thrown if <paramref name="fixedDelay"/> is negative.</exception>
    public ProvisioningClientFixedDelayRetryPolicy(uint maxRetries, TimeSpan fixedDelay, bool useJitter = true)
        : base(maxRetries)
    {
        // Validate before storing anything.
        Argument.AssertNotNegativeValue(fixedDelay, nameof(fixedDelay));

        _applyJitter = useJitter;
        _delay = fixedDelay;
    }

    /// <summary>
    /// Decides whether the operation should be retried and, if so, how long to wait first.
    /// </summary>
    /// <param name="currentRetryCount">How many times the operation has been retried.</param>
    /// <param name="lastException">The exception raised by the failed operation.</param>
    /// <param name="retryInterval">The interval to wait before the next retry.</param>
    /// <returns>True if the operation should be retried, false otherwise.</returns>
    public override bool ShouldRetry(uint currentRetryCount, Exception lastException, out TimeSpan retryInterval)
    {
        bool retryAllowed = base.ShouldRetry(currentRetryCount, lastException, out retryInterval);

        if (retryAllowed)
        {
            // Only override the base-provided interval when a retry will actually happen.
            if (_applyJitter)
            {
                retryInterval = UpdateWithJitter(_delay.TotalMilliseconds);
            }
            else
            {
                retryInterval = _delay;
            }
        }

        return retryAllowed;
    }
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.

using System;

namespace Microsoft.Azure.Devices.Provisioning.Client
{
/// <summary>
/// A retry policy that retries up to a specified number of times, where the delay grows by a fixed
/// increment with each retry (up to a maximum) and is optionally adjusted by jitter.
/// </summary>
/// <remarks>
/// Jitter can change the delay from 95% to 105% of the calculated time.
/// </remarks>
public class ProvisioningClientIncrementalDelayRetryPolicy : ProvisioningClientRetryPolicyBase
{
    /// <summary>
    /// Creates an instance of this class.
    /// </summary>
    /// <param name="maxRetries">The maximum number of retry attempts; use 0 for infinite retries.</param>
    /// <param name="delayIncrement">How much the delay grows for each additional retry.</param>
    /// <param name="maxDelay">The upper bound on the time to wait between retries.</param>
    /// <param name="useJitter">When true, a small random adjustment is applied to the delay so that retrying clients do not synchronize.</param>
    /// <exception cref="ArgumentOutOfRangeException">
    /// Thrown if <paramref name="delayIncrement"/> or <paramref name="maxDelay"/> is negative.
    /// </exception>
    public ProvisioningClientIncrementalDelayRetryPolicy(uint maxRetries, TimeSpan delayIncrement, TimeSpan maxDelay, bool useJitter = true)
        : base(maxRetries)
    {
        // Validate both durations (increment first, then maximum) before storing anything.
        Argument.AssertNotNegativeValue(delayIncrement, nameof(delayIncrement));
        Argument.AssertNotNegativeValue(maxDelay, nameof(maxDelay));

        UseJitter = useJitter;
        MaxDelay = maxDelay;
        DelayIncrement = delayIncrement;
    }

    /// <summary>
    /// How much the delay grows for each additional retry.
    /// </summary>
    internal protected TimeSpan DelayIncrement { get; }

    /// <summary>
    /// The upper bound on the time to wait between retries.
    /// </summary>
    internal protected TimeSpan MaxDelay { get; }

    /// <summary>
    /// Whether a small, random adjustment is applied to the retry delay to avoid synchronicity in clients retrying.
    /// </summary>
    internal protected bool UseJitter { get; }

    /// <summary>
    /// Decides whether the operation should be retried and, if so, how long to wait first.
    /// </summary>
    /// <param name="currentRetryCount">How many times the operation has been retried.</param>
    /// <param name="lastException">The exception raised by the failed operation.</param>
    /// <param name="retryInterval">The interval to wait before the next retry.</param>
    /// <returns>True if the operation should be retried, false otherwise.</returns>
    public override bool ShouldRetry(uint currentRetryCount, Exception lastException, out TimeSpan retryInterval)
    {
        bool retryAllowed = base.ShouldRetry(currentRetryCount, lastException, out retryInterval);
        if (!retryAllowed)
        {
            return false;
        }

        // The delay grows linearly with the retry count and is capped at MaxDelay.
        double linearDelayMs = currentRetryCount * DelayIncrement.TotalMilliseconds;
        double cappedDelayMs = Math.Min(linearDelayMs, MaxDelay.TotalMilliseconds);

        if (UseJitter)
        {
            retryInterval = UpdateWithJitter(cappedDelayMs);
        }
        else
        {
            retryInterval = TimeSpan.FromMilliseconds(cappedDelayMs);
        }

        return true;
    }
}
}

0 comments on commit 6448bff

Please sign in to comment.