In [1]:
import os
import random
import math
import copy
import json
from dataclasses import dataclass
from typing import Tuple, Dict

import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader
from sklearn.metrics import accuracy_score, f1_score, roc_auc_score

In [2]:
# One fixed seed shared by every random number generator the notebook touches:
# Python's `random`, NumPy's global generator, and PyTorch (CPU and all CUDA devices).
SEED = 42
random.seed(SEED)
np.random.seed(SEED)
torch.manual_seed(SEED)
torch.cuda.manual_seed_all(SEED)
# cuDNN flags: request deterministic behavior and switch benchmark mode off.
torch.backends.cudnn.deterministic = True
torch.backends.cudnn.benchmark = False

@dataclass
class Config:
    """Experiment settings (paths, optimization, augmentation, model, SSL losses, runtime)."""
    # NOTE(review): `mode` is not read anywhere in the visible code — presumably a run label; confirm.
    mode: str = "self_supervised_transition"
    # NOTE(review): absolute Google-Drive paths (Colab mount); override them when running elsewhere.
    data_dir: str = "/content/drive/MyDrive/Colab Notebooks/UCI-HAR/data"
    save_dir: str = "/content/drive/MyDrive/Colab Notebooks/UCI-HAR/ssl_transition"

    # Training hyper-parameters
    epochs: int = 30
    batch_size: int = 128
    lr: float = 3e-4
    weight_decay: float = 1e-4
    grad_clip: float = 1.0
    label_smoothing: float = 0.05

    # Augmentation parameters
    train_augment_prob: float = 0.4  # per-batch probability of applying transition augmentation (raised)
    train_augment_mix: float = 0.35  # fraction of the window's timesteps that is replaced when mixing

    # Model parameters
    d_model: int = 128
    use_hyperbolic: bool = False
    hyperbolic_c: float = 1.0

    # Self-supervised (SSL) parameters
    ssl_weight: float = 0.3  # weight of the transition-detection loss
    consistency_weight: float = 0.2  # weight of the temporal-consistency loss
    transition_threshold: float = 0.5  # probability threshold for calling a timestep a transition

    # System parameters
    # The device default is evaluated once, when the class body is executed.
    device: str = "cuda" if torch.cuda.is_available() else "cpu"
    num_workers: int = 2

In [3]:
# Name of the sub-folder (inside each split directory) holding the raw signal files.
INERTIAL_SIGNALS_FOLDER = "Inertial Signals"

# (file-name prefix, file extension) for the nine raw channels, ordered
# total_acc x/y/z, body_acc x/y/z, body_gyro x/y/z.
RAW_CHANNELS = [
    (f"{sensor}_{axis}_", "txt")
    for sensor in ("total_acc", "body_acc", "body_gyro")
    for axis in ("x", "y", "z")
]

# Activity names keyed by the 1-based codes used in the label files.
_LABEL_MAP = {
    1: "WALKING",
    2: "WALKING_UPSTAIRS",
    3: "WALKING_DOWNSTAIRS",
    4: "SITTING",
    5: "STANDING",
    6: "LAYING",
}
# Zero-based code -> name, and the reverse lookup name -> zero-based code.
_CODE_TO_LABEL_NAME = {code - 1: name for code, name in _LABEL_MAP.items()}
LABEL_NAME_TO_CODE = {name: code for code, name in _CODE_TO_LABEL_NAME.items()}

In [4]:
def load_split_raw(root: str, split: str) -> Tuple[np.ndarray, np.ndarray]:
    """Read one split's raw inertial signals and labels from text files.

    Args:
        root: dataset root directory containing the ``train`` / ``test`` folders.
        split: ``"train"`` or ``"test"``.

    Returns:
        ``(X, y)``: ``X`` stacks one file per entry of ``RAW_CHANNELS`` along
        axis 1 (samples, channels, timesteps); ``y`` holds integer labels
        shifted down by one so they start at 0.
    """
    assert split in ("train", "test")
    signal_dir = os.path.join(root, split, INERTIAL_SIGNALS_FOLDER)

    # Load every channel file and give it a trailing channel axis.
    per_channel = []
    for prefix, ext in RAW_CHANNELS:
        signal_file = os.path.join(signal_dir, prefix + split + "." + ext)
        per_channel.append(np.loadtxt(signal_file)[..., None])

    # (samples, timesteps, channels) -> (samples, channels, timesteps)
    X = np.concatenate(per_channel, axis=-1).transpose(0, 2, 1)

    label_file = os.path.join(root, split, f"y_{split}.txt")
    labels = np.loadtxt(label_file).astype(int)

    # Important: the label files use 1..6, so convert to 0..5.
    return X, labels - 1

In [5]:
class UCIHARInertial(Dataset):
    def __init__(self, root: str, split: str, mean=None, std=None,
                 preloaded_data: Tuple[np.ndarray, np.ndarray] | None = None):
        super().__init__()

        if preloaded_data is not None:
            X, y = preloaded_data
        else:
            X, y = load_split_raw(root, split)

        self.X = X.astype(np.float32)
        self.y = (y - 1).astype(np.int64) if y.min() >= 1 else y.astype(np.int64)

        if mean is not None and std is not None:
            self.mean, self.std = mean, std
        else:
            self.mean = self.X.mean(axis=(0,2), keepdims=True)
            self.std = self.X.std(axis=(0,2), keepdims=True) + 1e-6

        if preloaded_data is None:
             self.X = (self.X - self.mean) / self.std

    def __len__(self):
        return self.X.shape[0]

    def __getitem__(self, idx):
        return (
            torch.from_numpy(self.X[idx]),
            torch.tensor(self.y[idx], dtype=torch.long)
        )


In [6]:
def create_transition_batch(x: torch.Tensor, y: torch.Tensor, mix_ratio: float) -> Tuple[torch.Tensor, torch.Tensor]:
    """Synthesize activity transitions inside a batch and return their ground truth.

    For each sample, with probability 0.5, the last ``int(T * mix_ratio)``
    timesteps are overwritten with the first timesteps of a randomly chosen
    sample of a different class from the same batch.

    Args:
        x: batch of windows, shape (B, C, T). It is not modified.
        y: class labels, shape (B,).
        mix_ratio: fraction of the window to replace; the resulting number of
            timesteps is clamped to ``[0, T]``.

    Returns:
        x_aug: copy of ``x`` with transitions spliced in.
        transition_labels: (B, T) float tensor, 1.0 on replaced timesteps.
    """
    B, C, T = x.shape

    x_aug = x.clone()
    transition_labels = torch.zeros(B, T, dtype=torch.float32, device=x.device)

    mix_pts = min(max(int(T * mix_ratio), 0), T)
    if mix_pts == 0:
        # Nothing to splice. Without this guard the slice `-0:` would select
        # the WHOLE window and the assignment below would fail on a shape
        # mismatch (and the whole window would be labelled as transition).
        return x_aug, transition_labels

    # Explicit start index instead of a negative slice (see guard above).
    start = T - mix_pts

    for i in range(B):
        if random.random() < 0.5:  # create a transition for about half of the samples
            other_class_indices = (y != y[i]).nonzero(as_tuple=True)[0]
            if len(other_class_indices) > 0:
                j = other_class_indices[random.randint(0, len(other_class_indices) - 1)]
                # The source is read from the untouched `x`, so earlier splices never leak in.
                x_aug[i, :, start:] = x[j, :, :mix_pts]
                transition_labels[i, start:] = 1.0  # mark the transition span

    return x_aug, transition_labels

In [7]:
class ConvBNAct(nn.Module):
    """Bias-free Conv1d followed by BatchNorm1d and GELU.

    Args:
        c_in, c_out: input / output channel counts.
        k: kernel size.
        s: stride.
        p: padding; when None, ``k // 2`` is used.
        g: number of convolution groups.
    """

    def __init__(self, c_in, c_out, k, s=1, p=None, g=1):
        super().__init__()
        padding = p if p is not None else k // 2
        self.c = nn.Conv1d(
            c_in, c_out,
            kernel_size=k, stride=s, padding=padding,
            groups=g, bias=False,
        )
        self.bn = nn.BatchNorm1d(c_out)
        self.act = nn.GELU()

    def forward(self, x):
        """Apply convolution, normalization and activation in that order."""
        out = self.c(x)
        out = self.bn(out)
        return self.act(out)

class MultiPathCNN(nn.Module):
    """Parallel convolution branches with different kernel sizes.

    The input is first projected to ``d_model // 2`` channels by a 1x1 block.
    Every branch then applies a grouped (one group per channel) strided
    convolution of its own kernel size followed by a 1x1 block. The branch
    outputs are concatenated on the channel axis and fused to ``d_model``
    channels by a final 1x1 block.

    Args:
        in_ch: number of input channels.
        d_model: number of output channels.
        branches: kernel size of each branch.
        stride: stride of the grouped convolution in every branch.
    """

    def __init__(self, in_ch=9, d_model=128, branches=(3,5,9,15), stride=2):
        super().__init__()
        hidden = d_model // 2
        self.pre = ConvBNAct(in_ch, hidden, 1)

        paths = []
        for kernel in branches:
            grouped = ConvBNAct(hidden, hidden, kernel, stride, g=hidden)
            pointwise = ConvBNAct(hidden, hidden, 1)
            paths.append(nn.Sequential(grouped, pointwise))
        self.branches = nn.ModuleList(paths)

        self.post = ConvBNAct(len(branches) * hidden, d_model, 1)
        self.stride = stride

    def forward(self, x):
        """Run all branches on the shared projection and fuse their outputs."""
        shared = self.pre(x)
        stacked = torch.cat([branch(shared) for branch in self.branches], dim=1)
        return self.post(stacked)

In [8]:
class CNN_LSTM(nn.Module):
    """Convolutional feature extractor followed by a unidirectional LSTM.

    Five Conv1d(k=3, pad=1) -> BatchNorm1d -> ReLU stages widen the channels
    (32, 64, 128, 256, 512); a single MaxPool1d(2) after the second stage
    halves the time axis. The LSTM output at every step is returned.

    Args:
        n_features: number of input channels.
        d_model: LSTM hidden size, i.e. channels of the returned feature map.
        num_layers: number of stacked LSTM layers.
    """

    def __init__(self, n_features=9, d_model=128, num_layers=1):
        super().__init__()

        # Temporal down-sampling factor of the extractor (one MaxPool1d(2)).
        self.stride = 2

        widths = [n_features, 32, 64, 128, 256, 512]
        layers = []
        for stage, (c_in, c_out) in enumerate(zip(widths[:-1], widths[1:])):
            layers.append(nn.Conv1d(c_in, c_out, kernel_size=3, padding=1))
            layers.append(nn.BatchNorm1d(c_out))
            layers.append(nn.ReLU())
            if stage == 1:
                # The only pooling layer sits right after the second stage.
                layers.append(nn.MaxPool1d(2))
        self.cnn_extractor = nn.Sequential(*layers)

        self.lstm = nn.LSTM(
            input_size=widths[-1],
            hidden_size=d_model,
            num_layers=num_layers,
            batch_first=True,
            bidirectional=False,
        )

    def forward(self, x):
        """Map (B, C, T) input to a (B, d_model, T_down) feature map."""
        # LSTM with batch_first expects (B, T_down, channels).
        sequence = self.cnn_extractor(x).transpose(1, 2)

        # Keep the outputs of all time steps rather than the last hidden state.
        per_step, _ = self.lstm(sequence)  # (B, T_down, d_model)

        # Back to channels-first, the layout the heads expect.
        return per_step.transpose(1, 2)

In [9]:
class TransitionDetectionHead(nn.Module):
    """Self-supervised head that scores every timestep as transition / no transition."""

    def __init__(self, d_model: int):
        super().__init__()
        half = d_model // 2
        self.conv1 = ConvBNAct(d_model, half, 3)
        # 1x1 convolution down to a single channel: one binary logit per timestep.
        self.conv2 = nn.Conv1d(half, 1, 1)

    def forward(self, fmap):
        """
        Args:
            fmap: (B, d_model, T_down)
        Returns:
            transition_logits: (B, T_down)
        """
        hidden = self.conv1(fmap)
        per_step = self.conv2(hidden)  # (B, 1, T_down)
        return per_step.squeeze(1)

In [10]:
class ClassificationHead(nn.Module):
    """Main head for activity classification: average over time, then a linear layer."""

    def __init__(self, d_model: int, num_classes: int):
        super().__init__()
        self.gap = nn.AdaptiveAvgPool1d(1)
        self.fc = nn.Linear(d_model, num_classes)

    def forward(self, fmap):
        """Map a (B, d_model, T) feature map to (B, num_classes) logits."""
        summary = self.gap(fmap)        # (B, d_model, 1)
        summary = summary.squeeze(-1)   # (B, d_model)
        return self.fc(summary)

In [11]:
class HyperbolicProjection(nn.Module):
    """Rescale vectors so their norm stays below ``1 / sqrt(c)``.

    Vectors whose norm (along the last axis) exceeds ``1 / sqrt(c) - 1e-4``
    are shrunk onto that radius; shorter vectors pass through unchanged.
    """

    def __init__(self, c=1.0):
        super().__init__()
        self.c = c

    def forward(self, x):
        """Clip the last-axis norm of ``x`` to the allowed radius."""
        radius = (1.0 / math.sqrt(self.c)) - 1e-4
        # Lower bound avoids a division by zero for all-zero vectors.
        length = x.norm(dim=-1, keepdim=True).clamp(min=1e-8)
        shrink = length.clamp(max=radius) / length
        return x * shrink

In [12]:
class HyperbolicClassificationHead(nn.Module):
    """Classification head that clips features into the ball before the classifier.

    Pipeline: average over time -> linear projection -> norm clipping by
    ``HyperbolicProjection`` -> ordinary linear classifier.
    """

    def __init__(self, d_model: int, num_classes: int, c: float = 1.0):
        super().__init__()
        self.c = c
        self.gap = nn.AdaptiveAvgPool1d(1)
        self.pre_proj = nn.Linear(d_model, d_model)
        self.hyperbolic_proj = HyperbolicProjection(c=c)
        self.fc = nn.Linear(d_model, num_classes)

    def forward(self, fmap):
        """Map a (B, d_model, T) feature map to (B, num_classes) logits."""
        summary = self.gap(fmap).squeeze(-1)                     # (B, d_model)
        clipped = self.hyperbolic_proj(self.pre_proj(summary))   # norm-limited embedding
        return self.fc(clipped)

In [13]:
class SSLHARModel(nn.Module):
    """Activity-recognition model with an auxiliary transition-detection head.

    A shared ``CNN_LSTM`` backbone feeds two heads: the main classification
    head (Euclidean or hyperbolic variant) and the per-timestep transition
    head used as the pretext task.
    """

    def __init__(self, d_model=128, num_classes=6, use_hyperbolic=False, hyperbolic_c=1.0):
        super().__init__()
        self.backbone = CNN_LSTM(d_model=d_model)

        # Main task: activity classification.
        self.classification_head = (
            HyperbolicClassificationHead(d_model, num_classes, c=hyperbolic_c)
            if use_hyperbolic
            else ClassificationHead(d_model, num_classes)
        )

        # Pretext task: transition detection.
        self.transition_head = TransitionDetectionHead(d_model)

        self.stride = self.backbone.stride

    def forward(self, x, return_features=False):
        """
        Args:
            x: (B, C, T)
            return_features: when True, the intermediate feature map is returned as well
        Returns:
            class_logits: (B, num_classes)
            transition_logits: (B, T_down)
            features: (B, d_model, T_down), only if return_features
        """
        fmap = self.backbone(x)  # (B, d_model, T_down)

        outputs = (self.classification_head(fmap), self.transition_head(fmap))
        if not return_features:
            return outputs
        return outputs + (fmap,)

In [14]:
def temporal_consistency_loss(features: torch.Tensor, transition_probs: torch.Tensor) -> torch.Tensor:
    """Temporal-consistency loss: keep features stable where no transition is predicted.

    The L2 norm of the change between neighbouring timesteps is weighted by
    ``1 - transition_prob`` of the earlier timestep and averaged, so changes
    are penalized most where the transition probability is low.

    Args:
        features: (B, d_model, T)
        transition_probs: (B, T) transition probabilities (after sigmoid)

    Returns:
        Scalar loss. A zero scalar is returned when there is no pair of
        neighbouring timesteps (T < 2) or the batch is empty; averaging an
        empty tensor would otherwise yield NaN.
    """
    if features.shape[0] == 0 or features.shape[-1] < 2:
        return features.new_zeros(())

    # Norm of the change between neighbouring timesteps: (B, T-1)
    step_change = torch.norm(features[:, :, 1:] - features[:, :, :-1], dim=1)

    # Weight of each step: high where a transition is unlikely. (B, T-1)
    stay_weight = 1.0 - transition_probs[:, :-1]

    return (step_change * stay_weight).mean()

In [15]:
def transition_detection_loss(pred_logits: torch.Tensor, target_labels: torch.Tensor) -> torch.Tensor:
    """Transition-detection loss (binary cross entropy on per-timestep logits).

    Args:
        pred_logits: (B, T_down) raw logits.
        target_labels: (B, T_orig) 0/1 transition labels at input resolution.

    Returns:
        Scalar BCE-with-logits loss against the down-sampled targets.
    """
    T_down = pred_logits.shape[1]

    # Down-sample the targets by max pooling: a coarse step counts as a
    # transition if any covered timestep is one. Adaptive pooling always
    # produces exactly T_down steps; for T_orig a multiple of T_down it is the
    # same as pooling with kernel = stride = T_orig // T_down, and it also
    # works when the lengths do not divide evenly (where a fixed kernel gives
    # the wrong length or a zero stride).
    target_down = F.adaptive_max_pool1d(
        target_labels.unsqueeze(1).to(pred_logits.dtype),
        T_down
    ).squeeze(1)  # (B, T_down)

    return F.binary_cross_entropy_with_logits(pred_logits, target_down)

In [16]:
def train_one_epoch_ssl(model: SSLHARModel, loader: DataLoader, opt: torch.optim.Optimizer, cfg: Config):
    """Train the model for one epoch on the joint objective.

    Per batch: with probability ``cfg.train_augment_prob`` synthetic
    transitions are spliced in (otherwise the transition target is all zeros),
    then the total loss
    ``cls + cfg.ssl_weight * transition BCE + cfg.consistency_weight * consistency``
    is minimized with gradient-norm clipping at ``cfg.grad_clip``.

    Args:
        model: network returning (class_logits, transition_logits, features).
        loader: yields ``(x, y)`` batches.
        opt: optimizer over the model parameters.
        cfg: experiment configuration.

    Returns:
        Dict with sample-weighted averages ``loss``, ``cls_loss``, ``ssl_loss``,
        ``cons_loss``, the training accuracy ``acc`` and ``skipped_batches``
        (number of batches dropped because their loss was not finite). The
        averages are NaN when no batch contributed.
    """
    model.train()
    sums = {"loss": 0.0, "cls_loss": 0.0, "ssl_loss": 0.0, "cons_loss": 0.0}
    total_correct, total_samples, skipped_batches = 0, 0, 0

    for x, y in loader:
        x, y = x.to(cfg.device), y.to(cfg.device)

        # Self-supervised augmentation (decided once per batch).
        if random.random() < cfg.train_augment_prob:
            x_aug, transition_gt = create_transition_batch(x, y, cfg.train_augment_mix)
        else:
            x_aug = x
            transition_gt = torch.zeros(x.shape[0], x.shape[2], device=cfg.device)

        opt.zero_grad(set_to_none=True)

        class_logits, transition_logits, features = model(x_aug, return_features=True)

        # 1. Main task: activity classification.
        cls_loss = F.cross_entropy(class_logits, y, label_smoothing=cfg.label_smoothing)

        # 2. Pretext task: transition detection.
        ssl_loss = transition_detection_loss(transition_logits, transition_gt)

        # 3. Temporal consistency.
        transition_probs = torch.sigmoid(transition_logits)
        cons_loss = temporal_consistency_loss(features, transition_probs)

        loss = cls_loss + cfg.ssl_weight * ssl_loss + cfg.consistency_weight * cons_loss

        # Drop batches with NaN *or* infinite loss, but keep count of them so
        # the caller can see that it happened.
        if not torch.isfinite(loss):
            skipped_batches += 1
            continue

        loss.backward()
        nn.utils.clip_grad_norm_(model.parameters(), cfg.grad_clip)
        opt.step()

        # Metrics (accuracy uses the logits computed before this step's update).
        n = y.size(0)
        total_correct += (class_logits.argmax(dim=-1) == y).sum().item()
        sums["loss"] += loss.item() * n
        sums["cls_loss"] += cls_loss.item() * n
        sums["ssl_loss"] += ssl_loss.item() * n
        sums["cons_loss"] += cons_loss.item() * n
        total_samples += n

    if total_samples == 0:
        # Empty loader or every batch skipped: report NaN instead of dividing by zero.
        stats = {key: float("nan") for key in sums}
        stats["acc"] = float("nan")
    else:
        stats = {key: value / total_samples for key, value in sums.items()}
        stats["acc"] = total_correct / total_samples
    stats["skipped_batches"] = skipped_batches
    return stats

In [17]:
@torch.no_grad()
def evaluate_ssl(model: SSLHARModel, loader: DataLoader, cfg: Config):
    """Evaluate classification quality and the mean predicted transition probability.

    Returns:
        ``(acc, f1, avg_transition_conf)``: accuracy, macro-averaged F1 and the
        mean of the sigmoid transition outputs over all samples and timesteps.
    """
    model.eval()
    labels, predictions, transition_probs = [], [], []

    for x, y in loader:
        x = x.to(cfg.device)
        y = y.to(cfg.device)
        class_logits, transition_logits = model(x)

        predictions.append(class_logits.argmax(dim=-1).cpu().numpy())
        labels.append(y.cpu().numpy())
        transition_probs.append(torch.sigmoid(transition_logits).cpu().numpy())

    y_true = np.concatenate(labels)
    y_pred = np.concatenate(predictions)

    # Transition-head summary: average confidence over everything it scored.
    avg_transition_conf = np.concatenate(transition_probs).mean()

    return (
        accuracy_score(y_true, y_pred),
        f1_score(y_true, y_pred, average='macro'),
        avg_transition_conf,
    )

In [18]:
def create_transitional_test_set_with_gt(
    orig_dataset: UCIHARInertial, class_A: str, class_B: str, p: float, mix: float
) -> Tuple[UCIHARInertial, np.ndarray, dict]:
    """Build a test set with synthetic A<->B transitions plus their ground truth.

    A fraction ``p`` of the class-A windows gets its last ``int(T * mix)``
    timesteps replaced by the first timesteps of random class-B windows, and
    vice versa. Class labels are left unchanged. Sources are always taken
    from the unmodified original windows.

    Args:
        orig_dataset: dataset to copy windows, labels and statistics from.
        class_A, class_B: activity names (keys of ``LABEL_NAME_TO_CODE``).
        p: fraction of each class to modify (at least one window per class,
            at most all of them).
        mix: fraction of the window that is replaced; the resulting number of
            timesteps is clamped to ``[0, T]``.

    Returns:
        mod_dataset: dataset containing the transitions.
        transition_gt: (N, T) float32 array, 1.0 on replaced timesteps.
        info: dict with ``modified_samples`` and ``modified_ratio``.

    Raises:
        ValueError: if either class has no window in ``orig_dataset``.
    """
    X, y = orig_dataset.X.copy(), orig_dataset.y.copy()
    N, C, T = X.shape
    transition_gt = np.zeros((N, T), dtype=np.float32)

    code_A, code_B = LABEL_NAME_TO_CODE[class_A], LABEL_NAME_TO_CODE[class_B]
    idx_A, idx_B = np.where(y == code_A)[0], np.where(y == code_B)[0]
    if len(idx_A) == 0 or len(idx_B) == 0:
        raise ValueError(
            f"Cannot build transitions: class '{class_A}' has {len(idx_A)} samples "
            f"and class '{class_B}' has {len(idx_B)} samples"
        )

    mix_pts = min(max(int(T * mix), 0), T)
    # Explicit start index: a `-mix_pts:` slice would select the whole window
    # when mix_pts is 0 and make the assignment fail on a shape mismatch.
    start = T - mix_pts

    def _splice(target_pool: np.ndarray, source_pool: np.ndarray) -> int:
        """Overwrite the tail of sampled targets with the head of sampled sources."""
        # Never ask for more targets than exist (sampling is without replacement).
        n_targets = min(len(target_pool), max(1, int(len(target_pool) * p)))
        targets = np.random.choice(target_pool, n_targets, replace=False)
        sources = np.random.choice(source_pool, len(targets), replace=True)
        if mix_pts == 0:
            return 0
        # Targets are unique, so one vectorized assignment is safe.
        X[targets, :, start:] = orig_dataset.X[sources, :, :mix_pts]
        transition_gt[targets, start:] = 1.0
        return len(targets)

    n_modified = _splice(idx_A, idx_B)   # A -> B transitions
    n_modified += _splice(idx_B, idx_A)  # B -> A transitions

    mod_dataset = UCIHARInertial(
        root="", split="test", mean=orig_dataset.mean, std=orig_dataset.std,
        preloaded_data=(X, y)
    )

    info = {
        'modified_samples': n_modified,
        'modified_ratio': n_modified / N,
    }
    return mod_dataset, transition_gt, info

In [19]:
@torch.no_grad()
def evaluate_transition_detection(
    model: SSLHARModel, dataset: UCIHARInertial, transition_gt: np.ndarray, cfg: Config
) -> Dict[str, float]:
    """Evaluate per-timestep transition detection against ground truth.

    Args:
        model: network returning (class_logits, transition_logits).
        dataset: windows to score, iterated in order without shuffling.
        transition_gt: (N, T) 0/1 ground truth aligned with ``dataset``.
        cfg: supplies batch size, workers, device and ``transition_threshold``.

    Returns:
        Dict with ``auc`` (ROC AUC of the probabilities) and ``acc``,
        ``precision``, ``recall``, ``f1`` of the predictions thresholded at
        ``cfg.transition_threshold``.

    Raises:
        ValueError: if ``transition_gt`` and ``dataset`` differ in length.
    """
    if len(transition_gt) != len(dataset):
        raise ValueError(
            f"transition_gt has {len(transition_gt)} rows but the dataset has {len(dataset)} samples"
        )

    model.eval()
    loader = DataLoader(dataset, cfg.batch_size, num_workers=cfg.num_workers)

    all_probs, all_labels = [], []
    offset = 0  # running position in transition_gt, advanced by the real batch size

    for x, _ in loader:
        n = x.shape[0]
        x = x.to(cfg.device)
        _, transition_logits = model(x)
        transition_probs = torch.sigmoid(transition_logits).cpu().numpy()  # (B, T_down)

        # Down-sample the ground truth to the model's resolution with max
        # pooling (any transition inside a coarse step marks the step). Adaptive
        # pooling matches kernel = stride = T // T_down pooling when T is a
        # multiple of T_down and still works when it is not.
        gt_batch = torch.as_tensor(np.asarray(transition_gt[offset:offset + n], dtype=np.float32))
        offset += n
        gt_down = F.adaptive_max_pool1d(
            gt_batch.unsqueeze(1), transition_probs.shape[1]
        ).squeeze(1).numpy()

        all_probs.append(transition_probs)
        all_labels.append(gt_down)

    all_probs = np.concatenate(all_probs).flatten()
    all_labels = np.concatenate(all_labels).flatten().astype(int)

    # ROC AUC on the raw probabilities.
    auc = roc_auc_score(all_labels, all_probs)

    # Binary decisions at the configured threshold.
    pred_binary = (all_probs > cfg.transition_threshold).astype(int)
    acc = accuracy_score(all_labels, pred_binary)

    # Precision / recall / F1 from the confusion counts.
    tp = int(((pred_binary == 1) & (all_labels == 1)).sum())
    fp = int(((pred_binary == 1) & (all_labels == 0)).sum())
    fn = int(((pred_binary == 0) & (all_labels == 1)).sum())

    precision = tp / (tp + fp) if (tp + fp) > 0 else 0.0
    recall = tp / (tp + fn) if (tp + fn) > 0 else 0.0
    f1 = 2 * precision * recall / (precision + recall) if (precision + recall) > 0 else 0.0

    return {
        'auc': float(auc),
        'acc': float(acc),
        'precision': precision,
        'recall': recall,
        'f1': f1
    }

In [20]:
class NumpyEncoder(json.JSONEncoder):
    """JSON encoder that converts NumPy scalars and arrays to plain Python values."""

    def default(self, obj):
        """Convert NumPy objects; defer to the base class (TypeError) for anything else."""
        # np.bool_ is not a Python bool, so the stock encoder rejects it.
        if isinstance(obj, np.bool_):
            return bool(obj)
        if isinstance(obj, np.integer):
            return int(obj)
        if isinstance(obj, np.floating):
            return float(obj)
        if isinstance(obj, np.ndarray):
            return obj.tolist()
        return super().default(obj)

In [21]:
def run_ssl_experiment(cfg: Config):
    """Train and compare the Euclidean and hyperbolic SSL models end to end.

    Steps:
      1. Load the train split and the test split (test normalized with the
         train statistics).
      2. Build four synthetic transition test sets with ground truth.
      3. For each model variant: reseed, train for ``cfg.epochs`` epochs,
         restore the weights of the best epoch, then evaluate classification
         and transition detection on the original and transition test sets.
      4. Print a summary / comparison and write ``ssl_results.json`` and
         ``ssl_visualization_data.json`` into ``cfg.save_dir``.

    Side effects: creates ``cfg.save_dir``, overwrites ``cfg.use_hyperbolic``
    for every variant, prints progress, and writes the two JSON files.

    NOTE(review): the "best" checkpoint is chosen by accuracy on the original
    test set, so the reported test accuracy is selected on the same data it is
    measured on — consider a held-out validation split.
    """
    os.makedirs(cfg.save_dir, exist_ok=True)

    # Load the datasets (the test split reuses the train mean/std).
    train_set = UCIHARInertial(cfg.data_dir, "train")
    test_set_orig = UCIHARInertial(cfg.data_dir, "test", mean=train_set.mean, std=train_set.std)

    # Transition test scenarios: (class A, class B, fraction modified p, mix ratio).
    scenarios = [
        ("STANDING", "SITTING", 0.60, 0.50),
        ("STANDING", "SITTING", 0.70, 0.55),
        ("WALKING", "WALKING_UPSTAIRS", 0.65, 0.52),
        ("SITTING", "LAYING", 0.75, 0.58),
    ]

    print("\n" + "="*70)
    print("    🔬 TRANSITIONAL TEST SETS 생성 (Ground Truth 포함)")
    print("="*70)

    # Built once, before the per-model reseeding below, and shared by all variants.
    transition_test_data = []
    for clsA, clsB, p, mix in scenarios:
        test_set_mod, transition_gt, info = create_transitional_test_set_with_gt(
            test_set_orig, clsA, clsB, p=p, mix=mix
        )
        transition_test_data.append((test_set_mod, transition_gt, info))
        print(f"   - {clsA}↔{clsB} (p={p:.2f}, mix={mix:.2f}): {info['modified_samples']}개 샘플 변형")

    # Model variants to compare.
    ssl_configs = [
        {"name": "SSL_Euclidean", "use_hyperbolic": False},
        {"name": "SSL_Hyperbolic", "use_hyperbolic": True},
    ]

    results_table = []

    for ssl_cfg in ssl_configs:
        print(f"\n{'='*70}\n   실험: {ssl_cfg['name']}\n{'='*70}")

        # Reseed so both variants start from the same random state.
        random.seed(SEED); np.random.seed(SEED); torch.manual_seed(SEED); torch.cuda.manual_seed_all(SEED)

        # NOTE: this mutates the caller's config object.
        cfg.use_hyperbolic = ssl_cfg["use_hyperbolic"]
        train_loader = DataLoader(train_set, cfg.batch_size, shuffle=True, num_workers=cfg.num_workers)
        test_loader_orig = DataLoader(test_set_orig, cfg.batch_size, num_workers=cfg.num_workers)

        model = SSLHARModel(
            d_model=cfg.d_model, use_hyperbolic=cfg.use_hyperbolic, hyperbolic_c=cfg.hyperbolic_c
        ).to(cfg.device)

        opt = torch.optim.AdamW(model.parameters(), lr=cfg.lr, weight_decay=cfg.weight_decay)

        best_acc, best_wts = 0.0, None
        print(f"Training {ssl_cfg['name']} for {cfg.epochs} epochs...")

        for epoch in range(1, cfg.epochs + 1):
            stats = train_one_epoch_ssl(model, train_loader, opt, cfg)
            te_acc, te_f1, te_trans_conf = evaluate_ssl(model, test_loader_orig, cfg)

            # Keep a copy of the weights with the best original-test accuracy so far.
            if te_acc > best_acc:
                best_acc = te_acc
                best_wts = copy.deepcopy(model.state_dict())

            # Log the first epoch and every fifth one.
            if epoch % 5 == 0 or epoch == 1:
                print(f"[{epoch:02d}/{cfg.epochs}] Train L:{stats['loss']:.4f} (Cls:{stats['cls_loss']:.4f} "
                      f"SSL:{stats['ssl_loss']:.4f} Cons:{stats['cons_loss']:.4f}) A:{stats['acc']:.4f} | "
                      f"Test A:{te_acc:.4f} F1:{te_f1:.4f} TransConf:{te_trans_conf:.4f}")

        # best_wts stays None only if no epoch ever exceeded an accuracy of 0.0.
        if best_wts:
            model.load_state_dict(best_wts)
        print(f"✅ Best Original Test Acc: {best_acc:.4f}")

        # Evaluate on the original test set (with the restored best weights).
        acc_orig, f1_orig, _ = evaluate_ssl(model, test_loader_orig, cfg)

        # Evaluate on the transition test sets.
        transition_results = []
        print("\n   🔍 전이 테스트셋 평가 (Self-Supervised Detection)...")

        for i, (test_set_mod, transition_gt, info) in enumerate(transition_test_data):
            test_loader_mod = DataLoader(test_set_mod, cfg.batch_size, num_workers=cfg.num_workers)

            # Classification performance.
            acc_trans, f1_trans, _ = evaluate_ssl(model, test_loader_mod, cfg)
            drop = acc_orig - acc_trans

            # Transition-detection performance.
            detection_metrics = evaluate_transition_detection(model, test_set_mod, transition_gt, cfg)

            transition_results.append({
                'scenario': i+1,
                'class_acc': acc_trans,
                'class_drop': drop,
                'detection_auc': detection_metrics['auc'],
                'detection_acc': detection_metrics['acc'],
                'detection_f1': detection_metrics['f1'],
                'detection_precision': detection_metrics['precision'],
                'detection_recall': detection_metrics['recall']
            })

            print(f"     - Scenario {i+1}: ClassAcc={acc_trans:.4f} (Drop={drop:.4f}) | "
                  f"DetAUC={detection_metrics['auc']:.4f} DetF1={detection_metrics['f1']:.4f}")

        # Averages over the scenarios; retention is the share of the original
        # accuracy that survives on the transition sets, in percent.
        avg_trans_acc = np.mean([r['class_acc'] for r in transition_results])
        avg_drop = acc_orig - avg_trans_acc
        retention = (1 - avg_drop / acc_orig) * 100 if acc_orig > 0 else 0
        avg_det_auc = np.mean([r['detection_auc'] for r in transition_results])
        avg_det_f1 = np.mean([r['detection_f1'] for r in transition_results])

        results_table.append({
            "config": ssl_cfg["name"],
            "orig_acc": acc_orig,
            "orig_f1": f1_orig,
            "avg_trans_acc": avg_trans_acc,
            "avg_drop": avg_drop,
            "retention": retention,
            "avg_detection_auc": avg_det_auc,
            "avg_detection_f1": avg_det_f1,
            "transition_results": transition_results
        })

    # Final summary table.
    print(f"\n{'='*70}\n   SELF-SUPERVISED LEARNING 실험 결과\n{'='*70}")
    print(f"{'Config':<20} {'Orig Acc':<10} {'Trans Acc':<11} {'Drop':<10} {'Retention':<12} {'Det AUC':<10} {'Det F1':<10}")
    print("-" * 95)

    for r in results_table:
        print(f"{r['config']:<20} {r['orig_acc']:.4f}     {r['avg_trans_acc']:.4f}      "
              f"{r['avg_drop']:.4f}   {r['retention']:.2f}%      {r['avg_detection_auc']:.4f}    {r['avg_detection_f1']:.4f}")

    # Detailed comparison; relies on results_table order: [Euclidean, hyperbolic].
    if len(results_table) > 1:
        euclidean, hyperbolic = results_table[0], results_table[1]

        print("\n" + "-"*95)
        print("📊 상세 비교 분석")
        print("-" * 95)

        # Classification comparison: relative reduction of the accuracy drop, in percent.
        drop_improve = ((euclidean['avg_drop'] - hyperbolic['avg_drop']) / euclidean['avg_drop'] * 100) \
                       if euclidean['avg_drop'] > 0 else 0
        print(f"\n1️⃣  분류 강건성:")
        print(f"   - Drop 감소율: {drop_improve:+.2f}%")
        print(f"   - Retention 이득: {hyperbolic['retention'] - euclidean['retention']:+.2f}pp")

        # Transition-detection comparison, in percentage points.
        det_auc_improve = (hyperbolic['avg_detection_auc'] - euclidean['avg_detection_auc']) * 100
        det_f1_improve = (hyperbolic['avg_detection_f1'] - euclidean['avg_detection_f1']) * 100
        print(f"\n2️⃣  전이 탐지 성능:")
        print(f"   - AUC 개선: {det_auc_improve:+.2f}pp")
        print(f"   - F1 개선: {det_f1_improve:+.2f}pp")

        # Per-scenario detection comparison.
        print(f"\n3️⃣  시나리오별 전이 탐지 성능:")
        print(f"   {'Scenario':<12} {'Euclidean AUC':<16} {'Hyperbolic AUC':<16} {'개선':<10}")
        print("   " + "-"*60)
        for i in range(len(euclidean['transition_results'])):
            euc_auc = euclidean['transition_results'][i]['detection_auc']
            hyp_auc = hyperbolic['transition_results'][i]['detection_auc']
            improve = (hyp_auc - euc_auc) * 100
            print(f"   Scenario {i+1:<4} {euc_auc:.4f}           {hyp_auc:.4f}           {improve:+.2f}pp")

        # Interpretation, chosen by fixed thresholds on the two improvements.
        print("\n" + "-"*95)
        print("🧠 결과 해석:")
        print("-" * 95)

        if drop_improve > 20 and det_auc_improve > 5:
            print("✅ 쌍곡 공간이 Self-Supervised Learning에서 탁월한 성능 발휘:")
            print("   1. 전이 구간을 더 정확하게 탐지 (높은 AUC)")
            print("   2. 탐지된 정보를 활용해 분류 강건성 크게 향상")
            print("   3. 쌍곡 기하학이 implicit + explicit robustness 모두 제공")
        elif drop_improve > 10 or det_auc_improve > 3:
            print("⚠️  쌍곡 공간이 적당한 개선 제공:")
            print("   - 전이 탐지 또는 분류 강건성 중 하나에서 우수")
            print("   - SSL 파라미터 튜닝으로 추가 개선 가능")
        else:
            print("❌ 제한적 개선:")
            print("   - Euclidean 공간도 SSL로 충분한 강건성 확보")
            print("   - 쌍곡 공간의 이점이 SSL 설정에서 두드러지지 않음")

        # Key insights, printed only when the hyperbolic model clears the thresholds.
        print("\n💡 핵심 인사이트:")
        if hyperbolic['avg_detection_auc'] > 0.75:
            print(f"   ✓ 쌍곡 모델의 전이 탐지 AUC {hyperbolic['avg_detection_auc']:.3f}로 신뢰할만한 수준")
            print("   ✓ Self-Supervised Head가 전이 패턴을 효과적으로 학습")
        if hyperbolic['retention'] > 85:
            print(f"   ✓ Retention {hyperbolic['retention']:.1f}%로 실용적 수준의 강건성 달성")

        # Suggested follow-up experiments (static text).
        print("\n📈 추가 실험 제안:")
        print("   1. SSL 가중치(ssl_weight, consistency_weight) 최적화")
        print("   2. 더 다양한 전이 패턴(점진적 전이, 다중 전이 등) 테스트")
        print("   3. Attention 메커니즘 추가로 전이 구간 집중 학습")
        print("   4. 준지도 학습(Semi-supervised) 설정 실험")

    # Save the results (NumpyEncoder converts NumPy scalars for JSON).
    with open(os.path.join(cfg.save_dir, "ssl_results.json"), "w") as f:
        json.dump(results_table, f, indent=2, cls=NumpyEncoder)
    print(f"\n✅ Results saved to '{cfg.save_dir}/ssl_results.json'")

    # Save a compact, column-oriented copy for plotting.
    visualization_data = {
        'configs': [r['config'] for r in results_table],
        'orig_acc': [r['orig_acc'] for r in results_table],
        'trans_acc': [r['avg_trans_acc'] for r in results_table],
        'detection_auc': [r['avg_detection_auc'] for r in results_table],
        'detection_f1': [r['avg_detection_f1'] for r in results_table],
        'retention': [r['retention'] for r in results_table]
    }

    with open(os.path.join(cfg.save_dir, "ssl_visualization_data.json"), "w") as f:
        json.dump(visualization_data, f, indent=2, cls=NumpyEncoder)

In [22]:
if __name__ == "__main__":
    config = Config()
    rule = "=" * 70

    # Start-up banner: the idea of the experiment plus the key settings.
    # NOTE(review): the banner mentions a "Multipath CNN", while SSLHARModel
    # builds its backbone from CNN_LSTM — confirm which one is intended.
    banner = [
        "\n" + rule,
        "   🧪 UCI-HAR Self-Supervised Learning with Transition Detection",
        rule,
        "   핵심 아이디어:",
        "   1. 전이 구간을 mask로 표시하지 않음",
        "   2. Multipath CNN이 자기주도학습으로 전이 구간 탐지",
        "   3. Main Task: 행동 분류 / Pretext Task: 전이 탐지",
        "   4. Temporal Consistency Loss로 암시적 강건성 학습",
        rule,
        f"Device: {config.device}",
        f"Epochs: {config.epochs}, LR: {config.lr}",
        f"SSL Weight: {config.ssl_weight}, Consistency Weight: {config.consistency_weight}",
        f"Transition Augment Prob: {config.train_augment_prob}",
        rule + "\n",
    ]
    print("\n".join(banner))

    run_ssl_experiment(config)


   🧪 UCI-HAR Self-Supervised Learning with Transition Detection
   핵심 아이디어:
   1. 전이 구간을 mask로 표시하지 않음
   2. Multipath CNN이 자기주도학습으로 전이 구간 탐지
   3. Main Task: 행동 분류 / Pretext Task: 전이 탐지
   4. Temporal Consistency Loss로 암시적 강건성 학습
Device: cuda
Epochs: 30, LR: 0.0003
SSL Weight: 0.3, Consistency Weight: 0.2
Transition Augment Prob: 0.4


    🔬 TRANSITIONAL TEST SETS 생성 (Ground Truth 포함)
   - STANDING↔SITTING (p=0.60, mix=0.50): 613개 샘플 변형
   - STANDING↔SITTING (p=0.70, mix=0.55): 715개 샘플 변형
   - WALKING↔WALKING_UPSTAIRS (p=0.65, mix=0.52): 628개 샘플 변형
   - SITTING↔LAYING (p=0.75, mix=0.58): 770개 샘플 변형

   실험: SSL_Euclidean
Training SSL_Euclidean for 30 epochs...
[01/30] Train L:0.9212 (Cls:0.6860 SSL:0.5784 Cons:0.3082) A:0.8723 | Test A:0.8860 F1:0.8861 TransConf:0.3167
[05/30] Train L:0.4172 (Cls:0.3394 SSL:0.1730 Cons:0.1296) A:0.9539 | Test A:0.9209 F1:0.9218 TransConf:0.1326
[10/30] Train L:0.3694 (Cls:0.3273 SSL:0.0742 Cons:0.0995) A:0.9574 | Test A:0.9260 F1:0.9266 TransConf:0.04