In [1]:
# ==================================================================================================
# 🌌 TITAN OS: "PENROSE RING" GRAND UNIFIED OPTIMIZATION PROTOCOL (v2.1 FINAL)
# ==================================================================================================
# PURPOSE:  Create the ultimate CPU Operating System Blueprint using Real-World Data.
# DATA:     1. OpenML 197 'cpu_act' (8,192 OS Kernel snapshots)
#           2. OpenML 533 (CPU Performance Regression benchmarks)
#           3. OpenML 44148 (Exotic Superconductor Physics for Hardware Layer)
# STACK:    Full 5-Stage Titan Suite (Validation -> Discovery -> Attribution -> Causal -> Optimization)
# OUTPUTS:  Developer-Ready 'Control Surface' CSV & 'OS Profile' JSON.
# ==================================================================================================

import os
import sys
import json
import warnings
import logging
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import networkx as nx
import xgboost as xgb
from typing import Dict, List, Tuple
from dataclasses import dataclass
from tqdm.auto import tqdm
from scipy.stats import ks_2samp, chisquare
from sklearn.preprocessing import StandardScaler, MinMaxScaler
from sklearn.ensemble import IsolationForest, RandomForestRegressor
from sklearn.decomposition import PCA
from sklearn.datasets import fetch_openml
from sklearn.model_selection import train_test_split

# Runtime configuration
# Silence every warning category for the rest of the process so library
# chatter does not clutter the cell output.
warnings.filterwarnings('ignore')
# Use CUDA when PyTorch reports it as available, otherwise fall back to CPU.
DEVICE = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f"üöÄ SYSTEM ONLINE: TITAN OS OPTIMIZER | DEVICE: {DEVICE}")

# ==================================================================================================
# 1. TITAN VALIDATION FRAMEWORK (TVF) - MANIFOLD EDITION
# ==================================================================================================
@dataclass
class TVFConfig:
    """Threshold bundle handed to ``TitanValidationFramework``.

    Every field has a default, so ``TVFConfig()`` is a usable configuration.
    """

    # Not read anywhere in the code visible in this file.
    drift_alpha: float = 0.05

    # Allowed relative growth of the PCA reconstruction error: new data is
    # flagged once its error exceeds ``(1 + threshold)`` times the baseline.
    reconstruction_error_threshold: float = 0.15

    # Not read anywhere in the code visible in this file (the PCA is built
    # with a fixed ``n_components=0.95``).
    min_explained_variance_ratio: float = 0.85

    # ``contamination`` passed to the IsolationForest; three times this value
    # is the anomaly-rate alarm level used during validation.
    iforest_contamination: float = 0.03

class TitanValidationFramework:
    """Learn a numeric baseline from reference data and score new data against it.

    At construction the numeric columns of the reference frame are
    standardised, a PCA keeping 95% of the variance is fitted, and an
    IsolationForest is trained on the scaled rows. ``validate`` compares a
    new frame with that baseline; ``manifold_penalty`` turns the same
    reconstruction-error ratio into a soft penalty for an optimizer.
    """

    def __init__(self, reference_data: pd.DataFrame, config: TVFConfig = None):
        """Fit scaler, PCA and IsolationForest on ``reference_data``.

        Args:
            reference_data: Frame defining "normal". It is copied; NaNs in
                numeric columns are replaced by 0 before fitting.
            config: Thresholds; a default ``TVFConfig`` is used when omitted.
        """
        self.config = config or TVFConfig()
        self.reference = reference_data.copy()
        self.num_cols = self.reference.select_dtypes(include=[np.number]).columns.tolist()
        self.scaler = StandardScaler()

        print("   [TVF] Learning Baseline Structure (Manifold & Anomalies)...")
        if self.num_cols:
            X = self.scaler.fit_transform(self.reference[self.num_cols].fillna(0))

            # Learn the "Safe Operating Manifold" via PCA
            self.pca = PCA(n_components=0.95)
            self.pca.fit(X)
            self.ref_recon_error = self._reconstruction_mse(X)

            # Learn "Safe Operating Envelope" via Isolation Forest
            self.iforest = IsolationForest(
                contamination=self.config.iforest_contamination, n_jobs=-1, random_state=42
            )
            self.iforest.fit(X)
        else:
            # Nothing numeric to model: every structural check becomes a no-op.
            self.pca = None
            self.iforest = None
            self.ref_recon_error = 0.0

    def _reconstruction_mse(self, X) -> float:
        """Mean squared error between scaled rows ``X`` and their PCA round trip."""
        X_recon = self.pca.inverse_transform(self.pca.transform(X))
        return np.mean(np.square(X - X_recon))

    def _reconstruction_ratio(self, X) -> float:
        """Reconstruction error of ``X`` relative to the reference error."""
        # The 1e-9 floor avoids dividing by zero when the reference is
        # reconstructed perfectly.
        return self._reconstruction_mse(X) / max(self.ref_recon_error, 1e-9)

    def validate(self, new_data: pd.DataFrame) -> Tuple[bool, pd.DataFrame]:
        """Compare ``new_data`` with the learned baseline.

        Args:
            new_data: Frame to check.

        Returns:
            ``(passed, report)``. ``report`` has the columns ``Module``,
            ``Status`` and ``Reason``; it holds one RED row per failed check,
            or a single GREEN row when nothing failed. ``passed`` is True
            only in the GREEN case.
        """
        report_rows = []

        # Check Integrity
        missing = set(self.reference.columns) - set(new_data.columns)
        if missing:
            # Sort (by string form, so mixed label types cannot break the
            # sort) to keep the reported sample deterministic.
            shown = sorted(missing, key=str)[:3]
            report_rows.append({"Module": "Integrity", "Status": "RED", "Reason": f"Missing: {shown}"})

        # The scaler and PCA need every numeric reference column. If any is
        # absent, selecting them would raise KeyError, so the structural
        # checks are skipped; the Integrity row above already fails the run.
        numeric_complete = not missing.intersection(self.num_cols)

        if self.pca is not None and self.num_cols and numeric_complete:
            X = self.scaler.transform(new_data[self.num_cols].fillna(0))

            # Check Manifold Drift
            ratio = self._reconstruction_ratio(X)
            if ratio > (1 + self.config.reconstruction_error_threshold):
                report_rows.append({"Module": "Structure", "Status": "RED", "Reason": f"Manifold Drift (Err: {ratio:.2f}x)"})

            # Check Anomalies
            if self.iforest is not None:
                preds = self.iforest.predict(X)
                rate = (preds == -1).mean()  # -1 marks rows predicted as outliers
                if rate > self.config.iforest_contamination * 3:
                    report_rows.append({"Module": "Anomalies", "Status": "RED", "Reason": f"High Anomaly Rate ({rate:.1%})"})

        report = pd.DataFrame(report_rows) if report_rows else pd.DataFrame(
            [{"Module": "Global", "Status": "GREEN", "Reason": "All Systems Nominal"}]
        )
        return len(report_rows) == 0, report

    def manifold_penalty(self, candidate_df: pd.DataFrame) -> float:
        """Calculates how 'unrealistic' a proposed OS config is.

        Args:
            candidate_df: One or more candidate rows. Numeric reference
                columns that are absent are added and filled with 0.0.

        Returns:
            ``max(0, ratio - 1.2)``, where ``ratio`` is the candidate's PCA
            reconstruction error divided by the reference error. Returns 0.0
            when no numeric baseline was learned.
        """
        if self.pca is None or not self.num_cols:
            return 0.0

        # Ensure alignment
        df = candidate_df.copy()
        for c in self.num_cols:
            if c not in df.columns:
                df[c] = 0.0  # Zero fill missing

        X = self.scaler.transform(df[self.num_cols].fillna(0))

        # Penalty applies only if we drift significantly outside the baseline manifold
        return float(max(0.0, self._reconstruction_ratio(X) - 1.2))

# ==================================================================================================
# 2. UNIFIED DISCOVERY ENGINE (UDE)
# ==================================================================================================
class UnifiedDiscoveryEngine:
    """Rank the columns that drive a target and drop mutually predictable ones."""

    def __init__(self, df, target_col):
        """Keep a private copy of ``df`` and treat every non-target column as a feature."""
        self.raw_df = df.copy()
        self.target = target_col
        self.features = [name for name in df.columns if name != target_col]
        print(f"üîç UDE ONLINE. Target: '{self.target}'")

    def scan_physics(self):
        """Fit a gradient-boosted regressor and return its feature importances.

        Returns:
            ``pd.Series`` indexed by feature name, sorted from most to least
            important. The R² that is printed is computed on the same rows
            the model was fitted on.
        """
        print("\n>> [UDE] SCANNING NON-LINEAR PHYSICS...")
        inputs = self.raw_df[self.features]
        response = self.raw_df[self.target]
        booster = xgb.XGBRegressor(n_estimators=200, max_depth=5, n_jobs=-1, random_state=42)
        booster.fit(inputs, response)
        ranking = pd.Series(booster.feature_importances_, index=self.features)
        ranking = ranking.sort_values(ascending=False)
        print(f"   Baseline R¬≤: {booster.score(inputs, response):.4f}")
        return ranking

    def detect_redundancy(self, top_features, threshold=0.90):
        """Greedily keep features that the already-kept ones cannot predict.

        Features are visited in the given order. A feature is dropped when a
        random forest trained on the kept features reproduces it with an
        in-sample R² above ``threshold``; the first feature is always kept.

        Returns:
            List of surviving feature names, in visiting order.
        """
        print("\n>> [UDE] PURGING REDUNDANT DIMENSIONS...")
        kept = []
        for candidate in top_features:
            if kept:
                # Can the kept features already explain this one?
                predictors = self.raw_df[kept]
                observed = self.raw_df[candidate]
                forest = RandomForestRegressor(
                    n_estimators=40, max_depth=6, n_jobs=-1, random_state=42
                )
                forest.fit(predictors, observed)
                r2 = forest.score(predictors, observed)
                if r2 > threshold:
                    print(f"   ‚öîÔ∏è Killed '{candidate}' (Redundant with Survivors, R¬≤={r2:.2f})")
                    continue
            kept.append(candidate)
        return kept

# ==================================================================================================
# 3. UNIVERSAL ATTRIBUTION VALIDATOR (UAV)
# ==================================================================================================
class UniversalAttributionValidator:
    """Train a small neural surrogate of ``y = f(X)`` and attribute it to features.

    The surrogate is exposed as ``self.model`` and is left in evaluation mode
    after training, so later calls are deterministic (dropout disabled).
    """

    def __init__(self, X_df, y_series):
        """Move the data to ``DEVICE`` as float32 tensors and train the surrogate.

        Args:
            X_df: Feature frame; its column names label the attributions.
            y_series: Target values aligned with ``X_df``.
        """
        self.X_vals = X_df.values.astype(np.float32)
        self.y_vals = y_series.values.astype(np.float32)
        self.cols = X_df.columns.tolist()
        self.X_t = torch.from_numpy(self.X_vals).to(DEVICE)
        self.y_t = torch.from_numpy(self.y_vals).reshape(-1, 1).to(DEVICE)
        self._train_proxy()

    def _train_proxy(self):
        """Fit the MLP surrogate with 400 full-batch Adam steps on noisy inputs."""
        print(f"\n‚öñÔ∏è [UAV] TRAINING DIFFERENTIABLE SURROGATE...")
        d_in = self.X_t.shape[1]
        self.model = nn.Sequential(
            nn.Linear(d_in, 128), nn.SiLU(), nn.Dropout(0.1),
            nn.Linear(128, 64), nn.SiLU(), nn.Linear(64, 1)
        ).to(DEVICE)
        opt = torch.optim.Adam(self.model.parameters(), lr=0.002)
        loss_fn = nn.MSELoss()

        # Fast convergence training
        self.model.train()
        pbar = tqdm(range(400), desc="   Training Surrogate", leave=False)
        for _ in pbar:
            opt.zero_grad()
            pred = self.model(self.X_t + torch.randn_like(self.X_t)*0.01) # Add noise for robustness
            loss = loss_fn(pred, self.y_t)
            loss.backward()
            opt.step()

        # Dropout stays active until the module is put in eval mode;
        # torch.no_grad() does not disable it. Without this, attributions and
        # every later prediction would carry random dropout noise.
        self.model.eval()

    def compute_integrated_gradients(self, n_steps=50):
        """Integrated-gradients style attribution against the feature-mean baseline.

        Gradients are averaged over ``n_steps`` points on the straight line
        from the mean row to each sample (at most the first 2000 rows), then
        multiplied by ``sample - baseline`` and averaged over samples.

        Args:
            n_steps: Number of interpolation points; must be at least 1.

        Returns:
            ``pd.Series`` of mean attribution per feature, sorted descending.

        Raises:
            ValueError: if ``n_steps`` is smaller than 1.
        """
        if n_steps < 1:
            raise ValueError(f"n_steps must be >= 1, got {n_steps}")

        print("   [UAV] Calculating Gradient Flows (Sensitivity)...")
        baseline = torch.mean(self.X_t, dim=0, keepdim=True)
        batch = self.X_t[:min(2000, self.X_t.shape[0])] # Sample for speed
        delta = batch - baseline
        grads_acc = torch.zeros_like(batch)

        for alpha in np.linspace(0, 1, n_steps):
            x_step = (baseline + float(alpha) * delta).detach().requires_grad_(True)
            # autograd.grad returns the input gradient directly and leaves the
            # model parameters' .grad buffers untouched.
            (step_grad,) = torch.autograd.grad(self.model(x_step).sum(), x_step)
            grads_acc += step_grad

        attr = delta * (grads_acc / n_steps)
        return pd.Series(attr.detach().cpu().numpy().mean(axis=0), index=self.cols).sort_values(ascending=False)

# ==================================================================================================
# 4. PLATINUM CAUSAL INTERVENTION ENGINE
# ==================================================================================================
class PlatinumCausalEngine:
    """Structural model over a directed graph with one regressor per child node.

    Each node that has parents gets an XGBoost regressor predicting it from
    those parents. ``simulate`` fixes some nodes to chosen values and pushes
    the effect through the graph in topological order.
    """

    def __init__(self, df, graph_edges):
        """Build the graph from ``graph_edges`` and fit all node mechanisms.

        Args:
            df: Frame holding one column per graph node (stored by reference).
            graph_edges: Iterable of ``(parent, child)`` pairs.
        """
        self.df = df
        self.G = nx.DiGraph(graph_edges)
        self.models = {}
        self._fit_mechanisms()

    def _fit_mechanisms(self):
        """Fit ``child ~ parents`` for every node that has at least one parent."""
        print("\n‚öôÔ∏è [UIF] FITTING CAUSAL MECHANISMS...")
        for node in self.G.nodes():
            parents = list(self.G.predecessors(node))
            if parents:
                self.models[node] = xgb.XGBRegressor(
                    n_estimators=80, max_depth=4, n_jobs=-1
                ).fit(self.df[parents], self.df[node])

    def simulate(self, intervention: dict, target: str, random_state=None):
        """Estimate ``target`` under an intervention on one or more nodes.

        Args:
            intervention: Mapping ``node -> value``; each listed column is
                overwritten with a constant and never re-predicted.
            target: Column whose simulated distribution is summarised.
            random_state: Optional seed forwarded to ``DataFrame.sample`` so
                the bootstrap (and thus the result) can be reproduced. The
                default ``None`` keeps the original unseeded behaviour.

        Returns:
            ``(mean, std)`` of ``target`` over the simulated rows.
        """
        # Bootstrap at most 1500 rows to keep repeated simulations cheap.
        sim_df = self.df.sample(
            n=min(1500, len(self.df)), replace=True, random_state=random_state
        ).copy()

        # 1. Apply Intervention
        for node, val in intervention.items():
            sim_df[node] = val

        # 2. Propagate through Graph (parents are always updated before children)
        for node in nx.topological_sort(self.G):
            if node in intervention:
                continue
            parents = list(self.G.predecessors(node))
            if parents:
                sim_df[node] = self.models[node].predict(sim_df[parents])

        return sim_df[target].mean(), sim_df[target].std()

# ==================================================================================================
# 5. UNIFIED OPTIMIZATION FRAMEWORK
# ==================================================================================================
class UnifiedOptimizer:
    """Maximise a keyword-argument objective inside box bounds."""

    def __init__(self, objective_func, bounds):
        """Store the objective and its search box.

        Args:
            objective_func: Callable invoked as ``objective_func(**params)``
                and returning a score to maximise.
            bounds: Mapping ``name -> (low, high)``. Its key order defines
                the order of the optimisation vector.
        """
        self.func = objective_func
        self.bounds = bounds

    def optimize_genetic(self, pop_size=60, generations=20, seed=None):
        """Run SciPy differential evolution and return the best configuration.

        Args:
            pop_size: Rough population budget. SciPy's ``popsize`` is a
                multiplier on the number of parameters, so
                ``max(6, pop_size // 10)`` is what gets passed on.
            generations: Maximum number of generations (``maxiter``).
            seed: Optional seed for a reproducible search. The default
                ``None`` keeps the original unseeded behaviour.

        Returns:
            ``(best_params, best_score)``: a dict ``name -> float`` and the
            objective value reached, in the objective's own sign.

        Raises:
            ValueError: if no bounds were supplied.
        """
        print("üß¨ [UOF] RUNNING GENETIC EVOLUTION...")
        from scipy.optimize import differential_evolution

        names = list(self.bounds.keys())
        if not names:
            raise ValueError("UnifiedOptimizer needs at least one bounded parameter")

        def wrapper(x):
            # SciPy minimises, so negate the score to maximise it.
            return -self.func(**dict(zip(names, x)))

        result = differential_evolution(
            wrapper,
            bounds=[self.bounds[name] for name in names],
            maxiter=generations,
            popsize=max(6, pop_size // 10),
            workers=1,
            seed=seed,
        )
        best_params = {name: float(value) for name, value in zip(names, result.x)}
        return best_params, -float(result.fun)

# ==================================================================================================
# 🚀 MAIN EXECUTION PROTOCOL
# ==================================================================================================
def main():
    """Run the full pipeline and write the CSV / JSON artefacts.

    Steps: download three OpenML datasets (ids 197, 533, 44148), stack
    equal-sized random samples of them column-wise, min-max scale the result,
    then run validation, driver discovery, attribution, one causal
    simulation and a differential-evolution search over the surviving
    drivers. Output goes to ``titan_os_control_surface.csv`` and
    ``titan_os_profile.json`` in the current working directory.

    Raises:
        RuntimeError: if the fused matrix has no ``usr`` column.
    """
    print("==================================================================================")
    print("üåå INITIATING GRAND UNIFIED DATA INGESTION (REAL PHYSICS, EXPANDED)")
    print("==================================================================================")
    
    # --------------------------------------------------------------------------------------------
    # A. DATA INGESTION
    # --------------------------------------------------------------------------------------------
    # 1. OS KERNEL DATA (Workload: System Calls, I/O, Paging)
    print(">> [SOURCE 1] Fetching OS Kernel Data (Delve: 'cpu_act', ID 197)...")
    os_data = fetch_openml(data_id=197, as_frame=True, parser='auto').frame
    print(f"   ‚úì OS Data Secured: {os_data.shape} - (System Calls, I/O, Interrupts)")
    
    # 2. CPU PERFORMANCE REGRESSION (Benchmarks)
    # Best effort: this source is optional, so any failure falls back to an
    # empty frame instead of aborting the run.
    print(">> [SOURCE 2] Fetching CPU Performance Regression Data (ID 533)...")
    try:
        cpu_perf = fetch_openml(data_id=533, as_frame=True, parser='auto').frame
        print(f"   ‚úì CPU Perf Data Secured: {cpu_perf.shape}")
        # Rename to avoid collisions
        cpu_perf_numeric = cpu_perf.select_dtypes(include=[np.number])
        cpu_perf_numeric.columns = [f"CPUPerf_{c}" for c in cpu_perf_numeric.columns]
    except Exception as e:
        print(f"   ‚ö†Ô∏è CPU Perf dataset unavailable ({e}), proceeding with OS data only.")
        cpu_perf_numeric = pd.DataFrame()
    
    # 3. EXOTIC HARDWARE DATA (Superconductors for 'Future Material' Simulation)
    print(">> [SOURCE 3] Fetching Exotic Hardware Data (Superconduct, ID 44148)...")
    hw_full = fetch_openml(data_id=44148, as_frame=True, parser='auto').frame
    print(f"   ‚úì Hardware Data Secured: {hw_full.shape} - (81 features + critical_temp)")
    
    # The last column is treated as the target, everything before it as features.
    hw_features = hw_full.iloc[:, :-1]
    hw_target = hw_full.iloc[:, -1]
    
    # Take a wider slice of hardware features (Top 24)
    n_hw_feats = min(24, hw_features.shape[1])
    hw_subset = hw_features.iloc[:, :n_hw_feats].copy()
    hw_subset.columns = [f"HW_Feat_{i+1}" for i in range(n_hw_feats)]
    hw_subset["HW_Thermal_Limit"] = hw_target.values

    # --------------------------------------------------------------------------------------------
    # B. FUSION & PREPROCESSING
    # --------------------------------------------------------------------------------------------
    print("\n>> [FUSION] Merging OS + CPUPerf + Hardware into 'Penrose Ring' Matrix...")
    n_samples = min(len(os_data), len(hw_subset), 10000)
    
    # The sources share no key: rows are sampled independently and paired
    # purely by position after the index reset.
    df_os = os_data.sample(n_samples, random_state=42).reset_index(drop=True)
    df_hw = hw_subset.sample(n_samples, random_state=42).reset_index(drop=True)
    
    if cpu_perf_numeric.empty:
        df_main = pd.concat([df_os, df_hw], axis=1)
    else:
        # Sampled with replacement because this source may have fewer rows than n_samples.
        cpu_perf_slice = cpu_perf_numeric.sample(n_samples, replace=True, random_state=42).reset_index(drop=True)
        df_main = pd.concat([df_os, cpu_perf_slice, df_hw], axis=1)
    
    # Target: 'usr' (User CPU Time - Goal is to Maximize useful work, Minimize 'sys' overhead)
    target_col = 'usr'
    if target_col not in df_main.columns:
        raise RuntimeError(f"Target column '{target_col}' not found. Available: {df_main.columns.tolist()[:30]}")
    
    # Keep Raw Copy for Final Reporting
    df_raw = df_main.copy()
    
    # Normalize [0,1] for Neural Nets & Optimization
    scaler = MinMaxScaler()
    df_main = pd.DataFrame(scaler.fit_transform(df_main), columns=df_main.columns)
    print(f"‚úì FUSION COMPLETE. Matrix: {df_main.shape}")

    # Shared lookups, built once instead of per knob / per objective call.
    column_position = {c: i for i, c in enumerate(df_main.columns)}
    neutral_profile = {c: 0.5 for c in df_main.columns}  # mid-range value for every column

    # --------------------------------------------------------------------------------------------
    # C. EXECUTE TITAN STACK
    # --------------------------------------------------------------------------------------------
    
    # PHASE 1: VALIDATION
    print("\n[PHASE 1] VALIDATION SCAN")
    tvf = TitanValidationFramework(df_main)
    passed, report = tvf.validate(df_main)
    print(report)

    # PHASE 2: DISCOVERY
    print("\n[PHASE 2] DISCOVERING INFLUENTIAL PARAMETERS")
    ude = UnifiedDiscoveryEngine(df_main, target_col)
    importances = ude.scan_physics()
    
    # Take top 20 drivers
    top_drivers = importances.head(20).index.tolist()
    print(f"   Top Drivers: {top_drivers}")
    
    # Clean Redundancy
    clean_drivers = ude.detect_redundancy(top_drivers)
    print(f"   Unique Control Knobs: {clean_drivers}")

    # PHASE 3: ATTRIBUTION
    print("\n[PHASE 3] DECODING PHYSICS (Attribution)")
    uav = UniversalAttributionValidator(df_main[clean_drivers], df_main[target_col])
    attr_scores = uav.compute_integrated_gradients()
    print("\nüèÜ PARAMETER IMPACT (Sensitivity):")
    print(attr_scores)

    # PHASE 4: CAUSAL SIMULATION
    print("\n[PHASE 4] SIMULATING INTERVENTIONS")
    # Build Graph: HW Features -> System Calls -> User CPU
    edges = []
    if ('HW_Feat_1' in clean_drivers) and ('scall' in clean_drivers):
        edges.append(('HW_Feat_1', 'scall'))
    for d in clean_drivers:
        if d != target_col:
            edges.append((d, target_col))
    
    pce = PlatinumCausalEngine(df_main[clean_drivers + [target_col]], edges)
    
    # Test Scenario: "Kernel Bypass" (Minimizing System Calls)
    if 'scall' in clean_drivers:
        print(">> Simulating 'Kernel Bypass' (scall = 0.1)...")
        mu, std = pce.simulate({'scall': 0.1}, target=target_col)
        print(f"   Predicted User CPU (normalized): {mu:.3f} (¬±{std:.3f})")

    # PHASE 5: OPTIMIZATION (With Manifold Regularization)
    print("\n[PHASE 5] PENROSE RING OPTIMIZATION (Regularized)")

    # The surrogate contains a Dropout layer. torch.no_grad() does not disable
    # dropout, so the model must be in eval mode or the same candidate would
    # score differently on every call and the search would chase noise.
    uav.model.eval()
    
    def objective(**kwargs):
        # 1. Build Vector
        vec = [kwargs.get(col, 0.5) for col in clean_drivers]
        t_vec = torch.tensor([vec], dtype=torch.float32).to(DEVICE)
        
        # 2. Get Raw Performance Score from Surrogate
        with torch.no_grad():
            base_score = float(uav.model(t_vec).item())
        
        # 3. Calculate Manifold Penalty (Is this config realistic?)
        candidate = dict(neutral_profile)
        candidate.update(zip(clean_drivers, vec))
        cand_df = pd.DataFrame([candidate])
        penalty = tvf.manifold_penalty(cand_df)
        
        # 4. Return Regularized Score
        # Penalty weight 0.1 keeps us inside the valid operating envelope
        return base_score - 0.1 * penalty

    bounds = {k: (0.0, 1.0) for k in clean_drivers}
    optimizer = UnifiedOptimizer(objective, bounds)
    best_cfg, best_val = optimizer.optimize_genetic(generations=15)

    # --------------------------------------------------------------------------------------------
    # D. EXPORT & REPORTING
    # --------------------------------------------------------------------------------------------
    print("\n[PHASE 6] EXPORTING DEVELOPER BLUEPRINT")
    
    # 1. Denormalize Best Config to Original Units
    best_row_norm = dict(neutral_profile) # Default baseline
    best_row_norm.update(best_cfg)
        
    best_row_norm_df = pd.DataFrame([best_row_norm])
    best_row_raw_vals = scaler.inverse_transform(best_row_norm_df[df_main.columns])
    best_row_raw = pd.Series(best_row_raw_vals[0], index=df_main.columns)
    
    # 2. Build Summary Table
    summary_rows = []
    for knob in clean_drivers:
        col_idx = column_position[knob]
        col_min = scaler.data_min_[col_idx]
        col_max = scaler.data_max_[col_idx]
        
        recommended = best_row_raw[knob]
        sensitivity = attr_scores.get(knob, 0.0)
        
        summary_rows.append({
            "Parameter": knob,
            "Original_Min": col_min,
            "Original_Max": col_max,
            "Recommended_Value": recommended,
            "Sensitivity_Sign": "Positive" if sensitivity > 0 else "Negative",
            "Sensitivity_Magnitude": float(abs(sensitivity))
        })
    
    summary_df = pd.DataFrame(summary_rows).sort_values("Sensitivity_Magnitude", ascending=False)
    
    # 3. Print Final Report
    print("\n==================================================================================")
    print("üèÅ FINAL SYSTEM BLUEPRINT (Original Units)")
    print("==================================================================================")
    print(f"üåü MAX ACHIEVABLE USER CPU (normalized surrogate): {best_val:.2%}")
    print("\nüîß OPTIMAL CONTROL SURFACE (Top by Sensitivity):")
    print(summary_df.head(20).to_string(index=False))
    print("==================================================================================")

    # 4. Generate JSON/CSV Artifacts
    # CSV
    summary_df.to_csv("titan_os_control_surface.csv", index=False)
    print(f"\nüìÑ Saved Control Surface to: titan_os_control_surface.csv")
    
    # JSON
    os_spec = {
        "meta": {
            "version": "titan_os_penrose_v2",
            "description": "Unified OS+Hardware Optimization Profile",
            "target_metric": target_col,
            "normalized_score": best_val
        },
        "controls": [],
        "profiles": {
            "baseline_normalized": dict(neutral_profile),
            "optimized_normalized": {c: best_cfg.get(c, 0.5) for c in df_main.columns}
        }
    }
    
    for _, row in summary_df.iterrows():
        os_spec["controls"].append({
            "name": row["Parameter"],
            "recommended": float(row["Recommended_Value"]),
            "min": float(row["Original_Min"]),
            "max": float(row["Original_Max"]),
            "impact": float(row["Sensitivity_Magnitude"])
        })
        
    with open("titan_os_profile.json", "w") as f:
        json.dump(os_spec, f, indent=2)
    print(f"üì¶ Saved JSON Profile to: titan_os_profile.json")

# Run the pipeline only when this file is executed directly, not when it is imported.
if __name__ == "__main__":
    main()

üöÄ SYSTEM ONLINE: TITAN OS OPTIMIZER | DEVICE: cuda
üåå INITIATING GRAND UNIFIED DATA INGESTION (REAL PHYSICS, EXPANDED)
>> [SOURCE 1] Fetching OS Kernel Data (Delve: 'cpu_act', ID 197)...
   ‚úì OS Data Secured: (8192, 22) - (System Calls, I/O, Interrupts)
>> [SOURCE 2] Fetching CPU Performance Regression Data (ID 533)...
   ‚úì CPU Perf Data Secured: (559, 5)
>> [SOURCE 3] Fetching Exotic Hardware Data (Superconduct, ID 44148)...
   ‚úì Hardware Data Secured: (21263, 80) - (81 features + critical_temp)

>> [FUSION] Merging OS + CPUPerf + Hardware into 'Penrose Ring' Matrix...
‚úì FUSION COMPLETE. Matrix: (8192, 51)

[PHASE 1] VALIDATION SCAN
   [TVF] Learning Baseline Structure (Manifold & Anomalies)...
   Module Status               Reason
0  Global  GREEN  All Systems Nominal

[PHASE 2] DISCOVERING INFLUENTIAL PARAMETERS
üîç UDE ONLINE. Target: 'usr'

>> [UDE] SCANNING NON-LINEAR PHYSICS...
   Baseline R¬≤: 0.9983
   Top Drivers: ['freeswap', 'vflt', 'runqsz', 'scall', 'swrite'

   Training Surrogate:   0%|          | 0/400 [00:00<?, ?it/s]

   [UAV] Calculating Gradient Flows (Sensitivity)...

üèÜ PARAMETER IMPACT (Sensitivity):
runqsz      0.001585
vflt        0.000907
freeswap    0.000572
rchar       0.000267
lread       0.000229
wchar       0.000162
pgout       0.000156
sread       0.000132
exec        0.000100
lwrite      0.000030
swrite      0.000020
scall      -0.000025
pgfree     -0.000075
ppgin      -0.000088
freemem    -0.001517
dtype: float32

[PHASE 4] SIMULATING INTERVENTIONS

‚öôÔ∏è [UIF] FITTING CAUSAL MECHANISMS...
>> Simulating 'Kernel Bypass' (scall = 0.1)...
   Predicted User CPU (normalized): 0.861 (¬±0.175)

[PHASE 5] PENROSE RING OPTIMIZATION (Regularized)
üß¨ [UOF] RUNNING GENETIC EVOLUTION...

[PHASE 6] EXPORTING DEVELOPER BLUEPRINT

üèÅ FINAL SYSTEM BLUEPRINT (Original Units)
üåü MAX ACHIEVABLE USER CPU (normalized surrogate): 40.40%

üîß OPTIMAL CONTROL SURFACE (Top by Sensitivity):
Parameter  Original_Min  Original_Max  Recommended_Value Sensitivity_Sign  Sensitivity_Magnitude
   runqsz     

In [2]:
# ==================================================================================================
# ⚡ TITAN GPU: SYNTHETIC KERNEL OPTIMIZATION PROTOCOL (Cell 2, Self-contained)
# ==================================================================================================
# PURPOSE: Mirror CPU Cell 1 for GPU using a synthetic but realistic kernel-performance dataset.
# DATA:    Generated in-notebook: grid/block sizes, memory tiling, vector widths, precision modes.
# TARGET:  Perf_Score ~ throughput (higher is better).
# STACK:   TVF, UDE, UAV, UIF, UOF.
# OUTPUTS: titan_gpu_control_surface.csv, titan_gpu_profile.json
# ==================================================================================================

import os
import json
import warnings
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import networkx as nx
import xgboost as xgb
from dataclasses import dataclass
from typing import Dict, List, Tuple
from tqdm.auto import tqdm
from sklearn.preprocessing import StandardScaler, MinMaxScaler
from sklearn.ensemble import IsolationForest, RandomForestRegressor
from sklearn.decomposition import PCA

# Silence every warning category for the rest of the process so library
# chatter does not clutter the cell output.
warnings.filterwarnings('ignore')
# Use CUDA when PyTorch reports it as available, otherwise fall back to CPU.
DEVICE = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f"üöÄ SYSTEM ONLINE: TITAN GPU OPTIMIZER | DEVICE: {DEVICE}")

# ==================================================================================================
# 0. SYNTHETIC GPU KERNEL DATA GENERATOR
# ==================================================================================================
def generate_gpu_kernel_data(n_samples: int = 20000, seed: int = 42) -> pd.DataFrame:
    """Create a synthetic table of GPU launch settings with a performance score.

    Knobs are drawn independently from a seeded NumPy generator. The score
    combines an occupancy term with memory and math efficiency, subtracts
    penalties for oversized settings and branch divergence, adds Gaussian
    noise (sigma 0.02) and is floored at zero.

    Args:
        n_samples: Number of rows to generate.
        seed: Seed for ``np.random.default_rng``; equal seeds give equal frames.

    Returns:
        DataFrame with fifteen knob columns followed by ``Perf_Score``.
    """
    rng = np.random.default_rng(seed)
    power_of_two = [1, 2, 4, 8]

    # NOTE: the draw order below fixes the random stream; reordering any of
    # these calls changes the generated data for a given seed.
    knobs = {}
    knobs["block_x"] = rng.integers(32, 1025, size=n_samples)
    knobs["block_y"] = rng.integers(1, 33, size=n_samples)
    knobs["grid_x"] = rng.integers(16, 2049, size=n_samples)
    knobs["grid_y"] = rng.integers(1, 129, size=n_samples)
    knobs["smem_kb"] = rng.integers(0, 65, size=n_samples)
    knobs["reg_per_thread"] = rng.integers(16, 129, size=n_samples)
    knobs["vwidth"] = rng.choice(power_of_two, size=n_samples)
    knobs["unroll"] = rng.choice(power_of_two, size=n_samples)
    knobs["use_tex"] = rng.integers(0, 2, size=n_samples)
    knobs["use_f16"] = rng.integers(0, 2, size=n_samples)
    knobs["use_f32"] = 1 - knobs["use_f16"]  # exact complement, no extra draw
    knobs["mem_stride"] = rng.choice(power_of_two, size=n_samples)
    knobs["branch_divergence"] = rng.random(size=n_samples)  # uniform in [0, 1)
    knobs["l2_hits"] = rng.beta(5, 2, size=n_samples)  # skewed towards high hit rates
    knobs["tensor_cores"] = rng.integers(0, 2, size=n_samples)

    threads_per_block = knobs["block_x"] * knobs["block_y"]
    regs = knobs["reg_per_thread"]
    smem = knobs["smem_kb"]
    half_precision = knobs["use_f16"]

    # Occupancy: thread fill x register headroom x shared-memory headroom, capped to [0, 1].
    occupancy = np.clip(
        np.minimum(
            1.0,
            threads_per_block / 1024.0
            * (48.0 / np.maximum(16, regs))
            * (1.0 - smem / 96.0),
        ),
        0.0,
        1.0,
    )

    # Memory efficiency: vector width and texture path, then stride and L2 factors.
    access_quality = np.minimum(1.0, knobs["vwidth"] / 4.0) * (0.5 + 0.5 * knobs["use_tex"])
    stride_factor = 1.0 - 0.15 * (knobs["mem_stride"] > 2)
    cache_factor = 0.7 + 0.3 * knobs["l2_hits"]
    mem_eff = access_quality * stride_factor * cache_factor

    # Math throughput: half precision and unrolling help; tensor cores only count with f16.
    math_throughput = (0.4 + 0.4 * half_precision + 0.2 * (knobs["unroll"] / 8.0)) * (
        1.0 + 0.3 * (knobs["tensor_cores"] * half_precision)
    )

    base_perf = occupancy * (0.4 * mem_eff + 0.6 * math_throughput)

    penalty = (
        (regs > 96) * 0.2
        + (smem > 48) * 0.2
        + (threads_per_block > 1024) * 0.5
        + 0.3 * knobs["branch_divergence"]
    )

    noise = rng.normal(0, 0.02, size=n_samples)
    knobs["Perf_Score"] = np.maximum(0.0, base_perf - penalty + noise)

    return pd.DataFrame(knobs)

# ==================================================================================================
# 1. TVF-GPU
# ==================================================================================================
@dataclass
class TVFConfigGPU:
    """Threshold bundle handed to ``TitanValidationFrameworkGPU``."""

    # Allowed relative growth of the PCA reconstruction error: new data is
    # flagged once its error exceeds ``(1 + threshold)`` times the baseline.
    reconstruction_error_threshold: float = 0.15

    # ``contamination`` passed to the IsolationForest; three times this value
    # is the anomaly-rate alarm level used during validation.
    iforest_contamination: float = 0.02

class TitanValidationFrameworkGPU:
    """Learn a numeric baseline from reference data and score new data against it.

    At construction the numeric columns of the reference frame are
    standardised, a PCA keeping 95% of the variance is fitted, and an
    IsolationForest is trained on the scaled rows. ``validate`` compares a
    new frame with that baseline; ``manifold_penalty`` turns the same
    reconstruction-error ratio into a soft penalty for an optimizer.
    """

    def __init__(self, reference_data: pd.DataFrame, config: TVFConfigGPU = None):
        """Fit scaler, PCA and IsolationForest on ``reference_data``.

        Args:
            reference_data: Frame defining "normal". It is copied; NaNs in
                numeric columns are replaced by 0 before fitting.
            config: Thresholds; a default ``TVFConfigGPU`` is used when omitted.
        """
        self.config = config or TVFConfigGPU()
        self.reference = reference_data.copy()
        self.num_cols = self.reference.select_dtypes(include=[np.number]).columns.tolist()
        self.scaler = StandardScaler()

        print("   [TVF-GPU] Learning Baseline Structure (Manifold & Anomalies)...")
        if self.num_cols:
            X = self.scaler.fit_transform(self.reference[self.num_cols].fillna(0))
            self.pca = PCA(n_components=0.95)
            self.pca.fit(X)
            self.ref_recon_error = self._reconstruction_mse(X)
            self.iforest = IsolationForest(
                contamination=self.config.iforest_contamination,
                n_jobs=-1,
                random_state=42
            )
            self.iforest.fit(X)
        else:
            # Nothing numeric to model: every structural check becomes a no-op.
            self.pca = None
            self.iforest = None
            self.ref_recon_error = 0.0

    def _reconstruction_mse(self, X) -> float:
        """Mean squared error between scaled rows ``X`` and their PCA round trip."""
        X_recon = self.pca.inverse_transform(self.pca.transform(X))
        return np.mean(np.square(X - X_recon))

    def _reconstruction_ratio(self, X) -> float:
        """Reconstruction error of ``X`` relative to the reference error."""
        # The 1e-9 floor avoids dividing by zero when the reference is
        # reconstructed perfectly.
        return self._reconstruction_mse(X) / max(self.ref_recon_error, 1e-9)

    def validate(self, new_data: pd.DataFrame) -> Tuple[bool, pd.DataFrame]:
        """Compare ``new_data`` with the learned baseline.

        Args:
            new_data: Frame to check.

        Returns:
            ``(passed, report)``. ``report`` has the columns ``Module``,
            ``Status`` and ``Reason``; it holds one RED row per failed check,
            or a single GREEN row when nothing failed. ``passed`` is True
            only in the GREEN case.
        """
        report_rows = []
        missing = set(self.reference.columns) - set(new_data.columns)
        if missing:
            # Sort (by string form, so mixed label types cannot break the
            # sort) to keep the reported sample deterministic.
            shown = sorted(missing, key=str)[:3]
            report_rows.append({"Module": "Integrity", "Status": "RED", "Reason": f"Missing: {shown}"})

        # The scaler and PCA need every numeric reference column. If any is
        # absent, selecting them would raise KeyError, so the structural
        # checks are skipped; the Integrity row above already fails the run.
        numeric_complete = not missing.intersection(self.num_cols)

        if self.pca is not None and self.num_cols and numeric_complete:
            X = self.scaler.transform(new_data[self.num_cols].fillna(0))
            ratio = self._reconstruction_ratio(X)
            if ratio > (1 + self.config.reconstruction_error_threshold):
                report_rows.append({"Module": "Structure", "Status": "RED", "Reason": f"Manifold Drift (Err: {ratio:.2f}x)"})
            if self.iforest is not None:
                preds = self.iforest.predict(X)
                rate = (preds == -1).mean()  # -1 marks rows predicted as outliers
                if rate > self.config.iforest_contamination * 3:
                    report_rows.append({"Module": "Anomalies", "Status": "RED", "Reason": f"High Anomaly Rate ({rate:.1%})"})

        report = pd.DataFrame(report_rows) if report_rows else pd.DataFrame(
            [{"Module": "Global", "Status": "GREEN", "Reason": "All Systems Nominal"}]
        )
        return len(report_rows) == 0, report

    def manifold_penalty(self, candidate_df: pd.DataFrame) -> float:
        """Score how far a candidate configuration sits from the learned PCA subspace.

        Args:
            candidate_df: One or more candidate rows. Numeric reference
                columns that are absent are added and filled with 0.0.

        Returns:
            ``max(0, ratio - 1.2)``, where ``ratio`` is the candidate's PCA
            reconstruction error divided by the reference error. Returns 0.0
            when no numeric baseline was learned.
        """
        if self.pca is None or not self.num_cols:
            return 0.0
        df = candidate_df.copy()
        for c in self.num_cols:
            if c not in df.columns:
                df[c] = 0.0
        X = self.scaler.transform(df[self.num_cols].fillna(0))
        return float(max(0.0, self._reconstruction_ratio(X) - 1.2))

# ==================================================================================================
# 2. UDE-GPU
# ==================================================================================================
class UnifiedDiscoveryEngineGPU:
    """Discovery stage: rank features for a target and prune redundant ones.

    The engine keeps a private copy of the input frame; every column except
    ``target_col`` is treated as a candidate feature.
    """

    def __init__(self, df, target_col):
        self.raw_df = df.copy()
        self.target = target_col
        self.features = list(filter(lambda name: name != target_col, df.columns))
        print(f"üîç UDE-GPU ONLINE. Target: '{self.target}'")

    def scan_physics(self):
        """Fit an XGBoost regressor on all features and rank them by importance.

        Returns:
            A Series of feature importances sorted in descending order. The
            fitted regressor is stored on ``self.model``. The printed R² is
            computed on the same rows the model was fitted on.
        """
        print("\n>> [UDE-GPU] SCANNING SYNTHETIC KERNEL PHYSICS...")
        inputs = self.raw_df[self.features]
        response = self.raw_df[self.target]

        booster = xgb.XGBRegressor(n_estimators=200, max_depth=6, n_jobs=-1, random_state=42)
        booster.fit(inputs, response)

        ranking = pd.Series(booster.feature_importances_, index=self.features)
        ranking = ranking.sort_values(ascending=False)
        fit_quality = booster.score(inputs, response)
        print(f"   Baseline R¬≤: {fit_quality:.4f}")
        self.model = booster
        return ranking

    def detect_redundancy(self, top_features, threshold=0.90):
        """Greedily drop features that the already-kept features can predict.

        Features are visited in the given order. A feature is discarded when a
        random forest fitted on the kept features reproduces it with in-sample
        R² above ``threshold``; otherwise it joins the kept list.

        Args:
            top_features: Feature names, most important first.
            threshold: R² above which a feature counts as redundant.

        Returns:
            The kept feature names, in visiting order.
        """
        print("\n>> [UDE-GPU] PURGING REDUNDANT DIMENSIONS...")
        kept = []
        for candidate in top_features:
            # The first feature has nothing to be redundant with.
            if kept:
                basis = self.raw_df[kept]
                column = self.raw_df[candidate]
                forest = RandomForestRegressor(
                    n_estimators=40, max_depth=6, n_jobs=-1, random_state=42
                )
                forest.fit(basis, column)
                explained = forest.score(basis, column)
                if explained > threshold:
                    print(f"   ‚öîÔ∏è Killed '{candidate}' (Redundant with Survivors, R¬≤={explained:.2f})")
                    continue
            kept.append(candidate)
        return kept

# ==================================================================================================
# 3. UAV-GPU
# ==================================================================================================
class UniversalAttributionValidatorGPU:
    """Train a small neural surrogate and attribute its output to the inputs.

    The constructor moves the data to ``DEVICE`` and immediately trains the
    surrogate, which is then available as ``self.model`` in eval mode.
    """

    def __init__(self, X_df, y_series):
        self.X_vals = X_df.values.astype(np.float32)
        self.y_vals = y_series.values.astype(np.float32)
        self.cols = X_df.columns.tolist()
        self.X_t = torch.from_numpy(self.X_vals).to(DEVICE)
        self.y_t = torch.from_numpy(self.y_vals).reshape(-1, 1).to(DEVICE)
        self._train_proxy()

    def _train_proxy(self):
        """Fit the MLP surrogate full-batch with light input-noise augmentation."""
        print("\n‚öñÔ∏è [UAV-GPU] TRAINING DIFFERENTIABLE SURROGATE...")
        d_in = self.X_t.shape[1]
        self.model = nn.Sequential(
            nn.Linear(d_in, 128), nn.SiLU(), nn.Dropout(0.1),
            nn.Linear(128, 64), nn.SiLU(), nn.Linear(64, 1)
        ).to(DEVICE)
        opt = torch.optim.Adam(self.model.parameters(), lr=0.002)
        loss_fn = nn.MSELoss()
        self.model.train()
        for _ in tqdm(range(400), desc="   Training GPU Surrogate", leave=False):
            opt.zero_grad()
            pred = self.model(self.X_t + torch.randn_like(self.X_t) * 0.01)
            loss = loss_fn(pred, self.y_t)
            loss.backward()
            opt.step()
        # Training is over: switch dropout off so every later forward pass
        # (attribution, optimization objective) is deterministic.
        self.model.eval()

    def compute_integrated_gradients(self, n_steps=50):
        """Approximate integrated gradients against the feature-mean baseline.

        Gradients are averaged over ``n_steps`` evenly spaced points on the
        straight line from the baseline to each of the first (at most) 5000
        rows, multiplied by ``x - baseline`` and averaged over rows.

        Args:
            n_steps: Number of interpolation points (must be >= 1).

        Returns:
            A Series of mean attributions per column, sorted descending.

        Raises:
            ValueError: If ``n_steps`` is smaller than 1.
        """
        if n_steps < 1:
            raise ValueError("n_steps must be >= 1")
        print("   [UAV-GPU] Calculating Gradient Flows (Sensitivity)...")
        # Dropout would make each interpolation step use a different random
        # sub-network, so attributions must be taken in eval mode.
        self.model.eval()
        baseline = torch.mean(self.X_t, dim=0, keepdim=True)
        batch = self.X_t[:min(5000, self.X_t.shape[0])]
        delta = batch - baseline
        grads_acc = torch.zeros_like(batch)
        for alpha in np.linspace(0, 1, n_steps):
            x_step = (baseline + float(alpha) * delta).detach().requires_grad_(True)
            out = self.model(x_step).sum()
            # autograd.grad returns only d(out)/d(x_step) and leaves the
            # surrogate's parameter .grad buffers untouched.
            (step_grad,) = torch.autograd.grad(out, x_step)
            grads_acc += step_grad
        avg_grads = grads_acc / n_steps
        attr = delta * avg_grads
        return pd.Series(attr.detach().cpu().numpy().mean(axis=0), index=self.cols).sort_values(ascending=False)

# ==================================================================================================
# 4. UIF-GPU
# ==================================================================================================
class PlatinumCausalEngineGPU:
    """Structural causal simulator built from a DAG and per-node regressors.

    Every node that has parents in the graph gets an XGBoost model predicting
    it from those parents; root nodes keep their observed values.
    """

    def __init__(self, df, graph_edges):
        self.df = df
        self.G = nx.DiGraph(graph_edges)
        self.models = {}
        self._fit_mechanisms()

    def _fit_mechanisms(self):
        """Fit one regressor per non-root node, using its graph parents as inputs."""
        print("\n‚öôÔ∏è [UIF-GPU] FITTING KERNEL CAUSAL MECHANISMS...")
        for node in self.G.nodes():
            parents = list(self.G.predecessors(node))
            if not parents:
                continue
            # Seeded like the other estimators in this pipeline.
            self.models[node] = xgb.XGBRegressor(
                n_estimators=80, max_depth=4, n_jobs=-1, random_state=42
            ).fit(self.df[parents], self.df[node])

    def simulate(self, intervention: dict, target: str, random_state=None):
        """Estimate the target under a do-intervention by forward simulation.

        Rows are resampled with replacement (at most 5000), intervened columns
        are overwritten with the given constants, and the remaining non-root
        nodes are re-predicted in topological order.

        Args:
            intervention: Mapping of column name to the value forced on it.
            target: Column whose simulated distribution is summarised.
            random_state: Optional seed forwarded to ``DataFrame.sample`` so a
                simulation can be reproduced. ``None`` keeps the previous
                unseeded behaviour.

        Returns:
            ``(mean, std)`` of the simulated target column.
        """
        sim_df = self.df.sample(
            n=min(5000, len(self.df)), replace=True, random_state=random_state
        ).copy()
        for node, val in intervention.items():
            sim_df[node] = val
        for node in nx.topological_sort(self.G):
            # Intervened nodes are cut off from their parents (do-operator).
            if node in intervention:
                continue
            parents = list(self.G.predecessors(node))
            if parents:
                sim_df[node] = self.models[node].predict(sim_df[parents])
        return sim_df[target].mean(), sim_df[target].std()

# ==================================================================================================
# 5. UOF-GPU
# ==================================================================================================
class UnifiedOptimizerGPU:
    """Maximise a keyword-argument objective inside box bounds.

    Args:
        objective_func: Callable invoked as ``objective_func(**params)`` and
            returning a scalar score to maximise.
        bounds: Mapping of parameter name to ``(low, high)``.
    """

    def __init__(self, objective_func, bounds):
        self.func = objective_func
        self.bounds = bounds

    def optimize_genetic(self, pop_size=60, generations=12, seed=None):
        """Maximise the objective with SciPy's differential evolution.

        Args:
            pop_size: Nominal population size; SciPy's per-dimension
                ``popsize`` multiplier is set to ``max(6, pop_size // 10)``.
            generations: Maximum number of generations (``maxiter``).
            seed: Optional seed forwarded to ``differential_evolution`` so a
                run can be reproduced. ``None`` keeps the run unseeded.

        Returns:
            ``(best_params, best_score)`` where ``best_params`` maps each
            bound name to its optimised value.

        Raises:
            ValueError: If no bounds were supplied.
        """
        print("üß¨ [UOF-GPU] RUNNING GENETIC EVOLUTION...")
        from scipy.optimize import differential_evolution

        names = list(self.bounds)
        if not names:
            raise ValueError("bounds must contain at least one parameter")

        def negated(x):
            # SciPy minimises, so flip the sign of the score to maximise it.
            return -self.func(**dict(zip(names, x)))

        result = differential_evolution(
            negated,
            bounds=[self.bounds[name] for name in names],
            maxiter=generations,
            popsize=max(6, pop_size // 10),
            workers=1,
            seed=seed,
        )
        return dict(zip(names, result.x)), -result.fun

# ==================================================================================================
# üöÄ MAIN EXECUTION: GPU SYNTHETIC KERNEL PIPELINE
# ==================================================================================================
def main_gpu():
    """Run the synthetic GPU-kernel pipeline end to end and export the results.

    Stages: data generation and [0, 1] normalization, validation, driver
    discovery with redundancy pruning, surrogate attribution, a causal
    "max-out" simulation, manifold-regularized optimization, and export of
    ``titan_gpu_control_surface.csv`` and ``titan_gpu_profile.json`` into the
    current working directory.
    """
    print("==================================================================================")
    print("‚ö° INITIATING SYNTHETIC GPU KERNEL OPTIMIZATION PIPELINE")
    print("==================================================================================")

    # A. DATA GENERATION
    print(">> [SOURCE] Generating Synthetic GPU Kernel Data...")
    gpu_df = generate_gpu_kernel_data(n_samples=20000, seed=42)
    print(f"   ‚úì Synthetic Kernel Data: {gpu_df.shape}")

    target_col = "Perf_Score"
    feature_cols = [c for c in gpu_df.columns if c != target_col]

    # Normalize [0,1]; the fitted scaler is reused at export time to map the
    # optimum back to original units.
    scaler = MinMaxScaler()
    gpu_norm = pd.DataFrame(
        scaler.fit_transform(gpu_df),
        columns=gpu_df.columns
    )

    # B. VALIDATION
    print("\n[PHASE 1] VALIDATION SCAN (GPU)")
    tvf_gpu = TitanValidationFrameworkGPU(gpu_norm)
    _passed, report = tvf_gpu.validate(gpu_norm)
    print(report)

    # C. DISCOVERY
    print("\n[PHASE 2] DISCOVERING GPU KERNEL DRIVERS")
    ude_gpu = UnifiedDiscoveryEngineGPU(gpu_norm, target_col)
    importances = ude_gpu.scan_physics()
    top_drivers = importances.head(len(feature_cols)).index.tolist()
    print(f"   Top Drivers: {top_drivers}")
    clean_drivers = ude_gpu.detect_redundancy(top_drivers)
    print(f"   Unique Kernel Knobs: {clean_drivers}")

    # D. ATTRIBUTION
    print("\n[PHASE 3] DECODING KERNEL PHYSICS (Attribution)")
    uav_gpu = UniversalAttributionValidatorGPU(gpu_norm[clean_drivers], gpu_norm[target_col])
    # The surrogate contains dropout. It is used below for attribution and as
    # the optimization objective, so it must be in eval mode; otherwise every
    # forward pass samples a different random sub-network.
    uav_gpu.model.eval()
    attr_scores = uav_gpu.compute_integrated_gradients()
    print("\nüèÜ KERNEL PARAMETER IMPACT (Sensitivity):")
    print(attr_scores)

    # E. CAUSAL INTERVENTION
    print("\n[PHASE 4] SIMULATING INTERVENTIONS")
    # Every surviving driver points at the target; block_x / block_y are
    # listed first when present, the rest follow in discovery order.
    leading = [src for src in ("block_x", "block_y") if src in clean_drivers]
    trailing = [d for d in clean_drivers if d != target_col and d not in leading]
    edges = [(d, target_col) for d in leading + trailing]
    pce_gpu = PlatinumCausalEngineGPU(gpu_norm[clean_drivers + [target_col]], edges)

    top_pos = [k for k in attr_scores.index if attr_scores[k] > 0][:3]
    interventions = {k: 1.0 for k in top_pos}
    if interventions:
        print(f">> Simulating 'Max-Out' Strategy on {interventions} ...")
        mu, std = pce_gpu.simulate(interventions, target=target_col)
        print(f"   Predicted {target_col} (normalized): {mu:.4f} (¬±{std:.4f})")

    # F. OPTIMIZATION
    print("\n[PHASE 5] GPU KERNEL OPTIMIZATION (Regularized)")

    def objective_gpu(**kwargs):
        """Surrogate score minus a penalty for leaving the data manifold."""
        vec = [kwargs.get(col, 0.5) for col in clean_drivers]
        t_vec = torch.tensor([vec], dtype=torch.float32).to(DEVICE)
        with torch.no_grad():
            base_score = float(uav_gpu.model(t_vec).item())
        # Columns that are not optimized sit at the normalized midpoint.
        candidate = {c: 0.5 for c in gpu_norm.columns}
        candidate.update(zip(clean_drivers, vec))
        cand_df = pd.DataFrame([candidate])
        penalty = tvf_gpu.manifold_penalty(cand_df)
        return base_score - 0.1 * penalty

    bounds_gpu = {k: (0.0, 1.0) for k in clean_drivers}
    uof_gpu = UnifiedOptimizerGPU(objective_gpu, bounds_gpu)
    best_cfg, best_val = uof_gpu.optimize_genetic(generations=12)

    # G. EXPORT
    print("\n[PHASE 6] EXPORTING GPU KERNEL BLUEPRINT")
    best_row_norm = {c: 0.5 for c in gpu_norm.columns}
    best_row_norm.update(best_cfg)
    best_row_norm_df = pd.DataFrame([best_row_norm])
    best_row_raw_vals = scaler.inverse_transform(best_row_norm_df[gpu_norm.columns])
    best_row_raw = pd.Series(best_row_raw_vals[0], index=gpu_norm.columns)

    # Column position -> index into the scaler's per-column min/max arrays.
    col_pos = {c: i for i, c in enumerate(gpu_norm.columns)}
    summary_rows = []
    for knob in clean_drivers:
        col_idx = col_pos[knob]
        sensitivity = attr_scores.get(knob, 0.0)
        summary_rows.append({
            "Parameter": knob,
            "Original_Min": scaler.data_min_[col_idx],
            "Original_Max": scaler.data_max_[col_idx],
            "Recommended_Value": best_row_raw[knob],
            "Sensitivity_Sign": "Positive" if sensitivity > 0 else "Negative",
            "Sensitivity_Magnitude": float(abs(sensitivity))
        })
    summary_df = pd.DataFrame(summary_rows).sort_values("Sensitivity_Magnitude", ascending=False)

    print("\n==================================================================================")
    print(f"üèÅ FINAL GPU KERNEL BLUEPRINT (Original Units, target=Perf_Score)")
    print("==================================================================================")
    print(f"üåü MAX ACHIEVABLE Perf_Score (normalized surrogate): {best_val:.4f}")
    print("\nüîß OPTIMAL GPU CONFIG (Top by Sensitivity):")
    print(summary_df.to_string(index=False))
    print("==================================================================================")

    gpu_csv_path = "titan_gpu_control_surface.csv"
    summary_df.to_csv(gpu_csv_path, index=False)
    print(f"üìÑ Saved GPU Control Surface to: {gpu_csv_path}")

    gpu_spec = {
        "meta": {
            "version": "titan_gpu_synthetic_v1",
            "description": "GPU kernel optimization profile from synthetic kernel-performance simulator.",
            "target_metric": "Perf_Score",
            "normalized_score": float(best_val)
        },
        "controls": [],
        "profiles": {
            "baseline_normalized": {c: 0.5 for c in gpu_norm.columns},
            "optimized_normalized": {c: float(best_cfg.get(c, 0.5)) for c in gpu_norm.columns}
        }
    }
    for _, row in summary_df.iterrows():
        gpu_spec["controls"].append({
            "name": row["Parameter"],
            "recommended": float(row["Recommended_Value"]),
            "min": float(row["Original_Min"]),
            "max": float(row["Original_Max"]),
            "impact": float(row["Sensitivity_Magnitude"])
        })
    gpu_json_path = "titan_gpu_profile.json"
    with open(gpu_json_path, "w") as f:
        json.dump(gpu_spec, f, indent=2)
    print(f"üì¶ Saved GPU JSON Profile to: {gpu_json_path}")

# Entry point: run the GPU pipeline only when this module is the main program,
# not when it is imported.
if __name__ == "__main__":
    main_gpu()

üöÄ SYSTEM ONLINE: TITAN GPU OPTIMIZER | DEVICE: cuda
‚ö° INITIATING SYNTHETIC GPU KERNEL OPTIMIZATION PIPELINE
>> [SOURCE] Generating Synthetic GPU Kernel Data...
   ‚úì Synthetic Kernel Data: (20000, 16)

[PHASE 1] VALIDATION SCAN (GPU)
   [TVF-GPU] Learning Baseline Structure (Manifold & Anomalies)...
   Module Status               Reason
0  Global  GREEN  All Systems Nominal

[PHASE 2] DISCOVERING GPU KERNEL DRIVERS
üîç UDE-GPU ONLINE. Target: 'Perf_Score'

>> [UDE-GPU] SCANNING SYNTHETIC KERNEL PHYSICS...
   Baseline R¬≤: 0.9831
   Top Drivers: ['tensor_cores', 'use_f16', 'use_tex', 'vwidth', 'reg_per_thread', 'smem_kb', 'branch_divergence', 'unroll', 'block_y', 'block_x', 'mem_stride', 'l2_hits', 'grid_y', 'grid_x', 'use_f32']

>> [UDE-GPU] PURGING REDUNDANT DIMENSIONS...
   ‚öîÔ∏è Killed 'use_f32' (Redundant with Survivors, R¬≤=1.00)
   Unique Kernel Knobs: ['tensor_cores', 'use_f16', 'use_tex', 'vwidth', 'reg_per_thread', 'smem_kb', 'branch_divergence', 'unroll', 'block_y', '

   Training GPU Surrogate:   0%|          | 0/400 [00:00<?, ?it/s]

   [UAV-GPU] Calculating Gradient Flows (Sensitivity)...

üèÜ KERNEL PARAMETER IMPACT (Sensitivity):
use_f16              1.180712e-02
branch_divergence    5.223335e-03
tensor_cores         1.451301e-03
unroll               1.289340e-03
smem_kb              2.601430e-04
reg_per_thread       7.069767e-07
mem_stride          -3.649417e-05
l2_hits             -1.554212e-04
grid_y              -2.838218e-04
grid_x              -3.251331e-04
use_tex             -3.804266e-04
block_x             -1.089204e-03
block_y             -1.921759e-03
vwidth              -9.609383e-03
dtype: float32

[PHASE 4] SIMULATING INTERVENTIONS

‚öôÔ∏è [UIF-GPU] FITTING KERNEL CAUSAL MECHANISMS...
>> Simulating 'Max-Out' Strategy on {'use_f16': 1.0, 'branch_divergence': 1.0, 'tensor_cores': 1.0} ...
   Predicted Perf_Score (normalized): 0.0661 (¬±0.0811)

[PHASE 5] GPU KERNEL OPTIMIZATION (Regularized)
üß¨ [UOF-GPU] RUNNING GENETIC EVOLUTION...

[PHASE 6] EXPORTING GPU KERNEL BLUEPRINT

üèÅ FINAL GPU KERNEL

In [3]:
# TITAN 0.1nm SINGULARITY - ALL PROCESSING UNITS BLUEPRINT (Cell 3)

import os
import json
import warnings
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import xgboost as xgb
import networkx as nx

from dataclasses import dataclass
from typing import Dict, List, Tuple
from tqdm.auto import tqdm
from sklearn.preprocessing import StandardScaler, MinMaxScaler
from sklearn.ensemble import IsolationForest, RandomForestRegressor
from sklearn.decomposition import PCA
from sklearn.metrics import r2_score

warnings.filterwarnings("ignore")
DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"SYSTEM ONLINE :: TITAN 0.1nm SINGULARITY CELL 3 :: DEVICE = {DEVICE}")

# --------------------------------------------------------------------
# A. DATA INGESTION FROM KAGGLE INPUT
# --------------------------------------------------------------------
# All inputs live in one Kaggle dataset directory.
BASE = "/kaggle/input/0-1nm-chip-data"

sens_path   = os.path.join(BASE, "Titan_Sensitivity_Map.csv")
iron_path   = os.path.join(BASE, "Titan_Ironclad_Master.csv")
gold_path   = os.path.join(BASE, "Titan_Gold_Master.csv")
specs_path  = os.path.join(BASE, "Titan_Final_Specs.json")
exec_path   = os.path.join(BASE, "Titan_Executive_Summary.txt")
recipe_path = os.path.join(BASE, "0.1nm_Singularity_Recipe.csv")

print("LOADING 0.1nm DATA ASSEMBLY...")
sens_df   = pd.read_csv(sens_path)
iron_df   = pd.read_csv(iron_path)
gold_df   = pd.read_csv(gold_path)
# Context managers close the handles deterministically instead of leaving
# them to the garbage collector.
with open(specs_path, "r", encoding="utf-8") as specs_file:
    specs = json.load(specs_file)
with open(exec_path, "r", encoding="utf-8") as exec_file:
    exec_txt = exec_file.read()
recipe_df = pd.read_csv(recipe_path)

print(f"Sensitivity map shape: {sens_df.shape}")
print(f"Ironclad master shape: {iron_df.shape}")
print(f"Gold master shape:     {gold_df.shape}")
print(f"Recipe rows:           {recipe_df.shape}")
print(f"Specs meta version:    {specs.get('meta', {}).get('version', 'NA')}")

# --------------------------------------------------------------------
# B. PROCESSING UNIT TAXONOMY FOR 0.1 nm CHIP
# --------------------------------------------------------------------
# Inspired by CPU OS "control surface" + GPU kernel "knobs",
# but now for topological 0.1 nm time-crystal qubits.

# Ordered catalogue of processing-unit names; the order here is the order in
# which units are announced and later iterated.
PROCESSING_UNIT_TYPES = [
    # Primary topological qubit array
    "Majorana_QPU_Core",
    # Periodic drive/time-crystal engine
    "Floquet_Drive_Unit",
    # Spin-orbit coupling synthesis
    "Rashba_Field_Controller",
    # Static / slowly varying Bz control
    "Zeeman_Magnet_Unit",
    # Femtosecond THz laser timing unit
    "Laser_Coherence_Unit",
    # 10 mK fridge + shielding coordination
    "Thermal_Shielding_Unit",
    # Manufacturability / Monte Carlo aware
    "Yield_Optimization_Unit",
    # Logical code & syndrome pipeline
    "Error_Correction_Unit",
    # Routing, multiplexing, fan-out
    "Control_Fabric_Unit",
    # In-situ calibration & retune engine
    "Calibration_Unit",
]

print("PROCESSING UNITS DECLARED:")
# One " - <name>" line per unit, emitted as a single write.
print("\n".join(f" - {unit_name}" for unit_name in PROCESSING_UNIT_TYPES))

# --------------------------------------------------------------------
# C. TVF - LIGHTWEIGHT VALIDATION FOR SENSITIVITY MAP
# --------------------------------------------------------------------
@dataclass
class TVFConfigTC:
    """Thresholds consumed by TitanValidationFrameworkTC."""
    # Validation reports manifold drift when the reconstruction-error ratio
    # (new data vs. reference) exceeds 1 + this value.
    reconstruction_error_threshold: float = 0.15
    # Passed to IsolationForest as `contamination`; validation reports RED
    # when the observed outlier rate exceeds 3x this value.
    iforest_contamination: float = 0.02

class TitanValidationFrameworkTC:
    """Drift and anomaly validator fitted on a reference DataFrame.

    On construction the numeric columns of ``reference`` are standardised and
    used to fit a PCA (95 % explained variance) and an IsolationForest. The
    mean squared PCA reconstruction error of the reference is kept as the
    baseline for later comparisons.

    Args:
        reference: Frame that defines the expected columns and distribution.
        config: Thresholds; defaults to ``TVFConfigTC()`` when omitted.
    """

    def __init__(self, reference: pd.DataFrame, config: "TVFConfigTC | None" = None):
        self.config = config or TVFConfigTC()
        self.reference = reference.copy()
        self.num_cols = self.reference.select_dtypes(include=np.number).columns.tolist()
        self.scaler = StandardScaler()
        print("TVF-TC :: Learning baseline manifold...")
        if len(self.num_cols) > 0:
            X = self.scaler.fit_transform(self.reference[self.num_cols].fillna(0))
            self.pca = PCA(n_components=0.95)
            self.pca.fit(X)
            X_recon = self.pca.inverse_transform(self.pca.transform(X))
            self.ref_error = np.mean((X - X_recon) ** 2)
            self.iforest = IsolationForest(
                contamination=self.config.iforest_contamination,
                n_jobs=-1,
                random_state=42
            ).fit(X)
        else:
            # Nothing numeric to model: structural checks are disabled.
            self.pca = None
            self.iforest = None
            self.ref_error = 0.0

    def validate(self, new_df: pd.DataFrame) -> Tuple[bool, pd.DataFrame]:
        """Compare ``new_df`` with the reference and report every finding.

        Checks, in order: missing reference columns (Integrity), PCA
        reconstruction-error ratio (Structure) and IsolationForest outlier
        rate (Anomalies). When a numeric reference column is missing, the two
        model-based checks are skipped because they cannot be computed; the
        Integrity finding is still returned.

        Args:
            new_df: Frame to validate.

        Returns:
            ``(passed, report)`` where ``report`` has the columns ``Module``,
            ``Status`` and ``Reason``. ``passed`` is True only when the report
            consists of the single GREEN row.
        """
        rows = []
        missing = set(self.reference.columns) - set(new_df.columns)
        if missing:
            # Sorted so the message is stable between runs.
            rows.append(dict(Module="Integrity",
                             Status="RED",
                             Reason=f"Missing columns: {sorted(missing, key=str)[:5]}"))

        numeric_available = len(self.num_cols) > 0 and not missing.intersection(self.num_cols)
        if numeric_available and (self.pca is not None or self.iforest is not None):
            # Scale once and share the matrix between both checks.
            X = self.scaler.transform(new_df[self.num_cols].fillna(0))
            if self.pca is not None:
                X_recon = self.pca.inverse_transform(self.pca.transform(X))
                err = np.mean((X - X_recon) ** 2)
                ratio = err / max(self.ref_error, 1e-9)
                if ratio > 1 + self.config.reconstruction_error_threshold:
                    rows.append(dict(Module="Structure",
                                     Status="RED",
                                     Reason=f"Manifold drift err ratio={ratio:.2f}"))
            if self.iforest is not None:
                preds = self.iforest.predict(X)
                rate = (preds == -1).mean()
                if rate > self.config.iforest_contamination * 3:
                    rows.append(dict(Module="Anomalies",
                                     Status="RED",
                                     Reason=f"High anomaly rate={rate:.3f}"))

        passed = not rows
        if passed:
            rows.append(dict(Module="Global",
                             Status="GREEN",
                             Reason="All systems nominal"))
        return passed, pd.DataFrame(rows)

print("\nPHASE 1 :: VALIDATION ‚Äì SENSITIVITY MAP")
# Reference = full sensitivity map; we validate on same to get envelope
# NOTE(review): because reference and validated data are the same frame, this
# call exercises the checks but cannot detect drift by itself.
tvf_tc = TitanValidationFrameworkTC(sens_df)
passed_tc, report_tc = tvf_tc.validate(sens_df)
print(report_tc.to_string(index=False))

# --------------------------------------------------------------------
# D. DISCOVERY - WHAT DRIVES COHERENCE / YIELD
# --------------------------------------------------------------------
# Assume columns like: Rashba (eV), Zeeman (T), Gap (meV), Freq (THz),
# Coherence (0..1), Yield (0..1), Status (PASS/FAIL)
# We will:
#   - define numerical features
#   - targets: "Coherence" and "Yield" (if present)
#   - feature importance via XGBoost

# Keep only the numeric columns of the sensitivity map; everything below
# (normalization, targets, features) is derived from this frame.
sens_num = sens_df.select_dtypes(include=np.number).copy()
# Normalize quickly for stability
mm = MinMaxScaler()
sens_norm = pd.DataFrame(mm.fit_transform(sens_num), columns=sens_num.columns)

# Targets are picked by exact column name.
targets = []
for t in ["Coherence", "Yield"]:
    if t in sens_norm.columns:
        targets.append(t)

if len(targets) == 0:
    # Fallback: if not explicitly named, assume last numeric col is coherence proxy
    targets.append(sens_norm.columns[-1])

print("\nPHASE 2 :: DISCOVERY ‚Äì DRIVERS OF COHERENCE/YIELD")
# target name -> {feature name -> XGBoost feature importance}
feature_importances: Dict[str, Dict[str, float]] = {}

for tgt in targets:
    # Every numeric column except the current target is used as a feature.
    # NOTE(review): with more than one target this includes the other target
    # column(s), and any ID-like numeric column - confirm that is intended.
    X = sens_norm.drop(columns=[tgt])
    y = sens_norm[tgt]
    model = xgb.XGBRegressor(
        n_estimators=200,
        max_depth=5,
        learning_rate=0.05,
        subsample=0.9,
        colsample_bytree=0.9,
        random_state=42,
        n_jobs=-1
    )
    model.fit(X, y)
    # Scored on the training rows, hence "in-sample" in the message below.
    r2 = model.score(X, y)
    imp = pd.Series(model.feature_importances_, index=X.columns).sort_values(ascending=False)
    feature_importances[tgt] = imp.to_dict()
    print(f"\nTarget = {tgt} :: in-sample R2 = {r2:.4f}")
    print(imp.head(10).to_string())

# --------------------------------------------------------------------
# E. ATTRIBUTION - SIMPLE NN SURROGATE + GRADIENTS
# --------------------------------------------------------------------
class SurrogateNN(nn.Module):
    """Three-layer MLP regressor (d_in -> 64 -> 32 -> 1) with SiLU activations.

    A 10 % dropout layer sits after the first activation. The layers are held
    in ``self.net`` as an ``nn.Sequential``.
    """

    def __init__(self, d_in: int):
        super().__init__()
        wide, narrow = 64, 32
        # Layers are created in forward order so parameter initialisation
        # consumes the RNG in a fixed sequence.
        stack = [
            nn.Linear(d_in, wide),
            nn.SiLU(),
            nn.Dropout(0.1),
            nn.Linear(wide, narrow),
            nn.SiLU(),
            nn.Linear(narrow, 1),
        ]
        self.net = nn.Sequential(*stack)

    def forward(self, x):
        """Map a ``(batch, d_in)`` tensor to ``(batch, 1)`` predictions."""
        prediction = self.net(x)
        return prediction

def train_surrogate(X: np.ndarray, y: np.ndarray, epochs: int = 300) -> SurrogateNN:
    """Fit a SurrogateNN to ``(X, y)`` with full-batch Adam and MSE loss.

    Args:
        X: Feature matrix of shape ``(n_samples, n_features)``.
        y: Target values; reshaped internally to a column vector.
        epochs: Number of full-batch optimisation steps.

    Returns:
        The trained model on ``DEVICE``, switched to eval mode so its dropout
        layer is inactive for any subsequent inference or gradient analysis.
    """
    X_t = torch.tensor(X, dtype=torch.float32, device=DEVICE)
    y_t = torch.tensor(y.reshape(-1, 1), dtype=torch.float32, device=DEVICE)
    model = SurrogateNN(X_t.shape[1]).to(DEVICE)
    opt = torch.optim.Adam(model.parameters(), lr=0.002, weight_decay=1e-4)
    loss_fn = nn.MSELoss()
    model.train()
    for _ in range(epochs):
        opt.zero_grad()
        pred = model(X_t)
        loss = loss_fn(pred, y_t)
        loss.backward()
        opt.step()
    # Leaving the model in train mode would keep dropout active and make
    # every later forward pass (and its gradients) random.
    model.eval()
    return model

def compute_grad_sensitivity(model: nn.Module, X: np.ndarray, cols: List[str]) -> pd.Series:
    """Mean absolute input gradient of the model output, per feature.

    The model is evaluated in eval mode (dropout off) and its previous
    train/eval mode is restored afterwards. The summed output is
    differentiated so that each row receives its own unscaled gradient
    ``d f(x_i) / d x_i``; differentiating the batch mean instead would shrink
    every score by a factor of ``1 / n_samples``.

    Args:
        model: Trained surrogate mapping ``(n, d)`` inputs to ``(n, 1)``.
        X: Input matrix of shape ``(n, d)``.
        cols: Feature names, one per column of ``X``.

    Returns:
        A Series indexed by feature name, sorted by descending sensitivity.
    """
    # Place the inputs wherever the model lives; fall back to the global
    # device for parameter-free models.
    first_param = next(model.parameters(), None)
    device = first_param.device if first_param is not None else DEVICE
    X_t = torch.tensor(X, dtype=torch.float32, device=device, requires_grad=True)

    was_training = model.training
    model.eval()
    try:
        out = model(X_t).sum()
        # Only the input gradient is needed; parameter .grad stays untouched.
        (grad_t,) = torch.autograd.grad(out, X_t)
    finally:
        model.train(was_training)

    grads = grad_t.detach().cpu().numpy()
    sens = np.mean(np.abs(grads), axis=0)
    return pd.Series(sens, index=cols).sort_values(ascending=False)

print("\nPHASE 3 :: ATTRIBUTION ‚Äì PARAMETER SENSITIVITY")

# target name -> {feature name -> mean absolute surrogate gradient}
surrogate_sensitivity: Dict[str, Dict[str, float]] = {}
for tgt in targets:
    # Same feature/target split as the discovery phase: every normalized
    # numeric column except the current target is an input.
    X = sens_norm.drop(columns=[tgt])
    y = sens_norm[tgt].values
    # A fresh surrogate is trained per target (200 epochs here, overriding
    # the function default).
    model = train_surrogate(X.values, y, epochs=200)
    sens_scores = compute_grad_sensitivity(model, X.values, X.columns.tolist())
    surrogate_sensitivity[tgt] = sens_scores.to_dict()
    print(f"\nSurrogate gradient sensitivity for target = {tgt}")
    print(sens_scores.head(10).to_string())

# --------------------------------------------------------------------
# F. PROCESSING UNIT PARAMETER SURFACES
# --------------------------------------------------------------------
# We now map: features + sensitivities + gold/iron recipes
# into per-ProcessingUnit normalized control surfaces.

# Extract canonical recipe values (Gold) as "best known"
def _build_param_map(frame, key_col, val_col):
    """Return ``{NAME: value}`` from a two-column parameter table.

    ``key_col`` / ``val_col`` are used when both exist in ``frame``;
    otherwise the first two columns are taken positionally (generic 2-col
    layout). Names are stripped and upper-cased in every case so lookups
    behave the same regardless of which table or layout supplied the key.
    """
    if key_col in frame.columns and val_col in frame.columns:
        names, values = frame[key_col], frame[val_col]
    else:
        names, values = frame.iloc[:, 0], frame.iloc[:, 1]
    return {str(name).strip().upper(): float(value) for name, value in zip(names, values)}


# "Best known" setpoints from the Gold master table.
gold_map = _build_param_map(gold_df, "Parameter", "Setpoint")

# Extract nominal recipe row (0.1nm_Singularity_Recipe)
recipe_map = _build_param_map(recipe_df, "Param", "Val")

def get_nominal(param_key: str, default: float = 0.0) -> float:
    """Look up a parameter's nominal value, preferring Gold over the recipe.

    The key is upper-cased before lookup. ``default`` is returned when
    neither table knows the parameter.
    """
    wanted = param_key.upper()
    # Tables in priority order: first hit wins.
    for table in (gold_map, recipe_map):
        if wanted in table:
            return table[wanted]
    return default

# Helper: min/max from sensitivity map (denormalized)
# Per-column extrema of the raw numeric data, indexed by column name; these
# give control ranges in original units rather than the [0, 1] scale.
sens_min = sens_num.min()
sens_max = sens_num.max()

def original_minmax(col: str) -> Tuple[float, float]:
    """Return ``(min, max)`` of a column in original units.

    Columns without recorded extrema fall back to the unit range
    ``(0.0, 1.0)``.
    """
    if col not in sens_min.index:
        return 0.0, 1.0
    lowest = float(sens_min[col])
    highest = float(sens_max[col])
    return lowest, highest

# Map features to "which ProcessingUnit owns them"
def assign_unit_for_feature(fname: str) -> str:
    """Pick the processing unit that owns a feature, by keyword in its name.

    Matching is case-insensitive substring search; rules are tried in order
    and the first match wins. Names matching no rule go to the catch-all
    ``"Control_Fabric_Unit"``.

    Args:
        fname: Feature (column) name.

    Returns:
        The owning processing unit's name.
    """
    n = fname.lower()
    # (keywords, owner) in priority order. All keywords are lower-case
    # because they are compared against the lower-cased name; the "mk"
    # entry covers millikelvin columns such as "..._mK".
    rules = (
        (("rashba",), "Rashba_Field_Controller"),
        (("zeeman", "bz"), "Zeeman_Magnet_Unit"),
        (("gap", "majorana"), "Majorana_QPU_Core"),
        (("freq", "laser", "thz"), "Laser_Coherence_Unit"),
        (("temp", "thermal", "mk"), "Thermal_Shielding_Unit"),
        (("yield",), "Yield_Optimization_Unit"),
        (("coherence",), "Majorana_QPU_Core"),
    )
    for keywords, owner in rules:
        if any(word in n for word in keywords):
            return owner
    # default catch-all
    return "Control_Fabric_Unit"

# Build unified set of driver features from importance + sensitivity
# Union of the strongest features per target, from both ranking methods.
driver_features = set()
for tgt in targets:
    imp = feature_importances[tgt]
    sens = surrogate_sensitivity[tgt]
    # Take top 15 from each
    top_imp = sorted(imp.items(), key=lambda x: x[1], reverse=True)[:15]
    top_sens = sorted(sens.items(), key=lambda x: x[1], reverse=True)[:15]
    for k, _ in top_imp + top_sens:
        driver_features.add(k)

# Sorted so the feature order (and the exported JSON) is deterministic.
driver_features = sorted(list(driver_features))
print("\nUNIFIED DRIVER FEATURES (merged):", driver_features)

# --------------------------------------------------------------------
# G. BUILD JSON BLUEPRINT - ALL PROCESSING UNITS
# --------------------------------------------------------------------
# Structure:
# {
#   "meta": {...},
#   "processing_units": [
#       {
#         "name": "...",
#         "role": "...",
#         "controls": [
#            { "parameter": "...", "owner_feature": "...", "min":..., "max":..., "nominal":..., "importance":..., "sensitivity":... }
#         ]
#       }, ...
#   ]
# }

# Human-readable role description for each processing unit, keyed by the
# names declared in PROCESSING_UNIT_TYPES.
PU_ROLES = {
    "Majorana_QPU_Core": "Topological qubit array and Majorana gap stabilization.",
    "Floquet_Drive_Unit": "Implements periodic drive for time-crystal Floquet engineering.",
    "Rashba_Field_Controller": "Controls spin-orbit coupling strength (Rashba).",
    "Zeeman_Magnet_Unit": "Controls static and slow-varying Zeeman Bz field.",
    "Laser_Coherence_Unit": "Controls THz laser frequency and timing coherence.",
    "Thermal_Shielding_Unit": "Coordinates fridge temperature and shielding at 10mK.",
    "Yield_Optimization_Unit": "Maximizes fabrication yield under jitter and noise.",
    "Error_Correction_Unit": "Logical error correction and syndrome routing.",
    "Control_Fabric_Unit": "Routing, multiplexing, fanout, and digital control fabric.",
    "Calibration_Unit": "Automated calibration and re-tuning of all analog parameters."
}

# Top-level blueprint document; "processing_units" is filled in once the
# per-unit controls have been assembled.
blueprint = {
    "meta": {
        "version": "titan-singularity-cell3-v1",
        "description": "0.1nm topological time-crystal processing-unit taxonomy and control surfaces.",
        "source_specs_version": specs.get("meta", {}).get("version", "unknown"),
        "targets": targets,
    },
    "processing_units": []
}

# Initialize units
# unit name -> unit record with an (initially empty) list of controls
unit_map: Dict[str, Dict] = {}
for pu in PROCESSING_UNIT_TYPES:
    unit_map[pu] = {
        "name": pu,
        "role": PU_ROLES.get(pu, ""),
        "controls": []
    }

# Pack controls per driver feature
for feat in driver_features:
    owner = assign_unit_for_feature(feat)
    min_v, max_v = original_minmax(feat)
    # Nominal comes from the gold/recipe tables when the feature name is a
    # key there; otherwise it defaults to the midpoint of the observed range.
    # NOTE(review): a table-provided nominal is not checked against
    # [min_v, max_v] - confirm whether clamping is wanted.
    nominal = get_nominal(feat, default=(min_v + max_v) / 2.0)

    # Aggregate importance/sensitivity across all targets
    imp_vals = []
    sens_vals = []
    for tgt in targets:
        # A feature absent for a target (e.g. it is that target) counts as 0.
        imp_vals.append(feature_importances.get(tgt, {}).get(feat, 0.0))
        sens_vals.append(surrogate_sensitivity.get(tgt, {}).get(feat, 0.0))

    importance = float(np.mean(imp_vals)) if imp_vals else 0.0
    sensitivity = float(np.mean(sens_vals)) if sens_vals else 0.0

    control = {
        "parameter": feat,
        "owner_feature": feat,
        "min": float(min_v),
        "max": float(max_v),
        "nominal": float(nominal),
        "importance_score": importance,
        "sensitivity_score": sensitivity
    }
    unit_map[owner]["controls"].append(control)

# Simple placeholder controls for units that may not receive features
for pu in PROCESSING_UNIT_TYPES:
    if not unit_map[pu]["controls"]:
        unit_map[pu]["controls"].append({
            "parameter": "virtual_control_gain",
            "owner_feature": "none",
            "min": 0.0,
            "max": 1.0,
            "nominal": 0.5,
            "importance_score": 0.0,
            "sensitivity_score": 0.0
        })

# Units are emitted in PROCESSING_UNIT_TYPES order (dict insertion order).
blueprint["processing_units"] = list(unit_map.values())

# --------------------------------------------------------------------
# H. EXPORT JSON FOR CELL 4 (DEVELOPER-SPECIFIC DETAILS)
# --------------------------------------------------------------------
# Written to the current working directory; Cell 4 reads this same file name.
out_json_path = "titan_0_1nm_processing_units.json"
with open(out_json_path, "w") as f:
    json.dump(blueprint, f, indent=2)

print("\nFINAL OUTPUT :: 0.1nm PROCESSING UNIT BLUEPRINT")
print(f"- Saved JSON to {out_json_path}")
print("- Processing Units:")
# Per-unit control counts (placeholder controls included).
for pu in PROCESSING_UNIT_TYPES:
    n_ctrl = len(unit_map[pu]["controls"])
    print(f"  * {pu}: {n_ctrl} controls")

SYSTEM ONLINE :: TITAN 0.1nm SINGULARITY CELL 3 :: DEVICE = cuda
LOADING 0.1nm DATA ASSEMBLY...
Sensitivity map shape: (10000, 7)
Ironclad master shape: (4, 3)
Gold master shape:     (4, 3)
Recipe rows:           (4, 3)
Specs meta version:    v17.0
PROCESSING UNITS DECLARED:
 - Majorana_QPU_Core
 - Floquet_Drive_Unit
 - Rashba_Field_Controller
 - Zeeman_Magnet_Unit
 - Laser_Coherence_Unit
 - Thermal_Shielding_Unit
 - Yield_Optimization_Unit
 - Error_Correction_Unit
 - Control_Fabric_Unit
 - Calibration_Unit

PHASE 1 :: VALIDATION ‚Äì SENSITIVITY MAP
TVF-TC :: Learning baseline manifold...
Module Status              Reason
Global  GREEN All systems nominal

PHASE 2 :: DISCOVERY ‚Äì DRIVERS OF COHERENCE/YIELD

Target = Coherence_Yield :: in-sample R2 = 0.9998
Freq_THz     0.993414
Rashba_eV    0.002148
Gap_meV      0.001753
Zeeman_T     0.001687
Trial_ID     0.000998

PHASE 3 :: ATTRIBUTION ‚Äì PARAMETER SENSITIVITY

Surrogate gradient sensitivity for target = Coherence_Yield
Gap_meV    

In [4]:
# TITAN 0.1nm SINGULARITY - CELL 4
# Developer-Ready Per-Unit Modes & Parameters

import json
import os
from copy import deepcopy

BASE_BLUEPRINT_PATH = "titan_0_1nm_processing_units.json"

print("SYSTEM ONLINE :: TITAN 0.1nm SINGULARITY CELL 4")

# -------------------------------------------------------------
# A. LOAD CELL 3 BLUEPRINT + FINAL SPECS
# -------------------------------------------------------------
# The blueprint is the JSON artifact written by Cell 3 and is mandatory.
with open(BASE_BLUEPRINT_PATH, "r") as f:
    base_blueprint = json.load(f)

# Titan_Final_Specs.json lives in the Kaggle dataset mount. Load it when it is
# present (closing the handle deterministically); otherwise continue with an
# empty spec so the recipe lookups below fall back to blueprint nominals.
specs_path = "/kaggle/input/0-1nm-chip-data/Titan_Final_Specs.json"
if os.path.isfile(specs_path):
    with open(specs_path, "r") as f:
        specs = json.load(f)
else:
    print("WARNING :: specs file not found, continuing without recipe:", specs_path)
    specs = {}

# Sub-sections of the specs; each defaults to an empty dict when absent.
recipe = specs.get("recipe", {})
perf   = specs.get("performance", {})
meta   = specs.get("meta", {})

print("Loaded base blueprint version:", base_blueprint["meta"]["version"])
print("Specs version:", meta.get("version"), "target:", meta.get("target"))

# Convenience helpers
def get_unit(name: str):
    """Return the first processing unit of ``base_blueprint`` named *name*.

    Returns ``None`` when no unit carries that name.
    """
    candidates = base_blueprint["processing_units"]
    return next((entry for entry in candidates if entry["name"] == name), None)

def nominal_from_recipe(key: str, fallback: float):
    """Resolve a nominal value for *key* from the global ``recipe`` dict.

    The key is matched case-insensitively against substring tokens, checked
    in a fixed priority order (Rashba, Zeeman, gap/Majorana, frequency). The
    first matching family selects the recipe entry to read; if that entry is
    missing, or no family matches, *fallback* is used.

    Returns:
        The selected value converted to ``float``.
    """
    # (substring tokens, recipe key) pairs in match-priority order.
    families = (
        (("RASHBA",), "RASHBA_COUPLING"),
        (("ZEEMAN",), "ZEEMAN_FIELD"),
        (("GAP", "MAJORANA"), "MAJORANA_GAP"),
        (("FREQ", "THZ"), "LASER_FREQ_THz"),
    )
    normalized = key.upper()
    for tokens, recipe_key in families:
        if any(token in normalized for token in tokens):
            return float(recipe.get(recipe_key, fallback))
    return float(fallback)

# -------------------------------------------------------------
# B. DEFINE STANDARD MODE SCHEMA
# -------------------------------------------------------------
# Each Processing Unit will have:
# {
#   "name": "...",
#   "modes": [
#       {
#          "mode_name": "...",
#          "description": "...",
#          "target_profile": { ... },
#          "controls": [ ... ]
#       },
#       ...
#   ]
# }

# Root artifact for Cell 4: provenance metadata plus a per-unit list of modes
# that section D fills in.
system_modes = {
    "meta": dict(
        version="titan-singularity-cell4-v1",
        description="Per-Processing-Unit operating modes and parameter envelopes for 0.1nm Time Crystal.",
        derived_from=base_blueprint["meta"]["version"],
        specs_version=meta.get("version", "unknown"),
        target=meta.get("target", "0.1nm Time Crystal"),
        global_performance=perf,
    ),
    "units": [],
}

# -------------------------------------------------------------
# C. MODE GENERATORS PER PROCESSING UNIT
# -------------------------------------------------------------
def make_majorana_modes(unit):
    """Build the Majorana QPU operating modes around the unit's first control.

    Args:
        unit: Blueprint entry whose first ``controls`` item is treated as the
            gap control; it must provide ``parameter``, ``nominal``, ``min``
            and ``max``.

    Returns:
        A two-element list of mode dicts: ``Idle_Protected`` followed by
        ``Gate_Operation``.
    """
    gap_ctrl = unit["controls"][0]
    base_nom = nominal_from_recipe("MAJORANA_GAP", gap_ctrl["nominal"])
    gap_hi, gap_lo = gap_ctrl["max"], gap_ctrl["min"]
    envelope_width = gap_hi - gap_lo
    parameter = gap_ctrl["parameter"]

    def gap_control(policy):
        # A fresh dict and range list per mode, so the modes never alias.
        return {
            "parameter": parameter,
            "nominal": base_nom,
            "range": [gap_lo, gap_hi],
            "tuning_policy": policy,
            "priority": "HIGH",
        }

    # Parked state: tight tolerance (a quarter of the control envelope).
    idle_protected = {
        "mode_name": "Idle_Protected",
        "description": "QPU parked in topological phase with maximal gap and coherence.",
        "target_profile": {
            "coherence_target": 0.9999,
            "gap_target_meV": base_nom,
            "allowed_gap_variation_meV": 0.25 * envelope_width,
            "allowed_error_rate": 1e-6,
        },
        "controls": [
            gap_control("keep within 1œÉ of nominal during Idle_Protected"),
        ],
    }

    # Active gates: looser tolerance (half of the control envelope).
    gate_operation = {
        "mode_name": "Gate_Operation",
        "description": "QPU in active braiding / gate execution; allows small transient gap modulation.",
        "target_profile": {
            "coherence_target": 0.999,
            "gap_target_meV": base_nom,
            "allowed_gap_variation_meV": 0.5 * envelope_width,
            "allowed_error_rate": 5e-5,
        },
        "controls": [
            gap_control("allow short excursions within full envelope during gates"),
        ],
    }

    return [idle_protected, gate_operation]

def make_floquet_drive_modes():
    """Build the waveform-level operating modes of the Floquet drive unit.

    The drive frequency nominal is read from the global ``recipe`` under
    ``LASER_FREQ_THz`` and defaults to 10.5 when the recipe has no such key.

    Returns:
        A two-element list of mode dicts: ``TimeCrystal_Lock`` followed by
        ``Calibration_Sweep``.
    """
    drive_freq = recipe.get("LASER_FREQ_THz", 10.5)

    def control(parameter, nominal, bounds, policy, priority):
        # Uniform control record; key order matches the exported schema.
        return {
            "parameter": parameter,
            "nominal": nominal,
            "range": bounds,
            "tuning_policy": policy,
            "priority": priority,
        }

    time_crystal_lock = {
        "mode_name": "TimeCrystal_Lock",
        "description": "Fundamental Floquet drive establishing the time-crystal state.",
        "target_profile": {
            "drive_frequency_THz": drive_freq,
            "subharmonic_ratio": 2.0,      # period-doubled response
            "duty_cycle": 0.5,
            "modulation_depth": 0.15,
        },
        "controls": [
            control("drive_freq_THz", drive_freq, [10.48, 10.52],
                    "locked via femtosecond laser, active feedback", "CRITICAL"),
            control("drive_amplitude_rel", 1.0, [0.8, 1.2],
                    "adjust amplitude to maintain subharmonic response", "HIGH"),
            control("subharmonic_ratio", 2.0, [1.8, 2.2],
                    "monitor spectral peaks, enforce period doubling", "HIGH"),
        ],
    }

    # Calibration uses wider frequency and amplitude windows than the lock mode.
    calibration_sweep = {
        "mode_name": "Calibration_Sweep",
        "description": "Slow sweep of drive parameters to map stability lobes and update envelopes.",
        "target_profile": {
            "sweep_span_THz": 0.04,
            "max_runtime_s": 600.0,
            "update_sensitivity_maps": True,
        },
        "controls": [
            control("drive_freq_THz", drive_freq, [10.46, 10.54],
                    "scan frequency; record coherence & yield per point", "MEDIUM"),
            control("drive_amplitude_rel", 1.0, [0.6, 1.4],
                    "scan amplitude during calibration only", "MEDIUM"),
        ],
    }

    return [time_crystal_lock, calibration_sweep]

def make_rashba_modes(unit):
    """Build the single ``Material_Setpoint`` mode for the Rashba controller.

    Args:
        unit: Blueprint entry whose first ``controls`` item supplies
            ``parameter``, ``nominal``, ``min`` and ``max``.

    Returns:
        A one-element list containing the mode dict.
    """
    r_ctrl = unit["controls"][0]
    base_nom = nominal_from_recipe("RASHBA_COUPLING", r_ctrl["nominal"])
    coupling_hi, coupling_lo = r_ctrl["max"], r_ctrl["min"]

    setpoint_control = {
        "parameter": r_ctrl["parameter"],
        "nominal": base_nom,
        "range": [coupling_lo, coupling_hi],
        "tuning_policy": "calibrate at cooldown; minor drift-corrections only",
        "priority": "HIGH",
    }
    material_setpoint = {
        "mode_name": "Material_Setpoint",
        "description": "Static Rashba coupling set by growth and fine-tuned via gate bias.",
        "target_profile": {
            "rashba_target_eV": base_nom,
            # Tolerance is a quarter of the control envelope width.
            "allowed_variation_eV": 0.25 * (coupling_hi - coupling_lo),
        },
        "controls": [setpoint_control],
    }
    return [material_setpoint]

def make_zeeman_modes(unit):
    """Build the Zeeman magnet operating modes from the unit's first control.

    Args:
        unit: Blueprint entry whose first ``controls`` item supplies
            ``parameter``, ``nominal``, ``min`` and ``max``.

    Returns:
        A two-element list of mode dicts: ``Static_Field`` followed by
        ``Field_Scan_Calibration``.
    """
    z_ctrl = unit["controls"][0]
    base_nom = nominal_from_recipe("ZEEMAN_FIELD", z_ctrl["nominal"])
    field_hi, field_lo = z_ctrl["max"], z_ctrl["min"]
    parameter = z_ctrl["parameter"]

    def field_control(policy, priority):
        # A fresh dict and range list for every mode.
        return {
            "parameter": parameter,
            "nominal": base_nom,
            "range": [field_lo, field_hi],
            "tuning_policy": policy,
            "priority": priority,
        }

    static_field = {
        "mode_name": "Static_Field",
        "description": "DC Zeeman field to place nanowire in topological regime.",
        "target_profile": {
            "field_target_T": base_nom,
            # Tolerance is a quarter of the control envelope width.
            "allowed_variation_T": 0.25 * (field_hi - field_lo),
        },
        "controls": [
            field_control("ramp slowly; minimize eddy currents and heating", "HIGH"),
        ],
    }
    field_scan = {
        "mode_name": "Field_Scan_Calibration",
        "description": "Slow field scan to verify topological phase boundaries against sensitivity map.",
        "target_profile": {
            "scan_span_T": 0.01,
            "max_runtime_s": 900.0,
        },
        "controls": [
            field_control(
                "oscillatory scan within allowed window during calibration only",
                "MEDIUM",
            ),
        ],
    }
    return [static_field, field_scan]

def make_laser_modes(unit):
    """Build the single ``Frequency_Locked`` mode for the laser coherence unit.

    Args:
        unit: Blueprint entry whose first ``controls`` item supplies
            ``parameter``, ``nominal``, ``min`` and ``max``.

    Returns:
        A one-element list containing the mode dict.
    """
    f_ctrl = unit["controls"][0]
    base_nom = nominal_from_recipe("LASER_FREQ_THz", f_ctrl["nominal"])

    lock_target = {
        "freq_target_THz": base_nom,
        "freq_tolerance_THz": 0.001,
        "timing_jitter_fs_rms": 10.0,
    }
    lock_control = {
        "parameter": f_ctrl["parameter"],
        "nominal": base_nom,
        "range": [f_ctrl["min"], f_ctrl["max"]],
        "tuning_policy": "PLL lock; active compensation for slow drift",
        "priority": "CRITICAL",
    }
    frequency_locked = {
        "mode_name": "Frequency_Locked",
        "description": "Laser locked to THz drive frequency with femtosecond stability.",
        "target_profile": lock_target,
        "controls": [lock_control],
    }
    return [frequency_locked]

def make_thermal_modes():
    """Build the single ``Base_Temperature`` mode for the thermal shielding unit.

    All values are fixed constants; nothing is read from the blueprint or the
    recipe.

    Returns:
        A one-element list containing the mode dict.
    """
    fridge_setpoint = {
        "parameter": "fridge_setpoint_mK",
        "nominal": 10.0,
        "range": [8.0, 15.0],
        "tuning_policy": "PID fridge control; keep < 12 mK during operation",
        "priority": "HIGH",
    }
    shield_state = {
        "parameter": "shield_state",
        "nominal": 1.0,
        "range": [0.0, 1.0],
        "tuning_policy": "1 = closed; interlocks prevent operation at 0",
        "priority": "CRITICAL",
    }
    base_temperature = {
        "mode_name": "Base_Temperature",
        "description": "Dilution refrigerator at ~10 mK with full shielding.",
        "target_profile": {
            "base_temp_mK": 10.0,
            "max_temp_mK": 15.0,
            "field_cool_protocol": "zero-field-cool then ramp Zeeman",
        },
        "controls": [fridge_setpoint, shield_state],
    }
    return [base_temperature]

def make_yield_modes():
    """Build the single ``MonteCarlo_Qualification`` mode for the yield unit.

    All values are fixed constants.

    Returns:
        A one-element list containing the mode dict.
    """
    jitter_control = {
        "parameter": "jitter_std_freq_ppm",
        "nominal": 5.0,
        "range": [1.0, 20.0],
        "tuning_policy": "simulate and measure coherence under specified jitter",
        "priority": "MEDIUM",
    }
    fabrication_control = {
        "parameter": "fabrication_offset_sigma",
        "nominal": 1.0,
        "range": [0.5, 3.0],
        "tuning_policy": "model array of devices across wafer",
        "priority": "MEDIUM",
    }
    qualification = {
        "mode_name": "MonteCarlo_Qualification",
        "description": "Reproduces 10,000-point jitter stress test to qualify device yield.",
        "target_profile": {
            "target_yield_fraction": 0.32,
            "num_samples": 10000,
        },
        "controls": [jitter_control, fabrication_control],
    }
    return [qualification]

def make_error_correction_modes():
    """Build the single ``Logical_Run`` mode for the error-correction unit.

    All values are fixed constants.

    Returns:
        A one-element list containing the mode dict.
    """
    distance_control = {
        "parameter": "code_distance",
        "nominal": 9,
        "range": [5, 17],
        "tuning_policy": "trade coherence budget vs overhead",
        "priority": "HIGH",
    }
    cycle_time_control = {
        "parameter": "syndrome_cycle_time_ns",
        "nominal": 100.0,
        "range": [50.0, 200.0],
        "tuning_policy": "match to gate times and decoherence",
        "priority": "HIGH",
    }
    logical_run = {
        "mode_name": "Logical_Run",
        "description": "Continuous syndrome extraction and correction on logical qubits.",
        "target_profile": {
            "code_type": "surface_code_majorana",
            "code_distance": 9,
            "cycle_time_ns": 100.0,
        },
        "controls": [distance_control, cycle_time_control],
    }
    return [logical_run]

def make_control_fabric_modes(unit):
    """Build the single ``Experiment_Scheduler`` mode for the control fabric.

    Args:
        unit: Blueprint entry; the ``max`` of its first control is exported
            as ``max_trial_id`` (Trial_ID was used as a proxy and is
            reinterpreted here as experiment indexing).

    Returns:
        A one-element list containing the mode dict.
    """
    parallelism_control = {
        "parameter": "max_parallel_experiments",
        "nominal": 32,
        "range": [8, 128],
        "tuning_policy": "set based on control hardware bandwidth",
        "priority": "MEDIUM",
    }
    stride_control = {
        "parameter": "trial_id_stride",
        "nominal": 1,
        "range": [1, 100],
        "tuning_policy": "encode experiment metadata in ID spacing",
        "priority": "LOW",
    }
    scheduler = {
        "mode_name": "Experiment_Scheduler",
        "description": "Assigns trial IDs and sequences pulses for calibration and production.",
        "target_profile": {
            "max_parallel_experiments": 64,
            "max_trial_id": unit["controls"][0]["max"],
        },
        "controls": [parallelism_control, stride_control],
    }
    return [scheduler]

def make_calibration_modes():
    """Build the single ``Daily_Calibration`` mode for the calibration unit.

    All values are fixed constants.

    Returns:
        A one-element list containing the mode dict.
    """
    interval_control = {
        "parameter": "calibration_interval_hours",
        "nominal": 24.0,
        "range": [4.0, 48.0],
        "tuning_policy": "shorten interval if drift exceeds threshold",
        "priority": "MEDIUM",
    }
    daily_calibration = {
        "mode_name": "Daily_Calibration",
        "description": "Automated daily tune-up across Rashba, Zeeman, and laser parameters.",
        "target_profile": {
            "duration_minutes": 60,
            "max_allowed_drift_sigma": 2.0,
        },
        "controls": [interval_control],
    }
    return [daily_calibration]

# -------------------------------------------------------------
# D. BUILD MODES FOR ALL UNITS
# -------------------------------------------------------------
# Dispatch table: processing-unit name -> callable(unit) returning its modes.
# Generators that take no blueprint input are wrapped so every entry shares
# the same one-argument call shape.
_MODE_BUILDERS = {
    "Majorana_QPU_Core": make_majorana_modes,
    "Floquet_Drive_Unit": lambda _unit: make_floquet_drive_modes(),
    "Rashba_Field_Controller": make_rashba_modes,
    "Zeeman_Magnet_Unit": make_zeeman_modes,
    "Laser_Coherence_Unit": make_laser_modes,
    "Thermal_Shielding_Unit": lambda _unit: make_thermal_modes(),
    "Yield_Optimization_Unit": lambda _unit: make_yield_modes(),
    "Error_Correction_Unit": lambda _unit: make_error_correction_modes(),
    "Control_Fabric_Unit": make_control_fabric_modes,
    "Calibration_Unit": lambda _unit: make_calibration_modes(),
}

for unit in base_blueprint["processing_units"]:
    name = unit["name"]
    _builder = _MODE_BUILDERS.get(name)
    unit_entry = {
        "name": name,
        "role": unit["role"],
        # Units without a registered generator are exported with no modes.
        "modes": _builder(unit) if _builder is not None else [],
    }
    system_modes["units"].append(unit_entry)

# -------------------------------------------------------------
# E. MERGED “FULL SPEC” ARTIFACT
# -------------------------------------------------------------
# Single merged artifact: raw specs, the Cell 3 control surface and the modes
# built above, plus version provenance for each.
full_spec = {
    "meta": dict(
        version="titan-singularity-fullspec-v1",
        base_blueprint=base_blueprint["meta"]["version"],
        modes_version=system_modes["meta"]["version"],
        target=meta.get("target", "0.1nm Time Crystal"),
    ),
    "specs": specs,
    "control_surface": base_blueprint,
    "system_modes": system_modes,
}

# -------------------------------------------------------------
# F. SAVE JSON OUTPUTS
# -------------------------------------------------------------
modes_path = "titan_0_1nm_system_modes.json"
fullspec_path = "titan_0_1nm_full_spec.json"

# Write the modes file first, then the merged spec.
for _path, _payload in ((modes_path, system_modes), (fullspec_path, full_spec)):
    with open(_path, "w") as f:
        json.dump(_payload, f, indent=2)

print("EXPORT COMPLETE:")
for _path in (modes_path, fullspec_path):
    print(" -", _path)
for u in system_modes["units"]:
    print(f"   * {u['name']}: {len(u['modes'])} modes")

SYSTEM ONLINE :: TITAN 0.1nm SINGULARITY CELL 4
Loaded base blueprint version: titan-singularity-cell3-v1
Specs version: v17.0 target: 0.1nm Time Crystal
EXPORT COMPLETE:
 - titan_0_1nm_system_modes.json
 - titan_0_1nm_full_spec.json
   * Majorana_QPU_Core: 2 modes
   * Floquet_Drive_Unit: 2 modes
   * Rashba_Field_Controller: 1 modes
   * Zeeman_Magnet_Unit: 2 modes
   * Laser_Coherence_Unit: 1 modes
   * Thermal_Shielding_Unit: 1 modes
   * Yield_Optimization_Unit: 1 modes
   * Error_Correction_Unit: 1 modes
   * Control_Fabric_Unit: 1 modes
   * Calibration_Unit: 1 modes


In [5]:
# TITAN 0.1nm SINGULARITY – CELL 5
# Experiment Campaign Generator

import json
import math

# Input artifact produced by Cell 4.
MODES_PATH = "titan_0_1nm_system_modes.json"

print("SYSTEM ONLINE :: TITAN 0.1nm SINGULARITY CELL 5")

with open(MODES_PATH, "r") as f:
    modes_spec = json.loads(f.read())

print(f"Loaded modes version: {modes_spec['meta']['version']}")

# -------------------------------------------------------------
# Helpers
# -------------------------------------------------------------
def find_unit(units, name):
    """Return the first entry of *units* whose ``"name"`` equals *name*.

    Returns ``None`` when there is no such entry.
    """
    matches = (entry for entry in units if entry["name"] == name)
    return next(matches, None)

units = modes_spec["units"]

# Resolve the four units the experiment generators below rely on; each is
# None when the modes spec has no unit of that name.
floquet_unit, zeeman_unit, yield_unit, majorana_unit = (
    find_unit(units, wanted)
    for wanted in (
        "Floquet_Drive_Unit",
        "Zeeman_Magnet_Unit",
        "Yield_Optimization_Unit",
        "Majorana_QPU_Core",
    )
)

# -------------------------------------------------------------
# 1. Floquet Drive Calibration Experiments
# -------------------------------------------------------------
floquet_experiments = []

if floquet_unit is not None:
    floquet_modes = floquet_unit["modes"]

    # TimeCrystal_Lock mode: lock frequency and its controls keyed by parameter.
    lock_mode = next(mode for mode in floquet_modes
                     if mode["mode_name"] == "TimeCrystal_Lock")
    lock_freq = lock_mode["target_profile"]["drive_frequency_THz"]
    lock_ctrls = {ctrl["parameter"]: ctrl for ctrl in lock_mode["controls"]}
    freq_range = lock_ctrls["drive_freq_THz"]["range"]

    # Calibration_Sweep mode: sweep span and its controls keyed by parameter.
    sweep_mode = next(mode for mode in floquet_modes
                      if mode["mode_name"] == "Calibration_Sweep")
    sweep_span = sweep_mode["target_profile"]["sweep_span_THz"]
    sweep_ctrls = {ctrl["parameter"]: ctrl for ctrl in sweep_mode["controls"]}
    sweep_freq_range = sweep_ctrls["drive_freq_THz"]["range"]

    # Experiment 1: High-resolution lock neighborhood scan.
    # The window is lock_freq +/- half the sweep span, clamped to the lock
    # mode's frequency range, sampled on an evenly spaced grid.
    n_points_lock = 41  # e.g., ~1 GHz resolution over 40 GHz window
    half_span = sweep_span / 2.0
    f_min_lock = max(freq_range[0], lock_freq - half_span)
    f_max_lock = min(freq_range[1], lock_freq + half_span)
    lock_width = f_max_lock - f_min_lock
    lock_freqs = [
        f_min_lock + idx * lock_width / (n_points_lock - 1)
        for idx in range(n_points_lock)
    ]

    floquet_experiments.append(dict(
        name="Floquet_Lock_Neighborhood",
        description="Fine scan around lock frequency to refine time-crystal stability window.",
        unit="Floquet_Drive_Unit",
        mode="Calibration_Sweep",
        sweep_type="frequency",
        parameters=dict(
            drive_freq_THz_points=lock_freqs,
            drive_amplitude_rel=1.0,
            subharmonic_ratio_target=2.0,
        ),
        measurements=[
            "coherence_time",
            "subharmonic_peak_amplitude",
            "yield_fraction",
        ],
    ))

    # Experiment 2: Amplitude scan at fixed lock frequency, over the lock
    # mode's amplitude range.
    n_points_amp = 21
    amp_min, amp_max = lock_ctrls["drive_amplitude_rel"]["range"]
    amp_width = amp_max - amp_min
    amp_points = [
        amp_min + idx * amp_width / (n_points_amp - 1)
        for idx in range(n_points_amp)
    ]

    floquet_experiments.append(dict(
        name="Floquet_Amplitude_Response",
        description="Scan drive amplitude at lock frequency to map subharmonic response.",
        unit="Floquet_Drive_Unit",
        mode="TimeCrystal_Lock",
        sweep_type="amplitude",
        parameters=dict(
            drive_freq_THz=lock_freq,
            drive_amplitude_rel_points=amp_points,
            subharmonic_ratio_target=2.0,
        ),
        measurements=[
            "coherence_time",
            "subharmonic_peak_amplitude",
        ],
    ))

# -------------------------------------------------------------
# 2. Zeeman Field Phase-Boundary Experiments
# -------------------------------------------------------------
zeeman_experiments = []

# Both units must be present: the gap control is read from the Majorana unit.
if zeeman_unit is not None and majorana_unit is not None:
    z_mode_static = next(mode for mode in zeeman_unit["modes"]
                         if mode["mode_name"] == "Static_Field")
    z_ctrl = z_mode_static["controls"][0]
    B_min, B_max = z_ctrl["range"]
    B_mid = z_mode_static["target_profile"]["field_target_T"]

    # First control of the first Majorana mode.
    gap_ctrl = majorana_unit["modes"][0]["controls"][0]

    # Experiment 3: Zeeman scan across full safe range, evenly spaced.
    n_points_B = 51
    B_width = B_max - B_min
    B_points = [
        B_min + idx * B_width / (n_points_B - 1)
        for idx in range(n_points_B)
    ]

    zeeman_experiments.append(dict(
        name="Zeeman_Phase_Boundary_Scan",
        description="Scan Zeeman field to locate transitions into/out of the topological regime.",
        unit="Zeeman_Magnet_Unit",
        mode="Field_Scan_Calibration",
        sweep_type="field",
        parameters=dict(Zeeman_T_points=B_points),
        measurements=[
            "gap_meV",
            "coherence_time",
            "parity_stability",
        ],
    ))

    # Experiment 4: Zeeman modulation during gate operation, centred on the
    # static-field target.
    zeeman_experiments.append(dict(
        name="Zeeman_During_Gates",
        description="Test robustness of gate operations to small Zeeman modulations.",
        unit="Zeeman_Magnet_Unit",
        mode="Field_Scan_Calibration",
        sweep_type="field_modulation",
        parameters=dict(
            Zeeman_center_T=B_mid,
            Zeeman_modulation_delta_T=0.001,
            num_cycles=100,
        ),
        measurements=[
            "gate_fidelity",
            "error_rate_per_cycle",
        ],
    ))

# -------------------------------------------------------------
# 3. Yield Monte Carlo Qualification Experiments
# -------------------------------------------------------------
yield_experiments = []

if yield_unit is not None:
    # The first mode of the yield unit supplies the target profile and controls.
    mc_mode = yield_unit["modes"][0]
    profile = mc_mode["target_profile"]
    controls = {ctrl["parameter"]: ctrl for ctrl in mc_mode["controls"]}

    target_yield = profile["target_yield_fraction"]

    # Experiment 5: Reproduce original 10k-point MC at the nominal settings.
    yield_experiments.append(dict(
        name="Yield_MC_Reproduction",
        description="Reproduce 10,000-point jitter stress test to validate original yield estimate.",
        unit="Yield_Optimization_Unit",
        mode="MonteCarlo_Qualification",
        parameters=dict(
            num_samples=profile["num_samples"],
            jitter_std_freq_ppm=controls["jitter_std_freq_ppm"]["nominal"],
            fabrication_offset_sigma=controls["fabrication_offset_sigma"]["nominal"],
        ),
        acceptance_criteria=dict(
            yield_fraction_min=0.25,
            yield_fraction_target=target_yield,
        ),
        measurements=[
            "yield_fraction",
            "coherence_distribution",
            "failure_mode_statistics",
        ],
    ))

    # Experiment 6: Yield vs jitter curve, an even grid over the jitter range.
    jitter_min, jitter_max = controls["jitter_std_freq_ppm"]["range"]
    n_points_jitter = 9
    jitter_width = jitter_max - jitter_min
    jitter_points = [
        jitter_min + idx * jitter_width / (n_points_jitter - 1)
        for idx in range(n_points_jitter)
    ]

    yield_experiments.append(dict(
        name="Yield_vs_Jitter_Curve",
        description="Scan laser jitter and measure yield to map robustness curve.",
        unit="Yield_Optimization_Unit",
        mode="MonteCarlo_Qualification",
        parameters=dict(
            num_samples=2000,
            jitter_std_freq_ppm_points=jitter_points,
            fabrication_offset_sigma=controls["fabrication_offset_sigma"]["nominal"],
        ),
        measurements=[
            "yield_fraction",
            "coherence_distribution",
        ],
    ))

# -------------------------------------------------------------
# 4. Assemble Experiment Plan Artifact
# -------------------------------------------------------------
# Group the generated experiments under one artifact, with provenance copied
# from the modes spec.
experiment_plans = {
    "meta": dict(
        version="titan-singularity-expplans-v1",
        derived_from_modes=modes_spec["meta"]["version"],
        target=modes_spec["meta"]["target"],
    ),
    "experiments": dict(
        floquet_calibration=floquet_experiments,
        zeeman_phase=zeeman_experiments,
        yield_qualification=yield_experiments,
    ),
}

OUT_PATH = "titan_0_1nm_experiment_plans.json"
with open(OUT_PATH, "w") as f:
    json.dump(experiment_plans, f, indent=2)

# Console summary: number of experiments per group.
print("EXPORT COMPLETE ::", OUT_PATH)
for group, exps in experiment_plans["experiments"].items():
    print(f" - {group}: {len(exps)} experiments")

SYSTEM ONLINE :: TITAN 0.1nm SINGULARITY CELL 5
Loaded modes version: titan-singularity-cell4-v1
EXPORT COMPLETE :: titan_0_1nm_experiment_plans.json
 - floquet_calibration: 2 experiments
 - zeeman_phase: 2 experiments
 - yield_qualification: 2 experiments


In [6]:
# TITAN 0.1nm SINGULARITY – CELL 5 (UPGRADE)
# Add runtime metadata + priority/dependencies

import json

# v1 plans (written by Cell 5) are read in; the enriched v2 goes to OUT_PATH.
IN_PATH  = "titan_0_1nm_experiment_plans.json"
OUT_PATH = "titan_0_1nm_experiment_plans_v2.json"

print("SYSTEM ONLINE :: TITAN 0.1nm SINGULARITY CELL 5B")

with open(IN_PATH, "r") as f:
    plans = json.loads(f.read())

print(f"Loaded exp plan version: {plans['meta']['version']}")

# -------------------------------------------------------------
# Helper to attach metadata
# -------------------------------------------------------------
def add_metadata(exp, defaults):
    """Attach (or override) runtime and scheduling fields on an experiment.

    Args:
        exp: Experiment dict; modified in place.
        defaults: Dict providing ``per_point_s``, ``repetitions``,
            ``randomize`` and ``priority``, plus an optional ``depends_on``
            list of experiment names.

    Returns:
        The same *exp* object, for call chaining.

    Raises:
        KeyError: If one of the required *defaults* keys is missing.
    """
    exp["runtime"] = {
        "estimated_duration_per_point_s": defaults["per_point_s"],
        "repetitions": defaults["repetitions"],
        "randomize_order": defaults["randomize"]
    }
    exp["priority"] = defaults["priority"]
    # Copy the dependency list so experiments configured from one shared
    # defaults dict never end up aliasing the same mutable list.
    exp["depends_on"] = list(defaults.get("depends_on", []))
    return exp

exps = plans["experiments"]

# Reverse index: experiment name -> the group it belongs to. When a name
# occurs more than once, the last occurrence wins.
name_to_group = {
    experiment["name"]: group_name
    for group_name, group_exps in exps.items()
    for experiment in group_exps
}

# -------------------------------------------------------------
# 1. Floquet calibration experiments
# -------------------------------------------------------------
# Scheduling metadata per experiment, keyed by group and then by experiment
# name. An experiment is only annotated when it appears in its own group.
_METADATA_BY_GROUP = {
    # 1. Floquet calibration experiments
    "floquet_calibration": {
        "Floquet_Lock_Neighborhood": {
            "per_point_s": 2.0,        # e.g. 2 s per freq point
            "repetitions": 5,
            "randomize": False,        # monotonic sweep for lock finding
            "priority": "CRITICAL",
            "depends_on": [],          # starting point
        },
        "Floquet_Amplitude_Response": {
            "per_point_s": 1.5,
            "repetitions": 3,
            "randomize": True,         # randomize amplitude order to de-correlate drift
            "priority": "HIGH",
            "depends_on": ["Floquet_Lock_Neighborhood"],
        },
    },
    # 2. Zeeman phase experiments
    "zeeman_phase": {
        "Zeeman_Phase_Boundary_Scan": {
            "per_point_s": 3.0,
            "repetitions": 3,
            "randomize": False,        # ramp Zeeman smoothly
            "priority": "HIGH",
            "depends_on": ["Floquet_Lock_Neighborhood"],
        },
        "Zeeman_During_Gates": {
            "per_point_s": 4.0,
            "repetitions": 10,
            "randomize": True,
            "priority": "HIGH",
            "depends_on": [
                "Floquet_Lock_Neighborhood",
                "Floquet_Amplitude_Response",
                "Zeeman_Phase_Boundary_Scan",
            ],
        },
    },
    # 3. Yield qualification experiments
    "yield_qualification": {
        "Yield_MC_Reproduction": {
            "per_point_s": 0.01,       # MC samples are simulated or fast offline
            "repetitions": 1,
            "randomize": True,
            "priority": "MEDIUM",
            "depends_on": ["Floquet_Lock_Neighborhood"],
        },
        "Yield_vs_Jitter_Curve": {
            "per_point_s": 0.02,
            "repetitions": 1,
            "randomize": True,
            "priority": "MEDIUM",
            "depends_on": ["Yield_MC_Reproduction"],
        },
    },
}

for _group, _per_experiment in _METADATA_BY_GROUP.items():
    for e in exps.get(_group, []):
        _defaults = _per_experiment.get(e["name"])
        if _defaults is None:
            continue
        # Hand every experiment its own defaults dict and dependency list so
        # nothing is shared with the table above.
        add_metadata(e, {**_defaults, "depends_on": list(_defaults["depends_on"])})

# -------------------------------------------------------------
# 4. Bump meta version and export
# -------------------------------------------------------------
# Stamp the enriched plan as v2 and record what was added.
plans["meta"].update(
    version="titan-singularity-expplans-v2",
    notes=(
        "Added runtime estimates, repetitions, randomization flags, and priority/dependency graph "
        "for scheduler integration."
    ),
)

with open(OUT_PATH, "w") as f:
    json.dump(plans, f, indent=2)

# Console summary: one line per experiment with its scheduling fields;
# fields that were never attached print as None (or [] for depends_on).
print("EXPORT COMPLETE ::", OUT_PATH)
for group, group_exps in plans["experiments"].items():
    print(f" - {group}:")
    for e in group_exps:
        rt = e.get("runtime", {})
        summary_fields = [
            f"   * {e['name']}",
            f"priority={e.get('priority')}",
            f"per_point_s={rt.get('estimated_duration_per_point_s')}",
            f"reps={rt.get('repetitions')}",
            f"depends_on={e.get('depends_on', [])}",
        ]
        print(" | ".join(summary_fields))

SYSTEM ONLINE :: TITAN 0.1nm SINGULARITY CELL 5B
Loaded exp plan version: titan-singularity-expplans-v1
EXPORT COMPLETE :: titan_0_1nm_experiment_plans_v2.json
 - floquet_calibration:
   * Floquet_Lock_Neighborhood | priority=CRITICAL | per_point_s=2.0 | reps=5 | depends_on=[]
   * Floquet_Amplitude_Response | priority=HIGH | per_point_s=1.5 | reps=3 | depends_on=['Floquet_Lock_Neighborhood']
 - zeeman_phase:
   * Zeeman_Phase_Boundary_Scan | priority=HIGH | per_point_s=3.0 | reps=3 | depends_on=['Floquet_Lock_Neighborhood']
   * Zeeman_During_Gates | priority=HIGH | per_point_s=4.0 | reps=10 | depends_on=['Floquet_Lock_Neighborhood', 'Floquet_Amplitude_Response', 'Zeeman_Phase_Boundary_Scan']
 - yield_qualification:
   * Yield_MC_Reproduction | priority=MEDIUM | per_point_s=0.01 | reps=1 | depends_on=['Floquet_Lock_Neighborhood']
   * Yield_vs_Jitter_Curve | priority=MEDIUM | per_point_s=0.02 | reps=1 | depends_on=['Yield_MC_Reproduction']


In [7]:
# TITAN 0.1nm SINGULARITY – CELL 6 (FIXED)
# Hardware IO & Firmware Mapping

import json

# Input spec and output IO-map locations for this cell.
FULL_SPEC_PATH = "titan_0_1nm_full_spec.json"
OUT_IO_PATH = "titan_0_1nm_control_IO_map.json"

print("SYSTEM ONLINE :: TITAN 0.1nm SINGULARITY CELL 6B")

# Read the full spec; every later step in this cell derives from it.
with open(FULL_SPEC_PATH) as f:
    full_spec = json.loads(f.read())

print(f"Loaded spec version: {full_spec['meta']['version']}")

# -------------------------------------------------------------
# Helper: extract safe range + nominal from a control block
# -------------------------------------------------------------
def extract_range_and_nominal(ctrl):
    """Return ``(min, max, nominal)`` as floats for one control block.

    Two layouts are accepted:

    * ``"range": [min, max]`` (a two-element list or tuple), or
    * separate ``"min"`` / ``"max"`` keys, defaulting to 0.0 and 1.0.

    ``"nominal"`` defaults to the midpoint of the range. A value of ``None``
    (JSON ``null``) for ``"min"``, ``"max"`` or ``"nominal"`` is treated the
    same as a missing key instead of crashing in ``float(None)``.

    Args:
        ctrl: Mapping describing a control (must support ``.get``).

    Returns:
        Tuple ``(cmin, cmax, nominal)`` of floats.

    Raises:
        TypeError / ValueError: if a supplied bound or nominal value cannot
            be converted with ``float()``.
    """
    bounds = ctrl.get("range")
    if isinstance(bounds, (list, tuple)) and len(bounds) == 2:
        cmin, cmax = bounds
    else:
        cmin = ctrl.get("min")
        cmax = ctrl.get("max")
        if cmin is None:
            cmin = 0.0
        if cmax is None:
            cmax = 1.0

    # Convert before computing the midpoint so numeric strings work too.
    cmin, cmax = float(cmin), float(cmax)

    nominal = ctrl.get("nominal")
    if nominal is None:
        nominal = (cmin + cmax) / 2.0
    return cmin, cmax, float(nominal)

# -------------------------------------------------------------
# IO Channel Definitions
# -------------------------------------------------------------
def map_parameter_to_io(unit_name, ctrl):
    """Build the IO channel descriptor for one control parameter.

    The interface is chosen by case-sensitive substring matching on the
    parameter name. Rules are tried in order and the first match wins;
    a name matching no rule is routed to the generic slow-control bus.

    Args:
        unit_name: Name of the unit that owns the control.
        ctrl: Control block; must contain ``"parameter"`` and is passed to
            ``extract_range_and_nominal`` for its range and nominal value.

    Returns:
        dict with keys ``parameter_key``, ``unit_owner``, ``interface_type``,
        ``channel_id``, ``resolution_bits``, ``update_rate_Hz``,
        ``safe_range`` and ``default_setpoint``.
    """
    key = ctrl["parameter"]
    lo, hi, setpoint = extract_range_and_nominal(ctrl)

    # (substrings to look for, fields to override) -- order is significant.
    routing = (
        (("Freq_THz", "drive_freq"), {
            "interface_type": "OPTICAL_LOCK_LOOP",
            "channel_id": "OLL_CH_01",
            "resolution_bits": 32,
            "update_rate_Hz": 10e6,
        }),
        (("Rashba", "Gate"), {
            "interface_type": "DAC_PRECISION_DC",
            "channel_id": "DAC_SLOT3_CH04",
            "resolution_bits": 20,
            "update_rate_Hz": 100e3,
        }),
        (("Zeeman", "Field"), {
            "interface_type": "MAGNET_SUPPLY_CURRENT",
            "channel_id": "PSU_MAIN_COIL",
            "resolution_bits": 24,
            "update_rate_Hz": 10.0,
        }),
        (("Gap", "Majorana"), {
            "interface_type": "VIRTUAL_DERIVED",
            "channel_id": "META_REG_01",
            "resolution_bits": 32,
        }),
        (("jitter", "offset"), {
            "interface_type": "SIMULATION_CONFIG",
            "channel_id": "SIM_CONF_0X10",
        }),
        (("code_distance", "cycle_time"), {
            "interface_type": "FPGA_CONFIG_REG",
            "channel_id": "FPGA_CORE_REG_04",
            "resolution_bits": 32,
        }),
        (("drive_amplitude",), {
            "interface_type": "AWG_RF_MODULATOR",
            "channel_id": "AWG_MOD_CH01",
            "resolution_bits": 16,
            "update_rate_Hz": 1e9,
        }),
    )
    fallback = {
        "interface_type": "GENERIC_SLOW_CONTROL",
        "channel_id": "SC_BUS_01",
    }

    overrides = next(
        (fields for needles, fields in routing
         if any(needle in key for needle in needles)),
        fallback,
    )

    # Baseline descriptor; update() replaces values in place, so the key
    # order of the result does not depend on which rule matched.
    descriptor = {
        "parameter_key": key,
        "unit_owner": unit_name,
        "interface_type": "UNKNOWN",
        "channel_id": "CH_00",
        "resolution_bits": 16,
        "update_rate_Hz": 1000.0,
        "safe_range": [lo, hi],
        "default_setpoint": setpoint,
    }
    descriptor.update(overrides)
    return descriptor

# -------------------------------------------------------------
# Build IO Map
# -------------------------------------------------------------
# Skeleton of the exported document; "channels" is filled by the loop below.
io_map = {
    "meta": dict(
        version="titan-singularity-io-v1",
        description="Hardware channel mapping for 0.1nm Titan control stack.",
        derived_from_spec=full_spec["meta"]["version"],
    ),
    "channels": [],
}

# "<unit>::<parameter>" keys already emitted. A parameter that appears in
# several modes of the same unit is mapped once, from its first occurrence.
seen_params = set()

for unit in full_spec["system_modes"]["units"]:
    for mode in unit.get("modes", []):
        for ctrl in mode.get("controls", []):
            p_name = ctrl["parameter"]
            unique_key = f"{unit['name']}::{p_name}"
            if unique_key not in seen_params:
                channel_def = {
                    **map_parameter_to_io(unit["name"], ctrl),
                    "unique_id": unique_key,
                }
                io_map["channels"].append(channel_def)
                seen_params.add(unique_key)

# -------------------------------------------------------------
# Export
# -------------------------------------------------------------
# Write the IO map, then report how many channels and which interface
# types it contains.
with open(OUT_IO_PATH, "w") as f:
    json.dump(io_map, f, indent=2)

print("EXPORT COMPLETE ::", OUT_IO_PATH)
print(" - Mapped %d hardware control channels." % len(io_map["channels"]))

# Distinct interface types, alphabetically sorted for a stable listing.
types = sorted({entry["interface_type"] for entry in io_map["channels"]})
print(" - Interface types used:")
for t in types:
    print("   *", t)

SYSTEM ONLINE :: TITAN 0.1nm SINGULARITY CELL 6B
Loaded spec version: titan-singularity-fullspec-v1
EXPORT COMPLETE :: titan_0_1nm_control_IO_map.json
 - Mapped 16 hardware control channels.
 - Interface types used:
   * AWG_RF_MODULATOR
   * DAC_PRECISION_DC
   * FPGA_CONFIG_REG
   * GENERIC_SLOW_CONTROL
   * MAGNET_SUPPLY_CURRENT
   * OPTICAL_LOCK_LOOP
   * SIMULATION_CONFIG
   * VIRTUAL_DERIVED


In [8]:
# TITAN 0.1nm SINGULARITY – CELL 6B
# Add units, conversion formulas, readbacks, and interlocks

import json

# Input (v1) and output (v2) IO-map locations for this cell.
IN_IO_PATH  = "titan_0_1nm_control_IO_map.json"
OUT_IO_PATH = "titan_0_1nm_control_IO_map_v2.json"

print("SYSTEM ONLINE :: TITAN 0.1nm SINGULARITY CELL 6C")

with open(IN_IO_PATH) as f:
    io_map = json.loads(f.read())

print(f"Loaded IO map version: {io_map['meta']['version']}")

channels = io_map["channels"]

# unique_id of the first channel whose parameter is "shield_state";
# stays None when the map has no such channel.
for ch in channels:
    if ch["parameter_key"] == "shield_state":
        shield_channel_uid = ch["unique_id"]
        break
else:
    shield_channel_uid = None

# Helper: enrich a channel with units, conversion, readback, interlocks
#
# Enrichment table, keyed by parameter_key. Each value is
#   (units,
#    conversion_to_physical,
#    (readback type, readback channel_id, readback units),
#    interlock)
# where interlock is None, or (action, else_action, description) for an
# interlock gated on the shield channel.
_SIM_ONLY_ENRICHMENT = (
    "dimensionless",
    "Simulation-only parameter; affects Monte Carlo sampling distributions.",
    ("SOFTWARE_STATE", "SIM_CONF_STATE", "config"),
    None,
)

_CHANNEL_ENRICHMENT = {
    # Majorana gap – virtual
    "Gap_meV": (
        "meV",
        "virtual: Gap_meV is inferred from Rashba_eV + Zeeman_T + device model, "
        "not directly set via hardware",
        ("ANALYSIS_DERIVED", "ANALYSIS_PIPELINE_GAP", "meV"),
        None,
    ),
    # Floquet drive frequency
    "drive_freq_THz": (
        "THz",
        "f_THz = f_synth / 1e12, where f_synth is the RF synthesizer frequency in Hz "
        "programmed via OLL_CH_01",
        ("PLL_MONITOR", "OLL_CH_01_MON", "Hz"),
        (
            "ALLOW_OUTPUT",
            "FORCE_OUTPUT_OFF",
            "Enable Floquet drive only when thermal shielding is closed.",
        ),
    ),
    # Floquet amplitude
    "drive_amplitude_rel": (
        "relative",
        "V_RMS = drive_amplitude_rel * V_ref, where V_ref is AWG full-scale voltage",
        ("AWG_MONITOR", "AWG_MOD_CH01_MON", "V_RMS"),
        (
            "ALLOW_OUTPUT",
            "FORCE_OUTPUT_OFF",
            "Disable RF modulation if shield is open.",
        ),
    ),
    # Floquet subharmonic ratio
    "subharmonic_ratio": (
        "dimensionless",
        "subharmonic_ratio = f_drive / f_response_peak, monitored by spectrum analyzer",
        ("SPECTRUM_ANALYZER", "SPEC_TC_PEAK", "ratio"),
        None,
    ),
    # Rashba gate
    "Rashba_eV": (
        "eV equivalent",
        "Rashba_eV = alpha * V_gate, where V_gate is DAC voltage and alpha is "
        "device- and geometry-specific calibration (eV/V)",
        ("DAC_VMON", "DAC_SLOT3_CH04_MON", "V"),
        None,
    ),
    # Zeeman field
    "Zeeman_T": (
        "Tesla",
        "B_T = k_BI * I_coil, where I_coil is PSU_MAIN_COIL current in A and k_BI is "
        "coil calibration factor (T/A)",
        ("PSU_IMON", "PSU_MAIN_COIL_IMON", "A"),
        (
            "ALLOW_RAMP",
            "HOLD_FIELD_AT_0",
            "Do not ramp Zeeman field with shield open.",
        ),
    ),
    # Laser base frequency
    "Freq_THz": (
        "THz",
        "Laser frequency in THz; control maps to cavity or comb offset via OLL_CH_01.",
        ("FREQUENCY_COUNTER", "LASER_FREQ_MON", "Hz"),
        (
            "ALLOW_LOCK",
            "UNLOCK_AND_SHUTTER",
            "Require closed shield for laser lock and beam unshuttering.",
        ),
    ),
    # Fridge and shield
    "fridge_setpoint_mK": (
        "mK",
        "Fridge temperature setpoint in mK; maps to cryostat controller setpoint.",
        ("THERMOMETER", "TEMP_STAGE_MIXER", "mK"),
        None,
    ),
    "shield_state": (
        "logic",
        "0 = open, 1 = closed; controlled by mechanical/EM shield actuator.",
        ("LIMIT_SWITCH", "SHIELD_LIMIT_SW", "logic"),
        None,
    ),
    # Yield simulation controls
    "jitter_std_freq_ppm": _SIM_ONLY_ENRICHMENT,
    "fabrication_offset_sigma": _SIM_ONLY_ENRICHMENT,
    # Error correction
    "code_distance": (
        "distance",
        "Code distance; maps to number of physical qubits per logical qubit in FPGA configuration.",
        ("FPGA_STATUS", "FPGA_CORE_REG_04_RD", "int"),
        None,
    ),
    "syndrome_cycle_time_ns": (
        "ns",
        "Cycle time in ns for syndrome extraction; sets FPGA timing for gate sequences.",
        ("FPGA_STATUS", "FPGA_CORE_REG_04_RD", "ns"),
        None,
    ),
    # Control fabric
    "max_parallel_experiments": (
        "count",
        "Maximum number of experiments that may run concurrently in scheduler.",
        ("SCHEDULER_STATUS", "SCHED_MAX_PAR_RD", "count"),
        None,
    ),
    "trial_id_stride": (
        "ID_step",
        "Stride in experiment IDs; used for encoding metadata.",
        ("SCHEDULER_STATUS", "SCHED_TRIAL_STRIDE_RD", "ID_step"),
        None,
    ),
    # Calibration
    "calibration_interval_hours": (
        "hours",
        "Interval in hours between automated calibration runs.",
        ("SOFTWARE_STATE", "CALIB_SCHED_STATE", "hours"),
        None,
    ),
}

# Applied to any parameter_key that has no entry in the table above.
_DEFAULT_ENRICHMENT = (
    "arb",
    "identity: physical = control_value",
    ("NONE", None, None),
    None,
)


def _shield_interlock(action, else_action, description):
    """Return a one-element interlock list gated on the shield channel.

    Reads the module-level ``shield_channel_uid``; when it is falsy (no
    shield channel was found) the result is an empty list.
    """
    if not shield_channel_uid:
        return []
    return [{
        "condition": f"{shield_channel_uid} == 1",
        "action": action,
        "else_action": else_action,
        "description": description,
    }]


def enrich_channel(ch):
    """Add units, conversion text, readback descriptor and interlocks to ``ch``.

    The channel dict is modified in place and also returned. The values come
    from the table entry for ``ch["parameter_key"]``; a key with no entry gets
    the defaults ("arb" units, identity conversion, no readback, no
    interlocks). Only ``"parameter_key"`` is read from the channel, so
    channels without ``"interface_type"`` or ``"safe_range"`` are accepted.

    Args:
        ch: Channel dict containing at least ``"parameter_key"``.

    Returns:
        The same ``ch`` object, with ``"units"``, ``"conversion_to_physical"``,
        ``"readback_channel"`` and ``"interlocks"`` set.

    Raises:
        KeyError: if ``ch`` has no ``"parameter_key"``.
    """
    pk = ch["parameter_key"]
    # Non-string keys cannot match any table entry; route them to the default.
    spec = _CHANNEL_ENRICHMENT.get(pk) if isinstance(pk, str) else None
    units, conversion, readback, interlock = spec or _DEFAULT_ENRICHMENT
    rb_type, rb_channel_id, rb_units = readback

    ch["units"] = units
    ch["conversion_to_physical"] = conversion
    # A fresh dict/list per channel so channels never share mutable state.
    ch["readback_channel"] = {
        "type": rb_type,
        "channel_id": rb_channel_id,
        "units": rb_units,
    }
    ch["interlocks"] = _shield_interlock(*interlock) if interlock else []
    return ch

# Enrich every channel in place (enrich_channel mutates its argument).
for ch in channels:
    enrich_channel(ch)

# Stamp the v2 metadata, then write the enriched map.
io_map["meta"].update(
    version="titan-singularity-io-v2",
    notes=(
        "Added units, conversion formulas, readback channel descriptors, "
        "and basic interlocks (shield/laser/Zeeman/Floquet)."
    ),
)

with open(OUT_IO_PATH, "w") as f:
    json.dump(io_map, f, indent=2)

print("EXPORT COMPLETE ::", OUT_IO_PATH)
print(" - Channels enriched:", len(channels))
print("Sample channels with interlocks:")
for ch in channels:
    # Channels with an empty interlock list are not listed.
    if not ch["interlocks"]:
        continue
    print(f"  * {ch['unique_id']} -> {len(ch['interlocks'])} interlocks")

SYSTEM ONLINE :: TITAN 0.1nm SINGULARITY CELL 6C
Loaded IO map version: titan-singularity-io-v1
EXPORT COMPLETE :: titan_0_1nm_control_IO_map_v2.json
 - Channels enriched: 16
Sample channels with interlocks:
  * Floquet_Drive_Unit::drive_freq_THz -> 1 interlocks
  * Floquet_Drive_Unit::drive_amplitude_rel -> 1 interlocks
  * Zeeman_Magnet_Unit::Zeeman_T -> 1 interlocks
  * Laser_Coherence_Unit::Freq_THz -> 1 interlocks
