In [6]:
import os
import threading
import numpy as np
import wavio
import sounddevice as sd
from playsound import playsound
from openai import OpenAI
import ipywidgets as widgets
from IPython.display import display
import pickle
from typing import List
from dotenv import load_dotenv, find_dotenv
from financial_assistant_define import cosine_similarity, get_relevant_context, get_transcript, get_ai_response, play_ai_response_with_tts, process_voice_query

# Load variables from a discovered .env file into the process environment
# before the API key is read on the next line; the return value is discarded.
_ = load_dotenv(find_dotenv())
# Indexing os.environ raises KeyError right here when OPENAI_API_KEY is unset.
client = OpenAI(api_key=os.environ['OPENAI_API_KEY'])

# File-name constants.
# NOTE(review): none of these three is referenced in the visible cells;
# presumably they name the vector store, its pickled chunks, and the TTS
# output file -- confirm they are still needed here.
DB_VECTORS_FILE = "financial_db.npy"
DB_CHUNKS_FILE = "financial_db_chunks.pkl"
SPEECH_FILE_PATH = "./response.wav"

In [7]:
class EmbeddingModel:
    """Small wrapper that turns a piece of text into an embedding vector.

    Args:
        model_name: Name of the embedding model sent with every request.
        api_client: Optional object exposing ``embeddings.create(input=...,
            model=...)``. When omitted, the module-level ``client`` is used,
            which keeps ``EmbeddingModel()`` working exactly as before while
            allowing a different (or fake) client to be injected.
    """

    def __init__(self, model_name="text-embedding-3-small", api_client=None):
        # The global is only looked up when no client was injected.
        self.client = client if api_client is None else api_client
        self.model_name = model_name

    def get_embedding(self, text: str) -> List[float]:
        """Return the embedding of ``text`` produced by ``self.model_name``.

        Args:
            text: The text to embed.

        Returns:
            The ``embedding`` attribute of the first item in the response's
            ``data`` list.
        """
        response = self.client.embeddings.create(input=text, model=self.model_name)
        return response.data[0].embedding

# Shared instance built with EmbeddingModel's default arguments; constructing
# it only stores attributes and does not call the API.
# NOTE(review): not referenced by any other visible cell -- confirm it is used.
EMBEDDING_MODEL = EmbeddingModel()

In [8]:
class AudioRecorder:
    """Microphone recorder that captures audio chunks on a background thread.

    Typical use: ``start_recording()``, later ``stop_recording()``, then
    ``save()`` to write the captured chunks to a wav file.

    Attributes are documented inline in ``__init__``.
    """

    # Upper bound, in seconds, that stop_recording() waits for the capture
    # thread to finish its current read and exit.
    _STOP_TIMEOUT_S = 2.0

    def __init__(self):
        self.is_recording = False  # loop flag polled by the capture thread
        self.audio_data = []       # chunks returned by stream.read, in order
        self.fs = 44100            # sample rate for the stream and the wav file
        self.channels = 1
        self._thread = None        # capture thread of the current/last session

    def start_recording(self):
        """Start a fresh capture session on a background thread.

        A call made while a capture thread is still alive is ignored, so a
        repeated start cannot leave two threads appending to one buffer.
        """
        if self._thread is not None and self._thread.is_alive():
            return
        self.is_recording = True
        self.audio_data = []
        # Daemon thread: a forgotten recording must not keep the process alive.
        self._thread = threading.Thread(target=self._record, daemon=True)
        self._thread.start()

    def stop_recording(self):
        """Signal the capture thread to stop and wait for it to finish.

        Waiting (bounded by ``_STOP_TIMEOUT_S``) ensures the last chunk has
        been appended before a caller goes on to ``save()``.
        """
        self.is_recording = False
        worker = self._thread
        if worker is not None:
            worker.join(timeout=self._STOP_TIMEOUT_S)

    def _record(self):
        """Capture loop: read 1024-frame chunks until ``is_recording`` is False."""
        try:
            with sd.InputStream(samplerate=self.fs, channels=self.channels) as stream:
                while self.is_recording:
                    data, _ = stream.read(1024)
                    self.audio_data.append(data)
        finally:
            # Keep the flag honest when the stream fails to open or read.
            self.is_recording = False

    def save(self, filename='input.wav'):
        """Write the captured chunks to ``filename`` as a wav file.

        Args:
            filename: Destination path of the wav file.

        Returns:
            ``filename`` on success, or ``None`` when nothing has been captured.
        """
        if not self.audio_data:
            print("Input wav is not recorded yet.")
            return None
        wav_data = np.concatenate(self.audio_data, axis=0)
        wavio.write(filename, wav_data, self.fs, sampwidth=2)
        print(f"Generated voice is saved as '{filename}'")
        return filename

recorder = AudioRecorder()

In [None]:
# Output pane that the click handlers print into.
output_area = widgets.Output()
# Start / stop controls, styled 'success' and 'danger' respectively.
start_button = widgets.Button(button_style='success', description="녹음 시작🎙️")
stop_button = widgets.Button(button_style='danger', description="녹음 중지⏹️")

In [10]:
def on_start_clicked(b):
    """Click handler for the start button: begin a new recording.

    Args:
        b: The button widget that fired the click (unused).
    """
    with output_area:
        # A repeated click while a capture is running must not clear the
        # pane or kick off another capture on top of the current one.
        if recorder.is_recording:
            print("이미 녹음 중입니다. 질문을 마치면 '녹음 중지' 버튼을 눌러주세요.")
            return
        output_area.clear_output()
        recorder.start_recording()
        print("녹음 시작... 질문을 말씀해주세요.")

def on_stop_clicked(b):
    """Click handler for the stop button: stop, save, and process the recording.

    The saved wav file is handed to ``process_voice_query`` and removed
    afterwards whether or not processing succeeded.

    Args:
        b: The button widget that fired the click (unused).
    """
    with output_area:
        # Without this guard a stop click with no capture in progress would
        # re-save and re-submit the previously recorded audio.
        if not recorder.is_recording:
            print("진행 중인 녹음이 없습니다. 먼저 '녹음 시작' 버튼을 눌러주세요.")
            return
        recorder.stop_recording()
        print("녹음 중지. 잠시만 기다려주세요...")
        file_name = recorder.save()
        if file_name:
            try:
                process_voice_query(file_name)
            except Exception as e:
                # Best effort: report the failure in the pane and keep the UI usable.
                print(f"error occurse: {e}")
            finally:
                # The wav file is only a temporary hand-off; always clean it up.
                if os.path.exists(file_name):
                    os.remove(file_name)
        print("\n다시 질문하시려면 '녹음 시작' 버튼을 눌러주세요.")

In [11]:
# Attach the click handlers to their buttons.
start_button.on_click(on_start_clicked)
stop_button.on_click(on_stop_clicked)

# Announce readiness, then render both buttons followed by the output pane.
print("금융 비서가 준비되었습니다. '녹음 시작' 버튼을 누르고 질문해주세요.")
display(start_button, stop_button, output_area)

금융 비서가 준비되었습니다. '녹음 시작' 버튼을 누르고 질문해주세요.


Button(button_style='success', description='녹음 시작🎙️', style=ButtonStyle())

Button(button_style='danger', description='녹음 중지⏹️', style=ButtonStyle())

Output()