Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 11 additions & 4 deletions doc/dependency_decisions.yml
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
- :who: mocsharp
:why: MIT (https://github.com/microsoft/vstest/raw/v17.3.0/LICENSE)
:versions:
- 17.3.2
- 17.4.0
:when: 2022-08-16 21:39:37.382080790 Z
- - :approve
- Microsoft.Extensions.Diagnostics.HealthChecks
Expand Down Expand Up @@ -221,7 +221,7 @@
- :who: mocsharp
:why: MIT (https://github.com/microsoft/vstest/raw/v17.3.1/LICENSE)
:versions:
- 17.3.2
- 17.4.0
:when: 2022-08-16 21:39:48.253593534 Z
- - :approve
- Microsoft.NETCore.Platforms
Expand All @@ -242,14 +242,14 @@
- :who: mocsharp
:why: MIT (https://github.com/microsoft/vstest/raw/v17.3.1/LICENSE)
:versions:
- 17.3.2
- 17.4.0
:when: 2022-08-16 21:39:49.547958989 Z
- - :approve
- Microsoft.TestPlatform.TestHost
- :who: mocsharp
:why: MIT (https://github.com/microsoft/vstest/raw/v17.3.1/LICENSE)
:versions:
- 17.3.2
- 17.4.0
:when: 2022-08-16 21:39:49.963749572 Z
- - :approve
- Microsoft.Win32.Primitives
Expand Down Expand Up @@ -944,3 +944,10 @@
:versions:
- 2.4.5
:when: 2022-08-16 21:40:32.294717110 Z
- - :approve
- Polly
- :who: mocsharp
:why: MIT ( https://licenses.nuget.org/MIT)
:versions:
- 7.2.3
:when: 2022-11-09 18:57:32.294717110 Z
29 changes: 29 additions & 0 deletions src/Messaging/API/ConnectionErrorArgs.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Copyright 2021-2022 MONAI Consortium
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

using RabbitMQ.Client;

namespace Monai.Deploy.Messaging.API
{
public delegate void ConnectionErrorHandler(object? sender, ConnectionErrorArgs args);

public class ConnectionErrorArgs
{
public ConnectionErrorArgs(ShutdownEventArgs eventArgs) => ShutdownEventArguments = eventArgs ?? throw new ArgumentNullException(nameof(eventArgs));

public ShutdownEventArgs ShutdownEventArguments { get; }
}
}
5 changes: 5 additions & 0 deletions src/Messaging/API/IMessageBrokerSubscriberService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,11 @@ namespace Monai.Deploy.Messaging.API
{
public interface IMessageBrokerSubscriberService : IDisposable
{
/// <summary>
/// Gets or sets the event delegate for client to handle connection errors.
/// </summary>
event ConnectionErrorHandler? OnConnectionError;

/// <summary>
/// Gets or sets the name of the storage service.
/// </summary>
Expand Down
2 changes: 2 additions & 0 deletions src/Messaging/Tests/IServiceCollectionExtensionsTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,8 @@ internal class GoodSubscriberService : IMessageBrokerSubscriberService
{
public string Name => throw new NotImplementedException();

public event ConnectionErrorHandler? OnConnectionError;

public void Acknowledge(MessageBase message) => throw new NotImplementedException();

public void Dispose() => throw new NotImplementedException();
Expand Down
2 changes: 1 addition & 1 deletion src/Messaging/Tests/Monai.Deploy.Messaging.Tests.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
</PropertyGroup>

<ItemGroup>
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="17.3.2" />
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="17.4.0" />
<PackageReference Include="Moq" Version="4.18.2" />
<PackageReference Include="System.IO.Abstractions.TestingHelpers" Version="17.2.3" />
<PackageReference Include="xunit" Version="2.4.2" />
Expand Down
56 changes: 0 additions & 56 deletions src/Plugins/RabbitMQ/Factory/CreateChannelArguments.cs

This file was deleted.

9 changes: 0 additions & 9 deletions src/Plugins/RabbitMQ/Factory/IRabbitMQConnectionFactory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -33,14 +33,5 @@ public interface IRabbitMQConnectionFactory
/// <param name="portNumber">Port Number</param>
/// <returns>Instance of <see cref="IModel"/>.</returns>
IModel CreateChannel(string hostName, string username, string password, string virtualHost, string useSSL, string portNumber);

/// <summary>
/// Creates a new channel for RabbitMQ client.
/// The connection factory maintains a single connection to the specified
/// <c>hostName</c>, <c>username</c>, <c>password</c>, and <c>virtualHost</c> combination.
/// </summary>
/// <param name="virtualHost">Virtual host</param>
/// <returns>Instance of <see cref="IModel"/>.</returns>
IModel CreateChannel(CreateChannelArguments args);
}
}
164 changes: 29 additions & 135 deletions src/Plugins/RabbitMQ/Factory/RabbitMqConnectionFactory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Globalization;
using System.Linq;
using System.Net.Security;
Expand All @@ -30,21 +31,18 @@ namespace Monai.Deploy.Messaging.RabbitMQ
{
public class RabbitMQConnectionFactory : IRabbitMQConnectionFactory, IDisposable
{
private readonly ConcurrentDictionary<string, Lazy<ConnectionFactory>> _connectionFactoriess = new();
private readonly ConcurrentDictionary<string, Lazy<IConnection>> _connections = new();
private readonly ConcurrentDictionary<string, Lazy<IModel>> _models = new();

private readonly ConcurrentDictionary<string, Lazy<ConnectionFactory>> _connectionFactoriess;
private readonly ConcurrentDictionary<string, Lazy<IConnection>> _connections;
private readonly ILogger<RabbitMQConnectionFactory> _logger;
private bool _disposedValue;

public RabbitMQConnectionFactory(ILogger<RabbitMQConnectionFactory> logger)
{
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
_connectionFactoriess = new ConcurrentDictionary<string, Lazy<ConnectionFactory>>();
_connections = new ConcurrentDictionary<string, Lazy<IConnection>>();
}

public IModel CreateChannel(CreateChannelArguments args) =>
CreateChannel(args.HostName, args.Username, args.Password, args.VirtualHost,
args.UseSSL, args.PortNumber);

public IModel CreateChannel(string hostName, string username, string password, string virtualHost, string useSSL, string portNumber)
{
Expand All @@ -55,11 +53,6 @@ public IModel CreateChannel(string hostName, string username, string password, s

var key = $"{hostName}{username}{HashPassword(password)}{virtualHost}";

if (ConnectionIsOpen(key, out var value))
{
return value.Value;
}

var connection = _connections.AddOrUpdate(key,
x => CreatConnection(hostName, username, password, virtualHost, key, useSSL, portNumber),
(updateKey, updateConnection) =>
Expand All @@ -68,7 +61,7 @@ public IModel CreateChannel(string hostName, string username, string password, s
// - RMQ service returns before calling the next line, then IsOpen returns false
// - a call is made before RMQ returns, then a new connection
// is made with error with IsValueFaulted = true && IsValueCreated = false
if (updateConnection.IsValueCreated)
if (updateConnection.IsValueCreated && updateConnection.Value.IsOpen)
{
return updateConnection;
}
Expand All @@ -78,42 +71,34 @@ public IModel CreateChannel(string hostName, string username, string password, s
}
});

var argsObj = new CreateChannelArguments(hostName, password, username, virtualHost, useSSL, portNumber);
connection.Value.ConnectionShutdown += (connection, args) => OnShutdown(args, key, argsObj);
connection.Value.CallbackException += (connection, args) => OnException(args, key, argsObj);
connection.Value.ConnectionShutdown += (sender, args) => ConnectionShutdown(args, connection.Value, key);
connection.Value.CallbackException += (sender, args) => OnException(args, connection.Value, key);

var model = _models.AddOrUpdate(key,
x =>
{
var model = CreateModelAndAttachEvents(key, connection, argsObj);
return new Lazy<IModel>(model);
},
(updateKey, updateModel) =>
{
// If connection to RMQ is lost and:
// - RMQ service returns before calling the next line, then IsOpen returns false
// - a call is made before RMQ returns, then a new connection
// is made with error with IsValueFaulted = true && IsValueCreated = false
if (updateModel.IsValueCreated)
{
return updateModel;
}
else
{
var model = CreateModelAndAttachEvents(key, connection, argsObj);
return new Lazy<IModel>(model);
}
});
var model = connection.Value.CreateModel();
model.CallbackException += (sender, args) => OnException(args, connection.Value, key);
model.ModelShutdown += (sender, args) => ConnectionShutdown(args, connection.Value, key);

return model.Value;
return model;
}

private IModel CreateModelAndAttachEvents(string key, Lazy<IConnection> connection, CreateChannelArguments argsObj)
private void ConnectionShutdown(ShutdownEventArgs args, IConnection value, string key)
{
var model = connection.Value.CreateModel();
model.ModelShutdown += (connection, args) => OnShutdown(args, key, argsObj);
model.CallbackException += (connection, args) => OnException(args, key, argsObj);
return model;
_logger.ConnectionShutdown(args.ToString());

if (_connections.ContainsKey(key) && !value.IsOpen)
{
_connections.Remove(key, out _);
}
}

private void OnException(CallbackExceptionEventArgs args, IConnection value, string key)
{
_logger.ConnectionException(args.Exception);

if (_connections.ContainsKey(key) && !value.IsOpen)
{
_connections.Remove(key, out _);
}
}

private Lazy<IConnection> CreatConnection(string hostName, string username, string password, string virtualHost, string key, string useSSL, string portNumber)
Expand Down Expand Up @@ -157,97 +142,6 @@ private static object HashPassword(string password)
return hash.Select(x => x.ToString("x2", CultureInfo.InvariantCulture));
}

private void OnShutdown(ShutdownEventArgs args, string key, CreateChannelArguments createChannelArguments)
{
_logger.ConnectionShutdown(args.ReplyText);

if (ConnectionIsOpen(key, out var _))
{
return;
}

_logger.ConnectionReconnect();
_connections.TryRemove(key, out var value);

if (value is not null)
{
value?.Value.Dispose();
}

CreateChannel(createChannelArguments);
}

private void OnException(CallbackExceptionEventArgs args, string key, CreateChannelArguments createChannelArguments)
{
_logger.ConnectionException(args.Exception);

if (ConnectionIsOpen(key, out var _))
{
return;
}

_logger.ConnectionReconnect();
CreateChannel(createChannelArguments);
}

/// <summary>
/// Checks if we have a connection and it is open on both channel/model and connection.
/// </summary>
/// <param name="key">Lookup Key</param>
/// <param name="model">IModel</param>
/// <returns>If this function returns true output param model will have the value.</returns>
private bool ConnectionIsOpen(string key, out Lazy<IModel> outModel)
{
outModel = new Lazy<IModel>();

_models.TryGetValue(key, out var model);
_connections.TryGetValue(key, out var connection);

if (model is null || connection is null)
{
return false;
}

outModel = model;
if (model.IsValueCreated == false || connection.IsValueCreated == false)
{
return false;
}

if (connection.Value.IsOpen == false)
{
RemoveConnection(key);
RemoveModel(key);
return false;
}

if (model.Value.IsOpen == false)
{
RemoveModel(key);
return false;
}

return true;
}

private void RemoveConnection(string key)
{
_connections.TryRemove(key, out var conn);
if (conn is not null)
{
conn.Value.Dispose();
}
}

private void RemoveModel(string key)
{
_models.TryRemove(key, out var mod);
if (mod is not null)
{
mod.Value.Dispose();
}
}

protected virtual void Dispose(bool disposing)
{
if (!_disposedValue)
Expand Down
Loading