Offset Fetch/Commit Support #2

Closed
eapache opened this issue Jul 19, 2013 · 25 comments

Comments

@eapache
Contributor

eapache commented Jul 19, 2013

The Consumer should make use of https://cwiki.apache.org/confluence/display/KAFKA/Offset+Management

The golang protocol backend already supports those request/response types, but the current Kafka 0.8 beta 1 seems to choke on them so I didn't add them to the API.

I suspect this will consist of doing a Fetch on construction of a Consumer, then adding a Commit(offset) api call that the consumer user can call as appropriate. The python bindings have an autocommit option, but I think that's overcomplicated for our needs, at least to start.
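Here is a minimal Go sketch of that shape. It assumes a hypothetical `OffsetStore` interface that stands in for the OffsetFetch/OffsetCommit requests (it is not Sarama's API) and uses an in-memory map in place of a broker. The consumer fetches its starting offset once, when it is constructed, and commits only when the caller invokes `Commit`.

```go
package main

import (
	"errors"
	"fmt"
)

// ErrNoOffset is returned when nothing has been committed yet.
var ErrNoOffset = errors.New("no committed offset")

// OffsetStore is a hypothetical abstraction over OffsetFetch/OffsetCommit;
// it is not part of Sarama.
type OffsetStore interface {
	Fetch(group, topic string, partition int32) (int64, error)
	Commit(group, topic string, partition int32, offset int64) error
}

// memoryStore is an in-memory stand-in for the broker, for illustration only.
type memoryStore map[string]int64

func storeKey(group, topic string, partition int32) string {
	return fmt.Sprintf("%s/%s/%d", group, topic, partition)
}

func (m memoryStore) Fetch(group, topic string, partition int32) (int64, error) {
	off, ok := m[storeKey(group, topic, partition)]
	if !ok {
		return 0, ErrNoOffset
	}
	return off, nil
}

func (m memoryStore) Commit(group, topic string, partition int32, offset int64) error {
	m[storeKey(group, topic, partition)] = offset
	return nil
}

// Consumer is a hypothetical consumer that fetches its offset on construction.
type Consumer struct {
	store     OffsetStore
	group     string
	topic     string
	partition int32
	Offset    int64 // offset to start reading from
}

// NewConsumer fetches the last committed offset and falls back to
// defaultOffset when nothing has been committed for this group yet.
func NewConsumer(store OffsetStore, group, topic string, partition int32, defaultOffset int64) (*Consumer, error) {
	off, err := store.Fetch(group, topic, partition)
	if errors.Is(err, ErrNoOffset) {
		off = defaultOffset
	} else if err != nil {
		return nil, err
	}
	return &Consumer{store: store, group: group, topic: topic, partition: partition, Offset: off}, nil
}

// Commit stores offset for this consumer. There is no autocommit: the
// caller decides when a message counts as processed.
func (c *Consumer) Commit(offset int64) error {
	return c.store.Commit(c.group, c.topic, c.partition, offset)
}
```

Keeping commits explicit, rather than adding an autocommit option, leaves the decision about when to commit entirely to the caller.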

ghost assigned eapache Jul 30, 2013
eapache pushed a commit that referenced this issue Jul 30, 2013
eapache pushed a commit that referenced this issue Aug 15, 2013
Fixes #4

When #2 lands, we can define a constant for -1 that triggers sending an
OffsetFetchRequest without touching the API.
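The sentinel idea in that commit message can be sketched in Go as follows. The constant name `OffsetFromBroker` and the `fetchFunc` type are hypothetical. Callers keep passing a plain `int64`, so the public API does not change, and a value of -1 is routed to an OffsetFetchRequest.

```go
package main

// OffsetFromBroker is a hypothetical sentinel: a starting offset of -1 means
// "ask the broker for the committed offset" rather than a literal position.
const OffsetFromBroker int64 = -1

// fetchFunc stands in for sending an OffsetFetchRequest (hypothetical).
type fetchFunc func() (int64, error)

// resolveStartOffset returns requested unchanged unless it is the sentinel,
// in which case it performs the fetch instead.
func resolveStartOffset(requested int64, fetch fetchFunc) (int64, error) {
	if requested == OffsetFromBroker {
		return fetch()
	}
	return requested, nil
}
```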
@eapache
Contributor Author

eapache commented Aug 15, 2013

After the refactoring this needs to be redone, although much of the code I wrote for the initial version can probably be reused. There's not much point in continuing until we get an answer on https://issues.apache.org/jira/browse/KAFKA-993 though...

@eapache
Contributor Author

eapache commented Aug 21, 2013

The ticket I filed finally got an answer: they postponed the inclusion of this API in order to get 0.8 out the door faster, so we will have to wait for 0.8.1 or 0.9 or whatever comes next.

@mikestanley

Just checking in on the status of this (though I'm not sure this is the right issue for my problem). I'm trying to manually set an initial offset (a value previously read from ConsumerEvent.Offset) and I get the error message "The requested offset is outside the range of offsets maintained by the server for the given topic/partition.". I'm not sure whether this is expected behavior (not implemented yet), whether I'm doing something wrong (forgetting to commit an offset or to set an auto-commit flag somewhere), or whether something environmental is going on. Any suggestions or insights would be greatly appreciated.

@mikestanley

Eh, OK. I figured out my own issue for the time being (but I'm still trying to understand the use case for committing and auto-committing offsets). We currently track offsets externally. I'm guessing this feature will let us track them within Kafka and eliminate the need for our external tracking.

In any case, I resolved my issue by setting OffsetValue = (lastEvent.Offset + 1). The +1 was important.
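This Go sketch shows why the +1 matters when offsets are tracked outside Kafka. The `externalTracker` type is hypothetical. Every message has its own offset, so restarting at the last processed offset would deliver that message a second time. The next unread message is at that offset plus one.

```go
package main

// externalTracker is a hypothetical stand-in for tracking offsets outside
// Kafka: it remembers the last fully processed offset per partition.
type externalTracker struct {
	last map[int32]int64
}

func newExternalTracker() *externalTracker {
	return &externalTracker{last: make(map[int32]int64)}
}

// Processed records that the message at offset has been fully handled.
func (t *externalTracker) Processed(partition int32, offset int64) {
	t.last[partition] = offset
}

// ResumeOffset returns where to restart a partition: last+1, because reusing
// last would redeliver the final processed message. With no history,
// fallback is returned unchanged.
func (t *externalTracker) ResumeOffset(partition int32, fallback int64) int64 {
	if last, ok := t.last[partition]; ok {
		return last + 1
	}
	return fallback
}
```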

@eapache
Copy link
Contributor Author

eapache commented May 8, 2014

If you're getting that error when trying to use OffsetMethodManual then it's coming back from the kafka cluster, and it means that the specified offset is actually invalid - it's either too far in the past (and has been deleted) or it hasn't happened yet. Check the available offsets in the cluster and double-check that the value you're giving is actually within that set.
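Here is one way to run that check before starting a consumer, as a Go sketch. It assumes `oldest` and `newest` come from Offset requests for the earliest and latest times. It also assumes `newest` is the log-end offset (the offset the next produced message will receive), which is a valid starting point that simply waits for new data. `checkOffset` is a hypothetical helper, not a Sarama function.

```go
package main

import "fmt"

// checkOffset reports whether requested lies within the range the broker
// still holds for a partition (hypothetical helper).
func checkOffset(requested, oldest, newest int64) error {
	switch {
	case requested < oldest:
		// Too far in the past: the data has already been deleted.
		return fmt.Errorf("offset %d already deleted; oldest available is %d", requested, oldest)
	case requested > newest:
		// Hasn't happened yet: nothing has been written there.
		return fmt.Errorf("offset %d not produced yet; log end is %d", requested, newest)
	default:
		return nil
	}
}
```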

@eapache
Contributor Author

eapache commented May 8, 2014

Yes, eventually kafka/sarama will be able to automatically track offsets so you won't have to do it externally.

@clutchski
Contributor

Hey Evan,

What's the state of this issue? We're using Sarama and want to manually commit offsets once we're sure we've safely processed messages. Thanks!

@eapache
Contributor Author

eapache commented May 20, 2014

Per https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-OffsetCommit/FetchAPI we're still waiting on kafka to release 0.8.2 for these APIs to be functional.

@clutchski
Contributor

Great. Thanks for responding.

@eapache
Contributor Author

eapache commented May 20, 2014

Not as far as I know - the API exists, but does nothing.

@eapache
Contributor Author

eapache commented Jul 10, 2014

As @kane-sendgrid pointed out in #135, Kafka 0.8.2 is going to be released relatively soon, so we can finally start working on this.

@mr-andreas

How's this coming along? The API seems to be available and stable from 0.8.1.1 and up: https://cwiki.apache.org/confluence/display/KAFKA/Committing+and+fetching+consumer+offsets+in+Kafka

@joestein

In 0.8.1.1 the API is stable in that it has a version coded and that won't change. It does not yet actually store the offsets in Kafka. Kafka 0.8.2-beta, which is already released, does support the implementation for OffsetCommit and OffsetFetch with internal topics as their storage mechanism.

@eapache
Contributor Author

eapache commented Dec 15, 2014

Joe is correct. Also note that this ticket is tagged for the consumer, not the protocol - Sarama has supported the OffsetFetch, OffsetCommit, and ConsumerMetadata request/response message formats for a while already. This ticket is simply for building automatic support into Sarama's consumer.

@piotrkowalczuk

Any estimate of when this will be available on the master branch? Is there a clean way to commit offsets to the broker (given a client and a consumer) with the current master code base?

@eapache
Contributor Author

eapache commented Dec 15, 2014

As mentioned, you can manually construct an OffsetCommitRequest using the offset returned from the consumer, then send it to the right broker with broker.CommitOffset(). There is no ETA currently for baking this into the consumer.
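Here is a Go sketch of that manual flow. The types are hypothetical stand-ins that mirror the documented v0 OffsetCommit layout: a consumer group, then offsets grouped by topic and partition, each with optional metadata. They are not Sarama's types. The `committer` interface only stands in for `broker.CommitOffset()`, and this sketch does not reproduce that method's exact signature. Messages are handled first, and an offset is committed only for those that succeeded. The sketch also assumes a common convention: the committed value is the offset after the last processed message.

```go
package main

// commitBlock mirrors one partition entry of an OffsetCommit v0 request.
// Hypothetical type, not Sarama's.
type commitBlock struct {
	Offset   int64
	Metadata string
}

// commitRequest mirrors the v0 layout: one consumer group, then offsets
// grouped by topic and partition. Hypothetical type, not Sarama's.
type commitRequest struct {
	ConsumerGroup string
	Blocks        map[string]map[int32]commitBlock
}

func newCommitRequest(group string) *commitRequest {
	return &commitRequest{
		ConsumerGroup: group,
		Blocks:        make(map[string]map[int32]commitBlock),
	}
}

// AddBlock records the offset for one topic/partition.
func (r *commitRequest) AddBlock(topic string, partition int32, offset int64, metadata string) {
	if r.Blocks[topic] == nil {
		r.Blocks[topic] = make(map[int32]commitBlock)
	}
	r.Blocks[topic][partition] = commitBlock{Offset: offset, Metadata: metadata}
}

// committer stands in for sending the request to the right broker.
type committer interface {
	CommitOffset(req *commitRequest) error
}

// commitFunc adapts a plain function to the committer interface.
type commitFunc func(req *commitRequest) error

func (f commitFunc) CommitOffset(req *commitRequest) error { return f(req) }

// processThenCommit handles messages in order and, when done or at the first
// failure, commits the offset after the last successfully handled message.
// Nothing is committed if no message succeeded.
func processThenCommit(c committer, group, topic string, partition int32,
	offsets []int64, handle func(offset int64) error) error {
	next := int64(-1)
	var handleErr error
	for _, off := range offsets {
		if handleErr = handle(off); handleErr != nil {
			break // later messages stay uncommitted and will be re-read
		}
		next = off + 1
	}
	if next >= 0 {
		req := newCommitRequest(group)
		req.AddBlock(topic, partition, next, "")
		if err := c.CommitOffset(req); err != nil {
			return err
		}
	}
	return handleErr
}
```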

@sybrandy

Has there been any recent work on this? I got a copy of Sarama a couple of weeks ago and I'm trying to do manual offset commits to Kafka, but when I send an OffsetFetch to confirm the commit, I get back an empty map. As far as I know, Kafka is configured correctly (offsets.storage=kafka), so I'm wondering whether the version I have is missing something.

@wvanbergen
Contributor

There's no progress on this. You could check out https://github.com/wvanbergen/kafka/tree/master/consumergroup, which, besides load balancing and failover, manages offsets using zookeeper.

@sybrandy

Any ETA on when manual commits will work?

@eapache
Contributor Author

eapache commented Mar 13, 2015

Kafka 0.8.2.0 introduced a new version of this API which still has not been documented [1]. There's not much we can do until we know how it actually works.

That email thread seems to imply that the old v0 API should still work, but we do implement the documented spec so I'm not sure why it wouldn't be working for you.

[1] https://mail-archives.apache.org/mod_mbox/kafka-users/201502.mbox/%3CCAA4pprC67juiUrnhiv%2Bg9eQ9ZM12UjFyaEkC-Af22%3D6fHZ6QrA%40mail.gmail.com%3E

@jsvisa

jsvisa commented Mar 19, 2015

Thanks @wvanbergen 👍. I've spent too much time fiddling with my own Kafka cluster's config. I'm curious why, with Kafka 0.8.2-beta, sending a ConsumerMetadataRequest gets me the Coordinator, but with 0.8.2.0 or 0.8.2.1 it always returns ErrConsumerCoordinatorNotAvailable.

@wvanbergen
Contributor

Unfortunately we have no experience with this API ourselves. Your best bet is to ask on the Kafka mailing list how this API is supposed to work: http://kafka.apache.org/contact.html

@joestein

The Coordinator is being worked on in Apache Kafka trunk now for the next release; there are more details at https://cwiki.apache.org/confluence/display/KAFKA/Kafka+0.9+consumer+protocol. It doesn't overlap with this issue, except that the new consumer will also use this method of offset storage, maybe even by default.

Kafka 0.8.2.1 supports Kafka as the storage for offsets; storing offsets in Kafka is what the offset fetch and commit requests/responses are used for. This is great because in your code you can avoid using ZooKeeper in consumers and use Kafka for offset storage instead (see elodina/go_kafka_client#80).

@wvanbergen
Contributor

I started working on this in #379.

There are still some outstanding questions and gaps in the implementation, but it's mostly working. @jsvisa, the reason you get ErrConsumerCoordinatorNotAvailable is that the coordinator does not exist initially. However, the request triggers its creation, so if you retry the request you will get a useful response.

@eapache
Contributor Author

eapache commented Sep 5, 2015

#461 (released in v1.6) implemented Kafka-based offset management. It's not fully integrated into the consumer yet, but there's not much point until the 0.9 consumer protocol is available.


9 participants