Skip to content

Commit

Permalink
fix: Correct the auto-extend lease interval for exactly-once delivery.
Browse files Browse the repository at this point in the history
  • Loading branch information
Rishabh-V authored and jskeet committed May 31, 2023
1 parent fea4d59 commit c33999d
Show file tree
Hide file tree
Showing 5 changed files with 137 additions and 65 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -862,15 +862,15 @@ public void OnlyOneStart()
}
}

[Fact]
public void LeaseExtension()
[Theory, CombinatorialData]
public void LeaseExtension(bool isExactlyOnceDelivery)
{
var msgs = new[] { new[] {
ServerAction.Data(TimeSpan.Zero, new[] { "1" }),
ServerAction.Data(TimeSpan.FromSeconds(5), new[] { "2" }),
ServerAction.Inf()
} };
using (var fake = Fake.Create(msgs, ackDeadline: TimeSpan.FromSeconds(30), ackExtendWindow: TimeSpan.FromSeconds(10)))
using (var fake = Fake.Create(msgs, isExactlyOnceDelivery: isExactlyOnceDelivery, ackDeadline: TimeSpan.FromSeconds(30), ackExtendWindow: TimeSpan.FromSeconds(10)))
{
fake.Scheduler.Run(async () =>
{
Expand All @@ -890,14 +890,14 @@ public void LeaseExtension()
}
}

[Fact]
public void LeaseMaxExtension()
[Theory, CombinatorialData]
public void LeaseMaxExtension(bool isExactlyOnceDelivery)
{
var msgs = new[] { new[] {
ServerAction.Data(TimeSpan.Zero, new[] { "1" }),
ServerAction.Inf()
} };
using (var fake = Fake.Create(msgs, ackDeadline: TimeSpan.FromSeconds(30), ackExtendWindow: TimeSpan.FromSeconds(10)))
using (var fake = Fake.Create(msgs, isExactlyOnceDelivery: isExactlyOnceDelivery, ackDeadline: TimeSpan.FromSeconds(30), ackExtendWindow: TimeSpan.FromSeconds(10)))
{
fake.Scheduler.Run(async () =>
{
Expand All @@ -913,7 +913,7 @@ public void LeaseMaxExtension()
Assert.Equal(1, fake.Subscribers.Count);
// Check that the lease was extended for 60 minutes only.
// +1 is due to initial lease extension at time=0
Assert.Equal((int)SubscriberClient.DefaultMaxTotalAckExtension.TotalSeconds / 20 + 1, fake.Subscribers[0].Extends.Count);
Assert.Equal((int) SubscriberClient.DefaultMaxTotalAckExtension.TotalSeconds / 20 + 1, fake.Subscribers[0].Extends.Count);
});
}
}
Expand Down Expand Up @@ -1070,43 +1070,72 @@ public void ValidParameters()
}

[Fact]
public void SilentlyFixedParameters()
public void MinimumDelayIsUsedWhenMinimumAckDeadlineIsSpecified()
{
var subscriptionName = new SubscriptionName("project", "subscriptionId");
var clients = new[] { new FakeEmptySubscriberServiceApiClient() };

// Test MinimumLeaseExtensionDelay is honoured when minimum ack-deadline used.
var settingsAckExtension1 = new SubscriberClient.Settings
// Test that MinimumLeaseExtensionDelay is honoured when the ack-deadline is specified with the minimum possible value.
var settingsAckDeadline = new SubscriberClient.Settings
{
AckDeadline = SubscriberClient.MinimumAckDeadline
};
var sub1 = new SubscriberClientImpl(subscriptionName, clients, settingsAckExtension1, null);
Assert.Equal(SubscriberClient.MinimumLeaseExtensionDelay, sub1.AutoExtendInterval);

// Test MinimumLeaseExtensionDelay is honoured when ack-deadline set to be equal to the default ack-extension window.
var settingsAckExtension2 = new SubscriberClient.Settings
var subscription = new SubscriberClientImpl(subscriptionName, clients, settingsAckDeadline, null);
Assert.Equal(SubscriberClient.MinimumLeaseExtensionDelay, subscription.AutoExtendDelay);
Assert.Equal(SubscriberClient.MinimumLeaseExtensionDelay, subscription.AutoExtendDelayForExactlyOnceDelivery);
}

[Fact]
public void MinimumDelayIsUsedWhenAckDeadlineEqualsAckExtensionWindow()
{
var subscriptionName = new SubscriptionName("project", "subscriptionId");
var clients = new[] { new FakeEmptySubscriberServiceApiClient() };

// Test that MinimumLeaseExtensionDelay is honoured when ack-deadline is set to be equal to the ack-extension window.
var settingsAckDeadline = new SubscriberClient.Settings
{
AckDeadline = SubscriberClient.DefaultAckExtensionWindow
};
var sub2 = new SubscriberClientImpl(subscriptionName, clients, settingsAckExtension2, null);
Assert.Equal(SubscriberClient.MinimumLeaseExtensionDelay, sub2.AutoExtendInterval);

// Test ack-extension-window is honoured when ack-deadline is a larger value.
var settingsAckExtension3 = new SubscriberClient.Settings
var subscription = new SubscriberClientImpl(subscriptionName, clients, settingsAckDeadline, null);
Assert.Equal(SubscriberClient.MinimumLeaseExtensionDelay, subscription.AutoExtendDelay);
Assert.Equal(SubscriberClient.MinimumLeaseExtensionDelay, subscription.AutoExtendDelayForExactlyOnceDelivery);
}

[Fact]
public void AckExtensionWindowIsHonouredWhenAckDeadlineIsLarger()
{
var subscriptionName = new SubscriptionName("project", "subscriptionId");
var clients = new[] { new FakeEmptySubscriberServiceApiClient() };

// Test that ack-extension-window is honoured when ack-deadline is a larger value than AckExtensionWindow.
var settingsAckDeadline = new SubscriberClient.Settings
{
AckDeadline = SubscriberClient.DefaultAckExtensionWindow + SubscriberClient.MinimumLeaseExtensionDelay + TimeSpan.FromSeconds(1)
};
var sub3 = new SubscriberClientImpl(subscriptionName, clients, settingsAckExtension3, null);
Assert.Equal(SubscriberClient.MinimumLeaseExtensionDelay + TimeSpan.FromSeconds(1), sub3.AutoExtendInterval);

// Test ack-extension-window is honoured when both ack-deadline and ack-extension-window are set to valid values.
var settingsAckExtension4 = new SubscriberClient.Settings
var subscription = new SubscriberClientImpl(subscriptionName, clients, settingsAckDeadline, null);
Assert.Equal(SubscriberClient.MinimumLeaseExtensionDelay + TimeSpan.FromSeconds(1), subscription.AutoExtendDelay);
Assert.Equal(SubscriberClient.MinimumLeaseExtensionDelay + TimeSpan.FromSeconds(1), subscription.AutoExtendDelayForExactlyOnceDelivery);
}

[Fact]
public void AckExtensionWindowIsHonouredWhenAckDeadlineAndAckExtensionWindowAreValid()
{
var subscriptionName = new SubscriptionName("project", "subscriptionId");
var clients = new[] { new FakeEmptySubscriberServiceApiClient() };

// Test that ack-extension-window is honoured when both ack-deadline and ack-extension-window are set to valid values.
var settings = new SubscriberClient.Settings
{
AckDeadline = TimeSpan.FromSeconds(60),
AckExtensionWindow = TimeSpan.FromSeconds(1)
};
var sub4 = new SubscriberClientImpl(subscriptionName, clients, settingsAckExtension4, null);
Assert.Equal(TimeSpan.FromSeconds(59), sub4.AutoExtendInterval);

var subsciption = new SubscriberClientImpl(subscriptionName, clients, settings, null);
Assert.Equal(TimeSpan.FromSeconds(59), subsciption.AutoExtendDelay);
Assert.Equal(TimeSpan.FromSeconds(59), subsciption.AutoExtendDelayForExactlyOnceDelivery);
}

[Fact]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,21 +59,21 @@ public sealed class Settings
public bool UseLegacyFlowControl { get; set; } = false;

/// <summary>
/// The lease time before which a message must either be ACKed
/// The lease time before which a message must either be acknowledged
/// or have its lease extended. This is truncated to the nearest second.
/// If <c>null</c>, uses the default of <see cref="DefaultAckDeadline"/>.
/// </summary>
public TimeSpan? AckDeadline { get; set; }

/// <summary>
/// Duration before <see cref="AckDeadline"/> at which the message ACK deadline
/// Duration before <see cref="AckDeadline"/> at which the message acknowledgement deadline
/// is automatically extended.
/// If <c>null</c>, uses the default of <see cref="DefaultAckExtensionWindow"/>.
/// </summary>
public TimeSpan? AckExtensionWindow { get; set; }

/// <summary>
/// Maximum duration for which a message ACK deadline will be extended.
/// Maximum duration for which a message acknowledgement deadline will be extended.
/// If <c>null</c>, uses the default of <see cref="DefaultMaxTotalAckExtension"/>.
/// </summary>
public TimeSpan? MaxTotalAckExtension { get; set; }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,32 +62,41 @@ public enum Reply
public static FlowControlSettings DefaultFlowControlSettings { get; } = new FlowControlSettings(1_000, 100_000_000);

/// <summary>
/// The service-defined minimum message ACKnowledgement deadline of 10 seconds.
/// The service-defined minimum message acknowledgement deadline of 10 seconds.
/// </summary>
public static TimeSpan MinimumAckDeadline { get; } = TimeSpan.FromSeconds(10);

/// <summary>
/// The service-defined maximum message ACKnowledgement deadline of 10 minutes.
/// The service-defined maximum message acknowledgement deadline of 10 minutes.
/// </summary>
public static TimeSpan MaximumAckDeadline { get; } = TimeSpan.FromMinutes(10);

/// <summary>
/// The default message ACKnowledgement deadline of 60 seconds.
/// The default message acknowledgement deadline of 60 seconds.
/// </summary>
public static TimeSpan DefaultAckDeadline { get; } = TimeSpan.FromSeconds(60);

/// <summary>
/// The minimum message ACKnowledgement extension window of 50 milliseconds.
/// The default message acknowledgement deadline of 60 seconds for exactly-once delivery subscriptions.
/// </summary>
public static TimeSpan DefaultAckDeadlineForExactlyOnceDelivery { get; } = TimeSpan.FromSeconds(60);

/// <summary>
/// The minimum message acknowledgement extension window of 50 milliseconds.
/// </summary>
public static TimeSpan MinimumAckExtensionWindow { get; } = TimeSpan.FromMilliseconds(50);

/// <summary>
/// The minimum message acknowledgement extension window of 60 seconds for exactly once delivery subscriptions.
/// </summary>
/// <remarks>
/// This property is deprecated. Use <see cref="DefaultAckDeadlineForExactlyOnceDelivery"/> instead.
/// </remarks>
[Obsolete("Use DefaultAckDeadlineForExactlyOnceDelivery instead.")]
public static TimeSpan MinimumAckExtensionWindowForExactlyOnceDelivery { get; } = TimeSpan.FromSeconds(60);

/// <summary>
/// The default message ACKnowledgement extension window of 15 seconds.
/// The default message acknowledgement extension window of 15 seconds.
/// </summary>
public static TimeSpan DefaultAckExtensionWindow { get; } = TimeSpan.FromSeconds(15);

Expand All @@ -97,7 +106,7 @@ public enum Reply
public static TimeSpan MinimumLeaseExtensionDelay { get; } = TimeSpan.FromSeconds(5);

/// <summary>
/// The default maximum total ACKnowledgement extension of 60 minutes.
/// The default maximum total acknowledgement extension of 60 minutes.
/// </summary>
public static TimeSpan DefaultMaxTotalAckExtension { get; } = TimeSpan.FromMinutes(60);

Expand Down Expand Up @@ -182,8 +191,8 @@ public enum Reply
/// </summary>
/// <param name="handlerAsync">The handler function that is passed all received messages.
/// This function may be called on multiple threads concurrently. Return <see cref="Reply.Ack"/> from this function
/// to ACKnowledge this message (implying it won't be received again); or return <see cref="Reply.Nack"/> to Not
/// ACKnowledge this message (implying it will be received again). If this function throws any Exception, then
/// to acknowledge this message (implying it won't be received again); or return <see cref="Reply.Nack"/> to Not
/// acknowledge this message (implying it will be received again). If this function throws any Exception, then
/// it behaves as if it returned <see cref="Reply.Nack"/>.</param>
/// <returns>A <see cref="Task"/> that completes when the subscriber is stopped, or if an unrecoverable error occurs.</returns>
public virtual Task StartAsync(Func<PubsubMessage, CancellationToken, Task<Reply>> handlerAsync) =>
Expand All @@ -200,25 +209,25 @@ public enum Reply

/// <summary>
/// Stop this <see cref="SubscriberClient"/>. Cancelling <paramref name="hardStopToken"/> aborts the
/// clean stop process, and may leave some handled messages un-ACKnowledged.
/// The returned <see cref="Task"/> completes when all handled messages have been ACKnowledged.
/// clean stop process, and may leave some handled messages un-acknowledged.
/// The returned <see cref="Task"/> completes when all handled messages have been acknowledged.
/// The returned <see cref="Task"/> faults if there is an unrecoverable error with the underlying service.
/// The returned <see cref="Task"/> cancels if <paramref name="hardStopToken"/> is cancelled.
/// </summary>
/// <param name="hardStopToken">Cancel this <see cref="CancellationToken"/> to abort handlers and ACKnowledgement.</param>
/// <returns>A <see cref="Task"/> that completes when all handled messages have been ACKnowledged;
/// <param name="hardStopToken">Cancel this <see cref="CancellationToken"/> to abort handlers and acknowledgement.</param>
/// <returns>A <see cref="Task"/> that completes when all handled messages have been acknowledged;
/// faults on unrecoverable service errors; or cancels if <paramref name="hardStopToken"/> is cancelled.</returns>
public virtual Task StopAsync(CancellationToken hardStopToken) => throw new NotImplementedException();

/// <summary>
/// Stop this <see cref="SubscriberClient"/>. If <paramref name="timeout"/> expires, the
/// clean stop process will be aborted, and may leave some handled messages un-ACKnowledged.
/// The returned <see cref="Task"/> completes when all handled messages have been ACKnowledged.
/// clean stop process will be aborted, and may leave some handled messages un-acknowledged.
/// The returned <see cref="Task"/> completes when all handled messages have been acknowledged.
/// The returned <see cref="Task"/> faults if there is an unrecoverable error with the underlying service.
/// The returned <see cref="Task"/> cancels if <paramref name="timeout"/> expires.
/// </summary>
/// <param name="timeout">After this period, abort handling and ACKnowledging messages.</param>
/// <returns>A <see cref="Task"/> that completes when all handled messages have been ACKnowledged;
/// <param name="timeout">After this period, abort handling and acknowledging messages.</param>
/// <returns>A <see cref="Task"/> that completes when all handled messages have been acknowledged;
/// faults on unrecoverable service errors; or cancels if <paramref name="timeout"/> expires.</returns>
public virtual Task StopAsync(TimeSpan timeout) => StopAsync(new CancellationTokenSource(timeout).Token);

Expand Down

0 comments on commit c33999d

Please sign in to comment.