Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 1 addition & 5 deletions src/comet/driver/ers/ac3.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
from comet.driver.generic import Instrument, InstrumentError
from typing import Optional


__all__ = ["AC3"]


Expand Down Expand Up @@ -63,9 +62,6 @@ def next_error(self) -> Optional[InstrumentError]:
return InstrumentError(code, self.ERROR_MESSAGES.get(code, "unknown error"))
return None

def __init__(self, resource):
self._resource = resource

def _query(self, message: str) -> str:
"""Send query and validate response.

Expand All @@ -78,7 +74,7 @@ def _query(self, message: str) -> str:
Raises:
RuntimeError: If device returns error
"""
response = self._resource.query(message)
response = self.resource.query(message)
if response.strip() == "?":
raise RuntimeError(f"Command failed: {message}")
return response.strip()
Expand Down
110 changes: 89 additions & 21 deletions src/comet/emulator/ers/ac3.py
Original file line number Diff line number Diff line change
@@ -1,75 +1,143 @@
"""Driver for ECR AC3 thermal chuck"""

import time
from dataclasses import dataclass
from enum import IntEnum


from comet.emulator import Emulator
from comet.emulator import message, run

__all__ = ["AC3Emulator"]


class Mode(IntEnum):
NORMAL = 1
STANDBY = 2
DEFROST = 3
PURGE = 4


class Status(IntEnum):
TEMPERATURE_REACHED = 0
HEATING = 1
COOLING = 2
ERROR = 8


@dataclass
class State:
temperature: float = 25.0
target_temperature: float = 25.0
mode: Mode = Mode.NORMAL
control_status: Status = Status.TEMPERATURE_REACHED
hold_mode: int = 11
dewpoint_control_status: bool = True
dewpoint: float = -20.0


class Logic:
def __init__(self, state: State) -> None:
self.state = state
self.ramp_rate_c_per_s: float = 1.0
self.temperature_tolerance: float = 0.2
self._last_update_monotonic: float = time.monotonic()

def update_state(self) -> None:
now = time.monotonic()
dt = now - self._last_update_monotonic

state = self.state
active = state.mode in (Mode.NORMAL, Mode.STANDBY)

if active:
delta = state.target_temperature - state.temperature
max_step = self.ramp_rate_c_per_s * dt

if abs(delta) <= self.temperature_tolerance:
state.temperature = state.target_temperature
state.control_status = Status.TEMPERATURE_REACHED
else:
step = min(abs(delta), max_step)
if delta > 0:
state.temperature += step
state.control_status = Status.HEATING
else:
state.temperature -= step
state.control_status = Status.COOLING

# snap if close enough
if (
abs(state.target_temperature - state.temperature)
<= self.temperature_tolerance
):
state.temperature = state.target_temperature
state.control_status = Status.TEMPERATURE_REACHED
else:
state.control_status = Status.TEMPERATURE_REACHED

self._last_update_monotonic = now


class AC3Emulator(Emulator):
def __init__(self) -> None:
super().__init__()

self.temperature: float = 25.0
self.target_temperature: float = 25.0
self.mode: int = 1
self.control_status: int = 1
self.hold_mode: int = 11
self.dewpoint_control_status: bool = True
self.dewpoint: float = -20.0
self.state = State()
self.logic = Logic(self.state)

@message(r"^RC$")
def get_temperature(self) -> str:
return f"C{int(self.temperature*10):+05d}"
self.logic.update_state()
return f"C{int(self.state.temperature * 10):+05d}"

@message(r"^RT$")
def get_target_temperature(self) -> str:
return f"T{int(self.target_temperature*10):+05d}"
return f"T{int(self.state.target_temperature * 10):+05d}"

@message(r"^ST([+-]\d{4})$")
def set_target_temperature(self, value: str) -> str:
self.target_temperature = float(value) / 10
self.temperature = self.target_temperature
self.state.target_temperature = float(value) / 10
return "OK"

@message(r"^RO$")
def get_mode(self) -> str:
return f"O{self.mode}"
return f"O{self.state.mode:d}"

@message(r"^SO([1234])$")
def set_mode(self, value: str) -> str:
self.mode = int(value)
self.state.mode = Mode(int(value))
return "OK"

@message(r"^RF$")
def get_dewpoint(self) -> str:
return f"F{int(self.dewpoint*10):+05d}"
return f"F{int(self.state.dewpoint * 10):+05d}"

@message(r"^RD$")
def get_dewpoint_control_status(self) -> str:
return f"D{int(self.dewpoint_control_status)}"
return f"D{int(self.state.dewpoint_control_status)}"

@message(r"^SD([01])$")
def set_dewpoint_control_status(self, value: str) -> str:
self.dewpoint_control_status = bool(int(value))
self.state.dewpoint_control_status = bool(int(value))
return "OK"

@message(r"^RH$")
def get_hold_mode(self) -> str:
return f"H{self.hold_mode:02d}"
return f"H{self.state.hold_mode:02d}"

@message(r"^SH([01])$")
def set_hold_mode(self, value: str) -> str:
val = int(value)
if val == 0:
self.hold_mode = 0
self.state.hold_mode = 0
else:
self.hold_mode = 11
self.state.hold_mode = 11
return "OK"

@message(r"^RI$")
def get_control_status(self) -> str:
return f"I{self.control_status}"
self.logic.update_state()
return f"I{self.state.control_status:d}"

@message(r"^RE$")
def get_error(self) -> str:
Expand Down
22 changes: 13 additions & 9 deletions tests/test_emulator_ers_ac3.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import pytest

from comet.emulator.ers.ac3 import AC3Emulator
from comet.emulator.ers.ac3 import AC3Emulator, Mode


@pytest.fixture
Expand All @@ -9,7 +9,7 @@ def emulator():


def test_identify(emulator):
assert emulator("RI") == "I1"
assert emulator("RI") == "I0"


def test_get_temperature(emulator):
Expand All @@ -22,10 +22,10 @@ def test_get_target_temperature(emulator):

def test_set_target_temperature(emulator):
assert emulator("ST+0300") == "OK"
assert emulator.target_temperature == 30.0
assert emulator.state.target_temperature == 30.0

assert emulator("ST-0300") == "OK"
assert emulator.target_temperature == -30.0
assert emulator.state.target_temperature == -30.0


def test_get_mode(emulator):
Expand All @@ -34,7 +34,7 @@ def test_get_mode(emulator):

def test_set_mode(emulator):
assert emulator("SO2") == "OK"
assert emulator.mode == 2
assert emulator.state.mode == Mode.STANDBY


def test_get_dewpoint(emulator):
Expand All @@ -47,10 +47,10 @@ def test_get_dewpoint_control_status(emulator):

def test_set_dewpoint_control_status(emulator):
assert emulator("SD0") == "OK"
assert not emulator.dewpoint_control_status
assert not emulator.state.dewpoint_control_status

assert emulator("SD1") == "OK"
assert emulator.dewpoint_control_status
assert emulator.state.dewpoint_control_status


def test_get_hold_mode(emulator):
Expand All @@ -59,14 +59,18 @@ def test_get_hold_mode(emulator):

def test_set_hold_mode(emulator):
assert emulator("SH1") == "OK"
assert emulator.hold_mode == 11
assert emulator.state.hold_mode == 11

assert emulator("SH0") == "OK"
assert emulator.hold_mode == 0
assert emulator.state.hold_mode == 0


def test_get_control_status(emulator):
assert emulator("RI") == "I0"
assert emulator("ST+0300") == "OK"
assert emulator("RI") == "I1"
assert emulator("ST-0300") == "OK"
assert emulator("RI") == "I2"


def test_get_error_code(emulator):
Expand Down
Loading