Getting timestamp of a specific offset #1187
I'm trying to achieve something that I feel should be simple, and usually one of two things happens: either it really is, or I'm missing something obvious.
Anyway, I'm trying to do some monitoring for our Kafka clusters. We obviously monitor the lag, but one thing we're missing is the answer to the question "how old is the message we haven't yet consumed from that queue?". Since 0.10 embeds a timestamp in every message, I figured this would be easy. Well, I'm asking you guys to tell me if it isn't, or if I'm just dumb :-D
My approach to this is: knowing the committed offset for a given topic and group, let's try to do this:
```python
import kafka

kafka_host = "localhost:9092"
topic = "topic"
offset = 21

client = kafka.KafkaClient(kafka_host)
payload = kafka.common.FetchRequestPayload(topic, 0, offset, 100000000)
response = client.send_fetch_request(payloads=[payload])
print(response)
```
Alas... although I can see the timestamp from the console consumer, or even from KafkaConsumer when consuming these messages, whatever I tried, this always returns timestamp == None.
Is this expected?
You are getting this because that API is deprecated, and it does not support the request version needed to return the timestamp.
Try something like:
```python
import kafka
from kafka.errors import OffsetOutOfRangeError

kafka_host = "localhost:9092"
topic = "topic"
partition = 0
offset = 1

consumer = kafka.KafkaConsumer(
    bootstrap_servers=kafka_host,
    group_id=None,             # We don't need group management
    auto_offset_reset="none",  # Don't reset offset if it's incorrect
    consumer_timeout_ms=500    # timeout for next() call below
)
tp = kafka.TopicPartition(topic, partition)
consumer.assign([tp])
consumer.seek(tp, offset)

try:
    msg = next(consumer)
except (StopIteration, OffsetOutOfRangeError):
    print("Message does not exist")
else:
    print(msg.timestamp)
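Once you have `msg.timestamp`, the "how old is the oldest unconsumed message" metric is just a subtraction: Kafka timestamps are milliseconds since the epoch (and `msg.timestamp_type` tells you whether it is producer CreateTime or broker LogAppendTime). A minimal sketch of that last step, with no broker required:

```python
import time

def message_age_seconds(timestamp_ms, now_ms=None):
    """Age of a message, given its Kafka timestamp in ms since the epoch."""
    if now_ms is None:
        now_ms = int(time.time() * 1000)
    return (now_ms - timestamp_ms) / 1000.0

# A message stamped 90 000 ms before "now" is 90 seconds old:
print(message_age_seconds(1_000_000, now_ms=1_090_000))  # 90.0
```

Feed it the timestamp of the message sitting at the committed offset (fetched as above) and you get the age of the queue's head in seconds.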
That's what I feared.
Thanks for the help :-)
I want to write batches of messages out to files on a schedule, but when I run the consumer it sometimes blocks waiting for the next message, and because of this my file creation doesn't happen on time. Does anyone know how this can be achieved with kafka-python?
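One way to avoid blocking indefinitely is to drive the consumer with `poll(timeout_ms=...)` instead of iterating with `next()`: `KafkaConsumer.poll` returns an empty dict when nothing arrives within the timeout, so your loop regains control and can flush or rotate files on schedule. A sketch of the idea, with the broker-dependent part in comments (host, topic, and the `write_to_current_file`/`rotate_file` helpers are assumptions for illustration):

```python
import time

def should_rotate(last_flush, now, interval_s=60):
    """True when the current output file is due to be flushed/rotated."""
    return now - last_flush >= interval_s

# Poll-driven loop (needs a running broker, so shown here as a comment):
#
# import kafka
# consumer = kafka.KafkaConsumer("topic", bootstrap_servers="localhost:9092")
# last_flush = time.time()
# while True:
#     records = consumer.poll(timeout_ms=500)   # {} if nothing arrived in time
#     for tp, messages in records.items():
#         write_to_current_file(messages)       # hypothetical helper
#     if should_rotate(last_flush, time.time()):
#         rotate_file()                         # hypothetical helper
#         last_flush = time.time()
```

Because `poll` returns within the timeout even on an idle topic, the rotation check runs on time regardless of message flow.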