In [1]:
# Colab only: mount Google Drive at /content/drive. The dataset directory and
# the checkpoint path used later in this notebook both live under that mount.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [1]:
"""
Pure Switching Latent Dynamical Systems (SLDS) for UCI-HAR
Models activities as switching between multiple latent dynamics modes
WITHOUT Attractor-based State Flow components
"""

import os
import time
import torch
import random
import numpy as np
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader
from sklearn.metrics import accuracy_score, f1_score, confusion_matrix

def set_seed(seed=42):
    """
    Fix every random seed this script relies on, for reproducible runs.

    Seeds Python's ``random`` module, NumPy and PyTorch (CPU, the current
    CUDA device and all CUDA devices), writes ``PYTHONHASHSEED`` into the
    environment, and puts cuDNN into deterministic mode with the benchmark
    auto-tuner switched off.

    Args:
        seed: Integer seed applied to all of the generators (default 42).
    """
    # cuDNN: deterministic kernels, no benchmark mode. This may cost a little
    # speed, but it is required for repeatable results.
    torch.backends.cudnn.benchmark = False
    torch.backends.cudnn.deterministic = True

    os.environ['PYTHONHASHSEED'] = str(seed)

    # Same seed for every generator; manual_seed_all covers multi-GPU setups.
    seeders = (
        random.seed,
        np.random.seed,
        torch.manual_seed,
        torch.cuda.manual_seed,
        torch.cuda.manual_seed_all,
    )
    for apply_seed in seeders:
        apply_seed(seed)


def seed_worker(worker_id):
    """
    Seed NumPy and Python ``random`` from PyTorch's current initial seed.

    Passed as ``worker_init_fn`` to the DataLoaders in this file. The seed is
    ``torch.initial_seed()`` reduced modulo 2**32 so that it fits the range
    accepted by ``np.random.seed``.

    Args:
        worker_id: Worker index supplied by the DataLoader; not used here.
    """
    derived_seed = torch.initial_seed() % (1 << 32)
    random.seed(derived_seed)
    np.random.seed(derived_seed)

# ============================================================================
# UCI-HAR Dataset Loader
# ============================================================================

class UCIHARDataset(Dataset):
    """
    One split of the UCI-HAR inertial signals, loaded fully into memory.

    Reads the nine ``<signal>_<split>.txt`` files found under
    ``<data_path>/<split>/Inertial Signals`` and the label file
    ``<data_path>/<split>/y_<split>.txt``.

    Attributes:
        X: NumPy array with the nine signals stacked on the last axis.
        y: int64 NumPy array holding the file labels minus one.
    """

    def __init__(self, data_path, split='train'):
        """
        Args:
            data_path: Root directory containing the ``<split>`` folders.
            split: Name of the split folder / file suffix (default 'train').
        """
        channel_names = (
            'body_acc_x', 'body_acc_y', 'body_acc_z',
            'body_gyro_x', 'body_gyro_y', 'body_gyro_z',
            'total_acc_x', 'total_acc_y', 'total_acc_z',
        )
        signal_dir = os.path.join(data_path, split, 'Inertial Signals')

        # One array per channel; stacking on a new trailing axis puts the
        # channel index last.
        per_channel = [
            np.loadtxt(os.path.join(signal_dir, f'{name}_{split}.txt'))
            for name in channel_names
        ]
        self.X = np.stack(per_channel, axis=-1)

        # Shift the labels down by one so that they start at zero.
        raw_labels = np.loadtxt(os.path.join(data_path, split, f'y_{split}.txt'))
        self.y = raw_labels.astype(np.int64) - 1

        print(f'{split} set: {self.X.shape[0]} samples, {self.X.shape[1]} timesteps, {self.X.shape[2]} channels')

    def __len__(self):
        """Return the number of samples in this split."""
        return len(self.X)

    def __getitem__(self, idx):
        """Return ``(signals, label)``: a float32 tensor and a 0-dim int64 tensor."""
        signals = torch.tensor(self.X[idx], dtype=torch.float32)
        label = torch.tensor(self.y[idx], dtype=torch.long)
        return signals, label


# ============================================================================
# Switching Dynamics Module
# ============================================================================

class SwitchingDynamicsModule(nn.Module):
    """
    Soft mixture of M mode-specific residual updates over a latent sequence.

    For every timestep t, a mode predictor turns the latent state s_t into a
    distribution p(z_t = m | s_t) over ``num_modes`` modes, and the output is

        s'_t = sum_m p(z_t = m | s_t) * (s_t + F_m(s_t)),   z_t in {1, ..., M}

    where each F_m is a small tanh MLP. Every timestep is processed
    independently: output index t depends only on input index t, and no state
    is carried from one timestep to the next inside this module.
    """

    def __init__(self, latent_dim, num_modes=6, hidden_dim=128):
        """
        Args:
            latent_dim: Size D of each latent state vector.
            num_modes: Number M of dynamics modes.
            hidden_dim: Hidden width of the per-mode MLPs and the mode predictor.
        """
        super().__init__()
        self.latent_dim = latent_dim
        self.num_modes = num_modes

        # Mode-specific dynamics networks; the final Tanh bounds each update
        # to (-1, 1) per dimension.
        self.mode_dynamics = nn.ModuleList([
            nn.Sequential(
                nn.Linear(latent_dim, hidden_dim),
                nn.Tanh(),
                nn.Linear(hidden_dim, hidden_dim),
                nn.Tanh(),
                nn.Linear(hidden_dim, latent_dim),
                nn.Tanh()
            ) for _ in range(num_modes)
        ])

        # Mode inference network (predicts mode logits from the latent state)
        self.mode_predictor = nn.Sequential(
            nn.Linear(latent_dim, hidden_dim),
            nn.ReLU(),
            nn.Dropout(0.3),
            nn.Linear(hidden_dim, num_modes)
        )

    def forward(self, s_sequence):
        """
        Args:
            s_sequence: (B, T, D) sequence of latent states
        Returns:
            s_evolved: (B, T, D) evolved states
            mode_probs: (B, T, M) mode probabilities
            mode_assignments: (B, T) hard mode assignments (argmax of mode_probs)
        """
        batch_size, seq_len, _ = s_sequence.shape

        # Predict mode probabilities for each timestep
        mode_logits = self.mode_predictor(s_sequence)  # (B, T, M)
        mode_probs = F.softmax(mode_logits, dim=-1)
        mode_assignments = mode_probs.argmax(dim=-1)  # (B, T)

        # The flattened view is the same for every mode, so build it once
        # instead of once per loop iteration.
        s_flat = s_sequence.reshape(-1, self.latent_dim)  # (B*T, D)

        # Apply mode-specific dynamics using soft assignment
        s_evolved = torch.zeros_like(s_sequence)
        for m, dynamics in enumerate(self.mode_dynamics):
            delta_m = dynamics(s_flat).reshape(batch_size, seq_len, self.latent_dim)  # (B, T, D)

            # Weight the residual update by the probability of mode m
            mode_weight = mode_probs[:, :, m:m + 1]  # (B, T, 1)
            s_evolved = s_evolved + mode_weight * (s_sequence + delta_m)

        return s_evolved, mode_probs, mode_assignments

    def compute_mode_consistency_loss(self, mode_probs):
        """
        Encourage smooth mode transitions (adjacent timesteps similar modes).

        Args:
            mode_probs: (B, T, M) mode probabilities
        Returns:
            Scalar tensor: mean squared difference between the probabilities
            of consecutive timesteps. Zero when T < 2, because there is then
            no pair of timesteps to compare (the mean over an empty tensor
            would otherwise be NaN).
        """
        if mode_probs.size(1) < 2:
            return mode_probs.new_zeros(())

        # (B, T, M) -> (B, T-1, M)
        mode_diff = mode_probs[:, 1:, :] - mode_probs[:, :-1, :]
        return (mode_diff ** 2).mean()

    def compute_mode_diversity_loss(self, mode_assignments):
        """
        Encourage using all available modes (avoid mode collapse).

        Returns the negative entropy of the mode-usage distribution, so
        minimizing it maximizes the entropy.

        Args:
            mode_assignments: either
                * hard assignments, e.g. the (B, T) integer tensor returned by
                  ``forward``. Usage is the fraction of entries equal to each
                  mode index. Counting is not differentiable, so this form
                  yields a value without gradient.
                * soft probabilities, a floating-point (B, T, M) tensor with
                  M == num_modes. Usage is the mean probability per mode and
                  gradients flow back to the probabilities.
        Returns:
            Scalar tensor with the diversity loss.
        """
        is_soft = (
            mode_assignments.is_floating_point()
            and mode_assignments.dim() == 3
            and mode_assignments.size(-1) == self.num_modes
        )
        if is_soft:
            mode_freq = mode_assignments.reshape(-1, self.num_modes).mean(dim=0)
        else:
            # Compare every assignment against every mode index at once
            # instead of looping over the modes in Python.
            mode_ids = torch.arange(self.num_modes, device=mode_assignments.device)
            hits = mode_assignments.reshape(-1, 1) == mode_ids  # (N, M)
            mode_counts = hits.float().sum(dim=0)

            # Normalize to get frequency
            mode_freq = mode_counts / (mode_assignments.numel() + 1e-8)

        # The epsilon keeps log() finite for modes that are never used
        entropy = -(mode_freq * torch.log(mode_freq + 1e-8)).sum()
        return -entropy

    def compute_mode_occupancy_cap_loss(self, mode_probs, cap_ratio=0.20):
        """
        Penalize any mode whose average occupancy exceeds ``cap_ratio``.

        Args:
            mode_probs: (B, T, M) mode probabilities
            cap_ratio: maximum allowed average occupancy per mode
                (default 0.20 = 20%; with 6 modes a uniform split is ~16.7%,
                so the default leaves some slack)
        Returns:
            Scalar tensor: sum over modes of max(0, average usage - cap_ratio).
        """
        # 1. Average usage of each mode over the whole batch: (B, T, M) -> (M,)
        #    reshape (not view) so non-contiguous inputs are accepted as well.
        avg_usage = mode_probs.reshape(-1, self.num_modes).mean(dim=0)

        # 2. Upper limit (cap)
        limit = torch.tensor(cap_ratio, device=mode_probs.device)

        # 3. Excess over the cap: zero below the limit, the difference above it
        excess_usage = F.relu(avg_usage - limit)

        # 4. Sum (rather than average) the penalties: occupancies are small
        #    fractions, and summing keeps the loss from shrinking further.
        return excess_usage.sum()


# ============================================================================
# Main Model
# ============================================================================

class SLDS_HAR(nn.Module):
    """
    Pure Switching Latent Dynamical Systems classifier for HAR.

    Pipeline: 1-D convolutional encoder over time -> switching dynamics
    module -> average pooling over time -> MLP classifier. An extra linear
    layer maps time-averaged mode probabilities to class logits for the
    optional mode-class alignment loss.
    """

    def __init__(self, input_dim=9, num_classes=6, num_modes=6, latent_dim=32, hidden_dim=64):
        """
        Args:
            input_dim: Number of input channels C.
            num_classes: Number of activity classes.
            num_modes: Number of dynamics modes M.
            latent_dim: Latent state size D produced by the encoder.
            hidden_dim: Hidden width handed to the switching dynamics module.
        """
        super().__init__()
        self.num_modes = num_modes
        self.num_classes = num_classes

        # Temporal encoder: three Conv1d -> BatchNorm1d -> ReLU stages.
        # kernel_size=5 with padding=2 keeps the sequence length unchanged.
        encoder_layers = []
        widths = (input_dim, 64, 128, latent_dim)
        for c_in, c_out in zip(widths[:-1], widths[1:]):
            encoder_layers += [
                nn.Conv1d(c_in, c_out, kernel_size=5, padding=2),
                nn.BatchNorm1d(c_out),
                nn.ReLU(),
            ]
        self.encoder = nn.Sequential(*encoder_layers)

        # Switching dynamics
        self.switching_dynamics = SwitchingDynamicsModule(latent_dim, num_modes, hidden_dim)

        # Temporal pooling: collapse the time axis to length 1
        self.temporal_pool = nn.AdaptiveAvgPool1d(1)

        # Classifier head
        self.classifier = nn.Sequential(
            nn.Linear(latent_dim, 64),
            nn.ReLU(),
            nn.Dropout(0.5),
            nn.Linear(64, 32),
            nn.ReLU(),
            nn.Dropout(0.3),
            nn.Linear(32, num_classes),
        )

        # Mode-to-class auxiliary classifier
        self.mode_class_predictor = nn.Linear(num_modes, num_classes)

    def forward(self, x, return_switching_info=False):
        """
        Args:
            x: (B, T, C) input time series
            return_switching_info: whether to also return the auxiliary
                switching losses and the mode-to-class logits
        Returns:
            ``(logits, mode_assignments)`` by default, or
            ``(logits, switching_info, mode_assignments)`` when
            ``return_switching_info`` is true. ``switching_info`` has the keys
            'consistency', 'diversity', 'occupancy_cap' and 'mode_class_logits'.
        """
        # Conv1d wants channels first: (B, T, C) -> (B, C, T) -> (B, D, T),
        # then back to (B, T, D) for the switching module.
        latent_seq = self.encoder(x.transpose(1, 2)).transpose(1, 2)

        latent_evolved, mode_probs, mode_assignments = self.switching_dynamics(latent_seq)

        # Global average pooling over time: (B, T, D) -> (B, D, T) -> (B, D)
        latent_pooled = self.temporal_pool(latent_evolved.transpose(1, 2)).squeeze(-1)

        logits = self.classifier(latent_pooled)

        if not return_switching_info:
            return logits, mode_assignments

        dynamics = self.switching_dynamics
        switching_info = {
            'consistency': dynamics.compute_mode_consistency_loss(mode_probs),
            # Computed from the hard assignments, so it carries no gradient.
            'diversity': dynamics.compute_mode_diversity_loss(mode_assignments),
            'occupancy_cap': dynamics.compute_mode_occupancy_cap_loss(mode_probs, cap_ratio=0.20),
            # Mode-class alignment: time-averaged mode usage (B, M) -> class logits
            'mode_class_logits': self.mode_class_predictor(mode_probs.mean(dim=1)),
        }
        return logits, switching_info, mode_assignments


# ============================================================================
# Training & Evaluation
# ============================================================================

def train_epoch(model, train_loader, optimizer, device, lambda_cons, lambda_div, lambda_cap, lambda_align=0.0):
    """
    Train the model for one pass over ``train_loader``.

    The per-batch objective is
        CE + lambda_cons * consistency + lambda_div * diversity
           + lambda_align * alignment + lambda_cap * occupancy_cap

    Args:
        model: Model whose forward returns ``(logits, switching_info,
            mode_assignments)`` when called with ``return_switching_info=True``.
        train_loader: Iterable of ``(data, target)`` batches.
        optimizer: Optimizer stepped once per batch.
        device: Device the batches are moved to.
        lambda_cons: Weight of the mode consistency loss.
        lambda_div: Weight of the mode diversity loss.
        lambda_cap: Weight of the occupancy cap loss. Note that it comes
            before ``lambda_align`` in the positional order.
        lambda_align: Weight of the mode-class alignment loss (default 0.0).
    Returns:
        ``(mean loss per batch, accuracy over all samples seen)``.
    """
    model.train()
    running_loss = 0
    epoch_preds, epoch_labels = [], []

    for data, target in train_loader:
        data = data.to(device)
        target = target.to(device)

        optimizer.zero_grad()

        # Predictions plus the auxiliary switching losses
        logits, switching_info, _ = model(data, return_switching_info=True)

        ce_loss = F.cross_entropy(logits, target)
        # Alignment: classify the sample from its mode usage alone
        alignment_loss = F.cross_entropy(switching_info['mode_class_logits'], target)

        loss = (
            ce_loss
            + lambda_cons * switching_info['consistency']
            + lambda_div * switching_info['diversity']
            + lambda_align * alignment_loss
            + lambda_cap * switching_info['occupancy_cap']
        )

        loss.backward()
        optimizer.step()

        running_loss += loss.item()
        epoch_preds.extend(logits.argmax(dim=1).cpu().numpy())
        epoch_labels.extend(target.cpu().numpy())

    epoch_acc = accuracy_score(epoch_labels, epoch_preds)
    return running_loss / len(train_loader), epoch_acc

def evaluate(model, test_loader, device):
    """
    Evaluate the model on ``test_loader`` without gradient tracking.

    Args:
        model: Model whose forward returns ``(logits, mode_assignments)`` when
            called with ``return_switching_info=False``; must expose ``num_modes``.
        test_loader: Iterable of ``(data, target)`` batches.
        device: Device the batches are moved to.
    Returns:
        ``(accuracy, macro F1, confusion matrix, mode_usage)`` where
        ``mode_usage[m]`` is the fraction of all (sample, timestep) positions
        assigned to mode ``m``.
    """
    model.eval()
    predictions, labels, mode_chunks = [], [], []

    with torch.no_grad():
        for data, target in test_loader:
            data = data.to(device)
            target = target.to(device)
            logits, batch_modes = model(data, return_switching_info=False)

            predictions.extend(logits.argmax(dim=1).cpu().numpy())
            labels.extend(target.cpu().numpy())
            mode_chunks.append(batch_modes.cpu().numpy())

    acc = accuracy_score(labels, predictions)
    f1 = f1_score(labels, predictions, average='macro')
    cm = confusion_matrix(labels, predictions)

    # Share of positions assigned to each mode, over the whole loader
    all_modes = np.concatenate(mode_chunks, axis=0)  # (N, T)
    mode_usage = [
        (all_modes == m).sum() / all_modes.size
        for m in range(model.num_modes)
    ]

    return acc, f1, cm, mode_usage


# ============================================================================
# Main Execution
# ============================================================================

def main():
    """
    Train and evaluate the pure SLDS model on UCI-HAR.

    Seeds all RNGs, loads the train/test splits, trains for ``num_epochs``
    epochs with Adam and cosine learning-rate annealing, evaluates on the test
    split after every epoch, saves the weights whenever the test macro-F1
    improves, and prints a progress line every 10 epochs.
    """
    set_seed(42)
    # Dedicated generator so DataLoader shuffling and worker seeding are reproducible
    g = torch.Generator()
    g.manual_seed(42)

    # Configuration
    data_path = '/content/drive/MyDrive/Colab Notebooks/UCI-HAR/data'
    batch_size = 64
    num_epochs = 100
    learning_rate = 0.001
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

    # Loss weights. All auxiliary losses are disabled in this configuration;
    # the values in parentheses are the settings that were previously tried.
    lambda_cons = 0     # Mode consistency (10)
    lambda_div = 0.0    # Mode diversity (1.0)
    lambda_align = 0.0  # Mode-class alignment (2.0)
    lambda_cap = 0.0    # Occupancy cap (5.0)

    print('=' * 80)
    print('Pure Switching Latent Dynamical Systems (SLDS)')
    print('=' * 80)

    # Load datasets
    print('\nLoading UCI-HAR dataset...')
    train_dataset = UCIHARDataset(data_path, split='train')
    test_dataset = UCIHARDataset(data_path, split='test')

    train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True,
                              num_workers=2, worker_init_fn=seed_worker, generator=g)
    test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False,
                             num_workers=2, worker_init_fn=seed_worker, generator=g)

    # Model
    print('\nInitializing model...')
    model = SLDS_HAR(
        input_dim=9,
        num_classes=6,
        num_modes=6,  # Use 6 modes (same as num_classes)
        latent_dim=32,
        hidden_dim=64
    ).to(device)

    print(f'Total parameters: {sum(p.numel() for p in model.parameters()):,}')

    # Optimizer
    optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate, weight_decay=1e-4)
    scheduler = torch.optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=num_epochs)

    # Training loop
    print('\nStarting training...')
    best_acc = 0
    best_f1 = 0

    for epoch in range(num_epochs):
        start_time = time.time()

        # Pass the loss weights by keyword: train_epoch's positional order is
        # (..., lambda_cons, lambda_div, lambda_cap, lambda_align), so passing
        # lambda_align before lambda_cap positionally would swap the two.
        train_loss, train_acc = train_epoch(
            model, train_loader, optimizer, device,
            lambda_cons=lambda_cons,
            lambda_div=lambda_div,
            lambda_cap=lambda_cap,
            lambda_align=lambda_align,
        )
        test_acc, test_f1, cm, mode_usage = evaluate(model, test_loader, device)

        scheduler.step()

        epoch_time = time.time() - start_time

        # NOTE(review): the checkpoint is selected on the test split, so the
        # reported "best" numbers are optimistic; a held-out validation split
        # would give an unbiased estimate.
        if test_f1 > best_f1:
            best_f1 = test_f1
            best_acc = test_acc  # accuracy at the best-F1 epoch, kept for the final report
            torch.save(model.state_dict(), '/content/drive/MyDrive/Colab Notebooks/best_slds_pure.pth')

        if (epoch + 1) % 10 == 0:
            # One bar segment per 10% of usage
            mode_str = ' | '.join([
                f'M{i}:{mode_usage[i]:5.1%} {"▇" * int(mode_usage[i] * 10)}'
                for i in range(len(mode_usage))
            ])
            print(f'Epoch {epoch+1}/{num_epochs} ({epoch_time:.1f}s) | '
                  f'Train Loss: {train_loss:.4f}, Train Acc: {train_acc:.4f} | '
                  f'Test Acc: {test_acc:.4f}, Test F1: {test_f1:.4f} | '
                  f'Best F1: {best_f1:.4f}')
            print(f'  Mode Usage: {mode_str}')

    print('\n' + '=' * 80)
    print('Training completed!')
    print(f'Best Test F1-Score: {best_f1:.4f}')
    print(f'Best Test Accuracy: {best_acc:.4f}')
    print('=' * 80)


# Run the experiment only when this file is executed directly, not on import.
if __name__ == '__main__':
    main()

Pure Switching Latent Dynamical Systems (SLDS)

Loading UCI-HAR dataset...
train set: 7352 samples, 128 timesteps, 9 channels
test set: 2947 samples, 128 timesteps, 9 channels

Initializing model...
Total parameters: 122,038

Starting training...
Epoch 10/100 (2.9s) | Train Loss: 0.1300, Train Acc: 0.9460 | Test Acc: 0.9077, Test F1: 0.9080 | Best F1: 0.9334
  Mode Usage: M0:18.4% ▇ | M1: 0.0%  | M2:27.5% ▇▇ | M3:11.0% ▇ | M4: 0.0%  | M5:43.1% ▇▇▇▇
Epoch 20/100 (2.3s) | Train Loss: 0.1196, Train Acc: 0.9514 | Test Acc: 0.8935, Test F1: 0.8905 | Best F1: 0.9406
  Mode Usage: M0:18.2% ▇ | M1: 0.0%  | M2:29.5% ▇▇ | M3: 7.2%  | M4: 0.0%  | M5:45.1% ▇▇▇▇
Epoch 30/100 (2.1s) | Train Loss: 0.1128, Train Acc: 0.9529 | Test Acc: 0.9430, Test F1: 0.9438 | Best F1: 0.9438
  Mode Usage: M0:18.5% ▇ | M1: 7.3%  | M2:25.4% ▇▇ | M3:11.1% ▇ | M4: 0.0%  | M5:37.6% ▇▇▇
Epoch 40/100 (2.1s) | Train Loss: 0.1001, Train Acc: 0.9547 | Test Acc: 0.9413, Test F1: 0.9419 | Best F1: 0.9438
  Mode Usage: M0:18.6% 