diff --git a/deepgram/clients/live/v1/async_client.py b/deepgram/clients/live/v1/async_client.py index 36aac098..9f228d7b 100644 --- a/deepgram/clients/live/v1/async_client.py +++ b/deepgram/clients/live/v1/async_client.py @@ -20,6 +20,10 @@ ) from .options import LiveOptions +ONE_SECOND = 1 +DEEPGRAM_INTERVAL = 5 +PING_INTERVAL = 20 + class AsyncLiveClient: """ @@ -96,6 +100,7 @@ async def start( try: self._socket = await _socket_connect(url_with_params, self.config.headers) asyncio.create_task(self._start()) + asyncio.create_task(self._keep_alive()) self.logger.notice("start succeeded") self.logger.debug("AsyncLiveClient.start LEAVE") @@ -219,6 +224,34 @@ async def _start(self) -> None: ): raise + async def _keep_alive(self) -> None: + self.logger.debug("AsyncLiveClient._keep_alive ENTER") + + counter = 0 + while True: + counter += 1 + await asyncio.sleep(ONE_SECOND) + + if self._socket is None: + self.logger.notice("socket is None, exiting keep_alive") + self.logger.debug("AsyncLiveClient._keep_alive LEAVE") + break + + # deepgram keepalive + if ( + counter % DEEPGRAM_INTERVAL == 0 + and self.config.options.get("keepalive") == "true" + ): + self.logger.verbose("Sending KeepAlive...") + await self.send(json.dumps({"type": "KeepAlive"})) + + # protocol level ping + if counter % PING_INTERVAL == 0: + self.logger.verbose("Sending Protocol Ping...") + await self._socket.ping() + + self.logger.debug("AsyncLiveClient._keep_alive LEAVE") + async def send(self, data): """ Sends data over the WebSocket connection. diff --git a/deepgram/clients/live/v1/client.py b/deepgram/clients/live/v1/client.py index 6bf07b5e..fdbd7669 100644 --- a/deepgram/clients/live/v1/client.py +++ b/deepgram/clients/live/v1/client.py @@ -22,7 +22,9 @@ ) from .options import LiveOptions -PING_INTERVAL = 5 +ONE_SECOND = 1 +DEEPGRAM_INTERVAL = 5 +PING_INTERVAL = 20 class LiveClient: @@ -112,7 +114,7 @@ def start( self.listening.start() # keepalive thread - self.processing = threading.Thread(target=self._processing) + self.processing = threading.Thread(target=self._keep_alive) self.processing.start() self.logger.notice("start succeeded") @@ -254,47 +256,50 @@ def _listening(self) -> None: raise return - def _processing(self) -> None: - self.logger.debug("LiveClient._processing ENTER") + def _keep_alive(self) -> None: + self.logger.debug("LiveClient._keep_alive ENTER") counter = 0 while True: try: - time.sleep(PING_INTERVAL) counter += 1 + time.sleep(ONE_SECOND) with self.lock_exit: myExit = self.exit if myExit: - self.logger.notice("_processing exiting gracefully") - self.logger.debug("LiveClient._processing LEAVE") + self.logger.notice("_keep_alive exiting gracefully") + self.logger.debug("LiveClient._keep_alive LEAVE") return # deepgram keepalive - if self.config.options.get("keepalive") == "true": - self.logger.debug("Sending KeepAlive...") + if ( + counter % DEEPGRAM_INTERVAL == 0 + and self.config.options.get("keepalive") == "true" + ): + self.logger.verbose("Sending KeepAlive...") self.send(json.dumps({"type": "KeepAlive"})) # websocket keepalive - if counter % 4 == 0: - self.logger.debug("Sending Ping...") + if counter % PING_INTERVAL == 0: + self.logger.verbose("Sending Protocol Ping...") self.send_ping() except websockets.exceptions.ConnectionClosedOK as e: - self.logger.notice("_processing({e.code}) exiting gracefully") + self.logger.notice("_keep_alive({e.code}) exiting gracefully") # signal exit and close self.signal_exit() - self.logger.debug("LiveClient._processing LEAVE") + self.logger.debug("LiveClient._keep_alive LEAVE") return except websockets.exceptions.ConnectionClosedError as e: error: ErrorResponse = { "type": "Exception", - "description": "ConnectionClosedError in _processing", + "description": "ConnectionClosedError in _keep_alive", "message": f"{e}", "variant": "", } @@ -306,7 +311,7 @@ def _processing(self) -> None: # signal exit and close self.signal_exit() - self.logger.debug("LiveClient._processing LEAVE") + self.logger.debug("LiveClient._keep_alive LEAVE") if ( "termination_exception" in self.options @@ -318,17 +323,17 @@ def _processing(self) -> None: except Exception as e: error: ErrorResponse = { "type": "Exception", - "description": "Exception in _processing", + "description": "Exception in _keep_alive", "message": f"{e}", "variant": "", } self._emit(LiveTranscriptionEvents.Error, error) - self.logger.error("Exception in _processing: %s", str(e)) + self.logger.error("Exception in _keep_alive: %s", str(e)) # signal exit and close self.signal_exit() - self.logger.debug("LiveClient._processing LEAVE") + self.logger.debug("LiveClient._keep_alive LEAVE") if ( "termination_exception" in self.options diff --git a/examples/streaming/async_http/main.py b/examples/streaming/async_http/main.py index 388c2d01..a3118f9c 100644 --- a/examples/streaming/async_http/main.py +++ b/examples/streaming/async_http/main.py @@ -7,17 +7,29 @@ import os from dotenv import load_dotenv -from deepgram import DeepgramClient, LiveTranscriptionEvents, LiveOptions +from deepgram import ( + DeepgramClient, + DeepgramClientOptions, + LiveTranscriptionEvents, + LiveOptions, +) load_dotenv() -API_KEY = os.getenv("DG_API_KEY") +API_KEY = os.getenv("DEEPGRAM_API_KEY") # URL for the realtime streaming audio you would like to transcribe URL = "http://stream.live.vc.bbcmedia.co.uk/bbc_world_service" async def main(): + # example of setting up a client config. logging values: WARNING, VERBOSE, DEBUG, SPAM + # config = DeepgramClientOptions( + # verbose=logging.DEBUG, + # options={"keepalive": "true"} + # ) + # deepgram: DeepgramClient = DeepgramClient(API_KEY, config) + # otherwise, use default config deepgram = DeepgramClient(API_KEY) # Create a websocket connection to Deepgram diff --git a/examples/streaming/async_microphone/main.py b/examples/streaming/async_microphone/main.py index 55f520a4..375f07c5 100644 --- a/examples/streaming/async_microphone/main.py +++ b/examples/streaming/async_microphone/main.py @@ -21,13 +21,10 @@ async def main(): try: # example of setting up a client config. logging values: WARNING, VERBOSE, DEBUG, SPAM - # config = DeepgramClientOptions( - # verbose=logging.DEBUG, - # options={"keepalive": "true"} - # ) - # deepgram: DeepgramClient = DeepgramClient("", config) + config = DeepgramClientOptions(options={"keepalive": "true"}) + deepgram: DeepgramClient = DeepgramClient("", config) # otherwise, use default config - deepgram = DeepgramClient() + # deepgram = DeepgramClient() dg_connection = deepgram.listen.asynclive.v("1")