# TabSeq-Trace: 框架设计

本 Notebook 实现了 **TabSeq-Trace** 框架的核心组件。

## 1. 方法论概述 (Methodology Overview)
核心理念是将**回归任务 (Regression)** 重构为**二叉树上的序列生成任务**。
- **因果序列建模 (Autoregressive Modeling)**: 模型输入为历史轨迹 `[SOS, b_1, b_2, ...]`，输出为当前步的多热分布预测。
- **全息推理 (Holographic Inference)**: 基于 Multi-hot 的软约束累乘，重构概率密度。

---

In [1]:
import numpy as np
import torch
from torch.utils.data import Dataset, DataLoader
from sklearn.preprocessing import StandardScaler, OrdinalEncoder
from sklearn.model_selection import train_test_split
from sklearn.datasets import fetch_california_housing
from typing import List, Tuple, Dict, Optional
import pandas as pd

## 2. 轨迹标签编码器 (Trace Label Encoder)
将连续标签映射为二进制决策序列。

In [2]:
class TraceLabelEncoder:
    def __init__(self, v_min: float, v_max: float, depth: int = 10):
        self.v_min = v_min
        self.v_max = v_max
        self.depth = depth
        self.n_bins = 2 ** depth 
        self.bin_width = (v_max - v_min) / self.n_bins

    def encode(self, y: float) -> Tuple[List[int], int]:
        y = np.clip(y, self.v_min, self.v_max)
        norm_y = (y - self.v_min) / (self.v_max - self.v_min)
        leaf_idx = int(min(np.floor(norm_y * self.n_bins), self.n_bins - 1))
        binary_str = format(leaf_idx, f'0{self.depth}b')
        sequence = [int(bit) for bit in binary_str]
        return sequence, leaf_idx
    
    def encode_multi_hot(self, leaf_idx: int) -> np.ndarray:
        multi_hot = np.zeros((self.depth, self.n_bins), dtype=np.float32)
        start, end = 0, self.n_bins
        for t in range(self.depth):
            mid = (start + end) // 2
            if leaf_idx < mid:
                multi_hot[t, start:mid] = 1.0
                end = mid
            else:
                multi_hot[t, mid:end] = 1.0
                start = mid
        return multi_hot

    def decode_bin_index(self, bin_idx: int) -> float:
        return self.v_min + (bin_idx + 0.5) * self.bin_width

    def decode_sequence(self, sequence: List[int]) -> float:
        bin_idx = 0
        for bit in sequence:
            bin_idx = (bin_idx << 1) | bit
        return self.decode_bin_index(bin_idx)

    def get_bin_edges(self, bin_idx: int) -> Tuple[float, float]:
        lower = self.v_min + bin_idx * self.bin_width
        upper = lower + self.bin_width
        return lower, upper

## 3. TabSeq 数据集加载器 (TabSeq Dataset Loader)
**自回归架构**：Input: `[SOS, b1...]`, Target: `[MHT_0, MHT_1...]`。


In [3]:
class TabSeqDataset(Dataset):
    def __init__(self, 
                 X_num: np.ndarray, 
                 X_cat: np.ndarray, 
                 y: np.ndarray, 
                 encoder: TraceLabelEncoder,
                 is_train: bool = True,
                 sos_token: int = 2): 
        self.X_num = torch.FloatTensor(X_num) if X_num is not None else torch.empty(len(y), 0)
        self.X_cat = torch.LongTensor(X_cat) if X_cat is not None else torch.empty(len(y), 0, dtype=torch.long)
        self.y = torch.FloatTensor(y).view(-1)
        self.encoder = encoder
        self.is_train = is_train
        self.sos_token = sos_token

        self.y_seqs = []
        self.y_multi_hots = []
        
        for val in y:
            seq, leaf_idx = self.encoder.encode(val)
            mht = self.encoder.encode_multi_hot(leaf_idx)
            self.y_seqs.append(seq)
            self.y_multi_hots.append(mht)
            
        self.y_seqs = torch.LongTensor(self.y_seqs)
        self.y_multi_hots = torch.FloatTensor(self.y_multi_hots)

    def __len__(self):
        return len(self.y)

    def __getitem__(self, idx):
        x_n = self.X_num[idx]
        x_c = self.X_cat[idx]
        target_seq = self.y_seqs[idx]
        
        dec_input = torch.cat([
            torch.tensor([self.sos_token], dtype=torch.long),
            target_seq[:-1]
        ])
        
        y_mht = self.y_multi_hots[idx]
        y_raw = self.y[idx]
        
        return {
            'x_num': x_n,
            'x_cat': x_c,
            'dec_input': dec_input,
            'y_seq': target_seq,  
            'y_mht': y_mht,
            'y_raw': y_raw
        }

## 4. 测试

In [4]:
def get_california_housing_loaders(batch_size: int = 64, 
                                   depth: int = 10, 
                                   val_split: float = 0.2):
    data = fetch_california_housing()
    X = data.data
    y = data.target
    X_train, X_val, y_train, y_val = train_test_split(X, y, test_size=val_split, random_state=42)
    
    scaler = StandardScaler()
    X_train_num = scaler.fit_transform(X_train)
    X_val_num = scaler.transform(X_val)
    
    v_min, v_max = y.min() - 0.1, y.max() + 0.1
    encoder = TraceLabelEncoder(v_min, v_max, depth)
    
    train_set = TabSeqDataset(X_train_num, None, y_train, encoder, is_train=True)
    val_set = TabSeqDataset(X_val_num, None, y_val, encoder, is_train=False)
    
    train_loader = DataLoader(train_set, batch_size=batch_size, shuffle=True)
    val_loader = DataLoader(val_set, batch_size=batch_size, shuffle=False)
    
    print(f"数据集加载完毕: Depth={depth}")
    return train_loader, val_loader, encoder

## 5. 全息评估指标 (Extended Holographic Metrics)

**更新**: 与 Benchmark 逻辑完全对齐的评估指标。
1. **预测区间命中率**: 全局覆盖率。
2. **不同宽度区间命中率**: 按固定宽度分段。
3. **分桶回归命中率**: 按 0.2 和 0.4 宽度分桶的分类准确率。

In [5]:
class ExtendedHolographicMetric:
    def __init__(self, encoder: TraceLabelEncoder):
        self.encoder = encoder
        # 定义分桶 (0.2 和 0.4)
        self.bin_edges_02 = np.arange(0, 5.21, 0.2)
        self.bin_edges_04 = np.arange(0, 5.21, 0.4)

    def compute_metrics(self, 
                        model_probs: torch.Tensor, 
                        y_true: torch.Tensor, 
                        confidence: float = 0.90) -> Dict:
        """
        计算全息评估指标，包括全局覆盖率、分段覆盖率和分桶回归命中率。
        """
        batch_size, depth, n_bins = model_probs.shape
        
        # 1. 计算联合概率 & 归一化
        # 逻辑：叶子 i 在所有 t 步都被预测为 True 的概率连乘
        leaf_unnorm_probs = torch.prod(model_probs, dim=1)
        leaf_probs = leaf_unnorm_probs / (torch.sum(leaf_unnorm_probs, dim=1, keepdim=True) + 1e-9)
        
        # 2. 构建 CDF
        cdf = torch.cumsum(leaf_probs, dim=1)
        
        # 3. 点估计 (期望值)
        bin_values = torch.tensor([self.encoder.decode_bin_index(i) for i in range(n_bins)], device=y_true.device)
        y_pred_point = torch.sum(leaf_probs * bin_values, dim=1)
        mae = torch.mean(torch.abs(y_pred_point - y_true)).item()
        rmse = torch.sqrt(torch.mean((y_pred_point - y_true)**2)).item()
        
        # 4. 区间估计 (Extract Interval [L, U] from CDF)
        alpha = 1.0 - confidence
        lower_q = alpha / 2.0
        upper_q = 1.0 - (alpha / 2.0)
        
        lower_indices = torch.argmax((cdf >= lower_q).int(), dim=1)
        upper_indices = torch.argmax((cdf >= upper_q).int(), dim=1)
        
        L_pred = bin_values[lower_indices]
        U_pred = bin_values[upper_indices]
        
        # --- 统一指标计算逻辑 ---
        y_true_np = y_true.detach().cpu().numpy()
        L_pred_np = L_pred.detach().cpu().numpy()
        U_pred_np = U_pred.detach().cpu().numpy()
        y_pred_point_np = y_pred_point.detach().cpu().numpy()
        
        covered = (y_true_np >= L_pred_np) & (y_true_np <= U_pred_np)
        global_picp = np.mean(covered)
        widths = U_pred_np - L_pred_np
        mpiw = np.mean(widths)
        
        # A. 不同宽度区间命中率 (Stratified by Width)
        fixed_bins = [0.0, 0.4, 0.8, 1.2, 1.6, 2.0, 100.0]
        bin_indices = np.digitize(widths, fixed_bins) - 1
        
        stratified_w = {}
        for i in range(len(fixed_bins)-1):
            mask = (bin_indices == i)
            count = np.sum(mask)
            if count > 0:
                local_cov = np.mean(covered[mask])
                range_str = f"Width [{fixed_bins[i]:.1f}, {fixed_bins[i+1]:.1f})"
                stratified_w[range_str] = f"{local_cov:.4f} (n={count})"
        
        # B. 分桶回归命中率 (0.2 & 0.4)
        true_bins_02 = np.digitize(y_true_np, self.bin_edges_02)
        pred_bins_02 = np.digitize(y_pred_point_np, self.bin_edges_02)
        bin_acc_02 = np.mean(true_bins_02 == pred_bins_02)

        true_bins_04 = np.digitize(y_true_np, self.bin_edges_04)
        pred_bins_04 = np.digitize(y_pred_point_np, self.bin_edges_04)
        bin_acc_04 = np.mean(true_bins_04 == pred_bins_04)

        return {
            'MAE': mae,
            'RMSE': rmse,
            '预测区间命中率': global_picp,
            'MPIW': mpiw,
            '不同宽度区间命中率': stratified_w,
            '分桶回归命中率_0.2': bin_acc_02,
            '分桶回归命中率_0.4': bin_acc_04
        }

## 6. 功能验证 (Verification)
验证 ExtendedHolographicMetric 是否正常工作。

In [6]:
# 1. 准备数据
train_loader, val_loader, encoder = get_california_housing_loaders(batch_size=8, depth=6)
metric_calc = ExtendedHolographicMetric(encoder)

# 2. Mock Model
batch = next(iter(train_loader))
B, D, N = 8, 6, 64
mock_model_output = torch.where(batch['y_mht'] > 0.5, torch.tensor(0.9), torch.tensor(0.1))
mock_model_output += torch.randn_like(mock_model_output) * 0.05
mock_probs = torch.clamp(mock_model_output, 0.01, 0.99)

# 运行评估
metrics = metric_calc.compute_metrics(mock_probs, batch['y_raw'], confidence=0.90)
print("\n=== 基于 Multi-hot 输出的直接评估结果 (Extended) ===")
print(f"MAE: {metrics['MAE']:.4f}")
print(f"分桶回归命中率 (0.2): {metrics['分桶回归命中率_0.2']:.4f}")
print(f"分桶回归命中率 (0.4): {metrics['分桶回归命中率_0.4']:.4f}")
print(f"预测区间命中率: {metrics['预测区间命中率']:.4f}")
print("不同宽度区间命中率:")
for k, v in metrics['不同宽度区间命中率'].items():
    print(f"  {k}: {v}")

数据集加载完毕: Depth=6

=== 基于 Multi-hot 输出的直接评估结果 (Extended) ===
MAE: 0.0234
分桶回归命中率 (0.2): 0.6250
分桶回归命中率 (0.4): 0.8750
预测区间命中率: 0.3750
不同宽度区间命中率:
  Width [0.0, 0.4): 0.3750 (n=8)


  self.y_multi_hots = torch.FloatTensor(self.y_multi_hots)
