Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 1 addition & 14 deletions pylabrobot/thermocycling/__init__.py
Original file line number Diff line number Diff line change
@@ -1,20 +1,7 @@
"""This module contains the thermocycling related classes and functions."""

from .backend import ThermocyclerBackend
from .chatterbox import ThermocyclerChatterboxBackend
from .opentrons import OpentronsThermocyclerModuleV1, OpentronsThermocyclerModuleV2
from .opentrons_backend import OpentronsThermocyclerBackend
from .proflex import ProflexBackend
from .standard import Step
from .thermo_fisher import *
from .thermocycler import Thermocycler

__all__ = [
"ThermocyclerBackend",
"ThermocyclerChatterboxBackend",
"Thermocycler",
"ProflexBackend",
"Step",
"OpentronsThermocyclerBackend",
"OpentronsThermocyclerModuleV1",
"OpentronsThermocyclerModuleV2",
]
2 changes: 2 additions & 0 deletions pylabrobot/thermocycling/thermo_fisher/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
from .atc import ATCBackend
from .proflex import ProflexBackend
19 changes: 19 additions & 0 deletions pylabrobot/thermocycling/thermo_fisher/atc.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
from pylabrobot.thermocycling.thermo_fisher.thermo_fisher_thermocycler import (
ThermoFisherThermocyclerBackend,
)


class ATCBackend(ThermoFisherThermocyclerBackend):
async def close_lid(self):
if self.bid != "31":
raise NotImplementedError("Lid control is only available for BID 31 (ATC)")
res = await self.send_command({"cmd": "lidclose"}, response_timeout=20, read_once=False)
if self._parse_scpi_response(res)["status"] != "OK":
raise ValueError("Failed to close lid")

async def open_lid(self):
if self.bid != "31":
raise NotImplementedError("Lid control is only available for BID 31 (ATC)")
res = await self.send_command({"cmd": "lidopen"}, response_timeout=20, read_once=False)
if self._parse_scpi_response(res)["status"] != "OK":
raise ValueError("Failed to open lid")
11 changes: 11 additions & 0 deletions pylabrobot/thermocycling/thermo_fisher/proflex.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
from pylabrobot.thermocycling.thermo_fisher.thermo_fisher_thermocycler import (
ThermoFisherThermocyclerBackend,
)


class ProflexBackend(ThermoFisherThermocyclerBackend):
async def open_lid(self):
raise NotImplementedError("Open lid command is not implemented for Proflex thermocycler")

async def close_lid(self):
raise NotImplementedError("Close lid command is not implemented for Proflex thermocycler")
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@
import unittest
import unittest.mock

from pylabrobot.thermocycling.proflex import ProflexBackend
from pylabrobot.thermocycling.standard import Protocol, Stage, Step
from pylabrobot.thermocycling.thermo_fisher.proflex import ProflexBackend


class TestProflexBackend(unittest.IsolatedAsyncioTestCase):
Expand Down Expand Up @@ -168,7 +168,7 @@ async def test_run_protocol(self):
-cover= 105
-mode= Fast
-coverEnabled= On
-notes=
-notes=\x20
</multiline.write>
"""
).strip()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,16 @@
import logging
import re
import xml.etree.ElementTree as ET
from abc import ABCMeta
from base64 import b64decode
from dataclasses import dataclass
from typing import Any, Dict, List, Optional, cast
from xml.dom import minidom

from pylabrobot.io import Socket
from pylabrobot.thermocycling.backend import ThermocyclerBackend
from pylabrobot.thermocycling.standard import LidStatus, Protocol, Stage, Step

from .backend import ThermocyclerBackend


def _generate_run_info_files(
protocol: Protocol,
Expand Down Expand Up @@ -203,7 +203,7 @@ def stage_to_scpi(stage: Stage, stage_index: int, stage_name_prefix: str) -> dic
return data


class ProflexBackend(ThermocyclerBackend):
class ThermoFisherThermocyclerBackend(ThermocyclerBackend, metaclass=ABCMeta):
"""Backend for Proflex thermocycler."""

def __init__(self, ip: str, port: int = 7000, shared_secret: bytes = b"f4ct0rymt55"):
Expand Down Expand Up @@ -401,8 +401,11 @@ async def _load_num_blocks_and_type(self):
elif self.bid == "13":
self._num_blocks = 3
self.num_temp_zones = 2
elif self.bid == "31":
self._num_blocks = 1
self.num_temp_zones = 1
else:
raise NotImplementedError("Only BID 12 and 13 are supported")
raise NotImplementedError("Only BID 31, 12 and 13 are supported")

async def is_block_running(self, block_id: int) -> bool:
run_name = await self.get_run_name(block_id=block_id)
Expand Down Expand Up @@ -778,7 +781,7 @@ async def get_run_info(self, protocol: Protocol, block_id: int) -> "RunProgress"
run_name = await self.get_run_name(block_id=block_id)
if not progress:
self.logger.info("Protocol completed")
return ProflexBackend.RunProgress(
return ThermoFisherThermocyclerBackend.RunProgress(
running=False,
stage="completed",
elapsed_time=await self.get_elapsed_run_time_from_log(run_name=run_name),
Expand All @@ -788,7 +791,7 @@ async def get_run_info(self, protocol: Protocol, block_id: int) -> "RunProgress"
if progress["RunTitle"] == "-":
await self._read_response(timeout=5)
self.logger.info("Protocol completed")
return ProflexBackend.RunProgress(
return ThermoFisherThermocyclerBackend.RunProgress(
running=False,
stage="completed",
elapsed_time=await self.get_elapsed_run_time_from_log(run_name=run_name),
Expand All @@ -797,7 +800,7 @@ async def get_run_info(self, protocol: Protocol, block_id: int) -> "RunProgress"

if progress["Stage"] == "POSTRun":
self.logger.info("Protocol in POSTRun")
return ProflexBackend.RunProgress(
return ThermoFisherThermocyclerBackend.RunProgress(
running=True,
stage="POSTRun",
elapsed_time=await self.get_elapsed_run_time_from_log(run_name=run_name),
Expand All @@ -820,7 +823,7 @@ async def get_run_info(self, protocol: Protocol, block_id: int) -> "RunProgress"
break
await asyncio.sleep(5)
self.logger.info("Infinite hold")
return ProflexBackend.RunProgress(
return ThermoFisherThermocyclerBackend.RunProgress(
running=False,
stage="infinite_hold",
elapsed_time=time_elapsed,
Expand All @@ -829,7 +832,7 @@ async def get_run_info(self, protocol: Protocol, block_id: int) -> "RunProgress"

self.logger.info(f"Elapsed time: {time_elapsed}")
self.logger.info(f"Remaining time: {remaining_time}")
return ProflexBackend.RunProgress(
return ThermoFisherThermocyclerBackend.RunProgress(
running=True,
stage=progress["Stage"],
elapsed_time=time_elapsed,
Expand All @@ -852,12 +855,6 @@ async def setup(
await self.set_block_idle_temp(temp=block_idle_temp, block_id=block_index)
await self.set_cover_idle_temp(temp=cover_idle_temp, block_id=block_index)

async def open_lid(self):
raise NotImplementedError("Open lid command is not implemented for Proflex thermocycler")

async def close_lid(self):
raise NotImplementedError("Close lid command is not implemented for Proflex thermocycler")

async def deactivate_lid(self, block_id: Optional[int] = None):
assert block_id is not None, "block_id must be specified"
return await self.set_cover_idle_temp(temp=105, control_enabled=False, block_id=block_id)
Expand Down