Skip to content

Commit

Permalink
ChangeFeedProcessor: Adds AllVersionsAndDeletes support to ChangeFeed…
Browse files Browse the repository at this point in the history
…Processor (#4370)

* preview cfp ffcf

* ran updatecontracts

* including this in Encryption

* fixing name onChangesDelegate

* sdkproject on encryptioncontainer

* try this again

* try, try, try again

* with impl
  • Loading branch information
philipthomas-MSFT committed Mar 26, 2024
1 parent 75a2e5f commit 71e58ee
Show file tree
Hide file tree
Showing 6 changed files with 100 additions and 86 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ namespace Microsoft.Azure.Cosmos.Encryption.Custom
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.Cosmos;
using Newtonsoft.Json.Linq;

internal sealed class EncryptionContainer : Container
Expand Down Expand Up @@ -1023,6 +1022,16 @@ internal sealed class EncryptionContainer : Container
}
#endif

#if SDKPROJECTREF
public override ChangeFeedProcessorBuilder GetChangeFeedProcessorBuilderWithAllVersionsAndDeletes<T>(
string processorName,
ChangeFeedHandler<ChangeFeedItemChange<T>> onChangesDelegate)
{
return this.container.GetChangeFeedProcessorBuilderWithAllVersionsAndDeletes(
processorName,
onChangesDelegate);
}
#endif
private async Task<ResponseMessage> ReadManyItemsHelperAsync(
IReadOnlyList<(string id, PartitionKey partitionKey)> items,
ReadManyRequestOptions readManyRequestOptions = null,
Expand Down
9 changes: 8 additions & 1 deletion Microsoft.Azure.Cosmos.Encryption/src/EncryptionContainer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ namespace Microsoft.Azure.Cosmos.Encryption
using System.Net;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.Cosmos;
using Newtonsoft.Json.Linq;

internal sealed class EncryptionContainer : Container
Expand Down Expand Up @@ -756,6 +755,14 @@ internal sealed class EncryptionContainer : Container
}
#endif

#if SDKPROJECTREF
public override ChangeFeedProcessorBuilder GetChangeFeedProcessorBuilderWithAllVersionsAndDeletes<T>(
string processorName,
ChangeFeedHandler<ChangeFeedItemChange<T>> onChangesDelegate)
{
throw new NotImplementedException();
}
#endif
/// <summary>
/// This function handles the scenario where a container is deleted(say from different Client) and recreated with same Id but with different client encryption policy.
/// The idea is to have the container Rid cached and sent out as part of RequestOptions with Container Rid set in "x-ms-cosmos-intended-collection-rid" header.
Expand Down
78 changes: 76 additions & 2 deletions Microsoft.Azure.Cosmos/src/Resource/Container/Container.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ namespace Microsoft.Azure.Cosmos
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.Cosmos.Serializer;

/// <summary>
/// Operations for reading, replacing, or deleting a specific, existing container or item in a container by id.
Expand Down Expand Up @@ -1681,6 +1680,81 @@ public abstract class Container
public abstract Task<IEnumerable<string>> GetPartitionKeyRangesAsync(
FeedRange feedRange,
CancellationToken cancellationToken = default);

/// <summary>
/// Initializes a <see cref="GetChangeFeedProcessorBuilderWithAllVersionsAndDeletes"/> for change feed processing with all versions and deletes.
/// </summary>
/// <typeparam name="T">Document type</typeparam>
/// <param name="processorName">A name that identifies the Processor and the particular work it will do.</param>
/// <param name="onChangesDelegate">Delegate to receive all changes and deletes</param>
/// <example>
/// <code language="c#">
/// <![CDATA[
/// Container leaseContainer = await this.database.CreateContainerAsync(
/// new ContainerProperties(id: "leases", partitionKeyPath: "/id"),
/// cancellationToken: this.cancellationToken);
///
/// ManualResetEvent allProcessedDocumentsEvent = new ManualResetEvent(false);
///
/// ChangeFeedProcessor changeFeedProcessor = this.Container
/// .GetChangeFeedProcessorBuilderWithAllVersionsAndDeletes(processorName: "processor", onChangesDelegate: (ChangeFeedProcessorContext context, IReadOnlyCollection<ChangeFeedItemChange<dynamic>> documents, CancellationToken token) =>
/// {
/// Console.WriteLine($"number of documents processed: {documents.Count}");
///
/// string id = default;
/// string pk = default;
/// string description = default;
///
/// foreach (ChangeFeedItemChange<dynamic> changeFeedItem in documents)
/// {
/// if (changeFeedItem.Metadata.OperationType != ChangeFeedOperationType.Delete)
/// {
/// id = changeFeedItem.Current.id.ToString();
/// pk = changeFeedItem.Current.pk.ToString();
/// description = changeFeedItem.Current.description.ToString();
/// }
/// else
/// {
/// id = changeFeedItem.Previous.id.ToString();
/// pk = changeFeedItem.Previous.pk.ToString();
/// description = changeFeedItem.Previous.description.ToString();
/// }
///
/// ChangeFeedOperationType operationType = changeFeedItem.Metadata.OperationType;
/// long previousLsn = changeFeedItem.Metadata.PreviousLsn;
/// DateTime conflictResolutionTimestamp = changeFeedItem.Metadata.ConflictResolutionTimestamp;
/// long lsn = changeFeedItem.Metadata.Lsn;
/// bool isTimeToLiveExpired = changeFeedItem.Metadata.IsTimeToLiveExpired;
/// }
///
/// return Task.CompletedTask;
/// })
/// .WithInstanceName(Guid.NewGuid().ToString())
/// .WithLeaseContainer(leaseContainer)
/// .WithErrorNotification((leaseToken, error) =>
/// {
/// Console.WriteLine(error.ToString());
///
/// return Task.CompletedTask;
/// })
/// .Build();
///
/// await changeFeedProcessor.StartAsync();
/// await Task.Delay(1000);
/// await this.Container.CreateItemAsync<dynamic>(new { id = "1", pk = "1", description = "original test" }, partitionKey: new PartitionKey("1"));
/// await this.Container.UpsertItemAsync<dynamic>(new { id = "1", pk = "1", description = "test after replace" }, partitionKey: new PartitionKey("1"));
/// await this.Container.DeleteItemAsync<dynamic>(id: "1", partitionKey: new PartitionKey("1"));
///
/// allProcessedDocumentsEvent.WaitOne(10 * 1000);
///
/// await changeFeedProcessor.StopAsync();
/// ]]>
/// </code>
/// </example>
/// <returns>An instance of <see cref="ChangeFeedProcessorBuilder"/></returns>
public abstract ChangeFeedProcessorBuilder GetChangeFeedProcessorBuilderWithAllVersionsAndDeletes<T>(
string processorName,
ChangeFeedHandler<ChangeFeedItemChange<T>> onChangesDelegate);
#endif
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ namespace Microsoft.Azure.Cosmos
using Microsoft.Azure.Cosmos.Query.Core.Monads;
using Microsoft.Azure.Cosmos.Query.Core.QueryClient;
using Microsoft.Azure.Cosmos.ReadFeed;
using Microsoft.Azure.Cosmos.Serializer;
using Microsoft.Azure.Cosmos.Tracing;

// This class acts as a wrapper for environments that use SynchronizationContext.
Expand Down Expand Up @@ -661,14 +660,5 @@ public override FeedIteratorInternal GetReadFeedIterator(QueryDefinition queryDe
task: (trace) => base.DeleteAllItemsByPartitionKeyStreamAsync(partitionKey, trace, requestOptions, cancellationToken),
openTelemetry: (response) => new OpenTelemetryResponse(response));
}

public override ChangeFeedProcessorBuilder GetChangeFeedProcessorBuilderWithAllVersionsAndDeletes<T>(
string processorName,
ChangeFeedHandler<ChangeFeedItemChange<T>> onChangesDelegate)
{
return base.GetChangeFeedProcessorBuilderWithAllVersionsAndDeletes(
processorName,
onChangesDelegate);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -147,82 +147,11 @@ public static void ValidatePartitionKey(object partitionKey, RequestOptions requ
public abstract Task<IEnumerable<string>> GetPartitionKeyRangesAsync(
FeedRange feedRange,
CancellationToken cancellationToken = default);
#endif

/// <summary>
/// Initializes a <see cref="GetChangeFeedProcessorBuilderWithAllVersionsAndDeletes"/> for change feed processing with all versions and deletes.
/// </summary>
/// <typeparam name="T">Document type</typeparam>
/// <param name="processorName">A name that identifies the Processor and the particular work it will do.</param>
/// <param name="onChangesDelegate">Delegate to receive all changes and deletes</param>
/// <example>
/// <code language="c#">
/// <![CDATA[
/// Container leaseContainer = await this.database.CreateContainerAsync(
/// new ContainerProperties(id: "leases", partitionKeyPath: "/id"),
/// cancellationToken: this.cancellationToken);
///
/// ManualResetEvent allProcessedDocumentsEvent = new ManualResetEvent(false);
///
/// ChangeFeedProcessor changeFeedProcessor = this.Container
/// .GetChangeFeedProcessorBuilderWithAllVersionsAndDeletes(processorName: "processor", onChangesDelegate: (ChangeFeedProcessorContext context, IReadOnlyCollection<ChangeFeedItemChange<dynamic>> documents, CancellationToken token) =>
/// {
/// Console.WriteLine($"number of documents processed: {documents.Count}");
///
/// string id = default;
/// string pk = default;
/// string description = default;
///
/// foreach (ChangeFeedItemChange<dynamic> changeFeedItem in documents)
/// {
/// if (changeFeedItem.Metadata.OperationType != ChangeFeedOperationType.Delete)
/// {
/// id = changeFeedItem.Current.id.ToString();
/// pk = changeFeedItem.Current.pk.ToString();
/// description = changeFeedItem.Current.description.ToString();
/// }
/// else
/// {
/// id = changeFeedItem.Previous.id.ToString();
/// pk = changeFeedItem.Previous.pk.ToString();
/// description = changeFeedItem.Previous.description.ToString();
/// }
///
/// ChangeFeedOperationType operationType = changeFeedItem.Metadata.OperationType;
/// long previousLsn = changeFeedItem.Metadata.PreviousLsn;
/// DateTime conflictResolutionTimestamp = changeFeedItem.Metadata.ConflictResolutionTimestamp;
/// long lsn = changeFeedItem.Metadata.Lsn;
/// bool isTimeToLiveExpired = changeFeedItem.Metadata.IsTimeToLiveExpired;
/// }
///
/// return Task.CompletedTask;
/// })
/// .WithInstanceName(Guid.NewGuid().ToString())
/// .WithLeaseContainer(leaseContainer)
/// .WithErrorNotification((leaseToken, error) =>
/// {
/// Console.WriteLine(error.ToString());
///
/// return Task.CompletedTask;
/// })
/// .Build();
///
/// await changeFeedProcessor.StartAsync();
/// await Task.Delay(1000);
/// await this.Container.CreateItemAsync<dynamic>(new { id = "1", pk = "1", description = "original test" }, partitionKey: new PartitionKey("1"));
/// await this.Container.UpsertItemAsync<dynamic>(new { id = "1", pk = "1", description = "test after replace" }, partitionKey: new PartitionKey("1"));
/// await this.Container.DeleteItemAsync<dynamic>(id: "1", partitionKey: new PartitionKey("1"));
///
/// allProcessedDocumentsEvent.WaitOne(10 * 1000);
///
/// await changeFeedProcessor.StopAsync();
/// ]]>
/// </code>
/// </example>
/// <returns>An instance of <see cref="ChangeFeedProcessorBuilder"/></returns>
public abstract ChangeFeedProcessorBuilder GetChangeFeedProcessorBuilderWithAllVersionsAndDeletes<T>(
string processorName,
ChangeFeedHandler<ChangeFeedItemChange<T>> onChangesDelegate);
#endif

public abstract class TryExecuteQueryResult
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -299,6 +299,11 @@
"Microsoft.Azure.Cosmos.Container;System.Object;IsAbstract:True;IsSealed:False;IsInterface:False;IsEnum:False;IsClass:True;IsValueType:False;IsNested:False;IsGenericType:False;IsSerializable:False": {
"Subclasses": {},
"Members": {
"Microsoft.Azure.Cosmos.ChangeFeedProcessorBuilder GetChangeFeedProcessorBuilderWithAllVersionsAndDeletes[T](System.String, ChangeFeedHandler`1)": {
"Type": "Method",
"Attributes": [],
"MethodInfo": "Microsoft.Azure.Cosmos.ChangeFeedProcessorBuilder GetChangeFeedProcessorBuilderWithAllVersionsAndDeletes[T](System.String, ChangeFeedHandler`1);IsAbstract:True;IsStatic:False;IsVirtual:True;IsGenericMethod:True;IsConstructor:False;IsFinal:False;"
},
"System.Threading.Tasks.Task`1[Microsoft.Azure.Cosmos.ResponseMessage] DeleteAllItemsByPartitionKeyStreamAsync(Microsoft.Azure.Cosmos.PartitionKey, Microsoft.Azure.Cosmos.RequestOptions, System.Threading.CancellationToken)": {
"Type": "Method",
"Attributes": [],
Expand Down

0 comments on commit 71e58ee

Please sign in to comment.