diff --git a/.coveragerc b/.coveragerc index 7fded7dd8b294..23531c159fd51 100644 --- a/.coveragerc +++ b/.coveragerc @@ -64,6 +64,7 @@ omit = homeassistant/components/anthemav/media_player.py homeassistant/components/apcupsd/* homeassistant/components/apple_tv/__init__.py + homeassistant/components/apple_tv/browse_media.py homeassistant/components/apple_tv/media_player.py homeassistant/components/apple_tv/remote.py homeassistant/components/aqualogic/* diff --git a/homeassistant/components/apple_tv/browse_media.py b/homeassistant/components/apple_tv/browse_media.py index 0673c9923fb55..9944c49a82325 100644 --- a/homeassistant/components/apple_tv/browse_media.py +++ b/homeassistant/components/apple_tv/browse_media.py @@ -1,34 +1,29 @@ """Support for media browsing.""" +from typing import Any -from homeassistant.components.media_player import BrowseMedia -from homeassistant.components.media_player.const import ( - MEDIA_CLASS_APP, - MEDIA_CLASS_DIRECTORY, - MEDIA_TYPE_APP, - MEDIA_TYPE_APPS, -) +from homeassistant.components.media_player import BrowseMedia, MediaClass, MediaType -def build_app_list(app_list): +def build_app_list(app_list: dict[str, str]) -> BrowseMedia: """Create response payload for app list.""" - app_list = [ - {"app_id": app_id, "title": app_name, "type": MEDIA_TYPE_APP} + media_list = [ + {"app_id": app_id, "title": app_name, "type": MediaType.APP} for app_name, app_id in app_list.items() ] return BrowseMedia( - media_class=MEDIA_CLASS_DIRECTORY, + media_class=MediaClass.DIRECTORY, media_content_id="apps", - media_content_type=MEDIA_TYPE_APPS, + media_content_type=MediaType.APPS, title="Apps", can_play=False, can_expand=True, - children=[item_payload(item) for item in app_list], - children_media_class=MEDIA_CLASS_APP, + children=[item_payload(item) for item in media_list], + children_media_class=MediaClass.APP, ) -def item_payload(item): +def item_payload(item: dict[str, Any]) -> BrowseMedia: """ Create response payload for a single media item. @@ -36,8 +31,8 @@ def item_payload(item): """ return BrowseMedia( title=item["title"], - media_class=MEDIA_CLASS_APP, - media_content_type=MEDIA_TYPE_APP, + media_class=MediaClass.APP, + media_content_type=MediaType.APP, media_content_id=item["app_id"], can_play=False, can_expand=False, diff --git a/homeassistant/components/apple_tv/media_player.py b/homeassistant/components/apple_tv/media_player.py index 771b27a6dc386..06618e4f2a354 100644 --- a/homeassistant/components/apple_tv/media_player.py +++ b/homeassistant/components/apple_tv/media_player.py @@ -1,6 +1,7 @@ """Support for Apple TV media player.""" from __future__ import annotations +from datetime import datetime import logging from typing import Any @@ -9,45 +10,31 @@ DeviceState, FeatureName, FeatureState, - MediaType, + MediaType as AppleMediaType, PowerState, RepeatState, ShuffleState, ) from pyatv.helpers import is_streamable +from pyatv.interface import AppleTV, Playing from homeassistant.components import media_source from homeassistant.components.media_player import ( BrowseMedia, MediaPlayerEntity, MediaPlayerEntityFeature, -) -from homeassistant.components.media_player.browse_media import ( + MediaPlayerState, + MediaType, + RepeatMode, async_process_play_media_url, ) -from homeassistant.components.media_player.const import ( - MEDIA_TYPE_APP, - MEDIA_TYPE_MUSIC, - MEDIA_TYPE_TVSHOW, - MEDIA_TYPE_VIDEO, - REPEAT_MODE_ALL, - REPEAT_MODE_OFF, - REPEAT_MODE_ONE, -) from homeassistant.config_entries import ConfigEntry -from homeassistant.const import ( - CONF_NAME, - STATE_IDLE, - STATE_OFF, - STATE_PAUSED, - STATE_PLAYING, - STATE_STANDBY, -) +from homeassistant.const import CONF_NAME from homeassistant.core import HomeAssistant, callback from homeassistant.helpers.entity_platform import AddEntitiesCallback import homeassistant.util.dt as dt_util -from . import AppleTVEntity +from . import AppleTVEntity, AppleTVManager from .browse_media import build_app_list from .const import DOMAIN @@ -108,8 +95,9 @@ async def async_setup_entry( async_add_entities: AddEntitiesCallback, ) -> None: """Load Apple TV media player based on a config entry.""" - name = config_entry.data[CONF_NAME] - manager = hass.data[DOMAIN][config_entry.unique_id] + name: str = config_entry.data[CONF_NAME] + assert config_entry.unique_id is not None + manager: AppleTVManager = hass.data[DOMAIN][config_entry.unique_id] async_add_entities([AppleTvMediaPlayer(name, config_entry.unique_id, manager)]) @@ -118,14 +106,14 @@ class AppleTvMediaPlayer(AppleTVEntity, MediaPlayerEntity): _attr_supported_features = SUPPORT_APPLE_TV - def __init__(self, name, identifier, manager, **kwargs): + def __init__(self, name: str, identifier: str, manager: AppleTVManager) -> None: """Initialize the Apple TV media player.""" - super().__init__(name, identifier, manager, **kwargs) - self._playing = None - self._app_list = {} + super().__init__(name, identifier, manager) + self._playing: Playing | None = None + self._app_list: dict[str, str] = {} @callback - def async_device_connected(self, atv): + def async_device_connected(self, atv: AppleTV) -> None: """Handle when connection is made to device.""" # NB: Do not use _is_feature_available here as it only works when playing if self.atv.features.in_state(FeatureState.Available, FeatureName.PushUpdates): @@ -153,7 +141,7 @@ def async_device_connected(self, atv): if self.atv.features.in_state(FeatureState.Available, FeatureName.AppList): self.hass.create_task(self._update_app_list()) - async def _update_app_list(self): + async def _update_app_list(self) -> None: _LOGGER.debug("Updating app list") try: apps = await self.atv.apps.app_list() @@ -165,127 +153,128 @@ async def _update_app_list(self): self._app_list = { app.name: app.identifier for app in sorted(apps, key=lambda app: app.name.lower()) + if app.name is not None } self.async_write_ha_state() @callback - def async_device_disconnected(self): + def async_device_disconnected(self) -> None: """Handle when connection was lost to device.""" self._attr_supported_features = SUPPORT_APPLE_TV @property - def state(self): + def state(self) -> MediaPlayerState | None: """Return the state of the device.""" if self.manager.is_connecting: return None if self.atv is None: - return STATE_OFF + return MediaPlayerState.OFF if ( self._is_feature_available(FeatureName.PowerState) and self.atv.power.power_state == PowerState.Off ): - return STATE_STANDBY + return MediaPlayerState.STANDBY if self._playing: state = self._playing.device_state if state in (DeviceState.Idle, DeviceState.Loading): - return STATE_IDLE + return MediaPlayerState.IDLE if state == DeviceState.Playing: - return STATE_PLAYING + return MediaPlayerState.PLAYING if state in (DeviceState.Paused, DeviceState.Seeking, DeviceState.Stopped): - return STATE_PAUSED - return STATE_STANDBY # Bad or unknown state? + return MediaPlayerState.PAUSED + return MediaPlayerState.STANDBY # Bad or unknown state? return None @callback - def playstatus_update(self, _, playing): + def playstatus_update(self, _, playing: Playing) -> None: """Print what is currently playing when it changes.""" self._playing = playing self.async_write_ha_state() @callback - def playstatus_error(self, _, exception): + def playstatus_error(self, _, exception: Exception) -> None: """Inform about an error and restart push updates.""" _LOGGER.warning("A %s error occurred: %s", exception.__class__, exception) self._playing = None self.async_write_ha_state() @callback - def powerstate_update(self, old_state: PowerState, new_state: PowerState): + def powerstate_update(self, old_state: PowerState, new_state: PowerState) -> None: """Update power state when it changes.""" self.async_write_ha_state() @property - def app_id(self): + def app_id(self) -> str | None: """ID of the current running app.""" if self._is_feature_available(FeatureName.App): return self.atv.metadata.app.identifier return None @property - def app_name(self): + def app_name(self) -> str | None: """Name of the current running app.""" if self._is_feature_available(FeatureName.App): return self.atv.metadata.app.name return None @property - def source_list(self): + def source_list(self) -> list[str]: """List of available input sources.""" return list(self._app_list.keys()) @property - def media_content_type(self): + def media_content_type(self) -> MediaType | None: """Content type of current playing media.""" if self._playing: return { - MediaType.Video: MEDIA_TYPE_VIDEO, - MediaType.Music: MEDIA_TYPE_MUSIC, - MediaType.TV: MEDIA_TYPE_TVSHOW, + AppleMediaType.Video: MediaType.VIDEO, + AppleMediaType.Music: MediaType.MUSIC, + AppleMediaType.TV: MediaType.TVSHOW, }.get(self._playing.media_type) return None @property - def media_content_id(self): + def media_content_id(self) -> str | None: """Content ID of current playing media.""" if self._playing: return self._playing.content_identifier return None @property - def volume_level(self): + def volume_level(self) -> float | None: """Volume level of the media player (0..1).""" if self._is_feature_available(FeatureName.Volume): return self.atv.audio.volume / 100.0 # from percent return None @property - def media_duration(self): + def media_duration(self) -> int | None: """Duration of current playing media in seconds.""" if self._playing: return self._playing.total_time return None @property - def media_position(self): + def media_position(self) -> int | None: """Position of current playing media in seconds.""" if self._playing: return self._playing.position return None @property - def media_position_updated_at(self): + def media_position_updated_at(self) -> datetime | None: """Last valid time of media position.""" - if self.state in (STATE_PLAYING, STATE_PAUSED): + if self.state in {MediaPlayerState.PLAYING, MediaPlayerState.PAUSED}: return dt_util.utcnow() return None async def async_play_media( - self, media_type: str, media_id: str, **kwargs: Any + self, media_type: MediaType | str, media_id: str, **kwargs: Any ) -> None: """Send the play_media command to the media player.""" # If input (file) has a file format supported by pyatv, then stream it with # RAOP. Otherwise try to play it with regular AirPlay. - if media_type == MEDIA_TYPE_APP: + if media_type == MediaType.APP: await self.atv.apps.launch_app(media_id) return @@ -294,10 +283,10 @@ async def async_play_media( self.hass, media_id, self.entity_id ) media_id = async_process_play_media_url(self.hass, play_item.url) - media_type = MEDIA_TYPE_MUSIC + media_type = MediaType.MUSIC if self._is_feature_available(FeatureName.StreamFile) and ( - media_type == MEDIA_TYPE_MUSIC or await is_streamable(media_id) + media_type == MediaType.MUSIC or await is_streamable(media_id) ): _LOGGER.debug("Streaming %s via RAOP", media_id) await self.atv.stream.stream_file(media_id) @@ -308,13 +297,13 @@ async def async_play_media( _LOGGER.error("Media streaming is not possible with current configuration") @property - def media_image_hash(self): + def media_image_hash(self) -> str | None: """Hash value for media image.""" state = self.state if ( self._playing and self._is_feature_available(FeatureName.Artwork) - and state not in [None, STATE_OFF, STATE_IDLE] + and state not in {None, MediaPlayerState.OFF, MediaPlayerState.IDLE} ): return self.atv.metadata.artwork_id return None @@ -322,7 +311,7 @@ def media_image_hash(self): async def async_get_media_image(self) -> tuple[bytes | None, str | None]: """Fetch media image of current playing image.""" state = self.state - if self._playing and state not in [STATE_OFF, STATE_IDLE]: + if self._playing and state not in {MediaPlayerState.OFF, MediaPlayerState.IDLE}: artwork = await self.atv.metadata.artwork() if artwork: return artwork.bytes, artwork.mimetype @@ -330,65 +319,65 @@ async def async_get_media_image(self) -> tuple[bytes | None, str | None]: return None, None @property - def media_title(self): + def media_title(self) -> str | None: """Title of current playing media.""" if self._playing: return self._playing.title return None @property - def media_artist(self): + def media_artist(self) -> str | None: """Artist of current playing media, music track only.""" - if self._is_feature_available(FeatureName.Artist): + if self._playing and self._is_feature_available(FeatureName.Artist): return self._playing.artist return None @property - def media_album_name(self): + def media_album_name(self) -> str | None: """Album name of current playing media, music track only.""" - if self._is_feature_available(FeatureName.Album): + if self._playing and self._is_feature_available(FeatureName.Album): return self._playing.album return None @property - def media_series_title(self): + def media_series_title(self) -> str | None: """Title of series of current playing media, TV show only.""" - if self._is_feature_available(FeatureName.SeriesName): + if self._playing and self._is_feature_available(FeatureName.SeriesName): return self._playing.series_name return None @property - def media_season(self): + def media_season(self) -> str | None: """Season of current playing media, TV show only.""" - if self._is_feature_available(FeatureName.SeasonNumber): + if self._playing and self._is_feature_available(FeatureName.SeasonNumber): return str(self._playing.season_number) return None @property - def media_episode(self): + def media_episode(self) -> str | None: """Episode of current playing media, TV show only.""" - if self._is_feature_available(FeatureName.EpisodeNumber): + if self._playing and self._is_feature_available(FeatureName.EpisodeNumber): return str(self._playing.episode_number) return None @property - def repeat(self): + def repeat(self) -> RepeatMode | None: """Return current repeat mode.""" - if self._is_feature_available(FeatureName.Repeat): + if self._playing and self._is_feature_available(FeatureName.Repeat): return { - RepeatState.Track: REPEAT_MODE_ONE, - RepeatState.All: REPEAT_MODE_ALL, - }.get(self._playing.repeat, REPEAT_MODE_OFF) + RepeatState.Track: RepeatMode.ONE, + RepeatState.All: RepeatMode.ALL, + }.get(self._playing.repeat, RepeatMode.OFF) return None @property - def shuffle(self): + def shuffle(self) -> bool | None: """Boolean if shuffle is enabled.""" - if self._is_feature_available(FeatureName.Shuffle): + if self._playing and self._is_feature_available(FeatureName.Shuffle): return self._playing.shuffle != ShuffleState.Off return None - def _is_feature_available(self, feature): + def _is_feature_available(self, feature: FeatureName) -> bool: """Return if a feature is available.""" if self.atv and self._playing: return self.atv.features.in_state(FeatureState.Available, feature) @@ -396,7 +385,7 @@ def _is_feature_available(self, feature): async def async_browse_media( self, - media_content_type: str | None = None, + media_content_type: MediaType | str | None = None, media_content_id: str | None = None, ) -> BrowseMedia: """Implement the websocket media browsing helper.""" @@ -496,12 +485,12 @@ async def async_set_volume_level(self, volume: float) -> None: # pyatv expects volume in percent await self.atv.audio.set_volume(volume * 100.0) - async def async_set_repeat(self, repeat: str) -> None: + async def async_set_repeat(self, repeat: RepeatMode) -> None: """Set repeat mode.""" if self.atv: mode = { - REPEAT_MODE_ONE: RepeatState.Track, - REPEAT_MODE_ALL: RepeatState.All, + RepeatMode.ONE: RepeatState.Track, + RepeatMode.ALL: RepeatState.All, }.get(repeat, RepeatState.Off) await self.atv.remote_control.set_repeat(mode) diff --git a/homeassistant/components/media_player/__init__.py b/homeassistant/components/media_player/__init__.py index e607f312df67c..2c24a43ffc3ac 100644 --- a/homeassistant/components/media_player/__init__.py +++ b/homeassistant/components/media_player/__init__.py @@ -127,6 +127,7 @@ SUPPORT_VOLUME_MUTE, SUPPORT_VOLUME_SET, SUPPORT_VOLUME_STEP, + MediaClass, MediaPlayerEntityFeature, MediaPlayerState, MediaType,