Skip to content

Commit

Permalink
feat: try to have more reliable p2p
Browse files Browse the repository at this point in the history
  • Loading branch information
fuatakgun committed Nov 12, 2023
1 parent d2616db commit 088ac61
Show file tree
Hide file tree
Showing 8 changed files with 279 additions and 202 deletions.
11 changes: 10 additions & 1 deletion custom_components/eufy_security/camera.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,10 @@
import asyncio
import contextlib
import logging
import traceback

from haffmpeg.camera import CameraMjpeg
from haffmpeg.tools import ImageFrame
from base64 import b64decode
from homeassistant.components import ffmpeg
from homeassistant.components.camera import Camera, CameraEntityFeature
Expand Down Expand Up @@ -79,14 +81,17 @@ def __init__(self, coordinator: EufySecurityDataUpdateCoordinator, metadata: Met

# ffmpeg entities
self.ffmpeg = self.coordinator.hass.data[DATA_FFMPEG]
self.product.set_ffmpeg(CameraMjpeg(self.ffmpeg.binary))
self.product.set_ffmpeg(CameraMjpeg(self.ffmpeg.binary), ImageFrame(self.ffmpeg.binary))

async def stream_source(self) -> str:
#for line in traceback.format_stack():
# _LOGGER.debug(f"stream_source - {line.strip()}")
if self.is_streaming is False:
return None
return self.product.stream_url

async def handle_async_mjpeg_stream(self, request):
"""this is probabaly triggered by user request, turn on"""
stream_source = await self.stream_source()
if stream_source is None:
return await super().handle_async_mjpeg_stream(request)
Expand Down Expand Up @@ -160,23 +165,27 @@ async def _start_livestream(self) -> None:
await self._stop_livestream()
else:
await self._start_hass_streaming()
self.async_write_ha_state()

async def _stop_livestream(self) -> None:
"""stop byte based livestream on camera"""
await self._stop_hass_streaming()
await self.product.stop_livestream()
self.async_write_ha_state()

async def _start_rtsp_livestream(self) -> None:
"""start rtsp based livestream on camera"""
if await self.product.start_rtsp_livestream() is False:
await self._stop_rtsp_livestream()
else:
await self._start_hass_streaming()
self.async_write_ha_state()

async def _stop_rtsp_livestream(self) -> None:
"""stop rtsp based livestream on camera"""
await self._stop_hass_streaming()
await self.product.stop_rtsp_livestream()
self.async_write_ha_state()

async def _async_alarm_trigger(self, duration: int = 10):
"""trigger alarm for a duration on camera"""
Expand Down
1 change: 1 addition & 0 deletions custom_components/eufy_security/const.py
Original file line number Diff line number Diff line change
Expand Up @@ -184,3 +184,4 @@ class PlatformToPropertyType(Enum):
SWITCH = MetadataFilter(readable=True, writeable=True, types=[PropertyType.boolean])
SELECT = MetadataFilter(readable=True, writeable=True, types=[PropertyType.number], any_fields=[MessageField.STATES.value])
NUMBER = MetadataFilter(readable=True, writeable=True, types=[PropertyType.number], no_fields=[MessageField.STATES.value])
DEVICE_TRACKER = MetadataFilter(readable=True, writeable=False, types=[PropertyType.boolean])
53 changes: 53 additions & 0 deletions custom_components/eufy_security/device_tracker.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
import logging
from typing import Any

from homeassistant.components.device_tracker import SOURCE_TYPE_GPS
from homeassistant.components.device_tracker.config_entry import TrackerEntity
from homeassistant.config_entries import ConfigEntry
from homeassistant.core import HomeAssistant
from homeassistant.helpers.entity_platform import AddEntitiesCallback

from .const import (
COORDINATOR,
DOMAIN,
Platform,
PlatformToPropertyType,
)
from .coordinator import EufySecurityDataUpdateCoordinator
from .entity import EufySecurityEntity
from .eufy_security_api.metadata import Metadata
from .eufy_security_api.util import get_child_value
from .util import get_product_properties_by_filter

_LOGGER: logging.Logger = logging.getLogger(__package__)


async def async_setup_entry(hass: HomeAssistant, config_entry: ConfigEntry, async_add_entities: AddEntitiesCallback) -> None:
"""Setup switch entities."""

coordinator: EufySecurityDataUpdateCoordinator = hass.data[DOMAIN][COORDINATOR]
product_properties = get_product_properties_by_filter(
[coordinator.devices.values(), coordinator.stations.values()], PlatformToPropertyType[Platform.SWITCH.name].value
)
entities = [EufySwitchEntity(coordinator, metadata) for metadata in product_properties]
async_add_entities(entities)


class EufyDeviceTrackerEntity(TrackerEntity, EufySecurityEntity):
"""Base switch entity for integration"""

def __init__(self, coordinator: EufySecurityDataUpdateCoordinator, metadata: Metadata) -> None:
super().__init__(coordinator, metadata)

@property
def is_on(self):
"""Return true if the switch is on."""
return bool(get_child_value(self.product.properties, self.metadata.name))

async def async_turn_off(self, **kwargs: Any) -> None:
"""Turn the entity off."""
await self.product.set_property(self.metadata, False)

async def async_turn_on(self, **kwargs: Any) -> None:
"""Turn the entity on."""
await self.product.set_property(self.metadata, True)
Original file line number Diff line number Diff line change
Expand Up @@ -256,9 +256,12 @@ async def reboot(self, product_type: ProductType, serial_no: str) -> None:
await self._send_message_get_response(OutgoingMessage(OutgoingMessageType.reboot, serial_no=serial_no))

async def _on_message(self, message: dict) -> None:
message_str = str(message)[0:1000]
message_str = str(message)[0:5000]
if "livestream video data" not in message_str and "livestream audio data" not in message_str:
_LOGGER.debug(f"_on_message - {message_str}")
else:
# _LOGGER.debug(f"_on_message - livestream data received - {len(str(message))}")
pass
if message[MessageField.TYPE.value] == IncomingMessageType.result.name:
future = self._result_futures.get(message.get(MessageField.MESSAGE_ID.value, -1), None)

Expand Down
148 changes: 68 additions & 80 deletions custom_components/eufy_security/eufy_security_api/camera.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,26 +2,20 @@
import contextlib
from enum import Enum
import logging
from queue import Queue
import threading
from base64 import b64decode
import datetime
import traceback


from aiortsp.rtsp.reader import RTSPReader

from .const import MessageField
from .const import MessageField, STREAM_TIMEOUT_SECONDS, STREAM_SLEEP_SECONDS
from .event import Event
from .exceptions import CameraRTSPStreamNotEnabled, CameraRTSPStreamNotSupported
from .p2p_stream_handler import P2PStreamHandler
from .p2p_streamer import P2PStreamer
from .product import Device
from .util import wait_for_value

_LOGGER: logging.Logger = logging.getLogger(__package__)

STREAM_TIMEOUT_SECONDS = 15
STREAM_SLEEP_SECONDS = 0.5


class StreamStatus(Enum):
"""Stream status"""
Expand Down Expand Up @@ -58,14 +52,15 @@ def __init__(self, api, serial_no: str, properties: dict, metadata: dict, comman
self.stream_provider: StreamProvider = None
self.stream_url: str = None
self.codec: str = None
self.video_queue: Queue = Queue()

self.video_queue = asyncio.Queue()
self.config = config
self.voices = voices
self.ffmpeg = None
self.imagempeg = None
self.image_last_updated = None

self.p2p_stream_handler = P2PStreamHandler(self)
self.p2p_stream_thread = None
self.p2p_streamer = P2PStreamer(self)

if self.is_rtsp_enabled is True:
self.set_stream_prodiver(StreamProvider.RTSP)
Expand All @@ -82,20 +77,21 @@ def is_streaming(self) -> bool:
"""Is Camera in Streaming Status"""
return self.stream_status == StreamStatus.STREAMING

def set_ffmpeg(self, ffmpeg):
def set_ffmpeg(self, ffmpeg, imagempeg):
"""set ffmpeg binary"""
self.ffmpeg = ffmpeg
self.p2p_stream_handler.set_ffmpeg(ffmpeg)
self.imagempeg = imagempeg

async def _handle_livestream_started(self, event: Event):
# automatically find this function for respective event
_LOGGER.debug(f"_handle_livestream_started - {event}")
self.p2p_started_event.set()

async def _handle_livestream_stopped(self, event: Event):
# automatically find this function for respective event
_LOGGER.debug(f"_handle_livestream_stopped - {event}")
self.stream_status = StreamStatus.IDLE
self.video_queue.queue.clear()
self.video_queue = asyncio.Queue()

async def _handle_rtsp_livestream_started(self, event: Event):
# automatically find this function for respective event
Expand All @@ -111,95 +107,88 @@ async def _handle_livestream_video_data_received(self, event: Event):
# automatically find this function for respective event
if self.codec is None:
self.codec = event.data["metadata"]["videoCodec"].lower()
await self._start_ffmpeg()

self.video_queue.put(bytearray(event.data["buffer"]["data"]))
await self.video_queue.put(event.data["buffer"]["data"])

async def _start_ffmpeg(self):
await self.p2p_stream_handler.start_ffmpeg(self.config.ffmpeg_analyze_duration)
async def _start_p2p_streamer(self):
self.stream_debug = "info - wait for codec value"
await wait_for_value(self.__dict__, "codec", None)
await self.p2p_streamer.start()

async def _is_stream_url_ready(self) -> bool:
_LOGGER.debug("_is_stream_url_ready - 1")
with contextlib.suppress(Exception):
while True:
async with RTSPReader(self.stream_url.replace("rtsp://", "rtspt://")) as reader:
_LOGGER.debug("_is_stream_url_ready - 2 - reader opened")
async for pkt in reader.iter_packets():
_LOGGER.debug(f"_is_stream_url_ready - 3 - received {len(pkt)}")
return True
_LOGGER.debug("_is_stream_url_ready - 4 - reader closed")
await asyncio.sleep(STREAM_SLEEP_SECONDS)
if await self.imagempeg.get_image(self.stream_url, timeout=1) is not None:
return True
return False

async def start_livestream(self) -> bool:
"""Process start p2p livestream call"""
self.set_stream_prodiver(StreamProvider.P2P)
self.stream_status = StreamStatus.PREPARING
self.stream_debug = "info - send command to add-on"
await self.api.start_livestream(self.product_type, self.serial_no)
self.stream_debug = "info - command was done, open a local tcp port"
self.p2p_stream_thread = threading.Thread(target=self.p2p_stream_handler.setup, daemon=True)
self.p2p_stream_thread.start()
await wait_for_value(self.p2p_stream_handler.__dict__, "port", None)
self.stream_debug = "info - local tcp was setup, checking for codec"

if self.codec is not None:
self.stream_debug = "info - codec is known, start ffmpeg consuming tcp port and forwarding to rtsp add-on"
await self._start_ffmpeg()
self.stream_debug = "info - ffmpeg was started"

with contextlib.suppress(asyncio.TimeoutError):
self.stream_debug = "info - wait for bytes to arrive from add-on, they will be written to tcp port"
await asyncio.wait_for(self.p2p_started_event.wait(), STREAM_TIMEOUT_SECONDS)

if self.p2p_started_event.is_set() is False:
self.stream_debug = "error - ffmpeg pocess could not connect"
return False

async def _check_stream_url(self) -> bool:
try:
self.stream_debug = "info - check if rtsp url is a valid stream"
self.stream_debug = "info - check if stream url is a valid stream"
_LOGGER.debug(f"_check_stream_url - {self.stream_debug}")
await asyncio.wait_for(self._is_stream_url_ready(), STREAM_TIMEOUT_SECONDS)
self.stream_status = StreamStatus.STREAMING
self.stream_debug = "info - streaming"
_LOGGER.debug(f"_check_stream_url - {self.stream_debug}")
return True
except asyncio.TimeoutError:
self.stream_debug = "error - rtsp url was not a valid stream"
_LOGGER.debug(f"_check_stream_url - {self.stream_debug}")
return False

self.stream_status = StreamStatus.STREAMING
self.stream_debug = "info - streaming"
return True

async def stop_livestream(self):
"""Process stop p2p livestream call"""
await self.api.stop_livestream(self.product_type, self.serial_no)
if self.p2p_stream_thread.is_alive() is True:
await self.p2p_stream_handler.stop()

async def start_rtsp_livestream(self):
"""Process start rtsp livestream call"""
self.set_stream_prodiver(StreamProvider.RTSP)
async def _initiate_start_stream(self, stream_type) -> bool:
self.set_stream_prodiver(stream_type)
self.stream_status = StreamStatus.PREPARING
self.stream_debug = "info - send command to add-on"
await self.api.start_rtsp_livestream(self.product_type, self.serial_no)
_LOGGER.debug(f"_initiate_start_stream - {self.stream_debug} - {stream_type}")
event = None
if stream_type == StreamProvider.P2P:
event = self.p2p_started_event
event.clear()
if await self.api.start_livestream(self.product_type, self.serial_no) is False:
return False
else:
event = self.rtsp_started_event
event.clear()
if await self.api.start_rtsp_livestream(self.product_type, self.serial_no) is False:
return False

try:
await asyncio.wait_for(self.rtsp_started_event.wait(), 5)
await asyncio.wait_for(event.wait(), 5)
self.stream_debug = "info - command was done"
_LOGGER.debug(f"_initiate_start_stream - {self.stream_debug}")
return True
except asyncio.TimeoutError:
self.stream_debug = "error - command was failed"
self.stream_debug = f"error - command was failed - {event}"
_LOGGER.debug(f"_initiate_start_stream - {self.stream_debug}")
return False

try:
self.stream_status = StreamStatus.STREAMING
self.stream_debug = "info - check if rtsp url is a valid stream"
await asyncio.wait_for(self._is_stream_url_ready(), 5)
_LOGGER.debug(f"start_rtsp_livestream - 2 - try success - {self.stream_status}")
return True
except asyncio.TimeoutError:
self.stream_debug = "error - rtsp url was not a valid stream"
_LOGGER.debug("start_rtsp_livestream - 2 - try timeout")
async def start_livestream(self) -> bool:
"""Process start p2p livestream call"""
if await self._initiate_start_stream(StreamProvider.P2P) is False:
return False

self.stream_debug = "info - start ffmpeg"
_LOGGER.debug(f"start_livestream - {self.stream_debug}")
await self._start_p2p_streamer()

return await self._check_stream_url()

async def check_and_stop_livestream(self):
if self.stream_status != StreamStatus.IDLE:
await self.stop_livestream()

async def stop_livestream(self):
"""Process stop p2p livestream call"""
await self.api.stop_livestream(self.product_type, self.serial_no)

async def start_rtsp_livestream(self) -> bool:
"""Process start rtsp livestream call"""
if await self._initiate_start_stream(StreamProvider.RTSP) is False:
return False

self.stream_debug = "info - streaming"
return True
return await self._check_stream_url()

async def stop_rtsp_livestream(self):
"""Process stop rtsp livestream call"""
Expand Down Expand Up @@ -276,7 +265,6 @@ def set_stream_prodiver(self, stream_provider: StreamProvider) -> None:
self.stream_url = url.replace("{rtsp_stream_url}", self.rtsp_stream_url)

elif self.stream_provider == StreamProvider.P2P:
_LOGGER.debug(f"{self.p2p_stream_handler.port}")
url = url.replace("{serial_no}", str(self.serial_no))
url = url.replace("{server_address}", str(self.config.rtsp_server_address))
url = url.replace("{server_port}", str(self.config.rtsp_server_port))
Expand Down
3 changes: 3 additions & 0 deletions custom_components/eufy_security/eufy_security_api/const.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@

UNSUPPORTED = "Unsupported"

STREAM_TIMEOUT_SECONDS = 15
STREAM_SLEEP_SECONDS = 0.25


class MessageField(Enum):
"""Incoming or outgoing message field types"""
Expand Down

0 comments on commit 088ac61

Please sign in to comment.