In [2]:
import os, re, datetime
import numpy as np
import pandas as pd
import h5py
from typing import List, Optional
import scipy.io


# ───────────────────────── util ─────────────────────────
def matlab_datenum_to_datetime(dn: float) -> datetime.datetime:
    """Convert a MATLAB datenum into a ``datetime``.

    Args:
        dn: Serial date number; the integer part counts days and the
            fractional part is the time of day.

    Returns:
        The matching naive ``datetime.datetime``, or ``pd.NaT`` when ``dn``
        is NaN.
    """
    if np.isnan(dn):
        return pd.NaT

    whole_days = int(dn)
    day_fraction = dn % 1

    moment = datetime.datetime.fromordinal(whole_days)
    moment = moment + datetime.timedelta(days=day_fraction)
    # Shift by 366 days to go from the datenum origin to Python's ordinal
    # origin (the two calendars start one leap year apart).
    return moment - datetime.timedelta(days=366)

def mat_to_parquet(file_path: str,
                   file_path_ibif: str,
                   output_folder: str = '.',
                   valid_keys: Optional[List[str]] = None,
                   ibif_keys: Optional[List[str]] = None,
                   week: str = "W1",
                   min_breath_run: int = 30):
    """Merge a signal ``.mat`` file with its separate IBIF file into a parquet.

    A sample is kept only when ``voicedRMS`` is truthy AND it lies inside a
    run of at least ``min_breath_run`` consecutive truthy ``breathGroup``
    samples. When the main file carries ``timestamps``, rows whose timestamp
    is NaN are dropped as well. The result is written to
    ``<output_folder>/<subject_id>_<date>.parquet``.

    Args:
        file_path: Main ``.mat`` file. Its basename must start with
            ``[NP][FM]<digits>_<YYYYMMDD>``; subject, status and date are
            parsed from it.
        file_path_ibif: ``.mat`` file holding the IBIF signals (must contain
            ``voicedRMS``).
        output_folder: Destination directory; created if missing.
        valid_keys: Variables to copy from the main file (required).
            ``timestamps`` is never copied as a feature.
        ibif_keys: Variables to copy from the IBIF file. Defaults to
            ``['acflow', 'mfdr', 'oq', 'naq', 'h1h2', 'voicedRMS']``.
        week: ``"W1"`` labels patient rows ``'Pre'``, any other value
            ``'Post'``. Subjects whose name starts with ``N`` are always
            labelled ``'Control'``.
        min_breath_run: Minimum length (in samples) of a ``breathGroup`` run
            for its samples to be kept.
            NOTE(review): the default stays at 30, the value the code has
            always enforced, although older documentation mentioned 15 -
            confirm the intended threshold.

    Returns:
        None. Nothing is written when no sample survives the filters.

    Raises:
        ValueError: ``valid_keys`` is None, ``min_breath_run`` < 1, the file
            name does not match the expected pattern, or ``breathGroup`` and
            ``voicedRMS`` differ in length.
        KeyError: ``voicedRMS`` is missing from the IBIF file, or
            ``breathGroup`` is missing from both files.
    """
    if valid_keys is None:
        raise ValueError("Debes proporcionar una lista de valid_keys.")
    if min_breath_run < 1:
        raise ValueError("min_breath_run debe ser >= 1.")
    if ibif_keys is None:
        ibif_keys = ['acflow', 'mfdr', 'oq', 'naq', 'h1h2', 'voicedRMS']

    # ---------- metadata encoded in the file name ----------
    base = os.path.splitext(os.path.basename(file_path))[0]
    m = re.match(r'([NP])([FM])(\d+)_([\d]{8})', base)
    if not m:
        raise ValueError("Nombre de archivo no válido: " + base)
    stat_tag, gen_tag, sid, date_str = m.groups()
    is_control = stat_tag == 'N'
    subject_id = f"{stat_tag}{gen_tag}{sid}"
    date_dt = datetime.datetime.strptime(date_str, "%Y%m%d")

    # ---------- loading helper ----------
    def load_mat(path):
        """Read every top-level variable; HDF5 first, scipy as fallback."""
        try:
            with h5py.File(path, 'r') as f:
                return {k: np.array(f[k]) for k in f.keys()}
        except OSError:
            # h5py could not open the file as HDF5; let scipy parse it instead.
            return scipy.io.loadmat(path)

    mat_data = load_mat(file_path)
    ibif_data = load_mat(file_path_ibif)

    # ---------- voicedRMS ----------
    if 'voicedRMS' not in ibif_data:
        raise KeyError(f"{file_path_ibif} no contiene 'voicedRMS'")
    voiced_mask = ibif_data['voicedRMS'].flatten().astype(bool)

    # ---------- breathGroup (main file takes precedence over IBIF) ----------
    if 'breathGroup' in mat_data:
        breath = mat_data['breathGroup'].flatten().astype(bool)
    elif 'breathGroup' in ibif_data:
        breath = ibif_data['breathGroup'].flatten().astype(bool)
    else:
        raise KeyError("'breathGroup' no encontrado en archivos")

    if len(breath) != len(voiced_mask):
        raise ValueError("'breathGroup' y 'voicedRMS' tienen longitudes distintas")

    # ---------- keep only runs of True that are long enough ----------
    # Padding with zeros gives every run both a rising (+1) and a falling (-1)
    # edge. The cast to int8 matters: np.diff on booleans yields XOR, not +/-1.
    edges = np.diff(np.concatenate(([0], breath.astype(np.int8), [0])))
    run_starts = np.flatnonzero(edges == 1)
    run_lengths = np.flatnonzero(edges == -1) - run_starts
    breath_long = np.zeros_like(breath, dtype=bool)
    # True samples appear in run order, so repeating each run's verdict by the
    # run's length lines up one-to-one with the True positions of `breath`.
    breath_long[breath] = np.repeat(run_lengths >= min_breath_run, run_lengths)

    # final mask
    use_mask = voiced_mask & breath_long
    if not use_mask.any():
        print(f"⚠️ {base}: ninguna muestra cumple voiced & breathGroup≥{min_breath_run}")
        return

    # ---------- timestamps ----------
    has_ts = 'timestamps' in mat_data
    if has_ts:
        ts_raw = mat_data['timestamps'].flatten()[use_mask]
        ts_dt = np.array([matlab_datenum_to_datetime(x) for x in ts_raw])
        # Seconds elapsed since midnight of the date taken from the file name.
        ts_sec = np.array([(t - date_dt).total_seconds() if t is not pd.NaT else np.nan
                           for t in ts_dt])
    else:
        print("⚠️ archivo sin 'timestamps'; se usarán NaN/NaT.")
        ts_dt = np.full(use_mask.sum(), pd.NaT)
        ts_sec = np.full(use_mask.sum(), np.nan)

    # ---------- base DataFrame (scalars broadcast to every row) ----------
    df = pd.DataFrame({
        'ts'        : ts_dt,
        'ts_sec'    : ts_sec,
        'subject_id': subject_id,
        'status'    : 'Control' if is_control else 'Patient',
        'week'      : ('Control' if is_control
                       else 'Pre' if week == 'W1' else 'Post'),
        'date'      : date_str
    })

    # ---------- features from the main file ----------
    for k in valid_keys:
        if k in mat_data and k != 'timestamps':
            df[k] = mat_data[k].flatten()[use_mask]
        else:
            print(f"⚠️ '{k}' no encontrado en {base}")

    # ---------- features from the IBIF file ----------
    for k in ibif_keys:
        if k in ibif_data:
            df[k] = ibif_data[k].flatten()[use_mask]
        else:
            print(f"⚠️ '{k}' no encontrado en IBIF")

    # ---------- drop rows without a timestamp (only if the file had any) ----------
    if has_ts:
        n0 = len(df)
        df = df[~df['ts'].isna()]
        if len(df) < n0:
            print(f"🔍 Eliminadas {n0-len(df)} filas con NaN en timestamps.")

    if df.empty:
        print("⚠️ No quedan filas válidas; archivo omitido.")
        return

    # ---------- write parquet ----------
    os.makedirs(output_folder, exist_ok=True)
    out_path = os.path.join(output_folder, f"{subject_id}_{date_str}.parquet")
    df.to_parquet(out_path, index=False)
    print(f"📦 Guardado → {out_path}")

# ───────────────────── función principal ─────────────────
def mat_to_parquet_new(file_path: str,
                       output_folder: str = '.',
                       valid_keys: Optional[List[str]] = None,
                       ibif_keys: Optional[List[str]] = None,
                       week: str = "W1",
                       min_breath_run: int = 30):
    """Convert one HDF5 ``.mat`` file (with an embedded ``IBIF`` group) to parquet.

    A sample is kept only when ``IBIF/voicedRMS`` is truthy AND it lies inside
    a run of at least ``min_breath_run`` consecutive truthy ``breathGroup``
    samples. When the file has a ``timestamps`` dataset, rows whose timestamp
    is NaN are dropped as well. The result is written to
    ``<output_folder>/<subject_id>_<date>.parquet``.

    Args:
        file_path: HDF5 ``.mat`` file. Its basename must start with
            ``[NP][FM]<digits>_<YYYYMMDD>``; subject, status and date are
            parsed from it.
        output_folder: Destination directory; created if missing.
        valid_keys: Root-level datasets to copy (required). ``timestamps`` is
            never copied as a feature.
        ibif_keys: Datasets to copy from the ``IBIF`` group. Defaults to
            ``['acflow', 'mfdr', 'oq', 'naq', 'h1h2', 'voicedRMS']``.
        week: ``"W1"`` labels patient rows ``'Pre'``, any other value
            ``'Post'``. Subjects whose name starts with ``N`` are always
            labelled ``'Control'``.
        min_breath_run: Minimum length (in samples) of a ``breathGroup`` run
            for its samples to be kept.
            NOTE(review): the default stays at 30, the value the code has
            always enforced, although older comments mentioned 15 - confirm
            the intended threshold.

    Returns:
        None. Nothing is written when no sample survives the filters.

    Raises:
        ValueError: ``valid_keys`` is None, ``min_breath_run`` < 1, the file
            name does not match the expected pattern, or ``breathGroup`` and
            ``voicedRMS`` differ in length.
        KeyError: the ``IBIF`` group or its ``voicedRMS`` dataset is missing,
            or ``breathGroup`` is found neither at the root nor in ``IBIF``.
    """
    if valid_keys is None:
        raise ValueError("Debes proporcionar una lista de valid_keys.")
    if min_breath_run < 1:
        raise ValueError("min_breath_run debe ser >= 1.")
    if ibif_keys is None:
        ibif_keys = ['acflow', 'mfdr', 'oq', 'naq', 'h1h2', 'voicedRMS']

    # ----------- metadata encoded in the file name ----------- #
    base = os.path.splitext(os.path.basename(file_path))[0]
    m = re.match(r'([NP])([FM])(\d+)_([\d]{8})', base)
    if not m:
        raise ValueError("Nombre de archivo no válido: " + base)
    status_tag, gender_tag, sid, date_str = m.groups()
    is_control = status_tag == 'N'
    subject_id = f"{status_tag}{gender_tag}{sid}"
    date_dt = datetime.datetime.strptime(date_str, "%Y%m%d")

    # ----------- HDF5 read ----------- #
    # Every dataset is materialised with np.array() inside the `with`, so the
    # DataFrame stays usable after the file is closed.
    with h5py.File(file_path, 'r') as f:
        print(f"✅ {file_path} (HDF5)")

        # --- voiced mask (IBIF) --- #
        # Explicit checks give a readable message instead of h5py's generic one.
        if 'IBIF' not in f:
            raise KeyError(f"{file_path} no contiene el grupo 'IBIF'")
        ibif = f['IBIF']
        if 'voicedRMS' not in ibif:
            raise KeyError(f"{file_path} no contiene 'IBIF/voicedRMS'")
        voiced_mask = np.array(ibif['voicedRMS']).flatten().astype(bool)

        # --- breathGroup (root takes precedence over IBIF) --- #
        if 'breathGroup' in f:
            breath = np.array(f['breathGroup']).flatten().astype(bool)
        elif 'breathGroup' in ibif:
            breath = np.array(ibif['breathGroup']).flatten().astype(bool)
        else:
            raise KeyError("'breathGroup' no encontrado ni en raíz ni en 'IBIF'")

        if len(breath) != len(voiced_mask):
            raise ValueError("'breathGroup' y 'voicedRMS' tienen longitudes distintas")

        # --- keep only runs of True that are long enough --- #
        # Padding with zeros gives every run both a rising (+1) and a falling
        # (-1) edge. The int8 cast matters: np.diff on booleans yields XOR.
        edges = np.diff(np.concatenate(([0], breath.astype(np.int8), [0])))
        run_starts = np.flatnonzero(edges == 1)
        run_lengths = np.flatnonzero(edges == -1) - run_starts
        breath_long = np.zeros_like(breath, dtype=bool)
        # True samples appear in run order, so repeating each run's verdict by
        # its length lines up one-to-one with the True positions of `breath`.
        breath_long[breath] = np.repeat(run_lengths >= min_breath_run, run_lengths)

        # final mask: voiced & breath_long
        use_mask = voiced_mask & breath_long
        if not use_mask.any():
            print(f"⚠️ Ninguna muestra cumple voiced + breathGroup≥{min_breath_run}; archivo omitido.")
            return

        # --- timestamps --- #
        has_ts = "timestamps" in f
        if has_ts:
            ts_raw = np.array(f["timestamps"]).flatten()[use_mask]
            ts_dt = np.array([matlab_datenum_to_datetime(x) for x in ts_raw])
            # Seconds elapsed since midnight of the date taken from the file name.
            ts_sec = np.array([(t - date_dt).total_seconds()
                               if t is not pd.NaT else np.nan
                               for t in ts_dt])
        else:
            print("⚠️ NO hay 'timestamps'; se registrarán NaN/NaT.")
            ts_dt = np.full(use_mask.sum(), pd.NaT)
            ts_sec = np.full(use_mask.sum(), np.nan)

        # ----------- base DataFrame (scalars broadcast to every row) --------- #
        df = pd.DataFrame({
            'ts'        : ts_dt,
            'ts_sec'    : ts_sec,
            'subject_id': subject_id,
            'status'    : 'Control' if is_control else 'Patient',
            'week'      : ('Control' if is_control
                           else 'Pre' if week == 'W1' else 'Post'),
            'date'      : date_str
        })

        # --------- root-level signals ------- #
        for k in valid_keys:
            if k in f and k != 'timestamps':
                df[k] = np.array(f[k]).flatten()[use_mask]
            else:
                print(f"⚠️ '{k}' no encontrado en {base}")

        # --------- IBIF signals --------- #
        for k in ibif_keys:
            if k in ibif:
                df[k] = np.array(ibif[k]).flatten()[use_mask]
            else:
                print(f"⚠️ '{k}' no encontrado en IBIF")

    # --- drop rows without a timestamp (only if the file had real ones) --- #
    if has_ts:
        n0 = len(df)
        df = df[~df['ts'].isna()]
        if len(df) < n0:
            print(f"🔍 Eliminadas {n0-len(df)} filas con NaN en timestamps.")

    if df.empty:
        print("⚠️ No quedan filas válidas; archivo omitido.")
        return

    # ----------- write parquet ---------------------- #
    os.makedirs(output_folder, exist_ok=True)
    out_path = os.path.join(output_folder, f"{subject_id}_{date_str}.parquet")
    df.to_parquet(out_path, index=False)
    print(f"📦 Guardado → {out_path}")


In [None]:
# Variables to copy from the main file and from the IBIF file, respectively.
valid_keys = [
    'cppall', 'zcrall', 'normpeakall', 'spectralTiltall', 'LHratioall',
    'H1H2all', 'periodicity', 'level', 'freq', 'dBcms2', 'cppall_2048',
]
ibif_keys = ['acflow', 'mfdr', 'oq', 'naq', 'h1h2', 'voicedRMS']

carpeta = "data/NF063/W1"
output_folder = "parquets/no_wind/NF063"

# Main signal files only: every .mat in the folder whose name lacks "_IBIF".
archivos_base = sorted(
    f for f in os.listdir(carpeta)
    if f.endswith(".mat") and "_IBIF" not in f
)

for base_file in archivos_base:
    file_path = os.path.join(carpeta, base_file)

    # The companion IBIF file is looked up as "<name>_01_IBIF.mat" in the same folder.
    name_no_ext = base_file.replace(".mat", "")
    ibif_name = f"{name_no_ext}_01_IBIF.mat"
    file_path_ibif = os.path.join(carpeta, ibif_name)

    if os.path.exists(file_path_ibif):
        # One bad file must not abort the batch: report the error and move on.
        try:
            mat_to_parquet(
                file_path=file_path,
                file_path_ibif=file_path_ibif,
                output_folder=output_folder,
                valid_keys=valid_keys,
                ibif_keys=ibif_keys,
                week="W1",  # change this if W2 can be detected from the name
            )
        except Exception as e:
            print(f"⚠️ Error procesando {base_file}: {e}")
    else:
        print(f"❌ No se encontró IBIF para {base_file}")

In [3]:
# Datasets to copy from the file root and from the 'IBIF' group, respectively.
valid_keys = [
    'cppall', 'zcrall', 'normpeakall', 'spectralTiltall', 'LHratioall',
    'H1H2all', 'periodicity', 'level', 'freq', 'dBcms2', 'cppall_2048',
]
ibif_keys = ['acflow', 'mfdr', 'oq', 'naq', 'h1h2', 'voicedRMS']

sujetos = [
    "NF140", "PF140", "NF129", "PF129", "NF109",
    "PF109", "NF022", "PF022", "NF021", "PF021",
]

base_input_dir = "data"
base_output_dir = "parquets/no_wind"

for sujeto in sujetos:
    sujeto_path = os.path.join(base_input_dir, sujeto)

    # "NF" subjects are only scanned for W1; all others for W1 and W2.
    if sujeto.startswith("NF"):
        week_dirs = ["W1"]
    else:
        week_dirs = ["W1", "W2"]

    for week in week_dirs:
        week_path = os.path.join(sujeto_path, week)
        # Both weeks of a subject end up in the same output folder.
        output_folder = os.path.join(base_output_dir, sujeto)

        if not os.path.isdir(week_path):
            print(f"⚠️ Carpeta no encontrada: {week_path}")
            continue

        for file in sorted(os.listdir(week_path)):
            if not file.endswith(".mat"):
                continue

            file_path = os.path.join(week_path, file)
            # One bad file must not abort the batch: report the error and move on.
            try:
                mat_to_parquet_new(
                    file_path=file_path,
                    output_folder=output_folder,
                    valid_keys=valid_keys,
                    ibif_keys=ibif_keys,
                    week=week,  # "W1" or "W2"
                )
            except Exception as e:
                print(f"❌ Error al procesar {file_path}: {e}")


✅ data\NF140\W1\NF140_20150101.mat (HDF5)
📦 Guardado → parquets/no_wind\NF140\NF140_20150101.parquet
✅ data\NF140\W1\NF140_20150102.mat (HDF5)
📦 Guardado → parquets/no_wind\NF140\NF140_20150102.parquet
✅ data\NF140\W1\NF140_20150103.mat (HDF5)
📦 Guardado → parquets/no_wind\NF140\NF140_20150103.parquet
✅ data\NF140\W1\NF140_20150105.mat (HDF5)
📦 Guardado → parquets/no_wind\NF140\NF140_20150105.parquet
✅ data\NF140\W1\NF140_20150106.mat (HDF5)
📦 Guardado → parquets/no_wind\NF140\NF140_20150106.parquet
✅ data\NF140\W1\NF140_20150107.mat (HDF5)
📦 Guardado → parquets/no_wind\NF140\NF140_20150107.parquet
✅ data\NF140\W1\NF140_20150108.mat (HDF5)
📦 Guardado → parquets/no_wind\NF140\NF140_20150108.parquet
✅ data\PF140\W1\PF140_20150101.mat (HDF5)
📦 Guardado → parquets/no_wind\PF140\PF140_20150101.parquet
✅ data\PF140\W1\PF140_20150102.mat (HDF5)
📦 Guardado → parquets/no_wind\PF140\PF140_20150102.parquet
✅ data\PF140\W1\PF140_20150103.mat (HDF5)
📦 Guardado → parquets/no_wind\PF140\PF140_2015010