From 5a8e12f4ad5d2b8d103cd90fc45fdf39babc793e Mon Sep 17 00:00:00 2001
From: Jerjou
Date: Fri, 11 Nov 2016 11:10:20 -0800
Subject: [PATCH] Use pyaudio's built-in async api. (#645)

---
 speech/grpc/transcribe_streaming.py      | 71 +++++++++++-------------
 speech/grpc/transcribe_streaming_test.py | 37 ++++++------
 2 files changed, 51 insertions(+), 57 deletions(-)

diff --git a/speech/grpc/transcribe_streaming.py b/speech/grpc/transcribe_streaming.py
index 8c6b84bb8fb8..27dd9c87b791 100644
--- a/speech/grpc/transcribe_streaming.py
+++ b/speech/grpc/transcribe_streaming.py
@@ -17,10 +17,10 @@
 from __future__ import division
 
 import contextlib
+import functools
 import re
 import signal
 import sys
-import threading
 
 from google.cloud import credentials
 from google.cloud.speech.v1beta1 import cloud_speech_pb2 as cloud_speech
@@ -76,8 +76,7 @@ def _audio_data_generator(buff):
     stop = False
     while not stop:
         # Use a blocking get() to ensure there's at least one chunk of data.
-        chunk = buff.get()
-        data = [chunk]
+        data = [buff.get()]
 
         # Now consume whatever other data's still buffered.
         while True:
@@ -86,31 +85,28 @@ def _audio_data_generator(buff):
             except queue.Empty:
                 break
 
-        # If `_fill_buffer` adds `None` to the buffer, the audio stream is
-        # closed. Yield the final bit of the buffer and exit the loop.
+        # `None` in the buffer signals that the audio stream is closed. Yield
+        # the final bit of the buffer and exit the loop.
         if None in data:
             stop = True
             data.remove(None)
+
         yield b''.join(data)
 
 
-def _fill_buffer(audio_stream, buff, chunk, stoprequest):
+def _fill_buffer(buff, in_data, frame_count, time_info, status_flags):
     """Continuously collect data from the audio stream, into the buffer."""
-    try:
-        while not stoprequest.is_set():
-            buff.put(audio_stream.read(chunk))
-    except IOError:
-        pass
-    finally:
-        # Add `None` to the buff, indicating that a stop request is made.
-        # This will signal `_audio_data_generator` to exit.
-        buff.put(None)
+    buff.put(in_data)
+    return None, pyaudio.paContinue
 
 
 # [START audio_stream]
 @contextlib.contextmanager
-def record_audio(rate, chunk, stoprequest):
+def record_audio(rate, chunk):
     """Opens a recording stream in a context manager."""
+    # Create a thread-safe buffer of audio data
+    buff = queue.Queue()
+
     audio_interface = pyaudio.PyAudio()
     audio_stream = audio_interface.open(
         format=pyaudio.paInt16,
@@ -118,22 +114,18 @@ def record_audio(rate, chunk, stoprequest):
         # https://goo.gl/z757pE
         channels=1, rate=rate,
         input=True, frames_per_buffer=chunk,
+        # Run the audio stream asynchronously to fill the buffer object.
+        # This is necessary so that the input device's buffer doesn't overflow
+        # while the calling thread makes network requests, etc.
+        stream_callback=functools.partial(_fill_buffer, buff),
     )
 
-    # Create a thread-safe buffer of audio data
-    buff = queue.Queue()
-
-    # Spin up a separate thread to buffer audio data from the microphone
-    # This is necessary so that the input device's buffer doesn't overflow
-    # while the calling thread makes network requests, etc.
-    fill_buffer_thread = threading.Thread(
-        target=_fill_buffer, args=(audio_stream, buff, chunk, stoprequest))
-    fill_buffer_thread.start()
-
     yield _audio_data_generator(buff)
 
-    fill_buffer_thread.join()
+    audio_stream.stop_stream()
     audio_stream.close()
+    # Signal the _audio_data_generator to finish
+    buff.put(None)
     audio_interface.terminate()
 # [END audio_stream]
 
@@ -172,7 +164,17 @@ def request_stream(data_stream, rate, interim_results=True):
         yield cloud_speech.StreamingRecognizeRequest(audio_content=data)
 
 
-def listen_print_loop(recognize_stream, stoprequest):
+def listen_print_loop(recognize_stream):
+    """Iterates through server responses and prints them.
+
+    The recognize_stream passed is a generator that will block until a response
+    is provided by the server. When the transcription response comes, print it.
+
+    In this case, responses are provided for interim results as well. If the
+    response is an interim one, print a line feed at the end of it, to allow
+    the next result to overwrite it, until the response is a final one. For the
+    final one, print a newline to preserve the finalized transcription.
+    """
     num_chars_printed = 0
     for resp in recognize_stream:
         if resp.error.code != code_pb2.OK:
@@ -204,7 +206,6 @@ def listen_print_loop(recognize_stream, stoprequest):
             # one of our keywords.
             if re.search(r'\b(exit|quit)\b', transcript, re.I):
                 print('Exiting..')
-                stoprequest.set()
                 break
 
             num_chars_printed = 0
@@ -213,17 +214,9 @@
 def main():
     with cloud_speech.beta_create_Speech_stub(
             make_channel('speech.googleapis.com', 443)) as service:
-
-        # stoprequest is event object which is set in `listen_print_loop`
-        # to indicate that the trancsription should be stopped.
-        #
-        # The `_fill_buffer` thread checks this object, and closes
-        # the `audio_stream` once it's set.
-        stoprequest = threading.Event()
-
         # For streaming audio from the microphone, there are three threads.
         # First, a thread that collects audio data as it comes in
-        with record_audio(RATE, CHUNK, stoprequest) as buffered_audio_data:
+        with record_audio(RATE, CHUNK) as buffered_audio_data:
             # Second, a thread that sends requests with that data
             requests = request_stream(buffered_audio_data, RATE)
             # Third, a thread that listens for transcription responses
@@ -235,7 +228,7 @@ def main():
 
             # Now, put the transcription responses to use.
             try:
-                listen_print_loop(recognize_stream, stoprequest)
+                listen_print_loop(recognize_stream)
 
                 recognize_stream.cancel()
             except face.CancellationError:
diff --git a/speech/grpc/transcribe_streaming_test.py b/speech/grpc/transcribe_streaming_test.py
index 51ea3fecdd0c..d5b3c0d07d09 100644
--- a/speech/grpc/transcribe_streaming_test.py
+++ b/speech/grpc/transcribe_streaming_test.py
@@ -12,6 +12,7 @@
 # limitations under the License.
 
 import re
+import threading
 import time
 
 import transcribe_streaming
@@ -24,12 +25,16 @@ def __init__(self, audio_filename):
     def __call__(self, *args):
         return self
 
-    def open(self, *args, **kwargs):
-        self.audio_file = open(self.audio_filename, 'rb')
+    def open(self, stream_callback, *args, **kwargs):
+        self.closed = threading.Event()
+        self.stream_thread = threading.Thread(
+            target=self.stream_audio, args=(
+                self.audio_filename, stream_callback, self.closed))
+        self.stream_thread.start()
         return self
 
     def close(self):
-        self.audio_file.close()
+        self.closed.set()
 
     def stop_stream(self):
         pass
@@ -37,21 +42,17 @@ def stop_stream(self):
     def terminate(self):
         pass
 
-    def read(self, num_frames):
-        if self.audio_file.closed:
-            raise IOError()
-        # Approximate realtime by sleeping for the appropriate time for the
-        # requested number of frames
-        time.sleep(num_frames / float(transcribe_streaming.RATE))
-        # audio is 16-bit samples, whereas python byte is 8-bit
-        num_bytes = 2 * num_frames
-        try:
-            chunk = self.audio_file.read(num_bytes)
-        except ValueError:
-            raise IOError()
-        if not chunk:
-            raise IOError()
-        return chunk
+    @staticmethod
+    def stream_audio(audio_filename, callback, closed, num_frames=512):
+        with open(audio_filename, 'rb') as audio_file:
+            while not closed.is_set():
+                # Approximate realtime by sleeping for the appropriate time for
+                # the requested number of frames
+                time.sleep(num_frames / float(transcribe_streaming.RATE))
+                # audio is 16-bit samples, whereas python byte is 8-bit
+                num_bytes = 2 * num_frames
+                chunk = audio_file.read(num_bytes) or b'\0' * num_bytes
+                callback(chunk, None, None, None)
 
 
 def test_main(resource, monkeypatch, capsys):
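For reference, the core of this change is pyaudio's non-blocking (callback) API: the stream delivers each captured chunk to a callback on pyaudio's own thread, and the callback pushes it onto a thread-safe queue that the request generator drains. The following is a minimal standalone sketch of that pattern, not part of the patch itself; the sample rate, chunk size, and consumer loop below are illustrative assumptions, not taken from the sample.

```python
# Minimal sketch of pyaudio's callback API feeding a thread-safe queue.
import functools
import queue  # thread-safe queue (Python 3 stdlib)

import pyaudio

RATE = 16000
CHUNK = RATE // 10  # 100 ms of audio per buffer (illustrative)


def _fill_buffer(buff, in_data, frame_count, time_info, status_flags):
    """Callback run by pyaudio on its own thread for every captured chunk."""
    buff.put(in_data)
    # First element is output data (None for an input-only stream);
    # paContinue keeps the stream running.
    return None, pyaudio.paContinue


buff = queue.Queue()
audio_interface = pyaudio.PyAudio()
stream = audio_interface.open(
    format=pyaudio.paInt16, channels=1, rate=RATE,
    input=True, frames_per_buffer=CHUNK,
    # pyaudio invokes the callback asynchronously, so the input device's
    # buffer keeps draining even while this thread blocks on network I/O.
    stream_callback=functools.partial(_fill_buffer, buff),
)

try:
    for _ in range(50):  # consume roughly 5 seconds of audio
        chunk = buff.get()  # blocks until the callback has produced data
        print('got %d bytes' % len(chunk))
finally:
    stream.stop_stream()
    stream.close()
    buff.put(None)  # in the sample, this tells _audio_data_generator to stop
    audio_interface.terminate()
```

Because the callback keeps the input device's buffer drained while the main thread is busy with network requests, the separate `_fill_buffer` thread and the `stoprequest` event are no longer needed: stopping the stream and putting `None` on the queue is enough to make `_audio_data_generator` exit.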