# xAI Interface: Grok Knowledge Base from Google Folder
This Jupyter notebook is a tool for managing and interacting with a vectorized knowledge base and AI systems. Below is a high-level summary of its contents, followed by instructions for using it:

## Notebook Overview
1.	Title: xAI Interface
-	Focus: Managing and interacting with a vectorized knowledge base and AI systems.
2.	Sections:
-	Introduction: Provides an overview of the notebook’s functionality.
-	Setup Modules:
 -	Install necessary Python packages (faiss-cpu, faiss-gpu, etc.).
 -	Load required modules.
-	Google Drive Integration:
 -	Mount Google Drive for data input/output.
-	Environment Variable Loading:
 -	Load variables from an .env file.
-	TPU Setup:
 -	Check for TPU (Tensor Processing Unit) availability for accelerated computation.
-	Vectorization Process:
 -	Processes documents into a vectorized format.
-	Validation of Outputs:
 -	Verify that the vectorized outputs are correctly generated.
-	Interaction with xAI and Vector Knowledgebase:
 -	Provides a user interface for interacting with the vectorized database using AI-powered tools.

## Instructions to Use
1.	Install Dependencies:
-	Run the provided code cell for package installation to ensure all necessary libraries are available.
-	Make sure to have appropriate permissions to install packages if using a shared environment.
2.	Mount Google Drive:
-	Run the relevant cell to authenticate and mount Google Drive, allowing access to input/output data.
3.	Set Up Environment Variables:
-	Update the .env file path to match your file structure.
-	Ensure the required variables are defined in your .env file.
4.	Check TPU Availability (Optional):
-	If using TPU for computation, run the TPU setup cell to verify availability.
5.	Process Data:
-	Configure the document folder to be vectorized.
-	Run the vectorization step to process documents and store the results.
6.	Validate Results:
-	Use the validation cell to ensure that the outputs of the vectorization process are as expected.
7.	Interact with the Knowledge Base:
-	A user-friendly interface is provided for querying the vectorized database.
-	Run the interaction cells to start communicating with the xAI system.
8.	Review Outputs:
-	Examine results displayed in the interface or saved to your Google Drive for further analysis.

## Load modules

In [None]:
# @title
# Install necessary packages
# NOTE(review): faiss-cpu and faiss-gpu both provide the `faiss` module; installing
# both into one environment is likely to conflict — consider installing only the
# variant that matches the runtime (CPU vs GPU).
# NOTE(review): scikit-learn, scipy and tensorflow are imported by later cells but are
# not installed here; presumably they are preinstalled on Colab — confirm elsewhere.
!pip install faiss-cpu faiss-gpu joblib google-api-python-client ipywidgets python-dotenv tqdm ray[default] psutil pyyaml typing-extensions --quiet

## Mount Google Drive for input and output

In [None]:
# @title
# Authenticate the Colab user, build a Drive v3 API client (`drive_service`) and
# mount the user's Drive at /content/drive so later cells can read/write files.
from google.colab import auth, drive
from googleapiclient.discovery import build

# Authenticate user and set up Google Drive service
auth.authenticate_user()
# No credentials are passed explicitly; presumably build() picks up the ones
# established by authenticate_user() — confirm if this runs outside Colab.
drive_service = build('drive', 'v3')
# Mount Google Drive for file operations
drive.mount('/content/drive')

## Load variables

In [None]:
# @title
from dotenv import load_dotenv
import os


def _mask_secret(value):
    """Return a redacted form of *value* that is safe to print.

    Only the last four characters are shown, and only when the value is long
    enough that this reveals little of it. ``None`` / empty values are returned
    unchanged so that a missing key is still obvious in the output.
    """
    if not value:
        return value
    return f"****{value[-4:]}" if len(value) > 12 else "****"


# Load environment variables from the .env file
env_file_path = "/content/drive/MyDrive/Colab Notebooks/.env"  # Update the path to the location of your .env file
# load_dotenv() returns False when it could not load anything (e.g. wrong path),
# which would otherwise only surface later as confusing None values.
if not load_dotenv(env_file_path):
    print(f"Warning: no variables loaded from {env_file_path}")

# Access the environment variables
grok_api_key = os.getenv("GROK_API_KEY")
vector_db_dir = os.getenv("VECTOR_DB_DIR") # does not appear to be used
vectors_joblib = os.getenv("VECTORS_JOBLIB") # does not appear to be used
vectorizer_joblib = os.getenv("VECTORIZER_JOBLIB") # does not appear to be used
hashes_joblib = os.getenv("HASHES_JOBLIB") # does not appear to be used
FOLDER_ID = os.getenv("FOLDER_ID")
SAVE_DIR = os.getenv("SAVE_DIR")
MODEL = os.getenv("GROK_MODEL")
TEMP = os.getenv("GROK_TEMP")  # a string (or None); convert before numeric use

# Example usage: print the values.
# The API key is masked: notebook outputs are stored in the .ipynb and are easily
# shared, so the raw secret must never be printed.
print(f"GROK_API_KEY: {_mask_secret(grok_api_key)}")
print(f"VECTOR_DB_DIR: {vector_db_dir}")
print(f"VECTORS_JOBLIB: {vectors_joblib}")
print(f"VECTORIZER_JOBLIB: {vectorizer_joblib}")
print(f"HASHES_JOBLIB: {hashes_joblib}")
print(f"FOLDER_ID: {FOLDER_ID}")
print(f"SAVE_DIR: {SAVE_DIR}")
print(f"GROK_MODEL: {MODEL}")
print(f"GROK_TEMP: {TEMP}")

## Check TPU availability

In [None]:
# @title
# Report whether a TPU is attached to this runtime.
import tensorflow as tf

try:
    tpu = tf.distribute.cluster_resolver.TPUClusterResolver()  # TPU detection
    print('Running on TPU ', tpu.master())
# A ValueError from the resolver is treated as "no TPU available"; any other
# exception propagates.
except ValueError:
    print('TPU not found')

## Vectorize the folder defined in this step

In [None]:
# @title
"""
Enhanced Google Drive Document Processor with Parallel Processing and Improved Resumability
"""

from typing import List, Dict, Any, Optional, Set, Tuple
from contextlib import contextmanager
from dataclasses import dataclass
import signal
from datetime import datetime, timedelta
from scipy import sparse
from sklearn.feature_extraction.text import TfidfVectorizer
import ray
from google.colab import auth, drive
from googleapiclient.discovery import build
from googleapiclient.http import MediaIoBaseDownload
from googleapiclient.errors import HttpError
import mimetypes
import pickle
import numpy as np
import psutil
from pathlib import Path
import yaml
import zlib
from typing_extensions import Protocol
import faiss
import json
import logging
import multiprocessing
import os
import sys
import time
import io
import errno
import fcntl

# Custom exceptions
class DocumentProcessingError(Exception):
    """Root of the errors raised while processing individual documents."""


class RateLimitError(DocumentProcessingError):
    """Signals that an API rate limit was hit."""


class ConfigurationError(Exception):
    """Signals an invalid, unknown or unreadable configuration value."""

@dataclass
class ProcessingMetrics:
    """Counters and timings describing one processing run.

    ``start_time`` is a ``time.time()`` timestamp; ``duration`` is recomputed
    from it on every access. ``memory_usage`` is stored in bytes and reported
    in MiB by :meth:`to_dict`.
    """
    start_time: float
    processed_files: int = 0
    failed_files: int = 0
    total_bytes: int = 0
    vectorization_time: float = 0
    processing_time: float = 0
    memory_usage: float = 0

    @property
    def duration(self) -> float:
        """Seconds elapsed since ``start_time``."""
        now = time.time()
        return now - self.start_time

    def to_dict(self) -> Dict[str, Any]:
        """Return a plain-dict snapshot of the metrics (memory converted to MiB)."""
        snapshot: Dict[str, Any] = {'duration': self.duration}
        for field_name in ('processed_files', 'failed_files', 'total_bytes',
                           'vectorization_time', 'processing_time'):
            snapshot[field_name] = getattr(self, field_name)
        snapshot['memory_usage_mb'] = self.memory_usage / (1024 * 1024)
        return snapshot

class Logger:
    """Logging facade for the document processor.

    Configures the named ``DocumentProcessor`` logger to write both to
    ``<save_dir>/document_processor.log`` and to the console, and prefixes each
    message with the time elapsed since this object was created. Constructing a
    new instance replaces (and closes) the handlers installed by a previous one.
    """

    def __init__(self, save_dir: str, level: str = 'INFO'):
        """
        Args:
            save_dir: Directory for the log file; created if missing.
            level: Name of a ``logging`` level, case-insensitive (e.g. 'info').
        """
        self.logger = logging.getLogger('DocumentProcessor')
        self.logger.setLevel(getattr(logging, level.upper()))

        # Detach handlers left by a previous Logger (e.g. when the notebook cell
        # is re-run) AND close them, so the old log file handle is released
        # instead of leaking one open file per re-run.
        for handler in self.logger.handlers[:]:
            self.logger.removeHandler(handler)
            handler.close()

        # Clear root logger handlers to prevent duplicate logs. Those handlers
        # were installed by other code, so they are detached but not closed.
        root_logger = logging.getLogger()
        for handler in root_logger.handlers[:]:
            root_logger.removeHandler(handler)

        # Create log directory if it doesn't exist
        log_dir = Path(save_dir)
        log_dir.mkdir(parents=True, exist_ok=True)

        formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s',
                                      datefmt='%Y-%m-%d %H:%M:%S')

        # File handler
        file_handler = logging.FileHandler(log_dir / 'document_processor.log')
        file_handler.setFormatter(formatter)
        self.logger.addHandler(file_handler)

        # Console handler
        console_handler = logging.StreamHandler()
        console_handler.setFormatter(formatter)
        self.logger.addHandler(console_handler)

        # Prevent propagation to avoid duplicate logs
        self.logger.propagate = False

        # Track start time for relative timestamps
        self.start_time = time.time()

    def format_time_delta(self, seconds: float) -> str:
        """Format *seconds* as e.g. '1h 2m 5s', '2m 5s' or '5s' (truncated to whole seconds)."""
        hours = int(seconds // 3600)
        minutes = int((seconds % 3600) // 60)
        secs = int(seconds % 60)
        if hours > 0:
            return f"{hours}h {minutes}m {secs}s"
        elif minutes > 0:
            return f"{minutes}m {secs}s"
        return f"{secs}s"

    def add_timing(self, msg: str) -> str:
        """Prefix *msg* with the elapsed time since construction, e.g. '[5s] msg'."""
        elapsed = time.time() - self.start_time
        time_str = self.format_time_delta(elapsed)
        return f"[{time_str}] {msg}"

    def info(self, msg: str):
        """Log info message with elapsed time"""
        self.logger.info(self.add_timing(msg))

    def error(self, msg: str):
        """Log error message with elapsed time"""
        self.logger.error(self.add_timing(msg))

    def warning(self, msg: str):
        """Log warning message with elapsed time"""
        self.logger.warning(self.add_timing(msg))

    def debug(self, msg: str):
        """Log debug message with elapsed time"""
        self.logger.debug(self.add_timing(msg))

    def get_log_file_path(self) -> Optional[Path]:
        """Return the path of the attached log file, or None if there is none."""
        for handler in self.logger.handlers:
            if isinstance(handler, logging.FileHandler):
                return Path(handler.baseFilename)
        return None

class Config:
    """Document-processor configuration with layered overrides.

    Precedence, from lowest to highest:
    1. the built-in defaults below;
    2. an optional YAML file given as *config_path*;
    3. ``DOC_PROCESSOR_<KEY>`` environment variables.

    The merged values are validated on construction and read as attributes,
    e.g. ``config.BATCH_SIZE``.

    Raises:
        ConfigurationError: for a missing or unparseable YAML file, an unknown
            key, an environment value that cannot be converted, or a value
            that fails validation.
    """
    def __init__(self, config_path: Optional[str] = None):
        self._defaults = {
            'PROCESS_ZIP_FILES': True,
            'BATCH_SIZE': 5,
            'FILE_WORKERS': multiprocessing.cpu_count() * 2,
            'VECTORIZE_WORKERS': multiprocessing.cpu_count(),
            'LOCK_TIMEOUT': 60,
            'CHUNK_SIZE': 1000,
            'MAX_RETRIES': 3,
            'SAVE_INTERVAL': 300,
            'CHECKPOINT_INTERVAL': 50,
            'MAX_FILE_SIZE': 100 * 1024 * 1024,  # 100MB
            'MAX_CACHE_SIZE': 1000,
            'MAX_RECURSION_DEPTH': 10,
            'COMPRESSION_LEVEL': 6,
            'LOG_LEVEL': 'INFO',
            'MONITORING_INTERVAL': 60,
            'VECTOR_DIMENSION': 512,
            'MIN_DF': 2,
            'MAX_DF': 0.95,
            'RETRY_DELAY_BASE': 2,
            'MAX_RETRY_DELAY': 60,
            'BATCH_TIMEOUT': 3600,
            'API_QUOTA_LIMIT': 10000,
            'API_QUOTA_WINDOW': 100,
            'MEMORY_LIMIT': 0.9  # max fraction of system RAM this process may use (90%)
        }

        self._config = self._defaults.copy()
        if config_path:
            self._load_config(config_path)
        self._load_env_vars()
        self._validate_config()

    def _load_config(self, config_path: str):
        """Overlay values from a YAML mapping file; only known keys are allowed."""
        try:
            path = Path(config_path)
            if not path.exists():
                raise ConfigurationError(f"Config file not found: {config_path}")

            with open(path, 'r') as f:
                custom_config = yaml.safe_load(f)

            if not isinstance(custom_config, dict):
                raise ConfigurationError("Invalid config format")

            for key, value in custom_config.items():
                if key not in self._defaults:
                    raise ConfigurationError(f"Unknown configuration key: {key}")
                self._config[key] = value

        except ConfigurationError:
            # Already descriptive; don't re-wrap as "Error loading config: ...".
            raise
        except yaml.YAMLError as e:
            raise ConfigurationError(f"Error parsing config file: {e}") from e
        except Exception as e:
            raise ConfigurationError(f"Error loading config: {e}") from e

    def _load_env_vars(self):
        """Apply ``DOC_PROCESSOR_<KEY>`` overrides, converted to the default's type.

        Booleans accept 'true', '1' or 'yes' (case-insensitive) as True; any
        other value is False.
        """
        for key in self._config:
            env_val = os.getenv(f"DOC_PROCESSOR_{key}")
            if env_val is None:
                continue
            default_type = type(self._defaults[key])
            try:
                if default_type is bool:
                    self._config[key] = env_val.lower() in ('true', '1', 'yes')
                else:
                    self._config[key] = default_type(env_val)
            except ValueError as e:
                raise ConfigurationError(
                    f"Invalid environment variable value for {key}: {e}") from e

    def _validate_config(self):
        """Range-check selected settings; raise ConfigurationError on failure."""
        validators = {
            'BATCH_SIZE': lambda x: 0 < x <= 1000,
            'FILE_WORKERS': lambda x: 0 < x <= multiprocessing.cpu_count() * 4,
            'VECTORIZE_WORKERS': lambda x: 0 < x <= multiprocessing.cpu_count() * 4,
            'MAX_FILE_SIZE': lambda x: 0 < x <= 1024 * 1024 * 1024,
            'MAX_CACHE_SIZE': lambda x: x > 0,
            'MAX_RECURSION_DEPTH': lambda x: 0 < x <= 20,
            'COMPRESSION_LEVEL': lambda x: 0 <= x <= 9,
            'MEMORY_LIMIT': lambda x: 0 < x <= 0.95
        }

        for key, validator in validators.items():
            try:
                valid = validator(self._config[key])
            except TypeError:
                # e.g. a YAML string or null where a number was expected
                valid = False
            if not valid:
                raise ConfigurationError(f"Invalid configuration for {key}")

    def __getattr__(self, name: str) -> Any:
        # Only called when normal lookup fails; expose config keys as attributes.
        if name in self.__dict__:
            return self.__dict__[name]
        if '_config' in self.__dict__ and name in self.__dict__['_config']:
            return self.__dict__['_config'][name]
        raise AttributeError(f"Configuration has no attribute '{name}'")

    def validate_memory_usage(self) -> bool:
        """Return True while this process's memory share is below MEMORY_LIMIT.

        Any error while measuring (e.g. psutil unavailable) is treated as OK.
        """
        try:
            memory_percent = psutil.Process().memory_percent()
            return memory_percent / 100 < self.MEMORY_LIMIT
        except Exception:
            return True

class ProcessingProgress:
    """Track batch position and skipped files, persisted to ``progress.json``.

    The owning processor must assign ``processing_state`` (an object exposing
    ``processed_files`` and ``failed_files`` collections) before
    :meth:`save_progress` is called; until then saving fails and is logged.
    """
    def __init__(self, save_dir: str, logger: 'Logger'):
        self.save_dir = Path(save_dir)
        self.progress_file = self.save_dir / 'progress.json'
        self.logger = logger
        self.last_status_time = datetime.now()
        self.status_interval = timedelta(minutes=5)

        # Track skipped files and reasons
        self.skipped_files: Dict[str, Set[str]] = {
            'unsupported_type': set(),
            'too_large': set(),
            'failed_conversion': set(),
            'rate_limited': set(),
            'permission_denied': set(),
            'already_processed': set()
        }

        # Track current batch progress
        self.current_batch: Optional[Dict[str, Any]] = None
        self.processing_state = None  # Will be set by processor
        self.load_progress()

    def load_progress(self):
        """Restore skipped files and the current batch from ``progress.json``.

        A missing or malformed file is logged and progress starts fresh.
        Categories stored in the file are merged into the defaults, so a file
        that lacks a category (e.g. written by an older version) still leaves
        that category usable by :meth:`mark_skipped`.
        """
        try:
            if self.progress_file.exists():
                with open(self.progress_file, 'r') as f:
                    data = json.load(f)

                # Validate structure
                required_keys = {'last_folder_id', 'last_batch_start', 'total_processed',
                                 'skipped_files', 'current_batch'}
                if all(key in data for key in required_keys):
                    self.current_batch = data.get('current_batch')
                    # JSON stores sets as lists; convert back before merging.
                    loaded = {
                        category: set(files)
                        for category, files in data.get('skipped_files', {}).items()
                    }
                    self.skipped_files.update(loaded)
                    self.logger.info(f"Loaded progress: {data['total_processed']} files processed")
                    return

            self.logger.warning("No valid progress file found, starting fresh")

        except Exception as e:
            self.logger.error(f"Error loading progress: {e}")
            self.logger.warning("Starting fresh due to progress load error")

        # If a valid file was loaded, the method returned early above.

    def save_progress(self, force: bool = False):
        """Write progress to disk via a temp file, logging a status line periodically.

        The status line is logged at most every ``status_interval`` (5 minutes)
        unless *force* is True. Errors are logged, never raised.
        """
        temp_file = self.progress_file.with_suffix('.tmp')
        try:
            current_time = datetime.now()

            if current_time - self.last_status_time >= self.status_interval or force:
                total_skipped = sum(len(files) for files in self.skipped_files.values())
                self.logger.info(
                    f"Status: Processed={len(self.processing_state.processed_files)}, "
                    f"Failed={len(self.processing_state.failed_files)}, "
                    f"Skipped={total_skipped}"
                )
                self.last_status_time = current_time

            progress_data = {
                'timestamp': current_time.isoformat(),
                'last_folder_id': self.current_batch.get('folder_id') if self.current_batch else None,
                'last_batch_start': self.current_batch.get('start_index') if self.current_batch else 0,
                'total_processed': len(self.processing_state.processed_files),
                'skipped_files': {
                    category: list(files) for category, files in self.skipped_files.items()
                },
                'current_batch': self.current_batch
            }

            self.save_dir.mkdir(parents=True, exist_ok=True)
            # Write to a temp file, then swap it in so readers never see a partial
            # file. Path.replace() overwrites the target on every platform, unlike
            # Path.rename() which fails on Windows when the target exists.
            with open(temp_file, 'w') as f:
                json.dump(progress_data, f, indent=2)
            temp_file.replace(self.progress_file)

        except Exception as e:
            self.logger.error(f"Error saving progress: {e}")
            try:
                if temp_file.exists():
                    temp_file.unlink()  # don't leave a half-written temp file behind
            except OSError:
                pass  # best effort; the original error was already logged

    def mark_skipped(self, file_id: str, reason: str):
        """Record *file_id* under *reason*; unknown reasons are logged and ignored."""
        if reason in self.skipped_files:
            self.skipped_files[reason].add(file_id)
        else:
            self.logger.warning(f"Unknown skip reason: {reason}")

class GracefulShutdown:
    """
    Signal-driven shutdown coordinator for the document processor.

    Signal handling:
    - SIGINT, SIGTERM and SIGHUP request a graceful stop by setting a flag that
      the processing loop polls via ``should_continue()``. A second such signal
      escalates to a forced shutdown.
    - SIGQUIT forces shutdown immediately.

    ``cleanup()`` does the following, in order:
    1. saves progress, processing state and vectors (skipped when forced);
    2. shuts Ray down;
    3. restores the signal handlers that were active before this object was
       created.

    Signals missing on the current platform are skipped. As with
    ``signal.signal()``, this must be constructed in the main thread.
    """
    def __init__(self, processor, logger: 'Logger'):
        self.processor = processor
        self.logger = logger
        self.shutdown_requested = False
        self.force_shutdown = False
        self.cleanup_started = False
        self.cleanup_completed = False

        # Track component states
        self.component_states = {
            'progress_saved': False,
            'state_saved': False,
            'vectors_saved': False,
            'ray_shutdown': False
        }

        # Remember the handlers we replace so cleanup() can put them back.
        # Without this SIGINT stays captured after processing ends; in a notebook
        # later cells could then no longer be interrupted.
        self._previous_handlers: Dict[int, Any] = {}
        for sig_name, handler in (
            ('SIGINT', self.handle_shutdown),
            ('SIGTERM', self.handle_shutdown),
            ('SIGHUP', self.handle_shutdown),
            ('SIGQUIT', self.handle_forced_shutdown),
        ):
            signum = getattr(signal, sig_name, None)
            if signum is None:
                continue  # signal not available on this platform (e.g. Windows)
            self._previous_handlers[signum] = signal.signal(signum, handler)

    @staticmethod
    def _signal_name(signum) -> str:
        """Return the symbolic name of *signum* (e.g. 'SIGINT'), or 'UNKNOWN'."""
        try:
            return signal.Signals(signum).name
        except ValueError:
            return 'UNKNOWN'

    def handle_shutdown(self, signum, frame):
        """
        Handle shutdown signals with graceful fallback.
        First attempt graceful shutdown, then force if requested again.
        """
        name = self._signal_name(signum)

        if self.cleanup_started:
            self.logger.warning(
                f"Received {name} during cleanup. "
                "Please wait for cleanup to complete..."
            )
            return

        if self.shutdown_requested:
            self.logger.warning(
                f"Received second {name}. "
                "Initiating forced shutdown..."
            )
            self.force_shutdown = True
            return

        self.logger.info(
            f"Received {name}. "
            "Graceful shutdown initiated..."
        )
        self.shutdown_requested = True

    def handle_forced_shutdown(self, signum, frame):
        """Handle immediate shutdown requests (SIGQUIT)"""
        self.logger.warning("Forced shutdown requested, initiating immediate cleanup...")
        self.force_shutdown = True
        self.shutdown_requested = True

    def should_continue(self) -> bool:
        """Return False once any shutdown (graceful or forced) has been requested."""
        return not (self.shutdown_requested or self.force_shutdown)

    def restore_signal_handlers(self) -> None:
        """Reinstall the handlers that were active before this object was created.

        Handlers reported as ``None`` (installed outside Python) cannot be
        reinstated and are left as they are. Failures are logged, not raised.
        Calling this more than once is harmless.
        """
        for signum, previous in self._previous_handlers.items():
            if previous is None:
                continue
            try:
                signal.signal(signum, previous)
            except (ValueError, OSError) as e:
                self.logger.warning(
                    f"Could not restore handler for {self._signal_name(signum)}: {e}"
                )
        self._previous_handlers.clear()

    def _save_progress(self) -> bool:
        """Save processing progress with error handling"""
        try:
            self.logger.info("Saving processing progress...")
            self.processor.progress.save_progress(force=True)
            self.component_states['progress_saved'] = True
            return True
        except Exception as e:
            self.logger.error(f"Error saving progress: {e}")
            return False

    def _save_processing_state(self) -> bool:
        """Save processing state with error handling"""
        try:
            self.logger.info("Saving processing state...")
            self.processor.processing_state.save_state()
            self.component_states['state_saved'] = True
            return True
        except Exception as e:
            self.logger.error(f"Error saving processing state: {e}")
            return False

    def _save_vector_store(self) -> bool:
        """Save vector store state with error handling"""
        try:
            self.logger.info("Saving vector store state...")
            self.processor.vector_store._save_state()
            self.component_states['vectors_saved'] = True
            return True
        except Exception as e:
            self.logger.error(f"Error saving vector store: {e}")
            return False

    def _shutdown_ray(self) -> bool:
        """Shutdown Ray with error handling"""
        try:
            self.logger.info("Shutting down Ray...")
            ray.shutdown()
            self.component_states['ray_shutdown'] = True
            return True
        except Exception as e:
            self.logger.error(f"Error shutting down Ray: {e}")
            return False

    def cleanup(self) -> bool:
        """
        Perform complete cleanup with state tracking and error handling.
        Returns True if cleanup was successful, False otherwise.
        Original signal handlers are restored whatever the outcome.
        """
        self.cleanup_started = True
        cleanup_successful = True

        try:
            self.logger.info(
                f"Starting cleanup process "
                f"({'forced' if self.force_shutdown else 'graceful'} shutdown)"
            )

            if not self.force_shutdown:
                # Save all states in graceful shutdown
                cleanup_successful &= self._save_progress()
                cleanup_successful &= self._save_processing_state()
                cleanup_successful &= self._save_vector_store()
            else:
                # In forced shutdown, just log the state
                self.logger.warning("Forced shutdown: Skipping state saves")

            # Always try to shutdown Ray
            cleanup_successful &= self._shutdown_ray()

            # Log cleanup summary
            successful_components = [
                name for name, state in self.component_states.items()
                if state
            ]
            failed_components = [
                name for name, state in self.component_states.items()
                if not state
            ]

            self.logger.info("\nCleanup Summary:")
            if successful_components:
                self.logger.info("Successfully completed:")
                for component in successful_components:
                    self.logger.info(f"  • {component}")

            if failed_components:
                self.logger.warning("Failed components:")
                for component in failed_components:
                    self.logger.warning(f"  • {component}")

            self.cleanup_completed = True
            return cleanup_successful

        except Exception as e:
            self.logger.error(f"Unexpected error during cleanup: {e}")
            return False
        finally:
            self.logger.info(
                f"Cleanup {'completed' if self.cleanup_completed else 'failed'} "
                f"({'forced' if self.force_shutdown else 'graceful'} shutdown)"
            )
            self.restore_signal_handlers()

class ProcessingState:
    """Persisted record of which files were processed and which failed.

    Stored as a pickle at ``<save_dir>/processing_state.pkl`` and loaded on
    construction. Load errors are logged and an empty state is kept.
    """
    def __init__(self, save_dir: str, logger: 'Logger'):
        self.save_dir = Path(save_dir)
        self.logger = logger
        self.state_file = self.save_dir / 'processing_state.pkl'
        self.processed_files: Set[str] = set()
        self.failed_files: Dict[str, str] = {}  # file_id -> error message
        self.last_checkpoint = 0
        self.load_state()

    def load_state(self):
        """Load state from disk if present; on any error log it and keep current values."""
        try:
            if self.state_file.exists():
                with open(self.state_file, 'rb') as f:
                    # NOTE(security): pickle.load can execute arbitrary code. This is
                    # only acceptable because the file is produced by save_state()
                    # in our own save directory; never point save_dir at untrusted data.
                    state = pickle.load(f)

                required_keys = {'processed_files', 'failed_files', 'last_checkpoint'}
                if not all(key in state for key in required_keys):
                    raise ValueError("Invalid state file structure")

                self.processed_files = state['processed_files']
                self.failed_files = state['failed_files']
                self.last_checkpoint = state['last_checkpoint']

                self.logger.info(f"Loaded state: {len(self.processed_files)} processed, "
                                 f"{len(self.failed_files)} failed")
        except Exception as e:
            self.logger.error(f"Error loading state: {e}")

    def save_state(self):
        """Atomically write the state to disk via a temp file.

        Raises:
            Exception: whatever the write raised, after logging it and removing
                the temp file.
        """
        state = {
            'processed_files': self.processed_files,
            'failed_files': self.failed_files,
            'last_checkpoint': self.last_checkpoint,
            'timestamp': datetime.now().isoformat()
        }

        temp_file = self.state_file.with_suffix('.tmp')
        try:
            self.save_dir.mkdir(parents=True, exist_ok=True)
            with open(temp_file, 'wb') as f:
                pickle.dump(state, f)
            # replace() overwrites an existing target on every platform;
            # Path.rename() raises on Windows when the target already exists.
            temp_file.replace(self.state_file)
            self.logger.debug("State saved successfully")
        except Exception as e:
            self.logger.error(f"Error saving state: {e}")
            if temp_file.exists():
                temp_file.unlink()
            raise

    def mark_processed(self, file_id: str):
        """Record *file_id* as processed and clear any earlier failure for it."""
        self.processed_files.add(file_id)
        self.failed_files.pop(file_id, None)

    def mark_failed(self, file_id: str, error: str):
        """Record *file_id* as failed with *error* and log it."""
        self.failed_files[file_id] = error
        self.logger.error(f"File {file_id} failed: {error}")

class LRUCache:
    """Bounded mapping that evicts the least-recently-used entry when full.

    Relies on dicts preserving insertion order (Python 3.7+). Every read or
    write moves the entry to the end, so the first key is always the least
    recently used and every operation is O(1). ``get`` returns ``None`` for a
    missing key, so a stored ``None`` is indistinguishable from a miss.
    """
    def __init__(self, max_size: int):
        self.cache: Dict[str, Any] = {}
        self.max_size = max_size

    @property
    def access_order(self) -> List[str]:
        """Keys ordered from least to most recently used (a read-only snapshot)."""
        return list(self.cache)

    def get(self, key: str) -> Optional[Any]:
        """Return the value for *key* (marking it most recently used), or None."""
        try:
            value = self.cache.pop(key)
        except KeyError:
            return None
        self.cache[key] = value  # re-insert at the end = most recently used
        return value

    def put(self, key: str, value: Any):
        """Store *value* under *key*, evicting the LRU entry if the cache is full."""
        if self.max_size <= 0:
            return  # nothing can be stored without exceeding the bound
        if key in self.cache:
            del self.cache[key]
        elif len(self.cache) >= self.max_size:
            del self.cache[next(iter(self.cache))]  # oldest entry
        self.cache[key] = value

    def clear(self):
        """Remove all entries."""
        self.cache.clear()

class FileHandler:
    """Enhanced file handling with improved caching and validation"""
    # MIME types this handler accepts, mapped to a coarse category label.
    # get_mime_type() only tests membership in this mapping; the category values
    # are presumably consumed by the text-extraction code further down — confirm.
    SUPPORTED_MIMETYPES = {
        # Text files
        'text/plain': 'text',
        'text/html': 'text',
        'text/csv': 'text',
        'text/markdown': 'text',
        'text/xml': 'text',
        # Document files
        'application/pdf': 'document',
        'application/msword': 'document',
        'application/vnd.openxmlformats-officedocument.wordprocessingml.document': 'document',
        'application/rtf': 'document',
        'application/vnd.oasis.opendocument.text': 'document',
        # Spreadsheets
        'application/vnd.ms-excel': 'spreadsheet',
        'application/vnd.openxmlformats-officedocument.spreadsheetml.sheet': 'spreadsheet',
        # Google workspace (native Drive types, which have no binary content of their own)
        'application/vnd.google-apps.document': 'google_doc',
        'application/vnd.google-apps.spreadsheet': 'google_sheet',
        'application/vnd.google-apps.presentation': 'google_slide',
        # Archives
        'application/zip': 'archive',
        'application/x-rar-compressed': 'archive',
        'application/x-7z-compressed': 'archive',
        # Code files
        'text/x-python': 'code',
        'application/javascript': 'code',
        'text/x-java': 'code',
        'text/x-c': 'code',
        'text/x-cpp': 'code'
    }

    def __init__(self, service, processing_state, config, logger):
        """Bind collaborators and initialise caching, metrics and rate-limit tracking.

        Args:
            service: Drive v3 API client used for listing and metadata calls.
            processing_state: Object whose ``processed_files`` set is consulted
                to skip already-processed files.
            config: Settings object (reads ``MAX_CACHE_SIZE`` here).
            logger: Logger exposing info/warning/error/debug.
        """
        self.service, self.processing_state = service, processing_state
        self.config, self.logger = config, logger

        # Per-folder listing cache, bounded by configuration.
        self._file_cache = LRUCache(config.MAX_CACHE_SIZE)
        self.metrics = ProcessingMetrics(start_time=time.time())

        # Times of recent API requests, used by _enforce_rate_limits().
        self._request_timestamps = []
        self._last_rate_limit_check = time.time()

    def _enforce_rate_limits(self):
        """Enforce API rate limits"""
        current_time = time.time()
        window_start = current_time - self.config.API_QUOTA_WINDOW

        # Remove old timestamps
        self._request_timestamps = [ts for ts in self._request_timestamps if ts > window_start]

        # Check if we're over the limit
        if len(self._request_timestamps) >= self.config.API_QUOTA_LIMIT:
            sleep_time = min(window_start + self.config.API_QUOTA_WINDOW - current_time,
                           self.config.MAX_RETRY_DELAY)
            self.logger.warning(f"API quota limit reached, waiting {sleep_time:.2f}s")
            time.sleep(sleep_time)

    def list_files_recursive(self, folder_id: str) -> List[Dict[str, Any]]:
        """List all files using iteration instead of recursion"""
        files = []
        folders_to_process = [(folder_id, 0)]  # (folder_id, depth)
        processed_folders = set()

        start_time = time.time()
        total_files = 0
        total_folders = 0

        while folders_to_process:
            if time.time() - start_time > self.config.BATCH_TIMEOUT:
                self.logger.warning("Batch timeout reached, stopping folder traversal")
                break

            if not self.config.validate_memory_usage():
                self.logger.warning("Memory usage exceeded limit, stopping folder traversal")
                break

            current_folder, depth = folders_to_process.pop(0)

            if depth > self.config.MAX_RECURSION_DEPTH or current_folder in processed_folders:
                continue

            processed_folders.add(current_folder)
            total_folders += 1

            try:
                cached_files = self._file_cache.get(current_folder)
                if cached_files is not None:
                    files.extend(cached_files)
                    total_files += len(cached_files)
                    continue

                folder_files = []
                page_token = None

                while True:
                    self._enforce_rate_limits()

                    try:
                        results = self.service.files().list(
                            q=f"'{current_folder}' in parents and trashed=false",
                            spaces='drive',
                            fields='nextPageToken, files(id, name, mimeType, size, modifiedTime)',
                            pageToken=page_token,
                            pageSize=1000
                        ).execute()

                        self._request_timestamps.append(time.time())

                        for file in results.get('files', []):
                            if file['mimeType'] == 'application/vnd.google-apps.folder':
                                if depth + 1 <= self.config.MAX_RECURSION_DEPTH:
                                    folders_to_process.append((file['id'], depth + 1))
                            elif file['id'] not in self.processing_state.processed_files:
                                folder_files.append(file)
                                total_files += 1

                        files.extend(folder_files)
                        self._file_cache.put(current_folder, folder_files)

                        page_token = results.get('nextPageToken')
                        if not page_token:
                            break

                    except HttpError as e:
                        if e.resp.status == 429:  # Rate limit
                            wait_time = min(self.config.RETRY_DELAY_BASE ** depth,
                                         self.config.MAX_RETRY_DELAY)
                            self.logger.warning(f"Rate limit hit, waiting {wait_time}s")
                            time.sleep(wait_time)
                            continue
                        raise

            except Exception as e:
                self.logger.error(f"Error processing folder {current_folder}: {e}")

        duration = time.time() - start_time
        self.logger.info(f"Folder traversal completed in {duration:.2f}s: "
                        f"processed {total_folders} folders, found {total_files} files")

        return files

    def get_mime_type(self, file_id: str) -> Optional[str]:
        """Return the MIME type Drive reports for ``file_id``.

        If the reported type is not in ``SUPPORTED_MIMETYPES``, a type is guessed
        from the file name's extension. The guess is returned only when it is
        supported; otherwise the Drive-reported type is returned unchanged.

        Returns:
            The MIME type string, or ``None`` if the metadata request raised.
        """
        try:
            self._enforce_rate_limits()
            request = self.service.files().get(fileId=file_id, fields='mimeType, name')
            metadata = request.execute()
            self._request_timestamps.append(time.time())

            reported = metadata.get('mimeType')
            supported = self.SUPPORTED_MIMETYPES
            if reported in supported:
                return reported

            # Fall back to guessing from the (lower-cased) file extension.
            _, extension = os.path.splitext(metadata.get('name', ''))
            guess, _encoding = mimetypes.guess_type('file' + extension.lower())
            return guess if guess in supported else reported
        except Exception as e:
            self.logger.error(f"Error getting mime type for {file_id}: {e}")
            return None

    def convert_and_extract_text(self, file_id: str) -> Optional[str]:
        """Download a Drive file and return its content decoded as UTF-8 text.

        Files whose MIME type starts with ``application/vnd.google-apps`` are
        exported as ``text/plain``. All other files are downloaded as stored.

        Each download is attempted up to ``config.MAX_RETRIES`` times:
        - HTTP 429 responses wait ``min(RETRY_DELAY_BASE ** attempt, MAX_RETRY_DELAY)``
          seconds before retrying.
        - Non-HTTP errors wait ``RETRY_DELAY_BASE ** attempt`` seconds before retrying.
        - Any other HTTP error is not retried.

        On success the file is marked processed and ``metrics.total_bytes`` /
        ``metrics.processed_files`` are updated. Every failure is recorded via
        ``processing_state.mark_failed``. An unsupported MIME type or an
        oversized file returns early. Download errors, including running out of
        attempts while rate-limited, also increment ``metrics.failed_files``.

        Args:
            file_id: Google Drive file ID.

        Returns:
            The decoded text (bytes that are not valid UTF-8 are dropped), or
            ``None`` if the file was already processed or extraction failed.
        """
        if file_id in self.processing_state.processed_files:
            return None

        try:
            mime_type = self.get_mime_type(file_id)
            if not mime_type or mime_type not in self.SUPPORTED_MIMETYPES:
                self.processing_state.mark_failed(file_id, f"Unsupported mime type: {mime_type}")
                return None

            self._enforce_rate_limits()
            file = self.service.files().get(fileId=file_id, fields='size').execute()
            self._request_timestamps.append(time.time())

            # A missing 'size' field is treated as 0 bytes (presumably the case
            # for Google-native documents -- TODO confirm).
            if int(file.get('size', 0)) > self.config.MAX_FILE_SIZE:
                self.processing_state.mark_failed(file_id, "File too large")
                return None

            max_retries = self.config.MAX_RETRIES
            for attempt in range(max_retries):
                is_last_attempt = attempt >= max_retries - 1
                try:
                    if mime_type.startswith('application/vnd.google-apps'):
                        # Google-native files are exported rather than downloaded.
                        request = self.service.files().export_media(
                            fileId=file_id,
                            mimeType='text/plain'
                        )
                    else:
                        request = self.service.files().get_media(fileId=file_id)

                    fh = io.BytesIO()
                    downloader = MediaIoBaseDownload(fh, request)
                    done = False

                    while not done:
                        self._enforce_rate_limits()
                        _, done = downloader.next_chunk()

                    text = fh.getvalue().decode('utf-8', errors='ignore')

                    self.metrics.total_bytes += len(text.encode('utf-8'))
                    self.metrics.processed_files += 1

                    self.processing_state.mark_processed(file_id)
                    return text

                except HttpError as e:
                    if e.resp.status != 429:  # Only rate limits are retried
                        raise
                    if is_last_attempt:
                        # Out of attempts. Raise so the outer handler records the
                        # failure instead of the loop ending and returning None
                        # with nothing marked.
                        raise RuntimeError(
                            f"Rate limit persisted after {max_retries} attempts"
                        ) from e
                    wait_time = min(
                        self.config.RETRY_DELAY_BASE ** attempt,
                        self.config.MAX_RETRY_DELAY
                    )
                    self.logger.warning(f"Rate limit hit, waiting {wait_time}s")
                    time.sleep(wait_time)
                except Exception as e:
                    if is_last_attempt:
                        raise
                    self.logger.warning(f"Retry {attempt + 1} for file {file_id}: {e}")
                    time.sleep(self.config.RETRY_DELAY_BASE ** attempt)

            # Every iteration above returns or raises on its last attempt, so this
            # is reached only when MAX_RETRIES <= 0 and no attempt was made.
            raise RuntimeError("No download attempts configured (MAX_RETRIES <= 0)")

        except Exception as e:
            error_msg = f"Error extracting text: {str(e)}"
            self.processing_state.mark_failed(file_id, error_msg)
            self.metrics.failed_files += 1
            return None

    def clear_cache(self):
        """Drop every entry from the per-folder file-listing cache."""
        cache = self._file_cache
        cache.clear()
        self.logger.debug("File cache cleared")

class VectorStore:
    """FAISS ``IndexFlatL2`` store that maps Drive file IDs to index rows.

    State is persisted in ``save_dir`` as two files:

    * ``faiss_index.index`` -- the FAISS index, one row per stored file;
    * ``metadata.json`` -- ``{'vector_map': {file_id: {'index': row,
      'timestamp': iso8601}}, 'last_update': iso8601 or None}``.

    Each file is written to a temp file and swapped in with ``os.replace``.
    The two swaps are separate, so a crash between them can leave the pair
    inconsistent. ``_load_state`` detects a row-count mismatch and rebuilds.
    ``save_vectors`` and ``remove_vectors`` reload the last saved state from
    disk when they fail.
    """
    def __init__(self, save_dir: str, config: Config, logger: Logger):
        self.save_dir = Path(save_dir)
        self.config = config
        self.logger = logger
        self.index_path = self.save_dir / 'faiss_index.index'
        self.metadata_path = self.save_dir / 'metadata.json'
        self.dimension = self.config.VECTOR_DIMENSION
        self._load_state()

    def _load_state(self):
        """Load index and metadata from disk, or start an empty store.

        If the index row count disagrees with ``vector_map``, the index is
        rebuilt. If either file is missing, an empty store is created and
        saved. If loading raises, an empty in-memory store is used and nothing
        is saved.
        """
        try:
            if self.index_path.exists() and self.metadata_path.exists():
                self.index = faiss.read_index(str(self.index_path))
                with open(self.metadata_path) as f:
                    self.metadata = json.load(f)

                # Validate index and metadata consistency
                if self.index.ntotal != len(self.metadata.get('vector_map', {})):
                    self.logger.warning(
                        f"Index/metadata mismatch detected. Rebuilding... "
                        f"(vectors: {self.index.ntotal}, metadata: {len(self.metadata.get('vector_map', {}))})"
                    )
                    self._rebuild_index()
            else:
                self.index = faiss.IndexFlatL2(self.dimension)
                self.metadata = {'vector_map': {}, 'last_update': None}
                self._save_state()

        except Exception as e:
            self.logger.error(f"Error loading vector store state: {e}")
            self.index = faiss.IndexFlatL2(self.dimension)
            self.metadata = {'vector_map': {}, 'last_update': None}

    def _rebuild_index(self):
        """Rebuild a compact index from the metadata and save it.

        Entries whose row is beyond ``index.ntotal`` are dropped. The survivors
        are renumbered 0..n-1 in ``vector_map`` order. On any error the store
        is reset to empty and saved.
        """
        new_index = faiss.IndexFlatL2(self.dimension)
        new_metadata = {'vector_map': {}, 'last_update': datetime.now().isoformat()}

        try:
            # Collect vectors whose metadata row actually exists in the index
            valid_vectors = []
            valid_file_ids = []

            for file_id, meta in self.metadata['vector_map'].items():
                idx = meta['index']
                if idx < self.index.ntotal:
                    vector = self.index.reconstruct(idx)
                    valid_vectors.append(vector)
                    valid_file_ids.append(file_id)

            if valid_vectors:
                vectors = np.vstack(valid_vectors)
                new_index.add(vectors)

                for i, file_id in enumerate(valid_file_ids):
                    new_metadata['vector_map'][file_id] = {
                        'index': i,
                        'timestamp': datetime.now().isoformat()
                    }

            self.index = new_index
            self.metadata = new_metadata
            self._save_state()

        except Exception as e:
            self.logger.error(f"Error rebuilding index: {e}")
            # Create fresh index
            self.index = faiss.IndexFlatL2(self.dimension)
            self.metadata = {'vector_map': {}, 'last_update': datetime.now().isoformat()}
            self._save_state()

    def _save_state(self):
        """Write index and metadata via temp files, then ``os.replace`` each.

        Each individual replace is atomic, but the pair is not (see the class
        docstring). On failure the temp files are removed and the error is
        re-raised.
        """
        temp_index = self.index_path.with_suffix('.tmp')
        temp_metadata = self.metadata_path.with_suffix('.tmp')

        try:
            faiss.write_index(self.index, str(temp_index))

            with open(temp_metadata, 'w') as f:
                json.dump(self.metadata, f, indent=2)

            os.replace(temp_index, self.index_path)
            os.replace(temp_metadata, self.metadata_path)

        except Exception as e:
            self.logger.error(f"Error saving vector store state: {e}")
            if temp_index.exists():
                temp_index.unlink()
            if temp_metadata.exists():
                temp_metadata.unlink()
            raise

    def _as_float32_rows(self, vectors: np.ndarray) -> np.ndarray:
        """Return ``vectors`` as a C-contiguous float32 2-D array of width ``dimension``.

        A 1-D input is treated as a single row. Rows narrower than
        ``dimension`` are zero-padded on the right. ``TfidfVectorizer``
        emits fewer than ``max_features`` columns when its fitted vocabulary
        is smaller, and trailing zero columns do not change L2 distances.

        Raises:
            ValueError: if the rows are wider than ``dimension``.
        """
        rows = np.asarray(vectors, dtype=np.float32)
        if rows.ndim == 1:
            rows = rows.reshape(1, -1)
        width = rows.shape[1]
        if width > self.dimension:
            raise ValueError(
                f"Vector dimension {width} exceeds index dimension {self.dimension}"
            )
        if width < self.dimension:
            rows = np.pad(rows, ((0, 0), (0, self.dimension - width)))
        return np.ascontiguousarray(rows)

    def save_vectors(self, vectors: np.ndarray, file_ids: List[str]) -> bool:
        """Append vectors for file IDs not already stored, then persist.

        Args:
            vectors: One row per entry in ``file_ids``. Rows narrower than
                ``dimension`` are zero-padded (see ``_as_float32_rows``).
            file_ids: Drive file IDs aligned with ``vectors``.

        Returns:
            True if all new vectors were stored (or there were none), False on a
            count mismatch or any error. On error, the last saved state is
            reloaded from disk.
        """
        if len(vectors) != len(file_ids):
            self.logger.error(f"Vector count ({len(vectors)}) doesn't match file_ids count ({len(file_ids)})")
            return False

        try:
            vector_map = self.metadata['vector_map']
            new_rows = []
            new_file_ids = []
            seen_in_batch = set()
            for row, file_id in zip(vectors, file_ids):
                # Skip IDs already stored and repeats within this batch. A repeat
                # would add an index row that no metadata entry points to,
                # breaking the ntotal == len(vector_map) check in _load_state.
                if file_id in vector_map or file_id in seen_in_batch:
                    continue
                seen_in_batch.add(file_id)
                new_rows.append(row)
                new_file_ids.append(file_id)

            if not new_rows:
                return True  # Nothing new to add

            vector_array = self._as_float32_rows(np.vstack(new_rows))

            start_idx = self.index.ntotal
            self.index.add(vector_array)

            timestamp = datetime.now().isoformat()
            for offset, file_id in enumerate(new_file_ids):
                vector_map[file_id] = {
                    'index': start_idx + offset,
                    'timestamp': timestamp
                }

            self.metadata['last_update'] = datetime.now().isoformat()
            self._save_state()

            self.logger.info(f"Added {len(new_rows)} new vectors to index")
            return True

        except Exception as e:
            self.logger.error(f"Error saving vectors: {e}")
            self._load_state()  # Reload previous state
            return False

    def search(self, query_vector: np.ndarray, k: int = 10) -> List[Tuple[str, float]]:
        """Return up to ``k`` ``(file_id, similarity)`` pairs, most similar first.

        ``query_vector`` may be 1-D or a single ``(1, d)`` row. A query narrower
        than ``dimension`` is zero-padded the same way stored vectors are.
        Similarity is computed as ``1 - d/2`` from the FAISS distance ``d``.
        This equals cosine similarity under two assumptions: FAISS reports
        squared L2 distances, and the vectors are unit-normalised (as TF-IDF
        output with the default l2 norm is).

        Returns:
            The ranked pairs. An empty list on an invalid query dimension, an
            empty index, or any error.
        """
        try:
            query = np.asarray(query_vector, dtype=np.float32).reshape(-1)
            if query.size == 0 or query.shape[0] > self.dimension:
                self.logger.error(f"Invalid query vector dimension: {query.shape[0]}, expected: {self.dimension}")
                return []

            n_results = min(k, self.index.ntotal)
            if n_results <= 0:
                return []  # Empty index (or k <= 0): nothing to search

            distances, indices = self.index.search(self._as_float32_rows(query), n_results)

            # Invert vector_map once instead of scanning it for every hit.
            # setdefault keeps the first file_id per row, as a linear scan would.
            row_to_file = {}
            for file_id, meta in self.metadata['vector_map'].items():
                row_to_file.setdefault(meta['index'], file_id)

            results = []
            for idx, dist in zip(indices[0], distances[0]):
                file_id = row_to_file.get(int(idx))
                if file_id is None:  # -1 placeholder or a row without metadata
                    continue
                results.append((file_id, float(1 - (dist / 2))))

            return sorted(results, key=lambda x: x[1], reverse=True)

        except Exception as e:
            self.logger.error(f"Error during search: {e}")
            return []

    def get_vector(self, file_id: str) -> Optional[np.ndarray]:
        """Return the stored vector for ``file_id``, or None if unknown or out of range."""
        try:
            meta = self.metadata['vector_map'].get(file_id)
            if not meta:
                return None

            idx = meta['index']
            if idx >= self.index.ntotal:
                return None

            return self.index.reconstruct(idx)

        except Exception as e:
            self.logger.error(f"Error retrieving vector for {file_id}: {e}")
            return None

    def remove_vectors(self, file_ids: List[str]) -> bool:
        """Rebuild the index without the given file IDs and persist it.

        Returns:
            True on success. False on error, after reloading the last saved state.
        """
        try:
            # Set membership keeps this O(n) instead of O(n * len(file_ids))
            to_remove = set(file_ids)
            keep_vectors = []
            keep_file_ids = []

            for file_id, meta in self.metadata['vector_map'].items():
                if file_id not in to_remove:
                    vector = self.index.reconstruct(meta['index'])
                    keep_vectors.append(vector)
                    keep_file_ids.append(file_id)

            new_index = faiss.IndexFlatL2(self.dimension)
            new_metadata = {'vector_map': {}, 'last_update': datetime.now().isoformat()}

            if keep_vectors:
                vectors = np.vstack(keep_vectors)
                new_index.add(vectors)

                for i, file_id in enumerate(keep_file_ids):
                    new_metadata['vector_map'][file_id] = {
                        'index': i,
                        'timestamp': datetime.now().isoformat()
                    }

            self.index = new_index
            self.metadata = new_metadata
            self._save_state()

            return True

        except Exception as e:
            self.logger.error(f"Error removing vectors: {e}")
            self._load_state()  # Restore previous state
            return False

    def clear(self) -> bool:
        """Replace the store with an empty index and metadata, and persist it."""
        try:
            self.index = faiss.IndexFlatL2(self.dimension)
            self.metadata = {'vector_map': {}, 'last_update': datetime.now().isoformat()}
            self._save_state()
            return True
        except Exception as e:
            self.logger.error(f"Error clearing vector store: {e}")
            return False

@ray.remote
def process_file_chunk(service, files: List[Dict[str, Any]], processing_state: ProcessingState,
                      config: Config, logger: Logger) -> List[Dict[str, Any]]:
    """Ray task: extract the text of one chunk of Drive files.

    A new ``FileHandler`` is built from the passed-in service, state, config
    and logger. Files whose extraction yields no text (``None`` or ``""``) are
    left out. A per-file exception is logged and the file is skipped.

    Returns:
        One dict per extracted file with keys ``id``, ``name``, ``text`` and
        ``size`` (the UTF-8 byte length of ``text``).
    """
    handler = FileHandler(service, processing_state, config, logger)
    extracted = []

    for entry in files:
        try:
            content = handler.convert_and_extract_text(entry['id'])
            if not content:
                continue
            extracted.append({
                'id': entry['id'],
                'name': entry['name'],
                'text': content,
                'size': len(content.encode('utf-8')),
            })
        except Exception as exc:
            logger.error(f"Error processing file {entry['name']}: {exc}")

    return extracted

@ray.remote
class RayVectorizer:
    """Ray actor wrapping a TF-IDF vectorizer configured from ``Config``.

    One actor is fitted with ``fit_transform``. Its learned state can then be
    copied to other actors through ``get_vectorizer_state`` /
    ``set_vectorizer_state``. Errors in ``fit_transform`` and ``transform`` are
    printed and reported by returning ``None``.
    """
    def __init__(self, config: Config):
        self.config = config
        options = dict(
            max_features=config.VECTOR_DIMENSION,
            ngram_range=(1, 2),
            strip_accents='unicode',
            analyzer='word',
            min_df=config.MIN_DF,
            max_df=config.MAX_DF,
            stop_words='english',
        )
        self.vectorizer = TfidfVectorizer(**options)
        self.is_fitted = False

    def fit_transform(self, texts: List[str]) -> np.ndarray:
        """Fit on ``texts`` and return their dense TF-IDF matrix.

        Returns ``None`` for empty input or if fitting fails.
        """
        try:
            if not texts:
                return None
            dense = self.vectorizer.fit_transform(texts).toarray()
        except Exception as e:
            print(f"Error in fit_transform: {e}")
            return None
        self.is_fitted = True
        return dense

    def transform(self, texts: List[str]) -> np.ndarray:
        """Return the dense TF-IDF matrix for ``texts`` using the fitted state.

        Returns ``None`` for empty input, when not yet fitted, or on error.
        """
        try:
            if not texts:
                return None
            if self.is_fitted:
                return self.vectorizer.transform(texts).toarray()
            raise ValueError("Vectorizer must be fitted first")
        except Exception as e:
            print(f"Error in transform: {e}")
            return None

    def get_vectorizer_state(self):
        """Return the fitted attributes needed to recreate this vectorizer.

        Raises:
            ValueError: if the vectorizer has not been fitted.
        """
        if self.is_fitted:
            fitted = self.vectorizer
            return {
                'vocabulary_': fitted.vocabulary_,
                'idf_': fitted.idf_,
                # NOTE(review): this reads '_stop_words_' (leading underscore),
                # which does not match the key name. If the vectorizer has no
                # such attribute, the entry is always None. TODO confirm.
                'stop_words_': getattr(fitted, '_stop_words_', None),
            }
        raise ValueError("Vectorizer must be fitted first")

    def set_vectorizer_state(self, state):
        """Copy each ``state`` entry onto the vectorizer and mark it fitted."""
        target = self.vectorizer
        for attribute in state:
            setattr(target, attribute, state[attribute])
        self.is_fitted = True

class ParallelDocumentProcessor:
    """Pipeline driver: Drive folder -> extracted text -> TF-IDF -> ``VectorStore``.

    Files under ``folder_id`` are listed with ``FileHandler``. Their text is
    extracted by ``process_file_chunk`` Ray tasks, vectorized by a pool of
    ``RayVectorizer`` actors, and saved to a ``VectorStore`` in ``save_dir``.
    ``ProcessingState`` and ``ProcessingProgress`` (both rooted at
    ``save_dir``) track which files are done so an interrupted run can resume.
    """

    # File inside save_dir that holds the fitted vectorizer state. Persisting it
    # lets a resumed run keep producing vectors in the same feature space as the
    # vectors already stored, instead of refitting on a different first batch.
    VECTORIZER_STATE_FILE = 'vectorizer_state.pkl'

    def __init__(self, drive_service, folder_id: str, save_dir: str):
        """Set up Ray, state tracking, the vector store and the vectorizer pool.

        Args:
            drive_service: Google Drive API client used to list and download files.
            folder_id: ID of the root Drive folder to process.
            save_dir: Directory for logs, processing state, progress and vectors.
        """
        self.config = Config()
        self.logger = Logger(save_dir, self.config.LOG_LEVEL)

        # Initialize Ray without dashboard
        ray.init(ignore_reinit_error=True, include_dashboard=False, log_to_driver=False)

        self.drive_service = drive_service
        self.save_dir = Path(save_dir)
        self.folder_id = folder_id

        # Create processing components
        self.processing_state = ProcessingState(save_dir, self.logger)
        self.progress = ProcessingProgress(save_dir, self.logger)
        self.progress.processing_state = self.processing_state

        self.file_handler = FileHandler(
            drive_service,
            self.processing_state,
            self.config,
            self.logger
        )

        # Initialize the vector store and reconcile it with the processing state
        self.vector_store = VectorStore(save_dir, self.config, self.logger)
        self.verify_vector_store()

        self.shutdown_handler = GracefulShutdown(self, self.logger)

        # Initialize vectorizers, reusing a fitted state from a previous run if saved
        self.vectorizers = [
            RayVectorizer.remote(self.config)
            for _ in range(self.config.VECTORIZE_WORKERS)
        ]
        self.is_fitted = False
        self._load_vectorizer_state()

        # Set up status tracking
        self.last_status_time = time.time()
        self.start_time = time.time()
        self.status_interval = 300  # 5 minutes in seconds

    def _load_vectorizer_state(self):
        """Push a previously saved vectorizer state to every actor, if one exists.

        On any error the pool stays unfitted, so the next batch refits it.
        """
        import pickle

        state_path = self.save_dir / self.VECTORIZER_STATE_FILE
        if not state_path.exists():
            return
        try:
            # Written by _save_vectorizer_state into our own save_dir. Never point
            # this at an untrusted file: unpickling can execute arbitrary code.
            with open(state_path, 'rb') as f:
                state = pickle.load(f)
            ray.get([
                vectorizer.set_vectorizer_state.remote(state)
                for vectorizer in self.vectorizers
            ])
            self.is_fitted = True
            self.logger.info("Loaded fitted vectorizer state from previous run")
        except Exception as e:
            self.is_fitted = False
            self.logger.warning(f"Could not load vectorizer state, it will be refitted: {e}")

    def _save_vectorizer_state(self, state):
        """Persist the fitted vectorizer state (temp file + ``os.replace``).

        Best effort: a failure is logged and does not interrupt processing.
        """
        import pickle

        state_path = self.save_dir / self.VECTORIZER_STATE_FILE
        temp_path = state_path.with_name(state_path.name + '.tmp')
        try:
            with open(temp_path, 'wb') as f:
                pickle.dump(state, f)
            os.replace(temp_path, state_path)
        except Exception as e:
            self.logger.warning(f"Could not persist vectorizer state: {e}")
            if temp_path.exists():
                temp_path.unlink()

    def verify_vector_store(self):
        """Reconcile the vector store with the set of processed files.

        Vectors whose file is not marked processed are removed. Files marked
        processed but lacking a vector are un-marked so they are reprocessed.
        The corrected processing state is saved. Errors are logged, not raised.
        """
        try:
            vector_files = set(self.vector_store.metadata['vector_map'].keys())
            processed_files = self.processing_state.processed_files

            missing_vectors = processed_files - vector_files
            extra_vectors = vector_files - processed_files

            if missing_vectors or extra_vectors:
                self.logger.warning(
                    f"Vector store inconsistency detected:\n"
                    f"• Files without vectors: {len(missing_vectors)}\n"
                    f"• Vectors without files: {len(extra_vectors)}"
                )

                if extra_vectors:
                    self.vector_store.remove_vectors(list(extra_vectors))

                for file_id in missing_vectors:
                    self.processing_state.processed_files.remove(file_id)

                # Persist the correction so a crash before the next checkpoint
                # does not bring back the un-marked entries.
                self.processing_state.save_state()

                self.logger.info("Vector store verification complete. Inconsistencies resolved.")

        except Exception as e:
            self.logger.error(f"Error verifying vector store: {e}")

    def parallel_vectorize(self, texts: List[str]) -> Optional[np.ndarray]:
        """Vectorize ``texts`` into one row per text, in input order.

        The first call fits the first actor on ``texts``, copies its state to
        the other actors and saves it to disk. Later calls split ``texts``
        across the actors.

        Returns:
            A matrix with exactly ``len(texts)`` rows, or ``None`` if anything
            failed. Partial results are never returned, because the rows must
            stay aligned with the caller's file IDs.
        """
        if not texts:
            return None

        try:
            if not self.is_fitted:
                # NOTE: the vocabulary is learned from this batch only. Later
                # batches are transformed into that fixed feature space.
                first_vectorizer = self.vectorizers[0]
                result = ray.get(first_vectorizer.fit_transform.remote(texts))
                if result is None:
                    self.logger.error("Failed to fit vectorizer")
                    return None

                # Update state for all vectorizers
                state = ray.get(first_vectorizer.get_vectorizer_state.remote())
                ray.get([
                    vectorizer.set_vectorizer_state.remote(state)
                    for vectorizer in self.vectorizers[1:]
                ])
                self.is_fitted = True
                self._save_vectorizer_state(state)
                return result

            # Split texts for parallel processing (contiguous chunks, round-robin)
            n_workers = len(self.vectorizers)
            chunk_size = max(1, len(texts) // n_workers)
            chunks = [texts[i:i + chunk_size] for i in range(0, len(texts), chunk_size)]

            futures = [
                self.vectorizers[i % n_workers].transform.remote(chunk)
                for i, chunk in enumerate(chunks)
                if chunk
            ]

            if not futures:
                self.logger.warning("No valid chunks to process")
                return None

            results = []
            for future in futures:
                try:
                    result = ray.get(future)
                except Exception as e:
                    self.logger.error(f"Error processing chunk: {str(e)}")
                    return None
                if result is None or result.size == 0:
                    # Dropping a chunk would shift every later row onto the
                    # wrong file ID, so the whole batch is discarded instead.
                    self.logger.error("Vectorizer returned no output for a chunk; discarding batch")
                    return None
                results.append(result)

            combined = np.vstack(results)
            if combined.shape[0] != len(texts):
                self.logger.error(
                    f"Vectorized {combined.shape[0]} rows for {len(texts)} texts; discarding batch"
                )
                return None
            return combined

        except Exception as e:
            self.logger.error(f"Error in parallel_vectorize: {str(e)}")
            return None

    def process_batch(self, batch: List[Dict[str, Any]]) -> bool:
        """Extract, vectorize and store one batch of files.

        Files are marked processed only after their vectors were saved.

        Returns:
            False if saving vectors failed or an unexpected error occurred. True
            otherwise, including when nothing could be vectorized.
        """
        try:
            batch_info = {
                'start_time': datetime.now().isoformat(),
                'files': [f['id'] for f in batch],
                'start_index': len(self.processing_state.processed_files),
                'folder_id': self.folder_id
            }
            self.progress.current_batch = batch_info

            processed_files = self.parallel_process_files(batch)

            if processed_files:
                # Filter out any None or whitespace-only texts
                valid_files = [
                    f for f in processed_files
                    if f.get('text') and len(f['text'].strip()) > 0
                ]

                if valid_files:
                    texts = [f['text'] for f in valid_files]
                    file_ids = [f['id'] for f in valid_files]

                    vectors = self.parallel_vectorize(texts)
                    if vectors is not None and vectors.size > 0:
                        if self.vector_store.save_vectors(vectors, file_ids):
                            # Only mark as processed if vectors were saved successfully
                            for file_id in file_ids:
                                self.processing_state.mark_processed(file_id)
                        else:
                            self.logger.error("Failed to save vectors, batch will be retried")
                            return False
                    else:
                        self.logger.warning(
                            f"Vectorization failed for batch of {len(valid_files)} files"
                        )
                else:
                    self.logger.warning(f"No valid text content in batch of {len(processed_files)} files")

            self.progress.current_batch = None
            self.progress.save_progress()
            self.log_status()
            return True

        except Exception as e:
            self.logger.error(f"Error processing batch: {e}")
            return False

    def parallel_process_files(self, files: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        """Extract text from ``files`` using one Ray task per ``CHUNK_SIZE`` slice.

        Returns an empty list if gathering the task results raises.
        """
        chunks = [
            files[i:i + self.config.CHUNK_SIZE]
            for i in range(0, len(files), self.config.CHUNK_SIZE)
        ]

        chunk_futures = [
            process_file_chunk.remote(
                self.drive_service,
                chunk,
                self.processing_state,
                self.config,
                self.logger
            )
            for chunk in chunks
        ]

        try:
            processed_chunks = ray.get(chunk_futures)
            return [item for chunk in processed_chunks for item in chunk if item]
        except Exception as e:
            self.logger.error(f"Error in parallel processing: {e}")
            return []

    def log_status(self, force: bool = False):
        """Log counters at most once per ``status_interval`` seconds (or when forced)."""
        current_time = time.time()
        if force or (current_time - self.last_status_time >= self.status_interval):
            elapsed = current_time - self.start_time
            processed = len(self.processing_state.processed_files)
            failed = len(self.processing_state.failed_files)
            skipped = sum(len(files) for files in self.progress.skipped_files.values())
            vectors = len(self.vector_store.metadata['vector_map'])

            memory_percent = psutil.Process().memory_percent()

            status_msg = (
                f"Status after {elapsed:.1f}s:\n"
                f"• Processed files: {processed:,}\n"
                f"• Failed files: {failed:,}\n"
                f"• Skipped files: {skipped:,}\n"
                f"• Stored vectors: {vectors:,}\n"
                f"• Memory usage: {memory_percent:.1f}%"
            )
            self.logger.info(status_msg)
            self.last_status_time = current_time

    def process_files(self):
        """List, filter and process all files under ``folder_id`` in batches.

        Processing state is checkpointed roughly every ``CHECKPOINT_INTERVAL``
        files, and always saved on exit together with progress.

        Raises:
            Re-raises any unexpected error after logging it.
        """
        try:
            all_files = self.file_handler.list_files_recursive(self.folder_id)
            supported = FileHandler.SUPPORTED_MIMETYPES

            remaining_files = []
            for f in all_files:
                if f['id'] in self.processing_state.processed_files:
                    self.progress.mark_skipped(f['id'], 'already_processed')
                    continue

                # The listing already returns mimeType. Only query Drive again
                # (one API call, with the extension fallback) when it is
                # missing or unsupported.
                mime_type = f.get('mimeType')
                if mime_type not in supported:
                    mime_type = self.file_handler.get_mime_type(f['id'])
                if not mime_type or mime_type not in supported:
                    self.progress.mark_skipped(f['id'], 'unsupported_type')
                    continue

                remaining_files.append(f)

            self.logger.info(f"Found {len(remaining_files)} files to process")

            # Count files since the last checkpoint. The previous test,
            # `i % CHECKPOINT_INTERVAL == 0`, only fired when the batch offset
            # happened to be a multiple of the interval.
            checkpoint_interval = max(1, self.config.CHECKPOINT_INTERVAL)
            files_since_checkpoint = 0

            for i in range(0, len(remaining_files), self.config.BATCH_SIZE):
                if not self.shutdown_handler.should_continue():
                    self.logger.info("Shutdown requested, stopping processing")
                    break

                batch = remaining_files[i:i + self.config.BATCH_SIZE]
                if not self.process_batch(batch):
                    self.logger.error("Batch processing failed, stopping")
                    break

                files_since_checkpoint += len(batch)
                if files_since_checkpoint >= checkpoint_interval:
                    files_since_checkpoint = 0
                    self.processing_state.save_state()
                    self.log_status(force=True)
                    self.logger.info(
                        f"Checkpoint: {i + len(batch)}/{len(remaining_files)} files processed"
                    )

        except Exception as e:
            self.logger.error(f"Error in process_files: {e}")
            raise

        finally:
            # Save processed marks, not just progress. Otherwise work done since
            # the last checkpoint is forgotten, and on the next start
            # verify_vector_store() deletes the vectors saved for those files.
            try:
                self.processing_state.save_state()
            except Exception as e:
                self.logger.error(f"Error saving processing state: {e}")
            self.progress.save_progress(force=True)
            self.log_status(force=True)

    def run(self):
        """Process all files, re-verify the store, and always run shutdown cleanup."""
        try:
            self.logger.info("Starting document processing")
            self.process_files()

            # Verify final state
            self.verify_vector_store()
            self.log_status(force=True)

            self.logger.info("Processing completed successfully")

        except Exception as e:
            self.logger.error(f"Processing failed: {e}")
            raise

        finally:
            self.shutdown_handler.cleanup()

def create_drive_service():
    """Authenticate the user, mount Google Drive and build a Drive v3 client.

    Any failure is printed and ends the process with ``sys.exit(1)``. The
    resulting ``SystemExit`` is not caught by ``except Exception`` handlers.
    """
    try:
        auth.authenticate_user()
        drive.mount('/content/drive')
        return build('drive', 'v3')
    except Exception as e:
        print(f"Error creating Drive service: {e}")
        sys.exit(1)

def main():
    """Run the document pipeline, restarting after unexpected errors.

    Returns 0 once processing completes. Returns 130 (the conventional Ctrl+C
    status) on ``KeyboardInterrupt``. Any other ``Exception`` is printed and
    the whole setup is retried after 5 seconds. When saved state exists in
    ``SAVE_DIR``, the resume counts are logged before running.
    """
    while True:
        try:
            save_root = Path(SAVE_DIR)
            resuming = any(
                (save_root / marker).exists()
                for marker in ('processing_state.pkl', 'progress.json')
            )

            sys.setrecursionlimit(10000)
            os.makedirs(SAVE_DIR, exist_ok=True)

            processor = ParallelDocumentProcessor(
                drive_service=create_drive_service(),
                folder_id=FOLDER_ID,
                save_dir=SAVE_DIR
            )

            if resuming:
                log = processor.logger
                state = processor.processing_state
                log.info("Resuming previously interrupted processing...")
                log.info(f"Found {len(state.processed_files)} previously processed files")
                log.info(f"Found {len(state.failed_files)} previously failed files")

                skipped = processor.progress.skipped_files
                if skipped:
                    total_skipped = sum(len(group) for group in skipped.values())
                    log.info(f"Found {total_skipped} previously skipped files")

            processor.run()

            processor.logger.info("Processing completed successfully")
            return 0

        except KeyboardInterrupt:
            print("\nGracefully shutting down...")
            return 130
        except Exception as e:
            print(f"Processing failed: {str(e)}")
            print("Restarting in 5 seconds...")
            time.sleep(5)

if __name__ == '__main__':
    # main() handles its own restarts. Exit code 1 is treated as "run again";
    # any other code ends the process via sys.exit.
    exit_code = main()
    while exit_code == 1:
        exit_code = main()
    sys.exit(exit_code)

## Verify the files created in the previous step

In [None]:
# @title
from google.colab import drive
import os
from pathlib import Path
import json
import faiss
from typing import Optional, Dict, Any
import numpy as np

def _print_vector_db_report(stats: Dict[str, Any]) -> None:
    """Print the status report for the statistics dict built by check_vector_db()."""
    lines = [
        "\nVector Database Status:",
        "-" * 50,
        f"• Storage Location: {stats['storage_location']}",
        f"• Total vectors: {stats['vector_count']}",
        f"• Vector dimensionality: {stats['vector_dimension']}",
        f"• Indexed files: {stats['indexed_files']}",
    ]
    if stats.get('last_update'):
        lines.append(f"• Last update: {stats['last_update']}")

    usage = stats['memory_usage']
    lines.extend([
        "\nStorage Usage:",
        f"• FAISS index: {usage['index_size']:.2f} MB",
        f"• Metadata: {usage['metadata_size']:.2f} MB",
    ])
    for line in lines:
        print(line)

    # Basic integrity check: the code expects one vector per metadata entry.
    if stats['vector_count'] != stats['indexed_files']:
        print("\nWARNING: Number of vectors doesn't match metadata entries!")


def check_vector_db(save_dir: str = SAVE_DIR) -> Optional[Dict[str, Any]]:
    """
    Verify that the FAISS vector database in ``save_dir`` exists and can be loaded.

    Mounts Google Drive if ``/content/drive`` is absent. Checks that the index,
    metadata and processing-state files are present. Then loads the index and
    metadata, prints a status report, and warns when the vector count differs from
    the number of metadata entries.

    Args:
        save_dir: Directory holding the database files. Defaults to the value of
            ``SAVE_DIR`` at the time this function was defined.

    Returns:
        Optional[Dict]: Database statistics on success. ``None`` if the directory
        or any required file is missing, or if anything raises along the way.
    """
    drive_root = '/content/drive'
    megabyte = 1024 * 1024

    try:
        if not os.path.exists(drive_root):
            drive.mount(drive_root)

        save_path = Path(str(save_dir))
        if not save_path.exists():
            print(f"Directory does not exist: {save_path}")
            return None

        index_file = save_path / 'faiss_index.index'
        metadata_file = save_path / 'metadata.json'
        state_file = save_path / 'processing_state.pkl'

        absent = [
            label
            for label, candidate in (
                ('faiss_index', index_file),
                ('metadata', metadata_file),
                ('processing_state', state_file),
            )
            if not candidate.exists()
        ]
        if absent:
            print(f"\nMissing essential files in {save_path}:")
            for label in absent:
                print(f"  • {label}")
            return None

        # Failures past this point are reported as load errors, not generic errors.
        try:
            index = faiss.read_index(str(index_file))
            with open(metadata_file) as meta_handle:
                metadata = json.load(meta_handle)

            stats = {
                'vector_count': index.ntotal,
                'vector_dimension': index.d,
                'indexed_files': len(metadata['vector_map']),
                'last_update': metadata.get('last_update'),
                'storage_location': str(save_path),
                'memory_usage': {
                    # On-disk file sizes, in MB
                    'index_size': os.path.getsize(index_file) / megabyte,
                    'metadata_size': os.path.getsize(metadata_file) / megabyte,
                },
            }
            _print_vector_db_report(stats)
            return stats
        except Exception as load_error:
            print(f"Error loading database files: {str(load_error)}")
            return None

    except Exception as unexpected:
        print(f"Error: {str(unexpected)}")
        return None

if __name__ == '__main__':
    # Say where we are looking before running the check, which may be slow.
    print(f"Checking vector database in: {SAVE_DIR}")
    stats = check_vector_db()
    # check_vector_db() returns None on any failure; only a populated dict means success.
    if stats is not None and len(stats) > 0:
        print("\nDatabase check completed successfully.")

## Interacting with xAI and the Vector Knowledge Base

In [None]:
# @title
"""
Enhanced Chat Interface with FAISS Vector Database Integration
"""

import requests
import json
from IPython.display import display, clear_output, Markdown, HTML
import ipywidgets as widgets
from datetime import datetime
import time
import os
from google.colab import drive
import faiss
import numpy as np
from pathlib import Path
from typing import List, Dict, Any, Optional, Tuple
from sklearn.feature_extraction.text import TfidfVectorizer

class VectorDB:
    """FAISS vector database manager.

    Loads ``faiss_index.index`` and ``metadata.json`` from ``save_dir``. If either
    file is missing or fails to load, ``index`` and ``metadata`` are both ``None``.
    In that case every lookup returns an empty result instead of raising.
    """

    def __init__(self, save_dir: str):
        self.save_dir = Path(str(save_dir))
        self.index_path = self.save_dir / 'faiss_index.index'
        self.metadata_path = self.save_dir / 'metadata.json'
        # Fallback only. _load_state() replaces it with the loaded index's real dimension.
        self.dimension = 512  # Match the dimension used in processing
        self._load_state()

    def _load_state(self):
        """Load the index and metadata; leave both as None if missing or unreadable."""
        try:
            if self.index_path.exists() and self.metadata_path.exists():
                self.index = faiss.read_index(str(self.index_path))
                with open(self.metadata_path, encoding='utf-8') as f:
                    self.metadata = json.load(f)
                # Trust the index over the hard-coded default, so reported stats match it.
                self.dimension = self.index.d
                print(f"Loaded FAISS index with {self.index.ntotal:,} vectors")
            else:
                print("No existing vector database found")
                self.index = None
                self.metadata = None
        except Exception as e:
            print(f"Error loading vector database: {e}")
            self.index = None
            self.metadata = None

    def find_similar_documents(self, query_vector: np.ndarray, k: int = 3) -> List[Tuple[str, float]]:
        """Return up to ``k`` ``(file_id, similarity)`` pairs, most similar first.

        ``similarity`` is computed as ``1 - distance / 2``. This equals cosine
        similarity when the index returns squared L2 distances between unit-length
        vectors. NOTE(review): confirm the index was built as an L2 index; for an
        inner-product index this conversion is wrong.

        Returns ``[]`` in these cases: no database is loaded, the index is empty,
        ``k < 1``, or the search raises.
        """
        if self.index is None or self.metadata is None:
            return []

        try:
            n_results = min(k, self.index.ntotal)
            if n_results < 1:
                # Nothing to return, and FAISS does not accept k <= 0.
                return []

            # FAISS works on float32 matrices of shape (n_queries, d).
            query = query_vector.astype(np.float32).reshape(1, -1)
            distances, indices = self.index.search(query, n_results)

            # Build the position -> file_id lookup once per query instead of scanning
            # vector_map for every hit. setdefault keeps the first file_id seen for a
            # position, matching a first-match scan.
            position_to_file: Dict[int, str] = {}
            for file_id, meta in self.metadata['vector_map'].items():
                position_to_file.setdefault(meta['index'], file_id)

            results = []
            for idx, dist in zip(indices[0], distances[0]):
                if idx < 0:
                    # FAISS uses -1 for slots it could not fill.
                    continue
                file_id = position_to_file.get(int(idx))
                if file_id is not None:
                    similarity = float(1 - (dist / 2))  # Convert L2 distance to similarity score
                    results.append((file_id, similarity))

            return sorted(results, key=lambda x: x[1], reverse=True)

        except Exception as e:
            print(f"Error in similarity search: {e}")
            return []

    def get_document_vector(self, file_id: str) -> Optional[np.ndarray]:
        """Return the stored vector for ``file_id``.

        Returns ``None`` in these cases: no database is loaded, ``file_id`` is not in
        ``metadata['vector_map']``, its position lies outside ``[0, ntotal)``, or the
        index cannot reconstruct the vector.
        """
        if self.index is None or self.metadata is None:
            return None

        try:
            meta = self.metadata['vector_map'].get(file_id)
            if not meta:
                return None
            idx = int(meta['index'])
            # Also reject negative positions, which an upper-bound check alone lets through.
            if not 0 <= idx < self.index.ntotal:
                return None
            return self.index.reconstruct(idx)
        except Exception as e:
            print(f"Error retrieving vector: {e}")
            return None

class ChatLogger:
    """Write chat interactions as Markdown files under ``<save_dir>/chat_logs``.

    Each instance writes to its own ``chat_session_<timestamp>.md`` file, with the
    newest interaction first. Every interaction is also written to a separate
    ``interaction_<timestamp>.md`` file. All files are read and written as UTF-8.
    """

    # Ends every logged interaction; also used to split the session file back apart.
    _SEPARATOR = "---\n\n"

    def __init__(self, save_dir: str):
        self.save_dir = Path(str(save_dir))
        self.chat_dir = self.save_dir / 'chat_logs'
        self.chat_dir.mkdir(parents=True, exist_ok=True)

        # Each logger gets a session file named after the current second. touch()
        # does not truncate, so a same-named existing file is appended to, not wiped.
        self.session_file = self.chat_dir / f"chat_session_{datetime.now().strftime('%Y%m%d_%H%M%S')}.md"
        self.session_file.touch()

    def _unique_path(self, stem: str) -> Path:
        """Return ``chat_dir/<stem>.md``, adding ``_1``, ``_2``, ... if that name is taken."""
        candidate = self.chat_dir / f"{stem}.md"
        counter = 1
        while candidate.exists():
            candidate = self.chat_dir / f"{stem}_{counter}.md"
            counter += 1
        return candidate

    def log_interaction(self, query: str, response: str, references: str = ""):
        """Log one interaction to the session file (prepended) and to its own file.

        Args:
            query: The user's question.
            response: The model's answer (Markdown).
            references: Optional Markdown appended after the response.
        """
        now = datetime.now()
        timestamp = now.strftime('%Y-%m-%d %H:%M:%S')
        interaction = (
            f"## {timestamp}\n\n"
            f"**Question:** {query}\n\n"
            f"**Response:**\n{response}\n"
            f"{references}\n\n"
            f"{self._SEPARATOR}"
        )

        # Prepend so the session file reads newest-first.
        current_content = (
            self.session_file.read_text(encoding='utf-8') if self.session_file.exists() else ""
        )
        self.session_file.write_text(interaction + current_content, encoding='utf-8')

        # The per-interaction name has one-second resolution, so de-duplicate it
        # instead of silently overwriting an interaction logged in the same second.
        individual_file = self._unique_path(f"interaction_{now.strftime('%Y%m%d_%H%M%S')}")
        individual_file.write_text(interaction, encoding='utf-8')

    def get_recent_interactions(self, limit: int = 5) -> str:
        """Return the ``limit`` most recent interactions from the session file as Markdown.

        NOTE(review): entries are split on ``"---\\n\\n"``. A response that itself
        contains that sequence will be cut at that point.
        """
        if not self.session_file.exists():
            return ""
        content = self.session_file.read_text(encoding='utf-8')
        interactions = content.split(self._SEPARATOR)[:limit]
        return self._SEPARATOR.join(interactions)

class VectorSearchMetrics:
    """Track and analyze vector search performance.

    ``average_similarity`` is the mean of each search's mean similarity. Only
    successful searches that returned at least one hit count toward it, so
    empty result lists do not drag it toward 0.
    """

    def __init__(self):
        self.total_searches = 0
        self.successful_searches = 0
        self.average_similarity = 0.0
        # Number of successful searches that had results, i.e. that contributed
        # to average_similarity.
        self.scored_searches = 0
        self.search_times: List[float] = []
        self.error_counts: Dict[str, int] = {}

    def log_search(self, duration: float, results: List[Tuple[str, float]], error: Optional[str] = None):
        """Record one search.

        Args:
            duration: Wall-clock time the search took. Only recorded for successful searches.
            results: ``(doc_id, similarity)`` pairs returned by the search.
            error: Error label if the search failed; tallied in ``error_counts``.
        """
        self.total_searches += 1

        if error:
            self.error_counts[error] = self.error_counts.get(error, 0) + 1
            return

        self.successful_searches += 1
        self.search_times.append(duration)

        if results:
            similarities = [sim for _, sim in results]
            self.scored_searches += 1
            # Running mean over scored searches only. Using successful_searches as the
            # denominator would count empty searches as similarity 0.
            self.average_similarity = (
                (self.average_similarity * (self.scored_searches - 1) +
                 sum(similarities) / len(similarities)) / self.scored_searches
            )

    def get_stats(self) -> Dict[str, Any]:
        """Return the current search statistics as a plain dict."""
        return {
            'total_searches': self.total_searches,
            'successful_searches': self.successful_searches,
            'success_rate': self.successful_searches / max(1, self.total_searches),
            'average_similarity': self.average_similarity,
            'average_search_time': sum(self.search_times) / max(1, len(self.search_times)),
            'error_distribution': self.error_counts
        }

class ChatInterface:
    """Notebook chat UI that answers questions with Grok and cites FAISS search hits.

    On construction it loads the vector database and opens a Markdown chat log
    under ``save_dir``. It then displays ipywidgets controls: a status panel, a
    text box with Send/Clear buttons, and a chat output area. For each question it:

    1. vectorizes the question and looks up the nearest documents;
    2. sends the question, with a short context string, to the xAI
       chat-completions endpoint;
    3. logs the interaction and shows it in the chat area.

    Reads the notebook-level globals ``MODEL``, ``TEMP`` and ``grok_api_key``.
    """

    # xAI chat-completions endpoint used by get_grok_response().
    API_URL = "https://api.x.ai/v1/chat/completions"

    def __init__(self, save_dir: str = SAVE_DIR):
        """
        Args:
            save_dir: Directory containing the FAISS index and metadata; chat logs
                are written beneath it too. Defaults to ``SAVE_DIR`` as it was when
                this class was defined.
        """
        self.save_dir = Path(str(save_dir))
        self.vector_db = VectorDB(save_dir)
        self.logger = ChatLogger(save_dir)
        self.metrics = VectorSearchMetrics()
        self.setup_interface()

    def setup_interface(self):
        """Build the widgets, show database status and recent history, and wire up callbacks."""
        # Create interface elements
        self.status_area = widgets.Output()
        self.chat_area = widgets.Output()
        self.input_area = widgets.Text(
            placeholder='Ask a question...',
            layout=widgets.Layout(width='80%')
        )
        self.send_button = widgets.Button(
            description="Send",
            button_style='info',
            layout=widgets.Layout(width='10%')
        )
        self.clear_button = widgets.Button(
            description="Clear",
            button_style='warning',
            layout=widgets.Layout(width='10%')
        )

        # Show database status
        with self.status_area:
            if self.vector_db.index is not None:
                stats = self.get_vector_stats()
                display(HTML(
                    f"<b>Vector Database Status:</b><br>"
                    f"• {stats['total_vectors']:,} vectors loaded<br>"
                    f"• Vector dimension: {stats['dimension']}<br>"
                    f"• Documents indexed: {stats['indexed_documents']:,}<br>"
                    f"• Index size: {stats['memory_usage']['index_size']:.2f} MB"
                ))
                recent = self.logger.get_recent_interactions(3)
                if recent:
                    display(Markdown("\n**Recent Interactions:**\n" + recent))
            else:
                display(HTML("<b>Warning:</b> Vector database not loaded"))

        # Set up callbacks
        self.send_button.on_click(self.handle_send)
        self.clear_button.on_click(self.handle_clear)
        self.input_area.on_submit(self.handle_submit)

        # Layout
        button_box = widgets.HBox([self.send_button, self.clear_button])
        input_box = widgets.VBox([
            widgets.Label('Enter your question:'),
            widgets.HBox([self.input_area, button_box])
        ])
        display(widgets.VBox([self.status_area, input_box, self.chat_area]))

    def vectorize_query(self, query: str) -> Optional[np.ndarray]:
        """Turn ``query`` into a unit-length float32 vector sized to the index dimension.

        Returns ``None`` if no (or an empty) index is loaded or vectorization fails.
        For example, scikit-learn raises on a query made only of stop words, and
        that error is caught here.

        NOTE(review): the TF-IDF vectorizer is fitted on this single query, so each
        column is a word from the query's own vocabulary, not from the vocabulary
        used to build the index. Unless the indexing step used the same per-text
        scheme, the resulting similarities carry little meaning. Confirm against
        the vectorization cell and, if needed, reuse its fitted vectorizer.
        """
        index = self.vector_db.index
        if index is None or index.ntotal == 0:
            return None

        try:
            # Read the dimension straight from the index; no stored vector is needed.
            dimension = index.d

            vectorizer = TfidfVectorizer(
                max_features=dimension,
                strip_accents='unicode',
                analyzer='word',
                stop_words='english'
            )

            # Zero-pad (or truncate) to exactly `dimension` columns.
            vector = vectorizer.fit_transform([query]).toarray()[0]
            vector = np.pad(vector, (0, max(0, dimension - len(vector))))[:dimension]

            norm = np.linalg.norm(vector)
            if norm > 0:
                vector = vector / norm

            # FAISS operates on float32.
            return vector.astype(np.float32)

        except Exception as e:
            print(f"Error vectorizing query: {e}")
            return None

    def find_relevant_context(self, query: str, k: int = 3) -> List[Tuple[str, float]]:
        """Return up to ``k`` ``(doc_id, similarity)`` pairs for ``query``; ``[]`` on failure.

        Every attempt made against a loaded index is recorded in ``self.metrics``.
        """
        if self.vector_db.index is None:
            return []

        started = time.perf_counter()
        try:
            query_vector = self.vectorize_query(query)
            if query_vector is None:
                self.metrics.log_search(time.perf_counter() - started, [], error='vectorization_failed')
                return []
            similar_docs = self.vector_db.find_similar_documents(query_vector, k=k)
        except Exception as e:
            self.metrics.log_search(time.perf_counter() - started, [], error=type(e).__name__)
            print(f"Error finding relevant context: {e}")
            return []

        self.metrics.log_search(time.perf_counter() - started, similar_docs)
        return similar_docs

    def _vector_norm_info(self, doc_id: str) -> str:
        """Return ``"(norm: X.XX)"`` for ``doc_id``'s stored vector, or ``""`` if unavailable."""
        vector = self.vector_db.get_document_vector(doc_id)
        return f"(norm: {np.linalg.norm(vector):.2f})" if vector is not None else ""

    def format_context(self, similar_docs: List[Tuple[str, float]]) -> str:
        """Format the search hits as the context string sent to the model.

        NOTE(review): only document IDs, similarity scores and vector norms are
        included. No document text is sent, so the model cannot read the
        retrieved documents.
        """
        if not similar_docs:
            return "No relevant context found."

        context_parts = [
            f"Document {doc_id} "
            f"(similarity: {similarity:.2f}) {self._vector_norm_info(doc_id)}"
            for doc_id, similarity in similar_docs
        ]
        return "Related documents:\n" + "\n".join(f"• {part}" for part in context_parts)

    def get_grok_response(self, query: str, context: str) -> str:
        """Ask the Grok chat-completions API and return the reply text.

        Never raises. Failures, including a bad ``TEMP`` setting, come back as a
        string starting with ``"Error:"``.
        """
        try:
            # Built inside the try so configuration errors (e.g. a non-numeric TEMP)
            # surface as an error message rather than an uncaught exception.
            payload = {
                "messages": [
                    {
                        "role": "system",
                        "content": "You are a helpful assistant with access to a knowledge base. "
                                  "Use the provided context when relevant, but you can also draw "
                                  "on your general knowledge when appropriate."
                    },
                    {
                        "role": "user",
                        "content": f"{query}\n\nContext: {context}"
                    }
                ],
                "model": MODEL,
                "stream": False,
                "temperature": float(TEMP)
            }

            headers = {
                "Authorization": f"Bearer {grok_api_key}",
                "Content-Type": "application/json"
            }

            response = requests.post(
                self.API_URL,
                headers=headers,
                json=payload,
                timeout=60
            )

            if response.status_code == 200:
                return response.json()['choices'][0]['message']['content']
            # Include the start of the body; the API's error payload may explain the failure.
            return f"Error: API returned status code {response.status_code}: {response.text[:500]}"

        except Exception as e:
            return f"Error: {str(e)}"

    def format_references(self, docs: List[Tuple[str, float]]) -> str:
        """Format Google Drive links for the hits as a Markdown "References" section."""
        if not docs:
            return ""

        refs = [
            f"[Document {doc_id[:8]}...](https://drive.google.com/file/d/{doc_id}/view?usp=sharing) "
            f"({similarity:.2f}) {self._vector_norm_info(doc_id)}"
            for doc_id, similarity in docs
        ]
        return "\n\n---\n**References:**\n\n" + "\n".join(f"• {ref}" for ref in refs)

    def send_message(self, query: str):
        """Answer ``query``, log the exchange, and show it in the chat area."""
        with self.chat_area:
            clear_output(wait=True)

        try:
            similar_docs = self.find_relevant_context(query)
            context = self.format_context(similar_docs)
            response = self.get_grok_response(query, context)
            references = self.format_references(similar_docs)
            self.logger.log_interaction(query, response, references)
        except Exception as e:
            # Exceptions raised inside widget callbacks are easy to miss in a
            # notebook, so show them where the answer would have appeared.
            with self.chat_area:
                display(Markdown(f"**Error:** {e}"))
            return

        with self.chat_area:
            display(Markdown(self.logger.get_recent_interactions(1)))

    def handle_submit(self, widget):
        """Handle the Enter key in the text box."""
        self.handle_send(None)

    def handle_send(self, b):
        """Handle the Send button: clear the input and send any non-blank question."""
        query = self.input_area.value.strip()
        if query:
            self.input_area.value = ''
            self.send_message(query)

    def handle_clear(self, b):
        """Handle the Clear button: empty the chat area and the input box."""
        with self.chat_area:
            clear_output()
        self.input_area.value = ''

    def get_vector_stats(self) -> Dict[str, Any]:
        """Return vector database statistics. Assumes an index and metadata are loaded."""
        return {
            'total_vectors': self.vector_db.index.ntotal,
            # Read from the index itself rather than VectorDB's hard-coded default.
            'dimension': self.vector_db.index.d,
            'indexed_documents': len(self.vector_db.metadata['vector_map']),
            'memory_usage': {
                'index_size': os.path.getsize(self.vector_db.index_path) / (1024 * 1024),
                'metadata_size': os.path.getsize(self.vector_db.metadata_path) / (1024 * 1024)
            }
        }

def main():
    """Create and display the chat interface.

    Returns:
        ChatInterface: The interface that was created, so callers can keep a handle
        on it (e.g. to inspect its ``metrics`` or ``logger``). Previously the
        instance was discarded.

    NOTE(review): notebook cells share one namespace, so this rebinds the ``main``
    name that the document-processing cell's ``__main__`` loop calls for an exit code.
    """
    print(f"Initializing chat interface with vector database from: {SAVE_DIR}")
    return ChatInterface()

if __name__ == '__main__':
    try:
        # Drive files are only reachable under /content/drive once it is mounted.
        if not Path('/content/drive').exists():
            drive.mount('/content/drive')

        print(f"\nInitializing chat interface with vector database from: {SAVE_DIR}")

        # Keep a module-level handle on the live interface.
        chat = ChatInterface()

        # Startup banner
        print("\nChat interface ready. Type your questions in the input box above.")
        print(f"Using model: {MODEL}")
        print(f"Temperature: {TEMP}\n")

    except Exception as err:
        print(f"Error initializing chat interface: {err}")