In [None]:
import io
import os
import whisper
import torch
from dotenv import load_dotenv
from sys import platform
from time import sleep
from tempfile import NamedTemporaryFile
from queue import Queue
from datetime import datetime, timedelta
import speech_recognition as sr

# Load environment variables before anything reads them (presumably from a
# local .env file via python-dotenv - confirm); OPENAI_API_KEY is read from
# os.environ further down in this notebook.
load_dotenv()


In [None]:
# thread-safe queue the background recording callback pushes raw audio into
data_queue = Queue()

# raw audio bytes accumulated so far for the phrase being transcribed
last_sample = b""

# timestamp of the last time a recording was retrieved from the queue
# (None until the first recording arrives)
phrase_time = None

# SpeechRecognition recognizer used to record audio
recorder = sr.Recognizer()

# keep the energy threshold fixed: dynamic energy compensation lowers it to a
# point where we're always recording
recorder.dynamic_energy_threshold = False
recorder.energy_threshold = 1000


In [None]:
# record callback
def record_callback(_, audio: sr.AudioData) -> None:
    """
    Receive one finished recording from the background listener thread.

    The first positional argument is ignored.
    audio: AudioData object holding the bytes that were just recorded.

    The raw bytes are handed to the main loop through the module-level
    `data_queue`; nothing is returned.
    """
    data_queue.put(audio.get_raw_data())


In [None]:
# Name of the input device to record from, exactly as it appears in
# sr.Microphone.list_microphone_names().
microphone_name = "Microphone (2- USB PnP Audio Device)"
source = None

# Print each device (up to and including the match) so the correct name can be
# copied into `microphone_name` when the lookup fails.
for index, name in enumerate(sr.Microphone.list_microphone_names()):
    print(f'Microphone with name "{name}" found for `Microphone(device_index={index})`')
    if name == microphone_name:
        # Open the device at the index where the name was actually found;
        # a hard-coded index records from whatever device happens to sit there.
        source = sr.Microphone(sample_rate=16000, device_index=index)
        break
else:
    # No device matched: fail here with a clear message instead of letting a
    # later cell choke on an unusable `source`.
    raise RuntimeError(
        f'Microphone "{microphone_name}" not found; '
        "set `microphone_name` to one of the names listed above."
    )


In [None]:
# load the whisper model
audio_model = whisper.load_model("tiny.en")

# upper bound, in seconds, on each background recording (passed as phrase_time_limit)
record_timeout = 2
# gap, in seconds, between recordings after which the main loop starts a new phrase
phrase_timeout = 3

# Only the path is kept: the temporary file handle is dropped right away and the
# main loop re-creates a file at this path each time it writes wav data.
temp_file = NamedTemporaryFile().name
# one entry per phrase; the last entry is rewritten while a phrase is still growing
transcription = [""]

# calibrate the recognizer against the ambient noise picked up by the microphone
with source:
    recorder.adjust_for_ambient_noise(source)

# background thread that gives us raw bytes through record_callback.
# listen_in_background returns a function that stops that thread; keep it so the
# recording can be shut down later with `stop_listening(wait_for_stop=False)`
# instead of leaving the listener running until the kernel is restarted.
stop_listening = recorder.listen_in_background(
    source, record_callback, phrase_time_limit=record_timeout
)

# Signal user we're ready
print("Model loaded...")

In [None]:
# main loop: poll the queue for recorded audio, transcribe the phrase collected
# so far and redraw the transcript; stop with Ctrl+C / kernel interrupt
while True:
    try:
        # nothing recorded yet: sleep so the polling loop does not spin a CPU
        # core at 100% while waiting for audio
        if data_queue.empty():
            sleep(0.15)
            continue

        now = datetime.utcnow()
        phrase_complete = False
        # if enough time has passed since the last recording, the previous
        # phrase is complete - clear the current audio buffer
        if phrase_time and now - phrase_time > timedelta(seconds=phrase_timeout):
            last_sample = bytes()
            phrase_complete = True
        # last time we received new audio from the queue
        phrase_time = now

        # append every queued recording to the current phrase buffer
        while not data_queue.empty():
            data = data_queue.get()
            last_sample += data

        # use AudioData to convert the raw bytes into wav data
        audio_data = sr.AudioData(
            last_sample, source.SAMPLE_RATE, source.SAMPLE_WIDTH
        )
        wav_data = io.BytesIO(audio_data.get_wav_data())

        # write the wav data to the temp file the model reads from
        with open(temp_file, "w+b") as f:
            f.write(wav_data.read())

        # transcribe the whole phrase buffer (fp16 only when CUDA is available)
        result = audio_model.transcribe(temp_file, fp16=torch.cuda.is_available())
        text = result["text"].strip()

        # a pause between recordings starts a new transcription entry;
        # otherwise the current entry is replaced with the longer transcription
        if phrase_complete:
            transcription.append(text)
        else:
            transcription[-1] = text

        # clear console to reprint transcriptions
        os.system("cls" if os.name == "nt" else "clear")
        for line in transcription:
            print(line)

        # flush stdout to make sure we see the transcriptions
        print(" ", end="", flush=True)
    except KeyboardInterrupt:
        break

# print the final transcript once, after the loop has been interrupted
# (inside the loop it was printed on every pass and skipped by the `break`)
print("\n\nTranscription:")
for line in transcription:
    print(line)

In [None]:
import os
import openai

# Read the API key from the environment; a missing OPENAI_API_KEY raises KeyError.
openai.api_key = os.environ["OPENAI_API_KEY"]

# Fetch the model listing and print the model ids in sorted order.
# NOTE(review): `openai.Model.list()` looks like the pre-1.0 openai SDK
# interface - confirm the installed SDK version still provides it.
model_listing = openai.Model.list()
available_models = [entry["id"] for entry in model_listing["data"]]
print(sorted(available_models))
