Skip to content

Commit

Permalink
Propagate DocumentClientException from ProcessChangesAsync (#127)
Browse files Browse the repository at this point in the history
Fix DocumentClientExceptions happening in user code and affecting Processor logic
  • Loading branch information
ealsur committed Jan 28, 2019
1 parent f78c72e commit 50fc7df
Show file tree
Hide file tree
Showing 12 changed files with 400 additions and 11 deletions.
@@ -0,0 +1,60 @@
//----------------------------------------------------------------
// Copyright (c) Microsoft Corporation. Licensed under the MIT license.
//----------------------------------------------------------------

using System;
using System.IO;
using System.Runtime.Serialization.Formatters.Binary;
using Microsoft.Azure.Documents.ChangeFeedProcessor.Exceptions;
using Xunit;

namespace Microsoft.Azure.Documents.ChangeFeedProcessor.UnitTests.Exceptions
{
[Trait("Category", "Gated")]
public class ObserverExceptionTests
{
[Fact]
public void ValidateConstructor()
{
Exception exception = new Exception("randomMessage");
var ex = new ObserverException(exception);
Assert.Equal(exception.Message, ex.InnerException.Message);
Assert.Equal(exception, ex.InnerException);
}

// Tests the GetObjectData method and the serialization ctor.
[Fact]
public void ValidateSerialization_AllFields()
{
Exception exception = new Exception("randomMessage");
var originalException = new ObserverException(exception);
var buffer = new byte[4096];
var formatter = new BinaryFormatter();
var stream1 = new MemoryStream(buffer);
var stream2 = new MemoryStream(buffer);

formatter.Serialize(stream1, originalException);
var deserializedException = (ObserverException)formatter.Deserialize(stream2);

Assert.Equal(originalException.Message, deserializedException.Message);
Assert.Equal(originalException.InnerException.Message, deserializedException.InnerException.Message);
}

// Make sure that when some fields are not set, serialization is not broken.
[Fact]
public void ValidateSerialization_NullFields()
{
var originalException = new ObserverException(null);
var buffer = new byte[4096];
var formatter = new BinaryFormatter();
var stream1 = new MemoryStream(buffer);
var stream2 = new MemoryStream(buffer);

formatter.Serialize(stream1, originalException);
var deserializedException = (ObserverException)formatter.Deserialize(stream2);

Assert.Equal(originalException.Message, deserializedException.Message);
Assert.Null(deserializedException.InnerException);
}
}
}
@@ -0,0 +1,116 @@
//----------------------------------------------------------------
// Copyright (c) Microsoft Corporation. Licensed under the MIT license.
//----------------------------------------------------------------

namespace Microsoft.Azure.Documents.ChangeFeedProcessor.UnitTests.FeedProcessor
{
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.Documents.ChangeFeedProcessor.Exceptions;
using Microsoft.Azure.Documents.ChangeFeedProcessor.FeedProcessing;
using Microsoft.Azure.Documents.ChangeFeedProcessor.UnitTests.Utils;
using Moq;
using Xunit;

[Trait("Category", "Gated")]
public class ObserverExceptionWrappingChangeFeedObserverDecoratorTests
{
private readonly CancellationTokenSource cancellationTokenSource = new CancellationTokenSource();
private readonly IChangeFeedObserver observer;
private readonly IChangeFeedObserverContext changeFeedObserverContext;
private readonly FeedProcessing.ObserverExceptionWrappingChangeFeedObserverDecorator observerWrapper;
private readonly List<Document> documents;

public ObserverExceptionWrappingChangeFeedObserverDecoratorTests()
{
this.observer = Mock.Of<IChangeFeedObserver>();
this.changeFeedObserverContext = Mock.Of<IChangeFeedObserverContext>();
this.observerWrapper = new FeedProcessing.ObserverExceptionWrappingChangeFeedObserverDecorator(observer);

var document = new Document();
documents = new List<Document> { document };
}

[Fact]
public async Task OpenAsync_ShouldCallOpenAsync()
{
await observerWrapper.OpenAsync(this.changeFeedObserverContext);

Mock.Get(observer)
.Verify(feedObserver => feedObserver
.OpenAsync(It.IsAny<IChangeFeedObserverContext>()),
Times.Once);
}

[Fact]
public async Task CloseAsync_ShouldCallCloseAsync()
{
await observerWrapper.CloseAsync(this.changeFeedObserverContext, ChangeFeedObserverCloseReason.Shutdown);

Mock.Get(observer)
.Verify(feedObserver => feedObserver
.CloseAsync(It.IsAny<IChangeFeedObserverContext>(),
It.Is<ChangeFeedObserverCloseReason>(reason => reason == ChangeFeedObserverCloseReason.Shutdown)),
Times.Once);
}

[Fact]
public async Task ProcessChangesAsync_ShouldPassDocumentsToProcessChangesAsync()
{
await observerWrapper.ProcessChangesAsync(this.changeFeedObserverContext, this.documents, cancellationTokenSource.Token);

Mock.Get(observer)
.Verify(feedObserver => feedObserver
.ProcessChangesAsync(It.IsAny<IChangeFeedObserverContext>(),
It.Is<IReadOnlyList<Document>>(list => list.SequenceEqual(documents)),
It.IsAny<CancellationToken>()
),
Times.Once);
}

[Fact]
public async Task ProcessChangesAsync_ShouldThrow_IfObserverThrows()
{
Mock.Get(observer)
.SetupSequence(feedObserver => feedObserver
.ProcessChangesAsync(It.IsAny<IChangeFeedObserverContext>(), It.IsAny<IReadOnlyList<Document>>(), It.IsAny<CancellationToken>()))
.Throws(new Exception());

Exception exception = await Record.ExceptionAsync(() => observerWrapper.ProcessChangesAsync(this.changeFeedObserverContext, this.documents, cancellationTokenSource.Token));
Assert.IsAssignableFrom<ObserverException>(exception);
Assert.IsAssignableFrom<Exception>(exception.InnerException);

Mock.Get(observer)
.Verify(feedObserver => feedObserver
.ProcessChangesAsync(It.IsAny<IChangeFeedObserverContext>(),
It.Is<IReadOnlyList<Document>>(list => list.SequenceEqual(documents)),
It.IsAny<CancellationToken>()
),
Times.Once);
}

[Fact]
public async Task ProcessChangesAsync_ShouldThrow_IfObserverThrowsDocumentClientException()
{
Mock.Get(observer)
.SetupSequence(feedObserver => feedObserver
.ProcessChangesAsync(It.IsAny<IChangeFeedObserverContext>(), It.IsAny<IReadOnlyList<Document>>(), It.IsAny<CancellationToken>()))
.Throws(DocumentExceptionHelpers.CreateRequestRateTooLargeException());

Exception exception = await Record.ExceptionAsync(() => observerWrapper.ProcessChangesAsync(this.changeFeedObserverContext, this.documents, cancellationTokenSource.Token));
Assert.IsAssignableFrom<ObserverException>(exception);
Assert.IsAssignableFrom<DocumentClientException>(exception.InnerException);

Mock.Get(observer)
.Verify(feedObserver => feedObserver
.ProcessChangesAsync(It.IsAny<IChangeFeedObserverContext>(),
It.Is<IReadOnlyList<Document>>(list => list.SequenceEqual(documents)),
It.IsAny<CancellationToken>()
),
Times.Once);
}
}
}
Expand Up @@ -10,6 +10,7 @@ namespace Microsoft.Azure.Documents.ChangeFeedProcessor.UnitTests.FeedProcessor
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.Documents.ChangeFeedProcessor.DataAccess;
using Microsoft.Azure.Documents.ChangeFeedProcessor.Exceptions;
using Microsoft.Azure.Documents.ChangeFeedProcessor.FeedProcessing;
using Microsoft.Azure.Documents.ChangeFeedProcessor.UnitTests.Utils;
using Microsoft.Azure.Documents.Client;
Expand Down Expand Up @@ -70,7 +71,7 @@ public PartitionProcessorTests()

observer = Mock.Of<IChangeFeedObserver>();
var checkPointer = new Mock<IPartitionCheckpointer>();
sut = new PartitionProcessor(observer, docClient, processorSettings, checkPointer.Object);
sut = new PartitionProcessor(new FeedProcessing.ObserverExceptionWrappingChangeFeedObserverDecorator(observer), docClient, processorSettings, checkPointer.Object);
}

[Fact]
Expand Down Expand Up @@ -175,6 +176,72 @@ public async Task Run_ShouldRetryQuery_IfDocDBThrowsCanceled()
Times.Once);
}

[Fact]
public async Task Run_ShouldThrow_IfObserverThrows()
{
Mock.Get(documentQuery)
.Reset();

Mock.Get(documentQuery)
.SetupSequence(query => query.ExecuteNextAsync<Document>(It.Is<CancellationToken>(token => token == cancellationTokenSource.Token)))
.ReturnsAsync(feedResponse);

Mock.Get(observer)
.SetupSequence(feedObserver => feedObserver
.ProcessChangesAsync(It.IsAny<IChangeFeedObserverContext>(), It.IsAny<IReadOnlyList<Document>>(), It.IsAny<CancellationToken>()))
.Throws(new CustomException())
.Returns(Task.CompletedTask);

Exception exception = await Record.ExceptionAsync(() => sut.RunAsync(cancellationTokenSource.Token));
Assert.IsAssignableFrom<ObserverException>(exception);
Assert.IsAssignableFrom<CustomException>(exception.InnerException);

Mock.Get(documentQuery)
.Verify(query => query.ExecuteNextAsync<Document>(It.Is<CancellationToken>(token => token == cancellationTokenSource.Token)), Times.Once);

Mock.Get(observer)
.Verify(feedObserver => feedObserver
.ProcessChangesAsync(
It.Is<IChangeFeedObserverContext>(context => context.PartitionKeyRangeId == processorSettings.PartitionKeyRangeId),
It.Is<IReadOnlyList<Document>>(list => list.SequenceEqual(documents)),
It.IsAny<CancellationToken>()),
Times.Once);
}

[Fact]
public async Task Run_ShouldThrow_IfObserverThrowsDocumentClientException()
{
// If the user code throws a DCE, we should bubble it up to stop the Observer and not treat it as a DCE from the Feed Query

Mock.Get(documentQuery)
.Reset();

Mock.Get(documentQuery)
.Setup(query => query.ExecuteNextAsync<Document>(It.Is<CancellationToken>(token => token == cancellationTokenSource.Token)))
.ReturnsAsync(feedResponse)
.Callback(() => cancellationTokenSource.Cancel());

Mock.Get(observer)
.Setup(feedObserver => feedObserver
.ProcessChangesAsync(It.IsAny<IChangeFeedObserverContext>(), It.IsAny<IReadOnlyList<Document>>(), It.IsAny<CancellationToken>()))
.Throws(DocumentExceptionHelpers.CreateRequestRateTooLargeException());

Exception exception = await Record.ExceptionAsync(() => sut.RunAsync(cancellationTokenSource.Token));
Assert.IsAssignableFrom<ObserverException>(exception);
Assert.IsAssignableFrom<DocumentClientException>(exception.InnerException);

Mock.Get(documentQuery)
.Verify(query => query.ExecuteNextAsync<Document>(It.Is<CancellationToken>(token => token == cancellationTokenSource.Token)), Times.Once);

Mock.Get(observer)
.Verify(feedObserver => feedObserver
.ProcessChangesAsync(
It.Is<IChangeFeedObserverContext>(context => context.PartitionKeyRangeId == processorSettings.PartitionKeyRangeId),
It.Is<IReadOnlyList<Document>>(list => list.SequenceEqual(documents)),
It.IsAny<CancellationToken>()),
Times.Once);
}

/// <summary>
/// (1) Read normal feed
/// (2) Get 400 with
Expand Down Expand Up @@ -250,5 +317,9 @@ public async Task Run_ShouldDecreaseMaxItemCountWhenNeeded()

Assert.Equal("token.token2.token3.", accumulator);
}

private class CustomException : Exception
{
}
}
}
Expand Up @@ -110,6 +110,21 @@ public async Task RunObserver_ShouldCancelRenewer_IfProcessorFailed()
Assert.Equal("processorException", exception.Message);
Assert.True(renewerTask.IsCanceled);

Mock.Get(observer)
.Verify(feedObserver => feedObserver
.CloseAsync(It.Is<ChangeFeedObserverContext>(context => context.PartitionKeyRangeId == lease.PartitionId),
ChangeFeedObserverCloseReason.Unknown));
}

[Fact]
public async Task RunObserver_ShouldCloseWithObserverError_IfObserverFailed()
{
Mock.Get(partitionProcessor)
.Setup(processor => processor.RunAsync(It.IsAny<CancellationToken>()))
.ThrowsAsync(new ObserverException(new Exception()));

Exception exception = await Record.ExceptionAsync(() => sut.RunAsync(shutdownToken.Token)).ConfigureAwait(false);

Mock.Get(observer)
.Verify(feedObserver => feedObserver
.CloseAsync(It.Is<ChangeFeedObserverContext>(context => context.PartitionKeyRangeId == lease.PartitionId),
Expand Down
Expand Up @@ -9,7 +9,7 @@

namespace Microsoft.Azure.Documents.ChangeFeedProcessor.UnitTests.Utils
{
public static class DocumentExceptionHelpers
internal static class DocumentExceptionHelpers
{
public static Exception CreateNotFoundException()
{
Expand All @@ -26,17 +26,21 @@ public static Exception CreateConflictException()

public static Exception CreateRequestRateTooLargeException()
{
return CreateException("Microsoft.Azure.Documents.RequestRateTooLargeException", 1);
return CreateException("Microsoft.Azure.Documents.RequestRateTooLargeException", 1, "Request throttled", 100);
}

public static Exception CreateException(string exceptionType, int subStatusCode, string message = "")
public static Exception CreateException(string exceptionType, int subStatusCode, string message = "", int retryAfter = 0)
{
Type t = typeof(DocumentClientException)
.Assembly.GetType(exceptionType);

HttpResponseHeaders httpResponseHeaders = CreateResponseHeaders();
httpResponseHeaders.TryAddWithoutValidation("x-ms-substatus", subStatusCode.ToString());
httpResponseHeaders.TryAddWithoutValidation("x-ms-activity-id", "activityId");
if (retryAfter > 0)
{
httpResponseHeaders.TryAddWithoutValidation("x-ms-retry-after-ms", retryAfter.ToString());
}

object ex = t.GetConstructor(
BindingFlags.Public | BindingFlags.Instance,
Expand Down
Expand Up @@ -21,7 +21,7 @@
<AssemblyName>Microsoft.Azure.Documents.ChangeFeedProcessor</AssemblyName>

<PackageId>Microsoft.Azure.DocumentDB.ChangeFeedProcessor</PackageId>
<PackageVersion>2.2.5</PackageVersion>
<PackageVersion>2.2.6</PackageVersion>
<Title>Microsoft Azure Cosmos DB Change Feed Processor library</Title>
<Authors>Microsoft</Authors>
<PackageLicenseUrl>http://go.microsoft.com/fwlink/?LinkID=509837</PackageLicenseUrl>
Expand All @@ -44,9 +44,9 @@
<!--CS1587:XML comment is not placed on a valid language element
LibLog files have misplaced comments, but we cannot touch them.-->
<NoWarn>1587</NoWarn>
<Version>2.2.5</Version>
<AssemblyVersion>2.2.5.0</AssemblyVersion>
<FileVersion>2.2.5.0</FileVersion>
<Version>2.2.6</Version>
<AssemblyVersion>2.2.6.0</AssemblyVersion>
<FileVersion>2.2.6.0</FileVersion>
<PackageReleaseNotes>The change log for this project is available at https://docs.microsoft.com/azure/cosmos-db/sql-api-sdk-dotnet-changefeed.
</PackageReleaseNotes>
</PropertyGroup>
Expand Down

0 comments on commit 50fc7df

Please sign in to comment.