Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions deepgram/clients/live/v1/async_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,8 @@ async def finish(self):
await self._socket.wait_closed()
self.logger.notice("socket.wait_closed succeeded")

self._socket = None

self.logger.notice("finish succeeded")
self.logger.debug("AsyncLiveClient.finish LEAVE")

Expand Down
58 changes: 37 additions & 21 deletions deepgram/clients/live/v1/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,11 +81,8 @@ def start(self, options: LiveOptions = None, addons: dict = None, **kwargs):
self.listening.start()

# keepalive thread
self.processing = None
if self.config.options.get("keepalive") == "true":
self.logger.info("KeepAlive enabled")
self.processing = threading.Thread(target=self._processing)
self.processing.start()
self.processing = threading.Thread(target=self._processing)
self.processing.start()

self.logger.notice("start succeeded")
self.logger.debug("LiveClient.start LEAVE")
Expand All @@ -111,7 +108,7 @@ def _listening(self) -> None:
myExit = self.exit
self.lock_exit.release()
if myExit:
self.logger.notice("exiting gracefully")
self.logger.notice("_listening exiting gracefully")
self.logger.debug("LiveClient._listening LEAVE")
return

Expand Down Expand Up @@ -156,19 +153,15 @@ def _listening(self) -> None:
**dict(self.kwargs),
)
case _:
self.logger.error(
"response_type: %s, data: %s", response_type, data
)
error = ErrorResponse(
type="UnhandledMessage",
description="Unknown message type",
message=f"Unhandle message type: {response_type}",
self.logger.warning(
"Unknown Message: response_type: %s, data: %s",
response_type,
data,
)
self._emit(LiveTranscriptionEvents.Error, error=error)

except Exception as e:
if e.code == 1000:
self.logger.notice("exiting thread gracefully")
self.logger.notice("_listening(1000) exiting gracefully")
self.logger.debug("LiveClient._listening LEAVE")
return

Expand All @@ -186,25 +179,34 @@ def _listening(self) -> None:
def _processing(self) -> None:
self.logger.debug("LiveClient._processing ENTER")

counter = 0

while True:
try:
time.sleep(PING_INTERVAL)
counter += 1

self.lock_exit.acquire()
myExit = self.exit
self.lock_exit.release()
if myExit:
self.logger.notice("exiting gracefully")
self.logger.notice("_processing exiting gracefully")
self.logger.debug("LiveClient._processing LEAVE")
return

# deepgram keepalive
self.logger.debug("Sending KeepAlive...")
self.send(json.dumps({"type": "KeepAlive"}))
if self.config.options.get("keepalive") == "true":
self.logger.debug("Sending KeepAlive...")
self.send(json.dumps({"type": "KeepAlive"}))

# websocket keepalive
if counter % 4 == 0:
self.logger.debug("Sending Ping...")
self.send_ping()

except Exception as e:
if e.code == 1000:
self.logger.notice("exiting thread gracefully")
self.logger.notice("_processing(1000) exiting gracefully")
self.logger.debug("LiveClient._processing LEAVE")
return

Expand Down Expand Up @@ -239,6 +241,19 @@ def send(self, data) -> int:
self.logger.spam("LiveClient.send LEAVE")
return 0

def send_ping(self) -> None:
"""
Sends a ping over the WebSocket connection.
"""
self.logger.spam("LiveClient.send_ping ENTER")

if self._socket:
self.lock_send.acquire()
self._socket.ping()
self.lock_send.release()

self.logger.spam("LiveClient.send_ping LEAVE")

def finish(self):
"""
Closes the WebSocket connection gracefully.
Expand All @@ -247,8 +262,8 @@ def finish(self):

if self._socket:
self.logger.notice("sending CloseStream...")
self._socket.send(json.dumps({"type": "CloseStream"}))
time.sleep(1)
self.send(json.dumps({"type": "CloseStream"}))
time.sleep(0.5)

self.lock_exit.acquire()
self.logger.notice("signal exit")
Expand All @@ -271,6 +286,7 @@ def finish(self):

self._socket = None
self.lock_exit = None
self.lock_send = None

self.logger.notice("finish succeeded")
self.logger.spam("LiveClient.finish LEAVE")
5 changes: 4 additions & 1 deletion examples/streaming/microphone/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@ def main():
try:
# example of setting up a client config. logging values: WARNING, VERBOSE, DEBUG, SPAM
# config = DeepgramClientOptions(
# verbose=logging.SPAM, options={"keepalive": "true"}
# verbose=logging.DEBUG,
# options={"keepalive": "true"}
# )
# deepgram: DeepgramClient = DeepgramClient("", config)
# otherwise, use default config
Expand Down Expand Up @@ -89,6 +90,8 @@ def on_error(self, error, **kwargs):
dg_connection.finish()

print("Finished")
# sleep(30) # wait 30 seconds to see if there is any additional socket activity
# print("Really done!")

except Exception as e:
print(f"Could not open socket: {e}")
Expand Down