Skip to content

Commit

Permalink
pythonGH-117881: fix athrow().throw()/asend().throw() concurrent acce…
Browse files Browse the repository at this point in the history
  • Loading branch information
graingert committed May 1, 2024
1 parent 2520eed commit fc7e1aa
Show file tree
Hide file tree
Showing 3 changed files with 235 additions and 2 deletions.
199 changes: 197 additions & 2 deletions Lib/test/test_asyncgen.py
Original file line number Diff line number Diff line change
Expand Up @@ -393,6 +393,151 @@ async def gen():
r'anext\(\): asynchronous generator is already running'):
an.__next__()

with self.assertRaisesRegex(RuntimeError,
r"cannot reuse already awaited __anext__\(\)/asend\(\)"):
an.send(None)

def test_async_gen_asend_throw_concurrent_with_send(self):
import types

@types.coroutine
def _async_yield(v):
return (yield v)

class MyExc(Exception):
pass

async def agenfn():
while True:
try:
await _async_yield(None)
except MyExc:
pass
return
yield


agen = agenfn()
gen = agen.asend(None)
gen.send(None)
gen2 = agen.asend(None)

with self.assertRaisesRegex(RuntimeError,
r'anext\(\): asynchronous generator is already running'):
gen2.throw(MyExc)

with self.assertRaisesRegex(RuntimeError,
r"cannot reuse already awaited __anext__\(\)/asend\(\)"):
gen2.send(None)

def test_async_gen_athrow_throw_concurrent_with_send(self):
import types

@types.coroutine
def _async_yield(v):
return (yield v)

class MyExc(Exception):
pass

async def agenfn():
while True:
try:
await _async_yield(None)
except MyExc:
pass
return
yield


agen = agenfn()
gen = agen.asend(None)
gen.send(None)
gen2 = agen.athrow(MyExc)

with self.assertRaisesRegex(RuntimeError,
r'athrow\(\): asynchronous generator is already running'):
gen2.throw(MyExc)

with self.assertRaisesRegex(RuntimeError,
r"cannot reuse already awaited aclose\(\)/athrow\(\)"):
gen2.send(None)

def test_async_gen_asend_throw_concurrent_with_throw(self):
import types

@types.coroutine
def _async_yield(v):
return (yield v)

class MyExc(Exception):
pass

async def agenfn():
try:
yield
except MyExc:
pass
while True:
try:
await _async_yield(None)
except MyExc:
pass


agen = agenfn()
with self.assertRaises(StopIteration):
agen.asend(None).send(None)

gen = agen.athrow(MyExc)
gen.throw(MyExc)
gen2 = agen.asend(MyExc)

with self.assertRaisesRegex(RuntimeError,
r'anext\(\): asynchronous generator is already running'):
gen2.throw(MyExc)

with self.assertRaisesRegex(RuntimeError,
r"cannot reuse already awaited __anext__\(\)/asend\(\)"):
gen2.send(None)

def test_async_gen_athrow_throw_concurrent_with_throw(self):
import types

@types.coroutine
def _async_yield(v):
return (yield v)

class MyExc(Exception):
pass

async def agenfn():
try:
yield
except MyExc:
pass
while True:
try:
await _async_yield(None)
except MyExc:
pass

agen = agenfn()
with self.assertRaises(StopIteration):
agen.asend(None).send(None)

gen = agen.athrow(MyExc)
gen.throw(MyExc)
gen2 = agen.athrow(None)

with self.assertRaisesRegex(RuntimeError,
r'athrow\(\): asynchronous generator is already running'):
gen2.throw(MyExc)

with self.assertRaisesRegex(RuntimeError,
r"cannot reuse already awaited aclose\(\)/athrow\(\)"):
gen2.send(None)

def test_async_gen_3_arg_deprecation_warning(self):
async def gen():
yield 123
Expand Down Expand Up @@ -1571,6 +1716,8 @@ async def main():
self.assertIsInstance(message['exception'], ZeroDivisionError)
self.assertIn('unhandled exception during asyncio.run() shutdown',
message['message'])
del message, messages
gc_collect()

def test_async_gen_expression_01(self):
async def arange(n):
Expand Down Expand Up @@ -1624,6 +1771,7 @@ async def main():
asyncio.run(main())

self.assertEqual([], messages)
gc_collect()

def test_async_gen_await_same_anext_coro_twice(self):
async def async_iterate():
Expand Down Expand Up @@ -1809,9 +1957,56 @@ class MyException(Exception):
g = gen()
with self.assertRaises(MyException):
g.aclose().throw(MyException)
del g
gc_collect()

del g
gc_collect() # does not warn unawaited

def test_asend_send_already_running(self):
@types.coroutine
def _async_yield(v):
return (yield v)

async def agenfn():
while True:
await _async_yield(1)
return
yield

agen = agenfn()
gen = agen.asend(None)
gen.send(None)
gen2 = agen.asend(None)

with self.assertRaisesRegex(RuntimeError,
r'anext\(\): asynchronous generator is already running'):
gen2.send(None)

del gen2
gc_collect() # does not warn unawaited


def test_athrow_send_already_running(self):
@types.coroutine
def _async_yield(v):
return (yield v)

async def agenfn():
while True:
await _async_yield(1)
return
yield

agen = agenfn()
gen = agen.asend(None)
gen.send(None)
gen2 = agen.athrow(Exception)

with self.assertRaisesRegex(RuntimeError,
r'athrow\(\): asynchronous generator is already running'):
gen2.send(None)

del gen2
gc_collect() # does not warn unawaited

if __name__ == "__main__":
unittest.main()
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
prevent concurrent access to an async generator via athrow().throw() or asend().throw()
37 changes: 37 additions & 0 deletions Objects/genobject.c
Original file line number Diff line number Diff line change
Expand Up @@ -1774,6 +1774,7 @@ async_gen_asend_send(PyAsyncGenASend *o, PyObject *arg)

if (o->ags_state == AWAITABLE_STATE_INIT) {
if (o->ags_gen->ag_running_async) {
o->ags_state = AWAITABLE_STATE_CLOSED;
PyErr_SetString(
PyExc_RuntimeError,
"anext(): asynchronous generator is already running");
Expand Down Expand Up @@ -1817,10 +1818,24 @@ async_gen_asend_throw(PyAsyncGenASend *o, PyObject *const *args, Py_ssize_t narg
return NULL;
}

if (o->ags_state == AWAITABLE_STATE_INIT) {
if (o->ags_gen->ag_running_async) {
o->ags_state = AWAITABLE_STATE_CLOSED;
PyErr_SetString(
PyExc_RuntimeError,
"anext(): asynchronous generator is already running");
return NULL;
}

o->ags_state = AWAITABLE_STATE_ITER;
o->ags_gen->ag_running_async = 1;
}

result = gen_throw((PyGenObject*)o->ags_gen, args, nargs);
result = async_gen_unwrap_value(o->ags_gen, result);

if (result == NULL) {
o->ags_gen->ag_running_async = 0;
o->ags_state = AWAITABLE_STATE_CLOSED;
}

Expand Down Expand Up @@ -2209,10 +2224,31 @@ async_gen_athrow_throw(PyAsyncGenAThrow *o, PyObject *const *args, Py_ssize_t na
return NULL;
}

if (o->agt_state == AWAITABLE_STATE_INIT) {
if (o->agt_gen->ag_running_async) {
o->agt_state = AWAITABLE_STATE_CLOSED;
if (o->agt_args == NULL) {
PyErr_SetString(
PyExc_RuntimeError,
"aclose(): asynchronous generator is already running");
}
else {
PyErr_SetString(
PyExc_RuntimeError,
"athrow(): asynchronous generator is already running");
}
return NULL;
}

o->agt_state = AWAITABLE_STATE_ITER;
o->agt_gen->ag_running_async = 1;
}

retval = gen_throw((PyGenObject*)o->agt_gen, args, nargs);
if (o->agt_args) {
retval = async_gen_unwrap_value(o->agt_gen, retval);
if (retval == NULL) {
o->agt_gen->ag_running_async = 0;
o->agt_state = AWAITABLE_STATE_CLOSED;
}
return retval;
Expand All @@ -2226,6 +2262,7 @@ async_gen_athrow_throw(PyAsyncGenAThrow *o, PyObject *const *args, Py_ssize_t na
return NULL;
}
if (retval == NULL) {
o->agt_gen->ag_running_async = 0;
o->agt_state = AWAITABLE_STATE_CLOSED;
}
if (PyErr_ExceptionMatches(PyExc_StopAsyncIteration) ||
Expand Down

0 comments on commit fc7e1aa

Please sign in to comment.