From 08b4da65487aa15bb427c475b33b71a3277d0d7b Mon Sep 17 00:00:00 2001 From: Puneith Kaul Date: Mon, 7 Nov 2016 18:56:16 -0800 Subject: [PATCH] streaming code used to hang when it was asked to "exit" - fixed now (#635) --- speech/grpc/transcribe_streaming.py | 45 +++++++++++++++++++---------- 1 file changed, 30 insertions(+), 15 deletions(-) diff --git a/speech/grpc/transcribe_streaming.py b/speech/grpc/transcribe_streaming.py index 77a99fc779b0..8c6b84bb8fb8 100644 --- a/speech/grpc/transcribe_streaming.py +++ b/speech/grpc/transcribe_streaming.py @@ -73,12 +73,10 @@ def _audio_data_generator(buff): A chunk of data that is the aggregate of all chunks of data in `buff`. The function will block until at least one data chunk is available. """ - while True: - # Use a blocking get() to ensure there's at least one chunk of data + stop = False + while not stop: + # Use a blocking get() to ensure there's at least one chunk of data. chunk = buff.get() - if not chunk: - # A falsey value indicates the stream is closed. - break data = [chunk] # Now consume whatever other data's still buffered. @@ -87,22 +85,31 @@ def _audio_data_generator(buff): data.append(buff.get(block=False)) except queue.Empty: break + + # If `_fill_buffer` adds `None` to the buffer, the audio stream is + # closed. Yield the final bit of the buffer and exit the loop. + if None in data: + stop = True + data.remove(None) yield b''.join(data) -def _fill_buffer(audio_stream, buff, chunk): +def _fill_buffer(audio_stream, buff, chunk, stoprequest): """Continuously collect data from the audio stream, into the buffer.""" try: - while True: + while not stoprequest.is_set(): buff.put(audio_stream.read(chunk)) except IOError: - # This happens when the stream is closed. Signal that we're done. + pass + finally: + # Add `None` to the buff, indicating that a stop request is made. + # This will signal `_audio_data_generator` to exit. buff.put(None) # [START audio_stream] @contextlib.contextmanager -def record_audio(rate, chunk): +def record_audio(rate, chunk, stoprequest): """Opens a recording stream in a context manager.""" audio_interface = pyaudio.PyAudio() audio_stream = audio_interface.open( @@ -120,14 +127,13 @@ def record_audio(rate, chunk): # This is necessary so that the input device's buffer doesn't overflow # while the calling thread makes network requests, etc. fill_buffer_thread = threading.Thread( - target=_fill_buffer, args=(audio_stream, buff, chunk)) + target=_fill_buffer, args=(audio_stream, buff, chunk, stoprequest)) fill_buffer_thread.start() yield _audio_data_generator(buff) - audio_stream.stop_stream() - audio_stream.close() fill_buffer_thread.join() + audio_stream.close() audio_interface.terminate() # [END audio_stream] @@ -166,7 +172,7 @@ def request_stream(data_stream, rate, interim_results=True): yield cloud_speech.StreamingRecognizeRequest(audio_content=data) -def listen_print_loop(recognize_stream): +def listen_print_loop(recognize_stream, stoprequest): num_chars_printed = 0 for resp in recognize_stream: if resp.error.code != code_pb2.OK: @@ -198,6 +204,7 @@ def listen_print_loop(recognize_stream): # one of our keywords. if re.search(r'\b(exit|quit)\b', transcript, re.I): print('Exiting..') + stoprequest.set() break num_chars_printed = 0 @@ -206,9 +213,17 @@ def listen_print_loop(recognize_stream): def main(): with cloud_speech.beta_create_Speech_stub( make_channel('speech.googleapis.com', 443)) as service: + + # stoprequest is event object which is set in `listen_print_loop` + # to indicate that the trancsription should be stopped. + # + # The `_fill_buffer` thread checks this object, and closes + # the `audio_stream` once it's set. + stoprequest = threading.Event() + # For streaming audio from the microphone, there are three threads. # First, a thread that collects audio data as it comes in - with record_audio(RATE, CHUNK) as buffered_audio_data: + with record_audio(RATE, CHUNK, stoprequest) as buffered_audio_data: # Second, a thread that sends requests with that data requests = request_stream(buffered_audio_data, RATE) # Third, a thread that listens for transcription responses @@ -220,7 +235,7 @@ def main(): # Now, put the transcription responses to use. try: - listen_print_loop(recognize_stream) + listen_print_loop(recognize_stream, stoprequest) recognize_stream.cancel() except face.CancellationError: