Skip to content

Commit

Permalink
Merge pull request #438 from BenjaminLawson/pandora-extended-advertising
Browse files Browse the repository at this point in the history
Implement Pandora extended advertising
  • Loading branch information
barbibulle committed Mar 8, 2024
2 parents e554bd1 + 256044a commit 1f3aee5
Show file tree
Hide file tree
Showing 3 changed files with 156 additions and 11 deletions.
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@ __pycache__
# generated by setuptools_scm
bumble/_version.py
.vscode/launch.json
.vscode/settings.json
/.idea
venv/
.venv/
# snoop logs
out/
20 changes: 15 additions & 5 deletions bumble/device.py
Original file line number Diff line number Diff line change
Expand Up @@ -2112,6 +2112,20 @@ async def create_advertising_set(
Returns:
An AdvertisingSet instance.
"""
# Instantiate default values
if advertising_parameters is None:
advertising_parameters = AdvertisingParameters()

if (
not advertising_parameters.advertising_event_properties.is_legacy
and advertising_data
and scan_response_data
):
raise ValueError(
"Extended advertisements can't have both data and scan \
response data"
)

# Allocate a new handle
try:
advertising_handle = next(
Expand All @@ -2125,10 +2139,6 @@ async def create_advertising_set(
except StopIteration as exc:
raise RuntimeError("all valid advertising handles already in use") from exc

# Instantiate default values
if advertising_parameters is None:
advertising_parameters = AdvertisingParameters()

# Use the device's random address if a random address is needed but none was
# provided.
if (
Expand Down Expand Up @@ -2222,7 +2232,7 @@ async def start_scanning(
scan_window: int = DEVICE_DEFAULT_SCAN_WINDOW, # Scan window in ms
own_address_type: int = OwnAddressType.RANDOM,
filter_duplicates: bool = False,
scanning_phys: Tuple[int, int] = (HCI_LE_1M_PHY, HCI_LE_CODED_PHY),
scanning_phys: List[int] = [HCI_LE_1M_PHY, HCI_LE_CODED_PHY],
) -> None:
# Check that the arguments are legal
if scan_interval < scan_window:
Expand Down
144 changes: 138 additions & 6 deletions bumble/pandora/host.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,11 @@
DEVICE_DEFAULT_SCAN_INTERVAL,
DEVICE_DEFAULT_SCAN_WINDOW,
Advertisement,
AdvertisingParameters,
AdvertisingEventProperties,
AdvertisingType,
Device,
Phy,
)
from bumble.gatt import Service
from bumble.hci import (
Expand All @@ -47,6 +50,7 @@
from google.protobuf import any_pb2 # pytype: disable=pyi-error
from google.protobuf import empty_pb2 # pytype: disable=pyi-error
from pandora.host_grpc_aio import HostServicer
from pandora import host_pb2
from pandora.host_pb2 import (
NOT_CONNECTABLE,
NOT_DISCOVERABLE,
Expand Down Expand Up @@ -94,6 +98,25 @@
3: SECONDARY_CODED,
}

PRIMARY_PHY_TO_BUMBLE_PHY_MAP: Dict[PrimaryPhy, Phy] = {
PRIMARY_1M: Phy.LE_1M,
PRIMARY_CODED: Phy.LE_CODED,
}

SECONDARY_PHY_TO_BUMBLE_PHY_MAP: Dict[SecondaryPhy, Phy] = {
SECONDARY_NONE: Phy.LE_1M,
SECONDARY_1M: Phy.LE_1M,
SECONDARY_2M: Phy.LE_2M,
SECONDARY_CODED: Phy.LE_CODED,
}

OWN_ADDRESS_MAP: Dict[host_pb2.OwnAddressType, bumble.hci.OwnAddressType] = {
host_pb2.PUBLIC: bumble.hci.OwnAddressType.PUBLIC,
host_pb2.RANDOM: bumble.hci.OwnAddressType.RANDOM,
host_pb2.RESOLVABLE_OR_PUBLIC: bumble.hci.OwnAddressType.RESOLVABLE_OR_PUBLIC,
host_pb2.RESOLVABLE_OR_RANDOM: bumble.hci.OwnAddressType.RESOLVABLE_OR_RANDOM,
}


class HostService(HostServicer):
waited_connections: Set[int]
Expand Down Expand Up @@ -281,10 +304,113 @@ def on_disconnection(_: None) -> None:
async def Advertise(
self, request: AdvertiseRequest, context: grpc.ServicerContext
) -> AsyncGenerator[AdvertiseResponse, None]:
if not request.legacy:
raise NotImplementedError(
"TODO: add support for extended advertising in Bumble"
try:
if request.legacy:
async for rsp in self.legacy_advertise(request, context):
yield rsp
else:
async for rsp in self.extended_advertise(request, context):
yield rsp
finally:
pass

async def extended_advertise(
self, request: AdvertiseRequest, context: grpc.ServicerContext
) -> AsyncGenerator[AdvertiseResponse, None]:
advertising_data = bytes(self.unpack_data_types(request.data))
scan_response_data = bytes(self.unpack_data_types(request.scan_response_data))
scannable = len(scan_response_data) != 0

advertising_event_properties = AdvertisingEventProperties(
is_connectable=request.connectable,
is_scannable=scannable,
is_directed=request.target is not None,
is_high_duty_cycle_directed_connectable=False,
is_legacy=False,
is_anonymous=False,
include_tx_power=False,
)

peer_address = Address.ANY
if request.target:
# Need to reverse bytes order since Bumble Address is using MSB.
target_bytes = bytes(reversed(request.target))
if request.target_variant() == "public":
peer_address = Address(target_bytes, Address.PUBLIC_DEVICE_ADDRESS)
else:
peer_address = Address(target_bytes, Address.RANDOM_DEVICE_ADDRESS)

advertising_parameters = AdvertisingParameters(
advertising_event_properties=advertising_event_properties,
own_address_type=OWN_ADDRESS_MAP[request.own_address_type],
peer_address=peer_address,
primary_advertising_phy=PRIMARY_PHY_TO_BUMBLE_PHY_MAP[request.primary_phy],
secondary_advertising_phy=SECONDARY_PHY_TO_BUMBLE_PHY_MAP[
request.secondary_phy
],
)
if advertising_interval := request.interval:
advertising_parameters.primary_advertising_interval_min = int(
advertising_interval
)
advertising_parameters.primary_advertising_interval_max = int(
advertising_interval
)
if interval_range := request.interval_range:
advertising_parameters.primary_advertising_interval_max += int(
interval_range
)

advertising_set = await self.device.create_advertising_set(
advertising_parameters=advertising_parameters,
advertising_data=advertising_data,
scan_response_data=scan_response_data,
)

pending_connection: asyncio.Future[
bumble.device.Connection
] = asyncio.get_running_loop().create_future()

if request.connectable:

def on_connection(connection: bumble.device.Connection) -> None:
if (
connection.transport == BT_LE_TRANSPORT
and connection.role == BT_PERIPHERAL_ROLE
):
pending_connection.set_result(connection)

self.device.on('connection', on_connection)

try:
# Advertise until RPC is canceled
while True:
if not advertising_set.enabled:
self.log.debug('Advertise (extended)')
await advertising_set.start()

if not request.connectable:
await asyncio.sleep(1)
continue

connection = await pending_connection
pending_connection = asyncio.get_running_loop().create_future()

cookie = any_pb2.Any(value=connection.handle.to_bytes(4, 'big'))
yield AdvertiseResponse(connection=Connection(cookie=cookie))

await asyncio.sleep(1)
finally:
try:
self.log.debug('Stop Advertise (extended)')
await advertising_set.stop()
await advertising_set.remove()
except Exception:
pass

async def legacy_advertise(
self, request: AdvertiseRequest, context: grpc.ServicerContext
) -> AsyncGenerator[AdvertiseResponse, None]:
if advertising_interval := request.interval:
self.device.config.advertising_interval_min = int(advertising_interval)
self.device.config.advertising_interval_max = int(advertising_interval)
Expand Down Expand Up @@ -422,11 +548,16 @@ async def Scan(
self, request: ScanRequest, context: grpc.ServicerContext
) -> AsyncGenerator[ScanningResponse, None]:
# TODO: modify `start_scanning` to accept floats instead of int for ms values
if request.phys:
raise NotImplementedError("TODO: add support for `request.phys`")

self.log.debug('Scan')

scanning_phys = []
if PRIMARY_1M in request.phys:
scanning_phys.append(int(Phy.LE_1M))
if PRIMARY_CODED in request.phys:
scanning_phys.append(int(Phy.LE_CODED))
if not scanning_phys:
scanning_phys = [int(Phy.LE_1M), int(Phy.LE_CODED)]

scan_queue: asyncio.Queue[Advertisement] = asyncio.Queue()
handler = self.device.on('advertisement', scan_queue.put_nowait)
await self.device.start_scanning(
Expand All @@ -439,6 +570,7 @@ async def Scan(
scan_window=int(request.window)
if request.window
else DEVICE_DEFAULT_SCAN_WINDOW,
scanning_phys=scanning_phys,
)

try:
Expand Down

0 comments on commit 1f3aee5

Please sign in to comment.