Document safe usage conditions of commitBatchWithin #347

Open
gordon-rennie opened this issue Apr 1, 2020 · 1 comment
Labels
docs New or improved documentation

Comments

@gordon-rennie

CommittableOffsetBatch#commit clearly explains some requirements for safe usage in its scaladoc:

/**
 * Commits the [[offsets]] to Kafka in a single commit.
 * For the batch to be valid and for commit to succeed,
 * the following conditions must hold:<br>
 * - [[consumerGroupIdsMissing]] must be false, and<br>
 * - [[consumerGroupIds]] must have exactly one ID.<br>
 * <br>
 * If one of the conditions above do not hold, there will
 * be a [[ConsumerGroupException]] exception raised and a
 * commit will not be attempted. If [[offsets]] is empty
 * then these conditions do not need to hold, as there
 * is nothing to commit.
 */
def commit: F[Unit]
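
To make these conditions concrete, here is a minimal sketch of a batch that violates the second condition (my own illustration, not from the library docs; the hand-built offsets and the no-op commit function are placeholders):

import cats.effect.IO
import cats.implicits._
import fs2.kafka.{CommittableOffset, CommittableOffsetBatch}
import org.apache.kafka.clients.consumer.OffsetAndMetadata
import org.apache.kafka.common.TopicPartition

// Build a CommittableOffset by hand; the no-op commit function is a stand-in.
def offset(topic: String, groupId: String): CommittableOffset[IO] =
  CommittableOffset[IO](
    topicPartition = new TopicPartition(topic, 0),
    offsetAndMetadata = new OffsetAndMetadata(1L),
    consumerGroupId = Some(groupId),
    commit = _ => IO.unit
  )

// Offsets from two consumer groups in one batch: consumerGroupIds now has
// two IDs, so the batch is invalid.
val mixed: CommittableOffsetBatch[IO] =
  CommittableOffsetBatch.fromFoldable(
    List(
      offset("topic_foo", "topic_foo_id"),
      offset("topic_bar", "topic_bar_id")
    )
  )

// mixed.commit raises fs2.kafka.ConsumerGroupException; no commit is attempted.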

It would be good to document these conditions in fs2.kafka.commitBatchWithin as well, since it calls CommittableOffsetBatch#commit but does not document the failure conditions:

/**
 * Commits offsets in batches of every `n` offsets or time window
 * of length `d`, whichever happens first. If there are no offsets
 * to commit within a time window, no attempt will be made to commit
 * offsets for that time window.
 */
def commitBatchWithin[F[_]](n: Int, d: FiniteDuration)(
  implicit F: Concurrent[F],
  timer: Timer[F]
): Pipe[F, CommittableOffset[F], Unit] =
  _.groupWithin(n, d).evalMap(CommittableOffsetBatch.fromFoldable(_).commit)

I experienced an app failing with the error fs2.kafka.ConsumerGroupException: multiple or missing consumer group ids [topic_foo_id, topic_bar_id]. After some investigation, I narrowed the issue down: events consumed from two topics were being merged into a single stream, processed, and their offsets committed with one call to fs2.kafka.commitBatchWithin. The issue is slightly vicious because the code ran fine for a while, until events from the two topics happened to end up in the same batch and it blew up at runtime. The solution I settled on was to process the consuming streams separately, running them concurrently with .parJoinUnbounded instead; a sketch of both shapes follows.
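
Roughly, assuming fs2-kafka 1.x on cats-effect 2 (the topic names, group ids, and settings here are made up for illustration):

import scala.concurrent.duration._
import cats.effect.{ExitCode, IO, IOApp}
import fs2.Stream
import fs2.kafka._

object Example extends IOApp {
  def settings(groupId: String): ConsumerSettings[IO, String, String] =
    ConsumerSettings[IO, String, String]
      .withBootstrapServers("localhost:9092")
      .withGroupId(groupId)

  // One stream of committable records per topic, each with its own group id.
  def records(groupId: String, topic: String): Stream[IO, CommittableConsumerRecord[IO, String, String]] =
    consumerStream[IO]
      .using(settings(groupId))
      .evalTap(_.subscribeTo(topic))
      .flatMap(_.stream)

  // Unsafe: merging the streams lets offsets from both consumer groups
  // land in the same batch, eventually raising ConsumerGroupException.
  val unsafe: Stream[IO, Unit] =
    records("topic_foo_id", "topic_foo")
      .merge(records("topic_bar_id", "topic_bar"))
      .map(_.offset)
      .through(commitBatchWithin(100, 5.seconds))

  // Safe: each stream batches and commits its own offsets; the streams
  // are then run concurrently with parJoinUnbounded.
  val safe: Stream[IO, Unit] =
    Stream(
      records("topic_foo_id", "topic_foo").map(_.offset).through(commitBatchWithin(100, 5.seconds)),
      records("topic_bar_id", "topic_bar").map(_.offset).through(commitBatchWithin(100, 5.seconds))
    ).parJoinUnbounded

  def run(args: List[String]): IO[ExitCode] =
    safe.compile.drain.map(_ => ExitCode.Success)
}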

(There is also a "Rolls-Royce" solution of making commitBatchWithin able to separate and batch commits by topic, but I think warnings in the docs are a good start 😄)

If that sounds OK, I could raise a small PR to append the commit scaladoc (starting from "For the batch to be valid...") onto the end of the commitBatchWithin scaladoc.
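
Concretely, the appended scaladoc could look something like this (just a sketch of the proposed wording; I've pointed the links at CommittableOffsetBatch so they would resolve from the new location):

/**
 * Commits offsets in batches of every `n` offsets or time window
 * of length `d`, whichever happens first. If there are no offsets
 * to commit within a time window, no attempt will be made to commit
 * offsets for that time window.
 *
 * For the batch to be valid and for commit to succeed,
 * the following conditions must hold:<br>
 * - [[CommittableOffsetBatch#consumerGroupIdsMissing]] must be false, and<br>
 * - [[CommittableOffsetBatch#consumerGroupIds]] must have exactly one ID.<br>
 * <br>
 * If one of the conditions above does not hold, a
 * [[ConsumerGroupException]] will be raised and a
 * commit will not be attempted.
 */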

@bplommer
Member

Thanks for this, and sorry for the lack of response - a PR would be very welcome!
