Change Feed Processor: Add support for manual checkpoint #1765

Closed
ealsur opened this issue Aug 10, 2020 · 14 comments · Fixed by #2331

Comments

@ealsur
Member

ealsur commented Aug 10, 2020

Full detail of user request is on #616

TL;DR

The request is to add manual checkpoint as a configuration option to the Change Feed Processor. This feature was already available in V2 CFP through a configuration option.
The lack of this feature is a migration blocker for users using it on CFP V2.

There are potentially 2 options that differ in the public API:

Option 1 - Have a Context object

How will the user enable the configuration

There will be an explicit handler signature that defines the intent of the user wanting to do manual checkpoint:

ChangeFeedProcessor changeFeedProcessor = sourceContainer
            .GetChangeFeedProcessorBuilder<ToDoItem>(processorName: "myProcessor", ChangesHandlerForManualCheckpoint)
                .WithInstanceName("myInstanceName")
                .WithLeaseContainer(leaseContainer)
                .Build();

Where the delegate ChangesHandlerForManualCheckpoint would be:

        public delegate Task ChangesHandlerForManualCheckpoint<T>(
            ChangeFeedProcessorContextWithManualCheckpoint context,
            IReadOnlyCollection<T> changes,
            CancellationToken cancellationToken);

Additionally we can expose the new context data, minus the Checkpoint API through:

        public delegate Task ChangesHandler<T>(
            ChangeFeedProcessorContext context,
            IReadOnlyCollection<T> changes,
            CancellationToken cancellationToken);

How will the user interact with the context

Similar to V2 CFP, the handler will receive a Context object:

    public abstract class ChangeFeedProcessorContext
    {
        /// <summary>
        /// Gets the token representing the current lease from which the changes come.
        /// </summary>
        public abstract string LeaseToken { get; }
        
        /// <summary>
        /// Gets the diagnostics related to the service response.
        /// </summary>
        public abstract CosmosDiagnostics Diagnostics { get; }

        /// <summary>
        /// Gets the headers related to the service response that provided the changes.
        /// </summary>
        public abstract Headers Headers { get; }
    }

    public abstract class ChangeFeedProcessorContextWithManualCheckpoint : ChangeFeedProcessorContext
    {
        /// <summary>
        /// Checkpoints progress of a stream. This method is valid only if manual checkpoint was configured.
        /// Client may accept multiple change feed batches to process in parallel.
        /// Once processing of the first N documents has finished, the client can checkpoint on the last of the consecutively completed batches.
        /// </summary>
        public abstract Task<(bool isSuccess, CosmosException error)> TryCheckpointAsync();
    }
  • The LeaseToken helps users who send monitoring or diagnostic information understand which lease the changes are coming from. It also helps to send telemetry that identifies which leases are being processed.
  • Headers gives access to all the response headers, including SessionToken. It is useful if the user is sending the changes to another system that has a different client instance. For example (real customer scenario), once the changes are received, they are sent to a queue, where another application picks them up and uses the information to read the document. If the user is on Session consistency, those reads might fail with a 404 without the Session Token.
  • TryCheckpointAsync is the API meant to be called, when manual checkpoint is enabled, after the user's custom logic has decided that it is time to checkpoint.

Can the checkpoint fail?

Yes, the call to TryCheckpointAsync can fail in expected scenarios:

  • The current lease has been deleted externally (this is not a common scenario but if the user manually deletes the documents in the lease store, it can happen).
  • The lease was acquired by another host. In a load-balancing scenario it can occur that the lease has been taken by another host.

How would the user call TryCheckpointAsync?

public async Task HandleChangesAsync(
    ChangeFeedProcessorContextWithManualCheckpoint context,
    IReadOnlyCollection<ToDoItem> changes, 
    CancellationToken cancellationToken)
{
    Console.WriteLine($"Started handling changes for lease {context.LeaseToken}...");
    foreach (ToDoItem item in changes)
    {
        // put changes in some buffer using context.LeaseToken as buffer key
    }
    
    if (buffer[context.LeaseToken].Size == expectedMax)
    {
        (bool isSuccess, CosmosException error) = await context.TryCheckpointAsync();
        if (!isSuccess)
        {
            Console.WriteLine($"Checkpoint failed for {context.LeaseToken} due to the lease being transferred to another host.");
            // log exception if desired
            throw error; // to stop the processing
        }
    }

    Console.WriteLine($"Finished handling changes for lease {context.LeaseToken}.");
}
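
The buffer in the handler above is left as a comment. A minimal sketch of one way to fill it in follows, under stated assumptions: `ManualCheckpointContextSketch`, `CountingContext` and `LeaseBuffers<T>` are hypothetical names that are not part of the SDK, the error is typed as `Exception` so the sketch compiles without a Cosmos reference, and the handler for a given lease is assumed not to run concurrently with itself. It compares with `>=` rather than `==`, because a batch can carry the buffer past the threshold in a single call.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical stand-in for the proposed ChangeFeedProcessorContextWithManualCheckpoint.
public abstract class ManualCheckpointContextSketch
{
    public abstract string LeaseToken { get; }
    public abstract Task<(bool isSuccess, Exception error)> TryCheckpointAsync();
}

// Hypothetical in-memory context that only counts checkpoint calls.
public sealed class CountingContext : ManualCheckpointContextSketch
{
    private readonly bool succeed;

    public CountingContext(string leaseToken, bool succeed = true)
    {
        this.LeaseToken = leaseToken;
        this.succeed = succeed;
    }

    public override string LeaseToken { get; }

    public int CheckpointCalls { get; private set; }

    public override Task<(bool isSuccess, Exception error)> TryCheckpointAsync()
    {
        this.CheckpointCalls++;
        Exception error = this.succeed ? null : new InvalidOperationException("lease lost");
        return Task.FromResult((this.succeed, error));
    }
}

// Hypothetical helper: buffers changes per lease and checkpoints once a threshold is reached.
public sealed class LeaseBuffers<T>
{
    private readonly ConcurrentDictionary<string, List<T>> buffers =
        new ConcurrentDictionary<string, List<T>>();
    private readonly int expectedMax;

    public LeaseBuffers(int expectedMax)
    {
        this.expectedMax = expectedMax;
    }

    // Number of items currently buffered for the given lease.
    public int Count(string leaseToken) =>
        this.buffers.TryGetValue(leaseToken, out List<T> buffer) ? buffer.Count : 0;

    public async Task HandleChangesAsync(
        ManualCheckpointContextSketch context,
        IReadOnlyCollection<T> changes,
        CancellationToken cancellationToken)
    {
        // One buffer per lease, keyed by the lease token.
        List<T> buffer = this.buffers.GetOrAdd(context.LeaseToken, _ => new List<T>());
        buffer.AddRange(changes);

        if (buffer.Count < this.expectedMax)
        {
            return; // keep accumulating; no progress is recorded yet
        }

        // A real handler would flush the buffered items downstream here.
        (bool isSuccess, Exception error) = await context.TryCheckpointAsync();
        if (!isSuccess)
        {
            // Stop processing; the buffer is left untouched.
            throw error ?? new InvalidOperationException("Checkpoint failed.");
        }

        buffer.Clear();
    }
}
```

With a threshold of 3, a first batch of two items only fills the buffer, and a second batch that pushes it to four triggers exactly one checkpoint and clears the buffer.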

Option 2 - Rely on FeedResponse

ChangeFeedProcessor changeFeedProcessor = sourceContainer
            .GetChangeFeedProcessorBuilder<ToDoItem>(processorName: "myProcessor", ChangesHandlerForManualCheckpoint)
                .WithInstanceName("myInstanceName")
                .WithLeaseContainer(leaseContainer)
                .Build();

Where the delegate ChangesHandlerForManualCheckpoint would be:

        public delegate Task ChangesHandlerForManualCheckpoint<T>(
            FeedResponse<T> feedResponse,
            string leaseToken,
            Func<Task<(bool isSuccess, CosmosException error)>> TryCheckpointAsync,
            CancellationToken cancellationToken);
  • The leaseToken helps users who send monitoring or diagnostic information understand which lease the changes are coming from. It also helps to send telemetry that identifies which leases are being processed.
  • FeedResponse gives access to all the response headers, the diagnostics, and the content with the documents.
  • TryCheckpointAsync is the API meant to be called, when manual checkpoint is enabled, after the user's custom logic has decided that it is time to checkpoint.

How would the user call TryCheckpointAsync?

public async Task HandleChangesAsync(
    FeedResponse<ToDoItem> feedResponse,
    string leaseToken,
    Func<Task<(bool isSuccess, CosmosException error)>> TryCheckpointAsync,
    CancellationToken cancellationToken)
{
    Console.WriteLine($"Started handling changes for lease {leaseToken}...");
    foreach (ToDoItem item in feedResponse)
    {
        // put changes in some buffer using leaseToken as buffer key
    }
    
    if (buffer[leaseToken].Size == expectedMax)
    {
        (bool isSuccess, CosmosException error) = await TryCheckpointAsync();
        if (!isSuccess)
        {
            Console.WriteLine($"Checkpoint failed for {leaseToken} due to the lease being transferred to another host.");
            // log exception if desired
            throw error; // to stop the processing
        }
    }

    Console.WriteLine($"Finished handling changes for lease {leaseToken}.");
}
@joshidp

joshidp commented Oct 20, 2020

Hi @ealsur ,

Can you please share the release timeline for the manual checkpoint feature?

Thanks

@bartelink
Contributor

Sorry for pestering but.... bump ;)

Any signs of this getting prioritized in the medium term?

@bartelink
Contributor

Is there any roadmap and/or indicative information of any kind available?

The absence of this feature is a major blocker for adopting the V3 SDK, which is causing significant concern for multiple projects.

@ealsur
Member Author

ealsur commented Dec 16, 2020

@bartelink I am terribly sorry for the delay, priorities shifted in this past year and I still have to follow up and get approval of this API. We are tracking this, but the work to enable higher scale (you might have seen the PRs about it) took higher priority.

@bartelink
Contributor

Thanks @ealsur - definitely not trying to pin this on you; it's abundantly clear you're a) snowed under b) but still doing great work!

My main concern is that this remain on the agenda - while it's theoretically possible to rewrite our consumer loops in terms of the CFP Pull Model, that's a disaster scenario from the point of view of upgrading our systems with any degree of confidence. I.e. while we believe we're in a good place to upgrade from V2 to V3, having to simultaneously move from V2 to V3 and from the standard CFP model with explicit checkpointing to Pull Model would be a massive amount of change in one go.

TL;DR Having feature parity between V2 and V3 (aka having the Explicit Checkpoint mechanism reinstated as per the V2 CFP) is not a nice-to-have for anyone who is already running a 'higher scale' V2 system that relies on being able to control that aspect to deliver it.

@ealsur
Member Author

ealsur commented Dec 16, 2020

I wholeheartedly agree, and I am keeping this one on my mind, and working on it every time I have some free time. It is being tracked in our agenda, so it won't be forgotten.

@j82w
Contributor

j82w commented Feb 17, 2021

Why not just pass in a FeedResponse? This will give access to all the headers, and diagnostics.

 public delegate Task ChangesHandler<T>(
            FeedResponse<T> feedResponse,
            Exception exception,
            Func<Task<(bool isSuccess, CosmosException error)>> TryCheckpointAsync,
            CancellationToken cancellationToken);

@ealsur
Member Author

ealsur commented Feb 17, 2021

@j82w we would still need the LeaseToken too, that's why I thought about the Context as grouping mechanism for all that information, but your proposal is also interesting.
One question though, what is the Exception parameter?

I added your proposal as Option 2.

@j82w
Contributor

j82w commented Feb 17, 2021

It was an attempt to fix the following issue: #1780. The user would either get the FeedResponse or an Exception. This way users could log and handle different failure cases.

public delegate Task ChangesHandler<T>(
           FeedResponse<T> feedResponse,
           ChangeFeedProcessorContext context,
           CancellationToken cancellationToken);

public abstract class ChangeFeedProcessorContext
   {
       /// <summary>
       /// Gets the token representing the current lease from which the changes come.
       /// </summary>
       public abstract string LeaseToken { get; }
       
       /// <summary>
       /// Gets the exception if a failure occurred
       /// </summary>
       public abstract bool TryGetException(out Exception exception);

       public abstract Task<(bool isSuccess, CosmosException error)> TryCheckpointAsync();
   }

@ealsur
Member Author

ealsur commented Feb 17, 2021

The problem with injecting the Exception into the handler is that exceptions in user code (a custom serializer, for instance) should abort the current Lease Observer. Passing the Exception (if any) into the handler could prevent this mechanism.

For exception monitoring it is better to go with a Monitor injection (which is another work item). V2 CFP has a WithHealthMonitor that lets users hook into any internal errors that might occur and get telemetry out of it.

Passing it to the actual delegate handler might have the unintended consequence of the CFP thinking that there was no error and checkpointing, or letting users checkpoint manually even in an error state, which would be wrong.
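
To illustrate the concern, here is a simplified sketch of one iteration of an observer loop with automatic checkpointing. It is not the SDK's actual code, and `ObserverLoopSketch` and `ProcessBatchAsync` are hypothetical names. The point it tries to show is that the loop can only skip the checkpoint if the failure escapes the handler as an exception. A handler that receives the exception as a parameter and returns normally would look like a success to this loop.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical, simplified stand-in for one iteration of a lease observer
// with automatic checkpointing. It is not the SDK's implementation.
public static class ObserverLoopSketch
{
    // Returns true if the batch was checkpointed, false if processing was aborted.
    public static async Task<bool> ProcessBatchAsync(
        Func<Task> userHandler,
        Func<Task> checkpointAsync)
    {
        try
        {
            await userHandler();
        }
        catch (Exception)
        {
            // The handler failed: skip the checkpoint so progress is not recorded
            // for a batch that was never processed.
            return false;
        }

        // Only reached when the handler returned normally.
        await checkpointAsync();
        return true;
    }
}
```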

@j82w
Contributor

j82w commented Feb 17, 2021

That makes sense. Then my vote is for either of the following options:

public delegate Task ChangesHandlerForManualCheckpoint<T>(
            FeedResponse<T> feedResponse,
            string leaseToken,
            Func<Task<(bool isSuccess, CosmosException error)>> TryCheckpointAsync,
            CancellationToken cancellationToken);

or

public delegate Task ChangesHandler<T>(
           FeedResponse<T> feedResponse,
           ChangeFeedProcessorContext context,
           CancellationToken cancellationToken);

public abstract class ChangeFeedProcessorContext
{
       /// <summary>
       /// Gets the token representing the current lease from which the changes come.
       /// </summary>
       public abstract string LeaseToken { get; }
       
       public abstract Task<(bool isSuccess, CosmosException error)> TryCheckpointAsync();
}

@ghost

ghost commented Dec 15, 2021

Closing due to inactivity, please feel free to re-open.

@bartelink
Contributor

bartelink commented Dec 15, 2021

@msftbot Can you have a quick chat with your owner or developer about fixing some bugs so I and others get spammed less please? - you're redundantly "closing" things that are already closed, and that ultimately results in humans switching off and ignoring real issues

@j82w
Contributor

j82w commented Dec 15, 2021

@bartelink sorry for the spam. I'm following up with the owner to see why it's doing this.
