Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
96 changes: 54 additions & 42 deletions deepgram/audio/microphone/microphone.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,22 +23,42 @@ def __init__(
rate=RATE,
chunk=CHUNK,
channels=CHANNELS,
input_device_index=None
):
# dynamic import of pyaudio as not to force the requirements on the SDK (and users)
import pyaudio

self.logger = logging.getLogger(__name__)
self.logger.addHandler(logging.StreamHandler())
self.logger.setLevel(verbose)
self.exit = threading.Event()

self.audio = pyaudio.PyAudio()
self.chunk = chunk
self.rate = rate
self.format = pyaudio.paInt16
self.channels = channels
self.push_callback = push_callback
self.input_device_index = input_device_index
self.asyncio_loop = None
self.asyncio_thread = None

if inspect.iscoroutinefunction(push_callback):
self.logger.verbose("async/await callback - wrapping")
#Run our own asyncio loop.
self.asyncio_thread = threading.Thread(target=self._start_asyncio_loop)
self.asyncio_thread.start()

self.push_callback = lambda data: asyncio.run_coroutine_threadsafe(push_callback(data), self.asyncio_loop).result()
else:
self.logger.verbose("regular threaded callback")
self.push_callback = push_callback

self.stream = None

def _start_asyncio_loop(self):
self.asyncio_loop = asyncio.new_event_loop()
self.asyncio_loop.run_forever()

def is_active(self):
"""
returns True if the stream is active, False otherwise
Expand All @@ -51,8 +71,9 @@ def is_active(self):

val = self.stream.is_active()
self.logger.info("is_active: %s", val)
self.logger.info("is_exiting: %s", self.exit.is_set())
self.logger.debug("Microphone.is_active LEAVE")
return
return val

def start(self):
"""
Expand All @@ -69,80 +90,71 @@ def start(self):
self.logger.info("channels: %d", self.channels)
self.logger.info("rate: %d", self.rate)
self.logger.info("chunk: %d", self.chunk)
self.logger.info("input_device_id: %d", self.input_device_index)

self.stream = self.audio.open(
format=self.format,
channels=self.channels,
rate=self.rate,
input=True,
frames_per_buffer=self.chunk,
input_device_index=self.input_device_index,
stream_callback=self._callback
)

self.exit = False
self.lock = threading.Lock()

self.exit.clear()
self.stream.start_stream()
self.thread = threading.Thread(target=self._processing)
self.thread.start()

self.logger.notice("start succeeded")
self.logger.debug("Microphone.start LEAVE")

def _processing(self):
def _callback(self, input_data, frame_count, time_info, status_flags):
"""
the main processing loop for the microphone
The callback used to process data in callback mode.
"""
self.logger.debug("Microphone._processing ENTER")
import pyaudio
self.logger.debug("Microphone._callback ENTER")

try:
while True:
data = self.stream.read(self.chunk)

self.lock.acquire()
localExit = self.exit
self.lock.release()
if localExit:
self.logger.info("exit is True")
break
if data is None:
self.logger.info("data is None")
continue

if inspect.iscoroutinefunction(self.push_callback):
self.logger.verbose("async/await callback")
asyncio.run(self.push_callback(data))
else:
self.logger.verbose("regular threaded callback")
self.push_callback(data)

self.logger.notice("_processing exiting...")
self.logger.debug("Microphone._processing LEAVE")
if self.exit.is_set():
self.logger.info("exit is Set")
self.logger.notice("_callback stopping...")
self.logger.debug("Microphone._callback LEAVE")
return None, pyaudio.paAbort

if input_data is None:
self.logger.warning("input_data is None")
self.logger.debug("Microphone._callback LEAVE")
return None, pyaudio.paContinue

try:
self.push_callback(input_data)
except Exception as e:
self.logger.error("Error while sending: %s", str(e))
self.logger.debug("Microphone._processing LEAVE")
self.logger.debug("Microphone._callback LEAVE")
raise

self.logger.debug("Microphone._callback LEAVE")
return input_data, pyaudio.paContinue

def finish(self):
"""
Stops the microphone stream
"""
self.logger.debug("Microphone.finish ENTER")

self.lock.acquire()
self.logger.notice("signal exit")
self.exit = True
self.lock.release()

if self.thread is not None:
self.thread.join()
self.thread = None
self.logger.notice("_processing/send thread joined")
self.exit.set()

if self.stream is not None:
self.stream.stop_stream()
self.stream.close()
self.stream = None

if self.asyncio_thread is not None:
self.asyncio_loop.call_soon_threadsafe(self.asyncio_loop.stop)
self.asyncio_thread.join() #Clean up.
self.asyncio_thread = None

self.logger.notice("stream/recv thread joined")

self.logger.notice("finish succeeded")
Expand Down
2 changes: 1 addition & 1 deletion examples/streaming/microphone/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ def on_error(self, error, **kwargs):
)
dg_connection.start(options, addons=dict(myattr="hello"), test="hello")

# Open a microphone stream
# Open a microphone stream on the default input device
microphone = Microphone(dg_connection.send)

# start microphone
Expand Down