feat: Add flag to restore confluence kafka auto.offset.reset behavior #54
arroyo/backends/kafka/consumer.py
auto_offset_reset = configuration.get("auto.offset.reset", "largest")
self.__force_offset_reset = configuration.pop("force.offset.reset", False)
I will admit that having this in the general Kafka configuration is really ugly, but it's by far the easiest option given the current way this is used in snuba.
Are you sure it is not equally complex to pass the flag as an `__init__` parameter of the consumer (there are 2-3 implementations) and then update the `consumer_builder`, which has access to most CLI parameters? That one creates the Consumer almost everywhere (except for subscriptions and replacements).
I mostly just ran into this `build_kafka_consumer_configuration` interface. It currently returns the broker config as a dict. If we changed it to return a tuple `(kafka_broker_config, kwargs)` for the consumer, it would work. Unfortunately it is also used in snuba, and for different types of consumers as well. For instance, the consumer in snuba that bypasses arroyo would then also get this flag, whereas I currently just pop it out of the config in case it's there.
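A rough sketch of the tuple-returning variant described above, using plain dicts only. The helper name `build_config_with_kwargs` and the `force_offset_reset` keyword are hypothetical and only for illustration; they are not the actual snuba or arroyo interface.

```python
from typing import Any, Dict, Mapping, Tuple


def build_config_with_kwargs(
    raw: Mapping[str, Any],
) -> Tuple[Dict[str, Any], Dict[str, Any]]:
    """Split a raw config into (broker config, consumer kwargs).

    Hypothetical helper: the made-up "force.offset.reset" key is moved out
    of the broker config so that it never reaches the Kafka client.
    """
    broker_config = dict(raw)  # copy, so the caller's mapping is not mutated
    consumer_kwargs = {
        "force_offset_reset": bool(broker_config.pop("force.offset.reset", False))
    }
    return broker_config, consumer_kwargs


broker_config, consumer_kwargs = build_config_with_kwargs(
    {"bootstrap.servers": "localhost:9092", "force.offset.reset": True}
)
```

The downside raised in the comment still applies: every caller of such a function would have to unpack the tuple, including consumers that have no use for the extra kwargs.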
@fpacifici now that I have thought about this more, I think the weird thing is that we're basically overriding the real `auto.offset.reset` later in some of the consumers. Maybe we should approach it the other way round: leave `auto.offset.reset` untouched and make it behave like `force.offset.reset`, and then have a separate flag somewhere to override the behavior for what is intended in prod?
Yes I think that would be the right way.
I think I will make this change. In that case, though, we need to be mindful not to update arroyo independently of the changes in snuba, or we lose the intended behavior.
I have inverted the logic. There is now a flag called `arroyo.strict.offset.reset`, which defaults (if not set) to `True` and keeps the current behavior. If the flag is set to `False`, the regular Kafka logic of honoring the auto offset reset kicks in.
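To make the default concrete, here is a minimal sketch of how such a flag could be read from the configuration dict. The function name `resolve_offset_reset` and its return shape are hypothetical, and the sketch assumes the flag is passed as a real `bool`; it is not the code in this PR.

```python
from typing import Any, Dict, MutableMapping


def resolve_offset_reset(configuration: MutableMapping[str, Any]) -> Dict[str, Any]:
    """Read the offset reset settings from a consumer configuration.

    Hypothetical helper. The arroyo-specific key is popped so that it is
    not forwarded to the Kafka client, which would not recognize it.
    """
    # Not set means strict, which keeps the current behavior.
    strict = configuration.pop("arroyo.strict.offset.reset", True)
    # Same default as in the diff above.
    requested = configuration.get("auto.offset.reset", "largest")
    return {"strict": strict, "auto_offset_reset": requested}


default_config = {"auto.offset.reset": "earliest"}
default_result = resolve_offset_reset(default_config)

lenient_config = {"arroyo.strict.offset.reset": False}
lenient_result = resolve_offset_reset(lenient_config)
```

With the flag unset, `default_result["strict"]` is `True`; with the flag set to `False`, the caller would leave the regular `auto.offset.reset` handling to Kafka.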
I am OK with the idea. Please see my comment; if it is too complex, I will accept.
Please address my two comments before merging.
@fpacifici have another look. I have inverted it now.
The force.offset.reset seems fine.
The `test_lenient_offset_reset_latest` test introduced in #54 was flaky. We shouldn't need the hack to force staging offsets in the test if we wait for assignment to actually happen.
Adds a flag to disable the special handling for auto offset resets in arroyo. When the made-up `force.offset.reset` flag is set to `True` (defaults to `False`), the auto offset reset error is not converted into an `OffsetOutOfRange` exception.

Refs getsentry/snuba#2573