Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 13 additions & 4 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -57,15 +57,24 @@ jobs:
# Pyside does not have free-threaded binary
run: uv sync --locked --dev --group gui

- name: Run tests if CI debugging
if: runner.debug == '1'
timeout-minutes: 3
- name: CI debugging env windows
if: ${{runner.debug == '1' && startsWith(matrix.os, 'windows')}}
run: |
$Env:CI_DEBUGGING="1"

- name: CI debugging env non-windows
if: ${{runner.debug == '1' && !startsWith(matrix.os, 'windows')}}
run: |
export CI_DEBUGGING=1

- name: Run tests debug mode
if: ${{runner.debug == '1'}}
timeout-minutes: 3
run: |
uv run pytest -vvl --override-ini=log_cli_level=DEBUG --override-ini=log_cli=true

- name: Run tests
if: runner.debug != '1'
if: ${{runner.debug != '1'}}
timeout-minutes: 3
run: uv run pytest -vvl

Expand Down
74 changes: 25 additions & 49 deletions src/async_kernel/kernel.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
import os
import signal
import sys
import time
from collections.abc import Callable
from contextlib import asynccontextmanager
from io import TextIOBase
Expand All @@ -25,6 +24,7 @@
from async_kernel.common import Fixed, KernelInterrupt
from async_kernel.debugger import Debugger
from async_kernel.interface import HasInterface
from async_kernel.pending import Pending
from async_kernel.shell.base import ShellPendingManager
from async_kernel.typing import (
CallerCreateOptions,
Expand All @@ -44,7 +44,6 @@
from contextvars import ContextVar
from types import CoroutineType, FrameType

from async_kernel.pending import Pending
from async_kernel.typing import Content, Message

__all__ = ["Kernel", "KernelInterrupt"]
Expand Down Expand Up @@ -165,7 +164,7 @@ class Kernel(HasInterface[T_interface_co], LoggingConfigurable, Generic[T_interf
_restart = False
_handler_cache: ClassVar[dict[tuple[str | None, MsgType, Callable], HandlerType]] = {}
_subshells: dict[str, T_shell_co]
_interrupt_requested: bool | Literal["FORCE"] = False
_interrupt_requested: None | Pending = None

@traitlets.default("help_links")
def _default_help_links(self) -> tuple[dict[str, str], ...]:
Expand Down Expand Up @@ -277,59 +276,36 @@ async def running(self) -> AsyncGenerator[None]:
def _signal_handler(self, signum, frame: FrameType | None) -> None:
"Handle interrupt signals."

match self._interrupt_requested:
case "FORCE":
self._interrupt_requested = False
if pen := self._interrupt_requested:
self._interrupt_requested = None
if not pen.done():
pen.set_result(None)
raise KernelInterrupt
case True:
if frame and frame.f_globals is self.main_shell.user_ns:
self._interrupt_requested = False
raise KernelInterrupt
self.last_interrupt_frame = frame

def clearlast_interrupt_frame():
if self.last_interrupt_frame is frame:
self.last_interrupt_frame = None

def re_raise():
if self.last_interrupt_frame is frame:
self._interrupt_now(force=True)

# Race to check if the main thread should be interrupted.
self.callers[Channel.shell].call_direct(clearlast_interrupt_frame)
self.callers[Channel.control].call_later(1, re_raise)
case False:
signal.default_int_handler(signum, frame)

def _interrupt_now(self, *, force=False) -> None:
"""
Request an interrupt of the currently running shell thread.

If called from the main thread, sets the interrupt request flag and sends a SIGINT signal
to the current process. On Windows, uses `signal.raise_signal`; on other platforms, uses `os.kill`.
If `force` is True, sets the interrupt request flag to "FORCE".
signal.default_int_handler(signum, frame)

Args:
force: If True, requests a forced interrupt. Defaults to False.
"""
# Restricted this to when the shell is running in the main thread.
if self.parent.callers[Channel.shell].id == Caller.CALLER_MAIN_THREAD_ID:
self._interrupt_requested = "FORCE" if force else True
if sys.platform == "win32":
signal.raise_signal(signal.SIGINT)
time.sleep(0)
else:
os.kill(os.getpid(), signal.SIGINT)

def do_interrupt(self) -> None:
async def do_interrupt(self) -> None:
"""
Interrupt/cancel non-silent active execute requests.
"""
if (sys.platform != "emscripten") and (not self.debugger.enabled or not self.debugger.stopped_threads):
self._interrupt_now()
assert Caller() is self.callers[Channel.control], "Must be called from the control thread."
for pen in self.active_execute_requests.copy():
if not pen.metadata.get("kwargs", {}).get("silent", False):
pen.cancel(self._interrupt_message)
if (
(sys.platform != "emscripten")
and (not self.debugger.enabled or not self.debugger.stopped_threads)
and (caller := Caller("MainThread")).running # Can only interrupt the main thread
):
self._interrupt_requested = pen = Pending()
caller.call_direct(lambda: pen.set_result(None))
try:
await pen.wait(result=False, timeout=1, protect=True)
except TimeoutError:
if sys.platform == "win32":
signal.raise_signal(signal.SIGINT)
else:
os.kill(os.getpid(), signal.SIGINT)
await pen.wait(result=False, timeout=10, protect=True)

def _patch_signal(self) -> Callable[[], None]:

Expand Down Expand Up @@ -585,7 +561,7 @@ async def comm_close(self, job: Job[Content], /) -> None:

async def interrupt_request(self, job: Job[Content], /) -> Content:
"""Handle an [interrupt request](https://jupyter-client.readthedocs.io/en/stable/messaging.html#kernel-interrupt)."""
self.do_interrupt()
await self.do_interrupt()
return {}

async def shutdown_request(self, job: Job[Content], /) -> Content:
Expand Down
6 changes: 3 additions & 3 deletions src/async_kernel/pending.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
from typing import TYPE_CHECKING, Any, ClassVar, Literal, Self, final, overload

import anyio
from aiologic.lowlevel import create_async_event, create_async_waiter, create_green_event
from aiologic.lowlevel import create_async_event, create_green_event
from typing_extensions import override

import async_kernel
Expand Down Expand Up @@ -500,8 +500,8 @@ async def wait(
"""
try:
if not self._done:
waiter = create_async_waiter(shield=shield)
self.add_done_callback(lambda _: waiter.wake())
waiter = create_async_event(shield=shield)
self.add_done_callback(lambda _: waiter.set())
if timeout is None:
await waiter
else:
Expand Down
22 changes: 0 additions & 22 deletions tests/test_callable_interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
from async_kernel.compat.json import pack_json_str, unpack_json
from async_kernel.interface import start_kernel_callable_interface
from async_kernel.interface.callable import CallableInterface
from tests import utils

if TYPE_CHECKING:
from async_kernel.typing import Message
Expand Down Expand Up @@ -92,24 +91,3 @@ async def test_prevent_multiple_instances(self, interface):
async def test_keyboard_interrupt(self, interface) -> None:
with pytest.raises(KeyboardInterrupt):
signal.raise_signal(signal.SIGINT)

async def test_kernel_interrupt_pending(self, interface: CallableInterface):
interface.kernel.shell.user_ns["ready_event"] = ready = Event()
code = "ready_event.set()\nimport anyio\nawait anyio.sleep_forever()"
pen = interface.kernel.caller.call_soon(interface.kernel.do_execute, code, silent=False)
await ready
interface.kernel.do_interrupt()
result = await pen
assert result

async def test_kernel_interrupt_keyboard(self, interface: CallableInterface):
interface.kernel.shell.user_ns["ready_event"] = ready = Event()

async def run_code():
code = f"ready_event.set()\nimport time\ntime.sleep({utils.TIMEOUT * 2})"
pen = interface.kernel.caller.call_soon(interface.kernel.do_execute, code, silent=False)
await ready
interface.kernel.do_interrupt()
await pen.wait(result=False)

await interface.kernel.caller.to_thread(run_code)
Loading