In [39]:
import os, re, glob
from functools import partial

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from mpl_toolkits.mplot3d import Axes3D
from joblib import dump
from scipy.stats import skew, kurtosis
from sklearn.base import clone
from sklearn.decomposition import PCA
from sklearn.feature_selection import SelectKBest, mutual_info_classif
from sklearn.metrics import accuracy_score, f1_score
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler

# Root directory that holds one sub-folder per sensor modality (relative to the working directory).
DATA_ROOT = r"data"

# Short sensor key used throughout the notebook -> folder name under DATA_ROOT.
SENSOR_FOLDERS = {
    "act": "act",
    "acw": "acw",
    "pm":  "pm_1.0_1.0",
    "dc":  "dc_0.05_0.05",
}

# Matches file names ending in "<two digits>_<letters>_<digits>.csv" (case-insensitive).
# The two digits are captured as the exercise id ("ex") and the letters as the sensor tag;
# the trailing number is matched but not captured.
FNAME_RE = re.compile(r"(?P<ex>\d{2})_(?P<sensor>[a-z]+)_(?:\d+)\.csv$", re.IGNORECASE)


In [40]:
def parse_file_info(path):
    """Extract subject, exercise and sensor metadata from a recording path.

    The file name must match ``FNAME_RE`` and the name of the immediate parent
    directory must be an integer, which is used as the subject id.

    Parameters
    ----------
    path : str
        Path to a CSV recording.

    Returns
    -------
    dict or None
        ``{"subject", "exercise_id", "sensor_tag", "path"}``, or ``None`` when
        the file name does not match ``FNAME_RE`` or the parent directory name
        is not an integer.
    """
    m = FNAME_RE.search(os.path.basename(path))
    if not m:
        return None

    # A stray non-numeric folder (e.g. a backup directory) is skipped like any
    # other unparseable file instead of aborting the whole collection pass.
    subject_dir = os.path.basename(os.path.dirname(path))
    try:
        subject = int(subject_dir)
    except ValueError:
        return None

    return {
        "subject": subject,
        "exercise_id": int(m.group("ex")),
        "sensor_tag": m.group("sensor").lower(),
        "path": path,
    }

def collect_files(sensor_key):
    """List every parseable CSV recording of one sensor as a DataFrame.

    Searches ``DATA_ROOT/<sensor folder>/*/*.csv``, visits the matches in
    sorted path order and keeps those for which ``parse_file_info`` returns a
    dictionary.

    Parameters
    ----------
    sensor_key : str
        Key into ``SENSOR_FOLDERS``.

    Returns
    -------
    pandas.DataFrame
        One row per parsed file, with the fields returned by
        ``parse_file_info``. Has no rows and no columns when nothing matched.
    """
    pattern = os.path.join(DATA_ROOT, SENSOR_FOLDERS[sensor_key], "*", "*.csv")
    parsed = (parse_file_info(csv_path) for csv_path in sorted(glob.glob(pattern)))
    return pd.DataFrame([info for info in parsed if info is not None])


In [41]:
def read_act_acw_csv(path):
    """Read a header-less CSV whose first column is a timestamp followed by three numeric columns.

    Columns 1-3 are coerced to numbers; cells that cannot be parsed become NaN.
    The file is rejected when more than 1% of those cells are NaN. Otherwise
    the few rows that still contain a NaN are dropped, together with their
    timestamps, so downstream statistics are never computed over NaN.

    Parameters
    ----------
    path : str or file-like
        Anything accepted by ``pandas.read_csv``.

    Returns
    -------
    (numpy.ndarray, numpy.ndarray) or (None, None)
        Timestamps as strings with shape (T,) and values as float with shape
        (T, 3). ``(None, None)`` when the file is empty, has fewer than four
        columns, exceeds the NaN threshold or has no fully valid row.
    """
    try:
        df = pd.read_csv(path, header=None)
    except pd.errors.EmptyDataError:
        # An empty file is just another unreadable recording: let the caller skip it.
        return None, None

    if df.shape[1] < 4:
        return None, None

    M = df.iloc[:, 1:4].apply(pd.to_numeric, errors="coerce")

    if M.isna().mean().mean() > 0.01:
        return None, None

    # Below the threshold a handful of bad rows may remain (for example a stray
    # header line); a single NaN would turn every mean/std/min/max into NaN.
    valid = M.notna().all(axis=1).to_numpy()
    if not valid.any():
        return None, None

    t = df.iloc[:, 0].astype(str).to_numpy()[valid]
    return t, M.to_numpy(dtype=float)[valid]


In [42]:
def read_pm_dc_csv(path):
    """Read a header-less CSV whose first column is a timestamp followed by numeric columns.

    Every column after the first is coerced to numbers (unparseable cells
    become NaN), columns that are entirely NaN are discarded and the remaining
    gaps are filled by interpolation in both directions, then backward and
    forward fill.

    Parameters
    ----------
    path : str or file-like
        Anything accepted by ``pandas.read_csv``.

    Returns
    -------
    (numpy.ndarray, numpy.ndarray) or (None, None)
        Timestamps as strings with shape (T,) and values as float with shape
        (T, P). ``(None, None)`` when the file has fewer than three columns or
        no usable numeric column.
    """
    raw = pd.read_csv(path, header=None)
    if raw.shape[1] < 3:
        return None, None

    stamps = raw.iloc[:, 0].astype(str).to_numpy()
    values = (
        raw.iloc[:, 1:]
        .apply(pd.to_numeric, errors="coerce")
        .dropna(axis=1, how="all")
    )
    if values.shape[1] == 0:
        return None, None

    filled = values.interpolate(limit_direction="both").bfill().ffill()
    return stamps, filled.to_numpy(dtype=float)


In [43]:
def feats_act_like(M, prefix):
    """Summarise a three-axis signal as a flat dictionary of statistical features.

    The first three columns of ``M`` are treated as the x, y and z axes and a
    per-sample magnitude ``sqrt(x^2 + y^2 + z^2)`` is derived from them. For
    each of the four series the mean, standard deviation, min, max, median,
    IQR, skewness, excess kurtosis and energy (mean of squares) are computed,
    followed by the three pairwise axis correlations and the sample count.

    Parameters
    ----------
    M : numpy.ndarray
        2-D array with at least three columns, one row per sample.
    prefix : str
        Prepended to every feature name as ``"<prefix>__"``.

    Returns
    -------
    dict
        Feature name -> value. Empty when ``M`` has fewer than three columns
        or no rows, which callers treat as "skip this file".
    """
    # np.min/np.max raise on a zero-length series, so bail out the same way
    # as for a missing axis.
    if M.shape[1] < 3 or M.shape[0] == 0:
        return {}

    x, y, z = M[:, 0], M[:, 1], M[:, 2]
    mag = np.sqrt(x*x + y*y + z*z)

    def stats(v, name):
        # Skewness and kurtosis are undefined for a zero-variance series
        # (scipy yields NaN or a degenerate value depending on its version),
        # so use the same 0.0 fallback as the correlations below.
        varies = np.std(v) > 0
        return {
            f"{prefix}__{name}_mean": float(np.mean(v)),
            f"{prefix}__{name}_std":  float(np.std(v)),
            f"{prefix}__{name}_min":  float(np.min(v)),
            f"{prefix}__{name}_max":  float(np.max(v)),
            f"{prefix}__{name}_med":  float(np.median(v)),
            f"{prefix}__{name}_iqr":  float(np.percentile(v, 75) - np.percentile(v, 25)),
            f"{prefix}__{name}_skew": float(skew(v)) if len(v) > 2 and varies else 0.0,
            f"{prefix}__{name}_kurt": float(kurtosis(v, fisher=True)) if len(v) > 3 and varies else 0.0,
            f"{prefix}__{name}_energy": float(np.mean(v*v)),
        }

    def corr(a, b):
        # Pearson correlation is undefined when either series is constant.
        if np.std(a) > 0 and np.std(b) > 0:
            return float(np.corrcoef(a, b)[0, 1])
        return 0.0

    feats = {}
    feats.update(stats(x, "x"))
    feats.update(stats(y, "y"))
    feats.update(stats(z, "z"))
    feats.update(stats(mag, "mag"))

    feats[f"{prefix}__corr_xy"] = corr(x, y)
    feats[f"{prefix}__corr_xz"] = corr(x, z)
    feats[f"{prefix}__corr_yz"] = corr(y, z)

    feats[f"{prefix}__T"] = int(M.shape[0])
    return feats


In [44]:
def feats_frame_sensor(M, prefix, maxval=1.0):
    """Summarise a frame-based recording (rows = frames, columns = cells) as flat features.

    Five per-frame series are derived: mean, standard deviation, energy (mean
    of squares), the fraction of cells above ``0.05 * maxval`` and the mean
    absolute change from the previous frame. Each series, plus the flattened
    array as a whole (tag ``g``), is reduced to mean, std, min, max, median
    and IQR. The frame count ``T`` and cell count ``P`` are stored as well.

    Parameters
    ----------
    M : numpy.ndarray
        2-D array, one row per frame.
    prefix : str
        Prepended to every feature name as ``"<prefix>__"``.
    maxval : float, default 1.0
        Scale used for the activity threshold.

    Returns
    -------
    dict
        Feature name -> value; empty when ``M`` has no elements.
    """
    if M.size == 0:
        return {}

    def summarize(values, name):
        upper = np.percentile(values, 75)
        lower = np.percentile(values, 25)
        return {
            f"{prefix}__{name}_mean": float(np.mean(values)),
            f"{prefix}__{name}_std":  float(np.std(values)),
            f"{prefix}__{name}_min":  float(np.min(values)),
            f"{prefix}__{name}_max":  float(np.max(values)),
            f"{prefix}__{name}_med":  float(np.median(values)),
            f"{prefix}__{name}_iqr":  float(upper - lower),
        }

    n_frames = M.shape[0]
    active_threshold = 0.05 * maxval

    # With a single frame there is no frame-to-frame change; use one zero so
    # the "frame_diff" features still exist.
    if n_frames > 1:
        change = np.abs(np.diff(M, axis=0)).mean(axis=1)
    else:
        change = np.array([0.0])

    # Order matters: it fixes the column order of the resulting feature table.
    series = (
        ("g", M.ravel()),
        ("frame_mean", M.mean(axis=1)),
        ("frame_std", M.std(axis=1)),
        ("frame_energy", (M*M).mean(axis=1)),
        ("frame_active", (M > active_threshold).mean(axis=1)),
        ("frame_diff", change),
    )

    feats = {
        f"{prefix}__T": int(n_frames),
        f"{prefix}__P": int(M.shape[1]),
    }
    for name, values in series:
        feats.update(summarize(values, name))
    return feats


In [45]:
def extract_features_for_file(path, sensor_key):
    """Read one recording and compute the feature dictionary for its sensor type.

    "act"/"acw" files go through ``read_act_acw_csv`` and ``feats_act_like``;
    "pm"/"dc" files go through ``read_pm_dc_csv`` and ``feats_frame_sensor``
    (with ``maxval=1.0``).

    Parameters
    ----------
    path : str
        Path to the CSV recording.
    sensor_key : str
        One of "act", "acw", "pm", "dc".

    Returns
    -------
    dict or None
        The feature dictionary, or ``None`` when the sensor key is unknown or
        the reader rejected the file.
    """
    is_three_axis = sensor_key in ("act", "acw")
    if not is_three_axis and sensor_key not in ("pm", "dc"):
        return None

    reader = read_act_acw_csv if is_three_axis else read_pm_dc_csv
    _, values = reader(path)
    if values is None:
        return None

    if is_three_axis:
        return feats_act_like(values, prefix=sensor_key)
    return feats_frame_sensor(values, prefix=sensor_key, maxval=1.0)


def build_feature_table(sensor_key):
    """Build the per-recording feature table for one sensor.

    Every file returned by ``collect_files`` is passed to
    ``extract_features_for_file``; files that yield ``None`` or an empty
    dictionary are counted as skipped. A one-line summary (rows, skipped,
    total files) is printed, preceded by a warning when no row was produced.

    Parameters
    ----------
    sensor_key : str
        Sensor key understood by ``collect_files``.

    Returns
    -------
    pandas.DataFrame
        Feature columns followed by ``subject``, ``exercise_id`` and ``path``.
        When nothing could be extracted, an empty frame with only those three
        columns.
    """
    meta = collect_files(sensor_key)
    rows, skipped = [], 0

    for rec in meta.itertuples(index=False):
        feats = extract_features_for_file(rec.path, sensor_key)
        if not feats:  # None or {} -> nothing usable in this file
            skipped += 1
            continue

        # Features first, identifying columns last.
        rows.append({
            **feats,
            "subject": int(rec.subject),
            "exercise_id": int(rec.exercise_id),
            "path": rec.path,
        })

    if rows:
        df = pd.DataFrame(rows)
    else:
        print(f"[{sensor_key}] WARNING: 0 rows. Check DATA_ROOT and file parsing.")
        df = pd.DataFrame(columns=["subject","exercise_id","path"])

    print(f"[{sensor_key}] rows={len(df)} skipped={skipped} total_files={len(meta)}")
    return df


In [46]:
# Build one feature table per sensor modality (one row per recording that could be parsed).
# NOTE(review): the saved output of this cell ends in a KeyboardInterrupt after the "pm"
# summary line, so the "dc" table was not completed in that run; consider caching the
# tables on disk so a full re-run stays feasible.
act_df = build_feature_table("act")
acw_df = build_feature_table("acw")
pm_df  = build_feature_table("pm")
dc_df  = build_feature_table("dc")
print(act_df.shape, acw_df.shape, pm_df.shape, dc_df.shape)


[act] rows=239 skipped=0 total_files=239
[acw] rows=239 skipped=0 total_files=239
[pm] rows=239 skipped=0 total_files=239


KeyboardInterrupt: 

In [None]:
# Inspect the feature table built for the "act" sensor.
act_df

In [None]:
# Inspect the feature table built for the "dc" sensor.
dc_df

In [None]:
# Inspect the feature table built for the "acw" sensor.
acw_df

In [None]:
# Inspect the feature table built for the "pm" sensor.
pm_df

In [None]:
def merge_modalities_left(base_df, other_dfs: dict):
    """Left-join the feature columns of several sensor tables onto a base table.

    Rows are matched on ``(subject, exercise_id)``. That key is not guaranteed
    to be unique: the file-name pattern discards the trailing recording number,
    so a subject with two recordings of the same exercise has two rows with the
    same key in every table. A plain merge would pair each of those rows with
    every same-key row of the other table (a Cartesian product). To avoid that,
    rows sharing a key are numbered in order of appearance and that number is
    used as an extra join key, so the n-th repeat in the base table is paired
    with the n-th repeat in every other table. With unique keys the result is
    identical to a plain left merge.

    Parameters
    ----------
    base_df : pandas.DataFrame
        Left-hand table; its rows and their order define the result.
    other_dfs : dict
        Name -> DataFrame (or ``None``). ``None`` or empty tables are skipped
        with a printed notice.

    Returns
    -------
    pandas.DataFrame
        ``subject``, ``exercise_id``, the base feature columns, then the
        feature columns of each merged table. ``path`` columns are dropped.
        Rows without a partner in a table get NaN for that table's features.
    """
    keys = ["subject", "exercise_id"]
    occurrence = "__dup_rank__"  # temporary helper key, removed before returning
    join_keys = keys + [occurrence]

    def prepare(df):
        """Keep keys + features and number repeated keys 0, 1, 2, ... in row order."""
        feat_cols = [c for c in df.columns if c not in keys + ["path"]]
        out = df[keys + feat_cols].copy()
        out[occurrence] = out.groupby(keys).cumcount()
        return out

    merged = prepare(base_df)

    for name, df in other_dfs.items():
        if df is None or df.shape[0] == 0:
            print(f"[merge] {name} empty -> skipped")
            continue

        merged = merged.merge(prepare(df), on=join_keys, how="left")

    return merged.drop(columns=[occurrence])

# Full feature set (ALL sensors): ACT + ACW + PM + DC
# ACT is the base (left-hand) table: the result has the ACT rows, and a recording with no
# matching (subject, exercise_id) row in another modality gets NaN for that modality's features.
df_all = merge_modalities_left(act_df, {"acw": acw_df, "pm": pm_df, "dc": dc_df})
print("ALL sensors shape:", df_all.shape)


In [None]:
import os
# Create the output folder if it does not exist yet (no error when it already does).
os.makedirs("outputs", exist_ok=True)

# Save raw (feature-engineered) table for reproducibility
# (index=False: the positional row index is not written to the file).
df_all.to_csv("outputs/mex_features_all_raw.csv", index=False)


In [None]:
# Inspect the merged multi-sensor feature table.
df_all

In [None]:
# Column dtypes and non-null counts, followed by the number of rows per exercise class.
# (A bare `df_all` placed first in the cell was never rendered, because only the last
# expression of a cell is displayed, so it has been removed.)
df_all.info()
df_all['exercise_id'].value_counts()

In [None]:
# Work on a copy so df_all remains the raw merged table.
df_all_pre = df_all.copy()
num_cols = df_all_pre.select_dtypes(include=[np.number]).columns

# Turn +/-inf into NaN, then replace every NaN by the median of its column.
# NOTE(review): the medians are taken over the whole table, i.e. before the
# train/test split performed further down, so test rows influence the imputation.
df_all_pre[num_cols] = (
    df_all_pre[num_cols]
    .replace([np.inf, -np.inf], np.nan)
    .pipe(lambda frame: frame.fillna(frame.median(numeric_only=True)))
)

df_all_pre.to_csv("outputs/mex_features_all_preprocessed.csv", index=False)

print(
    "Saved:",
    " - outputs/mex_features_all_raw.csv",
    " - outputs/mex_features_all_preprocessed.csv",
    sep="\n",
)


In [None]:
# Inspect the preprocessed (inf/NaN-free) feature table.
df_all_pre

In [None]:
# Feature matrix: every numeric column except the label, the subject id and the file path.
X = (
    df_all_pre
    .drop(columns=["exercise_id","subject","path"], errors="ignore")
    .select_dtypes(include=[np.number])
)
# Class label per row.
y = df_all_pre["exercise_id"].to_numpy()
# Subject id per row, kept for subject-wise splitting.
groups = df_all_pre["subject"].to_numpy()

In [None]:
from mpl_toolkits.mplot3d import Axes3D  # noqa: F401

def _pca_embed(X, y, n_components, sample, random_state):
    """Optionally subsample, standardize and project the data onto its first principal components.

    ``X`` and ``y`` are converted to NumPy arrays first, so indexing is always
    positional: a DataFrame, a plain array, or a pandas Series of labels with
    an arbitrary index all behave the same.

    Returns the projected points ``Z`` with shape (n_points, n_components) and
    the matching labels.
    """
    X_arr = np.asarray(X, dtype=float)
    y_arr = np.asarray(y)

    if sample is not None and X_arr.shape[0] > sample:
        rng = np.random.default_rng(random_state)
        idx = rng.choice(X_arr.shape[0], size=sample, replace=False)
        X_arr, y_arr = X_arr[idx], y_arr[idx]

    Xs = StandardScaler().fit_transform(X_arr)
    Z = PCA(n_components=n_components, random_state=random_state).fit_transform(Xs)
    return Z, y_arr


def plot_pca_2d(X, y, title="PCA 2D projekcija", sample=5000, random_state=0):
    """Scatter-plot the first two principal components, one colour per class.

    Parameters
    ----------
    X : array-like of shape (n_samples, n_features)
        Numeric feature matrix (DataFrame or array).
    y : array-like of shape (n_samples,)
        Class labels.
    title : str
        Figure title.
    sample : int or None
        When set and ``X`` has more rows, a random subset of this size is
        plotted instead of all rows.
    random_state : int
        Seed for the subsampling and for PCA.
    """
    Z, yp = _pca_embed(X, y, 2, sample, random_state)

    plt.figure(figsize=(7,5))
    for lab in sorted(np.unique(yp)):
        m = (yp == lab)
        plt.scatter(Z[m,0], Z[m,1], s=12, alpha=0.7, label=str(lab))
    plt.title(title)
    plt.xlabel("PC1"); plt.ylabel("PC2")
    plt.legend(bbox_to_anchor=(1.05, 1), loc="upper left")
    plt.tight_layout()
    plt.show()

def plot_pca_3d(X, y, title="PCA 3D projekcija", sample=5000, random_state=0):
    """Scatter-plot the first three principal components in 3-D, one colour per class.

    Parameters are the same as for ``plot_pca_2d``.
    """
    Z, yp = _pca_embed(X, y, 3, sample, random_state)

    fig = plt.figure(figsize=(7,5))
    ax = fig.add_subplot(111, projection='3d')
    for lab in sorted(np.unique(yp)):
        m = (yp == lab)
        ax.scatter(Z[m,0], Z[m,1], Z[m,2], s=12, alpha=0.7, label=str(lab))

    ax.set_title(title)
    ax.set_xlabel("PC1"); ax.set_ylabel("PC2"); ax.set_zlabel("PC3")
    ax.legend(bbox_to_anchor=(1.25, 1), loc="upper left")
    plt.tight_layout()
    plt.show()


In [None]:
print("X:", X.shape, "| broj klasa:", len(np.unique(y)))

# Required by the assignment: a visual (2D/3D) view of the data -> we use a PCA projection.
plot_pca_2d(X, y, title="MEx: PCA 2D projekcija (svi atributi)")
plot_pca_3d(X, y, title="MEx: PCA 3D projekcija (svi atributi)")


In [None]:
# 80/20 random split, stratified so each exercise class keeps its proportion in both parts.
# NOTE(review): `groups` (subject ids) is computed earlier but not used here, so recordings
# of the same subject can land in both train and test. If the goal is generalisation to
# unseen subjects, a subject-wise split would be needed -- confirm the intended protocol.
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, random_state=42, stratify=y
)
print("Train:", X_train.shape, "Test:", X_test.shape)

In [None]:
# Standardize the features. The scaler is fitted on the training part only and then
# applied unchanged to the test part.
scaler = StandardScaler()
X_train_sc = scaler.fit_transform(X_train)
X_test_sc  = scaler.transform(X_test)

In [None]:
# Full PCA on the scaled training data, used only to plot the cumulative explained variance.
pca_all = PCA(random_state=0).fit(X_train_sc)
cum_var = np.cumsum(pca_all.explained_variance_ratio_)

plt.figure(figsize=(6,4))
plt.plot(cum_var)
# Reference lines at the two variance levels used for the reduced feature sets below.
plt.axhline(0.90, linestyle="--")
plt.axhline(0.95, linestyle="--")
plt.xlabel("Broj komponenti")
plt.ylabel("Kumulativna očuvana varijansa")
plt.title("PCA - izbor dimenzionalnosti preko varijanse (train)")
plt.grid(True)
plt.tight_layout()
plt.show()

# A float n_components asks PCA for as many components as needed to reach that share of
# explained variance. Both projections are fitted on train and applied to test.
pca90 = PCA(n_components=0.90, random_state=0)
X_train_pca90 = pca90.fit_transform(X_train_sc)
X_test_pca90  = pca90.transform(X_test_sc)

pca95 = PCA(n_components=0.95, random_state=0)
X_train_pca95 = pca95.fit_transform(X_train_sc)
X_test_pca95  = pca95.transform(X_test_sc)

print("PCA90 dim:", X_train_pca90.shape[1], "| PCA95 dim:", X_train_pca95.shape[1])

# Keep the (at most) 50 features with the highest mutual information with the label.
# mutual_info_classif is randomized, so random_state is pinned to make the selected
# columns reproducible across runs. functools.partial is used instead of a lambda so
# the fitted selector can still be pickled by joblib.
kbest50 = SelectKBest(
    partial(mutual_info_classif, random_state=0),
    k=min(50, X_train_sc.shape[1]),
)
X_train_kb50 = kbest50.fit_transform(X_train_sc, y_train)
X_test_kb50  = kbest50.transform(X_test_sc)

print("Shapes:",
      "full", X_train_sc.shape,
      "| pca90", X_train_pca90.shape,
      "| pca95", X_train_pca95.shape,
      "| kbest50", X_train_kb50.shape)


In [None]:
from sklearn.linear_model import LogisticRegression
from sklearn.neighbors import KNeighborsClassifier
from sklearn.tree import DecisionTreeClassifier
from sklearn.ensemble import RandomForestClassifier
from sklearn.svm import SVC
from sklearn.naive_bayes import GaussianNB

# Display name -> unfitted estimator. Every classifier is evaluated on every feature variant.
# Estimators that accept a seed here get a fixed random_state for repeatable results.
MODELS = {
    "Logistic Regression": LogisticRegression(max_iter=3000, C=0.01),
    "K-Nearest Neighbors": KNeighborsClassifier(),
    "Decision Tree": DecisionTreeClassifier(random_state=42),
    "Random Forest": RandomForestClassifier(n_estimators=200,max_depth=8,min_samples_leaf=5,random_state=42),
    "SVM (RBF)": SVC(kernel="rbf"),
    "Naive Bayes": GaussianNB(),
}

In [None]:
from sklearn.metrics import f1_score, confusion_matrix
from sklearn.metrics import ConfusionMatrixDisplay
from IPython.display import display

# Display name -> (train matrix, test matrix) for each feature representation.
# All four share the same rows, so y_train / y_test apply to every variant.
variants = {
    "Full (svi atributi)": (X_train_sc, X_test_sc),
    "PCA90 (90% varijanse)": (X_train_pca90, X_test_pca90),
    "PCA95 (95% varijanse)": (X_train_pca95, X_test_pca95),
    "KBest50 (mutual_info)": (X_train_kb50, X_test_kb50),
}


def eval_model(name, model, Xtr, Xte, ytr, yte, variant_name="", show_cm=True):
    """Fit ``model`` on the training data and score it on both train and test.

    The estimator is fitted in place. When ``show_cm`` is true, the accuracies
    are printed and one confusion matrix is drawn for the training set and one
    for the test set.

    Parameters
    ----------
    name : str
        Display name of the algorithm (used in the result and in plot titles).
    model : estimator
        Object exposing ``fit`` and ``predict``.
    Xtr, Xte : array-like
        Training and test feature matrices.
    ytr, yte : array-like
        Training and test labels.
    variant_name : str
        Name of the feature variant, shown in the plot titles.
    show_cm : bool
        Whether to print the summary and plot the confusion matrices.

    Returns
    -------
    dict
        ``Algoritam`` plus train/test accuracy and macro-F1 as floats.
    """
    model.fit(Xtr, ytr)
    pred_tr, pred_te = model.predict(Xtr), model.predict(Xte)

    out = {
        "Algoritam": name,
        "Train Accuracy": float(accuracy_score(ytr, pred_tr)),
        "Train F1-macro": float(f1_score(ytr, pred_tr, average="macro")),
        "Test Accuracy": float(accuracy_score(yte, pred_te)),
        "Test F1-macro": float(f1_score(yte, pred_te, average="macro")),
    }

    if not show_cm:
        return out

    print(f"\n--- {name} ---")
    print("Train acc:", round(out["Train Accuracy"],4), "| Test acc:", round(out["Test Accuracy"],4))

    # One confusion matrix per split, training set first.
    panels = (
        (f"Train {variant_name} | {name}", ytr, pred_tr),
        (f"Test {variant_name} | {name}", yte, pred_te),
    )
    for title, truth, pred in panels:
        # Prefer the label order learned by the estimator; otherwise fall back
        # to the labels that occur in this split.
        labels = getattr(model, "classes_", None)
        if labels is None:
            labels = np.unique(np.concatenate([truth, pred]))
        cm = confusion_matrix(truth, pred, labels=labels)
        disp = ConfusionMatrixDisplay(confusion_matrix=cm, display_labels=labels)
        disp.plot(cmap="viridis", values_format="d", colorbar=True)
        plt.title(title)
        plt.xlabel("Predicted label")
        plt.ylabel("True label")
        plt.tight_layout()
        plt.show()

    return out

# Evaluate every classifier on every feature variant and collect one result row per pair.
all_results = []

for variant_name, (Xtr, Xte) in variants.items():
    print("\n===============================")
    print("VARIJANTA:", variant_name)
    print("===============================")

    for mname, model in MODELS.items():
        # eval_model fits `model` in place; the same estimator object is refitted for each variant.
        metrics = eval_model(mname, model, Xtr, Xte, y_train, y_test, variant_name=variant_name, show_cm=True)
        metrics["Varijanta"] = variant_name
        all_results.append(metrics)

res_df = pd.DataFrame(all_results)
res_df["Test Accuracy"] = res_df["Test Accuracy"].round(4)
res_df["Test F1-macro"] = res_df["Test F1-macro"].round(4)
res_df["Train Accuracy"] = res_df["Train Accuracy"].round(4)
res_df["Train F1-macro"] = res_df["Train F1-macro"].round(4)

# Show the full comparison table, best model first within each variant. A bare expression
# is only rendered when it is the last statement of a cell, so display() is needed here.
display(res_df.sort_values(["Varijanta", "Test F1-macro", "Test Accuracy"], ascending=[True, False, False]))

# NOTE(review): the winner is picked by its test-set score, so that score is an optimistic
# estimate for the selected model; a validation split or cross-validation would avoid this.
best_overall = (res_df
    .sort_values(["Test F1-macro","Test Accuracy"], ascending=[False, False])
    .head(1)
)

display(best_overall[["Varijanta","Algoritam","Test F1-macro","Test Accuracy","Train F1-macro","Train Accuracy"]])

In [None]:
# Best model per variant (ranked by Test F1-macro, then Test Accuracy).
# Rows are sorted so the best model comes first inside each variant; taking the first
# row of every "Varijanta" group then yields one winner per variant.
best_per_variant = (res_df
    .sort_values(["Varijanta", "Test F1-macro", "Test Accuracy"], ascending=[True, False, False])
    .groupby("Varijanta", as_index=False)
    .first()
)

best_per_variant[["Varijanta","Algoritam","Test F1-macro","Test Accuracy","Train F1-macro","Train Accuracy"]]


In [None]:
from sklearn.model_selection import StratifiedKFold,GridSearchCV

# Hyper-parameter grid for the SVM: every combination of C, kernel and gamma is tried.
# NOTE(review): gamma is presumably ignored by the linear kernel, which would make the
# "linear" combinations be fitted twice with identical results -- confirm against the SVC docs.
param_grid = {
    "C": [0.1, 1, 10, 50],
    "kernel": ["rbf", "linear"],
    "gamma": ["scale", "auto"],
}

# 5-fold stratified cross-validation on the training data, shuffled with a fixed seed.
cv = StratifiedKFold(n_splits=5, shuffle=True, random_state=42)

grid = GridSearchCV(
    estimator=SVC(),
    param_grid=param_grid,
    scoring="f1_macro",
    cv=cv,
    n_jobs=-1
)

# The search runs on the PCA95 representation of the training set only.
grid.fit(X_train_pca95, y_train)

print("Najbolji parametri:", grid.best_params_)
print("Najbolji CV F1-macro:", round(grid.best_score_, 4))

best_svm = grid.best_estimator_

# eval_model fits best_svm again on the same training data before scoring it on train and test.
_ = eval_model("SVM (GridSearch best) @ PCA95", best_svm, X_train_pca95, X_test_pca95, y_train, y_test, show_cm=True)


In [None]:
import joblib, os
from sklearn.base import clone

# Persist the result tables, the fitted preprocessing objects and the winning models.
os.makedirs("outputs", exist_ok=True)

res_df.to_csv("outputs/model_results_all.csv", index=False)
best_per_variant.to_csv("outputs/best_per_variant.csv", index=False)
best_overall.to_csv("outputs/best_overall.csv", index=False)

# Fitted transformers, needed to reproduce each feature variant on new data.
joblib.dump(scaler, "outputs/scaler.joblib")
joblib.dump(pca90, "outputs/pca90.joblib")
joblib.dump(pca95, "outputs/pca95.joblib")
joblib.dump(kbest50, "outputs/kbest50_selector.joblib")

best_row = best_overall.iloc[0]
best_variant = best_row["Varijanta"]
best_alg = best_row["Algoritam"]

# Refit a fresh, unfitted copy of the winning estimator on its own feature variant:
# the objects in MODELS were refitted for every variant during evaluation.
Xtr_best, Xte_best = variants[best_variant]
best_model = clone(MODELS[best_alg])
best_model.fit(Xtr_best, y_train)
# The variant and algorithm display names (including spaces and parentheses) become part of the file name.
joblib.dump(best_model, f"outputs/best_model_overall__{best_variant}__{best_alg}.joblib")

# Same procedure for the best model of each variant.
for _, row in best_per_variant.iterrows():
    v = row["Varijanta"]
    a = row["Algoritam"]
    Xtr_v, Xte_v = variants[v]
    m = clone(MODELS[a])
    m.fit(Xtr_v, y_train)
    joblib.dump(m, f"outputs/best_model__{v}__{a}.joblib")

print("Sačuvano u outputs/: rezultati + scaler/PCA/KBest + najbolji modeli.")