Skip to content

Commit

Permalink
Implement Pandora extended advertising
Browse files Browse the repository at this point in the history
Support setting the PHY of Pandora scans.
  • Loading branch information
BenjaminLawson committed Mar 7, 2024
1 parent de8f3d9 commit 0c61c2b
Show file tree
Hide file tree
Showing 3 changed files with 143 additions and 11 deletions.
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@ __pycache__
# generated by setuptools_scm
bumble/_version.py
.vscode/launch.json
.vscode/settings.json
/.idea
venv/
.venv/
# snoop logs
out/
18 changes: 14 additions & 4 deletions bumble/device.py
Original file line number Diff line number Diff line change
Expand Up @@ -2112,6 +2112,20 @@ async def create_advertising_set(
Returns:
An AdvertisingSet instance.
"""
# Instantiate default values
if advertising_parameters is None:
advertising_parameters = AdvertisingParameters()

if (
not advertising_parameters.advertising_event_properties.is_legacy
and advertising_data
and scan_response_data
):
raise ValueError(
"Extended advertisements can't have both data and scan \
response data"
)

# Allocate a new handle
try:
advertising_handle = next(
Expand All @@ -2125,10 +2139,6 @@ async def create_advertising_set(
except StopIteration as exc:
raise RuntimeError("all valid advertising handles already in use") from exc

# Instantiate default values
if advertising_parameters is None:
advertising_parameters = AdvertisingParameters()

# Use the device's random address if a random address is needed but none was
# provided.
if (
Expand Down
133 changes: 126 additions & 7 deletions bumble/pandora/host.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,11 @@
DEVICE_DEFAULT_SCAN_INTERVAL,
DEVICE_DEFAULT_SCAN_WINDOW,
Advertisement,
AdvertisingParameters,
AdvertisingEventProperties,
AdvertisingType,
Device,
Phy,
)
from bumble.gatt import Service
from bumble.hci import (
Expand All @@ -47,6 +50,7 @@
from google.protobuf import any_pb2 # pytype: disable=pyi-error
from google.protobuf import empty_pb2 # pytype: disable=pyi-error
from pandora.host_grpc_aio import HostServicer
from pandora import host_pb2
from pandora.host_pb2 import (
NOT_CONNECTABLE,
NOT_DISCOVERABLE,
Expand Down Expand Up @@ -94,6 +98,18 @@
3: SECONDARY_CODED,
}

PRIMARY_PHY_TO_BUMBLE_PHY_MAP: Dict[PrimaryPhy, Phy] = {
PRIMARY_1M: Phy.LE_1M,
PRIMARY_CODED: Phy.LE_CODED,
}

SECONDARY_PHY_TO_BUMBLE_PHY_MAP: Dict[SecondaryPhy, Phy] = {
SECONDARY_NONE: Phy.LE_1M,
SECONDARY_1M: Phy.LE_1M,
SECONDARY_2M: Phy.LE_2M,
SECONDARY_CODED: Phy.LE_CODED,
}


class HostService(HostServicer):
waited_connections: Set[int]
Expand Down Expand Up @@ -281,10 +297,107 @@ def on_disconnection(_: None) -> None:
async def Advertise(
self, request: AdvertiseRequest, context: grpc.ServicerContext
) -> AsyncGenerator[AdvertiseResponse, None]:
if not request.legacy:
raise NotImplementedError(
"TODO: add support for extended advertising in Bumble"
)
try:
if request.legacy:
async for rsp in self.legacy_advertise(request, context):
yield rsp
else:
async for rsp in self.extended_advertise(request, context):
yield rsp
finally:
pass

async def extended_advertise(
self, request: AdvertiseRequest, context: grpc.ServicerContext
) -> AsyncGenerator[AdvertiseResponse, None]:
advertising_data = bytes(self.unpack_data_types(request.data))
scan_response_data = bytes(self.unpack_data_types(request.scan_response_data))
scannable = len(scan_response_data) != 0

advertising_event_properties = AdvertisingEventProperties(
is_connectable=request.connectable,
is_scannable=scannable,
is_directed=request.target is not None,
is_high_duty_cycle_directed_connectable=False,
is_legacy=False,
is_anonymous=False,
include_tx_power=False,
)

peer_address = Address.ANY
if request.target:
# Need to reverse bytes order since Bumble Address is using MSB.
target_bytes = bytes(reversed(request.target))
if request.target_variant() == "public":
peer_address = Address(target_bytes, Address.PUBLIC_DEVICE_ADDRESS)
else:
peer_address = Address(target_bytes, Address.RANDOM_DEVICE_ADDRESS)

advertising_parameters = AdvertisingParameters(
advertising_event_properties=advertising_event_properties,
own_address_type=request.own_address_type,
peer_address=peer_address,
primary_advertising_phy=PRIMARY_PHY_TO_BUMBLE_PHY_MAP[request.primary_phy],
secondary_advertising_phy=SECONDARY_PHY_TO_BUMBLE_PHY_MAP[
request.secondary_phy
],
)
if advertising_interval := request.interval:
advertising_parameters.advertising_interval_min = int(advertising_interval)
advertising_parameters.advertising_interval_max = int(advertising_interval)
if interval_range := request.interval_range:
advertising_parameters.advertising_interval_max += int(interval_range)

advertising_set = await self.device.create_advertising_set(
advertising_parameters=advertising_parameters,
advertising_data=advertising_data,
scan_response_data=scan_response_data,
)

pending_connection: asyncio.Future[
bumble.device.Connection
] = asyncio.get_running_loop().create_future()

if request.connectable:

def on_connection(connection: bumble.device.Connection) -> None:
if (
connection.transport == BT_LE_TRANSPORT
and connection.role == BT_PERIPHERAL_ROLE
):
pending_connection.set_result(connection)

self.device.on('connection', on_connection)

try:
# Advertise until RPC is canceled
while True:
if not advertising_set.enabled:
self.log.debug('Advertise (extended)')
await advertising_set.start()

if not request.connectable:
await asyncio.sleep(1)
continue

connection = await pending_connection
pending_connection = asyncio.get_running_loop().create_future()

cookie = any_pb2.Any(value=connection.handle.to_bytes(4, 'big'))
yield AdvertiseResponse(connection=Connection(cookie=cookie))

await asyncio.sleep(1)
finally:
try:
self.log.debug('Stop Advertise (extended)')
await advertising_set.stop()
await advertising_set.remove()
except Exception:
pass

async def legacy_advertise(
self, request: AdvertiseRequest, context: grpc.ServicerContext
) -> AsyncGenerator[AdvertiseResponse, None]:
if advertising_interval := request.interval:
self.device.config.advertising_interval_min = int(advertising_interval)
self.device.config.advertising_interval_max = int(advertising_interval)
Expand Down Expand Up @@ -422,11 +535,16 @@ async def Scan(
self, request: ScanRequest, context: grpc.ServicerContext
) -> AsyncGenerator[ScanningResponse, None]:
# TODO: modify `start_scanning` to accept floats instead of int for ms values
if request.phys:
raise NotImplementedError("TODO: add support for `request.phys`")

self.log.debug('Scan')

scanning_phys = []
if PRIMARY_1M in request.phys:
scanning_phys.append(Phy.LE_1M)
if PRIMARY_CODED in request.phys:
scanning_phys.append(Phy.LE_CODED)
if not scanning_phys:
scanning_phys = [Phy.LE_1M, Phy.LE_CODED]

scan_queue: asyncio.Queue[Advertisement] = asyncio.Queue()
handler = self.device.on('advertisement', scan_queue.put_nowait)
await self.device.start_scanning(
Expand All @@ -439,6 +557,7 @@ async def Scan(
scan_window=int(request.window)
if request.window
else DEVICE_DEFAULT_SCAN_WINDOW,
scanning_phys=scanning_phys,
)

try:
Expand Down

0 comments on commit 0c61c2b

Please sign in to comment.