# Vollständige Evaluation — EPS Torque Prediction

Alle Werte werden **aus den trainierten Checkpoints** berechnet. Keine vorberechneten `eval.json` als Input.

| Schritt | Inhalt |
|---------|--------|
| 1 | Daten laden (einmalig) |
| 2 | Checkpoint Discovery (8 Modelle × 5 Seeds) |
| 3 | Inference — Predictions & Attention Weights |
| 4 | Metriken (Sample-Level & Sequence-Level) |
| 4b | Statistische Tests (Bootstrap, Permutationstests, Effektstärken) |
| 5 | FLOPs & Parameter |
| 6 | Inference Time Messung |
| 7 | Ergebnis-Tabellen |
| 8 | Figures (Attention, Tradeoff, Timeseries) |

In [1]:
import sys
import gc
import inspect
from pathlib import Path
import winsound
import numpy as np
import pandas as pd
import torch
import pytorch_lightning as pl
from torch.utils.data import DataLoader

# Project root (notebooks/ -> project root)
PROJECT_ROOT = Path.cwd().parent
if not (PROJECT_ROOT / 'config').exists():
    PROJECT_ROOT = Path.cwd()
sys.path.insert(0, str(PROJECT_ROOT))

from config.loader import load_config, get_model_class
from config.settings import get_preprocessed_paths
from model.data_module import TimeSeriesDataModule
from scripts.shared import (
    MODELS, MODEL_BY_ID,
    find_all_seed_checkpoints,
    calculate_metrics_dict,
    aggregate_metrics_per_sequence,
)
from scripts.compute_sequence_r2 import compute_per_sequence_r2
from scripts.evaluate_model import (
    measure_inference_time, calculate_flops, has_attention_support,
)

# Constants
SEEDS = [7, 42, 94, 123, 231]
VARIANT = 'no_dropout'
ACCURACY_THRESHOLD = 0.05
DEVICE = 'cpu'

print(f'Project root: {PROJECT_ROOT}')
print(f'PyTorch:      {torch.__version__}')
print(f'CUDA:         {torch.cuda.is_available()}')

Project root: c:\Users\MSchm\Documents\att_project
PyTorch:      2.6.0+cu124
CUDA:         True


## 1. Daten laden

In [2]:
config = load_config(str(PROJECT_ROOT / MODELS[0].config_no_dropout))
data_config = config['data']

paths = get_preprocessed_paths(
    vehicle=data_config['vehicle'],
    window_size=data_config['window_size'],
    predict_size=data_config['predict_size'],
    step_size=data_config['step_size'],
    suffix='sF',
    variant=data_config['variant'],
)

data_module = TimeSeriesDataModule(
    feature_path=str(paths['features']),
    target_path=str(paths['targets']),
    sequence_ids_path=str(paths['sequence_ids']),
    batch_size=256,
    split_seed=data_config.get('split_seed', 0),
)
data_module.setup()

# DataLoader with num_workers=0 (Windows/notebook compatibility)
test_loader = DataLoader(
    data_module.test_dataset, batch_size=256, shuffle=False, num_workers=0,
)
test_sequence_ids = data_module.get_split_sequence_ids('test')

print(f'\nTest samples:   {len(data_module.test_dataset):,}')
print(f'Test sequences: {len(np.unique(test_sequence_ids)):,}')

Loading numpy file: C:\Users\MSchm\Documents\att_project\data\prepared_dataset\HYUNDAI_SONATA_2020\50_1_1_sF\features_50_1_1_sF.npy
Loading numpy file: C:\Users\MSchm\Documents\att_project\data\prepared_dataset\HYUNDAI_SONATA_2020\50_1_1_sF\targets_50_1_1_sF.npy
Loaded features: torch.Size([2201265, 50, 5])
Loaded targets: torch.Size([2201265, 1])
Loaded sequence_ids: 2201265 (4988 unique sequences)
Sequence-level split (seed=0): 3491 train / 997 val / 500 test sequences
Sample counts: 1539545 train / 440533 val / 221187 test

Test samples:   221,187
Test sequences: 500


## 2. Checkpoint Discovery

In [3]:
all_checkpoints = {}

print(f'{"Model":<35s} {"Seeds":>6s}')
print('-' * 50)

for mc in MODELS:
    seed_ckpts = find_all_seed_checkpoints(mc, VARIANT)
    all_checkpoints[mc.id] = seed_ckpts
    found = len(seed_ckpts)
    missing = [s for s in SEEDS if s not in seed_ckpts]
    status = f'{found}/5'
    if missing:
        status += f'  (missing: {missing})'
    print(f'  {mc.name:<33s} {status}')

total = sum(len(v) for v in all_checkpoints.values())
print(f'\nTotal: {total}/40 checkpoints')

Model                                Seeds
--------------------------------------------------
  M1 MLP Last                       5/5
  M2 MLP Flat                       5/5
  M3 Small Baseline                 5/5
  M4 Small + Simple Attn            5/5
  M5 Medium Baseline                5/5
  M6 Medium + Simple Attn           5/5
  M7 Medium + Additive Attn         5/5
  M8 Medium + Scaled DP             5/5

Total: 40/40 checkpoints


## 3. Inference — Predictions & Attention Weights

Lädt jeden Checkpoint, berechnet Predictions auf dem Test-Set.
Für Attention-Modelle (M4, M6, M7, M8) werden die Attention Weights
gleichzeitig extrahiert und über alle Test-Samples gemittelt.

In [4]:
def run_inference(model, dataloader, device='cpu', extract_attention=False):
    """Run inference, optionally extracting attention weights.

    Attention weights are averaged over samples incrementally to avoid
    storing the full (N, seq_len, seq_len) matrix for additive attention.

    Returns:
        predictions, targets                       (if extract_attention=False)
        predictions, targets, avg_attention_1d     (if extract_attention=True)
    """
    model = model.to(device)
    model.eval()

    all_preds, all_targets = [], []
    attn_sum = None
    n_attn_samples = 0

    with torch.no_grad():
        for X_batch, Y_batch in dataloader:
            X_batch = X_batch.to(device)

            if extract_attention:
                outputs, attn = model(X_batch, return_attention=True)
                attn_np = attn.cpu().numpy()

                if attn_np.ndim == 2:
                    # Simple / Scaled DP: (batch, seq_len)
                    batch_sum = attn_np.sum(axis=0)
                elif attn_np.ndim == 3:
                    # Additive (M7): (batch, query, key)
                    # Average over query dim -> importance per key position
                    batch_sum = attn_np.mean(axis=1).sum(axis=0)
                else:
                    batch_sum = np.zeros(50)

                if attn_sum is None:
                    attn_sum = batch_sum
                else:
                    attn_sum += batch_sum
                n_attn_samples += len(attn_np)
            else:
                outputs = model(X_batch)

            all_preds.append(outputs.cpu().numpy())
            all_targets.append(Y_batch.numpy())

    predictions = np.concatenate(all_preds, axis=0)
    targets = np.concatenate(all_targets, axis=0)

    if extract_attention and attn_sum is not None:
        avg_attention = attn_sum / n_attn_samples
        return predictions, targets, avg_attention
    return predictions, targets

In [5]:
results = {}  # {model_id: {seed: {predictions, targets, attention}}}

for mc in MODELS:
    config_path = PROJECT_ROOT / mc.config_no_dropout
    cfg = load_config(str(config_path))
    model_class = get_model_class(cfg['model']['type'])

    results[mc.id] = {}
    seed_ckpts = all_checkpoints[mc.id]

    print(f'\n{mc.name}:')

    for seed in SEEDS:
        if seed not in seed_ckpts:
            print(f'  Seed {seed}: MISSING')
            continue

        ckpt_path, val_loss = seed_ckpts[seed]
        model = model_class.load_from_checkpoint(str(ckpt_path), map_location='cpu')

        has_attn = has_attention_support(model)

        if has_attn:
            preds, targs, attn = run_inference(
                model, test_loader, DEVICE, extract_attention=True,
            )
        else:
            preds, targs = run_inference(model, test_loader, DEVICE)
            attn = None

        results[mc.id][seed] = {
            'predictions': preds,
            'targets': targs,
            'attention': attn,
        }

        print(f'  Seed {seed}: val_loss={val_loss:.6f}, samples={len(preds):,}')

        del model
        gc.collect()

n_evals = sum(len(v) for v in results.values())
print(f'\nDone. {n_evals} model-seed evaluations.')
winsound.PlaySound("SystemHand", winsound.SND_ALIAS)


M1 MLP Last:
  Seed 7: val_loss=0.003477, samples=221,187
  Seed 42: val_loss=0.003500, samples=221,187
  Seed 94: val_loss=0.003454, samples=221,187
  Seed 123: val_loss=0.003421, samples=221,187
  Seed 231: val_loss=0.003470, samples=221,187

M2 MLP Flat:
  Seed 7: val_loss=0.002648, samples=221,187
  Seed 42: val_loss=0.002598, samples=221,187
  Seed 94: val_loss=0.002588, samples=221,187
  Seed 123: val_loss=0.002775, samples=221,187
  Seed 231: val_loss=0.002592, samples=221,187

M3 Small Baseline:
  Seed 7: val_loss=0.001940, samples=221,187
  Seed 42: val_loss=0.001973, samples=221,187
  Seed 94: val_loss=0.001993, samples=221,187
  Seed 123: val_loss=0.001958, samples=221,187
  Seed 231: val_loss=0.001933, samples=221,187

M4 Small + Simple Attn:
  Seed 7: val_loss=0.001993, samples=221,187
  Seed 42: val_loss=0.001946, samples=221,187
  Seed 94: val_loss=0.001987, samples=221,187
  Seed 123: val_loss=0.001939, samples=221,187
  Seed 231: val_loss=0.001964, samples=221,187

M5

## 4. Metriken berechnen

- **Sample-Level:** MSE, RMSE, MAE, R², Accuracy
- **Sequence-Level:** RMSE, MAE, Accuracy, R² (pro Sequenz, dann gemittelt)
- Jeweils mean ± std über 5 Seeds

In [None]:
metrics_rows = []

for mc in MODELS:
    seed_data = {
        'mse': [], 'rmse': [], 'mae': [], 'r2': [], 'accuracy': [],
        'seq_rmse': [], 'seq_mae': [], 'seq_accuracy': [], 'seq_r2': [],
    }

    for seed in SEEDS:
        if seed not in results[mc.id]:
            continue

        preds = results[mc.id][seed]['predictions']
        targs = results[mc.id][seed]['targets']

        # Sample-level metrics
        m = calculate_metrics_dict(preds, targs, ACCURACY_THRESHOLD)
        seed_data['mse'].append(m['mse'])
        seed_data['rmse'].append(m['rmse'])
        seed_data['mae'].append(m['mae'])
        seed_data['r2'].append(m['r2'])
        seed_data['accuracy'].append(m['accuracy'])

        # Sequence-level metrics
        _, seq_summary = aggregate_metrics_per_sequence(
            preds, targs, test_sequence_ids, ACCURACY_THRESHOLD,
        )
        seed_data['seq_rmse'].append(seq_summary['rmse_mean'])
        seed_data['seq_mae'].append(seq_summary['mae_mean'])
        seed_data['seq_accuracy'].append(seq_summary['accuracy_mean'])

        # Sequence-level R-squared
        mean_r2, _ = compute_per_sequence_r2(preds, targs, test_sequence_ids)
        seed_data['seq_r2'].append(mean_r2)

    row = {'model': mc.id.upper(), 'name': mc.name}
    for key, vals in seed_data.items():
        if vals:
            row[f'{key}_mean'] = np.mean(vals)
            row[f'{key}_std'] = np.std(vals)
    metrics_rows.append(row)

metrics_df = pd.DataFrame(metrics_rows)
print('Metrics computed.')
metrics_df[['model', 'name', 'accuracy_mean', 'accuracy_std',
            'rmse_mean', 'rmse_std', 'seq_r2_mean', 'seq_r2_std']].round(4)

Metrics computed.


## 4b. Statistische Tests (Sequenz-Ebene)

Block-Bootstrap CIs, Permutationstests und Effektstärken. Funktionen aus `scripts/sequence_level_evaluation.py` werden per Import wiederverwendet. Alle Berechnungen nutzen die vorhandenen Predictions im `results`-Dict — keine erneute Inference.

In [None]:
from scripts.sequence_level_evaluation import (
    bootstrap_ci_sequences,
    cohens_d_paired_sequences,
    permutation_test_sequences,
    multi_seed_sequence_analysis,
    run_all_comparisons,
    _compute_seq_metric_arrays,
    COMPARISON_PAIRS,
    METRICS,
    _significance_stars,
    _effect_size_category,
)

# Compute per-sequence metric arrays for all models x seeds
all_seq_metrics = {}  # {model_id: {seed: {metric: array}}}

for mc in MODELS:
    all_seq_metrics[mc.id] = {}
    for seed in SEEDS:
        if seed not in results[mc.id]:
            continue
        preds = results[mc.id][seed]['predictions']
        targs = results[mc.id][seed]['targets']
        seq_arrays = _compute_seq_metric_arrays(
            preds, targs, test_sequence_ids, ACCURACY_THRESHOLD,
        )
        all_seq_metrics[mc.id][seed] = seq_arrays

n_models = len(all_seq_metrics)
n_total = sum(len(v) for v in all_seq_metrics.values())
print(f'Per-sequence metrics computed: {n_models} models, {n_total} model-seed combinations')

In [None]:
# Block-Bootstrap CIs + Law of Total Variance for all models
bootstrap_results = {}  # {model_id: aggregated multi-seed result}
seed_stability = {}  # {model_id: per-seed accuracy values}

for mc in MODELS:
    model_id = mc.id
    seed_data = all_seq_metrics[model_id]
    seeds_available = sorted(seed_data.keys())

    # Per-seed bootstrap CIs
    seed_bootstrap_ci = {}
    seed_point_metrics = {}

    for seed in seeds_available:
        seq_arrays = seed_data[seed]
        point = {m: float(np.mean(seq_arrays[m])) for m in METRICS}
        seed_point_metrics[seed] = point

        ci = {}
        for metric in METRICS:
            ci[metric] = bootstrap_ci_sequences(
                seq_arrays[metric], n_bootstrap=1000, seed=42,
            )
        seed_bootstrap_ci[seed] = ci

    # Multi-seed aggregation (law of total variance)
    aggregated = multi_seed_sequence_analysis(seed_bootstrap_ci, seed_point_metrics)
    bootstrap_results[model_id] = aggregated
    seed_stability[model_id] = {
        'per_seed_accuracy': [seed_point_metrics[s]['accuracy'] for s in seeds_available],
        'sigma_seed': aggregated['accuracy']['std_seed'],
    }

# --- Combined uncertainty table ---
rows = []
for mc in MODELS:
    r = bootstrap_results[mc.id]
    rows.append({
        'Model': mc.id.upper(),
        'Name': mc.name,
        'Accuracy (%)': f"{r['accuracy']['mean']:.2f} \u00b1 {r['accuracy']['std']:.2f}",
        'RMSE': f"{r['rmse']['mean']:.4f} \u00b1 {r['rmse']['std']:.4f}",
        'MAE': f"{r['mae']['mean']:.4f} \u00b1 {r['mae']['std']:.4f}",
    })
print('Bootstrap CIs (combined uncertainty, 1000 samples \u00d7 5 seeds):')
display(pd.DataFrame(rows))

# --- Seed stability ---
print('\nSeed Stability (per-seed accuracy):')
stab_rows = []
for mc in MODELS:
    s = seed_stability[mc.id]
    acc_vals = s['per_seed_accuracy']
    row = {'Model': mc.id.upper()}
    for seed, val in zip(SEEDS, acc_vals):
        row[f'Seed {seed}'] = f'{val:.2f}'
    row['\u03c3_seed'] = f"{s['sigma_seed']:.2f}"
    stab_rows.append(row)
display(pd.DataFrame(stab_rows))

# --- Uncertainty decomposition ---
print('\nUncertainty Decomposition (Law of Total Variance):')
decomp_rows = []
for mc in MODELS:
    r = bootstrap_results[mc.id]
    acc = r['accuracy']
    sigma_total = acc['std']
    sigma_boot = acc['std_bootstrap']
    sigma_seed = acc['std_seed']
    seed_frac = (sigma_seed**2 / sigma_total**2 * 100) if sigma_total > 0 else 0
    decomp_rows.append({
        'Model': mc.id.upper(),
        '\u03c3_total': f'{sigma_total:.2f}',
        '\u03c3_bootstrap': f'{sigma_boot:.2f}',
        '\u03c3_seed': f'{sigma_seed:.2f}',
        'Seed Variance %': f'{seed_frac:.0f}%',
    })
display(pd.DataFrame(decomp_rows))

In [None]:
# Seed-averaged per-sequence metrics for pairwise comparisons
avg_seq_metrics = {}  # {MODEL_ID: {metric: array}}

for mc in MODELS:
    model_id = mc.id.upper()
    seed_data = all_seq_metrics[mc.id]
    seeds_available = sorted(seed_data.keys())

    avg = {}
    for metric in METRICS:
        avg[metric] = np.mean(
            [seed_data[s][metric] for s in seeds_available], axis=0,
        )
    avg_seq_metrics[model_id] = avg

# Run all pairwise comparisons (10,000 sign-flip permutations)
comparison_results = run_all_comparisons(
    COMPARISON_PAIRS, avg_seq_metrics,
    n_permutations=10000, seed=42,
)

# Display results
comp_rows = []
for c in comparison_results:
    acc = c['accuracy']
    rmse = c['rmse']
    mae = c['mae']
    acc_stars = _significance_stars(acc['p_value'])
    rmse_stars = _significance_stars(rmse['p_value'])
    mae_stars = _significance_stars(mae['p_value'])

    comp_rows.append({
        'Comparison': f"{c['model_a']} \u2192 {c['model_b']}",
        'Category': c['category'],
        '\u0394 Acc (%)': f"{acc['observed_diff']:+.2f}{acc_stars}",
        '\u0394 RMSE': f"{rmse['observed_diff']:+.4f}{rmse_stars}",
        '\u0394 MAE': f"{mae['observed_diff']:+.4f}{mae_stars}",
        'd(Acc)': f"{acc['cohens_d']:+.3f}",
        'd(RMSE)': f"{rmse['cohens_d']:+.3f}",
        'd(MAE)': f"{mae['cohens_d']:+.3f}",
    })

print(f'Pairwise Comparisons (10,000 permutations, seed=42):')
print('Significance: * p<0.05, ** p<0.01, *** p<0.001')
print("Cohen's d: |d|<0.2 negligible, 0.2-0.5 small, 0.5-0.8 medium, >0.8 large")
print("d sign: positive = B better (higher accuracy, lower RMSE/MAE)\n")
pd.DataFrame(comp_rows)

## 5. FLOPs & Parameter

In [None]:
sample_input = torch.randn(1, data_config['window_size'], 5)
flops_rows = []

for mc in MODELS:
    cfg = load_config(str(PROJECT_ROOT / mc.config_no_dropout))
    model_class = get_model_class(cfg['model']['type'])

    seed_ckpts = all_checkpoints[mc.id]
    any_seed = next(iter(seed_ckpts))
    ckpt_path, _ = seed_ckpts[any_seed]
    model = model_class.load_from_checkpoint(str(ckpt_path), map_location='cpu')

    n_params = sum(p.numel() for p in model.parameters())
    flops_result = calculate_flops(model, sample_input.clone())

    flops_rows.append({
        'model': mc.id.upper(),
        'params': n_params,
        'flops': flops_result['flops'],
        'flops_fmt': flops_result['flops_formatted'],
        'macs_fmt': flops_result['macs_formatted'],
    })

    del model
    gc.collect()

flops_df = pd.DataFrame(flops_rows)
flops_df


## 6. Inference Time Messung

> **Hinweis:** Diese Zelle isoliert ausführen. Keine anderen rechenintensiven Prozesse parallel laufen lassen.
>
> Messung: CPU, single-thread (`torch.set_num_threads(1)`), 100 Warmup, 5 Runs × 1000 Samples.

In [7]:
inference_rows = []

for mc in MODELS:
    cfg = load_config(str(PROJECT_ROOT / mc.config_no_dropout))
    model_class = get_model_class(cfg['model']['type'])

    seed_ckpts = all_checkpoints[mc.id]
    ckpt_path, _ = seed_ckpts[42]
    model = model_class.load_from_checkpoint(str(ckpt_path), map_location='cpu')

    timing = measure_inference_time(
        model, sample_input.clone(),
        warmup_iterations=100,
        num_samples=1000,
        num_runs=5,
        device='cpu',
    )

    inference_rows.append({
        'model': mc.id.upper(),
        'mean_ms': timing['mean_ms'],
        'std_ms': timing['std_ms'],
        'p50_ms': timing['p50_ms'],
        'p50_std_ms': timing['p50_std_ms'],
        'p95_ms': timing['p95_ms'],
        'p95_std_ms': timing['p95_std_ms'],
        'p99_ms': timing['p99_ms'],
        'p99_std_ms': timing['p99_std_ms'],
    })

    print(f'  {mc.name:<35s}  P95={timing["p95_ms"]:.3f} +/- {timing["p95_std_ms"]:.3f} ms')

    del model
    gc.collect()

inference_df = pd.DataFrame(inference_rows)
inference_df

winsound.PlaySound("SystemHand", winsound.SND_ALIAS)

NameError: name 'sample_input' is not defined

## 7. Ergebnis-Tabellen

In [None]:
def fmt(row, key, decimals=4):
    """Format mean +/- std."""
    m = row.get(f'{key}_mean', float('nan'))
    s = row.get(f'{key}_std', float('nan'))
    return f'{m:.{decimals}f} +/- {s:.{decimals}f}'


main = metrics_df[['model', 'name']].copy()
main = main.merge(flops_df[['model', 'params']], on='model')
main = main.merge(inference_df[['model', 'p95_ms', 'p95_std_ms']], on='model')

main['Accuracy (%)'] = metrics_df.apply(lambda r: fmt(r, 'accuracy', 2), axis=1)
main['RMSE'] = metrics_df.apply(lambda r: fmt(r, 'rmse'), axis=1)
main['MAE'] = metrics_df.apply(lambda r: fmt(r, 'mae'), axis=1)
main['R2 (sample)'] = metrics_df.apply(lambda r: fmt(r, 'r2'), axis=1)
main['R2 (sequence)'] = metrics_df.apply(lambda r: fmt(r, 'seq_r2'), axis=1)
main['Seq Accuracy (%)'] = metrics_df.apply(lambda r: fmt(r, 'seq_accuracy', 2), axis=1)
main['Seq RMSE'] = metrics_df.apply(lambda r: fmt(r, 'seq_rmse'), axis=1)
main['P95 (ms)'] = main.apply(
    lambda r: f"{r['p95_ms']:.3f} +/- {r['p95_std_ms']:.3f}", axis=1,
)

display_cols = [
    'model', 'name', 'params',
    'Accuracy (%)', 'RMSE', 'MAE',
    'R2 (sample)', 'R2 (sequence)',
    'Seq Accuracy (%)', 'Seq RMSE',
    'P95 (ms)',
]
main[display_cols]

## 8. Figures

In [None]:
import matplotlib
import matplotlib.pyplot as plt

try:
    import scienceplots  # noqa: F401
    plt.style.use(['science', 'ieee'])
except ImportError:
    print('WARNING: scienceplots not installed, using default style')

plt.rcParams.update({
    'text.usetex': True,
    'pgf.texsystem': 'pdflatex',
    'pgf.rcfonts': False,
    'pgf.preamble': '\n'.join([
        r'\usepackage[utf8]{inputenc}',
        r'\usepackage[T1]{fontenc}',
        r'\usepackage{amsmath}',
        r'\usepackage{siunitx}',
        r'\providecommand{\mathdefault}[1]{#1}',
    ]),
    'font.family': 'serif',
    'font.serif': ['Computer Modern Roman'],
    'figure.figsize': (3.5, 2.5),
    'savefig.bbox': 'tight',
    'savefig.pad_inches': 0.02,
    'lines.linewidth': 1.0,
    'lines.markersize': 4,
    'axes.grid': False,
    'legend.framealpha': 0.95,
    'legend.edgecolor': 'none',
    'savefig.dpi': 300,
})

MATLAB_BLUE = (0/255, 114/255, 189/255)
MATLAB_ORANGE = (217/255, 83/255, 25/255)
MATLAB_PURPLE = (126/255, 47/255, 142/255)

FIGURES_DIR = PROJECT_ROOT / 'figures'
FIGURES_DIR.mkdir(parents=True, exist_ok=True)


def save_figure(fig, name):
    """Save figure as PGF, PDF, and PNG."""
    for ext in ['pgf', 'pdf', 'png']:
        path = FIGURES_DIR / f'{name}.{ext}'
        fig.savefig(path)
        print(f'  Saved: {path.relative_to(PROJECT_ROOT)}')
    plt.close(fig)


print('Figure styling configured.')

### 8a. Attention Weights

In [None]:
# Average attention profiles across seeds for each attention model
attention_profiles = {}

for model_id in ['m4', 'm6', 'm7', 'm8']:
    seed_weights = []
    for seed in SEEDS:
        if seed not in results[model_id]:
            continue
        attn = results[model_id][seed]['attention']
        if attn is None:
            continue
        seed_weights.append(attn)

    if not seed_weights:
        print(f'  {model_id.upper()}: no attention data')
        continue

    combined = np.mean(seed_weights, axis=0)
    combined = combined / combined.sum()  # Normalize to sum=1
    attention_profiles[model_id] = combined

    last5 = combined[-5:].sum() * 100
    last10 = combined[-10:].sum() * 100
    last20 = combined[-20:].sum() * 100
    peak = np.argmax(combined)
    print(f'  {model_id.upper()}: Last5={last5:.1f}%, Last10={last10:.1f}%, '
          f'Last20={last20:.1f}%, Peak={peak}')

In [None]:
# Plot: 3 subplots (M6, M7, M8)
models_to_plot = [
    ('m6', 'Simple Attention', MATLAB_BLUE),
    ('m7', 'Additive Attention', MATLAB_ORANGE),
    ('m8', 'Scaled Dot-Product', MATLAB_PURPLE),
]

fig, axes = plt.subplots(1, 3, figsize=(7.16, 2.2), sharey=False)

for ax, (mid, label, color) in zip(axes, models_to_plot):
    if mid not in attention_profiles:
        ax.text(0.5, 0.5, 'No data', transform=ax.transAxes, ha='center')
        continue

    w = attention_profiles[mid]
    t = np.arange(len(w))

    ax.plot(t, w, color=color, linewidth=1.2, linestyle='-')
    ax.axhline(
        y=1.0 / 50, color='gray', linestyle='--', linewidth=0.7,
        label=r'Uniform ($\frac{1}{50}$)',
    )
    ax.set_xlabel('Time Step')
    ax.set_title(label, fontsize=8)
    ax.set_xlim(0, 49)
    ax.set_ylim(bottom=0)

axes[0].set_ylabel('Attention Weight')
axes[0].legend(fontsize=6, loc='upper left')
fig.tight_layout()

save_figure(fig, 'attention_weights_plot')

### 8b. Inference-Accuracy Tradeoff

In [None]:
# Build data from computed metrics + inference times
tradeoff_data = {}
for _, mrow in metrics_df.iterrows():
    mid = mrow['model']
    irow = inference_df[inference_df['model'] == mid].iloc[0]

    if mid in ('M1', 'M2'):
        mtype = 'mlp'
    elif mid in ('M3', 'M5'):
        mtype = 'lstm'
    else:
        mtype = 'lstm_attn'

    tradeoff_data[mid] = {
        'accuracy': mrow['accuracy_mean'],
        'p95_ms': irow['p95_ms'],
        'type': mtype,
    }

# Marker and color maps
MARKERS = {'mlp': 's', 'lstm': 'o', 'lstm_attn': '^'}
TYPE_COLORS = {'mlp': '#7f7f7f', 'lstm': '#1f77b4', 'lstm_attn': '#ff7f0e'}
MARKER_SIZES = {'mlp': 35, 'lstm': 45, 'lstm_attn': 50}

fig, ax = plt.subplots(figsize=(3.5, 2.8))

plotted_types = set()
for mid, d in tradeoff_data.items():
    mtype = d['type']
    label = None
    if mtype not in plotted_types:
        label = {'mlp': 'MLP Baseline', 'lstm': 'LSTM Baseline',
                 'lstm_attn': 'LSTM + Attention'}[mtype]
        plotted_types.add(mtype)

    ax.scatter(
        d['p95_ms'], d['accuracy'],
        marker=MARKERS[mtype], s=MARKER_SIZES[mtype],
        c=TYPE_COLORS[mtype], edgecolors='white', linewidths=0.5,
        label=label, zorder=4,
    )

# Highlight M3 (Pareto-optimal)
m3 = tradeoff_data['M3']
ax.scatter(
    m3['p95_ms'], m3['accuracy'],
    marker='o', s=120, facecolors='none',
    edgecolors='#d62728', linewidths=1.5, zorder=5,
)

# Model labels
label_offsets = {
    'M1': (5, -2), 'M2': (5, -2), 'M3': (5, 3), 'M4': (5, -5),
    'M5': (-8, 3), 'M6': (-5, -5), 'M7': (5, 3), 'M8': (5, -5),
}

for mid, d in tradeoff_data.items():
    x_off, y_off = label_offsets[mid]
    ha = 'left' if x_off >= 0 else 'right'
    va = 'bottom' if y_off > 0 else 'top'
    text = r'\textbf{M3}' if mid == 'M3' else mid

    ax.annotate(
        text, (d['p95_ms'], d['accuracy']),
        xytext=(x_off, y_off), textcoords='offset points',
        fontsize=7, ha=ha, va=va,
    )

ax.set_xscale('log')
ax.set_xlim(0.04, 7)
ax.set_xticks([0.05, 0.1, 0.5, 1, 2, 5])
ax.set_xticklabels(['0.05', '0.1', '0.5', '1', '2', '5'])
ax.set_xlabel(r'Inference Time P95 (ms)')
ax.set_ylabel(r'Accuracy (\%)')
ax.legend(loc='lower right', fontsize=7)

save_figure(fig, 'inference_accuracy_tradeoff')

### 8c. Prediction Timeseries

In [None]:
# Use seed 42, models M3/M5/M6
PRED_MODELS = ['m3', 'm5', 'm6']
PRED_SEED = 42

# Compute per-sequence RMSE using M5 as reference
ref_preds = results['m5'][PRED_SEED]['predictions'].flatten()
ref_targs = results['m5'][PRED_SEED]['targets'].flatten()

unique_seqs = np.unique(test_sequence_ids)
seq_rmse = {}
for sid in unique_seqs:
    mask = test_sequence_ids == sid
    p = ref_preds[mask]
    t = ref_targs[mask]
    seq_rmse[sid] = np.sqrt(np.mean((p - t) ** 2))

rmse_values = np.array(list(seq_rmse.values()))
seq_ids_arr = np.array(list(seq_rmse.keys()))


def pick_sequence(lo_pct, hi_pct):
    """Pick sequence with RMSE closest to midpoint of percentile range."""
    lo = np.percentile(rmse_values, lo_pct)
    hi = np.percentile(rmse_values, hi_pct)
    mask = (rmse_values >= lo) & (rmse_values <= hi)
    if not mask.any():
        mid = np.percentile(rmse_values, (lo_pct + hi_pct) / 2)
        idx = np.argmin(np.abs(rmse_values - mid))
        return seq_ids_arr[idx]
    mid = (lo + hi) / 2
    candidates = rmse_values[mask]
    cand_ids = seq_ids_arr[mask]
    best = np.argmin(np.abs(candidates - mid))
    return cand_ids[best]


selected = {
    'good': pick_sequence(10, 25),
    'median': pick_sequence(45, 55),
    'difficult': pick_sequence(75, 90),
}

for label, sid in selected.items():
    print(f'  {label}: seq_id={sid}, RMSE={seq_rmse[sid]:.4f}')

In [None]:
categories = [
    ('good', 'Good Prediction'),
    ('median', 'Median Prediction'),
    ('difficult', 'Difficult Prediction'),
]

model_colors = {'m3': MATLAB_BLUE, 'm5': MATLAB_ORANGE, 'm6': MATLAB_PURPLE}
model_styles = {'m3': '--', 'm5': '-.', 'm6': ':'}
model_labels = {'m3': 'M3 (Small)', 'm5': 'M5 (Medium)', 'm6': 'M6 (+ Attn)'}

fig, axes = plt.subplots(3, 1, figsize=(3.5, 5.5), sharex=True)

for ax, (cat, title_prefix) in zip(axes, categories):
    sid = selected[cat]
    mask = test_sequence_ids == sid

    gt = results['m5'][PRED_SEED]['targets'].flatten()[mask]
    timesteps = np.arange(len(gt))

    ax.plot(
        timesteps, gt, color='black', linewidth=1.0,
        linestyle='-', label='Ground Truth',
    )

    for mid in PRED_MODELS:
        p = results[mid][PRED_SEED]['predictions'].flatten()[mask]
        ax.plot(
            timesteps, p, color=model_colors[mid], linewidth=0.8,
            linestyle=model_styles[mid], label=model_labels[mid],
        )

    rmse_m5 = seq_rmse[sid]
    ax.set_title(
        f'{title_prefix} (RMSE$_{{\\mathrm{{M5}}}}={rmse_m5:.3f}$)',
        fontsize=8,
    )
    ax.set_ylabel('Torque (norm.)')

axes[-1].set_xlabel('Time Step')
axes[0].legend(fontsize=6, loc='best', ncol=2)
fig.tight_layout()

save_figure(fig, 'prediction_timeseries')

## 9. Ergebnisse exportieren

In [None]:
import json

RESULTS_DIR = PROJECT_ROOT / 'results'
RESULTS_DIR.mkdir(parents=True, exist_ok=True)

# --- 1. Haupttabelle: alle Metriken (numerisch, kein Formatting) ---
export_metrics = metrics_df[['model', 'name']].copy()
export_metrics = export_metrics.merge(flops_df[['model', 'params']], on='model')
export_metrics = export_metrics.merge(
    inference_df[['model', 'p95_ms', 'p95_std_ms']], on='model',
)

# Sample-level
for key in ['mse', 'rmse', 'mae', 'r2', 'accuracy']:
    export_metrics[f'{key}_mean'] = metrics_df[f'{key}_mean']
    export_metrics[f'{key}_std'] = metrics_df[f'{key}_std']

# Sequence-level
for key in ['seq_rmse', 'seq_mae', 'seq_accuracy', 'seq_r2']:
    export_metrics[f'{key}_mean'] = metrics_df[f'{key}_mean']
    export_metrics[f'{key}_std'] = metrics_df[f'{key}_std']

metrics_path = RESULTS_DIR / 'eval_metrics.csv'
export_metrics.to_csv(metrics_path, index=False, float_format='%.6f')
print(f'Saved: {metrics_path.relative_to(PROJECT_ROOT)}')

# --- 2. Inference-Zeiten ---
inference_path = RESULTS_DIR / 'eval_inference.csv'
inference_df.to_csv(inference_path, index=False, float_format='%.6f')
print(f'Saved: {inference_path.relative_to(PROJECT_ROOT)}')

# --- 3. Attention Weight CSVs (pro Modell) ---
for model_id, profile in attention_profiles.items():
    csv_path = FIGURES_DIR / f'attention_weights_{model_id.upper()}.csv'
    attn_df = pd.DataFrame({
        'timestep': np.arange(len(profile)),
        'weight': profile,
    })
    attn_df.to_csv(csv_path, index=False, float_format='%.8f')
    print(f'Saved: {csv_path.relative_to(PROJECT_ROOT)}')

# --- 4. Statistische Ergebnisse (JSON) ---
stats_export = {
    'analysis_level': 'sequence',
    'n_bootstrap': 1000,
    'n_permutations': 10000,
    'n_test_sequences': int(len(np.unique(test_sequence_ids))),
    'accuracy_threshold': ACCURACY_THRESHOLD,
    'bootstrap_results': {},
    'comparisons': [],
}

for mc in MODELS:
    mid = mc.id.upper()
    r = bootstrap_results[mc.id]
    stats_export['bootstrap_results'][mid] = {
        metric: {
            'mean': r[metric]['mean'],
            'std': r[metric]['std'],
            'std_bootstrap': r[metric]['std_bootstrap'],
            'std_seed': r[metric]['std_seed'],
            'ci_lower': r[metric]['ci_lower'],
            'ci_upper': r[metric]['ci_upper'],
        }
        for metric in METRICS
    }

for c in comparison_results:
    c_clean = {
        'model_a': c['model_a'],
        'model_b': c['model_b'],
        'category': c['category'],
        'n_sequences': c['n_sequences'],
    }
    for metric in METRICS:
        c_clean[metric] = c[metric]
    stats_export['comparisons'].append(c_clean)

stats_path = RESULTS_DIR / 'eval_statistics.json'
with open(stats_path, 'w', encoding='utf-8') as f:
    json.dump(stats_export, f, indent=2)
print(f'Saved: {stats_path.relative_to(PROJECT_ROOT)}')