diff --git a/Microsoft.Azure.Cosmos/src/Batch/BatchAsyncContainerExecutor.cs b/Microsoft.Azure.Cosmos/src/Batch/BatchAsyncContainerExecutor.cs index 75e23c831ff..d8e6b540087 100644 --- a/Microsoft.Azure.Cosmos/src/Batch/BatchAsyncContainerExecutor.cs +++ b/Microsoft.Azure.Cosmos/src/Batch/BatchAsyncContainerExecutor.cs @@ -26,7 +26,6 @@ namespace Microsoft.Azure.Cosmos /// internal class BatchAsyncContainerExecutor : IDisposable { - private const int DefaultDispatchTimerInSeconds = 1; private const int TimerWheelBucketCount = 20; private static readonly TimeSpan TimerWheelResolution = TimeSpan.FromMilliseconds(50); @@ -53,11 +52,6 @@ internal BatchAsyncContainerExecutor() int maxServerRequestOperationCount, int maxServerRequestBodyLength) { - if (cosmosContainer == null) - { - throw new ArgumentNullException(nameof(cosmosContainer)); - } - if (maxServerRequestOperationCount < 1) { throw new ArgumentOutOfRangeException(nameof(maxServerRequestOperationCount)); @@ -68,7 +62,7 @@ internal BatchAsyncContainerExecutor() throw new ArgumentOutOfRangeException(nameof(maxServerRequestBodyLength)); } - this.cosmosContainer = cosmosContainer; + this.cosmosContainer = cosmosContainer ?? throw new ArgumentNullException(nameof(cosmosContainer)); this.cosmosClientContext = cosmosClientContext; this.maxServerRequestBodyLength = maxServerRequestBodyLength; this.maxServerRequestOperationCount = maxServerRequestOperationCount; @@ -90,7 +84,7 @@ internal BatchAsyncContainerExecutor() string resolvedPartitionKeyRangeId = await this.ResolvePartitionKeyRangeIdAsync(operation, cancellationToken).ConfigureAwait(false); BatchAsyncStreamer streamer = this.GetOrAddStreamerForPartitionKeyRange(resolvedPartitionKeyRangeId); - ItemBatchOperationContext context = new ItemBatchOperationContext(resolvedPartitionKeyRangeId, BatchAsyncContainerExecutor.GetRetryPolicy(this.retryOptions)); + ItemBatchOperationContext context = new ItemBatchOperationContext(resolvedPartitionKeyRangeId, BatchAsyncContainerExecutor.GetRetryPolicy(this.cosmosContainer, this.retryOptions)); operation.AttachContext(context); streamer.Add(operation); return await context.OperationTask; @@ -137,10 +131,13 @@ public void Dispose() await operation.MaterializeResourceAsync(this.cosmosClientContext.SerializerCore, cancellationToken); } - private static IDocumentClientRetryPolicy GetRetryPolicy(RetryOptions retryOptions) + private static IDocumentClientRetryPolicy GetRetryPolicy( + ContainerInternal containerInternal, + RetryOptions retryOptions) { return new BulkPartitionKeyRangeGoneRetryPolicy( - new ResourceThrottleRetryPolicy( + containerInternal, + new ResourceThrottleRetryPolicy( retryOptions.MaxRetryAttemptsOnThrottledRequests, retryOptions.MaxRetryWaitTimeInSeconds)); } @@ -186,6 +183,7 @@ private static void AddHeadersToRequestMessage(RequestMessage requestMessage, st CancellationToken cancellationToken) { string resolvedPartitionKeyRangeId = await this.ResolvePartitionKeyRangeIdAsync(operation, cancellationToken).ConfigureAwait(false); + operation.Context.ReRouteOperation(resolvedPartitionKeyRangeId); BatchAsyncStreamer streamer = this.GetOrAddStreamerForPartitionKeyRange(resolvedPartitionKeyRangeId); streamer.Add(operation); } diff --git a/Microsoft.Azure.Cosmos/src/Batch/ItemBatchOperationContext.cs b/Microsoft.Azure.Cosmos/src/Batch/ItemBatchOperationContext.cs index f384b044971..eabf5397897 100644 --- a/Microsoft.Azure.Cosmos/src/Batch/ItemBatchOperationContext.cs +++ b/Microsoft.Azure.Cosmos/src/Batch/ItemBatchOperationContext.cs @@ -15,7 +15,7 @@ namespace Microsoft.Azure.Cosmos /// internal class ItemBatchOperationContext : IDisposable { - public string PartitionKeyRangeId { get; } + public string PartitionKeyRangeId { get; private set; } public BatchAsyncBatcher CurrentBatcher { get; set; } @@ -74,6 +74,11 @@ internal class ItemBatchOperationContext : IDisposable this.Dispose(); } + public void ReRouteOperation(string newPartitionKeyRangeId) + { + this.PartitionKeyRangeId = newPartitionKeyRangeId; + } + public void Dispose() { this.CurrentBatcher = null; diff --git a/Microsoft.Azure.Cosmos/src/BulkPartitionKeyRangeGoneRetryPolicy.cs b/Microsoft.Azure.Cosmos/src/BulkPartitionKeyRangeGoneRetryPolicy.cs index 6b55f2bdbc5..0191c13e522 100644 --- a/Microsoft.Azure.Cosmos/src/BulkPartitionKeyRangeGoneRetryPolicy.cs +++ b/Microsoft.Azure.Cosmos/src/BulkPartitionKeyRangeGoneRetryPolicy.cs @@ -8,6 +8,7 @@ namespace Microsoft.Azure.Cosmos using System.Net; using System.Threading; using System.Threading.Tasks; + using Microsoft.Azure.Cosmos.Routing; using Microsoft.Azure.Documents; /// @@ -17,57 +18,61 @@ namespace Microsoft.Azure.Cosmos /// internal sealed class BulkPartitionKeyRangeGoneRetryPolicy : IDocumentClientRetryPolicy { - private const int MaxRetries = 1; - private readonly IDocumentClientRetryPolicy nextRetryPolicy; + private readonly ContainerInternal container; - private int retriesAttempted; - - public BulkPartitionKeyRangeGoneRetryPolicy(IDocumentClientRetryPolicy nextRetryPolicy) + public BulkPartitionKeyRangeGoneRetryPolicy( + ContainerInternal container, + IDocumentClientRetryPolicy nextRetryPolicy) { + this.container = container ?? throw new ArgumentNullException(nameof(container)); this.nextRetryPolicy = nextRetryPolicy; } - public Task ShouldRetryAsync( + public async Task ShouldRetryAsync( Exception exception, CancellationToken cancellationToken) { - DocumentClientException clientException = exception as DocumentClientException; - - ShouldRetryResult shouldRetryResult = this.ShouldRetryInternal( - clientException?.StatusCode, - clientException?.GetSubStatus()); - - if (shouldRetryResult != null) + if (exception is CosmosException clientException) { - return Task.FromResult(shouldRetryResult); + ShouldRetryResult shouldRetryResult = await this.ShouldRetryInternalAsync( + clientException.StatusCode, + (SubStatusCodes)clientException.SubStatusCode, + cancellationToken); + + if (shouldRetryResult != null) + { + return shouldRetryResult; + } + + if (this.nextRetryPolicy == null) + { + return ShouldRetryResult.NoRetry(); + } } - if (this.nextRetryPolicy == null) - { - return Task.FromResult(ShouldRetryResult.NoRetry()); - } - - return this.nextRetryPolicy.ShouldRetryAsync(exception, cancellationToken); + return await this.nextRetryPolicy.ShouldRetryAsync(exception, cancellationToken); } - public Task ShouldRetryAsync( + public async Task ShouldRetryAsync( ResponseMessage cosmosResponseMessage, CancellationToken cancellationToken) { - ShouldRetryResult shouldRetryResult = this.ShouldRetryInternal(cosmosResponseMessage?.StatusCode, - cosmosResponseMessage?.Headers.SubStatusCode); + ShouldRetryResult shouldRetryResult = await this.ShouldRetryInternalAsync( + cosmosResponseMessage?.StatusCode, + cosmosResponseMessage?.Headers.SubStatusCode, + cancellationToken); if (shouldRetryResult != null) { - return Task.FromResult(shouldRetryResult); + return shouldRetryResult; } if (this.nextRetryPolicy == null) { - return Task.FromResult(ShouldRetryResult.NoRetry()); + return ShouldRetryResult.NoRetry(); } - return this.nextRetryPolicy.ShouldRetryAsync(cosmosResponseMessage, cancellationToken); + return await this.nextRetryPolicy.ShouldRetryAsync(cosmosResponseMessage, cancellationToken); } public void OnBeforeSendRequest(DocumentServiceRequest request) @@ -75,19 +80,27 @@ public void OnBeforeSendRequest(DocumentServiceRequest request) this.nextRetryPolicy.OnBeforeSendRequest(request); } - private ShouldRetryResult ShouldRetryInternal( + private async Task ShouldRetryInternalAsync( HttpStatusCode? statusCode, - SubStatusCodes? subStatusCode) + SubStatusCodes? subStatusCode, + CancellationToken cancellationToken) { - if (statusCode == HttpStatusCode.Gone - && (subStatusCode == SubStatusCodes.PartitionKeyRangeGone - || subStatusCode == SubStatusCodes.NameCacheIsStale - || subStatusCode == SubStatusCodes.CompletingSplit - || subStatusCode == SubStatusCodes.CompletingPartitionMigration) - && this.retriesAttempted < MaxRetries) + if (statusCode == HttpStatusCode.Gone) { - this.retriesAttempted++; - return ShouldRetryResult.RetryAfter(TimeSpan.Zero); + if (subStatusCode == SubStatusCodes.PartitionKeyRangeGone + || subStatusCode == SubStatusCodes.CompletingSplit + || subStatusCode == SubStatusCodes.CompletingPartitionMigration) + { + PartitionKeyRangeCache partitionKeyRangeCache = await this.container.ClientContext.DocumentClient.GetPartitionKeyRangeCacheAsync(); + string containerRid = await this.container.GetCachedRIDAsync(forceRefresh: false, cancellationToken: cancellationToken); + await partitionKeyRangeCache.TryGetOverlappingRangesAsync(containerRid, FeedRangeEpk.FullRange.Range, forceRefresh: true); + return ShouldRetryResult.RetryAfter(TimeSpan.Zero); + } + + if (subStatusCode == SubStatusCodes.NameCacheIsStale) + { + return ShouldRetryResult.RetryAfter(TimeSpan.Zero); + } } return null; diff --git a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Batch/BatchAsyncBatcherTests.cs b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Batch/BatchAsyncBatcherTests.cs index 5b6a032def9..e0e85b85a2e 100644 --- a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Batch/BatchAsyncBatcherTests.cs +++ b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Batch/BatchAsyncBatcherTests.cs @@ -11,6 +11,7 @@ namespace Microsoft.Azure.Cosmos.Tests using System.Net; using System.Threading; using System.Threading.Tasks; + using Microsoft.Azure.Cosmos.Routing; using Microsoft.Azure.Documents; using Microsoft.VisualStudio.TestTools.UnitTesting; using Moq; @@ -448,9 +449,11 @@ public async Task CannotAddToDispatchedBatch() public async Task RetrierGetsCalledOnSplit() { IDocumentClientRetryPolicy retryPolicy1 = new BulkPartitionKeyRangeGoneRetryPolicy( + GetSplitEnabledContainer(), new ResourceThrottleRetryPolicy(1)); IDocumentClientRetryPolicy retryPolicy2 = new BulkPartitionKeyRangeGoneRetryPolicy( + GetSplitEnabledContainer(), new ResourceThrottleRetryPolicy(1)); ItemBatchOperation operation1 = this.CreateItemBatchOperation(); @@ -475,9 +478,11 @@ public async Task RetrierGetsCalledOnSplit() public async Task RetrierGetsCalledOnCompletingSplit() { IDocumentClientRetryPolicy retryPolicy1 = new BulkPartitionKeyRangeGoneRetryPolicy( + GetSplitEnabledContainer(), new ResourceThrottleRetryPolicy(1)); IDocumentClientRetryPolicy retryPolicy2 = new BulkPartitionKeyRangeGoneRetryPolicy( + GetSplitEnabledContainer(), new ResourceThrottleRetryPolicy(1)); ItemBatchOperation operation1 = this.CreateItemBatchOperation(); @@ -502,9 +507,11 @@ public async Task RetrierGetsCalledOnCompletingSplit() public async Task RetrierGetsCalledOnCompletingPartitionMigration() { IDocumentClientRetryPolicy retryPolicy1 = new BulkPartitionKeyRangeGoneRetryPolicy( + GetSplitEnabledContainer(), new ResourceThrottleRetryPolicy(1)); IDocumentClientRetryPolicy retryPolicy2 = new BulkPartitionKeyRangeGoneRetryPolicy( + GetSplitEnabledContainer(), new ResourceThrottleRetryPolicy(1)); ItemBatchOperation operation1 = this.CreateItemBatchOperation(); @@ -545,6 +552,16 @@ public async Task RetrierGetsCalledOnOverFlow() retryDelegate.Verify(a => a(It.IsAny(), It.IsAny()), Times.Once); } + private static ContainerInternal GetSplitEnabledContainer() + { + Mock container = new Mock(); + container.Setup(c => c.GetCachedRIDAsync(It.IsAny(), It.IsAny())).ReturnsAsync(Guid.NewGuid().ToString()); + Mock context = new Mock(); + container.Setup(c => c.ClientContext).Returns(context.Object); + context.Setup(c => c.DocumentClient).Returns(new ClientWithSplitDetection()); + return container.Object; + } + private class BatchAsyncBatcherThatOverflows : BatchAsyncBatcher { public BatchAsyncBatcherThatOverflows( @@ -565,5 +582,28 @@ private class BatchAsyncBatcherThatOverflows : BatchAsyncBatcher return new Tuple>(serverRequest, new ArraySegment(serverRequest.Operations.ToArray(), 1, 1)); } } + + private class ClientWithSplitDetection : MockDocumentClient + { + private readonly Mock partitionKeyRangeCache; + + public ClientWithSplitDetection() + { + this.partitionKeyRangeCache = new Mock(MockBehavior.Strict, null, null, null); + this.partitionKeyRangeCache.Setup( + m => m.TryGetOverlappingRangesAsync( + It.IsAny(), + It.IsAny>(), + It.Is(b => b == true) // Mocking only the refresh, if it doesn't get called, the test fails + ) + ).Returns((string collectionRid, Documents.Routing.Range range, bool forceRefresh) => Task.FromResult>(this.ResolveOverlapingPartitionKeyRanges(collectionRid, range, forceRefresh))); + } + + internal override Task GetPartitionKeyRangeCacheAsync() + { + return Task.FromResult(this.partitionKeyRangeCache.Object); + } + + } } } diff --git a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Batch/BatchAsyncContainerExecutorTests.cs b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Batch/BatchAsyncContainerExecutorTests.cs index d204998afee..7b444f73633 100644 --- a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Batch/BatchAsyncContainerExecutorTests.cs +++ b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Batch/BatchAsyncContainerExecutorTests.cs @@ -51,6 +51,11 @@ public async Task RetryOnSplit() Mock mockContainer = new Mock(); mockContainer.Setup(x => x.LinkUri).Returns(link); mockContainer.Setup(x => x.GetPartitionKeyDefinitionAsync(It.IsAny())).Returns(Task.FromResult(new PartitionKeyDefinition() { Paths = new Collection() { "/id" } })); + mockContainer.Setup(c => c.GetCachedRIDAsync(It.IsAny(), It.IsAny())).ReturnsAsync(Guid.NewGuid().ToString()); + Mock context = new Mock(); + mockContainer.Setup(c => c.ClientContext).Returns(context.Object); + context.Setup(c => c.DocumentClient).Returns(new ClientWithSplitDetection()); + CollectionRoutingMap routingMap = CollectionRoutingMap.TryCreateCompleteRoutingMap( new[] @@ -449,5 +454,28 @@ private class MyDocument public bool Updated { get; set; } } + + private class ClientWithSplitDetection : MockDocumentClient + { + private readonly Mock partitionKeyRangeCache; + + public ClientWithSplitDetection() + { + this.partitionKeyRangeCache = new Mock(MockBehavior.Strict, null, null, null); + this.partitionKeyRangeCache.Setup( + m => m.TryGetOverlappingRangesAsync( + It.IsAny(), + It.IsAny>(), + It.Is(b => b == true) // Mocking only the refresh, if it doesn't get called, the test fails + ) + ).Returns((string collectionRid, Documents.Routing.Range range, bool forceRefresh) => Task.FromResult>(this.ResolveOverlapingPartitionKeyRanges(collectionRid, range, forceRefresh))); + } + + internal override Task GetPartitionKeyRangeCacheAsync() + { + return Task.FromResult(this.partitionKeyRangeCache.Object); + } + + } } } diff --git a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Batch/BatchAsyncOperationContextTests.cs b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Batch/BatchAsyncOperationContextTests.cs index 40da9a2f344..989506ff6da 100644 --- a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Batch/BatchAsyncOperationContextTests.cs +++ b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Batch/BatchAsyncOperationContextTests.cs @@ -5,11 +5,14 @@ namespace Microsoft.Azure.Cosmos.Tests { using System; + using System.Collections.Generic; using System.Net; using System.Threading; using System.Threading.Tasks; + using Microsoft.Azure.Cosmos.Routing; using Microsoft.Azure.Documents; using Microsoft.VisualStudio.TestTools.UnitTesting; + using Moq; [TestClass] public class BatchAsyncOperationContextTests @@ -92,6 +95,7 @@ public async Task ShouldRetry_NoPolicy() public async Task ShouldRetry_WithPolicy_OnSuccess() { IDocumentClientRetryPolicy retryPolicy = new BulkPartitionKeyRangeGoneRetryPolicy( + Mock.Of(), new ResourceThrottleRetryPolicy(1)); TransactionalBatchOperationResult result = new TransactionalBatchOperationResult(HttpStatusCode.OK); ItemBatchOperation operation = new ItemBatchOperation(OperationType.Create, 0, Cosmos.PartitionKey.Null); @@ -104,6 +108,7 @@ public async Task ShouldRetry_WithPolicy_OnSuccess() public async Task ShouldRetry_WithPolicy_On429() { IDocumentClientRetryPolicy retryPolicy = new BulkPartitionKeyRangeGoneRetryPolicy( + Mock.Of(), new ResourceThrottleRetryPolicy(1)); TransactionalBatchOperationResult result = new TransactionalBatchOperationResult((HttpStatusCode)StatusCodes.TooManyRequests); ItemBatchOperation operation = new ItemBatchOperation(OperationType.Create, 0, Cosmos.PartitionKey.Null); @@ -116,6 +121,7 @@ public async Task ShouldRetry_WithPolicy_On429() public async Task ShouldRetry_WithPolicy_OnSplit() { IDocumentClientRetryPolicy retryPolicy = new BulkPartitionKeyRangeGoneRetryPolicy( + GetSplitEnabledContainer(), new ResourceThrottleRetryPolicy(1)); TransactionalBatchOperationResult result = new TransactionalBatchOperationResult(HttpStatusCode.Gone) { SubStatusCode = SubStatusCodes.PartitionKeyRangeGone }; ItemBatchOperation operation = new ItemBatchOperation(OperationType.Create, 0, Cosmos.PartitionKey.Null); @@ -128,6 +134,7 @@ public async Task ShouldRetry_WithPolicy_OnSplit() public async Task ShouldRetry_WithPolicy_OnCompletingSplit() { IDocumentClientRetryPolicy retryPolicy = new BulkPartitionKeyRangeGoneRetryPolicy( + GetSplitEnabledContainer(), new ResourceThrottleRetryPolicy(1)); TransactionalBatchOperationResult result = new TransactionalBatchOperationResult(HttpStatusCode.Gone) { SubStatusCode = SubStatusCodes.CompletingSplit }; ItemBatchOperation operation = new ItemBatchOperation(OperationType.Create, 0, Cosmos.PartitionKey.Null); @@ -140,6 +147,7 @@ public async Task ShouldRetry_WithPolicy_OnCompletingSplit() public async Task ShouldRetry_WithPolicy_OnCompletingPartitionMigration() { IDocumentClientRetryPolicy retryPolicy = new BulkPartitionKeyRangeGoneRetryPolicy( + GetSplitEnabledContainer(), new ResourceThrottleRetryPolicy(1)); TransactionalBatchOperationResult result = new TransactionalBatchOperationResult(HttpStatusCode.Gone) { SubStatusCode = SubStatusCodes.CompletingPartitionMigration }; ItemBatchOperation operation = new ItemBatchOperation(OperationType.Create, 0, Cosmos.PartitionKey.Null); @@ -147,5 +155,38 @@ public async Task ShouldRetry_WithPolicy_OnCompletingPartitionMigration() ShouldRetryResult shouldRetryResult = await operation.Context.ShouldRetryAsync(result, default); Assert.IsTrue(shouldRetryResult.ShouldRetry); } + + private static ContainerInternal GetSplitEnabledContainer() + { + Mock container = new Mock(); + container.Setup(c => c.GetCachedRIDAsync(It.IsAny(), It.IsAny())).ReturnsAsync(Guid.NewGuid().ToString()); + Mock context = new Mock(); + container.Setup(c => c.ClientContext).Returns(context.Object); + context.Setup(c => c.DocumentClient).Returns(new ClientWithSplitDetection()); + return container.Object; + } + + private class ClientWithSplitDetection : MockDocumentClient + { + private readonly Mock partitionKeyRangeCache; + + public ClientWithSplitDetection() + { + this.partitionKeyRangeCache = new Mock(MockBehavior.Strict, null, null, null); + this.partitionKeyRangeCache.Setup( + m => m.TryGetOverlappingRangesAsync( + It.IsAny(), + It.IsAny>(), + It.Is(b => b == true) // Mocking only the refresh, if it doesn't get called, the test fails + ) + ).Returns((string collectionRid, Documents.Routing.Range range, bool forceRefresh) => Task.FromResult>(this.ResolveOverlapingPartitionKeyRanges(collectionRid, range, forceRefresh))); + } + + internal override Task GetPartitionKeyRangeCacheAsync() + { + return Task.FromResult(this.partitionKeyRangeCache.Object); + } + + } } } diff --git a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/BulkPartitionKeyRangeGoneRetryPolicyTests.cs b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/BulkPartitionKeyRangeGoneRetryPolicyTests.cs index fc276b55517..0548932c009 100644 --- a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/BulkPartitionKeyRangeGoneRetryPolicyTests.cs +++ b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/BulkPartitionKeyRangeGoneRetryPolicyTests.cs @@ -4,11 +4,15 @@ namespace Microsoft.Azure.Cosmos.Tests { + using System; + using System.Collections.Generic; using System.Net; using System.Threading; using System.Threading.Tasks; + using Microsoft.Azure.Cosmos.Routing; using Microsoft.Azure.Documents; using Microsoft.VisualStudio.TestTools.UnitTesting; + using Moq; [TestClass] public class BulkPartitionKeyRangeGoneRetryPolicyTests @@ -17,6 +21,7 @@ public class BulkPartitionKeyRangeGoneRetryPolicyTests public async Task NotRetryOnSuccess() { IDocumentClientRetryPolicy retryPolicy = new BulkPartitionKeyRangeGoneRetryPolicy( + Mock.Of(), new ResourceThrottleRetryPolicy(1)); TransactionalBatchOperationResult result = new TransactionalBatchOperationResult(HttpStatusCode.OK); @@ -28,6 +33,7 @@ public async Task NotRetryOnSuccess() public async Task RetriesOn429() { IDocumentClientRetryPolicy retryPolicy = new BulkPartitionKeyRangeGoneRetryPolicy( + Mock.Of(), new ResourceThrottleRetryPolicy(1)); TransactionalBatchOperationResult result = new TransactionalBatchOperationResult((HttpStatusCode)StatusCodes.TooManyRequests); @@ -39,6 +45,7 @@ public async Task RetriesOn429() public async Task RetriesOnSplits() { IDocumentClientRetryPolicy retryPolicy = new BulkPartitionKeyRangeGoneRetryPolicy( + GetSplitEnabledContainer(), new ResourceThrottleRetryPolicy(1)); TransactionalBatchOperationResult result = new TransactionalBatchOperationResult(HttpStatusCode.Gone) { SubStatusCode = SubStatusCodes.PartitionKeyRangeGone }; @@ -50,6 +57,7 @@ public async Task RetriesOnSplits() public async Task RetriesOnCompletingSplits() { IDocumentClientRetryPolicy retryPolicy = new BulkPartitionKeyRangeGoneRetryPolicy( + GetSplitEnabledContainer(), new ResourceThrottleRetryPolicy(1)); TransactionalBatchOperationResult result = new TransactionalBatchOperationResult(HttpStatusCode.Gone) { SubStatusCode = SubStatusCodes.CompletingSplit }; @@ -61,11 +69,45 @@ public async Task RetriesOnCompletingSplits() public async Task RetriesOnCompletingPartitionMigrationSplits() { IDocumentClientRetryPolicy retryPolicy = new BulkPartitionKeyRangeGoneRetryPolicy( + GetSplitEnabledContainer(), new ResourceThrottleRetryPolicy(1)); TransactionalBatchOperationResult result = new TransactionalBatchOperationResult(HttpStatusCode.Gone) { SubStatusCode = SubStatusCodes.CompletingPartitionMigration }; ShouldRetryResult shouldRetryResult = await retryPolicy.ShouldRetryAsync(result.ToResponseMessage(), default); Assert.IsTrue(shouldRetryResult.ShouldRetry); } + + private static ContainerInternal GetSplitEnabledContainer() + { + Mock container = new Mock(); + container.Setup(c => c.GetCachedRIDAsync(It.IsAny(), It.IsAny())).ReturnsAsync(Guid.NewGuid().ToString()); + Mock context = new Mock(); + container.Setup(c => c.ClientContext).Returns(context.Object); + context.Setup(c => c.DocumentClient).Returns(new ClientWithSplitDetection()); + return container.Object; + } + + private class ClientWithSplitDetection : MockDocumentClient + { + private readonly Mock partitionKeyRangeCache; + + public ClientWithSplitDetection() + { + this.partitionKeyRangeCache = new Mock(MockBehavior.Strict, null, null, null); + this.partitionKeyRangeCache.Setup( + m => m.TryGetOverlappingRangesAsync( + It.IsAny(), + It.IsAny>(), + It.Is(b => b == true) // Mocking only the refresh, if it doesn't get called, the test fails + ) + ).Returns((string collectionRid, Documents.Routing.Range range, bool forceRefresh) => Task.FromResult>(this.ResolveOverlapingPartitionKeyRanges(collectionRid, range, forceRefresh))); + } + + internal override Task GetPartitionKeyRangeCacheAsync() + { + return Task.FromResult(this.partitionKeyRangeCache.Object); + } + + } } }