
Offset Handling #72

Closed
jcrobak opened this issue Nov 26, 2013 · 8 comments

Comments

jcrobak (Contributor) commented Nov 26, 2013

The SimpleConsumer has some commented-out code for pulling offsets from the brokers: https://github.com/mumrah/kafka-python/blob/33cde520de9067845d4c7159a2c2834846e1957f/kafka/consumer.py#L100

Is this commented out on purpose? It's unclear to me if "Uncomment for 0.8.1" is referring to a version of Kafka or a version of kafka-python.

I tried to uncomment this, and I get an error in encode_offset_fetch_request/write_short_string. Is offset handling broken or should I be able to get this to work?

I am using a work-around now of consumer.seek(0, 0), but this is not ideal, since I have to reprocess every entry each run.

jcrobak (Contributor, Author) commented Nov 27, 2013

Note that hard-coding offsets to 0 doesn't work well. You will see exceptions like:

kafka.common.OffsetOutOfRangeException: Request for offset 0 but we only have log segments in the range 39943 to 11471647.
        at kafka.log.Log.read(Log.scala:429)
        at kafka.server.KafkaApis.kafka$server$KafkaApis$$readMessageSet(KafkaApis.scala:382)
        at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$readMessageSets$1.apply(KafkaApis.scala:327)
        at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$readMessageSets$1.apply(KafkaApis.scala:323)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
        at scala.collection.immutable.Map$Map1.foreach(Map.scala:105)
        at scala.collection.TraversableLike$class.map(TraversableLike.scala:206)
        at scala.collection.immutable.Map$Map1.map(Map.scala:93)
        at kafka.server.KafkaApis.kafka$server$KafkaApis$$readMessageSets(KafkaApis.scala:323)
        at kafka.server.KafkaApis$FetchRequestPurgatory.expire(KafkaApis.scala:573)
        at kafka.server.KafkaApis$FetchRequestPurgatory.expire(KafkaApis.scala:555)
        at kafka.server.RequestPurgatory$ExpiredRequestReaper.run(RequestPurgatory.scala:216)
        at java.lang.Thread.run(Thread.java:724)
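The exception above also shows the recovery path: when the requested offset falls below the broker's earliest retained offset, reset to that earliest offset instead of failing. A minimal sketch of that clamping logic follows; the helper name is an assumption, not part of kafka-python, and the range values mirror the exception above.

```python
# Hypothetical helper: clamp a stored/requested offset into the range the
# broker actually retains, so a hard-coded 0 falls back to the earliest
# available message instead of raising OffsetOutOfRange.
def clamp_offset(requested, earliest, latest):
    """Return an offset guaranteed to lie inside the broker's log range."""
    if requested < earliest:
        return earliest  # older segments were rolled (deleted)
    if requested > latest:
        return latest    # cannot read past the newest message
    return requested

# The range below mirrors the exception above (39943 .. 11471647):
print(clamp_offset(0, 39943, 11471647))      # 39943
print(clamp_offset(50000, 39943, 11471647))  # 50000
```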

mrtheb (Collaborator) commented Nov 28, 2013

Hi @jcrobak,

It is commented out on purpose; these lines relate to the version of Kafka that will support committing offsets to the brokers. This feature is not available in 0.8.0, which should be released very shortly.

The error you are getting in the second comment is because Kafka has rolled (deleted) the file which contained the offset you are trying to fetch. Did you get this error after using seek(0, 0)?

If you look at parameters for seek, using seek(0, 0) should set the offsets to the earliest (oldest) message in kafka. It is the right thing to do but you need to be ready to handle the error by adjusting the offsets accordingly. Otherwise, I think you'll get stuck there and not consume anything.

If reprocessing messages is a problem for you, the best approach is to store the offsets your consumer has reached (in Zookeeper, or your favorite data store) and use them the next time you restart the consumer. kafka-python does not provide this functionality, partly because the goal for Kafka is to eventually support this itself instead of relying on a third party such as Zookeeper. The commented lines you referred to are meant exactly to support this, whenever it becomes available.
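The approach described above (persist the consumer's position externally, reload it on restart) can be sketched with a local JSON file standing in for Zookeeper; the helper names and file format are illustrative assumptions, not kafka-python APIs.

```python
import json
import os

def save_offsets(path, offsets):
    """Persist a {partition: next_offset} mapping after a processed batch."""
    with open(path, "w") as f:
        json.dump({str(p): o for p, o in offsets.items()}, f)

def load_offsets(path):
    """Reload saved offsets; an empty dict on first run means 'start from earliest'."""
    if not os.path.exists(path):
        return {}
    with open(path) as f:
        return {int(p): o for p, o in json.load(f).items()}
```

On restart, the loaded offsets would be fed to the consumer's seek/fetch calls instead of reprocessing everything from offset 0.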

IMHO, any consumer using Kafka needs to check for duplicates for robustness' sake.

jcrobak (Contributor, Author) commented Dec 2, 2013

Thanks for the response @mrtheb. I have a few follow-up questions...

it is commented on purpose and these lines relate to the version of Kafka that will support offsets commit to brokers. This features is not available in 0.8.0 which should be released very shortly.

Is the feature available in Apache Kafka trunk, is that why this code exists? If that's the case, it seems like there should be a version check, and if it's < 0.8.1, then offset should be determined via seek(0, 0) rather than hard-coding to 0, which will not always work.
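The version check suggested here could look something like the sketch below; the broker-version tuple and strategy labels are illustrative assumptions, since kafka-python had no such gate at the time.

```python
# Hypothetical gate: only ask the broker for committed offsets when it is
# new enough to support the offset fetch API (0.8.1+); otherwise fall back
# to the equivalent of seek(0, 0).
def offset_strategy(broker_version):
    """broker_version: a tuple such as (0, 8, 0)."""
    if broker_version >= (0, 8, 1):
        return "offset_fetch_request"  # broker-side offset storage
    return "seek_earliest"             # start from the oldest retained message

print(offset_strategy((0, 8, 0)))  # seek_earliest
print(offset_strategy((0, 8, 1)))  # offset_fetch_request
```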

The error you are getting in the second comment is because Kafka has rolled (deleted) the file which contained the offset you are trying to fetch. Did you get this error after using seek(0, 0)?

As mentioned above, this seems like a bug. But it'd be a change in behavior to call seek(0, 0) rather than hard-coding to 0. Thoughts on if that's reasonable or not? At least there should be a big warning in the docstring.

If reprocessing messages is a problem for you, the best approach is to store (in zookeeper, or your favorite data store) the offsets where your consumer is at and use it the next time you restart the consumer. Kafka-python does not provide this functionality

Isn't the storing of offsets implemented in Consumer.commit()? Do I just need to bypass kafka to read out the stored offsets? I'm not seeing this information being persisted in zookeeper, but maybe I'm doing something wrong. Or is the docstring for commit wrong?

IMHO, any consumer using Kafka needs to have checks for duplicate for robustness sake.

Definitely, in my case it's more a matter of speed/efficiency.

jcrobak (Contributor, Author) commented Dec 2, 2013

OK, I see now that this is not implemented on the kafka side per the docs: https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-OffsetCommit%2FFetchAPI

It'd be great if the docs for kafka-python were updated. I can put together a PR.

mrtheb (Collaborator) commented Dec 4, 2013

My turn!

Is the feature available in Apache Kafka trunk, is that why this code exists?

Yes, you can see in Kafka trunk that there is support for an OffsetCommitRequest. However, the library is pinned to the Kafka release, and 0.8.0 (just released, yeah!) doesn't support this yet.

Isn't the storing of offsets implemented in Consumer.commit()?

This really needs to be made clear on the front page. This method will only work against a Kafka trunk build that supports it; otherwise, it will fail. I haven't tried it personally, since I am using Zookeeper to store the offsets, outside the library. Go ahead with the PR if you can.

I also think it is a bug if you perform seek(0, 0) and subsequent calls to fetch data return the error you saw.

Thoughts on calling seek implicitly? It is just an opinion, but if the library were mapped directly onto the Kafka clients, there would be a ZookeeperConsumer where this would make sense. kafka-python sits halfway between the SimpleConsumer and the ZookeeperConsumer, in the sense that it does more than the first but less than the latter. I am not against the idea, but I think doing seek implicitly would require leaving some control to the caller to modify the offsets. This is one of the areas where the Kafka ZookeeperConsumer was also criticized (you can find some info here). For this reason, I believe it is a better option to leave it out, even once OffsetCommit is fully supported.

jcrobak (Contributor, Author) commented Dec 5, 2013

I haven't tried personally since I am using Zookeeper to store the offsets. I am doing this outside the library.

How are you storing data in Zookeeper? Are you doing something like #38?

mrtheb (Collaborator) commented Dec 5, 2013

Not exactly, I did some custom work on top of the client before this PR appeared and I have to admit I didn't have the courage to make it generic enough and apply the recipe to kafka-python.

Also, as far as I could see, #38 only implements rebalance and not offset commit in zookeeper. Still, since kazoo is integrated in this fork, you could reuse it and just override commit to save the offsets in ZK as well. I use the same Zookeeper structure as the ZookeeperConsumerConnector does in the Java/Scala client.
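For reference, the ZookeeperConsumerConnector in the Java/Scala client stores each partition's offset at /consumers/&lt;group&gt;/offsets/&lt;topic&gt;/&lt;partition&gt;. A small sketch of building that path follows; the kazoo calls are shown only as comments, since they need a live Zookeeper ensemble.

```python
def offset_znode(group, topic, partition):
    """Path where the ZookeeperConsumerConnector keeps a partition's offset."""
    return "/consumers/%s/offsets/%s/%d" % (group, topic, partition)

print(offset_znode("my-group", "my-topic", 0))
# -> /consumers/my-group/offsets/my-topic/0

# With kazoo (untested sketch, requires a running Zookeeper):
#   zk.ensure_path(offset_znode(group, topic, p))
#   zk.set(offset_znode(group, topic, p), str(next_offset).encode())
```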

rdiomar (Collaborator) commented Jan 29, 2014

I think this issue is resolved.
