Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove LINQ allocations for all batched sends #26911

Merged
merged 10 commits into from Feb 14, 2022
Expand Up @@ -102,7 +102,7 @@ public override bool TryAddMessage(ServiceBusMessage message)
{
// Initialize the size by reserving space for the batch envelope taking into account the properties from the first
// message which will be used to populate properties on the batch envelope.
amqpMessage = AmqpMessageConverter.BatchSBMessagesAsAmqpMessage(new ServiceBusMessage[] { message }, forceBatch: true);
amqpMessage = AmqpMessageConverter.BatchSBMessagesAsAmqpMessage(message, forceBatch: true);
}
else
{
Expand Down Expand Up @@ -154,14 +154,14 @@ public override void Clear()
///
/// <returns>The set of messages as an enumerable of the requested type.</returns>
///
public override IEnumerable<T> AsEnumerable<T>()
public override List<T> AsList<T>()
{
if (typeof(T) != typeof(ServiceBusMessage))
{
throw new FormatException(string.Format(CultureInfo.CurrentCulture, Resources.UnsupportedTransportEventType, typeof(T).Name));
}

return (IEnumerable<T>)BatchMessages;
return BatchMessages as List<T>;
}

/// <summary>
Expand Down
Expand Up @@ -30,7 +30,14 @@ internal static class AmqpMessageConverter
/// <summary>The size, in bytes, to use as a buffer for stream operations.</summary>
private const int StreamBufferSizeInBytes = 512;

public static AmqpMessage BatchSBMessagesAsAmqpMessage(IEnumerable<ServiceBusMessage> source, bool forceBatch = false)
public static AmqpMessage BatchSBMessagesAsAmqpMessage(ServiceBusMessage source, bool forceBatch = false)
{
Argument.AssertNotNull(source, nameof(source));
var batchMessages = new List<AmqpMessage>(1) { SBMessageToAmqpMessage(source) };
return BuildAmqpBatchFromMessages(batchMessages, source, forceBatch);
}

public static AmqpMessage BatchSBMessagesAsAmqpMessage(List<ServiceBusMessage> source, bool forceBatch = false)
{
Argument.AssertNotNull(source, nameof(source));
return BuildAmqpBatchFromMessage(source, forceBatch);
Expand All @@ -46,25 +53,27 @@ public static AmqpMessage BatchSBMessagesAsAmqpMessage(IEnumerable<ServiceBusMes
///
/// <returns>The batch <see cref="AmqpMessage" /> containing the source messages.</returns>
///
private static AmqpMessage BuildAmqpBatchFromMessage(IEnumerable<ServiceBusMessage> source, bool forceBatch)
private static AmqpMessage BuildAmqpBatchFromMessage(List<ServiceBusMessage> source, bool forceBatch)
{
AmqpMessage firstAmqpMessage = null;
ServiceBusMessage firstMessage = null;

return BuildAmqpBatchFromMessages(
source.Select(sbMessage =>
var batchMessages = new List<AmqpMessage>(source.Count);
foreach (ServiceBusMessage sbMessage in source)
{
if (firstAmqpMessage == null)
{
if (firstAmqpMessage == null)
{
firstAmqpMessage = SBMessageToAmqpMessage(sbMessage);
firstMessage = sbMessage;
return firstAmqpMessage;
}
else
{
return SBMessageToAmqpMessage(sbMessage);
}
}).ToList(), firstMessage, forceBatch);
firstAmqpMessage = SBMessageToAmqpMessage(sbMessage);
firstMessage = sbMessage;
batchMessages.Add(firstAmqpMessage);
}
else
{
batchMessages.Add(SBMessageToAmqpMessage(sbMessage));
}
}

return BuildAmqpBatchFromMessages(batchMessages, firstMessage, forceBatch);
}

/// <summary>
Expand All @@ -78,7 +87,7 @@ private static AmqpMessage BuildAmqpBatchFromMessage(IEnumerable<ServiceBusMessa
/// <returns>The batch <see cref="AmqpMessage" /> containing the source messages.</returns>
///
private static AmqpMessage BuildAmqpBatchFromMessages(
IList<AmqpMessage> batchMessages,
List<AmqpMessage> batchMessages,
ServiceBusMessage firstMessage,
bool forceBatch)
{
Expand All @@ -90,13 +99,14 @@ private static AmqpMessage BuildAmqpBatchFromMessage(IEnumerable<ServiceBusMessa
}
JoshLove-msft marked this conversation as resolved.
Show resolved Hide resolved
else
{
batchEnvelope = AmqpMessage.Create(batchMessages.Select(message =>
var data = new List<Data>(batchMessages.Count);
foreach (var message in batchMessages)
{
message.Batchable = true;
using var messageStream = message.ToStream();
return new Data { Value = ReadStreamToArraySegment(messageStream) };
}));

data.Add(new Data { Value = ReadStreamToArraySegment(messageStream) });
}
batchEnvelope = AmqpMessage.Create(data);
batchEnvelope.MessageFormat = AmqpConstants.AmqpBatchedMessageFormat;
}

Expand Down
Expand Up @@ -217,7 +217,7 @@ internal class AmqpSender : TransportSender
timeout,
token).ConfigureAwait(false);
},
(this, messageBatch.AsEnumerable<ServiceBusMessage>()),
(this, messageBatch.AsList<ServiceBusMessage>()),
_connectionScope,
cancellationToken).ConfigureAwait(false);
}
Expand All @@ -231,11 +231,10 @@ internal class AmqpSender : TransportSender
/// <param name="cancellationToken">An optional <see cref="CancellationToken"/> instance to signal the request to cancel the operation.</param>
///
internal virtual async Task SendBatchInternalAsync(
IEnumerable<ServiceBusMessage> messages,
List<ServiceBusMessage> messages,
TimeSpan timeout,
CancellationToken cancellationToken)
{
var stopWatch = ValueStopwatch.StartNew();
var link = default(SendingAmqpLink);

try
Expand Down Expand Up @@ -302,7 +301,7 @@ internal class AmqpSender : TransportSender
/// <param name="messages">The list of messages to send.</param>
/// <param name="cancellationToken">An optional <see cref="CancellationToken"/> instance to signal the request to cancel the operation.</param>
public override async Task SendAsync(
IReadOnlyList<ServiceBusMessage> messages,
List<ServiceBusMessage> messages,
CancellationToken cancellationToken)
{
await _retryPolicy.RunOperation(static async (value, timeout, token) =>
Expand Down Expand Up @@ -376,7 +375,7 @@ public override async Task CloseAsync(CancellationToken cancellationToken)
/// <param name="cancellationToken"></param>
/// <returns></returns>
public override async Task<IReadOnlyList<long>> ScheduleMessagesAsync(
IReadOnlyList<ServiceBusMessage> messages,
List<ServiceBusMessage> messages,
CancellationToken cancellationToken = default)
{
return await _retryPolicy.RunOperation(static async (value, timeout, token) =>
Expand All @@ -401,7 +400,7 @@ public override async Task CloseAsync(CancellationToken cancellationToken)
/// <param name="cancellationToken"></param>
/// <returns></returns>
internal async Task<IReadOnlyList<long>> ScheduleMessageInternalAsync(
IReadOnlyList<ServiceBusMessage> messages,
List<ServiceBusMessage> messages,
TimeSpan timeout,
CancellationToken cancellationToken = default)
{
Expand All @@ -418,7 +417,7 @@ public override async Task CloseAsync(CancellationToken cancellationToken)
request.AmqpMessage.ApplicationProperties.Map[ManagementConstants.Request.AssociatedLinkName] = sendLink.Name;
}

List<AmqpMap> entries = new List<AmqpMap>();
List<AmqpMap> entries = new List<AmqpMap>(messages.Count);
foreach (ServiceBusMessage message in messages)
{
using AmqpMessage amqpMessage = AmqpMessageConverter.SBMessageToAmqpMessage(message);
Expand Down
Expand Up @@ -61,7 +61,7 @@ internal abstract class TransportMessageBatch : IDisposable
///
/// <returns>The set of messages as an enumerable of the requested type.</returns>
///
public abstract IEnumerable<T> AsEnumerable<T>();
public abstract List<T> AsList<T>();

/// <summary>
/// Performs the task needed to clean up resources used by the <see cref="TransportMessageBatch" />.
Expand Down
Expand Up @@ -57,7 +57,7 @@ internal abstract class TransportSender
/// <param name="messages">The list of messages to send.</param>
/// <param name="cancellationToken">An optional <see cref="CancellationToken"/> instance to signal the request to cancel the operation.</param>
public abstract Task SendAsync(
IReadOnlyList<ServiceBusMessage> messages,
List<ServiceBusMessage> messages,
CancellationToken cancellationToken);

/// <summary>
Expand All @@ -80,7 +80,7 @@ internal abstract class TransportSender
/// <param name="cancellationToken"></param>
/// <returns></returns>
public abstract Task<IReadOnlyList<long>> ScheduleMessagesAsync(
IReadOnlyList<ServiceBusMessage> messages,
List<ServiceBusMessage> messages,
CancellationToken cancellationToken = default);

/// <summary>
Expand Down
Expand Up @@ -4,7 +4,6 @@
using System;
using System.Text;
using Azure.Core.Pipeline;
using System.Linq;
using System.Collections.Generic;

namespace Azure.Messaging.ServiceBus.Diagnostics
Expand All @@ -16,17 +15,22 @@ public static string GetAsciiString(this ArraySegment<byte> arraySegment)
return arraySegment.Array == null ? string.Empty : Encoding.ASCII.GetString(arraySegment.Array, arraySegment.Offset, arraySegment.Count);
}

public static void SetMessageData(this DiagnosticScope scope, IEnumerable<ServiceBusReceivedMessage> messages)
public static void SetMessageData(this DiagnosticScope scope, ServiceBusReceivedMessage message)
{
scope.AddLinkedDiagnostics(message);
}

public static void SetMessageData(this DiagnosticScope scope, IReadOnlyList<ServiceBusReceivedMessage> messages)
{
scope.AddLinkedDiagnostics(messages);
}

public static void SetMessageData(this DiagnosticScope scope, IEnumerable<ServiceBusMessage> messages)
public static void SetMessageData(this DiagnosticScope scope, List<ServiceBusMessage> messages)
{
scope.AddLinkedDiagnostics(messages);
}

private static void AddLinkedDiagnostics(this DiagnosticScope scope, IEnumerable<ServiceBusReceivedMessage> messages)
private static void AddLinkedDiagnostics(this DiagnosticScope scope, IReadOnlyList<ServiceBusReceivedMessage> messages)
{
if (scope.IsEnabled)
{
Expand All @@ -37,7 +41,15 @@ private static void AddLinkedDiagnostics(this DiagnosticScope scope, IEnumerable
}
}

private static void AddLinkedDiagnostics(this DiagnosticScope scope, IEnumerable<ServiceBusMessage> messages)
private static void AddLinkedDiagnostics(this DiagnosticScope scope, ServiceBusReceivedMessage message)
{
if (scope.IsEnabled)
{
AddLinkedDiagnostics(scope, message.ApplicationProperties);
}
}

private static void AddLinkedDiagnostics(this DiagnosticScope scope, List<ServiceBusMessage> messages)
{
if (scope.IsEnabled)
{
Expand All @@ -48,6 +60,16 @@ private static void AddLinkedDiagnostics(this DiagnosticScope scope, IEnumerable
}
}

private static void AddLinkedDiagnostics(this DiagnosticScope scope, IReadOnlyDictionary<string, object> properties)
{
if (EntityScopeFactory.TryExtractDiagnosticId(
properties,
out string diagnosticId))
{
scope.AddLink(diagnosticId, null);
}
}

private static void AddLinkedDiagnostics(this DiagnosticScope scope, IDictionary<string, object> properties)
{
if (EntityScopeFactory.TryExtractDiagnosticId(
Expand Down
Expand Up @@ -36,6 +36,28 @@ internal class EntityScopeFactory
_fullyQualifiedNamespace = fullyQualifiedNamespace;
}

/// <summary>
/// Extracts a diagnostic id from a message's properties.
/// </summary>
///
/// <param name="properties">The properties holding the diagnostic id.</param>
/// <param name="id">The value of the diagnostics identifier assigned to the event. </param>
///
/// <returns><c>true</c> if the event was contained the diagnostic id; otherwise, <c>false</c>.</returns>
///
public static bool TryExtractDiagnosticId(IReadOnlyDictionary<string, object> properties, out string id)
{
id = null;

if (properties.TryGetValue(DiagnosticProperty.DiagnosticIdAttribute, out var objectId) && objectId is string stringId)
{
id = stringId;
return true;
}

return false;
}

/// <summary>
/// Extracts a diagnostic id from a message's properties.
/// </summary>
Expand Down
Expand Up @@ -504,7 +504,12 @@ public override bool TryAddMessage(ServiceBusMessage message)
///
/// <returns>The set of events as an enumerable of the requested type.</returns>
///
public override IEnumerable<T> AsEnumerable<T>() => (IEnumerable<T>)_backingStore;
public override List<T> AsList<T>() => _backingStore switch
{
List<T> storeList => storeList,
IList<T> storeIList => new List<T>(storeIList),
_ => _backingStore as List<T>
};

/// <summary>
/// Performs the task needed to clean up resources used by the <see cref="TransportMessageBatch" />.
Expand Down
Expand Up @@ -118,7 +118,7 @@ public virtual async Task ReceiveAndProcessMessagesAsync(CancellationToken cance
protected async Task ProcessOneMessageWithinScopeAsync(ServiceBusReceivedMessage message, string activityName, CancellationToken cancellationToken)
{
using DiagnosticScope scope = _scopeFactory.CreateScope(activityName, DiagnosticScope.ActivityKind.Consumer);
scope.SetMessageData(new ServiceBusReceivedMessage[] { message });
scope.SetMessageData(message);
scope.Start();
try
{
Expand Down
Expand Up @@ -140,7 +140,7 @@ internal void Clear()
///
/// <returns>The set of messages as an enumerable of the requested type.</returns>
///
internal IEnumerable<T> AsEnumerable<T>() => _innerBatch.AsEnumerable<T>();
internal List<T> AsList<T>() => _innerBatch.AsList<T>();

/// <summary>
/// Locks the batch to prevent new messages from being added while a service
Expand Down
Expand Up @@ -190,9 +190,9 @@ protected ServiceBusSender()
Argument.AssertNotDisposed(IsClosed, nameof(ServiceBusSender));
_connection.ThrowIfClosed();

IReadOnlyList<ServiceBusMessage> messageList = messages switch
List<ServiceBusMessage> messageList = messages switch
{
IReadOnlyList<ServiceBusMessage> alreadyList => alreadyList,
List<ServiceBusMessage> alreadyList => alreadyList,
_ => messages.ToList()
};

Expand All @@ -203,7 +203,7 @@ protected ServiceBusSender()

cancellationToken.ThrowIfCancellationRequested<TaskCanceledException>();
Logger.SendMessageStart(Identifier, messageCount: messageList.Count);
using DiagnosticScope scope = CreateDiagnosticScope(messages, DiagnosticProperty.SendActivityName);
using DiagnosticScope scope = CreateDiagnosticScope(messageList, DiagnosticProperty.SendActivityName);
scope.Start();

try
Expand All @@ -223,7 +223,7 @@ protected ServiceBusSender()
Logger.SendMessageComplete(Identifier);
}

private DiagnosticScope CreateDiagnosticScope(IEnumerable<ServiceBusMessage> messages, string activityName)
private DiagnosticScope CreateDiagnosticScope(List<ServiceBusMessage> messages, string activityName)
{
foreach (ServiceBusMessage message in messages)
{
Expand Down Expand Up @@ -325,7 +325,7 @@ private DiagnosticScope CreateDiagnosticScope(IEnumerable<ServiceBusMessage> mes
cancellationToken.ThrowIfCancellationRequested<TaskCanceledException>();
Logger.SendMessageStart(Identifier, messageBatch.Count);
using DiagnosticScope scope = CreateDiagnosticScope(
messageBatch.AsEnumerable<ServiceBusMessage>(),
messageBatch.AsList<ServiceBusMessage>(),
DiagnosticProperty.SendActivityName);
scope.Start();

Expand Down Expand Up @@ -405,9 +405,9 @@ private DiagnosticScope CreateDiagnosticScope(IEnumerable<ServiceBusMessage> mes
_connection.ThrowIfClosed();
cancellationToken.ThrowIfCancellationRequested<TaskCanceledException>();

IReadOnlyList<ServiceBusMessage> messageList = messages switch
List<ServiceBusMessage> messageList = messages switch
JoshLove-msft marked this conversation as resolved.
Show resolved Hide resolved
{
IReadOnlyList<ServiceBusMessage> alreadyList => alreadyList,
List<ServiceBusMessage> alreadyList => alreadyList,
_ => messages.ToList()
};

Expand All @@ -422,7 +422,7 @@ private DiagnosticScope CreateDiagnosticScope(IEnumerable<ServiceBusMessage> mes
scheduledEnqueueTime.ToString(CultureInfo.InvariantCulture));

using DiagnosticScope scope = CreateDiagnosticScope(
messages,
messageList,
DiagnosticProperty.ScheduleActivityName);
scope.Start();

Expand Down