Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactored common curtain/tilt code to shared base class #231

Merged
merged 21 commits into from
Feb 11, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
120 changes: 120 additions & 0 deletions switchbot/devices/base_cover.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
"""Library to handle connection with Switchbot."""
from __future__ import annotations

from abc import abstractmethod

import logging
from typing import Any

from .device import REQ_HEADER, SwitchbotDevice, update_after_operation

# Cover keys
COVER_COMMAND = "4501"

# For second element of open and close arrs we should add two bytes i.e. ff00
# First byte [ff] stands for speed (00 or ff - normal, 01 - slow) *
# * Only for curtains 3. For other models use ff
# Second byte [00] is a command (00 - open, 64 - close)
POSITION_KEYS = [
f"{REQ_HEADER}{COVER_COMMAND}0101",
f"{REQ_HEADER}{COVER_COMMAND}05", # +speed
] # +actual_position
STOP_KEYS = [f"{REQ_HEADER}{COVER_COMMAND}0001", f"{REQ_HEADER}{COVER_COMMAND}00ff"]

COVER_EXT_SUM_KEY = f"{REQ_HEADER}460401"
COVER_EXT_ADV_KEY = f"{REQ_HEADER}460402"


_LOGGER = logging.getLogger(__name__)


class SwitchbotBaseCover(SwitchbotDevice):
"""Representation of a Switchbot Cover devices for both curtains and tilt blinds."""

def __init__(self, *args: Any, **kwargs: Any) -> None:
"""Switchbot Cover device constructor."""

super().__init__(*args, **kwargs)
self._settings: dict[str, Any] = {}
self.ext_info_sum: dict[str, Any] = {}
self.ext_info_adv: dict[str, Any] = {}

async def _send_multiple_commands(self, keys: list[str]) -> bool:
"""Send multiple commands to device.

Since we current have no way to tell which command the device
needs we send both.
"""
final_result = False
for key in keys:
result = await self._send_command(key)
final_result |= self._check_command_result(result, 0, {1})
return final_result

@update_after_operation
async def stop(self) -> bool:
"""Send stop command to device."""
return await self._send_multiple_commands(STOP_KEYS)

@update_after_operation
async def set_position(self, position: int, speed: int = 255) -> bool:
"""Send position command (0-100) to device. Speed 255 - normal, 1 - slow"""
position = (100 - position) if self._reverse else position
return await self._send_multiple_commands(
[
f"{POSITION_KEYS[0]}{position:02X}",
f"{POSITION_KEYS[1]}{speed:02X}{position:02X}",
]
)

@abstractmethod
def get_position(self) -> Any:
"""Return current device position."""
pass
dcmeglio marked this conversation as resolved.
Show resolved Hide resolved

@abstractmethod
async def get_basic_info(self) -> dict[str, Any] | None:
"""Get device basic settings."""
pass
dcmeglio marked this conversation as resolved.
Show resolved Hide resolved

@abstractmethod
async def get_extended_info_summary(self) -> dict[str, Any] | None:
"""Get extended info for all devices in chain."""
pass
dcmeglio marked this conversation as resolved.
Show resolved Hide resolved

async def get_extended_info_adv(self) -> dict[str, Any] | None:
"""Get advance page info for device chain."""

_data = await self._send_command(key=COVER_EXT_ADV_KEY)
if not _data:
_LOGGER.error("%s: Unsuccessful, no result from device", self.name)
return None

if _data in (b"\x07", b"\x00"):
_LOGGER.error("%s: Unsuccessful, please try again", self.name)
return None

_state_of_charge = [
"not_charging",
"charging_by_adapter",
"charging_by_solar",
"fully_charged",
"solar_not_charging",
"charging_error",
]

self.ext_info_adv["device0"] = {
"battery": _data[1],
"firmware": _data[2] / 10.0,
"stateOfCharge": _state_of_charge[_data[3]],
}

# If grouped curtain device present.
if _data[4]:
self.ext_info_adv["device1"] = {
"battery": _data[4],
"firmware": _data[5] / 10.0,
"stateOfCharge": _state_of_charge[_data[6]],
}

return self.ext_info_adv
21 changes: 10 additions & 11 deletions switchbot/devices/blind_tilt.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,27 +10,26 @@
update_after_operation,
)

from .curtain import CURTAIN_EXT_SUM_KEY, SwitchbotCurtain
from .base_cover import COVER_COMMAND, COVER_EXT_SUM_KEY, SwitchbotBaseCover

_LOGGER = logging.getLogger(__name__)


BLIND_COMMAND = "4501"
OPEN_KEYS = [
f"{REQ_HEADER}{BLIND_COMMAND}010132",
f"{REQ_HEADER}{BLIND_COMMAND}05ff32",
f"{REQ_HEADER}{COVER_COMMAND}010132",
f"{REQ_HEADER}{COVER_COMMAND}05ff32",
]
CLOSE_DOWN_KEYS = [
f"{REQ_HEADER}{BLIND_COMMAND}010100",
f"{REQ_HEADER}{BLIND_COMMAND}05ff00",
f"{REQ_HEADER}{COVER_COMMAND}010100",
f"{REQ_HEADER}{COVER_COMMAND}05ff00",
]
CLOSE_UP_KEYS = [
f"{REQ_HEADER}{BLIND_COMMAND}010164",
f"{REQ_HEADER}{BLIND_COMMAND}05ff64",
f"{REQ_HEADER}{COVER_COMMAND}010164",
f"{REQ_HEADER}{COVER_COMMAND}05ff64",
]


class SwitchbotBlindTilt(SwitchbotCurtain, SwitchbotSequenceDevice):
class SwitchbotBlindTilt(SwitchbotBaseCover, SwitchbotSequenceDevice):
"""Representation of a Switchbot Blind Tilt."""

# The position of the blind is saved returned with 0 = closed down, 50 = open and 100 = closed up.
Expand Down Expand Up @@ -114,8 +113,8 @@ async def get_basic_info(self) -> dict[str, Any] | None:
}

async def get_extended_info_summary(self) -> dict[str, Any] | None:
"""Get basic info for all devices in chain."""
_data = await self._send_command(key=CURTAIN_EXT_SUM_KEY)
"""Get extended info for all devices in chain."""
_data = await self._send_command(key=COVER_EXT_SUM_KEY)

if not _data:
_LOGGER.error("%s: Unsuccessful, no result from device", self.name)
Expand Down
89 changes: 16 additions & 73 deletions switchbot/devices/curtain.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,38 +6,35 @@

from switchbot.models import SwitchBotAdvertisement

from .device import REQ_HEADER, SwitchbotDevice, update_after_operation
from .base_cover import SwitchbotBaseCover, COVER_COMMAND, COVER_EXT_SUM_KEY
from .device import REQ_HEADER, update_after_operation

# Curtain keys
CURTAIN_COMMAND = "4501"

# For second element of open and close arrs we should add two bytes i.e. ff00
# First byte [ff] stands for speed (00 or ff - normal, 01 - slow) *
# * Only for curtains 3. For other models use ff
# Second byte [00] is a command (00 - open, 64 - close)
OPEN_KEYS = [
f"{REQ_HEADER}{CURTAIN_COMMAND}010100",
f"{REQ_HEADER}{CURTAIN_COMMAND}05", # +speed + "00"
f"{REQ_HEADER}{COVER_COMMAND}010100",
f"{REQ_HEADER}{COVER_COMMAND}05", # +speed + "00"
]
CLOSE_KEYS = [
f"{REQ_HEADER}{CURTAIN_COMMAND}010164",
f"{REQ_HEADER}{CURTAIN_COMMAND}05", # +speed + "64"
f"{REQ_HEADER}{COVER_COMMAND}010164",
f"{REQ_HEADER}{COVER_COMMAND}05", # +speed + "64"
]
POSITION_KEYS = [
f"{REQ_HEADER}{CURTAIN_COMMAND}0101",
f"{REQ_HEADER}{CURTAIN_COMMAND}05", # +speed
f"{REQ_HEADER}{COVER_COMMAND}0101",
f"{REQ_HEADER}{COVER_COMMAND}05", # +speed
] # +actual_position
STOP_KEYS = [f"{REQ_HEADER}{CURTAIN_COMMAND}0001", f"{REQ_HEADER}{CURTAIN_COMMAND}00ff"]
STOP_KEYS = [f"{REQ_HEADER}{COVER_COMMAND}0001", f"{REQ_HEADER}{COVER_COMMAND}00ff"]

CURTAIN_EXT_SUM_KEY = f"{REQ_HEADER}460401"
CURTAIN_EXT_ADV_KEY = f"{REQ_HEADER}460402"
CURTAIN_EXT_CHAIN_INFO_KEY = f"{REQ_HEADER}468101"


_LOGGER = logging.getLogger(__name__)


class SwitchbotCurtain(SwitchbotDevice):
class SwitchbotCurtain(SwitchbotBaseCover):
"""Representation of a Switchbot Curtain."""

def __init__(self, *args: Any, **kwargs: Any) -> None:
Expand Down Expand Up @@ -69,18 +66,6 @@ def _set_parsed_data(
self._update_motion_direction(in_motion, previous_position, new_position)
super()._set_parsed_data(advertisement, data)

async def _send_multiple_commands(self, keys: list[str]) -> bool:
"""Send multiple commands to device.

Since we current have no way to tell which command the device
needs we send both.
"""
final_result = False
for key in keys:
result = await self._send_command(key)
final_result |= self._check_command_result(result, 0, {1})
return final_result

@update_after_operation
async def open(self, speed: int = 255) -> bool:
"""Send open command. Speed 255 - normal, 1 - slow"""
Expand All @@ -103,19 +88,14 @@ async def close(self, speed: int = 255) -> bool:
async def stop(self) -> bool:
"""Send stop command to device."""
self._is_opening = self._is_closing = False
return await self._send_multiple_commands(STOP_KEYS)
return await super().stop()

@update_after_operation
async def set_position(self, position: int, speed: int = 255) -> bool:
"""Send position command (0-100) to device. Speed 255 - normal, 1 - slow"""
position = (100 - position) if self._reverse else position
self._update_motion_direction(True, self._get_adv_value("position"), position)
return await self._send_multiple_commands(
[
f"{POSITION_KEYS[0]}{position:02X}",
f"{POSITION_KEYS[1]}{speed:02X}{position:02X}",
]
)
direction_adjusted_position = (100 - position) if self._reverse else position
self._update_motion_direction(True, self._get_adv_value("position"), direction_adjusted_position)
return await super().set_position(position, speed)

def get_position(self) -> Any:
"""Return cached position (0-100) of Curtain."""
Expand Down Expand Up @@ -168,8 +148,8 @@ def _update_motion_direction(
self._is_closing = new_position < previous_position

async def get_extended_info_summary(self) -> dict[str, Any] | None:
dcmeglio marked this conversation as resolved.
Show resolved Hide resolved
"""Get basic info for all devices in chain."""
_data = await self._send_command(key=CURTAIN_EXT_SUM_KEY)
"""Get extended info for all devices in chain."""
_data = await self._send_command(key=COVER_EXT_SUM_KEY)

if not _data:
_LOGGER.error("%s: Unsuccessful, no result from device", self.name)
Expand Down Expand Up @@ -201,43 +181,6 @@ async def get_extended_info_summary(self) -> dict[str, Any] | None:

return self.ext_info_sum

async def get_extended_info_adv(self) -> dict[str, Any] | None:
"""Get advance page info for device chain."""

_data = await self._send_command(key=CURTAIN_EXT_ADV_KEY)
if not _data:
_LOGGER.error("%s: Unsuccessful, no result from device", self.name)
return None

if _data in (b"\x07", b"\x00"):
_LOGGER.error("%s: Unsuccessful, please try again", self.name)
return None

_state_of_charge = [
"not_charging",
"charging_by_adapter",
"charging_by_solar",
"fully_charged",
"solar_not_charging",
"charging_error",
]

self.ext_info_adv["device0"] = {
"battery": _data[1],
"firmware": _data[2] / 10.0,
"stateOfCharge": _state_of_charge[_data[3]],
}

# If grouped curtain device present.
if _data[4]:
self.ext_info_adv["device1"] = {
"battery": _data[4],
"firmware": _data[5] / 10.0,
"stateOfCharge": _state_of_charge[_data[6]],
}

return self.ext_info_adv

def get_light_level(self) -> Any:
"""Return cached light level."""
# To get actual light level call update() first.
Expand Down
Loading
Loading