
KafkaConsumer and current positions #118

Closed
AlexeyRaga opened this issue Feb 2, 2017 · 11 comments

@AlexeyRaga

AlexeyRaga commented Feb 2, 2017

When the consumer starts and there are no new messages for the group, it is impossible to know the current positions or to commit the current offsets.

Committing offsets even when there are no new messages can be important because offsets in Kafka can expire. To prevent offsets from expiring we need to commit them periodically, even if there are no changes.

A plain commit() doesn't help because it won't do anything and will return KafkaError._NO_OFFSET.

It is possible to call commit(offsets) with specific offsets, but how do we get them?

Currently, even if the consumer has partitions assigned, consumer.assignment() returns an empty list when there are no new messages. I strongly suspect that even if I receive messages it will only give me the partitions of the received messages, not all of them.

OK, I can get the list of assigned partitions from the on_assign callback. But then when I call consumer.position(_assigned) it won't give me the current offsets for the group, only for the partitions I have received messages from.

Adding the ability to query the currently assigned offsets would be very helpful.
Making commit() commit offsets even if nothing has changed would be even more helpful.
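
A minimal sketch of the pattern described above, using confluent-kafka-python; the broker address, topic, and group id are placeholders, and the asynchronous keyword follows current client versions (older releases spelled it async):

```python
from confluent_kafka import Consumer, TopicPartition

assigned = []

def on_assign(consumer, partitions):
    # Remember the full assignment; the client applies it after the callback returns.
    assigned[:] = partitions

c = Consumer({
    'bootstrap.servers': 'localhost:9092',   # placeholder
    'group.id': 'my-group',                  # placeholder
    'enable.auto.commit': False,
})
c.subscribe(['my-topic'], on_assign=on_assign)

while True:
    msg = c.poll(1.0)
    if msg is None:
        # No new messages: position() only has valid offsets for partitions
        # we have actually fetched from, which is the problem described above.
        print(c.position(assigned))
        continue
    if msg.error():
        continue
    # ... process msg, then commit explicitly ...
    c.commit(message=msg, asynchronous=False)
```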

@edenhill
Contributor

edenhill commented Feb 2, 2017

There is a librdkafka issue for this:
confluentinc/librdkafka#584

While it makes sense to commit when reaching EOF even if there were no messages, it is effectively the same as setting auto.offset.reset=earliest.

@ewencp Thoughts?

As you say, position() will return the application's current offsets, but if there are no messages on a partition there is no offset to return. Do you think it should return the partition's current high-watermark offset?
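
A minimal sketch of querying the high-watermark offset from the application side, assuming a confluent-kafka-python version that exposes get_watermark_offsets(); the broker, topic, and group names are placeholders:

```python
from confluent_kafka import Consumer, TopicPartition

c = Consumer({'bootstrap.servers': 'localhost:9092', 'group.id': 'my-group'})

# Ask the broker for the low/high watermarks of a single partition.
low, high = c.get_watermark_offsets(TopicPartition('my-topic', 0), timeout=5.0)
print('low=%d high=%d' % (low, high))
```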

@ewencp
Contributor

ewencp commented Feb 3, 2017

While it makes sense to commit when reaching EOF even if there were no messages, it is effectively the same as setting auto.offset.reset=earliest.

Why is that effectively the same? With auto.offset.reset=earliest, if your committed offset expires it seems like you'd go back and reprocess data?

Actually, the behaviour described for consumer.assignment and consumer.position sounds confusing and surprising. Why won't consumer.position return offsets? It must know where it is currently trying to fetch data from.

@newhoggy

newhoggy commented Feb 3, 2017

And if you have auto.offset.reset=latest, it could mean messages are skipped when jobs are restarted or rebalanced. It's not just unintuitive, it's dangerous.

@edenhill
Contributor

edenhill commented Mar 2, 2017

There is a difference between where it tries to fetch (the pre-fetch position) and the application's actual position.
If we reach the partition EOF without seeing any messages (e.g., auto.offset.reset=largest), does that mean the consumer position is at EOF? Should that offset be committed even if the application has not seen any messages?

@AlexeyRaga
Author

There are edge cases though.

Here is an (edge-case) example of how messages can be lost if a job has an unfortunate downtime, however short. If you are not committing offsets, what may happen is:

  • You reach EOF
  • Your job sits there waiting for more messages for a long time
  • Offsets expire
  • Job is still running fine without being able to commit offsets
  • Job crashes or gets shut down for re-deployment, etc.
  • New messages are pushed to the topic
  • Job wakes up.

Because the offsets expired even though the job was running and had previously attempted to re-commit them, this job will lose messages. The job may go down simply because of a re-deployment for a bug fix, etc.; you just need to be unlucky enough to be listening to a quiet topic and then have downtime after the offsets have expired.

Typically, in a production-ready environment where all the data matters, you wouldn't use latest; you would use earliest, because you usually want to handle all the data you have. If the consumer group is deployed for the first time (or changed), you (re)process all the data.
But then, if the offsets are lost/expired and the job is restarted, a full re-processing is triggered again, which is probably undesirable.

This is why things like Samza re-commit offsets every minute or so, and Kafka Streams (and, I believe, the Java consumer in general) does the same.

I think that allowing jobs to re-assert their offsets would be logical for many reasons.
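
A rough sketch of that periodic re-commit pattern done from the application with explicit commits; the 60-second interval, broker, topic, and group names are illustrative only:

```python
import time
from confluent_kafka import Consumer, TopicPartition

c = Consumer({
    'bootstrap.servers': 'localhost:9092',   # placeholder
    'group.id': 'my-group',                  # placeholder
    'enable.auto.commit': False,
})
c.subscribe(['my-topic'])

RECOMMIT_INTERVAL = 60.0       # illustrative, roughly what Samza does
last_offsets = {}              # (topic, partition) -> next offset to commit
last_commit = time.monotonic()

while True:
    msg = c.poll(1.0)
    if msg is not None and not msg.error():
        # ... process msg ...
        last_offsets[(msg.topic(), msg.partition())] = msg.offset() + 1

    if last_offsets and time.monotonic() - last_commit >= RECOMMIT_INTERVAL:
        offsets = [TopicPartition(t, p, o) for (t, p), o in last_offsets.items()]
        # Committing explicit offsets persists them even when they have not
        # advanced, which keeps them from expiring on the broker.
        c.commit(offsets=offsets, asynchronous=False)
        last_commit = time.monotonic()
```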

@edenhill
Contributor

edenhill commented Mar 8, 2017

Thanks for the thorough explanation, Alexey.

This boils down to two things:

  • commit offsets on EOF
  • commit offsets at regular intervals (auto.commit.interval.ms) even if the locally stored offset didn't change. You are right that the Java client does this, but speaking to its developers, they actually think this should be fixed.

I would be happy to add a config property for the EOF one (so as not to change the default behaviour), but I'm a little bit hesitant about the latter.
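
A rough application-side sketch of the EOF case, roughly what such a config property would automate; it assumes enable.partition.eof is on, that the EOF event's offset() reports the end offset, and uses placeholder broker/topic/group names:

```python
from confluent_kafka import Consumer, KafkaError, TopicPartition

c = Consumer({
    'bootstrap.servers': 'localhost:9092',   # placeholder
    'group.id': 'my-group',                  # placeholder
    'enable.auto.commit': False,
    'enable.partition.eof': True,            # emit _PARTITION_EOF events
})
c.subscribe(['my-topic'])

while True:
    msg = c.poll(1.0)
    if msg is None:
        continue
    if msg.error():
        if msg.error().code() == KafkaError._PARTITION_EOF:
            # The EOF event reports the end offset for the partition; committing
            # it pins the group's position even though no message was consumed.
            c.commit(offsets=[TopicPartition(msg.topic(), msg.partition(), msg.offset())],
                     asynchronous=False)
        continue
    # ... normal processing and commits ...
```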

@AlexeyRaga
Author

I was not talking about auto commits (since we don't use them) but about explicit commit calls: when I call commit explicitly, I would expect my offsets to be persisted (even if they have not changed).

I am surprised that this behaviour is considered to be a bug in kafka clients... Could you please explain to me why this behaviour is undesirable?

@edenhill
Contributor

I am surprised that this behaviour is considered to be a bug in kafka clients... Could you please explain to me why this behaviour is undesirable?

This is one of those corner cases where you just need to make a design decision; both options have pros and cons.

If you have a use-case for re-committing offsets I'd be happy to add that as an option to librdkafka (can't change the default though).

@AlexeyRaga
Author

My point here is simple: message frequency should not affect my ability to reason about the system.
When I have a topic whose messages are infrequent, I still want to be able to reason about the behaviour of my consumers.
While my consumer is still alive and still committing offsets, however stale, those should remain the offsets for that consumer.
Otherwise it is a bit weird: if I have a topic with no messages in it for a long while, the offsets expire even though my consumer is up and running. When that happens and my consumer restarts (whatever the cause), the situation becomes hard to reason about: the consumer will start from the beginning of the topic if it was configured with earliest, or it can potentially lose messages if it was configured with latest.

I could compensate with a much longer offset expiration setting (offsets.retention.minutes on the broker), but AFAIK that impacts broker performance.

@edenhill
Contributor

edenhill commented Aug 9, 2017

This needs to be fixed in librdkafka; I've created an upstream issue here: confluentinc/librdkafka#1372

@edenhill
Contributor

edenhill commented Sep 1, 2017

Closing this issue in favour of the librdkafka one. The librdkafka fix will not require changes to the Python code.
