Long running processes causing Heartbeat session expirations #2422

Open
saurhkumar opened this issue Dec 30, 2023 · 2 comments

@saurhkumar

I am running a long-running job, and it is causing the heartbeat timeout. Specifically, this is the code:

import logging
import time
from kafka import KafkaConsumer, KafkaProducer, OffsetAndMetadata, TopicPartition
import kafka

print(kafka.version.__version__)
kafkaVersion = kafka.version
logger = logging.getLogger(__name__)

hosts = "localhost:9092"
topic = "blobMetadata"

consumer = KafkaConsumer(
    topic,
    group_id='test1',
    bootstrap_servers=hosts,
    max_poll_records=1,
    enable_auto_commit=False,
    max_poll_interval_ms=1000 * 3
)

producer = KafkaProducer(
    bootstrap_servers=hosts,
    key_serializer=lambda x: x.encode(),
    value_serializer=lambda x: x.encode()
)

logging.basicConfig(level=logging.INFO)
counter = 0
for consumerRecord in consumer:
    if consumerRecord:
        currentOffset: int = consumerRecord.offset
        meta = consumer.partitions_for_topic(topic)
        tp = TopicPartition(consumerRecord.topic, consumerRecord.partition)
        om = OffsetAndMetadata(consumerRecord.offset + 1, consumerRecord.timestamp)
        options = {tp: om}
        print(f'got new message ---------------------: {counter}')
        print(f'processing message --------------------  {consumerRecord}')
        time.sleep(1 * 10)  # just mimic long-running job in second, this process in real life can run for 45 min to 2 hours.
        print('committing message --------- ')
        consumer.commit(options)
        counter = counter + 1

These are the logs:

2.0.2
INFO:kafka.cluster:Group coordinator for test1 is BrokerMetadata(nodeId='coordinator-1', host='localhost', port=9092, rack=None)
INFO:kafka.coordinator:Discovered coordinator coordinator-1 for group test1
INFO:kafka.coordinator:Starting new heartbeat thread
INFO:kafka.coordinator.consumer:Revoking previously assigned partitions set() for group test1
INFO:kafka.conn:<BrokerConnection node_id=coordinator-1 host=localhost:9092 <connecting> [IPv6 ('::1', 9092, 0, 0)]>: connecting to localhost:9092 [('::1', 9092, 0, 0) IPv6]
INFO:kafka.conn:<BrokerConnection node_id=coordinator-1 host=localhost:9092 <connecting> [IPv6 ('::1', 9092, 0, 0)]>: Connection complete.
INFO:kafka.conn:<BrokerConnection node_id=bootstrap-0 host=localhost:9092 <connected> [IPv6 ('::1', 9092, 0, 0)]>: Closing connection. 
INFO:kafka.coordinator:(Re-)joining group test1
INFO:kafka.coordinator:Elected group leader -- performing partition assignments using range
INFO:kafka.conn:<BrokerConnection node_id=1 host=localhost:9092 <connecting> [IPv6 ('::1', 9092, 0, 0)]>: connecting to localhost:9092 [('::1', 9092, 0, 0) IPv6]
INFO:kafka.conn:<BrokerConnection node_id=1 host=localhost:9092 <connecting> [IPv6 ('::1', 9092, 0, 0)]>: Connection complete.
INFO:kafka.coordinator:Successfully joined group test1 with generation 22
INFO:kafka.consumer.subscription_state:Updated partition assignment: [TopicPartition(topic='blobMetadata', partition=0), TopicPartition(topic='blobMetadata', partition=29)]
INFO:kafka.coordinator.consumer:Setting newly assigned partitions {TopicPartition(topic='blobMetadata', partition=24), 
-- some logs for the partition assignment --
got new message ---------------------: 0
processing message --------------------  ConsumerRecord(topic='blobMetadata', partition=0, offset=25, timestamp=1703950794479, timestamp_type=0, key=b'123', value=b'{}', headers=[], checksum=None, serialized_key_size=3, serialized_value_size=2, serialized_header_size=-1)
WARNING:kafka.coordinator:Heartbeat poll expired, leaving group
INFO:kafka.coordinator:Leaving consumer group (test1).
committing message --------- 
Traceback (most recent call last):
  File "**\kafkaTest.py", line 41, in <module>
    consumer.commit(options)
  File "**\Lib\site-packages\kafka\consumer\group.py", line 527, in commit
    self._coordinator.commit_offsets_sync(offsets)
  File "**\Lib\site-packages\kafka\coordinator\consumer.py", line 521, in commit_offsets_sync
    raise future.exception # pylint: disable-msg=raising-bad-type
    ^^^^^^^^^^^^^^^^^^^^^^
kafka.errors.CommitFailedError: CommitFailedError: Commit cannot be completed since the group has already
            rebalanced and assigned the partitions to another member.
            This means that the time between subsequent calls to poll()
            was longer than the configured max_poll_interval_ms, which
            typically implies that the poll loop is spending too much
            time message processing. You can address this either by
            increasing the rebalance timeout with max_poll_interval_ms,
            or by reducing the maximum size of batches returned in poll()
            with max_poll_records.
            
INFO:kafka.producer.kafka:Closing the Kafka producer with 0 secs timeout.
INFO:kafka.producer.kafka:Proceeding to force close the producer since pending requests could not be completed within timeout 0.
INFO:kafka.coordinator:Stopping heartbeat thread

I saw a few related threads (#1491, #948), but I still cannot figure out what is wrong with the above code.

My understanding is that the consumer thread should send heartbeats in the background while message processing is ongoing. Not sure what I am doing wrong.

Kafka python library version: 2.0.2
Platform: Windows 10
Kafka: Running as docker (confluentinc/cp-kafka:7.2.1)

@jeffwidman @dpkp can you suggest what is wrong?

@308299160

I got the same error.

@psc0606

psc0606 commented Feb 20, 2024

Try not setting max_poll_interval_ms at all, or give it a larger value.
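
For context, the script above sets max_poll_interval_ms to 3 seconds (1000 * 3) while each message takes 10 seconds to process, so the gap between two poll() calls exceeds the limit. That matches the "Heartbeat poll expired, leaving group" warning in the logs. Below is a minimal sketch of the "larger value" option, assuming the slowest job duration is known in advance. The helper names poll_interval_ms and build_consumer and the headroom factor are made up for illustration and are not part of kafka-python; only the KafkaConsumer keyword arguments come from the original script.

```python
def poll_interval_ms(max_processing_seconds, headroom=2.0):
    """Return a max_poll_interval_ms value that leaves headroom above the
    longest expected gap between two poll() calls.

    Hypothetical helper: the headroom factor is an assumption, not a
    kafka-python recommendation.
    """
    if max_processing_seconds <= 0 or headroom < 1.0:
        raise ValueError("need a positive processing time and headroom >= 1")
    return int(max_processing_seconds * headroom * 1000)


def build_consumer(topic, hosts, group_id, max_processing_seconds):
    """Create a consumer with the same settings as the original script,
    but with a poll interval sized to the slowest job (hypothetical helper)."""
    # Imported lazily so poll_interval_ms can be used without kafka-python.
    from kafka import KafkaConsumer

    return KafkaConsumer(
        topic,
        group_id=group_id,
        bootstrap_servers=hosts,
        max_poll_records=1,          # one record per poll, as in the original
        enable_auto_commit=False,    # offsets are committed manually
        # Must exceed the longest time spent processing between polls.
        max_poll_interval_ms=poll_interval_ms(max_processing_seconds),
    )
```

For jobs of up to two hours, build_consumer("blobMetadata", "localhost:9092", "test1", 2 * 60 * 60) would request a four-hour poll interval. One trade-off, as far as I understand it: with a very large value the group also takes longer to notice a consumer whose processing has hung, so the value is best kept close to the real worst case.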
