diff --git a/pylabrobot/plate_reading/biotek_backend.py b/pylabrobot/plate_reading/biotek_backend.py index c86fd4ec2d1..13b2d9852be 100644 --- a/pylabrobot/plate_reading/biotek_backend.py +++ b/pylabrobot/plate_reading/biotek_backend.py @@ -6,7 +6,7 @@ import re import time from dataclasses import dataclass -from typing import List, Literal, Optional, Tuple, Union +from typing import Dict, Iterable, List, Literal, Optional, Tuple, Union from pylabrobot.resources import Plate, Well @@ -72,6 +72,54 @@ def retry(func, *args, **kwargs): time.sleep(delay) +def _non_overlapping_rectangles( + points: Iterable[Tuple[int, int]], +) -> List[Tuple[int, int, int, int]]: + """Find non-overlapping rectangles that cover all given points. + + Example: + >>> points = [ + >>> (1, 1), + >>> (2, 2), (2, 3), (2, 4), + >>> (3, 2), (3, 3), (3, 4), + >>> (4, 2), (4, 3), (4, 4), (4, 5), + >>> (5, 2), (5, 3), (5, 4), (5, 5), + >>> (6, 2), (6, 3), (6, 4), (6, 5), + >>> (7, 2), (7, 3), (7, 4), + >>> ] + >>> non_overlapping_rectangles(points) + [ + (1, 1, 1, 1), + (2, 2, 7, 4), + (4, 5, 6, 5), + ] + """ + + pts = set(points) + rects = [] + + while pts: + # start a rectangle from one arbitrary point + r0, c0 = min(pts) + # expand right + c1 = c0 + while (r0, c1 + 1) in pts: + c1 += 1 + # expand downward as long as entire row segment is filled + r1 = r0 + while all((r1 + 1, c) in pts for c in range(c0, c1 + 1)): + r1 += 1 + + rects.append((r0, c0, r1, c1)) + # remove covered points + for r in range(r0, r1 + 1): + for c in range(c0, c1 + 1): + pts.discard((r, c)) + + rects.sort() + return rects + + class Cytation5Backend(ImageReaderBackend): """Backend for biotek cytation 5 image reader. @@ -564,16 +612,14 @@ async def set_temperature(self, temperature: float): async def stop_heating_or_cooling(self): return await self.send_command("g", "00000") - def _parse_body(self, body: bytes) -> List[List[Optional[float]]]: - start_index = body.index(b"01,01") + def _parse_body(self, body: bytes) -> Dict[Tuple[int, int], float]: + start_index = 22 end_index = body.rindex(b"\r\n") num_rows = 8 rows = body[start_index:end_index].split(b"\r\n,")[:num_rows] assert self._plate is not None, "Plate must be set before reading data" - parsed_data: List[List[Optional[float]]] = [ - [None for _ in range(self._plate.num_items_x)] for _ in range(self._plate.num_items_y) - ] + parsed_data: Dict[Tuple[int, int], float] = {} for row in rows: values = row.split(b",") grouped_values = [values[i : i + 3] for i in range(0, len(values), 3)] @@ -583,10 +629,20 @@ def _parse_body(self, body: bytes) -> List[List[Optional[float]]]: row_index = int(group[0].decode()) - 1 # 1-based index in the response column_index = int(group[1].decode()) - 1 # 1-based index in the response value = float(group[2].decode()) - parsed_data[row_index][column_index] = value + parsed_data[(row_index, column_index)] = value return parsed_data + def _data_dict_to_list( + self, data: Dict[Tuple[int, int], float], plate: Plate + ) -> List[List[Optional[float]]]: + result: List[List[Optional[float]]] = [ + [None for _ in range(plate.num_items_x)] for _ in range(plate.num_items_y) + ] + for (row, col), value in data.items(): + result[row][col] = value + return result + async def set_plate(self, plate: Plate): # 08120112207434014351135308559127881422 # ^^^^ plate size z @@ -640,24 +696,14 @@ async def set_plate(self, plate: Plate): self._plate = plate return resp - def _get_min_max_row_col(self, wells: List[Well], plate: Plate) -> Tuple[int, int, int, int]: + def _get_min_max_row_col_tuples( + self, wells: List[Well], plate: Plate + ) -> List[Tuple[int, int, int, int]]: # min_row, min_col, max_row, max_col # check if all wells are in the same plate plates = set(well.parent for well in wells) if len(plates) != 1 or plates.pop() != plate: raise ValueError("All wells must be in the specified plate") - - # check if wells are in a grid - rows = sorted(set(well.get_row() for well in wells)) - columns = sorted(set(well.get_column() for well in wells)) - min_row, max_row, min_col, max_col = rows[0], rows[-1], columns[0], columns[-1] - if ( - (len(rows) * len(columns) != len(wells)) - or rows != list(range(min_row, max_row + 1)) - or columns != list(range(min_col, max_col + 1)) - ): - raise ValueError("Wells must be in a grid") - - return min_row, max_row, min_col, max_col + return _non_overlapping_rectangles((well.get_row(), well.get_column()) for well in wells) async def read_absorbance( self, plate: Plate, wells: List[Well], wavelength: int @@ -668,19 +714,22 @@ async def read_absorbance( await self.set_plate(plate) wavelength_str = str(wavelength).zfill(4) - min_row, max_row, min_col, max_col = self._get_min_max_row_col(wells, plate) - cmd = f"004701{min_row+1:02}{min_col+1:02}{max_row+1:02}{max_col+1:02}000120010000110010000010600008{wavelength_str}1" - checksum = str(sum(cmd.encode()) % 100).zfill(2) - cmd = cmd + checksum + "\x03" - await self.send_command("D", cmd) + data: Dict[Tuple[int, int], float] = {} - resp = await self.send_command("O") - assert resp == b"\x060000\x03" + for min_row, min_col, max_row, max_col in self._get_min_max_row_col_tuples(wells, plate): + cmd = f"004701{min_row+1:02}{min_col+1:02}{max_row+1:02}{max_col+1:02}000120010000110010000010600008{wavelength_str}1" + checksum = str(sum(cmd.encode()) % 100).zfill(2) + cmd = cmd + checksum + "\x03" + await self.send_command("D", cmd) - # read data - body = await self._read_until(b"\x03", timeout=60 * 3) - assert resp is not None - return self._parse_body(body) + resp = await self.send_command("O") + assert resp == b"\x060000\x03" + + # read data + body = await self._read_until(b"\x03", timeout=60 * 3) + assert resp is not None + data.update(self._parse_body(body)) + return self._data_dict_to_list(data, plate) async def read_luminescence( self, plate: Plate, wells: List[Well], focal_height: float, integration_time: float = 1 @@ -704,22 +753,23 @@ async def read_luminescence( integration_time_seconds_s = str(integration_time_seconds * 5).zfill(2) integration_time_milliseconds_s = str(int(float(integration_time_milliseconds * 50))).zfill(2) - min_row, max_row, min_col, max_col = self._get_min_max_row_col(wells, plate) - - cmd = f"008401{min_row+1:02}{min_col+1:02}{max_row+1:02}{max_col+1:02}000120010000110010000012300{integration_time_seconds_s}{integration_time_milliseconds_s}200200-001000-003000000000000000000013510" # 0812 - checksum = str((sum(cmd.encode()) + 8) % 100).zfill(2) # don't know why +8 - cmd = cmd + checksum - await self.send_command("D", cmd) + data: Dict[Tuple[int, int], float] = {} + for min_row, min_col, max_row, max_col in self._get_min_max_row_col_tuples(wells, plate): + cmd = f"008401{min_row+1:02}{min_col+1:02}{max_row+1:02}{max_col+1:02}000120010000110010000012300{integration_time_seconds_s}{integration_time_milliseconds_s}200200-001000-003000000000000000000013510" # 0812 + checksum = str((sum(cmd.encode()) + 8) % 100).zfill(2) # don't know why +8 + cmd = cmd + checksum + await self.send_command("D", cmd) - resp = await self.send_command("O") - assert resp == b"\x060000\x03" + resp = await self.send_command("O") + assert resp == b"\x060000\x03" - # 2m10s of reading per 1 second of integration time - # allow 60 seconds flat - timeout = 60 + integration_time_seconds * (2 * 60 + 10) - body = await self._read_until(b"\x03", timeout=timeout) - assert body is not None - return self._parse_body(body) + # 2m10s of reading per 1 second of integration time + # allow 60 seconds flat + timeout = 60 + integration_time_seconds * (2 * 60 + 10) + body = await self._read_until(b"\x03", timeout=timeout) + assert body is not None + data.update(self._parse_body(body)) + return self._data_dict_to_list(data, plate) async def read_fluorescence( self, @@ -743,21 +793,24 @@ async def read_fluorescence( excitation_wavelength_str = str(excitation_wavelength).zfill(4) emission_wavelength_str = str(emission_wavelength).zfill(4) - min_row, max_row, min_col, max_col = self._get_min_max_row_col(wells, plate) - cmd = ( - f"008401{min_row+1:02}{min_col+1:02}{max_row+1:02}{max_col+1:02}0001200100001100100000135000100200200{excitation_wavelength_str}000" - f"{emission_wavelength_str}000000000000000000210011" - ) - checksum = str((sum(cmd.encode()) + 7) % 100).zfill(2) # don't know why +7 - cmd = cmd + checksum + "\x03" - resp = await self.send_command("D", cmd) - resp = await self.send_command("O") - assert resp == b"\x060000\x03" + data: Dict[Tuple[int, int], float] = {} + for min_row, min_col, max_row, max_col in self._get_min_max_row_col_tuples(wells, plate): + cmd = ( + f"008401{min_row+1:02}{min_col+1:02}{max_row+1:02}{max_col+1:02}0001200100001100100000135000100200200{excitation_wavelength_str}000" + f"{emission_wavelength_str}000000000000000000210011" + ) + checksum = str((sum(cmd.encode()) + 7) % 100).zfill(2) # don't know why +7 + cmd = cmd + checksum + "\x03" + resp = await self.send_command("D", cmd) + + resp = await self.send_command("O") + assert resp == b"\x060000\x03" - body = await self._read_until(b"\x03", timeout=60 * 2) - assert body is not None - return self._parse_body(body) + body = await self._read_until(b"\x03", timeout=60 * 2) + assert body is not None + data.update(self._parse_body(body)) + return self._data_dict_to_list(data, plate) async def _abort(self) -> None: await self.send_command("x", wait_for_response=False) diff --git a/pylabrobot/plate_reading/biotek_tests.py b/pylabrobot/plate_reading/biotek_tests.py index a91cace192d..51970e25fec 100644 --- a/pylabrobot/plate_reading/biotek_tests.py +++ b/pylabrobot/plate_reading/biotek_tests.py @@ -183,6 +183,61 @@ async def test_read_absorbance(self): [0.1427, 0.1174, 0.0684, 0.0657, 0.0732, 0.067, 0.0602, 0.079, 0.0667, 0.1103, 0.129, 0.1316], ] + async def test_read_luminescence_partial(self): + self.backend.io.read.side_effect = _byte_iter( + # plate + "\x06" + + "\x03" + # focal height + + "\x06" + + "\x03" + # read block 1 + + "\x06" + + "0350000000000000010000000000490300000\x03" + + "\x060000\x03" + + "01,1,\r000:00:00.0,237,01,01,0000003\r\n,02,01,0000003\r\n,03,01,0000005\r\n,04,01,0000004\r\n,05,01,0000003\r\n,06,01,0000002\r\n,07,01,0000000\r\n237\x1a132\x1a0000\x03" + # read block 2 + + "\x06" + + "0350000000000000010000000000030200000\x03" + + "\x060000\x03" + + "01,1,\r000:00:00.0,237,02,02,0000043,02,03,0000014\r\n,03,03,0000014,03,02,0000012\r\n,04,02,0000011,04,03,0000014\r\n,05,03,0000010,05,02,0000010\r\n,06,02,0000011,06,03,0000027\r\n,07,03,0000009,07,02,0000010\r\n237\x1a160\x1a0000\x03" + # read block 3 + + "\x06" + + "0350000000000000010000000000000170000\x03" + + "\x060000\x03" + + "01,1,\r000:00:00.0,237,04,04,0000018\r\n,05,04,0000017\r\n,06,04,0000014\r\n237\x1a210\x1a0000\x03" + ) + + plate = CellVis_96_wellplate_350uL_Fb(name="plate") + wells = plate["A1"] + plate["B1:G3"] + plate["D4:F4"] + resp = await self.backend.read_luminescence( + focal_height=4.5, integration_time=0.4, plate=plate, wells=wells + ) + + print(self.backend.io.write.mock_calls) + self.backend.io.write.assert_any_call(b"D") + self.backend.io.write.assert_any_call( + b"008401010107010001200100001100100000123000020200200-001000-00300000000000000000001351086" + ) + self.backend.io.write.assert_any_call( + b"008401020207030001200100001100100000123000020200200-001000-00300000000000000000001351090" + ) + self.backend.io.write.assert_any_call( + b"008401040406040001200100001100100000123000020200200-001000-00300000000000000000001351094" + ) + self.backend.io.write.assert_any_call(b"O") + + assert resp == [ + [3.0, None, None, None, None, None, None, None, None, None, None, None], + [3.0, 43.0, 14.0, None, None, None, None, None, None, None, None, None], + [5.0, 12.0, 14.0, None, None, None, None, None, None, None, None, None], + [4.0, 11.0, 14.0, 18.0, None, None, None, None, None, None, None, None], + [3.0, 10.0, 10.0, 17.0, None, None, None, None, None, None, None, None], + [2.0, 11.0, 27.0, 14.0, None, None, None, None, None, None, None, None], + [0.0, 10.0, 9.0, None, None, None, None, None, None, None, None, None], + [None, None, None, None, None, None, None, None, None, None, None, None], + ] + async def test_read_fluorescence(self): self.backend.io.read.side_effect = _byte_iter( "\x06"