In [2]:
!pip install pinecone

Collecting pinecone
  Downloading pinecone-6.0.2-py3-none-any.whl.metadata (9.0 kB)
Collecting pinecone-plugin-interface<0.0.8,>=0.0.7 (from pinecone)
  Downloading pinecone_plugin_interface-0.0.7-py3-none-any.whl.metadata (1.2 kB)
Downloading pinecone-6.0.2-py3-none-any.whl (421 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m421.9/421.9 kB[0m [31m13.2 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading pinecone_plugin_interface-0.0.7-py3-none-any.whl (6.2 kB)
Installing collected packages: pinecone-plugin-interface, pinecone
Successfully installed pinecone-6.0.2 pinecone-plugin-interface-0.0.7


In [3]:
!pip install groq youtube-transcript-api

Collecting groq
  Downloading groq-0.22.0-py3-none-any.whl.metadata (15 kB)
Collecting youtube-transcript-api
  Downloading youtube_transcript_api-1.0.3-py3-none-any.whl.metadata (23 kB)
Downloading groq-0.22.0-py3-none-any.whl (126 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m126.7/126.7 kB[0m [31m8.5 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading youtube_transcript_api-1.0.3-py3-none-any.whl (2.2 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m2.2/2.2 MB[0m [31m72.5 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: youtube-transcript-api, groq
Successfully installed groq-0.22.0 youtube-transcript-api-1.0.3


In [7]:
import os
import re
import numpy as np
import pinecone
from sentence_transformers import SentenceTransformer
from groq import Groq
import torch
from typing import List, Dict, Any, Optional, Tuple
from youtube_transcript_api import YouTubeTranscriptApi
import nltk
from nltk.corpus import stopwords
from nltk.stem import WordNetLemmatizer
import spacy
import json
import time
from concurrent.futures import ThreadPoolExecutor, as_completed

class Agent:
    """Common base for every agent in the pipeline.

    Each agent carries a display ``name`` (used as a log prefix) and must
    override :meth:`process` with its own unit of work.
    """

    def __init__(self, name: str):
        # Shown in square brackets at the start of every log line.
        self.name = name

    def process(self, *args, **kwargs):
        """Do this agent's work; concrete subclasses must override this."""
        reason = "Each agent must implement a process method"
        raise NotImplementedError(reason)

    def log(self, message: str):
        """Print ``message`` to stdout, prefixed with ``[<name>]``."""
        tag = f"[{self.name}]"
        print(f"{tag} {message}")

class EmbeddingAgent(Agent):
    """Agent responsible for text preprocessing and sentence embeddings.

    Text is normalised (punctuation stripped, lower-cased, English stopwords
    removed and, when WordNet is available, lemmatised) and then encoded with
    a SentenceTransformer model.

    NOTE(review): transformer embedders generally work best on natural
    sentences; stopword removal turns e.g. "What is JVM?" into "jvm". The
    preprocessing is kept for compatibility — confirm the Pinecone index was
    built with the same preprocessing before changing it.
    """

    # Used when the NLTK stopword corpus cannot be downloaded or loaded.
    _FALLBACK_STOP_WORDS = frozenset({'a', 'an', 'the', 'and', 'or', 'but', 'is', 'are',
                                      'was', 'were', 'to', 'of', 'in', 'for'})

    def __init__(self, model_name: str = "sentence-transformers/all-mpnet-base-v2"):
        super().__init__("Embedding Agent")

        # Prefer the GPU when torch can see one.
        self.device = 'cuda' if torch.cuda.is_available() else 'cpu'
        self.log(f"Using device: {self.device}")

        # Sentence embedding model, moved to the selected device.
        self.embedding_model = SentenceTransformer(model_name).to(self.device)

        # Loaded independently so a failure of one NLTK resource does not
        # disable the other (previously a WordNet failure also discarded the
        # full stopword list).
        self.stop_words = self._load_stop_words()
        self.lemmatizer = self._load_lemmatizer()

        # spaCy pipeline; not used by the methods of this class but kept as a
        # public attribute. A missing model raises, so fall back to a blank one.
        try:
            self.nlp = spacy.load("en_core_web_sm")
        except Exception:  # was a bare except, which also caught KeyboardInterrupt
            self.log("Warning: spaCy model 'en_core_web_sm' not found. Using a simple pipeline.")
            self.nlp = spacy.blank("en")

    def _load_stop_words(self) -> set:
        """Return NLTK's English stopwords, or a small built-in set if unavailable."""
        try:
            nltk.download('stopwords', quiet=True)
            return set(stopwords.words('english'))
        except Exception as e:
            self.log(f"Warning: NLTK resource download issue. Error: {e}")
            return set(self._FALLBACK_STOP_WORDS)

    def _load_lemmatizer(self) -> Optional[WordNetLemmatizer]:
        """Return a working WordNetLemmatizer, or None if WordNet cannot be loaded."""
        try:
            nltk.download('wordnet', quiet=True)
            lemmatizer = WordNetLemmatizer()
            # WordNet is loaded lazily on first use; probe once so a missing
            # corpus is detected here rather than as a LookupError mid-query.
            lemmatizer.lemmatize("tests")
            return lemmatizer
        except Exception as e:
            self.log(f"Warning: NLTK resource download issue. Error: {e}")
            return None

    def simple_tokenize(self, text):
        """Lower-case ``text``, replace non-word characters with spaces and split.

        Avoids NLTK's punkt tokenizer so no extra corpus download is needed.
        """
        text = re.sub(r'[^\w\s]', ' ', text)
        return [token for token in text.lower().split() if token]

    def preprocess_text(self, text):
        """Stopword removal and (optional) lemmatisation.

        Returns:
            A ``(processed_text, info)`` tuple; ``info`` is currently always an
            empty dict. Non-string or empty input yields ``("", {})``.
        """
        if not text or not isinstance(text, str):
            return "", {}

        # simple_tokenize already strips punctuation and lower-cases.
        tokens = self.simple_tokenize(text)
        filtered_tokens = [word for word in tokens if word not in self.stop_words]

        if self.lemmatizer:
            lemmatized_tokens = [self.lemmatizer.lemmatize(word) for word in filtered_tokens]
        else:
            lemmatized_tokens = filtered_tokens

        return ' '.join(lemmatized_tokens), {}

    def process(self, text: str) -> List[float]:
        """Return the embedding of ``text`` as a list of floats.

        Empty or non-string input yields a zero vector of the model's dimension.
        If preprocessing removes every token, the original text is embedded instead.
        """
        if not text or not isinstance(text, str):
            self.log("Warning: Empty or invalid text received for embedding")
            return [0.0] * self.embedding_model.get_sentence_embedding_dimension()

        processed_text, _ = self.preprocess_text(text)

        # e.g. a query made only of stopwords: fall back to the raw text.
        if not processed_text:
            processed_text = text

        self.log(f"Generating embedding for: {text[:50]}...")
        return self.embedding_model.encode(processed_text).tolist()

class RetrievalAgent(Agent):
    """Agent responsible for retrieving relevant chunks from a Pinecone index."""
    def __init__(self, pinecone_api_key: str, index_name: str):
        super().__init__("Retrieval Agent")

        # Pinecone client and a handle to the target index.
        self.pc = pinecone.Pinecone(api_key=pinecone_api_key)
        self.index = self.pc.Index(index_name)

    @staticmethod
    def _format_match(match) -> Dict[str, Any]:
        """Convert one Pinecone match into the plain dict used by the other agents."""
        # Vectors upserted without metadata come back with metadata=None;
        # treat that as empty instead of raising AttributeError.
        metadata = getattr(match, 'metadata', None) or {}
        return {
            'id': match.id,
            'score': match.score,
            'text': metadata.get('text_sample', 'No text available'),
            'video_id': metadata.get('video_id', 'unknown'),
            'chunk_id': metadata.get('chunk_id', -1)
        }

    def process(self, query_embedding: List[float], top_k: int = 1,
                video_id: Optional[str] = None) -> List[Dict[str, Any]]:
        """Retrieve the ``top_k`` most similar chunks for ``query_embedding``.

        Args:
            query_embedding: Query vector; must match the index dimension.
            top_k: Number of matches to request.
            video_id: If given, restrict results to chunks whose ``video_id``
                metadata equals this value.

        Returns:
            A list of dicts with keys ``id``, ``score``, ``text``, ``video_id``
            and ``chunk_id``. An empty list is returned if the query fails.
        """
        query_params = {
            "vector": query_embedding,
            "top_k": top_k,
            "include_metadata": True
        }
        if video_id:
            query_params["filter"] = {"video_id": video_id}

        try:
            self.log(f"Querying Pinecone for top {top_k} matches" +
                    (f" filtered by video_id: {video_id}" if video_id else ""))
            query_response = self.index.query(**query_params)
        except Exception as e:
            # Best-effort: the orchestrator can still answer from the transcript.
            self.log(f"Error querying Pinecone: {e}")
            return []

        matches = query_response.get('matches', []) or []
        results = [self._format_match(match) for match in matches]

        self.log(f"Retrieved {len(results)} relevant chunks")
        return results

class TranscriptAgent(Agent):
    """Agent responsible for extracting and normalising YouTube transcripts."""
    def __init__(self):
        super().__init__("Transcript Agent")

    @staticmethod
    def _fetch_raw_segments(video_id: str) -> List[Dict[str, Any]]:
        """Fetch the raw transcript as a list of segment dicts (each with a 'text' key).

        youtube-transcript-api 1.x fetches through an instance method
        (``fetch``) and deprecates the static ``get_transcript``; the static
        call is used only for older releases that lack ``fetch``.
        """
        api = YouTubeTranscriptApi()
        if hasattr(api, "fetch"):
            return api.fetch(video_id).to_raw_data()
        return YouTubeTranscriptApi.get_transcript(video_id)

    @staticmethod
    def _join_segments(segments) -> str:
        """Join non-empty segment texts with spaces, ending each with punctuation."""
        formatted_segments = []
        for segment in segments:
            text = (segment.get('text') or '').strip()
            if not text:
                continue
            # Add a period so sentence boundaries survive the join.
            if text[-1] not in '.!?:;':
                text += '.'
            formatted_segments.append(text)
        return ' '.join(formatted_segments)

    def process(self, video_id: str) -> Optional[str]:
        """
        Extract transcript from a YouTube video.

        Args:
            video_id: YouTube video ID

        Returns:
            Transcript text, or None if the ID is invalid, no usable transcript
            exists, or extraction fails (errors are logged, not raised).
        """
        if not video_id or not isinstance(video_id, str):
            self.log("Invalid video ID provided")
            return None

        try:
            self.log(f"Fetching transcript for video: {video_id}")
            transcript_list = self._fetch_raw_segments(video_id)
            full_transcript = self._join_segments(transcript_list) if transcript_list else ""
            if not full_transcript:
                # Covers both "no segments" and "only blank segments".
                self.log(f"No transcript found for video {video_id}")
                return None

            self.log(f"Successfully extracted transcript ({len(full_transcript)} chars)")
            return full_transcript
        except Exception as e:
            self.log(f"Error fetching transcript for video {video_id}: {e}")
            return None

class LLMAgent(Agent):
    """Agent responsible for generating answers with a Groq-hosted chat model.

    NOTE(review): Groq periodically retires model IDs; confirm the default
    ``model_name`` is still served before relying on it.
    """

    _NO_INFO_ANSWER = ("I couldn't find sufficient information to answer your question. "
                       "Please try rephrasing or asking a different question.")
    _ERROR_ANSWER = "I encountered an error while generating the answer. Please try again."

    def __init__(self, groq_api_key: str, model_name: str = "llama3-70b-8192",
                 max_tokens: int = 512, temperature: float = 0.7, top_p: float = 0.95,
                 transcript_chars: int = 1000):
        super().__init__("LLM Agent")
        self.groq_client = Groq(api_key=groq_api_key)
        self.model_name = model_name
        # Sampling settings forwarded to chat.completions.create.
        self.max_tokens = max_tokens
        self.temperature = temperature
        self.top_p = top_p
        # Maximum number of transcript characters placed in the prompt.
        self.transcript_chars = transcript_chars

    def _build_context(self, relevant_chunks: List[Dict[str, Any]],
                       transcript: Optional[str]) -> str:
        """Combine the top chunk with a truncated transcript excerpt."""
        chunk_context = relevant_chunks[0].get('text', '') if relevant_chunks else ""

        transcript_context = ""
        if transcript:
            limit = self.transcript_chars
            transcript_context = transcript[:limit] + "..." if len(transcript) > limit else transcript

        if not chunk_context:
            return transcript_context
        if transcript_context:
            return f"{chunk_context}\n\nAdditional context from video transcript:\n{transcript_context}"
        return chunk_context

    @staticmethod
    def _build_prompt(query: str, context: str) -> str:
        """Build the user prompt (no leading indentation leaks into the text)."""
        return (
            "You are an intelligent assistant specialized in educational content. "
            "Your task is to create a comprehensive, well-structured answer to the "
            "user's question using the provided context. If the context does not "
            "contain the information needed, say so instead of guessing.\n\n"
            f"USER QUESTION:\n{query}\n\n"
            f"RELEVANT CONTEXT:\n{context}\n"
        )

    def process(self, query: str, relevant_chunks: List[Dict[str, Any]],
               transcript: Optional[str] = None) -> Dict[str, Any]:
        """Answer ``query`` from the top retrieved chunk and/or the transcript.

        Returns:
            ``{"answer": str, "success": bool}``. API errors are logged and
            reported as ``success=False`` rather than raised.
        """
        if not relevant_chunks and not transcript:
            self.log("No information available to answer the query")
            return {"answer": self._NO_INFO_ANSWER, "success": False}

        prompt = self._build_prompt(query, self._build_context(relevant_chunks, transcript))

        try:
            self.log(f"Generating answer using {self.model_name} for query: {query}")
            response = self.groq_client.chat.completions.create(
                model=self.model_name,
                messages=[
                    {"role": "system", "content": "You are an intelligent assistant specialized in educational content."},
                    {"role": "user", "content": prompt}
                ],
                max_tokens=self.max_tokens,
                temperature=self.temperature,
                top_p=self.top_p
            )
            # message.content may be None (e.g. empty completion); don't crash on .strip().
            answer = (response.choices[0].message.content or "").strip()
        except Exception as e:
            self.log(f"Error generating answer with Groq: {e}")
            return {"answer": self._ERROR_ANSWER, "success": False}

        if not answer:
            self.log("Model returned an empty answer")
            return {"answer": self._ERROR_ANSWER, "success": False}

        self.log("Successfully generated answer")
        return {"answer": answer, "success": True}

class FormattingAgent(Agent):
    """Agent that renders a workflow result dict as a plain-text report."""
    def __init__(self, transcript_excerpt_chars: int = 500):
        super().__init__("Formatting Agent")
        # Maximum number of transcript characters shown in the report.
        self.transcript_excerpt_chars = transcript_excerpt_chars

    def process(self, result: Dict[str, Any]) -> str:
        """Format ``result`` into a readable report.

        Expects ``result`` to contain ``question`` and ``answer``; optional keys
        ``source`` (a retrieval dict) and ``video_transcript`` add sections.
        """
        self.log("Formatting final response")
        parts = [f"QUESTION: {result['question']}\n\nANSWER:\n{result['answer']}\n\n"]

        source = result.get('source')
        if source:
            score = source.get('score')
            # Guard the float format: a missing/None score would raise here.
            score_text = f"{score:.2f}" if isinstance(score, (int, float)) else "n/a"
            parts.append("TOP SOURCE:\n")
            parts.append(f"Score: {score_text}\n")
            parts.append(f"Video ID: {source.get('video_id', 'unknown')}\n")
            parts.append(f"Text: {source.get('text', '')}\n")

        transcript = result.get('video_transcript')
        if transcript:
            limit = self.transcript_excerpt_chars
            excerpt = transcript[:limit]
            # Only mark truncation when something was actually cut off.
            if len(transcript) > limit:
                excerpt += "..."
            parts.append("\nVIDEO TRANSCRIPT EXCERPT:\n")
            parts.append(excerpt + "\n")

        return "".join(parts)

class OrchestrationAgent(Agent):
    """Agent that runs the end-to-end question-answering workflow.

    ``process`` looks up agents registered under the roles "embedding",
    "retrieval", "llm", "formatting" and, when a video ID is given,
    "transcript".
    """
    def __init__(self, max_workers: int = 3, transcript_timeout: float = 10.0,
                 top_k: int = 1):
        super().__init__("Orchestration Agent")
        self.agents = {}
        # Background pool used to fetch the transcript while retrieval runs.
        self.executor = ThreadPoolExecutor(max_workers=max_workers)
        # Seconds to wait for the transcript before answering without it.
        self.transcript_timeout = transcript_timeout
        # Number of chunks requested from the retrieval agent.
        self.top_k = top_k

    def register_agent(self, agent_type: str, agent: Agent):
        """Register ``agent`` under the workflow role ``agent_type``."""
        self.agents[agent_type] = agent
        self.log(f"Registered {agent.name}")

    def shutdown(self, wait: bool = True):
        """Shut down the background thread pool once the orchestrator is no longer needed."""
        self.executor.shutdown(wait=wait)

    def process(self, question: str, video_id: Optional[str] = None) -> Dict[str, Any]:
        """Answer ``question``, optionally scoped to one YouTube video.

        Returns:
            A dict with ``question``, ``answer``, ``source``, ``video_id``,
            ``video_transcript``, ``success``, ``metrics`` (seconds per stage)
            and ``formatted_response``.
        """
        start_time = time.time()
        self.log(f"Starting workflow for question: '{question}'" +
                (f" with video ID: {video_id}" if video_id else ""))

        metrics = {
            "embedding_time": 0,
            "retrieval_time": 0,
            "transcript_time": 0,
            "llm_time": 0,
            "total_time": 0
        }

        # Step 1: start transcript extraction in the background (if requested).
        transcript_future = None
        if video_id:
            transcript_future = self.executor.submit(self.agents["transcript"].process, video_id)

        # Step 2: embed the question.
        embedding_start = time.time()
        query_embedding = self.agents["embedding"].process(question)
        metrics["embedding_time"] = time.time() - embedding_start

        # Step 3: retrieve chunks, retrying without the video filter if nothing matched.
        retrieval_start = time.time()
        retriever = self.agents["retrieval"]
        relevant_chunks = retriever.process(query_embedding, top_k=self.top_k, video_id=video_id)
        if not relevant_chunks and video_id:
            self.log("No matches with video filter, trying without filter")
            relevant_chunks = retriever.process(query_embedding, top_k=self.top_k)
        # Measured after the optional fallback so both Pinecone round-trips count.
        metrics["retrieval_time"] = time.time() - retrieval_start

        # Wait (bounded) for the background transcript fetch.
        transcript = None
        if transcript_future is not None:
            transcript_start = time.time()
            try:
                transcript = transcript_future.result(timeout=self.transcript_timeout)
            except Exception as e:
                # A timeout's str() is empty, so include the exception type.
                self.log(f"Error waiting for transcript: {type(e).__name__}: {e}")
                # Prevents the fetch from starting if still queued; no effect if running.
                transcript_future.cancel()
            metrics["transcript_time"] = time.time() - transcript_start

        # Step 4: generate the answer.
        llm_start = time.time()
        answer_result = self.agents["llm"].process(question, relevant_chunks, transcript)
        metrics["llm_time"] = time.time() - llm_start

        # Step 5: assemble the result (metrics is shared, so total_time below is reflected).
        result = {
            "question": question,
            "answer": answer_result["answer"],
            "source": relevant_chunks[0] if relevant_chunks else None,
            "video_id": video_id,
            "video_transcript": transcript,
            "success": answer_result["success"],
            "metrics": metrics
        }

        # Step 6: format the response.
        result["formatted_response"] = self.agents["formatting"].process(result)

        metrics["total_time"] = time.time() - start_time
        self.log(f"Workflow completed in {metrics['total_time']:.2f} seconds")

        return result

class AgenticRAGSystem:
    """Facade that builds the specialised agents and exposes a single ``query`` call."""
    def __init__(self, pinecone_api_key: str, groq_api_key: str, index_name: str,
                 model_name: str = "sentence-transformers/all-mpnet-base-v2",
                 llm_model: str = "llama3-70b-8192"):
        """
        Construct every agent and register it with a new orchestrator.
        """
        # Construction order matters only for log output; kept stable.
        self.embedding_agent = EmbeddingAgent(model_name)
        self.retrieval_agent = RetrievalAgent(pinecone_api_key, index_name)
        self.transcript_agent = TranscriptAgent()
        self.llm_agent = LLMAgent(groq_api_key, llm_model)
        self.formatting_agent = FormattingAgent()

        self.orchestrator = OrchestrationAgent()
        roles = (
            ("embedding", self.embedding_agent),
            ("retrieval", self.retrieval_agent),
            ("transcript", self.transcript_agent),
            ("llm", self.llm_agent),
            ("formatting", self.formatting_agent),
        )
        for role, member in roles:
            self.orchestrator.register_agent(role, member)

        agent_count = len(self.orchestrator.agents)
        print(f"AgenticRAGSystem initialized with {agent_count} specialized agents")

    def query(self, question: str, video_id: Optional[str] = None) -> Dict[str, Any]:
        """
        Answer ``question`` (optionally scoped to ``video_id``) via the orchestrator.
        """
        orchestrator = self.orchestrator
        return orchestrator.process(question, video_id)

    def format_response(self, result: Dict[str, Any]) -> str:
        """
        Return ``result["formatted_response"]``, or a placeholder if it is absent.
        """
        fallback = "No formatted response available"
        return result.get("formatted_response", fallback)

# Example usage
def main():
    """Run one example query end-to-end and print the formatted answer and timings.

    Credentials are read from the PINECONE_API_KEY and GROQ_API_KEY environment
    variables, falling back to a hidden interactive prompt, so no secret is
    stored in the notebook or its outputs. The index name can be overridden
    with PINECONE_INDEX_NAME (default "embeddings").
    """
    from getpass import getpass

    pinecone_api_key = os.environ.get("PINECONE_API_KEY") or getpass("Pinecone API key: ")
    groq_api_key = os.environ.get("GROQ_API_KEY") or getpass("Groq API key: ")
    index_name = os.environ.get("PINECONE_INDEX_NAME", "embeddings")

    print("Initializing Agentic RAG system...")
    rag = AgenticRAGSystem(
        pinecone_api_key=pinecone_api_key,
        groq_api_key=groq_api_key,
        index_name=index_name,
        model_name="sentence-transformers/all-mpnet-base-v2",
        llm_model="llama3-70b-8192"
    )

    # Example query scoped to one YouTube video.
    question = "What is JVM?"
    video_id = "NUy_wOxOM8E"

    try:
        print(f"\nProcessing query: '{question}' with video ID: {video_id}")
        result = rag.query(question, video_id)

        print("\n" + "="*50)
        print(result["formatted_response"])
        print("="*50)

        print("\nProcess Timing:")
        for key, value in result["metrics"].items():
            print(f"  {key}: {value:.2f} seconds")

    except Exception as e:
        print(f"An error occurred during query processing: {e}")
    finally:
        # Release the orchestrator's worker threads (they would otherwise live
        # for the whole kernel session).
        rag.orchestrator.executor.shutdown(wait=False)

if __name__ == "__main__":
    main()

Initializing Agentic RAG system...
[Embedding Agent] Using device: cpu
[Orchestration Agent] Registered Embedding Agent
[Orchestration Agent] Registered Retrieval Agent
[Orchestration Agent] Registered Transcript Agent
[Orchestration Agent] Registered LLM Agent
[Orchestration Agent] Registered Formatting Agent
AgenticRAGSystem initialized with 5 specialized agents

Processing query: 'What is JVM?' with video ID: NUy_wOxOM8E
[Orchestration Agent] Starting workflow for question: 'What is JVM?' with video ID: NUy_wOxOM8E
[Transcript Agent] Fetching transcript for video: NUy_wOxOM8E
[Transcript Agent] Successfully extracted transcript (12779 chars)
[Embedding Agent] Generating embedding for: What is JVM?...
[Retrieval Agent] Querying Pinecone for top 1 matches filtered by video_id: NUy_wOxOM8E
[Retrieval Agent] Retrieved 1 relevant chunks
[LLM Agent] Generating answer using llama3-70b-8192 for query: What is JVM?
[LLM Agent] Successfully generated answer
[Formatting Agent] Formatting final