Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 56 additions & 1 deletion pylabrobot/liquid_handling/backends/opentrons_backend.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import uuid
from typing import Dict, List, Optional, Union, cast
from typing import Dict, List, Optional, Tuple, Union, cast

from pylabrobot import utils
from pylabrobot.liquid_handling.backends.backend import (
Expand Down Expand Up @@ -579,6 +579,61 @@ async def list_connected_modules(self) -> List[dict]:
"""List all connected temperature modules."""
return cast(List[dict], ot_api.modules.list_connected_modules())

def _pipette_id_for_channel(self, channel: int) -> str:
pipettes = []
if self.left_pipette is not None:
pipettes.append(self.left_pipette["pipetteId"])
if self.right_pipette is not None:
pipettes.append(self.right_pipette["pipetteId"])
if channel < 0 or channel >= len(pipettes):
raise NoChannelError(f"Channel {channel} not available on this OT-2 setup.")
return pipettes[channel]

def _current_channel_position(self, channel: int) -> Tuple[str, Coordinate]:
"""Return the pipette id and current coordinate for a given channel."""

pipette_id = self._pipette_id_for_channel(channel)
try:
res = ot_api.lh.save_position(pipette_id=pipette_id)
pos = res["data"]["result"]["position"]
current = Coordinate(pos["x"], pos["y"], pos["z"])
except Exception as exc: # noqa: BLE001
raise RuntimeError("Failed to query current pipette position") from exc

return pipette_id, current

async def prepare_for_manual_channel_operation(self, channel: int):
"""Validate channel exists (no-op otherwise for OT-2)."""

_ = self._pipette_id_for_channel(channel)

async def move_channel_x(self, channel: int, x: float):
"""Move a channel to an absolute x coordinate using savePosition to seed pose."""

pipette_id, current = self._current_channel_position(channel)
target = Coordinate(x=x, y=current.y, z=current.z)
await self.move_pipette_head(
location=target, minimum_z_height=self.traversal_height, pipette_id=pipette_id
)

async def move_channel_y(self, channel: int, y: float):
"""Move a channel to an absolute y coordinate using savePosition to seed pose."""

pipette_id, current = self._current_channel_position(channel)
target = Coordinate(x=current.x, y=y, z=current.z)
await self.move_pipette_head(
location=target, minimum_z_height=self.traversal_height, pipette_id=pipette_id
)

async def move_channel_z(self, channel: int, z: float):
"""Move a channel to an absolute z coordinate using savePosition to seed pose."""

pipette_id, current = self._current_channel_position(channel)
target = Coordinate(x=current.x, y=current.y, z=z)
await self.move_pipette_head(
location=target, minimum_z_height=self.traversal_height, pipette_id=pipette_id
)

async def move_pipette_head(
self,
location: Coordinate,
Expand Down