close cannot be used while an asynchronous query is underway #364

Closed
ingmferrer opened this issue Aug 1, 2017 · 52 comments · Fixed by #452

Comments

@ingmferrer

I'm getting this error if I do multiple fetches. I'm not sure what I'm doing wrong:


Exception ignored in: <function ResultProxy.__init__.<locals>.<lambda> at 0x7f979638fe18>
Traceback (most recent call last):
  File "/home/anthbot/Pyvenv/anthbotv5/lib/python3.5/site-packages/aiopg/sa/result.py", line 235, in <lambda>
    self._weak = weakref.ref(self, lambda wr: cursor.close())
  File "/home/anthbot/Pyvenv/anthbotv5/lib/python3.5/site-packages/aiopg/cursor.py", line 50, in close
    self._impl.close()
psycopg2.ProgrammingError: close cannot be used while an asynchronous query is underway

This is the query:

query = sa.select([role_item]).where(role_item.c.auto_role_id == auto_role_id)
return await fetch_ite(query)

This is the function:

async def _fetch(query, type=DBType.ONE):
    async with engine.acquire() as conn:
        res = await conn.execute(query)
        if type == DBType.ONE:
            return await res.fetchone()
        elif type == DBType.ALL:
            return await res.fetchall()
        elif type == DBType.ITE:
            return res
        return None

Any help would be appreciated.

@Skorpyon

Skorpyon commented Aug 18, 2017

Same issue:

Exception ignored in: <function ResultProxy.__init__.<locals>.<lambda> at 0x6ca86b973a60>
Traceback (most recent call last):
  File "/usr/local/lib/python3.6/site-packages/aiopg/sa/result.py", line 235, in <lambda>
    self._weak = weakref.ref(self, lambda wr: cursor.close())
  File "/usr/local/lib/python3.6/site-packages/aiopg/cursor.py", line 50, in close
    self._impl.close()
psycopg2.ProgrammingError: close cannot be used while an asynchronous query is underway
Exception ignored in: <function ResultProxy.__init__.<locals>.<lambda> at 0x6ca86b973ea0>
Traceback (most recent call last):
  File "/usr/local/lib/python3.6/site-packages/aiopg/sa/result.py", line 235, in <lambda>
    self._weak = weakref.ref(self, lambda wr: cursor.close())
  File "/usr/local/lib/python3.6/site-packages/aiopg/cursor.py", line 50, in close
    self._impl.close()
psycopg2.ProgrammingError: close cannot be used while an asynchronous query is underway

The problem is that the stack trace is unexpectedly short, so it is impossible to tell which part of the code issued the query, which query it was, or how to handle it.

@DeoLeung

DeoLeung commented Sep 8, 2017

Same here. Any hints on what went wrong?

@barrachri

barrachri commented Sep 10, 2017

Same problem here.

The problem is that self._weak = weakref.ref(self, lambda wr: cursor.close()) tries to close the cursor while a query is underway (as you can read from the traceback).

Strange, because that line of code has been there for a long time:
557ad3c#diff-e2e6e38bb0fbb338fc152d8f6b75ca37

@barrachri

barrachri commented Sep 10, 2017

This resolves the error for me:

if cursor.description is not None:
    self._metadata = ResultMetaData(self, cursor.description)
    self._weak = None

Version of the libraries

aiohttp==2.2.5
aiopg==0.13.0
psycopg2==2.7.3.1
SQLAlchemy==1.1.9
SQLAlchemy-Utils==0.32.14

Query

async with self.db.acquire() as conn:
    last_entry = mLog.alias()
    query = select([
        mLog.c.name,
        mLog.c.user_id,
        extract('EPOCH', (last_entry.c.created_at - mLog.c.created_at)).label('usage')
    ]).select_from(
        mLog.join(last_entry, mLog.c.name == last_entry.c.name)
    ).where(
        (mLog.c.user_id == user_id) &
        (last_entry.c.user_id == user_id) &
        (mLog.c.action == 'create') &
        (last_entry.c.action == 'destroy')
    )
    result = await conn.execute(query)

@Skorpyon

Any update? Can the core developers comment on this issue? This exception bothers me all the time. Please give some advice on why it happens and what to do.

@sajjadalee

Any update? This is really holding up my work.

@jettify
Member

jettify commented Oct 17, 2017

I will take a look. Please share a script or test case if possible, so I can reproduce it locally.

@sajjadalee

I am trying to run this script :
https://github.com/mixerp/mixerp/blob/master/src/FrontEnd/db/blank-db.sql

but Postgres gets stuck. Please try to replicate it on your machine.

Thanks

@Skorpyon

@sajjadalee my brain gets stuck on your script too :D
Maybe it needs to be split into readable, understandable parts that are executed one by one?
It's amazing, an 11 MB SQL query :D

@Skorpyon

I get this exception (close cannot...) even with small, ordinary queries like SELECT blabla, bla FROM blabla WHERE blabla=true;

@sajjadalee

😄 Bravo. It's an open-source solution and I wanted to contribute to it. However, Postgres has been such a disappointment so far :(

@Skorpyon

@sajjadalee you could try asyncpg or its SQLAlchemy wrapper, asyncpgsa. Just check the query timeouts, because your script is giant and may take a long time to execute. I plan to move from aiopg to that library, because psycopg's async mode has disappointed me.

@jettify
Member

jettify commented Oct 17, 2017

@sajjadalee That SQL script is large; this is not the type of workload aiopg is optimized for. Have you tried regular psycopg2 in sync mode?

@jettify
Member

jettify commented Oct 17, 2017

Looks like @barrachri is right: basically, ResultProxy for some reason loses its last reference and tries to close the cursor while the original query is still executing. One option is to refactor the code to use a context manager:

result = []
async with conn.execute(sql) as cursor:
    async for v in cursor:
        result.append(v)
assert result == [(1,), (2,), (3,), (4,), (5,)]

This should close the cursor explicitly.

Can anyone test this solution?
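
For reference, here is a minimal sketch of the same explicit-close pattern wrapped in a helper. The helper name fetch_rows and the Fake* classes are made up for illustration so the snippet runs without a database; they only imitate the acquire()/execute() shape used above and are not aiopg code.

```python
import asyncio
from contextlib import asynccontextmanager


async def fetch_rows(engine, query):
    """Read every row while the connection is still held."""
    async with engine.acquire() as conn:
        # Leaving the inner block closes the cursor explicitly,
        # before the connection goes back to the pool.
        async with conn.execute(query) as result:
            return [row async for row in result]


# --- Hypothetical stand-ins so the sketch runs without a database ---
class FakeResult:
    def __init__(self, rows, log):
        self._rows = rows
        self._log = log

    async def __aenter__(self):
        return self

    async def __aexit__(self, *exc_info):
        self._log.append("cursor closed")

    def __aiter__(self):
        return self._iterate()

    async def _iterate(self):
        for row in self._rows:
            yield row


class FakeConn:
    def __init__(self, log):
        self._log = log

    def execute(self, query):
        return FakeResult([(1,), (2,), (3,)], self._log)


class FakeEngine:
    def __init__(self):
        self.log = []

    @asynccontextmanager
    async def acquire(self):
        try:
            yield FakeConn(self.log)
        finally:
            self.log.append("connection released")


engine = FakeEngine()
rows = asyncio.run(fetch_rows(engine, "SELECT 1"))
print(rows)
print(engine.log)
```

The point of the ordering is that the cursor is closed first and the connection is released second, so nothing is left to close the cursor later.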

@barrachri

I'll try to find time later today to test it.

Thanks @jettify

@pacefalm

@jettify This appears to have worked for me.

Thanks!

@jettify
Member

jettify commented Oct 19, 2017

@asvetlov should we remove the code that implicitly closes the cursor in order to avoid this issue?

@barrachri

barrachri commented Oct 19, 2017

It also works for me, but I would like to keep this syntax:

result = await conn.execute(query)

Is there a specific reason to keep track of these weakrefs?
Shouldn't the cursor be garbage collected automatically?

@asvetlov
Member

Hmm.
I'm inclined to keep the weakrefs, at least until the library drops the yield from syntax and uses async context managers everywhere.

@jiujitsu

jiujitsu commented Dec 4, 2017

Any update on the status of this?

@asvetlov
Member

asvetlov commented Dec 5, 2017

No, we don't perform hidden conversations and secret commits.

@thehesiod
Contributor

thehesiod commented Jan 4, 2018

btw I got this error while using the aiopg cursor context so that doesn't seem to fix the issue:

  File "/weather/fbn/asyncio/db_utils.py", line 76, in _db_execute
    return cur.rowcount, []
  File "/weather/fbn/asyncio/db_utils.py", line 33, in __aexit__
    return self._ctx.__exit__(exc_type, exc_val, exc_tb)
  File "/usr/local/lib/python3.6/site-packages/aiopg/utils.py", line 220, in __exit__
    self._cur.close()
  File "/usr/local/lib/python3.6/site-packages/aiopg/cursor.py", line 50, in close
    self._impl.close()
psycopg2.ProgrammingError: close cannot be used while an asynchronous query is underway

along with:

  File "/usr/local/lib/python3.6/site-packages/aiopg/cursor.py", line 113, in execute
    yield from self._conn._poll(waiter, timeout)
  File "/usr/local/lib/python3.6/site-packages/aiopg/connection.py", line 241, in _poll
    raise exc
  File "/usr/local/lib/python3.6/site-packages/aiopg/connection.py", line 238, in _poll
    yield from asyncio.wait_for(self._waiter, timeout, loop=self._loop)
  File "/usr/local/lib/python3.6/asyncio/tasks.py", line 362, in wait_for
    raise futures.TimeoutError()
concurrent.futures._base.TimeoutError

And this happened during a timeout, so it looks like it's due to: https://github.com/aio-libs/aiopg/blob/master/aiopg/cursor.py#L116

And this will then cause the connection to not be returned to the pool since https://github.com/aio-libs/aiopg/blob/master/aiopg/utils.py#L241 will not run.
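
To illustrate the ordering problem, here is a simplified sketch. It is not aiopg's actual code; Pool, StuckCursor and the two exit functions are hypothetical names. If the cleanup path calls close() and then releases the connection, an exception from close() skips the release unless it is guarded with try/finally.

```python
class Pool:
    """Hypothetical pool that only counts connections handed back."""

    def __init__(self):
        self.free = 0

    def release(self, conn):
        self.free += 1


class StuckCursor:
    """Stand-in for a cursor whose query is still underway after a timeout."""

    def close(self):
        raise RuntimeError(
            "close cannot be used while an asynchronous query is underway")


def exit_unguarded(pool, conn, cur):
    cur.close()         # raises, so the next line never runs
    pool.release(conn)


def exit_guarded(pool, conn, cur):
    try:
        cur.close()
    finally:
        pool.release(conn)  # runs even if close() raised


def run(exit_func):
    """Run one cleanup variant and report how many connections came back."""
    pool = Pool()
    try:
        exit_func(pool, object(), StuckCursor())
    except RuntimeError:
        pass
    return pool.free


leaked = run(exit_unguarded)
returned = run(exit_guarded)
print(leaked, returned)
```

Whether a connection in this state should be reused or discarded is a separate question; the sketch only shows why the release step can be skipped.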

band-aid: #415

Fascinating as 2a72b60#diff-3416729d6745fac0ca3dd44b22a69068 is supposed to have cancelled the connection...perhaps the close is timing out / cancel failing?

@jettify
Member

jettify commented Jan 4, 2018

Issue is strange, any chance you reuse cursor/connection between different coroutines?

@thehesiod
Contributor

@jettify not from what I see, here's what I do:

class _SyncCtxWrapper:
    # wraps an async context around a sync context, need this until https://github.com/aio-libs/aiopg/pull/265 is merged
    def __init__(self, sync_ctx):
        self._ctx = sync_ctx

    async def __aenter__(self):
        return self._ctx.__enter__()

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        return self._ctx.__exit__(exc_type, exc_val, exc_tb)

cursor_ctx = _SyncCtxWrapper(await self._pg_pool.cursor())
async with cursor_ctx as cur:
    await cur.execute(query, parameters)

@thehesiod
Contributor

thehesiod commented Jan 4, 2018

Seems like close is async; perhaps it's timing out, then it tries to close again and gets the error. Sounds like it perhaps needs to force-drop the conn from the pool? This is really rare; it has only happened once in a year in our prod environment.

@DeadLemon

DeadLemon commented Jan 6, 2018

Got the same issue in a staging environment after 10-15 minutes of idle time; it reproduces reliably.
aiohttp==2.3.7, aiopg==0.13.2, peewee==2.10.2, peewee-async==0.5.10

@thehesiod
Contributor

@AbdullaM5 could you try my patch in #415 to see if it helps? In my situation once I hit this error my aiopg pool was stuck. Also if you could provide a script that can reproduce this with a docker postgres server that would be awesome!

@runtel
Contributor

runtel commented Jan 19, 2018

@thehesiod The patch did not help.

@jettify
Member

jettify commented Jan 19, 2018

That patch has already been released in the new aiopg version. Can you show a code example that reproduces this issue?

@ekarlso

ekarlso commented Feb 8, 2018

hi, I am getting the same issue on latest git

@smagafurov
Contributor

Same here - "close cannot be used while an asynchronous query is underway" after "TimeoutError"

@thehesiod
Contributor

anyone with a reproducible case?

@runtel
Contributor

runtel commented Mar 15, 2018

Reproduced the issue on latest git:

import asyncio
from aiopg.sa import create_engine
import datetime
import sqlalchemy as sa
import random

test_table = sa.Table(
    'test_table', sa.MetaData(),
    sa.Column('va1'),
    sa.Column('va2'),
    sa.Column('va3'),
    sa.Column('va4'),
    sa.Column('va5'),
    sa.Column('va6'),
    sa.Column('va7'),
    sa.Column('va8'),
)

create = "CREATE TABLE test_table(" \
         "va1 varchar(100), " \
         "va2 varchar(100), " \
         "va3 varchar(100), " \
         "va4 varchar(100), " \
         "va5 varchar(100), " \
         "va6 varchar(100), " \
         "va7 varchar(100), " \
         "va8 varchar(100))"

select = test_table.select()
insert = test_table.insert().values(
    va1="1edsfsdfasdfasdfasdfadfdsfasdfasdfasdfasdfjhasdlfkhjalskdjhflkajsdhflkasdfasdfasdfd",
    va2="1edsfsdfasdfasdfasdfadfdsfasdfasdfasdfasdfjhasdlfkhjalskdjhflkajsdhflkasdfasdfasdfd",
    va3="1edsfsdfasdfasdfasdfadfdsfasdfasdfasdfasdfjhasdlfkhjalskdjhflkajsdhflkasdfasdfasdfd",
    va4="1edsfsdfasdfasdfasdfadfdsfasdfasdfasdfasdfjhasdlfkhjalskdjhflkajsdhflkasdfasdfasdfd",
    va5="1edsfsdfasdfasdfasdfadfdsfasdfasdfasdfasdfjhasdlfkhjalskdjhflkajsdhflkasdfasdfasdfd",
    va6="1edsfsdfasdfasdfasdfadfdsfasdfasdfasdfasdfjhasdlfkhjalskdjhflkajsdhflkasdfasdfasdfd",
    va7="1edsfsdfasdfasdfasdfadfdsfasdfasdfasdfasdfjhasdlfkhjalskdjhflkajsdhflkasdfasdfasdfd",
    va8="1edsfsdfasdfasdfasdfadfdsfasdfasdfasdfasdfjhasdlfkhjalskdjhflkajsdhflkasdfasdfasdfd"
)
drop = "DROP TABLE test_table"

async def sql_insert(engine, insert):
    await asyncio.sleep(random.random())
    async with engine.acquire() as conn:
        rp = await conn.execute(select)
        # res = await rp.fetchall()

    async with engine.acquire() as conn:
        rp = await conn.execute(insert)
        print(rp)
    print(engine.size, engine.freesize)


async def clear(engine):
    async with engine.acquire() as conn:
        try:
            await conn.execute(drop)
        except Exception:
            pass  # the table may not exist on the first run

        await conn.execute(create)


async def aiopg_test(engine):
    for i in range(1, 10001):
        await sql_insert(engine, insert)


async def test():
    engine = await create_engine(
        host="192.168.0.2",
        port=5432,
        database="demo",
        user="user",
        password="pass",
        minsize=1, maxsize=10)
    await clear(engine)

    asyncio.ensure_future(aiopg_test(engine))
    asyncio.ensure_future(aiopg_test(engine))
    asyncio.ensure_future(aiopg_test(engine))
    asyncio.ensure_future(aiopg_test(engine))
    asyncio.ensure_future(aiopg_test(engine))
    asyncio.ensure_future(aiopg_test(engine))
    asyncio.ensure_future(aiopg_test(engine))
    asyncio.ensure_future(aiopg_test(engine))

    await asyncio.sleep(10000)

loop = asyncio.get_event_loop()
loop.run_until_complete(test())
loop.close()

If you uncomment the line "res = await rp.fetchall()" the error does not occur.

Maybe that is because ResultProxy.fetchall() closes the cursor.

@thehesiod
Contributor

@runtel I ran that script locally on my Mac with Python 3.6.3 and the latest released aiopg + psycopg2, and it just hangs. Perhaps you could create a Docker container with it?

@runtel
Contributor

runtel commented Mar 15, 2018

@thehesiod Does the engine connect to the database? I ran the script in the same environment (Mac + Python 3.6.4, latest released aiopg + psycopg2).

@thehesiod
Contributor

thehesiod commented Mar 15, 2018

aha, looks like there's a huge/infinite timeout by default for connect :) ok repro'd!

@thehesiod
Contributor

thehesiod commented Mar 15, 2018

@runtel so in this example, the exception happens because the weakref in the SA ResultProxy tries to close the cursor. (update) Second run with maxsize=1 and the issue still happens, interesting... digging

@thehesiod
Contributor

@runtel ok figured it out, in this function:

    async with engine.acquire() as conn:
        rp = await conn.execute(insert)
        print(rp)

After print, _PoolAcquireContextManager.__aexit__ is called, which releases the connection back to the free pool. After this returns, the weakref's finalizer in aiopg.sa.result.ResultProxy (self._weak) is called and tries to close the cursor. Given that the connection was already returned to the pool, it may be in the middle of another execute, which triggers the error. Now, how to fix this... Obviously, having a weakref that does a close sounds really dangerous. @jettify, any ideas?
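
The sequence can be imitated with the standard library alone. This is a simplified sketch, not aiopg code: Connection, Cursor and Result are hypothetical stand-ins, and the busy flag plays the role of "another task is already executing on this connection". It relies on the finalizer running as soon as the last reference is dropped, as it does with CPython's reference counting.

```python
import gc
import weakref

BUSY_ERROR = "close cannot be used while an asynchronous query is underway"


class Connection:
    """Hypothetical stand-in for a pooled connection."""

    def __init__(self):
        self.busy = False  # True while some task is running a query on it


class Cursor:
    """Hypothetical stand-in for a cursor bound to one connection."""

    def __init__(self, conn):
        self.conn = conn
        self.closed = False

    def close(self):
        if self.conn.busy:
            raise RuntimeError(BUSY_ERROR)
        self.closed = True


class Result:
    """Mimics the pattern in the traceback: a weakref callback closes the cursor."""

    def __init__(self, cursor, errors):
        def finalize(_ref):
            try:
                cursor.close()
            except RuntimeError as exc:
                # Uncaught, this would only be reported as "Exception ignored".
                errors.append(str(exc))

        self._weak = weakref.ref(self, finalize)


errors = []
conn = Connection()
cursor = Cursor(conn)
result = Result(cursor, errors)

# 1. The acquire block exits and the connection returns to the pool.
# 2. Another task picks the connection up and starts a query.
conn.busy = True
# 3. Only now does the last reference to the result go away.
del result
gc.collect()

print(errors)
```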

@thehesiod
Contributor

I think maybe each connection needs to keep track of all its cursors, and then release them when the connection is released back to the pool, with cursor.close() then becoming a no-op in this case.
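
A rough sketch of that idea, using hypothetical classes rather than aiopg's real ones: the connection keeps weak references to the cursors it created, closes them all on release, and a later close() on an already closed cursor does nothing.

```python
import weakref


class Cursor:
    """Hypothetical cursor that reports back to its connection on close."""

    def __init__(self, conn):
        self._conn = conn
        self.closed = False

    def close(self):
        if self.closed:
            return  # already released by the connection: no-op
        self.closed = True
        self._conn._cursors.discard(self)


class Connection:
    """Hypothetical connection that tracks the cursors it hands out."""

    def __init__(self):
        # Weak references, so tracking alone never keeps a cursor alive.
        self._cursors = weakref.WeakSet()

    def cursor(self):
        cur = Cursor(self)
        self._cursors.add(cur)
        return cur

    def release_to_pool(self):
        # Copy first: close() removes entries from the set while we iterate.
        for cur in list(self._cursors):
            cur.close()


conn = Connection()
cur = conn.cursor()
conn.release_to_pool()
closed_on_release = cur.closed

cur.close()  # a late close, e.g. from a finalizer, is now harmless
tracked_after = len(conn._cursors)
print(closed_on_release, tracked_after)
```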

@jettify
Member

jettify commented Mar 15, 2018

Tricky problem. To work around this issue, aiopg.sa.result.ResultProxy should be explicitly closed before the connection is returned.

@thehesiod
Contributor

I'll work on a PR to fix this in aiopg as well. First updating all my old PRs :)

@jettify
Member

jettify commented Mar 15, 2018

I think tracking cursors is a good idea. We should also:

  1. force one cursor per connection at any given time
  2. close the previous cursor, or raise an error, if it has not been finalized
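
The two rules could look roughly like this. It is a sketch with hypothetical names (CursorStillOpen, the strict flag), not a proposal for the actual aiopg API: in strict mode a second cursor is refused while the first is open, otherwise the previous cursor is closed automatically.

```python
class CursorStillOpen(RuntimeError):
    """Raised when a new cursor is requested before the previous one is closed."""


class Cursor:
    def __init__(self, conn):
        self._conn = conn
        self.closed = False

    def close(self):
        self.closed = True
        if self._conn._active is self:
            self._conn._active = None


class Connection:
    """Hypothetical connection that allows one live cursor at a time."""

    def __init__(self, strict=True):
        self._active = None
        self._strict = strict

    def cursor(self):
        prev = self._active
        if prev is not None and not prev.closed:
            if self._strict:
                raise CursorStillOpen("previous cursor is still open")
            prev.close()  # lenient mode: finalize the old cursor for the caller
        self._active = Cursor(self)
        return self._active


# Strict mode: the second request is an error.
strict_conn = Connection(strict=True)
first = strict_conn.cursor()
try:
    strict_conn.cursor()
    strict_raised = False
except CursorStillOpen:
    strict_raised = True

# Lenient mode: the old cursor is closed before the new one is handed out.
lenient_conn = Connection(strict=False)
old = lenient_conn.cursor()
new = lenient_conn.cursor()
print(strict_raised, old.closed, new.closed)
```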

@runtel
Contributor

runtel commented Mar 16, 2018

I have one more question. When I start many engine.acquire() calls, aiopg should add new connections to the pool. This happens, but "slowly". Try setting maxsize=10.

@thehesiod
Contributor

@runtel sorry, I'm not following your last comment. In your example maxsize=10 already; do you have another example?

@thehesiod
Contributor

@jettify should we bump to 0.14 as it can change behavior?

@runtel
Contributor

runtel commented Mar 22, 2018

I can test this in production. Is that needed?

@kjopek

kjopek commented Jul 31, 2018

This is a really important patch for us as well. When will it be merged?

@thehesiod
Contributor

all my aiopg PRs have been stuck :(

@runtel
Contributor

runtel commented Sep 18, 2018

Up)

@nikitagromov

You should close the cursor by calling cursor.close() after using fetchone.

@vir-mir
Member

vir-mir commented Nov 3, 2018

@asvetlov release new version, please

@vir-mir
Member

vir-mir commented Mar 23, 2019

Dear all. Please see this pull request #548.

@Pliner Pliner mentioned this issue Mar 21, 2021
3 tasks