In [2]:
# Import libraries
import os
import hashlib
import logging
import shutil
import datetime
import re
from pathlib import Path
from typing import Dict, List, Optional, Any
from dotenv import load_dotenv
from pymongo import MongoClient
from pinecone import Pinecone
from langchain.text_splitter import RecursiveCharacterTextSplitter
from tqdm import tqdm

# Your document processing libraries
import fitz  # PyMuPDF
import docx
import pytesseract
from pdf2image import convert_from_path
from pptx import Presentation
from odf import text, teletype
from odf.opendocument import load

In [3]:
# Load environment variables
%env MONGO_DB_NAME=your_database_name
load_dotenv()

# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
        
# Load configuration from environment
mongo_connection_string = os.getenv('MONGODB_URI')
mongo_db_name = os.getenv('MONGO_DB_NAME')
mongo_collection_name = os.getenv('MONGO_COLLECTION_NAME')
pinecone_api_key = os.getenv('PINECONE_API_KEY')
pinecone_index_name = os.getenv('PINECONE_INDEX_NAME')
text_field = os.getenv('PINECONE_TEXT_FIELD')

# If environment variables are not set, raise an error
if not mongo_connection_string:
    raise ValueError("MONGODB_URI environment variable is required")
if not mongo_db_name:
    raise ValueError("MONGO_DB_NAME environment variable is required")
if not mongo_collection_name:
    raise ValueError("MONGO_COLLECTION_NAME environment variable is required")
if not pinecone_api_key:
    raise ValueError("PINECONE_API_KEY environment variable is required")
if not pinecone_index_name:
    raise ValueError("PINECONE_INDEX_NAME environment variable is required")

# Directory configuration
staging_directory = os.getenv('STAGING_DIRECTORY', '/Users/aimac/Documents/Coding/policy_pulse_app/database_inbox')
processed_directory = os.getenv('PROCESSED_DIRECTORY', '/Users/aimac/Documents/Coding/policy_pulse_app/processed_files')
  
# If it doesn't exist, create processed directory
Path(staging_directory).mkdir(parents=True, exist_ok=True)
Path(processed_directory).mkdir(parents=True, exist_ok=True)   

# Initialize text splitter
text_splitter = RecursiveCharacterTextSplitter(
chunk_size=int(os.getenv('CHUNK_SIZE', 2000)),
chunk_overlap=int(os.getenv('CHUNK_OVERLAP', 400)),
length_function=len,
        )

env: MONGO_DB_NAME=your_database_name


ValueError: MONGO_COLLECTION_NAME environment variable is required

In [None]:
def _initialize_mongodb(mongo_collection_name, mongo_db_name, mongo_connection_string):
        """Initialize MongoDB connection."""
    client = MongoClient(mongo_connection_string, server_api=ServerApi('1'))
# Send a ping to confirm a successful connection
try:
    client.admin.command('ping')
    print("Pinged your deployment. You successfully connected to MongoDB!")
except Exception as e:
    print(e)


In [None]:
# Initialise API connections
initialise_mongodb()
initialise_pinecone()

<!-- Validate required environment variables -->
if not pinecone_api_key:
raise ValueError("PINECONE_API_KEY environment variable is required")
        if not self.mongo_connection_string:
            raise ValueError("MONGODB_URI environment variable is required")
        if not self.pinecone_index_name:
            raise ValueError("PINECONE_INDEX_NAME environment variable is required")
        
<!-- Initialize connections -->
initialize_mongodb()
initialize_pinecone()

In [None]:

        
        # Validate required environment variables
        if not self.pinecone_api_key:
            raise ValueError("PINECONE_API_KEY environment variable is required")
        if not self.mongo_connection_string:
            raise ValueError("MONGODB_URI environment variable is required")
        if not self.pinecone_index_name:
            raise ValueError("PINECONE_INDEX_NAME environment variable is required")
        
        # Initialize connections
        self._initialize_mongodb()
        self._initialize_pinecone()
    
    def _sanitize_id(self, text: str) -> str:
        """
        Sanitize text to create a valid Pinecone ID.
        Pinecone IDs must be ASCII and cannot contain certain special characters.
        """
        # Replace non-ASCII characters and special characters with underscores
        sanitized = re.sub(r'[^\w\-.]', '_', text)
        # Remove multiple consecutive underscores
        sanitized = re.sub(r'_+', '_', sanitized)
        # Remove leading/trailing underscores
        sanitized = sanitized.strip('_')
        # Ensure it's not empty
        if not sanitized:
            sanitized = "document"
        return sanitized
        
    def _initialize_mongodb(self):
        """Initialize MongoDB connection."""
        try:
            self.mongo_client = MongoClient(self.mongo_connection_string)
            self.mongo_db = self.mongo_client[self.mongo_db_name]
            self.mongo_collection = self.mongo_db[self.mongo_collection_name]
            
            # Test connection
            self.mongo_client.admin.command('ping')
            logger.info("Successfully connected to MongoDB")
            
        except Exception as e:
            logger.error(f"Failed to connect to MongoDB: {e}")
            raise
    
    def _initialize_pinecone(self):
        """Initialize Pinecone connection - assumes index already exists."""
        try:
            # Initialize Pinecone (just like your working code)
            pc = Pinecone(api_key=self.pinecone_api_key)
            
            # Get the existing index (no need to create)
            self.pinecone_index = pc.Index(self.pinecone_index_name)
            
            logger.info(f"Successfully connected to Pinecone index: {self.pinecone_index_name}")
            
        except Exception as e:
            logger.error(f"Failed to initialize Pinecone: {e}")
            raise
    
    def _generate_content_hash(self, content: str) -> str:
        """Generate a hash of the content for change detection."""
        return hashlib.sha256(content.encode('utf-8')).hexdigest()
    
    def _load_document(self, file_path: str) -> str:
        """Extract text from document using your exact extraction functions."""
        return self.extract_text(file_path)
    
    # Your exact text extraction functions
    def extract_text_from_docx(self, file_path):
        """Function to extract text from DOCX"""
        doc = docx.Document(file_path)
        text = "\n".join([paragraph.text for paragraph in doc.paragraphs])
        return text

    def extract_text_from_pptx(self, file_path):
        """Function to extract text from PPTX"""
        prs = Presentation(file_path)
        text = []
        for slide in prs.slides:
            for shape in slide.shapes:
                if hasattr(shape, "text"):
                    text.append(shape.text)
        return "\n".join(text)

    def extract_text_from_odp(self, file_path):
        """Function to extract text from ODP"""
        doc = load(file_path)
        text_elements = doc.getElementsByType(text.P)
        return "\n".join([teletype.extractText(element) for element in text_elements])

    def extract_text_from_pdf(self, file_path):
        """Function to check if a PDF is scanned and extract text accordingly"""
        # Open the PDF
        doc = fitz.open(file_path)
        
        # Check if the PDF has text
        text = ""
        text_found = False
        
        for page_num in range(len(doc)):
            page = doc[page_num]
            page_text = page.get_text()
            
            # If page has more than 10 characters, consider it a text PDF
            if len(page_text.strip()) > 10:
                text_found = True
                text += page_text + "\n"
        
        # If no significant text found, it's likely a scanned PDF
        if not text_found:
            logger.info(f"PDF appears to be scanned, applying OCR: {file_path}")
            return self.extract_text_from_scanned_pdf(file_path)
        
        return text

    def extract_text_from_scanned_pdf(self, file_path):
        """Function to extract text from scanned PDFs using OCR"""
        # Convert PDF to images
        images = convert_from_path(file_path)
        
        # Apply OCR to each image
        text = []
        for i, image in enumerate(images):
            text.append(pytesseract.image_to_string(image))
        
        return "\n".join(text)

    def extract_text(self, file_path):
        """Main function to extract text based on file extension"""
        file_path = Path(file_path)
        extension = file_path.suffix.lower()
        
        try:
            if extension == '.pdf':
                return self.extract_text_from_pdf(file_path)
            elif extension == '.docx':
                return self.extract_text_from_docx(file_path)
            elif extension == '.pptx':
                return self.extract_text_from_pptx(file_path)
            elif extension == '.odp':
                return self.extract_text_from_odp(file_path)
            elif extension == '.txt':
                with open(file_path, 'r', encoding='utf-8', errors='ignore') as f:
                    return f.read()
            elif extension == '.md':
                with open(file_path, 'r', encoding='utf-8', errors='ignore') as f:
                    return f.read()
            else:
                logger.warning(f"Unsupported file type: {extension}")
                return None
        except Exception as e:
            logger.error(f"Error extracting text from {file_path}: {e}")
            return None
    
    def process_file(self, file_path: str, metadata: Optional[Dict] = None) -> bool:
        """
        Process a single file using your exact process_file function logic.
        
        Args:
            file_path: Path to the file to process
            metadata: Optional metadata to attach to all chunks
        
        Returns:
            bool: True if successful, False otherwise
        """
        try:
            file_path = Path(file_path)
            logger.info(f"Processing: {file_path}")
            
            # Extract text using your exact function
            text = self.extract_text(file_path)
            if not text:
                logger.error(f"No text extracted from {file_path}")
                return False
            
            # Split text into chunks
            chunks = self.text_splitter.split_text(text)
            
            # Create metadata using your exact format
            base_metadata = {
                "filename": file_path.name,
                "file_path": str(file_path),
                "file_type": file_path.suffix.lower(),
                # Format timestamps as ISO format strings
                "created_at": datetime.datetime.fromtimestamp(os.path.getctime(file_path)).isoformat(),
                "modified_at": datetime.datetime.fromtimestamp(os.path.getmtime(file_path)).isoformat(),
                **(metadata or {})
            }
            
            # Process each chunk
            successful_chunks = 0
            
            for i, chunk in enumerate(chunks):
                # Create a unique chunk metadata (your exact format)
                chunk_metadata = base_metadata.copy()
                chunk_metadata["chunk_id"] = i
                chunk_metadata["chunk_text"] = chunk[:100] + "..."  # Preview of the chunk
                
                # Create document object with sanitized ID
                sanitized_filename = self._sanitize_id(file_path.stem)
                document = {
                    "id": f"{sanitized_filename}_chunk_{i}",  # Now sanitized!
                    "text": chunk,  # Raw text
                    "metadata": chunk_metadata
                }
                
                # Add chunk to both MongoDB and Pinecone
                if self._add_chunk_from_document(document):
                    successful_chunks += 1
                else:
                    logger.warning(f"Failed to add chunk {document['id']}")
            
            # Move file to processed directory if all chunks were successful
            if successful_chunks == len(chunks):
                self._move_to_processed(file_path)
                logger.info(f"Created {len(chunks)} chunks for {file_path}")
                return True
            else:
                logger.error(f"Only {successful_chunks}/{len(chunks)} chunks processed for {file_path}")
                return False
                
        except Exception as e:
            logger.error(f"Failed to process file {file_path}: {e}")
            return False
    
    def _add_chunk_from_document(self, document: Dict) -> bool:
        """Add a chunk to MongoDB and Pinecone from your document format."""
        try:
            import json
            
            doc_id = document['id']
            content = document['text']
            doc_metadata = document['metadata']
            
            # Generate content hash for change detection
            content_hash = self._generate_content_hash(content)
            
            # Check if chunk already exists
            existing_chunk = self.mongo_collection.find_one({"id": doc_id})
            if existing_chunk and existing_chunk.get('content_hash') == content_hash:
                logger.info(f"Chunk {doc_id} already exists with same content, skipping")
                return True
            
            # Prepare metadata exactly like your working code
            metadata_dict = {
                "filename": doc_metadata["filename"],
                "file_type": doc_metadata["file_type"],
                "chunk_id": str(doc_metadata["chunk_id"]),
                "preview": doc_metadata["chunk_text"]  # This is your preview format
            }
            
            # Convert metadata to a JSON string (exactly like your code)
            metadata_str = json.dumps(metadata_dict)
            
            # Prepare MongoDB document (store both dict and string versions)
            mongo_doc = {
                "id": doc_id,
                "text": content,  # The raw text
                "metadata": metadata_dict,  # Store as dict in MongoDB for easy querying
                "metadata_str": metadata_str,  # Also store JSON string version
                "content_hash": content_hash,
                "full_metadata": doc_metadata  # Store your complete metadata too
            }
            
            # Store in MongoDB
            self.mongo_collection.replace_one(
                {"id": doc_id}, 
                mongo_doc, 
                upsert=True
            )
            
            # Create record for Pinecone integrated embeddings (your exact format)
            record = {
                "id": doc_id,
                "text": content,  # This field gets automatically embedded by llama-text-embed-v2
                "metadata": metadata_str  # Metadata as JSON string
            }
            
            # Upsert to Pinecone using integrated embeddings
            self.pinecone_index.upsert_records("__default__", [record])
            
            return True
            
        except Exception as e:
            logger.error(f"Failed to add chunk {document.get('id', 'unknown')}: {e}")
            return False
    
    def _move_to_processed(self, file_path: str):
        """Move file to processed directory."""
        try:
            file_name = Path(file_path).name
            processed_path = Path(self.processed_directory) / file_name
            
            # Handle duplicate filenames
            counter = 1
            while processed_path.exists():
                name_parts = Path(file_name).stem, counter, Path(file_name).suffix
                processed_path = Path(self.processed_directory) / f"{name_parts[0]}_{name_parts[1]}{name_parts[2]}"
                counter += 1
            
            shutil.move(file_path, processed_path)
            logger.info(f"Moved {file_name} to processed directory")
            
        except Exception as e:
            logger.error(f"Failed to move file {file_path}: {e}")
    
    def process_staging_directory(self) -> Dict[str, int]:
        """
        Process all files in the staging directory.
        
        Returns:
            Dict with processing statistics
        """
        staging_path = Path(self.staging_directory)
        
        # Debug: Check if directory exists and list all files
        if not staging_path.exists():
            logger.error(f"Staging directory does not exist: {staging_path}")
            return {"total_files": 0, "successful": 0, "failed": 0}
        
        logger.info(f"Checking staging directory: {staging_path}")
        all_files = list(staging_path.iterdir())
        logger.info(f"All files in directory: {[f.name for f in all_files if f.is_file()]}")
        
        # Find all files to process (your supported extensions)
        supported_extensions = ['.pdf', '.docx', '.pptx', '.odp', '.txt', '.md']
        files_to_process = [
            f for f in staging_path.iterdir() 
            if f.is_file() and f.suffix.lower() in supported_extensions
        ]
        
        logger.info(f"Supported extensions: {supported_extensions}")
        logger.info(f"Files with supported extensions: {[f.name for f in files_to_process]}")
        logger.info(f"Found {len(files_to_process)} supported documents to process")
        
        if len(files_to_process) == 0:
            logger.warning("No supported files found. Make sure you have files with these extensions in the staging directory:")
            logger.warning(f"Staging directory: {staging_path}")
            logger.warning(f"Supported extensions: {supported_extensions}")
        
        successful = 0
        failed = 0
        
        for file_path in tqdm(files_to_process, desc="Processing files"):
            if self.process_file(str(file_path)):
                successful += 1
            else:
                failed += 1
        
        stats = {
            "total_files": len(files_to_process),
            "successful": successful,
            "failed": failed
        }
        
        logger.info(f"Processing complete: {stats}")
        return stats
    
    def search_similar(
        self, 
        query: str, 
        top_k: int = 5, 
        include_content: bool = True,
        filter_metadata: Optional[Dict] = None
    ) -> List[Dict]:
        """
        Search for similar document chunks using Pinecone vector search.
        
        Args:
            query: Search query text
            top_k: Number of results to return
            include_content: Whether to fetch full content from MongoDB
            filter_metadata: Optional metadata filters
        
        Returns:
            List of matching document chunks
        """
        try:
            # Search using simple query method (like your working code)
            # For integrated embeddings, pass the text directly
            search_results = self.pinecone_index.query(
                vector=query,  # This should work with integrated embeddings
                top_k=top_k,
                include_metadata=True,
                filter=filter_metadata
            )
            
            results = []
            
            for match in search_results['matches']:
                import json
                
                result = {
                    'id': match['id'],
                    'score': match['score'],
                    'metadata_str': match.get('metadata', '{}')  # JSON string from Pinecone
                }
                
                # Parse the JSON string metadata back to dict
                try:
                    result['metadata'] = json.loads(result['metadata_str'])
                except:
                    result['metadata'] = {}
                
                # Optionally fetch full content from MongoDB
                if include_content:
                    mongo_doc = self.mongo_collection.find_one({"id": match['id']})
                    if mongo_doc:
                        result['text'] = mongo_doc.get('text', '')
                        result['full_metadata'] = mongo_doc.get('metadata', {})
                else:
                    # Use preview from parsed metadata
                    result['text_preview'] = result['metadata'].get('preview', '')
                
                results.append(result)
            
            logger.info(f"Found {len(results)} similar chunks for query: {query[:50]}...")
            return results
            
        except Exception as e:
            logger.error(f"Failed to search for similar documents: {e}")
            return []
    
    def delete_file_chunks(self, source_file: str) -> bool:
        """
        Delete all chunks associated with a specific source file.
        
        Args:
            source_file: Name of the source file
        
        Returns:
            bool: True if successful, False otherwise
        """
        try:
            # Find all chunks for this file
            chunks = list(self.mongo_collection.find({"metadata.filename": source_file}))
            chunk_ids = [chunk['id'] for chunk in chunks]
            
            if not chunk_ids:
                logger.info(f"No chunks found for file {source_file}")
                return True
            
            # Delete from MongoDB
            mongo_result = self.mongo_collection.delete_many({"metadata.filename": source_file})
            
            # Delete from Pinecone
            self.pinecone_index.delete(ids=chunk_ids, namespace="__default__")
            
            logger.info(f"Deleted {mongo_result.deleted_count} chunks for file {source_file}")
            return True
            
        except Exception as e:
            logger.error(f"Failed to delete chunks for file {source_file}: {e}")
            return False
    
    def get_stats(self) -> Dict:
        """Get statistics about the document collections."""
        try:
            mongo_count = self.mongo_collection.count_documents({})
            pinecone_stats = self.pinecone_index.describe_index_stats()
            
            # Get file statistics
            file_stats = list(self.mongo_collection.aggregate([
                {"$group": {
                    "_id": "$metadata.filename",
                    "chunk_count": {"$sum": 1}
                }},
                {"$group": {
                    "_id": None,
                    "total_files": {"$sum": 1},
                    "avg_chunks_per_file": {"$avg": "$chunk_count"}
                }}
            ]))
            
            stats = {
                "mongodb_chunks": mongo_count,
                "pinecone_vectors": pinecone_stats.get('total_vector_count', 0),
                "pinecone_index_fullness": pinecone_stats.get('index_fullness', 0),
                "total_files": file_stats[0].get('total_files', 0) if file_stats else 0,
                "avg_chunks_per_file": round(file_stats[0].get('avg_chunks_per_file', 0), 2) if file_stats else 0
            }
            
            return stats
            
        except Exception as e:
            logger.error(f"Failed to get stats: {e}")
            return {}
        
        
