Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Split body memory into distinct parts for better clarity and avoiding to copy for the send path when a single ROM is passed #20098

Merged
merged 6 commits into from Apr 7, 2021
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
162 changes: 105 additions & 57 deletions sdk/servicebus/Azure.Messaging.ServiceBus/src/Amqp/BodyMemory.cs
Expand Up @@ -6,96 +6,144 @@
using System.Collections.Generic;
jsquire marked this conversation as resolved.
Show resolved Hide resolved
using System.Linq;
using Azure.Core;
using Microsoft.Azure.Amqp.Encoding;
using Microsoft.Azure.Amqp.Framing;

namespace Azure.Messaging.ServiceBus.Amqp
{
internal sealed class BodyMemory : IEnumerable<ReadOnlyMemory<byte>>
internal abstract class BodyMemory : IEnumerable<ReadOnlyMemory<byte>>
danielmarbach marked this conversation as resolved.
Show resolved Hide resolved
{
private ArrayBufferWriter<byte> _writer;
private IList<ReadOnlyMemory<byte>> _segments;
private IEnumerable<ReadOnlyMemory<byte>> _lazySegments;

private ReadOnlyMemory<byte> WrittenMemory
public static BodyMemory FromReadOnlyMemory(IEnumerable<ReadOnlyMemory<byte>> segments)
{
get
return segments switch
{
if (_lazySegments != null)
{
foreach (var segment in _lazySegments)
{
Append(segment);
}

_lazySegments = null;
}

return _writer?.WrittenMemory ?? ReadOnlyMemory<byte>.Empty;
}
BodyMemory bodyMemory => bodyMemory,
_ => new SendWithMultipleDataSegmentsBodyMemory(segments)
};
}

// for the send path the combined memory for the body is not precomputed
private BodyMemory(IEnumerable<ReadOnlyMemory<byte>> dataSegments)
public static BodyMemory FromReadOnlyMemory(ReadOnlyMemory<byte> segment)
{
_lazySegments = dataSegments;
return new SendWithSingleDataSegmentBodyMemory(segment);
}

// for the receive path the combined memory is precomputed because we need to copy the underlying AMQP data anyway
private BodyMemory(IEnumerable<Data> dataSegments)
public static BodyMemory FromAmqpData(IEnumerable<Data> segments)
{
foreach (var segment in dataSegments)
{
Append(segment);
}
return new ReceiveBodyMemory(segments ?? Enumerable.Empty<Data>());
}

public static BodyMemory FromReadOnlyMemory(IEnumerable<ReadOnlyMemory<byte>> segments)
{
return segments as BodyMemory ?? new BodyMemory(segments);
}
protected abstract ReadOnlyMemory<byte> WrittenMemory { get; }

public static BodyMemory FromAmqpData(IEnumerable<Data> segments)
public abstract IEnumerator<ReadOnlyMemory<byte>> GetEnumerator();
IEnumerator IEnumerable.GetEnumerator()
{
return new BodyMemory(segments ?? Enumerable.Empty<Data>());
return GetEnumerator();
}

public IEnumerator<ReadOnlyMemory<byte>> GetEnumerator()
public static implicit operator ReadOnlyMemory<byte>(BodyMemory memory)
{
return _segments?.GetEnumerator() ?? _lazySegments.GetEnumerator();
return memory.WrittenMemory;
}

IEnumerator IEnumerable.GetEnumerator()
private sealed class SendWithSingleDataSegmentBodyMemory : BodyMemory
{
return GetEnumerator();
}
public SendWithSingleDataSegmentBodyMemory(ReadOnlyMemory<byte> dataSegment)
danielmarbach marked this conversation as resolved.
Show resolved Hide resolved
{
WrittenMemory = dataSegment;
}

private void Append(ReadOnlyMemory<byte> segment)
{
_writer ??= new ArrayBufferWriter<byte>();
_segments ??= new List<ReadOnlyMemory<byte>>();
protected override ReadOnlyMemory<byte> WrittenMemory { get; }

var memory = _writer.GetMemory(segment.Length);
segment.CopyTo(memory);
_writer.Advance(segment.Length);
_segments.Add(memory.Slice(0, segment.Length));
public override IEnumerator<ReadOnlyMemory<byte>> GetEnumerator()
{
yield return WrittenMemory;
}
}

private void Append(Data segment)
private sealed class ReceiveBodyMemory : BodyMemory
{
ReadOnlyMemory<byte> dataToAppend = segment.Value switch
private ArrayBufferWriter<byte> _writer;
private IList<ReadOnlyMemory<byte>> _segments;

// for the receive path the combined memory is precomputed because we need to copy the underlying AMQP data anyway
internal ReceiveBodyMemory(IEnumerable<Data> dataSegments)
{
byte[] byteArray => byteArray,
ArraySegment<byte> arraySegment => arraySegment,
_ => ReadOnlyMemory<byte>.Empty
};
foreach (var segment in dataSegments)
{
Append(segment);
}
}

protected override ReadOnlyMemory<byte> WrittenMemory => _writer?.WrittenMemory ?? ReadOnlyMemory<byte>.Empty;

Append(dataToAppend);
public override IEnumerator<ReadOnlyMemory<byte>> GetEnumerator()
{
return _segments.GetEnumerator();
}

private void Append(Data segment)
{
ReadOnlyMemory<byte> dataToAppend = segment.Value switch
{
byte[] byteArray => byteArray,
ArraySegment<byte> arraySegment => arraySegment,
_ => ReadOnlyMemory<byte>.Empty
};

_writer ??= new ArrayBufferWriter<byte>();
_segments ??= new List<ReadOnlyMemory<byte>>();

var memory = _writer.GetMemory(dataToAppend.Length);
dataToAppend.CopyTo(memory);
_writer.Advance(dataToAppend.Length);
_segments.Add(memory.Slice(0, dataToAppend.Length));
}
}

public static implicit operator ReadOnlyMemory<byte>(BodyMemory memory)
private sealed class SendWithMultipleDataSegmentsBodyMemory : BodyMemory
{
return memory.WrittenMemory;
private ArrayBufferWriter<byte> _writer;
private IList<ReadOnlyMemory<byte>> _segments;
private IEnumerable<ReadOnlyMemory<byte>> _lazySegments;

protected override ReadOnlyMemory<byte> WrittenMemory
{
get
{
if (_lazySegments != null)
{
foreach (var segment in _lazySegments)
{
Append(segment);
}

_lazySegments = null;
}

return _writer?.WrittenMemory ?? ReadOnlyMemory<byte>.Empty;
}
}

// for the send path the combined memory for the body is not precomputed
internal SendWithMultipleDataSegmentsBodyMemory(IEnumerable<ReadOnlyMemory<byte>> dataSegments)
{
_lazySegments = dataSegments;
}

private void Append(ReadOnlyMemory<byte> segment)
{
_writer ??= new ArrayBufferWriter<byte>();
_segments ??= new List<ReadOnlyMemory<byte>>();

var memory = _writer.GetMemory(segment.Length);
segment.CopyTo(memory);
_writer.Advance(segment.Length);
_segments.Add(memory.Slice(0, segment.Length));
}

public override IEnumerator<ReadOnlyMemory<byte>> GetEnumerator()
{
return _segments?.GetEnumerator() ?? _lazySegments.GetEnumerator();
}
}
}
}
Expand Up @@ -44,7 +44,7 @@ public class ServiceBusMessage
/// <param name="body">The payload of the message in bytes.</param>
public ServiceBusMessage(ReadOnlyMemory<byte> body)
{
AmqpMessageBody amqpBody = new AmqpMessageBody(BodyMemory.FromReadOnlyMemory(new ReadOnlyMemory<byte>[] { body }));
AmqpMessageBody amqpBody = new AmqpMessageBody(BodyMemory.FromReadOnlyMemory(body));
AmqpMessage = new AmqpAnnotatedMessage(amqpBody);
}

Expand Down Expand Up @@ -140,7 +140,7 @@ public BinaryData Body
get => AmqpMessage.GetBody();
set
{
AmqpMessage.Body = new AmqpMessageBody(BodyMemory.FromReadOnlyMemory(new ReadOnlyMemory<byte>[] { value }));
AmqpMessage.Body = new AmqpMessageBody(BodyMemory.FromReadOnlyMemory(value));
}
}

Expand Down