diff --git a/soco/events_base.py b/soco/events_base.py index f53e8e20f..8c7e6a6a0 100644 --- a/soco/events_base.py +++ b/soco/events_base.py @@ -438,6 +438,7 @@ def success(headers): self.timeout = int(timeout.lstrip("Second-")) self._timestamp = time.time() self.is_subscribed = True + service.soco.zone_group_state.add_subscription(self) log.debug( "Subscribed to %s, sid: %s", service.base_url + service.event_subscription_url, @@ -653,6 +654,7 @@ def _cancel_subscription(self, msg=None): # an attempt to unsubscribe fails self._has_been_unsubscribed = True self._timestamp = None + self.service.soco.zone_group_state.remove_subscription(self) # Cancel any auto renew self._auto_renew_cancel() if msg: diff --git a/soco/zonegroupstate.py b/soco/zonegroupstate.py index 1f89952f2..8c9c3b925 100755 --- a/soco/zonegroupstate.py +++ b/soco/zonegroupstate.py @@ -63,14 +63,15 @@ import asyncio import logging import time +from weakref import WeakSet from lxml import etree as LXML from . import config +from .events_base import SubscriptionBase from .exceptions import NotSupportedException, SoCoException, SoCoUPnPException from .groups import ZoneGroup -EVENT_CACHE_TIMEOUT = 60 POLLING_CACHE_TIMEOUT = 5 NEVER_TIME = -1200.0 @@ -115,6 +116,7 @@ def __init__(self): self._cache_until = NEVER_TIME self._last_zgs = None + self._subscriptions = WeakSet() # Statistics self.total_requests = 0 @@ -124,6 +126,27 @@ def clear_cache(self): """Clear the cache timestamp.""" self._cache_until = NEVER_TIME + def add_subscription(self, subscription: SubscriptionBase): + """Start tracking a ZoneGroupTopology subscription.""" + if subscription.service.service_type == "ZoneGroupTopology" and subscription not in self._subscriptions: + self._subscriptions.add(subscription) + _LOG.debug("Monitoring ZoneGroupTopology subscription %s on %s", subscription.sid, subscription.service.soco) + + def remove_subscription(self, subscription: SubscriptionBase): + """Stop tracking a ZoneGroupTopology subscription.""" + if subscription in self._subscriptions: + self._subscriptions.remove(subscription) + _LOG.debug("Discarded unsubscribed subscription %s from %s, %d remaining", subscription.sid, subscription.service.soco, len(self._subscriptions)) + + @property + def has_subscriptions(self): + """Return True if active subscriptions are updating this ZoneGroupState.""" + stale_subscriptions = [sub for sub in self._subscriptions if not sub.time_left] + for sub in stale_subscriptions: + _LOG.debug("Discarding stale subscription: %s", sub.sid) + self.remove_subscription(sub) + return bool(self._subscriptions) + def clear_zone_groups(self): """Clear all known group sets.""" self.groups.clear() @@ -133,6 +156,15 @@ def clear_zone_groups(self): def poll(self, soco): """Poll using the provided SoCo instance and process the payload.""" # pylint: disable=protected-access + if self.has_subscriptions: + self.total_requests += 1 + _LOG.debug( + "Subscriptions (%s) still active (GetZoneGroupState) during poll for %s, using cache", + len(self._subscriptions), + soco.ip_address, + ) + return + if time.monotonic() < self._cache_until: self.total_requests += 1 _LOG.debug( @@ -156,6 +188,8 @@ def poll(self, soco): try: zgs = soco.zoneGroupTopology.GetZoneGroupState()["ZoneGroupState"] self.process_payload(payload=zgs, source="poll", source_ip=soco.ip_address) + self._cache_until = time.monotonic() + POLLING_CACHE_TIMEOUT + _LOG.debug("Extending ZGS cache by %ss", POLLING_CACHE_TIMEOUT) # In the event of failure, we fall back to using a ZGT event to # determine the ZGS. Fallback behaviour can be disabled by setting the @@ -244,22 +278,12 @@ async def update_zgs_by_event_asyncio(speaker): def process_payload(self, payload, source, source_ip): """Update using the provided XML payload.""" self.total_requests += 1 - - def update_cache(): - if source == "event": - timeout = EVENT_CACHE_TIMEOUT - else: - timeout = POLLING_CACHE_TIMEOUT - self._cache_until = time.monotonic() + timeout - _LOG.debug("Setting ZGS cache to %ss", timeout) - tree = normalize_zgs_xml(payload) normalized_zgs = str(tree) if normalized_zgs == self._last_zgs: _LOG.debug( "Duplicate ZGS received from %s (%s), ignoring", source_ip, source ) - update_cache() return self.processed_count += 1 @@ -272,7 +296,6 @@ def update_cache(): ) self.update_soco_instances(tree) - update_cache() self._last_zgs = normalized_zgs def parse_zone_group_member(self, member_element):