This repository was archived by the owner on Aug 19, 2025. It is now read-only.

_query_lock implementation causes "Future attached to a different loop" error when gather() is used with global Database(force_rollback=True) instance in app tests #157

@rafalp

This is tangentially related to #125

While writing tests for my project, I found that following Starlette's database usage docs together with the Databases testing docs results in a setup that makes it impossible to test application logic that uses gather to run multiple queries concurrently.

Reproduction

Starlette's database example directs developers to create a single Database instance for their application:

database = databases.Database(DATABASE_URL)

app = Starlette()

app.add_event_handler("startup", database.connect)
app.add_event_handler("shutdown", database.disconnect)

However, in tests you want some isolation between test cases. An easy way to achieve this is to roll back every change a test makes once it finishes, hence the force_rollback flag on Database. A developer who updates their implementation accordingly will end up with something like this:

if TESTING:
    database = Database(TEST_DATABASE_URL, force_rollback=True)
else:
    database = Database(DATABASE_URL)

app = Starlette()

app.add_event_handler("startup", database.connect)
app.add_event_handler("shutdown", database.disconnect)

This code runs fine until we use gather somewhere in the logic and write a test for it:

from asyncio import gather

import pytest

from .database import database


async def logic_using_gather():
    return await gather(
        database.fetch_one("SELECT pg_sleep(1)"),
        database.fetch_one("SELECT pg_sleep(1)"),
    )


@pytest.fixture(scope="function")
async def db():
    async with database:
        yield


@pytest.mark.asyncio
async def test_my_logic(db):
    await logic_using_gather()

We will get an error:

RuntimeError: Task <Task pending coro=<Database.execute() running at /usr/local/lib/python3.7/site-packages/databases/core.py:122> cb=[gather.<locals>._done_callback() at /usr/local/lib/python3.7/asyncio/tasks.py:691]> got Future <Future pending> attached to a different loop

The issue

When Database is initialized with force_rollback=True, it internally creates a single "global connection" object, which in turn creates an asyncio.Lock instance for itself.

asyncio.Lock can be initialized with a loop argument, but if none is provided, it will try to get the current event loop, or create a new event loop if none exists. The former is the case when the app runs behind an ASGI server that provides the event loop, while the latter is the case in test setups.

The error occurs because the event loop provided by the test runner runs a task that awaits an asyncio.Lock which was bound to a different event loop when it was created. This does not happen when queries are run one after another, because the first query releases the lock before the next one is sent to the event loop for execution:

await database.fetch_one("SELECT pg_sleep(1)")
await database.fetch_one("SELECT pg_sleep(1)")

However, when gather is used, both tasks are scheduled on the event loop at the same time. Because _query_lock._locked is already True when the second task reaches the lock, acquiring _query_lock makes asyncio.Lock create a waiter future on the event loop it was bound to. That loop is not the one running the rest of the logic, which produces the error shown above.
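The mechanism described above can be reproduced without databases or a real database server. The following is a minimal sketch, not the library's code: LoopBoundLock is a hypothetical, simplified stand-in for a lock that captures its event loop at construction (as asyncio.Lock does on the Python 3.7 shown in the traceback), and query, sequential, concurrent and demo are illustrative names. It assumes only the standard library.

```python
import asyncio
import collections


class LoopBoundLock:
    """Hypothetical stand-in for a lock bound to a loop at construction.

    Not asyncio.Lock and not the databases implementation; it only mimics
    the part of the behaviour discussed in this issue.
    """

    def __init__(self, loop):
        self._loop = loop
        self._locked = False
        self._waiters = collections.deque()

    async def acquire(self):
        if not self._locked:
            # Uncontended: no future is created, so the bound loop is unused.
            self._locked = True
            return
        # Contended: the waiter future is created on the bound loop.
        fut = self._loop.create_future()
        self._waiters.append(fut)
        try:
            await fut
        finally:
            self._waiters.remove(fut)
        self._locked = True

    def release(self):
        self._locked = False
        for fut in self._waiters:
            if not fut.done():
                fut.set_result(None)
                break


async def query(lock):
    await lock.acquire()
    try:
        await asyncio.sleep(0)  # stands in for the database round trip
    finally:
        lock.release()


async def sequential(lock):
    # Each query releases the lock before the next one starts.
    await query(lock)
    await query(lock)
    return "ok"


async def concurrent(lock):
    # The second task finds the lock held and has to wait on a future.
    return await asyncio.gather(query(lock), query(lock), return_exceptions=True)


def demo():
    # Plays the role of the loop that existed when the lock was created.
    construction_loop = asyncio.new_event_loop()
    lock = LoopBoundLock(construction_loop)
    try:
        seq = asyncio.run(sequential(lock))   # runs on a fresh loop
        conc = asyncio.run(concurrent(lock))  # runs on another fresh loop
    finally:
        construction_loop.close()
    return seq, conc


if __name__ == "__main__":
    seq, conc = demo()
    print(seq)
    print(conc[1])
```

In this sketch the sequential run completes, while the second task in the gather run fails with a RuntimeError whose message ends in "attached to a different loop", matching the shape of the error reported here.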

Reduced example

Going back to the discussion in #125, it was pointed out that there's a test in databases for using gather to await multiple queries, but this test checks an unrealistic use of Database together with gather:

@pytest.mark.parametrize("database_url", DATABASE_URLS)
@async_adapter
async def test_concurrent_access_on_single_connection(database_url):
    async with Database(database_url, force_rollback=True) as database:

        async def db_lookup():
            if str(database_url).startswith("postgresql"):
                await database.fetch_one("SELECT pg_sleep(1)")

        await asyncio.gather(db_lookup(), db_lookup())

Whereas a more realistic (and failing) usage would look like this:

@pytest.mark.parametrize("database_url", DATABASE_URLS)
def test_concurrent_access_on_single_connection(database_url):
    database = Database(database_url, force_rollback=True)

    @async_adapter
    async def run_db_lookups():
        async with database:
            async def db_lookup():
                if str(database_url).startswith("postgresql"):
                    await database.fetch_one("SELECT pg_sleep(1)")

            await asyncio.gather(db_lookup(), db_lookup())
    
    run_db_lookups()

The above test fails with the same error, because database and async_adapter now use two different event loops.
