In [None]:
# CELL 1 — IMPORTS, LOCATION
import os, time, json
import pandas as pd
import numpy as np
from pathlib import Path

from sklearn.model_selection import train_test_split
from sklearn.preprocessing import OneHotEncoder
from sklearn.utils.class_weight import compute_class_weight
from sklearn.metrics import confusion_matrix, classification_report, accuracy_score, f1_score, recall_score
import matplotlib.pyplot as plt

import tensorflow as tf
from tensorflow.keras.callbacks import EarlyStopping, ReduceLROnPlateau, ModelCheckpoint
from tensorflow.keras import layers, models, optimizers, Sequential

from tensorflow.keras.layers import Dense, Dropout
from tensorflow.keras.optimizers import Adam

import time, io
from datetime import datetime

from google.colab import drive

# Mount Google Drive; every path below lives under the mounted drive.
drive.mount('/content/drive', force_remount=True)

# Per-subject labelled recordings (one sub-folder per subject). The held-out
# LOSO test windows are built from these folders.
data_base = Path(
    "/content/drive/MyDrive/Colab Notebooks/classifier_2026_locomotion_mode/step1_labelled")
data_base.mkdir(parents=True, exist_ok=True)

# Trained models, TFLite files, confusion matrices and summaries are written here.
OUTPUT_ROOT = Path(
    "/content/drive/MyDrive/Colab Notebooks/classifier_2026_locomotion_mode/step3_models")
OUTPUT_ROOT.mkdir(parents=True, exist_ok=True)

# Combined TRAIN / VAL CSVs, organised as W{W}_S{S}/LOSO_{subject}/.
DATASET_ROOT = Path(
    "/content/drive/MyDrive/Colab Notebooks/classifier_2026_locomotion_mode/step2_combined")
DATASET_ROOT.mkdir(parents=True, exist_ok=True)

# Only real directories count as subjects: a stray file whose name happens to
# start with "sub" must not help satisfy the LOSO sanity check below.
subject_folders = sorted(
    entry.name
    for entry in data_base.iterdir()
    if entry.is_dir() and entry.name.lower().startswith("sub")
)
assert len(subject_folders) > 1, "Need at least 2 subjects for LOSO"
print("Subjects found:", subject_folders)

print('\nCell 1 done at', datetime.now())  # timestamp shows when this cell was last executed

In [None]:
# CELL 2 — Data loading (TRAIN + VAL only), tensor building, augmentation, class weights

# CSV → (N, T, F) ----------------
def csv_to_tensor(df, window_size):
    """Convert a long-format segment table into an (N, T, F) tensor.

    Parameters
    ----------
    df : pandas.DataFrame
        One row per timestep. Must contain ``segment_id``, ``t`` and ``label``
        columns; every column whose name starts with ``"f"`` is used as a feature.
    window_size : int
        Required number of rows (T) per segment. Segments with any other
        length are skipped.

    Returns
    -------
    X : np.ndarray, float32, shape (N, window_size, F)
        Feature windows, rows ordered by ``t`` inside each segment. The array
        keeps this 3-D shape even when no segment qualifies (N == 0), so
        callers can always read ``X.shape[2]``.
    y : np.ndarray, shape (N,)
        Label of the first row (by ``t``) of each kept segment.
    """
    feature_cols = [c for c in df.columns if c.startswith("f")]
    windows, labels = [], []
    n_skipped = 0

    for _, seg_df in df.groupby("segment_id"):
        if len(seg_df) != window_size:
            n_skipped += 1  # incomplete / over-long segment: cannot be stacked
            continue
        seg_df = seg_df.sort_values("t")
        windows.append(seg_df[feature_cols].values)  # (T, F)
        labels.append(seg_df["label"].iloc[0])       # scalar

    if n_skipped:
        # Make silent data loss visible instead of dropping segments quietly.
        print(f"csv_to_tensor: skipped {n_skipped} segment(s) whose length != {window_size}")

    if windows:
        X = np.asarray(windows, dtype=np.float32)
    else:
        # np.asarray([]) would be shape (0,); keep the rank callers expect.
        X = np.empty((0, window_size, len(feature_cols)), dtype=np.float32)
    y = np.asarray(labels)
    return X, y

def augment_raw(X, y_ohe, scale_range=(0.75, 1.25), offset_range=(-0.25, 0.25),
                noise_std=0.05, rng=None):
    """Double a dataset by appending one randomly perturbed copy of every sample.

    Each sample gets its own per-sensor scale and offset (constant over time)
    plus element-wise Gaussian noise; the result is clipped to [0, 1].

    Parameters
    ----------
    X : np.ndarray, shape (N, T, S)
        Input windows (T timesteps, S sensors/features). Values are clipped to
        [0, 1] after perturbation, so inputs are expected to be in that range.
    y_ohe : np.ndarray, shape (N, ...)
        Labels aligned with ``X``; duplicated unchanged for the augmented copy.
    scale_range, offset_range : (float, float)
        Bounds of the uniform distributions for the multiplicative scale and
        the additive offset.
    noise_std : float
        Standard deviation of the additive Gaussian noise.
    rng : optional
        Object exposing ``uniform`` and ``normal`` (e.g. ``np.random.default_rng(seed)``).
        Defaults to the global ``np.random`` state, which keeps the random
        stream identical to the previous implementation.

    Returns
    -------
    X_out : np.ndarray, shape (2N, T, S) -- originals followed by augmented copies.
    y_out : np.ndarray, shape (2N, ...)  -- ``y_ohe`` repeated twice.
    """
    rng = np.random if rng is None else rng

    X_aug = X.copy()
    _, T, S = X.shape

    for i in range(len(X_aug)):
        # Draw order (scale, offset, noise) is kept fixed for reproducibility.
        scales = rng.uniform(scale_range[0], scale_range[1], size=(1, S))
        offsets = rng.uniform(offset_range[0], offset_range[1], size=(1, S))
        noise = rng.normal(0.0, noise_std, size=(T, S))

        # (1, S) scale/offset broadcast over the time axis.
        X_aug[i] = np.clip(X_aug[i] * scales + offsets + noise, 0.0, 1.0)

    X_out = np.concatenate([X, X_aug], axis=0)
    y_out = np.concatenate([y_ohe, y_ohe], axis=0)
    return X_out, y_out

def load_train_val_tensors(dataset_root, tag, window_size, num_classes,
                           use_class_weights=False):
    """Load the TRAIN and VAL CSVs of one LOSO fold and turn them into tensors.

    Parameters
    ----------
    dataset_root : pathlib.Path
        Folder containing ``all_combined_train_before_shuffle_{tag}.csv`` and
        ``all_combined_val_{tag}.csv``.
    tag : str
        Fold identifier embedded in the CSV file names.
    window_size : int
        Timesteps per segment (forwarded to ``csv_to_tensor``).
    num_classes : int
        Width of the one-hot label encoding.
    use_class_weights : bool, default False
        When True, return "balanced" class weights computed on the augmented
        training labels. The default (False) returns ``None``, which matches
        the previous behaviour where the computed weights were always discarded.

    Returns
    -------
    X_train : np.ndarray      -- training windows, doubled by ``augment_raw``.
    y_train_oh : np.ndarray   -- one-hot training labels (same doubling).
    X_val : np.ndarray        -- validation windows (not augmented).
    y_val_oh : np.ndarray     -- one-hot validation labels.
    class_weights : dict or None
    """
    # The "before_shuffle" export is used; an "after_shuffle" variant of the
    # train file was previously tried here.
    train_path = dataset_root / f"all_combined_train_before_shuffle_{tag}.csv"
    val_path   = dataset_root / f"all_combined_val_{tag}.csv"

    print(f"\nLoading TRAIN: {train_path.name}")
    print(f"Loading VAL  : {val_path.name}")

    train_df = pd.read_csv(train_path)
    val_df   = pd.read_csv(val_path)

    X_train, y_train = csv_to_tensor(train_df, window_size)
    X_val,   y_val   = csv_to_tensor(val_df, window_size)

    y_train_oh = tf.keras.utils.to_categorical(y_train, num_classes)
    y_val_oh   = tf.keras.utils.to_categorical(y_val, num_classes)

    # Augment the training split only; validation data stays untouched.
    X_train, y_train_oh = augment_raw(X_train, y_train_oh)

    # Class weights are optional. Skipping the computation when disabled also
    # avoids compute_class_weight raising for a class that is absent from the
    # training labels even though the result would be thrown away.
    class_weights = None
    if use_class_weights:
        y_train_aug = np.argmax(y_train_oh, axis=1)  # labels AFTER augmentation
        class_weights_arr = compute_class_weight(
            class_weight="balanced",
            classes=np.arange(num_classes),
            y=y_train_aug
        )
        class_weights = {i: float(class_weights_arr[i]) for i in range(num_classes)}

    print("Class weights:", class_weights)

    return X_train, y_train_oh, X_val, y_val_oh, class_weights

print('\nCell 2 done at', datetime.now())

In [None]:
# CELL 3 — Build & compile CNN model

# Models expect:
#   input_shape = (T, F)
#
#   T = window_size
#   F = number of sensor data / features, per timestep (column-wise)


def build_cnn_model(input_shape, num_classes, k1, k2):
    """Create and compile the 1-D CNN classifier.

    Architecture: Conv1D(32, k1) -> Conv1D(64, k2) -> MaxPool(2) ->
    GlobalAveragePooling -> Dense(32) -> Dropout(0.3) -> softmax(num_classes).

    Parameters
    ----------
    input_shape : tuple
        (T, F): timesteps per window and features per timestep.
    num_classes : int
        Number of softmax outputs.
    k1, k2 : int
        Kernel sizes of the first and second convolution.

    Returns
    -------
    The compiled Keras model (Adam, lr=1e-3, categorical cross-entropy).
    The model summary is printed as a side effect.
    """
    kl = tf.keras.layers  # local shorthand

    raw_input = kl.Input(shape=input_shape, name="raw_input")
    print("raw input shape ->", raw_input.shape)

    # --- convolutional feature extractor ---
    features = kl.Conv1D(
        32, kernel_size=k1, padding="same", activation="relu", name="conv1"
    )(raw_input)
    features = kl.Conv1D(
        64, kernel_size=k2, padding="same", activation="relu", name="conv2"
    )(features)
    features = kl.MaxPooling1D(pool_size=2, name="maxpool")(features)

    # --- classification head ---
    head = kl.GlobalAveragePooling1D(name="gap")(features)
    head = kl.Dense(32, activation="relu", name="dense1")(head)
    head = kl.Dropout(0.3, name="drop1")(head)
    class_probs = kl.Dense(num_classes, activation="softmax", name="output")(head)

    model = models.Model(raw_input, class_probs, name="cnn_1d_model")
    model.summary()

    adam = optimizers.Adam(learning_rate=1e-3)  # standard Adam
    tracked_metrics = [
        'accuracy',
        'mse',
        tf.keras.metrics.Precision(name="precision"),
    ]
    model.compile(
        optimizer=adam,
        loss="categorical_crossentropy",
        metrics=tracked_metrics,
    )
    return model

print('\nCell 3 done at', datetime.now())

In [None]:
# CELL 4 — Train model and save .keras
MAX_EPOCHS = 200  # default upper bound on epochs; early stopping may end training sooner


def train_and_save_model(
    model,
    X_train, y_train_oh,
    X_val, y_val_oh,
    class_weights,
    batch_size,
    model_dir,
    model_name,
    max_epochs=None,
    restore_best_weights=False,
):
    """Fit ``model`` and persist it as ``.keras`` plus an exported SavedModel.

    Parameters
    ----------
    model : compiled Keras model.
    X_train, y_train_oh : training inputs and one-hot labels.
    X_val, y_val_oh : validation inputs and one-hot labels (monitored via ``val_loss``).
    class_weights : dict or None
        Forwarded to ``model.fit(class_weight=...)``.
    batch_size : int
    model_dir : pathlib.Path or str
        Output folder; created if missing.
    model_name : str
        Stem of the ``.keras`` file.
    max_epochs : int, optional
        Epoch budget. Defaults to the module-level ``MAX_EPOCHS``.
    restore_best_weights : bool, default False
        Forwarded to ``EarlyStopping``. With the default, the weights of the
        LAST epoch are saved/exported, not those of the best ``val_loss`` epoch.
        NOTE(review): confirm this is intended; pass True to keep the best epoch.

    Returns
    -------
    (keras_path, saved_model_dir) : paths of the ``.keras`` file and the
    exported SavedModel directory.
    """
    if max_epochs is None:
        max_epochs = MAX_EPOCHS  # resolved at call time so later edits to the constant apply
    model_dir = Path(model_dir)

    callbacks = [
        EarlyStopping(
            monitor="val_loss",
            patience=25,  # 10-15% of MAX_EPOCHS
            min_delta=0.01,
            mode="min",
            restore_best_weights=restore_best_weights,
            verbose=1
        ),
        ReduceLROnPlateau(
            monitor="val_loss",
            factor=0.5,
            patience=13,  # ~ half of the early-stopping patience
            min_delta=0.01,
            mode="min",
            min_lr=1e-4,  # prevent too low LR, may overfit intra-subject
            verbose=1
        )
    ]

    # The returned History object is not used by this pipeline.
    model.fit(
        X_train, y_train_oh,
        validation_data=(X_val, y_val_oh),
        epochs=max_epochs,
        batch_size=batch_size,
        class_weight=class_weights,
        callbacks=callbacks,
        verbose=1
    )

    model_dir.mkdir(parents=True, exist_ok=True)
    keras_path = model_dir / f"{model_name}.keras"
    model.save(keras_path)

    # SavedModel export is what the TFLite converter consumes downstream.
    saved_model_dir = model_dir / "saved_model"
    model.export(saved_model_dir)

    print(f"Saved Keras model: {keras_path.name}")
    return keras_path, saved_model_dir

print('\nCell 4 done at', datetime.now())

In [None]:
# CELL 5 — Convert .keras to INT8 TFLite

def convert_to_int8_tflite(saved_model_dir, X_train, tflite_save_path):
    """Convert an exported SavedModel to a fully INT8-quantized TFLite file.

    Parameters
    ----------
    saved_model_dir : pathlib.Path or str
        Directory produced by ``model.export``.
    X_train : np.ndarray
        Training inputs; up to 200 randomly chosen samples are used as the
        representative dataset for quantization calibration.
    tflite_save_path : pathlib.Path or str
        Destination ``.tflite`` file; its parent folder is created if missing.

    Returns
    -------
    pathlib.Path of the written TFLite file.

    Raises
    ------
    ValueError
        If ``X_train`` is empty (calibration would have no data).
    """
    tflite_save_path = Path(tflite_save_path)  # accept str as well as Path

    # Fail before the expensive conversion rather than deep inside the converter.
    if len(X_train) == 0:
        raise ValueError(
            "X_train is empty: INT8 calibration needs at least one representative sample"
        )

    # Random subset of the train set used for int8 quantization calibration.
    calib_idx = np.random.choice(len(X_train), size=min(200, len(X_train)), replace=False)

    def representative_dataset():
        for i in calib_idx:
            # One sample per step, keeping the leading batch axis.
            yield [X_train[i:i+1].astype(np.float32)]

    converter = tf.lite.TFLiteConverter.from_saved_model(str(saved_model_dir))
    converter.optimizations = [tf.lite.Optimize.DEFAULT]
    converter.representative_dataset = representative_dataset
    # Restrict to int8 builtin ops so conversion fails rather than falling back to float.
    converter.target_spec.supported_ops = [
        tf.lite.OpsSet.TFLITE_BUILTINS_INT8,
    ]
    converter.inference_input_type = tf.int8
    converter.inference_output_type = tf.int8

    tflite_model = converter.convert()

    tflite_save_path.parent.mkdir(parents=True, exist_ok=True)
    tflite_save_path.write_bytes(tflite_model)

    print(f"Saved TFLite model: {tflite_save_path.name}")
    return tflite_save_path

print('\nCell 5 done at', datetime.now())

In [None]:
# CELL 6 — Evaluate validation set using TFLite only

def evaluate_tflite_model(tflite_path, X, y_true, num_classes=None):
    """Run a TFLite model sample-by-sample and compute classification metrics.

    Parameters
    ----------
    tflite_path : pathlib.Path or str
        Path of the ``.tflite`` file.
    X : np.ndarray
        Float inputs; the first axis indexes samples. Inputs are quantized here
        with the model's input scale / zero-point when the model input is integer.
    y_true : array-like of int
        Ground-truth class indices.
    num_classes : int, optional
        Size of the confusion matrix. Defaults to the last dimension of the
        model's output tensor, so the matrix keeps a fixed shape even when a
        class is missing from ``y_true`` and ``y_pred``.

    Returns
    -------
    dict with keys
        ``accuracy``  -- overall accuracy,
        ``f1_macro``  -- macro-averaged F1,
        ``cm_raw``    -- (num_classes, num_classes) count matrix,
        ``cm_norm``   -- row-normalized matrix; rows without true samples are all zeros.
    """
    interpreter = tf.lite.Interpreter(model_path=str(tflite_path))
    interpreter.allocate_tensors()

    input_details  = interpreter.get_input_details()[0]
    output_details = interpreter.get_output_details()[0]

    in_dtype = input_details["dtype"]
    scale, zero = input_details["quantization"]

    # Quantize only when the model really has an integer input with a valid
    # scale; a scale of 0 would otherwise cause a division by zero.
    quantize_input = bool(np.issubdtype(in_dtype, np.integer) and scale != 0)
    if quantize_input:
        q_min, q_max = np.iinfo(in_dtype).min, np.iinfo(in_dtype).max

    if num_classes is None:
        # assumes the output tensor is (1, num_classes) -- TODO confirm for other models
        num_classes = int(output_details["shape"][-1])

    y_pred = np.empty(X.shape[0], dtype=np.int64)

    for i in range(X.shape[0]):
        x = X[i:i+1]  # keep the batch axis
        if quantize_input:
            x = np.clip(np.round(x / scale + zero), q_min, q_max)

        interpreter.set_tensor(input_details["index"], x.astype(in_dtype))
        interpreter.invoke()

        out = interpreter.get_tensor(output_details["index"])
        y_pred[i] = np.argmax(out)

    # Explicit labels keep the matrix (num_classes x num_classes) for every
    # fold, so per-fold matrices can be stacked and averaged.
    cm_raw = confusion_matrix(y_true, y_pred, labels=np.arange(num_classes))

    cm_counts = cm_raw.astype(np.float32)
    row_sums = cm_counts.sum(axis=1, keepdims=True)
    # `out=` is required: with `where=` alone, rows that have no true samples
    # would be left uninitialized instead of zero.
    cm_norm = np.divide(
        cm_counts, row_sums,
        out=np.zeros_like(cm_counts),
        where=row_sums != 0,
    )

    return {
        "accuracy": accuracy_score(y_true, y_pred),
        "f1_macro": f1_score(y_true, y_pred, average="macro"),
        "cm_raw": cm_raw,
        "cm_norm": cm_norm,
    }

print('\nCell 6 done at', datetime.now())

In [None]:
# CELL 7 — Build LOSO TEST dataset (combine + segment, no shuffle)

def build_loso_test_tensor(
    subject_folder,
    window_size,
    step_distance
):
    """
    Assemble the held-out subject's test set from its per-class CSV files.

    Every ``*.csv`` in ``subject_folder`` (processed in sorted order) is cut
    into sliding windows of ``window_size`` rows, advancing ``step_distance``
    rows each time. The class label is the integer after the last ``_`` in the
    file stem. Windows are kept in file / time order (no shuffling).

    NOTE(review): all CSV columns are used as features here, whereas the
    train/val loader keeps only columns starting with "f" -- confirm that the
    raw subject CSVs contain feature columns only.

    Returns
    -------
    X_test : np.ndarray, float32 -- stacked windows.
    y_test : np.ndarray          -- one label per window.
    """
    print(f"\nBuilding LOSO test set from: {subject_folder.name}")
    print(f"Window size: {window_size}, Step distance: {step_distance}")

    windows = []
    window_labels = []

    for csv_path in sorted(subject_folder.glob("*.csv")):
        samples = pd.read_csv(csv_path).values  # (N, F)
        file_label = int(csv_path.stem.rsplit("_", 1)[-1])

        # Start indices of every complete window inside this recording.
        starts = range(0, len(samples) - window_size + 1, step_distance)
        windows.extend(samples[s:s + window_size] for s in starts)
        window_labels.extend([file_label] * len(starts))

    X_test = np.asarray(windows, dtype=np.float32)
    y_test = np.asarray(window_labels)

    print("LOSO test tensor shape:", X_test.shape)
    print("LOSO test labels shape:", y_test.shape)

    return X_test, y_test


print('\nCell 7 done at', datetime.now())

In [None]:
def plot_and_save_confusion_matrix(
    cm,
    title,
    save_path,
    cmap="Greens",
    dpi=450,
    normalized=None
):
    """
    Render a confusion matrix as an annotated heat map and save it as an image.

    Parameters
    ----------
    cm : np.ndarray
        Square confusion matrix of shape (NUM_CLASSES, NUM_CLASSES). May hold
        normalized values in [0, 1] or raw counts.
    title : str
        Figure title
    save_path : Path or str
        Where to save the PNG (parent folders are created)
    cmap : str
        Matplotlib colormap
    dpi : int
        Resolution for publication
    normalized : bool or None
        True  -> colour scale fixed to [0, 1], cells printed with 2 decimals.
        False -> colour scale [0, max count], cells printed as whole numbers.
        None (default) -> auto-detect: treated as normalized when no value
        exceeds 1. Pass False explicitly for count matrices whose entries are
        all 0 or 1.
    """

    cm = np.asarray(cm, dtype=np.float32)
    num_classes = cm.shape[0]

    # Largest finite entry; 0 for an empty or all-NaN matrix.
    has_values = cm.size > 0 and not np.isnan(cm).all()
    peak = float(np.nanmax(cm)) if has_values else 0.0

    if normalized is None:
        normalized = peak <= 1.0

    if normalized:
        vmax, cell_fmt, cbar_label = 1.0, ".2f", "Normalized value"
    else:
        # A fixed [0, 1] scale would saturate every non-zero count.
        vmax, cell_fmt, cbar_label = max(peak, 1.0), ".0f", "Count"

    fig, ax = plt.subplots(figsize=(7, 6), dpi=dpi)
    im = ax.imshow(cm, interpolation="nearest", cmap=cmap, vmin=0, vmax=vmax)

    ax.set_title(title, fontsize=14)
    ax.set_xlabel("Predicted label", fontsize=12)
    ax.set_ylabel("True label", fontsize=12)

    ax.set_xticks(np.arange(num_classes))
    ax.set_yticks(np.arange(num_classes))
    ax.set_xticklabels(np.arange(num_classes))
    ax.set_yticklabels(np.arange(num_classes))

    # Colorbar
    cbar = fig.colorbar(im, ax=ax, fraction=0.046)
    cbar.set_label(cbar_label, fontsize=11)

    # Cell annotations: white text on the dark half of the colour scale.
    thresh = vmax / 2.0
    for i in range(num_classes):
        for j in range(num_classes):
            ax.text(
                j, i,
                format(cm[i, j], cell_fmt),
                ha="center",
                va="center",
                fontsize=11,
                color="white" if cm[i, j] > thresh else "black"
            )

    fig.tight_layout()

    save_path = Path(save_path)
    save_path.parent.mkdir(parents=True, exist_ok=True)
    fig.savefig(save_path, bbox_inches="tight")
    plt.close(fig)  # release the figure; this is called many times in the sweep

    print(f"Saved confusion matrix → {save_path}")

# This cell previously printed "Cell 7 done", the same marker as the LOSO
# test-set cell, so the two could not be told apart in the output.
print('\nConfusion-matrix plotting cell done at', datetime.now())

In [None]:
# CELL 15 — PARAMETRIC SWEEP + LOSO + AGGREGATION

NUM_CLASSES = 7

# Re-applied at the start of every fold so augmentation, weight initialisation
# and calibration sampling are repeatable from run to run.
SEED = 42

# ---------------- CONFIGURATIONS ----------------

# (window_size, stride) pairs; each needs a W{W}_S{S} folder under DATASET_ROOT.
WINDOW_STRIDE_CONFIGS = [
    (100, 20),
    (100, 50),
    (50, 10),
    (50, 25),
]

# (k1, k2) kernel sizes of the two Conv1D layers.
# Currently disabled candidates: (7, 7), (7, 3), (5, 3).
KERNEL_CONFIGS = [
    (11, 7),
    (9, 5),
    (7, 5),
]

# Batch size 16 is currently disabled.
BATCH_SIZES = [
    32,
    64
]

SUBJECTS = [
    "sub1",
    "sub2",
    # "sub3", # bad data
    "sub4",
    "sub5",
    "sub6",
    "sub7",
    "sub8",
    "sub9",
    "sub10",
    "sub11",
]

print("\n========== PARAMETRIC STUDY SETUP ==========")
print("NUM_CLASSES           :", NUM_CLASSES)
print("Window–Stride configs :", WINDOW_STRIDE_CONFIGS)
print("Kernel configs        :", KERNEL_CONFIGS)
print("Batch sizes           :", BATCH_SIZES)
print("LOSO subjects         :", SUBJECTS)
print("===========================================\n")

# ---------------- MASTER LOOP ----------------
# For every (window/stride, kernels, batch size) configuration, run one
# leave-one-subject-out fold per subject, then aggregate the fold results.

for (W, S) in WINDOW_STRIDE_CONFIGS:
    for (k1, k2) in KERNEL_CONFIGS:
        for batch_size in BATCH_SIZES:
            config_start_time = time.time()
            print("\n" + "#" * 100)
            print(
                f"START CONFIG → W{W}_S{S} | kernels=({k1},{k2}) | batch={batch_size}"
            )
            print("#" * 100)

            # Store per-configuration results
            CONFIG_RESULTS   = []
            CONFIG_CMS_RAW   = []
            CONFIG_CMS_NORM  = []

            for loso_sub in SUBJECTS:

                # Drop graph/state left over from the previous fold; without
                # this, memory grows over the many models built in the sweep.
                tf.keras.backend.clear_session()
                tf.keras.utils.set_random_seed(SEED)

                tag = f"W{W}_S{S}_LOSO_{loso_sub}"

                dataset_root = DATASET_ROOT / f"W{W}_S{S}" / f"LOSO_{loso_sub}"
                if not dataset_root.exists():
                    raise FileNotFoundError(f"Dataset root does not exist: {dataset_root}")

                print("\n" + "-" * 80)
                print(f"LOSO SUBJECT → {loso_sub}")
                print(f"Dataset root → {dataset_root}")
                print("-" * 80)

                # =====================================================
                # CELL 2 — Load TRAIN + VAL tensors
                # =====================================================
                X_train, y_train_oh, X_val, y_val_oh, class_weights = \
                    load_train_val_tensors(
                        dataset_root=dataset_root,
                        tag=tag,
                        window_size=W,
                        num_classes=NUM_CLASSES
                    )

                # =====================================================
                # CELL 3 — Build model
                # =====================================================
                model = build_cnn_model(
                    input_shape=(W, X_train.shape[2]),
                    num_classes=NUM_CLASSES,
                    k1=k1,
                    k2=k2
                )

                model_name = f"cnn_model_{tag}_k{k1}-{k2}_batch{batch_size}"

                model_dir = (
                    OUTPUT_ROOT
                    / f"W{W}_S{S}"
                    / f"k{k1}-{k2}"
                    / f"batch{batch_size}"
                    / f"LOSO_{loso_sub}"
                )
                model_dir.mkdir(parents=True, exist_ok=True)

                # =====================================================
                # CELL 4 — Train + save .keras
                # =====================================================
                # Timer starts here so it measures this fold's training only
                # (not data loading, and not the previous folds).
                train_start_time = time.time()
                _, saved_model_dir = train_and_save_model(
                    model=model,
                    X_train=X_train,
                    y_train_oh=y_train_oh,
                    X_val=X_val,
                    y_val_oh=y_val_oh,
                    # Class weighting is intentionally off; pass
                    # class_weights=class_weights to enable it.
                    class_weights=None,
                    batch_size=batch_size,
                    model_dir=model_dir,
                    model_name=model_name
                )
                train_tm = time.time() - train_start_time
                eval_start_time = time.time()
                print(f"\nTime taken to train: {train_tm:.1f} secs")

                # =====================================================
                # CELL 5 — Convert to INT8 TFLite
                # =====================================================
                tflite_path = model_dir / f"{model_name}.tflite"
                convert_to_int8_tflite(saved_model_dir, X_train, tflite_path)

                # =====================================================
                # CELL 6 — Validation (TFLite only)
                # =====================================================
                val_metrics = evaluate_tflite_model(
                    tflite_path=tflite_path,
                    X=X_val,
                    y_true=np.argmax(y_val_oh, axis=1),
                )

                # =====================================================
                # CELL 7 — LOSO TEST (build + evaluate)
                # =====================================================
                subject_folder = data_base / loso_sub

                X_test, y_test = build_loso_test_tensor(
                    subject_folder=subject_folder,
                    window_size=W,
                    step_distance=S
                )

                test_metrics = evaluate_tflite_model(
                    tflite_path=tflite_path,
                    X=X_test,
                    y_true=y_test,
                )

                # Covers TFLite conversion plus validation and test evaluation.
                eval_tm = time.time() - eval_start_time
                print(f"\nTime taken to eval: {eval_tm:.1f} secs")

                # -----------------------------------------------------
                # Store per-fold results
                # -----------------------------------------------------
                print(
                    f"[VAL ] acc={val_metrics['accuracy']:.3f}, "
                    f"f1={val_metrics['f1_macro']:.3f}"
                )
                print(
                    f"[TEST] acc={test_metrics['accuracy']:.3f}, "
                    f"f1={test_metrics['f1_macro']:.3f}"
                )

                CONFIG_RESULTS.append({
                    "loso_subject": loso_sub,
                    "val_accuracy": val_metrics["accuracy"],
                    "val_f1_macro": val_metrics["f1_macro"],
                    "test_accuracy": test_metrics["accuracy"],
                    "test_f1_macro": test_metrics["f1_macro"],
                })

                CONFIG_CMS_RAW.append(test_metrics["cm_raw"])
                CONFIG_CMS_NORM.append(test_metrics["cm_norm"])

                # -----------------------------------------------------
                # Save per-fold confusion matrices
                # -----------------------------------------------------
                plot_and_save_confusion_matrix(
                    cm=test_metrics["cm_raw"],
                    title=f"{model_name} — LOSO {loso_sub} (Counts)",
                    save_path=model_dir / "cm_raw.png"
                )

                plot_and_save_confusion_matrix(
                    cm=test_metrics["cm_norm"],
                    title=f"{model_name} — LOSO {loso_sub} (Normalized)",
                    save_path=model_dir / "cm_norm.png"
                )

            # =====================================================
            # AGGREGATION — PER CONFIGURATION
            # =====================================================
            config_df = pd.DataFrame(CONFIG_RESULTS)

            mean_vals = config_df.mean(numeric_only=True)
            std_vals  = config_df.std(numeric_only=True)

            # Per-fold rows followed by a MEAN row and a STD row.
            summary_df = pd.concat(
                [
                    config_df,
                    pd.DataFrame([{
                        "loso_subject": "MEAN",
                        **mean_vals.to_dict()
                    }]),
                    pd.DataFrame([{
                        "loso_subject": "STD",
                        **std_vals.to_dict()
                    }]),
                ],
                ignore_index=True
            )

            # Save summary
            summary_dir = (
                OUTPUT_ROOT
                / f"W{W}_S{S}"
                / f"k{k1}-{k2}"
                / f"batch{batch_size}"
            )
            summary_dir.mkdir(parents=True, exist_ok=True)

            summary_path = summary_dir / "summary_results.xlsx"
            summary_df.to_excel(summary_path, index=False)

            print("\nSaved summary →", summary_path)

            # =====================================================
            # Aggregated confusion matrices across folds
            # =====================================================
            # Counts are SUMMED over folds; normalized matrices are AVERAGED.
            total_cm_raw = np.sum(np.stack(CONFIG_CMS_RAW, axis=0), axis=0)
            avg_cm_norm  = np.mean(np.stack(CONFIG_CMS_NORM, axis=0), axis=0)

            # File name kept as avg_cm_raw.png so existing output layouts still match.
            plot_and_save_confusion_matrix(
                cm=total_cm_raw,
                title=f"Total Confusion Matrix (Counts)\nW{W}_S{S}, k{k1}-{k2}, batch{batch_size}",
                save_path=summary_dir / "avg_cm_raw.png"
            )

            plot_and_save_confusion_matrix(
                cm=avg_cm_norm,
                title=f"Avg Confusion Matrix (Normalized)\nW{W}_S{S}, k{k1}-{k2}, batch{batch_size}",
                save_path=summary_dir / "avg_cm_norm.png"
            )

            print(
                f"Completed CONFIG → "
                f"W{W}_S{S} | k{k1}-{k2} | batch{batch_size}"
            )
            print(f"Config wall time: {time.time() - config_start_time:.1f} secs")


print('\nCell 15 done at', datetime.now())