Skip to content

Commit

Permalink
EdgeHub: Add support for a RetryingClient to handle SDK issues (#1239) (
Browse files Browse the repository at this point in the history
#1255)

* EdgeHub: Add support for a RetryingClient to handle SDK issues (#1239)

* Add support for retrying CloudProxy

* Add Tests

* Add / fix tests

* Cleanup and fix test

* Fix build

* Update RetryingCloudProxyTest.cs

* Fix build

* Fix RetryCount usage

* Fix RetryingCloudProxy loop

* Update RetryingCloudProxy.cs

* remove env var (#1247) (#1253)

* Dispose client on getting an Object Disposed exception (#1211) (#1251)

* Kill EA on ObjDispException (#1254)

* EdgeHub: Add support for a RetryingClient to handle SDK issues (#1239)

* Add support for retrying CloudProxy

* Add Tests

* Add / fix tests

* Cleanup and fix test

* Fix build

* Update RetryingCloudProxyTest.cs

* Fix build

* Fix RetryCount usage

* Fix RetryingCloudProxy loop

* Update RetryingCloudProxy.cs

* Fix build
  • Loading branch information
varunpuranik committed May 24, 2019
1 parent bbc8d3c commit 7598ef0
Show file tree
Hide file tree
Showing 8 changed files with 607 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -224,12 +224,14 @@ public async Task SendFeedbackMessageAsync(string messageId, FeedbackStatus feed
public Task RemoveDesiredPropertyUpdatesAsync() =>
this.EnsureCloudReceiver(nameof(this.RemoveDesiredPropertyUpdatesAsync)) ? this.cloudReceiver.RemoveDesiredPropertyUpdatesAsync() : Task.CompletedTask;

public void StartListening()
public Task StartListening()
{
if (this.EnsureCloudReceiver(nameof(this.RemoveDesiredPropertyUpdatesAsync)))
{
this.cloudReceiver.StartListening();
}

return Task.CompletedTask;
}

// This API is to be used for Tests only.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,16 +81,24 @@ public Option<IDeviceProxy> GetDeviceConnection(string id)
}

public async Task<Option<ICloudProxy>> GetCloudConnection(string id)
{
Try<ICloudProxy> cloudProxyTry = await this.TryGetCloudConnection(id);
return cloudProxyTry
.Ok()
.Map(c => (ICloudProxy)new RetryingCloudProxy(() => this.TryGetCloudConnection(id), c));
}

async Task<Try<ICloudProxy>> TryGetCloudConnection(string id)
{
IIdentity identity = this.identityProvider.Create(Preconditions.CheckNonWhiteSpace(id, nameof(id)));
ConnectedDevice device = this.GetOrCreateConnectedDevice(identity);

Try<ICloudConnection> cloudConnectionTry = await device.GetOrCreateCloudConnection(
c => this.cloudConnectionProvider.Connect(c.Identity, (i, status) => this.CloudConnectionStatusChangedHandler(i, status)));
c => this.cloudConnectionProvider.Connect(c.Identity, this.CloudConnectionStatusChangedHandler));

Events.GetCloudConnection(device.Identity, cloudConnectionTry);
Try<ICloudProxy> cloudProxyTry = GetCloudProxyFromCloudConnection(cloudConnectionTry, device.Identity);
return cloudProxyTry.Ok();
return cloudProxyTry;
}

public void AddSubscription(string id, DeviceSubscription deviceSubscription)
Expand Down Expand Up @@ -134,7 +142,9 @@ public async Task<Try<ICloudProxy>> CreateCloudConnectionAsync(IClientCredential
Try<ICloudConnection> newCloudConnection = await device.CreateOrUpdateCloudConnection(c => this.CreateOrUpdateCloudConnection(c, credentials));
Events.NewCloudConnection(credentials.Identity, newCloudConnection);
Try<ICloudProxy> cloudProxyTry = GetCloudProxyFromCloudConnection(newCloudConnection, credentials.Identity);
return cloudProxyTry;
return cloudProxyTry.Success
? Try.Success((ICloudProxy)new RetryingCloudProxy(() => this.TryGetCloudConnection(credentials.Identity.Id), cloudProxyTry.Value))
: cloudProxyTry;
}

// This method is not used, but it has important logic and this will be useful for offline scenarios.
Expand All @@ -150,11 +160,13 @@ public async Task<Try<ICloudProxy>> GetOrCreateCloudConnectionAsync(IClientCrede
Try<ICloudConnection> cloudConnectionTry = await device.GetOrCreateCloudConnection((c) => this.CreateOrUpdateCloudConnection(c, credentials));
Events.GetCloudConnection(credentials.Identity, cloudConnectionTry);
Try<ICloudProxy> cloudProxyTry = GetCloudProxyFromCloudConnection(cloudConnectionTry, credentials.Identity);
return cloudProxyTry;
return cloudProxyTry.Success
? Try.Success((ICloudProxy)new RetryingCloudProxy(() => this.TryGetCloudConnection(credentials.Identity.Id), cloudProxyTry.Value))
: cloudProxyTry;
}

static Try<ICloudProxy> GetCloudProxyFromCloudConnection(Try<ICloudConnection> cloudConnection, IIdentity identity) => cloudConnection.Success
? cloudConnection.Value.CloudProxy.Map(cp => Try.Success(cp))
? cloudConnection.Value.CloudProxy.Map(Try.Success)
.GetOrElse(() => Try<ICloudProxy>.Failure(new EdgeHubConnectionException($"Unable to get cloud proxy for device {identity.Id}")))
: Try<ICloudProxy>.Failure(cloudConnection.Exception);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,6 @@ public interface ICloudProxy

Task RemoveDesiredPropertyUpdatesAsync();

void StartListening();
Task StartListening();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
// Copyright (c) Microsoft. All rights reserved.
namespace Microsoft.Azure.Devices.Edge.Hub.Core.Cloud
{
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using Microsoft.Azure.Devices.Edge.Util;
using Nito.AsyncEx;

public class RetryingCloudProxy : ICloudProxy
{
const int RetryCount = 3;
readonly AsyncLock cloudProxyLock = new AsyncLock();
readonly Func<Task<Try<ICloudProxy>>> cloudProxyGetter;

ICloudProxy innerCloudProxy;

public RetryingCloudProxy(Func<Task<Try<ICloudProxy>>> cloudProxyGetter, ICloudProxy cloudProxyImplementation)
{
this.cloudProxyGetter = Preconditions.CheckNotNull(cloudProxyGetter, nameof(cloudProxyGetter));
this.innerCloudProxy = Preconditions.CheckNotNull(cloudProxyImplementation, nameof(cloudProxyImplementation));
}

public bool IsActive => this.innerCloudProxy.IsActive;

internal ICloudProxy InnerCloudProxy => this.innerCloudProxy;

public Task<bool> CloseAsync() => this.ExecuteOperation(c => c.CloseAsync());

public Task<bool> OpenAsync() => this.ExecuteOperation(c => c.OpenAsync());

public Task SendMessageAsync(IMessage message) => this.ExecuteOperation(c => c.SendMessageAsync(message));

public Task SendMessageBatchAsync(IEnumerable<IMessage> inputMessages) => this.ExecuteOperation(c => c.SendMessageBatchAsync(inputMessages));

public Task UpdateReportedPropertiesAsync(IMessage reportedPropertiesMessage) => this.ExecuteOperation(c => c.UpdateReportedPropertiesAsync(reportedPropertiesMessage));

public Task<IMessage> GetTwinAsync() => this.ExecuteOperation(c => c.GetTwinAsync());

public Task SendFeedbackMessageAsync(string messageId, FeedbackStatus feedbackStatus) => this.ExecuteOperation(c => c.SendFeedbackMessageAsync(messageId, feedbackStatus));

public Task SetupCallMethodAsync() => this.ExecuteOperation(c => c.SetupCallMethodAsync());

public Task RemoveCallMethodAsync() => this.ExecuteOperation(c => c.RemoveCallMethodAsync());

public Task SetupDesiredPropertyUpdatesAsync() => this.ExecuteOperation(c => c.SetupDesiredPropertyUpdatesAsync());

public Task RemoveDesiredPropertyUpdatesAsync() => this.ExecuteOperation(c => c.RemoveDesiredPropertyUpdatesAsync());

public Task StartListening() => this.ExecuteOperation(c => c.StartListening());

Task ExecuteOperation(Func<ICloudProxy, Task> func) => this.ExecuteOperation(
async c =>
{
await func(c);
return 1;
});

async Task<T> ExecuteOperation<T>(Func<ICloudProxy, Task<T>> func)
{
int i = 0;
while (true)
{
ICloudProxy cloudProxy = await this.GetCloudProxy();
try
{
return await func(cloudProxy);
}
catch (Exception)
{
if (cloudProxy.IsActive || ++i == RetryCount)
{
throw;
}
}
}
}

async Task<ICloudProxy> GetCloudProxy()
{
if (!this.innerCloudProxy.IsActive)
{
using (await this.cloudProxyLock.LockAsync())
{
if (!this.innerCloudProxy.IsActive)
{
Try<ICloudProxy> cloudProxyTry = await this.cloudProxyGetter();
if (!cloudProxyTry.Success)
{
throw new EdgeHubIOException("Unable to create IoTHub connection", cloudProxyTry.Exception);
}

this.innerCloudProxy = cloudProxyTry.Value;
}
}
}

return this.innerCloudProxy;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -162,12 +162,12 @@ public async Task CloudProxyNullReceiverTest()
ICloudProxy cloudProxy = await this.GetCloudProxyWithConnectionStringKey(deviceConnectionStringKey);

// Act/assert
// Without setting up the cloudlistener, the following methods should not throw.
// Without setting up the CloudListener, the following methods should not throw.
await cloudProxy.SetupCallMethodAsync();
await cloudProxy.RemoveCallMethodAsync();
await cloudProxy.SetupDesiredPropertyUpdatesAsync();
await cloudProxy.RemoveDesiredPropertyUpdatesAsync();
cloudProxy.StartListening();
await cloudProxy.StartListening();
}

[Fact]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,7 @@ public async Task TestDisableTimerOnC2DSubscription()
ICloudProxy cloudProxy = new CloudProxy(client.Object, messageConverterProvider.Object, "device1", null, cloudListener.Object, idleTimeout, true);

// Act
cloudProxy.StartListening();
await cloudProxy.StartListening();

// Assert
Assert.True(cloudProxy.IsActive);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ public async Task CloudConnectionTest()

returnedValue = await connectionManager.GetCloudConnection(deviceCredentials1.Identity.Id);
Assert.True(returnedValue.HasValue);
Assert.Equal(cloudProxy1.Value, returnedValue.OrDefault());
Assert.Equal(((RetryingCloudProxy)cloudProxy1.Value).InnerCloudProxy, ((RetryingCloudProxy)returnedValue.OrDefault()).InnerCloudProxy);

Try<ICloudProxy> cloudProxy2 = await connectionManager.CreateCloudConnectionAsync(deviceCredentials2);
Assert.True(cloudProxy2.Success);
Expand Down Expand Up @@ -280,9 +280,9 @@ public async Task GetOrCreateCloudProxyTest()
Assert.True(cloudProxy1.Success);
Assert.True(cloudProxy2.Success);
Assert.True(cloudProxy3.Success);
Assert.Equal(cloudProxyMock1, cloudProxy1.Value);
Assert.Equal(cloudProxyMock2, cloudProxy2.Value);
Assert.Equal(cloudProxyMock1, cloudProxy3.Value);
Assert.Equal(cloudProxyMock1, ((RetryingCloudProxy)cloudProxy1.Value).InnerCloudProxy);
Assert.Equal(cloudProxyMock2, ((RetryingCloudProxy)cloudProxy2.Value).InnerCloudProxy);
Assert.Equal(cloudProxyMock1, ((RetryingCloudProxy)cloudProxy3.Value).InnerCloudProxy);
cloudProxyProviderMock.Verify(c => c.Connect(It.IsAny<IClientCredentials>(), It.IsAny<Action<string, CloudConnectionStatus>>()), Times.Exactly(2));
}

Expand Down Expand Up @@ -328,9 +328,9 @@ public async Task CreateCloudProxyTest()
Assert.NotEqual(cloudProxies[0].Value, cloudProxies[1].Value);

Option<ICloudProxy> currentCloudProxyId1 = await connectionManager.GetCloudConnection(module1Credentials.Identity.Id);
ICloudProxy currentCloudProxy = currentCloudProxyId1.OrDefault();
ICloudProxy cloudProxy1 = cloudProxies[0].Value;
ICloudProxy cloudProxy2 = cloudProxies[1].Value;
ICloudProxy currentCloudProxy = ((RetryingCloudProxy)currentCloudProxyId1.OrDefault()).InnerCloudProxy;
ICloudProxy cloudProxy1 = ((RetryingCloudProxy)cloudProxies[0].Value).InnerCloudProxy;
ICloudProxy cloudProxy2 = ((RetryingCloudProxy)cloudProxies[1].Value).InnerCloudProxy;
Assert.True(currentCloudProxy == cloudProxy1 || currentCloudProxy == cloudProxy2);
if (currentCloudProxy == cloudProxy1)
{
Expand Down Expand Up @@ -844,9 +844,9 @@ public async Task GetMultipleCloudProxiesTest()
Assert.True(cloudProxies[1].HasValue);
Assert.True(cloudProxies[2].HasValue);
Assert.True(cloudProxies[3].HasValue);
Assert.Equal(cloudProxies[0].OrDefault(), cloudProxies[1].OrDefault());
Assert.Equal(cloudProxies[0].OrDefault(), cloudProxies[2].OrDefault());
Assert.Equal(cloudProxies[0].OrDefault(), cloudProxies[3].OrDefault());
Assert.Equal(((RetryingCloudProxy)cloudProxies[0].OrDefault()).InnerCloudProxy, ((RetryingCloudProxy)cloudProxies[1].OrDefault()).InnerCloudProxy);
Assert.Equal(((RetryingCloudProxy)cloudProxies[0].OrDefault()).InnerCloudProxy, ((RetryingCloudProxy)cloudProxies[2].OrDefault()).InnerCloudProxy);
Assert.Equal(((RetryingCloudProxy)cloudProxies[0].OrDefault()).InnerCloudProxy, ((RetryingCloudProxy)cloudProxies[3].OrDefault()).InnerCloudProxy);

// Act
await cloudProxies[0].OrDefault().CloseAsync();
Expand Down
Loading

0 comments on commit 7598ef0

Please sign in to comment.