diff --git a/deepgram/clients/live/v1/async_client.py b/deepgram/clients/live/v1/async_client.py index e21bd302..63344591 100644 --- a/deepgram/clients/live/v1/async_client.py +++ b/deepgram/clients/live/v1/async_client.py @@ -162,6 +162,8 @@ async def finish(self): await self._socket.wait_closed() self.logger.notice("socket.wait_closed succeeded") + self._socket = None + self.logger.notice("finish succeeded") self.logger.debug("AsyncLiveClient.finish LEAVE") diff --git a/deepgram/clients/live/v1/client.py b/deepgram/clients/live/v1/client.py index 22266732..7f2c2fc1 100644 --- a/deepgram/clients/live/v1/client.py +++ b/deepgram/clients/live/v1/client.py @@ -81,11 +81,8 @@ def start(self, options: LiveOptions = None, addons: dict = None, **kwargs): self.listening.start() # keepalive thread - self.processing = None - if self.config.options.get("keepalive") == "true": - self.logger.info("KeepAlive enabled") - self.processing = threading.Thread(target=self._processing) - self.processing.start() + self.processing = threading.Thread(target=self._processing) + self.processing.start() self.logger.notice("start succeeded") self.logger.debug("LiveClient.start LEAVE") @@ -111,7 +108,7 @@ def _listening(self) -> None: myExit = self.exit self.lock_exit.release() if myExit: - self.logger.notice("exiting gracefully") + self.logger.notice("_listening exiting gracefully") self.logger.debug("LiveClient._listening LEAVE") return @@ -156,19 +153,15 @@ def _listening(self) -> None: **dict(self.kwargs), ) case _: - self.logger.error( - "response_type: %s, data: %s", response_type, data - ) - error = ErrorResponse( - type="UnhandledMessage", - description="Unknown message type", - message=f"Unhandle message type: {response_type}", + self.logger.warning( + "Unknown Message: response_type: %s, data: %s", + response_type, + data, ) - self._emit(LiveTranscriptionEvents.Error, error=error) except Exception as e: if e.code == 1000: - self.logger.notice("exiting thread gracefully") + self.logger.notice("_listening(1000) exiting gracefully") self.logger.debug("LiveClient._listening LEAVE") return @@ -186,25 +179,34 @@ def _listening(self) -> None: def _processing(self) -> None: self.logger.debug("LiveClient._processing ENTER") + counter = 0 + while True: try: time.sleep(PING_INTERVAL) + counter += 1 self.lock_exit.acquire() myExit = self.exit self.lock_exit.release() if myExit: - self.logger.notice("exiting gracefully") + self.logger.notice("_processing exiting gracefully") self.logger.debug("LiveClient._processing LEAVE") return # deepgram keepalive - self.logger.debug("Sending KeepAlive...") - self.send(json.dumps({"type": "KeepAlive"})) + if self.config.options.get("keepalive") == "true": + self.logger.debug("Sending KeepAlive...") + self.send(json.dumps({"type": "KeepAlive"})) + + # websocket keepalive + if counter % 4 == 0: + self.logger.debug("Sending Ping...") + self.send_ping() except Exception as e: if e.code == 1000: - self.logger.notice("exiting thread gracefully") + self.logger.notice("_processing(1000) exiting gracefully") self.logger.debug("LiveClient._processing LEAVE") return @@ -239,6 +241,19 @@ def send(self, data) -> int: self.logger.spam("LiveClient.send LEAVE") return 0 + def send_ping(self) -> None: + """ + Sends a ping over the WebSocket connection. + """ + self.logger.spam("LiveClient.send_ping ENTER") + + if self._socket: + self.lock_send.acquire() + self._socket.ping() + self.lock_send.release() + + self.logger.spam("LiveClient.send_ping LEAVE") + def finish(self): """ Closes the WebSocket connection gracefully. @@ -247,8 +262,8 @@ def finish(self): if self._socket: self.logger.notice("sending CloseStream...") - self._socket.send(json.dumps({"type": "CloseStream"})) - time.sleep(1) + self.send(json.dumps({"type": "CloseStream"})) + time.sleep(0.5) self.lock_exit.acquire() self.logger.notice("signal exit") @@ -271,6 +286,7 @@ def finish(self): self._socket = None self.lock_exit = None + self.lock_send = None self.logger.notice("finish succeeded") self.logger.spam("LiveClient.finish LEAVE") diff --git a/examples/streaming/microphone/main.py b/examples/streaming/microphone/main.py index 2efb0881..7e26fd3a 100644 --- a/examples/streaming/microphone/main.py +++ b/examples/streaming/microphone/main.py @@ -22,7 +22,8 @@ def main(): try: # example of setting up a client config. logging values: WARNING, VERBOSE, DEBUG, SPAM # config = DeepgramClientOptions( - # verbose=logging.SPAM, options={"keepalive": "true"} + # verbose=logging.DEBUG, + # options={"keepalive": "true"} # ) # deepgram: DeepgramClient = DeepgramClient("", config) # otherwise, use default config @@ -89,6 +90,8 @@ def on_error(self, error, **kwargs): dg_connection.finish() print("Finished") + # sleep(30) # wait 30 seconds to see if there is any additional socket activity + # print("Really done!") except Exception as e: print(f"Could not open socket: {e}")