Skip to content

Commit

Permalink
Merge branch 'master' of github.com:GoogleCloudPlatform/python-docs-s…
Browse files Browse the repository at this point in the history
…amples
  • Loading branch information
Jon Wayne Parrott committed Nov 8, 2016
2 parents 8e99bba + 08b4da6 commit 8169d89
Showing 1 changed file with 30 additions and 15 deletions.
45 changes: 30 additions & 15 deletions speech/grpc/transcribe_streaming.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,12 +73,10 @@ def _audio_data_generator(buff):
A chunk of data that is the aggregate of all chunks of data in `buff`.
The function will block until at least one data chunk is available.
"""
while True:
# Use a blocking get() to ensure there's at least one chunk of data
stop = False
while not stop:
# Use a blocking get() to ensure there's at least one chunk of data.
chunk = buff.get()
if not chunk:
# A falsey value indicates the stream is closed.
break
data = [chunk]

# Now consume whatever other data's still buffered.
Expand All @@ -87,22 +85,31 @@ def _audio_data_generator(buff):
data.append(buff.get(block=False))
except queue.Empty:
break

# If `_fill_buffer` adds `None` to the buffer, the audio stream is
# closed. Yield the final bit of the buffer and exit the loop.
if None in data:
stop = True
data.remove(None)
yield b''.join(data)


def _fill_buffer(audio_stream, buff, chunk):
def _fill_buffer(audio_stream, buff, chunk, stoprequest):
"""Continuously collect data from the audio stream, into the buffer."""
try:
while True:
while not stoprequest.is_set():
buff.put(audio_stream.read(chunk))
except IOError:
# This happens when the stream is closed. Signal that we're done.
pass
finally:
# Add `None` to the buff, indicating that a stop request is made.
# This will signal `_audio_data_generator` to exit.
buff.put(None)


# [START audio_stream]
@contextlib.contextmanager
def record_audio(rate, chunk):
def record_audio(rate, chunk, stoprequest):
"""Opens a recording stream in a context manager."""
audio_interface = pyaudio.PyAudio()
audio_stream = audio_interface.open(
Expand All @@ -120,14 +127,13 @@ def record_audio(rate, chunk):
# This is necessary so that the input device's buffer doesn't overflow
# while the calling thread makes network requests, etc.
fill_buffer_thread = threading.Thread(
target=_fill_buffer, args=(audio_stream, buff, chunk))
target=_fill_buffer, args=(audio_stream, buff, chunk, stoprequest))
fill_buffer_thread.start()

yield _audio_data_generator(buff)

audio_stream.stop_stream()
audio_stream.close()
fill_buffer_thread.join()
audio_stream.close()
audio_interface.terminate()
# [END audio_stream]

Expand Down Expand Up @@ -166,7 +172,7 @@ def request_stream(data_stream, rate, interim_results=True):
yield cloud_speech.StreamingRecognizeRequest(audio_content=data)


def listen_print_loop(recognize_stream):
def listen_print_loop(recognize_stream, stoprequest):
num_chars_printed = 0
for resp in recognize_stream:
if resp.error.code != code_pb2.OK:
Expand Down Expand Up @@ -198,6 +204,7 @@ def listen_print_loop(recognize_stream):
# one of our keywords.
if re.search(r'\b(exit|quit)\b', transcript, re.I):
print('Exiting..')
stoprequest.set()
break

num_chars_printed = 0
Expand All @@ -206,9 +213,17 @@ def listen_print_loop(recognize_stream):
def main():
with cloud_speech.beta_create_Speech_stub(
make_channel('speech.googleapis.com', 443)) as service:

# stoprequest is event object which is set in `listen_print_loop`
# to indicate that the trancsription should be stopped.
#
# The `_fill_buffer` thread checks this object, and closes
# the `audio_stream` once it's set.
stoprequest = threading.Event()

# For streaming audio from the microphone, there are three threads.
# First, a thread that collects audio data as it comes in
with record_audio(RATE, CHUNK) as buffered_audio_data:
with record_audio(RATE, CHUNK, stoprequest) as buffered_audio_data:
# Second, a thread that sends requests with that data
requests = request_stream(buffered_audio_data, RATE)
# Third, a thread that listens for transcription responses
Expand All @@ -220,7 +235,7 @@ def main():

# Now, put the transcription responses to use.
try:
listen_print_loop(recognize_stream)
listen_print_loop(recognize_stream, stoprequest)

recognize_stream.cancel()
except face.CancellationError:
Expand Down

0 comments on commit 8169d89

Please sign in to comment.