
Consumer crashes when consuming messages that are compressed with zstd #2044

Closed
Green-Angry-Bird opened this issue May 4, 2020 · 4 comments


Green-Angry-Bird commented May 4, 2020

We have been working to resolve this issue for a few days now. The consumer works in most circumstances, but we have figured out that it crashes whenever a message compressed with zstd arrives.

The stack trace follows:

```
    return self.next_v2()
  File "/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/site-packages/kafka/consumer/group.py", line 1200, in next_v2
    return next(self._iterator)
  File "/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/site-packages/kafka/consumer/group.py", line 1115, in _message_generator_v2
    record_map = self.poll(timeout_ms=timeout_ms, update_offsets=False)
  File "/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/site-packages/kafka/consumer/group.py", line 654, in poll
    records = self._poll_once(remaining, max_records, update_offsets=update_offsets)
  File "/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/site-packages/kafka/consumer/group.py", line 707, in _poll_once
    records, _ = self._fetcher.fetched_records(max_records, update_offsets=update_offsets)
  File "/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/site-packages/kafka/consumer/fetcher.py", line 344, in fetched_records
    self._next_partition_records = self._parse_fetched_data(completion)
  File "/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/site-packages/kafka/consumer/fetcher.py", line 816, in _parse_fetched_data
    unpacked = list(self._unpack_message_set(tp, records))
  File "/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/site-packages/kafka/consumer/fetcher.py", line 467, in _unpack_message_set
    for record in batch:
  File "/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/site-packages/kafka/record/default_records.py", line 271, in __iter__
    self._maybe_uncompress()
  File "/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/site-packages/kafka/record/default_records.py", line 180, in _maybe_uncompress
    self._assert_has_codec(compression_type)
  File "/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/site-packages/kafka/record/default_records.py", line 114, in _assert_has_codec
    if not checker():
UnboundLocalError: local variable 'checker' referenced before assignment
```
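The traceback points at a pattern where a local `checker` is only assigned for compression codecs the installed version recognizes. A minimal, self-contained sketch of that failure mode (illustrative only, not kafka-python's actual source; the codec ids here are made up):

```python
# Sketch of the failure mode: `checker` is bound only in the branches for
# known codecs, so an unhandled compression type (zstd here) falls through
# and the final call touches an unassigned local.
CODEC_GZIP, CODEC_SNAPPY, CODEC_ZSTD = 1, 2, 4  # hypothetical codec ids

def assert_has_codec(compression_type):
    if compression_type == CODEC_GZIP:
        checker, name = (lambda: True), "gzip"
    elif compression_type == CODEC_SNAPPY:
        checker, name = (lambda: True), "snappy"
    # No branch handles CODEC_ZSTD, so nothing assigns `checker`...
    if not checker():  # ...and this line raises UnboundLocalError for zstd
        raise RuntimeError("Libraries for %s compression codec not found" % name)

assert_has_codec(CODEC_ZSTD)  # UnboundLocalError: local variable 'checker' ...
```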

RuiLoureiro commented Nov 11, 2021

I've also stumbled onto this issue. Did you manage to fix it?
EDIT: pip install zstandard did the trick. I only found this mentioned in PR #2021.
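A quick way to verify the fix took effect (a minimal check, assuming your kafka-python version exposes has_zstd in kafka.codec, as the 2.0.x releases with zstd support do):

```python
from kafka.codec import has_zstd  # assumed location in kafka-python 2.0.x

# Returns True once the `zstandard` package is importable, False otherwise.
print(has_zstd())
```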


metron2 commented Mar 22, 2022

This is missing from the documentation.


efung commented Jul 5, 2022

> This is missing from the documentation.

Yes, the docs are definitely misleading. In #2123, extras were defined so that pip install kafka-python[zstd] would pull in the zstandard dependency as well. However, that PR was merged after the 2.0.2 release, and there has been no subsequent release of kafka-python.
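Until a release ships those extras, the practical workaround is to depend on both packages explicitly (pip install kafka-python zstandard) and fail fast if the codec is missing. A sketch under those assumptions; the topic name and broker address are placeholders, and has_zstd is assumed to live in kafka.codec as in 2.0.x:

```python
from kafka import KafkaConsumer
from kafka.codec import has_zstd  # assumed location in kafka-python 2.0.x

# Fail fast with a clear message instead of crashing mid-consumption.
if not has_zstd():
    raise RuntimeError("zstd-compressed topics require the 'zstandard' package")

# Placeholder topic and broker address.
consumer = KafkaConsumer("my-topic", bootstrap_servers="localhost:9092")
for message in consumer:
    print(message.value)
```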

Green-Angry-Bird (Author) commented

The root cause of my original issue was #2021. Sorry, I should have linked it two years ago.

This issue can be closed now. Duplicate of #2021.
