diff --git a/src/Plugins/RabbitMQ/Logger.cs b/src/Plugins/RabbitMQ/Logger.cs index e187145..eae8c32 100644 --- a/src/Plugins/RabbitMQ/Logger.cs +++ b/src/Plugins/RabbitMQ/Logger.cs @@ -38,8 +38,8 @@ public static partial class Logger [LoggerMessage(EventId = 10004, Level = LogLevel.Information, Message = "Sending message acknowledgement for message {messageId}.")] public static partial void SendingAcknowledgement(this ILogger logger, string messageId); - [LoggerMessage(EventId = 10005, Level = LogLevel.Information, Message = "Ackowledge sent for message {messageId}.")] - public static partial void AcknowledgementSent(this ILogger logger, string messageId); + [LoggerMessage(EventId = 10005, Level = LogLevel.Information, Message = "Ackowledge sent for message {messageId}. Event Duration {durationMilliseconds}")] + public static partial void AcknowledgementSent(this ILogger logger, string messageId, double durationMilliseconds); [LoggerMessage(EventId = 10006, Level = LogLevel.Information, Message = "Sending nack message {messageId} and requeuing.")] public static partial void SendingNAcknowledgement(this ILogger logger, string messageId); diff --git a/src/Plugins/RabbitMQ/Monai.Deploy.Messaging.RabbitMQ.csproj b/src/Plugins/RabbitMQ/Monai.Deploy.Messaging.RabbitMQ.csproj index 5f1b00e..cdb28dd 100644 --- a/src/Plugins/RabbitMQ/Monai.Deploy.Messaging.RabbitMQ.csproj +++ b/src/Plugins/RabbitMQ/Monai.Deploy.Messaging.RabbitMQ.csproj @@ -62,6 +62,10 @@ + + + + diff --git a/src/Plugins/RabbitMQ/RabbitMqMessageSubscriberService.cs b/src/Plugins/RabbitMQ/RabbitMqMessageSubscriberService.cs index 9c85e59..b6e98c0 100644 --- a/src/Plugins/RabbitMQ/RabbitMqMessageSubscriberService.cs +++ b/src/Plugins/RabbitMQ/RabbitMqMessageSubscriberService.cs @@ -15,6 +15,7 @@ */ using System; +using System.Collections.Concurrent; using System.Collections.Generic; using System.Globalization; using System.Threading; @@ -44,6 +45,7 @@ public class RabbitMQMessageSubscriberService : IMessageBrokerSubscriberService private readonly string _portNumber; private readonly IModel _channel; private bool _disposedValue; + private static readonly ConcurrentDictionary MessageTimings = new(); public string Name => ConfigurationKeys.SubscriberServiceName; @@ -209,10 +211,12 @@ public void SubscribeAsync(string[] topics, string queue, Func + { + ["EventDuration"] = eventDuration + }); + _logger.AcknowledgementSent(message.MessageId, eventDuration); + RemoveTimeMessage(message.MessageId); } public async Task RequeueWithDelay(MessageBase message) @@ -274,6 +286,7 @@ public void Reject(MessageBase message, bool requeue = true) _logger.SendingNAcknowledgement(message.MessageId); _channel.BasicNack(ulong.Parse(message.DeliveryTag, CultureInfo.InvariantCulture), multiple: false, requeue: requeue); _logger.NAcknowledgementSent(message.MessageId, requeue); + RemoveTimeMessage(message.MessageId); } protected virtual void Dispose(bool disposing) @@ -342,5 +355,31 @@ private static MessageReceivedEventArgs CreateMessage(string topic, BasicDeliver deliveryTag: eventArgs.DeliveryTag.ToString(CultureInfo.InvariantCulture)), CancellationToken.None); } + + private static void TimeNewMessage(string messageId) + { + if (MessageTimings.ContainsKey(messageId)) + { + RemoveTimeMessage(messageId); + } + MessageTimings.TryAdd(messageId, DateTime.UtcNow); + } + + private static void RemoveTimeMessage(string messageId) + { + if (MessageTimings.ContainsKey(messageId)) + { + MessageTimings.TryRemove(messageId, out _); + } + } + + private static double GetMessageDuration(string messageId) + { + if (MessageTimings.ContainsKey(messageId)) + { + return (DateTime.UtcNow - MessageTimings[messageId]).TotalMilliseconds; + } + return 0; + } } }