Skip to content

Commit

Permalink
Merge 1ff42f3 into 2247fb0
Browse files Browse the repository at this point in the history
  • Loading branch information
spacegaier committed Feb 23, 2021
2 parents 2247fb0 + 1ff42f3 commit e4971a1
Show file tree
Hide file tree
Showing 3 changed files with 138 additions and 7 deletions.
114 changes: 112 additions & 2 deletions test/devices_tests/cover_test.py
Expand Up @@ -374,8 +374,23 @@ def test_stop(self):
group_address_position="1/2/3",
group_address_position_state="1/2/4",
)
# Attempt stopping while not actually moving
self.loop.run_until_complete(cover_short_stop.stop())
self.assertEqual(xknx.telegrams.qsize(), 1)
self.assertEqual(xknx.telegrams.qsize(), 0)

# Attempt stopping while moving down
cover_short_stop.travelcalculator.set_position(0)
self.loop.run_until_complete(cover_short_stop.set_down())
self.loop.run_until_complete(cover_short_stop.stop())
self.assertEqual(xknx.telegrams.qsize(), 2)
telegram = xknx.telegrams.get_nowait()
self.assertEqual(
telegram,
Telegram(
destination_address=GroupAddress("1/2/1"),
payload=GroupValueWrite(DPTBinary(1)),
),
)
telegram = xknx.telegrams.get_nowait()
self.assertEqual(
telegram,
Expand All @@ -385,6 +400,28 @@ def test_stop(self):
),
)

# Attempt stopping while moving up
cover_short_stop.travelcalculator.set_position(100)
self.loop.run_until_complete(cover_short_stop.set_up())
self.loop.run_until_complete(cover_short_stop.stop())
self.assertEqual(xknx.telegrams.qsize(), 2)
telegram = xknx.telegrams.get_nowait()
self.assertEqual(
telegram,
Telegram(
destination_address=GroupAddress("1/2/1"),
payload=GroupValueWrite(DPTBinary(0)),
),
)
telegram = xknx.telegrams.get_nowait()
self.assertEqual(
telegram,
Telegram(
destination_address=GroupAddress("1/2/2"),
payload=GroupValueWrite(DPTBinary(0)),
),
)

cover_manual_stop = Cover(
xknx,
"TestCover",
Expand All @@ -405,6 +442,79 @@ def test_stop(self):
),
)

def test_stop_angle(self):
"""Test stopping cover during angle move / tilting."""
xknx = XKNX()
cover_short_stop = Cover(
xknx,
"TestCover",
group_address_long="1/2/1",
group_address_short="1/2/2",
group_address_angle="1/2/5",
group_address_angle_state="1/2/6",
)
# Attempt stopping while not actually tilting
self.loop.run_until_complete(cover_short_stop.stop())
self.assertEqual(xknx.telegrams.qsize(), 0)

# Set cover tilt to a dummy start value, since otherwise we cannot
# determine later on a tilt direction and without it, stopping the
# til process has no effect.
self.loop.run_until_complete(
cover_short_stop.angle.process(
Telegram(
destination_address=GroupAddress("1/2/5"),
payload=GroupValueWrite(DPTArray(0xAA)),
)
)
)

# Attempt stopping while tilting down
self.loop.run_until_complete(cover_short_stop.set_angle(100))
self.loop.run_until_complete(cover_short_stop.stop())
self.assertEqual(xknx.telegrams.qsize(), 2)
telegram = xknx.telegrams.get_nowait()
print(telegram)
self.assertEqual(
telegram,
Telegram(
destination_address=GroupAddress("1/2/5"),
payload=GroupValueWrite(DPTArray(0xFF)),
),
)
telegram = xknx.telegrams.get_nowait()
print(telegram)
self.assertEqual(
telegram,
Telegram(
destination_address=GroupAddress("1/2/2"),
payload=GroupValueWrite(DPTBinary(1)),
),
)

# Attempt stopping while tilting up
self.loop.run_until_complete(cover_short_stop.set_angle(0))
self.loop.run_until_complete(cover_short_stop.stop())
self.assertEqual(xknx.telegrams.qsize(), 2)
telegram = xknx.telegrams.get_nowait()
print(telegram)
self.assertEqual(
telegram,
Telegram(
destination_address=GroupAddress("1/2/5"),
payload=GroupValueWrite(DPTArray(0x00)),
),
)
telegram = xknx.telegrams.get_nowait()
print(telegram)
self.assertEqual(
telegram,
Telegram(
destination_address=GroupAddress("1/2/2"),
payload=GroupValueWrite(DPTBinary(0)),
),
)

#
# TEST POSITION
#
Expand Down Expand Up @@ -569,7 +679,7 @@ def test_angle(self):
)

def test_angle_not_supported(self):
"""Test changing angle on cover wich does not support angle."""
"""Test changing angle on cover which does not support angle."""
xknx = XKNX()
cover = Cover(
xknx,
Expand Down
29 changes: 25 additions & 4 deletions xknx/devices/cover.py
Expand Up @@ -18,7 +18,7 @@
)

from .device import Device, DeviceCallbackType
from .travelcalculator import TravelCalculator
from .travelcalculator import TravelCalculator, TravelStatus

if TYPE_CHECKING:
from xknx.remote_value import RemoteValue
Expand Down Expand Up @@ -125,6 +125,7 @@ def __init__(
self.travel_time_up = travel_time_up

self.travelcalculator = TravelCalculator(travel_time_down, travel_time_up)
self.travel_direction_tilt = Optional[TravelStatus]

self.device_class = device_class

Expand Down Expand Up @@ -198,12 +199,14 @@ async def set_down(self) -> None:
"""Move cover down."""
await self.updown.down()
self.travelcalculator.start_travel_down()
self.travel_direction_tilt = None
await self.after_update()

async def set_up(self) -> None:
"""Move cover up."""
await self.updown.up()
self.travelcalculator.start_travel_up()
self.travel_direction_tilt = None
await self.after_update()

async def set_short_down(self) -> None:
Expand All @@ -219,11 +222,21 @@ async def stop(self) -> None:
if self.stop_.writable:
await self.stop_.on()
elif self.step.writable:
await self.step.increase()
if (
self.travel_direction_tilt == TravelStatus.DIRECTION_UP
or self.travelcalculator.travel_direction == TravelStatus.DIRECTION_UP
):
await self.step.decrease()
elif (
self.travel_direction_tilt == TravelStatus.DIRECTION_DOWN
or self.travelcalculator.travel_direction == TravelStatus.DIRECTION_DOWN
):
await self.step.increase()
else:
logger.warning("Stop not supported for device %s", self.get_name())
return
self.travelcalculator.stop()
self.travel_direction_tilt = None
await self.after_update()

async def set_position(self, position: int) -> None:
Expand Down Expand Up @@ -271,14 +284,22 @@ async def set_angle(self, angle: int) -> None:
if not self.supports_angle:
logger.warning("Angle not supported for device %s", self.get_name())
return

current_angle = self.current_angle()
self.travel_direction_tilt = (
TravelStatus.DIRECTION_DOWN
if current_angle and angle >= current_angle
else TravelStatus.DIRECTION_UP
)

await self.angle.set(angle)

async def auto_stop_if_necessary(self) -> None:
"""Do auto stop if necessary."""
# If device does not support auto_positioning,
# we have to stop the device when position is reached.
# we have to stop the device when position is reached,
# unless device was traveling to fully open
# or fully closed state
# or fully closed state.
if (
self.supports_stop
and not self.position_target.writable
Expand Down
2 changes: 1 addition & 1 deletion xknx/remote_value/remote_value.py
Expand Up @@ -218,7 +218,7 @@ async def read_state(self, wait_for_result: bool = False) -> None:
"""Send GroupValueRead telegram for state address to KNX bus."""
if self.group_address_state is not None:
# pylint: disable=import-outside-toplevel
# TODO: send a ReadRequset and start a timeout from here instead of ValueReader
# TODO: send a ReadRequest and start a timeout from here instead of ValueReader
# cancel timeout form process(); delete ValueReader
from xknx.core import ValueReader

Expand Down

0 comments on commit e4971a1

Please sign in to comment.