Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -110,5 +110,5 @@ jobs:
timeout-minutes: 5
run: |
uv sync --group docs
uv run async-kernel -a async-docs --shell.timeout=0.1
uv run async-kernel -a async-docs --shell.timeout=1
uv run mkdocs build -s
2 changes: 1 addition & 1 deletion .github/workflows/publish-docs.yml
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ jobs:
- name: Install the project
run: |
uv sync --group docs
uv run async-kernel -a async-docs --shell.timeout=0.1 # The 'async-docs' kernel is specified as the kernel for mkdocs-jupyter
uv run async-kernel -a async-docs --shell.timeout=1 # The 'async-docs' kernel is specified as the kernel for mkdocs-jupyter

- name: Version info
id: version
Expand Down
6 changes: 3 additions & 3 deletions CONTRIBUTING.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ Additional steps to build documentation (optional):

```bash
uv sync --group docs
uv run async-kernel -a async-docs --main_shell.timeout=0.1
uv run async-kernel -a async-docs --main_shell.timeout=1
```

### Running tests
Expand Down Expand Up @@ -100,7 +100,7 @@ The 'docs' group specified extra packages are required to build documentation.

```bash
uv sync --group docs
uv run async-kernel -a async-docs --main_shell.timeout=0.1
uv run async-kernel -a async-docs --main_shell.timeout=1
```

#### Test the docs
Expand All @@ -114,7 +114,7 @@ uv run mkdocs build -s
The command:

```bash
uv run async-kernel -a async-docs --main_shell.timeout=0.1
uv run async-kernel -a async-docs --main_shell.timeout=1
```

Defines a new kernel spec with the name "async-docs" that sets the `shell.timeout` to 100ms.
Expand Down
2 changes: 1 addition & 1 deletion docs/notebooks/caller.ipynb
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,7 @@
"source": [
"## Caller methods that return Pending\n",
"\n",
"Pending is like a thread-safe [asyncio.Future](https://docs.python.org/3/library/asyncio-future.html#asyncio.Future) like object to return future results. It was called `Pending` to avoid confusion about differences in functionality to that of `asyncio.Future` and `concurrent.futures.Future`.\n",
"Pending is a thread-safe representation of a future result. It was designed to provide thread-safe functionality similar [asyncio.Future](https://docs.python.org/3/library/asyncio-future.html#asyncio.Future) and was named `Pending` to avoid confusion about differences in functionality to that of `asyncio.Future` and `concurrent.futures.Future`.\n",
"\n",
"The following functions return a `Pending` object:\n",
"\n",
Expand Down
100 changes: 93 additions & 7 deletions docs/notebooks/concurrency.ipynb
Original file line number Diff line number Diff line change
Expand Up @@ -187,10 +187,7 @@
"editable": true,
"slideshow": {
"slide_type": ""
},
"tags": [
"suppress-error"
]
}
},
"outputs": [],
"source": [
Expand Down Expand Up @@ -223,13 +220,12 @@
"slide_type": ""
},
"tags": [
"thread",
"suppress-error"
"thread"
]
},
"outputs": [],
"source": [
"# This time we'll use the tag to run the cell in a Thread\n",
"# This time we'll use the tag to run the cell in a worker thread\n",
"await demo()"
]
},
Expand All @@ -249,6 +245,96 @@
"# thread\n",
"%callers # magic provided by async-kernel"
]
},
{
"cell_type": "markdown",
"id": "11",
"metadata": {},
"source": [
"We can also specify CallerCreateOptions as part of the top line"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "12",
"metadata": {},
"outputs": [],
"source": [
"# thread name=\"My thread\"\n",
"%callers"
]
},
{
"cell_type": "markdown",
"id": "13",
"metadata": {},
"source": [
"## Asynchronous magic\n",
"\n",
"Asynchronous line (%) and cell (%%) magic functions are supported.\n",
"\n",
"### thread magic\n",
"\n",
"This will run the code in a thread. When no settings are provided a cell worker thread is used.\n",
"\n",
"#### Comparing thread magic with thread run mode\n",
"\n",
"- thread magic (`%%thread`) is an asynchronous magic that executes the associated **code** in a separate thread. \n",
"- [thread run mode](#run-mode-thread) (`# thread`) instructs the kernel to run the **entire cell** in a separate thread, bypassing the shell execute request queue."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "14",
"metadata": {},
"outputs": [],
"source": [
"# Run the magic 'callers' in a caller worker thread.\n",
"%thread %callers "
]
},
{
"cell_type": "markdown",
"id": "15",
"metadata": {},
"source": [
"To specify a thread (caller) by name"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "16",
"metadata": {},
"outputs": [],
"source": [
"%%thread name=\"My executor\" \n",
"%callers"
]
},
{
"cell_type": "markdown",
"id": "17",
"metadata": {},
"source": [
"Many of arguments accepted on `Caller.get` are also supported. Let's use a thread with a trio backend."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "18",
"metadata": {},
"outputs": [],
"source": [
"%%thread name=\"My trio executor\" backend=trio\n",
"%callers\n",
"\n",
"import trio\n",
"await trio.sleep(0)"
]
}
],
"metadata": {
Expand Down
30 changes: 22 additions & 8 deletions src/async_kernel/asyncshell.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
from IPython.core.history import HistoryManager
from IPython.core.interactiveshell import InteractiveShell
from IPython.core.interactiveshell import _modified_open as _modified_open_ # pyright: ignore[reportPrivateUsage]
from IPython.core.magic import Magics, line_cell_magic, line_magic, magics_class
from IPython.core.magic import Magics, line_cell_magic, line_magic, magics_class, no_var_expand
from IPython.utils.tokenutil import token_at_cursor
from jupyter_core.paths import jupyter_runtime_dir
from traitlets import traitlets
Expand All @@ -32,7 +32,7 @@
from async_kernel.compiler import XCachingCompiler
from async_kernel.event_loop.run import get_runtime_matplotlib_guis
from async_kernel.pending import PendingManager
from async_kernel.typing import Channel, Content, Message, NoValue, Tags
from async_kernel.typing import Channel, Content, Message, NoValue, RunMode, Tags

if TYPE_CHECKING:
from collections.abc import Callable, Generator
Expand Down Expand Up @@ -931,6 +931,7 @@ def subshell(self, _) -> None:
)
print(f"Current shell:\t{self.shell}\n\n{subshell_list}")

@no_var_expand
@line_magic
async def pip(self, line: str) -> Any | None:
"""Run the pip package manager for the current environment.
Expand Down Expand Up @@ -965,6 +966,7 @@ async def pip(self, line: str) -> Any | None:

return None

@no_var_expand
@line_magic
async def uv(self, line) -> None:
"""Run the uv package manager for the current environment.
Expand All @@ -979,16 +981,28 @@ async def uv(self, line) -> None:
if process.stderr:
tg.start_soon(_forward_transport_stream, process.stderr, sys.stdout)

@no_var_expand
@line_cell_magic
async def python(self, line: str, cell: str) -> None:
async def thread(self, line: str, cell: str | None = None) -> None:
"""
Run python code.
Run the python code in a caller managed child thread.

Useful only when the primary language is not Python.
Both line and cell magic are supported.

For cell_magic, [CallerCreateOptions][async_kernel.typing.CallerCreateOptions] can be passed as literals.

Example:
%%thread name="Trio executor" backend=trio
"""
shell = SubshellManager.get_shell()
cell = cell or line
await shell.run_cell_async(
shell: AsyncInteractiveShell | AsyncInteractiveSubshell = SubshellManager.get_shell()
if cell is None:
cell = line
options: Any = None
else:
options = RunMode.line_to_options(line)
caller = shell.kernel.caller
await (caller.get(**options).call_soon if options else caller.to_thread)(
shell.run_cell_async,
raw_cell=cell,
store_history=False,
silent=True,
Expand Down
25 changes: 14 additions & 11 deletions src/async_kernel/interface/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
from async_kernel.iostream import OutStream
from async_kernel.typing import (
Backend,
CallerCreateOptions,
Channel,
Content,
HandlerType,
Expand Down Expand Up @@ -282,22 +283,22 @@ def message_handler(self, job: Job, send_reply: Callable[[Job, dict], CoroutineT

handler = self._get_handler(job, send_reply)

run_mode = RunMode.queue
run_mode: RunMode | CallerCreateOptions | None = None
msg_type = MsgType(job["msg"]["header"]["msg_type"])

if msg_type is MsgType.execute_request:
caller = self.callers[job["msg"]["channel"]] # pyright: ignore[reportArgumentType]
if content := job["msg"].get("content", {}):
if (code := content.get("code")) and (
mode := RunMode.to_runmode(code.strip().split("\n", maxsplit=1)[0])
):
run_mode = mode
if content.get("silent"):
run_mode = RunMode.task
try:
run_mode = next(mode for tag in utils.get_tags(job) if (mode := RunMode.to_runmode(tag)))
except Exception:
pass
except StopIteration:
if content := job["msg"].get("content", {}):
if (code := content.get("code")) and (
mode := RunMode.to_runmode(code.strip().split("\n", maxsplit=1)[0])
):
run_mode = mode
if content.get("silent"):
run_mode = RunMode.task

elif msg_type in self.handle_in_shell_thread:
caller = self.callers[Channel.shell]
else:
Expand All @@ -306,12 +307,14 @@ def message_handler(self, job: Job, send_reply: Callable[[Job, dict], CoroutineT
caller = caller.get(name=thread_name, no_debug=True)

match run_mode:
case RunMode.queue:
case RunMode.queue | None:
caller.queue_call(handler, job)
case RunMode.task:
caller.call_soon(handler, job)
case RunMode.thread:
caller.to_thread(handler, job)
case _ as options:
caller.get(**options).call_soon(handler, job)

self.log.debug("%s %s %s %s", msg_type, run_mode, handler, job)

Expand Down
40 changes: 35 additions & 5 deletions src/async_kernel/typing.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from collections.abc import Awaitable, Callable
from typing import TYPE_CHECKING, Any, Generic, Literal, NotRequired, ParamSpec, Self, TypedDict, TypeVar

from typing_extensions import Sentinel, override
from typing_extensions import Sentinel, get_annotations, override

if TYPE_CHECKING:
import datetime
Expand Down Expand Up @@ -114,15 +114,45 @@ def __hash__(self) -> int:
return hash(self.name)

@classmethod
def to_runmode(cls, value: Any, default: T = None, /) -> Self | T:
"Converts value to `Runmode` or default where it is not possible."
def to_runmode(cls, value: Any, default: T = None, /) -> Self | T | CallerCreateOptions:
"""
Converts value to `Runmode`, `CallerCreateOptions` or default where it is not possible.

`CallerCreateOptions` will only return when a string is passed specifying a thread and options.

Example:
`value = "# thread name='My exeecutor' backend=trio`
"""
try:
return cls(value)
except ValueError:
if isinstance(value, str) and value.startswith(("# ", "##")):
return cls.to_runmode(value[2:], default)
if isinstance(value, str):
if value.startswith(("# ", "##")):
return cls.to_runmode(value[2:].strip(), default)
if value.startswith("thread"):
return cls.line_to_options(value.removeprefix("thread"))
return default

@classmethod
def line_to_options(cls, line: str, /) -> CallerCreateOptions:
"Convert the line string to CallerCreateOptions."
import shlex # noqa: PLC0415

items = CallerCreateOptions()
for v in shlex.split(line):
k, val = v.split("=", maxsplit=1)
try:
items[k] = eval(val)
except Exception:
items[k] = val
if items and not items.get("name"):
msg = "'name' must be specified when providing settings!"
raise ValueError(msg)
if invalid := set(items).difference(get_annotations(CallerCreateOptions)):
msg = f"One or more invalid options found! valid={list(get_annotations(CallerCreateOptions))} invalid={list(invalid)}"
raise ValueError(msg)
return items

queue = "queue"
"Run the message handler using [async_kernel.caller.Caller.queue_call][]."

Expand Down
Loading
Loading