Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve handling of passive only data when active scans are not available #153

Merged
merged 1 commit into from
Dec 17, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
83 changes: 71 additions & 12 deletions switchbot/adv_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
"0000fd3d-0000-1000-8000-00805f9b34fb",
"00000d00-0000-1000-8000-00805f9b34fb",
)
MFR_DATA_ORDER = (2409, 89)


class SwitchbotSupportedType(TypedDict):
Expand All @@ -44,11 +45,15 @@ class SwitchbotSupportedType(TypedDict):
"modelName": SwitchbotModel.CONTACT_SENSOR,
"modelFriendlyName": "Contact Sensor",
"func": process_wocontact,
"manufacturer_id": 2409,
"manufacturer_data_length": 13,
},
"H": {
"modelName": SwitchbotModel.BOT,
"modelFriendlyName": "Bot",
"func": process_wohand,
"service_uuids": {"cba20d00-224d-11e6-9fb8-0002a5d5c51b"},
"manufacturer_id": 89,
},
"s": {
"modelName": SwitchbotModel.MOTION_SENSOR,
Expand All @@ -64,6 +69,12 @@ class SwitchbotSupportedType(TypedDict):
"modelName": SwitchbotModel.CURTAIN,
"modelFriendlyName": "Curtain",
"func": process_wocurtain,
"service_uuids": {
"00001800-0000-1000-8000-00805f9b34fb",
"00001801-0000-1000-8000-00805f9b34fb",
"cba20d00-224d-11e6-9fb8-0002a5d5c51b",
},
"manufacturer_id": 89,
},
"T": {
"modelName": SwitchbotModel.METER,
Expand Down Expand Up @@ -107,50 +118,98 @@ class SwitchbotSupportedType(TypedDict):
},
}

_SWITCHBOT_MODEL_TO_CHAR = {
model_data["modelName"]: model_chr
for model_chr, model_data in SUPPORTED_TYPES.items()
}

MODELS_BY_MANUFACTURER_DATA: dict[int, list[tuple[str, SwitchbotSupportedType]]] = {
mfr_id: [] for mfr_id in MFR_DATA_ORDER
}
for model_chr, model in SUPPORTED_TYPES.items():
if "manufacturer_id" in model:
mfr_id = model["manufacturer_id"]
MODELS_BY_MANUFACTURER_DATA[mfr_id].append((model_chr, model))


def parse_advertisement_data(
device: BLEDevice, advertisement_data: AdvertisementData
device: BLEDevice,
advertisement_data: AdvertisementData,
model: SwitchbotModel | None = None,
) -> SwitchBotAdvertisement | None:
"""Parse advertisement data."""
_mgr_datas = list(advertisement_data.manufacturer_data.values())
service_data = advertisement_data.service_data

if not service_data:
return None

_service_data = None
for uuid in SERVICE_DATA_ORDER:
if uuid in service_data:
_service_data = service_data[uuid]
break

if not _service_data:
return None
_mfr_data = None
_mfr_id = None
for mfr_id in MFR_DATA_ORDER:
if mfr_id in advertisement_data.manufacturer_data:
_mfr_id = mfr_id
_mfr_data = advertisement_data.manufacturer_data[mfr_id]
break

_mfr_data = _mgr_datas[0] if _mgr_datas else None
if _mfr_data is None and _service_data is None:
return None

try:
data = _parse_data(_service_data, _mfr_data)
data = _parse_data(
"".join(sorted(advertisement_data.service_uuids)),
_service_data,
_mfr_data,
_mfr_id,
model,
)
except Exception as err: # pylint: disable=broad-except
_LOGGER.exception(
"Failed to parse advertisement data: %s: %s", advertisement_data, err
)
return None

if not data:
return None

return SwitchBotAdvertisement(device.address, data, device, advertisement_data.rssi)


@lru_cache(maxsize=128)
def _parse_data(
_service_data: bytes, _mfr_data: bytes | None
_service_uuids_str: str,
_service_data: bytes | None,
_mfr_data: bytes | None,
_mfr_id: int | None = None,
_switchbot_model: SwitchbotModel | None = None,
) -> SwitchBotAdvertisement | None:
"""Parse advertisement data."""
_model = chr(_service_data[0] & 0b01111111)
_model = chr(_service_data[0] & 0b01111111) if _service_data else None

if not _model and _mfr_id and _mfr_id in MODELS_BY_MANUFACTURER_DATA:
_service_uuids = set(_service_uuids_str.split(","))
for model_chr, model_data in MODELS_BY_MANUFACTURER_DATA[_mfr_id]:
if model_data.get("manufacturer_data_length") == len(_mfr_data):
_model = model_chr
break
if model_data.get("service_uuids", set()).intersection(_service_uuids):
_model = model_chr
break

if not _model and _switchbot_model and _switchbot_model in _SWITCHBOT_MODEL_TO_CHAR:
_model = _SWITCHBOT_MODEL_TO_CHAR[_switchbot_model]

if not _model:
return None

_isEncrypted = bool(_service_data[0] & 0b10000000) if _service_data else False
data = {
"rawAdvData": _service_data,
"data": {},
"model": _model,
"isEncrypted": bool(_service_data[0] & 0b10000000),
"isEncrypted": _isEncrypted,
}

type_data = SUPPORTED_TYPES.get(_model)
Expand Down
12 changes: 11 additions & 1 deletion switchbot/adv_parsers/bot.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,18 @@
from __future__ import annotations


def process_wohand(data: bytes, mfr_data: bytes | None) -> dict[str, bool | int]:
def process_wohand(data: bytes | None, mfr_data: bytes | None) -> dict[str, bool | int]:
"""Process woHand/Bot services data."""
if data is None and mfr_data is None:
return {}

if data is None:
return {
"switchMode": None,
"isOn": None,
"battery": None,
}

_switch_mode = bool(data[1] & 0b10000000)

return {
Expand Down
4 changes: 3 additions & 1 deletion switchbot/adv_parsers/bulb.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@
from __future__ import annotations


def process_color_bulb(data: bytes, mfr_data: bytes | None) -> dict[str, bool | int]:
def process_color_bulb(
data: bytes | None, mfr_data: bytes | None
) -> dict[str, bool | int]:
"""Process WoBulb services data."""
if mfr_data is None:
return {}
Expand Down
13 changes: 9 additions & 4 deletions switchbot/adv_parsers/contact.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,16 @@
from __future__ import annotations


def process_wocontact(data: bytes, mfr_data: bytes | None) -> dict[str, bool | int]:
def process_wocontact(
data: bytes | None, mfr_data: bytes | None
) -> dict[str, bool | int]:
"""Process woContact Sensor services data."""
battery = data[2] & 0b01111111
tested = bool(data[1] & 0b10000000)
contact_timeout = data[3] & 0b00000100 == 0b00000100
if data is None and mfr_data is None:
return {}

battery = data[2] & 0b01111111 if data else None
tested = bool(data[1] & 0b10000000) if data else None
contact_timeout = data[3] & 0b00000100 == 0b00000100 if data else False

if mfr_data and len(mfr_data) >= 13:
motion_detected = bool(mfr_data[7] & 0b10000000)
Expand Down
4 changes: 3 additions & 1 deletion switchbot/adv_parsers/curtain.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,11 @@


def process_wocurtain(
data: bytes, mfr_data: bytes | None, reverse: bool = True
data: bytes | None, mfr_data: bytes | None, reverse: bool = True
) -> dict[str, bool | int]:
"""Process woCurtain/Curtain services data."""
if data is None:
return {}

_position = max(min(data[3] & 0b01111111, 100), 0)

Expand Down
6 changes: 4 additions & 2 deletions switchbot/adv_parsers/humidifier.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,11 @@
# Low: 658000c5222b6300
# Med: 658000c5432b6300
# High: 658000c5642b6300
def process_wohumidifier(data: bytes, mfr_data: bytes | None) -> dict[str, bool | int]:
def process_wohumidifier(
data: bytes | None, mfr_data: bytes | None
) -> dict[str, bool | int]:
"""Process WoHumi services data."""
if mfr_data is None:
if data is None:
return {}
_LOGGER.debug("mfr_data: %s", mfr_data.hex())
_LOGGER.debug("data: %s", data.hex())
Expand Down
4 changes: 3 additions & 1 deletion switchbot/adv_parsers/light_strip.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@
from __future__ import annotations


def process_wostrip(data: bytes, mfr_data: bytes | None) -> dict[str, bool | int]:
def process_wostrip(
data: bytes | None, mfr_data: bytes | None
) -> dict[str, bool | int]:
"""Process WoStrip services data."""
if mfr_data is None:
return {}
Expand Down
4 changes: 2 additions & 2 deletions switchbot/adv_parsers/lock.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
_LOGGER = logging.getLogger(__name__)


def process_wolock(data: bytes, mfr_data: bytes | None) -> dict[str, bool | int]:
def process_wolock(data: bytes | None, mfr_data: bytes | None) -> dict[str, bool | int]:
"""Process woLock services data."""
if mfr_data is None:
return {}
Expand All @@ -17,7 +17,7 @@ def process_wolock(data: bytes, mfr_data: bytes | None) -> dict[str, bool | int]
_LOGGER.debug("data: %s", data.hex())

return {
"battery": data[2] & 0b01111111,
"battery": data[2] & 0b01111111 if data else None,
"calibration": bool(mfr_data[7] & 0b10000000),
"status": LockStatus(mfr_data[7] & 0b01110000),
"update_from_secondary_lock": bool(mfr_data[7] & 0b00001000),
Expand Down
4 changes: 3 additions & 1 deletion switchbot/adv_parsers/meter.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,10 @@
from typing import Any


def process_wosensorth(data: bytes, mfr_data: bytes | None) -> dict[str, Any]:
def process_wosensorth(data: bytes | None, mfr_data: bytes | None) -> dict[str, Any]:
"""Process woSensorTH/Temp sensor services data."""
if data is None:
return {}

_temp_sign = 1 if data[4] & 0b10000000 else -1
_temp_c = _temp_sign * ((data[4] & 0b01111111) + ((data[3] & 0b00001111) / 10))
Expand Down
6 changes: 5 additions & 1 deletion switchbot/adv_parsers/motion.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,12 @@
from __future__ import annotations


def process_wopresence(data: bytes, mfr_data: bytes | None) -> dict[str, bool | int]:
def process_wopresence(
data: bytes | None, mfr_data: bytes | None
) -> dict[str, bool | int]:
"""Process WoPresence Sensor services data."""
if data is None:
return {}
return {
"tested": bool(data[1] & 0b10000000),
"motion_detected": bool(data[1] & 0b01000000),
Expand Down
4 changes: 3 additions & 1 deletion switchbot/adv_parsers/plug.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@
from __future__ import annotations


def process_woplugmini(data: bytes, mfr_data: bytes | None) -> dict[str, bool | int]:
def process_woplugmini(
data: bytes | None, mfr_data: bytes | None
) -> dict[str, bool | int]:
"""Process plug mini."""
if mfr_data is None:
return {}
Expand Down
62 changes: 62 additions & 0 deletions tests/test_adv_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -445,6 +445,39 @@ def test_contact_sensor_mfr():
)


def test_contact_sensor_mfr_no_service_data():
"""Test contact sensor with passive data only."""
ble_device = BLEDevice("aa:bb:cc:dd:ee:ff", "any")
adv_data = generate_advertisement_data(
manufacturer_data={2409: b"\xcb9\xcd\xc4=FA,\x00F\x01\x8f\xc4"},
service_data={},
tx_power=-127,
rssi=-70,
)
result = parse_advertisement_data(ble_device, adv_data)
assert result == SwitchBotAdvertisement(
address="aa:bb:cc:dd:ee:ff",
data={
"data": {
"battery": None,
"button_count": 4,
"contact_open": False,
"contact_timeout": False,
"is_light": False,
"motion_detected": False,
"tested": None,
},
"isEncrypted": False,
"model": "d",
"modelFriendlyName": "Contact Sensor",
"modelName": SwitchbotModel.CONTACT_SENSOR,
"rawAdvData": None,
},
device=ble_device,
rssi=-70,
)


def test_contact_sensor_srv():
"""Test parsing adv data from new bot firmware."""
ble_device = BLEDevice("aa:bb:cc:dd:ee:ff", "any")
Expand Down Expand Up @@ -547,3 +580,32 @@ def test_contact_sensor_closed():
device=ble_device,
rssi=-50,
)


def test_switchbot_passive():
"""Test parsing switchbot as passive."""
ble_device = BLEDevice("aa:bb:cc:dd:ee:ff", "any")
adv_data = generate_advertisement_data(
manufacturer_data={89: bytes.fromhex("d51cfb397856")},
service_data={},
tx_power=-127,
rssi=-50,
)
result = parse_advertisement_data(ble_device, adv_data, SwitchbotModel.BOT)
assert result == SwitchBotAdvertisement(
address="aa:bb:cc:dd:ee:ff",
data={
"data": {
"battery": None,
"switchMode": None,
"isOn": None,
},
"isEncrypted": False,
"model": "H",
"modelFriendlyName": "Bot",
"modelName": SwitchbotModel.BOT,
"rawAdvData": None,
},
device=ble_device,
rssi=-50,
)