Skip to content

Commit

Permalink
Do not create consumerTag string in HandleBasicDeliver when conumer e…
Browse files Browse the repository at this point in the history
…xists in the dictionary
  • Loading branch information
Gábor Zavarkó committed May 18, 2023
1 parent 929047d commit 7c034d0
Show file tree
Hide file tree
Showing 6 changed files with 93 additions and 16 deletions.
5 changes: 3 additions & 2 deletions projects/Benchmarks/ConsumerDispatching/ConsumerDispatcher.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ public class ConsumerDispatcherBase
private protected IConsumerDispatcher _dispatcher;
private protected readonly AsyncBasicConsumerFake _consumer = new AsyncBasicConsumerFake(_autoResetEvent);
protected readonly string _consumerTag = "ConsumerTag";
protected static readonly byte[] _consumerTagBytes = Encoding.UTF8.GetBytes("ConsumerTag");
protected readonly ulong _deliveryTag = 500UL;
protected static readonly byte[] _exchange = Encoding.UTF8.GetBytes("Exchange");
protected static readonly byte[] _routingKey = Encoding.UTF8.GetBytes("RoutingKey");
Expand Down Expand Up @@ -43,7 +44,7 @@ public void AsyncConsumerDispatcher()
{
for (int i = 0; i < Count; i++)
{
_dispatcher.HandleBasicDeliver(_consumerTag, _deliveryTag, false, _exchange, _routingKey, _properties, _body, _method, _body);
_dispatcher.HandleBasicDeliver(_consumerTagBytes, _deliveryTag, false, _exchange, _routingKey, _properties, _body, _method, _body);
}
_autoResetEvent.Wait();
_autoResetEvent.Reset();
Expand All @@ -61,7 +62,7 @@ public void ConsumerDispatcher()
{
for (int i = 0; i < Count; i++)
{
_dispatcher.HandleBasicDeliver(_consumerTag, _deliveryTag, false, _exchange, _routingKey, _properties, _body, _method, _body);
_dispatcher.HandleBasicDeliver(_consumerTagBytes, _deliveryTag, false, _exchange, _routingKey, _properties, _body, _method, _body);
}
_autoResetEvent.Wait();
_autoResetEvent.Reset();
Expand Down
4 changes: 2 additions & 2 deletions projects/RabbitMQ.Client/client/framing/BasicDeliver.cs
Original file line number Diff line number Diff line change
Expand Up @@ -38,15 +38,15 @@ namespace RabbitMQ.Client.Framing.Impl
{
internal readonly struct BasicDeliver : IAmqpMethod
{
public readonly string _consumerTag;
public readonly ReadOnlyMemory<byte> _consumerTag;
public readonly ulong _deliveryTag;
public readonly bool _redelivered;
public readonly ReadOnlyMemory<byte> _exchange;
public readonly ReadOnlyMemory<byte> _routingKey;

public BasicDeliver(ReadOnlyMemory<byte> data)
{
int offset = WireFormatting.ReadShortstr(data.Span, out _consumerTag);
int offset = WireFormatting.ReadShortMemory(data, out _consumerTag);
offset += WireFormatting.ReadLonglong(data.Span.Slice(offset), out _deliveryTag);
offset += WireFormatting.ReadBits(data.Span.Slice(offset), out _redelivered);
offset += WireFormatting.ReadShortMemory(data.Slice(offset), out _exchange);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,53 +1,87 @@
using System.Collections.Generic;
using System;
using System.Buffers;
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Text;
using System.Threading.Tasks;
using RabbitMQ.Util;

namespace RabbitMQ.Client.ConsumerDispatching
{
#nullable enable
internal abstract class ConsumerDispatcherBase
{
private static readonly FallbackConsumer fallbackConsumer = new FallbackConsumer();
private readonly Dictionary<string, IBasicConsumer> _consumers;
private readonly Dictionary<ReadOnlyMemory<byte>, (IBasicConsumer consumer, string consumerTag)> _consumers;

public IBasicConsumer? DefaultConsumer { get; set; }

protected ConsumerDispatcherBase()
{
_consumers = new Dictionary<string, IBasicConsumer>();
_consumers = new Dictionary<ReadOnlyMemory<byte>, (IBasicConsumer, string)>(MemoryOfByteEqualityComparer.Instance);
}

protected void AddConsumer(IBasicConsumer consumer, string tag)
{
lock (_consumers)
{
_consumers[tag] = consumer;
var tagBytes = Encoding.UTF8.GetBytes(tag);
_consumers[tagBytes] = (consumer, tag);
}
}

protected IBasicConsumer GetConsumerOrDefault(string tag)
protected (IBasicConsumer consumer, string consumerTag) GetConsumerOrDefault(ReadOnlyMemory<byte> tag)
{
lock (_consumers)
{
return _consumers.TryGetValue(tag, out var consumer) ? consumer : GetDefaultOrFallbackConsumer();
if (_consumers.TryGetValue(tag, out var consumerPair))
{
return consumerPair;
}

#if !NETSTANDARD
var consumerTag = Encoding.UTF8.GetString(tag.Span);
#else
string consumerTag;
unsafe
{
fixed (byte* bytes = tag.Span)
{
consumerTag = Encoding.UTF8.GetString(bytes, tag.Length);
}
}
#endif

return (GetDefaultOrFallbackConsumer(), consumerTag);
}
}

public IBasicConsumer GetAndRemoveConsumer(string tag)
{
lock (_consumers)
{
return _consumers.Remove(tag, out var consumer) ? consumer : GetDefaultOrFallbackConsumer();
var utf8 = Encoding.UTF8;
var pool = ArrayPool<byte>.Shared;
var buf = pool.Rent(utf8.GetMaxByteCount(tag.Length));
#if NETSTANDARD
int count = utf8.GetBytes(tag, 0, tag.Length, buf, 0);
#else
int count = utf8.GetBytes(tag, buf);
#endif
var memory = buf.AsMemory(0, count);
var result = _consumers.Remove(memory, out var consumerPair) ? consumerPair.consumer : GetDefaultOrFallbackConsumer();
pool.Return(buf);
return result;
}
}

public Task ShutdownAsync(ShutdownEventArgs reason)
{
lock (_consumers)
{
foreach (KeyValuePair<string, IBasicConsumer> pair in _consumers)
foreach (KeyValuePair<ReadOnlyMemory<byte>, (IBasicConsumer consumer, string consumerTag)> pair in _consumers)
{
ShutdownConsumer(pair.Value, reason);
ShutdownConsumer(pair.Value.consumer, reason);
}
_consumers.Clear();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,12 +53,13 @@ public void HandleBasicConsumeOk(IBasicConsumer consumer, string consumerTag)
}
}

public void HandleBasicDeliver(string consumerTag, ulong deliveryTag, bool redelivered,
public void HandleBasicDeliver(ReadOnlyMemory<byte> consumerTag, ulong deliveryTag, bool redelivered,
ReadOnlyMemory<byte> exchange, ReadOnlyMemory<byte> routingKey, in ReadOnlyBasicProperties basicProperties, ReadOnlyMemory<byte> body, byte[] rentedMethodArray, byte[] rentedArray)
{
if (!IsShutdown)
{
_writer.TryWrite(new WorkStruct(GetConsumerOrDefault(consumerTag), consumerTag, deliveryTag, redelivered, exchange, routingKey, basicProperties, body, rentedMethodArray, rentedArray));
var consumerPair = GetConsumerOrDefault(consumerTag);
_writer.TryWrite(new WorkStruct(consumerPair.consumer, consumerPair.consumerTag, deliveryTag, redelivered, exchange, routingKey, basicProperties, body, rentedMethodArray, rentedArray));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ internal interface IConsumerDispatcher

void HandleBasicConsumeOk(IBasicConsumer consumer, string consumerTag);

void HandleBasicDeliver(string consumerTag,
void HandleBasicDeliver(ReadOnlyMemory<byte> consumerTag,
ulong deliveryTag,
bool redelivered,
ReadOnlyMemory<byte> exchange,
Expand Down
41 changes: 41 additions & 0 deletions projects/RabbitMQ.Client/util/MemoryOfByteEqualityComparer.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
using System;
using System.Collections.Generic;
using System.Runtime.InteropServices;

namespace RabbitMQ.Util;

internal sealed class MemoryOfByteEqualityComparer : IEqualityComparer<ReadOnlyMemory<byte>>
{
public static MemoryOfByteEqualityComparer Instance { get; } = new MemoryOfByteEqualityComparer();

public bool Equals(ReadOnlyMemory<byte> left, ReadOnlyMemory<byte> right)
{
return left.Span.SequenceEqual(right.Span);
}

public int GetHashCode(ReadOnlyMemory<byte> value)
{
#if NETSTANDARD
unchecked
{
int hashCode = 0;
var longPart = MemoryMarshal.Cast<byte, long>(value.Span);
foreach (long item in longPart)
{
hashCode = (hashCode * 397) ^ item.GetHashCode();
}

foreach (int item in value.Span.Slice(longPart.Length * 8))
{
hashCode = (hashCode * 397) ^ item.GetHashCode();
}

return hashCode;
}
#else
HashCode result = default;
result.AddBytes(value.Span);
return result.ToHashCode();
#endif
}
}

0 comments on commit 7c034d0

Please sign in to comment.