Skip to content

Commit

Permalink
Refactor cancelling procedure
Browse files Browse the repository at this point in the history
  • Loading branch information
asvetlov committed Dec 12, 2015
1 parent 8a2ba3f commit 2a72b60
Show file tree
Hide file tree
Showing 4 changed files with 53 additions and 12 deletions.
3 changes: 2 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

doc:
cd docs && make html
echo "open file://`pwd`/docs/_build/html/index.html"
@echo "open file://`pwd`/docs/_build/html/index.html"

pep:
pep8 aiopg examples tests
Expand All @@ -18,6 +18,7 @@ vtest: pep flake

cov cover coverage: pep flake
py.test --cov=aiopg --cov=tests --cov-report=html --cov-report=term tests
@echo "open file://`pwd`/htmlcov/index.html"

clean:
find . -name __pycache__ |xargs rm -rf
Expand Down
41 changes: 33 additions & 8 deletions aiopg/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -166,8 +166,23 @@ def _create_waiter(self, func_name):
def _poll(self, waiter, timeout):
assert waiter is self._waiter, (waiter, self._waiter)
self._ready(self._weakref)

@asyncio.coroutine
def cancel():
if not self._isexecuting():
return
self._waiter = asyncio.Future(loop=self._loop)
self._conn.cancel()
try:
yield from self._waiter
except psycopg2.extensions.QueryCanceledError:
pass

try:
yield from asyncio.wait_for(self._waiter, timeout, loop=self._loop)
except (asyncio.CancelledError, asyncio.TimeoutError) as exc:
yield from asyncio.shield(cancel(), loop=self._loop)
raise exc
finally:
self._waiter = None

Expand Down Expand Up @@ -282,14 +297,24 @@ def tpc_recover(self):
@asyncio.coroutine
def cancel(self, timeout=None):
"""Cancel the current database operation."""
waiter = self._create_waiter('cancel')
self._conn.cancel()
if timeout is None:
timeout = self._timeout
try:
yield from self._poll(waiter, timeout)
except psycopg2.extensions.QueryCanceledError:
pass
if timeout is not None:
warnings.warn('timeout parameter is deprecated and never used',
DeprecationWarning)
if not self._isexecuting():
return
if self._waiter is not None:
self._waiter.cancel()

@asyncio.coroutine
def cancel():
self._waiter = asyncio.Future(loop=self._loop)
self._conn.cancel()
try:
yield from self._waiter
except psycopg2.extensions.QueryCanceledError:
pass

yield from asyncio.shield(cancel(), loop=self._loop)

@asyncio.coroutine
def reset(self):
Expand Down
1 change: 0 additions & 1 deletion aiopg/cursor.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,6 @@ def execute(self, operation, parameters=None, *, timeout=None):
try:
yield from self._conn._poll(waiter, timeout)
except asyncio.TimeoutError:
yield from self._conn.cancel()
self._impl.close()
raise

Expand Down
20 changes: 18 additions & 2 deletions tests/test_connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -340,7 +340,7 @@ def go():

self.loop.run_until_complete(go())

def test_cancel(self):
def test_cancel_noop(self):

@asyncio.coroutine
def go():
Expand All @@ -354,7 +354,23 @@ def test_cancel_with_timeout(self):
@asyncio.coroutine
def go():
conn = yield from self.connect()
yield from conn.cancel(10)
with self.assertWarns(DeprecationWarning):
yield from conn.cancel(10)

self.loop.run_until_complete(go())

def test_cancel_pending_op(self):

@asyncio.coroutine
def go():
conn = yield from self.connect()
cur = yield from conn.cursor()
task = asyncio.async(cur.execute("SELECT pg_sleep(10)"),
loop=self.loop)
yield from asyncio.sleep(0.01, loop=self.loop)
yield from conn.cancel()
with self.assertRaises(asyncio.CancelledError):
yield from task

self.loop.run_until_complete(go())

Expand Down

0 comments on commit 2a72b60

Please sign in to comment.