# Covariance Matrix Non-Positive Definiteness Investigation

**Problem**: Many tests in the robust testing framework fail at `compute_property_posterior` because the covariance matrix of the posterior property measure is not positive definite.

**Goal**: Systematically investigate when and why this happens, and develop solutions.

## Investigation Plan

### Phase 1: Issue Reproduction and Analysis
1. **Reproduce the failure systematically**
2. **Extract covariance matrices** from failing cases
3. **Analyze eigenvalue distributions** and condition numbers
4. **Identify parameter correlations** with failures

### Phase 2: Mathematical Deep Dive
5. **Examine the mathematical construction** of property posterior covariance
6. **Test numerical conditioning** of intermediate matrices
7. **Investigate regularization strategies**

### Phase 3: Enhanced Diagnostics and Solutions
8. **Upgrade profiling code** with covariance diagnostics
9. **Implement regularization options**
10. **Create specialized covariance tests**

Let's start!

## Setup and Imports

In [6]:
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import sys
import traceback
import warnings
from scipy.linalg import eigvals, eigvalsh, LinAlgError
from scipy.sparse import issparse

# Extend the import search path with the current directory and its parent.
# Both entries are relative, so they resolve against the working directory
# the notebook kernel was started in.
sys.path.append('.')
sys.path.append('..')

# Import our modules (the testing harness, the profiling helper and the
# linear solver used in the base configuration below)
from robust_testing import RobustTester, ParameterGenerator, TestResult, FailureType
from pli_profiling import run_with_dependencies
from pygeoinf.linear_solvers import LUSolver

# Reaching this line means every import above resolved.
print("✅ All imports successful!")

# Configure plotting: default matplotlib style, 12x8 figures, seaborn "husl"
# colour palette for all subsequent plots in this session.
plt.style.use('default')
plt.rcParams['figure.figsize'] = (12, 8)
sns.set_palette("husl")

✅ All imports successful!


## Phase 1: Systematic Failure Reproduction

Let's start by creating a controlled test environment to reproduce the covariance matrix failures.

In [7]:
# Baseline configuration (a known working setup). Later sweeps merge their own
# N / N_d / N_p values on top of this dictionary.
base_params = dict(
    # Problem sizes (echoed in the summary line printed below)
    N=20,
    N_d=20,
    N_p=5,
    # Interval endpoints and basis family
    endpoints=(0, 1),
    basis_type='sine',
    # Quadrature rule and number of sample points for G and T
    integration_method_G='trapz',
    integration_method_T='trapz',
    n_points_G=500,
    n_points_T=500,
    # NOTE(review): meaning of alpha and K is not visible in this notebook -
    # presumably prior/regularisation settings; confirm against the framework.
    alpha=0.1,
    K=50,
    # Noise level used to generate data vs. the level assumed in the inversion
    true_data_noise=0.1,
    assumed_data_noise=0.1,
    # m_bar: one full sine period on [0, 1]; m_0: identically zero
    m_bar_callable=lambda x: np.sin(2 * np.pi * x),
    m_0_callable=lambda x: np.zeros_like(x),
    solver=LUSolver(),
)

print("🎯 Base parameters configured")
print(f"Problem size: N={base_params['N']}, N_d={base_params['N_d']}, N_p={base_params['N_p']}")

🎯 Base parameters configured
Problem size: N=20, N_d=20, N_p=5


In [None]:
# Cached sweep execution - only recompute if the sweep ranges changed.
# NOTE: the cache signature below covers only the range settings; edits to
# base_params are NOT detected. Delete 'tester_results' to force a rerun.
from collections import Counter

tester = RobustTester(base_params)
N_min = 10
N_max = 100
N_d_min = 10
N_d_max = 100
N_p_min = 5
N_p_max = 10
n_points = 3

# Build current config: the full grid over the three size ranges.
# int() truncates non-integer linspace samples (e.g. 7.5 -> 7).
current_config = [{'N': int(N), 'N_d': int(N_d), 'N_p': int(N_p)}
                  for N in np.linspace(N_min, N_max, n_points)
                  for N_d in np.linspace(N_d_min, N_d_max, n_points)
                  for N_p in np.linspace(N_p_min, N_p_max, n_points)]

# Signature identifying the sweep grid that tester_results belongs to
current_signature = (N_min, N_max, N_d_min, N_d_max, N_p_min, N_p_max, n_points)

# Reuse earlier results only if they exist AND were completed for this grid
need_recompute = not (
    'tester_results' in globals()
    and globals().get('_last_config_signature') == current_signature
)

if need_recompute:
    print(f"▶ Running {len(current_config)} test configurations...")
    # Invalidate the signature BEFORE resetting tester_results: if the sweep is
    # interrupted, the partial results stay inspectable but can never be
    # mistaken for a finished sweep of some earlier grid.
    _last_config_signature = None
    tester_results = {}
    for i, config in enumerate(current_config, 1):
        print(f"  {i}/{len(current_config)}: N={config['N']}, N_d={config['N_d']}, N_p={config['N_p']}")
        # Sweep values override the matching entries of base_params
        tester_result = tester.run_single_test({**base_params, **config})
        tester_results[tuple(config.items())] = tester_result

    # Only a completed sweep is marked as cached
    _last_config_signature = current_signature

    # Quick summary
    success_count = sum(1 for r in tester_results.values() if getattr(r, 'success', False))
    failure_count = len(tester_results) - success_count
    failure_stages = [getattr(r, 'failure_stage', None) for r in tester_results.values()
                      if not getattr(r, 'success', False)]
    stage_counts = Counter(failure_stages)

    print(f"\n✅ Completed sweep:")
    print(f"  Total tests: {len(tester_results)}")
    print(f"  Successes: {success_count}")
    print(f"  Failures: {failure_count}")
    if failure_count:
        print(f"  Failure stages: {dict(stage_counts)}")
else:
    print("ℹ️  Reusing cached tester_results (parameters unchanged)")
    print("To force recompute, delete 'tester_results' or change the parameter ranges above.")

▶ Running 27 test configurations...
  1/27: N=10, N_d=10, N_p=5
Executing setup_spatial_spaces...
Executing setup_mappings...
Executing _setup_truths_and_measurement...
Executing setup_prior_measure...
LaplacianInverseOperator initialized with native solver, dirichlet(left=0, right=0) BCs
Executing create_problems...
Executing compute_property_posterior...
  2/27: N=10, N_d=10, N_p=7
Executing setup_spatial_spaces...
Executing setup_mappings...
Executing _setup_truths_and_measurement...
Executing setup_prior_measure...
LaplacianInverseOperator initialized with native solver, dirichlet(left=0, right=0) BCs
Executing create_problems...
Executing compute_property_posterior...
  2/27: N=10, N_d=10, N_p=7
Executing setup_spatial_spaces...
Executing setup_mappings...
Executing _setup_truths_and_measurement...
Executing setup_prior_measure...
LaplacianInverseOperator initialized with native solver, dirichlet(left=0, right=0) BCs
Executing create_problems...
Executing compute_property_posterio

In [None]:
# analyze_tester_results.py
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
import traceback

_SIZE_FIELDS = ('N', 'N_d', 'N_p')
_RESULT_COLUMNS = [
    'N', 'N_d', 'N_p', 'success', 'is_PD',
    'failure_stage', 'error_message', 'execution_time',
]


def _int_or_none(value):
    """Return ``int(value)``, or None if value is None or not convertible."""
    if value is None:
        return None
    try:
        return int(value)
    except (TypeError, ValueError, OverflowError):
        return None


def _sizes_from_key(key):
    """Read N, N_d, N_p from a results key; unknown entries are left as None."""
    sizes = dict.fromkeys(_SIZE_FIELDS)
    if not isinstance(key, tuple):
        return sizes
    # Format 1: a plain (N, N_d, N_p) triple of integers
    if len(key) == 3 and all(isinstance(v, (int, np.integer)) for v in key):
        return dict(zip(_SIZE_FIELDS, map(int, key)))
    # Format 2: tuple(config.items()), i.e. a tuple of (name, value) pairs
    try:
        as_mapping = dict(key)
    except (TypeError, ValueError):
        return sizes
    for field in _SIZE_FIELDS:
        sizes[field] = _int_or_none(as_mapping.get(field))
    return sizes


def _fill_sizes_from_params(sizes, params):
    """Fill still-missing sizes in place from a result's ``parameters`` object."""
    if params is None:
        return
    for field in _SIZE_FIELDS:
        if sizes[field] is not None:
            continue
        try:
            value = params[field]
        except (KeyError, IndexError, TypeError):
            # Field absent, or ``parameters`` is not indexable by name
            continue
        sizes[field] = _int_or_none(value)


def results_to_df(tester_results):
    """
    Convert tester_results (mapping -> TestResult) into a tidy DataFrame.

    One row is produced per entry, with columns
    N, N_d, N_p, success (bool), is_PD (int), failure_stage, error_message,
    execution_time.

    The sizes are taken from the mapping key when it is either an integer
    triple ``(N, N_d, N_p)`` or ``tuple(config.items())``; anything still
    missing is looked up in ``res.parameters``; sizes that cannot be
    determined are recorded as -1.

    ``is_PD`` is simply ``int(success)``: a run that failed at *any* stage is
    recorded as 0, so it is only a proxy for positive definiteness.

    Result attributes are read with ``getattr`` defaults, so objects lacking
    ``success`` / ``failure_stage`` / ``error_message`` / ``execution_time``
    yield False / None / None / None.

    An empty mapping returns an empty DataFrame that still has all columns.
    The returned frame is sorted by (N, N_d, N_p) with a fresh 0..n-1 index.
    """
    rows = []
    for key, res in tester_results.items():
        sizes = _sizes_from_key(key)
        _fill_sizes_from_params(sizes, getattr(res, 'parameters', None))

        success = bool(getattr(res, 'success', False))
        rows.append({
            # final defensive default: -1 marks "size unknown"
            'N': -1 if sizes['N'] is None else sizes['N'],
            'N_d': -1 if sizes['N_d'] is None else sizes['N_d'],
            'N_p': -1 if sizes['N_p'] is None else sizes['N_p'],
            'success': success,
            'is_PD': int(success),
            'failure_stage': getattr(res, 'failure_stage', None),
            'error_message': getattr(res, 'error_message', None),
            'execution_time': getattr(res, 'execution_time', None),
        })

    # Passing the column list keeps the schema (and the sort keys) available
    # even when there are no rows at all.
    df = pd.DataFrame(rows, columns=_RESULT_COLUMNS)
    return df.sort_values(['N', 'N_d', 'N_p']).reset_index(drop=True)


def summarize_df(df):
    """
    Print a short text summary of a results DataFrame.

    Reports the row count, the success/failure tally, the tally of
    ``failure_stage`` among failed rows and, if any row failed, the sizes,
    stage and error message of the first failed row. Returns None.
    """
    print("Total rows:", len(df))

    print("\nOverall success/failure counts:")
    print(df['success'].value_counts(dropna=False))

    print("\nFailure stage counts (for failures):")
    failed_rows = df.loc[~df['success']]
    print(failed_rows['failure_stage'].value_counts(dropna=False))

    # Nothing more to show when every run succeeded
    if failed_rows.empty:
        return

    example_fields = ['N', 'N_d', 'N_p', 'failure_stage', 'error_message']
    first_failure = failed_rows.iloc[0]
    print("\nExample failure (first):", first_failure[example_fields].to_dict())


def plot_3d(df, interactive_plotly=True):
    """
    Show a 3D scatter of the sweep grid (N, N_d, N_p) coloured by ``is_PD``.

    When ``interactive_plotly`` is truthy an interactive plotly figure is
    attempted first. If anything in that attempt raises (plotly missing,
    rendering error, ...) - or when ``interactive_plotly`` is falsy - a static
    matplotlib figure is drawn instead (green = is_PD 1, red = is_PD 0).
    """
    def _show_interactive():
        # Imported lazily so the function still works without plotly installed
        import plotly.express as px
        figure = px.scatter_3d(
            df, x='N', y='N_d', z='N_p', color='is_PD',
            color_continuous_scale=[[0, 'red'], [1, 'green']],
            symbol='is_PD', size_max=6,
            hover_data=['failure_stage', 'error_message'],
        )
        figure.update_traces(marker=dict(size=5))
        figure.update_layout(title='3D: N vs N_d vs N_p (is_PD 1=PD)')
        figure.show()

    try:
        if interactive_plotly:
            _show_interactive()
            return
    except Exception:
        # Deliberate best-effort: drop through to the matplotlib version
        pass

    # Static matplotlib version. Axes3D is imported only for its side effect
    # (presumably registering the '3d' projection on older matplotlib).
    from mpl_toolkits.mplot3d import Axes3D  # noqa: F401
    point_colors = df['is_PD'].map({1: 'green', 0: 'red'})
    figure = plt.figure(figsize=(8, 6))
    axes3d = figure.add_subplot(111, projection='3d')
    axes3d.scatter(df['N'], df['N_d'], df['N_p'], c=point_colors, s=60, alpha=0.85)
    axes3d.set_xlabel('N')
    axes3d.set_ylabel('N_d')
    axes3d.set_zlabel('N_p')
    axes3d.set_title('3D: N vs N_d vs N_p (green=PD, red=Not PD)')
    plt.show()


def plot_heatmaps(df, max_slices=4):
    """
    Draw heatmaps of the mean ``is_PD`` over the (N, N_d) grid.

    The first figure holds one panel per distinct ``N_p`` value, limited to
    the ``max_slices`` smallest values; only the last panel gets a colour bar.
    The second figure shows the same statistic averaged over all ``N_p``.
    Both figures are displayed with ``plt.show()``; nothing is returned.
    """
    np_values = sorted(df['N_p'].unique())[:max_slices]
    panel_count = len(np_values)
    # Always request at least one column so plt.subplots gets a valid grid
    column_count = max(1, panel_count)
    fig, axes = plt.subplots(1, column_count, figsize=(5 * column_count, 4), squeeze=False)

    for idx, np_value in enumerate(np_values):
        grid = df[df['N_p'] == np_value].pivot_table(
            index='N', columns='N_d', values='is_PD', aggfunc='mean'
        )
        panel = axes[0, idx]
        is_last_panel = (idx == panel_count - 1)
        sns.heatmap(grid, ax=panel, vmin=0, vmax=1, cmap='RdYlGn', cbar=is_last_panel)
        panel.set_title(f'PD fraction at N_p={np_value}')
        # Put small N at the bottom of the axis
        panel.invert_yaxis()
    plt.tight_layout()
    plt.show()

    # Aggregated over every N_p value
    overall = df.pivot_table(index='N', columns='N_d', values='is_PD', aggfunc='mean')
    plt.figure(figsize=(6, 5))
    sns.heatmap(overall, cmap='RdYlGn', vmin=0, vmax=1)
    plt.title('PD fraction aggregated over N_p')
    plt.gca().invert_yaxis()
    plt.tight_layout()
    plt.show()


def feature_importance_quick(df):
    """
    Rank N, N_d and N_p by how well they predict ``is_PD``.

    Fits a 200-tree RandomForestClassifier (random_state=0) on the three size
    columns, prints the resulting feature importances and returns them in the
    order (N, N_d, N_p). If scikit-learn cannot be imported, a notice is
    printed and None is returned.
    """
    try:
        from sklearn.ensemble import RandomForestClassifier
    except Exception:
        print("sklearn not available - skipping feature importance")
        return None

    features = df[['N', 'N_d', 'N_p']].values
    labels = df['is_PD'].values
    forest = RandomForestClassifier(n_estimators=200, random_state=0)
    forest.fit(features, labels)

    ranking = forest.feature_importances_
    print("RandomForest feature importances (N, N_d, N_p):", ranking)
    return ranking

In [None]:
# Flatten the sweep results into a tidy DataFrame, one row per tested configuration
df = results_to_df(tester_results)