# UCI-HAR

In [1]:
import os, copy, random
import numpy as np
from pathlib import Path

import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader

from sklearn.metrics import f1_score

# ──────────────────────────────────────────────────────────────────────────────
# Seed / Device
# ──────────────────────────────────────────────────────────────────────────────
def set_seed(seed=42):
    os.environ["PYTHONHASHSEED"] = str(seed)
    random.seed(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed_all(seed)
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = False

DEVICE  = torch.device("cuda" if torch.cuda.is_available() else "cpu")
USE_GPU = DEVICE.type == "cuda"
print(f"Device: {DEVICE} | pin_memory: {USE_GPU}")


# ──────────────────────────────────────────────────────────────────────────────
# Dataset
# ──────────────────────────────────────────────────────────────────────────────
CLASS_NAMES = ["WALK", "UP", "DOWN", "SIT", "STAND", "LAY"]

class UCIHARDataset(Dataset):
    def __init__(self, data_dir, split="train"):
        self.data_dir = Path(data_dir)
        self.split    = split
        self.X, self.y = self._load_data()
        self.X = torch.FloatTensor(self.X)
        self.y = torch.LongTensor(self.y) - 1

    def _load_data(self):
        split_dir    = self.data_dir / self.split
        signal_types = [
            "body_acc_x","body_acc_y","body_acc_z",
            "body_gyro_x","body_gyro_y","body_gyro_z",
            "total_acc_x","total_acc_y","total_acc_z",
        ]
        signals = []
        for st in signal_types:
            fname = split_dir / "Inertial Signals" / f"{st}_{self.split}.txt"
            signals.append(np.loadtxt(fname))
        X = np.stack(signals, axis=1)
        y = np.loadtxt(split_dir / f"y_{self.split}.txt", dtype=int)
        return X, y

    def __len__(self):  return len(self.X)

    def __getitem__(self, idx): return self.X[idx], self.y[idx]


# ──────────────────────────────────────────────────────────────────────────────
# Compute-Aware Degree Gate
# ──────────────────────────────────────────────────────────────────────────────
class ComputeAwareDegreeGate(nn.Module):
    def __init__(self,
                 channels,
                 max_degree=3,
                 gate_hidden_dim=16,
                 temperature_initial=5.0,
                 temperature_min=0.5
        ):
        super().__init__()
        self.max_degree = max_degree

        self.gate = nn.Sequential(
            nn.AdaptiveAvgPool1d(1),
            nn.Flatten(1),
            nn.Linear(channels, gate_hidden_dim),
            nn.GELU(),
            nn.Linear(gate_hidden_dim, max_degree),
        )

        nn.init.zeros_(self.gate[-1].bias)
        if max_degree >= 3:
            self.gate[-1].bias.data[1] = 0.4
        self.register_buffer("temperature", torch.tensor(float(temperature_initial)))
        self.temperature_min = float(temperature_min)

    def set_temperature(self, t):
        self.temperature.fill_(max(float(t), self.temperature_min))

    def forward(self, x):
        logits = self.gate(x)
        soft_probs = F.softmax(logits / self.temperature, dim=-1)

        if self.training:
            hard_idx = logits.argmax(dim=-1)
            hard_oh = F.one_hot(hard_idx, num_classes=self.max_degree).float()
            degree_w = hard_oh - soft_probs.detach() + soft_probs
        else:
            degree_w = F.one_hot(logits.argmax(dim=-1), num_classes=self.max_degree).float()

        return degree_w, logits, soft_probs


# ──────────────────────────────────────────────────────────────────────────────
# PADRe Block
# ──────────────────────────────────────────────────────────────────────────────
class PADReBlock(nn.Module):
    def __init__(self,
                 channels,
                 seq_len,
                 max_degree=3,
                 token_kernel=11,
                 gate_hidden_dim=16,
                 temperature_initial=5.0,
                 temperature_min=0.5,
        ):
        super().__init__()
        self.max_degree = max_degree

        self.degree_gate = ComputeAwareDegreeGate(
            channels,
            max_degree=max_degree,
            gate_hidden_dim=gate_hidden_dim,
            temperature_initial=temperature_initial,
            temperature_min=temperature_min
        )

        self.channel_mixing = nn.ModuleList([
            nn.Conv1d(channels, channels, kernel_size=1) for _ in range(max_degree)
        ])

        self.token_mixing = nn.ModuleList([
            nn.Conv1d(channels, channels, kernel_size=token_kernel,
                      padding=token_kernel // 2, groups=channels)
            for _ in range(max_degree)
        ])

        self.pre_hadamard_channel = nn.ModuleList([
            nn.Conv1d(channels, channels, kernel_size=1) for _ in range(max_degree-1)
        ])

        self.pre_hadamard_token = nn.ModuleList([
            nn.Conv1d(channels, channels, kernel_size=token_kernel,
                      padding=token_kernel // 2, groups=channels)
            for _ in range(max_degree-1)
        ])

        self.norm = nn.LayerNorm(channels)

    def _build_Z(self, x, max_deg):
        Y = [self.token_mixing[i](self.channel_mixing[i](x)) for i in range(max_deg)]
        Z = [Y[0]]
        for i in range(1, max_deg):
            Z_ = self.pre_hadamard_token[i-1](self.pre_hadamard_channel[i-1](Z[-1]))
            Z.append(Z_ * Y[i])
        return Z

    def _hard_forward(self, x, sel):
        B = x.shape[0]
        max_deg = max(1, min(int(sel.max().item()) + 1, self.max_degree))
        Z = self._build_Z(x, max_deg)
        Z_stack = torch.stack(Z, dim=0)
        return Z_stack[sel, torch.arange(B, device=x.device)]

    def forward(self, x, return_gate_info=False):
        dw, logits, sp = self.degree_gate(x)
        sel = dw.argmax(dim=-1)
        out = self._hard_forward(x, sel)

        out = self.norm(out.permute(0, 2, 1)).permute(0, 2, 1)

        if return_gate_info:
            return out, {
                "degree_selection": dw,
                "soft_probs": sp,
                "logits": logits,
                "compute_cost": (sel + 1).float().mean().item()
            }
        return out


# ──────────────────────────────────────────────────────────────────────────────
# Adaptive PADRe Model
# ──────────────────────────────────────────────────────────────────────────────
class PADReHAR(nn.Module):
    def __init__(self,
                 in_channels=9,
                 seq_len=128,
                 num_classes=6,
                 hidden_dim=48,
                 num_layers=3,
                 max_degree=3,
                 gate_hidden_dim=16,
                 dropout=0.2,
                 temperature_initial=5.0,
                 temperature_min=0.5,
        ):
        super().__init__()
        self.num_layers = num_layers
        self.max_degree = max_degree

        self.input_proj = nn.Conv1d(in_channels, hidden_dim, kernel_size=1)

        self.padre_blocks = nn.ModuleList([
            PADReBlock(hidden_dim, seq_len, max_degree=max_degree, token_kernel=11,
                       gate_hidden_dim=gate_hidden_dim,
                       temperature_initial=temperature_initial,
                       temperature_min=temperature_min)
            for _ in range(num_layers)
        ])


        self.ffn = nn.ModuleList([
            nn.Sequential(
                nn.Conv1d(hidden_dim, hidden_dim * 2, kernel_size=1),
                nn.GELU(),
                nn.Dropout(dropout),
                nn.Conv1d(hidden_dim * 2, hidden_dim, kernel_size=1),
                nn.Dropout(dropout),
            )
            for _ in range(num_layers)
        ])

        self.norms1 = nn.ModuleList([nn.LayerNorm(hidden_dim) for _ in range(num_layers)])
        self.norms2 = nn.ModuleList([nn.LayerNorm(hidden_dim) for _ in range(num_layers)])

        self.global_pool = nn.AdaptiveAvgPool1d(1)
        self.classifier = nn.Sequential(
            nn.Linear(hidden_dim, hidden_dim), nn.LayerNorm(hidden_dim),
            nn.GELU(),
            nn.Dropout(dropout),
            nn.Linear(hidden_dim, num_classes),
        )

    def set_temperature(self, t):
        for b in self.padre_blocks:
            b.degree_gate.set_temperature(t)

    def _ln(self, norm, x):
        return norm(x.permute(0, 2, 1)).permute(0, 2, 1)

    def forward(self, x, return_gate_info=False):
        x = self.input_proj(x)
        gate_info_list = [] if return_gate_info else None
        total_compute  = 0.0

        for i, block in enumerate(self.padre_blocks):
            res = x

            if return_gate_info:
                x, gi = block(x, return_gate_info=True)
                gate_info_list.append(gi)
                total_compute += gi["compute_cost"]
            else:
                x = block(x)

            x = self._ln(self.norms1[i], x + res)

            res2 = x
            x = self.ffn[i](x)
            x = self._ln(self.norms2[i], x + res2)

        logits = self.classifier(self.global_pool(x).squeeze(-1))

        return (logits, gate_info_list, total_compute) if return_gate_info else logits


# ──────────────────────────────────────────────────────────────────────────────
# Utils
# ──────────────────────────────────────────────────────────────────────────────
def cosine_temperature(ep, total, tmax=5.0, tmin=0.5):
    r = ep / max(total - 1, 1)
    return tmin + (tmax - tmin) * 0.5 * (1.0 + np.cos(np.pi * r))

def count_parameters(model):
    return sum(p.numel() for p in model.parameters() if p.requires_grad)


# ──────────────────────────────────────────────────────────────────────────────
# Train & Eval
# ──────────────────────────────────────────────────────────────────────────────
def train_model(model,
                train_loader,
                test_loader,
                device,
                lr=1e-3,
                weight_decay=1e-4,
                epochs=30,
                seed=42
    ):
    set_seed(seed)
    optimizer = torch.optim.AdamW(model.parameters(), lr=lr, weight_decay=weight_decay)
    scheduler = torch.optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=epochs, eta_min=1e-5)
    criterion = nn.CrossEntropyLoss()

    best_f1 = -1.0
    best_state = None

    for ep in range(epochs):
        temp = cosine_temperature(ep, epochs, tmax=5.0, tmin=0.5)
        model.set_temperature(temp)

        model.train()
        train_loss_sum = 0.0
        train_n = 0
        for X, y in train_loader:
            X, y = X.to(device), y.to(device)
            optimizer.zero_grad()
            loss = criterion(model(X), y)
            loss.backward()
            torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0)
            optimizer.step()

            bs = y.size(0)
            train_loss_sum += loss.item() * bs
            train_n += bs

        scheduler.step()
        train_loss = train_loss_sum / max(train_n, 1)

        model.eval()
        preds_all, labels_all = [], []
        with torch.no_grad():
            for X, y in test_loader:
                X, y = X.to(device), y.to(device)
                preds_all.extend(model(X).argmax(1).cpu().numpy())
                labels_all.extend(y.cpu().numpy())
        test_f1 = f1_score(labels_all, preds_all, average="macro")

        if test_f1 > best_f1:
            best_f1 = test_f1
            best_state = copy.deepcopy(model.state_dict())

        if (ep + 1) % 5 == 0:
            cur_lr = optimizer.param_groups[0]["lr"]
            print(f"Epoch {ep+1:02d}/{epochs} | LR={cur_lr:.4f} | Train Loss={train_loss:.4f} | TestF1={test_f1:.4f} | BestF1={best_f1:.4f} | Temp={temp:.3f}")

    model.load_state_dict(best_state)
    print(f"\nBest Test Macro-F1: {best_f1:.4f}")
    return model


# ──────────────────────────────────────────────────────────────────────────────
# Main
# ──────────────────────────────────────────────────────────────────────────────
if __name__ == "__main__":
    DATA_PATH = "/content/drive/MyDrive/Colab Notebooks/HAR/har_orig_datasets/UCI_HAR"
    SEED = 42
    NUM_WORKERS = 2 if USE_GPU else 0
    PIN_MEMORY = USE_GPU

    BATCH_SIZE = 64
    EPOCHS = 100
    NUM_CLASSES = 6
    HIDDEN_DIM = 48
    NUM_LAYERS = 3
    MAX_DEGREE = 3
    GATE_HIDDEN_DIM = 16
    DROPOUT = 0.25
    LR = 1e-3
    WD = 5e-4

    train_dataset = UCIHARDataset(DATA_PATH, split="train")
    test_dataset = UCIHARDataset(DATA_PATH, split="test")

    train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True,
                              num_workers=NUM_WORKERS, pin_memory=PIN_MEMORY)
    test_loader = DataLoader(test_dataset, batch_size=BATCH_SIZE, shuffle=False,
                             num_workers=NUM_WORKERS, pin_memory=PIN_MEMORY)

    set_seed(SEED)
    model = PADReHAR(in_channels=9,
                     seq_len=128,
                     num_classes=NUM_CLASSES,
                     hidden_dim=HIDDEN_DIM,
                     num_layers=NUM_LAYERS,
                     max_degree=MAX_DEGREE,
                     gate_hidden_dim=GATE_HIDDEN_DIM,
                     dropout=DROPOUT,
                     temperature_initial=5.0,
                     temperature_min=0.5
                     ).to(DEVICE)
    print(f"\nModel params: {count_parameters(model):,}\n")

    model = train_model(model, train_loader, test_loader, DEVICE,
                        lr=LR, weight_decay=WD, epochs=EPOCHS, seed=SEED)

Device: cuda | pin_memory: True

Model params: 78,591

Epoch 05/100 | LR=0.0010 | Train Loss=0.1297 | TestF1=0.8998 | BestF1=0.9133 | Temp=4.982
Epoch 10/100 | LR=0.0010 | Train Loss=0.1057 | TestF1=0.9009 | BestF1=0.9224 | Temp=4.909
Epoch 15/100 | LR=0.0009 | Train Loss=0.0601 | TestF1=0.9328 | BestF1=0.9425 | Temp=4.782
Epoch 20/100 | LR=0.0009 | Train Loss=0.0293 | TestF1=0.9463 | BestF1=0.9463 | Temp=4.603
Epoch 25/100 | LR=0.0009 | Train Loss=0.0208 | TestF1=0.9401 | BestF1=0.9463 | Temp=4.378
Epoch 30/100 | LR=0.0008 | Train Loss=0.0131 | TestF1=0.9336 | BestF1=0.9463 | Temp=4.113
Epoch 35/100 | LR=0.0007 | Train Loss=0.0073 | TestF1=0.9470 | BestF1=0.9474 | Temp=3.813
Epoch 40/100 | LR=0.0007 | Train Loss=0.0011 | TestF1=0.9518 | BestF1=0.9617 | Temp=3.486
Epoch 45/100 | LR=0.0006 | Train Loss=0.0019 | TestF1=0.9493 | BestF1=0.9617 | Temp=3.141
Epoch 50/100 | LR=0.0005 | Train Loss=0.0010 | TestF1=0.9486 | BestF1=0.9617 | Temp=2.786
Epoch 55/100 | LR=0.0004 | Train Loss=0.0001 