
KeyError on task_done #364

Closed
dinoshauer opened this issue Apr 3, 2015 · 11 comments

Comments

@dinoshauer
Contributor

Hey there.

I keep running into a KeyError when calling task_done or commit on a message.

Here's a stacktrace:

Traceback (most recent call last):
  File "/home/foo/.virtualenvs/app/lib/python3.4/site-packages/kuer/consumer.py", line 302, in _mark_done
    self.kafka.task_done(message)
  File "/home/foo/.virtualenvs/app/lib/python3.4/site-packages/kafka/consumer/kafka.py", line 495, in task_done
    prev_done = self._offsets.task_done[topic_partition]
KeyError: (b'listen.backfill.unify.data', 14)

Any idea on what I can do to mitigate/fix this?

Let me know if you need more info

@dpkp dpkp added this to the 0.9.4 Release milestone Apr 3, 2015
@dpkp dpkp self-assigned this Apr 3, 2015
@dpkp
Owner

dpkp commented Apr 4, 2015

This looks to me like a str / bytes issue using python 3. Going to try to write a test and then fix -- thanks for the bug report!
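A minimal, self-contained sketch of the kind of str/bytes mismatch suspected here (the dict below just mimics the shape of the `_offsets.task_done` dict from the traceback; it is not kafka-python code): on Python 3, a dict keyed by a bytes topic name will raise KeyError for a lookup with the equivalent str topic, and vice versa.

```python
# The internal offsets dict is keyed by (topic, partition). If topics are
# stored as bytes but looked up as str (or the reverse), the keys never
# match, producing exactly the KeyError in the report.
task_done = {(b'listen.backfill.unify.data', 14): None}

key_bytes = (b'listen.backfill.unify.data', 14)
key_str = ('listen.backfill.unify.data', 14)

assert key_bytes in task_done          # bytes key matches
assert key_str not in task_done        # str key does not -> KeyError on lookup
```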

@dpkp
Owner

dpkp commented Apr 4, 2015

well, hmm. I can't reproduce this. looking deeper, existing tests should have caught basic str / bytes issues: https://github.com/mumrah/kafka-python/blob/master/test/test_consumer_integration.py#L476-L517

Could it be the partition that is out of sync? Can you get the value of kafka._offsets when you get this error? Are you setting partitions explicitly or just setting the topic and letting KafkaConsumer determine the partitions?

@dinoshauer
Contributor Author

I'm explicitly setting the partitions. I'll see about getting _offsets when I get a chance.

But a topic shouldn't be out of sync unless a leader has changed, no?


@dpkp
Owner

dpkp commented Apr 5, 2015

or possibly that partitions were added or removed, but you would have to do that explicitly via server commands.

any additional context you can provide would be helpful - server version, kafka-python version (I am assuming 0.9.3), and the KafkaConsumer config you are using.

@dinoshauer
Contributor Author

Hey, sorry for the long wait.

We're running kafka server version 0.8.2.1.

Kafka python is version 0.9.3

The consumer's config is pretty trivial I think:

  • consumer_timeout_ms=-1
  • auto_commit_interval_messages=50
  • fetch_message_max_bytes=10000000
  • fetch_min_bytes=1
  • fetch_wait_max_ms=100
  • socket_timeout_ms=30 * 1000
  • auto_commit_enable=True
  • auto_commit_interval_ms=10 * 1000
  • auto_offset_reset='smallest'

Here is the contents of kafka._offsets - The topic itself has 512 partitions with a replication factor of 3 spread across 4 brokers

OffsetsStruct(
    fetch={(b'listen.prod.puller.moreover', 306): 104193,
           (b'listen.prod.puller.moreover', 205): 104177,
           (b'listen.prod.puller.moreover', 104): 104060,
           (b'listen.prod.puller.moreover', 508): 104040,
           (b'listen.prod.puller.moreover', 407): 109449},
    highwater={(b'listen.prod.puller.moreover', 306): None,
               (b'listen.prod.puller.moreover', 205): None,
               (b'listen.prod.puller.moreover', 104): None,
               (b'listen.prod.puller.moreover', 508): None,
               (b'listen.prod.puller.moreover', 407): None},
    commit={(b'listen.prod.puller.moreover', 306): 104193,
            (b'listen.prod.puller.moreover', 205): 104177,
            (b'listen.prod.puller.moreover', 104): 104060,
            (b'listen.prod.puller.moreover', 508): 104040,
            (b'listen.prod.puller.moreover', 407): 109449},
    task_done={(b'listen.prod.puller.moreover', 306): None,
               (b'listen.prod.puller.moreover', 205): None,
               (b'listen.prod.puller.moreover', 104): None,
               (b'listen.prod.puller.moreover', 508): None,
               (b'listen.prod.puller.moreover', 407): None})

Here's the traceback

Traceback (most recent call last):
  File "/home/ubuntu/.virtualenvs/listen-parse-moreover_s3/lib/python3.4/site-packages/kuer/consumer.py", line 305, in _mark_done
    self.kafka.task_done(message)
  File "/home/ubuntu/.virtualenvs/listen-parse-moreover_s3/lib/python3.4/site-packages/kafka/consumer/kafka.py", line 495, in task_done
    prev_done = self._offsets.task_done[topic_partition]
KeyError: (b'listen.prod.puller.moreover', 450)

Let me know if you need more info

@dinoshauer
Contributor Author

I just changed retention.ms while I had a consumer running on the topic, and it started throwing KeyErrors after the logs were deleted.

@dpkp
Owner

dpkp commented May 13, 2015

thanks for all the info -- are you using set_topic_partitions? If so, what type of args are you passing in?

@dinoshauer
Contributor Author

I'm passing this to kafka.KafkaConsumer

*[('test.topic', 0), ('test.topic', 1), ('test.topic', 2)]

EDIT: Tabbed and published comment too early

@dpkp
Owner

dpkp commented Jun 5, 2015

This one is very strange. You have a consistent topic-partition list of:

(b'listen.prod.puller.moreover', 104)
(b'listen.prod.puller.moreover', 205)
(b'listen.prod.puller.moreover', 306)
(b'listen.prod.puller.moreover', 407)
(b'listen.prod.puller.moreover', 508)

but the task_done error happens when processing a message for topic-partition (b'listen.prod.puller.moreover', 450)

somehow your consumer is getting messages for a topic-partition that it wasn't configured for. I don't have any good ideas for how that could happen from the kafka-consumer side. Nonetheless, I can add a check in task_done to skip the message if the topic-partition is unrecognized. That should fix the crash, but really we need to figure out how this is happening. Is it possible for the kafka server to return messages for a topic-partition that we did not request specifically? Could this have something to do with segment files on the server side that are corrupted or out of sync?
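A sketch of the guard described above, written as a standalone function rather than a patch to kafka-python (the `Message` namedtuple and the function name are hypothetical stand-ins; only the `(topic, partition) -> offset` dict shape is taken from the traceback):

```python
import logging
from collections import namedtuple

logger = logging.getLogger(__name__)

# Hypothetical stand-in for a fetched message; only .topic, .partition,
# and .offset are assumed.
Message = namedtuple('Message', ['topic', 'partition', 'offset'])

def safe_task_done(task_done_offsets, message):
    """Skip (and warn) instead of raising KeyError when the message's
    topic-partition is not one the consumer was configured with.
    task_done_offsets mirrors the _offsets.task_done dict from the
    traceback: (topic, partition) -> last done offset or None."""
    topic_partition = (message.topic, message.partition)
    if topic_partition not in task_done_offsets:
        logger.warning('Unrecognized topic-partition %r; skipping task_done',
                       topic_partition)
        return False
    prev_done = task_done_offsets[topic_partition]
    if prev_done is None or message.offset > prev_done:
        task_done_offsets[topic_partition] = message.offset
    return True
```

With this, a message for partition 450 is logged and ignored instead of crashing the consumer, while configured partitions still have their done-offsets tracked.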

@dinoshauer
Contributor Author

Oh wow, just found this lying in my inbox. So sorry to not have answered. We ended up just skipping and warning on KeyErrors as well.
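The skip-and-warn workaround described here can be sketched as a thin wrapper on the caller's side (the `mark_done` name is illustrative, not from the library; `consumer` is assumed to expose a kafka-python style `task_done()`):

```python
import logging

logger = logging.getLogger(__name__)

def mark_done(consumer, message):
    """Call task_done, but warn and skip on KeyError rather than
    letting the consumer loop crash."""
    try:
        consumer.task_done(message)
        return True
    except KeyError:
        logger.warning('task_done raised KeyError for (%r, %r); skipping',
                       getattr(message, 'topic', None),
                       getattr(message, 'partition', None))
        return False
```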

To add to your speculation, I don't believe any segments have been corrupted or out of sync, no

@dpkp
Owner

dpkp commented Jan 27, 2016

We've dropped task_done from the new KafkaConsumer interface so I'm going to close this as wont-fix.

@dpkp dpkp closed this as completed Jan 27, 2016