Skip to content

Commit

Permalink
Batch executemany (#295)
Browse files Browse the repository at this point in the history
Now `Bind` and `Execute` pairs are batched into 4 x 32KB buffers to take
advantage of `writelines()`. A single `Sync` is sent at last, so that
all args live in the same transaction.

pgbench results of inserting 1000 rows per query with executemany() on 
Python 3.6 of 2.2GHz 2015 MacBook Air (best out of 5 runs):

asyncpg 0.18.2:

    710 queries in 30.31 seconds
    Latency: min 341.88ms; max 636.29ms; mean 425.022ms; std: 39.782ms (9.36%)
    Latency distribution: 25% under 401.67ms; 50% under 414.26ms; 75% under 435.37ms; 90% under 478.39ms; 99% under 576.638ms; 99.99% under 636.299ms
    Queries/sec: 23.42
    Rows/sec: 23424.32

This patch:

    4125 queries in 30.02 seconds
    Latency: min 23.14ms; max 734.91ms; mean 72.723ms; std: 49.226ms (67.69%)
    Latency distribution: 25% under 59.958ms; 50% under 65.414ms; 75% under 71.538ms; 90% under 80.95ms; 99% under 175.375ms; 99.99% under 734.912ms
    Queries/sec: 137.39
    Rows/sec: 137389.64

This is a backwards incompatible change.  Here `executemany()` becomes
atomic, whereas previously any error in the middle of argument iteration
would retain the results of the preceding set of arguments unless an explicit
transaction block was used.

Closes: #289
  • Loading branch information
fantix committed Nov 26, 2020
1 parent 8b313bd commit 690048d
Show file tree
Hide file tree
Showing 8 changed files with 390 additions and 100 deletions.
4 changes: 4 additions & 0 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -197,18 +197,22 @@ jobs:

- name: "OSX py 3.5"
os: osx
osx_image: xcode10.2
env: BUILD=tests,wheels PYTHON_VERSION=3.5.9 PGVERSION=12

- name: "OSX py 3.6"
os: osx
osx_image: xcode10.2
env: BUILD=tests,wheels PYTHON_VERSION=3.6.10 PGVERSION=12

- name: "OSX py 3.7"
os: osx
osx_image: xcode10.2
env: BUILD=tests,wheels PYTHON_VERSION=3.7.7 PGVERSION=12

- name: "OSX py 3.8"
os: osx
osx_image: xcode10.2
env: BUILD=tests,wheels PYTHON_VERSION=3.8.3 PGVERSION=12

cache:
Expand Down
10 changes: 10 additions & 0 deletions asyncpg/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -331,6 +331,13 @@ async def executemany(self, command: str, args, *, timeout: float=None):
.. versionchanged:: 0.11.0
`timeout` became a keyword-only parameter.
.. versionchanged:: 0.22.0
The execution was changed to be in an implicit transaction if there
was no explicit transaction, so that it will no longer end up with
partial success. If you still need the previous behavior to
progressively execute many args, please use a loop with prepared
statement instead.
"""
self._check_open()
return await self._executemany(command, args, timeout)
Expand Down Expand Up @@ -1010,6 +1017,9 @@ async def _copy_in(self, copy_stmt, source, timeout):
f = source
elif isinstance(source, collections.abc.AsyncIterable):
# assuming calling output returns an awaitable.
# copy_in() is designed to handle very large amounts of data, and
# the source async iterable is allowed to return an arbitrary
# amount of data on every iteration.
reader = source
else:
# assuming source is an instance supporting the buffer protocol.
Expand Down
24 changes: 21 additions & 3 deletions asyncpg/prepared_stmt.py
Original file line number Diff line number Diff line change
Expand Up @@ -202,11 +202,24 @@ async def fetchrow(self, *args, timeout=None):
return None
return data[0]

async def __bind_execute(self, args, limit, timeout):
@connresource.guarded
async def executemany(self, args, *, timeout: float=None):
"""Execute the statement for each sequence of arguments in *args*.
:param args: An iterable containing sequences of arguments.
:param float timeout: Optional timeout value in seconds.
:return None: This method discards the results of the operations.
.. versionadded:: 0.22.0
"""
return await self.__do_execute(
lambda protocol: protocol.bind_execute_many(
self._state, args, '', timeout))

async def __do_execute(self, executor):
protocol = self._connection._protocol
try:
data, status, _ = await protocol.bind_execute(
self._state, args, '', limit, True, timeout)
return await executor(protocol)
except exceptions.OutdatedSchemaCacheError:
await self._connection.reload_schema_state()
# We can not find all manually created prepared statements, so just
Expand All @@ -215,6 +228,11 @@ async def __bind_execute(self, args, limit, timeout):
# invalidate themselves (unfortunately, clearing caches again).
self._state.mark_closed()
raise

async def __bind_execute(self, args, limit, timeout):
data, status, _ = await self.__do_execute(
lambda protocol: protocol.bind_execute(
self._state, args, '', limit, True, timeout))
self._last_status = status
return data

Expand Down
2 changes: 2 additions & 0 deletions asyncpg/protocol/consts.pxi
Original file line number Diff line number Diff line change
Expand Up @@ -8,3 +8,5 @@
DEF _MAXINT32 = 2**31 - 1
DEF _COPY_BUFFER_SIZE = 524288
DEF _COPY_SIGNATURE = b"PGCOPY\n\377\r\n\0"
DEF _EXECUTE_MANY_BUF_NUM = 4
DEF _EXECUTE_MANY_BUF_SIZE = 32768
12 changes: 10 additions & 2 deletions asyncpg/protocol/coreproto.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ cdef class CoreProtocol:
# True - completed, False - suspended
bint result_execute_completed

cpdef is_in_transaction(self)
cdef _process__auth(self, char mtype)
cdef _process__prepare(self, char mtype)
cdef _process__bind_execute(self, char mtype)
Expand Down Expand Up @@ -146,6 +147,7 @@ cdef class CoreProtocol:
cdef _auth_password_message_sasl_continue(self, bytes server_response)

cdef _write(self, buf)
cdef _writelines(self, list buffers)

cdef _read_server_messages(self)

Expand All @@ -155,9 +157,13 @@ cdef class CoreProtocol:

cdef _ensure_connected(self)

cdef WriteBuffer _build_parse_message(self, str stmt_name, str query)
cdef WriteBuffer _build_bind_message(self, str portal_name,
str stmt_name,
WriteBuffer bind_data)
cdef WriteBuffer _build_empty_bind_data(self)
cdef WriteBuffer _build_execute_message(self, str portal_name,
int32_t limit)


cdef _connect(self)
Expand All @@ -166,8 +172,10 @@ cdef class CoreProtocol:
WriteBuffer bind_data, int32_t limit)
cdef _bind_execute(self, str portal_name, str stmt_name,
WriteBuffer bind_data, int32_t limit)
cdef _bind_execute_many(self, str portal_name, str stmt_name,
object bind_data)
cdef bint _bind_execute_many(self, str portal_name, str stmt_name,
object bind_data)
cdef bint _bind_execute_many_more(self, bint first=*)
cdef _bind_execute_many_fail(self, object error, bint first=*)
cdef _bind(self, str portal_name, str stmt_name,
WriteBuffer bind_data)
cdef _execute(self, str portal_name, int32_t limit)
Expand Down
Loading

0 comments on commit 690048d

Please sign in to comment.