diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 8ca2e25b..a3a9e7c7 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -57,15 +57,24 @@ jobs: # Pyside does not have free-threaded binary run: uv sync --locked --dev --group gui - - name: Run tests if CI debugging - if: runner.debug == '1' - timeout-minutes: 3 + - name: CI debugging env windows + if: ${{runner.debug == '1' && startsWith(matrix.os, 'windows')}} + run: | + $Env:CI_DEBUGGING="1" + + - name: CI debugging env non-windows + if: ${{runner.debug == '1' && !startsWith(matrix.os, 'windows')}} run: | export CI_DEBUGGING=1 + + - name: Run tests debug mode + if: ${{runner.debug == '1'}} + timeout-minutes: 3 + run: | uv run pytest -vvl --override-ini=log_cli_level=DEBUG --override-ini=log_cli=true - name: Run tests - if: runner.debug != '1' + if: ${{runner.debug != '1'}} timeout-minutes: 3 run: uv run pytest -vvl diff --git a/src/async_kernel/kernel.py b/src/async_kernel/kernel.py index 6b01fdfb..acaba764 100644 --- a/src/async_kernel/kernel.py +++ b/src/async_kernel/kernel.py @@ -7,7 +7,6 @@ import os import signal import sys -import time from collections.abc import Callable from contextlib import asynccontextmanager from io import TextIOBase @@ -25,6 +24,7 @@ from async_kernel.common import Fixed, KernelInterrupt from async_kernel.debugger import Debugger from async_kernel.interface import HasInterface +from async_kernel.pending import Pending from async_kernel.shell.base import ShellPendingManager from async_kernel.typing import ( CallerCreateOptions, @@ -44,7 +44,6 @@ from contextvars import ContextVar from types import CoroutineType, FrameType - from async_kernel.pending import Pending from async_kernel.typing import Content, Message __all__ = ["Kernel", "KernelInterrupt"] @@ -165,7 +164,7 @@ class Kernel(HasInterface[T_interface_co], LoggingConfigurable, Generic[T_interf _restart = False _handler_cache: ClassVar[dict[tuple[str | None, MsgType, Callable], HandlerType]] = {} _subshells: dict[str, T_shell_co] - _interrupt_requested: bool | Literal["FORCE"] = False + _interrupt_requested: None | Pending = None @traitlets.default("help_links") def _default_help_links(self) -> tuple[dict[str, str], ...]: @@ -277,59 +276,36 @@ async def running(self) -> AsyncGenerator[None]: def _signal_handler(self, signum, frame: FrameType | None) -> None: "Handle interrupt signals." - match self._interrupt_requested: - case "FORCE": - self._interrupt_requested = False + if pen := self._interrupt_requested: + self._interrupt_requested = None + if not pen.done(): + pen.set_result(None) raise KernelInterrupt - case True: - if frame and frame.f_globals is self.main_shell.user_ns: - self._interrupt_requested = False - raise KernelInterrupt - self.last_interrupt_frame = frame - - def clearlast_interrupt_frame(): - if self.last_interrupt_frame is frame: - self.last_interrupt_frame = None - - def re_raise(): - if self.last_interrupt_frame is frame: - self._interrupt_now(force=True) - - # Race to check if the main thread should be interrupted. - self.callers[Channel.shell].call_direct(clearlast_interrupt_frame) - self.callers[Channel.control].call_later(1, re_raise) - case False: - signal.default_int_handler(signum, frame) - - def _interrupt_now(self, *, force=False) -> None: - """ - Request an interrupt of the currently running shell thread. - - If called from the main thread, sets the interrupt request flag and sends a SIGINT signal - to the current process. On Windows, uses `signal.raise_signal`; on other platforms, uses `os.kill`. - If `force` is True, sets the interrupt request flag to "FORCE". + signal.default_int_handler(signum, frame) - Args: - force: If True, requests a forced interrupt. Defaults to False. - """ - # Restricted this to when the shell is running in the main thread. - if self.parent.callers[Channel.shell].id == Caller.CALLER_MAIN_THREAD_ID: - self._interrupt_requested = "FORCE" if force else True - if sys.platform == "win32": - signal.raise_signal(signal.SIGINT) - time.sleep(0) - else: - os.kill(os.getpid(), signal.SIGINT) - - def do_interrupt(self) -> None: + async def do_interrupt(self) -> None: """ Interrupt/cancel non-silent active execute requests. """ - if (sys.platform != "emscripten") and (not self.debugger.enabled or not self.debugger.stopped_threads): - self._interrupt_now() + assert Caller() is self.callers[Channel.control], "Must be called from the control thread." for pen in self.active_execute_requests.copy(): if not pen.metadata.get("kwargs", {}).get("silent", False): pen.cancel(self._interrupt_message) + if ( + (sys.platform != "emscripten") + and (not self.debugger.enabled or not self.debugger.stopped_threads) + and (caller := Caller("MainThread")).running # Can only interrupt the main thread + ): + self._interrupt_requested = pen = Pending() + caller.call_direct(lambda: pen.set_result(None)) + try: + await pen.wait(result=False, timeout=1, protect=True) + except TimeoutError: + if sys.platform == "win32": + signal.raise_signal(signal.SIGINT) + else: + os.kill(os.getpid(), signal.SIGINT) + await pen.wait(result=False, timeout=10, protect=True) def _patch_signal(self) -> Callable[[], None]: @@ -585,7 +561,7 @@ async def comm_close(self, job: Job[Content], /) -> None: async def interrupt_request(self, job: Job[Content], /) -> Content: """Handle an [interrupt request](https://jupyter-client.readthedocs.io/en/stable/messaging.html#kernel-interrupt).""" - self.do_interrupt() + await self.do_interrupt() return {} async def shutdown_request(self, job: Job[Content], /) -> Content: diff --git a/src/async_kernel/pending.py b/src/async_kernel/pending.py index 040064ef..96e89904 100644 --- a/src/async_kernel/pending.py +++ b/src/async_kernel/pending.py @@ -10,7 +10,7 @@ from typing import TYPE_CHECKING, Any, ClassVar, Literal, Self, final, overload import anyio -from aiologic.lowlevel import create_async_event, create_async_waiter, create_green_event +from aiologic.lowlevel import create_async_event, create_green_event from typing_extensions import override import async_kernel @@ -500,8 +500,8 @@ async def wait( """ try: if not self._done: - waiter = create_async_waiter(shield=shield) - self.add_done_callback(lambda _: waiter.wake()) + waiter = create_async_event(shield=shield) + self.add_done_callback(lambda _: waiter.set()) if timeout is None: await waiter else: diff --git a/tests/test_callable_interface.py b/tests/test_callable_interface.py index 5bc8e28c..b786c34c 100644 --- a/tests/test_callable_interface.py +++ b/tests/test_callable_interface.py @@ -11,7 +11,6 @@ from async_kernel.compat.json import pack_json_str, unpack_json from async_kernel.interface import start_kernel_callable_interface from async_kernel.interface.callable import CallableInterface -from tests import utils if TYPE_CHECKING: from async_kernel.typing import Message @@ -92,24 +91,3 @@ async def test_prevent_multiple_instances(self, interface): async def test_keyboard_interrupt(self, interface) -> None: with pytest.raises(KeyboardInterrupt): signal.raise_signal(signal.SIGINT) - - async def test_kernel_interrupt_pending(self, interface: CallableInterface): - interface.kernel.shell.user_ns["ready_event"] = ready = Event() - code = "ready_event.set()\nimport anyio\nawait anyio.sleep_forever()" - pen = interface.kernel.caller.call_soon(interface.kernel.do_execute, code, silent=False) - await ready - interface.kernel.do_interrupt() - result = await pen - assert result - - async def test_kernel_interrupt_keyboard(self, interface: CallableInterface): - interface.kernel.shell.user_ns["ready_event"] = ready = Event() - - async def run_code(): - code = f"ready_event.set()\nimport time\ntime.sleep({utils.TIMEOUT * 2})" - pen = interface.kernel.caller.call_soon(interface.kernel.do_execute, code, silent=False) - await ready - interface.kernel.do_interrupt() - await pen.wait(result=False) - - await interface.kernel.caller.to_thread(run_code)