Skip to content

Commit

Permalink
Add Upnp volume control/status to SamsungTV (#68663)
Browse files Browse the repository at this point in the history
Co-authored-by: epenet <epenet@users.noreply.github.com>
Co-authored-by: J. Nick Koston <nick@koston.org>
  • Loading branch information
3 people committed Mar 27, 2022
1 parent b549644 commit c024033
Show file tree
Hide file tree
Showing 7 changed files with 240 additions and 16 deletions.
3 changes: 2 additions & 1 deletion homeassistant/components/samsungtv/manifest.json
Expand Up @@ -6,7 +6,8 @@
"getmac==0.8.2",
"samsungctl[websocket]==0.7.1",
"samsungtvws[async,encrypted]==2.5.0",
"wakeonlan==2.0.1"
"wakeonlan==2.0.1",
"async-upnp-client==0.27.0"
],
"ssdp": [
{
Expand Down
112 changes: 99 additions & 13 deletions homeassistant/components/samsungtv/media_player.py
Expand Up @@ -2,9 +2,15 @@
from __future__ import annotations

import asyncio
from collections.abc import Coroutine
import contextlib
from datetime import datetime, timedelta
from typing import Any

from async_upnp_client.aiohttp import AiohttpSessionRequester
from async_upnp_client.client import UpnpDevice, UpnpService
from async_upnp_client.client_factory import UpnpFactory
from async_upnp_client.exceptions import UpnpActionResponseError, UpnpConnectionError
import voluptuous as vol
from wakeonlan import send_magic_packet

Expand All @@ -24,12 +30,14 @@
SUPPORT_TURN_OFF,
SUPPORT_TURN_ON,
SUPPORT_VOLUME_MUTE,
SUPPORT_VOLUME_SET,
SUPPORT_VOLUME_STEP,
)
from homeassistant.config_entries import SOURCE_REAUTH, ConfigEntry
from homeassistant.const import CONF_HOST, CONF_MAC, CONF_NAME, STATE_OFF, STATE_ON
from homeassistant.core import HomeAssistant
from homeassistant.helpers import entity_component
from homeassistant.helpers.aiohttp_client import async_get_clientsession
import homeassistant.helpers.config_validation as cv
from homeassistant.helpers.device_registry import CONNECTION_NETWORK_MAC
from homeassistant.helpers.entity import DeviceInfo
Expand All @@ -42,9 +50,11 @@
CONF_MANUFACTURER,
CONF_MODEL,
CONF_ON_ACTION,
CONF_SSDP_RENDERING_CONTROL_LOCATION,
DEFAULT_NAME,
DOMAIN,
LOGGER,
UPNP_SVC_RENDERINGCONTROL,
)

SOURCES = {"TV": "KEY_TV", "HDMI": "KEY_HDMI"}
Expand Down Expand Up @@ -104,6 +114,9 @@ def __init__(
self._config_entry = config_entry
self._host: str | None = config_entry.data[CONF_HOST]
self._mac: str | None = config_entry.data.get(CONF_MAC)
self._ssdp_rendering_control_location = config_entry.data.get(
CONF_SSDP_RENDERING_CONTROL_LOCATION
)
self._on_script = on_script
# Assume that the TV is in Play mode
self._playing: bool = True
Expand All @@ -121,6 +134,8 @@ def __init__(
if self._on_script or self._mac:
# Add turn-on if on_script or mac is available
self._attr_supported_features |= SUPPORT_TURN_ON
if self._ssdp_rendering_control_location:
self._attr_supported_features |= SUPPORT_VOLUME_SET

self._attr_device_info = DeviceInfo(
name=self.name,
Expand All @@ -142,6 +157,8 @@ def __init__(
self._bridge.register_reauth_callback(self.access_denied)
self._bridge.register_app_list_callback(self._app_list_callback)

self._upnp_device: UpnpDevice | None = None

def _update_sources(self) -> None:
self._attr_source_list = list(SOURCES)
if app_list := self._app_list:
Expand Down Expand Up @@ -179,21 +196,77 @@ async def async_update(self) -> None:
STATE_ON if await self._bridge.async_is_on() else STATE_OFF
)

if self._attr_state == STATE_ON and not self._app_list_event.is_set():
await self._bridge.async_request_app_list()
if self._app_list_event.is_set():
# The try+wait_for is a bit expensive so we should try not to
# enter it unless we have to (Python 3.11 will have zero cost try)
return
try:
await asyncio.wait_for(self._app_list_event.wait(), APP_LIST_DELAY)
except asyncio.TimeoutError as err:
# No need to try again
self._app_list_event.set()
LOGGER.debug(
"Failed to load app list from %s: %s", self._host, err.__repr__()
if self._attr_state != STATE_ON:
return

startup_tasks: list[Coroutine[Any, Any, None]] = []

if not self._app_list_event.is_set():
startup_tasks.append(self._async_startup_app_list())

if not self._upnp_device and self._ssdp_rendering_control_location:
startup_tasks.append(self._async_startup_upnp())

if startup_tasks:
await asyncio.gather(*startup_tasks)

if not (service := self._get_upnp_service()):
return

get_volume, get_mute = await asyncio.gather(
service.action("GetVolume").async_call(InstanceID=0, Channel="Master"),
service.action("GetMute").async_call(InstanceID=0, Channel="Master"),
)
LOGGER.debug("Upnp GetVolume on %s: %s", self._host, get_volume)
if (volume_level := get_volume.get("CurrentVolume")) is not None:
self._attr_volume_level = volume_level / 100
LOGGER.debug("Upnp GetMute on %s: %s", self._host, get_mute)
if (is_muted := get_mute.get("CurrentMute")) is not None:
self._attr_is_volume_muted = is_muted

async def _async_startup_app_list(self) -> None:
await self._bridge.async_request_app_list()
if self._app_list_event.is_set():
# The try+wait_for is a bit expensive so we should try not to
# enter it unless we have to (Python 3.11 will have zero cost try)
return
try:
await asyncio.wait_for(self._app_list_event.wait(), APP_LIST_DELAY)
except asyncio.TimeoutError as err:
# No need to try again
self._app_list_event.set()
LOGGER.debug(
"Failed to load app list from %s: %s", self._host, err.__repr__()
)

async def _async_startup_upnp(self) -> None:
assert self._ssdp_rendering_control_location is not None
if self._upnp_device is None:
session = async_get_clientsession(self.hass)
upnp_requester = AiohttpSessionRequester(session)
upnp_factory = UpnpFactory(upnp_requester)
with contextlib.suppress(UpnpConnectionError):
self._upnp_device = await upnp_factory.async_create_device(
self._ssdp_rendering_control_location
)

def _get_upnp_service(self, log: bool = False) -> UpnpService | None:
if self._upnp_device is None:
if log:
LOGGER.info("Upnp services are not available on %s", self._host)
return None

if service := self._upnp_device.services.get(UPNP_SVC_RENDERINGCONTROL):
return service

if log:
LOGGER.info(
"Upnp service %s is not available on %s",
UPNP_SVC_RENDERINGCONTROL,
self._host,
)
return None

async def _async_launch_app(self, app_id: str) -> None:
"""Send launch_app to the tv."""
if self._power_off_in_progress():
Expand Down Expand Up @@ -233,6 +306,19 @@ async def async_turn_off(self) -> None:
self._end_of_power_off = dt_util.utcnow() + SCAN_INTERVAL_PLUS_OFF_TIME
await self._bridge.async_power_off()

async def async_set_volume_level(self, volume: float) -> None:
"""Set volume level on the media player."""
if not (service := self._get_upnp_service(log=True)):
return
try:
await service.action("SetVolume").async_call(
InstanceID=0, Channel="Master", DesiredVolume=int(volume * 100)
)
except UpnpActionResponseError as err:
LOGGER.warning(
"Unable to set volume level on %s: %s", self._host, err.__repr__()
)

async def async_volume_up(self) -> None:
"""Volume up the media player."""
await self._async_send_keys(["KEY_VOLUP"])
Expand Down
1 change: 1 addition & 0 deletions requirements_all.txt
Expand Up @@ -326,6 +326,7 @@ asterisk_mbox==0.5.0

# homeassistant.components.dlna_dmr
# homeassistant.components.dlna_dms
# homeassistant.components.samsungtv
# homeassistant.components.ssdp
# homeassistant.components.upnp
# homeassistant.components.yeelight
Expand Down
1 change: 1 addition & 0 deletions requirements_test_all.txt
Expand Up @@ -259,6 +259,7 @@ arcam-fmj==0.12.0

# homeassistant.components.dlna_dmr
# homeassistant.components.dlna_dms
# homeassistant.components.samsungtv
# homeassistant.components.ssdp
# homeassistant.components.upnp
# homeassistant.components.yeelight
Expand Down
25 changes: 25 additions & 0 deletions tests/components/samsungtv/__init__.py
@@ -1,6 +1,10 @@
"""Tests for the samsungtv component."""
from __future__ import annotations

from unittest.mock import Mock

from async_upnp_client.client import UpnpAction, UpnpService

from homeassistant.components.samsungtv.const import DOMAIN
from homeassistant.config_entries import ConfigEntry
from homeassistant.core import HomeAssistant
Expand All @@ -21,3 +25,24 @@ async def setup_samsungtv_entry(hass: HomeAssistant, data: ConfigType) -> Config
await hass.async_block_till_done()

return entry


def upnp_get_action_mock(device: Mock, service_type: str, action: str) -> Mock:
"""Get or Add UpnpService/UpnpAction to UpnpDevice mock."""
upnp_service: Mock | None
if (upnp_service := device.services.get(service_type)) is None:
upnp_service = Mock(UpnpService)
upnp_service.actions = {}

def _get_action(action: str):
return upnp_service.actions.get(action)

upnp_service.action.side_effect = _get_action
device.services[service_type] = upnp_service

upnp_action: Mock | None
if (upnp_action := upnp_service.actions.get(action)) is None:
upnp_action = Mock(UpnpAction)
upnp_service.actions[action] = upnp_action

return upnp_action
24 changes: 24 additions & 0 deletions tests/components/samsungtv/conftest.py
Expand Up @@ -6,6 +6,8 @@
from typing import Any
from unittest.mock import AsyncMock, Mock, patch

from async_upnp_client.client import UpnpDevice
from async_upnp_client.exceptions import UpnpConnectionError
import pytest
from samsungctl import Remote
from samsungtvws.async_remote import SamsungTVWSAsyncRemote
Expand Down Expand Up @@ -38,6 +40,28 @@ def app_list_delay_fixture() -> None:
yield


@pytest.fixture(name="upnp_factory", autouse=True)
def upnp_factory_fixture() -> Mock:
"""Patch UpnpFactory."""
with patch(
"homeassistant.components.samsungtv.media_player.UpnpFactory",
autospec=True,
) as upnp_factory_class:
upnp_factory: Mock = upnp_factory_class.return_value
upnp_factory.async_create_device.side_effect = UpnpConnectionError
yield upnp_factory


@pytest.fixture(name="upnp_device")
async def upnp_device_fixture(upnp_factory: Mock) -> Mock:
"""Patch async_upnp_client."""
upnp_device = Mock(UpnpDevice)
upnp_device.services = {}

with patch.object(upnp_factory, "async_create_device", side_effect=[upnp_device]):
yield upnp_device


@pytest.fixture(name="remote")
def remote_fixture() -> Mock:
"""Patch the samsungctl Remote."""
Expand Down

0 comments on commit c024033

Please sign in to comment.