RabbitmqBroker open fd leak with gunicorn and gevent #381
Hey Greg, I haven't had time this week to think about this very deeply, so I might have more ideas over the weekend, but have you tried adding a middleware to your app to explicitly close the connections after each request? For example:

```python
def dramatiq_middleware(env, start_response):
    try:
        return app(env, start_response)
    finally:
        del get_broker().connection
```

Since you mention each request is served by a new greenlet, this shouldn't negatively impact performance. It'd be good to come up with a better solution for gevent clients overall, though. The lack of pooling, in particular, is not great.
I tried adding this:

```python
class Middleware:
    def __init__(self, app):
        self.app = app

    def __call__(self, environ, start_response):
        try:
            return self.app(environ, start_response)
        finally:
            del dramatiq.get_broker().connection

app_with_middleware = Middleware(app)
```

It seems to do what you'd expect, removing the connection from
On a deeper look, I think I gave you bad advice; the connection deleter in the
@gdvalle let me know if the issue still occurs for you on master. I think the middleware should now work properly.
@Bogdanp Middleware now works on master!
@Bogdanp I still have the issue. Do I need to use this middleware, or should just having 1.11 be enough?
@wsantos yes, the middleware is necessary. The fix only made the middleware work. |
Hey, Bogdan!
This is not exactly a dramatiq issue in and of itself, but I think it's a common enough setup to warrant an issue and discussion here.
When running gunicorn on Linux with gevent workers, I noticed that every request that enqueues a message creates a new connection, eventually leading to file descriptor exhaustion that culminates in an `OSError: [Errno 98] Address already in use` as pika attempts to bind on port 0, or just a plain `OSError: [Errno 24] Too many open files`.

Each gunicorn request appears to have new/empty `threading.local()` storage, so each time `Broker.connection` attempts to reuse the thread-local connection state, it falls into making a new one. Each new connection is also stored in a non-thread-local `set()`, `self.connections`, which causes the process to hang onto the now-unusable connection, and the GC is unable to tear those down to free up the ports. `self.connections` just keeps growing until file descriptors run out.

This behavior is different from the `gthread` worker, which seems to reuse threads for requests and preserve that thread-local storage. And dramatiq itself, by subclassing `Thread` and spawning workers, also doesn't suffer.

Here's an example to reproduce the issue. Save as `main.py`:
Install the dependencies:
Start RabbitMQ somewhere.
Run the app:
Run the worker:
Start making requests:
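The command blocks for the steps above were lost in this copy of the issue; under the setup described, they might look roughly like this (package extras, the Docker image, and the port are assumptions):

```shell
# Install the dependencies.
pip install 'dramatiq[rabbitmq]' gunicorn gevent

# Start RabbitMQ somewhere, e.g. via Docker (assumption).
docker run -d -p 5672:5672 rabbitmq:3

# Run the app with gevent workers.
gunicorn -k gevent main:app

# Run the dramatiq worker against the same module.
dramatiq main

# Start making requests and watch the open file descriptors grow.
while true; do curl -s http://127.0.0.1:8000/ > /dev/null; done
```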
For a fix, I swapped out the thread-local connection storage for a dict that stores connections by `Greenlet.minimal_ident`. I think this should retain thread safety, and it allows gunicorn to actually reuse those connections.

This probably isn't perfect. It doesn't include the gevent hub ident in the key, so unsafe access could happen there, and I assume an ideal solution would be agnostic between gevent and threaded runtimes. But hopefully this is enough to shed some light on the topic.
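The patch itself isn't shown in this copy of the issue, but a sketch of the idea (hypothetical names, not dramatiq's actual code) could look like this: a dict keyed by the current greenlet's ident, falling back to the thread ident when gevent isn't available.

```python
import threading

# Hypothetical sketch of per-greenlet connection storage. Keys come from
# Greenlet.minimal_ident when running under gevent, else the thread ident.
try:
    from gevent import getcurrent  # assumption: gevent may be installed

    def _current_ident():
        g = getcurrent()
        # The main greenlet is a plain greenlet without minimal_ident,
        # so fall back to its id() in that case.
        return getattr(g, "minimal_ident", id(g))
except ImportError:
    def _current_ident():
        return threading.get_ident()


class ConnectionStore:
    """Connections keyed by greenlet/thread ident, guarded by a lock."""

    def __init__(self):
        self._lock = threading.Lock()
        self._connections = {}

    def get(self, factory):
        # Reuse this greenlet's connection, or create one via factory().
        key = _current_ident()
        with self._lock:
            conn = self._connections.get(key)
            if conn is None:
                conn = factory()
                self._connections[key] = conn
            return conn

    def discard(self):
        # Drop this greenlet's connection so the GC can close it.
        key = _current_ident()
        with self._lock:
            return self._connections.pop(key, None)
```

Unlike a `threading.local()`, the dict survives across greenlets and can be swept explicitly, so stale connections don't accumulate unreferenced.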
Also, somewhat related, I noticed pika just merged a gevent adapter a few days ago: https://github.com/pika/pika/blob/master/pika/adapters/gevent_connection.py, but I haven't dug in to see what it'd take to make that work with either the BlockingConnection style API or designing a dramatiq broker around that.