Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
164 changes: 131 additions & 33 deletions src/Plugins/RabbitMQ/Factory/RabbitMqConnectionFactory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -30,16 +30,16 @@ namespace Monai.Deploy.Messaging.RabbitMQ
{
public class RabbitMQConnectionFactory : IRabbitMQConnectionFactory, IDisposable
{
private readonly ConcurrentDictionary<string, Lazy<ConnectionFactory>> _connectionFactoriess;
private readonly ConcurrentDictionary<string, Lazy<IConnection>> _connections;
private readonly ConcurrentDictionary<string, Lazy<ConnectionFactory>> _connectionFactoriess = new();
private readonly ConcurrentDictionary<string, Lazy<IConnection>> _connections = new();
private readonly ConcurrentDictionary<string, Lazy<IModel>> _models = new();

private readonly ILogger<RabbitMQConnectionFactory> _logger;
private bool _disposedValue;

public RabbitMQConnectionFactory(ILogger<RabbitMQConnectionFactory> logger)
{
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
_connectionFactoriess = new ConcurrentDictionary<string, Lazy<ConnectionFactory>>();
_connections = new ConcurrentDictionary<string, Lazy<IConnection>>();
}

public IModel CreateChannel(CreateChannelArguments args) =>
Expand All @@ -55,6 +55,11 @@ public IModel CreateChannel(string hostName, string username, string password, s

var key = $"{hostName}{username}{HashPassword(password)}{virtualHost}";

if (ConnectionIsOpen(key, out var value))
{
return value.Value;
}

var connection = _connections.AddOrUpdate(key,
x => CreatConnection(hostName, username, password, virtualHost, key, useSSL, portNumber),
(updateKey, updateConnection) =>
Expand All @@ -63,7 +68,7 @@ public IModel CreateChannel(string hostName, string username, string password, s
// - RMQ service returns before calling the next line, then IsOpen returns false
// - a call is made before RMQ returns, then a new connection
// is made with error with IsValueFaulted = true && IsValueCreated = false
if (updateConnection.IsValueCreated && updateConnection.Value.IsOpen)
if (updateConnection.IsValueCreated)
{
return updateConnection;
}
Expand All @@ -73,40 +78,42 @@ public IModel CreateChannel(string hostName, string username, string password, s
}
});

var model = connection.Value.CreateModel();

var argsObj = new CreateChannelArguments(hostName, password, username, virtualHost, useSSL, portNumber);
connection.Value.ConnectionShutdown += (connection, args) => OnShutdown(args, key, argsObj);
connection.Value.CallbackException += (connection, args) => OnException(args, key, argsObj);

model.CallbackException += (connection, args) => OnException(args, key, argsObj);
model.ModelShutdown += (connection, args) => OnShutdown(args, key, argsObj);

return model;
}

private void OnShutdown(ShutdownEventArgs args, string key, CreateChannelArguments createChannelArguments)
{
_logger.ConnectionShutdown(args.ReplyText);
_connections.TryRemove(key, out var value);

if (value is not null)
{
value?.Value.Dispose();
}
var model = _models.AddOrUpdate(key,
x =>
{
var model = CreateModelAndAttachEvents(key, connection, argsObj);
return new Lazy<IModel>(model);
},
(updateKey, updateModel) =>
{
// If connection to RMQ is lost and:
// - RMQ service returns before calling the next line, then IsOpen returns false
// - a call is made before RMQ returns, then a new connection
// is made with error with IsValueFaulted = true && IsValueCreated = false
if (updateModel.IsValueCreated)
{
return updateModel;
}
else
{
var model = CreateModelAndAttachEvents(key, connection, argsObj);
return new Lazy<IModel>(model);
}
});

CreateChannel(createChannelArguments);
return model.Value;
}

private void OnException(CallbackExceptionEventArgs args, string key, CreateChannelArguments createChannelArguments)
private IModel CreateModelAndAttachEvents(string key, Lazy<IConnection> connection, CreateChannelArguments argsObj)
{
_logger.ConnectionException(args.Exception);
_connections.TryRemove(key, out var value);

if (value is not null)
{
value?.Value.Dispose();
}

CreateChannel(createChannelArguments);
var model = connection.Value.CreateModel();
model.ModelShutdown += (connection, args) => OnShutdown(args, key, argsObj);
model.CallbackException += (connection, args) => OnException(args, key, argsObj);
return model;
}

private Lazy<IConnection> CreatConnection(string hostName, string username, string password, string virtualHost, string key, string useSSL, string portNumber)
Expand Down Expand Up @@ -150,6 +157,97 @@ private static object HashPassword(string password)
return hash.Select(x => x.ToString("x2", CultureInfo.InvariantCulture));
}

private void OnShutdown(ShutdownEventArgs args, string key, CreateChannelArguments createChannelArguments)
{
_logger.ConnectionShutdown(args.ReplyText);

if (ConnectionIsOpen(key, out var _))
{
return;
}

_logger.ConnectionReconnect();
_connections.TryRemove(key, out var value);

if (value is not null)
{
value?.Value.Dispose();
}

CreateChannel(createChannelArguments);
}

private void OnException(CallbackExceptionEventArgs args, string key, CreateChannelArguments createChannelArguments)
{
_logger.ConnectionException(args.Exception);

if (ConnectionIsOpen(key, out var _))
{
return;
}

_logger.ConnectionReconnect();
CreateChannel(createChannelArguments);
}

/// <summary>
/// Checks if we have a connection and it is open on both channel/model and connection.
/// </summary>
/// <param name="key">Lookup Key</param>
/// <param name="model">IModel</param>
/// <returns>If this function returns true output param model will have the value.</returns>
private bool ConnectionIsOpen(string key, out Lazy<IModel> outModel)
{
outModel = new Lazy<IModel>();

_models.TryGetValue(key, out var model);
_connections.TryGetValue(key, out var connection);

if (model is null || connection is null)
{
return false;
}

outModel = model;
if (model.IsValueCreated == false || connection.IsValueCreated == false)
{
return false;
}

if (connection.Value.IsOpen == false)
{
RemoveConnection(key);
RemoveModel(key);
return false;
}

if (model.Value.IsOpen == false)
{
RemoveModel(key);
return false;
}

return true;
}

private void RemoveConnection(string key)
{
_connections.TryRemove(key, out var conn);
if (conn is not null)
{
conn.Value.Dispose();
}
}

private void RemoveModel(string key)
{
_models.TryRemove(key, out var mod);
if (mod is not null)
{
mod.Value.Dispose();
}
}

protected virtual void Dispose(bool disposing)
{
if (!_disposedValue)
Expand Down
7 changes: 5 additions & 2 deletions src/Plugins/RabbitMQ/Logger.cs
Original file line number Diff line number Diff line change
Expand Up @@ -62,10 +62,13 @@ public static partial class Logger
[LoggerMessage(EventId = 10012, Level = LogLevel.Error, Message = "Health check failure.")]
public static partial void HealthCheckError(this ILogger logger, Exception ex);

[LoggerMessage(EventId = 10013, Level = LogLevel.Error, Message = "RabbitMQ connection shutdown ({replyText}) attempting to reconnect.")]
[LoggerMessage(EventId = 10013, Level = LogLevel.Error, Message = "RabbitMQ connection shutdown ({replyText})")]
public static partial void ConnectionShutdown(this ILogger logger, string replyText);

[LoggerMessage(EventId = 10014, Level = LogLevel.Error, Message = "RabbitMQ connection exception attempting to reconnect.")]
[LoggerMessage(EventId = 10014, Level = LogLevel.Error, Message = "RabbitMQ connection exception.")]
public static partial void ConnectionException(this ILogger logger, Exception ex);

[LoggerMessage(EventId = 10015, Level = LogLevel.Debug, Message = "RabbitMQ attempting to reconnect.")]
public static partial void ConnectionReconnect(this ILogger logger);
}
}