Skip to content

Commit

Permalink
Split fix for shared throughput collections (#121)
Browse files Browse the repository at this point in the history
This PR simplifies the check based on the fact that after split, either all or none of the child Partitions are available.
  • Loading branch information
ealsur committed Dec 14, 2018
1 parent bbbd832 commit f78c72e
Show file tree
Hide file tree
Showing 4 changed files with 7 additions and 149 deletions.
Expand Up @@ -13,6 +13,7 @@
using Microsoft.Azure.Documents.ChangeFeedProcessor.PartitionManagement;
using Microsoft.Azure.Documents.Client;
using Moq;
using Newtonsoft.Json;
using Xunit;

namespace Microsoft.Azure.Documents.ChangeFeedProcessor.UnitTests.PartitionManagement
Expand Down Expand Up @@ -90,114 +91,18 @@ public async Task SplitPartitionAsync_ShouldReturnOnlyNewLeases_IfSplitWasAlread
}

[Fact]
public async Task SplitPartitionAsync_ShouldThrow_IfCollectionProvisionedAndPKRangesReturn1()
public async Task SplitPartitionAsync_ShouldThrow_IfPKRangesReturn0()
{
const string lastKnowToken = "last know token";
const string collectionLink = "collectionLink";

IEnumerable<PartitionKeyRange> keyRanges = new[]
{
new PartitionKeyRange
{
Id = "20", Parents = new Collection<string>(new[] {"10"})
}
};

IEnumerable<Offer> offers = new[]
{
new Offer
{
Id = "1",
ResourceId = collectionLink
}
};

var lease = Mock.Of<ILease>(l => l.PartitionId == "10" && l.ContinuationToken == lastKnowToken);

var keyRangeResponse = Mock.Of<IFeedResponse<PartitionKeyRange>>(r => r.GetEnumerator() == keyRanges.GetEnumerator());
var offerResponse = Mock.Of<IFeedResponse<Offer>>(r => r.GetEnumerator() == offers.GetEnumerator());
IChangeFeedDocumentClient documentClient = Mock.Of<IChangeFeedDocumentClient>(c =>
c.ReadPartitionKeyRangeFeedAsync(It.IsAny<string>(), It.IsAny<FeedOptions>()) == Task.FromResult(keyRangeResponse)
&& c.ReadOffersFeedAsync(It.IsAny<FeedOptions>()) == Task.FromResult(offerResponse));

var lease20 = Mock.Of<ILease>();
ILeaseManager leaseManager = Mock.Of<ILeaseManager>(m =>
m.CreateLeaseIfNotExistAsync("20", lastKnowToken) == Task.FromResult(lease20));
var leaseContainer = Mock.Of<ILeaseContainer>();

var sut = new PartitionSynchronizer(documentClient, collectionLink, leaseContainer, leaseManager, 1, int.MaxValue);
Exception exception = await Record.ExceptionAsync(async () => await sut.SplitPartitionAsync(lease));
Assert.IsAssignableFrom<InvalidOperationException>(exception);
}

[Fact]
public async Task SplitPartitionAsync_ShouldPass_IfDatabaseProvisionedAndPKRangesReturn1()
{
const string lastKnowToken = "last know token";
const string databaseLink = "databaseLink";
const string collectionLink = "collectionLink";

IEnumerable<PartitionKeyRange> keyRanges = new[]
{
new PartitionKeyRange
{
Id = "20", Parents = new Collection<string>(new[] {"10"})
}
};

IEnumerable<Offer> offers = new[]
{
new Offer
{
Id = "1",
ResourceId = databaseLink
}
};

var lease = Mock.Of<ILease>(l => l.PartitionId == "10" && l.ContinuationToken == lastKnowToken);

var keyRangeResponse = Mock.Of<IFeedResponse<PartitionKeyRange>>(r => r.GetEnumerator() == keyRanges.GetEnumerator());
var offerResponse = Mock.Of<IFeedResponse<Offer>>(r => r.GetEnumerator() == offers.GetEnumerator());
IChangeFeedDocumentClient documentClient = Mock.Of<IChangeFeedDocumentClient>(c =>
c.ReadPartitionKeyRangeFeedAsync(It.IsAny<string>(), It.IsAny<FeedOptions>()) == Task.FromResult(keyRangeResponse)
&& c.ReadOffersFeedAsync(It.IsAny<FeedOptions>()) == Task.FromResult(offerResponse));

var lease20 = Mock.Of<ILease>();
ILeaseManager leaseManager = Mock.Of<ILeaseManager>(m =>
m.CreateLeaseIfNotExistAsync("20", lastKnowToken) == Task.FromResult(lease20));
var leaseContainer = Mock.Of<ILeaseContainer>();

var sut = new PartitionSynchronizer(documentClient, collectionLink, leaseContainer, leaseManager, 1, int.MaxValue);
IEnumerable<ILease> result = await sut.SplitPartitionAsync(lease);
Assert.NotNull(result);
Assert.Equal(new[] { lease20 }, result);
}

[Fact]
public async Task SplitPartitionAsync_ShouldThrow_IfDatabaseProvisionedAndPKRangesReturn0()
{
const string lastKnowToken = "last know token";
const string databaseLink = "databaseLink";
const string collectionLink = "collectionLink";

IEnumerable<PartitionKeyRange> keyRanges = Enumerable.Empty<PartitionKeyRange>();

IEnumerable<Offer>offers = new[]
{
new Offer
{
Id = "1",
ResourceId = databaseLink
}
};

var lease = Mock.Of<ILease>(l => l.PartitionId == "10" && l.ContinuationToken == lastKnowToken);

var keyRangeResponse = Mock.Of<IFeedResponse<PartitionKeyRange>>(r => r.GetEnumerator() == keyRanges.GetEnumerator());
var offerResponse = Mock.Of<IFeedResponse<Offer>>(r => r.GetEnumerator() == offers.GetEnumerator());
IChangeFeedDocumentClient documentClient = Mock.Of<IChangeFeedDocumentClient>(c =>
c.ReadPartitionKeyRangeFeedAsync(It.IsAny<string>(), It.IsAny<FeedOptions>()) == Task.FromResult(keyRangeResponse)
&& c.ReadOffersFeedAsync(It.IsAny<FeedOptions>()) == Task.FromResult(offerResponse));
c.ReadPartitionKeyRangeFeedAsync(It.IsAny<string>(), It.IsAny<FeedOptions>()) == Task.FromResult(keyRangeResponse));

var lease20 = Mock.Of<ILease>();
ILeaseManager leaseManager = Mock.Of<ILeaseManager>(m =>
Expand Down
Expand Up @@ -28,9 +28,6 @@ internal class PartitionSynchronizer : IPartitionSynchronizer
private readonly int degreeOfParallelism;
private readonly int maxBatchSize;

// Whether the collection has its own provisioned throughput or is using the Shared Throughput at the database level
private bool? hasCollectionProvisionedThroughput;

public PartitionSynchronizer(
IChangeFeedDocumentClient documentClient,
string collectionSelfLink,
Expand Down Expand Up @@ -64,12 +61,13 @@ public async Task<IEnumerable<ILease>> SplitPartitionAsync(ILease lease)
string lastContinuationToken = lease.ContinuationToken;

Logger.InfoFormat("Partition {0} is gone due to split", partitionId);

// After split the childs are either all or none available
List<PartitionKeyRange> ranges = await this.EnumPartitionKeyRangesAsync().ConfigureAwait(false);
List<string> addedPartitionIds = ranges.Where(range => range.Parents.Contains(partitionId)).Select(range => range.Id).ToList();
if (addedPartitionIds.Count == 0
|| (addedPartitionIds.Count < 2 && await this.HasCollectionProvisionedThroughput().ConfigureAwait(false)))
if (addedPartitionIds.Count == 0)
{
Logger.ErrorFormat("Partition {0} had split but we failed to find at least 2 child partitions", partitionId);
Logger.ErrorFormat("Partition {0} had split but we failed to find at least one child partition", partitionId);
throw new InvalidOperationException();
}

Expand Down Expand Up @@ -118,34 +116,6 @@ private async Task<List<PartitionKeyRange>> EnumPartitionKeyRangesAsync()
return partitionKeyRanges;
}

private async Task<bool> HasCollectionProvisionedThroughput()
{
if (this.hasCollectionProvisionedThroughput.HasValue)
{
return this.hasCollectionProvisionedThroughput.Value;
}

IFeedResponse<Offer> response = null;
var offers = new List<Offer>();
do
{
var feedOptions = new FeedOptions
{
MaxItemCount = this.maxBatchSize,
RequestContinuation = response?.ResponseContinuation,
};
response = await this.documentClient.ReadOffersFeedAsync(feedOptions).ConfigureAwait(false);
foreach (Offer offer in response)
{
offers.Add(offer);
}
}
while (!string.IsNullOrEmpty(response.ResponseContinuation));

this.hasCollectionProvisionedThroughput = offers.Any(x => x.ResourceId == this.collectionSelfLink);
return this.hasCollectionProvisionedThroughput.Value;
}

/// <summary>
/// Creates leases if they do not exist. This might happen on initial start or if some lease was unexpectedly lost.
/// Leases are created without the continuation token. It means partitions will be read according to 'From Beginning' or
Expand Down
Expand Up @@ -156,15 +156,5 @@ public IQueryable<T> CreateDocumentQuery<T>(string documentCollectionUri, SqlQue
{
return this.documentClient.CreateDocumentQuery<T>(documentCollectionUri, querySpec, feedOptions);
}

/// <summary>
/// Reads the feed (sequence) of <see cref="Offer"/> for the database account.
/// </summary>
/// <param name="options">The <see cref="Microsoft.Azure.Documents.Client.FeedOptions"/>for this request.</param>
/// <returns>A list of <see cref="Offer"/>.</returns>
public async Task<IFeedResponse<Offer>> ReadOffersFeedAsync(FeedOptions options = null)
{
return await this.documentClient.ReadOffersFeedAsync(options).ConfigureAwait(false);
}
}
}
Expand Up @@ -120,12 +120,5 @@ public interface IChangeFeedDocumentClient
/// <param name="feedOptions">Options for the query.</param>
/// <returns>The query result set.</returns>
IQueryable<T> CreateDocumentQuery<T>(string documentCollectionUri, SqlQuerySpec querySpec, FeedOptions feedOptions = null);

/// <summary>
/// Reads the feed (sequence) of <see cref="Offer"/> for the database account.
/// </summary>
/// <param name="options">The <see cref="Microsoft.Azure.Documents.Client.FeedOptions"/>for this request.</param>
/// <returns>A list of <see cref="Offer"/>.</returns>
Task<IFeedResponse<Offer>> ReadOffersFeedAsync(FeedOptions options = null);
}
}

0 comments on commit f78c72e

Please sign in to comment.