Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Batch executemany #295

Merged
merged 3 commits into from
Nov 26, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -197,18 +197,22 @@ jobs:

- name: "OSX py 3.5"
os: osx
osx_image: xcode10.2
env: BUILD=tests,wheels PYTHON_VERSION=3.5.9 PGVERSION=12

- name: "OSX py 3.6"
os: osx
osx_image: xcode10.2
env: BUILD=tests,wheels PYTHON_VERSION=3.6.10 PGVERSION=12

- name: "OSX py 3.7"
os: osx
osx_image: xcode10.2
env: BUILD=tests,wheels PYTHON_VERSION=3.7.7 PGVERSION=12

- name: "OSX py 3.8"
os: osx
osx_image: xcode10.2
env: BUILD=tests,wheels PYTHON_VERSION=3.8.3 PGVERSION=12

cache:
Expand Down
10 changes: 10 additions & 0 deletions asyncpg/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -331,6 +331,13 @@ async def executemany(self, command: str, args, *, timeout: float=None):

.. versionchanged:: 0.11.0
`timeout` became a keyword-only parameter.

.. versionchanged:: 0.22.0
The execution was changed to be in an implicit transaction if there
was no explicit transaction, so that it will no longer end up with
partial success. If you still need the previous behavior to
progressively execute many args, please use a loop with prepared
statement instead.
"""
self._check_open()
return await self._executemany(command, args, timeout)
Expand Down Expand Up @@ -1010,6 +1017,9 @@ async def _copy_in(self, copy_stmt, source, timeout):
f = source
elif isinstance(source, collections.abc.AsyncIterable):
# assuming calling output returns an awaitable.
# copy_in() is designed to handle very large amounts of data, and
# the source async iterable is allowed to return an arbitrary
# amount of data on every iteration.
reader = source
else:
# assuming source is an instance supporting the buffer protocol.
Expand Down
24 changes: 21 additions & 3 deletions asyncpg/prepared_stmt.py
Original file line number Diff line number Diff line change
Expand Up @@ -202,11 +202,24 @@ async def fetchrow(self, *args, timeout=None):
return None
return data[0]

async def __bind_execute(self, args, limit, timeout):
@connresource.guarded
async def executemany(self, args, *, timeout: float=None):
"""Execute the statement for each sequence of arguments in *args*.

:param args: An iterable containing sequences of arguments.
:param float timeout: Optional timeout value in seconds.
:return None: This method discards the results of the operations.

.. versionadded:: 0.22.0
"""
return await self.__do_execute(
lambda protocol: protocol.bind_execute_many(
self._state, args, '', timeout))

async def __do_execute(self, executor):
protocol = self._connection._protocol
try:
data, status, _ = await protocol.bind_execute(
self._state, args, '', limit, True, timeout)
return await executor(protocol)
except exceptions.OutdatedSchemaCacheError:
await self._connection.reload_schema_state()
# We can not find all manually created prepared statements, so just
Expand All @@ -215,6 +228,11 @@ async def __bind_execute(self, args, limit, timeout):
# invalidate themselves (unfortunately, clearing caches again).
self._state.mark_closed()
raise

async def __bind_execute(self, args, limit, timeout):
data, status, _ = await self.__do_execute(
lambda protocol: protocol.bind_execute(
self._state, args, '', limit, True, timeout))
self._last_status = status
return data

Expand Down
2 changes: 2 additions & 0 deletions asyncpg/protocol/consts.pxi
Original file line number Diff line number Diff line change
Expand Up @@ -8,3 +8,5 @@
DEF _MAXINT32 = 2**31 - 1
DEF _COPY_BUFFER_SIZE = 524288
DEF _COPY_SIGNATURE = b"PGCOPY\n\377\r\n\0"
DEF _EXECUTE_MANY_BUF_NUM = 4
DEF _EXECUTE_MANY_BUF_SIZE = 32768
12 changes: 10 additions & 2 deletions asyncpg/protocol/coreproto.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ cdef class CoreProtocol:
# True - completed, False - suspended
bint result_execute_completed

cpdef is_in_transaction(self)
cdef _process__auth(self, char mtype)
cdef _process__prepare(self, char mtype)
cdef _process__bind_execute(self, char mtype)
Expand Down Expand Up @@ -146,6 +147,7 @@ cdef class CoreProtocol:
cdef _auth_password_message_sasl_continue(self, bytes server_response)

cdef _write(self, buf)
cdef _writelines(self, list buffers)

cdef _read_server_messages(self)

Expand All @@ -155,9 +157,13 @@ cdef class CoreProtocol:

cdef _ensure_connected(self)

cdef WriteBuffer _build_parse_message(self, str stmt_name, str query)
cdef WriteBuffer _build_bind_message(self, str portal_name,
str stmt_name,
WriteBuffer bind_data)
cdef WriteBuffer _build_empty_bind_data(self)
cdef WriteBuffer _build_execute_message(self, str portal_name,
int32_t limit)


cdef _connect(self)
Expand All @@ -166,8 +172,10 @@ cdef class CoreProtocol:
WriteBuffer bind_data, int32_t limit)
cdef _bind_execute(self, str portal_name, str stmt_name,
WriteBuffer bind_data, int32_t limit)
cdef _bind_execute_many(self, str portal_name, str stmt_name,
object bind_data)
cdef bint _bind_execute_many(self, str portal_name, str stmt_name,
object bind_data)
cdef bint _bind_execute_many_more(self, bint first=*)
cdef _bind_execute_many_fail(self, object error, bint first=*)
cdef _bind(self, str portal_name, str stmt_name,
WriteBuffer bind_data)
cdef _execute(self, str portal_name, int32_t limit)
Expand Down
Loading