# TubeTalk ChatBot 
YouTube Q&A Bot using Whisper, LangChain, and GPT-4

This notebook demonstrates how to build a video understanding chatbot that:
- Downloads a YouTube video
- Transcribes it with Whisper
- Builds a searchable vectorstore
- Answers questions using a LangChain agent with GPT-4

## Import Required Libraries

In [None]:

import os
import shutil
import uuid
import tempfile
import asyncio
from io import BytesIO
import whisper
import yt_dlp
from dotenv import load_dotenv
from langchain_community.vectorstores import Chroma
from langchain_community.embeddings import HuggingFaceEmbeddings
from transformers import AutoTokenizer
from langchain_openai import ChatOpenAI
from langchain.agents import initialize_agent, Tool
import chainlit as cl
from chainlit.element import Element
import numpy as np
import soundfile as sf 

## Load API Keys

In [None]:
# Pull the variables defined in a local .env file into the process environment
load_dotenv()

# Read the API keys from the environment (each is None when the variable is unset)
openai_api_key = os.environ.get("OPENAI_API_KEY")
langchain_api_key = os.environ.get("LANGCHAIN_API_KEY")


## Initialize Models and Variables

In [None]:
# Sentence-transformers checkpoint used to embed transcript chunks
embedding_model_name = "sentence-transformers/all-MiniLM-L6-v2"
embedding_model = HuggingFaceEmbeddings(model_name=embedding_model_name)

# Whisper "base" checkpoint used for every speech-to-text call in this file
whisper_model = whisper.load_model("base")

# Module-level session state, reassigned by the session helper functions
session_dir, vectorstore, full_transcript = None, None, ""

# Switch on LangChain tracing and name the project the traces are filed under
os.environ.update({
    "LANGCHAIN_TRACING_V2": "true",
    "LANGCHAIN_PROJECT": "TubeTalk bot",
})


## Session Management Functions

In [None]:
# Allocate a fresh, uniquely named working directory for the session
def create_new_session():
    """Point the global ``session_dir`` at ``sessions/<uuid4>`` and create it on disk."""
    global session_dir
    session_id = uuid.uuid4()
    session_dir = os.path.join("sessions", str(session_id))
    os.makedirs(session_dir, exist_ok=True)

# Clean up any existing session resources
def clean_session():
    """Reset the module-level session state and delete the session directory.

    Forgets the global ``vectorstore`` (after a best-effort attempt to drop
    its collection handle), removes the directory named by ``session_dir``
    when it exists on disk, and finally resets ``session_dir`` to ``None``
    and ``full_transcript`` to an empty string.

    Raises:
        OSError: if the session directory exists but cannot be removed.
    """
    global session_dir, vectorstore, full_transcript
    if vectorstore is not None:
        try:
            # Best-effort: clear the store's private collection reference
            # before letting go of the store itself.
            vectorstore._collection = None
        except (AttributeError, TypeError):
            # The store refuses the assignment; nothing else to detach.
            pass
        finally:
            # Always forget the store, even when the detach above failed,
            # so a stale store never survives into the next session.
            vectorstore = None
    # Remove the session directory if it exists
    if session_dir and os.path.exists(session_dir):
        shutil.rmtree(session_dir)
    session_dir = None
    full_transcript = ""


## YouTube Audio Download and Transcription

In [None]:
# Download audio from a YouTube video and return the mp3 path
def download_audio_from_youtube(url):
    """Download the audio track of ``url`` into the session directory as MP3.

    The file is written under the global ``session_dir`` using the base name
    ``audio``; the FFmpegExtractAudio post-processor is configured to produce
    MP3 at quality ``192``.

    Args:
        url: Address of the video to download.

    Returns:
        The output template with ``.mp3`` appended, i.e. the path the
        extracted audio is expected at.
    """
    output_template = os.path.join(session_dir, 'audio')
    ydl_opts = {
        'format': 'bestaudio/best',
        'outtmpl': output_template,
        'postprocessors': [{'key': 'FFmpegExtractAudio', 'preferredcodec': 'mp3', 'preferredquality': '192'}],
        # A watch URL may also carry a "&list=..." parameter. Only the single
        # video is wanted here, and every entry of a playlist would be written
        # to the same fixed output name.
        'noplaylist': True,
        'quiet': True,
    }
    # Use yt-dlp to download the audio
    with yt_dlp.YoutubeDL(ydl_opts) as ydl:
        ydl.extract_info(url, download=True)
    return output_template + ".mp3"

# Run Whisper on an audio file and hand back its text and language
def transcribe_audio(filepath):
    """Transcribe ``filepath`` with the global ``whisper_model``.

    Returns:
        A ``(text, language)`` tuple taken from the ``"text"`` and
        ``"language"`` entries of the transcription result.
    """
    outcome = whisper_model.transcribe(filepath)
    text, language = outcome["text"], outcome["language"]
    return text, language


## Tokenization and Chunking

In [None]:
# Load the tokenizer that belongs to the embedding model. Reusing
# `embedding_model_name` keeps the token counts used for chunking in step
# with the model that embeds the chunks if that model is ever changed.
tokenizer = AutoTokenizer.from_pretrained(embedding_model_name)

# Split text into pieces of at most `max_tokens` tokens of the global tokenizer
def tokenize_and_chunk_hf(text, max_tokens=256):
    """Cut ``text`` into consecutive chunks of up to ``max_tokens`` tokens.

    The text is encoded without special tokens, the token ids are sliced into
    windows of ``max_tokens``, and every window is decoded back to a string
    with surrounding whitespace stripped.

    Returns:
        The decoded chunks in order; an empty list when there are no tokens.
    """
    token_ids = tokenizer.encode(text, add_special_tokens=False)
    chunks = []
    for start in range(0, len(token_ids), max_tokens):
        window = token_ids[start:start + max_tokens]
        chunks.append(tokenizer.decode(window).strip())
    return chunks


## Build Vectorstore 

In [None]:
# Embed the chunks into a Chroma store kept under the session directory
def build_vectorstore(chunks):
    """Create a Chroma vectorstore from ``chunks`` and persist it.

    The store is built with the global ``embedding_model`` and written to
    ``<session_dir>/chroma_db``.

    Returns:
        The Chroma store that was created.
    """
    store = Chroma.from_texts(
        chunks,
        embedding_model,
        persist_directory=os.path.join(session_dir, "chroma_db"),
    )
    store.persist()
    return store


## Summarization and Search Tools

In [None]:
# Produces the summary when the user asks for an overview of the video transcript.
def summarize_text(transcript):
    """Ask GPT-4 for a concise summary of ``transcript``.

    Returns:
        The model's reply text, or a string starting with a
        "Summarization failed" notice followed by the error text when
        anything goes wrong while building or calling the model.
    """
    try:
        llm = ChatOpenAI(openai_api_key=openai_api_key, model_name="gpt-4", temperature=0.3)
        prompt = f"""
        Summarize the following video transcript in a concise and informative way.

        Transcript:
        {transcript}

        Summary:
        """
        reply = llm.invoke(prompt)

        # Prefer the reply's `content` attribute; otherwise fall back to its string form
        if hasattr(reply, "content"):
            return reply.content
        return str(reply)
    except Exception as err:
        return f"‚ö†Ô∏è Summarization failed: {err}"

# Looks up the transcript passages most relevant to a user question.
def search_transcript(question):
    """Return the three transcript chunks most similar to ``question``.

    The vectorstore is read from the Chainlit user session. When none is
    stored there, a notice asking the user to provide a video is returned
    instead.

    Returns:
        The text of the retrieved chunks joined by blank lines, or the notice.
    """
    store = cl.user_session.get("vectorstore")
    if store is not None:
        # Similarity search limited to the top three matches
        hits = store.similarity_search(question, k=3)
        return "\n\n".join(doc.page_content for doc in hits)
    return "‚ùå Vectorstore is not available. Please upload a YouTube video first."


## Create LangChain Agent 

In [None]:
# This function initializes the LangChain agent with the summarization and search tools.
def build_agent(vectorstore, transcript):
    """Build a zero-shot ReAct agent with a summarizer tool and a search tool.

    Args:
        vectorstore: Store queried by the search tool via ``similarity_search``.
        transcript: Full transcript text, used by the summarizer tool when the
            Chainlit user session holds no ``"full_transcript"`` value.

    Returns:
        The agent created by ``initialize_agent``.
    """
    # Wrap the summarize function. The transcript stored in the session wins;
    # the one passed to build_agent is the fallback, so the tool never
    # summarizes a missing (None) transcript.
    def summarize_wrapper(_):
        session_transcript = cl.user_session.get("full_transcript")
        return summarize_text(session_transcript or transcript)

    # Wrap the search function around the vectorstore captured at build time
    def search_wrapper(question):
        if vectorstore is None:
            return "‚ùå Vectorstore is not available. Please upload a YouTube video first."
        retrieved_chunks = vectorstore.similarity_search(question, k=3)
        return "\n\n".join([doc.page_content for doc in retrieved_chunks])

    # Create the tools for the agent
    # Summarization tool
    summarize_tool = Tool(
        name="Summarizer",
        func=summarize_wrapper,
        description="Use this tool to summarize the transcript that extracted from the video in short sentences."
    )

    # Search tool
    search_tool = Tool(
        name="Search",
        func=search_wrapper,
        description="Use this tool to search for answers from the video transcript,\
            if the question is not related to the video transcript response with:\
            This question isn't related to the video, ask another question."
    )

    # Concatenate the tools into a list
    tools = [summarize_tool, search_tool]

    # Initialize the agent with the tools and the LLM
    agent = initialize_agent(
        tools=tools,
        llm=ChatOpenAI(openai_api_key=openai_api_key, model_name="gpt-4"),
        agent="zero-shot-react-description",
        verbose=True,
        # Feed unparseable model output back to the model instead of raising,
        # so a reply that skips the ReAct format does not abort the answer.
        handle_parsing_errors=True,
    )
    return agent


## Chainlit Chatbot Logic

In [None]:
@cl.on_chat_start
async def start():
    """Prepare a new chat: reset session resources, greet the user, and wait for a link."""
    # Drop whatever the previous session left behind, then allocate a fresh directory
    clean_session()
    create_new_session()

    greeting = cl.Message(content="üëã Hello! Please enter the YouTube URL to process the video.")
    await greeting.send()

    # No video has been processed yet: expect a link next, and there is no agent to query
    for key, value in (("state", "awaiting_link"), ("agent", None)):
        cl.user_session.set(key, value)

# Handle user messages
@cl.on_message
async def handle_message(message: cl.Message):
    """Route an incoming chat message according to the session state.

    * ``"awaiting_link"``: the message must be a YouTube watch URL. Its audio
      is downloaded and transcribed, the transcript is chunked and indexed,
      an agent is built, and the state becomes ``"ready_for_questions"``.
    * ``"ready_for_questions"`` with an agent in the session: the message is
      passed to the agent and its answer is sent back.
    * Anything else, including a ready state without an agent: the user is
      asked for a link and the state is reset to ``"awaiting_link"`` so that
      the next message is treated as one.

    Errors raised while processing are reported to the user as chat messages
    rather than propagated.
    """
    # Get the current state and agent from the user session
    state = cl.user_session.get("state")
    agent = cl.user_session.get("agent")

    if state == "awaiting_link":
        try:
            # Tolerate surrounding whitespace (for example from a pasted link)
            url = (message.content or "").strip()

            # Check if the message is a valid YouTube link
            if not url.startswith("https://www.youtube.com/watch?v="):
                await cl.Message(content="üö´ Please enter a valid YouTube link.").send()
                return

            # Download the audio in a worker thread, like the other heavy
            # steps below, so the event loop stays responsive
            await cl.Message(content="üîÑ Downloading audio from video...").send()
            audio_file = await asyncio.to_thread(download_audio_from_youtube, url)

            # Transcribe the audio file to text
            transcript, _ = await asyncio.to_thread(transcribe_audio, audio_file)
            cl.user_session.set("full_transcript", transcript)

            # Chunk the transcript into smaller pieces for embedding
            chunks = await asyncio.to_thread(tokenize_and_chunk_hf, transcript)

            # Create a new Chroma vectorstore and persist it
            db = await asyncio.to_thread(build_vectorstore, chunks)
            cl.user_session.set("vectorstore", db)

            # Create the agent with the vectorstore and transcript
            agent = build_agent(db, transcript)
            cl.user_session.set("agent", agent)
            cl.user_session.set("state", "ready_for_questions")

            # Tell the user that the video has been processed
            await cl.Message(content="‚úÖ Video processed! Ask your question by text or voice.").send()

        except Exception as e:
            # Report any failure of the download / transcription / indexing steps
            await cl.Message(content=f"‚ùå Error processing video: {str(e)}").send()

    elif state == "ready_for_questions" and agent is not None:
        try:
            await cl.Message(content="ü§î Thinking...").send()
            # The agent call is blocking; run it in a worker thread
            response = await asyncio.to_thread(agent.invoke, message.content)

            # The response may be a dict with an "output" key, an object with
            # a `content` attribute, or something else entirely
            if isinstance(response, dict) and "output" in response:
                answer = response["output"]
            elif hasattr(response, "content"):
                answer = response.content
            else:
                answer = str(response)
            await cl.Message(content=answer).send()

        except Exception as e:
            # Report any failure of the agent invocation
            await cl.Message(content=f"‚ö†Ô∏è Error answering: {str(e)}").send()
    else:
        # Unknown state, or no agent has been built yet: ask for a link and
        # make sure the next message is handled as one
        await cl.Message(content="üö´ Please enter a YouTube link first.").send()
        cl.user_session.set("state", "awaiting_link")



## Voice Input Handling Section

In [None]:
@cl.on_audio_start
# Invoked when the user begins an audio recording
async def on_audio_start():
    """Tell the user that recording has begun.

    Returns:
        ``True`` (presumably telling Chainlit to accept the audio stream;
        confirm against the Chainlit documentation).
    """
    notice = cl.Message(content="üé§ Start recording...")
    await notice.send()
    return True


def _extension_from_mime(mime_type, default="wav"):
    """Return the subtype of ``mime_type`` for use as a file extension.

    Parameters after ``;`` (as in ``audio/webm;codecs=opus``) are dropped.
    ``default`` is returned when there is no usable subtype.
    """
    if not mime_type or "/" not in mime_type:
        return default
    subtype = mime_type.split("/")[1]
    subtype = subtype.split(";", 1)[0].strip()
    return subtype or default


@cl.on_audio_chunk
# This function is called when an audio chunk is received
async def on_audio_chunk(chunk: cl.InputAudioChunk):
    """Collect the chunks of one audio recording in a session-held buffer.

    On the first chunk of a recording (``chunk.isStart``) a new ``BytesIO``
    named ``input_audio.<extension>`` is stored in the user session together
    with the chunk's MIME type. Every chunk's data, including the first, is
    then appended to the buffer found in the session.
    """
    if chunk.isStart:
        # A new recording begins: start from an empty buffer
        buffer = BytesIO()
        buffer.name = f"input_audio.{_extension_from_mime(chunk.mimeType)}"
        # Keep the buffer and MIME type in the session for the end-of-audio handler
        cl.user_session.set("audio_buffer", buffer)
        cl.user_session.set("audio_mime_type", chunk.mimeType)

    # Append this chunk to the current recording, if one has been started
    buffer = cl.user_session.get("audio_buffer")
    if buffer is not None:
        buffer.write(chunk.data)

@cl.on_audio_end
# This function is called when the audio recording ends
async def on_audio_end(elements: list[Element] = None):
    """Transcribe the finished recording and handle it like a typed message.

    The audio collected in the session buffer is transcribed, echoed to the
    chat as the user's message, and then passed to ``handle_message``. The
    session state is left untouched, so ``handle_message`` applies the same
    rules to spoken and typed input and a recording made before any video
    was processed does not reach a missing agent.

    Args:
        elements: Accepted for compatibility with the Chainlit callback
            signature; not used.
    """
    elements = elements or []  # Default to an empty list if no elements are provided

    audio_buffer: BytesIO = cl.user_session.get("audio_buffer")
    if not audio_buffer:  # No recording was started in this session
        await cl.Message(content="‚ö†Ô∏è Doesn't recording anything, Please try again ").send()
        return

    # This recording is consumed here; release it from the session so it is
    # neither processed twice nor kept in memory
    cl.user_session.set("audio_buffer", None)

    # Rewind and read everything that was recorded
    audio_buffer.seek(0)
    audio_file = audio_buffer.read()
    audio_mime_type: str = cl.user_session.get("audio_mime_type") or "audio/wav"

    # Bundle the file name, raw bytes, and MIME type for the transcription helper
    whisper_input = (audio_buffer.name, audio_file, audio_mime_type)
    transcription = await speech_to_text(whisper_input)

    # Display the transcription in the chat
    await cl.Message(
        author="You",
        type="user_message",
        content=transcription
    ).send()

    # Feed the transcription through the regular message handler
    msg = cl.Message(author="You", content=transcription, elements=[])
    await handle_message(message=msg)

# This function is called to transcribe the audio using Whisper
async def speech_to_text(whisper_input):
    """Transcribe recorded audio bytes with the global Whisper model.

    The bytes are interpreted as 16-bit samples and written to a uniquely
    named WAV file under ``saved_audio/`` with a 16 kHz sample rate. Whisper
    transcribes that file in a worker thread, and the file is removed
    afterwards.

    NOTE(review): this assumes the client delivers raw PCM16 audio at 16 kHz.
    Confirm against the Chainlit audio configuration.

    Args:
        whisper_input: ``(file_name, file_bytes, mime_type)`` tuple. Only the
            stem of ``file_name`` and the bytes are used.

    Returns:
        The transcribed text.
    """
    file_name, file_bytes, _mime_type = whisper_input

    # An int16 sample is two bytes. Ignoring a dangling trailing byte keeps
    # np.frombuffer from rejecting a recording with an odd byte count.
    audio_data = np.frombuffer(file_bytes, dtype=np.int16, count=len(file_bytes) // 2)

    os.makedirs("saved_audio", exist_ok=True)  # Create the directory if it doesn't exist

    # A unique file per call, so concurrent recordings cannot overwrite each other
    stem = os.path.splitext(os.path.basename(file_name))[0] or "input_audio"
    fd, temp_path = tempfile.mkstemp(prefix=f"{stem}_", suffix=".wav", dir="saved_audio")
    os.close(fd)
    try:
        # Write the samples to the temporary WAV file
        sf.write(temp_path, audio_data, samplerate=16000, format='WAV', subtype='PCM_16')
        # Transcription is blocking; keep it off the event loop
        result = await asyncio.to_thread(whisper_model.transcribe, temp_path)
    finally:
        # The file is only a hand-off to Whisper; do not leave it behind
        try:
            os.remove(temp_path)
        except OSError:
            pass

    return result["text"]