Skip to content

Commit

Permalink
Use pyaudio's built-in async api. (#645)
Browse files Browse the repository at this point in the history
  • Loading branch information
jerjou authored and Jon Wayne Parrott committed Nov 11, 2016
1 parent d7c851e commit 5a8e12f
Show file tree
Hide file tree
Showing 2 changed files with 51 additions and 57 deletions.
71 changes: 32 additions & 39 deletions speech/grpc/transcribe_streaming.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,10 @@
from __future__ import division

import contextlib
import functools
import re
import signal
import sys
import threading

from google.cloud import credentials
from google.cloud.speech.v1beta1 import cloud_speech_pb2 as cloud_speech
Expand Down Expand Up @@ -76,8 +76,7 @@ def _audio_data_generator(buff):
stop = False
while not stop:
# Use a blocking get() to ensure there's at least one chunk of data.
chunk = buff.get()
data = [chunk]
data = [buff.get()]

# Now consume whatever other data's still buffered.
while True:
Expand All @@ -86,54 +85,47 @@ def _audio_data_generator(buff):
except queue.Empty:
break

# If `_fill_buffer` adds `None` to the buffer, the audio stream is
# closed. Yield the final bit of the buffer and exit the loop.
# `None` in the buffer signals that the audio stream is closed. Yield
# the final bit of the buffer and exit the loop.
if None in data:
stop = True
data.remove(None)

yield b''.join(data)


def _fill_buffer(audio_stream, buff, chunk, stoprequest):
def _fill_buffer(buff, in_data, frame_count, time_info, status_flags):
"""Continuously collect data from the audio stream, into the buffer."""
try:
while not stoprequest.is_set():
buff.put(audio_stream.read(chunk))
except IOError:
pass
finally:
# Add `None` to the buff, indicating that a stop request is made.
# This will signal `_audio_data_generator` to exit.
buff.put(None)
buff.put(in_data)
return None, pyaudio.paContinue


# [START audio_stream]
@contextlib.contextmanager
def record_audio(rate, chunk, stoprequest):
def record_audio(rate, chunk):
"""Opens a recording stream in a context manager."""
# Create a thread-safe buffer of audio data
buff = queue.Queue()

audio_interface = pyaudio.PyAudio()
audio_stream = audio_interface.open(
format=pyaudio.paInt16,
# The API currently only supports 1-channel (mono) audio
# https://goo.gl/z757pE
channels=1, rate=rate,
input=True, frames_per_buffer=chunk,
# Run the audio stream asynchronously to fill the buffer object.
# This is necessary so that the input device's buffer doesn't overflow
# while the calling thread makes network requests, etc.
stream_callback=functools.partial(_fill_buffer, buff),
)

# Create a thread-safe buffer of audio data
buff = queue.Queue()

# Spin up a separate thread to buffer audio data from the microphone
# This is necessary so that the input device's buffer doesn't overflow
# while the calling thread makes network requests, etc.
fill_buffer_thread = threading.Thread(
target=_fill_buffer, args=(audio_stream, buff, chunk, stoprequest))
fill_buffer_thread.start()

yield _audio_data_generator(buff)

fill_buffer_thread.join()
audio_stream.stop_stream()
audio_stream.close()
# Signal the _audio_data_generator to finish
buff.put(None)
audio_interface.terminate()
# [END audio_stream]

Expand Down Expand Up @@ -172,7 +164,17 @@ def request_stream(data_stream, rate, interim_results=True):
yield cloud_speech.StreamingRecognizeRequest(audio_content=data)


def listen_print_loop(recognize_stream, stoprequest):
def listen_print_loop(recognize_stream):
"""Iterates through server responses and prints them.
The recognize_stream passed is a generator that will block until a response
is provided by the server. When the transcription response comes, print it.
In this case, responses are provided for interim results as well. If the
response is an interim one, print a line feed at the end of it, to allow
the next result to overwrite it, until the response is a final one. For the
final one, print a newline to preserve the finalized transcription.
"""
num_chars_printed = 0
for resp in recognize_stream:
if resp.error.code != code_pb2.OK:
Expand Down Expand Up @@ -204,7 +206,6 @@ def listen_print_loop(recognize_stream, stoprequest):
# one of our keywords.
if re.search(r'\b(exit|quit)\b', transcript, re.I):
print('Exiting..')
stoprequest.set()
break

num_chars_printed = 0
Expand All @@ -213,17 +214,9 @@ def listen_print_loop(recognize_stream, stoprequest):
def main():
with cloud_speech.beta_create_Speech_stub(
make_channel('speech.googleapis.com', 443)) as service:

# stoprequest is event object which is set in `listen_print_loop`
# to indicate that the trancsription should be stopped.
#
# The `_fill_buffer` thread checks this object, and closes
# the `audio_stream` once it's set.
stoprequest = threading.Event()

# For streaming audio from the microphone, there are three threads.
# First, a thread that collects audio data as it comes in
with record_audio(RATE, CHUNK, stoprequest) as buffered_audio_data:
with record_audio(RATE, CHUNK) as buffered_audio_data:
# Second, a thread that sends requests with that data
requests = request_stream(buffered_audio_data, RATE)
# Third, a thread that listens for transcription responses
Expand All @@ -235,7 +228,7 @@ def main():

# Now, put the transcription responses to use.
try:
listen_print_loop(recognize_stream, stoprequest)
listen_print_loop(recognize_stream)

recognize_stream.cancel()
except face.CancellationError:
Expand Down
37 changes: 19 additions & 18 deletions speech/grpc/transcribe_streaming_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
# limitations under the License.

import re
import threading
import time

import transcribe_streaming
Expand All @@ -24,34 +25,34 @@ def __init__(self, audio_filename):
def __call__(self, *args):
return self

def open(self, *args, **kwargs):
self.audio_file = open(self.audio_filename, 'rb')
def open(self, stream_callback, *args, **kwargs):
self.closed = threading.Event()
self.stream_thread = threading.Thread(
target=self.stream_audio, args=(
self.audio_filename, stream_callback, self.closed))
self.stream_thread.start()
return self

def close(self):
self.audio_file.close()
self.closed.set()

def stop_stream(self):
pass

def terminate(self):
pass

def read(self, num_frames):
if self.audio_file.closed:
raise IOError()
# Approximate realtime by sleeping for the appropriate time for the
# requested number of frames
time.sleep(num_frames / float(transcribe_streaming.RATE))
# audio is 16-bit samples, whereas python byte is 8-bit
num_bytes = 2 * num_frames
try:
chunk = self.audio_file.read(num_bytes)
except ValueError:
raise IOError()
if not chunk:
raise IOError()
return chunk
@staticmethod
def stream_audio(audio_filename, callback, closed, num_frames=512):
with open(audio_filename, 'rb') as audio_file:
while not closed.is_set():
# Approximate realtime by sleeping for the appropriate time for
# the requested number of frames
time.sleep(num_frames / float(transcribe_streaming.RATE))
# audio is 16-bit samples, whereas python byte is 8-bit
num_bytes = 2 * num_frames
chunk = audio_file.read(num_bytes) or b'\0' * num_bytes
callback(chunk, None, None, None)


def test_main(resource, monkeypatch, capsys):
Expand Down

0 comments on commit 5a8e12f

Please sign in to comment.