diff --git a/switchbot/const.py b/switchbot/const.py index 611ee0b..822a864 100644 --- a/switchbot/const.py +++ b/switchbot/const.py @@ -3,12 +3,12 @@ from enum import Enum +from .enum import StrEnum + DEFAULT_RETRY_COUNT = 3 DEFAULT_RETRY_TIMEOUT = 1 DEFAULT_SCAN_TIMEOUT = 5 -from .enum import StrEnum - class SwitchbotAuthenticationError(RuntimeError): """Raised when authentication fails. diff --git a/switchbot/devices/base_cover.py b/switchbot/devices/base_cover.py new file mode 100644 index 0000000..8394c29 --- /dev/null +++ b/switchbot/devices/base_cover.py @@ -0,0 +1,142 @@ +"""Library to handle connection with Switchbot.""" +from __future__ import annotations + +import logging +from abc import abstractmethod +from typing import Any + +from ..models import SwitchBotAdvertisement +from .device import REQ_HEADER, SwitchbotDevice, update_after_operation + +# Cover keys +COVER_COMMAND = "4501" + +# For second element of open and close arrs we should add two bytes i.e. ff00 +# First byte [ff] stands for speed (00 or ff - normal, 01 - slow) * +# * Only for curtains 3. For other models use ff +# Second byte [00] is a command (00 - open, 64 - close) +POSITION_KEYS = [ + f"{REQ_HEADER}{COVER_COMMAND}0101", + f"{REQ_HEADER}{COVER_COMMAND}05", # +speed +] # +actual_position +STOP_KEYS = [f"{REQ_HEADER}{COVER_COMMAND}0001", f"{REQ_HEADER}{COVER_COMMAND}00ff"] + +COVER_EXT_SUM_KEY = f"{REQ_HEADER}460401" +COVER_EXT_ADV_KEY = f"{REQ_HEADER}460402" + + +_LOGGER = logging.getLogger(__name__) + + +class SwitchbotBaseCover(SwitchbotDevice): + """Representation of a Switchbot Cover devices for both curtains and tilt blinds.""" + + def __init__(self, reverse: bool, *args: Any, **kwargs: Any) -> None: + """Switchbot Cover device constructor.""" + + super().__init__(*args, **kwargs) + self._reverse = reverse + self._settings: dict[str, Any] = {} + self.ext_info_sum: dict[str, Any] = {} + self.ext_info_adv: dict[str, Any] = {} + self._is_opening: bool = False + self._is_closing: bool = False + + async def _send_multiple_commands(self, keys: list[str]) -> bool: + """Send multiple commands to device. + + Since we current have no way to tell which command the device + needs we send both. + """ + final_result = False + for key in keys: + result = await self._send_command(key) + final_result |= self._check_command_result(result, 0, {1}) + return final_result + + @update_after_operation + async def stop(self) -> bool: + """Send stop command to device.""" + return await self._send_multiple_commands(STOP_KEYS) + + @update_after_operation + async def set_position(self, position: int, speed: int = 255) -> bool: + """Send position command (0-100) to device. Speed 255 - normal, 1 - slow""" + position = (100 - position) if self._reverse else position + return await self._send_multiple_commands( + [ + f"{POSITION_KEYS[0]}{position:02X}", + f"{POSITION_KEYS[1]}{speed:02X}{position:02X}", + ] + ) + + @abstractmethod + def get_position(self) -> Any: + """Return current device position.""" + + @abstractmethod + async def get_basic_info(self) -> dict[str, Any] | None: + """Get device basic settings.""" + + @abstractmethod + async def get_extended_info_summary(self) -> dict[str, Any] | None: + """Get extended info for all devices in chain.""" + + async def get_extended_info_adv(self) -> dict[str, Any] | None: + """Get advance page info for device chain.""" + + _data = await self._send_command(key=COVER_EXT_ADV_KEY) + if not _data: + _LOGGER.error("%s: Unsuccessful, no result from device", self.name) + return None + + if _data in (b"\x07", b"\x00"): + _LOGGER.error("%s: Unsuccessful, please try again", self.name) + return None + + _state_of_charge = [ + "not_charging", + "charging_by_adapter", + "charging_by_solar", + "fully_charged", + "solar_not_charging", + "charging_error", + ] + + self.ext_info_adv["device0"] = { + "battery": _data[1], + "firmware": _data[2] / 10.0, + "stateOfCharge": _state_of_charge[_data[3]], + } + + # If grouped curtain device present. + if _data[4]: + self.ext_info_adv["device1"] = { + "battery": _data[4], + "firmware": _data[5] / 10.0, + "stateOfCharge": _state_of_charge[_data[6]], + } + + return self.ext_info_adv + + def get_light_level(self) -> Any: + """Return cached light level.""" + # To get actual light level call update() first. + return self._get_adv_value("lightLevel") + + def is_reversed(self) -> bool: + """Return True if curtain position is opposite from SB data.""" + return self._reverse + + def is_calibrated(self) -> Any: + """Return True curtain is calibrated.""" + # To get actual light level call update() first. + return self._get_adv_value("calibration") + + def is_opening(self) -> bool: + """Return True if the curtain is opening.""" + return self._is_opening + + def is_closing(self) -> bool: + """Return True if the curtain is closing.""" + return self._is_closing diff --git a/switchbot/devices/base_light.py b/switchbot/devices/base_light.py index 432c347..bfdedf6 100644 --- a/switchbot/devices/base_light.py +++ b/switchbot/devices/base_light.py @@ -1,16 +1,15 @@ from __future__ import annotations +import asyncio import logging +import time from abc import abstractmethod from typing import Any +from ..models import SwitchBotAdvertisement from .device import ColorMode, SwitchbotDevice _LOGGER = logging.getLogger(__name__) -import asyncio -import time - -from ..models import SwitchBotAdvertisement class SwitchbotBaseLight(SwitchbotDevice): diff --git a/switchbot/devices/blind_tilt.py b/switchbot/devices/blind_tilt.py index ae181c3..ab69e0d 100644 --- a/switchbot/devices/blind_tilt.py +++ b/switchbot/devices/blind_tilt.py @@ -10,27 +10,27 @@ update_after_operation, ) -from .curtain import CURTAIN_EXT_SUM_KEY, SwitchbotCurtain +from ..models import SwitchBotAdvertisement +from .base_cover import COVER_COMMAND, COVER_EXT_SUM_KEY, SwitchbotBaseCover _LOGGER = logging.getLogger(__name__) -BLIND_COMMAND = "4501" OPEN_KEYS = [ - f"{REQ_HEADER}{BLIND_COMMAND}010132", - f"{REQ_HEADER}{BLIND_COMMAND}05ff32", + f"{REQ_HEADER}{COVER_COMMAND}010132", + f"{REQ_HEADER}{COVER_COMMAND}05ff32", ] CLOSE_DOWN_KEYS = [ - f"{REQ_HEADER}{BLIND_COMMAND}010100", - f"{REQ_HEADER}{BLIND_COMMAND}05ff00", + f"{REQ_HEADER}{COVER_COMMAND}010100", + f"{REQ_HEADER}{COVER_COMMAND}05ff00", ] CLOSE_UP_KEYS = [ - f"{REQ_HEADER}{BLIND_COMMAND}010164", - f"{REQ_HEADER}{BLIND_COMMAND}05ff64", + f"{REQ_HEADER}{COVER_COMMAND}010164", + f"{REQ_HEADER}{COVER_COMMAND}05ff64", ] -class SwitchbotBlindTilt(SwitchbotCurtain, SwitchbotSequenceDevice): +class SwitchbotBlindTilt(SwitchbotBaseCover, SwitchbotSequenceDevice): """Representation of a Switchbot Blind Tilt.""" # The position of the blind is saved returned with 0 = closed down, 50 = open and 100 = closed up. @@ -43,23 +43,52 @@ class SwitchbotBlindTilt(SwitchbotCurtain, SwitchbotSequenceDevice): def __init__(self, *args: Any, **kwargs: Any) -> None: """Switchbot Blind Tilt/woBlindTilt constructor.""" - super().__init__(*args, **kwargs) - self._reverse: bool = kwargs.pop("reverse_mode", False) + super().__init__(self._reverse, *args, **kwargs) + + def _set_parsed_data( + self, advertisement: SwitchBotAdvertisement, data: dict[str, Any] + ) -> None: + """Set data.""" + in_motion = data["inMotion"] + previous_tilt = self._get_adv_value("tilt") + new_tilt = data["tilt"] + self._update_motion_direction(in_motion, previous_tilt, new_tilt) + super()._set_parsed_data(advertisement, data) + + def _update_motion_direction( + self, in_motion: bool, previous_tilt: int | None, new_tilt: int + ) -> None: + """Update opening/closing status based on movement.""" + if previous_tilt is None: + return + if in_motion is False: + self._is_closing = self._is_opening = False + return + + if new_tilt != previous_tilt: + self._is_opening = new_tilt > previous_tilt + self._is_closing = new_tilt < previous_tilt @update_after_operation async def open(self) -> bool: """Send open command.""" + self._is_opening = True + self._is_closing = False return await self._send_multiple_commands(OPEN_KEYS) @update_after_operation async def close_up(self) -> bool: """Send close up command.""" + self._is_opening = False + self._is_closing = True return await self._send_multiple_commands(CLOSE_UP_KEYS) @update_after_operation async def close_down(self) -> bool: """Send close down command.""" + self._is_opening = False + self._is_closing = True return await self._send_multiple_commands(CLOSE_DOWN_KEYS) # The aim of this is to close to the nearest endpoint. @@ -114,8 +143,8 @@ async def get_basic_info(self) -> dict[str, Any] | None: } async def get_extended_info_summary(self) -> dict[str, Any] | None: - """Get basic info for all devices in chain.""" - _data = await self._send_command(key=CURTAIN_EXT_SUM_KEY) + """Get extended info for all devices in chain.""" + _data = await self._send_command(key=COVER_EXT_SUM_KEY) if not _data: _LOGGER.error("%s: Unsuccessful, no result from device", self.name) diff --git a/switchbot/devices/curtain.py b/switchbot/devices/curtain.py index 2bfcc66..52ba7bf 100644 --- a/switchbot/devices/curtain.py +++ b/switchbot/devices/curtain.py @@ -4,40 +4,35 @@ import logging from typing import Any -from switchbot.models import SwitchBotAdvertisement - -from .device import REQ_HEADER, SwitchbotDevice, update_after_operation - -# Curtain keys -CURTAIN_COMMAND = "4501" +from ..models import SwitchBotAdvertisement +from .base_cover import COVER_COMMAND, COVER_EXT_SUM_KEY, SwitchbotBaseCover +from .device import REQ_HEADER, update_after_operation # For second element of open and close arrs we should add two bytes i.e. ff00 # First byte [ff] stands for speed (00 or ff - normal, 01 - slow) * # * Only for curtains 3. For other models use ff # Second byte [00] is a command (00 - open, 64 - close) OPEN_KEYS = [ - f"{REQ_HEADER}{CURTAIN_COMMAND}010100", - f"{REQ_HEADER}{CURTAIN_COMMAND}05", # +speed + "00" + f"{REQ_HEADER}{COVER_COMMAND}010100", + f"{REQ_HEADER}{COVER_COMMAND}05", # +speed + "00" ] CLOSE_KEYS = [ - f"{REQ_HEADER}{CURTAIN_COMMAND}010164", - f"{REQ_HEADER}{CURTAIN_COMMAND}05", # +speed + "64" + f"{REQ_HEADER}{COVER_COMMAND}010164", + f"{REQ_HEADER}{COVER_COMMAND}05", # +speed + "64" ] POSITION_KEYS = [ - f"{REQ_HEADER}{CURTAIN_COMMAND}0101", - f"{REQ_HEADER}{CURTAIN_COMMAND}05", # +speed + f"{REQ_HEADER}{COVER_COMMAND}0101", + f"{REQ_HEADER}{COVER_COMMAND}05", # +speed ] # +actual_position -STOP_KEYS = [f"{REQ_HEADER}{CURTAIN_COMMAND}0001", f"{REQ_HEADER}{CURTAIN_COMMAND}00ff"] +STOP_KEYS = [f"{REQ_HEADER}{COVER_COMMAND}0001", f"{REQ_HEADER}{COVER_COMMAND}00ff"] -CURTAIN_EXT_SUM_KEY = f"{REQ_HEADER}460401" -CURTAIN_EXT_ADV_KEY = f"{REQ_HEADER}460402" CURTAIN_EXT_CHAIN_INFO_KEY = f"{REQ_HEADER}468101" _LOGGER = logging.getLogger(__name__) -class SwitchbotCurtain(SwitchbotDevice): +class SwitchbotCurtain(SwitchbotBaseCover): """Representation of a Switchbot Curtain.""" def __init__(self, *args: Any, **kwargs: Any) -> None: @@ -51,13 +46,11 @@ def __init__(self, *args: Any, **kwargs: Any) -> None: # and position = 100 equals open. The parameter is default set to True so that # the definition of position is the same as in Home Assistant. - super().__init__(*args, **kwargs) self._reverse: bool = kwargs.pop("reverse_mode", True) + super().__init__(self._reverse, *args, **kwargs) self._settings: dict[str, Any] = {} self.ext_info_sum: dict[str, Any] = {} self.ext_info_adv: dict[str, Any] = {} - self._is_opening: bool = False - self._is_closing: bool = False def _set_parsed_data( self, advertisement: SwitchBotAdvertisement, data: dict[str, Any] @@ -69,18 +62,6 @@ def _set_parsed_data( self._update_motion_direction(in_motion, previous_position, new_position) super()._set_parsed_data(advertisement, data) - async def _send_multiple_commands(self, keys: list[str]) -> bool: - """Send multiple commands to device. - - Since we current have no way to tell which command the device - needs we send both. - """ - final_result = False - for key in keys: - result = await self._send_command(key) - final_result |= self._check_command_result(result, 0, {1}) - return final_result - @update_after_operation async def open(self, speed: int = 255) -> bool: """Send open command. Speed 255 - normal, 1 - slow""" @@ -103,19 +84,16 @@ async def close(self, speed: int = 255) -> bool: async def stop(self) -> bool: """Send stop command to device.""" self._is_opening = self._is_closing = False - return await self._send_multiple_commands(STOP_KEYS) + return await super().stop() @update_after_operation async def set_position(self, position: int, speed: int = 255) -> bool: """Send position command (0-100) to device. Speed 255 - normal, 1 - slow""" - position = (100 - position) if self._reverse else position - self._update_motion_direction(True, self._get_adv_value("position"), position) - return await self._send_multiple_commands( - [ - f"{POSITION_KEYS[0]}{position:02X}", - f"{POSITION_KEYS[1]}{speed:02X}{position:02X}", - ] + direction_adjusted_position = (100 - position) if self._reverse else position + self._update_motion_direction( + True, self._get_adv_value("position"), direction_adjusted_position ) + return await super().set_position(position, speed) def get_position(self) -> Any: """Return cached position (0-100) of Curtain.""" @@ -168,8 +146,8 @@ def _update_motion_direction( self._is_closing = new_position < previous_position async def get_extended_info_summary(self) -> dict[str, Any] | None: - """Get basic info for all devices in chain.""" - _data = await self._send_command(key=CURTAIN_EXT_SUM_KEY) + """Get extended info for all devices in chain.""" + _data = await self._send_command(key=COVER_EXT_SUM_KEY) if not _data: _LOGGER.error("%s: Unsuccessful, no result from device", self.name) @@ -184,78 +162,19 @@ async def get_extended_info_summary(self) -> dict[str, Any] | None: "touchToOpen": bool(_data[1] & 0b01000000), "light": bool(_data[1] & 0b00100000), "openDirection": ( - "left_to_right" if _data[1] & 0b00010000 == 1 else "right_to_left" + "left_to_right" if _data[1] & 0b00010000 else "right_to_left" ), } # if grouped curtain device present. if _data[2] != 0: self.ext_info_sum["device1"] = { - "openDirectionDefault": not bool(_data[1] & 0b10000000), - "touchToOpen": bool(_data[1] & 0b01000000), - "light": bool(_data[1] & 0b00100000), + "openDirectionDefault": not bool(_data[2] & 0b10000000), + "touchToOpen": bool(_data[2] & 0b01000000), + "light": bool(_data[2] & 0b00100000), "openDirection": ( - "left_to_right" if _data[1] & 0b00010000 else "right_to_left" + "left_to_right" if _data[2] & 0b00010000 else "right_to_left" ), } return self.ext_info_sum - - async def get_extended_info_adv(self) -> dict[str, Any] | None: - """Get advance page info for device chain.""" - - _data = await self._send_command(key=CURTAIN_EXT_ADV_KEY) - if not _data: - _LOGGER.error("%s: Unsuccessful, no result from device", self.name) - return None - - if _data in (b"\x07", b"\x00"): - _LOGGER.error("%s: Unsuccessful, please try again", self.name) - return None - - _state_of_charge = [ - "not_charging", - "charging_by_adapter", - "charging_by_solar", - "fully_charged", - "solar_not_charging", - "charging_error", - ] - - self.ext_info_adv["device0"] = { - "battery": _data[1], - "firmware": _data[2] / 10.0, - "stateOfCharge": _state_of_charge[_data[3]], - } - - # If grouped curtain device present. - if _data[4]: - self.ext_info_adv["device1"] = { - "battery": _data[4], - "firmware": _data[5] / 10.0, - "stateOfCharge": _state_of_charge[_data[6]], - } - - return self.ext_info_adv - - def get_light_level(self) -> Any: - """Return cached light level.""" - # To get actual light level call update() first. - return self._get_adv_value("lightLevel") - - def is_reversed(self) -> bool: - """Return True if curtain position is opposite from SB data.""" - return self._reverse - - def is_calibrated(self) -> Any: - """Return True curtain is calibrated.""" - # To get actual light level call update() first. - return self._get_adv_value("calibration") - - def is_opening(self) -> bool: - """Return True if the curtain is opening.""" - return self._is_opening - - def is_closing(self) -> bool: - """Return True if the curtain is closing.""" - return self._is_closing diff --git a/tests/test_adv_parser.py b/tests/test_adv_parser.py index c6fdea4..88ec15e 100644 --- a/tests/test_adv_parser.py +++ b/tests/test_adv_parser.py @@ -963,6 +963,7 @@ def test_wosensor_active_zero_data(): active=True, ) + def test_woiosensor_passive_and_active(): """Test parsing woiosensor as passive with active data as well.""" ble_device = generate_ble_device("aa:bb:cc:dd:ee:ff", "any") diff --git a/tests/test_base_cover.py b/tests/test_base_cover.py new file mode 100644 index 0000000..a8247fc --- /dev/null +++ b/tests/test_base_cover.py @@ -0,0 +1,152 @@ +from unittest.mock import AsyncMock, Mock + +import pytest +from bleak.backends.device import BLEDevice + +from switchbot import SwitchBotAdvertisement, SwitchbotModel +from switchbot.devices import base_cover, blind_tilt + +from .test_adv_parser import generate_ble_device + + +def create_device_for_command_testing(position=50, calibration=True): + ble_device = generate_ble_device("aa:bb:cc:dd:ee:ff", "any") + base_cover_device = base_cover.SwitchbotBaseCover(False, ble_device) + base_cover_device.update_from_advertisement( + make_advertisement_data(ble_device, True, position, calibration) + ) + base_cover_device._send_multiple_commands = AsyncMock() + base_cover_device.update = AsyncMock() + return base_cover_device + + +def make_advertisement_data( + ble_device: BLEDevice, in_motion: bool, position: int, calibration: bool = True +): + """Set advertisement data with defaults.""" + + return SwitchBotAdvertisement( + address="aa:bb:cc:dd:ee:ff", + data={ + "rawAdvData": b"c\xc0X\x00\x11\x04", + "data": { + "calibration": calibration, + "battery": 88, + "inMotion": in_motion, + "tilt": position, + "lightLevel": 1, + "deviceChain": 1, + }, + "isEncrypted": False, + "model": "c", + "modelFriendlyName": "Curtain", + "modelName": SwitchbotModel.CURTAIN, + }, + device=ble_device, + rssi=-80, + active=True, + ) + + +@pytest.mark.asyncio +async def test_send_multiple_commands(): + ble_device = generate_ble_device("aa:bb:cc:dd:ee:ff", "any") + base_cover_device = base_cover.SwitchbotBaseCover(False, ble_device) + base_cover_device.update_from_advertisement( + make_advertisement_data(ble_device, True, 50, True) + ) + base_cover_device._send_command = AsyncMock() + base_cover_device._check_command_result = Mock(return_value=True) + await base_cover_device._send_multiple_commands(blind_tilt.OPEN_KEYS) + assert base_cover_device._send_command.await_count == 2 + + +@pytest.mark.asyncio +async def test_stop(): + base_cover_device = create_device_for_command_testing() + await base_cover_device.stop() + base_cover_device._send_multiple_commands.assert_awaited_once_with( + base_cover.STOP_KEYS + ) + + +@pytest.mark.asyncio +async def test_set_position(): + base_cover_device = create_device_for_command_testing() + await base_cover_device.set_position(50) + base_cover_device._send_multiple_commands.assert_awaited_once() + + +@pytest.mark.asyncio +@pytest.mark.parametrize("data_value", [(None), (b"\x07"), (b"\x00")]) +async def test_get_extended_info_adv_returns_none_when_bad_data(data_value): + base_cover_device = create_device_for_command_testing() + base_cover_device._send_command = AsyncMock(return_value=data_value) + assert await base_cover_device.get_extended_info_adv() is None + + +@pytest.mark.asyncio +async def test_get_extended_info_adv_returns_single_device(): + base_cover_device = create_device_for_command_testing() + base_cover_device._send_command = AsyncMock( + return_value=bytes([0, 50, 20, 0, 0, 0, 0]) + ) + ext_result = await base_cover_device.get_extended_info_adv() + assert ext_result["device0"]["battery"] == 50 + assert ext_result["device0"]["firmware"] == 2 + assert "device1" not in ext_result + + +@pytest.mark.asyncio +async def test_get_extended_info_adv_returns_both_devices(): + base_cover_device = create_device_for_command_testing() + base_cover_device._send_command = AsyncMock( + return_value=bytes([0, 50, 20, 0, 10, 30, 0]) + ) + ext_result = await base_cover_device.get_extended_info_adv() + assert ext_result["device0"]["battery"] == 50 + assert ext_result["device0"]["firmware"] == 2 + assert ext_result["device1"]["battery"] == 10 + assert ext_result["device1"]["firmware"] == 3 + + +@pytest.mark.asyncio +@pytest.mark.parametrize( + "data_value,result", + [ + (0, "not_charging"), + (1, "charging_by_adapter"), + (2, "charging_by_solar"), + (3, "fully_charged"), + (4, "solar_not_charging"), + (5, "charging_error"), + ], +) +async def test_get_extended_info_adv_returns_device0_charge_states(data_value, result): + base_cover_device = create_device_for_command_testing() + base_cover_device._send_command = AsyncMock( + return_value=bytes([0, 50, 20, data_value, 10, 30, 0]) + ) + ext_result = await base_cover_device.get_extended_info_adv() + assert ext_result["device0"]["stateOfCharge"] == result + + +@pytest.mark.asyncio +@pytest.mark.parametrize( + "data_value,result", + [ + (0, "not_charging"), + (1, "charging_by_adapter"), + (2, "charging_by_solar"), + (3, "fully_charged"), + (4, "solar_not_charging"), + (5, "charging_error"), + ], +) +async def test_get_extended_info_adv_returns_device1_charge_states(data_value, result): + base_cover_device = create_device_for_command_testing() + base_cover_device._send_command = AsyncMock( + return_value=bytes([0, 50, 20, 0, 10, 30, data_value]) + ) + ext_result = await base_cover_device.get_extended_info_adv() + assert ext_result["device1"]["stateOfCharge"] == result diff --git a/tests/test_blind_tilt.py b/tests/test_blind_tilt.py new file mode 100644 index 0000000..e86f190 --- /dev/null +++ b/tests/test_blind_tilt.py @@ -0,0 +1,240 @@ +from unittest.mock import AsyncMock + +import pytest +from bleak.backends.device import BLEDevice + +from switchbot import SwitchBotAdvertisement, SwitchbotModel +from switchbot.devices import blind_tilt +from switchbot.devices.base_cover import COVER_EXT_SUM_KEY + +from .test_adv_parser import generate_ble_device + + +def create_device_for_command_testing( + position=50, calibration=True, reverse_mode=False +): + ble_device = generate_ble_device("aa:bb:cc:dd:ee:ff", "any") + curtain_device = blind_tilt.SwitchbotBlindTilt( + ble_device, reverse_mode=reverse_mode + ) + curtain_device.update_from_advertisement( + make_advertisement_data(ble_device, True, position, calibration) + ) + curtain_device._send_multiple_commands = AsyncMock() + curtain_device.update = AsyncMock() + return curtain_device + + +def make_advertisement_data( + ble_device: BLEDevice, in_motion: bool, position: int, calibration: bool = True +): + """Set advertisement data with defaults.""" + + return SwitchBotAdvertisement( + address="aa:bb:cc:dd:ee:ff", + data={ + "rawAdvData": b"c\xc0X\x00\x11\x04", + "data": { + "calibration": calibration, + "battery": 88, + "inMotion": in_motion, + "tilt": position, + "lightLevel": 1, + "deviceChain": 1, + }, + "isEncrypted": False, + "model": "c", + "modelFriendlyName": "Curtain", + "modelName": SwitchbotModel.CURTAIN, + }, + device=ble_device, + rssi=-80, + active=True, + ) + + +@pytest.mark.asyncio +async def test_open(): + blind_device = create_device_for_command_testing() + await blind_device.open() + blind_device._send_multiple_commands.assert_awaited_once_with(blind_tilt.OPEN_KEYS) + + +@pytest.mark.asyncio +@pytest.mark.parametrize( + "position,keys", [(5, blind_tilt.CLOSE_DOWN_KEYS), (55, blind_tilt.CLOSE_UP_KEYS)] +) +async def test_close(position, keys): + blind_device = create_device_for_command_testing(position=position) + await blind_device.close() + blind_device._send_multiple_commands.assert_awaited_once_with(keys) + + +@pytest.mark.asyncio +async def test_get_basic_info_returns_none_when_no_data(): + blind_device = create_device_for_command_testing() + blind_device._get_basic_info = AsyncMock(return_value=None) + + assert await blind_device.get_basic_info() is None + + +@pytest.mark.asyncio +@pytest.mark.parametrize( + "reverse_mode,data,result", + [ + ( + False, + bytes([0, 1, 10, 2, 255, 255, 50, 4]), + [1, 1, 1, 1, 1, True, False, False, True, 50, 4], + ), + ( + False, + bytes([0, 1, 10, 2, 0, 0, 50, 4]), + [1, 1, 0, 0, 0, False, False, False, False, 50, 4], + ), + ( + False, + bytes([0, 1, 10, 2, 0, 1, 50, 4]), + [1, 1, 0, 0, 1, False, True, False, True, 50, 4], + ), + ( + True, + bytes([0, 1, 10, 2, 255, 255, 50, 4]), + [1, 1, 1, 1, 1, True, False, True, False, 50, 4], + ), + ( + True, + bytes([0, 1, 10, 2, 0, 0, 50, 4]), + [1, 1, 0, 0, 0, False, False, False, False, 50, 4], + ), + ( + True, + bytes([0, 1, 10, 2, 0, 1, 50, 4]), + [1, 1, 0, 0, 1, False, True, False, True, 50, 4], + ), + ], +) +async def test_get_basic_info(reverse_mode, data, result): + blind_device = create_device_for_command_testing(reverse_mode=reverse_mode) + blind_device._get_basic_info = AsyncMock(return_value=data) + + info = await blind_device.get_basic_info() + assert info["battery"] == result[0] + assert info["firmware"] == result[1] + assert info["light"] == result[2] + assert info["fault"] == result[2] + assert info["solarPanel"] == result[3] + assert info["calibration"] == result[3] + assert info["calibrated"] == result[3] + assert info["inMotion"] == result[4] + assert info["motionDirection"]["opening"] == result[5] + assert info["motionDirection"]["closing"] == result[6] + assert info["motionDirection"]["up"] == result[7] + assert info["motionDirection"]["down"] == result[8] + assert info["tilt"] == result[9] + assert info["timers"] == result[10] + + +@pytest.mark.asyncio +async def test_get_extended_info_summary_sends_command(): + blind_device = create_device_for_command_testing() + blind_device._send_command = AsyncMock() + await blind_device.get_extended_info_summary() + blind_device._send_command.assert_awaited_once_with(key=COVER_EXT_SUM_KEY) + + +@pytest.mark.asyncio +@pytest.mark.parametrize("data_value", [(None), (b"\x07"), (b"\x00")]) +async def test_get_extended_info_summary_returns_none_when_bad_data(data_value): + blind_device = create_device_for_command_testing() + blind_device._send_command = AsyncMock(return_value=data_value) + assert await blind_device.get_extended_info_summary() is None + + +@pytest.mark.asyncio +@pytest.mark.parametrize( + "data,result", [(bytes([0, 0]), False), (bytes([0, 255]), True)] +) +async def test_get_extended_info_summary(data, result): + blind_device = create_device_for_command_testing() + blind_device._send_command = AsyncMock(return_value=data) + ext_result = await blind_device.get_extended_info_summary() + assert ext_result["device0"]["light"] == result + + +@pytest.mark.parametrize("reverse_mode", [(True), (False)]) +def test_device_passive_opening(reverse_mode): + """Test passive opening advertisement.""" + ble_device = generate_ble_device("aa:bb:cc:dd:ee:ff", "any") + curtain_device = blind_tilt.SwitchbotBlindTilt( + ble_device, reverse_mode=reverse_mode + ) + curtain_device.update_from_advertisement( + make_advertisement_data(ble_device, True, 0) + ) + curtain_device.update_from_advertisement( + make_advertisement_data(ble_device, True, 10) + ) + + assert curtain_device.is_opening() is True + assert curtain_device.is_closing() is False + + +@pytest.mark.parametrize("reverse_mode", [(True), (False)]) +def test_device_passive_closing(reverse_mode): + """Test passive closing advertisement.""" + ble_device = generate_ble_device("aa:bb:cc:dd:ee:ff", "any") + curtain_device = blind_tilt.SwitchbotBlindTilt( + ble_device, reverse_mode=reverse_mode + ) + curtain_device.update_from_advertisement( + make_advertisement_data(ble_device, True, 100) + ) + curtain_device.update_from_advertisement( + make_advertisement_data(ble_device, True, 90) + ) + + assert curtain_device.is_opening() is False + assert curtain_device.is_closing() is True + + +@pytest.mark.parametrize("reverse_mode", [(True), (False)]) +def test_device_passive_opening_then_stop(reverse_mode): + """Test passive stopped after opening advertisement.""" + ble_device = generate_ble_device("aa:bb:cc:dd:ee:ff", "any") + curtain_device = blind_tilt.SwitchbotBlindTilt( + ble_device, reverse_mode=reverse_mode + ) + curtain_device.update_from_advertisement( + make_advertisement_data(ble_device, True, 0) + ) + curtain_device.update_from_advertisement( + make_advertisement_data(ble_device, True, 10) + ) + curtain_device.update_from_advertisement( + make_advertisement_data(ble_device, False, 10) + ) + + assert curtain_device.is_opening() is False + assert curtain_device.is_closing() is False + + +@pytest.mark.parametrize("reverse_mode", [(True), (False)]) +def test_device_passive_closing_then_stop(reverse_mode): + """Test passive stopped after closing advertisement.""" + ble_device = generate_ble_device("aa:bb:cc:dd:ee:ff", "any") + curtain_device = blind_tilt.SwitchbotBlindTilt( + ble_device, reverse_mode=reverse_mode + ) + curtain_device.update_from_advertisement( + make_advertisement_data(ble_device, True, 100) + ) + curtain_device.update_from_advertisement( + make_advertisement_data(ble_device, True, 90) + ) + curtain_device.update_from_advertisement( + make_advertisement_data(ble_device, False, 90) + ) + + assert curtain_device.is_opening() is False + assert curtain_device.is_closing() is False diff --git a/tests/test_curtain.py b/tests/test_curtain.py index d1b9961..0b0e82b 100644 --- a/tests/test_curtain.py +++ b/tests/test_curtain.py @@ -1,16 +1,29 @@ -from typing import Any -from unittest.mock import Mock +from unittest.mock import AsyncMock, Mock import pytest from bleak.backends.device import BLEDevice from switchbot import SwitchBotAdvertisement, SwitchbotModel from switchbot.devices import curtain +from switchbot.devices.base_cover import COVER_EXT_SUM_KEY from .test_adv_parser import generate_ble_device -def set_advertisement_data(ble_device: BLEDevice, in_motion: bool, position: int): +def create_device_for_command_testing(calibration=True, reverse_mode=False): + ble_device = generate_ble_device("aa:bb:cc:dd:ee:ff", "any") + curtain_device = curtain.SwitchbotCurtain(ble_device, reverse_mode=reverse_mode) + curtain_device.update_from_advertisement( + make_advertisement_data(ble_device, True, 50, calibration) + ) + curtain_device._send_multiple_commands = AsyncMock() + curtain_device.update = AsyncMock() + return curtain_device + + +def make_advertisement_data( + ble_device: BLEDevice, in_motion: bool, position: int, calibration: bool = True +): """Set advertisement data with defaults.""" return SwitchBotAdvertisement( @@ -18,7 +31,7 @@ def set_advertisement_data(ble_device: BLEDevice, in_motion: bool, position: int data={ "rawAdvData": b"c\xc0X\x00\x11\x04", "data": { - "calibration": True, + "calibration": calibration, "battery": 88, "inMotion": in_motion, "position": position, @@ -42,7 +55,7 @@ def test_device_passive_not_in_motion(reverse_mode): ble_device = generate_ble_device("aa:bb:cc:dd:ee:ff", "any") curtain_device = curtain.SwitchbotCurtain(ble_device, reverse_mode=reverse_mode) curtain_device.update_from_advertisement( - set_advertisement_data(ble_device, False, 0) + make_advertisement_data(ble_device, False, 0) ) assert curtain_device.is_opening() is False @@ -55,10 +68,10 @@ def test_device_passive_opening(reverse_mode): ble_device = generate_ble_device("aa:bb:cc:dd:ee:ff", "any") curtain_device = curtain.SwitchbotCurtain(ble_device, reverse_mode=reverse_mode) curtain_device.update_from_advertisement( - set_advertisement_data(ble_device, True, 0) + make_advertisement_data(ble_device, True, 0) ) curtain_device.update_from_advertisement( - set_advertisement_data(ble_device, True, 10) + make_advertisement_data(ble_device, True, 10) ) assert curtain_device.is_opening() is True @@ -71,10 +84,10 @@ def test_device_passive_closing(reverse_mode): ble_device = generate_ble_device("aa:bb:cc:dd:ee:ff", "any") curtain_device = curtain.SwitchbotCurtain(ble_device, reverse_mode=reverse_mode) curtain_device.update_from_advertisement( - set_advertisement_data(ble_device, True, 100) + make_advertisement_data(ble_device, True, 100) ) curtain_device.update_from_advertisement( - set_advertisement_data(ble_device, True, 90) + make_advertisement_data(ble_device, True, 90) ) assert curtain_device.is_opening() is False @@ -87,13 +100,13 @@ def test_device_passive_opening_then_stop(reverse_mode): ble_device = generate_ble_device("aa:bb:cc:dd:ee:ff", "any") curtain_device = curtain.SwitchbotCurtain(ble_device, reverse_mode=reverse_mode) curtain_device.update_from_advertisement( - set_advertisement_data(ble_device, True, 0) + make_advertisement_data(ble_device, True, 0) ) curtain_device.update_from_advertisement( - set_advertisement_data(ble_device, True, 10) + make_advertisement_data(ble_device, True, 10) ) curtain_device.update_from_advertisement( - set_advertisement_data(ble_device, False, 10) + make_advertisement_data(ble_device, False, 10) ) assert curtain_device.is_opening() is False @@ -106,13 +119,13 @@ def test_device_passive_closing_then_stop(reverse_mode): ble_device = generate_ble_device("aa:bb:cc:dd:ee:ff", "any") curtain_device = curtain.SwitchbotCurtain(ble_device, reverse_mode=reverse_mode) curtain_device.update_from_advertisement( - set_advertisement_data(ble_device, True, 100) + make_advertisement_data(ble_device, True, 100) ) curtain_device.update_from_advertisement( - set_advertisement_data(ble_device, True, 90) + make_advertisement_data(ble_device, True, 90) ) curtain_device.update_from_advertisement( - set_advertisement_data(ble_device, False, 90) + make_advertisement_data(ble_device, False, 90) ) assert curtain_device.is_opening() is False @@ -126,7 +139,7 @@ async def test_device_active_not_in_motion(reverse_mode): ble_device = generate_ble_device("aa:bb:cc:dd:ee:ff", "any") curtain_device = curtain.SwitchbotCurtain(ble_device, reverse_mode=reverse_mode) curtain_device.update_from_advertisement( - set_advertisement_data(ble_device, False, 0) + make_advertisement_data(ble_device, False, 0) ) basic_info = bytes([0, 0, 0, 0, 0, 0, 100, 0]) @@ -149,7 +162,7 @@ async def test_device_active_opening(reverse_mode): ble_device = generate_ble_device("aa:bb:cc:dd:ee:ff", "any") curtain_device = curtain.SwitchbotCurtain(ble_device, reverse_mode=reverse_mode) curtain_device.update_from_advertisement( - set_advertisement_data(ble_device, True, 0) + make_advertisement_data(ble_device, True, 0) ) basic_info = bytes([0, 0, 0, 0, 0, 67, 10, 0]) @@ -172,7 +185,7 @@ async def test_device_active_closing(reverse_mode): ble_device = generate_ble_device("aa:bb:cc:dd:ee:ff", "any") curtain_device = curtain.SwitchbotCurtain(ble_device, reverse_mode=reverse_mode) curtain_device.update_from_advertisement( - set_advertisement_data(ble_device, True, 100) + make_advertisement_data(ble_device, True, 100) ) basic_info = bytes([0, 0, 0, 0, 0, 67, 90, 0]) @@ -195,7 +208,7 @@ async def test_device_active_opening_then_stop(reverse_mode): ble_device = generate_ble_device("aa:bb:cc:dd:ee:ff", "any") curtain_device = curtain.SwitchbotCurtain(ble_device, reverse_mode=reverse_mode) curtain_device.update_from_advertisement( - set_advertisement_data(ble_device, True, 0) + make_advertisement_data(ble_device, True, 0) ) basic_info = bytes([0, 0, 0, 0, 0, 67, 10, 0]) @@ -222,7 +235,7 @@ async def test_device_active_closing_then_stop(reverse_mode): ble_device = generate_ble_device("aa:bb:cc:dd:ee:ff", "any") curtain_device = curtain.SwitchbotCurtain(ble_device, reverse_mode=reverse_mode) curtain_device.update_from_advertisement( - set_advertisement_data(ble_device, True, 100) + make_advertisement_data(ble_device, True, 100) ) basic_info = bytes([0, 0, 0, 0, 0, 67, 90, 0]) @@ -240,3 +253,174 @@ async def custom_implementation(): assert curtain_device.is_opening() is False assert curtain_device.is_closing() is False + + +@pytest.mark.asyncio +async def test_get_basic_info_returns_none_when_no_data(): + ble_device = generate_ble_device("aa:bb:cc:dd:ee:ff", "any") + curtain_device = curtain.SwitchbotCurtain(ble_device) + curtain_device.update_from_advertisement( + make_advertisement_data(ble_device, True, 0) + ) + curtain_device._get_basic_info = AsyncMock(return_value=None) + + assert await curtain_device.get_basic_info() is None + + +@pytest.mark.asyncio +@pytest.mark.parametrize( + "data,result", + [ + ( + bytes([0, 1, 10, 2, 255, 255, 50, 4]), + [1, 1, 2, "right_to_left", 1, 1, 50, 4], + ), + (bytes([0, 1, 10, 2, 0, 0, 50, 4]), [1, 1, 2, "left_to_right", 0, 0, 50, 4]), + ], +) +async def test_get_basic_info(data, result): + ble_device = generate_ble_device("aa:bb:cc:dd:ee:ff", "any") + curtain_device = curtain.SwitchbotCurtain(ble_device) + curtain_device.update_from_advertisement( + make_advertisement_data(ble_device, True, 0) + ) + + async def custom_implementation(): + return data + + curtain_device._get_basic_info = Mock(side_effect=custom_implementation) + + info = await curtain_device.get_basic_info() + assert info["battery"] == result[0] + assert info["firmware"] == result[1] + assert info["chainLength"] == result[2] + assert info["openDirection"] == result[3] + assert info["touchToOpen"] == result[4] + assert info["light"] == result[4] + assert info["fault"] == result[4] + assert info["solarPanel"] == result[5] + assert info["calibration"] == result[5] + assert info["calibrated"] == result[5] + assert info["inMotion"] == result[5] + assert info["position"] == result[6] + assert info["timers"] == result[7] + + +@pytest.mark.asyncio +async def test_open(): + curtain_device = create_device_for_command_testing() + await curtain_device.open() + assert curtain_device.is_opening() is True + assert curtain_device.is_closing() is False + curtain_device._send_multiple_commands.assert_awaited_once() + + +@pytest.mark.asyncio +async def test_close(): + curtain_device = create_device_for_command_testing() + await curtain_device.close() + assert curtain_device.is_opening() is False + assert curtain_device.is_closing() is True + curtain_device._send_multiple_commands.assert_awaited_once() + + +@pytest.mark.asyncio +async def test_stop(): + curtain_device = create_device_for_command_testing() + await curtain_device.stop() + assert curtain_device.is_opening() is False + assert curtain_device.is_closing() is False + curtain_device._send_multiple_commands.assert_awaited_once() + + +@pytest.mark.asyncio +async def test_set_position_opening(): + curtain_device = create_device_for_command_testing() + await curtain_device.set_position(100) + assert curtain_device.is_opening() is True + assert curtain_device.is_closing() is False + curtain_device._send_multiple_commands.assert_awaited_once() + + +@pytest.mark.asyncio +async def test_set_position_closing(): + curtain_device = create_device_for_command_testing() + await curtain_device.set_position(0) + assert curtain_device.is_opening() is False + assert curtain_device.is_closing() is True + curtain_device._send_multiple_commands.assert_awaited_once() + + +def test_get_position(): + curtain_device = create_device_for_command_testing() + assert curtain_device.get_position() == 50 + + +@pytest.mark.asyncio +async def test_get_extended_info_summary_sends_command(): + curtain_device = create_device_for_command_testing() + curtain_device._send_command = AsyncMock() + await curtain_device.get_extended_info_summary() + curtain_device._send_command.assert_awaited_once_with(key=COVER_EXT_SUM_KEY) + + +@pytest.mark.asyncio +@pytest.mark.parametrize("data_value", [(None), (b"\x07"), (b"\x00")]) +async def test_get_extended_info_summary_returns_none_when_bad_data(data_value): + curtain_device = create_device_for_command_testing() + curtain_device._send_command = AsyncMock(return_value=data_value) + assert await curtain_device.get_extended_info_summary() is None + + +@pytest.mark.asyncio +@pytest.mark.parametrize( + "data,result", + [ + ([0, 0, 0], [True, False, False, "right_to_left"]), + ([255, 255, 0], [False, True, True, "left_to_right"]), + ], +) +async def test_get_extended_info_summary_returns_device0(data, result): + curtain_device = create_device_for_command_testing() + curtain_device._send_command = AsyncMock(return_value=bytes(data)) + ext_result = await curtain_device.get_extended_info_summary() + assert ext_result["device0"]["openDirectionDefault"] == result[0] + assert ext_result["device0"]["touchToOpen"] == result[1] + assert ext_result["device0"]["light"] == result[2] + assert ext_result["device0"]["openDirection"] == result[3] + assert "device1" not in ext_result + + +@pytest.mark.asyncio +@pytest.mark.parametrize( + "data,result", + [ + ([0, 0, 1], [True, False, False, "right_to_left"]), + ([255, 255, 255], [False, True, True, "left_to_right"]), + ], +) +async def test_get_extended_info_summary_returns_device1(data, result): + curtain_device = create_device_for_command_testing() + curtain_device._send_command = AsyncMock(return_value=bytes(data)) + ext_result = await curtain_device.get_extended_info_summary() + assert ext_result["device1"]["openDirectionDefault"] == result[0] + assert ext_result["device1"]["touchToOpen"] == result[1] + assert ext_result["device1"]["light"] == result[2] + assert ext_result["device1"]["openDirection"] == result[3] + + +def test_get_light_level(): + curtain_device = create_device_for_command_testing() + assert curtain_device.get_light_level() == 1 + + +@pytest.mark.parametrize("reverse_mode", [(True), (False)]) +def test_is_reversed(reverse_mode): + curtain_device = create_device_for_command_testing(reverse_mode=reverse_mode) + assert curtain_device.is_reversed() == reverse_mode + + +@pytest.mark.parametrize("calibration", [(True), (False)]) +def test_is_calibrated(calibration): + curtain_device = create_device_for_command_testing(calibration=calibration) + assert curtain_device.is_calibrated() == calibration