This notebook sets up the Agentic system.
The simplified flow of the system is as follows:
User gives input question as a prompt -> Preprocess the question via prompt tuning -> do a semantic search query over the summary database  -> 

(branch 1): is the answer in a summary (or spread over multiple)? -> use the summary/summaries to generate summary answer -> output answer to user

(branch 2): is the answer not in a summary? -> semantic search query the entire dataset for the answer -> retrieve the relevant chunks -> generate summary of the relevant results -> output answer to user

In [3]:
# This code implements the RAG system for the Agentic chatbot, called Rizzbot.
import os
import json
import boto3
from typing import List, Dict, Optional, Tuple
from dataclasses import dataclass
from dotenv import load_dotenv, find_dotenv
from pydantic import SecretStr
from functools import lru_cache
import time
from concurrent.futures import ThreadPoolExecutor, as_completed

# LangChain imports
from langchain_openai import ChatOpenAI, OpenAIEmbeddings
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_pinecone import PineconeVectorStore
from langchain.schema import Document

# LangSmith imports
from langsmith import Client
import langsmith

# Pinecone
import pinecone
from pinecone import Pinecone


# Initialize LangSmith
os.environ["LANGCHAIN_TRACING_V2"] = "true"
os.environ["LANGCHAIN_PROJECT"] = "rizzbot"

# Set up API keys
_ = load_dotenv(find_dotenv())
openai_api_key = os.getenv("OPENAI_API_KEY")
langsmith_api_key = os.getenv("LANGSMITH_API_KEY")
pinecone_api_key = os.getenv("PINECONE_API_KEY")

# Initialize Pinecone
pc = Pinecone(api_key=pinecone_api_key)


@dataclass
class SearchResult:
    """Container for search results"""
    content: str
    score: float
    source: str
    metadata: Dict = None

class Rizzbot:
    """
    Agentic chatbot system for charisma and personal development questions.
    Uses RAG with Pinecone vector stores and AWS S3 for summaries.
    """
    
    def __init__(self):
        """Initialize the RAG system with all required components"""
        self.similarity_threshold = 0.85
        self.top_k = 3
        
        # Topic ID constraints for rizzbot-summaries index
        self.min_topic_id = 0
        self.max_topic_id = 53
        
        # Initialize summary cache
        self.summary_cache = {}
        self.cache_max_size = 100  # Maximum number of summaries to cache
        self.cache_ttl = 3600  # Cache TTL in seconds (1 hour)
        
        # Initialize LangSmith client
        self.langsmith_client = Client(api_key=langsmith_api_key)
        
        # Initialize OpenAI components
        self.llm = ChatOpenAI(
            model="gpt-4o",
            temperature=0.25,
            api_key=openai_api_key,
            tags=["rizzbot_v1"]
        )
        self.embeddings = OpenAIEmbeddings(model="text-embedding-3-small")
        
        # Initialize Pinecone
        self.pc = Pinecone(api_key=pinecone_api_key)
        
        # Initialize vector stores
        self._init_vector_stores()
        
        # Initialize AWS S3 client
        self.s3_client = boto3.client("s3")
        
        # Initialize prompts
        self._init_prompts()
    
    def _init_vector_stores(self):
        """Initialize Pinecone vector stores"""
        # Main dataset index - using full_text field
        self.main_vectorstore = PineconeVectorStore(
            index_name="rizzbot",
            embedding=self.embeddings,
            text_key="full_text"  # Specify the correct text field
        )
        
        # Clusters index
        self.clusters_vectorstore = PineconeVectorStore(
            index_name="rizzbot-clusters", 
            embedding=self.embeddings
        )
        
        # Summaries index - topic_id metadata field contains integers 0-53
        self.summaries_index = self.pc.Index("rizzbot-summaries")
    
    def _init_prompts(self):
        """Initialize prompt templates"""
        self.summary_answer_prompt = ChatPromptTemplate.from_template("""
        You are a charisma and personal development expert. Based on the following summary/summaries, 
        provide a comprehensive and helpful answer to the user's question.
        
        User Question: {question}
        
        Relevant Summary/Summaries:
        {summaries}
        
        Instructions:
        - Provide a clear, actionable answer based on the summaries
        - Be conversational and engaging
        - If the summaries don't fully address the question, mention this limitation
        - Keep your response focused and practical
        
        Answer:
        """)
        
        self.full_search_answer_prompt = ChatPromptTemplate.from_template("""
        You are a charisma and personal development expert. Based on the following relevant content 
        from the database, provide a comprehensive and helpful answer to the user's question.
        
        User Question: {question}
        
        Relevant Content:
        {content}
        
        Instructions:
        - Synthesize the information to provide a clear, actionable answer
        - Be conversational and engaging
        - Focus on practical advice and insights
        - If the content doesn't fully address the question, mention this limitation
        
        Answer:
        """)
        
        self.no_answer_response = """My apologies, but I don't have a good answer for you based on the information in my database. My database is limited up to the past 5 years. For further research, we recommend you visit the source of my knowledge: https://www.youtube.com/user/Charismaoncommand/"""
    
    def _is_cache_valid(self, timestamp: float) -> bool:
        """Check if cache entry is still valid based on TTL"""
        return time.time() - timestamp < self.cache_ttl
    
    def _clean_cache(self):
        """Remove expired entries and maintain cache size limit"""
        current_time = time.time()
        
        # Remove expired entries
        expired_keys = [
            key for key, (_, timestamp) in self.summary_cache.items()
            if not self._is_cache_valid(timestamp)
        ]
        for key in expired_keys:
            del self.summary_cache[key]
        
        # If cache is still too large, remove oldest entries
        if len(self.summary_cache) > self.cache_max_size:
            # Sort by timestamp and remove oldest
            sorted_items = sorted(
                self.summary_cache.items(), 
                key=lambda x: x[1][1]  # Sort by timestamp
            )
            items_to_remove = len(self.summary_cache) - self.cache_max_size
            for key, _ in sorted_items[:items_to_remove]:
                del self.summary_cache[key]
    
    def _get_cached_summary(self, topic_id: int) -> Optional[str]:
        """Get summary from cache if available and valid"""
        cache_key = f"topic_{topic_id}"
        if cache_key in self.summary_cache:
            summary, timestamp = self.summary_cache[cache_key]
            if self._is_cache_valid(timestamp):
                return summary
            else:
                # Remove expired entry
                del self.summary_cache[cache_key]
        return None
    
    def _cache_summary(self, topic_id: int, summary: str):
        """Add summary to cache"""
        self._clean_cache()  # Clean cache before adding new entry
        cache_key = f"topic_{topic_id}"
        self.summary_cache[cache_key] = (summary, time.time())
    
    def _fetch_single_summary_from_s3(self, topic_id: int) -> Optional[str]:
        """Fetch a single summary from S3"""
        bucket_name = "rizzbot-temp-storage"
        base_path = "rizzbot/Summaries/summaries_1kwords/"
        
        try:
            key = f"{base_path}topic_{topic_id}.json"
            response = self.s3_client.get_object(Bucket=bucket_name, Key=key)
            content = response['Body'].read().decode('utf-8')
            # Handle both JSON and plain text formats
            summary_text = json.loads(content) if content.startswith('{') else content
            return str(summary_text)
        except Exception as e:
            print(f"Could not retrieve topic_{topic_id}.json: {e}")
            return None

    def _is_valid_topic_id(self, topic_id: int) -> bool:
        """Validate that topic_id is within expected range (0-53)"""
        return self.min_topic_id <= topic_id <= self.max_topic_id

    def search_summaries(self, question: str, topic_id_filter: Optional[int] = None) -> List[int]:
        """
        Search the rizzbot-summaries index manually using the Pinecone SDK, bypassing LangChain's text assumptions.
        Returns: List of relevant topic IDs
        """
        try:
            # Create embedding for the question
            embedded_query = self.embeddings.embed_query(question)
            
            # Prepare filter (if any)
            filter_dict = {}
            if topic_id_filter is not None:
                if self._is_valid_topic_id(topic_id_filter):
                    filter_dict["topic_id"] = {"$eq": topic_id_filter}
                else:
                    print(f"Warning: topic_id {topic_id_filter} is outside valid range")
                    return []

            # Access the index directly
            index = self.summaries_index

            query_response = index.query(
                vector=embedded_query,
                top_k=self.top_k,
                include_metadata=True,
                filter=filter_dict or None
            )

            topic_ids = []
            for match in query_response.get("matches", []):
                score = match.get("score", 0.0)
                if score >= self.similarity_threshold:
                    metadata = match.get("metadata", {})
                    raw_id = metadata.get("topic_id")
                    try:
                        topic_id = int(raw_id)
                        if self._is_valid_topic_id(topic_id):
                            topic_ids.append(topic_id)
                        else:
                            print(f"Topic ID {topic_id} out of range")
                    except Exception as e:
                        print(f"Could not parse topic_id: {raw_id} — {e}")
            return topic_ids

        except Exception as e:
            print(f"Error querying Pinecone summaries: {e}")
            return []

    
    @langsmith.traceable(tags=["rizzbot_v1"])
    def search_full_dataset(self, question: str) -> List[SearchResult]:
        """Search in the full dataset index"""
        try:
            results = self.main_vectorstore.similarity_search_with_score(
                question, 
                k=self.top_k
            )
            
            search_results = []
            for doc, score in results:
                search_results.append(SearchResult(
                    content=doc.page_content,
                    score=score,
                    source="full_dataset",
                    metadata=doc.metadata
                ))
            
            return search_results
        except Exception as e:
            print(f"Error searching full dataset: {e}")
            return []
    
    @langsmith.traceable(tags=["rizzbot_v1"])
    def get_s3_summaries(self, topic_ids: List[int]) -> List[str]:
        """
        Retrieve summaries with caching and batching optimizations
        
        Args:
            topic_ids: List of topic IDs to retrieve (should be 0-53)
            
        Returns:
            List of summary texts
        """
        # Filter valid topic IDs
        valid_topic_ids = [tid for tid in topic_ids if self._is_valid_topic_id(tid)]
        invalid_topic_ids = [tid for tid in topic_ids if not self._is_valid_topic_id(tid)]
        
        if invalid_topic_ids:
            print(f"Warning: Skipping invalid topic_ids: {invalid_topic_ids} (valid range: {self.min_topic_id}-{self.max_topic_id})")
        
        if not valid_topic_ids:
            print("No valid topic_ids to retrieve")
            return []
        
        summaries = []
        uncached_ids = []
        
        # Step 1: Check cache for existing summaries
        for topic_id in valid_topic_ids:
            cached_summary = self._get_cached_summary(topic_id)
            if cached_summary:
                summaries.append(cached_summary)
                print(f"Retrieved topic_{topic_id} from cache")
            else:
                uncached_ids.append(topic_id)
        
        # Step 2: Batch fetch uncached summaries from S3
        if uncached_ids:
            print(f"Fetching {len(uncached_ids)} summaries from S3...")
            
            # Use ThreadPoolExecutor for concurrent S3 requests
            with ThreadPoolExecutor(max_workers=min(len(uncached_ids), 5)) as executor:
                # Submit all fetch tasks
                future_to_topic = {
                    executor.submit(self._fetch_single_summary_from_s3, topic_id): topic_id
                    for topic_id in uncached_ids
                }
                
                # Collect results as they complete
                for future in as_completed(future_to_topic):
                    topic_id = future_to_topic[future]
                    try:
                        summary = future.result()
                        if summary:
                            summaries.append(summary)
                            # Cache the retrieved summary
                            self._cache_summary(topic_id, summary)
                            print(f"Retrieved and cached topic_{topic_id}")
                        else:
                            print(f"Failed to retrieve topic_{topic_id}")
                    except Exception as e:
                        print(f"Error fetching topic_{topic_id}: {e}")
        
        print(f"Total summaries retrieved: {len(summaries)}")
        return summaries
    
    @langsmith.traceable(tags=["rizzbot_v1"])
    def generate_summary_answer(self, question: str, summaries: List[str]) -> str:
        """Generate answer based on summaries"""
        combined_summaries = "\n\n".join(summaries)
        
        chain = self.summary_answer_prompt | self.llm | StrOutputParser()
        
        response = chain.invoke({
            "question": question,
            "summaries": combined_summaries
        })
        
        return response
    
    @langsmith.traceable(tags=["rizzbot_v1"])
    def generate_full_search_answer(self, question: str, search_results: List[SearchResult]) -> str:
        """Generate answer based on full dataset search results"""
        combined_content = "\n\n".join([result.content for result in search_results])
        
        chain = self.full_search_answer_prompt | self.llm | StrOutputParser()
        
        response = chain.invoke({
            "question": question,
            "content": combined_content
        })
        
        return response
    
    @langsmith.traceable(tags=["rizzbot_v1"])
    def answer_question(self, question: str, topic_id_filter: Optional[int] = None) -> Dict[str, any]:
        """
        Main method to answer user questions following the RAG flow
        
        Args:
            question: User's question
            topic_id_filter: Optional specific topic_id to search within (0-53)
        
        Returns:
            Dict containing answer, source, and metadata
        """
        try:
            # Step 1: Search summaries
            print("Searching summaries...")
            topic_ids = self.search_summaries(question, topic_id_filter)
            if topic_ids:
                print(f"Found relevant topic IDs: {topic_ids}")
                full_summaries = self.get_s3_summaries(topic_ids)
                if full_summaries:
                    answer = self.generate_summary_answer(question, full_summaries)
                    return {
                        "answer": answer,
                        "source": "summaries",
                        "num_sources": len(full_summaries),
                        "confidence_scores": [],  # You could optionally store Pinecone scores earlier
                        "topic_ids": topic_ids
                    }                
                print(f"Extracted valid topic IDs: {topic_ids}")
                
                # Get full summaries from S3
                if topic_ids:
                    full_summaries = self.get_s3_summaries(topic_ids)
                    if full_summaries:
                        answer = self.generate_summary_answer(question, full_summaries)
                        return {
                            "answer": answer,
                            "source": "summaries",
                            "num_sources": len(full_summaries),
                            "confidence_scores": [r.score for r in summary_results],
                            "topic_ids": topic_ids
                        }
                    else:
                        print("No summaries retrieved from S3, falling back to full dataset search")
                else:
                    print("No valid topic IDs found in metadata, falling back to full dataset search")
            
            # Step 2: Search full dataset (only if not filtering by topic_id)
            if topic_id_filter is None:
                print("Searching full dataset...")
                full_results = self.search_full_dataset(question)
                
                if full_results:
                    print(f"Found {len(full_results)} relevant chunks")
                    answer = self.generate_full_search_answer(question, full_results)
                    return {
                        "answer": answer,
                        "source": "full_dataset", 
                        "num_sources": len(full_results),
                        "confidence_scores": [r.score for r in full_results]
                    }
            else:
                print(f"Topic filter ({topic_id_filter}) specified, skipping full dataset search")
            
            # Step 3: No relevant results found
            print("No relevant results found")
            return {
                "answer": self.no_answer_response,
                "source": "none",
                "num_sources": 0,
                "confidence_scores": []
            }
            
        except Exception as e:
            print(f"Error in answer_question: {e}")
            return {
                "answer": self.no_answer_response,
                "source": "error",
                "num_sources": 0,
                "confidence_scores": [],
                "error": str(e)
            }

# Convenience function for easy usage
def create_rizzbot():
    """Factory function to create and return a Rizzbot instance"""
    return Rizzbot()

# Example usage function
@langsmith.traceable(tags=["rizzbot_v1"])
def ask_charisma_question(bot: Rizzbot, question: str, topic_id_filter: Optional[int] = None, verbose: bool = True):
    """
    Ask a question to the charisma bot and get a formatted response
    
    Args:
        bot: Rizzbot instance
        question: User's question
        topic_id_filter: Optional specific topic_id to search within (0-53)
        verbose: Whether to print detailed information
    
    Returns:
        Dict with answer and metadata
    """
    result = bot.answer_question(question, topic_id_filter)
    
    if verbose:
        print(f"\n{'='*50}")
        print(f"Question: {question}")
        if topic_id_filter is not None:
            print(f"Topic Filter: {topic_id_filter}")
        print(f"{'='*50}")
        print(f"Answer: {result['answer']}")
        print(f"\n Metadata:")
        print(f"  Source: {result['source']}")
        print(f"  Number of sources: {result['num_sources']}")
        if result['confidence_scores']:
            print(f"  Confidence scores: {[f'{score:.3f}' for score in result['confidence_scores']]}")
        if 'topic_ids' in result:
            print(f"  Topic IDs used: {result['topic_ids']}")
        print(f"{'='*50}\n")
    
    return result

In [7]:
# Demo cell 
# Demo: How to use the Rizzbot System
# Run this in a Jupyter notebook cell after the main system code

# Initialize the bot (this will take a moment as it connects to all services)
print("Initializing Rizzbot System...")
bot = create_rizzbot()
print("System ready!")

# Test questions
test_questions = [ 
    "What is the ROLLS system and how can it help me improve my charisma?",
    "Can you give me concrete tips on how to be a better public speaker?",
]

# Test the system with different questions
for question in test_questions:
    result = ask_charisma_question(bot, question, verbose=True)
    
    # You can also access individual components:
    # print(f"Just the answer: {result['answer']}")
    # print(f"Source type: {result['source']}")

# You can also use the bot directly for more control:
# custom_result = bot.answer_question("Your custom question here")
# print(custom_result)

Initializing Rizzbot System...
System ready!
Searching summaries...
Searching full dataset...
Found 3 relevant chunks

Question: What is the ROLLS system and how can it help me improve my charisma?
Answer: The ROLLS system isn't directly mentioned in the content provided, so I can't give you a detailed explanation of what it specifically entails. However, I can offer some general advice on improving charisma based on the themes and insights from the content.

Charisma is often about building rapport, confidence, and effective communication. Here are some practical steps you can take to enhance your charisma:

1. **Build Rapport**: Establishing a connection with others is crucial. This involves active listening, showing genuine interest in others, and finding common ground. The content mentions the importance of rapport in negotiations and relationships, which is a key component of charisma.

2. **Confidence**: Confidence can significantly boost your charisma. The Charisma University co