Skip to content

Commit

Permalink
Cache ZoneGroupState while subscriptions are active
Browse files Browse the repository at this point in the history
  • Loading branch information
jjlawren committed Dec 26, 2023
1 parent 54bcfac commit cdbf7db
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 12 deletions.
2 changes: 2 additions & 0 deletions soco/events_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -438,6 +438,7 @@ def success(headers):
self.timeout = int(timeout.lstrip("Second-"))
self._timestamp = time.time()
self.is_subscribed = True
service.soco.zone_group_state.add_subscription(self)
log.debug(
"Subscribed to %s, sid: %s",
service.base_url + service.event_subscription_url,
Expand Down Expand Up @@ -653,6 +654,7 @@ def _cancel_subscription(self, msg=None):
# an attempt to unsubscribe fails
self._has_been_unsubscribed = True
self._timestamp = None
self.service.soco.zone_group_state.remove_subscription(self)
# Cancel any auto renew
self._auto_renew_cancel()
if msg:
Expand Down
47 changes: 35 additions & 12 deletions soco/zonegroupstate.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,14 +63,15 @@
import asyncio
import logging
import time
from weakref import WeakSet

from lxml import etree as LXML

from . import config
from .events_base import SubscriptionBase
from .exceptions import NotSupportedException, SoCoException, SoCoUPnPException
from .groups import ZoneGroup

EVENT_CACHE_TIMEOUT = 60
POLLING_CACHE_TIMEOUT = 5
NEVER_TIME = -1200.0

Expand Down Expand Up @@ -115,6 +116,7 @@ def __init__(self):

self._cache_until = NEVER_TIME
self._last_zgs = None
self._subscriptions = WeakSet()

# Statistics
self.total_requests = 0
Expand All @@ -124,6 +126,27 @@ def clear_cache(self):
"""Clear the cache timestamp."""
self._cache_until = NEVER_TIME

def add_subscription(self, subscription: SubscriptionBase):
"""Start tracking a ZoneGroupTopology subscription."""
if subscription.service.service_type == "ZoneGroupTopology" and subscription not in self._subscriptions:
self._subscriptions.add(subscription)
_LOG.debug("Monitoring ZoneGroupTopology subscription %s on %s", subscription.sid, subscription.service.soco)

def remove_subscription(self, subscription: SubscriptionBase):
"""Stop tracking a ZoneGroupTopology subscription."""
if subscription in self._subscriptions:
self._subscriptions.remove(subscription)
_LOG.debug("Discarded unsubscribed subscription %s from %s, %d remaining", subscription.sid, subscription.service.soco, len(self._subscriptions))

@property
def has_subscriptions(self):
"""Return True if active subscriptions are updating this ZoneGroupState."""
stale_subscriptions = [sub for sub in self._subscriptions if not sub.time_left]
for sub in stale_subscriptions:
_LOG.debug("Discarding stale subscription: %s", sub.sid)
self.remove_subscription(sub)
return bool(self._subscriptions)

def clear_zone_groups(self):
"""Clear all known group sets."""
self.groups.clear()
Expand All @@ -133,6 +156,15 @@ def clear_zone_groups(self):
def poll(self, soco):
"""Poll using the provided SoCo instance and process the payload."""
# pylint: disable=protected-access
if self.has_subscriptions:
self.total_requests += 1
_LOG.debug(
"Subscriptions (%s) still active (GetZoneGroupState) during poll for %s, using cache",
len(self._subscriptions),
soco.ip_address,
)
return

if time.monotonic() < self._cache_until:
self.total_requests += 1
_LOG.debug(
Expand All @@ -156,6 +188,8 @@ def poll(self, soco):
try:
zgs = soco.zoneGroupTopology.GetZoneGroupState()["ZoneGroupState"]
self.process_payload(payload=zgs, source="poll", source_ip=soco.ip_address)
self._cache_until = time.monotonic() + POLLING_CACHE_TIMEOUT
_LOG.debug("Extending ZGS cache by %ss", POLLING_CACHE_TIMEOUT)

# In the event of failure, we fall back to using a ZGT event to
# determine the ZGS. Fallback behaviour can be disabled by setting the
Expand Down Expand Up @@ -244,22 +278,12 @@ async def update_zgs_by_event_asyncio(speaker):
def process_payload(self, payload, source, source_ip):
"""Update using the provided XML payload."""
self.total_requests += 1

def update_cache():
if source == "event":
timeout = EVENT_CACHE_TIMEOUT
else:
timeout = POLLING_CACHE_TIMEOUT
self._cache_until = time.monotonic() + timeout
_LOG.debug("Setting ZGS cache to %ss", timeout)

tree = normalize_zgs_xml(payload)
normalized_zgs = str(tree)
if normalized_zgs == self._last_zgs:
_LOG.debug(
"Duplicate ZGS received from %s (%s), ignoring", source_ip, source
)
update_cache()
return

self.processed_count += 1
Expand All @@ -272,7 +296,6 @@ def update_cache():
)

self.update_soco_instances(tree)
update_cache()
self._last_zgs = normalized_zgs

def parse_zone_group_member(self, member_element):
Expand Down

0 comments on commit cdbf7db

Please sign in to comment.