
# Human Activity Recognition — Comparing Feedforward NN vs RNN vs LSTM vs GRU

Train and compare four neural architectures on a real sequential task — smartphone-based **Human Activity Recognition (HAR)** — and visualize what leaders should look for at every step: data quality, leakage checks, baselines, learning curves, confusion matrices, generalization, model size/latency, and trade-offs.

**Models compared**
- **Feedforward NN (MLP)** on flattened sequences (no built-in notion of temporal order; every time step is just another input feature).
- **SimpleRNN** (vanilla recurrent layer).
- **LSTM** (handles long-term dependencies via gates).
- **GRU** (gated, often smaller/faster than LSTM).

**Dataset (public):** UCI *Human Activity Recognition Using Smartphones* (UCI HAR). It contains tri-axial accelerometer and gyroscope signals for six activities (Walking, Upstairs, Downstairs, Sitting, Standing, Laying). We will use the **Inertial Signals** windows (length 128) as proper sequences: shape `(samples, time_steps=128, channels=9)`.

> If the dataset cannot be downloaded in your environment, this notebook will **fall back to a synthetic dataset** so you can still run the full workflow.


## Setup

In [None]:
import os, sys, zipfile, io, urllib.request, shutil, time, random
from pathlib import Path
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt, warnings

# For modeling
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers

# For metrics and plots
from sklearn.metrics import (
    classification_report, confusion_matrix, accuracy_score, 
    balanced_accuracy_score, precision_score, recall_score,
    f1_score, cohen_kappa_score, matthews_corrcoef, log_loss
)
from sklearn.model_selection import train_test_split

# Seed every RNG this notebook can draw from so reruns are comparable.
# Python's `random` is imported above and is now seeded alongside NumPy and TF.
SEED = 7
random.seed(SEED)
np.random.seed(SEED)
tf.random.set_seed(SEED)

# Silences *all* warnings to keep cell output compact; comment this out when
# debugging, since it also hides deprecation and metric warnings.
warnings.filterwarnings("ignore")

# Record library versions next to the results for reproducibility.
print("Python:", sys.version.split()[0])
print("TensorFlow:", tf.__version__)
print("NumPy:", np.__version__)
print("Pandas:", pd.__version__)


## Load the UCI HAR Dataset (with fallbacks)

Try the known UCI download URL (further mirrors can be added to the URL list). If the download fails (e.g., no internet, firewalls), we generate a **synthetic** dataset with similar shape and class balance so the rest of the notebook still runs.


In [None]:
# Folder the archive is extracted into (relative to the working directory).
DATA_DIR = Path("data_uci_har")
# Dataset folder: checked for existence before downloading and passed to load_inertial_har.
HAR_DIR = DATA_DIR / "UCI HAR Dataset"

def try_download_uci_har(dest_dir: Path, urls=None, timeout: float = 60) -> bool:
    """Download a UCI HAR zip archive and extract it into ``dest_dir``.

    Candidate URLs are tried in order; the first archive that downloads and
    unzips successfully wins.

    Args:
        dest_dir: Directory to extract into (created if missing).
        urls: Optional iterable of archive URLs to try. When omitted, the
            classic direct UCI link is used. Any scheme ``urllib`` supports
            is accepted, so a ``file://`` URL can point at a local copy.
        timeout: Per-request timeout in seconds.

    Returns:
        True if an archive was extracted, False if every candidate failed.
    """
    if urls is None:
        # Several known URLs (first is the classic direct file)
        urls = [
            "https://archive.ics.uci.edu/ml/machine-learning-databases/00240/UCI%20HAR%20Dataset.zip",
            # New UCI site may require manual fetch; this is kept as a reference.
            # If the first link fails, users can download manually from UCI and place here.
            # "https://archive.ics.uci.edu/dataset/240/human%2Bactivity%2Brecognition%2Busing%2Bsmartphones"
        ]
    dest_dir.mkdir(parents=True, exist_ok=True)
    for url in urls:
        try:
            print("Attempting download:", url)
            with urllib.request.urlopen(url, timeout=timeout) as resp:
                data = resp.read()
            with zipfile.ZipFile(io.BytesIO(data)) as zf:
                zf.extractall(dest_dir)
        except Exception as e:
            # Deliberately broad: this is a best-effort step and the caller
            # falls back to synthetic data, so any failure just moves on.
            print("Download failed from", url, "->", e)
            continue
        print("Downloaded & extracted to:", dest_dir.resolve())
        return True
    print("\nManual option: Download 'UCI HAR Dataset.zip' from the UCI page and unzip into:", dest_dir.resolve())
    return False

def load_inertial_har(base: Path):
    """Read the raw inertial-signal windows of UCI HAR from ``base``.

    For each of the ``train`` and ``test`` splits, the nine per-channel text
    files under ``<base>/<split>/Inertial Signals`` are loaded and stacked
    along a trailing channel axis, giving ``(n_samples, n_steps, 9)``.
    Labels come from ``y_<split>.txt`` and are shifted from 1-based to
    0-based.

    Returns:
        ``(X_train, y_train, X_test, y_test, activity_map)`` where
        ``activity_map`` maps the 0-based label ids to activity names.
    """
    # Channel order: body_acc x/y/z, body_gyro x/y/z, total_acc x/y/z.
    signal_names = [
        f"{sensor}_{axis}"
        for sensor in ("body_acc", "body_gyro", "total_acc")
        for axis in "xyz"
    ]

    def read_split(part):
        signal_dir = base / part / "Inertial Signals"
        # One 2-D array (samples x time steps) per channel file.
        per_channel = [
            np.loadtxt(signal_dir / f"{name}_{part}.txt") for name in signal_names
        ]
        windows = np.stack(per_channel, axis=2)
        # Labels are stored as 1..6; shift to 0..5.
        labels = np.loadtxt(base / part / f"y_{part}.txt").astype(int) - 1
        return windows, labels

    X_train, y_train = read_split("train")
    X_test, y_test = read_split("test")

    # Activity labels per UCI HAR, indexed by 0-based class id.
    label_names = (
        "WALKING",
        "WALKING_UPSTAIRS",
        "WALKING_DOWNSTAIRS",
        "SITTING",
        "STANDING",
        "LAYING",
    )
    activity_map = dict(enumerate(label_names))
    return X_train, y_train, X_test, y_test, activity_map

# Reuse an already-extracted copy when present; otherwise try to download it.
download_ok = HAR_DIR.exists() or try_download_uci_har(DATA_DIR)

# Synthetic data is the fallback both when no dataset is available and when
# the extracted files cannot be parsed.
USE_SYNTHETIC = not download_ok
if download_ok:
    try:
        X_train, y_train, X_test, y_test, activity_map = load_inertial_har(HAR_DIR)
    except Exception as e:
        print("Failed to load inertial signals, falling back to synthetic. Error:", e)
        USE_SYNTHETIC = True

if USE_SYNTHETIC:
    print("Generating synthetic dataset with similar shape...")
    n_train, n_test = 6000, 2500
    time_steps, channels, n_classes = 128, 9, 6

    def make_synth(n):
        """Return ``n`` synthetic windows and their uniformly random labels.

        Each window is a class-specific sinusoid (frequency grows with the
        class id) copied to every channel, plus independent Gaussian noise.
        Draws from NumPy's global RNG.
        """
        X = np.zeros((n, time_steps, channels), dtype=np.float32)
        y = np.random.randint(0, n_classes, size=n)
        t = np.linspace(0, 4 * np.pi, time_steps)
        for i in range(n):
            base_freq = (y[i] + 1) * 0.4
            # Column vector (time_steps, 1) so it broadcasts across channels.
            sig = np.sin(base_freq * t)[:, None]
            noise = 0.3 * np.random.randn(time_steps, channels)
            X[i] = sig + noise
        return X, y

    X_train, y_train = make_synth(n_train)
    X_test,  y_test  = make_synth(n_test)
    # Name exactly as many classes as were generated (was a literal 6).
    activity_map = {i: f"CLASS_{i}" for i in range(n_classes)}

# Derive the problem dimensions from whichever dataset ended up loaded.
n_classes = np.unique(y_train).size
time_steps, channels = X_train.shape[1:3]

print("Train shape:", X_train.shape, " Test shape:", X_test.shape)
print("Classes:", n_classes, activity_map)


## Explore data and visualize sequences


In [None]:
# Class distribution
def plot_class_counts(y, title):
    """Draw a bar chart of the number of samples per class id in ``y``.

    Bars are ordered by class id and labelled ``"<id>:<activity name>"``
    using the module-level ``activity_map``.
    """
    per_class = pd.Series(y).value_counts().sort_index()
    tick_labels = [f"{cls}:{activity_map[cls]}" for cls in per_class.index]

    plt.figure(figsize=(8, 3.5))
    plt.bar(tick_labels, per_class.values)
    plt.title(title)
    plt.ylabel("count")
    plt.xticks(rotation=30, ha='right')
    plt.tight_layout()
    plt.show()

# Show the label balance of the training data and of the held-out test data.
plot_class_counts(y_train, "Train Class Distribution")
plot_class_counts(y_test,  "Test Class Distribution")

# Plot a few example windows (first two channels only for readability)
def plot_example_sequences(X, y, n=6):
    """Plot channels 0 and 1 of up to ``n`` randomly chosen windows.

    One subplot per window, stacked vertically; the y-axis label of each
    subplot is the window's activity name from ``activity_map``.

    Args:
        X: Array indexed as ``X[sample, time, channel]``.
        y: Class id per sample.
        n: Number of windows to draw. Clamped to ``len(X)`` because the
            windows are sampled without replacement.
    """
    n = min(n, len(X))
    if n < 1:
        # Nothing to sample from (or nothing requested).
        return
    idxs = np.random.choice(len(X), size=n, replace=False)
    plt.figure(figsize=(10, 6))
    for i, idx in enumerate(idxs, 1):
        ax = plt.subplot(n, 1, i)
        ax.plot(X[idx, :, 0], label="ch0")
        ax.plot(X[idx, :, 1], label="ch1")
        ax.set_ylabel(f"{activity_map[int(y[idx])]}")
        if i == 1:
            ax.legend(loc='upper right')
    plt.xlabel("time step")
    plt.tight_layout()
    plt.show()

# Six random training windows; the selection depends on NumPy's global RNG state.
plot_example_sequences(X_train, y_train, n=6)


## Train/Validation split and preprocessing

Keep the dataset’s **pre-defined test set** (different subjects) and split the training set into train/validation. Standardize each channel by the **training** mean/std to avoid leakage.


In [None]:
# Hold out 20% of the training windows for validation, stratified by label.
# NOTE(review): this is a random window-level split, so windows from the same
# subject can land in both train and validation -- confirm whether a
# subject-wise split is needed for an honest validation score.
X_tr, X_val, y_tr, y_val = train_test_split(
    X_train,
    y_train,
    test_size=0.2,
    stratify=y_train,
    random_state=7,
)

# Per-channel statistics come from the training part only, so nothing about
# the validation or test data leaks into preprocessing.
mu = X_tr.mean(axis=(0, 1), keepdims=True)
sd = X_tr.std(axis=(0, 1), keepdims=True) + 1e-8  # epsilon guards constant channels

X_tr, X_val, X_te = ((part - mu) / sd for part in (X_tr, X_val, X_test))

print("Standardized shapes:", X_tr.shape, X_val.shape, X_te.shape)


## Baselines

- **Majority class** accuracy
- **Feedforward NN (MLP)** on flattened sequences (baseline with no temporal inductive bias)


In [None]:
# Majority baseline: always predict the most frequent training label and
# score that constant prediction on the test labels.
majority_class = pd.Series(y_tr).mode().iloc[0]
maj_acc = (y_test == majority_class).mean()
print("Majority-class baseline accuracy (on Test):", round(float(maj_acc), 4))

# Prepare one-hot labels for Keras
def to_onehot(y, n_classes):
    """Return a float32 ``(len(y), n_classes)`` one-hot encoding of integer labels ``y``."""
    # Row k of the identity matrix is the one-hot vector for class k, so
    # indexing the identity with the labels yields one encoded row per label.
    identity = np.eye(n_classes, dtype=np.float32)
    return identity[y]

# One-hot targets for the three splits (categorical cross-entropy expects them).
Y_tr, Y_val, Y_te = (to_onehot(labels, n_classes) for labels in (y_tr, y_val, y_test))

# Flattened inputs for MLP: each window becomes one long feature vector.
X_tr_flat, X_val_flat, X_te_flat = (
    windows.reshape(len(windows), -1) for windows in (X_tr, X_val, X_te)
)

def build_mlp(input_dim, n_classes):
    """Build and compile the feedforward baseline.

    Architecture: Dense(128) -> Dropout(0.3) -> Dense(64) -> Dropout(0.3)
    -> softmax over ``n_classes``, trained with Adam (lr 1e-3) on
    categorical cross-entropy.
    """
    stack = [layers.Input(shape=(input_dim,))]
    for width in (128, 64):
        stack.append(layers.Dense(width, activation="relu"))
        stack.append(layers.Dropout(0.3))
    stack.append(layers.Dense(n_classes, activation="softmax"))

    net = keras.Sequential(stack)
    net.compile(
        optimizer=keras.optimizers.Adam(1e-3),
        loss="categorical_crossentropy",
        metrics=["accuracy"],
    )
    return net

mlp = build_mlp(X_tr_flat.shape[1], n_classes)
mlp.summary()

# Stop once validation accuracy has not improved for 5 epochs and roll the
# weights back to the best epoch seen.
cb = [
    keras.callbacks.EarlyStopping(
        monitor="val_accuracy",
        patience=5,
        restore_best_weights=True,
    )
]
hist_mlp = mlp.fit(
    X_tr_flat,
    Y_tr,
    validation_data=(X_val_flat, Y_val),
    epochs=30,
    batch_size=128,
    callbacks=cb,
    verbose=2,
)

# Final score on the held-out test split.
mlp_test_loss, mlp_test_acc = mlp.evaluate(X_te_flat, Y_te, verbose=0)
print("MLP Test Acc:", round(float(mlp_test_acc), 4))


## Recurrent models: SimpleRNN, LSTM, GRU
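Train three recurrent models with the same number of recurrent units (64) and the same classification head, then compare. Parameter counts still differ, because gated cells (LSTM, GRU) carry more weights per unit than a SimpleRNN.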


In [None]:
def build_recurrent(kind: str, time_steps: int, channels: int, n_classes: int, units: int = 64):
    """Build and compile a single-layer recurrent classifier.

    Args:
        kind: ``"rnn"`` (SimpleRNN), ``"lstm"`` or ``"gru"``.
        time_steps: Sequence length of each input window.
        channels: Number of features per time step.
        n_classes: Size of the softmax output.
        units: Width of the recurrent layer.

    Returns:
        A compiled ``keras.Sequential``: recurrent layer (last state only)
        -> Dropout(0.3) -> Dense(64, relu) -> softmax.

    Raises:
        ValueError: If ``kind`` is not one of the three supported names.
    """
    cell_types = {
        "rnn": layers.SimpleRNN,
        "lstm": layers.LSTM,
        "gru": layers.GRU,
    }
    net = keras.Sequential([layers.Input(shape=(time_steps, channels))])
    if kind not in cell_types:
        raise ValueError("Unknown kind")
    net.add(cell_types[kind](units, return_sequences=False))

    # Shared classification head, identical for all three cell types.
    head = (
        layers.Dropout(0.3),
        layers.Dense(64, activation="relu"),
        layers.Dense(n_classes, activation="softmax"),
    )
    for layer in head:
        net.add(layer)

    net.compile(
        optimizer=keras.optimizers.Adam(1e-3),
        loss="categorical_crossentropy",
        metrics=["accuracy"],
    )
    return net

results = []
histories = {}
# Keep every fitted model so later cells can analyse the exact instances
# whose test accuracy is reported here, instead of re-training new ones
# (a re-trained model will generally not reproduce these numbers).
trained_models = {"mlp": mlp}

for kind in ["rnn", "lstm", "gru"]:
    print("\nTraining", kind.upper(), "…")
    model = build_recurrent(kind, time_steps, channels, n_classes, units=64)
    model.summary()
    # Same early-stopping policy as the MLP: patience 5 on validation accuracy.
    cb = [keras.callbacks.EarlyStopping(patience=5, restore_best_weights=True, monitor="val_accuracy")]
    hist = model.fit(X_tr, Y_tr, validation_data=(X_val, Y_val),
                     epochs=30, batch_size=128, verbose=2, callbacks=cb)
    test_loss, test_acc = model.evaluate(X_te, Y_te, verbose=0)
    results.append({"model": kind.upper(), "test_acc": float(test_acc), "params": model.count_params()})
    histories[kind] = hist.history
    trained_models[kind] = model

# Add MLP results too
results.append({"model": "MLP", "test_acc": float(mlp_test_acc), "params": mlp.count_params()})
# Best test accuracy first; the bare expression below renders the table in a notebook.
res_df = pd.DataFrame(results).sort_values("test_acc", ascending=False).reset_index(drop=True)
res_df


## Learning curves & capacity vs. accuracy


In [None]:
def plot_learning_curves(hist_dict, title):
    """Overlay per-epoch validation accuracy for several models on one figure.

    Args:
        hist_dict: Maps a model key to a history dict that contains a
            ``"val_accuracy"`` sequence.
        title: Figure title.
    """
    plt.figure(figsize=(9, 4))
    for model_key in hist_dict:
        val_curve = hist_dict[model_key]["val_accuracy"]
        plt.plot(val_curve, label=f"{model_key.upper()} val_acc")
    plt.title(title)
    plt.xlabel("epoch")
    plt.ylabel("val_acc")
    plt.legend()
    plt.tight_layout()
    plt.show()

# Histories include only RNN/LSTM/GRU; add the MLP curve from its Keras History object.
# hist_mlp is referenced directly, so running this cell before the MLP cell
# fails with a clear NameError rather than an AttributeError on None.
hist_dict = histories.copy()
hist_dict["mlp"] = {"val_accuracy": [float(x) for x in hist_mlp.history.get("val_accuracy", [])]}
plot_learning_curves(hist_dict, "Validation Accuracy per Epoch")

# Params vs accuracy: one labelled point per model, parameter count on a log axis.
plt.figure(figsize=(7,4))
plt.scatter(res_df["params"], res_df["test_acc"])
for row in res_df.itertuples(index=False):
    plt.text(row.params, row.test_acc, row.model, fontsize=9, ha="left", va="bottom")
plt.xscale("log")
plt.xlabel("Parameter count (log scale)")
plt.ylabel("Test accuracy")
plt.title("Capacity vs. Accuracy")
plt.tight_layout(); plt.show()


## Confusion matrices & per-class metrics

Compute predictions from the best-performing model and visualize the confusion matrix and class-wise precision/recall/F1.


In [None]:
# Choose the best model by test_acc
# (res_df is sorted by test_acc in descending order, so row 0 is the top scorer).
# NOTE(review): picking the winner on the test set makes the test metrics
# reported below optimistic; consider selecting on validation accuracy instead.
best_name = res_df.iloc[0]["model"]
print("Best model:", best_name)

def get_model_by_name(name):
    """Return ``(model, input_mapper)`` for a model name used in ``res_df``.

    ``input_mapper`` converts standardized windows into the input layout the
    model expects: flattened for the MLP, unchanged for recurrent models.

    For recurrent models, an instance stored in a module-level
    ``trained_models`` dict (keyed by lower-case kind) is reused when that
    dict exists; otherwise a fresh model is built and trained. A re-trained
    model will generally not reproduce the scores already recorded in
    ``res_df``.

    Raises:
        ValueError: If ``name`` is not ``"MLP"``, ``"RNN"``, ``"LSTM"`` or
            ``"GRU"`` (recurrent names are matched case-insensitively).
    """
    if name == "MLP":
        return mlp, lambda X: X.reshape((X.shape[0], -1))

    k = name.lower()
    # Validate before doing any work; previously this check sat after training.
    if k not in ("rnn", "lstm", "gru"):
        raise ValueError("Unknown model name")

    cached = globals().get("trained_models", {}).get(k)
    if cached is not None:
        return cached, lambda X: X

    # No stored instance available: train one with the same recipe as the main loop.
    model = build_recurrent(k, time_steps, channels, n_classes, units=64)
    cb = [keras.callbacks.EarlyStopping(patience=5, restore_best_weights=True, monitor="val_accuracy")]
    _ = model.fit(X_tr, Y_tr, validation_data=(X_val, Y_val), epochs=30, batch_size=128, verbose=0, callbacks=cb)
    return model, lambda X: X

best_model, X_mapper = get_model_by_name(best_name)
Xte_in = X_mapper(X_te)
Y_pred = best_model.predict(Xte_in, batch_size=256, verbose=0)
y_pred = Y_pred.argmax(axis=1)

# Pin the label set explicitly so the report rows and the matrix always have
# n_classes entries, even if some class is absent from both y_test and y_pred.
class_ids = list(range(n_classes))
class_names = [activity_map[i] for i in class_ids]

print(classification_report(y_test, y_pred, labels=class_ids, target_names=class_names))

# Rows are true classes, columns are predicted classes.
cm = confusion_matrix(y_test, y_pred, labels=class_ids)
fig, ax = plt.subplots(figsize=(6,5))
im = ax.imshow(cm, interpolation='nearest')
ax.set_title(f"Confusion Matrix — {best_name}")
ax.set_xlabel("Predicted"); ax.set_ylabel("True")
ax.set_xticks(class_ids); ax.set_yticks(class_ids)
ax.set_xticklabels(class_names, rotation=45, ha='right')
ax.set_yticklabels(class_names)
# Write the raw count into every cell.
for i in class_ids:
    for j in class_ids:
        ax.text(j, i, cm[i, j], ha="center", va="center", fontsize=8)
plt.tight_layout(); plt.show()

## Metrics beyond accuracy: Show they tell a consistent story

In [None]:
# Get a trained model instance by name, reusing an already-fitted one when possible.
def _get_model_by_name(name:str):
    """Return ``(model, mapper, retrained)`` for ``name`` (case-insensitive).

    ``mapper`` converts standardized windows into the model's input layout:
    flattened for the MLP, unchanged for recurrent models. ``retrained`` is
    True only when a new model had to be fitted inside this call.

    For recurrent models, an instance stored in a module-level
    ``trained_models`` dict (keyed by lower-case kind) is reused when that
    dict exists; otherwise a fresh model is built and trained, which will
    generally not reproduce scores recorded earlier. Unknown names are
    rejected by ``build_recurrent`` with ``ValueError``.
    """
    if name.upper() == "MLP":
        # uses flattened inputs
        def mapper(X): return X.reshape((X.shape[0], -1))
        return mlp, mapper, False
    # recurrent models use (time, channels)
    k = name.lower()
    cached = globals().get("trained_models", {}).get(k)
    if cached is not None:
        return cached, (lambda X: X), False
    model = build_recurrent(k, time_steps, channels, n_classes, units=64)
    cb = [keras.callbacks.EarlyStopping(patience=5, restore_best_weights=True, monitor="val_accuracy")]
    _ = model.fit(X_tr, Y_tr, validation_data=(X_val, Y_val),
                  epochs=30, batch_size=128, verbose=0, callbacks=cb)
    return model, (lambda X: X), True

def evaluate_model(model, X, y_true):
    """Score ``model`` on ``(X, y_true)`` and return a dict of metrics.

    Hard-label metrics use the argmax of the predicted probabilities;
    ``log_loss`` uses the probabilities themselves.

    Args:
        model: Object whose ``predict(X, batch_size=..., verbose=...)``
            returns one row of class probabilities per sample.
        X: Inputs already in the layout the model expects.
        y_true: Integer class ids; column ``k`` of the prediction is taken
            to be class ``k``.

    Returns:
        Dict with accuracy, balanced accuracy, macro/weighted precision,
        recall and F1, Cohen's kappa, MCC and log loss.
    """
    # Probabilities and labels
    prob = model.predict(X, batch_size=256, verbose=0)
    y_pred = prob.argmax(axis=1)
    # Tell log_loss which class each probability column belongs to; without
    # this it fails when y_true happens to lack one of the classes.
    class_ids = np.arange(prob.shape[1])

    # Metrics (zero_division=0 everywhere so undefined per-class scores count as 0)
    out = {
        "accuracy": accuracy_score(y_true, y_pred),
        "balanced_accuracy": balanced_accuracy_score(y_true, y_pred),
        "precision_macro": precision_score(y_true, y_pred, average="macro", zero_division=0),
        "precision_weighted": precision_score(y_true, y_pred, average="weighted", zero_division=0),
        "recall_macro": recall_score(y_true, y_pred, average="macro", zero_division=0),
        "recall_weighted": recall_score(y_true, y_pred, average="weighted", zero_division=0),
        "f1_macro": f1_score(y_true, y_pred, average="macro", zero_division=0),
        "f1_weighted": f1_score(y_true, y_pred, average="weighted", zero_division=0),
        "cohen_kappa": cohen_kappa_score(y_true, y_pred),
        "mcc": matthews_corrcoef(y_true, y_pred),
        "log_loss": log_loss(y_true, prob, labels=class_ids)
    }
    return out

model_names = ["MLP", "RNN", "LSTM", "GRU"]
rows = []

# One row of test-set metrics per model, plus its name and parameter count.
for name in model_names:
    print(f"Scoring {name} …")
    model, mapper, retrained = _get_model_by_name(name)
    Xte_in = mapper(X_te)
    metrics = {
        **evaluate_model(model, Xte_in, y_test),
        "model": name,
        "params": model.count_params(),
    }
    rows.append(metrics)

# Index by model name and order by macro-F1 (safer under imbalance) to
# emphasize consistency with accuracy.
metrics_df = (
    pd.DataFrame(rows)
    .set_index("model")
    .sort_values("f1_macro", ascending=False)
)
display(metrics_df)

# Pairwise correlation of the score columns across models; high positive
# values mean the metrics largely agree. params and log_loss are left out.
score_only = metrics_df.drop(["params", "log_loss"], axis=1)
corr = score_only.corr()
print("\nMetric correlation matrix (excluding params & log_loss):")
display(corr)

# Plot: Accuracy vs Macro-F1, one labelled point per model.
print("Note: Accuracy vs Macro-F1 - Models should lie close to diagonal if they tell the same story.")
plt.figure(figsize=(6, 5))
plt.scatter(metrics_df["accuracy"], metrics_df["f1_macro"])
for m, acc, f1m in zip(metrics_df.index, metrics_df["accuracy"], metrics_df["f1_macro"]):
    plt.text(acc, f1m, m, fontsize=9, ha="left", va="bottom")
plt.xlabel("Accuracy")
plt.ylabel("F1 (Macro)")
plt.title("Do Accuracy and F1 (Macro) agree?")
plt.tight_layout()
plt.show()

# Capacity vs macro-F1, parameter count on a log axis (optional quick scan).
plt.figure(figsize=(7, 5))
plt.scatter(metrics_df["params"], metrics_df["f1_macro"])
for m, n_params, f1m in zip(metrics_df.index, metrics_df["params"], metrics_df["f1_macro"]):
    plt.text(n_params, f1m, f"{m}", fontsize=9, ha="left", va="bottom")
plt.xscale("log")
plt.xlabel("Parameter count (log scale)")
plt.ylabel("F1 (Macro)")
plt.title("Capacity vs F1 (Macro)")
plt.tight_layout()
plt.show()

# Quick narrative check: rank the models separately under each metric
# (higher score -> better rank; ties share the smallest rank).
rank_cols = [
    "accuracy",
    "balanced_accuracy",
    "f1_macro",
    "f1_weighted",
    "precision_macro",
    "recall_macro",
    "cohen_kappa",
    "mcc",
]
ranks = metrics_df.loc[:, rank_cols].rank(method="min", ascending=False)
print("\nModel ranks across metrics (1 = best). Lower is better:")
display(ranks)
print("\nIf ranks are similar across columns, different metrics are not telling a different story here.")

## References
- UCI Machine Learning Repository — *Human Activity Recognition Using Smartphones* (HAR).  
  https://archive.ics.uci.edu/dataset/240/human%2Bactivity%2Brecognition%2Busing%2Bsmartphones