In [10]:
#%pip install -U langchain langchain-community langchain-openai faiss-cpu jq pypdf unstructured pandas numpy matplotlib seaborn scikit-learn nest-asyncio


In [4]:
# Import necessary libraries
import os
import logging
from typing import List, Dict, Optional, Union, Any
from dataclasses import dataclass
from datetime import datetime
import asyncio
import nest_asyncio

# LangChain components
from langchain_openai import OpenAI, OpenAIEmbeddings
from langchain_community.vectorstores import FAISS
from langchain.memory import ConversationBufferMemory
from langchain_community.document_loaders import (
    TextLoader,
    UnstructuredPDFLoader,
    CSVLoader,
    UnstructuredImageLoader,
    JSONLoader
)
from langchain.text_splitter import CharacterTextSplitter

# Analysis tools
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

In [5]:
@dataclass
class Document:
    """Base structure for document representation.

    This is the notebook's own container, not LangChain's ``Document``: the text
    lives in ``content`` (there is no ``page_content`` attribute).
    """
    content: str  # text extracted from the source file
    metadata: Dict[str, Any]  # loader metadata, merged across the file's pages/chunks
    doc_type: str  # DocumentManager.process_document stores the lower-cased file extension, e.g. '.pdf'
    embedding: Optional[List[float]] = None  # None until DocumentManager.process_document embeds the content

@dataclass
class QueryResult:
    """Enhanced query result structure, built by ``MultiAgentRAG.process_query``."""
    query: str  # the query text as received
    response: str  # answer text, taken from analysis['response']
    sources: List[Document]  # documents retrieved for the query
    response_time: float  # wall-clock seconds from start of processing until the result is built
    confidence: float  # taken from analysis['confidence']; scale is not defined here — TODO confirm
    analysis: Dict[str, Any]  # full analysis dict returned by the processor that produced the answer

@dataclass
class SystemMetrics:
    """Comprehensive system performance tracking.

    NOTE(review): MultiAgentRAG creates an all-zero instance, but no code visible
    in this notebook updates it yet.
    """
    query_count: int  # number of queries processed
    average_response_time: float  # presumably seconds, like QueryResult.response_time — confirm
    source_distribution: Dict[str, int]  # presumably a count per source/doc type — confirm
    success_rate: float  # presumably a 0..1 fraction — confirm
    error_rate: float  # presumably a 0..1 fraction — confirm
    document_stats: Dict[str, Any]  # free-form document statistics

In [9]:
class DocumentManager:
    """Unified document management system for handling multiple document types.

    Picks a LangChain loader by file extension, combines the loaded pages into
    this notebook's ``Document`` dataclass, embeds the text, caches the result
    per path, and maintains a FAISS vector store over the processed documents.
    """

    def __init__(self, embeddings_model: OpenAIEmbeddings):
        self.embeddings = embeddings_model
        self.vector_store = None
        # file_path -> processed Document; repeated calls skip loading and embedding.
        self.document_cache = {}
        # Lower-case extension (with dot) -> callable taking a file path and
        # returning a loader instance.
        self.supported_formats = {
            '.pdf': UnstructuredPDFLoader,
            '.csv': CSVLoader,
            '.txt': TextLoader,
            '.jpg': UnstructuredImageLoader,
            '.jpeg': UnstructuredImageLoader,
            '.png': UnstructuredImageLoader,
            # JSONLoader has a required jq_schema argument; '.' selects the whole
            # document and text_content=False lets non-string JSON values through.
            '.json': lambda path: JSONLoader(path, jq_schema='.', text_content=False),
        }
        # Available to callers that want chunked text; _process_content does not
        # re-join overlapping chunks (that would duplicate the overlap region).
        self.text_splitter = CharacterTextSplitter(
            chunk_size=1000,
            chunk_overlap=200,
            separator="\n"
        )

    async def _load_document(self, loader: Any) -> List[Any]:
        """
        Load a document with the given loader without blocking the event loop.

        A synchronous ``load`` is run in the default thread-pool executor so that
        ``batch_process_documents`` can overlap I/O and parsing across files; a
        loader that only offers ``aload`` is awaited directly.

        Args:
            loader: Document loader instance

        Returns:
            List of loaded (LangChain) documents

        Raises:
            ValueError: if the loader has neither ``load`` nor ``aload``.
        """
        try:
            if hasattr(loader, 'load'):
                loop = asyncio.get_running_loop()
                documents = await loop.run_in_executor(None, loader.load)
            elif hasattr(loader, 'aload'):
                documents = await loader.aload()
            else:
                raise ValueError("Unsupported loader type")

            return documents
        except Exception as e:
            logger.error(f"Document loading failed: {str(e)}")
            raise

    async def _process_content(self, raw_docs: List[Any], doc_type: str) -> Document:
        """
        Combine the loader's raw documents into a single ``Document``.

        Pages are joined directly. Splitting into chunks and re-joining them (as
        this method used to) repeats up to ``chunk_overlap`` characters at every
        chunk boundary.

        Args:
            raw_docs: Loader output; each item exposes ``page_content`` and ``metadata``.
            doc_type: Type of document being processed (the file extension).

        Returns:
            Processed Document object with ``embedding=None``.
        """
        try:
            processed_content = "\n".join(doc.page_content for doc in raw_docs)
            combined_metadata: Dict[str, Any] = {}
            for doc in raw_docs:
                # Later pages win on key clashes.
                combined_metadata.update(doc.metadata)

            return Document(
                content=processed_content.strip(),
                metadata=combined_metadata,
                doc_type=doc_type,
                embedding=None  # filled in by process_document
            )
        except Exception as e:
            logger.error(f"Content processing failed: {str(e)}")
            raise

    async def _generate_embedding(self, content: str) -> List[float]:
        """
        Generate a single embedding vector for the whole document content.

        NOTE(review): the full text is embedded in one request; very long
        documents may exceed the embedding model's input limit — confirm.

        Args:
            content: Document content to embed

        Returns:
            List of embedding values
        """
        try:
            embedding = await self.embeddings.aembed_query(content)
            return embedding
        except Exception as e:
            logger.error(f"Embedding generation failed: {str(e)}")
            raise

    async def process_document(self, file_path: str) -> Document:
        """
        Load, combine and embed a single file, caching the result by path.

        Args:
            file_path: Path to the document file

        Returns:
            Processed Document object (from cache when seen before)

        Raises:
            ValueError: if the file extension is not in ``supported_formats``.
        """
        try:
            if file_path in self.document_cache:
                return self.document_cache[file_path]

            ext = os.path.splitext(file_path)[1].lower()
            if ext not in self.supported_formats:
                raise ValueError(f"Unsupported file format: {ext}")

            loader = self.supported_formats[ext](file_path)

            raw_docs = await self._load_document(loader)
            processed_doc = await self._process_content(raw_docs, ext)

            processed_doc.embedding = await self._generate_embedding(processed_doc.content)

            self.document_cache[file_path] = processed_doc
            return processed_doc

        except Exception as e:
            logger.error(f"Document processing failed: {str(e)}")
            raise

    async def batch_process_documents(self, file_paths: List[str]) -> List[Document]:
        """
        Process multiple documents concurrently.

        Args:
            file_paths: List of paths to document files

        Returns:
            Processed Document objects, in the same order as ``file_paths``
        """
        try:
            tasks = [self.process_document(path) for path in file_paths]
            return await asyncio.gather(*tasks)
        except Exception as e:
            logger.error(f"Batch processing failed: {str(e)}")
            raise

    async def update_vector_store(self, documents: List[Document]):
        """
        Add processed documents to the FAISS vector store, creating it on first use.

        FAISS's ``from_documents``/``add_documents`` expect LangChain documents
        with ``page_content``; this notebook's ``Document`` has ``content``, so
        text and metadata are passed explicitly. Embeddings already computed by
        ``process_document`` are reused; if any document lacks one, FAISS embeds
        the texts itself.

        Args:
            documents: Documents to add. An empty list is a no-op.
        """
        if not documents:
            return
        try:
            texts = [doc.content for doc in documents]
            # Carry doc_type into the store so search results can report it.
            metadatas = [{**doc.metadata, 'doc_type': doc.doc_type} for doc in documents]
            vectors = [doc.embedding for doc in documents]

            if all(vector is not None for vector in vectors):
                text_embeddings = list(zip(texts, vectors))
                if self.vector_store is None:
                    self.vector_store = FAISS.from_embeddings(
                        text_embeddings, self.embeddings, metadatas=metadatas
                    )
                else:
                    self.vector_store.add_embeddings(text_embeddings, metadatas=metadatas)
            elif self.vector_store is None:
                self.vector_store = FAISS.from_texts(texts, self.embeddings, metadatas=metadatas)
            else:
                self.vector_store.add_texts(texts, metadatas=metadatas)
        except Exception as e:
            logger.error(f"Vector store update failed: {str(e)}")
            raise

In [10]:
class MultiAgentRAG:
    """Advanced Multi-Agent RAG system with comprehensive document handling.

    NOTE(review): ``process_query`` relies on ``_retrieve_documents``,
    ``_gather_external_info``, ``_general_processing`` and ``DomainProcessor``,
    none of which are defined in the visible notebook — TODO provide them
    before calling ``process_query``.
    """

    def __init__(self, openai_api_key: str, other_api_keys: Dict[str, str]):
        self.document_manager = DocumentManager(
            OpenAIEmbeddings(openai_api_key=openai_api_key)
        )
        self.llm = OpenAI(openai_api_key=openai_api_key)
        self.api_keys = other_api_keys
        self.metrics = SystemMetrics(0, 0.0, {}, 0.0, 0.0, {})
        # Mirrors document_manager.vector_store once initialize_knowledge_base() has run.
        self.vector_store = None

    async def initialize_knowledge_base(self, file_paths: List[str]) -> int:
        """Process and index a document corpus.

        Indexing is delegated to ``DocumentManager.update_vector_store``, which
        owns the FAISS store; ``self.vector_store`` is kept pointing at it.

        Args:
            file_paths: Paths of the files to load.

        Returns:
            Number of documents processed.
        """
        documents = await self.document_manager.batch_process_documents(file_paths)
        await self.document_manager.update_vector_store(documents)
        self.vector_store = self.document_manager.vector_store
        return len(documents)

    async def process_query(self, query: str, domain: Optional[str] = None) -> QueryResult:
        """Process a query, using a ``DomainProcessor`` when ``domain`` is given.

        Args:
            query: The question to answer.
            domain: Optional domain name selecting domain-specific processing.

        Returns:
            QueryResult whose ``response``/``confidence`` come from the analysis dict.
        """
        start_time = datetime.now()

        try:
            relevant_docs = await self._retrieve_documents(query)
            external_info = await self._gather_external_info(query)

            if domain:
                processor = DomainProcessor(domain)
                analysis = await processor.process_query(
                    query, relevant_docs, external_info
                )
            else:
                analysis = await self._general_processing(
                    query, relevant_docs, external_info
                )

            response_time = (datetime.now() - start_time).total_seconds()

            return QueryResult(
                query=query,
                response=analysis['response'],
                sources=relevant_docs,
                response_time=response_time,
                confidence=analysis['confidence'],
                analysis=analysis
            )

        except Exception as e:
            logger.error(f"Query processing failed: {str(e)}")
            raise

In [1]:
class APIManager:
    """Manages API keys and authentication for the Multi-Agent RAG system"""

    # Environment-variable name -> name of the method that verifies that key.
    # NOTE(review): none of these methods are defined in the visible notebook;
    # until they exist every external key is reported as unverified.
    _VERIFIERS = {
        "GOOGLE_API_KEY": "_test_google_api",
        "SCIENTIFIC_DB_KEY": "_test_scientific_db",
        "NEWS_API_KEY": "_test_news_api",
    }

    def __init__(self):
        # api name -> bool result of the last setup attempt
        self.api_statuses = {}

    def setup_openai(self, api_key: str) -> bool:
        """
        Configure OpenAI API access and verify the key with one completion request.

        Note: verification sends a real request ("Test") to the API.

        Returns:
            True if the request succeeded, False otherwise (missing key included).
        """
        if not api_key:
            logger.error("OpenAI API setup failed: no API key provided")
            self.api_statuses['openai'] = False
            return False
        try:
            os.environ['OPENAI_API_KEY'] = api_key
            OpenAI(api_key=api_key).invoke("Test")
            self.api_statuses['openai'] = True
            logger.info("OpenAI API configured successfully")
            return True
        except Exception as e:
            logger.error(f"OpenAI API setup failed: {str(e)}")
            self.api_statuses['openai'] = False
            return False

    def setup_external_apis(self, api_keys: dict) -> dict:
        """
        Configure external APIs (Google, Scientific DB, News).

        Each non-empty key is exported as an environment variable under its name
        and then verified; missing/empty keys are reported as failed.

        Returns:
            Mapping of api name -> verification result.
        """
        results = {}
        for api_name, api_key in api_keys.items():
            if not api_key:
                logger.error(f"{api_name} setup failed: no API key provided")
                ok = False
            else:
                try:
                    os.environ[api_name] = api_key
                    ok = self._verify_api_key(api_name, api_key)
                except Exception as e:
                    logger.error(f"{api_name} setup failed: {str(e)}")
                    ok = False
            results[api_name] = ok
            self.api_statuses[api_name] = ok

        return results

    def _verify_api_key(self, api_name: str, api_key: str) -> bool:
        """
        Verify an API key with the service-specific ``_test_*`` method.

        Unknown services and missing verifier methods are logged (rather than
        silently swallowed) and reported as unverified.
        """
        method_name = self._VERIFIERS.get(api_name)
        verifier = getattr(self, method_name, None) if method_name else None
        if verifier is None:
            logger.warning(f"No verifier available for {api_name}; treating key as unverified")
            return False
        try:
            return bool(verifier(api_key))
        except Exception as e:
            logger.error(f"{api_name} verification failed: {str(e)}")
            return False

class EnhancedMultiAgentRAG:
    """Enhanced Multi-Agent RAG system with API integration"""

    def __init__(self):
        self.api_manager = APIManager()
        # The following are created by initialize().
        self.document_manager = None
        self.llm = None
        self.embeddings = None

    async def initialize(self, openai_key: str, external_api_keys: dict) -> bool:
        """
        Initialize the system with all necessary APIs.

        Returns:
            True on success; on failure the error is logged and False is returned.
        """
        try:
            if not self.api_manager.setup_openai(openai_key):
                raise ValueError("OpenAI API setup failed")

            api_results = self.api_manager.setup_external_apis(external_api_keys)
            if not all(api_results.values()):
                logger.warning("Some external APIs failed to initialize")

            self.llm = OpenAI(api_key=openai_key)
            self.embeddings = OpenAIEmbeddings(openai_api_key=openai_key)
            self.document_manager = DocumentManager(self.embeddings)

            logger.info("Multi-Agent RAG system initialized successfully")
            return True

        except Exception as e:
            logger.error(f"System initialization failed: {str(e)}")
            return False

    async def process_query_with_documents(self,
                                         query: str,
                                         documents: List[str],
                                         domain: str = None) -> Dict:
        """
        Process a query using provided documents and APIs.

        Args:
            query: The question to answer.
            documents: File paths to load and index before searching.
            domain: Currently unused; kept for API compatibility.

        Returns:
            Dict with 'query', 'response', 'sources' (metadata of retrieved
            documents) and 'timestamp'.

        Raises:
            RuntimeError: if initialize() has not succeeded.
            ValueError: if no documents ended up in the vector store.
        """
        try:
            if self.document_manager is None or self.llm is None:
                raise RuntimeError("System not initialized; call initialize() first")

            processed_docs = await self.document_manager.batch_process_documents(documents)
            await self.document_manager.update_vector_store(processed_docs)

            vector_store = self.document_manager.vector_store
            if vector_store is None:
                raise ValueError("No documents were indexed; cannot answer the query")

            relevant_docs = vector_store.similarity_search(query)
            external_info = await self._gather_external_info(query)
            return await self._generate_response(query, relevant_docs, external_info)

        except Exception as e:
            logger.error(f"Query processing failed: {str(e)}")
            raise

    async def _gather_external_info(self, query: str) -> Dict[str, Any]:
        """Collect supplementary information from the configured external APIs.

        Placeholder: no external API client exists in this notebook yet, so this
        returns an empty dict. TODO query the services set up by APIManager.
        """
        return {}

    async def _generate_response(self,
                                 query: str,
                                 relevant_docs: List[Any],
                                 external_info: Dict[str, Any]) -> Dict[str, Any]:
        """Ask the LLM to answer ``query`` from the retrieved documents."""
        context = "\n".join(doc.page_content for doc in relevant_docs)
        prompt = f"Based on the following context, answer the query: {query}\n\nContext: {context}"
        if external_info:
            prompt += f"\n\nExternal information: {external_info}"
        response = self.llm.invoke(prompt)
        return {
            'query': query,
            'response': response,
            'sources': [doc.metadata for doc in relevant_docs],
            'timestamp': datetime.now()
        }

# Example usage
async def main():
    """Demonstrate the enhanced Multi-Agent RAG system.

    API keys are read from the environment: OPENAI_API_KEY (required), plus the
    optional GOOGLE_API_KEY, SCIENTIFIC_DB_KEY and NEWS_API_KEY. Never paste keys
    into a notebook — they persist in the saved file and in version control.
    """
    openai_key = os.getenv("OPENAI_API_KEY")
    if not openai_key:
        print("OPENAI_API_KEY is not set; export it before running this demo")
        return

    # Pass on only the external keys that are actually set.
    external_api_keys = {}
    for name in ("GOOGLE_API_KEY", "SCIENTIFIC_DB_KEY", "NEWS_API_KEY"):
        value = os.getenv(name)
        if value:
            external_api_keys[name] = value

    rag_system = EnhancedMultiAgentRAG()
    initialized = await rag_system.initialize(openai_key, external_api_keys)

    if not initialized:
        print("System initialization failed")
        return

    # Example documents (replace with actual file paths)
    documents = [
        "quantum_paper.pdf",
        "research_data.csv",
        "circuit_diagram.png"
    ]

    query = "What are the latest developments in quantum error correction?"
    result = await rag_system.process_query_with_documents(
        query=query,
        documents=documents,
        domain="quantum_computing"
    )

    # The result fields vary by implementation, so read them defensively.
    print("\nQuery Results:")
    print(f"Response: {result.get('response')}")
    print(f"Confidence: {result.get('confidence', 'n/a')}")
    print("\nSources Used:")
    for source in result.get('sources', []):
        source_type = source.get('type', source.get('doc_type', 'unknown'))
        title = source.get('title', source.get('source', 'untitled'))
        print(f"- {source_type}: {title}")

if __name__ == "__main__":
    # A Jupyter kernel already runs an event loop; nest_asyncio.apply() patches it
    # so that asyncio.run() can be called from inside a cell.
    nest_asyncio.apply()
    asyncio.run(main())

NameError: name 'List' is not defined

In [11]:
# Core imports
import os
import logging
from typing import List, Dict, Optional, Union, Any
from dataclasses import dataclass
from datetime import datetime
import asyncio
import nest_asyncio

# LangChain components
from langchain_openai import OpenAI, OpenAIEmbeddings
from langchain_community.vectorstores import FAISS
from langchain.memory import ConversationBufferMemory
from langchain_community.document_loaders import (
    TextLoader,
    UnstructuredPDFLoader,
    CSVLoader,
    UnstructuredImageLoader
)
from langchain.text_splitter import CharacterTextSplitter

# Analysis tools
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

In [12]:
@dataclass
class APIConfig:
    """Configuration for API management.

    Any key may be ``None`` when built from an environment lacking the variable.
    ``repr`` masks key values so displaying or logging a config (e.g. as the last
    expression of a notebook cell) does not leak credentials.
    """
    openai_key: Optional[str]
    google_key: Optional[str]
    scientific_db_key: Optional[str]
    news_key: Optional[str]

    def __repr__(self) -> str:
        # Defined explicitly so @dataclass keeps it instead of generating one
        # that prints the secrets.
        def _mask(value: Optional[str]) -> str:
            return "None" if value is None else "'***'"

        return (
            f"{type(self).__name__}("
            f"openai_key={_mask(self.openai_key)}, "
            f"google_key={_mask(self.google_key)}, "
            f"scientific_db_key={_mask(self.scientific_db_key)}, "
            f"news_key={_mask(self.news_key)})"
        )

    @classmethod
    def from_environment(cls, environ: Optional[Dict[str, str]] = None) -> "APIConfig":
        """Create configuration from environment variables.

        Args:
            environ: Mapping to read from; defaults to ``os.environ``.

        Returns:
            APIConfig with ``None`` for every variable that is not set.
        """
        env = os.environ if environ is None else environ
        return cls(
            openai_key=env.get("OPENAI_API_KEY"),
            google_key=env.get("GOOGLE_API_KEY"),
            scientific_db_key=env.get("SCIENTIFIC_DB_KEY"),
            news_key=env.get("NEWS_API_KEY")
        )

class APIManager:
    """Manages API authentication and verification.

    NOTE(review): no network verification happens here. Keys are checked for
    presence and exported as environment variables; ``True`` means
    "configured", not "verified".
    """

    def __init__(self):
        # api name -> bool result of the last setup attempt
        self.api_statuses = {}

    def setup_openai(self, api_key: str) -> bool:
        """Export the OpenAI key and construct a client with it.

        Returns:
            True if the key is present and the client could be constructed,
            False otherwise.
        """
        if not api_key:
            logger.error("OpenAI API setup failed: no API key provided")
            self.api_statuses['openai'] = False
            return False
        try:
            os.environ['OPENAI_API_KEY'] = api_key
            # Surfaces client configuration errors early; no request is sent.
            OpenAI(api_key=api_key)
            self.api_statuses['openai'] = True
            logger.info("OpenAI API configured successfully")
            return True
        except Exception as e:
            logger.error(f"OpenAI API setup failed: {str(e)}")
            self.api_statuses['openai'] = False
            return False

    def setup_external_apis(self, api_keys: Dict[str, Optional[str]]) -> Dict[str, bool]:
        """Export each external API key as an environment variable of the same name.

        Missing (``None``) or empty keys are reported as failed and left unset.

        Returns:
            Mapping of api name -> whether it was configured.
        """
        results = {}
        for api_name, api_key in api_keys.items():
            if not api_key:
                logger.error(f"{api_name} setup failed: no API key provided")
                ok = False
            else:
                try:
                    os.environ[api_name] = api_key
                    logger.info(f"{api_name} configured successfully")
                    ok = True
                except Exception as e:
                    logger.error(f"{api_name} setup failed: {str(e)}")
                    ok = False
            results[api_name] = ok
            self.api_statuses[api_name] = ok
        return results

class DocumentProcessor:
    """Handles document loading and processing"""

    def __init__(self, embeddings: OpenAIEmbeddings):
        self.embeddings = embeddings
        self.text_splitter = CharacterTextSplitter(
            chunk_size=1000,
            chunk_overlap=200
        )

    async def process_file(self, file_path: str) -> Dict[str, Any]:
        """Load, chunk and embed a single file.

        The extension is matched case-insensitively (``.TXT`` works like ``.txt``).

        Args:
            file_path: Path to a .pdf, .csv or .txt file.

        Returns:
            Dict with 'chunks', 'embeddings' (one vector per chunk) and 'metadata'.

        Raises:
            ValueError: for unsupported file types.
        """
        try:
            extension = os.path.splitext(file_path)[1].lower()
            if extension == '.pdf':
                loader = UnstructuredPDFLoader(file_path)
            elif extension == '.csv':
                loader = CSVLoader(file_path)
            elif extension == '.txt':
                loader = TextLoader(file_path)
            else:
                raise ValueError(f"Unsupported file type: {file_path}")

            # Loaders are synchronous; run them off the event loop.
            loop = asyncio.get_running_loop()
            docs = await loop.run_in_executor(None, loader.load)
            chunks = self.text_splitter.split_documents(docs)

            embeddings = await self.embeddings.aembed_documents(
                [chunk.page_content for chunk in chunks]
            )

            return {
                'chunks': chunks,
                'embeddings': embeddings,
                'metadata': {'source': file_path}
            }
        except Exception as e:
            logger.error(f"File processing failed: {str(e)}")
            raise

class EnhancedMultiAgentRAG:
    """Enhanced Multi-Agent RAG system with comprehensive document handling.

    Files passed to ``process_query`` are processed once per instance; their
    precomputed chunk embeddings are added to a FAISS store that accumulates
    across queries.
    """

    def __init__(self):
        self.api_manager = APIManager()
        self.document_processor = None
        self.vector_store = None
        self.llm = None
        # Paths already added to vector_store. They are not re-processed, so
        # edits to an already-indexed file are not picked up.
        self.indexed_sources = set()

    async def initialize(self, config: APIConfig) -> bool:
        """Initialize the system with API configuration.

        Returns:
            True on success; on failure the error is logged and False returned.
        """
        try:
            if not self.api_manager.setup_openai(config.openai_key):
                raise ValueError("OpenAI API setup failed")

            external_apis = {
                "GOOGLE_API_KEY": config.google_key,
                "SCIENTIFIC_DB_KEY": config.scientific_db_key,
                "NEWS_API_KEY": config.news_key
            }
            self.api_manager.setup_external_apis(external_apis)

            self.llm = OpenAI(api_key=config.openai_key)
            embeddings = OpenAIEmbeddings(openai_api_key=config.openai_key)
            self.document_processor = DocumentProcessor(embeddings)

            logger.info("System initialized successfully")
            return True

        except Exception as e:
            logger.error(f"Initialization failed: {str(e)}")
            return False

    async def process_query(self, query: str, documents: List[str]) -> Dict[str, Any]:
        """Answer ``query`` from the three indexed chunks most similar to it.

        Args:
            query: Question to answer.
            documents: File paths to index first (already-indexed paths are skipped).

        Returns:
            Dict with 'query', 'response' (LLM output), 'sources' (metadata of the
            retrieved chunks) and 'timestamp'.

        Raises:
            RuntimeError: if initialize() has not succeeded.
            ValueError: if nothing has been indexed yet.
        """
        try:
            if self.document_processor is None or self.llm is None:
                raise RuntimeError("System not initialized; call initialize() first")

            await self._index_documents(documents)
            if self.vector_store is None:
                raise ValueError("No documents have been indexed; cannot answer the query")

            relevant_docs = self.vector_store.similarity_search(query, k=3)

            context = "\n".join(doc.page_content for doc in relevant_docs)
            prompt = f"Based on the following context, answer the query: {query}\n\nContext: {context}"
            response = self.llm.invoke(prompt)

            return {
                'query': query,
                'response': response,
                'sources': [doc.metadata for doc in relevant_docs],
                'timestamp': datetime.now()
            }

        except Exception as e:
            logger.error(f"Query processing failed: {str(e)}")
            raise

    async def _index_documents(self, documents: List[str]) -> None:
        """Process not-yet-indexed files and add their chunks to the vector store.

        Reuses the embeddings computed by ``DocumentProcessor.process_file``
        rather than letting FAISS embed every chunk a second time.
        """
        # dict.fromkeys de-duplicates while preserving order.
        pending = [path for path in dict.fromkeys(documents)
                   if path not in self.indexed_sources]

        text_embeddings = []
        metadatas = []
        for doc_path in pending:
            result = await self.document_processor.process_file(doc_path)
            for chunk, vector in zip(result['chunks'], result['embeddings']):
                text_embeddings.append((chunk.page_content, vector))
                metadatas.append(chunk.metadata)

        if text_embeddings:
            if self.vector_store is None:
                self.vector_store = FAISS.from_embeddings(
                    text_embeddings,
                    self.document_processor.embeddings,
                    metadatas=metadatas,
                )
            else:
                self.vector_store.add_embeddings(text_embeddings, metadatas=metadatas)

        self.indexed_sources.update(pending)

In [13]:
# First, let's create a function to set up our test environment with sample documents
async def setup_test_environment(test_dir: str = "test_documents"):
    """
    Create sample documents for testing the RAG system.

    Args:
        test_dir: Directory to create (if needed) and write the sample files into.

    Returns:
        The directory path containing the sample files.
    """
    import textwrap

    try:
        os.makedirs(test_dir, exist_ok=True)

        # dedent + strip: the literals are indented to match this code, but the
        # files must not start with a blank line or carry leading spaces —
        # otherwise the CSV header row parses as empty.
        sample_text = textwrap.dedent("""
            Quantum Computing Research Update

            Recent developments in quantum error correction have shown promising results.
            Scientists have achieved a 99.9% fidelity rate in qubit operations using the
            new surface code implementation. This breakthrough could lead to more stable
            quantum computers in the near future.
            """).strip() + "\n"

        sample_csv = textwrap.dedent("""
            date,experiment,success_rate,error_rate
            2024-01-15,Surface Code,0.999,0.001
            2024-01-16,Topological Quantum,0.995,0.005
            2024-01-17,Error Correction,0.997,0.003
            """).strip() + "\n"

        with open(os.path.join(test_dir, "sample.txt"), "w", encoding="utf-8") as f:
            f.write(sample_text)

        with open(os.path.join(test_dir, "quantum_results.csv"), "w", encoding="utf-8") as f:
            f.write(sample_csv)

        logger.info(f"Test environment created in {test_dir}")
        return test_dir

    except Exception as e:
        logger.error(f"Failed to create test environment: {str(e)}")
        raise

# Enhanced DocumentProcessor with better error handling
class DocumentProcessor:
    """Handles document loading and processing with comprehensive error handling"""

    def __init__(self, embeddings: OpenAIEmbeddings):
        self.embeddings = embeddings
        self.text_splitter = CharacterTextSplitter(
            chunk_size=1000,
            chunk_overlap=200
        )

    async def process_file(self, file_path: str) -> Dict[str, Any]:
        """
        Load, chunk and embed a single file with detailed error handling.

        Args:
            file_path: Path to a .pdf, .csv or .txt file (extension case-insensitive).

        Returns:
            Dict with 'chunks', 'embeddings' (one vector per chunk) and 'metadata'
            ('source', 'chunk_count', 'processed_at').

        Raises:
            FileNotFoundError: if the file does not exist.
            PermissionError: if the file is not readable.
            ValueError: for unsupported types, or when the file yields no
                non-empty content.
        """
        try:
            if not os.path.exists(file_path):
                raise FileNotFoundError(f"File not found: {file_path}")

            if not os.access(file_path, os.R_OK):
                raise PermissionError(f"Cannot read file: {file_path}")

            file_extension = os.path.splitext(file_path)[1].lower()

            if file_extension == '.pdf':
                loader = UnstructuredPDFLoader(file_path)
                logger.info(f"Processing PDF file: {file_path}")
            elif file_extension == '.csv':
                loader = CSVLoader(file_path)
                logger.info(f"Processing CSV file: {file_path}")
            elif file_extension == '.txt':
                loader = TextLoader(file_path)
                logger.info(f"Processing text file: {file_path}")
            else:
                raise ValueError(f"Unsupported file type: {file_extension}")

            logger.info(f"Loading content from {file_path}")
            # Loaders are synchronous; run them off the event loop.
            loop = asyncio.get_running_loop()
            docs = await loop.run_in_executor(None, loader.load)

            if not docs:
                raise ValueError(f"No content loaded from {file_path}")

            logger.info("Splitting content into chunks")
            chunks = self.text_splitter.split_documents(docs)

            # An empty or whitespace-only file still yields documents but no
            # chunks, leaving nothing to embed or index.
            if not chunks:
                raise ValueError(f"No content loaded from {file_path}")

            logger.info(f"Generating embeddings for {len(chunks)} chunks")
            embeddings = await self.embeddings.aembed_documents(
                [chunk.page_content for chunk in chunks]
            )

            return {
                'chunks': chunks,
                'embeddings': embeddings,
                'metadata': {
                    'source': file_path,
                    'chunk_count': len(chunks),
                    'processed_at': datetime.now().isoformat()
                }
            }

        except FileNotFoundError:
            logger.error(f"File not found: {file_path}")
            raise
        except PermissionError:
            logger.error(f"Permission denied: {file_path}")
            raise
        except Exception as e:
            logger.error(f"Error processing {file_path}: {str(e)}")
            raise

# Updated main function with proper setup and error handling
async def main():
    """
    Demonstrate the enhanced Multi-Agent RAG system with proper setup and error handling.

    Credentials come from ``APIConfig.from_environment()``: OPENAI_API_KEY
    (required), plus the optional GOOGLE_API_KEY, SCIENTIFIC_DB_KEY and
    NEWS_API_KEY. Keys must never be written into the notebook itself.
    """
    test_dir = None
    try:
        api_config = APIConfig.from_environment()
        if not api_config.openai_key:
            print("OPENAI_API_KEY is not set; export it before running this demo")
            return

        # Sample files for the demo query
        test_dir = await setup_test_environment()

        logger.info("Initializing RAG system")
        rag_system = EnhancedMultiAgentRAG()
        initialized = await rag_system.initialize(api_config)

        if not initialized:
            print("System initialization failed")
            return

        documents = [
            os.path.join(test_dir, "sample.txt"),
            os.path.join(test_dir, "quantum_results.csv")
        ]

        for doc in documents:
            if not os.path.exists(doc):
                raise FileNotFoundError(f"Document not found: {doc}")

        logger.info(f"Processing query with {len(documents)} documents")

        query = "What are the latest developments in quantum error correction?"
        result = await rag_system.process_query(query, documents)

        print("\nQuery Results:")
        print(f"Query: {result['query']}")
        print(f"Response: {result['response']}")
        print("\nSources Used:")
        for source in result['sources']:
            print(f"- {source['source']}")

        print("\nProcessing Metrics:")
        print(f"Timestamp: {result['timestamp']}")
        print(f"Number of sources used: {len(result['sources'])}")

    except FileNotFoundError as e:
        print(f"File not found error: {str(e)}")
    except PermissionError as e:
        print(f"Permission error: {str(e)}")
    except Exception as e:
        print(f"Error: {str(e)}")
    finally:
        if test_dir is not None:
            logger.info(f"Cleaning up test environment: {test_dir}")
            # To delete the sample files, `import shutil` and uncomment:
            # shutil.rmtree(test_dir)

if __name__ == "__main__":
    # A Jupyter kernel already runs an event loop; nest_asyncio lets asyncio.run()
    # be called from inside a cell.
    nest_asyncio.apply()

    # force=True: earlier cells already called logging.basicConfig, and without
    # it this call is silently ignored, so the format below would never apply.
    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
        force=True,
    )

    asyncio.run(main())

ERROR:__main__:Query processing failed: Could not import faiss python package. Please install it with `pip install faiss-gpu` (for CUDA supported GPU) or `pip install faiss-cpu` (depending on Python version).


Error: Could not import faiss python package. Please install it with `pip install faiss-gpu` (for CUDA supported GPU) or `pip install faiss-cpu` (depending on Python version).
