In [1]:
"""Train a classifier on sanitized CSI stored in NPZ files.

This notebook mirrors the processing steps in `visualization.ipynb`, but it
loads pre-sanitized CSI contained in `.npz` archives written by the SHARP
pipeline (see `sanitize.ipynb`).  Each archive is expected to contain at
least the key `H` with shape `(N_pkts, K)` (complex) – the sanitized channel
frequency response of one capture.
"""

import os, glob, math
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import plotly.express as px

from sklearn.preprocessing import StandardScaler
from sklearn.decomposition import PCA
from sklearn.metrics import accuracy_score, classification_report

import torch
import torch.nn as nn
from torch.utils.data import DataLoader, TensorDataset

# Absolute path of the directory that contains the notebook's working
# directory, i.e. the repository root when the kernel starts in the notebook folder.
REPO_ROOT = os.path.abspath(
    os.path.join(os.getcwd(), os.pardir)
)


In [2]:
# -----------------------------------------------------------------
# 1. Load per-packet features from NPZ files (train / test directories)
# -----------------------------------------------------------------

# Sanitized captures live in <repo>/sanitized_data/{train,test}
TRAIN_DIR, TEST_DIR = (
    os.path.join(REPO_ROOT, 'sanitized_data', subset)
    for subset in ('train', 'test')
)

# Activity keywords; a keyword's position in this list is its numeric label
CLASS_KEYWORDS = [
    'empty',
    'watch',
    'work',
    'smoke',
    'eat',
    'drink',
    'sleep',
]
CLS_TO_IDX = dict(zip(CLASS_KEYWORDS, range(len(CLASS_KEYWORDS))))

# Feature switch: False → amplitudes only, True → amplitudes followed by phases
INCLUDE_PHASE = False


def load_npz_dir(dir_path, include_phase: bool = False, n_samples: int = 1):
    """Load CSI features and integer class labels from every ``*.npz`` in a folder.

    Files are processed in sorted filename order.  A file's label is the index
    of the first entry of ``CLASS_KEYWORDS`` that occurs as a substring of its
    file name; files matching no keyword are skipped with a message.

    Parameters
    ----------
    dir_path : str
        Folder containing ``*.npz`` archives, each holding a complex array
        under the key ``H`` (one row per packet).
    include_phase : bool, default False
        If ``True`` each packet row is ``[amplitude | phase]``; otherwise it is
        the amplitude only.
    n_samples : int, default 1
        Number of consecutive packets concatenated (along the feature axis)
        into one sample.  Trailing packets that do not fill a complete group
        are discarded, and a file with fewer than ``n_samples`` packets is
        skipped.  Because the amplitude/phase concatenation happens per packet
        *before* grouping, a grouped row is laid out packet by packet:
        ``[amp_1 | phase_1 | amp_2 | phase_2 | ...]``.

    Returns
    -------
    X : np.ndarray, float32, shape ``(N_groups, n_samples * D)``
        ``D`` is ``K`` (amplitude only) or ``2K`` (amplitude + phase), where
        ``K`` is the number of columns of ``H``.  Rows are ordered by class
        index, then by file name.
    y : np.ndarray, int64, shape ``(N_groups,)``
        Class index of each row of ``X``.

    If the directory is missing or yields no usable file, ``X`` has shape
    ``(0, 0)`` and ``y`` has shape ``(0,)``.

    Raises
    ------
    ValueError
        If ``n_samples`` is smaller than 1.
    """
    if n_samples < 1:
        raise ValueError("n_samples must be a positive integer")

    per_class_feats = {idx: [] for idx in range(len(CLASS_KEYWORDS))}
    class_counts = {idx: 0 for idx in range(len(CLASS_KEYWORDS))}

    if not os.path.exists(dir_path):
        print(f"[!] Directory not found: {dir_path}")
        return np.empty((0, 0), dtype=np.float32), np.empty((0,), dtype=np.int64)

    npz_files = glob.glob(os.path.join(dir_path, '*.npz'))
    if not npz_files:
        print(f"[!] No NPZ files in {dir_path}")

    for file in sorted(npz_files):
        fname = os.path.basename(file)
        # First keyword (in CLASS_KEYWORDS order) found in the file name wins.
        label_idx = next((CLS_TO_IDX[k] for k in CLASS_KEYWORDS if k in fname), None)
        if label_idx is None:
            print(f"[skip] Unrecognised class in filename: {fname}")
            continue

        # np.load keeps the archive's file handle open until the NpzFile is
        # closed; the context manager releases it as soon as `H` is in memory.
        with np.load(file) as data:
            H = data['H']  # (N_pkts, K) complex

        amp = np.abs(H).astype(np.float32)  # (N_pkts, K)
        if include_phase:
            phase = np.angle(H).astype(np.float32)  # (N_pkts, K)
            feat_mat = np.concatenate([amp, phase], axis=1)  # (N_pkts, 2K)
        else:
            feat_mat = amp

        # Optionally concatenate `n_samples` consecutive packets
        if n_samples > 1:
            total_pkts = feat_mat.shape[0]
            groups = total_pkts // n_samples  # floor: incomplete tail is dropped
            if groups == 0:
                print(f"[skip] Not enough packets in {fname} (need ≥{n_samples})")
                continue
            trimmed = feat_mat[:groups * n_samples]
            feat_mat = trimmed.reshape(groups, -1)  # (groups, n_samples * D)

        # Stack features for this label
        per_class_feats[label_idx].append(feat_mat)
        class_counts[label_idx] += feat_mat.shape[0]

        print(f"Loaded {fname:>25} | class={CLASS_KEYWORDS[label_idx]:<6} | samples={feat_mat.shape[0]:>5} | total {CLASS_KEYWORDS[label_idx]}={class_counts[label_idx]}")

    # Build final arrays (grouped by class index)
    feats_all, labels_all = [], []
    for idx, feat_list in per_class_feats.items():
        if feat_list:
            stacked = np.vstack(feat_list)
            feats_all.append(stacked)
            labels_all.append(np.full(stacked.shape[0], idx, dtype=np.int64))

    if not feats_all:
        return np.empty((0, 0), dtype=np.float32), np.empty((0,), dtype=np.int64)

    X = np.vstack(feats_all)
    y = np.concatenate(labels_all)

    # Summary distribution
    print("\n[Summary] Sample distribution per class:")
    for idx, name in enumerate(CLASS_KEYWORDS):
        print(f"  {name:<6}: {class_counts[idx]}")
    print(f"  TOTAL : {len(y)} samples\n")

    return X, y


# Load both splits with the same aggregation: every sample is the
# concatenation of N_PKTS_PER_SAMPLE consecutive packets (set it to 1 for
# per-packet samples).  Train and test must share this value so that their
# feature widths match.
N_PKTS_PER_SAMPLE = 5
X_train_raw, y_train = load_npz_dir(TRAIN_DIR, include_phase=INCLUDE_PHASE, n_samples=N_PKTS_PER_SAMPLE)
X_test_raw , y_test  = load_npz_dir(TEST_DIR , include_phase=INCLUDE_PHASE, n_samples=N_PKTS_PER_SAMPLE)

print('Train set:', X_train_raw.shape, 'Test set:', X_test_raw.shape)

Loaded             drink.csv.npz | class=drink  | samples=42006 | total drink=42006
Loaded               eat.csv.npz | class=eat    | samples=39659 | total eat=39659
Loaded             empty.csv.npz | class=empty  | samples=38103 | total empty=38103
Loaded             sleep.csv.npz | class=sleep  | samples=39998 | total sleep=39998
Loaded             smoke.csv.npz | class=smoke  | samples=39676 | total smoke=39676
Loaded             watch.csv.npz | class=watch  | samples=38024 | total watch=38024
Loaded              work.csv.npz | class=work   | samples=38185 | total work=38185

[Summary] Sample distribution per class:
  empty : 38103
  watch : 38024
  work  : 38185
  smoke : 39676
  eat   : 39659
  drink : 42006
  sleep : 39998
  TOTAL : 275651 samples

Loaded            drink2.csv.npz | class=drink  | samples=37661 | total drink=37661
Loaded               eat.csv.npz | class=eat    | samples=37980 | total eat=37980
Loaded             empty.csv.npz | class=empty  | samples=38724 | tot

In [3]:
# -----------------------------------------------------------------
# 1. Load aggregated packet features from NPZ files (train directory)
#    Optionally split into train/test (70/30) internally.
# -----------------------------------------------------------------

# Input folders below <repo>/sanitized_data.  TEST_DIR points at 'test2' here.
TRAIN_DIR, TEST_DIR = [
    os.path.join(REPO_ROOT, 'sanitized_data', folder)
    for folder in ('train', 'test2')
]

# Activity keywords; the list position of a keyword is its numeric label
CLASS_KEYWORDS = ['empty', 'watch', 'work',
                  'smoke', 'eat', 'drink',
                  'sleep']
CLS_TO_IDX = {keyword: label for label, keyword in enumerate(CLASS_KEYWORDS)}

# Feature switch: leave False for amplitude-only, set True for (amp, phase)
INCLUDE_PHASE = False


def load_npz_dir(
    dir_path: str,
    *,
    include_phase: bool = False,
    n_samples: int = 1,
    split: bool = False,
    split_ratio: float = 0.3,
    random_state: int = 0,
):
    """Load CSI features from a directory of ``*.npz`` files.

    Files are processed in sorted filename order.  A file's label is the index
    of the first entry of ``CLASS_KEYWORDS`` that occurs as a substring of its
    file name; files matching no keyword are skipped with a message.

    Parameters
    ----------
    dir_path : str
        Folder containing ``*.npz`` archives (each with a complex array under
        the key ``H``, one row per packet).
    include_phase : bool, default False
        If ``True``, each packet row is ``[amplitude | phase]``; otherwise
        amplitude only.
    n_samples : int, default 1
        Concatenate this many consecutive packets (along the feature axis) to
        form one sample.  Leftover packets that do not fill a group are
        discarded; a file with fewer than ``n_samples`` packets is skipped.
    split : bool, default False
        If ``True``, shuffle the loaded samples and return a train/test split
        of them instead of a single ``(X, y)`` pair.
    split_ratio : float, default 0.3
        Fraction of samples assigned to the *test* part when ``split=True``
        (the default gives 70 % train / 30 % test).  Must lie strictly between
        0 and 1; it is validated even when ``split=False``.
    random_state : int, default 0
        Seed of the permutation used before splitting.

    Returns
    -------
    X, y
        When ``split=False``: float32 feature matrix of shape
        ``(N, n_samples * D)`` and int64 label vector of shape ``(N,)``; rows
        are ordered by class index, then by file name.
    X_train, y_train, X_test, y_test
        When ``split=True``.

    If the directory is missing or yields no usable file, the feature arrays
    have shape ``(0, 0)`` and the label arrays shape ``(0,)``; the number of
    returned arrays still follows ``split`` so callers can always unpack.

    Raises
    ------
    ValueError
        If ``n_samples < 1`` or ``split_ratio`` is outside ``(0, 1)``.

    Notes
    -----
    The split is a plain random permutation: it is not stratified by class,
    and samples cut from the same capture file can land on both sides.
    """
    if n_samples < 1:
        raise ValueError("n_samples must be a positive integer")
    if not 0 < split_ratio < 1:
        raise ValueError("split_ratio must be in the open interval (0, 1)")

    def _empty_result():
        # Keep the arity promised by `split` even when nothing could be loaded,
        # otherwise a 4-way unpack at the call site fails on a 2-tuple.
        empty_X = np.empty((0, 0), dtype=np.float32)
        empty_y = np.empty((0,), dtype=np.int64)
        if split:
            return empty_X, empty_y, empty_X.copy(), empty_y.copy()
        return empty_X, empty_y

    per_class_feats = {idx: [] for idx in range(len(CLASS_KEYWORDS))}
    class_counts = {idx: 0 for idx in range(len(CLASS_KEYWORDS))}

    if not os.path.exists(dir_path):
        print(f"[!] Directory not found: {dir_path}")
        return _empty_result()

    npz_files = glob.glob(os.path.join(dir_path, '*.npz'))
    if not npz_files:
        print(f"[!] No NPZ files in {dir_path}")

    # --------------------- per-file processing ---------------------
    for file in sorted(npz_files):
        fname = os.path.basename(file)
        # First keyword (in CLASS_KEYWORDS order) found in the file name wins.
        label_idx = next((CLS_TO_IDX[k] for k in CLASS_KEYWORDS if k in fname), None)
        if label_idx is None:
            print(f"[skip] Unrecognised class in filename: {fname}")
            continue

        # np.load keeps the archive's file handle open until the NpzFile is
        # closed; the context manager releases it as soon as `H` is in memory.
        with np.load(file) as data:
            H = data['H']                    # (N_pkts, K) complex

        amp = np.abs(H).astype(np.float32)   # (N_pkts, K)
        if include_phase:
            phase = np.angle(H).astype(np.float32)    # (N_pkts, K)
            feat_mat = np.concatenate([amp, phase], axis=1)  # (N_pkts, 2K)
        else:
            feat_mat = amp

        # --- optional packet aggregation ---
        if n_samples > 1:
            total_pkts = feat_mat.shape[0]
            groups = total_pkts // n_samples          # floor division – discard leftovers
            if groups == 0:
                print(f"[skip] Not enough packets in {fname} (need ≥{n_samples})")
                continue
            trimmed = feat_mat[:groups * n_samples]    # (groups*n, D)
            feat_mat = trimmed.reshape(groups, -1)      # (groups, n*D)

        per_class_feats[label_idx].append(feat_mat)
        class_counts[label_idx] += feat_mat.shape[0]

        print(
            f"Loaded {fname:>25} | class={CLASS_KEYWORDS[label_idx]:<6} | "
            f"samples={feat_mat.shape[0]:>6} | total {CLASS_KEYWORDS[label_idx]}="
            f"{class_counts[label_idx]}"
        )

    # --------------------- aggregate all classes ------------------
    feats_all, labels_all = [], []
    for idx, feat_list in per_class_feats.items():
        if feat_list:
            stacked = np.vstack(feat_list)
            feats_all.append(stacked)
            labels_all.append(np.full(stacked.shape[0], idx, dtype=np.int64))

    if not feats_all:
        return _empty_result()

    X = np.vstack(feats_all)
    y = np.concatenate(labels_all)

    print("\n[Summary] Sample distribution per class:")
    for idx, name in enumerate(CLASS_KEYWORDS):
        print(f"  {name:<6}: {class_counts[idx]}")
    print(f"  TOTAL : {len(y)} samples\n")

    # --------------------- optional train/test split --------------
    if split:
        rng = np.random.default_rng(random_state)
        perm = rng.permutation(len(y))
        X, y = X[perm], y[perm]
        split_idx = int(len(y) * (1 - split_ratio))   # 70% train by default
        X_train, y_train = X[:split_idx], y[:split_idx]
        X_test,  y_test  = X[split_idx:], y[split_idx:]
        print(f"Performed internal split: train={len(y_train)} test={len(y_test)}")
        return X_train, y_train, X_test, y_test

    return X, y


# ------------------------- example usage -------------------------
# 1️⃣ Original behaviour (full directory → X, y)
# X_all, y_all = load_npz_dir(TRAIN_DIR, include_phase=INCLUDE_PHASE)

# 2️⃣ Packet aggregation + internal 70/30 split
# Each sample concatenates 1000 consecutive packets; with split=True and the
# default split_ratio the call returns (train_X, train_y, test_X, test_y).
# NOTE(review): the scaling / training cells visible further down read
# X_train_raw, X_test_raw, y_train and y_test, not X_tr / y_tr / X_te / y_te –
# confirm which split is meant to feed the models.
# NOTE(review): on its missing-directory / no-data exits the loader returns
# only two arrays, so this 4-way unpack fails in that case.
X_tr, y_tr, X_te, y_te = load_npz_dir(
    TRAIN_DIR,
    include_phase=INCLUDE_PHASE,
    n_samples=1000,
    split=True,
)

Loaded             drink.csv.npz | class=drink  | samples=   210 | total drink=210
Loaded               eat.csv.npz | class=eat    | samples=   198 | total eat=198
Loaded             empty.csv.npz | class=empty  | samples=   190 | total empty=190
Loaded             sleep.csv.npz | class=sleep  | samples=   199 | total sleep=199
Loaded             smoke.csv.npz | class=smoke  | samples=   198 | total smoke=198
Loaded             watch.csv.npz | class=watch  | samples=   190 | total watch=190
Loaded              work.csv.npz | class=work   | samples=   190 | total work=190

[Summary] Sample distribution per class:
  empty : 190
  watch : 190
  work  : 190
  smoke : 198
  eat   : 198
  drink : 210
  sleep : 199
  TOTAL : 1375 samples

Performed internal split: train=962 test=413


In [4]:
# ----------------------------------
# 2. PCA visualisation helper + run
# ----------------------------------

from typing import Sequence

# --- Standardise features: statistics are estimated on the training matrix
#     only and then applied unchanged to both matrices ---
from sklearn.preprocessing import StandardScaler
scaler = StandardScaler().fit(X_train_raw)
X_train, X_test = (scaler.transform(raw) for raw in (X_train_raw, X_test_raw))


def plot_pca_scatter(
    X_train: np.ndarray,
    y_train: Sequence[int],
    X_test: np.ndarray,
    y_test: Sequence[int],
    class_names: Sequence[str],
    n_components: int = 2,
):
    """Show one PCA scatter plot for the training set and one for the test set.

    The PCA is fitted on ``X_train`` and the same projection is applied to
    ``X_test``.  Points are coloured by ``class_names[label]``.  Two figures
    are created and then displayed, training set first.

    Raises ``ValueError`` unless ``n_components`` is 2 (flat scatter) or
    3 (3-D scatter).
    """
    if n_components not in (2, 3):
        raise ValueError("n_components must be 2 or 3")

    reducer = PCA(n_components=n_components, random_state=0)
    projections = (reducer.fit_transform(X_train), reducer.transform(X_test))

    point_labels = (
        [class_names[label] for label in y_train],
        [class_names[label] for label in y_test],
    )

    # Pick the plotly express function and title tag matching the dimensionality.
    if n_components == 2:
        make_scatter, dim_tag = px.scatter, "2-D"
    else:
        make_scatter, dim_tag = px.scatter_3d, "3-D"

    figures = []
    for coords, names, subset in zip(projections, point_labels, ("training", "test")):
        # Columns of the projection become the x, y (and z) coordinates.
        axes = dict(zip("xyz", coords.T))
        figures.append(
            make_scatter(
                color=names,
                title=f"PCA {dim_tag} – {subset} set",
                labels={'color': 'class'},
                **axes,
            )
        )

    for figure in figures:
        figure.show()

# --- Example calls ---
# plot_pca_scatter(X_train, y_train, X_test, y_test, CLASS_KEYWORDS, n_components=2)
# plot_pca_scatter(X_train, y_train, X_test, y_test, CLASS_KEYWORDS, n_components=3)

In [None]:
# ----------------------
# 3. PyTorch classifier
# ----------------------

class WiFiSensingNet(nn.Module):
    """Fully connected classifier: repeated (Linear → ReLU → Dropout) stages
    followed by a final Linear layer that outputs one logit per class."""

    def __init__(self, input_size, num_classes, hidden_sizes=(512, 256, 128, 64), dropout=0.1):
        """Build the layer stack.

        input_size   – width of one input feature vector
        num_classes  – number of output logits
        hidden_sizes – width of each hidden stage, in order
        dropout      – drop probability used after every hidden stage
        """
        super().__init__()
        widths = [input_size, *hidden_sizes]
        stages = []
        # Walk over consecutive (in, out) width pairs to create the hidden stages.
        for fan_in, fan_out in zip(widths[:-1], widths[1:]):
            stages.extend((nn.Linear(fan_in, fan_out), nn.ReLU(), nn.Dropout(dropout)))
        stages.append(nn.Linear(widths[-1], num_classes))
        self.net = nn.Sequential(*stages)

    def forward(self, x):
        """Return the raw class logits for the batch `x`."""
        logits = self.net(x)
        return logits


device = 'cuda' if torch.cuda.is_available() else 'cpu'
print('Using device:', device)

# Seed torch's global RNG so that weight initialisation, dropout masks and
# the DataLoader shuffling order are repeatable from one run to the next.
torch.manual_seed(0)

X_train_t = torch.tensor(X_train, dtype=torch.float32)
y_train_t = torch.tensor(y_train, dtype=torch.long)
X_test_t  = torch.tensor(X_test,  dtype=torch.float32)
y_test_t  = torch.tensor(y_test,  dtype=torch.long)

train_ds = TensorDataset(X_train_t, y_train_t)
valid_ds = TensorDataset(X_test_t,  y_test_t)

train_loader = DataLoader(train_ds, batch_size=32, shuffle=True)
valid_loader = DataLoader(valid_ds, batch_size=32)   # not shuffled: order matches y_test

model = WiFiSensingNet(X_train.shape[1], len(CLASS_KEYWORDS)).to(device)
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=1e-4)


def predict_batched(net, loader):
    """Return the argmax class index (NumPy int array) for every sample of `loader`.

    Runs in eval mode without gradients and one batch at a time, so the
    whole evaluation set never has to sit on the device at once.
    """
    net.eval()
    batch_preds = []
    with torch.no_grad():
        for xb, _ in loader:
            batch_preds.append(net(xb.to(device)).argmax(1).cpu().numpy())
    if not batch_preds:
        return np.empty((0,), dtype=np.int64)
    return np.concatenate(batch_preds)


epochs = 200
for epoch in range(epochs):
    # ---- training ----
    model.train()
    for xb, yb in train_loader:
        xb, yb = xb.to(device), yb.to(device)
        optimizer.zero_grad()
        logits = model(xb)
        loss = criterion(logits, yb)
        loss.backward()
        optimizer.step()

    # ---- validation (every 10th epoch) ----
    # NOTE(review): progress is monitored on the same data that is used for
    # the final report below – confirm this is intended.
    if (epoch + 1) % 10 == 0:
        preds = predict_batched(model, valid_loader)
        acc = accuracy_score(y_test, preds)
        print(f'Epoch {epoch+1:02d} – test accuracy: {acc*100:.2f}%')

print('\nClassification report (test set):')
preds = predict_batched(model, valid_loader)
# Passing `labels` keeps the report aligned with CLASS_KEYWORDS even when a
# class is absent from the test data; without it classification_report raises
# because the number of observed classes differs from len(target_names).
print(classification_report(
    y_test,
    preds,
    labels=list(range(len(CLASS_KEYWORDS))),
    target_names=CLASS_KEYWORDS,
    zero_division=0,
))

In [None]:
# -------------------------------------------------
# 4. Classical ML baselines: SVC and KNN classifiers
# -------------------------------------------------

from sklearn.svm import SVC
from sklearn.neighbors import KNeighborsClassifier

# Explicit label list so each report row maps to CLASS_KEYWORDS even when a
# class is missing from the test data (classification_report raises otherwise,
# because the observed class count then differs from len(target_names)).
label_ids = list(range(len(CLASS_KEYWORDS)))

# --- Support Vector Classifier ---
svc = SVC(kernel='rbf', C=10, gamma='scale')
svc.fit(X_train, y_train)
svc_pred = svc.predict(X_test)
print("\nSVC accuracy: {:.2f}%".format(accuracy_score(y_test, svc_pred) * 100))
print(
    "SVC classification report:\n",
    classification_report(
        y_test, svc_pred, labels=label_ids, target_names=CLASS_KEYWORDS, zero_division=0
    ),
)

# --- k-Nearest Neighbours ---
knn = KNeighborsClassifier(n_neighbors=5, metric='euclidean')
knn.fit(X_train, y_train)
knn_pred = knn.predict(X_test)
print("\nKNN accuracy: {:.2f}%".format(accuracy_score(y_test, knn_pred) * 100))
print(
    "KNN classification report:\n",
    classification_report(
        y_test, knn_pred, labels=label_ids, target_names=CLASS_KEYWORDS, zero_division=0
    ),
)