async producer has unbounded internal queue #297
Comments
this sounds odd -- the SimpleProducer should operate in a sync / blocking mode by default. can you paste a test case to reproduce?
I can confirm the memory leak in the producer.
I guess this happens because of the growing queue.
Thanks for the quick patch! Though from a client's point of view, blocking on send might be more desirable than raising an error. What do you think?
Kafka is about big data. If you cannot send your data to Kafka, it is better to spill the data to disk and try to resend it later.
there are currently no bounds on the async producer's internal queue size. honestly I would not recommend the current async producer code for production use -- it needs to be fixed for several issues (this one included). at my company we wrote our own async producer class that wraps the sync SimpleProducer. I will see if we can get that merged back upstream.
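The wrapper approach mentioned above can be sketched as a bounded queue feeding a worker thread. This is a minimal illustration, not the actual class that company wrote; `sync_send` stands in for a blocking producer call such as `SimpleProducer.send_messages`, and all names here are hypothetical.

```python
import queue
import threading

class BoundedAsyncProducer:
    """Sketch: async sends via a bounded queue wrapping a sync send function."""

    def __init__(self, sync_send, maxsize=1000):
        self._send = sync_send
        # Bounded queue: put() blocks (or times out) when full, instead of
        # growing memory without limit.
        self._queue = queue.Queue(maxsize=maxsize)
        self._worker = threading.Thread(target=self._run, daemon=True)
        self._worker.start()

    def send(self, msg, timeout=None):
        # Applies backpressure to the caller; raises queue.Full on timeout.
        self._queue.put(msg, timeout=timeout)

    def _run(self):
        while True:
            msg = self._queue.get()
            if msg is None:  # sentinel tells the worker to stop
                break
            self._send(msg)

    def close(self):
        # Queue is FIFO, so all queued messages are sent before the sentinel.
        self._queue.put(None)
        self._worker.join()
```

The key design point is that the caller, not the library, pays the cost when the broker cannot keep up: `send` blocks or fails fast rather than buffering silently.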
FWIW, async and multiprocessing both have a host of issues.
#331 also addresses this issue |
#331 is merged and supports this via async_queue_maxsize and async_queue_put_timeout configuration parameters to the producer |
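As a rough illustration of the semantics those two parameters add (this is stdlib code, not kafka-python internals), a bounded queue with a put timeout fails fast instead of buffering forever:

```python
import queue

# Mimics async_queue_maxsize=2: the queue holds at most 2 pending messages.
q = queue.Queue(maxsize=2)
q.put("msg1")
q.put("msg2")

try:
    # Mimics async_queue_put_timeout: wait briefly for space, then raise
    # rather than accumulate unbounded memory.
    q.put("msg3", timeout=0.01)
    overflowed = False
except queue.Full:
    overflowed = True
```

With a negative timeout one could instead block indefinitely, which matches the "blocking on send" behavior requested earlier in the thread.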
I'm using the provided example code for SimpleProducer to send out 1 kB messages in an infinite loop, as fast as possible. What I see is that I can pass ~60k messages/sec to the kafka-python library, although only ~1k messages are actually sent out towards the kafka brokers. In the meantime, resident memory usage of the producer is going up steadily and without an upper bound.
I'm new to kafka-python, and I'm not familiar with its internals, but it seems as if the communication between my client code and kafka-python were async (regardless of instantiating the SimpleProducer with async=False), and messages were queued internally by the kafka-python library in an unlimited queue.
Is that intuition right? Is there a way to use the library in a kind of "blocking" mode where memory usage is limited?
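For reference, the reported behavior matches what an unbounded stdlib queue does: with `maxsize=0` (the default), `put()` never blocks or fails, so a producer that outpaces the sender simply grows the in-memory backlog. This is a generic sketch of the failure mode, not kafka-python's actual internals:

```python
import queue

# maxsize=0 (the default) means unbounded: every put succeeds immediately.
q = queue.Queue()

# Fast producer, slow (here: absent) sender -- nothing drains the queue.
for _ in range(10000):
    q.put_nowait(b"x" * 1024)  # ~1 kB per message, as in the report

backlog = q.qsize()  # all 10000 messages remain buffered in memory
```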
I'm using kafka-python v0.9.2 with two v0.8.2 kafka brokers, all nodes are connected with a 10 Gbps network.