Skip to content

Commit

Permalink
Drain excess credits when using sessions to ensure FIFO (Azure#40457)
Browse files Browse the repository at this point in the history
* Drain excess credits when using sessions to ensure FIFO

* fix test

* Add guard

* fix formatting

* Fix activity source tests
  • Loading branch information
JoshLove-msft committed Dec 1, 2023
1 parent a60b007 commit 8ee6482
Show file tree
Hide file tree
Showing 4 changed files with 69 additions and 9 deletions.
1 change: 1 addition & 0 deletions sdk/servicebus/Azure.Messaging.ServiceBus/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
- Adjusted retries to consider an unreachable host address as terminal. Previously, all socket-based errors were considered transient and would be retried.
- Updated the `ServiceBusMessage` constructor that takes a `ServiceBusReceivedMessage` to no longer copy over the
`x-opt-partition-id` key as this is meant to apply only to the original message.
- Drain excess credits when attempting to receive using sessions to ensure FIFO ordering.

### Other Changes

Expand Down
17 changes: 13 additions & 4 deletions sdk/servicebus/Azure.Messaging.ServiceBus/src/Amqp/AmqpReceiver.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
using System.Buffers;
using System.Collections.Generic;
using System.Collections.ObjectModel;
using System.Linq;
using System.Runtime.ExceptionServices;
using System.Threading;
using System.Threading.Tasks;
Expand Down Expand Up @@ -370,15 +371,23 @@ private static IReadOnlyList<ServiceBusReceivedMessage> EmptyList
maxWaitTime ?? timeout,
cancellationToken).ConfigureAwait(false);

IReadOnlyCollection<AmqpMessage> messageList =
messagesReceived as IReadOnlyCollection<AmqpMessage> ?? messagesReceived.ToList();

// If this is a session receiver and we didn't receive all requested messages, we need to drain the credits
// to ensure FIFO ordering within each session.
if (_isSessionReceiver && messageList.Count < maxMessages)
{
await link.DrainAsyc(cancellationToken).ConfigureAwait(false);
}

List<ServiceBusReceivedMessage> receivedMessages = null;
// If event messages were received, then package them for consumption and
// return them.
foreach (AmqpMessage message in messagesReceived)
foreach (AmqpMessage message in messageList)
{
// Getting the count of the underlying collection is good for performance/allocations to prevent the list from growing
receivedMessages ??= messagesReceived is IReadOnlyCollection<AmqpMessage> readOnlyList
? new List<ServiceBusReceivedMessage>(readOnlyList.Count)
: new List<ServiceBusReceivedMessage>();
receivedMessages ??= new List<ServiceBusReceivedMessage>(messageList.Count);

if (_receiveMode == ServiceBusReceiveMode.ReceiveAndDelete)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ public void ResetFeatureSwitch()
[TestCase(false)]
public async Task SenderReceiverActivitiesDisabled(bool useSessions)
{
using var listener = new TestActivitySourceListener(DiagnosticProperty.DiagnosticNamespace);
using var listener = new TestActivitySourceListener(source => source.Name.StartsWith(DiagnosticProperty.DiagnosticNamespace));

await using (var scope = await ServiceBusScope.CreateWithQueue(enablePartitioning: false, enableSession: useSessions))
{
Expand Down Expand Up @@ -72,7 +72,7 @@ await using (var scope = await ServiceBusScope.CreateWithQueue(enablePartitionin
}
else
{
await receiver.RenewMessageLockAsync(received[-1]);
await receiver.RenewMessageLockAsync(received[1]);
}

// schedule
Expand Down Expand Up @@ -262,7 +262,7 @@ public async Task ProcessorActivities()
var messages = ServiceBusTestUtilities.GetMessages(messageCt);

using var listener = new TestActivitySourceListener(
DiagnosticProperty.DiagnosticNamespace,
source => source.Name.StartsWith(DiagnosticProperty.DiagnosticNamespace),
activityStartedCallback: activity =>
{
if (activity.OperationName == DiagnosticProperty.ProcessMessageActivityName)
Expand Down Expand Up @@ -321,7 +321,7 @@ public async Task SessionProcessorActivities()
var messages = ServiceBusTestUtilities.GetMessages(messageCt, "sessionId");

using var listener = new TestActivitySourceListener(
DiagnosticProperty.DiagnosticNamespace,
source => source.Name.StartsWith(DiagnosticProperty.DiagnosticNamespace),
activityStartedCallback: activity =>
{
if (activity.OperationName == DiagnosticProperty.ProcessSessionMessageActivityName)
Expand Down Expand Up @@ -374,7 +374,7 @@ await using (var scope = await ServiceBusScope.CreateWithQueue(enablePartitionin
public async Task RuleManagerActivities()
{
using var _ = SetAppConfigSwitch();
using var listener = new TestActivitySourceListener(DiagnosticProperty.DiagnosticNamespace);
using var listener = new TestActivitySourceListener(source => source.Name.StartsWith(DiagnosticProperty.DiagnosticNamespace));

await using (var scope = await ServiceBusScope.CreateWithTopic(enablePartitioning: false, enableSession: false))
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1144,5 +1144,55 @@ await using (var scope = await ServiceBusScope.CreateWithQueue(enablePartitionin
.EqualTo(ServiceBusFailureReason.SessionLockLost));
}
}

[Test]
public async Task SessionOrderingIsGuaranteed()
{
await using (var scope = await ServiceBusScope.CreateWithQueue(enablePartitioning: false, enableSession: true))
{
await using var client = new ServiceBusClient(TestEnvironment.ServiceBusConnectionString);
var receiver = await client.AcceptSessionAsync(scope.QueueName, "session");
var sender = client.CreateSender(scope.QueueName);

CancellationTokenSource cts = new CancellationTokenSource();
cts.CancelAfter(TimeSpan.FromSeconds(60));

var receive = ReceiveMessagesAsync();

var send = SendMessagesAsync();

await Task.WhenAll(send, receive);

async Task SendMessagesAsync()
{
while (!cts.IsCancellationRequested)
{
await sender.SendMessageAsync(ServiceBusTestUtilities.GetMessage("session"));
await Task.Delay(TimeSpan.FromMilliseconds(100));
}
}

async Task ReceiveMessagesAsync()
{
long lastSequenceNumber = 0;
while (!cts.IsCancellationRequested)
{
var messages = await receiver.ReceiveMessagesAsync(10);
foreach (var message in messages)
{
if (message.SequenceNumber != lastSequenceNumber + 1)
{
Assert.Fail(
$"Last sequence number: {lastSequenceNumber}, current sequence number: {message.SequenceNumber}");
}

lastSequenceNumber = message.SequenceNumber;

await receiver.CompleteMessageAsync(message);
}
}
}
}
}
}
}

0 comments on commit 8ee6482

Please sign in to comment.