Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add a Watchable AsyncStatus and extend the wrap decorator #176

Merged
merged 24 commits into from
May 17, 2024
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
0a5533e
(#117) (#45) extend wrap to take args and kwargs
dperl-dls Apr 4, 2024
8b926b4
(#117) (#45) add watchable status and some tests
dperl-dls Apr 5, 2024
d1148e3
(#117) (#45) make AS and WAS both derive from base
dperl-dls Apr 8, 2024
0c67305
(#117) (#45) update classes to use WatchableAsyncStatus.
dperl-dls Apr 8, 2024
dab791b
(#117) (#45) make Watcher match bluesky spec
dperl-dls Apr 9, 2024
adc784e
(#117) (#45) add timeouts
dperl-dls Apr 11, 2024
5f0d555
(#117) (#45) add timeout test
dperl-dls Apr 15, 2024
27d022c
(#117) (#45) only update elapsed time in status if not supplied
dperl-dls Apr 15, 2024
c0ab271
(#117) (#45) address review comments
dperl-dls Apr 18, 2024
493ba2a
(#117) (#45) update sim motor
dperl-dls Apr 19, 2024
85f4c28
(#117) (#45) finish motor move status from set put complete
dperl-dls Apr 19, 2024
d8ff94a
(#117) (#45) tidy tests
dperl-dls Apr 19, 2024
b04e60a
Merge branch 'main' into 117_45_extend_asyncstatus_wrap
dperl-dls May 1, 2024
730ee2c
(#117) (#45) address review comments
dperl-dls May 1, 2024
0fc2ccb
(#117) (#45) update tests to use new sim signal helper
dperl-dls May 1, 2024
d5eff2b
(#228) remove synchronous move method
dperl-dls May 1, 2024
f34ff7e
(#117) (#45) make sim motor look like others
dperl-dls May 1, 2024
dbc2d02
Merge branch 'main' into 117_45_extend_asyncstatus_wrap
dperl-dls May 7, 2024
1798b41
(#117) (#45) fix sim motor
dperl-dls May 7, 2024
42e6075
replace with asyncio.timeout with wait_for
dperl-dls May 7, 2024
3be4f14
Merge branch 'main' into 117_45_extend_asyncstatus_wrap
dperl-dls May 15, 2024
2a12be3
Fix merge issues and update for review comments
dperl-dls May 16, 2024
16f77b0
Merge branch 'main' into 117_45_extend_asyncstatus_wrap
coretl May 17, 2024
5691fcf
mock time elapsed in flaky test
dperl-dls May 17, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion src/ophyd_async/core/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
ShapeProvider,
StaticDirectoryProvider,
)
from .async_status import AsyncStatus
from .async_status import AsyncStatus, WatchableAsyncStatus
from .detector import (
DetectorControl,
DetectorTrigger,
Expand Down Expand Up @@ -78,6 +78,7 @@
"set_sim_value",
"wait_for_value",
"AsyncStatus",
"WatchableAsyncStatus",
"DirectoryInfo",
"DirectoryProvider",
"NameProvider",
Expand Down
128 changes: 95 additions & 33 deletions src/ophyd_async/core/async_status.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,28 +2,41 @@

import asyncio
import functools
from typing import Awaitable, Callable, Coroutine, List, Optional, cast
import time
from dataclasses import asdict, replace
from typing import (
AsyncIterator,
Awaitable,
Callable,
Generic,
SupportsFloat,
Type,
TypeVar,
cast,
)

from bluesky.protocols import Status

from .utils import Callback, T
from .utils import Callback, P, T, Watcher, WatcherUpdate

AS = TypeVar("AS", bound="AsyncStatus")
WAS = TypeVar("WAS", bound="WatchableAsyncStatus")

class AsyncStatus(Status):

class AsyncStatusBase(Status):
"""Convert asyncio awaitable to bluesky Status interface"""

def __init__(
self,
awaitable: Awaitable,
watchers: Optional[List[Callable]] = None,
):
def __init__(self, awaitable: Awaitable, timeout: SupportsFloat | None = None):
if isinstance(timeout, SupportsFloat):
timeout = float(timeout)
if isinstance(awaitable, asyncio.Task):
self.task = awaitable
else:
self.task = asyncio.create_task(awaitable) # type: ignore
self.task = asyncio.create_task(
asyncio.wait_for(awaitable, timeout=timeout)
)
coretl marked this conversation as resolved.
Show resolved Hide resolved
self.task.add_done_callback(self._run_callbacks)
self._callbacks = cast(List[Callback[Status]], [])
self._watchers = watchers
self._callbacks = cast(list[Callback[Status]], [])
dperl-dls marked this conversation as resolved.
Show resolved Hide resolved

def __await__(self):
return self.task.__await__()
Expand All @@ -39,15 +52,11 @@ def _run_callbacks(self, task: asyncio.Task):
for callback in self._callbacks:
callback(self)

# TODO: remove ignore and bump min version when bluesky v1.12.0 is released
def exception( # type: ignore
self, timeout: Optional[float] = 0.0
) -> Optional[BaseException]:
def exception(self, timeout: float | None = 0.0) -> BaseException | None:
if timeout != 0.0:
raise Exception(
raise ValueError(
"cannot honour any timeout other than 0 in an asynchronous function"
)

if self.task.done():
try:
return self.task.exception()
Expand All @@ -67,22 +76,6 @@ def success(self) -> bool:
and self.task.exception() is None
)

def watch(self, watcher: Callable):
"""Add watcher to the list of interested parties.

Arguments as per Bluesky :external+bluesky:meth:`watch` protocol.
"""
if self._watchers is not None:
self._watchers.append(watcher)

@classmethod
def wrap(cls, f: Callable[[T], Coroutine]) -> Callable[[T], "AsyncStatus"]:
@functools.wraps(f)
def wrap_f(self) -> AsyncStatus:
return AsyncStatus(f(self))

return wrap_f

def __repr__(self) -> str:
if self.done:
if e := self.exception():
Expand All @@ -94,3 +87,72 @@ def __repr__(self) -> str:
return f"<{type(self).__name__}, task: {self.task.get_coro()}, {status}>"

__str__ = __repr__


class AsyncStatus(AsyncStatusBase):
@classmethod
def wrap(cls: Type[AS], f: Callable[P, Awaitable]) -> Callable[P, AS]:
@functools.wraps(f)
def wrap_f(*args: P.args, **kwargs: P.kwargs) -> AS:
# We can't type this more properly because Concatenate/ParamSpec doesn't
# yet support keywords
# https://peps.python.org/pep-0612/#concatenating-keyword-parameters
timeout = kwargs.get("timeout")
assert isinstance(timeout, SupportsFloat) or timeout is None
return cls(f(*args, **kwargs), timeout=timeout)

# type is actually functools._Wrapped[P, Awaitable, P, AS]
# but functools._Wrapped is not necessarily available
return cast(Callable[P, AS], wrap_f)


class WatchableAsyncStatus(AsyncStatusBase, Generic[T]):
"""Convert AsyncIterator of WatcherUpdates to bluesky Status interface."""

def __init__(
self,
iterator: AsyncIterator[WatcherUpdate[T]],
timeout: SupportsFloat | None = None,
):
self._watchers: list[Watcher] = []
self._start = time.monotonic()
self._last_update: WatcherUpdate[T] | None = None
super().__init__(self._notify_watchers_from(iterator), timeout)

async def _notify_watchers_from(self, iterator: AsyncIterator[WatcherUpdate[T]]):
async for update in iterator:
self._last_update = (
update
if update.time_elapsed is not None
else replace(update, time_elapsed=time.monotonic() - self._start)
)
for watcher in self._watchers:
self._update_watcher(watcher, self._last_update)

def _update_watcher(self, watcher: Watcher, update: WatcherUpdate[T]):
vals = asdict(
update, dict_factory=lambda d: {k: v for k, v in d if v is not None}
)
watcher(**vals)

def watch(self, watcher: Watcher):
self._watchers.append(watcher)
if self._last_update:
self._update_watcher(watcher, self._last_update)

@classmethod
def wrap(
cls: Type[WAS],
f: Callable[P, AsyncIterator[WatcherUpdate[T]]],
) -> Callable[P, WAS]:
"""Wrap an AsyncIterator in a WatchableAsyncStatus. If it takes
'timeout' as an argument, this must be a float or None, and it
will be propagated to the status."""

@functools.wraps(f)
def wrap_f(*args: P.args, **kwargs: P.kwargs) -> WAS:
coretl marked this conversation as resolved.
Show resolved Hide resolved
timeout = kwargs.get("timeout")
assert isinstance(timeout, SupportsFloat) or timeout is None
return cls(f(*args, **kwargs), timeout=timeout)

return cast(Callable[P, WAS], wrap_f)
36 changes: 17 additions & 19 deletions src/ophyd_async/core/detector.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,10 @@

from ophyd_async.protocols import AsyncConfigurable, AsyncReadable

from .async_status import AsyncStatus
from .async_status import AsyncStatus, WatchableAsyncStatus
from .device import Device
from .signal import SignalR
from .utils import DEFAULT_TIMEOUT, merge_gathered_dicts
from .utils import DEFAULT_TIMEOUT, WatcherUpdate, merge_gathered_dicts

T = TypeVar("T")

Expand Down Expand Up @@ -189,7 +189,7 @@ def __init__(
self._trigger_info: Optional[TriggerInfo] = None
# For kickoff
self._watchers: List[Callable] = []
self._fly_status: Optional[AsyncStatus] = None
self._fly_status: Optional[WatchableAsyncStatus] = None
self._fly_start: float

self._intial_frame: int
Expand Down Expand Up @@ -300,28 +300,26 @@ async def _prepare(self, value: T) -> None:
exposure=self._trigger_info.livetime,
)

@AsyncStatus.wrap
async def kickoff(self) -> None:
self._fly_status = AsyncStatus(self._fly(), self._watchers)
def kickoff(self, timeout: Optional[float] = None):
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The original code kicked off self._observe_writer_indices and returned immediately, then returned this status in complete. This code now actually awaits this Status, which is wrong.

However, I think a simplification of the code is the better fix:

    @AsyncStatus.wrap
    async def kickoff(self) -> None:
        # Nothing to do in kickoff, we already armed in prepare
        pass

    @WatchableAsyncStatus.wrap
    async def complete(self):
        async for index in self.writer.observe_indices_written(self._frame_writing_timeout):    
            yield WatcherUpdate(
                name=self.name,
                current=index,
                initial=self._initial_frame,
                target=end_observation,
                unit="",
                precision=0,
                time_elapsed=time.monotonic() - self._fly_start,
            )
            if index >= end_observation:
                break

Then can delete _fly and _observe_writer_indices...

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Flyable tells us that kickoff and complete must both return statuses.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's fine, they're both being wrapped by *AsyncStatus so return a Status. It's just that kickoff returns one that is immediately done...

self._fly_start = time.monotonic()

async def _fly(self) -> None:
await self._observe_writer_indicies(self._last_frame)
self._fly_status = WatchableAsyncStatus(
self._observe_writer_indicies(self._last_frame), timeout
)
return self._fly_status

async def _observe_writer_indicies(self, end_observation: int):
async for index in self.writer.observe_indices_written(
self._frame_writing_timeout
):
for watcher in self._watchers:
watcher(
name=self.name,
current=index,
initial=self._initial_frame,
target=end_observation,
unit="",
precision=0,
time_elapsed=time.monotonic() - self._fly_start,
)
yield WatcherUpdate(
name=self.name,
current=index,
initial=self._initial_frame,
target=end_observation,
unit="",
precision=0,
time_elapsed=time.monotonic() - self._fly_start,
)
if index >= end_observation:
break

Expand Down
28 changes: 24 additions & 4 deletions src/ophyd_async/core/signal.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,16 @@

import asyncio
import functools
from typing import AsyncGenerator, Callable, Dict, Generic, Optional, Tuple, Type, Union
from typing import (
AsyncGenerator,
Callable,
Dict,
Generic,
Optional,
Tuple,
Type,
Union,
)

from bluesky.protocols import (
Descriptor,
Expand All @@ -11,6 +20,7 @@
Movable,
Reading,
Stageable,
Status,
Subscribable,
)

Expand Down Expand Up @@ -272,7 +282,9 @@ def soft_signal_r_and_backend(
return (signal, backend)


async def observe_value(signal: SignalR[T], timeout=None) -> AsyncGenerator[T, None]:
async def observe_value(
signal: SignalR[T], timeout=None, done_status: Status | None = None
) -> AsyncGenerator[T, None]:
coretl marked this conversation as resolved.
Show resolved Hide resolved
"""Subscribe to the value of a signal so it can be iterated from.

Parameters
Expand All @@ -288,18 +300,26 @@ async def observe_value(signal: SignalR[T], timeout=None) -> AsyncGenerator[T, N
async for value in observe_value(sig):
do_something_with(value)
"""
q: asyncio.Queue[T] = asyncio.Queue()

q: asyncio.Queue[T | None] = asyncio.Queue()
coretl marked this conversation as resolved.
Show resolved Hide resolved
if timeout is None:
get_value = q.get
else:

async def get_value():
return await asyncio.wait_for(q.get(), timeout)

if done_status is not None:
done_status.add_callback(lambda _: q.put_nowait(None))

signal.subscribe_value(q.put_nowait)
try:
while True:
yield await get_value()
item = await get_value()
if item is not None:
yield item
else:
break
finally:
signal.clear_sub(q.put_nowait)

Expand Down
40 changes: 40 additions & 0 deletions src/ophyd_async/core/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,18 @@

import asyncio
import logging
from dataclasses import dataclass
from typing import (
Any,
Awaitable,
Callable,
Dict,
Generic,
Iterable,
List,
Optional,
ParamSpec,
Protocol,
Type,
TypeVar,
Union,
Expand All @@ -18,6 +23,7 @@
from bluesky.protocols import Reading

T = TypeVar("T")
P = ParamSpec("P")
Callback = Callable[[T], None]

#: A function that will be called with the Reading and value when the
Expand Down Expand Up @@ -77,6 +83,40 @@ def __str__(self) -> str:
return self.format_error_string(indent="")


@dataclass(frozen=True)
class WatcherUpdate(Generic[T]):
"""A dataclass such that, when expanded, it provides the kwargs for a watcher"""

current: T
initial: T
target: T
name: str | None = None
unit: str | None = None
precision: float | None = None
fraction: float | None = None
time_elapsed: float | None = None
time_remaining: float | None = None


C = TypeVar("C", contravariant=True)


class Watcher(Protocol, Generic[C]):
@staticmethod
def __call__(
*,
current: C,
initial: C,
target: C,
name: str | None,
unit: str | None,
precision: float | None,
fraction: float | None,
time_elapsed: float | None,
time_remaining: float | None,
) -> Any: ...


coretl marked this conversation as resolved.
Show resolved Hide resolved
async def wait_for_connection(**coros: Awaitable[None]):
"""Call many underlying signals, accumulating exceptions and returning them

Expand Down
Loading
Loading