-
Notifications
You must be signed in to change notification settings - Fork 230
/
events.py
488 lines (399 loc) · 18.2 KB
/
events.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
# pylint: disable=not-context-manager
# NOTE: The pylint not-context-manager warning is disabled pending the fix of
# a bug in pylint. See https://github.com/PyCQA/pylint/issues/782
"""Classes to handle Sonos UPnP Events and Subscriptions.
The `Subscription` class from this module will be used in
:py:mod:`soco.services` unless `config.EVENTS_MODULE` is set to
point to :py:mod:`soco.events_twisted`, in which case
:py:mod:`soco.events_twisted.Subscription` will be used. See the
Example in :py:mod:`soco.events_twisted`.
Example:
Run this code, and change your volume, tracks etc::
from queue import Empty
import logging
logging.basicConfig()
import soco
from pprint import pprint
from soco.events import event_listener
# pick a device at random and use it to get
# the group coordinator
device = soco.discover().pop().group.coordinator
print (device.player_name)
sub = device.renderingControl.subscribe()
sub2 = device.avTransport.subscribe()
while True:
try:
event = sub.events.get(timeout=0.5)
pprint (event.variables)
except Empty:
pass
try:
event = sub2.events.get(timeout=0.5)
pprint (event.variables)
except Empty:
pass
except KeyboardInterrupt:
sub.unsubscribe()
sub2.unsubscribe()
event_listener.stop()
break
"""
import errno
import logging
import socketserver
import threading
from http.server import BaseHTTPRequestHandler
from urllib.error import URLError
from urllib.request import urlopen
import requests
# Event is imported so that 'from events import Event' still works
# pylint: disable=unused-import
from .events_base import Event # noqa: F401
from .events_base import (
EventNotifyHandlerBase,
EventListenerBase,
SubscriptionBase,
SubscriptionsMap,
)
from .exceptions import SoCoException
# Module-level logger, named after this module; the classes below write their
# debug, warning and exception messages to it.
log = logging.getLogger(__name__) # pylint: disable=C0103
class EventServer(socketserver.ThreadingMixIn, socketserver.TCPServer):
    """TCP server for the event listener.

    ``ThreadingMixIn`` makes the server process each incoming request in a
    thread of its own.
    """

    # Ask socketserver to allow the listening address to be reused.
    allow_reuse_address = True
class EventNotifyHandler(BaseHTTPRequestHandler, EventNotifyHandlerBase):
    """Handles HTTP ``NOTIFY`` Verbs sent to the listener server.

    Inherits from `soco.events_base.EventNotifyHandlerBase`.
    """

    def __init__(self, *args, **kwargs):
        # The SubscriptionsMap instance created when this module is imported.
        # This is referenced by soco.events_base.EventNotifyHandlerBase.
        self.subscriptions_map = subscriptions_map
        # super appears at the end of __init__, because
        # BaseHTTPRequestHandler.__init__ processes the request itself, so
        # anything the handler needs must be set up beforehand.
        super().__init__(*args, **kwargs)

    def do_NOTIFY(self):  # pylint: disable=invalid-name
        """Serve a ``NOTIFY`` request by calling `handle_notification`
        with the headers and content.

        The body is read according to the ``Content-Length`` header. A
        request without that header is answered with ``411``, and one whose
        value is not a non-negative integer with ``400``; in both cases the
        notification is not processed. Otherwise ``200`` is returned after
        the notification has been handled.
        """
        headers = requests.structures.CaseInsensitiveDict(self.headers)

        raw_length = headers.get("content-length")
        if raw_length is None:
            self.send_error(411, "Content-Length header is required")
            return
        try:
            content_length = int(raw_length)
        except ValueError:
            content_length = -1
        if content_length < 0:
            # A negative size would make rfile.read() block until EOF.
            self.send_error(400, "Invalid Content-Length header")
            return

        content = self.rfile.read(content_length)
        self.handle_notification(headers, content)
        self.send_response(200)
        self.end_headers()

    # pylint: disable=no-self-use, missing-docstring
    def log_event(self, seq, service_id, timestamp):
        # Record the receipt of an event, including the handling thread.
        log.debug(
            "Event %s received for %s service on thread %s at %s",
            seq,
            service_id,
            threading.current_thread(),
            timestamp,
        )

    def log_message(self, fmt, *args):  # pylint: disable=arguments-differ
        # Divert standard webserver logging to the debug log
        log.debug(fmt, *args)
class EventServerThread(threading.Thread):
    """The thread in which the event listener server will run."""

    def __init__(self, server):
        """
        Args:
            server: The server to run. It must provide ``server_address``,
                ``handle_request()`` and ``server_close()`` (in this module
                an `EventServer` instance is passed).
        """
        super().__init__()
        #: `threading.Event`: Used to signal that the server should stop.
        self.stop_flag = threading.Event()
        #: The server whose requests are handled by this thread.
        self.server = server

    def run(self):
        """Start the server

        Requests are handled one at a time, via ``server.handle_request()``,
        until `stop` has been called. When the loop ends, for whatever
        reason, the server is closed so that its listening socket is
        released rather than left bound.
        """
        log.debug("Event listener running on %s", self.server.server_address)
        try:
            # Listen for events until told to stop
            while not self.stop_flag.is_set():
                self.server.handle_request()
        finally:
            # Nothing will serve this socket again, so release it.
            self.server.server_close()

    def stop(self):
        """Stop the server.

        Only sets the stop flag: the loop in `run` notices it after the
        request currently being waited for has been handled.
        """
        self.stop_flag.set()
class EventListener(EventListenerBase):
    """The Event Listener.

    Runs an http server in a thread which is an endpoint for ``NOTIFY``
    requests from Sonos devices. Inherits from
    `soco.events_base.EventListenerBase`.
    """

    def __init__(self):
        super().__init__()
        #: `EventServerThread`: thread on which to run.
        self._listener_thread = None

    def listen(self, ip_address):
        """Start the event listener listening on the local machine at
        port 1400 (default). If this port is unavailable, the
        listener will attempt to listen on the next available port,
        within a range of 100.

        Make sure that your firewall allows connections to this port.

        This method is called by `soco.events_base.EventListenerBase.start`

        Args:
            ip_address (str): The local network interface on which the server
                should start listening.

        Returns:
            int: The port number on which the server is listening. This is
            `requested_port_number` unless that port was in use, in which
            case it is the first free port found above it.

        Raises:
            OSError: If every port in the range is already in use
                (``errno.EADDRINUSE``), or if creating the server fails for
                any other reason.

        Note:
            The port on which the event listener listens is configurable.
            See `config.EVENT_LISTENER_PORT`
        """
        first_port = self.requested_port_number
        last_port = first_port + 99
        for port_number in range(first_port, first_port + 100):
            try:
                server = EventServer((ip_address, port_number), EventNotifyHandler)
            except OSError as oserror:
                # Only a port that is already taken justifies trying the
                # next one; anything else is a genuine failure.
                if oserror.errno != errno.EADDRINUSE:
                    raise
                log.debug("Port %s:%d is in use", ip_address, port_number)
            else:
                break
        else:
            # No port could be bound: fail with a meaningful error instead
            # of going on to use a server that was never created.
            raise OSError(
                errno.EADDRINUSE,
                "No free port for the event listener on {} in range {}-{}".format(
                    ip_address, first_port, last_port
                ),
            )

        self._listener_thread = EventServerThread(server)
        self._listener_thread.daemon = True
        self._listener_thread.start()
        if port_number != self.requested_port_number:
            log.debug(
                "The first available port %d was used instead of %d",
                port_number,
                self.requested_port_number,
            )
        return port_number

    def stop_listening(self, address):
        """Stop the listener.

        Args:
            address (tuple): The (ip, port) address on which the server is
                listening. A dummy request is sent to it, to wake the
                server up so that it notices that it has been told to stop.
        """
        # Signal the thread to stop before handling the next request
        self._listener_thread.stop()
        # Send a dummy request in case the http server is currently listening
        try:
            # A timeout is used so that an unresponsive server cannot make
            # this call hang, and the response (if any) is always closed.
            with urlopen(
                "http://{}:{}/".format(address[0], address[1]), timeout=1
            ):
                pass
        except OSError:
            # If the server is already shut down, we receive a socket error,
            # which we ignore. URLError, HTTP error responses and timeouts
            # are all subclasses of OSError.
            pass
        # wait for the thread to finish, with a timeout of one second
        # to ensure the main thread does not hang
        self._listener_thread.join(1)
        # check if join timed out and issue a warning if it did
        if self._listener_thread.is_alive():
            log.warning("Event Listener did not shutdown gracefully.")
class Subscription(SubscriptionBase):
    """A class representing the subscription to a UPnP event.

    Inherits from `soco.events_base.SubscriptionBase`.
    """

    def __init__(self, service, event_queue=None):
        """
        Args:
            service (Service): The SoCo `Service` to which the subscription
                should be made.
            event_queue (:class:`~queue.Queue`): A queue on which received
                events will be put. If not specified, a queue will be
                created and used.
        """
        super().__init__(service, event_queue)
        # Used to keep track of the auto_renew thread
        self._auto_renew_thread = None
        self._auto_renew_thread_flag = threading.Event()
        # The SubscriptionsMap instance created when this module is imported.
        # This is referenced by soco.events_base.SubscriptionBase.
        self.subscriptions_map = subscriptions_map
        # The EventListener instance created when this module is imported.
        # This is referenced by soco.events_base.SubscriptionBase.
        self.event_listener = event_listener
        # Used to stop race conditions, as autorenewal may occur from a thread
        self._lock = threading.Lock()

    # pylint: disable=arguments-differ
    def subscribe(self, requested_timeout=None, auto_renew=False, strict=True):
        """Subscribe to the service.

        If requested_timeout is provided, a subscription valid for that number
        of seconds will be requested, but not guaranteed. Check
        `timeout` on return to find out what period of validity is
        actually allocated.

        This method calls `events_base.SubscriptionBase.subscribe`.

        Note:
            SoCo will try to unsubscribe any subscriptions which are still
            subscribed on program termination, but it is good practice for
            you to clean up by making sure that you call :meth:`unsubscribe`
            yourself.

        Args:
            requested_timeout(int, optional): The timeout to be requested.
            auto_renew (bool, optional): If `True`, renew the subscription
                automatically shortly before timeout. Default `False`.
            strict (bool, optional): If True and an Exception occurs during
                execution, the Exception will be raised or, if False, the
                Exception will be logged and `None` will be returned.
                Default `True`.

        Returns:
            `Subscription`: The Subscription instance, or `None` if an
            Exception occurred and was logged because ``strict`` was False.
        """
        subscribe = super().subscribe
        return self._wrap(subscribe, strict, requested_timeout, auto_renew)

    def renew(self, requested_timeout=None, is_autorenew=False, strict=True):
        """renew(requested_timeout=None)
        Renew the event subscription.

        You should not try to renew a subscription which has been
        unsubscribed, or once it has expired.

        This method calls `events_base.SubscriptionBase.renew`.

        Args:
            requested_timeout (int, optional): The period for which a renewal
                request should be made. If None (the default), use the timeout
                requested on subscription.
            is_autorenew (bool, optional): Whether this is an autorenewal.
                Default 'False'.
            strict (bool, optional): If True and an Exception occurs during
                execution, the Exception will be raised or, if False, the
                Exception will be logged and `None` will be returned.
                Default `True`.

        Returns:
            `Subscription`: The Subscription instance, or `None` if an
            Exception occurred and was logged because ``strict`` was False.
        """
        renew = super().renew
        return self._wrap(renew, strict, requested_timeout, is_autorenew)

    def unsubscribe(self, strict=True):
        """unsubscribe()
        Unsubscribe from the service's events.

        Once unsubscribed, a Subscription instance should not be reused

        This method calls `events_base.SubscriptionBase.unsubscribe`.

        Args:
            strict (bool, optional): If True and an Exception occurs during
                execution, the Exception will be raised or, if False, the
                Exception will be logged and `None` will be returned.
                Default `True`.

        Returns:
            `Subscription`: The Subscription instance, or `None` if an
            Exception occurred and was logged because ``strict`` was False.
        """
        unsubscribe = super().unsubscribe
        return self._wrap(unsubscribe, strict)

    def _auto_renew_start(self, interval):
        """Starts the auto_renew thread.

        Args:
            interval: The number of seconds to wait between renewals (it
                is passed to `threading.Event.wait` as the timeout).
        """

        class AutoRenewThread(threading.Thread):
            """Used by the auto_renew code to renew a subscription from within
            a thread.
            """

            def __init__(self, interval, stop_flag, sub, *args, **kwargs):
                super().__init__(*args, **kwargs)
                self.interval = interval
                self.subscription = sub
                self.stop_flag = stop_flag
                self.daemon = True

            def run(self):
                subscription = self.subscription
                stop_flag = self.stop_flag
                interval = self.interval
                # wait() returns False on timeout (time to renew) and True
                # once the flag has been set (time to finish).
                while not stop_flag.wait(interval):
                    subscription.renew(is_autorenew=True, strict=False)

        auto_renew_thread = AutoRenewThread(
            interval, self._auto_renew_thread_flag, self
        )
        # Keep track of the thread, as the attribute set up in __init__
        # is meant to do.
        self._auto_renew_thread = auto_renew_thread
        auto_renew_thread.start()

    def _auto_renew_cancel(self):
        """Cancels the auto_renew thread"""
        self._auto_renew_thread_flag.set()

    # pylint: disable=no-self-use
    def _request(self, method, url, headers, success, unconditional=None):
        """Sends an HTTP request.

        Args:
            method (str): 'SUBSCRIBE' or 'UNSUBSCRIBE'.
            url (str): The full endpoint to which the request is being sent.
            headers (dict): A dict of headers, each key and each value being
                of type `str`.
            success (function): A function to be called if the
                request succeeds. The function will be called with a dict
                of response headers as its only parameter.
            unconditional (function): An optional function to be called after
                the request is complete, regardless of its success. Takes
                no parameters.

        Raises:
            requests.exceptions.RequestException: If the request could not
                be made and ``method`` is not 'UNSUBSCRIBE'.
            requests.exceptions.HTTPError: If the response has an error
                status other than 412.
        """
        try:
            response = None
            try:
                response = requests.request(method, url, headers=headers, timeout=3)
            except requests.exceptions.RequestException:
                # Ignore timeout for unsubscribe since we are leaving anyway.
                if method != "UNSUBSCRIBE":
                    raise

            if response is None:
                return

            # Ignore "412 Client Error: Precondition Failed for url:" from
            # rebooted speakers. The reboot will have unsubscribed us which is
            # what we are trying to do.
            #
            # NOTE: the response must not be tested for truth here. A
            # requests.Response is falsy whenever its status is an error,
            # which would mean raise_for_status() is never reached for the
            # very responses it is there to report.
            if response.status_code != 412:
                response.raise_for_status()

            if response.ok and success:
                success(response.headers)
        finally:
            # Called whether or not the request succeeded.
            if unconditional:
                unconditional()

    # pylint: disable=inconsistent-return-statements
    def _wrap(self, method, strict, *args, **kwargs):
        """This is a wrapper for `Subscription.subscribe`, `Subscription.renew`
        and `Subscription.unsubscribe` which:

            * Returns the`Subscription` instance if the wrapped method
              completes without an Exception.
            * If an Exception has occurred:

                * Cancels the Subscription (unless the Exception was caused by
                  a SoCoException upon subscribe).
                * On an autorenew, if the strict flag was set to False, calls
                  the optional self.auto_renew_fail method with the
                  Exception. This method needs to be threadsafe.
                * If the strict flag was set to True (the default), reraises
                  the Exception or, if the strict flag was set to False, logs
                  the Exception instead (in which case `None` is returned).

            * Calls the wrapped methods with a threading.Lock, to prevent race
              conditions (e.g. to prevent unsubscribe and autorenew being
              called simultaneously).

        Args:
            method (function): The method to call. Its ``__name__`` is used
                to tell which action is being performed.
            strict (bool): Whether an Exception should be reraised (True)
                or logged (False).
            *args: Positional arguments passed on to ``method``.
            **kwargs: Keyword arguments passed on to ``method``.
        """
        action = method.__name__

        # A lock is used, because autorenewal occurs in
        # a thread
        with self._lock:
            try:
                method(*args, **kwargs)
            except Exception as exc:  # pylint: disable=broad-except
                # If an Exception occurred during execution of subscribe,
                # renew or unsubscribe, set the cancel flag to True unless
                # the Exception was a SoCoException upon subscribe
                cancel = action == "renew" or not isinstance(exc, SoCoException)
                if cancel:
                    # If the cancel flag was set to true, cancel the
                    # subscription with an explanation.
                    msg = (
                        "An Exception occurred. Subscription to"
                        " {}, sid: {} has been cancelled".format(
                            self.service.base_url
                            + self.service.event_subscription_url,
                            self.sid,
                        )
                    )
                    self._cancel_subscription(msg)

                # If we're being strict, reraise the Exception
                if strict:
                    raise  # pylint: disable=raising-bad-type

                # We're not being strict, so log the Exception instead
                log.exception(
                    "Exception received in Subscription."
                    "%s for Subscription to:\n%s, sid: %s",
                    action,
                    self.service.base_url + self.service.event_subscription_url,
                    self.sid,
                )
                # If we're not being strict upon a renewal
                # (e.g. an autorenewal) call the optional
                # self.auto_renew_fail method, if it has been set
                if action == "renew" and callable(self.auto_renew_fail):
                    # pylint: disable=not-callable
                    self.auto_renew_fail(exc)
            else:
                # Return the Subscription to the function that
                # called subscribe, renew or unsubscribe. This is only
                # reached if no Exception occurred.
                return self
# The SubscriptionsMap instance created when this module is imported. Each
# EventNotifyHandler and each Subscription stores a reference to it as its
# ``subscriptions_map`` attribute.
subscriptions_map = SubscriptionsMap() # pylint: disable=C0103
# The EventListener instance created when this module is imported. Each
# Subscription stores a reference to it as its ``event_listener`` attribute.
event_listener = EventListener() # pylint: disable=C0103