-
Notifications
You must be signed in to change notification settings - Fork 65
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
attempt to fix pandora test (+3 squashed commits)
Squashed commits: [759372d] address PR comments [2f2a275] wip [cc86b98] wip wip address PR comments attempt to fix pandora test
- Loading branch information
1 parent
0903093
commit dea907b
Showing
10 changed files
with
1,987 additions
and
143 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,386 @@ | ||
# Copyright 2024 Google LLC | ||
# | ||
# Licensed under the Apache License, Version 2.0 (the "License"); | ||
# you may not use this file except in compliance with the License. | ||
# You may obtain a copy of the License at | ||
# | ||
# https://www.apache.org/licenses/LICENSE-2.0 | ||
# | ||
# Unless required by applicable law or agreed to in writing, software | ||
# distributed under the License is distributed on an "AS IS" BASIS, | ||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
# See the License for the specific language governing permissions and | ||
# limitations under the License. | ||
|
||
# ----------------------------------------------------------------------------- | ||
# Imports | ||
# ----------------------------------------------------------------------------- | ||
from __future__ import annotations | ||
import asyncio | ||
import dataclasses | ||
import logging | ||
import os | ||
from typing import Dict, Optional | ||
|
||
import click | ||
import pyee | ||
|
||
from bumble.colors import color | ||
import bumble.core | ||
import bumble.device | ||
import bumble.gatt | ||
import bumble.hci | ||
import bumble.profiles.bap | ||
import bumble.profiles.pbp | ||
import bumble.transport | ||
import bumble.utils | ||
|
||
|
||
# ----------------------------------------------------------------------------- | ||
# Logging | ||
# ----------------------------------------------------------------------------- | ||
logger = logging.getLogger(__name__) | ||
|
||
|
||
# ----------------------------------------------------------------------------- | ||
# Constants | ||
# ----------------------------------------------------------------------------- | ||
AURACAST_DEFAULT_DEVICE_NAME = "Bumble Auracast" | ||
AURACAST_DEFAULT_DEVICE_ADDRESS = bumble.hci.Address("F0:F1:F2:F3:F4:F5") | ||
|
||
|
||
# ----------------------------------------------------------------------------- | ||
# Discover Broadcasts | ||
# ----------------------------------------------------------------------------- | ||
class BroadcastDiscoverer: | ||
@dataclasses.dataclass | ||
class Broadcast(pyee.EventEmitter): | ||
name: str | ||
sync: bumble.device.PeriodicAdvertisingSync | ||
rssi: int = 0 | ||
public_broadcast_announcement: Optional[ | ||
bumble.profiles.pbp.PublicBroadcastAnnouncement | ||
] = None | ||
broadcast_audio_announcement: Optional[ | ||
bumble.profiles.bap.BroadcastAudioAnnouncement | ||
] = None | ||
basic_audio_announcement: Optional[ | ||
bumble.profiles.bap.BasicAudioAnnouncement | ||
] = None | ||
appearance: Optional[bumble.core.Appearance] = None | ||
biginfo: Optional[bumble.device.BIGInfoAdvertisement] = None | ||
|
||
def __post_init__(self) -> None: | ||
super().__init__() | ||
self.sync.on('establishment', self.on_sync_establishment) | ||
self.sync.on('loss', self.on_sync_loss) | ||
self.sync.on('periodic_advertisement', self.on_periodic_advertisement) | ||
self.sync.on('biginfo_advertisement', self.on_biginfo_advertisement) | ||
|
||
self.establishment_timeout_task = asyncio.create_task( | ||
self.wait_for_establishment() | ||
) | ||
|
||
async def wait_for_establishment(self) -> None: | ||
await asyncio.sleep(5.0) | ||
if self.sync.state == bumble.device.PeriodicAdvertisingSync.State.PENDING: | ||
print( | ||
color( | ||
'!!! Periodic advertisement sync not established in time, ' | ||
'canceling', | ||
'red', | ||
) | ||
) | ||
await self.sync.terminate() | ||
|
||
def update(self, advertisement: bumble.device.Advertisement) -> None: | ||
self.rssi = advertisement.rssi | ||
for service_data in advertisement.data.get_all( | ||
bumble.core.AdvertisingData.SERVICE_DATA | ||
): | ||
assert isinstance(service_data, tuple) | ||
service_uuid, data = service_data | ||
assert isinstance(data, bytes) | ||
|
||
if ( | ||
service_uuid | ||
== bumble.gatt.GATT_PUBLIC_BROADCAST_ANNOUNCEMENT_SERVICE | ||
): | ||
self.public_broadcast_announcement = ( | ||
bumble.profiles.pbp.PublicBroadcastAnnouncement.from_bytes(data) | ||
) | ||
continue | ||
|
||
if ( | ||
service_uuid | ||
== bumble.gatt.GATT_BROADCAST_AUDIO_ANNOUNCEMENT_SERVICE | ||
): | ||
self.broadcast_audio_announcement = ( | ||
bumble.profiles.bap.BroadcastAudioAnnouncement.from_bytes(data) | ||
) | ||
continue | ||
|
||
self.appearance = advertisement.data.get( # type: ignore[assignment] | ||
bumble.core.AdvertisingData.APPEARANCE | ||
) | ||
|
||
def print(self) -> None: | ||
print( | ||
color('Broadcast:', 'yellow'), | ||
self.sync.advertiser_address, | ||
color(self.sync.state.name, 'green'), | ||
) | ||
print(f' {color("Name", "cyan")}: {self.name}') | ||
if self.appearance: | ||
print(f' {color("Appearance", "cyan")}: {str(self.appearance)}') | ||
print(f' {color("RSSI", "cyan")}: {self.rssi}') | ||
print(f' {color("SID", "cyan")}: {self.sync.sid}') | ||
|
||
if self.broadcast_audio_announcement: | ||
print( | ||
f' {color("Broadcast ID", "cyan")}: ' | ||
f'{self.broadcast_audio_announcement.broadcast_id}' | ||
) | ||
|
||
if self.public_broadcast_announcement: | ||
print( | ||
f' {color("Features", "cyan")}: ' | ||
f'{self.public_broadcast_announcement.features}' | ||
) | ||
print( | ||
f' {color("Metadata", "cyan")}: ' | ||
f'{self.public_broadcast_announcement.metadata}' | ||
) | ||
|
||
if self.basic_audio_announcement: | ||
print(color(' Audio:', 'cyan')) | ||
print( | ||
color(' Presentation Delay:', 'magenta'), | ||
self.basic_audio_announcement.presentation_delay, | ||
) | ||
for subgroup in self.basic_audio_announcement.subgroups: | ||
print(color(' Subgroup:', 'magenta')) | ||
print(color(' Codec ID:', 'yellow')) | ||
print( | ||
color(' Coding Format: ', 'green'), | ||
subgroup.codec_id.coding_format.name, | ||
) | ||
print( | ||
color(' Company ID: ', 'green'), | ||
subgroup.codec_id.company_id, | ||
) | ||
print( | ||
color(' Vendor Specific Codec ID:', 'green'), | ||
subgroup.codec_id.vendor_specific_codec_id, | ||
) | ||
print( | ||
color(' Codec Config:', 'yellow'), | ||
subgroup.codec_specific_configuration, | ||
) | ||
print(color(' Metadata: ', 'yellow'), subgroup.metadata) | ||
|
||
for bis in subgroup.bis: | ||
print(color(f' BIS [{bis.index}]:', 'yellow')) | ||
print( | ||
color(' Codec Config:', 'green'), | ||
bis.codec_specific_configuration, | ||
) | ||
|
||
if self.biginfo: | ||
print(color(' BIG:', 'cyan')) | ||
print( | ||
color(' Number of BIS:', 'magenta'), | ||
self.biginfo.num_bis, | ||
) | ||
print( | ||
color(' PHY: ', 'magenta'), | ||
self.biginfo.phy.name, | ||
) | ||
print( | ||
color(' Framed: ', 'magenta'), | ||
self.biginfo.framed, | ||
) | ||
print( | ||
color(' Encrypted: ', 'magenta'), | ||
self.biginfo.encrypted, | ||
) | ||
|
||
def on_sync_establishment(self) -> None: | ||
self.establishment_timeout_task.cancel() | ||
self.emit('change') | ||
|
||
def on_sync_loss(self) -> None: | ||
self.basic_audio_announcement = None | ||
self.biginfo = None | ||
self.emit('change') | ||
|
||
def on_periodic_advertisement( | ||
self, advertisement: bumble.device.PeriodicAdvertisement | ||
) -> None: | ||
if advertisement.data is None: | ||
return | ||
|
||
for service_data in advertisement.data.get_all( | ||
bumble.core.AdvertisingData.SERVICE_DATA | ||
): | ||
assert isinstance(service_data, tuple) | ||
service_uuid, data = service_data | ||
assert isinstance(data, bytes) | ||
|
||
if service_uuid == bumble.gatt.GATT_BASIC_AUDIO_ANNOUNCEMENT_SERVICE: | ||
self.basic_audio_announcement = ( | ||
bumble.profiles.bap.BasicAudioAnnouncement.from_bytes(data) | ||
) | ||
break | ||
|
||
self.emit('change') | ||
|
||
def on_biginfo_advertisement( | ||
self, advertisement: bumble.device.BIGInfoAdvertisement | ||
) -> None: | ||
self.biginfo = advertisement | ||
self.emit('change') | ||
|
||
def __init__( | ||
self, | ||
device: bumble.device.Device, | ||
filter_duplicates: bool, | ||
sync_timeout: float, | ||
): | ||
self.device = device | ||
self.filter_duplicates = filter_duplicates | ||
self.sync_timeout = sync_timeout | ||
self.broadcasts: Dict[bumble.hci.Address, BroadcastDiscoverer.Broadcast] = {} | ||
self.status_message = '' | ||
device.on('advertisement', self.on_advertisement) | ||
|
||
async def run(self) -> None: | ||
self.status_message = color('Scanning...', 'green') | ||
await self.device.start_scanning( | ||
active=False, | ||
filter_duplicates=False, | ||
) | ||
|
||
def refresh(self) -> None: | ||
# Clear the screen from the top | ||
print('\033[H') | ||
print('\033[0J') | ||
print('\033[H') | ||
|
||
# Print the status message | ||
print(self.status_message) | ||
print("==========================================") | ||
|
||
# Print all broadcasts | ||
for broadcast in self.broadcasts.values(): | ||
broadcast.print() | ||
print('------------------------------------------') | ||
|
||
# Clear the screen to the bottom | ||
print('\033[0J') | ||
|
||
def on_advertisement(self, advertisement: bumble.device.Advertisement) -> None: | ||
if ( | ||
broadcast_name := advertisement.data.get( | ||
bumble.core.AdvertisingData.BROADCAST_NAME | ||
) | ||
) is None: | ||
return | ||
assert isinstance(broadcast_name, str) | ||
|
||
if broadcast := self.broadcasts.get(advertisement.address): | ||
broadcast.update(advertisement) | ||
self.refresh() | ||
return | ||
|
||
bumble.utils.AsyncRunner.spawn( | ||
self.on_new_broadcast(broadcast_name, advertisement) | ||
) | ||
|
||
async def on_new_broadcast( | ||
self, name: str, advertisement: bumble.device.Advertisement | ||
) -> None: | ||
periodic_advertising_sync = await self.device.create_periodic_advertising_sync( | ||
advertiser_address=advertisement.address, | ||
sid=advertisement.sid, | ||
sync_timeout=self.sync_timeout, | ||
filter_duplicates=self.filter_duplicates, | ||
) | ||
broadcast = self.Broadcast( | ||
name, | ||
periodic_advertising_sync, | ||
) | ||
broadcast.on('change', self.refresh) | ||
broadcast.update(advertisement) | ||
self.broadcasts[advertisement.address] = broadcast | ||
periodic_advertising_sync.on('loss', lambda: self.on_broadcast_loss(broadcast)) | ||
self.status_message = color( | ||
f'+Found {len(self.broadcasts)} broadcasts', 'green' | ||
) | ||
self.refresh() | ||
|
||
def on_broadcast_loss(self, broadcast: Broadcast) -> None: | ||
del self.broadcasts[broadcast.sync.advertiser_address] | ||
bumble.utils.AsyncRunner.spawn(broadcast.sync.terminate()) | ||
self.status_message = color( | ||
f'-Found {len(self.broadcasts)} broadcasts', 'green' | ||
) | ||
self.refresh() | ||
|
||
|
||
async def run_discover_broadcasts( | ||
filter_duplicates: bool, sync_timeout: float, transport: str | ||
) -> None: | ||
async with await bumble.transport.open_transport(transport) as ( | ||
hci_source, | ||
hci_sink, | ||
): | ||
device = bumble.device.Device.with_hci( | ||
AURACAST_DEFAULT_DEVICE_NAME, | ||
AURACAST_DEFAULT_DEVICE_ADDRESS, | ||
hci_source, | ||
hci_sink, | ||
) | ||
await device.power_on() | ||
discoverer = BroadcastDiscoverer(device, filter_duplicates, sync_timeout) | ||
await discoverer.run() | ||
await hci_source.terminated | ||
|
||
|
||
# ----------------------------------------------------------------------------- | ||
# Main | ||
# ----------------------------------------------------------------------------- | ||
@click.group() | ||
@click.pass_context | ||
def auracast( | ||
ctx, | ||
): | ||
ctx.ensure_object(dict) | ||
|
||
|
||
@auracast.command('discover-broadcasts') | ||
@click.option( | ||
'--filter-duplicates', is_flag=True, default=False, help='Filter duplicates' | ||
) | ||
@click.option( | ||
'--sync-timeout', | ||
metavar='SYNC_TIMEOUT', | ||
type=float, | ||
default=5.0, | ||
help='Sync timeout (in seconds)', | ||
) | ||
@click.argument('transport') | ||
@click.pass_context | ||
def discover_broadcasts(ctx, filter_duplicates, sync_timeout, transport): | ||
"""Discover public broadcasts""" | ||
asyncio.run(run_discover_broadcasts(filter_duplicates, sync_timeout, transport)) | ||
|
||
|
||
def main(): | ||
logging.basicConfig(level=os.environ.get('BUMBLE_LOGLEVEL', 'INFO').upper()) | ||
auracast() | ||
|
||
|
||
# ----------------------------------------------------------------------------- | ||
if __name__ == "__main__": | ||
main() # pylint: disable=no-value-for-parameter |
Oops, something went wrong.