diff --git a/pylabrobot/liquid_handling/backends/hamilton/STAR_backend.py b/pylabrobot/liquid_handling/backends/hamilton/STAR_backend.py index 4a233f07a27..504f5f2e95c 100644 --- a/pylabrobot/liquid_handling/backends/hamilton/STAR_backend.py +++ b/pylabrobot/liquid_handling/backends/hamilton/STAR_backend.py @@ -52,6 +52,7 @@ ) from pylabrobot.resources import ( Carrier, + Container, Coordinate, Plate, Resource, @@ -1605,6 +1606,83 @@ class LLDMode(enum.Enum): DUAL = 3 Z_TOUCH_OFF = 4 + async def probe_liquid_heights( + self, + containers: List[Container], + use_channels: List[int], + tips: List[HamiltonTip], + resource_offsets: Optional[List[Coordinate]] = None, + move_to_z_safety_after: bool = True, + ) -> List[float]: + """Probe liquid heights for the specified channels. + + Moves the channels to the x and y positions of the containers, then probes the liquid height + using the CLLD function. + + Returns the liquid height in each well in mm with respect to the bottom of the container cavity. + Returns `None` for channels where the liquid height could not be determined. + """ + + if any(not resource.supports_compute_height_volume_functions() for resource in containers): + raise ValueError( + "automatic_surface_following can only be used with containers that support height<->volume functions." + ) + + resource_offsets = resource_offsets or [Coordinate.zero()] * len(containers) + + assert len(containers) == len(use_channels) == len(resource_offsets) == len(tips) + + await self.move_all_channels_in_z_safety() + + # Check if all channels are on the same x position, then move there + x_pos = [ + resource.get_location_wrt(self.deck, x="c", y="c", z="b").x + offset.x + for resource, offset in zip(containers, resource_offsets) + ] + if len(set(x_pos)) > 1: + raise NotImplementedError( + "automatic_surface_following is not supported for multiple x positions." + ) + await self.move_channel_x(0, x_pos[0]) + + # move channels to above their y positions + y_pos = [ + resource.get_location_wrt(self.deck, x="c", y="c", z="b").y + offset.y + for resource, offset in zip(containers, resource_offsets) + ] + await self.position_channels_in_y_direction( + {channel: y for channel, y in zip(use_channels, y_pos)} + ) + + # detect liquid heights + current_absolute_liquid_heights = await asyncio.gather( + *[ + self.clld_probe_z_height_using_channel( + channel_idx=channel, + move_channels_to_save_pos_after=False, + lowest_immers_pos=container.get_absolute_location("c", "c", "cavity_bottom").z + + tip.total_tip_length + - tip.fitting_depth, + start_pos_search=container.get_absolute_location("c", "c", "t").z + + tip.total_tip_length + - tip.fitting_depth + + 5, + ) + for channel, container, tip in zip(use_channels, containers, tips) + ] + ) + + relative_to_well = [ + current_absolute_liquid_heights[i] + - resource.get_absolute_location("c", "c", "cavity_bottom").z + for i, resource in enumerate(containers) + ] + + if move_to_z_safety_after: + await self.move_all_channels_in_z_safety() + + return relative_to_well + async def aspirate( self, ops: List[SingleChannelAspiration], @@ -1643,11 +1721,13 @@ async def aspirate( min_z_endpos: Optional[float] = None, hamilton_liquid_classes: Optional[List[Optional[HamiltonLiquidClass]]] = None, liquid_surfaces_no_lld: Optional[List[float]] = None, - # remove > 2026-01 - immersion_depth_direction: Optional[List[int]] = None, + # PLR: + probe_liquid_height: bool = False, + # remove >2026-01 mix_volume: Optional[List[float]] = None, mix_cycles: Optional[List[int]] = None, mix_speed: Optional[List[float]] = None, + immersion_depth_direction: Optional[List[int]] = None, ): """Aspirate liquid from the specified channels. @@ -1667,9 +1747,8 @@ async def aspirate( pull_out_distance_transport_air: The distance to pull out when aspirating air, if LLD is disabled. second_section_height: The height to start the second section of aspiration. - second_section_ratio: The ratio of [the bottom of the container * 10000] / [the height top of the container]. - minimum_height: The minimum height to move to, this is the end of aspiration. The channel - will move linearly from the liquid surface to this height over the course of the aspiration. + second_section_ratio: + minimum_height: The minimum height to move to, this is the end of aspiration. The channel will move linearly from the liquid surface to this height over the course of the aspiration. immersion_depth: The z distance to move after detecting the liquid, can be into or away from the liquid surface. surface_following_distance: The distance to follow the liquid surface. transport_air_volume: The volume of air to aspirate after the liquid. @@ -1677,10 +1756,8 @@ async def aspirate( lld_mode: The liquid level detection mode to use. gamma_lld_sensitivity: The sensitivity of the gamma LLD. dp_lld_sensitivity: The sensitivity of the DP LLD. - aspirate_position_above_z_touch_off: If the LLD mode is Z_TOUCH_OFF, this is the height above - the bottom of the well (presumably) to aspirate from. - detection_height_difference_for_dual_lld: Difference between the gamma and DP LLD heights if - the LLD mode is DUAL. + aspirate_position_above_z_touch_off: If the LLD mode is Z_TOUCH_OFF, this is the height above the bottom of the well (presumably) to aspirate from. + detection_height_difference_for_dual_lld: Difference between the gamma and DP LLD heights if the LLD mode is DUAL. swap_speed: Swap speed (on leaving liquid) [1mm/s]. Must be between 3 and 1600. Default 100. settling_time: The time to wait after mix. mix_position_from_liquid_surface: The height to aspirate from for mix (LLD or absolute terms). @@ -1694,17 +1771,15 @@ async def aspirate( z_drive_speed_during_2nd_section_search: Unknown. cup_upper_edge: Unknown. ratio_liquid_rise_to_tip_deep_in: Unknown. - immersion_depth_2nd_section: The depth to move into the liquid for the second section of - aspiration. + immersion_depth_2nd_section: The depth to move into the liquid for the second section of aspiration. - minimum_traverse_height_at_beginning_of_a_command: The minimum height to move to before - starting an aspiration. + minimum_traverse_height_at_beginning_of_a_command: The minimum height to move to before starting an aspiration. min_z_endpos: The minimum height to move to, this is the end of aspiration. - hamilton_liquid_classes: Override the default liquid classes. See - pylabrobot/liquid_handling/liquid_classes/hamilton/STARBackend.py - liquid_surface_no_lld: Liquid surface at function without LLD [mm]. Must be between 0 - and 360. Defaults to well bottom + liquid height. Should use absolute z. + hamilton_liquid_classes: Override the default liquid classes. See pylabrobot/liquid_handling/liquid_classes/hamilton/STARBackend.py + liquid_surface_no_lld: Liquid surface at function without LLD [mm]. Must be between 0 and 360. Defaults to well bottom + liquid height. Should use absolute z. + + probe_liquid_height: PLR-specific parameter. If True, probe the liquid height using cLLD before aspirating to set the liquid_height of every operation instead of using the default 0. Liquid heights must not be set when using this function. """ # # # TODO: delete > 2026-01 # # # @@ -1764,9 +1839,6 @@ async def aspirate( op.resource.get_location_wrt(self.deck).z + op.offset.z + op.resource.material_z_thickness for op in ops ] - liquid_surfaces_no_lld = liquid_surfaces_no_lld or [ - wb + (op.liquid_height or 0) for wb, op in zip(well_bottoms, ops) - ] if lld_search_height is None: lld_search_height = [ ( @@ -1859,6 +1931,28 @@ async def aspirate( ratio_liquid_rise_to_tip_deep_in = _fill_in_defaults(ratio_liquid_rise_to_tip_deep_in, [0] * n) immersion_depth_2nd_section = _fill_in_defaults(immersion_depth_2nd_section, [0.0] * n) + if probe_liquid_height: + if any(op.liquid_height is not None for op in ops): + raise ValueError("Cannot use probe_liquid_height when liquid heights are set.") + + liquid_heights = await self.probe_liquid_heights( + containers=[op.resource for op in ops], + use_channels=use_channels, + tips=[cast(HamiltonTip, op.tip) for op in ops], + resource_offsets=[op.offset for op in ops], + move_to_z_safety_after=False, + ) + + # override minimum traversal height because we don't want to move channels up. we are already above the liquid. + minimum_traverse_height_at_beginning_of_a_command = 100 + logger.info(f"Detected liquid heights: {liquid_heights}") + else: + liquid_heights = [op.liquid_height or 0 for op in ops] + + liquid_surfaces_no_lld = liquid_surfaces_no_lld or [ + wb + lh for wb, lh in zip(well_bottoms, liquid_heights) + ] + try: return await self.aspirate_pip( aspiration_type=[0 for _ in range(n)], @@ -1956,6 +2050,8 @@ async def dispense( jet: Optional[List[bool]] = None, blow_out: Optional[List[bool]] = None, # "empty" in the VENUS liquid editor empty: Optional[List[bool]] = None, # truly "empty", does not exist in liquid editor, dm4 + # PLR specific + probe_liquid_height: bool = False, # remove in the future immersion_depth_direction: Optional[List[int]] = None, mix_volume: Optional[List[float]] = None, @@ -2010,6 +2106,8 @@ async def dispense( empty: Whether to use "empty" dispense mode for each dispense. Defaults to `False` for all. Truly empty the tip, not available in the VENUS liquid editor, but is in the firmware documentation. Dispense mode 4. + + probe_liquid_height: PLR-specific parameter. If True, probe the liquid height using cLLD before aspirating to set the liquid_height of every operation instead of using the default 0. Liquid heights must not be set when using this function. """ n = len(ops) @@ -2083,9 +2181,6 @@ async def dispense( op.resource.get_location_wrt(self.deck).z + op.offset.z + op.resource.material_z_thickness for op in ops ] - liquid_surfaces_no_lld = liquid_surface_no_lld or [ - ls + (op.liquid_height or 0) for ls, op in zip(well_bottoms, ops) - ] if lld_search_height is None: lld_search_height = [ ( @@ -2158,6 +2253,28 @@ async def dispense( mix_surface_following_distance = _fill_in_defaults(mix_surface_following_distance, [0.0] * n) limit_curve_index = _fill_in_defaults(limit_curve_index, [0] * n) + if probe_liquid_height: + if any(op.liquid_height is not None for op in ops): + raise ValueError("Cannot use probe_liquid_height when liquid heights are set.") + + liquid_heights = await self.probe_liquid_heights( + containers=[op.resource for op in ops], + use_channels=use_channels, + tips=[cast(HamiltonTip, op.tip) for op in ops], + resource_offsets=[op.offset for op in ops], + move_to_z_safety_after=False, + ) + + # override minimum traversal height because we don't want to move channels up. we are already above the liquid. + minimum_traverse_height_at_beginning_of_a_command = 100 + logger.info(f"Detected liquid heights: {liquid_heights}") + else: + liquid_heights = [op.liquid_height or 0 for op in ops] + + liquid_surfaces_no_lld = liquid_surface_no_lld or [ + wb + lh for wb, lh in zip(well_bottoms, liquid_heights) + ] + try: ret = await self.dispense_pip( tip_pattern=channels_involved, @@ -2171,7 +2288,7 @@ async def dispense( second_section_height=[round(sh * 10) for sh in second_section_height], second_section_ratio=[round(sr * 10) for sr in second_section_ratio], minimum_height=[round(mh * 10) for mh in minimum_height], - immersion_depth=[round(id_ * 10) for id_ in immersion_depth], # [0, 0] + immersion_depth=[round(id_ * 10) for id_ in immersion_depth], immersion_depth_direction=immersion_depth_direction, surface_following_distance=[round(sfd * 10) for sfd in surface_following_distance], dispense_speed=[round(fr * 10) for fr in flow_rates], @@ -7864,17 +7981,15 @@ async def clld_probe_z_height_using_channel( Args: channel_idx: The index of the channel to use for probing. Backmost channel = 0. - lowest_immers_pos: The lowest immersion position in mm. - start_pos_lld_search: The start position for z-touch search in mm. + lowest_immers_pos: The lowest immersion position in mm. This is the position of the channel, NOT including the tip length (as C0 commands do). So you have to add the total_tip_length - fitting_depth. + start_pos_lld_search: The start position for z-touch search in mm. This is the position of the channel, NOT including the tip length (as C0 commands do). So you have to add the total_tip_length - fitting_depth. channel_speed: The speed of channel movement in mm/sec. channel_acceleration: The acceleration of the channel in mm/sec**2. detection_edge: The edge steepness at capacitive LLD detection. detection_drop: The offset after capacitive LLD edge detection. - post_detection_trajectory (0, 1): Movement of the channel up (1) or down (0) after - contacting the surface. + post_detection_trajectory (0, 1): Movement of the channel up (1) or down (0) after contacting the surface. post_detection_dist: Distance to move into the trajectory after detection in mm. - move_channels_to_save_pos_after: Flag to move channels to a safe position after - operation. + move_channels_to_save_pos_after: Flag to move channels to a safe position after operation. Returns: The detected Z-height in mm. @@ -7915,26 +8030,18 @@ async def clld_probe_z_height_using_channel( + f" and {STARBackend.z_drive_increment_to_mm(9_999)} mm, is {post_detection_dist} mm" ) - lowest_immers_pos_str = f"{lowest_immers_pos_increments:05}" - start_pos_search_str = f"{start_pos_search_increments:05}" - channel_speed_str = f"{channel_speed_increments:05}" - channel_acc_str = f"{channel_acceleration_thousand_increments:03}" - detection_edge_str = f"{detection_edge:04}" - detection_drop_str = f"{detection_drop:04}" - post_detection_dist_str = f"{post_detection_dist_increments:04}" - try: await self.send_command( module=STARBackend.channel_id(channel_idx), command="ZL", - zh=lowest_immers_pos_str, # Lowest immersion position [increment] - zc=start_pos_search_str, # Start position of LLD search [increment] - zl=channel_speed_str, # Speed of channel movement - zr=channel_acc_str, # Acceleration [1000 increment/second^2] - gt=detection_edge_str, # Edge steepness at capacitive LLD detection - gl=detection_drop_str, # Offset after capacitive LLD edge detection + zh=f"{lowest_immers_pos_increments:05}", # Lowest immersion position [increment] + zc=f"{start_pos_search_increments:05}", # Start position of LLD search [increment] + zl=f"{channel_speed_increments:05}", # Speed of channel movement + zr=f"{channel_acceleration_thousand_increments:03}", # Acceleration [1000 increment/second^2] + gt=f"{detection_edge:04}", # Edge steepness at capacitive LLD detection + gl=f"{detection_drop:04}", # Offset after capacitive LLD edge detection zj=post_detection_trajectory, # Movement of the channel after contacting surface - zi=post_detection_dist_str, # Distance to move up after detection [increment] + zi=f"{post_detection_dist_increments:04}", # Distance to move up after detection [increment] ) except STARFirmwareError: await self.move_all_channels_in_z_safety() @@ -8257,7 +8364,7 @@ async def position_channels_in_y_direction(self, ys: Dict[int, float], make_spac raise ValueError("Channel N would hit the front of the robot") if not all( - int((channel_locations[i] - channel_locations[i + 1]) * 1000) >= 8_999 # float fixing + round((channel_locations[i] - channel_locations[i + 1]) * 1000) >= 8_990 # float fixing for i in range(len(channel_locations) - 1) ): raise ValueError("Channels must be at least 9mm apart and in descending order") diff --git a/pylabrobot/resources/container.py b/pylabrobot/resources/container.py index b24f5981717..161e2a7426e 100644 --- a/pylabrobot/resources/container.py +++ b/pylabrobot/resources/container.py @@ -69,6 +69,11 @@ def serialize_state(self) -> Dict[str, Any]: def load_state(self, state: Dict[str, Any]): self.tracker.load_state(state) + def supports_compute_height_volume_functions(self) -> bool: + return ( + self._compute_volume_from_height is not None and self._compute_height_from_volume is not None + ) + def compute_volume_from_height(self, height: float) -> float: """Compute the volume of liquid in a container from the height of the liquid relative to the bottom of the container."""