# 스쿼트 파운데이션 모델 벤치마크 노트북

이 노트북은 라벨링된 스쿼트 데이터셋으로 CNN, CNN-LSTM, CNN-GRU, ViT 기반 모델을 학습·평가하고 성능을 비교하기 위한 베이스라인 워크플로를 제공합니다. 아래 순서를 따라가며 데이터를 불러오고, 증강을 적용한 뒤, 각 모델을 학습·검증·테스트할 수 있습니다.

## 1. 환경 설정 및 공통 유틸리티

In [None]:
import sys
import json
from dataclasses import dataclass
from pathlib import Path
from typing import Callable, Dict, Iterable, List, Optional, Tuple

import numpy as np
import pandas as pd
import torch
from torch import nn
from torch.utils.data import DataLoader, Dataset, Subset
from tqdm.auto import tqdm

# Add the project root (the parent of the working directory) to sys.path,
# unless it is already listed, so project packages can be imported.
PROJECT_ROOT = Path("..").resolve()
_root_entry = str(PROJECT_ROOT)
if _root_entry not in sys.path:
    sys.path.append(_root_entry)

from src.augmentations import (
    add_gaussian_noise,
    compose_transforms,
    random_time_shift,
    random_time_stretch,
    random_time_warp,
    random_scaling,
)
from src.data_loading import (
    DatasetLayout,
    SquatWindowDataset,
    iter_class_counts,
    make_dataloader,
    train_val_test_split,
)
from src.models import TemporalCNNGRU

# Seed NumPy and PyTorch with the same fixed value.
np.random.seed(41)
torch.manual_seed(41)

# Use CUDA when PyTorch reports it as available, otherwise fall back to the CPU.
if torch.cuda.is_available():
    DEVICE = torch.device("cuda")
else:
    DEVICE = torch.device("cpu")
DEVICE

## 2. 데이터셋 구성 및 탐색

In [None]:
# Resolve the data directory and the results directory; create the latter if needed.
DATA_ROOT = PROJECT_ROOT.joinpath("data").resolve()
RESULTS_DIR = PROJECT_ROOT.joinpath("results", "foundation_models").resolve()
RESULTS_DIR.mkdir(exist_ok=True, parents=True)

# Project-defined description of the dataset directory rooted at DATA_ROOT.
layout = DatasetLayout(DATA_ROOT)
layout

In [None]:
# Define the augmentation pipeline and load the raw (untransformed) dataset.
# The transforms are composed in the order listed.
# NOTE(review): the meaning of each numeric argument (shift size, scale/stretch
# range, warp strength, noise level) is inferred from the helper names —
# confirm against src.augmentations.
train_transforms = compose_transforms(
    [
        random_time_shift(max_shift=5),
        random_scaling(0.9, 1.1),
        random_time_stretch(0.85, 1.2),
        random_time_warp(0.15),
        add_gaussian_noise(0.01),
    ]
)

# transforms=None: the dataset itself applies no augmentation; the pipeline
# above is attached to the training split later in the notebook.
raw_dataset = SquatWindowDataset(layout.manually_labeled, transforms=None)
len(raw_dataset)


In [None]:
# Tabulate the number of samples per class.
counts = [*iter_class_counts(raw_dataset)]
pd.DataFrame(
    [(label.name, n_samples) for label, n_samples in counts],
    columns=["class", "count"],
)


In [None]:
# Split the raw dataset into train / validation / test subsets using the
# ratios 0.7 / 0.15 / 0.15 and a fixed seed.
train_subset, val_subset, test_subset = train_val_test_split(raw_dataset, ratios=(0.7, 0.15, 0.15), seed=41)

class AugmentedSubset(Dataset):
    """Dataset view over a subset that optionally transforms each window.

    Items are read from ``subset`` as ``(window, label)`` pairs. When a
    ``transform`` is given it is applied to the window on every access; the
    label is always passed through unchanged.
    """

    def __init__(self, subset: Subset, transform: Optional[Callable[[torch.Tensor], torch.Tensor]] = None) -> None:
        self.transform = transform
        self.subset = subset

    def __len__(self) -> int:
        """Return the number of items in the wrapped subset."""
        return len(self.subset)

    def __getitem__(self, index: int):
        """Return ``(window, label)`` for ``index``, transforming the window if configured."""
        window, label = self.subset[index]
        if self.transform is None:
            return window, label
        return self.transform(window), label

# Only the training split gets the augmentation pipeline; validation and test
# windows are passed through unchanged.
train_dataset = AugmentedSubset(train_subset, transform=train_transforms)
val_dataset, test_dataset = AugmentedSubset(val_subset), AugmentedSubset(test_subset)

# Shuffling is requested for the training loader only.
train_loader = make_dataloader(train_dataset, batch_size=64, shuffle=True)
val_loader, test_loader = (
    make_dataloader(val_dataset, batch_size=64, shuffle=False),
    make_dataloader(test_dataset, batch_size=64, shuffle=False),
)

# Peek at the shape of the first element of one training batch.
next(iter(train_loader))[0].shape


## 3. 공통 학습 루틴 정의

In [None]:
@dataclass
class TrainingConfig:
    """Hyper-parameters shared by every model's training run."""

    # Number of training epochs run by `fit`.
    epochs: int = 30
    # Learning rate handed to the AdamW optimizer in `fit`.
    learning_rate: float = 3e-4
    # Weight-decay coefficient handed to the AdamW optimizer in `fit`.
    weight_decay: float = 1e-3
    # Maximum gradient norm used by `train_one_epoch`; a falsy value skips clipping.
    grad_clip: float = 5.0


def train_one_epoch(
    model: nn.Module,
    loader: DataLoader,
    criterion: nn.Module,
    optimizer: torch.optim.Optimizer,
    config: "TrainingConfig",
) -> float:
    """Run one optimisation pass over ``loader`` and return the average loss.

    Each batch is moved to ``DEVICE``, a forward/backward pass is performed,
    gradients are optionally norm-clipped, and the optimizer takes one step.

    Args:
        model: Network to train; switched to training mode.
        loader: Iterable yielding ``(batch, targets)`` pairs.
        criterion: Loss called as ``criterion(logits, targets)``.
        optimizer: Optimizer that updates ``model``'s parameters.
        config: Supplies ``grad_clip``, the maximum gradient norm.

    Returns:
        The per-batch loss values weighted by batch size and divided by the
        number of samples seen (0.0 when the loader yields nothing).
    """
    model.train()
    loss_sum = 0.0
    seen = 0

    progress = tqdm(loader, desc="train", leave=False)
    for inputs, labels in progress:
        inputs = inputs.to(DEVICE)
        labels = labels.to(DEVICE)
        n_items = inputs.size(0)

        optimizer.zero_grad()
        loss = criterion(model(inputs), labels)
        loss.backward()
        # A falsy grad_clip (e.g. 0 or None) disables clipping.
        if config.grad_clip:
            nn.utils.clip_grad_norm_(model.parameters(), config.grad_clip)
        optimizer.step()

        loss_sum += n_items * loss.item()
        seen += n_items

    # Guard against division by zero when no samples were seen.
    return loss_sum / (seen if seen > 0 else 1)


def evaluate(model: nn.Module, loader: DataLoader, criterion: nn.Module) -> Dict[str, float]:
    """Compute average loss and accuracy of ``model`` over ``loader``.

    The model is put in evaluation mode and the loop runs with gradient
    tracking disabled.

    Args:
        model: Network to evaluate.
        loader: Iterable yielding ``(batch, targets)`` pairs.
        criterion: Loss called as ``criterion(logits, targets)``.

    Returns:
        A dict with ``"loss"`` (batch losses weighted by batch size, divided
        by the sample count) and ``"accuracy"`` (fraction of samples whose
        argmax over dimension 1 equals the target). Both are 0 when the
        loader yields nothing.
    """
    model.eval()
    loss_sum, hits, seen = 0.0, 0, 0

    with torch.no_grad():
        for inputs, labels in tqdm(loader, desc="eval", leave=False):
            inputs, labels = inputs.to(DEVICE), labels.to(DEVICE)
            scores = model(inputs)
            n_items = inputs.size(0)
            predictions = scores.argmax(dim=1)

            loss_sum += criterion(scores, labels).item() * n_items
            hits += (predictions == labels).sum().item()
            seen += n_items

    # Guard against division by zero when no samples were seen.
    denominator = max(seen, 1)
    return {"loss": loss_sum / denominator, "accuracy": hits / denominator}


def fit(
    model: nn.Module,
    train_loader: DataLoader,
    val_loader: DataLoader,
    config: "TrainingConfig",
) -> Tuple[nn.Module, Dict[str, List[float]]]:
    """Train ``model`` and restore the weights with the best validation accuracy.

    Uses AdamW and cross-entropy loss. After every epoch the model is scored
    on ``val_loader``; whenever the accuracy strictly exceeds the best value
    so far (starting from 0.0), a CPU copy of the state dict is kept. That
    copy is loaded back into the model before returning. If the accuracy
    never exceeds 0.0, the model keeps its final-epoch weights.

    Args:
        model: Network to train; moved to ``DEVICE``.
        train_loader: Batches used for optimisation.
        val_loader: Batches used for per-epoch validation.
        config: Supplies ``epochs``, ``learning_rate``, ``weight_decay`` and
            (via ``train_one_epoch``) ``grad_clip``.

    Returns:
        ``(model, history)`` where ``history`` holds per-epoch lists under
        ``"train_loss"``, ``"val_loss"`` and ``"val_acc"``.
    """
    model = model.to(DEVICE)
    criterion = nn.CrossEntropyLoss()
    optimizer = torch.optim.AdamW(
        model.parameters(),
        lr=config.learning_rate,
        weight_decay=config.weight_decay,
    )

    history = {"train_loss": [], "val_loss": [], "val_acc": []}
    best_state, best_score = None, 0.0

    for epoch in range(1, config.epochs + 1):
        train_loss = train_one_epoch(model, train_loader, criterion, optimizer, config)
        metrics = evaluate(model, val_loader, criterion)

        epoch_record = (
            ("train_loss", train_loss),
            ("val_loss", metrics["loss"]),
            ("val_acc", metrics["accuracy"]),
        )
        for key, value in epoch_record:
            history[key].append(value)

        improved = metrics["accuracy"] > best_score
        if improved:
            best_score = metrics["accuracy"]
            # Detached CPU clones so later optimizer steps cannot alter the snapshot.
            best_state = {
                name: tensor.detach().cpu().clone()
                for name, tensor in model.state_dict().items()
            }

        print(
            f"Epoch {epoch:03d} | train_loss={train_loss:.4f} | val_loss={metrics['loss']:.4f} | val_acc={metrics['accuracy']:.3%}"
        )

    if best_state is None:
        return model, history

    model.load_state_dict(best_state)
    return model, history


# Training configuration used for all models in this notebook; the values are
# spelled out explicitly and equal the TrainingConfig defaults.
config = TrainingConfig(epochs=30, learning_rate=3e-4, weight_decay=1e-3, grad_clip=5.0)
config


## 4. 모델 정의

기본 CNN(`CNNBaseline`), CNN-LSTM(`CNNLSTM`), IMU 전용 ViT(`ViTSensor`)를 정의합니다. CNN-GRU(`TemporalCNNGRU`)는 `src.models`에서 가져온 구현을 사용합니다.

In [None]:
class CNNBaseline(nn.Module):
    """Simple 1D CNN classifier.

    Two Conv1d/BatchNorm/ReLU stages, global average pooling over the last
    axis, then a two-layer MLP that outputs ``num_classes`` logits.
    """

    def __init__(self, in_channels: int, num_classes: int) -> None:
        super().__init__()
        feature_layers = [
            nn.Conv1d(in_channels, 64, kernel_size=7, padding=3),
            nn.BatchNorm1d(64),
            nn.ReLU(inplace=True),
            nn.Conv1d(64, 128, kernel_size=5, padding=2),
            nn.BatchNorm1d(128),
            nn.ReLU(inplace=True),
        ]
        # Collapse the last axis to length 1, then drop it.
        pooling_layers = [nn.AdaptiveAvgPool1d(1), nn.Flatten()]
        classifier_layers = [
            nn.Linear(128, 128),
            nn.ReLU(inplace=True),
            nn.Dropout(0.2),
            nn.Linear(128, num_classes),
        ]
        self.net = nn.Sequential(*feature_layers, *pooling_layers, *classifier_layers)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """Return class logits for a ``(batch, in_channels, length)`` input."""
        logits = self.net(x)
        return logits


class ViTSensor(nn.Module):
    """Simple Vision-Transformer-style classifier for time-series windows.

    The last axis of the input is cut into non-overlapping patches of
    ``patch_size`` steps. Each patch (all channels flattened together) is
    linearly projected to ``dim`` features, a learnable class token is
    prepended, learnable positional embeddings are added, and the
    class-token output of a Transformer encoder is fed to an MLP head.

    Args:
        in_channels: Number of input channels.
        num_classes: Number of output logits.
        seq_len: Window length the positional embedding is sized for.
        patch_size: Number of steps per patch; must divide ``seq_len``.
        dim: Token embedding width.
        depth: Number of Transformer encoder layers.
        heads: Number of attention heads per layer.
        mlp_ratio: Feed-forward width as a multiple of ``dim``.
        dropout: Dropout probability in the encoder and the head.

    Raises:
        ValueError: If ``patch_size`` or ``seq_len`` is not positive, or
            ``seq_len`` is not divisible by ``patch_size``.
    """

    def __init__(
        self,
        in_channels: int,
        num_classes: int,
        seq_len: int,
        patch_size: int = 16,
        dim: int = 128,
        depth: int = 4,
        heads: int = 4,
        mlp_ratio: float = 2.0,
        dropout: float = 0.1,
    ) -> None:
        super().__init__()
        # Explicit exceptions instead of `assert`, which is stripped under `python -O`.
        if patch_size <= 0:
            raise ValueError(f"Patch size must be positive, got {patch_size}.")
        if seq_len <= 0:
            raise ValueError(f"Sequence length must be positive, got {seq_len}.")
        if seq_len % patch_size != 0:
            raise ValueError("Sequence length must be divisible by patch size.")

        self.patch_size = patch_size
        self.num_patches = seq_len // patch_size
        self.patch_dim = in_channels * patch_size

        # Unfold with a (1, patch_size) kernel and equal stride extracts
        # non-overlapping patches along the last axis.
        self.to_patch_embedding = nn.Sequential(
            nn.Unfold(kernel_size=(1, patch_size), stride=(1, patch_size)),
        )
        self.linear_proj = nn.Linear(self.patch_dim, dim)
        self.cls_token = nn.Parameter(torch.randn(1, 1, dim))
        # One position per patch plus one for the class token.
        self.pos_embedding = nn.Parameter(torch.randn(1, self.num_patches + 1, dim))

        encoder_layer = nn.TransformerEncoderLayer(
            d_model=dim,
            nhead=heads,
            dim_feedforward=int(dim * mlp_ratio),
            dropout=dropout,
            batch_first=True,
        )
        self.transformer = nn.TransformerEncoder(encoder_layer, num_layers=depth)

        self.mlp_head = nn.Sequential(
            nn.LayerNorm(dim),
            nn.Linear(dim, dim),
            nn.GELU(),
            nn.Dropout(dropout),
            nn.Linear(dim, num_classes),
        )

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """Return class logits for a 3-D ``(batch, channels, time)`` input."""
        # Unpacking three values also rejects inputs that are not 3-D.
        batch_size, _, _ = x.shape
        x = x.unsqueeze(2)  # (b, c, 1, t)
        patches = self.to_patch_embedding(x)  # (b, c * patch_size, num_patches)
        patches = patches.transpose(1, 2)  # (b, num_patches, patch_dim)
        tokens = self.linear_proj(patches)

        # expand() broadcasts the class token as a view; cat() below makes the copy.
        cls_tokens = self.cls_token.expand(batch_size, -1, -1)
        tokens = torch.cat([cls_tokens, tokens], dim=1)
        # Slice the positional table to the number of tokens actually present.
        tokens = tokens + self.pos_embedding[:, : tokens.size(1), :]

        encoded = self.transformer(tokens)
        # Classify from the class-token position only.
        return self.mlp_head(encoded[:, 0])

class CNNLSTM(nn.Module):
    """Convolutional encoder followed by a bidirectional LSTM and an MLP head.

    The encoder keeps the sequence length (padded convolutions), the LSTM
    reads the encoded sequence in both directions, its outputs are averaged
    over the sequence axis, and the head maps that average to class logits.
    """

    def __init__(
        self,
        in_channels: int,
        num_classes: int,
        conv_channels: int = 64,
        hidden_size: int = 128,
        num_layers: int = 1,
        dropout: float = 0.1,
    ) -> None:
        super().__init__()
        conv_stack = [
            nn.Conv1d(in_channels, conv_channels, kernel_size=7, padding=3),
            nn.BatchNorm1d(conv_channels),
            nn.ReLU(inplace=True),
            nn.Conv1d(conv_channels, conv_channels, kernel_size=5, padding=2),
            nn.BatchNorm1d(conv_channels),
            nn.ReLU(inplace=True),
            nn.Dropout(dropout),
        ]
        self.encoder = nn.Sequential(*conv_stack)

        # Dropout inside the LSTM is only requested when layers are stacked.
        if num_layers > 1:
            recurrent_dropout = dropout
        else:
            recurrent_dropout = 0.0
        self.lstm = nn.LSTM(
            input_size=conv_channels,
            hidden_size=hidden_size,
            num_layers=num_layers,
            batch_first=True,
            bidirectional=True,
            dropout=recurrent_dropout,
        )

        # Forward and backward directions are concatenated, doubling the width.
        lstm_width = hidden_size * 2
        self.head = nn.Sequential(
            nn.LayerNorm(lstm_width),
            nn.Linear(lstm_width, hidden_size),
            nn.ReLU(inplace=True),
            nn.Dropout(dropout),
            nn.Linear(hidden_size, num_classes),
        )

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """Return class logits for a ``(batch, in_channels, length)`` input."""
        # Conv1d emits (batch, channels, length); the batch_first LSTM wants
        # (batch, length, channels).
        sequence = self.encoder(x).transpose(1, 2)
        lstm_out, _state = self.lstm(sequence)
        return self.head(lstm_out.mean(dim=1))


## 5. 학습 및 검증

모델별로 학습을 수행하고, 검증 성능을 비교합니다.

In [None]:
def infer_input_shape(loader: DataLoader) -> Tuple[int, int]:
    """Return the sizes of dimensions 1 and 2 of the first batch from ``loader``.

    The loader must yield two-element items; only the first element of the
    first item is inspected.
    """
    features, _labels = next(iter(loader))
    dim_one = features.shape[1]
    dim_two = features.shape[2]
    return dim_one, dim_two


# Read the channel count and window length from one training batch.
in_channels, seq_len = infer_input_shape(train_loader)
# NOTE(review): the class count is hard-coded — presumably it matches the
# number of classes in the labeled dataset; confirm against `counts`.
num_classes = 5

# Zero-argument factories: each model is constructed only when its turn comes
# in the loop below, in the order listed here.
model_factories: Dict[str, Callable[[], nn.Module]] = {
    "CNNBaseline": lambda: CNNBaseline(in_channels, num_classes),
    "CNNLSTM": lambda: CNNLSTM(in_channels, num_classes),
    "TemporalCNNGRU": lambda: TemporalCNNGRU(in_channels=in_channels, num_classes=num_classes),
    "ViTSensor": lambda: ViTSensor(in_channels=in_channels, num_classes=num_classes, seq_len=seq_len, patch_size=16),
}

# Per-model training curves, validation metrics, and fitted models, keyed by model name.
histories: Dict[str, Dict[str, List[float]]] = {}
val_metrics: Dict[str, Dict[str, float]] = {}
trained_models: Dict[str, nn.Module] = {}

for name, factory in model_factories.items():
    print(f"\n=== Training: {name} ===")
    model = factory()
    model, history = fit(model, train_loader, val_loader, config)
    # Re-score the model returned by fit (its best-validation weights, if any were kept).
    metrics = evaluate(model, val_loader, nn.CrossEntropyLoss())

    histories[name] = history
    val_metrics[name] = metrics
    trained_models[name] = model

# One row per model, one column per validation metric.
pd.DataFrame(val_metrics).T

## 6. 테스트 세트 평가

In [None]:
# Score every trained model on the test loader with a shared loss function.
criterion = nn.CrossEntropyLoss()
test_results = {}

for name, model in trained_models.items():
    metrics = evaluate(model, test_loader, criterion)
    print(f"{name} | test_loss={metrics['loss']:.4f} | test_acc={metrics['accuracy']:.3%}")
    test_results[name] = metrics

# One row per model, one column per metric.
test_df = pd.DataFrame(test_results).transpose()
test_df

## 7. 결과 저장 및 시각화

In [None]:
# Gather validation metrics, test metrics, and the training configuration
# into a single JSON-serialisable record.
summary = {
    "val_metrics": val_metrics,
    "test_metrics": test_results,
    "config": vars(config),
}

summary_path = RESULTS_DIR / "foundation_model_results.json"
# Serialise first, then write: if a value cannot be encoded, the error is
# raised before the file is opened, so an existing results file is not
# truncated or left half-written.
summary_text = json.dumps(summary, indent=2)
summary_path.write_text(summary_text, encoding="utf-8")

summary_path

필요에 따라 `histories`를 활용해 학습 곡선을 시각화하거나, 혼동 행렬과 같은 추가 분석을 이어가세요.