Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ChangeFeedProcessor: Adds AllVersionsAndDeletes support to ChangeFeedProcessor #4370

Merged
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ namespace Microsoft.Azure.Cosmos.Encryption.Custom
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.Cosmos;
using Newtonsoft.Json.Linq;

internal sealed class EncryptionContainer : Container
Expand Down Expand Up @@ -1023,6 +1022,16 @@ internal sealed class EncryptionContainer : Container
}
#endif

#if SDKPROJECTREF
public override ChangeFeedProcessorBuilder GetChangeFeedProcessorBuilderWithAllVersionsAndDeletes<T>(
string processorName,
ChangeFeedHandler<ChangeFeedItemChange<T>> onChangesDelegate)
{
return this.container.GetChangeFeedProcessorBuilderWithAllVersionsAndDeletes(
processorName,
onChangesDelegate);
}
#endif
private async Task<ResponseMessage> ReadManyItemsHelperAsync(
IReadOnlyList<(string id, PartitionKey partitionKey)> items,
ReadManyRequestOptions readManyRequestOptions = null,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ namespace Microsoft.Azure.Cosmos.Encryption
using System.Net;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.Cosmos;
using Newtonsoft.Json.Linq;

internal sealed class EncryptionContainer : Container
Expand Down Expand Up @@ -756,6 +755,14 @@ internal sealed class EncryptionContainer : Container
}
#endif

#if SDKPROJECTREF
public override ChangeFeedProcessorBuilder GetChangeFeedProcessorBuilderWithAllVersionsAndDeletes<T>(
string processorName,
ChangeFeedHandler<ChangeFeedItemChange<T>> onChangesDelegate)
{
throw new NotImplementedException();
}
#endif
/// <summary>
/// This function handles the scenario where a container is deleted(say from different Client) and recreated with same Id but with different client encryption policy.
/// The idea is to have the container Rid cached and sent out as part of RequestOptions with Container Rid set in "x-ms-cosmos-intended-collection-rid" header.
Expand Down
78 changes: 76 additions & 2 deletions Microsoft.Azure.Cosmos/src/Resource/Container/Container.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ namespace Microsoft.Azure.Cosmos
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.Cosmos.Serializer;

/// <summary>
/// Operations for reading, replacing, or deleting a specific, existing container or item in a container by id.
Expand Down Expand Up @@ -1681,6 +1680,81 @@ public abstract class Container
public abstract Task<IEnumerable<string>> GetPartitionKeyRangesAsync(
FeedRange feedRange,
CancellationToken cancellationToken = default);

/// <summary>
/// Initializes a <see cref="GetChangeFeedProcessorBuilderWithAllVersionsAndDeletes"/> for change feed processing with all versions and deletes.
/// </summary>
/// <typeparam name="T">Document type</typeparam>
/// <param name="processorName">A name that identifies the Processor and the particular work it will do.</param>
/// <param name="onChangesDelegate">Delegate to receive all changes and deletes</param>
/// <example>
/// <code language="c#">
/// <![CDATA[
/// Container leaseContainer = await this.database.CreateContainerAsync(
/// new ContainerProperties(id: "leases", partitionKeyPath: "/id"),
/// cancellationToken: this.cancellationToken);
///
/// ManualResetEvent allProcessedDocumentsEvent = new ManualResetEvent(false);
///
/// ChangeFeedProcessor changeFeedProcessor = this.Container
/// .GetChangeFeedProcessorBuilderWithAllVersionsAndDeletes(processorName: "processor", onChangesDelegate: (ChangeFeedProcessorContext context, IReadOnlyCollection<ChangeFeedItemChange<dynamic>> documents, CancellationToken token) =>
/// {
/// Console.WriteLine($"number of documents processed: {documents.Count}");
///
/// string id = default;
/// string pk = default;
/// string description = default;
///
/// foreach (ChangeFeedItemChange<dynamic> changeFeedItem in documents)
/// {
/// if (changeFeedItem.Metadata.OperationType != ChangeFeedOperationType.Delete)
/// {
/// id = changeFeedItem.Current.id.ToString();
/// pk = changeFeedItem.Current.pk.ToString();
/// description = changeFeedItem.Current.description.ToString();
/// }
/// else
/// {
/// id = changeFeedItem.Previous.id.ToString();
/// pk = changeFeedItem.Previous.pk.ToString();
/// description = changeFeedItem.Previous.description.ToString();
/// }
///
/// ChangeFeedOperationType operationType = changeFeedItem.Metadata.OperationType;
/// long previousLsn = changeFeedItem.Metadata.PreviousLsn;
/// DateTime conflictResolutionTimestamp = changeFeedItem.Metadata.ConflictResolutionTimestamp;
/// long lsn = changeFeedItem.Metadata.Lsn;
/// bool isTimeToLiveExpired = changeFeedItem.Metadata.IsTimeToLiveExpired;
/// }
///
/// return Task.CompletedTask;
/// })
/// .WithInstanceName(Guid.NewGuid().ToString())
/// .WithLeaseContainer(leaseContainer)
/// .WithErrorNotification((leaseToken, error) =>
/// {
/// Console.WriteLine(error.ToString());
///
/// return Task.CompletedTask;
/// })
/// .Build();
///
/// await changeFeedProcessor.StartAsync();
/// await Task.Delay(1000);
/// await this.Container.CreateItemAsync<dynamic>(new { id = "1", pk = "1", description = "original test" }, partitionKey: new PartitionKey("1"));
/// await this.Container.UpsertItemAsync<dynamic>(new { id = "1", pk = "1", description = "test after replace" }, partitionKey: new PartitionKey("1"));
/// await this.Container.DeleteItemAsync<dynamic>(id: "1", partitionKey: new PartitionKey("1"));
///
/// allProcessedDocumentsEvent.WaitOne(10 * 1000);
///
/// await changeFeedProcessor.StopAsync();
/// ]]>
/// </code>
/// </example>
/// <returns>An instance of <see cref="ChangeFeedProcessorBuilder"/></returns>
public abstract ChangeFeedProcessorBuilder GetChangeFeedProcessorBuilderWithAllVersionsAndDeletes<T>(
philipthomas-MSFT marked this conversation as resolved.
Show resolved Hide resolved
FabianMeiswinkel marked this conversation as resolved.
Show resolved Hide resolved
string processorName,
ChangeFeedHandler<ChangeFeedItemChange<T>> onChangesDelegate);
#endif
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ namespace Microsoft.Azure.Cosmos
using Microsoft.Azure.Cosmos.Query.Core.Monads;
using Microsoft.Azure.Cosmos.Query.Core.QueryClient;
using Microsoft.Azure.Cosmos.ReadFeed;
using Microsoft.Azure.Cosmos.Serializer;
using Microsoft.Azure.Cosmos.Tracing;

// This class acts as a wrapper for environments that use SynchronizationContext.
Expand Down Expand Up @@ -661,14 +660,5 @@ public override FeedIteratorInternal GetReadFeedIterator(QueryDefinition queryDe
task: (trace) => base.DeleteAllItemsByPartitionKeyStreamAsync(partitionKey, trace, requestOptions, cancellationToken),
openTelemetry: (response) => new OpenTelemetryResponse(response));
}

public override ChangeFeedProcessorBuilder GetChangeFeedProcessorBuilderWithAllVersionsAndDeletes<T>(
string processorName,
ChangeFeedHandler<ChangeFeedItemChange<T>> onChangesDelegate)
{
return base.GetChangeFeedProcessorBuilderWithAllVersionsAndDeletes(
processorName,
onChangesDelegate);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -147,82 +147,11 @@ public static void ValidatePartitionKey(object partitionKey, RequestOptions requ
public abstract Task<IEnumerable<string>> GetPartitionKeyRangesAsync(
FeedRange feedRange,
CancellationToken cancellationToken = default);
#endif

/// <summary>
/// Initializes a <see cref="GetChangeFeedProcessorBuilderWithAllVersionsAndDeletes"/> for change feed processing with all versions and deletes.
/// </summary>
/// <typeparam name="T">Document type</typeparam>
/// <param name="processorName">A name that identifies the Processor and the particular work it will do.</param>
/// <param name="onChangesDelegate">Delegate to receive all changes and deletes</param>
/// <example>
/// <code language="c#">
/// <![CDATA[
/// Container leaseContainer = await this.database.CreateContainerAsync(
/// new ContainerProperties(id: "leases", partitionKeyPath: "/id"),
/// cancellationToken: this.cancellationToken);
///
/// ManualResetEvent allProcessedDocumentsEvent = new ManualResetEvent(false);
///
/// ChangeFeedProcessor changeFeedProcessor = this.Container
/// .GetChangeFeedProcessorBuilderWithAllVersionsAndDeletes(processorName: "processor", onChangesDelegate: (ChangeFeedProcessorContext context, IReadOnlyCollection<ChangeFeedItemChange<dynamic>> documents, CancellationToken token) =>
/// {
/// Console.WriteLine($"number of documents processed: {documents.Count}");
///
/// string id = default;
/// string pk = default;
/// string description = default;
///
/// foreach (ChangeFeedItemChange<dynamic> changeFeedItem in documents)
/// {
/// if (changeFeedItem.Metadata.OperationType != ChangeFeedOperationType.Delete)
/// {
/// id = changeFeedItem.Current.id.ToString();
/// pk = changeFeedItem.Current.pk.ToString();
/// description = changeFeedItem.Current.description.ToString();
/// }
/// else
/// {
/// id = changeFeedItem.Previous.id.ToString();
/// pk = changeFeedItem.Previous.pk.ToString();
/// description = changeFeedItem.Previous.description.ToString();
/// }
///
/// ChangeFeedOperationType operationType = changeFeedItem.Metadata.OperationType;
/// long previousLsn = changeFeedItem.Metadata.PreviousLsn;
/// DateTime conflictResolutionTimestamp = changeFeedItem.Metadata.ConflictResolutionTimestamp;
/// long lsn = changeFeedItem.Metadata.Lsn;
/// bool isTimeToLiveExpired = changeFeedItem.Metadata.IsTimeToLiveExpired;
/// }
///
/// return Task.CompletedTask;
/// })
/// .WithInstanceName(Guid.NewGuid().ToString())
/// .WithLeaseContainer(leaseContainer)
/// .WithErrorNotification((leaseToken, error) =>
/// {
/// Console.WriteLine(error.ToString());
///
/// return Task.CompletedTask;
/// })
/// .Build();
///
/// await changeFeedProcessor.StartAsync();
/// await Task.Delay(1000);
/// await this.Container.CreateItemAsync<dynamic>(new { id = "1", pk = "1", description = "original test" }, partitionKey: new PartitionKey("1"));
/// await this.Container.UpsertItemAsync<dynamic>(new { id = "1", pk = "1", description = "test after replace" }, partitionKey: new PartitionKey("1"));
/// await this.Container.DeleteItemAsync<dynamic>(id: "1", partitionKey: new PartitionKey("1"));
///
/// allProcessedDocumentsEvent.WaitOne(10 * 1000);
///
/// await changeFeedProcessor.StopAsync();
/// ]]>
/// </code>
/// </example>
/// <returns>An instance of <see cref="ChangeFeedProcessorBuilder"/></returns>
public abstract ChangeFeedProcessorBuilder GetChangeFeedProcessorBuilderWithAllVersionsAndDeletes<T>(
string processorName,
ChangeFeedHandler<ChangeFeedItemChange<T>> onChangesDelegate);
#endif

public abstract class TryExecuteQueryResult
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -299,6 +299,11 @@
"Microsoft.Azure.Cosmos.Container;System.Object;IsAbstract:True;IsSealed:False;IsInterface:False;IsEnum:False;IsClass:True;IsValueType:False;IsNested:False;IsGenericType:False;IsSerializable:False": {
"Subclasses": {},
"Members": {
"Microsoft.Azure.Cosmos.ChangeFeedProcessorBuilder GetChangeFeedProcessorBuilderWithAllVersionsAndDeletes[T](System.String, ChangeFeedHandler`1)": {
"Type": "Method",
"Attributes": [],
"MethodInfo": "Microsoft.Azure.Cosmos.ChangeFeedProcessorBuilder GetChangeFeedProcessorBuilderWithAllVersionsAndDeletes[T](System.String, ChangeFeedHandler`1);IsAbstract:True;IsStatic:False;IsVirtual:True;IsGenericMethod:True;IsConstructor:False;IsFinal:False;"
},
"System.Threading.Tasks.Task`1[Microsoft.Azure.Cosmos.ResponseMessage] DeleteAllItemsByPartitionKeyStreamAsync(Microsoft.Azure.Cosmos.PartitionKey, Microsoft.Azure.Cosmos.RequestOptions, System.Threading.CancellationToken)": {
"Type": "Method",
"Attributes": [],
Expand Down