Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update parser #31

Merged
merged 1 commit into from Jul 8, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
26 changes: 22 additions & 4 deletions README.md
Expand Up @@ -50,11 +50,11 @@ A full list of all supported sensors can be found on the [BLE monitor documentat

## Usage

When using default input parameters, you can use bleparser as follows (more working examples below).
When using default input parameters, you can use bleparser with raw BLE data as follows (more working examples below).

```python
ble_parser = BleParser()
sensor_msg, tracker_msg = ble_parser.parse_data(data)
sensor_msg, tracker_msg = ble_parser.parse_raw_data(data)
```

You can set optional parameters, the example below shows all possible input parameters with default values.
Expand All @@ -68,8 +68,26 @@ ble_parser = BleParser(
tracker_whitelist=[],
aeskeys={}
)
sensor_msg, tracker_msg = ble_parser.parse_raw_data(data)
```

It is also possible to use advertisement data directly.

```python
ble_parser = BleParser()
sensor_data, tracker_data = self.parse_advertisement(
mac,
rssi,
service_class_uuid16,
service_class_uuid128,
local_name,
service_data_list,
man_spec_data_list
)
```

`service_data_list` and `man_spec_data_list` have to be in a list, as a BKE advertisement can contain multiple service data/manufacturer specific data packets.

**report_unknown**

Report unknown sensors. Can be set to `ATC`, `b-parasite`, `BlueMaestro`, `Brifit`, `Govee`, `Inkbird`, `iNode`, `Jinou`, `Kegtron`, `KKM`, `Mikrotik`, `Moat`, `Mi Scale`, `Oral-B`, `Qingping`, `Ruuvitag`, `SensorPush`, `Sensirion`, `Teltonika`, `Thermoplus`, `Thermopro`, `Xiaogui` or `Xiaomi` to report unknown sensors of a specific brand to the logger. You can set it to `Other` to report all unknown advertisements to the logger. Default: `False`
Expand Down Expand Up @@ -109,7 +127,7 @@ data_string = "043e2502010000219335342d5819020106151695fe5020aa01da219335342d580
data = bytes(bytearray.fromhex(data_string))

ble_parser = BleParser()
sensor_msg, tracker_msg = ble_parser.parse_data(data)
sensor_msg, tracker_msg = ble_parser.parse_raw_data(data)
print(sensor_msg)
```

Expand Down Expand Up @@ -137,7 +155,7 @@ track_mac = bytes.fromhex(track_mac.replace(":", ""))
tracker_whitelist.append(track_mac.lower())

ble_parser = BleParser(tracker_whitelist=tracker_whitelist)
sensor_msg, tracker_msg = ble_parser.parse_data(data)
sensor_msg, tracker_msg = ble_parser.parse_raw_data(data)
print(tracker_msg)
```

Expand Down
2 changes: 1 addition & 1 deletion examples/ble2mqtt.py
Expand Up @@ -66,7 +66,7 @@

## Define callback
def process_hci_events(data):
sensor_data, tracker_data = parser.parse_data(data)
sensor_data, tracker_data = parser.parse_raw_data(data)

if tracker_data:
mac = ':'.join(wrap(tracker_data.pop("mac"), 2))
Expand Down
88 changes: 72 additions & 16 deletions package/bleparser/__init__.py
@@ -1,4 +1,5 @@
"""Parser for passive BLE advertisements."""
from typing import Optional
import logging

from .acconeer import parse_acconeer
Expand All @@ -11,6 +12,7 @@
from .brifit import parse_brifit
from .const import TILT_TYPES
from .govee import parse_govee
from .helpers import to_mac, to_unformatted_mac
from .ha_ble import parse_ha_ble
from .hhcc import parse_hhcc
from .ibeacon import parse_ibeacon
Expand Down Expand Up @@ -64,7 +66,7 @@ def __init__(
self.movements_list = {}
self.adv_priority = {}

def parse_data(self, data):
def parse_raw_data(self, data):
"""Parse the raw data."""
# check if packet is Extended scan result
is_ext_packet = True if data[3] == 0x0D else False
Expand All @@ -91,15 +93,12 @@ def parse_data(self, data):
rssi = rssi - 256
# MAC address
mac = (data[8 if is_ext_packet else 7:14 if is_ext_packet else 13])[::-1]
sensor_data = None
tracker_data = None
complete_local_name = ""
shortened_local_name = ""
service_class_uuid16 = None
service_class_uuid128 = None
service_data_list = []
man_spec_data_list = []
unknown_sensor = False

while adpayload_size > 1:
adstuct_size = data[adpayload_start] + 1
Expand Down Expand Up @@ -132,6 +131,37 @@ def parse_data(self, data):
adpayload_size -= adstuct_size
adpayload_start += adstuct_size

if complete_local_name:
local_name = complete_local_name
else:
local_name = shortened_local_name

sensor_data, tracker_data = self.parse_advertisement(
mac,
rssi,
service_class_uuid16,
service_class_uuid128,
local_name,
service_data_list,
man_spec_data_list
)
return sensor_data, tracker_data

def parse_advertisement(
self,
mac: bytes,
rssi: int,
service_class_uuid16: Optional[int] = None,
service_class_uuid128: Optional[bytes] = None,
local_name: Optional[str] = "",
service_data_list: Optional[list] = [],
man_spec_data_list: Optional[list] = []
):
"""parse BLE advertisement"""
sensor_data = None
tracker_data = None
unknown_sensor = False

while not sensor_data:
if service_data_list:
for service_data in service_data_list:
Expand All @@ -152,7 +182,7 @@ def parse_data(self, data):
# UUID16 = User Data and Bond Management (used by BLE HA)
sensor_data = parse_ha_ble(self, service_data, uuid16, mac, rssi)
break
elif uuid16 in [0xAA20, 0xAA21, 0xAA22] and complete_local_name == "ECo":
elif uuid16 in [0xAA20, 0xAA21, 0xAA22] and local_name == "ECo":
# UUID16 = Relsib
sensor_data = parse_relsib(self, service_data, mac, rssi)
break
Expand Down Expand Up @@ -189,7 +219,7 @@ def parse_data(self, data):
# UUID16 = Temperature and Humidity (used by Teltonika)
if len(service_data_list) == 2:
service_data = b"".join(service_data_list)
sensor_data = parse_teltonika(self, service_data, complete_local_name, mac, rssi)
sensor_data = parse_teltonika(self, service_data, local_name, mac, rssi)
break
else:
unknown_sensor = True
Expand Down Expand Up @@ -236,7 +266,7 @@ def parse_data(self, data):
break
elif comp_id == 0x06D5:
# Sensirion
sensor_data = parse_sensirion(self, man_spec_data, complete_local_name, mac, rssi)
sensor_data = parse_sensirion(self, man_spec_data, local_name, mac, rssi)
break
elif comp_id in [0x2121, 0x2122] and data_len == 0x0B:
# Air Mentor
Expand Down Expand Up @@ -301,11 +331,11 @@ def parse_data(self, data):
# Thermoplus
sensor_data = parse_thermoplus(self, man_spec_data, mac, rssi)
break
elif (comp_id in [0x0000, 0x0001] or complete_local_name in ["iBBQ", "sps", "tps"]) and (
elif (comp_id in [0x0000, 0x0001] or local_name in ["iBBQ", "sps", "tps"]) and (
data_len in [0x0A, 0x0D, 0x0F, 0x13, 0x17]
):
# Inkbird
sensor_data = parse_inkbird(self, man_spec_data, complete_local_name, mac, rssi)
sensor_data = parse_inkbird(self, man_spec_data, local_name, mac, rssi)
break
else:
unknown_sensor = True
Expand All @@ -319,13 +349,13 @@ def parse_data(self, data):
break

# Filter on complete local name
elif complete_local_name in ["sps", "tps"] and data_len == 0x0A:
elif local_name in ["sps", "tps"] and data_len == 0x0A:
# Inkbird IBS-TH
sensor_data = parse_inkbird(self, man_spec_data, complete_local_name, mac, rssi)
sensor_data = parse_inkbird(self, man_spec_data, local_name, mac, rssi)
break
elif complete_local_name[0:5] == "TP359" and data_len == 0x07:
elif local_name[0:5] == "TP359" and data_len == 0x07:
# Thermopro
sensor_data = parse_thermopro(self, man_spec_data, complete_local_name[0:5], mac, rssi)
sensor_data = parse_thermopro(self, man_spec_data, local_name[0:5], mac, rssi)
break

# Filter on other parts of the manufacturer specific data
Expand All @@ -342,7 +372,20 @@ def parse_data(self, data):
else:
unknown_sensor = True
if unknown_sensor and self.report_unknown == "Other":
_LOGGER.info("Unknown advertisement received: %s", data.hex())
_LOGGER.info(
"Unknown advertisement received for mac: %s"
"service data: %s"
"manufacturer specific data: %s"
"local name: %s"
"UUID16: %s,"
"UUID128: %s",
to_mac(mac),
service_data_list,
man_spec_data_list,
local_name,
service_class_uuid16,
service_class_uuid128,
)
break

# check for monitored device trackers
Expand All @@ -353,14 +396,27 @@ def parse_data(self, data):
else:
tracker_data = {
"is connected": True,
"mac": ''.join('{:02X}'.format(x) for x in mac),
"mac": to_unformatted_mac(mac),
"rssi": rssi,
}
else:
tracker_data = None

if self.report_unknown_whitelist:
if tracker_id in self.report_unknown_whitelist:
_LOGGER.info("BLE advertisement received from MAC/UUID %s: %s", tracker_id.hex(), data.hex())
_LOGGER.info(
"BLE advertisement received from MAC/UUID %s: "
"service data: %s"
"manufacturer specific data: %s"
"local name: %s"
"UUID16: %s,"
"UUID128: %s",
tracker_id.hex(),
service_data_list,
man_spec_data_list,
local_name,
service_class_uuid16,
service_class_uuid128
)

return sensor_data, tracker_data
4 changes: 2 additions & 2 deletions package/bleparser/switchbot.py
Expand Up @@ -16,13 +16,13 @@ def parse_switchbot(self, data, source_mac, rssi):
switchbot_mac = source_mac
device_id = data[4] + (data[5] << 8)

if msg_length == 10 and device_id in [0x0054, 0x0069]:
if msg_length == 10 and device_id in [0x0054, 0x0069, 0x1054]:
xvalue = data[6:10]
(byte1, byte2, byte3, byte4) = unpack("<BBBB", xvalue)
batt = (byte1 & 127)
temp = float(byte3 - 128) + float(byte2 / 10.0)
humi = (byte4 & 127)
if device_id == 0x0054:
if device_id in [0x0054, 0x1054]:
device_type = "Meter TH S1"
elif device_id == 0x0069:
device_type = "Meter TH plus"
Expand Down
21 changes: 18 additions & 3 deletions package/bleparser/xiaomi.py
Expand Up @@ -33,6 +33,7 @@
0x0DFD: "K9B-3BTN",
0x01AA: "LYWSDCGQ",
0x045B: "LYWSD02",
0x16e4: "LYWSD02MMC",
0x055B: "LYWSD03MMC",
0x098B: "MCCGQ02HL",
0x06d3: "MHO-C303",
Expand Down Expand Up @@ -205,6 +206,7 @@ def obj0007(xobj):
return {}
return {"door": door, "door action": action}


def obj0008(xobj, device_type):
"""armed away"""
returnData = {}
Expand All @@ -227,6 +229,7 @@ def obj0008(xobj, device_type):
}
return returnData


def obj0010(xobj):
"""Toothbrush"""
if xobj[0] == 0:
Expand Down Expand Up @@ -269,7 +272,7 @@ def obj000b(xobj, device_type):
action = BLE_LOCK_ACTION[action][2]
method = BLE_LOCK_METHOD[method]

#Biometric unlock then disarm
# Biometric unlock then disarm
if device_type == "DSL-C08":
if method == "password":
if 5000 <= key_id < 6000:
Expand Down Expand Up @@ -605,6 +608,7 @@ def obj100d(xobj):
else:
return {}


def obj100e(xobj, device_type):
"""Lock common attribute"""
# https://iot.mi.com/new/doc/accesses/direct-access/embedded-development/ble/object-definition#%E9%94%81%E5%B1%9E%E6%80%A7
Expand All @@ -616,6 +620,7 @@ def obj100e(xobj, device_type):
childlock = lock_attribute >> 3 ^ 1
return {"childlock": childlock, "lock": lock}


def obj2000(xobj):
"""Body temperature"""
if len(xobj) == 5:
Expand All @@ -632,7 +637,7 @@ def obj2000(xobj):
return {}


# The following data objects are device specific. For now only added for XMWSDJ04MMC and XMWXKG01YL
# The following data objects are device specific. For now only added for LYWSD02MMC, XMWSDJ04MMC, XMWXKG01YL
# https://miot-spec.org/miot-spec-v2/instances?status=all
def obj4803(xobj):
"""Battery"""
Expand All @@ -655,6 +660,15 @@ def obj4c01(xobj):
return {}


def obj4c02(xobj):
"""Humidity"""
if len(xobj) == 1:
humi = xobj[0]
return {"humidity": humi}
else:
return {}


def obj4c08(xobj):
"""Humidity"""
if len(xobj) == 4:
Expand Down Expand Up @@ -768,6 +782,7 @@ def obj4e0e(xobj):
0x4803: obj4803,
0x4a01: obj4a01,
0x4c01: obj4c01,
0x4c02: obj4c02,
0x4c08: obj4c08,
0x4c14: obj4c14,
0x4e0c: obj4e0c,
Expand Down Expand Up @@ -967,7 +982,7 @@ def parse_xiaomi(self, data, source_mac, rssi):
if obj_length != 0:
resfunc = xiaomi_dataobject_dict.get(obj_typecode, None)
if resfunc:
if hex(obj_typecode) in ["0x8","0x100e","0x1001", "0xf", "0xb"]:
if hex(obj_typecode) in ["0x8", "0x100e", "0x1001", "0xf", "0xb"]:
result.update(resfunc(dobject, device_type))
else:
result.update(resfunc(dobject))
Expand Down
2 changes: 1 addition & 1 deletion package/tests/test_acconeer.py
Expand Up @@ -12,7 +12,7 @@ def test_acconeer_xm122(self):

# pylint: disable=unused-variable
ble_parser = BleParser()
sensor_msg, tracker_msg = ble_parser.parse_data(data)
sensor_msg, tracker_msg = ble_parser.parse_raw_data(data)

assert sensor_msg["firmware"] == "Acconeer"
assert sensor_msg["type"] == "Acconeer XM122"
Expand Down
4 changes: 2 additions & 2 deletions package/tests/test_airmentor.py
Expand Up @@ -10,7 +10,7 @@ def test_air_mentor_pro_2_set_1(self):
data = bytes(bytearray.fromhex(data_string))
# pylint: disable=unused-variable
ble_parser = BleParser()
sensor_msg, tracker_msg = ble_parser.parse_data(data)
sensor_msg, tracker_msg = ble_parser.parse_raw_data(data)

assert sensor_msg["firmware"] == "Air Mentor"
assert sensor_msg["type"] == "Air Mentor Pro 2"
Expand All @@ -31,7 +31,7 @@ def test_air_mentor_pro_2_set_2(self):
data = bytes(bytearray.fromhex(data_string))
# pylint: disable=unused-variable
ble_parser = BleParser()
sensor_msg, tracker_msg = ble_parser.parse_data(data)
sensor_msg, tracker_msg = ble_parser.parse_raw_data(data)

assert sensor_msg["firmware"] == "Air Mentor"
assert sensor_msg["type"] == "Air Mentor Pro 2"
Expand Down
2 changes: 1 addition & 1 deletion package/tests/test_almendo.py
Expand Up @@ -10,7 +10,7 @@ def test_almendo_blusensor_mini(self):
data = bytes(bytearray.fromhex(data_string))
# pylint: disable=unused-variable
ble_parser = BleParser()
sensor_msg, tracker_msg = ble_parser.parse_data(data)
sensor_msg, tracker_msg = ble_parser.parse_raw_data(data)

assert sensor_msg["firmware"] == "Almendo V1"
assert sensor_msg["type"] == "bluSensor Mini"
Expand Down