Skip to content

Commit

Permalink
Finish v0.9.0rc1
Browse files Browse the repository at this point in the history
- revise async handling
- update protocol and add IMU, speaker and motor config APIs
  • Loading branch information
SillyFreak committed Feb 21, 2019
2 parents beaf97a + 601c9c4 commit 8bf6dcc
Show file tree
Hide file tree
Showing 9 changed files with 489 additions and 284 deletions.
2 changes: 2 additions & 0 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,6 @@ install:
# command to run tests
script:
- pytest

after_success:
- coveralls
74 changes: 49 additions & 25 deletions hedgehog/client/async_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,10 @@
from concurrent_utils.pipe import PipeEnd
from concurrent_utils.component import Component, component_coro_wrapper, start_component
from hedgehog.protocol import errors, ClientSide
from hedgehog.protocol.async_sockets import DealerRouterSocket
from hedgehog.protocol.messages import Message, ack, io, analog, digital, motor, servo, process
from hedgehog.protocol.zmq.asyncio import DealerRouterSocket
from hedgehog.protocol.messages import Message, ack, io, analog, digital, motor, servo, imu, process, speaker
from . import shutdown_handler
from .async_handlers import EventHandler, HandlerRegistry, process_handler
from .async_handlers import AsyncHandler, HandlerRegistry, ProcessHandler

logger = logging.getLogger(__name__)

Expand All @@ -28,7 +28,6 @@ def __init__(self, ctx: zmq.asyncio.Context, endpoint: str='tcp://127.0.0.1:1078
self.registry = HandlerRegistry()
self.socket = None # type: DealerRouterSocket
self._reply_condition = asyncio.Condition()
self._handlers: Deque[Sequence[EventHandler]] = deque()
self._replies: Deque[Sequence[Message]] = deque()

self._open_count = 0
Expand Down Expand Up @@ -160,21 +159,13 @@ def is_closed(self):

@property
@asynccontextmanager
async def job(self):
async def _job(self):
if self._open_count == 0:
raise RuntimeError("The client is not active, use `async with client:`")

async with self._reply_condition:
yield

def _push_replies(self, msgs: Sequence[Message]) -> None:
self._replies.append(msgs)
self._reply_condition.notify()

async def _pop_replies(self) -> Sequence[Message]:
await self._reply_condition.wait()
return self._replies.popleft()

async def _handle_updates(self):
while True:
_, msgs = await self.socket.recv_msgs()
Expand All @@ -183,14 +174,18 @@ async def _handle_updates(self):
# either, all messages are replies corresponding to the previous requests,
# or all messages are asynchronous updates
if msgs[0].is_async:
assert all(msg.is_async for msg in msgs)

# handle asynchronous messages
logger.debug("Receive updates: %s", msgs)
self.registry.handle_async(msgs)
self.registry.handle_updates(msgs)
else:
assert not any(msg.is_async for msg in msgs)

# handle synchronous messages
handlers = self._handlers.popleft()
self.registry.register(handlers, msgs)
self._push_replies(msgs)
self.registry.complete_register(msgs)
self._replies.append(msgs)
self._reply_condition.notify()

async def _workload(self, *, commands: PipeEnd, events: PipeEnd) -> None:
with DealerRouterSocket(self.ctx, zmq.DEALER, side=ClientSide) as self.socket:
Expand All @@ -215,7 +210,7 @@ async def _workload(self, *, commands: PipeEnd, events: PipeEnd) -> None:
async def workload(self, *, commands: PipeEnd, events: PipeEnd) -> None:
return await component_coro_wrapper(self._workload, commands=commands, events=events)

async def send(self, msg: Message, handler: EventHandler=None) -> Optional[Message]:
async def send(self, msg: Message, handler: AsyncHandler=None) -> Optional[Message]:
reply, = await self.send_multipart((msg, handler))
if isinstance(reply, ack.Acknowledgement):
if reply.code != ack.OK:
Expand All @@ -224,8 +219,8 @@ async def send(self, msg: Message, handler: EventHandler=None) -> Optional[Messa
else:
return reply

async def send_multipart(self, *cmds: Tuple[Message, EventHandler]) -> Any:
async with self.job:
async def send_multipart(self, *cmds: Tuple[Message, AsyncHandler]) -> Any:
async with self._job:
if self._shutdown:
replies = [ack.Acknowledgement(ack.FAILED_COMMAND, "Emergency Shutdown activated") for _ in cmds]
for _, handler in cmds:
Expand All @@ -236,7 +231,7 @@ async def send_multipart(self, *cmds: Tuple[Message, EventHandler]) -> Any:
return await self._send(tuple(request for request, _ in cmds), tuple(handler for _, handler in cmds))

async def shutdown(self) -> None:
async with self.job:
async with self._job:
if not self._shutdown:
self._shutdown = True
self.registry.shutdown()
Expand All @@ -246,11 +241,13 @@ async def shutdown(self) -> None:
msgs.extend(servo.Action(port, False, 0) for port in range(0, 4))
await self._send(msgs, tuple(None for _ in msgs))

async def _send(self, requests: Sequence[Message], handlers: Sequence[EventHandler]) -> Sequence[Message]:
async def _send(self, requests: Sequence[Message], handlers: Sequence[AsyncHandler]) -> Sequence[Message]:
logger.debug("Send commands: %s", requests)
self.registry.prepare_register(handlers)
await self.socket.send_msgs((), requests)
self._handlers.append(handlers)
replies = await self._pop_replies()

await self._reply_condition.wait()
replies = self._replies.popleft()
logger.debug("Receive replies: %s", replies)
return replies

Expand Down Expand Up @@ -289,6 +286,18 @@ async def get_io_config(self, port: int) -> int:
assert response.port == port
return response.flags

async def configure_motor(self, port: int, config: motor.Config) -> int:
await self.send(motor.ConfigAction(port, config))

async def configure_motor_dc(self, port: int) -> int:
await self.configure_motor(port, motor.DcConfig())

async def configure_motor_encoder(self, port: int, encoder_a_port: int, encoder_b_port: int) -> int:
await self.configure_motor(port, motor.EncoderConfig(encoder_a_port, encoder_b_port))

async def configure_motor_stepper(self, port: int) -> int:
await self.configure_motor(port, motor.StepperConfig())

async def set_motor(self, port: int, state: int, amount: int=0,
reached_state: int=motor.POWER, relative: int=None, absolute: int=None,
on_reached: Callable[[int, int], None]=None) -> None:
Expand Down Expand Up @@ -340,9 +349,21 @@ async def get_servo_command(self, port: int) -> Tuple[bool, int]:
assert response.port == port
return response.active, response.position

async def get_imu_rate(self) -> Tuple[int, int, int]:
response = cast(imu.RateReply, await self.send(imu.RateRequest()))
return response.x, response.y, response.z

async def get_imu_acceleration(self) -> Tuple[int, int, int]:
response = cast(imu.AccelerationReply, await self.send(imu.AccelerationRequest()))
return response.x, response.y, response.z

async def get_imu_pose(self) -> Tuple[int, int, int]:
response = cast(imu.PoseReply, await self.send(imu.PoseRequest()))
return response.x, response.y, response.z

async def execute_process(self, *args: str, working_dir: str=None, on_stdout=None, on_stderr=None, on_exit=None) -> int:
if on_stdout is not None or on_stderr is not None or on_exit is not None:
handler = process_handler(on_stdout, on_stderr, on_exit)
handler = ProcessHandler(on_stdout, on_stderr, on_exit)
else:
handler = None
response = cast(process.ExecuteReply, await self.send(process.ExecuteAction(*args, working_dir=working_dir), handler))
Expand All @@ -354,6 +375,9 @@ async def signal_process(self, pid: int, signal: int=2) -> None:
async def send_process_data(self, pid: int, chunk: bytes=b'') -> None:
await self.send(process.StreamAction(pid, process.STDIN, chunk))

async def set_speaker(self, frequency: Optional[int]) -> None:
await self.send(speaker.Action(frequency))


class HedgehogClient(HedgehogClientMixin, AsyncClient):
pass
Expand Down
136 changes: 84 additions & 52 deletions hedgehog/client/async_handlers.py
Original file line number Diff line number Diff line change
@@ -1,104 +1,136 @@
from typing import cast, Any, Callable, Dict, Generator, List, Sequence, Set, Tuple, Type
from typing import cast, Any, Callable, Deque, Dict, Generator, List, Sequence, Set, Tuple, Type, Union

import asyncio
from collections import deque

from hedgehog.protocol.messages import ReplyMsg, Message, ack, motor, process
from hedgehog.protocol.messages import Message, ack, analog, digital, io, motor, process, servo


EventHandler = Generator[Set[Tuple[Type[Message], Any]],
Message,
None]
UpdateHandler = Generator[None, Message, None]
UpdateKey = Tuple[Type[Message], Any]
AsyncHandler = Callable[[Message], Tuple[Set[UpdateKey], UpdateHandler]]


def process_handler(on_stdout, on_stderr, on_exit, sequential=True):
# initialize
reply = yield
class ProcessHandler:
def __init__(self, on_stdout, on_stderr, on_exit, sequential=True):
self._on_stdout = on_stdout
self._on_stderr = on_stderr
self._on_exit = on_exit
self._sequential = sequential
pass

pid = reply.pid
events = {(process.StreamUpdate, pid),
(process.ExitUpdate, pid)}

async def run_update(update):
async def handle_update(self, update: Union[process.StreamUpdate, process.ExitUpdate]) -> None:
if isinstance(update, process.StreamUpdate):
if update.fileno == process.STDOUT:
if on_stdout is not None:
await on_stdout(pid, update.fileno, update.chunk)
if self._on_stdout is not None:
await self._on_stdout(self.pid, update.fileno, update.chunk)
else:
if on_stderr is not None:
await on_stderr(pid, update.fileno, update.chunk)
if self._on_stderr is not None:
await self._on_stderr(self.pid, update.fileno, update.chunk)
elif isinstance(update, process.ExitUpdate):
if on_exit is not None:
await on_exit(pid, update.exit_code)
if self._on_exit is not None:
await self._on_exit(self.pid, update.exit_code)
else: # pragma: nocover
assert False, update

tasks = [] # type: List[asyncio.Task]

if sequential:
def _handle_updates_sequential(self, tasks: List[asyncio.Task]):
queue = asyncio.Queue()

async def run_updates():
async def run_updates() -> None:
while True:
update = await queue.get()
await run_update(update)
await self.handle_update(update)
if isinstance(update, process.ExitUpdate):
break

def on_update(update):
queue.put_nowait(update)
tasks.append(asyncio.create_task(run_updates()))

tasks.append(asyncio.ensure_future(run_updates()))
else:
def on_update(update):
tasks.append(asyncio.ensure_future(run_update(update)))
while True:
update = yield
queue.put_nowait(update)
if isinstance(update, process.ExitUpdate):
break

try:
# update
update = yield events
def _handle_updates_concurrent(self, tasks: List[asyncio.Task]):
while True:
on_update(update)
update = yield
finally:
for task in tasks:
task.cancel()
tasks.append(asyncio.create_task(self.handle_update(update)))
if isinstance(update, process.ExitUpdate):
break

def _handle_updates(self):
tasks: List[asyncio.Task] = []

try:
if self._sequential:
yield from self._handle_updates_sequential(tasks)
else:
yield from self._handle_updates_concurrent(tasks)

# here we expect shutdown
try:
yield
except StopIteration:
raise
else:
raise RuntimeError("generator didn't stop")
finally:
for task in tasks:
task.cancel()

def __call__(self, reply: process.ExecuteReply) -> Tuple[Set[UpdateKey], UpdateHandler]:
self.pid = reply.pid
update_keys = {(process.StreamUpdate, self.pid), (process.ExitUpdate, self.pid)}
update_handler: UpdateHandler = self._handle_updates()
next(update_handler)
return update_keys, update_handler


class HandlerRegistry(object):
_update_keys = {
# motor.StateUpdate: lambda update: cast(motor.StateUpdate, update).port,
io.CommandUpdate: lambda update: cast(io.CommandUpdate, update).port,
analog.Update: lambda update: cast(analog.Update, update).port,
digital.Update: lambda update: cast(digital.Update, update).port,
motor.CommandUpdate: lambda update: cast(motor.CommandUpdate, update).port,
motor.StateUpdate: lambda update: cast(motor.StateUpdate, update).port,
servo.CommandUpdate: lambda update: cast(servo.CommandUpdate, update).port,
process.StreamUpdate: lambda update: cast(process.StreamUpdate, update).pid,
process.ExitUpdate: lambda update: cast(process.ExitUpdate, update).pid,
} # type: Dict[Type[Message], Callable[[Message], Any]]

@staticmethod
def _update_key(update: Message) -> Tuple[Type[Message], Any]:
def _update_key(update: Message) -> UpdateKey:
cls = type(update)
return cls, HandlerRegistry._update_keys[cls](update)

def __init__(self) -> None:
self._handlers = {} # type: Dict[Tuple[Type[Message], Any], EventHandler]
self._queue: Deque[Sequence[AsyncHandler]] = deque()
self._handlers = {} # type: Dict[UpdateKey, UpdateHandler]

def prepare_register(self, handlers: Sequence[AsyncHandler]):
self._queue.append(handlers)

def register(self, handlers: Sequence[EventHandler], replies: Sequence[Message]):
def complete_register(self, replies: Sequence[Message]):
handlers = self._queue.popleft()
assert len(handlers) == len(replies)
for handler, reply in zip(handlers, replies):
if handler is None:
continue
if isinstance(reply, ack.Acknowledgement) and reply.code != ack.OK:
handler.close()
# we simply don't call the EventHandler and everything is fine
continue

next(handler)
events = handler.send(reply)
for event in events:
if event in self._handlers:
self._handlers[event].close()
self._handlers[event] = handler
update_keys, update_handler = handler(reply)
for update_key in update_keys:
if update_key in self._handlers:
self._handlers[update_key].close()
self._handlers[update_key] = update_handler

def handle_async(self, updates: Sequence[Message]) -> None:
def handle_updates(self, updates: Sequence[Message]) -> None:
for update in updates:
event = HandlerRegistry._update_key(update)
if event in self._handlers:
self._handlers[event].send(update)
update_key = HandlerRegistry._update_key(update)
if update_key in self._handlers:
self._handlers[update_key].send(update)

def shutdown(self) -> None:
for handler in self._handlers.values():
Expand Down

0 comments on commit 8bf6dcc

Please sign in to comment.