From a838fe478f5e37cc335359df833ba91a62f34bd3 Mon Sep 17 00:00:00 2001 From: Lillie Dae Date: Fri, 4 Nov 2022 14:18:39 +0000 Subject: [PATCH 1/4] connection resiliency and improvements Signed-off-by: Lillie Dae --- src/Plugins/RabbitMQ/Logger.cs | 65 ---- .../PublisherServicHealthCheckBuilder.cs | 50 --- .../RabbitMQ/PublisherServiceRegistration.cs | 31 -- .../RabbitMQ/RabbitMqConnectionFactory.cs | 158 -------- .../RabbitMqMessagePublisherService.cs | 153 -------- .../RabbitMqMessageSubscriberService.cs | 346 ------------------ .../SubscriberServicHealthCheckBuilder.cs | 50 --- .../RabbitMQ/SubscriberServiceRegistration.cs | 31 -- 8 files changed, 884 deletions(-) delete mode 100644 src/Plugins/RabbitMQ/Logger.cs delete mode 100644 src/Plugins/RabbitMQ/PublisherServicHealthCheckBuilder.cs delete mode 100644 src/Plugins/RabbitMQ/PublisherServiceRegistration.cs delete mode 100644 src/Plugins/RabbitMQ/RabbitMqConnectionFactory.cs delete mode 100644 src/Plugins/RabbitMQ/RabbitMqMessagePublisherService.cs delete mode 100644 src/Plugins/RabbitMQ/RabbitMqMessageSubscriberService.cs delete mode 100644 src/Plugins/RabbitMQ/SubscriberServicHealthCheckBuilder.cs delete mode 100644 src/Plugins/RabbitMQ/SubscriberServiceRegistration.cs diff --git a/src/Plugins/RabbitMQ/Logger.cs b/src/Plugins/RabbitMQ/Logger.cs deleted file mode 100644 index e187145..0000000 --- a/src/Plugins/RabbitMQ/Logger.cs +++ /dev/null @@ -1,65 +0,0 @@ -/* - * Copyright 2022 MONAI Consortium - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -using System; -using Microsoft.Extensions.Logging; - -namespace Monai.Deploy.Messaging.RabbitMQ -{ - public static partial class Logger - { - internal static readonly string LoggingScopeMessageApplication = "Message ID={0}. Application ID={1}."; - - [LoggerMessage(EventId = 10000, Level = LogLevel.Information, Message = "Publishing message to {endpoint}/{virtualHost}. Exchange={exchange}, Routing Key={topic}.")] - public static partial void PublshingRabbitMQ(this ILogger logger, string endpoint, string virtualHost, string exchange, string topic); - - [LoggerMessage(EventId = 10001, Level = LogLevel.Information, Message = "{ServiceName} connecting to {endpoint}/{virtualHost}.")] - public static partial void ConnectingToRabbitMQ(this ILogger logger, string serviceNAme, string endpoint, string virtualHost); - - [LoggerMessage(EventId = 10002, Level = LogLevel.Information, Message = "Message received from queue {queue} for {topic}.")] - public static partial void MessageReceivedFromQueue(this ILogger logger, string queue, string topic); - - [LoggerMessage(EventId = 10003, Level = LogLevel.Information, Message = "Listening for messages from {endpoint}/{virtualHost}. Exchange={exchange}, Queue={queue}, Routing Key={topic}.")] - public static partial void SubscribeToRabbitMQQueue(this ILogger logger, string endpoint, string virtualHost, string exchange, string queue, string topic); - - [LoggerMessage(EventId = 10004, Level = LogLevel.Information, Message = "Sending message acknowledgement for message {messageId}.")] - public static partial void SendingAcknowledgement(this ILogger logger, string messageId); - - [LoggerMessage(EventId = 10005, Level = LogLevel.Information, Message = "Ackowledge sent for message {messageId}.")] - public static partial void AcknowledgementSent(this ILogger logger, string messageId); - - [LoggerMessage(EventId = 10006, Level = LogLevel.Information, Message = "Sending nack message {messageId} and requeuing.")] - public static partial void SendingNAcknowledgement(this ILogger logger, string messageId); - - [LoggerMessage(EventId = 10007, Level = LogLevel.Information, Message = "Nack message sent for message {messageId}, requeue={requeue}.")] - public static partial void NAcknowledgementSent(this ILogger logger, string messageId, bool requeue); - - [LoggerMessage(EventId = 10008, Level = LogLevel.Information, Message = "Closing connections.")] - public static partial void ClosingConnections(this ILogger logger); - - [LoggerMessage(EventId = 10009, Level = LogLevel.Error, Message = "Invalid or corrupted message received: Queue={queueName}, Topic={topic}, Message ID={messageId}.")] - public static partial void InvalidMessage(this ILogger logger, string queueName, string topic, string messageId, Exception ex); - - [LoggerMessage(EventId = 10010, Level = LogLevel.Error, Message = "Exception not handled by the subscriber's callback function: Queue={queueName}, Topic={topic}, Message ID={messageId}.")] - public static partial void ErrorNotHandledByCallback(this ILogger logger, string queueName, string topic, string messageId, Exception ex); - - [LoggerMessage(EventId = 10011, Level = LogLevel.Error, Message = "Exception thrown: Message ID={messageId}.")] - public static partial void Exception(this ILogger logger, string messageId, Exception ex); - - [LoggerMessage(EventId = 10012, Level = LogLevel.Error, Message = "Health check failure.")] - public static partial void HealthCheckError(this ILogger logger, Exception ex); - } -} diff --git a/src/Plugins/RabbitMQ/PublisherServicHealthCheckBuilder.cs b/src/Plugins/RabbitMQ/PublisherServicHealthCheckBuilder.cs deleted file mode 100644 index 0e65c86..0000000 --- a/src/Plugins/RabbitMQ/PublisherServicHealthCheckBuilder.cs +++ /dev/null @@ -1,50 +0,0 @@ -/* - * Copyright 2022 MONAI Consortium - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -using System; -using System.Collections.Generic; -using Microsoft.Extensions.DependencyInjection; -using Microsoft.Extensions.Diagnostics.HealthChecks; -using Microsoft.Extensions.Logging; -using Microsoft.Extensions.Options; -using Monai.Deploy.Messaging.Configuration; - -namespace Monai.Deploy.Messaging.RabbitMQ -{ - public class PublisherServicHealthCheckBuilder : PublisherServiceHealthCheckRegistrationBase - { - public override IHealthChecksBuilder Configure( - IHealthChecksBuilder builder, - HealthStatus? failureStatus = null, - IEnumerable? tags = null, - TimeSpan? timeout = null) - { - builder.Add(new HealthCheckRegistration( - ConfigurationKeys.PublisherServiceName, - serviceProvider => - { - var options = serviceProvider.GetRequiredService>(); - var logger = serviceProvider.GetRequiredService>(); - var connectionFactory = serviceProvider.GetRequiredService(); - return new RabbitMQHealthCheck(connectionFactory, options.Value.PublisherSettings, logger, RabbitMQMessagePublisherService.ValidateConfiguration); - }, - failureStatus, - tags, - timeout)); - return builder; - } - } -} diff --git a/src/Plugins/RabbitMQ/PublisherServiceRegistration.cs b/src/Plugins/RabbitMQ/PublisherServiceRegistration.cs deleted file mode 100644 index 82eb40a..0000000 --- a/src/Plugins/RabbitMQ/PublisherServiceRegistration.cs +++ /dev/null @@ -1,31 +0,0 @@ -/* - * Copyright 2022 MONAI Consortium - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -using Microsoft.Extensions.DependencyInjection; -using Monai.Deploy.Messaging.API; - -namespace Monai.Deploy.Messaging.RabbitMQ -{ - public class PublisherServiceRegistration : PublisherServiceRegistrationBase - { - public override IServiceCollection Configure(IServiceCollection services) - { - return services - .AddSingleton() - .AddSingleton(); - } - } -} diff --git a/src/Plugins/RabbitMQ/RabbitMqConnectionFactory.cs b/src/Plugins/RabbitMQ/RabbitMqConnectionFactory.cs deleted file mode 100644 index 03eed52..0000000 --- a/src/Plugins/RabbitMQ/RabbitMqConnectionFactory.cs +++ /dev/null @@ -1,158 +0,0 @@ -/* - * Copyright 2022 MONAI Consortium - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -using System; -using System.Collections.Concurrent; -using System.Linq; -using System.Net.Security; -using System.Security.Cryptography; -using System.Text; -using Ardalis.GuardClauses; -using Microsoft.Extensions.Logging; -using RabbitMQ.Client; - -namespace Monai.Deploy.Messaging.RabbitMQ -{ - public interface IRabbitMQConnectionFactory - { - /// - /// Creates a new channel for RabbitMQ client. - /// THe connection factory maintains a single connection to the specified - /// hostName, username, password, and virtualHost combination. - /// - /// Host name - /// User name - /// Password - /// Virtual host - /// Encrypt communication - /// Port Number - /// Instance of . - IModel CreateChannel(string hostName, string username, string password, string virtualHost, string useSSL, string portNumber); - } - - public class RabbitMQConnectionFactory : IRabbitMQConnectionFactory, IDisposable - { - private readonly ConcurrentDictionary> _connectionFactoriess; - private readonly ConcurrentDictionary> _connections; - private readonly ILogger _logger; - private bool _disposedValue; - - public RabbitMQConnectionFactory(ILogger logger) - { - _logger = logger ?? throw new ArgumentNullException(nameof(logger)); - _connectionFactoriess = new ConcurrentDictionary>(); - _connections = new ConcurrentDictionary>(); - } - - public IModel CreateChannel(string hostName, string username, string password, string virtualHost, string useSSL, string portNumber) - { - Guard.Against.NullOrWhiteSpace(hostName, nameof(hostName)); - Guard.Against.NullOrWhiteSpace(username, nameof(username)); - Guard.Against.NullOrWhiteSpace(password, nameof(password)); - Guard.Against.NullOrWhiteSpace(virtualHost, nameof(virtualHost)); - - var key = $"{hostName}{username}{HashPassword(password)}{virtualHost}"; - - var connection = _connections.AddOrUpdate(key, - x => - { - return CreatConnection(hostName, username, password, virtualHost, key, useSSL, portNumber); - }, - (updateKey, updateConnection) => - { - // If connection to RMQ is lost and: - // - RMQ service returns before calling the next line, then IsOpen returns false - // - a call is made before RMQ returns, then a new connection is made with error with IsValueFaulted = true && IsValueCreated = false - if (updateConnection.IsValueCreated && updateConnection.Value.IsOpen) - { - return updateConnection; - } - else - { - return CreatConnection(hostName, username, password, virtualHost, key, useSSL, portNumber); - } - }); - - return connection.Value.CreateModel(); - } - - private Lazy CreatConnection(string hostName, string username, string password, string virtualHost, string key, string useSSL, string portNumber) - { - if (!bool.TryParse(useSSL, out var sslEnabled)) - { - sslEnabled = false; - } - - if (!Int32.TryParse(portNumber, out var port)) - { - port = sslEnabled ? 5671 : 5672; // 5671 is default port for SSL/TLS , 5672 is default port for PLAIN. - } - - var sslOptions = new SslOption - { - Enabled = sslEnabled, - ServerName = hostName, - AcceptablePolicyErrors = SslPolicyErrors.RemoteCertificateNameMismatch | SslPolicyErrors.RemoteCertificateChainErrors | SslPolicyErrors.RemoteCertificateNotAvailable - }; - - var connectionFactory = _connectionFactoriess.GetOrAdd(key, y => new Lazy(() => new ConnectionFactory() - { - HostName = hostName, - UserName = username, - Password = password, - VirtualHost = virtualHost, - Ssl = sslOptions, - Port = port - })); - - return new Lazy(() => connectionFactory.Value.CreateConnection()); - } - - private object HashPassword(string password) - { - Guard.Against.NullOrWhiteSpace(password, nameof(password)); - var sha256 = SHA256.Create(); - var hash = sha256.ComputeHash(Encoding.UTF8.GetBytes(password)); - return hash.Select(x => x.ToString("x2")); - } - - protected virtual void Dispose(bool disposing) - { - if (!_disposedValue) - { - if (disposing) - { - _logger.ClosingConnections(); - foreach (var connection in _connections.Values) - { - connection.Value.Close(); - } - _connections.Clear(); - _connectionFactoriess.Clear(); - } - - _disposedValue = true; - } - } - - public void Dispose() - { - // Do not change this code. Put cleanup code in 'Dispose(bool disposing)' method - Dispose(disposing: true); - GC.SuppressFinalize(this); - } - } -} diff --git a/src/Plugins/RabbitMQ/RabbitMqMessagePublisherService.cs b/src/Plugins/RabbitMQ/RabbitMqMessagePublisherService.cs deleted file mode 100644 index 8139b0f..0000000 --- a/src/Plugins/RabbitMQ/RabbitMqMessagePublisherService.cs +++ /dev/null @@ -1,153 +0,0 @@ -/* - * Copyright 2021-2022 MONAI Consortium - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -using System; -using System.Collections.Generic; -using System.Diagnostics.Tracing; -using System.Globalization; -using System.Threading.Tasks; -using Ardalis.GuardClauses; -using Microsoft.Extensions.Logging; -using Microsoft.Extensions.Options; -using Monai.Deploy.Messaging.API; -using Monai.Deploy.Messaging.Configuration; -using Monai.Deploy.Messaging.Messages; -using RabbitMQ.Client; - -namespace Monai.Deploy.Messaging.RabbitMQ -{ - public class RabbitMQMessagePublisherService : IMessageBrokerPublisherService - { - private const int PersistentDeliveryMode = 2; - - private readonly ILogger _logger; - private readonly IRabbitMQConnectionFactory _rabbitMqConnectionFactory; - private readonly string _endpoint; - private readonly string _username; - private readonly string _password; - private readonly string _virtualHost; - private readonly string _exchange; - private readonly string _useSSL; - private readonly string _portNumber; - private bool _disposedValue; - - public string Name => ConfigurationKeys.PublisherServiceName; - - public RabbitMQMessagePublisherService(IOptions options, - ILogger logger, - IRabbitMQConnectionFactory rabbitMqConnectionFactory) - { - Guard.Against.Null(options, nameof(options)); - - _logger = logger ?? throw new ArgumentNullException(nameof(logger)); - _rabbitMqConnectionFactory = rabbitMqConnectionFactory ?? throw new ArgumentNullException(nameof(rabbitMqConnectionFactory)); - - var configuration = options.Value; - ValidateConfiguration(configuration.PublisherSettings); - _endpoint = configuration.PublisherSettings[ConfigurationKeys.EndPoint]; - _username = configuration.PublisherSettings[ConfigurationKeys.Username]; - _password = configuration.PublisherSettings[ConfigurationKeys.Password]; - _virtualHost = configuration.PublisherSettings[ConfigurationKeys.VirtualHost]; - _exchange = configuration.PublisherSettings[ConfigurationKeys.Exchange]; - - if (configuration.PublisherSettings.ContainsKey(ConfigurationKeys.UseSSL)) - { - _useSSL = configuration.PublisherSettings[ConfigurationKeys.UseSSL]; - } - else - { - _useSSL = String.Empty; - } - - if (configuration.PublisherSettings.ContainsKey(ConfigurationKeys.Port)) - { - _portNumber = configuration.PublisherSettings[ConfigurationKeys.Port]; - } - else - { - _portNumber = String.Empty; - } - } - - internal static void ValidateConfiguration(Dictionary configuration) - { - Guard.Against.Null(configuration, nameof(configuration)); - - foreach (var key in ConfigurationKeys.PublisherRequiredKeys) - { - if (!configuration.ContainsKey(key)) - { - throw new ConfigurationException($"{ConfigurationKeys.PublisherServiceName} is missing configuration for {key}."); - } - } - } - - public Task Publish(string topic, Message message) - { - Guard.Against.NullOrWhiteSpace(topic, nameof(topic)); - Guard.Against.Null(message, nameof(message)); - - using var loggingScope = _logger.BeginScope(new Dictionary - { - ["MessageId"] = message.MessageId, - ["ApplicationId"] = message.ApplicationId, - ["CorrelationId"] = message.CorrelationId - }); - - _logger.PublshingRabbitMQ(_endpoint, _virtualHost, _exchange, topic); - - using var channel = _rabbitMqConnectionFactory.CreateChannel(_endpoint, _username, _password, _virtualHost, _useSSL, _portNumber); - channel.ExchangeDeclare(_exchange, ExchangeType.Topic, durable: true, autoDelete: false); - - var properties = channel.CreateBasicProperties(); - properties.Persistent = true; - properties.ContentType = message.ContentType; - properties.MessageId = message.MessageId; - properties.AppId = message.ApplicationId; - properties.CorrelationId = message.CorrelationId; - properties.DeliveryMode = PersistentDeliveryMode; - properties.Type = message.MessageDescription; - properties.Timestamp = new AmqpTimestamp(message.CreationDateTime.ToUnixTimeSeconds()); - - channel.BasicPublish(exchange: _exchange, - routingKey: topic, - basicProperties: properties, - body: message.Body); - - return Task.CompletedTask; - } - - protected virtual void Dispose(bool disposing) - { - if (!_disposedValue) - { - if (disposing) - { - // Dispose any managed objects - } - - _disposedValue = true; - } - } - - public void Dispose() - { - // Do not change this code. Put cleanup code in 'Dispose(bool disposing)' method - Dispose(disposing: true); - GC.SuppressFinalize(this); - } - } -} diff --git a/src/Plugins/RabbitMQ/RabbitMqMessageSubscriberService.cs b/src/Plugins/RabbitMQ/RabbitMqMessageSubscriberService.cs deleted file mode 100644 index 9c85e59..0000000 --- a/src/Plugins/RabbitMQ/RabbitMqMessageSubscriberService.cs +++ /dev/null @@ -1,346 +0,0 @@ -/* - * Copyright 2021-2022 MONAI Consortium - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -using System; -using System.Collections.Generic; -using System.Globalization; -using System.Threading; -using System.Threading.Tasks; -using Ardalis.GuardClauses; -using Microsoft.Extensions.Logging; -using Microsoft.Extensions.Options; -using Monai.Deploy.Messaging.API; -using Monai.Deploy.Messaging.Common; -using Monai.Deploy.Messaging.Configuration; -using Monai.Deploy.Messaging.Messages; -using RabbitMQ.Client; -using RabbitMQ.Client.Events; - -namespace Monai.Deploy.Messaging.RabbitMQ -{ - public class RabbitMQMessageSubscriberService : IMessageBrokerSubscriberService - { - private readonly ILogger _logger; - private readonly string _endpoint; - private readonly string _virtualHost; - private readonly string _exchange; - private readonly string _deadLetterExchange; - private readonly int _deliveryLimit; - private readonly int _requeueDelay; - private readonly string _useSSL; - private readonly string _portNumber; - private readonly IModel _channel; - private bool _disposedValue; - - public string Name => ConfigurationKeys.SubscriberServiceName; - - public RabbitMQMessageSubscriberService(IOptions options, - ILogger logger, - IRabbitMQConnectionFactory rabbitMqConnectionFactory) - { - Guard.Against.Null(options, nameof(options)); - - _logger = logger ?? throw new ArgumentNullException(nameof(logger)); - - var configuration = options.Value; - ValidateConfiguration(configuration.SubscriberSettings); - _endpoint = configuration.SubscriberSettings[ConfigurationKeys.EndPoint]; - var username = configuration.SubscriberSettings[ConfigurationKeys.Username]; - var password = configuration.SubscriberSettings[ConfigurationKeys.Password]; - _virtualHost = configuration.SubscriberSettings[ConfigurationKeys.VirtualHost]; - _exchange = configuration.SubscriberSettings[ConfigurationKeys.Exchange]; - _deadLetterExchange = configuration.SubscriberSettings[ConfigurationKeys.DeadLetterExchange]; - _deliveryLimit = int.Parse(configuration.SubscriberSettings[ConfigurationKeys.DeliveryLimit], NumberFormatInfo.InvariantInfo); - _requeueDelay = int.Parse(configuration.SubscriberSettings[ConfigurationKeys.RequeueDelay], NumberFormatInfo.InvariantInfo); - - if (configuration.SubscriberSettings.ContainsKey(ConfigurationKeys.UseSSL)) - { - _useSSL = configuration.SubscriberSettings[ConfigurationKeys.UseSSL]; - } - else - { - _useSSL = String.Empty; - } - - if (configuration.SubscriberSettings.ContainsKey(ConfigurationKeys.Port)) - { - _portNumber = configuration.SubscriberSettings[ConfigurationKeys.Port]; - } - else - { - _portNumber = String.Empty; - } - - _logger.ConnectingToRabbitMQ(Name, _endpoint, _virtualHost); - _channel = rabbitMqConnectionFactory.CreateChannel(_endpoint, username, password, _virtualHost, _useSSL, _portNumber); - _channel.ExchangeDeclare(_exchange, ExchangeType.Topic, durable: true, autoDelete: false); - _channel.ExchangeDeclare(_deadLetterExchange, ExchangeType.Topic, durable: true, autoDelete: false); - _channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false); - } - - internal static void ValidateConfiguration(Dictionary configuration) - { - Guard.Against.Null(configuration, nameof(configuration)); - - foreach (var key in ConfigurationKeys.SubscriberRequiredKeys) - { - if (!configuration.ContainsKey(key)) - { - throw new ConfigurationException($"{ConfigurationKeys.SubscriberServiceName} is missing configuration for {key}."); - } - } - - if (!int.TryParse(configuration[ConfigurationKeys.DeliveryLimit], out var deliveryLimit)) - { - throw new ConfigurationException($"{ConfigurationKeys.SubscriberServiceName} has a non int value for {ConfigurationKeys.DeliveryLimit}"); - } - - if (!int.TryParse(configuration[ConfigurationKeys.RequeueDelay], out var requeueDelay)) - { - throw new ConfigurationException($"{ConfigurationKeys.SubscriberServiceName} has a non int value for {ConfigurationKeys.RequeueDelay}"); - } - - if (deliveryLimit < 0 || requeueDelay < 0) - { - throw new ConfigurationException($"{ConfigurationKeys.SubscriberServiceName} has int values of less than 1"); - } - } - - public void Subscribe(string topic, string queue, Action messageReceivedCallback, ushort prefetchCount = 0) - => Subscribe(new string[] { topic }, queue, messageReceivedCallback, prefetchCount); - - public void Subscribe(string[] topics, string queue, Action messageReceivedCallback, ushort prefetchCount = 0) - { - Guard.Against.Null(topics, nameof(topics)); - Guard.Against.Null(messageReceivedCallback, nameof(messageReceivedCallback)); - - var arguments = new Dictionary() - { - { "x-queue-type", "quorum" }, - { "x-delivery-limit", _deliveryLimit }, - { "x-dead-letter-exchange", _deadLetterExchange } - }; - - var deadLetterQueue = $"{queue}-dead-letter"; - - var queueDeclareResult = _channel.QueueDeclare(queue: queue, durable: true, exclusive: false, autoDelete: false, arguments: arguments); - var deadLetterQueueDeclareResult = _channel.QueueDeclare(queue: deadLetterQueue, durable: true, exclusive: false, autoDelete: false); - BindToRoutingKeys(topics, queueDeclareResult.QueueName, deadLetterQueueDeclareResult.QueueName); - - var consumer = new EventingBasicConsumer(_channel); - consumer.Received += (model, eventArgs) => - { - using var loggingScope = _logger.BeginScope(new Dictionary - { - ["MessageId"] = eventArgs.BasicProperties.MessageId, - ["ApplicationId"] = eventArgs.BasicProperties.AppId, - ["CorrelationId"] = eventArgs.BasicProperties.CorrelationId - }); - - _logger.MessageReceivedFromQueue(queueDeclareResult.QueueName, eventArgs.RoutingKey); - - MessageReceivedEventArgs messageReceivedEventArgs; - try - { - messageReceivedEventArgs = CreateMessage(eventArgs.RoutingKey, eventArgs); - } - catch (Exception ex) - { - _logger.InvalidMessage(queueDeclareResult.QueueName, eventArgs.RoutingKey, eventArgs.BasicProperties.MessageId, ex); - - _logger.SendingNAcknowledgement(eventArgs.BasicProperties.MessageId); - _channel.BasicNack(eventArgs.DeliveryTag, multiple: false, requeue: false); - _logger.NAcknowledgementSent(eventArgs.BasicProperties.MessageId, false); - return; - } - - try - { - messageReceivedCallback(messageReceivedEventArgs); - } - catch (Exception ex) - { - _logger.ErrorNotHandledByCallback(queueDeclareResult.QueueName, eventArgs.RoutingKey, eventArgs.BasicProperties.MessageId, ex); - } - }; - _channel.BasicQos(0, prefetchCount, false); - _channel.BasicConsume(queueDeclareResult.QueueName, false, consumer); - _logger.SubscribeToRabbitMQQueue(_endpoint, _virtualHost, _exchange, queueDeclareResult.QueueName, string.Join(',', topics)); - } - - public void SubscribeAsync(string topic, string queue, Func messageReceivedCallback, ushort prefetchCount = 0) - => SubscribeAsync(new string[] { topic }, queue, messageReceivedCallback, prefetchCount); - - public void SubscribeAsync(string[] topics, string queue, Func messageReceivedCallback, ushort prefetchCount = 0) - { - Guard.Against.Null(topics, nameof(topics)); - Guard.Against.Null(messageReceivedCallback, nameof(messageReceivedCallback)); - - var arguments = new Dictionary() - { - { "x-queue-type", "quorum" }, - { "x-delivery-limit", _deliveryLimit }, - { "x-dead-letter-exchange", _deadLetterExchange } - }; - - var deadLetterQueue = $"{queue}-dead-letter"; - - var queueDeclareResult = _channel.QueueDeclare(queue: queue, durable: true, exclusive: false, autoDelete: false, arguments: arguments); - var deadLetterQueueDeclareResult = _channel.QueueDeclare(queue: deadLetterQueue, durable: true, exclusive: false, autoDelete: false); - BindToRoutingKeys(topics, queueDeclareResult.QueueName, deadLetterQueueDeclareResult.QueueName); - - var consumer = new EventingBasicConsumer(_channel); - consumer.Received += async (model, eventArgs) => - { - using var loggingScope = _logger.BeginScope(new Dictionary - { - ["MessageId"] = eventArgs.BasicProperties.MessageId, - ["ApplicationId"] = eventArgs.BasicProperties.AppId, - ["CorrelationId"] = eventArgs.BasicProperties.CorrelationId - - }); - - _logger.MessageReceivedFromQueue(queueDeclareResult.QueueName, eventArgs.RoutingKey); - - MessageReceivedEventArgs messageReceivedEventArgs; - try - { - messageReceivedEventArgs = CreateMessage(eventArgs.RoutingKey, eventArgs); - } - catch (Exception ex) - { - _logger.InvalidMessage(queueDeclareResult.QueueName, eventArgs.RoutingKey, eventArgs.BasicProperties.MessageId, ex); - - _logger.SendingNAcknowledgement(eventArgs.BasicProperties.MessageId); - _channel.BasicNack(eventArgs.DeliveryTag, multiple: false, requeue: false); - _logger.NAcknowledgementSent(eventArgs.BasicProperties.MessageId, false); - return; - } - try - { - await messageReceivedCallback(messageReceivedEventArgs).ConfigureAwait(false); - } - catch (Exception ex) - { - _logger.ErrorNotHandledByCallback(queueDeclareResult.QueueName, eventArgs.RoutingKey, eventArgs.BasicProperties.MessageId, ex); - } - }; - _channel.BasicQos(0, prefetchCount, false); - _channel.BasicConsume(queueDeclareResult.QueueName, false, consumer); - _logger.SubscribeToRabbitMQQueue(_endpoint, _virtualHost, _exchange, queueDeclareResult.QueueName, string.Join(',', topics)); - } - - public void Acknowledge(MessageBase message) - { - Guard.Against.Null(message, nameof(message)); - - _logger.SendingAcknowledgement(message.MessageId); - _channel.BasicAck(ulong.Parse(message.DeliveryTag, CultureInfo.InvariantCulture), multiple: false); - _logger.AcknowledgementSent(message.MessageId); - } - - public async Task RequeueWithDelay(MessageBase message) - { - try - { - await Task.Delay(_requeueDelay * 1000).ConfigureAwait(false); - - Reject(message, true); - } - catch (Exception e) - { - _logger.Exception($"Requeue delay failed.", e); - Reject(message, true); - } - } - - public void Reject(MessageBase message, bool requeue = true) - { - Guard.Against.Null(message, nameof(message)); - - _logger.SendingNAcknowledgement(message.MessageId); - _channel.BasicNack(ulong.Parse(message.DeliveryTag, CultureInfo.InvariantCulture), multiple: false, requeue: requeue); - _logger.NAcknowledgementSent(message.MessageId, requeue); - } - - protected virtual void Dispose(bool disposing) - { - if (!_disposedValue) - { - if (disposing) - { - _channel.Close(); - _channel.Dispose(); - } - - _disposedValue = true; - } - } - - public void Dispose() - { - // Do not change this code. Put cleanup code in 'Dispose(bool disposing)' method - Dispose(disposing: true); - GC.SuppressFinalize(this); - } - - private void BindToRoutingKeys(string[] topics, string queue, string deadLetterQueue = "") - { - Guard.Against.Null(topics, nameof(topics)); - Guard.Against.NullOrWhiteSpace(queue, nameof(queue)); - - foreach (var topic in topics) - { - if (!string.IsNullOrEmpty(topic)) - { - _channel.QueueBind(queue, _exchange, topic); - - if (!string.IsNullOrEmpty(deadLetterQueue)) - { - _channel.QueueBind(deadLetterQueue, _deadLetterExchange, topic); - } - } - } - } - - private static MessageReceivedEventArgs CreateMessage(string topic, BasicDeliverEventArgs eventArgs) - { - Guard.Against.NullOrWhiteSpace(topic, nameof(topic)); - Guard.Against.Null(eventArgs, nameof(eventArgs)); - - Guard.Against.Null(eventArgs.Body, nameof(eventArgs.Body)); - Guard.Against.Null(eventArgs.BasicProperties, nameof(eventArgs.BasicProperties)); - Guard.Against.Null(eventArgs.BasicProperties.MessageId, nameof(eventArgs.BasicProperties.MessageId)); - Guard.Against.Null(eventArgs.BasicProperties.AppId, nameof(eventArgs.BasicProperties.AppId)); - Guard.Against.Null(eventArgs.BasicProperties.ContentType, nameof(eventArgs.BasicProperties.ContentType)); - Guard.Against.Null(eventArgs.BasicProperties.CorrelationId, nameof(eventArgs.BasicProperties.CorrelationId)); - Guard.Against.Null(eventArgs.BasicProperties.Timestamp, nameof(eventArgs.BasicProperties.Timestamp)); - Guard.Against.Null(eventArgs.DeliveryTag, nameof(eventArgs.DeliveryTag)); - - return new MessageReceivedEventArgs( - new Message( - body: eventArgs.Body.ToArray(), - messageDescription: eventArgs.BasicProperties.Type, - messageId: eventArgs.BasicProperties.MessageId, - applicationId: eventArgs.BasicProperties.AppId, - contentType: eventArgs.BasicProperties.ContentType, - correlationId: eventArgs.BasicProperties.CorrelationId, - creationDateTime: DateTimeOffset.FromUnixTimeSeconds(eventArgs.BasicProperties.Timestamp.UnixTime), - deliveryTag: eventArgs.DeliveryTag.ToString(CultureInfo.InvariantCulture)), - CancellationToken.None); - } - } -} diff --git a/src/Plugins/RabbitMQ/SubscriberServicHealthCheckBuilder.cs b/src/Plugins/RabbitMQ/SubscriberServicHealthCheckBuilder.cs deleted file mode 100644 index f8bc17d..0000000 --- a/src/Plugins/RabbitMQ/SubscriberServicHealthCheckBuilder.cs +++ /dev/null @@ -1,50 +0,0 @@ -/* - * Copyright 2022 MONAI Consortium - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -using System; -using System.Collections.Generic; -using Microsoft.Extensions.DependencyInjection; -using Microsoft.Extensions.Diagnostics.HealthChecks; -using Microsoft.Extensions.Logging; -using Microsoft.Extensions.Options; -using Monai.Deploy.Messaging.Configuration; - -namespace Monai.Deploy.Messaging.RabbitMQ -{ - public class SubscriberServicHealthCheckBuilder : SubscriberServiceHealthCheckRegistrationBase - { - public override IHealthChecksBuilder Configure( - IHealthChecksBuilder builder, - HealthStatus? failureStatus = null, - IEnumerable? tags = null, - TimeSpan? timeout = null) - { - builder.Add(new HealthCheckRegistration( - ConfigurationKeys.SubscriberServiceName, - serviceProvider => - { - var options = serviceProvider.GetRequiredService>(); - var logger = serviceProvider.GetRequiredService>(); - var connectionFactory = serviceProvider.GetRequiredService(); - return new RabbitMQHealthCheck(connectionFactory, options.Value.SubscriberSettings, logger, RabbitMQMessageSubscriberService.ValidateConfiguration); - }, - failureStatus, - tags, - timeout)); - return builder; - } - } -} diff --git a/src/Plugins/RabbitMQ/SubscriberServiceRegistration.cs b/src/Plugins/RabbitMQ/SubscriberServiceRegistration.cs deleted file mode 100644 index 7a8a466..0000000 --- a/src/Plugins/RabbitMQ/SubscriberServiceRegistration.cs +++ /dev/null @@ -1,31 +0,0 @@ -/* - * Copyright 2022 MONAI Consortium - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -using Microsoft.Extensions.DependencyInjection; -using Monai.Deploy.Messaging.API; - -namespace Monai.Deploy.Messaging.RabbitMQ -{ - public class SubscriberServiceRegistration : SubscriberServiceRegistrationBase - { - public override IServiceCollection Configure(IServiceCollection services) - { - services.AddSingleton(); - services.AddSingleton(); - return services; - } - } -} From f72c7ca1d1c6d67f0a53ff9fbccbef3fdd2bc4d6 Mon Sep 17 00:00:00 2001 From: Lillie Dae Date: Fri, 4 Nov 2022 14:18:59 +0000 Subject: [PATCH 2/4] connection resiliency and improvements Signed-off-by: Lillie Dae --- .../Factory/CreateChannelArguments.cs | 58 +++ .../Factory/IRabbitMQConnectionFactory.cs | 46 +++ .../Factory/RabbitMqConnectionFactory.cs | 179 +++++++++ .../PublisherServicHealthCheckBuilder.cs | 50 +++ .../Publisher/PublisherServiceRegistration.cs | 31 ++ .../RabbitMqMessagePublisherService.cs | 153 ++++++++ .../RabbitMqMessageSubscriberService.cs | 346 ++++++++++++++++++ .../SubscriberServicHealthCheckBuilder.cs | 50 +++ .../SubscriberServiceRegistration.cs | 31 ++ 9 files changed, 944 insertions(+) create mode 100644 src/Plugins/RabbitMQ/Factory/CreateChannelArguments.cs create mode 100644 src/Plugins/RabbitMQ/Factory/IRabbitMQConnectionFactory.cs create mode 100644 src/Plugins/RabbitMQ/Factory/RabbitMqConnectionFactory.cs create mode 100644 src/Plugins/RabbitMQ/Publisher/PublisherServicHealthCheckBuilder.cs create mode 100644 src/Plugins/RabbitMQ/Publisher/PublisherServiceRegistration.cs create mode 100644 src/Plugins/RabbitMQ/Publisher/RabbitMqMessagePublisherService.cs create mode 100644 src/Plugins/RabbitMQ/Subscriber/RabbitMqMessageSubscriberService.cs create mode 100644 src/Plugins/RabbitMQ/Subscriber/SubscriberServicHealthCheckBuilder.cs create mode 100644 src/Plugins/RabbitMQ/Subscriber/SubscriberServiceRegistration.cs diff --git a/src/Plugins/RabbitMQ/Factory/CreateChannelArguments.cs b/src/Plugins/RabbitMQ/Factory/CreateChannelArguments.cs new file mode 100644 index 0000000..5492dee --- /dev/null +++ b/src/Plugins/RabbitMQ/Factory/CreateChannelArguments.cs @@ -0,0 +1,58 @@ +/* + * Copyright 2022 MONAI Consortium + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +using Ardalis.GuardClauses; + +namespace Monai.Deploy.Messaging.RabbitMQ +{ + public class CreateChannelArguments + { + public CreateChannelArguments( + string hostName, + string password, + string username, + string virtualHost, + string useSSL, + string portNumber) + { + Guard.Against.NullOrWhiteSpace(hostName); + Guard.Against.NullOrWhiteSpace(password); + Guard.Against.NullOrWhiteSpace(username); + Guard.Against.NullOrWhiteSpace(virtualHost); + Guard.Against.NullOrWhiteSpace(useSSL); + Guard.Against.NullOrWhiteSpace(portNumber); + + HostName = hostName; + Password = password; + Username = username; + VirtualHost = virtualHost; + UseSSL = useSSL; + PortNumber = portNumber; + } + + public string HostName { get; set; } + + public string Password { get; set; } + + public string Username { get; set; } + + public string VirtualHost { get; set; } + + public string UseSSL { get; set; } + + public string PortNumber { get; set; } + } +} diff --git a/src/Plugins/RabbitMQ/Factory/IRabbitMQConnectionFactory.cs b/src/Plugins/RabbitMQ/Factory/IRabbitMQConnectionFactory.cs new file mode 100644 index 0000000..6100e3b --- /dev/null +++ b/src/Plugins/RabbitMQ/Factory/IRabbitMQConnectionFactory.cs @@ -0,0 +1,46 @@ +/* + * Copyright 2022 MONAI Consortium + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +using RabbitMQ.Client; + +namespace Monai.Deploy.Messaging.RabbitMQ +{ + public interface IRabbitMQConnectionFactory + { + /// + /// Creates a new channel for RabbitMQ client. + /// The connection factory maintains a single connection to the specified + /// hostName, username, password, and virtualHost combination. + /// + /// Host name + /// User name + /// Password + /// Virtual host + /// Encrypt communication + /// Port Number + /// Instance of . + IModel CreateChannel(string hostName, string username, string password, string virtualHost, string useSSL, string portNumber); + + /// + /// Creates a new channel for RabbitMQ client. + /// The connection factory maintains a single connection to the specified + /// hostName, username, password, and virtualHost combination. + /// + /// Virtual host + /// Instance of . + IModel CreateChannel(CreateChannelArguments args); + } +} diff --git a/src/Plugins/RabbitMQ/Factory/RabbitMqConnectionFactory.cs b/src/Plugins/RabbitMQ/Factory/RabbitMqConnectionFactory.cs new file mode 100644 index 0000000..7f2afec --- /dev/null +++ b/src/Plugins/RabbitMQ/Factory/RabbitMqConnectionFactory.cs @@ -0,0 +1,179 @@ +/* + * Copyright 2022 MONAI Consortium + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +using System; +using System.Collections.Concurrent; +using System.Globalization; +using System.Linq; +using System.Net.Security; +using System.Security.Cryptography; +using System.Text; +using Ardalis.GuardClauses; +using Microsoft.Extensions.Logging; +using RabbitMQ.Client; +using RabbitMQ.Client.Events; + +namespace Monai.Deploy.Messaging.RabbitMQ +{ + public class RabbitMQConnectionFactory : IRabbitMQConnectionFactory, IDisposable + { + private readonly ConcurrentDictionary> _connectionFactoriess; + private readonly ConcurrentDictionary> _connections; + private readonly ILogger _logger; + private bool _disposedValue; + + public RabbitMQConnectionFactory(ILogger logger) + { + _logger = logger ?? throw new ArgumentNullException(nameof(logger)); + _connectionFactoriess = new ConcurrentDictionary>(); + _connections = new ConcurrentDictionary>(); + } + + public IModel CreateChannel(CreateChannelArguments args) => + CreateChannel(args.HostName, args.Username, args.Password, args.VirtualHost, + args.UseSSL, args.PortNumber); + + public IModel CreateChannel(string hostName, string username, string password, string virtualHost, string useSSL, string portNumber) + { + Guard.Against.NullOrWhiteSpace(hostName); + Guard.Against.NullOrWhiteSpace(username); + Guard.Against.NullOrWhiteSpace(password); + Guard.Against.NullOrWhiteSpace(virtualHost); + + var key = $"{hostName}{username}{HashPassword(password)}{virtualHost}"; + + var connection = _connections.AddOrUpdate(key, + x => CreatConnection(hostName, username, password, virtualHost, key, useSSL, portNumber), + (updateKey, updateConnection) => + { + // If connection to RMQ is lost and: + // - RMQ service returns before calling the next line, then IsOpen returns false + // - a call is made before RMQ returns, then a new connection + // is made with error with IsValueFaulted = true && IsValueCreated = false + if (updateConnection.IsValueCreated && updateConnection.Value.IsOpen) + { + return updateConnection; + } + else + { + return CreatConnection(hostName, username, password, virtualHost, key, useSSL, portNumber); + } + }); + + var model = connection.Value.CreateModel(); + + var argsObj = new CreateChannelArguments(hostName, password, username, virtualHost, useSSL, portNumber); + + model.CallbackException += (connection, args) => OnException(args, key, argsObj); + model.ModelShutdown += (connection, args) => OnShutdown(args, key, argsObj); + + return model; + } + + private void OnShutdown(ShutdownEventArgs args, string key, CreateChannelArguments createChannelArguments) + { + _logger.LogError($"RabbitMQ connection shutdown attempting to reconnect.", args); + _connections.TryRemove(key, out var value); + + if (value is not null) + { + value?.Value.Dispose(); + } + + CreateChannel(createChannelArguments); + } + + private void OnException(CallbackExceptionEventArgs args, string key, CreateChannelArguments createChannelArguments) + { + _logger.LogError(args.Exception, $"RabbitMQ connection exception attempting to reconnect {args.Exception.Message}"); + _connections.TryRemove(key, out var value); + + if (value is not null) + { + value?.Value.Dispose(); + } + + CreateChannel(createChannelArguments); + } + + private Lazy CreatConnection(string hostName, string username, string password, string virtualHost, string key, string useSSL, string portNumber) + { + if (!bool.TryParse(useSSL, out var sslEnabled)) + { + sslEnabled = false; + } + + if (!int.TryParse(portNumber, out var port)) + { + port = sslEnabled ? 5671 : 5672; // 5671 is default port for SSL/TLS , 5672 is default port for PLAIN. + } + + var sslOptions = new SslOption + { + Enabled = sslEnabled, + ServerName = hostName, + AcceptablePolicyErrors = SslPolicyErrors.RemoteCertificateNameMismatch | SslPolicyErrors.RemoteCertificateChainErrors | SslPolicyErrors.RemoteCertificateNotAvailable + }; + + var connectionFactory = _connectionFactoriess.GetOrAdd(key, y => new Lazy(() => new ConnectionFactory() + { + HostName = hostName, + UserName = username, + Password = password, + VirtualHost = virtualHost, + Ssl = sslOptions, + Port = port, + RequestedHeartbeat = TimeSpan.FromSeconds(10), + })); + + return new Lazy(connectionFactory.Value.CreateConnection); + } + + private static object HashPassword(string password) + { + Guard.Against.NullOrWhiteSpace(password); + var sha256 = SHA256.Create(); + var hash = sha256.ComputeHash(Encoding.UTF8.GetBytes(password)); + return hash.Select(x => x.ToString("x2", CultureInfo.InvariantCulture)); + } + + protected virtual void Dispose(bool disposing) + { + if (!_disposedValue) + { + if (disposing) + { + _logger.ClosingConnections(); + foreach (var connection in _connections.Values) + { + connection.Value.Close(); + } + _connections.Clear(); + _connectionFactoriess.Clear(); + } + + _disposedValue = true; + } + } + + public void Dispose() + { + // Do not change this code. Put cleanup code in 'Dispose(bool disposing)' method + Dispose(disposing: true); + GC.SuppressFinalize(this); + } + } +} diff --git a/src/Plugins/RabbitMQ/Publisher/PublisherServicHealthCheckBuilder.cs b/src/Plugins/RabbitMQ/Publisher/PublisherServicHealthCheckBuilder.cs new file mode 100644 index 0000000..0e65c86 --- /dev/null +++ b/src/Plugins/RabbitMQ/Publisher/PublisherServicHealthCheckBuilder.cs @@ -0,0 +1,50 @@ +/* + * Copyright 2022 MONAI Consortium + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +using System; +using System.Collections.Generic; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Diagnostics.HealthChecks; +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Options; +using Monai.Deploy.Messaging.Configuration; + +namespace Monai.Deploy.Messaging.RabbitMQ +{ + public class PublisherServicHealthCheckBuilder : PublisherServiceHealthCheckRegistrationBase + { + public override IHealthChecksBuilder Configure( + IHealthChecksBuilder builder, + HealthStatus? failureStatus = null, + IEnumerable? tags = null, + TimeSpan? timeout = null) + { + builder.Add(new HealthCheckRegistration( + ConfigurationKeys.PublisherServiceName, + serviceProvider => + { + var options = serviceProvider.GetRequiredService>(); + var logger = serviceProvider.GetRequiredService>(); + var connectionFactory = serviceProvider.GetRequiredService(); + return new RabbitMQHealthCheck(connectionFactory, options.Value.PublisherSettings, logger, RabbitMQMessagePublisherService.ValidateConfiguration); + }, + failureStatus, + tags, + timeout)); + return builder; + } + } +} diff --git a/src/Plugins/RabbitMQ/Publisher/PublisherServiceRegistration.cs b/src/Plugins/RabbitMQ/Publisher/PublisherServiceRegistration.cs new file mode 100644 index 0000000..82eb40a --- /dev/null +++ b/src/Plugins/RabbitMQ/Publisher/PublisherServiceRegistration.cs @@ -0,0 +1,31 @@ +/* + * Copyright 2022 MONAI Consortium + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +using Microsoft.Extensions.DependencyInjection; +using Monai.Deploy.Messaging.API; + +namespace Monai.Deploy.Messaging.RabbitMQ +{ + public class PublisherServiceRegistration : PublisherServiceRegistrationBase + { + public override IServiceCollection Configure(IServiceCollection services) + { + return services + .AddSingleton() + .AddSingleton(); + } + } +} diff --git a/src/Plugins/RabbitMQ/Publisher/RabbitMqMessagePublisherService.cs b/src/Plugins/RabbitMQ/Publisher/RabbitMqMessagePublisherService.cs new file mode 100644 index 0000000..8139b0f --- /dev/null +++ b/src/Plugins/RabbitMQ/Publisher/RabbitMqMessagePublisherService.cs @@ -0,0 +1,153 @@ +/* + * Copyright 2021-2022 MONAI Consortium + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +using System; +using System.Collections.Generic; +using System.Diagnostics.Tracing; +using System.Globalization; +using System.Threading.Tasks; +using Ardalis.GuardClauses; +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Options; +using Monai.Deploy.Messaging.API; +using Monai.Deploy.Messaging.Configuration; +using Monai.Deploy.Messaging.Messages; +using RabbitMQ.Client; + +namespace Monai.Deploy.Messaging.RabbitMQ +{ + public class RabbitMQMessagePublisherService : IMessageBrokerPublisherService + { + private const int PersistentDeliveryMode = 2; + + private readonly ILogger _logger; + private readonly IRabbitMQConnectionFactory _rabbitMqConnectionFactory; + private readonly string _endpoint; + private readonly string _username; + private readonly string _password; + private readonly string _virtualHost; + private readonly string _exchange; + private readonly string _useSSL; + private readonly string _portNumber; + private bool _disposedValue; + + public string Name => ConfigurationKeys.PublisherServiceName; + + public RabbitMQMessagePublisherService(IOptions options, + ILogger logger, + IRabbitMQConnectionFactory rabbitMqConnectionFactory) + { + Guard.Against.Null(options, nameof(options)); + + _logger = logger ?? throw new ArgumentNullException(nameof(logger)); + _rabbitMqConnectionFactory = rabbitMqConnectionFactory ?? throw new ArgumentNullException(nameof(rabbitMqConnectionFactory)); + + var configuration = options.Value; + ValidateConfiguration(configuration.PublisherSettings); + _endpoint = configuration.PublisherSettings[ConfigurationKeys.EndPoint]; + _username = configuration.PublisherSettings[ConfigurationKeys.Username]; + _password = configuration.PublisherSettings[ConfigurationKeys.Password]; + _virtualHost = configuration.PublisherSettings[ConfigurationKeys.VirtualHost]; + _exchange = configuration.PublisherSettings[ConfigurationKeys.Exchange]; + + if (configuration.PublisherSettings.ContainsKey(ConfigurationKeys.UseSSL)) + { + _useSSL = configuration.PublisherSettings[ConfigurationKeys.UseSSL]; + } + else + { + _useSSL = String.Empty; + } + + if (configuration.PublisherSettings.ContainsKey(ConfigurationKeys.Port)) + { + _portNumber = configuration.PublisherSettings[ConfigurationKeys.Port]; + } + else + { + _portNumber = String.Empty; + } + } + + internal static void ValidateConfiguration(Dictionary configuration) + { + Guard.Against.Null(configuration, nameof(configuration)); + + foreach (var key in ConfigurationKeys.PublisherRequiredKeys) + { + if (!configuration.ContainsKey(key)) + { + throw new ConfigurationException($"{ConfigurationKeys.PublisherServiceName} is missing configuration for {key}."); + } + } + } + + public Task Publish(string topic, Message message) + { + Guard.Against.NullOrWhiteSpace(topic, nameof(topic)); + Guard.Against.Null(message, nameof(message)); + + using var loggingScope = _logger.BeginScope(new Dictionary + { + ["MessageId"] = message.MessageId, + ["ApplicationId"] = message.ApplicationId, + ["CorrelationId"] = message.CorrelationId + }); + + _logger.PublshingRabbitMQ(_endpoint, _virtualHost, _exchange, topic); + + using var channel = _rabbitMqConnectionFactory.CreateChannel(_endpoint, _username, _password, _virtualHost, _useSSL, _portNumber); + channel.ExchangeDeclare(_exchange, ExchangeType.Topic, durable: true, autoDelete: false); + + var properties = channel.CreateBasicProperties(); + properties.Persistent = true; + properties.ContentType = message.ContentType; + properties.MessageId = message.MessageId; + properties.AppId = message.ApplicationId; + properties.CorrelationId = message.CorrelationId; + properties.DeliveryMode = PersistentDeliveryMode; + properties.Type = message.MessageDescription; + properties.Timestamp = new AmqpTimestamp(message.CreationDateTime.ToUnixTimeSeconds()); + + channel.BasicPublish(exchange: _exchange, + routingKey: topic, + basicProperties: properties, + body: message.Body); + + return Task.CompletedTask; + } + + protected virtual void Dispose(bool disposing) + { + if (!_disposedValue) + { + if (disposing) + { + // Dispose any managed objects + } + + _disposedValue = true; + } + } + + public void Dispose() + { + // Do not change this code. Put cleanup code in 'Dispose(bool disposing)' method + Dispose(disposing: true); + GC.SuppressFinalize(this); + } + } +} diff --git a/src/Plugins/RabbitMQ/Subscriber/RabbitMqMessageSubscriberService.cs b/src/Plugins/RabbitMQ/Subscriber/RabbitMqMessageSubscriberService.cs new file mode 100644 index 0000000..42d5106 --- /dev/null +++ b/src/Plugins/RabbitMQ/Subscriber/RabbitMqMessageSubscriberService.cs @@ -0,0 +1,346 @@ +/* + * Copyright 2021-2022 MONAI Consortium + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +using System; +using System.Collections.Generic; +using System.Globalization; +using System.Threading; +using System.Threading.Tasks; +using Ardalis.GuardClauses; +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Options; +using Monai.Deploy.Messaging.API; +using Monai.Deploy.Messaging.Common; +using Monai.Deploy.Messaging.Configuration; +using Monai.Deploy.Messaging.Messages; +using RabbitMQ.Client; +using RabbitMQ.Client.Events; + +namespace Monai.Deploy.Messaging.RabbitMQ +{ + public class RabbitMQMessageSubscriberService : IMessageBrokerSubscriberService + { + private readonly ILogger _logger; + private readonly string _endpoint; + private readonly string _virtualHost; + private readonly string _exchange; + private readonly string _deadLetterExchange; + private readonly int _deliveryLimit; + private readonly int _requeueDelay; + private readonly string _useSSL; + private readonly string _portNumber; + private readonly IModel _channel; + private bool _disposedValue; + + public string Name => ConfigurationKeys.SubscriberServiceName; + + public RabbitMQMessageSubscriberService(IOptions options, + ILogger logger, + IRabbitMQConnectionFactory rabbitMqConnectionFactory) + { + Guard.Against.Null(options); + + _logger = logger ?? throw new ArgumentNullException(nameof(logger)); + + var configuration = options.Value; + ValidateConfiguration(configuration.SubscriberSettings); + _endpoint = configuration.SubscriberSettings[ConfigurationKeys.EndPoint]; + var username = configuration.SubscriberSettings[ConfigurationKeys.Username]; + var password = configuration.SubscriberSettings[ConfigurationKeys.Password]; + _virtualHost = configuration.SubscriberSettings[ConfigurationKeys.VirtualHost]; + _exchange = configuration.SubscriberSettings[ConfigurationKeys.Exchange]; + _deadLetterExchange = configuration.SubscriberSettings[ConfigurationKeys.DeadLetterExchange]; + _deliveryLimit = int.Parse(configuration.SubscriberSettings[ConfigurationKeys.DeliveryLimit], NumberFormatInfo.InvariantInfo); + _requeueDelay = int.Parse(configuration.SubscriberSettings[ConfigurationKeys.RequeueDelay], NumberFormatInfo.InvariantInfo); + + if (configuration.SubscriberSettings.ContainsKey(ConfigurationKeys.UseSSL)) + { + _useSSL = configuration.SubscriberSettings[ConfigurationKeys.UseSSL]; + } + else + { + _useSSL = string.Empty; + } + + if (configuration.SubscriberSettings.ContainsKey(ConfigurationKeys.Port)) + { + _portNumber = configuration.SubscriberSettings[ConfigurationKeys.Port]; + } + else + { + _portNumber = string.Empty; + } + + _logger.ConnectingToRabbitMQ(Name, _endpoint, _virtualHost); + _channel = rabbitMqConnectionFactory.CreateChannel(_endpoint, username, password, _virtualHost, _useSSL, _portNumber); + _channel.ExchangeDeclare(_exchange, ExchangeType.Topic, durable: true, autoDelete: false); + _channel.ExchangeDeclare(_deadLetterExchange, ExchangeType.Topic, durable: true, autoDelete: false); + _channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false); + } + + internal static void ValidateConfiguration(Dictionary configuration) + { + Guard.Against.Null(configuration); + + foreach (var key in ConfigurationKeys.SubscriberRequiredKeys) + { + if (!configuration.ContainsKey(key)) + { + throw new ConfigurationException($"{ConfigurationKeys.SubscriberServiceName} is missing configuration for {key}."); + } + } + + if (!int.TryParse(configuration[ConfigurationKeys.DeliveryLimit], out var deliveryLimit)) + { + throw new ConfigurationException($"{ConfigurationKeys.SubscriberServiceName} has a non int value for {ConfigurationKeys.DeliveryLimit}"); + } + + if (!int.TryParse(configuration[ConfigurationKeys.RequeueDelay], out var requeueDelay)) + { + throw new ConfigurationException($"{ConfigurationKeys.SubscriberServiceName} has a non int value for {ConfigurationKeys.RequeueDelay}"); + } + + if (deliveryLimit < 0 || requeueDelay < 0) + { + throw new ConfigurationException($"{ConfigurationKeys.SubscriberServiceName} has int values of less than 1"); + } + } + + public void Subscribe(string topic, string queue, Action messageReceivedCallback, ushort prefetchCount = 0) + => Subscribe(new string[] { topic }, queue, messageReceivedCallback, prefetchCount); + + public void Subscribe(string[] topics, string queue, Action messageReceivedCallback, ushort prefetchCount = 0) + { + Guard.Against.Null(topics); + Guard.Against.Null(messageReceivedCallback); + + var arguments = new Dictionary() + { + { "x-queue-type", "quorum" }, + { "x-delivery-limit", _deliveryLimit }, + { "x-dead-letter-exchange", _deadLetterExchange } + }; + + var deadLetterQueue = $"{queue}-dead-letter"; + + var queueDeclareResult = _channel.QueueDeclare(queue: queue, durable: true, exclusive: false, autoDelete: false, arguments: arguments); + var deadLetterQueueDeclareResult = _channel.QueueDeclare(queue: deadLetterQueue, durable: true, exclusive: false, autoDelete: false); + BindToRoutingKeys(topics, queueDeclareResult.QueueName, deadLetterQueueDeclareResult.QueueName); + + var consumer = new EventingBasicConsumer(_channel); + consumer.Received += (model, eventArgs) => + { + using var loggingScope = _logger.BeginScope(new Dictionary + { + ["MessageId"] = eventArgs.BasicProperties.MessageId, + ["ApplicationId"] = eventArgs.BasicProperties.AppId, + ["CorrelationId"] = eventArgs.BasicProperties.CorrelationId + }); + + _logger.MessageReceivedFromQueue(queueDeclareResult.QueueName, eventArgs.RoutingKey); + + MessageReceivedEventArgs messageReceivedEventArgs; + try + { + messageReceivedEventArgs = CreateMessage(eventArgs.RoutingKey, eventArgs); + } + catch (Exception ex) + { + _logger.InvalidMessage(queueDeclareResult.QueueName, eventArgs.RoutingKey, eventArgs.BasicProperties.MessageId, ex); + + _logger.SendingNAcknowledgement(eventArgs.BasicProperties.MessageId); + _channel.BasicNack(eventArgs.DeliveryTag, multiple: false, requeue: false); + _logger.NAcknowledgementSent(eventArgs.BasicProperties.MessageId, false); + return; + } + + try + { + messageReceivedCallback(messageReceivedEventArgs); + } + catch (Exception ex) + { + _logger.ErrorNotHandledByCallback(queueDeclareResult.QueueName, eventArgs.RoutingKey, eventArgs.BasicProperties.MessageId, ex); + } + }; + _channel.BasicQos(0, prefetchCount, false); + _channel.BasicConsume(queueDeclareResult.QueueName, false, consumer); + _logger.SubscribeToRabbitMQQueue(_endpoint, _virtualHost, _exchange, queueDeclareResult.QueueName, string.Join(',', topics)); + } + + public void SubscribeAsync(string topic, string queue, Func messageReceivedCallback, ushort prefetchCount = 0) + => SubscribeAsync(new string[] { topic }, queue, messageReceivedCallback, prefetchCount); + + public void SubscribeAsync(string[] topics, string queue, Func messageReceivedCallback, ushort prefetchCount = 0) + { + Guard.Against.Null(topics); + Guard.Against.Null(messageReceivedCallback); + + var arguments = new Dictionary() + { + { "x-queue-type", "quorum" }, + { "x-delivery-limit", _deliveryLimit }, + { "x-dead-letter-exchange", _deadLetterExchange } + }; + + var deadLetterQueue = $"{queue}-dead-letter"; + + var queueDeclareResult = _channel.QueueDeclare(queue: queue, durable: true, exclusive: false, autoDelete: false, arguments: arguments); + var deadLetterQueueDeclareResult = _channel.QueueDeclare(queue: deadLetterQueue, durable: true, exclusive: false, autoDelete: false); + BindToRoutingKeys(topics, queueDeclareResult.QueueName, deadLetterQueueDeclareResult.QueueName); + + var consumer = new EventingBasicConsumer(_channel); + consumer.Received += async (model, eventArgs) => + { + using var loggingScope = _logger.BeginScope(new Dictionary + { + ["MessageId"] = eventArgs.BasicProperties.MessageId, + ["ApplicationId"] = eventArgs.BasicProperties.AppId, + ["CorrelationId"] = eventArgs.BasicProperties.CorrelationId + + }); + + _logger.MessageReceivedFromQueue(queueDeclareResult.QueueName, eventArgs.RoutingKey); + + MessageReceivedEventArgs messageReceivedEventArgs; + try + { + messageReceivedEventArgs = CreateMessage(eventArgs.RoutingKey, eventArgs); + } + catch (Exception ex) + { + _logger.InvalidMessage(queueDeclareResult.QueueName, eventArgs.RoutingKey, eventArgs.BasicProperties.MessageId, ex); + + _logger.SendingNAcknowledgement(eventArgs.BasicProperties.MessageId); + _channel.BasicNack(eventArgs.DeliveryTag, multiple: false, requeue: false); + _logger.NAcknowledgementSent(eventArgs.BasicProperties.MessageId, false); + return; + } + try + { + await messageReceivedCallback(messageReceivedEventArgs).ConfigureAwait(false); + } + catch (Exception ex) + { + _logger.ErrorNotHandledByCallback(queueDeclareResult.QueueName, eventArgs.RoutingKey, eventArgs.BasicProperties.MessageId, ex); + } + }; + _channel.BasicQos(0, prefetchCount, false); + _channel.BasicConsume(queueDeclareResult.QueueName, false, consumer); + _logger.SubscribeToRabbitMQQueue(_endpoint, _virtualHost, _exchange, queueDeclareResult.QueueName, string.Join(',', topics)); + } + + public void Acknowledge(MessageBase message) + { + Guard.Against.Null(message); + + _logger.SendingAcknowledgement(message.MessageId); + _channel.BasicAck(ulong.Parse(message.DeliveryTag, CultureInfo.InvariantCulture), multiple: false); + _logger.AcknowledgementSent(message.MessageId); + } + + public async Task RequeueWithDelay(MessageBase message) + { + try + { + await Task.Delay(_requeueDelay * 1000).ConfigureAwait(false); + + Reject(message, true); + } + catch (Exception e) + { + _logger.Exception($"Requeue delay failed.", e); + Reject(message, true); + } + } + + public void Reject(MessageBase message, bool requeue = true) + { + Guard.Against.Null(message); + + _logger.SendingNAcknowledgement(message.MessageId); + _channel.BasicNack(ulong.Parse(message.DeliveryTag, CultureInfo.InvariantCulture), multiple: false, requeue: requeue); + _logger.NAcknowledgementSent(message.MessageId, requeue); + } + + protected virtual void Dispose(bool disposing) + { + if (!_disposedValue) + { + if (disposing) + { + _channel.Close(); + _channel.Dispose(); + } + + _disposedValue = true; + } + } + + public void Dispose() + { + // Do not change this code. Put cleanup code in 'Dispose(bool disposing)' method + Dispose(disposing: true); + GC.SuppressFinalize(this); + } + + private void BindToRoutingKeys(string[] topics, string queue, string deadLetterQueue = "") + { + Guard.Against.Null(topics); + Guard.Against.NullOrWhiteSpace(queue); + + foreach (var topic in topics) + { + if (!string.IsNullOrEmpty(topic)) + { + _channel.QueueBind(queue, _exchange, topic); + + if (!string.IsNullOrEmpty(deadLetterQueue)) + { + _channel.QueueBind(deadLetterQueue, _deadLetterExchange, topic); + } + } + } + } + + private static MessageReceivedEventArgs CreateMessage(string topic, BasicDeliverEventArgs eventArgs) + { + Guard.Against.NullOrWhiteSpace(topic); + Guard.Against.Null(eventArgs); + + Guard.Against.Null(eventArgs.Body); + Guard.Against.Null(eventArgs.BasicProperties); + Guard.Against.Null(eventArgs.BasicProperties.MessageId); + Guard.Against.Null(eventArgs.BasicProperties.AppId); + Guard.Against.Null(eventArgs.BasicProperties.ContentType); + Guard.Against.Null(eventArgs.BasicProperties.CorrelationId); + Guard.Against.Null(eventArgs.BasicProperties.Timestamp); + Guard.Against.Null(eventArgs.DeliveryTag); + + return new MessageReceivedEventArgs( + new Message( + body: eventArgs.Body.ToArray(), + messageDescription: eventArgs.BasicProperties.Type, + messageId: eventArgs.BasicProperties.MessageId, + applicationId: eventArgs.BasicProperties.AppId, + contentType: eventArgs.BasicProperties.ContentType, + correlationId: eventArgs.BasicProperties.CorrelationId, + creationDateTime: DateTimeOffset.FromUnixTimeSeconds(eventArgs.BasicProperties.Timestamp.UnixTime), + deliveryTag: eventArgs.DeliveryTag.ToString(CultureInfo.InvariantCulture)), + CancellationToken.None); + } + } +} diff --git a/src/Plugins/RabbitMQ/Subscriber/SubscriberServicHealthCheckBuilder.cs b/src/Plugins/RabbitMQ/Subscriber/SubscriberServicHealthCheckBuilder.cs new file mode 100644 index 0000000..f8bc17d --- /dev/null +++ b/src/Plugins/RabbitMQ/Subscriber/SubscriberServicHealthCheckBuilder.cs @@ -0,0 +1,50 @@ +/* + * Copyright 2022 MONAI Consortium + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +using System; +using System.Collections.Generic; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Diagnostics.HealthChecks; +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Options; +using Monai.Deploy.Messaging.Configuration; + +namespace Monai.Deploy.Messaging.RabbitMQ +{ + public class SubscriberServicHealthCheckBuilder : SubscriberServiceHealthCheckRegistrationBase + { + public override IHealthChecksBuilder Configure( + IHealthChecksBuilder builder, + HealthStatus? failureStatus = null, + IEnumerable? tags = null, + TimeSpan? timeout = null) + { + builder.Add(new HealthCheckRegistration( + ConfigurationKeys.SubscriberServiceName, + serviceProvider => + { + var options = serviceProvider.GetRequiredService>(); + var logger = serviceProvider.GetRequiredService>(); + var connectionFactory = serviceProvider.GetRequiredService(); + return new RabbitMQHealthCheck(connectionFactory, options.Value.SubscriberSettings, logger, RabbitMQMessageSubscriberService.ValidateConfiguration); + }, + failureStatus, + tags, + timeout)); + return builder; + } + } +} diff --git a/src/Plugins/RabbitMQ/Subscriber/SubscriberServiceRegistration.cs b/src/Plugins/RabbitMQ/Subscriber/SubscriberServiceRegistration.cs new file mode 100644 index 0000000..7a8a466 --- /dev/null +++ b/src/Plugins/RabbitMQ/Subscriber/SubscriberServiceRegistration.cs @@ -0,0 +1,31 @@ +/* + * Copyright 2022 MONAI Consortium + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +using Microsoft.Extensions.DependencyInjection; +using Monai.Deploy.Messaging.API; + +namespace Monai.Deploy.Messaging.RabbitMQ +{ + public class SubscriberServiceRegistration : SubscriberServiceRegistrationBase + { + public override IServiceCollection Configure(IServiceCollection services) + { + services.AddSingleton(); + services.AddSingleton(); + return services; + } + } +} From a6cf6373340235bc2df2ceb37068dd53b2414cfc Mon Sep 17 00:00:00 2001 From: Lillie Dae Date: Fri, 4 Nov 2022 14:33:41 +0000 Subject: [PATCH 3/4] connection resiliency and improvements Signed-off-by: Lillie Dae --- src/Plugins/RabbitMQ/Logger.cs | 65 ++++++++++++++++++++++++++++++++++ 1 file changed, 65 insertions(+) create mode 100644 src/Plugins/RabbitMQ/Logger.cs diff --git a/src/Plugins/RabbitMQ/Logger.cs b/src/Plugins/RabbitMQ/Logger.cs new file mode 100644 index 0000000..e187145 --- /dev/null +++ b/src/Plugins/RabbitMQ/Logger.cs @@ -0,0 +1,65 @@ +/* + * Copyright 2022 MONAI Consortium + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +using System; +using Microsoft.Extensions.Logging; + +namespace Monai.Deploy.Messaging.RabbitMQ +{ + public static partial class Logger + { + internal static readonly string LoggingScopeMessageApplication = "Message ID={0}. Application ID={1}."; + + [LoggerMessage(EventId = 10000, Level = LogLevel.Information, Message = "Publishing message to {endpoint}/{virtualHost}. Exchange={exchange}, Routing Key={topic}.")] + public static partial void PublshingRabbitMQ(this ILogger logger, string endpoint, string virtualHost, string exchange, string topic); + + [LoggerMessage(EventId = 10001, Level = LogLevel.Information, Message = "{ServiceName} connecting to {endpoint}/{virtualHost}.")] + public static partial void ConnectingToRabbitMQ(this ILogger logger, string serviceNAme, string endpoint, string virtualHost); + + [LoggerMessage(EventId = 10002, Level = LogLevel.Information, Message = "Message received from queue {queue} for {topic}.")] + public static partial void MessageReceivedFromQueue(this ILogger logger, string queue, string topic); + + [LoggerMessage(EventId = 10003, Level = LogLevel.Information, Message = "Listening for messages from {endpoint}/{virtualHost}. Exchange={exchange}, Queue={queue}, Routing Key={topic}.")] + public static partial void SubscribeToRabbitMQQueue(this ILogger logger, string endpoint, string virtualHost, string exchange, string queue, string topic); + + [LoggerMessage(EventId = 10004, Level = LogLevel.Information, Message = "Sending message acknowledgement for message {messageId}.")] + public static partial void SendingAcknowledgement(this ILogger logger, string messageId); + + [LoggerMessage(EventId = 10005, Level = LogLevel.Information, Message = "Ackowledge sent for message {messageId}.")] + public static partial void AcknowledgementSent(this ILogger logger, string messageId); + + [LoggerMessage(EventId = 10006, Level = LogLevel.Information, Message = "Sending nack message {messageId} and requeuing.")] + public static partial void SendingNAcknowledgement(this ILogger logger, string messageId); + + [LoggerMessage(EventId = 10007, Level = LogLevel.Information, Message = "Nack message sent for message {messageId}, requeue={requeue}.")] + public static partial void NAcknowledgementSent(this ILogger logger, string messageId, bool requeue); + + [LoggerMessage(EventId = 10008, Level = LogLevel.Information, Message = "Closing connections.")] + public static partial void ClosingConnections(this ILogger logger); + + [LoggerMessage(EventId = 10009, Level = LogLevel.Error, Message = "Invalid or corrupted message received: Queue={queueName}, Topic={topic}, Message ID={messageId}.")] + public static partial void InvalidMessage(this ILogger logger, string queueName, string topic, string messageId, Exception ex); + + [LoggerMessage(EventId = 10010, Level = LogLevel.Error, Message = "Exception not handled by the subscriber's callback function: Queue={queueName}, Topic={topic}, Message ID={messageId}.")] + public static partial void ErrorNotHandledByCallback(this ILogger logger, string queueName, string topic, string messageId, Exception ex); + + [LoggerMessage(EventId = 10011, Level = LogLevel.Error, Message = "Exception thrown: Message ID={messageId}.")] + public static partial void Exception(this ILogger logger, string messageId, Exception ex); + + [LoggerMessage(EventId = 10012, Level = LogLevel.Error, Message = "Health check failure.")] + public static partial void HealthCheckError(this ILogger logger, Exception ex); + } +} From e1ca223d4a4f8ffb4ba11ce01013257c427d3127 Mon Sep 17 00:00:00 2001 From: Lillie Dae Date: Mon, 7 Nov 2022 09:28:06 +0000 Subject: [PATCH 4/4] improve logging Signed-off-by: Lillie Dae --- src/Plugins/RabbitMQ/Factory/RabbitMqConnectionFactory.cs | 4 ++-- src/Plugins/RabbitMQ/Logger.cs | 6 ++++++ 2 files changed, 8 insertions(+), 2 deletions(-) diff --git a/src/Plugins/RabbitMQ/Factory/RabbitMqConnectionFactory.cs b/src/Plugins/RabbitMQ/Factory/RabbitMqConnectionFactory.cs index 7f2afec..3bc295d 100644 --- a/src/Plugins/RabbitMQ/Factory/RabbitMqConnectionFactory.cs +++ b/src/Plugins/RabbitMQ/Factory/RabbitMqConnectionFactory.cs @@ -85,7 +85,7 @@ public IModel CreateChannel(string hostName, string username, string password, s private void OnShutdown(ShutdownEventArgs args, string key, CreateChannelArguments createChannelArguments) { - _logger.LogError($"RabbitMQ connection shutdown attempting to reconnect.", args); + _logger.ConnectionShutdown(args.ReplyText); _connections.TryRemove(key, out var value); if (value is not null) @@ -98,7 +98,7 @@ private void OnShutdown(ShutdownEventArgs args, string key, CreateChannelArgumen private void OnException(CallbackExceptionEventArgs args, string key, CreateChannelArguments createChannelArguments) { - _logger.LogError(args.Exception, $"RabbitMQ connection exception attempting to reconnect {args.Exception.Message}"); + _logger.ConnectionException(args.Exception); _connections.TryRemove(key, out var value); if (value is not null) diff --git a/src/Plugins/RabbitMQ/Logger.cs b/src/Plugins/RabbitMQ/Logger.cs index e187145..38b84cc 100644 --- a/src/Plugins/RabbitMQ/Logger.cs +++ b/src/Plugins/RabbitMQ/Logger.cs @@ -61,5 +61,11 @@ public static partial class Logger [LoggerMessage(EventId = 10012, Level = LogLevel.Error, Message = "Health check failure.")] public static partial void HealthCheckError(this ILogger logger, Exception ex); + + [LoggerMessage(EventId = 10013, Level = LogLevel.Error, Message = "RabbitMQ connection shutdown ({replyText}) attempting to reconnect.")] + public static partial void ConnectionShutdown(this ILogger logger, string replyText); + + [LoggerMessage(EventId = 10014, Level = LogLevel.Error, Message = "RabbitMQ connection exception attempting to reconnect.")] + public static partial void ConnectionException(this ILogger logger, Exception ex); } }