Skip to content

Clearly document that KafkaConsumer is not thread safe #1105

@ecksun

Description

@ecksun

The question is basically in the title.

On the one hand, we had MultiprocessConsumer which I assume must be, but it has been deprecated in favor of KafkaConsumer, which leads me to think that KafkaConsumer also is thread safe.

On the other hand I just got this error using it:

Traceback (most recent call last):
  File "/usr/lib/python3.5/threading.py", line 914, in _bootstrap_inner
    self.run()
  File "code/node_list/reaction.py", line 27, in run
  File "code/env/local/lib/python3.5/site-packages/event_consumers/event_consumer.py", line 28, in __iter__
    for message in self.consumer:
  File "code/env/local/lib/python3.5/site-packages/kafka/consumer/group.py", line 964, in __next__
    return next(self._iterator)
  File "code/env/local/lib/python3.5/site-packages/kafka/consumer/group.py", line 904, in _message_generator
    self._client.poll(timeout_ms=poll_ms, sleep=True)
  File "code/env/local/lib/python3.5/site-packages/kafka/client_async.py", line 525, in poll
    responses.extend(self._poll(timeout, sleep=sleep))
  File "code/env/local/lib/python3.5/site-packages/kafka/client_async.py", line 578, in _poll
    response = conn.recv()  # Note: conn.recv runs callbacks / errbacks
  File "code/env/local/lib/python3.5/site-packages/kafka/conn.py", line 607, in recv
    response = self._recv()
  File "code/env/local/lib/python3.5/site-packages/kafka/conn.py", line 654, in _recv
    raise Errors.KafkaError('this should not happen - are you threading?')
kafka.errors.KafkaError: KafkaError: this should not happen - are you threading?

Which suggest it is not.

Edit: this example also clearly indicates that using threading should be fine

Can I safely use KafkaConsumer in a threaded environment?

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions