In [1]:
%load_ext autoreload
%autoreload 2

import sys
import time
from pathlib import Path

project_root = Path.cwd().parent
sys.path.insert(0, str(project_root))
print(f"Project root added to sys.path: {project_root}")

from src.devices.dewmaster import DewMaster

Project root added to sys.path: /home/daniel/repos/lab-controller


In [None]:
dewmaster = DewMaster(port="/dev/ttyS3", baudrate=9600)

if dewmaster.connect():
    print("Connected to DewMaster.")
    dewmaster.set_output_interval(1)
    time.sleep(1)
    for i in range(10):
        dewmaster.ser.reset_input_buffer()
        dewmaster.ser.reset_output_buffer()
        line = dewmaster.ser.readline().decode(errors='ignore').strip()
        print(line)
        time.sleep(1)

data = dewmaster.poll()
print("Polled data:", data)
dewmaster.disconnect()

Connected to DewMaster.


PO1










Polled data: None
Disconnected DewMaster.


In [13]:
import serial
import time
import re
import logging
from typing import Optional, Dict, Tuple, List
from dataclasses import dataclass
from datetime import datetime

log = logging.getLogger(__name__)

# ----------------------------------------------------------------------
# Data structures
# ----------------------------------------------------------------------
@dataclass
class Measurement:
    """Parsed measurement line."""
    timestamp: datetime
    dp: float          # Dew Point (°C)
    at: float          # Ambient Temperature (°C)
    rh: float          # Relative Humidity (%)
    raw: str           # Original line (for debugging)

    def __repr__(self):
        return (f"DP={self.dp:6.2f}°C  AT={self.at:6.2f}°C  RH={self.rh:6.2f}%  "
                f"[{self.timestamp:%H:%M:%S}]")


# ----------------------------------------------------------------------
# Main class
# ----------------------------------------------------------------------
class DewMaster:
    """
    Full-featured wrapper for the Edgetech DewMaster hygrometer (RS-232).

    Implements **every** command from the manual (Table 12-1) with:
      • Proper multi-step prompting
      • 2-digit field enforcement
      • Case-insensitivity
      • Full status report parsing
    """

    # ------------------------------------------------------------------
    # Parameter numbers (from manual)
    # ------------------------------------------------------------------
    PARAM = {
        "DP": 1, "AT": 2, "RH": 3, "PPMV": 4, "GR/LB": 5,
        "PSIA": 6, "PSIG": 7, "WET_BULB": 8
    }

    # ------------------------------------------------------------------
    # Regex for normal data lines (example from manual)
    # ------------------------------------------------------------------
    DATA_RE = re.compile(
        r"(?P<date>\d{2}/\d{2}/\d{2})\s+"
        r"(?P<time>\d{2}:\d{2}:\d{2})\s+"
        r"DP\s*=\s*(?P<dp>[-+]?\d*\.\d+)\s*C\s+"
        r"AT\s*=\s*(?P<at>[-+]?\d*\.\d+)\s*C\s+"
        r"RH\s*=\s*(?P<rh>[-+]?\d*\.\d+)"
    )

    # ------------------------------------------------------------------
    # Construction / context manager
    # ------------------------------------------------------------------
    def __init__(
        self,
        port: str,
        baudrate: int = 9600,
        timeout: float = 2.0,
        name: str = "DewMaster"
    ):
        self.port = port
        self.baudrate = baudrate
        self.timeout = timeout
        self.name = name
        self.ser: Optional[serial.Serial] = None

    def __enter__(self):
        self.connect()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.disconnect()

    # ------------------------------------------------------------------
    # Connection
    # ------------------------------------------------------------------
    def connect(self) -> None:
        """Open serial port and flush buffers."""
        try:
            self.ser = serial.Serial(
                port=self.port,
                baudrate=self.baudrate,
                bytesize=serial.EIGHTBITS,
                parity=serial.PARITY_NONE,
                stopbits=serial.STOPBITS_ONE,
                timeout=self.timeout,
                xonxoff=False,
                rtscts=False,
                dsrdtr=False,
            )
            time.sleep(1.0)
            self.ser.reset_input_buffer()
            self.ser.reset_output_buffer()
            log.info(f"{self.name} connected on {self.port} @ {self.baudrate} baud")
        except Exception as e:
            raise RuntimeError(f"Failed to open {self.name}: {e}")

    def disconnect(self) -> None:
        """Close serial port."""
        if self.ser and self.ser.is_open:
            self.ser.close()
            log.info(f"{self.name} disconnected.")

    # ------------------------------------------------------------------
    # Low-level I/O
    # ------------------------------------------------------------------
    def _write(self, data: str) -> None:
        """Write string + <CR>."""
        if not self.ser or not self.ser.is_open:
            raise RuntimeError("Serial port not open")
        payload = data + "\r"
        self.ser.write(payload.encode("ascii", errors="ignore"))
        self.ser.flush()
        time.sleep(0.05)   # tiny delay for instrument to react

    def _read_until_prompt(self, timeout: float = 3.0) -> List[str]:
        """Read lines until we see 'Input:' or timeout."""
        lines: List[str] = []
        start = time.time()
        while time.time() - start < timeout:
            line = self.ser.readline().decode("ascii", errors="ignore").strip()
            if line:
                lines.append(line)
                if line.upper().startswith("INPUT:"):
                    return lines
        return lines

    def _expect_input_prompt(self) -> None:
        """Wait for the instrument to echo 'Input: X'."""
        lines = self._read_until_prompt()
        if not any(l.upper().startswith("INPUT:") for l in lines):
            raise RuntimeError("Did not receive INPUT prompt")

    # ------------------------------------------------------------------
    # Generic command helpers
    # ------------------------------------------------------------------
    def _send_command(self, cmd: str, *args: str) -> List[str]:
        """
        Send a command that may require multiple <CR> steps.
        Returns all lines received after the final <CR>.
        """
        self._write(cmd)
        self._expect_input_prompt()

        responses: List[str] = []
        for arg in args:
            self._write(arg)
            resp = self._read_until_prompt()
            responses.extend(resp)

        return responses

    # ------------------------------------------------------------------
    # High-level commands (exact manual syntax)
    # ------------------------------------------------------------------

    # ---- ABC -------------------------------------------------------
    def start_abc(self) -> None:
        """AB <CR> – start a manual ABC cycle."""
        self._send_command("AB")

    def abc_enable(self, enable: bool) -> None:
        """AE <CR> – toggle ABC enable."""
        self._send_command("AE")

    def set_abc_start_time(self, hh: int, mm: int) -> None:
        """AS <CR> HH:MM <CR>"""
        t = f"{hh:02d}:{mm:02d}"
        self._send_command("AS", t)

    def set_abc_interval(self, hh: int, mm: int) -> None:
        """AI <CR> HH:MM <CR>"""
        t = f"{hh:02d}:{mm:02d}"
        self._send_command("AI", t)

    def abc_hold_toggle(self) -> None:
        """AH <CR> – toggle analog/serial hold during ABC."""
        self._send_command("AH")

    # ---- Alarms ----------------------------------------------------
    def set_alarm(
        self,
        alarm: int,
        direction: str,
        value: float,
        param: str
    ) -> None:
        """
        AL1 <CR> > +22.3 1 <CR>
        direction: '>' or '<'
        param: one of DP, AT, RH, PPMV, GR/LB, PSIA, PSIG, WET_BULB
        """
        if alarm not in (1, 2):
            raise ValueError("alarm must be 1 or 2")
        if direction not in (">", "<"):
            raise ValueError("direction must be '>' or '<'")
        pnum = self.PARAM[param.upper()]
        sign = "+" if value >= 0 else ""
        cmd = f"{direction} {sign}{value} {pnum}"
        self._send_command(f"AL{alarm}", cmd)

    def alarm_latch_toggle(self, alarm: int) -> None:
        """L1 <CR> or L2 <CR>"""
        if alarm not in (1, 2):
            raise ValueError("alarm must be 1 or 2")
        self._send_command(f"L{alarm}")

    # ---- Analog outputs --------------------------------------------
    def set_analog_output(
        self,
        channel: int,
        low: float,
        high: float,
        param: str
    ) -> None:
        """
        AO1 <CR> -33 45 2 <CR>
        low/high may be negative (include minus sign)
        """
        if channel not in (1, 2, 3):
            raise ValueError("channel must be 1, 2 or 3")
        pnum = self.PARAM[param.upper()]
        # Build string exactly as manual expects
        low_str = f"{low}" if low >= 0 else f"{low}"
        high_str = f"{high}" if high >= 0 else f"{high}"
        cmd = f"{low_str} {high_str} {pnum}"
        self._send_command(f"AO{channel}", cmd)

    # ---- Averaging -------------------------------------------------
    def set_averaging(self, samples: int) -> None:
        """AV <CR> 04 <CR>  (1-16)"""
        if not (1 <= samples <= 16):
            raise ValueError("samples must be 1-16")
        self._send_command("AV", f"{samples:02d}")

    # ---- Date / Time -----------------------------------------------
    def set_date(self, mm: int, dd: int, yy: int) -> None:
        """D <CR> 01/01/00 <CR>"""
        date_str = f"{mm:02d}/{dd:02d}/{yy:02d}"
        self._send_command("D", date_str)

    def set_time(self, hh: int, mm: int, ss: int) -> None:
        """T <CR> 23:59:59 <CR>"""
        time_str = f"{hh:02d}:{mm:02d}:{ss:02d}"
        self._send_command("T", time_str)

    # ---- Serial output interval ------------------------------------
    def set_output_interval(self, seconds: int) -> None:
        """O <CR> 3600 <CR>  (0-3600)"""
        if not (0 <= seconds <= 3600):
            raise ValueError("seconds must be 0-3600")
        self._send_command("O", str(seconds))

    # ---- Poll ------------------------------------------------------
    def poll(self) -> Optional[Measurement]:
        """P <CR> – request one measurement."""
        self._write("P")
        self._expect_input_prompt()
        # Read up to 2 seconds for the data line
        line = self.ser.readline().decode("ascii", errors="ignore").strip()
        return self._parse_measurement_line(line)

    # ---- Manual heat/cool ------------------------------------------
    def manual_heat(self, enable: bool) -> None:
        """MH <CR> – toggle manual max heat."""
        self._send_command("MH")

    def manual_cool(self, enable: bool) -> None:
        """MC <CR> – toggle manual max cool."""
        self._send_command("MC")

    # ---- Units -----------------------------------------------------
    def set_units(self, unit: str) -> None:
        """U <CR> C <CR> or U <CR> F <CR>"""
        if unit.upper() not in ("C", "F"):
            raise ValueError("unit must be C or F")
        self._send_command("U", unit.upper())

    # ---- Status report ---------------------------------------------
    def get_status(self) -> Dict:
        """ST <CR> – return full status report as dict."""
        self._write("ST")
        self._expect_input_prompt()
        lines = []
        while True:
            line = self.ser.readline().decode("ascii", errors="ignore").rstrip()
            if not line:
                continue
            lines.append(line)
            if "Press ENTER to continue" in line or "ESC to return" in line:
                break
        return self._parse_status_report(lines)

    # ------------------------------------------------------------------
    # Streaming
    # ------------------------------------------------------------------
    def stream(self, duration: float = 10.0, callback=None):
        """
        Stream automatic output for `duration` seconds.
        If `callback` is given, call it with each Measurement.
        """
        print(f"Streaming for {duration:.1f}s...")
        deadline = time.time() + duration
        while time.time() < deadline:
            line = self.ser.readline().decode("ascii", errors="ignore").strip()
            if not line:
                continue
            meas = self._parse_measurement_line(line)
            if meas:
                if callback:
                    callback(meas)
                else:
                    print(meas)

    # ------------------------------------------------------------------
    # Internal parsers
    # ------------------------------------------------------------------
    def _parse_measurement_line(self, line: str) -> Optional[Measurement]:
        m = self.DATA_RE.search(line)
        if not m:
            return None
        gd = m.groupdict()
        ts = datetime.strptime(f"{gd['date']} {gd['time']}", "%m/%d/%y %H:%M:%S")
        return Measurement(
            timestamp=ts,
            dp=float(gd['dp']),
            at=float(gd['at']),
            rh=float(gd['rh']),
            raw=line
        )

    def _parse_status_report(self, lines: List[str]) -> Dict:
        """Very light parsing – returns raw lines grouped by section."""
        report = {"raw": lines, "sections": {}}
        current = None
        for line in lines:
            line = line.strip()
            if not line:
                continue
            if line.startswith("ABC data:"):
                current = "ABC"
                report["sections"][current] = []
            elif line.startswith("ALARM data:"):
                current = "ALARM"
                report["sections"][current] = []
            elif line.startswith("ANALOG data:"):
                current = "ANALOG"
                report["sections"][current] = []
            elif line.startswith("DISPLAY data:"):
                current = "DISPLAY"
                report["sections"][current] = []
            elif line.startswith("SERIAL data:"):
                current = "SERIAL"
                report["sections"][current] = []
            elif current:
                report["sections"][current].append(line)
        return report


# ----------------------------------------------------------------------
# Example usage
# ----------------------------------------------------------------------
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)

    with DewMaster("/dev/ttyS3", baudrate=19200) as dm:
        # Basic poll
        meas = dm.poll()
        if meas:
            print("Poll:", meas)

        # Set output every 5 seconds
        dm.set_output_interval(5)

        # Stream for 15 seconds
        dm.stream(duration=15.0)

        # Set alarm 1: DP > 25.0 °C
        dm.set_alarm(alarm=1, direction=">", value=25.0, param="DP")

        # Get full status
        status = dm.get_status()
        print("\n--- STATUS REPORT ---")
        for section, data in status["sections"].items():
            print(f"\n[{section}]")
            for l in data:
                print("  " + l)

INFO:__main__:DewMaster connected on /dev/ttyS3 @ 19200 baud
INFO:__main__:DewMaster disconnected.


RuntimeError: Did not receive INPUT prompt