Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 33 additions & 0 deletions deepgram/clients/live/v1/async_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,10 @@
)
from .options import LiveOptions

ONE_SECOND = 1
DEEPGRAM_INTERVAL = 5
PING_INTERVAL = 20


class AsyncLiveClient:
"""
Expand Down Expand Up @@ -96,6 +100,7 @@ async def start(
try:
self._socket = await _socket_connect(url_with_params, self.config.headers)
asyncio.create_task(self._start())
asyncio.create_task(self._keep_alive())

self.logger.notice("start succeeded")
self.logger.debug("AsyncLiveClient.start LEAVE")
Expand Down Expand Up @@ -219,6 +224,34 @@ async def _start(self) -> None:
):
raise

async def _keep_alive(self) -> None:
self.logger.debug("AsyncLiveClient._keep_alive ENTER")

counter = 0
while True:
counter += 1
await asyncio.sleep(ONE_SECOND)

if self._socket is None:
self.logger.notice("socket is None, exiting keep_alive")
self.logger.debug("AsyncLiveClient._keep_alive LEAVE")
break

# deepgram keepalive
if (
counter % DEEPGRAM_INTERVAL == 0
and self.config.options.get("keepalive") == "true"
):
self.logger.verbose("Sending KeepAlive...")
await self.send(json.dumps({"type": "KeepAlive"}))

# protocol level ping
if counter % PING_INTERVAL == 0:
self.logger.verbose("Sending Protocol Ping...")
await self._socket.ping()

self.logger.debug("AsyncLiveClient._keep_alive LEAVE")

async def send(self, data):
"""
Sends data over the WebSocket connection.
Expand Down
41 changes: 23 additions & 18 deletions deepgram/clients/live/v1/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@
)
from .options import LiveOptions

PING_INTERVAL = 5
ONE_SECOND = 1
DEEPGRAM_INTERVAL = 5
PING_INTERVAL = 20


class LiveClient:
Expand Down Expand Up @@ -112,7 +114,7 @@ def start(
self.listening.start()

# keepalive thread
self.processing = threading.Thread(target=self._processing)
self.processing = threading.Thread(target=self._keep_alive)
self.processing.start()

self.logger.notice("start succeeded")
Expand Down Expand Up @@ -254,47 +256,50 @@ def _listening(self) -> None:
raise
return

def _processing(self) -> None:
self.logger.debug("LiveClient._processing ENTER")
def _keep_alive(self) -> None:
self.logger.debug("LiveClient._keep_alive ENTER")

counter = 0

while True:
try:
time.sleep(PING_INTERVAL)
counter += 1
time.sleep(ONE_SECOND)

with self.lock_exit:
myExit = self.exit

if myExit:
self.logger.notice("_processing exiting gracefully")
self.logger.debug("LiveClient._processing LEAVE")
self.logger.notice("_keep_alive exiting gracefully")
self.logger.debug("LiveClient._keep_alive LEAVE")
return

# deepgram keepalive
if self.config.options.get("keepalive") == "true":
self.logger.debug("Sending KeepAlive...")
if (
counter % DEEPGRAM_INTERVAL == 0
and self.config.options.get("keepalive") == "true"
):
self.logger.verbose("Sending KeepAlive...")
self.send(json.dumps({"type": "KeepAlive"}))

# websocket keepalive
if counter % 4 == 0:
self.logger.debug("Sending Ping...")
if counter % PING_INTERVAL == 0:
self.logger.verbose("Sending Protocol Ping...")
self.send_ping()

except websockets.exceptions.ConnectionClosedOK as e:
self.logger.notice("_processing({e.code}) exiting gracefully")
self.logger.notice("_keep_alive({e.code}) exiting gracefully")

# signal exit and close
self.signal_exit()

self.logger.debug("LiveClient._processing LEAVE")
self.logger.debug("LiveClient._keep_alive LEAVE")
return

except websockets.exceptions.ConnectionClosedError as e:
error: ErrorResponse = {
"type": "Exception",
"description": "ConnectionClosedError in _processing",
"description": "ConnectionClosedError in _keep_alive",
"message": f"{e}",
"variant": "",
}
Expand All @@ -306,7 +311,7 @@ def _processing(self) -> None:
# signal exit and close
self.signal_exit()

self.logger.debug("LiveClient._processing LEAVE")
self.logger.debug("LiveClient._keep_alive LEAVE")

if (
"termination_exception" in self.options
Expand All @@ -318,17 +323,17 @@ def _processing(self) -> None:
except Exception as e:
error: ErrorResponse = {
"type": "Exception",
"description": "Exception in _processing",
"description": "Exception in _keep_alive",
"message": f"{e}",
"variant": "",
}
self._emit(LiveTranscriptionEvents.Error, error)
self.logger.error("Exception in _processing: %s", str(e))
self.logger.error("Exception in _keep_alive: %s", str(e))

# signal exit and close
self.signal_exit()

self.logger.debug("LiveClient._processing LEAVE")
self.logger.debug("LiveClient._keep_alive LEAVE")

if (
"termination_exception" in self.options
Expand Down
16 changes: 14 additions & 2 deletions examples/streaming/async_http/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,17 +7,29 @@
import os
from dotenv import load_dotenv

from deepgram import DeepgramClient, LiveTranscriptionEvents, LiveOptions
from deepgram import (
DeepgramClient,
DeepgramClientOptions,
LiveTranscriptionEvents,
LiveOptions,
)

load_dotenv()

API_KEY = os.getenv("DG_API_KEY")
API_KEY = os.getenv("DEEPGRAM_API_KEY")

# URL for the realtime streaming audio you would like to transcribe
URL = "http://stream.live.vc.bbcmedia.co.uk/bbc_world_service"


async def main():
# example of setting up a client config. logging values: WARNING, VERBOSE, DEBUG, SPAM
# config = DeepgramClientOptions(
# verbose=logging.DEBUG,
# options={"keepalive": "true"}
# )
# deepgram: DeepgramClient = DeepgramClient(API_KEY, config)
# otherwise, use default config
deepgram = DeepgramClient(API_KEY)

# Create a websocket connection to Deepgram
Expand Down
9 changes: 3 additions & 6 deletions examples/streaming/async_microphone/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,10 @@
async def main():
try:
# example of setting up a client config. logging values: WARNING, VERBOSE, DEBUG, SPAM
# config = DeepgramClientOptions(
# verbose=logging.DEBUG,
# options={"keepalive": "true"}
# )
# deepgram: DeepgramClient = DeepgramClient("", config)
config = DeepgramClientOptions(options={"keepalive": "true"})
deepgram: DeepgramClient = DeepgramClient("", config)
# otherwise, use default config
deepgram = DeepgramClient()
# deepgram = DeepgramClient()

dg_connection = deepgram.listen.asynclive.v("1")

Expand Down