consumer.committed([TopicPartition('test', 0)]) blocks and hangs #68

Closed
fillest opened this issue Nov 11, 2016 · 9 comments

Comments

@fillest

fillest commented Nov 11, 2016

confluent-kafka==0.9.2, librdkafka 0.9.2, Python 2.7.6, kafka 0.10.1.0
I have a topic "test" with p = TopicPartition('test', 0), to which I've pushed some messages but (on purpose) haven't committed an offset yet. c.position([p]) gives -1001. That gave me a "wtf" moment, but after quite a bit of googling I found that it is OK and means that nothing has been committed yet. But c.committed([p]) just hangs and can't be interrupted with Ctrl-C.
I've turned on 'debug': 'all' and this is the tail of the log:

%7|1478876711.684|SEND|test-client#consumer-1| 127.0.0.1:9092/0: Sent FetchRequest (v1, 67 bytes @ 0, CorrId 99)
%7|1478876711.784|RECV|test-client#consumer-1| 127.0.0.1:9092/0: Received FetchResponse (v1, 36 bytes, CorrId 99, rtt 100.80ms)
%7|1478876711.784|FETCH|test-client#consumer-1| 127.0.0.1:9092/0: Topic test [0] MessageSet size 0, error "Success", MaxOffset 27, Ver 2/2
%7|1478876711.784|FETCH|test-client#consumer-1| 127.0.0.1:9092/0: Fetch reply: Success
%7|1478876711.784|FETCH|test-client#consumer-1| 127.0.0.1:9092/0: Fetch topic test [0] at offset 27 (v2)
%7|1478876711.784|FETCH|test-client#consumer-1| 127.0.0.1:9092/0: Fetch 1/1/1 toppar(s)
%7|1478876711.784|FETCH|test-client#consumer-1| 127.0.0.1:9092/0: Fetch for 1 toppars, fetching=1, backoff=-9455ms
%7|1478876711.785|SEND|test-client#consumer-1| 127.0.0.1:9092/0: Sent FetchRequest (v1, 67 bytes @ 0, CorrId 100)
%7|1478876711.886|RECV|test-client#consumer-1| 127.0.0.1:9092/0: Received FetchResponse (v1, 36 bytes, CorrId 100, rtt 101.10ms)
%7|1478876711.886|FETCH|test-client#consumer-1| 127.0.0.1:9092/0: Topic test [0] MessageSet size 0, error "Success", MaxOffset 27, Ver 2/2
%7|1478876711.886|FETCH|test-client#consumer-1| 127.0.0.1:9092/0: Fetch reply: Success
%7|1478876711.886|FETCH|test-client#consumer-1| 127.0.0.1:9092/0: Fetch topic test [0] at offset 27 (v2)
%7|1478876711.886|FETCH|test-client#consumer-1| 127.0.0.1:9092/0: Fetch 1/1/1 toppar(s)
%7|1478876711.886|FETCH|test-client#consumer-1| 127.0.0.1:9092/0: Fetch for 1 toppars, fetching=1, backoff=-9556ms
%7|1478876711.886|SEND|test-client#consumer-1| 127.0.0.1:9092/0: Sent FetchRequest (v1, 67 bytes @ 0, CorrId 101)
%7|1478876711.993|RECV|test-client#consumer-1| 127.0.0.1:9092/0: Received FetchResponse (v1, 36 bytes, CorrId 101, rtt 106.74ms)
%7|1478876711.993|FETCH|test-client#consumer-1| 127.0.0.1:9092/0: Topic test [0] MessageSet size 0, error "Success", MaxOffset 27, Ver 2/2
%7|1478876711.993|FETCH|test-client#consumer-1| 127.0.0.1:9092/0: Fetch reply: Success
%7|1478876711.993|FETCH|test-client#consumer-1| 127.0.0.1:9092/0: Fetch topic test [0] at offset 27 (v2)
%7|1478876711.993|FETCH|test-client#consumer-1| 127.0.0.1:9092/0: Fetch 1/1/1 toppar(s)
%7|1478876711.993|FETCH|test-client#consumer-1| 127.0.0.1:9092/0: Fetch for 1 toppars, fetching=1, backoff=-9663ms
%7|1478876711.993|SEND|test-client#consumer-1| 127.0.0.1:9092/0: Sent FetchRequest (v1, 67 bytes @ 0, CorrId 102)

The messages all look alike and are printed very fast. Also note the negative backoff; I guess that's not normal.
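
To put a number on "very fast", the SEND timestamps in the log tail can be compared directly. The sketch below is only an illustration: it copies the four SEND lines from the log above and assumes the line layout is `%<level>|<unix time>|<facility>|<client>| <message>`, which is inferred from this output rather than from any format specification.

```python
import re

# The four FetchRequest SEND lines from the log tail above.
LOG = """\
%7|1478876711.684|SEND|test-client#consumer-1| 127.0.0.1:9092/0: Sent FetchRequest (v1, 67 bytes @ 0, CorrId 99)
%7|1478876711.785|SEND|test-client#consumer-1| 127.0.0.1:9092/0: Sent FetchRequest (v1, 67 bytes @ 0, CorrId 100)
%7|1478876711.886|SEND|test-client#consumer-1| 127.0.0.1:9092/0: Sent FetchRequest (v1, 67 bytes @ 0, CorrId 101)
%7|1478876711.993|SEND|test-client#consumer-1| 127.0.0.1:9092/0: Sent FetchRequest (v1, 67 bytes @ 0, CorrId 102)
"""

# Assumed layout: %<level>|<unix time>|<facility>|<client>| <message>
LINE_RE = re.compile(r"^%\d+\|(\d+\.\d+)\|SEND\|.*Sent FetchRequest")


def fetch_send_times(log_text):
    """Return the timestamps (in seconds) of every FetchRequest SEND line."""
    times = []
    for line in log_text.splitlines():
        match = LINE_RE.match(line)
        if match:
            times.append(float(match.group(1)))
    return times


def mean_interval_ms(times):
    """Average gap between consecutive timestamps, in milliseconds."""
    gaps = [later - earlier for earlier, later in zip(times, times[1:])]
    return 1000.0 * sum(gaps) / len(gaps)


times = fetch_send_times(LOG)
interval = mean_interval_ms(times)
print("%d requests, mean interval %.0f ms" % (len(times), interval))
```

For these four lines the mean gap comes out at roughly 103 ms, i.e. about ten Fetch requests per second.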

@fillest changed the title from "consumer.committed([TopicPartition('test', 0)]) hangs" to "consumer.committed([TopicPartition('test', 0)]) blocks and hangs" on Nov 11, 2016
@edenhill
Contributor

Re the -1001 confusion: understandable! We'll add a constant for that to lessen the confusion.
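
Until such a constant is available in the installed client, application code can name the value itself. The following is a minimal sketch, not the library's API: `OFFSET_INVALID` and `describe_offset` are names local to this example, and the only fact relied on is that -1001 is the value librdkafka calls `RD_KAFKA_OFFSET_INVALID`.

```python
# -1001 is the value reported in this thread; librdkafka names it
# RD_KAFKA_OFFSET_INVALID. The Python name below is local to this sketch.
OFFSET_INVALID = -1001


def describe_offset(offset):
    """Turn a raw offset into a human-readable description."""
    if offset == OFFSET_INVALID:
        return "no offset committed yet"
    return "offset %d" % offset


print(describe_offset(-1001))
print(describe_offset(27))
```

With a real consumer, the `offset` attribute of each `TopicPartition` returned by `committed()` could be passed to a helper like this.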

@edenhill
Contributor

Negative backoffs are okay (just an impl detail) when there is no backoff in place.
The fast rate is due to fetch.wait.max.ms which defaults to 100ms, so when nothing new is happening you'll have 10 Fetch requests per second.
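
The arithmetic behind that figure, as a rough sketch: it ignores network round-trip time and assumes one broker and no new messages. The `conf` dict is hypothetical; its address and group name are placeholders, and only `fetch.wait.max.ms` matters here.

```python
def idle_fetch_rate(fetch_wait_max_ms):
    """Approximate Fetch requests per second, per broker, while the topic is idle."""
    return 1000.0 / fetch_wait_max_ms


# Hypothetical consumer settings; address and group name are placeholders.
conf = {
    "bootstrap.servers": "localhost:9092",
    "group.id": "example-group",
    "fetch.wait.max.ms": 500,  # a larger wait means fewer idle Fetch requests
}

print(idle_fetch_rate(100))                        # the 100 ms default mentioned above
print(idle_fetch_rate(conf["fetch.wait.max.ms"]))  # with the wait raised to 500 ms
```

Raising the wait lowers the idle request rate, at the cost of the broker holding each empty Fetch open for longer.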

@edenhill
Contributor

edenhill commented Nov 11, 2016

Is your code any different from this:

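from confluent_kafka import Consumer, TopicPartition

def test_pos ():
    c = Consumer({"bootstrap.servers":"0",
                  "group.id":"xx3",
                  "enable.auto.commit":False,
                  "default.topic.config":{"auto.offset.reset":"earliest"}})

    c.subscribe(["test"])
    try:
        while True:
            # Wait up to 1 second for a message; None means nothing arrived.
            m = c.poll(1)
            print(m)
            tp = TopicPartition("test", 0)
            # Look up the committed offset for test [0].
            cs = c.committed([tp])
            print('committed offsets for',tp, 'is', cs)
    finally:
        # Always close the consumer, e.g. when the loop is interrupted.
        c.close()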

This works as expected for me, committed() is not hanging:

None
('committed offsets for', TopicPartition{topic=test,partition=0,offset=-1001,error=None}, 'is', [TopicPartition{topic=test,partition=0,offset=-1001,error=None}])
<cimpl.Message object at 0x7f1248b06fa0>
('committed offsets for', TopicPartition{topic=test,partition=0,offset=-1001,error=None}, 'is', [TopicPartition{topic=test,partition=0,offset=-1001,error=None}])
<cimpl.Message object at 0x7f1248ac0050>
...
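
For anyone who does run into the blocking call described at the top of this issue, one defensive option is to pass a timeout to `committed()`. This is a sketch under assumptions, not a confirmed fix: current confluent-kafka-python documents a `timeout` argument on `Consumer.committed()`, but whether 0.9.2 honours it, or whether it would have avoided this particular hang, is not established in this thread. `committed_or_none` and `FakeConsumer` are hypothetical names; the fake exists only so the helper can be exercised without a broker.

```python
def committed_or_none(consumer, partitions, timeout_s=5.0):
    """Return committed offsets, or None if the lookup fails or times out."""
    try:
        return consumer.committed(partitions, timeout=timeout_s)
    except Exception as exc:
        # confluent_kafka reports failures as KafkaException; the catch is
        # kept generic so this sketch runs without the library installed.
        print("committed() failed: %s" % exc)
        return None


class FakeConsumer(object):
    """Stand-in used only to exercise the helper without a broker."""

    def __init__(self, fail):
        self.fail = fail

    def committed(self, partitions, timeout=None):
        if self.fail:
            raise RuntimeError("timed out after %.1fs" % timeout)
        return list(partitions)


print(committed_or_none(FakeConsumer(fail=False), ["test [0]"]))
print(committed_or_none(FakeConsumer(fail=True), ["test [0]"]))
```

With a real `Consumer`, the list would hold `TopicPartition` objects instead of the placeholder strings used here.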

@edenhill
Contributor

ping @fillest

@fillest
Author

fillest commented Dec 11, 2016

Sorry for the delays, I'm overloaded at work at the moment :(

@edenhill
Contributor

No problem :)

@edenhill
Contributor

@fillest any progress on this?
See comment: #68 (comment)

@edenhill
Contributor

Closed due to inactivity

dtheodor pushed a commit to dtheodor/confluent-kafka-python that referenced this issue Sep 4, 2018