From 1451c615d2ed3b896f5a3ccf5f85e70a4b6df17d Mon Sep 17 00:00:00 2001 From: "J. Nick Koston" Date: Fri, 16 Dec 2022 16:26:42 -1000 Subject: [PATCH] Improve handling of passive only data when active scans are not available (#153) --- switchbot/adv_parser.py | 83 ++++++++++++++++++++++++---- switchbot/adv_parsers/bot.py | 12 +++- switchbot/adv_parsers/bulb.py | 4 +- switchbot/adv_parsers/contact.py | 13 +++-- switchbot/adv_parsers/curtain.py | 4 +- switchbot/adv_parsers/humidifier.py | 6 +- switchbot/adv_parsers/light_strip.py | 4 +- switchbot/adv_parsers/lock.py | 4 +- switchbot/adv_parsers/meter.py | 4 +- switchbot/adv_parsers/motion.py | 6 +- switchbot/adv_parsers/plug.py | 4 +- tests/test_adv_parser.py | 62 +++++++++++++++++++++ 12 files changed, 179 insertions(+), 27 deletions(-) diff --git a/switchbot/adv_parser.py b/switchbot/adv_parser.py index 61a5d6a..dcbaf3d 100644 --- a/switchbot/adv_parser.py +++ b/switchbot/adv_parser.py @@ -29,6 +29,7 @@ "0000fd3d-0000-1000-8000-00805f9b34fb", "00000d00-0000-1000-8000-00805f9b34fb", ) +MFR_DATA_ORDER = (2409, 89) class SwitchbotSupportedType(TypedDict): @@ -44,11 +45,15 @@ class SwitchbotSupportedType(TypedDict): "modelName": SwitchbotModel.CONTACT_SENSOR, "modelFriendlyName": "Contact Sensor", "func": process_wocontact, + "manufacturer_id": 2409, + "manufacturer_data_length": 13, }, "H": { "modelName": SwitchbotModel.BOT, "modelFriendlyName": "Bot", "func": process_wohand, + "service_uuids": {"cba20d00-224d-11e6-9fb8-0002a5d5c51b"}, + "manufacturer_id": 89, }, "s": { "modelName": SwitchbotModel.MOTION_SENSOR, @@ -64,6 +69,12 @@ class SwitchbotSupportedType(TypedDict): "modelName": SwitchbotModel.CURTAIN, "modelFriendlyName": "Curtain", "func": process_wocurtain, + "service_uuids": { + "00001800-0000-1000-8000-00805f9b34fb", + "00001801-0000-1000-8000-00805f9b34fb", + "cba20d00-224d-11e6-9fb8-0002a5d5c51b", + }, + "manufacturer_id": 89, }, "T": { "modelName": SwitchbotModel.METER, @@ -107,50 +118,98 @@ class SwitchbotSupportedType(TypedDict): }, } +_SWITCHBOT_MODEL_TO_CHAR = { + model_data["modelName"]: model_chr + for model_chr, model_data in SUPPORTED_TYPES.items() +} + +MODELS_BY_MANUFACTURER_DATA: dict[int, list[tuple[str, SwitchbotSupportedType]]] = { + mfr_id: [] for mfr_id in MFR_DATA_ORDER +} +for model_chr, model in SUPPORTED_TYPES.items(): + if "manufacturer_id" in model: + mfr_id = model["manufacturer_id"] + MODELS_BY_MANUFACTURER_DATA[mfr_id].append((model_chr, model)) + def parse_advertisement_data( - device: BLEDevice, advertisement_data: AdvertisementData + device: BLEDevice, + advertisement_data: AdvertisementData, + model: SwitchbotModel | None = None, ) -> SwitchBotAdvertisement | None: """Parse advertisement data.""" - _mgr_datas = list(advertisement_data.manufacturer_data.values()) service_data = advertisement_data.service_data - if not service_data: - return None - _service_data = None for uuid in SERVICE_DATA_ORDER: if uuid in service_data: _service_data = service_data[uuid] break - if not _service_data: - return None + _mfr_data = None + _mfr_id = None + for mfr_id in MFR_DATA_ORDER: + if mfr_id in advertisement_data.manufacturer_data: + _mfr_id = mfr_id + _mfr_data = advertisement_data.manufacturer_data[mfr_id] + break - _mfr_data = _mgr_datas[0] if _mgr_datas else None + if _mfr_data is None and _service_data is None: + return None try: - data = _parse_data(_service_data, _mfr_data) + data = _parse_data( + "".join(sorted(advertisement_data.service_uuids)), + _service_data, + _mfr_data, + _mfr_id, + model, + ) except Exception as err: # pylint: disable=broad-except _LOGGER.exception( "Failed to parse advertisement data: %s: %s", advertisement_data, err ) return None + if not data: + return None + return SwitchBotAdvertisement(device.address, data, device, advertisement_data.rssi) @lru_cache(maxsize=128) def _parse_data( - _service_data: bytes, _mfr_data: bytes | None + _service_uuids_str: str, + _service_data: bytes | None, + _mfr_data: bytes | None, + _mfr_id: int | None = None, + _switchbot_model: SwitchbotModel | None = None, ) -> SwitchBotAdvertisement | None: """Parse advertisement data.""" - _model = chr(_service_data[0] & 0b01111111) + _model = chr(_service_data[0] & 0b01111111) if _service_data else None + + if not _model and _mfr_id and _mfr_id in MODELS_BY_MANUFACTURER_DATA: + _service_uuids = set(_service_uuids_str.split(",")) + for model_chr, model_data in MODELS_BY_MANUFACTURER_DATA[_mfr_id]: + if model_data.get("manufacturer_data_length") == len(_mfr_data): + _model = model_chr + break + if model_data.get("service_uuids", set()).intersection(_service_uuids): + _model = model_chr + break + + if not _model and _switchbot_model and _switchbot_model in _SWITCHBOT_MODEL_TO_CHAR: + _model = _SWITCHBOT_MODEL_TO_CHAR[_switchbot_model] + + if not _model: + return None + + _isEncrypted = bool(_service_data[0] & 0b10000000) if _service_data else False data = { "rawAdvData": _service_data, "data": {}, "model": _model, - "isEncrypted": bool(_service_data[0] & 0b10000000), + "isEncrypted": _isEncrypted, } type_data = SUPPORTED_TYPES.get(_model) diff --git a/switchbot/adv_parsers/bot.py b/switchbot/adv_parsers/bot.py index 88a2532..147f04c 100644 --- a/switchbot/adv_parsers/bot.py +++ b/switchbot/adv_parsers/bot.py @@ -2,8 +2,18 @@ from __future__ import annotations -def process_wohand(data: bytes, mfr_data: bytes | None) -> dict[str, bool | int]: +def process_wohand(data: bytes | None, mfr_data: bytes | None) -> dict[str, bool | int]: """Process woHand/Bot services data.""" + if data is None and mfr_data is None: + return {} + + if data is None: + return { + "switchMode": None, + "isOn": None, + "battery": None, + } + _switch_mode = bool(data[1] & 0b10000000) return { diff --git a/switchbot/adv_parsers/bulb.py b/switchbot/adv_parsers/bulb.py index 1803c0d..e31dc17 100644 --- a/switchbot/adv_parsers/bulb.py +++ b/switchbot/adv_parsers/bulb.py @@ -2,7 +2,9 @@ from __future__ import annotations -def process_color_bulb(data: bytes, mfr_data: bytes | None) -> dict[str, bool | int]: +def process_color_bulb( + data: bytes | None, mfr_data: bytes | None +) -> dict[str, bool | int]: """Process WoBulb services data.""" if mfr_data is None: return {} diff --git a/switchbot/adv_parsers/contact.py b/switchbot/adv_parsers/contact.py index 2d4bc87..b5f2cfe 100644 --- a/switchbot/adv_parsers/contact.py +++ b/switchbot/adv_parsers/contact.py @@ -2,11 +2,16 @@ from __future__ import annotations -def process_wocontact(data: bytes, mfr_data: bytes | None) -> dict[str, bool | int]: +def process_wocontact( + data: bytes | None, mfr_data: bytes | None +) -> dict[str, bool | int]: """Process woContact Sensor services data.""" - battery = data[2] & 0b01111111 - tested = bool(data[1] & 0b10000000) - contact_timeout = data[3] & 0b00000100 == 0b00000100 + if data is None and mfr_data is None: + return {} + + battery = data[2] & 0b01111111 if data else None + tested = bool(data[1] & 0b10000000) if data else None + contact_timeout = data[3] & 0b00000100 == 0b00000100 if data else False if mfr_data and len(mfr_data) >= 13: motion_detected = bool(mfr_data[7] & 0b10000000) diff --git a/switchbot/adv_parsers/curtain.py b/switchbot/adv_parsers/curtain.py index 877b150..88f82c6 100644 --- a/switchbot/adv_parsers/curtain.py +++ b/switchbot/adv_parsers/curtain.py @@ -3,9 +3,11 @@ def process_wocurtain( - data: bytes, mfr_data: bytes | None, reverse: bool = True + data: bytes | None, mfr_data: bytes | None, reverse: bool = True ) -> dict[str, bool | int]: """Process woCurtain/Curtain services data.""" + if data is None: + return {} _position = max(min(data[3] & 0b01111111, 100), 0) diff --git a/switchbot/adv_parsers/humidifier.py b/switchbot/adv_parsers/humidifier.py index ae54f99..7030383 100644 --- a/switchbot/adv_parsers/humidifier.py +++ b/switchbot/adv_parsers/humidifier.py @@ -13,9 +13,11 @@ # Low: 658000c5222b6300 # Med: 658000c5432b6300 # High: 658000c5642b6300 -def process_wohumidifier(data: bytes, mfr_data: bytes | None) -> dict[str, bool | int]: +def process_wohumidifier( + data: bytes | None, mfr_data: bytes | None +) -> dict[str, bool | int]: """Process WoHumi services data.""" - if mfr_data is None: + if data is None: return {} _LOGGER.debug("mfr_data: %s", mfr_data.hex()) _LOGGER.debug("data: %s", data.hex()) diff --git a/switchbot/adv_parsers/light_strip.py b/switchbot/adv_parsers/light_strip.py index d9ef324..8953e2e 100644 --- a/switchbot/adv_parsers/light_strip.py +++ b/switchbot/adv_parsers/light_strip.py @@ -2,7 +2,9 @@ from __future__ import annotations -def process_wostrip(data: bytes, mfr_data: bytes | None) -> dict[str, bool | int]: +def process_wostrip( + data: bytes | None, mfr_data: bytes | None +) -> dict[str, bool | int]: """Process WoStrip services data.""" if mfr_data is None: return {} diff --git a/switchbot/adv_parsers/lock.py b/switchbot/adv_parsers/lock.py index bc0c5d7..c3c29ee 100644 --- a/switchbot/adv_parsers/lock.py +++ b/switchbot/adv_parsers/lock.py @@ -8,7 +8,7 @@ _LOGGER = logging.getLogger(__name__) -def process_wolock(data: bytes, mfr_data: bytes | None) -> dict[str, bool | int]: +def process_wolock(data: bytes | None, mfr_data: bytes | None) -> dict[str, bool | int]: """Process woLock services data.""" if mfr_data is None: return {} @@ -17,7 +17,7 @@ def process_wolock(data: bytes, mfr_data: bytes | None) -> dict[str, bool | int] _LOGGER.debug("data: %s", data.hex()) return { - "battery": data[2] & 0b01111111, + "battery": data[2] & 0b01111111 if data else None, "calibration": bool(mfr_data[7] & 0b10000000), "status": LockStatus(mfr_data[7] & 0b01110000), "update_from_secondary_lock": bool(mfr_data[7] & 0b00001000), diff --git a/switchbot/adv_parsers/meter.py b/switchbot/adv_parsers/meter.py index d32fade..4dd8e0a 100644 --- a/switchbot/adv_parsers/meter.py +++ b/switchbot/adv_parsers/meter.py @@ -4,8 +4,10 @@ from typing import Any -def process_wosensorth(data: bytes, mfr_data: bytes | None) -> dict[str, Any]: +def process_wosensorth(data: bytes | None, mfr_data: bytes | None) -> dict[str, Any]: """Process woSensorTH/Temp sensor services data.""" + if data is None: + return {} _temp_sign = 1 if data[4] & 0b10000000 else -1 _temp_c = _temp_sign * ((data[4] & 0b01111111) + ((data[3] & 0b00001111) / 10)) diff --git a/switchbot/adv_parsers/motion.py b/switchbot/adv_parsers/motion.py index 7db202e..a57905b 100644 --- a/switchbot/adv_parsers/motion.py +++ b/switchbot/adv_parsers/motion.py @@ -2,8 +2,12 @@ from __future__ import annotations -def process_wopresence(data: bytes, mfr_data: bytes | None) -> dict[str, bool | int]: +def process_wopresence( + data: bytes | None, mfr_data: bytes | None +) -> dict[str, bool | int]: """Process WoPresence Sensor services data.""" + if data is None: + return {} return { "tested": bool(data[1] & 0b10000000), "motion_detected": bool(data[1] & 0b01000000), diff --git a/switchbot/adv_parsers/plug.py b/switchbot/adv_parsers/plug.py index f8f0a47..4665b77 100644 --- a/switchbot/adv_parsers/plug.py +++ b/switchbot/adv_parsers/plug.py @@ -2,7 +2,9 @@ from __future__ import annotations -def process_woplugmini(data: bytes, mfr_data: bytes | None) -> dict[str, bool | int]: +def process_woplugmini( + data: bytes | None, mfr_data: bytes | None +) -> dict[str, bool | int]: """Process plug mini.""" if mfr_data is None: return {} diff --git a/tests/test_adv_parser.py b/tests/test_adv_parser.py index 947354b..8835549 100644 --- a/tests/test_adv_parser.py +++ b/tests/test_adv_parser.py @@ -445,6 +445,39 @@ def test_contact_sensor_mfr(): ) +def test_contact_sensor_mfr_no_service_data(): + """Test contact sensor with passive data only.""" + ble_device = BLEDevice("aa:bb:cc:dd:ee:ff", "any") + adv_data = generate_advertisement_data( + manufacturer_data={2409: b"\xcb9\xcd\xc4=FA,\x00F\x01\x8f\xc4"}, + service_data={}, + tx_power=-127, + rssi=-70, + ) + result = parse_advertisement_data(ble_device, adv_data) + assert result == SwitchBotAdvertisement( + address="aa:bb:cc:dd:ee:ff", + data={ + "data": { + "battery": None, + "button_count": 4, + "contact_open": False, + "contact_timeout": False, + "is_light": False, + "motion_detected": False, + "tested": None, + }, + "isEncrypted": False, + "model": "d", + "modelFriendlyName": "Contact Sensor", + "modelName": SwitchbotModel.CONTACT_SENSOR, + "rawAdvData": None, + }, + device=ble_device, + rssi=-70, + ) + + def test_contact_sensor_srv(): """Test parsing adv data from new bot firmware.""" ble_device = BLEDevice("aa:bb:cc:dd:ee:ff", "any") @@ -547,3 +580,32 @@ def test_contact_sensor_closed(): device=ble_device, rssi=-50, ) + + +def test_switchbot_passive(): + """Test parsing switchbot as passive.""" + ble_device = BLEDevice("aa:bb:cc:dd:ee:ff", "any") + adv_data = generate_advertisement_data( + manufacturer_data={89: bytes.fromhex("d51cfb397856")}, + service_data={}, + tx_power=-127, + rssi=-50, + ) + result = parse_advertisement_data(ble_device, adv_data, SwitchbotModel.BOT) + assert result == SwitchBotAdvertisement( + address="aa:bb:cc:dd:ee:ff", + data={ + "data": { + "battery": None, + "switchMode": None, + "isOn": None, + }, + "isEncrypted": False, + "model": "H", + "modelFriendlyName": "Bot", + "modelName": SwitchbotModel.BOT, + "rawAdvData": None, + }, + device=ble_device, + rssi=-50, + )