Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
106 changes: 106 additions & 0 deletions pylabrobot/liquid_handling/backends/hamilton/STAR_backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -7811,6 +7811,112 @@ async def send_hhs_command(self, index: int, command: str, **kwargs) -> str:
assert isinstance(resp, str)
return resp

# ------------ STAR(RS-232/TCC1/2)-connected Hamilton Heater Cooler (HHS) -------------

async def check_type_is_hhc(self, device_number: int):
"""
Convenience method to check that connected device is an HHC.
Executed through firmware query
"""

firmware_version = await self.send_command(module=f"T{device_number}", command="RF")
if "Hamilton Heater Cooler" not in firmware_version:
raise ValueError(
f"Device number {device_number} does not connect to a Hamilton"
f" Heater-Cooler, found {firmware_version} instead."
f"Have you called the wrong device number?"
)

async def initialize_hhc(self, device_number: int) -> str:
"""Initialize Hamilton Heater Cooler (HHC) at specified TCC port

Args:
device_number: TCC connect number to the HHC
"""

module_pointer = f"T{device_number}"

# Request module configuration
try:
await self.send_command(module=module_pointer, command="QU")
except TimeoutError as exc:
error_message = (
f"No Hamilton Heater Cooler found at device_number {device_number}"
f", have you checked your connections? Original error: {exc}"
)
raise ValueError(error_message) from exc

await self.check_type_is_hhc(device_number)

# Request module configuration
hhc_init_status = await self.send_command(module=module_pointer, command="QW", fmt="qw#")
hhc_init_status = hhc_init_status["qw"]

info = "HHC already initialized"
# Initializing HHS if necessary
if hhc_init_status != 1:
# Initialize device
await self.send_command(module=module_pointer, command="LI")
info = f"HHS at device number {device_number} initialized."

return info

async def start_temperature_control_at_hhc(
self,
device_number: int,
temp: Union[float, int],
):
"""Start temperature regulation of specified HHC"""

await self.check_type_is_hhc(device_number)
assert 0 < temp <= 105

# Ensure proper temperature input handling
if isinstance(temp, (float, int)):
safe_temp_str = f"{round(temp * 10):04d}"
else:
safe_temp_str = str(temp)

return await self.send_command(
module=f"T{device_number}",
command="TA", # temperature adjustment
ta=safe_temp_str,
tb="1800", # TODO: identify precise purpose?
tc="0020", # TODO: identify precise purpose?
)

async def get_temperature_at_hhc(self, device_number: int) -> dict:
"""Query current temperatures of both sensors of specified HHC"""

await self.check_type_is_hhc(device_number)

request_temperature = await self.send_command(module=f"T{device_number}", command="RT")
processed_t_info = [int(x) / 10 for x in request_temperature.split("+")[-2:]]

return {
"middle_T": processed_t_info[0],
"edge_T": processed_t_info[-1],
}

async def query_whether_temperature_reached_at_hhc(self, device_number: int):
"""Stop temperature regulation of specified HHC"""

await self.check_type_is_hhc(device_number)
query_current_control_status = await self.send_command(
module=f"T{device_number}", command="QD", fmt="qd#"
)

return query_current_control_status["qd"] == 0

async def stop_temperature_control_at_hhc(self, device_number: int):
"""Stop temperature regulation of specified HHC"""

await self.check_type_is_hhc(device_number)

return await self.send_command(module=f"T{device_number}", command="TO")

# -------------- Extra - Probing labware with STAR - making STAR into a CMM --------------


class UnSafe:
"""
Expand Down