Skip to content

Commit

Permalink
Dispose all clients when network disconnection (#2291) (#2315)
Browse files Browse the repository at this point in the history
* Dispose all clients when network disconnects
  • Loading branch information
ancaantochi committed Jan 23, 2020
1 parent acf6bf3 commit 4269881
Show file tree
Hide file tree
Showing 11 changed files with 124 additions and 42 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ public void Dispose()
{
this.deviceConnectivityManager.DeviceConnected -= this.HandleDeviceConnectedEvent;
this.deviceConnectivityManager.DeviceDisconnected -= this.HandleDeviceDisconnectedEvent;
this.isConnected.Set(false);
this.underlyingClient?.Dispose();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,15 @@ public DeviceClientWrapper(DeviceClient deviceClient)

public Task AbandonAsync(string messageId) => this.underlyingDeviceClient.AbandonAsync(messageId);

public Task CloseAsync() => this.isActive.GetAndSet(false)
? this.underlyingDeviceClient.CloseAsync()
: Task.CompletedTask;
public Task CloseAsync()
{
if (this.isActive.GetAndSet(false))
{
this.underlyingDeviceClient?.Dispose();
}

return Task.CompletedTask;
}

public Task CompleteAsync(string messageId) => this.underlyingDeviceClient.CompleteAsync(messageId);

Expand All @@ -47,6 +53,7 @@ public async Task OpenAsync()
catch (Exception)
{
this.isActive.Set(false);
this.underlyingDeviceClient?.Dispose();
throw;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,15 @@ public ModuleClientWrapper(ModuleClient moduleClient)

public Task AbandonAsync(string messageId) => this.underlyingModuleClient.AbandonAsync(messageId);

public Task CloseAsync() => this.isActive.GetAndSet(false)
? this.underlyingModuleClient.CloseAsync()
: Task.CompletedTask;
public Task CloseAsync()
{
if (this.isActive.GetAndSet(false))
{
this.underlyingModuleClient?.Dispose();
}

return Task.CompletedTask;
}

public Task RejectAsync(string messageId) => throw new InvalidOperationException("Reject is not supported for modules.");

Expand All @@ -50,6 +56,7 @@ public async Task OpenAsync()
catch (Exception)
{
this.isActive.Set(false);
this.underlyingModuleClient?.Dispose();
throw;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,22 +21,27 @@ public class ConnectionManager : IConnectionManager
{
const int DefaultMaxClients = 101; // 100 Clients + 1 Edgehub
readonly object deviceConnLock = new object();
readonly AsyncLock connectToCloudLock = new AsyncLock();
readonly ConcurrentDictionary<string, ConnectedDevice> devices = new ConcurrentDictionary<string, ConnectedDevice>();
readonly ICloudConnectionProvider cloudConnectionProvider;
readonly int maxClients;
readonly ICredentialsCache credentialsCache;
readonly IIdentityProvider identityProvider;
readonly IDeviceConnectivityManager connectivityManager;

public ConnectionManager(
ICloudConnectionProvider cloudConnectionProvider,
ICredentialsCache credentialsCache,
IIdentityProvider identityProvider,
IDeviceConnectivityManager connectivityManager,
int maxClients = DefaultMaxClients)
{
this.cloudConnectionProvider = Preconditions.CheckNotNull(cloudConnectionProvider, nameof(cloudConnectionProvider));
this.maxClients = Preconditions.CheckRange(maxClients, 1, nameof(maxClients));
this.credentialsCache = Preconditions.CheckNotNull(credentialsCache, nameof(credentialsCache));
this.identityProvider = Preconditions.CheckNotNull(identityProvider, nameof(identityProvider));
this.connectivityManager = Preconditions.CheckNotNull(connectivityManager, nameof(connectivityManager));
this.connectivityManager.DeviceDisconnected += (o, args) => this.HandleDeviceCloudConnectionDisconnected();
Util.Metrics.MetricsV0.RegisterGaugeCallback(() => MetricsV0.SetConnectedClientCountGauge(this));
}

Expand Down Expand Up @@ -94,7 +99,7 @@ async Task<Try<ICloudProxy>> TryGetCloudConnection(string id)
ConnectedDevice device = this.GetOrCreateConnectedDevice(identity);

Try<ICloudConnection> cloudConnectionTry = await device.GetOrCreateCloudConnection(
c => this.cloudConnectionProvider.Connect(c.Identity, this.CloudConnectionStatusChangedHandler));
c => this.ConnectToCloud(c.Identity, this.CloudConnectionStatusChangedHandler));

Events.GetCloudConnection(device.Identity, cloudConnectionTry);
Try<ICloudProxy> cloudProxyTry = GetCloudProxyFromCloudConnection(cloudConnectionTry, device.Identity);
Expand Down Expand Up @@ -210,7 +215,7 @@ await device.CloudConnection.Filter(cp => cp.IsActive)
return Try<ICloudConnection>.Failure(new EdgeHubConnectionException($"Error updating identity for device {device.Identity.Id}", ex));
}
})
.GetOrElse(() => this.cloudConnectionProvider.Connect(credentials, (identity, status) => this.CloudConnectionStatusChangedHandler(identity, status)));
.GetOrElse(() => this.ConnectToCloud(credentials, this.CloudConnectionStatusChangedHandler));

async void CloudConnectionStatusChangedHandler(
string deviceId,
Expand Down Expand Up @@ -265,12 +270,6 @@ await device.CloudConnection.Filter(cp => cp.IsActive)
case CloudConnectionStatus.Disconnected:
Events.InvokingCloudConnectionLostEvent(device.Identity);
this.CloudConnectionLost?.Invoke(this, device.Identity);
await device.CloudConnection.Filter(cp => cp.IsActive).ForEachAsync(
cp =>
{
Events.CloudConnectionLostClosingClient(device.Identity);
return cp.CloseAsync();
});
break;

case CloudConnectionStatus.ConnectionEstablished:
Expand All @@ -280,6 +279,24 @@ await device.CloudConnection.Filter(cp => cp.IsActive)
}
}

async void HandleDeviceCloudConnectionDisconnected()
{
using (await this.connectToCloudLock.LockAsync())
{
KeyValuePair<string, ConnectedDevice>[] snapshot = this.devices.ToArray();
Events.CloudConnectionLostClosingAllClients();
foreach (var item in snapshot)
{
await item.Value.CloudConnection.Filter(cp => cp.IsActive).ForEachAsync(
cp =>
{
Events.CloudConnectionLostClosingClient(item.Value.Identity);
return cp.CloseAsync();
});
}
}
}

ConnectedDevice GetOrCreateConnectedDevice(IIdentity identity)
{
string deviceId = Preconditions.CheckNotNull(identity, nameof(identity)).Id;
Expand Down Expand Up @@ -311,6 +328,22 @@ ConnectedDevice CreateNewConnectedDevice(IIdentity identity)
}
}

async Task<Try<ICloudConnection>> ConnectToCloud(IIdentity identity, Action<string, CloudConnectionStatus> connectionStatusChangedHandler)
{
using (await this.connectToCloudLock.LockAsync())
{
return await this.cloudConnectionProvider.Connect(identity, connectionStatusChangedHandler);
}
}

async Task<Try<ICloudConnection>> ConnectToCloud(IClientCredentials credentials, Action<string, CloudConnectionStatus> connectionStatusChangedHandler)
{
using (await this.connectToCloudLock.LockAsync())
{
return await this.cloudConnectionProvider.Connect(credentials, connectionStatusChangedHandler);
}
}

class ConnectedDevice
{
// Device Proxy methods are sync coming from the Protocol gateway,
Expand Down Expand Up @@ -440,7 +473,8 @@ enum EventIds
InvokingCloudConnectionLostEvent,
InvokingCloudConnectionEstablishedEvent,
HandlingConnectionStatusChangedHandler,
CloudConnectionLostClosingClient
CloudConnectionLostClosingClient,
CloudConnectionLostClosingAllClients
}

public static void NewCloudConnection(IIdentity identity, Try<ICloudConnection> cloudConnection)
Expand Down Expand Up @@ -501,6 +535,11 @@ internal static void GetCloudConnection(IIdentity identity, Try<ICloudConnection
Log.LogInformation((int)EventIds.ObtainCloudConnectionError, cloudConnection.Exception, Invariant($"Error getting cloud connection for device {identity.Id}"));
}
}

public static void CloudConnectionLostClosingAllClients()
{
Log.LogDebug((int)EventIds.CloudConnectionLostClosingAllClients, Invariant($"Cloud connection lost, closing all clients."));
}
}

static class MetricsV0
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -240,12 +240,14 @@ protected override void Load(ContainerBuilder builder)
var cloudConnectionProviderTask = c.Resolve<Task<ICloudConnectionProvider>>();
var credentialsCacheTask = c.Resolve<Task<ICredentialsCache>>();
var identityProvider = c.Resolve<IIdentityProvider>();
var deviceConnectivityManager = c.Resolve<IDeviceConnectivityManager>();
ICloudConnectionProvider cloudConnectionProvider = await cloudConnectionProviderTask;
ICredentialsCache credentialsCache = await credentialsCacheTask;
IConnectionManager connectionManager = new ConnectionManager(
cloudConnectionProvider,
credentialsCache,
identityProvider,
deviceConnectivityManager,
this.maxConnectedClients);
return connectionManager;
})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -438,7 +438,8 @@ IClient GetMockedDeviceClient()
Option.None<IWebProxy>(),
productInfoStore);
cloudConnectionProvider.BindEdgeHub(Mock.Of<IEdgeHub>());
IConnectionManager connectionManager = new ConnectionManager(cloudConnectionProvider, Mock.Of<ICredentialsCache>(), new IdentityProvider(hostname));
var deviceConnectivityManager = Mock.Of<IDeviceConnectivityManager>();
IConnectionManager connectionManager = new ConnectionManager(cloudConnectionProvider, Mock.Of<ICredentialsCache>(), new IdentityProvider(hostname), deviceConnectivityManager);

ITokenCredentials clientCredentials1 = GetClientCredentials(TimeSpan.FromSeconds(10));
Try<ICloudProxy> cloudProxyTry1 = await connectionManager.CreateCloudConnectionAsync(clientCredentials1);
Expand Down
Loading

0 comments on commit 4269881

Please sign in to comment.