Skip to content

Commit

Permalink
Live Streams Component (#21473)
Browse files Browse the repository at this point in the history
* initial commit of streams

* refactor stream component

* refactor so stream formats are not considered a platform

* initial test and minor refactor

* fix linting

* update requirements

* need av in tests as well

* fix import in class def vs method

* fix travis and docker builds

* address code review comments

* fix logger, add stream start/stop logs, listen to HASS stop

* address additional code review comments

* beef up tests

* fix tests

* fix lint

* add stream_source to onvif camera

* address pr comments

* add keepalive to camera play_stream service

* remove keepalive and move import

* implement registry and have output provider remove itself from stream after idle, set libav log level to error
  • Loading branch information
hunterjm authored and balloob committed Mar 12, 2019
1 parent 0a6ba14 commit 7ccd0bb
Show file tree
Hide file tree
Showing 18 changed files with 993 additions and 5 deletions.
13 changes: 10 additions & 3 deletions .travis.yml
@@ -1,8 +1,18 @@
sudo: false
dist: xenial
addons:
apt:
sources:
- sourceline: "ppa:jonathonf/ffmpeg-4"
packages:
- libudev-dev
- libavformat-dev
- libavcodec-dev
- libavdevice-dev
- libavutil-dev
- libswscale-dev
- libswresample-dev
- libavfilter-dev
matrix:
fast_finish: true
include:
Expand All @@ -19,15 +29,12 @@ matrix:
env: TOXENV=py36
- python: "3.7"
env: TOXENV=py37
dist: xenial
- python: "3.8-dev"
env: TOXENV=py38
dist: xenial
if: branch = dev AND type = push
allow_failures:
- python: "3.8-dev"
env: TOXENV=py38
dist: xenial

cache:
directories:
Expand Down
73 changes: 73 additions & 0 deletions homeassistant/components/camera/__init__.py
Expand Up @@ -28,6 +28,12 @@
from homeassistant.helpers.config_validation import ( # noqa
PLATFORM_SCHEMA, PLATFORM_SCHEMA_BASE)
from homeassistant.components.http import HomeAssistantView, KEY_AUTHENTICATED
from homeassistant.components.media_player.const import (
ATTR_MEDIA_CONTENT_ID, ATTR_MEDIA_CONTENT_TYPE,
SERVICE_PLAY_MEDIA, DOMAIN as DOMAIN_MP)
from homeassistant.components.stream import request_stream
from homeassistant.components.stream.const import (
OUTPUT_FORMATS, FORMAT_CONTENT_TYPE)
from homeassistant.components import websocket_api
import homeassistant.helpers.config_validation as cv

Expand All @@ -39,11 +45,14 @@
SERVICE_ENABLE_MOTION = 'enable_motion_detection'
SERVICE_DISABLE_MOTION = 'disable_motion_detection'
SERVICE_SNAPSHOT = 'snapshot'
SERVICE_PLAY_STREAM = 'play_stream'

SCAN_INTERVAL = timedelta(seconds=30)
ENTITY_ID_FORMAT = DOMAIN + '.{}'

ATTR_FILENAME = 'filename'
ATTR_MEDIA_PLAYER = 'media_player'
ATTR_FORMAT = 'format'

STATE_RECORDING = 'recording'
STATE_STREAMING = 'streaming'
Expand All @@ -69,6 +78,11 @@
vol.Required(ATTR_FILENAME): cv.template
})

CAMERA_SERVICE_PLAY_STREAM = CAMERA_SERVICE_SCHEMA.extend({
vol.Required(ATTR_MEDIA_PLAYER): cv.entities_domain(DOMAIN_MP),
vol.Optional(ATTR_FORMAT, default='hls'): vol.In(OUTPUT_FORMATS),
})

WS_TYPE_CAMERA_THUMBNAIL = 'camera_thumbnail'
SCHEMA_WS_CAMERA_THUMBNAIL = websocket_api.BASE_COMMAND_MESSAGE_SCHEMA.extend({
vol.Required('type'): WS_TYPE_CAMERA_THUMBNAIL,
Expand Down Expand Up @@ -176,6 +190,7 @@ async def async_setup(hass, config):
WS_TYPE_CAMERA_THUMBNAIL, websocket_camera_thumbnail,
SCHEMA_WS_CAMERA_THUMBNAIL
)
hass.components.websocket_api.async_register_command(ws_camera_stream)

await component.async_setup(config)

Expand Down Expand Up @@ -209,6 +224,10 @@ def update_tokens(time):
SERVICE_SNAPSHOT, CAMERA_SERVICE_SNAPSHOT,
async_handle_snapshot_service
)
component.async_register_entity_service(
SERVICE_PLAY_STREAM, CAMERA_SERVICE_PLAY_STREAM,
async_handle_play_stream_service
)

return True

Expand Down Expand Up @@ -273,6 +292,11 @@ def frame_interval(self):
"""Return the interval between frames of the mjpeg stream."""
return 0.5

@property
def stream_source(self):
"""Return the source of the stream."""
return None

def camera_image(self):
"""Return bytes of camera image."""
raise NotImplementedError()
Expand Down Expand Up @@ -473,6 +497,33 @@ async def websocket_camera_thumbnail(hass, connection, msg):
msg['id'], 'image_fetch_failed', 'Unable to fetch image'))


@websocket_api.async_response
@websocket_api.websocket_command({
vol.Required('type'): 'camera/stream',
vol.Required('entity_id'): cv.entity_id,
vol.Optional('format', default='hls'): vol.In(OUTPUT_FORMATS),
})
async def ws_camera_stream(hass, connection, msg):
"""Handle get camera stream websocket command.
Async friendly.
"""
try:
camera = _get_camera_from_entity_id(hass, msg['entity_id'])

if not camera.stream_source:
raise HomeAssistantError("{} does not support play stream service"
.format(camera.entity_id))

fmt = msg['format']
url = request_stream(hass, camera.stream_source, fmt=fmt)
connection.send_result(msg['id'], {'url': url})
except HomeAssistantError as ex:
_LOGGER.error(ex)
connection.send_error(
msg['id'], 'start_stream_failed', str(ex))


async def async_handle_snapshot_service(camera, service):
"""Handle snapshot services calls."""
hass = camera.hass
Expand Down Expand Up @@ -500,3 +551,25 @@ def _write_image(to_file, image_data):
_write_image, snapshot_file, image)
except OSError as err:
_LOGGER.error("Can't write image to file: %s", err)


async def async_handle_play_stream_service(camera, service_call):
"""Handle play stream services calls."""
if not camera.stream_source:
raise HomeAssistantError("{} does not support play stream service"
.format(camera.entity_id))

hass = camera.hass
fmt = service_call.data[ATTR_FORMAT]
entity_ids = service_call.data[ATTR_MEDIA_PLAYER]

url = request_stream(hass, camera.stream_source, fmt=fmt)
data = {
ATTR_ENTITY_ID: entity_ids,
ATTR_MEDIA_CONTENT_ID: url,
ATTR_MEDIA_CONTENT_TYPE: FORMAT_CONTENT_TYPE[fmt]
}

await hass.services.async_call(
DOMAIN_MP, SERVICE_PLAY_MEDIA, data,
blocking=True, context=service_call.context)
5 changes: 5 additions & 0 deletions homeassistant/components/camera/ffmpeg.py
Expand Up @@ -76,3 +76,8 @@ async def handle_async_mjpeg_stream(self, request):
def name(self):
"""Return the name of this camera."""
return self._name

@property
def stream_source(self):
"""Return the source of the stream."""
return self._input
5 changes: 5 additions & 0 deletions homeassistant/components/camera/onvif.py
Expand Up @@ -230,3 +230,8 @@ async def handle_async_mjpeg_stream(self, request):
def name(self):
"""Return the name of this camera."""
return self._name

@property
def stream_source(self):
"""Return the source of the stream."""
return self._input
16 changes: 16 additions & 0 deletions homeassistant/components/camera/services.yaml
Expand Up @@ -38,6 +38,22 @@ snapshot:
description: Template of a Filename. Variable is entity_id.
example: '/tmp/snapshot_{{ entity_id }}'

play_stream:
description: Play camera stream on supported media player.
fields:
entity_id:
description: Name(s) of entities to stream from.
example: 'camera.living_room_camera'
media_player:
description: Name(s) of media player to stream to.
example: 'media_player.living_room_tv'
format:
description: (Optional) Stream format supported by media player.
example: 'hls'
keepalive:
description: (Optional) Keep the stream worker alive for fast access.
example: 'true'

local_file_update_file_path:
description: Update the file_path for a local_file camera.
fields:
Expand Down
153 changes: 153 additions & 0 deletions homeassistant/components/stream/__init__.py
@@ -0,0 +1,153 @@
"""
Provide functionality to stream video source.
For more details about this component, please refer to the documentation at
https://home-assistant.io/components/stream/
"""
import logging
import threading

import voluptuous as vol

from homeassistant.auth.util import generate_secret
from homeassistant.const import EVENT_HOMEASSISTANT_STOP
from homeassistant.core import callback
from homeassistant.exceptions import HomeAssistantError
from homeassistant.loader import bind_hass

from .const import DOMAIN, ATTR_STREAMS, ATTR_ENDPOINTS
from .core import PROVIDERS
from .worker import stream_worker
from .hls import async_setup_hls

REQUIREMENTS = ['av==6.1.2']

_LOGGER = logging.getLogger(__name__)

DEPENDENCIES = ['http']

CONFIG_SCHEMA = vol.Schema({
DOMAIN: vol.Schema({}),
}, extra=vol.ALLOW_EXTRA)

# Set log level to error for libav
logging.getLogger('libav').setLevel(logging.ERROR)


@bind_hass
def request_stream(hass, stream_source, *, fmt='hls',
keepalive=False, options=None):
"""Set up stream with token."""
if DOMAIN not in hass.config.components:
raise HomeAssistantError("Stream component is not set up.")

if options is None:
options = {}

try:
streams = hass.data[DOMAIN][ATTR_STREAMS]
stream = streams.get(stream_source)
if not stream:
stream = Stream(hass, stream_source,
options=options, keepalive=keepalive)
streams[stream_source] = stream

# Add provider
stream.add_provider(fmt)

if not stream.access_token:
stream.access_token = generate_secret()
stream.start()
return hass.data[DOMAIN][ATTR_ENDPOINTS][fmt].format(
hass.config.api.base_url, stream.access_token)
except Exception:
raise HomeAssistantError('Unable to get stream')


async def async_setup(hass, config):
"""Set up stream."""
hass.data[DOMAIN] = {}
hass.data[DOMAIN][ATTR_ENDPOINTS] = {}
hass.data[DOMAIN][ATTR_STREAMS] = {}

# Setup HLS
hls_endpoint = async_setup_hls(hass)
hass.data[DOMAIN][ATTR_ENDPOINTS]['hls'] = hls_endpoint

@callback
def shutdown(event):
"""Stop all stream workers."""
for stream in hass.data[DOMAIN][ATTR_STREAMS].values():
stream.keepalive = False
stream.stop()
_LOGGER.info("Stopped stream workers.")

hass.bus.async_listen_once(EVENT_HOMEASSISTANT_STOP, shutdown)

return True


class Stream:
"""Represents a single stream."""

def __init__(self, hass, source, options=None, keepalive=False):
"""Initialize a stream."""
self.hass = hass
self.source = source
self.options = options
self.keepalive = keepalive
self.access_token = None
self._thread = None
self._thread_quit = None
self._outputs = {}

if self.options is None:
self.options = {}

@property
def outputs(self):
"""Return stream outputs."""
return self._outputs

def add_provider(self, fmt):
"""Add provider output stream."""
provider = PROVIDERS[fmt](self)
if not self._outputs.get(provider.format):
self._outputs[provider.format] = provider
return self._outputs[provider.format]

def remove_provider(self, provider):
"""Remove provider output stream."""
if provider.format in self._outputs:
del self._outputs[provider.format]

if not self._outputs:
self.stop()

def start(self):
"""Start a stream."""
if self._thread is None or not self._thread.isAlive():
self._thread_quit = threading.Event()
self._thread = threading.Thread(
name='stream_worker',
target=stream_worker,
args=(
self.hass, self, self._thread_quit))
self._thread.start()
_LOGGER.info("Started stream: %s", self.source)

def stop(self):
"""Remove outputs and access token."""
self._outputs = {}
self.access_token = None

if not self.keepalive:
self._stop()

def _stop(self):
"""Stop worker thread."""
if self._thread is not None:
self._thread_quit.set()
self._thread.join()
self._thread = None
_LOGGER.info("Stopped stream: %s", self.source)
14 changes: 14 additions & 0 deletions homeassistant/components/stream/const.py
@@ -0,0 +1,14 @@
"""Constants for Stream component."""
DOMAIN = 'stream'

ATTR_ENDPOINTS = 'endpoints'
ATTR_STREAMS = 'streams'
ATTR_KEEPALIVE = 'keepalive'

OUTPUT_FORMATS = ['hls']

FORMAT_CONTENT_TYPE = {
'hls': 'application/vnd.apple.mpegurl'
}

AUDIO_SAMPLE_RATE = 44100

0 comments on commit 7ccd0bb

Please sign in to comment.