# Hello World, Inheco Incubator (Shaker)

This notebook provides a minimal working example to connect to and interact with an INHECO Incubator Shaker over a USB serial connection. It demonstrates how to initialize communication, issue basic commands, and interpret the device’s responses.

## About the Device and Setup

INHECO incubator shakers are modular devices used for temperature control and agitation in lab automation setups. Multiple devices can be connected in a stack via USB, each assigned a **device ID** and **subdevice number** (usually 0–5). Commands are sent via a serial protocol and must be carefully constructed with CRC checks and timeouts.

Ensure:
- The correct **serial port** is selected (e.g., `/dev/cu.usbserial-140` on macOS).
- You know the **device ID** for the incubator you're communicating with.
- Only one command is sent at a time — the device cannot queue or process multiple requests concurrently.

## Command Types

There are three classes of commands:

**Report Commands**

Used to **query** the device’s state or configuration. These include:
- `RFV0`: Read firmware version  
- `RFV2`: Read serial number  
- `RATx`: Read actual temperature (for subdevice `x`)

They return a string payload enclosed between header and terminator bytes.

**Set Commands**

Used to **configure** parameters or states. For example:
- `SCT020`: Set control temperature to 20°C  
- `SST1`: Set shaking speed to 1 Hz  

A successful response consists only of an acknowledgement terminator (`0x20 0x60`).

**Action Commands**

Used to **trigger actions**. Examples include:
- `AID`: Initialize device  
- `AOD`: Open lid  
- `ACD`: Close lid  

These may respond more slowly (up to 5 seconds), especially when mechanical motion is involved.

---

The notebook will walk through:
- Connecting to the device
- Reading firmware and serial number
- Opening and closing the lid
- Reading temperatures from subdevices

> 📌 This is an essential step for integrating INHECO hardware into larger lab automation workflows.


## Setup (Physical)

In [3]:
import asyncio
import serial
import logging
import time
import threading
from typing import Optional, List


class InhecoTask:
    def __init__(self, fut: asyncio.Future, cmd: str, timeout_time: float):
        self.fut = fut
        self.cmd = cmd
        self.timeout_time = timeout_time


class InhecoIncubatorShakerBackend:

    def __init__(self, port: str, device_id: int = 2):
        self.port = port
        self.device_id = device_id
        self.logger = logging.getLogger(__name__)
        self.ser = None
        self.write_timeout = 2.0
        self.read_timeout = 5.0
        self._waiting_tasks: List[InhecoTask] = []
        self._reading_thread: Optional[threading.Thread] = None
        self._reading_active = False
        self._rx_buffer = bytearray()

    async def setup(self):
        self.ser = serial.Serial(
            port=self.port,
            baudrate=19200,
            bytesize=serial.EIGHTBITS,
            parity=serial.PARITY_NONE,
            stopbits=serial.STOPBITS_ONE,
            timeout=0
        )
        self.ser.write_timeout = self.write_timeout

        fw_version = await self.read_firmware_version(subdevice=0)
        print(
            f"Connected to INHECO Incubator on {self.port}\n"
            f"Inheco Incubator firmware version: {fw_version}"
        )
        self.logger.info(
            f"Connected to INHECO Incubator on {self.port}\n"
            f"Inheco Incubator firmware version: {fw_version}"
        )
        await self.initialize(subdevice=0)

    async def stop(self):
        if self.ser and self.ser.is_open:
            self.ser.close()
            self.logger.info("Disconnected from INHECO Incubator Shaker")
        self._reading_active = False
        self._waiting_tasks.clear()

    async def write(self, data: bytes):
        self.logger.debug(f"-> {data.hex()}")
        self.ser.write(data)

    async def _read_response(self, timeout: Optional[float] = None) -> bytes:
        timeout = timeout or self.read_timeout
        fut: asyncio.Future = asyncio.get_event_loop().create_future()
        self._start_reading(fut, timeout=timeout)
        return await fut

    def _start_reading(self, fut: asyncio.Future, timeout: float):
        timeout_time = time.time() + timeout
        self._waiting_tasks.append(InhecoTask(fut=fut, cmd="", timeout_time=timeout_time))

        if not self._reading_active:
            self._reading_active = True
            self._reading_thread = threading.Thread(target=self._reading_loop, daemon=True)
            self._reading_thread.start()

    def _reading_loop(self):
        expected_header = 0xB0 + self.device_id
        expected_tail = bytes([expected_header, 0x20, 0x60])

        while self._waiting_tasks:
            now = time.time()
            for i in range(len(self._waiting_tasks) - 1, -1, -1):
                task = self._waiting_tasks[i]
                if now > task.timeout_time:
                    task.fut.get_loop().call_soon_threadsafe(
                        task.fut.set_exception,
                        TimeoutError("Timed out waiting for response")
                    )
                    del self._waiting_tasks[i]
                    continue

            if self.ser.in_waiting:
                data = self.ser.read(self.ser.in_waiting)
                if data:
                    self._rx_buffer.extend(data)
                    self.logger.debug(f"<- {data.hex()} (buffer={self._rx_buffer.hex()})")

                    # Check if full response received
                    if expected_tail in self._rx_buffer:
                        end = self._rx_buffer.index(expected_tail) + len(expected_tail)
                        response = bytes(self._rx_buffer[:end])
                        self._rx_buffer = self._rx_buffer[end:]

                        for task in self._waiting_tasks:
                            task.fut.get_loop().call_soon_threadsafe(
                                task.fut.set_result, response
                            )
                        self._waiting_tasks.clear()

            time.sleep(0.01)

        self._reading_active = False

    def _crc8(self, data: bytearray) -> int:
        crc = 0xA1
        for byte in data:
            d = byte
            for _ in range(8):
                if ((d ^ crc) & 1):
                    crc ^= 0x18
                    crc >>= 1
                    crc |= 0x80
                else:
                    crc >>= 1
                d >>= 1
        return crc & 0xFF

    def _build_message(self, command: str, subdevice: int = 0) -> bytes:
        if not (0 <= subdevice <= 5):
            raise ValueError("Subdevice must be between 0 and 5")
        full_command = f"T0{subdevice}{command}"
        cmd = full_command.encode("ascii")
        length = len(cmd) + 3
        address = 0x30 + self.device_id
        proto = 0xC0 + len(cmd)
        message = bytearray([length, address, proto]) + cmd
        crc = self._crc8(message)
        return bytes(message + bytearray([crc]))

    def _is_report_command(self, command: str) -> bool:
        return command.upper().startswith("R")

    def _parse_response_bytes(self, response: bytes) -> str:
        expected_header = 0xB0 + self.device_id
        expected_tail = bytes([expected_header, 0x20, 0x60])

        if not response.startswith(bytes([expected_header])):
            raise ValueError(f"Unexpected response header: {response[:1].hex()}")

        if not response.endswith(expected_tail):
            raise ValueError(f"Unexpected response terminator: {response[-3:].hex()}")

        # Strip leading header and trailing header+status+terminator
        core = response[1:-3]

        try:
            return core.decode("ascii").strip()
        except UnicodeDecodeError:
            cleaned = core.decode("ascii", errors="ignore").strip()
            self.logger.warning(f"Non-ASCII bytes stripped from payload: {core!r}")
            return cleaned

    async def send_command(self, command: str, subdevice: int = 0, delay: float = 0.2,
                           read_timeout: Optional[float] = None) -> str:
        msg = self._build_message(command, subdevice=subdevice)
        await self.write(msg)
        await asyncio.sleep(delay)
        response = await self._read_response(timeout=read_timeout)
        if not response:
            raise TimeoutError(f"No response from device for command: {command}")
        try:
            if self._is_report_command(command):
                return self._parse_response_bytes(response)
            else:
                self._validate_acknowledgement(response)
                return ""
        except Exception as e:
            self.logger.error(f"Error parsing response to {command}: {e}")
            raise

    def _validate_acknowledgement(self, response: bytes):
        expected_header = 0xB0 + self.device_id
        if not response.startswith(bytes([expected_header])):
            raise ValueError(f"Unexpected ack header: {response[:1].hex()}")
        if not response.endswith(bytes([expected_header, 0x20, 0x60])):
            raise ValueError(f"Unexpected ack terminator: {response[-3:].hex()}")

    # === Public API ===
    async def read_firmware_version(self, subdevice: int = 0):
        return await self.send_command("RFV0", subdevice=subdevice)

    async def read_serial_number(self, subdevice: int = 0):
        return await self.send_command("RFV2", subdevice=subdevice)

    async def initialize(self, subdevice: int = 0):
        return await self.send_command("AID", subdevice=subdevice)

    async def open_lid(self, subdevice: int = 0):
        return await self.send_command("AOD", subdevice=subdevice)

    async def close_lid(self, subdevice: int = 0):
        return await self.send_command("ACD", subdevice=subdevice)


## Setup (Programmatic)

In [4]:
incubator = InhecoIncubatorShakerBackend("/dev/cu.usbserial-1140", device_id=2)
await incubator.setup()

Connected to INHECO Incubator on /dev/cu.usbserial-1140
Inheco Incubator firmware version: IncShak_C_V3.50_04/2012


In [8]:
incubator.device_id

2

## Usage

### Testing & Investigation

In [4]:
await incubator.read_firmware_version(subdevice=0)

'IncShak_C_V3.50_04/2012'

In [3]:
await incubator.read_firmware_version(subdevice=0)

'IncShak_C_V3.50_04/2012'

In [5]:
await incubator.send_command("REE")

# Reports the status of the initialisation of the device and the status of the Lab-Ware detection
# if
# 0 Device initialised
# 1 Device not initialised
# 2 Lab-Ware status unknown
# 3 Device not initialised and Lab-Ware status unknow

'2'

In [6]:
await incubator.send_command("RCM")

# Reports the date and an alphanumeric string (e.g. operator) of the last calibration for the device.
# The Data is reported in the Format YYYY-MM-DD,xxxxx (Example: 2005-09-28,xxxxx).
# The five xs are alphanumeric wildcards.

'2025-09-17,QS'

### Loading Drawer

In [7]:
await incubator.open_lid()
# await asyncio.sleep(5)
await incubator.close_lid()

''

### Temperature Control

In [8]:
for t in range(5):
    temp_meas = {}
    
    for x in range(1,4):
        temp_meas[x] = await incubator.send_command(f"RAT{x}", read_timeout=60)

    print(t, temp_meas)
    time.sleep(1)

# This command reports the actual ambient or slot temperatures of the three sensors.
# It can be chosen whether the evaluated temperature or the sensor value should be reported.
# The temperatures are reported in 1/10 °C: 345 = 34,5 °C

0 {1: '172', 2: '173', 3: '173'}
1 {1: '172', 2: '173', 3: '173'}
2 {1: '172', 2: '173', 3: '173'}
3 {1: '173', 2: '173', 3: '173'}
4 {1: '172', 2: '173', 3: '173'}


In [None]:
await incubator.send_command("RTT", read_timeout=60)


In [None]:
await incubator.send_command("STT700", read_timeout=60)


### Shaking Control

TODO

In [None]:
import serial.tools.list_ports

def list_serial_devices():
    ports = serial.tools.list_ports.comports()
    for port in ports:
        print(f"Device: {port.device}")
        print(f"  Description: {port.description}")
        print(f"  HWID: {port.hwid}")
        print("-" * 40)

if __name__ == "__main__":
    print("Scanning USB serial devices...")
    list_serial_devices()


### Closing Connection

In [9]:
await incubator.stop()