Skip to content

Commit

Permalink
Improve handling of passive only data when active scans are not avail…
Browse files Browse the repository at this point in the history
…able (#153)
  • Loading branch information
bdraco committed Dec 17, 2022
1 parent 2d57f63 commit 1451c61
Show file tree
Hide file tree
Showing 12 changed files with 179 additions and 27 deletions.
83 changes: 71 additions & 12 deletions switchbot/adv_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
"0000fd3d-0000-1000-8000-00805f9b34fb",
"00000d00-0000-1000-8000-00805f9b34fb",
)
MFR_DATA_ORDER = (2409, 89)


class SwitchbotSupportedType(TypedDict):
Expand All @@ -44,11 +45,15 @@ class SwitchbotSupportedType(TypedDict):
"modelName": SwitchbotModel.CONTACT_SENSOR,
"modelFriendlyName": "Contact Sensor",
"func": process_wocontact,
"manufacturer_id": 2409,
"manufacturer_data_length": 13,
},
"H": {
"modelName": SwitchbotModel.BOT,
"modelFriendlyName": "Bot",
"func": process_wohand,
"service_uuids": {"cba20d00-224d-11e6-9fb8-0002a5d5c51b"},
"manufacturer_id": 89,
},
"s": {
"modelName": SwitchbotModel.MOTION_SENSOR,
Expand All @@ -64,6 +69,12 @@ class SwitchbotSupportedType(TypedDict):
"modelName": SwitchbotModel.CURTAIN,
"modelFriendlyName": "Curtain",
"func": process_wocurtain,
"service_uuids": {
"00001800-0000-1000-8000-00805f9b34fb",
"00001801-0000-1000-8000-00805f9b34fb",
"cba20d00-224d-11e6-9fb8-0002a5d5c51b",
},
"manufacturer_id": 89,
},
"T": {
"modelName": SwitchbotModel.METER,
Expand Down Expand Up @@ -107,50 +118,98 @@ class SwitchbotSupportedType(TypedDict):
},
}

_SWITCHBOT_MODEL_TO_CHAR = {
model_data["modelName"]: model_chr
for model_chr, model_data in SUPPORTED_TYPES.items()
}

MODELS_BY_MANUFACTURER_DATA: dict[int, list[tuple[str, SwitchbotSupportedType]]] = {
mfr_id: [] for mfr_id in MFR_DATA_ORDER
}
for model_chr, model in SUPPORTED_TYPES.items():
if "manufacturer_id" in model:
mfr_id = model["manufacturer_id"]
MODELS_BY_MANUFACTURER_DATA[mfr_id].append((model_chr, model))


def parse_advertisement_data(
device: BLEDevice, advertisement_data: AdvertisementData
device: BLEDevice,
advertisement_data: AdvertisementData,
model: SwitchbotModel | None = None,
) -> SwitchBotAdvertisement | None:
"""Parse advertisement data."""
_mgr_datas = list(advertisement_data.manufacturer_data.values())
service_data = advertisement_data.service_data

if not service_data:
return None

_service_data = None
for uuid in SERVICE_DATA_ORDER:
if uuid in service_data:
_service_data = service_data[uuid]
break

if not _service_data:
return None
_mfr_data = None
_mfr_id = None
for mfr_id in MFR_DATA_ORDER:
if mfr_id in advertisement_data.manufacturer_data:
_mfr_id = mfr_id
_mfr_data = advertisement_data.manufacturer_data[mfr_id]
break

_mfr_data = _mgr_datas[0] if _mgr_datas else None
if _mfr_data is None and _service_data is None:
return None

try:
data = _parse_data(_service_data, _mfr_data)
data = _parse_data(
"".join(sorted(advertisement_data.service_uuids)),
_service_data,
_mfr_data,
_mfr_id,
model,
)
except Exception as err: # pylint: disable=broad-except
_LOGGER.exception(
"Failed to parse advertisement data: %s: %s", advertisement_data, err
)
return None

if not data:
return None

return SwitchBotAdvertisement(device.address, data, device, advertisement_data.rssi)


@lru_cache(maxsize=128)
def _parse_data(
_service_data: bytes, _mfr_data: bytes | None
_service_uuids_str: str,
_service_data: bytes | None,
_mfr_data: bytes | None,
_mfr_id: int | None = None,
_switchbot_model: SwitchbotModel | None = None,
) -> SwitchBotAdvertisement | None:
"""Parse advertisement data."""
_model = chr(_service_data[0] & 0b01111111)
_model = chr(_service_data[0] & 0b01111111) if _service_data else None

if not _model and _mfr_id and _mfr_id in MODELS_BY_MANUFACTURER_DATA:
_service_uuids = set(_service_uuids_str.split(","))
for model_chr, model_data in MODELS_BY_MANUFACTURER_DATA[_mfr_id]:
if model_data.get("manufacturer_data_length") == len(_mfr_data):
_model = model_chr
break
if model_data.get("service_uuids", set()).intersection(_service_uuids):
_model = model_chr
break

if not _model and _switchbot_model and _switchbot_model in _SWITCHBOT_MODEL_TO_CHAR:
_model = _SWITCHBOT_MODEL_TO_CHAR[_switchbot_model]

if not _model:
return None

_isEncrypted = bool(_service_data[0] & 0b10000000) if _service_data else False
data = {
"rawAdvData": _service_data,
"data": {},
"model": _model,
"isEncrypted": bool(_service_data[0] & 0b10000000),
"isEncrypted": _isEncrypted,
}

type_data = SUPPORTED_TYPES.get(_model)
Expand Down
12 changes: 11 additions & 1 deletion switchbot/adv_parsers/bot.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,18 @@
from __future__ import annotations


def process_wohand(data: bytes, mfr_data: bytes | None) -> dict[str, bool | int]:
def process_wohand(data: bytes | None, mfr_data: bytes | None) -> dict[str, bool | int]:
"""Process woHand/Bot services data."""
if data is None and mfr_data is None:
return {}

if data is None:
return {
"switchMode": None,
"isOn": None,
"battery": None,
}

_switch_mode = bool(data[1] & 0b10000000)

return {
Expand Down
4 changes: 3 additions & 1 deletion switchbot/adv_parsers/bulb.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@
from __future__ import annotations


def process_color_bulb(data: bytes, mfr_data: bytes | None) -> dict[str, bool | int]:
def process_color_bulb(
data: bytes | None, mfr_data: bytes | None
) -> dict[str, bool | int]:
"""Process WoBulb services data."""
if mfr_data is None:
return {}
Expand Down
13 changes: 9 additions & 4 deletions switchbot/adv_parsers/contact.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,16 @@
from __future__ import annotations


def process_wocontact(data: bytes, mfr_data: bytes | None) -> dict[str, bool | int]:
def process_wocontact(
data: bytes | None, mfr_data: bytes | None
) -> dict[str, bool | int]:
"""Process woContact Sensor services data."""
battery = data[2] & 0b01111111
tested = bool(data[1] & 0b10000000)
contact_timeout = data[3] & 0b00000100 == 0b00000100
if data is None and mfr_data is None:
return {}

battery = data[2] & 0b01111111 if data else None
tested = bool(data[1] & 0b10000000) if data else None
contact_timeout = data[3] & 0b00000100 == 0b00000100 if data else False

if mfr_data and len(mfr_data) >= 13:
motion_detected = bool(mfr_data[7] & 0b10000000)
Expand Down
4 changes: 3 additions & 1 deletion switchbot/adv_parsers/curtain.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,11 @@


def process_wocurtain(
data: bytes, mfr_data: bytes | None, reverse: bool = True
data: bytes | None, mfr_data: bytes | None, reverse: bool = True
) -> dict[str, bool | int]:
"""Process woCurtain/Curtain services data."""
if data is None:
return {}

_position = max(min(data[3] & 0b01111111, 100), 0)

Expand Down
6 changes: 4 additions & 2 deletions switchbot/adv_parsers/humidifier.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,11 @@
# Low: 658000c5222b6300
# Med: 658000c5432b6300
# High: 658000c5642b6300
def process_wohumidifier(data: bytes, mfr_data: bytes | None) -> dict[str, bool | int]:
def process_wohumidifier(
data: bytes | None, mfr_data: bytes | None
) -> dict[str, bool | int]:
"""Process WoHumi services data."""
if mfr_data is None:
if data is None:
return {}
_LOGGER.debug("mfr_data: %s", mfr_data.hex())
_LOGGER.debug("data: %s", data.hex())
Expand Down
4 changes: 3 additions & 1 deletion switchbot/adv_parsers/light_strip.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@
from __future__ import annotations


def process_wostrip(data: bytes, mfr_data: bytes | None) -> dict[str, bool | int]:
def process_wostrip(
data: bytes | None, mfr_data: bytes | None
) -> dict[str, bool | int]:
"""Process WoStrip services data."""
if mfr_data is None:
return {}
Expand Down
4 changes: 2 additions & 2 deletions switchbot/adv_parsers/lock.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
_LOGGER = logging.getLogger(__name__)


def process_wolock(data: bytes, mfr_data: bytes | None) -> dict[str, bool | int]:
def process_wolock(data: bytes | None, mfr_data: bytes | None) -> dict[str, bool | int]:
"""Process woLock services data."""
if mfr_data is None:
return {}
Expand All @@ -17,7 +17,7 @@ def process_wolock(data: bytes, mfr_data: bytes | None) -> dict[str, bool | int]
_LOGGER.debug("data: %s", data.hex())

return {
"battery": data[2] & 0b01111111,
"battery": data[2] & 0b01111111 if data else None,
"calibration": bool(mfr_data[7] & 0b10000000),
"status": LockStatus(mfr_data[7] & 0b01110000),
"update_from_secondary_lock": bool(mfr_data[7] & 0b00001000),
Expand Down
4 changes: 3 additions & 1 deletion switchbot/adv_parsers/meter.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,10 @@
from typing import Any


def process_wosensorth(data: bytes, mfr_data: bytes | None) -> dict[str, Any]:
def process_wosensorth(data: bytes | None, mfr_data: bytes | None) -> dict[str, Any]:
"""Process woSensorTH/Temp sensor services data."""
if data is None:
return {}

_temp_sign = 1 if data[4] & 0b10000000 else -1
_temp_c = _temp_sign * ((data[4] & 0b01111111) + ((data[3] & 0b00001111) / 10))
Expand Down
6 changes: 5 additions & 1 deletion switchbot/adv_parsers/motion.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,12 @@
from __future__ import annotations


def process_wopresence(data: bytes, mfr_data: bytes | None) -> dict[str, bool | int]:
def process_wopresence(
data: bytes | None, mfr_data: bytes | None
) -> dict[str, bool | int]:
"""Process WoPresence Sensor services data."""
if data is None:
return {}
return {
"tested": bool(data[1] & 0b10000000),
"motion_detected": bool(data[1] & 0b01000000),
Expand Down
4 changes: 3 additions & 1 deletion switchbot/adv_parsers/plug.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@
from __future__ import annotations


def process_woplugmini(data: bytes, mfr_data: bytes | None) -> dict[str, bool | int]:
def process_woplugmini(
data: bytes | None, mfr_data: bytes | None
) -> dict[str, bool | int]:
"""Process plug mini."""
if mfr_data is None:
return {}
Expand Down
62 changes: 62 additions & 0 deletions tests/test_adv_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -445,6 +445,39 @@ def test_contact_sensor_mfr():
)


def test_contact_sensor_mfr_no_service_data():
"""Test contact sensor with passive data only."""
ble_device = BLEDevice("aa:bb:cc:dd:ee:ff", "any")
adv_data = generate_advertisement_data(
manufacturer_data={2409: b"\xcb9\xcd\xc4=FA,\x00F\x01\x8f\xc4"},
service_data={},
tx_power=-127,
rssi=-70,
)
result = parse_advertisement_data(ble_device, adv_data)
assert result == SwitchBotAdvertisement(
address="aa:bb:cc:dd:ee:ff",
data={
"data": {
"battery": None,
"button_count": 4,
"contact_open": False,
"contact_timeout": False,
"is_light": False,
"motion_detected": False,
"tested": None,
},
"isEncrypted": False,
"model": "d",
"modelFriendlyName": "Contact Sensor",
"modelName": SwitchbotModel.CONTACT_SENSOR,
"rawAdvData": None,
},
device=ble_device,
rssi=-70,
)


def test_contact_sensor_srv():
"""Test parsing adv data from new bot firmware."""
ble_device = BLEDevice("aa:bb:cc:dd:ee:ff", "any")
Expand Down Expand Up @@ -547,3 +580,32 @@ def test_contact_sensor_closed():
device=ble_device,
rssi=-50,
)


def test_switchbot_passive():
"""Test parsing switchbot as passive."""
ble_device = BLEDevice("aa:bb:cc:dd:ee:ff", "any")
adv_data = generate_advertisement_data(
manufacturer_data={89: bytes.fromhex("d51cfb397856")},
service_data={},
tx_power=-127,
rssi=-50,
)
result = parse_advertisement_data(ble_device, adv_data, SwitchbotModel.BOT)
assert result == SwitchBotAdvertisement(
address="aa:bb:cc:dd:ee:ff",
data={
"data": {
"battery": None,
"switchMode": None,
"isOn": None,
},
"isEncrypted": False,
"model": "H",
"modelFriendlyName": "Bot",
"modelName": SwitchbotModel.BOT,
"rawAdvData": None,
},
device=ble_device,
rssi=-50,
)

0 comments on commit 1451c61

Please sign in to comment.