diff --git a/deepgram/audio/microphone/microphone.py b/deepgram/audio/microphone/microphone.py index f4b9209c..db178994 100644 --- a/deepgram/audio/microphone/microphone.py +++ b/deepgram/audio/microphone/microphone.py @@ -23,6 +23,7 @@ def __init__( rate=RATE, chunk=CHUNK, channels=CHANNELS, + input_device_index=None ): # dynamic import of pyaudio as not to force the requirements on the SDK (and users) import pyaudio @@ -30,15 +31,34 @@ def __init__( self.logger = logging.getLogger(__name__) self.logger.addHandler(logging.StreamHandler()) self.logger.setLevel(verbose) + self.exit = threading.Event() self.audio = pyaudio.PyAudio() self.chunk = chunk self.rate = rate self.format = pyaudio.paInt16 self.channels = channels - self.push_callback = push_callback + self.input_device_index = input_device_index + self.asyncio_loop = None + self.asyncio_thread = None + + if inspect.iscoroutinefunction(push_callback): + self.logger.verbose("async/await callback - wrapping") + #Run our own asyncio loop. + self.asyncio_thread = threading.Thread(target=self._start_asyncio_loop) + self.asyncio_thread.start() + + self.push_callback = lambda data: asyncio.run_coroutine_threadsafe(push_callback(data), self.asyncio_loop).result() + else: + self.logger.verbose("regular threaded callback") + self.push_callback = push_callback + self.stream = None + def _start_asyncio_loop(self): + self.asyncio_loop = asyncio.new_event_loop() + self.asyncio_loop.run_forever() + def is_active(self): """ returns True if the stream is active, False otherwise @@ -51,8 +71,9 @@ def is_active(self): val = self.stream.is_active() self.logger.info("is_active: %s", val) + self.logger.info("is_exiting: %s", self.exit.is_set()) self.logger.debug("Microphone.is_active LEAVE") - return + return val def start(self): """ @@ -69,6 +90,7 @@ def start(self): self.logger.info("channels: %d", self.channels) self.logger.info("rate: %d", self.rate) self.logger.info("chunk: %d", self.chunk) + self.logger.info("input_device_id: %d", self.input_device_index) self.stream = self.audio.open( format=self.format, @@ -76,73 +98,63 @@ def start(self): rate=self.rate, input=True, frames_per_buffer=self.chunk, + input_device_index=self.input_device_index, + stream_callback=self._callback ) - self.exit = False - self.lock = threading.Lock() - + self.exit.clear() self.stream.start_stream() - self.thread = threading.Thread(target=self._processing) - self.thread.start() self.logger.notice("start succeeded") self.logger.debug("Microphone.start LEAVE") - def _processing(self): + def _callback(self, input_data, frame_count, time_info, status_flags): """ - the main processing loop for the microphone + The callback used to process data in callback mode. """ - self.logger.debug("Microphone._processing ENTER") + import pyaudio + self.logger.debug("Microphone._callback ENTER") - try: - while True: - data = self.stream.read(self.chunk) - - self.lock.acquire() - localExit = self.exit - self.lock.release() - if localExit: - self.logger.info("exit is True") - break - if data is None: - self.logger.info("data is None") - continue - - if inspect.iscoroutinefunction(self.push_callback): - self.logger.verbose("async/await callback") - asyncio.run(self.push_callback(data)) - else: - self.logger.verbose("regular threaded callback") - self.push_callback(data) - - self.logger.notice("_processing exiting...") - self.logger.debug("Microphone._processing LEAVE") + if self.exit.is_set(): + self.logger.info("exit is Set") + self.logger.notice("_callback stopping...") + self.logger.debug("Microphone._callback LEAVE") + return None, pyaudio.paAbort + if input_data is None: + self.logger.warning("input_data is None") + self.logger.debug("Microphone._callback LEAVE") + return None, pyaudio.paContinue + + try: + self.push_callback(input_data) except Exception as e: self.logger.error("Error while sending: %s", str(e)) - self.logger.debug("Microphone._processing LEAVE") + self.logger.debug("Microphone._callback LEAVE") raise + self.logger.debug("Microphone._callback LEAVE") + return input_data, pyaudio.paContinue + def finish(self): """ Stops the microphone stream """ self.logger.debug("Microphone.finish ENTER") - self.lock.acquire() self.logger.notice("signal exit") - self.exit = True - self.lock.release() - - if self.thread is not None: - self.thread.join() - self.thread = None - self.logger.notice("_processing/send thread joined") + self.exit.set() if self.stream is not None: self.stream.stop_stream() self.stream.close() self.stream = None + + if self.asyncio_thread is not None: + self.asyncio_loop.call_soon_threadsafe(self.asyncio_loop.stop) + self.asyncio_thread.join() #Clean up. + self.asyncio_thread = None + self.logger.notice("stream/recv thread joined") self.logger.notice("finish succeeded") diff --git a/examples/streaming/microphone/main.py b/examples/streaming/microphone/main.py index fb975382..1aade269 100644 --- a/examples/streaming/microphone/main.py +++ b/examples/streaming/microphone/main.py @@ -62,7 +62,7 @@ def on_error(self, error, **kwargs): ) dg_connection.start(options, addons=dict(myattr="hello"), test="hello") - # Open a microphone stream + # Open a microphone stream on the default input device microphone = Microphone(dg_connection.send) # start microphone