Concurrent queries on single connection #81

Closed
ryananguiano opened this issue Apr 15, 2019 · 18 comments · Fixed by #108
Labels
feature New feature or request

Comments

@ryananguiano

When using asyncpg, I am getting InterfaceError: cannot perform operation: another operation is in progress when running multiple db calls concurrently.

I am able to replicate and isolate the issue with force_rollback=True, although in my project where I encountered this bug, it is happening regardless.

Here is the isolated issue:

import asyncio
import databases
from starlette.applications import Starlette
from starlette.config import Config
from starlette.responses import JSONResponse

config = Config('.env')
DATABASE_URL = config('DATABASE_URL', default='postgresql://postgres@localhost:5432/postgres')

database = databases.Database(DATABASE_URL, force_rollback=True)
app = Starlette()


@app.on_event("startup")
async def startup():
    await database.connect()


@app.on_event("shutdown")
async def shutdown():
    await database.disconnect()


@app.route("/test", methods=["GET"])
async def test_view(request):
    await asyncio.gather(
        get_from_db(),
        get_from_db(),
    )
    return JSONResponse({"success": True})


async def get_from_db():
    return await database.fetch_all("SELECT pg_sleep(1)")


if __name__ == '__main__':
    from starlette.testclient import TestClient
    with TestClient(app) as test_client:
        test_client.get('/test')

Result

$ python app.py
Traceback (most recent call last):
  File "app.py", line 40, in <module>
    test_client.get('/test')
  File "/Users/ryan/.local/share/virtualenvs/postgres-bug-tR_IhxOx/lib/python3.7/site-packages/requests/sessions.py", line 546, in get
    return self.request('GET', url, **kwargs)
  File "/Users/ryan/.local/share/virtualenvs/postgres-bug-tR_IhxOx/lib/python3.7/site-packages/starlette/testclient.py", line 382, in request
    json=json,
  File "/Users/ryan/.local/share/virtualenvs/postgres-bug-tR_IhxOx/lib/python3.7/site-packages/requests/sessions.py", line 533, in request
    resp = self.send(prep, **send_kwargs)
  File "/Users/ryan/.local/share/virtualenvs/postgres-bug-tR_IhxOx/lib/python3.7/site-packages/requests/sessions.py", line 646, in send
    r = adapter.send(request, **kwargs)
  File "/Users/ryan/.local/share/virtualenvs/postgres-bug-tR_IhxOx/lib/python3.7/site-packages/starlette/testclient.py", line 211, in send
    raise exc from None
  File "/Users/ryan/.local/share/virtualenvs/postgres-bug-tR_IhxOx/lib/python3.7/site-packages/starlette/testclient.py", line 208, in send
    loop.run_until_complete(connection(receive, send))
  File "/Users/ryan/.pyenv/versions/3.7.2/lib/python3.7/asyncio/base_events.py", line 584, in run_until_complete
    return future.result()
  File "/Users/ryan/.local/share/virtualenvs/postgres-bug-tR_IhxOx/lib/python3.7/site-packages/starlette/middleware/errors.py", line 125, in asgi
    raise exc from None
  File "/Users/ryan/.local/share/virtualenvs/postgres-bug-tR_IhxOx/lib/python3.7/site-packages/starlette/middleware/errors.py", line 103, in asgi
    await asgi(receive, _send)
  File "/Users/ryan/.local/share/virtualenvs/postgres-bug-tR_IhxOx/lib/python3.7/site-packages/starlette/exceptions.py", line 74, in app
    raise exc from None
  File "/Users/ryan/.local/share/virtualenvs/postgres-bug-tR_IhxOx/lib/python3.7/site-packages/starlette/exceptions.py", line 63, in app
    await instance(receive, sender)
  File "/Users/ryan/.local/share/virtualenvs/postgres-bug-tR_IhxOx/lib/python3.7/site-packages/starlette/routing.py", line 41, in awaitable
    response = await func(request)
  File "app.py", line 28, in test_view
    get_from_db(),
  File "app.py", line 34, in get_from_db
    return await database.fetch_all("SELECT pg_sleep(1)")
  File "/Users/ryan/.local/share/virtualenvs/postgres-bug-tR_IhxOx/lib/python3.7/site-packages/databases/core.py", line 95, in fetch_all
    return await connection.fetch_all(query, values)
  File "/Users/ryan/.local/share/virtualenvs/postgres-bug-tR_IhxOx/lib/python3.7/site-packages/databases/core.py", line 179, in fetch_all
    return await self._connection.fetch_all(self._build_query(query, values))
  File "/Users/ryan/.local/share/virtualenvs/postgres-bug-tR_IhxOx/lib/python3.7/site-packages/databases/backends/postgres.py", line 137, in fetch_all
    rows = await self._connection.fetch(query, *args)
  File "/Users/ryan/.local/share/virtualenvs/postgres-bug-tR_IhxOx/lib/python3.7/site-packages/asyncpg/connection.py", line 421, in fetch
    return await self._execute(query, args, 0, timeout)
  File "/Users/ryan/.local/share/virtualenvs/postgres-bug-tR_IhxOx/lib/python3.7/site-packages/asyncpg/connection.py", line 1412, in _execute
    with self._stmt_exclusive_section:
  File "/Users/ryan/.local/share/virtualenvs/postgres-bug-tR_IhxOx/lib/python3.7/site-packages/asyncpg/connection.py", line 1847, in __enter__
    'cannot perform operation: another operation is in progress')
asyncpg.exceptions._base.InterfaceError: cannot perform operation: another operation is in progress

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "app.py", line 40, in <module>
    test_client.get('/test')
  File "/Users/ryan/.local/share/virtualenvs/postgres-bug-tR_IhxOx/lib/python3.7/site-packages/starlette/testclient.py", line 415, in __exit__
    loop.run_until_complete(self.wait_shutdown())
  File "/Users/ryan/.pyenv/versions/3.7.2/lib/python3.7/asyncio/base_events.py", line 584, in run_until_complete
    return future.result()
  File "/Users/ryan/.local/share/virtualenvs/postgres-bug-tR_IhxOx/lib/python3.7/site-packages/starlette/testclient.py", line 435, in wait_shutdown
    self.task.result()
  File "/Users/ryan/.local/share/virtualenvs/postgres-bug-tR_IhxOx/lib/python3.7/site-packages/starlette/testclient.py", line 420, in lifespan
    await inner(self.receive_queue.get, self.send_queue.put)
  File "/Users/ryan/.local/share/virtualenvs/postgres-bug-tR_IhxOx/lib/python3.7/site-packages/starlette/routing.py", line 483, in asgi
    await self.shutdown()
  File "/Users/ryan/.local/share/virtualenvs/postgres-bug-tR_IhxOx/lib/python3.7/site-packages/starlette/routing.py", line 468, in shutdown
    await handler()
  File "app.py", line 21, in shutdown
    await database.disconnect()
  File "/Users/ryan/.local/share/virtualenvs/postgres-bug-tR_IhxOx/lib/python3.7/site-packages/databases/core.py", line 74, in disconnect
    await self._global_transaction.__aexit__()
  File "/Users/ryan/.local/share/virtualenvs/postgres-bug-tR_IhxOx/lib/python3.7/site-packages/databases/core.py", line 256, in __aexit__
    await self.rollback()
  File "/Users/ryan/.local/share/virtualenvs/postgres-bug-tR_IhxOx/lib/python3.7/site-packages/databases/core.py", line 297, in rollback
    await self._transaction.rollback()
  File "/Users/ryan/.local/share/virtualenvs/postgres-bug-tR_IhxOx/lib/python3.7/site-packages/databases/backends/postgres.py", line 215, in rollback
    await self._transaction.rollback()
  File "/Users/ryan/.local/share/virtualenvs/postgres-bug-tR_IhxOx/lib/python3.7/site-packages/asyncpg/transaction.py", line 219, in rollback
    await self.__rollback()
  File "/Users/ryan/.local/share/virtualenvs/postgres-bug-tR_IhxOx/lib/python3.7/site-packages/asyncpg/transaction.py", line 198, in __rollback
    await self._connection.execute(query)
  File "/Users/ryan/.local/share/virtualenvs/postgres-bug-tR_IhxOx/lib/python3.7/site-packages/asyncpg/connection.py", line 273, in execute
    return await self._protocol.query(query, timeout)
  File "asyncpg/protocol/protocol.pyx", line 301, in query
  File "asyncpg/protocol/protocol.pyx", line 659, in asyncpg.protocol.protocol.BaseProtocol._check_state
asyncpg.exceptions._base.InterfaceError: cannot perform operation: another operation is in progress

When running into this issue with force_rollback=False, since there isn't supposed to be a global connection and transaction, I was able to get around the issue by changing the following function:

async def get_from_db():
    return await database.fetch_all("SELECT pg_sleep(1)")

to:

from databases.core import Connection

async def get_from_db():
    async with Connection(database._backend) as conn:
        return await conn.fetch_all("SELECT pg_sleep(1)")

I tried using async with database.connection() as conn:, but it returned the same connection that is currently being used. I guess I was just trying to understand the logic of sharing the pool connection instead of letting the pool allocate connections (when available), since an asyncpg Connection only allows one statement at a time.

@ryananguiano
Author

The two solutions I can think of would be to either:

  • Put a lock on the connection while processing a statement
  • Let the pool allocate the connections instead of sharing it in contextvars
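The first option could look roughly like the following. It is a stdlib-only sketch, not the `databases` implementation: `FakeConnection` is a hypothetical stand-in that, like an asyncpg connection, refuses to start a statement while another one is in flight, and `LockedConnection` is a hypothetical wrapper that serializes statements with an `asyncio.Lock`.

```python
import asyncio


class InterfaceError(Exception):
    """Hypothetical stand-in for asyncpg's InterfaceError."""


class FakeConnection:
    """Hypothetical connection that, like asyncpg, runs one statement at a time."""

    def __init__(self) -> None:
        self._busy = False

    async def fetch(self, query: str) -> str:
        if self._busy:
            raise InterfaceError(
                "cannot perform operation: another operation is in progress"
            )
        self._busy = True
        try:
            await asyncio.sleep(0.01)  # simulate a round trip to the server
            return f"result of {query}"
        finally:
            self._busy = False


class LockedConnection:
    """Hypothetical wrapper that serializes statements on one shared connection."""

    def __init__(self, conn: FakeConnection) -> None:
        self._conn = conn
        self._lock = asyncio.Lock()

    async def fetch(self, query: str) -> str:
        # Only one coroutine at a time may use the underlying connection;
        # the others wait here instead of raising InterfaceError.
        async with self._lock:
            return await self._conn.fetch(query)


async def demo():
    raw_error = None
    raw = FakeConnection()
    try:
        await asyncio.gather(raw.fetch("SELECT 1"), raw.fetch("SELECT 1"))
    except InterfaceError as exc:
        raw_error = exc  # the second statement collided with the first

    locked = LockedConnection(FakeConnection())
    results = await asyncio.gather(locked.fetch("SELECT 1"), locked.fetch("SELECT 2"))
    return raw_error, results


raw_error, results = asyncio.run(demo())
print("unlocked:", raw_error)
print("locked:", results)
```

The trade-off is that the lock makes statements on one connection run one after another: it prevents the error but gives no speed-up, so genuinely concurrent queries still need separate connections.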

@ryananguiano
Author

What about being able to do something like this:

async def get_from_db():
    async with database.connection(new_connection=True) as conn:
        return await conn.fetch_all("SELECT pg_sleep(1)")
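The `new_connection=True` flag is only a proposal in this thread, not an existing `databases` parameter. As a rough, stdlib-only sketch of the idea behind it (and of the second option above), the hypothetical `FakePool` below hands each caller a connection that no other task holds, so two concurrent queries run side by side instead of colliding. `FakePool`, `FakeConnection` and this `get_from_db` are illustrative stand-ins, not the API of `databases` or asyncpg.

```python
import asyncio
import contextlib
from typing import AsyncIterator, List


class FakeConnection:
    """Hypothetical connection that allows only one statement at a time."""

    in_flight = 0       # statements currently running, across all connections
    max_in_flight = 0   # highest concurrency observed

    def __init__(self, name: str) -> None:
        self.name = name
        self._busy = False

    async def fetch(self, query: str) -> str:
        if self._busy:
            raise RuntimeError("another operation is in progress")
        self._busy = True
        FakeConnection.in_flight += 1
        FakeConnection.max_in_flight = max(
            FakeConnection.max_in_flight, FakeConnection.in_flight
        )
        try:
            await asyncio.sleep(0.01)  # simulate a server round trip
            return self.name
        finally:
            FakeConnection.in_flight -= 1
            self._busy = False


class FakePool:
    """Hypothetical pool: each acquire() yields a connection nobody else holds."""

    def __init__(self, size: int) -> None:
        self._free = asyncio.Queue()
        for i in range(size):
            self._free.put_nowait(FakeConnection(f"conn-{i}"))

    @contextlib.asynccontextmanager
    async def acquire(self) -> AsyncIterator[FakeConnection]:
        conn = await self._free.get()  # waits if every connection is in use
        try:
            yield conn
        finally:
            self._free.put_nowait(conn)


async def get_from_db(pool: FakePool) -> str:
    # A fresh connection per call: nothing is shared with other tasks.
    async with pool.acquire() as conn:
        return await conn.fetch("SELECT pg_sleep(1)")


async def main() -> List[str]:
    pool = FakePool(size=2)
    return await asyncio.gather(get_from_db(pool), get_from_db(pool))


names = asyncio.run(main())
print(names, "max concurrent statements:", FakeConnection.max_in_flight)
```

With a pool of size 1, the second task would simply wait in `acquire()` until the first connection is returned, so the pool queues callers instead of letting them collide.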

@tomchristie
Member

So you'll only be seeing that issue when you run within a transaction, right, since that's the case where the two operations are running against the same connection?

Are you able to submit a failing test case for this? (Even if it only runs against asyncpg, that'd be fine.)

@tomchristie
Member

And yes, I think we'll need to have an asyncio lock that's acquired for the connection. It hadn't really occurred to me that if you use asyncio flow branching, you could end up with concurrent operations on the same connection.

ryananguiano added a commit to ryananguiano/databases that referenced this issue Apr 18, 2019
@ryananguiano
Author

OK. PR submitted.

I think the lock will solve the issue, as there are legitimate reasons to branch inside of a transaction. But in some cases, I would like to execute a query unrelated to the transaction and run that concurrently in a separate connection.

Do you think a new_connection=True parameter would be a good idea, instead of instantiating a Connection manually (Connection(database._backend))?

@tomchristie
Member

I'd suggest we treat the "new connection" idea as a separate issue from this one.

We could make a case that we don't support flow branching against a transaction, since e.g. the equivalent wouldn't be supported in threading cases either, but I think it's probably worth covering. Happy to take a look at any pull requests towards this, or else I'll aim to get to it at some point.

@dmontagu

dmontagu commented May 9, 2019

@tomchristie I'm running into this issue outside of a transaction.

In particular, it seems that the error happens precisely if I ever await a database.execute or similar at one point in the execution, and then later use asyncio.gather to concurrently make requests. Weirdly (to me), as long as I always call the database methods from inside asyncio.gather, I can keep making concurrent requests (I couldn't get to the bottom of the connection mechanics to figure out why this would be the case).

The following code is my attempt at a minimal reproducible demonstration of the behavior. test1 always succeeds, demonstrating that there is no problem running concurrent database executions as long as I always call them via asyncio.gather. However, the test2 call results in asyncpg.exceptions._base.InterfaceError: cannot perform operation: another operation is in progress

import asyncio
from databases import Database

if __name__ == "__main__":
    database_uri = "<removed>"

    async def test1():
        database = Database(database_uri)
        await database.connect()
        await asyncio.gather(
            database.execute(query="SELECT 1"),
            database.execute(query="SELECT 1"),
        )
        await asyncio.gather(
            database.execute(query="SELECT 1"),
            database.execute(query="SELECT 1"),
        )
        await database.execute(query="SELECT 1")
        print("Test 1 succeeded")
        print("----------------")


    async def test2():
        database = Database(database_uri)
        await database.connect()
        await database.execute(query="SELECT 1")
        await asyncio.gather(
            database.execute(query="SELECT 1"),
            database.execute(query="SELECT 1"),
        )
        print("Test 2 succeeded")


    asyncio.get_event_loop().run_until_complete(test1())
    asyncio.get_event_loop().run_until_complete(test2())

The result of the execution is:

Test 1 succeeded
----------------
Traceback (most recent call last):
  File "/app/app/core/database.py", line 107, in <module>
    asyncio.get_event_loop().run_until_complete(test2())
  File "/usr/local/lib/python3.7/asyncio/base_events.py", line 584, in run_until_complete
    return future.result()
  File "/app/app/core/database.py", line 101, in test2
    database.execute(query="SELECT 1"),
  File "/usr/local/lib/python3.7/site-packages/databases/core.py", line 122, in execute
    return await connection.execute(query, values)
  File "/usr/local/lib/python3.7/site-packages/databases/core.py", line 204, in execute
    return await self._connection.execute(self._build_query(query, values))
  File "/usr/local/lib/python3.7/site-packages/databases/backends/postgres.py", line 157, in execute
    return await self._connection.fetchval(query, *args)
  File "/usr/local/lib/python3.7/site-packages/asyncpg/connection.py", line 439, in fetchval
    data = await self._execute(query, args, 1, timeout)
  File "/usr/local/lib/python3.7/site-packages/asyncpg/connection.py", line 1412, in _execute
    with self._stmt_exclusive_section:
  File "/usr/local/lib/python3.7/site-packages/asyncpg/connection.py", line 1847, in __enter__
    'cannot perform operation: another operation is in progress')
asyncpg.exceptions._base.InterfaceError: cannot perform operation: another operation is in progress

Any thoughts on how to get around this would be appreciated!

@ryananguiano
Author

Ah! I was experiencing the same issue but was not able to replicate it in isolation.

I think the asyncio.Lock solution should solve this.

@dmontagu

dmontagu commented May 10, 2019

Indeed, overriding the Database methods to be called inside asyncio.gather seems to solve all issues:

import asyncio
from databases import Database

class MyDatabase(Database):
    # Run each call inside its own gathered task.
    async def execute(self, *args, **kwargs):
        return (await asyncio.gather(super().execute(*args, **kwargs)))[0]

    async def fetch_all(self, *args, **kwargs):
        return (await asyncio.gather(super().fetch_all(*args, **kwargs)))[0]

(Replacing Database with MyDatabase in the test1 and test2 calls above, both calls execute successfully.)

I would appreciate any insight into why this works (I only discovered it while trying to produce test cases).
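A plausible explanation, assuming `databases` keeps the "current connection" in a `contextvars.ContextVar` (the issue author mentions connections being shared "in contextvars"): `asyncio.gather` wraps each coroutine in a task, and each task runs in a copy of the caller's context. A connection bound inside a gathered task therefore never leaks back to the caller, while a connection bound by an earlier top-level `await database.execute(...)` is inherited by every task gathered afterwards. The stdlib-only sketch below reproduces that pattern; `current_conn`, `get_connection` and `execute` are hypothetical names, not the library's internals.

```python
import asyncio
import contextvars
import itertools

# Hypothetical per-context slot for "the connection this code path is using".
current_conn = contextvars.ContextVar("current_conn", default=None)
_ids = itertools.count(1)


def get_connection() -> int:
    """Return the connection bound to this context, binding a new one if unset."""
    conn = current_conn.get()
    if conn is None:
        conn = next(_ids)  # an int stands in for a real connection object
        current_conn.set(conn)
    return conn


async def execute() -> int:
    await asyncio.sleep(0)  # let the other gathered task start
    return get_connection()


async def test1():
    # Nothing is bound in this context: each task binds its own connection
    # in its own copied context, and this context stays unbound afterwards.
    first = await asyncio.gather(execute(), execute())
    second = await asyncio.gather(execute(), execute())
    return first, second


async def test2():
    get_connection()  # like a top-level `await database.execute(...)`
    # Both tasks inherit the connection bound above and share it.
    return await asyncio.gather(execute(), execute())


async def test2_wrapped():
    # The MyDatabase trick: the "top-level" call also runs in a gathered task,
    # so its binding stays inside that task's copied context.
    await asyncio.gather(execute())
    return await asyncio.gather(execute(), execute())


r1 = asyncio.run(test1())
r2 = asyncio.run(test2())
r3 = asyncio.run(test2_wrapped())
print(r1, r2, r3)
```

Under this assumption, the wrapper works because no connection is ever bound in the caller's own context, so each call ends up with a connection of its own.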

@dmontagu

dmontagu commented May 10, 2019

@ryananguiano It looks like an asyncio lock is getting used already (https://github.com/encode/databases/blob/master/databases/core.py#L164), is this not what you are describing?

Also, I haven't actually checked whether the queries are being executed concurrently or if there is a lock preventing concurrent execution. My goal would be to be able to make unrelated (read) queries concurrently so I don't have to wait for multiple DB round trips; if an async lock is still forcing queries to be executed sequentially then that would defeat the primary purpose of my workaround anyway. Is that a reasonable thing to be aiming to achieve with this package/asyncpg?

@ryananguiano
Author

One of the issues I ran into was using force_rollback=True in my testing suite and still needing asyncio.gather() to work, and so locking an individual connection would be necessary to prevent the error in that case.

But I do think a separate solution is probably necessary for the issue you brought up. Your solution could work. I will look at it closely to try to understand why it fixes the problem, because there could be a simpler solution.

@LKay

LKay commented Jun 20, 2019

I'm having the same issue when using asyncio.gather to run multiple db operations in parallel. Was there any update on this issue recently?

Is there any work around to overcome this or any way to use some other driver such as aiopg (as per #39) which might handle that differently?

@tomchristie
Member

> Is there any work around to overcome this

Given that we'd need to add locking and force sequential queries, I'd suggest not attempting to execute queries in parallel on the same connection, or else using explicit locking in user code. I'd assume other drivers will behave exactly the same way.

Note that I don't think we'd expect thread-concurrency drivers to handle "fork-and-use-same-connection-for-concurrent-queries" correctly either, although I'd be interested to hear otherwise.

(E.g. https://docs.python.org/3/library/sqlite3.html#sqlite3.connect "When using multiple threads with the same connection writing operations should be serialized by the user to avoid data corruption.")

> It looks like an asyncio lock is getting used already

@dmontagu - Nope, that's just a lock around the connection acquisition logic, rather than around queries on the connection.

Things we could consider here:

  • An API to forcibly return a new connection, independent of any existing connection/transaction.
  • A connection-level Lock around all query operations.

@tomchristie
Member

Similar (but for the thread-concurrency case): "The MySQL protocol can not handle multiple threads using the same connection at once." - https://mysqlclient.readthedocs.io/user_guide.html?highlight=thread#mysqldb

@tomchristie
Member

psycopg2 does handle it, by locking appropriately. http://initd.org/psycopg/docs/usage.html#thread-and-process-safety

tomchristie changed the title from "InterfaceError with postgres connections and asyncio.gather" to "Concurrent queries on single connection" on Jun 20, 2019
@tomchristie
Member

I believe this would handle the locking correctly: #108

tomchristie added the "feature" (New feature or request) label on Jun 20, 2019
@tomchristie
Member

Probably going to leave this for a couple of days to think over first.

If you actually want concurrent access then you need to perform any flow branching outside the context of a connection or transaction. Merging #108 would resolve the behavior, by enforcing sequential access on the same connection. I guess that we probably want that supported, but happy to take any feedback before we merge it in.

It's also possible that we'll want some explicit connection management API for cases where that's what you really want (a bit similar to #67).

@tchalupnik

tchalupnik commented Aug 3, 2021

I have found a workaround that works for me:
I had to run asyncio.gather with return_exceptions=True, because the rollback triggered by a raised exception was hanging the application. Now, when an error occurs, I collect it in the calling task and re-raise it there, without a collision.

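import asyncio

async def run_queries_in_transaction(db, queries):  # helper name is illustrative
    tx = db.transaction()
    await tx.start()
    try:
        # Wait for every query to settle before deciding to roll back,
        # so the rollback never overlaps a still-running query.
        results = await asyncio.gather(*queries, return_exceptions=True)
        errors = [r for r in results if isinstance(r, Exception)]
        if errors:
            raise errors[0]  # re-raise the real error, not a bare Exception
    except Exception:
        await tx.rollback()
        raise
    else:
        await tx.commit()
    return results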
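A likely reason the plain `asyncio.gather` made the rollback hang: without `return_exceptions=True`, `gather` propagates the first exception immediately, while the other awaitables are not cancelled and keep running. A rollback issued at that point can land on the same connection as a sibling query that is still in flight. With `return_exceptions=True`, `gather` only returns once every awaitable has settled. The stdlib-only sketch below shows the difference; `slow_query` and `failing_query` are hypothetical stand-ins for database calls.

```python
import asyncio

finished = []  # names of stand-in queries that ran to completion


async def slow_query() -> str:
    await asyncio.sleep(0.05)  # still running when the other query fails
    finished.append("slow")
    return "ok"


async def failing_query() -> str:
    await asyncio.sleep(0.01)
    raise ValueError("query failed")


async def plain_gather() -> bool:
    """Return whether the slow query had finished when the error surfaced."""
    finished.clear()
    try:
        await asyncio.gather(slow_query(), failing_query())
    except ValueError:
        # A rollback issued here would overlap the still-running slow_query().
        return "slow" in finished
    raise AssertionError("expected failing_query() to raise")


async def gather_with_return_exceptions() -> bool:
    """Return whether an error was collected after every query had settled."""
    finished.clear()
    results = await asyncio.gather(
        slow_query(), failing_query(), return_exceptions=True
    )
    errors = [r for r in results if isinstance(r, Exception)]
    # Everything has settled, so a rollback here cannot collide with a query.
    return bool(errors) and "slow" in finished


slow_done_at_error = asyncio.run(plain_gather())
all_settled = asyncio.run(gather_with_return_exceptions())
print(slow_done_at_error, all_settled)
```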


5 participants