From 67bea5dca130677d38e2be6354a695e1a9a218a0 Mon Sep 17 00:00:00 2001 From: Phil Underwood Date: Thu, 1 Jun 2023 23:12:43 +0100 Subject: [PATCH] Add timeouts --- laser_egismos.py | 37 ++++++++++++++++++++++++++++++------- 1 file changed, 30 insertions(+), 7 deletions(-) diff --git a/laser_egismos.py b/laser_egismos.py index b1130f6..ed725f2 100644 --- a/laser_egismos.py +++ b/laser_egismos.py @@ -27,6 +27,8 @@ __version__ = "0.0.0+auto.0" __repo__ = "https://github.com/furbrain/CircuitPython_laser_egismos.git" +import time + try: from typing import Sequence, Tuple except ImportError: @@ -34,6 +36,8 @@ import busio +DEFAULT_TIMEOUT = 5.0 + class LaserError(RuntimeError): """ @@ -65,6 +69,12 @@ class TooBrightError(LaserError): """ +class LaserTimeOutError(LaserError): + """ + Laser took too long to respond + """ + + class _LaserBase: # pylint: disable=too-few-public-methods FRAME_END = 0xA8 @@ -81,16 +91,18 @@ class _LaserBase: READ_DEV_TYPE = 0x02 READ_SW_VERSION = 0x01 - def __init__(self, uart: busio.UART, address=0x01): + def __init__(self, uart: busio.UART, address=0x01, timeout=DEFAULT_TIMEOUT): """ Access an Egismos Laser distance module v2 :param ~busio.UART uart: uart to use to connect. Should have baud rate set to 9600 :param address: address to use, default is 0x01; you should only change this if using multiple devices + :param timeout: timeout to wait for a response from the device """ - self.uart = uart - self.address = address + self.uart: busio.UART = uart + self.address: int = address + self.timeout: float = timeout def _build_frame( self, command: int, address=None, data: Sequence[int] = None @@ -177,18 +189,23 @@ class Laser(_LaserBase): def _read_frame(self): # wait for an AA to start - # F.I.X.M.E. this can hang - needs timeout option + timeout_due = time.monotonic() + self.timeout buffer = b"\x00" while buffer[0] != self.FRAME_START: buffer = self.uart.read(1) or b"\x00" + if time.monotonic() > timeout_due: + raise LaserTimeOutError("Timed Out waiting for FRAME_START") while buffer[-1] != self.FRAME_END: buffer += self.uart.read(1) or b"" + if time.monotonic() > timeout_due: + raise LaserTimeOutError("Timed Out waiting for FRAME_END") return buffer def _send_and_receive( self, command: int, data: int = None, address: int = None ) -> bytes: frame = self._build_frame(command, address, data) + self.uart.reset_input_buffer() # clear input before writing self.uart.write(frame) frame = self._read_frame() read_data = self._process_frame(address, command, frame) @@ -278,12 +295,12 @@ class AsyncLaser(_LaserBase): Same as `Laser`, but with async methods, requires the `asyncio` module """ - def __init__(self, uart: busio.UART, address=0x01): + def __init__(self, uart: busio.UART, address=0x01, timeout=DEFAULT_TIMEOUT): # pylint: disable=import-outside-toplevel import asyncio uart.timeout = 0 - super().__init__(uart, address) + super().__init__(uart, address, timeout) self.async_reader = asyncio.StreamReader(uart) async def _read_frame(self): @@ -297,9 +314,15 @@ async def _read_frame(self): async def _send_and_receive( self, command: int, data: int = None, address: int = None ) -> bytes: + # pylint: disable=import-outside-toplevel + import asyncio + frame = self._build_frame(command, address, data) self.uart.write(frame) - frame = await self._read_frame() + try: + frame = await asyncio.wait_for(self._read_frame(), self.timeout) + except asyncio.TimeoutError as exc: + raise LaserTimeOutError("Did not receive response within timeout") from exc read_data = self._process_frame(address, command, frame) return read_data