diff --git a/pylabrobot/liquid_handling/backends/opentrons_backend.py b/pylabrobot/liquid_handling/backends/opentrons_backend.py index 9c4c0ca6f58..21d56643d75 100644 --- a/pylabrobot/liquid_handling/backends/opentrons_backend.py +++ b/pylabrobot/liquid_handling/backends/opentrons_backend.py @@ -1,5 +1,5 @@ import uuid -from typing import Dict, List, Optional, Union, cast +from typing import Dict, List, Optional, Tuple, Union, cast from pylabrobot import utils from pylabrobot.liquid_handling.backends.backend import ( @@ -579,6 +579,61 @@ async def list_connected_modules(self) -> List[dict]: """List all connected temperature modules.""" return cast(List[dict], ot_api.modules.list_connected_modules()) + def _pipette_id_for_channel(self, channel: int) -> str: + pipettes = [] + if self.left_pipette is not None: + pipettes.append(self.left_pipette["pipetteId"]) + if self.right_pipette is not None: + pipettes.append(self.right_pipette["pipetteId"]) + if channel < 0 or channel >= len(pipettes): + raise NoChannelError(f"Channel {channel} not available on this OT-2 setup.") + return pipettes[channel] + + def _current_channel_position(self, channel: int) -> Tuple[str, Coordinate]: + """Return the pipette id and current coordinate for a given channel.""" + + pipette_id = self._pipette_id_for_channel(channel) + try: + res = ot_api.lh.save_position(pipette_id=pipette_id) + pos = res["data"]["result"]["position"] + current = Coordinate(pos["x"], pos["y"], pos["z"]) + except Exception as exc: # noqa: BLE001 + raise RuntimeError("Failed to query current pipette position") from exc + + return pipette_id, current + + async def prepare_for_manual_channel_operation(self, channel: int): + """Validate channel exists (no-op otherwise for OT-2).""" + + _ = self._pipette_id_for_channel(channel) + + async def move_channel_x(self, channel: int, x: float): + """Move a channel to an absolute x coordinate using savePosition to seed pose.""" + + pipette_id, current = self._current_channel_position(channel) + target = Coordinate(x=x, y=current.y, z=current.z) + await self.move_pipette_head( + location=target, minimum_z_height=self.traversal_height, pipette_id=pipette_id + ) + + async def move_channel_y(self, channel: int, y: float): + """Move a channel to an absolute y coordinate using savePosition to seed pose.""" + + pipette_id, current = self._current_channel_position(channel) + target = Coordinate(x=current.x, y=y, z=current.z) + await self.move_pipette_head( + location=target, minimum_z_height=self.traversal_height, pipette_id=pipette_id + ) + + async def move_channel_z(self, channel: int, z: float): + """Move a channel to an absolute z coordinate using savePosition to seed pose.""" + + pipette_id, current = self._current_channel_position(channel) + target = Coordinate(x=current.x, y=current.y, z=z) + await self.move_pipette_head( + location=target, minimum_z_height=self.traversal_height, pipette_id=pipette_id + ) + async def move_pipette_head( self, location: Coordinate,