In [7]:
import os
import glob
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns

from sklearn.neighbors import KNeighborsClassifier
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import GroupKFold, GridSearchCV
from sklearn.metrics import (
    classification_report,
    confusion_matrix,
    precision_score,
    recall_score,
    f1_score
)

# Root folder that is scanned for one sub-directory per subject.
DATA_DIR = "../DatasetGFT"

# Human-readable activity names. They are used as tick labels / report rows
# in the same order as ACTIVITY_LABELS, so position i names class id i.
ACTIVITY_NAMES = [
    "Rest",
    "Fold clothes",
    "Sweep",
    "Walk",
    "Move boxes",
    "Bike",
]
ACTIVITY_LABELS = list(range(len(ACTIVITY_NAMES)))

# Windowing, expressed in samples.
SAMPLING_RATE = 100  # presumably samples per second -- TODO confirm against the recordings
WINDOW_SIZE = SAMPLING_RATE  # one window spans SAMPLING_RATE samples
STEP_SIZE = WINDOW_SIZE // 2  # hop of half a window, i.e. consecutive windows overlap by 50 %

# Seed the global NumPy RNG once so that np.random.* calls are repeatable
# from a fresh kernel.
RANDOM_STATE = 42
np.random.seed(RANDOM_STATE)

# Column names of the six channels in the sensor frame ...
RAW_FEATURES = ["ax", "ay", "az", "gx", "gy", "gz"]
# ... and the same six channels carrying the "_enu" suffix.
GFT_FEATURES = [f"{channel}_enu" for channel in RAW_FEATURES]

In [9]:
def load_gft_csvs(base_dir):
    """Load one ``IMU_GFT_*.csv`` file per subject directory under ``base_dir``.

    Every sub-directory of ``base_dir`` is treated as a subject and keyed by
    its directory name. Entries that are not directories, and directories
    without a matching CSV, are skipped.

    Parameters
    ----------
    base_dir : str
        Directory whose sub-directories hold the per-subject CSV files.

    Returns
    -------
    dict
        Maps subject directory name to the ``pandas.DataFrame`` read from that
        subject's CSV. Keys are inserted in sorted directory-name order.

    Notes
    -----
    When a subject directory holds several matching files, the
    lexicographically first one is used and a warning line is printed.
    """
    subject_dfs = {}

    for subject_id in sorted(os.listdir(base_dir)):
        subject_path = os.path.join(base_dir, subject_id)
        if not os.path.isdir(subject_path):
            continue

        # glob.glob yields matches in arbitrary (filesystem) order; sorting
        # makes the "first" file a deterministic choice. The directory part is
        # escaped so that glob metacharacters in a folder name are literal.
        pattern = os.path.join(glob.escape(subject_path), "IMU_GFT_*.csv")
        csv_files = sorted(glob.glob(pattern))

        if not csv_files:
            continue

        if len(csv_files) > 1:
            print(
                f"Warning: {subject_id} has {len(csv_files)} matching files; "
                f"using {os.path.basename(csv_files[0])}"
            )

        subject_dfs[subject_id] = pd.read_csv(csv_files[0])

    print(f"Loaded {len(subject_dfs)} subjects")
    return subject_dfs

In [10]:
def split_subjects(subject_dfs, train_ratio=0.8, random_state=None):
    """Split subjects (not individual samples) into train and test sets.

    The subject ids are shuffled and the first ``int(n * train_ratio)`` of
    them form the training set; the remainder form the test set. Because the
    split is made on whole subjects, no subject contributes data to both sets.

    Parameters
    ----------
    subject_dfs : dict
        Maps subject id to that subject's data. The dict is not modified.
    train_ratio : float, default 0.8
        Fraction of subjects assigned to the training set (rounded down).
    random_state : int or None, default None
        Seed for a private ``numpy.random.RandomState`` used for the shuffle,
        which makes the split independent of how much of the global RNG has
        already been consumed (e.g. when the cell is re-run). ``None`` keeps
        the previous behaviour of shuffling with the global ``np.random``
        state.

    Returns
    -------
    (list, list)
        ``(train_dfs, test_dfs)``: the values of ``subject_dfs`` for the
        training and test subjects, in shuffled order.
    """
    subjects = list(subject_dfs.keys())

    if random_state is None:
        # Legacy path: consumes (and advances) the global NumPy RNG.
        np.random.shuffle(subjects)
    else:
        # Local generator: same result on every call for a given seed.
        np.random.RandomState(random_state).shuffle(subjects)

    n_train = int(len(subjects) * train_ratio)

    train_subjects = subjects[:n_train]
    test_subjects = subjects[n_train:]

    print(f"Train subjects: {len(train_subjects)}")
    print(f"Test subjects:  {len(test_subjects)}")

    train_dfs = [subject_dfs[s] for s in train_subjects]
    test_dfs = [subject_dfs[s] for s in test_subjects]

    return train_dfs, test_dfs

In [11]:
def sliding_windows(df, window_size=None, step_size=None):
    """Yield consecutive fixed-length row slices of ``df``.

    Windows start at rows ``0, step_size, 2 * step_size, ...`` and each holds
    exactly ``window_size`` rows. Trailing rows that cannot fill a complete
    window are dropped, so a frame shorter than ``window_size`` yields nothing.

    Parameters
    ----------
    df : pandas.DataFrame
        Frame to slice positionally (``iloc``).
    window_size : int, optional
        Rows per window. Defaults to the module-level ``WINDOW_SIZE``.
    step_size : int, optional
        Rows between consecutive window starts. Defaults to the module-level
        ``STEP_SIZE``.

    Yields
    ------
    pandas.DataFrame
        ``df.iloc[start:start + window_size]`` for each window start.
    """
    # Resolve the defaults lazily so the module constants are only needed
    # when the caller does not supply explicit values.
    if window_size is None:
        window_size = WINDOW_SIZE
    if step_size is None:
        step_size = STEP_SIZE

    for start in range(0, len(df) - window_size + 1, step_size):
        yield df.iloc[start:start + window_size]

def extract_features(window, cols):
    """Compute a flat feature vector for one window.

    For every column in ``cols`` (in the given order) ten numbers are
    appended: six time-domain statistics followed by four statistics of the
    magnitude of the real FFT of that column.

    Parameters
    ----------
    window : pandas.DataFrame
        One window of samples; must contain every column named in ``cols``.
    cols : sequence of str
        Columns to summarise.

    Returns
    -------
    list
        ``10 * len(cols)`` values ordered column by column as
        mean, std, min, max, RMS, mean absolute value,
        then mean, std, max and sum of the FFT magnitude.
    """
    features = []

    for col in cols:
        signal = window[col].values

        # Time-domain summary of the raw samples.
        time_stats = [
            np.mean(signal),
            np.std(signal),
            np.min(signal),
            np.max(signal),
            np.sqrt(np.mean(signal ** 2)),  # root mean square
            np.mean(np.abs(signal)),        # mean absolute value
        ]

        # Frequency-domain summary: magnitude of the one-sided spectrum
        # (np.fft.rfft keeps every bin, including the zero-frequency one).
        spectrum = np.abs(np.fft.rfft(signal))
        freq_stats = [
            np.mean(spectrum),
            np.std(spectrum),
            np.max(spectrum),
            np.sum(spectrum),
        ]

        features += time_stats
        features += freq_stats

    return features

In [12]:
def build_dataset(dfs, feature_cols, return_groups=False):
    """Turn per-subject frames into a windowed feature matrix and label vector.

    Each frame in ``dfs`` is cut into windows with ``sliding_windows``; every
    window becomes one row of features (``extract_features``) and one label.

    The label of a window is the most frequent value of its ``"label"``
    column. Taking only the first sample would mislabel windows that straddle
    a change of activity (a window with a single leading sample of the
    previous activity would inherit that activity). Ties resolve to the
    smallest label value.

    Parameters
    ----------
    dfs : sequence of pandas.DataFrame
        One frame per subject; each must contain ``feature_cols`` and a
        ``"label"`` column.
    feature_cols : sequence of str
        Columns passed to ``extract_features``.
    return_groups : bool, default False
        When True, additionally return an array giving, for every window, the
        position in ``dfs`` of the frame it came from. This can be passed as
        ``groups`` to scikit-learn's group-aware splitters such as
        ``GroupKFold`` so that windows from one subject stay in one fold.

    Returns
    -------
    (numpy.ndarray, numpy.ndarray) or (numpy.ndarray, numpy.ndarray, numpy.ndarray)
        ``(X, y)``, or ``(X, y, groups)`` when ``return_groups`` is True.
    """
    X, y, groups = [], [], []

    for group_id, df in enumerate(dfs):
        for w in sliding_windows(df):
            X.append(extract_features(w, feature_cols))

            # Majority vote over the window. np.unique returns sorted values,
            # and argmax picks the first maximum, hence the tie rule above.
            values, counts = np.unique(w["label"].values, return_counts=True)
            y.append(values[np.argmax(counts)])

            groups.append(group_id)

    X_arr, y_arr = np.array(X), np.array(y)

    if return_groups:
        return X_arr, y_arr, np.array(groups)
    return X_arr, y_arr

In [13]:
def evaluate_knn_subject_split(
    X_train, y_train, X_test, y_test, title, groups=None
):
    """Tune a scaled k-NN classifier on the training set and score it on the test set.

    A ``StandardScaler`` + ``KNeighborsClassifier`` pipeline is tuned with
    ``GridSearchCV`` (macro-F1, 5 folds) over the number of neighbours, the
    weighting scheme and the distance metric. The best estimator then predicts
    ``X_test``; a classification report is printed, a confusion-matrix heatmap
    is shown, and macro-averaged scores are returned.

    Parameters
    ----------
    X_train, y_train : array-like
        Training features and labels used for the grid search.
    X_test, y_test : array-like
        Held-out features and labels used only for the final evaluation.
    title : str
        Label used in the printed output and as the plot title.
    groups : array-like or None, default None
        Optional group id (e.g. subject) per training row. When given, the
        grid search uses ``GroupKFold`` so that rows sharing a group never
        appear on both sides of a validation split. With ``None`` the previous
        behaviour (``cv=5``, not group-aware) is kept.

    Returns
    -------
    dict
        ``{"precision": ..., "recall": ..., "f1": ...}``, all macro-averaged
        on the test set.
    """
    # The scaler lives inside the pipeline so it is re-fitted on the training
    # part of every CV split rather than on the full training set.
    pipeline = Pipeline([
        ("scaler", StandardScaler()),
        ("knn", KNeighborsClassifier())
    ])

    param_grid = {
        "knn__n_neighbors": list(range(3, 16, 2)),
        "knn__weights": ["uniform", "distance"],
        "knn__metric": ["euclidean", "manhattan"]
    }

    # Overlapping windows of one subject are highly correlated; a group-aware
    # splitter keeps them together when the caller supplies group ids.
    cv = GroupKFold(n_splits=5) if groups is not None else 5

    grid = GridSearchCV(
        pipeline,
        param_grid,
        cv=cv,
        scoring="f1_macro",
        n_jobs=-1,
        verbose=1
    )

    grid.fit(X_train, y_train, groups=groups)

    print(f"\nBest parameters for {title}:")
    print(grid.best_params_)

    y_pred = grid.best_estimator_.predict(X_test)

    print(f"\n=== {title} ===")
    # labels= pins the report rows to the six known classes. Without it,
    # classification_report raises ValueError whenever the classes present in
    # y_test/y_pred do not number exactly len(ACTIVITY_NAMES), which can happen
    # when the held-out subjects do not cover every activity.
    print(
        classification_report(
            y_test,
            y_pred,
            labels=ACTIVITY_LABELS,
            target_names=ACTIVITY_NAMES,
            digits=4,
            zero_division=0
        )
    )

    cm = confusion_matrix(y_test, y_pred, labels=ACTIVITY_LABELS)

    fig, ax = plt.subplots(figsize=(6, 5))
    sns.heatmap(
        cm,
        annot=True,
        fmt="d",
        xticklabels=ACTIVITY_NAMES,
        yticklabels=ACTIVITY_NAMES,
        cmap="Blues",
        ax=ax
    )
    ax.set_title(title)
    ax.set_xlabel("Predicted")
    ax.set_ylabel("True")
    fig.tight_layout()
    plt.show()

    # zero_division=0 yields the same numbers as the default ("warn") but
    # without emitting a warning for classes that were never predicted.
    return {
        "precision": precision_score(
            y_test, y_pred, average="macro", zero_division=0
        ),
        "recall": recall_score(
            y_test, y_pred, average="macro", zero_division=0
        ),
        "f1": f1_score(
            y_test, y_pred, average="macro", zero_division=0
        )
    }

In [14]:
if __name__ == "__main__":
    # Re-seed right before the split. split_subjects shuffles with the global
    # NumPy RNG, so without this the held-out subjects would change every time
    # this cell is re-run in the same kernel (e.g. after an interrupted run).
    np.random.seed(RANDOM_STATE)

    subject_dfs = load_gft_csvs(DATA_DIR)
    train_dfs, test_dfs = split_subjects(subject_dfs)

    # Raw frame: features from the sensor-frame columns.
    X_train_raw, y_train_raw = build_dataset(train_dfs, RAW_FEATURES)
    X_test_raw, y_test_raw = build_dataset(test_dfs, RAW_FEATURES)

    raw_metrics = evaluate_knn_subject_split(
        X_train_raw, y_train_raw,
        X_test_raw, y_test_raw,
        "KNN – Raw IMU Frame"
    )

    # Global frame: same subjects and windows, features from the "_enu" columns.
    X_train_gft, y_train_gft = build_dataset(train_dfs, GFT_FEATURES)
    X_test_gft, y_test_gft = build_dataset(test_dfs, GFT_FEATURES)

    gft_metrics = evaluate_knn_subject_split(
        X_train_gft, y_train_gft,
        X_test_gft, y_test_gft,
        "KNN – Global Frame (ENU)"
    )

    # Side-by-side macro scores on the identical subject split.
    print("\n========== SUMMARY ==========")
    print("Raw Frame:", raw_metrics)
    print("Global Frame:", gft_metrics)

Loaded 60 subjects
Train subjects: 48
Test subjects:  12
Fitting 5 folds for each of 28 candidates, totalling 140 fits


KeyboardInterrupt: 