Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reduce memory usage by using the body directly instead of copying in BasicPublishAsync #1445

Open
wants to merge 11 commits into
base: main
Choose a base branch
from

Conversation

GerardSmit
Copy link

@GerardSmit GerardSmit commented Dec 15, 2023

Proposed Changes

This PR adds the ability to use the body in BasicPublishAsync directly when sending it to RabbitMQ, instead of copying it to a temporary byte-array.

Technical information

Currently BasicPublishAsync rents an array from the ArrayPool, with the frame + body length:

int size = Method.FrameSize + Header.FrameSize +
method.GetRequiredBufferSize() + header.GetRequiredBufferSize() +
BodySegment.FrameSize * GetBodyFrameCount(maxBodyPayloadBytes, remainingBodyBytes) + remainingBodyBytes;
// Will be returned by SocketFrameWriter.WriteLoop
byte[] array = ClientArrayPool.Rent(size);

Then the body is being copied to this array.

while (remainingBodyBytes > 0)
{
int frameSize = remainingBodyBytes > maxBodyPayloadBytes ? maxBodyPayloadBytes : remainingBodyBytes;
offset += BodySegment.WriteTo(array.AsSpan(offset), channelNumber, bodySpan.Slice(bodySpan.Length - remainingBodyBytes, frameSize));
remainingBodyBytes -= frameSize;
}

In our application, we're sometimes forwarding large bodies to RabbitMQ. The problem is that the ArrayPool<byte>.Shared is using a separate buckets in each thread, causing a lot of new byte arrays to be generated.

In addition, the BasicPublishAsync writes the rented array in a channel:

await _channelWriter.WriteAsync(frames)
.ConfigureAwait(false);

Which is later written to the pipe:

while (await _channelReader.WaitToReadAsync().ConfigureAwait(false))
{
while (_channelReader.TryRead(out RentedMemory frames))
{
try
{
await _pipeWriter.WriteAsync(frames.Memory)
.ConfigureAwait(false);
RabbitMqClientEventSource.Log.CommandSent(frames.Size);
}

If RabbitMQ is has a slow response rate or we're trying to send a lot of data at once, the channel can fill up quickly. Causing it to hold 128 rented arrays.

New API

ConnectionFactory

In the factory there is a new property called CopyBodyToMemoryThreshold:

ConnectionFactory factory = new ConnectionFactory
{
    CopyBodyToMemoryThreshold = 4096
}

When this value is set, every data that is larger than the provided value (in this instance, 4096) will be used directly instead of being copied to a new array.
If the data is smaller than the provided value, the body is still copied to a new array.

When we use the data directly, we cannot send the data in the background like we're currently doing. We have to wait until the bytes are written to the pipe: otherwise, the application can modify the buffer. So, whenever we're using the buffer directly, the BasicPublishAsync wait until the bytes are written to the pipe.

This solution is a middle ground: smaller bodies are sent in the background but allocating, larger bodies are sent directly but are blocking.

The default value of CopyBodyToMemoryThreshold is int.MaxValue, so there is no breaking change.

IChannel

BasicPublishAsync has a new parameter: bool? copyBody = null:

ValueTask BasicPublishAsync<TProperties>(string exchange, string routingKey, in TProperties basicProperties, ReadOnlyMemory<byte> body = default, bool mandatory = false, bool? copyBody = null)
    where TProperties : IReadOnlyBasicProperties, IAmqpHeader;

When the value is:

  • true: the body is copied to a new array and sent in the background.
  • false: the body is written directly but blocking.
  • null: the length is being compared to the config CopyBodyToMemoryThreshold in the connection

There are two new overloads that allows ReadOnlySequence<byte> to be used as body:

ValueTask BasicPublishAsync<TProperties>(string exchange, string routingKey, in TProperties basicProperties, ReadOnlySequence<byte> body = default, bool mandatory = false, bool? copyBody = null)
    where TProperties : IReadOnlyBasicProperties, IAmqpHeader;

This way you can use System.IO.Pipelines without allocating to a temporary ReadOnlyMemory.

Types of Changes

What types of changes does your code introduce to this project?

  • Bug fix (non-breaking change which fixes issue #NNNN)
  • New feature (non-breaking change which adds functionality)
  • Breaking change (fix or feature that would cause an observable behavior change in existing systems)
  • Documentation improvements (corrections, new content, etc)
  • Cosmetic change (whitespace, formatting, etc)

Checklist

  • I have read the CONTRIBUTING.md document
  • I have signed the CA (see https://cla.pivotal.io/sign/rabbitmq)
  • All tests pass locally with my changes
    Some tests timed out.
  • I have added tests that prove my fix is effective or that my feature works
  • I have added necessary documentation (if appropriate)
    I'll wait until this API is approved/changes have been made.
  • Any dependent changes have been merged and published in related repositories

Further Comments

  • Maybe the naming of the properties and/or parameters are confusing and have to be renamed.
  • I tried my best to make unit tests to check if "CopyBodyToMemoryThreshold" works as indented but doing so I had to make some stuff internal instead of private. I also had to track the number of allocating bytes in the connection. Is this OK?

@lukebakken lukebakken self-assigned this Dec 15, 2023
@lukebakken lukebakken added this to the 7.0.0 milestone Dec 15, 2023
@lukebakken
Copy link
Contributor

Hello, thanks for this contribution. Could you please provide a test application that clearly demonstrates the issue that this PR solves?

@michaelklishin
Copy link
Member

@GerardSmit do you have any benchmarking data that would help the maintainers (and users alike) reason about the scale of the improvements?

@paulomorgado
Copy link
Contributor

Would it be feasible to never copy and being the user of the library the one that copies if needed?

@lukebakken
Copy link
Contributor

Would it be feasible to never copy and being the user of the library the one that copies if needed?

Probably! In my head I have a task to check out how other .NET client libraries work in this regard. I will add a task for the 7.0 release to do that.

@paulomorgado
Copy link
Contributor

Another nice to have would be support Stream and ReadOnlySequence<byte>.

@lukebakken
Copy link
Contributor

@paulomorgado please feel free to add comments to #1446 so they aren't lost!

@lukebakken lukebakken self-requested a review December 15, 2023 16:29
@GerardSmit
Copy link
Author

GerardSmit commented Dec 15, 2023

Hello, thanks for this contribution. Could you please provide a test application that clearly demonstrates the issue that this PR solves?

do you have any benchmarking data that would help the maintainers (and users alike) reason about the scale of the improvements?

I'll see if I can create test application / benchmark tomorrow.

Would it be feasible to never copy and being the user of the library the one that copies if needed?

This would mean the background channel (in the code below) should always be blocking (so the buffer doesn't get modified by the application). In this case a Channel<T> would be unnecessary. Maybe it's better to use an async lock if we go this way.

public async ValueTask WriteAsync(RentedMemory frames)
{
if (_closed)
{
frames.Dispose();
await Task.Yield();
}
else
{
await _channelWriter.WriteAsync(frames)
.ConfigureAwait(false);
}
}

Another nice to have would be support Stream and ReadOnlySequence.

As stated in the PR, ReadOnlySequence<byte> support has been added.
Stream would be a nice addition 😄

I'll wait with any changes until the research in #1446 is done.

@lukebakken
Copy link
Contributor

The more I think about this PR, optimizations like this feel like version 8 sorts of changes, not version 7.

Thoughts?

@paulomorgado
Copy link
Contributor

The more I think about this PR, optimizations like this feel like version 8 sorts of changes, not version 7.

Thoughts?

Probably. Unless the changes can be reduced by forcing the user to copy the message if it needs to be isolated.

@GerardSmit
Copy link
Author

GerardSmit commented Dec 15, 2023

@lukebakken @michaelklishin the test application can be found here: https://github.com/GerardSmit/RabbitMQ.MemoryTest

Results

Official client (copying)

$ dotnet run -c Release -- --mb=1 --iterations=100 --tasks=16
Official version
Body size       : 1 MB
Iterations      : 100
Tasks           : 16
Non-copying     : False
Startup memory  : 20 MB

---  Start  ---
Memory usage: 22 MB
Memory usage: 1765 MB
Memory usage: 2167 MB
Memory usage: 2632 MB

--- Results ---
Avg time        : 3737 ms
Min time        : 3679 ms
Max time        : 3759 ms
Memory          : 2728 MB
Queue length    : 1600 / 1600
Valid messages  : 100 / 100 (first 100 of 1600)

Fork (copying)

$ dotnet run -c Release -p:Fork=true -- --mb=1 --iterations=100 --tasks=16
Forked version
Body size       : 1 MB
Iterations      : 100
Tasks           : 16
Non-copying     : False
Startup memory  : 20 MB

---  Start  ---
Memory usage: 22 MB
Memory usage: 1934 MB
Memory usage: 2398 MB
Memory usage: 2819 MB

--- Results ---
Avg time        : 3447 ms
Min time        : 3384 ms
Max time        : 3489 ms
Memory          : 2854 MB
Queue length    : 1600 / 1600
Valid messages  : 100 / 100 (first 100 of 1600)

Fork (non-copying)

$ dotnet run -c Release -p:Fork=true -- --mb=1 --iterations=100 --tasks=16 --nc
Forked version
Body size       : 1 MB
Iterations      : 100
Tasks           : 16
Non-copying     : True
Startup memory  : 20 MB

---  Start  ---
Memory usage: 22 MB
Memory usage: 61 MB
Memory usage: 61 MB
Memory usage: 61 MB

--- Results ---
Avg time        : 3252 ms
Min time        : 3211 ms
Max time        : 3277 ms
Memory          : 61 MB
Queue length    : 1600 / 1600
Valid messages  : 100 / 100 (first 100 of 1600)

@GerardSmit
Copy link
Author

The more I think about this PR, optimizations like this feel like version 8 sorts of changes, not version 7.

Understandable. I'll keep the fork up-to-date with 7.0 in the meantime and publish it to NuGet; since without it we're getting a lot of OutOfMemory-exceptions in our application.

We're running the application in a restricted container in Kubernetes, at first we thought it was cgroups not being supported in .NET. It does since .NET 9.0 (dotnet/runtime#93611) so we tried a daily build, without success. In the end we were seeing a lot of memory allocation in the ArrayPool coming from RabbitMQ, hence this PR.

@PauloHMattos
Copy link

IMO the library should not make any copy and simply use the data that it received in BasicPublishAsync.
Once the data is passed to the library, the user should await the completion of BasicPublishAsync and only then the data is safe to be modified/reused.

My view is that BasicPublishAsync would only complete once the data is published (or written to the Pipe) and is no longer being utilized by the client. If a user from the library does not wish to wait for the data to be published, he should not await the call to BasicPublishAsync, and any polling or reuse of this data should be make safe by the user and not by the library copying data and doing unnecessary work.

That way each application can make use of this client in a way that better suits itself.
In my application is very common to publish large payloads (10 to 50 MB), and the copying as described in this PR is a major problem to our memory usage.
For us it would be beneficial for these changes to be incorporated in v7, but if not I guess we can use the GerardSmit fork for a while.

@paulomorgado
Copy link
Contributor

IMO the library should not make any copy and simply use the data that it received in BasicPublishAsync. Once the data is passed to the library, the user should await the completion of BasicPublishAsync and only then the data is safe to be modified/reused.

My view is that BasicPublishAsync would only complete once the data is published (or written to the Pipe) and is no longer being utilized by the client. If a user from the library does not wish to wait for the data to be published, he should not await the call to BasicPublishAsync, and any polling or reuse of this data should be make safe by the user and not by the library copying data and doing unnecessary work.

That way each application can make use of this client in a way that better suits itself. In my application is very common to publish large payloads (10 to 50 MB), and the copying as described in this PR is a major problem to our memory usage. For us it would be beneficial for these changes to be incorporated in v7, but if not I guess we can use the GerardSmit fork for a while.

I agree with most of this, except that it's never a good idea to not await in any way async calls.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

5 participants