diff --git a/roborock/containers.py b/roborock/containers.py index 6433723d..ff6cb126 100644 --- a/roborock/containers.py +++ b/roborock/containers.py @@ -10,6 +10,15 @@ from functools import cached_property from typing import Any, NamedTuple, get_args, get_origin +from .clean_modes import ( + CleanRoutes, + VacuumModes, + VacuumModesOld, + WaterModes, + get_clean_modes, + get_clean_routes, + get_water_modes, +) from .code_mappings import ( SHORT_MODEL_TO_ENUM, RoborockCategory, @@ -19,7 +28,6 @@ RoborockDockTypeCode, RoborockDockWashTowelModeCode, RoborockErrorCode, - RoborockFanPowerCode, RoborockFanSpeedP10, RoborockFanSpeedQ7Max, RoborockFanSpeedQRevoCurv, @@ -34,7 +42,6 @@ RoborockFinishReason, RoborockInCleaning, RoborockModeEnum, - RoborockMopIntensityCode, RoborockMopIntensityP10, RoborockMopIntensityQ7Max, RoborockMopIntensityQRevoCurv, @@ -46,7 +53,6 @@ RoborockMopIntensityS8MaxVUltra, RoborockMopIntensitySaros10, RoborockMopIntensitySaros10R, - RoborockMopModeCode, RoborockMopModeQRevoCurv, RoborockMopModeQRevoMaster, RoborockMopModeQRevoMaxV, @@ -92,7 +98,6 @@ ROBOROCK_G20S_Ultra, ) from .device_features import DeviceFeatures -from .exceptions import RoborockException _LOGGER = logging.getLogger(__name__) @@ -357,12 +362,12 @@ class Status(RoborockBase): back_type: int | None = None wash_phase: int | None = None wash_ready: int | None = None - fan_power: RoborockFanPowerCode | None = None + fan_power: int | None = None dnd_enabled: int | None = None map_status: int | None = None is_locating: int | None = None lock_status: int | None = None - water_box_mode: RoborockMopIntensityCode | None = None + water_box_mode: int | None = None water_box_carriage_status: int | None = None mop_forbidden_enable: int | None = None camera_status: int | None = None @@ -375,7 +380,7 @@ class Status(RoborockBase): dust_collection_status: int | None = None auto_dust_collection: int | None = None avoid_count: int | None = None - mop_mode: RoborockMopModeCode | None = None + mop_mode: int | None = None debug_mode: int | None = None collision_avoid_status: int | None = None switch_map_mode: int | None = None @@ -406,28 +411,6 @@ def __post_init__(self) -> None: self.error_code_name = self.error_code.name if self.state is not None: self.state_name = self.state.name - if self.water_box_mode is not None: - self.water_box_mode_name = self.water_box_mode.name - if self.fan_power is not None: - self.fan_power_options = self.fan_power.keys() - self.fan_power_name = self.fan_power.name - if self.mop_mode is not None: - self.mop_mode_name = self.mop_mode.name - - def get_fan_speed_code(self, fan_speed: str) -> int: - if self.fan_power is None: - raise RoborockException("Attempted to get fan speed before status has been updated.") - return self.fan_power.as_dict().get(fan_speed) - - def get_mop_intensity_code(self, mop_intensity: str) -> int: - if self.water_box_mode is None: - raise RoborockException("Attempted to get mop_intensity before status has been updated.") - return self.water_box_mode.as_dict().get(mop_intensity) - - def get_mop_mode_code(self, mop_mode: str) -> int: - if self.mop_mode is None: - raise RoborockException("Attempted to get mop_mode before status has been updated.") - return self.mop_mode.as_dict().get(mop_mode) @property def current_map(self) -> int | None: @@ -574,6 +557,34 @@ class Saros10Status(Status): } +def get_custom_status(features: DeviceFeatures, region: str) -> "DeviceStatus": + _available_fan_speeds = get_clean_modes(features) + _available_fan_speed_mapping = {fan.code: fan.name for fan in _available_fan_speeds} + _available_water_modes = get_water_modes(features) + _available_water_modes_mapping = {mop.code: mop.name for mop in _available_water_modes} + _available_mop_routes = get_clean_routes(features, region) + _available_mop_routes_mapping = {route.code: route.name for route in _available_mop_routes} + + class DeviceStatus(Status): + available_fan_speeds: list[VacuumModes] | list[VacuumModesOld] = _available_fan_speeds + available_water_modes: list[WaterModes] = _available_water_modes + available_mop_routes: list[CleanRoutes] = _available_mop_routes + + @property + def fan_speed(self) -> VacuumModes | None: + return _available_fan_speed_mapping.get(self.fan_power) + + @property + def water_mode(self) -> WaterModes | None: + return _available_water_modes_mapping.get(self.water_box_mode) + + @property + def mop_route(self) -> CleanRoutes | None: + return _available_mop_routes_mapping.get(self.mop_mode) + + return DeviceStatus + + @dataclass class DnDTimer(RoborockBaseTimer): """DnDTimer""" @@ -762,6 +773,7 @@ class DeviceData(RoborockBase): host: str | None = None product_nickname: RoborockProductNickname | None = None device_features: DeviceFeatures | None = None + region: str | None = None def __post_init__(self): self.product_nickname = SHORT_MODEL_TO_ENUM.get(self.model.split(".")[-1], RoborockProductNickname.PEARLPLUS) diff --git a/roborock/devices/traits/v1/status.py b/roborock/devices/traits/v1/status.py index c52fa0c2..90a92a8a 100644 --- a/roborock/devices/traits/v1/status.py +++ b/roborock/devices/traits/v1/status.py @@ -1,6 +1,6 @@ from typing import Self -from roborock.containers import HomeDataProduct, ModelStatus, S7MaxVStatus, Status +from roborock.containers import HomeDataProduct, Status, get_custom_status from roborock.devices.traits.v1 import common from roborock.roborock_typing import RoborockCommand @@ -13,12 +13,12 @@ class StatusTrait(Status, common.V1TraitMixin): def __init__(self, product_info: HomeDataProduct) -> None: """Initialize the StatusTrait.""" self._product_info = product_info + self._status_type = get_custom_status(self.device_info.device_features, self.device_info.region) def _parse_response(self, response: common.V1ResponseData) -> Self: """Parse the response from the device into a CleanSummary.""" - status_type: type[Status] = ModelStatus.get(self._product_info.model, S7MaxVStatus) if isinstance(response, list): response = response[0] if isinstance(response, dict): - return status_type.from_dict(response) + return self._status_type.from_dict(response) raise ValueError(f"Unexpected status format: {response!r}") diff --git a/roborock/version_1_apis/roborock_client_v1.py b/roborock/version_1_apis/roborock_client_v1.py index 2a4d7b84..35a78c2c 100644 --- a/roborock/version_1_apis/roborock_client_v1.py +++ b/roborock/version_1_apis/roborock_client_v1.py @@ -4,10 +4,12 @@ import time from abc import ABC, abstractmethod from collections.abc import Callable, Coroutine +from functools import cached_property from typing import Any, TypeVar, final from roborock import ( AppInitStatus, + DeviceFeatures, DeviceProp, DockSummary, RoborockCommand, @@ -33,17 +35,16 @@ DnDTimer, DustCollectionMode, FlowLedStatus, - ModelStatus, MultiMapsList, NetworkInfo, RoborockBase, RoomMapping, - S7MaxVStatus, ServerTimer, SmartWashParams, Status, ValleyElectricityTimer, WashTowelMode, + get_custom_status, ) from roborock.protocols.v1_protocol import MapResponse, SecurityData, create_map_response_decoder from roborock.roborock_message import ( @@ -159,7 +160,7 @@ def __init__(self, device_info: DeviceData, security_data: SecurityData | None) self._diagnostic_data.update({"misc_info": security_data.to_diagnostic_data()}) self._map_response_decoder = create_map_response_decoder(security_data) - self._status_type: type[Status] = ModelStatus.get(device_info.model, S7MaxVStatus) + self._status_type: Status | None = None self.cache: dict[CacheableAttribute, AttributeCache] = { cacheable_attribute: AttributeCache(attr, self._send_command) for cacheable_attribute, attr in get_cache_map().items() @@ -172,15 +173,17 @@ async def async_release(self) -> None: await super().async_release() [item.stop() for item in self.cache.values()] - @property + @cached_property def status_type(self) -> type[Status]: """Gets the status type for this device""" - return self._status_type + if self.device_info.device_features is None or self.device_info.region is None: + raise RoborockException("Device features and region are required to get status type") + return get_custom_status(self.device_info.device_features, self.device_info.region) async def get_status(self) -> Status: - data = self._status_type.from_dict(await self.cache[CacheableAttribute.status].async_value(force=True)) + data = self.status_type.from_dict(await self.cache[CacheableAttribute.status].async_value(force=True)) if data is None: - return self._status_type() + return self.status_type() return data async def get_dnd_timer(self) -> DnDTimer | None: @@ -345,7 +348,14 @@ async def load_multi_map(self, map_flag: int) -> None: async def get_app_init_status(self) -> AppInitStatus: """Gets the app init status (needed for determining vacuum capabilities).""" - return await self.send_command(RoborockCommand.APP_GET_INIT_STATUS, return_type=AppInitStatus) + init_status = await self.send_command(RoborockCommand.APP_GET_INIT_STATUS, return_type=AppInitStatus) + self.device_info.device_features = DeviceFeatures.from_feature_flags( + init_status.new_feature_info, + init_status.new_feature_info_str, + init_status.feature_info, + self.device_info.product_nickname, + ) + return init_status @abstractmethod async def _send_command( diff --git a/tests/conftest.py b/tests/conftest.py index a38d429f..df23df87 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -11,7 +11,7 @@ import pytest from aioresponses import aioresponses -from roborock import HomeData, UserData +from roborock import SHORT_MODEL_TO_ENUM, DeviceFeatures, HomeData, UserData from roborock.containers import DeviceData from roborock.roborock_message import RoborockMessage from roborock.version_1_apis.roborock_local_client_v1 import RoborockLocalClientV1 @@ -389,3 +389,27 @@ async def _subscribe(self, callback: Callable[[RoborockMessage], None]) -> Calla """Simulate subscribing to messages.""" self.subscribers.append(callback) return lambda: self.subscribers.remove(callback) + + +@pytest.fixture(name="s7_device_features") +def s7_device_features_fixture() -> DeviceFeatures: + model = "roborock.vacuum.a15" + product_nickname = SHORT_MODEL_TO_ENUM.get(model.split(".")[-1]) + return DeviceFeatures.from_feature_flags( + new_feature_info=636084721975295, + new_feature_info_str="0000000000002000", + feature_info=[111, 112, 113, 114, 115, 116, 117, 118, 119, 120, 122, 123, 124, 125], + product_nickname=product_nickname, + ) + + +@pytest.fixture(name="qrevo_maxv_device_features") +def qrevo_maxv_device_features_fixture() -> DeviceFeatures: + model = "roborock.vacuum.a87" + product_nickname = SHORT_MODEL_TO_ENUM.get(model.split(".")[-1]) + return DeviceFeatures.from_feature_flags( + new_feature_info=4499197267967999, + new_feature_info_str="508A977F7EFEFFFF", + feature_info=[111, 112, 113, 114, 115, 116, 117, 118, 119, 120, 121, 122, 123, 124, 125], + product_nickname=product_nickname, + ) diff --git a/tests/devices/test_v1_device.py b/tests/devices/test_v1_device.py index e164f8bc..d4c31298 100644 --- a/tests/devices/test_v1_device.py +++ b/tests/devices/test_v1_device.py @@ -7,7 +7,7 @@ import pytest from syrupy import SnapshotAssertion -from roborock.containers import HomeData, S7MaxVStatus, UserData +from roborock.containers import HomeData, UserData from roborock.devices.device import RoborockDevice from roborock.devices.traits import v1 from roborock.devices.traits.v1.common import V1TraitMixin @@ -18,7 +18,6 @@ USER_DATA = UserData.from_dict(mock_data.USER_DATA) HOME_DATA = HomeData.from_dict(mock_data.HOME_DATA_RAW) -STATUS = S7MaxVStatus.from_dict(mock_data.STATUS) TESTDATA = pathlib.Path("tests/protocols/testdata/v1_protocol/") diff --git a/tests/test_containers.py b/tests/test_containers.py index c2c8b5dd..332d922a 100644 --- a/tests/test_containers.py +++ b/tests/test_containers.py @@ -5,7 +5,7 @@ from syrupy import SnapshotAssertion -from roborock import CleanRecord, CleanSummary, Consumable, DnDTimer, HomeData, S7MaxVStatus, UserData +from roborock import CleanRecord, CleanSummary, Consumable, DnDTimer, HomeData, UserData from roborock.b01_containers import ( B01Fault, B01Props, @@ -17,12 +17,9 @@ RoborockDockErrorCode, RoborockDockTypeCode, RoborockErrorCode, - RoborockFanSpeedS7MaxV, - RoborockMopIntensityS7, - RoborockMopModeS7, RoborockStateCode, ) -from roborock.containers import MultiMapsList, RoborockBase +from roborock.containers import MultiMapsList, RoborockBase, get_custom_status from .mock_data import ( CLEAN_RECORD, @@ -221,8 +218,12 @@ def test_consumable(): assert c.cleaning_brush_work_times == 66 -def test_status(): - s = S7MaxVStatus.from_dict(STATUS) +def test_status(s7_device_features): + status = get_custom_status( + s7_device_features, + "US", + ) + s = status.from_dict(STATUS) assert s.msg_ver == 2 assert s.msg_seq == 458 assert s.state == RoborockStateCode.charging @@ -267,14 +268,19 @@ def test_status(): assert s.charge_status == 1 assert s.unsave_map_reason == 0 assert s.unsave_map_flag == 0 - assert s.fan_power == RoborockFanSpeedS7MaxV.balanced - assert s.mop_mode == RoborockMopModeS7.standard - assert s.water_box_mode == RoborockMopIntensityS7.intense + assert s.fan_power == 102 + assert s.fan_speed == "BALANCED" + assert s.mop_route == "STANDARD" + assert s.water_mode == "INTENSE" -def test_current_map() -> None: +def test_current_map(s7_device_features) -> None: """Test the current map logic based on map status.""" - s = S7MaxVStatus.from_dict(STATUS) + status = get_custom_status( + s7_device_features, + "US", + ) + s = status.from_dict(STATUS) assert s.map_status == 3 assert s.current_map == 0 @@ -327,10 +333,15 @@ def test_clean_record(): assert cr.map_flag == 0 -def test_no_value(): +def test_no_value(s7_device_features): modified_status = STATUS.copy() modified_status["dock_type"] = 9999 - s = S7MaxVStatus.from_dict(modified_status) + status = get_custom_status( + s7_device_features, + "US", + ) + + s = status.from_dict(modified_status) assert s.dock_type == RoborockDockTypeCode.unknown assert -9999 not in RoborockDockTypeCode.keys() assert "missing" not in RoborockDockTypeCode.values() @@ -483,11 +494,15 @@ def test_multi_maps_list_info(snapshot: SnapshotAssertion) -> None: assert deserialized == snapshot -def test_accurate_map_flag() -> None: +def test_accurate_map_flag(s7_device_features) -> None: """Test that we parse the map flag accurately.""" - s = S7MaxVStatus.from_dict(STATUS) + status = get_custom_status( + s7_device_features, + "US", + ) + s = status.from_dict(STATUS) assert s.current_map == 0 - s = S7MaxVStatus.from_dict( + s = status.from_dict( { **STATUS, "map_status": 252, # Code for no map diff --git a/tests/test_supported_features.py b/tests/test_supported_features.py index 91eb5568..cd04a861 100644 --- a/tests/test_supported_features.py +++ b/tests/test_supported_features.py @@ -1,17 +1,6 @@ -from roborock import SHORT_MODEL_TO_ENUM -from roborock.device_features import DeviceFeatures - - -def test_supported_features_qrevo_maxv(): +def test_supported_features_qrevo_maxv(qrevo_maxv_device_features): """Ensure that a QREVO MaxV has some more complicated features enabled.""" - model = "roborock.vacuum.a87" - product_nickname = SHORT_MODEL_TO_ENUM.get(model.split(".")[-1]) - device_features = DeviceFeatures.from_feature_flags( - new_feature_info=4499197267967999, - new_feature_info_str="508A977F7EFEFFFF", - feature_info=[111, 112, 113, 114, 115, 116, 117, 118, 119, 120, 121, 122, 123, 124, 125], - product_nickname=product_nickname, - ) + device_features = qrevo_maxv_device_features assert device_features print("\n".join(device_features.get_supported_features())) @@ -21,20 +10,11 @@ def test_supported_features_qrevo_maxv(): assert device_features.is_dust_collection_setting_supported assert device_features.is_led_status_switch_supported assert not device_features.is_matter_supported - print(device_features) -def test_supported_features_s7(): +def test_supported_features_s7(s7_device_features): """Ensure that a S7 has some more basic features enabled.""" - - model = "roborock.vacuum.a15" - product_nickname = SHORT_MODEL_TO_ENUM.get(model.split(".")[-1]) - device_features = DeviceFeatures.from_feature_flags( - new_feature_info=636084721975295, - new_feature_info_str="0000000000002000", - feature_info=[111, 112, 113, 114, 115, 116, 117, 118, 119, 120, 122, 123, 124, 125], - product_nickname=product_nickname, - ) + device_features = s7_device_features num_true = sum(vars(device_features).values()) assert num_true != 0 assert device_features