Skip to content
This repository has been archived by the owner. It is now read-only.

Do not retry when there's an ambient transaction #621

Merged
merged 4 commits into from Dec 14, 2018
Merged
Changes from all commits
Commits
File filter...
Filter file types
Jump to…
Jump to file or symbol
Failed to load files and symbols.

Always

Just for now

@@ -8,7 +8,8 @@ namespace Microsoft.Azure.ServiceBus

/// <summary>
/// RetryPolicy implementation where the delay between retries will grow in a staggered exponential manner.
/// RetryIntervals will be computed using a retryFactor which is a function of deltaBackOff (MaximumBackoff - MinimumBackoff) and MaximumRetryCount
/// RetryIntervals will be computed using a retryFactor which is a function of deltaBackOff (MaximumBackoff - MinimumBackoff) and MaximumRetryCount.
/// <remarks>RetryPolicy will not be applied when an ambient transaction is found.</remarks>
/// </summary>
public sealed class RetryExponential : RetryPolicy
{
@@ -6,11 +6,13 @@ namespace Microsoft.Azure.ServiceBus
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Transactions;
using Primitives;

/// <summary>
/// Represents an abstraction for retrying messaging operations. Users should not
/// implement this class, and instead should use one of the provided implementations.
/// <remarks>RetryPolicy will not be applied when an ambient transaction is found.</remarks>
/// </summary>
public abstract class RetryPolicy
{
@@ -124,9 +126,9 @@ public virtual bool IsRetryableException(Exception exception)

internal bool ShouldRetry(TimeSpan remainingTime, int currentRetryCount, Exception lastException, out TimeSpan retryInterval)
{
if (lastException == null)
// There is no exception information or there's an ambient transaction - should not retry
if (lastException == null || Transaction.Current != null)
{
// there are no exceptions.
retryInterval = TimeSpan.Zero;
return false;
}
@@ -0,0 +1,59 @@
namespace Microsoft.Azure.ServiceBus.UnitTests
{
    using System;
    using System.Threading.Tasks;
    using System.Transactions;
    using Xunit;

    /// <summary>
    /// Verifies that <see cref="RetryPolicy"/> retries throttled operations normally,
    /// but suppresses retries entirely when an ambient transaction is active.
    /// </summary>
    public class RetryPolicyTests
    {
        [Fact]
        [DisplayTestMethodName]
        public async Task Should_retry_when_throttled_and_no_ambient_transaction_is_detected()
        {
            // Arrange
            var retryPolicy = RetryPolicy.Default;
            var executionCount = 0;

            // Act: fail once with a retryable (throttling) exception, then succeed.
            await retryPolicy.RunOperation(() =>
            {
                if (executionCount <= 1)
                {
                    executionCount++;
                    throw new ServerBusyException("Rico KABOOM!");
                }

                return Task.CompletedTask;
            }, TimeSpan.FromSeconds(30));

            // Assert: one failed attempt plus one retry.
            Assert.Equal(2, executionCount);
        }

        [Fact]
        [DisplayTestMethodName]
        public async Task Should_not_retry_when_throttled_and_ambient_transaction_is_detected()
        {
            // Arrange
            var retryPolicy = RetryPolicy.Default;
            var executionCount = 0;

            // Act: with an ambient transaction in scope, the first failure must surface
            // to the caller instead of being retried.
            using (var scope = new TransactionScope(TransactionScopeOption.RequiresNew, TransactionScopeAsyncFlowOption.Enabled))
            {
                await Assert.ThrowsAsync<ServerBusyException>(() =>
                    retryPolicy.RunOperation(() =>
                    {
                        if (executionCount <= 1)
                        {
                            executionCount++;
                            throw new ServerBusyException("Rico KABOOM!");
                        }

                        return Task.CompletedTask;
                    }, TimeSpan.FromSeconds(30)));
            }

            // Assert: exactly one attempt, no retry.
            Assert.Equal(1, executionCount);
        }
    }
}
@@ -25,8 +25,8 @@ internal async Task PeekLockTestCase(IMessageSender messageSender, IMessageRecei
/// <summary>
/// Sends <paramref name="messageCount"/> messages and receives them in ReceiveAndDelete
/// mode, asserting that every sent message was received within the allotted time.
/// </summary>
/// <param name="messageSender">Sender used to publish the test messages.</param>
/// <param name="messageReceiver">Receiver expected to drain the messages.</param>
/// <param name="messageCount">Number of messages to send and receive.</param>
internal async Task ReceiveDeleteTestCase(IMessageSender messageSender, IMessageReceiver messageReceiver, int messageCount)
{
    await TestUtility.SendMessagesAsync(messageSender, messageCount);

    // The broker may hand back messages in several batches; allow up to 10 seconds
    // for the receive helper to accumulate the full count (de-flakes CI runs).
    var receivedMessages = await TestUtility.ReceiveMessagesAsync(messageReceiver, messageCount, TimeSpan.FromSeconds(10));

    // xUnit convention: expected value first, actual value second.
    Assert.Equal(messageCount, receivedMessages.Count);
}

internal async Task PeekLockWithAbandonTestCase(IMessageSender messageSender, IMessageReceiver messageReceiver, int messageCount)
@@ -3,9 +3,12 @@

namespace Microsoft.Azure.ServiceBus.UnitTests
{
using System;

static class TestConstants
{
internal const int MaxAttemptsCount = 5;
internal readonly static TimeSpan WaitTimeBetweenAttempts = TimeSpan.FromSeconds(1);

internal const string ConnectionStringEnvironmentVariable = "azure-service-bus-dotnet/connectionstring";

@@ -67,18 +67,27 @@ internal static async Task SendMessagesAsync(IMessageSender messageSender, int m
Log($"Sent {messageCount} messages");
}

internal static async Task<IList<Message>> ReceiveMessagesAsync(IMessageReceiver messageReceiver, int messageCount)
internal static async Task<IList<Message>> ReceiveMessagesAsync(IMessageReceiver messageReceiver, int messageCount, TimeSpan timeout = default)
{
var receiveAttempts = 0;
var messagesToReturn = new List<Message>();
var stopwatch = Stopwatch.StartNew();

while (receiveAttempts++ < TestConstants.MaxAttemptsCount && messagesToReturn.Count < messageCount)
if (timeout == default)
{
timeout = TimeSpan.Zero;
}

while (messagesToReturn.Count < messageCount && (receiveAttempts++ < TestConstants.MaxAttemptsCount || stopwatch.Elapsed < timeout))
{
var messages = await messageReceiver.ReceiveAsync(messageCount - messagesToReturn.Count);
if (messages != null)
if (messages == null)
This conversation was marked as resolved by nemakam

This comment has been minimized.

Copy link
@nemakam

nemakam Dec 13, 2018

Member

This line anyway goes for 60 seconds. Why introduce the stopwatch and timeout which is less than 60 seconds?

This comment has been minimized.

Copy link
@SeanFeldman

SeanFeldman Dec 13, 2018

Author Collaborator

Despite all messages being sent to the broker and only after that attempted to be received, the broker returns a subset of messages and not all. ReceiveAsync(count) will not wait for 60 seconds, it will return immediately with whatever number of messages it can get. I think you're confusing this with ReceiveAsync(count, timeout) variant.

This comment has been minimized.

Copy link
@SeanFeldman

SeanFeldman Dec 13, 2018

Author Collaborator

BTW, I had to introduce this change after having random builds failing either on both targets (.NET Core and Framework) or one of those. Sometimes it would all pass. With this fix, the test is no longer flaky as it was.

This comment has been minimized.

Copy link
@nemakam

nemakam Dec 14, 2018

Member

Yes my bad. I was thinking of something else.

This comment has been minimized.

Copy link
@SeanFeldman

SeanFeldman Dec 14, 2018

Author Collaborator

All good. If I'd have to count number of times you've corrected me, I'd need more than two hands 😂

{
messagesToReturn.AddRange(messages);
await Task.Delay(TestConstants.WaitTimeBetweenAttempts);
continue;
This conversation was marked as resolved by nemakam

This comment has been minimized.

Copy link
@nemakam

nemakam Dec 13, 2018

Member

why this change?

This comment has been minimized.

Copy link
@SeanFeldman

SeanFeldman Dec 13, 2018

Author Collaborator

Related to the comment above. Just burning through 5 attempts is not enough. The remainder of the messages can show up later. And there's no point in having a tight loop w/o some sort of backoff (linear, 1 sec in this case).

}

messagesToReturn.AddRange(messages);
}

VerifyUniqueMessages(messagesToReturn);
@@ -98,7 +107,7 @@ internal static async Task<IList<Message>> ReceiveDeferredMessagesAsync(IMessage
var msg = await messageReceiver.ReceiveDeferredMessageAsync(sequenceNumber);
if (msg != null)
{
messagesToReturn.Add(msg);
messagesToReturn.Add(msg);
}
}

ProTip! Use n and p to navigate between commits in a pull request.
You can’t perform that action at this time.