## **1. Setup**

The project requires several packages, which need to be installed in the workspace:

- LangChain: a framework for developing generative AI applications.
- yt_dlp: lets you download YouTube videos.
- tiktoken: converts text into tokens.
- docarray: makes it easier to work with multi-modal data (in this case, mixing audio and text).

In [40]:
# !pip install -r requirements.txt

In [41]:
#!pip install openai

In [42]:
#!pip install python-dotenv

In [43]:
#!pip install -U langchain-community

In [44]:
#!pip install torch torchvision torchaudio --index-url https://download.pytorch.org/whl/cpu

In [45]:
#!pip install transformers soundfile langchain docarray

In [46]:
#!pip install librosa


In [47]:
import yt_dlp as youtube_dl
from yt_dlp import DownloadError
import os
import glob
import torch
import soundfile as sf
import librosa
from transformers import AutoModelForSpeechSeq2Seq, AutoProcessor, pipeline
from langchain.document_loaders import TextLoader
from langchain.vectorstores import DocArrayInMemorySearch
from langchain.embeddings import OpenAIEmbeddings
from langchain.chains import RetrievalQA
from langchain.chat_models import ChatOpenAI
from langchain.agents import Tool, initialize_agent
from langchain.memory import ConversationBufferMemory
from langchain_core.chat_history import InMemoryChatMessageHistory
from dotenv import load_dotenv


### **YouTube Audio Extraction & MP3 Conversion**
This snippet specifies an output folder and a target YouTube URL, then configures yt_dlp to:

1. Download the highest-quality audio stream available.

2. Use FFmpeg to extract and convert that stream into a 192 kbps MP3.

3. Name the resulting file after the video’s title.

4. Enable verbose logging so you can see detailed progress.

It wraps the download call in a try/except block to catch and report any DownloadError that might occur.

In [48]:
output_dir = "audio_files/"
youtube_url = "https://youtu.be/4h9lQfYLOZU?si=4Z4RCfJjaAdAp2e1"

In [49]:
# Download a YouTube video's audio track and convert it to MP3
def download_audio(url: str) -> str:

    # Configuration for yt_dlp
    ydl_config = {
        "format": "bestaudio/best",
        # List of post-processing steps; each dict represents one processor
        "postprocessors": [
            {
                "key": "FFmpegExtractAudio",   # Extract audio from the downloaded file
                "preferredcodec": "mp3",       # Convert audio to MP3 format
                "preferredquality": "192"      # Set audio quality to 192 kbps
            }
        ],
        # Template for naming the output file: "<output_dir>/<video title>.<extension>"
        "outtmpl": os.path.join(output_dir, "%(title)s.%(ext)s"),
        "verbose": True  # Enable detailed logging
    }

    try:
        # Initialize the downloader with the specified config and start download
        with youtube_dl.YoutubeDL(ydl_config) as ydl:
            info = ydl.extract_info(url, download=True)
            # The post-processor swaps the original extension for .mp3
            mp3_path = os.path.splitext(ydl.prepare_filename(info))[0] + ".mp3"
    except DownloadError as e:
        # Report the failure instead of falling through to an undefined `info`
        return f"DownloadError: {e}"

    return f"Downloaded: {mp3_path}"

### **Batch Transcription of MP3 Files with Whisper**

The cells below transcribe every downloaded MP3. Steps 1 to 4 set up a local Whisper pipeline and are left commented out; the cell that actually runs (steps 5 to 7) sends the audio to OpenAI's hosted whisper-1 model instead.

1. Detects whether to run on GPU (CUDA) or CPU and picks float16 on GPU to save memory, or float32 on CPU.

2. Loads the openai/whisper-large-v3 model (using safetensors for faster, lower-memory loading) and moves it to the chosen device.

3. Loads the accompanying processor (tokenizer + feature extractor).

4. Builds an ASR pipeline around that model + processor.

5. Uses glob to collect all .mp3 files in input_folder (audio_files/) and raises an error if none are found.

6. Defines a single transcript output path (files/transcripts/final_transcript.txt) and ensures that its folder and the chunks folder exist.

7. Iterates over each MP3, splits it into 20-minute chunks with pydub, sends each chunk to the whisper-1 transcription endpoint, and appends the returned text to the transcript file under a heading that names the chunk.

In [50]:
# # 1. Figure out if I can use GPU
# device = "cuda:0" if torch.cuda.is_available() else "cpu"
# #    and choose torch dtype to save memory on GPU
# torch_dtype = torch.float16 if torch.cuda.is_available() else torch.float32

# # 2. Pick the Whisper model I want
# model_id = "openai/whisper-large-v3"

# # 3. Load the model weights (using safetensors to speed things up)
# model = AutoModelForSpeechSeq2Seq.from_pretrained(
#     model_id,
#     torch_dtype=torch_dtype,
#     low_cpu_mem_usage=True,
#     use_safetensors=True
# )
# # move the model to the right device
# model.to(device)


In [51]:
# # 4. Load the processor that has both tokenizer & feature extractor
# processor = AutoProcessor.from_pretrained(model_id)


In [52]:
# # 5. Build the ASR pipeline with my model and processor
# pipe = pipeline(
#     "automatic-speech-recognition",
#     model=model,
#     tokenizer=processor.tokenizer,
#     feature_extractor=processor.feature_extractor,
#     torch_dtype=torch_dtype,
#     device=device,
# )

In [53]:
print("Current working dir:", os.getcwd())
print("ffmpeg exists:", os.path.exists(r"C:\ffmpeg\ffmpeg-7.1.1-full_build\bin\ffmpeg.exe"))
print("ffprobe exists:", os.path.exists(r"C:\ffmpeg\ffmpeg-7.1.1-full_build\bin\ffprobe.exe"))

Current working dir: d:\Ironhack\Final_project\Multimodal-AI-ChatBot-for-YouTube-Video-QA
ffmpeg exists: True
ffprobe exists: True
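
The check above relies on hard-coded Windows paths. A minimal, more portable sketch is shown here, assuming ffmpeg and ffprobe are installed somewhere on the system PATH; the helper name find_tools is hypothetical and not part of the original notebook.

```python
import shutil


def find_tools(names=("ffmpeg", "ffprobe")):
    """Return a {tool_name: absolute_path_or_None} mapping based on a PATH lookup."""
    return {name: shutil.which(name) for name in names}


# Report where each tool was found, or that it is missing
tools_found = find_tools()
for name, path in tools_found.items():
    print(f"{name}: {path if path else 'not found on PATH'}")
```

If either entry is reported as missing, yt_dlp's FFmpegExtractAudio step and pydub's export are both likely to fail, so it is worth running this before the download and transcription cells.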


In [None]:
from pydub import AudioSegment
from openai import OpenAI
import ffmpeg

# Load your OpenAI API Key
load_dotenv()
client = OpenAI(api_key=os.getenv("OPENAI_API_KEY"))


# AudioSegment.converter = r"C:\ffmpeg\ffmpeg-7.1.1-full_build\bin\ffmpeg.exe"
# AudioSegment.ffprobe = r"C:\ffmpeg\ffmpeg-7.1.1-full_build\bin\ffprobe.exe"



AudioSegment.ffmpeg = r"C:\ffmpeg\ffmpeg-7.1.1-full_build\bin\ffmpeg.exe"
AudioSegment.converter = r"C:\ffmpeg\ffmpeg-7.1.1-full_build\bin\ffmpeg.exe"




# Paths
input_folder = "audio_files/"
chunk_folder = "chunks"
output_file = "files/transcripts/final_transcript.txt"

os.makedirs(chunk_folder, exist_ok=True)
os.makedirs(os.path.dirname(output_file), exist_ok=True)

# Settings
chunk_length = 20 * 60 * 1000  # 20 minutes in milliseconds

# Find MP3 files
audio_files = glob.glob(os.path.join(input_folder, "*.mp3"))
if not audio_files:
    raise ValueError("❌ No MP3 files found in the folder!")

# Process files one by one
for audio_path in audio_files:
    base_name = os.path.splitext(os.path.basename(audio_path))[0]
    print(f"\n🎧 Processing: {base_name}")

    # Load audio file
    audio = AudioSegment.from_file(audio_path)
    total_length = len(audio)
    num_chunks = (total_length + chunk_length - 1) // chunk_length  # ceiling division: no empty trailing chunk

    for i in range(num_chunks):
        start = i * chunk_length
        end = min(start + chunk_length, total_length)
        chunk = audio[start:end]

        chunk_filename = f"{base_name}_chunk_{i}.mp3"
        chunk_path = os.path.join(chunk_folder, chunk_filename)
        chunk.export(chunk_path, format="mp3")

        print(f"🔹 Transcribing chunk {i+1}/{num_chunks}...")

        # Transcribe
        with open(chunk_path, "rb") as f:
            response = client.audio.transcriptions.create(
                file=f,
                model="whisper-1"
            )

        # Save transcript
        with open(output_file, "a", encoding="utf-8") as out:
            out.write(f"## {chunk_filename}\n")
            out.write(response.text + "\n\n")

    print(f"✅ Done with {base_name}")

print(f"\n📄 Full transcript saved to: {output_file}")



🎧 Processing: IELTS Listening Practice ｜ What You NEED to Know!
🔹 Transcribing chunk 1/3...
🔹 Transcribing chunk 2/3...
🔹 Transcribing chunk 3/3...
✅ Done with IELTS Listening Practice ｜ What You NEED to Know!

🎧 Processing: IELTS Vocabulary Synonyms for the 40 Most Commonly Used Words
🔹 Transcribing chunk 1/5...
🔹 Transcribing chunk 2/5...
🔹 Transcribing chunk 3/5...
🔹 Transcribing chunk 4/5...
🔹 Transcribing chunk 5/5...
✅ Done with IELTS Vocabulary Synonyms for the 40 Most Commonly Used Words

📄 Full transcript saved to: files/transcripts/final_transcript.txt
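
The chunk-boundary arithmetic used in the loop can be isolated and checked without any audio. The sketch below is illustrative only; the function name chunk_ranges is hypothetical. It uses ceiling division so that an audio length that is an exact multiple of the chunk length does not produce an empty final chunk.

```python
def chunk_ranges(total_ms: int, chunk_ms: int) -> list[tuple[int, int]]:
    """Return (start, end) millisecond offsets covering [0, total_ms) in chunk_ms steps."""
    if chunk_ms <= 0:
        raise ValueError("chunk_ms must be positive")
    # Ceiling division: round up so a partial last chunk is included
    num_chunks = -(-total_ms // chunk_ms)
    return [
        (i * chunk_ms, min((i + 1) * chunk_ms, total_ms))
        for i in range(num_chunks)
    ]


twenty_minutes = 20 * 60 * 1000
# A 45-minute file yields two full chunks and one 5-minute remainder
print(chunk_ranges(45 * 60 * 1000, twenty_minutes))
```

With pydub, each (start, end) pair can be used directly as a slice, as in audio[start:end].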


### **Load Transcript File into LangChain Documents** 

Uses LangChain’s TextLoader to read transcript.txt and convert it into a list of Document objects for downstream processing. Note that the chunked transcription cell above writes to files/transcripts/final_transcript.txt, so the path passed to TextLoader should point at whichever transcript file you actually generated.

In [55]:
# Create a TextLoader for the transcript file
loader = TextLoader("./files/transcripts/transcript.txt")

# Read the file into a list of Document objects
docs = loader.load()

In [56]:
docs[0]

Document(metadata={'source': './files/transcripts/transcript.txt'}, page_content="Hi everyone, Chris here from IELTSadvantage.com with another lesson, and today what we're gonna focus on is how to practice IELTS listening. So what we're gonna do is look at why doing lots of practice tests is a terrible idea. In fact, this is the worst thing you could do. If you think that just doing lots and lots and lots of practice tests is gonna help you get a higher score, you are wrong. But what then I'm gonna show you is three better ways to practice that will actually improve your scores because at the end of the day, what are we doing? We're helping you improve your scores. So we're only gonna teach you the things that work and make you aware of the things that don't work. And these three ways are totally free and you can do them at home by yourself without a teacher. So they're absolutely brilliant. So number one, why doing lots of practice tests is a terrible idea. Well, the first thing is th

In [57]:
from langchain.text_splitter import CharacterTextSplitter
text_splitter = CharacterTextSplitter(chunk_size=500, chunk_overlap=100)
docs = text_splitter.split_documents(docs)
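
To make chunk_size and chunk_overlap concrete, here is a simplified sliding-window sketch in plain Python. It is not LangChain's implementation: CharacterTextSplitter first splits on a separator (a blank line by default) and then merges the pieces, so a transcript with no blank lines may come back as a single oversized chunk. The function name split_with_overlap is hypothetical.

```python
def split_with_overlap(text: str, chunk_size: int = 500, chunk_overlap: int = 100) -> list[str]:
    """Cut text into fixed-size windows where each window repeats the tail of the previous one."""
    if chunk_overlap >= chunk_size:
        raise ValueError("chunk_overlap must be smaller than chunk_size")
    step = chunk_size - chunk_overlap  # how far the window advances each time
    chunks = []
    for start in range(0, len(text), step):
        chunks.append(text[start:start + chunk_size])
        if start + chunk_size >= len(text):
            break  # the current window already reaches the end of the text
    return chunks


sample = "".join(chr(65 + i % 26) for i in range(1200))
pieces = split_with_overlap(sample)
print([len(p) for p in pieces])
```

The overlap means the last 100 characters of one chunk are also the first 100 characters of the next, which helps keep a sentence that straddles a boundary retrievable from either side.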

In [58]:
docs

[Document(metadata={'source': './files/transcripts/transcript.txt'}, page_content="Hi everyone, Chris here from IELTSadvantage.com with another lesson, and today what we're gonna focus on is how to practice IELTS listening. So what we're gonna do is look at why doing lots of practice tests is a terrible idea. In fact, this is the worst thing you could do. If you think that just doing lots and lots and lots of practice tests is gonna help you get a higher score, you are wrong. But what then I'm gonna show you is three better ways to practice that will actually improve your scores because at the end of the day, what are we doing? We're helping you improve your scores. So we're only gonna teach you the things that work and make you aware of the things that don't work. And these three ways are totally free and you can do them at home by yourself without a teacher. So they're absolutely brilliant. So number one, why doing lots of practice tests is a terrible idea. Well, the first thing is t

### **Build & Configure the RetrievalQA Pipeline**



1. Creates a DocArrayInMemorySearch index from your docs, embedding each with OpenAI’s embeddings API.



In [59]:
# Load .env into environment
_ = load_dotenv()


In [60]:
# Confirm the key is set without printing the secret itself
print("API key loaded:", bool(os.getenv("OPENAI_API_KEY")))


API key loaded: True


In [61]:
# Create a new DocArrayInMemorySearch instance from the specified documents and embeddings
db = DocArrayInMemorySearch.from_documents(
    docs,
    OpenAIEmbeddings()
)
# embeddings = OpenAIEmbeddings(openai_api_key=os.getenv("OPENAI_API_KEY"))
# from langchain.vectorstores import Chroma
# db= Chroma.from_documents(
#     docs,
#     embeddings,
#     collection_name="ielts-rag",
#     persist_directory="./chroma_db")

2. Converts that index into a retriever for semantic search.

3. Instantiates a ChatOpenAI model with zero temperature for deterministic responses.

In [62]:
# Convert the DocArrayInMemorySearch instance to a retriever
retriever = db.as_retriever()

# Create a new ChatOpenAI instance with deterministic output
llm = ChatOpenAI(temperature=0.0)
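
Conceptually, the retriever embeds the question and returns the stored chunks whose vectors are closest to it. The sketch below illustrates that idea with hand-written three-dimensional vectors and cosine similarity. Everything in it (the toy vectors, the function names, the choice of cosine similarity) is an assumption made for illustration and does not reproduce DocArrayInMemorySearch internals.

```python
import math


def cosine_similarity(a, b):
    """Cosine of the angle between two equal-length vectors (0.0 if either is all zeros)."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    if norm_a == 0 or norm_b == 0:
        return 0.0
    return dot / (norm_a * norm_b)


def top_k(query_vec, indexed, k=2):
    """Return the k texts whose vectors are most similar to query_vec."""
    scored = [(cosine_similarity(query_vec, vec), text) for text, vec in indexed]
    scored.sort(key=lambda pair: pair[0], reverse=True)
    return [text for _, text in scored[:k]]


# Toy "embeddings": real ones come from an embedding model and have many more dimensions
toy_index = [
    ("practice tests are boring", [0.9, 0.1, 0.0]),
    ("listen actively for vocabulary", [0.1, 0.9, 0.1]),
    ("reflect on your mistakes", [0.0, 0.2, 0.9]),
]
print(top_k([0.8, 0.2, 0.0], toy_index, k=1))
```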

4. Builds a RetrievalQA chain (using the “stuff” strategy) that ties together the LLM and retriever, with verbose=True to print intermediate debug info.

In [77]:
# Create a new RetrievalQA instance with the specified parameters
qa_stuff = RetrievalQA.from_chain_type(
    llm=llm,
    chain_type="stuff",
    retriever=retriever,
    return_source_documents=True,
    verbose=True
)


In [78]:
#qa_stuff?
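
The “stuff” strategy simply places all retrieved passages into a single prompt alongside the question. A rough sketch of that assembly step follows; the prompt wording and the function name build_stuff_prompt are illustrative assumptions, not LangChain's actual template.

```python
def build_stuff_prompt(question: str, passages: list[str]) -> str:
    """Concatenate every retrieved passage into one context block, followed by the question."""
    context = "\n\n".join(passages)
    return (
        "Use the following pieces of context to answer the question at the end.\n\n"
        f"{context}\n\n"
        f"Question: {question}\n"
        "Helpful Answer:"
    )


prompt = build_stuff_prompt(
    "What is this video about?",
    ["Doing lots of practice tests is a terrible idea.", "Here are three better ways to practice."],
)
print(prompt)
```

Because every passage goes into one prompt, this approach works best when the retrieved chunks are small enough to fit comfortably within the model's context window.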

### **Multi-Tool Conversational Agent with Memory**

Defines two custom tools (transcript Q&A and summarization) plus an optional YouTube audio download tool that is left commented out, configures a conversation buffer to remember past messages, and initializes a LangChain agent that can use these tools interactively while preserving chat history.

In [79]:
# Define the summarization and Q&A functions that are wrapped as tools below
def summarize_transcript(text: str) -> str:
    # Use .invoke() rather than calling the chain directly, which is deprecated
    output = qa_stuff.invoke({"query": f"Summarize this:\n\n{text}"})
    return output["result"]


def run_TranscriptQA(question: str) -> str:
    result = qa_stuff.invoke({"query": question})
    return result["result"]

In [80]:
tools = [
    Tool(
    name="TranscriptQA",
    func=run_TranscriptQA,
    description="Answer questions based on the transcripts."
), 
         
    Tool(
    name="TranscriptSummarizer",
    func=summarize_transcript,
    description="Generate a concise summary of a given transcript text."
)
]

In [67]:
# Set up a downloader tool to grab YouTube audio and convert it to MP3
# downloader_tool = Tool(
#     name="YouTubeDownloader",
#     func=download_audio,
#     description="Download and convert a YouTube URL to an MP3 file."
# )


In [81]:
session_store = {}

def get_session_history(session_id: str) -> InMemoryChatMessageHistory:
    """Ensures each session has its own message history"""
    if session_id not in session_store:
        session_store[session_id] = InMemoryChatMessageHistory()
    return session_store[session_id]

In [82]:
import uuid
session_id = str(uuid.uuid4())  # e.g. "4f9b8a2e-1c3d-4f5a-9e6b-7d8f0a1b2c3d"

# Fetch (or create) the message history for this session
chat_history = get_session_history(session_id)

In [83]:
# Wrap the session history in a memory buffer that keeps the full conversation
memory = ConversationBufferMemory(
    memory_key="chat_history",
    chat_memory=chat_history,
    return_messages=True
)


In [84]:
# I initialize the agent with my tools, the LLM, and the memory buffer
agent = initialize_agent(
    tools=tools,
    llm=llm,
    agent="chat-conversational-react-description",
    memory=memory,
    handle_parsing_errors=True,
    verbose=True
)
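
At each step this agent type emits a small JSON object naming an action and its input, which the executor then routes to the matching tool. The sketch below is a heavily simplified illustration of that routing. The function name dispatch and the toy tool registry are hypothetical, and the real executor also handles markdown fences around the JSON, the scratchpad, and parsing errors.

```python
import json


def dispatch(action_json: str, tool_registry: dict):
    """Parse one agent step and either return the final answer or call the named tool."""
    step = json.loads(action_json)
    action = step["action"]
    action_input = step["action_input"]
    if action == "Final Answer":
        return ("final", action_input)
    if action not in tool_registry:
        return ("error", f"Unknown tool: {action}")
    # The tool's return value becomes the observation fed back to the model
    return ("observation", tool_registry[action](action_input))


# Stand-in tool so the sketch runs without an LLM or API key
toy_tools = {"TranscriptQA": lambda question: f"(answer to: {question})"}

print(dispatch('{"action": "TranscriptQA", "action_input": "What is the video about?"}', toy_tools))
print(dispatch('{"action": "Final Answer", "action_input": "4"}', toy_tools))
```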

In [None]:
agent?

### **Run a Query Through the Agent & Display the Answer**
1. Defines the user’s question as the query string.

2. Passes that query to the agent, which decides which tool to call. Here it picks TranscriptSummarizer, which runs the previously configured RetrievalQA chain (qa_stuff) to retrieve relevant passages and generate an answer.

3. Prints out the final response to the console.

In [85]:
# set the query
query = "What is this video about?"

#run the query 
response = agent.invoke(query)

#print response
print(response)


> Entering new AgentExecutor chain...
```json
{
    "action": "TranscriptSummarizer",
    "action_input": "Provide the transcript of the video for summarization."
}
```

> Entering new RetrievalQA chain...

> Finished chain.

Observation: The video discusses the ineffective practice of doing lots of IELTS listening practice tests and offers three better ways to improve listening skills and scores. The first point highlights that doing numerous practice tests is boring and does not lead to improvement. The second point emphasizes the importance of actively listening and focusing on specific areas like vocabulary, accent, or intonation. The third point suggests reflecting on mistakes made in genuine practice tests to identify patterns and take corrective action. The video encourages listening for pleasure, active listening, and reflective listening as effective strategies for enhancing listening skills and IELTS performance.
Thought:```json
{
    "action": "Final Answer",
    "action_input": "The video discusses the ineffective practice of doing lots of IELTS listening practice tests and offers three better ways to improve listening skills and scores: 

In [88]:
query = "2+2"

#run the query 
response = agent.invoke({"input": query})["output"]

#print response
print(response)


> Entering new AgentExecutor chain...
```json
{
    "action": "Final Answer",
    "action_input": "The result of 2+2 is 4."
}
```

> Finished chain.
The result of 2+2 is 4.
