From 63f157a4f0a797b445fef283554dc1a7da518263 Mon Sep 17 00:00:00 2001 From: Rick Wierenga Date: Tue, 14 Oct 2025 17:30:41 -0700 Subject: [PATCH 1/4] automatic_surface_following --- .../backends/hamilton/STAR_backend.py | 91 +++++++++++++++---- pylabrobot/resources/container.py | 3 + 2 files changed, 77 insertions(+), 17 deletions(-) diff --git a/pylabrobot/liquid_handling/backends/hamilton/STAR_backend.py b/pylabrobot/liquid_handling/backends/hamilton/STAR_backend.py index 4a233f07a27..998b0cbbe65 100644 --- a/pylabrobot/liquid_handling/backends/hamilton/STAR_backend.py +++ b/pylabrobot/liquid_handling/backends/hamilton/STAR_backend.py @@ -1643,11 +1643,13 @@ async def aspirate( min_z_endpos: Optional[float] = None, hamilton_liquid_classes: Optional[List[Optional[HamiltonLiquidClass]]] = None, liquid_surfaces_no_lld: Optional[List[float]] = None, - # remove > 2026-01 - immersion_depth_direction: Optional[List[int]] = None, + # PLR: + automatic_surface_following: Optional[List[bool]] = None, + # remove >2026-01 mix_volume: Optional[List[float]] = None, mix_cycles: Optional[List[int]] = None, mix_speed: Optional[List[float]] = None, + immersion_depth_direction: Optional[List[int]] = None, ): """Aspirate liquid from the specified channels. @@ -1667,9 +1669,8 @@ async def aspirate( pull_out_distance_transport_air: The distance to pull out when aspirating air, if LLD is disabled. second_section_height: The height to start the second section of aspiration. - second_section_ratio: The ratio of [the bottom of the container * 10000] / [the height top of the container]. - minimum_height: The minimum height to move to, this is the end of aspiration. The channel - will move linearly from the liquid surface to this height over the course of the aspiration. + second_section_ratio: + minimum_height: The minimum height to move to, this is the end of aspiration. The channel will move linearly from the liquid surface to this height over the course of the aspiration. immersion_depth: The z distance to move after detecting the liquid, can be into or away from the liquid surface. surface_following_distance: The distance to follow the liquid surface. transport_air_volume: The volume of air to aspirate after the liquid. @@ -1677,10 +1678,8 @@ async def aspirate( lld_mode: The liquid level detection mode to use. gamma_lld_sensitivity: The sensitivity of the gamma LLD. dp_lld_sensitivity: The sensitivity of the DP LLD. - aspirate_position_above_z_touch_off: If the LLD mode is Z_TOUCH_OFF, this is the height above - the bottom of the well (presumably) to aspirate from. - detection_height_difference_for_dual_lld: Difference between the gamma and DP LLD heights if - the LLD mode is DUAL. + aspirate_position_above_z_touch_off: If the LLD mode is Z_TOUCH_OFF, this is the height above the bottom of the well (presumably) to aspirate from. + detection_height_difference_for_dual_lld: Difference between the gamma and DP LLD heights if the LLD mode is DUAL. swap_speed: Swap speed (on leaving liquid) [1mm/s]. Must be between 3 and 1600. Default 100. settling_time: The time to wait after mix. mix_position_from_liquid_surface: The height to aspirate from for mix (LLD or absolute terms). @@ -1694,17 +1693,15 @@ async def aspirate( z_drive_speed_during_2nd_section_search: Unknown. cup_upper_edge: Unknown. ratio_liquid_rise_to_tip_deep_in: Unknown. - immersion_depth_2nd_section: The depth to move into the liquid for the second section of - aspiration. + immersion_depth_2nd_section: The depth to move into the liquid for the second section of aspiration. - minimum_traverse_height_at_beginning_of_a_command: The minimum height to move to before - starting an aspiration. + minimum_traverse_height_at_beginning_of_a_command: The minimum height to move to before starting an aspiration. min_z_endpos: The minimum height to move to, this is the end of aspiration. - hamilton_liquid_classes: Override the default liquid classes. See - pylabrobot/liquid_handling/liquid_classes/hamilton/STARBackend.py - liquid_surface_no_lld: Liquid surface at function without LLD [mm]. Must be between 0 - and 360. Defaults to well bottom + liquid height. Should use absolute z. + hamilton_liquid_classes: Override the default liquid classes. See pylabrobot/liquid_handling/liquid_classes/hamilton/STARBackend.py + liquid_surface_no_lld: Liquid surface at function without LLD [mm]. Must be between 0 and 360. Defaults to well bottom + liquid height. Should use absolute z. + + automatic_surface_following: PLR-specific parameter. If True, surface_following_distance will be automatically adjusted based on the detected liquid height and the height->volume function of the Container. `lld_mode` must not be SET for this parameter to be used. """ # # # TODO: delete > 2026-01 # # # @@ -1859,6 +1856,66 @@ async def aspirate( ratio_liquid_rise_to_tip_deep_in = _fill_in_defaults(ratio_liquid_rise_to_tip_deep_in, [0] * n) immersion_depth_2nd_section = _fill_in_defaults(immersion_depth_2nd_section, [0.0] * n) + if automatic_surface_following is None: + automatic_surface_following = [False] * n + + if any(automatic_surface_following): + channels_with_automatic_surface_following = [ + use_channels[i] for i in range(n) if automatic_surface_following[i] + ] + + containers = [op.resource for op in ops] + if any( + not containers[i].supports_compute_volume_from_height() + for i in range(n) + if automatic_surface_following[i] + ): + raise ValueError( + "automatic_surface_following can only be used with containers that support compute_volume_from_height()." + ) + + # move channels to above their positions + await self.position_channels_in_y_direction( + {channel: y for channel, y in zip(use_channels, y_positions)} + ) + + # detect liquid heights + current_liquid_heights = await asyncio.gather( + *[ + self.clld_probe_z_height_using_channel( + channel_idx=channel, move_channels_to_save_pos_after=False + ) + for channel in use_channels + ] + ) + + current_volumes = [ + container.compute_volume_from_height(height=lh) + for container, lh in zip(containers, current_liquid_heights) + ] + + # compute new liquid_height after aspiration + after_aspiration_liquid_heights = [ + op.resource.compute_height_from_volume(volume - op.volume) + for volume, op in zip(current_volumes, ops) + ] + + # compute new surface_following_distance + surface_following_distance = [ + (after_aspiration_liquid_heights[i] - current_liquid_heights[i]) + if automatic_surface_following[i] + else surface_following_distance[i] + for i in range(n) + ] + + # check if the surface_following_distance would fall below the minimum height + for i in range(n): + if (well_bottoms[i] + surface_following_distance[i]) < minimum_height[i]: + raise ValueError( + f"automatic_surface_following would result in a surface_following_distance that goes below the minimum_height. " + f"Well bottom: {well_bottoms[i]}, surface_following_distance: {surface_following_distance[i]}, minimum_height: {minimum_height[i]}" + ) + try: return await self.aspirate_pip( aspiration_type=[0 for _ in range(n)], diff --git a/pylabrobot/resources/container.py b/pylabrobot/resources/container.py index b24f5981717..3a2af5cf079 100644 --- a/pylabrobot/resources/container.py +++ b/pylabrobot/resources/container.py @@ -69,6 +69,9 @@ def serialize_state(self) -> Dict[str, Any]: def load_state(self, state: Dict[str, Any]): self.tracker.load_state(state) + def supports_compute_volume_from_height(self) -> bool: + return self._compute_volume_from_height is not None + def compute_volume_from_height(self, height: float) -> float: """Compute the volume of liquid in a container from the height of the liquid relative to the bottom of the container.""" From 8c78dbe0a317ec921f4c7039215d2ee3e782dab8 Mon Sep 17 00:00:00 2001 From: Rick Wierenga Date: Tue, 14 Oct 2025 19:11:07 -0700 Subject: [PATCH 2/4] STARBackend.{aspirate,disense} probe_liquid_height parameter --- .../backends/hamilton/STAR_backend.py | 219 +++++++++++------- pylabrobot/resources/container.py | 6 +- 2 files changed, 137 insertions(+), 88 deletions(-) diff --git a/pylabrobot/liquid_handling/backends/hamilton/STAR_backend.py b/pylabrobot/liquid_handling/backends/hamilton/STAR_backend.py index 998b0cbbe65..d50c8fc7e92 100644 --- a/pylabrobot/liquid_handling/backends/hamilton/STAR_backend.py +++ b/pylabrobot/liquid_handling/backends/hamilton/STAR_backend.py @@ -1605,6 +1605,87 @@ class LLDMode(enum.Enum): DUAL = 3 Z_TOUCH_OFF = 4 + async def probe_liquid_heights( + self, + resources: List[Resource], + use_channels: List[int], + tips: List[HamiltonTip], + resource_offsets: Optional[List[Coordinate]] = None, + move_to_z_safety_after: bool = True, + ): + """Probe liquid heights for the specified channels. + + Returns the liquid height in each well in mm with respect to the bottom of the container cavity. + """ + + if any(not resource.supports_compute_height_volume_functions() for resource in resources): + raise ValueError( + "automatic_surface_following can only be used with containers that support height<->volume functions." + ) + + resource_offsets = resource_offsets or [Coordinate.zero()] * len(resources) + + assert len(resources) == len(use_channels) == len(resource_offsets) == len(tips) + + # if the liquid height is not specified, we need to detect it using CLLD + async def try_clld(channel, container, tip): + try: + return await self.clld_probe_z_height_using_channel( + channel_idx=channel, + move_channels_to_save_pos_after=False, + lowest_immers_pos=container.get_absolute_location("c", "c", "cavity_bottom").z + + tip.total_tip_length + - tip.fitting_depth, + start_pos_search=container.get_absolute_location("c", "c", "t").z + + tip.total_tip_length + - tip.fitting_depth + + 5, + ) + except STARFirmwareError as e: + print(f"Channel {channel} in well {container.name} failed: {e}") + return None + + await self.move_all_channels_in_z_safety() + + # Check if all channels are on the same x position, then move there + x_pos = [ + resource.get_location_wrt(self.deck, x="c", y="c", z="b").x + offset.x + for resource, offset in zip(resources, resource_offsets) + ] + if len(set(x_pos)) > 1: + raise NotImplementedError( + "automatic_surface_following is not supported for multiple x positions." + ) + await self.move_channel_x(0, x_pos[0]) + + # move channels to above their y positions + y_pos = [ + resource.get_location_wrt(self.deck, x="c", y="c", z="b").y + offset.y + for resource, offset in zip(resources, resource_offsets) + ] + await self.position_channels_in_y_direction( + {channel: y for channel, y in zip(use_channels, y_pos)} + ) + + # detect liquid heights + current_absolute_liquid_heights = await asyncio.gather( + *[ + try_clld(channel, resource, tip) + for channel, resource, tip in zip(use_channels, resources, tips) + ] + ) + + relative_to_well = [ + current_absolute_liquid_heights[i] + - resource.get_absolute_location("c", "c", "cavity_bottom").z + for i, resource in enumerate(resources) + ] + + if move_to_z_safety_after: + await self.move_all_channels_in_z_safety() + + return relative_to_well + async def aspirate( self, ops: List[SingleChannelAspiration], @@ -1644,7 +1725,7 @@ async def aspirate( hamilton_liquid_classes: Optional[List[Optional[HamiltonLiquidClass]]] = None, liquid_surfaces_no_lld: Optional[List[float]] = None, # PLR: - automatic_surface_following: Optional[List[bool]] = None, + probe_liquid_height: bool = False, # remove >2026-01 mix_volume: Optional[List[float]] = None, mix_cycles: Optional[List[int]] = None, @@ -1701,7 +1782,7 @@ async def aspirate( hamilton_liquid_classes: Override the default liquid classes. See pylabrobot/liquid_handling/liquid_classes/hamilton/STARBackend.py liquid_surface_no_lld: Liquid surface at function without LLD [mm]. Must be between 0 and 360. Defaults to well bottom + liquid height. Should use absolute z. - automatic_surface_following: PLR-specific parameter. If True, surface_following_distance will be automatically adjusted based on the detected liquid height and the height->volume function of the Container. `lld_mode` must not be SET for this parameter to be used. + probe_liquid_height: PLR-specific parameter. If True, probe the liquid height using cLLD before aspirating to set the liquid_height of every operation instead of using the default 0. Liquid heights must not be set when using this function. """ # # # TODO: delete > 2026-01 # # # @@ -1761,9 +1842,6 @@ async def aspirate( op.resource.get_location_wrt(self.deck).z + op.offset.z + op.resource.material_z_thickness for op in ops ] - liquid_surfaces_no_lld = liquid_surfaces_no_lld or [ - wb + (op.liquid_height or 0) for wb, op in zip(well_bottoms, ops) - ] if lld_search_height is None: lld_search_height = [ ( @@ -1856,65 +1934,24 @@ async def aspirate( ratio_liquid_rise_to_tip_deep_in = _fill_in_defaults(ratio_liquid_rise_to_tip_deep_in, [0] * n) immersion_depth_2nd_section = _fill_in_defaults(immersion_depth_2nd_section, [0.0] * n) - if automatic_surface_following is None: - automatic_surface_following = [False] * n - - if any(automatic_surface_following): - channels_with_automatic_surface_following = [ - use_channels[i] for i in range(n) if automatic_surface_following[i] - ] - - containers = [op.resource for op in ops] - if any( - not containers[i].supports_compute_volume_from_height() - for i in range(n) - if automatic_surface_following[i] - ): - raise ValueError( - "automatic_surface_following can only be used with containers that support compute_volume_from_height()." - ) - - # move channels to above their positions - await self.position_channels_in_y_direction( - {channel: y for channel, y in zip(use_channels, y_positions)} - ) - - # detect liquid heights - current_liquid_heights = await asyncio.gather( - *[ - self.clld_probe_z_height_using_channel( - channel_idx=channel, move_channels_to_save_pos_after=False - ) - for channel in use_channels - ] + if probe_liquid_height: + liquid_heights = await self.probe_liquid_heights( + resources=[op.resource for op in ops], + use_channels=use_channels, + tips=[op.tip for op in ops], + resource_offsets=[op.offset for op in ops], + move_to_z_safety_after=False, ) - current_volumes = [ - container.compute_volume_from_height(height=lh) - for container, lh in zip(containers, current_liquid_heights) - ] - - # compute new liquid_height after aspiration - after_aspiration_liquid_heights = [ - op.resource.compute_height_from_volume(volume - op.volume) - for volume, op in zip(current_volumes, ops) - ] - - # compute new surface_following_distance - surface_following_distance = [ - (after_aspiration_liquid_heights[i] - current_liquid_heights[i]) - if automatic_surface_following[i] - else surface_following_distance[i] - for i in range(n) - ] + # override minimum traversal height because we don't want to move channels up. we are already above the liquid. + minimum_traverse_height_at_beginning_of_a_command = 100 + logger.info(f"Detected liquid heights: {liquid_heights}") + else: + liquid_heights = [op.liquid_height or 0 for op in ops] - # check if the surface_following_distance would fall below the minimum height - for i in range(n): - if (well_bottoms[i] + surface_following_distance[i]) < minimum_height[i]: - raise ValueError( - f"automatic_surface_following would result in a surface_following_distance that goes below the minimum_height. " - f"Well bottom: {well_bottoms[i]}, surface_following_distance: {surface_following_distance[i]}, minimum_height: {minimum_height[i]}" - ) + liquid_surfaces_no_lld = liquid_surfaces_no_lld or [ + wb + lh for wb, lh in zip(well_bottoms, liquid_heights) + ] try: return await self.aspirate_pip( @@ -2013,6 +2050,8 @@ async def dispense( jet: Optional[List[bool]] = None, blow_out: Optional[List[bool]] = None, # "empty" in the VENUS liquid editor empty: Optional[List[bool]] = None, # truly "empty", does not exist in liquid editor, dm4 + # PLR specific + probe_liquid_height: bool = False, # remove in the future immersion_depth_direction: Optional[List[int]] = None, mix_volume: Optional[List[float]] = None, @@ -2067,6 +2106,8 @@ async def dispense( empty: Whether to use "empty" dispense mode for each dispense. Defaults to `False` for all. Truly empty the tip, not available in the VENUS liquid editor, but is in the firmware documentation. Dispense mode 4. + + probe_liquid_height: PLR-specific parameter. If True, probe the liquid height using cLLD before aspirating to set the liquid_height of every operation instead of using the default 0. Liquid heights must not be set when using this function. """ n = len(ops) @@ -2140,9 +2181,6 @@ async def dispense( op.resource.get_location_wrt(self.deck).z + op.offset.z + op.resource.material_z_thickness for op in ops ] - liquid_surfaces_no_lld = liquid_surface_no_lld or [ - ls + (op.liquid_height or 0) for ls, op in zip(well_bottoms, ops) - ] if lld_search_height is None: lld_search_height = [ ( @@ -2215,6 +2253,25 @@ async def dispense( mix_surface_following_distance = _fill_in_defaults(mix_surface_following_distance, [0.0] * n) limit_curve_index = _fill_in_defaults(limit_curve_index, [0] * n) + if probe_liquid_height: + liquid_heights = await self.probe_liquid_heights( + resources=[op.resource for op in ops], + use_channels=use_channels, + tips=[op.tip for op in ops], + resource_offsets=[op.offset for op in ops], + move_to_z_safety_after=False, + ) + + # override minimum traversal height because we don't want to move channels up. we are already above the liquid. + minimum_traverse_height_at_beginning_of_a_command = 100 + logger.info(f"Detected liquid heights: {liquid_heights}") + else: + liquid_heights = [op.liquid_height or 0 for op in ops] + + liquid_surfaces_no_lld = liquid_surface_no_lld or [ + wb + lh for wb, lh in zip(well_bottoms, liquid_heights) + ] + try: ret = await self.dispense_pip( tip_pattern=channels_involved, @@ -7921,17 +7978,15 @@ async def clld_probe_z_height_using_channel( Args: channel_idx: The index of the channel to use for probing. Backmost channel = 0. - lowest_immers_pos: The lowest immersion position in mm. - start_pos_lld_search: The start position for z-touch search in mm. + lowest_immers_pos: The lowest immersion position in mm. This is the position of the channel, NOT including the tip length (as C0 commands do). So you have to add the total_tip_length - fitting_depth. + start_pos_lld_search: The start position for z-touch search in mm. This is the position of the channel, NOT including the tip length (as C0 commands do). So you have to add the total_tip_length - fitting_depth. channel_speed: The speed of channel movement in mm/sec. channel_acceleration: The acceleration of the channel in mm/sec**2. detection_edge: The edge steepness at capacitive LLD detection. detection_drop: The offset after capacitive LLD edge detection. - post_detection_trajectory (0, 1): Movement of the channel up (1) or down (0) after - contacting the surface. + post_detection_trajectory (0, 1): Movement of the channel up (1) or down (0) after contacting the surface. post_detection_dist: Distance to move into the trajectory after detection in mm. - move_channels_to_save_pos_after: Flag to move channels to a safe position after - operation. + move_channels_to_save_pos_after: Flag to move channels to a safe position after operation. Returns: The detected Z-height in mm. @@ -7972,26 +8027,18 @@ async def clld_probe_z_height_using_channel( + f" and {STARBackend.z_drive_increment_to_mm(9_999)} mm, is {post_detection_dist} mm" ) - lowest_immers_pos_str = f"{lowest_immers_pos_increments:05}" - start_pos_search_str = f"{start_pos_search_increments:05}" - channel_speed_str = f"{channel_speed_increments:05}" - channel_acc_str = f"{channel_acceleration_thousand_increments:03}" - detection_edge_str = f"{detection_edge:04}" - detection_drop_str = f"{detection_drop:04}" - post_detection_dist_str = f"{post_detection_dist_increments:04}" - try: await self.send_command( module=STARBackend.channel_id(channel_idx), command="ZL", - zh=lowest_immers_pos_str, # Lowest immersion position [increment] - zc=start_pos_search_str, # Start position of LLD search [increment] - zl=channel_speed_str, # Speed of channel movement - zr=channel_acc_str, # Acceleration [1000 increment/second^2] - gt=detection_edge_str, # Edge steepness at capacitive LLD detection - gl=detection_drop_str, # Offset after capacitive LLD edge detection + zh=f"{lowest_immers_pos_increments:05}", # Lowest immersion position [increment] + zc=f"{start_pos_search_increments:05}", # Start position of LLD search [increment] + zl=f"{channel_speed_increments:05}", # Speed of channel movement + zr=f"{channel_acceleration_thousand_increments:03}", # Acceleration [1000 increment/second^2] + gt=f"{detection_edge:04}", # Edge steepness at capacitive LLD detection + gl=f"{detection_drop:04}", # Offset after capacitive LLD edge detection zj=post_detection_trajectory, # Movement of the channel after contacting surface - zi=post_detection_dist_str, # Distance to move up after detection [increment] + zi=f"{post_detection_dist_increments:04}", # Distance to move up after detection [increment] ) except STARFirmwareError: await self.move_all_channels_in_z_safety() @@ -8314,7 +8361,7 @@ async def position_channels_in_y_direction(self, ys: Dict[int, float], make_spac raise ValueError("Channel N would hit the front of the robot") if not all( - int((channel_locations[i] - channel_locations[i + 1]) * 1000) >= 8_999 # float fixing + round((channel_locations[i] - channel_locations[i + 1]) * 1000) >= 8_990 # float fixing for i in range(len(channel_locations) - 1) ): raise ValueError("Channels must be at least 9mm apart and in descending order") diff --git a/pylabrobot/resources/container.py b/pylabrobot/resources/container.py index 3a2af5cf079..161e2a7426e 100644 --- a/pylabrobot/resources/container.py +++ b/pylabrobot/resources/container.py @@ -69,8 +69,10 @@ def serialize_state(self) -> Dict[str, Any]: def load_state(self, state: Dict[str, Any]): self.tracker.load_state(state) - def supports_compute_volume_from_height(self) -> bool: - return self._compute_volume_from_height is not None + def supports_compute_height_volume_functions(self) -> bool: + return ( + self._compute_volume_from_height is not None and self._compute_height_from_volume is not None + ) def compute_volume_from_height(self, height: float) -> float: """Compute the volume of liquid in a container from the height of the liquid relative to the From 0032f30ac14c64b973f1ee6175ad6bb402eef437 Mon Sep 17 00:00:00 2001 From: Rick Wierenga Date: Tue, 14 Oct 2025 21:36:29 -0700 Subject: [PATCH 3/4] typecheck --- .../backends/hamilton/STAR_backend.py | 25 ++++++++++--------- 1 file changed, 13 insertions(+), 12 deletions(-) diff --git a/pylabrobot/liquid_handling/backends/hamilton/STAR_backend.py b/pylabrobot/liquid_handling/backends/hamilton/STAR_backend.py index d50c8fc7e92..0a41c98423b 100644 --- a/pylabrobot/liquid_handling/backends/hamilton/STAR_backend.py +++ b/pylabrobot/liquid_handling/backends/hamilton/STAR_backend.py @@ -52,6 +52,7 @@ ) from pylabrobot.resources import ( Carrier, + Container, Coordinate, Plate, Resource, @@ -1607,7 +1608,7 @@ class LLDMode(enum.Enum): async def probe_liquid_heights( self, - resources: List[Resource], + containers: List[Container], use_channels: List[int], tips: List[HamiltonTip], resource_offsets: Optional[List[Coordinate]] = None, @@ -1618,14 +1619,14 @@ async def probe_liquid_heights( Returns the liquid height in each well in mm with respect to the bottom of the container cavity. """ - if any(not resource.supports_compute_height_volume_functions() for resource in resources): + if any(not resource.supports_compute_height_volume_functions() for resource in containers): raise ValueError( "automatic_surface_following can only be used with containers that support height<->volume functions." ) - resource_offsets = resource_offsets or [Coordinate.zero()] * len(resources) + resource_offsets = resource_offsets or [Coordinate.zero()] * len(containers) - assert len(resources) == len(use_channels) == len(resource_offsets) == len(tips) + assert len(containers) == len(use_channels) == len(resource_offsets) == len(tips) # if the liquid height is not specified, we need to detect it using CLLD async def try_clld(channel, container, tip): @@ -1650,7 +1651,7 @@ async def try_clld(channel, container, tip): # Check if all channels are on the same x position, then move there x_pos = [ resource.get_location_wrt(self.deck, x="c", y="c", z="b").x + offset.x - for resource, offset in zip(resources, resource_offsets) + for resource, offset in zip(containers, resource_offsets) ] if len(set(x_pos)) > 1: raise NotImplementedError( @@ -1661,7 +1662,7 @@ async def try_clld(channel, container, tip): # move channels to above their y positions y_pos = [ resource.get_location_wrt(self.deck, x="c", y="c", z="b").y + offset.y - for resource, offset in zip(resources, resource_offsets) + for resource, offset in zip(containers, resource_offsets) ] await self.position_channels_in_y_direction( {channel: y for channel, y in zip(use_channels, y_pos)} @@ -1671,14 +1672,14 @@ async def try_clld(channel, container, tip): current_absolute_liquid_heights = await asyncio.gather( *[ try_clld(channel, resource, tip) - for channel, resource, tip in zip(use_channels, resources, tips) + for channel, resource, tip in zip(use_channels, containers, tips) ] ) relative_to_well = [ current_absolute_liquid_heights[i] - resource.get_absolute_location("c", "c", "cavity_bottom").z - for i, resource in enumerate(resources) + for i, resource in enumerate(containers) ] if move_to_z_safety_after: @@ -1936,9 +1937,9 @@ async def aspirate( if probe_liquid_height: liquid_heights = await self.probe_liquid_heights( - resources=[op.resource for op in ops], + containers=[op.resource for op in ops], use_channels=use_channels, - tips=[op.tip for op in ops], + tips=[cast(HamiltonTip, op.tip) for op in ops], resource_offsets=[op.offset for op in ops], move_to_z_safety_after=False, ) @@ -2255,9 +2256,9 @@ async def dispense( if probe_liquid_height: liquid_heights = await self.probe_liquid_heights( - resources=[op.resource for op in ops], + containers=[op.resource for op in ops], use_channels=use_channels, - tips=[op.tip for op in ops], + tips=[cast(HamiltonTip, op.tip) for op in ops], resource_offsets=[op.offset for op in ops], move_to_z_safety_after=False, ) From 7da071b5670fa1dedd7fd168a2155aeb03a3050b Mon Sep 17 00:00:00 2001 From: Rick Wierenga Date: Tue, 14 Oct 2025 21:44:49 -0700 Subject: [PATCH 4/4] liquid_height can't be set, probe_liquid_heights will raise if no liquid found --- .../backends/hamilton/STAR_backend.py | 46 ++++++++++--------- 1 file changed, 24 insertions(+), 22 deletions(-) diff --git a/pylabrobot/liquid_handling/backends/hamilton/STAR_backend.py b/pylabrobot/liquid_handling/backends/hamilton/STAR_backend.py index 0a41c98423b..504f5f2e95c 100644 --- a/pylabrobot/liquid_handling/backends/hamilton/STAR_backend.py +++ b/pylabrobot/liquid_handling/backends/hamilton/STAR_backend.py @@ -1613,10 +1613,14 @@ async def probe_liquid_heights( tips: List[HamiltonTip], resource_offsets: Optional[List[Coordinate]] = None, move_to_z_safety_after: bool = True, - ): + ) -> List[float]: """Probe liquid heights for the specified channels. + Moves the channels to the x and y positions of the containers, then probes the liquid height + using the CLLD function. + Returns the liquid height in each well in mm with respect to the bottom of the container cavity. + Returns `None` for channels where the liquid height could not be determined. """ if any(not resource.supports_compute_height_volume_functions() for resource in containers): @@ -1628,24 +1632,6 @@ async def probe_liquid_heights( assert len(containers) == len(use_channels) == len(resource_offsets) == len(tips) - # if the liquid height is not specified, we need to detect it using CLLD - async def try_clld(channel, container, tip): - try: - return await self.clld_probe_z_height_using_channel( - channel_idx=channel, - move_channels_to_save_pos_after=False, - lowest_immers_pos=container.get_absolute_location("c", "c", "cavity_bottom").z - + tip.total_tip_length - - tip.fitting_depth, - start_pos_search=container.get_absolute_location("c", "c", "t").z - + tip.total_tip_length - - tip.fitting_depth - + 5, - ) - except STARFirmwareError as e: - print(f"Channel {channel} in well {container.name} failed: {e}") - return None - await self.move_all_channels_in_z_safety() # Check if all channels are on the same x position, then move there @@ -1671,8 +1657,18 @@ async def try_clld(channel, container, tip): # detect liquid heights current_absolute_liquid_heights = await asyncio.gather( *[ - try_clld(channel, resource, tip) - for channel, resource, tip in zip(use_channels, containers, tips) + self.clld_probe_z_height_using_channel( + channel_idx=channel, + move_channels_to_save_pos_after=False, + lowest_immers_pos=container.get_absolute_location("c", "c", "cavity_bottom").z + + tip.total_tip_length + - tip.fitting_depth, + start_pos_search=container.get_absolute_location("c", "c", "t").z + + tip.total_tip_length + - tip.fitting_depth + + 5, + ) + for channel, container, tip in zip(use_channels, containers, tips) ] ) @@ -1936,6 +1932,9 @@ async def aspirate( immersion_depth_2nd_section = _fill_in_defaults(immersion_depth_2nd_section, [0.0] * n) if probe_liquid_height: + if any(op.liquid_height is not None for op in ops): + raise ValueError("Cannot use probe_liquid_height when liquid heights are set.") + liquid_heights = await self.probe_liquid_heights( containers=[op.resource for op in ops], use_channels=use_channels, @@ -2255,6 +2254,9 @@ async def dispense( limit_curve_index = _fill_in_defaults(limit_curve_index, [0] * n) if probe_liquid_height: + if any(op.liquid_height is not None for op in ops): + raise ValueError("Cannot use probe_liquid_height when liquid heights are set.") + liquid_heights = await self.probe_liquid_heights( containers=[op.resource for op in ops], use_channels=use_channels, @@ -2286,7 +2288,7 @@ async def dispense( second_section_height=[round(sh * 10) for sh in second_section_height], second_section_ratio=[round(sr * 10) for sr in second_section_ratio], minimum_height=[round(mh * 10) for mh in minimum_height], - immersion_depth=[round(id_ * 10) for id_ in immersion_depth], # [0, 0] + immersion_depth=[round(id_ * 10) for id_ in immersion_depth], immersion_depth_direction=immersion_depth_direction, surface_following_distance=[round(sfd * 10) for sfd in surface_following_distance], dispense_speed=[round(fr * 10) for fr in flow_rates],