In [1]:
#DATA AUGMENTATION 
# script: data_augmentation_per_class.py
# necesita: pip install librosa soundfile numpy scipy tqdm
# ejecutarlo con python >= 3.8

import os
import random
import shutil
from pathlib import Path
import uuid

import numpy as np
import soundfile as sf
import librosa
from scipy import signal
from tqdm import tqdm

# ---------------------- parameters ----------------------
# Root folder that holds one subfolder per disease class (input).
SRC_ROOT = r"D:\dataset pf\COPIA\organized_by_type\FILTERED_AUDIO_10s"

# Folder where the balanced / augmented audio is written. It is located
# inside SRC_ROOT, which is why main() filters it out of the class list.
DST_ROOT = os.path.join(SRC_ROOT, "data_augmentation")

# Target number of files per class.
# NOTE(review): the previous comment said "default 200" while the value is
# 1778 -- confirm which one is intended.
num_target_per_class = 1778

# Target sampling rate (Hz); every written file is resampled to this rate.
sr = 4000

# Seed for reproducibility. Both the stdlib and the NumPy global generators
# are seeded because the transformations draw from both of them.
random_seed = 42
random.seed(random_seed)
np.random.seed(random_seed)

# Minimum / maximum duration (seconds) intended for some transformations.
# The audio is assumed to be roughly 10 s long; adjust if needed.
# NOTE(review): these two values are not referenced anywhere else in the
# visible part of this file.
min_duration_s = 0.5
max_duration_s = 30.0

# ---------------------- utilidades ----------------------
def ensure_dir(path):
    """Make sure the directory ``path`` exists, creating missing parents as needed.

    Nothing happens when the directory is already there. Returns ``None``.
    """
    if os.path.isdir(path):
        return
    os.makedirs(path, exist_ok=True)

def list_wav_files(folder):
    """Return the ``.wav`` files located directly inside ``folder``.

    Subfolders are not traversed. The result is sorted so that the seeded
    random selections made by the callers do not depend on the order in
    which the filesystem happens to enumerate the directory. Directories
    whose name ends in ``.wav`` are left out.

    Parameters:
        folder: path (str or path-like) of the directory to scan.

    Returns:
        A sorted list of path strings; empty when nothing matches.
    """
    return sorted(str(p) for p in Path(folder).glob("*.wav") if p.is_file())

def save_wav(y, sr_local, dst_path):
    """Write ``y`` to ``dst_path`` as a 16-bit PCM wav file at ``sr_local`` Hz.

    Signed-integer input is first converted to float32 by dividing by the
    maximum value of its dtype. The samples are then hard-clipped to
    [-1.0, 1.0]; no gain normalisation is applied, so anything outside
    that range is flattened.
    """
    is_signed_int = y.dtype.kind == 'i'
    if is_signed_int:
        full_scale = np.iinfo(y.dtype).max
        y = y.astype(np.float32) / full_scale
    # safety clip before quantising to PCM_16
    clipped = np.clip(y, -1.0, 1.0)
    sf.write(dst_path, clipped, sr_local, subtype='PCM_16')

def load_and_resample(path, target_sr):
    """Load the audio file at ``path`` as mono and return it at ``target_sr`` Hz.

    The file is read at its own sampling rate (``sr=None``) and resampled
    only when that rate differs from ``target_sr``.
    Returns the sample array produced by librosa.
    """
    # per the original note, librosa yields float32 samples in -1..1
    samples, native_sr = librosa.load(path, sr=None, mono=True)
    if native_sr == target_sr:
        return samples
    return librosa.resample(samples, orig_sr=native_sr, target_sr=target_sr)

# ---------------------- transformaciones ----------------------
def add_white_noise(y, snr_db_min=10, snr_db_max=30):
    """Return ``y`` plus Gaussian white noise at a randomly drawn SNR.

    The SNR (in dB) is drawn uniformly from [``snr_db_min``, ``snr_db_max``]
    and the noise standard deviation is the signal RMS divided by the
    corresponding amplitude ratio. A silent signal therefore gets no noise.
    """
    signal_rms = np.sqrt(np.mean(y**2))
    target_snr_db = random.uniform(snr_db_min, snr_db_max)
    # dB -> amplitude ratio (hence the division by 20 rather than 10)
    amplitude_ratio = 10 ** (target_snr_db / 20.0)
    noise_std = signal_rms / amplitude_ratio
    return y + np.random.normal(0, noise_std, size=y.shape[0])

def time_shift(y, shift_max_seconds=1.0, sr_local=sr):
    """Circularly shift ``y`` by a random offset of at most ``shift_max_seconds``.

    The offset is drawn uniformly from [-shift_max_seconds, shift_max_seconds],
    converted to samples with ``sr_local`` and applied with ``np.roll``, so
    samples pushed past one end re-enter at the other.
    """
    offset_seconds = random.uniform(-shift_max_seconds, shift_max_seconds)
    offset_samples = int(offset_seconds * sr_local)
    return np.roll(y, offset_samples)

def time_stretch(y, min_rate=0.8, max_rate=1.25):
    """Stretch or compress ``y`` in time by a random rate. Changes the duration.

    The rate is drawn uniformly from [``min_rate``, ``max_rate``].

    ``rate`` is passed by keyword: recent librosa releases make it
    keyword-only, so the positional form raises ``TypeError``, which the
    fallback below would swallow and turn this transform into a silent no-op.

    Returns:
        The stretched signal, or ``y`` itself (with a ``RuntimeWarning``)
        when librosa could not process it, e.g. because it is too short.
    """
    rate = random.uniform(min_rate, max_rate)
    try:
        return librosa.effects.time_stretch(y, rate=rate)
    except Exception as exc:
        # best effort: keep the pipeline running, but do not hide the failure
        import warnings
        warnings.warn(
            f"time_stretch failed, returning the input unchanged: {exc!r}",
            RuntimeWarning,
            stacklevel=2,
        )
        return y

def pitch_shift(y, sr_local=sr, n_steps_min=-2, n_steps_max=2):
    """Shift the pitch of ``y`` by a random (fractional) number of semitones.

    The shift is drawn uniformly from [``n_steps_min``, ``n_steps_max``].

    ``sr`` and ``n_steps`` are passed by keyword: recent librosa releases
    make them keyword-only, so the positional form raises ``TypeError``,
    which the fallback below would swallow and turn this transform into a
    silent no-op.

    Returns:
        The pitch-shifted signal, or ``y`` itself (with a ``RuntimeWarning``)
        when librosa could not process it.
    """
    n_steps = random.uniform(n_steps_min, n_steps_max)
    try:
        return librosa.effects.pitch_shift(y, sr=sr_local, n_steps=n_steps)
    except Exception as exc:
        # best effort: keep the pipeline running, but do not hide the failure
        import warnings
        warnings.warn(
            f"pitch_shift failed, returning the input unchanged: {exc!r}",
            RuntimeWarning,
            stacklevel=2,
        )
        return y

def change_volume(y, gain_db_min=-6, gain_db_max=6):
    """Scale ``y`` by a random gain drawn uniformly from [gain_db_min, gain_db_max] dB."""
    gain_db = random.uniform(gain_db_min, gain_db_max)
    # dB -> linear amplitude factor
    return y * 10 ** (gain_db / 20.0)

def random_crop_or_pad(y, target_length_samples):
    """Force ``y`` to exactly ``target_length_samples`` samples.

    - Longer input: a window starting at a random offset is cut out.
    - Shorter input: zeros are added, split at random between the start
      and the end.
    - Matching length: ``y`` is returned untouched.
    """
    surplus = len(y) - target_length_samples
    if surplus == 0:
        return y
    if surplus > 0:
        # crop a randomly positioned window
        offset = random.randint(0, surplus)
        return y[offset:offset + target_length_samples]
    # zero-pad, with the leading share chosen at random
    missing = -surplus
    left = random.randint(0, missing)
    return np.pad(y, (left, missing - left), mode='constant')

def lowpass_filter(y, sr_local=sr, cutoff_hz=None):
    """Apply a 4th-order Butterworth low-pass filter to ``y`` via ``filtfilt``.

    When ``cutoff_hz`` is None, the cutoff is drawn uniformly between
    300 Hz and the smaller of 3000 Hz and (sr_local / 2 - 10) Hz.
    """
    nyquist_hz = 0.5 * sr_local
    if cutoff_hz is None:
        upper_hz = min(3000.0, sr_local/2 - 10)
        cutoff_hz = random.uniform(300.0, upper_hz)
    # butter() takes the cutoff as a fraction of the Nyquist frequency
    num, den = signal.butter(4, cutoff_hz / nyquist_hz, btype='low', analog=False)
    return signal.filtfilt(num, den, y)

def highpass_filter(y, sr_local=sr, cutoff_hz=None):
    """Apply a 4th-order Butterworth high-pass filter to ``y`` via ``filtfilt``.

    When ``cutoff_hz`` is None, the cutoff is drawn uniformly from
    20 Hz to 300 Hz.
    """
    nyquist_hz = 0.5 * sr_local
    if cutoff_hz is None:
        cutoff_hz = random.uniform(20.0, 300.0)
    # butter() takes the cutoff as a fraction of the Nyquist frequency
    num, den = signal.butter(4, cutoff_hz / nyquist_hz, btype='high', analog=False)
    return signal.filtfilt(num, den, y)

# ---------------------- función principal de augmentación ----------------------
def apply_random_augmentations(y, sr_local=sr, n_transforms=None, target_length_samples=None):
    """
    Apply a random combination of transformations to a copy of ``y``.

    n_transforms: how many transformations to apply (None: random 1..3);
        capped at the number of available transformations.
    target_length_samples: if given, the result is cropped/padded to this
        length at the end.

    Returns a tuple ``(augmented_signal, names)`` where ``names`` lists, in
    application order, the transformations that did not raise.
    """
    n_to_apply = random.randint(1, 3) if n_transforms is None else n_transforms

    def _shift(audio):
        return time_shift(audio, shift_max_seconds=1.5, sr_local=sr_local)

    def _stretch(audio):
        return time_stretch(audio, 0.9, 1.1)

    def _pitch(audio):
        return pitch_shift(audio, sr_local)

    def _volume(audio):
        return change_volume(audio, -8, 8)

    def _lowpass(audio):
        return lowpass_filter(audio, sr_local)

    def _highpass(audio):
        return highpass_filter(audio, sr_local)

    catalogue = [
        ('add_noise', add_white_noise),
        ('time_shift', _shift),
        ('time_stretch', _stretch),
        ('pitch_shift', _pitch),
        ('change_volume', _volume),
        ('lowpass', _lowpass),
        ('highpass', _highpass),
    ]

    result = y.copy()
    applied = []

    # sample without replacement so one transformation is never applied twice
    picked = random.sample(catalogue, k=min(n_to_apply, len(catalogue)))
    for label, transform in picked:
        try:
            result = transform(result)
        except Exception:
            # a failing transformation is skipped; the signal stays as it was
            continue
        applied.append(label)

    if target_length_samples is not None:
        result = random_crop_or_pad(result, target_length_samples)

    return result, applied

# ---------------------- procesamiento por clase ----------------------
def process_class_folder(class_folder, dst_class_folder, target_count=num_target_per_class, sr_local=sr):
    """
    Balance one class folder so that ``dst_class_folder`` holds ``target_count`` files.

    - If the class has >= ``target_count`` wav files: pick ``target_count``
      of them at random and write them out (resampled, cropped/padded).
    - If it has fewer: write out every original and generate augmented
      variants of randomly chosen originals until ``target_count`` is reached.

    Every written clip is resampled to ``sr_local`` and cropped/padded to
    the median duration of the class (10 s when no file header is readable).

    Parameters:
        class_folder: folder with the original wav files of one class.
        dst_class_folder: output folder for that class (created if missing).
        target_count: number of files the output folder should end up with.
        sr_local: output sampling rate in Hz.

    Returns:
        None. Warnings are printed when the folder has no wav files or when
        fewer augmented files than required could be produced.
    """
    ensure_dir(dst_class_folder)
    wavs = list_wav_files(class_folder)
    n_available = len(wavs)
    class_name = os.path.basename(class_folder)

    if n_available == 0:
        print(f"advertencia: la carpeta {class_folder} no tiene archivos .wav")
        return

    # Common clip length: the median duration of the originals, so files of
    # varying length are normalised to one size.
    durations = []
    for p in wavs:
        try:
            info = sf.info(p)
            durations.append(info.frames / info.samplerate)
        except Exception:
            # unreadable header: leave this file out of the median
            continue
    median_dur = float(np.median(durations)) if durations else 10.0
    target_len_samples = int(sr_local * median_dur)

    if n_available >= target_count:
        # enough originals: keep a random subset, no augmentation needed
        chosen = random.sample(wavs, target_count)
        for idx, src in enumerate(tqdm(chosen, desc=f"copiando {class_name}")):
            y = load_and_resample(src, sr_local)
            y_out = random_crop_or_pad(y, target_len_samples)
            dst_name = f"{Path(src).stem}_copy_{idx:04d}.wav"
            save_wav(y_out, sr_local, os.path.join(dst_class_folder, dst_name))
        return

    # Too few originals: write all of them and keep the decoded audio in
    # memory. The augmentation loop below draws from the same files many
    # times, so re-reading and re-resampling them on every attempt would
    # repeat identical work.
    decoded = {}
    for idx, src in enumerate(tqdm(wavs, desc=f"copiando existentes {class_name}")):
        y = load_and_resample(src, sr_local)
        decoded[src] = y
        y_out = random_crop_or_pad(y, target_len_samples)
        dst_name = f"{Path(src).stem}_orig_{idx:04d}.wav"
        save_wav(y_out, sr_local, os.path.join(dst_class_folder, dst_name))

    # generate augmentations until target_count is reached
    needed = target_count - n_available
    max_attempts = needed * 10  # guards against an endless loop
    attempts = 0
    created = 0
    # context manager: the bar is closed even if an exception escapes
    with tqdm(total=needed, desc=f"generando aumentos {class_name}") as pbar:
        while created < needed and attempts < max_attempts:
            attempts += 1
            # pick one original as the base signal
            src = random.choice(wavs)

            # apply_random_augmentations works on a copy, so the cached
            # array is never modified
            y_aug, transforms = apply_random_augmentations(decoded[src], sr_local=sr_local, n_transforms=None, target_length_samples=target_len_samples)

            # discard results that are empty or contain NaN
            if np.isnan(y_aug).any() or len(y_aug) < 1:
                continue

            # descriptive file name: base, applied transforms, short unique id
            base = Path(src).stem
            uid = uuid.uuid4().hex[:8]
            tf_tag = "-".join(transforms) if transforms else "none"
            dst_name = f"{base}_aug_{tf_tag}_{uid}.wav"
            dst_path = os.path.join(dst_class_folder, dst_name)

            try:
                save_wav(y_aug, sr_local, dst_path)
            except Exception:
                # best effort: a failed write only costs one attempt
                continue
            created += 1
            pbar.update(1)

    if created < needed:
        print(f"advertencia: solamente se crearon {created} archivos aumentados para {os.path.basename(class_folder)} (objetivo {needed})")

# ---------------------- recorrido principal ----------------------
def main(src_root=SRC_ROOT, dst_root=DST_ROOT, target_count=num_target_per_class, sr_local=sr):
    """Walk the first-level subfolders of ``src_root`` and balance each class.

    Each subfolder is treated as one class and handed to
    ``process_class_folder``, which writes its output to a folder of the
    same name under ``dst_root``.

    Classes are processed in sorted order: ``os.listdir`` returns entries in
    arbitrary order, and since all classes share the seeded random
    generators, the processing order affects which files and
    transformations are drawn.

    Parameters:
        src_root: folder containing one subfolder per class.
        dst_root: output root (created if missing).
        target_count: number of files each class should end up with.
        sr_local: output sampling rate in Hz.
    """
    ensure_dir(dst_root)

    # The output folder may live inside src_root; it must not be treated as
    # a class. normpath drops a trailing separator, which would otherwise
    # make basename() return an empty string and defeat the exclusion.
    dst_name = os.path.basename(os.path.normpath(dst_root))
    classes = sorted(
        name for name in os.listdir(src_root)
        if name != dst_name and os.path.isdir(os.path.join(src_root, name))
    )

    print(f"encontradas {len(classes)} clases en: {src_root}")
    for cls in classes:
        class_folder = os.path.join(src_root, cls)
        dst_class_folder = os.path.join(dst_root, cls)
        print(f"\nprocesando clase: {cls}")
        process_class_folder(class_folder, dst_class_folder, target_count=target_count, sr_local=sr_local)

    print("\nproceso completado.")

# ---------------------- ejecución directa ----------------------
if __name__ == "__main__":
    # To run with other settings, pass them as arguments, e.g.
    # main(target_count=300, sr_local=8000).
    # Reassigning the module-level num_target_per_class / sr at this point
    # has no effect: the functions above captured those values as parameter
    # defaults when they were defined.
    main()


encontradas 8 clases en: D:\dataset pf\COPIA\organized_by_type\FILTERED_AUDIO_10s

procesando clase: Asthma


copiando existentes Asthma: 100%|██████████| 74/74 [00:03<00:00, 21.07it/s]
generando aumentos Asthma: 100%|███████| 1704/1704 [04:46<00:00,  5.94it/s]



procesando clase: BRON


copiando existentes BRON: 100%|████████████| 79/79 [00:20<00:00,  3.88it/s]
generando aumentos BRON: 100%|█████████| 1699/1699 [07:09<00:00,  3.95it/s]



procesando clase: COPD


copiando COPD: 100%|███████████████████| 1778/1778 [04:10<00:00,  7.10it/s]



procesando clase: Healthy


copiando existentes Healthy: 100%|███████| 150/150 [00:33<00:00,  4.42it/s]
generando aumentos Healthy: 100%|██████| 1628/1628 [06:23<00:00,  4.24it/s]



procesando clase: Lung Fibrosis


copiando existentes Lung Fibrosis: 100%|███| 11/11 [00:01<00:00,  5.58it/s]
generando aumentos Lung Fibrosis: 100%|█| 1767/1767 [07:40<00:00,  3.84it/s



procesando clase: Plueral Effusion


copiando existentes Plueral Effusion: 100%|██| 6/6 [00:01<00:00,  4.20it/s]
generando aumentos Plueral Effusion: 100%|█| 1772/1772 [05:14<00:00,  5.64i



procesando clase: Pneumonia


copiando existentes Pneumonia: 100%|███████| 86/86 [00:15<00:00,  5.43it/s]
generando aumentos Pneumonia: 100%|████| 1692/1692 [06:30<00:00,  4.34it/s]



procesando clase: RTI


copiando existentes RTI: 100%|█████████████| 46/46 [00:10<00:00,  4.45it/s]
generando aumentos RTI: 100%|██████████| 1732/1732 [06:17<00:00,  4.59it/s]


proceso completado.



