Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 1 addition & 3 deletions src/async_kernel/caller.py
Original file line number Diff line number Diff line change
Expand Up @@ -1042,13 +1042,11 @@ async def scheduler():
pen_.result()
finally:
queue.stop()
pen_.cancel()
for pen in unfinished:
pen.remove_done_callback(queue.append)
if cancel_unfinished:
pen.cancel("Cancelled by as_completed")
with anyio.CancelScope():
await pen_.wait(result=False)
await pen_.cancel_wait(shield=True)

async def wait(
self,
Expand Down
12 changes: 9 additions & 3 deletions src/async_kernel/pending.py
Original file line number Diff line number Diff line change
Expand Up @@ -612,12 +612,18 @@ def cancel(self, msg: str | None = None) -> bool:
canceller(msg)
return self._cancelled is not None

async def cancel_wait(self, msg: str | None, *, timeout: float | None = None) -> None:
"Cancel the pending and wait for it to be done."
async def cancel_wait(self, msg: str | None = None, *, timeout: float | None = None, shield: bool = False) -> None:
"""
Cancel the pending and wait for it to be done.

Args:
timeout: Timeout in seconds.
shield: Shield from external cancellation.
"""
if not self._done:
self.cancel(msg)
if not self._done:
await self.wait(result=False, timeout=timeout)
await self.wait(result=False, timeout=timeout, shield=shield)

def cancelled(self) -> bool:
"""
Expand Down
Loading