Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 19 additions & 17 deletions deepgram/audio/microphone/microphone.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,22 +40,10 @@ def __init__(
self.format = pyaudio.paInt16
self.channels = channels
self.input_device_index = input_device_index
self.push_callback_org = push_callback

self.asyncio_loop = None
self.asyncio_thread = None

if inspect.iscoroutinefunction(push_callback):
self.logger.verbose("async/await callback - wrapping")
# Run our own asyncio loop.
self.asyncio_thread = threading.Thread(target=self._start_asyncio_loop)
self.asyncio_thread.start()

self.push_callback = lambda data: asyncio.run_coroutine_threadsafe(
push_callback(data), self.asyncio_loop
).result()
else:
self.logger.verbose("regular threaded callback")
self.push_callback = push_callback

self.stream = None

def _start_asyncio_loop(self) -> None:
Expand Down Expand Up @@ -105,6 +93,19 @@ def start(self) -> bool:
stream_callback=self._callback,
)

if inspect.iscoroutinefunction(self.push_callback_org):
self.logger.verbose("async/await callback - wrapping")
# Run our own asyncio loop.
self.asyncio_thread = threading.Thread(target=self._start_asyncio_loop)
self.asyncio_thread.start()

self.push_callback = lambda data: asyncio.run_coroutine_threadsafe(
self.push_callback_org(data), self.asyncio_loop
).result()
else:
self.logger.verbose("regular threaded callback")
self.push_callback = self.push_callback_org

self.exit.clear()
self.stream.start_stream()

Expand Down Expand Up @@ -150,16 +151,17 @@ def finish(self) -> bool:
self.logger.notice("signal exit")
self.exit.set()

# Stop the stream.
if self.stream is not None:
self.stream.stop_stream()
self.stream.close()
self.stream = None

# clean up the thread
if self.asyncio_thread is not None:
self.asyncio_loop.call_soon_threadsafe(self.asyncio_loop.stop)
self.asyncio_thread.join() # Clean up.
self.asyncio_thread = None

self.asyncio_thread.join()
self.asyncio_thread = None
self.logger.notice("stream/recv thread joined")

self.logger.notice("finish succeeded")
Expand Down
177 changes: 104 additions & 73 deletions deepgram/clients/live/v1/async_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ def __init__(self, config: DeepgramClientOptions):
self._socket = None
self._event_handlers = {event: [] for event in LiveTranscriptionEvents}
self.websocket_url = convert_to_websocket_url(self.config.url, self.endpoint)
self.exit_event = None

# starts the WebSocket connection for live transcription
async def start(
Expand All @@ -61,10 +62,10 @@ async def start(
**kwargs,
) -> bool:
self.logger.debug("AsyncLiveClient.start ENTER")
self.logger.info("kwargs: %s", options)
self.logger.info("options: %s", options)
self.logger.info("addons: %s", addons)
self.logger.info("members: %s", members)
self.logger.info("options: %s", kwargs)
self.logger.info("kwargs: %s", kwargs)

if isinstance(options, LiveOptions) and not options.check():
self.logger.error("options.check failed")
Expand All @@ -83,7 +84,7 @@ async def start(
if members is not None:
self.__dict__.update(members)

# add "kwargs" as members of the class
# set kwargs as members of the class
if kwargs is not None:
self.kwargs = kwargs
else:
Expand All @@ -101,6 +102,8 @@ async def start(
self.logger.debug("combined_options: %s", combined_options)

url_with_params = append_query_params(self.websocket_url, combined_options)
self.exit_event = asyncio.Event()

try:
self._socket = await _socket_connect(url_with_params, self.config.headers)

Expand Down Expand Up @@ -135,8 +138,24 @@ async def _emit(self, event: LiveTranscriptionEvents, *args, **kwargs) -> None:
async def _listening(self) -> None:
self.logger.debug("AsyncLiveClient._listening ENTER")

try:
async for message in self._socket:
while True:
try:
if self.exit_event.is_set():
self.logger.notice("_listening exiting gracefully")
self.logger.debug("AsyncLiveClient._listening LEAVE")
return

if self._socket is None:
self.logger.warning("socket is empty")
self.logger.debug("AsyncLiveClient._listening LEAVE")
return

message = await self._socket.recv()

if message is None:
self.logger.spam("message is None")
continue

data = json.loads(message)
response_type = data.get("type")
self.logger.debug("response_type: %s, data: %s", response_type, data)
Expand Down Expand Up @@ -206,48 +225,48 @@ async def _listening(self) -> None:
)
await self._emit(LiveTranscriptionEvents.Error, error=error)

except websockets.exceptions.ConnectionClosedOK as e:
self.logger.notice(f"_listening({e.code}) exiting gracefully")
self.logger.debug("AsyncLiveClient._listening LEAVE")
return

except websockets.exceptions.WebSocketException as e:
error: ErrorResponse = {
"type": "Exception",
"description": "WebSocketException in _listening",
"message": f"{e}",
"variant": "",
}
self.logger.notice(
f"WebSocket exception in _listening with code {e.code}: {e.reason}"
)
await self._emit(LiveTranscriptionEvents.Error, error)

self.logger.debug("AsyncLiveClient._listening LEAVE")

if (
"termination_exception" in self.options
and self.options["termination_exception"] == "true"
):
raise

except Exception as e:
error: ErrorResponse = {
"type": "Exception",
"description": "Exception in _listening",
"message": f"{e}",
"variant": "",
}
self.logger.error("Exception in _listening: %s", str(e))
await self._emit(LiveTranscriptionEvents.Error, error)

self.logger.debug("AsyncLiveClient._listening LEAVE")

if (
"termination_exception" in self.options
and self.options["termination_exception"] == "true"
):
raise
except websockets.exceptions.ConnectionClosedOK as e:
self.logger.notice(f"_listening({e.code}) exiting gracefully")
self.logger.debug("AsyncLiveClient._listening LEAVE")
return

except websockets.exceptions.WebSocketException as e:
error: ErrorResponse = {
"type": "Exception",
"description": "WebSocketException in AsyncLiveClient._listening",
"message": f"{e}",
"variant": "",
}
self.logger.notice(
f"WebSocket exception in AsyncLiveClient._listening with code {e.code}: {e.reason}"
)
await self._emit(LiveTranscriptionEvents.Error, error)

self.logger.debug("AsyncLiveClient._listening LEAVE")

if (
"termination_exception" in self.options
and self.options["termination_exception"] == "true"
):
raise

except Exception as e:
error: ErrorResponse = {
"type": "Exception",
"description": "Exception in AsyncLiveClient._listening",
"message": f"{e}",
"variant": "",
}
self.logger.error("Exception in AsyncLiveClient._listening: %s", str(e))
await self._emit(LiveTranscriptionEvents.Error, error)

self.logger.debug("AsyncLiveClient._listening LEAVE")

if (
"termination_exception" in self.options
and self.options["termination_exception"] == "true"
):
raise

# keep the connection alive by sending keepalive messages
async def _keep_alive(self) -> None:
Expand All @@ -259,6 +278,11 @@ async def _keep_alive(self) -> None:
counter += 1
await asyncio.sleep(ONE_SECOND)

if self.exit_event.is_set():
self.logger.notice("_keep_alive exiting gracefully")
self.logger.debug("AsyncLiveClient._keep_alive LEAVE")
return

if self._socket is None:
self.logger.notice("socket is None, exiting keep_alive")
self.logger.debug("AsyncLiveClient._keep_alive LEAVE")
Expand All @@ -270,25 +294,22 @@ async def _keep_alive(self) -> None:
and self.config.options.get("keepalive") == "true"
):
self.logger.verbose("Sending KeepAlive...")
try:
await self.send(json.dumps({"type": "KeepAlive"}))
except websockets.exceptions.WebSocketException as e:
self.logger.error("KeepAlive failed: %s", e)
await self.send(json.dumps({"type": "KeepAlive"}))

except websockets.exceptions.ConnectionClosedOK as e:
self.logger.notice(f"_keep_alive({e.code}) exiting gracefully")
self.logger.debug("AsyncLiveClient._keep_alive LEAVE")
return

except websockets.exceptions.ConnectionClosedError as e:
except websockets.exceptions.WebSocketException as e:
error: ErrorResponse = {
"type": "Exception",
"description": "ConnectionClosedError in _keep_alive",
"description": "WebSocketException in AsyncLiveClient._keep_alive",
"message": f"{e}",
"variant": "",
}
self.logger.error(
f"WebSocket connection closed in _keep_alive with code {e.code}: {e.reason}"
f"WebSocket connection closed in AsyncLiveClient._keep_alive with code {e.code}: {e.reason}"
)
await self._emit(LiveTranscriptionEvents.Error, error)

Expand All @@ -308,8 +329,10 @@ async def _keep_alive(self) -> None:
"message": f"{e}",
"variant": "",
}
self.logger.error(
"Exception in AsyncLiveClient._keep_alive: %s", str(e)
)
await self._emit(LiveTranscriptionEvents.Error, error)
self.logger.error("Exception in _keep_alive: %s", str(e))

self.logger.debug("AsyncLiveClient._keep_alive LEAVE")

Expand All @@ -323,31 +346,45 @@ async def _keep_alive(self) -> None:
self.logger.debug("AsyncLiveClient._keep_alive LEAVE")

# sends data over the WebSocket connection
async def send(self, data: Union[str, bytes]) -> int:
async def send(self, data: Union[str, bytes]) -> bool:
"""
Sends data over the WebSocket connection.
"""
self.logger.spam("AsyncLiveClient.send ENTER")
self.logger.spam("data: %s", data)

if self._socket is not None:
cnt = await self._socket.send(data)
self.logger.spam(f"send() succeeded. bytes: {cnt}")
try:
await self._socket.send(data)
except websockets.exceptions.WebSocketException as e:
self.logger.error("send() failed - WebSocketException: %s", str(e))
self.logger.spam("AsyncLiveClient.send LEAVE")
return False
except Exception as e:
self.logger.error("send() failed - Exception: %s", str(e))
self.logger.spam("AsyncLiveClient.send LEAVE")
return False

self.logger.spam(f"send() succeeded")
self.logger.spam("AsyncLiveClient.send LEAVE")
return cnt
return True

self.logger.error("send() failed. socket is None")
self.logger.spam("AsyncLiveClient.send LEAVE")
return 0
return False

async def finish(self) -> bool:
"""
Closes the WebSocket connection gracefully.
"""
self.logger.debug("AsyncLiveClient.finish ENTER")

if self._socket:
self.logger.notice("send CloseStream...")
# signal exit
self.exit_event.set()

# close the stream
self.logger.verbose("closing socket...")
if self._socket is not None:
self.logger.verbose("send CloseStream...")
await self._socket.send(json.dumps({"type": "CloseStream"}))

await asyncio.sleep(0.5)
Expand All @@ -358,14 +395,14 @@ async def finish(self) -> bool:
CloseResponse(type=LiveTranscriptionEvents.Close.value),
)

self.logger.notice("socket.wait_closed...")
self.logger.verbose("socket.wait_closed...")
try:
await self._socket.wait_closed()
except websockets.exceptions.WebSocketException as e:
self.logger.error("socket.wait_closed failed: %s", e)
self.logger.notice("socket.wait_closed succeeded")
self._socket = None

self.logger.notice("cancelling tasks...")
self.logger.verbose("cancelling tasks...")
try:
# Before cancelling, check if the tasks were created
if self._listen_thread is not None:
Expand All @@ -380,13 +417,7 @@ async def finish(self) -> bool:
except asyncio.CancelledError as e:
self.logger.error("tasks cancelled error: %s", e)

if self._socket is not None:
self.logger.notice("closing socket...")
await self._socket.close()

self._socket = None

self.logger.notice("finish succeeded")
self.logger.info("finish succeeded")
self.logger.debug("AsyncLiveClient.finish LEAVE")
return True

Expand Down
Loading