diff --git a/src/Plugins/RabbitMQ/Factory/CreateChannelArguments.cs b/src/Plugins/RabbitMQ/Factory/CreateChannelArguments.cs
new file mode 100644
index 0000000..5492dee
--- /dev/null
+++ b/src/Plugins/RabbitMQ/Factory/CreateChannelArguments.cs
@@ -0,0 +1,58 @@
+/*
+ * Copyright 2022 MONAI Consortium
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using Ardalis.GuardClauses;
+
+namespace Monai.Deploy.Messaging.RabbitMQ
+{
+ public class CreateChannelArguments
+ {
+ public CreateChannelArguments(
+ string hostName,
+ string password,
+ string username,
+ string virtualHost,
+ string useSSL,
+ string portNumber)
+ {
+ Guard.Against.NullOrWhiteSpace(hostName);
+ Guard.Against.NullOrWhiteSpace(password);
+ Guard.Against.NullOrWhiteSpace(username);
+ Guard.Against.NullOrWhiteSpace(virtualHost);
+ Guard.Against.NullOrWhiteSpace(useSSL);
+ Guard.Against.NullOrWhiteSpace(portNumber);
+
+ HostName = hostName;
+ Password = password;
+ Username = username;
+ VirtualHost = virtualHost;
+ UseSSL = useSSL;
+ PortNumber = portNumber;
+ }
+
+ public string HostName { get; set; }
+
+ public string Password { get; set; }
+
+ public string Username { get; set; }
+
+ public string VirtualHost { get; set; }
+
+ public string UseSSL { get; set; }
+
+ public string PortNumber { get; set; }
+ }
+}
diff --git a/src/Plugins/RabbitMQ/Factory/IRabbitMQConnectionFactory.cs b/src/Plugins/RabbitMQ/Factory/IRabbitMQConnectionFactory.cs
new file mode 100644
index 0000000..6100e3b
--- /dev/null
+++ b/src/Plugins/RabbitMQ/Factory/IRabbitMQConnectionFactory.cs
@@ -0,0 +1,46 @@
+/*
+ * Copyright 2022 MONAI Consortium
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using RabbitMQ.Client;
+
+namespace Monai.Deploy.Messaging.RabbitMQ
+{
+ public interface IRabbitMQConnectionFactory
+ {
+ ///
+ /// Creates a new channel for RabbitMQ client.
+ /// The connection factory maintains a single connection to the specified
+ /// hostName, username, password, and virtualHost combination.
+ ///
+ /// Host name
+ /// User name
+ /// Password
+ /// Virtual host
+ /// Encrypt communication
+ /// Port Number
+ /// Instance of .
+ IModel CreateChannel(string hostName, string username, string password, string virtualHost, string useSSL, string portNumber);
+
+ ///
+ /// Creates a new channel for RabbitMQ client.
+ /// The connection factory maintains a single connection to the specified
+ /// hostName, username, password, and virtualHost combination.
+ ///
+ /// Virtual host
+ /// Instance of .
+ IModel CreateChannel(CreateChannelArguments args);
+ }
+}
diff --git a/src/Plugins/RabbitMQ/RabbitMqConnectionFactory.cs b/src/Plugins/RabbitMQ/Factory/RabbitMqConnectionFactory.cs
similarity index 66%
rename from src/Plugins/RabbitMQ/RabbitMqConnectionFactory.cs
rename to src/Plugins/RabbitMQ/Factory/RabbitMqConnectionFactory.cs
index 03eed52..3bc295d 100644
--- a/src/Plugins/RabbitMQ/RabbitMqConnectionFactory.cs
+++ b/src/Plugins/RabbitMQ/Factory/RabbitMqConnectionFactory.cs
@@ -16,6 +16,7 @@
using System;
using System.Collections.Concurrent;
+using System.Globalization;
using System.Linq;
using System.Net.Security;
using System.Security.Cryptography;
@@ -23,26 +24,10 @@
using Ardalis.GuardClauses;
using Microsoft.Extensions.Logging;
using RabbitMQ.Client;
+using RabbitMQ.Client.Events;
namespace Monai.Deploy.Messaging.RabbitMQ
{
- public interface IRabbitMQConnectionFactory
- {
- ///
- /// Creates a new channel for RabbitMQ client.
- /// THe connection factory maintains a single connection to the specified
- /// hostName, username, password, and virtualHost combination.
- ///
- /// Host name
- /// User name
- /// Password
- /// Virtual host
- /// Encrypt communication
- /// Port Number
- /// Instance of .
- IModel CreateChannel(string hostName, string username, string password, string virtualHost, string useSSL, string portNumber);
- }
-
public class RabbitMQConnectionFactory : IRabbitMQConnectionFactory, IDisposable
{
private readonly ConcurrentDictionary> _connectionFactoriess;
@@ -57,25 +42,27 @@ public RabbitMQConnectionFactory(ILogger logger)
_connections = new ConcurrentDictionary>();
}
+ public IModel CreateChannel(CreateChannelArguments args) =>
+ CreateChannel(args.HostName, args.Username, args.Password, args.VirtualHost,
+ args.UseSSL, args.PortNumber);
+
public IModel CreateChannel(string hostName, string username, string password, string virtualHost, string useSSL, string portNumber)
{
- Guard.Against.NullOrWhiteSpace(hostName, nameof(hostName));
- Guard.Against.NullOrWhiteSpace(username, nameof(username));
- Guard.Against.NullOrWhiteSpace(password, nameof(password));
- Guard.Against.NullOrWhiteSpace(virtualHost, nameof(virtualHost));
+ Guard.Against.NullOrWhiteSpace(hostName);
+ Guard.Against.NullOrWhiteSpace(username);
+ Guard.Against.NullOrWhiteSpace(password);
+ Guard.Against.NullOrWhiteSpace(virtualHost);
var key = $"{hostName}{username}{HashPassword(password)}{virtualHost}";
var connection = _connections.AddOrUpdate(key,
- x =>
- {
- return CreatConnection(hostName, username, password, virtualHost, key, useSSL, portNumber);
- },
+ x => CreatConnection(hostName, username, password, virtualHost, key, useSSL, portNumber),
(updateKey, updateConnection) =>
{
// If connection to RMQ is lost and:
// - RMQ service returns before calling the next line, then IsOpen returns false
- // - a call is made before RMQ returns, then a new connection is made with error with IsValueFaulted = true && IsValueCreated = false
+ // - a call is made before RMQ returns, then a new connection
+ // is made with error with IsValueFaulted = true && IsValueCreated = false
if (updateConnection.IsValueCreated && updateConnection.Value.IsOpen)
{
return updateConnection;
@@ -86,7 +73,40 @@ public IModel CreateChannel(string hostName, string username, string password, s
}
});
- return connection.Value.CreateModel();
+ var model = connection.Value.CreateModel();
+
+ var argsObj = new CreateChannelArguments(hostName, password, username, virtualHost, useSSL, portNumber);
+
+ model.CallbackException += (connection, args) => OnException(args, key, argsObj);
+ model.ModelShutdown += (connection, args) => OnShutdown(args, key, argsObj);
+
+ return model;
+ }
+
+ private void OnShutdown(ShutdownEventArgs args, string key, CreateChannelArguments createChannelArguments)
+ {
+ _logger.ConnectionShutdown(args.ReplyText);
+ _connections.TryRemove(key, out var value);
+
+ if (value is not null)
+ {
+ value?.Value.Dispose();
+ }
+
+ CreateChannel(createChannelArguments);
+ }
+
+ private void OnException(CallbackExceptionEventArgs args, string key, CreateChannelArguments createChannelArguments)
+ {
+ _logger.ConnectionException(args.Exception);
+ _connections.TryRemove(key, out var value);
+
+ if (value is not null)
+ {
+ value?.Value.Dispose();
+ }
+
+ CreateChannel(createChannelArguments);
}
private Lazy CreatConnection(string hostName, string username, string password, string virtualHost, string key, string useSSL, string portNumber)
@@ -96,7 +116,7 @@ private Lazy CreatConnection(string hostName, string username, stri
sslEnabled = false;
}
- if (!Int32.TryParse(portNumber, out var port))
+ if (!int.TryParse(portNumber, out var port))
{
port = sslEnabled ? 5671 : 5672; // 5671 is default port for SSL/TLS , 5672 is default port for PLAIN.
}
@@ -115,18 +135,19 @@ private Lazy CreatConnection(string hostName, string username, stri
Password = password,
VirtualHost = virtualHost,
Ssl = sslOptions,
- Port = port
+ Port = port,
+ RequestedHeartbeat = TimeSpan.FromSeconds(10),
}));
- return new Lazy(() => connectionFactory.Value.CreateConnection());
+ return new Lazy(connectionFactory.Value.CreateConnection);
}
- private object HashPassword(string password)
+ private static object HashPassword(string password)
{
- Guard.Against.NullOrWhiteSpace(password, nameof(password));
+ Guard.Against.NullOrWhiteSpace(password);
var sha256 = SHA256.Create();
var hash = sha256.ComputeHash(Encoding.UTF8.GetBytes(password));
- return hash.Select(x => x.ToString("x2"));
+ return hash.Select(x => x.ToString("x2", CultureInfo.InvariantCulture));
}
protected virtual void Dispose(bool disposing)
diff --git a/src/Plugins/RabbitMQ/Logger.cs b/src/Plugins/RabbitMQ/Logger.cs
index eae8c32..5032e23 100644
--- a/src/Plugins/RabbitMQ/Logger.cs
+++ b/src/Plugins/RabbitMQ/Logger.cs
@@ -61,5 +61,11 @@ public static partial class Logger
[LoggerMessage(EventId = 10012, Level = LogLevel.Error, Message = "Health check failure.")]
public static partial void HealthCheckError(this ILogger logger, Exception ex);
+
+ [LoggerMessage(EventId = 10013, Level = LogLevel.Error, Message = "RabbitMQ connection shutdown ({replyText}) attempting to reconnect.")]
+ public static partial void ConnectionShutdown(this ILogger logger, string replyText);
+
+ [LoggerMessage(EventId = 10014, Level = LogLevel.Error, Message = "RabbitMQ connection exception attempting to reconnect.")]
+ public static partial void ConnectionException(this ILogger logger, Exception ex);
}
}
diff --git a/src/Plugins/RabbitMQ/PublisherServicHealthCheckBuilder.cs b/src/Plugins/RabbitMQ/Publisher/PublisherServicHealthCheckBuilder.cs
similarity index 100%
rename from src/Plugins/RabbitMQ/PublisherServicHealthCheckBuilder.cs
rename to src/Plugins/RabbitMQ/Publisher/PublisherServicHealthCheckBuilder.cs
diff --git a/src/Plugins/RabbitMQ/PublisherServiceRegistration.cs b/src/Plugins/RabbitMQ/Publisher/PublisherServiceRegistration.cs
similarity index 100%
rename from src/Plugins/RabbitMQ/PublisherServiceRegistration.cs
rename to src/Plugins/RabbitMQ/Publisher/PublisherServiceRegistration.cs
diff --git a/src/Plugins/RabbitMQ/RabbitMqMessagePublisherService.cs b/src/Plugins/RabbitMQ/Publisher/RabbitMqMessagePublisherService.cs
similarity index 100%
rename from src/Plugins/RabbitMQ/RabbitMqMessagePublisherService.cs
rename to src/Plugins/RabbitMQ/Publisher/RabbitMqMessagePublisherService.cs
diff --git a/src/Plugins/RabbitMQ/RabbitMqMessageSubscriberService.cs b/src/Plugins/RabbitMQ/Subscriber/RabbitMqMessageSubscriberService.cs
similarity index 91%
rename from src/Plugins/RabbitMQ/RabbitMqMessageSubscriberService.cs
rename to src/Plugins/RabbitMQ/Subscriber/RabbitMqMessageSubscriberService.cs
index b6e98c0..f991d37 100644
--- a/src/Plugins/RabbitMQ/RabbitMqMessageSubscriberService.cs
+++ b/src/Plugins/RabbitMQ/Subscriber/RabbitMqMessageSubscriberService.cs
@@ -53,7 +53,7 @@ public RabbitMQMessageSubscriberService(IOptions logger,
IRabbitMQConnectionFactory rabbitMqConnectionFactory)
{
- Guard.Against.Null(options, nameof(options));
+ Guard.Against.Null(options);
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
@@ -74,7 +74,7 @@ public RabbitMQMessageSubscriberService(IOptions configuration)
{
- Guard.Against.Null(configuration, nameof(configuration));
+ Guard.Against.Null(configuration);
foreach (var key in ConfigurationKeys.SubscriberRequiredKeys)
{
@@ -126,8 +126,8 @@ public void Subscribe(string topic, string queue, Action messageReceivedCallback, ushort prefetchCount = 0)
{
- Guard.Against.Null(topics, nameof(topics));
- Guard.Against.Null(messageReceivedCallback, nameof(messageReceivedCallback));
+ Guard.Against.Null(topics);
+ Guard.Against.Null(messageReceivedCallback);
var arguments = new Dictionary()
{
@@ -188,8 +188,8 @@ public void SubscribeAsync(string topic, string queue, Func messageReceivedCallback, ushort prefetchCount = 0)
{
- Guard.Against.Null(topics, nameof(topics));
- Guard.Against.Null(messageReceivedCallback, nameof(messageReceivedCallback));
+ Guard.Against.Null(topics);
+ Guard.Against.Null(messageReceivedCallback);
var arguments = new Dictionary()
{
@@ -250,7 +250,7 @@ public void SubscribeAsync(string[] topics, string queue, Func