Create an environment to use with this notebook:<br>
conda create -n ml3_final_project scipy scikit-image statsmodels scikit-learn pandas ipykernel tqdm keras pillow matplotlib pytorch==2.4.1 pytorch-cuda=12.4 torchinfo tensorflow=2.17 accelerate langchain pydub -c pytorch -c nvidia -c conda-forge

Install prerequisites: uncomment the lines below and run them once.

In [1]:
# ! pip install parler-tts
# ! pip install spaces
# ! pip install accelerate
# ! pip install pydub
# ! pip install transformers
# # ! pip install --upgrade protobuf
# ! pip install openai-whisper
# !pip install langchain_openai
# !pip install langchain_deepseek
# !pip install gtts

In [2]:
# !apt install ffmpeg

In [3]:
# Resolve the directory that holds the project assets (e.g. hello.wav).
# The Colab helper module is only importable inside Colab, so its absence is
# used to detect a local run instead of requiring manual (un)commenting.
try:
    from google.colab import drive
except ImportError:
    # for local
    data_dir = './'
else:
    # for Colab
    drive.mount('/content/drive')
    data_dir = '/content/drive/MyDrive/ml3_final_project_chatbot'

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


Setup The LLM Part

For Colab, store your OPENAI_API_KEY in the notebook's Secrets section.

In [4]:
import os

# On Colab the key is read from the notebook's Secrets panel. Outside Colab
# the helper module does not exist, so an OPENAI_API_KEY that is already set
# in the environment is used as-is.
try:
    from google.colab import userdata
except ImportError:
    userdata = None

if userdata is not None:
    os.environ['OPENAI_API_KEY'] = userdata.get('OPENAI_API_KEY')
elif not os.environ.get('OPENAI_API_KEY'):
    print('WARNING: OPENAI_API_KEY is not set; the LLM calls will fail until it is.')

In [5]:
# from langchain_deepseek import ChatDeepSeek
from langchain_core.prompts import PromptTemplate
from langchain_openai import ChatOpenAI
from datetime import datetime
import random
import numpy as np

In [6]:
# --- Conversation state used by the LLM step ---
conversation_history = []      # one dict per exchange, keys 'patient' and 'you'
intro_given = False            # flipped once Chloe has answered a query
last_query, last_response = '', ''
summary = '<EMPTY>'

# --- Microphone / voice-activity state used by the audio step ---
active = True                  # False would require the "hey chloe" wake phrase
talking = False                # True while above-threshold audio is being collected
silence_counter = 0            # quiet chunks seen since the last loud one
voice_data = np.empty(0)       # samples accumulated for the current utterance
inactive_timer = datetime.now()

In [7]:
def format_transcript(history):
    """Render the conversation as plain text, one patient/Chloe pair per exchange.

    Args:
        history: iterable of dicts with the keys 'patient' and 'you'.

    Returns:
        The concatenated "Patient: ...\\nChloe: ...\\n" lines, or the
        placeholder '<EMPTY>' when there is nothing to show.
    """
    exchanges = [
        f'Patient: {turn["patient"]}\nChloe: {turn["you"]}\n'
        for turn in history
    ]
    return ''.join(exchanges) or '<EMPTY>'

In [8]:
def get_response(query):
    """Produce Chloe's reply to a patient query plus the refreshed UI fields.

    The query is first classified by language. Anything other than English,
    Filipino or Tagalog gets a canned "please repeat" message. Otherwise the
    LLM is asked to answer as Chloe, and (once there is more than one
    exchange) to summarise the patient's symptoms.

    Args:
        query: the patient's message, or None/blank when there is nothing to
            answer.

    Returns:
        A 4-tuple ``(response, status, summary, transcript)``:
        response is the reply text (None when there was no query); status is
        the same wake-state message detect_signal reports; summary is the
        new summary text, or ``gr.update()`` (leave the box unchanged) on the
        paths that do not compute one; transcript is the formatted history.
    """
    global intro_given
    confused_messages = [
        "I'm sorry, I did not understand. Can you repeat that please?",
        "Paumanhin, hindi ko maintindihan ang iyong sinabi. Maari mo bang ulitin?",
        "Sorry, I did not understand. Can you repeat that last message please?",
        "I'm sorry, I did not quite get that. Can you repeat that please?",
        "Sorry, I did not quite get that. Can you repeat that please?",
        "I'm sorry, I did not quite get that. Can you repeat please?",
        "I'm sorry, I did not get that. Can you repeat that last message please?",
    ]

    # Build the status text from the wake state rather than reading a global
    # named `status`, which the UI cell rebinds to a Gradio component.
    status_text = 'ACTIVE!' if active else 'INACTIVE! Say "Hey, Chloe" to wake up.'

    # Nothing to answer: skip the LLM round-trips and leave the summary as is.
    # (`summary` used to be read here before its local assignment, which
    # raised UnboundLocalError.)
    if query is None or not query.strip():
        return None, status_text, gr.update(), format_transcript(conversation_history)

    template = """Question: {question}

    Answer: """

    prompt = PromptTemplate.from_template(template)
    # llm =ChatDeepSeek(
    #         model="deepseek-chat",
    llm =ChatOpenAI(
            model="gpt-4o",
            temperature=0,
            max_tokens=None,
            timeout=None,
            max_retries=2,
            # other params...
        )
    llm_chain = prompt | llm

    # Cap the history that is embedded in the prompt by dropping the oldest exchange.
    if len(conversation_history) > 20:
        del conversation_history[0]

    intro_instruction = ''
    if not intro_given:
        intro_instruction ='Introduce yourself to the patient.'

    lang_detect = f"what language is the following text? give a one word answer: {query}"
    language = llm_chain.invoke(lang_detect).content
    print(f'LANGUAGE={language}')
    if (language.strip().lower() not in ['english', 'filipino', 'tagalog']):
        # Unsupported language: ask the patient to repeat, keep the summary.
        response = random.choice(confused_messages)
        interaction = {'patient':query, 'you':response}
        conversation_history.append(interaction)
        return response, status_text, gr.update(), format_transcript(conversation_history)

    question = f"You are Chloe, Doctor Mike's assistant interviewing a patient. {intro_instruction} Only ask one question at a time if needed.  Ask around 5 questions then tell the patient that you will relay the information to the doctor. You can give a summary of the conversation whan asked. Do not give answers as lists, and do not use any markup. Do not use apostrpohes to shorten words. Use word representations for numbers. Do not abbreviate words. Give answers in {language}. Give a summary of the conversaion to the patient after you have asked your questions. Conversaion history is:{conversation_history} The current query is: {query}"
    print(f'conversation history size={len(conversation_history)} question length={len(question)}')
    response = llm_chain.invoke(question).content
    interaction = {'patient':query, 'you':response}
    conversation_history.append(interaction)

    if len(conversation_history) > 1:
        summary_prompt = f'Summarize the symptoms given by the patient from this conversation in bullet points: {conversation_history}'
        summary_text = llm_chain.invoke(summary_prompt).content
    else:
        summary_text = 'START CONVERSATION'

    intro_given = True

    return response, status_text, summary_text, format_transcript(conversation_history)

Setup Whisper Speech To Text

In [9]:
from transformers import pipeline
import gradio as gr
import torch
import whisper

# Run on the first CUDA device when one is available, otherwise on the CPU.
device = "cuda:0" if torch.cuda.is_available() else "cpu"
print("Using device", device)

# Load the Whisper "turbo" checkpoint and place it on the chosen device.
model = whisper.load_model("turbo")
model.to(device)

# wake the model up: transcribe a short sample file from the data directory
# and print the recognised text.
print('WHISPER SAYS ' + model.transcribe(f"{data_dir}/hello.wav")["text"])

# pipe = pipeline(model=model, device=device , return_timestamps=True)

# def transcribe(audio):
#     if audio == None:
#       #  print('NO DATA')
#        response = 'NO DATA GIVEN'
#        return response
#     try:

#       text = model.transcribe(audio, language="tl")["text"]
#       if len(text.strip()) > 0:
#         print(f'TRANSCRIBE {text}')
#       return text
#     except:
#        response = 'ERROR TRANSCRIBING'
#        return response

Using device cuda:0


  checkpoint = torch.load(fp, map_location=device)


WHISPER SAYS  Hello.


In [10]:


import soundfile as sf
import librosa





def transcribe(data, orig_rate):
    """Transcribe a mono waveform with the loaded Whisper model.

    Args:
        data: 1-D array of audio samples recorded at ``orig_rate``.
        orig_rate: sample rate of ``data`` in Hz.

    Returns:
        The transcribed text, or '' when there are no samples to transcribe.
    """
    if data is None or len(data) == 0:
        return ''

    samples = np.asarray(data)
    if not np.issubdtype(samples.dtype, np.floating):
        # librosa.resample only accepts floating-point audio.
        samples = samples.astype(np.float32)

    # The waveform is resampled to 16 kHz before being handed to the model.
    audio_array = librosa.resample(samples, orig_sr=orig_rate, target_sr=16000)

    # NOTE(review): this scales by the L2 norm (unit energy), not by the peak
    # amplitude — confirm that is the intended normalisation.
    norm = np.linalg.norm(audio_array)
    if norm != 0:
        audio_array = audio_array / norm

    # Tensor.to() returns a new tensor; the result must be kept, otherwise the
    # audio silently stays on the CPU.
    audio_tensor = torch.from_numpy(audio_array.astype(np.float32)).to(device)

    result = model.transcribe(audio_tensor, language="tl")
    print(result)
    return result['text']

def detect_signal(data):
    """Accumulate streamed microphone chunks and transcribe finished utterances.

    Chunks whose mean absolute amplitude exceeds the threshold are appended to
    the current utterance. Once more than ``silence_time_treshold``
    consecutive quiet chunks follow speech, the collected audio is transcribed
    and, depending on the wake state, stored as the latest query.

    Args:
        data: ``(sample_rate, samples)`` pair for one audio chunk, or None
            when no chunk is available.

    Returns:
        ``(last_query, status)``: the most recent accepted transcription and
        a human-readable wake-state message.
    """
    global talking
    global silence_counter
    global voice_data
    global last_query
    global active
    global inactive_timer

    # NOTE(review): 700 presumably assumes 16-bit integer sample amplitudes — confirm.
    silence_amplitude_threshold = 700
    silence_time_treshold = 3
    utterance = None

    if data is not None:
        # Work in float64 so np.abs cannot overflow on the most negative
        # int16 value; the accumulated buffer is float64 anyway.
        chunk = np.asarray(data[1], dtype=np.float64)
        level = np.mean(np.abs(chunk))

        if level > silence_amplitude_threshold:
            talking = True
            silence_counter = 0
            voice_data = np.concatenate([voice_data, chunk])
        elif talking:
            silence_counter += 1

        if silence_counter > silence_time_treshold:
            # Enough trailing silence: hand over the utterance and start afresh.
            silence_counter = 0
            talking = False
            utterance = voice_data
            voice_data = np.array([])

    if utterance is not None:
        transcription = transcribe(utterance, data[0])
        if not active and transcription.lower().replace(',','').strip().startswith('hey chloe'):
            # Wake phrase heard while inactive: wake up and keep it as the query.
            last_query = transcription
            active = True
            inactive_timer = datetime.now()
        elif active:
            inactive_timer = datetime.now()
            last_query = transcription
    # Automatic deactivation after a period of inactivity is currently
    # disabled, so `active` only ever changes from False to True here.

    status = 'INACTIVE! Say "Hey, Chloe" to wake up.'
    if active:
        status = 'ACTIVE!'
    return last_query, status

Setup the Parler Text To Speech Model

In [11]:
# from parler_tts import ParlerTTSStreamer
# from parler_tts import ParlerTTSForConditionalGeneration

# from transformers import AutoTokenizer, AutoFeatureExtractor, set_seed
# import numpy as np
# import spaces
# import torch
# from threading import Thread


# device = "cuda:0" if torch.cuda.is_available() else "mps" if torch.backends.mps.is_available() else "cpu"
# torch_dtype = torch.float16 if device != "cpu" else torch.float32

# modelid_tiny = "parler-tts/parler-tts-tiny-v1"
# print('LOADING PARLER TTS MODEL')
# tts_model = ParlerTTSForConditionalGeneration.from_pretrained(
#     modelid_tiny, torch_dtype=torch_dtype, low_cpu_mem_usage=True
# ).to(device)

# tokenizer = AutoTokenizer.from_pretrained(modelid_tiny)
# feature_extractor = AutoFeatureExtractor.from_pretrained(modelid_tiny)

# sampling_rate = tts_model.audio_encoder.config.sampling_rate
# frame_rate = tts_model.audio_encoder.config.frame_rate
# description = "Jenna speaks at an average pace with a calm delivery in a very confined sounding environment with clear audio quality."
# description_tokens = tokenizer(description, return_tensors="pt").to(device)

# print('READY')
# SAMPLE_RATE = feature_extractor.sampling_rate
# SEED = 42


In [12]:

from pydub import AudioSegment
import io
import re
from pydub.effects import speedup

def numpy_to_mp3(audio_array, sampling_rate):
    """Encode a mono waveform as MP3 bytes, played back 1.25x faster.

    Args:
        audio_array: 1-D numpy array of samples. Floating-point input is
            peak-normalised and converted to 16-bit integers; other dtypes
            are used as they are.
        sampling_rate: sample rate of ``audio_array`` in Hz.

    Returns:
        The MP3-encoded audio as ``bytes``.
    """
    # Normalize audio_array if it's floating-point
    if np.issubdtype(audio_array.dtype, np.floating):
        max_val = np.max(np.abs(audio_array)) if audio_array.size else 0
        if max_val > 0:
            # Normalize to 16-bit range. Skipped for silent or empty input,
            # where dividing by a zero peak would produce NaNs.
            audio_array = (audio_array / max_val) * 32767
        audio_array = audio_array.astype(np.int16)

    # Create an audio segment from the numpy array
    audio_segment = AudioSegment(
        audio_array.tobytes(),
        frame_rate=sampling_rate,
        sample_width=audio_array.dtype.itemsize,
        channels=1
    )

    # Speed up by declaring a 1.25x higher frame rate for the same raw data
    # and then converting back to the original frame rate.
    so = audio_segment._spawn(audio_segment.raw_data, overrides={
        "frame_rate": int(audio_segment.frame_rate * 1.25)
    }).set_frame_rate(audio_segment.frame_rate)

    # Export the audio segment to MP3 bytes - use a high bitrate to maximise quality
    with io.BytesIO() as mp3_io:
        so.export(mp3_io, format="mp3", bitrate="320k")
        return mp3_io.getvalue()

# sampling_rate = tts_model.audio_encoder.config.sampling_rate
# frame_rate = tts_model.audio_encoder.config.frame_rate

def split_text(text):
    """Split a reply into short phrases suitable for phrase-by-phrase speech.

    Parentheses are removed, the text is cut after '.', '?', '!' or ':' when
    followed by whitespace (and at every newline), and '.' and '!' are then
    stripped from each phrase. Phrases of at most one character, and phrases
    that consist only of removed punctuation (e.g. "..."), are dropped.

    Args:
        text: the text to split; None or '' yields an empty list.

    Returns:
        List of non-empty phrase strings in their original order.
    """
    if not text:
        return []

    text = text.replace('(','').replace(')','')
    # The capture group keeps the separators, so `parts` alternates
    # phrase, separator, phrase, ... and always ends with a phrase.
    parts = re.split(r'(\.\s+|\n|\?\s+|\!\s+|\:\s+)', text)
    candidates = [body + sep for body, sep in zip(parts[0::2], parts[1::2])]
    candidates.append(parts[-1])

    phrases = []
    for candidate in candidates:
        trimmed = candidate.strip()
        if len(trimmed) <= 1:
            continue
        cleaned = trimmed.replace('.','').replace('!','')
        # Punctuation-only phrases become empty once cleaned; skip them
        # instead of returning '' to the caller.
        if cleaned.strip():
            phrases.append(cleaned)
    return phrases

In [13]:
# @spaces.GPU
# def read_response(answer):
#     if answer == None:
#         return None

#     print(f'READING RESPONSE {answer}')

#     play_steps_in_s = 5.0
#     play_steps = int(frame_rate * play_steps_in_s)


#     phrases = split_text(answer)

#     for phrase in phrases:
#         streamer = ParlerTTSStreamer(tts_model, device=device, play_steps=play_steps)
#         prompt = tokenizer(phrase, return_tensors="pt").to(device)

#         generation_kwargs = dict(
#             input_ids=description_tokens.input_ids,
#             prompt_input_ids=prompt.input_ids,
#             streamer=streamer,
#             do_sample=True,
#             temperature=1.0,
#             min_new_tokens=20,
#         )

#         set_seed(42)
#         thread = Thread(target=tts_model.generate, kwargs=generation_kwargs)
#         thread.start()

#         for new_audio in streamer:
#             print(f"Sample of length: {round(new_audio.shape[0] / sampling_rate, 2)} seconds")
#             yield phrase, numpy_to_mp3(new_audio, sampling_rate=sampling_rate)

In [14]:
from gtts import gTTS
from io import BytesIO
from pydub import AudioSegment

def read_response_gtts(answer):
    """Speak an answer phrase by phrase, yielding one MP3 clip per phrase.

    The answer is split with split_text; each phrase is synthesised with
    gTTS, sped up to 1.25x, and yielded as MP3 bytes.

    Args:
        answer: the text to read aloud. None, '' or whitespace-only text
            produces no audio.

    Yields:
        ``bytes`` holding the MP3 audio of one phrase.
    """
    if answer is None or not answer.strip():
        return
    print(f'READING RESPONSE {answer}')
    language = 'tl'

    for phrase in split_text(answer):
        if not phrase.strip():
            # Nothing speakable left in this phrase.
            continue
        print(phrase)

        tts_buffer = BytesIO()
        gTTS(text=phrase, lang=language, slow=False).write_to_fp(tts_buffer)
        tts_buffer.seek(0)
        audio = AudioSegment.from_file(tts_buffer, format="mp3")

        try:
            speed_up = audio.speedup(1.25)
        except Exception as exc:
            # pydub raises a plain Exception when a clip is too short to be
            # sped up; play it at normal speed rather than aborting the reply.
            print(f'SPEEDUP SKIPPED: {exc}')
            speed_up = audio

        output_io = BytesIO()
        speed_up.export(output_io, format="mp3")

        yield output_io.getvalue()

In [15]:
def update_output():
    """Debug hook: log that an update happened. Takes nothing, returns None."""
    marker = 'UPDATE'
    print(marker)

In [16]:
def reset_audio(input):
    """Return a pair of None values, whatever the input is.

    Args:
        input: ignored; present only so the function can be wired to an
            event that supplies one value.

    Returns:
        ``(None, None)``.
    """
    cleared = (None, None)
    return cleared


In [17]:
def reset():
    """Start a fresh chat session by re-initialising the shared state.

    Clears the conversation history and the voice-capture buffers, restarts
    the inactivity timer, and seeds the last query with 'Hello'.

    Returns:
        ``('RESET', 'Hello')``: a status message and the seeded query.
    """
    global conversation_history, intro_given
    global inactive_timer, talking, silence_counter, voice_data
    global last_query, last_response

    # Conversation side.
    conversation_history = []
    intro_given = False
    last_response = ''

    # Voice-capture side.
    talking = False
    silence_counter = 0
    voice_data = np.array([])
    inactive_timer = datetime.now()

    # Seed query handed back to the caller together with the status text.
    last_query = 'Hello'

    print('RESET')
    return 'RESET', last_query

Launch the Web Interface

In [18]:
# Build the Gradio page: a header, a row of text controls, the audio
# output/input pair, and the summary and transcript boxes. The event wiring
# below connects these components to the functions defined in earlier cells.
with gr.Blocks() as block:
    gr.HTML(
        f"""
        <h1 style='text-align: center;'>LT5 Healthcare Assistant Chatbot</h1>
        <h3 style='text-align: center;'>Click on "Record" to start the conversation, wait 5 seconds then say Hello</h3>
        """
    )
    with gr.Group():
        with gr.Row():
            reset_btn = gr.Button("Reset Chat Session")

            # NOTE(review): `summary` (further down) rebinds the module-level
            # name that an earlier cell initialised to '<EMPTY>'; `status` is
            # likewise a module-level name from here on.
            status = gr.Textbox(label="Status")
            # Read-only box filled by the speech pipeline; typed input goes
            # into `text_query` instead.
            query = gr.Textbox(label="Query", interactive=False)
            text_query = gr.Textbox(label="Text Query")
            answer = gr.Textbox(label="Answer")
            # state = gr.State()
        with gr.Row():
            audio_out = gr.Audio(label="Spoken Answer", streaming=True, autoplay=True)
            audio_in = gr.Audio(label="Speak your question", sources="microphone", streaming=True, type="numpy")
        with gr.Row():
          summary = gr.Textbox(label="Summary")
        with gr.Row():
          transcript = gr.Textbox(label="Transcript")
    # Streamed microphone audio goes to detect_signal, which fills the query
    # and status boxes.
    audio_in.stream(detect_signal, inputs = audio_in, outputs = [query,status])
    # A typed query or a changed spoken query first clears answer and status
    # (reset_audio returns None for both), then asks get_response for the
    # answer, status, summary and transcript.
    text_query.submit(reset_audio,inputs=text_query, outputs=[answer, status]).then(get_response,inputs=text_query, outputs=[answer,status, summary, transcript])
    query.change(reset_audio,inputs=query, outputs=[answer, status]).then(get_response,inputs=query, outputs=[answer,status, summary, transcript])
    # Whenever the answer text changes (or is submitted) it is sent to
    # read_response_gtts, whose output feeds the streaming audio player.
    answer.change(read_response_gtts,inputs=answer, outputs=[audio_out])
    answer.submit(read_response_gtts,inputs=answer, outputs=[audio_out])
    # reset() returns a status message and a seed query for these two boxes.
    reset_btn.click(fn=reset, outputs=[status,query] , api_name="reset")



# Launch with a public share link; debug=True keeps the cell running so
# errors and logs stay visible in the notebook output.
block.launch(inbrowser=True,  share=True, debug=True)

Colab notebook detected. This cell will run indefinitely so that you can see errors and logs. To turn off, set debug=False in launch().
* Running on public URL: https://7cda23e02acecc544a.gradio.live

This share link expires in 72 hours. For free permanent hosting and GPU upgrades, run `gradio deploy` from the terminal in the working directory to deploy to Hugging Face Spaces (https://huggingface.co/spaces)


{'text': ' Hello!', 'segments': [{'id': 0, 'seek': 0, 'start': 0.0, 'end': 0.14, 'text': ' Hello!', 'tokens': [50365, 2425, 0, 50372], 'temperature': 0.0, 'avg_logprob': -0.5119153976440429, 'compression_ratio': 0.42857142857142855, 'no_speech_prob': 7.036488752776293e-11}], 'language': 'tl'}
LANGUAGE=English
conversation history size=0 question length=620
READING RESPONSE Hello! My name is Chloe, and I am Doctor Mike's assistant. How are you feeling today?
Hello
My name is Chloe, and I am Doctor Mike's assistant
How are you feeling today?
{'text': ' shoulders.', 'segments': [{'id': 0, 'seek': 0, 'start': 0.0, 'end': 0.36, 'text': ' shoulders.', 'tokens': [50365, 10245, 13, 50383], 'temperature': 0.0, 'avg_logprob': -0.5081272602081299, 'compression_ratio': 0.5555555555555556, 'no_speech_prob': 3.5225773686864414e-11}], 'language': 'tl'}
LANGUAGE=English
conversation history size=1 question length=708
READING RESPONSE Could you please describe any discomfort or pain you are experiencin

