In [1]:
from __future__ import annotations
import argparse
from pathlib import Path
from typing import List, Tuple
import ast
import numpy as np
import pandas as pd
import torch
from torch import nn
from torch.utils.data import Dataset, DataLoader
from sklearn.preprocessing import StandardScaler
from sklearn.utils.class_weight import compute_class_weight
from torch.cuda.amp import autocast, GradScaler
from tqdm import tqdm

In [2]:
WINDOW = 50
STRIDE = 25
LOCATIONS = ["right_arm", "left_arm", "right_leg", "left_leg"]
AXES = ["x", "y", "z"]

In [14]:
data_dir = Path('data')
train_dir = data_dir / 'train'
meta_file = data_dir / 'meta_data.txt'
test_file = data_dir/'test.csv'
label_map = {
    'null': 0,'jogging': 1,'jogging (rotating arms)': 2,'jogging (skipping)': 3,'jogging (sidesteps)': 4,'jogging (butt-kicks)': 5,
    'stretching (triceps)': 6,'stretching (lunging)': 7,'stretching (shoulders)': 8,'stretching (hamstrings)': 9,'stretching (lumbar rotation)': 10,
    'push-ups': 11,'push-ups (complex)': 12,'sit-ups': 13,'sit-ups (complex)': 14,'burpees': 15,'lunges': 16,'lunges (complex)': 17,'bench-dips': 18
}
num_classes = len(label_map)
C = 3
crit = nn.CrossEntropyLoss()

In [31]:
def detect_locations() -> list[str]:
    files = sorted(train_dir.glob('sbj_*.csv'))
    if not files:
        raise FileNotFoundError(f"No 'sbj_*.csv' files found in {data_dir}")
    sample = pd.read_csv(files[0], nrows=0)
    locs = sorted({col.split('_acc_')[0] for col in sample.columns if '_acc_' in col})
    if not locs:
        raise ValueError(f"No sensor columns found in {files[0]}")
    return locs
    sample = pd.read_csv(next(data_dir.glob('sbj_*.csv')), nrows=0)
    return sorted({col.split('_acc_')[0] for col in sample.columns if '_acc_' in col})

def load_continuous_csv(path: Path) -> pd.DataFrame:
    return pd.read_csv(path)

def cols_for(location: str) -> List[str]:
    return [f"{location}_acc_{ax}" for ax in AXES]


def clean_location_df(df: pd.DataFrame, location: str, require_label: bool) -> pd.DataFrame:
    """Return only rows where the 3‑axis sensor values (and optionally label) are all non‑null."""
    cols = cols_for(location)
    if require_label:
        cols = cols + ['label']
    return df.dropna(subset=cols)

def segment_windows(
    data: np.ndarray,
    labels: np.ndarray | None,
    window: int = WINDOW,
    stride: int = STRIDE
) -> Tuple[np.ndarray, np.ndarray | None]:
    X_list, y_list = [], []
    for start in range(0, len(data) - window + 1, stride):
        seg = data[start:start + window]
        if np.isnan(seg).any():
            continue
        X_list.append(seg)
        if labels is not None:
            lbl_seg = labels[start:start + window]
            vals, counts = np.unique(lbl_seg, return_counts=True)
            y_list.append(vals[counts.argmax()])
    X = np.stack(X_list) if X_list else np.empty((0, window, 3), dtype=np.float32)
    y = (np.array(y_list) if y_list else np.empty((0,))) if labels is not None else None
    return X, y


In [18]:

class WearDataset(Dataset):
    def __init__(self, windows: np.ndarray, labels: np.ndarray, scaler: StandardScaler):
        self.X = scaler.transform(windows.reshape(-1, 3)).reshape(windows.shape)
        self.y = labels

    def __len__(self):
        return len(self.X)

    def __getitem__(self, idx):
        x = torch.from_numpy(self.X[idx]).permute(1, 0).contiguous()  # (C, T)
        y = self.y[idx]
        return x, y


In [19]:
class DeepConvLSTM(nn.Module):
    def __init__(self, n_channels: int = 3, n_classes: int = 19, seq_len: int = WINDOW):
        super().__init__()
        self.feature_extractor = nn.Sequential(
            nn.Conv1d(n_channels, 64, kernel_size=5, padding=2),
            nn.ReLU(),
            nn.Conv1d(64, 64, kernel_size=5, padding=2),
            nn.ReLU(),
            nn.Dropout(0.2)
        )
        self.lstm = nn.LSTM(input_size=64, hidden_size=128, num_layers=1, batch_first=True)
        self.classifier = nn.Sequential(
            nn.Dropout(0.5),
            nn.Linear(128, n_classes)
        )

    def forward(self, x):  # x: (B,C,T)
        x = self.feature_extractor(x)
        x = x.permute(0, 2, 1)  # (B,T,C')
        _, (h_n, _) = self.lstm(x)
        out = self.classifier(h_n[-1])
        return out


In [34]:
def train_loop(model, loader, criterion, optim, scaler):
    model.train()
    total, correct, loss_sum = 0, 0, 0.0
    for X, y in loader:
        X, y = X.cuda(), y.cuda()
        optim.zero_grad()
        with autocast():
            out = model(X)
            loss = criterion(out, y)
        scaler.scale(loss).backward()
        scaler.step(optim)
        scaler.update()
        preds = out.argmax(1)
        total += y.size(0)
        correct += (preds == y).sum().item()
        loss_sum += loss.item() * y.size(0)
    return loss_sum / total, correct / total

def train_one_epoch(
    model: nn.Module,
    loader: DataLoader,
    optimizer: torch.optim.Optimizer,
    criterion: nn.Module,
    scaler: GradScaler
) -> tuple[float, float]:
    model.train()
    total, correct, loss_sum = 0, 0, 0.0
    for X, y in loader:
        X, y = X.cuda(), y.cuda()
        optimizer.zero_grad()
        with autocast():
            logits = model(X)
            loss = criterion(logits, y)
        scaler.scale(loss).backward()
        scaler.step(optimizer)
        scaler.update()
        preds = logits.argmax(dim=1)
        total += y.size(0)
        correct += (preds == y).sum().item()
        loss_sum += loss.item() * y.size(0)
    return loss_sum / total, correct / total


def run_location(train_dir: Path, test_path: Path, out_dir: Path, loc: str,
                 epochs: int, batch: int):
    train_files = sorted(train_dir.glob('sbj_*.csv'))
    train_rows = []
    for f in train_files:
        df = clean_location_df(pd.read_csv(f), loc, require_label=True)
        train_rows.append(df)
    if not train_rows:
        print(f"[WARN] No data for {loc}, skipping.")
        return
    train_df = pd.concat(train_rows, ignore_index=True)

    test_df_raw = pd.read_csv(test_path)
    test_df = clean_location_df(test_df_raw, loc, require_label=False)

    X_train, y_train = segment_windows(train_df[cols_for(loc)].values.astype(np.float32),
                                       train_df['label'].values)
    if X_train.size == 0:
        print(f"[WARN] No windows for {loc}, skipping.")
        return

    classes = sorted({*y_train})
    lbl2idx = {c: i for i, c in enumerate(classes)}
    y_enc = np.array([lbl2idx[l] for l in y_train], dtype=np.int64)

    # ---------- 5. Scaling ----------
    scaler = StandardScaler().fit(X_train.reshape(-1, 3))
    ds = WearDataset(X_train, y_enc, scaler)
    loader = DataLoader(ds, batch_size=batch, shuffle=True, pin_memory=True, num_workers=4)

    # ---------- 6. Model ----------
    model = DeepConvLSTM(n_classes=len(classes)).cuda()
    class_w = compute_class_weight('balanced', classes=np.unique(y_enc), y=y_enc)
    criterion = nn.CrossEntropyLoss(weight=torch.tensor(class_w, dtype=torch.float32).cuda())
    optim = torch.optim.AdamW(model.parameters(), lr=1e-3, weight_decay=1e-4)
    scaler_amp = GradScaler()

    best_acc = 0.0
    for ep in range(1, epochs + 1):
        loss, acc = train_loop(model, loader, criterion, optim, scaler_amp)
        print(f"[{loc}] Epoch {ep:02d}/{epochs} loss={loss:.4f} acc={acc:.4f}")
        if acc > best_acc:
            best_acc = acc
            ckpt = {
                'model': model.state_dict(),
                'scaler': scaler,
                'label_map': lbl2idx,
            }
            out_dir.mkdir(parents=True, exist_ok=True)
            torch.save(ckpt, out_dir / f"{loc}_best.pt")
            del ckpt
            torch.cuda.empty_cache()

    del model, loader, ds, optim, criterion, scaler_amp
    torch.cuda.empty_cache()



def evaluate(model, loader, criterion):
    model.eval()
    total, correct, loss_sum = 0, 0, 0.0
    with torch.no_grad():
        for X, y in loader:
            X, y = X.cuda(), y.cuda()
            logits = model(X)
            loss = criterion(logits, y)
            preds = logits.argmax(1)
            total += y.size(0)
            correct += (preds == y).sum().item()
            loss_sum += loss.item() * y.size(0)
    return loss_sum/total, correct/total

In [35]:
first_file = next(train_dir.glob('sbj_*.csv'))
print(first_file)
all_locs = detect_locations()

all_locs

data/train/sbj_21.csv


['left_arm', 'left_leg', 'right_arm', 'right_leg']

In [36]:
for loc in all_locs[:1]:
    print(loc)
    run_location(train_dir, test_file, "result", loc, 5, 256)


left_arm


  df = clean_location_df(pd.read_csv(f), loc, require_label=True)


KeyError: ['left_arm_acc_x', 'left_arm_acc_y', 'left_arm_acc_z']

In [None]:
scaler = StandardScaler().fit(X.reshape(-1, 3))
ds = WearDataset(X, y, scaler, augment=True)
dl = DataLoader(ds, batch_size=args.batch_size, shuffle=True, num_workers=4, pin_memory=True)

In [None]:
model = DeepConvLSTM(n_classes=len(label_to_idx)).cuda()
class_weights = compute_class_weight('balanced', classes=np.unique(y), y=y)
criterion = nn.CrossEntropyLoss(weight=torch.tensor(class_weights, dtype=torch.float32).cuda())
optimizer = torch.optim.AdamW(model.parameters(), lr=1e-3, weight_decay=1e-4)
scaler = GradScaler()

best_acc = 0.0
for epoch in range(args.epochs):
    tr_loss, tr_acc = train_one_epoch(model, dl, optimizer, criterion, scaler)
    print(f'Epoch {epoch+1}/{args.epochs} - loss: {tr_loss:.4f} acc: {tr_acc:.4f}')
    if tr_acc > best_acc:
        best_acc = tr_acc
        args.output_dir.mkdir(parents=True, exist_ok=True)
        torch.save({'model': model.state_dict(), 'scaler': scaler, 'label_map': label_to_idx},
                   args.output_dir / f'{args.location}_best.pt')