-
Notifications
You must be signed in to change notification settings - Fork 1.4k
Closed
Description
I wrote some python codes to retrieve Kafka messages from brokers. Below is my code.
def read_message(topic ,partition, broker, from_offset, until_offset):
clientName = "Client_" + str(topic) + "_" + str(partition)
consumer = KafkaConsumer(bootstrap_servers=broker, client_id=clientName)
topicPartition = TopicPartition(topic,partition)
consumer.assign([topicPartition])
consumer.seek(topicPartition, from_offset)
result_list = []
for msg in consumer:
if msg.offset >= until_offset:
break
else:
result_list.append(msg)
return result_list
"broker" here is the lead broker that I find in other method.
"partition" is the partition that messages should be in.
"from_offset" and "until_offset" is the offset range that I want to retrieve messages in.
However, after "result_list" is created, it hangs forever. This function used to work properly. But sometimes it hangs.
I don't know what is the problem. It works in most cases. However, it hangs sometimes.
manupatel007
Metadata
Metadata
Assignees
Labels
No labels