From 53c687cc1584fc0fe8bb038c67767664dd0ad4ae Mon Sep 17 00:00:00 2001 From: Camillo Moschner Date: Thu, 4 Sep 2025 23:20:46 +0100 Subject: [PATCH 1/2] update STAR to STARBackend in STARBackend --- .../backends/hamilton/STAR_backend.py | 100 +++++++++--------- 1 file changed, 50 insertions(+), 50 deletions(-) diff --git a/pylabrobot/liquid_handling/backends/hamilton/STAR_backend.py b/pylabrobot/liquid_handling/backends/hamilton/STAR_backend.py index 7d019f31e7f..49cefb8c54d 100644 --- a/pylabrobot/liquid_handling/backends/hamilton/STAR_backend.py +++ b/pylabrobot/liquid_handling/backends/hamilton/STAR_backend.py @@ -989,7 +989,7 @@ def trace_information_to_string(module_identifier: str, trace_information: int) 86: "Gripper drive: Auto adjustment of DMS digital potentiometer not possible", 89: "Gripper drive movement error: drive locked or incremental sensor fault during gripping", 90: "Gripper drive initialized failed", - 91: "iSWAP not initialized. Call star.initialize_iswap().", + 91: "iSWAP not initialized. Call STARBackend.initialize_iswap().", 92: "Gripper drive movement error: drive locked or incremental sensor fault during release", 93: "Gripper drive movement error: position counter over/underflow", 94: "Plate not found", @@ -1117,7 +1117,7 @@ def _dispensing_mode_for_op(empty: bool, jet: bool, blow_out: bool) -> int: class STARBackend(HamiltonLiquidHandler, HamiltonHeaterShakerInterface): - """Interface for the Hamilton STAR.""" + """Interface for the Hamilton STARBackend.""" def __init__( self, @@ -1130,9 +1130,9 @@ def __init__( """Create a new STAR interface. Args: - device_address: the USB device address of the Hamilton STAR. Only useful if using more than + device_address: the USB device address of the Hamilton STARBackend. Only useful if using more than one Hamilton machine over USB. - serial_number: the serial number of the Hamilton STAR. Only useful if using more than one + serial_number: the serial number of the Hamilton STARBackend. Only useful if using more than one Hamilton machine over USB. packet_read_timeout: timeout in seconds for reading a single packet. read_timeout: timeout in seconds for reading a full response. @@ -1242,7 +1242,7 @@ async def get_iswap_version(self) -> str: async def request_pip_channel_version(self, channel: int) -> str: return cast( - str, (await self.send_command(STAR.channel_id(channel), "RF", fmt="rf" + "&" * 17))["rf"] + str, (await self.send_command(STARBackend.channel_id(channel), "RF", fmt="rf" + "&" * 17))["rf"] ) def get_id_from_fw_response(self, resp: str) -> Optional[int]: @@ -1690,7 +1690,7 @@ async def aspirate( min_z_endpos: The minimum height to move to, this is the end of aspiration. hamilton_liquid_classes: Override the default liquid classes. See - pylabrobot/liquid_handling/liquid_classes/hamilton/star.py + pylabrobot/liquid_handling/liquid_classes/hamilton/STARBackend.py liquid_surface_no_lld: Liquid surface at function without LLD [mm]. Must be between 0 and 360. Defaults to well bottom + liquid height. Should use absolute z. """ @@ -1978,7 +1978,7 @@ async def dispense( side_touch_off_distance: The distance to move to the side from the well for a dispense. hamilton_liquid_classes: Override the default liquid classes. See - pylabrobot/liquid_handling/liquid_classes/hamilton/star.py + pylabrobot/liquid_handling/liquid_classes/hamilton/STARBackend.py jet: Whether to use jetting for each dispense. Defaults to `False` for all. Used for determining the dispense mode. True for dispense mode 0 or 1. @@ -3139,7 +3139,7 @@ async def core_check_resource_exists_at_location_center( """Check existence of resource with CoRe gripper tool a "Get plate using CO-RE gripper" + error handling Which channels are used for resource check is dependent on which channels have been used for - `STAR.get_core(p1: int, p2: int)` which is a prerequisite for this check function. + `STARBackend.get_core(p1: int, p2: int)` which is a prerequisite for this check function. Args: location: Location to check for resource @@ -3363,9 +3363,9 @@ async def request_electronic_board_type(self): resp = await self.send_command(module="C0", command="QB") try: - return STAR.BoardType(resp["qb"]) + return STARBackend.BoardType(resp["qb"]) except ValueError: - return STAR.BoardType.UNKNOWN + return STARBackend.BoardType.UNKNOWN # TODO: parse response. async def request_supply_voltage(self): @@ -5842,7 +5842,7 @@ async def move_core_96_head_to_defined_position( 342.5. Default 342.5. """ - # TODO: these are values for a STAR. Find them for a STARlet. + # TODO: these are values for a STARBackend. Find them for a STARlet. self._check_96_position_legal(Coordinate(x, y, z)) assert ( 0 <= minimum_height_at_beginning_of_a_command <= 342.5 @@ -7174,19 +7174,19 @@ async def request_cover_open(self) -> bool: @staticmethod def mm_to_y_drive_increment(value_mm: float) -> int: - return round(value_mm / STAR.y_drive_mm_per_increment) + return round(value_mm / STARBackend.y_drive_mm_per_increment) @staticmethod def y_drive_increment_to_mm(value_mm: int) -> float: - return round(value_mm * STAR.y_drive_mm_per_increment, 2) + return round(value_mm * STARBackend.y_drive_mm_per_increment, 2) @staticmethod def mm_to_z_drive_increment(value_mm: float) -> int: - return round(value_mm / STAR.z_drive_mm_per_increment) + return round(value_mm / STARBackend.z_drive_mm_per_increment) @staticmethod def z_drive_increment_to_mm(value_increments: int) -> float: - return round(value_increments * STAR.z_drive_mm_per_increment, 2) + return round(value_increments * STARBackend.z_drive_mm_per_increment, 2) async def clld_probe_z_height_using_channel( self, @@ -7223,29 +7223,29 @@ async def clld_probe_z_height_using_channel( The detected Z-height in mm. """ - lowest_immers_pos_increments = STAR.mm_to_z_drive_increment(lowest_immers_pos) - start_pos_search_increments = STAR.mm_to_z_drive_increment(start_pos_search) - channel_speed_increments = STAR.mm_to_z_drive_increment(channel_speed) - channel_acceleration_thousand_increments = STAR.mm_to_z_drive_increment( + lowest_immers_pos_increments = STARBackend.mm_to_z_drive_increment(lowest_immers_pos) + start_pos_search_increments = STARBackend.mm_to_z_drive_increment(start_pos_search) + channel_speed_increments = STARBackend.mm_to_z_drive_increment(channel_speed) + channel_acceleration_thousand_increments = STARBackend.mm_to_z_drive_increment( channel_acceleration / 1000 ) - post_detection_dist_increments = STAR.mm_to_z_drive_increment(post_detection_dist) + post_detection_dist_increments = STARBackend.mm_to_z_drive_increment(post_detection_dist) assert 9_320 <= lowest_immers_pos_increments <= 31_200, ( - f"Lowest immersion position must be between \n{STAR.z_drive_increment_to_mm(9_320)}" - + f" and {STAR.z_drive_increment_to_mm(31_200)} mm, is {lowest_immers_pos} mm" + f"Lowest immersion position must be between \n{STARBackend.z_drive_increment_to_mm(9_320)}" + + f" and {STARBackend.z_drive_increment_to_mm(31_200)} mm, is {lowest_immers_pos} mm" ) assert 9_320 <= start_pos_search_increments <= 31_200, ( - f"Start position of LLD search must be between \n{STAR.z_drive_increment_to_mm(9_320)}" - + f" and {STAR.z_drive_increment_to_mm(31_200)} mm, is {start_pos_search} mm" + f"Start position of LLD search must be between \n{STARBackend.z_drive_increment_to_mm(9_320)}" + + f" and {STARBackend.z_drive_increment_to_mm(31_200)} mm, is {start_pos_search} mm" ) assert 20 <= channel_speed_increments <= 15_000, ( - f"LLD search speed must be between \n{STAR.z_drive_increment_to_mm(20)}" - + f"and {STAR.z_drive_increment_to_mm(15_000)} mm/sec, is {channel_speed} mm/sec" + f"LLD search speed must be between \n{STARBackend.z_drive_increment_to_mm(20)}" + + f"and {STARBackend.z_drive_increment_to_mm(15_000)} mm/sec, is {channel_speed} mm/sec" ) assert 5 <= channel_acceleration_thousand_increments <= 150, ( - f"Channel acceleration must be between \n{STAR.z_drive_increment_to_mm(5*1_000)} " - + f" and {STAR.z_drive_increment_to_mm(150*1_000)} mm/sec**2, is {channel_acceleration} mm/sec**2" + f"Channel acceleration must be between \n{STARBackend.z_drive_increment_to_mm(5*1_000)} " + + f" and {STARBackend.z_drive_increment_to_mm(150*1_000)} mm/sec**2, is {channel_acceleration} mm/sec**2" ) assert ( 0 <= detection_edge <= 1_023 @@ -7255,7 +7255,7 @@ async def clld_probe_z_height_using_channel( ), "Offset after capacitive LLD edge detection must be between 0 and 1023" assert 0 <= post_detection_dist_increments <= 9_999, ( "Post cLLD-detection movement distance must be between \n0" - + f" and {STAR.z_drive_increment_to_mm(9_999)} mm, is {post_detection_dist} mm" + + f" and {STARBackend.z_drive_increment_to_mm(9_999)} mm, is {post_detection_dist} mm" ) lowest_immers_pos_str = f"{lowest_immers_pos_increments:05}" @@ -7268,7 +7268,7 @@ async def clld_probe_z_height_using_channel( try: await self.send_command( - module=STAR.channel_id(channel_idx), + module=STARBackend.channel_id(channel_idx), command="ZL", zh=lowest_immers_pos_str, # Lowest immersion position [increment] zc=start_pos_search_str, # Start position of LLD search [increment] @@ -7399,22 +7399,22 @@ async def ztouch_probe_z_height_using_channel( if start_pos_search is None: start_pos_search = 334.7 - tip_len + fitting_depth - tip_len_used_in_increments = (tip_len - fitting_depth) / STAR.z_drive_mm_per_increment + tip_len_used_in_increments = (tip_len - fitting_depth) / STARBackend.z_drive_mm_per_increment channel_head_start_pos = ( start_pos_search + tip_len - fitting_depth ) # start_pos of the head itself! safe_head_bottom_z_pos = ( 99.98 + tip_len - fitting_depth - ) # 99.98 == STAR.z_drive_increment_to_mm(9_320) - safe_head_top_z_pos = 334.7 # 334.7 == STAR.z_drive_increment_to_mm(31_200) + ) # 99.98 == STARBackend.z_drive_increment_to_mm(9_320) + safe_head_top_z_pos = 334.7 # 334.7 == STARBackend.z_drive_increment_to_mm(31_200) - lowest_immers_pos_increments = STAR.mm_to_z_drive_increment(lowest_immers_pos) - start_pos_search_increments = STAR.mm_to_z_drive_increment(channel_head_start_pos) - channel_speed_increments = STAR.mm_to_z_drive_increment(channel_speed) - channel_acceleration_thousand_increments = STAR.mm_to_z_drive_increment( + lowest_immers_pos_increments = STARBackend.mm_to_z_drive_increment(lowest_immers_pos) + start_pos_search_increments = STARBackend.mm_to_z_drive_increment(channel_head_start_pos) + channel_speed_increments = STARBackend.mm_to_z_drive_increment(channel_speed) + channel_acceleration_thousand_increments = STARBackend.mm_to_z_drive_increment( channel_acceleration / 1000 ) - channel_speed_upwards_increments = STAR.mm_to_z_drive_increment(channel_speed_upwards) + channel_speed_upwards_increments = STARBackend.mm_to_z_drive_increment(channel_speed_upwards) assert 0 <= channel_idx <= 15, f"channel_idx must be between 0 and 15, is {channel_idx}" assert 20 <= tip_len <= 120, "Total tip length must be between 20 and 120" @@ -7428,16 +7428,16 @@ async def ztouch_probe_z_height_using_channel( + f" and {safe_head_top_z_pos} mm, is {channel_head_start_pos} mm" ) assert 20 <= channel_speed_increments <= 15_000, ( - f"Z-touch search speed must be between \n{STAR.z_drive_increment_to_mm(20)}" - + f" and {STAR.z_drive_increment_to_mm(15_000)} mm/sec, is {channel_speed} mm/sec" + f"Z-touch search speed must be between \n{STARBackend.z_drive_increment_to_mm(20)}" + + f" and {STARBackend.z_drive_increment_to_mm(15_000)} mm/sec, is {channel_speed} mm/sec" ) assert 5 <= channel_acceleration_thousand_increments <= 150, ( - f"Channel acceleration must be between \n{STAR.z_drive_increment_to_mm(5*1_000)}" - + f" and {STAR.z_drive_increment_to_mm(150*1_000)} mm/sec**2, is {channel_speed} mm/sec**2" + f"Channel acceleration must be between \n{STARBackend.z_drive_increment_to_mm(5*1_000)}" + + f" and {STARBackend.z_drive_increment_to_mm(150*1_000)} mm/sec**2, is {channel_speed} mm/sec**2" ) assert 20 <= channel_speed_upwards_increments <= 15_000, ( - f"Channel retraction speed must be between \n{STAR.z_drive_increment_to_mm(20)}" - + f" and {STAR.z_drive_increment_to_mm(15_000)} mm/sec, is {channel_speed_upwards} mm/sec" + f"Channel retraction speed must be between \n{STARBackend.z_drive_increment_to_mm(20)}" + + f" and {STARBackend.z_drive_increment_to_mm(15_000)} mm/sec, is {channel_speed_upwards} mm/sec" ) assert ( 0 <= detection_limiter_in_PWM <= 125 @@ -7456,7 +7456,7 @@ async def ztouch_probe_z_height_using_channel( push_down_force_in_PWM_str = f"{push_down_force_in_PWM:03}" ztouch_probed_z_height = await self.send_command( - module=STAR.channel_id(channel_idx), + module=STARBackend.channel_id(channel_idx), command="ZH", zb=start_pos_search_str, # begin of searching range [increment] za=lowest_immers_pos_str, # end of searching range [increment] @@ -7468,7 +7468,7 @@ async def ztouch_probe_z_height_using_channel( fmt="rz#####", ) # Subtract tip_length from measurement in increment, and convert to mm - result_in_mm = STAR.z_drive_increment_to_mm( + result_in_mm = STARBackend.z_drive_increment_to_mm( ztouch_probed_z_height["rz"] - tip_len_used_in_increments ) if post_detection_dist != 0: # Safety first @@ -7792,7 +7792,7 @@ async def step_off_foil( await self.move_all_channels_in_z_safety() async def request_volume_in_tip(self, channel: int) -> float: - resp = await self.send_command(STAR.channel_id(channel), "QC", fmt="qc##### (n)") + resp = await self.send_command(STARBackend.channel_id(channel), "QC", fmt="qc##### (n)") _, current_volume = resp["qc"] # first is max volume return float(current_volume) / 10 @@ -7983,7 +7983,7 @@ async def put_in_hotel( assert 0 <= z_position_at_end <= 3_600 assert 0 <= open_gripper_position <= 9_999 - return await self.star.send_command( + return await self.STARBackend.send_command( module="C0", command="PI", xs=f"{hotel_center_x_coord:05}", @@ -8057,7 +8057,7 @@ async def get_from_hotel( assert 0 <= plate_width <= 9_999 assert 0 <= plate_width_tolerance <= 99 - return await self.star.send_command( + return await self.STARBackend.send_command( module="C0", command="PO", xs=f"{hotel_center_x_coord:05}", @@ -8096,7 +8096,7 @@ async def violently_shoot_down_tip(self, channel_idx: int): Consider this method an easter egg. Not for serious use. """ - await self.star.send_command(module=STAR.channel_id(channel_idx), command="SI") + await self.STARBackend.send_command(module=STARBackend.channel_id(channel_idx), command="SI") # Deprecated alias with warning # TODO: remove mid May 2025 (giving people 1 month to update) From 74e68793ab6af22c4a46284dd0e2474b9ed3acb7 Mon Sep 17 00:00:00 2001 From: Camillo Moschner Date: Thu, 4 Sep 2025 23:24:37 +0100 Subject: [PATCH 2/2] type lint --- .../liquid_handling/backends/hamilton/STAR_backend.py | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/pylabrobot/liquid_handling/backends/hamilton/STAR_backend.py b/pylabrobot/liquid_handling/backends/hamilton/STAR_backend.py index 49cefb8c54d..367eb07cc3a 100644 --- a/pylabrobot/liquid_handling/backends/hamilton/STAR_backend.py +++ b/pylabrobot/liquid_handling/backends/hamilton/STAR_backend.py @@ -1242,7 +1242,8 @@ async def get_iswap_version(self) -> str: async def request_pip_channel_version(self, channel: int) -> str: return cast( - str, (await self.send_command(STARBackend.channel_id(channel), "RF", fmt="rf" + "&" * 17))["rf"] + str, + (await self.send_command(STARBackend.channel_id(channel), "RF", fmt="rf" + "&" * 17))["rf"], ) def get_id_from_fw_response(self, resp: str) -> Optional[int]: @@ -7983,7 +7984,7 @@ async def put_in_hotel( assert 0 <= z_position_at_end <= 3_600 assert 0 <= open_gripper_position <= 9_999 - return await self.STARBackend.send_command( + return await self.star.send_command( module="C0", command="PI", xs=f"{hotel_center_x_coord:05}", @@ -8057,7 +8058,7 @@ async def get_from_hotel( assert 0 <= plate_width <= 9_999 assert 0 <= plate_width_tolerance <= 99 - return await self.STARBackend.send_command( + return await self.star.send_command( module="C0", command="PO", xs=f"{hotel_center_x_coord:05}", @@ -8096,7 +8097,7 @@ async def violently_shoot_down_tip(self, channel_idx: int): Consider this method an easter egg. Not for serious use. """ - await self.STARBackend.send_command(module=STARBackend.channel_id(channel_idx), command="SI") + await self.star.send_command(module=STARBackend.channel_id(channel_idx), command="SI") # Deprecated alias with warning # TODO: remove mid May 2025 (giving people 1 month to update)