In [None]:
from huggingface_hub import login
token = "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"
print("Hugging Face logging")
login(token)

In [None]:
import torch
device = "mps" if torch.backends.mps.is_available() else ("cuda" if torch.cuda.is_available() else "cpu")
print(f"Usando dispositivo: {device}")

## Automatic Text to Speech (ATS)

#### Generación estática

In [None]:
import torchaudio
from transformers import WhisperProcessor, WhisperForConditionalGeneration

# Función para transcribir un archivo de audio
def transcribe_audio_file(model_asr, processor_asr, audio_path):
    # Cargar el archivo de audio
    waveform, sampling_rate = torchaudio.load(audio_path)

    # Resamplear si es necesario
    if sampling_rate != SAMPLE_RATE:
        resampler = torchaudio.transforms.Resample(orig_freq=sampling_rate, new_freq=SAMPLE_RATE)
        waveform = resampler(waveform)

    # Normalizar el audio para evitar problemas de rendimiento
    waveform = waveform / torch.max(torch.abs(waveform))

    # Procesar el audio
    inputs = processor_asr(waveform.squeeze().numpy(), sampling_rate=SAMPLE_RATE, return_tensors="pt")
    inputs = {key: value.clone().detach().to(device) for key, value in inputs.items()}

    # Generar transcripción con parámetros optimizados
    with torch.no_grad():
        forced_decoder_ids = processor_asr.get_decoder_prompt_ids(language="spanish", task="transcribe")
        predicted_ids = model_asr.generate(
            inputs["input_features"],
            max_length=1000, # generación máxima de output
            num_beams=3,
            early_stopping=False, # evita finalización prematura
            forced_decoder_ids = forced_decoder_ids # Indica el idioma
        )
    return processor_asr.batch_decode(predicted_ids, skip_special_tokens=True)[0]


In [None]:
# Configuración inicial
MODEL_NAME = "openai/whisper-small"  # Modelo preentrenado en Hugging Face
SAMPLE_RATE = 16000
cache_dir = "./models/whisper-small"

# Cargar el modelo y el procesador con cache optimizado
processor = WhisperProcessor.from_pretrained(MODEL_NAME, cache_dir=cache_dir)
model = WhisperForConditionalGeneration.from_pretrained(MODEL_NAME, cache_dir=cache_dir).to(device)
print("Modelo Whisper cargado correctamente")

In [None]:
import numpy

audio_path = "./provided/no_es_pais.wav"
transcription = transcribe_audio_file(model, processor, audio_path)
print(f"Transcripción: {transcription}")

#### Generación ventana

In [None]:
def chunk_audio(waveform, chunk_size_s, sampling_rate):
    # Tamaño del fragmento en muestras
    chunk_size = int(chunk_size_s * sampling_rate)
    return waveform.split(chunk_size, dim=1)

def transcribe_audio_file_chunked(model_asr, processor_asr, audio_path, chunk_size_s=30):
    global device
    # Cargar el archivo de audio
    waveform, sampling_rate = torchaudio.load(audio_path)

    # Resamplear si es necesario
    if sampling_rate != SAMPLE_RATE:
        resampler = torchaudio.transforms.Resample(orig_freq=sampling_rate, new_freq=SAMPLE_RATE)
        waveform = resampler(waveform)

    # Normalizar el audio
    waveform = waveform / torch.max(torch.abs(waveform)).detach()

    # Dividir el audio en fragmentos
    chunks = chunk_audio(waveform, chunk_size_s, SAMPLE_RATE)

    # Procesar cada fragmento
    transcriptions = []
    for chunk in chunks:
        inputs = processor_asr(chunk.squeeze().numpy(), sampling_rate=SAMPLE_RATE, return_tensors="pt")
        inputs = {key: value.clone().detach().to(device) for key, value in inputs.items()}

        # Generar transcripción
        with torch.no_grad():
            forced_decoder_ids = processor_asr.get_decoder_prompt_ids(language="spanish", task="transcribe")
            predicted_ids = model_asr.generate(inputs["input_features"],forced_decoder_ids=forced_decoder_ids, max_length=500, num_beams=5)
            transcription_tmp = processor_asr.batch_decode(predicted_ids, skip_special_tokens=True)[0]
            transcriptions.append(transcription_tmp)

    # Combinar las transcripciones
    return " ".join(transcriptions)



In [None]:
audio_path = "./provided/no_es_pais.wav"
transcription = transcribe_audio_file_chunked(model, processor, audio_path, chunk_size_s=30)
print(f"Transcripción: {transcription}")

### Limitaciones

##### Tarea ASRB1

Convierta a texto el audio de los ficheros `padrino.wav` y `patria_invento.wav`. Cuando tenga la transcripción, escuche los audios y valores cuantitativamente como de bien funciona el modelo. Para transcribir el audio utilice la función `transcribe_audio_file_chunked`

In [None]:
# TODO: transcribir el audio de "./provided/padrino.wav"


In [None]:
# TODO: transcribir el audio de "./provided/patria_invento.wav"


### ASR en vivo

In [None]:
import sounddevice as sd
import numpy as np
import scipy.io.wavfile as wav
from transformers import pipeline

# Función para grabar audio
def record_audio(duration, sample_rate):
    print("Grabando...")
    audio = sd.rec(int(duration * sample_rate), samplerate=sample_rate, channels=1, dtype='float32')
    sd.wait()
    print("Grabación completada.")
    return audio.flatten()

# Convertir audio a formato compatible con el modelo (int16)
def preprocess_audio(audio):
    # Convertir de float32 a int16
    audio_int16 = np.int16(audio * 32767)
    return audio_int16



In [None]:
# Configuración de grabación
SAMPLE_RATE = 16000  # Frecuencia de muestreo
DURATION = 5  # Duración en segundos

# Grabar el audio
audio = record_audio(DURATION, SAMPLE_RATE)
# Preprocesar el audio
audio_int16 = preprocess_audio(audio)

# Guardar temporalmente el audio como WAV
output_filename = "output.wav"
wav.write(output_filename, SAMPLE_RATE, audio_int16)

# Usar el modelo ASR
transcription = transcribe_audio_file_chunked(model, processor, output_filename, chunk_size_s=30)
print(f"Transcripción: {transcription}")

### Generación ventana en vivo (live stream)

Ventanas de audio:

* El audio se graba en bloques de duración fija (CHUNK_DURATION), definidos por CHUNK_SIZE.

Callback de grabación:

* La función audio_callback se ejecuta cada vez que se captura un fragmento de audio.
* El fragmento se procesa y se convierte al formato esperado por el modelo (int16).

Transcripción en vivo:

* El fragmento de audio capturado se pasa directamente al modelo ASR de Hugging Face.
* La transcripción se imprime en la consola en tiempo real.

Control del flujo:

* sd.InputStream mantiene el flujo de grabación abierto.
* El programa se ejecuta indefinidamente hasta que el usuario presiona Ctrl+C.

Notas importantes:
* Latencia: La duración de CHUNK_DURATION afecta la latencia de la transcripción. Puedes ajustarla según la capacidad del modelo y la potencia de tu máquina.
 * Rendimiento: Procesar en tiempo real puede ser intensivo. Asegúrate de que tu equipo tenga suficiente capacidad para manejar las solicitudes.

In [None]:
import sounddevice as sd
import numpy as np
import torch
from transformers import WhisperProcessor, WhisperForConditionalGeneration


# Configuración inicial
MODEL_NAME = "openai/whisper-small"
SAMPLE_RATE = 16000
cache_dir = "./models/whisper-small"
CHUNK_DURATION = 5  # en segundos
CHUNK_SIZE = int(SAMPLE_RATE * CHUNK_DURATION)


# Cargar el modelo y el procesador con cache optimizado
processor = WhisperProcessor.from_pretrained(MODEL_NAME, cache_dir=cache_dir)
model = WhisperForConditionalGeneration.from_pretrained(MODEL_NAME, cache_dir=cache_dir).to(device)
print("Modelo Whisper cargado correctamente")


def transcribe_audio_live(model_asr, processor_asr, audio_chunk):
    # Preprocesar audio para Whisper
    inputs = processor_asr(audio_chunk, sampling_rate=SAMPLE_RATE, return_tensors="pt")
    inputs = {key: value.clone().detach().to(model_asr.device) for key, value in inputs.items()}

    # Generar transcripción
    with torch.no_grad():
        forced_decoder_ids = processor_asr.get_decoder_prompt_ids(language="spanish", task="transcribe")
        predicted_ids = model_asr.generate(inputs["input_features"], max_length=448, num_beams=5, forced_decoder_ids=forced_decoder_ids)
        transcription = processor_asr.batch_decode(predicted_ids, skip_special_tokens=True)[0]
    return transcription

def audio_callback(indata, frames, time, status):
    global model, processor
    if status:
        print(f"Estado de grabación: {status}")
    audio_chunk = indata.squeeze().astype(np.float32)
    try:
        transcription = transcribe_audio_live(model, processor, audio_chunk)
        print("Transcripción:", transcription)
    except Exception as e:
        print("Error durante la transcripción:", e)

def start_live_transcription(chunk_size, sampling_rate):
    print("Iniciando transcripción en vivo con Whisper. Presiona Ctrl+C para detener.")
    try:
        with sd.InputStream(
            samplerate=sampling_rate,
            channels=1,
            dtype="float32",
            blocksize=chunk_size,
            callback=audio_callback
        ):
            while True:
                sd.sleep(6000)
    except KeyboardInterrupt:
        print("Transcripción detenida.")

# Iniciar transcripción
# TODO: Uncomment -> start_live_transcription(CHUNK_SIZE, SAMPLE_RATE)
# TODO: probar cambiando el tiempo de duración de la grabación y observar los distintos resultados

### Evaluar diversos modelos


In [None]:
from datasets import load_dataset, Audio

# Cargar el dataset
data = load_dataset("mozilla-foundation/common_voice_11_0", "es", split="test", cache_dir="./data/common_voice_11_0_test")
data = data.cast_column("audio", Audio(sampling_rate=16000))
print("Dataset cargado correctamente")

# Preprocesamiento: Normalización del texto
def normalize_text(batch):
    text = batch["sentence"].lower().strip()
    batch["sentence"] = text
    return batch

data = data.map(normalize_text)
print("Texto normalizado")
# Filtrar el dataset por idioma y clientId
def filter_dataset(batch):
    return batch["client_id"] == "0d461bf9e0450a750b67f5ec88f07d9e2bd8b5a0c46f00bbf6f5d7a4a50a0f1fa46222902c3e70fd317747176028de656c385e09c6270e08aab36c419e8f7d7c" #Notese que se ha buscado una voz con acento argentino

# TODO: visite la web del dataset y escuche el audio que estamos filtrando
data = data.filter(filter_dataset)
print("Dataset filtrado por clientId")
print(data)

Para evaluar los modelos, vamos a utilizar la metrica WER. El **Word Error Rate (WER)** es una métrica utilizada para evaluar la precisión de los sistemas de reconocimiento automático del habla (ASR). Mide la proporción de errores cometidos en las transcripciones generadas por el modelo, comparándolas con una referencia. Estos errores se calculan como la suma de sustituciones, inserciones y eliminaciones necesarias para alinear la transcripción con la referencia, y se normalizan dividiendo por el número total de palabras en la referencia. Un WER más bajo indica un mejor rendimiento del sistema.



In [None]:
import os
import torch
from datasets import load_dataset, Audio
from transformers import WhisperProcessor, WhisperForConditionalGeneration
from transformers import Wav2Vec2Processor, Wav2Vec2ForCTC
from evaluate import load
import matplotlib.pyplot as plt


# Configuración de los modelos
whisper_model_name = "openai/whisper-small"
wav2vec_model_name = "facebook/wav2vec2-large-xlsr-53-spanish"

# Cargar Whisper
whisper_processor = WhisperProcessor.from_pretrained(whisper_model_name)
whisper_model = WhisperForConditionalGeneration.from_pretrained(whisper_model_name).to(device)

# Cargar Wav2Vec2
wav2vec_processor = Wav2Vec2Processor.from_pretrained(wav2vec_model_name)
wav2vec_model = Wav2Vec2ForCTC.from_pretrained(wav2vec_model_name).to(device)

# Métrica WER (Word Error Rate)
wer_metric = load("wer")

# Evaluación de ambos modelos
def evaluate_model(processor, model, dataset, is_whisper=False):
    global device
    references = []
    predictions = []

    for batch in dataset:
        audio = batch["audio"]["array"]
        inputs = processor(audio, sampling_rate=16000, return_tensors="pt", padding=True)
        inputs = {key: value.clone().detach().to(device) for key, value in inputs.items()}

        if is_whisper:
            # Asegurar que las características Mel estén rellenadas a la longitud requerida
            input_features = inputs["input_features"]
            padding_length = 3000 - input_features.shape[-1]
            if padding_length > 0:
                input_features = torch.nn.functional.pad(input_features, (0, padding_length), "constant", 0)

            inputs["input_features"] = input_features

        with torch.no_grad():
            transcription=""
            if is_whisper:
                forced_decoder_ids = processor.get_decoder_prompt_ids(language="spanish", task="transcribe")
                predicted_ids = model.generate(inputs["input_features"], forced_decoder_ids=forced_decoder_ids)
                transcription = processor.batch_decode(predicted_ids, skip_special_tokens=True)[0]
            else:
                logits = model(inputs["input_values"]).logits
                predicted_ids = torch.argmax(logits, dim=-1)
                transcription = processor.batch_decode(predicted_ids)[0]

            predictions.append(transcription.lower().strip())
            references.append(batch["sentence"])

    wer = wer_metric.compute(predictions=predictions, references=references)
    return wer

# Evaluar Whisper
print("Evaluando Whisper...")
whisper_wer = evaluate_model(whisper_processor, whisper_model, data, is_whisper=True)
print(f"WER de Whisper: {whisper_wer:.4f}")

# Evaluar Wav2Vec2
print("Evaluando Wav2Vec2...")
wav2vec_wer = evaluate_model(wav2vec_processor, wav2vec_model, data)
print(f"WER de Wav2Vec2: {wav2vec_wer:.4f}")

# Crear gráfica de comparación
models = ["Whisper", "Wav2Vec2"]
wer_scores = [whisper_wer, wav2vec_wer]

plt.figure(figsize=(8, 6))
plt.bar(models, wer_scores)
plt.title("Comparación de WER entre modelos ASR")
plt.ylabel("WER (Word Error Rate)")
plt.xlabel("Modelo")
plt.ylim(0, max(wer_scores) + 0.1)
plt.show()


### Tarea ASR2

Cambiar el código anterior para evaluar los modelos con las primeras 30 muestras del dataset de test. ¿Qué ocurre con los resultados?

### Tarea ASR3

Cambiar el modelo `openai/whisper-small` por `openai/whisper-tiny`. ¿Qué ocurre con los resultados?