Skip to content
This library provides a host for distributing change feed events in partitioned collection across multiple observers. Instances of the host can scale up (by adding) or down (by removing) dynamically, and the load will be automatically distributed among active instances in about-equal way.
Branch: master
Clone or download
ealsur Propagate DocumentClientException from ProcessChangesAsync (#127)
Fix DocumentClientExceptions happening in user code and affecting Processor logic
Latest commit 50fc7df Jan 28, 2019
Permalink
Type Name Latest commit message Commit time
Failed to load latest commit information.
src Propagate DocumentClientException from ProcessChangesAsync (#127) Jan 28, 2019
.gitattributes
.gitignore
ChangeFeedProcessor.png
LICENSE
README.md Fix public exceptions serialization: add missing GetObjectData to Par… ( Jun 14, 2018

README.md

Change Feed Processor library (v2)

This library helps distributing Azure Cosmos DB change feed events in partitioned collection across multiple observers. Instances of the processor can be scaled up (by adding) or down (by removing) dynamically, with partition load being automatically distributed among active instances in about-equal way.

Releases

Change Feed Processor for .Net is released as NuGet package.

Design highlights

For illustration, let's assume we are processing the change feed from Monitored collection, which is partitioned by city. The arrows in the preceding diagram indicate the current position (continuation) in the change feed.

Change Feed Processor

  • For coordinating the processing of change feed across multiple workers, the leasing mechanism is used. To store leases, a separate collection (Lease collection in the diagram) is used.

    • There is one lease per partition.
    • Each lease in persisted as document in the Lease collection.
    • Lease has the following attributes:
      • Owner: specifies the host that owns the lease (no owner - the lease is available).
      • Continuation: specifies position (continuation token) in the change feed for particular partition.
      • Timestamp: last time the lease was updated.
      • Expiration interval: if a lease wasn't updated during this time it is considered expired and can be taken by other hosts.
  • Each instance of the processor (IChangeFeedProcessor) does the following:

    • Determines how many partitions (leases) to own. This is based on how many other instances of the processor own active leases.
    • Acquires leases: starting with available leases, then expired leases, then steals leases from other processors if needed to fill up load-balancing target.
    • For each owned lease:
      • Renews the lease periodically. This is to make sure that lease expiration time is extended.
      • Reads change feed from the partition the lease is associated with.
      • Notifies the consumer (the observer, one per lease) with these changes.
      • After successful delivery notification, checkpoints last continuation token to its lease.
      • If lease is lost or stolen, notifies the observer.
    • For concurrency safety, for each lease update, ETag (if-match condition) is used. If it fails, the processor reads the lease document and checks whether it still owns it, and if not, the lease is considered lost.
    • On shutdown, releases all leases (sets Owner to null), but keeps the Continuation so that next time change feed for this partition will continue from last checkpoint.
  • Scalability considerations

    • If one processor owns all leases, and another one comes up, the latter will start stealing leases from the former, so that eventually the leases will be about-equally divided between running hosts.
    • If a processor crashes, hangs or loses network connection, its leases will expire and will be distributes between other processors.

Usage

The workflow

  • Implement Microsoft.Azure.Documents.ChangeFeedProcessor.FeedProcessing.IChangeFeedObserver and, if needed, IChangeFeedObserverFactory.
  • Create DocumentCollectionInfo for monitored collection and lease collection.
  • Create ChangeFeedProcessorBuilder and use its .With... methods to build desired configuration, then call .BuildAsync() method to build the processor instance.
  • Call processor.StartAsync().
  • When done, call processor.StopAsync().

Example

// Observer.cs
namespace Sample
{
    using System;
    using System.Collections.Generic;
    using System.Threading;
    using System.Threading.Tasks;
    using Microsoft.Azure.Documents;
    using Microsoft.Azure.Documents.ChangeFeedProcessor.FeedProcessing;

    class SampleObserver : IChangeFeedObserver
    {
        public Task CloseAsync(IChangeFeedObserverContext context, ChangeFeedObserverCloseReason reason)
        {
            return Task.CompletedTask;  // Note: requires targeting .Net 4.6+.
        }

        public Task OpenAsync(IChangeFeedObserverContext context)
        {
             return Task.CompletedTask;
        }

        public Task ProcessChangesAsync(IChangeFeedObserverContext context, IReadOnlyList<Document> docs, CancellationToken cancellationToken)
        {
            Console.WriteLine("ProcessChangesAsync: partition {0}, {1} docs", context.PartitionKeyRangeId, docs.Count);
            return Task.CompletedTask;
        }
    }
}

// Main.cs
namespace Sample
{
    using System;
    using System.Threading.Tasks;
    using Microsoft.Azure.Documents.ChangeFeedProcessor;
    using Microsoft.Azure.Documents.ChangeFeedProcessor.Logging;

    class ChangeFeedProcessorSample
    {
        public static void Run()
        {
            RunAsync().Wait();
        }

        static async Task RunAsync()
        {
            DocumentCollectionInfo feedCollectionInfo = new DocumentCollectionInfo()
            {
                DatabaseName = "DatabaseName",
                CollectionName = "MonitoredCollectionName",
                Uri = new Uri("https://sampleservice.documents.azure.com:443/"),
                MasterKey = "-- the auth key"
            };

            DocumentCollectionInfo leaseCollectionInfo = new DocumentCollectionInfo()
            {
                DatabaseName = "DatabaseName",
                CollectionName = "leases",
                Uri = new Uri("https://sampleservice.documents.azure.com:443/"),
                MasterKey = "-- the auth key"
            };

            var builder = new ChangeFeedProcessorBuilder();
            var processor = await builder
                .WithHostName("SampleHost")
                .WithFeedCollection(feedCollectionInfo)
                .WithLeaseCollection(leaseCollectionInfo)
                .WithObserver<SampleObserver>()
                .BuildAsync();

            await processor.StartAsync();

            Console.WriteLine("Change Feed Processor started. Press <Enter> key to stop...");
            Console.ReadLine();

            await processor.StopAsync();
        }
    }
}

Note on obsolete API

The following v1 API from v1 is is present in v2 for backward compatibility but is marked obsolete. It is recommended to use new API.

  • Microsoft.Azure.Documents.ChangeFeedProcessor.ChangeFeedEventHost.
    Use Microsoft.Azure.Documents.ChangeFeedProcessor.ChangeFeedProcessorBuilder.
  • Microsoft.Azure.Documents.ChangeFeedProcessor.ChangeFeedHostOptions.
    Use Microsoft.Azure.Documents.ChangeFeedProcessor.ChangeFeedProcessorOptions. Note that ChangeFeedProcessorBuilder does not take ChangeFeedOptions, use ChangeFeedProcessorOptions to provide ChangeFeedOptions-related parameters.
  • Microsoft.Azure.Documents.ChangeFeedProcessor.ChangeFeedObserverContext.
    Use Microsoft.Azure.Documents.ChangeFeedProcessor.FeedProcessing.IChangeFeedObserverContext that has ProcessChangesAsync taking new CancellationToken parameter.
  • Microsoft.Azure.Documents.ChangeFeedProcessor.ChangeFeedObserverCloseReason.
    Use Microsoft.Azure.Documents.ChangeFeedProcessor.FeedProcessing.ChangeFeedObserverCloseReason.
  • Microsoft.Azure.Documents.ChangeFeedProcessor.IChangeFeedObserver.
    Use Microsoft.Azure.Documents.ChangeFeedProcessor.FeedProcessing.IChangeFeedObserver.
  • Microsoft.Azure.Documents.ChangeFeedProcessor.IChangeFeedObserverFactory.
    Use Microsoft.Azure.Documents.ChangeFeedProcessor.FeedProcessing.IChangeFeedObserverFactory.

Troubleshooting

How to enable tracing

Change Feed Processor library is using LibLog and supports a few log providers out of the box.

Log provider for .Net tracing is provided by the library and needs to be enabled. Follow steps below to enable .Net tracing:

  • When using obsolete (v1) API
    Just use <system.configuration> section provided below.

  • When using current (v2) API

  1. Add the following code to your project:
using Microsoft.Azure.Documents.ChangeFeedProcessor.Logging;

var hostName = "SampleHost";
var tracelogProvider = new TraceLogProvider();
using (tracelogProvider.OpenNestedContext(hostName))
{
    LogProvider.SetCurrentLogProvider(tracelogProvider);
    // After this, create IChangeFeedProcessor instance and start/stop it.
}
  1. Add the following <system.configuration> section in your app.config:
  <system.diagnostics>
    <sharedListeners>
      <add name="file" type="System.Diagnostics.TextWriterTraceListener" initializeData="C:\ChangeFeedProcessorTrace.log" />
    </sharedListeners>
    <sources>
      <source name="ChangeFeedEventHost" switchType="System.Diagnostics.SourceSwitch" switchValue="Information">
        <!-- All, Verbose, Warning, Information, Error, Off -->
        <listeners>
          <clear />
          <add name="file" />
        </listeners>
      </source>
    </sources>
    <trace autoflush="true" useGlobalLock="false" />
  </system.diagnostics>

See also

Contributing

This project welcomes contributions and suggestions. Most contributions require you to agree to a Contributor License Agreement (CLA) declaring that you have the right to, and actually do, grant us the rights to use your contribution. For details, visit https://cla.microsoft.com.

When you submit a pull request, a CLA-bot will automatically determine whether you need to provide a CLA and decorate the PR appropriately (e.g., label, comment). Simply follow the instructions provided by the bot. You will only need to do this once across all repos using our CLA.

This project has adopted the Microsoft Open Source Code of Conduct. For more information see the Code of Conduct FAQ or contact opencode@microsoft.com with any additional questions or comments.

You can’t perform that action at this time.