diff --git a/test/devices_tests/cover_test.py b/test/devices_tests/cover_test.py index cce88dccd3..cb2b1c3983 100644 --- a/test/devices_tests/cover_test.py +++ b/test/devices_tests/cover_test.py @@ -374,8 +374,23 @@ def test_stop(self): group_address_position="1/2/3", group_address_position_state="1/2/4", ) + # Attempt stopping while not actually moving self.loop.run_until_complete(cover_short_stop.stop()) - self.assertEqual(xknx.telegrams.qsize(), 1) + self.assertEqual(xknx.telegrams.qsize(), 0) + + # Attempt stopping while moving down + cover_short_stop.travelcalculator.set_position(0) + self.loop.run_until_complete(cover_short_stop.set_down()) + self.loop.run_until_complete(cover_short_stop.stop()) + self.assertEqual(xknx.telegrams.qsize(), 2) + telegram = xknx.telegrams.get_nowait() + self.assertEqual( + telegram, + Telegram( + destination_address=GroupAddress("1/2/1"), + payload=GroupValueWrite(DPTBinary(1)), + ), + ) telegram = xknx.telegrams.get_nowait() self.assertEqual( telegram, @@ -385,6 +400,28 @@ def test_stop(self): ), ) + # Attempt stopping while moving up + cover_short_stop.travelcalculator.set_position(100) + self.loop.run_until_complete(cover_short_stop.set_up()) + self.loop.run_until_complete(cover_short_stop.stop()) + self.assertEqual(xknx.telegrams.qsize(), 2) + telegram = xknx.telegrams.get_nowait() + self.assertEqual( + telegram, + Telegram( + destination_address=GroupAddress("1/2/1"), + payload=GroupValueWrite(DPTBinary(0)), + ), + ) + telegram = xknx.telegrams.get_nowait() + self.assertEqual( + telegram, + Telegram( + destination_address=GroupAddress("1/2/2"), + payload=GroupValueWrite(DPTBinary(0)), + ), + ) + cover_manual_stop = Cover( xknx, "TestCover", @@ -405,6 +442,79 @@ def test_stop(self): ), ) + def test_stop_angle(self): + """Test stopping cover during angle move / tilting.""" + xknx = XKNX() + cover_short_stop = Cover( + xknx, + "TestCover", + group_address_long="1/2/1", + group_address_short="1/2/2", + group_address_angle="1/2/5", + group_address_angle_state="1/2/6", + ) + # Attempt stopping while not actually tilting + self.loop.run_until_complete(cover_short_stop.stop()) + self.assertEqual(xknx.telegrams.qsize(), 0) + + # Set cover tilt to a dummy start value, since otherwise we cannot + # determine later on a tilt direction and without it, stopping the + # til process has no effect. + self.loop.run_until_complete( + cover_short_stop.angle.process( + Telegram( + destination_address=GroupAddress("1/2/5"), + payload=GroupValueWrite(DPTArray(0xAA)), + ) + ) + ) + + # Attempt stopping while tilting down + self.loop.run_until_complete(cover_short_stop.set_angle(100)) + self.loop.run_until_complete(cover_short_stop.stop()) + self.assertEqual(xknx.telegrams.qsize(), 2) + telegram = xknx.telegrams.get_nowait() + print(telegram) + self.assertEqual( + telegram, + Telegram( + destination_address=GroupAddress("1/2/5"), + payload=GroupValueWrite(DPTArray(0xFF)), + ), + ) + telegram = xknx.telegrams.get_nowait() + print(telegram) + self.assertEqual( + telegram, + Telegram( + destination_address=GroupAddress("1/2/2"), + payload=GroupValueWrite(DPTBinary(1)), + ), + ) + + # Attempt stopping while tilting up + self.loop.run_until_complete(cover_short_stop.set_angle(0)) + self.loop.run_until_complete(cover_short_stop.stop()) + self.assertEqual(xknx.telegrams.qsize(), 2) + telegram = xknx.telegrams.get_nowait() + print(telegram) + self.assertEqual( + telegram, + Telegram( + destination_address=GroupAddress("1/2/5"), + payload=GroupValueWrite(DPTArray(0x00)), + ), + ) + telegram = xknx.telegrams.get_nowait() + print(telegram) + self.assertEqual( + telegram, + Telegram( + destination_address=GroupAddress("1/2/2"), + payload=GroupValueWrite(DPTBinary(0)), + ), + ) + # # TEST POSITION # @@ -569,7 +679,7 @@ def test_angle(self): ) def test_angle_not_supported(self): - """Test changing angle on cover wich does not support angle.""" + """Test changing angle on cover which does not support angle.""" xknx = XKNX() cover = Cover( xknx, diff --git a/xknx/devices/cover.py b/xknx/devices/cover.py index c250ef7c17..f6c5f7205a 100644 --- a/xknx/devices/cover.py +++ b/xknx/devices/cover.py @@ -18,7 +18,7 @@ ) from .device import Device, DeviceCallbackType -from .travelcalculator import TravelCalculator +from .travelcalculator import TravelCalculator, TravelStatus if TYPE_CHECKING: from xknx.remote_value import RemoteValue @@ -125,6 +125,7 @@ def __init__( self.travel_time_up = travel_time_up self.travelcalculator = TravelCalculator(travel_time_down, travel_time_up) + self.travel_direction_tilt = Optional[TravelStatus] self.device_class = device_class @@ -198,12 +199,14 @@ async def set_down(self) -> None: """Move cover down.""" await self.updown.down() self.travelcalculator.start_travel_down() + self.travel_direction_tilt = None await self.after_update() async def set_up(self) -> None: """Move cover up.""" await self.updown.up() self.travelcalculator.start_travel_up() + self.travel_direction_tilt = None await self.after_update() async def set_short_down(self) -> None: @@ -219,11 +222,21 @@ async def stop(self) -> None: if self.stop_.writable: await self.stop_.on() elif self.step.writable: - await self.step.increase() + if ( + self.travel_direction_tilt == TravelStatus.DIRECTION_UP + or self.travelcalculator.travel_direction == TravelStatus.DIRECTION_UP + ): + await self.step.decrease() + elif ( + self.travel_direction_tilt == TravelStatus.DIRECTION_DOWN + or self.travelcalculator.travel_direction == TravelStatus.DIRECTION_DOWN + ): + await self.step.increase() else: logger.warning("Stop not supported for device %s", self.get_name()) return self.travelcalculator.stop() + self.travel_direction_tilt = None await self.after_update() async def set_position(self, position: int) -> None: @@ -271,14 +284,22 @@ async def set_angle(self, angle: int) -> None: if not self.supports_angle: logger.warning("Angle not supported for device %s", self.get_name()) return + + current_angle = self.current_angle() + self.travel_direction_tilt = ( + TravelStatus.DIRECTION_DOWN + if current_angle and angle >= current_angle + else TravelStatus.DIRECTION_UP + ) + await self.angle.set(angle) async def auto_stop_if_necessary(self) -> None: """Do auto stop if necessary.""" # If device does not support auto_positioning, - # we have to stop the device when position is reached. + # we have to stop the device when position is reached, # unless device was traveling to fully open - # or fully closed state + # or fully closed state. if ( self.supports_stop and not self.position_target.writable diff --git a/xknx/remote_value/remote_value.py b/xknx/remote_value/remote_value.py index 9bcda02d62..e6d1ba875a 100644 --- a/xknx/remote_value/remote_value.py +++ b/xknx/remote_value/remote_value.py @@ -218,7 +218,7 @@ async def read_state(self, wait_for_result: bool = False) -> None: """Send GroupValueRead telegram for state address to KNX bus.""" if self.group_address_state is not None: # pylint: disable=import-outside-toplevel - # TODO: send a ReadRequset and start a timeout from here instead of ValueReader + # TODO: send a ReadRequest and start a timeout from here instead of ValueReader # cancel timeout form process(); delete ValueReader from xknx.core import ValueReader