Skip to content

Commit

Permalink
feat: add gap parser (#6)
Browse files Browse the repository at this point in the history
  • Loading branch information
bdraco committed Nov 13, 2022
1 parent 2b4fd66 commit dcb1d86
Show file tree
Hide file tree
Showing 3 changed files with 357 additions and 2 deletions.
14 changes: 12 additions & 2 deletions src/bluetooth_data_tools/__init__.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,16 @@
"""Bluetooth data tools."""
from __future__ import annotations

__version__ = "0.2.0"
from struct import Struct

from .gap import (
BLEGAPAdvertisement,
BLEGAPType,
decode_advertisement_data,
parse_advertisement_data,
)

from struct import Struct
__version__ = "0.2.0"

L_PACK = Struct(">L")

Expand All @@ -14,6 +20,10 @@
"newest_manufacturer_data",
"human_readable_name",
"short_address",
"BLEGAPType",
"BLEGAPAdvertisement",
"decode_advertisement_data",
"parse_advertisement_data",
]


Expand Down
145 changes: 145 additions & 0 deletions src/bluetooth_data_tools/gap.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
"""GATT Advertisement and Scan Response Data (GAP)."""


from __future__ import annotations

import logging
from collections.abc import Iterable
from dataclasses import dataclass
from enum import IntEnum
from uuid import UUID

BLE_UUID = "0000-1000-8000-00805f9b34fb"
_LOGGER = logging.getLogger(__name__)


@dataclass
class BLEGAPAdvertisement:

local_name: str | None
service_uuids: list[str]
service_data: dict[str, bytes]
manufacturer_data: dict[int, bytes]
tx_power: int | None


class BLEGAPType(IntEnum):
"""Advertising data types."""

TYPE_UNKNOWN = 0x00
TYPE_FLAGS = 0x01
TYPE_16BIT_SERVICE_UUID_MORE_AVAILABLE = 0x02
TYPE_16BIT_SERVICE_UUID_COMPLETE = 0x03
TYPE_32BIT_SERVICE_UUID_MORE_AVAILABLE = 0x04
TYPE_32BIT_SERVICE_UUID_COMPLETE = 0x05
TYPE_128BIT_SERVICE_UUID_MORE_AVAILABLE = 0x06
TYPE_128BIT_SERVICE_UUID_COMPLETE = 0x07
TYPE_SHORT_LOCAL_NAME = 0x08
TYPE_COMPLETE_LOCAL_NAME = 0x09
TYPE_TX_POWER_LEVEL = 0x0A
TYPE_CLASS_OF_DEVICE = 0x0D
TYPE_SIMPLE_PAIRING_HASH_C = 0x0E
TYPE_SIMPLE_PAIRING_RANDOMIZER_R = 0x0F
TYPE_SECURITY_MANAGER_TK_VALUE = 0x10
TYPE_SECURITY_MANAGER_OOB_FLAGS = 0x11
TYPE_SLAVE_CONNECTION_INTERVAL_RANGE = 0x12
TYPE_SOLICITED_SERVICE_UUIDS_16BIT = 0x14
TYPE_SOLICITED_SERVICE_UUIDS_128BIT = 0x15
TYPE_SERVICE_DATA = 0x16
TYPE_PUBLIC_TARGET_ADDRESS = 0x17
TYPE_RANDOM_TARGET_ADDRESS = 0x18
TYPE_APPEARANCE = 0x19
TYPE_ADVERTISING_INTERVAL = 0x1A
TYPE_LE_BLUETOOTH_DEVICE_ADDRESS = 0x1B
TYPE_LE_ROLE = 0x1C
TYPE_SIMPLE_PAIRING_HASH_C256 = 0x1D
TYPE_SIMPLE_PAIRING_RANDOMIZER_R256 = 0x1E
TYPE_SERVICE_DATA_32BIT_UUID = 0x20
TYPE_SERVICE_DATA_128BIT_UUID = 0x21
TYPE_URI = 0x24
TYPE_3D_INFORMATION_DATA = 0x3D
TYPE_MANUFACTURER_SPECIFIC_DATA = 0xFF


_BLEGAPType_MAP = {gap_ad.value: gap_ad for gap_ad in BLEGAPType}


def decode_advertisement_data(
encoded_struct: bytes,
) -> Iterable[tuple[BLEGAPType, bytes]]:
"""Decode a BLE GAP AD structure."""
offset = 0
while offset < len(encoded_struct):
try:
length = encoded_struct[offset]
if not length:
return
type_ = encoded_struct[offset + 1]
if not type_:
return
start = offset + 2
end = start + length - 1
value = encoded_struct[start:end]
except IndexError as ex:
_LOGGER.error(
"Invalid BLE GAP AD structure at offset %s: %s (%s)",
offset,
encoded_struct,
ex,
)
return

yield _BLEGAPType_MAP.get(type_, BLEGAPType.TYPE_UNKNOWN), value
offset += 1 + length


def parse_advertisement_data(
data: Iterable[bytes],
) -> BLEGAPAdvertisement:
"""Parse advertisement data."""
manufacturer_data: dict[int, bytes] = {}
service_data: dict[str, bytes] = {}
service_uuids: list[str] = []
local_name: str | None = None
tx_power: int | None = None

for gap_data in data:
for gap_type, gap_value in decode_advertisement_data(gap_data):
if gap_type == BLEGAPType.TYPE_SHORT_LOCAL_NAME and not local_name:
local_name = gap_value.decode("utf-8", errors="replace")
elif gap_type == BLEGAPType.TYPE_COMPLETE_LOCAL_NAME:
local_name = gap_value.decode("utf-8", errors="replace")
elif gap_type == BLEGAPType.TYPE_MANUFACTURER_SPECIFIC_DATA:
manufacturer_id = int.from_bytes(gap_value[:2], "little")
manufacturer_data[manufacturer_id] = gap_value[2:]
elif gap_type in {
BLEGAPType.TYPE_16BIT_SERVICE_UUID_COMPLETE,
BLEGAPType.TYPE_16BIT_SERVICE_UUID_MORE_AVAILABLE,
}:
uuid_int = int.from_bytes(gap_value[:2], "little")
service_uuids.append(f"0000{uuid_int:04x}-{BLE_UUID}")
elif gap_type in {
BLEGAPType.TYPE_128BIT_SERVICE_UUID_MORE_AVAILABLE,
BLEGAPType.TYPE_128BIT_SERVICE_UUID_COMPLETE,
}:
uuid_str = str(UUID(int=int.from_bytes(gap_value[:16], "little")))
service_uuids.append(uuid_str)
elif gap_type == BLEGAPType.TYPE_SERVICE_DATA:
uuid_int = int.from_bytes(gap_value[:2], "little")
service_data[f"0000{uuid_int:04x}-{BLE_UUID}"] = gap_value[2:]
elif gap_type == BLEGAPType.TYPE_SERVICE_DATA_32BIT_UUID:
uuid_int = int.from_bytes(gap_value[:4], "little")
service_data[f"{uuid_int:08x}-{BLE_UUID}"] = gap_value[4:]
elif gap_type == BLEGAPType.TYPE_SERVICE_DATA_128BIT_UUID:
uuid_str = str(UUID(int=int.from_bytes(gap_value[:16], "little")))
service_data[uuid_str] = gap_value[16:]
elif gap_type == BLEGAPType.TYPE_TX_POWER_LEVEL:
tx_power = int.from_bytes(gap_value, "little", signed=True)

return BLEGAPAdvertisement(
local_name,
service_uuids,
service_data,
manufacturer_data,
tx_power,
)
200 changes: 200 additions & 0 deletions tests/test_gap.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,200 @@
import base64

from bluetooth_data_tools import parse_advertisement_data


def test_parse_advertisement_data_Prodigio_D83567A4F5A5():
data = [
base64.b64decode("AgoEFglQcm9kaWdpb19EODM1NjdBNEY1QTU="),
base64.b64decode("AgEGEQYbxdWlAgCqneMRKvIQGaoGCf8CJUQJgAcAAg=="),
]

adv = parse_advertisement_data(data)

assert adv.local_name == "Prodigio_D83567A4F5A5"
assert adv.service_uuids == ["06aa1910-f22a-11e3-9daa-0002a5d5c51b"]
assert adv.service_data == {}
assert adv.manufacturer_data == {9474: b"D\t\x80\x07\x00\x02"}
assert adv.tx_power == 4


def test_parse_advertisement_data_unknown_apple_device():
data = [
base64.b64decode("AgEaAgoFCv9MABAFChx3+Vs="),
]

adv = parse_advertisement_data(data)

assert adv.local_name is None
assert adv.service_uuids == []
assert adv.service_data == {}
assert adv.manufacturer_data == {76: b"\x10\x05\n\x1cw\xf9["}
assert adv.tx_power == 5


def test_parse_advertisement_data_empty():
data = [
b"\x00",
]

adv = parse_advertisement_data(data)

assert adv.local_name is None
assert adv.service_uuids == []
assert adv.service_data == {}
assert adv.manufacturer_data == {}
assert adv.tx_power is None


def test_parse_advertisement_data_flags_only():
data = [
b"\x01\x01\x06",
]

adv = parse_advertisement_data(data)

assert adv.local_name is None
assert adv.service_uuids == []
assert adv.service_data == {}
assert adv.manufacturer_data == {}
assert adv.tx_power is None


def test_parse_advertisement_data_ignores_invalid():
data = [
b"\x02\x01\x1a\x02\n\x05\n\xffL\x00\x10\x05\n\x1cw\xf9[\x02\x01",
]

adv = parse_advertisement_data(data)

assert adv.local_name is None
assert adv.service_uuids == []
assert adv.service_data == {}
assert adv.manufacturer_data == {76: b"\x10\x05\n\x1cw\xf9["}
assert adv.tx_power == 5


def test_parse_advertisement_data_ignores_zero_type():
data = [
b"\x02\x01\x1a\x02\n\x05\n\xffL\x00\x10\x05\n\x1cw\xf9[\x02\x00",
]

adv = parse_advertisement_data(data)

assert adv.local_name is None
assert adv.service_uuids == []
assert adv.service_data == {}
assert adv.manufacturer_data == {76: b"\x10\x05\n\x1cw\xf9["}
assert adv.tx_power == 5


def test_parse_advertisement_data_unknown_fd3d():
data = [
base64.b64decode("AgEGD/9pCWBV+Tw02tgAEDEAAA=="),
base64.b64decode("BhY9/WcAZA=="),
]

adv = parse_advertisement_data(data)

assert adv.local_name is None
assert adv.service_uuids == []
assert adv.service_data == {"0000fd3d-0000-1000-8000-00805f9b34fb": b"g\x00d"}
assert adv.manufacturer_data == {2409: b"`U\xf9<4\xda\xd8\x00\x101\x00\x00"}
assert adv.tx_power is None


def test_parse_advertisement_data_moat():
data = [
base64.b64decode("AgEGAwMAEBUWABDfeeOmErMVUHBjVGIcb7kL//8="),
base64.b64decode("AgoAAwMAIAsWAFDfeeOmErO5CwgJTW9hdF9TMg=="),
]

adv = parse_advertisement_data(data)

assert adv.local_name == "Moat_S2"
assert adv.service_uuids == [
"00001000-0000-1000-8000-00805f9b34fb",
"00002000-0000-1000-8000-00805f9b34fb",
]
assert adv.service_data == {
"00001000-0000-1000-8000-00805f9b34fb": b"\xdfy\xe3\xa6\x12\xb3\x15PpcTb"
b"\x1co\xb9\x0b\xff\xff",
"00005000-0000-1000-8000-00805f9b34fb": b"\xdfy\xe3\xa6\x12\xb3\xb9\x0b",
}
assert adv.manufacturer_data == {}
assert adv.tx_power == 0


def test_parse_advertisement_data_unknown_apple_215():
data = [
base64.b64decode("AgEGGv9MAAIV1Ubfl0dXR+++CT4ty90MdxU2zcm1"),
]

adv = parse_advertisement_data(data)

assert adv.local_name is None
assert adv.service_uuids == []
assert adv.service_data == {}
assert adv.manufacturer_data == {
76: b"\x02\x15\xd5F\xdf\x97GWG\xef\xbe\t>-\xcb\xdd\x0cw\x156\xcd\xc9\xb5"
}
assert adv.tx_power is None


def test_parse_advertisement_data_oral_b_toothbrush():
data = [
base64.b64decode("AgEGDv/cAAYyawNSAAEECQAEAwIN/g=="),
base64.b64decode("EglPcmFsLUIgVG9vdGhicnVzaAUSEABQAAIKAA=="),
]

adv = parse_advertisement_data(data)

assert adv.local_name == "Oral-B Toothbrush"
assert adv.service_uuids == ["0000fe0d-0000-1000-8000-00805f9b34fb"]
assert adv.service_data == {}
assert adv.manufacturer_data == {220: b"\x062k\x03R\x00\x01\x04\t\x00\x04"}
assert adv.tx_power == 0


def test_parse_advertisement_short_local_name():
data = [
base64.b64decode("AgEGFv9MAAYxAOTEm+77PgUADQABAmMRIGUECE5hbg=="),
]

adv = parse_advertisement_data(data)

assert adv.local_name == "Nan"
assert adv.service_uuids == []
assert adv.service_data == {}
assert adv.manufacturer_data == {
76: b"\x061\x00\xe4\xc4\x9b\xee\xfb>\x05\x00\r\x00\x01\x02c\x11 e"
}
assert adv.tx_power is None


def test_parse_advertisement_data_32bit_service_data():
data = [
b"\x07\x20\x1a\x02\n\x05\n\xff",
]

adv = parse_advertisement_data(data)

assert adv.local_name is None
assert adv.service_uuids == []
assert adv.service_data == {"050a021a-0000-1000-8000-00805f9b34fb": b"\n\xff"}
assert adv.manufacturer_data == {}
assert adv.tx_power is None


def test_parse_advertisement_data_128bit_service_data():
data = [
b"\x12\x21\x1a\x02\n\x05\n\xff\x062k\x03R\x00\x01\x04\t\x00\x04",
]

adv = parse_advertisement_data(data)

assert adv.local_name is None
assert adv.service_uuids == []
assert adv.service_data == {"00090401-0052-036b-3206-ff0a050a021a": b"\x04"}
assert adv.manufacturer_data == {}
assert adv.tx_power is None

0 comments on commit dcb1d86

Please sign in to comment.