<a href="https://colab.research.google.com/github/ApoorvSaxena0109/cli-2/blob/main/ARS_VG_Analyzer.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# ARS-VG Analyzer - Research Notebook

**AEM-REM Substitution and Vulnerability Graph Analyzer**

A forensic accounting prototype for detecting earnings manipulation through the integration of Large Language Models, Causal Graph Theory, and Adversarial Simulation.

---

**Authors:**
- Primary Researcher: Apoorv
- 

**Platform:** Google Colab Pro+ (A100 GPU recommended)

---

## Section 1: Setup and Configuration

In [1]:
# GPU Verification
"""
This cell verifies GPU availability and displays hardware information.
For optimal performance, this notebook is designed for Google Colab Pro+ with A100 GPU.
"""

import subprocess
import sys

def verify_gpu():
    gpu_info = {
        'available': False, 'name': None, 'memory_total': None,
        'memory_free': None, 'is_a100': False, 'cuda_version': None, 'driver_version': None
    }
    try:
        result = subprocess.run(
            ['nvidia-smi', '--query-gpu=name,memory.total,memory.free,driver_version', '--format=csv,noheader,nounits'],
            capture_output=True, text=True, timeout=10
        )
        if result.returncode == 0 and result.stdout.strip():
            gpu_info['available'] = True
            parts = result.stdout.strip().split(', ')
            if len(parts) >= 4:
                gpu_info['name'] = parts[0].strip()
                gpu_info['memory_total'] = f"{parts[1].strip()} MB"
                gpu_info['memory_free'] = f"{parts[2].strip()} MB"
                gpu_info['driver_version'] = parts[3].strip()
                if 'A100' in gpu_info['name']: gpu_info['is_a100'] = True
            cuda_result = subprocess.run(['nvidia-smi', '--query-gpu=cuda_version', '--format=csv,noheader'], capture_output=True, text=True, timeout=10)
            if cuda_result.returncode == 0: gpu_info['cuda_version'] = cuda_result.stdout.strip()
    except: pass
    return gpu_info

def display_gpu_info(gpu_info):
    print("=" * 60)
    print("GPU VERIFICATION REPORT")
    print("=" * 60)
    if gpu_info['available']:
        print(f"\nGPU Status: AVAILABLE\n\nGPU Details:")
        print(f"   - Device Name: {gpu_info['name']}")
        print(f"   - Total Memory: {gpu_info['memory_total']}")
        print(f"   - Free Memory: {gpu_info['memory_free']}")
        if gpu_info.get('driver_version'): print(f"   - Driver Version: {gpu_info['driver_version']}")
        if gpu_info.get('cuda_version'): print(f"   - CUDA Version: {gpu_info['cuda_version']}")
        if gpu_info['is_a100']:
            print(f"\nA100 GPU Detected - Optimal for this notebook!")
        else:
            print(f"\nGPU detected but not A100. A100 recommended.")
    else:
        print(f"\nGPU Status: NOT AVAILABLE\nRunning in CPU Mode")
    print("\n" + "=" * 60)

GPU_INFO = verify_gpu()
display_gpu_info(GPU_INFO)
GPU_AVAILABLE = GPU_INFO['available']
IS_A100 = GPU_INFO['is_a100']
GPU_NAME = GPU_INFO['name']

GPU VERIFICATION REPORT

GPU Status: AVAILABLE

GPU Details:
   - Device Name: NVIDIA A100-SXM4-40GB
   - Total Memory: 40960 MB
   - Free Memory: 40506 MB
   - Driver Version: 550.54.15

A100 GPU Detected - Optimal for this notebook!



In [2]:
# Dependency Installation
import subprocess, sys

def install_dependencies():
    print("=" * 60 + "\nDEPENDENCY INSTALLATION\n" + "=" * 60)
    core_packages = [
        "pandas==2.1.3", "numpy==1.26.2", "scipy==1.11.4",
        "networkx==3.2.1", "pyvis==0.3.2", "chromadb==0.4.18",
        "sentence-transformers==2.2.2", "gradio==4.8.0",
        "requests==2.31.0", "python-dotenv==1.0.0", "tqdm==4.66.1"
    ]
    special_packages = [("unstructured[pdf]==0.10.30", "PDF"), ("ollama==0.1.2", "LLM")]
    installed, failed = [], []
    print("\nInstalling core packages...")
    for pkg in core_packages:
        try:
            subprocess.check_call([sys.executable, "-m", "pip", "install", "-q", pkg], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
            installed.append(pkg.split("==")[0])
            print(f"   [OK] {pkg}")
        except: failed.append(pkg); print(f"   [FAIL] {pkg}")
    print("\nInstalling special packages...")
    for pkg, desc in special_packages:
        try:
            subprocess.check_call([sys.executable, "-m", "pip", "install", "-q", pkg], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
            installed.append(pkg.split("==")[0].split("[")[0])
            print(f"   [OK] {pkg}")
        except: failed.append(pkg); print(f"   [SKIP] {pkg}")
    print(f"\nInstalled: {len(installed)}, Failed: {len(failed)}")
    return installed, failed

def verify_imports():
    print("\nVERIFYING IMPORTS...")
    imports = ["pandas", "numpy", "scipy", "networkx", "chromadb", "gradio", "requests", "tqdm"]
    for m in imports:
        try: exec(f"import {m}"); print(f"   [OK] {m}")
        except: print(f"   [FAIL] {m}")

INSTALLED_PACKAGES, FAILED_PACKAGES = install_dependencies()
verify_imports()

DEPENDENCY INSTALLATION

Installing core packages...
   [OK] pandas==2.1.3
   [OK] numpy==1.26.2
   [OK] scipy==1.11.4
   [OK] networkx==3.2.1
   [OK] pyvis==0.3.2
   [OK] chromadb==0.4.18
   [OK] sentence-transformers==2.2.2
   [OK] gradio==4.8.0
   [OK] requests==2.31.0
   [OK] python-dotenv==1.0.0
   [OK] tqdm==4.66.1

Installing special packages...
   [SKIP] unstructured[pdf]==0.10.30
   [OK] ollama==0.1.2

Installed: 12, Failed: 1

VERIFYING IMPORTS...
   [FAIL] pandas
   [OK] numpy
   [FAIL] scipy
   [OK] networkx
   [FAIL] chromadb
   [FAIL] gradio
   [OK] requests
   [OK] tqdm


In [3]:
# Ollama Server Setup
import subprocess, time, requests, os

OLLAMA_HOST = "127.0.0.1"
OLLAMA_PORT = 11434
OLLAMA_URL = f"http://{OLLAMA_HOST}:{OLLAMA_PORT}"

def is_colab():
    try: import google.colab; return True
    except: return False

def install_ollama():
    print("=" * 60 + "\nOLLAMA INSTALLATION\n" + "=" * 60)
    try:
        r = subprocess.run(['ollama', '--version'], capture_output=True, text=True, timeout=10)
        if r.returncode == 0: print(f"\nOllama installed: {r.stdout.strip()}"); return True
    except: pass
    print("\nInstalling Ollama...")
    try:
        r = subprocess.run("curl -fsSL https://ollama.com/install.sh | sh", shell=True, capture_output=True, text=True, timeout=300)
        if r.returncode == 0: print("   [OK] Installed"); return True
    except: pass
    return False

def check_ollama_health():
    try: return requests.get(f"{OLLAMA_URL}/api/tags", timeout=5).status_code == 200
    except: return False

def start_ollama_server():
    print("\nStarting Ollama server...")
    if check_ollama_health(): print("   [OK] Already running"); return True
    try:
        env = os.environ.copy(); env['OLLAMA_HOST'] = f"{OLLAMA_HOST}:{OLLAMA_PORT}"
        p = subprocess.Popen(['ollama', 'serve'], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, env=env, start_new_session=True)
        for i in range(30):
            time.sleep(1)
            if check_ollama_health(): print(f"   [OK] Started (PID: {p.pid})"); return True
    except: pass
    return False

print(f"Environment: {'Colab' if is_colab() else 'Local'}")
OLLAMA_INSTALLED = install_ollama()
OLLAMA_RUNNING = start_ollama_server() if OLLAMA_INSTALLED else False
OLLAMA_AVAILABLE = OLLAMA_RUNNING

Environment: Colab
OLLAMA INSTALLATION

Installing Ollama...
   [OK] Installed

Starting Ollama server...
   [OK] Started (PID: 6899)


In [4]:
# DeepSeek Model Download
import requests, time, json

MODEL_NAME = "deepseek-r1:32b"
OLLAMA_URL = "http://127.0.0.1:11434"

def check_model_exists(name):
    try:
        r = requests.get(f"{OLLAMA_URL}/api/tags", timeout=10)
        if r.status_code == 200:
            for m in r.json().get('models', []):
                if m.get('name', '').startswith(name.split(':')[0]): return True
    except: pass
    return False

def download_model(name):
    print("=" * 60 + f"\nDEEPSEEK MODEL DOWNLOAD\nModel: {name}\n" + "=" * 60)
    if check_model_exists(name): print(f"\n[OK] Already downloaded!"); return True
    print(f"\nDownloading {name}...")
    try:
        r = requests.post(f"{OLLAMA_URL}/api/pull", json={"name": name, "stream": True}, stream=True, timeout=None)
        if r.status_code != 200: return False
        last_pct = 0
        for line in r.iter_lines():
            if line:
                try:
                    d = json.loads(line)
                    if 'total' in d and 'completed' in d and d['total'] > 0:
                        pct = (d['completed'] / d['total']) * 100
                        if pct - last_pct >= 10:
                            print(f"   Progress: {pct:.0f}%")
                            last_pct = pct
                except: pass
        time.sleep(2)
        if check_model_exists(name): print("\n[SUCCESS] Downloaded!"); return True
    except Exception as e: print(f"\n[FAIL] {e}")
    return False

def test_model(name):
    print("\nTesting model...")
    try:
        r = requests.post(f"{OLLAMA_URL}/api/generate", json={"model": name, "prompt": "What is 2+2?", "stream": False, "options": {"temperature": 0, "num_predict": 10}}, timeout=60)
        if r.status_code == 200:
            ans = r.json().get('response', '').strip()
            print(f"   Response: {ans}")
            return '4' in ans
    except: pass
    return False

MODEL_DOWNLOADED = download_model(MODEL_NAME)
MODEL_READY = test_model(MODEL_NAME) if MODEL_DOWNLOADED else False
DEEPSEEK_AVAILABLE = MODEL_READY
DEEPSEEK_MODEL = MODEL_NAME if MODEL_READY else None

DEEPSEEK MODEL DOWNLOAD
Model: deepseek-r1:32b

Downloading deepseek-r1:32b...
   Progress: 10%
   Progress: 20%
   Progress: 30%
   Progress: 40%
   Progress: 50%
   Progress: 60%
   Progress: 71%
   Progress: 81%
   Progress: 91%

[SUCCESS] Downloaded!

Testing model...
   Response: 


In [5]:
# Google Drive Mounting and Directory Setup
import os
from pathlib import Path

BASE_DIR_NAME = "ARS-VG-Analyzer"
SUBDIRECTORIES = ["input", "processed", "chromadb", "results", "graphs"]

def is_colab():
    try: import google.colab; return True
    except: return False

def mount_google_drive():
    print("=" * 60 + "\nGOOGLE DRIVE MOUNTING\n" + "=" * 60)
    if not is_colab(): print("\nNot in Colab. Using local storage."); return None
    try:
        from google.colab import drive
        drive_path = Path("/content/drive/MyDrive")
        if drive_path.exists(): print("\n[OK] Already mounted!"); return str(drive_path)
        print("\nMounting Google Drive...")
        drive.mount('/content/drive')
        if drive_path.exists(): print("\n[OK] Mounted!"); return str(drive_path)
    except Exception as e: print(f"\n[FAIL] {e}")
    return None

def create_directory_structure(base_path):
    print("\n" + "-" * 60 + "\nDIRECTORY STRUCTURE\n" + "-" * 60)
    analyzer_dir = Path(base_path) / BASE_DIR_NAME
    paths = {"base": str(analyzer_dir)}
    try: analyzer_dir.mkdir(parents=True, exist_ok=True); print(f"\n[OK] Base: {analyzer_dir}")
    except Exception as e: print(f"\n[FAIL] {e}"); return None
    for subdir in SUBDIRECTORIES:
        try:
            (analyzer_dir / subdir).mkdir(parents=True, exist_ok=True)
            paths[subdir] = str(analyzer_dir / subdir)
            print(f"   [OK] {subdir}/")
        except: pass
    return paths

def verify_dirs(paths):
    print("\n" + "-" * 60 + "\nVERIFICATION\n" + "-" * 60)
    for name, path in paths.items():
        status = "[OK]" if Path(path).exists() else "[FAIL]"
        print(f"   {status} {name}: {path}")
    return all(Path(p).exists() for p in paths.values())

# Main execution
if is_colab():
    DRIVE_PATH = mount_google_drive()
    BASE_PATH = DRIVE_PATH if DRIVE_PATH else "/content"
    DRIVE_MOUNTED = DRIVE_PATH is not None
else:
    print("=" * 60 + "\nLOCAL MODE\n" + "=" * 60)
    BASE_PATH = os.getcwd()
    DRIVE_MOUNTED = False

PATHS = create_directory_structure(BASE_PATH)
DIRS_VALID = verify_dirs(PATHS) if PATHS else False

print(f"\nStorage: {'Google Drive' if DRIVE_MOUNTED else 'Local'}")
print("=" * 60)

# Export paths
INPUT_DIR = PATHS.get("input") if PATHS else None
PROCESSED_DIR = PATHS.get("processed") if PATHS else None
CHROMADB_DIR = PATHS.get("chromadb") if PATHS else None
RESULTS_DIR = PATHS.get("results") if PATHS else None
GRAPHS_DIR = PATHS.get("graphs") if PATHS else None
BASE_DIR = PATHS.get("base") if PATHS else None

GOOGLE DRIVE MOUNTING

Mounting Google Drive...
Mounted at /content/drive

[OK] Mounted!

------------------------------------------------------------
DIRECTORY STRUCTURE
------------------------------------------------------------

[OK] Base: /content/drive/MyDrive/ARS-VG-Analyzer
   [OK] input/
   [OK] processed/
   [OK] chromadb/
   [OK] results/
   [OK] graphs/

------------------------------------------------------------
VERIFICATION
------------------------------------------------------------
   [OK] base: /content/drive/MyDrive/ARS-VG-Analyzer
   [OK] input: /content/drive/MyDrive/ARS-VG-Analyzer/input
   [OK] processed: /content/drive/MyDrive/ARS-VG-Analyzer/processed
   [OK] chromadb: /content/drive/MyDrive/ARS-VG-Analyzer/chromadb
   [OK] results: /content/drive/MyDrive/ARS-VG-Analyzer/results
   [OK] graphs: /content/drive/MyDrive/ARS-VG-Analyzer/graphs

Storage: Google Drive


## Section 2: Data Structures and Schema

In [6]:
# Configuration Dataclasses
"""
Central configuration for the ARS-VG Analyzer using Python dataclasses.
Provides sensible defaults, environment detection, and centralized settings management.
"""

from dataclasses import dataclass, field, asdict
from typing import List, Optional, Dict, Any, Tuple, Literal
from pathlib import Path
import os
import json

def _is_colab() -> bool:
    """Check if running in Google Colab environment."""
    try:
        import google.colab
        return True
    except ImportError:
        return False

@dataclass
class LLMConfig:
    """Configuration for LLM (Ollama/DeepSeek) settings."""
    model_name: str = "deepseek-r1:32b"
    ollama_host: str = "127.0.0.1"
    ollama_port: int = 11434
    temperature: float = 0.1
    max_tokens: int = 4096
    timeout: int = 120
    num_ctx: int = 8192

    @property
    def ollama_url(self) -> str:
        """Get the full Ollama API URL."""
        return f"http://{self.ollama_host}:{self.ollama_port}"

@dataclass
class EmbeddingConfig:
    """Configuration for embedding model settings."""
    model_name: str = "all-MiniLM-L6-v2"
    dimension: int = 384
    batch_size: int = 32
    normalize: bool = True

@dataclass
class ChunkingConfig:
    """Configuration for document chunking."""
    chunk_size: int = 1000
    chunk_overlap: int = 200
    min_chunk_length: int = 100
    separator: str = "\n\n"

@dataclass
class GraphConfig:
    """Configuration for vulnerability graph construction."""
    max_nodes: int = 500
    edge_threshold: float = 0.7
    layout_algorithm: str = "force_directed"
    node_size_range: Tuple[int, int] = (10, 50)
    physics_enabled: bool = True
    hierarchical: bool = False

@dataclass
class AnalysisConfig:
    """Configuration for AEM/REM analysis thresholds."""
    aem_threshold: float = 0.65
    rem_threshold: float = 0.55
    confidence_minimum: float = 0.5
    substitution_detection_threshold: float = 0.6
    max_iterations: int = 100
    convergence_epsilon: float = 0.001

@dataclass
class PathConfig:
    """Configuration for file paths - initialized from global vars or defaults."""
    base_dir: str = ""
    input_dir: str = ""
    processed_dir: str = ""
    chromadb_dir: str = ""
    results_dir: str = ""
    graphs_dir: str = ""

    def __post_init__(self):
        """Initialize paths from global variables or compute defaults."""
        g = globals()
        if not self.base_dir:
            self.base_dir = g.get('BASE_DIR') or os.getcwd()
        if not self.input_dir:
            self.input_dir = g.get('INPUT_DIR') or str(Path(self.base_dir) / 'input')
        if not self.processed_dir:
            self.processed_dir = g.get('PROCESSED_DIR') or str(Path(self.base_dir) / 'processed')
        if not self.chromadb_dir:
            self.chromadb_dir = g.get('CHROMADB_DIR') or str(Path(self.base_dir) / 'chromadb')
        if not self.results_dir:
            self.results_dir = g.get('RESULTS_DIR') or str(Path(self.base_dir) / 'results')
        if not self.graphs_dir:
            self.graphs_dir = g.get('GRAPHS_DIR') or str(Path(self.base_dir) / 'graphs')

    def as_dict(self) -> Dict[str, str]:
        """Return all paths as a dictionary."""
        return {
            'base': self.base_dir, 'input': self.input_dir, 'processed': self.processed_dir,
            'chromadb': self.chromadb_dir, 'results': self.results_dir, 'graphs': self.graphs_dir
        }

@dataclass
class Config:
    """Main configuration class combining all settings."""
    llm: LLMConfig = field(default_factory=LLMConfig)
    embedding: EmbeddingConfig = field(default_factory=EmbeddingConfig)
    chunking: ChunkingConfig = field(default_factory=ChunkingConfig)
    graph: GraphConfig = field(default_factory=GraphConfig)
    analysis: AnalysisConfig = field(default_factory=AnalysisConfig)
    paths: PathConfig = field(default_factory=PathConfig)
    is_colab: bool = field(default_factory=_is_colab)
    gpu_available: bool = False
    debug: bool = False

    def __post_init__(self):
        """Initialize GPU availability from global variables."""
        self.gpu_available = globals().get('GPU_AVAILABLE', False)

    def display(self):
        """Display configuration summary."""
        print("=" * 60)
        print("CONFIGURATION SUMMARY")
        print("=" * 60)
        print(f"\nEnvironment:")
        print(f"   - Platform: {'Google Colab' if self.is_colab else 'Local'}")
        print(f"   - GPU Available: {self.gpu_available}")
        print(f"   - Debug Mode: {self.debug}")
        print(f"\nLLM Configuration:")
        print(f"   - Model: {self.llm.model_name}")
        print(f"   - Ollama URL: {self.llm.ollama_url}")
        print(f"   - Temperature: {self.llm.temperature}")
        print(f"   - Max Tokens: {self.llm.max_tokens}")
        print(f"\nEmbedding Configuration:")
        print(f"   - Model: {self.embedding.model_name}")
        print(f"   - Dimension: {self.embedding.dimension}")
        print(f"\nChunking Configuration:")
        print(f"   - Chunk Size: {self.chunking.chunk_size}")
        print(f"   - Overlap: {self.chunking.chunk_overlap}")
        print(f"\nAnalysis Thresholds:")
        print(f"   - AEM Threshold: {self.analysis.aem_threshold}")
        print(f"   - REM Threshold: {self.analysis.rem_threshold}")
        print(f"   - Substitution Detection: {self.analysis.substitution_detection_threshold}")
        print(f"\nPaths:")
        for name, path in self.paths.as_dict().items():
            print(f"   - {name}: {path}")
        print("\n" + "=" * 60)

# Create and display global configuration instance
CONFIG = Config()
CONFIG.display()

# Verify configuration is accessible
print(f"\n[OK] Config instantiated successfully")
print(f"[OK] LLM URL: {CONFIG.llm.ollama_url}")
print(f"[OK] Environment: {'Colab' if CONFIG.is_colab else 'Local'}")

CONFIGURATION SUMMARY

Environment:
   - Platform: Google Colab
   - GPU Available: True
   - Debug Mode: False

LLM Configuration:
   - Model: deepseek-r1:32b
   - Ollama URL: http://127.0.0.1:11434
   - Temperature: 0.1
   - Max Tokens: 4096

Embedding Configuration:
   - Model: all-MiniLM-L6-v2
   - Dimension: 384

Chunking Configuration:
   - Chunk Size: 1000
   - Overlap: 200

Analysis Thresholds:
   - AEM Threshold: 0.65
   - REM Threshold: 0.55
   - Substitution Detection: 0.6

Paths:
   - base: /content/drive/MyDrive/ARS-VG-Analyzer
   - input: /content/drive/MyDrive/ARS-VG-Analyzer/input
   - processed: /content/drive/MyDrive/ARS-VG-Analyzer/processed
   - chromadb: /content/drive/MyDrive/ARS-VG-Analyzer/chromadb
   - results: /content/drive/MyDrive/ARS-VG-Analyzer/results
   - graphs: /content/drive/MyDrive/ARS-VG-Analyzer/graphs


[OK] Config instantiated successfully
[OK] LLM URL: http://127.0.0.1:11434
[OK] Environment: Colab


In [7]:
# Financial Data Structures
"""
Core data structures for representing financial facts, claims, and governance data.
These dataclasses form the canonical schema for the ARS-VG analysis pipeline.
"""

from dataclasses import dataclass, field, asdict
from typing import List, Optional, Dict, Any, Literal
import json
from datetime import datetime

@dataclass
class QuantitativeFact:
    """
    Represents a quantitative financial fact extracted from financial statements.
    Used for storing numerical data like Revenue, COGS, Inventory values.
    """
    account_name: str  # e.g., "Revenue", "Inventory", "Accounts Receivable"
    value: float  # The numerical value
    period: str  # e.g., "FY2024", "Q3-2024"
    currency: str = "USD"  # Currency code
    source_table: str = ""  # Reference to source table in document
    footnote_refs: List[str] = field(default_factory=list)  # References like ["Note 4", "Note 7"]
    unit_scale: str = "units"  # "thousands", "millions", "billions", "units"
    confidence: float = 1.0  # Extraction confidence score

    def to_dict(self) -> Dict[str, Any]:
        """Convert to dictionary for JSON serialization."""
        return asdict(self)

    def to_json(self) -> str:
        """Convert to JSON string."""
        return json.dumps(self.to_dict(), indent=2)

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> 'QuantitativeFact':
        """Create instance from dictionary."""
        return cls(**data)

    def scaled_value(self) -> float:
        """Return value adjusted by unit scale."""
        scale_map = {"units": 1, "thousands": 1e3, "millions": 1e6, "billions": 1e9}
        return self.value * scale_map.get(self.unit_scale, 1)

@dataclass
class QualitativeClaim:
    """
    Represents a qualitative claim from MD&A or notes sections.
    Used for storing textual claims that need LLM evaluation.
    """
    section: str  # e.g., "MD&A", "Note 4", "Risk Factors"
    text: str  # The actual claim text
    embedded_numbers: List[str] = field(default_factory=list)  # Numbers mentioned in text
    sentiment_indicators: Dict[str, float] = field(default_factory=dict)  # e.g., {"positive": 0.7}
    page_number: Optional[int] = None
    paragraph_index: int = 0
    related_accounts: List[str] = field(default_factory=list)  # Account names referenced

    def to_dict(self) -> Dict[str, Any]:
        """Convert to dictionary for JSON serialization."""
        return asdict(self)

    def to_json(self) -> str:
        """Convert to JSON string."""
        return json.dumps(self.to_dict(), indent=2)

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> 'QualitativeClaim':
        """Create instance from dictionary."""
        return cls(**data)

@dataclass
class GovernanceVector:
    """
    Represents governance and audit-related metadata.
    Used for computing AEM/REM constraint scores.
    """
    auditor_type: Literal["Big4", "Non-Big4"] = "Non-Big4"
    auditor_tenure: int = 0  # Years with current auditor
    sox_compliant: bool = True  # SOX 404 compliance
    institutional_ownership: float = 0.0  # Percentage (0-100)
    analyst_coverage: int = 0  # Number of analysts
    insider_ownership: float = 0.0  # Percentage (0-100)
    board_independence: float = 0.0  # Percentage of independent directors
    audit_committee_expertise: bool = False  # Financial expert on audit committee

    def to_dict(self) -> Dict[str, Any]:
        """Convert to dictionary for JSON serialization."""
        return asdict(self)

    def to_json(self) -> str:
        """Convert to JSON string."""
        return json.dumps(self.to_dict(), indent=2)

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> 'GovernanceVector':
        """Create instance from dictionary."""
        return cls(**data)

# Test the data structures
print("=" * 60)
print("FINANCIAL DATA STRUCTURES TEST")
print("=" * 60)

# Test QuantitativeFact
fact = QuantitativeFact(
    account_name="Revenue",
    value=1500.5,
    period="FY2024",
    currency="USD",
    source_table="Income Statement",
    footnote_refs=["Note 2", "Note 4"],
    unit_scale="millions"
)
print(f"\n[OK] QuantitativeFact created:")
print(f"   - Account: {fact.account_name}")
print(f"   - Value: {fact.value} {fact.unit_scale}")
print(f"   - Scaled Value: ${fact.scaled_value():,.0f}")
print(f"   - Period: {fact.period}")
print(f"   - Footnotes: {fact.footnote_refs}")

# Test QualitativeClaim
claim = QualitativeClaim(
    section="MD&A",
    text="Revenue growth of 15% was driven by strong performance in our cloud services segment.",
    embedded_numbers=["15%"],
    sentiment_indicators={"positive": 0.85, "neutral": 0.15},
    related_accounts=["Revenue", "Cloud Services Revenue"]
)
print(f"\n[OK] QualitativeClaim created:")
print(f"   - Section: {claim.section}")
print(f"   - Text length: {len(claim.text)} chars")
print(f"   - Embedded numbers: {claim.embedded_numbers}")

# Test GovernanceVector
governance = GovernanceVector(
    auditor_type="Big4",
    auditor_tenure=5,
    sox_compliant=True,
    institutional_ownership=65.5,
    analyst_coverage=12
)
print(f"\n[OK] GovernanceVector created:")
print(f"   - Auditor: {governance.auditor_type}")
print(f"   - SOX Compliant: {governance.sox_compliant}")
print(f"   - Institutional Ownership: {governance.institutional_ownership}%")

# Test JSON serialization
print(f"\n[OK] JSON serialization works:")
fact_json = fact.to_json()
print(f"   - QuantitativeFact JSON length: {len(fact_json)} chars")

# Test round-trip
fact_restored = QuantitativeFact.from_dict(json.loads(fact_json))
print(f"   - Round-trip verified: {fact_restored.account_name == fact.account_name}")

print("\n" + "=" * 60)

FINANCIAL DATA STRUCTURES TEST

[OK] QuantitativeFact created:
   - Account: Revenue
   - Value: 1500.5 millions
   - Scaled Value: $1,500,500,000
   - Period: FY2024
   - Footnotes: ['Note 2', 'Note 4']

[OK] QualitativeClaim created:
   - Section: MD&A
   - Text length: 85 chars
   - Embedded numbers: ['15%']

[OK] GovernanceVector created:
   - Auditor: Big4
   - SOX Compliant: True
   - Institutional Ownership: 65.5%

[OK] JSON serialization works:
   - QuantitativeFact JSON length: 233 chars
   - Round-trip verified: True



In [8]:
# Graph Data Structures - FinancialNode and FinancialEdge
"""
Data structures for the vulnerability graph representing financial statement relationships.
Used by NetworkX and PyVis for graph analysis and visualization.
"""

from dataclasses import dataclass, field, asdict
from typing import List, Optional, Dict, Any, Literal
import json

@dataclass
class FinancialNode:
    """
    Represents a node in the financial vulnerability graph.
    Can represent an account (Revenue, COGS), a ratio (DSO, DIO), or governance metric.
    """
    node_id: str  # Unique identifier e.g., "revenue_fy2024", "dso_fy2024"
    node_type: Literal["ACCOUNT", "RATIO", "GOVERNANCE"] = "ACCOUNT"
    value: float = 0.0  # Current value
    period: str = ""  # Time period e.g., "FY2024"
    label: str = ""  # Display label
    metadata: Dict[str, Any] = field(default_factory=dict)  # Additional attributes
    risk_score: float = 0.0  # Calculated risk level (0-1)
    category: str = ""  # Category e.g., "Income Statement", "Balance Sheet"

    def __post_init__(self):
        """Set default label if not provided."""
        if not self.label:
            self.label = self.node_id.replace("_", " ").title()

    def to_dict(self) -> Dict[str, Any]:
        """Convert to dictionary for JSON serialization."""
        return asdict(self)

    def to_json(self) -> str:
        """Convert to JSON string."""
        return json.dumps(self.to_dict(), indent=2)

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> 'FinancialNode':
        """Create instance from dictionary."""
        return cls(**data)

    def get_color(self) -> str:
        """Get node color based on risk score."""
        if self.risk_score >= 0.7: return "#ff4444"  # Red - High risk
        elif self.risk_score >= 0.4: return "#ffaa00"  # Orange - Medium risk
        else: return "#44aa44"  # Green - Low risk

    def get_size(self, min_size: int = 10, max_size: int = 50) -> int:
        """Get node size based on value magnitude."""
        if self.value == 0: return min_size
        import math
        log_val = math.log10(abs(self.value) + 1)
        size = min_size + (max_size - min_size) * min(log_val / 10, 1)
        return int(size)

@dataclass
class FinancialEdge:
    """
    Represents an edge (relationship) in the financial vulnerability graph.
    Types: IDENTITY (accounting equations), CORRELATION (statistical), REGULATORY (compliance).
    """
    source: str  # Source node_id
    target: str  # Target node_id
    edge_type: Literal["IDENTITY", "CORRELATION", "REGULATORY"] = "CORRELATION"
    weight: float = 1.0  # Edge weight/strength
    strain: Optional[float] = None  # Deviation from expected (None if not calculated)
    expected_ratio: Optional[float] = None  # Expected relationship ratio
    actual_ratio: Optional[float] = None  # Actual observed ratio
    std_dev: Optional[float] = None  # Historical standard deviation
    metadata: Dict[str, Any] = field(default_factory=dict)

    def to_dict(self) -> Dict[str, Any]:
        """Convert to dictionary for JSON serialization."""
        return asdict(self)

    def to_json(self) -> str:
        """Convert to JSON string."""
        return json.dumps(self.to_dict(), indent=2)

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> 'FinancialEdge':
        """Create instance from dictionary."""
        return cls(**data)

    def calculate_strain(self) -> float:
        """Calculate strain if expected and actual ratios are available."""
        if self.expected_ratio is None or self.actual_ratio is None:
            return 0.0
        if self.std_dev and self.std_dev > 0:
            self.strain = abs(self.actual_ratio - self.expected_ratio) / self.std_dev
        else:
            self.strain = abs(self.actual_ratio - self.expected_ratio)
        return self.strain

    def get_color(self) -> str:
        """Get edge color based on strain."""
        if self.strain is None: return "#888888"  # Gray - No strain calculated
        if self.strain >= 2.0: return "#ff0000"  # Red - High strain
        elif self.strain >= 1.0: return "#ff8800"  # Orange - Medium strain
        else: return "#00aa00"  # Green - Low strain

    def get_width(self, min_width: float = 1.0, max_width: float = 5.0) -> float:
        """Get edge width based on weight."""
        return min_width + (max_width - min_width) * min(self.weight, 1.0)

# Test the graph data structures
print("=" * 60)
print("GRAPH DATA STRUCTURES TEST")
print("=" * 60)

# Test FinancialNode with ACCOUNT type
node_revenue = FinancialNode(
    node_id="revenue_fy2024",
    node_type="ACCOUNT",
    value=1500000000,
    period="FY2024",
    category="Income Statement",
    metadata={"source": "10-K", "audited": True}
)
print(f"\n[OK] FinancialNode (ACCOUNT) created:")
print(f"   - ID: {node_revenue.node_id}")
print(f"   - Type: {node_revenue.node_type}")
print(f"   - Label: {node_revenue.label}")
print(f"   - Value: ${node_revenue.value:,.0f}")
print(f"   - Size: {node_revenue.get_size()}")

# Test FinancialNode with RATIO type
node_dso = FinancialNode(
    node_id="dso_fy2024",
    node_type="RATIO",
    value=45.5,
    period="FY2024",
    label="Days Sales Outstanding",
    risk_score=0.75,
    category="Efficiency Ratio"
)
print(f"\n[OK] FinancialNode (RATIO) created:")
print(f"   - ID: {node_dso.node_id}")
print(f"   - Type: {node_dso.node_type}")
print(f"   - Value: {node_dso.value} days")
print(f"   - Risk Score: {node_dso.risk_score}")
print(f"   - Color: {node_dso.get_color()}")

# Test FinancialEdge with IDENTITY type
edge_identity = FinancialEdge(
    source="revenue_fy2024",
    target="ar_fy2024",
    edge_type="IDENTITY",
    weight=1.0,
    expected_ratio=1.0,
    actual_ratio=0.08,
    metadata={"equation": "AR = Revenue * DSO/365"}
)
print(f"\n[OK] FinancialEdge (IDENTITY) created:")
print(f"   - Source: {edge_identity.source}")
print(f"   - Target: {edge_identity.target}")
print(f"   - Type: {edge_identity.edge_type}")

# Test FinancialEdge with CORRELATION type and strain
edge_corr = FinancialEdge(
    source="revenue_fy2024",
    target="cogs_fy2024",
    edge_type="CORRELATION",
    weight=0.85,
    expected_ratio=0.65,
    actual_ratio=0.72,
    std_dev=0.03
)
strain = edge_corr.calculate_strain()
print(f"\n[OK] FinancialEdge (CORRELATION) with strain:")
print(f"   - Expected Ratio: {edge_corr.expected_ratio}")
print(f"   - Actual Ratio: {edge_corr.actual_ratio}")
print(f"   - Strain: {strain:.2f} std devs")
print(f"   - Color: {edge_corr.get_color()}")

# Test metadata dict handling
print(f"\n[OK] Metadata dict handling:")
print(f"   - Node metadata: {node_revenue.metadata}")
print(f"   - Edge metadata: {edge_identity.metadata}")

print("\n" + "=" * 60)

GRAPH DATA STRUCTURES TEST

[OK] FinancialNode (ACCOUNT) created:
   - ID: revenue_fy2024
   - Type: ACCOUNT
   - Label: Revenue Fy2024
   - Value: $1,500,000,000
   - Size: 46

[OK] FinancialNode (RATIO) created:
   - ID: dso_fy2024
   - Type: RATIO
   - Value: 45.5 days
   - Risk Score: 0.75
   - Color: #ff4444

[OK] FinancialEdge (IDENTITY) created:
   - Source: revenue_fy2024
   - Target: ar_fy2024
   - Type: IDENTITY

[OK] FinancialEdge (CORRELATION) with strain:
   - Expected Ratio: 0.65
   - Actual Ratio: 0.72
   - Strain: 2.33 std devs
   - Color: #ff0000

[OK] Metadata dict handling:
   - Node metadata: {'source': '10-K', 'audited': True}
   - Edge metadata: {'equation': 'AR = Revenue * DSO/365'}



## Section 3: Module 1 - Ingestion Service

In [9]:
# Ingestion Service
"""
Document ingestion pipeline for ARS-VG Analyzer.
Handles PDF parsing, text extraction, chunking, and format detection.
"""

import os
from pathlib import Path
from typing import List, Dict, Any, Optional, Tuple, Literal
from dataclasses import dataclass, field
import re

# Format detection
SUPPORTED_FORMATS = ["pdf", "txt", "csv", "xlsx", "html"]

def detect_format(file_path: str) -> Optional[str]:
    """
    Detect document format from file extension and magic bytes.
    Returns format string or None if unsupported.
    """
    path = Path(file_path)
    if not path.exists():
        return None

    # Check extension first
    ext = path.suffix.lower().strip('.')
    if ext in SUPPORTED_FORMATS:
        return ext

    # Check magic bytes for PDF
    try:
        with open(file_path, 'rb') as f:
            header = f.read(8)
            if header.startswith(b'%PDF'):
                return 'pdf'
    except:
        pass

    return None

def validate_file(file_path: str) -> Tuple[bool, str]:
    """
    Validate a file for processing.
    Returns (is_valid, message).
    """
    path = Path(file_path)

    if not path.exists():
        return False, f"File not found: {file_path}"

    if not path.is_file():
        return False, f"Not a file: {file_path}"

    # Check file size (max 100MB)
    size_mb = path.stat().st_size / (1024 * 1024)
    if size_mb > 100:
        return False, f"File too large: {size_mb:.1f}MB (max 100MB)"

    fmt = detect_format(file_path)
    if fmt is None:
        return False, f"Unsupported format: {path.suffix}"

    return True, f"Valid {fmt.upper()} file ({size_mb:.2f}MB)"

@dataclass
class TextChunk:
    """Represents a chunk of extracted text."""
    content: str
    chunk_id: int
    source_file: str
    page_number: Optional[int] = None
    section: str = ""
    start_char: int = 0
    end_char: int = 0
    metadata: Dict[str, Any] = field(default_factory=dict)

    def __len__(self) -> int:
        return len(self.content)

def extract_text_from_pdf(file_path: str) -> Tuple[str, List[Dict]]:
    """
    Extract text from PDF using unstructured library.
    Returns (full_text, page_info_list).
    """
    pages_info = []

    try:
        from unstructured.partition.pdf import partition_pdf
        elements = partition_pdf(file_path)

        full_text = ""
        current_page = 1
        page_text = ""

        for elem in elements:
            text = str(elem)
            elem_page = getattr(elem.metadata, 'page_number', current_page) if hasattr(elem, 'metadata') else current_page

            if elem_page != current_page:
                if page_text.strip():
                    pages_info.append({"page": current_page, "text": page_text.strip()})
                page_text = ""
                current_page = elem_page

            page_text += text + "\n"
            full_text += text + "\n"

        # Add last page
        if page_text.strip():
            pages_info.append({"page": current_page, "text": page_text.strip()})

        return full_text.strip(), pages_info

    except ImportError:
        # Fallback: try PyPDF2
        try:
            import PyPDF2
            with open(file_path, 'rb') as f:
                reader = PyPDF2.PdfReader(f)
                full_text = ""
                for i, page in enumerate(reader.pages):
                    text = page.extract_text() or ""
                    pages_info.append({"page": i+1, "text": text.strip()})
                    full_text += text + "\n"
            return full_text.strip(), pages_info
        except:
            pass
    except Exception as e:
        print(f"PDF extraction error: {e}")

    return "", []

def extract_text_from_txt(file_path: str) -> str:
    """Extract text from plain text file."""
    try:
        with open(file_path, 'r', encoding='utf-8') as f:
            return f.read()
    except UnicodeDecodeError:
        with open(file_path, 'r', encoding='latin-1') as f:
            return f.read()

def chunk_text(
    text: str,
    chunk_size: int = 1000,
    chunk_overlap: int = 200,
    source_file: str = "",
    min_chunk_length: int = 100
) -> List[TextChunk]:
    """
    Split text into overlapping chunks.
    """
    if not text or len(text) < min_chunk_length:
        if text:
            return [TextChunk(content=text, chunk_id=0, source_file=source_file, start_char=0, end_char=len(text))]
        return []

    chunks = []
    start = 0
    chunk_id = 0

    while start < len(text):
        end = start + chunk_size

        # Try to break at sentence boundary
        if end < len(text):
            # Look for sentence endings
            search_start = max(start + chunk_size - 100, start)
            search_end = min(start + chunk_size + 100, len(text))
            search_text = text[search_start:search_end]

            # Find best break point
            for pattern in ['. ', '.\n', '! ', '? ', '\n\n']:
                idx = search_text.rfind(pattern)
                if idx > 0:
                    end = search_start + idx + len(pattern)
                    break
        else:
            end = len(text)

        chunk_content = text[start:end].strip()

        if len(chunk_content) >= min_chunk_length:
            chunks.append(TextChunk(
                content=chunk_content,
                chunk_id=chunk_id,
                source_file=source_file,
                start_char=start,
                end_char=end
            ))
            chunk_id += 1

        # Move start position with overlap
        start = end - chunk_overlap
        if start >= len(text) - min_chunk_length:
            break

    return chunks

@dataclass
class ProcessedDocument:
    """Result of document processing."""
    file_path: str
    format: str
    full_text: str
    chunks: List[TextChunk]
    pages: List[Dict]
    metadata: Dict[str, Any] = field(default_factory=dict)
    success: bool = True
    error: Optional[str] = None

def process_document(
    file_path: str,
    chunk_size: int = 1000,
    chunk_overlap: int = 200
) -> ProcessedDocument:
    """
    Main document processing function.
    Detects format, extracts text, and creates chunks.
    """
    # Validate file
    is_valid, message = validate_file(file_path)
    if not is_valid:
        return ProcessedDocument(
            file_path=file_path, format="unknown", full_text="",
            chunks=[], pages=[], success=False, error=message
        )

    fmt = detect_format(file_path)
    full_text = ""
    pages = []

    # Extract text based on format
    if fmt == "pdf":
        full_text, pages = extract_text_from_pdf(file_path)
    elif fmt == "txt":
        full_text = extract_text_from_txt(file_path)
        pages = [{"page": 1, "text": full_text}]
    else:
        return ProcessedDocument(
            file_path=file_path, format=fmt, full_text="",
            chunks=[], pages=[], success=False, error=f"Format not yet supported: {fmt}"
        )

    if not full_text:
        return ProcessedDocument(
            file_path=file_path, format=fmt, full_text="",
            chunks=[], pages=[], success=False, error="No text extracted"
        )

    # Create chunks
    chunks = chunk_text(full_text, chunk_size, chunk_overlap, file_path)

    return ProcessedDocument(
        file_path=file_path,
        format=fmt,
        full_text=full_text,
        chunks=chunks,
        pages=pages,
        metadata={
            "char_count": len(full_text),
            "chunk_count": len(chunks),
            "page_count": len(pages)
        }
    )

# Test the ingestion service
print("=" * 60)
print("INGESTION SERVICE TEST")
print("=" * 60)

# Test format detection
print("\n[OK] Format Detection:")
print(f"   - PDF detection: {detect_format('test.pdf') if detect_format('test.pdf') else 'N/A (no file)'}")
print(f"   - TXT detection: {'txt' == detect_format.__code__.co_consts[0] if hasattr(detect_format, '__code__') else 'function works'}")

# Test file validation
test_path = "/nonexistent/file.pdf"
is_valid, msg = validate_file(test_path)
print(f"\n[OK] File Validation:")
print(f"   - Invalid file handled: {not is_valid}")
print(f"   - Message: {msg}")

# Test chunking
sample_text = "This is sentence one. This is sentence two. " * 50
chunks = chunk_text(sample_text, chunk_size=200, chunk_overlap=50, source_file="test.txt")
print(f"\n[OK] Text Chunking:")
print(f"   - Input length: {len(sample_text)} chars")
print(f"   - Chunks created: {len(chunks)}")
if chunks:
    print(f"   - First chunk size: {len(chunks[0])} chars")
    print(f"   - Chunk overlap working: {chunks[0].end_char > chunks[1].start_char if len(chunks) > 1 else 'N/A'}")

# Test TextChunk dataclass
chunk = TextChunk(content="Test content", chunk_id=0, source_file="test.pdf", page_number=1)
print(f"\n[OK] TextChunk dataclass:")
print(f"   - Content accessible: {bool(chunk.content)}")
print(f"   - Metadata dict: {chunk.metadata}")

print("\n" + "=" * 60)

INGESTION SERVICE TEST

[OK] Format Detection:
   - PDF detection: N/A (no file)
   - TXT detection: False

[OK] File Validation:
   - Invalid file handled: True
   - Message: File not found: /nonexistent/file.pdf

[OK] Text Chunking:
   - Input length: 2200 chars
   - Chunks created: 9
   - First chunk size: 285 chars
   - Chunk overlap working: True

[OK] TextChunk dataclass:
   - Content accessible: True
   - Metadata dict: {}



## Section 4: Module 2 - Reasoning Service

In [10]:
# Reasoning Service
"""
LLM-based reasoning service for ARS-VG Analyzer.
Handles Ollama client, prompt generation, and response parsing.
"""

import requests
import json
import time
from typing import List, Dict, Any, Optional, Tuple
from dataclasses import dataclass, field

@dataclass
class OllamaClient:
    """Client for interacting with Ollama API."""
    host: str = "127.0.0.1"
    port: int = 11434
    model: str = "deepseek-r1:32b"
    timeout: int = 120
    max_retries: int = 3
    retry_delay: float = 2.0

    @property
    def base_url(self) -> str:
        return f"http://{self.host}:{self.port}"

    def is_connected(self) -> bool:
        """Check if Ollama server is available."""
        try:
            r = requests.get(f"{self.base_url}/api/tags", timeout=5)
            return r.status_code == 200
        except:
            return False

    def connect_with_retry(self) -> bool:
        """Connect to Ollama with retry logic."""
        for attempt in range(self.max_retries):
            if self.is_connected():
                return True
            if attempt < self.max_retries - 1:
                time.sleep(self.retry_delay)
        return False

    def generate(self, prompt: str, temperature: float = 0.1, max_tokens: int = 4096) -> Tuple[str, bool]:
        """Generate response from LLM. Returns (response_text, success)."""
        if not self.connect_with_retry():
            return "Error: Cannot connect to Ollama server", False

        try:
            payload = {
                "model": self.model,
                "prompt": prompt,
                "stream": False,
                "options": {
                    "temperature": temperature,
                    "num_predict": max_tokens
                }
            }
            r = requests.post(f"{self.base_url}/api/generate", json=payload, timeout=self.timeout)
            if r.status_code == 200:
                return r.json().get("response", ""), True
            return f"Error: HTTP {r.status_code}", False
        except requests.Timeout:
            return "Error: Request timeout", False
        except Exception as e:
            return f"Error: {str(e)}", False

    def get_model_info(self) -> Optional[Dict]:
        """Get information about the current model."""
        try:
            r = requests.get(f"{self.base_url}/api/tags", timeout=10)
            if r.status_code == 200:
                for model in r.json().get("models", []):
                    if model.get("name", "").startswith(self.model.split(":")[0]):
                        return model
        except:
            pass
        return None

@dataclass
class ReasoningPrompt:
    """Template for reasoning prompts."""
    system_context: str = ""
    task: str = ""
    data: str = ""
    output_format: str = "JSON"

    def build(self) -> str:
        """Build the full prompt string."""
        parts = []
        if self.system_context:
            parts.append(f"Context: {self.system_context}")
        parts.append(f"Task: {self.task}")
        if self.data:
            parts.append(f"Data:\n{self.data}")
        parts.append(f"Provide your response in {self.output_format} format.")
        return "\n\n".join(parts)

class ReasoningService:
    """Service for LLM-based reasoning on financial data."""

    def __init__(self, client: Optional[OllamaClient] = None):
        self.client = client or OllamaClient()
        self._cache: Dict[str, str] = {}

    def analyze_claim(self, claim_text: str, context: str = "") -> Dict[str, Any]:
        """Analyze a qualitative claim for manipulation indicators."""
        prompt = ReasoningPrompt(
            system_context="You are a forensic accounting expert analyzing financial statements for earnings manipulation.",
            task=f"Analyze this claim for potential manipulation indicators:\n\"{claim_text}\"",
            data=context,
            output_format="JSON with keys: credibility_score (0-1), red_flags (list), reasoning (string)"
        )

        response, success = self.client.generate(prompt.build())
        if not success:
            return {"error": response, "success": False}

        try:
            # Try to parse JSON from response
            json_start = response.find("{")
            json_end = response.rfind("}") + 1
            if json_start >= 0 and json_end > json_start:
                return json.loads(response[json_start:json_end])
        except:
            pass

        return {"raw_response": response, "success": True}

    def evaluate_ratio_deviation(self, ratio_name: str, expected: float, actual: float, std_dev: float) -> Dict[str, Any]:
        """Evaluate whether a ratio deviation is suspicious."""
        deviation = abs(actual - expected) / std_dev if std_dev > 0 else abs(actual - expected)

        prompt = ReasoningPrompt(
            system_context="You are analyzing financial ratios for anomalies.",
            task=f"Evaluate this ratio deviation: {ratio_name}",
            data=f"Expected: {expected:.4f}, Actual: {actual:.4f}, Deviation: {deviation:.2f} std devs",
            output_format="JSON with keys: suspicious (bool), explanation (string), severity (low/medium/high)"
        )

        response, success = self.client.generate(prompt.build(), temperature=0.1)
        if not success:
            return {"error": response, "success": False}

        try:
            json_start = response.find("{")
            json_end = response.rfind("}") + 1
            if json_start >= 0 and json_end > json_start:
                return json.loads(response[json_start:json_end])
        except:
            pass

        return {"raw_response": response, "success": True}

    def generate_substitution_hypothesis(self, aem_indicators: List[str], rem_indicators: List[str]) -> Dict[str, Any]:
        """Generate hypothesis about AEM/REM substitution patterns."""
        prompt = ReasoningPrompt(
            system_context="You are analyzing patterns of earnings manipulation.",
            task="Analyze the relationship between AEM and REM indicators to identify substitution patterns.",
            data=f"AEM Indicators: {aem_indicators}\nREM Indicators: {rem_indicators}",
            output_format="JSON with keys: substitution_detected (bool), pattern_type (string), confidence (0-1), explanation (string)"
        )

        response, success = self.client.generate(prompt.build())
        if not success:
            return {"error": response, "success": False}

        try:
            json_start = response.find("{")
            json_end = response.rfind("}") + 1
            if json_start >= 0 and json_end > json_start:
                return json.loads(response[json_start:json_end])
        except:
            pass

        return {"raw_response": response, "success": True}

# Test the reasoning service
print("=" * 60)
print("REASONING SERVICE TEST")
print("=" * 60)

# Test OllamaClient
client = OllamaClient()
print(f"\n[OK] OllamaClient created:")
print(f"   - Base URL: {client.base_url}")
print(f"   - Model: {client.model}")
print(f"   - Max retries: {client.max_retries}")

# Test connection
is_connected = client.is_connected()
print(f"\n[OK] Connection Test:")
print(f"   - Server available: {is_connected}")

# Test retry logic
print(f"\n[OK] Retry Logic:")
print(f"   - connect_with_retry method: {callable(client.connect_with_retry)}")
print(f"   - Retry delay: {client.retry_delay}s")

# Test ReasoningPrompt
prompt = ReasoningPrompt(
    system_context="Test context",
    task="Test task",
    data="Test data",
    output_format="JSON"
)
built_prompt = prompt.build()
print(f"\n[OK] ReasoningPrompt:")
print(f"   - Prompt length: {len(built_prompt)} chars")
print(f"   - Contains task: {'Test task' in built_prompt}")

# Test ReasoningService
service = ReasoningService(client)
print(f"\n[OK] ReasoningService created:")
print(f"   - analyze_claim method: {callable(service.analyze_claim)}")
print(f"   - evaluate_ratio_deviation method: {callable(service.evaluate_ratio_deviation)}")
print(f"   - generate_substitution_hypothesis method: {callable(service.generate_substitution_hypothesis)}")

# Test model info (only if connected)
if is_connected:
    model_info = client.get_model_info()
    if model_info:
        print(f"\n[OK] Model Info:")
        print(f"   - Name: {model_info.get('name', 'N/A')}")

print("\n" + "=" * 60)

REASONING SERVICE TEST

[OK] OllamaClient created:
   - Base URL: http://127.0.0.1:11434
   - Model: deepseek-r1:32b
   - Max retries: 3

[OK] Connection Test:
   - Server available: True

[OK] Retry Logic:
   - connect_with_retry method: True
   - Retry delay: 2.0s

[OK] ReasoningPrompt:
   - Prompt length: 94 chars
   - Contains task: True

[OK] ReasoningService created:
   - analyze_claim method: True
   - evaluate_ratio_deviation method: True
   - generate_substitution_hypothesis method: True

[OK] Model Info:
   - Name: deepseek-r1:32b



In [11]:
# Vector Store Service (ChromaDB)
"""
ChromaDB-based vector store for semantic search and retrieval.
Handles document embeddings, storage, and similarity queries.
"""

import os
from pathlib import Path
from typing import List, Dict, Any, Optional
from dataclasses import dataclass, field

@dataclass
class VectorStoreConfig:
    """Configuration for vector store."""
    collection_name: str = "ars_vg_documents"
    embedding_model: str = "all-MiniLM-L6-v2"
    persist_directory: str = ""
    distance_metric: str = "cosine"

    def __post_init__(self):
        if not self.persist_directory:
            self.persist_directory = globals().get('CHROMADB_DIR') or './chromadb'

class VectorStore:
    """ChromaDB-based vector store for document embeddings."""

    def __init__(self, config: Optional[VectorStoreConfig] = None):
        self.config = config or VectorStoreConfig()
        self._client = None
        self._collection = None
        self._embedding_fn = None
        self._initialized = False

    def initialize(self) -> bool:
        """Initialize ChromaDB client and collection."""
        try:
            import chromadb
            from chromadb.config import Settings

            # Create persist directory if needed
            persist_path = Path(self.config.persist_directory)
            persist_path.mkdir(parents=True, exist_ok=True)

            # Initialize client with persistence
            self._client = chromadb.Client(Settings(
                chroma_db_impl="duckdb+parquet",
                persist_directory=str(persist_path),
                anonymized_telemetry=False
            ))

            # Try to load embedding function
            try:
                from chromadb.utils import embedding_functions
                self._embedding_fn = embedding_functions.SentenceTransformerEmbeddingFunction(
                    model_name=self.config.embedding_model
                )
            except:
                self._embedding_fn = None

            # Get or create collection
            self._collection = self._client.get_or_create_collection(
                name=self.config.collection_name,
                embedding_function=self._embedding_fn,
                metadata={"hnsw:space": self.config.distance_metric}
            )

            self._initialized = True
            return True

        except ImportError:
            print("ChromaDB not installed. Run: pip install chromadb")
            return False
        except Exception as e:
            print(f"VectorStore initialization error: {e}")
            return False

    @property
    def is_initialized(self) -> bool:
        return self._initialized and self._collection is not None

    def add_documents(self, documents: List[str], metadatas: Optional[List[Dict]] = None, ids: Optional[List[str]] = None) -> bool:
        """Add documents to the collection."""
        if not self.is_initialized:
            if not self.initialize():
                return False

        try:
            # Generate IDs if not provided
            if ids is None:
                existing_count = self._collection.count()
                ids = [f"doc_{existing_count + i}" for i in range(len(documents))]

            # Add documents
            self._collection.add(
                documents=documents,
                metadatas=metadatas or [{}] * len(documents),
                ids=ids
            )
            return True
        except Exception as e:
            print(f"Error adding documents: {e}")
            return False

    def query(self, query_text: str, n_results: int = 5) -> Dict[str, Any]:
        """Query the collection for similar documents."""
        if not self.is_initialized:
            if not self.initialize():
                return {"error": "Store not initialized", "documents": [], "distances": []}

        try:
            results = self._collection.query(
                query_texts=[query_text],
                n_results=n_results
            )
            return {
                "documents": results.get("documents", [[]])[0],
                "metadatas": results.get("metadatas", [[]])[0],
                "distances": results.get("distances", [[]])[0],
                "ids": results.get("ids", [[]])[0]
            }
        except Exception as e:
            return {"error": str(e), "documents": [], "distances": []}

    def count(self) -> int:
        """Get the number of documents in the collection."""
        if not self.is_initialized:
            return 0
        try:
            return self._collection.count()
        except:
            return 0

    def persist(self) -> bool:
        """Persist the collection to disk."""
        if not self.is_initialized:
            return False
        try:
            self._client.persist()
            return True
        except:
            return False

    def delete_collection(self) -> bool:
        """Delete the entire collection."""
        if not self.is_initialized:
            return False
        try:
            self._client.delete_collection(self.config.collection_name)
            self._collection = None
            self._initialized = False
            return True
        except:
            return False

# Test the vector store
print("=" * 60)
print("VECTOR STORE TEST")
print("=" * 60)

# Test VectorStoreConfig
config = VectorStoreConfig()
print(f"\n[OK] VectorStoreConfig created:")
print(f"   - Collection: {config.collection_name}")
print(f"   - Embedding model: {config.embedding_model}")
print(f"   - Persist directory: {config.persist_directory}")

# Test VectorStore initialization
store = VectorStore(config)
print(f"\n[OK] VectorStore created:")
print(f"   - Initialized: {store.is_initialized}")

# Try to initialize
init_success = store.initialize()
print(f"\n[OK] VectorStore initialization:")
print(f"   - Success: {init_success}")
print(f"   - Is initialized: {store.is_initialized}")

if store.is_initialized:
    # Test adding documents
    test_docs = [
        "Revenue increased by 15% year over year.",
        "Cost of goods sold remained stable.",
        "Inventory turnover improved significantly."
    ]
    add_success = store.add_documents(
        documents=test_docs,
        metadatas=[{"source": "test", "idx": i} for i in range(len(test_docs))]
    )
    print(f"\n[OK] Document addition:")
    print(f"   - Added: {add_success}")
    print(f"   - Document count: {store.count()}")

    # Test query
    results = store.query("revenue growth", n_results=2)
    print(f"\n[OK] Query test:")
    print(f"   - Results returned: {len(results.get('documents', []))}")
    if results.get('documents'):
        print(f"   - Top result: {results['documents'][0][:50]}...")

    # Test persistence
    persist_success = store.persist()
    print(f"\n[OK] Persistence:")
    print(f"   - Persisted: {persist_success}")
else:
    print("\n[SKIP] Document operations (ChromaDB not available)")

print("\n" + "=" * 60)

VECTOR STORE TEST

[OK] VectorStoreConfig created:
   - Collection: ars_vg_documents
   - Embedding model: all-MiniLM-L6-v2
   - Persist directory: /content/drive/MyDrive/ARS-VG-Analyzer/chromadb

[OK] VectorStore created:
   - Initialized: False
VectorStore initialization error: `np.float_` was removed in the NumPy 2.0 release. Use `np.float64` instead.

[OK] VectorStore initialization:
   - Success: False
   - Is initialized: False

[SKIP] Document operations (ChromaDB not available)



## Section 5: Module 3 - Graph Service

In [12]:
# Graph Service
# TODO: Implement graph analysis

## Section 6: Module 4 - Substitution Algorithm

In [13]:
# Substitution Algorithm
# TODO: Implement AEM/REM detection

## Section 7: Module 5 - Output Generation

In [14]:
# Output Generation
# TODO: Implement report generation

## Section 8: Main Analyzer Pipeline

In [15]:
# Main Pipeline
# TODO: Implement ARSVGAnalyzer

## Section 9: Gradio UI

In [16]:
# Gradio UI
# TODO: Implement UI

## Section 10: Demo and Testing

In [17]:
# Demo and Testing
# TODO: Implement demo functions