Skip to content

Commit

Permalink
Merge ec1331e into 42c0c89
Browse files Browse the repository at this point in the history
  • Loading branch information
agronholm committed May 1, 2024
2 parents 42c0c89 + ec1331e commit 5e446f8
Show file tree
Hide file tree
Showing 8 changed files with 207 additions and 140 deletions.
1 change: 1 addition & 0 deletions docs/api.rst
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ Components
.. autoclass:: Component
.. autoclass:: ContainerComponent
.. autoclass:: CLIApplicationComponent
.. autofunction:: start_component

Concurrency
-----------
Expand Down
2 changes: 2 additions & 0 deletions docs/versionhistory.rst
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ This library adheres to `Semantic Versioning 2.0 <https://semver.org/>`_.
``CLIApplicationComponent.run()``
* Changed how ``CLIApplicationComponent`` works: they no longer start a service task
that call the ``run()`` method, but instead the runner will call it directly
* Added the ``start_component()`` function which is now the preferred method for
starting components directly (e.g. in test suites)
- **BACKWARD INCOMPATIBLE** Changes in (Asphalt) context handling:

* Dropped the ``TeardownError`` exception in favor of PEP 654 exception groups
Expand Down
11 changes: 6 additions & 5 deletions examples/tutorial1/tests/test_client_server.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
# isort: off
from __future__ import annotations

from collections.abc import AsyncGenerator

import pytest
from anyio import wait_all_tasks_blocked
from echo.client import ClientComponent
from echo.server import ServerComponent
from pytest import CaptureFixture
from asphalt.core import Context, start_component

from asphalt.core import Context
from echo.client import ClientComponent
from echo.server import ServerComponent

pytestmark = pytest.mark.anyio

Expand All @@ -17,14 +18,14 @@
async def server(capsys: CaptureFixture[str]) -> AsyncGenerator[None, None]:
async with Context():
server = ServerComponent()
await server.start()
await start_component(server)
yield


async def test_client_and_server(server: None, capsys: CaptureFixture[str]) -> None:
async with Context():
client = ClientComponent("Hello!")
await client.start()
await start_component(client)
await client.run()

# Grab the captured output of sys.stdout and sys.stderr from the capsys fixture
Expand Down
1 change: 1 addition & 0 deletions src/asphalt/core/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from ._component import CLIApplicationComponent as CLIApplicationComponent
from ._component import Component as Component
from ._component import ContainerComponent as ContainerComponent
from ._component import start_component as start_component
from ._concurrent import TaskFactory as TaskFactory
from ._concurrent import TaskHandle as TaskHandle
from ._concurrent import start_background_task_factory as start_background_task_factory
Expand Down
176 changes: 174 additions & 2 deletions src/asphalt/core/_component.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,26 @@

from abc import ABCMeta, abstractmethod
from collections import OrderedDict
from collections.abc import Coroutine
from contextlib import ExitStack
from dataclasses import dataclass, field
from logging import getLogger
from traceback import StackSummary
from types import FrameType
from typing import Any

from anyio import create_task_group

from anyio import (
CancelScope,
create_task_group,
get_current_task,
get_running_tasks,
sleep,
)
from anyio.abc import TaskStatus

from ._concurrent import start_service_task
from ._context import current_context
from ._exceptions import NoCurrentContext
from ._utils import PluginContainer, merge_config, qualified_name


Expand All @@ -28,6 +44,10 @@ async def start(self) -> None:
It is advisable for Components to first add all the resources they can to the
context before requesting any from it. This will speed up the dependency
resolution and prevent deadlocks.
.. warning:: It's unadvisable to call this method directly (in case you're doing
it in a test suite). Instead, call :func:`start_component`, as it comes with
extra safeguards.
"""


Expand Down Expand Up @@ -139,3 +159,155 @@ async def run(self) -> int | None:


component_types = PluginContainer("asphalt.components", Component)


async def start_component(
component: Component,
*,
start_timeout: float | None = 20,
startup_scope: CancelScope | None = None,
) -> None:
"""
Start a component and its subcomponents.
:param component: the (root) component to start
:param start_timeout: seconds to wait for all the components in the hierarchy to
start (default: ``20``; set to ``None`` to disable timeout)
:param startup_scope: used only by :func:`run_application`
:raises RuntimeError: if this function is called without an active :class:`Context`
"""
try:
current_context()
except NoCurrentContext:
raise RuntimeError(
"start_component() requires an active Asphalt context"
) from None

with ExitStack() as exit_stack:
if startup_scope is None:
startup_scope = exit_stack.enter_context(CancelScope())

startup_watcher_scope: CancelScope | None = None
if start_timeout is not None:
startup_watcher_scope = await start_service_task(
lambda task_status: _component_startup_watcher(
startup_scope,
component,
start_timeout,
task_status=task_status,
),
"Asphalt component startup watcher task",
)

await component.start()

# Cancel the startup timeout, if any
if startup_watcher_scope:
startup_watcher_scope.cancel()


async def _component_startup_watcher(
startup_cancel_scope: CancelScope,
root_component: Component,
start_timeout: float,
*,
task_status: TaskStatus[CancelScope],
) -> None:
def get_coro_stack_summary(coro: Any) -> StackSummary:
import gc

frames: list[FrameType] = []
while isinstance(coro, Coroutine):
while coro.__class__.__name__ == "async_generator_asend":
# Hack to get past asend() objects
coro = gc.get_referents(coro)[0].ag_await

if frame := getattr(coro, "cr_frame", None):
frames.append(frame)

coro = getattr(coro, "cr_await", None)

frame_tuples = [(f, f.f_lineno) for f in frames]
return StackSummary.extract(frame_tuples)

current_task = get_current_task()
parent_task = next(
task_info
for task_info in get_running_tasks()
if task_info.id == current_task.parent_id
)

with CancelScope() as cancel_scope:
task_status.started(cancel_scope)
await sleep(start_timeout)

if cancel_scope.cancel_called:
return

@dataclass
class ComponentStatus:
name: str
alias: str | None
parent_task_id: int | None
traceback: list[str] = field(init=False, default_factory=list)
children: list[ComponentStatus] = field(init=False, default_factory=list)

import re
import textwrap

component_task_re = re.compile(r"^Starting (\S+) \((.+)\)$")
component_statuses: dict[int, ComponentStatus] = {}
for task in get_running_tasks():
if task.id == parent_task.id:
status = ComponentStatus(qualified_name(root_component), None, None)
elif task.name and (match := component_task_re.match(task.name)):
name: str
alias: str
name, alias = match.groups()
status = ComponentStatus(name, alias, task.parent_id)
else:
continue

status.traceback = get_coro_stack_summary(task.coro).format()
component_statuses[task.id] = status

root_status: ComponentStatus
for task_id, component_status in component_statuses.items():
if component_status.parent_task_id is None:
root_status = component_status
elif parent_status := component_statuses.get(component_status.parent_task_id):
parent_status.children.append(component_status)
if parent_status.alias:
component_status.alias = (
f"{parent_status.alias}.{component_status.alias}"
)

def format_status(status_: ComponentStatus, level: int) -> str:
title = f"{status_.alias or 'root'} ({status_.name})"
if status_.children:
children_output = ""
for i, child in enumerate(status_.children):
prefix = "| " if i < (len(status_.children) - 1) else " "
children_output += "+-" + textwrap.indent(
format_status(child, level + 1),
prefix,
lambda line: line[0] in " +|",
)

output = title + "\n" + children_output
else:
formatted_traceback = "".join(status_.traceback)
if level == 0:
formatted_traceback = textwrap.indent(formatted_traceback, "| ")

output = title + "\n" + formatted_traceback

return output

getLogger(__name__).error(
"Timeout waiting for the root component to start\n"
"Components still waiting to finish startup:\n%s",
textwrap.indent(format_status(root_status, 0).rstrip(), " "),
)
startup_cancel_scope.cancel()

0 comments on commit 5e446f8

Please sign in to comment.