
OSError: [Errno 98] Address already in use when calling send function #558

Open
heatherKoo07 opened this issue Jul 7, 2023 · 9 comments

@heatherKoo07

heatherKoo07 commented Jul 7, 2023

Issues

GitHub issues are for bugs. If you have questions, please ask them on the mailing list.

Checklist

  • Does your title concisely summarize the problem?
  • Did you include a minimal, reproducible example?
  • What OS are you using?
  • What version of Dramatiq are you using?
  • What did you do?
  • What did you expect would happen?
  • What happened?

What OS are you using?

Debian 12

What version of Dramatiq are you using?

v1.12.0

What did you do?

I'm using Dramatiq in a Flask app with flask-melodramatiq. I got an OSError from a POST request that enqueues an async job for the Dramatiq workers.

What did you expect would happen?

The task should be processed successfully.

What happened?

The error below was raised, and the tasks in the queue were not processed but sat there for hours. I had to restart Flask to resolve the issue.

Jun 24 02:09:32 c14-irkif docker-kif-web:
msg = actor.send(vm, **user_arg_map)
  File "/usr/local/lib/python3.8/site-packages/dramatiq/actor.py", line 112, in send
    return self.send_with_options(args=args, kwargs=kwargs)
  File "/usr/local/lib/python3.8/site-packages/dramatiq/actor.py", line 131, in send_with_options
    return self.broker.enqueue(message, delay=delay)
  File "/usr/local/lib/python3.8/site-packages/dramatiq/brokers/rabbitmq.py", line 319, in enqueue
    self.channel.basic_publish(
  File "/usr/local/lib/python3.8/site-packages/dramatiq/brokers/rabbitmq.py", line 153, in channel
    channel = self.state.channel = self.connection.channel()
  File "/usr/local/lib/python3.8/site-packages/dramatiq/brokers/rabbitmq.py", line 125, in connection
    connection = self.state.connection = pika.BlockingConnection(
  File "/usr/local/lib/python3.8/site-packages/pika/adapters/blocking_connection.py", line 360, in __init__
    self._impl = self._create_connection(parameters, _impl_class)
  File "/usr/local/lib/python3.8/site-packages/pika/adapters/blocking_connection.py", line 435, in _create_connection
    ioloop = select_connection.IOLoop()
  File "/usr/local/lib/python3.8/site-packages/pika/adapters/select_connection.py", line 374, in __init__
    self._poller = self._get_poller(self._get_remaining_interval,
  File "/usr/local/lib/python3.8/site-packages/pika/adapters/select_connection.py", line 431, in _get_poller
    poller = SelectPoller(**kwargs)
  File "/usr/local/lib/python3.8/site-packages/pika/adapters/select_connection.py", line 626, in __init__
    self._r_interrupt, self._w_interrupt = self._get_interrupt_pair()
  File "/usr/local/lib/python3.8/site-packages/pika/adapters/select_connection.py", line 916, in _get_interrupt_pair
    return pika.compat._nonblocking_socketpair()  # pylint: disable=W0212
  File "/usr/local/lib/python3.8/site-packages/pika/compat.py", line 242, in _nonblocking_socketpair
    lsock.bind((host, 0))
OSError: [Errno 98] Address already in use
@Bogdanp
Owner

Bogdanp commented Jul 8, 2023

This is probably some sort of resource leak. The underlying RabbitMQ client we use, pika, creates a socket pair per connection in order to handle interruptions, so this error is probably a red herring and the real issue is you have too many open connections.

If you're using gevent, this issue may be relevant.
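
Roughly speaking, this is how a loopback socket pair is usually emulated (a sketch for illustration only, not pika's actual code), which is why exhausting local ports shows up as EADDRINUSE on a bind call rather than on the RabbitMQ connection itself:

import socket

def loopback_socketpair():
    # Sketch only: each connection holds two such sockets; if enough
    # connections leak, the OS runs out of ephemeral loopback ports and
    # bind() raises "OSError: [Errno 98] Address already in use".
    lsock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    lsock.bind(("127.0.0.1", 0))      # ask the OS for a free ephemeral port
    lsock.listen(1)
    addr = lsock.getsockname()

    csock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    csock.connect(addr)               # client half of the pair
    ssock, _ = lsock.accept()         # server half of the pair
    lsock.close()                     # listener is no longer needed

    csock.setblocking(False)
    ssock.setblocking(False)
    return csock, ssock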

@heatherKoo07
Author

Thank you! Yes, I do use gevent via gunicorn and have two custom broker middlewares. I added the middlewares to the broker defined by flask_melodramatiq like below.

import dramatiq
import flask_melodramatiq
from dramatiq import middleware

# Progress and DatabaseMiddleware are our app's custom middlewares.
broker = flask_melodramatiq.Broker()
dramatiq.set_broker(broker)
broker.add_middleware(
    _progress := Progress(),
    before=middleware.Retries,
)
broker.add_middleware(
    DatabaseMiddleware(),
    after=flask_melodramatiq.lazy_broker.AppContextMiddleware,
)

According to the known issue link, I need to create a Middleware class and delete the broker connection in the class like this. I'm not sure how to do it with flask_melodramatiq though.

del dramatiq.get_broker().connection
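
Something like the following is what I have in mind, as a sketch only (the class name is mine and I haven't tested it against flask_melodramatiq):

from dramatiq.middleware import Middleware

class CloseConnectionMiddleware(Middleware):
    def after_enqueue(self, broker, message, delay):
        # Drop the thread-local RabbitMQ connection once the message is enqueued.
        del broker.connection

# The lazy flask_melodramatiq broker forwards add_middleware, so this should
# be able to sit next to the add_middleware calls above.
broker.add_middleware(CloseConnectionMiddleware())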

@heatherKoo07
Author

I found that the resource leak was from database connections. I open a transaction for each task with a middleware, and because of SQLAlchemy pooling, MySQL keeps the connections open for 8 hours by default even though I close the sessions. I think I can fix it by decreasing the wait_timeout configuration in MySQL to kill the processes earlier than 8 hours.
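
For reference, a SQLAlchemy-side alternative I'm considering (untested, and the DSN below is a placeholder) is recycling pooled connections before MySQL's wait_timeout kicks in:

from sqlalchemy import create_engine

# pool_recycle closes and reopens pooled connections older than the given
# number of seconds, so they never hit MySQL's server-side timeout;
# pool_pre_ping revalidates a connection before handing it out.
engine = create_engine(
    "mysql+pymysql://user:password@db-host/app_db",  # placeholder DSN
    pool_recycle=280,
    pool_pre_ping=True,
)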

@arseniiarsenii

arseniiarsenii commented Sep 26, 2023

I am encountering the same issue. I used an outdated version of Dramatiq before, but after reading #381 I updated it to version 1.14.2. I am also using Gevent version 23.9.1.

However, unlike the OP, I am not using MySQL, flask_melodramatiq, or a DB session middleware. Instead, I am using Postgres 13.1 with SQLAlchemy 1.4.48.

I open a DB session using SQLAlchemy's sessionmaker context manager in each individual actor function, so I believe sessions are guaranteed to be closed when the actor exits.

I only encountered this error under high load. It is the exact same exception produced by pika when calling actor.send().

My version of Pika is 1.3.2.

@arseniiarsenii

I have spent a lot of time debugging this issue and still could not find the cause.

I had a hypothesis: SQLAlchemy was creating a new thread-local connection pool for each message and then failing to properly close the connections when message handling was done. This could have led to a connection leak.

I've set up a testing environment:

Monitoring active and idle Postgres connections in one tab with this query:

SELECT
    datid,
    state,
    pid,
    application_name
FROM pg_stat_activity
WHERE true
AND state IN ('active', 'idle')
AND datname = 'my_db_name'
ORDER BY backend_start DESC;

Created a test actor to imitate tasks that use the DB:

import time

import dramatiq
from sqlalchemy import create_engine, text
from sqlalchemy.orm import sessionmaker

# config (for DB_DSN) and logger come from the application.
engine = create_engine(config.DB_DSN, pool_pre_ping=True)
SessionLocal = sessionmaker(autocommit=True, autoflush=False, expire_on_commit=True, bind=engine)


@dramatiq.actor(queue_name="test_db", priority=0)
def mock_db_actor(n: int) -> None:
    with SessionLocal() as db_session:
        logger.info(f"Session {n} opened")
        logger.info(db_session.execute(text("SELECT 1;")).scalar_one())
        time.sleep(5)
    logger.info(f"Session {n} closed")


if __name__ == "__main__":
    x = 100
    for n in range(x):
        mock_db_actor.send(n)
    print(f"Spawned {x} tasks")

Then I started testing. I launched the worker with this code:

dramatiq.cli.main(
    broker=__file__,
    processes=1,
    threads=10,
    worker_shutdown_timeout=60_000,
)

I started spawning 100 messages at a time and monitoring the SQL query.

All I could see were 5 open idle connections. The connection count never grew past that number. Then I shut down the container with the worker, and all connections were closed and no longer appeared in the query output.

I then repeated the experiment with updated DB engine settings. This time I used the following code to init the engine:

from sqlalchemy.pool import NullPool

null_pool_engine = create_engine(config.DB_DSN, poolclass=NullPool)
SessionLocal = sessionmaker(autocommit=True, autoflush=False, expire_on_commit=True, bind=null_pool_engine)

This disables connection pooling entirely.

This time I was not able to see any idle connections in the SQL query's output, although I had set it to auto-refresh every second.

Conclusion:

  1. SQLAlchemy's connection pool disposes of connections correctly both when message handling completes and when the worker process exits. I could not find any sign of a resource leak here.
  2. Although there are 10 active worker threads, there were only 5 open idle connections to Postgres, which I can't explain. Furthermore, I never saw any of these connections in the 'active' state during my tests; all of them were idle the whole time. This is probably because the test query executes quickly and a connection is only active during execution, so I wasn't able to catch one in that state.
  3. I think we can now rule out a resource leak in the DB connection code. The error has to be caused by a bug in dramatiq or pika.
  4. I still have no idea how to reproduce the issue.

Any help is appreciated. Hope to hear back from @Bogdanp about this case soon.

@heatherKoo07
Author

heatherKoo07 commented Oct 31, 2023

This error came back again even though I fixed the possible resource leak in my app. I think the problem is a port conflict between RabbitMQ and MySQL when they open connections. I hope to hear back too, @Bogdanp.

@heatherKoo07
Author

heatherKoo07 commented Nov 5, 2023

The underlying RabbitMQ client we use, pika, creates a socket pair per connection in order to handle interruptions

After some investigation, I found that the socket pairs returned from pika.compat._nonblocking_socketpair() are not closed. I think the database connections have nothing to do with this issue. Is it a Dramatiq bug that the sockets aren't closed? @Bogdanp

@heatherKoo07
Author

I reproduced the error and found that the RabbitMQ connection is leaking in Dramatiq. When messages are enqueued to RabbitMQ, the pika.BlockingConnections are not closed after the messages are delivered. This exhausts all the local socket ports under high load and raises the Address already in use error. I created a PR with a fix in #589. @Bogdanp please review.
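
For anyone who wants to check for this kind of leak without waiting for ports to run out, here is a rough sketch of my own (using psutil, which is not part of dramatiq) that watches how many sockets the enqueueing process holds while it sends messages:

import os
import psutil

def open_socket_count() -> int:
    # Number of socket connections currently held by this process.
    return len(psutil.Process(os.getpid()).connections())

before = open_socket_count()
for n in range(1000):
    mock_db_actor.send(n)   # any actor works; reusing the test actor from the earlier comment
after = open_socket_count()

# With a healthy broker the count should stay flat; if it grows roughly with
# the number of sends, connections (and their interrupt socket pairs) are leaking.
print(f"sockets before={before} after={after}")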

@arseniiarsenii

@heatherKoo07 thank you for your effort. Can you please describe a way to reliably reproduce this issue? I would like to test the solution suggested by @Bogdanp here.

Pasting it here for people who search for the same issue and find this thread:

from dramatiq.middleware import Middleware

class CloseMiddleware(Middleware):
    def after_enqueue(self, broker, message, delay):
        # The RabbitMQ broker's `connection` deleter closes the underlying pika connection.
        del broker.connection
