## Automated Settlement Valuation



Can we build an LLM-powered Settlement Value Calculator for contested insurance claims? The goal is to produce defensible, probability-weighted value ranges across litigation timelines. The tool would ingest policy terms, jurisdictional data, judge behavior, counsel behavior, motion practice, discovery, and insurer-specific tactics. It must update dynamically, reflect procedural inflection points and litigation spend, and support real-world capital decisions. These include choices around litigation spend, monetization, and settlement terms.

In [23]:
pip install faiss-cpu



In [24]:
# pip install faiss-gpu

In [25]:
!pip install faker



In [26]:
import os
import re
import json
import csv
import random
import datetime
from typing import Dict, Any, List, Tuple

import faiss
import numpy as np
import torch
import pandas as pd
import requests
from bs4 import BeautifulSoup
from faker import Faker
from transformers import (
    pipeline as hf_pipeline,
    T5ForConditionalGeneration,
    T5Tokenizer
)
from sklearn.preprocessing import OneHotEncoder
from sklearn.calibration import CalibratedClassifierCV
from sklearn.exceptions import NotFittedError
from lightgbm import LGBMClassifier
import joblib

# Suppress transformers progress bars
# NOTE(review): this is set after `transformers` was imported above — confirm
# the library still honours the variable when it is set post-import.
os.environ["TRANSFORMERS_NO_TQDM"] = "1"
# True when PyTorch reports an available CUDA device.
USE_CUDA = torch.cuda.is_available()
# Extra keyword arguments splatted into every `hf_pipeline(...)` call below:
# pin the pipeline to device 0 when CUDA is available, otherwise pass nothing.
PIPELINE_KWARGS = {"device": 0} if USE_CUDA else {}

# Module-wide Faker instance; SyntheticDataGenerator uses it to draw random dates.
fake = Faker()


# Utility: build payoff matrix from discrete outcomes
def build_payoff_matrix(outcomes: List[float], probs: List[float]) -> Dict[str, float]:
    """Summarise a discrete payoff distribution by its mean and standard deviation.

    Args:
        outcomes: Payoff value of each discrete outcome.
        probs: Weight of each outcome, positionally aligned with ``outcomes``.
            The weights are used as given; they are not normalised here.

    Returns:
        ``{"mean": ..., "std": ...}`` where ``mean`` is the weighted sum of the
        outcomes and ``std`` is the square root of the weighted squared
        deviation from that mean.
    """
    values = np.asarray(outcomes, dtype=float)
    weights = np.asarray(probs, dtype=float)

    expected = float(np.sum(values * weights))
    deviations = values - expected
    variance = float(np.sum(deviations ** 2 * weights))

    return {"mean": expected, "std": float(np.sqrt(variance))}


# DPR‐FAISS INDEXER STUB
class DPRFaissIndexer:
    """Stub sentence store paired with a FAISS inner-product index.

    Only the bookkeeping lists are maintained: an ``IndexFlatIP`` is created,
    but nothing in this class embeds sentences or adds vectors to it.
    """

    def __init__(self, dim: int = 768):
        """Create an empty flat inner-product index of dimension ``dim``."""
        self.index = faiss.IndexFlatIP(dim)
        # Three parallel lists: position i describes the same sentence in each.
        self.doc_ids = []
        self.sentences = []
        self.labels = []

    def add_sentences(self, annotations: List[Dict[str, Any]], doc_id: str):
        """Record every annotated sentence under ``doc_id``.

        Args:
            annotations: Dicts with a required ``"sentence"`` key and an
                optional ``"labels"`` key (an empty list is stored if absent).
            doc_id: Identifier stored alongside each sentence.
        """
        for entry in annotations:
            self.doc_ids.append(doc_id)
            sentence_text = entry["sentence"]
            sentence_labels = entry.get("labels", [])
            self.sentences.append(sentence_text)
            self.labels.append(sentence_labels)


# Synthetic Data Generation
class SyntheticDataGenerator:
    """Produce random case metadata and spend ledgers for demos and tests.

    Values are drawn with the ``random`` module and the module-level ``fake``
    (Faker) instance, so output is only reproducible if both are seeded.
    """

    def __init__(
        self,
        jurisdictions: List[str],
        judges:        List[str],
        counsels:      List[str],
        policy_types:  List[str]
    ):
        """Store the candidate pools that metadata values are sampled from."""
        self.jurisdictions = jurisdictions
        self.judges        = judges
        self.counsels      = counsels
        self.policy_types  = policy_types

    def generate_case_metadata(self, case_id: str) -> Dict[str, Any]:
        """Return one metadata record for ``case_id``.

        Each categorical field is a uniform pick from its pool, the outcome is
        "win" or "loss", and the filing date is an ISO date within the last
        three years.
        """
        # Dict values are evaluated top to bottom, so random draws happen in
        # the order the keys are listed.
        return {
            "case_id":      case_id,
            "jurisdiction": random.choice(self.jurisdictions),
            "judge":        random.choice(self.judges),
            "counsel":      random.choice(self.counsels),
            "case_type":    random.choice(self.policy_types),
            "case_outcome": random.choice(["win", "loss"]),
            "filed_date":   fake.date_between(start_date="-3y", end_date="today").isoformat()
        }

    def generate_spend_ledger(
        self,
        case_id:    str,
        n_entries:  int = 5
    ) -> List[Dict[str, Any]]:
        """Return ``n_entries`` spend rows for ``case_id``, spaced 30 days apart.

        The first row is dated at a random day between two years and one year
        ago; amounts are uniform in [500, 20000], rounded to cents.
        """
        first_day = fake.date_between(start_date="-2y", end_date="-1y")
        activities = ["motion", "discovery", "expert", "mediation"]
        return [
            {
                "case_id":      case_id,
                "spend_date":   (first_day + datetime.timedelta(days=30 * step)).isoformat(),
                "spend_amount": round(random.uniform(500, 20000), 2),
                "activity":     random.choice(activities)
            }
            for step in range(n_entries)
        ]


# PolicyParser
class PolicyParser:
    """Regex-based extractor for limits, exclusions and endorsements in policy text."""

    # Match dollar amounts with optional K/M/B suffix and an optional
    # per-claim/occurrence/year or "aggregate" qualifier. The suffix must end
    # at a word boundary so that "$500 minimum" is not read as "$500 M".
    limit_pattern = re.compile(
        r'(\$\s?[0-9\.,]+(?:\s?(?:K|M|B)\b)?'
        r'(?:\s?(?:per[-\s]?(?:claim|occurrence|year)|aggregate))?)',
        re.IGNORECASE
    )
    # Capture exclusions sections
    exclusion_pattern = re.compile(
        r'(?:Exclusion[s]?[:\-]\s*)(.+?)(?=\n\n|\Z)',
        re.IGNORECASE | re.DOTALL
    )
    # Capture endorsements sections
    endorsement_pattern = re.compile(
        r'(?:Endorsement[s]?[:\-]\s*)(.+?)(?=\n\n|\Z)',
        re.IGNORECASE | re.DOTALL
    )

    # Splits one `limit_pattern` match into number / magnitude suffix / qualifier.
    _amount_pattern = re.compile(
        r'\$\s?(?P<number>[0-9\.,]+)(?:\s?(?P<suffix>[KMB])\b)?(?P<qualifier>.*)',
        re.IGNORECASE | re.DOTALL
    )
    _multipliers = {"K": 1e3, "M": 1e6, "B": 1e9}
    # (token searched for in the qualifier text, key used in the result dict)
    _qualifier_keys = (
        ("claim",      "per_claim"),
        ("occurrence", "per_occurrence"),
        ("year",       "per_year"),
        ("aggregate",  "aggregate"),
    )

    @staticmethod
    def parse(text: str) -> Dict[str, Any]:
        """Return ``{"limits", "exclusions", "endorsements"}`` extracted from ``text``."""
        return {
            "limits": PolicyParser.extract_limits(text),
            "exclusions": PolicyParser.extract_exclusions(text),
            "endorsements": PolicyParser.extract_endorsements(text)
        }

    @staticmethod
    def extract_limits(text: str) -> Dict[str, float]:
        """Extract dollar limits from ``text`` as ``{key: amount}``.

        Qualified amounts are keyed ``per_claim``, ``per_occurrence``,
        ``per_year`` or ``aggregate``; a repeated qualifier gets a numeric
        suffix (``per_claim_2``) instead of overwriting the earlier value.
        Unqualified amounts are keyed ``limit_<n>``. Amounts whose numeric
        part cannot be parsed (e.g. ``"$,"`` or ``"$1.2.3"``) are skipped.
        """
        parsed: Dict[str, float] = {}
        for raw in PolicyParser.limit_pattern.findall(text):
            parts = PolicyParser._amount_pattern.match(raw)
            if parts is None:
                continue
            digits = parts.group("number").replace(",", "")
            try:
                value = float(digits)
            except ValueError:
                continue

            suffix = parts.group("suffix")
            if suffix:
                value *= PolicyParser._multipliers[suffix.upper()]

            # Classify on the qualifier text only, never on the whole match,
            # so letters inside "per claim" cannot be mistaken for a suffix.
            qualifier = parts.group("qualifier").lower()
            base_key = next(
                (key for token, key in PolicyParser._qualifier_keys if token in qualifier),
                None
            )
            if base_key is None:
                key = f"limit_{len(parsed) + 1}"
            else:
                key, ordinal = base_key, 2
                while key in parsed:
                    key = f"{base_key}_{ordinal}"
                    ordinal += 1
            parsed[key] = value
        return parsed

    @staticmethod
    def extract_exclusions(text: str) -> List[str]:
        """Return the body of each ``Exclusion(s):`` section, up to a blank line or end of text."""
        return [body.strip() for body in PolicyParser.exclusion_pattern.findall(text)]

    @staticmethod
    def extract_endorsements(text: str) -> List[str]:
        """Return the body of each ``Endorsement(s):`` section, up to a blank line or end of text."""
        return [body.strip() for body in PolicyParser.endorsement_pattern.findall(text)]


# Raw Data Collection
class RawDataCollector:
    """Collect raw case inputs (PDF text, dockets, spend ledgers, web data, synthetic data)."""

    def __init__(
        self,
        ocr_tool:    Any = None,
        api_clients: Dict[str, Any] = None,
        synth_gen:   SyntheticDataGenerator = None
    ):
        """Store the optional collaborators.

        Args:
            ocr_tool: Object exposing ``extract_text(path)``; may be ``None``.
            api_clients: Mapping of client name to an object exposing
                ``fetch_json`` / ``fetch_html``. ``None`` becomes an empty dict.
            synth_gen: Generator used by ``ingest_synthetic``; may be ``None``.
        """
        self.ocr_tool    = ocr_tool
        self.api_clients = api_clients or {}
        self.synth       = synth_gen

    def ingest_pdf(self, pdf_path: str) -> Dict[str, Any]:
        """Extract text from ``pdf_path`` and parse it with ``PolicyParser``.

        Without an OCR tool the text is the empty string, so the parsed
        limits/exclusions/endorsements are empty as well.
        """
        text = self.ocr_tool.extract_text(pdf_path) if self.ocr_tool else ""
        return {
            "type":   "pdf",
            "text":   text,
            "parsed": PolicyParser.parse(text),
            "source": pdf_path
        }

    def ingest_docket(self, docket_json: Dict[str, Any]) -> Dict[str, Any]:
        """Wrap an already-parsed docket dict in a typed envelope."""
        return {"type": "docket", "data": docket_json}

    def ingest_spend_ledger(self, ledger_csv: str) -> List[Dict[str, Any]]:
        """Read a spend-ledger CSV into a list of row dicts.

        A missing file is reported with a printed warning and yields an empty
        list (deliberate best-effort behaviour).
        """
        try:
            # newline="" lets the csv module handle line endings itself, which
            # is required for quoted fields containing embedded newlines.
            with open(ledger_csv, newline="") as handle:
                return list(csv.DictReader(handle))
        except FileNotFoundError:
            print(f"Warning: spend ledger file not found: {ledger_csv}. Skipping.")
            return []

    def ingest_manual(self, data: Dict[str, Any]) -> Dict[str, Any]:
        """Return manually supplied data unchanged."""
        return data

    def ingest_web_db(
        self,
        client_name: str,
        endpoint:    str,
        params:      Dict[str, Any] = None,
        mode:        str = "json"
    ) -> Dict[str, Any]:
        """Fetch ``endpoint`` through the named API client.

        Raises:
            ValueError: If no client is registered under ``client_name`` or
                ``mode`` is neither ``"json"`` nor ``"html"``.
        """
        client = self.api_clients.get(client_name)
        if client is None:
            raise ValueError(f"No API client named {client_name}")
        if mode == "json":
            data = client.fetch_json(endpoint, params)
        elif mode == "html":
            data = client.fetch_html(endpoint, params)
        else:
            raise ValueError(f"Unsupported mode: {mode}")
        return {"type": "web", "source": endpoint, "data": data}

    def ingest_synthetic(self, case_id: str, n_spend: int = 5) -> List[Dict[str, Any]]:
        """Return one synthetic metadata record followed by ``n_spend`` ledger rows.

        Raises:
            RuntimeError: If no ``SyntheticDataGenerator`` was provided.
        """
        if not self.synth:
            raise RuntimeError("No SyntheticDataGenerator provided")
        meta   = self.synth.generate_case_metadata(case_id)
        ledger = self.synth.generate_spend_ledger(case_id, n_entries=n_spend)
        return [meta] + ledger


# Web Scraper
from typing import Dict, Any, Optional
import requests
from bs4 import BeautifulSoup
import pandas as pd

class WebScraper:
    """Thin wrapper around a ``requests.Session`` for fetching JSON and HTML."""

    def __init__(self, base_url: str = "", headers: Dict[str, str] = None,
                 auth: Any = None, timeout: int = 10):
        """Create the session.

        Args:
            base_url: Prefix concatenated (as-is) in front of every endpoint.
            headers: Optional headers merged into the session defaults.
            auth: Optional value assigned to ``session.auth``.
            timeout: Per-request timeout in seconds.
        """
        self.base_url = base_url
        self.session  = requests.Session()
        if headers:
            self.session.headers.update(headers)
        if auth:
            self.session.auth = auth
        self.timeout = timeout

    def _get(self, endpoint: str, params: Dict[str, Any] = None):
        """GET ``base_url + endpoint`` and raise for non-2xx responses."""
        resp = self.session.get(self.base_url + endpoint,
                                params=params, timeout=self.timeout)
        resp.raise_for_status()
        return resp

    def fetch_json(self, endpoint: str, params: Dict[str, Any] = None) -> Any:
        """Return the decoded JSON body of the endpoint."""
        return self._get(endpoint, params).json()

    def fetch_html(self, endpoint: str, params: Dict[str, Any] = None) -> BeautifulSoup:
        """Return the endpoint's body parsed with the ``html.parser`` backend."""
        return BeautifulSoup(self._get(endpoint, params).text, "html.parser")

    def extract_table(self, soup: BeautifulSoup, selector: str) -> pd.DataFrame:
        """Return the first table matched by ``selector`` as a DataFrame.

        Raises:
            ValueError: If ``selector`` matches nothing.
        """
        from io import StringIO

        table = soup.select_one(selector)
        if table is None:
            raise ValueError(f"No table found for selector {selector}")
        # Passing literal HTML to read_html is deprecated in recent pandas;
        # a file-like wrapper works on old and new versions alike.
        return pd.read_html(StringIO(str(table)))[0]

    def scrape_text(self, endpoint: str, selector: Optional[str] = None, params: Dict[str, Any] = None) -> str:
        """
        Scrape all visible text from the specified endpoint.
        Optionally, restrict to a CSS selector.

        With a selector, the text of each matched element is joined with
        newlines. Without one, script/style/noscript elements are removed
        and the remaining text is returned space-separated.
        """
        soup = self.fetch_html(endpoint, params)
        if selector:
            return "\n".join(
                el.get_text(separator=" ", strip=True) for el in soup.select(selector)
            )
        # Remove script/style elements so only visible text remains.
        for tag in soup(["script", "style", "noscript"]):
            tag.extract()
        return soup.get_text(separator=" ", strip=True)

# Text Annotation & Retrieval
class SentenceAnnotator:
    """Split text into sentences and tag each with zero-shot category labels."""

    def __init__(self, categories: List[str],
                 model_name: str = "facebook/bart-large-mnli"):
        """Remember the candidate labels and model name; the model loads lazily."""
        self.categories = categories
        self.model_name = model_name
        self._pipe = None
        # Split on whitespace that directly follows '.', '!' or '?'.
        self.split_pattern = re.compile(r'(?<=[\.\!\?])\s+')

    @property
    def pipe(self):
        """Zero-shot classification pipeline, built on first access and cached."""
        if self._pipe is not None:
            return self._pipe
        self._pipe = hf_pipeline(
            "zero-shot-classification",
            model=self.model_name,
            **PIPELINE_KWARGS
        )
        return self._pipe

    def annotate(self, text: str) -> List[Dict[str, Any]]:
        """Return ``[{"sentence": ..., "labels": [...]}, ...]`` for ``text``.

        Every non-blank sentence is classified against ``self.categories`` in
        multi-label mode; labels scoring above 0.5 are kept. Text without any
        non-blank sentence yields an empty list without touching the model.
        """
        annotations = []
        for fragment in self.split_pattern.split(text):
            sentence = fragment.strip()
            if not sentence:
                continue
            result = self.pipe(sentence, self.categories, multi_label=True)
            kept = [
                label
                for label, score in zip(result["labels"], result["scores"])
                if score > 0.5
            ]
            annotations.append({"sentence": sentence, "labels": kept})
        return annotations

# Feature Engineering
class FeatureEngineer:
    """Turn raw case/spend records into one numeric feature row per case."""

    def __init__(self):
        self.cat_encoder   = OneHotEncoder(sparse_output=False,
                                           handle_unknown="ignore")
        # {name: {"count": n_cases, "mean": win_rate}} learned in fit().
        self.judge_stats   = {}
        self.counsel_stats = {}
        self.fitted        = False

    def fit(self, records: List[Dict[str, Any]]):
        """Learn the categorical encoding and per-judge / per-counsel win rates.

        If the records lack any of jurisdiction, case_type, judge, counsel or
        case_outcome, nothing is learned but the instance is still marked as
        fitted; ``extract_features`` then simply omits the one-hot columns.
        """
        df = pd.DataFrame(records)
        required = {'jurisdiction','case_type','judge','counsel','case_outcome'}
        if not required.issubset(df.columns):
            self.fitted = True
            return

        # Spend rows carry no metadata; drop them before learning statistics.
        df = df.dropna(subset=['jurisdiction','case_type','case_outcome'])
        self.cat_encoder.fit(df[['jurisdiction','case_type']].fillna(''))
        df['is_win'] = (df['case_outcome']=="win").astype(int)
        self.judge_stats   = df.groupby('judge')['is_win'].agg(['count','mean']).to_dict('index')
        self.counsel_stats = df.groupby('counsel')['is_win'].agg(['count','mean']).to_dict('index')
        self.fitted = True

    def extract_features(self, records: List[Dict[str, Any]]) -> Dict[str, Dict[str, float]]:
        """Build ``{case_id: {feature: value}}`` from mixed metadata/spend records.

        Features per case: judge and counsel win rates (NaN when unseen),
        spend ``sum``/``mean``/``count``, one ``spend_YYYY_MM`` column per
        month-end bucket, and one-hot jurisdiction/case_type columns when the
        encoder has been fitted. Returns ``{}`` if the records lack any of
        jurisdiction, case_type, judge or counsel.

        Raises:
            RuntimeError: If ``fit`` has not been called.
        """
        if not self.fitted:
            raise RuntimeError("FeatureEngineer must be fit() first.")

        df = pd.DataFrame(records)
        if 'case_id' not in df.columns:
            df['case_id'] = [r.get('case_id') for r in records]

        required = {'jurisdiction','case_type','judge','counsel','case_id'}
        if not required.issubset(df.columns):
            return {}

        # Materialise optional columns first so the conversions below always
        # operate on a Series (a scalar default has no .fillna / .dt).
        for column, default in (("filed_date", pd.NaT),
                                ("spend_date", pd.NaT),
                                ("spend_amount", 0)):
            if column not in df.columns:
                df[column] = default

        df["filed_date"]   = pd.to_datetime(df["filed_date"])
        df["spend_date"]   = pd.to_datetime(df["spend_date"])
        df["spend_amount"] = pd.to_numeric(df["spend_amount"],
                                           errors="coerce").fillna(0)
        # NOTE(review): computed but not emitted as a feature — confirm intent.
        df["days_since_filing"] = (
            pd.Timestamp.today() - df["filed_date"] ).dt.days.fillna(-1)

        # One metadata row per case; every feature piece below is indexed by
        # case_id so the final concat aligns on it.
        meta = df.drop_duplicates('case_id').set_index('case_id')

        def win_rate(stats: Dict[Any, Dict[str, float]], name: str) -> pd.Series:
            return meta[name].map(lambda key: stats.get(key, {}).get("mean", np.nan))

        pieces = [
            win_rate(self.judge_stats,   "judge").rename("judge_win_rate"),
            win_rate(self.counsel_stats, "counsel").rename("counsel_win_rate"),
            # NOTE(review): rows without a spend amount count as 0 here, which
            # lowers `mean` and raises `count` — confirm this is intended.
            df.groupby('case_id')['spend_amount'].agg(['sum','mean','count']),
        ]

        dated = df.dropna(subset=['spend_date'])
        if not dated.empty:
            grouped = dated.set_index('spend_date').groupby('case_id')['spend_amount']
            try:
                monthly = grouped.resample('ME').sum()
            except ValueError:
                # Older pandas only knows the month-end alias as 'M'.
                monthly = grouped.resample('M').sum()
            ts = monthly.unstack(fill_value=0)
            ts.columns = [f"spend_{d:%Y_%m}" for d in ts.columns]
            pieces.append(ts)

        # The encoder is unfitted when fit() returned early; skip it then.
        if hasattr(self.cat_encoder, "categories_") and not meta.empty:
            cats     = meta[['jurisdiction','case_type']].fillna('')
            cols_ohe = self.cat_encoder.get_feature_names_out(
                           ['jurisdiction','case_type'])
            pieces.append(pd.DataFrame(self.cat_encoder.transform(cats),
                                       columns=cols_ohe,
                                       index=meta.index))

        feats = pd.concat(pieces, axis=1)
        return feats.to_dict(orient='index')

# Outcome Prediction with Calibrated LightGBM
class OutcomePredictor:
    """Calibrated LightGBM classifier over settlement-value buckets."""

    def __init__(
        self,
        model_path: str = "models/settlement_predictor_calibrated.pkl",
        buckets:    List[float] = None,
        method:     str = "sigmoid",
        cv:         int = 5
    ):
        """Load a persisted model from ``model_path`` or create an unfitted one.

        Args:
            model_path: Where the calibrated model is loaded from / saved to.
            buckets: Class values reported by ``predict`` before any fit.
            method: Calibration method passed to ``CalibratedClassifierCV``.
            cv: Cross-validation setting passed to ``CalibratedClassifierCV``.
        """
        self.model_path = model_path
        self.buckets    = list(buckets) if buckets is not None else None

        # A bare filename has an empty dirname, which os.makedirs rejects.
        model_dir = os.path.dirname(model_path)
        if model_dir:
            os.makedirs(model_dir, exist_ok=True)
        try:
            # NOTE(security): joblib.load unpickles the file; only point
            # model_path at files from a trusted source.
            self.cal = joblib.load(model_path)
        except FileNotFoundError:
            base = LGBMClassifier(
                objective='multiclass',
                n_estimators=200,
                learning_rate=0.05,
                max_depth=6
            )
            self.cal = CalibratedClassifierCV(base, method=method, cv=cv)
            if buckets is not None:
                # Lets predict() report the buckets before the model is fitted.
                self.cal.classes_ = buckets

    def fit(
        self,
        features: Dict[str, Dict[str, float]],
        labels:   List[float]
    ) -> None:
        """Fit the calibrated model and persist it to ``model_path``.

        ``labels`` must be ordered like the keys of ``features``; missing
        feature values are filled with 0.
        """
        df_X = pd.DataFrame.from_dict(features, orient='index').fillna(0)
        y    = pd.Series(labels, index=df_X.index).astype(float)
        self.cal.fit(df_X, y)
        joblib.dump(self.cal, self.model_path)

    def predict(
        self,
        features: Dict[str, Dict[str, float]]
    ) -> Tuple[List[float], List[float]]:
        """Return ``(buckets, probabilities)`` for the first row of ``features``.

        Falls back to a uniform distribution over the buckets when there are
        no feature rows or neither the calibrated model nor its underlying
        estimator can produce probabilities (e.g. nothing has been fitted).

        Raises:
            AttributeError: If the model has no ``classes_`` and no buckets
                were supplied at construction time.
        """
        df_X = pd.DataFrame.from_dict(features, orient='index').fillna(0)

        proba = None
        if not df_X.empty:
            try:
                proba = self.cal.predict_proba(df_X)[0]
            except (AttributeError, NotFittedError):
                # The attribute holding the wrapped estimator was renamed
                # across scikit-learn versions; try both.
                base = getattr(self.cal, "base_estimator",
                               getattr(self.cal, "estimator", None))
                if base is not None:
                    try:
                        proba = base.predict_proba(df_X)[0]
                    except Exception:
                        # Deliberate best effort: fall through to uniform.
                        proba = None

        raw_classes = getattr(self.cal, "classes_", self.buckets)
        if raw_classes is None:
            raise AttributeError(
                "OutcomePredictor has no class buckets: fit the model or pass `buckets`."
            )
        buckets = [float(c) for c in raw_classes]
        if proba is None:
            proba = [1.0 / len(buckets)] * len(buckets)

        # Always hand back plain floats, whether or not the model produced them.
        return buckets, [float(p) for p in proba]

# Simulation & LLM Calibration
class Simulator:
    """Monte Carlo sampler over a payoff summary."""

    def simulate(self, payoff_matrix: Dict[str, float], n_simulations: int = 1000) -> List[float]:
        """Draw ``n_simulations`` values from a normal distribution.

        The distribution is centred on ``payoff_matrix["mean"]`` (default 0.0)
        with standard deviation ``payoff_matrix["std"]`` (default 1.0). Draws
        come from NumPy's global random state.
        """
        center = payoff_matrix.get("mean", 0.0)
        spread = payoff_matrix.get("std", 1.0)
        draws = np.random.normal(center, spread, n_simulations)
        return draws.tolist()

class Calibrator:
    """Ask a T5 text-to-text model to comment on simulated outcomes and briefs."""

    # Simulation lists longer than this are summarised instead of being pasted
    # into the prompt verbatim (thousands of floats would simply be truncated).
    max_raw_sims = 50

    def __init__(self, llm_ckpt: str = "t5-small"):
        """Remember the checkpoint name; the model is loaded on first use."""
        self.llm_ckpt = llm_ckpt
        self._pipe    = None

    @property
    def pipe(self):
        """Text2text-generation pipeline, built on first access and cached."""
        if self._pipe is None:
            model     = T5ForConditionalGeneration.from_pretrained(self.llm_ckpt)
            tokenizer = T5Tokenizer.from_pretrained(self.llm_ckpt)
            self._pipe = hf_pipeline(
                "text2text-generation",
                model=model,
                tokenizer=tokenizer,
                **PIPELINE_KWARGS
            )
        return self._pipe

    @staticmethod
    def _describe_sims(sims: List[float]) -> str:
        """Render ``sims`` verbatim when short, else as count/mean/std/percentiles."""
        if len(sims) <= Calibrator.max_raw_sims:
            return f"{sims}"
        arr = np.asarray(sims, dtype=float)
        p5, p25, p50, p75, p95 = np.percentile(arr, [5, 25, 50, 75, 95])
        return (
            f"n={arr.size}, mean={arr.mean():.2f}, std={arr.std():.2f}, "
            f"p5={p5:.2f}, p25={p25:.2f}, p50={p50:.2f}, p75={p75:.2f}, p95={p95:.2f}"
        )

    @staticmethod
    def _build_prompt(sims: List[float], briefs: List[str]) -> str:
        """Assemble the prompt text sent to the model."""
        return (
            f"Simulations:{Calibrator._describe_sims(sims)}\n"
            f"Briefs:{'||'.join(briefs)}\n"
            "Produce adjusted probability weights and rationale."
        )

    def calibrate(self, sims: List[float], briefs: List[str]) -> str:
        """Return the model's generated text for the given simulations and briefs.

        Args:
            sims: Simulated payoff draws.
            briefs: Brief excerpts, joined with ``||`` in the prompt.
        """
        prompt = self._build_prompt(sims, briefs)
        # max_new_tokens bounds only the generated text; passing max_length
        # here produced a "both max_new_tokens and max_length set" warning.
        out = self.pipe(prompt, max_new_tokens=256, truncation=True)
        return out[0]["generated_text"].strip()

# Litigation vs Settlement Risk Calculator
class RiskCalculator:
    """Compare discounted settlement and litigation payoff distributions."""

    def __init__(self, interest_rate: float):
        """Store the per-year discount rate.

        Raises:
            ValueError: If ``interest_rate`` is -1 or lower, for which the
                discount factor is undefined (division by zero or a negative
                base raised to a fractional power).
        """
        if interest_rate <= -1:
            raise ValueError("interest_rate must be greater than -1")
        self.r = interest_rate

    def discount_factor(self, years: float) -> float:
        """Return the present-value factor ``1 / (1 + r) ** years``."""
        return 1 / ((1 + self.r) ** years)

    @staticmethod
    def _normalized_probs(outcomes: List[float], probs: List[float]) -> np.ndarray:
        """Validate ``probs`` against ``outcomes`` and rescale them to sum to 1."""
        weights = np.asarray(probs, dtype=float)
        if weights.shape != (len(outcomes),):
            raise ValueError("outcomes and probs must have the same length")
        if weights.size == 0:
            return weights
        if (weights < 0).any():
            raise ValueError("probabilities must be non-negative")
        total = weights.sum()
        if total <= 0:
            raise ValueError("probabilities must sum to a positive value")
        return weights / total

    def build_discounted_payoff(self, outcomes: List[float], probs: List[float], time_horizon: float) -> Dict[str, float]:
        """Return mean and std of the outcomes discounted over ``time_horizon`` years.

        Probabilities are rescaled to sum to 1 before weighting.

        Raises:
            ValueError: If the lengths differ, a probability is negative, or
                the probabilities do not sum to a positive value.
        """
        weights = self._normalized_probs(outcomes, probs)
        present = np.asarray(outcomes, dtype=float) * self.discount_factor(time_horizon)
        mean = float((present * weights).sum())
        var  = float(((present - mean) ** 2 * weights).sum())
        return {"mean": mean, "std": float(np.sqrt(var))}

    def compare(
        self,
        settle_outcomes: List[float], settle_probs: List[float], settle_time: float,
        litig_outcomes:  List[float], litig_probs:  List[float], litig_time:  float,
        n_draws: int = 10000
    ) -> Dict[str, Any]:
        """Compare settling against litigating in present-value terms.

        Analytic mean/std are reported for each side. The difference
        (litigation minus settlement) is estimated from ``n_draws``
        independent samples of each distribution, drawn from NumPy's global
        random state.

        Raises:
            ValueError: On invalid probabilities (see ``build_discounted_payoff``)
                or empty outcome lists.
        """
        ps = self.build_discounted_payoff(settle_outcomes, settle_probs, settle_time)
        pl = self.build_discounted_payoff(litig_outcomes,  litig_probs,  litig_time)

        # np.random.choice rejects weights whose sum is even slightly off 1
        # (e.g. float32 model output), so sample with the rescaled weights.
        settle_w = self._normalized_probs(settle_outcomes, settle_probs)
        litig_w  = self._normalized_probs(litig_outcomes,  litig_probs)

        ds = np.random.choice(np.asarray(settle_outcomes, dtype=float),
                              size=n_draws, p=settle_w) * self.discount_factor(settle_time)
        dl = np.random.choice(np.asarray(litig_outcomes, dtype=float),
                              size=n_draws, p=litig_w) * self.discount_factor(litig_time)
        diff = dl - ds

        return {
            "PV_settle_mean":       ps["mean"],
            "PV_settle_std":        ps["std"],
            "PV_litig_mean":        pl["mean"],
            "PV_litig_std":         pl["std"],
            "mean_diff":            float(diff.mean()),
            "std_diff":             float(diff.std()),
            "prob_litig_gt_settle": float((diff > 0).mean())
        }

# Live Monitor
class LiveMonitor:
    """Placeholder monitor: the fetch and anomaly hooks are stubs."""

    def __init__(self, ecf_client: Any = None, spend_client: Any = None):
        """Keep references to the (optional) docket and spend clients."""
        self.ecf, self.spend = ecf_client, spend_client

    def fetch_updates(self) -> Dict[str, Any]:
        """Return new data since the last check; currently always empty."""
        return dict()

    def detect_anomalies(self, data: Dict[str, Any]) -> bool:
        """Report whether ``data`` warrants a re-run; currently always ``False``."""
        return False

    def trigger_rerun(self, callback: Any):
        """Invoke ``callback`` if the freshly fetched updates look anomalous."""
        updates = self.fetch_updates()
        if not self.detect_anomalies(updates):
            return
        callback()

# Full Legal RAG Pipeline
class LegalRAGPipeline:
    """End-to-end pipeline: ingest a case, predict value buckets, simulate, compare.

    Heavy components (annotator, indexer, predictor, LLM calibrator) are
    created lazily on first access through the properties below.
    """

    def __init__(
        self,
        categories:          List[str],
        sub_texts:           List[str],
        jur_rules:           Dict[str, Any],
        predictor_path:      str   = "models/settlement_predictor_calibrated.pkl",
        llm_ckpt:            str   = "t5-small",
        interest_rate:       float = 0.05,
        default_settle_time: float = 1.0,
        default_litig_time:  float = 1.0
    ):
        """Wire up the pipeline components.

        Args:
            categories: Zero-shot labels used by the sentence annotator.
            sub_texts: Template sentences seeded into the indexer, each
                labelled "subrogation".
            jur_rules: Mapping keyed by jurisdiction; the keys also seed the
                synthetic data generator.
            predictor_path: Location of the persisted outcome predictor.
            llm_ckpt: Checkpoint name for the calibrator model.
            interest_rate: Discount rate for the risk calculator.
            default_settle_time: Years to settlement when none is given.
            default_litig_time: Years to litigation outcome when none is given.
        """
        self.default_settle_time = default_settle_time
        self.default_litig_time  = default_litig_time

        self.jur_rules = jur_rules
        self.synth     = SyntheticDataGenerator(
            jurisdictions=list(jur_rules.keys()),
            judges=["Judge A","Judge B","Judge C"],
            counsels=["Counsel X","Counsel Y","Counsel Z"],
            policy_types=["property","liability","subrogation"]
        )
        self.raw        = RawDataCollector(
            ocr_tool=None,
            api_clients={},
            synth_gen=self.synth
        )
        self.fe         = FeatureEngineer()
        self._annotator = None
        self._indexer   = None
        self._predictor = None
        self._sim       = Simulator()
        self._cal       = None
        self._risk      = RiskCalculator(interest_rate)
        self._monitor   = LiveMonitor()

        self.categories     = categories
        self.sub_texts      = sub_texts
        self.predictor_path = predictor_path
        self.llm_ckpt       = llm_ckpt

    @property
    def annotator(self):
        """Sentence annotator over ``self.categories`` (created on first use)."""
        if self._annotator is None:
            self._annotator = SentenceAnnotator(self.categories)
        return self._annotator

    @property
    def indexer(self):
        """Sentence indexer, pre-seeded with the subrogation templates."""
        if self._indexer is None:
            self._indexer = DPRFaissIndexer()
            anns = [{"sentence": t, "labels": ["subrogation"]} for t in self.sub_texts]
            self._indexer.add_sentences(anns, doc_id="subrog_templates")
        return self._indexer

    @property
    def predictor(self):
        """Outcome predictor over the fixed buckets 0 / 50k / 100k / 150k."""
        if self._predictor is None:
            self._predictor = OutcomePredictor(
                self.predictor_path,
                buckets=[0, 50000, 100000, 150000]
            )
        return self._predictor

    @property
    def simulator(self):
        """The Monte Carlo simulator."""
        return self._sim

    @property
    def calibrator(self):
        """LLM calibrator for ``self.llm_ckpt`` (created on first use)."""
        if self._cal is None:
            self._cal = Calibrator(self.llm_ckpt)
        return self._cal

    @property
    def risk_calc(self):
        """The discounting risk calculator."""
        return self._risk

    @property
    def monitor(self):
        """The live monitor."""
        return self._monitor

    def ingest_case(
        self,
        case_id:       str,
        pdf_path:      str              = None,
        docket_json:   Dict[str, Any]   = None,
        ledger_csv:    str              = None,
        manual_data:   Dict[str, Any]   = None,
        use_synthetic: bool             = False,
        synth_spend:   int              = 5
    ) -> Dict[str, Any]:
        """Collect a case's inputs, annotate its text and return its features.

        With ``use_synthetic`` the real sources are ignored and synthetic
        records are generated instead. The feature engineer is (re)fitted on
        this case's records when it is unfitted or synthetic data is used.

        Returns:
            The ``{case_id: {feature: value}}`` mapping produced by the
            feature engineer (possibly empty).
        """
        raw, struct = [], []
        if use_synthetic:
            struct += self.raw.ingest_synthetic(case_id, n_spend=synth_spend)
        else:
            if pdf_path:
                raw.append(self.raw.ingest_pdf(pdf_path))
            if docket_json:
                r = self.raw.ingest_docket(docket_json)
                raw.append(r); struct.append(r["data"])
            if ledger_csv:
                struct += self.raw.ingest_spend_ledger(ledger_csv)
            if manual_data:
                r = self.raw.ingest_manual(manual_data)
                raw.append(r); struct.append(r)

        if not self.fe.fitted or use_synthetic:
            self.fe.fit(struct)

        # Separate documents with a newline so the last sentence of one text
        # is not fused with the first sentence of the next when splitting.
        full_text = "\n".join(r.get("text", "") for r in raw)
        anns      = self.annotator.annotate(full_text)
        self.indexer.add_sentences(anns, doc_id=case_id)

        return self.fe.extract_features(struct)

    def simulate_settlement(
        self,
        case_id:       str,
        pdf_path:      str            = None,
        docket_json:   Dict[str, Any] = None,
        ledger_csv:    str            = None,
        manual_data:   Dict[str, Any] = None,
        briefs:        List[str]      = None,
        n_draws:       int            = 10000,
        use_synthetic: bool           = False,
        synth_spend:   int            = 5
    ) -> Dict[str, Any]:
        """Ingest a case and produce bucket weights, simulations and LLM commentary.

        Args:
            briefs: Brief excerpts forwarded to the calibrator; ``None`` means
                no briefs.
            n_draws: Number of simulated payoff draws.

        Returns:
            Dict with ``buckets``, ``weights``, ``payoff_matrix``,
            ``simulations`` and ``calibration``.
        """
        # A fresh list per call; a literal [] default would be shared.
        briefs = [] if briefs is None else briefs

        feats           = self.ingest_case(
            case_id, pdf_path, docket_json,
            ledger_csv, manual_data,
            use_synthetic, synth_spend
        )
        buckets, weights = self.predictor.predict(feats)
        payoff          = build_payoff_matrix(buckets, weights)
        sims            = self.simulator.simulate(payoff, n_simulations=n_draws)
        cal             = self.calibrator.calibrate(sims, briefs)
        return {
            "buckets":       buckets,
            "weights":       weights,
            "payoff_matrix": payoff,
            "simulations":   sims,
            "calibration":   cal
        }

    def simulate_litigation_vs_settlement(
        self,
        settle_outcomes: List[float],
        settle_probs:    List[float],
        settle_time:     float             = None,
        litig_outcomes:  List[float]       = None,
        litig_probs:     List[float]       = None,
        litig_time:      float             = None
    ) -> Dict[str, Any]:
        """Compare settlement and litigation payoffs in present-value terms.

        Missing times fall back to the pipeline defaults. If either litigation
        list is missing, the settlement distribution is reused for litigation.
        """
        if settle_time is None:
            settle_time = self.default_settle_time
        if litig_time is None:
            litig_time = self.default_litig_time
        if litig_outcomes is None or litig_probs is None:
            litig_outcomes = settle_outcomes
            litig_probs    = settle_probs

        return self.risk_calc.compare(
            settle_outcomes, settle_probs, settle_time,
            litig_outcomes,  litig_probs,  litig_time
        )

# Demo entry point: run one case through the pipeline and print two tables.
if __name__ == "__main__":
    # Placeholder brief excerpts forwarded to the LLM calibrator.
    briefs = ["Plaintiff’s opening brief…", "Defendant’s MSJ…"]

    rag = LegalRAGPipeline(
        categories           = ["subrogation", "coverage", "liability"],
        sub_texts            = ["If subrogation is available…", "Subrogation clause states…"],
        jur_rules            = {"NY": {"statute_of_limitations": 3}},
        predictor_path       = "models/settlement_predictor_calibrated.pkl",
        interest_rate        = 0.05,
        default_settle_time  = 1.0,
        default_litig_time   = 1.0
    )

    # 1) Settlement simulation + calibration
    # The pipeline is built without an OCR tool, so the PDF contributes no
    # text and no structured records on this path.
    sim_result = rag.simulate_settlement(
        case_id   = "CASE123",
        pdf_path  = "data/CASE123.pdf",
        briefs    = briefs,
        n_draws   = 10000
    )

    # 2) Litigation vs Settlement risk comparison
    # No litigation distribution is supplied, so the settlement buckets and
    # weights are reused for the litigation side.
    risk_metrics = rag.simulate_litigation_vs_settlement(
        settle_outcomes = sim_result["buckets"],
        settle_probs    = sim_result["weights"]
    )

    # Build and print tables
    # Bucket probabilities
    df_buckets = pd.DataFrame({
        "bucket": sim_result["buckets"],
        "probability": sim_result["weights"]
    })
    print("\n## Bucket Probabilities\n")
    # NOTE(review): DataFrame.to_markdown presumably needs the optional
    # `tabulate` package, which is not installed by the cells above — confirm.
    print(df_buckets.to_markdown(index=False))

    # Summary metrics
    # Row label -> value; mixes floats with the calibrator's free-text output.
    summary = {
        "Settle Mean (undiscounted)": sim_result["payoff_matrix"]["mean"],
        "Settle Std  (undiscounted)": sim_result["payoff_matrix"]["std"],
        "PV Settle Mean":              risk_metrics["PV_settle_mean"],
        "PV Settle Std":               risk_metrics["PV_settle_std"],
        "PV Litig Mean":               risk_metrics["PV_litig_mean"],
        "PV Litig Std":                risk_metrics["PV_litig_std"],
        "Mean Diff (Litig - Settle)":  risk_metrics["mean_diff"],
        "Std Diff":                    risk_metrics["std_diff"],
        "P(Litig > Settle)":           risk_metrics["prob_litig_gt_settle"],
        "LLM Calibration":             sim_result["calibration"]
    }
    df_summary = pd.DataFrame.from_dict(summary, orient="index", columns=["value"])
    print("\n## Summary Metrics\n")
    print(df_summary.to_markdown())

Device set to use cpu
Both `max_new_tokens` (=256) and `max_length`(=256) seem to have been set. `max_new_tokens` will take precedence. Please refer to the documentation for more information. (https://huggingface.co/docs/transformers/main/en/main_classes/text_generation)



## Bucket Probabilities

|   bucket |   probability |
|---------:|--------------:|
|        0 |          0.25 |
|    50000 |          0.25 |
|   100000 |          0.25 |
|   150000 |          0.25 |

## Summary Metrics

|                            | value                                                                                                                                                                                                                               |
|:---------------------------|:------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| Settle Mean (undiscounted) | 75000.0                                                                                                                                                                                                                           