<a href="https://colab.research.google.com/github/DanielCaignet/Prop/blob/main/VP.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [3]:
"""
Python translation of ProyectoFinal_TEC_Daniel_Caignet.m

This script loads the vehicles dataset, performs preprocessing,
trains a 1D CNN using cross validation and evaluates the final model.
It follows the logic of the original MATLAB code.

Note: The vehicles_.mat file must contain a variable ``vehicles`` which is an
array of structs with fields ``high`` and ``power``.
"""

import numpy as np
from scipy.io import loadmat
from scipy.interpolate import interp1d
import pywt
import torch
from torch import nn
from torch.utils.data import DataLoader, TensorDataset
from sklearn.model_selection import KFold
from sklearn.metrics import confusion_matrix, precision_recall_fscore_support


# ---------------------------------------------------------------------------
# Utility functions
# ---------------------------------------------------------------------------

def map_values(x, in_min, in_max, out_min, out_max):
    """Replicates the map function from the MATLAB code."""
    x_map = (x - in_min) * ((out_max - out_min) / (in_max - in_min)) + out_min
    return np.clip(x_map, out_min, out_max)


def inter2mean(x, n_mean):
    """Interpolate a 1‑D sequence to ``n_mean`` points using cubic splines."""
    n = len(x)
    x_old = np.linspace(1, n, n)
    x_new = np.linspace(1, n, n_mean)
    f = interp1d(x_old, x, kind="cubic")
    return f(x_new)


# ---------------------------------------------------------------------------
# Load data and split train/test
# ---------------------------------------------------------------------------

def load_data(path="vehicles_.mat", test_ratio=0.2, seed=42):
    data = loadmat(path)
    vehicles = data["vehicles"].squeeze()

    n = vehicles.size
    rng = np.random.default_rng(seed)
    idx = rng.permutation(n)
    n_test = int(round(test_ratio * n))
    idx_test = idx[:n_test]
    idx_train = idx[n_test:]

    v_train = vehicles[idx_train]
    v_test = vehicles[idx_test]

    return v_train, v_test, idx_train, idx_test


# ---------------------------------------------------------------------------
# Preprocessing
# ---------------------------------------------------------------------------

def prepare_data(v_train, v_test):
    # Labels
    h_train = np.array([v["high"].item() for v in v_train])
    h_test = np.array([v["high"].item() for v in v_test])

    # Discretize into 3 classes
    edges = np.linspace(-1, 1, 4)
    h_min = h_train.min()
    h_max = h_train.max()

    h_tn = map_values(h_train, h_min, h_max, 0.1, 1)
    y_log = np.log10(h_tn)
    y_min = y_log.min()
    y_max = y_log.max()
    y_norm_train = map_values(y_log, y_min, y_max, -1, 1)
    clases_tr = np.digitize(y_norm_train, edges, right=True)

    h_tn2 = map_values(h_test, h_min, h_max, 0.1, 1)
    y_log2 = np.log10(h_tn2)
    y_norm_test = map_values(y_log2, y_min, y_max, -1, 1)
    clases_te = np.digitize(y_norm_test, edges, right=True)

    y_train = clases_tr.astype(int) - 1  # classes 0..2
    y_test = clases_te.astype(int) - 1

    # Interpolate series to same length
    n_tr = len(v_train)
    lengths = [len(v["power"].squeeze()) for v in v_train]
    T = round(np.mean(lengths) / 2)

    pmat_train = np.zeros((n_tr, T))
    for i, v in enumerate(v_train):
        pmat_train[i] = inter2mean(v["power"].squeeze(), T)

    n_ts = len(v_test)
    pmat_test = np.zeros((n_ts, T))
    for i, v in enumerate(v_test):
        pmat_test[i] = inter2mean(v["power"].squeeze(), T)
    # MODWT + global normalization
    wname = "db1"
    n_levels = 4
    n_feat = n_levels + 1
    pad_len = (-T) % (2**n_levels)

    wmat_train = np.zeros((n_tr, n_feat, T))
    for i in range(n_tr):
        if hasattr(pywt, 'modwt'):
            coeffs = pywt.modwt(pmat_train[i], wname, level=n_levels)
            coeffs = [coeffs[-1]] + coeffs[-2::-1]
        else:
            pad = np.pad(pmat_train[i], (0, pad_len), mode='symmetric') if pad_len else pmat_train[i]
            swt_coeffs = pywt.swt(pad, wname, level=n_levels)
            coeffs = [swt_coeffs[-1][0][:T]] + [c[1][:T] for c in swt_coeffs[::-1]]
        wmat_train[i] = np.stack(coeffs, axis=0)

    w_min = wmat_train.min(axis=(0, 2))
    w_max = wmat_train.max(axis=(0, 2))
    w3d_train = 2 * (wmat_train - w_min) / (w_max - w_min) - 1

    x_train_seq = [w3d_train[i].T for i in range(n_tr)]

    wmat_test = np.zeros((n_ts, n_feat, T))
    for i in range(n_ts):
        if hasattr(pywt, 'modwt'):
            coeffs = pywt.modwt(pmat_test[i], wname, level=n_levels)
            coeffs = [coeffs[-1]] + coeffs[-2::-1]
        else:
            pad = np.pad(pmat_test[i], (0, pad_len), mode='symmetric') if pad_len else pmat_test[i]
            swt_coeffs = pywt.swt(pad, wname, level=n_levels)
            coeffs = [swt_coeffs[-1][0][:T]] + [c[1][:T] for c in swt_coeffs[::-1]]
        wmat_test[i] = np.stack(coeffs, axis=0)

    w3d_test = 2 * (wmat_test - w_min) / (w_max - w_min) - 1
    x_test_seq = [w3d_test[i].T for i in range(n_ts)]

    return x_train_seq, y_train, x_test_seq, y_test, T, n_feat


# ---------------------------------------------------------------------------
# Build model
# ---------------------------------------------------------------------------

class CNN1D(nn.Module):
    """Simple 1D CNN matching the original architecture."""

    def __init__(self, n_feat):
        super().__init__()
        self.net = nn.Sequential(
            nn.Conv1d(n_feat, 32, kernel_size=5, padding=2),
            nn.BatchNorm1d(32),
            nn.ReLU(),
            nn.Dropout(0.3),
            nn.MaxPool1d(2),

            nn.Conv1d(32, 128, kernel_size=5, padding=2),
            nn.BatchNorm1d(128),
            nn.ReLU(),
            nn.Dropout(0.3),

            nn.AdaptiveAvgPool1d(1),
            nn.Flatten(),
            nn.Linear(128, 128),
            nn.Dropout(0.3),
            nn.ReLU(),
            nn.Linear(128, 3)
        )

    def forward(self, x):
        return self.net(x)


def build_model(n_feat):
    """Instantiate the CNN model."""
    return CNN1D(n_feat)


# ---------------------------------------------------------------------------
# Training with K-Fold cross validation
# ---------------------------------------------------------------------------

def train_with_kfold(x_train_seq, y_train, T, n_feat, k=5, epochs=30, batch_size=16):
    """Train the CNN using k-fold cross validation."""
    y_train = np.array(y_train)
    tbl = np.bincount(y_train)
    weight = torch.tensor([tbl.sum() / tbl[i] for i in range(len(tbl))], dtype=torch.float32)

    kf = KFold(n_splits=k, shuffle=True, random_state=42)

    acc_folds = []
    prec_folds = []
    rec_folds = []
    f1_folds = []

    for train_index, val_index in kf.split(x_train_seq):
        x_tr = torch.tensor([x_train_seq[i].T for i in train_index], dtype=torch.float32)
        y_tr = torch.tensor(y_train[train_index], dtype=torch.long)
        x_val = torch.tensor([x_train_seq[i].T for i in val_index], dtype=torch.float32)
        y_val = torch.tensor(y_train[val_index], dtype=torch.long)

        train_loader = DataLoader(TensorDataset(x_tr, y_tr), batch_size=batch_size, shuffle=True)
        val_loader = DataLoader(TensorDataset(x_val, y_val), batch_size=batch_size)

        model = build_model(n_feat)
        criterion = nn.CrossEntropyLoss(weight=weight)
        optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)

        for _ in range(epochs):
            model.train()
            for xb, yb in train_loader:
                optimizer.zero_grad()
                out = model(xb)
                loss = criterion(out, yb)
                loss.backward()
                optimizer.step()

        model.eval()
        preds = []
        with torch.no_grad():
            for xb, _ in val_loader:
                preds.append(model(xb).argmax(dim=1).cpu().numpy())
        y_pred = np.concatenate(preds)
        y_true = y_val.numpy()

        prec, rec, f1, _ = precision_recall_fscore_support(
            y_true, y_pred, average="macro", zero_division=0)
        acc = (y_pred == y_true).mean()

        acc_folds.append(acc)
        prec_folds.append(prec)
        rec_folds.append(rec)
        f1_folds.append(f1)

        print(f"Fold {len(acc_folds)} — Acc: {acc*100:.2f}%, "
              f"Prec: {prec:.3f}, Rec: {rec:.3f}, F1: {f1:.3f}")

    print(f"CV mean Acc: {np.mean(acc_folds)*100:.2f}% ± {np.std(acc_folds)*100:.2f}%")
    print(f"CV mean Prec: {np.mean(prec_folds):.3f} ± {np.std(prec_folds):.3f}")
    print(f"CV mean Rec:  {np.mean(rec_folds):.3f} ± {np.std(rec_folds):.3f}")
    print(f"CV mean F1:   {np.mean(f1_folds):.3f} ± {np.std(f1_folds):.3f}")

    # Train final model on all data
    x_all = torch.tensor([x.T for x in x_train_seq], dtype=torch.float32)
    y_all = torch.tensor(y_train, dtype=torch.long)
    loader = DataLoader(TensorDataset(x_all, y_all), batch_size=batch_size, shuffle=True)

    model = build_model(n_feat)
    criterion = nn.CrossEntropyLoss(weight=weight)
    optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)

    for _ in range(epochs):
        model.train()
        for xb, yb in loader:
            optimizer.zero_grad()
            out = model(xb)
            loss = criterion(out, yb)
            loss.backward()
            optimizer.step()

    return model


# ---------------------------------------------------------------------------
# Evaluation
# ---------------------------------------------------------------------------

def evaluate(model, x_test_seq, y_test, batch_size=16):
    """Evaluate the trained model on the test set."""
    x_ts = torch.tensor([x.T for x in x_test_seq], dtype=torch.float32)
    y_ts = torch.tensor(y_test, dtype=torch.long)
    loader = DataLoader(TensorDataset(x_ts, y_ts), batch_size=batch_size)

    model.eval()
    preds = []
    with torch.no_grad():
        for xb, _ in loader:
            preds.append(model(xb).argmax(dim=1).cpu().numpy())
    y_pred = np.concatenate(preds)

    cm = confusion_matrix(y_test, y_pred)
    prec, rec, f1, _ = precision_recall_fscore_support(
        y_test, y_pred, average="macro", zero_division=0)
    acc = (y_pred == y_test).mean()
    print(f"Test — Acc: {acc*100:.2f}%, Prec: {prec:.3f}, Rec: {rec:.3f}, F1: {f1:.3f}")
    print(cm)
    idx = np.random.choice(len(y_test), 3, replace=False)
    print(" Índice original | Altura real | Clase Real | Clase Predicha")
    for i in idx:
        print(f" {i:14d}  | {y_test[i]:11d} | {y_test[i]:9d} | {y_pred[i]:9d}")


# ---------------------------------------------------------------------------
# Main entry point
# ---------------------------------------------------------------------------

v_train, v_test, idx_train, idx_test = load_data()
x_train_seq, y_train, x_test_seq, y_test, T, n_feat = prepare_data(v_train, v_test)
model = train_with_kfold(x_train_seq, y_train, T, n_feat)
evaluate(model, x_test_seq, y_test)


ValueError: operands could not be broadcast together with shapes (423,5,1084) (5,) 