<a href="https://colab.research.google.com/github/RafaelTorresCH/Signal-theory/blob/main/class%20notebooks/MLP_17_OCT.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# =========================================
# 1) IMPORTS & CONFIG
# =========================================
%matplotlib inline

import os, pickle, math
import numpy as np
import matplotlib.pyplot as plt
from scipy.signal import welch, periodogram, get_window, chirp
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import TensorDataset, DataLoader

# Widgets para el "dashboard"
import ipywidgets as W
from IPython.display import display, clear_output

# ---------- Parámetros ajustables ----------
SEED           = 7
DATASET_SIZE   = 40000      # súbelo/bájalo según recursos
EPOCHS         = 100
BATCH_TRAIN    = 256
BATCH_VAL_TEST = 512
LR             = 1e-3
PATIENCE       = 10

# Señal y PSD
N       = 1024
FS      = 1024.0
NFFT    = 1024
NPERSEG = 256
NOVERLP = 128
WIN     = get_window("hann", NPERSEG)
EPS     = 1e-12

# "welch" o "periodogram"
PSD_METHOD = "welch"

# ---------- Reproducibilidad ----------
np.random.seed(SEED)
torch.manual_seed(SEED)
rng = np.random.default_rng(SEED)

device = "cuda" if torch.cuda.is_available() else "cpu"
print("Device:", device)


Device: cpu


In [None]:
# =========================================
# 2) GENERADORES & UTILS
# =========================================
t = np.arange(N) / FS

def gen_sine():
    A = rng.uniform(0.5, 2.0)
    f = rng.uniform(5, 48000)
    phi = rng.uniform(0, 2*np.pi)
    return A * np.sin(2*np.pi*f*t + phi)

def gen_square():
    A = rng.uniform(0.5, 2.0)
    f = rng.uniform(5, 200)
    phi = rng.uniform(0, 2*np.pi)
    return A * np.sign(np.sin(2*np.pi*f*t + phi))

def gen_triangle():
    A = rng.uniform(0.5, 2.0)
    f = rng.uniform(5, 200)
    phi = rng.uniform(0, 2*np.pi)
    frac = (t*f + phi/(2*np.pi)) % 1
    tri = 2*np.abs(2*frac - 1) - 1
    return A * tri

def gen_gaussian_noise():
    std = rng.uniform(0.3, 1.2)
    return rng.normal(0, std, size=N)

def gen_chirp_linear():
    f0 = rng.uniform(10, 50)
    f1 = rng.uniform(100, 220)
    return chirp(t, f0=f0, f1=f1, t1=t[-1], method="linear")

generators = [gen_sine, gen_square, gen_triangle, gen_gaussian_noise, gen_chirp_linear]


In [None]:
# =========================================
# 3) DATASET (X) + TARGETS PSD (Y)
# =========================================
per_type = max(1, DATASET_SIZE // len(generators))
X_list, y_type = [], []

for i, g in enumerate(generators):
    for _ in range(per_type):
        x = g().astype(np.float32)
        if g is not gen_gaussian_noise:
            x = x + rng.normal(0, 0.05, size=N).astype(np.float32)
        X_list.append(x); y_type.append(i)

X = np.stack(X_list, axis=0).astype(np.float32)
y_type = np.array(y_type)
print("X shape:", X.shape, "| clases:", np.bincount(y_type))

if PSD_METHOD.lower() == "welch":
    f_axis, Pxx = welch(
        X, fs=FS, window=WIN, nperseg=NPERSEG, noverlap=NOVERLP,
        nfft=NFFT, scaling="density", axis=1
    )
elif PSD_METHOD.lower() == "periodogram":
    f_axis, Pxx = periodogram(X, fs=FS, nfft=NFFT, scaling="density", axis=1)
else:
    raise ValueError("PSD_METHOD debe ser 'welch' o 'periodogram'.")

Y = np.log10(Pxx + EPS).astype(np.float32)
print("Y shape:", Y.shape, "| PSD:", PSD_METHOD)


X shape: (40000, 1024) | clases: [8000 8000 8000 8000 8000]
Y shape: (40000, 513) | PSD: welch


In [None]:
# =========================================
# 4) SPLIT + SCALER + LOADERS (60/20/20)
# =========================================
# 60% train, 40% temporal
X_train, X_tmp, Y_train, Y_tmp, y_train_type, y_tmp_type = train_test_split(
    X, Y, y_type, test_size=0.40, random_state=SEED, shuffle=True, stratify=y_type  # <-- aquí cambié 0.30 -> 0.40
)

# 40% temporal -> 20% val y 20% test (50/50)
X_val, X_test, Y_val, Y_test, y_val_type, y_test_type = train_test_split(
    X_tmp, Y_tmp, y_tmp_type, test_size=0.50, random_state=SEED, shuffle=True, stratify=y_tmp_type
)

scaler = StandardScaler().fit(X_train)  # ¡solo con TRAIN!
X_train_s = scaler.transform(X_train).astype(np.float32)
X_val_s   = scaler.transform(X_val).astype(np.float32)
X_test_s  = scaler.transform(X_test).astype(np.float32)

in_dim  = X_train_s.shape[1]
out_dim = Y_train.shape[1]

train_ds = TensorDataset(torch.from_numpy(X_train_s), torch.from_numpy(Y_train))
val_ds   = TensorDataset(torch.from_numpy(X_val_s),   torch.from_numpy(Y_val))
test_ds  = TensorDataset(torch.from_numpy(X_test_s),  torch.from_numpy(Y_test))

train_loader = DataLoader(train_ds, batch_size=BATCH_TRAIN,    shuffle=True)
val_loader   = DataLoader(val_ds,   batch_size=BATCH_VAL_TEST, shuffle=False)
test_loader  = DataLoader(test_ds,  batch_size=BATCH_VAL_TEST, shuffle=False)

print("Train/Val/Test:", len(train_ds), len(val_ds), len(test_ds))
total = len(train_ds)+len(val_ds)+len(test_ds)
print("Proporciones ->",
      f"train: {len(train_ds)/total:.2%}, val: {len(val_ds)/total:.2%}, test: {len(test_ds)/total:.2%}")


Train/Val/Test: 24000 8000 8000
Proporciones -> train: 60.00%, val: 20.00%, test: 20.00%


In [None]:
# =========================================
# 5) MLP & TRAIN
# =========================================
class MLP(nn.Module):
    def __init__(self, in_dim, out_dim):
        super().__init__()
        self.net = nn.Sequential(
            nn.Linear(in_dim, 512), nn.ReLU(),
            nn.Linear(512, 256), nn.ReLU(),
            nn.Linear(256, out_dim)
        )
    def forward(self, x): return self.net(x)

model  = MLP(in_dim, out_dim).to(device)
opt    = optim.Adam(model.parameters(), lr=LR)
lossfn = nn.MSELoss()

best_val = float("inf")
best_state = None
wait = 0
train_curve, val_curve = [], []

for epoch in range(1, EPOCHS+1):
    # --- train ---
    model.train()
    bl = []
    for xb, yb in train_loader:
        xb = xb.to(device); yb = yb.to(device)
        opt.zero_grad()
        pred = model(xb)
        loss = lossfn(pred, yb)
        loss.backward()
        opt.step()
        bl.append(loss.item())
    tr = float(np.mean(bl))

    # --- val ---
    model.eval()
    vl = []
    with torch.no_grad():
        for xb, yb in val_loader:
            xb = xb.to(device); yb = yb.to(device)
            pred = model(xb)
            vl.append(lossfn(pred, yb).item())
    va = float(np.mean(vl))

    train_curve.append(tr); val_curve.append(va)

    if va < best_val - 1e-5:
        best_val = va
        best_state = {k: v.detach().cpu().clone() for k, v in model.state_dict().items()}
        wait = 0
    else:
        wait += 1
        if wait >= PATIENCE:
            print(f"Early stopping @ epoch {epoch}")
            break

    if epoch % max(1, EPOCHS//10) == 0:
        print(f"Epoch {epoch:03d} | train {tr:.4e} | val {va:.4e}")

if best_state is not None:
    model.load_state_dict({k: v.to(device) for k, v in best_state.items()})


Epoch 010 | train 5.1364e-01 | val 1.1226e+00
Epoch 020 | train 2.7950e-01 | val 7.7675e-01
Epoch 030 | train 2.2228e-01 | val 6.3179e-01
Epoch 040 | train 1.6935e-01 | val 6.1798e-01
Epoch 050 | train 1.3688e-01 | val 5.1490e-01
Epoch 060 | train 1.1872e-01 | val 4.6922e-01
Epoch 070 | train 1.0710e-01 | val 4.3167e-01
Epoch 080 | train 9.6189e-02 | val 3.9438e-01
Epoch 090 | train 9.7734e-02 | val 3.7346e-01
Epoch 100 | train 8.9641e-02 | val 3.9096e-01


In [None]:
# =========================================
# 6) TEST PREDICTIONS & STATS
# =========================================
model.eval()
y_true_all, y_pred_all = [], []
with torch.no_grad():
    for xb, yb in test_loader:
        xb = xb.to(device)
        y_true_all.append(yb.numpy())
        y_pred_all.append(model(xb).cpu().numpy())
y_true_all = np.concatenate(y_true_all, axis=0)
y_pred_all = np.concatenate(y_pred_all, axis=0)

# MSE por muestra
mse_per_sample = np.mean((y_pred_all - y_true_all)**2, axis=1)
test_mse = float(np.mean(mse_per_sample))
p5, p50, p95 = np.percentile(mse_per_sample, [5, 50, 95])

print(f"Test MSE medio  : {test_mse:.6e}")
print(f"Test MSE mediana: {p50:.6e} | p5={p5:.6e} | p95={p95:.6e}")
print("N test samples  :", len(mse_per_sample))

best_idx  = int(np.argmin(mse_per_sample))
worst_idx = int(np.argmax(mse_per_sample))


Test MSE medio  : 3.631112e-01
Test MSE mediana: 1.651108e-01 | p5=3.857406e-02 | p95=1.387319e+00
N test samples  : 8000


In [None]:
# =========================================
# 7) PLOT HELPERS (una figura por chart)
# =========================================
def plot_learning_curves():
    plt.figure(figsize=(8,5), dpi=140)
    plt.plot(np.arange(1, len(train_curve)+1), train_curve, label="Train loss (MSE)")
    plt.plot(np.arange(1, len(val_curve)+1),   val_curve,   label="Validation loss (MSE)")
    plt.title("Learning Curves — MLP predicting log-PSD")
    plt.xlabel("Epoch"); plt.ylabel("Loss (MSE)")
    plt.legend(frameon=False); plt.grid(True, linestyle=":")
    plt.show()

def plot_time_signal(idx):
    x = X_test[idx]
    plt.figure(figsize=(8,4), dpi=140)
    plt.plot(t, x, linewidth=1.5)
    plt.title(f"Señal de prueba (tiempo) — idx={idx}")
    plt.xlabel("Tiempo [s]"); plt.ylabel("Amplitud")
    plt.grid(True, linestyle=":")
    plt.xlim([t[0], t[-1]])
    plt.show()

def _predict_idx(idx):
    x = X_test[idx][None, :]
    x_s = scaler.transform(x).astype(np.float32)
    with torch.no_grad():
        y_pred = model(torch.from_numpy(x_s).to(device)).cpu().numpy()[0]
    return y_pred, Y_test[idx]

def plot_psd(idx):
    y_pred, y_true = _predict_idx(idx)
    sample_mse = float(np.mean((y_pred - y_true)**2))
    plt.figure(figsize=(8,5), dpi=140)
    plt.plot(f_axis, y_true, label="PSD real (log10)", linewidth=1.6)
    plt.plot(f_axis, y_pred, label="PSD predicha (log10)", linewidth=1.6)
    plt.title(f"PSD ({PSD_METHOD}) — Real vs Predicha — idx={idx}")
    plt.xlabel("Frecuencia [Hz]"); plt.ylabel("log10(PSD)")
    plt.legend(frameon=False); plt.grid(True, linestyle=":")
    plt.annotate(
        f"MSE muestra: {sample_mse:.3e}\nMSE test medio: {np.mean(mse_per_sample):.3e}",
        xy=(0.02, 0.02), xycoords="axes fraction"
    )
    plt.show()

def plot_residual(idx):
    y_pred, y_true = _predict_idx(idx)
    residual = y_pred - y_true
    plt.figure(figsize=(8,5), dpi=140)
    plt.plot(f_axis, residual, linewidth=1.4)
    plt.title(f"Residuo espectral (log-PSD pred − real) — idx={idx}")
    plt.xlabel("Frecuencia [Hz]"); plt.ylabel("Error (log-PSD)")
    plt.grid(True, linestyle=":")
    plt.annotate(
        f"Media: {np.mean(residual):.2e}\nStd: {np.std(residual):.2e}",
        xy=(0.02, 0.02), xycoords="axes fraction"
    )
    plt.show()

def plot_hist_errors():
    plt.figure(figsize=(8,5), dpi=140)
    plt.hist(mse_per_sample, bins=30)
    plt.title("Distribución del error (MSE por muestra) — Test")
    plt.xlabel("MSE (log-PSD por muestra)"); plt.ylabel("Frecuencia")
    plt.grid(True, linestyle=":")
    p5, p50, p95 = np.percentile(mse_per_sample, [5, 50, 95])
    txt = f"media={np.mean(mse_per_sample):.2e}\nmediana={p50:.2e}\np5={p5:.2e}\np95={p95:.2e}"
    plt.annotate(txt, xy=(0.68, 0.68), xycoords="axes fraction")
    plt.show()


In [None]:
# =========================================
# 8) DASHBOARD
# =========================================
# --- Controles ---
mode = W.ToggleButtons(
    options=[("Aleatorio", "random"), ("Mejor (min MSE)", "best"), ("Peor (max MSE)", "worst"), ("Elegir índice", "pick")],
    description="Ejemplo:",
    disabled=False,
    button_style=""
)

idx_slider = W.IntSlider(value=0, min=0, max=len(X_test)-1, step=1, description="idx", continuous_update=False)
btn_refresh = W.Button(description="Actualizar vistas", icon="refresh")
metrics_html = W.HTML()

# --- Salidas (una por pestaña) ---
out_metrics   = W.Output()
out_learn     = W.Output()
out_time      = W.Output()
out_psd       = W.Output()
out_residual  = W.Output()
out_hist      = W.Output()

tabs = W.Tab(children=[out_metrics, out_learn, out_time, out_psd, out_residual, out_hist])
for i, title in enumerate(["Métricas", "Learning Curves", "Señal (tiempo)", "PSD vs Pred", "Residuo", "Hist. errores"]):
    tabs.set_title(i, title)

def _pick_index():
    if mode.value == "random":
        return int(np.random.randint(0, len(X_test)))
    elif mode.value == "best":
        return int(best_idx)
    elif mode.value == "worst":
        return int(worst_idx)
    else:
        return int(idx_slider.value)

def _render_all(_=None):
    idx = _pick_index()

    # Panel de métricas
    with out_metrics:
        clear_output(wait=True)
        print(f"== Resumen en TEST ==")
        print(f" MSE medio    : {np.mean(mse_per_sample):.4e}")
        print(f" MSE mediana  : {np.median(mse_per_sample):.4e}")
        print(f" p5 / p95     : {np.percentile(mse_per_sample,5):.4e} / {np.percentile(mse_per_sample,95):.4e}")
        print(f" N muestras   : {len(mse_per_sample)}")
        print(f" Mejor idx    : {best_idx} (MSE={mse_per_sample[best_idx]:.3e})")
        print(f" Peor  idx    : {worst_idx} (MSE={mse_per_sample[worst_idx]:.3e})")
        print(f" Mostrando idx: {idx}")

    # Learning curves
    with out_learn:
        clear_output(wait=True)
        plot_learning_curves()

    # Señal tiempo
    with out_time:
        clear_output(wait=True)
        plot_time_signal(idx)

    # PSD comparación
    with out_psd:
        clear_output(wait=True)
        plot_psd(idx)

    # Residuo
    with out_residual:
        clear_output(wait=True)
        plot_residual(idx)

    # Histograma
    with out_hist:
        clear_output(wait=True)
        plot_hist_errors()

btn_refresh.on_click(_render_all)

controls = W.HBox([mode, idx_slider, btn_refresh])
display(W.VBox([controls, tabs]))

# Render inicial
_render_all()


VBox(children=(HBox(children=(ToggleButtons(description='Ejemplo:', options=(('Aleatorio', 'random'), ('Mejor …

In [None]:
# ===== Real audio: Welch vs Modelo (snippet compacto) =====
# Si te falta librosa:  !pip install librosa --quiet
import numpy as np, matplotlib.pyplot as plt, ipywidgets as W
import librosa
from google.colab import files
from scipy.signal import welch, get_window

print("Sube tu audio (wav/mp3) ...")
uploaded = files.upload()
audio_path = list(uploaded.keys())[0]
audio, _ = librosa.load(audio_path, sr=int(FS), mono=True)  # remuestrea a tu FS y mono
print(f"Audio remuestreado -> {len(audio)} muestras @ {int(FS)} Hz")

# Segmentación en ventanas de tamaño N (sin solape, simple)
n_segments = max(1, len(audio)//N)
seg_slider = W.IntSlider(value=0, min=0, max=n_segments-1, step=1, description="segment", continuous_update=False)
display(seg_slider)

# Reutiliza EXACTAMENTE tus parámetros/escala del cuaderno
win = get_window("hann", NPERSEG)
EPS = 1e-12

def _get_segment(idx):
    start = idx * N
    end   = start + N
    x = audio[start:end].astype(np.float32)
    if len(x) < N:  # padding si el último segmento queda corto
        x = np.pad(x, (0, N-len(x)))
    return x

def _predict_psd(x):
    # PSD real (Welch) en log10
    _, Pxx = welch(x[None, :], fs=FS, window=win, nperseg=NPERSEG, noverlap=NOVERLP,
                   nfft=NFFT, scaling="density", axis=1)
    y_true = np.log10(Pxx[0] + EPS).astype(np.float32)
    # PSD del modelo (usa tu scaler y red ya entrenada)
    x_in = scaler.transform(x[None, :]).astype(np.float32)
    with torch.no_grad():
        y_pred = model(torch.from_numpy(x_in).to(device)).cpu().numpy()[0]
    return y_true, y_pred

def show_real_audio_compare(idx):
    x = _get_segment(idx)
    y_true, y_pred = _predict_psd(x)
    seg_mse = float(np.mean((y_pred - y_true)**2))

    # 1) Señal en tiempo
    t_seg = np.arange(N)/FS
    plt.figure(figsize=(8, 3.6), dpi=140)
    plt.plot(t_seg, x, linewidth=1.5)
    plt.title(f"Señal real (tiempo) — segmento {idx}")
    plt.xlabel("Tiempo [s]"); plt.ylabel("Amplitud")
    plt.grid(True, linestyle=":")
    plt.xlim([t_seg[0], t_seg[-1]])
    plt.show()

    # 2) PSD Real vs Modelo
    plt.figure(figsize=(8, 4.6), dpi=140)
    plt.plot(f_axis, y_true, label="PSD real (log10)", linewidth=1.6)
    plt.plot(f_axis, y_pred, label="PSD modelo (log10)", linewidth=1.6)
    plt.title(f"PSD (Welch) — Real vs Modelo — segmento {idx}")
    plt.xlabel("Frecuencia [Hz]"); plt.ylabel("log10(PSD)")
    plt.legend(frameon=False); plt.grid(True, linestyle=":")
    plt.annotate(f"MSE seg: {seg_mse:.3e}", xy=(0.02, 0.02), xycoords="axes fraction")
    plt.show()

    # 3) Residuo espectral
    resid = y_pred - y_true
    plt.figure(figsize=(8, 4.6), dpi=140)
    plt.plot(f_axis, resid, linewidth=1.4)
    plt.title("Residuo espectral (modelo − real)")
    plt.xlabel("Frecuencia [Hz]"); plt.ylabel("Error (log-PSD)")
    plt.grid(True, linestyle=":")
    plt.annotate(f"Media: {np.mean(resid):.2e}\nStd: {np.std(resid):.2e}", xy=(0.02, 0.02), xycoords="axes fraction")
    plt.show()

W.interact(lambda segment: show_real_audio_compare(segment), segment=seg_slider);


Sube tu audio (wav/mp3) ...


Saving Luister La Voz, Maikol El Insoportable - Las Cartas (Video Oficial).mp4 to Luister La Voz, Maikol El Insoportable - Las Cartas (Video Oficial) (2).mp4


  audio, _ = librosa.load(audio_path, sr=int(FS), mono=True)  # remuestrea a tu FS y mono
	Deprecated as of librosa version 0.10.0.
	It will be removed in librosa version 1.0.
  y, sr_native = __audioread_load(path, offset, duration, dtype)


Audio remuestreado -> 221414 muestras @ 1024 Hz


IntSlider(value=0, continuous_update=False, description='segment', max=215)

interactive(children=(IntSlider(value=0, continuous_update=False, description='segment', max=215), Output()), …