In [53]:
# Mount Google Drive into the Colab filesystem at /content/drive.
# NOTE(review): no cell visible in this notebook reads from /content/drive — confirm the mount is still needed.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


### ✅ Install Required Packages

In [1]:
!pip install -U openai-whisper gradio yt-dlp python-dotenv \
langchain langchain-community langchain-openai duckduckgo-search \
chromadb tiktoken

Collecting openai-whisper
  Downloading openai-whisper-20240930.tar.gz (800 kB)
[?25l     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/800.5 kB[0m [31m?[0m eta [36m-:--:--[0m[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m800.5/800.5 kB[0m [31m20.9 MB/s[0m eta [36m0:00:00[0m
[?25h  Installing build dependencies ... [?25l[?25hdone
  Getting requirements to build wheel ... [?25l[?25hdone
  Preparing metadata (pyproject.toml) ... [?25l[?25hdone
Collecting gradio
  Downloading gradio-5.29.0-py3-none-any.whl.metadata (16 kB)
Collecting yt-dlp
  Downloading yt_dlp-2025.4.30-py3-none-any.whl.metadata (173 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m173.3/173.3 kB[0m [31m8.8 MB/s[0m eta [36m0:00:00[0m
[?25hCollecting python-dotenv
  Downloading python_dotenv-1.1.0-py3-none-any.whl.metadata (24 kB)
Collecting langchain
  Downloading langchain-0.3.25-py3-none-any.whl.metadata (7.8 kB)
Collecting langchain-community
  Dow

In [2]:
pip install -U langsmith openai

Collecting langsmith
  Downloading langsmith-0.3.42-py3-none-any.whl.metadata (15 kB)
Collecting openai
  Downloading openai-1.77.0-py3-none-any.whl.metadata (25 kB)
Downloading langsmith-0.3.42-py3-none-any.whl (360 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m360.3/360.3 kB[0m [31m3.2 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading openai-1.77.0-py3-none-any.whl (662 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m662.0/662.0 kB[0m [31m34.4 MB/s[0m eta [36m0:00:00[0m
[?25h[31mERROR: Operation cancelled by user[0m[31m
[0m^C




### ✅  Import All Required Libraries

In [6]:
# Standard library
import functools
import getpass
import hashlib
import math
import os
import re
import shutil
import tempfile
import traceback
from concurrent.futures import ThreadPoolExecutor
from getpass import getpass  # kept after `import getpass` so the name `getpass` is the function
from typing import Tuple

# Third-party
import gradio as gr
from dotenv import load_dotenv
from pydub import AudioSegment

# Whisper + yt_dlp
import whisper
import yt_dlp

# LangChain
from langchain.schema import Document
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.chat_models import ChatOpenAI
from langchain.embeddings import OpenAIEmbeddings
from langchain.vectorstores import Chroma
from langchain.chains.question_answering import load_qa_chain
from langchain.chains.summarize import load_summarize_chain
from langchain.chains import RetrievalQA
from langchain.agents import initialize_agent, Tool
from langchain.agents.agent_types import AgentType
from langchain.callbacks import LangChainTracer
from langchain.memory import ConversationBufferMemory


## ⚙️ Make sure ffmpeg is found

In [7]:

from pydub import AudioSegment
import shutil

# Point pydub at the ffmpeg binary found on PATH.
# shutil.which returns None when ffmpeg is not installed; in that case pydub's
# own default is left untouched instead of being overwritten with None.
ffmpeg_path = shutil.which("ffmpeg")
if ffmpeg_path is None:
    print("⚠️ FFmpeg was not found on PATH; audio conversion will fail until it is installed.")
else:
    AudioSegment.converter = ffmpeg_path
print("FFmpeg path:", ffmpeg_path)


FFmpeg path: /usr/bin/ffmpeg


### 🔐  Secure API Keys for OpenAI and LangSmith

In [10]:
import os
from getpass import getpass

# Ask for each secret interactively (input is hidden) and store it in the process environment.
for env_name, prompt_text in (
    ("OPENAI_API_KEY", "🔑 Enter your OpenAI API key: "),
    ("LANGCHAIN_API_KEY", "🧠 Enter your LangSmith API key: "),
):
    os.environ[env_name] = getpass(prompt_text)

# Tracing switches, project name and endpoint for LangSmith.
os.environ.update({
    "LANGCHAIN_TRACING_V2": "true",
    "LANGSMITH_TRACING": "true",
    "LANGSMITH_PROJECT": "pr-prickly-cobbler-4",
    "LANGSMITH_ENDPOINT": "https://api.smith.langchain.com",
})

print("✅ LangSmith tracing is enabled and ready!")


🔑 Enter your OpenAI API key: ··········
🧠 Enter your LangSmith API key: ··········
✅ LangSmith tracing is enabled and ready!


In [4]:
import os
# Set the LANGCHAIN_PROJECT environment variable to "default".
# NOTE(review): the API-key cell sets LANGSMITH_PROJECT to "pr-prickly-cobbler-4";
# the two project names disagree — confirm which one is intended.
os.environ["LANGCHAIN_PROJECT"] = "default"


## 🔧 Environment & Application Configuration


In [11]:
# Load environment variables
load_dotenv()

# Configuration
class Config:
    """Central place for the notebook's tunable settings."""

    # Folder names used for downloaded audio and for the database directory.
    AUDIO_DIR = "downloads"
    DB_DIR = "db"

    # Model identifiers: chat model name and Whisper checkpoint size.
    MODEL_NAME = "gpt-4-1106-preview"
    WHISPER_MODEL = "tiny"

    # Text-splitting parameters.
    CHUNK_SIZE = 1500
    CHUNK_OVERLAP = 150

    # Read once, when this class body executes; None if the variable is unset.
    OPENAI_API_KEY = os.environ.get("OPENAI_API_KEY")

    # Language code -> display name.
    SUPPORTED_LANGUAGES = dict(ar="Arabic", en="English", zh="Chinese")

# 🎙️ Whisper Model Loader


In [12]:
WHISPER_MODEL_INSTANCE = None

def load_whisper_model():
    """Return the shared Whisper model, loading it on the first call.

    The loaded model is stored in the module-level ``WHISPER_MODEL_INSTANCE``
    so later calls reuse it; the checkpoint name comes from ``Config.WHISPER_MODEL``.

    NOTE(review): a later cell defines another ``load_whisper_model``; once that
    cell runs, it replaces this definition.
    """
    global WHISPER_MODEL_INSTANCE
    # Fast path: the model has already been loaded.
    if WHISPER_MODEL_INSTANCE is not None:
        return WHISPER_MODEL_INSTANCE
    WHISPER_MODEL_INSTANCE = whisper.load_model(Config.WHISPER_MODEL)
    return WHISPER_MODEL_INSTANCE

In [13]:
@functools.lru_cache()
def load_whisper_model():
    """Load the Whisper model once and return the cached instance afterwards.

    The checkpoint name is taken from ``Config.WHISPER_MODEL`` rather than a
    hard-coded string, so changing the configuration actually changes the model.
    ``functools.lru_cache`` memoizes the result; call
    ``load_whisper_model.cache_clear()`` to force a reload after editing the config.

    Returns:
        The object returned by ``whisper.load_model``.
    """
    return whisper.load_model(Config.WHISPER_MODEL)

### 📥 Audio Downloader from YouTube

In [14]:
# Downloads audio from a YouTube video using yt_dlp.
# Uses caching to avoid re-downloading the same video more than once.
def download_audio(url: str) -> str:
    """Download the audio track of a video as an mp3 and return its local path.

    The file name is the MD5 hex digest of the URL, stored under
    ``Config.AUDIO_DIR``. The file on disk is the cache: if it already exists it
    is returned without downloading. No in-memory memoization is used, because a
    memoized path would keep being returned after the file has been deleted.

    Args:
        url: Video URL understood by yt_dlp. Surrounding whitespace is ignored.

    Returns:
        Path to the mp3 file.

    Raises:
        ValueError: If ``url`` is empty or blank.
        FileNotFoundError: If the download finished but no mp3 file was produced.
    """
    if not url or not url.strip():
        raise ValueError("A video URL is required.")
    url = url.strip()

    os.makedirs(Config.AUDIO_DIR, exist_ok=True)
    video_id = hashlib.md5(url.encode()).hexdigest()
    audio_path = os.path.join(Config.AUDIO_DIR, f"{video_id}.mp3")

    # Return cached file if it already exists
    if os.path.exists(audio_path):
        return audio_path

    ydl_opts = {
        'format': 'bestaudio/best',
        # Swap only the final extension for yt_dlp's placeholder; a plain
        # str.replace would also rewrite ".mp3" occurring in the directory name.
        'outtmpl': os.path.splitext(audio_path)[0] + '.%(ext)s',
        'postprocessors': [{
            'key': 'FFmpegExtractAudio',
            'preferredcodec': 'mp3',
            'preferredquality': '8',
        }],
        'quiet': True,
        'no_warnings': True
    }

    # Download and convert to mp3
    with yt_dlp.YoutubeDL(ydl_opts) as ydl:
        ydl.download([url])

    # Fail here with a clear message rather than later when the file is opened.
    if not os.path.exists(audio_path):
        raise FileNotFoundError(f"Audio download did not produce the expected file: {audio_path}")
    return audio_path


#🧠  Audio Transcription Utility
# 🧠 Transcribe audio with smart chunking and optional language setting
def transcribe_audio_auto(path: str, language: str = "auto", chunk_length_min: int = 10) -> str:
    """Transcribe an audio file, splitting long recordings into chunks.

    Audio no longer than ``chunk_length_min`` minutes is transcribed in one call.
    Longer audio is cut into consecutive windows of that length; each window is
    exported to a temporary mp3, transcribed, and the texts are joined with
    newlines.

    Args:
        path: Path to the audio file.
        language: Language code passed to the model, or ``"auto"`` (or an empty
            value) to pass no language.
        chunk_length_min: Maximum chunk length in minutes; must be positive.

    Returns:
        The transcript text.

    Raises:
        ValueError: If ``chunk_length_min`` is not positive.
    """
    chunk_ms = int(chunk_length_min * 60 * 1000)
    if chunk_ms <= 0:
        # A zero or negative window would otherwise end in a division by zero below.
        raise ValueError("chunk_length_min must be a positive number of minutes")

    model = load_whisper_model()
    audio = AudioSegment.from_file(path)
    total_ms = len(audio)

    transcribe_options = {"fp16": False}
    if language and language != "auto":
        transcribe_options["language"] = language

    def run_transcription(audio_path):
        return model.transcribe(audio_path, **transcribe_options)

    # If short, transcribe directly
    if total_ms <= chunk_ms:
        return run_transcription(path)["text"]

    # If long, split into smaller chunks and process each one
    total_chunks = math.ceil(total_ms / chunk_ms)
    transcript = []

    # Chunks are written into a private temporary directory and opened by name.
    # Writing by name into a still-open NamedTemporaryFile is not portable.
    with tempfile.TemporaryDirectory() as tmp_dir:
        for i in range(total_chunks):
            start = i * chunk_ms
            end = min(start + chunk_ms, total_ms)
            chunk_path = os.path.join(tmp_dir, f"chunk_{i + 1}.mp3")
            audio[start:end].export(chunk_path, format="mp3")

            try:
                # Print size and duration for debug
                size_mb = os.path.getsize(chunk_path) / (1024 * 1024)
                print(f"🔹 Chunk {i+1}/{total_chunks} | Size: {size_mb:.2f} MB | Duration: {(end - start) / 60000:.2f} min")

                transcript.append(run_transcription(chunk_path)["text"])
            finally:
                # Remove each chunk right away so only one is on disk at a time.
                os.remove(chunk_path)

            print(f"✅ Transcribed chunk {i+1}/{total_chunks}")

    return "\n".join(transcript)


###  🤖 Question Answering Agent Using LangChain and Transcript Search


In [15]:
def get_agent_answer(llm, retriever, question: str) -> str:
    """Answer ``question`` with a zero-shot ReAct agent that can search the transcript.

    Args:
        llm: Language model handed to ``initialize_agent``.
        retriever: Object whose ``get_relevant_documents(query)`` returns documents
            exposing ``page_content``.
        question: The user's question.

    Returns:
        Whatever ``agent.run`` returns for the question.
    """
    def search_transcript(query):
        # Concatenate the text of every document the retriever returns for the query.
        hits = retriever.get_relevant_documents(query)
        return "\n".join([hit.page_content for hit in hits])

    # The agent's only tool: a search over the video transcript.
    transcript_tool = Tool(
        name="VideoContextSearch",
        func=search_transcript,
        description="Search the video transcript",
    )

    # Tracer callback, attached both to the agent and to the individual run.
    tracer = LangChainTracer()

    agent = initialize_agent(
        tools=[transcript_tool],
        llm=llm,
        agent=AgentType.ZERO_SHOT_REACT_DESCRIPTION,
        callbacks=[tracer],
        verbose=False,
    )
    return agent.run(question, callbacks=[tracer])

## 🌐 Get Language Prompt Based on User’s Language Code

In [16]:
# 🌐 Generate Language-Specific Prompt

# Returns a prompt telling the model to respond in the selected language.
# Defaults to English if the language code is not recognized.
def get_language_prompt(lang_code):
    """Build the instruction asking the model to answer in a given language.

    ``lang_code`` is looked up in ``Config.SUPPORTED_LANGUAGES``; codes that are
    not present fall back to English.
    """
    language_name = Config.SUPPORTED_LANGUAGES.get(lang_code, 'English')
    return "Please respond in {}".format(language_name)


### 🧠 Question Type Classifier

In [31]:
# 🧠 Question Type Classifier

# Categorizes the question into types for better analysis or routing logic

from langchain.chains.question_answering import load_qa_chain

# Categories in matching order. "Numerical" comes first because its phrase
# "how many" would otherwise be captured by the more general "how" of "Explanatory".
_QUESTION_TYPE_KEYWORDS = (
    ("Numerical", ("how many", "what is the number", "amount", "percentage")),
    ("Explanatory", ("how", "explain", "why")),
    ("Opinion", ("do you think", "opinion", "should")),
    ("Historical", ("when", "date", "year")),
)


def _keyword_pattern(keywords):
    """Compile a regex matching any keyword as a whole word or phrase (common suffixes allowed)."""
    alternatives = "|".join(
        r"\s+".join(re.escape(word) for word in phrase.split())
        for phrase in keywords
    )
    return re.compile(r"\b(?:" + alternatives + r")(?:s|es|ed|ing)?\b")


def classify_question_type(question: str) -> str:
    """Classify a question by keyword into a coarse category.

    Keywords are matched case-insensitively as whole words, so "show" does not
    count as "how" and "update" does not count as "date". Simple inflections
    such as "years" or "explained" still match.

    Args:
        question: The question text; ``None`` or an empty string is accepted.

    Returns:
        One of ``"Numerical"``, ``"Explanatory"``, ``"Opinion"``,
        ``"Historical"`` or ``"General"`` (when no keyword matches).
    """
    text = (question or "").lower()
    for label, keywords in _QUESTION_TYPE_KEYWORDS:
        if _keyword_pattern(keywords).search(text):
            return label
    return "General"

 # 🎬 Full Video Processing Pipeline: Download → Transcribe → Summarize → Q&A

def process_video(url: str, questions: str, language: str) -> Tuple[str, str]:
    """Run the full pipeline for one video: download, transcribe, summarize, answer.

    Args:
        url: Video URL to download audio from.
        questions: Questions separated by newlines; blank lines are ignored.
        language: Language code forwarded to the transcription step.
            NOTE(review): it is not applied to the LLM prompts here, so the
            summary and answers are not forced into this language.

    Returns:
        ``(summary, answers_markdown)`` on success. On any failure the traceback
        is printed and ``("❌ An error occurred during processing.", <error text>)``
        is returned instead of raising.
    """
    try:
        if not url or not url.strip():
            raise ValueError("Please provide a YouTube video URL.")

        # 1. Download audio
        audio_path = download_audio(url.strip())

        # 2. Transcribe (with auto chunking and language)
        transcript = transcribe_audio_auto(audio_path, language=language)
        print("🧾 Transcript (preview):", transcript[:300])
        if not transcript.strip():
            # Nothing to summarize; avoid sending empty documents to the LLM.
            raise ValueError("The transcript is empty; there is nothing to summarize or answer.")

        # 3. Split transcript into chunks, sized by the shared configuration
        splitter = RecursiveCharacterTextSplitter(
            chunk_size=Config.CHUNK_SIZE,
            chunk_overlap=Config.CHUNK_OVERLAP,
        )
        docs = [Document(page_content=chunk) for chunk in splitter.split_text(transcript)]

        # 4. Load LLM and QA chain
        llm = ChatOpenAI(
            model_name=Config.MODEL_NAME,
            openai_api_key=Config.OPENAI_API_KEY
        )
        qa_chain = load_qa_chain(llm, chain_type="map_reduce")

        # 5. Generate summary
        summary = load_summarize_chain(llm, chain_type="stuff").run([Document(page_content=transcript)])

        # 6. Process each question
        questions_list = [q.strip() for q in (questions or "").split('\n') if q.strip()]
        answer_blocks = []
        for q in questions_list:
            q_type = classify_question_type(q)
            answer = qa_chain.run(input_documents=docs, question=q)
            answer_blocks.append(f"🔹 **Question**: {q}\n📂 **Type**: {q_type}\n💬 **Answer**: {answer}\n\n")

        return summary, "".join(answer_blocks)

    except Exception as e:
        # The UI needs a two-value result either way, so report the error as text.
        traceback.print_exc()
        return "❌ An error occurred during processing.", str(e)




## Multilingual YouTube Q&A Interface using Gradio

In [51]:
#🎨 Custom CSS for Gradio Interface
# Function to create a multilingual Gradio UI interface
def create_interface():
    """Build the multilingual Gradio Blocks app for video Q&A.

    The UI has a language selector, a URL box, a questions box, a submit button
    and two output tabs (summary and answers). Changing the language relabels
    the widgets; pressing the button calls ``process_video``.

    Returns:
        The ``gr.Blocks`` app, not yet launched.
    """
    # Custom CSS styling for light blue theme and modern UI
    custom_css = """
    body {
        background: linear-gradient(to right, #dbeafe, #eff6ff);
        font-family: 'Segoe UI', sans-serif;
    }
    .gradio-container {
        max-width: 950px;
        margin: 2rem auto;
        border-radius: 20px;
        box-shadow: 0 6px 18px rgba(0, 0, 0, 0.08);
        background-color: #ffffffcc;
        backdrop-filter: blur(10px);
        padding: 25px;
    }
    .gr-button {
        background: linear-gradient(to right, #2563eb, #3b82f6) !important;
        color: white !important;
        font-weight: bold !important;
        border-radius: 10px !important;
        padding: 12px 28px !important;
        border: none !important;
        box-shadow: 0 2px 8px rgba(0, 0, 0, 0.1);
    }
    .gr-textbox, .gr-dropdown {
        border-radius: 12px !important;
        padding: 10px !important;
        font-size: 15px !important;
    }
    h1 {
        text-align: center;
        color: #1e3a8a;
        margin-bottom: 20px;
    }
    """

    # Text translations for each UI element by language
    UI_TEXTS = {
        "en": {
            "title": "🤖 Advanced YouTube Video Q&A Assistant",
            "language": "🌍 Language",
            "video_url": "🔗 YouTube Video URL",
            "questions": "📝 Your Questions (one per line)",
            "submit": "🚀 Start Analysis",
            "summary": "📄 Summary",
            "answers": "💬 Answers"
        },
        "ar": {
            "title": "🤖 مساعد متقدم للأسئلة والأجوبة لفيديوهات يوتيوب",
            "language": "🌍 اللغة",
            "video_url": "🔗 رابط فيديو يوتيوب",
            "questions": "📝 أسئلتك (كل سؤال في سطر)",
            "submit": "🚀 ابدأ التحليل",
            "summary": "📄 الملخص",
            "answers": "💬 الإجابات"
        },
        "zh": {
            "title": "🤖 高级 YouTube 视频问答助手",
            "language": "🌍 语言",
            "video_url": "🔗 YouTube 视频链接",
            "questions": "📝 你的问题（每行一个）",
            "submit": "🚀 开始分析",
            "summary": "📄 总结",
            "answers": "💬 答案"
        }
    }

    # Start building the UI
    with gr.Blocks(css=custom_css) as app:
        # Page title
        title = gr.Markdown(f"<h1>{UI_TEXTS['en']['title']}</h1>")

        # Input widgets
        language = gr.Dropdown(choices=["en", "ar", "zh"], value="en", label=UI_TEXTS["en"]["language"])
        url_input = gr.Textbox(label=UI_TEXTS["en"]["video_url"])
        questions_input = gr.Textbox(lines=6, label=UI_TEXTS["en"]["questions"])
        submit_btn = gr.Button(value=UI_TEXTS["en"]["submit"])

        # Output tabs
        with gr.Tabs():
            with gr.TabItem("📄 Summary") as tab1:
                summary_output = gr.Textbox(label=UI_TEXTS["en"]["summary"], lines=20, max_lines=40, show_copy_button=True)

            with gr.TabItem("💬 Answers") as tab2:
                answers_output = gr.Textbox(label=UI_TEXTS["en"]["answers"], lines=20, max_lines=40, show_copy_button=True)

        # Components relabelled on language change; the order must match the
        # tuple returned by update_labels.
        localized_components = [
            title,
            language,
            url_input,
            questions_input,
            submit_btn,
            summary_output,
            answers_output,
        ]

        def update_labels(lang):
            """Return one update per localized component for the chosen language."""
            # Mutating component attributes inside a handler does not reach the
            # browser; the new labels have to be returned as the event's outputs.
            labels = UI_TEXTS.get(lang, UI_TEXTS["en"])
            return (
                gr.update(value=f"<h1>{labels['title']}</h1>"),
                gr.update(label=labels["language"]),
                gr.update(label=labels["video_url"]),
                gr.update(label=labels["questions"]),
                gr.update(value=labels["submit"]),
                gr.update(label=labels["summary"]),
                gr.update(label=labels["answers"]),
            )

        # Apply label updates when language changes
        language.change(fn=update_labels, inputs=language, outputs=localized_components)

        # Call main process function on button click
        submit_btn.click(fn=process_video, inputs=[url_input, questions_input, language], outputs=[summary_output, answers_output])

    return app


## 🚀 App Entry Point: Load Models, Create Folders, Launch UI

In [52]:
# 🚀 App Entry Point: Load Models, Create Folders, Launch UI

if __name__ == "__main__":
    # Warm up the Whisper model so the first request does not pay the load cost.
    load_whisper_model()

    # Create the working folders if they are missing.
    for required_dir in (Config.AUDIO_DIR, Config.DB_DIR):
        os.makedirs(required_dir, exist_ok=True)

    # Build the Gradio app and serve it.
    app = create_interface()

    if app is None:
        print("❌ Failed to create the interface. Please check the errors.")
    else:
        # share=True also publishes the app through a public link.
        app.launch(share=True)


Colab notebook detected. To show errors in colab notebook, set debug=True in launch()
* Running on public URL: https://6da7f9b19840d35769.gradio.live

This share link expires in 1 week. For free permanent hosting and GPU upgrades, run `gradio deploy` from the terminal in the working directory to deploy to Hugging Face Spaces (https://huggingface.co/spaces)
