Skip to content

Commit

Permalink
feat: better P2P with audio
Browse files Browse the repository at this point in the history
  • Loading branch information
fuatakgun committed Nov 29, 2023
1 parent af7eacf commit ff23d2e
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -149,9 +149,9 @@ async def check_live_stream(self):

async def start_livestream(self) -> bool:
"""Process start p2p livestream call"""
self.stream_future = asyncio.create_task(self.p2p_streamer.start())
if await self._initiate_start_stream(StreamProvider.P2P) is False:
return False
self.stream_future = asyncio.create_task(self.p2p_streamer.start())
self.stream_checker = asyncio.create_task(self.check_live_stream())
self.stream_status = StreamStatus.STREAMING
return True
Expand Down
20 changes: 9 additions & 11 deletions custom_components/eufy_security/eufy_security_api/p2p_streamer.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,19 +24,13 @@ def __init__(self, camera) -> None:
async def chunk_generator(self, queue, queue_name):
retry_count = 0
max_retry_count = 10
try:
await asyncio.wait_for(self.camera.p2p_started_event.wait(), 5)
except asyncio.TimeoutError as te:
_LOGGER.debug(f"chunk_generator {queue_name} - event did not receive in timeout")
raise te

while retry_count < max_retry_count:
try:
item = queue.popleft()
_LOGGER.debug(f"chunk_generator {queue_name} yield data {retry_count} - {len(item)}")
#_LOGGER.debug(f"chunk_generator {queue_name} yield data {retry_count} - {len(item)}")
retry_count = 0
yield item
except IndexError as qe:
except IndexError:
retry_count = retry_count + 1
await asyncio.sleep(0.1)

Expand Down Expand Up @@ -64,7 +58,7 @@ async def write_bytes(self, queue, queue_name):
_LOGGER.debug(f"write_bytes {queue_name} general exception no retry {ex} - traceback: {traceback.format_exc()}")
self.retry = False

_LOGGER.debug("write_bytes {queue_name} - ended")
_LOGGER.debug(f"write_bytes {queue_name} - ended")

async def create_stream_on_go2rtc(self):
parameters = {"name": str(self.camera.serial_no)}
Expand All @@ -83,11 +77,15 @@ async def create_stream_on_go2rtc(self):
result = response.status, await response.text()
_LOGGER.debug(f"create_stream_on_go2rtc - put stream response {result}")

def run(self, queue, name):
asyncio.run(self.write_bytes(queue, name))

async def start(self):
"""start streaming thread"""
# send API command to go2rtc to create a new stream
self.retry = None
await self.create_stream_on_go2rtc()
await asyncio.gather(
self.write_bytes(self.camera.audio_queue, "audio"),
self.write_bytes(self.camera.video_queue, "video")
asyncio.to_thread(self.run, self.camera.audio_queue, "audio"),
asyncio.to_thread(self.run, self.camera.video_queue, "video")
)

0 comments on commit ff23d2e

Please sign in to comment.