<a href="https://colab.research.google.com/github/DrunkOnJava/CycleTracker/blob/main/Consolidated_Analysis_Script_(Completed).ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
#!/usr/bin/env python3
# -*- coding: utf-8 -*-

"""
Consolidated Analysis Script for Emergency Backup Recovery
Version: 1.0.0

Combines file analysis, message analysis, visualization, and reporting
into a single, robust script.

Features:
- Comprehensive file analysis (hashing, MIME types, patterns)
- Detailed message analysis (contacts, conversations, sentiment, topics)
- Data visualization generation (distributions, networks)
- Automated report generation (summary, category, contact, patterns)
- Checkpointing and resume capability
- Parallel processing support
- Enhanced logging and error handling
- System monitoring integration (state file, PID)
- Optional dependency handling
- Data integrity checks (checksums)
"""

import os
import sys
import hashlib
import sqlite3
import argparse
import logging
import logging.handlers
import random
import time
import json
import re
import shutil
import traceback
import tempfile
import signal
import platform
import uuid
import threading
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor, as_completed
from datetime import datetime, timedelta
from pathlib import Path
from typing import Dict, List, Optional, Any, Set, Tuple, Union, Iterator, Callable
from collections import defaultdict, Counter
from contextlib import contextmanager
from dataclasses import dataclass, field, asdict

# --- Optional Dependency Handling ---

# File type detection
try:
    import magic
    MAGIC_AVAILABLE = True
except ImportError:
    MAGIC_AVAILABLE = False
    print("WARNING: python-magic not found. MIME type detection will be limited. Install with: pip install python-magic", file=sys.stderr)

# Fuzzy hashing
try:
    import ssdeep
    SSDEEP_AVAILABLE = True
except ImportError:
    SSDEEP_AVAILABLE = False
    print("WARNING: ssdeep not found. Fuzzy hashing disabled. Install with: pip install ssdeep", file=sys.stderr)

# Network analysis and visualization
try:
    import networkx as nx
    NETWORKX_AVAILABLE = True
except ImportError:
    NETWORKX_AVAILABLE = False
    print("WARNING: networkx not found. Network graph analysis disabled. Install with: pip install networkx", file=sys.stderr)

try:
    # Ensure using a non-interactive backend BEFORE importing pyplot
    import matplotlib
    matplotlib.use('Agg') # Use Agg backend for non-interactive plotting
    import matplotlib.pyplot as plt
    MATPLOTLIB_AVAILABLE = True
except ImportError:
    MATPLOTLIB_AVAILABLE = False
    print("WARNING: matplotlib not found. Plotting and visualization disabled. Install with: pip install matplotlib", file=sys.stderr)

# Data manipulation and visualization
try:
    import pandas as pd
    PANDAS_AVAILABLE = True
except ImportError:
    PANDAS_AVAILABLE = False
    print("WARNING: pandas not found. Some data analysis and reporting features disabled. Install with: pip install pandas", file=sys.stderr)

try:
    import numpy as np
    NUMPY_AVAILABLE = True
except ImportError:
    NUMPY_AVAILABLE = False
    print("WARNING: numpy not found. Some numerical operations and analysis disabled. Install with: pip install numpy", file=sys.stderr)

try:
    import seaborn as sns
    # Seaborn depends on pandas and matplotlib
    SEABORN_AVAILABLE = PANDAS_AVAILABLE and MATPLOTLIB_AVAILABLE and NUMPY_AVAILABLE
except ImportError:
    SEABORN_AVAILABLE = False
    if PANDAS_AVAILABLE and MATPLOTLIB_AVAILABLE and NUMPY_AVAILABLE:
        print("WARNING: seaborn not found. Advanced plotting disabled. Install with: pip install seaborn", file=sys.stderr)

# Text analysis
try:
    from sklearn.feature_extraction.text import TfidfVectorizer
    # from sklearn.cluster import DBSCAN # Not used in provided code, commented out
    SKLEARN_AVAILABLE = True
except ImportError:
    SKLEARN_AVAILABLE = False
    print("WARNING: scikit-learn not found. Topic modeling disabled. Install with: pip install scikit-learn", file=sys.stderr)

# Sentiment analysis
try:
    from vaderSentiment.vaderSentiment import SentimentIntensityAnalyzer
    SENTIMENT_AVAILABLE = True
except ImportError:
    SENTIMENT_AVAILABLE = False
    print("WARNING: vaderSentiment not found. Sentiment analysis disabled. Install with: pip install vaderSentiment", file=sys.stderr)

# Phone number parsing
try:
    # Use specific imports to avoid potential namespace conflicts
    from phonenumbers import parse as parse_phone, is_valid_number, format_number, PhoneNumberFormat, NumberParseException
    PHONENUMBERS_AVAILABLE = True
except ImportError:
    PHONENUMBERS_AVAILABLE = False
    print("WARNING: phonenumbers not found. Phone number validation/formatting disabled. Install with: pip install phonenumbers", file=sys.stderr)

# Word cloud generation
try:
    from wordcloud import WordCloud
    # Wordcloud depends on matplotlib and numpy
    WORDCLOUD_AVAILABLE = MATPLOTLIB_AVAILABLE and NUMPY_AVAILABLE
except ImportError:
    WORDCLOUD_AVAILABLE = False
    if MATPLOTLIB_AVAILABLE and NUMPY_AVAILABLE:
        print("WARNING: wordcloud not found. Word cloud generation disabled. Install with: pip install wordcloud", file=sys.stderr)

# Tabulate for reports
try:
    from tabulate import tabulate
    TABULATE_AVAILABLE = True
except ImportError:
    TABULATE_AVAILABLE = False
    print("WARNING: tabulate not found. Text-based reports will be basic. Install with: pip install tabulate", file=sys.stderr)

# --- Constants ---
# Version string reported in the startup log banner and in the supervisor state.
SCRIPT_VERSION: str = "1.0.0"

# Processing defaults (Config fields start from these values).
DEFAULT_BATCH_SIZE: int = 1_000
DEFAULT_MIN_CONVERSATION_FILES: int = 3  # adjusted minimum conversation length

# Retry policy.
MAX_RETRIES: int = 3
RETRY_DELAY: int = 5  # seconds

# Seconds between automatic checkpoints.
# NOTE: not currently used for an auto-save timer.
CHECKPOINT_INTERVAL: int = 5 * 60

# Output sizing.
DEFAULT_REPORT_TOP_N: int = 20  # items in "top N" report lists
DEFAULT_GRAPH_TOP_N: int = 30  # nodes drawn in graph visualizations

# --- Data Classes ---

@dataclass
class FileInfo:
    """Data class for storing analyzed file information.

    Every field has a default, so ``FileInfo()`` is valid and a record can be
    filled in incrementally. Field order defines the positional ``__init__``
    signature, so fields must not be reordered.
    """
    id: Optional[int] = None  # record id; presumably the database row id — TODO confirm
    path: str = ""  # path of the analyzed file
    filename: str = ""
    extension: str = ""
    filesize: int = 0  # presumably bytes — confirm against the analyzer
    mimetype: str = "application/octet-stream"  # same default as get_mime_from_extension()'s fallback
    md5hash: str = ""  # MD5 of the contents (hex string, presumably)
    fuzzyhash: str = ""  # presumably an ssdeep hash; may stay empty when ssdeep is unavailable — confirm
    creation_date: Optional[str] = None  # dates are kept as strings (format set by the producer; ISO presumably)
    modification_date: Optional[str] = None  # same string representation as creation_date
    category: str = "Other"
    value_score: float = 0.0  # scoring semantics are defined elsewhere, not in this class
    is_duplicate: int = 0  # duplicate flag stored as an int (presumably 0/1)
    original_id: Optional[int] = None  # presumably the id of the file this one duplicates
    patterns: List[Dict[str, Any]] = field(default_factory=list)  # pattern records; dict schema not defined here
    last_analyzed: Optional[str] = None  # Track when analysis was last run
    error: Optional[str] = None  # presumably an error message when analysis failed; None otherwise

@dataclass
class MessageMetadata:
    """Data class for message metadata extracted during analysis.

    ``entities`` defaults to a ``defaultdict(list)`` so callers can append to
    a new key without initialising it first. Because of that field, serialize
    with :meth:`to_dict` rather than ``dataclasses.asdict``. On Python < 3.12,
    ``asdict`` rebuilds the field as ``defaultdict(<generator>)`` and raises
    ``TypeError``, even when ``entities`` is empty.
    """
    phone_numbers: List[str] = field(default_factory=list)
    timestamps: List[str] = field(default_factory=list)
    filename: str = ""
    file_path: str = ""
    creation_time: Optional[str] = None
    modification_time: Optional[str] = None
    content: str = ""
    entities: Dict[str, List[str]] = field(default_factory=lambda: defaultdict(list))  # presumably entity label -> extracted values
    sentiment_score: float = 0.0
    error: Optional[str] = None

    def to_dict(self) -> Dict[str, Any]:
        """Return a deep-copied plain ``dict`` of all fields, suitable for JSON.

        ``entities`` is converted to a plain ``dict`` (with copied lists)
        before ``asdict`` runs. This avoids the ``defaultdict`` reconstruction
        failure in ``dataclasses.asdict`` on Python < 3.12, and the output
        contains no ``defaultdict`` instances. ``self`` is not modified.

        Returns:
            A mapping of field name to value; nested lists and dicts are copies.
        """
        from dataclasses import replace  # local import: only this method needs it

        plain = replace(self, entities={key: list(values) for key, values in self.entities.items()})
        return asdict(plain)

@dataclass
class Contact:
    """A phone-number contact aggregated from analyzed messages.

    Only ``phone_number`` is required. Every other attribute starts empty or
    at a neutral default and is filled in as messages are processed.
    """
    phone_number: str
    message_count: int = 0
    first_seen: Optional[str] = None
    last_seen: Optional[str] = None
    files: Set[int] = field(default_factory=set)  # file IDs associated with this contact
    common_contacts: Dict[str, int] = field(default_factory=dict)  # other number -> co-occurrence count
    confidence: float = 1.0  # reserved for future confidence scoring
    name_suggestions: List[str] = field(default_factory=list)  # reserved
    normalized_number: str = ""
    sentiment_score: float = 0.0  # average sentiment across this contact's messages
    topics: List[str] = field(default_factory=list)
    location_hints: List[str] = field(default_factory=list)

    def to_dict(self):
        """Return a deep-copied dict of all fields; ``files`` becomes an ascending list."""
        serialized = asdict(self)
        # Sets are not JSON-serializable; a sorted list also gives a stable order.
        serialized["files"] = sorted(self.files)
        return serialized

@dataclass
class Conversation:
    """A message thread together with aggregate statistics about it."""
    participants: List[str] = field(default_factory=list)
    message_count: int = 0
    first_message: Optional[str] = None
    last_message: Optional[str] = None
    files: List[Dict[str, Any]] = field(default_factory=list)  # entries like {'path': str, 'date': str, 'sentiment': float}
    sentiment_score: float = 0.0  # average sentiment across the conversation's messages
    topics: List[str] = field(default_factory=list)
    is_group_chat: bool = False

    def to_dict(self):
        """Return a deep-copied plain dict of every field (JSON-ready when the contents are)."""
        snapshot = asdict(self)
        return snapshot

# --- Configuration Class ---

@dataclass
class Config:
    """Analysis settings plus every output path derived from ``base_output_dir``.

    Only the fields above the "Derived paths" section are constructor
    arguments. The remaining fields are ``init=False`` and are filled in by
    ``__post_init__``.
    """
    backup_dir: str
    base_output_dir: str
    sample_rate: float = 1.0
    max_files: Optional[int] = None
    resume: bool = False
    batch_size: int = DEFAULT_BATCH_SIZE
    min_conversation_files: int = DEFAULT_MIN_CONVERSATION_FILES
    parallel_jobs: int = os.cpu_count() or 1  # evaluated once, when the class is defined
    use_npu: bool = False  # placeholder; actual usage depends on libraries
    debug_level: int = 1  # 0=quiet, 1=info, 2=debug
    report_top_n: int = DEFAULT_REPORT_TOP_N
    graph_top_n: int = DEFAULT_GRAPH_TOP_N

    # Derived paths (computed in __post_init__, not accepted by __init__)
    log_dir: str = field(init=False)
    database_dir: str = field(init=False)
    data_dir: str = field(init=False)
    visualization_dir: str = field(init=False)
    report_dir: str = field(init=False)
    monitor_dir: str = field(init=False)
    tmp_dir: str = field(init=False)
    db_path: str = field(init=False)
    checkpoint_path: str = field(init=False)  # file_analyzer progress
    state_file_path: str = field(init=False)
    pid_file_path: str = field(init=False)
    contact_data_path: str = field(init=False)  # contacts.json
    conversation_data_path: str = field(init=False)  # conversations.json
    contact_conv_dir: str = field(init=False)  # per-contact conversation files
    visualization_data_path: str = field(init=False)  # visualization data (e.g. counts)

    def __post_init__(self):
        """Make ``base_output_dir`` absolute and derive every output path from it."""
        root = os.path.abspath(self.base_output_dir)
        self.base_output_dir = root

        # One top-level directory per artifact kind.
        for attr_name, subdir in (
            ("log_dir", "Logs"),
            ("database_dir", "Database"),
            ("data_dir", "Data"),
            ("visualization_dir", "Visualizations"),
            ("report_dir", "Reports"),
            ("monitor_dir", "Monitor"),
            ("tmp_dir", "Tmp"),
        ):
            setattr(self, attr_name, os.path.join(root, subdir))

        # Database-side files.
        self.db_path = os.path.join(self.database_dir, "analysis.db")
        self.checkpoint_path = os.path.join(self.database_dir, "file_analyzer_checkpoint.json")

        # Process-monitoring files.
        self.state_file_path = os.path.join(self.monitor_dir, "analysis_state.json")
        self.pid_file_path = os.path.join(self.monitor_dir, "analysis.pid")

        # Analysis outputs under Data/.
        data = self.data_dir
        self.contact_data_path = os.path.join(data, "contacts.json")
        self.conversation_data_path = os.path.join(data, "conversations.json")
        self.contact_conv_dir = os.path.join(data, "ConversationsByContact")
        self.visualization_data_path = os.path.join(data, "visualization_data.json")

    def create_directories(self):
        """Create every output directory (``exist_ok``), stopping at the first failure.

        Raises:
            OSError: re-raised after the failing path is printed to stderr.
                ``print`` is used because logging may not be configured yet.
        """
        viz = self.visualization_dir
        required = [self.log_dir, self.database_dir, self.data_dir, viz]
        required += [os.path.join(viz, sub) for sub in ("Graphs", "Charts", "WordClouds")]
        required += [self.report_dir, self.monitor_dir, self.tmp_dir, self.contact_conv_dir]

        for dir_path in required:
            try:
                os.makedirs(dir_path, exist_ok=True)
            except OSError as e:
                print(f"ERROR: Failed to create directory: {dir_path} - {e}", file=sys.stderr)
                raise

# --- Logging Setup ---

# Module-wide logger used throughout this script; its level and handlers are
# attached later by setup_logging().
logger: logging.Logger = logging.getLogger('consolidated_analyzer')
# Shutdown flag, set by signal_handler(); presumably polled by long-running work.
stop_event: threading.Event = threading.Event()

def setup_logging(log_dir: str, debug_level: int) -> None:
    """Configure the module logger with console, rotating-file and error-file handlers.

    The function is safe to call more than once, e.g. when a notebook cell is
    re-run. Handlers installed by a previous call are removed *and closed*
    before new ones are attached, so output is not duplicated and the old log
    files' descriptors are released.

    Args:
        log_dir: Directory for ``analysis_full.log`` and ``analysis_errors.log``.
            It is not created here. If a file handler cannot be created, the
            error is printed to stderr and logging continues with the remaining
            handlers.
        debug_level: 0 = WARNING, 1 = INFO, 2 = DEBUG; any other value falls
            back to INFO.
    """
    log_levels = {
        0: logging.WARNING,  # Quiet
        1: logging.INFO,     # Normal
        2: logging.DEBUG,    # Verbose/Debug
    }
    log_level = log_levels.get(debug_level, logging.INFO)

    log_format = '%(asctime)s - %(name)s - %(levelname)s - [%(module)s:%(lineno)d] - %(message)s'
    date_format = '%Y-%m-%d %H:%M:%S'
    # Formatters hold no per-handler state, so one instance serves all handlers.
    formatter = logging.Formatter(log_format, date_format)

    # NOTE(review): the logger level filters records before any handler sees
    # them. The file handler below therefore receives log_level and above,
    # not every DEBUG record.
    logger.setLevel(log_level)

    # Detach and close handlers from a previous call. Just clearing the list
    # (the old behaviour) left each old FileHandler's stream open, leaking a
    # file descriptor per file handler on every re-invocation.
    for old_handler in list(logger.handlers):
        logger.removeHandler(old_handler)
        try:
            old_handler.close()
        except Exception as e:  # best effort: cleanup must not abort setup
            print(f"WARNING: Failed to close previous log handler {old_handler!r}: {e}", file=sys.stderr)

    # Console handler
    console_handler = logging.StreamHandler(sys.stdout)
    console_handler.setLevel(log_level)
    console_handler.setFormatter(formatter)
    logger.addHandler(console_handler)

    # Rotating file handler: 10 MB per file, 5 backups kept.
    log_file = os.path.join(log_dir, 'analysis_full.log')
    try:
        file_handler = logging.handlers.RotatingFileHandler(
            log_file, maxBytes=10 * 1024 * 1024, backupCount=5, encoding='utf-8'
        )
        file_handler.setLevel(logging.DEBUG)  # accepts whatever passes the logger level
        file_handler.setFormatter(formatter)
        logger.addHandler(file_handler)
    except Exception as e:
        print(f"ERROR: Failed to set up rotating log file handler: {e}", file=sys.stderr)

    # Errors-only file handler.
    error_log_file = os.path.join(log_dir, 'analysis_errors.log')
    try:
        error_handler = logging.FileHandler(error_log_file, encoding='utf-8')
        error_handler.setLevel(logging.ERROR)
        error_handler.setFormatter(formatter)
        logger.addHandler(error_handler)
    except Exception as e:
        print(f"ERROR: Failed to set up error log file handler: {e}", file=sys.stderr)

    # Log startup information
    logger.info(f"--- Consolidated Analysis Script v{SCRIPT_VERSION} Starting ---")
    logger.info(f"Log level set to {logging.getLevelName(log_level)}")
    logger.info(f"Full logs: {log_file}")
    logger.info(f"Error logs: {error_log_file}")

    # Log available optional features
    optional_features = {
        "python-magic (MIME types)": MAGIC_AVAILABLE,
        "ssdeep (Fuzzy Hashing)": SSDEEP_AVAILABLE,
        "NetworkX (Graph Analysis)": NETWORKX_AVAILABLE,
        "Matplotlib (Plotting)": MATPLOTLIB_AVAILABLE,
        "Pandas (DataFrames)": PANDAS_AVAILABLE,
        "NumPy (Numerical Ops)": NUMPY_AVAILABLE,
        "Seaborn (Adv. Plotting)": SEABORN_AVAILABLE,
        "Scikit-learn (Text Analysis)": SKLEARN_AVAILABLE,
        "VADER Sentiment": SENTIMENT_AVAILABLE,
        "Phonenumbers (Validation)": PHONENUMBERS_AVAILABLE,
        "WordCloud": WORDCLOUD_AVAILABLE,
        "Tabulate (Reports)": TABULATE_AVAILABLE,
    }
    logger.debug("Optional library availability:")
    for name, available in optional_features.items():
        logger.debug(f"- {name}: {'Available' if available else 'Not Available'}")

# --- Utility Functions ---

def calculate_checksum(file_path: str) -> Optional[str]:
    """Return the hex SHA-256 digest of *file_path*, or None on failure.

    The file is streamed in 8 KiB blocks, so large files are never fully
    loaded into memory. A missing path and any error raised while reading are
    logged at ERROR level instead of being raised.
    """
    if os.path.exists(file_path):
        digest = hashlib.sha256()
        try:
            with open(file_path, "rb") as stream:
                # iter() with a b"" sentinel stops at EOF.
                for block in iter(lambda: stream.read(8192), b""):
                    digest.update(block)
        except OSError as e:
            logger.error(f"Error calculating checksum for {file_path}: {e}")
            return None
        except Exception as e:
            logger.error(f"Unexpected error calculating checksum for {file_path}: {e}")
            return None
        return digest.hexdigest()

    logger.error(f"Cannot calculate checksum, file not found: {file_path}")
    return None


def verify_checksum(file_path: str, expected_checksum: str) -> bool:
    """Return True iff the SHA-256 of *file_path* equals *expected_checksum*.

    The comparison is an exact string match against calculate_checksum()'s
    lowercase hex digest. A mismatch logs the path, expected and actual values
    at ERROR level. A file that cannot be hashed returns False;
    calculate_checksum has already logged why.
    """
    calculated_checksum = calculate_checksum(file_path)
    if calculated_checksum is None:
        return False

    if calculated_checksum != expected_checksum:
        for line in (
            f"Checksum mismatch for: {file_path}",
            f"  Expected: {expected_checksum}",
            f"  Actual:   {calculated_checksum}",
        ):
            logger.error(line)
        return False

    logger.debug(f"Checksum verified for: {file_path}")
    return True

def atomic_write_json(data: Any, file_path: str):
    """Serialize *data* as indented JSON and publish it at *file_path* atomically.

    The JSON is written to a uniquely named ``*.tmp`` file in the destination
    directory, flushed and fsync'ed, then moved over *file_path* with
    ``os.replace``. Readers therefore see either the previous file or the
    complete new one. On failure the error is logged, the temporary file is
    removed (best effort), and the original exception is re-raised.

    NOTE: tempfile creates the file with owner-only permissions (0o600 on
    POSIX), and ``os.replace`` carries them over to *file_path*.
    """
    def _discard_temp(temp_file_path: str) -> None:
        # Best-effort removal of a leftover temp file; failures are only logged.
        if os.path.exists(temp_file_path):
            try:
                os.remove(temp_file_path)
            except OSError as rm_err:
                logger.error(f"Failed to remove temporary file {temp_file_path}: {rm_err}")

    dir_name = os.path.dirname(file_path)
    if dir_name:  # '' means the current directory, which already exists
        os.makedirs(dir_name, exist_ok=True)

    temp_file_path = None
    try:
        # Same directory as the target, so the later os.replace never crosses filesystems.
        with tempfile.NamedTemporaryFile('w', encoding='utf-8', delete=False, dir=dir_name, suffix='.tmp') as tf:
            temp_file_path = tf.name
            json.dump(data, tf, indent=2)
            tf.flush()
            os.fsync(tf.fileno())  # push bytes to disk before the rename
    except Exception as e:
        logger.error(f"Failed to write temporary JSON file in {dir_name}: {e}")
        if temp_file_path is not None:
            _discard_temp(temp_file_path)
        raise

    try:
        os.replace(temp_file_path, file_path)
        logger.debug(f"Atomically wrote JSON to: {file_path}")
    except OSError as e:
        logger.error(f"Failed to replace {file_path} with {temp_file_path}: {e}")
        _discard_temp(temp_file_path)
        raise

def _quarantine_corrupt_json(file_path: str) -> None:
    """Rename an undecodable JSON file to ``<path>.corrupt_<YYYYmmddHHMMSS>`` (best effort)."""
    try:
        corrupt_path = f"{file_path}.corrupt_{datetime.now().strftime('%Y%m%d%H%M%S')}"
        shutil.move(file_path, corrupt_path)
        logger.warning(f"Moved potentially corrupted JSON file to: {corrupt_path}")
    except Exception as move_e:
        logger.error(f"Could not move corrupted JSON file {file_path}: {move_e}")


def load_json_safe(file_path: str) -> Optional[Dict[str, Any]]:
    """Read and decode the UTF-8 JSON document at *file_path*.

    Returns the decoded value. This is any JSON type (e.g. a list), despite the
    ``Dict`` annotation. Returns None instead of raising when the file is
    missing, unreadable, or not valid JSON. Invalid JSON is also moved aside to
    a timestamped ``.corrupt_`` name, so the next write starts clean.
    """
    if not os.path.exists(file_path):
        logger.debug(f"JSON file not found: {file_path}")
        return None

    try:
        with open(file_path, 'r', encoding='utf-8') as f:
            payload = json.load(f)
    except json.JSONDecodeError as e:
        logger.error(f"Error decoding JSON from {file_path}: {e}")
        _quarantine_corrupt_json(file_path)
        return None
    except OSError as e:
        logger.error(f"Error reading JSON file {file_path}: {e}")
        return None
    except Exception as e:
        logger.error(f"Unexpected error loading JSON file {file_path}: {e}")
        return None
    return payload


def get_file_mime_type(file_path: str) -> str:
    """Return the MIME type of *file_path*.

    Uses python-magic (content sniffing) when it is installed. Without it, or
    when it raises for any reason (the reason is logged), the result comes from
    get_mime_from_extension().
    """
    if not MAGIC_AVAILABLE:
        logger.debug("python-magic not available, using extension-based MIME type.")
        return get_mime_from_extension(file_path)

    try:
        # A fresh Magic instance per call avoids sharing one detector across
        # threads/processes.
        return magic.Magic(mime=True).from_file(file_path)
    except magic.MagicException as e:
        logger.warning(f"python-magic failed for {file_path}: {e}. Falling back to extension.")
    except FileNotFoundError:
        logger.error(f"File not found by python-magic: {file_path}")
    except OSError as e:
        # e.g. permission denied while magic reads the file
        logger.error(f"OS error getting MIME type for {file_path}: {e}")
    except Exception as e:
        logger.error(f"Unexpected error getting MIME type for {file_path}: {e}")
    return get_mime_from_extension(file_path)

# Extension -> MIME type table for get_mime_from_extension(). Built once at
# import time instead of on every call; the lookup may run once per analyzed
# file when python-magic is unavailable. Keys are lower-case and include the dot.
_EXTENSION_MIME_TYPES: Dict[str, str] = {
    '.txt': 'text/plain', '.log': 'text/plain', '.md': 'text/markdown',
    '.html': 'text/html', '.htm': 'text/html', '.css': 'text/css', '.js': 'application/javascript',
    '.json': 'application/json', '.xml': 'application/xml', '.csv': 'text/csv',
    '.jpg': 'image/jpeg', '.jpeg': 'image/jpeg', '.png': 'image/png', '.gif': 'image/gif',
    '.svg': 'image/svg+xml', '.webp': 'image/webp', '.bmp': 'image/bmp', '.tiff': 'image/tiff', '.ico': 'image/vnd.microsoft.icon',
    '.mp3': 'audio/mpeg', '.wav': 'audio/wav', '.ogg': 'audio/ogg', '.m4a': 'audio/mp4', '.flac': 'audio/flac',
    '.mp4': 'video/mp4', '.avi': 'video/x-msvideo', '.mov': 'video/quicktime', '.wmv': 'video/x-ms-wmv', '.mkv': 'video/x-matroska', '.webm': 'video/webm',
    '.pdf': 'application/pdf',
    '.doc': 'application/msword', '.docx': 'application/vnd.openxmlformats-officedocument.wordprocessingml.document',
    '.xls': 'application/vnd.ms-excel', '.xlsx': 'application/vnd.openxmlformats-officedocument.spreadsheetml.sheet',
    '.ppt': 'application/vnd.ms-powerpoint', '.pptx': 'application/vnd.openxmlformats-officedocument.presentationml.presentation',
    '.odt': 'application/vnd.oasis.opendocument.text', '.ods': 'application/vnd.oasis.opendocument.spreadsheet', '.odp': 'application/vnd.oasis.opendocument.presentation',
    '.zip': 'application/zip', '.rar': 'application/vnd.rar', '.tar': 'application/x-tar',
    '.gz': 'application/gzip', '.bz2': 'application/x-bzip2', '.7z': 'application/x-7z-compressed',
    '.exe': 'application/x-msdownload', '.dll': 'application/x-msdownload', '.so': 'application/octet-stream',
    '.apk': 'application/vnd.android.package-archive', '.ipa': 'application/octet-stream',  # iOS app
    '.iso': 'application/x-iso9660-image', '.img': 'application/octet-stream',  # Disk images
    '.db': 'application/vnd.sqlite3', '.sqlite': 'application/vnd.sqlite3', '.sqlite3': 'application/vnd.sqlite3', '.sql': 'application/sql',
    '.vcf': 'text/vcard', '.ics': 'text/calendar',
    '.ttf': 'font/ttf', '.otf': 'font/otf', '.woff': 'font/woff', '.woff2': 'font/woff2',
    # Add more as needed
}


def get_mime_from_extension(file_path: str) -> str:
    """Guess a MIME type from *file_path*'s extension (basic fallback).

    Only the last extension is considered, and matching is case-insensitive.
    So ``"photo.JPG"`` maps to ``image/jpeg`` and ``"archive.tar.gz"`` to
    ``application/gzip``. Unknown or missing extensions yield
    ``application/octet-stream``.
    """
    extension = os.path.splitext(file_path)[1].lower()
    return _EXTENSION_MIME_TYPES.get(extension, 'application/octet-stream')  # default binary stream

def format_bytes(size: int) -> str:
    """Render a byte count as text: plain bytes below 1 KiB, else KB/MB/GB with 2 decimals.

    Units are binary multiples (1 KB = 1024 B). There is no TB unit, so values
    of 1024 GB and above stay in GB (e.g. ``"1024.00 GB"``).
    """
    if size < 1024:
        return f"{size} B"
    for power, unit in ((1, "KB"), (2, "MB")):
        if size < 1024 ** (power + 1):
            return f"{size / 1024 ** power:.2f} {unit}"
    return f"{size / 1024 ** 3:.2f} GB"

def sanitize_filename(filename: str, max_len: int = 200) -> str:
    """Turn *filename* into a string that is safe to use as a single path component.

    Steps:
      1. Replace characters that are invalid on common filesystems
         (backslash, ``/ * ? : " < > |``) and ASCII control characters
         0x00-0x1F with ``_``. The NUL byte, for instance, makes ``open`` and
         ``os`` calls raise ``ValueError``.
      2. Collapse runs of ``_`` and strip leading/trailing ``_``, ``.`` and spaces.
      3. Truncate to at most *max_len* characters, keeping the extension when
         it is shorter than *max_len*.
      4. If nothing is left, return ``sanitized_<8 random hex chars>``.

    Args:
        filename: Arbitrary input string.
        max_len: Maximum length of the result (default 200, the previous fixed limit).

    Returns:
        The sanitized filename; never empty.
    """
    # Invalid filesystem punctuation plus ASCII control characters.
    sanitized = re.sub(r'[\\/*?:"<>|\x00-\x1f]', "_", filename)
    # Replace multiple underscores with a single underscore
    sanitized = re.sub(r'_+', '_', sanitized)
    # Remove leading/trailing underscores, dots and spaces
    sanitized = sanitized.strip('_. ')
    if len(sanitized) > max_len:
        base, ext = os.path.splitext(sanitized)
        if len(ext) < max_len:
            # Keep the extension and shorten the stem.
            sanitized = base[:max_len - len(ext)] + ext
        else:
            # The "extension" alone is too long (e.g. "x." + 300 chars). The old
            # base[:negative] slice returned more than max_len characters here,
            # so fall back to a plain cut.
            sanitized = sanitized[:max_len]
    # Handle empty filenames after sanitization
    if not sanitized:
        return f"sanitized_{uuid.uuid4().hex[:8]}"
    return sanitized

def parse_iso_datetime(date_str: Optional[str]) -> Optional[datetime]:
    """Parse an ISO-8601 timestamp, returning None for empty or unparseable input.

    A ``Z`` (UTC) designator is first rewritten to ``+00:00``, because
    ``datetime.fromisoformat`` rejects ``Z`` before Python 3.11. Values that
    still fail to parse are logged at WARNING level rather than raised.
    """
    result: Optional[datetime] = None
    if date_str:
        try:
            result = datetime.fromisoformat(date_str.replace('Z', '+00:00'))
        except (ValueError, TypeError):
            logger.warning(f"Could not parse datetime string: {date_str}")
    return result

# --- Signal Handler ---
def signal_handler(signum, frame):
    """Log a received termination signal and request a cooperative shutdown.

    The ``(signum, frame)`` signature matches what ``signal.signal`` handlers
    receive; registration happens elsewhere. This only logs and sets
    ``stop_event``; code that polls the event performs the actual shutdown.
    """
    del frame  # part of the handler signature; not needed here
    signal_name = signal.Signals(signum).name
    logger.warning(f"Received signal: {signal_name}. Initiating graceful shutdown...")
    stop_event.set()

# --- Analysis Supervisor ---

class AnalysisSupervisor:
    """Monitors and manages the analysis process state.

    Holds a ``state`` dict with these entries:

    * status and phase;
    * file and message progress;
    * the ETA;
    * the most recent errors and warnings;
    * stats counters;
    * the non-path part of the config.

    The dict is persisted to ``config.state_file_path`` via
    ``atomic_write_json``. The supervisor also maintains a PID file at
    ``config.pid_file_path``.

    Mutations of ``state`` are guarded by ``self.lock``. This is a
    ``threading.Lock``, so it only protects against other threads of this
    process.
    """

    # Save the state file whenever the processed-file counter crosses a multiple of this.
    _FILE_SAVE_INTERVAL = 50
    # Log / save message progress whenever the counter crosses multiples of these.
    _MESSAGE_LOG_INTERVAL = 1000
    _MESSAGE_SAVE_INTERVAL = 200
    # Keep only the newest N errors and N warnings in the state file.
    _MAX_ISSUES_IN_STATE = 50

    def __init__(self, config: Config):
        self.config = config
        self.start_time = time.time()
        # Reference time for the file-processing ETA. It is reset in
        # set_file_totals() so that setup work done before file processing
        # does not inflate the per-file time estimate.
        self._file_phase_start = self.start_time
        self.state = {
            "scriptVersion": SCRIPT_VERSION,
            "pid": os.getpid(),
            "status": "initializing",
            "phase": "startup",
            "progressPercent": 0.0,
            "processedFiles": 0,
            "totalFilesToProcess": 0,
            "processedMessages": 0,  # Specific to message phase
            "totalMessagesToProcess": 0,  # Specific to message phase
            "startTime": self.start_time,
            "lastUpdateTime": self.start_time,
            "estimatedEndTime": None,
            "currentTask": "Initializing",
            "errors": [],
            "warnings": [],
            "stats": defaultdict(int),
            # Config snapshot for reference; filesystem paths are excluded.
            "config": {
                k: v for k, v in asdict(config).items()
                if k not in ['db_path', 'checkpoint_path', 'state_file_path', 'pid_file_path', 'visualization_data_path']
            },
        }
        self.lock = threading.Lock()  # Guards self.state across threads
        self._write_pid_file()
        self.save_state()  # Initial state save

    @staticmethod
    def _crossed_multiple(previous: int, current: int, interval: int) -> bool:
        """Return True if a counter moving from previous to current landed on or passed a multiple of interval."""
        return interval > 0 and current // interval > previous // interval

    def _write_pid_file(self):
        """Write the current process ID to the PID file (errors are logged, not raised)."""
        try:
            with open(self.config.pid_file_path, 'w') as f:
                f.write(str(os.getpid()))
            logger.info(f"PID file created at: {self.config.pid_file_path} with PID {os.getpid()}")
        except OSError as e:
            logger.error(f"Failed to write PID file {self.config.pid_file_path}: {e}")

    def _remove_pid_file(self):
        """Remove the PID file, but only if it still holds this process's PID.

        The file is read and closed before ``os.remove`` is called, because
        removing a file that is still open fails on Windows. Errors are
        logged, not raised.
        """
        pid_path = self.config.pid_file_path
        try:
            try:
                with open(pid_path, 'r') as f:
                    pid_in_file = f.read().strip()
            except FileNotFoundError:
                logger.info("PID file not found, nothing to remove.")
                return
            # Another instance may have started and taken over the PID file.
            if pid_in_file != str(os.getpid()):
                logger.warning(f"PID file {pid_path} belongs to another process ({pid_in_file}), not removing.")
                return
            os.remove(pid_path)
            logger.info("PID file removed.")
        except OSError as e:
            logger.error(f"Failed to remove PID file {pid_path}: {e}")
        except Exception as e:
            logger.error(f"Unexpected error removing PID file: {e}")

    def save_state(self):
        """Write a JSON snapshot of the state (plus formatted timestamps) to the state file.

        Failures are reported on stderr and via ``logger.critical`` but not raised.
        """
        with self.lock:
            current_time = time.time()
            self.state["lastUpdateTime"] = current_time
            snapshot = self.state.copy()
            # Human-readable copies of the epoch timestamps.
            snapshot["startTimeFormatted"] = datetime.fromtimestamp(self.state["startTime"]).isoformat()
            snapshot["lastUpdateTimeFormatted"] = datetime.fromtimestamp(current_time).isoformat()
            eta = self.state["estimatedEndTime"]
            snapshot["estimatedEndTimeFormatted"] = datetime.fromtimestamp(eta).isoformat() if eta else None
            # defaultdict -> dict for JSON serialization
            snapshot["stats"] = dict(self.state["stats"])

            try:
                atomic_write_json(snapshot, self.config.state_file_path)
            except Exception as e:
                # Use print as logger might be involved in the failure
                print(f"CRITICAL: Failed to save state file: {e}", file=sys.stderr)
                logger.critical(f"Failed to save state file: {e}", exc_info=True)

    def update_status(self, status: str, task: Optional[str] = None):
        """Set the overall status and, if given, the current task; then save state."""
        with self.lock:
            self.state["status"] = status
            if task:
                self.state["currentTask"] = task
                logger.info(f"Status: {status} - Task: {task}")
            else:
                # No task given: keep the last task (or a default).
                self.state["currentTask"] = self.state.get("currentTask", "N/A")
                logger.info(f"Status: {status} (Task: {self.state['currentTask']})")
        self.save_state()

    def update_phase(self, phase: str):
        """Enter a new analysis phase, resetting the current task label; then save state."""
        with self.lock:
            self.state["phase"] = phase
            self.state["currentTask"] = f"Starting {phase}"  # Reset task for new phase
            logger.info(f"Entering phase: {phase}")
        self.save_state()

    def set_file_totals(self, total_files: int):
        """Set the number of files to process and reset file progress, ETA and ETA clock."""
        with self.lock:
            self.state["totalFilesToProcess"] = total_files
            self.state["processedFiles"] = 0
            self.state["progressPercent"] = 0.0
            self.state["estimatedEndTime"] = None
            self._file_phase_start = time.time()
        self.save_state()

    def update_file_progress(self, increment: int = 1):
        """Advance the processed-file counter and refresh percentage and ETA.

        Args:
            increment: Number of files just finished; non-positive values are ignored.

        The counter is clamped to ``totalFilesToProcess``.

        * A progress line is logged each time the counter crosses a multiple
          of ``max(100, config.batch_size)``, and when it first reaches the
          total.
        * The state file is saved each time the counter crosses a multiple of
          50, and when it first reaches the total.

        Boundary crossing (rather than exact ``%`` equality) is used so that
        batched increments do not skip checkpoints. It also avoids
        re-logging/re-saving on every call once the total is reached or when
        no total is set.
        """
        if increment <= 0:
            return

        with self.lock:
            total = self.state["totalFilesToProcess"]
            previous = self.state["processedFiles"]
            processed = min(previous + increment, total)
            self.state["processedFiles"] = processed
            now = time.time()

            if total > 0:
                self.state["progressPercent"] = round((processed / total) * 100, 2)
                elapsed_time = now - self._file_phase_start
                remaining_files = total - processed
                # Require some history before estimating, to avoid unstable early ETAs.
                if processed > 10 and elapsed_time > 5 and remaining_files > 0:
                    time_per_file = elapsed_time / processed
                    self.state["estimatedEndTime"] = now + remaining_files * time_per_file
                else:
                    self.state["estimatedEndTime"] = None
            else:
                # No total set: processed is clamped to 0, so there is no progress to report.
                self.state["progressPercent"] = 0.0
                self.state["estimatedEndTime"] = None

            reached_total = previous < total <= processed
            log_interval = max(100, self.config.batch_size)  # Log every batch or 100 files
            if reached_total or self._crossed_multiple(previous, processed, log_interval):
                est_time_str = ""
                if self.state["estimatedEndTime"]:
                    eta_dt = datetime.fromtimestamp(self.state['estimatedEndTime'])
                    est_time_str = f", ETA: {eta_dt.strftime('%Y-%m-%d %H:%M:%S')}"
                logger.info(f"File Progress: {processed}/{total} ({self.state['progressPercent']}%) {est_time_str}")

            should_save = reached_total or self._crossed_multiple(previous, processed, self._FILE_SAVE_INTERVAL)

        # Saving is throttled (outside the lock) because this is called per file.
        if should_save:
            self.save_state()

    def set_message_totals(self, total_messages: int):
        """Set the number of messages to process and reset message progress."""
        with self.lock:
            self.state["totalMessagesToProcess"] = total_messages
            self.state["processedMessages"] = 0
        self.save_state()

    def update_message_progress(self, increment: int = 1):
        """Advance the processed-message counter (clamped to the total).

        Logs when the counter crosses a multiple of 1000 and saves state when
        it crosses a multiple of 200. Both also happen when the counter first
        reaches the total. Non-positive increments are ignored.
        """
        if increment <= 0:
            return

        with self.lock:
            total = self.state["totalMessagesToProcess"]
            previous = self.state["processedMessages"]
            processed = min(previous + increment, total)
            self.state["processedMessages"] = processed

            reached_total = previous < total <= processed
            if reached_total or self._crossed_multiple(previous, processed, self._MESSAGE_LOG_INTERVAL):
                progress_percent = round((processed / total) * 100, 1) if total > 0 else 100.0
                logger.info(f"Message Progress: {processed}/{total} ({progress_percent}%)")

            should_save = reached_total or self._crossed_multiple(previous, processed, self._MESSAGE_SAVE_INTERVAL)

        if should_save:
            self.save_state()

    def _record_issue(self, bucket: str, message: str, detail: Optional[str]):
        """Append a timestamped entry to state[bucket], keep only the newest ones, then save."""
        entry = {
            "timestamp": time.time(),
            "timestampFormatted": datetime.now().isoformat(),
            "message": message,
            "detail": detail,
        }
        with self.lock:
            issues = self.state[bucket]
            issues.append(entry)
            # Bound the list so the state file stays small.
            if len(issues) > self._MAX_ISSUES_IN_STATE:
                self.state[bucket] = issues[-self._MAX_ISSUES_IN_STATE:]
        self.save_state()

    def log_error(self, message: str, detail: Optional[str] = None):
        """Log an error and record it in the state (newest 50 kept)."""
        logger.error(f"{message} - Detail: {detail}" if detail else message)  # Log first
        self._record_issue("errors", message, detail)

    def log_warning(self, message: str, detail: Optional[str] = None):
        """Log a warning and record it in the state (newest 50 kept)."""
        logger.warning(f"{message} - Detail: {detail}" if detail else message)  # Log first
        self._record_issue("warnings", message, detail)

    def update_stat(self, key: str, increment: int = 1):
        """Increment a statistics counter.

        The state is not saved here, for performance; the counter is
        persisted on the next save.
        """
        with self.lock:
            self.state["stats"][key] = self.state["stats"].get(key, 0) + increment

    def get_state(self) -> Dict[str, Any]:
        """Return a copy of the state that callers can modify without affecting the supervisor."""
        with self.lock:
            state_copy = self.state.copy()
            state_copy["stats"] = dict(self.state["stats"])
            # Copy the lists so callers cannot mutate the supervisor's history.
            state_copy["errors"] = list(self.state["errors"])
            state_copy["warnings"] = list(self.state["warnings"])
            return state_copy

    def close(self):
        """Mark the run finished, save state, and remove the PID file."""
        logger.info("Closing analysis supervisor...")
        # Final state save before removing PID
        self.update_status("finished", "Analysis complete")
        self._remove_pid_file()
        logger.info("Analysis supervisor closed.")

# --- Database Manager ---

class DatabaseManager:
    """Manages interactions with the SQLite database.

    Wraps a single ``sqlite3.Connection`` opened with
    ``check_same_thread=True``, so sqlite3 raises if the connection is used
    from any thread other than the one that created it. It provides:

    * :meth:`get_cursor`: a transactional context manager that commits on
      success and rolls back on any exception.
    * :meth:`execute_query`, :meth:`execute_write` and :meth:`execute_many`:
      helpers that retry "database is locked"/"busy" errors with backoff.
      They raise ``InterruptedError`` once the module-level ``stop_event``
      is set.
    """

    def __init__(self, db_path: str):
        self.db_path = db_path
        self.conn = None
        self._connect()
        self._setup_database()

    def _connect(self):
        """Open the connection and apply per-connection PRAGMAs.

        Raises:
            sqlite3.Error: If connecting or any PRAGMA fails. Any half-opened
                connection is closed first and ``self.conn`` is left as None.
        """
        conn = None
        try:
            # check_same_thread=True: do not share this connection across
            # threads unless external locking is added.
            conn = sqlite3.connect(self.db_path, timeout=30.0, check_same_thread=True)
            conn.row_factory = sqlite3.Row  # Access columns by name
            # Write-Ahead Logging lets readers proceed while a writer is active.
            conn.execute("PRAGMA journal_mode=WAL;")
            # NOTE: this replaces the 30 s busy timeout set by connect() above;
            # the effective lock wait for subsequent statements is 5 seconds.
            conn.execute("PRAGMA busy_timeout = 5000;")
            # Enforce foreign key constraints (off by default in SQLite).
            conn.execute("PRAGMA foreign_keys=ON;")
        except sqlite3.Error as e:
            if conn is not None:
                conn.close()  # don't leak a half-configured connection
            self.conn = None
            logger.critical(f"Database connection failed: {e}")
            raise  # Critical error, cannot proceed
        self.conn = conn
        logger.info(f"Connected to database: {self.db_path}")

    def _setup_database(self):
        """Create tables and indexes if they don't exist.

        Raises:
            sqlite3.Error: After rolling back, if any statement fails.
        """
        if self.conn is None:
            logger.error("Database not connected, cannot set up tables.")
            return

        setup_statements = [
            # Files table (Consolidated from file_analyzer)
            '''
            CREATE TABLE IF NOT EXISTS files (
                id INTEGER PRIMARY KEY,
                path TEXT NOT NULL,
                filename TEXT NOT NULL,
                extension TEXT,
                filesize INTEGER DEFAULT 0,
                mimetype TEXT DEFAULT 'application/octet-stream',
                md5hash TEXT,
                fuzzyhash TEXT,
                creation_date TEXT,      -- Store as ISO 8601 string
                modification_date TEXT,  -- Store as ISO 8601 string
                category TEXT DEFAULT 'Other',
                value_score REAL DEFAULT 0.0,
                is_duplicate INTEGER DEFAULT 0,
                original_id INTEGER,     -- References files(id)
                last_analyzed TEXT,      -- Store as ISO 8601 string
                error TEXT,
                UNIQUE(path, filename),
                FOREIGN KEY (original_id) REFERENCES files (id) ON DELETE SET NULL
            )
            ''',
            # Patterns table (Consolidated from file_analyzer)
            '''
            CREATE TABLE IF NOT EXISTS patterns (
                id INTEGER PRIMARY KEY,
                file_id INTEGER NOT NULL,
                pattern_type TEXT NOT NULL,
                pattern_value TEXT NOT NULL,
                confidence REAL DEFAULT 1.0,
                FOREIGN KEY (file_id) REFERENCES files (id) ON DELETE CASCADE
            )
            ''',
            # Index for faster pattern lookups
            'CREATE INDEX IF NOT EXISTS idx_patterns_file_id ON patterns (file_id)',
            'CREATE INDEX IF NOT EXISTS idx_patterns_type_value ON patterns (pattern_type, pattern_value)',
            # Index for faster file lookups by hash or path
            'CREATE INDEX IF NOT EXISTS idx_files_md5hash ON files (md5hash)',
            'CREATE INDEX IF NOT EXISTS idx_files_path_filename ON files (path, filename)',
            'CREATE INDEX IF NOT EXISTS idx_files_category ON files (category)',
            'CREATE INDEX IF NOT EXISTS idx_files_modification_date ON files (modification_date)',
            # No checkpoints table: file-analysis resume uses a JSON checkpoint instead.
        ]

        try:
            cursor = self.conn.cursor()
            for statement in setup_statements:
                cursor.execute(statement)
            self.conn.commit()
            logger.info("Database schema setup/verified.")
        except sqlite3.Error as e:
            logger.critical(f"Database setup failed: {e}")
            if self.conn is not None:
                self.conn.rollback()  # Rollback any partial changes
            raise

    def _ensure_connection(self):
        """Reconnect if there is no open connection; re-raise sqlite3.Error if that fails."""
        if self.conn is not None:
            return
        logger.warning("Database connection lost. Attempting to reconnect...")
        try:
            self._connect()
        except sqlite3.Error:
            logger.error("Database reconnection failed.")
            raise

    def _rollback(self) -> bool:
        """Roll back the open transaction; return True on success, log and return False on failure."""
        if self.conn is None:
            return False
        try:
            self.conn.rollback()
            return True
        except sqlite3.Error as rb_err:
            logger.error(f"Database rollback failed: {rb_err}")
            return False

    @contextmanager
    def get_cursor(self):
        """Yield a cursor inside a transaction.

        Commits when the with-body completes. Rolls back and re-raises on
        *any* exception, not only ``sqlite3.Error``: otherwise a non-database
        failure inside the body would leave partial writes pending on the
        connection, to be committed by some later, unrelated transaction.
        The cursor is always closed.
        """
        self._ensure_connection()
        cursor = self.conn.cursor()
        try:
            yield cursor
            self.conn.commit()  # Commit successful transactions
        except sqlite3.Error as e:
            logger.error(f"Database transaction failed: {e}")
            if self._rollback():
                logger.info("Database transaction rolled back.")
            raise  # Re-raise the original exception
        except BaseException:
            # Caller-side failure (bug, InterruptedError, KeyboardInterrupt...):
            # undo the partial transaction; the caller reports its own error.
            self._rollback()
            raise
        finally:
            cursor.close()

    @staticmethod
    def _is_transient(error: BaseException) -> bool:
        """Return True for lock/busy errors, which sqlite3 reports as OperationalError and which are worth retrying."""
        if not isinstance(error, sqlite3.OperationalError):
            return False
        err_msg = str(error).lower()
        return "database is locked" in err_msg or "busy" in err_msg

    def _run_with_retry(self, label: str, verb: str, operation: Callable[[], Any], query: str) -> Any:
        """Run ``operation`` and return its result, retrying transient lock/busy errors.

        Up to ``MAX_RETRIES`` retries follow the first attempt. They use
        linear backoff (``RETRY_DELAY * attempt``) with +/-25% jitter.

        Args:
            label: Prefix for log messages (e.g. "DB Query").
            verb: Lower-case operation name for stop-signal messages (e.g. "query").
            operation: Zero-argument callable performing one attempt.
            query: SQL text; only used (truncated) in log messages.

        Raises:
            InterruptedError: If ``stop_event`` is set before an attempt.
            Exception: The last error, once retries are exhausted or a
                non-transient error occurs.
        """
        last_error = None
        for attempt in range(MAX_RETRIES + 1):  # initial try + MAX_RETRIES retries
            if stop_event.is_set():
                logger.warning(f"Stop event set, aborting DB {verb}.")
                raise InterruptedError(f"Database {verb} aborted due to stop signal.")
            try:
                return operation()
            except sqlite3.Error as e:
                last_error = e
                if self._is_transient(e):
                    if attempt < MAX_RETRIES:
                        delay = RETRY_DELAY * (attempt + 1) * (random.random() * 0.5 + 0.75)  # Add jitter
                        logger.warning(f"{label} lock/busy (Attempt {attempt+1}/{MAX_RETRIES}): {e}. Retrying in {delay:.2f}s...")
                        time.sleep(delay)
                        continue
                    logger.error(f"{label} failed due to lock/busy after {MAX_RETRIES} retries.")
                elif isinstance(e, sqlite3.IntegrityError) and "unique constraint" in str(e).lower():
                    # IntegrityError derives from DatabaseError, so it must be
                    # recognised here rather than in a later except clause.
                    logger.warning(f"Unique constraint violation during write: {e}. Query: {query[:100]}...")
                else:
                    logger.error(f"{label} failed with non-transient SQLite error: {e}")
                break  # Integrity/programming errors will not succeed on retry
            except Exception as e:
                last_error = e
                logger.error(f"Unexpected error during {label}: {e}", exc_info=True)
                break

        logger.error(f"{label} failed permanently. Last error: {last_error}. Query: {query[:200]}...")
        raise last_error if last_error is not None else sqlite3.Error(f"{label} failed for unknown reasons.")

    def execute_query(self, query: str, params: tuple = ()) -> List[sqlite3.Row]:
        """Execute a SELECT query and return all rows, retrying transient lock/busy errors."""
        def attempt() -> List[sqlite3.Row]:
            self._ensure_connection()
            cursor = self.conn.cursor()
            try:
                cursor.execute(query, params)
                return cursor.fetchall()
            finally:
                cursor.close()

        return self._run_with_retry("DB Query", "query", attempt, query)

    def execute_write(self, query: str, params: tuple = ()) -> Optional[int]:
        """Execute an INSERT/UPDATE/DELETE in its own transaction, with retries.

        Returns:
            ``cursor.lastrowid``. This is the new row id after an
            INSERT/REPLACE, and None for other statements since each call
            uses a fresh cursor.
        """
        def attempt() -> Optional[int]:
            with self.get_cursor() as cursor:
                cursor.execute(query, params)
                return cursor.lastrowid

        return self._run_with_retry("DB Write", "write", attempt, query)

    def execute_many(self, query: str, params_list: List[tuple]) -> int:
        """Run ``executemany`` in a single transaction, with retries.

        Returns:
            ``cursor.rowcount`` (rows modified), or 0 for an empty ``params_list``.
        """
        if not params_list:
            return 0

        def attempt() -> int:
            with self.get_cursor() as cursor:
                cursor.executemany(query, params_list)
                return cursor.rowcount

        return self._run_with_retry("DB ExecuteMany", "executemany", attempt, query)

    def close(self):
        """Commit, optimize, checkpoint the WAL and close the connection.

        The connection is closed even if optimize/checkpoint fails. Errors
        are logged, not raised.
        """
        if self.conn is None:
            return
        try:
            # Commit first so the checkpoint below can include the final writes.
            self.conn.commit()
            # Optimize database before closing - may take time on large DBs
            logger.info("Optimizing database before closing (PRAGMA optimize)...")
            self.conn.execute("PRAGMA optimize;")
            logger.info("Running WAL checkpoint...")
            self.conn.execute("PRAGMA wal_checkpoint(TRUNCATE);")
        except sqlite3.Error as e:
            logger.error(f"Error during database closing procedures: {e}")
        except Exception as e:
            logger.error(f"Unexpected error closing database: {e}")
        finally:
            try:
                self.conn.close()
                logger.info("Database connection closed.")
            except sqlite3.Error as e:
                logger.error(f"Error closing database connection: {e}")
            self.conn = None


# --- File Analyzer ---

class FileAnalyzer:
    """Handles the analysis of individual files."""
    def __init__(self, config: Config, db_manager: DatabaseManager, supervisor: AnalysisSupervisor):
        self.config = config
        self.db_manager = db_manager
        self.supervisor = supervisor
        self.processed_files_in_run = set() # Track files processed in this specific run/resume

    def analyze_file(self, file_path: str) -> Optional[FileInfo]:
        """Collect metadata, hashes and classification for a single file.

        Meant to run inside a worker process. Populates a FileInfo with:
        - size and modification/creation timestamps (ISO strings);
        - extension, MIME type and MD5;
        - an ssdeep fuzzy hash, only when ssdeep is available and the file
          is non-empty and under 100 MB;
        - category, filename patterns, value score and analysis timestamp.

        Returns:
            The populated FileInfo. On FileNotFoundError, another OSError, or
            any unexpected exception, the partially filled FileInfo is
            returned with ``error`` set. Failures are logged here and are not
            reported to the supervisor.
        """
        pid = os.getpid()
        logger.debug(f"[Worker {pid}] Analyzing file: {file_path}")
        info = FileInfo(path=os.path.dirname(file_path), filename=os.path.basename(file_path))

        try:
            st = os.stat(file_path)
            info.filesize = st.st_size
            info.modification_date = datetime.fromtimestamp(st.st_mtime).isoformat()
            # st_birthtime only exists on some platforms (e.g. macOS/BSD);
            # otherwise fall back to st_ctime (on Linux this is change time).
            born = getattr(st, "st_birthtime", None)
            info.creation_date = datetime.fromtimestamp(st.st_ctime if born is None else born).isoformat()
            info.extension = os.path.splitext(info.filename)[1].lower()

            # The utility function handles python-magic availability itself.
            info.mimetype = get_file_mime_type(file_path)

            info.md5hash = self._calculate_md5(file_path)
            fuzzy_limit = 100 * 1024 * 1024  # 100 MB; 0-byte files are skipped too
            if SSDEEP_AVAILABLE and 0 < info.filesize < fuzzy_limit:
                info.fuzzyhash = self._calculate_fuzzy_hash(file_path)

            info.category = self._get_file_category(file_path, info.mimetype, info.filename)
            info.patterns = self._analyze_filename_patterns(info.filename)
            info.value_score = self._calculate_value_score(info)
            info.last_analyzed = datetime.now().isoformat()

            logger.debug(f"[Worker {pid}] Analysis complete for: {file_path}")
        except FileNotFoundError:
            logger.error(f"[Worker {pid}] File not found during analysis: {file_path}")
            info.error = "File not found"
        except OSError as e:
            logger.error(f"[Worker {pid}] OS error analyzing file {file_path}: {e}")
            info.error = f"OS Error: {e}"
        except Exception as e:
            logger.error(f"[Worker {pid}] Unexpected error analyzing file {file_path}: {e}", exc_info=True)
            info.error = f"Unexpected Error: {e}"
        return info

    def _calculate_md5(self, file_path: str) -> str:
        """Calculate MD5 hash of a file."""
        try:
            hash_md5 = hashlib.md5()
            with open(file_path, "rb") as f:
                # Read in larger chunks for potentially better performance
                for chunk in iter(lambda: f.read(65536), b""): # 64KB chunks
                    hash_md5.update(chunk)
            return hash_md5.hexdigest()
        except OSError as e:
            logger.error(f"[Worker {os.getpid()}] Error calculating MD5 for {file_path}: {e}")
            return ""
        except Exception as e:
             logger.error(f"[Worker {os.getpid()}] Unexpected error calculating MD5 for {file_path}: {e}")
             return ""


    def _calculate_fuzzy_hash(self, file_path: str) -> str:
        """Return the ssdeep fuzzy hash of ``file_path``, or "" if hashing fails.

        ``analyze_file`` calls this only after checking SSDEEP_AVAILABLE.
        OS errors are logged as errors, ssdeep library errors as warnings, and
        anything else as errors. No exception propagates.
        """
        # NOTE(review): python-ssdeep documents BaseError/InternalError; an
        # ``except ssdeep.Error`` clause naming a missing attribute raises
        # AttributeError while an exception is being matched, which would
        # escape this method. Resolve whichever classes actually exist.
        ssdeep_errors = tuple(
            exc_cls
            for exc_cls in (getattr(ssdeep, name, None) for name in ("BaseError", "Error"))
            if isinstance(exc_cls, type) and issubclass(exc_cls, Exception)
        )
        try:
            # ssdeep.hash_from_file handles file reading internally
            return ssdeep.hash_from_file(file_path)
        except OSError as e:
            logger.error(f"[Worker {os.getpid()}] OS error during fuzzy hash calculation for {file_path}: {e}")
            return ""
        except ssdeep_errors as e:  # an empty tuple matches nothing
            # Log less critical errors as warnings
            logger.warning(f"[Worker {os.getpid()}] ssdeep error calculating fuzzy hash for {file_path}: {e}")
            return ""
        except Exception as e:  # Catch potential ssdeep internal errors
            logger.error(f"[Worker {os.getpid()}] Unexpected error calculating fuzzy hash for {file_path}: {e}")
            return ""

    def _get_file_category(self, file_path: str, mime_type: str, filename: str) -> str:
        """Determine file category based on MIME type, extension, and filename patterns."""
        mime_major = mime_type.split('/')[0] if '/' in mime_type else mime_type
        mime_minor = mime_type.split('/')[1] if '/' in mime_type else ""
        extension = os.path.splitext(filename)[1].lower()

        # Prioritize specific MIME types and extensions
        if mime_major == "image": return "Images"
        if mime_major == "video": return "Videos"
        if mime_major == "audio": return "Audio"

        if "pdf" in mime_minor: return "Documents"
        if "msword" in mime_minor or "wordprocessingml" in mime_minor or extension in ['.doc', '.docx']: return "Documents"
        if "ms-excel" in mime_minor or "spreadsheetml" in mime_minor or extension in ['.xls', '.xlsx']: return "Spreadsheets"
        if "ms-powerpoint" in mime_minor or "presentationml" in mime_minor or extension in ['.ppt', '.pptx']: return "Presentations"
        if "rtf" in mime_minor or extension == '.rtf': return "Documents"
        if "opendocument.text" in mime_minor or extension == '.odt': return "Documents"
        if "opendocument.spreadsheet" in mime_minor or extension == '.ods': return "Spreadsheets"
        if "opendocument.presentation" in mime_minor or extension == '.odp': return "Presentations"
        if mime_major == "text" and extension in ['.txt', '.md', '.log', '.csv', '.tsv']: return "Text" # More specific text category

        # Message file detection (more robust)
        # Check common message/contact extensions first
        if extension in ['.vcf', '.vcard']: return "Contacts"
        if extension in ['.ics', '.ical']: return "Calendar"
        # Check filename patterns and MIME types
        if any(p in filename.lower() for p in ["sms", "mms", "message", "chat", "contact", "backup.contacts"]) or \
           (filename.startswith('+') and extension == '.txt') or \
           "message" in mime_minor or "chat" in mime_minor:
             # Avoid categorizing generic XML/JSON as messages unless filename suggests it
             if not (mime_minor in ['xml', 'json'] and not any(p in filename.lower() for p in ["sms", "mms", "message", "chat"])):
                 return "Messages"

        if "zip" in mime_minor or "rar" in mime_minor or "tar" in mime_minor or \
           "gzip" in mime_minor or "bzip2" in mime_minor or "7z" in mime_minor or \
           extension in ['.zip', '.rar', '.tar', '.gz', '.bz2', '.7z']:
            return "Archives"

        if "x-msdownload" in mime_minor or "executable" in mime_minor or \
           extension in ['.exe', '.dll', '.app', '.bat', '.sh', '.msi', '.dmg', '.deb', '.rpm']:
            return "Applications"
        if "android.package-archive" in mime_minor or extension == '.apk': return "Applications" # APK
        if extension == '.ipa': return "Applications" # iOS App

        if "database" in mime_minor or "sqlite" in mime_minor or extension in ['.db', '.sqlite', '.sqlite3', '.sql']: return "Databases"
        if mime_major == "font" or extension in ['.ttf', '.otf', '.woff', '.woff2']: return "Fonts"

        # Fallback for generic types
        if mime_major == "text": return "Text" # Generic text files
        if mime_major == "application":
             if mime_minor in ['xml', 'json', 'javascript', 'html', 'css']: return "Code/Config" # Common web/config formats
             return "Application Data" # Other application-specific files

        return "Other" # Default category

    def _analyze_filename_patterns(self, filename: str) -> List[Dict[str, Any]]:
        """Analyze filename for common patterns like dates, phone numbers, etc."""
        patterns = []
        filename_lower = filename.lower()

        # Phone number pattern (basic check in filename)
        # Use the more robust extraction function (defined later globally)
        # Note: This runs in worker, so global logger needs care if used inside extract_phone_numbers
        # Pass None or a dummy logger if needed. Here, assuming extract_phone_numbers is safe.
        phone_numbers = extract_phone_numbers(filename, None) # Pass None logger
        for num in phone_numbers:
             # Check length to avoid matching random long numbers
             cleaned_num = re.sub(r'\D', '', num)
             if 6 < len(cleaned_num) < 16:
                 patterns.append({'pattern_type': 'phone_number_fname', 'pattern_value': num, 'confidence': 0.7}) # Lower confidence from filename only

        # Date patterns (YYYY-MM-DD,```python
                    # Convert file list back to set
                    c_dict['files'] = set(c_dict.get('files', []))
                    # Handle potential missing fields gracefully
                    contact = Contact(**{k: v for k, v in c_dict.items() if k in Contact.__annotations__})
                    contacts.append(contact)
                logger.info(f"Loaded {len(contacts)} contacts.")
                return contacts
            except Exception as e:
                logger.error(f"Error reconstructing contacts from JSON: {e}", exc_info=True)
                return None
        else:
            logger.warning("Failed to load or parse contacts JSON file.")
            return None

    def _load_conversations_from_file(self) -> Optional[Dict[str, Conversation]]:
        """Rebuild Conversation objects from the configured JSON file.

        Keys in each stored record that are not annotated fields of
        Conversation are ignored.

        Returns:
            A mapping of conversation id to Conversation, or ``None`` in each
            of these cases (all logged):
            - the file does not exist;
            - it cannot be loaded or is not a non-empty JSON object;
            - any record fails to reconstruct.
        """
        source = self.config.conversation_data_path
        if not os.path.exists(source):
            logger.info("Conversations file not found, initializing empty dict.")
            return None

        logger.info(f"Loading existing conversations from: {source}")
        raw = load_json_safe(source)
        if not (raw and isinstance(raw, dict)):
            logger.warning("Failed to load or parse conversations JSON file.")
            return None

        try:
            known_fields = Conversation.__annotations__
            loaded = {
                conv_id: Conversation(**{key: value for key, value in record.items() if key in known_fields})
                for conv_id, record in raw.items()
            }
            logger.info(f"Loaded {len(loaded)} conversations.")
            return loaded
        except Exception as e:
            logger.error(f"Error reconstructing conversations from JSON: {e}", exc_info=True)
            return None

    def run_message_analysis(self):
        """Orchestrate the message analysis phase.

        Stages:
          1. Read every message file returned by ``_get_message_files_from_db``
             in a thread pool and collect its metadata.
          1b. Merge that metadata into the loaded contacts and conversations.
              Contacts remember the file ids they counted and conversations
              the file paths, so re-processing a file (e.g. on resume) does
              not double count.
          1c. Extract topics for contacts/conversations that gained content.
          2. Filter conversations below ``min_conversation_files``, then save
             contacts, conversations and per-contact conversation files.
          3. Build and render the contact graph when NetworkX and Matplotlib
             are available.

        When ``stop_event`` is set the phase stops early with status
        "interrupted". Errors are logged and reported to the supervisor
        instead of being raised.
        """
        self.supervisor.update_phase("message_analysis")
        logger.info("Starting message analysis phase...")

        # Get message files from database
        self.supervisor.update_status("loading_messages", "Querying message files from DB")
        message_files_data = self._get_message_files_from_db()

        if not message_files_data:
            logger.warning("No message files found in database (category='Messages'). Skipping message analysis.")
            self.supervisor.update_status("skipped", "No message files found")
            return  # Nothing to analyze

        num_message_files = len(message_files_data)
        self.supervisor.set_message_totals(num_message_files)  # Total messages to process in this phase
        logger.info(f"Found {num_message_files} message files to analyze.")
        self.supervisor.update_status("analyzing_messages", f"Processing {num_message_files} messages")

        logger.info("Processing message files to update contacts and conversations...")
        contacts_map = {c.phone_number: c for c in self.contacts}  # Start from previously loaded contacts
        conversations_map = self.conversations  # Updated in place

        try:
            # --- Stage 1: Read message files and extract metadata ---
            all_message_metadata = self._read_message_metadata_parallel(message_files_data)
            if all_message_metadata is None:
                logger.warning("Message metadata reading interrupted.")
                self.supervisor.update_status("interrupted", "Message analysis stopped")
                return
            logger.info(f"Finished reading metadata for {len(all_message_metadata)} files.")
            self.supervisor.update_status("analyzing_messages", "Updating contact and conversation data")

            # --- Stage 1b: Merge metadata into contacts and conversations ---
            file_contents_by_contact = defaultdict(list)  # number -> texts, for topic analysis
            file_contents_by_conv = defaultdict(list)  # conversation key -> texts
            known_paths_by_conv = {}  # conversation key -> recorded file paths (O(1) dedup)
            touched_conversations = set()  # conversations that gained a file in this run

            update_count = 0
            for msg_data in message_files_data:  # Keep the DB order (rough chronological)
                if stop_event.is_set():
                    break

                file_path = msg_data['full_path']
                metadata = all_message_metadata.get(file_path)
                if not metadata or not metadata.phone_numbers:
                    continue  # Skip if error reading or no participants found

                file_date = metadata.modification_time or msg_data['modification_date']  # Prefer metadata time

                for number in metadata.phone_numbers:
                    contact = self._lookup_or_create_contact(contacts_map, number)
                    if contact is not None:
                        self._merge_message_into_contact(
                            contact, number, msg_data['id'], file_date, metadata, file_contents_by_contact
                        )

                participants = sorted(metadata.phone_numbers)
                conversation_key = ",".join(participants)
                if self._merge_message_into_conversation(
                    conversations_map, known_paths_by_conv, conversation_key, participants,
                    file_path, file_date, metadata, file_contents_by_conv,
                ):
                    touched_conversations.add(conversation_key)

                update_count += 1
                if update_count % 1000 == 0:
                    logger.info(f"Updated data structures for {update_count} messages...")

            if stop_event.is_set():
                logger.warning("Message data structure update interrupted.")
                self.supervisor.update_status("interrupted", "Message analysis stopped")
                return

            # Sort files within every conversation that changed, by date.
            for conv_key in touched_conversations:
                conversations_map[conv_key].files.sort(
                    key=lambda entry: (parse_iso_datetime(entry.get('date')) or datetime.min)
                )

            # --- Stage 1c: Topic Modeling ---
            logger.info("Extracting topics for contacts and conversations...")
            self.supervisor.update_status("analyzing_messages", "Extracting topics")
            self._apply_topics_from_texts(file_contents_by_contact, contacts_map, "contacts")
            self._apply_topics_from_texts(file_contents_by_conv, conversations_map, "conversations")

            if stop_event.is_set():
                logger.warning("Topic modeling interrupted.")
                self.supervisor.update_status("interrupted", "Message analysis stopped")
                return

            # Update internal state
            self.contacts = sorted(contacts_map.values(), key=lambda c: c.message_count, reverse=True)
            self.conversations = conversations_map

            logger.info(f"Total contacts: {len(self.contacts)}, Total conversations: {len(self.conversations)}")
            self.supervisor.update_stat("contacts_total", len(self.contacts))
            self.supervisor.update_stat("conversations_total", len(self.conversations))

            # Filter conversations based on minimum file count
            initial_conv_count = len(self.conversations)
            self.conversations = {
                conv_id: data
                for conv_id, data in self.conversations.items()
                if data.message_count >= self.config.min_conversation_files
            }
            filtered_conv_count = len(self.conversations)
            if initial_conv_count != filtered_conv_count:
                logger.info(f"Filtered conversations: {initial_conv_count} -> {filtered_conv_count} (min {self.config.min_conversation_files} files)")
                self.supervisor.update_stat("conversations_filtered_out", initial_conv_count - filtered_conv_count)

            # --- Stage 2: Save Results ---
            self.supervisor.update_status("saving_results", "Saving contacts and conversations")
            self._save_contacts()
            self._save_conversations()
            self._save_conversations_by_contact()  # Save per-contact files

            # --- Stage 3: Build and Visualize Contact Graph ---
            if NETWORKX_AVAILABLE and MATPLOTLIB_AVAILABLE:
                self.supervisor.update_status("building_graph", "Building contact network")
                contact_graph = self._build_contact_graph(contacts_map)
                if contact_graph and contact_graph.number_of_nodes() > 1:
                    self.supervisor.update_status("visualizing_graph", "Generating contact graph image")
                    graph_path = os.path.join(self.config.visualization_dir, "Graphs", "contact_network.png")
                    self._visualize_contact_graph(contact_graph, graph_path, top_n=self.config.graph_top_n)

                    # Optional: Sentiment-based graph
                    if self.sentiment_analyzer and any(abs(c.sentiment_score) > 0.05 for c in self.contacts):
                        sentiment_graph_path = os.path.join(self.config.visualization_dir, "Graphs", "contact_sentiment_network.png")
                        self._visualize_sentiment_graph(contact_graph, sentiment_graph_path, top_n=self.config.graph_top_n)
                else:
                    logger.warning("Contact graph has too few nodes (<2) or failed to build, skipping visualization.")
            else:
                logger.warning("NetworkX or Matplotlib not available, skipping graph analysis and visualization.")

            logger.info("Message analysis phase completed.")
            self.supervisor.update_status("completed", "Message analysis finished")

        except InterruptedError:  # Catch interruption from signal handler
            logger.warning("Message analysis phase interrupted by stop signal.")
            self.supervisor.update_status("interrupted", "Message analysis stopped")
        except Exception as e:
            logger.critical(f"Critical error during message analysis: {e}", exc_info=True)
            self.supervisor.log_error("Critical error in message analysis", str(e))
            self.supervisor.update_status("failed", "Critical message analysis error")

    def _read_message_metadata_parallel(self, message_files_data: List[Dict[str, Any]]) -> Optional[Dict[str, Any]]:
        """Read all message files concurrently and collect their metadata.

        Read failures are reported to the supervisor and counted, not raised.
        Message progress is reported in steps of 200, plus the final remainder.

        Returns:
            A mapping of full path to metadata for files read without error,
            or ``None`` if ``stop_event`` was set.
        """
        num_message_files = len(message_files_data)
        # Heuristic: fewer workers than file analysis (reading + parsing)
        num_workers = min(max(1, (self.config.parallel_jobs // 2)), 8)
        logger.info(f"Using {num_workers} workers for message content processing.")

        progress_step = 200
        processed_message_count = 0
        reported_count = 0
        all_message_metadata = {}

        with ThreadPoolExecutor(max_workers=num_workers) as executor:
            future_to_path = {
                executor.submit(self._read_message_content, msg_data['full_path']): msg_data['full_path']
                for msg_data in message_files_data
            }
            for future in as_completed(future_to_path):
                if stop_event.is_set():
                    # Cancel reads that have not started so leaving the `with`
                    # block does not wait for the entire queue to drain.
                    for pending in future_to_path:
                        pending.cancel()
                    break
                file_path = future_to_path[future]
                try:
                    _content, metadata = future.result()
                    if metadata and not metadata.error:
                        all_message_metadata[file_path] = metadata
                    elif metadata and metadata.error:
                        self.supervisor.log_warning(f"Error reading message file {os.path.basename(file_path)}", metadata.error)
                        self.supervisor.update_stat("message_read_errors", 1)
                    else:
                        self.supervisor.log_warning(f"No metadata returned for {os.path.basename(file_path)}")
                        self.supervisor.update_stat("message_read_errors", 1)
                except Exception as exc:
                    logger.error(f'Message file {file_path} generated an exception: {exc}')
                    self.supervisor.log_error(f"Error processing message future for {os.path.basename(file_path)}", str(exc))
                    self.supervisor.update_stat("message_processing_errors", 1)

                processed_message_count += 1
                if processed_message_count % progress_step == 0:
                    self.supervisor.update_message_progress(processed_message_count - reported_count)
                    reported_count = processed_message_count
                    logger.info(f"Read metadata for {processed_message_count}/{num_message_files} message files...")

        if stop_event.is_set():
            return None

        # Report the tail the periodic updates above did not cover.
        remainder = processed_message_count - reported_count
        if remainder:
            self.supervisor.update_message_progress(remainder)
        return all_message_metadata

    def _lookup_or_create_contact(self, contacts_map: Dict[str, Any], number: str):
        """Return the contact for ``number``, creating it if the number is accepted.

        Acceptance rules:
        - With phonenumbers available, only numbers that parse as valid are
          accepted; they are normalized to E.164.
        - Otherwise, numbers with 7-15 digits are accepted and normalized to
          their digits.

        New contacts are stored in ``contacts_map``. Returns None when the
        number is rejected.
        """
        contact = contacts_map.get(number)
        if contact is not None:
            return contact

        if PHONENUMBERS_AVAILABLE:
            normalized_num = self._normalize_number_e164(number)
        else:
            digits = re.sub(r'\D', '', number)
            normalized_num = digits if 6 < len(digits) < 16 else None
        if normalized_num is None:
            return None

        contact = Contact(phone_number=number, normalized_number=normalized_num)
        contacts_map[number] = contact
        return contact

    def _normalize_number_e164(self, number: str) -> Optional[str]:
        """Return ``number`` in E.164 form if phonenumbers accepts it as valid, else None.

        The 'US' default region is tried first, then no default region.
        """
        for region in ('US', None):  # Add more default regions if relevant
            try:
                parsed_num = parse_phone(number, region)
                if parsed_num and is_valid_number(parsed_num):
                    return format_number(parsed_num, PhoneNumberFormat.E164)
            except NumberParseException:
                continue  # Try the next region
            except Exception as parse_exc:  # Catch other potential phonenumbers errors
                logger.warning(f"Phonenumbers internal error for '{number}': {parse_exc}")
                continue
        return None

    def _merge_message_into_contact(self, contact, number: str, file_id, file_date, metadata,
                                    file_contents_by_contact: Dict[str, List[str]]) -> None:
        """Fold one message file into ``contact`` unless that file id was already counted.

        Every counter is guarded by ``contact.files``, so re-processing a file
        does not double count. This covers the message count, sentiment mean,
        first/last seen, location hints and common contacts.
        """
        if file_id in contact.files:
            return
        contact.files.add(file_id)
        contact.message_count += 1

        # Running mean over every counted message. Zero scores must be folded
        # in too, because message_count (the denominator) already includes them.
        contact.sentiment_score = (
            (contact.sentiment_score * (contact.message_count - 1)) + metadata.sentiment_score
        ) / contact.message_count

        if contact.first_seen is None or (file_date and file_date < contact.first_seen):
            contact.first_seen = file_date
        if contact.last_seen is None or (file_date and file_date > contact.last_seen):
            contact.last_seen = file_date

        for location in metadata.entities.get('locations', []):
            if location not in contact.location_hints:
                contact.location_hints.append(location)

        if metadata.content:
            file_contents_by_contact[number].append(metadata.content)

        for other_number in metadata.phone_numbers:
            if other_number != number:
                contact.common_contacts[other_number] = contact.common_contacts.get(other_number, 0) + 1

    def _merge_message_into_conversation(self, conversations_map, known_paths_by_conv: Dict[str, set],
                                         conversation_key: str, participants: List[str], file_path: str,
                                         file_date, metadata, file_contents_by_conv: Dict[str, List[str]]) -> bool:
        """Record one message file in its conversation.

        Creates the conversation if it is missing. A conversation with more
        than two participants counts as a group chat.

        Returns:
            True if the file was newly added, False if it was already recorded.
        """
        conversation = conversations_map.get(conversation_key)
        if conversation is None:
            conversation = Conversation(participants=participants, is_group_chat=len(participants) > 2)
            conversations_map[conversation_key] = conversation

        known_paths = known_paths_by_conv.get(conversation_key)
        if known_paths is None:
            # Seed from files already on the conversation (e.g. loaded from JSON).
            known_paths = {entry.get('path') for entry in conversation.files}
            known_paths_by_conv[conversation_key] = known_paths
        if file_path in known_paths:
            return False
        known_paths.add(file_path)

        conversation.message_count += 1
        conversation.files.append({
            'path': file_path,
            'date': file_date,
            'sentiment': metadata.sentiment_score
        })
        # Running mean over every counted message (zero scores included).
        conversation.sentiment_score = (
            (conversation.sentiment_score * (conversation.message_count - 1)) + metadata.sentiment_score
        ) / conversation.message_count

        if conversation.first_message is None or (file_date and file_date < conversation.first_message):
            conversation.first_message = file_date
        if conversation.last_message is None or (file_date and file_date > conversation.last_message):
            conversation.last_message = file_date

        if metadata.content:
            file_contents_by_conv[conversation_key].append(metadata.content)
        return True

    def _apply_topics_from_texts(self, texts_by_key: Dict[str, List[str]], targets: Dict[str, Any], label: str) -> None:
        """Merge up to three extracted topics into each target that gained >= 2 texts.

        Topics are merged with existing ones, deduplicated and sorted. Stops
        early when ``stop_event`` is set.
        """
        processed = 0
        for key, texts in texts_by_key.items():
            if stop_event.is_set():
                break
            if key in targets and len(texts) >= 2:  # Need at least 2 docs for TF-IDF min_df
                topics = self._extract_topics(texts, num_topics=3)
                target = targets[key]
                target.topics = sorted(set(target.topics).union(set(topics)))
            processed += 1
            if processed % 200 == 0:
                logger.info(f"Processed topics for {processed} {label}...")


    def _get_message_files_from_db(self) -> List[Dict[str, Any]]:
        """Return message files recorded in the DB that still exist on disk.

        The query selects error-free rows with category 'Messages', ordered by
        modification_date, and only files present on disk are kept.

        Missing files are skipped and counted, whether a single file is gone
        or its whole directory is. Each directory is checked once; a missing
        directory is logged once, not once per file. The total is reported to
        the supervisor as a single summary.

        Returns:
            A list of dicts with keys 'id', 'path', 'filename', 'full_path',
            'creation_date' and 'modification_date'. If the query fails, the
            error is logged and an empty list is returned.

        Raises:
            InterruptedError: Propagated unchanged so the caller can stop.
        """
        # Select files categorized as 'Messages' or 'Contacts' (like VCF) if needed
        # For now, focus on 'Messages' category as defined by FileAnalyzer
        query = """
            SELECT id, path, filename, creation_date, modification_date
            FROM files
            WHERE category = 'Messages' AND error IS NULL
            ORDER BY modification_date ASC -- Process in rough chronological order
        """
        try:
            message_files_rows = self.db_manager.execute_query(query)
            message_files = []
            existing_dirs = set()  # Directories verified to exist
            missing_dirs = set()  # Directories verified to be missing
            missing_files_count = 0

            for row in message_files_rows:
                dir_path = row['path']
                full_path = os.path.join(dir_path, row['filename'])

                # Every file under a directory known to be missing is skipped and counted.
                if dir_path in missing_dirs:
                    missing_files_count += 1
                    continue
                if dir_path not in existing_dirs:
                    if not os.path.isdir(dir_path):
                        logger.warning(f"Directory not found for message files: {dir_path}. Skipping files within.")
                        missing_dirs.add(dir_path)
                        missing_files_count += 1
                        continue
                    existing_dirs.add(dir_path)

                if os.path.isfile(full_path):
                    message_files.append({
                        'id': row['id'],
                        'path': row['path'],
                        'filename': row['filename'],
                        'full_path': full_path,
                        'creation_date': row['creation_date'],
                        'modification_date': row['modification_date'],
                        # Phone numbers will be extracted from content/metadata later
                    })
                else:
                    logger.warning(f"Message file listed in DB not found on disk: {full_path}")
                    missing_files_count += 1

            if missing_files_count > 0:
                self.supervisor.log_warning(f"{missing_files_count} message files from DB were not found on disk.")
                self.supervisor.update_stat("message_files_missing_on_disk", missing_files_count)

            return message_files
        except InterruptedError:
            raise  # Propagate interruption
        except Exception as e:
            logger.error(f"Error retrieving message files from database: {e}", exc_info=True)
            self.supervisor.log_error("Failed to get message files from DB", str(e))
            return []

    def _read_message_content(self, file_path: str) -> Tuple[str, Optional[MessageMetadata]]:
        """Read a message file and build its ``MessageMetadata``.

        Intended to run in a worker thread/process. Decoding is attempted with
        UTF-8 (a leading BOM is stripped), then Windows-1252, then Latin-1.
        Latin-1 maps every byte to a character, so it is the catch-all and must
        come last. Otherwise cp1252 would never be tried, and Windows
        "smart" quotes and dashes (bytes 0x80-0x9F) would decode to C1 control
        characters.

        Besides the content, the metadata receives:
        - creation and modification times;
        - phone numbers found in the filename and the content;
        - ``YYYY-MM-DD HH:MM:SS``-style timestamps found in the content;
        - a sentiment score, when a sentiment analyzer is configured and the
          content is longer than 10 characters.

        Args:
            file_path: Path of the message file to read.

        Returns:
            ``(content, metadata)``. On failure ``metadata.error`` describes the
            problem and ``content`` holds whatever was read before it (possibly
            ``""``). No exception is raised.
        """
        metadata = MessageMetadata(filename=os.path.basename(file_path), file_path=file_path)
        content = ""
        try:
            # Order matters: see the docstring. 'utf-8-sig' also decodes plain
            # UTF-8 (no BOM) exactly like 'utf-8'.
            encodings_to_try = ['utf-8-sig', 'cp1252', 'latin-1']
            for enc in encodings_to_try:
                try:
                    with open(file_path, 'r', encoding=enc) as f:
                        content = f.read()
                except UnicodeDecodeError:
                    logger.debug(f"Failed to decode {file_path} with {enc}")
                    continue  # Try the next encoding
                logger.debug(f"Read {file_path} with encoding {enc}")
                break
            else:
                # Defensive fallback, only reachable if latin-1 is removed from
                # the list above: decode raw bytes, dropping invalid sequences.
                logger.warning(f"Could not decode file {file_path} with tried encodings. Reading as binary.")
                with open(file_path, 'rb') as f:
                    content = f.read().decode('utf-8', errors='ignore')

            stats = os.stat(file_path)
            try:
                # st_birthtime is only available on some platforms.
                metadata.creation_time = datetime.fromtimestamp(stats.st_birthtime).isoformat()
            except AttributeError:
                metadata.creation_time = datetime.fromtimestamp(stats.st_ctime).isoformat()
            metadata.modification_time = datetime.fromtimestamp(stats.st_mtime).isoformat()
            metadata.content = content

            # Phone numbers from filename and body; a None logger is passed
            # because this may execute inside a worker.
            filename_phones = extract_phone_numbers(metadata.filename, None)
            content_phones = extract_phone_numbers(content, None)
            metadata.phone_numbers = sorted(set(filename_phones + content_phones))

            # Simple ISO-like timestamps (date, space or 'T', time).
            metadata.timestamps = re.findall(r'\d{4}-\d{2}-\d{2}[ T]\d{2}:\d{2}:\d{2}', content)

            # Skip sentiment for very short strings.
            if self.sentiment_analyzer and len(content) > 10:
                metadata.sentiment_score = self._analyze_sentiment(content)

            return content, metadata

        except FileNotFoundError:
            metadata.error = "File not found"
            # The caller already logs missing files.
        except OSError as e:
            metadata.error = f"OS Error: {e}"
        except Exception as e:
            metadata.error = f"Unexpected Error: {e}"
            logger.error(f"Unexpected error reading message file {file_path}: {e}", exc_info=True)

        return content, metadata  # metadata.error is set on this path

    def _analyze_sentiment(self, text: str) -> float:
        """Score *text* with the configured VADER analyzer.

        Returns the ``compound`` polarity score (-1 most negative, +1 most
        positive). Yields 0.0 when no analyzer is configured, or when scoring
        raises; in that case the failure is logged instead of propagated.
        """
        analyzer = self.sentiment_analyzer
        if analyzer:
            try:
                polarity = analyzer.polarity_scores(text)
                return polarity['compound']
            except Exception as exc:
                logger.error(f"Sentiment analysis failed: {exc}")
        return 0.0

    def _extract_topics(self, texts: List[str], num_topics: int = 5) -> List[str]:
        """Return the highest-weighted TF-IDF terms across *texts*.

        Texts with five words or fewer are discarded first. Unigrams and bigrams
        are scored with English stop words removed. A term must occur in at
        least 2 documents and in at most 85% of them, and the vocabulary is
        capped at 1000 terms. Each term's TF-IDF scores are summed over all
        documents, and the *num_topics* best terms are returned, highest first.

        Args:
            texts: Documents to analyse.
            num_topics: Maximum number of terms to return.

        Returns:
            The top terms, or ``[]`` in any of these cases:
            - scikit-learn is unavailable;
            - fewer than 3 usable texts remain;
            - *num_topics* is not positive;
            - no vocabulary survives filtering.
        """
        if not SKLEARN_AVAILABLE or not texts or num_topics <= 0:
            return []
        # Very short texts carry little signal.
        texts = [t for t in texts if len(t.split()) > 5]
        # min_df=2 together with max_df=0.85 needs at least 3 documents: with
        # only 2, the upper bound (0.85 * 2 = 1.7 docs) is below the lower bound
        # and scikit-learn raises ValueError.
        if len(texts) < 3:
            return []

        try:
            vectorizer = TfidfVectorizer(
                max_df=0.85,          # Ignore terms appearing in > 85% of docs
                min_df=2,             # Ignore terms appearing in < 2 docs
                stop_words='english',
                max_features=1000,    # Limit vocabulary size
                ngram_range=(1, 2),   # Unigrams and bigrams
            )
            tfidf_matrix = vectorizer.fit_transform(texts)
            feature_names = np.array(vectorizer.get_feature_names_out())

            # Column sums: total TF-IDF weight of each term over the batch.
            total_scores = tfidf_matrix.sum(axis=0).A1

            actual_num_topics = min(num_topics, len(feature_names))
            if actual_num_topics <= 0:
                return []

            top_indices = total_scores.argsort()[-actual_num_topics:][::-1]
            return feature_names[top_indices].tolist()
        except ValueError as ve:
            message = str(ve)
            # Both messages mean "nothing left to score" (only stop words, or
            # everything pruned by min_df/max_df): expected for small batches.
            if "empty vocabulary" in message or "no terms remain" in message:
                logger.warning("Topic extraction skipped: Empty vocabulary after filtering.")
            else:
                logger.error(f"Topic extraction ValueError: {ve}", exc_info=True)
            return []
        except Exception as e:
            logger.error(f"Topic extraction failed: {e}", exc_info=True)
            return []

    def _save_contacts(self):
        """Write every contact in ``self.contacts`` to ``config.contact_data_path``.

        Contacts are converted with ``to_dict()`` and the resulting list is
        written through ``atomic_write_json``. A failure is logged and reported
        to the supervisor; nothing is raised.
        """
        destination = self.config.contact_data_path
        logger.info(f"Saving {len(self.contacts)} contacts to: {destination}")
        try:
            payload = [contact.to_dict() for contact in self.contacts]
            atomic_write_json(payload, destination)
        except Exception as err:
            logger.error(f"Failed to save contacts JSON: {err}", exc_info=True)
            self.supervisor.log_error("Failed to save contacts JSON", str(err))

    def _save_conversations(self):
        """Write ``self.conversations`` to ``config.conversation_data_path``.

        The output is a JSON object keyed like ``self.conversations``, with each
        conversation converted via ``to_dict()`` and written through
        ``atomic_write_json``. A failure is logged and reported to the
        supervisor; nothing is raised.
        """
        destination = self.config.conversation_data_path
        logger.info(f"Saving {len(self.conversations)} filtered conversations to: {destination}")
        try:
            payload = {
                conv_key: conversation.to_dict()
                for conv_key, conversation in self.conversations.items()
            }
            atomic_write_json(payload, destination)
        except Exception as err:
            logger.error(f"Failed to save conversations JSON: {err}", exc_info=True)
            self.supervisor.log_error("Failed to save conversations JSON", str(err))

    def _save_conversations_by_contact(self):
        """Write one JSON file per contact containing that contact's conversations.

        For every contact in ``self.contacts``, the conversations in
        ``self.conversations`` whose ``participants`` include the contact's
        ``phone_number`` are serialized with ``to_dict()``. They are written to
        ``config.contact_conv_dir``, using ``contact_<number>.json`` passed
        through ``sanitize_filename`` as the file name. Contacts without any
        conversation are skipped. The loop stops early once the global
        ``stop_event`` is set.

        A failure writing one file is logged, reported to the supervisor and
        counted as skipped; it does not abort the loop. The number of files
        written is recorded as the ``contact_conversation_files_saved`` stat.
        """
        output_dir = self.config.contact_conv_dir
        logger.info(f"Saving individual conversation files to: {output_dir}")
        saved_count = 0
        skipped_count = 0

        # Make sure the destination exists, as is done for visualization output.
        try:
            os.makedirs(output_dir, exist_ok=True)
        except OSError as e:
            logger.error(f"Cannot create contact conversation directory {output_dir}: {e}")
            self.supervisor.log_error("Failed to create contact conversation directory", str(e))
            self.supervisor.update_stat("contact_conversation_files_saved", saved_count)
            return

        # Map contact number -> keys of the conversations they participate in.
        contact_to_conv_keys = defaultdict(list)
        for conv_key, conv_data in self.conversations.items():
            for participant in conv_data.participants:
                contact_to_conv_keys[participant].append(conv_key)

        interrupted = False
        for contact in self.contacts:
            if stop_event.is_set():
                interrupted = True
                break

            contact_number = contact.phone_number
            # .get() so that looking up a contact does not insert into the defaultdict.
            # Keys come from self.conversations itself, so each lookup succeeds.
            contact_conversations = {
                key: self.conversations[key].to_dict()
                for key in contact_to_conv_keys.get(contact_number, ())
            }
            if not contact_conversations:
                skipped_count += 1
                continue  # No conversations (after filtering) for this contact

            safe_filename = sanitize_filename(f"contact_{contact_number}.json")
            output_path = os.path.join(output_dir, safe_filename)

            try:
                atomic_write_json(contact_conversations, output_path)
            except Exception as e:
                logger.error(f"Failed to save conversation file for contact {contact_number}: {e}")
                self.supervisor.log_error(f"Failed to save conv file for {contact_number}", str(e))
                skipped_count += 1
                continue

            saved_count += 1
            # Only report progress right after a successful save, so a failed
            # write cannot repeat the previous milestone message.
            if saved_count % 100 == 0:
                logger.info(f"Saved {saved_count} contact conversation files...")

        if interrupted:
            logger.warning("Saving individual conversation files interrupted by stop signal.")
        logger.info(f"Finished saving individual conversation files. Saved: {saved_count}, Skipped/Empty: {skipped_count}")
        self.supervisor.update_stat("contact_conversation_files_saved", saved_count)


    def _build_contact_graph(self, contacts_map: Dict[str, Contact]) -> "Optional[nx.Graph]":
        """Build an undirected NetworkX graph of contact interactions.

        Nodes are the keys of *contacts_map* whose contact has
        ``message_count > 0``. Each node carries ``message_count``,
        ``sentiment``, ``first_seen``, ``last_seen`` and ``normalized_number``.

        Two nodes are joined when both appear among the ``participants`` of a
        conversation in ``self.conversations``; participants that are not nodes
        are ignored. On each edge, ``weight`` accumulates the ``message_count``
        of every shared conversation, and ``conversations`` counts them.

        The return annotation is quoted so it is not evaluated at class
        definition time when networkx is not installed.

        Returns:
            The graph, or ``None`` when networkx is unavailable.
        """
        if not NETWORKX_AVAILABLE:
            return None
        logger.info("Building contact interaction graph...")
        G = nx.Graph()

        # Nodes: contacts with at least one message, keyed by phone number.
        for number, contact in contacts_map.items():
            if contact.message_count > 0:
                G.add_node(
                    number,
                    message_count=contact.message_count,
                    sentiment=contact.sentiment_score,
                    first_seen=contact.first_seen,
                    last_seen=contact.last_seen,
                    normalized_number=contact.normalized_number,
                )

        # Edges: every pair of participants sharing a conversation.
        for conversation in self.conversations.values():
            # dict.fromkeys de-duplicates while preserving order, so a
            # participant listed twice cannot create a self-loop.
            members = [p for p in dict.fromkeys(conversation.participants) if p in G]
            if len(members) < 2:
                continue
            weight = conversation.message_count  # Weight by conversation size
            for i, u in enumerate(members):
                for v in members[i + 1:]:
                    if G.has_edge(u, v):
                        edge = G[u][v]
                        edge['weight'] += weight
                        edge['conversations'] = edge.get('conversations', 0) + 1
                    else:
                        G.add_edge(u, v, weight=weight, conversations=1)

        logger.info(f"Built contact graph with {G.number_of_nodes()} nodes and {G.number_of_edges()} edges.")
        self.supervisor.update_stat("graph_nodes", G.number_of_nodes())
        self.supervisor.update_stat("graph_edges", G.number_of_edges())
        return G

    def _visualize_contact_graph(self, G: "nx.Graph", output_path: str, top_n: int = 30):
        """Render the contact graph to an image file with Matplotlib.

        Which nodes are drawn:
        - if the graph has more than *top_n* nodes, only the *top_n* nodes with
          the highest weighted degree (sum of incident edge ``weight``);
        - otherwise, the whole graph.

        How they are styled:
        - node size grows with ``message_count`` (clamped to 100..5000);
        - edge opacity grows with edge ``weight``;
        - the ``top_n // 2`` highest-degree drawn nodes are labelled with their
          node ID.

        Args:
            G: Graph produced by ``_build_contact_graph``. The annotation is
                quoted so it is not evaluated when networkx is unavailable.
            output_path: Destination image path passed to ``savefig``.
            top_n: Maximum number of nodes to draw.

        Errors are logged and reported to the supervisor, never raised. The
        figure is always closed, including when drawing fails.
        """
        if not NETWORKX_AVAILABLE or not MATPLOTLIB_AVAILABLE or G is None or G.number_of_nodes() == 0:
            logger.warning("Graph visualization skipped (libs unavailable or empty graph).")
            return

        logger.info(f"Generating contact graph visualization (top {top_n} nodes by degree) to: {output_path}")

        fig = None
        try:
            # --- Subgraph for Visualization ---
            if G.number_of_nodes() > top_n:
                top_nodes = sorted(G.degree(weight='weight'), key=lambda item: item[1], reverse=True)[:top_n]
                subgraph = G.subgraph([node_id for node_id, _deg in top_nodes])
                logger.info(f"Visualizing subgraph of {subgraph.number_of_nodes()} nodes.")
            else:
                subgraph = G
                logger.info(f"Visualizing full graph with {subgraph.number_of_nodes()} nodes.")

            if subgraph.number_of_nodes() == 0:
                logger.warning("Subgraph for visualization is empty, skipping plot.")
                return

            # --- Layout ---
            fig, ax = plt.subplots(figsize=(18, 18))
            pos = nx.kamada_kawai_layout(subgraph)  # Often good for smaller graphs

            # --- Node Styling: size by message count, clamped ---
            min_size, max_size = 100, 5000
            node_sizes = [
                min(max(subgraph.nodes[n].get('message_count', 10) * 5 + 50, min_size), max_size)
                for n in subgraph.nodes()
            ]

            # --- Edge Styling: opacity by weight ---
            weights = [subgraph[u][v].get('weight', 1) for u, v in subgraph.edges()]
            # "or 1" guards against dividing by zero when every weight is 0.
            max_weight = max(weights, default=0) or 1
            edge_alphas = [min(0.8, max(0.1, w / max_weight * 0.8)) for w in weights]

            # --- Drawing ---
            nx.draw_networkx_nodes(subgraph, pos, ax=ax, node_size=node_sizes, node_color='skyblue', alpha=0.8)
            nx.draw_networkx_edges(subgraph, pos, ax=ax, width=1.0, alpha=edge_alphas, edge_color='grey')

            # --- Labels: the top half of the drawn nodes by weighted degree ---
            ranked = sorted(subgraph.degree(weight='weight'), key=lambda item: item[1], reverse=True)
            labels = {n: n for n, _deg in ranked[:max(top_n // 2, 0)]}
            nx.draw_networkx_labels(subgraph, pos, ax=ax, labels=labels, font_size=10)

            # --- Final Touches ---
            ax.set_title(f"Contact Network (Top {subgraph.number_of_nodes()} Nodes by Weighted Degree)", fontsize=16)
            ax.set_axis_off()
            fig.tight_layout()
            fig.savefig(output_path, dpi=150, bbox_inches='tight')
            logger.info(f"Contact graph visualization saved to {output_path}")
            self.supervisor.update_stat("visualizations_created", 1)

        except Exception as e:
            logger.error(f"Failed to visualize contact graph: {e}", exc_info=True)
            self.supervisor.log_error("Contact graph visualization failed", str(e))
            self.supervisor.update_stat("visualization_errors", 1)
        finally:
            # Release the figure even when drawing/saving failed.
            if fig is not None:
                plt.close(fig)


    def _visualize_sentiment_graph(self, G: "nx.Graph", output_path: str, top_n: int = 30):
        """Render the contact graph with nodes coloured by sentiment.

        Node selection, node sizing, edge opacity and labelling are the same as
        in the plain contact-graph plot.

        Each node's ``sentiment`` value (assumed to be a VADER-style score in
        [-1, 1]) is mapped onto the ``coolwarm`` colormap. A horizontal
        colorbar is added when the drawn nodes do not all share the same score.

        Args:
            G: Graph produced by ``_build_contact_graph``. The annotation is
                quoted so it is not evaluated when networkx is unavailable.
            output_path: Destination image path passed to ``savefig``.
            top_n: Maximum number of nodes to draw.

        Skipped when the plotting libraries are unavailable, the graph is empty
        or no sentiment analyzer is configured. Errors are logged and reported
        to the supervisor, never raised. The figure is always closed.
        """
        if (not NETWORKX_AVAILABLE or not MATPLOTLIB_AVAILABLE or G is None
                or G.number_of_nodes() == 0 or not self.sentiment_analyzer):
            logger.warning("Sentiment graph visualization skipped (libs unavailable, empty graph, or sentiment disabled).")
            return

        logger.info(f"Generating contact sentiment graph visualization (top {top_n} nodes) to: {output_path}")

        fig = None
        try:
            # --- Subgraph for Visualization (same selection as the main graph) ---
            if G.number_of_nodes() > top_n:
                top_nodes = sorted(G.degree(weight='weight'), key=lambda item: item[1], reverse=True)[:top_n]
                subgraph = G.subgraph([node_id for node_id, _deg in top_nodes])
                logger.info(f"Visualizing sentiment subgraph of {subgraph.number_of_nodes()} nodes.")
            else:
                subgraph = G
                logger.info(f"Visualizing full sentiment graph with {subgraph.number_of_nodes()} nodes.")

            if subgraph.number_of_nodes() == 0:
                logger.warning("Subgraph for sentiment visualization is empty, skipping plot.")
                return

            # --- Layout ---
            fig, ax = plt.subplots(figsize=(18, 18))
            pos = nx.kamada_kawai_layout(subgraph)

            # --- Node colour: compound score [-1, 1] rescaled to [0, 1] ---
            sentiment_scores = np.array([subgraph.nodes[n].get('sentiment', 0.0) for n in subgraph.nodes()])
            normalized_sentiment = (sentiment_scores + 1) / 2
            cmap = plt.cm.coolwarm  # Blue for negative, red for positive
            node_colors = cmap(normalized_sentiment)

            # --- Node size: by message count, clamped ---
            min_size, max_size = 100, 5000
            node_sizes = [
                min(max(subgraph.nodes[n].get('message_count', 10) * 5 + 50, min_size), max_size)
                for n in subgraph.nodes()
            ]

            # --- Edge Styling: opacity by weight ---
            weights = [subgraph[u][v].get('weight', 1) for u, v in subgraph.edges()]
            # "or 1" guards against dividing by zero when every weight is 0.
            max_weight = max(weights, default=0) or 1
            edge_alphas = [min(0.8, max(0.1, w / max_weight * 0.8)) for w in weights]

            # --- Drawing ---
            nx.draw_networkx_nodes(subgraph, pos, ax=ax, node_size=node_sizes, node_color=node_colors, alpha=0.8)
            nx.draw_networkx_edges(subgraph, pos, ax=ax, width=1.0, alpha=edge_alphas, edge_color='grey')

            # --- Labels: the top half of the drawn nodes by weighted degree ---
            ranked = sorted(subgraph.degree(weight='weight'), key=lambda item: item[1], reverse=True)
            labels = {n: n for n, _deg in ranked[:max(top_n // 2, 0)]}
            nx.draw_networkx_labels(subgraph, pos, ax=ax, labels=labels, font_size=10)

            # --- Colorbar, only when there is variation to show ---
            if len(np.unique(normalized_sentiment)) > 1:
                sm = plt.cm.ScalarMappable(cmap=cmap, norm=plt.Normalize(vmin=-1, vmax=1))
                sm.set_array([])  # Dummy array for the colorbar
                # The ScalarMappable is not attached to any Axes, so the Axes to
                # take space from must be given explicitly. Recent Matplotlib
                # versions reject a colorbar call without ``ax``/``cax`` here.
                cbar = fig.colorbar(sm, ax=ax, label="Average Sentiment Score (Compound)",
                                    orientation="horizontal", shrink=0.5, pad=0.05)
                cbar.ax.tick_params(labelsize=10)

            # --- Final Touches ---
            ax.set_title(f"Contact Sentiment Network (Top {subgraph.number_of_nodes()} Nodes)", fontsize=16)
            ax.set_axis_off()
            fig.tight_layout()
            fig.savefig(output_path, dpi=150, bbox_inches='tight')
            logger.info(f"Contact sentiment graph visualization saved to {output_path}")
            self.supervisor.update_stat("visualizations_created", 1)

        except Exception as e:
            logger.error(f"Failed to visualize contact sentiment graph: {e}", exc_info=True)
            self.supervisor.log_error("Contact sentiment graph visualization failed", str(e))
            self.supervisor.update_stat("visualization_errors", 1)
        finally:
            # Release the figure even when drawing/saving failed.
            if fig is not None:
                plt.close(fig)


    # Placeholder for generating statistics (can be expanded or moved to ReportGenerator)
    def _generate_contact_statistics(self):
        """Log and record summary statistics about ``self.contacts``.

        Records the mean and total ``message_count`` over all contacts.

        When the sentiment library is available, also records how many
        contacts have a ``sentiment_score``:
        - above 0.1 (positive);
        - below -0.1 (negative);
        - anywhere in between (neutral).
        """
        logger.info("Generating contact statistics...")

        per_contact = [contact.message_count for contact in self.contacts]
        if not per_contact:
            logger.info("No contacts with messages found for statistics.")
        else:
            mean_count = np.mean(per_contact)
            grand_total = sum(per_contact)
            logger.info(f"Average messages per contact: {mean_count:.2f}")
            logger.info(f"Total messages processed: {grand_total}")
            self.supervisor.update_stat("avg_messages_per_contact", mean_count)
            self.supervisor.update_stat("total_messages_processed", grand_total)

        if not SENTIMENT_AVAILABLE:
            return

        scores = [contact.sentiment_score for contact in self.contacts]
        n_positive = sum(1 for score in scores if score > 0.1)
        n_negative = sum(1 for score in scores if score < -0.1)
        n_neutral = len(self.contacts) - n_positive - n_negative
        logger.info(f"Contacts by sentiment: Positive={n_positive}, Negative={n_negative}, Neutral={n_neutral}")
        self.supervisor.update_stat("contacts_positive_sentiment", n_positive)
        self.supervisor.update_stat("contacts_negative_sentiment", n_negative)
        self.supervisor.update_stat("contacts_neutral_sentiment", n_neutral)


    # Placeholder for generating statistics (can be expanded or moved to ReportGenerator)
    def _generate_conversation_statistics(self):
        """Log and record summary statistics about ``self.conversations``.

        Records the mean and total ``message_count`` per conversation (only if
        at least one conversation exists). Also records the number of group and
        private conversations, as indicated by ``is_group_chat``.
        """
        logger.info("Generating conversation statistics...")
        all_convs = list(self.conversations.values())

        sizes = [conv.message_count for conv in all_convs]
        if sizes:
            mean_size = np.mean(sizes)
            size_total = sum(sizes)
            logger.info(f"Average messages per conversation: {mean_size:.2f}")
            logger.info(f"Total messages in conversations: {size_total}")
            self.supervisor.update_stat("avg_messages_per_conversation", mean_size)
            self.supervisor.update_stat("total_messages_in_conversations", size_total)

        n_group = sum(1 for conv in all_convs if conv.is_group_chat)
        n_private = len(all_convs) - n_group
        logger.info(f"Conversations: Group={n_group}, Private={n_private}")
        self.supervisor.update_stat("group_chats", n_group)
        self.supervisor.update_stat("private_chats", n_private)

    # Placeholder for generating statistics (can be expanded or moved to ReportGenerator)
    def _generate_category_statistics(self):
        """Log and record file counts per category from the ``files`` table.

        Records one ``files_category_<category>`` stat per category, ordered by
        count descending. Also records the overall row count as
        ``files_total_in_db``. Database errors are logged and reported to the
        supervisor instead of being raised.
        """
        logger.info("Generating file category statistics...")
        per_category_sql = "SELECT category, COUNT(*) as count FROM files GROUP BY category ORDER BY count DESC"
        total_sql = "SELECT COUNT(*) as count FROM files"
        try:
            rows = self.db_manager.execute_query(per_category_sql)
            logger.info("File counts by category:")
            for row in rows:
                category, count = row['category'], row['count']
                logger.info(f"  {category}: {count}")
                self.supervisor.update_stat(f"files_category_{category}", count)

            total_rows = self.db_manager.execute_query(total_sql)
            if total_rows:
                overall = total_rows[0]['count']
                logger.info(f"Total files in database: {overall}")
                self.supervisor.update_stat("files_total_in_db", overall)
        except Exception as err:
            logger.error(f"Failed to generate category statistics: {err}", exc_info=True)
            self.supervisor.log_error("Category statistics failed", str(err))

    # Placeholder for generating statistics (can be expanded or moved to ReportGenerator)
    def _generate_pattern_statistics(self):
        """Log the most frequent detected patterns, grouped by pattern type.

        Counts rows of the ``patterns`` table per (``pattern_type``,
        ``pattern_value``). For each type it logs up to ``config.report_top_n``
        values, most frequent first. No supervisor stats are recorded. Database
        errors are logged and reported to the supervisor instead of being raised.
        """
        logger.info("Generating pattern statistics...")
        sql = "SELECT pattern_type, pattern_value, COUNT(*) as count FROM patterns GROUP BY pattern_type, pattern_value ORDER BY count DESC"
        try:
            rows = self.db_manager.execute_query(sql)
            logger.info("Pattern counts:")

            # The query is globally sorted by count, so each per-type list stays
            # in descending order.
            grouped: Dict[Any, List[Tuple[Any, Any]]] = defaultdict(list)
            for row in rows:
                grouped[row['pattern_type']].append((row['pattern_value'], row['count']))

            for pattern_type, entries in grouped.items():
                logger.info(f"  {pattern_type}:")
                for value, count in entries[:self.config.report_top_n]:
                    logger.info(f"    {value}: {count}")
        except Exception as err:
            logger.error(f"Failed to generate pattern statistics: {err}", exc_info=True)
            self.supervisor.log_error("Pattern statistics failed", str(err))


# --- Visualization Generator ---

class VisualizationGenerator:
    """Generate charts, a message word cloud and a JSON summary of the analysis data.

    Images are written under ``config.visualization_dir`` (``Charts/`` and
    ``WordClouds/``), and the summary data goes to
    ``config.visualization_data_path``. Each visualization is best-effort. A
    failure is logged, recorded via ``supervisor.log_error`` and counted in the
    ``visualization_errors`` stat, and the remaining visualizations still run.
    """

    def __init__(self, config: Config, db_manager: DatabaseManager, supervisor: AnalysisSupervisor):
        self.config = config
        self.db_manager = db_manager
        self.supervisor = supervisor
        # Created lazily by _read_message_text (only the word cloud needs it).
        self._message_analyzer = None
        # Only configure matplotlib when it was importable. Calling
        # matplotlib.use() unconditionally fails here, before
        # run_visualizations() can skip the phase gracefully.
        if MATPLOTLIB_AVAILABLE:
            # Non-interactive backend: figures are only ever written to files.
            matplotlib.use('Agg')
        # Initialize plot styling if seaborn is available.
        if SEABORN_AVAILABLE:
            sns.set_theme(style="whitegrid")
            sns.set_palette("viridis")

    def run_visualizations(self):
        """Orchestrate the visualization generation process."""
        self.supervisor.update_phase("visualization")
        logger.info("Starting visualization generation phase...")

        if not MATPLOTLIB_AVAILABLE:
            logger.warning("Matplotlib not available, skipping all visualizations.")
            self.supervisor.update_status("skipped", "Matplotlib not available")
            return

        # Ensure visualization directories exist
        os.makedirs(os.path.join(self.config.visualization_dir, "Charts"), exist_ok=True)
        os.makedirs(os.path.join(self.config.visualization_dir, "WordClouds"), exist_ok=True)

        try:
            self.supervisor.update_status("generating_charts", "Generating file category chart")
            self._generate_category_chart()

            self.supervisor.update_status("generating_charts", "Generating file size chart")
            self._generate_filesize_chart()

            self.supervisor.update_status("generating_charts", "Generating modification date chart")
            self._generate_modification_date_chart()

            if SENTIMENT_AVAILABLE and PANDAS_AVAILABLE:
                self.supervisor.update_status("generating_charts", "Generating sentiment distribution chart")
                self._generate_sentiment_distribution_chart()

            if WORDCLOUD_AVAILABLE and PANDAS_AVAILABLE and NUMPY_AVAILABLE and SKLEARN_AVAILABLE:
                self.supervisor.update_status("generating_wordclouds", "Generating word cloud")
                self._generate_message_wordcloud()

            # Save the numbers behind the charts (counts, distributions) as JSON.
            self._save_visualization_data()

            logger.info("Visualization generation phase completed.")
            self.supervisor.update_status("completed", "Visualizations finished")

        except InterruptedError:
            logger.warning("Visualization phase interrupted by stop signal.")
            self.supervisor.update_status("interrupted", "Visualization stopped")
        except Exception as e:
            logger.critical(f"Critical error during visualization generation: {e}", exc_info=True)
            self.supervisor.log_error("Critical error in visualization phase", str(e))
            self.supervisor.update_status("failed", "Critical visualization error")

    # --- Shared helpers ---

    def _save_figure(self, fig, subdir: str, filename: str, description: str, **layout_kwargs) -> None:
        """Lay out *fig*, save it under ``visualization_dir/subdir``, then log and count it."""
        fig.tight_layout(**layout_kwargs)
        output_path = os.path.join(self.config.visualization_dir, subdir, filename)
        fig.savefig(output_path, dpi=150)
        logger.info(f"{description} saved to {output_path}")
        self.supervisor.update_stat("visualizations_created", 1)

    def _record_failure(self, what: str, label: str, exc: Exception) -> None:
        """Log a failed visualization (with traceback) and record it on the supervisor.

        Must be called from inside an ``except`` block so ``exc_info`` is set.
        """
        logger.error(f"Failed to generate {what}: {exc}", exc_info=True)
        self.supervisor.log_error(label, str(exc))
        self.supervisor.update_stat("visualization_errors", 1)

    @staticmethod
    def _close_figure(fig) -> None:
        """Close *fig* if one was created, so failed charts do not leak open figures."""
        if fig is not None:
            plt.close(fig)

    @staticmethod
    def _monthly_counts(dates):
        """Count datetimes per calendar month.

        Returns a pandas Series of counts indexed by ``'YYYY-MM'`` strings in
        chronological order. Timezone info is dropped (wall-clock time is kept)
        so naive and offset-aware values can be mixed. Values pandas cannot
        represent are coerced to NaT and ignored.
        """
        wall_clock = [
            d.replace(tzinfo=None) if getattr(d, 'tzinfo', None) is not None else d
            for d in dates
        ]
        stamps = pd.Series(pd.to_datetime(wall_clock, errors='coerce')).dropna()
        counts = stamps.dt.to_period('M').value_counts().sort_index()
        # String labels: plottable on a categorical axis and usable as JSON keys.
        counts.index = counts.index.astype(str)
        return counts

    def _contact_sentiment_scores(self) -> List[float]:
        """Return the sentiment scores of contacts that have at least one message.

        Entries from ``supervisor.get_state()['contacts']`` may be objects or
        plain dicts. NOTE(review): the shape of that state entry is not visible
        here, so both forms are accepted. Contacts without a score are skipped.
        """
        contacts = self.supervisor.get_state().get('contacts') or []
        scores: List[float] = []
        for contact in contacts:
            if isinstance(contact, dict):
                message_count = contact.get('message_count')
                score = contact.get('sentiment_score')
            else:
                message_count = getattr(contact, 'message_count', None)
                score = getattr(contact, 'sentiment_score', None)
            if message_count and message_count > 0 and score is not None:
                scores.append(float(score))
        return scores

    def _read_message_text(self, path: str) -> Optional[str]:
        """Return the text content of one original message file.

        This class has no message reader of its own, so it reuses
        MessageAnalyzer's ``_read_message_content`` when that method exists.
        Otherwise it falls back to reading the file as UTF-8 text.
        """
        if self._message_analyzer is None:
            self._message_analyzer = MessageAnalyzer(self.config, self.db_manager, self.supervisor)
        reader = getattr(self._message_analyzer, '_read_message_content', None)
        if reader is None:
            with open(path, 'r', encoding='utf-8', errors='replace') as fh:
                return fh.read()
        # Assumed to return (content, metadata); metadata is not needed here.
        content, _ = reader(path)
        return content

    def _collect_message_texts(self) -> List[str]:
        """Re-read the message files referenced by the per-contact conversation JSON files.

        Each referenced path is read at most once, even when several contact
        files list the same conversation. Reading stops early when
        ``stop_event`` is set.
        """
        texts: List[str] = []
        contact_conv_dir = self.config.contact_conv_dir
        if not os.path.exists(contact_conv_dir):
            return texts

        logger.info("Reading message content from individual contact conversation files for word cloud...")
        seen_paths: Set[str] = set()
        for filename in sorted(os.listdir(contact_conv_dir)):
            if stop_event.is_set():
                break
            if not filename.endswith(".json"):
                continue
            data = load_json_safe(os.path.join(contact_conv_dir, filename))
            if not isinstance(data, dict):
                continue
            for conv_data in data.values():
                if stop_event.is_set():
                    return texts
                if not isinstance(conv_data, dict):
                    continue
                for file_info in conv_data.get('files') or []:
                    # Assumes 'path' holds the full path of the original message
                    # file. TODO: confirm against the conversation JSON writer.
                    path = file_info.get('path') if isinstance(file_info, dict) else None
                    if not path or path in seen_paths:
                        continue
                    seen_paths.add(path)
                    if not os.path.exists(path):
                        logger.warning(f"Original message file not found for word cloud: {path}")
                        continue
                    try:
                        content = self._read_message_text(path)
                    except Exception as e:
                        logger.warning(f"Failed to re-read message file {path} for word cloud: {e}")
                        continue
                    if content:
                        texts.append(content)
        return texts

    # --- Individual visualizations ---

    def _generate_category_chart(self):
        """Save a horizontal bar chart of file counts per category."""
        if not PANDAS_AVAILABLE or not MATPLOTLIB_AVAILABLE:
            logger.warning("Skipping category chart: Pandas or Matplotlib not available.")
            return

        query = "SELECT category, COUNT(*) as count FROM files GROUP BY category ORDER BY count DESC"
        fig = None
        try:
            category_data = self.db_manager.execute_query(query)
            if not category_data:
                logger.warning("No file category data found for charting.")
                return

            # dict(row) accepts both dict rows and mapping-like row objects, so
            # the DataFrame always has named 'category'/'count' columns.
            df = pd.DataFrame([dict(row) for row in category_data])
            # Give NULL categories a visible label instead of None/NaN on the axis.
            df['category'] = df['category'].fillna('Uncategorized').astype(str)

            fig = plt.figure(figsize=(12, 8))
            if SEABORN_AVAILABLE:
                sns.barplot(x='count', y='category', data=df)
            else:
                plt.barh(df['category'], df['count'])

            plt.xlabel("Number of Files")
            plt.ylabel("Category")
            plt.title("Distribution of File Categories")
            self._save_figure(fig, "Charts", "file_category_distribution.png", "File category chart")
        except Exception as e:
            self._record_failure("category chart", "Category chart generation failed", e)
        finally:
            self._close_figure(fig)

    def _generate_filesize_chart(self):
        """Save a log-scale histogram of (non-empty) file sizes."""
        if not PANDAS_AVAILABLE or not MATPLOTLIB_AVAILABLE or not NUMPY_AVAILABLE:
            logger.warning("Skipping file size chart: Pandas, Matplotlib, or NumPy not available.")
            return

        query = "SELECT filesize FROM files WHERE filesize > 0"
        fig = None
        try:
            filesize_data = self.db_manager.execute_query(query)
            if not filesize_data:
                logger.warning("No file size data found for charting.")
                return

            sizes = [row['filesize'] for row in filesize_data]

            fig = plt.figure(figsize=(12, 8))
            # Sizes can span many orders of magnitude, hence the log x-axis.
            if SEABORN_AVAILABLE:
                sns.histplot(data=pd.DataFrame({'filesize': sizes}), x='filesize',
                             bins=50, log_scale=True, kde=True)
            else:
                # Plain matplotlib needs explicit log-spaced bin edges.
                min_size = max(1, min(sizes))  # log10 needs a positive lower bound
                max_size = max(sizes)
                if max_size <= min_size:
                    logger.warning("File sizes are all the same, cannot create histogram.")
                    return
                log_bins = np.logspace(np.log10(min_size), np.log10(max_size), 50)
                plt.hist(sizes, bins=log_bins, edgecolor='black')
                plt.xscale('log')

            plt.xlabel("File Size (Bytes, Log Scale)")
            plt.ylabel("Frequency")
            plt.title("Distribution of File Sizes")
            self._save_figure(fig, "Charts", "file_size_distribution.png", "File size distribution chart")
        except Exception as e:
            self._record_failure("file size chart", "File size chart generation failed", e)
        finally:
            self._close_figure(fig)

    def _generate_modification_date_chart(self):
        """Save a line chart of how many files were last modified in each month."""
        if not PANDAS_AVAILABLE or not MATPLOTLIB_AVAILABLE:
            logger.warning("Skipping modification date chart: Pandas or Matplotlib not available.")
            return

        query = "SELECT modification_date FROM files WHERE modification_date IS NOT NULL"
        fig = None
        try:
            date_data = self.db_manager.execute_query(query)
            if not date_data:
                logger.warning("No modification date data found for charting.")
                return

            parsed = (parse_iso_datetime(row['modification_date']) for row in date_data)
            valid_dates = [d for d in parsed if d is not None]
            monthly_counts = self._monthly_counts(valid_dates) if valid_dates else None
            if monthly_counts is None or monthly_counts.empty:
                logger.warning("No valid modification dates found for charting.")
                return

            fig = plt.figure(figsize=(15, 8))
            if SEABORN_AVAILABLE:
                sns.lineplot(x=monthly_counts.index, y=monthly_counts.values)
            else:
                plt.plot(monthly_counts.index, monthly_counts.values)
            plt.xticks(rotation=45, ha='right')  # month labels overlap otherwise

            plt.xlabel("Month")
            plt.ylabel("Number of Files Modified")
            plt.title("File Modification Activity Over Time (Monthly)")
            self._save_figure(fig, "Charts", "file_modification_date_activity.png",
                              "File modification date chart")
        except Exception as e:
            self._record_failure("modification date chart", "Modification date chart generation failed", e)
        finally:
            self._close_figure(fig)

    def _generate_sentiment_distribution_chart(self):
        """Save a histogram (with KDE) of per-contact average sentiment scores."""
        if not PANDAS_AVAILABLE or not MATPLOTLIB_AVAILABLE or not SEABORN_AVAILABLE or not SENTIMENT_AVAILABLE:
            logger.warning("Skipping sentiment distribution chart: Required libraries not available.")
            return

        fig = None
        try:
            sentiment_scores = self._contact_sentiment_scores()
            if not sentiment_scores:
                logger.warning("No sentiment data found for charting.")
                return

            df = pd.DataFrame({'sentiment_score': sentiment_scores})
            fig = plt.figure(figsize=(10, 6))
            sns.histplot(data=df, x='sentiment_score', bins=30, kde=True)

            plt.xlabel("Average Sentiment Score (Compound)")
            plt.ylabel("Frequency (Number of Contacts)")
            plt.title("Distribution of Average Contact Sentiment Scores")
            plt.xlim(-1, 1)  # compound sentiment range
            self._save_figure(fig, "Charts", "contact_sentiment_distribution.png",
                              "Contact sentiment distribution chart")
        except Exception as e:
            self._record_failure("sentiment distribution chart",
                                 "Sentiment distribution chart generation failed", e)
        finally:
            self._close_figure(fig)

    def _generate_message_wordcloud(self):
        """Save a word cloud built from the text of the original message files."""
        if not WORDCLOUD_AVAILABLE or not PANDAS_AVAILABLE or not NUMPY_AVAILABLE or not SKLEARN_AVAILABLE:
            logger.warning("Skipping word cloud: Required libraries not available.")
            return

        try:
            all_message_texts = self._collect_message_texts()
        except Exception as e:
            self._record_failure("word cloud", "Word cloud generation failed", e)
            return

        if stop_event.is_set():
            logger.warning("Word cloud generation interrupted during file reading.")
            return

        if not all_message_texts:
            logger.warning("No message content found for word cloud generation.")
            return

        combined_text = " ".join(all_message_texts)
        fig = None
        try:
            wordcloud = WordCloud(width=1600, height=800, random_state=42,
                                  background_color='white', colormap='viridis',
                                  stopwords=None,  # None -> WordCloud's default stopword list
                                  min_font_size=10).generate(combined_text)

            fig = plt.figure(figsize=(16, 8))
            plt.imshow(wordcloud, interpolation='bilinear')
            plt.axis("off")
            self._save_figure(fig, "WordClouds", "message_wordcloud.png", "Message word cloud", pad=0)
        except Exception as e:
            self._record_failure("word cloud", "Word cloud generation failed", e)
        finally:
            self._close_figure(fig)

    def _save_visualization_data(self):
        """Save the numbers behind the charts to ``config.visualization_data_path`` as JSON.

        Every value is converted to built-in Python types, and dict keys are
        strings, so the result is JSON-serialisable. Each section falls back to
        an empty value when its data cannot be retrieved.
        """
        logger.info("Saving visualization data...")
        viz_data = {}

        # File category counts
        query_categories = "SELECT category, COUNT(*) as count FROM files GROUP BY category ORDER BY count DESC"
        try:
            category_rows = self.db_manager.execute_query(query_categories) or []
            # Plain dicts serialise to JSON whatever row type execute_query returns.
            viz_data['file_category_counts'] = [dict(row) for row in category_rows]
        except Exception as e:
            logger.warning(f"Failed to retrieve category counts for viz data: {e}")
            viz_data['file_category_counts'] = []

        # File size summary statistics
        query_filesizes = "SELECT filesize FROM files WHERE filesize > 0"
        try:
            filesize_data = self.db_manager.execute_query(query_filesizes) or []
            sizes = [row['filesize'] for row in filesize_data]
            if sizes:
                viz_data['file_size_stats'] = {
                    'count': len(sizes),
                    'min': min(sizes),
                    'max': max(sizes),
                    'mean': float(np.mean(sizes)) if NUMPY_AVAILABLE else None,
                    'median': float(np.median(sizes)) if NUMPY_AVAILABLE else None,
                    'std_dev': float(np.std(sizes)) if NUMPY_AVAILABLE else None,
                }
            else:
                viz_data['file_size_stats'] = {}
        except Exception as e:
            logger.warning(f"Failed to retrieve file size data for viz data: {e}")
            viz_data['file_size_stats'] = {}

        # Modification date activity (monthly counts)
        query_dates = "SELECT modification_date FROM files WHERE modification_date IS NOT NULL"
        try:
            date_data = self.db_manager.execute_query(query_dates) or []
            parsed = (parse_iso_datetime(row['modification_date']) for row in date_data)
            valid_dates = [d for d in parsed if d is not None]
            if valid_dates and PANDAS_AVAILABLE:
                monthly_counts = self._monthly_counts(valid_dates)
                # JSON object keys must be strings. The raw value_counts() result
                # is keyed by pandas Period objects, which json.dump rejects.
                viz_data['file_modification_monthly_counts'] = {
                    month: int(count) for month, count in monthly_counts.items()
                }
            else:
                viz_data['file_modification_monthly_counts'] = {}
        except Exception as e:
            logger.warning(f"Failed to retrieve modification date data for viz data: {e}")
            viz_data['file_modification_monthly_counts'] = {}

        # Contact sentiment summary statistics
        try:
            sentiment_scores = self._contact_sentiment_scores()
        except Exception as e:
            logger.warning(f"Failed to retrieve contact sentiment data for viz data: {e}")
            sentiment_scores = []
        if sentiment_scores and NUMPY_AVAILABLE:
            viz_data['contact_sentiment_stats'] = {
                'count': len(sentiment_scores),
                'min': min(sentiment_scores),
                'max': max(sentiment_scores),
                'mean': float(np.mean(sentiment_scores)),
                'median': float(np.median(sentiment_scores)),
                'std_dev': float(np.std(sentiment_scores)),
            }
        else:
            viz_data['contact_sentiment_stats'] = {}

        output_path = self.config.visualization_data_path
        try:
            atomic_write_json(viz_data, output_path)
            logger.info(f"Visualization data saved to {output_path}")
        except Exception as e:
            logger.error(f"Failed to save visualization data JSON: {e}", exc_info=True)
            self.supervisor.log_error("Failed to save visualization data JSON", str(e))


# --- Report Generator ---

class ReportGenerator:
    """Generates various reports based on analysis results."""
    def __init__(self, config: Config, db_manager: DatabaseManager, supervisor: AnalysisSupervisor):
        """Store the shared collaborators and preload the data reports draw on.

        Both loaders are typed Optional; a falsy result is replaced by an empty
        list (contacts) or dict (conversations) so report methods can iterate
        unconditionally.
        """
        self.config, self.db_manager, self.supervisor = config, db_manager, supervisor

        loaded_contacts = self._load_contacts_for_reporting()
        self.contacts: List[Contact] = loaded_contacts if loaded_contacts else []

        loaded_conversations = self._load_conversations_for_reporting()
        self.conversations: Dict[str, Conversation] = loaded_conversations if loaded_conversations else {}

    def _load_contacts_for_reporting(self) -> Optional[List[Contact]]:
        """Load the contacts JSON file through MessageAnalyzer's own loader.

        Delegating keeps reporting in step with however the analyzer reads that
        file. May return None, per the loader's Optional return type.
        """
        analyzer = MessageAnalyzer(self.config, self.db_manager, self.supervisor)
        return analyzer._load_contacts_from_file()

    def _load_conversations_for_reporting(self) -> Optional[Dict[str, Conversation]]:
        """Load the conversations JSON file through MessageAnalyzer's own loader.

        Delegating keeps reporting in step with however the analyzer reads that
        file. May return None, per the loader's Optional return type.
        """
        analyzer = MessageAnalyzer(self.config, self.db_manager, self.supervisor)
        return analyzer._load_conversations_from_file()


    def run_reports(self):
        """Run every report generator in order, publishing progress to the supervisor.

        An InterruptedError stops the phase and marks it "interrupted". Any
        other exception escaping a report marks the whole phase "failed".
        """
        self.supervisor.update_phase("reporting")
        logger.info("Starting report generation phase...")

        # Ensure report directory exists
        os.makedirs(self.config.report_dir, exist_ok=True)

        try:
            # (status message, action) pairs run in sequence. The lambdas defer
            # method and config lookups until the step actually executes.
            report_steps = (
                ("Generating summary report",
                 lambda: self._generate_summary_report()),
                ("Generating file category report",
                 lambda: self._generate_file_category_report()),
                ("Generating top contacts report",
                 lambda: self._generate_top_contacts_report(top_n=self.config.report_top_n)),
                ("Generating top conversations report",
                 lambda: self._generate_top_conversations_report(top_n=self.config.report_top_n)),
                ("Generating file pattern report",
                 lambda: self._generate_file_pattern_report(top_n=self.config.report_top_n)),
                ("Generating duplicate files report",
                 lambda: self._generate_duplicate_files_report()),
            )
            for status_message, run_step in report_steps:
                self.supervisor.update_status("generating_reports", status_message)
                run_step()

            logger.info("Report generation phase completed.")
            self.supervisor.update_status("completed", "Reports finished")

        except InterruptedError:
            logger.warning("Report generation phase interrupted by stop signal.")
            self.supervisor.update_status("interrupted", "Reporting stopped")
        except Exception as e:
            logger.critical(f"Critical error during report generation: {e}", exc_info=True)
            self.supervisor.log_error("Critical error in reporting phase", str(e))
            self.supervisor.update_status("failed", "Critical reporting error")


    def _generate_summary_report(self):
        """Write ``summary_report.txt`` summarising the whole analysis run.

        The report contains:

        - run metadata (version, directories, start/end time, duration);
        - headline counts;
        - every other supervisor stat;
        - all recorded errors and warnings.

        Missing supervisor-state entries are shown as "N/A" or 0 instead of
        raising. Any failure is logged and counted in ``report_errors``, so the
        remaining reports still run.
        """
        logger.info("Generating summary report...")
        report_path = os.path.join(self.config.report_dir, "summary_report.txt")

        # Stats shown in the headline section (plus the filtered-out conversation
        # count, which is deliberately not listed) are skipped in "Key Statistics".
        headline_stat_keys = {
            'files_total_in_db', 'total_messages_processed', 'contacts_total',
            'conversations_total', 'duplicates_marked', 'conversations_filtered_out',
        }

        try:
            state = self.supervisor.get_state() or {}
            stats = state.get('stats') or {}
            timestamp_fmt = '%Y-%m-%d %H:%M:%S'
            # One timestamp serves as both the generation time and the analysis end time.
            now_formatted = datetime.now().strftime(timestamp_fmt)

            start_time = state.get('startTime')
            if start_time is None:
                analysis_start_time = "N/A"
                duration = "N/A"
            else:
                analysis_start_time = datetime.fromtimestamp(start_time).strftime(timestamp_fmt)
                duration = str(timedelta(seconds=int(time.time() - start_time)))

            summary_lines = [
                "--- Emergency Backup Recovery Analysis Summary Report ---",
                f"Generated: {now_formatted}",
                f"Script Version: {SCRIPT_VERSION}",
                f"Backup Directory: {self.config.backup_dir}",
                f"Output Directory: {self.config.base_output_dir}",
                f"Analysis Start Time: {analysis_start_time}",
                f"Analysis End Time: {now_formatted}",
                f"Analysis Duration: {duration}",
                "-------------------------------------------------------",
                f"Total Files Processed (in DB): {stats.get('files_total_in_db', 0)}",
                f"Files Marked as Duplicates: {stats.get('duplicates_marked', 0)}",
                f"Total Messages Processed: {stats.get('total_messages_processed', 0)}",
                f"Total Contacts Identified: {stats.get('contacts_total', 0)}",
                f"Total Conversations Identified (after filtering): {len(self.conversations)} / {stats.get('conversations_total', 0)} (filtered)",
                "-------------------------------------------------------",
                "Key Statistics:",
            ]
            summary_lines.extend(
                f"  {key}: {value}" for key, value in stats.items() if key not in headline_stat_keys
            )

            summary_lines.append("-------------------------------------------------------")
            summary_lines.append("Errors and Warnings:")
            issue_sections = (
                ("Errors", state.get('errors') or [], "  No errors reported."),
                ("Warnings", state.get('warnings') or [], "  No warnings reported."),
            )
            for label, entries, empty_message in issue_sections:
                if not entries:
                    summary_lines.append(empty_message)
                    continue
                summary_lines.append(f"  {label} ({len(entries)}):")
                for entry in entries:
                    summary_lines.append(
                        f"    - [{entry.get('timestampFormatted', 'N/A')}] "
                        f"{entry.get('message', '')} (Detail: {entry.get('detail')})"
                    )

            summary_lines += [
                "-------------------------------------------------------",
                "See other reports and visualizations for detailed analysis.",
                "--- End of Summary Report ---",
            ]

            with open(report_path, 'w', encoding='utf-8') as f:
                f.write("\n".join(summary_lines))
            logger.info(f"Summary report saved to {report_path}")
            self.supervisor.update_stat("reports_generated", 1)
        except Exception as e:
            logger.error(f"Failed to generate summary report: {e}", exc_info=True)
            self.supervisor.log_error("Summary report generation failed", str(e))
            self.supervisor.update_stat("report_errors", 1)


    def _generate_file_category_report(self):
        """Write ``file_category_report.txt`` with per-category file counts and total sizes.

        Rows come from the ``files`` table, ordered by file count (descending).
        The table is rendered with ``tabulate`` when available and as a plain
        pipe-separated layout otherwise. Failures are logged, recorded on the
        supervisor and counted in ``report_errors``.
        """
        logger.info("Generating file category report...")
        report_path = os.path.join(self.config.report_dir, "file_category_report.txt")
        query = "SELECT category, COUNT(*) as count, SUM(filesize) as total_size FROM files GROUP BY category ORDER BY count DESC"

        try:
            rows = self.db_manager.execute_query(query)

            lines = [
                "--- File Category Report ---",
                f"Generated: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}",
                "----------------------------",
                "File Counts and Total Sizes by Category:",
                "",
            ]

            if not rows:
                lines.append("No file category data available.")
            else:
                headers = ["Category", "File Count", "Total Size"]
                # SUM() is NULL when every filesize in a group is NULL; show that as 0 bytes.
                table = [
                    [r['category'], r['count'], format_bytes(r['total_size'] or 0)]
                    for r in rows
                ]
                if TABULATE_AVAILABLE:
                    lines.append(tabulate(table, headers=headers, tablefmt="plain"))
                else:
                    header_line = " | ".join(headers)
                    lines.append(header_line)
                    lines.append("-" * (len(header_line) + 5))
                    lines.extend(" | ".join(str(cell) for cell in entry) for entry in table)

            lines += ["\n----------------------------", "--- End of File Category Report ---"]

            with open(report_path, 'w', encoding='utf-8') as f:
                f.write("\n".join(lines))
            logger.info(f"File category report saved to {report_path}")
            self.supervisor.update_stat("reports_generated", 1)

        except Exception as e:
            logger.error(f"Failed to generate file category report: {e}", exc_info=True)
            self.supervisor.log_error("File category report generation failed", str(e))
            self.supervisor.update_stat("report_errors", 1)


    def _generate_top_contacts_report(self, top_n: int = DEFAULT_REPORT_TOP_N):
        """Write ``top_contacts_report.txt`` listing the contacts with the most messages.

        Args:
            top_n: Maximum number of contacts to include.

        Contacts are sorted here by ``message_count`` (descending) rather than
        relying on their load order; the sort is stable, so ties keep that
        order. Failures are logged, recorded on the supervisor and counted in
        ``report_errors`` instead of aborting the reporting phase.
        """
        logger.info(f"Generating top {top_n} contacts report...")
        report_path = os.path.join(self.config.report_dir, "top_contacts_report.txt")

        try:
            # Sort explicitly, as the conversations report does.
            top_contacts = sorted(
                self.contacts, key=lambda c: c.message_count or 0, reverse=True
            )[:top_n]

            report_lines = [
                f"--- Top {len(top_contacts)} Contacts Report (by Message Count) ---",
                f"Generated: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}",
                "-----------------------------------------------------",
                "",
            ]

            if top_contacts:
                headers = ["Rank", "Phone Number", "Messages", "Sentiment", "First Seen", "Last Seen", "Topics"]
                table_data = []
                for rank, contact in enumerate(top_contacts, start=1):
                    score = contact.sentiment_score
                    # A missing score would make the :.2f format raise.
                    sentiment = f"{score:.2f}" if SENTIMENT_AVAILABLE and score is not None else "N/A"
                    table_data.append([
                        rank,
                        contact.phone_number,
                        contact.message_count,
                        sentiment,
                        contact.first_seen or "N/A",
                        contact.last_seen or "N/A",
                        ", ".join(map(str, contact.topics)) if contact.topics else "N/A",
                    ])

                if TABULATE_AVAILABLE:
                    report_lines.append(tabulate(table_data, headers=headers, tablefmt="plain"))
                else:
                    header_line = " | ".join(headers)
                    report_lines.append(header_line)
                    report_lines.append("-" * (len(header_line) + 5))
                    report_lines.extend(" | ".join(map(str, row)) for row in table_data)
            else:
                report_lines.append("No contact data available.")

            report_lines.append("\n-----------------------------------------------------")
            report_lines.append("--- End of Top Contacts Report ---")

            with open(report_path, 'w', encoding='utf-8') as f:
                f.write("\n".join(report_lines))
            logger.info(f"Top contacts report saved to {report_path}")
            self.supervisor.update_stat("reports_generated", 1)
        except Exception as e:
            logger.error(f"Failed to generate top contacts report: {e}", exc_info=True)
            self.supervisor.log_error("Top contacts report generation failed", str(e))
            self.supervisor.update_stat("report_errors", 1)


    def _generate_top_conversations_report(self, top_n: int = DEFAULT_REPORT_TOP_N):
        """Generate a plain-text report of the top conversations by message count.

        Writes ``top_conversations_report.txt`` into ``self.config.report_dir``.
        Every failure, whether while building the table or while writing the
        file, is logged and counted in the supervisor's ``report_errors`` stat
        instead of propagating to the caller.

        Args:
            top_n: Maximum number of conversations to include in the report.
        """
        logger.info(f"Generating top {top_n} conversations report...")
        report_path = os.path.join(self.config.report_dir, "top_conversations_report.txt")

        try:
            # Highest message count first.
            sorted_conversations = sorted(
                self.conversations.values(), key=lambda c: c.message_count, reverse=True
            )[:top_n]

            report_lines = [
                f"--- Top {len(sorted_conversations)} Conversations Report (by Message Count) ---",
                f"Generated: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}",
                "----------------------------------------------------------",
                ""
            ]

            if sorted_conversations:
                headers = ["Rank", "Participants", "Messages", "Sentiment", "First Message", "Last Message", "Type", "Topics"]
                table_data = []
                for rank, conversation in enumerate(sorted_conversations, start=1):
                    sentiment = conversation.sentiment_score
                    table_data.append([
                        rank,
                        ", ".join(map(str, conversation.participants)),
                        conversation.message_count,
                        # A missing score would make the :.2f format raise.
                        f"{sentiment:.2f}" if SENTIMENT_AVAILABLE and sentiment is not None else "N/A",
                        conversation.first_message or "N/A",
                        conversation.last_message or "N/A",
                        "Group" if conversation.is_group_chat else "Private",
                        ", ".join(map(str, conversation.topics)) if conversation.topics else "N/A"
                    ])

                if TABULATE_AVAILABLE:
                    report_lines.append(tabulate(table_data, headers=headers, tablefmt="plain"))
                else:
                    # Basic text formatting when tabulate is not installed.
                    report_lines.append(" | ".join(headers))
                    report_lines.append("-" * (len(" | ".join(headers)) + 5))
                    for row in table_data:
                        report_lines.append(" | ".join(map(str, row)))
            else:
                report_lines.append("No conversation data available (check min_conversation_files setting).")

            report_lines.append("\n----------------------------------------------------------")
            report_lines.append("--- End of Top Conversations Report ---")

            with open(report_path, 'w', encoding='utf-8') as f:
                f.write("\n".join(report_lines))
            logger.info(f"Top conversations report saved to {report_path}")
            self.supervisor.update_stat("reports_generated", 1)
        except Exception as e:
            logger.error(f"Failed to generate top conversations report: {e}", exc_info=True)
            self.supervisor.log_error("Top conversations report generation failed", str(e))
            self.supervisor.update_stat("report_errors", 1)


    def _generate_file_pattern_report(self, top_n: int = DEFAULT_REPORT_TOP_N):
        """Write a per-type summary of the most frequent file patterns.

        Counts each distinct (pattern_type, pattern_value) pair in the
        ``patterns`` table. For every pattern type, the ``top_n`` most
        frequent values are written to ``file_pattern_report.txt`` in the
        report directory. Failures are logged and recorded through the
        supervisor rather than raised.

        Args:
            top_n: Maximum number of values listed for each pattern type.
        """
        logger.info(f"Generating top {top_n} file patterns report...")
        report_path = os.path.join(self.config.report_dir, "file_pattern_report.txt")

        sql = "SELECT pattern_type, pattern_value, COUNT(*) as count FROM patterns GROUP BY pattern_type, pattern_value ORDER BY count DESC"
        try:
            rows = self.db_manager.execute_query(sql)

            lines = [
                f"--- Top File Patterns Report (Top {top_n} per Type) ---",
                f"Generated: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}",
                "----------------------------------------------------",
                "",
            ]

            if not rows:
                lines.append("No file pattern data available.")
            else:
                # Rows arrive ordered by count (descending), so each per-type list
                # is already ranked. Dict insertion order keeps the types in
                # first-seen order.
                grouped = {}
                for row in rows:
                    grouped.setdefault(row['pattern_type'], []).append(
                        (row['pattern_value'], row['count'])
                    )

                column_names = ["Rank", "Pattern Value", "Count"]
                divider = "-" * (len(" | ".join(column_names)) + 5)
                for pattern_type, ranked in grouped.items():
                    lines.append(f"Pattern Type: {pattern_type}")
                    table = [
                        [rank, value, count]
                        for rank, (value, count) in enumerate(ranked[:top_n], start=1)
                    ]
                    if TABULATE_AVAILABLE:
                        lines.append(tabulate(table, headers=column_names, tablefmt="plain"))
                    else:
                        lines.append(" | ".join(column_names))
                        lines.append(divider)
                        lines.extend(" | ".join(str(cell) for cell in entry) for entry in table)
                    lines.append("")  # blank line separating pattern types

            lines.append("\n----------------------------------------------------")
            lines.append("--- End of File Patterns Report ---")

            with open(report_path, 'w', encoding='utf-8') as handle:
                handle.write("\n".join(lines))
            logger.info(f"File patterns report saved to {report_path}")
            self.supervisor.update_stat("reports_generated", 1)

        except Exception as e:
            logger.error(f"Failed to generate file pattern report: {e}", exc_info=True)
            self.supervisor.log_error("File pattern report generation failed", str(e))
            self.supervisor.update_stat("report_errors", 1)


    def _generate_duplicate_files_report(self):
        """Generate a report listing duplicate files grouped by their original.

        Writes ``duplicate_files_report.txt`` into the report directory. Each
        group (one original file plus one MD5 hash) shows the hash, the
        original's path, the number of duplicates, and every duplicate's path.
        Failures are logged and counted in the supervisor's ``report_errors``
        stat instead of being raised.
        """
        logger.info("Generating duplicate files report...")
        report_path = os.path.join(self.config.report_dir, "duplicate_files_report.txt")

        # One row per (original, hash) group. original_id must be selected so
        # that the group's member files can be matched up below.
        group_query = """
            SELECT
                f.original_id AS original_id,
                f.md5hash,
                orig.path AS original_path,
                orig.filename AS original_filename,
                COUNT(f.id) AS duplicate_count
            FROM files f
            JOIN files orig ON f.original_id = orig.id
            WHERE f.is_duplicate = 1
            GROUP BY f.original_id, f.md5hash -- Group by original and hash for clarity
            ORDER BY duplicate_count DESC, f.md5hash ASC
        """
        # All duplicate rows fetched in a single query, rather than issuing
        # one query per group.
        members_query = """
            SELECT original_id, md5hash, path, filename
            FROM files
            WHERE is_duplicate = 1
            ORDER BY path ASC, filename ASC
        """
        try:
            duplicate_groups = self.db_manager.execute_query(group_query)

            report_lines = [
                "--- Duplicate Files Report ---",
                f"Generated: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}",
                "------------------------------",
                ""
            ]

            if duplicate_groups:
                # Map (original_id, md5hash) -> duplicate paths. The query's
                # ORDER BY keeps each list in path/filename order.
                members_by_group = defaultdict(list)
                for dup_row in self.db_manager.execute_query(members_query) or []:
                    members_by_group[(dup_row['original_id'], dup_row['md5hash'])].append(
                        os.path.join(dup_row['path'], dup_row['filename'])
                    )

                report_lines.append("Groups of duplicate files (grouped by original file and hash):")
                report_lines.append("")

                for group in duplicate_groups:
                    md5 = group['md5hash']
                    report_lines.append(f"MD5 Hash: {md5}")
                    report_lines.append(
                        f"Original File: {os.path.join(group['original_path'], group['original_filename'])}"
                    )
                    report_lines.append(f"Number of Duplicates: {group['duplicate_count']}")

                    # .get() avoids inserting empty entries into the defaultdict.
                    duplicate_paths = members_by_group.get((group['original_id'], md5), [])
                    if duplicate_paths:
                        report_lines.append("Duplicate File Paths:")
                        report_lines.extend(f"  - {dup_path}" for dup_path in duplicate_paths)
                    report_lines.append("-" * 30)  # Separator between groups

            else:
                report_lines.append("No duplicate files found.")

            report_lines.append("\n------------------------------")
            report_lines.append("--- End of Duplicate Files Report ---")

            with open(report_path, 'w', encoding='utf-8') as f:
                f.write("\n".join(report_lines))
            logger.info(f"Duplicate files report saved to {report_path}")
            self.supervisor.update_stat("reports_generated", 1)

        except Exception as e:
            logger.error(f"Failed to generate duplicate files report: {e}", exc_info=True)
            self.supervisor.log_error("Duplicate files report generation failed", str(e))
            self.supervisor.update_stat("report_errors", 1)


# --- Main Execution Logic ---

def _pid_is_running(pid: int) -> bool:
    """Return True if a process with ``pid`` appears to be alive.

    On POSIX this uses ``os.kill(pid, 0)``, which checks existence and
    permissions without delivering a signal. On Windows, signal 0 is
    ``CTRL_C_EVENT``, so no probe is attempted there and the PID is reported
    as not running. That matches the previous ``/proc``-based check, which
    could never find a process on systems without ``/proc``.
    """
    if pid <= 0:
        # 0 and negative values address process groups in os.kill, not one process.
        return False
    if platform.system() == "Windows":
        return False
    try:
        os.kill(pid, 0)
    except ProcessLookupError:
        return False
    except PermissionError:
        # The process exists but belongs to another user.
        return True
    return True


def _check_existing_instance(pid_file_path: str) -> None:
    """Exit if another live instance owns ``pid_file_path``; remove stale PID files."""
    if not os.path.exists(pid_file_path):
        return
    try:
        with open(pid_file_path, 'r') as f:
            pid_in_file = int(f.read().strip())
        # Our own PID in the file (e.g. PID reuse inside a container) cannot
        # be another instance.
        if pid_in_file != os.getpid() and _pid_is_running(pid_in_file):
            logger.error(f"Another instance of the script is already running with PID {pid_in_file}. Exiting.")
            sys.exit(1)
        logger.warning(f"Stale PID file found for PID {pid_in_file}. Removing.")
        os.remove(pid_file_path)
    except (ValueError, FileNotFoundError, OSError) as e:
        logger.warning(f"Could not check/clean up PID file {pid_file_path}: {e}")
    except Exception as e:
        logger.error(f"Unexpected error during PID file check: {e}", exc_info=True)


def main():
    """Parse command-line arguments and run the full analysis pipeline.

    Stages run in order: file analysis, message analysis, visualizations, then
    reports. Each stage is skipped once ``stop_event`` is set. Exits with
    status 1 if the output directories cannot be created, if another instance
    holds the PID file, or if a stage raises an unexpected exception.
    """
    parser = argparse.ArgumentParser(description="Consolidated Analysis Script for Emergency Backup Recovery")
    parser.add_argument("backup_dir", help="Path to the emergency backup directory.")
    parser.add_argument("output_dir", help="Path to the output directory for results, logs, and database.")
    parser.add_argument("--sample_rate", type=float, default=1.0,
                        help="Fraction of files to sample for analysis (0.0 to 1.0). Default: 1.0 (all files).")
    parser.add_argument("--max_files", type=int, default=None,
                        help="Maximum number of files to process. Default: None (no limit).")
    parser.add_argument("--resume", action="store_true",
                        help="Resume analysis from the last checkpoint.")
    parser.add_argument("--batch_size", type=int, default=DEFAULT_BATCH_SIZE,
                        help=f"Number of files to process in each batch for database insertion. Default: {DEFAULT_BATCH_SIZE}.")
    parser.add_argument("--min_conversation_files", type=int, default=DEFAULT_MIN_CONVERSATION_FILES,
                        help=f"Minimum number of files required to consider a conversation significant for reporting. Default: {DEFAULT_MIN_CONVERSATION_FILES}.")
    parser.add_argument("--parallel_jobs", type=int, default=os.cpu_count() or 1,
                        help=f"Number of parallel processes/threads to use. Default: Number of CPU cores ({os.cpu_count() or 1}).")
    parser.add_argument("--debug", action="store_true", help="Enable debug logging.")
    parser.add_argument("--quiet", action="store_true", help="Suppress informational output (only show warnings/errors).")
    parser.add_argument("--report_top_n", type=int, default=DEFAULT_REPORT_TOP_N,
                        help=f"Number of top items to include in reports (e.g., top contacts, patterns). Default: {DEFAULT_REPORT_TOP_N}.")
    parser.add_argument("--graph_top_n", type=int, default=DEFAULT_GRAPH_TOP_N,
                        help=f"Number of top nodes to include in graph visualizations. Default: {DEFAULT_GRAPH_TOP_N}.")

    args = parser.parse_args()

    # Reject out-of-range values up front. For example, a negative top_n would
    # silently slice items off the end of report lists instead of limiting them.
    if not 0.0 <= args.sample_rate <= 1.0:  # also rejects NaN
        parser.error("--sample_rate must be between 0.0 and 1.0")
    for option in ("batch_size", "parallel_jobs", "report_top_n", "graph_top_n"):
        if getattr(args, option) < 1:
            parser.error(f"--{option} must be at least 1")
    if args.min_conversation_files < 0:
        parser.error("--min_conversation_files must be non-negative")
    if args.max_files is not None and args.max_files < 0:
        parser.error("--max_files must be non-negative")

    # Debug level: 2 = DEBUG, 1 = INFO (default), 0 = WARNING. --debug wins over --quiet.
    debug_level = 1
    if args.debug:
        debug_level = 2
    elif args.quiet:
        debug_level = 0

    config = Config(
        backup_dir=args.backup_dir,
        base_output_dir=args.output_dir,
        sample_rate=args.sample_rate,
        max_files=args.max_files,
        resume=args.resume,
        batch_size=args.batch_size,
        min_conversation_files=args.min_conversation_files,
        parallel_jobs=args.parallel_jobs,
        debug_level=debug_level,
        report_top_n=args.report_top_n,
        graph_top_n=args.graph_top_n,
    )

    # Logging is not configured yet, so report this failure on stderr.
    try:
        config.create_directories()
    except Exception as e:
        print(f"FATAL: Could not create output directories: {e}", file=sys.stderr)
        sys.exit(1)

    setup_logging(config.log_dir, config.debug_level)

    # Register signal handlers for graceful shutdown.
    signal.signal(signal.SIGINT, signal_handler)
    signal.signal(signal.SIGTERM, signal_handler)
    if platform.system() != "Windows":
        try:
            signal.signal(signal.SIGQUIT, signal_handler)
        except AttributeError:
            pass  # SIGQUIT is not available on all platforms

    # Declared up front so the finally block can test what was created.
    supervisor = None
    db_manager = None
    file_analyzer = None
    message_analyzer = None
    visualization_generator = None
    report_generator = None

    try:
        _check_existing_instance(config.pid_file_path)

        supervisor = AnalysisSupervisor(config)
        db_manager = DatabaseManager(config.db_path)
        file_analyzer = FileAnalyzer(config, db_manager, supervisor)
        message_analyzer = MessageAnalyzer(config, db_manager, supervisor)  # Needs contacts/conversations loaded
        visualization_generator = VisualizationGenerator(config, db_manager, supervisor)
        report_generator = ReportGenerator(config, db_manager, supervisor)  # Needs contacts/conversations loaded

        supervisor.update_status("running", "Starting analysis pipeline")

        # --- Analysis Pipeline Stages ---

        # 1. File Analysis (Scan, Analyze, Deduplicate)
        if not stop_event.is_set():
            file_analyzer.run_file_analysis()
        else:
            logger.warning("Skipping file analysis phase due to stop signal.")

        # 2. Message Analysis (Extract contacts/conversations, sentiment, topics)
        # Message analysis depends on files being categorized in the DB
        if not stop_event.is_set():
            message_analyzer.run_message_analysis()
        else:
            logger.warning("Skipping message analysis phase due to stop signal.")

        # 3. Visualization Generation
        # Visualizations depend on analysis results (DB, JSON files)
        if not stop_event.is_set():
            visualization_generator.run_visualizations()
        else:
            logger.warning("Skipping visualization phase due to stop signal.")

        # 4. Report Generation
        # Reports depend on analysis results (DB, JSON files)
        if not stop_event.is_set():
            report_generator.run_reports()
        else:
            logger.warning("Skipping reporting phase due to stop signal.")

        # --- Finalization ---
        if stop_event.is_set():
            supervisor.update_status("interrupted", "Analysis stopped by user/signal")
            logger.warning("Analysis pipeline interrupted.")
        else:
            supervisor.update_status("completed", "Analysis pipeline finished successfully")
            logger.info("Analysis pipeline completed successfully.")

    except InterruptedError:
        logger.warning("Analysis pipeline interrupted by stop signal.")
        if supervisor:
            supervisor.update_status("interrupted", "Analysis stopped by user/signal")
        # Cleanup happens in the finally block.
    except Exception as e:
        logger.critical(f"A critical error occurred during the analysis pipeline: {e}", exc_info=True)
        if supervisor:
            supervisor.log_error("Critical error in pipeline", str(e))
            supervisor.update_status("failed", "Critical pipeline error")
        sys.exit(1)  # The finally block still runs during SystemExit unwinding.
    finally:
        # Ensure resources are closed.
        if db_manager:
            db_manager.close()
        if supervisor:
            supervisor.close()
        logger.info("Analysis script finished.")


# --- Script Entry Point ---

# Run the pipeline when this file is executed directly.
# NOTE(review): this file appears to be exported from a Colab notebook. If it is
# run as a notebook cell, __name__ is presumably also "__main__", and main() would
# then parse the kernel's own argv. Confirm it is only meant to run as a script.
if __name__ == "__main__":
    main()