In [11]:
import json
import numpy as np
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader

In [12]:
# --- Experiment configuration ---------------------------------------------
SEED = 42                 # one seed shared by the torch and numpy RNGs
DATA_DIR = "final_data"   # directory holding train.npz / val.npz / test.npz
MODEL_TYPE = "tcn"        # architecture selector: "lstm" or "tcn"

# Optimisation hyper-parameters.
EPOCHS = 20
BATCH_SIZE = 32
LR = 1e-3

# Use the GPU when torch can see one, otherwise fall back to the CPU.
if torch.cuda.is_available():
    DEVICE = "cuda"
else:
    DEVICE = "cpu"

# Seed both libraries so repeated runs start from the same RNG state.
torch.manual_seed(SEED)
np.random.seed(SEED)

In [13]:
class NPZSequenceDataset(Dataset):
    """Map-style dataset over sequences stored in a NumPy ``.npz`` archive.

    The archive must provide two arrays:

    * ``"X"`` - features, converted to float32 (used as ``(N, T, F)`` by the
      rest of this notebook).
    * ``"y"`` - labels, converted to int64, one entry per sequence.

    Both arrays are read fully into memory when the dataset is constructed.

    Args:
        npz_path: Path of the ``.npz`` file to read.

    Raises:
        ValueError: If ``X`` and ``y`` do not have the same number of entries.
    """

    def __init__(self, npz_path):
        # NOTE(security): allow_pickle=True lets numpy unpickle object arrays,
        # which can run arbitrary code. It is kept so archives that were saved
        # with object arrays still load; only open files you trust.
        # The context manager closes the underlying zip file once the arrays
        # have been copied out (astype returns new arrays).
        with np.load(npz_path, allow_pickle=True) as d:
            self.X = d["X"].astype(np.float32)  # (N,T,F)
            self.y = d["y"].astype(np.int64)

        # Fail early instead of hitting an IndexError deep inside a DataLoader
        # worker (or silently ignoring trailing samples).
        if self.X.shape[:1] != self.y.shape[:1]:
            raise ValueError(
                f"X and y disagree on the number of samples: "
                f"{self.X.shape[:1]} vs {self.y.shape[:1]} in {npz_path!r}"
            )

    def __len__(self):
        """Number of sequences, taken from the label array."""
        return len(self.y)

    def __getitem__(self, idx):
        """Return ``(features, label)`` for sample ``idx`` as torch tensors."""
        return torch.from_numpy(self.X[idx]), torch.tensor(self.y[idx])

# LSTM Model

In [14]:

class LSTMClassifier(nn.Module):
    """Bidirectional LSTM sequence classifier.

    Args:
        input_dim: Number of features per timestep.
        hidden: Hidden size of each LSTM direction.
        num_layers: Number of stacked LSTM layers.
        num_classes: Number of output logits.
        dropout: Dropout probability for the head, and between LSTM layers
            when ``num_layers > 1``.

    Input is ``(B, T, F)`` (``batch_first=True``); output is
    ``(B, num_classes)`` raw logits.
    """

    def __init__(self, input_dim, hidden=128, num_layers=2, num_classes=2, dropout=0.2):
        super().__init__()
        self.lstm = nn.LSTM(
            input_size=input_dim,
            hidden_size=hidden,
            num_layers=num_layers,
            batch_first=True,
            # nn.LSTM only applies dropout between layers, so it is disabled
            # for a single-layer network.
            dropout=dropout if num_layers > 1 else 0.0,
            bidirectional=True
        )
        self.head = nn.Sequential(
            nn.LayerNorm(hidden * 2),
            nn.Dropout(dropout),
            nn.Linear(hidden * 2, num_classes)
        )

    def forward(self, x):  # x: (B,T,F)
        out, _ = self.lstm(x)             # (B, T, 2*hidden): [forward | backward]
        h = self.lstm.hidden_size
        # The forward direction has consumed the whole sequence at t = T-1,
        # but the backward direction has consumed it at t = 0. Taking
        # out[:, -1, :] for both would give a backward state that has only
        # seen the final frame.
        fwd_final = out[:, -1, :h]
        bwd_final = out[:, 0, h:]
        summary = torch.cat([fwd_final, bwd_final], dim=1)   # (B, 2*hidden)
        return self.head(summary)


# TCN Model

In [15]:
class TCNBlock(nn.Module):
    """Residual block of two dilated 1-D convolutions.

    Each convolution is followed by ReLU and dropout. The block input is
    added back to the result, going through a 1x1 convolution first when the
    channel count changes.

    Args:
        in_ch: Input channels.
        out_ch: Output channels.
        k: Kernel size of both convolutions.
        dilation: Dilation of both convolutions.
        dropout: Dropout probability after each ReLU.

    Input ``(B, in_ch, T)`` -> output ``(B, out_ch, T)``.
    """

    def __init__(self, in_ch, out_ch, k=3, dilation=1, dropout=0.2):
        super().__init__()
        # padding="same" keeps the time dimension unchanged for every
        # kernel size / dilation. The earlier (k - 1) * dilation // 2 padding
        # shortened the sequence whenever (k - 1) * dilation was odd, which
        # made the residual addition fail with a shape mismatch. For odd
        # kernels the padding is the same symmetric amount as before.
        self.net = nn.Sequential(
            nn.Conv1d(in_ch, out_ch, kernel_size=k, dilation=dilation, padding="same"),
            nn.ReLU(),
            nn.Dropout(dropout),
            nn.Conv1d(out_ch, out_ch, kernel_size=k, dilation=dilation, padding="same"),
            nn.ReLU(),
            nn.Dropout(dropout),
        )
        # Project the skip connection only when channel counts differ.
        self.down = nn.Conv1d(in_ch, out_ch, 1) if in_ch != out_ch else nn.Identity()

    def forward(self, x):  # x: (B,C,T)
        return self.net(x) + self.down(x)

class TCNClassifier(nn.Module):
    """Stack of ``TCNBlock`` layers followed by a pooled linear classifier.

    One block is created per entry of ``channels``; every block uses kernel
    size 3 and the dilation doubles from block to block (1, 2, 4, ...).
    The time axis is then averaged away and the pooled vector goes through
    LayerNorm -> Dropout -> Linear.

    Input ``(B, T, F)`` -> logits ``(B, num_classes)``.
    """

    def __init__(self, input_dim, channels=(128,128,128), num_classes=2, dropout=0.2):
        super().__init__()
        # widths[i] -> widths[i + 1] are the in/out channels of block i.
        widths = [input_dim, *channels]
        blocks = [
            TCNBlock(c_in, c_out, k=3, dilation=2 ** depth, dropout=dropout)
            for depth, (c_in, c_out) in enumerate(zip(widths[:-1], widths[1:]))
        ]
        self.tcn = nn.Sequential(*blocks)

        feat_dim = widths[-1]  # channels leaving the last block
        self.head = nn.Sequential(
            nn.AdaptiveAvgPool1d(1),   # average over time -> (B,C,1)
            nn.Flatten(),              # -> (B,C)
            nn.LayerNorm(feat_dim),
            nn.Dropout(dropout),
            nn.Linear(feat_dim, num_classes)
        )

    def forward(self, x):  # x: (B,T,F)
        # Conv1d wants channels before time, hence the swap to (B,F,T).
        features = self.tcn(x.transpose(1, 2))   # (B,C,T)
        return self.head(features)               # (B,num_classes)

In [16]:
def accuracy(logits, y):
    """Fraction of rows whose arg-max over dim 1 equals the label in ``y``.

    Returns a plain Python float.
    """
    predicted = torch.argmax(logits, dim=1)
    hits = predicted.eq(y).to(torch.float32)
    return hits.mean().item()

In [None]:
def run_epoch(model, weights, loader, optim=None):
    """Run one pass over ``loader`` and return ``(mean_loss, mean_accuracy)``.

    Args:
        model: Network mapping a batch ``X`` to class logits.
        weights: Per-class weight tensor handed to ``nn.CrossEntropyLoss``
            (``None`` gives an unweighted loss).
        loader: Iterable yielding ``(X, y)`` batches.
        optim: Optimizer. When given, the model is put in train mode and
            updated after every batch; when ``None`` the model is put in
            eval mode and only evaluated.

    Returns:
        Tuple of per-sample averaged loss and accuracy over the whole pass.

    Raises:
        ValueError: If ``loader`` yields no samples.
    """
    train = optim is not None
    model.train(train)
    total_loss, total_acc, n = 0.0, 0.0, 0

    ce = nn.CrossEntropyLoss(weight=weights)

    # Evaluation passes do not need an autograd graph; disabling gradient
    # tracking there saves memory and time without changing the numbers.
    with torch.set_grad_enabled(train):
        for X, y in loader:
            X, y = X.to(DEVICE), y.to(DEVICE)
            logits = model(X)
            loss = ce(logits, y)

            if train:
                optim.zero_grad()
                loss.backward()
                # Cap the global gradient norm at 1.0 before stepping.
                torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0)
                optim.step()

            # Weight each batch by its size so a short final batch does not
            # skew the epoch averages.
            bs = X.size(0)
            total_loss += loss.item() * bs
            total_acc  += accuracy(logits, y) * bs
            n += bs

    if n == 0:
        # Previously this surfaced as a bare ZeroDivisionError.
        raise ValueError("run_epoch received a loader with no samples")

    return total_loss / n, total_acc / n

In [None]:
def _balanced_class_weights(labels, num_classes):
    """Inverse-frequency class weights ``N / (K * count_k)`` as a float32 array."""
    # minlength keeps one weight per known class even when the highest class
    # ids never occur in `labels`.
    counts = np.bincount(labels, minlength=num_classes)
    # A class with zero samples would otherwise divide by zero and produce an
    # infinite weight; treat it as if it had a single sample.
    safe_counts = np.maximum(counts, 1)
    return (counts.sum() / (len(counts) * safe_counts)).astype(np.float32)


def main(MODEL_TYPE:str):
    """Train, evaluate and export a sequence classifier.

    Steps:
      1. Read ``labels.json`` and the train/val/test ``.npz`` splits from
         ``DATA_DIR``.
      2. Train for ``EPOCHS`` epochs with a class-weighted cross-entropy loss,
         keeping the weights of the epoch with the best validation accuracy.
      3. Report test accuracy of that best checkpoint.
      4. Export the model with ``torch.jit.trace`` to ``action_model_jit.pt``.

    Args:
        MODEL_TYPE: ``"lstm"`` or ``"tcn"``.

    Raises:
        ValueError: If ``MODEL_TYPE`` is neither ``"lstm"`` nor ``"tcn"``.
    """
    # Reject typos up front instead of silently training the TCN.
    if MODEL_TYPE not in ("lstm", "tcn"):
        raise ValueError(f"MODEL_TYPE must be 'lstm' or 'tcn', got {MODEL_TYPE!r}")

    # The label mapping is only used for the number of classes.
    with open(f"{DATA_DIR}/labels.json", "r") as f:
        label2id = json.load(f)
    num_classes = len(label2id)

    train_ds = NPZSequenceDataset(f"{DATA_DIR}/train.npz")
    val_ds   = NPZSequenceDataset(f"{DATA_DIR}/val.npz")
    test_ds  = NPZSequenceDataset(f"{DATA_DIR}/test.npz")

    # Class weights come from the training split only.
    weights = torch.from_numpy(_balanced_class_weights(train_ds.y, num_classes)).to(DEVICE)

    # Pinned host memory only helps host->GPU copies; skip it on CPU.
    loader_kwargs = dict(batch_size=BATCH_SIZE, num_workers=2, pin_memory=(DEVICE == "cuda"))
    train_loader = DataLoader(train_ds, shuffle=True, **loader_kwargs)
    val_loader   = DataLoader(val_ds, shuffle=False, **loader_kwargs)
    test_loader  = DataLoader(test_ds, shuffle=False, **loader_kwargs)

    input_dim = train_ds.X.shape[-1]

    if MODEL_TYPE == "lstm":
        model = LSTMClassifier(input_dim=input_dim, num_classes=num_classes).to(DEVICE)
    else:
        model = TCNClassifier(input_dim=input_dim, num_classes=num_classes).to(DEVICE)

    optim = torch.optim.AdamW(model.parameters(), lr=LR, weight_decay=1e-4)

    # Start below any reachable accuracy so the first epoch is always stored.
    best_val = float("-inf")
    best_state = None

    for epoch in range(1, EPOCHS + 1):
        tr_loss, tr_acc = run_epoch(model, weights, train_loader, optim)
        # run_epoch takes the class weights as its second positional argument;
        # omitting them shifts the loader into that slot and raises TypeError.
        va_loss, va_acc = run_epoch(model, weights, val_loader, optim=None)

        print(f"Epoch {epoch:02d} | train loss {tr_loss:.4f} acc {tr_acc:.4f} | val loss {va_loss:.4f} acc {va_acc:.4f}")

        if va_acc > best_val:
            best_val = va_acc
            # clone() is essential: on CPU, .cpu() returns the live parameter
            # tensor itself, so without a copy the snapshot would keep changing
            # as training continues.
            best_state = {k: v.detach().cpu().clone() for k, v in model.state_dict().items()}

    # load best and evaluate on test (best_state stays None only if EPOCHS < 1)
    if best_state is not None:
        model.load_state_dict(best_state)
    te_loss, te_acc = run_epoch(model, weights, test_loader, optim=None)
    print(f"Best val acc: {best_val:.4f} | test acc: {te_acc:.4f}")

    # Export to TorchScript for your YOLO pipeline
    model.eval()
    # Trace with the sequence length of the training data rather than a
    # hard-coded value.
    seq_len = train_ds.X.shape[1]
    example = torch.randn(1, seq_len, input_dim).to(DEVICE)
    scripted = torch.jit.trace(model, example)
    scripted.save("action_model_jit.pt")
    print("Saved TorchScript model: action_model_jit.pt")

if __name__ == "__main__":
    # Use the MODEL_TYPE constant from the configuration cell so that editing
    # the config actually changes which architecture is trained.
    main(MODEL_TYPE=MODEL_TYPE)

Epoch 01 | train loss 0.0382 acc 0.9846 | val loss 1.1726 acc 0.8660
Epoch 02 | train loss 0.0003 acc 1.0000 | val loss 1.2620 acc 0.8660
Epoch 03 | train loss 0.0002 acc 1.0000 | val loss 1.3293 acc 0.8660
Epoch 04 | train loss 0.0001 acc 1.0000 | val loss 1.3806 acc 0.8660
Epoch 05 | train loss 0.0001 acc 1.0000 | val loss 1.4277 acc 0.8660
Epoch 06 | train loss 0.0001 acc 1.0000 | val loss 1.4708 acc 0.8660
Epoch 07 | train loss 0.0001 acc 1.0000 | val loss 1.5061 acc 0.8660
Epoch 08 | train loss 0.0000 acc 1.0000 | val loss 1.5386 acc 0.8660
Epoch 09 | train loss 0.0000 acc 1.0000 | val loss 1.5673 acc 0.8660
Epoch 10 | train loss 0.0000 acc 1.0000 | val loss 1.5953 acc 0.8660
Epoch 11 | train loss 0.0000 acc 1.0000 | val loss 1.6212 acc 0.8660
Epoch 12 | train loss 0.0000 acc 1.0000 | val loss 1.6453 acc 0.8660
Epoch 13 | train loss 0.0000 acc 1.0000 | val loss 1.6651 acc 0.8660
Epoch 14 | train loss 0.0000 acc 1.0000 | val loss 1.6848 acc 0.8660
Epoch 15 | train loss 0.0000 acc 1

In [19]:
import numpy as np, torch, json

# Smoke test: reload the exported TorchScript model and compare its
# predictions with the true labels on the first test sequences.
device = "cuda" if torch.cuda.is_available() else "cpu"
# map_location lets a model that was exported on a GPU load on a CPU-only
# machine; .to(device) alone happens too late for that.
m = torch.jit.load("action_model_jit.pt", map_location=device).eval()

# Close the file handle instead of leaving it to the garbage collector.
with open("final_data/labels.json") as f:
    label2id = json.load(f)
id2label = {v:k for k,v in label2id.items()}

# NOTE(security): allow_pickle=True can execute code from a malicious
# archive; only load files you trust.
with np.load("final_data/test.npz", allow_pickle=True) as d:
    # Cast to float32 so the input dtype matches what the training dataset
    # feeds the model, whatever dtype the archive was saved with.
    X = torch.from_numpy(d["X"][:64].astype(np.float32)).to(device)   # (64,30,85) for this dataset
    y = d["y"][:64]

with torch.no_grad():
    p = m(X).argmax(1).cpu().numpy()

print("true :", [id2label[i] for i in y[:10]])
print("pred :", [id2label[i] for i in p[:10]])

true : ['PullUps', 'PullUps', 'PullUps', 'PullUps', 'PullUps', 'PullUps', 'PullUps', 'PullUps', 'PullUps', 'PullUps']
pred : ['PullUps', 'PullUps', 'PullUps', 'PullUps', 'PullUps', 'PullUps', 'PullUps', 'PullUps', 'PullUps', 'PullUps']


In [21]:
# Inspect how many training sequences each class has, and the
# inverse-frequency weights that follow from those counts.
train_ds = NPZSequenceDataset("final_data/train.npz")
y_tr = train_ds.y  # labels as a numpy integer array

counts = np.bincount(y_tr)
print("Class counts:", counts)

# weight_k = N / (K * count_k): rarer classes get larger weights.
weights = counts.sum() / (counts * len(counts))
print("Class weights:", weights)


Class counts: [1647  563   71]
Class weights: [ 0.46164744  1.35050326 10.70892019]
