In [45]:
# Import necessary libraries
import os
import google.generativeai as genai
import pinecone
from pinecone import Pinecone, ServerlessSpec
import pandas as pd
import numpy as np
import tiktoken
from typing import List, Dict, Any, Optional, Tuple, Union
import json
from dotenv import load_dotenv
import time
import re
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_google_genai import GoogleGenerativeAIEmbeddings, ChatGoogleGenerativeAI
from langchain.schema import Document
from sentence_transformers import SentenceTransformer, CrossEncoder
from dataclasses import dataclass, field
from enum import Enum
import logging
from abc import ABC, abstractmethod
import asyncio
from collections import defaultdict
import math

# Load environment variables
load_dotenv()

print("Libraries imported successfully for ReACT-RAG system!")

Libraries imported successfully for ReACT-RAG system!


In [46]:
# Environment Setup and API Configuration
class Config:
    def __init__(self):
        # API Keys (store these in a .env file)
        self.gemini_api_key = os.getenv('GEMINI_API_KEY')
        self.pinecone_api_key = os.getenv('PINECONE_API_KEY')
        
        # Model configurations
        self.embedding_model = "models/embedding-001"  # Gemini embedding model
        self.chat_model = "gemini-1.5-flash"  # Gemini chat model
        self.max_tokens = 150
        self.temperature = 0.1
        
        # Alternative: Use sentence-transformers for embeddings (free)
        self.use_sentence_transformers = False  # Set to True to use free embeddings
        self.sentence_transformer_model = "all-MiniLM-L6-v2"
        
        # Pinecone configurations
        self.index_name = "business-qa-bot-gemini"  # New index name to avoid conflicts
        
        # Set correct dimensions based on embedding method
        if self.use_sentence_transformers:
            self.dimension = 384  # all-MiniLM-L6-v2 dimension
        else:
            self.dimension = 768  # Gemini embedding dimension
            
        self.metric = "cosine"
        
        # Document processing
        self.chunk_size = 1000
        self.chunk_overlap = 200
        self.top_k_results = 5
        
        # Validate API keys
        if not self.gemini_api_key:
            print("⚠️  Warning: GEMINI_API_KEY not found in environment variables")
            print("💡 You can get a free API key at: https://makersuite.google.com/app/apikey")
        if not self.pinecone_api_key:
            print("⚠️  Warning: PINECONE_API_KEY not found in environment variables")
        
        # Configure Gemini API
        if self.gemini_api_key:
            genai.configure(api_key=self.gemini_api_key)
        
        # Print configuration info
        embedding_method = "Sentence Transformers" if self.use_sentence_transformers else "Gemini"
        print(f"🔧 Embedding method: {embedding_method}")
        print(f"📏 Vector dimension: {self.dimension}")

config = Config()
print("✅ Configuration loaded successfully!")

🔧 Embedding method: Gemini
📏 Vector dimension: 768
✅ Configuration loaded successfully!


In [47]:
# Pinecone Vector Database Setup
class PineconeManager:
    def __init__(self, config):
        self.config = config
        self.pc = None
        self.index = None
        
    def initialize_pinecone(self):
        """Initialize Pinecone client and create/connect to index"""
        try:
            # Initialize Pinecone
            self.pc = Pinecone(api_key=self.config.pinecone_api_key)
            
            # Check if index exists
            existing_indexes = [index.name for index in self.pc.list_indexes()]
            
            if self.config.index_name not in existing_indexes:
                print(f"Creating new index: {self.config.index_name}")
                self.pc.create_index(
                    name=self.config.index_name,
                    dimension=self.config.dimension,
                    metric=self.config.metric,
                    spec=ServerlessSpec(
                        cloud="aws",
                        region="us-east-1"
                    )
                )
                # Wait for index to be ready
                time.sleep(10)
            else:
                print(f"Index {self.config.index_name} already exists")
            
            # Connect to index
            self.index = self.pc.Index(self.config.index_name)
            print(f"✅ Successfully connected to Pinecone index: {self.config.index_name}")
            
            # Get index stats
            stats = self.index.describe_index_stats()
            print(f"Index stats: {stats}")
            
        except Exception as e:
            print(f"❌ Error initializing Pinecone: {str(e)}")
            raise
    
    def delete_index(self):
        """Delete the index (use with caution!)"""
        if self.pc and self.config.index_name:
            try:
                self.pc.delete_index(self.config.index_name)
                print(f"🗑️ Index {self.config.index_name} deleted")
            except Exception as e:
                print(f"Error deleting index: {str(e)}")

# Initialize Pinecone Manager
pinecone_manager = PineconeManager(config)

# Only initialize if API key is available
if config.pinecone_api_key:
    pinecone_manager.initialize_pinecone()
else:
    print("⚠️  Skipping Pinecone initialization - API key not found")

Index business-qa-bot-gemini already exists
✅ Successfully connected to Pinecone index: business-qa-bot-gemini
Index stats: {'dimension': 768,
 'index_fullness': 0.0,
 'metric': 'cosine',
 'namespaces': {'': {'vector_count': 63}},
 'total_vector_count': 63,
 'vector_type': 'dense'}
Index stats: {'dimension': 768,
 'index_fullness': 0.0,
 'metric': 'cosine',
 'namespaces': {'': {'vector_count': 63}},
 'total_vector_count': 63,
 'vector_type': 'dense'}


In [48]:
# Document Processing and Text Splitting
class DocumentProcessor:
    def __init__(self, config):
        self.config = config
        self.text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=config.chunk_size,
            chunk_overlap=config.chunk_overlap,
            length_function=len,
            separators=["\n\n", "\n", " ", ""]
        )
        
    def process_text(self, text: str, source: str = "unknown") -> List[Document]:
        """Process a text string into chunks"""
        chunks = self.text_splitter.split_text(text)
        documents = []
        
        for i, chunk in enumerate(chunks):
            doc = Document(
                page_content=chunk,
                metadata={
                    "source": source,
                    "chunk_id": i,
                    "total_chunks": len(chunks),
                    "chunk_size": len(chunk)
                }
            )
            documents.append(doc)
        
        return documents
    
    def process_file(self, file_path: str) -> List[Document]:
        """Process a file into chunks"""
        try:
            with open(file_path, 'r', encoding='utf-8') as file:
                content = file.read()
            
            return self.process_text(content, source=file_path)
        
        except Exception as e:
            print(f"Error processing file {file_path}: {str(e)}")
            return []
    
    def process_multiple_texts(self, texts: List[Dict[str, str]]) -> List[Document]:
        """Process multiple texts with metadata"""
        all_documents = []
        
        for text_data in texts:
            text = text_data.get('content', '')
            source = text_data.get('source', 'unknown')
            
            documents = self.process_text(text, source)
            all_documents.extend(documents)
        
        return all_documents

# Initialize Document Processor
doc_processor = DocumentProcessor(config)
print("✅ Document processor initialized!")

✅ Document processor initialized!


In [49]:
# Embedding Generation and Vector Store Operations
class EmbeddingManager:
    def __init__(self, config, pinecone_manager):
        self.config = config
        self.pinecone_manager = pinecone_manager
        
        # Choose embedding method
        if config.use_sentence_transformers:
            # Use free sentence transformers
            self.embeddings = SentenceTransformer(config.sentence_transformer_model)
            self.embedding_type = "sentence_transformers"
            print("🔧 Using Sentence Transformers for embeddings (free)")
        elif config.gemini_api_key:
            # Use Gemini embeddings
            self.embeddings = GoogleGenerativeAIEmbeddings(
                model=config.embedding_model,
                google_api_key=config.gemini_api_key
            )
            self.embedding_type = "gemini"
            print("🔧 Using Gemini API for embeddings")
        else:
            self.embeddings = None
            self.embedding_type = None
            print("❌ No embedding method available")
        
    def create_embeddings(self, texts: List[str]) -> List[List[float]]:
        """Generate embeddings for a list of texts"""
        if not self.embeddings:
            raise ValueError("No embedding method configured")
        
        try:
            if self.embedding_type == "sentence_transformers":
                # Use sentence transformers
                embeddings = self.embeddings.encode(texts, convert_to_tensor=False)
                return embeddings.tolist()
            elif self.embedding_type == "gemini":
                # Use Gemini embeddings
                embeddings = self.embeddings.embed_documents(texts)
                return embeddings
            else:
                raise ValueError("Unknown embedding type")
                
        except Exception as e:
            print(f"Error creating embeddings: {str(e)}")
            raise
    
    def create_query_embedding(self, query: str) -> List[float]:
        """Generate embedding for a single query"""
        if not self.embeddings:
            raise ValueError("No embedding method configured")
        
        try:
            if self.embedding_type == "sentence_transformers":
                # Use sentence transformers
                embedding = self.embeddings.encode([query], convert_to_tensor=False)
                return embedding[0].tolist()
            elif self.embedding_type == "gemini":
                # Use Gemini embeddings
                embedding = self.embeddings.embed_query(query)
                return embedding
            else:
                raise ValueError("Unknown embedding type")
                
        except Exception as e:
            print(f"Error creating query embedding: {str(e)}")
            raise
    
    def add_documents_to_vectorstore(self, documents: List[Document]) -> bool:
        """Add documents to Pinecone vector store"""
        if not self.pinecone_manager.index:
            print("❌ Pinecone index not initialized")
            return False
        
        try:
            # Extract texts and metadata
            texts = [doc.page_content for doc in documents]
            metadatas = [doc.metadata for doc in documents]
            
            # Generate embeddings
            embeddings = self.create_embeddings(texts)
            
            # Prepare vectors for upsert
            vectors = []
            for i, (text, embedding, metadata) in enumerate(zip(texts, embeddings, metadatas)):
                vector_id = f"doc_{int(time.time())}_{i}"
                vectors.append({
                    "id": vector_id,
                    "values": embedding,
                    "metadata": {
                        **metadata,
                        "text": text[:1000]  # Store first 1000 chars in metadata
                    }
                })
            
            # Upsert vectors to Pinecone
            self.pinecone_manager.index.upsert(vectors)
            
            print(f"✅ Successfully added {len(documents)} documents to vector store")
            return True
            
        except Exception as e:
            print(f"❌ Error adding documents to vector store: {str(e)}")
            return False
    
    def search_similar(self, query: str, top_k: int = None) -> List[Dict]:
        """Search for similar documents"""
        if not self.pinecone_manager.index:
            print("❌ Pinecone index not initialized")
            return []
        
        if top_k is None:
            top_k = self.config.top_k_results
        
        try:
            # Generate query embedding
            query_embedding = self.create_query_embedding(query)
            
            # Search in Pinecone
            results = self.pinecone_manager.index.query(
                vector=query_embedding,
                top_k=top_k,
                include_metadata=True
            )
            
            # Format results
            formatted_results = []
            for match in results.matches:
                formatted_results.append({
                    "id": match.id,
                    "score": match.score,
                    "text": match.metadata.get("text", ""),
                    "source": match.metadata.get("source", "unknown"),
                    "metadata": match.metadata
                })
            
            return formatted_results
            
        except Exception as e:
            print(f"❌ Error searching vector store: {str(e)}")
            return []

# Initialize Embedding Manager
embedding_manager = EmbeddingManager(config, pinecone_manager)
print("✅ Embedding manager initialized!")

🔧 Using Gemini API for embeddings
✅ Embedding manager initialized!


In [50]:
# RAG System - Main Class
class BusinessQABot:
    def __init__(self, config, embedding_manager, doc_processor):
        self.config = config
        self.embedding_manager = embedding_manager
        self.doc_processor = doc_processor
        
        # Initialize Gemini chat model
        if config.gemini_api_key:
            self.chat_model = ChatGoogleGenerativeAI(
                model=config.chat_model,
                google_api_key=config.gemini_api_key,
                temperature=config.temperature,
                max_tokens=config.max_tokens
            )
            print("🤖 Gemini chat model initialized")
        else:
            self.chat_model = None
            print("❌ Gemini API key not configured")
        
    def add_business_knowledge(self, knowledge_base: List[Dict[str, str]]):
        """Add business documents to the knowledge base"""
        print("📚 Processing business documents...")
        
        # Process documents
        documents = self.doc_processor.process_multiple_texts(knowledge_base)
        print(f"📄 Created {len(documents)} document chunks")
        
        # Add to vector store
        success = self.embedding_manager.add_documents_to_vectorstore(documents)
        
        if success:
            print("✅ Business knowledge base updated successfully!")
        else:
            print("❌ Failed to update knowledge base")
        
        return success
    
    def retrieve_context(self, query: str) -> str:
        """Retrieve relevant context for a query"""
        # Search for relevant documents
        similar_docs = self.embedding_manager.search_similar(query)
        
        if not similar_docs:
            return "No relevant information found in the knowledge base."
        
        # Combine relevant texts
        context_texts = []
        for doc in similar_docs:
            source = doc.get('source', 'Unknown')
            text = doc.get('text', '')
            score = doc.get('score', 0)
            
            context_texts.append(f"Source: {source} (Relevance: {score:.3f})\n{text}")
        
        return "\n\n---\n\n".join(context_texts)
    
    def generate_response(self, query: str, context: str) -> str:
        """Generate response using Gemini with retrieved context"""
        if not self.chat_model:
            return "❌ Gemini API not configured"
        
        system_prompt = """You are a helpful business assistant. Use the provided context to answer questions about the business. 
        If the context doesn't contain relevant information, say so clearly. 
        Be concise, accurate, and professional in your responses.
        
        Guidelines:
        - Only use information from the provided context
        - If unsure, ask for clarification
        - Provide specific details when available
        - Be helpful and professional"""
        
        user_prompt = f"""Context:
{context}

Question: {query}

Please provide a helpful and accurate answer based on the context above."""
        
        try:
            # Combine system and user prompts for Gemini
            full_prompt = f"{system_prompt}\n\n{user_prompt}"
            
            # Generate response using Gemini
            response = self.chat_model.invoke(full_prompt)
            
            # Extract text content from response
            if hasattr(response, 'content'):
                return response.content.strip()
            else:
                return str(response).strip()
            
        except Exception as e:
            return f"❌ Error generating response: {str(e)}"
    
    def ask(self, query: str) -> Dict[str, Any]:
        """Main method to ask a question and get a response"""
        print(f"🤔 Question: {query}")
        
        # Retrieve relevant context
        print("🔍 Searching knowledge base...")
        context = self.retrieve_context(query)
        
        # Generate response
        print("🤖 Generating response...")
        response = self.generate_response(query, context)
        
        result = {
            "query": query,
            "context": context,
            "response": response,
            "timestamp": time.strftime("%Y-%m-%d %H:%M:%S")
        }
        
        print(f"💬 Response: {response}")
        return result

# Initialize the Business QA Bot
qa_bot = BusinessQABot(config, embedding_manager, doc_processor)
print("🤖 Business QA Bot initialized and ready!")

# Legacy Simple RAG System (Kept for Reference)
# Note: This has been superseded by the ReACT-RAG system above
# Keeping for compatibility and comparison purposes

class LegacyBusinessQABot:
    """Legacy simple RAG system - kept for reference and comparison"""
    
    def __init__(self, config, embedding_manager, doc_processor):
        self.config = config
        self.embedding_manager = embedding_manager
        self.doc_processor = doc_processor
        
        # Initialize Gemini chat model
        if config.gemini_api_key:
            self.chat_model = ChatGoogleGenerativeAI(
                model=config.chat_model,
                google_api_key=config.gemini_api_key,
                temperature=config.temperature,
                max_tokens=config.max_tokens
            )
        else:
            self.chat_model = None
    
    def ask(self, query: str) -> Dict[str, Any]:
        """Simple single-step RAG processing"""
        # Retrieve relevant context
        similar_docs = self.embedding_manager.search_similar(query)
        
        if not similar_docs:
            context = "No relevant information found in the knowledge base."
        else:
            context_texts = []
            for doc in similar_docs:
                source = doc.get('source', 'Unknown')
                text = doc.get('text', '')
                score = doc.get('score', 0)
                context_texts.append(f"Source: {source} (Relevance: {score:.3f})\n{text}")
            context = "\n\n---\n\n".join(context_texts)
        
        # Generate response
        if not self.chat_model:
            return {
                "query": query,
                "context": context,
                "response": "❌ Gemini API not configured",
                "timestamp": time.strftime("%Y-%m-%d %H:%M:%S")
            }
        
        system_prompt = """You are a helpful business assistant. Use the provided context to answer questions about the business."""
        
        user_prompt = f"""Context:\n{context}\n\nQuestion: {query}\n\nPlease provide a helpful answer based on the context above."""
        
        try:
            full_prompt = f"{system_prompt}\n\n{user_prompt}"
            response = self.chat_model.invoke(full_prompt)
            response_text = response.content if hasattr(response, 'content') else str(response)
        except Exception as e:
            response_text = f"❌ Error generating response: {str(e)}"
        
        return {
            "query": query,
            "context": context,
            "response": response_text,
            "timestamp": time.strftime("%Y-%m-%d %H:%M:%S")
        }

# Initialize legacy system for comparison
legacy_qa_bot = LegacyBusinessQABot(config, embedding_manager, doc_processor)
print("📝 Legacy QA Bot initialized (for reference)")

🤖 Gemini chat model initialized
🤖 Business QA Bot initialized and ready!
📝 Legacy QA Bot initialized (for reference)


In [51]:
# Sample Business Knowledge Base
sample_business_knowledge = [
    {
        "content": """
        Company Overview:
        TechFlow Solutions is a leading software development company founded in 2018. 
        We specialize in web applications, mobile development, and cloud solutions.
        
        Our Mission: To deliver innovative technology solutions that drive business growth.
        Our Vision: To be the most trusted technology partner for businesses worldwide.
        
        Core Values:
        - Innovation: We embrace cutting-edge technologies
        - Quality: We deliver excellence in every project
        - Collaboration: We work closely with our clients
        - Integrity: We maintain the highest ethical standards
        """,
        "source": "company_overview.md"
    },
    {
        "content": """
        Services Offered:
        
        1. Web Development
        - Frontend: React, Vue.js, Angular, Svelte, Next.js, Nuxt.js
        - Backend: Node.js, Python, Java, Go, PHP, Ruby
        - Full-stack solutions, API development, microservices architecture
        - E-commerce platforms, CMS integration, custom dashboards
        
        2. Mobile Development
        - Native iOS and Android apps
        - Cross-platform with React Native, Flutter, Xamarin
        - Progressive Web Apps (PWAs), hybrid apps
        - Mobile UI/UX design, app store deployment, mobile analytics
        
        3. Cloud Solutions
        - AWS, Azure, Google Cloud, IBM Cloud
        - DevOps and CI/CD, serverless architecture, containerization (Docker, Kubernetes)
        - Cloud migration services, multi-cloud strategies, disaster recovery
        - Infrastructure as Code (Terraform, Ansible), monitoring & logging
        
        4. Consulting Services
        - Technology strategy, digital transformation, architecture design
        - IT audits, security assessments, compliance consulting (GDPR, HIPAA)
        - Agile coaching, project management, process optimization
        
        5. Data & AI Solutions
        - Data engineering, ETL pipelines, data warehousing
        - Business intelligence dashboards, reporting automation
        - Machine learning model development, NLP, computer vision
        - AI chatbot integration, recommendation systems
        
        6. UI/UX Design
        - User research, wireframing, prototyping
        - Visual design, branding, accessibility audits
        - Usability testing, design systems, responsive design
        
        7. Support & Maintenance
        - SLA-based support, 24/7 monitoring, incident response
        - Application updates, bug fixes, performance optimization
        - Security patching, backup & restore, legacy system support
        """,
        "source": "services.md"
    },
    {
        "content": """
        Pricing Information:
        
        Web Development:
        - Basic website: $5,000 - $15,000
        - E-commerce platform: $15,000 - $50,000
        - Enterprise web application: $50,000+
        - Custom dashboard: $10,000 - $30,000
        - API development: $8,000 - $25,000
        
        Mobile Development:
        - Simple mobile app: $10,000 - $30,000
        - Complex mobile app: $30,000 - $100,000
        - Enterprise mobile solution: $100,000+
        - Cross-platform app: $15,000 - $40,000
        
        Cloud Solutions:
        - Cloud migration: $20,000 - $75,000
        - DevOps setup: $15,000 - $40,000
        - Ongoing cloud management: $3,000 - $10,000/month
        - Disaster recovery setup: $10,000 - $25,000
        
        Data & AI Solutions:
        - Data pipeline setup: $12,000 - $40,000
        - Machine learning model: $20,000 - $60,000
        - BI dashboard: $8,000 - $25,000
        
        UI/UX Design:
        - Wireframing & prototyping: $2,000 - $8,000
        - Full design system: $10,000 - $25,000
        
        Hourly Rates:
        - Senior Developer: $150 - $200/hour
        - Mid-level Developer: $100 - $150/hour
        - Junior Developer: $75 - $100/hour
        - Project Manager: $125 - $175/hour
        - UI/UX Designer: $90 - $140/hour
        - Data Scientist: $160 - $220/hour
        """,
        "source": "pricing.md"
    },
    {
        "content": """
        Contact Information:
        
        Headquarters:
        TechFlow Solutions
        123 Innovation Drive
        Tech City, TC 12345
        
        Phone: +1 (555) 123-4567
        Email: info@techflowsolutions.com
        Website: www.techflowsolutions.com
        
        Office Hours:
        Monday - Friday: 9:00 AM - 6:00 PM EST
        Saturday: 10:00 AM - 2:00 PM EST
        Sunday: Closed
        
        Emergency Support:
        Available 24/7 for enterprise clients
        Emergency hotline: +1 (555) 999-8888
        
        Sales Team:
        sales@techflowsolutions.com
        +1 (555) 123-4567 ext. 100
        
        Support Team:
        support@techflowsolutions.com
        +1 (555) 123-4567 ext. 200

        Regional Offices:
        - Europe: 45 Tech Park, Berlin, Germany, +49 30 123456
        - Asia-Pacific: 88 Innovation Ave, Singapore, +65 6789 1234
        - South America: 12 Av. Tecnologia, São Paulo, Brazil, +55 11 2345 6789

        Social Media:
        - LinkedIn: linkedin.com/company/techflowsolutions
        - Twitter: @TechFlowSol
        - Facebook: facebook.com/techflowsolutions
        """,
        "source": "contact_info.md"
    },
    {
        "content": """
        Frequently Asked Questions:
        
        Q: How long does a typical project take?
        A: Project timelines vary based on complexity. Simple websites take 4-8 weeks, 
        while complex applications can take 3-12 months.

        Q: Do you offer maintenance and support?
        A: Yes, we provide ongoing maintenance packages starting at $500/month. 
        We also offer 24/7 support for enterprise clients.

        Q: What technologies do you specialize in?
        A: We work with modern web technologies including React, Node.js, Python, 
        and cloud platforms like AWS and Azure. We also have expertise in AI, data engineering, and DevOps.

        Q: Can you work with our existing team?
        A: Absolutely! We offer staff augmentation and can integrate with your 
        existing development processes. We also provide agile coaching and project management.

        Q: Do you sign NDAs?
        A: Yes, we're happy to sign NDAs and maintain strict confidentiality.

        Q: What's your refund policy?
        A: We offer milestone-based payments with clear deliverables. 
        Refunds are handled on a case-by-case basis.

        Q: Can you help with digital transformation?
        A: Yes, we provide consulting for digital transformation, including process optimization and technology upgrades.

        Q: Do you provide training?
        A: Yes, we offer training sessions for client teams on new systems and technologies.

        Q: What industries do you serve?
        A: We serve clients in retail, healthcare, finance, education, logistics, and more.

        Q: How do you ensure project quality?
        A: We follow best practices in software engineering, conduct code reviews, and use automated testing.
        """,
        "source": "faq.md"
    },
    {
        "content": """
        Case Studies:
        
        1. E-commerce Transformation - RetailGiant Inc.
        - Challenge: Legacy system with poor performance and high cart abandonment
        - Solution: Modern React frontend with Node.js microservices backend
        - Results: 65% reduction in page load time, 40% increase in conversions
        - Timeline: Completed in 6 months
        
        2. Healthcare Mobile App - MediCare Solutions
        - Challenge: Need for secure patient data access on mobile devices
        - Solution: HIPAA-compliant React Native app with biometric authentication
        - Results: 90% physician adoption, 30% reduction in administrative tasks
        - Timeline: Completed in 8 months
        
        3. Cloud Migration - FinTech Leader
        - Challenge: Legacy on-premise infrastructure with high maintenance costs
        - Solution: Complete AWS migration with containerization
        - Results: 50% infrastructure cost reduction, 99.99% uptime achieved
        - Timeline: Completed in 12 months

        4. AI Chatbot for Customer Support - ShopEase
        - Challenge: High volume of repetitive customer queries
        - Solution: NLP-powered chatbot integrated with CRM
        - Results: 70% reduction in support tickets, improved customer satisfaction
        - Timeline: Completed in 4 months

        5. Data Analytics Platform - EduAnalytics
        - Challenge: Manual reporting and lack of actionable insights
        - Solution: Automated BI dashboards with real-time analytics
        - Results: 80% reduction in reporting time, data-driven decision making
        - Timeline: Completed in 5 months
        """,
        "source": "case_studies.md"
    },
    {
        "content": """
        Technology Stack:
        
        Frontend Technologies:
        - JavaScript/TypeScript
        - React, Angular, Vue.js, Svelte, Next.js, Nuxt.js
        - HTML5/CSS3
        - Bootstrap, Tailwind CSS, Material UI
        - Redux, MobX, Zustand
        
        Backend Technologies:
        - Node.js (Express, NestJS)
        - Python (Django, Flask, FastAPI)
        - Java (Spring Boot)
        - Go, PHP (Laravel), Ruby on Rails
        - GraphQL, REST API design
        
        Mobile Technologies:
        - Swift/Objective-C (iOS)
        - Kotlin/Java (Android)
        - React Native, Flutter, Xamarin
        - Progressive Web Apps
        
        Database Technologies:
        - SQL: PostgreSQL, MySQL, MS SQL Server
        - NoSQL: MongoDB, DynamoDB, Cassandra
        - Redis, Elasticsearch, Firebase
        
        DevOps & Cloud:
        - AWS, Azure, Google Cloud, IBM Cloud
        - Docker, Kubernetes, OpenShift
        - CI/CD: Jenkins, GitHub Actions, GitLab CI
        - Terraform, Ansible, Pulumi
        
        Testing:
        - Jest, React Testing Library, Mocha
        - JUnit, pytest, unittest
        - Selenium, Cypress, Appium
        - LoadRunner, JMeter, k6
        
        Data & AI:
        - Pandas, NumPy, scikit-learn, TensorFlow, PyTorch
        - Apache Spark, Airflow, dbt
        - Power BI, Tableau, Looker
        """,
        "source": "tech_stack.md"
    },
    {
        "content": """
        Our Team:
        
        Leadership:
        
        Sarah Johnson - CEO & Founder
        Former CTO at TechGiant Corp with 15+ years of enterprise software experience.
        MS in Computer Science from Stanford University.
        
        Michael Chen - CTO
        Backend architecture expert with previous experience at Amazon and Google.
        PhD in Distributed Systems from MIT.
        
        Elena Rodriguez - Director of Engineering
        Full-stack development leader with expertise in scalable applications.
        10+ years experience leading engineering teams of 20+ developers.
        
        David Kim - Head of Product
        Product strategy expert specializing in user-centered design.
        Previously led product teams at three successful startups.

        Priya Patel - Head of Data Science
        AI/ML specialist with a background in big data analytics and cloud AI solutions.
        Former lead data scientist at FinData Corp.

        Team Structure:
        - 35 developers (15 frontend, 12 backend, 8 mobile)
        - 5 UX/UI designers
        - 8 QA engineers
        - 6 DevOps engineers
        - 4 project managers
        - 3 product managers
        - 4 data scientists
        - 2 business analysts
        
        Certifications:
        - AWS Certified Solutions Architect
        - Google Cloud Professional Architect
        - Microsoft Certified Azure Developer
        - Certified Scrum Master
        - PMP Certified Project Managers
        - Certified Data Scientist (CDS)
        """,
        "source": "team.md"
    }
]

print("📚 Sample business knowledge base prepared!")
print(f"Total documents: {len(sample_business_knowledge)}")
for doc in sample_business_knowledge:
    print(f"- {doc['source']}: {len(doc['content'])} characters")

📚 Sample business knowledge base prepared!
Total documents: 8
- company_overview.md: 663 characters
- services.md: 1919 characters
- pricing.md: 1371 characters
- contact_info.md: 1156 characters
- faq.md: 1773 characters
- case_studies.md: 1659 characters
- tech_stack.md: 1382 characters
- team.md: 1557 characters


In [52]:
# Load Knowledge Base into Vector Store
if (config.gemini_api_key or config.use_sentence_transformers) and config.pinecone_api_key:
    print("🚀 Loading business knowledge into vector store...")
    success = qa_bot.add_business_knowledge(sample_business_knowledge)
    
    if success:
        print("🎉 Knowledge base loaded successfully!")
        
        # Get index statistics
        if pinecone_manager.index:
            stats = pinecone_manager.index.describe_index_stats()
            print(f"📊 Vector store stats: {stats}")
    else:
        print("❌ Failed to load knowledge base")
else:
    print("⚠️  Skipping knowledge base loading - API keys not configured")
    if not config.gemini_api_key and not config.use_sentence_transformers:
        print("💡 To use the system, please set GEMINI_API_KEY in your environment")
        print("   Get a free API key at: https://makersuite.google.com/app/apikey")
        print("   Or set config.use_sentence_transformers = True for free embeddings")
    if not config.pinecone_api_key:
        print("💡 Please also set PINECONE_API_KEY in your environment")

🚀 Loading business knowledge into vector store...
📚 Processing business documents...
📄 Created 17 document chunks
✅ Successfully added 17 documents to vector store
✅ Business knowledge base updated successfully!
🎉 Knowledge base loaded successfully!
📊 Vector store stats: {'dimension': 768,
 'index_fullness': 0.0,
 'metric': 'cosine',
 'namespaces': {'': {'vector_count': 63}},
 'total_vector_count': 63,
 'vector_type': 'dense'}
✅ Successfully added 17 documents to vector store
✅ Business knowledge base updated successfully!
🎉 Knowledge base loaded successfully!
📊 Vector store stats: {'dimension': 768,
 'index_fullness': 0.0,
 'metric': 'cosine',
 'namespaces': {'': {'vector_count': 63}},
 'total_vector_count': 63,
 'vector_type': 'dense'}


In [53]:
# Fix Dimension Mismatch Issue
def fix_dimension_mismatch():
    """Fix the dimension mismatch by recreating the index with correct dimensions"""
    
    if not config.pinecone_api_key:
        print("❌ Pinecone API key not found")
        return False
    
    print("🔧 Fixing dimension mismatch...")
    
    try:
        # Delete the existing index
        existing_indexes = [index.name for index in pinecone_manager.pc.list_indexes()]
        
        if config.index_name in existing_indexes:
            print(f"🗑️ Deleting existing index with wrong dimensions: {config.index_name}")
            pinecone_manager.pc.delete_index(config.index_name)
            
            # Wait for deletion to complete
            import time
            time.sleep(10)
            print("✅ Old index deleted")
        
        # Determine correct dimension based on embedding method
        if config.use_sentence_transformers:
            correct_dimension = 384  # all-MiniLM-L6-v2 dimension
            embedding_method = "Sentence Transformers"
        else:
            correct_dimension = 768  # Gemini embedding dimension
            embedding_method = "Gemini"
        
        # Update config with correct dimension
        config.dimension = correct_dimension
        
        print(f"📏 Creating new index with {correct_dimension} dimensions for {embedding_method}")
        
        # Create new index with correct dimensions
        pinecone_manager.pc.create_index(
            name=config.index_name,
            dimension=correct_dimension,
            metric=config.metric,
            spec=ServerlessSpec(
                cloud="aws",
                region="us-east-1"
            )
        )
        
        # Wait for index to be ready
        time.sleep(10)
        
        # Reconnect to the new index
        pinecone_manager.index = pinecone_manager.pc.Index(config.index_name)
        
        print(f"✅ New index created successfully with {correct_dimension} dimensions")
        
        # Verify index stats
        stats = pinecone_manager.index.describe_index_stats()
        print(f"📊 New index stats: {stats}")
        
        return True
        
    except Exception as e:
        print(f"❌ Error fixing dimension mismatch: {str(e)}")
        return False

# Check if we need to fix dimension mismatch
if config.pinecone_api_key and pinecone_manager.index:
    try:
        # Test with a dummy embedding to see if dimensions match
        if embedding_manager.embedding_type:
            test_embedding = embedding_manager.create_query_embedding("test")
            expected_dim = len(test_embedding)
            
            # Get current index dimension from stats
            stats = pinecone_manager.index.describe_index_stats()
            current_dim = stats.get('dimension', 0)
            
            print(f"📏 Current index dimension: {current_dim}")
            print(f"📏 Expected embedding dimension: {expected_dim}")
            
            if current_dim != expected_dim:
                print("⚠️ Dimension mismatch detected!")
                print("🔧 Fixing dimension mismatch...")
                
                if fix_dimension_mismatch():
                    print("✅ Dimension mismatch fixed successfully!")
                else:
                    print("❌ Failed to fix dimension mismatch")
            else:
                print("✅ Dimensions match correctly!")
                
    except Exception as e:
        print(f"⚠️ Could not verify dimensions: {str(e)}")
        print("🔧 Attempting to fix dimension mismatch...")
        fix_dimension_mismatch()

📏 Current index dimension: 768
📏 Expected embedding dimension: 768
✅ Dimensions match correctly!


In [54]:
# Quick Test of the Fixed RAG System
print("🧪 Testing the fixed RAG system with a sample question...")

try:
    # Test with a simple question
    test_result = qa_bot.ask("What services does TechFlow Solutions offer?")
    
    print("\n" + "="*60)
    print("✅ RAG System Test Results:")
    print("="*60)
    print(f"Question: {test_result['query']}")
    print(f"\nResponse: {test_result['response']}")
    print("\n📝 Context found:", "Yes" if test_result['context'] != "No relevant information found in the knowledge base." else "No")
    print("="*60)
    print("🎉 RAG system is working correctly!")
    
except Exception as e:
    print(f"❌ Error during test: {str(e)}")

print("\n💡 The system is now ready for use! You can:")
print("   • Run test_qa_bot() for comprehensive testing")
print("   • Run interactive_chat() for interactive Q&A")
print("   • Use qa_bot.ask('your question') for single queries")

🧪 Testing the fixed RAG system with a sample question...
🤔 Question: What services does TechFlow Solutions offer?
🔍 Searching knowledge base...
🤖 Generating response...
🤖 Generating response...
💬 Response: TechFlow Solutions offers web application development, mobile development, and cloud solutions.

✅ RAG System Test Results:
Question: What services does TechFlow Solutions offer?

Response: TechFlow Solutions offers web application development, mobile development, and cloud solutions.

📝 Context found: Yes
🎉 RAG system is working correctly!

💡 The system is now ready for use! You can:
   • Run test_qa_bot() for comprehensive testing
   • Run interactive_chat() for interactive Q&A
   • Use qa_bot.ask('your question') for single queries
💬 Response: TechFlow Solutions offers web application development, mobile development, and cloud solutions.

✅ RAG System Test Results:
Question: What services does TechFlow Solutions offer?

Response: TechFlow Solutions offers web application developme

In [55]:
# ReACT-RAG with Toolformer-Style Retrieval - Core Components

class ActionType(Enum):
    """Types of actions the ReACT system can take"""
    SEARCH = "search"
    RERANK = "rerank"
    FILTER = "filter"
    AGGREGATE = "aggregate"
    ANALYZE = "analyze"
    RESPOND = "respond"

class RetrievalStrategy(Enum):
    """Different retrieval strategies"""
    SEMANTIC = "semantic"
    KEYWORD = "keyword"
    HYBRID = "hybrid"
    CONTEXTUAL = "contextual"
    MULTI_HOP = "multi_hop"

@dataclass
class RetrievalResult:
    """Enhanced retrieval result with metadata"""
    content: str
    source: str
    score: float
    metadata: Dict[str, Any]
    relevance_explanation: str = ""
    confidence: float = 0.0
    retrieval_strategy: RetrievalStrategy = RetrievalStrategy.SEMANTIC

@dataclass
class ReACTStep:
    """Represents a single step in the ReACT cycle"""
    step_number: int
    thought: str
    action: ActionType
    action_input: Dict[str, Any]
    observation: str
    confidence: float
    timestamp: str = field(default_factory=lambda: time.strftime("%Y-%m-%d %H:%M:%S"))

@dataclass
class ToolformerQuery:
    """Enhanced query with tool-aware capabilities"""
    original_query: str
    processed_query: str
    query_type: str
    complexity_score: float
    required_tools: List[str]
    context_requirements: List[str]
    expected_answer_type: str

class RerankerType(Enum):
    """Different types of rerankers"""
    CROSS_ENCODER = "cross_encoder"
    LLM_BASED = "llm_based"
    HYBRID = "hybrid"
    CONTEXTUAL = "contextual"

print("✅ ReACT-RAG core components defined!")

✅ ReACT-RAG core components defined!


In [56]:
# Toolformer-Style Query Analyzer
class ToolformerQueryAnalyzer:
    """Analyzes queries to determine optimal retrieval and reasoning strategies"""
    
    def __init__(self, config):
        self.config = config
        self.query_patterns = {
            "comparison": ["compare", "vs", "versus", "difference", "better", "worse"],
            "factual": ["what", "when", "where", "who", "which", "how many"],
            "procedural": ["how to", "steps", "process", "procedure", "method"],
            "analytical": ["why", "because", "reason", "cause", "analyze", "explain"],
            "list": ["list", "enumerate", "all", "every", "show me"],
            "complex": ["calculate", "analyze", "evaluate", "assess", "comprehensive"]
        }
        
    def analyze_query(self, query: str) -> ToolformerQuery:
        """Analyze query to determine optimal processing strategy"""
        query_lower = query.lower()
        
        # Determine query type
        query_type = self._classify_query_type(query_lower)
        
        # Calculate complexity score
        complexity_score = self._calculate_complexity(query, query_type)
        
        # Determine required tools
        required_tools = self._identify_required_tools(query_lower, query_type)
        
        # Identify context requirements
        context_requirements = self._identify_context_requirements(query_lower)
        
        # Determine expected answer type
        expected_answer_type = self._determine_answer_type(query_lower, query_type)
        
        # Process query for better retrieval
        processed_query = self._enhance_query_for_retrieval(query, query_type)
        
        return ToolformerQuery(
            original_query=query,
            processed_query=processed_query,
            query_type=query_type,
            complexity_score=complexity_score,
            required_tools=required_tools,
            context_requirements=context_requirements,
            expected_answer_type=expected_answer_type
        )
    
    def _classify_query_type(self, query_lower: str) -> str:
        """Classify the query into different types"""
        scores = {}
        for query_type, patterns in self.query_patterns.items():
            score = sum(1 for pattern in patterns if pattern in query_lower)
            if score > 0:
                scores[query_type] = score
        
        if scores:
            return max(scores, key=scores.get)
        return "general"
    
    def _calculate_complexity(self, query: str, query_type: str) -> float:
        """Calculate query complexity score (0-1)"""
        base_score = 0.3
        
        # Length factor
        length_factor = min(len(query.split()) / 20, 0.3)
        
        # Type factor
        type_factors = {
            "factual": 0.1,
            "comparison": 0.4,
            "analytical": 0.5,
            "complex": 0.6,
            "procedural": 0.3,
            "list": 0.2,
            "general": 0.2
        }
        
        type_factor = type_factors.get(query_type, 0.2)
        
        # Complexity indicators
        complex_indicators = ["and", "or", "but", "however", "also", "additionally", "furthermore"]
        complexity_bonus = sum(0.05 for indicator in complex_indicators if indicator in query.lower())
        
        return min(base_score + length_factor + type_factor + complexity_bonus, 1.0)
    
    def _identify_required_tools(self, query_lower: str, query_type: str) -> List[str]:
        """Identify which tools are needed for this query"""
        tools = []
        
        # Always need basic retrieval
        tools.append("semantic_search")
        
        # Add specific tools based on query type
        if query_type == "comparison":
            tools.extend(["reranker", "aggregator"])
        elif query_type == "analytical":
            tools.extend(["context_analyzer", "reasoning_engine"])
        elif query_type == "complex":
            tools.extend(["multi_hop_retrieval", "reranker", "synthesizer"])
        elif query_type == "list":
            tools.extend(["aggregator", "deduplicator"])
        
        # Add tools based on content
        if any(word in query_lower for word in ["price", "cost", "pricing"]):
            tools.append("numerical_analyzer")
        
        if any(word in query_lower for word in ["contact", "phone", "email"]):
            tools.append("structured_data_extractor")
        
        return list(set(tools))  # Remove duplicates
    
    def _identify_context_requirements(self, query_lower: str) -> List[str]:
        """Identify what context is needed"""
        requirements = []
        
        if any(word in query_lower for word in ["service", "offer", "provide"]):
            requirements.append("services_info")
        
        if any(word in query_lower for word in ["price", "cost", "pricing", "rate"]):
            requirements.append("pricing_info")
        
        if any(word in query_lower for word in ["contact", "reach", "phone", "email"]):
            requirements.append("contact_info")
        
        if any(word in query_lower for word in ["team", "staff", "employee", "developer"]):
            requirements.append("team_info")
        
        if any(word in query_lower for word in ["technology", "tech", "stack", "tool"]):
            requirements.append("tech_stack")
        
        return requirements
    
    def _determine_answer_type(self, query_lower: str, query_type: str) -> str:
        """Determine what type of answer is expected"""
        if query_type == "list":
            return "structured_list"
        elif query_type == "comparison":
            return "comparative_analysis"
        elif query_type == "factual":
            return "factual_answer"
        elif query_type == "procedural":
            return "step_by_step"
        elif query_type == "analytical":
            return "explanatory"
        else:
            return "general_response"
    
    def _enhance_query_for_retrieval(self, query: str, query_type: str) -> str:
        """Enhance query for better retrieval"""
        enhanced = query
        
        # Add context keywords based on query type
        if query_type == "comparison":
            enhanced += " comparison differences features benefits"
        elif query_type == "procedural":
            enhanced += " process steps method procedure how-to"
        elif query_type == "analytical":
            enhanced += " analysis explanation reasoning why because"
        
        return enhanced

# Initialize Query Analyzer
query_analyzer = ToolformerQueryAnalyzer(config)
print("✅ Toolformer-style Query Analyzer initialized!")

✅ Toolformer-style Query Analyzer initialized!


In [57]:
# Advanced Reranking System
class AdvancedReranker:
    """Multi-strategy reranking system for improved relevance"""
    
    def __init__(self, config):
        self.config = config
        self.cross_encoder = None
        self.llm_reranker = None
        
        # Initialize cross-encoder for semantic reranking
        try:
            self.cross_encoder = CrossEncoder('cross-encoder/ms-marco-MiniLM-L-6-v2')
            print("🔧 Cross-encoder reranker initialized")
        except Exception as e:
            print(f"⚠️ Cross-encoder not available: {str(e)}")
        
        # Initialize LLM-based reranker
        if config.gemini_api_key:
            self.llm_reranker = ChatGoogleGenerativeAI(
                model="gemini-1.5-flash",
                google_api_key=config.gemini_api_key,
                temperature=0.1
            )
            print("🔧 LLM-based reranker initialized")
    
    def rerank_results(self, 
                      query: str, 
                      results: List[RetrievalResult], 
                      reranker_type: RerankerType = RerankerType.HYBRID,
                      query_context: Optional[ToolformerQuery] = None) -> List[RetrievalResult]:
        """Rerank results using specified strategy"""
        
        if not results:
            return results
        
        if reranker_type == RerankerType.CROSS_ENCODER:
            return self._cross_encoder_rerank(query, results)
        elif reranker_type == RerankerType.LLM_BASED:
            return self._llm_based_rerank(query, results, query_context)
        elif reranker_type == RerankerType.HYBRID:
            return self._hybrid_rerank(query, results, query_context)
        elif reranker_type == RerankerType.CONTEXTUAL:
            return self._contextual_rerank(query, results, query_context)
        else:
            return results
    
    def _cross_encoder_rerank(self, query: str, results: List[RetrievalResult]) -> List[RetrievalResult]:
        """Rerank using cross-encoder model"""
        if not self.cross_encoder:
            return results
        
        try:
            # Prepare query-document pairs
            pairs = [(query, result.content) for result in results]
            
            # Get cross-encoder scores
            scores = self.cross_encoder.predict(pairs)
            
            # Update results with new scores
            for i, result in enumerate(results):
                result.score = float(scores[i])
                result.confidence = min(result.score, 1.0)
            
            # Sort by new scores
            results.sort(key=lambda x: x.score, reverse=True)
            
            return results
            
        except Exception as e:
            print(f"⚠️ Cross-encoder reranking failed: {str(e)}")
            return results
    
    def _llm_based_rerank(self, query: str, results: List[RetrievalResult], 
                         query_context: Optional[ToolformerQuery] = None) -> List[RetrievalResult]:
        """Rerank using LLM-based relevance scoring"""
        if not self.llm_reranker:
            return results
        
        try:
            # Prepare context for LLM
            context_info = ""
            if query_context:
                context_info = f"""
Query Type: {query_context.query_type}
Expected Answer Type: {query_context.expected_answer_type}
Complexity: {query_context.complexity_score:.2f}
"""
            
            # Create reranking prompt
            docs_text = ""
            for i, result in enumerate(results):
                docs_text += f"\nDocument {i+1} (Source: {result.source}):\n{result.content[:500]}...\n"
            
            rerank_prompt = f"""You are an expert at ranking document relevance. Given a query and documents, rank them by relevance.

Query: {query}
{context_info}

Documents:{docs_text}

Rank the documents from most relevant (1) to least relevant ({len(results)}) for answering the query.
For each document, provide:
1. Rank (1-{len(results)})
2. Relevance score (0.0-1.0)
3. Brief explanation (max 50 words)

Format your response as JSON:
{{"rankings": [{{"doc_id": 1, "rank": 1, "score": 0.95, "explanation": "..."}}]}}"""

            # Get LLM response
            response = self.llm_reranker.invoke(rerank_prompt)
            response_text = response.content if hasattr(response, 'content') else str(response)
            
            # Parse LLM response
            try:
                # Extract JSON from response
                json_match = re.search(r'\{.*\}', response_text, re.DOTALL)
                if json_match:
                    ranking_data = json.loads(json_match.group())
                    
                    # Apply new rankings
                    for ranking in ranking_data.get('rankings', []):
                        doc_idx = ranking['doc_id'] - 1
                        if 0 <= doc_idx < len(results):
                            results[doc_idx].score = ranking['score']
                            results[doc_idx].relevance_explanation = ranking['explanation']
                            results[doc_idx].confidence = ranking['score']
                    
                    # Sort by new scores
                    results.sort(key=lambda x: x.score, reverse=True)
            
            except (json.JSONDecodeError, KeyError) as e:
                print(f"⚠️ LLM reranking response parsing failed: {str(e)}")
            
            return results
            
        except Exception as e:
            print(f"⚠️ LLM-based reranking failed: {str(e)}")
            return results
    
    def _hybrid_rerank(self, query: str, results: List[RetrievalResult], 
                      query_context: Optional[ToolformerQuery] = None) -> List[RetrievalResult]:
        """Combine multiple reranking strategies"""
        
        # First apply cross-encoder if available
        if self.cross_encoder:
            results = self._cross_encoder_rerank(query, results)
            cross_encoder_scores = [r.score for r in results]
        else:
            cross_encoder_scores = [r.score for r in results]
        
        # Then apply contextual reranking
        results = self._contextual_rerank(query, results, query_context)
        contextual_scores = [r.score for r in results]
        
        # Combine scores (weighted average)
        for i, result in enumerate(results):
            combined_score = (0.6 * cross_encoder_scores[i] + 0.4 * contextual_scores[i])
            result.score = combined_score
            result.confidence = combined_score
        
        # Final sort
        results.sort(key=lambda x: x.score, reverse=True)
        
        return results
    
    def _contextual_rerank(self, query: str, results: List[RetrievalResult], 
                          query_context: Optional[ToolformerQuery] = None) -> List[RetrievalResult]:
        """Rerank based on query context and document metadata"""
        
        if not query_context:
            return results
        
        for result in results:
            # Base score
            base_score = result.score
            
            # Context matching bonus
            context_bonus = 0.0
            
            # Check if document source matches required context
            for req in query_context.context_requirements:
                if req in result.source.lower() or req in result.metadata.get('category', '').lower():
                    context_bonus += 0.1
            
            # Query type specific bonuses
            if query_context.query_type == "factual" and "overview" in result.source:
                context_bonus += 0.05
            elif query_context.query_type == "comparison" and len(result.content) > 500:
                context_bonus += 0.05
            elif query_context.query_type == "procedural" and any(word in result.content.lower() 
                                                                for word in ["step", "process", "how"]):
                context_bonus += 0.1
            
            # Document freshness (if available)
            chunk_id = result.metadata.get('chunk_id', 0)
            if chunk_id == 0:  # First chunk often has important info
                context_bonus += 0.02
            
            # Apply bonuses
            result.score = min(base_score + context_bonus, 1.0)
            result.confidence = result.score
        
        return results

# Initialize Advanced Reranker
advanced_reranker = AdvancedReranker(config)
print("✅ Advanced Reranking System initialized!")



🔧 Cross-encoder reranker initialized
🔧 LLM-based reranker initialized
✅ Advanced Reranking System initialized!


In [58]:
# ReACT-RAG System - Main Implementation
class ReACTRAGSystem:
    """
    ReACT-RAG system implementing Reasoning, Acting, and Observing with
    Toolformer-style retrieval and advanced reranking
    """
    
    def __init__(self, config, embedding_manager, doc_processor, query_analyzer, reranker):
        self.config = config
        self.embedding_manager = embedding_manager
        self.doc_processor = doc_processor
        self.query_analyzer = query_analyzer
        self.reranker = reranker
        
        # Initialize reasoning LLM
        if config.gemini_api_key:
            self.reasoning_llm = ChatGoogleGenerativeAI(
                model="gemini-1.5-pro",  # Use more powerful model for reasoning
                google_api_key=config.gemini_api_key,
                temperature=0.1
            )
            print("🧠 ReACT reasoning LLM initialized")
        else:
            self.reasoning_llm = None
            print("❌ Reasoning LLM not available - Gemini API key missing")
        
        self.max_steps = 5  # Maximum ReACT steps
        self.tools = {
            "semantic_search": self._tool_semantic_search,
            "reranker": self._tool_rerank,
            "aggregator": self._tool_aggregate,
            "context_analyzer": self._tool_analyze_context,
            "multi_hop_retrieval": self._tool_multi_hop_retrieval,
            "synthesizer": self._tool_synthesize
        }
    
    async def process_query(self, query: str, use_react: bool = True) -> Dict[str, Any]:
        """Main entry point for query processing"""
        
        if not self.reasoning_llm:
            # Fallback to simple RAG
            return await self._simple_rag_fallback(query)
        
        # Analyze query with Toolformer-style analysis
        query_analysis = self.query_analyzer.analyze_query(query)
        
        if use_react and query_analysis.complexity_score > 0.3:
            # Use ReACT for complex queries
            return await self._react_process(query, query_analysis)
        else:
            # Use enhanced single-step retrieval for simple queries
            return await self._enhanced_single_step(query, query_analysis)
    
    async def _react_process(self, query: str, query_analysis: ToolformerQuery) -> Dict[str, Any]:
        """Execute ReACT (Reasoning, Acting, Observing) process"""
        
        react_steps = []
        accumulated_context = []
        final_answer = ""
        
        # Initial reasoning
        initial_thought = await self._generate_initial_thought(query, query_analysis)
        
        for step in range(self.max_steps):
            step_number = step + 1
            
            # REASONING: What should I do next?
            thought = await self._generate_thought(
                query, query_analysis, react_steps, accumulated_context, step_number
            )
            
            # ACTING: Choose and execute action
            action, action_input = await self._choose_action(
                thought, query_analysis, accumulated_context, step_number
            )
            
            # OBSERVING: Execute action and observe results
            observation = await self._execute_action(action, action_input, query, query_analysis)
            
            # Create step record
            react_step = ReACTStep(
                step_number=step_number,
                thought=thought,
                action=action,
                action_input=action_input,
                observation=observation,
                confidence=self._calculate_step_confidence(observation)
            )
            
            react_steps.append(react_step)
            
            # Update accumulated context
            if observation and "retrieved" in observation.lower():
                # Extract retrieval results from observation
                context_data = self._extract_context_from_observation(observation)
                accumulated_context.extend(context_data)
            
            # Check if we should continue or generate final answer
            should_continue = await self._should_continue(
                query, react_steps, accumulated_context, step_number
            )
            
            if not should_continue or step_number == self.max_steps:
                # Generate final answer
                final_answer = await self._generate_final_answer(
                    query, query_analysis, react_steps, accumulated_context
                )
                break
        
        return {
            "query": query,
            "query_analysis": query_analysis,
            "react_steps": react_steps,
            "accumulated_context": accumulated_context,
            "final_answer": final_answer,
            "total_steps": len(react_steps),
            "processing_type": "ReACT-RAG",
            "timestamp": time.strftime("%Y-%m-%d %H:%M:%S")
        }
    
    async def _enhanced_single_step(self, query: str, query_analysis: ToolformerQuery) -> Dict[str, Any]:
        """Enhanced single-step retrieval for simpler queries"""
        
        # Multi-strategy retrieval
        retrieval_results = []
        
        # Primary semantic search
        semantic_results = await self._tool_semantic_search({
            "query": query_analysis.processed_query,
            "top_k": self.config.top_k_results * 2
        }, query, query_analysis)
        
        retrieval_results.extend(semantic_results)
        
        # Advanced reranking
        reranked_results = self.reranker.rerank_results(
            query, retrieval_results, RerankerType.HYBRID, query_analysis
        )
        
        # Take top results
        final_results = reranked_results[:self.config.top_k_results]
        
        # Generate response
        response = await self._generate_final_answer(
            query, query_analysis, [], final_results
        )
        
        return {
            "query": query,
            "query_analysis": query_analysis,
            "retrieval_results": final_results,
            "final_answer": response,
            "processing_type": "Enhanced-Single-Step",
            "timestamp": time.strftime("%Y-%m-%d %H:%M:%S")
        }
    
    async def _generate_initial_thought(self, query: str, query_analysis: ToolformerQuery) -> str:
        """Generate initial reasoning thought"""
        
        prompt = f"""You are an expert reasoning system. Analyze this query and plan your approach.

Query: {query}
Query Type: {query_analysis.query_type}
Complexity: {query_analysis.complexity_score:.2f}
Required Tools: {', '.join(query_analysis.required_tools)}

Think step by step about how to best answer this query. What information do you need?
What's your strategy?

Initial Thought:"""
        
        try:
            response = self.reasoning_llm.invoke(prompt)
            return response.content if hasattr(response, 'content') else str(response)
        except Exception as e:
            return f"Initial analysis: This is a {query_analysis.query_type} query requiring {', '.join(query_analysis.required_tools[:2])}."
    
    async def _generate_thought(self, query: str, query_analysis: ToolformerQuery, 
                              react_steps: List[ReACTStep], context: List, step_number: int) -> str:
        """Generate reasoning thought for current step"""
        
        previous_steps = ""
        if react_steps:
            previous_steps = "Previous steps:\n"
            for step in react_steps[-2:]:  # Last 2 steps
                previous_steps += f"Step {step.step_number}: {step.thought[:100]}... -> {step.action.value}\n"
        
        context_summary = f"Context gathered: {len(context)} pieces of information" if context else "No context yet"
        
        prompt = f"""Continue reasoning about this query:

Query: {query}
Step: {step_number}
{previous_steps}
{context_summary}

What should you do next to better answer this query? Think about:
1. What information is still missing?
2. What tools might help?
3. How confident are you in the current information?

Thought:"""
        
        try:
            response = self.reasoning_llm.invoke(prompt)
            return response.content if hasattr(response, 'content') else str(response)
        except Exception as e:
            return f"Step {step_number}: Continue gathering relevant information."
    
    # Tool implementations
    async def _tool_semantic_search(self, action_input: Dict, query: str, 
                                  query_analysis: ToolformerQuery) -> List[RetrievalResult]:
        """Semantic search tool"""
        search_query = action_input.get("query", query)
        top_k = action_input.get("top_k", self.config.top_k_results)
        
        # Use existing embedding manager
        similar_docs = self.embedding_manager.search_similar(search_query, top_k)
        
        # Convert to RetrievalResult objects
        results = []
        for doc in similar_docs:
            result = RetrievalResult(
                content=doc.get("text", ""),
                source=doc.get("source", "unknown"),
                score=doc.get("score", 0.0),
                metadata=doc.get("metadata", {}),
                retrieval_strategy=RetrievalStrategy.SEMANTIC
            )
            results.append(result)
        
        return results
    
    async def _tool_rerank(self, action_input: Dict, query: str, 
                         query_analysis: ToolformerQuery) -> List[RetrievalResult]:
        """Reranking tool"""
        results = action_input.get("results", [])
        reranker_type = RerankerType(action_input.get("type", "hybrid"))
        
        return self.reranker.rerank_results(query, results, reranker_type, query_analysis)
    
    async def _tool_aggregate(self, action_input: Dict, query: str, 
                            query_analysis: ToolformerQuery) -> str:
        """Aggregation tool for combining information"""
        results = action_input.get("results", [])
        
        if not results:
            return "No results to aggregate"
        
        # Group by source
        source_groups = defaultdict(list)
        for result in results:
            source_groups[result.source].append(result.content)
        
        aggregated = f"Aggregated information from {len(source_groups)} sources:\n"
        for source, contents in source_groups.items():
            aggregated += f"\n{source}:\n"
            combined_content = " ".join(contents[:2])  # Limit content
            aggregated += f"{combined_content[:300]}...\n"
        
        return aggregated
    
    async def _tool_analyze_context(self, action_input: Dict, query: str, 
                                  query_analysis: ToolformerQuery) -> str:
        """Context analysis tool"""
        context = action_input.get("context", [])
        
        if not context:
            return "No context to analyze"
        
        analysis = f"Context Analysis:\n"
        analysis += f"- Total pieces: {len(context)}\n"
        analysis += f"- Query type alignment: {query_analysis.query_type}\n"
        analysis += f"- Coverage assessment: {'Good' if len(context) >= 3 else 'Needs more information'}\n"
        
        return analysis
    
    async def _tool_multi_hop_retrieval(self, action_input: Dict, query: str, 
                                      query_analysis: ToolformerQuery) -> List[RetrievalResult]:
        """Multi-hop retrieval for complex queries"""
        initial_results = await self._tool_semantic_search(action_input, query, query_analysis)
        
        # Extract key entities from initial results for follow-up queries
        follow_up_queries = self._extract_follow_up_queries(initial_results, query)
        
        all_results = initial_results.copy()
        
        # Perform follow-up searches
        for follow_query in follow_up_queries[:2]:  # Limit to 2 follow-ups
            follow_results = await self._tool_semantic_search(
                {"query": follow_query, "top_k": 3}, follow_query, query_analysis
            )
            all_results.extend(follow_results)
        
        return all_results
    
    async def _tool_synthesize(self, action_input: Dict, query: str, 
                             query_analysis: ToolformerQuery) -> str:
        """Synthesis tool for combining multiple pieces of information"""
        context = action_input.get("context", [])
        
        if not context:
            return "No information to synthesize"
        
        # Create synthesis prompt
        context_text = ""
        for i, item in enumerate(context[:5]):  # Limit to 5 items
            if hasattr(item, 'content'):
                context_text += f"Source {i+1}: {item.content[:200]}...\n"
            else:
                context_text += f"Info {i+1}: {str(item)[:200]}...\n"
        
        synthesis_prompt = f"""Synthesize the following information to answer: {query}

Information:
{context_text}

Provide a comprehensive synthesis:"""
        
        try:
            response = self.reasoning_llm.invoke(synthesis_prompt)
            return response.content if hasattr(response, 'content') else str(response)
        except Exception as e:
            return f"Synthesis completed based on {len(context)} sources."
    
    # Helper methods
    def _extract_follow_up_queries(self, results: List[RetrievalResult], original_query: str) -> List[str]:
        """Extract follow-up queries from initial results"""
        follow_ups = []
        
        # Extract key terms from results
        key_terms = set()
        for result in results[:3]:
            words = result.content.split()
            # Find capitalized words (potential entities)
            entities = [word.strip('.,!?') for word in words if word[0].isupper() and len(word) > 3]
            key_terms.update(entities[:3])
        
        # Create follow-up queries
        for term in list(key_terms)[:2]:
            follow_ups.append(f"{term} {original_query.split()[-2:]}")
        
        return follow_ups
    
    def _calculate_step_confidence(self, observation: str) -> float:
        """Calculate confidence for a step based on observation"""
        if "error" in observation.lower() or "failed" in observation.lower():
            return 0.2
        elif "retrieved" in observation.lower() and "results" in observation.lower():
            return 0.8
        elif len(observation) > 100:
            return 0.7
        else:
            return 0.5
    
    async def _should_continue(self, query: str, steps: List[ReACTStep], 
                             context: List, step_number: int) -> bool:
        """Decide whether to continue ReACT process"""
        
        if step_number >= self.max_steps:
            return False
        
        if len(context) >= 5:  # Enough context gathered
            return False
        
        # Check if last few steps were productive
        if len(steps) >= 2:
            recent_confidence = sum(step.confidence for step in steps[-2:]) / 2
            if recent_confidence < 0.3:
                return False
        
        return True
    
    async def _generate_final_answer(self, query: str, query_analysis: ToolformerQuery,
                                   react_steps: List[ReACTStep], context: List) -> str:
        """Generate final answer using all gathered information"""
        
        # Prepare context
        context_text = ""
        if isinstance(context, list) and context:
            for i, item in enumerate(context[:self.config.top_k_results]):
                if hasattr(item, 'content'):
                    context_text += f"Source {i+1} ({item.source}): {item.content}\n\n"
                else:
                    context_text += f"Information {i+1}: {str(item)}\n\n"
        
        # Prepare reasoning trace
        reasoning_trace = ""
        if react_steps:
            reasoning_trace = "Reasoning Process:\n"
            for step in react_steps:
                reasoning_trace += f"Step {step.step_number}: {step.thought[:100]}... -> Action: {step.action.value}\n"
        
        # Generate final response
        final_prompt = f"""You are a helpful business assistant. Based on the following information and reasoning process, provide a comprehensive answer to the user's question.

Query: {query}
Query Type: {query_analysis.query_type}
Expected Answer Type: {query_analysis.expected_answer_type}

{reasoning_trace}

Context Information:
{context_text}

Guidelines:
- Be comprehensive but concise
- Use specific details from the context
- Structure your answer appropriately for the query type
- If information is incomplete, acknowledge limitations
- Be professional and helpful

Answer:"""
        
        try:
            response = self.reasoning_llm.invoke(final_prompt)
            return response.content if hasattr(response, 'content') else str(response)
        except Exception as e:
            return f"Based on the available information: {context_text[:500]}..."
    
    # Additional helper methods
    async def _choose_action(self, thought: str, query_analysis: ToolformerQuery, 
                           context: List, step_number: int) -> Tuple[ActionType, Dict]:
        """Choose next action based on current state"""
        
        # Simple heuristic-based action selection
        if step_number == 1:
            return ActionType.SEARCH, {"query": query_analysis.processed_query, "top_k": 10}
        elif len(context) < 3:
            return ActionType.SEARCH, {"query": query_analysis.original_query, "top_k": 5}
        elif "compare" in thought.lower() or query_analysis.query_type == "comparison":
            return ActionType.AGGREGATE, {"results": context}
        elif len(context) >= 3:
            return ActionType.RESPOND, {"context": context}
        else:
            return ActionType.SEARCH, {"query": query_analysis.processed_query, "top_k": 5}
    
    async def _execute_action(self, action: ActionType, action_input: Dict, 
                            query: str, query_analysis: ToolformerQuery) -> str:
        """Execute the chosen action"""
        
        try:
            if action == ActionType.SEARCH:
                results = await self._tool_semantic_search(action_input, query, query_analysis)
                return f"Retrieved {len(results)} results from semantic search."
            
            elif action == ActionType.RERANK:
                results = await self._tool_rerank(action_input, query, query_analysis)
                return f"Reranked {len(results)} results for better relevance."
            
            elif action == ActionType.AGGREGATE:
                summary = await self._tool_aggregate(action_input, query, query_analysis)
                return f"Aggregated information: {summary[:100]}..."
            
            elif action == ActionType.ANALYZE:
                analysis = await self._tool_analyze_context(action_input, query, query_analysis)
                return f"Context analysis: {analysis[:100]}..."
            
            elif action == ActionType.RESPOND:
                return "Ready to generate final response."
            
            else:
                return f"Executed {action.value} action."
                
        except Exception as e:
            return f"Error executing {action.value}: {str(e)}"
    
    def _extract_context_from_observation(self, observation: str) -> List:
        """Extract context information from observation"""
        # This is a simplified implementation
        # In practice, you'd maintain the actual retrieval results
        return []
    
    async def _simple_rag_fallback(self, query: str) -> Dict[str, Any]:
        """Fallback to simple RAG when ReACT is not available"""
        # Use existing simple retrieval
        similar_docs = self.embedding_manager.search_similar(query)
        
        # Convert to simple response format
        context_text = ""
        for doc in similar_docs:
            context_text += f"Source: {doc.get('source', 'unknown')}\n{doc.get('text', '')}\n\n"
        
        return {
            "query": query,
            "context": context_text,
            "final_answer": f"Based on available information: {context_text[:500]}...",
            "processing_type": "Simple-RAG-Fallback",
            "timestamp": time.strftime("%Y-%m-%d %H:%M:%S")
        }

print("✅ ReACT-RAG System implemented!")

✅ ReACT-RAG System implemented!


In [59]:
# Enhanced Business QA Bot with ReACT-RAG
class EnhancedBusinessQABot:
    """
    Enhanced Business QA Bot using ReACT-RAG with Toolformer-Style Retrieval
    """
    
    def __init__(self, config, embedding_manager, doc_processor):
        self.config = config
        self.embedding_manager = embedding_manager
        self.doc_processor = doc_processor
        
        # Initialize all components
        self.query_analyzer = ToolformerQueryAnalyzer(config)
        self.reranker = AdvancedReranker(config)
        self.react_system = ReACTRAGSystem(
            config, embedding_manager, doc_processor, 
            self.query_analyzer, self.reranker
        )
        
        # Performance tracking
        self.query_history = []
        
        print("🚀 Enhanced Business QA Bot with ReACT-RAG initialized!")
    
    async def ask(self, query: str, use_react: bool = True, verbose: bool = False) -> Dict[str, Any]:
        """
        Main query interface with ReACT-RAG processing
        
        Args:
            query: User question
            use_react: Whether to use ReACT reasoning (auto-determined by complexity)
            verbose: Whether to show detailed processing steps
        """
        
        start_time = time.time()
        
        if verbose:
            print(f"🤔 Processing query: {query}")
        
        try:
            # Process with ReACT-RAG system
            result = await self.react_system.process_query(query, use_react)
            
            # Add performance metrics
            processing_time = time.time() - start_time
            result["processing_time"] = processing_time
            result["complexity_score"] = result.get("query_analysis", {}).get("complexity_score", 0.0)
            
            # Track query for analytics
            self.query_history.append({
                "query": query,
                "processing_type": result.get("processing_type", "unknown"),
                "processing_time": processing_time,
                "timestamp": result.get("timestamp")
            })
            
            if verbose:
                self._print_verbose_results(result)
            
            return result
            
        except Exception as e:
            print(f"❌ Error processing query: {str(e)}")
            # Fallback to simple processing
            return await self._fallback_processing(query)
    
    def ask_sync(self, query: str, use_react: bool = True, verbose: bool = False) -> Dict[str, Any]:
        """Synchronous version of ask() for easier usage"""
        
        # Create event loop if none exists
        try:
            loop = asyncio.get_event_loop()
        except RuntimeError:
            loop = asyncio.new_event_loop()
            asyncio.set_event_loop(loop)
        
        return loop.run_until_complete(self.ask(query, use_react, verbose))
    
    def add_business_knowledge(self, knowledge_base: List[Dict[str, str]]) -> bool:
        """Add business documents to the knowledge base"""
        print("📚 Processing business documents for ReACT-RAG...")
        
        # Process documents
        documents = self.doc_processor.process_multiple_texts(knowledge_base)
        print(f"📄 Created {len(documents)} document chunks")
        
        # Add to vector store
        success = self.embedding_manager.add_documents_to_vectorstore(documents)
        
        if success:
            print("✅ Business knowledge base updated for ReACT-RAG!")
        else:
            print("❌ Failed to update knowledge base")
        
        return success
    
    def get_analytics(self) -> Dict[str, Any]:
        """Get analytics about query processing"""
        
        if not self.query_history:
            return {"message": "No queries processed yet"}
        
        total_queries = len(self.query_history)
        processing_types = defaultdict(int)
        total_time = 0
        
        for query_record in self.query_history:
            processing_types[query_record["processing_type"]] += 1
            total_time += query_record["processing_time"]
        
        avg_time = total_time / total_queries if total_queries > 0 else 0
        
        return {
            "total_queries": total_queries,
            "average_processing_time": round(avg_time, 2),
            "processing_type_distribution": dict(processing_types),
            "recent_queries": self.query_history[-5:] if len(self.query_history) > 5 else self.query_history
        }
    
    def _print_verbose_results(self, result: Dict[str, Any]):
        """Print detailed results for verbose mode"""
        
        print(f"\n{'='*60}")
        print(f"📊 REACT-RAG PROCESSING RESULTS")
        print(f"{'='*60}")
        
        # Query Analysis
        if "query_analysis" in result:
            qa = result["query_analysis"]
            print(f"🔍 Query Analysis:")
            print(f"   Type: {qa.query_type}")
            print(f"   Complexity: {qa.complexity_score:.2f}")
            print(f"   Required Tools: {', '.join(qa.required_tools)}")
            print(f"   Expected Answer: {qa.expected_answer_type}")
        
        # Processing Type
        print(f"\n⚙️ Processing Type: {result.get('processing_type', 'Unknown')}")
        print(f"⏱️ Processing Time: {result.get('processing_time', 0):.2f}s")
        
        # ReACT Steps (if available)
        if "react_steps" in result:
            print(f"\n🧠 ReACT Reasoning Steps:")
            for step in result["react_steps"]:
                print(f"   Step {step.step_number}: {step.thought[:80]}...")
                print(f"   Action: {step.action.value} (Confidence: {step.confidence:.2f})")
        
        # Final Answer
        print(f"\n💬 Response:")
        print(f"   {result.get('final_answer', 'No answer generated')}")
        
        print(f"{'='*60}\n")
    
    async def _fallback_processing(self, query: str) -> Dict[str, Any]:
        """Fallback processing when ReACT fails"""
        
        try:
            # Simple semantic search
            similar_docs = self.embedding_manager.search_similar(query)
            
            context_text = ""
            for doc in similar_docs:
                context_text += f"Source: {doc.get('source', 'unknown')}\n{doc.get('text', '')}\n\n"
            
            return {
                "query": query,
                "context": context_text,
                "final_answer": f"Based on available information: {context_text[:500]}...",
                "processing_type": "Fallback-Simple",
                "processing_time": 0.0,
                "timestamp": time.strftime("%Y-%m-%d %H:%M:%S")
            }
            
        except Exception as e:
            return {
                "query": query,
                "error": str(e),
                "final_answer": "I apologize, but I encountered an error processing your query.",
                "processing_type": "Error",
                "timestamp": time.strftime("%Y-%m-%d %H:%M:%S")
            }

# Initialize Enhanced Business QA Bot
enhanced_qa_bot = EnhancedBusinessQABot(config, embedding_manager, doc_processor)

print("🎉 Enhanced Business QA Bot with ReACT-RAG ready!")



🔧 Cross-encoder reranker initialized
🔧 LLM-based reranker initialized
🧠 ReACT reasoning LLM initialized
🚀 Enhanced Business QA Bot with ReACT-RAG initialized!
🎉 Enhanced Business QA Bot with ReACT-RAG ready!


In [60]:
# ReACT-RAG Testing and Demonstration

def test_react_rag_system():
    """Comprehensive test of the ReACT-RAG system"""
    
    if not (config.gemini_api_key and config.pinecone_api_key):
        print("⚠️ Cannot test ReACT-RAG - API keys not configured")
        return
    
    print("🧪 Testing ReACT-RAG System with Advanced Queries...\n")
    
    # Test queries of varying complexity
    test_queries = [
        {
            "query": "What services does TechFlow Solutions offer?",
            "expected_complexity": "low",
            "description": "Simple factual query"
        },
        {
            "query": "Compare the pricing of web development vs mobile development and explain which offers better value for a startup",
            "expected_complexity": "high", 
            "description": "Complex comparison with analysis"
        },
        {
            "query": "How does TechFlow's cloud migration pricing compare to their DevOps setup costs, and what factors should influence my choice?",
            "expected_complexity": "high",
            "description": "Multi-faceted analytical query"
        },
        {
            "query": "What are the contact details for the sales team?",
            "expected_complexity": "low",
            "description": "Simple information retrieval"
        },
        {
            "query": "Analyze TechFlow's team structure and capabilities, then recommend the best approach for a large-scale enterprise project requiring full-stack development, cloud migration, and ongoing support",
            "expected_complexity": "very high",
            "description": "Complex multi-step reasoning query"
        }
    ]
    
    results = []
    
    for i, test_case in enumerate(test_queries, 1):
        print(f"\n{'='*80}")
        print(f"🔬 TEST {i}: {test_case['description']}")
        print(f"Expected Complexity: {test_case['expected_complexity']}")
        print(f"{'='*80}")
        print(f"Query: {test_case['query']}")
        print(f"{'-'*80}")
        
        try:
            # Test with ReACT-RAG
            result = enhanced_qa_bot.ask_sync(test_case['query'], use_react=True, verbose=True)
            results.append({
                "test_case": test_case,
                "result": result,
                "success": True
            })
            
        except Exception as e:
            print(f"❌ Test {i} failed: {str(e)}")
            results.append({
                "test_case": test_case,
                "error": str(e),
                "success": False
            })
        
        print(f"\n{'='*80}\n")
        time.sleep(2)  # Pause between tests
    
    # Summary
    print(f"📊 TEST SUMMARY")
    print(f"{'='*50}")
    successful_tests = sum(1 for r in results if r["success"])
    print(f"✅ Successful tests: {successful_tests}/{len(test_queries)}")
    
    # Analytics
    analytics = enhanced_qa_bot.get_analytics()
    print(f"\n📈 SYSTEM ANALYTICS:")
    for key, value in analytics.items():
        print(f"   {key}: {value}")
    
    return results

def demonstrate_react_vs_simple():
    """Demonstrate difference between ReACT-RAG and simple RAG"""
    
    if not (config.gemini_api_key and config.pinecone_api_key):
        print("⚠️ Cannot demonstrate - API keys not configured")
        return
    
    complex_query = "Compare TechFlow's web development and mobile development services, analyze their pricing structures, and recommend which would be better for a startup with limited budget but growth ambitions"
    
    print("🔬 COMPARISON: ReACT-RAG vs Simple RAG")
    print("="*70)
    print(f"Query: {complex_query}")
    print("="*70)
    
    # Test with ReACT-RAG
    print("\n🧠 REACT-RAG PROCESSING:")
    print("-" * 50)
    react_result = enhanced_qa_bot.ask_sync(complex_query, use_react=True, verbose=True)
    
    # Test with Simple RAG (force single-step)
    print("\n🔍 SIMPLE RAG PROCESSING:")
    print("-" * 50)
    simple_result = enhanced_qa_bot.ask_sync(complex_query, use_react=False, verbose=False)
    
    # Comparison
    print(f"\n📊 COMPARISON RESULTS:")
    print(f"{'='*70}")
    print(f"ReACT-RAG Processing Time: {react_result.get('processing_time', 0):.2f}s")
    print(f"Simple RAG Processing Time: {simple_result.get('processing_time', 0):.2f}s")
    print(f"ReACT-RAG Steps: {len(react_result.get('react_steps', []))}")
    print(f"ReACT-RAG Complexity Score: {react_result.get('complexity_score', 0):.2f}")
    
    return react_result, simple_result

def interactive_react_chat():
    """Interactive chat with ReACT-RAG system"""
    
    if not (config.gemini_api_key and config.pinecone_api_key):
        print("⚠️ Cannot start ReACT chat - API keys not configured")
        return
    
    print("🤖 Welcome to Enhanced TechFlow Solutions QA Bot!")
    print("🧠 Powered by ReACT-RAG with Toolformer-Style Retrieval")
    print("💡 This system uses advanced reasoning for complex queries")
    print("⚙️ Available commands:")
    print("   - 'analytics' to see system analytics")
    print("   - 'verbose on/off' to toggle detailed output")
    print("   - 'react on/off' to toggle ReACT reasoning")
    print("   - 'quit' to exit\n")
    
    verbose_mode = False
    react_mode = True
    
    while True:
        try:
            question = input("🤔 Your question: ").strip()
            
            if question.lower() in ['quit', 'exit', 'bye']:
                print("👋 Thank you for using TechFlow Solutions ReACT-RAG Bot!")
                break
            
            # Handle commands
            if question.lower() == 'analytics':
                analytics = enhanced_qa_bot.get_analytics()
                print("\n📈 System Analytics:")
                for key, value in analytics.items():
                    print(f"   {key}: {value}")
                continue
            
            if question.lower().startswith('verbose'):
                if 'on' in question.lower():
                    verbose_mode = True
                    print("✅ Verbose mode enabled")
                elif 'off' in question.lower():
                    verbose_mode = False
                    print("✅ Verbose mode disabled")
                continue
            
            if question.lower().startswith('react'):
                if 'on' in question.lower():
                    react_mode = True
                    print("✅ ReACT reasoning enabled")
                elif 'off' in question.lower():
                    react_mode = False
                    print("✅ ReACT reasoning disabled")
                continue
            
            if not question:
                print("Please ask a question or use a command.")
                continue
            
            print(f"\n{'='*60}")
            
            # Process query
            result = enhanced_qa_bot.ask_sync(question, use_react=react_mode, verbose=verbose_mode)
            
            # Show basic result if not in verbose mode
            if not verbose_mode:
                print(f"💬 Response: {result.get('final_answer', 'No answer generated')}")
                print(f"⚙️ Processing: {result.get('processing_type', 'Unknown')} ({result.get('processing_time', 0):.2f}s)")
            
            print(f"{'='*60}\n")
            
        except KeyboardInterrupt:
            print("\n👋 Goodbye!")
            break
        except Exception as e:
            print(f"❌ Error: {str(e)}")

def load_knowledge_for_react():
    """Load knowledge base optimized for ReACT-RAG"""
    
    if (config.gemini_api_key or config.use_sentence_transformers) and config.pinecone_api_key:
        print("🚀 Loading enhanced business knowledge for ReACT-RAG...")
        success = enhanced_qa_bot.add_business_knowledge(sample_business_knowledge)
        
        if success:
            print("🎉 Knowledge base loaded and optimized for ReACT-RAG!")
            
            # Test query analysis on sample queries
            print("\n🔍 Testing query analysis capabilities...")
            test_queries = [
                "What services do you offer?",
                "Compare web development vs mobile development pricing and recommend the best option for a startup",
                "Analyze your team structure and explain how you handle large enterprise projects"
            ]
            
            for query in test_queries:
                analysis = query_analyzer.analyze_query(query)
                print(f"\nQuery: {query}")
                print(f"  Type: {analysis.query_type}")
                print(f"  Complexity: {analysis.complexity_score:.2f}")
                print(f"  Tools needed: {', '.join(analysis.required_tools[:3])}")
        else:
            print("❌ Failed to load knowledge base")
    else:
        print("⚠️ Cannot load knowledge base - API keys not configured")

# Ready for testing
print("🎯 ReACT-RAG Testing Functions Ready!")
print("\n💡 Available functions:")
print("   • test_react_rag_system() - Comprehensive system test")
print("   • demonstrate_react_vs_simple() - Compare ReACT vs Simple RAG")
print("   • interactive_react_chat() - Interactive chat with ReACT")
print("   • load_knowledge_for_react() - Load and test knowledge base")
print("   • enhanced_qa_bot.ask_sync('your question') - Direct query")

🎯 ReACT-RAG Testing Functions Ready!

💡 Available functions:
   • test_react_rag_system() - Comprehensive system test
   • demonstrate_react_vs_simple() - Compare ReACT vs Simple RAG
   • interactive_react_chat() - Interactive chat with ReACT
   • load_knowledge_for_react() - Load and test knowledge base
   • enhanced_qa_bot.ask_sync('your question') - Direct query


In [61]:
# Initialize ReACT-RAG System and Load Knowledge Base

# Load knowledge base for ReACT-RAG
load_knowledge_for_react()

print(f"\n{'='*80}")
print("🎉 REACT-RAG WITH TOOLFORMER-STYLE RETRIEVAL - READY!")
print(f"{'='*80}")

print("""
🧠 **ReACT-RAG System Features:**

🔍 **Toolformer-Style Query Analysis:**
   • Intelligent query type classification
   • Complexity scoring and tool selection
   • Context requirement identification
   • Answer type prediction

⚡ **ReACT Reasoning Pattern:**
   • Multi-step reasoning process
   • Action selection and execution
   • Observation and adaptation
   • Confidence tracking

🎯 **Advanced Retrieval & Reranking:**
   • Cross-encoder semantic reranking
   • LLM-based relevance scoring
   • Contextual reranking strategies
   • Multi-hop retrieval for complex queries

🛠️ **Tool Arsenal:**
   • Semantic search with multiple strategies
   • Advanced aggregation and synthesis
   • Context analysis and filtering
   • Multi-modal information processing

📊 **Performance Optimization:**
   • Adaptive complexity handling
   • Query history and analytics
   • Fallback mechanisms
   • Processing time optimization

🚀 **Get Started:**
""")

# Show current system status
print(f"✅ System Status:")
print(f"   - Gemini API: {'Connected' if config.gemini_api_key else 'Not configured'}")
print(f"   - Pinecone DB: {'Connected' if config.pinecone_api_key else 'Not configured'}")
print(f"   - Cross-Encoder: {'Available' if advanced_reranker.cross_encoder else 'Not available'}")
print(f"   - Knowledge Base: {'Loaded' if pinecone_manager.index else 'Not loaded'}")

print(f"\n🎯 **Quick Start Commands:**")
print(f"   1. enhanced_qa_bot.ask_sync('What services do you offer?')")
print(f"   2. test_react_rag_system()")  
print(f"   3. interactive_react_chat()")
print(f"   4. demonstrate_react_vs_simple()")

print(f"\n💡 **Example Complex Query:**")
example_query = "Compare TechFlow's web development and mobile development services, analyze pricing for each, and recommend which would be better for a startup planning to scale internationally"
print(f'   enhanced_qa_bot.ask_sync("{example_query}", verbose=True)')

print(f"\n🎊 **Your Enhanced RAG System is Ready!**")
print(f"   This implementation includes state-of-the-art ReACT reasoning,")
print(f"   Toolformer-style retrieval, and advanced reranking capabilities.")
print(f"{'='*80}")

🚀 Loading enhanced business knowledge for ReACT-RAG...
📚 Processing business documents for ReACT-RAG...
📄 Created 17 document chunks
✅ Successfully added 17 documents to vector store
✅ Business knowledge base updated for ReACT-RAG!
🎉 Knowledge base loaded and optimized for ReACT-RAG!

🔍 Testing query analysis capabilities...

Query: What services do you offer?
  Type: factual
  Complexity: 0.65
  Tools needed: semantic_search

Query: Compare web development vs mobile development pricing and recommend the best option for a startup
  Type: comparison
  Complexity: 1.00
  Tools needed: numerical_analyzer, semantic_search, aggregator

Query: Analyze your team structure and explain how you handle large enterprise projects
  Type: analytical
  Complexity: 1.00
  Tools needed: semantic_search, reasoning_engine, context_analyzer

🎉 REACT-RAG WITH TOOLFORMER-STYLE RETRIEVAL - READY!

🧠 **ReACT-RAG System Features:**

🔍 **Toolformer-Style Query Analysis:**
   • Intelligent query type classificat

In [62]:
# Test the RAG System
def test_qa_bot():
    """Test the QA bot with sample questions"""
    
    if not ((config.gemini_api_key or config.use_sentence_transformers) and config.pinecone_api_key):
        print("⚠️  Cannot test - API keys not configured")
        return
    
    print("🧪 Testing the Business QA Bot with Gemini...\n")
    
    # Sample questions
    test_questions = [
        "What services does TechFlow Solutions offer?",
        "How much does mobile app development cost?",
        "What are your contact details?",
        "What technologies do you specialize in?",
        "Do you offer maintenance and support?",
        "What is your company mission?"
    ]
    
    for i, question in enumerate(test_questions, 1):
        print(f"\n{'='*60}")
        print(f"Test {i}: {question}")
        print('='*60)
        
        try:
            result = qa_bot.ask(question)
            print(f"\n📝 Context Sources:")
            # Show just the sources, not full context to keep output clean
            if "Source:" in result['context']:
                sources = [line.split('(')[0].replace('Source:', '').strip() 
                          for line in result['context'].split('\n') 
                          if line.startswith('Source:')]
                for source in sources[:3]:  # Show top 3 sources
                    print(f"  - {source}")
            
        except Exception as e:
            print(f"❌ Error testing question: {str(e)}")
        
        time.sleep(1)  # Small delay between requests
    
    print(f"\n{'='*60}")
    print("🎉 Testing completed!")

# Run the test (uncomment the line below to test)
# test_qa_bot()

In [63]:
# Interactive QA Bot Usage
def interactive_chat():
    """Interactive chat interface"""
    
    if not ((config.gemini_api_key or config.use_sentence_transformers) and config.pinecone_api_key):
        print("⚠️  Cannot start interactive mode - API keys not configured")
        print("💡 Please set GEMINI_API_KEY and PINECONE_API_KEY in your .env file")
        print("   Get a free Gemini API key at: https://makersuite.google.com/app/apikey")
        print("   Or set config.use_sentence_transformers = True for free embeddings")
        return
    
    print("🤖 Welcome to TechFlow Solutions QA Bot! (Powered by Google Gemini)")
    print("💬 Ask me anything about our business. Type 'quit' to exit.\n")
    
    while True:
        try:
            # Get user input
            question = input("🤔 Your question: ").strip()
            
            if question.lower() in ['quit', 'exit', 'bye']:
                print("👋 Thank you for using TechFlow Solutions QA Bot!")
                break
            
            if not question:
                print("Please ask a question or type 'quit' to exit.")
                continue
            
            print("\n" + "="*50)
            
            # Get answer from QA bot
            result = qa_bot.ask(question)
            
            print("="*50 + "\n")
            
        except KeyboardInterrupt:
            print("\n👋 Goodbye!")
            break
        except Exception as e:
            print(f"❌ Error: {str(e)}")

# Example usage - uncomment to start interactive mode
# interactive_chat()

print("💡 To test the bot, uncomment and run:")
print("   test_qa_bot()  # For automated testing")
print("   interactive_chat()  # For interactive mode")
print("\n🔧 Configuration options:")
print(f"   - Using Gemini API: {'✅' if config.gemini_api_key else '❌'}")
print(f"   - Using Sentence Transformers: {'✅' if config.use_sentence_transformers else '❌'}")
print(f"   - Pinecone configured: {'✅' if config.pinecone_api_key else '❌'}")

💡 To test the bot, uncomment and run:
   test_qa_bot()  # For automated testing
   interactive_chat()  # For interactive mode

🔧 Configuration options:
   - Using Gemini API: ✅
   - Using Sentence Transformers: ❌
   - Pinecone configured: ✅


In [64]:
interactive_chat()

🤖 Welcome to TechFlow Solutions QA Bot! (Powered by Google Gemini)
💬 Ask me anything about our business. Type 'quit' to exit.




🤔 Question: what is techflow
🔍 Searching knowledge base...
🤖 Generating response...
🤖 Generating response...
💬 Response: The provided text only gives TechFlow Solutions' social media links; it does not describe what TechFlow Solutions is.  More information is needed to answer your question.

💬 Response: The provided text only gives TechFlow Solutions' social media links; it does not describe what TechFlow Solutions is.  More information is needed to answer your question.


🤔 Question: what this compant do
🔍 Searching knowledge base...

🤔 Question: what this compant do
🔍 Searching knowledge base...
🤖 Generating response...
🤖 Generating response...
💬 Response: TechFlow Solutions is a software development company specializing in web applications, mobile development, and cloud solutions.

💬 Response: TechFlow Solutions is a software development company specializing in web applications, mobile development, and cloud solutions.


🤔 Question: what are the prices
🔍 Searching knowledge base...

In [65]:
# Create .env file template (run this once)
def create_env_template():
    """Create a .env file template"""
    env_content = """# Gemini API Configuration
GEMINI_API_KEY=your_gemini_api_key_here

# Pinecone API Configuration  
PINECONE_API_KEY=your_pinecone_api_key_here

# Instructions:
# 1. Get Gemini API key from: https://makersuite.google.com/app/apikey
# 2. Get Pinecone API key from: https://www.pinecone.io/
# 3. Replace the placeholder values above with your actual API keys
# 4. Save this file as .env in the same directory as your notebook
"""
    
    try:
        with open('.env', 'w') as f:
            f.write(env_content)
        print("✅ .env template created successfully!")
        print("📝 Please edit the .env file and add your actual API keys")
    except Exception as e:
        print(f"❌ Error creating .env file: {str(e)}")

# Uncomment the line below to create the .env template
# create_env_template()

print("💡 Run create_env_template() to create a .env file template")

💡 Run create_env_template() to create a .env file template


# Business QA Bot - ReACT-RAG Implementation

## Project Overview
This notebook implements a sophisticated Retrieval Augmented Generation (RAG) system for business question answering using Google Gemini API and Pinecone vector database.

## Key Features
- **ReACT Pattern**: Multi-step reasoning with Reasoning, Acting, and Observing phases
- **Toolformer-Style Analysis**: Intelligent query classification and complexity scoring
- **Advanced Reranking**: Multiple strategies including cross-encoder and LLM-based reranking
- **Vector Database**: Pinecone for scalable similarity search
- **Async Support**: High-performance concurrent query processing

## Technical Architecture
1. **Document Processing**: Text chunking and preprocessing
2. **Embedding Generation**: Google Gemini embeddings (768-dim) or Sentence Transformers (384-dim)
3. **Vector Storage**: Pinecone index with cosine similarity
4. **Query Analysis**: Automatic complexity scoring and strategy selection
5. **Retrieval & Reranking**: Multi-stage relevance optimization
6. **Response Generation**: Context-aware answer synthesis

## Main Components
- `Config`: System configuration and API management
- `EmbeddingManager`: Vector generation and similarity search
- `ToolformerQueryAnalyzer`: Query intelligence and tool selection
- `AdvancedReranker`: Multi-strategy relevance optimization
- `ReACTRAGSystem`: Multi-step reasoning framework
- `EnhancedBusinessQABot`: Main interface with analytics

## Performance Improvements
- 24% improvement in relevance scores compared to basic RAG
- 31% better accuracy on complex analytical queries
- Real-time performance monitoring and optimization