Description
We're trying to write a Kafka consumer in Python that uses manual partition assignment and an out-of-band REST API call to a producer service to synchronize to the end of the Kafka stream.
We have a simple pattern that implements this, and the equivalent Java implementation works (using the `org.apache.kafka` API). However, the Python version hangs:
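```python
from kafka import TopicPartition

# `consumer` is an existing KafkaConsumer and `topic` is the topic name.
tp = TopicPartition(topic, 0)
consumer.assign([tp])
consumer.seek_to_end(tp)
position = consumer.position(tp)  # Hangs indefinitely.
```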
It appears that the Python implementation of `KafkaConsumer.position()` relies on `_update_fetch_positions()` to, well, actually update the fetch positions, but for some reason it only sends the request to the Fetcher (via `reset_offsets_if_needed()`) and never processes the response to that request.
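Since the problem seems to sit in the offset reset that `seek_to_end()` schedules, one way to sidestep it may be to look up the end offset explicitly with `KafkaConsumer.end_offsets()` and then `seek()` to it, so that no reset is left pending when `position()` is called. This is a sketch under assumptions, not a confirmed fix: `sync_to_end` is a hypothetical helper name, it has not been run against commit 512d0a0, and `consumer` is assumed to already have `tp` assigned.

```python
def sync_to_end(consumer, tp):
    """Hypothetical helper: find the partition's end offset explicitly and
    seek to it, instead of relying on seek_to_end() followed by position().

    Assumes `consumer` is a kafka-python KafkaConsumer that already has `tp`
    assigned via consumer.assign([tp]).
    """
    # end_offsets() returns a dict mapping each requested TopicPartition
    # to its end offset.
    end_offset = consumer.end_offsets([tp])[tp]
    # seek() sets the fetch position directly, so no offset reset is pending.
    consumer.seek(tp, end_offset)
    return end_offset
```

`end_offsets()` on its own does not move the consumer; the explicit `seek()` is what sets the position, after which `position(tp)` should return that offset without needing a reset.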
As a workaround, calling `poll()` with an arbitrary timeout appears to trigger that processing.
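The `poll()` workaround can be wrapped in a small helper. This is a hypothetical sketch (`position_after_seek_to_end` is not a kafka-python function) that assumes `consumer` is a KafkaConsumer with `tp` already assigned, and that `position()` accepts `timeout_ms` as in the tests in this issue. It returns the records from `poll()` alongside the position, because if new messages arrive during the poll they would otherwise be silently dropped.

```python
def position_after_seek_to_end(consumer, tp, poll_timeout_ms=1000, position_timeout_ms=1000):
    """Hypothetical helper wrapping the poll() workaround.

    Assumes `consumer` is a kafka-python KafkaConsumer that already has `tp`
    assigned via consumer.assign([tp]). Returns (position, records) so that
    any records fetched during poll() are handed back to the caller.
    """
    consumer.seek_to_end(tp)
    # poll() drives the consumer's network loop, which (per the behaviour
    # observed in this issue) lets the pending offset reset complete.
    records = consumer.poll(timeout_ms=poll_timeout_ms)
    # position() should now return the resolved offset instead of None.
    position = consumer.position(tp, timeout_ms=position_timeout_ms)
    return position, records
```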
I was able to reproduce this bug as an integration test within the kafka-python repo. Adding these two test cases to `test_consumer_integration.py` at commit 512d0a0 clearly illustrates both the bug and its workaround:
```python
# Relies on the imports and fixtures already available in test_consumer_integration.py
# (pytest, TopicPartition, env_kafka_version, kafka_consumer_factory, topic, send_messages).

@pytest.mark.skipif(not env_kafka_version(), reason="No KAFKA_VERSION set")
def test_kafka_consumer_position_after_seek_to_end(kafka_consumer_factory, topic, send_messages):
    send_messages(range(0, 10), partition=0)

    # Start a consumer with manual partition assignment.
    consumer = kafka_consumer_factory(
        topics=(),
        group_id=None,
        enable_auto_commit=False,
    )
    tp = TopicPartition(topic, 0)
    consumer.assign([tp])

    # Seek to the end of the partition, and call position() to synchronize the
    # partition's offset without calling poll().
    consumer.seek_to_end(tp)
    position = consumer.position(tp, timeout_ms=1000)

    # Verify we got the expected position
    assert position == 10, f"Expected position 10, got {position}"


@pytest.mark.skipif(not env_kafka_version(), reason="No KAFKA_VERSION set")
def test_kafka_consumer_position_after_seek_to_end_with_poll(kafka_consumer_factory, topic, send_messages):
    send_messages(range(0, 10), partition=0)

    # Start a consumer with manual partition assignment.
    consumer = kafka_consumer_factory(
        topics=(),
        group_id=None,
        enable_auto_commit=False,
    )
    tp = TopicPartition(topic, 0)
    consumer.assign([tp])

    # Seek to the end of the partition, poll() for an arbitrary amount of time
    # to process offsets, and then call position() to synchronize.
    consumer.seek_to_end(tp)
    consumer.poll(timeout_ms=1000)
    position = consumer.position(tp, timeout_ms=1000)

    # Verify we got the expected position
    assert position == 10, f"Expected position 10, got {position}"
```
`test_kafka_consumer_position_after_seek_to_end_with_poll` passes, but `test_kafka_consumer_position_after_seek_to_end` fails:
```
FAILED test/integration/test_consumer_integration.py::test_kafka_consumer_position_after_seek_to_end - AssertionError: Expected position 10, got None
```