From 670e8ab94064b001a0fb6d306229ff68afe3cf73 Mon Sep 17 00:00:00 2001 From: Nat Date: Tue, 14 Oct 2025 17:23:45 -0400 Subject: [PATCH 1/6] BioShake backend A new backend for the BioShake, as well as adding the class to the __init__.py file --- pylabrobot/heating_shaking/__init__.py | 1 + .../heating_shaking/bioshake_backend.py | 133 ++++++++++++++++++ 2 files changed, 134 insertions(+) create mode 100644 pylabrobot/heating_shaking/bioshake_backend.py diff --git a/pylabrobot/heating_shaking/__init__.py b/pylabrobot/heating_shaking/__init__.py index f18996a422e..9430e2e7869 100644 --- a/pylabrobot/heating_shaking/__init__.py +++ b/pylabrobot/heating_shaking/__init__.py @@ -13,3 +13,4 @@ inheco_thermoshake_rm, ) from pylabrobot.heating_shaking.inheco.thermoshake_backend import InhecoThermoshakeBackend +from pylabrobot.heating_shaking.bioshake_backend import BioShake \ No newline at end of file diff --git a/pylabrobot/heating_shaking/bioshake_backend.py b/pylabrobot/heating_shaking/bioshake_backend.py new file mode 100644 index 00000000000..8d75992e099 --- /dev/null +++ b/pylabrobot/heating_shaking/bioshake_backend.py @@ -0,0 +1,133 @@ +import asyncio +from typing import Optional +from pylabrobot.heating_shaking.backend import HeaterShakerBackend +from pylabrobot.io.serial import Serial + +try: + import serial + + HAS_SERIAL = True +except ImportError as e: + HAS_SERIAL = False + _SERIAL_IMPORT_ERROR = e + +class BioShake(HeaterShakerBackend): + def __init__(self, port: str, timeout: int = 60): + if not HAS_SERIAL: + raise RuntimeError( + f"pyserial is required for the BioShake module backend. Import error: {_SERIAL_IMPORT_ERROR}" + ) + + self.setup_finished = False + self.port = port + self.timeout = timeout + self.io = Serial( + port=self.port, + baudrate=9600, + bytesize=serial.EIGHTBITS, + parity=serial.PARITY_NONE, + stopbits=serial.STOPBITS_ONE, + write_timeout=10, + timeout=self.timeout, + ) + + async def _send_command(self, cmd: str, delay: float = 0.5): + try: + # Send the command + await self.io.write((cmd + '\r').encode('ascii')) + await asyncio.sleep(delay) + + # Read and decode the response + response = await self.io.readline() + decoded = response.decode('ascii', errors='ignore').strip() + + # Parsing the response from the BioShake + + # No response at all + if not decoded: + raise RuntimeError(f"No response for '{cmd}'") + + # Device-specific errors + if decoded.startswith("e"): + raise RuntimeError(f"Device returned error for '{cmd}': '{decoded}'") + + if decoded.startswith("u ->"): + raise NotImplementedError (f"'{cmd}' not supported: '{decoded}'") + + # Standard OK + if decoded.lower().startswith("ok"): + return None + + # All other valid responses (e.g. temperature and remaining time) + return decoded + + except Exception as e: + raise RuntimeError( + f"Unexpected error while sending '{cmd}': {type(e).__name__}: {e}" + ) from e + + async def setup(self): + await super().setup() + await self.io.setup() + + async def stop(self): + await super().stop() + await self.io.stop() + + async def reset(self): + # Reset the BioShake if stuck in "e" state + await self._send_command(cmd="resetDevice", delay=30) + + async def home(self): + # Initialize the BioShake into home position + await self._send_command(cmd="shakeGoHome", delay=5) + + async def shake(self, speed: float, duration: Optional[float] = None): + # Set the speed of the shaker + set_speed_cmd = f"setShakeTargetSpeed{speed}" + await self._send_command(cmd=set_speed_cmd, delay=0.2) + + # Send the command to start shaking, either with or without duration + if duration is None: + await self._send_command(cmd="shakeOn", delay=0.2) + else: + set_duration_cmd = f"shakeOnWithRuntime{duration}" + await self._send_command(cmd=set_duration_cmd, delay=0.2) + + async def stop_shaking(self): + await self._send_command(cmd="shakeOff", delay=0.2) + + async def get_remaining_time(self) -> float: + response = await self._send_command(cmd="getShakeRemainingTime", delay=0.2) + return float(response) # Return the remaining time in seconds if duration was set + + @property + def supports_locking(self) -> bool: + return True + + async def lock_plate(self): + await self._send_command(cmd="setElmLockPos", delay=0.3) + + async def unlock_plate(self): + await self._send_command(cmd="setElmUnlockPos", delay=0.3) + + @property + def supports_active_cooling(self) -> bool: + return True + + async def set_temperature(self, temperature: float): + # Set the temperature of the shaker + temperature = temperature * 10 + set_temp_cmd = f"setTempTarget{temperature}" + await self._send_command(cmd=set_temp_cmd, delay=0.2) + + # Start temperature control + await self._send_command(cmd="tempOn", delay=0.2) + + async def get_current_temperature(self) -> float: + response = await self._send_command(cmd="getTempActual", delay=0.2) + return float(response) + + async def deactivate(self): + # Stop temperature control + await self._send_command(cmd="tempOff", delay=0.2) \ No newline at end of file From 651f3423c1d43fb4707036799caab83a4348c284 Mon Sep 17 00:00:00 2001 From: Nat Date: Fri, 17 Oct 2025 14:38:24 -0400 Subject: [PATCH 2/6] Updated version of backend 1.) Add reset and home in setup 2.) Add acceleration/deceleration to shaking function 3.) Asserted speed, temperature, and acceleration based on model 4.) More error handling in general --- pylabrobot/heating_shaking/__init__.py | 2 +- .../heating_shaking/bioshake_backend.py | 185 +++++++++++++++--- .../backends/hamilton/STAR_backend.py | 8 +- 3 files changed, 160 insertions(+), 35 deletions(-) diff --git a/pylabrobot/heating_shaking/__init__.py b/pylabrobot/heating_shaking/__init__.py index 9430e2e7869..a11c255c17f 100644 --- a/pylabrobot/heating_shaking/__init__.py +++ b/pylabrobot/heating_shaking/__init__.py @@ -1,6 +1,7 @@ """A hybrid between pylabrobot.shaking and pylabrobot.temperature_controlling""" from pylabrobot.heating_shaking.backend import HeaterShakerBackend +from pylabrobot.heating_shaking.bioshake_backend import BioShake from pylabrobot.heating_shaking.chatterbox import HeaterShakerChatterboxBackend from pylabrobot.heating_shaking.hamilton_backend import ( HamiltonHeaterShakerBackend, @@ -13,4 +14,3 @@ inheco_thermoshake_rm, ) from pylabrobot.heating_shaking.inheco.thermoshake_backend import InhecoThermoshakeBackend -from pylabrobot.heating_shaking.bioshake_backend import BioShake \ No newline at end of file diff --git a/pylabrobot/heating_shaking/bioshake_backend.py b/pylabrobot/heating_shaking/bioshake_backend.py index 8d75992e099..e92c2d39a82 100644 --- a/pylabrobot/heating_shaking/bioshake_backend.py +++ b/pylabrobot/heating_shaking/bioshake_backend.py @@ -1,5 +1,6 @@ import asyncio from typing import Optional + from pylabrobot.heating_shaking.backend import HeaterShakerBackend from pylabrobot.io.serial import Serial @@ -11,6 +12,7 @@ HAS_SERIAL = False _SERIAL_IMPORT_ERROR = e + class BioShake(HeaterShakerBackend): def __init__(self, port: str, timeout: int = 60): if not HAS_SERIAL: @@ -31,44 +33,58 @@ def __init__(self, port: str, timeout: int = 60): timeout=self.timeout, ) - async def _send_command(self, cmd: str, delay: float = 0.5): + async def _send_command(self, cmd: str, delay: float = 0.5, timeout: float = 2): try: - # Send the command - await self.io.write((cmd + '\r').encode('ascii')) - await asyncio.sleep(delay) + # Flush serial buffers for a clean start + await self.io.reset_input_buffer() + await self.io.reset_output_buffer() + + # Send the command + await self.io.write((cmd + "\r").encode("ascii")) + await asyncio.sleep(delay) - # Read and decode the response - response = await self.io.readline() - decoded = response.decode('ascii', errors='ignore').strip() + # Read and decode the response with a timeout + try: + response = await asyncio.wait_for(self.io.readline(), timeout=timeout) - # Parsing the response from the BioShake + except asyncio.TimeoutError: + raise RuntimeError(f"Timed out waiting for response to '{cmd}'") - # No response at all - if not decoded: - raise RuntimeError(f"No response for '{cmd}'") + decoded = response.decode("ascii", errors="ignore").strip() - # Device-specific errors - if decoded.startswith("e"): - raise RuntimeError(f"Device returned error for '{cmd}': '{decoded}'") + # Parsing the response from the BioShake - if decoded.startswith("u ->"): - raise NotImplementedError (f"'{cmd}' not supported: '{decoded}'") + # No response at all + if not decoded: + raise RuntimeError(f"No response for '{cmd}'") - # Standard OK - if decoded.lower().startswith("ok"): - return None + # Device-specific errors + if decoded.startswith("e"): + raise RuntimeError(f"Device returned error for '{cmd}': '{decoded}'") - # All other valid responses (e.g. temperature and remaining time) - return decoded + if decoded.startswith("u ->"): + raise NotImplementedError(f"'{cmd}' not supported: '{decoded}'") + + # Standard OK + if decoded.lower().startswith("ok"): + return None + + # All other valid responses (e.g. temperature and remaining time) + return decoded except Exception as e: - raise RuntimeError( - f"Unexpected error while sending '{cmd}': {type(e).__name__}: {e}" - ) from e + raise RuntimeError(f"Unexpected error while sending '{cmd}': {type(e).__name__}: {e}") from e - async def setup(self): + async def setup(self, skip_home: bool = False): await super().setup() await self.io.setup() + if not skip_home: + # Reset first before homing it to ensure the device is ready for run + await self.reset() + # Additional seconds until next command can be send after reset + await asyncio.sleep(4) + # Now home the device + await self.home() async def stop(self): await super().stop() @@ -76,30 +92,118 @@ async def stop(self): async def reset(self): # Reset the BioShake if stuck in "e" state - await self._send_command(cmd="resetDevice", delay=30) + # Flush serial buffers for a clean start + await self.io.reset_input_buffer() + await self.io.reset_output_buffer() + + # Send the command + await self.io.write(("resetDevice\r").encode("ascii")) + + start = asyncio.get_event_loop().time() + max_seconds = 30 # How long a reset typically last + + while True: + # Break the loop if process takes longer than 30 seconds + if asyncio.get_event_loop().time() - start > max_seconds: + raise TimeoutError("Reset did not complete in time") + + try: + # Wait for each line with a timeout + response = await asyncio.wait_for(self.io.readline(), timeout=2) + decoded = response.decode("ascii", errors="ignore").strip() + + if decoded: + # Stop when the final message arrives + if "Initialization complete" in decoded: + break + + except asyncio.TimeoutError: + # Keep polling if nothing arrives within timeout + continue async def home(self): # Initialize the BioShake into home position await self._send_command(cmd="shakeGoHome", delay=5) - async def shake(self, speed: float, duration: Optional[float] = None): + async def shake(self, speed: float, duration: Optional[float] = None, accel: Optional[int] = 0): + # Check if speed is an integer + if isinstance(speed, float): + if speed.is_integer(): + speed = int(speed) + else: + raise ValueError(f"Speed must be a whole number, not {speed}") + elif not isinstance(speed, int): + raise TypeError(f"Speed must be an integer, not {type(speed).__name__}") + + # Get the min and max speed of the device to assert speed + min_speed = int(float(await self._send_command(cmd="getShakeMinRpm", delay=0.2))) + max_speed = int(float(await self._send_command(cmd="getShakeMaxRpm", delay=0.2))) + + assert ( + min_speed <= speed <= max_speed + ), f"Speed {speed} RPM is out of range. Allowed range is {min_speed}–{max_speed} RPM" + # Set the speed of the shaker set_speed_cmd = f"setShakeTargetSpeed{speed}" - await self._send_command(cmd=set_speed_cmd, delay=0.2) + await self._send_command(cmd=set_speed_cmd) + + # Check if accel is an integer + if isinstance(accel, float): + if accel.is_integer(): + accel = int(accel) + else: + raise ValueError(f"Acceleration must be a whole number, not {accel}") + elif not isinstance(accel, int): + raise TypeError(f"Acceleration must be an integer, not {type(accel).__name__}") + + # Get the min and max accel of the device to asset accel + min_accel = int(float(await self._send_command(cmd="getShakeAccelerationMin", delay=0.2))) + max_accel = int(float(await self._send_command(cmd="getShakeAccelerationMax", delay=0.2))) + + assert ( + min_accel <= accel <= max_accel + ), f"Acceleration {accel} seconds is out of range. Allowed range is {min_accel}-{max_accel} seconds" + + # Set the acceleration of the shaker + set_accel_cmd = f"setShakeAcceleration{accel}" + await self._send_command(cmd=set_accel_cmd, delay=0.2) # Send the command to start shaking, either with or without duration + if duration is None: await self._send_command(cmd="shakeOn", delay=0.2) else: set_duration_cmd = f"shakeOnWithRuntime{duration}" await self._send_command(cmd=set_duration_cmd, delay=0.2) - async def stop_shaking(self): + async def stop_shaking(self, decel: Optional[int] = 0): + # Check if decel is an integer + if isinstance(decel, float): + if decel.is_integer(): + decel = int(decel) + else: + raise ValueError(f"Deceleration must be a whole number, not {decel}") + elif not isinstance(decel, int): + raise TypeError(f"Deceleration must be an integer, not {type(decel).__name__}") + + # Get the min and max decel of the device to asset decel + min_decel = int(float(await self._send_command(cmd="getShakeAccelerationMin", delay=0.2))) + max_decel = int(float(await self._send_command(cmd="getShakeAccelerationMax", delay=0.2))) + + assert ( + min_decel <= decel <= max_decel + ), f"Deceleration {decel} seconds is out of range. Allowed range is {min_decel}-{max_decel} seconds" + + # Set the deceleration of the shaker + set_decel_cmd = f"setShakeAcceleration{decel}" + await self._send_command(cmd=set_decel_cmd, delay=0.2) + + # stop shaking await self._send_command(cmd="shakeOff", delay=0.2) async def get_remaining_time(self) -> float: response = await self._send_command(cmd="getShakeRemainingTime", delay=0.2) - return float(response) # Return the remaining time in seconds if duration was set + return float(response) # Return the remaining time in seconds if duration was set @property def supports_locking(self) -> bool: @@ -116,8 +220,25 @@ def supports_active_cooling(self) -> bool: return True async def set_temperature(self, temperature: float): - # Set the temperature of the shaker + # Get the min and max set points of the device to assert temperature + min_temp = int(float(await self._send_command(cmd="getTempMin", delay=0.2))) + max_temp = int(float(await self._send_command(cmd="getTempMax", delay=0.2))) + + assert ( + min_temp <= temperature <= max_temp + ), f"Temperature {temperature} C is out of range. Allowed range is {min_temp}–{max_temp} C." + temperature = temperature * 10 + + # Check if temperature is an integer + if isinstance(temperature, float): + if temperature.is_integer(): + temperature = int(temperature) + else: + raise ValueError(f"Temperature must be a whole number, not {temperature} (1/10 C)") + elif not isinstance(temperature, int): + raise TypeError(f"Temperature must be an integer, not {type(Temperature).__name__} (1/10 C)") + set_temp_cmd = f"setTempTarget{temperature}" await self._send_command(cmd=set_temp_cmd, delay=0.2) @@ -130,4 +251,4 @@ async def get_current_temperature(self) -> float: async def deactivate(self): # Stop temperature control - await self._send_command(cmd="tempOff", delay=0.2) \ No newline at end of file + await self._send_command(cmd="tempOff", delay=0.2) diff --git a/pylabrobot/liquid_handling/backends/hamilton/STAR_backend.py b/pylabrobot/liquid_handling/backends/hamilton/STAR_backend.py index 31e0d53a414..eceb7a2b8ad 100644 --- a/pylabrobot/liquid_handling/backends/hamilton/STAR_backend.py +++ b/pylabrobot/liquid_handling/backends/hamilton/STAR_backend.py @@ -1795,7 +1795,9 @@ async def aspirate( immersion_depth_direction = immersion_depth_direction or [ 0 if (id_ >= 0) else 1 for id_ in immersion_depth ] - immersion_depth = [im * (-1 if immersion_depth_direction[i] else 1) for i, im in enumerate(immersion_depth)] + immersion_depth = [ + im * (-1 if immersion_depth_direction[i] else 1) for i, im in enumerate(immersion_depth) + ] surface_following_distance = _fill_in_defaults(surface_following_distance, [0.0] * n) flow_rates = [ op.flow_rate or (hlc.aspiration_flow_rate if hlc is not None else 100.0) @@ -2101,7 +2103,9 @@ async def dispense( immersion_depth_direction = immersion_depth_direction or [ 0 if (id_ >= 0) else 1 for id_ in immersion_depth ] - immersion_depth = [im * (-1 if immersion_depth_direction[i] else 1) for i, im in enumerate(immersion_depth)] + immersion_depth = [ + im * (-1 if immersion_depth_direction[i] else 1) for i, im in enumerate(immersion_depth) + ] surface_following_distance = _fill_in_defaults(surface_following_distance, [0.0] * n) flow_rates = [ op.flow_rate or (hlc.dispense_flow_rate if hlc is not None else 120.0) From f0d9f33a7cf8fa8feb5cf2aaf83f51a2b81c6a19 Mon Sep 17 00:00:00 2001 From: Nat Date: Fri, 24 Oct 2025 14:16:27 -0400 Subject: [PATCH 3/6] BioShake Backend 2.1 Fix parameter of shake and stop_shaking functions --- pylabrobot/heating_shaking/bioshake_backend.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/pylabrobot/heating_shaking/bioshake_backend.py b/pylabrobot/heating_shaking/bioshake_backend.py index e92c2d39a82..17a370f856c 100644 --- a/pylabrobot/heating_shaking/bioshake_backend.py +++ b/pylabrobot/heating_shaking/bioshake_backend.py @@ -111,6 +111,7 @@ async def reset(self): # Wait for each line with a timeout response = await asyncio.wait_for(self.io.readline(), timeout=2) decoded = response.decode("ascii", errors="ignore").strip() + await asyncio.sleep(0.1) if decoded: # Stop when the final message arrives @@ -125,7 +126,7 @@ async def home(self): # Initialize the BioShake into home position await self._send_command(cmd="shakeGoHome", delay=5) - async def shake(self, speed: float, duration: Optional[float] = None, accel: Optional[int] = 0): + async def shake(self, speed: int, duration: Optional[float] = None, accel: int = 0): # Check if speed is an integer if isinstance(speed, float): if speed.is_integer(): @@ -176,7 +177,7 @@ async def shake(self, speed: float, duration: Optional[float] = None, accel: Opt set_duration_cmd = f"shakeOnWithRuntime{duration}" await self._send_command(cmd=set_duration_cmd, delay=0.2) - async def stop_shaking(self, decel: Optional[int] = 0): + async def stop_shaking(self, decel: int = 0): # Check if decel is an integer if isinstance(decel, float): if decel.is_integer(): From e9b031073cb37ecd4a93cb31239864accddf3a7e Mon Sep 17 00:00:00 2001 From: Rick Wierenga Date: Wed, 29 Oct 2025 23:43:47 -0700 Subject: [PATCH 4/6] remove duration related things in backend --- pylabrobot/heating_shaking/bioshake_backend.py | 12 ++---------- 1 file changed, 2 insertions(+), 10 deletions(-) diff --git a/pylabrobot/heating_shaking/bioshake_backend.py b/pylabrobot/heating_shaking/bioshake_backend.py index 17a370f856c..02d76e73125 100644 --- a/pylabrobot/heating_shaking/bioshake_backend.py +++ b/pylabrobot/heating_shaking/bioshake_backend.py @@ -126,7 +126,7 @@ async def home(self): # Initialize the BioShake into home position await self._send_command(cmd="shakeGoHome", delay=5) - async def shake(self, speed: int, duration: Optional[float] = None, accel: int = 0): + async def shake(self, speed: int, accel: int = 0): # Check if speed is an integer if isinstance(speed, float): if speed.is_integer(): @@ -171,11 +171,7 @@ async def shake(self, speed: int, duration: Optional[float] = None, accel: int = # Send the command to start shaking, either with or without duration - if duration is None: - await self._send_command(cmd="shakeOn", delay=0.2) - else: - set_duration_cmd = f"shakeOnWithRuntime{duration}" - await self._send_command(cmd=set_duration_cmd, delay=0.2) + await self._send_command(cmd="shakeOn", delay=0.2) async def stop_shaking(self, decel: int = 0): # Check if decel is an integer @@ -202,10 +198,6 @@ async def stop_shaking(self, decel: int = 0): # stop shaking await self._send_command(cmd="shakeOff", delay=0.2) - async def get_remaining_time(self) -> float: - response = await self._send_command(cmd="getShakeRemainingTime", delay=0.2) - return float(response) # Return the remaining time in seconds if duration was set - @property def supports_locking(self) -> bool: return True From ceb836a29718b538acde8d92619fb52b89d0e55d Mon Sep 17 00:00:00 2001 From: Rick Wierenga Date: Thu, 30 Oct 2025 00:01:08 -0700 Subject: [PATCH 5/6] fix types, accel->acceleration, decel->deceleration --- .../heating_shaking/bioshake_backend.py | 81 ++++++++++--------- 1 file changed, 43 insertions(+), 38 deletions(-) diff --git a/pylabrobot/heating_shaking/bioshake_backend.py b/pylabrobot/heating_shaking/bioshake_backend.py index 02d76e73125..526fcdfd512 100644 --- a/pylabrobot/heating_shaking/bioshake_backend.py +++ b/pylabrobot/heating_shaking/bioshake_backend.py @@ -3,6 +3,7 @@ from pylabrobot.heating_shaking.backend import HeaterShakerBackend from pylabrobot.io.serial import Serial +from pylabrobot.machines.backend import MachineBackend try: import serial @@ -76,7 +77,7 @@ async def _send_command(self, cmd: str, delay: float = 0.5, timeout: float = 2): raise RuntimeError(f"Unexpected error while sending '{cmd}': {type(e).__name__}: {e}") from e async def setup(self, skip_home: bool = False): - await super().setup() + await MachineBackend.setup(self) await self.io.setup() if not skip_home: # Reset first before homing it to ensure the device is ready for run @@ -87,7 +88,7 @@ async def setup(self, skip_home: bool = False): await self.home() async def stop(self): - await super().stop() + await MachineBackend.stop(self) await self.io.stop() async def reset(self): @@ -113,7 +114,7 @@ async def reset(self): decoded = response.decode("ascii", errors="ignore").strip() await asyncio.sleep(0.1) - if decoded: + if len(decoded) > 0: # Stop when the final message arrives if "Initialization complete" in decoded: break @@ -126,15 +127,16 @@ async def home(self): # Initialize the BioShake into home position await self._send_command(cmd="shakeGoHome", delay=5) - async def shake(self, speed: int, accel: int = 0): + async def shake(self, speed: float, acceleration: int = 0): # Check if speed is an integer if isinstance(speed, float): - if speed.is_integer(): - speed = int(speed) - else: + if not speed.is_integer(): raise ValueError(f"Speed must be a whole number, not {speed}") - elif not isinstance(speed, int): - raise TypeError(f"Speed must be an integer, not {type(speed).__name__}") + speed = int(speed) + if not isinstance(speed, int): + raise TypeError( + f"Speed must be an integer or a whole number float, not {type(speed).__name__}" + ) # Get the min and max speed of the device to assert speed min_speed = int(float(await self._send_command(cmd="getShakeMinRpm", delay=0.2))) @@ -142,57 +144,59 @@ async def shake(self, speed: int, accel: int = 0): assert ( min_speed <= speed <= max_speed - ), f"Speed {speed} RPM is out of range. Allowed range is {min_speed}–{max_speed} RPM" + ), f"Speed {speed} RPM is out of range. Allowed range is {min_speed}-{max_speed} RPM" # Set the speed of the shaker set_speed_cmd = f"setShakeTargetSpeed{speed}" await self._send_command(cmd=set_speed_cmd) # Check if accel is an integer - if isinstance(accel, float): - if accel.is_integer(): - accel = int(accel) - else: - raise ValueError(f"Acceleration must be a whole number, not {accel}") - elif not isinstance(accel, int): - raise TypeError(f"Acceleration must be an integer, not {type(accel).__name__}") - - # Get the min and max accel of the device to asset accel + if isinstance(acceleration, float): + if not acceleration.is_integer(): # type: ignore[attr-defined] # mypy is retarded + raise ValueError(f"Acceleration must be a whole number, not {acceleration}") + acceleration = int(acceleration) + if not isinstance(acceleration, int): + raise TypeError( + f"Acceleration must be an integer or a whole number float, not {type(acceleration).__name__}" + ) + + # Get the min and max acceleration of the device to check bounds min_accel = int(float(await self._send_command(cmd="getShakeAccelerationMin", delay=0.2))) max_accel = int(float(await self._send_command(cmd="getShakeAccelerationMax", delay=0.2))) assert ( - min_accel <= accel <= max_accel - ), f"Acceleration {accel} seconds is out of range. Allowed range is {min_accel}-{max_accel} seconds" + min_accel <= acceleration <= max_accel + ), f"Acceleration {acceleration} seconds is out of range. Allowed range is {min_accel}-{max_accel} seconds" # Set the acceleration of the shaker - set_accel_cmd = f"setShakeAcceleration{accel}" + set_accel_cmd = f"setShakeAcceleration{acceleration}" await self._send_command(cmd=set_accel_cmd, delay=0.2) # Send the command to start shaking, either with or without duration await self._send_command(cmd="shakeOn", delay=0.2) - async def stop_shaking(self, decel: int = 0): + async def stop_shaking(self, deceleration: int = 0): # Check if decel is an integer - if isinstance(decel, float): - if decel.is_integer(): - decel = int(decel) - else: - raise ValueError(f"Deceleration must be a whole number, not {decel}") - elif not isinstance(decel, int): - raise TypeError(f"Deceleration must be an integer, not {type(decel).__name__}") + if isinstance(deceleration, float): + if not deceleration.is_integer(): # type: ignore[attr-defined] # mypy is retarded + raise ValueError(f"Deceleration must be a whole number, not {deceleration}") + deceleration = int(deceleration) + if not isinstance(deceleration, int): + raise TypeError( + f"Deceleration must be an integer or a whole number float, not {type(deceleration).__name__}" + ) # Get the min and max decel of the device to asset decel min_decel = int(float(await self._send_command(cmd="getShakeAccelerationMin", delay=0.2))) max_decel = int(float(await self._send_command(cmd="getShakeAccelerationMax", delay=0.2))) assert ( - min_decel <= decel <= max_decel - ), f"Deceleration {decel} seconds is out of range. Allowed range is {min_decel}-{max_decel} seconds" + min_decel <= deceleration <= max_decel + ), f"Deceleration {deceleration} seconds is out of range. Allowed range is {min_decel}-{max_decel} seconds" # Set the deceleration of the shaker - set_decel_cmd = f"setShakeAcceleration{decel}" + set_decel_cmd = f"setShakeAcceleration{deceleration}" await self._send_command(cmd=set_decel_cmd, delay=0.2) # stop shaking @@ -225,12 +229,13 @@ async def set_temperature(self, temperature: float): # Check if temperature is an integer if isinstance(temperature, float): - if temperature.is_integer(): - temperature = int(temperature) - else: + if not temperature.is_integer(): raise ValueError(f"Temperature must be a whole number, not {temperature} (1/10 C)") - elif not isinstance(temperature, int): - raise TypeError(f"Temperature must be an integer, not {type(Temperature).__name__} (1/10 C)") + temperature = int(temperature) + if not isinstance(temperature, int): + raise TypeError( + f"Temperature must be an integer or a whole number float, not {type(temperature).__name__} (1/10 C)" + ) set_temp_cmd = f"setTempTarget{temperature}" await self._send_command(cmd=set_temp_cmd, delay=0.2) From e545a880f425ec3ccca6c30b8d8152dc431b7f40 Mon Sep 17 00:00:00 2001 From: Rick Wierenga Date: Thu, 30 Oct 2025 00:02:44 -0700 Subject: [PATCH 6/6] remove unused imoprt --- pylabrobot/heating_shaking/bioshake_backend.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/pylabrobot/heating_shaking/bioshake_backend.py b/pylabrobot/heating_shaking/bioshake_backend.py index 526fcdfd512..4a5912511e9 100644 --- a/pylabrobot/heating_shaking/bioshake_backend.py +++ b/pylabrobot/heating_shaking/bioshake_backend.py @@ -1,5 +1,4 @@ import asyncio -from typing import Optional from pylabrobot.heating_shaking.backend import HeaterShakerBackend from pylabrobot.io.serial import Serial @@ -144,7 +143,7 @@ async def shake(self, speed: float, acceleration: int = 0): assert ( min_speed <= speed <= max_speed - ), f"Speed {speed} RPM is out of range. Allowed range is {min_speed}-{max_speed} RPM" + ), f"Speed {speed} RPM is out of range. Allowed range is {min_speed}{max_speed} RPM" # Set the speed of the shaker set_speed_cmd = f"setShakeTargetSpeed{speed}"