Clarification on transaction isolation and management #424

Closed
cochiseruhulessin opened this issue Nov 16, 2021 · 6 comments
Labels
clean up Refinement to existing functionality

Comments

@cochiseruhulessin

Consider the following simulation of concurrent access:

# pylint: skip-file
import asyncio
import os

from databases import Database


async def tx1(db):
    async with db.transaction():
        await db.execute("INSERT INTO foo VALUES (1)")
        await asyncio.sleep(1.5)


async def tx2(db):
    async with db.transaction():
        await asyncio.sleep(0.5)
        result = await db.execute("SELECT * FROM foo")
        assert result is None, result
        await asyncio.sleep(1)


async def main():
    db = Database("postgresql://rdbms:rdbms@localhost")
    await db.connect()
    await db.execute("CREATE TABLE IF NOT EXISTS foo (bar int4)")
    await db.execute("TRUNCATE foo CASCADE")

    await asyncio.gather(
        tx1(db.connection()),
        tx2(db.connection())
    )


if __name__ == '__main__':
    asyncio.run(main())

This code should exit successfully, but it fails either with cannot perform operation: another operation is in progress (which is also odd, because a new connection is requested) or at the assert statement. Please clarify the expected transactional behavior and isolation of this module.

@cochiseruhulessin cochiseruhulessin changed the title No real transaction isolation? Clarification on transaction isolation and management Nov 16, 2021
@aminalaee
Member

aminalaee commented Nov 18, 2021

I don't know much about the connection internals, but as I understand it, the two calls to db.connection() return the same connection object.

To get different connections maybe we can do this instead of passing the connection?

async with db.transaction():
    ...

Now each transaction will have a different connection.

@patte

patte commented Nov 26, 2021

I'm very much interested in the same question.

While digging, I found two possible solutions to the problem in the script provided by @cochiseruhulessin.

Use a new connection with db._new_connection():
# pylint: skip-file
import asyncio
import os

from databases import Database


async def tx1(db):
    async with db.transaction():
        await db.execute("INSERT INTO foo VALUES (1)")
        await asyncio.sleep(1.5)


async def tx2(db):
    async with db.transaction():
        await asyncio.sleep(0.5)
        result = await db.execute("SELECT * FROM foo")
        assert result is None, result
        await asyncio.sleep(1)


async def main():
    db = Database("postgresql://postgres:postgres@localhost/portal")
    await db.connect()
    await db.execute("CREATE TABLE IF NOT EXISTS foo (bar int4)")
    await db.execute("TRUNCATE foo CASCADE")

    await asyncio.gather(
        tx1(db._new_connection()),
        tx2(db._new_connection())
    )


if __name__ == '__main__':
    asyncio.run(main())

Use the transaction's own connection with transaction._connection:
# pylint: skip-file
import asyncio
import os

from databases import Database


async def tx1(db):
    async with db.transaction() as transaction:
        await transaction._connection.execute("INSERT INTO foo VALUES (1)")
        await asyncio.sleep(1.5)


async def tx2(db):
    async with db.transaction() as transaction:
        await asyncio.sleep(0.5)
        result = await transaction._connection.execute("SELECT * FROM foo")
        assert result is None, result
        await asyncio.sleep(1)


async def main():
    db = Database("postgresql://postgres:postgres@localhost/portal")
    await db.connect()
    await db.execute("CREATE TABLE IF NOT EXISTS foo (bar int4)")
    await db.execute("TRUNCATE foo CASCADE")

    await asyncio.gather(
        tx1(db),
        tx2(db)
    )


if __name__ == '__main__':
    asyncio.run(main())

Both work, but neither is ideal, since both access protected members. Personally, I think the second approach (using the connection on the transaction) feels much more natural and clear (similar to how pogi does it). It would only require exposing the separate connection that already exists on the transaction (core.py#L207).

Side note: I need to make queries in FastAPI route handlers. The code in the documentation suggests using one connection for all queries. How do I start and use a transaction on this connection without affecting concurrent requests that also access the DB? Or is it better to acquire a separate connection for each request, as this example suggests?

@aminalaee
Member

aminalaee commented Nov 30, 2021

I agree that this isn't clear and that access to the transaction's connection can be improved. PRs are welcome.

As for your last comment, the docs say you should use a single connection because the underlying connection is actually a connection pool.
I don't think connection-per-request makes sense, as you'll soon reach the limits. Transaction-per-request should be allowed; it's been around in Django too.
Maybe we can have different isolation levels for creating transactions. I think SQLAlchemy does that too. I need to take a look into it.

asyncpg Transaction docs here.
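
To make the pool argument concrete, here is a minimal sketch of why many concurrent requests can share a small, fixed number of connections. TinyPool, handle_request, and the "conn-N" strings are hypothetical stand-ins written for this illustration; this is not how databases or asyncpg implement pooling.

```python
import asyncio


class TinyPool:
    """Hypothetical fixed-size pool; strings stand in for real connections."""

    def __init__(self, size: int) -> None:
        self._free: asyncio.Queue = asyncio.Queue()
        for i in range(size):
            self._free.put_nowait(f"conn-{i}")

    async def acquire(self) -> str:
        # Waits here whenever every connection is currently in use.
        return await self._free.get()

    def release(self, conn: str) -> None:
        self._free.put_nowait(conn)


async def handle_request(pool: TinyPool, request_id: int, used: dict) -> None:
    conn = await pool.acquire()
    try:
        await asyncio.sleep(0.01)  # pretend to run one transaction
        used[request_id] = conn
    finally:
        pool.release(conn)  # hand the connection back for the next request


async def main() -> dict:
    pool = TinyPool(size=2)
    used: dict = {}
    # Six concurrent "requests" share only two connections.
    await asyncio.gather(*(handle_request(pool, i, used) for i in range(6)))
    return used


used = asyncio.run(main())
print(used)
```

Each request holds a connection only for the duration of its transaction, so the number of open connections stays bounded regardless of how many requests arrive.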

@aminalaee aminalaee added the clean up Refinement to existing functionality label Nov 30, 2021
@aminalaee
Member

aminalaee commented Dec 1, 2021

Update: We already have support for specifying isolation-level here.

So this should work for the isolation levels and needs to be documented:

db = Database("postgresql://...")
await db.connect()


async with db.transaction(isolation="serializable"):
    await db.execute("SELECT 1")

But setting the isolation level alone won't solve the issue in the example.

@aminalaee
Member

Added isolation docs in #434.

@zevisert
Contributor

I see this as another fallout from the current (databases <= 0.7.0) handling of ContextVar that I am fixing in #546.

    await asyncio.gather(
        tx1(db.connection()),
        tx2(db.connection())
    )

asyncio.gather creates two asyncio.Tasks, which may have been overwriting each other's references to the active connection and transaction that each task should have been using. That would cause the cannot perform operation: another operation is in progress error you had been seeing.
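
As a general illustration of how this kind of overwriting can happen in Python (not a description of the actual databases internals), the sketch below shows that each Task gets its own copy of the context, while a mutable object stored in a ContextVar before the tasks start is still shared by reference. The names current_conn and shared_state are hypothetical.

```python
import asyncio
from contextvars import ContextVar

# Hypothetical stand-ins for a library's "active connection" bookkeeping.
current_conn: ContextVar[str] = ContextVar("current_conn", default="unset")
shared_state: ContextVar[dict] = ContextVar("shared_state")


async def worker(name: str, log: dict) -> None:
    # set() only affects this task's copy of the context.
    current_conn.set(f"conn-for-{name}")
    # The dict itself was created by the parent, so every task mutates
    # the same object and can overwrite a sibling's entry.
    shared_state.get()["conn"] = f"conn-for-{name}"
    await asyncio.sleep(0)  # yield so the other task runs in between
    log[name] = (current_conn.get(), shared_state.get()["conn"])


async def main() -> dict:
    log: dict = {}
    shared_state.set({})
    # gather() wraps each coroutine in its own Task.
    await asyncio.gather(worker("tx1", log), worker("tx2", log))
    log["parent"] = current_conn.get()
    return log


log = asyncio.run(main())
print(log)
```

Here tx1 keeps its own current_conn value, but the entry it wrote into the shared dict has been replaced by tx2 by the time it reads it back.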

As part of #546 I converted your example into a test, and it looked roughly like this:

@pytest.mark.parametrize("database_url", DATABASE_URLS)
async def test_parallel_transaction_isolation(database_url):
    metadata = sqlalchemy.MetaData()

    notes = sqlalchemy.Table(
        "notes",
        metadata,
        sqlalchemy.Column("id", sqlalchemy.Integer, primary_key=True),
        sqlalchemy.Column("text", sqlalchemy.String(length=100)),
        sqlalchemy.Column("completed", sqlalchemy.Boolean),
    )

    engine = sqlalchemy.create_engine(database_url)
    metadata.create_all(engine)

    setup = asyncio.Event()
    done = asyncio.Event()

    async def tx1(connection):
        async with connection.transaction():
            await db.execute(
                notes.insert(), values={"id": 1, "text": "tx1", "completed": False}
            )
            setup.set()
            await done.wait()

    async def tx2(connection):
        async with connection.transaction():
            await setup.wait()
            result = await db.fetch_all(notes.select())
            assert result == [], result
            done.set()


    async with Database(database_url) as db:
        async with db.connection() as conn:
            await asyncio.gather(tx1(conn), tx2(conn))

And I have a few thoughts for you @cochiseruhulessin to ponder, minor things first:

  • Using asyncio.sleep is less guaranteed than asyncio.Event(), so I've switched to that. Same intent applies though:
    1. Create a new row in one transaction
    2. Before the first transaction closes, enter a new transaction and make sure that initial row isn't observed by the other transaction
  • db.execute returns the number of rows affected, or -1, but never None, so your assertion will always fail. I switched to db.fetch_all, which returns a list of selected rows.
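
The ordering that the two events enforce can be seen without a database. This is a minimal sketch in which log messages stand in for the real queries; writer, reader, and the messages are illustrative names only.

```python
import asyncio


async def writer(setup: asyncio.Event, done: asyncio.Event, log: list) -> None:
    log.append("writer: insert row (uncommitted)")
    setup.set()        # signal that the row exists inside the open transaction
    await done.wait()  # keep the "transaction" open until the reader has checked
    log.append("writer: commit")


async def reader(setup: asyncio.Event, done: asyncio.Event, log: list) -> None:
    await setup.wait()  # do not read until the writer has inserted
    log.append("reader: select")
    done.set()          # allow the writer to finish


async def main() -> list:
    setup, done = asyncio.Event(), asyncio.Event()
    log: list = []
    # The reader is started first on purpose: the events, not the start
    # order or any timing, decide the sequence.
    await asyncio.gather(reader(setup, done, log), writer(setup, done, log))
    return log


log = asyncio.run(main())
print(log)
```

Unlike fixed asyncio.sleep delays, this sequence does not depend on how long each step takes.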

Now the main point here:

  • db.connection() returns the same object. So your use of that prior to starting the new tasks means you end up with code logically equivalent to this:
    async with connection.transaction():
        async with connection.transaction(): # both tx1 and tx2 transactions were opened before any sleep calls
            await connection.execute(
                notes.insert(), values={"id": 1, "text": "tx1", "completed": False}
            )
    
            setup.set()
            await setup.wait() # Events help show when control switches to another task
    
            result = await db.fetch_all(notes.select())
            assert result == [], result
    
            done.set()
            await done.wait() # Again just synchronization 
    • In this scenario, because the connection is shared, you actually end up with nested transactions. If you let each task acquire its own connection, the cannot perform operation: another operation is in progress bug you're mentioning still comes up in current versions of databases, but #546 will fix that. What this means is that, with a shared connection, you'll then be in this "virtual nested transactions" scenario.
    • If you change your code to let each task get its own connection, you should see the isolation you're expecting:
      async with Database(database_url) as db:
          await asyncio.gather(tx1(db), tx2(db))
