In [9]:
!pip install -U langchain_community langchain chromadb pypdf langchain sentence-transformers langchain_chroma langchain_groq

Collecting langchain_groq
  Downloading langchain_groq-0.3.6-py3-none-any.whl.metadata (2.6 kB)
Collecting groq<1,>=0.29.0 (from langchain_groq)
  Downloading groq-0.30.0-py3-none-any.whl.metadata (16 kB)
Downloading langchain_groq-0.3.6-py3-none-any.whl (16 kB)
Downloading groq-0.30.0-py3-none-any.whl (131 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m131.1/131.1 kB[0m [31m7.3 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: groq, langchain_groq
Successfully installed groq-0.30.0 langchain_groq-0.3.6


In [10]:
from langchain_chroma import Chroma
from langchain.prompts import ChatPromptTemplate
from langchain_groq import ChatGroq
from langchain.memory import ConversationBufferMemory
from langchain.schema import HumanMessage, AIMessage
from langchain.chains import ConversationalRetrievalChain
from langchain.schema.runnable import RunnablePassthrough
from langchain.schema.output_parser import StrOutputParser
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.schema import Document
from langchain_community.document_loaders import TextLoader, PyPDFLoader
from sentence_transformers import SentenceTransformer
from langchain.embeddings import HuggingFaceEmbeddings
import shutil
from pathlib import Path
from typing import List
import os
import warnings
import json
from datetime import datetime
warnings.filterwarnings('ignore')

In [None]:
from google.colab import files
uploaded = files.upload()

In [17]:
CHROMA_PATH = "/content/chroma"  # Same path as chatbot
DATA_PATH = "/content/data"
CHUNK_SIZE = 1000
CHUNK_OVERLAP = 200

def get_embedding_function():
    """
    Create and return an embedding function - IDENTICAL to chatbot version.
    """
    from sentence_transformers import SentenceTransformer
    from langchain.embeddings import HuggingFaceEmbeddings

    model_name = "all-MiniLM-L6-v2"
    embeddings = HuggingFaceEmbeddings(
        model_name=model_name,
        model_kwargs={'device': 'cpu'},
        encode_kwargs={'normalize_embeddings': False}
    )

    # embeddings = HuggingFaceEmbeddings(
    #     model_name="sentence-transformers/all-mpnet-base-v2",
    #     model_kwargs={'device': 'cpu'},
    #     encode_kwargs={'normalize_embeddings': True}
    # )


    return embeddings

class AlignedDataIngestionPipeline:
    """
    Data ingestion pipeline aligned with the Enhanced RAG Chatbot System.
    Supports: Text files and PDFs with enhanced error handling and logging.
    """

    def __init__(self, data_path: str = DATA_PATH, chroma_path: str = CHROMA_PATH):
        self.data_path = Path(data_path)
        self.chroma_path = chroma_path
        self.embedding_function = get_embedding_function()
        self.text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=CHUNK_SIZE,
            chunk_overlap=CHUNK_OVERLAP,
            length_function=len,
            is_separator_regex=False,
        )
        self.supported_extensions = {'.txt', '.pdf'}
        self.processing_log = []

    def log_processing(self, message: str, level: str = "INFO"):
        """Enhanced logging for processing steps"""
        timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        log_entry = f"[{timestamp}] {level}: {message}"
        self.processing_log.append(log_entry)
        print(log_entry)

    def load_text_file(self, file_path: Path) -> List[Document]:
        """Load text files with enhanced error handling"""
        try:
            self.log_processing(f"Loading text file: {file_path}")

            # Try multiple encodings
            encodings = ['utf-8', 'latin-1', 'cp1252']
            documents = None

            for encoding in encodings:
                try:
                    loader = TextLoader(str(file_path), encoding=encoding)
                    documents = loader.load()
                    self.log_processing(f"Successfully loaded with {encoding} encoding")
                    break
                except UnicodeDecodeError:
                    continue

            if documents is None:
                raise Exception("Could not load file with any encoding")

            # Enhanced metadata
            for doc in documents:
                doc.metadata.update({
                    'source': str(file_path),
                    'type': 'text',
                    'file_name': file_path.name,
                    'file_size': file_path.stat().st_size,
                    'processed_at': datetime.now().isoformat()
                })

            self.log_processing(f"Loaded {len(documents)} documents from {file_path.name}")
            return documents

        except Exception as e:
            self.log_processing(f"Error loading text file {file_path}: {e}", "ERROR")
            return []

    def load_pdf_file(self, file_path: Path) -> List[Document]:
        """Load PDF files with enhanced error handling"""
        try:
            self.log_processing(f"Loading PDF file: {file_path}")

            loader = PyPDFLoader(str(file_path))
            documents = loader.load()

            # Enhanced metadata
            for i, doc in enumerate(documents):
                doc.metadata.update({
                    'source': str(file_path),
                    'type': 'pdf',
                    'file_name': file_path.name,
                    'page_number': i + 1,
                    'total_pages': len(documents),
                    'file_size': file_path.stat().st_size,
                    'processed_at': datetime.now().isoformat()
                })

            self.log_processing(f"Loaded {len(documents)} pages from {file_path.name}")
            return documents

        except Exception as e:
            self.log_processing(f"Error loading PDF file {file_path}: {e}", "ERROR")
            return []

    def load_documents_from_directory(self) -> List[Document]:
        """Load all supported documents from the data directory"""
        documents = []

        # Create data directory if it doesn't exist
        self.data_path.mkdir(exist_ok=True)

        # Check if directory is empty
        files = list(self.data_path.rglob('*'))
        supported_files = [f for f in files if f.is_file() and f.suffix.lower() in self.supported_extensions]

        if not supported_files:
            self.log_processing("No supported files found in data directory", "WARNING")
            self.log_processing(f"Looking for files in: {self.data_path.absolute()}")
            self.log_processing(f"Supported extensions: {self.supported_extensions}")
            return []

        self.log_processing(f"Found {len(supported_files)} supported files")

        for file_path in supported_files:
            self.log_processing(f"Processing: {file_path.name}")

            if file_path.suffix.lower() == '.txt':
                docs = self.load_text_file(file_path)
            elif file_path.suffix.lower() == '.pdf':
                docs = self.load_pdf_file(file_path)
            else:
                continue

            documents.extend(docs)

        self.log_processing(f"Total documents loaded: {len(documents)}")
        return documents

    def split_documents(self, documents: List[Document]) -> List[Document]:
        """Split documents into chunks with enhanced logging"""
        self.log_processing(f"Splitting {len(documents)} documents into chunks...")

        chunks = self.text_splitter.split_documents(documents)

        # Add chunk statistics
        chunk_sizes = [len(chunk.page_content) for chunk in chunks]
        avg_chunk_size = sum(chunk_sizes) / len(chunk_sizes) if chunk_sizes else 0

        self.log_processing(f"Created {len(chunks)} chunks")
        self.log_processing(f"Average chunk size: {avg_chunk_size:.0f} characters")

        return chunks

    def add_chunk_ids(self, chunks: List[Document]) -> List[Document]:
        """Add unique IDs to chunks with enhanced metadata"""
        self.log_processing("Adding unique IDs to chunks...")

        for i, chunk in enumerate(chunks):
            source = chunk.metadata.get('source', 'unknown')
            file_name = chunk.metadata.get('file_name', Path(source).stem)

            # Create more descriptive ID
            chunk_id = f"{file_name}_{i}"
            chunk.metadata['id'] = chunk_id
            chunk.metadata['chunk_index'] = i
            chunk.metadata['chunk_size'] = len(chunk.page_content)

        self.log_processing(f"Added IDs to {len(chunks)} chunks")
        return chunks

    def test_database_connection(self) -> bool:
        """Test ChromaDB connection before saving"""
        try:
            self.log_processing("Testing ChromaDB connection...")

            # Create a test document
            test_doc = Document(
                page_content="This is a test document for ChromaDB connection.",
                metadata={'source': 'test', 'type': 'test'}
            )

            # Try to create a temporary ChromaDB instance
            temp_path = f"{self.chroma_path}_test"
            if os.path.exists(temp_path):
                shutil.rmtree(temp_path)

            test_db = Chroma.from_documents(
                [test_doc],
                self.embedding_function,
                persist_directory=temp_path
            )

            # Test search
            results = test_db.similarity_search("test", k=1)

            # Cleanup
            shutil.rmtree(temp_path)

            self.log_processing("ChromaDB connection test passed!")
            return True

        except Exception as e:
            self.log_processing(f"ChromaDB connection test failed: {e}", "ERROR")
            return False

    def save_to_chroma(self, chunks: List[Document]):
        """Save chunks to ChromaDB with enhanced error handling"""
        try:
            self.log_processing("Preparing to save to ChromaDB...")

            # Test connection first
            if not self.test_database_connection():
                raise Exception("ChromaDB connection test failed")

            # Clear existing database
            if os.path.exists(self.chroma_path):
                self.log_processing("Clearing existing database...")
                shutil.rmtree(self.chroma_path)

            # Create new database
            self.log_processing("Creating new ChromaDB database...")

            db = Chroma.from_documents(
                chunks,
                self.embedding_function,
                persist_directory=self.chroma_path
            )

            # Verify database creation
            self.log_processing("Verifying database creation...")
            test_results = db.similarity_search("test", k=1)

            # Get database statistics
            try:
                collection = db._collection
                doc_count = collection.count()
                self.log_processing(f"Database verification: {doc_count} documents stored")
            except:
                self.log_processing("Database created successfully (count verification unavailable)")

            self.log_processing(f"✅ Successfully saved {len(chunks)} chunks to ChromaDB")
            self.log_processing(f"✅ Database location: {os.path.abspath(self.chroma_path)}")

            return db

        except Exception as e:
            self.log_processing(f"Error saving to ChromaDB: {e}", "ERROR")
            raise

    def show_enhanced_summary(self, documents: List[Document], chunks: List[Document]):
        """Show enhanced processing summary"""
        file_types = {}
        processed_files = set()
        total_content_length = 0

        for doc in documents:
            doc_type = doc.metadata.get('type', 'unknown')
            source = doc.metadata.get('source', 'unknown')
            file_name = doc.metadata.get('file_name', Path(source).name)

            file_types[doc_type] = file_types.get(doc_type, 0) + 1
            processed_files.add(file_name)
            total_content_length += len(doc.page_content)

        chunk_sizes = [len(chunk.page_content) for chunk in chunks]
        avg_chunk_size = sum(chunk_sizes) / len(chunk_sizes) if chunk_sizes else 0

        print(f"\n{'='*60}")
        print(f"🎉 DATA INGESTION COMPLETE!")
        print(f"{'='*60}")
        print(f"📊 STATISTICS:")
        print(f"   • Documents processed: {len(documents)}")
        print(f"   • Chunks created: {len(chunks)}")
        print(f"   • Files processed: {len(processed_files)}")
        print(f"   • File types: {dict(file_types)}")
        print(f"   • Total content: {total_content_length:,} characters")
        print(f"   • Average chunk size: {avg_chunk_size:.0f} characters")
        print(f"   • Database path: {os.path.abspath(self.chroma_path)}")
        print(f"{'='*60}")
        print(f"📁 PROCESSED FILES:")
        for file_name in sorted(processed_files):
            print(f"   • {file_name}")
        print(f"{'='*60}")

        # Show some processing log entries
        print(f"📋 PROCESSING LOG (last 5 entries):")
        for log_entry in self.processing_log[-5:]:
            print(f"   {log_entry}")
        print(f"{'='*60}")

    def process_all_data(self):
        """Main processing pipeline with enhanced error handling"""
        try:
            self.log_processing("🚀 Starting enhanced data ingestion pipeline...")

            # Load documents
            documents = self.load_documents_from_directory()
            if not documents:
                self.log_processing("❌ No documents found to process!", "ERROR")
                return False

            # Split into chunks
            chunks = self.split_documents(documents)
            if not chunks:
                self.log_processing("❌ No chunks created!", "ERROR")
                return False

            # Add IDs and save
            chunks = self.add_chunk_ids(chunks)
            db = self.save_to_chroma(chunks)

            # Show summary
            self.show_enhanced_summary(documents, chunks)

            self.log_processing("✅ Data ingestion pipeline completed successfully!")
            return True

        except Exception as e:
            self.log_processing(f"❌ Pipeline failed: {e}", "ERROR")
            return False

def run_enhanced_ingestion(data_path=DATA_PATH, chroma_path=CHROMA_PATH,
                          chunk_size=CHUNK_SIZE, chunk_overlap=CHUNK_OVERLAP):
    """
    Run the enhanced data ingestion pipeline with full compatibility.
    """
    print("🔧 ENHANCED DATA INGESTION FOR RAG SYSTEM")
    print("="*50)
    print(f"📁 Data path: {data_path}")
    print(f"🗄️  Chroma path: {chroma_path}")
    print(f"✂️  Chunk size: {chunk_size}")
    print(f"🔄 Chunk overlap: {chunk_overlap}")
    print("="*50)

    pipeline = AlignedDataIngestionPipeline(data_path=data_path, chroma_path=chroma_path)

    # Update chunk settings if different from defaults
    if chunk_size != CHUNK_SIZE or chunk_overlap != CHUNK_OVERLAP:
        pipeline.text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=chunk_size,
            chunk_overlap=chunk_overlap,
            length_function=len,
            is_separator_regex=False,
        )
        pipeline.log_processing(f"Using custom chunk settings: size={chunk_size}, overlap={chunk_overlap}")

    success = pipeline.process_all_data()

    if success:
        print("\n🎯 NEXT STEPS:")
        print("1. Your ChromaDB is ready for the RAG chatbot!")
        print("2. Run the chatbot cells to start asking questions")
        print("3. The chatbot will automatically use this database")
        print("\n✅ Ready to chat with Harry!")
    else:
        print("\n❌ Ingestion failed. Check the logs above for details.")

    return pipeline

def verify_database_compatibility():
    """
    Verify that the created database is compatible with the chatbot.
    """
    print("🔍 VERIFYING DATABASE COMPATIBILITY...")
    print("="*50)

    try:
        # Test embedding function compatibility
        embedding_function = get_embedding_function()
        print("✅ Embedding function loaded successfully")

        # Test ChromaDB loading
        if not os.path.exists(CHROMA_PATH):
            print("❌ ChromaDB not found. Run ingestion first.")
            return False

        db = Chroma(persist_directory=CHROMA_PATH, embedding_function=embedding_function)
        print("✅ ChromaDB loaded successfully")

        # Test search functionality
        test_results = db.similarity_search("test query", k=1)
        print(f"✅ Search test passed - found {len(test_results)} results")

        # Test with score
        results_with_score = db.similarity_search_with_score("test query", k=1)
        print(f"✅ Search with score test passed - found {len(results_with_score)} results")

        # Show database info
        try:
            collection = db._collection
            doc_count = collection.count()
            print(f"✅ Database contains {doc_count} documents")
        except:
            print("✅ Database is accessible (document count unavailable)")

        print("="*50)
        print("🎉 DATABASE IS FULLY COMPATIBLE WITH THE CHATBOT!")
        print("="*50)

        return True

    except Exception as e:
        print(f"❌ Compatibility test failed: {e}")
        return False

def setup_colab_environment():
    """
    Set up the Google Colab environment for data ingestion.
    """
    print("🔧 SETTING UP GOOGLE COLAB ENVIRONMENT...")
    print("="*50)

    # Create data directory
    data_dir = Path(DATA_PATH)
    data_dir.mkdir(exist_ok=True)
    print(f"✅ Created data directory: {data_dir.absolute()}")

    # Check if files exist
    files = list(data_dir.rglob('*'))
    supported_files = [f for f in files if f.is_file() and f.suffix.lower() in {'.txt', '.pdf'}]

    if not supported_files:
        print("📋 UPLOAD INSTRUCTIONS:")
        print("1. Upload your .txt or .pdf files to the 'data' folder")
        print("2. You can drag and drop files in the Colab file browser")
        print("3. Or use the upload button in the file browser")
        print("4. Supported formats: .txt, .pdf")
        print()
        print("🔍 To upload files programmatically:")
        print("   from google.colab import files")
        print("   uploaded = files.upload()")
        print("   # Then move files to the data folder")

        return False
    else:
        print(f"✅ Found {len(supported_files)} supported files:")
        for file_path in supported_files:
            print(f"   • {file_path.name} ({file_path.suffix})")

        return True

def main():
    """
    Main function to run the complete ingestion process.
    """
    print("🚀 ENHANCED DATA INGESTION FOR RAG CHATBOT")
    print("="*60)

    has_files = setup_colab_environment()

    if not has_files:
        print("\n⚠️  Please upload your documents first!")
        return

    # Run ingestion
    print("\n🔄 Starting ingestion process...")
    pipeline = run_enhanced_ingestion()

    # Verify compatibility
    print("\n🔍 Verifying compatibility...")
    verify_database_compatibility()

    print("\n🎯 SUMMARY:")
    print("Your ChromaDB is ready for the RAG chatbot system!")
    print("The database is fully compatible and aligned with the chatbot.")


# Uncomment to test the system
# print("🧪 Testing the aligned system...")
main()

🚀 ENHANCED DATA INGESTION FOR RAG CHATBOT
🔧 SETTING UP GOOGLE COLAB ENVIRONMENT...
✅ Created data directory: /content/data
✅ Found 1 supported files:
   • Build_Dont_Talk_.pdf (.pdf)

🔄 Starting ingestion process...
🔧 ENHANCED DATA INGESTION FOR RAG SYSTEM
📁 Data path: /content/data
🗄️  Chroma path: /content/chroma
✂️  Chunk size: 1000
🔄 Chunk overlap: 200


modules.json:   0%|          | 0.00/349 [00:00<?, ?B/s]

config_sentence_transformers.json:   0%|          | 0.00/116 [00:00<?, ?B/s]

README.md: 0.00B [00:00, ?B/s]

sentence_bert_config.json:   0%|          | 0.00/53.0 [00:00<?, ?B/s]

config.json:   0%|          | 0.00/612 [00:00<?, ?B/s]

model.safetensors:   0%|          | 0.00/90.9M [00:00<?, ?B/s]

tokenizer_config.json:   0%|          | 0.00/350 [00:00<?, ?B/s]

vocab.txt: 0.00B [00:00, ?B/s]

tokenizer.json: 0.00B [00:00, ?B/s]

special_tokens_map.json:   0%|          | 0.00/112 [00:00<?, ?B/s]

config.json:   0%|          | 0.00/190 [00:00<?, ?B/s]

[2025-07-12 14:03:58] INFO: 🚀 Starting enhanced data ingestion pipeline...
[2025-07-12 14:03:58] INFO: Found 1 supported files
[2025-07-12 14:03:58] INFO: Processing: Build_Dont_Talk_.pdf
[2025-07-12 14:03:58] INFO: Loading PDF file: /content/data/Build_Dont_Talk_.pdf
[2025-07-12 14:04:00] INFO: Loaded 178 pages from Build_Dont_Talk_.pdf
[2025-07-12 14:04:00] INFO: Total documents loaded: 178
[2025-07-12 14:04:00] INFO: Splitting 178 documents into chunks...
[2025-07-12 14:04:00] INFO: Created 406 chunks
[2025-07-12 14:04:00] INFO: Average chunk size: 768 characters
[2025-07-12 14:04:00] INFO: Adding unique IDs to chunks...
[2025-07-12 14:04:00] INFO: Added IDs to 406 chunks
[2025-07-12 14:04:00] INFO: Preparing to save to ChromaDB...
[2025-07-12 14:04:00] INFO: Testing ChromaDB connection...
[2025-07-12 14:04:01] INFO: ChromaDB connection test passed!
[2025-07-12 14:04:01] INFO: Creating new ChromaDB database...
[2025-07-12 14:04:27] INFO: Verifying database creation...
[2025-07-12 14

In [None]:
# Import required libraries
from langchain.embeddings import HuggingFaceEmbeddings

def get_embedding_function():
    """
    Returns embedding function using all-mpnet-base-v2 model.

    Returns:
        HuggingFaceEmbeddings: Configured embedding function
    """

    embeddings = HuggingFaceEmbeddings(
        model_name="sentence-transformers/all-mpnet-base-v2",
        model_kwargs={'device': 'cpu'},
        encode_kwargs={'normalize_embeddings': True}
    )

    return embeddings

def test_embedding_function(embedding_func, test_phrases=None):
    """Test the embedding function with sample phrases"""

    if test_phrases is None:
        test_phrases = [
            "artificial intelligence",
            "machine learning algorithms",
            "natural language processing",
            "deep learning models"
        ]

    print("Testing embedding function...")
    print("=" * 30)

    try:
        for i, phrase in enumerate(test_phrases):
            embedding = embedding_func.embed_query(phrase)
            print(f"{i+1}. '{phrase}' → Vector dim: {len(embedding)}")

        print("✅ Embedding function test completed successfully!")
        return True

    except Exception as e:
        print(f"❌ Embedding function test failed: {e}")
        return False

# Main execution
print("🚀 Loading all-mpnet-base-v2 embedding model...")

try:
    embedding_func = get_embedding_function()
    print("✅ Embedding model loaded successfully!")

    # Test the function
    test_embedding_function(embedding_func)

except Exception as e:
    print(f"❌ Error loading embedding model: {e}")

In [19]:
# Cell 3: Configuration
CHROMA_PATH = "/content/chroma"

CONVERSATIONAL_ANSWER_TEMPLATE = """
You are Harry, a helpful, knowledgeable, and friendly AI technical support assistant.
Your goal is to answer questions about technical documentation accurately and helpfully.

You have access to relevant context from the documents and the conversation history.
Use both the context and chat history to provide comprehensive, contextual answers.

INSTRUCTIONS:
1. Always be conversational and remember what was discussed before
2. Reference previous parts of the conversation when relevant
3. If asked for clarification, elaborate on previous responses
4. Provide well-cited, accurate responses
5. If greeting, be friendly and offer help
6. For follow-up questions, build upon previous context
7. If user is asking irrelevant questions just polietly say "I don't have an information about that but I can help you with the motivational textbook named as 'Build Don't Talk'"

Context from Documents: {context}

Chat History: {chat_history}

Current Question: {question}

Harry's Response:
"""

In [20]:
NO_CONTEXT_TEMPLATE = """
You are Harry, a helpful technical support assistant.

I don't have specific context from the documents for this question, but I can help based on our conversation history.

Chat History: {chat_history}

Current Question: {question}

Harry's Response:
"""

In [21]:
from google.colab import userdata
GROQ_API_KEY = userdata.get('GROQ_API_KEY')
os.environ["GROQ_API_KEY"] = GROQ_API_KEY

In [92]:
model = ChatGroq(
        model="llama-3.3-70b-versatile",
        temperature=0.4,  # Slightly higher for more conversational responses
        max_tokens=4096,
        timeout=None,
        max_retries=2,
    )
print("ChatGroq model initialized successfully!")

ChatGroq model initialized successfully!


In [93]:
def get_embedding_function():
    """
    Create and return an embedding function.
    """
    from sentence_transformers import SentenceTransformer
    from langchain.embeddings import HuggingFaceEmbeddings

    model_name = "all-MiniLM-L6-v2"
    embeddings = HuggingFaceEmbeddings(
        model_name=model_name,
        model_kwargs={'device': 'cpu'},
        encode_kwargs={'normalize_embeddings': False}
    )

    return embeddings

In [94]:
class ConversationalMemoryManager:
    """
    Manages conversation history and memory for the RAG system.
    """

    def __init__(self, max_memory_length=10):
        self.memory = ConversationBufferMemory(
            memory_key="chat_history",
            return_messages=True,
            max_token_limit=2000
        )
        self.max_memory_length = max_memory_length
        self.conversation_log = []

    def add_message(self, human_message, ai_message):
        """Add a new message pair to memory"""
        self.memory.chat_memory.add_user_message(human_message)
        self.memory.chat_memory.add_ai_message(ai_message)

        # Log the conversation with timestamp
        timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        self.conversation_log.append({
            "timestamp": timestamp,
            "human": human_message,
            "ai": ai_message
        })

        # Keep only recent conversations to prevent memory overflow
        if len(self.conversation_log) > self.max_memory_length:
            self.conversation_log.pop(0)

    def get_chat_history(self):
        """Get formatted chat history"""
        history = self.memory.chat_memory.messages
        formatted_history = []

        for msg in history:
            if isinstance(msg, HumanMessage):
                formatted_history.append(f"Human: {msg.content}")
            elif isinstance(msg, AIMessage):
                formatted_history.append(f"Harry: {msg.content}")

        return "\n".join(formatted_history)

    def get_recent_context(self, num_exchanges=3):
        """Get recent conversation context"""
        recent_log = self.conversation_log[-num_exchanges:] if self.conversation_log else []
        context = []

        for exchange in recent_log:
            context.append(f"Human: {exchange['human']}")
            context.append(f"Harry: {exchange['ai']}")

        return "\n".join(context)

    def clear_memory(self):
        """Clear all conversation memory"""
        self.memory.clear()
        self.conversation_log.clear()

    def get_conversation_summary(self):
        """Get a summary of the current conversation"""
        if not self.conversation_log:
            return "No previous conversation."

        total_exchanges = len(self.conversation_log)
        recent_topics = []

        for exchange in self.conversation_log[-3:]:  # Last 3 exchanges
            recent_topics.append(exchange['human'][:50] + "..." if len(exchange['human']) > 50 else exchange['human'])

        return f"Conversation has {total_exchanges} exchanges. Recent topics: {', '.join(recent_topics)}"

In [95]:
memory_manager = ConversationalMemoryManager()

In [115]:
def query_rag_with_memory(query_text: str, use_memory=True):
    """
    Query the RAG system with conversational memory support.

    Args:
        query_text (str): The user's question
        use_memory (bool): Whether to use conversation history

    Returns:
        str: The answer from the RAG system
    """
    try:
        # Initialize embedding function and database
        embedding_function = get_embedding_function()
        db = Chroma(persist_directory=CHROMA_PATH, embedding_function=embedding_function)

        # Get chat history
        chat_history = memory_manager.get_chat_history() if use_memory else ""

        # Handle greetings
        greetings = ['hi', 'hello', 'hey', 'greetings', 'good morning', 'good afternoon', 'good evening']
        if query_text.lower().strip() in greetings:
            response = f"Hello! I am here to assist you with the 'Build Don't Talk' Book, ask me your question {memory_manager.get_conversation_summary()}"
            if use_memory:
                memory_manager.add_message(query_text, response)
            return response

        # Search the database for relevant context
        results = db.similarity_search_with_score(query_text, k=5)

        print("Retrieved contexts:")
        for i, (doc, score) in enumerate(results, 1):
            print(f"Context {i}: Score={score:.4f}, Content={doc.page_content[:100]}...")

        print("=" * 50)

        # Combine context from search results
        context_text = "\n\n".join([doc.page_content for doc, _score in results])

        # Choose template based on context availability
        if context_text.strip():
            prompt_template = ChatPromptTemplate.from_template(CONVERSATIONAL_ANSWER_TEMPLATE)
            prompt = prompt_template.format(
                context=context_text,
                chat_history=chat_history,
                question=query_text
            )
        else:
            prompt_template = ChatPromptTemplate.from_template(NO_CONTEXT_TEMPLATE)
            prompt = prompt_template.format(
                chat_history=chat_history,
                question=query_text
            )

        # Generate response using the model
        response = model.invoke(prompt)

        # Extract content from response
        if hasattr(response, 'content'):
            response_content = response.content
        else:
            response_content = str(response)

        # Add to memory if enabled
        if use_memory:
            memory_manager.add_message(query_text, response_content)

        return response_content

    except Exception as e:
        error_msg = f"Error processing query: {str(e)}"
        if use_memory:
            memory_manager.add_message(query_text, error_msg)
        return error_msg

In [97]:
def ask_harry(question, remember=True):
    """
    Ask Harry a question with optional memory.

    Args:
        question (str): Your question for Harry
        remember (bool): Whether to remember this conversation

    Returns:
        str: Harry's response
    """
    print(f"Question: {question}")
    print("=" * 50)

    response = query_rag_with_memory(question, use_memory=remember)
    print(f"Harry's Answer: {response}")
    print("=" * 50)

    return response

In [98]:
ask_harry('what the author is saying')

Question: what the author is saying
Retrieved contexts:
Context 1: Score=1.1694, Content=Fact: Most people don’t achieve anything in their life because most people
don’t do anything in thei...
Context 2: Score=1.2283, Content=‘Raj, your course is very good. All the things that you have done and learnt
in the last five to six...
Context 3: Score=1.3273, Content=the reason I’m becoming a creator and expanding my reach, so that I can be
a crucial part of the Ind...
Context 4: Score=1.3391, Content=Preface: Little Accomplishments
I hate reading books, but I’m writing one.
If you love reading books...
Context 5: Score=1.3444, Content=scenario, like, you know what, if you buy my book today then your life can
also get better, like min...
Harry's Answer: Hello. I'm happy to help you understand what the author is saying. The author appears to be emphasizing the importance of taking action and making progress in small, manageable steps. They're encouraging readers to focus on achieving "little a

'Hello. I\'m happy to help you understand what the author is saying. The author appears to be emphasizing the importance of taking action and making progress in small, manageable steps. They\'re encouraging readers to focus on achieving "little accomplishments" rather than feeling overwhelmed by large goals or projects.\n\nThe author is also being quite honest about their own approach to learning and personal growth, admitting that they don\'t enjoy reading books but are writing one anyway. They\'re offering alternative ways for people to learn, such as through videos or podcasts, and emphasizing that the key to growth is through consistent, daily efforts.\n\nIt seems that the author is trying to motivate readers to take control of their own learning and development, rather than relying on traditional methods or procrastinating. They\'re using their own experiences and successes as examples, and encouraging readers to apply the principles outlined in the book to improve their own lives

In [99]:
def ask_harry_with_context():
    """
    Ask Harry with full conversation context displayed.
    """
    question = input("Your question: ")

    print("\n" + "="*60)
    print("CONVERSATION CONTEXT:")
    print(memory_manager.get_conversation_summary())
    print("="*60)

    response = ask_harry(question)

    print("\n" + "="*60)
    print("UPDATED CONVERSATION HISTORY:")
    recent_history = memory_manager.get_recent_context(3)
    print(recent_history if recent_history else "No conversation history yet.")
    print("="*60)

    return response

In [100]:
def clear_conversation():
    """Clear the conversation memory."""
    memory_manager.clear_memory()
    print("Conversation memory cleared!")

In [101]:
def show_conversation_history():
    """Display the current conversation history."""
    print("\n" + "="*60)
    print("CONVERSATION HISTORY:")
    print("="*60)

    history = memory_manager.get_chat_history()
    if history:
        print(history)
    else:
        print("No conversation history yet.")

    print("="*60)
    print(f"Total exchanges: {len(memory_manager.conversation_log)}")


In [111]:
def create_conversational_widget_interface():
    """
    Create an enhanced widget interface with conversation memory.
    """
    import ipywidgets as widgets
    from IPython.display import display, clear_output

    # Create widgets
    question_input = widgets.Text(
        placeholder='Ask me a question...',
        description='Question:',
        style={'description_width': 'initial'},
        layout=widgets.Layout(width='50%')
    )

    ask_button = widgets.Button(description='Ask Harry', button_style='primary', layout=widgets.Layout(width='15%'))
    eli5_button = widgets.Button(description='ELI5 Mode', button_style='info', layout=widgets.Layout(width='15%'))
    clear_button = widgets.Button(description='Clear Chat', button_style='warning', layout=widgets.Layout(width='10%'))
    history_button = widgets.Button(description='Show History', button_style='success', layout=widgets.Layout(width='10%'))

    output_area = widgets.Output()

    # Ask Handler (used by both button and Enter key)
    def on_ask_click(b=None):
        with output_area:
            output_area.clear_output()
            query = question_input.value.strip()
            if query and len(query) >= 3:
                print(f"🤔 You: {query}")
                print("-" * 50)
                response = query_rag_with_memory(query)
                print(f"🤖 Harry: {response}")
                print("=" * 50)
                question_input.value = ""
            else:
                print("❌ Please enter a valid question (min 3 characters).")

    def on_eli5_click(b):
        with output_area:
            output_area.clear_output()
            query = question_input.value.strip()
            if query:
                print(f"🤔 You (ELI5): {query}")
                print("-" * 50)
                response = explain_like_im_five(query)
                print(f"🤖 Harry (ELI5): {response}")
                print("=" * 50)
                question_input.value = ""
            else:
                print("❌ Please enter a question for ELI5 mode!")

    def on_clear_click(b):
        clear_conversation()
        output_area.clear_output()
        print("🧹 Conversation cleared!")

    def on_history_click(b):
        with output_area:
            output_area.clear_output()
            print("📜 CONVERSATION HISTORY:")
            print("=" * 60)
            history = memory_manager.get_chat_history()
            if history:
                print(history)
            else:
                print("No conversation history yet.")
            print("=" * 60)

    # Connect events
    ask_button.on_click(on_ask_click)
    eli5_button.on_click(on_eli5_click)
    clear_button.on_click(on_clear_click)
    history_button.on_click(on_history_click)

    # ✅ Submit on Enter
    question_input.on_submit(on_ask_click)

    # Display interface
    print("🤖 Harry's Enhanced Technical Support with Memory")
    print("=" * 60)
    display(widgets.VBox([
        question_input,
        widgets.HBox([ask_button, eli5_button, clear_button, history_button]),
        output_area
    ]))


In [103]:
def interactive_mode_with_memory():
    """
    Run an enhanced interactive session with memory capabilities.
    """
    print("🤖 Welcome to Harry's Enhanced Technical Support!")
    print("Features: Conversational Memory, ELI5 Mode, Summarization")
    print("=" * 60)
    print("Commands:")
    print("- 'quit' or 'exit' - End session")
    print("- 'eli5 <question>' - Explain like I'm 5")
    print("- 'history' - Show conversation history")
    print("- 'clear' - Clear conversation memory")
    print("=" * 60)

    while True:
        try:
            question = input(f"\n[{len(memory_manager.conversation_log)} exchanges] Your question: ")

            if question.lower() in ['quit', 'exit', 'bye']:
                print("Thank you for using Harry's Technical Support!")

                # Offer to save conversation
                save_choice = input("Would you like to save this conversation? (y/n): ")
                if save_choice.lower() == 'y':
                    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
                    filename = f"conversation_{timestamp}.json"

                    conversation_data = {
                        "timestamp": timestamp,
                        "total_exchanges": len(memory_manager.conversation_log),
                        "conversation": memory_manager.conversation_log
                    }

                    with open(filename, 'w') as f:
                        json.dump(conversation_data, f, indent=2)

                    print(f"Conversation saved to {filename}")

                break

            elif question.lower() == 'history':
                show_conversation_history()

            elif question.lower() == 'clear':
                clear_conversation()

            elif question.lower().startswith('eli5 '):
                eli5_question = question[5:]  # Remove 'eli5 ' prefix
                print(f"\n🎈 ELI5 Mode - Question: {eli5_question}")
                print("-" * 40)
                response = explain_like_im_five(eli5_question)
                print(f"Harry (ELI5): {response}")

            elif question.strip():
                print(f"\n💬 Processing: {question}")
                print("-" * 40)
                response = ask_harry(question)

            else:
                print("Please enter a valid question or command.")

        except KeyboardInterrupt:
            print("\nSession interrupted.")
            break
        except Exception as e:
            print(f"An error occurred: {e}")

In [104]:
def export_conversation(filename=None):
    """
    Export conversation to JSON file.
    """
    if not filename:
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        filename = f"conversation_{timestamp}.json"

    conversation_data = {
        "export_timestamp": datetime.now().isoformat(),
        "total_exchanges": len(memory_manager.conversation_log),
        "conversation": memory_manager.conversation_log,
    }

    with open(filename, 'w') as f:
        json.dump(conversation_data, f, indent=2)

    print(f"Conversation exported to {filename}")
    return filename

In [105]:
def import_conversation(filename):
    """
    Import conversation from JSON file.
    """
    try:
        with open(filename, 'r') as f:
            conversation_data = json.load(f)

        # Clear current memory
        memory_manager.clear_memory()

        # Import conversation
        for exchange in conversation_data['conversation']:
            memory_manager.add_message(exchange['human'], exchange['ai'])

        print(f"Conversation imported from {filename}")

    except Exception as e:
        print(f"Error importing conversation: {e}")

In [106]:
def launch_enhanced_widget_interface():
    """Launch the enhanced IPython widgets interface"""
    print("Launching enhanced interface with conversational memory...")
    try:
        import ipywidgets as widgets
        from IPython.display import display
        create_conversational_widget_interface()
    except ImportError:
        print("Installing ipywidgets...")
        !pip install ipywidgets
        import ipywidgets as widgets
        from IPython.display import display
        create_conversational_widget_interface()

In [80]:
interactive_mode_with_memory()

In [None]:
launch_enhanced_widget_interface()