Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 21 additions & 9 deletions pylabrobot/liquid_handling/backends/hamilton/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ def __init__(
self.id_ = 0

self._reading_thread: Optional[threading.Thread] = None
self._reading_thread_stop = threading.Event()
self._waiting_tasks: List[HamiltonTask] = []
self._tth2tti: dict[int, int] = {} # hash to tip type index

Expand All @@ -101,10 +102,19 @@ def __setattr__(self, name: str, value: Any) -> None:
async def setup(self):
await super().setup()
await self.io.setup()
self._reading_thread_stop.clear()
self._reading_thread = threading.Thread(target=self._reading_thread_main, daemon=True)
self._reading_thread.start()

async def stop(self):
self._reading_thread_stop.set()
if self._reading_thread is not None:
self._reading_thread.join(timeout=10)
self._reading_thread = None
for task in self._waiting_tasks:
task.fut.set_exception(RuntimeError("Stopping HamiltonLiquidHandler."))
task.loop.call_soon_threadsafe(
task.fut.set_exception, RuntimeError("Stopping HamiltonLiquidHandler.")
)
self._waiting_tasks.clear()
self._tth2tti.clear()
await self.io.stop()
Expand Down Expand Up @@ -296,16 +306,16 @@ def _start_reading(
cmd: str,
timeout: int,
) -> None:
"""Submit a task to the reading thread. Starts reading thread if it is not already running."""
"""Submit a task to the reading thread."""

timeout_time = time.time() + timeout
self._waiting_tasks.append(
HamiltonTask(id_=id_, loop=loop, fut=fut, cmd=cmd, timeout_time=timeout_time)
)

# Start reading thread if it is not already running.
if len(self._waiting_tasks) == 1: # self._reading_thread is None
self._reading_thread = threading.Thread(target=self._reading_thread_main)
if self._reading_thread is None or not self._reading_thread.is_alive():
self._reading_thread_stop.clear()
self._reading_thread = threading.Thread(target=self._reading_thread_main, daemon=True)
self._reading_thread.start()

@abstractmethod
Expand All @@ -326,7 +336,7 @@ def _reading_thread_main(self) -> None:
loop.run_until_complete(self._continuously_read())

async def _continuously_read(self) -> None:
"""Continuously read from the USB port until all tasks are completed.
"""Continuously read from the USB port until stop is requested.

Tasks are stored in the `self._waiting_tasks` list, and contain a future that will be
completed when the task is finished. Tasks are submitted to the list using the
Expand All @@ -337,7 +347,7 @@ async def _continuously_read(self) -> None:
list. If a task has timed out, complete the future with a `TimeoutError`.
"""

while len(self._waiting_tasks) > 0:
while not self._reading_thread_stop.is_set():
for idx in range(len(self._waiting_tasks) - 1, -1, -1): # reverse order to allow deletion
task = self._waiting_tasks[idx]
if time.time() > task.timeout_time:
Expand All @@ -348,6 +358,10 @@ async def _continuously_read(self) -> None:
)
del self._waiting_tasks[idx]

if len(self._waiting_tasks) == 0:
await asyncio.sleep(0.01)
continue

try:
resp = (await self.io.read()).decode("utf-8")
except TimeoutError:
Expand Down Expand Up @@ -379,8 +393,6 @@ async def _continuously_read(self) -> None:
del self._waiting_tasks[idx]
break

self._reading_thread = None

def _ops_to_fw_positions(
self, ops: Sequence[PipettingOp], use_channels: List[int]
) -> Tuple[List[int], List[int], List[bool]]:
Expand Down
Loading