In [1]:
import os
import time
import copy
import torch
import random
import numpy as np
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader
from sklearn.metrics import accuracy_score, f1_score

# ------------------------------------------------------------------------------
# 0. Utils & Setup
# ------------------------------------------------------------------------------
def set_seed(seed):
    random.seed(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed(seed)
    torch.cuda.manual_seed_all(seed)
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = False
    os.environ['PYTHONHASHSEED'] = str(seed)

def seed_worker(worker_id):
    worker_seed = torch.initial_seed() % 2**32
    np.random.seed(worker_seed)
    random.seed(worker_seed)

# ------------------------------------------------------------------------------
# 1. Dataset (UCI-HAR) - 기존과 동일
# ------------------------------------------------------------------------------
class UCIHARDataset(Dataset):
    def __init__(self, data_path, split='train'):
        self.split = split
        if split == 'train':
            y = np.loadtxt(os.path.join(data_path, 'train', 'y_train.txt'))
            signal_path = os.path.join(data_path, 'train', 'Inertial Signals')
        else:
            y = np.loadtxt(os.path.join(data_path, 'test', 'y_test.txt'))
            signal_path = os.path.join(data_path, 'test', 'Inertial Signals')

        signals = []
        signal_files = [
            'body_acc_x', 'body_acc_y', 'body_acc_z',
            'body_gyro_x', 'body_gyro_y', 'body_gyro_z',
            'total_acc_x', 'total_acc_y', 'total_acc_z'
        ]

        for signal_file in signal_files:
            filename = os.path.join(signal_path, f'{signal_file}_{split}.txt')
            signal_data = np.loadtxt(filename)
            signals.append(signal_data)

        self.X = np.stack(signals, axis=-1).astype(np.float32)
        self.y = (y - 1).astype(np.int64)

    def __len__(self):
        return len(self.X)

    def __getitem__(self, idx):
        return torch.FloatTensor(self.X[idx]), torch.LongTensor([self.y[idx]])[0]

# ------------------------------------------------------------------------------
# 2. Baseline Model Components
# ------------------------------------------------------------------------------
# ASF-DCL과 공정한 비교를 위해 동일한 Encoder 구조 사용
class LatentEncoder(nn.Module):
    def __init__(self, input_channels=9, latent_dim=64):
        super().__init__()
        self.conv1 = nn.Conv1d(input_channels, 32, kernel_size=5, padding=2)
        self.bn1 = nn.BatchNorm1d(32)
        self.conv2 = nn.Conv1d(32, 64, kernel_size=5, padding=2)
        self.bn2 = nn.BatchNorm1d(64)
        self.conv3 = nn.Conv1d(64, latent_dim, kernel_size=3, padding=1)
        self.bn3 = nn.BatchNorm1d(latent_dim)

    def forward(self, x):
        x = x.transpose(1, 2)
        h = F.relu(self.bn1(self.conv1(x)))
        h = F.relu(self.bn2(self.conv2(h)))
        s = F.relu(self.bn3(self.conv3(h)))
        s = s.transpose(1, 2)
        return s

# Baseline Model: Encoder + Global Average Pooling + Classifier
class StandardCNN(nn.Module):
    def __init__(self, input_channels=9, latent_dim=64, num_classes=6, hidden_dim=64):
        super().__init__()
        self.latent_encoder = LatentEncoder(input_channels, latent_dim)

        # Flow 모듈 없이 바로 분류 (일반적인 CNN 구조)
        self.classifier = nn.Sequential(
            nn.Linear(latent_dim, hidden_dim),
            nn.ReLU(),
            nn.Dropout(0.3),
            nn.Linear(hidden_dim, num_classes)
        )

    def forward(self, x):
        # 1. Encode: [Batch, Time, Dim]
        s = self.latent_encoder(x)

        # 2. Global Average Pooling (Time 축 평균)
        s_pool = torch.mean(s, dim=1)

        # 3. Classify
        logits = self.classifier(s_pool)
        return logits

# ------------------------------------------------------------------------------
# 3. Train & Evaluate Functions (Baseline용)
# ------------------------------------------------------------------------------
def train_epoch(model, dataloader, optimizer, device):
    model.train()
    total_loss = 0
    all_preds = []
    all_labels = []

    for x, y in dataloader:
        x, y = x.to(device), y.to(device)

        optimizer.zero_grad()
        logits = model(x)
        loss = F.cross_entropy(logits, y, label_smoothing=0.05)
        loss.backward()
        optimizer.step()

        total_loss += loss.item()
        preds = torch.argmax(logits, dim=1)
        all_preds.extend(preds.detach().cpu().numpy())
        all_labels.extend(y.detach().cpu().numpy())

    avg_loss = total_loss / len(dataloader)
    f1 = f1_score(all_labels, all_preds, average='macro')
    return avg_loss, f1

def evaluate_with_noise(model, dataloader, device, sigma):
    """
    AWGN 노이즈를 주입하여 모델의 견고성을 평가하는 함수
    sigma: 노이즈 강도 (Standard Deviation)
    """
    model.eval()
    all_preds = []
    all_labels = []

    with torch.no_grad():
        for x, y in dataloader:
            x, y = x.to(device), y.to(device)

            # --- Noise Injection ---
            if sigma > 0:
                noise = torch.randn_like(x) * sigma
                x = x + noise
            # -----------------------

            logits = model(x)
            preds = torch.argmax(logits, dim=1)
            all_preds.extend(preds.detach().cpu().numpy())
            all_labels.extend(y.detach().cpu().numpy())

    acc = accuracy_score(all_labels, all_preds)
    f1 = f1_score(all_labels, all_preds, average='macro')
    return acc, f1

# ------------------------------------------------------------------------------
# 4. Main Execution
# ------------------------------------------------------------------------------
def main():
    # 설정 (기존과 동일하게 맞춤)
    SEED = 42
    set_seed(SEED)
    DATA_PATH = '/content/drive/MyDrive/Colab Notebooks/HAR_data/UCI_HAR' # 경로 확인 필요
    BATCH_SIZE = 64
    NUM_EPOCHS = 50
    LEARNING_RATE = 0.001
    DEVICE = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

    print(f"Running Standard 1D-CNN Baseline on {DEVICE}")

    # 데이터 로드
    train_dataset = UCIHARDataset(DATA_PATH, split='train')
    test_dataset = UCIHARDataset(DATA_PATH, split='test')

    g = torch.Generator()
    g.manual_seed(SEED)
    train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True,
                              worker_init_fn=seed_worker, generator=g)
    test_loader = DataLoader(test_dataset, batch_size=BATCH_SIZE, shuffle=False,
                             worker_init_fn=seed_worker, generator=g)

    # 모델 초기화
    model = StandardCNN(input_channels=9, latent_dim=64, num_classes=6).to(DEVICE)
    optimizer = torch.optim.Adam(model.parameters(), lr=LEARNING_RATE, weight_decay=1e-4)
    scheduler = torch.optim.lr_scheduler.CosineAnnealingLR(
        optimizer, T_max=NUM_EPOCHS
    )

    best_f1 = 0.0
    best_model_wts = copy.deepcopy(model.state_dict())

    # --- 학습 루프 ---
    print("\nStarting Training (Standard CNN)...")
    for epoch in range(NUM_EPOCHS):
        t_loss, t_f1 = train_epoch(model, train_loader, optimizer, DEVICE)

        # Validation (Noise=0.0)
        v_acc, v_f1 = evaluate_with_noise(model, test_loader, DEVICE, sigma=0.0)

        if v_f1 > best_f1:
            best_f1 = v_f1
            best_model_wts = copy.deepcopy(model.state_dict())

        if (epoch + 1) % 10 == 0:
            print(f"Epoch [{epoch+1}/{NUM_EPOCHS}] Train F1: {t_f1:.4f} | Test F1: {v_f1:.4f} (Best: {best_f1:.4f})")

    # --- 실험: AWGN Robustness ---
    print("\n" + "="*60)
    print(f" EXPERIMENT: Baseline Noise Robustness (Best F1: {best_f1:.4f})")
    print("="*60)

    # 최고 성능 모델 로드
    model.load_state_dict(best_model_wts)

    sigma_levels = [0.0, 0.1, 0.2, 0.3, 0.4, 0.5]

    print(f"{'Sigma':<10} | {'Accuracy':<10} | {'F1-Score':<10}")
    print("-" * 36)

    for sigma in sigma_levels:
        acc, f1 = evaluate_with_noise(model, test_loader, DEVICE, sigma=sigma)
        print(f"{sigma:<10.1f} | {acc:<10.4f} | {f1:<10.4f}")

    print("-" * 36)
    print("Baseline Experiment Completed.")

if __name__ == "__main__":
    main()

Running Standard 1D-CNN Baseline on cuda

Starting Training (Standard CNN)...
Epoch [10/50] Train F1: 0.9563 | Test F1: 0.9291 (Best: 0.9393)
Epoch [20/50] Train F1: 0.9620 | Test F1: 0.9267 (Best: 0.9488)
Epoch [30/50] Train F1: 0.9633 | Test F1: 0.9491 (Best: 0.9544)
Epoch [40/50] Train F1: 0.9655 | Test F1: 0.9367 (Best: 0.9544)
Epoch [50/50] Train F1: 0.9681 | Test F1: 0.9335 (Best: 0.9544)

 EXPERIMENT: Baseline Noise Robustness (Best F1: 0.9544)
Sigma      | Accuracy   | F1-Score  
------------------------------------
0.0        | 0.9532     | 0.9544    
0.1        | 0.9528     | 0.9535    
0.2        | 0.7727     | 0.7483    
0.3        | 0.6861     | 0.6445    
0.4        | 0.6447     | 0.5979    
0.5        | 0.6006     | 0.5537    
------------------------------------
Baseline Experiment Completed.
