From a42000f694324d8deadacfd244ba25185fab6f21 Mon Sep 17 00:00:00 2001 From: Neil South Date: Thu, 3 Nov 2022 17:12:28 +0000 Subject: [PATCH 1/3] adding mesage timings Signed-off-by: Neil South --- src/Plugins/RabbitMQ/Logger.cs | 4 +- .../RabbitMqMessageSubscriberService.cs | 46 +++++++++++++++++-- 2 files changed, 45 insertions(+), 5 deletions(-) diff --git a/src/Plugins/RabbitMQ/Logger.cs b/src/Plugins/RabbitMQ/Logger.cs index e187145..eae8c32 100644 --- a/src/Plugins/RabbitMQ/Logger.cs +++ b/src/Plugins/RabbitMQ/Logger.cs @@ -38,8 +38,8 @@ public static partial class Logger [LoggerMessage(EventId = 10004, Level = LogLevel.Information, Message = "Sending message acknowledgement for message {messageId}.")] public static partial void SendingAcknowledgement(this ILogger logger, string messageId); - [LoggerMessage(EventId = 10005, Level = LogLevel.Information, Message = "Ackowledge sent for message {messageId}.")] - public static partial void AcknowledgementSent(this ILogger logger, string messageId); + [LoggerMessage(EventId = 10005, Level = LogLevel.Information, Message = "Ackowledge sent for message {messageId}. Event Duration {durationMilliseconds}")] + public static partial void AcknowledgementSent(this ILogger logger, string messageId, double durationMilliseconds); [LoggerMessage(EventId = 10006, Level = LogLevel.Information, Message = "Sending nack message {messageId} and requeuing.")] public static partial void SendingNAcknowledgement(this ILogger logger, string messageId); diff --git a/src/Plugins/RabbitMQ/RabbitMqMessageSubscriberService.cs b/src/Plugins/RabbitMQ/RabbitMqMessageSubscriberService.cs index 9c85e59..0501f3f 100644 --- a/src/Plugins/RabbitMQ/RabbitMqMessageSubscriberService.cs +++ b/src/Plugins/RabbitMQ/RabbitMqMessageSubscriberService.cs @@ -44,6 +44,7 @@ public class RabbitMQMessageSubscriberService : IMessageBrokerSubscriberService private readonly string _portNumber; private readonly IModel _channel; private bool _disposedValue; + private static readonly Dictionary MessageTimings = new(); public string Name => ConfigurationKeys.SubscriberServiceName; @@ -150,6 +151,8 @@ public void Subscribe(string[] topics, string queue, Action + { + ["EventDuration"] = EventDuration + }); + _logger.AcknowledgementSent(message.MessageId, EventDuration); + RemoveTimeMessage(message.MessageId); } public async Task RequeueWithDelay(MessageBase message) @@ -274,6 +287,7 @@ public void Reject(MessageBase message, bool requeue = true) _logger.SendingNAcknowledgement(message.MessageId); _channel.BasicNack(ulong.Parse(message.DeliveryTag, CultureInfo.InvariantCulture), multiple: false, requeue: requeue); _logger.NAcknowledgementSent(message.MessageId, requeue); + RemoveTimeMessage(message.MessageId); } protected virtual void Dispose(bool disposing) @@ -342,5 +356,31 @@ private static MessageReceivedEventArgs CreateMessage(string topic, BasicDeliver deliveryTag: eventArgs.DeliveryTag.ToString(CultureInfo.InvariantCulture)), CancellationToken.None); } + + private static void TimeNewMessage(string messageId) + { + if (MessageTimings.ContainsKey(messageId)) + { + RemoveTimeMessage(messageId); + } + MessageTimings.Add(messageId, DateTime.UtcNow); + } + + private static void RemoveTimeMessage(string messageId) + { + if (MessageTimings.ContainsKey(messageId)) + { + MessageTimings.Remove(messageId); + } + } + + private static double GetMessageDuration(string messageId) + { + if (MessageTimings.ContainsKey(messageId)) + { + return (DateTime.UtcNow - MessageTimings[messageId]).TotalMilliseconds; + } + return default; + } } } From 8398821c4922c65914f474514507dcdb95126143 Mon Sep 17 00:00:00 2001 From: Neil South Date: Thu, 3 Nov 2022 17:59:41 +0000 Subject: [PATCH 2/3] couple of small improvments Signed-off-by: Neil South --- .../RabbitMQ/RabbitMqMessageSubscriberService.cs | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/src/Plugins/RabbitMQ/RabbitMqMessageSubscriberService.cs b/src/Plugins/RabbitMQ/RabbitMqMessageSubscriberService.cs index 0501f3f..8d37f2c 100644 --- a/src/Plugins/RabbitMQ/RabbitMqMessageSubscriberService.cs +++ b/src/Plugins/RabbitMQ/RabbitMqMessageSubscriberService.cs @@ -151,8 +151,6 @@ public void Subscribe(string[] topics, string queue, Action { - ["EventDuration"] = EventDuration + ["EventDuration"] = eventDuration }); - _logger.AcknowledgementSent(message.MessageId, EventDuration); + _logger.AcknowledgementSent(message.MessageId, eventDuration); RemoveTimeMessage(message.MessageId); } @@ -380,7 +378,7 @@ private static double GetMessageDuration(string messageId) { return (DateTime.UtcNow - MessageTimings[messageId]).TotalMilliseconds; } - return default; + return 0; } } } From f3cd5eb2dd34be2f2d4b781bbd64b887d75eb0a0 Mon Sep 17 00:00:00 2001 From: Neil South Date: Fri, 4 Nov 2022 09:11:39 +0000 Subject: [PATCH 3/3] change message timing directory to thread safe one Signed-off-by: Neil South --- .../RabbitMQ/Monai.Deploy.Messaging.RabbitMQ.csproj | 4 ++++ src/Plugins/RabbitMQ/RabbitMqMessageSubscriberService.cs | 7 ++++--- 2 files changed, 8 insertions(+), 3 deletions(-) diff --git a/src/Plugins/RabbitMQ/Monai.Deploy.Messaging.RabbitMQ.csproj b/src/Plugins/RabbitMQ/Monai.Deploy.Messaging.RabbitMQ.csproj index 5f1b00e..cdb28dd 100644 --- a/src/Plugins/RabbitMQ/Monai.Deploy.Messaging.RabbitMQ.csproj +++ b/src/Plugins/RabbitMQ/Monai.Deploy.Messaging.RabbitMQ.csproj @@ -62,6 +62,10 @@ + + + + diff --git a/src/Plugins/RabbitMQ/RabbitMqMessageSubscriberService.cs b/src/Plugins/RabbitMQ/RabbitMqMessageSubscriberService.cs index 8d37f2c..b6e98c0 100644 --- a/src/Plugins/RabbitMQ/RabbitMqMessageSubscriberService.cs +++ b/src/Plugins/RabbitMQ/RabbitMqMessageSubscriberService.cs @@ -15,6 +15,7 @@ */ using System; +using System.Collections.Concurrent; using System.Collections.Generic; using System.Globalization; using System.Threading; @@ -44,7 +45,7 @@ public class RabbitMQMessageSubscriberService : IMessageBrokerSubscriberService private readonly string _portNumber; private readonly IModel _channel; private bool _disposedValue; - private static readonly Dictionary MessageTimings = new(); + private static readonly ConcurrentDictionary MessageTimings = new(); public string Name => ConfigurationKeys.SubscriberServiceName; @@ -361,14 +362,14 @@ private static void TimeNewMessage(string messageId) { RemoveTimeMessage(messageId); } - MessageTimings.Add(messageId, DateTime.UtcNow); + MessageTimings.TryAdd(messageId, DateTime.UtcNow); } private static void RemoveTimeMessage(string messageId) { if (MessageTimings.ContainsKey(messageId)) { - MessageTimings.Remove(messageId); + MessageTimings.TryRemove(messageId, out _); } }