Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 28 additions & 28 deletions src/DotNetCore.CAP.RabbitMQ/RabbitMQBasicConsumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -48,46 +48,46 @@ public override async Task HandleBasicDeliverAsync(string consumerTag, ulong del
if (_usingTaskRun)
{
await _semaphore.WaitAsync(cancellationToken);

var safeBody = body.ToArray();

_ = Task.Run(Consume, cancellationToken).ConfigureAwait(false);
// Copy of the body safe to use outside the RabbitMQ thread context
ReadOnlyMemory<byte> safeBody = body.ToArray();
_ = Task.Run(() => Consume(consumerTag, deliveryTag, redelivered, exchange, routingKey, properties, safeBody), cancellationToken).ConfigureAwait(false);
}
else
{
await Consume().ConfigureAwait(false);
await Consume(consumerTag, deliveryTag, redelivered, exchange, routingKey, properties, body).ConfigureAwait(false);
}
}

Task Consume()
{
var headers = new Dictionary<string, string?>();
private Task Consume(string consumerTag, ulong deliveryTag, bool redelivered, string exchange,
string routingKey, IReadOnlyBasicProperties properties, ReadOnlyMemory<byte> body)
{
var headers = new Dictionary<string, string?>();

if (properties.Headers != null)
foreach (var header in properties.Headers)
{
if (header.Value is byte[] val)
headers.Add(header.Key, Encoding.UTF8.GetString(val));
else
headers.Add(header.Key, header.Value?.ToString());
}
if (properties.Headers != null)
foreach (var header in properties.Headers)
{
if (header.Value is byte[] val)
headers.Add(header.Key, Encoding.UTF8.GetString(val));
else
headers.Add(header.Key, header.Value?.ToString());
}

headers[Messages.Headers.Group] = _groupName;
headers[Messages.Headers.Group] = _groupName;

if (_customHeadersBuilder != null)
if (_customHeadersBuilder != null)
{
var e = new BasicDeliverEventArgs(consumerTag, deliveryTag, redelivered, exchange, routingKey,
properties, body);
var customHeaders = _customHeadersBuilder(e, _serviceProvider);
foreach (var customHeader in customHeaders)
{
var e = new BasicDeliverEventArgs(consumerTag, deliveryTag, redelivered, exchange, routingKey,
properties, body);
var customHeaders = _customHeadersBuilder(e, _serviceProvider);
foreach (var customHeader in customHeaders)
{
headers[customHeader.Key] = customHeader.Value;
}
headers[customHeader.Key] = customHeader.Value;
}
}

var message = new TransportMessage(headers, body);
var message = new TransportMessage(headers, body);

return _msgCallback(message, deliveryTag);
}
return _msgCallback(message, deliveryTag);
}

public async Task BasicAck(ulong deliveryTag)
Expand Down