Producer cannot recover from broker failure #267

Closed
vineetgoel opened this issue Dec 11, 2017 · 13 comments
@vineetgoel
Contributor

It looks like the AIOKafkaProducer is unable to recover from the broker going down.

I tried with the following code:

from aiokafka import AIOKafkaProducer
import asyncio

loop = asyncio.get_event_loop()

producer = AIOKafkaProducer(loop=loop, bootstrap_servers='localhost:9092')


async def main():
    await producer.start()
    i = 0
    try:
        while True:
            print(f'Attempting to send message {i}')
            await producer.send('fooobarrrrr', b'Hello my friend')
            i += 1
            await asyncio.sleep(1.0)
    finally:
        await producer.stop()


if __name__ == '__main__':
    loop.run_until_complete(main())

If I take down my locally running broker and then bring it back up, the program goes into a loop, repeatedly logging:

Unable to update metadata from []
Unable to update metadata from []
Unable to update metadata from []
(the same line repeats indefinitely)

It looks like the producer is unable to get the cluster broker ids after a broker failure.
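A defensive pattern for this kind of send loop (not part of the original report) is to wrap each send in a retry loop with exponential backoff, so a transient broker outage does not kill the producer task. A minimal sketch with a hypothetical `send_with_retry` helper; the `ConnectionError` exception type and the backoff values are illustrative, with aiokafka you would catch its own Kafka error types instead:

```python
import asyncio


async def send_with_retry(send, payload, retries=5, backoff=0.5):
    """Hypothetical helper: retry `send` on connection errors,
    doubling the delay between attempts."""
    delay = backoff
    for attempt in range(retries):
        try:
            return await send(payload)
        except ConnectionError:
            if attempt == retries - 1:
                raise  # give up after the last attempt
            await asyncio.sleep(delay)
            delay *= 2
```

This keeps the surrounding `while True` loop alive across short outages while still surfacing a persistent failure to the caller.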

@vineetgoel
Contributor Author

@tvoinarovskyi is this a known issue? Am I doing something wrong, or is there a planned fix?

@tvoinarovskyi
Member

Hey there, thanks for the bug report. I am deep in a full refactor of subscription state with little time on my hands, hence the late response.
To my big surprise, I can reproduce this bug. Quite some work was done on the producer in 0.3.1, so I thought problems like this were gone. For reference, please post the broker and aiokafka versions you are using.
At the moment I can't dig deep into what's happening, but I am sure we did not have this before. I suspect it's either something introduced in 0.3.1 or a problem with broker 0.11+.

@ask
Contributor

ask commented Jan 2, 2018

I can reproduce with Kafka 0.11.0.1 and aiokafka 0.3.1.

Once I stop Kafka, it logs this very fast, multiple times per second, which looks like a spin loop (well, not quite, but faster than necessary):

[2018-01-02 15:07:02,394: WARNING]: No broker metadata found in MetadataResponse
[2018-01-02 15:07:02,394: ERROR]: Error fetching metadata for topic posts: <class 'kafka.errors.InvalidReplicationFactorError'>
[2018-01-02 15:07:02,395: ERROR]: Error fetching metadata for topic word-counts-word_counts-changelog: <class 'kafka.errors.InvalidReplicationFactorError'>
[2018-01-02 15:07:02,395: ERROR]: Error fetching metadata for topic word-counts-__assignor-__leader: <class 'kafka.errors.InvalidReplicationFactorError'>
[2018-01-02 15:07:02,396: ERROR]: Error fetching metadata for topic word-counts-examples.word_count.count_words: <class 'kafka.errors.InvalidReplicationFactorError'>

Then I start Kafka back up, and aiokafka is stuck logging this every 0.1 seconds:

[2018-01-02 15:07:02,396: ERROR]: Skipping heartbeat: no active group: NoBrokersAvailable()
[2018-01-02 15:07:02,496: ERROR]: Skipping heartbeat: no active group: NoBrokersAvailable()
[2018-01-02 15:07:02,597: ERROR]: Skipping heartbeat: no active group: NoBrokersAvailable()
[2018-01-02 15:07:02,697: ERROR]: Skipping heartbeat: no active group: NoBrokersAvailable()
(the same line repeats every ~0.1 s)

@vineetgoel
Contributor Author

I am on Kafka 0.10.2.1 and aiokafka 0.3.1.

@tvoinarovskyi
Member

@ask The issue is about Producer, but your log shows Consumer errors. Does the consumer fail in your case? How many brokers and topics do you have?

@vineetgoel
Contributor Author

Any updates on this @tvoinarovskyi ?

@tvoinarovskyi
Member

@vineet-rh I am finishing up #286; no progress on this so far. I do think it's not complicated, it just needs to be tracked down. Sorry.

@tvoinarovskyi
Member

@ask Your error seems separate from this one; I will file a new issue for it. As far as I can tell, you created a topic with a replication factor greater than the number of available nodes. Frankly, I don't think the Java client handles this case either; there are no handlers for INVALID_REPLICATION_FACTOR in its code. I will need to try to reproduce the case later.

@ask
Contributor

ask commented Jan 18, 2018

Hmm, thanks for investigating! It worked at startup and created the topics successfully then; it only broke after I restarted the broker. I guess the replication factor must have changed after the restart (or the number of brokers changed, but this was local with a single broker)?

Apropos, is there an easy way to add a callback for when the connection to the broker is closed? I have a topic-creation cache that I probably have to reset when disconnected.

@tvoinarovskyi
Member

@ask Hmm, there is currently no API for detecting when a connection is closed. Connections are by design transparent to the user, and there are cases where two sockets are open to a single node (the coordination socket is separate).
Rereading the question, do you mean the case when all nodes disconnect? If so, NoBrokersAvailable is raised in exactly that case: in the producer API from send(), and in the consumer from getone() and getmany(). (There may be a problem with the async for design here, as it swallows most errors, hmm.)

Anyway, if that doesn't cover your case, please open an issue with a use case and we will add something ASAP.
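For the topic-creation cache mentioned above, one workaround (an illustration, not an existing aiokafka API) is to treat the NoBrokersAvailable raised by produce/fetch calls as the disconnect signal and reset the cache at that point. A self-contained sketch; the exception class here is a stand-in for the real one, and `TopicCache`/`send_guarded` are hypothetical names:

```python
class NoBrokersAvailable(Exception):
    """Stand-in for kafka.errors.NoBrokersAvailable."""


class TopicCache:
    """Hypothetical cache of topics known to exist."""

    def __init__(self):
        self._created = set()

    def add(self, topic):
        self._created.add(topic)

    def __contains__(self, topic):
        return topic in self._created

    def reset(self):
        self._created.clear()


def send_guarded(cache, topic, do_send):
    # If every broker is gone, forget which topics were created so they
    # are re-created (or re-verified) once the cluster comes back.
    try:
        return do_send(topic)
    except NoBrokersAvailable:
        cache.reset()
        raise
```

The cache stays warm on the happy path and is invalidated only on total broker loss, which matches the "reset when disconnected" need without a dedicated disconnect callback.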

@tvoinarovskyi
Member

Hmm, now I can't reproduce this in master... @vineet-rh can you confirm this too?

@tvoinarovskyi
Member

OK, so it's actually a strange broker behaviour on startup: there is a case where the broker can return 0 nodes in a metadata response. aiokafka takes that at face value and erases all known nodes from its cache =)
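The fix this diagnosis suggests is to treat an empty broker list in a metadata response as bogus rather than letting it erase the cache. A toy model of that defensive update (an illustration, not aiokafka's actual metadata class; the names are invented):

```python
class BrokerCache:
    """Toy model of the client-side node cache."""

    def __init__(self):
        self._brokers = {}

    def update(self, brokers):
        # A MetadataResponse with zero brokers can occur while the broker
        # is still starting up; taking it at face value would erase every
        # known node, which is the failure described above.  Keep the old
        # list and report the update as rejected so the caller retries.
        if not brokers:
            return False
        self._brokers = dict(brokers)
        return True

    def nodes(self):
        return sorted(self._brokers)
```

With this in place, a bogus empty response leaves the client with its last known node list, so it can keep polling those nodes until a sane metadata response arrives.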

@tvoinarovskyi
Member

Should be fixed by #297
