-
-
Notifications
You must be signed in to change notification settings - Fork 4
/
dbus.py
162 lines (137 loc) · 5.15 KB
/
dbus.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
from __future__ import annotations
import asyncio
import logging
from functools import cache
from pathlib import Path
from typing import Any
import async_timeout
from dbus_fast import BusType, Message, MessageType, unpack_variants
from dbus_fast.aio import MessageBus
from .history import AdvertisementHistory, load_history_from_managed_objects
_LOGGER = logging.getLogger(__name__)
REPLY_TIMEOUT = 8
class BlueZDBusObjects:
"""Fetch and parse BlueZObjects."""
def __init__(self) -> None:
"""Init the manager."""
self._packed_managed_objects: dict[str, Any] = {}
self._unpacked_managed_objects: dict[str, Any] = {}
async def load(self) -> None:
"""Load from the bus."""
self._packed_managed_objects = await _get_dbus_managed_objects()
self._unpacked_managed_objects = {}
@property
def adapters(self) -> list[str]:
"""Get adapters."""
return list(self.adapter_details)
@property
def unpacked_managed_objects(self) -> dict[str, Any]:
"""Get unpacked managed objects."""
if not self._unpacked_managed_objects:
self._unpacked_managed_objects = unpack_variants(
self._packed_managed_objects
)
return self._unpacked_managed_objects
@property
def adapter_details(self) -> dict[str, dict[str, Any]]:
"""Get adapters."""
return _adapters_from_managed_objects(self.unpacked_managed_objects)
@property
def history(self) -> dict[str, AdvertisementHistory]:
"""Get history from managed objects."""
return load_history_from_managed_objects(self.unpacked_managed_objects)
def _adapters_from_managed_objects(
managed_objects: dict[str, Any]
) -> dict[str, dict[str, Any]]:
adapters: dict[str, dict[str, Any]] = {}
for path, unpacked_data in managed_objects.items():
path_str = str(path)
if path_str.startswith("/org/bluez/hci"):
split_path = path_str.split("/")
adapter = split_path[3]
if adapter not in adapters:
adapters[adapter] = unpacked_data
return adapters
async def get_bluetooth_adapters() -> list[str]:
"""Return a list of bluetooth adapters."""
return list(await get_bluetooth_adapter_details())
async def get_bluetooth_adapter_details() -> dict[str, dict[str, Any]]:
"""Return a list of bluetooth adapter with details."""
results = await _get_dbus_managed_objects()
return {
adapter: unpack_variants(packed_data)
for adapter, packed_data in _adapters_from_managed_objects(results).items()
}
async def get_dbus_managed_objects() -> dict[str, Any]:
"""Return a list of bluetooth adapter with details."""
results = await _get_dbus_managed_objects()
return {path: unpack_variants(packed_data) for path, packed_data in results.items()}
async def _get_dbus_managed_objects() -> dict[str, Any]:
try:
bus = await MessageBus(bus_type=BusType.SYSTEM).connect()
except FileNotFoundError as ex:
if is_docker_env():
_LOGGER.debug(
"DBus service not found; docker config may "
"be missing `-v /run/dbus:/run/dbus:ro`: %s",
ex,
)
_LOGGER.debug(
"DBus service not found; make sure the DBus socket " "is available: %s",
ex,
)
return {}
except BrokenPipeError as ex:
if is_docker_env():
_LOGGER.debug(
"DBus connection broken: %s; try restarting "
"`bluetooth`, `dbus`, and finally the docker container",
ex,
)
_LOGGER.debug(
"DBus connection broken: %s; try restarting " "`bluetooth` and `dbus`", ex
)
return {}
except ConnectionRefusedError as ex:
if is_docker_env():
_LOGGER.debug(
"DBus connection refused: %s; try restarting "
"`bluetooth`, `dbus`, and finally the docker container",
ex,
)
_LOGGER.debug(
"DBus connection refused: %s; try restarting " "`bluetooth` and `dbus`", ex
)
return {}
msg = Message(
destination="org.bluez",
path="/",
interface="org.freedesktop.DBus.ObjectManager",
member="GetManagedObjects",
)
try:
async with async_timeout.timeout(REPLY_TIMEOUT):
reply = await bus.call(msg)
except EOFError as ex:
_LOGGER.debug("DBus connection closed: %s", ex)
return {}
except asyncio.TimeoutError:
_LOGGER.debug(
"Dbus timeout waiting for reply to GetManagedObjects; try restarting "
"`bluetooth` and `dbus`"
)
return {}
bus.disconnect()
if not reply or reply.message_type != MessageType.METHOD_RETURN:
_LOGGER.debug(
"Received an unexpected reply from Dbus while "
"calling GetManagedObjects on org.bluez: %s",
reply,
)
return {}
results: dict[str, Any] = reply.body[0]
return results
@cache
def is_docker_env() -> bool:
"""Return True if we run in a docker env."""
return Path("/.dockerenv").exists()