-
-
Notifications
You must be signed in to change notification settings - Fork 28.6k
/
__init__.py
301 lines (252 loc) · 9.88 KB
/
__init__.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
"""Support to embed Plex."""
from functools import partial
import logging
import plexapi.exceptions
from plexapi.gdm import GDM
from plexwebsocket import (
SIGNAL_CONNECTION_STATE,
STATE_CONNECTED,
STATE_DISCONNECTED,
STATE_STOPPED,
PlexWebsocket,
)
import requests.exceptions
from homeassistant.components.media_player import DOMAIN as MP_DOMAIN, BrowseError
from homeassistant.config_entries import ConfigEntry, ConfigEntryState
from homeassistant.const import CONF_URL, CONF_VERIFY_SSL, EVENT_HOMEASSISTANT_STOP
from homeassistant.core import HomeAssistant, callback
from homeassistant.exceptions import ConfigEntryAuthFailed, ConfigEntryNotReady
from homeassistant.helpers import device_registry as dev_reg, entity_registry as ent_reg
from homeassistant.helpers.aiohttp_client import async_get_clientsession
from homeassistant.helpers.debounce import Debouncer
from homeassistant.helpers.dispatcher import (
async_dispatcher_connect,
async_dispatcher_send,
)
from homeassistant.helpers.network import is_internal_request
from homeassistant.helpers.typing import ConfigType
from .const import (
CONF_SERVER,
CONF_SERVER_IDENTIFIER,
DISPATCHERS,
DOMAIN as PLEX_DOMAIN,
GDM_DEBOUNCER,
GDM_SCANNER,
PLATFORMS,
PLATFORMS_COMPLETED,
PLEX_SERVER_CONFIG,
PLEX_UPDATE_LIBRARY_SIGNAL,
PLEX_UPDATE_PLATFORMS_SIGNAL,
PLEX_URI_SCHEME,
SERVERS,
WEBSOCKETS,
)
from .errors import ShouldUpdateConfigEntry
from .media_browser import browse_media
from .server import PlexServer
from .services import async_setup_services
_LOGGER = logging.getLogger(__package__)
def is_plex_media_id(media_content_id) -> bool:
    """Return whether the media_content_id is a valid Plex media_id.

    A value qualifies when it is non-empty and starts with PLEX_URI_SCHEME.
    The result is always a real bool: a missing (None) or empty identifier
    yields False rather than leaking the falsy input back to the caller.
    """
    return bool(media_content_id) and media_content_id.startswith(PLEX_URI_SCHEME)
async def async_browse_media(hass, media_content_type, media_content_id, platform=None):
    """Browse Plex media on the first server stored for the integration.

    Raises BrowseError when no server is stored (or the stored server object
    is falsy). Otherwise the call to browse_media is handed to
    hass.async_add_executor_job and its result is returned.
    """
    known_servers = list(hass.data[PLEX_DOMAIN][SERVERS].values())
    first_server = known_servers[0] if known_servers else None
    if not first_server:
        raise BrowseError("No Plex servers available")

    internal_request = is_internal_request(hass)
    browse_job = partial(
        browse_media,
        first_server,
        internal_request,
        media_content_type,
        media_content_id,
        platform=platform,
    )
    return await hass.async_add_executor_job(browse_job)
async def async_setup(hass: HomeAssistant, config: ConfigType) -> bool:
    """Set up the Plex component.

    Seeds hass.data[PLEX_DOMAIN] with empty per-domain containers (only when
    the key is not present yet), awaits async_setup_services, then stores a
    GDM scanner and the debounced trigger used to run client scans.

    Always returns True.
    """
    initial_state = {
        SERVERS: {},
        DISPATCHERS: {},
        WEBSOCKETS: {},
        PLATFORMS_COMPLETED: {},
    }
    hass.data.setdefault(PLEX_DOMAIN, initial_state)

    await async_setup_services(hass)

    gdm = GDM()
    hass.data[PLEX_DOMAIN][GDM_SCANNER] = gdm

    def gdm_scan():
        # Debounce target: one GDM scan that also looks for clients.
        _LOGGER.debug("Scanning for GDM clients")
        gdm.scan(scan_for_clients=True)

    scan_debouncer = Debouncer(
        hass,
        _LOGGER,
        cooldown=10,
        immediate=True,
        function=gdm_scan,
    )
    # Only the bound async_call is stored; callers invoke it to request a scan.
    hass.data[PLEX_DOMAIN][GDM_DEBOUNCER] = scan_debouncer.async_call

    return True
async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
    """Set up Plex from a config entry.

    Connects to the configured server, stores it in hass.data, wires the
    dispatcher/websocket plumbing, forwards setup to every platform in
    PLATFORMS and removes orphaned devices.

    Returns:
        True when setup completed, False when the login was rejected with
        BadRequest or NotFound.

    Raises:
        ConfigEntryNotReady: the server could not be reached.
        ConfigEntryAuthFailed: the server rejected the token.
    """
    server_config = entry.data[PLEX_SERVER_CONFIG]

    # Backfill the unique ID for entries that do not have one yet.
    if entry.unique_id is None:
        hass.config_entries.async_update_entry(
            entry, unique_id=entry.data[CONF_SERVER_IDENTIFIER]
        )

    # Make sure a media_player options section always exists.
    if MP_DOMAIN not in entry.options:
        options = dict(entry.options)
        options.setdefault(MP_DOMAIN, {})
        hass.config_entries.async_update_entry(entry, options=options)

    plex_server = PlexServer(
        hass,
        server_config,
        entry.data[CONF_SERVER_IDENTIFIER],
        entry.options,
        entry.entry_id,
    )
    try:
        await hass.async_add_executor_job(plex_server.connect)
    except ShouldUpdateConfigEntry:
        # The connection worked but the stored URL/name should be refreshed;
        # persist the values in use and carry on with setup.
        new_server_data = {
            **entry.data[PLEX_SERVER_CONFIG],
            CONF_URL: plex_server.url_in_use,
            CONF_SERVER: plex_server.friendly_name,
        }
        hass.config_entries.async_update_entry(
            entry, data={**entry.data, PLEX_SERVER_CONFIG: new_server_data}
        )
    except requests.exceptions.ConnectionError as error:
        # Log only on the first failure, not on every retry attempt.
        if entry.state is not ConfigEntryState.SETUP_RETRY:
            _LOGGER.error(
                "Plex server (%s) could not be reached: [%s]",
                server_config[CONF_URL],
                error,
            )
        raise ConfigEntryNotReady from error
    except plexapi.exceptions.Unauthorized as ex:
        raise ConfigEntryAuthFailed(
            f"Token not accepted, please reauthenticate Plex server '{entry.data[CONF_SERVER]}'"
        ) from ex
    except (
        plexapi.exceptions.BadRequest,
        plexapi.exceptions.NotFound,
    ) as error:
        _LOGGER.error(
            "Login to %s failed, verify token and SSL settings: [%s]",
            entry.data[CONF_SERVER],
            error,
        )
        return False

    _LOGGER.debug(
        "Connected to: %s (%s)", plex_server.friendly_name, plex_server.url_in_use
    )
    server_id = plex_server.machine_identifier
    hass.data[PLEX_DOMAIN][SERVERS][server_id] = plex_server
    hass.data[PLEX_DOMAIN][PLATFORMS_COMPLETED][server_id] = set()

    # Every unsubscribe callback for this server is collected here;
    # async_unload_entry pops the list and calls each one.
    dispatchers = hass.data[PLEX_DOMAIN][DISPATCHERS].setdefault(server_id, [])

    # Keep the listener's remove-callback so that unloading/reloading the
    # entry does not leave stale options listeners stacked on the entry.
    dispatchers.append(entry.add_update_listener(async_options_updated))

    dispatchers.append(
        async_dispatcher_connect(
            hass,
            PLEX_UPDATE_PLATFORMS_SIGNAL.format(server_id),
            plex_server.async_update_platforms,
        )
    )

    @callback
    def plex_websocket_callback(msgtype, data, error):
        """Handle callbacks from plexwebsocket library."""
        if msgtype == SIGNAL_CONNECTION_STATE:
            if data == STATE_CONNECTED:
                _LOGGER.debug("Websocket to %s successful", entry.data[CONF_SERVER])
                hass.async_create_task(plex_server.async_update_platforms())
            elif data == STATE_DISCONNECTED:
                _LOGGER.debug(
                    "Websocket to %s disconnected, retrying", entry.data[CONF_SERVER]
                )
            # Stopped websockets without errors are expected during shutdown and ignored
            elif data == STATE_STOPPED and error:
                _LOGGER.error(
                    "Websocket to %s failed, aborting [Error: %s]",
                    entry.data[CONF_SERVER],
                    error,
                )
                # Reload the whole entry to recover from the failed websocket.
                hass.async_create_task(hass.config_entries.async_reload(entry.entry_id))

        elif msgtype == "playing":
            hass.async_create_task(plex_server.async_update_session(data))
        elif msgtype == "status":
            if data["StatusNotification"][0]["title"] == "Library scan complete":
                async_dispatcher_send(
                    hass,
                    PLEX_UPDATE_LIBRARY_SIGNAL.format(server_id),
                )

    session = async_get_clientsession(hass)
    subscriptions = ["playing", "status"]
    verify_ssl = server_config.get(CONF_VERIFY_SSL)
    websocket = PlexWebsocket(
        plex_server.plex_server,
        plex_websocket_callback,
        subscriptions=subscriptions,
        session=session,
        verify_ssl=verify_ssl,
    )
    hass.data[PLEX_DOMAIN][WEBSOCKETS][server_id] = websocket

    def start_websocket_session(platform, _):
        # Done-callback of each platform setup task: start listening only
        # once every platform in PLATFORMS has reported in.
        hass.data[PLEX_DOMAIN][PLATFORMS_COMPLETED][server_id].add(platform)
        if hass.data[PLEX_DOMAIN][PLATFORMS_COMPLETED][server_id] == PLATFORMS:
            hass.loop.create_task(websocket.listen())

    def close_websocket_session(_):
        websocket.close()

    dispatchers.append(
        hass.bus.async_listen_once(EVENT_HOMEASSISTANT_STOP, close_websocket_session)
    )

    for platform in PLATFORMS:
        task = hass.async_create_task(
            hass.config_entries.async_forward_entry_setup(entry, platform)
        )
        task.add_done_callback(partial(start_websocket_session, platform))

    async_cleanup_plex_devices(hass, entry)

    def get_plex_account(plex_server):
        # The attribute is read in the executor and the result is discarded.
        # NOTE(review): presumably this primes a cached account lookup on
        # PlexServer - confirm against server.py.
        try:
            return plex_server.account
        except (plexapi.exceptions.BadRequest, plexapi.exceptions.Unauthorized):
            return None

    await hass.async_add_executor_job(get_plex_account, plex_server)

    return True
async def async_unload_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
    """Unload a config entry.

    Closes the server's websocket, runs every stored unsubscribe callback,
    unloads the platforms and finally drops the server from hass.data.
    Returns the result of the platform unload.
    """
    plex_data = hass.data[PLEX_DOMAIN]
    server_id = entry.data[CONF_SERVER_IDENTIFIER]

    # Stop the websocket first so no further callbacks arrive during teardown.
    plex_data[WEBSOCKETS].pop(server_id).close()

    for remove_listener in plex_data[DISPATCHERS].pop(server_id):
        remove_listener()

    platforms_unloaded = await hass.config_entries.async_unload_platforms(
        entry, PLATFORMS
    )

    # The server entry is removed regardless of the platform unload outcome.
    hass.data[PLEX_DOMAIN][SERVERS].pop(server_id)

    return platforms_unloaded
async def async_options_updated(hass, entry):
    """Push updated config entry options onto the matching stored server."""
    server_id = entry.data[CONF_SERVER_IDENTIFIER]
    known_servers = hass.data[PLEX_DOMAIN][SERVERS]

    # Guard incomplete setup during reauth flows
    if server_id not in known_servers:
        return

    known_servers[server_id].options = entry.options
@callback
def async_cleanup_plex_devices(hass, entry):
    """Clean up old and invalid devices from the registry.

    Every device registered for this config entry that has no entities left
    in the entity registry (disabled entities still count as entities) is
    logged and removed from the device registry.
    """
    device_registry = dev_reg.async_get(hass)
    entity_registry = ent_reg.async_get(hass)

    # Call the registry helpers through the module aliases already used
    # above instead of the hass.helpers indirection.
    device_entries = dev_reg.async_entries_for_config_entry(
        device_registry, entry.entry_id
    )

    for device_entry in device_entries:
        device_entities = ent_reg.async_entries_for_device(
            entity_registry, device_entry.id, include_disabled_entities=True
        )
        if device_entities:
            # Still backed by at least one entity: keep the device.
            continue

        _LOGGER.debug(
            "Removing orphaned device: %s / %s",
            device_entry.name,
            device_entry.identifiers,
        )
        device_registry.async_remove_device(device_entry.id)