Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 31 additions & 13 deletions api/src/opentrons/hardware_control/modules/flex_stacker.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from typing import Any, Awaitable, Callable, Dict, Optional, Mapping

from opentrons.drivers.flex_stacker.types import (
AxisParams,
Direction,
LEDColor,
LEDPattern,
Expand Down Expand Up @@ -292,10 +293,18 @@ async def move_axis(
) -> bool:
"""Move the axis in a direction by the given distance in mm."""
default = STACKER_MOTION_CONFIG[axis]["move"]
await self._driver.set_run_current(
axis, current if current is not None else default.run_current
)
await self._driver.set_ihold_current(axis, default.hold_current)
old_run_current = self._reader.motion_params[axis].run_current
new_run_current = current if current is not None else default.run_current
if new_run_current != old_run_current:
await self._driver.set_run_current(axis, new_run_current)
self._reader.motion_params[axis].run_current = new_run_current

old_hold_current = self._reader.motion_params[axis].hold_current
new_hold_current = default.hold_current
if new_hold_current != old_hold_current:
await self._driver.set_ihold_current(axis, new_hold_current)
self._reader.motion_params[axis].hold_current = new_hold_current

motion_params = default.move_params.update(
max_speed=speed, acceleration=acceleration
)
Expand All @@ -315,10 +324,18 @@ async def home_axis(
current: Optional[float] = None,
) -> bool:
default = STACKER_MOTION_CONFIG[axis]["home"]
await self._driver.set_run_current(
axis, current if current is not None else default.run_current
)
await self._driver.set_ihold_current(axis, default.hold_current)
old_run_current = self._reader.motion_params[axis].run_current
new_run_current = current if current is not None else default.run_current
if new_run_current != old_run_current:
await self._driver.set_run_current(axis, new_run_current)
self._reader.motion_params[axis].run_current = new_run_current

old_hold_current = self._reader.motion_params[axis].hold_current
new_hold_current = default.hold_current
if new_hold_current != old_hold_current:
await self._driver.set_ihold_current(axis, new_hold_current)
self._reader.motion_params[axis].hold_current = new_hold_current

motion_params = default.move_params.update(
max_speed=speed, acceleration=acceleration
)
Expand Down Expand Up @@ -610,8 +627,8 @@ def __init__(self, driver: AbstractFlexStackerDriver) -> None:
)
for s in TOFSensor
}
self.motion_params: Dict[StackerAxis, Optional[MoveParams]] = {
axis: None for axis in StackerAxis
self.motion_params: Dict[StackerAxis, AxisParams] = {
axis: AxisParams(0, 0, MoveParams(0, 0, 0)) for axis in StackerAxis
}
self.platform_state = PlatformState.UNKNOWN
self.hopper_door_closed = False
Expand Down Expand Up @@ -652,9 +669,10 @@ async def get_limit_switch_status(self) -> None:

async def get_motion_parameters(self) -> None:
"""Get the motion parameters used by the axis motors."""
self.motion_params = {
axis: await self._driver.get_motion_params(axis) for axis in StackerAxis
}
for axis in StackerAxis:
self.motion_params[axis].move_params = await self._driver.get_motion_params(
axis
)

async def get_platform_sensor_state(self) -> None:
"""Get the platform state."""
Expand Down
135 changes: 113 additions & 22 deletions api/tests/opentrons/hardware_control/modules/test_hc_flexstacker.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,23 @@
import pytest
import mock
from typing import AsyncGenerator
from opentrons.drivers.flex_stacker.driver import (
STACKER_MOTION_CONFIG,
)
from opentrons.drivers.flex_stacker.simulator import SimulatingDriver
from opentrons.drivers.flex_stacker.types import (
Direction,
LimitSwitchStatus,
PlatformStatus,
StackerAxis,
)
from opentrons.hardware_control import modules, ExecutionManager
from opentrons.drivers.rpi_drivers.types import USBPort
from opentrons.hardware_control.modules.flex_stacker import (
SIMULATING_POLL_PERIOD,
FlexStackerReader,
)
from opentrons.hardware_control.poller import Poller


@pytest.fixture
Expand All @@ -16,37 +31,113 @@ def usb_port() -> USBPort:


@pytest.fixture
async def simulating_module(
def mock_driver() -> mock.AsyncMock:
return mock.AsyncMock(spec=SimulatingDriver)


@pytest.fixture
async def subject(
usb_port: USBPort,
) -> AsyncGenerator[modules.AbstractModule, None]:
module = await modules.build(
port=usb_port.device_path,
mock_driver: mock.AsyncMock,
) -> AsyncGenerator[modules.FlexStacker, None]:
"""Test subject with mocked driver"""
reader = FlexStackerReader(driver=mock_driver)
poller = Poller(reader=reader, interval=SIMULATING_POLL_PERIOD)
stacker = modules.FlexStacker(
port="/dev/ot_module_sim_flexstacker0",
usb_port=usb_port,
type=modules.ModuleType["FLEX_STACKER"],
simulating=True,
driver=mock_driver,
reader=reader,
poller=poller,
device_info={
"serial": "dummySerialFS",
"model": "a1",
"version": "stacker-fw",
},
hw_control_loop=asyncio.get_running_loop(),
execution_manager=ExecutionManager(),
)
assert isinstance(module, modules.AbstractModule)
await poller.start()
try:
yield module
yield stacker
finally:
await module.cleanup()
await stacker.cleanup()


@pytest.fixture
async def simulating_module_driver_patched(
simulating_module: modules.FlexStacker,
) -> AsyncGenerator[modules.AbstractModule, None]:
driver_mock = mock.MagicMock()
with mock.patch.object(
simulating_module, "_driver", driver_mock
), mock.patch.object(simulating_module._reader, "_driver", driver_mock):
yield simulating_module


async def test_sim_state(simulating_module: modules.FlexStacker) -> None:
status = simulating_module.device_info
async def test_sim_state(subject: modules.FlexStacker) -> None:
status = subject.device_info
assert status["serial"] == "dummySerialFS"
assert status["model"] == "a1"
assert status["version"] == "stacker-fw"


async def test_set_run_hold_current(
subject: modules.FlexStacker, mock_driver: mock.AsyncMock
) -> None:
mock_driver.get_platform_status.side_effect = [
PlatformStatus(True, False),
PlatformStatus(False, True),
]
mock_driver.get_limit_switches_status.side_effect = [
LimitSwitchStatus(False, True, False, False, False),
LimitSwitchStatus(True, True, False, False, False),
]

# Test move_axis

# run and hold current are 0 by default
assert subject._reader.motion_params[StackerAxis.X].run_current == 0
assert subject._reader.motion_params[StackerAxis.X].hold_current == 0
default = STACKER_MOTION_CONFIG[StackerAxis.X]["move"]

# Call the move_axis function with default current
await subject.move_axis(StackerAxis.X, Direction.EXTEND, 44)
# set_run_current should be called and run_current recorded
mock_driver.set_run_current.assert_called_with(StackerAxis.X, default.run_current)
mock_driver.set_ihold_current.assert_called_with(
StackerAxis.X, default.hold_current
)
motion_params = subject._reader.motion_params[StackerAxis.X]
assert motion_params.run_current == default.run_current
assert motion_params.hold_current == default.hold_current
mock_driver.set_run_current.reset_mock()
mock_driver.set_ihold_current.reset_mock()

# Make sure set_run_current and set_ihold_current are not called again
await subject.move_axis(StackerAxis.X, Direction.EXTEND, 44)
mock_driver.set_run_current.assert_not_called()
mock_driver.set_ihold_current.assert_not_called()
motion_params = subject._reader.motion_params[StackerAxis.X]
assert motion_params.run_current == default.run_current
assert motion_params.hold_current == default.hold_current

# Test home_axis

# Reset the run/hold current recorded
default = STACKER_MOTION_CONFIG[StackerAxis.X]["home"]
subject._reader.motion_params[StackerAxis.X].run_current = 0
subject._reader.motion_params[StackerAxis.X].hold_current = 0

# Call the home_axis function with default current
await subject.home_axis(StackerAxis.X, Direction.EXTEND)
mock_driver.set_run_current.assert_called_with(StackerAxis.X, default.run_current)
mock_driver.set_ihold_current.assert_called_with(
StackerAxis.X, default.hold_current
)
motion_params = subject._reader.motion_params[StackerAxis.X]
assert motion_params.run_current == default.run_current
assert motion_params.hold_current == default.hold_current
mock_driver.set_run_current.reset_mock()
mock_driver.set_ihold_current.reset_mock()

# Make sure set_run_current and set_ihold_current are not called again
await subject.home_axis(StackerAxis.X, Direction.EXTEND, 44)
mock_driver.set_run_current.assert_not_called()
mock_driver.set_ihold_current.assert_not_called()

# The recorded run/hold current should stay the same
motion_params = subject._reader.motion_params[StackerAxis.X]
assert motion_params.run_current == default.run_current
assert motion_params.hold_current == default.hold_current
mock_driver.set_run_current.reset_mock()
mock_driver.set_ihold_current.reset_mock()