Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
199 changes: 153 additions & 46 deletions pylabrobot/liquid_handling/backends/hamilton/STAR_backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
)
from pylabrobot.resources import (
Carrier,
Container,
Coordinate,
Plate,
Resource,
Expand Down Expand Up @@ -1605,6 +1606,83 @@ class LLDMode(enum.Enum):
DUAL = 3
Z_TOUCH_OFF = 4

async def probe_liquid_heights(
self,
containers: List[Container],
use_channels: List[int],
tips: List[HamiltonTip],
resource_offsets: Optional[List[Coordinate]] = None,
move_to_z_safety_after: bool = True,
) -> List[float]:
"""Probe liquid heights for the specified channels.

Moves the channels to the x and y positions of the containers, then probes the liquid height
using the CLLD function.

Returns the liquid height in each well in mm with respect to the bottom of the container cavity.
Returns `None` for channels where the liquid height could not be determined.
"""

if any(not resource.supports_compute_height_volume_functions() for resource in containers):
raise ValueError(
"automatic_surface_following can only be used with containers that support height<->volume functions."
)

resource_offsets = resource_offsets or [Coordinate.zero()] * len(containers)

assert len(containers) == len(use_channels) == len(resource_offsets) == len(tips)

await self.move_all_channels_in_z_safety()

# Check if all channels are on the same x position, then move there
x_pos = [
resource.get_location_wrt(self.deck, x="c", y="c", z="b").x + offset.x
for resource, offset in zip(containers, resource_offsets)
]
if len(set(x_pos)) > 1:
raise NotImplementedError(
"automatic_surface_following is not supported for multiple x positions."
)
await self.move_channel_x(0, x_pos[0])

# move channels to above their y positions
y_pos = [
resource.get_location_wrt(self.deck, x="c", y="c", z="b").y + offset.y
for resource, offset in zip(containers, resource_offsets)
]
await self.position_channels_in_y_direction(
{channel: y for channel, y in zip(use_channels, y_pos)}
)

# detect liquid heights
current_absolute_liquid_heights = await asyncio.gather(
*[
self.clld_probe_z_height_using_channel(
channel_idx=channel,
move_channels_to_save_pos_after=False,
lowest_immers_pos=container.get_absolute_location("c", "c", "cavity_bottom").z
+ tip.total_tip_length
- tip.fitting_depth,
start_pos_search=container.get_absolute_location("c", "c", "t").z
+ tip.total_tip_length
- tip.fitting_depth
+ 5,
)
for channel, container, tip in zip(use_channels, containers, tips)
]
)

relative_to_well = [
current_absolute_liquid_heights[i]
- resource.get_absolute_location("c", "c", "cavity_bottom").z
for i, resource in enumerate(containers)
]

if move_to_z_safety_after:
await self.move_all_channels_in_z_safety()

return relative_to_well

async def aspirate(
self,
ops: List[SingleChannelAspiration],
Expand Down Expand Up @@ -1643,11 +1721,13 @@ async def aspirate(
min_z_endpos: Optional[float] = None,
hamilton_liquid_classes: Optional[List[Optional[HamiltonLiquidClass]]] = None,
liquid_surfaces_no_lld: Optional[List[float]] = None,
# remove > 2026-01
immersion_depth_direction: Optional[List[int]] = None,
# PLR:
probe_liquid_height: bool = False,
# remove >2026-01
mix_volume: Optional[List[float]] = None,
mix_cycles: Optional[List[int]] = None,
mix_speed: Optional[List[float]] = None,
immersion_depth_direction: Optional[List[int]] = None,
):
"""Aspirate liquid from the specified channels.

Expand All @@ -1667,20 +1747,17 @@ async def aspirate(
pull_out_distance_transport_air: The distance to pull out when aspirating air, if LLD is
disabled.
second_section_height: The height to start the second section of aspiration.
second_section_ratio: The ratio of [the bottom of the container * 10000] / [the height top of the container].
minimum_height: The minimum height to move to, this is the end of aspiration. The channel
will move linearly from the liquid surface to this height over the course of the aspiration.
second_section_ratio:
minimum_height: The minimum height to move to, this is the end of aspiration. The channel will move linearly from the liquid surface to this height over the course of the aspiration.
immersion_depth: The z distance to move after detecting the liquid, can be into or away from the liquid surface.
surface_following_distance: The distance to follow the liquid surface.
transport_air_volume: The volume of air to aspirate after the liquid.
pre_wetting_volume: The volume of liquid to use for pre-wetting.
lld_mode: The liquid level detection mode to use.
gamma_lld_sensitivity: The sensitivity of the gamma LLD.
dp_lld_sensitivity: The sensitivity of the DP LLD.
aspirate_position_above_z_touch_off: If the LLD mode is Z_TOUCH_OFF, this is the height above
the bottom of the well (presumably) to aspirate from.
detection_height_difference_for_dual_lld: Difference between the gamma and DP LLD heights if
the LLD mode is DUAL.
aspirate_position_above_z_touch_off: If the LLD mode is Z_TOUCH_OFF, this is the height above the bottom of the well (presumably) to aspirate from.
detection_height_difference_for_dual_lld: Difference between the gamma and DP LLD heights if the LLD mode is DUAL.
swap_speed: Swap speed (on leaving liquid) [1mm/s]. Must be between 3 and 1600. Default 100.
settling_time: The time to wait after mix.
mix_position_from_liquid_surface: The height to aspirate from for mix (LLD or absolute terms).
Expand All @@ -1694,17 +1771,15 @@ async def aspirate(
z_drive_speed_during_2nd_section_search: Unknown.
cup_upper_edge: Unknown.
ratio_liquid_rise_to_tip_deep_in: Unknown.
immersion_depth_2nd_section: The depth to move into the liquid for the second section of
aspiration.
immersion_depth_2nd_section: The depth to move into the liquid for the second section of aspiration.

minimum_traverse_height_at_beginning_of_a_command: The minimum height to move to before
starting an aspiration.
minimum_traverse_height_at_beginning_of_a_command: The minimum height to move to before starting an aspiration.
min_z_endpos: The minimum height to move to, this is the end of aspiration.

hamilton_liquid_classes: Override the default liquid classes. See
pylabrobot/liquid_handling/liquid_classes/hamilton/STARBackend.py
liquid_surface_no_lld: Liquid surface at function without LLD [mm]. Must be between 0
and 360. Defaults to well bottom + liquid height. Should use absolute z.
hamilton_liquid_classes: Override the default liquid classes. See pylabrobot/liquid_handling/liquid_classes/hamilton/STARBackend.py
liquid_surface_no_lld: Liquid surface at function without LLD [mm]. Must be between 0 and 360. Defaults to well bottom + liquid height. Should use absolute z.

probe_liquid_height: PLR-specific parameter. If True, probe the liquid height using cLLD before aspirating to set the liquid_height of every operation instead of using the default 0. Liquid heights must not be set when using this function.
"""

# # # TODO: delete > 2026-01 # # #
Expand Down Expand Up @@ -1764,9 +1839,6 @@ async def aspirate(
op.resource.get_location_wrt(self.deck).z + op.offset.z + op.resource.material_z_thickness
for op in ops
]
liquid_surfaces_no_lld = liquid_surfaces_no_lld or [
wb + (op.liquid_height or 0) for wb, op in zip(well_bottoms, ops)
]
if lld_search_height is None:
lld_search_height = [
(
Expand Down Expand Up @@ -1859,6 +1931,28 @@ async def aspirate(
ratio_liquid_rise_to_tip_deep_in = _fill_in_defaults(ratio_liquid_rise_to_tip_deep_in, [0] * n)
immersion_depth_2nd_section = _fill_in_defaults(immersion_depth_2nd_section, [0.0] * n)

if probe_liquid_height:
if any(op.liquid_height is not None for op in ops):
raise ValueError("Cannot use probe_liquid_height when liquid heights are set.")

liquid_heights = await self.probe_liquid_heights(
containers=[op.resource for op in ops],
use_channels=use_channels,
tips=[cast(HamiltonTip, op.tip) for op in ops],
resource_offsets=[op.offset for op in ops],
move_to_z_safety_after=False,
)

# override minimum traversal height because we don't want to move channels up. we are already above the liquid.
minimum_traverse_height_at_beginning_of_a_command = 100
logger.info(f"Detected liquid heights: {liquid_heights}")
else:
liquid_heights = [op.liquid_height or 0 for op in ops]

liquid_surfaces_no_lld = liquid_surfaces_no_lld or [
wb + lh for wb, lh in zip(well_bottoms, liquid_heights)
]

try:
return await self.aspirate_pip(
aspiration_type=[0 for _ in range(n)],
Expand Down Expand Up @@ -1956,6 +2050,8 @@ async def dispense(
jet: Optional[List[bool]] = None,
blow_out: Optional[List[bool]] = None, # "empty" in the VENUS liquid editor
empty: Optional[List[bool]] = None, # truly "empty", does not exist in liquid editor, dm4
# PLR specific
probe_liquid_height: bool = False,
# remove in the future
immersion_depth_direction: Optional[List[int]] = None,
mix_volume: Optional[List[float]] = None,
Expand Down Expand Up @@ -2010,6 +2106,8 @@ async def dispense(
empty: Whether to use "empty" dispense mode for each dispense. Defaults to `False` for all.
Truly empty the tip, not available in the VENUS liquid editor, but is in the firmware
documentation. Dispense mode 4.

probe_liquid_height: PLR-specific parameter. If True, probe the liquid height using cLLD before aspirating to set the liquid_height of every operation instead of using the default 0. Liquid heights must not be set when using this function.
"""

n = len(ops)
Expand Down Expand Up @@ -2083,9 +2181,6 @@ async def dispense(
op.resource.get_location_wrt(self.deck).z + op.offset.z + op.resource.material_z_thickness
for op in ops
]
liquid_surfaces_no_lld = liquid_surface_no_lld or [
ls + (op.liquid_height or 0) for ls, op in zip(well_bottoms, ops)
]
if lld_search_height is None:
lld_search_height = [
(
Expand Down Expand Up @@ -2158,6 +2253,28 @@ async def dispense(
mix_surface_following_distance = _fill_in_defaults(mix_surface_following_distance, [0.0] * n)
limit_curve_index = _fill_in_defaults(limit_curve_index, [0] * n)

if probe_liquid_height:
if any(op.liquid_height is not None for op in ops):
raise ValueError("Cannot use probe_liquid_height when liquid heights are set.")

liquid_heights = await self.probe_liquid_heights(
containers=[op.resource for op in ops],
use_channels=use_channels,
tips=[cast(HamiltonTip, op.tip) for op in ops],
resource_offsets=[op.offset for op in ops],
move_to_z_safety_after=False,
)

# override minimum traversal height because we don't want to move channels up. we are already above the liquid.
minimum_traverse_height_at_beginning_of_a_command = 100
logger.info(f"Detected liquid heights: {liquid_heights}")
else:
liquid_heights = [op.liquid_height or 0 for op in ops]

liquid_surfaces_no_lld = liquid_surface_no_lld or [
wb + lh for wb, lh in zip(well_bottoms, liquid_heights)
]

try:
ret = await self.dispense_pip(
tip_pattern=channels_involved,
Expand All @@ -2171,7 +2288,7 @@ async def dispense(
second_section_height=[round(sh * 10) for sh in second_section_height],
second_section_ratio=[round(sr * 10) for sr in second_section_ratio],
minimum_height=[round(mh * 10) for mh in minimum_height],
immersion_depth=[round(id_ * 10) for id_ in immersion_depth], # [0, 0]
immersion_depth=[round(id_ * 10) for id_ in immersion_depth],
immersion_depth_direction=immersion_depth_direction,
surface_following_distance=[round(sfd * 10) for sfd in surface_following_distance],
dispense_speed=[round(fr * 10) for fr in flow_rates],
Expand Down Expand Up @@ -7864,17 +7981,15 @@ async def clld_probe_z_height_using_channel(

Args:
channel_idx: The index of the channel to use for probing. Backmost channel = 0.
lowest_immers_pos: The lowest immersion position in mm.
start_pos_lld_search: The start position for z-touch search in mm.
lowest_immers_pos: The lowest immersion position in mm. This is the position of the channel, NOT including the tip length (as C0 commands do). So you have to add the total_tip_length - fitting_depth.
start_pos_lld_search: The start position for z-touch search in mm. This is the position of the channel, NOT including the tip length (as C0 commands do). So you have to add the total_tip_length - fitting_depth.
channel_speed: The speed of channel movement in mm/sec.
channel_acceleration: The acceleration of the channel in mm/sec**2.
detection_edge: The edge steepness at capacitive LLD detection.
detection_drop: The offset after capacitive LLD edge detection.
post_detection_trajectory (0, 1): Movement of the channel up (1) or down (0) after
contacting the surface.
post_detection_trajectory (0, 1): Movement of the channel up (1) or down (0) after contacting the surface.
post_detection_dist: Distance to move into the trajectory after detection in mm.
move_channels_to_save_pos_after: Flag to move channels to a safe position after
operation.
move_channels_to_save_pos_after: Flag to move channels to a safe position after operation.

Returns:
The detected Z-height in mm.
Expand Down Expand Up @@ -7915,26 +8030,18 @@ async def clld_probe_z_height_using_channel(
+ f" and {STARBackend.z_drive_increment_to_mm(9_999)} mm, is {post_detection_dist} mm"
)

lowest_immers_pos_str = f"{lowest_immers_pos_increments:05}"
start_pos_search_str = f"{start_pos_search_increments:05}"
channel_speed_str = f"{channel_speed_increments:05}"
channel_acc_str = f"{channel_acceleration_thousand_increments:03}"
detection_edge_str = f"{detection_edge:04}"
detection_drop_str = f"{detection_drop:04}"
post_detection_dist_str = f"{post_detection_dist_increments:04}"

try:
await self.send_command(
module=STARBackend.channel_id(channel_idx),
command="ZL",
zh=lowest_immers_pos_str, # Lowest immersion position [increment]
zc=start_pos_search_str, # Start position of LLD search [increment]
zl=channel_speed_str, # Speed of channel movement
zr=channel_acc_str, # Acceleration [1000 increment/second^2]
gt=detection_edge_str, # Edge steepness at capacitive LLD detection
gl=detection_drop_str, # Offset after capacitive LLD edge detection
zh=f"{lowest_immers_pos_increments:05}", # Lowest immersion position [increment]
zc=f"{start_pos_search_increments:05}", # Start position of LLD search [increment]
zl=f"{channel_speed_increments:05}", # Speed of channel movement
zr=f"{channel_acceleration_thousand_increments:03}", # Acceleration [1000 increment/second^2]
gt=f"{detection_edge:04}", # Edge steepness at capacitive LLD detection
gl=f"{detection_drop:04}", # Offset after capacitive LLD edge detection
zj=post_detection_trajectory, # Movement of the channel after contacting surface
zi=post_detection_dist_str, # Distance to move up after detection [increment]
zi=f"{post_detection_dist_increments:04}", # Distance to move up after detection [increment]
)
except STARFirmwareError:
await self.move_all_channels_in_z_safety()
Expand Down Expand Up @@ -8257,7 +8364,7 @@ async def position_channels_in_y_direction(self, ys: Dict[int, float], make_spac
raise ValueError("Channel N would hit the front of the robot")

if not all(
int((channel_locations[i] - channel_locations[i + 1]) * 1000) >= 8_999 # float fixing
round((channel_locations[i] - channel_locations[i + 1]) * 1000) >= 8_990 # float fixing
for i in range(len(channel_locations) - 1)
):
raise ValueError("Channels must be at least 9mm apart and in descending order")
Expand Down
5 changes: 5 additions & 0 deletions pylabrobot/resources/container.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,11 @@ def serialize_state(self) -> Dict[str, Any]:
def load_state(self, state: Dict[str, Any]):
self.tracker.load_state(state)

def supports_compute_height_volume_functions(self) -> bool:
return (
self._compute_volume_from_height is not None and self._compute_height_from_volume is not None
)

def compute_volume_from_height(self, height: float) -> float:
"""Compute the volume of liquid in a container from the height of the liquid relative to the
bottom of the container."""
Expand Down