Latest Release (0.3.1) seems to always start from earliest offset #74

Closed
amacciola opened this issue Feb 4, 2022 · 7 comments

@amacciola
Contributor

After upgrading from 0.3.0 to 0.3.1, I noticed a problem when I restart my application. My Broadway Kafka ingest pipelines come back up as well, using the same name and group_id as before the restart. However, on the new release, when a pipeline comes back up it starts ingesting from the earliest offset every single time, which is a major issue.

Did something change in how I need to set the group_id or the pipeline name? Or is this just a regression?

@amacciola amacciola changed the title Latest Release seems to always start from earliest offset Latest Release (0.3.1) seems to always start from earliest offset Feb 4, 2022
@josevalim
Member

It seems like a regression, as there were changes to the offset computation here: #72

@amacciola
Contributor Author

@jamespeterschinner it seems PR #72 caused the regression I reported here.

@jamespeterschinner
Contributor

@amacciola could you provide some more information on the settings being used? I'm assuming you're using an offset reset policy of :latest?

The PR you mentioned fixed an issue I had where an offset-out-of-range error was not being resolved. That error is generated when trying to fetch events, and it could be argued it's best handled in that function, but since there is a dedicated 'resolve' function, handling it in the fetch seemed incorrect.

There is a chance the :brod.resolve function has a side effect I'm unaware of. But at first glance, checking that the offset assigned by the group coordinator is within the range of valid offsets seems like a reasonable thing to do.

I suppose that if we check all the relevant Kafka settings, how the group coordinator is assigning offsets, and any other relevant details, and nothing is found, we should probably just revert the change.

@amacciola
Contributor Author

@jamespeterschinner no, my configuration surrounding the offsets is:

 offset_commit_on_ack: true,
 offset_reset_policy: :earliest

That is because I reuse the same group_id when I pause and resume pipelines. However, if I delete the data in my app and restart the pipeline, I use a new group_id, and I want it to re-ingest all the data.
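For context, here is a minimal sketch of where those two options sit in a Broadway pipeline's producer options. The hosts, topic and group_id values are placeholders, not the reporter's actual settings; the option names are the ones BroadwayKafka.Producer accepts.

```elixir
# Placeholder values; only the option names come from BroadwayKafka.Producer.
producer_opts = [
  module:
    {BroadwayKafka.Producer,
     [
       hosts: [localhost: 9092],
       # Reusing this group_id resumes from the group's committed offsets;
       # a brand-new group_id has no committed offsets yet.
       group_id: "ingest_pipeline_v1",
       topics: ["events"],
       offset_commit_on_ack: true,
       # Used when the group has no committed offset for a partition.
       offset_reset_policy: :earliest
     ]},
  concurrency: 1
]
```

Swapping in a new group_id is what triggers a full re-ingest in this setup; keeping it should resume where the group left off.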

This works as expected on version 0.3.0 but stopped working on version 0.3.1.

@jamespeterschinner
Contributor

jamespeterschinner commented Feb 7, 2022

@amacciola I'm trying to understand how this change makes sense of the behaviour/use case you describe.

Here is what I have:

When you create a new consumer group, the group's offsets (for each partition) are undefined. In both the prior and the current release, broadway_kafka resolves this using the offset reset policy (earliest/latest).

(This is the bit where I'm making some assumptions in order for this to make sense.) With the prior implementation, if the group coordinator assigns broadway_kafka an offset that is outside the valid range, it still tries to fetch from that offset on the broker. If the broker has auto.offset.reset set to latest, then the offset-out-of-range error occurs on the server side, broadway_kafka never sees it, and consumption continues from latest.

The latest release identifies that the offset is out of range before trying to fetch and applies the offset reset policy (in this case earliest), which is the opposite of the behaviour you wanted.
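To make the hypothesis above concrete, here is a hypothetical model of the two behaviours as described in this comment. It is not broadway_kafka's code; the module name, function names and the `{earliest, latest}` range tuple are assumptions made purely for illustration.

```elixir
defmodule ResetPolicySketch do
  # Hypothetical model of the behaviours described in this thread.
  # NOT broadway_kafka's implementation. `range` is {earliest, latest}.

  # Prior release (as described): only :undefined is reset; any committed
  # offset is handed to the fetch as-is, even if it is out of range.
  def resolve_prior(:undefined, range, policy), do: pick(policy, range)
  def resolve_prior(committed, _range, _policy), do: committed

  # Latest release (as described): a committed offset outside
  # [earliest, latest] is reset with offset_reset_policy too.
  def resolve_latest(committed, {earliest, latest} = range, policy)
      when committed == :undefined or committed < earliest or committed > latest do
    pick(policy, range)
  end

  def resolve_latest(committed, _range, _policy), do: committed

  defp pick(:earliest, {earliest, _latest}), do: earliest
  defp pick(:latest, {_earliest, latest}), do: latest
end
```

Under this model, with `offset_reset_policy: :earliest` a committed offset of 150 against a valid range of 0..100 is passed through unchanged by `resolve_prior/3` (leaving the fetch to deal with it) but becomes 0 in `resolve_latest/3`, which is consistent with the re-ingestion reported in this issue.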

If the scenario I described above is correct, it sounds to me like you actually want the offset_reset_policy in broadway_kafka to be :latest and the begin_offset to be :earliest, which semantically says to me "start from the beginning and continue with latest".

If this is the case, what the prior release called offset_reset_policy was actually working as the begin_offset. What makes sense to me is that the auto.offset.reset policy on the server is a default setting, which the client can override by implementing its own offset_reset_policy, which in turn acts as the default behaviour for the begin_offset policy (because an offset of :undefined can be considered outside the range earliest -> latest).
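The semantics proposed here can be sketched as a small resolution function. This is hypothetical: broadway_kafka has no `begin_offset` option at this point, and `BeginOffsetSketch`, `resolve/3` and the `{earliest, latest}` range tuple are illustrative names, not an existing API.

```elixir
defmodule BeginOffsetSketch do
  # Hypothetical model of the semantics proposed in this comment;
  # neither broadway_kafka nor brod exposes this module.
  # `range` is {earliest, latest} for a single partition.

  # No committed offset (new group): use begin_offset, falling back to
  # offset_reset_policy when begin_offset is not given.
  def resolve(:undefined, range, opts) do
    policy = opts[:begin_offset] || Keyword.fetch!(opts, :offset_reset_policy)
    pick(policy, range)
  end

  # Committed offset outside the valid range: apply offset_reset_policy.
  def resolve(committed, {earliest, latest} = range, opts)
      when committed < earliest or committed > latest do
    pick(Keyword.fetch!(opts, :offset_reset_policy), range)
  end

  # Committed offset inside the valid range: resume from it.
  def resolve(committed, _range, _opts), do: committed

  defp pick(:earliest, {earliest, _latest}), do: earliest
  defp pick(:latest, {_earliest, latest}), do: latest
end
```

With `begin_offset: :earliest, offset_reset_policy: :latest`, a new group starts at the earliest offset, a resumed group continues from its committed offset, and only an out-of-range offset jumps to latest, which is the behaviour the reporter is after.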

If my guess above is correct and we agree on the definitions of offset_reset_policy and begin_offset, then the latest release may be considered correct, but it may warrant a major version change (in addition to implementing the correct begin_offset policy).

These are the definitions of these settings in brod, the Kafka library broadway_kafka wraps: https://hexdocs.pm/brod/

begin_offset (optional, default = latest)
The offset from which to begin fetch requests.

offset_reset_policy (optional, default = reset_by_subscriber)
How to reset begin_offset if OffsetOutOfRange exception is received.
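For comparison, this is how the "start from the beginning and continue with latest" combination would be expressed directly as brod consumer options. It is only the keyword list (passed as the consumer config when starting a brod consumer or group subscriber); note that brod spells the reset values `:reset_to_earliest` / `:reset_to_latest` rather than broadway_kafka's `:earliest` / `:latest`.

```elixir
# brod consumer options (names and values as documented by brod).
consumer_config = [
  # Where fetching starts.
  begin_offset: :earliest,
  # What brod does when a fetch returns OffsetOutOfRange.
  offset_reset_policy: :reset_to_latest
]
```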

A more drastic change would be to simply wrap the :brod_group_subscriber behaviour in broadway_kafka and inherit all of its configuration semantics, rather than re-implementing it.

@amacciola
Contributor Author

@jamespeterschinner "If the scenario I described above is correct, it sounds to me like you actually want the offset_reset_policy in broadway_kafka to be :latest, and you want the begin_offset to be :earliest. Which semantically says to me 'start from the beginning and continue with latest'."

This is what I expect, yes. I do believe the previous version was just allowing the configs to be used incorrectly, or was being misleading. I am going to test the latest release (0.3.1) with these settings:

begin_offset: :earliest
offset_reset_policy: :latest

and see if it gives me the outcome I am expecting.

Regardless, with version 0.3.1 there will need to be some form of documentation update to let others know about this change in functionality and which configs they should be setting.

@jamespeterschinner
Contributor

jamespeterschinner commented Feb 9, 2022

@amacciola I should have been clearer: currently broadway_kafka doesn't have a begin_offset option.

That would need to be handled on this line here: https://github.com/dashbitco/broadway_kafka/blob/main/lib/broadway_kafka/brod_client.ex#L152

I think the key thing we need to agree upon is this part of brod's definition:

How to reset begin_offset if OffsetOutOfRange exception is received.

Should the broker's auto offset reset take precedence over the client's offset reset policy, or the other way around (currently it's the other way around)?
