auto.offset.reset to largest not working #607
Comments
Can you paste your setup code? (config and creating the consumer) |
|
You have 4 messages in queueTest 0 and the consumer starts reading at the end (since you have auto.offset.reset=largest) so you will not see any messages. |
I produce new messages, but the problem is the same: the first time, the last committed offset is invalid, so when I try to consume these messages the fetched offset is invalid too. In the end it sets the committed offset to -1001 and logs "setting offset INVALID for commit", just like the first time. |
@jose9r You may want to try publishing messages while the script is running. That way you will receive the messages and also save a valid offset. Alternatively, auto.offset.reset=smallest lets you consume messages from the beginning when you haven't consumed any message from the topic yet, and start from the saved offset the next time. |
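The rule described in this comment can be sketched as a small function. This is an illustrative model only, not librdkafka's code: `resolve_start_offset`, its parameters, and the `OFFSET_INVALID` name are hypothetical. The value -1001 is taken from the debug log in this issue, where it appears next to "offset INVALID".

```python
# Sentinel for "no committed offset"; -1001 is the value shown in the
# debug log of this issue (assumed here to mean "offset INVALID").
OFFSET_INVALID = -1001


def resolve_start_offset(committed: int, log_start: int, log_end: int,
                         auto_offset_reset: str) -> int:
    """Return the offset a consumer would start fetching from (simplified model)."""
    # A valid committed offset inside the partition's range always wins;
    # auto.offset.reset is only consulted when there is nothing usable.
    if committed != OFFSET_INVALID and log_start <= committed <= log_end:
        return committed
    if auto_offset_reset == "smallest":
        return log_start   # replay the partition from the beginning
    if auto_offset_reset == "largest":
        return log_end     # wait for messages produced from now on
    raise ValueError(f"unsupported auto.offset.reset: {auto_offset_reset!r}")


# A new group on a partition holding offsets 0..3 (end offset 4):
print(resolve_start_offset(OFFSET_INVALID, 0, 4, "largest"))
print(resolve_start_offset(OFFSET_INVALID, 0, 4, "smallest"))
```

With `largest`, a brand-new group starts at the end offset, so the four messages already in the partition are never delivered to it.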
@jose9r Any luck with this? |
@edenhill no, the problem with the offset continues |
So even though you are producing new messages for the given partitions while the consumer is running no new messages are showing up in the consumer? |
@edenhill exactly! |
Okay, can you run your program with debug=topic,fetch and provide the logs, commenting where in the logs you are starting to produce new messages? |
Which librdkafka version are you on? You should be using latest master |
I have commit version e32127b (7 April). I'm going to test with latest master. |
With latest master, executing with a breakpoint on consume, and meanwhile producing new message: And next execution: %7|1460708776.919|TOPIC|rdkafka#consumer-2| New local topic: queueTest |
It looks like it doesn't commit your consumed offsets. |
$conf = new Conf();
|
I'm not sure what exits that loop, but you will need to call $consumer->close() before terminating your program so that it can commit final offsets. |
$consumer->close() doesn't exist. I think the problem is related to the php-rdkafka library. @arnaud-lb, can you read this thread please? Thank you! |
php-rdkafka calls close() when the KafkaConsumer object is destroyed (e.g. when the object is not referenced anymore). Note however that hitting CTRL-C or killing the program may not let PHP properly destroy objects. I think that tuning |
I changed auto.commit.interval.ms to 1000, but the problem continues. Another question: how can I see the consumer groups that have been created? With the kafka-consumer-groups.sh script from Kafka I only see groups created from the console, whose offsets are stored in ZooKeeper, not in Kafka's __consumer_offsets. Thanks. |
@jose9r Try setting auto.commit.interval.ms to 1000 and add a sleep(5) before exiting your program and see if that helps. If so it is a timing/destructor issue, otherwise there's something else wrong. |
I tried with auto.commit.interval.ms and sleep, and it doesn't work. I'm using the kafka-manager tool for monitoring, and when I run the program with auto.offset.reset set to largest, the group doesn't appear. If I set auto.offset.reset to smallest, the group appears and I can see the current offset and queue size. |
That is an indication that there are no new messages coming in on the topic, librdkafka will only commit offsets for received messages, so if you start a new group to read at the end of the partition ("latest") and no new messages are produced it will just idle at the end, without committing. |
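A toy model of the behaviour described in this comment, assuming the stored offset only moves when a message is actually delivered. The class and method names are hypothetical and are not part of librdkafka or php-rdkafka; the convention of committing the last consumed offset plus one is standard for Kafka consumers.

```python
# -1001 is the "INVALID" value seen in the debug log of this issue.
OFFSET_INVALID = -1001


class PartitionOffsets:
    """Simplified bookkeeping for one partition of one consumer group."""

    def __init__(self) -> None:
        self.stored = OFFSET_INVALID      # next offset to commit
        self.committed = OFFSET_INVALID   # last offset actually committed

    def on_message(self, offset: int) -> None:
        # Store the offset of the *next* message to read.
        self.stored = offset + 1

    def auto_commit(self) -> bool:
        """Commit the stored offset; return False when there is nothing to commit."""
        if self.stored == OFFSET_INVALID or self.stored == self.committed:
            return False
        self.committed = self.stored
        return True


idle = PartitionOffsets()
print(idle.auto_commit())   # no message was received, so nothing is committed

busy = PartitionOffsets()
busy.on_message(4)          # first message produced after the consumer started
print(busy.auto_commit(), busy.committed)
```

In this model a group that starts at the end of the partition and receives nothing never commits, which would match the group not showing up in a monitoring tool.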
Is this still an issue? |
Yes, the problem continues. |
Can you comment on my comment from 4 days ago about not producing new messages? |
I have the same problem. I want to receive only the latest messages, but I receive all of them. |
Hi, I'm using Kafka with this library and php-rdkafka from https://github.com/arnaud-lb/php-rdkafka/. When I create a KafkaConsumer with a new group, using setDefaultTopicConf with auto.offset.reset set to largest, I can't consume new messages. I suppose there is an error with the commit (auto.commit is enabled). The debug output with the topic,fetch options is here:
%7|1460457919.173|TOPIC|rdkafka#consumer-1| New local topic: queueTest
%7|1460457919.173|DESP|rdkafka#consumer-1| Adding desired topic queueTest [0]
%7|1460457919.173|FETCHSTART|rdkafka#consumer-1| List with 1 partition(s):
%7|1460457919.173|FETCHSTART|rdkafka#consumer-1| queueTest [0] offset INVALID
%7|1460457919.173|OFFSET|rdkafka#consumer-1| 10.0.0.203:9092/1: OffsetFetchRequest(v1) for 1/1 partition(s)
%7|1460457919.273|OFFSETFETCH|rdkafka#consumer-1| List with 1 partition(s):
%7|1460457919.273|OFFSETFETCH|rdkafka#consumer-1| queueTest [0] offset INVALID
%7|1460457919.273|OFFSET|rdkafka#consumer-1| Topic queueTest [0]: setting default offset INVALID
%7|1460457919.273|OFFSETFETCH|rdkafka#consumer-1| 10.0.0.203:9092/1: OffsetFetchResponse: queueTest [0] offset -1
%7|1460457919.273|OFFFETCH|rdkafka#consumer-1| 10.0.0.203:9092/1: OffsetFetch for 1/1 partition(s) returned Success
%7|1460457919.273|FETCHSTART|rdkafka#consumer-1| List with 1 partition(s):
%7|1460457919.273|FETCHSTART|rdkafka#consumer-1| queueTest [0] offset INVALID
%7|1460457919.273|FETCH|rdkafka#consumer-1| Start fetch for queueTest [0] in state none at offset INVALID (v2)
%7|1460457919.273|PARTSTATE|rdkafka#consumer-1| Partition queueTest [0] changed fetch state none -> offset-query
%7|1460457919.273|OFFSET|rdkafka#consumer-1| queueTest [0]: offset reset (at offset INVALID) to END: no previously committed offset available: Local: No offset stored
%7|1460457919.273|OFFSET|rdkafka#consumer-1| queueTest [0]: no current leader for partition, starting offset query timer for offset END
%7|1460457919.773|OFFSET|rdkafka#consumer-1| Topic queueTest [0]: timed offset query for END in state offset-query
%7|1460457919.773|OFFSET|rdkafka#consumer-1| queueTest [0]: no current leader for partition, starting offset query timer for offset END
%7|1460457919.957|STATE|rdkafka#consumer-1| Topic queueTest changed state unknown -> exists
%7|1460457919.957|PARTCNT|rdkafka#consumer-1| Topic queueTest partition count changed from 0 to 1
%7|1460457919.957|BRKDELGT|rdkafka#consumer-1| Broker 10.0.0.203:9092/1 is now leader for topic queueTest [0] with 0 messages (0 bytes) queued
%7|1460457919.957|BRKMIGR|rdkafka#consumer-1| Migrating topic queueTest [0] from (none) to 10.0.0.203:9092/1
%7|1460457919.957|METADATA|rdkafka#consumer-1| 10.0.0.203:9092/1: Requested topic queueTest seen in metadata
%7|1460457920.057|TOPBRK|rdkafka#consumer-1| 10.0.0.203:9092/1: Topic queueTest [0]: joining broker
%7|1460457920.273|OFFSET|rdkafka#consumer-1| Topic queueTest [0]: timed offset query for END in state offset-query
%7|1460457920.273|OFFREQ|rdkafka#consumer-1| 10.0.0.203:9092/1: Partition queueTest [0]: querying for logical offset END (opv 2)
%7|1460457920.273|OFFSET|rdkafka#consumer-1| 10.0.0.203:9092/1: OffsetRequest (1 offsets) for topic queueTest 0
%7|1460457920.273|PARTSTATE|rdkafka#consumer-1| Partition queueTest [0] changed fetch state offset-query -> offset-wait
%7|1460457920.360|OFFSET|rdkafka#consumer-1| Offset END request for queueTest [0] returned offset 4 (4)
%7|1460457920.360|PARTSTATE|rdkafka#consumer-1| Partition queueTest [0] changed fetch state offset-wait -> active
%7|1460457920.460|FETCHDEC|rdkafka#consumer-1| Topic queueTest [0]: fetch decide: updating to version 3 (was 0) at offset 4 (was 0)
%7|1460457920.460|FETCH|rdkafka#consumer-1| 10.0.0.203:9092/1: Topic queueTest [0] at offset 4 (0/100000 msgs, 0/1000000 kb queued) is fetchable:
%7|1460457920.460|FETCHADD|rdkafka#consumer-1| 10.0.0.203:9092/1: Added queueTest [0] to fetch list (1 entries, opv 3)
%7|1460457920.460|FETCH|rdkafka#consumer-1| 10.0.0.203:9092/1: Fetch topic queueTest [0] at offset 4 (v3)
%7|1460457920.460|FETCH|rdkafka#consumer-1| 10.0.0.203:9092/1: Fetch 1/1/1 toppar(s)
%7|1460457920.660|PAUSE|rdkafka#consumer-1| Library pausing 1 partition(s)
%7|1460457920.660|PAUSE|rdkafka#consumer-1| Pause queueTest [0]: at offset INVALID
%7|1460457920.660|OFFSET|rdkafka#consumer-1| Topic queueTest [0]: stored off -1001, committted off -1001
%7|1460457920.660|OFFSET|rdkafka#consumer-1| Topic queueTest [0]: setting offset INVALID for commit
%7|1460457920.660|FETCH|rdkafka#consumer-1| Stopping fetch for queueTest [0] in state active (v5)
%7|1460457920.660|PARTSTATE|rdkafka#consumer-1| Partition queueTest [0] changed fetch state active -> stopping
%7|1460457920.660|PARTSTATE|rdkafka#consumer-1| Partition queueTest [0] changed fetch state stopping -> stopped
%7|1460457920.660|DESP|rdkafka#consumer-1| Removing (un)desired topic queueTest [0]
%7|1460457920.660|RESUME|rdkafka#consumer-1| Library resuming 1 partition(s)
%7|1460457920.660|RESUME|rdkafka#consumer-1| Resume queueTest [0]: at offset INVALID
%7|1460457920.660|CONSUMER|rdkafka#consumer-1| Seek queueTest [0] to offset INVALID
%7|1460457920.660|DESTROY|rdkafka#consumer-1| Terminating instance
%7|1460457920.660|OP|rdkafka#consumer-1| queueTest [0] received op SEEK (v0) in fetch-state stopped
%7|1460457920.660|FETCH|rdkafka#consumer-1| Seek queueTest [0] to offset INVALID in state stopped (v7)
%7|1460457920.660|DESTROY|rdkafka#consumer-1| Destroy internal
%7|1460457920.660|DESTROY|rdkafka#consumer-1| Remove all topics
%7|1460457920.660|PARTCNT|rdkafka#consumer-1| Topic queueTest partition count changed from 1 to 0
%7|1460457920.660|BRKDELGT|rdkafka#consumer-1| Broker 10.0.0.203:9092/1 no longer leader for topic queueTest [0]
%7|1460457920.660|BRKDELGT|rdkafka#consumer-1| No broker is leader for topic queueTest [0]
%7|1460457920.660|BRKMIGR|rdkafka#consumer-1| Migrating topic queueTest [0] from 10.0.0.203:9092/1 to (none)
%7|1460457920.660|TOPPARREMOVE|rdkafka#consumer-1| Removing toppar queueTest [-1]
%7|1460457920.661|FETCH|rdkafka#consumer-1| 10.0.0.203:9092/1: Topic queueTest [0] at offset INVALID (0/100000 msgs, 0/1000000 kb queued) is not fetchable: not in active fetch state
%7|1460457920.661|FETCHADD|rdkafka#consumer-1| 10.0.0.203:9092/1: Removed queueTest [0] from fetch list (0 entries, opv 3)
%7|1460457920.661|TOPBRK|rdkafka#consumer-1| 10.0.0.203:9092/1: Topic queueTest [0]: leaving broker (next leader (none))
%7|1460457920.661|TOPPARREMOVE|rdkafka#consumer-1| Removing toppar queueTest [0]
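When reading a log like the one above, it can help to split each line into its fields and filter by facility (for example OFFSET or PARTSTATE). The sketch below assumes the `%level|timestamp|FACILITY|instance| message` layout visible in this log; `parse_log_line` and `LogEntry` are hypothetical helper names, not part of any library.

```python
import re
from typing import NamedTuple, Optional

# Layout assumed from the log above: %<level>|<timestamp>|<FACILITY>|<instance>| <message>
LOG_LINE = re.compile(
    r"^%(?P<level>\d)\|(?P<ts>\d+\.\d+)\|(?P<facility>[A-Z]+)\|"
    r"(?P<instance>[^|]+)\| ?(?P<message>.*)$"
)


class LogEntry(NamedTuple):
    level: int
    timestamp: float
    facility: str
    instance: str
    message: str


def parse_log_line(line: str) -> Optional[LogEntry]:
    """Parse one debug log line; return None if it does not match the layout."""
    m = LOG_LINE.match(line.strip())
    if m is None:
        return None
    return LogEntry(int(m["level"]), float(m["ts"]), m["facility"],
                    m["instance"], m["message"])


line = ("%7|1460457920.360|OFFSET|rdkafka#consumer-1| "
        "Offset END request for queueTest [0] returned offset 4 (4)")
entry = parse_log_line(line)
print(entry.facility, "->", entry.message)
```

Filtering the parsed entries for the OFFSET facility isolates the lines showing the reset to END and the offset 4 returned by the broker.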