From ca6ba97a54e9a1d16ed6e65e1a1c12aa5792da33 Mon Sep 17 00:00:00 2001 From: Oleg Temnov Date: Tue, 11 Nov 2025 14:10:19 +0100 Subject: [PATCH] Fix RabbitMQ consumer concurrency by ensuring safe body handling in asynchronous processing --- .../RabbitMQBasicConsumer.cs | 56 +++++++++---------- 1 file changed, 28 insertions(+), 28 deletions(-) diff --git a/src/DotNetCore.CAP.RabbitMQ/RabbitMQBasicConsumer.cs b/src/DotNetCore.CAP.RabbitMQ/RabbitMQBasicConsumer.cs index 966b78bc..4e0828b2 100644 --- a/src/DotNetCore.CAP.RabbitMQ/RabbitMQBasicConsumer.cs +++ b/src/DotNetCore.CAP.RabbitMQ/RabbitMQBasicConsumer.cs @@ -48,46 +48,46 @@ public override async Task HandleBasicDeliverAsync(string consumerTag, ulong del if (_usingTaskRun) { await _semaphore.WaitAsync(cancellationToken); - - var safeBody = body.ToArray(); - - _ = Task.Run(Consume, cancellationToken).ConfigureAwait(false); + // Copy of the body safe to use outside the RabbitMQ thread context + ReadOnlyMemory safeBody = body.ToArray(); + _ = Task.Run(() => Consume(consumerTag, deliveryTag, redelivered, exchange, routingKey, properties, safeBody), cancellationToken).ConfigureAwait(false); } else { - await Consume().ConfigureAwait(false); + await Consume(consumerTag, deliveryTag, redelivered, exchange, routingKey, properties, body).ConfigureAwait(false); } + } - Task Consume() - { - var headers = new Dictionary(); + private Task Consume(string consumerTag, ulong deliveryTag, bool redelivered, string exchange, + string routingKey, IReadOnlyBasicProperties properties, ReadOnlyMemory body) + { + var headers = new Dictionary(); - if (properties.Headers != null) - foreach (var header in properties.Headers) - { - if (header.Value is byte[] val) - headers.Add(header.Key, Encoding.UTF8.GetString(val)); - else - headers.Add(header.Key, header.Value?.ToString()); - } + if (properties.Headers != null) + foreach (var header in properties.Headers) + { + if (header.Value is byte[] val) + headers.Add(header.Key, Encoding.UTF8.GetString(val)); + else + headers.Add(header.Key, header.Value?.ToString()); + } - headers[Messages.Headers.Group] = _groupName; + headers[Messages.Headers.Group] = _groupName; - if (_customHeadersBuilder != null) + if (_customHeadersBuilder != null) + { + var e = new BasicDeliverEventArgs(consumerTag, deliveryTag, redelivered, exchange, routingKey, + properties, body); + var customHeaders = _customHeadersBuilder(e, _serviceProvider); + foreach (var customHeader in customHeaders) { - var e = new BasicDeliverEventArgs(consumerTag, deliveryTag, redelivered, exchange, routingKey, - properties, body); - var customHeaders = _customHeadersBuilder(e, _serviceProvider); - foreach (var customHeader in customHeaders) - { - headers[customHeader.Key] = customHeader.Value; - } + headers[customHeader.Key] = customHeader.Value; } + } - var message = new TransportMessage(headers, body); + var message = new TransportMessage(headers, body); - return _msgCallback(message, deliveryTag); - } + return _msgCallback(message, deliveryTag); } public async Task BasicAck(ulong deliveryTag)