-
Notifications
You must be signed in to change notification settings - Fork 4.8k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Custom manage the body to avoid copying #19996
Conversation
Thank you for your contribution @danielmarbach! We will review the pull request and get back to you soon. |
@@ -140,7 +140,7 @@ public BinaryData Body | |||
get => AmqpMessage.GetBody(); | |||
set | |||
{ | |||
AmqpMessage.Body = new AmqpMessageBody(new ReadOnlyMemory<byte>[] { value }); | |||
AmqpMessage.Body = new AmqpMessageBody(new BodyMemory(new ReadOnlyMemory<byte>[] { value })); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The direct body modifications from outside are not yet covered. I wanted to first see what you think about this
This locally passed all Receiver, Sender and Processor tests. It is still missing to address the direct modifications of the underlying annotated message |
This would also account for the more lazy memory allocation for the sender path when the body of the sent message is actually required vs the receive path where we anyway always need to copy but with this change we would copy once only |
@@ -147,7 +94,7 @@ public static BinaryData GetBody(this AmqpAnnotatedMessage message) | |||
{ | |||
if (message.Body.TryGetData(out IEnumerable<ReadOnlyMemory<byte>> dataBody)) | |||
{ | |||
return dataBody.ConvertAndFlattenData(); | |||
return BinaryData.FromBytes((BodyMemory)dataBody); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Of course this is a bit brute force right now especially since users can smuggle in stuff, but I figured it is good enough to get the conversation started even if it makes some test bomb with an InvalidCastException
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah if users directly modify the AmqpAnnotatedMessage, this would not be BodyMemory, but I think this approach will work for the vast majority of cases.
sdk/servicebus/Azure.Messaging.ServiceBus/src/Amqp/BodyMemory.cs
Outdated
Show resolved
Hide resolved
It's an interesting approach. I'm curious as to how you would bridge the gap for the case where the |
I was thinking about this scenario a bunch, and wanted to bounce some of those thoughts. Assumptions
Possible Approach Since the message types create the It's only when As a very rough sketch, I'm thinking something like: public class ServiceBusMessage
{
private BinaryData _cachedBody = null;
private bool _rawMessageAccessed = false;
public BinaryData Body
{
get
{
if (!_rawMessageAccessed)
{
return _cachedBody ??= ReadDataBody();
}
return ReadDataBody();
}
set
{
WriteDataBody(value);
if (!_rawMessageAccessed)
{
_cachedBody = value;
}
}
}
public AmqpAnnotatedMessage GetRawAmqpMessage()
{
_rawMessageAccessed = true;
_cachedBody = null;
return AmqpMessage;
}
private BinaryData ReadDataBody()
{
// ... code here
}
private void WriteDataBody()
{
// ... code here
}
} Considerations
|
When users do modify the body through AmqpAnnotatedMessage, I think we could still cache the underlying data reference and thereby avoid recomputing. |
We'd need a way to detect the change. I agree it's possible and worth doing if we have a design that isn't overly complex or high effort. Given the very small set of developers that would fall into this scenario, I question if it's worth pursing anything complicated. |
Or maybe just bypass any type of optimizations for those seldom used scenarios given that basically anything is mutable there at any point I'm wondering if we can safely discover the cases. I'll think about this a bit more after my easter break or if I have a moment of motivation even during |
Yeah I agree it may not be worth it, but I think the way to detect the change would be checking if the references are equal, no? |
Potentially, but if there's a list or other backing store for the |
Right, but we would still need to apply the same conversion. |
I may not be following... unless you're thinking of doing a multi-level check, looking first at the IEnumerable itself and if that hasn't changed then inspecting each bucket of the enumerable for changes, I'm not sure that you'd be able to tell. Am I overlooking what you're actually suggesting? I feel like I probably am. Just because I'm thinking of it, the multi-level approach is interesting and may work, but runs afoul of the "don't assume that it is safe to enumerate an IEnumerable multiple times" guidance. |
Well if the underlying buffer changes values (but the reference remains the same), those changes would be reflected in the BinaryData instance provided that there was only one Data section. If there were multiple data sections, then we would still need to recombine them. |
90ba604
to
66c1358
Compare
await using var serviceBusClient = new ServiceBusClient(connectionString, new ServiceBusClientOptions
{
RetryOptions = new ServiceBusRetryOptions
{
TryTimeout = TimeSpan.FromSeconds(60)
}
});
await using var sender = serviceBusClient.CreateSender(destination);
var messages = new List<ServiceBusMessage>(10000);
for (int i = 0; i < 10000; i++)
{
messages.Add(new ServiceBusMessage(UTF8.GetBytes($"Deep Dive {i} Deep Dive {i} Deep Dive {i} Deep Dive {i} Deep Dive {i} Deep Dive {i}")));
if (i % 1000 == 0)
{
await sender.SendMessagesAsync(messages);
messages.Clear();
}
}
await sender.SendMessagesAsync(messages);
WriteLine("Message sent");
Console.WriteLine("Take snapshot");
Console.ReadLine();
var countDownEvent = new CountdownEvent(10000);
var processorOptions = new ServiceBusProcessorOptions
{
AutoCompleteMessages = false,
MaxConcurrentCalls = 100,
MaxAutoLockRenewalDuration = TimeSpan.FromMinutes(10),
ReceiveMode = ServiceBusReceiveMode.PeekLock,
};
await using var receiver = serviceBusClient.CreateProcessor(destination, processorOptions);
receiver.ProcessMessageAsync += async messageEventArgs =>
{
var message = messageEventArgs.Message;
await Out.WriteLineAsync(
$"Received message with '{message.MessageId}' and content '{UTF8.GetString(message.Body)}' / binary {message.Body}");
await messageEventArgs.CompleteMessageAsync(message);
countDownEvent.Signal();
};
receiver.ProcessErrorAsync += async errorEventArgs =>
{
await Out.WriteLineAsync($"Exception: {errorEventArgs.Exception}");
await Out.WriteLineAsync($"FullyQualifiedNamespace: {errorEventArgs.FullyQualifiedNamespace}");
await Out.WriteLineAsync($"ErrorSource: {errorEventArgs.ErrorSource}");
await Out.WriteLineAsync($"EntityPath: {errorEventArgs.EntityPath}");
};
await receiver.StartProcessingAsync();
countDownEvent.Wait();
Console.WriteLine("Take snapshot");
Console.ReadLine();
await receiver.StopProcessingAsync(); I deliberately assessed the body twice to demonstrate the effect of the changes. So also the allocations of GetBytArray are gone BeforeAfter |
|
||
public static BodyMemory From(IEnumerable<ReadOnlyMemory<byte>> segments) | ||
{ | ||
return segments as BodyMemory ?? new BodyMemory(segments); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So if the user smuggled in IEnumerable<ReadOnlyMemory<byte>>
over the lower level APIs it will be freshly wrapped and materialized when needed.
I think this looks promising #19996 (comment) So with these changes, the body is materialized ones on the receive path and no longer redundantly copied. On the send path it is only materialized when the body is accessed but also only once. |
Can also do a run with single body access if you want |
That would be great! Out of curiosity, is that dotTrace? |
sdk/servicebus/Azure.Messaging.ServiceBus/src/Amqp/BodyMemory.cs
Outdated
Show resolved
Hide resolved
DotMemory |
/azp run net - servicebus - tests |
Azure Pipelines successfully started running 1 pipeline(s). |
🎆 |
Alternative to #19985
This checklist is used to make sure that common guidelines for a pull request are followed.
Draft
mode if it is:General Guidelines and Best Practices
Testing Guidelines
SDK Generation Guidelines
*.csproj
andAssemblyInfo.cs
files have been updated with the new version of the SDK. Please double check nuget.org current release version.Additional management plane SDK specific contribution checklist:
Note: Only applies to
Microsoft.Azure.Management.[RP]
orAzure.ResourceManager.[RP]
Management plane SDK Troubleshooting
new service
label and/or contact assigned reviewer.Verify Code Generation
step, please ensure:generate.ps1/cmd
to generate this PR instead of callingautorest
directly.Please pay attention to the @microsoft.csharp version output after running generate.ps1. If it is lower than current released version (2.3.82), please run it again as it should pull down the latest version,
Old outstanding PR cleanup
Please note:
If PRs (including draft) has been out for more than 60 days and there are no responses from our query or followups, they will be closed to maintain a concise list for our reviewers.