Bulk: Fixes incorrect routing on split (#2069)
* Increasing retry

* Correctly routing after split

* Adding tests

* More tests

* Using CosmosException

Co-authored-by: j82w <j82w@users.noreply.github.com>
ealsur and j82w committed Dec 16, 2020
1 parent c808020 commit 260324d
Showing 7 changed files with 214 additions and 47 deletions.
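
In short, this change targets bulk mode (CosmosClientOptions.AllowBulkExecution = true): when a physical partition splits while operations are queued, the bulk retry policy now force-refreshes the partition key range cache and the executor re-routes pending operations to the new partition key ranges instead of repeatedly failing with 410 (Gone). Below is a minimal sketch of the scenario this fixes; it assumes a container partitioned on /id, and the endpoint, key, database, and container names are placeholders, not part of this commit.

using System.Collections.Generic;
using System.Threading.Tasks;
using Microsoft.Azure.Cosmos;

public static class BulkIngestSample
{
    public static async Task RunAsync()
    {
        // Bulk mode groups concurrent point operations into batches per partition key range.
        using CosmosClient client = new CosmosClient(
            "https://<account>.documents.azure.com:443/",  // placeholder endpoint
            "<account-key>",                               // placeholder key
            new CosmosClientOptions { AllowBulkExecution = true });

        Container container = client.GetContainer("<database>", "<container>"); // placeholder names, /id partition key assumed

        List<Task> tasks = new List<Task>();
        for (int i = 0; i < 10000; i++)
        {
            string id = i.ToString();
            // If a partition split completes while these operations are queued,
            // the fixed retry policy refreshes the partition key range cache and
            // re-routes the pending operations to the post-split ranges.
            tasks.Add(container.CreateItemAsync(new ItemDoc { id = id }, new PartitionKey(id)));
        }

        await Task.WhenAll(tasks);
    }

    private class ItemDoc
    {
        public string id { get; set; }
    }
}
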
18 changes: 8 additions & 10 deletions Microsoft.Azure.Cosmos/src/Batch/BatchAsyncContainerExecutor.cs
@@ -26,7 +26,6 @@ namespace Microsoft.Azure.Cosmos
/// <seealso cref="BatchAsyncStreamer"/>
internal class BatchAsyncContainerExecutor : IDisposable
{
private const int DefaultDispatchTimerInSeconds = 1;
private const int TimerWheelBucketCount = 20;
private static readonly TimeSpan TimerWheelResolution = TimeSpan.FromMilliseconds(50);

@@ -53,11 +52,6 @@ internal BatchAsyncContainerExecutor()
int maxServerRequestOperationCount,
int maxServerRequestBodyLength)
{
if (cosmosContainer == null)
{
throw new ArgumentNullException(nameof(cosmosContainer));
}

if (maxServerRequestOperationCount < 1)
{
throw new ArgumentOutOfRangeException(nameof(maxServerRequestOperationCount));
@@ -68,7 +62,7 @@ internal BatchAsyncContainerExecutor()
throw new ArgumentOutOfRangeException(nameof(maxServerRequestBodyLength));
}

this.cosmosContainer = cosmosContainer;
this.cosmosContainer = cosmosContainer ?? throw new ArgumentNullException(nameof(cosmosContainer));
this.cosmosClientContext = cosmosClientContext;
this.maxServerRequestBodyLength = maxServerRequestBodyLength;
this.maxServerRequestOperationCount = maxServerRequestOperationCount;
@@ -90,7 +84,7 @@ internal BatchAsyncContainerExecutor()

string resolvedPartitionKeyRangeId = await this.ResolvePartitionKeyRangeIdAsync(operation, cancellationToken).ConfigureAwait(false);
BatchAsyncStreamer streamer = this.GetOrAddStreamerForPartitionKeyRange(resolvedPartitionKeyRangeId);
ItemBatchOperationContext context = new ItemBatchOperationContext(resolvedPartitionKeyRangeId, BatchAsyncContainerExecutor.GetRetryPolicy(this.retryOptions));
ItemBatchOperationContext context = new ItemBatchOperationContext(resolvedPartitionKeyRangeId, BatchAsyncContainerExecutor.GetRetryPolicy(this.cosmosContainer, this.retryOptions));
operation.AttachContext(context);
streamer.Add(operation);
return await context.OperationTask;
@@ -137,10 +131,13 @@ public void Dispose()
await operation.MaterializeResourceAsync(this.cosmosClientContext.SerializerCore, cancellationToken);
}

private static IDocumentClientRetryPolicy GetRetryPolicy(RetryOptions retryOptions)
private static IDocumentClientRetryPolicy GetRetryPolicy(
ContainerInternal containerInternal,
RetryOptions retryOptions)
{
return new BulkPartitionKeyRangeGoneRetryPolicy(
new ResourceThrottleRetryPolicy(
containerInternal,
new ResourceThrottleRetryPolicy(
retryOptions.MaxRetryAttemptsOnThrottledRequests,
retryOptions.MaxRetryWaitTimeInSeconds));
}
Expand Down Expand Up @@ -186,6 +183,7 @@ private static void AddHeadersToRequestMessage(RequestMessage requestMessage, st
CancellationToken cancellationToken)
{
string resolvedPartitionKeyRangeId = await this.ResolvePartitionKeyRangeIdAsync(operation, cancellationToken).ConfigureAwait(false);
operation.Context.ReRouteOperation(resolvedPartitionKeyRangeId);
BatchAsyncStreamer streamer = this.GetOrAddStreamerForPartitionKeyRange(resolvedPartitionKeyRangeId);
streamer.Add(operation);
}
@@ -15,7 +15,7 @@ namespace Microsoft.Azure.Cosmos
/// </summary>
internal class ItemBatchOperationContext : IDisposable
{
public string PartitionKeyRangeId { get; }
public string PartitionKeyRangeId { get; private set; }

public BatchAsyncBatcher CurrentBatcher { get; set; }

@@ -74,6 +74,11 @@ internal class ItemBatchOperationContext : IDisposable
this.Dispose();
}

public void ReRouteOperation(string newPartitionKeyRangeId)
{
this.PartitionKeyRangeId = newPartitionKeyRangeId;
}

public void Dispose()
{
this.CurrentBatcher = null;
85 changes: 49 additions & 36 deletions Microsoft.Azure.Cosmos/src/BulkPartitionKeyRangeGoneRetryPolicy.cs
@@ -8,6 +8,7 @@ namespace Microsoft.Azure.Cosmos
using System.Net;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.Cosmos.Routing;
using Microsoft.Azure.Documents;

/// <summary>
@@ -17,77 +18,89 @@ namespace Microsoft.Azure.Cosmos
/// <see cref="ItemBatchOperationContext"/>
internal sealed class BulkPartitionKeyRangeGoneRetryPolicy : IDocumentClientRetryPolicy
{
private const int MaxRetries = 1;

private readonly IDocumentClientRetryPolicy nextRetryPolicy;
private readonly ContainerInternal container;

private int retriesAttempted;

public BulkPartitionKeyRangeGoneRetryPolicy(IDocumentClientRetryPolicy nextRetryPolicy)
public BulkPartitionKeyRangeGoneRetryPolicy(
ContainerInternal container,
IDocumentClientRetryPolicy nextRetryPolicy)
{
this.container = container ?? throw new ArgumentNullException(nameof(container));
this.nextRetryPolicy = nextRetryPolicy;
}

public Task<ShouldRetryResult> ShouldRetryAsync(
public async Task<ShouldRetryResult> ShouldRetryAsync(
Exception exception,
CancellationToken cancellationToken)
{
DocumentClientException clientException = exception as DocumentClientException;

ShouldRetryResult shouldRetryResult = this.ShouldRetryInternal(
clientException?.StatusCode,
clientException?.GetSubStatus());

if (shouldRetryResult != null)
if (exception is CosmosException clientException)
{
return Task.FromResult(shouldRetryResult);
ShouldRetryResult shouldRetryResult = await this.ShouldRetryInternalAsync(
clientException.StatusCode,
(SubStatusCodes)clientException.SubStatusCode,
cancellationToken);

if (shouldRetryResult != null)
{
return shouldRetryResult;
}

if (this.nextRetryPolicy == null)
{
return ShouldRetryResult.NoRetry();
}
}

if (this.nextRetryPolicy == null)
{
return Task.FromResult(ShouldRetryResult.NoRetry());
}

return this.nextRetryPolicy.ShouldRetryAsync(exception, cancellationToken);
return await this.nextRetryPolicy.ShouldRetryAsync(exception, cancellationToken);
}

public Task<ShouldRetryResult> ShouldRetryAsync(
public async Task<ShouldRetryResult> ShouldRetryAsync(
ResponseMessage cosmosResponseMessage,
CancellationToken cancellationToken)
{
ShouldRetryResult shouldRetryResult = this.ShouldRetryInternal(cosmosResponseMessage?.StatusCode,
cosmosResponseMessage?.Headers.SubStatusCode);
ShouldRetryResult shouldRetryResult = await this.ShouldRetryInternalAsync(
cosmosResponseMessage?.StatusCode,
cosmosResponseMessage?.Headers.SubStatusCode,
cancellationToken);
if (shouldRetryResult != null)
{
return Task.FromResult(shouldRetryResult);
return shouldRetryResult;
}

if (this.nextRetryPolicy == null)
{
return Task.FromResult(ShouldRetryResult.NoRetry());
return ShouldRetryResult.NoRetry();
}

return this.nextRetryPolicy.ShouldRetryAsync(cosmosResponseMessage, cancellationToken);
return await this.nextRetryPolicy.ShouldRetryAsync(cosmosResponseMessage, cancellationToken);
}

public void OnBeforeSendRequest(DocumentServiceRequest request)
{
this.nextRetryPolicy.OnBeforeSendRequest(request);
}

private ShouldRetryResult ShouldRetryInternal(
private async Task<ShouldRetryResult> ShouldRetryInternalAsync(
HttpStatusCode? statusCode,
SubStatusCodes? subStatusCode)
SubStatusCodes? subStatusCode,
CancellationToken cancellationToken)
{
if (statusCode == HttpStatusCode.Gone
&& (subStatusCode == SubStatusCodes.PartitionKeyRangeGone
|| subStatusCode == SubStatusCodes.NameCacheIsStale
|| subStatusCode == SubStatusCodes.CompletingSplit
|| subStatusCode == SubStatusCodes.CompletingPartitionMigration)
&& this.retriesAttempted < MaxRetries)
if (statusCode == HttpStatusCode.Gone)
{
this.retriesAttempted++;
return ShouldRetryResult.RetryAfter(TimeSpan.Zero);
if (subStatusCode == SubStatusCodes.PartitionKeyRangeGone
|| subStatusCode == SubStatusCodes.CompletingSplit
|| subStatusCode == SubStatusCodes.CompletingPartitionMigration)
{
PartitionKeyRangeCache partitionKeyRangeCache = await this.container.ClientContext.DocumentClient.GetPartitionKeyRangeCacheAsync();
string containerRid = await this.container.GetCachedRIDAsync(forceRefresh: false, cancellationToken: cancellationToken);
await partitionKeyRangeCache.TryGetOverlappingRangesAsync(containerRid, FeedRangeEpk.FullRange.Range, forceRefresh: true);
return ShouldRetryResult.RetryAfter(TimeSpan.Zero);
}

if (subStatusCode == SubStatusCodes.NameCacheIsStale)
{
return ShouldRetryResult.RetryAfter(TimeSpan.Zero);
}
}

return null;
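
The cache refresh above matters because of how the executor retries: the retry delegate re-enqueues the failed operation, and the new ReRouteOperation call points the operation's context at the partition key range resolved against the refreshed cache. A condensed sketch of that hand-off, simplified from the BatchAsyncContainerExecutor change earlier in this diff (not the exact SDK source):

// Sketch: how a retried operation picks up post-split routing.
private async Task ReBatchAsync(ItemBatchOperation operation, CancellationToken cancellationToken)
{
    // The retry policy already force-refreshed the partition key range cache,
    // so this resolution returns the post-split range for the operation's partition key.
    string resolvedPartitionKeyRangeId = await this.ResolvePartitionKeyRangeIdAsync(operation, cancellationToken).ConfigureAwait(false);

    // Update the in-flight operation's context; previously the context kept the
    // pre-split range id, which is the incorrect routing this commit fixes.
    operation.Context.ReRouteOperation(resolvedPartitionKeyRangeId);

    // Queue the operation on the streamer for the newly resolved range.
    BatchAsyncStreamer streamer = this.GetOrAddStreamerForPartitionKeyRange(resolvedPartitionKeyRangeId);
    streamer.Add(operation);
}
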
@@ -11,6 +11,7 @@ namespace Microsoft.Azure.Cosmos.Tests
using System.Net;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.Cosmos.Routing;
using Microsoft.Azure.Documents;
using Microsoft.VisualStudio.TestTools.UnitTesting;
using Moq;
@@ -448,9 +449,11 @@ public async Task CannotAddToDispatchedBatch()
public async Task RetrierGetsCalledOnSplit()
{
IDocumentClientRetryPolicy retryPolicy1 = new BulkPartitionKeyRangeGoneRetryPolicy(
GetSplitEnabledContainer(),
new ResourceThrottleRetryPolicy(1));

IDocumentClientRetryPolicy retryPolicy2 = new BulkPartitionKeyRangeGoneRetryPolicy(
GetSplitEnabledContainer(),
new ResourceThrottleRetryPolicy(1));

ItemBatchOperation operation1 = this.CreateItemBatchOperation();
@@ -475,9 +478,11 @@ public async Task RetrierGetsCalledOnSplit()
public async Task RetrierGetsCalledOnCompletingSplit()
{
IDocumentClientRetryPolicy retryPolicy1 = new BulkPartitionKeyRangeGoneRetryPolicy(
GetSplitEnabledContainer(),
new ResourceThrottleRetryPolicy(1));

IDocumentClientRetryPolicy retryPolicy2 = new BulkPartitionKeyRangeGoneRetryPolicy(
GetSplitEnabledContainer(),
new ResourceThrottleRetryPolicy(1));

ItemBatchOperation operation1 = this.CreateItemBatchOperation();
@@ -502,9 +507,11 @@ public async Task RetrierGetsCalledOnCompletingSplit()
public async Task RetrierGetsCalledOnCompletingPartitionMigration()
{
IDocumentClientRetryPolicy retryPolicy1 = new BulkPartitionKeyRangeGoneRetryPolicy(
GetSplitEnabledContainer(),
new ResourceThrottleRetryPolicy(1));

IDocumentClientRetryPolicy retryPolicy2 = new BulkPartitionKeyRangeGoneRetryPolicy(
GetSplitEnabledContainer(),
new ResourceThrottleRetryPolicy(1));

ItemBatchOperation operation1 = this.CreateItemBatchOperation();
@@ -545,6 +552,16 @@ public async Task RetrierGetsCalledOnOverFlow()
retryDelegate.Verify(a => a(It.IsAny<ItemBatchOperation>(), It.IsAny<CancellationToken>()), Times.Once);
}

private static ContainerInternal GetSplitEnabledContainer()
{
Mock<ContainerInternal> container = new Mock<ContainerInternal>();
container.Setup(c => c.GetCachedRIDAsync(It.IsAny<bool>(), It.IsAny<CancellationToken>())).ReturnsAsync(Guid.NewGuid().ToString());
Mock<CosmosClientContext> context = new Mock<CosmosClientContext>();
container.Setup(c => c.ClientContext).Returns(context.Object);
context.Setup(c => c.DocumentClient).Returns(new ClientWithSplitDetection());
return container.Object;
}

private class BatchAsyncBatcherThatOverflows : BatchAsyncBatcher
{
public BatchAsyncBatcherThatOverflows(
@@ -565,5 +582,28 @@ private class BatchAsyncBatcherThatOverflows : BatchAsyncBatcher
return new Tuple<PartitionKeyRangeServerBatchRequest, ArraySegment<ItemBatchOperation>>(serverRequest, new ArraySegment<ItemBatchOperation>(serverRequest.Operations.ToArray(), 1, 1));
}
}

private class ClientWithSplitDetection : MockDocumentClient
{
private readonly Mock<PartitionKeyRangeCache> partitionKeyRangeCache;

public ClientWithSplitDetection()
{
this.partitionKeyRangeCache = new Mock<PartitionKeyRangeCache>(MockBehavior.Strict, null, null, null);
this.partitionKeyRangeCache.Setup(
m => m.TryGetOverlappingRangesAsync(
It.IsAny<string>(),
It.IsAny<Documents.Routing.Range<string>>(),
It.Is<bool>(b => b == true) // Mock only the forced refresh; if it doesn't get called, the test fails
)
).Returns((string collectionRid, Documents.Routing.Range<string> range, bool forceRefresh) => Task.FromResult<IReadOnlyList<PartitionKeyRange>>(this.ResolveOverlapingPartitionKeyRanges(collectionRid, range, forceRefresh)));
}

internal override Task<PartitionKeyRangeCache> GetPartitionKeyRangeCacheAsync()
{
return Task.FromResult(this.partitionKeyRangeCache.Object);
}

}
}
}
@@ -51,6 +51,11 @@ public async Task RetryOnSplit()
Mock<ContainerInternal> mockContainer = new Mock<ContainerInternal>();
mockContainer.Setup(x => x.LinkUri).Returns(link);
mockContainer.Setup(x => x.GetPartitionKeyDefinitionAsync(It.IsAny<CancellationToken>())).Returns(Task.FromResult(new PartitionKeyDefinition() { Paths = new Collection<string>() { "/id" } }));
mockContainer.Setup(c => c.GetCachedRIDAsync(It.IsAny<bool>(), It.IsAny<CancellationToken>())).ReturnsAsync(Guid.NewGuid().ToString());
Mock<CosmosClientContext> context = new Mock<CosmosClientContext>();
mockContainer.Setup(c => c.ClientContext).Returns(context.Object);
context.Setup(c => c.DocumentClient).Returns(new ClientWithSplitDetection());


CollectionRoutingMap routingMap = CollectionRoutingMap.TryCreateCompleteRoutingMap(
new[]
@@ -449,5 +454,28 @@ private class MyDocument

public bool Updated { get; set; }
}

private class ClientWithSplitDetection : MockDocumentClient
{
private readonly Mock<PartitionKeyRangeCache> partitionKeyRangeCache;

public ClientWithSplitDetection()
{
this.partitionKeyRangeCache = new Mock<PartitionKeyRangeCache>(MockBehavior.Strict, null, null, null);
this.partitionKeyRangeCache.Setup(
m => m.TryGetOverlappingRangesAsync(
It.IsAny<string>(),
It.IsAny<Documents.Routing.Range<string>>(),
It.Is<bool>(b => b == true) // Mock only the forced refresh; if it doesn't get called, the test fails
)
).Returns((string collectionRid, Documents.Routing.Range<string> range, bool forceRefresh) => Task.FromResult<IReadOnlyList<PartitionKeyRange>>(this.ResolveOverlapingPartitionKeyRanges(collectionRid, range, forceRefresh)));
}

internal override Task<PartitionKeyRangeCache> GetPartitionKeyRangeCacheAsync()
{
return Task.FromResult(this.partitionKeyRangeCache.Object);
}

}
}
}
