# Feature names
1) navigate - run the car detection model
2) sign language - run the ASL (American Sign Language) code
3) translate - translate speech to English text
4) transcribe - transcribe speech to text
5) question - answer a spoken question (originally the Groq endpoint, now OpenAI chat)
6) object - use GPT-4 Vision to describe items or images

In [94]:
import pvporcupine
import pyaudio
import struct
import wave
import os
import numpy as np
import cv2
import requests
import base64
import json

from openai import OpenAI
import dotenv

# Load settings from a local .env file (if one exists) so that the API keys
# read below via os.environ are available.
dotenv.load_dotenv()

True

In [73]:
# Keyword file handed to Porcupine for wake-word detection.
custom_keyword_path = 'focus_win.ppn'

# Wake-word engine. The access key is read from the PORCUPINE_API_KEY
# environment variable (os.environ.get yields None when it is not set).
porcupine = pvporcupine.create(
    access_key=os.environ.get('PORCUPINE_API_KEY'),
    keyword_paths=[custom_keyword_path]
)

# OpenAI client shared by the TTS, transcription/translation and Q&A functions
# below; the key is read from the OPENAI_API_KEY environment variable.
client = OpenAI(api_key=os.environ.get('OPENAI_API_KEY'))

Fixed-length audio recording

In [74]:
# def record_audio(duration=2, filename="output.wav"):
#     """
#     Record audio from the default microphone for the given duration
#     and save it to the specified filename.
#     """
#     pa = pyaudio.PyAudio()

#     stream = pa.open(format=pyaudio.paInt16, channels=1, rate=16000,
#                      input=True, frames_per_buffer=1024)

#     print(f"Recording for {duration} seconds...")

#     frames = []

#     for _ in range(0, int(16000 / 1024 * duration)):
#         data = stream.read(1024)
#         frames.append(data)

#     print("Recording finished.")

#     stream.stop_stream()
#     stream.close()
#     pa.terminate()

#     with wave.open(filename, 'wb') as wf:
#         wf.setnchannels(1)
#         wf.setsampwidth(pa.get_sample_size(pyaudio.paInt16))
#         wf.setframerate(16000)
#         wf.writeframes(b''.join(frames))

### TTS

In [75]:
def tts(text):
    """
    Speak ``text`` aloud with OpenAI text-to-speech, streaming the audio to
    the default output device as it arrives.

    The audio is requested as raw PCM (16-bit mono, 24 kHz) so that every
    byte written to the device is a sample. Requesting a WAV container would
    push the file header through the speaker as well, and playing the stream
    at a rate other than 24 kHz alters speed and pitch.

    :param text: The text to synthesize.
    """
    p = pyaudio.PyAudio()
    stream = None
    try:
        stream = p.open(format=pyaudio.paInt16,
                        channels=1,
                        rate=24000,
                        output=True)
        with client.audio.speech.with_streaming_response.create(
            model="tts-1",
            voice='nova',
            input=text,
            response_format="pcm"
        ) as res:
            # Only play the body of a successful response.
            if res.status_code == 200:
                for chunk in res.iter_bytes(chunk_size=2048):
                    stream.write(chunk)
    finally:
        # Release the audio device even if the request or playback fails.
        if stream is not None:
            stream.stop_stream()
            stream.close()
        p.terminate()
    

Records until a pause is detected

In [76]:
def record_until_pause(threshold=500, pause_duration=3):
    """
    Continuously record audio from the microphone until a pause is detected,
    then save the recording to "output.wav".

    Audio is captured as 16-bit mono at 16 kHz in chunks of 1024 frames. A
    chunk counts as silent when its RMS amplitude is below ``threshold``;
    recording stops once enough consecutive silent chunks have been seen to
    cover ``pause_duration`` seconds.

    :param threshold: The volume threshold below which is considered silence.
    :param pause_duration: The duration of silence in seconds to consider as a pause.
    :return: The name of the WAV file that was written ("output.wav").
    """
    sample_rate = 16000
    chunk_size = 1024
    sample_format = pyaudio.paInt16

    pa = pyaudio.PyAudio()
    frames = []
    try:
        # Look up the sample width while PyAudio is still initialised.
        sample_width = pa.get_sample_size(sample_format)

        stream = pa.open(format=sample_format, channels=1, rate=sample_rate,
                         input=True, frames_per_buffer=chunk_size)
        try:
            print("Start speaking...")

            silent_frames = 0
            pause_frames = int(sample_rate / chunk_size * pause_duration)

            while True:
                data = stream.read(chunk_size)
                frames.append(data)

                # Compute the RMS in float64: squaring int16 samples in place
                # wraps around for |sample| > 181, which corrupts the volume
                # (it can even become NaN) for anything but very quiet audio.
                amplitude = np.frombuffer(data, np.int16).astype(np.float64)
                volume = np.sqrt(np.mean(amplitude ** 2))

                if volume < threshold:
                    silent_frames += 1
                else:
                    silent_frames = 0

                if silent_frames >= pause_frames:
                    print("Pause detected, processing audio.")
                    break
        finally:
            # Always give the microphone back, even on error or interrupt.
            stream.stop_stream()
            stream.close()
    finally:
        pa.terminate()

    filename = "output.wav"
    with wave.open(filename, 'wb') as wf:
        wf.setnchannels(1)
        wf.setsampwidth(sample_width)
        wf.setframerate(sample_rate)
        wf.writeframes(b''.join(frames))

    return filename


OpenAI Transcriptions

In [77]:
def transcribe_feature(filename):
    """
    Transcribe a short spoken command with OpenAI's ``whisper-1`` model.

    The request asks for English, plain-text output and passes a prompt
    requesting a one- or two-word, present-tense result.

    :param filename: Path of the audio file to upload.
    :return: The transcript returned by the API in "text" response format.
    """
    request_options = {
        "model": "whisper-1",
        "response_format": "text",
        "language": "en",
        "prompt": "Transcribe the following audio clip in one or two words, present-tense:",
    }

    with open(filename, "rb") as recording:
        return client.audio.transcriptions.create(file=recording, **request_options)

def transcribe_audio(filename):
    """
    Transcribe an audio file with OpenAI's ``whisper-1`` model.

    The request asks for English, plain-text output and passes a prompt
    requesting a one- or two-word result.

    :param filename: Path of the audio file to upload.
    :return: The transcript returned by the API in "text" response format.
    """
    hint = "Transcribe the following audio clip in one or two words:"

    with open(filename, "rb") as recording:
        result = client.audio.transcriptions.create(
            file=recording,
            model="whisper-1",
            language="en",
            response_format="text",
            prompt=hint,
        )
    return result

def transcribe_mode(filename):
    """
    Produce a full English transcript of an audio file with OpenAI's
    ``whisper-1`` model.

    :param filename: Path of the audio file to upload.
    :return: The transcript returned by the API in "text" response format.
    """
    whisper = client.audio.transcriptions

    with open(filename, "rb") as speech_file:
        text = whisper.create(
            model="whisper-1",
            file=speech_file,
            response_format="text",
            language="en",
            prompt="Transcribe the following audio clip:",
        )

    return text

def translate_mode(filename):
    """
    Translate the speech in an audio file into English text using the
    translations endpoint of OpenAI's ``whisper-1`` model.

    :param filename: Path of the audio file to upload.
    :return: The translated text returned by the API in "text" response format.
    """
    options = dict(
        model="whisper-1",
        response_format="text",
        prompt="Translate the following audio clip into english:",
    )

    with open(filename, "rb") as speech_file:
        english_text = client.audio.translations.create(file=speech_file, **options)

    return english_text

### Groq Mode - Deprecated due to poor inference

In [78]:
# import time
# def question_mode(transcript):
#     tic = time.time()
#     stream = groq.chat.completions.create(
#         messages=[
#             {
#                 "role": "system",
#                 "content": "you are a helpful assistant. provide brief responses in around 10 words."
#             },
#             {
#                 "role": "user",
#                 "content": f"{transcript}",
#             }
#         ],

#         model="llama2-70b-4096",
#         #model = "mixtral-8x7b-32768",

#         max_tokens=100,
#         stream = True,
#     )
#     toc = time.time()
#     print (f"Time taken for question mode: {toc-tic}")
#     for chunk in stream:
#         print(chunk.choices[0].delta.content, end="")
#     #return chat_completion.choices[0].message.content

### Object Mode

In [98]:
import time
def capture_image(save_path='output.jpg', width=640, height=480, quality=90):
    """
    Grab a single frame from the default camera (device 0) and save it as a JPEG.

    :param save_path: Where to write the image.
    :param width: Requested capture width in pixels.
    :param height: Requested capture height in pixels.
    :param quality: JPEG quality passed to ``cv2.imwrite``.
    :return: True only if a frame was read and written to ``save_path``;
             False if the camera could not be opened, no frame could be
             read, or the image could not be written.
    """
    cap = cv2.VideoCapture(0)
    try:
        if not cap.isOpened():
            return False

        cap.set(cv2.CAP_PROP_FRAME_WIDTH, width)
        cap.set(cv2.CAP_PROP_FRAME_HEIGHT, height)

        ret, frame = cap.read()
        if not ret:
            # Report the failure instead of claiming success with no image.
            return False

        # NOTE(review): this delay sits between reading and writing the frame,
        # so it does not affect which frame is saved; kept to preserve the
        # existing timing. Confirm whether a camera warm-up before the read
        # was intended.
        time.sleep(1)
        written = cv2.imwrite(save_path, frame, [int(cv2.IMWRITE_JPEG_QUALITY), quality])
        return bool(written)
    finally:
        # Free the camera on every exit path, including exceptions.
        cap.release()
        cv2.destroyAllWindows()

def encode_image(img_path):
    """
    Read the file at ``img_path`` and return its contents as a Base64 string.

    :param img_path: Path of the file to encode.
    :return: The Base64 encoding of the file's bytes, as a ``str``.
    """
    with open(img_path, "rb") as handle:
        raw_bytes = handle.read()

    encoded = base64.b64encode(raw_bytes)
    return encoded.decode('utf-8')

def object_mode(image_path = "output.jpg"):
    """
    Ask the OpenAI chat completions API (model ``gpt-4-vision-preview``) to
    describe the object in an image in around ten words.

    The image is sent inline as a Base64 ``data:`` URL with low detail.

    :param image_path: Path of the JPEG image to describe.
    :return: The description text on success. On failure (network error,
             timeout, non-JSON body, or an API error response) a string
             starting with "An error occurred: " is returned instead.
    """
    base64_image = encode_image(image_path)
    api_key = os.environ.get('OPENAI_API_KEY')

    headers = {
        "Content-Type": "application/json",
        "Authorization": f"Bearer {api_key}"
    }

    payload = {
        "model": "gpt-4-vision-preview",
        "messages": [
            {
                "role": "user",
                "content": [
                    {
                        "type": "text",
                        "text": "In around 10 words, describe the object in the image."
                    },
                    {
                        "type": "image_url",
                        "image_url": {
                            "url": f"data:image/jpeg;base64,{base64_image}",
                            "detail": "low"
                        }
                    }
                ]
            }
        ],
        "max_tokens": 100
    }

    try:
        # A timeout keeps the assistant from hanging forever on a stalled
        # connection; network failures are reported the same way as parse
        # failures instead of escaping as exceptions.
        response = requests.post(
            "https://api.openai.com/v1/chat/completions",
            headers=headers,
            json=payload,
            timeout=30,
        )
        response_json = response.json()
    except (requests.RequestException, ValueError) as e:
        return f"An error occurred: {str(e)}"

    if not isinstance(response_json, dict):
        return "An error occurred: unexpected response format"

    choices = response_json.get('choices') or []
    if not choices:
        # Error responses carry no choices; surface the API's own message
        # rather than an opaque "list index out of range".
        error_info = response_json.get('error') or {}
        detail = error_info.get('message') if isinstance(error_info, dict) else None
        return f"An error occurred: {detail or 'no description returned'}"

    try:
        return choices[0].get('message', {}).get('content', '')
    except AttributeError as e:
        return f"An error occurred: {str(e)}"

# Q&A with OpenAI

In [80]:
def question_mode(question):
    """
    Answer ``question`` with ``gpt-3.5-turbo``, echoing the reply to stdout
    as it streams in.

    A system message asks the model for brief responses of around ten words.

    :param question: The user's question text.
    :return: The complete reply, i.e. all streamed content pieces concatenated.
    """
    conversation = [
        {
            "role": "system",
            "content": "You are a helpful assistant. provide brief responses in around 10 words."
        },
        {
            "role": "user",
            "content": question,
        },
    ]

    reply_stream = client.chat.completions.create(
        model="gpt-3.5-turbo",
        messages=conversation,
        stream=True,
    )

    pieces = []
    for part in reply_stream:
        token = part.choices[0].delta.content
        # Chunks without content carry no text to show or keep.
        if token is None:
            continue
        print(token, end="")
        pieces.append(token)

    return "".join(pieces)

In [97]:
# Wait for the wake word, record one spoken command, then dispatch to the
# matching feature.
pa = pyaudio.PyAudio()
audio_stream = None
try:
    audio_stream = pa.open(rate=porcupine.sample_rate, channels=1,
                           format=pyaudio.paInt16, input=True,
                           frames_per_buffer=porcupine.frame_length)

    print("Listening for the wake word...")

    while True:
        pcm = audio_stream.read(porcupine.frame_length)
        # Porcupine expects a sequence of 16-bit samples, not raw bytes.
        pcm = struct.unpack_from("h" * porcupine.frame_length, pcm)

        if porcupine.process(pcm) >= 0:
            print("Wake word detected!")
            break
finally:
    # Release the microphone even if the wait is interrupted (e.g. the cell
    # is stopped), so the following recording can open the device again.
    if audio_stream is not None:
        audio_stream.close()
    pa.terminate()

# Use the filename returned by the recorder instead of repeating it here.
transcription = transcribe_feature(record_until_pause())
if transcription:
    # Remove punctuation in a single pass and normalise case.
    transcription = transcription.translate(str.maketrans("", "", ".,?!:;")).lower()
    print(transcription)

    # Commands are matched as substrings, first match wins.
    if "navigate" in transcription:
        print("Car Detection Mode...")
        #process_car()
    elif "sign" in transcription:
        print("Sign Detection Mode...")
        #process_sign()
    elif "translate" in transcription:
        print("Translation Mode...")
        #process_translation()
    elif "transcribe" in transcription:
        print("Transcription Mode...")
        # NOTE(review): the transcript is stored but not used yet.
        transcribe = transcribe_audio(record_until_pause())
        #process_transcription()
    elif "question" in transcription:
        print("Question and Answering...")
        question = transcribe_audio(record_until_pause())
        print(question)
        answer = question_mode(question)
        tts(answer)
    elif "object" in transcription:
        print("Object Detection Mode...")
        # Only describe the image if a fresh frame was actually captured;
        # otherwise object_mode() would read a stale or missing file.
        if capture_image():
            description = object_mode()
            tts(description)
        else:
            print("Could not capture an image from the camera.")
    else:
        print("Unknown command. Please try again.")


Listening for the wake word...
Wake word detected!
Start speaking...
Pause detected, processing audio.
i don't think it charges object mode yeah

Object Detection Mode...


In [None]:
# Manual check of object mode: capture a frame to the default "output.jpg",
# then describe that file; the description is shown as the cell's result.
capture_image()
object_mode()

'A person holding a red apple in a room.'

In [102]:
# Second manual check of object mode: capture a new frame to "output.jpg"
# and describe it.
capture_image()
object_mode()

'Person wearing large white-framed safety goggles indoors.'