Skip to content

Commit

Permalink
Add ZHA cover tilt (#102072)
Browse files Browse the repository at this point in the history
* cover tilt reimplementation

* rework tilt test

* Fix ZHA cover tests

* Match ZHA cover tilt code-style with the rest

* Increase coverage for ZHA cover, optimize update

---------

Co-authored-by: josef109 <josefglaze@gmail.com>
  • Loading branch information
tomasbedrich and josef109 committed Oct 24, 2023
1 parent 4febb2e commit d25b4aa
Show file tree
Hide file tree
Showing 3 changed files with 270 additions and 34 deletions.
25 changes: 22 additions & 3 deletions homeassistant/components/zha/core/cluster_handlers/closures.py
Original file line number Diff line number Diff line change
Expand Up @@ -124,11 +124,19 @@ class WindowCoveringClient(ClientClusterHandler):
class WindowCovering(ClusterHandler):
"""Window cluster handler."""

_value_attribute = 8
_value_attribute_lift = (
closures.WindowCovering.AttributeDefs.current_position_lift_percentage.id
)
_value_attribute_tilt = (
closures.WindowCovering.AttributeDefs.current_position_tilt_percentage.id
)
REPORT_CONFIG = (
AttrReportConfig(
attr="current_position_lift_percentage", config=REPORT_CONFIG_IMMEDIATE
),
AttrReportConfig(
attr="current_position_tilt_percentage", config=REPORT_CONFIG_IMMEDIATE
),
)

async def async_update(self):
Expand All @@ -140,10 +148,21 @@ async def async_update(self):
if result is not None:
self.async_send_signal(
f"{self.unique_id}_{SIGNAL_ATTR_UPDATED}",
8,
self._value_attribute_lift,
"current_position_lift_percentage",
result,
)
result = await self.get_attribute_value(
"current_position_tilt_percentage", from_cache=False
)
self.debug("read current tilt position: %s", result)
if result is not None:
self.async_send_signal(
f"{self.unique_id}_{SIGNAL_ATTR_UPDATED}",
self._value_attribute_tilt,
"current_position_tilt_percentage",
result,
)

@callback
def attribute_updated(self, attrid: int, value: Any, _: Any) -> None:
Expand All @@ -152,7 +171,7 @@ def attribute_updated(self, attrid: int, value: Any, _: Any) -> None:
self.debug(
"Attribute report '%s'[%s] = %s", self.cluster.name, attr_name, value
)
if attrid == self._value_attribute:
if attrid in (self._value_attribute_lift, self._value_attribute_tilt):
self.async_send_signal(
f"{self.unique_id}_{SIGNAL_ATTR_UPDATED}", attrid, attr_name, value
)
68 changes: 44 additions & 24 deletions homeassistant/components/zha/cover.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from homeassistant.components.cover import (
ATTR_CURRENT_POSITION,
ATTR_POSITION,
ATTR_TILT_POSITION,
CoverDeviceClass,
CoverEntity,
)
Expand Down Expand Up @@ -80,6 +81,7 @@ def __init__(self, unique_id, zha_device, cluster_handlers, **kwargs):
super().__init__(unique_id, zha_device, cluster_handlers, **kwargs)
self._cover_cluster_handler = self.cluster_handlers.get(CLUSTER_HANDLER_COVER)
self._current_position = None
self._tilt_position = None

async def async_added_to_hass(self) -> None:
"""Run when about to be added to hass."""
Expand All @@ -94,6 +96,10 @@ def async_restore_last_state(self, last_state):
self._state = last_state.state
if "current_position" in last_state.attributes:
self._current_position = last_state.attributes["current_position"]
if "current_tilt_position" in last_state.attributes:
self._tilt_position = last_state.attributes[
"current_tilt_position"
] # first allocation activate tilt

@property
def is_closed(self) -> bool | None:
Expand All @@ -120,11 +126,20 @@ def current_cover_position(self) -> int | None:
"""
return self._current_position

@property
def current_cover_tilt_position(self) -> int | None:
"""Return the current tilt position of the cover."""
return self._tilt_position

@callback
def async_set_position(self, attr_id, attr_name, value):
"""Handle position update from cluster handler."""
_LOGGER.debug("setting position: %s", value)
self._current_position = 100 - value
_LOGGER.debug("setting position: %s %s %s", attr_id, attr_name, value)
if attr_name == "current_position_lift_percentage":
self._current_position = 100 - value
elif attr_name == "current_position_tilt_percentage":
self._tilt_position = 100 - value

if self._current_position == 0:
self._state = STATE_CLOSED
elif self._current_position == 100:
Expand All @@ -145,13 +160,27 @@ async def async_open_cover(self, **kwargs: Any) -> None:
raise HomeAssistantError(f"Failed to open cover: {res[1]}")
self.async_update_state(STATE_OPENING)

async def async_open_cover_tilt(self, **kwargs: Any) -> None:
"""Open the cover tilt."""
res = await self._cover_cluster_handler.go_to_tilt_percentage(0)
if res[1] is not Status.SUCCESS:
raise HomeAssistantError(f"Failed to open cover tilt: {res[1]}")
self.async_update_state(STATE_OPENING)

async def async_close_cover(self, **kwargs: Any) -> None:
"""Close the window cover."""
res = await self._cover_cluster_handler.down_close()
if res[1] is not Status.SUCCESS:
raise HomeAssistantError(f"Failed to close cover: {res[1]}")
self.async_update_state(STATE_CLOSING)

async def async_close_cover_tilt(self, **kwargs: Any) -> None:
"""Close the cover tilt."""
res = await self._cover_cluster_handler.go_to_tilt_percentage(100)
if res[1] is not Status.SUCCESS:
raise HomeAssistantError(f"Failed to close cover tilt: {res[1]}")
self.async_update_state(STATE_CLOSING)

async def async_set_cover_position(self, **kwargs: Any) -> None:
"""Move the roller shutter to a specific position."""
new_pos = kwargs[ATTR_POSITION]
Expand All @@ -162,6 +191,16 @@ async def async_set_cover_position(self, **kwargs: Any) -> None:
STATE_CLOSING if new_pos < self._current_position else STATE_OPENING
)

async def async_set_cover_tilt_position(self, **kwargs: Any) -> None:
"""Move the cover til to a specific position."""
new_pos = kwargs[ATTR_TILT_POSITION]
res = await self._cover_cluster_handler.go_to_tilt_percentage(100 - new_pos)
if res[1] is not Status.SUCCESS:
raise HomeAssistantError(f"Failed to set cover tilt position: {res[1]}")
self.async_update_state(
STATE_CLOSING if new_pos < self._tilt_position else STATE_OPENING
)

async def async_stop_cover(self, **kwargs: Any) -> None:
"""Stop the window cover."""
res = await self._cover_cluster_handler.stop()
Expand All @@ -170,28 +209,9 @@ async def async_stop_cover(self, **kwargs: Any) -> None:
self._state = STATE_OPEN if self._current_position > 0 else STATE_CLOSED
self.async_write_ha_state()

async def async_update(self) -> None:
"""Attempt to retrieve the open/close state of the cover."""
await super().async_update()
await self.async_get_state()

async def async_get_state(self, from_cache=True):
"""Fetch the current state."""
_LOGGER.debug("polling current state")
if self._cover_cluster_handler:
pos = await self._cover_cluster_handler.get_attribute_value(
"current_position_lift_percentage", from_cache=from_cache
)
_LOGGER.debug("read pos=%s", pos)

if pos is not None:
self._current_position = 100 - pos
self._state = (
STATE_OPEN if self.current_cover_position > 0 else STATE_CLOSED
)
else:
self._current_position = None
self._state = None
async def async_stop_cover_tilt(self, **kwargs: Any) -> None:
"""Stop the cover tilt."""
await self.async_stop_cover()


@MULTI_MATCH(
Expand Down

0 comments on commit d25b4aa

Please sign in to comment.