-
Notifications
You must be signed in to change notification settings - Fork 1.4k
Closed
Description
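I'm wondering if there is a version incompatibility between kafka-python and the installed `snappy` module: `codec.has_snappy()` returns True, but consuming a snappy-compressed message fails.
Here's my session and traceback: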
In [1]: from kafka import KafkaConsumer, codec
In [2]: codec.has_snappy()
Out[2]: True
In [3]: consumer = KafkaConsumer('xxx', bootstrap_servers =['xx.xxx.xx.xxx:xxxx']) # (substituted x's)
In [4]: x = consumer.next()
---------------------------------------------------------------------------
AttributeError Traceback (most recent call last)
<ipython-input-4-9399a37ddcd6> in <module>()
----> 1 x = consumer.next()
/opt/anaconda-2.3.0/lib/python2.7/site-packages/six.pyc in next(self)
533
534 def next(self):
--> 535 return type(self).__next__(self)
536
537 callable = callable
/opt/anaconda-2.3.0/lib/python2.7/site-packages/kafka/consumer/group.pyc in __next__(self)
795 self._set_consumer_timeout()
796 try:
--> 797 return next(self._iterator)
798 except StopIteration:
799 self._iterator = None
/opt/anaconda-2.3.0/lib/python2.7/site-packages/kafka/consumer/group.pyc in _message_generator(self)
756 continue
757
--> 758 for msg in self._fetcher:
759 yield msg
760 if time.time() > timeout_at:
/opt/anaconda-2.3.0/lib/python2.7/site-packages/six.pyc in next(self)
533
534 def next(self):
--> 535 return type(self).__next__(self)
536
537 callable = callable
/opt/anaconda-2.3.0/lib/python2.7/site-packages/kafka/consumer/group.pyc in __next__(self)
795 self._set_consumer_timeout()
796 try:
--> 797 return next(self._iterator)
798 except StopIteration:
799 self._iterator = None
/opt/anaconda-2.3.0/lib/python2.7/site-packages/kafka/consumer/group.pyc in _message_generator(self)
756 continue
757
--> 758 for msg in self._fetcher:
759 yield msg
760 if time.time() > timeout_at:
/opt/anaconda-2.3.0/lib/python2.7/site-packages/six.pyc in next(self)
533
534 def next(self):
--> 535 return type(self).__next__(self)
536
537 callable = callable
/opt/anaconda-2.3.0/lib/python2.7/site-packages/kafka/consumer/fetcher.pyc in __next__(self)
437 self._iterator = self._message_generator()
438 try:
--> 439 return next(self._iterator)
440 except StopIteration:
441 self._iterator = None
/opt/anaconda-2.3.0/lib/python2.7/site-packages/kafka/consumer/fetcher.pyc in _message_generator(self)
405 log.log(0, "Returning fetched records at offset %d for assigned"
406 " partition %s", position, tp)
--> 407 for msg in self._unpack_message_set(tp, messages):
408
409 # Because we are in a generator, it is possible for
/opt/anaconda-2.3.0/lib/python2.7/site-packages/kafka/consumer/fetcher.pyc in _unpack_message_set(self, tp, messages)
354 raise Errors.InvalidMessageError(msg)
355 elif msg.is_compressed():
--> 356 for record in self._unpack_message_set(tp, msg.decompress()):
357 yield record
358 else:
/opt/anaconda-2.3.0/lib/python2.7/site-packages/kafka/protocol/message.pyc in decompress(self)
70 elif codec == self.CODEC_SNAPPY:
71 assert has_snappy(), 'Snappy decompression unsupported'
---> 72 raw_bytes = snappy_decode(self.value)
73 elif codec == self.CODEC_LZ4:
74 assert has_lz4(), 'LZ4 decompression unsupported'
/opt/anaconda-2.3.0/lib/python2.7/site-packages/kafka/codec.pyc in snappy_decode(payload)
171 cursor += 4
172 end = cursor + block_size
--> 173 out.write(snappy.decompress(byt[cursor:end]))
174 cursor = end
175
AttributeError: 'module' object has no attribute 'decompress'
Metadata
Assignees
Labels
No labels