Skip to content
This repository has been archived by the owner on Feb 21, 2023. It is now read-only.

Fix for multi-exec transaction cancelled error #225

Merged
merged 2 commits into from
May 12, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 15 additions & 21 deletions aioredis/commands/transaction.py
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,7 @@ def wrapper(*args, **kw):
return wrapper
return attr

@asyncio.coroutine
def execute(self, *, return_exceptions=False):
"""Execute all buffered commands.

Expand All @@ -156,22 +157,22 @@ def execute(self, *, return_exceptions=False):
self._done = True

if self._pipeline:
return self._do_execute(return_exceptions=return_exceptions)
if isinstance(self._pool_or_conn, AbcPool):
with (yield from self._pool_or_conn) as conn:
return (yield from self._do_execute(
conn, return_exceptions=return_exceptions))
else:
return (yield from self._do_execute(
self._pool_or_conn,
return_exceptions=return_exceptions))
else:
return self._gather_result(return_exceptions)
return (yield from self._gather_result(return_exceptions))

@asyncio.coroutine
def _do_execute(self, *, return_exceptions=False):
if isinstance(self._pool_or_conn, AbcPool):
with (yield from self._pool_or_conn) as conn:
yield from asyncio.gather(*self._send_pipeline(conn),
loop=self._loop,
return_exceptions=True)
else:
conn = self._pool_or_conn
yield from asyncio.gather(*self._send_pipeline(conn),
loop=self._loop,
return_exceptions=True)
def _do_execute(self, conn, *, return_exceptions=False):
yield from asyncio.gather(*self._send_pipeline(conn),
loop=self._loop,
return_exceptions=True)
return (yield from self._gather_result(return_exceptions))

@asyncio.coroutine
Expand Down Expand Up @@ -247,13 +248,8 @@ class MultiExec(Pipeline):
error_class = MultiExecError

@asyncio.coroutine
def _do_execute(self, *, return_exceptions=False):
def _do_execute(self, conn, *, return_exceptions=False):
self._waiters = waiters = []
is_pool = isinstance(self._pool_or_conn, AbcPool)
if is_pool:
conn = yield from self._pool_or_conn.acquire()
else:
conn = self._pool_or_conn
multi = conn.execute('MULTI')
coros = list(self._send_pipeline(conn))
exec_ = conn.execute('EXEC')
Expand All @@ -264,8 +260,6 @@ def _do_execute(self, *, return_exceptions=False):
except asyncio.CancelledError:
yield from gather
finally:
if is_pool:
self._pool_or_conn.release(conn)
if conn.closed:
for fut in waiters:
fut.cancel()
Expand Down
21 changes: 21 additions & 0 deletions tests/transaction_commands_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -238,3 +238,24 @@ def test_transaction__watch_error(redis, create_redis, server, loop):
yield from fut1
with pytest.raises(WatchVariableError):
yield from fut2


@pytest.mark.run_loop
def test_multi_exec_and_pool_release(redis):
# Test the case when pool connection is released before
# `exec` result is received.

slow_script = """
local a = tonumber(redis.call('time')[1])
local b = a + 1
while (a < b)
do
a = tonumber(redis.call('time')[1])
end
"""

tr = redis.multi_exec()
fut1 = tr.eval(slow_script)
ret, = yield from tr.execute()
assert ret is None
assert (yield from fut1) is None