In [54]:
import os, glob, csv
from typing import List, Optional, Tuple

import numpy as np
import torch

from CNN_model.model_generation import GeneratedModel as CNN_GeneratedModel
from NN_model.model_generation import GeneratedModel as NN_GeneratedModel
from MLP_model.model_generation import GeneratedModel as MLP_GeneratedModel
from utils.data_processing import (
    get_feature_list, get_dataset, combine_arrays, split_combined_data
)

def _to_tensor(x: np.ndarray) -> torch.Tensor:
    return torch.tensor(x, dtype=torch.float32)

def _batch_iter(X: torch.Tensor, y: torch.Tensor, batch_size: int):
    n = X.shape[0]
    for i in range(0, n, batch_size):
        yield X[i:i+batch_size], y[i:i+batch_size]

def _load_ckpt(path: str):
    ckpt = torch.load(path, map_location="cpu")
    for k in ["state_dict", "arch_config", "input_size", "output_size"]:
        if k not in ckpt:
            raise ValueError(f"{os.path.basename(path)} missing key '{k}'")
    return ckpt

def _load_val_xy(collections: List[str], db_name: str, feature_names: Optional[List[str]] = None):
    feats = feature_names if feature_names is not None else get_feature_list(db_name)
    arrays = [get_dataset(c, db_name, feats) for c in collections]
    combo = combine_arrays(arrays)
    X, y = split_combined_data(combo, feats)
    return X, y, feats

In [55]:
def evaluate_model_on_collections(
    model_path: str,
    val_collections: List[str],
    db_name: str,
    batch_size: int = 8192,
    device: Optional[str] = None,
) -> Tuple[float, float, float, int]:
    """Returns (mae, mse, rmse, n_samples)."""
    ckpt = _load_ckpt(model_path)
    arch = ckpt["arch_config"]
    input_size = int(ckpt["input_size"])
    output_size = int(ckpt["output_size"])
    feat_names = ckpt.get("feature_names", None)

    X_val_np, y_val_np, feats_used = _load_val_xy(val_collections, db_name, feature_names=feat_names)

    if X_val_np.shape[1] != input_size:
        raise RuntimeError(
            f"[{os.path.basename(model_path)}] Feature width mismatch: "
            f"X has {X_val_np.shape[1]}, checkpoint expects {input_size}. "
            f"Use a matching 'db_name' or save 'feature_names' in the checkpoint."
        )
    
    if "CNN" in model_path:
        model = CNN_GeneratedModel(input_size=input_size, output_size=output_size, architecture_config=arch)
    elif "MLP" in model_path:
        model = MLP_GeneratedModel(input_size=input_size, output_size=output_size, architecture_config=arch)
    elif "NN" in model_path:
        model = NN_GeneratedModel(input_size=input_size, output_size=output_size, architecture_config=arch)
    else:
        raise Exception
    missing, unexpected = model.load_state_dict(ckpt["state_dict"], strict=False)
    if missing or unexpected:
        print(f"[WARN] Partial state load for {os.path.basename(model_path)}. "
              f"missing={missing[:4]} unexpected={unexpected[:4]}")

    device = device or ("cuda" if torch.cuda.is_available() else "cpu")
    model.to(device)
    model.eval()

    X_val = _to_tensor(X_val_np).to(device)
    y_val = _to_tensor(y_val_np).to(device)

    mae_sum = 0.0
    mse_sum = 0.0
    count = 0

    with torch.no_grad():
        for xb, yb in _batch_iter(X_val, y_val, batch_size):
            preds = model(xb)
            batch_mae = torch.mean(torch.abs(preds - yb))
            batch_mse = torch.mean((preds - yb) ** 2)
            mae_sum += batch_mae.item() * xb.shape[0]
            mse_sum += batch_mse.item() * xb.shape[0]
            count += xb.shape[0]

    mae = mae_sum / max(count, 1)
    mse = mse_sum / max(count, 1)
    rmse = float(np.sqrt(mse))
    return mae, mse, rmse, count

In [56]:
def discover_models(models_dir: Optional[str], models_list: Optional[List[str]]) -> List[str]:
    paths = []
    if models_list:
        for p in models_list:
            if os.path.isfile(p) and p.endswith(".pt"):
                paths.append(p)
            else:
                print(f"[SKIP] Not a .pt file: {p}")
    if models_dir:
        for p in glob.glob(os.path.join(models_dir, "*.pt")):
            if os.path.isfile(p):
                paths.append(p)
    paths = sorted(list(dict.fromkeys(paths)))
    return paths

In [57]:
MODELS_DIR = "retrained_models"
BATCH_SIZE = 2048
models = []

models = discover_models(MODELS_DIR, models)

rows = []
for mp in models:

    model_path = mp.split("/")[1].replace(".pt","").split("_")
    relative_coeficient = 32

    model_type = model_path.pop(0)
    space = model_path.pop(0)
    database = "_".join(model_path)
    cols = [f"reto_grande_{space}"]

    if database == "wifi_fingerprinting_data_raw":
        relative_coeficient = 1

    mae, mse, rmse, n = evaluate_model_on_collections(
        mp, cols, database, BATCH_SIZE
    )
    mae_final = mae * relative_coeficient
    mse_final = mse * relative_coeficient
    rmse_final = rmse * relative_coeficient
    print(f"{os.path.basename(mp)} | N={n} | MAE={mae_final:.6f} | MSE={mse_final:.6f} | RMSE={rmse_final:.6f}")
    rows.append((os.path.basename(mp), mae_final, mse_final, rmse_final))



CNN_garage_wifi_fingerprinting_data.pt | N=23160 | MAE=2.713941 | MSE=0.498745 | RMSE=3.994977
CNN_garage_wifi_fingerprinting_data_exponential.pt | N=23160 | MAE=4.068295 | MSE=0.897750 | RMSE=5.359850
CNN_garage_wifi_fingerprinting_data_extra_features_no_leak.pt | N=22621 | MAE=11.367534 | MSE=5.889815 | RMSE=13.728587
CNN_garage_wifi_fingerprinting_data_raw.pt | N=23160 | MAE=0.930759 | MSE=1.790319 | RMSE=1.338028
CNN_indoor_wifi_fingerprinting_data.pt | N=12243 | MAE=2.775443 | MSE=0.551603 | RMSE=4.201345
CNN_indoor_wifi_fingerprinting_data_exponential.pt | N=12243 | MAE=1.868392 | MSE=0.233014 | RMSE=2.730652
CNN_indoor_wifi_fingerprinting_data_extra_features_no_leak.pt | N=12243 | MAE=0.055096 | MSE=0.004409 | RMSE=0.375630
CNN_indoor_wifi_fingerprinting_data_raw.pt | N=12243 | MAE=0.430145 | MSE=0.468834 | RMSE=0.684714
CNN_outdoor_wifi_fingerprinting_data.pt | N=19910 | MAE=3.040233 | MSE=0.597947 | RMSE=4.374276
CNN_outdoor_wifi_fingerprinting_data_exponential.pt | N=19910 | 

In [58]:
import pandas as pd

df = pd.DataFrame(rows, columns=["model", "mae", "mse", "rmse"])

styled = df.style.background_gradient(cmap="RdYlGn_r")  # _r reverses so green=low, red=high
styled

Unnamed: 0,model,mae,mse,rmse
0,CNN_garage_wifi_fingerprinting_data.pt,2.713941,0.498745,3.994977
1,CNN_garage_wifi_fingerprinting_data_exponential.pt,4.068295,0.89775,5.35985
2,CNN_garage_wifi_fingerprinting_data_extra_features_no_leak.pt,11.367534,5.889815,13.728587
3,CNN_garage_wifi_fingerprinting_data_raw.pt,0.930759,1.790319,1.338028
4,CNN_indoor_wifi_fingerprinting_data.pt,2.775443,0.551603,4.201345
5,CNN_indoor_wifi_fingerprinting_data_exponential.pt,1.868392,0.233014,2.730652
6,CNN_indoor_wifi_fingerprinting_data_extra_features_no_leak.pt,0.055096,0.004409,0.37563
7,CNN_indoor_wifi_fingerprinting_data_raw.pt,0.430145,0.468834,0.684714
8,CNN_outdoor_wifi_fingerprinting_data.pt,3.040233,0.597947,4.374276
9,CNN_outdoor_wifi_fingerprinting_data_exponential.pt,4.299556,1.055789,5.812509
