This repository has been archived by the owner on Mar 24, 2021. It is now read-only.

consumer.reset_offsets is not working as expected when using datetime.datetime #976

Open
wkelongws opened this issue Nov 5, 2019 · 0 comments

@wkelongws

Based on the documentation (https://pykafka.readthedocs.io/en/latest/api/simpleconsumer.html), we should be able to pass a datetime.datetime to consumer.reset_offsets to seek to an offset by time. However, it doesn't behave as expected. See the following example:

import pykafka
print(pykafka.__version__)

output:

'2.8.0'

from datetime import datetime
import time

from pykafka import KafkaClient
from pykafka.common import OffsetType

client = KafkaClient(hosts="<ip:port>")
topic = client.topics['test1']

# produce 10 messages, each carrying its UTC creation time as the value
producer = topic.get_producer()
for m_id in range(10):
    msg = datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S')
    producer.produce(msg.encode("utf-8"))
    time.sleep(5)
producer.stop()  # flush pending messages and shut the producer down

# create a consumer that starts at the earliest offset and print the 10 messages
consumer = topic.get_simple_consumer(auto_offset_reset=OffsetType.EARLIEST,
    reset_offset_on_start=True,
    consumer_timeout_ms=5000)  # stop iterating after 5 s without new messages

for message in consumer:
    if message is not None:
        raw_string = message.value.decode("utf-8")
        print(message.offset, ':', raw_string)

output:

0 : 2019-11-05 00:56:11
1 : 2019-11-05 00:56:17
2 : 2019-11-05 00:56:22
3 : 2019-11-05 00:56:27
4 : 2019-11-05 00:56:32
5 : 2019-11-05 00:56:37
6 : 2019-11-05 00:56:42
7 : 2019-11-05 00:56:47
8 : 2019-11-05 00:56:52
9 : 2019-11-05 00:56:57

I want to query historical data by resetting the offset based on a timestamp. For example, if I seek to 2019-11-05 00:56:32, I would expect the offset to be reset to 4 (or something close, allowing for network delays).
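The expected result follows the semantics kafka-python documents for offsets_for_times: the earliest offset whose timestamp is greater than or equal to the target. Below is a minimal pure-Python sketch of that lookup over the (offset, value) pairs printed above. It assumes the timestamp stored in each message value stands in for the record's Kafka timestamp, which may differ slightly from the timestamp the producer or broker actually assigned. The helper name `earliest_offset_at_or_after` is hypothetical.

```python
from bisect import bisect_left
from datetime import datetime

# (offset, value) pairs copied from the consumer output above.
# Assumption: the value string approximates the record's Kafka timestamp.
MESSAGES = [
    (0, "2019-11-05 00:56:11"),
    (1, "2019-11-05 00:56:17"),
    (2, "2019-11-05 00:56:22"),
    (3, "2019-11-05 00:56:27"),
    (4, "2019-11-05 00:56:32"),
    (5, "2019-11-05 00:56:37"),
    (6, "2019-11-05 00:56:42"),
    (7, "2019-11-05 00:56:47"),
    (8, "2019-11-05 00:56:52"),
    (9, "2019-11-05 00:56:57"),
]


def earliest_offset_at_or_after(messages, target):
    """Hypothetical helper: return the first offset whose timestamp is >= target,
    or None if every message is older than target (like offsets_for_times)."""
    timestamps = [datetime.strptime(ts, "%Y-%m-%d %H:%M:%S") for _, ts in messages]
    # bisect_left requires timestamps sorted ascending, which holds for one partition here
    i = bisect_left(timestamps, target)
    return messages[i][0] if i < len(messages) else None


print(earliest_offset_at_or_after(MESSAGES, datetime(2019, 11, 5, 0, 56, 32)))  # 4
```

A target that falls between two messages (say 00:56:30) also resolves to offset 4, and a target later than every message yields None.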

# seek by timestamp
partition_offset_pairs = [(p, datetime.fromisoformat('2019-11-05 00:56:32')) for p in consumer.partitions.values()]
consumer.reset_offsets(partition_offsets=partition_offset_pairs)

print(consumer.held_offsets)

output:

{0: -2}

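The output is not what I expected: held_offsets reports -2 (the same value as OffsetType.EARLIEST) for partition 0 instead of an offset near 4.
I have run a few more experiments and have not seen reset_offsets behave in any reasonable way when given (pykafka.partition.Partition, datetime.datetime) pairs.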

Note: I can run the same historical query (seeking by timestamp) in kafka-python, using consumer.offsets_for_times, with no issues.
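For comparison, here is a sketch of the kafka-python approach. It assumes kafka-python is installed and the brokers are 0.10.1 or newer (offsets_for_times needs message timestamps). `offsets_for_times` takes a dict of `TopicPartition` to a millisecond epoch timestamp and returns, per partition, an `OffsetAndTimestamp` or None. The helpers `to_epoch_ms` and `seek_to_time` are hypothetical names. Naive datetimes such as `datetime.utcnow()` are explicitly treated as UTC, because calling `.timestamp()` on a naive value assumes local time and would shift the target.

```python
from datetime import datetime, timezone


def to_epoch_ms(dt):
    """Convert a datetime to milliseconds since the Unix epoch.

    Naive datetimes are assumed to be UTC (matching datetime.utcnow());
    aware datetimes are converted using their own tzinfo.
    """
    if dt.tzinfo is None:
        dt = dt.replace(tzinfo=timezone.utc)
    return int(dt.timestamp() * 1000)


def seek_to_time(bootstrap, topic, partition, dt):
    """Hypothetical helper: position a kafka-python consumer at the first
    offset whose timestamp is >= dt on one partition, and return it."""
    # Imported here so the sketch loads even where kafka-python is absent.
    from kafka import KafkaConsumer, TopicPartition

    consumer = KafkaConsumer(bootstrap_servers=bootstrap, enable_auto_commit=False)
    tp = TopicPartition(topic, partition)
    consumer.assign([tp])
    found = consumer.offsets_for_times({tp: to_epoch_ms(dt)})[tp]
    if found is None:
        # No message at or after dt: start from the end of the partition.
        consumer.seek_to_end(tp)
    else:
        consumer.seek(tp, found.offset)
    return consumer
```

With the data above, something like `seek_to_time("<ip:port>", "test1", 0, datetime.fromisoformat("2019-11-05 00:56:32"))` would be expected to position the consumer near offset 4, subject to how the records were timestamped.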
