Skip to content

Commit

Permalink
Adds automatic failover to secondary HLS
Browse files Browse the repository at this point in the history
  • Loading branch information
AngellusMortis committed Jul 16, 2021
1 parent f72e189 commit 9e8a958
Show file tree
Hide file tree
Showing 7 changed files with 1,945 additions and 30 deletions.
2 changes: 1 addition & 1 deletion .devcontainer/Dockerfile
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
FROM python:3.9.6-slim-buster

RUN apt-get update \
&& apt-get install -y git ffmpeg build-essential vim \
&& apt-get install -y git ffmpeg build-essential vim procps curl \
# cleaning up unused files
&& apt-get purge -y --auto-remove -o APT::AutoRemove::RecommendsImportant=false \
&& rm -rf /var/lib/apt/lists/*
Expand Down
1 change: 1 addition & 0 deletions HISTORY.rst
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ History
* Adds `primary_hls` and `seconary_hls`
* Adds quality selection
* Overhauls time/datetime management
* Automatic failover to secondary HLS

0.2.4 (2021-08-15)
------------------
Expand Down
7 changes: 4 additions & 3 deletions dev-requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -29,15 +29,15 @@ bandit==1.7.0
# via sxm (pyproject.toml)
beautifulsoup4==4.9.3
# via furo
black==21.6b0
black==21.7b0
# via sxm (pyproject.toml)
certifi==2021.5.30
# via
# httpx
# requests
chardet==4.0.0
# via aiohttp
charset-normalizer==2.0.2
charset-normalizer==2.0.3
# via requests
click==7.1.2
# via
Expand Down Expand Up @@ -268,7 +268,6 @@ termcolor==1.1.0
# via pytest-sugar
toml==0.10.2
# via
# black
# flit
# flit-core
# mypy
Expand All @@ -279,6 +278,8 @@ toml==0.10.2
# pytest-cov
# snooty-lextudio
# tox
tomli==1.0.4
# via black
tox==3.24.0
# via sxm (pyproject.toml)
typer==0.3.2
Expand Down
169 changes: 147 additions & 22 deletions sxm/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,7 @@
from tenacity import retry, stop_after_attempt, wait_fixed
from ua_parser import user_agent_parser # type: ignore

from sxm.models import (
LIVE_PRIMARY_HLS,
QualitySize,
RegionChoice,
XMChannel,
XMLiveChannel,
)
from sxm.models import QualitySize, RegionChoice, XMChannel, XMLiveChannel

__all__ = [
"HLS_AES_KEY",
Expand All @@ -41,16 +35,24 @@
REST_V4_FORMAT = "https://player.siriusxm.com/rest/v4/experience/modules/{}"
SESSION_MAX_LIFE = 14400

ENABLE_NEW_CHANNELS = False
ENABLE_NEW_CHANNELS = True


class SXMError(Exception):
"""Base class for all other SXM Errors"""


class AuthenticationError(Exception):
class ConfigurationError(SXMError):
"""SXM Configuration retrive failed, renew session, and try again later"""


class AuthenticationError(SXMError):
"""SXM Authentication failed, renew session"""

pass


class SegmentRetrievalException(Exception):
class SegmentRetrievalException(SXMError):
"""failed to get HLS segment, renew session"""

pass
Expand Down Expand Up @@ -106,8 +108,11 @@ class SXMClientAsync:
_channels: Optional[List[XMChannel]]
_favorite_channels: Optional[List[XMChannel]]
_playlists: Dict[str, str]
_use_primary: bool
_ua: Dict[str, Any]
_session: httpx.AsyncClient
_configuration: Optional[Dict] = None
_urls: Optional[Dict[str, str]] = None

def __init__(
self,
Expand Down Expand Up @@ -140,6 +145,7 @@ def __init__(
self._playlists = {}
self._channels = None
self._favorite_channels = None
self._use_primary = True

# vars to manage session cache
self.last_renew = None
Expand All @@ -148,6 +154,9 @@ def __init__(
# hook function to call whenever the playlist updates
self.update_handler = update_handler

def __del__(self):
make_sync(self.close_session)()

@property
def is_logged_in(self) -> bool:
return "SXMAUTHNEW" in self._session.cookies
Expand Down Expand Up @@ -198,6 +207,63 @@ async def favorite_channels(self) -> List[XMChannel]:
self._favorite_channels = [c for c in await self.channels if c.is_favorite]
return self._favorite_channels

def _extract_configuration(self, data: dict):
_config = {}
config = data["moduleList"]["modules"][0]["moduleResponse"]["configuration"][
"components"
]
for item in config:
_config[item["name"]] = item
return _config

@property
async def configuration(self) -> dict:
if self._configuration is None:
data = await self.get_configuration()
if data is None:
raise ConfigurationError()
self._configuration = self._extract_configuration(data)

return self._configuration

def _extract_urls(self, urls: dict):
_urls = {}
for url in urls["settings"][0]["relativeUrls"]:
if "url" in url:
_urls[url["name"]] = url["url"]

return _urls

@property
async def urls(self) -> Dict[str, str]:
if self._urls is None:
urls = (await self.configuration)["relativeUrls"]
self._urls = self._extract_urls(urls)
return self._urls

@property
def primary(self) -> bool:
return self._use_primary

async def get_primary_hls_root(self) -> str:
urls = await self.urls

return urls["Live_Primary_HLS"]

async def get_secondary_hls_root(self) -> str:
urls = await self.urls

return urls["Live_Secondary_HLS"]

async def get_hls_root(self) -> str:
if self._use_primary:
return await self.get_primary_hls_root()
return await self.get_secondary_hls_root()

def set_primary(self, value: bool):
self._use_primary = value
self._playlists = {}

async def login(self) -> bool:
"""Attempts to log into SXM with stored username/password"""

Expand Down Expand Up @@ -251,6 +317,16 @@ async def authenticate(self) -> bool:
self._log.error(traceback.format_exc())
return False

@retry(wait=wait_fixed(3), stop=stop_after_attempt(10))
async def get_configuration(self) -> Optional[Dict[str, Any]]:
params = {
"result-template": "html5",
"app-region": self.region.value,
"cacheBuster": str(int(time.time())),
}

return await self._get("get/configuration", params=params)

@retry(stop=stop_after_attempt(25), wait=wait_fixed(1))
async def get_playlist(
self, channel_id: str, use_cache: bool = True
Expand Down Expand Up @@ -303,15 +379,13 @@ async def get_playlist(
return "\n".join(playlist_entries)

@retry(wait=wait_fixed(1), stop=stop_after_attempt(5))
async def get_segment(self, path: str, max_attempts: int = 5) -> Union[bytes, None]:
async def get_segment(self, path: str) -> Union[bytes, None]:
"""Gets raw HLS segment for given path
Parameters
----------
path : :class:`str`
SXM path
max_attempts : :class:`int`
Number of times to try to get segment. Defaults to 5.
Raises
------
Expand All @@ -320,7 +394,7 @@ async def get_segment(self, path: str, max_attempts: int = 5) -> Union[bytes, No
needs reset
"""

url = f"{LIVE_PRIMARY_HLS}/{path}"
url = urllib.parse.urljoin(await self.get_hls_root(), path)
res = await self._session.get(url, params=self._token_params())

if res.status_code == 403:
Expand Down Expand Up @@ -436,6 +510,8 @@ def reset_session(self) -> None:
self._session_start = time.monotonic()
self._session = httpx.AsyncClient()
self._session.headers.update({"User-Agent": self._ua["string"]})
self._urls = None
self._configuration = None

def _token_params(self) -> Dict[str, Union[str, None]]:
return {
Expand Down Expand Up @@ -580,7 +656,10 @@ async def _post(
)

async def _get_playlist_url(
self, channel_id: str, use_cache: bool = True, max_attempts: int = 5
self,
channel_id: str,
use_cache: bool = True,
max_attempts: int = 5,
) -> Union[str, None]:
"""Returns HLS live stream URL for a given `XMChannel`"""

Expand All @@ -590,7 +669,6 @@ async def _get_playlist_url(
return None

now = time.monotonic()

if use_cache and channel.id in self._playlists:
if (
self.last_renew is None
Expand Down Expand Up @@ -649,11 +727,18 @@ async def _get_playlist_url(
live_channel_raw = data["moduleList"]["modules"][0]
live_channel = XMLiveChannel.from_dict(live_channel_raw)
live_channel.set_stream_quality(self.stream_quality)
live_channel.set_hls_roots(
await self.get_primary_hls_root(), await self.get_secondary_hls_root()
)

self.update_interval = int(data["moduleList"]["modules"][0]["updateFrequency"])

# get m3u8 url
playlist = await self._get_playlist_variant_url(live_channel.primary_hls.url)
url = live_channel.primary_hls.url
if not self._use_primary:
url = live_channel.secondary_hls.url

playlist = await self._get_playlist_variant_url(url)
if playlist is not None:
self._playlists[channel.id] = playlist
self.last_renew = time.monotonic()
Expand Down Expand Up @@ -763,6 +848,10 @@ def update_interval(self) -> int:
def username(self) -> str:
return self.async_client.username

@property
def stream_quality(self) -> QualitySize:
return self.async_client.stream_quality

@property
def is_logged_in(self) -> bool:
return self.async_client.is_logged_in
Expand Down Expand Up @@ -807,21 +896,57 @@ def favorite_channels(self) -> List[XMChannel]:
]
return self.async_client._favorite_channels

@property
def configuration(self) -> dict:
if self.async_client._configuration is None:
data = self.get_configuration()
if data is None:
raise ConfigurationError()

self.async_client._configuration = self.async_client._extract_configuration(
data
)
return self.async_client._configuration

@property
def urls(self) -> Dict[str, str]:
if self.async_client._urls is None:
urls = self.configuration["relativeUrls"]
self.async_client._urls = self.async_client._extract_urls(urls)
return self.async_client._urls

@property
def primary(self) -> bool:
return self.async_client._use_primary

def get_primary_hls_root(self) -> str:
return make_sync(self.async_client.get_primary_hls_root)()

def get_secondary_hls_root(self) -> str:
return make_sync(self.async_client.get_secondary_hls_root)()

def get_hls_root(self) -> str:
return make_sync(self.async_client.get_hls_root)()

def set_primary(self, value: bool):
self.async_client.set_primary(value)

def login(self) -> bool:
return make_sync(self.async_client.is_logged_in)()
return make_sync(self.async_client.login)()

def authenticate(self) -> bool:
return make_sync(self.async_client.authenticate)()

def get_configuration(self) -> Optional[Dict[str, Any]]:
return make_sync(self.async_client.get_configuration)()

def get_playlist(self, channel_id: str, use_cache: bool = True) -> Union[str, None]:
return make_sync(self.async_client.get_playlist)(
channel_id=channel_id, use_cache=use_cache
)

def get_segment(self, path: str, max_attempts: int = 5) -> Union[bytes, None]:
return make_sync(self.async_client.get_segment)(
path=path, max_attempts=max_attempts
)
def get_segment(self, path: str) -> Union[bytes, None]:
return make_sync(self.async_client.get_segment)(path=path)

def get_channels(self) -> List[dict]:
return make_sync(self.async_client.get_channels)()
Expand Down
15 changes: 15 additions & 0 deletions sxm/http.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,14 @@ async def sxm_handler(request: web.Request):

response = web.Response(status=404)
if request.path.endswith(".m3u8"):
if not sxm.primary:
sxm.set_primary(True)
playlist = await sxm.get_playlist(request.path.rsplit("/", 1)[1][:-5])

if not playlist:
sxm.set_primary(False)
playlist = await sxm.get_playlist(request.path.rsplit("/", 1)[1][:-5])

if playlist:
response = web.Response(
status=200,
Expand Down Expand Up @@ -108,6 +115,14 @@ def run_http_server(
if logger is None:
logger = logging.getLogger(__file__)

if not sxm.authenticate():
logging.fatal("Could not log into SXM")
exit(1)

if not sxm.configuration:
logging.fatal("Could not get SXM configuration")
exit(1)

app = web.Application()
app.router.add_get("/{_:.*}", make_http_handler(sxm.async_client))
try:
Expand Down
Loading

0 comments on commit 9e8a958

Please sign in to comment.