Provide partitionid to ChangeFeedObserverFactory.Create #400

Closed
bartelink opened this issue Jun 10, 2019 · 5 comments · Fixed by #2331

@bartelink (Contributor)

Is your feature request related to a problem? Please describe.

No - there's no problem; I have a workaround, but it involves boilerplate...

Describe the solution you'd like

I'd like an overload/variant of ChangeFeedObserverFactory.Create that is passed the partition id being assigned at the time the observer is created.

I note that you're doing some work on fluent builders around this, so I wanted to make you aware of things I've long wanted in this space in case there's overlap with your plans.

Describe alternatives you've considered

Right now I do lots of convoluted work in the OpenAsync handler that I would prefer to do in the constructor of my ChangeFeedObserver, so I'm not reliant on the assumption that OpenAsync will be called before ProcessChangesAsync - this would allow me to write a much more succinct wrapper.

Additional context

Sketch syntax would be:

IChangeFeedObserver CreateObserver(IChangeFeedObserverContext ctx) => new MyObserver(ctx);

var processor = await builder.WithObserverFactory(CreateObserver).BuildAsync();

This will let me write the following F#, which can also be done in C# pretty cleanly:

type Observer(log : ILogger, partitionId) =
    inherit DefaultObserver()
    let producer = Producer.Start(partitionId)
    do log.Information("Started {range}", partitionId)
    new (log, ctx : IChangeFeedObserverContext) = new Observer(log, ctx.PartitionKeyRangeId)
    interface IDisposable with
        member _.Dispose() =
            producer.Dispose()
            log.Information("Disposed {range}", partitionId)
    override _.ProcessChangesAsync(ctx, docs, ct) = ...

I'd be delighted to road test any proposed syntax on any branch any time.

@ealsur (Member) commented Jun 11, 2019

CFP in V3 won't be exposing the partition information, and the code in this repo does not include PartitionKeyRangeId or Observer as part of the public contract.

What is the reason you need to log/access this information? Is it just for tracing purposes?

@bartelink (Contributor, Author)

Interesting - in general, I was logging acquisition/revocation of partition assignments as high-level info. Beyond that, it's useful for correlating progress per partition.

Aside from this, I use the partition key range id (and the fact that it's unique and unambiguous) to track events as I propagate them internally within my projection system (when scheduling work, I optimize for achieving per-partition progress and/or balancing work across partitions).

In general, this scheme has aligned relatively well with equivalent concepts when consuming from Confluent.Kafka. One key difference there is that the API exposes messages from across all partitions as a unified consumption stream (each message bears a partition id; you take them as they come rather than reading partition by partition). Are there plans to head in that direction?
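
Concretely, the unified consumption style I'm referring to looks roughly like this with Confluent.Kafka (broker address, topic, and group id are placeholders):

using System;
using System.Threading;
using Confluent.Kafka;

var config = new ConsumerConfig { BootstrapServers = "localhost:9092", GroupId = "example-group" };
using var consumer = new ConsumerBuilder<Ignore, string>(config).Build();
consumer.Subscribe("example-topic");
while (true)
{
    // A single stream across all assigned partitions; each message carries its partition id
    var result = consumer.Consume(CancellationToken.None);
    Console.WriteLine($"partition {result.Partition.Value} offset {result.Offset}: {result.Message.Value}");
}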

@kirankumarkolli (Member)

PkRangeId is a server-side sharding concept. It's fluid and can change dynamically as sharding gets optimized, so tying logic to pkRangeId is not encouraged.

Each change message does have the partitionId (the application partition id). Can downstream systems (like Kafka) use it to do their own sharding/distribution?

@bartelink (Contributor, Author)

I understand that the range assignments are fluid. The comparison with Kafka was not based on me using the rangeId as, e.g., a sharding key - I appreciate it's not sufficiently stable.

My point here was that the various partitions can deliver payloads at varying levels of throughput. If I'm building a consumer, I'd like to be able to control throughput/backpressure by saying "I've got 2MB in flight from partition 2; let's not declare completion for this range until we work some of that off". The old API used to let me do this - if the new API only says "you got a batch" without saying where it came from, I can't do that.
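
A hypothetical sketch of the consumer-side bookkeeping I mean - none of these names are part of any CFP API, and the 2MB threshold is just the figure from the example above:

using System.Collections.Concurrent;

// Per-range in-flight byte counts, tracked entirely on the consumer side
ConcurrentDictionary<string, long> inflightBytes = new();
const long maxInflightBytes = 2 * 1024 * 1024; // 2MB, per the example above

void OnBatchRead(string rangeId, long batchBytes) =>
    inflightBytes.AddOrUpdate(rangeId, batchBytes, (_, v) => v + batchBytes);

void OnBatchProcessed(string rangeId, long batchBytes) =>
    inflightBytes.AddOrUpdate(rangeId, 0L, (_, v) => v - batchBytes);

// Only declare completion (checkpoint) for a range once its backlog has drained
bool CanDeclareCompletion(string rangeId) =>
    !inflightBytes.TryGetValue(rangeId, out var v) || v < maxInflightBytes;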

The bottom line is that it was useful for troubleshooting purposes to be able to see the varying throughput levels across the ranges. I'd be very concerned to lose this key information in favor of a scheme where batches just get fired at me without any ability to e.g. identify that 3 partitions are stuck for some reason.

(Ironically, Kafka doesn't let you control things at this level, but it does offer richer diagnostics than even the V2 CFP did - losing this information would be very significant for operability.)

@bartelink (Contributor, Author) commented Sep 9, 2019

(summarizing a tangent I injected in #782 (comment))
@ealsur perhaps you can shed some light on how you envisage the API as a whole addressing the following concerns:

Diagnostics:

  • if 3 ranges have been assigned to a processor host but one is failing to read (imagine a hotspot is consuming all the RUs)
  • in general, one wants to be able to distinguish which ranges are progressing when analysing throughput

Manual checkpointing and/or being able to read-ahead:

I cover it further in #616, but the deeper need is to be able to break the temporal coupling between 1) reading a batch from the change feed, 2) checkpointing, 3) requesting the next batch, and 4) getting to process the next one. There are lots of scenarios where you want to read continuously, aggregating the processing you're performing, and apply backpressure only once you have N batches in progress.

Decoupling/overlapping the reading of data, processing and writing of checkpoints:

There are also performance benefits to letting the consumer control the read-ahead by being able to return immediately. This matters especially when batch size limits can only be expressed as a max item count: with varying document sizes, 10 items can be anywhere from 10 KB to 1 MB, and paying for a round trip to the aux collection between each pull is pretty harmful to throughput.
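
A rough sketch of the decoupled shape I have in mind, using a bounded channel; handleAsync/checkpointAsync are placeholders for my processing and for whatever checkpoint call the API ends up exposing:

using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

// Bounded buffer: the read side only backpressures once 8 batches are in flight
var buffer = Channel.CreateBounded<IReadOnlyList<object>>(8);

// Change feed handler: returns as soon as the batch is queued, so the next pull
// is not gated on processing or on a checkpoint round trip
async Task OnChangesAsync(IReadOnlyList<object> docs, CancellationToken ct) =>
    await buffer.Writer.WriteAsync(docs, ct);

// Independent consumer loop: processes batches and checkpoints on its own schedule
async Task ProcessLoopAsync(Func<IReadOnlyList<object>, Task> handleAsync,
                            Func<IReadOnlyList<object>, Task> checkpointAsync,
                            CancellationToken ct)
{
    await foreach (var docs in buffer.Reader.ReadAllAsync(ct))
    {
        await handleAsync(docs);     // aggregate/accumulate processing
        await checkpointAsync(docs); // checkpoint when the consumer decides
    }
}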
