From 220bc6301def07c492c84e6d92ddbaf2c7a2d205 Mon Sep 17 00:00:00 2001 From: Bernhard Arnold Date: Wed, 18 Mar 2026 14:06:47 +0100 Subject: [PATCH] feature(ac3): add temperature gradient logic to emulator Changing target temperature now updates readings over time using a simple gradient. --- src/comet/driver/ers/ac3.py | 6 +- src/comet/emulator/ers/ac3.py | 110 ++++++++++++++++++++++++++------- tests/test_emulator_ers_ac3.py | 22 ++++--- 3 files changed, 103 insertions(+), 35 deletions(-) diff --git a/src/comet/driver/ers/ac3.py b/src/comet/driver/ers/ac3.py index 4ce16b0..5780f0c 100644 --- a/src/comet/driver/ers/ac3.py +++ b/src/comet/driver/ers/ac3.py @@ -3,7 +3,6 @@ from comet.driver.generic import Instrument, InstrumentError from typing import Optional - __all__ = ["AC3"] @@ -63,9 +62,6 @@ def next_error(self) -> Optional[InstrumentError]: return InstrumentError(code, self.ERROR_MESSAGES.get(code, "unknown error")) return None - def __init__(self, resource): - self._resource = resource - def _query(self, message: str) -> str: """Send query and validate response. @@ -78,7 +74,7 @@ def _query(self, message: str) -> str: Raises: RuntimeError: If device returns error """ - response = self._resource.query(message) + response = self.resource.query(message) if response.strip() == "?": raise RuntimeError(f"Command failed: {message}") return response.strip() diff --git a/src/comet/emulator/ers/ac3.py b/src/comet/emulator/ers/ac3.py index a5b6c20..0eb4143 100644 --- a/src/comet/emulator/ers/ac3.py +++ b/src/comet/emulator/ers/ac3.py @@ -1,75 +1,143 @@ """Driver for ECR AC3 thermal chuck""" +import time +from dataclasses import dataclass +from enum import IntEnum + + from comet.emulator import Emulator from comet.emulator import message, run __all__ = ["AC3Emulator"] +class Mode(IntEnum): + NORMAL = 1 + STANDBY = 2 + DEFROST = 3 + PURGE = 4 + + +class Status(IntEnum): + TEMPERATURE_REACHED = 0 + HEATING = 1 + COOLING = 2 + ERROR = 8 + + +@dataclass +class State: + temperature: float = 25.0 + target_temperature: float = 25.0 + mode: Mode = Mode.NORMAL + control_status: Status = Status.TEMPERATURE_REACHED + hold_mode: int = 11 + dewpoint_control_status: bool = True + dewpoint: float = -20.0 + + +class Logic: + def __init__(self, state: State) -> None: + self.state = state + self.ramp_rate_c_per_s: float = 1.0 + self.temperature_tolerance: float = 0.2 + self._last_update_monotonic: float = time.monotonic() + + def update_state(self) -> None: + now = time.monotonic() + dt = now - self._last_update_monotonic + + state = self.state + active = state.mode in (Mode.NORMAL, Mode.STANDBY) + + if active: + delta = state.target_temperature - state.temperature + max_step = self.ramp_rate_c_per_s * dt + + if abs(delta) <= self.temperature_tolerance: + state.temperature = state.target_temperature + state.control_status = Status.TEMPERATURE_REACHED + else: + step = min(abs(delta), max_step) + if delta > 0: + state.temperature += step + state.control_status = Status.HEATING + else: + state.temperature -= step + state.control_status = Status.COOLING + + # snap if close enough + if ( + abs(state.target_temperature - state.temperature) + <= self.temperature_tolerance + ): + state.temperature = state.target_temperature + state.control_status = Status.TEMPERATURE_REACHED + else: + state.control_status = Status.TEMPERATURE_REACHED + + self._last_update_monotonic = now + + class AC3Emulator(Emulator): def __init__(self) -> None: super().__init__() - - self.temperature: float = 25.0 - self.target_temperature: float = 25.0 - self.mode: int = 1 - self.control_status: int = 1 - self.hold_mode: int = 11 - self.dewpoint_control_status: bool = True - self.dewpoint: float = -20.0 + self.state = State() + self.logic = Logic(self.state) @message(r"^RC$") def get_temperature(self) -> str: - return f"C{int(self.temperature*10):+05d}" + self.logic.update_state() + return f"C{int(self.state.temperature * 10):+05d}" @message(r"^RT$") def get_target_temperature(self) -> str: - return f"T{int(self.target_temperature*10):+05d}" + return f"T{int(self.state.target_temperature * 10):+05d}" @message(r"^ST([+-]\d{4})$") def set_target_temperature(self, value: str) -> str: - self.target_temperature = float(value) / 10 - self.temperature = self.target_temperature + self.state.target_temperature = float(value) / 10 return "OK" @message(r"^RO$") def get_mode(self) -> str: - return f"O{self.mode}" + return f"O{self.state.mode:d}" @message(r"^SO([1234])$") def set_mode(self, value: str) -> str: - self.mode = int(value) + self.state.mode = Mode(int(value)) return "OK" @message(r"^RF$") def get_dewpoint(self) -> str: - return f"F{int(self.dewpoint*10):+05d}" + return f"F{int(self.state.dewpoint * 10):+05d}" @message(r"^RD$") def get_dewpoint_control_status(self) -> str: - return f"D{int(self.dewpoint_control_status)}" + return f"D{int(self.state.dewpoint_control_status)}" @message(r"^SD([01])$") def set_dewpoint_control_status(self, value: str) -> str: - self.dewpoint_control_status = bool(int(value)) + self.state.dewpoint_control_status = bool(int(value)) return "OK" @message(r"^RH$") def get_hold_mode(self) -> str: - return f"H{self.hold_mode:02d}" + return f"H{self.state.hold_mode:02d}" @message(r"^SH([01])$") def set_hold_mode(self, value: str) -> str: val = int(value) if val == 0: - self.hold_mode = 0 + self.state.hold_mode = 0 else: - self.hold_mode = 11 + self.state.hold_mode = 11 return "OK" @message(r"^RI$") def get_control_status(self) -> str: - return f"I{self.control_status}" + self.logic.update_state() + return f"I{self.state.control_status:d}" @message(r"^RE$") def get_error(self) -> str: diff --git a/tests/test_emulator_ers_ac3.py b/tests/test_emulator_ers_ac3.py index 04d0fdc..48c7458 100644 --- a/tests/test_emulator_ers_ac3.py +++ b/tests/test_emulator_ers_ac3.py @@ -1,6 +1,6 @@ import pytest -from comet.emulator.ers.ac3 import AC3Emulator +from comet.emulator.ers.ac3 import AC3Emulator, Mode @pytest.fixture @@ -9,7 +9,7 @@ def emulator(): def test_identify(emulator): - assert emulator("RI") == "I1" + assert emulator("RI") == "I0" def test_get_temperature(emulator): @@ -22,10 +22,10 @@ def test_get_target_temperature(emulator): def test_set_target_temperature(emulator): assert emulator("ST+0300") == "OK" - assert emulator.target_temperature == 30.0 + assert emulator.state.target_temperature == 30.0 assert emulator("ST-0300") == "OK" - assert emulator.target_temperature == -30.0 + assert emulator.state.target_temperature == -30.0 def test_get_mode(emulator): @@ -34,7 +34,7 @@ def test_get_mode(emulator): def test_set_mode(emulator): assert emulator("SO2") == "OK" - assert emulator.mode == 2 + assert emulator.state.mode == Mode.STANDBY def test_get_dewpoint(emulator): @@ -47,10 +47,10 @@ def test_get_dewpoint_control_status(emulator): def test_set_dewpoint_control_status(emulator): assert emulator("SD0") == "OK" - assert not emulator.dewpoint_control_status + assert not emulator.state.dewpoint_control_status assert emulator("SD1") == "OK" - assert emulator.dewpoint_control_status + assert emulator.state.dewpoint_control_status def test_get_hold_mode(emulator): @@ -59,14 +59,18 @@ def test_get_hold_mode(emulator): def test_set_hold_mode(emulator): assert emulator("SH1") == "OK" - assert emulator.hold_mode == 11 + assert emulator.state.hold_mode == 11 assert emulator("SH0") == "OK" - assert emulator.hold_mode == 0 + assert emulator.state.hold_mode == 0 def test_get_control_status(emulator): + assert emulator("RI") == "I0" + assert emulator("ST+0300") == "OK" assert emulator("RI") == "I1" + assert emulator("ST-0300") == "OK" + assert emulator("RI") == "I2" def test_get_error_code(emulator):