# x0 Identifiability Analysis Pipeline

This notebook executes the complete pyident workflow for analyzing how initial condition (x0) sampling strategies affect LTI system identifiability.

**Pipeline stages:**
1. Generate random controllable/uncontrollable systems (Ginibre ensemble)
2. Filter by density regime (0.3-0.7)
3. Sample x0 with different sparsification levels
4. Compute PBH structural identifiability scores
5. Visualize binary classification (identifiable vs non-identifiable)

In [None]:
import numpy as np
import pandas as pd
import pathlib
import warnings
from typing import Tuple

try:
    import matplotlib.pyplot as plt
except ImportError:
    plt = None

# Import pyident modules
from pyident.ensembles import sparse_continuous, controllability_rank
from pyident.metrics import pbh_margin_structured

# Suppress warnings
warnings.filterwarnings('ignore')

# Configuration
SEED = 12345
M = 2  # Number of inputs
SPARSITY_GRID = np.arange(0.0, 1.1, 0.1)
NDIM_GRID = np.arange(2, 11, 1)
SAMPLES_PER_POINT = 1000
DENSITY_MIN = 0.3
DENSITY_MAX = 0.7
X0_DENSITIES = [0.25, 0.5, 0.75, 1.0]
X0_SAMPLES_PER_DENSITY = 100
PBH_THRESHOLD = 1e-6

# Output directories
ENSEMBLE_DIR = pathlib.Path("pyident_results/iclr_manuscript/ensemble")
X0_BOXPLOT_DIR = pathlib.Path("pyident_results/x0_boxplot")

# Create directories
ENSEMBLE_DIR.mkdir(parents=True, exist_ok=True)
X0_BOXPLOT_DIR.mkdir(parents=True, exist_ok=True)

print(f"Output directories created:")
print(f"  Ensemble: {ENSEMBLE_DIR.absolute()}")
print(f"  x0 Boxplot: {X0_BOXPLOT_DIR.absolute()}")

## Stage 1: Generate (A, B) Ensemble with Controllability Analysis

Generate random LTI systems using Ginibre ensemble with sparsification, then label by controllability:
- Vary sparsity: 0.0 → 1.0 (step 0.1)
- Vary state dimension n: 2 → 10 (step 1)
- Fixed m = 2 inputs, 1000 samples per grid point
- Compute controllability rank for each system

In [None]:
def generate_system(n: int, m: int, sparsity: float, rng: np.random.Generator) -> Tuple[np.ndarray, np.ndarray]:
    """Generate (A, B) from sparse Ginibre ensemble."""
    A, B = sparse_continuous(
        n=n,
        m=m,
        rng=rng,
        which="both",
        p_density=1.0 - sparsity,  # p_density is sparsity parameter
    )
    return A, B

def matrix_density(M: np.ndarray, tol: float = 1e-12) -> float:
    """Compute density of nonzero entries."""
    return float(np.sum(np.abs(M) > tol)) / float(M.size)

# Generate ensemble
print("=" * 70)
print("STAGE 1: Generating (A, B) ensemble with controllability labels")
print("=" * 70)

rng = np.random.default_rng(SEED)
ensemble_data = []
system_id = 0

for sparsity in SPARSITY_GRID:
    for n in NDIM_GRID:
        for sample in range(SAMPLES_PER_POINT):
            A, B = generate_system(n, M, sparsity, rng)
            
            # Compute controllability rank
            ctrb_rank = controllability_rank(A, B)
            is_controllable = (ctrb_rank == n)
            
            # Compute densities
            density_A = matrix_density(A)
            density_B = matrix_density(B)
            density_AB = matrix_density(np.hstack([A, B]))
            
            ensemble_data.append({
                'system_id': system_id,
                'sparsity': float(sparsity),
                'n': int(n),
                'm': int(M),
                'ctrb_rank': int(ctrb_rank),
                'is_controllable': bool(is_controllable),
                'density_A': float(density_A),
                'density_B': float(density_B),
                'density_AB': float(density_AB),
            })
            system_id += 1
            
            # Save A, B to NPZ (we'll save all systems at once below)

print(f"Generated {len(ensemble_data)} systems")
print(f"Sparsity range: {SPARSITY_GRID}")
print(f"Dimension range: {NDIM_GRID}")

# Create DataFrame
ensemble_df = pd.DataFrame(ensemble_data)

# Calculate controllability statistics
n_controllable = ensemble_df['is_controllable'].sum()
n_uncontrollable = (~ensemble_df['is_controllable']).sum()
print(f"\nControllability summary:")
print(f"  Controllable: {n_controllable} ({100*n_controllable/len(ensemble_df):.1f}%)")
print(f"  Uncontrollable: {n_uncontrollable} ({100*n_uncontrollable/len(ensemble_df):.1f}%)")

# Save ensemble metadata
ensemble_csv = ENSEMBLE_DIR / "ensemble_metadata.csv"
ensemble_df.to_csv(ensemble_csv, index=False)
print(f"\nSaved ensemble metadata to {ensemble_csv}")

# Heatmap: uncontrollable fraction per (sparsity, ndim)
heatmap_data = ensemble_df.groupby(['sparsity', 'n']).apply(
    lambda g: (~g['is_controllable']).sum() / len(g)
).unstack()

print("\nUncontrollable fraction heatmap (rows: n, cols: sparsity):")
print(heatmap_data.round(3))

## Stage 2: Filter Uncontrollable Systems by Density Regime

Select systems with controllability rank deficit density in [0.3, 0.7], which contain mixed controllable/uncontrollable systems. Save filtered datasets as CSV (metadata) and NPZ (matrices).

In [None]:
print("=" * 70)
print("STAGE 2: Filtering systems by density regime")
print("=" * 70)

# Filter by density regime: keep systems with 0.3 <= rho <= 0.7
# where rho = (n - ctrb_rank) / n is controllability rank deficit density
ensemble_df['density'] = (ensemble_df['n'] - ensemble_df['ctrb_rank']) / ensemble_df['n']
filtered_df = ensemble_df[
    (ensemble_df['density'] >= DENSITY_MIN) & 
    (ensemble_df['density'] <= DENSITY_MAX)
].copy()

print(f"\nFiltering by density {DENSITY_MIN} <= ρ <= {DENSITY_MAX}:")
print(f"  Original systems: {len(ensemble_df)}")
print(f"  Filtered systems: {len(filtered_df)} ({100*len(filtered_df)/len(ensemble_df):.1f}%)")

# Show density distribution
print(f"\nFiltered systems by (n, sparsity):")
filtered_summary = filtered_df.groupby(['n', 'sparsity']).size().unstack(fill_value=0)
print(filtered_summary)

# Save filtered dataset CSV
filtered_csv = ENSEMBLE_DIR / "systems_unctrb_d0.3_0.7.csv"
filtered_df.to_csv(filtered_csv, index=False)
print(f"\nSaved filtered systems to {filtered_csv}")

# For NPZ, we need to regenerate the (A, B) matrices
print("\nReconstructing (A, B) matrices for filtered systems...")
rng = np.random.default_rng(SEED)
A_list = []
B_list = []
row_indices = []

for idx, (_, row) in enumerate(filtered_df.iterrows()):
    sparsity = row['sparsity']
    n = row['n']
    
    # Regenerate to get exact (A, B)
    A, B = generate_system(n, M, sparsity, rng)
    A_list.append(A)
    B_list.append(B)
    row_indices.append(idx)

# Save to NPZ
filtered_npz = ENSEMBLE_DIR / "systems_unctrb_d0.3_0.7.npz"
np.savez(
    filtered_npz,
    A_list=A_list,
    B_list=B_list,
    indices=np.array(row_indices),
)
print(f"Saved matrices to {filtered_npz}")

print(f"\nFiltered dataset ready for x0 analysis")

## Stage 3: Sample x0 with Different Sparsification Levels

For each filtered (A, B) system, generate multiple x0 initial conditions using Bernoulli sparsification:
- x0_density=1.0: uniform sampling from unit sphere
- x0_density<1.0: sphere + Bernoulli mask + renormalization
- Compute PBH structural identifiability margin for each (A, B, x0) triple

In [None]:
def sample_unit_sphere(n: int, rng: np.random.Generator) -> np.ndarray:
    """Uniform sample on unit sphere S^{n-1}."""
    v = rng.standard_normal(n)
    nrm = float(np.linalg.norm(v))
    return v / (nrm if nrm > 0.0 else 1.0)

def sample_x0(n: int, rng: np.random.Generator, x0_density: float) -> np.ndarray:
    """Sample x0 with Bernoulli sparsification.
    
    Parameters
    ----------
    n : int
        Dimension
    rng : np.random.Generator
        RNG
    x0_density : float
        Keep probability. 1.0 = no sparsification
    
    Returns
    -------
    np.ndarray
        Sampled x0 on unit sphere with sparsification applied
    """
    x0 = sample_unit_sphere(n, rng)
    
    if x0_density < 1.0:
        # Bernoulli sparsification: keep each entry with prob x0_density
        mask = rng.random(n) < x0_density
        x0 = x0 * mask
        
        # Renormalize to unit norm
        nrm = float(np.linalg.norm(x0))
        if nrm > 1e-15:
            x0 = x0 / nrm
    
    return x0

print("=" * 70)
print("STAGE 3: Computing PBH scores for x0 samples")
print("=" * 70)

# Load matrices from NPZ
data = np.load(filtered_npz, allow_pickle=True)
A_list = data['A_list']
B_list = data['B_list']

print(f"\nLoaded {len(A_list)} systems from filtered dataset")

# Compute PBH scores
scores_data = []
rng = np.random.default_rng(SEED)

for sys_idx, (A, B) in enumerate(zip(A_list, B_list)):
    n = A.shape[0]
    m = B.shape[1]
    
    # Get system metadata from filtered_df
    sys_meta = filtered_df.iloc[sys_idx]
    
    for x0_density in X0_DENSITIES:
        for sample_idx in range(X0_SAMPLES_PER_DENSITY):
            # Sample x0
            x0 = sample_x0(n, rng, x0_density)
            
            # Compute PBH margin
            pbh_score = pbh_margin_structured(A, B, x0)
            
            # Format density label
            x0_density_label = f"{x0_density:.2f}"
            
            scores_data.append({
                'pbh': float(pbh_score),
                'x0_density': float(x0_density),
                'x0_density_label': x0_density_label,
                'n': int(n),
                'm': int(m),
                'system_index': int(sys_idx),
                'sparsity': float(sys_meta['sparsity']),
                'is_controllable': bool(sys_meta['is_controllable']),
                'density_AB': float(sys_meta['density_AB']),
            })

print(f"Computed {len(scores_data)} PBH scores")

# Create scores DataFrame
scores_df = pd.DataFrame(scores_data)

# Save scores
scores_csv = X0_BOXPLOT_DIR / "pbh_scores.csv"
scores_df.to_csv(scores_csv, index=False)
print(f"Saved PBH scores to {scores_csv}")

# Summary statistics
print("\nPBH score summary by x0_density:")
summary = scores_df.groupby('x0_density')['pbh'].agg(['count', 'mean', 'std', 'min', 'max'])
print(summary.round(6))

## Stage 4: Visualize Binary Classification by x0 Sparsification

Plot fraction of identifiable systems (PBH score > ε) for each x0 sparsification level using bar plots.

In [None]:
print("=" * 70)
print("STAGE 4: Binary classification and visualization")
print("=" * 70)

# Apply threshold
scores_df['is_identifiable'] = scores_df['pbh'] > PBH_THRESHOLD

# Binary statistics
binary_stats = scores_df.groupby('x0_density_label').agg({
    'is_identifiable': ['sum', 'count', 'mean']
}).round(4)
binary_stats.columns = ['identifiable_count', 'total_count', 'fraction_identifiable']
print("\nBinary classification (PBH > 1e-6):")
print(binary_stats)

# Create visualization
if plt is not None:
    fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(14, 5))
    
    # Bar plot: fraction identifiable
    x0_density_labels = ['0.25', '0.50', '0.75', '1.00']
    fractions = []
    counts = []
    
    for label in x0_density_labels:
        mask = scores_df['x0_density_label'] == label
        frac = scores_df[mask]['is_identifiable'].mean()
        count = scores_df[mask]['is_identifiable'].sum()
        fractions.append(frac)
        counts.append(count)
    
    # Bar plot 1: Fraction identifiable
    bars = ax1.bar(x0_density_labels, fractions, color=(32/255, 143/255, 140/255), alpha=0.8, edgecolor='black', linewidth=1.5)
    ax1.set_ylabel('Fraction Identifiable', fontsize=14, fontweight='bold')
    ax1.set_xlabel('x0 Sparsification Density', fontsize=14, fontweight='bold')
    ax1.set_title('Binary Classification: Identifiable vs Non-identifiable', fontsize=16, fontweight='bold')
    ax1.set_ylim([0, 1.05])
    ax1.grid(axis='y', alpha=0.3, linestyle='--')
    
    # Add count labels on bars
    for i, (bar, count) in enumerate(zip(bars, counts)):
        height = bar.get_height()
        ax1.text(bar.get_x() + bar.get_width()/2., height + 0.02,
                f'n={int(count)}', ha='center', va='bottom', fontsize=11)
    
    # Box plot 2: Continuous PBH scores
    data_by_density = [scores_df[scores_df['x0_density_label'] == label]['pbh'].values 
                       for label in x0_density_labels]
    bp = ax2.boxplot(data_by_density, labels=x0_density_labels, patch_artist=True)
    
    for patch in bp['boxes']:
        patch.set_facecolor((32/255, 143/255, 140/255))
        patch.set_alpha(0.8)
    
    for median in bp['medians']:
        median.set_linewidth(3.0)
        median.set_color('black')
    
    ax2.axhline(y=PBH_THRESHOLD, color='red', linestyle='--', linewidth=2, label=f'ε={PBH_THRESHOLD:.0e}')
    ax2.set_ylabel('PBH Structural Identifiability Margin', fontsize=14, fontweight='bold')
    ax2.set_xlabel('x0 Sparsification Density', fontsize=14, fontweight='bold')
    ax2.set_title('Continuous Score Distribution', fontsize=16, fontweight='bold')
    ax2.set_yscale('log')
    ax2.legend(fontsize=11)
    ax2.grid(axis='y', alpha=0.3, linestyle='--')
    
    plt.tight_layout()
    
    # Save figure
    fig_path = X0_BOXPLOT_DIR / "pbh_binary_classification.png"
    plt.savefig(fig_path, dpi=150, bbox_inches='tight')
    print(f"\nSaved visualization to {fig_path}")
    plt.show()
else:
    print("Matplotlib not available, skipping visualization")

print("\n" + "=" * 70)
print("PIPELINE COMPLETE")
print("=" * 70)
print(f"\nOutput files:")
print(f"  Ensemble metadata: {ensemble_csv}")
print(f"  Filtered systems (CSV): {filtered_csv}")
print(f"  Filtered systems (NPZ): {filtered_npz}")
print(f"  PBH scores (CSV): {scores_csv}")
print(f"  Visualization (PNG): {fig_path if plt else 'N/A'}")