diff --git a/pylabrobot/liquid_handling/backends/hamilton/base.py b/pylabrobot/liquid_handling/backends/hamilton/base.py index ce93d43b9fd..73ff83be6f7 100644 --- a/pylabrobot/liquid_handling/backends/hamilton/base.py +++ b/pylabrobot/liquid_handling/backends/hamilton/base.py @@ -84,6 +84,7 @@ def __init__( self.id_ = 0 self._reading_thread: Optional[threading.Thread] = None + self._reading_thread_stop = threading.Event() self._waiting_tasks: List[HamiltonTask] = [] self._tth2tti: dict[int, int] = {} # hash to tip type index @@ -101,10 +102,19 @@ def __setattr__(self, name: str, value: Any) -> None: async def setup(self): await super().setup() await self.io.setup() + self._reading_thread_stop.clear() + self._reading_thread = threading.Thread(target=self._reading_thread_main, daemon=True) + self._reading_thread.start() async def stop(self): + self._reading_thread_stop.set() + if self._reading_thread is not None: + self._reading_thread.join(timeout=10) + self._reading_thread = None for task in self._waiting_tasks: - task.fut.set_exception(RuntimeError("Stopping HamiltonLiquidHandler.")) + task.loop.call_soon_threadsafe( + task.fut.set_exception, RuntimeError("Stopping HamiltonLiquidHandler.") + ) self._waiting_tasks.clear() self._tth2tti.clear() await self.io.stop() @@ -296,16 +306,16 @@ def _start_reading( cmd: str, timeout: int, ) -> None: - """Submit a task to the reading thread. Starts reading thread if it is not already running.""" + """Submit a task to the reading thread.""" timeout_time = time.time() + timeout self._waiting_tasks.append( HamiltonTask(id_=id_, loop=loop, fut=fut, cmd=cmd, timeout_time=timeout_time) ) - # Start reading thread if it is not already running. - if len(self._waiting_tasks) == 1: # self._reading_thread is None - self._reading_thread = threading.Thread(target=self._reading_thread_main) + if self._reading_thread is None or not self._reading_thread.is_alive(): + self._reading_thread_stop.clear() + self._reading_thread = threading.Thread(target=self._reading_thread_main, daemon=True) self._reading_thread.start() @abstractmethod @@ -326,7 +336,7 @@ def _reading_thread_main(self) -> None: loop.run_until_complete(self._continuously_read()) async def _continuously_read(self) -> None: - """Continuously read from the USB port until all tasks are completed. + """Continuously read from the USB port until stop is requested. Tasks are stored in the `self._waiting_tasks` list, and contain a future that will be completed when the task is finished. Tasks are submitted to the list using the @@ -337,7 +347,7 @@ async def _continuously_read(self) -> None: list. If a task has timed out, complete the future with a `TimeoutError`. """ - while len(self._waiting_tasks) > 0: + while not self._reading_thread_stop.is_set(): for idx in range(len(self._waiting_tasks) - 1, -1, -1): # reverse order to allow deletion task = self._waiting_tasks[idx] if time.time() > task.timeout_time: @@ -348,6 +358,10 @@ async def _continuously_read(self) -> None: ) del self._waiting_tasks[idx] + if len(self._waiting_tasks) == 0: + await asyncio.sleep(0.01) + continue + try: resp = (await self.io.read()).decode("utf-8") except TimeoutError: @@ -379,8 +393,6 @@ async def _continuously_read(self) -> None: del self._waiting_tasks[idx] break - self._reading_thread = None - def _ops_to_fw_positions( self, ops: Sequence[PipettingOp], use_channels: List[int] ) -> Tuple[List[int], List[int], List[bool]]: