diff --git a/pylabrobot/plate_reading/__init__.py b/pylabrobot/plate_reading/__init__.py index a0018232632..81cd7d1f03b 100644 --- a/pylabrobot/plate_reading/__init__.py +++ b/pylabrobot/plate_reading/__init__.py @@ -1,4 +1,11 @@ -from .biotek_backend import Cytation5Backend, Cytation5ImagingConfig +from .agilent_biotek_backend import BioTekPlateReaderBackend +from .agilent_biotek_cytation_backend import ( + Cytation5Backend, + Cytation5ImagingConfig, + CytationBackend, + CytationImagingConfig, +) +from .agilent_biotek_synergyh1_backend import SynergyH1Backend from .chatterbox import PlateReaderChatterboxBackend from .clario_star_backend import CLARIOstarBackend from .image_reader import ImageReader diff --git a/pylabrobot/plate_reading/agilent_biotek_backend.py b/pylabrobot/plate_reading/agilent_biotek_backend.py new file mode 100644 index 00000000000..0c4bf865415 --- /dev/null +++ b/pylabrobot/plate_reading/agilent_biotek_backend.py @@ -0,0 +1,595 @@ +import asyncio +import enum +import logging +import time +from typing import Dict, Iterable, List, Optional, Tuple + +from pylabrobot.io.ftdi import FTDI +from pylabrobot.plate_reading.backend import PlateReaderBackend +from pylabrobot.resources import Plate, Well + +logger = logging.getLogger(__name__) + + +class BioTekPlateReaderBackend(PlateReaderBackend): + """Backend for Agilent BioTek plate readers.""" + + def __init__( + self, + timeout: float = 20, + device_id: Optional[str] = None, + ) -> None: + super().__init__() + self.timeout = timeout + + self.io = FTDI(device_id=device_id) + + self._version: Optional[str] = None + + self._plate: Optional[Plate] = None + self._shaking = False + self._slow_mode: Optional[bool] = None + + def _non_overlapping_rectangles( + self, + points: Iterable[Tuple[int, int]], + ) -> List[Tuple[int, int, int, int]]: + """Find non-overlapping rectangles that cover all given points. + + Example: + >>> points = [ + >>> (1, 1), + >>> (2, 2), (2, 3), (2, 4), + >>> (3, 2), (3, 3), (3, 4), + >>> (4, 2), (4, 3), (4, 4), (4, 5), + >>> (5, 2), (5, 3), (5, 4), (5, 5), + >>> (6, 2), (6, 3), (6, 4), (6, 5), + >>> (7, 2), (7, 3), (7, 4), + >>> ] + >>> non_overlapping_rectangles(points) + [ + (1, 1, 1, 1), + (2, 2, 7, 4), + (4, 5, 6, 5), + ] + """ + + pts = set(points) + rects = [] + + while pts: + # start a rectangle from one arbitrary point + r0, c0 = min(pts) + # expand right + c1 = c0 + while (r0, c1 + 1) in pts: + c1 += 1 + # expand downward as long as entire row segment is filled + r1 = r0 + while all((r1 + 1, c) in pts for c in range(c0, c1 + 1)): + r1 += 1 + + rects.append((r0, c0, r1, c1)) + # remove covered points + for r in range(r0, r1 + 1): + for c in range(c0, c1 + 1): + pts.discard((r, c)) + + rects.sort() + return rects + + async def setup(self) -> None: + logger.info(f"{self.__class__.__name__} setting up") + + await self.io.setup() + await self.io.usb_reset() + await self.io.set_latency_timer(16) + await self.io.set_baudrate(9600) # 0x38 0x41 + await self.io.set_line_property(8, 2, 0) # 8 data bits, 2 stop bits, no parity + SIO_RTS_CTS_HS = 0x1 << 8 + await self.io.set_flowctrl(SIO_RTS_CTS_HS) + await self.io.set_rts(True) + + # see if we need to adjust baudrate. This appears to be the case sometimes. + try: + self._version = await self.get_firmware_version() + except TimeoutError: + await self.io.set_baudrate(38_461) # 4e c0 + self._version = await self.get_firmware_version() + + self._shaking = False + self._shaking_task: Optional[asyncio.Task] = None + + async def stop(self) -> None: + logger.info(f"{self.__class__.__name__} stopping") + await self.stop_shaking() + await self.io.stop() + + self._slow_mode = None + + @property + def version(self) -> str: + if self._version is None: + raise RuntimeError(f"{self.__class__.__name__}: Firmware version is not set") + return self._version + + @property + def abs_wavelength_range(self) -> tuple[int, int]: + return (230, 999) + + @property + def focal_height_range(self) -> tuple[float, float]: + return (4.5, 13.88) + + @property + def excitation_range(self) -> tuple[int, int]: + return (250, 700) + + @property + def emission_range(self) -> tuple[int, int]: + return (250, 700) + + @property + def supports_heating(self) -> bool: + return False + + @property + def supports_cooling(self) -> bool: + return False + + @property + def temperature_range(self) -> Tuple[Optional[float], Optional[float]]: + """Return (min_temp, max_temp). + If cooling is not supported (heating only), min_temp is None. + If heating is not supported (cooling only), max_temp is None. + """ + max_temp = 45.0 if self.supports_heating else None # default BioTek max + min_temp = 4.0 if self.supports_cooling else None # default cooling minimum + return (min_temp, max_temp) + + async def _purge_buffers(self) -> None: + """Purge the RX and TX buffers, as implemented in Gen5.exe""" + for _ in range(6): + await self.io.usb_purge_rx_buffer() + await self.io.usb_purge_tx_buffer() + + async def _read_until(self, terminator: bytes, timeout: Optional[float] = None) -> bytes: + """If timeout is None, use self.timeout""" + if timeout is None: + timeout = self.timeout + x = None + res = b"" + t0 = time.time() + while x != terminator: + x = await self.io.read(1) + res += x + + if time.time() - t0 > timeout: + logger.debug(f"{self.__class__.__name__} received incomplete %s", res) + raise TimeoutError(f"{self.__class__.__name__}: Timeout while waiting for response") + + if x == b"": + await asyncio.sleep(0.01) + + logger.debug(f"{self.__class__.__name__} received %s", res) + return res + + async def send_command( + self, + command: str, + parameter: Optional[str] = None, + wait_for_response=True, + timeout: Optional[float] = None, + ) -> Optional[bytes]: + await self._purge_buffers() + + await self.io.write(command.encode()) + logger.debug(f"{self.__class__.__name__} sent %s", command) + response: Optional[bytes] = None + if wait_for_response or parameter is not None: + response = await self._read_until( + b"\x06" if parameter is not None else b"\x03", timeout=timeout + ) + + if parameter is not None: + await self.io.write(parameter.encode()) + logger.debug(f"{self.__class__.__name__} sent %s", parameter) + if wait_for_response: + response = await self._read_until(b"\x03", timeout=timeout) + + return response + + async def get_serial_number(self) -> str: + resp = await self.send_command("C", timeout=1) + assert resp is not None + return resp[1:].split(b" ")[0].decode() + + async def get_firmware_version(self) -> str: + resp = await self.send_command("e", timeout=1) + assert resp is not None + return " ".join(resp[1:-1].decode().split(" ")[3:4]) + + async def _set_slow_mode(self, slow: bool): + if self._slow_mode == slow: + return + await self.send_command("&", "S1" if slow else "S0") + self._slow_mode = slow + + async def open(self, slow: bool = False): + await self._set_slow_mode(slow) + return await self.send_command("J") + + async def close(self, plate: Optional[Plate], slow: bool = False): + # reset cache + self._plate = None + + await self._set_slow_mode(slow) + if plate is not None: + await self.set_plate(plate) + return await self.send_command("A") + + async def home(self): + return await self.send_command("i", "x") + + async def get_current_temperature(self) -> float: + """Get current temperature in degrees Celsius.""" + resp = await self.send_command("h", timeout=1) + assert resp is not None + return int(resp[1:-1]) / 100000 + + async def set_temperature(self, temperature: float): + """Set temperature in degrees Celsius.""" + if not self.supports_heating and not self.supports_cooling: + raise NotImplementedError(f"{self.__class__.__name__} does not support temperature control.") + + tmin, tmax = self.temperature_range + current_temperature = await self.get_current_temperature() + + if (tmin is not None and temperature < tmin) or (tmax is not None and temperature > tmax): + raise ValueError( + f"{self.__class__.__name__}: " + f"Requested temperature {temperature}°C is outside supported range " + f"{tmin}-{tmax}°C" + ) + if temperature < current_temperature and not self.supports_cooling: + raise ValueError(f"{self.__class__.__name__}: Cooling is not supported.") + if temperature > current_temperature and not self.supports_heating: + raise ValueError(f"{self.__class__.__name__}: Heating is not supported.") + + return await self.send_command("g", f"{int(temperature * 1000):05}") + + async def stop_heating_or_cooling(self): + return await self.send_command("g", "00000") + + def _parse_body(self, body: bytes) -> List[List[Optional[float]]]: + assert self._plate is not None, "Plate must be set before reading data" + plate = self._plate + start_index = 22 + end_index = body.rindex(b"\r\n") + num_rows = plate.num_items_y + rows = body[start_index:end_index].split(b"\r\n,")[:num_rows] + + parsed_data: Dict[Tuple[int, int], float] = {} + for row in rows: + values = row.split(b",") + grouped_values = [values[i : i + 3] for i in range(0, len(values), 3)] + + for group in grouped_values: + assert len(group) == 3 + row_index = int(group[0].decode()) - 1 # 1-based index in the response + column_index = int(group[1].decode()) - 1 # 1-based index in the response + value = float(group[2].decode()) + parsed_data[(row_index, column_index)] = value + + result: List[List[Optional[float]]] = [ + [None for _ in range(plate.num_items_x)] for _ in range(plate.num_items_y) + ] + for (row_idx, col_idx), value in parsed_data.items(): + result[row_idx][col_idx] = value + return result + + async def set_plate(self, plate: Plate): + # 08120112207434014351135308559127881422 + # ^^^^ plate size z + # ^^^^^ plate size x + # ^^^^^ plate size y + # ^^^^^ bottom right x + # ^^^^^ top left x + # ^^^^^ bottom right y + # ^^^^^ top left y + # ^^ columns + # ^^ rows + + if plate is self._plate: + return + + rows = plate.num_items_y + columns = plate.num_items_x + + bottom_right_well = plate.get_item(plate.num_items - 1) + assert bottom_right_well.location is not None + bottom_right_well_center = bottom_right_well.location + bottom_right_well.get_anchor( + x="c", y="c" + ) + top_left_well = plate.get_item(0) + assert top_left_well.location is not None + top_left_well_center = top_left_well.location + top_left_well.get_anchor(x="c", y="c") + + plate_size_y = plate.get_size_y() + plate_size_x = plate.get_size_x() + plate_size_z = plate.get_size_z() + if plate.lid is not None: + plate_size_z += plate.lid.get_size_z() - plate.lid.nesting_z_height + + top_left_well_center_y = plate.get_size_y() - top_left_well_center.y # invert y axis + bottom_right_well_center_y = plate.get_size_y() - bottom_right_well_center.y # invert y axis + + cmd = ( + f"{rows:02}" + f"{columns:02}" + f"{int(top_left_well_center_y * 100):05}" + f"{int(bottom_right_well_center_y * 100):05}" + f"{int(top_left_well_center.x * 100):05}" + f"{int(bottom_right_well_center.x * 100):05}" + f"{int(plate_size_y * 100):05}" + f"{int(plate_size_x * 100):05}" + f"{int(plate_size_z * 100):04}" + "\x03" + ) + + resp = await self.send_command("y", cmd, timeout=1) + self._plate = plate + return resp + + def _get_min_max_row_col_tuples( + self, wells: List[Well], plate: Plate + ) -> List[Tuple[int, int, int, int]]: # min_row, min_col, max_row, max_col + # check if all wells are in the same plate + plates = set(well.parent for well in wells) + if len(plates) != 1 or plates.pop() != plate: + raise ValueError("All wells must be in the specified plate") + return self._non_overlapping_rectangles((well.get_row(), well.get_column()) for well in wells) + + async def read_absorbance(self, plate: Plate, wells: List[Well], wavelength: int) -> List[Dict]: + min_abs, max_abs = self.abs_wavelength_range + if not (min_abs <= wavelength <= max_abs): + raise ValueError(f"{self.__class__.__name__}: wavelength must be within {min_abs}-{max_abs}") + + await self.set_plate(plate) + + wavelength_str = str(wavelength).zfill(4) + all_data: List[List[Optional[float]]] = [ + [None for _ in range(plate.num_items_x)] for _ in range(plate.num_items_y) + ] + + for min_row, min_col, max_row, max_col in self._get_min_max_row_col_tuples(wells, plate): + cmd = f"004701{min_row + 1:02}{min_col + 1:02}{max_row + 1:02}{max_col + 1:02}000120010000110010000010600008{wavelength_str}1" + checksum = str(sum(cmd.encode()) % 100).zfill(2) + cmd = cmd + checksum + "\x03" + await self.send_command("D", cmd) + + resp = await self.send_command("O") + assert resp == b"\x060000\x03" + + # read data + body = await self._read_until(b"\x03", timeout=60 * 3) + assert body is not None + parsed_data = self._parse_body(body) + # Merge data + for r in range(plate.num_items_y): + for c in range(plate.num_items_x): + if parsed_data[r][c] is not None: + all_data[r][c] = parsed_data[r][c] + + # Get current temperature + try: + temp = await self.get_current_temperature() + except TimeoutError: + temp = float("nan") + + return [ + { + "wavelength": wavelength, + "data": all_data, + "temperature": temp, + "time": time.time(), + } + ] + + async def read_luminescence( + self, plate: Plate, wells: List[Well], focal_height: float, integration_time: float = 1 + ) -> List[Dict]: + min_fh, max_fh = self.focal_height_range + if not (min_fh <= focal_height <= max_fh): + raise ValueError(f"{self.__class__.__name__}: focal height must be within {min_fh}-{max_fh}") + + await self.set_plate(plate) + + cmd = f"3{14220 + int(1000 * focal_height)}\x03" + await self.send_command("t", cmd) + + integration_time_seconds = int(integration_time) + assert 0 <= integration_time_seconds <= 60, "Integration time seconds must be between 0 and 60" + integration_time_milliseconds = integration_time - int(integration_time) + # TODO: I don't know if the multiple of 0.2 is a firmware requirement, but it's what gen5.exe requires. + # round because of floating point precision issues + assert ( + round(integration_time_milliseconds * 10) % 2 == 0 + ), "Integration time milliseconds must be a multiple of 0.2" + integration_time_seconds_s = str(integration_time_seconds * 5).zfill(2) + integration_time_milliseconds_s = str(int(float(integration_time_milliseconds * 50))).zfill(2) + + all_data: List[List[Optional[float]]] = [ + [None for _ in range(plate.num_items_x)] for _ in range(plate.num_items_y) + ] + for min_row, min_col, max_row, max_col in self._get_min_max_row_col_tuples(wells, plate): + cmd = f"008401{min_row + 1:02}{min_col + 1:02}{max_row + 1:02}{max_col + 1:02}000120010000110010000012300{integration_time_seconds_s}{integration_time_milliseconds_s}200200-001000-003000000000000000000013510" # 0812 + checksum = str((sum(cmd.encode()) + 8) % 100).zfill(2) # don't know why +8 + cmd = cmd + checksum + await self.send_command("D", cmd) + + resp = await self.send_command("O") + assert resp == b"\x060000\x03" + + # 2m10s of reading per 1 second of integration time + # allow 60 seconds flat + timeout = 60 + integration_time_seconds * (2 * 60 + 10) + body = await self._read_until(b"\x03", timeout=timeout) + assert body is not None + parsed_data = self._parse_body(body) + # Merge data + for r in range(plate.num_items_y): + for c in range(plate.num_items_x): + if parsed_data[r][c] is not None: + all_data[r][c] = parsed_data[r][c] + + # Get current temperature + try: + temp = await self.get_current_temperature() + except TimeoutError: + temp = float("nan") + + return [ + { + "data": all_data, + "temperature": temp, + "time": time.time(), + } + ] + + async def read_fluorescence( + self, + plate: Plate, + wells: List[Well], + excitation_wavelength: int, + emission_wavelength: int, + focal_height: float, + ) -> List[Dict]: + min_fh, max_fh = self.focal_height_range + if not (min_fh <= focal_height <= max_fh): + raise ValueError(f"{self.__class__.__name__}: focal height must be within {min_fh}-{max_fh}") + + min_ex, max_ex = self.excitation_range + if not (min_ex <= excitation_wavelength <= max_ex): + raise ValueError( + f"{self.__class__.__name__}: excitation wavelength must be {min_ex}-{max_ex}" + ) + + min_em, max_em = self.emission_range + if not (min_em <= emission_wavelength <= max_em): + raise ValueError(f"{self.__class__.__name__}: emission wavelength must be {min_em}-{max_em}") + + await self.set_plate(plate) + + cmd = f"{614220 + int(1000 * focal_height)}\x03" + await self.send_command("t", cmd) + + excitation_wavelength_str = str(excitation_wavelength).zfill(4) + emission_wavelength_str = str(emission_wavelength).zfill(4) + + all_data: List[List[Optional[float]]] = [ + [None for _ in range(plate.num_items_x)] for _ in range(plate.num_items_y) + ] + for min_row, min_col, max_row, max_col in self._get_min_max_row_col_tuples(wells, plate): + cmd = ( + f"008401{min_row + 1:02}{min_col + 1:02}{max_row + 1:02}{max_col + 1:02}0001200100001100100000135000100200200{excitation_wavelength_str}000" + f"{emission_wavelength_str}000000000000000000210011" + ) + checksum = str((sum(cmd.encode()) + 7) % 100).zfill(2) # don't know why +7 + cmd = cmd + checksum + "\x03" + await self.send_command("D", cmd) + + resp = await self.send_command("O") + assert resp == b"\x060000\x03" + + body = await self._read_until(b"\x03", timeout=60 * 2) + assert body is not None + parsed_data = self._parse_body(body) + # Merge data + for r in range(plate.num_items_y): + for c in range(plate.num_items_x): + if parsed_data[r][c] is not None: + all_data[r][c] = parsed_data[r][c] + + # Get current temperature + try: + temp = await self.get_current_temperature() + except TimeoutError: + temp = float("nan") + + return [ + { + "ex_wavelength": excitation_wavelength, + "em_wavelength": emission_wavelength, + "data": all_data, + "temperature": temp, + "time": time.time(), + } + ] + + async def _abort(self) -> None: + await self.send_command("x", wait_for_response=False) + + class ShakeType(enum.IntEnum): + LINEAR = 0 + ORBITAL = 1 + + async def shake(self, shake_type: ShakeType, frequency: int) -> None: + """Warning: the duration for shaking has to be specified on the machine, and the maximum is + 16 minutes. As a hack, we start shaking for the maximum duration every time as long as stop + is not called. I think the machine might open the door at the end of the 16 minutes and then + move it back in. We have to find a way to shake continuously, which is possible in protocol-mode + with kinetics. + + Args: + frequency: speed, in mm. 360 CPM = 6mm; 410 CPM = 5mm; 493 CPM = 4mm; 567 CPM = 3mm; 731 CPM = 2mm; 1096 CPM = 1mm + """ + + max_duration = 16 * 60 # 16 minutes + self._shaking_started = asyncio.Event() + + async def shake_maximal_duration(): + """This method will start the shaking, but returns immediately after + shaking has started.""" + shake_type_bit = str(shake_type.value) + duration = str(max_duration).zfill(3) + assert 1 <= frequency <= 6, "Frequency must be between 1 and 6" + cmd = f"0033010101010100002000000013{duration}{shake_type_bit}{frequency}01" + checksum = str((sum(cmd.encode()) + 73) % 100).zfill(2) # don't know why +73 + cmd = cmd + checksum + "\x03" + await self.send_command("D", cmd) + + resp = await self.send_command("O") + assert resp == b"\x060000\x03" + + if not self._shaking_started.is_set(): + self._shaking_started.set() + + async def shake_continuous(): + while self._shaking: + await shake_maximal_duration() + + # short sleep allows = frequent checks for fast stopping + seconds_since_start: float = 0 + loop_wait_time = 0.25 + while seconds_since_start < max_duration and self._shaking: + seconds_since_start += loop_wait_time + await asyncio.sleep(loop_wait_time) + + self._shaking = True + self._shaking_task = asyncio.create_task(shake_continuous()) + + await self._shaking_started.wait() + + async def stop_shaking(self) -> None: + if self._shaking: + await self._abort() + self._shaking = False + if self._shaking_task is not None: + self._shaking_task.cancel() + try: + await self._shaking_task + except asyncio.CancelledError: + # Task cancellation is expected here; safe to ignore this exception. + pass + self._shaking_task = None diff --git a/pylabrobot/plate_reading/biotek_backend.py b/pylabrobot/plate_reading/agilent_biotek_cytation_backend.py similarity index 59% rename from pylabrobot/plate_reading/biotek_backend.py rename to pylabrobot/plate_reading/agilent_biotek_cytation_backend.py index f333a588524..962828e18ee 100644 --- a/pylabrobot/plate_reading/biotek_backend.py +++ b/pylabrobot/plate_reading/agilent_biotek_cytation_backend.py @@ -1,14 +1,16 @@ import asyncio import atexit -import enum import logging import math import re import time +import warnings from dataclasses import dataclass -from typing import Dict, Iterable, List, Literal, Optional, Tuple, Union +from typing import List, Literal, Optional, Tuple, Union -from pylabrobot.resources import Plate, Well +from pylabrobot.plate_reading.agilent_biotek_backend import BioTekPlateReaderBackend +from pylabrobot.plate_reading.backend import ImagerBackend +from pylabrobot.resources import Plate try: import PySpin # type: ignore @@ -19,8 +21,6 @@ USE_PYSPIN = False _PYSPIN_IMPORT_ERROR = e -from pylabrobot.io.ftdi import FTDI -from pylabrobot.plate_reading.backend import ImageReaderBackend from pylabrobot.plate_reading.standard import ( Exposure, FocalPosition, @@ -31,18 +31,17 @@ Objective, ) -logger = logging.getLogger("pylabrobot.plate_reading.biotek") - - SPINNAKER_COLOR_PROCESSING_ALGORITHM_HQ_LINEAR = ( PySpin.SPINNAKER_COLOR_PROCESSING_ALGORITHM_HQ_LINEAR if USE_PYSPIN else -1 ) PixelFormat_Mono8 = PySpin.PixelFormat_Mono8 if USE_PYSPIN else -1 SpinnakerException = PySpin.SpinnakerException if USE_PYSPIN else Exception +logger = logging.getLogger(__name__) + @dataclass -class Cytation5ImagingConfig: +class CytationImagingConfig: camera_serial_number: Optional[str] = None max_image_read_attempts: int = 50 @@ -72,56 +71,8 @@ def retry(func, *args, **kwargs): time.sleep(delay) -def _non_overlapping_rectangles( - points: Iterable[Tuple[int, int]], -) -> List[Tuple[int, int, int, int]]: - """Find non-overlapping rectangles that cover all given points. - - Example: - >>> points = [ - >>> (1, 1), - >>> (2, 2), (2, 3), (2, 4), - >>> (3, 2), (3, 3), (3, 4), - >>> (4, 2), (4, 3), (4, 4), (4, 5), - >>> (5, 2), (5, 3), (5, 4), (5, 5), - >>> (6, 2), (6, 3), (6, 4), (6, 5), - >>> (7, 2), (7, 3), (7, 4), - >>> ] - >>> non_overlapping_rectangles(points) - [ - (1, 1, 1, 1), - (2, 2, 7, 4), - (4, 5, 6, 5), - ] - """ - - pts = set(points) - rects = [] - - while pts: - # start a rectangle from one arbitrary point - r0, c0 = min(pts) - # expand right - c1 = c0 - while (r0, c1 + 1) in pts: - c1 += 1 - # expand downward as long as entire row segment is filled - r1 = r0 - while all((r1 + 1, c) in pts for c in range(c0, c1 + 1)): - r1 += 1 - - rects.append((r0, c0, r1, c1)) - # remove covered points - for r in range(r0, r1 + 1): - for c in range(c0, c1 + 1): - pts.discard((r, c)) - - rects.sort() - return rects - - -class Cytation5Backend(ImageReaderBackend): - """Backend for biotek cytation 5 image reader. +class CytationBackend(BioTekPlateReaderBackend, ImagerBackend): + """Backend for Agilent BioTek Cytation plate readers. The camera is interfaced using the Spinnaker SDK, and the camera used during development is the Point Grey Research Inc. Blackfly BFLY-U3-23S6M. This uses a Sony IMX249 sensor. @@ -131,56 +82,30 @@ def __init__( self, timeout: float = 20, device_id: Optional[str] = None, - imaging_config: Optional[Cytation5ImagingConfig] = None, + imaging_config: Optional[CytationImagingConfig] = None, ) -> None: - super().__init__() - self.timeout = timeout + super().__init__(timeout=timeout, device_id=device_id) - self.io = FTDI(device_id=device_id) - - self.spinnaker_system: Optional["PySpin.SystemPtr"] = None - self.cam: Optional["PySpin.CameraPtr"] = None - self.imaging_config = imaging_config or Cytation5ImagingConfig() + self._spinnaker_system: Optional["PySpin.SystemPtr"] = None + self._cam: Optional["PySpin.CameraPtr"] = None + self.imaging_config = imaging_config or CytationImagingConfig() self._filters: Optional[List[Optional[ImagingMode]]] = self.imaging_config.filters self._objectives: Optional[List[Optional[Objective]]] = self.imaging_config.objectives - self._version: Optional[str] = None - - self._plate: Optional[Plate] = None self._exposure: Optional[Exposure] = None self._focal_height: Optional[FocalPosition] = None self._gain: Optional[Gain] = None self._imaging_mode: Optional["ImagingMode"] = None self._row: Optional[int] = None self._column: Optional[int] = None - self._shaking = False self._pos_x: Optional[float] = None self._pos_y: Optional[float] = None self._objective: Optional[Objective] = None - self._slow_mode: Optional[bool] = None - self._acquiring = False async def setup(self, use_cam: bool = False) -> None: - logger.info("[cytation5] setting up") - - await self.io.setup() - await self.io.usb_reset() - await self.io.set_latency_timer(16) - await self.io.set_baudrate(9600) # 0x38 0x41 - await self.io.set_line_property(8, 2, 0) # 8 data bits, 2 stop bits, no parity - SIO_RTS_CTS_HS = 0x1 << 8 - await self.io.set_flowctrl(SIO_RTS_CTS_HS) - await self.io.set_rts(True) - - # see if we need to adjust baudrate. This appears to be the case sometimes. - try: - self._version = await self.get_firmware_version() - except TimeoutError: - await self.io.set_baudrate(38_461) # 4e c0 - self._version = await self.get_firmware_version() + logger.info(f"{self.__class__.__name__} setting up") - self._shaking = False - self._shaking_task: Optional[asyncio.Task] = None + await super().setup() if use_cam: try: @@ -193,6 +118,39 @@ async def setup(self, use_cam: bool = False) -> None: await self.stop() raise + async def stop(self): + await super().stop() + + if self._acquiring: + self.stop_acquisition() + + logger.info(f"{self.__class__.__name__} stopping") + await self.stop_shaking() + await self.io.stop() + + self._stop_camera() + + self._objectives = None + self._filters = None + self._slow_mode = None + + self._exposure = None + self._focal_height = None + self._gain = None + self._imaging_mode = None + self._row = None + self._column = None + self._pos_x, self._pos_y = 0, 0 + self._objective = None + + @property + def supports_heating(self): + return True + + @property + def supports_cooling(self): + return True + async def _set_up_camera(self) -> None: atexit.register(self._stop_camera) @@ -204,13 +162,13 @@ async def _set_up_camera(self) -> None: if self.imaging_config is None: raise RuntimeError("Imaging configuration is not set.") - logger.debug("[cytation5] setting up camera") + logger.debug(f"{self.__class__.__name__} setting up camera") # -- Retrieve singleton reference to system object (Spinnaker) -- - self.spinnaker_system = PySpin.System.GetInstance() - version = self.spinnaker_system.GetLibraryVersion() + self._spinnaker_system = PySpin.System.GetInstance() + version = self._spinnaker_system.GetLibraryVersion() logger.debug( - "[cytation5] Library version: %d.%d.%d.%d", + f"{self.__class__.__name__} Library version: %d.%d.%d.%d", version.major, version.minor, version.type, @@ -218,42 +176,44 @@ async def _set_up_camera(self) -> None: ) # -- Get the camera by serial number, or the first. -- - cam_list = self.spinnaker_system.GetCameras() + cam_list = self._spinnaker_system.GetCameras() num_cameras = cam_list.GetSize() - logger.debug("[cytation5] number of cameras detected: %d", num_cameras) + logger.debug(f"{self.__class__.__name__} number of cameras detected: %d", num_cameras) for cam in cam_list: info = self._get_device_info(cam) serial_number = info["DeviceSerialNumber"] - logger.debug("[cytation5] camera detected: %s", serial_number) + logger.debug(f"{self.__class__.__name__} camera detected: %s", serial_number) if ( self.imaging_config.camera_serial_number is not None and serial_number == self.imaging_config.camera_serial_number ): - self.cam = cam - logger.info("[cytation5] using camera with serial number %s", serial_number) + self._cam = cam + logger.info(f"{self.__class__.__name__} using camera with serial number %s", serial_number) break else: # if no specific camera was found by serial number so use the first one if num_cameras > 0: - self.cam = cam_list.GetByIndex(0) + self._cam = cam_list.GetByIndex(0) logger.info( - "[cytation5] using first camera with serial number %s", info["DeviceSerialNumber"] + f"{self.__class__.__name__} using first camera with serial number %s", + info["DeviceSerialNumber"], ) else: - logger.error("[cytation5] no cameras found") - self.cam = None + logger.error(f"{self.__class__.__name__}: No cameras found") + self._cam = None cam_list.Clear() - if self.cam is None: + if self._cam is None: raise RuntimeError( - "No camera found. Make sure the camera is connected and the serial " "number is correct." + f"{self.__class__.__name__}: No camera found. Make sure the camera is connected and the serial " + "number is correct." ) # -- Initialize camera -- for _ in range(10): try: - self.cam.Init() # SpinnakerException: Spinnaker: Could not read the XML URL [-1010] + self._cam.Init() # SpinnakerException: Spinnaker: Could not read the XML URL [-1010] break except: # noqa await asyncio.sleep(0.1) @@ -263,7 +223,7 @@ async def _set_up_camera(self) -> None: "Failed to initialize camera. Make sure the camera is connected and the " "Spinnaker SDK is installed correctly." ) - nodemap = self.cam.GetNodeMap() + nodemap = self._cam.GetNodeMap() # -- Configure trigger to be software -- # This is needed for longer exposure times (otherwise 27.8ms is the maximum) @@ -271,7 +231,7 @@ async def _set_up_camera(self) -> None: ptr_trigger_selector = PySpin.CEnumerationPtr(nodemap.GetNode("TriggerSelector")) if not PySpin.IsReadable(ptr_trigger_selector) or not PySpin.IsWritable(ptr_trigger_selector): raise RuntimeError( - "unable to configure TriggerSelector " "(can't read or write TriggerSelector)" + "unable to configure TriggerSelector (can't read or write TriggerSelector)" ) ptr_frame_start = PySpin.CEnumEntryPtr(ptr_trigger_selector.GetEntryByName("FrameStart")) if not PySpin.IsReadable(ptr_frame_start): @@ -307,22 +267,16 @@ async def _set_up_camera(self) -> None: if self._objectives is None: await self._load_objectives() - @property - def version(self) -> str: - if self._version is None: - raise RuntimeError("Firmware version is not set") - return self._version - @property def objectives(self) -> List[Optional[Objective]]: if self._objectives is None: - raise RuntimeError("Objectives are not set") + raise RuntimeError(f"{self.__class__.__name__}: Objectives are not set") return self._objectives @property def filters(self) -> List[Optional[ImagingMode]]: if self._filters is None: - raise RuntimeError("Filters are not set") + raise RuntimeError(f"{self.__class__.__name__}: Filters are not set") return self._filters async def _load_filters(self): @@ -457,41 +411,27 @@ async def _load_objectives(self): } self._objectives.append(annulus_part_number2objective[annulus_part_number]) else: - raise RuntimeError(f"Unsupported version: {self.version}") - - async def stop(self) -> None: - if self._acquiring: - self.stop_acquisition() - - logger.info("[cytation5] stopping") - await self.stop_shaking() - await self.io.stop() - - self._stop_camera() - - self._objectives = None - self._filters = None - self._slow_mode = None + raise RuntimeError(f"{self.__class__.__name__}: Unsupported version: {self.version}") def _stop_camera(self) -> None: - if self.cam is not None: + if self._cam is not None: if self._acquiring: self.stop_acquisition() self._reset_trigger() - self.cam.DeInit() - self.cam = None - if self.spinnaker_system is not None: - self.spinnaker_system.ReleaseInstance() + self._cam.DeInit() + self._cam = None + if self._spinnaker_system is not None: + self._spinnaker_system.ReleaseInstance() def _reset_trigger(self): - if self.cam is None: + if self._cam is None: return # adopted from example try: - nodemap = self.cam.GetNodeMap() + nodemap = self._cam.GetNodeMap() node_trigger_mode = PySpin.CEnumerationPtr(nodemap.GetNode("TriggerMode")) if not PySpin.IsReadable(node_trigger_mode) or not PySpin.IsWritable(node_trigger_mode): return @@ -504,435 +444,6 @@ def _reset_trigger(self): except PySpin.SpinnakerException: pass - async def _purge_buffers(self) -> None: - """Purge the RX and TX buffers, as implemented in Gen5.exe""" - for _ in range(6): - await self.io.usb_purge_rx_buffer() - await self.io.usb_purge_tx_buffer() - - async def _read_until(self, char: bytes, timeout: Optional[float] = None) -> bytes: - """If timeout is None, use self.timeout""" - if timeout is None: - timeout = self.timeout - x = None - res = b"" - t0 = time.time() - while x != char: - x = await self.io.read(1) - res += x - - if time.time() - t0 > timeout: - logger.debug("[cytation5] received incomplete %s", res) - raise TimeoutError("Timeout while waiting for response") - - if x == b"": - await asyncio.sleep(0.01) - - logger.debug("[cytation5] received %s", res) - return res - - async def send_command( - self, - command: str, - parameter: Optional[str] = None, - wait_for_response=True, - timeout: Optional[float] = None, - ) -> Optional[bytes]: - await self._purge_buffers() - - await self.io.write(command.encode()) - logger.debug("[cytation5] sent %s", command) - response: Optional[bytes] = None - if wait_for_response or parameter is not None: - response = await self._read_until( - b"\x06" if parameter is not None else b"\x03", timeout=timeout - ) - - if parameter is not None: - await self.io.write(parameter.encode()) - logger.debug("[cytation5] sent %s", parameter) - if wait_for_response: - response = await self._read_until(b"\x03", timeout=timeout) - - return response - - async def get_serial_number(self) -> str: - resp = await self.send_command("C", timeout=1) - assert resp is not None - return resp[1:].split(b" ")[0].decode() - - async def get_firmware_version(self) -> str: - resp = await self.send_command("e", timeout=1) - assert resp is not None - return " ".join(resp[1:-1].decode().split(" ")[3:4]) - - async def _set_slow_mode(self, slow: bool): - if self._slow_mode == slow: - return - await self.send_command("&", "S1" if slow else "S0") - self._slow_mode = slow - - async def open(self, slow: bool = False): - await self._set_slow_mode(slow) - return await self.send_command("J") - - async def close(self, plate: Optional[Plate], slow: bool = False): - # reset cache - self._plate = None - self._exposure = None - self._focal_height = None - self._gain = None - self._imaging_mode = None - self._row = None - self._column = None - self._pos_x, self._pos_y = 0, 0 - self._objective = None - - await self._set_slow_mode(slow) - if plate is not None: - await self.set_plate(plate) - self._row, self._column = None, None - return await self.send_command("A") - - async def home(self): - return await self.send_command("i", "x") - - async def get_current_temperature(self) -> float: - """Get current temperature in degrees Celsius.""" - resp = await self.send_command("h", timeout=1) - assert resp is not None - return int(resp[1:-1]) / 100000 - - async def set_temperature(self, temperature: float): - """Set temperature in degrees Celsius.""" - return await self.send_command("g", f"{int(temperature * 1000):05}") - - async def stop_heating_or_cooling(self): - return await self.send_command("g", "00000") - - def _parse_body(self, body: bytes) -> List[List[Optional[float]]]: - assert self._plate is not None, "Plate must be set before reading data" - plate = self._plate - start_index = 22 - end_index = body.rindex(b"\r\n") - num_rows = plate.num_items_y - rows = body[start_index:end_index].split(b"\r\n,")[:num_rows] - - parsed_data: Dict[Tuple[int, int], float] = {} - for row in rows: - values = row.split(b",") - grouped_values = [values[i : i + 3] for i in range(0, len(values), 3)] - - for group in grouped_values: - assert len(group) == 3 - row_index = int(group[0].decode()) - 1 # 1-based index in the response - column_index = int(group[1].decode()) - 1 # 1-based index in the response - value = float(group[2].decode()) - parsed_data[(row_index, column_index)] = value - - result: List[List[Optional[float]]] = [ - [None for _ in range(plate.num_items_x)] for _ in range(plate.num_items_y) - ] - for (row_idx, col_idx), value in parsed_data.items(): - result[row_idx][col_idx] = value - return result - - async def set_plate(self, plate: Plate): - # 08120112207434014351135308559127881422 - # ^^^^ plate size z - # ^^^^^ plate size x - # ^^^^^ plate size y - # ^^^^^ bottom right x - # ^^^^^ top left x - # ^^^^^ bottom right y - # ^^^^^ top left y - # ^^ columns - # ^^ rows - - if plate is self._plate: - return - - rows = plate.num_items_y - columns = plate.num_items_x - - bottom_right_well = plate.get_item(plate.num_items - 1) - assert bottom_right_well.location is not None - bottom_right_well_center = bottom_right_well.location + bottom_right_well.get_anchor( - x="c", y="c" - ) - top_left_well = plate.get_item(0) - assert top_left_well.location is not None - top_left_well_center = top_left_well.location + top_left_well.get_anchor(x="c", y="c") - - plate_size_y = plate.get_size_y() - plate_size_x = plate.get_size_x() - plate_size_z = plate.get_size_z() - if plate.lid is not None: - plate_size_z += plate.lid.get_size_z() - plate.lid.nesting_z_height - - top_left_well_center_y = plate.get_size_y() - top_left_well_center.y # invert y axis - bottom_right_well_center_y = plate.get_size_y() - bottom_right_well_center.y # invert y axis - - cmd = ( - f"{rows:02}" - f"{columns:02}" - f"{int(top_left_well_center_y*100):05}" - f"{int(bottom_right_well_center_y*100):05}" - f"{int(top_left_well_center.x*100):05}" - f"{int(bottom_right_well_center.x*100):05}" - f"{int(plate_size_y*100):05}" - f"{int(plate_size_x*100):05}" - f"{int(plate_size_z*100):04}" - "\x03" - ) - - resp = await self.send_command("y", cmd, timeout=1) - self._plate = plate - return resp - - def _get_min_max_row_col_tuples( - self, wells: List[Well], plate: Plate - ) -> List[Tuple[int, int, int, int]]: # min_row, min_col, max_row, max_col - # check if all wells are in the same plate - plates = set(well.parent for well in wells) - if len(plates) != 1 or plates.pop() != plate: - raise ValueError("All wells must be in the specified plate") - return _non_overlapping_rectangles((well.get_row(), well.get_column()) for well in wells) - - async def read_absorbance(self, plate: Plate, wells: List[Well], wavelength: int) -> List[Dict]: - if not 230 <= wavelength <= 999: - raise ValueError("Wavelength must be between 230 and 999") - - await self.set_plate(plate) - - wavelength_str = str(wavelength).zfill(4) - all_data: List[List[Optional[float]]] = [ - [None for _ in range(plate.num_items_x)] for _ in range(plate.num_items_y) - ] - - for min_row, min_col, max_row, max_col in self._get_min_max_row_col_tuples(wells, plate): - cmd = f"004701{min_row+1:02}{min_col+1:02}{max_row+1:02}{max_col+1:02}000120010000110010000010600008{wavelength_str}1" - checksum = str(sum(cmd.encode()) % 100).zfill(2) - cmd = cmd + checksum + "\x03" - await self.send_command("D", cmd) - - resp = await self.send_command("O") - assert resp == b"\x060000\x03" - - # read data - body = await self._read_until(b"\x03", timeout=60 * 3) - assert body is not None - parsed_data = self._parse_body(body) - # Merge data - for r in range(plate.num_items_y): - for c in range(plate.num_items_x): - if parsed_data[r][c] is not None: - all_data[r][c] = parsed_data[r][c] - - # Get current temperature - try: - temp = await self.get_current_temperature() - except TimeoutError: - temp = float("nan") - - return [ - { - "wavelength": wavelength, - "data": all_data, - "temperature": temp, - "time": time.time(), - } - ] - - async def read_luminescence( - self, plate: Plate, wells: List[Well], focal_height: float, integration_time: float = 1 - ) -> List[Dict]: - if not 4.5 <= focal_height <= 13.88: - raise ValueError("Focal height must be between 4.5 and 13.88") - - await self.set_plate(plate) - - cmd = f"3{14220 + int(1000*focal_height)}\x03" - await self.send_command("t", cmd) - - integration_time_seconds = int(integration_time) - assert 0 <= integration_time_seconds <= 60, "Integration time seconds must be between 0 and 60" - integration_time_milliseconds = integration_time - int(integration_time) - # TODO: I don't know if the multiple of 0.2 is a firmware requirement, but it's what gen5.exe requires. - # round because of floating point precision issues - assert ( - round(integration_time_milliseconds * 10) % 2 == 0 - ), "Integration time milliseconds must be a multiple of 0.2" - integration_time_seconds_s = str(integration_time_seconds * 5).zfill(2) - integration_time_milliseconds_s = str(int(float(integration_time_milliseconds * 50))).zfill(2) - - all_data: List[List[Optional[float]]] = [ - [None for _ in range(plate.num_items_x)] for _ in range(plate.num_items_y) - ] - for min_row, min_col, max_row, max_col in self._get_min_max_row_col_tuples(wells, plate): - cmd = f"008401{min_row+1:02}{min_col+1:02}{max_row+1:02}{max_col+1:02}000120010000110010000012300{integration_time_seconds_s}{integration_time_milliseconds_s}200200-001000-003000000000000000000013510" # 0812 - checksum = str((sum(cmd.encode()) + 8) % 100).zfill(2) # don't know why +8 - cmd = cmd + checksum - await self.send_command("D", cmd) - - resp = await self.send_command("O") - assert resp == b"\x060000\x03" - - # 2m10s of reading per 1 second of integration time - # allow 60 seconds flat - timeout = 60 + integration_time_seconds * (2 * 60 + 10) - body = await self._read_until(b"\x03", timeout=timeout) - assert body is not None - parsed_data = self._parse_body(body) - # Merge data - for r in range(plate.num_items_y): - for c in range(plate.num_items_x): - if parsed_data[r][c] is not None: - all_data[r][c] = parsed_data[r][c] - - # Get current temperature - try: - temp = await self.get_current_temperature() - except TimeoutError: - temp = float("nan") - - return [ - { - "data": all_data, - "temperature": temp, - "time": time.time(), - } - ] - - async def read_fluorescence( - self, - plate: Plate, - wells: List[Well], - excitation_wavelength: int, - emission_wavelength: int, - focal_height: float, - ) -> List[Dict]: - if not 4.5 <= focal_height <= 13.88: - raise ValueError("Focal height must be between 4.5 and 13.88") - if not 250 <= excitation_wavelength <= 700: - raise ValueError("Excitation wavelength must be between 250 and 700") - if not 250 <= emission_wavelength <= 700: - raise ValueError("Emission wavelength must be between 250 and 700") - - await self.set_plate(plate) - - cmd = f"{614220 + int(1000*focal_height)}\x03" - await self.send_command("t", cmd) - - excitation_wavelength_str = str(excitation_wavelength).zfill(4) - emission_wavelength_str = str(emission_wavelength).zfill(4) - - all_data: List[List[Optional[float]]] = [ - [None for _ in range(plate.num_items_x)] for _ in range(plate.num_items_y) - ] - for min_row, min_col, max_row, max_col in self._get_min_max_row_col_tuples(wells, plate): - cmd = ( - f"008401{min_row+1:02}{min_col+1:02}{max_row+1:02}{max_col+1:02}0001200100001100100000135000100200200{excitation_wavelength_str}000" - f"{emission_wavelength_str}000000000000000000210011" - ) - checksum = str((sum(cmd.encode()) + 7) % 100).zfill(2) # don't know why +7 - cmd = cmd + checksum + "\x03" - resp = await self.send_command("D", cmd) - - resp = await self.send_command("O") - assert resp == b"\x060000\x03" - - body = await self._read_until(b"\x03", timeout=60 * 2) - assert body is not None - parsed_data = self._parse_body(body) - # Merge data - for r in range(plate.num_items_y): - for c in range(plate.num_items_x): - if parsed_data[r][c] is not None: - all_data[r][c] = parsed_data[r][c] - - # Get current temperature - try: - temp = await self.get_current_temperature() - except TimeoutError: - temp = float("nan") - - return [ - { - "ex_wavelength": excitation_wavelength, - "em_wavelength": emission_wavelength, - "data": all_data, - "temperature": temp, - "time": time.time(), - } - ] - - async def _abort(self) -> None: - await self.send_command("x", wait_for_response=False) - - class ShakeType(enum.IntEnum): - LINEAR = 0 - ORBITAL = 1 - - async def shake(self, shake_type: ShakeType, frequency: int) -> None: - """Warning: the duration for shaking has to be specified on the machine, and the maximum is - 16 minutes. As a hack, we start shaking for the maximum duration every time as long as stop - is not called. I think the machine might open the door at the end of the 16 minutes and then - move it back in. We have to find a way to shake continuously, which is possible in protocol-mode - with kinetics. - - Args: - frequency: speed, in mm. 360 CPM = 6mm; 410 CPM = 5mm; 493 CPM = 4mm; 567 CPM = 3mm; 731 CPM = 2mm; 1096 CPM = 1mm - """ - - max_duration = 16 * 60 # 16 minutes - self._shaking_started = asyncio.Event() - - async def shake_maximal_duration(): - """This method will start the shaking, but returns immediately after - shaking has started.""" - shake_type_bit = str(shake_type.value) - duration = str(max_duration).zfill(3) - assert 1 <= frequency <= 6, "Frequency must be between 1 and 6" - cmd = f"0033010101010100002000000013{duration}{shake_type_bit}{frequency}01" - checksum = str((sum(cmd.encode()) + 73) % 100).zfill(2) # don't know why +73 - cmd = cmd + checksum + "\x03" - await self.send_command("D", cmd) - - resp = await self.send_command("O") - assert resp == b"\x060000\x03" - - if not self._shaking_started.is_set(): - self._shaking_started.set() - - async def shake_continuous(): - while self._shaking: - await shake_maximal_duration() - - # short sleep allows = frequent checks for fast stopping - seconds_since_start: float = 0 - loop_wait_time = 0.25 - while seconds_since_start < max_duration and self._shaking: - seconds_since_start += loop_wait_time - await asyncio.sleep(loop_wait_time) - - self._shaking = True - self._shaking_task = asyncio.create_task(shake_continuous()) - - await self._shaking_started.wait() - - async def stop_shaking(self) -> None: - if self._shaking: - await self._abort() - self._shaking = False - if self._shaking_task is not None: - self._shaking_task.cancel() - try: - await self._shaking_task - except asyncio.CancelledError: - pass - self._shaking_task = None - def _get_device_info(self, cam): """Get device info for cameras.""" # should have keys: @@ -982,19 +493,19 @@ def _get_device_info(self, cam): return device_info def start_acquisition(self): - if self.cam is None: - raise RuntimeError("Camera is not initialized.") + if self._cam is None: + raise RuntimeError(f"{self.__class__.__name__}: Camera is not initialized.") if self._acquiring: return - retry(self.cam.BeginAcquisition) + retry(self._cam.BeginAcquisition) self._acquiring = True def stop_acquisition(self): - if self.cam is None: - raise RuntimeError("Camera is not initialized.") + if self._cam is None: + raise RuntimeError(f"{self.__class__.__name__}: Camera is not initialized.") if not self._acquiring: return - retry(self.cam.EndAcquisition) + retry(self._cam.EndAcquisition) self._acquiring = False async def led_on(self, intensity: int = 10): @@ -1080,14 +591,14 @@ async def set_position(self, x: float, y: float): await asyncio.sleep(0.1) async def set_auto_exposure(self, auto_exposure: Literal["off", "once", "continuous"]): - if self.cam is None: + if self._cam is None: raise ValueError("Camera not initialized. Run setup(use_cam=True) first.") - if self.cam.ExposureAuto.GetAccessMode() != PySpin.RW: + if self._cam.ExposureAuto.GetAccessMode() != PySpin.RW: raise RuntimeError("unable to write ExposureAuto") retry( - self.cam.ExposureAuto.SetValue, + self._cam.ExposureAuto.SetValue, { "off": PySpin.ExposureAuto_Off, "once": PySpin.ExposureAuto_Once, @@ -1102,7 +613,7 @@ async def set_exposure(self, exposure: Exposure): logger.debug("Exposure time is already set to %s", exposure) return - if self.cam is None: + if self._cam is None: raise ValueError("Camera not initialized. Run setup(use_cam=True) first.") # either set auto exposure to continuous, or turn off @@ -1112,19 +623,19 @@ async def set_exposure(self, exposure: Exposure): self._exposure = "machine-auto" return raise ValueError("exposure must be a number or 'auto'") - retry(self.cam.ExposureAuto.SetValue, PySpin.ExposureAuto_Off) + retry(self._cam.ExposureAuto.SetValue, PySpin.ExposureAuto_Off) # set exposure time (in microseconds) - if self.cam.ExposureTime.GetAccessMode() != PySpin.RW: + if self._cam.ExposureTime.GetAccessMode() != PySpin.RW: raise RuntimeError("unable to write ExposureTime") exposure_us = int(exposure * 1000) - min_et = retry(self.cam.ExposureTime.GetMin) + min_et = retry(self._cam.ExposureTime.GetMin) if exposure_us < min_et: raise ValueError(f"exposure must be >= {min_et}") - max_et = retry(self.cam.ExposureTime.GetMax) + max_et = retry(self._cam.ExposureTime.GetMax) if exposure_us > max_et: raise ValueError(f"exposure must be <= {max_et}") - retry(self.cam.ExposureTime.SetValue, exposure_us) + retry(self._cam.ExposureTime.SetValue, exposure_us) self._exposure = exposure async def select(self, row: int, column: int): @@ -1139,7 +650,7 @@ async def select(self, row: int, column: int): async def set_gain(self, gain: Gain): """gain of unknown units, or "machine-auto" """ - if self.cam is None: + if self._cam is None: raise ValueError("Camera not initialized. Run setup(use_cam=True) first.") if gain == self._gain: @@ -1149,7 +660,7 @@ async def set_gain(self, gain: Gain): if not (gain == "machine-auto" or 0 <= gain <= 30): raise ValueError("gain must be between 0 and 30 (inclusive), or 'auto'") - nodemap = self.cam.GetNodeMap() + nodemap = self._cam.GetNodeMap() # set/disable automatic gain node_gain_auto = PySpin.CEnumerationPtr(nodemap.GetNode("GainAuto")) @@ -1208,7 +719,7 @@ async def set_objective(self, objective: Objective): self._objective = objective async def set_imaging_mode(self, mode: ImagingMode, led_intensity: int): - if self.cam is None: + if self._cam is None: raise ValueError("Camera not initialized. Run setup(use_cam=True) first.") if mode == self._imaging_mode: @@ -1259,8 +770,8 @@ async def _acquire_image( color_processing_algorithm: int = SPINNAKER_COLOR_PROCESSING_ALGORITHM_HQ_LINEAR, pixel_format: int = PixelFormat_Mono8, ) -> Image: - assert self.cam is not None - nodemap = self.cam.GetNodeMap() + assert self._cam is not None + nodemap = self._cam.GetNodeMap() assert self.imaging_config is not None, "Need to set imaging_config first" @@ -1272,8 +783,8 @@ async def _acquire_image( try: node_softwaretrigger_cmd.Execute() - timeout = int(self.cam.ExposureTime.GetValue() / 1000 + 1000) # from example - image_result = self.cam.GetNextImage(timeout) + timeout = int(self._cam.ExposureTime.GetValue() / 1000 + 1000) # from example + image_result = self._cam.GetNextImage(timeout) if not image_result.IsIncomplete(): processor = PySpin.ImageProcessor() processor.SetColorProcessing(color_processing_algorithm) @@ -1331,7 +842,7 @@ async def capture( assert overlap is None, "not implemented yet" - if self.cam is None: + if self._cam is None: raise ValueError("Camera not initialized. Run setup(use_cam=True) first.") await self.set_plate(plate) @@ -1401,8 +912,26 @@ def image_size(magnification: float) -> Tuple[float, float]: if auto_stop_acquisition: self.stop_acquisition() - exposure_ms = float(self.cam.ExposureTime.GetValue()) / 1000 + exposure_ms = float(self._cam.ExposureTime.GetValue()) / 1000 assert self._focal_height is not None, "Focal height not set. Run set_focus() first." focal_height_val = float(self._focal_height) return ImagingResult(images=images, exposure_time=exposure_ms, focal_height=focal_height_val) + + +class Cytation5ImagingConfig(CytationImagingConfig): + def __init__(self, *args, **kwargs): + warnings.warn( + "`Cytation5ImagingConfig` is deprecated. Please use `CytationImagingConfig` instead. ", + FutureWarning, + ) + super().__init__(*args, **kwargs) + + +class Cytation5Backend(CytationBackend): + def __init__(self, *args, **kwargs): + warnings.warn( + "`Cytation5Backend` is deprecated. Please use `CytationBackend` instead. ", + FutureWarning, + ) + super().__init__(*args, **kwargs) diff --git a/pylabrobot/plate_reading/agilent_biotek_synergyh1_backend.py b/pylabrobot/plate_reading/agilent_biotek_synergyh1_backend.py new file mode 100644 index 00000000000..796cd1e5ba7 --- /dev/null +++ b/pylabrobot/plate_reading/agilent_biotek_synergyh1_backend.py @@ -0,0 +1,71 @@ +import asyncio +import logging +import time +from typing import Optional + +from pylibftdi import FtdiError + +from pylabrobot.plate_reading.agilent_biotek_backend import BioTekPlateReaderBackend + +logger = logging.getLogger(__name__) + + +class SynergyH1Backend(BioTekPlateReaderBackend): + """Backend for Agilent BioTek Synergy H1 plate readers.""" + + @property + def supports_heating(self): + return True + + @property + def supports_cooling(self): + return False + + @property + def focal_height_range(self): + return (4.5, 10.68) + + async def _read_until( + self, terminator: bytes, timeout: Optional[float] = None, chunk_size: int = 512 + ) -> bytes: + if timeout is None: + timeout = self.timeout + + deadline = time.time() + timeout + buf = bytearray() + + while True: + if time.time() > deadline: + logger.debug( + f"{self.__class__.__name__} _read_until timed out; partial buffer (hex): %s", buf.hex() + ) + raise TimeoutError( + f"{self.__class__.__name__} _read_until timed out waiting for {terminator!r}; partial={buf.hex()}" + ) + + try: + data = await self.io.read(chunk_size) + if len(data) == 0: + await asyncio.sleep(0.02) + continue + + buf.extend(data) + + if terminator in buf: + idx = buf.index(terminator) + len(terminator) + full = bytes(buf[:idx]) + logger.debug( + f"{self.__class__.__name__} _read_until received %d bytes (hex prefix): %s", + len(full), + full[:200].hex(), + ) + return full + + except FtdiError as e: + logger.warning( + f"{self.__class__.__name__} transient FtdiError while reading: %s — retrying", e + ) + await asyncio.sleep(0.05) + continue + except Exception: + raise diff --git a/pylabrobot/plate_reading/biotek_tests.py b/pylabrobot/plate_reading/biotek_tests.py index b4d6a167a1c..7e5aeaef2f0 100644 --- a/pylabrobot/plate_reading/biotek_tests.py +++ b/pylabrobot/plate_reading/biotek_tests.py @@ -5,7 +5,7 @@ import unittest.mock from typing import Iterator -from pylabrobot.plate_reading.biotek_backend import Cytation5Backend +from pylabrobot.plate_reading.agilent_biotek_cytation_backend import CytationBackend from pylabrobot.resources import CellVis_24_wellplate_3600uL_Fb, CellVis_96_wellplate_350uL_Fb @@ -18,7 +18,7 @@ class TestCytation5Backend(unittest.IsolatedAsyncioTestCase): """Tests for the Cytation5Backend.""" async def asyncSetUp(self): - self.backend = Cytation5Backend(timeout=0.1) + self.backend = CytationBackend(timeout=0.1) self.backend.io = unittest.mock.MagicMock() self.backend.io.setup = unittest.mock.AsyncMock() self.backend.io.stop = unittest.mock.AsyncMock() diff --git a/pylabrobot/plate_reading/chatterbox.py b/pylabrobot/plate_reading/chatterbox.py index 6f4da390abd..4def8f3a806 100644 --- a/pylabrobot/plate_reading/chatterbox.py +++ b/pylabrobot/plate_reading/chatterbox.py @@ -46,7 +46,7 @@ def row_label(r: int) -> str: # Header top = " " * (len(row_label(num_cols - 1)) + 1) + "|" for c in range(num_cols): - top += f"{c+1:>{cell_width}}|" + top += f"{c + 1:>{cell_width}}|" print(top) # Divider diff --git a/pylabrobot/plate_reading/clario_star_backend.py b/pylabrobot/plate_reading/clario_star_backend.py index 21ab5526c87..23435b86bc0 100644 --- a/pylabrobot/plate_reading/clario_star_backend.py +++ b/pylabrobot/plate_reading/clario_star_backend.py @@ -127,7 +127,7 @@ async def _wait_for_ready_and_return(self, ret, timeout=150): if len(command_status) != 24: logger.warning( - "unexpected response %s. I think a command status response is always 24 " "bytes", + "unexpected response %s. I think a command status response is always 24 bytes", command_status, ) continue @@ -140,7 +140,7 @@ async def _wait_for_ready_and_return(self, ret, timeout=150): if command_status[2] != 0x18 or command_status[3] != 0x0C or command_status[4] != 0x01: logger.warning( - "unexpected response %s. I think 18 0c 01 indicates a command status " "response", + "unexpected response %s. I think 18 0c 01 indicates a command status response", command_status, ) diff --git a/pylabrobot/plate_reading/imager.py b/pylabrobot/plate_reading/imager.py index 65d2e262b5b..9720435f641 100644 --- a/pylabrobot/plate_reading/imager.py +++ b/pylabrobot/plate_reading/imager.py @@ -102,7 +102,7 @@ def __init__( def _will_assign_resource(self, resource: Resource): if len(self.children) >= 1: raise ValueError( - f"Imager {self} already has a plate assigned " f"(attempting to assign {resource})" + f"Imager {self} already has a plate assigned (attempting to assign {resource})" ) def get_plate(self) -> Plate: diff --git a/pylabrobot/plate_reading/molecular_devices_backend.py b/pylabrobot/plate_reading/molecular_devices_backend.py index 434a1d5e4ac..ca94a084a71 100644 --- a/pylabrobot/plate_reading/molecular_devices_backend.py +++ b/pylabrobot/plate_reading/molecular_devices_backend.py @@ -520,7 +520,7 @@ async def _set_plate_position(self, settings: MolecularDevicesSettings) -> None: dy = loc_A1.y - loc_B1.y x_pos_cmd = f"!XPOS {top_left_well_center.x:.3f} {dx:.3f} {num_cols}" - y_pos_cmd = f"!YPOS {size_y-top_left_well_center.y:.3f} {dy:.3f} {num_rows}" + y_pos_cmd = f"!YPOS {size_y - top_left_well_center.y:.3f} {dy:.3f} {num_rows}" await self.send_command(x_pos_cmd) await self.send_command(y_pos_cmd)