Skip to content

Commit

Permalink
feat: p2p
Browse files Browse the repository at this point in the history
  • Loading branch information
fuatakgun committed Nov 25, 2023
1 parent 2143026 commit 55bf0c7
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 13 deletions.
8 changes: 8 additions & 0 deletions custom_components/eufy_security/eufy_security_api/camera.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@ def __init__(self, api, serial_no: str, properties: dict, metadata: dict, comman
self.voices = voices
self.image_last_updated = None

self.stream_future = None

self.p2p_streamer = P2PStreamer(self)

if self.is_rtsp_enabled is True:
Expand Down Expand Up @@ -136,10 +138,16 @@ async def start_livestream(self) -> bool:
if await self._initiate_start_stream(StreamProvider.P2P) is False:
return False

self.stream_future = asyncio.get_running_loop().create_future()
self.stream_future.add_done_callback(self.sync_check_and_stop_livestream)
await self.p2p_streamer.start()
self.stream_status = StreamStatus.STREAMING
return True

def sync_check_and_stop_livestream(self, future):
retry = future.result()
return asyncio.run_coroutine_threadsafe(self.check_and_stop_livestream(retry), asyncio.get_running_loop()).result()

async def check_and_stop_livestream(self, retry):
_LOGGER.debug(f"check_and_stop_livestream - start - {retry}")
if self.stream_status != StreamStatus.IDLE:
Expand Down
20 changes: 7 additions & 13 deletions custom_components/eufy_security/eufy_security_api/p2p_streamer.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@ class P2PStreamer:

def __init__(self, camera) -> None:
self.camera = camera
self.completed_future = None


async def chunk_generator(self, queue):
retry = 0
Expand All @@ -38,7 +36,7 @@ async def chunk_generator(self, queue):
retry = retry + 1
await asyncio.sleep(0.1)

async def write_bytes(self, queue, completed_future):
async def write_bytes(self, queue, future):
url = GO2RTC_API_URL.format(self.camera.config.rtsp_server_address, GO2RTC_API_PORT)
url = f"{url}?dst={str(self.camera.serial_no)}"

Expand All @@ -48,21 +46,23 @@ async def write_bytes(self, queue, completed_future):
resp = await session.post(url, data = self.chunk_generator(queue), timeout=aiohttp.ClientTimeout(total=None, connect=5))
_LOGGER.debug(f"write_bytes - post response - {resp.status} - {await resp.text()}")
_LOGGER.debug("write_bytes - post ended - retry")
retry = True
#retry = True

except (asyncio.exceptions.TimeoutError, asyncio.exceptions.CancelledError) as ex:
# live stream probabaly stopped, handle peacefully
_LOGGER.debug(f"write_bytes timeout/cancelled exception %s - traceback: %s", ex, traceback.format_exc())
retry = False
except aiohttp.client_exceptions.ServerDisconnectedError as ex:
# connection to go2rtc server is broken, try again
_LOGGER.debug(f"write_bytes server_disconnected exception %s - traceback: %s", ex, traceback.format_exc())
retry = True
except Exception as ex: # pylint: disable=broad-except
# other exceptions, log the error
_LOGGER.debug(f"write_bytes general exception %s - traceback: %s", ex, traceback.format_exc())
retry = False

_LOGGER.debug("write_bytes - ended")
completed_future.get_loop().call_soon_threadsafe(completed_future.set_result, retry)
future.get_loop().call_soon_threadsafe(future.set_result, retry)

async def create_stream_on_go2rtc(self):
parameters = {"name": str(self.camera.serial_no), "src": str(self.camera.serial_no)}
Expand All @@ -76,17 +76,11 @@ async def create_stream_on_go2rtc(self):
def p2p_worker(self):
new_loop = asyncio.new_event_loop()
asyncio.set_event_loop(new_loop)
new_loop.run_until_complete(self.write_bytes(self.camera.video_queue, self.completed_future))
new_loop.run_until_complete(self.write_bytes(self.camera.video_queue, self.camera.stream_future))

async def start(self):
"""start streaming thread"""
# send API command to go2rtc to create a new stream
await self.create_stream_on_go2rtc()
self.completed_future = asyncio.get_running_loop().create_future()
p2p_thread = threading.Thread(target=self.p2p_worker, daemon=True)
p2p_thread.start()
_LOGGER.debug(f"start - {self.completed_future}")

await asyncio.wait_for(self.completed_future, timeout=None)
retry = self.completed_future.result()
await self.camera.check_and_stop_livestream(retry)
p2p_thread.start()

0 comments on commit 55bf0c7

Please sign in to comment.