# Proyecto Machine Learning

#### Dependencias que se necesitan

In [17]:
# Instalar dependencias necesarias para detectar IPUs y generar archivos TextGrid

!pip install webrtcvad praatio pydub --quiet

# Descargar ffmpeg si no está instalado (solo en Colab o sistemas que lo requieran)
import shutil
if not shutil.which("ffmpeg"):
    print("ffmpeg no está instalado. Intentando instalar...")
    !apt-get update && apt-get install -y ffmpeg
else:
    print("✓ ffmpeg ya está instalado.")

# Confirmación de instalación
print("✅ Instalación completada. Listo para usar funciones de VAD y TextGrid.")


✓ ffmpeg ya está instalado.
✅ Instalación completada. Listo para usar funciones de VAD y TextGrid.


#### Librerias que se utilizarán

In [1]:
import subprocess
import os
import tempfile
import whisper
from pydub import AudioSegment
import re
import wave
import webrtcvad
from difflib import SequenceMatcher
from praatio import textgrid

### Función para transformar audios mp3 a wav

In [2]:
carpeta_audios = "./mp3"
carpeta_salida_audios = "./wav"

for archivo in os.listdir(carpeta_audios):
    if archivo.endswith(".mp3"):
        mp3_path = os.path.join(carpeta_audios, archivo)
        nombre_base = os.path.splitext(archivo)[0]
        wav_path = os.path.join(carpeta_salida_audios, f"{nombre_base}.wav")

        
        comando = f'ffmpeg -y -i "{mp3_path}" -ar 16000 -ac 1 "{wav_path}"'
        try:
            subprocess.run(comando, shell=True, check=True)
            print(f"[✓] Convertido: {archivo} → {nombre_base}.wav")
        except subprocess.CalledProcessError as e:
            print(f"[✗] Error al convertir {archivo}: {e}")


[✓] Convertido: 202003311300.mp3 → 202003311300.wav


### Uso de Whisper de OpenAI para transcribir segmentos de audio

In [3]:
def transcribe_audio_segment(audio_path, start, end, model=None):
    if model is None:
        model = whisper.load_model("base")

    audio = AudioSegment.from_wav(audio_path)
    segment = audio[start * 1000:end * 1000] 

    with tempfile.NamedTemporaryFile(delete=False, suffix=".wav") as tmp_file:
        segment.export(tmp_file.name, format="wav")
        tmp_path = tmp_file.name

    result = model.transcribe(tmp_path, language="es")
    os.remove(tmp_path)
    return result["text"].strip()

#### Funciones 

In [4]:
def transcribir_audio_a_html(audio_path, output_path):
    model = whisper.load_model("base")  
    result = model.transcribe(audio_path, language="es")

    with open(output_path, 'w', encoding='utf-8') as f:
        f.write("<html><body><p>")
        f.write(result["text"])
        f.write("</p></body></html>")

    print(f"[✓] Transcripción guardada en: {output_path}")


In [5]:
def detectar_ipus(audio_path, output_textgrid):
    try:
        vad = webrtcvad.Vad(3)

        with wave.open(audio_path, 'rb') as wf:
            sample_rate = wf.getframerate()
            channels = wf.getnchannels()
            sample_width = wf.getsampwidth()

          
            assert sample_rate == 16000, "El audio debe estar a 16kHz"
            assert channels == 1, "El audio debe ser mono (1 canal)"

           
            frame_duration_ms = 30
            frame_size_bytes = int(sample_rate * frame_duration_ms / 1000) * sample_width

            
            frames = []
            while True:
                frame = wf.readframes(frame_size_bytes // sample_width)
                if len(frame) < frame_size_bytes:
                    break
                frames.append(frame)

        
        voiced_segments = []
        for i, frame in enumerate(frames):
            if vad.is_speech(frame, sample_rate):
                start = i * frame_duration_ms / 1000.0
                end = (i + 1) * frame_duration_ms / 1000.0
                voiced_segments.append((start, end))

        
        ipus = []
        if voiced_segments:
            cur_start, cur_end = voiced_segments[0]
            for start, end in voiced_segments[1:]:
                if start - cur_end <= 0.3:  
                    cur_end = end
                else:
                    ipus.append((cur_start, cur_end))
                    cur_start, cur_end = start, end
            ipus.append((cur_start, cur_end))

        
        entries = [(start, end, "") for start, end in ipus]
        tg = textgrid.Textgrid()
        tg.minTimestamp = 0
        tg.maxTimestamp = ipus[-1][1] if ipus else 1.0
        tg.addTier(textgrid.IntervalTier("IPU", entries, tg.minTimestamp, tg.maxTimestamp))
        tg.save(output_textgrid, format="short_textgrid", includeBlankSpaces=True)

        print(f"[✓] TextGrid IPU guardado en: {output_textgrid}")

    except AssertionError as e:
        print(f"[Error] Audio inválido: {e}")

    except Exception as e:
        print(f"[Error] Falló la detección de IPUs: {e}")

In [6]:
def extract_text_with_regex(html_path):
    with open(html_path, 'r', encoding='utf-8') as f:
        content = f.read()
        paragraphs = re.findall(r'<p>(.*?)</p>', content, re.DOTALL)
        text = ' '.join(paragraphs)
        return re.sub(r'\s+', ' ', text).strip()

In [7]:
def align_transcription_to_ipu(ipu_intervals, audio_path):
    """
    Alinea cada intervalo IPU con su transcripción usando Whisper.

    Args:
        ipu_intervals (list of tuples): Lista de (start, end, "") desde el TextGrid.
        audio_path (str): Ruta al archivo .wav.

    Returns:
        list of tuples: Lista de (start, end, transcripción) para cada IPU.
    """
    model = whisper.load_model("base")
    aligned = []

    for i, (start, end, _) in enumerate(ipu_intervals):
        duration = end - start

        if duration < 0.2:
        
            aligned.append((start, end, ""))
            continue

        try:
            segment_text = transcribe_audio_segment(audio_path, start, end, model)
            print(f"[✓] IPU {i + 1}/{len(ipu_intervals)} transcrita: '{segment_text}'")
        except Exception as e:
            segment_text = ""
            print(f"[Error] Falló transcripción de IPU {i + 1}: {e}")

        aligned.append((start, end, segment_text))

    return aligned

In [8]:
def get_phonemes_espeak(text):
    try:
        cmd = f'espeak-ng -q -v es -x "{text}"'
        result = subprocess.check_output(cmd, shell=True, encoding='utf-8')
        return result.strip()
    except subprocess.CalledProcessError:
        return ""

In [9]:
def find_transcription_errors(audio_path, ipu_intervals, transc_intervals):
    model = whisper.load_model("base")
    errors = []

    for ipu, transc in zip(ipu_intervals, transc_intervals):
        start, end = ipu[0], ipu[1]
        transc_text = transc[2].strip()

        if not transc_text:
            continue

        ipu_text = transcribe_audio_segment(audio_path, start, end, model)

        if not ipu_text:
            continue

        phon_ipu = get_phonemes_espeak(ipu_text)
        phon_transc = get_phonemes_espeak(transc_text)

        if phon_ipu != phon_transc:
            errors.append((start, end, f"Desajuste fonético"))

    return errors

In [10]:
def export_errors_to_txt(errors, path_txt):
    with open(path_txt, 'w', encoding='utf-8') as f:
        f.write("Inicio\tFin\tDescripción\n")
        for start, end, desc in errors:
            f.write(f"{start:.2f}\t{end:.2f}\t{desc}\n")
    print(f"[✓] Errores exportados a: {path_txt}")


In [11]:
def process_textgrid(ipu_textgrid_path, html_path, output_path, audio_path):
    
    tg = textgrid.openTextgrid(ipu_textgrid_path, includeEmptyIntervals=True)
    ipu_tier = tg.getTier("IPU")
    ipu_intervals = ipu_tier.entries

  
    model = whisper.load_model("base")
    transc_entries = []
    for start, end, _ in ipu_intervals:
        segment_text = transcribe_audio_segment(audio_path, start, end, model)
        transc_entries.append((start, end, segment_text.strip()))

    
    transc_tier = textgrid.IntervalTier("Transc", transc_entries, tg.minTimestamp, tg.maxTimestamp)

    
    error_entries = find_transcription_errors(audio_path, ipu_intervals, transc_entries)
    errors_tier = textgrid.IntervalTier("TranscErrors", error_entries, tg.minTimestamp, tg.maxTimestamp)

    
    tg.addTier(transc_tier)
    tg.addTier(errors_tier)

    
    tg.save(output_path, format="short_textgrid", includeBlankSpaces=True)
    print(f"[✓] TextGrid completo guardado en: {output_path}")

    
    export_errors_to_txt(error_entries, output_path.replace(".TextGrid", "_errores.txt"))



#### Generación de TextGrid, detección de IPUs y alineación tradicional (segmentos de audio + espeak-ng)

In [12]:
carpeta_audios = "./wav"
carpeta_resultados = "./Resultados"

for nombre_archivo in os.listdir(carpeta_audios):
    if nombre_archivo.endswith(".wav"):
        print(f"\n🔊 Procesando: {nombre_archivo}")

        ruta_audio = os.path.join(carpeta_audios, nombre_archivo)
        nombre_base = os.path.splitext(nombre_archivo)[0]

        ruta_html = os.path.join(carpeta_resultados, f"{nombre_base}.html")
        ruta_textgrid = os.path.join(carpeta_resultados, f"{nombre_base}.TextGrid")
        ruta_textgrid_procesado = os.path.join(carpeta_resultados, f"{nombre_base}_v.TextGrid")#Tier con Errores 

        try:
           
            transcribir_audio_a_html(ruta_audio, ruta_html)

            
            detectar_ipus(ruta_audio, ruta_textgrid)


            process_textgrid(ruta_textgrid, ruta_html, ruta_textgrid_procesado, ruta_audio)

        except Exception as e:
            print(f"❌ Error procesando {nombre_archivo}: {e}")



🔊 Procesando: 202003311300.wav




[✓] Transcripción guardada en: ./Resultados\202003311300.html
[✓] TextGrid IPU guardado en: ./Resultados\202003311300.TextGrid
[✓] TextGrid completo guardado en: ./Resultados\202003311300_v.TextGrid
[✓] Errores exportados a: ./Resultados\202003311300_v_errores.txt
