From dcb1d86a15e9387385128ebaf32498d4af268963 Mon Sep 17 00:00:00 2001 From: "J. Nick Koston" Date: Sat, 12 Nov 2022 20:39:04 -0600 Subject: [PATCH] feat: add gap parser (#6) --- src/bluetooth_data_tools/__init__.py | 14 +- src/bluetooth_data_tools/gap.py | 145 +++++++++++++++++++ tests/test_gap.py | 200 +++++++++++++++++++++++++++ 3 files changed, 357 insertions(+), 2 deletions(-) create mode 100644 src/bluetooth_data_tools/gap.py create mode 100644 tests/test_gap.py diff --git a/src/bluetooth_data_tools/__init__.py b/src/bluetooth_data_tools/__init__.py index 3060686..5b4134c 100644 --- a/src/bluetooth_data_tools/__init__.py +++ b/src/bluetooth_data_tools/__init__.py @@ -1,10 +1,16 @@ """Bluetooth data tools.""" from __future__ import annotations -__version__ = "0.2.0" +from struct import Struct +from .gap import ( + BLEGAPAdvertisement, + BLEGAPType, + decode_advertisement_data, + parse_advertisement_data, +) -from struct import Struct +__version__ = "0.2.0" L_PACK = Struct(">L") @@ -14,6 +20,10 @@ "newest_manufacturer_data", "human_readable_name", "short_address", + "BLEGAPType", + "BLEGAPAdvertisement", + "decode_advertisement_data", + "parse_advertisement_data", ] diff --git a/src/bluetooth_data_tools/gap.py b/src/bluetooth_data_tools/gap.py new file mode 100644 index 0000000..8262737 --- /dev/null +++ b/src/bluetooth_data_tools/gap.py @@ -0,0 +1,145 @@ +"""GATT Advertisement and Scan Response Data (GAP).""" + + +from __future__ import annotations + +import logging +from collections.abc import Iterable +from dataclasses import dataclass +from enum import IntEnum +from uuid import UUID + +BLE_UUID = "0000-1000-8000-00805f9b34fb" +_LOGGER = logging.getLogger(__name__) + + +@dataclass +class BLEGAPAdvertisement: + + local_name: str | None + service_uuids: list[str] + service_data: dict[str, bytes] + manufacturer_data: dict[int, bytes] + tx_power: int | None + + +class BLEGAPType(IntEnum): + """Advertising data types.""" + + TYPE_UNKNOWN = 0x00 + TYPE_FLAGS = 0x01 + TYPE_16BIT_SERVICE_UUID_MORE_AVAILABLE = 0x02 + TYPE_16BIT_SERVICE_UUID_COMPLETE = 0x03 + TYPE_32BIT_SERVICE_UUID_MORE_AVAILABLE = 0x04 + TYPE_32BIT_SERVICE_UUID_COMPLETE = 0x05 + TYPE_128BIT_SERVICE_UUID_MORE_AVAILABLE = 0x06 + TYPE_128BIT_SERVICE_UUID_COMPLETE = 0x07 + TYPE_SHORT_LOCAL_NAME = 0x08 + TYPE_COMPLETE_LOCAL_NAME = 0x09 + TYPE_TX_POWER_LEVEL = 0x0A + TYPE_CLASS_OF_DEVICE = 0x0D + TYPE_SIMPLE_PAIRING_HASH_C = 0x0E + TYPE_SIMPLE_PAIRING_RANDOMIZER_R = 0x0F + TYPE_SECURITY_MANAGER_TK_VALUE = 0x10 + TYPE_SECURITY_MANAGER_OOB_FLAGS = 0x11 + TYPE_SLAVE_CONNECTION_INTERVAL_RANGE = 0x12 + TYPE_SOLICITED_SERVICE_UUIDS_16BIT = 0x14 + TYPE_SOLICITED_SERVICE_UUIDS_128BIT = 0x15 + TYPE_SERVICE_DATA = 0x16 + TYPE_PUBLIC_TARGET_ADDRESS = 0x17 + TYPE_RANDOM_TARGET_ADDRESS = 0x18 + TYPE_APPEARANCE = 0x19 + TYPE_ADVERTISING_INTERVAL = 0x1A + TYPE_LE_BLUETOOTH_DEVICE_ADDRESS = 0x1B + TYPE_LE_ROLE = 0x1C + TYPE_SIMPLE_PAIRING_HASH_C256 = 0x1D + TYPE_SIMPLE_PAIRING_RANDOMIZER_R256 = 0x1E + TYPE_SERVICE_DATA_32BIT_UUID = 0x20 + TYPE_SERVICE_DATA_128BIT_UUID = 0x21 + TYPE_URI = 0x24 + TYPE_3D_INFORMATION_DATA = 0x3D + TYPE_MANUFACTURER_SPECIFIC_DATA = 0xFF + + +_BLEGAPType_MAP = {gap_ad.value: gap_ad for gap_ad in BLEGAPType} + + +def decode_advertisement_data( + encoded_struct: bytes, +) -> Iterable[tuple[BLEGAPType, bytes]]: + """Decode a BLE GAP AD structure.""" + offset = 0 + while offset < len(encoded_struct): + try: + length = encoded_struct[offset] + if not length: + return + type_ = encoded_struct[offset + 1] + if not type_: + return + start = offset + 2 + end = start + length - 1 + value = encoded_struct[start:end] + except IndexError as ex: + _LOGGER.error( + "Invalid BLE GAP AD structure at offset %s: %s (%s)", + offset, + encoded_struct, + ex, + ) + return + + yield _BLEGAPType_MAP.get(type_, BLEGAPType.TYPE_UNKNOWN), value + offset += 1 + length + + +def parse_advertisement_data( + data: Iterable[bytes], +) -> BLEGAPAdvertisement: + """Parse advertisement data.""" + manufacturer_data: dict[int, bytes] = {} + service_data: dict[str, bytes] = {} + service_uuids: list[str] = [] + local_name: str | None = None + tx_power: int | None = None + + for gap_data in data: + for gap_type, gap_value in decode_advertisement_data(gap_data): + if gap_type == BLEGAPType.TYPE_SHORT_LOCAL_NAME and not local_name: + local_name = gap_value.decode("utf-8", errors="replace") + elif gap_type == BLEGAPType.TYPE_COMPLETE_LOCAL_NAME: + local_name = gap_value.decode("utf-8", errors="replace") + elif gap_type == BLEGAPType.TYPE_MANUFACTURER_SPECIFIC_DATA: + manufacturer_id = int.from_bytes(gap_value[:2], "little") + manufacturer_data[manufacturer_id] = gap_value[2:] + elif gap_type in { + BLEGAPType.TYPE_16BIT_SERVICE_UUID_COMPLETE, + BLEGAPType.TYPE_16BIT_SERVICE_UUID_MORE_AVAILABLE, + }: + uuid_int = int.from_bytes(gap_value[:2], "little") + service_uuids.append(f"0000{uuid_int:04x}-{BLE_UUID}") + elif gap_type in { + BLEGAPType.TYPE_128BIT_SERVICE_UUID_MORE_AVAILABLE, + BLEGAPType.TYPE_128BIT_SERVICE_UUID_COMPLETE, + }: + uuid_str = str(UUID(int=int.from_bytes(gap_value[:16], "little"))) + service_uuids.append(uuid_str) + elif gap_type == BLEGAPType.TYPE_SERVICE_DATA: + uuid_int = int.from_bytes(gap_value[:2], "little") + service_data[f"0000{uuid_int:04x}-{BLE_UUID}"] = gap_value[2:] + elif gap_type == BLEGAPType.TYPE_SERVICE_DATA_32BIT_UUID: + uuid_int = int.from_bytes(gap_value[:4], "little") + service_data[f"{uuid_int:08x}-{BLE_UUID}"] = gap_value[4:] + elif gap_type == BLEGAPType.TYPE_SERVICE_DATA_128BIT_UUID: + uuid_str = str(UUID(int=int.from_bytes(gap_value[:16], "little"))) + service_data[uuid_str] = gap_value[16:] + elif gap_type == BLEGAPType.TYPE_TX_POWER_LEVEL: + tx_power = int.from_bytes(gap_value, "little", signed=True) + + return BLEGAPAdvertisement( + local_name, + service_uuids, + service_data, + manufacturer_data, + tx_power, + ) diff --git a/tests/test_gap.py b/tests/test_gap.py new file mode 100644 index 0000000..c98b8c0 --- /dev/null +++ b/tests/test_gap.py @@ -0,0 +1,200 @@ +import base64 + +from bluetooth_data_tools import parse_advertisement_data + + +def test_parse_advertisement_data_Prodigio_D83567A4F5A5(): + data = [ + base64.b64decode("AgoEFglQcm9kaWdpb19EODM1NjdBNEY1QTU="), + base64.b64decode("AgEGEQYbxdWlAgCqneMRKvIQGaoGCf8CJUQJgAcAAg=="), + ] + + adv = parse_advertisement_data(data) + + assert adv.local_name == "Prodigio_D83567A4F5A5" + assert adv.service_uuids == ["06aa1910-f22a-11e3-9daa-0002a5d5c51b"] + assert adv.service_data == {} + assert adv.manufacturer_data == {9474: b"D\t\x80\x07\x00\x02"} + assert adv.tx_power == 4 + + +def test_parse_advertisement_data_unknown_apple_device(): + data = [ + base64.b64decode("AgEaAgoFCv9MABAFChx3+Vs="), + ] + + adv = parse_advertisement_data(data) + + assert adv.local_name is None + assert adv.service_uuids == [] + assert adv.service_data == {} + assert adv.manufacturer_data == {76: b"\x10\x05\n\x1cw\xf9["} + assert adv.tx_power == 5 + + +def test_parse_advertisement_data_empty(): + data = [ + b"\x00", + ] + + adv = parse_advertisement_data(data) + + assert adv.local_name is None + assert adv.service_uuids == [] + assert adv.service_data == {} + assert adv.manufacturer_data == {} + assert adv.tx_power is None + + +def test_parse_advertisement_data_flags_only(): + data = [ + b"\x01\x01\x06", + ] + + adv = parse_advertisement_data(data) + + assert adv.local_name is None + assert adv.service_uuids == [] + assert adv.service_data == {} + assert adv.manufacturer_data == {} + assert adv.tx_power is None + + +def test_parse_advertisement_data_ignores_invalid(): + data = [ + b"\x02\x01\x1a\x02\n\x05\n\xffL\x00\x10\x05\n\x1cw\xf9[\x02\x01", + ] + + adv = parse_advertisement_data(data) + + assert adv.local_name is None + assert adv.service_uuids == [] + assert adv.service_data == {} + assert adv.manufacturer_data == {76: b"\x10\x05\n\x1cw\xf9["} + assert adv.tx_power == 5 + + +def test_parse_advertisement_data_ignores_zero_type(): + data = [ + b"\x02\x01\x1a\x02\n\x05\n\xffL\x00\x10\x05\n\x1cw\xf9[\x02\x00", + ] + + adv = parse_advertisement_data(data) + + assert adv.local_name is None + assert adv.service_uuids == [] + assert adv.service_data == {} + assert adv.manufacturer_data == {76: b"\x10\x05\n\x1cw\xf9["} + assert adv.tx_power == 5 + + +def test_parse_advertisement_data_unknown_fd3d(): + data = [ + base64.b64decode("AgEGD/9pCWBV+Tw02tgAEDEAAA=="), + base64.b64decode("BhY9/WcAZA=="), + ] + + adv = parse_advertisement_data(data) + + assert adv.local_name is None + assert adv.service_uuids == [] + assert adv.service_data == {"0000fd3d-0000-1000-8000-00805f9b34fb": b"g\x00d"} + assert adv.manufacturer_data == {2409: b"`U\xf9<4\xda\xd8\x00\x101\x00\x00"} + assert adv.tx_power is None + + +def test_parse_advertisement_data_moat(): + data = [ + base64.b64decode("AgEGAwMAEBUWABDfeeOmErMVUHBjVGIcb7kL//8="), + base64.b64decode("AgoAAwMAIAsWAFDfeeOmErO5CwgJTW9hdF9TMg=="), + ] + + adv = parse_advertisement_data(data) + + assert adv.local_name == "Moat_S2" + assert adv.service_uuids == [ + "00001000-0000-1000-8000-00805f9b34fb", + "00002000-0000-1000-8000-00805f9b34fb", + ] + assert adv.service_data == { + "00001000-0000-1000-8000-00805f9b34fb": b"\xdfy\xe3\xa6\x12\xb3\x15PpcTb" + b"\x1co\xb9\x0b\xff\xff", + "00005000-0000-1000-8000-00805f9b34fb": b"\xdfy\xe3\xa6\x12\xb3\xb9\x0b", + } + assert adv.manufacturer_data == {} + assert adv.tx_power == 0 + + +def test_parse_advertisement_data_unknown_apple_215(): + data = [ + base64.b64decode("AgEGGv9MAAIV1Ubfl0dXR+++CT4ty90MdxU2zcm1"), + ] + + adv = parse_advertisement_data(data) + + assert adv.local_name is None + assert adv.service_uuids == [] + assert adv.service_data == {} + assert adv.manufacturer_data == { + 76: b"\x02\x15\xd5F\xdf\x97GWG\xef\xbe\t>-\xcb\xdd\x0cw\x156\xcd\xc9\xb5" + } + assert adv.tx_power is None + + +def test_parse_advertisement_data_oral_b_toothbrush(): + data = [ + base64.b64decode("AgEGDv/cAAYyawNSAAEECQAEAwIN/g=="), + base64.b64decode("EglPcmFsLUIgVG9vdGhicnVzaAUSEABQAAIKAA=="), + ] + + adv = parse_advertisement_data(data) + + assert adv.local_name == "Oral-B Toothbrush" + assert adv.service_uuids == ["0000fe0d-0000-1000-8000-00805f9b34fb"] + assert adv.service_data == {} + assert adv.manufacturer_data == {220: b"\x062k\x03R\x00\x01\x04\t\x00\x04"} + assert adv.tx_power == 0 + + +def test_parse_advertisement_short_local_name(): + data = [ + base64.b64decode("AgEGFv9MAAYxAOTEm+77PgUADQABAmMRIGUECE5hbg=="), + ] + + adv = parse_advertisement_data(data) + + assert adv.local_name == "Nan" + assert adv.service_uuids == [] + assert adv.service_data == {} + assert adv.manufacturer_data == { + 76: b"\x061\x00\xe4\xc4\x9b\xee\xfb>\x05\x00\r\x00\x01\x02c\x11 e" + } + assert adv.tx_power is None + + +def test_parse_advertisement_data_32bit_service_data(): + data = [ + b"\x07\x20\x1a\x02\n\x05\n\xff", + ] + + adv = parse_advertisement_data(data) + + assert adv.local_name is None + assert adv.service_uuids == [] + assert adv.service_data == {"050a021a-0000-1000-8000-00805f9b34fb": b"\n\xff"} + assert adv.manufacturer_data == {} + assert adv.tx_power is None + + +def test_parse_advertisement_data_128bit_service_data(): + data = [ + b"\x12\x21\x1a\x02\n\x05\n\xff\x062k\x03R\x00\x01\x04\t\x00\x04", + ] + + adv = parse_advertisement_data(data) + + assert adv.local_name is None + assert adv.service_uuids == [] + assert adv.service_data == {"00090401-0052-036b-3206-ff0a050a021a": b"\x04"} + assert adv.manufacturer_data == {} + assert adv.tx_power is None