Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
171 changes: 112 additions & 59 deletions pylabrobot/plate_reading/biotek_backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
import re
import time
from dataclasses import dataclass
from typing import List, Literal, Optional, Tuple, Union
from typing import Dict, Iterable, List, Literal, Optional, Tuple, Union

from pylabrobot.resources import Plate, Well

Expand Down Expand Up @@ -72,6 +72,54 @@ def retry(func, *args, **kwargs):
time.sleep(delay)


def _non_overlapping_rectangles(
points: Iterable[Tuple[int, int]],
) -> List[Tuple[int, int, int, int]]:
"""Find non-overlapping rectangles that cover all given points.

Example:
>>> points = [
>>> (1, 1),
>>> (2, 2), (2, 3), (2, 4),
>>> (3, 2), (3, 3), (3, 4),
>>> (4, 2), (4, 3), (4, 4), (4, 5),
>>> (5, 2), (5, 3), (5, 4), (5, 5),
>>> (6, 2), (6, 3), (6, 4), (6, 5),
>>> (7, 2), (7, 3), (7, 4),
>>> ]
>>> non_overlapping_rectangles(points)
[
(1, 1, 1, 1),
(2, 2, 7, 4),
(4, 5, 6, 5),
]
"""

pts = set(points)
rects = []

while pts:
# start a rectangle from one arbitrary point
r0, c0 = min(pts)
# expand right
c1 = c0
while (r0, c1 + 1) in pts:
c1 += 1
# expand downward as long as entire row segment is filled
r1 = r0
while all((r1 + 1, c) in pts for c in range(c0, c1 + 1)):
r1 += 1

rects.append((r0, c0, r1, c1))
# remove covered points
for r in range(r0, r1 + 1):
for c in range(c0, c1 + 1):
pts.discard((r, c))

rects.sort()
return rects


class Cytation5Backend(ImageReaderBackend):
"""Backend for biotek cytation 5 image reader.

Expand Down Expand Up @@ -564,16 +612,14 @@ async def set_temperature(self, temperature: float):
async def stop_heating_or_cooling(self):
return await self.send_command("g", "00000")

def _parse_body(self, body: bytes) -> List[List[Optional[float]]]:
start_index = body.index(b"01,01")
def _parse_body(self, body: bytes) -> Dict[Tuple[int, int], float]:
start_index = 22
end_index = body.rindex(b"\r\n")
num_rows = 8
rows = body[start_index:end_index].split(b"\r\n,")[:num_rows]

assert self._plate is not None, "Plate must be set before reading data"
parsed_data: List[List[Optional[float]]] = [
[None for _ in range(self._plate.num_items_x)] for _ in range(self._plate.num_items_y)
]
parsed_data: Dict[Tuple[int, int], float] = {}
for row in rows:
values = row.split(b",")
grouped_values = [values[i : i + 3] for i in range(0, len(values), 3)]
Expand All @@ -583,10 +629,20 @@ def _parse_body(self, body: bytes) -> List[List[Optional[float]]]:
row_index = int(group[0].decode()) - 1 # 1-based index in the response
column_index = int(group[1].decode()) - 1 # 1-based index in the response
value = float(group[2].decode())
parsed_data[row_index][column_index] = value
parsed_data[(row_index, column_index)] = value

return parsed_data

def _data_dict_to_list(
self, data: Dict[Tuple[int, int], float], plate: Plate
) -> List[List[Optional[float]]]:
result: List[List[Optional[float]]] = [
[None for _ in range(plate.num_items_x)] for _ in range(plate.num_items_y)
]
for (row, col), value in data.items():
result[row][col] = value
return result

async def set_plate(self, plate: Plate):
# 08120112207434014351135308559127881422
# ^^^^ plate size z
Expand Down Expand Up @@ -640,24 +696,14 @@ async def set_plate(self, plate: Plate):
self._plate = plate
return resp

def _get_min_max_row_col(self, wells: List[Well], plate: Plate) -> Tuple[int, int, int, int]:
def _get_min_max_row_col_tuples(
self, wells: List[Well], plate: Plate
) -> List[Tuple[int, int, int, int]]: # min_row, min_col, max_row, max_col
# check if all wells are in the same plate
plates = set(well.parent for well in wells)
if len(plates) != 1 or plates.pop() != plate:
raise ValueError("All wells must be in the specified plate")

# check if wells are in a grid
rows = sorted(set(well.get_row() for well in wells))
columns = sorted(set(well.get_column() for well in wells))
min_row, max_row, min_col, max_col = rows[0], rows[-1], columns[0], columns[-1]
if (
(len(rows) * len(columns) != len(wells))
or rows != list(range(min_row, max_row + 1))
or columns != list(range(min_col, max_col + 1))
):
raise ValueError("Wells must be in a grid")

return min_row, max_row, min_col, max_col
return _non_overlapping_rectangles((well.get_row(), well.get_column()) for well in wells)

async def read_absorbance(
self, plate: Plate, wells: List[Well], wavelength: int
Expand All @@ -668,19 +714,22 @@ async def read_absorbance(
await self.set_plate(plate)

wavelength_str = str(wavelength).zfill(4)
min_row, max_row, min_col, max_col = self._get_min_max_row_col(wells, plate)
cmd = f"004701{min_row+1:02}{min_col+1:02}{max_row+1:02}{max_col+1:02}000120010000110010000010600008{wavelength_str}1"
checksum = str(sum(cmd.encode()) % 100).zfill(2)
cmd = cmd + checksum + "\x03"
await self.send_command("D", cmd)
data: Dict[Tuple[int, int], float] = {}

resp = await self.send_command("O")
assert resp == b"\x060000\x03"
for min_row, min_col, max_row, max_col in self._get_min_max_row_col_tuples(wells, plate):
cmd = f"004701{min_row+1:02}{min_col+1:02}{max_row+1:02}{max_col+1:02}000120010000110010000010600008{wavelength_str}1"
checksum = str(sum(cmd.encode()) % 100).zfill(2)
cmd = cmd + checksum + "\x03"
await self.send_command("D", cmd)

# read data
body = await self._read_until(b"\x03", timeout=60 * 3)
assert resp is not None
return self._parse_body(body)
resp = await self.send_command("O")
assert resp == b"\x060000\x03"

# read data
body = await self._read_until(b"\x03", timeout=60 * 3)
assert resp is not None
data.update(self._parse_body(body))
return self._data_dict_to_list(data, plate)

async def read_luminescence(
self, plate: Plate, wells: List[Well], focal_height: float, integration_time: float = 1
Expand All @@ -704,22 +753,23 @@ async def read_luminescence(
integration_time_seconds_s = str(integration_time_seconds * 5).zfill(2)
integration_time_milliseconds_s = str(int(float(integration_time_milliseconds * 50))).zfill(2)

min_row, max_row, min_col, max_col = self._get_min_max_row_col(wells, plate)

cmd = f"008401{min_row+1:02}{min_col+1:02}{max_row+1:02}{max_col+1:02}000120010000110010000012300{integration_time_seconds_s}{integration_time_milliseconds_s}200200-001000-003000000000000000000013510" # 0812
checksum = str((sum(cmd.encode()) + 8) % 100).zfill(2) # don't know why +8
cmd = cmd + checksum
await self.send_command("D", cmd)
data: Dict[Tuple[int, int], float] = {}
for min_row, min_col, max_row, max_col in self._get_min_max_row_col_tuples(wells, plate):
cmd = f"008401{min_row+1:02}{min_col+1:02}{max_row+1:02}{max_col+1:02}000120010000110010000012300{integration_time_seconds_s}{integration_time_milliseconds_s}200200-001000-003000000000000000000013510" # 0812
checksum = str((sum(cmd.encode()) + 8) % 100).zfill(2) # don't know why +8
cmd = cmd + checksum
await self.send_command("D", cmd)

resp = await self.send_command("O")
assert resp == b"\x060000\x03"
resp = await self.send_command("O")
assert resp == b"\x060000\x03"

# 2m10s of reading per 1 second of integration time
# allow 60 seconds flat
timeout = 60 + integration_time_seconds * (2 * 60 + 10)
body = await self._read_until(b"\x03", timeout=timeout)
assert body is not None
return self._parse_body(body)
# 2m10s of reading per 1 second of integration time
# allow 60 seconds flat
timeout = 60 + integration_time_seconds * (2 * 60 + 10)
body = await self._read_until(b"\x03", timeout=timeout)
assert body is not None
data.update(self._parse_body(body))
return self._data_dict_to_list(data, plate)

async def read_fluorescence(
self,
Expand All @@ -743,21 +793,24 @@ async def read_fluorescence(

excitation_wavelength_str = str(excitation_wavelength).zfill(4)
emission_wavelength_str = str(emission_wavelength).zfill(4)
min_row, max_row, min_col, max_col = self._get_min_max_row_col(wells, plate)
cmd = (
f"008401{min_row+1:02}{min_col+1:02}{max_row+1:02}{max_col+1:02}0001200100001100100000135000100200200{excitation_wavelength_str}000"
f"{emission_wavelength_str}000000000000000000210011"
)
checksum = str((sum(cmd.encode()) + 7) % 100).zfill(2) # don't know why +7
cmd = cmd + checksum + "\x03"
resp = await self.send_command("D", cmd)

resp = await self.send_command("O")
assert resp == b"\x060000\x03"
data: Dict[Tuple[int, int], float] = {}
for min_row, min_col, max_row, max_col in self._get_min_max_row_col_tuples(wells, plate):
cmd = (
f"008401{min_row+1:02}{min_col+1:02}{max_row+1:02}{max_col+1:02}0001200100001100100000135000100200200{excitation_wavelength_str}000"
f"{emission_wavelength_str}000000000000000000210011"
)
checksum = str((sum(cmd.encode()) + 7) % 100).zfill(2) # don't know why +7
cmd = cmd + checksum + "\x03"
resp = await self.send_command("D", cmd)

resp = await self.send_command("O")
assert resp == b"\x060000\x03"

body = await self._read_until(b"\x03", timeout=60 * 2)
assert body is not None
return self._parse_body(body)
body = await self._read_until(b"\x03", timeout=60 * 2)
assert body is not None
data.update(self._parse_body(body))
return self._data_dict_to_list(data, plate)

async def _abort(self) -> None:
await self.send_command("x", wait_for_response=False)
Expand Down
55 changes: 55 additions & 0 deletions pylabrobot/plate_reading/biotek_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,61 @@ async def test_read_absorbance(self):
[0.1427, 0.1174, 0.0684, 0.0657, 0.0732, 0.067, 0.0602, 0.079, 0.0667, 0.1103, 0.129, 0.1316],
]

async def test_read_luminescence_partial(self):
self.backend.io.read.side_effect = _byte_iter(
# plate
"\x06"
+ "\x03"
# focal height
+ "\x06"
+ "\x03"
# read block 1
+ "\x06"
+ "0350000000000000010000000000490300000\x03"
+ "\x060000\x03"
+ "01,1,\r000:00:00.0,237,01,01,0000003\r\n,02,01,0000003\r\n,03,01,0000005\r\n,04,01,0000004\r\n,05,01,0000003\r\n,06,01,0000002\r\n,07,01,0000000\r\n237\x1a132\x1a0000\x03"
# read block 2
+ "\x06"
+ "0350000000000000010000000000030200000\x03"
+ "\x060000\x03"
+ "01,1,\r000:00:00.0,237,02,02,0000043,02,03,0000014\r\n,03,03,0000014,03,02,0000012\r\n,04,02,0000011,04,03,0000014\r\n,05,03,0000010,05,02,0000010\r\n,06,02,0000011,06,03,0000027\r\n,07,03,0000009,07,02,0000010\r\n237\x1a160\x1a0000\x03"
# read block 3
+ "\x06"
+ "0350000000000000010000000000000170000\x03"
+ "\x060000\x03"
+ "01,1,\r000:00:00.0,237,04,04,0000018\r\n,05,04,0000017\r\n,06,04,0000014\r\n237\x1a210\x1a0000\x03"
)

plate = CellVis_96_wellplate_350uL_Fb(name="plate")
wells = plate["A1"] + plate["B1:G3"] + plate["D4:F4"]
resp = await self.backend.read_luminescence(
focal_height=4.5, integration_time=0.4, plate=plate, wells=wells
)

print(self.backend.io.write.mock_calls)
self.backend.io.write.assert_any_call(b"D")
self.backend.io.write.assert_any_call(
b"008401010107010001200100001100100000123000020200200-001000-00300000000000000000001351086"
)
self.backend.io.write.assert_any_call(
b"008401020207030001200100001100100000123000020200200-001000-00300000000000000000001351090"
)
self.backend.io.write.assert_any_call(
b"008401040406040001200100001100100000123000020200200-001000-00300000000000000000001351094"
)
self.backend.io.write.assert_any_call(b"O")

assert resp == [
[3.0, None, None, None, None, None, None, None, None, None, None, None],
[3.0, 43.0, 14.0, None, None, None, None, None, None, None, None, None],
[5.0, 12.0, 14.0, None, None, None, None, None, None, None, None, None],
[4.0, 11.0, 14.0, 18.0, None, None, None, None, None, None, None, None],
[3.0, 10.0, 10.0, 17.0, None, None, None, None, None, None, None, None],
[2.0, 11.0, 27.0, 14.0, None, None, None, None, None, None, None, None],
[0.0, 10.0, 9.0, None, None, None, None, None, None, None, None, None],
[None, None, None, None, None, None, None, None, None, None, None, None],
]

async def test_read_fluorescence(self):
self.backend.io.read.side_effect = _byte_iter(
"\x06"
Expand Down