
[RFC] Combine args of executemany() in batches #289

Closed
wants to merge 2 commits

Conversation

fantix
Member

@fantix fantix commented May 14, 2018

Is it a good idea to combine all the args in executemany() into a single network packet (rather, a single buffer to send, thx Yury) with a list of Bind and Execute command pairs? And, is it a good idea to make this method atomic? Please kindly give me some advice when you have time. Many thanks!

  • Ends with only one Sync command, which means executemany() will be atomic in an implicit transaction if it is not called from within an existing transaction.
  • This should reduce the round-trip time to the database, at the cost that ...
  • memory usage is no longer smooth - all the bind args have to be encoded into memory before they are sent to the database in one batch.
    • Theoretically it is possible to combine the args into smaller groups, to balance reduced RTT against memory usage, as well as to detect errors earlier (see the sketch after this list).
  • Assuming that reducing RTT is helpful, this RFC also adds executemany() to prepared statements.
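A minimal, purely illustrative sketch of that grouping idea (the batched() helper and the batch size of 100 are assumptions, not part of this PR):

import itertools

def batched(args_iter, batch_size=100):
    # Yield lists of at most batch_size argument tuples; each list would
    # become one group of Bind/Execute messages.
    it = iter(args_iter)
    while True:
        chunk = list(itertools.islice(it, batch_size))
        if not chunk:
            break
        yield chunk

# Example: 990 argument tuples -> 9 groups of 100 and 1 group of 90.
for group in batched([(i,) for i in range(990)]):
    print(len(group))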

References:

@1st1
Member

1st1 commented May 22, 2018

Is it a good idea to combine all the args in executemany() into a single network packet with a list of Bind and Execute command pairs?

Yes, although I wouldn't do that exhaustively. Instead, I think it makes sense to batch queries. Say one wants to execute 990 queries; we can do that in 10 steps with the batch size set to 100.

This preserves backwards compatibility and allows executing thousands of queries efficiently via a single await call. I'm -1 on the current solution.

And, is it a good idea to make this method atomic?

I'd say that asyncpg is a low-level interface and shouldn't provide any transactional guarantees. Wrapping an await executemany() call in an async with transaction() block is simple enough. @elprans your thoughts?
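For illustration, here is what that wrapping looks like with asyncpg's existing API (the DSN and the table/column names are placeholders):

import asyncio
import asyncpg

async def main():
    conn = await asyncpg.connect('postgresql://localhost/postgres')
    try:
        # An explicit transaction makes the whole executemany() atomic:
        # either all rows are inserted or none are.
        async with conn.transaction():
            await conn.executemany(
                'INSERT INTO mytable (a, b) VALUES ($1, $2)',
                [(1, 'x'), (2, 'y')])
    finally:
        await conn.close()

asyncio.get_event_loop().run_until_complete(main())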

@elprans
Member

elprans commented May 22, 2018

I think adding a batchsize parameter and documenting the lack of implicit transactionality is enough.

@fantix
Member Author

fantix commented May 23, 2018

Got it, thanks! I'm with you. Will update the PR.

@1st1
Member

1st1 commented May 23, 2018

Wait ;) I'll post an update on this in 30 minutes ;)

@fantix
Member Author

fantix commented May 23, 2018

No hurry 😃

@1st1
Member

1st1 commented May 23, 2018

Quoting a comment from the PR:

      The execution was changed to be in an implicit transaction if there
      was no explicit transaction, so that it will no longer end up with
      partial success.

IIRC combining Postgres messages in one write doesn't guarantee atomicity. Only using explicit BEGIN; / COMMIT; does.

      It also combined all args into one network packet
      to reduce round-trip time, therefore you should make sure not to
      blow up your memory with a super long iterable.

A single network packet isn't that big usually. The PR combines all data in a single buffer, which can result in many packets sent with arbitrary delays between them by the OS.

A few more comments:

Batching by number of queries is actually suboptimal. Packed arguments of a single query can require an arbitrarily big buffer, so if we try to guess an optimal number of queries per batch, we can end up in a situation where our write buffers are too small or too big. Giving users a configurable option also doesn't make a lot of sense.

Currently we simply send queries one by one. This PR changes that to accumulate all of them in a single big write. This is wrong as it effectively disables flow control when a large number of queries is batched.

Instead, I propose to batch writes in 32kb blocks. A single block can fit many or just one query. The block size won't be configurable (it will be a constant in protocol/consts.pxi.)

We can later optimize this by creating up to four 32kb blocks at a time and calling transport.writelines(), which is optimized (in uvloop) to write them in the most efficient way without copying.
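A rough sketch of that idea (not the actual protocol code; the constant name and the helper are assumptions made for illustration):

_EXECUTEMANY_WRITE_BUFFER = 32 * 1024  # would be a constant in protocol/consts.pxi

def iter_write_blocks(encoded_messages):
    # Group already-encoded Bind/Execute byte strings into ~32kb blocks;
    # each block is flushed as one write (or several blocks at once via
    # transport.writelines()).
    block = bytearray()
    for msg in encoded_messages:
        block += msg
        if len(block) >= _EXECUTEMANY_WRITE_BUFFER:
            yield bytes(block)
            block = bytearray()
    if block:
        yield bytes(block)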

Now, adding batching isn't, strictly speaking, a backwards compatible change. Ideally, we should add a new keyword-only batch=False argument. The PR that adds this shouldn't change the current code too much: basically we just need to add some code for buffer gathering. I propose to close this PR and work on batching based on a buffer size in a new PR.

@elprans your thoughts?

@fantix
Member Author

fantix commented May 23, 2018

It must be pretty late in the night at your place. Thanks for the reply!

The execution was changed to be in an implicit transaction if there
was no explicit transaction, so that it will no longer end up with
partial success.

IIRC combining Postgres messages in one write doesn't guarantee atomicity. Only using explicit BEGIN; / COMMIT; does.

Combining doesn't, but using a single Sync does, I think, according to the PostgreSQL docs here:

At completion of each series of extended-query messages, the frontend should issue a Sync message. This parameterless message causes the backend to close the current transaction if it's not inside a BEGIN/COMMIT transaction block (“close” meaning to commit if no error, or roll back if error).

And the test_execute_many_atomic() did pass, so I thought it was at least atomic. But I'm not sure if an implicit transaction bounded by Sync is fully ACID or not. Looking into PostgreSQL source code, Sync triggers a finish_xact_command() which commits the transaction. Following deeper into the rabbit hole, it does look like the same transaction control as BEGIN; / COMMIT;.

It also combined all args into one network packet
to reduce round-trip time, therefore you should make sure not to
blow up your memory with a super long iterable.

A single network packet isn't that big usually. The PR combines all data in a single buffer, which can result in many packets sent with arbitrary delays between them by the OS.

Oh yes, thanks for the correction!

Instead, I propose to batch writes in 32kb blocks. A single block can fit many or just one query. The block size won't be configurable (it will be a constant in protocol/consts.pxi.)

We can later optimize this by creating up to four 32kb blocks at a time and calling transport.writelines(), which is optimized (in uvloop) to write them in the most efficient way without copying.

That would be awesome! I'll look into uvloop code for this.

@1st1
Member

1st1 commented May 23, 2018

At completion of each series of extended-query messages, the frontend should issue a Sync message. This parameterless message causes the backend to close the current transaction if it's not inside a BEGIN/COMMIT transaction block (“close” meaning to commit if no error, or roll back if error).

And the test_execute_many_atomic() did pass, so I thought it was at least atomic. But I'm not sure if an implicit transaction bounded by Sync is fully ACID or not. Looking into PostgreSQL source code, Sync triggers a finish_xact_command() which commits the transaction. Following deeper into the rabbit hole, it does look like the same transaction control as BEGIN; / COMMIT;.

Interesting, thanks for looking into this! I'll also take a look at postgres source tomorrow.

In any case, I now think we should always batch queries in executemany() (i.e. there's no need for a new batch=False keyword-only parameter). Let's look at how executemany() works without and with batching:

  • Current implementation: every query message is sent with a trailing 'Sync' message. If an N-th query fails, N-1 successful queries before it are "committed".

  • With batching: we send a bunch of queries and a single 'Sync' message per batch. If an N-th query fails, approximately number_of_successful_batches * average_batch_size queries before it are "committed", and the rest of the current batch is rolled back.

So batching might change how many queries are committed before the failed one. In practice, though, it shouldn't really matter. Usually a user won't be able to pinpoint the exact point of failure anyways. Either they do handle errors in executemany() by wrapping their code in a transaction or they don't, and batching won't change anything.

Therefore we can probably always batch and we should better document executemany() explaining cases where it's recommended to run it in an explicit transaction.

Lastly, it would be interesting to implement the basic batching and run benchmarks. I expect to see a nice perf improvement, but if we don't see any then we don't want to complicate the code.

@elprans
Member

elprans commented May 23, 2018

I think we should either send one Sync per iteration, or one Sync per the whole thing. Otherwise the transactional behaviour of executemany becomes unpredictable.

I'm leaning toward making executemany atomic regardless of how we batch Execute's.

@1st1
Member

1st1 commented May 23, 2018

I think we should either send one Sync per iteration, or one Sync per the whole thing. Otherwise the transactional behaviour of executemany becomes unpredictable.

Alright, let's try to send one Sync at the end of all batches.

Here's a concrete plan:

  1. We essentially want a similar implementation to Protocol.copy_in(): batch bind args into buffers and use asyncio flow control to send them. _COPY_BUFFER_SIZE is currently set to 512kb; I think we need a smaller buffer size for executemany; I'd set it to 32kb. (A rough sketch follows this list.)

  2. If an error occurs at any point we stop sending data, issue a Sync message, wait for the response, and then propagate the error.

  3. Lastly, with the last batch (and if no error has yet occurred) we send a Sync message, and wait for the response.
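A simplified asyncio-level sketch of this plan (encode_bind_execute() and encode_sync() are hypothetical helpers standing in for the protocol's message encoders, not real asyncpg functions):

BUFFER_SIZE = 32 * 1024  # mirrors the proposed protocol constant

async def executemany_batched(writer, stmt, args_iter):
    buf = bytearray()
    try:
        for args in args_iter:
            buf += encode_bind_execute(stmt, args)  # hypothetical encoder
            if len(buf) >= BUFFER_SIZE:
                writer.write(bytes(buf))
                await writer.drain()  # asyncio flow control (step 1)
                buf = bytearray()
    finally:
        # Steps 2 and 3: whether we stopped on an error or ran out of args,
        # finish with a single Sync; the caller then waits for the server's
        # response, which closes the implicit transaction and reports the
        # outcome.
        buf += encode_sync()  # hypothetical encoder
        writer.write(bytes(buf))
        await writer.drain()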


Strictly speaking this is a backwards incompatible change:

  • currently some of the queries sent via executemany() get committed before an error.
  • we want to roll back everything to the state before the call if an error has occurred.

In practice, I doubt that there are valid use cases where a user wants a partial commit, so we can just go forward with the proposal and call it a bugfix and perf-improvement.

@sergeyspatar

sergeyspatar commented May 23, 2018

Hi guys, I want to share with you my recent observation which may be relevant to your discussion here.

I have an executemany() that updates a bunch of rows in a table by primary key without explicit transaction. Another process updates rows in the same table in a transaction, and the order of these updates is random. From time to time asyncpg throws DeadlockDetectedError:

  File "/usr/lib/python3/dist-packages/asyncpg/connection.py", line 265, in executemany
    return await self._executemany(command, args, timeout)
  File "/usr/lib/python3/dist-packages/asyncpg/connection.py", line 1318, in _executemany
    result, _ = await self._do_execute(query, executor, timeout)
  File "/usr/lib/python3/dist-packages/asyncpg/connection.py", line 1333, in _do_execute
    result = await executor(stmt, None)
  File "asyncpg/protocol/protocol.pyx", line 239, in bind_execute_many
asyncpg.exceptions.DeadlockDetectedError: deadlock detected
DETAIL:  Process 23343 waits for ShareLock on transaction 166814612; blocked by process 7594.
Process 7594 waits for ShareLock on transaction 166814613; blocked by process 23343.
HINT:  See server log for query details.

I thought executemany() was equivalent to running execute() in a for-loop, just optimized for the same SQL command. But it seems there is an implicit transaction involved. There is nothing in the docs about it. Would it be better if a user could choose whether all commands commit or fail together, or independently until the first failure?

For me the only way to avoid deadlocks now is to rewrite the one-line call as a loop, which will also execute more slowly.

@elprans
Member

elprans commented May 23, 2018

@sergeyspatar Currently, executemany is almost equivalent to a for loop, as it "commits" the changes on every iteration. That said, all SQL commands run in an implicit transaction, so you can get a deadlock regardless. The correct way to fix this is to avoid doing updates in this fashion, as the PostgreSQL documentation recommends:

The best defense against deadlocks is generally to avoid them by being certain that all applications using a database acquire locks on multiple objects in a consistent order.

If doing consistent updates is not possible, then retrying on a DeadlockDetectedError (once executemany is made atomic) would be a valid recourse.
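A hedged example of that retry approach, assuming an atomic executemany() (the table and column names are placeholders):

import asyncpg

async def update_rows_with_retry(conn, rows, max_retries=3):
    for attempt in range(max_retries):
        try:
            async with conn.transaction():
                await conn.executemany(
                    'UPDATE mytable SET value = $1 WHERE id = $2', rows)
            return
        except asyncpg.exceptions.DeadlockDetectedError:
            # This transaction was chosen as the deadlock victim and rolled
            # back, so it is safe to retry the whole batch.
            if attempt == max_retries - 1:
                raise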

@fantix
Member Author

fantix commented May 24, 2018

Sorry, I'm a bit confused - a transaction with a single UPDATE statement shouldn't cause a deadlock, I think? I tried the test below with asyncpg 0.15.0 and master; it never fails even after minutes of running:

import asyncio
import asyncpg

dsn = 'postgresql://localhost/postgres'


async def tx():
    conn = await asyncpg.connect(dsn)
    while True:
        async with conn.transaction():
            print('in transaction')
            # update id=2 first to cause a deadlock
            await conn.execute(
                'UPDATE test_deadlock SET value = 20 WHERE id = 2')
            print('after update 2')
            await conn.execute(
                'UPDATE test_deadlock SET value = 10 WHERE id = 1')
            print('after update 1')


async def main():
    # initialize table and data for the test
    conn = await asyncpg.connect(dsn)
    try:
        await conn.execute(
            'CREATE TABLE test_deadlock (id INT PRIMARY KEY, value INT)')
        await conn.execute('INSERT INTO test_deadlock VALUES (1, null)')
        await conn.execute('INSERT INTO test_deadlock VALUES (2, null)')
    except asyncpg.DuplicateTableError:
        pass

    # start concurrent coroutine and set exit condition
    running = [True]
    asyncio.ensure_future(tx()).add_done_callback(lambda fut: running.clear())

    while running:
        print('before executemany')
        await conn.executemany(
            'UPDATE test_deadlock SET value = $1 WHERE id = $2', [
                (1, 1),
                (2, 2),
            ])
        print('after executemany')


asyncio.get_event_loop().run_until_complete(main())

However, it fails randomly within milliseconds when executemany() is wrapped in an explicit transaction, or with this PR and, presumably, a future atomic executemany():

Traceback (most recent call last):
  File "t.py", line 17, in tx
    'UPDATE test_deadlock SET value = 10 WHERE id = 1')
  File "/Volumes/.../python3.6/site-packages/asyncpg/connection.py", line 238, in execute
    return await self._protocol.query(query, timeout)
  File "asyncpg/protocol/protocol.pyx", line 319, in query
asyncpg.exceptions.DeadlockDetectedError: deadlock detected
DETAIL:  Process 76587 waits for ShareLock on transaction 371748; blocked by process 76586.
Process 76586 waits for ShareLock on transaction 371747; blocked by process 76587.
HINT:  See server log for query details.

Also tried with 128 rows in the table, same result. Therefore @sergeyspatar could you please share your asyncpg version? Are there any database triggers on the table?

But anyway, the suggestion from @elprans is definitely good practice to follow.

@fantix
Member Author

fantix commented May 24, 2018

@1st1 I tweaked pgbench a bit to test executemany() (inserting 1000 rows per query); here are my results on my 2.2GHz 2015 MacBook Air:

asyncpg 0.15.0

686 queries in 30.29 seconds
Latency: min 300.36ms; max 600.66ms; mean 439.755ms; std: 35.403ms (8.05%)
Latency distribution: 25% under 417.41ms; 50% under 432.76ms; 75% under 453.88ms; 90% under 488.538ms; 99% under 564.253ms; 99.99% under 600.669ms
Queries/sec: 22.65
Rows/sec: 22649.67

This PR just for testing:

4421 queries in 30.03 seconds
Latency: min 28.56ms; max 886.25ms; mean 67.85ms; std: 42.833ms (63.13%)
Latency distribution: 25% under 57.032ms; 50% under 62.013ms; 75% under 67.748ms; 90% under 77.698ms; 99% under 147.696ms; 99.99% under 886.251ms
Queries/sec: 147.23
Rows/sec: 147229.98

Seems to be a pretty nice improvement!

Did a rough fix according to the "concrete plan" above, and the result is about the same (each row is 133 bytes, 5 batches per query):

3873 queries in 30.05 seconds
Latency: min 21.84ms; max 596.38ms; mean 77.504ms; std: 36.262ms (46.79%)
Latency distribution: 25% under 63.369ms; 50% under 70.883ms; 75% under 79.422ms; 90% under 97.334ms; 99% under 178.645ms; 99.99% under 596.382ms
Queries/sec: 128.9
Rows/sec: 128900.4

@fantix fantix changed the title [RFC] Combine args of executemany() in a batch [RFC] Combine args of executemany() in batches May 24, 2018
@elprans
Member

elprans commented May 24, 2018

Sorry, I'm a bit confused - a transaction with a single UPDATE statement shouldn't cause a deadlock, I think? I tried the test below with asyncpg 0.15.0 and master; it never fails even after minutes of running:

If your statement causes functions or triggers to fire, or anything that increases the cid of the transaction and updates more tuples, you may get a deadlock.

Also, conn.execute("query; query") is an implicit transaction block.

@fantix
Member Author

fantix commented May 24, 2018

Ah, that explains it :) thanks

@1st1
Member

1st1 commented May 24, 2018

@fantix

@1st1 I tweaked pgbench a bit to test executemany() (inserting 1000 rows per query); here are my results on my 2.2GHz 2015 MacBook Air:
[..]
Seems to be a pretty nice improvement!

Great! A green light from me to try to implement #289 (comment).

@sergeyspatar

@elprans

Are there any database triggers on the table?

You are right. A trigger on this table was updating another table... I changed the logic and no deadlocks so far. Sorry for the false alarm.

fantix added a commit to fantix/asyncpg that referenced this pull request May 29, 2018
@fantix fantix mentioned this pull request May 29, 2018
@fantix
Member Author

fantix commented May 29, 2018

Closing in favor of #295

@fantix fantix closed this May 29, 2018
fantix added a commit to fantix/asyncpg that referenced this pull request Nov 26, 2018
Now `Bind` and `Execute` pairs are batched into 4 x 32KB buffers to take
advantage of `writelines()`. A single `Sync` is sent at last, so that
all args live in the same transaction.

Closes: MagicStack#289
elprans pushed a commit that referenced this pull request Nov 26, 2020
Now `Bind` and `Execute` pairs are batched into 4 x 32KB buffers to take
advantage of `writelines()`. A single `Sync` is sent at last, so that
all args live in the same transaction.

pgbench results of inserting 1000 rows per query with executemany() on
Python 3.6 on a 2.2GHz 2015 MacBook Air (best out of 5 runs):

asyncpg 0.18.2:

    710 queries in 30.31 seconds
    Latency: min 341.88ms; max 636.29ms; mean 425.022ms; std: 39.782ms (9.36%)
    Latency distribution: 25% under 401.67ms; 50% under 414.26ms; 75% under 435.37ms; 90% under 478.39ms; 99% under 576.638ms; 99.99% under 636.299ms
    Queries/sec: 23.42
    Rows/sec: 23424.32

This patch:

    4125 queries in 30.02 seconds
    Latency: min 23.14ms; max 734.91ms; mean 72.723ms; std: 49.226ms (67.69%)
    Latency distribution: 25% under 59.958ms; 50% under 65.414ms; 75% under 71.538ms; 90% under 80.95ms; 99% under 175.375ms; 99.99% under 734.912ms
    Queries/sec: 137.39
    Rows/sec: 137389.64

This is a backwards incompatible change.  Here `executemany()` becomes
atomic, whereas previously any error in the middle of argument iteration
would retain the results of the preceding set of arguments unless an explicit
transaction block was used.

Closes: #289