Contributor

@jjmaldonis jjmaldonis commented Aug 1, 2023

This PR fixes a bug where the streaming websocket could be closed and marked as completed before the socket's messages were processed.

Below is the old code:

        while not self.done:
            try:
                body = await self._socket.recv()
                self._queue.put_nowait((True, body))
            except Exception as exc:
                self.done = True # socket closed, will terminate on next loop

This code resulted in a race condition in which the last websocket message was not always processed before self.done was set to True. The loop would put a new message in the queue for processing and then begin the next iteration of the while loop. That iteration would raise an exception and therefore set self.done = True. Setting self.done = True completed the websocket immediately, so the data was returned to the user before the previous iteration's body had been processed.

The changes in this PR fix the race condition.
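
To make the race concrete, here is a minimal, self-contained sketch. It is not the SDK's code: FakeSocket, run, and the wait_for_queue flag are hypothetical names invented for illustration, and waiting on queue.join() is only one possible way to close the race, not necessarily what this PR does.

```python
import asyncio


class FakeSocket:
    """Hypothetical stand-in for the websocket: yields messages, then raises."""

    def __init__(self, messages):
        self._messages = list(messages)

    async def recv(self):
        await asyncio.sleep(0)  # yield to the event loop, like real I/O would
        if not self._messages:
            raise ConnectionError("socket closed")
        return self._messages.pop(0)


async def run(messages, wait_for_queue):
    """Receive every message and return the ones that were actually processed."""
    socket = FakeSocket(messages)
    queue = asyncio.Queue()
    received = []
    done = False

    async def receiver():
        nonlocal done
        while not done:
            try:
                body = await socket.recv()
                queue.put_nowait(body)
            except ConnectionError:
                if wait_for_queue:
                    # Assumed fix: let queued messages drain before finishing.
                    await queue.join()
                done = True  # socket closed, loop ends

    async def processor():
        while True:
            body = await queue.get()
            await asyncio.sleep(0.01)  # simulate slow message handling
            received.append(body)
            queue.task_done()

    worker = asyncio.create_task(processor())
    await receiver()
    worker.cancel()  # "completion": stop processing as soon as done is set
    await asyncio.gather(worker, return_exceptions=True)
    return received


if __name__ == "__main__":
    print("without waiting:", asyncio.run(run(["a", "b"], wait_for_queue=False)))
    print("with waiting:   ", asyncio.run(run(["a", "b"], wait_for_queue=True)))
```

Without the wait, the receiver marks itself done while the processor is still handling earlier messages, so the trailing messages are dropped. With the wait, every queued message is handled before completion.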

This PR also makes a small change to the exception handling. Rather than catching all websocket closures (and any other exceptions!) by using except Exception as exc, I replaced the error handling with a more explicit catch: except websockets.exceptions.ConnectionClosedOK:. With this change, errors that are not ConnectionClosedOK will bubble up to the user. I think this is appropriate because any other websocket closures are errors that the user may want to handle or be aware of. I was unable to produce any situation where a different exception was thrown, but we may get messages from customers who find edge cases.
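
The effect of narrowing the except clause can be sketched as follows. To keep the example runnable without the websockets package, ConnectionClosedOK here is a local stand-in class for websockets.exceptions.ConnectionClosedOK, and FakeSocket and receive_all are hypothetical helpers, not SDK functions.

```python
import asyncio


class ConnectionClosedOK(Exception):
    """Local stand-in for websockets.exceptions.ConnectionClosedOK (assumption)."""


class FakeSocket:
    """Hypothetical socket that replays a scripted list of messages and errors."""

    def __init__(self, events):
        self._events = list(events)

    async def recv(self):
        event = self._events.pop(0)
        if isinstance(event, Exception):
            raise event
        return event


async def receive_all(socket):
    """Collect messages until a clean close; let any other error propagate."""
    messages = []
    while True:
        try:
            messages.append(await socket.recv())
        except ConnectionClosedOK:
            # A clean close is part of the normal workflow: stop quietly.
            return messages
        # No bare `except Exception`: unexpected errors reach the caller.


if __name__ == "__main__":
    clean = FakeSocket(["a", "b", ConnectionClosedOK()])
    print(asyncio.run(receive_all(clean)))

    broken = FakeSocket(["a", RuntimeError("unexpected failure")])
    try:
        asyncio.run(receive_all(broken))
    except RuntimeError as exc:
        print("caller sees:", exc)
```

The trade-off is that callers now have to handle unexpected failures themselves, in exchange for those failures no longer being silently treated as a normal close.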

The code to reproduce the race condition (before this PR) is below:

import asyncio
import os

from deepgram import Deepgram

async def run_main():
    transcription_client = Deepgram(os.environ['KEY'])
    socket = await transcription_client.transcription.live()
    with open(os.environ['AUDIO'], 'rb') as audio:
        CHUNK_SIZE_BYTES = 8192
        CHUNK_RATE_SEC = 0.005
        chunk = audio.read(CHUNK_SIZE_BYTES)
        while chunk:
            socket.send(chunk)
            await asyncio.sleep(CHUNK_RATE_SEC)
            chunk = audio.read(CHUNK_SIZE_BYTES)
    await socket.finish()
    print(socket.received[-1]['request_id'])

if __name__ == '__main__':
    asyncio.run(run_main())

+

[N] (env) andrew@dg-andrew /t/tmp.e0DlxTlkJh> KEY=(secret-tool lookup deepgram internal-api-token) AUDIO=$THE_MISSILE_SHORT python main.py
097cf693-bc1e-4fb5-9214-164e650e68fa
[N] (env) andrew@dg-andrew /t/tmp.e0DlxTlkJh> KEY=(secret-tool lookup deepgram internal-api-token) AUDIO=$THE_MISSILE_SHORT python main.py
Traceback (most recent call last):
  File "/tmp/tmp.e0DlxTlkJh/main.py", line 21, in <module>
    asyncio.run(run_main())
  File "/usr/lib/python3.11/asyncio/runners.py", line 190, in run
    return runner.run(main)
           ^^^^^^^^^^^^^^^^
  File "/usr/lib/python3.11/asyncio/runners.py", line 118, in run
    return self._loop.run_until_complete(task)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/lib/python3.11/asyncio/base_events.py", line 653, in run_until_complete
    return future.result()
           ^^^^^^^^^^^^^^^
  File "/tmp/tmp.e0DlxTlkJh/main.py", line 18, in run_main
    print(socket.received[-1]['request_id'])
          ~~~~~~~~~~~~~~~~~~~^^^^^^^^^^^^^^
KeyError: 'request_id'

OR

import asyncio
import os
from deepgram import Deepgram


async def run_main():
    transcription_client = Deepgram(os.environ["DEEPGRAM_API_KEY"])
    socket = await transcription_client.transcription.live()
    with open("./test-audio-files/en_NatGen_Medical_DocDictation.m4a", "rb") as audio:
        CHUNK_SIZE_BYTES = 8192
        CHUNK_RATE_SEC = 0.005
        chunk = audio.read(CHUNK_SIZE_BYTES)
        while chunk:
            socket.send(chunk)
            await asyncio.sleep(CHUNK_RATE_SEC)
            chunk = audio.read(CHUNK_SIZE_BYTES)
    await socket.finish()
    # print(socket.received)
    print(socket.received[-1])
    print(socket.received[-1]["request_id"])


if __name__ == "__main__":
    asyncio.run(run_main())

…mpleted before the message queue was cleared
Contributor

@DamienDeepgram DamienDeepgram left a comment

Added a comment

            try:
                body = await self._socket.recv()
                self._queue.put_nowait((True, body))
            except Exception as exc:
Contributor

Should this still catch Exception after the new exception catch just in case some other error happens?

Contributor Author

I personally think we should not catch all exceptions. My guess is that the original code caught all exceptions specifically so that the ConnectionClosedOK exception was handled correctly. Below is more info on my opinion:

This PR also makes a small change to the exception handling. Rather than catching all websocket closures (and any other exceptions!) by using except Exception as exc, I replaced the error handling with a more explicit catch: except websockets.exceptions.ConnectionClosedOK:. With this change, errors that are not ConnectionClosedOK will bubble up to the user. I think this is appropriate because any other websocket closures are errors that the user may want to handle or be aware of. I was unable to produce any situation where a different exception was thrown, but we may get messages from customers who find edge cases.

Contributor

Could we catch those other errors and log them instead of throwing them?

Contributor Author

@DamienDeepgram what makes you want to catch and log these specific errors rather than throwing them?

The new try/except clause catches the 1011 "error" and closes the websocket cleanly when that response code is received. Deepgram returns the 1011 "error" as a notification to the user that Deepgram closed the socket due to inactivity, which the previous version of the Python SDK considered a normal part of the workflow (i.e. not an exceptional case).

The new code handles the 1011 "error" in the same way, but raises other errors to the user. I think that makes sense because those other errors are real errors (whereas the 1011 error is simply a closure due to inactivity). But I'm curious to hear what you're thinking.

Contributor

jpvajda commented Aug 21, 2023

@DamienDeepgram @jjmaldonis are you both still working on this open PR?

@DamienDeepgram
Contributor

> @DamienDeepgram @jjmaldonis are you both still working on this open PR?

Sorry, I need to get better at filtering these GitHub alerts into my inbox; I missed this.

I think if the desired approach here is to throw the errors, then this is fine as is.

@DamienDeepgram DamienDeepgram self-requested a review September 19, 2023 00:59
Contributor

@DamienDeepgram DamienDeepgram left a comment

LGTM

Contributor

jpvajda commented Sep 19, 2023

@DamienDeepgram just to confirm, was this tested locally?

@jpvajda jpvajda self-requested a review September 19, 2023 17:11
@jpvajda jpvajda self-assigned this Sep 19, 2023
@DamienDeepgram
Contributor

> @DamienDeepgram just to confirm was this tested locally?

I did not test this locally, since it is an error-handling change: no logic has changed, just that we now catch only this error rather than all errors:

websockets.exceptions.ConnectionClosedOK:

@jpvajda jpvajda merged commit 8e1ca48 into deepgram:main Sep 20, 2023