diff --git a/_typos.toml b/_typos.toml index 20d22e4f16e..fc5fce0268c 100644 --- a/_typos.toml +++ b/_typos.toml @@ -20,6 +20,8 @@ PN = "PN" tro = "tro" FO = "FO" UE = "UE" +RHE = "RHE" +"ASEND" = "ASEND" [files] extend-exclude = [ diff --git a/docs/user_guide/01_material-handling/storage/inheco/img/inheco_incubator_shaker_dip_switch_addressing.png b/docs/user_guide/01_material-handling/storage/inheco/img/inheco_incubator_shaker_dip_switch_addressing.png new file mode 100644 index 00000000000..f55d19773cf Binary files /dev/null and b/docs/user_guide/01_material-handling/storage/inheco/img/inheco_incubator_shaker_dip_switch_addressing.png differ diff --git a/docs/user_guide/01_material-handling/storage/inheco/img/inheco_incubator_shaker_mp_dwp.png b/docs/user_guide/01_material-handling/storage/inheco/img/inheco_incubator_shaker_mp_dwp.png new file mode 100644 index 00000000000..66dbfa04234 Binary files /dev/null and b/docs/user_guide/01_material-handling/storage/inheco/img/inheco_incubator_shaker_mp_dwp.png differ diff --git a/docs/user_guide/01_material-handling/storage/inheco/img/inheco_incubator_shaker_physical_setup_overview.png b/docs/user_guide/01_material-handling/storage/inheco/img/inheco_incubator_shaker_physical_setup_overview.png new file mode 100644 index 00000000000..e9eabddf0b4 Binary files /dev/null and b/docs/user_guide/01_material-handling/storage/inheco/img/inheco_incubator_shaker_physical_setup_overview.png differ diff --git a/docs/user_guide/01_material-handling/storage/inheco/img/inheco_incubator_shaker_t_sensor_positioning.png b/docs/user_guide/01_material-handling/storage/inheco/img/inheco_incubator_shaker_t_sensor_positioning.png new file mode 100644 index 00000000000..3232987ab76 Binary files /dev/null and b/docs/user_guide/01_material-handling/storage/inheco/img/inheco_incubator_shaker_t_sensor_positioning.png differ diff --git a/docs/user_guide/01_material-handling/storage/inheco/incubator_shaker.ipynb b/docs/user_guide/01_material-handling/storage/inheco/incubator_shaker.ipynb new file mode 100644 index 00000000000..8ce49a5dc30 --- /dev/null +++ b/docs/user_guide/01_material-handling/storage/inheco/incubator_shaker.ipynb @@ -0,0 +1,1298 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "id": "01bd78dc-183e-45fe-a3b0-59c8666b4f14", + "metadata": {}, + "source": [ + "# Inheco Incubator (Shaker)\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "
\n", + "\n", + "
    \n", + "
  • OEM Link
  • \n", + "
  • Communication Protocol / Hardware: Serial (FTDI) / USB-A
  • \n", + "
  • Communication Level: Firmware (documentation shared by OEM)
  • \n", + "
  • Same command set for:\n", + "
      \n", + "
    • Incubator “MP”
    • \n", + "
    • Incubator “DWP”
    • \n", + "
    • Incubator Shaker “MP”
    • \n", + "
    • Incubator Shaker “DWP”
    • \n", + "
    \n", + "
  • \n", + "
  • Incubator Shaker “MP” VID:PID 0403:6001
  • \n", + "
  • Takes in a single plate via a loading tray, heats it to the set temperature, and shakes it to the set RPM.
  • \n", + "
\n", + "\n", + "
\n", + "
\n", + " Figure: Inheco Incubator Shaker MP & DWP models\n", + "
" + ] + }, + { + "cell_type": "markdown", + "id": "70f74446-c274-4803-a6ce-6c9c2d7d3bba", + "metadata": {}, + "source": [ + "## About the Machine(s)\n", + "\n", + "Inheco incubator shakers are modular machines used for plate storage, temperature control and shaking.\n", + "They differentiate themselves:\n", + "- **heater shakers** ... heat a material on which a plate is being placed; open-access; non-uniform temperature distribution around the plate; enables shaking of plate.\n", + "- **incubator shakers** ... an enclosed chamber that is being heated and houses a plate; plate access is controlled via a loading tray and a door; *highly uniform temperature distribution around the plate*; enables shaking of plate.\n", + "\n", + "The Inheco incubator devices come in 4 versions, dependent on (1) whether they provide a shaking feature & (2) the size of plates they accept:\n", + "\n", + "\n", + "| **RTS Code** | **Shaking Feature** | **Plate Format** | **Device Identifier** | **Typical Model** |\n", + "|:-------------:|:--------------:|:----------------:|:----------------------|:------------------|\n", + "| `0` | ❌ No | MP (Microplate) | `incubator_mp` | INHECO Incubator MP | \n", + "| `1` | ✅ Yes | MP (Microplate) | `incubator_shaker_mp` | INHECO Incubator Shaker MP | \n", + "| `2` | ❌ No | DWP (Deepwell Plate) | `incubator_dwp` | INHECO Incubator DWP | \n", + "| `3` | ✅ Yes | DWP (Deepwell Plate) | `incubator_shaker_dwp` | INHECO Incubator Shaker DWP | \n", + "\n", + "\n", + "```{note}\n", + "Note: All 4 machines can be controlled with the same PyLabRobot Backend, called `InhecoIncubatorShakerBackend`!\n", + "```" + ] + }, + { + "cell_type": "markdown", + "id": "42739583-9f29-4063-983d-18dcdfea61ba", + "metadata": {}, + "source": [ + "---\n", + "## Setup Instructions (Physical)" + ] + }, + { + "cell_type": "markdown", + "id": "95e39b36-c441-499d-bfaa-327d49d5cc04", + "metadata": {}, + "source": [ + "\n", + "\n", + "\n", + "\n", + "\n", + "
\n", + " \n", + "
\n", + " Figure: Physical setup overview of the Inheco Incubator Shaker system\n", + "
\n" + ] + }, + { + "cell_type": "markdown", + "id": "e0f0ef32-566b-4fa3-b358-db2a703957de", + "metadata": {}, + "source": [ + "To facilitate integration, multiple devices can be placed on top of each other to form an Incubator Shaker Stack (see infographic above), but care has to be taken to not overstrain the connections:\n", + "\n", + "Each of the 4 different shaker types requires a different amount of power.\n", + "An easier way to identify the configurations possible is to think of \"incubator power credits\" - **no stack must exceed 5 power credits** (see User and Installation Manual):\n", + "\n", + "1. An \"incubator MP\" -> 1 \"incubator power credits\" -> 5 units can be stacked on top of each other.\n", + "2. An \"incubator DWP\" -> 1.25 \"incubator power credits\" -> 4 units.\n", + "3. An \"incubator shaker MP\" -> 1.6 \"incubator power credits\" -> 3 units\n", + "4. An \"incubator shaker DWP\" -> 2.5 \"incubator power credits\" -> 2 units\n", + "\n", + "However, the machines in a single stack can be of any of the 4 types.\n", + "This means you could create stacks of: \n", + "- 2x \"incubator DWP\" (1.25 credits) + 1x \"incubator shaker DWP\" (2.5 credits)\n", + "- 3x \"incubator MP\" (1 credits) + 1x \"incubator shaker MP\" (1.6 credits) [shown in the infographic above]\n", + "\n", + "When a stack would exceed more than 5 \"incubator power credits\", you **must build multiple stacks** (ask your Inheco sales representative if you are unsure before trying this out).\n", + "\n", + "The benefit of this setup is that only **one** power cable and only **one** USB cable have to be plugged into the machine at the very bottom of a machine (i.e. stack index 0).\n", + "Machines above the bottom one only need to be connected with the machine below it using the 15-pin SUB-D connectors that come with each machine when bought from Inheco.\n", + "\n", + "```{note}\n", + "Note: In PyLabRobot, the stack is the central control element and is controlled via its own instance of the `InhecoIncubatorShakerStackBackend`.\n", + "```" + ] + }, + { + "cell_type": "markdown", + "id": "40f8dcb4-7779-4c51-9bdf-bf8d05f46c46", + "metadata": {}, + "source": [ + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "
\n", + "\n", + "To connect an InhecoIncubatorShakerStackBackend you have to specify the DIP switch identifier:\n", + "
    \n", + "
  • located on the back of the bottom machine,
    \n", + "
  • it defines the DIP switch configuration for the entire stack above it.
  • \n", + "\n", + "
\n", + "\n", + "

Setting the DIP switch to generate a machine address

\n", + "\n", + "The DIP switch at the back of each machine has to be set manually and consists of 4 pins that can be set into an UP / 0 or a DOWN / 1 position.\n", + "\n", + "(Note: There are two more pins to the left of the DIP switch pins. They are not involved in setting the DIP switch address, and should be left in their DOWN position.)\n", + "\n", + "This represents binary encoding:\n", + "
    \n", + "
  • All pins at 0 → DIP switch is set to address binary 0 0 0 0 -> decimal 0
  • \n", + "
  • All pins at 1 → DIP switch is set to address binary 1 1 1 1 -> decimal 15 (24-1)
  • \n", + "
\n", + "\n", + "This information is crucial for creating the correct commands to communicate with the machine stack.\n", + "\n", + "
\n", + "
\n", + "Figure: DIP switch layout to generate different identifiers/addresses\n", + "
" + ] + }, + { + "cell_type": "markdown", + "id": "d0d6256e-673a-4979-88cc-d07959ce92ea", + "metadata": {}, + "source": [ + "---\n", + "## Setup Instructions (Programmatic)\n", + "\n", + "After the two cables have been connected to the bottom-most Inheco Incubator Shaker, you have to...\n", + "1. instantiate the `InhecoIncubatorShakerStackBackend` and give it the correct `dip_switch_id` & `stack_index`, and\n", + "2. create a `IncubatorShakerStack` frontend and give it the new backend instance.\n", + "\n", + "The \"stack\" is the central interface to all units in it.\n", + "The stack automatically identifies all units inside it (including their type), and will create both the correct connection and a physical instance for it.\n", + "\n", + "```{note}\n", + "Before a connection has been established the incubator shaker's front LED blinks.\n", + "After the connection has succesfully been made, the LED will continuously be on.\n", + "```" + ] + }, + { + "cell_type": "code", + "execution_count": 1, + "id": "c9acf6e1-2465-42fd-bad8-f6d3fc052e97", + "metadata": {}, + "outputs": [], + "source": [ + "from pylabrobot.storage.inheco import IncubatorShakerStack, InhecoIncubatorShakerStackBackend\n", + "\n", + "import asyncio # only needed for examples in this tutorial, optional for you purposes\n", + "import time # only needed for examples in this tutorial, optional for you purposes" + ] + }, + { + "cell_type": "code", + "execution_count": 2, + "id": "bb6e529f-3bb3-46de-87af-5931269b04e4", + "metadata": {}, + "outputs": [], + "source": [ + "iis_stack_backend = InhecoIncubatorShakerStackBackend(dip_switch_id = 2)\n", + "\n", + "iis_stack = IncubatorShakerStack(backend=iis_stack_backend)\n", + "\n", + "await iis_stack.setup()" + ] + }, + { + "cell_type": "markdown", + "id": "c2c7002f-a11b-402e-8100-5d9eb528a1f0", + "metadata": {}, + "source": [ + "```{note}\n", + "If you are interested in seeing information about the machine you are connecting to, you can set the `.setup()` optional argument `verbose` to `True`:\n", + "1. serial port used for connection\n", + "2. DIP switch ID used and verified\n", + "3. number of units identified in the stack\n", + "4. composition (index and type of units) of the stack \n", + "```" + ] + }, + { + "cell_type": "markdown", + "id": "0a17aadb-8373-4e24-9678-7c4453fc8661", + "metadata": {}, + "source": [ + "## Usage: Controlling Individual Units" + ] + }, + { + "cell_type": "markdown", + "id": "92e7123a-8aa7-41b7-baa9-9765bddaffe9", + "metadata": {}, + "source": [ + "### Addressing Units & Sensing Plate Presence\n", + "\n", + "The stack interface enables fast, direct access to any machine in a stack.\n", + "\n", + "Every Inheco incubator (shaker) contains an internal, reflection-based plate sensor.\n", + "(This is very useful e.g. when someone has forgotten their plate in the incubator 👀)\n", + "\n", + "Let's use this as an example of how you can address different units in the stack individually:" + ] + }, + { + "cell_type": "code", + "execution_count": 3, + "id": "0585bf87-7d00-4d1e-9ce3-f58617d66961", + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "2" + ] + }, + "execution_count": 3, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "iis_stack.num_units" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "7642febc-5a5e-4e17-be6d-8a75fae7a1f4", + "metadata": { + "scrolled": true + }, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "0 False\n", + "1 False\n" + ] + } + ], + "source": [ + "for idx in range(iis_stack.num_units):\n", + " plate_presence_check = await iis_stack[idx].request_plate_in_incubator()\n", + " print(idx, plate_presence_check)" + ] + }, + { + "cell_type": "markdown", + "id": "051629ac", + "metadata": {}, + "source": [ + "Option 2: Addressing individual units by calling the stack backend with the correct stack_index" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "04fa7214-fb34-4230-8fb4-f20e66a8e476", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "0 False\n", + "1 False\n" + ] + } + ], + "source": [ + "for idx in range(iis_stack.num_units):\n", + " plate_presence_check = await iis_stack.backend.request_plate_in_incubator(\n", + " stack_index=idx\n", + " )\n", + " print(idx, plate_presence_check)" + ] + }, + { + "cell_type": "markdown", + "id": "59e6c797", + "metadata": {}, + "source": [ + "Option 3: Storing each unit as a handy variable" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "87ed2d05-620a-4e8a-9578-bd5d27a81e2a", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "False False\n" + ] + } + ], + "source": [ + "incubator_shaker_0 = iis_stack[0]\n", + "plate_presence_check_0 = await incubator_shaker_0.request_plate_in_incubator()\n", + "\n", + "incubator_shaker_1 = iis_stack[1]\n", + "plate_presence_check_1 = await incubator_shaker_1.request_plate_in_incubator()\n", + "\n", + "print(plate_presence_check_0, plate_presence_check_1)" + ] + }, + { + "cell_type": "markdown", + "id": "dea2645e-5426-43c0-9511-35b30aa290cb", + "metadata": {}, + "source": [ + "We usually use the direct indexing of the frontend method but it is up to you choose.\n", + "e.g.: storing of units in separate variables can be very useful when using many stacks." + ] + }, + { + "cell_type": "markdown", + "id": "a30de061-fc3f-434c-882e-4972c8404479", + "metadata": {}, + "source": [ + "### Using Loading Tray" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "e6009273-a626-4de9-a9db-0f0de8021a0e", + "metadata": {}, + "outputs": [], + "source": [ + "for idx in range(iis_stack.num_units):\n", + " await iis_stack[idx].open()\n", + " await asyncio.sleep(2)\n", + " await iis_stack[idx].close()" + ] + }, + { + "cell_type": "markdown", + "id": "7d86ace1-7522-4630-ad10-6f4b944fbfc9", + "metadata": {}, + "source": [ + "```{warning}\n", + "**On parallelization of commands to machines in the same incubator shaker stack**\n", + "\n", + "Each machine in the same stack communicates via the same USB(-A to -B) cable.\n", + "As a result, if you send multiple commands at the same time, they will be queued and executed one after another.\n", + "\n", + "This means you cannot open all incubator shakers in the same stack at the same time.\n", + "\n", + "However, if you arrange your Inheco Incubators into different stacks this should still be possible.\n", + "```" + ] + }, + { + "cell_type": "markdown", + "id": "6aaef05a-2cb9-4b12-bf86-ae6d63665036", + "metadata": {}, + "source": [ + "### Temperature Control" + ] + }, + { + "cell_type": "markdown", + "id": "7e28b749", + "metadata": {}, + "source": [ + "Show current temperature in °C" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "c8da2cf9-cbeb-4202-8ef7-2cc583ce16c4", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "20.1\n", + "23.6\n" + ] + } + ], + "source": [ + "for idx in range(iis_stack.num_units):\n", + " current_temp = await iis_stack[idx].get_temperature()\n", + " print(current_temp)" + ] + }, + { + "cell_type": "markdown", + "id": "e35729c9", + "metadata": {}, + "source": [ + "Time how long the machine takes to reach target temperature using standard Python - no need to re-invent the wheel" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "92fe00a9-9546-4d11-9a3c-c644188ea0c0", + "metadata": {}, + "outputs": [], + "source": [ + "target_temperature = 37\n", + "\n", + "await iis_stack[0].start_temperature_control(target_temperature)\n", + "\n", + "start_time = time.time()" + ] + }, + { + "cell_type": "markdown", + "id": "d8cf8808", + "metadata": {}, + "source": [ + "Quick check of how the temperature increases for 5 sec" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "b635baae-0cc9-4d10-8644-c377f94656b9", + "metadata": { + "scrolled": true + }, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "20.3\n", + "20.7\n", + "21.6\n", + "22.6\n", + "23.5\n" + ] + } + ], + "source": [ + "for x in range(5):\n", + " current_temp = await iis_stack[0].get_temperature(sensor=\"main\")\n", + " print(current_temp)\n", + "\n", + " time.sleep(1)" + ] + }, + { + "cell_type": "markdown", + "id": "438e19d7-714d-46f3-8a9e-f00798ca9893", + "metadata": {}, + "source": [ + "\n", + " \n", + " \n", + " \n", + " \n", + "
\n", + "

The Inheco Incubator (Shaker) contains three independent temperature sensors:

\n", + "
    \n", + "
  1. main sensor — close to the door/front, inside the machine
  2. \n", + "
  3. validation sensor — back, inside the machine
  4. \n", + "
  5. boost sensor — on heating foil, inside the machine
  6. \n", + "
\n", + "

\n", + " By default, iis_stack[0].get_temperature()’s argument is set to\n", + " sensor=\"main\". \n", + " This can be changed to any of the following:\n", + "

\n", + "
    \n", + "
  • \"main\"
  • \n", + "
  • \"dif\"
  • \n", + "
  • \"boost\"
  • \n", + "
  • \"mean\" - takes all three sensors’ measurements and returns their geometric mean
  • \n", + "
\n", + "
\n", + " \"Inheco\n", + "
\n", + " \n", + " Figure: Inheco Incubator Shaker Temperature Sensor Positioning\n", + " \n", + "
\n" + ] + }, + { + "cell_type": "markdown", + "id": "aebaf1c5", + "metadata": {}, + "source": [ + "Wait until target temperature has been reach:" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "8c5caa89-e5c5-49de-996f-a13cd790aa61", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Waiting for target temperature 37.00 °C...\n", + "\n", + "[███████████████████████████████████████-] 36.97 °C (Δ=0.03 °C | ETA: 0.1s))\n", + "[OK] Target temperature reached.\n", + "\n", + "time taken to reach target temperaure 37°C: 37.5 sec\n" + ] + } + ], + "source": [ + "temp_reached = await iis_stack[0].wait_for_temperature(\n", + " sensor = \"mean\",\n", + " tolerance = 0.1, # ℃ - default: 0.2\n", + " interval_s = 0.2, # sec - default: 0.5\n", + " show_progress_bar = True # default: False\n", + ")\n", + "\n", + "elapsed_time = time.time() - start_time\n", + "\n", + "print(f\"\\ntime taken to reach target temperaure {target_temperature}°C: {round(elapsed_time, 1)} sec\")" + ] + }, + { + "cell_type": "markdown", + "id": "55235ece", + "metadata": {}, + "source": [ + "Simple stopping of temperature control without stopping (i.e. breaking the connection) the machine itself:" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "88e17cd9-de42-4f6e-9db0-b74ad74e4a85", + "metadata": {}, + "outputs": [], + "source": [ + "await iis_stack[0].stop_temperature_control()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "b5a0ff93-ee2e-4049-86f7-59815163d6f2", + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "False" + ] + }, + "execution_count": 13, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "await iis_stack[0].is_temperature_control_enabled()" + ] + }, + { + "cell_type": "markdown", + "id": "58c0aa31-9b6b-426f-a038-cdb80614c005", + "metadata": {}, + "source": [ + "### Shaking Control\n", + "\n", + "Only Incubator \"Shakers\" can use shaking commands.\n", + "\n", + "During `.setup()` the machine will check whether it is an `incubator_shaker` (\"MP\" or \"DWP\") and the Python backend only allows shaking commands being sent to the machine if it is an `incubator_shaker`, i.e. the following commands will not work if you have pure incubators." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "fdac2882-59c2-4531-a115-97a644765476", + "metadata": {}, + "outputs": [], + "source": [ + "await iis_stack[0].shake(rpm=800)\n", + "\n", + "await asyncio.sleep(5)\n", + "\n", + "await iis_stack[0].stop_shaking()" + ] + }, + { + "cell_type": "markdown", + "id": "fc1da4bf-452e-4ead-bd8a-64d219449a64", + "metadata": {}, + "source": [ + "Inheco incubator shakers support precise, programmable motion in both the **X** and **Y** axes.\n", + "The resulting shaking pattern is defined by five parameters:\n", + "\n", + "- **Amplitude in X** (`Aₓ`, 0–3 mm)\n", + "- **Amplitude in Y** (`Aᵧ`, 0–3 mm)\n", + "- **Frequency in X** (`fₓ`, 6.6–30.0 Hz)\n", + "- **Frequency in Y** (`fᵧ`, 6.6–30.0 Hz)\n", + "- **Phase shift** (`φ`, the angular offset between X and Y motion, in degrees)\n", + "\n", + "Different combinations of these parameters produce circular, linear, elliptical, or\n", + "figure-eight movement paths.\n", + "\n", + "---\n", + "\n", + "#### Predefined Shaking Patterns in PyLabRobot\n", + "\n", + "To simplify configuration, PyLabRobot provides predefined motion presets that map common use cases to specific parameter combinations:\n", + "\n", + "| Pattern | Description | Parameter relationship | Required speed attribute |\n", + "|----------|--------------|------------------------|---------------------------|\n", + "| `orbital` | Circular shaking | `Aₓ = Aᵧ`, `φ = 90°`, `fₓ = fᵧ` | `rpm` |\n", + "| `elliptical` | Elliptical motion | `Aₓ ≠ Aᵧ`, `φ = 90°`, `fₓ = fᵧ` | `rpm` |\n", + "| `figure_eight` | Figure-eight (Lissajous) motion | `Aₓ ≈ Aᵧ`, `φ = 90°`, `fᵧ = 2 fₓ` | `rpm` |\n", + "| `linear_x` | Linear motion along X | `Aᵧ = 0` | `frequency_hz` |\n", + "| `linear_y` | Linear motion along Y | `Aₓ = 0` | `frequency_hz` |\n", + "\n", + "```{note}\n", + "The default behaviour of `.shake()` uses...\n", + "- an orbital shaking pattern,\n", + "- x amplitude = 3 mm,\n", + "- y amplitude = 3 mm.\n", + "\n", + "(see “Simplest usage” example above)\n" + ] + }, + { + "cell_type": "markdown", + "id": "637c7550", + "metadata": {}, + "source": [ + "Orbital shaking example with modified amplitudes" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "21ef6992-fdd2-47f7-b739-0de4cb1022e0", + "metadata": {}, + "outputs": [], + "source": [ + "await iis_stack[0].shake(\n", + " pattern=\"orbital\",\n", + " rpm=800,\n", + " amplitude_x_mm=2.0,\n", + " amplitude_y_mm=2.0\n", + ")\n", + "\n", + "await asyncio.sleep(5)\n", + "\n", + "await iis_stack[0].stop_shaking()" + ] + }, + { + "cell_type": "markdown", + "id": "2a9b5212", + "metadata": {}, + "source": [ + "Elliptical shaking example with modified amplitudes:" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "8b6d28a3-6c02-47cd-a5e0-1e01cc35e4d0", + "metadata": {}, + "outputs": [], + "source": [ + "await iis_stack[0].shake(\n", + " pattern=\"elliptical\",\n", + " rpm=800,\n", + " amplitude_x_mm=2.5,\n", + " amplitude_y_mm=2.5\n", + ")\n", + "\n", + "await asyncio.sleep(5)\n", + "\n", + "await iis_stack[0].stop_shaking()" + ] + }, + { + "cell_type": "markdown", + "id": "f1a72cea", + "metadata": {}, + "source": [ + "Figure-eight shaking example:" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "bd1b713c-0c1f-4e6c-81b0-88cee4ba8719", + "metadata": {}, + "outputs": [], + "source": [ + "await iis_stack[0].shake(\n", + " pattern=\"figure_eight\",\n", + " rpm=400,\n", + ")\n", + "\n", + "await asyncio.sleep(5)\n", + "\n", + "await iis_stack[0].stop_shaking()" + ] + }, + { + "cell_type": "markdown", + "id": "9eb382d9-3432-4d0e-9514-02f25eb7ef68", + "metadata": {}, + "source": [ + "If you feel adventurous, see the math that goes into the calculation of different shaking patterns here:" + ] + }, + { + "cell_type": "markdown", + "id": "97014c29-c023-4259-aa3f-1b7fa85c9c24", + "metadata": {}, + "source": [ + "
\n", + "📘 How PyLabRobot Implements Inheco Shaking Patterns (Mathematical Overview)\n", + "\n", + "Inheco incubator shakers move a plate by oscillating the platform in two directions — **X** and **Y** — at programmable amplitudes, frequencies, and phase offsets.\n", + "\n", + "---\n", + "\n", + "**The Core Equations**\n", + "\n", + "The motion of the platform is described by two sinusoidal functions:\n", + "\n", + "\\[\n", + "\\begin{aligned}\n", + "x(t) &= Aₓ \\sin(2\\pi fₓ t) \\\\\n", + "y(t) &= Aᵧ \\sin(2\\pi fᵧ t + φ)\n", + "\\end{aligned}\n", + "\\]\n", + "\n", + "Where:\n", + "\n", + "| Symbol | Meaning | Example |\n", + "|:--|:--|:--|\n", + "| `Aₓ`, `Aᵧ` | Amplitudes (mm) — how far the plate moves in X and Y | 2.5 mm |\n", + "| `fₓ`, `fᵧ` | Frequencies (Hz) — how fast each axis oscillates | 10 Hz, 20 Hz |\n", + "| `φ` | Phase shift (°) — timing offset between X and Y | 0°, 90°, 180° |\n", + "\n", + "Each axis moves smoothly back and forth like a spring. \n", + "When these two motions combine, they trace elegant paths such as circles, ellipses, or figure-eights.\n", + "\n", + "---\n", + "\n", + "**Pattern Intuition**\n", + "\n", + "Different shaking patterns are created by adjusting the relationships between these parameters:\n", + "\n", + "| Pattern | Conditions | Description |\n", + "|:--|:--|:--|\n", + "| **Linear X** | `Aᵧ = 0` | Motion only along X (back-and-forth line) |\n", + "| **Linear Y** | `Aₓ = 0` | Motion only along Y |\n", + "| **Orbital** | `Aₓ = Aᵧ`, `fₓ = fᵧ`, `φ = 90°` | Perfect circular motion |\n", + "| **Elliptical** | `Aₓ ≠ Aᵧ`, `fₓ = fᵧ`, `φ = 90°` | Elongated circle (ellipse) |\n", + "| **Figure-Eight (Lissajous)** | `Aₓ ≈ Aᵧ`, `fᵧ = 2 fₓ`, `φ = 90°` | Double-loop path shaped like ∞ |\n", + "\n", + "---\n", + "\n", + "**Example: Figure-Eight Motion**\n", + "\n", + "In firmware terms:\n", + "\n", + "SSP20,20,100,200,90\n", + "ASE1\n", + "\n", + "\n", + "corresponds to:\n", + "\n", + "- `Aₓ = Aᵧ = 2.0 mm`\n", + "- `fₓ = 10.0 Hz`\n", + "- `fᵧ = 20.0 Hz`\n", + "- `φ = 90°`\n", + "\n", + "This combination makes the platform’s Y motion twice as fast as its X motion — \n", + "the resulting path is a **Lissajous figure**, visually resembling a “figure-8”.\n", + "\n", + "---\n", + "\n", + "**Why This Matters**\n", + "\n", + "By controlling these parameters precisely:\n", + "- The **mixing efficiency** can be tuned to the liquid’s viscosity.\n", + "- The **path geometry** affects shear stress and aeration.\n", + "- **Repeatable motion profiles** ensure reproducibility across runs.\n", + "\n", + "Understanding this relationship helps you select the right pattern\n", + "(`orbital`, `elliptical`, `figure_eight`, etc.) for your experiment.\n", + "\n", + "
\n", + "\n" + ] + }, + { + "cell_type": "markdown", + "id": "d18c1605-e179-4b89-932c-a1d1fd00ae10", + "metadata": {}, + "source": [ + "### Empowerment Showcase\n", + "\n", + "With control of multiple single incubator shakers a whole array of complex experimental & optimisation processes is possible.\n", + "\n", + "This PyLabRobot integration aims to make these machine powers as accessible as possible.\n", + "\n", + "One still relatively simple example:\n", + "Parallelize shaking of different incubators with different shaking + temperature conditions ... did someone say \"Design of Experiments\" 👀📊" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "9016a568-3e26-4041-95cf-149d6fbe9bd6", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Waiting for target temperature 29.00 °C...\n", + "\n", + "[████████████████████████████████████----] 29.20 °C (Δ=0.20 °C | ETA: 3.0s)\n", + "[OK] Target temperature reached.\n", + "Waiting for target temperature 37.00 °C...\n", + "\n", + "[█████████████---------------------------] 36.87 °C (Δ=0.13 °C | ETA: 4.3s)\n", + "[OK] Target temperature reached.\n" + ] + } + ], + "source": [ + "await iis_stack[0].start_temperature_control(29)\n", + "await iis_stack[1].start_temperature_control(37)\n", + "\n", + "await iis_stack[0].wait_for_temperature(sensor=\"mean\", show_progress_bar=True)\n", + "await iis_stack[1].wait_for_temperature(sensor=\"mean\", show_progress_bar=True)\n", + "\n", + "\n", + "await iis_stack[0].shake(\n", + " pattern=\"orbital\",\n", + " rpm=500,\n", + ")\n", + "\n", + "await iis_stack[1].shake(\n", + " pattern=\"figure_eight\",\n", + " rpm=800,\n", + ")\n", + "\n", + "await asyncio.sleep(10)\n", + "\n", + "await iis_stack[0].stop_temperature_control()\n", + "await iis_stack[1].stop_temperature_control()\n", + "\n", + "await iis_stack[0].stop_shaking()\n", + "await iis_stack[1].stop_shaking()" + ] + }, + { + "cell_type": "markdown", + "id": "4db6a1e0-ed03-4359-9ab8-e19246d082cf", + "metadata": {}, + "source": [ + "### Self Test / Maintenance (PLR beta)\n", + "\n", + "The Inheco firmware provides a \"self-test\" which checks the drawer, temperature and shaking features.\n", + "This test can take up to 5 min.\n", + "\n", + "The test *must be* performed without a plate in the incubator.\n", + "\n", + "It generates a binary code in which each position represents a machine subsystem:\n", + "- Bit 0: Drawer\n", + "- Bit 1: Homogeneity Sensor 3 versus Sensor 1 (>2 K)\n", + "- Bit 2: Homogeneity Sensor 2 versus Sensor 1 (>2 K)\n", + "- Bit 3: Sensor 1 doesn’t reach Target Temperature after 130 sec.\n", + "- Bit 4: Y-Amplitude Shaker\n", + "- Bit 5: X-Amplitude Shaker\n", + "- Bit 6: Phase Shift Shaker\n", + "- Bit 7: Y-Frequency Shaker\n", + "- Bit 8: X-Frequency Shaker\n", + "- Bit 9: Line Boost-Heater broken\n", + "- Bit 10: Line Main-Heater broken\n", + "\n", + "A `0` means no error has been found for that subsystem, and a `1` means there is a hardware fault." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "1c46a480-6d0f-4792-b388-12ce9c3513f6", + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "{\n", + " \"drawer_error\": False,\n", + " \"homogeneity_sensor_3_vs_1_error\": False,\n", + " \"homogeneity_sensor_2_vs_1_error\": False,\n", + " \"sensor_1_target_temp_error\": False,\n", + " \"y_amplitude_shaker_error\": False,\n", + " \"x_amplitude_shaker_error\": False,\n", + " \"phase_shift_shaker_error\": False,\n", + " \"y_frequency_shaker_error\": False,\n", + " \"x_frequency_shaker_error\": False,\n", + " \"line_boost_heater_broken\": False,\n", + " \"line_main_heater_broken\": False,\n", + "}\n" + ] + }, + "execution_count": 19, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "await iis_stack[0].perform_self_test()" + ] + }, + { + "cell_type": "markdown", + "id": "a2d15a39-a2e4-4d7e-ac9a-5b93770bfa9b", + "metadata": {}, + "source": [ + "This is a beta feature in PyLabRobot and we will verify the interpretation with the PyLabRobot supporting OEM, Inheco - all our machines appear to be fully functional, i.e. we couldn't check whether a faulty machine will correctly be flagged by this self-test." + ] + }, + { + "cell_type": "markdown", + "id": "e9c95159-484c-4a21-9cdb-43c4ee017bbe", + "metadata": {}, + "source": [ + "---\n", + "## Usage: Master Control via the Stack Frontend 🦾\n", + "\n", + "Even though loops make setting temperatures fast and efficient, we found it is too much code.\n", + "\n", + "This is why we enabled the frontend to have \"master control commands\" for all units in a stack." + ] + }, + { + "cell_type": "markdown", + "id": "f81a3965-280f-4163-8fae-53db746e6c62", + "metadata": {}, + "source": [ + "### Querying Statuses" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "9d9a782b-49f0-4c5f-8c33-504c44ccc289", + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "{0: 'closed', 1: 'closed'}" + ] + }, + "execution_count": 20, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "await iis_stack.request_loading_tray_states()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "445accca-0dc4-4713-bbe5-4212ff8741d8", + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "{0: False, 1: False}" + ] + }, + "execution_count": 21, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "await iis_stack.request_temperature_control_states()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "96a443ac-dfb8-4744-9f42-14e74c84f333", + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "{0: False, 1: False}" + ] + }, + "execution_count": 22, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "await iis_stack.request_shaking_states()" + ] + }, + { + "cell_type": "markdown", + "id": "a1660de6-b823-45ca-ada3-916c2e17d571", + "metadata": {}, + "source": [ + "### Master Commmands - Loading Trays" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "1c71f19f-10f5-4a38-9dad-b87967b89220", + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "{0: 'open', 1: 'open'}" + ] + }, + "execution_count": 23, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "await iis_stack.open_all()\n", + "\n", + "await iis_stack.request_loading_tray_states()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "57fe0f47-f2ab-44db-bc0f-5cbea1f11e2c", + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "{0: 'closed', 1: 'closed'}" + ] + }, + "execution_count": 24, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "await iis_stack.close_all()\n", + "\n", + "await iis_stack.request_loading_tray_states()" + ] + }, + { + "cell_type": "markdown", + "id": "6660d186-971a-42dc-927f-52b1689de27c", + "metadata": {}, + "source": [ + "### Master Commmands - Temperature Control" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "d534ff6a-7b7b-45f7-aa6a-07c0387f286f", + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "{0: 37.8, 1: 34.4}" + ] + }, + "execution_count": 25, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "await iis_stack.start_all_temperature_control(target_temperature=37)\n", + "\n", + "await asyncio.sleep(10)\n", + "\n", + "await iis_stack.get_all_temperatures()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "7e98bbf3-aed4-4ae8-a888-9c588d5189a2", + "metadata": {}, + "outputs": [], + "source": [ + "await iis_stack.stop_all_temperature_control()" + ] + }, + { + "cell_type": "markdown", + "id": "71fb31e4-9cd5-4f4b-8bdb-2bb1e6e2835c", + "metadata": {}, + "source": [ + "### Master Commmands - Shaking Control" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "163db59d-3ae2-4dec-880a-53b323ca00bc", + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "{0: False, 1: False}" + ] + }, + "execution_count": 27, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "await iis_stack.request_shaking_states()" + ] + }, + { + "cell_type": "markdown", + "id": "7f34eb41-cb4a-4fbb-8c7c-36400dead6c4", + "metadata": {}, + "source": [ + "## Closing Connection\n", + "\n", + "Standard PyLabRobot way of closing the communication to the machine, i.e. the stack:" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "2c6fd6d7-c85c-4fc3-a35c-e23b9b18ee1d", + "metadata": {}, + "outputs": [], + "source": [ + "await iis_stack.stop()" + ] + }, + { + "cell_type": "markdown", + "id": "20cf4f13-0e54-4c26-9201-83140b738c35", + "metadata": {}, + "source": [ + "This stops all temperature control, and all shaking before disconnecting from the stack." + ] + }, + { + "cell_type": "markdown", + "id": "509f8bc7-9e9c-4250-b854-b2a0c8ded9e8", + "metadata": {}, + "source": [ + "```{note}\n", + "If you develop a small script that you find yourself re-using and that goes beyond the simple \"hello world, inheco incubator shaker\"-style examples here, please consider contributing it back to the PyLabRobot community as a Cookbook Recipe.\n", + "```" + ] + }, + { + "cell_type": "markdown", + "id": "98db023d-d081-4bce-be7e-466735b439c7", + "metadata": {}, + "source": [ + "---\n", + "## Usage: Multiple Stacks\n", + "\n", + "To connect more then one machine stack:\n", + "- instantiate a separate backend and frontend for each,\n", + "- you **must** hand the serial port to each stack's backend explicitly\n", + "\n", + "```{note}\n", + "When using one stack, PyLabRobot finds the machine's port automatically based on its unique VID:PID,\n", + "if multiple machines are found with the same VID:PID there is ambiguity\n", + "- e.g. the VSpin & Cytation 5 use the same identifier combo :')\n", + "```\n", + "- perform a setup for each stack. \n", + "\n", + "(set on the back of the bottom-most machine):\n" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "1f7a4e50-9888-4327-8de1-097fe523590a", + "metadata": {}, + "outputs": [], + "source": [ + "iis_stack_backend_0 = InhecoIncubatorShakerStackBackend(dip_switch_id = 2, port=\"/dev/cu.usbserial-130\")\n", + "iis_stack_0 = IncubatorShakerStack(backend=iis_stack_backend_0)\n", + "await iis_stack.setup(verbose=True)\n", + "\n", + "iis_stack_backend_1 = InhecoIncubatorShakerStackBackend(dip_switch_id = 7, port=\"/dev/cu.usbserial-42\")\n", + "iis_stack_1 = IncubatorShakerStack(backend=iis_stack_backend_1)\n", + "await iis_stack_1.setup(verbose=True)\n", + "\n", + "iis_stack_backend_2 = InhecoIncubatorShakerStackBackend(dip_switch_id = 11, port=\"/dev/cu.usbserial-123\")\n", + "iis_stack_2 = IncubatorShakerStack(backend=iis_stack_backend_2)\n", + "await iis_stack_2.setup(verbose=True)" + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3 (ipykernel)", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.12.11" + } + }, + "nbformat": 4, + "nbformat_minor": 5 +} diff --git a/docs/user_guide/01_material-handling/storage/storage.rst b/docs/user_guide/01_material-handling/storage/storage.rst index 084e5d902c2..4c844c1fd39 100644 --- a/docs/user_guide/01_material-handling/storage/storage.rst +++ b/docs/user_guide/01_material-handling/storage/storage.rst @@ -140,3 +140,4 @@ Combined Retrieval & Access Summary :hidden: cytomat + inheco/incubator_shaker diff --git a/pylabrobot/io/serial.py b/pylabrobot/io/serial.py index 5e35289087b..80d11b19606 100644 --- a/pylabrobot/io/serial.py +++ b/pylabrobot/io/serial.py @@ -9,6 +9,7 @@ try: import serial + import serial.tools.list_ports HAS_SERIAL = True except ImportError as e: @@ -35,7 +36,9 @@ class Serial(IOBase): def __init__( self, - port: str, + port: Optional[str] = None, + vid: Optional[str] = None, + pid: Optional[str] = None, baudrate: int = 9600, bytesize: int = 8, # serial.EIGHTBITS parity: str = "N", # serial.PARITY_NONE @@ -45,6 +48,8 @@ def __init__( rtscts: bool = False, ): self._port = port + self._vid = vid + self._pid = pid self.baudrate = baudrate self.bytesize = bytesize self.parity = parity @@ -55,22 +60,110 @@ def __init__( self.timeout = timeout self.rtscts = rtscts + # Instant parameter validation at init time + if not self._port and not (self._vid and self._pid): + raise ValueError("Must specify either port or vid and pid.") + if get_capture_or_validation_active(): raise RuntimeError("Cannot create a new Serial object while capture or validation is active") @property def port(self) -> str: + assert self._port is not None, "Port not set. Did you call setup()?" return self._port async def setup(self): + """ + Initialize the serial connection to the device. + + This method resolves the appropriate serial port (either from an explicitly + provided path or by scanning for devices matching the configured USB + VID:PID pair), validates that the detected/selected port corresponds to + the expected hardware, and opens the serial connection in a dedicated + threadpool executor to avoid blocking the asyncio event loop. + + **Behavior:** + - Ensures `pyserial` is installed; otherwise raises `RuntimeError`. + - If a port is not explicitly provided: + - Scans all available COM ports and filters them by matching + `VID:PID` against the device's hardware ID string. + - Raises an error if zero matches are found. + - Raises an error if multiple matches are found and no port is specified. + - If a port *is* explicitly provided: + - Verifies that it matches the specified VID/PID (when provided). + - Logs the port choice for traceability. + - Opens the serial port using the configured parameters + (baudrate, bytesize, parity, etc.) via `loop.run_in_executor` to + ensure non-blocking operation. + - Cleans up the executor and re-raises the exception if the port cannot be opened. + + **Raises:** + RuntimeError: + - If `pyserial` is missing. + - If no matching serial devices are found for the given VID/PID and + no explicit port was provided. + - If multiple matching devices exist and the port is ambiguous. + - If an explicitly provided port does not match the VID/PID. + serial.SerialException: + - If the serial connection fails to open (e.g., device already in use). + + After successful completion, `self._ser` is an open `serial.Serial` + instance and `self._port` is updated to the resolved port path. + """ + if not HAS_SERIAL: raise RuntimeError(f"pyserial not installed. Import error: {_SERIAL_IMPORT_ERROR}") + loop = asyncio.get_running_loop() self._executor = ThreadPoolExecutor(max_workers=1) + # 1. VID:PID specified - port maybe + if self._vid and self._pid: + matching_ports = [ + p.device + for p in serial.tools.list_ports.comports() + if f"{self._vid}:{self._pid}" in (p.hwid or "") + ] + + # 1.a. No matching devices found AND no port specified + if not self._port and not matching_ports: + raise RuntimeError( + f"No machines found for VID={self._vid}, PID={self._pid}, and no port specified." + ) + + else: + matching_ports = [] + + # 2. VID:PID maybe - port specified + if self._port: # Port explicitly specified + candidate_port = self._port + + # 2.a. Port specified but does not match VID:PID - sanity check (e.g. typo in port) + if (self._vid and self._pid) and candidate_port not in matching_ports: + raise RuntimeError( + f"Specified port {candidate_port} not found among machines: {matching_ports} " + f"with VID={self._vid}:PID={self._pid}." + ) + else: # -> WINNER by port specification + logger.info( + f"Using explicitly provided port: {candidate_port} (for VID={self._vid}, PID={self._pid})", + ) + + # 3. VID:PID specified - port not specified -> Single device found -> WINNER by VID:PID search + elif len(matching_ports) == 1: + candidate_port = matching_ports[0] + + # 4. VID:PID specified - port not specified -> Multiple devices found -> ambiguity! + else: + raise RuntimeError( + f"Multiple devices detected with VID:PID {self._vid}:{self._pid}.\n" + f"Detected ports: {matching_ports}\n" + "Please specify the correct port address explicitly (e.g. /dev/ttyUSB0 or COM3)." + ) + def _open_serial() -> serial.Serial: return serial.Serial( - port=self._port, + port=candidate_port, baudrate=self.baudrate, bytesize=self.bytesize, parity=self.parity, @@ -82,6 +175,7 @@ def _open_serial() -> serial.Serial: try: self._ser = await loop.run_in_executor(self._executor, _open_serial) + except serial.SerialException as e: logger.error("Could not connect to device, is it in use by a different notebook/process?") if self._executor is not None: @@ -89,52 +183,88 @@ def _open_serial() -> serial.Serial: self._executor = None raise e + self._port = candidate_port + async def stop(self): + """Close the serial device.""" + if self._ser is not None and self._ser.is_open: loop = asyncio.get_running_loop() + if self._executor is None: raise RuntimeError("Call setup() first.") await loop.run_in_executor(self._executor, self._ser.close) + if self._executor is not None: self._executor.shutdown(wait=True) self._executor = None async def write(self, data: bytes): + """Write data to the serial device.""" + assert self._ser is not None, "forgot to call setup?" + assert self._port is not None, "Port not set. Did you call setup()?" + loop = asyncio.get_running_loop() + if self._executor is None: raise RuntimeError("Call setup() first.") + await loop.run_in_executor(self._executor, self._ser.write, data) + logger.log(LOG_LEVEL_IO, "[%s] write %s", self._port, data) capturer.record( SerialCommand(device_id=self._port, action="write", data=data.decode("unicode_escape")) ) async def read(self, num_bytes: int = 1) -> bytes: + """Read data from the serial device.""" + assert self._ser is not None, "forgot to call setup?" + assert self._port is not None, "Port not set. Did you call setup()?" + loop = asyncio.get_running_loop() + if self._executor is None: raise RuntimeError("Call setup() first.") + data = await loop.run_in_executor(self._executor, self._ser.read, num_bytes) - logger.log(LOG_LEVEL_IO, "[%s] read %s", self._port, data) - capturer.record( - SerialCommand(device_id=self._port, action="read", data=data.decode("unicode_escape")) - ) + + if len(data) != 0: + logger.log(LOG_LEVEL_IO, "[%s] read %s", self._port, data) + capturer.record( + SerialCommand(device_id=self._port, action="read", data=data.decode("unicode_escape")) + ) + return cast(bytes, data) async def readline(self) -> bytes: # type: ignore # very dumb it's reading from pyserial + """Read a line from the serial device.""" + assert self._ser is not None, "forgot to call setup?" + assert self._port is not None, "Port not set. Did you call setup()?" + loop = asyncio.get_running_loop() + if self._executor is None: raise RuntimeError("Call setup() first.") + data = await loop.run_in_executor(self._executor, self._ser.readline) - logger.log(LOG_LEVEL_IO, "[%s] readline %s", self._port, data) - capturer.record( - SerialCommand(device_id=self._port, action="readline", data=data.decode("unicode_escape")) - ) + + if len(data) != 0: + logger.log(LOG_LEVEL_IO, "[%s] readline %s", self._port, data) + capturer.record( + SerialCommand(device_id=self._port, action="readline", data=data.decode("unicode_escape")) + ) + return cast(bytes, data) async def send_break(self, duration: float): + """Send a break condition for the specified duration.""" + + assert self._ser is not None, "forgot to call setup?" + assert self._port is not None, "Port not set. Did you call setup()?" + loop = asyncio.get_running_loop() if self._executor is None: raise RuntimeError("Call setup() first.") @@ -150,6 +280,8 @@ def _send_break(ser, duration: float) -> None: async def reset_input_buffer(self): assert self._ser is not None, "forgot to call setup?" + assert self._port is not None, "Port not set. Did you call setup()?" + loop = asyncio.get_running_loop() if self._executor is None: raise RuntimeError("Call setup() first.") @@ -159,6 +291,8 @@ async def reset_input_buffer(self): async def reset_output_buffer(self): assert self._ser is not None, "forgot to call setup?" + assert self._port is not None, "Port not set. Did you call setup()?" + loop = asyncio.get_running_loop() if self._executor is None: raise RuntimeError("Call setup() first.") diff --git a/pylabrobot/storage/__init__.py b/pylabrobot/storage/__init__.py index 54237cb6c27..0a987a7df1f 100644 --- a/pylabrobot/storage/__init__.py +++ b/pylabrobot/storage/__init__.py @@ -2,3 +2,4 @@ from .chatterbox import IncubatorChatterboxBackend from .cytomat import CytomatBackend from .incubator import Incubator +# from .inheco import * diff --git a/pylabrobot/storage/inheco/__init__.py b/pylabrobot/storage/inheco/__init__.py new file mode 100644 index 00000000000..9962248119c --- /dev/null +++ b/pylabrobot/storage/inheco/__init__.py @@ -0,0 +1,3 @@ +"""A hybrid between pylabrobot.shaking and pylabrobot.temperature_controlling""" +from .incubator_shaker import IncubatorShakerStack +from .incubator_shaker_backend import InhecoIncubatorShakerStackBackend, InhecoIncubatorShakerUnit diff --git a/pylabrobot/storage/inheco/incubator_shaker.py b/pylabrobot/storage/inheco/incubator_shaker.py new file mode 100644 index 00000000000..df2ee95b64f --- /dev/null +++ b/pylabrobot/storage/inheco/incubator_shaker.py @@ -0,0 +1,204 @@ +from typing import Dict + +from pylabrobot.machines.machine import Machine +from pylabrobot.resources import Coordinate, Resource, ResourceHolder + +from .incubator_shaker_backend import InhecoIncubatorShakerStackBackend, InhecoIncubatorShakerUnit + + +class IncubatorShakerStack(Resource, Machine): + """Frontend for a stack of INHECO Incubator/Shaker units. + + - Combines Carrier (geometric resource) and Machine (device lifecycle). + - Owns a backend instance for serial communication. + - Handles sequential (not concurrent) setup/teardown across all units. + """ + + def __init__(self, backend: InhecoIncubatorShakerStackBackend): + Resource.__init__( + self, + name="inheco_incubator_shaker_stack", + size_x=149.0, + size_y=268.5, + size_z=58.0, # MP: 58, Shaker MP: 88.5, DWP: 104, Shaker DWP: 139 mm + category="incubator_shaker_stack", + ) + + Machine.__init__(self, backend=backend) + self.backend: InhecoIncubatorShakerStackBackend = backend + self.units: list[InhecoIncubatorShakerUnit] = [] + self.loading_trays: list[ResourceHolder] = [] + + @property + def num_units(self) -> int: + """Return number of connected units in the stack.""" + return len(self.units) + + # ------------------------------------------------------------------------ + # Lifecycle & Resource setup + # ------------------------------------------------------------------------ + + _incubator_size_z_dict = { + "incubator_mp": 58.0, + "incubator_shaker_mp": 88.5, + "incubator_dwp": 104, + "incubator_shaker_dwp": 139, + } + _incubator_loading_tray_location = { # TODO: rough measurements, verify + "incubator_mp": None, # TODO: add when available + "incubator_shaker_mp": Coordinate(x=30.5, y=-150.5, z=51.2), + "incubator_dwp": None, # TODO: add when available + "incubator_shaker_dwp": Coordinate(x=30.5, y=-150.5, z=51.2), + } + + _possible_tray_y_coordinates = { + "open": -150.5, # TODO: verify by careful testing in controlled geometry setup + "closed": +24.0, + } + + _chamber_z_clearance = 2 + + _acceptable_plate_z_dimensions = { + "incubator_mp": 18 - _chamber_z_clearance, + "incubator_shaker_mp": 50 - _chamber_z_clearance, + "incubator_dwp": 18 - _chamber_z_clearance, + "incubator_shaker_dwp": 53 - _chamber_z_clearance, + } + + _incubator_power_credits_per_type = { + "incubator_mp": 1.0, + "incubator_dwp": 1.25, + "incubator_shaker_mp": 1.6, + "incubator_shaker_dwp": 2.5, + } + + async def setup(self, **backend_kwargs) -> None: + """Connect to the stack and build per-unit proxies.""" + + await self.backend.setup(**backend_kwargs) + + self.power_credit = 0.0 + + # Calculate true stack size + stack_size_z = 0.0 + + for i in range(self.backend.number_of_connected_units): + # Create unit proxies + unit = InhecoIncubatorShakerUnit(self.backend, index=i) + self.units.append(unit) + + # Create loading tray resources and calculate their locations + unit_type = self.backend.unit_composition[i] + self.power_credit += self._incubator_power_credits_per_type[unit_type] + unit_size_z = self._incubator_size_z_dict[unit_type] + + loading_tray = ResourceHolder( + size_x=127.76, size_y=85.48, size_z=0, name=f"unit-{i}-loading-tray" + ) + self.loading_trays.append(loading_tray) + + loc = self._incubator_loading_tray_location[unit_type] + if loc is None: + raise ValueError( + f"Loading tray location for unit type {unit_type} is not defined. " "Cannot set up stack." + ) + + self.assign_child_resource( + loading_tray, + location=Coordinate( + x=loc.x, + y=self._possible_tray_y_coordinates[ + "closed" + ], # setup finishes with all loading trays closed + z=stack_size_z + loc.z, + ), + ) + stack_size_z += unit_size_z + + self._size_z = stack_size_z + + assert ( + self.power_credit < 5 + ), f"Too many units: unit composition {self.backend.unit_composition} is exceeding 5 power credit limit. Reduce number of units." + + async def stop(self): + """Gracefully stop backend communication.""" + await self.backend.stop() + + async def request_loading_tray_states(self) -> dict: + """Request loading tray states for all units.""" + + return { + unit_index: await self.backend.request_drawer_status(stack_index=unit_index) + for unit_index in range(self.num_units) + } + + async def request_temperature_control_states(self) -> dict: + """Request temperature control states for all units.""" + + return { + unit_index: await self.backend.is_temperature_control_enabled(stack_index=unit_index) + for unit_index in range(self.num_units) + } + + async def request_shaking_states(self) -> dict: + """Request shaking states for all units.""" + + return { + unit_index: await self.backend.is_shaking_enabled(stack_index=unit_index) + for unit_index in range(self.num_units) + } + + # ------------------------------------------------------------------------ + # Stack to unit master commands + # ------------------------------------------------------------------------ + + async def open_all(self) -> None: + """Open all units in the stack.""" + for i in range(self.num_units): + await self.units[i].open() + + async def close_all(self) -> None: + """Close all units in the stack.""" + for i in range(self.num_units): + await self.units[i].close() + + async def start_all_temperature_control(self, target_temperature: float) -> None: + """Start temperature control for all units in the stack.""" + for i in range(self.num_units): + await self.units[i].start_temperature_control(target_temperature) + + async def get_all_temperatures(self) -> Dict[int, float]: + """Get current temperature for all units in the stack.""" + temperatures = {} + for i in range(self.num_units): + temp = await self.units[i].get_temperature() + temperatures[i] = temp + return temperatures + + async def stop_all_temperature_control(self) -> None: + """Stop temperature control for all units in the stack.""" + for i in range(self.num_units): + await self.units[i].stop_temperature_control() + + async def shake(self, *args, **kwargs) -> None: + """Start shaking for all units in the stack.""" + for i in range(self.num_units): + await self.units[i].shake(*args, **kwargs) + + async def stop_all_shaking(self) -> None: + """Stop shaking for all units in the stack.""" + for i in range(self.num_units): + await self.units[i].stop_shaking() + + # ------------------------------------------------------------------------ + # Unit accessors + # ------------------------------------------------------------------------ + + def __getitem__(self, index: int) -> InhecoIncubatorShakerUnit: + """Access a unit proxy via stack[index].""" + return self.units[index] + + def __len__(self): + """Return number of connected units.""" + return len(self.units) diff --git a/pylabrobot/storage/inheco/incubator_shaker_backend.py b/pylabrobot/storage/inheco/incubator_shaker_backend.py new file mode 100644 index 00000000000..70f473590f8 --- /dev/null +++ b/pylabrobot/storage/inheco/incubator_shaker_backend.py @@ -0,0 +1,1796 @@ +""" +Backend for Inheco Incubator Shaker Stack machine. + +This module implements a fully asynchronous serial communication backend for +Inheco Incubator/Shaker instruments (e.g., Inheco MP/DWP with or without shaker). + +Features: +- Auto-discovery of Inheco devices by VID:PID (error if more than one is found). +- Validation of DIP switch ID. +- Automatic identification of units in stack. +- Complete command/response layer with legacy CRC-8 and async-safe I/O. +- Structured firmware error reporting via InhecoError and contextual snapshotting. +- High-level API for temperature control, drawer handling, and shaking functions. +- Protocol-conformant parsing for EEPROM, sensor, and status commands. +""" + +import asyncio +import logging +import sys +from functools import wraps +from typing import Awaitable, Callable, Dict, List, Literal, Optional, TypeVar, cast + +from pylabrobot.io.serial import Serial +from pylabrobot.machines.machine import MachineBackend + +if sys.version_info < (3, 10): + from typing_extensions import Concatenate, ParamSpec +else: + from typing import Concatenate, ParamSpec +try: + import serial + + HAS_SERIAL = True +except ImportError as e: + HAS_SERIAL = False + _SERIAL_IMPORT_ERROR = e + + +P = ParamSpec("P") +R = TypeVar("R") + + +class InhecoError(RuntimeError): + """Represents an Inheco firmware-reported error.""" + + def __init__(self, command: str, code: str, message: str): + super().__init__(f"{command} failed with error {code}: {message}") + self.command: str = command + self.code: str = code + self.message: str = message + + +_REF_FLAG_NAMES: Dict[int, str] = { + # Heater (0-15) — names per manual's heater flags table (subset shown here) + 0: "H_WARN_WarmUp_TIME", + 1: "H_WARN_BoostCoolDown_TIME", + 2: "H_WARN_StartState_LIMIT_Up_TEMP_S2", + 3: "H_WARN_StartState_LIMIT_Up_TEMP_S3", + 4: "H_WARN_StartStateBoost_LIMIT_UpDown_TEMP_S3", + 5: "H_WARN_StableState_LIMIT_UpDown_TEMP_S2", + 6: "H_WARN_StableState_LIMIT_UpDown_TEMP_S3", + 7: "H_WARN_DELTA_TEMP_S1_S2", + 8: "H_ERR_DELTA_TEMP_S1_S2", + 9: "H_WARN_StartStateBoost_LIMIT_UpDown_TEMP_S2", + 10: "H_WARN_WaitStable_LIMIT_TEMP_S1", + 11: "H_WARN_WaitStable_LIMIT_TEMP_S2", + 12: "H_WARN_WaitStable_LIMIT_TEMP_S3", + 13: "H_ERR_S2_NTC_NotConnected", + 14: "H_ERR_S3_NTC_NotConnected", + 15: "H_WARN_DELTA_TEMP_S1_S3", + # Shaker (16-26) — names per manual's shaker flag set (page 39) + 16: "S_WARN_MotorCurrentLimit", + 17: "S_WARN_TargetSpeedTimeout", + 18: "S_WARN_PositionTimeout", + 19: "S_ERR_MotorTemperatureLimit", + 20: "S_ERR_TargetSpeedDeviation", + 21: "S_ERR_HomeSensorTimeout", + 22: "S_ERR_MotorDriverFault", + 23: "S_ERR_EncoderSignalLost", + 24: "S_ERR_AmplitudeOutOfRange", + 25: "S_ERR_VibrationExcessive", + 26: "S_ERR_InternalTimeout", + # 27-31 reserved +} + +FIRMWARE_ERROR_MAP: Dict[int, str] = { + 0: "Msg Ok", + 1: "Reset detected", + 2: "Invalid command", + 3: "Invalid operand", + 4: "Protocol error", + 5: "Reserved", + 6: "Timeout from Device", + 7: "Device not initialized", + 8: "Command not executable", + 9: "Drawer not in end position", + 10: "Unexpected Labware Status", + 13: "Drawer DWP not perfectly closed (NTC not connected)", + 14: "Floor ID error", + 15: "Timeout sub device", +} + +InhecoIncubatorUnitType = Literal[ + "incubator_mp", "incubator_shaker_mp", "incubator_dwp", "incubator_shaker_dwp", "unknown" +] + + +class InhecoIncubatorShakerStackBackend(MachineBackend): + """Interface for Inheco Incubator Shaker stack machines. + + Handles: + - USB/serial connection setup via VID/PID + - DIP switch ID verification + - Message framing, CRC generation + - Complete async read/write of firmware responses + - Binary-safe parsing and error mapping + """ + + # === Logging utility === + + # === Constructor === + + def __init__( + self, + dip_switch_id: int = 2, + port: Optional[str] = None, + write_timeout: float = 5.0, + read_timeout: float = 10.0, + vid: str = "0403", + pid: str = "6001", + ): + super().__init__() + + self.logger = logging.LoggerAdapter( + logging.getLogger(__name__), + {"dip_switch_id": dip_switch_id}, + ) + + # Core state + self.dip_switch_id = dip_switch_id + + self.io = Serial( + port=port, + vid=vid, + pid=pid, + baudrate=19_200, + bytesize=serial.EIGHTBITS, + parity=serial.PARITY_NONE, + stopbits=serial.STOPBITS_ONE, + timeout=0, + write_timeout=1, + ) + + # Communication timeouts (defaults for all units in stack) + self.write_timeout = write_timeout + self.read_timeout = read_timeout + + # Cached state (stack level) + self.setup_finished = False + self.max_temperature = 85.0 # safe default + self.unit_composition: List[ + InhecoIncubatorUnitType + ] = [] # e.g. ["incubator_mp", "incubator_shaker_dwp", ...] + + self._send_command_lock = asyncio.Lock() + + @property + def number_of_connected_units(self) -> int: + """Return the number of connected units in the stack.""" + return len(self.unit_composition) + + def __repr__(self): + return ( + f"" + ) + + async def setup(self, port: Optional[str] = None): + """ + Detect and connect to the Inheco machine stack. + Discover Inheco device via VID:PID (0403:6001) and verify DIP switch ID. + """ + + # --- Establish serial connection --- + await self.io.setup() + + try: # --- Verify DIP switch ID via RTS --- + probe = self._build_message("RTS", stack_index=0) + await self.io.write(probe) + resp = await self._read_full_response(timeout=5.0) + + expected_hdr = (0xB0 + self.dip_switch_id) & 0xFF + if resp[0] != expected_hdr: + raise ValueError("Unexpected header") + + except Exception as e: + # Capture current IO reference before marking disconnected + msg = ( + f"Device on {self.io._port} failed DIP switch verification (expected ID=" + f"{self.dip_switch_id}). Please verify the DIP switch setting or wiring." + ) + self.logger.error(msg, exc_info=e) + + # --- Fail-safe teardown --- + try: + await self.io.stop() + self.logger.debug("Closed serial connection on %s", self.io.port) + except Exception as close_err: + self.logger.warning( + "Failed to close serial port cleanly on %s: %s", + self.io._port, + close_err, + ) + raise RuntimeError(msg) from e + + else: + # Connection verified and active + self.logger.log( + logging.INFO, + f"Connected to Inheco machine at {self.io.port} (DIP={self.dip_switch_id})", + ) + + # --- Cache stack-level state --- + number_of_connected_units = await self.request_number_of_connected_machines(stack_index=0) + + self.unit_composition = [] + + for unit_index in range(number_of_connected_units): + inc_type = await self.request_incubator_type(stack_index=unit_index) + self.unit_composition.append(inc_type) + + await self.initialize(stack_index=unit_index) + + self.setup_finished = True + + self.logger.info( + "Connected to Inheco Incubator Shaker Stack on %s\n" + "DIP switch ID of bottom unit: %s\n" + "Number of connected units: %s\n" + "Unit composition: %s", + self.io.port, + self.dip_switch_id, + self.number_of_connected_units, + self.unit_composition, + ) + + async def stop(self): + """Close serial connection & stop all active units the stack.""" + + for unit_index in range(self.number_of_connected_units): + temp_status = await self.is_temperature_control_enabled(stack_index=unit_index) + + if temp_status: + print(f"Stopping temperature control on unit {unit_index}...") + await self.stop_temperature_control(stack_index=unit_index) + + shake_status = await self.is_shaking_enabled(stack_index=unit_index) + + if shake_status: + print(f"Stopping shaking on unit {unit_index}...") + await self.stop_shaking(stack_index=unit_index) + + await self.io.stop() + + # === Low-level I/O === + + async def _read_full_response(self, timeout: float) -> bytes: + """Read a complete Inheco response frame asynchronously.""" + loop = asyncio.get_event_loop() + start = loop.time() + buf = bytearray() + expected_hdr = (0xB0 + self.dip_switch_id) & 0xFF + + def has_complete_tail(b: bytearray) -> bool: + # Valid frame ends with: [hdr][0x20-0x2F][0x60] + return len(b) >= 3 and b[-1] == 0x60 and b[-3] == expected_hdr and 0x20 <= b[-2] <= 0x2F + + while True: + chunk = await self.io.read(16) + if len(chunk) > 0: + buf.extend(chunk) + if has_complete_tail(buf): + self.logger.debug("RECV response: %s", buf.hex(" ")) + return bytes(buf) + + if loop.time() - start > timeout: + raise TimeoutError(f"Timed out waiting for complete response (so far: {buf.hex(' ')})") + + await asyncio.sleep(0.005) + + # === Encoding / Decoding === + + def _crc8_legacy(self, data: bytearray) -> int: + """Compute legacy CRC-8 used by Inheco devices.""" + crc = 0xA1 + for byte in data: + d = byte + for _ in range(8): + if (d ^ crc) & 1: + crc ^= 0x18 + crc >>= 1 + crc |= 0x80 + else: + crc >>= 1 + d >>= 1 + return crc & 0xFF + + def _build_message(self, command: str, stack_index: int = 0) -> bytes: + """Construct a full binary message with header and CRC.""" + if not (0 <= stack_index <= 5): + raise ValueError("stack_index must be between 0 and 5") + cmd = f"T0{stack_index}{command}".encode("ascii") + length = len(cmd) + 3 + address = (0x30 + self.dip_switch_id) & 0xFF + proto = (0xC0 + len(cmd)) & 0xFF + message = bytearray([length, address, proto]) + cmd + crc = self._crc8_legacy(message) + return bytes(message + bytearray([crc])) + + def _is_report_command(self, command: str) -> bool: + """Return True if command is a 'Report' type (starts with 'R').""" + return len(command) > 0 and command[0].upper() == "R" + + # === Response parsing === + + def _parse_response_binary_safe(self, resp: bytes) -> dict: + """Parse Inheco response frames safely (binary & multi-segment).""" + if len(resp) < 3: + raise ValueError("Incomplete response") + + expected_hdr = (0xB0 + self.dip_switch_id) & 0xFF + + # Trim any leading junk before first valid header + try: + start_idx = resp.index(bytes([expected_hdr])) + frame = resp[start_idx:] + except ValueError: + return { + "device": None, + "error_code": None, + "ok": False, + "data": "", + "raw_data": resp, + } + + # Validate tail + if len(frame) < 3 or frame[-1] != 0x60: + return { + "device": expected_hdr - 0xB0, + "error_code": None, + "ok": False, + "data": "", + "raw_data": frame, + } + + err_byte = frame[-2] + err_code = err_byte - 0x20 if 0x20 <= err_byte <= 0x2F else None + + # Extract data between headers + data_blocks = [] + i = 1 # start after first header + while i < len(frame) - 3: + try: + next_hdr = frame.index(bytes([expected_hdr]), i) + except ValueError: + next_hdr = len(frame) - 3 + if next_hdr > i: + data_blocks.append(frame[i:next_hdr]) + i = next_hdr + 1 + if next_hdr >= len(frame) - 3: + break + + raw_data = b"".join(data_blocks) + try: + ascii_data = raw_data.decode("ascii").strip("\x00") + except UnicodeDecodeError: + ascii_data = raw_data.hex() + + return { + "device": expected_hdr - 0xB0, + "error_code": err_code, + "ok": (err_code == 0), + "data": ascii_data, + "raw_data": raw_data, + } + + def _is_error_tail(self, resp: bytes) -> bool: + """Return True if the response ends in an explicit firmware error tail.""" + expected_hdr = (0xB0 + self.dip_switch_id) & 0xFF + return len(resp) >= 3 and resp.endswith(bytes([expected_hdr, 0x28, 0x60])) + + # === Command Layer === + + async def send_command( + self, + command: str, + *, + delay: float = 0.2, + write_timeout: Optional[float] = None, + read_timeout: Optional[float] = None, + stack_index: int = 0, + ) -> str: + """Send a framed command and return parsed response or raise InhecoError.""" + + async with self._send_command_lock: + # Use global default if not overridden + w_timeout = write_timeout or self.write_timeout + msg = self._build_message(command, stack_index=stack_index) + self.logger.debug("SEND command: %s (write_timeout=%s)", msg.hex(" "), w_timeout) + + await asyncio.wait_for(self.io.write(msg), timeout=w_timeout) + await asyncio.sleep(delay) + + response = await self._read_full_response(timeout=read_timeout or self.read_timeout) + if not response: + raise TimeoutError(f"No response from device for command: {command}") + + if self._is_error_tail(response): + tail_err = response[-2] - 0x20 + code = f"E{tail_err:02d}" + message = FIRMWARE_ERROR_MAP.get(tail_err, "Unknown firmware error") + raise InhecoError(command, code, message) + + parsed = self._parse_response_binary_safe(response) + if not parsed["ok"]: + code = f"E{parsed.get('error_code', 0):02d}" + message = FIRMWARE_ERROR_MAP.get(parsed.get("error_code", 0), "Unknown firmware error") + raise InhecoError(command, code, message) + + return str(parsed["data"]) + + # === Public high-level API === + + # Querying Machine State # + async def request_firmware_version(self, stack_index: int) -> str: + """EEPROM request: Return the firmware version string.""" + resp = await self.send_command("RFV0", stack_index=stack_index) + return resp + + async def request_serial_number(self, stack_index: int) -> str: + """EEPROM request: Return the device serial number.""" + resp = await self.send_command("RFV2", stack_index=stack_index) + return resp + + async def request_last_calibration_date(self, stack_index: int) -> str: + """EEPROM request""" + resp = await self.send_command("RCM", stack_index=stack_index) + return resp[:10] + + async def request_machine_allocation(self, layer: int = 0, stack_index: int = 0) -> dict: + """ + Report which device slots are occupied on a given layer (firmware 'RDAx,0'). + + Args: + layer: Layer index (0-7). Default = 0. + + Returns: + { + "layer": int, + "slot_mask": int, # e.g. 7 + "slot_mask_bin": str, # e.g. "0b0000000000000111" + "slots_connected": list[int] # e.g. [0, 1, 2] + } + + Notes: + Each bit in `slot_mask` represents one of 16 possible device slots: + bit=1 means a device is connected; bit=0 means empty. + """ + if not (0 <= layer <= 7): + raise ValueError(f"Layer must be between 0 and 7, got {layer}") + + resp = await self.send_command(f"RDA{layer},0", stack_index=stack_index) + resp_str = str(resp).strip() + slot_mask = int(resp_str) + slot_mask_bin = f"0b{slot_mask:016b}" + + slots_connected = [i for i in range(16) if (slot_mask >> i) & 1] + + return { + "layer": layer, + "slot_mask": slot_mask, + "slot_mask_bin": slot_mask_bin, + "slots_connected": slots_connected, + } + + async def request_number_of_connected_machines(self, layer: int = 0, stack_index: int = 0) -> int: + """ + Report the number of connected Inheco devices on a layer (RDAx,1). + + Args: + layer: Layer index (0-7). Default = 0. + + Returns: + Number of connected devices (0-16). + + Example: + Response "3" → 3 connected devices on that layer. + """ + if not (0 <= layer <= 7): + raise ValueError(f"Layer must be 0-7, got {layer}") + + resp = await self.send_command(f"RDA{layer},1", stack_index=stack_index) + return int(resp.strip()) + + async def request_labware_detection_threshold(self, stack_index: int) -> int: + """EEPROM request""" + resp = await self.send_command("RDM", stack_index=stack_index) + return int(resp) + + async def request_incubator_type(self, stack_index: int) -> InhecoIncubatorUnitType: + """Return a descriptive string of the incubator/shaker configuration.""" + + incubator_type_dict = { + "0": "incubator_mp", # no shaker + "1": "incubator_shaker_mp", + "2": "incubator_dwp", # no shaker + "3": "incubator_shaker_dwp", + } + resp = await self.send_command("RTS", stack_index=stack_index) + return incubator_type_dict.get(resp, "unknown") # type: ignore[return-value] + + async def request_plate_in_incubator(self, stack_index: int) -> bool: + """Sensor command:""" + resp = await self.send_command("RLW", stack_index=stack_index) + return resp == "1" + + async def request_operation_time_in_hours(self, stack_index: int) -> int: + """EEPROM request""" + resp = await self.send_command("RDC1", stack_index=stack_index) + return int(resp) + + async def request_drawer_cycles_performed(self, stack_index: int) -> int: + """EEPROM request""" + resp = await self.send_command("RDC2", stack_index=stack_index) + return int(resp) + + async def request_is_initialized(self, stack_index: int) -> bool: + """EEPROM request""" + resp = await self.send_command("REE", stack_index=stack_index) + return resp in {"0", "2"} + + async def request_plate_status_known(self, stack_index: int) -> bool: + """EEPROM request""" + resp = await self.send_command("REE", stack_index=stack_index) + return resp in {"0", "1"} + + async def request_thermal_calibration_date(self, stack_index: int) -> str: + """EEPROM request: Query the date of the last thermal calibration. + + Returns: + Calibration date in ISO format 'YYYY-MM-DD'. + """ + resp = await self.send_command("RCD", stack_index=stack_index) + date = resp.strip() + if not date or len(date) != 10 or date.count("-") != 2: + raise RuntimeError(f"Unexpected RCD response: {resp!r}") + return date + + # TODO: Command Placeholders + + async def request_calibration_low(self, sensor: int, format: int) -> float: + raise NotImplementedError("RCL (Report Calibration Low) not implemented yet.") + + async def request_calibration_high(self, sensor: int, format: int) -> float: + raise NotImplementedError("RCH (Report Calibration High) not implemented yet.") + + async def request_whole_calibration_data(self, key: str) -> bytes: + raise NotImplementedError("RWC (Read Whole Calibration Data) not implemented yet.") + + async def request_proportionality_factor(self) -> int: + raise NotImplementedError("RPF (Report Proportionality Factor) not implemented yet.") + + async def set_max_allowed_device_temperature(self, key: str, temperature: int) -> None: + raise NotImplementedError("SMT (Set Max Allowed Device Temperature) not implemented yet.") + + async def set_pid_proportional_gain(self, key: str, value: int) -> None: + raise NotImplementedError("SPP (Set PID Proportional Gain) not implemented yet.") + + async def set_pid_integration_value(self, key: str, value: int) -> None: + raise NotImplementedError("SPI (Set PID Integration Value) not implemented yet.") + + async def delete_counter(self, key: str, selector: int) -> None: + raise NotImplementedError("SDC (Set Delete Counter) not implemented yet.") + + async def set_boost_offset(self, offset: int) -> None: + raise NotImplementedError("SBO (Set Boost Offset) not implemented yet.") + + async def set_boost_time(self, time_s: int) -> None: + raise NotImplementedError("SBT (Set Boost Time) not implemented yet.") + + async def set_cooldown_time_factor(self, value: int) -> None: + raise NotImplementedError("SHK (Set Cool-Down Time Evaluation Factor) not implemented yet.") + + async def set_heatup_time_factor(self, value: int) -> None: + raise NotImplementedError("SHH (Set Heat-Up Time Evaluation Factor) not implemented yet.") + + async def set_heatup_offset(self, offset: int) -> None: + raise NotImplementedError("SHO (Set Heat-Up Offset) not implemented yet.") + + async def set_calibration_low(self, key: str, sensor1: int, sensor2: int, sensor3: int) -> None: + raise NotImplementedError("SCL (Set Calibration Low) not implemented yet.") + + async def set_calibration_high( + self, key: str, sensor1: int, sensor2: int, sensor3: int, date: str + ) -> None: + raise NotImplementedError("SCH (Set Calibration High and Date) not implemented yet.") + + async def reset_calibration_data(self, key: str) -> None: + raise NotImplementedError("SRC (Set Reset Calibration-Data) not implemented yet.") + + async def set_proportionality_factor(self, value: int) -> None: + raise NotImplementedError("SPF (Set Proportionality Factor) not implemented yet.") + + # # # Setup Requirement # # # + + async def initialize(self, stack_index: int) -> str: + """Initializes the machine unit after power-on. + All other Action-Commands of the device are prohibited before the command is executed. + On command AID, the drawer will be closed. If the drawer is hold in its open position, + no Error will be generated! + If AID is send during operating, the shaker and heater stop immediately! + """ + resp = await self.send_command("AID", stack_index=stack_index) + return resp + + # # # Loading Tray Features # # # + + async def open(self, stack_index: int) -> None: + """Open the incubator door & move loading tray out.""" + await self.send_command("AOD", stack_index=stack_index) + + async def close(self, stack_index: int) -> None: + """Move the loading tray in & close the incubator door.""" + await self.send_command("ACD", stack_index=stack_index) + + DrawerStatus = Literal["open", "closed"] + + async def request_drawer_status(self, stack_index: int) -> DrawerStatus: + """Report the current drawer (loading tray) status. + + Returns: + 'open' if the loading tray is open, 'closed' if closed. + + Notes: + - Firmware response: '1' = open, '0' = closed. + """ + resp = await self.send_command("RDS", stack_index=stack_index) + if resp == "1": + return "open" + if resp == "0": + return "closed" + raise ValueError(f"Unexpected RDS response: {resp!r}") + + # TODO: Drawer Placeholder Commands + + async def request_motor_power_clockwise(self) -> int: + raise NotImplementedError("RPR (Report Motor Power Clockwise) not implemented yet.") + + async def request_motor_power_anticlockwise(self) -> int: + raise NotImplementedError("RPL (Report Motor Power Anticlockwise) not implemented yet.") + + async def request_motor_current_limit_clockwise(self) -> int: + raise NotImplementedError("RGR (Report Motor Current Limit Clockwise) not implemented yet.") + + async def request_motor_current_limit_anticlockwise(self) -> int: + raise NotImplementedError("RGL (Report Motor Current Limit Anticlockwise) not implemented yet.") + + async def set_motor_power_clockwise(self, key: str, power: int) -> None: + raise NotImplementedError("SPR (Set Motor Power Clockwise) not implemented yet.") + + async def set_motor_power_anticlockwise(self, key: str, power: int) -> None: + raise NotImplementedError("SPL (Set Motor Power Anticlockwise) not implemented yet.") + + async def set_motor_current_limit_clockwise(self, key: str, current: int) -> None: + raise NotImplementedError("SGR (Set Motor Current Limit Clockwise) not implemented yet.") + + async def set_motor_current_limit_anticlockwise(self, key: str, current: int) -> None: + raise NotImplementedError("SGL (Set Motor Current Limit Anticlockwise) not implemented yet.") + + # # # Temperature Features # # # + + async def start_temperature_control(self, temperature: float, stack_index: int) -> None: + """Set and activate the target incubation temperature (°C). + + The device begins active heating toward the target temperature. + Passive cooling (firmware default) may occur automatically if the + target temperature is below ambient, depending on environmental conditions. + """ + + assert temperature < self.max_temperature, ( + "Target temperature must be below max temperature of the incubator, i.e. " + f"{self.max_temperature}C, target temperature given = {temperature}" + ) + + target = round(temperature * 10) + await self.send_command(f"STT{target}", stack_index=stack_index) # Store target temperature + await self.send_command("SHE1", stack_index=stack_index) # Enable temperature regulation + + async def stop_temperature_control(self, stack_index: int) -> None: + """Stop active temperature regulation. + + Disables the incubator's heating control loop. + The previously set target temperature remains stored in the + device's memory but is no longer actively maintained. + The incubator will passively drift toward ambient temperature. + """ + await self.send_command("SHE0", stack_index=stack_index) + + async def get_temperature( + self, + stack_index: int, + sensor: Literal["mean", "main", "dif", "boost"] = "main", + read_timeout: float = 60.0, + ) -> float: + """Return current measured temperature in °C.""" + + sensor_mapping = { + "mean": [1, 2, 3], + "main": [1], + "dif": [2], + "boost": [3], + } + vals = [] + for idx in sensor_mapping[sensor]: + val = await self.send_command(f"RAT{idx}", stack_index=stack_index, read_timeout=read_timeout) + vals.append(int(val) / 10.0) + return round(sum(vals) / len(vals), 2) + + async def request_target_temperature(self, stack_index: int) -> float: + """Return target temperature in °C.""" + + resp = await self.send_command("RTT", stack_index=stack_index) + + return int(resp) / 10 + + async def is_temperature_control_enabled(self, stack_index: int) -> bool: + """ + Return True if active temperature control is enabled (RHE = 1 or 2), + False if control is off (RHE = 0). + + Firmware response (RHE): + 0: control loop off (passive equilibrium) + 1: control loop on (heating/cooling active) + 2: control + booster on (extended heating mode) + """ + resp = await self.send_command("RHE", stack_index=stack_index) + try: + status = int(resp.strip()) + except ValueError: + raise InhecoError("RHE", "E00", f"Unexpected response: {resp!r}") + + if status not in (0, 1, 2): + raise InhecoError("RHE", "E00", f"Invalid heater status value: {status}") + + enabled = status in (1, 2) + + return enabled + + async def request_pid_controller_coefficients(self, stack_index: int) -> tuple[float, float]: + """ + Query the current PID controller coefficients. + + Returns: + (P, I): tuple of floats + - P: proportional gain (selector 1) + - I: integration value (selector 2; 0 = integration off) + """ + p_resp = await self.send_command("RPC1", stack_index=stack_index) + i_resp = await self.send_command("RPC2", stack_index=stack_index) + + try: + p = float(p_resp.strip()) + i = float(i_resp.strip()) + except ValueError: + raise RuntimeError(f"Unexpected RPC response(s): P={p_resp!r}, I={i_resp!r}") + + return p, i + + async def request_maximum_allowed_temperature( + self, stack_index: int, measured: bool = False + ) -> float: + """ + Query the maximum allowed or maximum measured device temperature (in °C). + + Args: + measured: + - False: report configured maximum allowed temperature (default) + - True: report maximum measured temperature since last reset + + Returns: + Temperature in °C (value / 10) + """ + selector = "1" if measured else "" + resp = await self.send_command(f"RMT{selector}", stack_index=stack_index) + try: + return int(resp.strip()) / 10.0 + except ValueError: + raise RuntimeError(f"Unexpected RMT response: {resp!r}") + + async def request_delta_temperature(self, stack_index: int) -> float: + """ + Query the absolute temperature difference between target and actual plate temperature. + + Returns: + Delta temperature in °C (positive if below target, negative if above target). + + Notes: + - Reported in 1/10 °C. + - Negative values indicate the plate is warmer than the target. + """ + resp = await self.send_command("RDT", stack_index=stack_index) + try: + return int(resp.strip()) / 10.0 + except ValueError: + raise RuntimeError(f"Unexpected RDT response: {resp!r}") + + async def wait_for_temperature( + self, + stack_index: int, + sensor: Literal["main", "dif", "boost", "mean"] = "main", + tolerance: float = 0.2, + interval_s: float = 0.5, + timeout_s: Optional[float] = 600.0, + show_progress_bar: bool = False, + ) -> float: + """ + Wait asynchronously until the target temperature is reached. + + Args: + sensor: Temperature sensor to monitor ("main", "dif", "boost", or "mean"). + tolerance: Acceptable difference (in °C) between current and target temperature. + interval_s: Polling interval in seconds. Default = 0.5 s. + timeout_s: Maximum time to wait in seconds. None disables timeout. Default = 600 s. + show_progress_bar: If True, display a dynamic ASCII progress bar in stdout. Default False. + + Returns: + Final measured temperature in °C once within tolerance. + + Raises: + TimeoutError: If target not reached within `timeout_s`. + ValueError: If temperature control is not enabled or no valid target returned. + """ + target_temp = await self.request_target_temperature(stack_index=stack_index) + if target_temp is None: + raise ValueError("Device did not return a valid target temperature.") + + temperature_control_enabled = await self.is_temperature_control_enabled(stack_index=stack_index) + if not temperature_control_enabled: + raise ValueError( + f"Temperature control is not enabled on the machine ({stack_index}: {self.unit_composition[stack_index]})." + ) + + start_time = asyncio.get_event_loop().time() + first_temp = await self.get_temperature(sensor=sensor, stack_index=stack_index) + initial_diff = abs(first_temp - target_temp) + bar_width = 40 + + if show_progress_bar: + print(f"Waiting for target temperature {target_temp:.2f} °C...\n") + + while True: + current_temp = await self.get_temperature(sensor=sensor, stack_index=stack_index) + diff = abs(current_temp - target_temp) + + # Compute normalized progress (1 = done) + progress = 1.0 - min(diff / max(initial_diff, 1e-6), 1.0) + filled = int(bar_width * progress) + bar = "█" * filled + "-" * (bar_width - filled) + + if show_progress_bar: + # Compute slope (°C/sec) based on direction of travel + delta_done = abs(current_temp - first_temp) + + elapsed = asyncio.get_event_loop().time() - start_time + + slope = delta_done / max(elapsed, 1e-6) # °C per second + + if slope > 0.0001: + remaining = diff # remaining °C + eta_s = remaining / slope + eta_str = f"{eta_s:6.1f}s" + else: + eta_str = " --- " + + sys.stdout.write(f"\r[{bar}] {current_temp:.2f} °C " f"(Δ={diff:.2f} °C | ETA: {eta_str})") + sys.stdout.flush() + + if diff <= tolerance: + if show_progress_bar: + sys.stdout.write("\n[OK] Target temperature reached.\n") + sys.stdout.flush() + + self.logger.info("Target temperature reached (%.2f °C).", current_temp) + return current_temp + + if timeout_s is not None: + elapsed = asyncio.get_event_loop().time() - start_time + if elapsed > timeout_s: + if show_progress_bar: + sys.stdout.write("\n[ERROR] Timeout waiting for temperature.\n") + sys.stdout.flush() + + raise TimeoutError( + f"Timeout after {timeout_s:.1f}s: " + f"temperature {current_temp:.2f} °C " + f"did not reach target {target_temp:.2f} °C ±{tolerance:.2f} °C." + ) + + await asyncio.sleep(interval_s) + + # # # Shaking Features # # # + + @staticmethod + def requires_incubator_shaker( + func: Callable[Concatenate["InhecoIncubatorShakerStackBackend", P], Awaitable[R]], + ) -> Callable[Concatenate["InhecoIncubatorShakerStackBackend", P], Awaitable[R]]: + @wraps(func) + async def wrapper( + self: "InhecoIncubatorShakerStackBackend", *args: P.args, **kwargs: P.kwargs + ) -> R: + name = getattr(func, "__name__", func.__class__.__name__) + stack_index = cast(int, kwargs.get("stack_index", 0)) + incubator_type = self.unit_composition[stack_index] + + if "shaker" not in incubator_type: + raise RuntimeError(f"{name}() requires a shaker-capable model (got {incubator_type!r}).") + + return await func(self, *args, **kwargs) + + return wrapper + + @requires_incubator_shaker + async def request_shaker_frequency_x(self, stack_index: int, selector: int = 0) -> float: + """Read the set or actual shaker frequency in the X-direction. + + Args: + selector: 0 = to-be-set frequency, 1 = actual frequency. Default = 0. + + Returns: + Frequency in Hz. + """ + if selector not in (0, 1): + raise ValueError(f"Selector must be 0 or 1, got {selector}") + resp = await self.send_command(f"RFX{selector}", stack_index=stack_index) + return float(resp) / 10.0 # firmware reports in 1/10 Hz + + @requires_incubator_shaker + async def request_shaker_frequency_y(self, stack_index: int, selector: int = 0) -> float: + """Read the set or actual shaker frequency in the Y-direction. + + Args: + selector: 0 = to-be-set frequency, 1 = actual frequency. Default = 0. + + Returns: + Frequency in Hz. + """ + if selector not in (0, 1): + raise ValueError(f"Selector must be 0 or 1, got {selector}") + resp = await self.send_command(f"RFY{selector}", stack_index=stack_index) + return float(resp) / 10.0 # firmware reports in 1/10 Hz + + @requires_incubator_shaker + async def request_shaker_amplitude_x(self, stack_index: int, selector: int = 0) -> float: + """Read the set, actual, or static shaker amplitude in the X-direction. + + Args: + selector: 0 = set amplitude, 1 = actual amplitude, 2 = static distance from middle. Default = 0. + + Returns: + Amplitude in millimeters (mm). + """ + if selector not in (0, 1, 2): + raise ValueError(f"Selector must be 0, 1, or 2, got {selector}") + resp = await self.send_command(f"RAX{selector}", stack_index=stack_index) + return float(resp) / 10.0 # firmware reports in 1/10 mm + + @requires_incubator_shaker + async def request_shaker_amplitude_y(self, stack_index: int, selector: int = 0) -> float: + """Read the set, actual, or static shaker amplitude in the Y-direction. + + Args: + selector: 0 = set amplitude, 1 = actual amplitude, 2 = static distance from middle. Default = 0. + + Returns: + Amplitude in millimeters (mm). + """ + if selector not in (0, 1, 2): + raise ValueError(f"Selector must be 0, 1, or 2, got {selector}") + resp = await self.send_command(f"RAY{selector}", stack_index=stack_index) + return float(resp) / 10.0 # firmware reports in 1/10 mm + + async def is_shaking_enabled(self, stack_index: int) -> bool: + """Return True if the shaker is currently enabled or still decelerating. + + The firmware returns: 0 = shaker off; 1 = shaker on; 2 = shaker switched off but still moving. + + Returns: + True if the shaker is active or still moving (status 1 or 2), False if fully stopped (status 0). + """ + + if "shaker" not in self.unit_composition[stack_index]: + return False + + resp = await self.send_command("RSE", stack_index=stack_index) + + try: + status = int(resp) + except ValueError: + raise InhecoError("RSE", "E00", f"Unexpected response: {resp!r}") + + if status not in (0, 1, 2): + raise InhecoError("RSE", "E00", f"Invalid shaker status value: {status}") + + return status in (1, 2) # TODO: discuss whether 2 should count as "shaking" + + @requires_incubator_shaker + async def set_shaker_parameters( + self, + amplitude_x: float, + amplitude_y: float, + frequency_x: float, + frequency_y: float, + phase_shift: float, + stack_index: int, + ) -> None: + """Set shaker parameters for both X and Y axes in a single command (firmware 'SSP'). + + This combines the functionality of the individual SAX, SAY, SFX, SFY, and SPS commands. + + Args: + amplitude_x: Amplitude on the X-axis in mm (0.0-3.0 mm, corresponds to 0-30 in firmware units). + amplitude_y: Amplitude on the Y-axis in mm (0.0-3.0 mm, corresponds to 0-30 in firmware units). + frequency_x: Frequency on the X-axis in Hz (6.6-30.0 Hz, corresponds to 66-300 in firmware units). + frequency_y: Frequency on the Y-axis in Hz (6.6-30.0 Hz, corresponds to 66-300 in firmware units). + phase_shift: Phase shift between X and Y axes in degrees (0-360°). + + Notes: + - This command simplifies coordinated shaker setup. + - All arguments are automatically converted to the firmware's expected integer scaling. + (mm → x10; Hz → x10; ° left unscaled) + - The firmware returns an acknowledgment frame on success. + + Raises: + ValueError: If any parameter is outside its valid range. + InhecoError: If the device reports an error or rejects the command. + """ + # --- Validation --- + if not (0.0 <= amplitude_x <= 3.0): + raise ValueError(f"Amplitude X must be between 0.0 and 3.0 mm, got {amplitude_x}") + if not (0.0 <= amplitude_y <= 3.0): + raise ValueError(f"Amplitude Y must be between 0.0 and 3.0 mm, got {amplitude_y}") + if not (6.6 <= frequency_x <= 30.0): + raise ValueError(f"Frequency X must be between 6.6 and 30.0 Hz, got {frequency_x}") + if not (6.6 <= frequency_y <= 30.0): + raise ValueError(f"Frequency Y must be between 6.6 and 30.0 Hz, got {frequency_y}") + if not (0.0 <= phase_shift <= 360.0): + raise ValueError(f"Phase shift must be between 0° and 360°, got {phase_shift}") + + # --- Convert to firmware units --- + amp_x_fw = round(amplitude_x * 10) + amp_y_fw = round(amplitude_y * 10) + freq_x_fw = round(frequency_x * 10) + freq_y_fw = round(frequency_y * 10) + phase_fw = round(phase_shift) + + # --- Build and send command --- + cmd = f"SSP{amp_x_fw},{amp_y_fw},{freq_x_fw},{freq_y_fw},{phase_fw}" + await self.send_command(cmd, stack_index=stack_index) + + def _mm_to_fw(self, mm: float) -> int: + """Convert mm → firmware units (1/10 mm). + + Valid range: 0.0-3.0 mm (→ 0-30 in firmware). + Raises ValueError if out of range. + """ + if not (0.0 <= mm <= 3.0): + raise ValueError(f"Amplitude must be between 0.0 and 3.0 mm, got {mm}") + return int(round(mm * 10)) + + def _rpm_to_fw_hz10(self, rpm: float) -> int: + """Convert RPM → firmware Hz·10 units (validated). + + 396-1800 RPM ↔ 6.6-30.0 Hz ↔ 66-300 in firmware. + """ + if not (396 <= rpm <= 1800): + raise ValueError(f"RPM must be between 396 and 1800, got {rpm}") + return int(round((rpm / 60.0) * 10)) + + def _hz_to_fw_hz10(self, hz: float) -> int: + """Convert Hz → firmware Hz·10 units (validated).""" + if not (6.6 <= hz <= 30.0): + raise ValueError(f"Frequency must be between 6.6 and 30.0 Hz, got {hz}") + return int(round(hz * 10)) + + def _validate_hz_or_rpm(self, frequency_hz: Optional[float], rpm: Optional[float]) -> None: + """Ensure exactly one of frequency_hz or rpm is provided.""" + if (frequency_hz is None) == (rpm is None): + raise ValueError("Provide exactly one of frequency_hz or rpm.") + + def _phase_or_default(self, phase_deg: Optional[float], default: int) -> int: + """Return integer phase or default (0-360°).""" + p = default if phase_deg is None else int(round(phase_deg)) + if not (0 <= p <= 360): + raise ValueError(f"Phase must be 0-360°, got {p}") + return p + + def _fw_freq_pair(self, frequency_hz: Optional[float], rpm: Optional[float]) -> tuple[int, int]: + """Return validated firmware frequency pair (Hz·10, Hz·10).""" + if frequency_hz is not None: + f = self._hz_to_fw_hz10(frequency_hz) + else: + # At this point rpm MUST be not None (validated earlier) + assert rpm is not None + f = self._rpm_to_fw_hz10(rpm) + return (f, f) + + def _fw_amp_pair_linear_x(self, ax_mm: float) -> tuple[int, int]: + return (self._mm_to_fw(ax_mm), 0) + + def _fw_amp_pair_linear_y(self, ay_mm: float) -> tuple[int, int]: + return (0, self._mm_to_fw(ay_mm)) + + def _fw_amp_pair_xy(self, ax_mm: float, ay_mm: float) -> tuple[int, int]: + return (self._mm_to_fw(ax_mm), self._mm_to_fw(ay_mm)) + + @requires_incubator_shaker + async def set_shaker_pattern( + self, + *, + pattern: Literal["linear_x", "linear_y", "orbital", "elliptical", "figure_eight"], + stack_index: int, + frequency_hz: Optional[float] = None, + rpm: Optional[float] = None, + amplitude_x_mm: Optional[float] = None, + amplitude_y_mm: Optional[float] = None, + phase_deg: Optional[float] = None, + ) -> None: + """Set the shaker motion pattern and parameters (without enabling motion). + + Patterns: + - linear_x: motion along X only. + - linear_y: motion along Y only. + - orbital: circular motion (equal amplitudes on both axes, 90° phase). + - elliptical: elliptical motion (unequal amplitudes, 90° phase). + - figure_eight: double-loop motion (any amplitudes, 180° phase). + """ + self._validate_hz_or_rpm(frequency_hz, rpm) + fx, fy = self._fw_freq_pair(frequency_hz, rpm) + + if pattern == "linear_x": + if amplitude_x_mm is None: + raise ValueError("linear_x requires amplitude_x_mm.") + ax, ay = self._fw_amp_pair_linear_x(amplitude_x_mm) + phase = self._phase_or_default(phase_deg, 0) + + elif pattern == "linear_y": + if amplitude_y_mm is None: + raise ValueError("linear_y requires amplitude_y_mm.") + ax, ay = self._fw_amp_pair_linear_y(amplitude_y_mm) + phase = self._phase_or_default(phase_deg, 0) + + elif pattern == "orbital": + # --- orbital: equal amplitudes, 90° phase --- + if amplitude_x_mm is None or amplitude_y_mm is None: + raise ValueError("orbital requires both amplitude_x_mm and amplitude_y_mm.") + if abs(amplitude_x_mm - amplitude_y_mm) > 1e-6: + raise ValueError( + f"Orbital motion requires equal amplitudes on X and Y " + f"(got {amplitude_x_mm} mm vs {amplitude_y_mm} mm). " + f"Use pattern='elliptical' instead." + ) + ax, ay = self._fw_amp_pair_xy(amplitude_x_mm, amplitude_y_mm) + phase = self._phase_or_default(phase_deg, 90) + + elif pattern == "elliptical": + # --- elliptical: differing amplitudes, 90° phase --- + ax_mm = amplitude_x_mm if amplitude_x_mm is not None else 2.5 + ay_mm = amplitude_y_mm if amplitude_y_mm is not None else 2.0 + ax, ay = self._fw_amp_pair_xy(ax_mm, ay_mm) + phase = self._phase_or_default(phase_deg, 90) + + elif pattern == "figure_eight": + # --- true figure eight: fx:fy = 1:2, phase = 90° --- + ax_mm = amplitude_x_mm if amplitude_x_mm is not None else 2.5 + ay_mm = amplitude_y_mm if amplitude_y_mm is not None else 2.5 + ax, ay = self._fw_amp_pair_xy(ax_mm, ay_mm) + + # base frequency (default 10 Hz if not given) + base_hz = frequency_hz if frequency_hz is not None else (rpm / 60.0 if rpm else 10.0) + fx = self._hz_to_fw_hz10(base_hz) + fy = self._hz_to_fw_hz10(base_hz * 2) + + phase = self._phase_or_default(phase_deg, 90) + + else: + raise ValueError( + f"Unknown pattern: {pattern}" + "\nValid options: 'linear_x', 'linear_y', 'orbital', 'elliptical', 'figure_eight'" + ) + + await self.send_command(f"SSP{ax},{ay},{fx},{fy},{phase}", stack_index=stack_index) + + @requires_incubator_shaker + async def set_shaker_status(self, enabled: bool, stack_index: int) -> None: + """Enable or disable the shaker (ASEND always used when enabled).""" + await self.send_command("ASEND" if enabled else "ASE0", stack_index=stack_index) + + @requires_incubator_shaker + async def shake( + self, + *, + stack_index: int, + pattern: Literal["linear_x", "linear_y", "orbital", "elliptical", "figure_eight"] = "orbital", + rpm: Optional[float] = None, + frequency_hz: Optional[float] = None, + amplitude_x_mm: float = 3.0, + amplitude_y_mm: float = 3.0, + phase_deg: Optional[float] = None, + ) -> None: + """ + Configure and start shaking with the given motion pattern. + + This command safely updates shaker parameters (frequency, amplitude, phase) + and starts motion using `ASEND` (no labware detection). If the shaker is + already running, it is first stopped and reinitialized before applying new + parameters—required because the firmware only latches `SSP` settings when + the shaker transitions from idle to active. + + Behavior: + - Stops the shaker if active, waits briefly, applies the new pattern, and restarts shaking. + - Ensures consistent parameter changes and prevents ignored SSP updates. + + Args: + pattern: Motion pattern: `"linear_x"`, `"linear_y"`, `"orbital"`, `"elliptical"`, or `"figure_eight"`. + rpm: Rotational speed (396-1800 RPM). Mutually exclusive with `frequency_hz`. + frequency_hz: Oscillation frequency (6.6-30.0 Hz). Mutually exclusive with `rpm`. + amplitude_x_mm: X-axis amplitude in mm (0.0-3.0 mm). + amplitude_y_mm: Y-axis amplitude in mm (0.0-3.0 mm). + phase_deg: Optional phase offset between X and Y axes (0-360°). + + Raises: + ValueError: If parameter ranges or combinations are invalid. + InhecoError: If the device rejects the command or is not ready. + """ + + is_shaking = await self.is_shaking_enabled(stack_index=stack_index) + if is_shaking: + await self.stop_shaking(stack_index=stack_index) + await asyncio.sleep(0.5) # brief pause for firmware to settle + + await self.set_shaker_pattern( + pattern=pattern, + rpm=rpm, + stack_index=stack_index, + frequency_hz=frequency_hz, + amplitude_x_mm=amplitude_x_mm, + amplitude_y_mm=amplitude_y_mm, + phase_deg=phase_deg, + ) + await self.set_shaker_status(True, stack_index=stack_index) + + # TODO: expose direction argument -> clockwise / counterclockwise for rotating shaking patterns + + @requires_incubator_shaker + async def stop_shaking(self, stack_index: int) -> None: + """Stop shaker (ASE0).""" + + await self.set_shaker_status(False, stack_index=stack_index) + + @requires_incubator_shaker + async def request_shaker_phase_shift(self, stack_index: int, selector: int = 0) -> float: + """Read the set or actual phase shift between X and Y shaker drives (firmware 'RPS' command). + + Args: + selector: 0 = currently set phase shift, 1 = actual phase shift. Default = 0. + + Returns: + Phase shift in degrees [°]. Returns 12345.0 if the shaker has not reached a stable state or if phase shift calculation is invalid due to too-small amplitudes (< 1 mm on either axis). + """ + if selector not in (0, 1): + raise ValueError(f"Selector must be 0 or 1, got {selector}") + + resp = await self.send_command(f"RPS{selector}", stack_index=stack_index) + return float(resp) + + @requires_incubator_shaker + async def request_shaker_calibration_value(self, position: int, selector: int = 0) -> float: + raise NotImplementedError("RSC (Read Shaker Calibration Values) not implemented yet.") + + @requires_incubator_shaker + async def read_whole_shaker_calibration_data(self, key: str) -> str: + raise NotImplementedError("RWJ (Read Whole Shaker Adjustment Data) not implemented yet.") + + @requires_incubator_shaker + async def set_shaker_calibration_value( + self, + key: str, + position: int, + value: int, + ) -> None: + raise NotImplementedError("SSC (Set Shaker Calibration Values) not implemented yet.") + + # # # Self-Test # # # + + async def perform_self_test(self, stack_index: int, read_timeout: int = 500) -> Dict[str, bool]: + """Execute the internal self-test routine. + + Normal Testing-Time: ca. 3 minutes (Beware of timeouts!) + Maximum Testing-Time: 465 sec. (Beware of timeouts!) + The Test must be performed without a Labware inside! + The Error code is reported as an 11-Bit-Code. A set bit (1) represents an Error. The Test stops + immediately, when the Drawer-Test fails to avoid damage to the device! + Please note the AQS was developed for the Incubator Shaker MP. This can lead assemblies to + DWP Incubators that the result of the AQS, due to the design, 8 (bit2) could be. + + Returns: + A dictionary mapping error condition names to booleans indicating presence of each error. + """ + + plate_in_status = await self.request_plate_in_incubator(stack_index=stack_index) + if plate_in_status: + raise ValueError("Self-test requires an empty incubator.") + + loading_tray_status = await self.request_drawer_status(stack_index=stack_index) + if loading_tray_status == "open": + raise ValueError("Self-test requires a closed loading tray.") + + resp = await self.send_command("AQS", stack_index=stack_index, read_timeout=read_timeout) + + if not isinstance(resp, str): + raise RuntimeError(f"Invalid response: {resp!r}") + + resp_decimal = int(resp) + + assert 0 <= resp_decimal <= 2047, f"Invalid self-test response received: {resp!r}" + + binary_response = bin(resp_decimal)[2:].zfill(11) + return { + "drawer_error": binary_response[-1] == "1", + "homogeneity_sensor_3_vs_1_error": binary_response[-2] == "1", + "homogeneity_sensor_2_vs_1_error": binary_response[-3] == "1", + "sensor_1_target_temp_error": binary_response[-4] == "1", + "y_amplitude_shaker_error": binary_response[-5] == "1", + "x_amplitude_shaker_error": binary_response[-6] == "1", + "phase_shift_shaker_error": binary_response[-7] == "1", + "y_frequency_shaker_error": binary_response[-8] == "1", + "x_frequency_shaker_error": binary_response[-9] == "1", + "line_boost_heater_broken": binary_response[-10] == "1", + "line_main_heater_broken": binary_response[-11] == "1", + } + + +# # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # +# IncubatorShakerUnit Class +# # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # + + +class InhecoIncubatorShakerUnit: + """High-level API for an individual Inheco Incubator/Shaker unit in a stacked system.""" + + def __init__(self, backend: InhecoIncubatorShakerStackBackend, index: int): + self.backend: InhecoIncubatorShakerStackBackend = backend + self.index = index + + def __repr__(self): + return f"" + + # === Common high-level API shortcuts (explicitly delegate) === + + # Querying Machine State # + async def request_firmware_version(self) -> str: + """EEPROM request: Return the firmware version string.""" + return await self.backend.request_firmware_version(stack_index=self.index) + + async def request_serial_number(self) -> str: + """EEPROM request: Return the device serial number.""" + return await self.backend.request_serial_number(stack_index=self.index) + + async def request_last_calibration_date(self) -> str: + """EEPROM request: Query the date of the last calibration.""" + return await self.backend.request_last_calibration_date(stack_index=self.index) + + async def request_machine_allocation(self, layer: int = 0) -> dict: + """ + Report which device slots are occupied on a given layer (firmware 'RDAx,0'). + + Args: + layer: Layer index (0-7). Default = 0. + + Returns: + { + "layer": int, + "slot_mask": int, # e.g. 7 + "slot_mask_bin": str, # e.g. "0b0000000000000111" + "slots_connected": list[int] # e.g. [0, 1, 2] + } + + Notes: + Each bit in `slot_mask` represents one of 16 possible device slots: bit=1 means a device is connected; bit=0 means empty. + """ + return await self.backend.request_machine_allocation(layer=layer, stack_index=self.index) + + async def request_number_of_connected_machines(self, layer: int = 0) -> int: + """ + Report the number of connected Inheco devices on a layer (RDAx,1). + + Args: + layer: Layer index (0-7). Default = 0. + + Returns: + Number of connected devices (0-16). + + Example: + Response "3" → 3 connected devices on that layer. + """ + return await self.backend.request_number_of_connected_machines( + layer=layer, stack_index=self.index + ) + + async def request_labware_detection_threshold(self) -> int: + """EEPROM request""" + return await self.backend.request_labware_detection_threshold(stack_index=self.index) + + async def request_incubator_type(self) -> InhecoIncubatorUnitType: + """Return a descriptive string of the incubator/shaker configuration.""" + return await self.backend.request_incubator_type(stack_index=self.index) + + async def request_plate_in_incubator(self) -> bool: + """Sensor command: Check if a plate is currently present in the incubator.""" + return await self.backend.request_plate_in_incubator(stack_index=self.index) + + async def request_operation_time_in_hours(self) -> int: + """EEPROM request: Total operation time of the device in hours.""" + return await self.backend.request_operation_time_in_hours(stack_index=self.index) + + async def request_drawer_cycles_performed(self) -> int: + """EEPROM request: Number of drawer open/close cycles performed.""" + return await self.backend.request_drawer_cycles_performed(stack_index=self.index) + + async def request_is_initialized(self) -> bool: + """EEPROM request: Check if the machine has been initialized.""" + resp = await self.backend.request_is_initialized(stack_index=self.index) + return resp in {"0", "2"} + + async def request_plate_status_known(self) -> bool: + """EEPROM request: Check if the plate status is known.""" + return await self.backend.request_plate_status_known(stack_index=self.index) + + async def request_thermal_calibration_date(self) -> str: + """EEPROM request: Query the date of the last thermal calibration. + + Returns: + Calibration date in ISO format 'YYYY-MM-DD'. + """ + return await self.backend.request_thermal_calibration_date(stack_index=self.index) + + # # # Setup Requirement # # # + + async def initialize(self) -> str: + """Initializes the machine unit after power-on. + All other Action-Commands of the device are prohibited before the command is executed. + On command AID, the drawer will be closed. If the drawer is hold in its open position, + no Error will be generated! + If AID is send during operating, the shaker and heater stop immediately! + """ + return await self.backend.initialize(stack_index=self.index) + + # # # Loading Tray Features # # # + + async def open(self) -> None: + """Open the incubator door & move loading tray out.""" + await self.backend.open(stack_index=self.index) + + async def close(self) -> None: + """Move the loading tray in & close the incubator door.""" + await self.backend.close(stack_index=self.index) + + async def request_drawer_status(self) -> InhecoIncubatorShakerStackBackend.DrawerStatus: + """Report the current drawer (loading tray) status. + + Returns: + 'open' if the loading tray is open, 'closed' if closed. + """ + return await self.backend.request_drawer_status(stack_index=self.index) + + # # # Temperature Features # # # + + async def start_temperature_control(self, temperature: float) -> None: + """Set and activate the target incubation temperature (°C). + + The device begins active heating toward the target temperature. + Passive cooling (firmware default) may occur automatically if the + target temperature is below ambient, depending on environmental conditions. + """ + await self.backend.start_temperature_control(temperature=temperature, stack_index=self.index) + + async def stop_temperature_control(self) -> None: + """Stop active temperature regulation. + + Disables the incubator's heating control loop. + The previously set target temperature remains stored in the + device's memory but is no longer actively maintained. + The incubator will passively drift toward ambient temperature. + """ + await self.backend.stop_temperature_control(stack_index=self.index) + + async def get_temperature( + self, + sensor: Literal["mean", "main", "dif", "boost"] = "main", + read_timeout: float = 60.0, + ) -> float: + """Return current measured temperature in °C.""" + return await self.backend.get_temperature( + sensor=sensor, stack_index=self.index, read_timeout=read_timeout + ) + + async def request_target_temperature(self) -> float: + """Return target temperature in °C.""" + return await self.backend.request_target_temperature(stack_index=self.index) + + async def is_temperature_control_enabled(self) -> bool: + """ + Return True if active temperature control is enabled (RHE = 1 or 2), + False if control is off (RHE = 0). + + Firmware response (RHE): 0: control loop off (passive equilibrium). 1: control loop on (heating/cooling active). 2: control + booster on (extended heating mode). + """ + return await self.backend.is_temperature_control_enabled(stack_index=self.index) + + async def request_pid_controller_coefficients(self) -> tuple[float, float]: + """ + Query the current PID controller coefficients. + + Returns: + (P, I): tuple of floats. P: proportional gain (selector 1). I: integration value (selector 2; 0 = integration off) + """ + return await self.backend.request_pid_controller_coefficients(stack_index=self.index) + + async def request_maximum_allowed_temperature(self, measured: bool = False) -> float: + """ + Query the maximum allowed or maximum measured device temperature (in °C). + + Args: + measured: + - False: report configured maximum allowed temperature (default) + - True: report maximum measured temperature since last reset + + Returns: + Temperature in °C (value / 10) + """ + return await self.backend.request_maximum_allowed_temperature( + stack_index=self.index, measured=measured + ) + + async def request_delta_temperature(self) -> float: + """ + Query the absolute temperature difference between target and actual plate temperature. + + Returns: + Delta temperature in °C (positive if below target, negative if above target). + + Notes: + - Reported in 1/10 °C. + - Negative values indicate the plate is warmer than the target. + """ + return await self.backend.request_delta_temperature(stack_index=self.index) + + async def wait_for_temperature( + self, + *, + sensor: Literal["main", "dif", "boost", "mean"] = "main", + tolerance: float = 0.2, + interval_s: float = 0.5, + timeout_s: Optional[float] = 600.0, + show_progress_bar: bool = False, + ) -> float: + """ + Wait asynchronously until the target temperature is reached. + + Args: + sensor: Temperature sensor to monitor ("main", "dif", "boost", or "mean"). + tolerance: Acceptable difference (in °C) between current and target temperature. + interval_s: Polling interval in seconds. Default = 0.5 s. + timeout_s: Maximum time to wait in seconds. None disables timeout. Default = 600 s. + show_progress_bar: If True, display a dynamic ASCII progress bar in stdout. Default False. + + Returns: + Final measured temperature in °C once within tolerance. + + Raises: + TimeoutError: If target not reached within `timeout_s`. + ValueError: If temperature control is not enabled or no valid target returned. + """ + return await self.backend.wait_for_temperature( + stack_index=self.index, + sensor=sensor, + tolerance=tolerance, + interval_s=interval_s, + timeout_s=timeout_s, + show_progress_bar=show_progress_bar, + ) + + # # # Shaking Features # # # + + async def request_shaker_frequency_x(self, selector: int = 0) -> float: + """Read the set or actual shaker frequency in the X-direction. + + Args: + selector: 0 = to-be-set frequency, 1 = actual frequency. Default = 0. + + Returns: + Frequency in Hz. + """ + return await self.backend.request_shaker_frequency_x(stack_index=self.index, selector=selector) + + async def request_shaker_frequency_y(self, selector: int = 0) -> float: + """Read the set or actual shaker frequency in the Y-direction. + + Args: + selector: 0 = to-be-set frequency, 1 = actual frequency. Default = 0. + + Returns: + Frequency in Hz. + """ + return await self.backend.request_shaker_frequency_y(stack_index=self.index, selector=selector) + + async def request_shaker_amplitude_x(self, selector: int = 0) -> float: + """Read the set, actual, or static shaker amplitude in the X-direction. + + Args: + selector: 0 = set amplitude, 1 = actual amplitude, 2 = static distance from middle. Default = 0. + + Returns: + Amplitude in millimeters (mm). + """ + return await self.backend.request_shaker_amplitude_x(stack_index=self.index, selector=selector) + + async def request_shaker_amplitude_y(self, selector: int = 0) -> float: + """Read the set, actual, or static shaker amplitude in the Y-direction. + + Args: + selector: 0 = set amplitude, 1 = actual amplitude, 2 = static distance from middle. Default = 0. + + Returns: + Amplitude in millimeters (mm). + """ + return await self.backend.request_shaker_amplitude_y(stack_index=self.index, selector=selector) + + async def is_shaking_enabled(self) -> bool: + """Return True if the shaker is currently enabled or still decelerating. + + The firmware returns: 0: shaker off, 1: shaker on, 2: shaker switched off but still moving. + + Returns: + True if the shaker is active or still moving (status 1 or 2), False if fully stopped (status 0). + """ + return await self.backend.is_shaking_enabled(stack_index=self.index) + + async def set_shaker_parameters( + self, + amplitude_x: float, + amplitude_y: float, + frequency_x: float, + frequency_y: float, + phase_shift: float, + stack_index: int, + ) -> None: + """Set shaker parameters for both X and Y axes in a single command (firmware 'SSP'). + + This combines the functionality of the individual SAX, SAY, SFX, SFY, and SPS commands. + + Args: + amplitude_x: Amplitude on the X-axis in mm (0.0-3.0 mm, corresponds to 0-30 in firmware units). + amplitude_y: Amplitude on the Y-axis in mm (0.0-3.0 mm, corresponds to 0-30 in firmware units). + frequency_x: Frequency on the X-axis in Hz (6.6-30.0 Hz, corresponds to 66-300 in firmware units). + frequency_y: Frequency on the Y-axis in Hz (6.6-30.0 Hz, corresponds to 66-300 in firmware units). + phase_shift: Phase shift between X and Y axes in degrees (0-360°). + + Notes: + - This command simplifies coordinated shaker setup. + - All arguments are automatically converted to the firmware's expected integer scaling. + (mm → x10; Hz → x10; ° left unscaled) + - The firmware returns an acknowledgment frame on success. + + Raises: + ValueError: If any parameter is outside its valid range. + InhecoError: If the device reports an error or rejects the command. + """ + await self.backend.set_shaker_parameters( + stack_index=stack_index, + amplitude_x=amplitude_x, + amplitude_y=amplitude_y, + frequency_x=frequency_x, + frequency_y=frequency_y, + phase_shift=phase_shift, + ) + + async def set_shaker_pattern( + self, + *, + pattern: Literal["linear_x", "linear_y", "orbital", "elliptical", "figure_eight"], + frequency_hz: Optional[float] = None, + rpm: Optional[float] = None, + amplitude_x_mm: Optional[float] = None, + amplitude_y_mm: Optional[float] = None, + phase_deg: Optional[float] = None, + ) -> None: + """Set the shaker motion pattern and parameters (without enabling motion). + + Patterns: + - linear_x: motion along X only. + - linear_y: motion along Y only. + - orbital: circular motion (equal amplitudes on both axes, 90° phase). + - elliptical: elliptical motion (unequal amplitudes, 90° phase). + - figure_eight: double-loop motion (any amplitudes, 180° phase). + """ + await self.backend.set_shaker_pattern( + stack_index=self.index, + pattern=pattern, + frequency_hz=frequency_hz, + rpm=rpm, + amplitude_x_mm=amplitude_x_mm, + amplitude_y_mm=amplitude_y_mm, + phase_deg=phase_deg, + ) + + async def set_shaker_status(self, enabled: bool) -> None: + """Enable or disable the shaker (ASEND always used when enabled).""" + await self.backend.set_shaker_status(stack_index=self.index, enabled=enabled) + + async def shake( + self, + *, + pattern: Literal["linear_x", "linear_y", "orbital", "elliptical", "figure_eight"] = "orbital", + rpm: Optional[float] = None, + frequency_hz: Optional[float] = None, + amplitude_x_mm: float = 3.0, + amplitude_y_mm: float = 3.0, + phase_deg: Optional[float] = None, + ) -> None: + """ + Configure and start shaking with the given motion pattern. + + This command safely updates shaker parameters (frequency, amplitude, phase) + and starts motion using `ASEND` (no labware detection). If the shaker is + already running, it is first stopped and reinitialized before applying new + parameters—required because the firmware only latches `SSP` settings when + the shaker transitions from idle to active. + + Behavior: + - Stops the shaker if active, waits briefly, applies the new pattern, and restarts shaking. + - Ensures consistent parameter changes and prevents ignored SSP updates. + + Args: + pattern: Motion pattern: `"linear_x"`, `"linear_y"`, `"orbital"`, `"elliptical"`, or `"figure_eight"`. + rpm: Rotational speed (396-1800 RPM). Mutually exclusive with `frequency_hz`. + frequency_hz: Oscillation frequency (6.6-30.0 Hz). Mutually exclusive with `rpm`. + amplitude_x_mm: X-axis amplitude in mm (0.0-3.0 mm). + amplitude_y_mm: Y-axis amplitude in mm (0.0-3.0 mm). + phase_deg: Optional phase offset between X and Y axes (0-360°). + + Raises: + ValueError: If parameter ranges or combinations are invalid. + InhecoError: If the device rejects the command or is not ready. + """ + await self.backend.shake( + stack_index=self.index, + pattern=pattern, + rpm=rpm, + frequency_hz=frequency_hz, + amplitude_x_mm=amplitude_x_mm, + amplitude_y_mm=amplitude_y_mm, + phase_deg=phase_deg, + ) + + async def stop_shaking(self) -> None: + """Stop shaker (ASE0).""" + await self.backend.stop_shaking(stack_index=self.index) + + async def request_shaker_phase_shift(self, selector: int = 0) -> float: + """Read the set or actual phase shift between X and Y shaker drives (firmware 'RPS' command). + + Args: + selector: 0 = currently set phase shift, 1 = actual phase shift. Default = 0. + + Returns: + Phase shift in degrees [°]. Returns 12345.0 if the shaker has not reached a stable state or if phase shift calculation is invalid due to too-small amplitudes (< 1 mm on either axis). + """ + return await self.backend.request_shaker_phase_shift(stack_index=self.index, selector=selector) + + # # # Self-Test # # # + + async def perform_self_test(self, read_timeout: int = 500) -> Dict[str, bool]: + """Execute the internal self-test routine.""" + + return await self.backend.perform_self_test(stack_index=self.index, read_timeout=read_timeout)