
Can't acquire connection from pool #113

Closed
serg666 opened this issue Apr 6, 2016 · 18 comments

@serg666

serg666 commented Apr 6, 2016

Hi!

I have a problem with aiopg when acquiring a new connection from the pool. I used this simple code to reproduce it:

import asyncio

import aiopg.sa
import sqlalchemy as sa

import tornado.web
import tornado.platform.asyncio

async def create_engine():
    return await aiopg.sa.create_engine(
        'dbname=dbname user=user password=password host=127.0.0.1',
        echo=True
    )

loop = asyncio.get_event_loop()
engine = loop.run_until_complete(create_engine())
metadata = sa.MetaData()


t1 = sa.Table('t1', metadata,
              sa.Column('id', sa.Integer, primary_key=True),
              sa.Column('name', sa.String(255), nullable=False))


t2 = sa.Table('t2', metadata,
              sa.Column('id', sa.Integer, primary_key=True),
              sa.Column('name', sa.String(255), nullable=False))


async def fetch_t2():
    async with engine.acquire() as conn:
        await conn.execute(t2.select().where(t2.c.id == 4))


class ReqHandler(tornado.web.RequestHandler):
    async def post(self):
        async with engine.acquire() as conn:
            async with conn.begin():
                await conn.execute(t1.select().where(t1.c.id == 1))
                await fetch_t2()
                await conn.execute(t1.insert().values(name='some name'))

        self.write("Hello world!\n")


app = tornado.web.Application([
    (r'/', ReqHandler)
])

if __name__ == '__main__':
    tornado.platform.asyncio.AsyncIOMainLoop().install()
    app.listen(8080)
    loop.run_forever()

Then I apply a load of 100 concurrent requests, and after a while the service hangs. It seems that aiopg cannot acquire another connection from the pool, because when I increase the maximum pool size the load test passes.

I am using the following versions:

aiopg-0.9.2
psycopg2-2.6.1
SQLAlchemy-1.0.12
tornado-4.3

@jettify
Member

jettify commented Apr 6, 2016

Hi,
could you try to execute your code without the transaction?

    async with engine.acquire() as conn:
        await conn.execute("SELECT 1;")

I think the issue is more likely related to the transaction.

@serg666
Author

serg666 commented Apr 6, 2016

Hi! Thanks for the feedback.

I have modified the code as follows:

import asyncio

import aiopg.sa
import sqlalchemy as sa

import tornado.web
import tornado.platform.asyncio

async def create_engine():
    return await aiopg.sa.create_engine(
        'dbname=dbname user=user password=password host=127.0.0.1',
        echo=True
    )

loop = asyncio.get_event_loop()
engine = loop.run_until_complete(create_engine())
metadata = sa.MetaData()


t1 = sa.Table('t1', metadata,
              sa.Column('id', sa.Integer, primary_key=True),
              sa.Column('name', sa.String(255), nullable=False))


t2 = sa.Table('t2', metadata,
              sa.Column('id', sa.Integer, primary_key=True),
              sa.Column('name', sa.String(255), nullable=False))


async def fetch_t2():
    async with engine.acquire() as conn:
        await conn.execute(t2.select().where(t2.c.id == 4))


class ReqHandler(tornado.web.RequestHandler):
    async def post(self):
        async with engine.acquire() as conn:
            #async with conn.begin():
            await conn.execute(t1.select().where(t1.c.id == 1))
            await fetch_t2()
            await conn.execute(t1.insert().values(name='some name'))

        self.write("Hello world!\n")


app = tornado.web.Application([
    (r'/', ReqHandler)
])

if __name__ == '__main__':
    tornado.platform.asyncio.AsyncIOMainLoop().install()
    app.listen(8080)
    loop.run_forever()

but the problem persists.

I applied a load of 120 concurrent requests for 10 minutes, and after a while the service hangs again.

In the aiopg logs I see something like this:

2016-04-06 15:49:30,669 INFO  aiopg: [MainThread] {'id_1': 4}
2016-04-06 15:49:30,679 INFO  tornado.access: [MainThread] 200 POST / (192.168.156.104) 927.32ms
2016-04-06 15:49:30,690 INFO  aiopg: [MainThread] SELECT t1.id, t1.name 
FROM t1 
WHERE t1.id = %(id_1)s
2016-04-06 15:49:30,690 INFO  aiopg: [MainThread] {'id_1': 1}
2016-04-06 15:49:30,694 INFO  tornado.access: [MainThread] 200 POST / (192.168.156.104) 1087.84ms
2016-04-06 15:49:30,698 INFO  aiopg: [MainThread] SELECT t1.id, t1.name 
FROM t1 
WHERE t1.id = %(id_1)s
2016-04-06 15:49:30,698 INFO  aiopg: [MainThread] {'id_1': 1}
2016-04-06 15:49:30,709 INFO  aiopg: [MainThread] INSERT INTO t1 (name) VALUES (%(name)s) RETURNING t1.id
2016-04-06 15:49:30,709 INFO  aiopg: [MainThread] {'name': 'some name'}
2016-04-06 15:49:30,725 INFO  aiopg: [MainThread] SELECT t2.id, t2.name 
FROM t2 
WHERE t2.id = %(id_1)s
2016-04-06 15:49:30,740 INFO  aiopg: [MainThread] {'id_1': 4}
2016-04-06 15:49:30,750 INFO  aiopg: [MainThread] INSERT INTO t1 (name) VALUES (%(name)s) RETURNING t1.id
2016-04-06 15:49:30,750 INFO  aiopg: [MainThread] {'name': 'some name'}
2016-04-06 15:49:30,762 INFO  tornado.access: [MainThread] 200 POST / (192.168.156.104) 804.12ms
2016-04-06 15:49:30,794 INFO  aiopg: [MainThread] SELECT t1.id, t1.name 
FROM t1 
WHERE t1.id = %(id_1)s
2016-04-06 15:49:30,802 INFO  aiopg: [MainThread] {'id_1': 1}
2016-04-06 15:49:30,809 INFO  tornado.access: [MainThread] 200 POST / (192.168.156.104) 815.93ms
2016-04-06 15:49:30,836 INFO  aiopg: [MainThread] SELECT t1.id, t1.name 
FROM t1 
WHERE t1.id = %(id_1)s
2016-04-06 15:49:30,836 INFO  aiopg: [MainThread] {'id_1': 1}
2016-04-06 15:49:30,842 INFO  aiopg: [MainThread] INSERT INTO t1 (name) VALUES (%(name)s) RETURNING t1.id
2016-04-06 15:49:30,842 INFO  aiopg: [MainThread] {'name': 'some name'}
2016-04-06 15:49:30,852 INFO  aiopg: [MainThread] SELECT t1.id, t1.name 
FROM t1 
WHERE t1.id = %(id_1)s
2016-04-06 15:49:30,864 INFO  aiopg: [MainThread] {'id_1': 1}
2016-04-06 15:49:30,876 INFO  tornado.access: [MainThread] 200 POST / (192.168.156.104) 299.97ms
2016-04-06 15:49:30,913 INFO  aiopg: [MainThread] SELECT t1.id, t1.name 
FROM t1 
WHERE t1.id = %(id_1)s
2016-04-06 15:49:30,925 INFO  aiopg: [MainThread] {'id_1': 1}
2016-04-06 15:49:30,945 INFO  tornado.access: [MainThread] 200 POST / (192.168.156.104) 865.60ms

... and then the service hangs...

@serg666
Author

serg666 commented Apr 6, 2016

Note that the pool maxsize is 10 connections by default. If I increase the maximum pool size, the load test passes.
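
For reference, a sketch of raising that limit when creating the engine; maxsize is the same keyword the test script further down in this thread uses, and 1000 matches the value mentioned below:

engine = await aiopg.sa.create_engine(
    'dbname=dbname user=user password=password host=127.0.0.1',
    maxsize=1000  # illustrative value; the default is 10
)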

@serg666
Author

serg666 commented Apr 6, 2016

It seems to me that when the pool reaches its maximum size, connections are not returned to the pool and aiopg can't acquire a new connection from it, or something like that.

@jettify
Member

jettify commented Apr 6, 2016

From the logs I see you still do inserts... I suspect that a connection is stuck in an unknown state due to a transaction error or similar, but I may be wrong.

@serg666
Author

serg666 commented Apr 6, 2016

If you wish, you can run this code with the default pool max size of 10, apply a load of about 120 concurrent requests for some time, and see that the inserts stop and the service hangs.

The point is, when I increase the max pool size, e.g. to 1000, the load test passes.

@serg666
Author

serg666 commented Apr 6, 2016

Maybe it is not a real problem and I could just set the max pool size very high, but I wonder why the service hangs with a small pool size.

@serg666
Author

serg666 commented Apr 6, 2016

If I interrupt the process with Ctrl-C (KeyboardInterrupt) I get this:

^CTraceback (most recent call last):
  File "bug.py", line 56, in <module>
    app = tornado.web.Application([
  File "/usr/local/lib/python3.5/asyncio/base_events.py", line 295, in run_forever
    self._run_once()
  File "/usr/local/lib/python3.5/asyncio/base_events.py", line 1218, in _run_once
    event_list = self._selector.select(timeout)
  File "/usr/local/lib/python3.5/selectors.py", line 432, in select
    fd_event_list = self._epoll.poll(timeout, max_ev)
KeyboardInterrupt
2016-04-06 16:47:26,542 ERROR asyncio: [Dummy-1] Unclosed connection
connection: <aiopg.connection.Connection object at 0x7fda239604e0>
2016-04-06 16:47:26,546 ERROR asyncio: [Dummy-1] Unclosed connection
connection: <aiopg.connection.Connection object at 0x7fda23960630>
2016-04-06 16:47:26,550 ERROR asyncio: [Dummy-1] Unclosed connection
connection: <aiopg.connection.Connection object at 0x7fda23954cc0>
2016-04-06 16:47:26,561 ERROR asyncio: [Dummy-1] Unclosed connection
connection: <aiopg.connection.Connection object at 0x7fda23954d30>
2016-04-06 16:47:26,600 ERROR asyncio: [Dummy-1] Unclosed connection
connection: <aiopg.connection.Connection object at 0x7fda23960d30>
2016-04-06 16:47:26,611 ERROR asyncio: [Dummy-1] Unclosed connection
connection: <aiopg.connection.Connection object at 0x7fda23960780>
2016-04-06 16:47:26,616 ERROR asyncio: [Dummy-1] Unclosed connection
connection: <aiopg.connection.Connection object at 0x7fda239601d0>
Exception ignored in: <generator object _wrap_awaitable at 0x7fda236a9e08>
Traceback (most recent call last):
  File "<string>", line 6, in _wrap_awaitable
  File "bug.py", line 44, in post
  File "/home/test/bug/lib/python3.5/site-packages/aiopg/utils.py", line 121, in __aexit__
  File "/home/test/bug/lib/python3.5/site-packages/aiopg/sa/engine.py", line 153, in release
  File "/home/test/bug/lib/python3.5/site-packages/aiopg/pool.py", line 241, in release
  File "/usr/local/lib/python3.5/asyncio/tasks.py", line 524, in async
  File "/usr/local/lib/python3.5/asyncio/tasks.py", line 539, in ensure_future
  File "/usr/local/lib/python3.5/asyncio/base_events.py", line 209, in create_task
  File "/usr/local/lib/python3.5/asyncio/base_events.py", line 284, in _check_closed
RuntimeError: Event loop is closed
Exception ignored in: <generator object _wrap_awaitable at 0x7fda23656e60>
Traceback (most recent call last):
  File "<string>", line 6, in _wrap_awaitable
  File "bug.py", line 44, in post
  File "/home/test/bug/lib/python3.5/site-packages/aiopg/utils.py", line 121, in __aexit__
  File "/home/test/bug/lib/python3.5/site-packages/aiopg/sa/engine.py", line 153, in release
  File "/home/test/bug/lib/python3.5/site-packages/aiopg/pool.py", line 241, in release
  File "/usr/local/lib/python3.5/asyncio/tasks.py", line 524, in async
  File "/usr/local/lib/python3.5/asyncio/tasks.py", line 539, in ensure_future
  File "/usr/local/lib/python3.5/asyncio/base_events.py", line 209, in create_task
  File "/usr/local/lib/python3.5/asyncio/base_events.py", line 284, in _check_closed
RuntimeError: Event loop is closed
Exception ignored in: <generator object _wrap_awaitable at 0x7fda236a9888>
Traceback (most recent call last):
  File "<string>", line 6, in _wrap_awaitable
  File "bug.py", line 44, in post
  File "/home/test/bug/lib/python3.5/site-packages/aiopg/utils.py", line 121, in __aexit__
  File "/home/test/bug/lib/python3.5/site-packages/aiopg/sa/engine.py", line 153, in release
  File "/home/test/bug/lib/python3.5/site-packages/aiopg/pool.py", line 241, in release
  File "/usr/local/lib/python3.5/asyncio/tasks.py", line 524, in async
  File "/usr/local/lib/python3.5/asyncio/tasks.py", line 539, in ensure_future
  File "/usr/local/lib/python3.5/asyncio/base_events.py", line 209, in create_task
  File "/usr/local/lib/python3.5/asyncio/base_events.py", line 284, in _check_closed
RuntimeError: Event loop is closed
2016-04-06 16:47:26,643 ERROR asyncio: [Dummy-1] Unclosed connection
connection: <aiopg.connection.Connection object at 0x7fda2548c978>
2016-04-06 16:47:26,655 ERROR asyncio: [Dummy-1] Unclosed connection
connection: <aiopg.connection.Connection object at 0x7fda239609e8>
2016-04-06 16:47:26,664 ERROR asyncio: [Dummy-1] Unclosed connection
connection: <aiopg.connection.Connection object at 0x7fda239547f0>

@ghost

ghost commented Apr 26, 2016

I think the question should be: "is it possible to get another connection from the pool if the previous one has not been returned yet?"

@runtel
Contributor

runtel commented May 24, 2016

I have an issue like this.
For detail, this is the pool state printed just before the acquire:

echo False
size 10
freesize 10
used 0
acquiring 0

printed by:

print("echo", self.pgsql.echo, "size", self.pgsql.size,
      "freesize", self.pgsql.freesize,
      "used", len(self.pgsql._used), "acquiring", self.pgsql._acquiring)

async with self.pgsql.acquire() as conn:
    async with conn.cursor() as cur:
        await cur.execute(sql)

error:
EXCEPTION <class 'AssertionError'> IN (/usr/local/runtel/manager3/common/base.py, LINE 207 "async with self.pgsql.acquire() as conn:"): <aiopg.connection.Connection object at 0x7fd1e465af60>

@mpaolini
Contributor

The current pool implementation does not have a timeout, and you can deadlock it easily if you acquire two connections in a nested manner.

Attaching a simpler test script to reproduce:

import asyncio
import logging

import aiopg.sa

logger = logging.getLogger()

CONN_STRING = 'postgresql://test:test@127.0.0.1/aiopg_test'

async def create_engine(pool_size):
    return await aiopg.sa.create_engine(
        CONN_STRING,
        maxsize=pool_size
    )


async def q2(engine, task_id):
    logger.debug('starting task {}'.format(task_id))
    async with engine.acquire():
        logger.debug('acquired conn task {}'.format(task_id))
        pass
    logger.debug('released conn task {}'.format(task_id))


async def q1(engine, task_id, nested):
    logger.debug('starting task {}'.format(task_id))
    async with engine.acquire():
        logger.debug('acquired conn task {}'.format(task_id))
        if nested:
            await q2(engine, task_id)


async def main(concurrency, iterations, pool_size, nested, loop):
    engine = await create_engine(pool_size)
    for n in range(iterations):
        print('iteration={n}'.format_map(locals()))
        tasks = [asyncio.ensure_future(q1(engine, i, nested), loop=loop) for i in range(concurrency)]
        await asyncio.wait(tasks, loop=loop)


if __name__ == '__main__':
    import argparse
    parser = argparse.ArgumentParser()
    parser.add_argument('-c', '--concurrency', type=int, default=10)
    parser.add_argument('-i', '--iterations', type=int, default=10)
    parser.add_argument('-p', '--pool-size', type=int, default=10)
    parser.add_argument('-n', '--nested', action='store_true')
    args = parser.parse_args()
    loop = asyncio.get_event_loop()
    logging.basicConfig(level=logging.DEBUG, format='%(funcName)s %(lineno)d: %(message)s')
    loop.run_until_complete(main(args.concurrency, args.iterations, args.pool_size, args.nested, loop=loop))

Not nested, everything is OK:

python test_pool.py -c 1000

Nested, if concurrency is higher than the pool size, it deadlocks:

python test_pool.py -c 100 -p 9 -n
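
A minimal sketch of one workaround for the nested case, assuming each task holds at most two connections at once (q1 plus the nested q2); the semaphore wrapper below is hypothetical, not part of aiopg:

import asyncio

# With maxsize=10 and at most 2 connections per task, allow only
# 10 // 2 = 5 tasks into the acquire path at a time; the nested acquire
# in q2 then always finds a free connection, so the deadlock cannot occur.
CONNS_PER_TASK = 2
POOL_SIZE = 10
sem = asyncio.Semaphore(POOL_SIZE // CONNS_PER_TASK)

async def q1_bounded(engine, task_id, nested):
    async with sem:  # bounds the number of concurrent acquirers
        await q1(engine, task_id, nested)

This trades concurrency for safety; the alternative, shown later in this thread, is to pass the already acquired connection down instead of acquiring a second one.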

@mpaolini
Contributor

related to #16

@serg666
Author

serg666 commented Jul 15, 2016

This is good news. We will look forward to the completion of task #16, but it has been open since 2014 :-)

@mpaolini
Contributor

task #16 is not really needed, see this comment: #125 (comment)

As of that comment, we might even consider closing this issue as invalid @jettify

@asvetlov
Member

Use aiohttp.Timeout or asyncio.wait_for to add a timeout to connection acquisition.
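
A minimal sketch of the asyncio.wait_for variant, applied to the fetch_t2 helper from the reports above (the 5-second timeout is illustrative):

import asyncio

# Wrap the whole acquire-and-query in asyncio.wait_for: if the pool
# stays exhausted, the waiting task is cancelled and asyncio.TimeoutError
# is raised instead of the request hanging forever.
async def fetch_t2_guarded():
    async def _work():
        async with engine.acquire() as conn:
            await conn.execute(t2.select().where(t2.c.id == 4))
    await asyncio.wait_for(_work(), timeout=5.0)

Note that this turns the deadlock into a visible error; it does not remove the underlying cause, which is the nested acquisition itself.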

@serg666
Author

serg666 commented Jul 15, 2016

@mpaolini how can your code above be rewritten to solve the nested problem when concurrency is higher than the pool size?

@asvetlov
Member

For your case, just pass the already acquired connection to fetch_t2:

import asyncio

import aiopg.sa
import sqlalchemy as sa

import tornado.web
import tornado.platform.asyncio

async def create_engine():
    return await aiopg.sa.create_engine(
        'dbname=dbname user=user password=password host=127.0.0.1',
        echo=True
    )

loop = asyncio.get_event_loop()
engine = loop.run_until_complete(create_engine())
metadata = sa.MetaData()


t1 = sa.Table('t1', metadata,
              sa.Column('id', sa.Integer, primary_key=True),
              sa.Column('name', sa.String(255), nullable=False))


t2 = sa.Table('t2', metadata,
              sa.Column('id', sa.Integer, primary_key=True),
              sa.Column('name', sa.String(255), nullable=False))


async def fetch_t2(conn):
    await conn.execute(t2.select().where(t2.c.id == 4))


class ReqHandler(tornado.web.RequestHandler):
    async def post(self):
        async with engine.acquire() as conn:
            #async with conn.begin():
            await conn.execute(t1.select().where(t1.c.id == 1))
            await fetch_t2(conn)
            await conn.execute(t1.insert().values(name='some name'))

        self.write("Hello world!\n")


app = tornado.web.Application([
    (r'/', ReqHandler)
])

if __name__ == '__main__':
    tornado.platform.asyncio.AsyncIOMainLoop().install()
    app.listen(8080)
    loop.run_forever()

@serg666
Author

serg666 commented Jul 18, 2016

Of course that solves it, but what about nested connection acquiring, as @mpaolini described? Is it possible?
