SEÑALES FILTRADAS EN EL DOMINIO TEMPORAL

In [None]:
import os
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import pywt
from scipy.io import wavfile
from scipy.signal import decimate

# ========================
# MAIN CONFIGURATION
# ========================
# Inputs: the recording to process, the spreadsheet with the cut times,
# and the folder that holds the per-volunteer sip label files.
audio_path = "audios/Rec091.wav"
tabla_path = "tiempo_recortes_sincronizados.xlsx"
etiquetas_dir = "etiquetas"

# IDDSI level selected for this run.
selected_iddsi = 4

# Output folders: filtered time-domain NPZ files and their figures.
carpeta_npz_temporal = "power_data_temporal_fil"
carpeta_fig_temporal = "FIG_TEMP_FIL"

# Create both output folders up front; folders that already exist are left untouched.
for _carpeta_salida in (carpeta_npz_temporal, carpeta_fig_temporal):
    os.makedirs(_carpeta_salida, exist_ok=True)


# ========================
# HELPER FUNCTIONS
# ========================

def _safe_name(s: str) -> str:
    """Return *s* with unsafe characters replaced by underscores.

    Alphanumeric characters, ``.``, ``_``, ``-`` and the space are kept;
    every other character becomes ``_``.
    """
    permitidos = "._- "
    limpio = []
    for caracter in s:
        if caracter.isalnum() or caracter in permitidos:
            limpio.append(caracter)
        else:
            limpio.append("_")
    return "".join(limpio)

def reconstruct_subset(coeffs, wavelet, include_approx=True, include_details=None):
    """Rebuild a signal from a chosen subset of its DWT coefficients.

    Every coefficient array that is not selected is replaced by zeros before
    the inverse transform, so the output contains only the requested sub-bands.

    Args:
        coeffs: Coefficient list laid out as ``[cA_J, cD_J, ..., cD_1]`` (the
            order ``pywt.wavedec`` produces), where ``J = len(coeffs) - 1``.
        wavelet: Wavelet passed through to ``pywt.waverec``.
        include_approx: Keep the approximation coefficients ``coeffs[0]``.
        include_details: Iterable of detail levels to keep, each in ``1..J``.
            ``None`` keeps no detail band.

    Returns:
        The array returned by ``pywt.waverec`` for the masked coefficients.

    Raises:
        ValueError: If any requested detail level is outside ``1..J``.
    """
    J = len(coeffs) - 1
    levels = [] if include_details is None else list(include_details)

    # Reject bad levels up front: level J+1 would map to index 0 (the
    # approximation), larger ones would wrap around through negative indexing,
    # and level 0 would run past the end of the list.
    fuera_de_rango = [lvl for lvl in levels if not 1 <= lvl <= J]
    if fuera_de_rango:
        raise ValueError(
            f"Niveles de detalle fuera de rango 1..{J}: {fuera_de_rango}"
        )

    sel = [np.zeros_like(c) for c in coeffs]

    if include_approx:
        sel[0] = coeffs[0]  # approximation at the deepest level

    for lvl in levels:
        idx = J - lvl + 1  # detail level J sits at index 1, level 1 at index J
        sel[idx] = coeffs[idx]

    return pywt.waverec(sel, wavelet)

def cargar_sorbos(codigo, iddsi):
    """Load the sip label file ``IDDSI<iddsi>_<codigo>.txt`` from ``etiquetas_dir``.

    Returns:
        A DataFrame parsed from the whitespace-separated file, or ``None``
        when the file does not exist.
    """
    ruta_etiquetas = os.path.join(etiquetas_dir, f"IDDSI{iddsi}_{codigo}.txt")
    if os.path.exists(ruta_etiquetas):
        return pd.read_csv(ruta_etiquetas, sep=r"\s+")
    return None


# ========================
# READ THE CUT TABLE
# ========================
audio_name = os.path.basename(audio_path)

tabla = pd.read_excel(tabla_path)
# Normalise the audio identifiers to stripped strings so they compare
# cleanly against the recording's file name.
tabla["ID_audio"] = tabla["ID_audio"].astype(str).str.strip()

# Keep only the rows for this recording at the selected IDDSI level.
coincide_audio = tabla["ID_audio"] == audio_name
coincide_iddsi = tabla["IDDSI"] == selected_iddsi
filas_target = tabla[coincide_audio & coincide_iddsi]

# Nothing to process if the table has no matching segment.
if filas_target.empty:
    raise ValueError("No hay segmento IDDSI solicitado para este audio en la tabla.")


# ========================
# LOAD AND DECIMATE AUDIO
# ========================
# Downsampling factor (x2).
factor = 2

fs, data = wavfile.read(audio_path)

# Multi-channel recordings: keep the first channel only; then work in float32.
data = (data if data.ndim <= 1 else data[:, 0]).astype(np.float32)

# FIR decimation with zero_phase=True; the sample rate shrinks by the same factor.
data = decimate(data, factor, ftype='fir', zero_phase=True).astype(np.float32)
fs //= factor


# ========================
# MAIN PROCESSING
# ========================
# For every table row matching the selected audio/IDDSI level:
#   1. cut the segment from the decimated audio,
#   2. remove the A10 + D10 + D9 + D8 + D1 wavelet sub-bands from it,
#   3. slice the filtered segment per labelled sip and save a figure + NPZ.

# Wavelet settings are identical for every segment, so they are defined once.
dwt_wavelet = "db4"
dwt_level = 10
# Detail levels that, together with the approximation, form the component
# that gets subtracted from the segment.
include_details = [10, 9, 8, 1]

for _, row in filas_target.iterrows():

    # Short volunteer code: "S<digits>" for subjects, "P<digits>" for patients.
    id_vol = str(row["ID_Voluntario"]).strip()
    tipo = "sujetos" if "sujeto" in id_vol.lower() else "pacientes"
    numero = ''.join(filter(str.isdigit, id_vol))
    codigo = f"S{numero}" if tipo == "sujetos" else f"P{numero}"

    # ========================
    # LOAD SIP LABELS
    # ========================
    # Done before the DWT: without labels nothing is saved for this row,
    # so there is no point in paying for the transform.
    sorbos_df = cargar_sorbos(codigo, selected_iddsi)

    if sorbos_df is None:
        print(f"[WARN] No hay etiquetas de sorbos: IDDSI{selected_iddsi}_{codigo}.txt")
        continue

    # ========================
    # CUT THE SEGMENT
    # ========================
    t0, t1 = float(row["cut_i_audio"]), float(row["cut_f_audio"])
    i0_seg, i1_seg = int(t0 * fs), int(t1 * fs)
    seg = data[i0_seg:i1_seg]

    # An empty slice (cut times reversed or beyond the audio) cannot be transformed.
    if seg.size == 0:
        print(f"[WARN] Segmento sin muestras para {codigo}: "
              f"cut_i_audio={t0}, cut_f_audio={t1}")
        continue

    # ========================
    # APPLY DWT (db4)
    # ========================
    coeffs_dwt = pywt.wavedec(seg, dwt_wavelet, level=dwt_level)
    coeffs_dwt = [c.astype(np.float32) for c in coeffs_dwt]

    # ========================
    # FILTERED RECONSTRUCTION (A10 + D10 + D9 + D8 + D1)
    # ========================
    recon = reconstruct_subset(coeffs_dwt, dwt_wavelet,
                               include_approx=True,
                               include_details=include_details)

    # The inverse transform can return more samples than the input; trim to match.
    recon = recon[:len(seg)]
    residual = seg - recon   # this is the final filtered signal (FIL)

    # The NPZ folder depends only on the row, not on the individual sip.
    carpeta_guardado = os.path.join(carpeta_npz_temporal, tipo, f"IDDSI{selected_iddsi}")

    # ========================
    # PROCESS EACH SIP
    # ========================
    for _, s in sorbos_df.iterrows():

        num = int(s["sorbo"])
        i_deg, f_deg = float(s["i_deg"]), float(s["f_deg"])

        # The label times are used as offsets into the filtered segment.
        idx0 = int(i_deg * fs)
        idx1 = int(f_deg * fs)

        # Skip windows that are empty, reversed or outside the segment; a
        # negative start would otherwise wrap around via negative indexing
        # and the others would yield empty figures and NPZ files.
        if idx0 < 0 or idx1 <= idx0 or idx0 >= len(residual):
            print(f"[WARN] Sorbo {num} de {codigo} fuera del segmento "
                  f"(i_deg={i_deg}, f_deg={f_deg}); se omite")
            continue

        seg_fil = residual[idx0:idx1]

        # ========================
        # SAVE TIME-DOMAIN FIGURE
        # ========================
        # Time axis built from the actual sample indices, so it stays correct
        # even when the slice is cut short by the end of the segment.
        t_axis = np.arange(idx0, idx0 + len(seg_fil)) / fs

        fig, ax = plt.subplots(figsize=(10, 3))
        try:
            ax.plot(t_axis, seg_fil, linewidth=0.8)
            ax.set_title(f"{codigo} – IDDSI {selected_iddsi} – Sorbo {num} – FIL")
            ax.set_xlabel("Tiempo (s)")
            ax.set_ylabel("Amplitud")
            ax.grid(True)

            nombre_fig = _safe_name(f"{codigo}_IDDSI{selected_iddsi}_S{num}_TEMP_FIL.png")
            fig.savefig(os.path.join(carpeta_fig_temporal, nombre_fig),
                        dpi=160, bbox_inches="tight")
        finally:
            # Always release the figure, even if saving fails.
            plt.close(fig)

        # ========================
        # SAVE FILTERED NPZ
        # ========================
        os.makedirs(carpeta_guardado, exist_ok=True)

        nombre_npz = f"{codigo}S{num}.npz"
        ruta_npz = os.path.join(carpeta_guardado, nombre_npz)

        np.savez_compressed(
            ruta_npz,
            signal_fil=seg_fil.astype(np.float32),
            sujeto=codigo,
            iddsi=selected_iddsi,
            sorbo=num,
            fs=fs,
            t_ini=i_deg,
            t_fin=f_deg,
            downsample_factor=factor,
            # Names the sub-bands that were subtracted from the segment.
            filtro_wavelet="db4_A10_D10_D9_D8_D1"
        )


print("\nPROCESAMIENTO COMPLETO")
print(f"NPZ temporales FIL en: {os.path.abspath(carpeta_npz_temporal)}")
print(f" Figuras temporales FIL en: {os.path.abspath(carpeta_fig_temporal)}")



>>> ✅ PROCESAMIENTO COMPLETO
📁 NPZ temporales FIL en: d:\Tesis\python\power_data_temporal_fil
📁 Figuras temporales FIL en: d:\Tesis\python\FIG_TEMP_FIL
