In [2]:
from dotenv import load_dotenv
load_dotenv()

import os

if not os.environ.get("GOOGLE_API_KEY"):
    raise RuntimeError("Please set the GOOGLE_API_KEY environment variable with your Google API key.")

import gradio as gr
from youtube_transcript_api import YouTubeTranscriptApi
from langchain_core.prompts import PromptTemplate
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_community.vectorstores import FAISS
from langchain_huggingface import HuggingFaceEmbeddings, HuggingFaceEndpoint, ChatHuggingFace
from langchain_google_genai import ChatGoogleGenerativeAI
from langchain_core.messages import HumanMessage
from langchain_google_genai import GoogleGenerativeAIEmbeddings

# Initialize the text splitter
text_splitter = RecursiveCharacterTextSplitter(
    chunk_size=1000,
    chunk_overlap=200,
    separators=["\n\n", "\n", " ", ".", ","],
    length_function=len,
    is_separator_regex=False
)

embeddings = GoogleGenerativeAIEmbeddings(
    model="models/gemini-embedding-exp-03-07",
    task_type="RETRIEVAL_DOCUMENT"  # optionally specify task type
)

chat = ChatGoogleGenerativeAI(
    model="gemini-2.0-flash",
    temperature=0.3,
    max_tokens=1024,
    timeout=None,
    max_retries=2
)

# Define the prompt template
prompt = PromptTemplate(
    template="""
      You are a helpful assistant.
      Answer ONLY from the provided transcript context.
      If the context is insufficient, just say you don't know.

      {context}
      Question: {question}
    """,
    input_variables=['context', 'question']
)

# Global variable to store the current retriever
current_retriever = None
current_video_id = None

def extract_video_id(url):
    """Extract video ID from YouTube URL."""
    if "youtube.com/watch?v=" in url:
        return url.split("watch?v=")[1].split("&")[0]
    elif "youtu.be/" in url:
        return url.split("youtu.be/")[1].split("?")[0]
    return url  # Assume it's already a video ID

def process_video_url(video_url_or_id):
    """Process video URL and create retriever object."""
    global current_retriever, current_video_id
    
    try:
        # Extract video ID if URL is provided
        video_id = extract_video_id(video_url_or_id)
        
        # Check if we already have a retriever for this video
        if current_video_id == video_id and current_retriever is not None:
            return f"✅ Video already processed: {video_id}"
        
        # Get transcript
        transcript = YouTubeTranscriptApi.get_transcript(video_id)
        print(transcript)
        # Extract text segments
        list_of_text_segments = [item['text'] for item in transcript]
        full_transcript_text = " ".join(list_of_text_segments)
        print(full_transcript_text)
        # Create chunks
        chunks = text_splitter.create_documents([full_transcript_text])
        print("chunks", chunks)
        # Create vector store
        vector_store = FAISS.from_documents(chunks, embeddings)
        print("vector_store", vector_store) 
        current_retriever = vector_store.as_retriever(search_type="similarity", search_kwargs={"k": 4})
        print("current_retriever", current_retriever)
        current_video_id = video_id
        
        return f"✅ Video processed successfully: {video_id}"
    
    except Exception as e:
        return f"❌ Error processing video: {str(e)}"

def answer_question(question):
    """Answer question using the current retriever."""

    try:
        # Retrieve relevant documents
        retrieved_docs = current_retriever.invoke(question)
        context_text = "\n\n".join(doc.page_content for doc in retrieved_docs)
        
        # Generate answer
        final_prompt = prompt.invoke({"context": context_text, "question": question})
        answer = chat.invoke(final_prompt)
        
        return answer.content
    
    except Exception as e:
        return f"❌ Error answering question: {str(e)}"

def process_video(video_url_or_id, question):
    """Legacy function for backward compatibility."""
    # Process video first
    process_result = process_video_url(video_url_or_id)
    if "❌" in process_result:
        return process_result
    
    # Then answer question
    return answer_question(question)

if __name__ == "__main__":
    process_video_url("https://www.youtube.com/watch?v=JaRGJVrJBQ8")
    print(answer_question("What is the main topic of the video?"))

[{'text': 'Welcome to Huberman Lab Essentials,', 'start': 0.24, 'duration': 4.48}, {'text': 'where we revisit past episodes for the', 'start': 2.32, 'duration': 4.64}, {'text': 'most potent and actionable science-based', 'start': 4.72, 'duration': 4.24}, {'text': 'tools for mental health, physical', 'start': 6.96, 'duration': 4.96}, {'text': 'health, and performance.', 'start': 8.96, 'duration': 5.12}, {'text': 'And now, my discussion with Dr. Matt', 'start': 11.92, 'duration': 5.04}, {'text': "Walker. Let's start off very basic. What", 'start': 14.08, 'duration': 7.279}, {'text': 'is sleep? Sleep is probably the single', 'start': 16.96, 'duration': 6.399}, {'text': 'most effective thing you can do to reset', 'start': 21.359, 'duration': 4.08}, {'text': 'your brain and body health. Sleep as a', 'start': 23.359, 'duration': 5.68}, {'text': 'process though is an incredibly complex', 'start': 25.439, 'duration': 6.401}, {'text': 'physiological ballet. Sleep is broadly', 'start': 29.039, '