# Introducción

En este notebook, aprenderemos cómo descargar un modelo preentrenado de OpenAI Whisper desde Hugging Face y adaptarlo al formato RKNN para ejecutarlo en un chip Rockchip RK3588. Este proceso es útil para aprovechar las capacidades de inferencia de hardware del RK3588 al trabajar con modelos de procesamiento de lenguaje natural y reconocimiento de voz.

## Requisitos

Este notebook ha sido probado en Python 3.10.16 y utiliza la herramienta RKNNToolkit v2.3.0. Asegúrate de instalar las dependencias necesarias antes de continuar.

### Instalación de dependencias

1. Instala las dependencias requeridas para RKNNToolkit v2.3.0 desde el siguiente archivo de requisitos:
    ```
    https://github.com/airockchip/rknn-toolkit2/blob/v2.3.0/rknn-toolkit2/packages/x86_64/requirements_cp310-2.3.0.txt
    ```
    Puedes instalarlas ejecutando:
    ```bash
    pip install -r requirements_cp310-2.3.0.txt
    ```

2. Instala el paquete `rknn_toolkit2` correspondiente:
    ```
    https://github.com/airockchip/rknn-toolkit2/blob/v2.3.0/rknn-toolkit2/packages/x86_64/rknn_toolkit2-2.3.0-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl
    ```
    Puedes instalarlo ejecutando:
    ```bash
    pip install rknn_toolkit2-2.3.0-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl
    ```

3. Instala las siguientes bibliotecas adicionales necesarias para trabajar con el modelo Whisper:
    ```bash
    pip install openai-whisper librosa onnxsim soundfile
    ```

## Paso 1. Descargar el modelo convertido en formato ONNX

Una vez que hayas instalado todas las dependencias, estarás listo para continuar con el proceso de descarga, conversión y optimización del modelo Whisper.

Imports y definiendo valores de configuración para el proceso de descarga del modelo

In [1]:
import sys
import os
import whisper
from whisper import audio as whisper_audio # Específicamente para mel_filters
import onnx
from onnxsim import simplify
from onnx import shape_inference
import torch
import numpy as np # Necesario para guardar los filtros mel con formato
import argparse
import warnings
from transformers import WhisperProcessor, WhisperForConditionalGeneration, WhisperConfig

# --- Configuraciones Iniciales ---
warnings.filterwarnings("ignore", category=UserWarning) # Ignorar warnings comunes de HuggingFace/Torch

# Directorio para guardar modelos, vocabulario, filtros, etc.
MODEL_SAVE_DIR = "./model"
os.makedirs(MODEL_SAVE_DIR, exist_ok=True) # Crear directorio si no existe

DEFAULT_MODEL_NAME = "rjac/whisper-tiny-spanish" # Modelo Whisper a usar
DEVICE = torch.device("cpu") # Dispositivo para PyTorch (exportación se hace en CPU)
FIXED_BATCH_SIZE = 1 # Tamaño de batch fijo para exportación ONNX
FIXED_NUM_FRAMES = 3000 # Número fijo de frames Mel (correspondiente a 30s de audio a 16kHz)
N_MELS_DEFAULT = 80 # Número de bandas Mel por defecto (se confirmará con el modelo)
N_FFT = 400 # Tamaño de la FFT
FIXED_DECODER_INPUT_LENGTH = 12

mel_filters_path = None # Ruta para guardar los filtros Mel
vocab_path = None # Ruta para guardar el vocabulario
onnx_encoder_path = None # Ruta para guardar el modelo ONNX del encoder
onnx_decoder_path = None # Ruta para guardar el modelo ONNX del decoder

  from .autonotebook import tqdm as notebook_tqdm


Funciones utilitarias para el proceso de descarga del modelo:

 1.  **Carga el modelo y procesador** de Hugging Face (`rjac/whisper-tiny-spanish`).
 2.  **Extrae los filtros Mel** específicos usados por Whisper y los guarda en un archivo `.txt`. Estos filtros son necesarios para calcular correctamente el espectrograma Mel durante la inferencia.
 3.  **Extrae el vocabulario** del tokenizador, incluyendo tokens especiales y de timestamp (generándolos si es necesario), y lo guarda en `vocab_es.txt`. Este archivo mapea IDs de tokens a sus representaciones textuales.
 4.  **Prepara datos de ejemplo** con las dimensiones correctas (batch=1, n_mels, frames=3000) para la exportación.
 5.  **Exporta el Encoder** del modelo a formato ONNX (`whisper_encoder_*.onnx`).
 6.  **Exporta el Decoder** (incluyendo la capa de proyección final) a formato ONNX (`whisper_decoder_*.onnx`).
 7.  **Simplifica y añade información de forma** a los modelos ONNX generados para optimizarlos y mejorar la compatibilidad.


In [2]:
def setup_model(model_name):
    """ Carga el modelo y procesador de Hugging Face. """
    print(f"Cargando modelo y procesador desde Hugging Face: {model_name}")
    if whisper is None: # Verificar si la importación de whisper funcionó
        raise ImportError("La biblioteca 'whisper' no está instalada o no se pudo importar.")
    try:
        processor = WhisperProcessor.from_pretrained(model_name)
        model = WhisperForConditionalGeneration.from_pretrained(model_name).to(DEVICE).eval()
        config = model.config
        print(f"Modelo {model_name}, Procesador y Configuración cargados.")

        # Extraer dimensiones clave (usando nombres consistentes con inferencia)
        global N_MELS # Usar global para actualizar la constante si es necesario
        N_MELS = processor.feature_extractor.feature_size
        D_MODEL = config.d_model
        VOCAB_SIZE = config.vocab_size
        # Longitud de secuencia de salida del encoder (depende de los frames de entrada y la reducción del modelo)
        # Para Whisper estándar con 3000 frames (30s), la salida suele ser 1500 tokens.
        ENCODER_SEQUENCE_LENGTH = FIXED_NUM_FRAMES // 2
        MAX_TARGET_POSITIONS = config.max_target_positions # Límite de tokens del decoder

        print("\n--- Dimensiones Clave del Modelo ---")
        print(f"  Nombre del Modelo: {model_name}")
        print(f"  Batch Size (Fijo para Exportación): {FIXED_BATCH_SIZE}")
        print(f"  Bandas Mel (N_MELS): {N_MELS}")
        print(f"  Frames Mel Fijos (Entrada Encoder): {FIXED_NUM_FRAMES}")
        print(f"  Longitud Secuencia Salida Encoder: {ENCODER_SEQUENCE_LENGTH}")
        print(f"  Tamaño Oculto (d_model): {D_MODEL}")
        print(f"  Tamaño Vocabulario: {VOCAB_SIZE}")
        print(f"  Máx Tokens Decoder: {MAX_TARGET_POSITIONS}")
        print("-----------------------------------\n")

        # Actualizar N_MELS_DEFAULT por si acaso se usa más adelante
        global N_MELS_DEFAULT
        N_MELS_DEFAULT = N_MELS

        return model, processor, config
    except Exception as e:
        print(f"Error cargando el modelo/procesador '{model_name}': {e}")
        print("Verifica que el nombre del modelo es correcto y tienes conexión a internet.")
        print("También asegúrate de tener las bibliotecas 'transformers' y 'torch' instaladas.")
        raise

def save_mel_filters(n_mels, save_dir):
    """ Extrae los filtros Mel de la biblioteca Whisper y los guarda en formato txt. """
    print(f"\nExtrayendo y guardando {n_mels} filtros Mel...")
    if whisper_audio is None:
        raise ImportError("El módulo 'whisper.audio' no está disponible. No se pueden extraer los filtros Mel.")

    filename = os.path.join(save_dir, f"mel_{n_mels}_filters.txt")
    n_fft = N_FFT # Usar constante definida globalmente

    try:
        # Generar filtros usando la función de la biblioteca whisper
        mel_filters_tensor = whisper_audio.mel_filters(device=DEVICE, n_mels=n_mels) # Usa n_fft=400 por defecto
        # Aplanar el tensor y convertir a numpy array en CPU para guardarlo
        mel_filters_flat = mel_filters_tensor.cpu().numpy().flatten()

        # Guardar usando numpy.savetxt para control de formato preciso
        # '%.18e' usa notación científica con 18 decimales
        np.savetxt(filename, mel_filters_flat, fmt='%.18e', newline='\n')
        print(f"Filtros Mel guardados en: {filename}")
        return filename
    except AttributeError:
        print("Error: No se pudo encontrar 'whisper.audio.mel_filters'. ¿Está instalada la biblioteca 'openai-whisper' correctamente?")
        raise
    except Exception as e:
        print(f"Error extrayendo o guardando los filtros Mel: {e}")
        raise

def save_vocabulary(processor, save_dir):
    """ Extrae el vocabulario completo (base + añadido + timestamps) y lo guarda. """
    print("\nExtrayendo y guardando el vocabulario completo...")
    if not hasattr(processor, 'tokenizer'):
         raise ValueError("El objeto Processor no tiene un atributo 'tokenizer'.")

    tokenizer = processor.tokenizer
    vocab_filename = os.path.join(save_dir, "vocab_es.txt") # Nombre específico

    combined_vocab = {}
    added_token_ids = set()

    # Parámetros para generación de Timestamps (ajustados a Whisper)
    TS_START_ID = 50364 # ID de <|0.00|>
    TS_END_ID = 51864   # ID de <|30.00|>
    NUM_TS_TOKENS = (TS_END_ID - TS_START_ID) + 1 # Debe ser 1501
    TS_START_TIME_SEC = 0.00
    TS_TIME_INCREMENT_SEC = 0.02 # Incremento de 20ms por token

    print(f"Parámetros de Timestamp: IDs {TS_START_ID}-{TS_END_ID} ({NUM_TS_TOKENS} tokens)")

    try:
        # 1. Procesar tokens añadidos existentes (si los hay)
        if hasattr(tokenizer, 'added_tokens_decoder') and tokenizer.added_tokens_decoder:
            added_tokens_decoder = tokenizer.added_tokens_decoder
            print(f"Procesando {len(added_tokens_decoder)} tokens añadidos encontrados en el tokenizer...")
            for token_id, added_token_obj in added_tokens_decoder.items():
                combined_vocab[token_id] = added_token_obj.content
                added_token_ids.add(token_id)
        else:
            print("Advertencia: No se encontraron tokens añadidos explícitos ('added_tokens_decoder') en el tokenizer.")

        # 2. Comprobar y/o generar tokens de Timestamp
        if TS_START_ID in added_token_ids:
            print("-> Información: Los tokens de timestamp parecen estar presentes en los tokens añadidos.")
        else:
            print(f"-> Advertencia: El token de inicio de timestamp ID={TS_START_ID} ('<|0.00|>') NO se encontró.")
            print(f"   Intentando AUTO-GENERAR {NUM_TS_TOKENS} tokens de timestamp.")
            generation_count = 0
            for i in range(NUM_TS_TOKENS):
                current_id = TS_START_ID + i
                if current_id > TS_END_ID: break # Seguridad

                current_time = TS_START_TIME_SEC + i * TS_TIME_INCREMENT_SEC
                time_str = f"{round(current_time, 2):.2f}"
                token_str = f"<|{time_str}|>"

                if current_id in combined_vocab:
                    print(f"   Advertencia: Sobrescribiendo ID {current_id} ('{combined_vocab[current_id]}') con TS generado '{token_str}'")
                combined_vocab[current_id] = token_str
                added_token_ids.add(current_id) # Marcar como añadido
                generation_count += 1
            print(f"   Se generaron y añadieron {generation_count} tokens de timestamp.")

        # 3. Procesar el vocabulario base (excluyendo los añadidos/generados)
        vocab_size = tokenizer.vocab_size
        print(f"Iterando hasta vocab_size ({vocab_size}) para encontrar tokens base (excluyendo {len(added_token_ids)} IDs)...")
        base_token_count = 0
        for token_id in range(vocab_size):
            if token_id not in added_token_ids:
                token_str = tokenizer.convert_ids_to_tokens(token_id)
                if token_str is not None:
                    if token_id not in combined_vocab:
                        combined_vocab[token_id] = token_str
                        base_token_count += 1
                    # else: # Muy raro que ocurra si la lógica es correcta
                    #     print(f"Advertencia: ID base {token_id} ('{token_str}') ya estaba en combined_vocab?")
                # else: # ID dentro de vocab_size pero sin representación?
                #     print(f"Advertencia: convert_ids_to_tokens devolvió None para ID base potencial {token_id}")

        print(f"Se encontraron {base_token_count} tokens base.")

        # 4. Ordenar y escribir al archivo
        if not combined_vocab:
            raise ValueError("El vocabulario combinado está vacío después del procesamiento.")

        print(f"Total de tokens únicos en el mapa combinado final: {len(combined_vocab)}")
        sorted_combined_items = sorted(combined_vocab.items(), key=lambda item: item[0]) # Ordenar por ID

        with open(vocab_filename, 'w', encoding='utf-8') as f:
            for token_id, token_str in sorted_combined_items:
                f.write(f"{token_id} {token_str}\n")

        print(f"Vocabulario completo guardado en: {vocab_filename}")
        return vocab_filename

    except Exception as e:
        print(f"Error extrayendo o guardando el vocabulario combinado: {e}")
        import traceback
        traceback.print_exc()
        raise


def setup_dummy_data(model, processor, n_mels):
    """ Crea datos de entrada dummy con las dimensiones correctas para exportar ONNX. """
    print("\nPreparando datos dummy para exportación ONNX...")
    if n_mels != processor.feature_extractor.feature_size:
        warnings.warn(f"n_mels especificado ({n_mels}) difiere del feature_size del procesador ({processor.feature_extractor.feature_size}). Usando el valor del procesador.")
        n_mels = processor.feature_extractor.feature_size

    # Crear espectrograma Mel dummy (valores aleatorios)
    # Dimensiones: [batch_size, n_mels, n_frames]
    x_mel = torch.randn(FIXED_BATCH_SIZE, n_mels, FIXED_NUM_FRAMES, dtype=torch.float32).to(DEVICE)

    # Ejecutar el encoder una vez para obtener la salida (hidden_states) que necesita el decoder
    print(f"Ejecutando encoder con entrada dummy de forma: {x_mel.shape}")
    try:
        with torch.no_grad(): # No necesitamos calcular gradientes
             encoder_output_obj = model.model.encoder(x_mel)
        encoder_hidden_states = encoder_output_obj.last_hidden_state # Tensor [batch, seq_len_encoder, d_model]
        print(f"Salida del Encoder (hidden_states) obtenida con forma: {encoder_hidden_states.shape}")
    except Exception as e:
         print(f"Error al ejecutar el encoder con datos dummy: {e}")
         raise

    # Crear tokens de entrada dummy para el decoder
    # Usamos una longitud corta representativa, p.ej., 12 tokens
    # Dimensiones: [batch_size, sequence_length]
    dummy_decoder_input_length = FIXED_DECODER_INPUT_LENGTH
    x_tokens = torch.randint(0, model.config.vocab_size, (FIXED_BATCH_SIZE, dummy_decoder_input_length), dtype=torch.long).to(DEVICE)
    print(f"Tokens de entrada dummy para Decoder creados con forma: {x_tokens.shape}")

    print("\n--- Formas de Datos Dummy ---")
    print(f"  Entrada Encoder (x_mel): {x_mel.shape}, dtype: {x_mel.dtype}")
    print(f"  Salida Encoder (encoder_hidden_states): {encoder_hidden_states.shape}, dtype: {encoder_hidden_states.dtype}")
    print(f"  Entrada Decoder (x_tokens): {x_tokens.shape}, dtype: {x_tokens.dtype}")
    print("---------------------------\n")

    return x_mel, encoder_hidden_states, x_tokens


def add_shape_info(model_path_in, model_path_out):
    """ Intenta añadir información de forma/tipo a un modelo ONNX. """
    try:
        print(f"Intentando añadir información de forma/tipo a: {os.path.basename(model_path_in)}")
        model = onnx.load(model_path_in)
        # Eliminar información de forma existente si la hubiera (a veces causa problemas)
        onnx.shape_inference.infer_shapes(model, check_type=True, strict_mode=False, data_prop=True)
        # Guardar el modelo con la información inferida
        onnx.save(model, model_path_out)
        print(f"-> Modelo con información de forma/tipo guardado en: {os.path.basename(model_path_out)}")
        return True
    except Exception as e:
        print(f"-> ¡Error! durante la inferencia/guardado de formas para {os.path.basename(model_path_in)}: {e}")
        # traceback.print_exc()
        return False

def simplify_onnx_model(model_path):
    """ Simplifica un modelo ONNX usando onnxsim y luego intenta añadir info de forma. """
    try:
        print(f"Simplificando modelo ONNX: {os.path.basename(model_path)}")
        original_model = onnx.load(model_path)
        simplified_model, check = simplify(original_model) # onnxsim

        if check:
            print("-> Simplificación exitosa.")
            # Guardar el modelo simplificado temporalmente (sobrescribiendo el original)
            onnx.save(simplified_model, model_path)
            # Intentar añadir info de forma al modelo simplificado
            if add_shape_info(model_path, model_path): # Sobrescribe de nuevo
                 print(f"-> Éxito: Modelo simplificado y con info de forma guardado: {os.path.basename(model_path)}")
            else:
                 print(f"-> Advertencia: Modelo simplificado pero falló al añadir info de forma: {os.path.basename(model_path)}")
                 # Nota: El archivo ahora contiene el modelo simplificado pero SIN la nueva info de forma.
        else:
            print(f"-> ¡Error! No se pudo simplificar el modelo: {os.path.basename(model_path)}")
            # Intentar añadir info de forma al modelo original como fallback
            print("-> Intentando añadir info de forma al modelo original...")
            if add_shape_info(model_path, model_path):
                print(f"-> Éxito (Fallback): Info de forma añadida al modelo original: {os.path.basename(model_path)}")
            else:
                print(f"-> Fallo Total: Ni simplificación ni info de forma para: {os.path.basename(model_path)}")

    except Exception as e:
        print(f"-> ¡Error! durante la simplificación/info-forma de ONNX para {os.path.basename(model_path)}: {e}")
        # traceback.print_exc()


# Wrapper para incluir la capa de proyección en la exportación del Decoder
class DecoderWithProjectionWrapper(torch.nn.Module):
    """ Envuelve el decoder y la capa de proyección final para exportarlos juntos. """
    def __init__(self, decoder, projection_layer):
        super().__init__()
        self.decoder = decoder
        self.proj_out = projection_layer

    def forward(self, input_ids, encoder_hidden_states):
        # Pasar entradas al decoder base
        decoder_outputs = self.decoder(
            input_ids=input_ids,
            encoder_hidden_states=encoder_hidden_states,
            use_cache=False # Desactivar caché KV para exportación simple
        )
        # Obtener la última capa oculta del decoder
        last_hidden_state = decoder_outputs[0]
        # Aplicar la capa de proyección final para obtener los logits
        logits = self.proj_out(last_hidden_state)
        return logits

Ejecución de paso 1 - Descarga de modelo

In [3]:
print("\n" + "="*60)
print("=== Iniciando Parte 1: Preparación del Modelo y Recursos ===")
print("="*60)

onnx_encoder_path = None
onnx_decoder_path = None
mel_filters_path = None
vocab_path = None

try:
    # 1. Cargar Modelo y Procesador
    model, processor, config = setup_model(DEFAULT_MODEL_NAME)
    current_n_mels = processor.feature_extractor.feature_size # Confirmar N_MELS

    # 2. Guardar Filtros Mel
    mel_filters_path = save_mel_filters(current_n_mels, MODEL_SAVE_DIR)

    # 3. Guardar Vocabulario
    vocab_path = save_vocabulary(processor, MODEL_SAVE_DIR)

    # 4. Preparar Datos Dummy para Exportación
    x_mel_dummy, enc_hidden_dummy, x_tokens_dummy = setup_dummy_data(model, processor, current_n_mels)

    # 5. Exportar Encoder
    onnx_encoder_path = os.path.join(MODEL_SAVE_DIR, f"whisper_encoder_{DEFAULT_MODEL_NAME.replace('/', '_')}.onnx")
    print(f"\nExportando Encoder a: {os.path.basename(onnx_encoder_path)}...")
    try:
        torch.onnx.export(
            model.model.encoder,        # El sub-módulo encoder
            (x_mel_dummy),              # Argumentos de entrada (solo el mel espectrograma)
            onnx_encoder_path,          # Ruta de guardado
            input_names=["x"],          # Nombre para la entrada Mel (importante para inferencia)
            output_names=["out"],       # Nombre para la salida (importante para inferencia)
            opset_version=17,           # Versión del ONNX opset (17 es reciente y compatible)
        )
        print("-> Exportación del Encoder completada.")
        simplify_onnx_model(onnx_encoder_path) # Simplificar y añadir info de forma
    except Exception as e:
        print(f"¡Error exportando o simplificando el Encoder!: {e}")
        onnx_encoder_path = None # Marcar como fallido

    # 6. Exportar Decoder con Proyección
    onnx_decoder_path = os.path.join(MODEL_SAVE_DIR, f"whisper_decoder_{DEFAULT_MODEL_NAME.replace('/', '_')}.onnx")
    print(f"\nExportando Decoder con Proyección a: {os.path.basename(onnx_decoder_path)}...")
    try:
        # Verificar si la capa de proyección existe
        if not hasattr(model, 'proj_out'):
                raise AttributeError("El modelo cargado no tiene el atributo 'proj_out' esperado para la capa de proyección final.")

        # Instanciar el wrapper que combina decoder y proyección
        decoder_with_proj_for_export = DecoderWithProjectionWrapper(
            model.model.decoder, # El sub-módulo decoder
            model.proj_out       # La capa de proyección final
        ).eval().to(DEVICE)

        # Exportar el wrapper
        torch.onnx.export(
            decoder_with_proj_for_export,   # El módulo wrapper
            (x_tokens_dummy, enc_hidden_dummy), # Argumentos: tokens de entrada, salida del encoder
            onnx_decoder_path,              # Ruta de guardado
            input_names=["tokens", "audio"], # Nombres de entrada (importante)
            output_names=["out"],        # Nombre de salida (importante)
            opset_version=17
        )
        print("-> Exportación del Decoder con Proyección completada.")
        simplify_onnx_model(onnx_decoder_path) # Simplificar y añadir info de forma
    except Exception as e:
        print(f"¡Error exportando o simplificando el Decoder!: {e}")
        onnx_decoder_path = None # Marcar como fallido


    print("\n" + "="*60)
    print("=== Resumen Parte 1 ===")
    print(f"  Filtros Mel: {'Guardados en ' + os.path.basename(mel_filters_path) if mel_filters_path else 'Fallo'}")
    print(f"  Vocabulario: {'Guardado en ' + os.path.basename(vocab_path) if vocab_path else 'Fallo'}")
    print(f"  Encoder ONNX: {'Guardado en ' + os.path.basename(onnx_encoder_path) if onnx_encoder_path else 'Fallo'}")
    print(f"  Decoder ONNX: {'Guardado en ' + os.path.basename(onnx_decoder_path) if onnx_decoder_path else 'Fallo'}")
    print("="*60 + "\n")

    if not all([mel_filters_path, vocab_path, onnx_encoder_path, onnx_decoder_path]):
            print("ADVERTENCIA: Uno o más artefactos no se pudieron generar correctamente.")

except Exception as e:
    print("\n¡¡¡ERROR CRÍTICO EN LA PARTE 1!!!")
    print(f"Error: {e}")
    import traceback
    traceback.print_exc()
    print("No se puede continuar a las siguientes partes si la preparación falla.")



=== Iniciando Parte 1: Preparación del Modelo y Recursos ===
Cargando modelo y procesador desde Hugging Face: rjac/whisper-tiny-spanish
Modelo rjac/whisper-tiny-spanish, Procesador y Configuración cargados.

--- Dimensiones Clave del Modelo ---
  Nombre del Modelo: rjac/whisper-tiny-spanish
  Batch Size (Fijo para Exportación): 1
  Bandas Mel (N_MELS): 80
  Frames Mel Fijos (Entrada Encoder): 3000
  Longitud Secuencia Salida Encoder: 1500
  Tamaño Oculto (d_model): 384
  Tamaño Vocabulario: 51865
  Máx Tokens Decoder: 448
-----------------------------------


Extrayendo y guardando 80 filtros Mel...
Filtros Mel guardados en: ./model/mel_80_filters.txt

Extrayendo y guardando el vocabulario completo...
Parámetros de Timestamp: IDs 50364-51864 (1501 tokens)
Procesando 107 tokens añadidos encontrados en el tokenizer...
-> Advertencia: El token de inicio de timestamp ID=50364 ('<|0.00|>') NO se encontró.
   Intentando AUTO-GENERAR 1501 tokens de timestamp.
   Se generaron y añadieron 1501

  if input_features.shape[-1] != expected_seq_length:
  if attn_output.size() != (bsz, self.num_heads, tgt_len, self.head_dim):


-> Exportación del Encoder completada.
Simplificando modelo ONNX: whisper_encoder_rjac_whisper-tiny-spanish.onnx
-> Simplificación exitosa.
Intentando añadir información de forma/tipo a: whisper_encoder_rjac_whisper-tiny-spanish.onnx
-> Modelo con información de forma/tipo guardado en: whisper_encoder_rjac_whisper-tiny-spanish.onnx
-> Éxito: Modelo simplificado y con info de forma guardado: whisper_encoder_rjac_whisper-tiny-spanish.onnx

Exportando Decoder con Proyección a: whisper_decoder_rjac_whisper-tiny-spanish.onnx...


  if sequence_length != 1:


-> Exportación del Decoder con Proyección completada.
Simplificando modelo ONNX: whisper_decoder_rjac_whisper-tiny-spanish.onnx
-> Simplificación exitosa.
Intentando añadir información de forma/tipo a: whisper_decoder_rjac_whisper-tiny-spanish.onnx
-> Modelo con información de forma/tipo guardado en: whisper_decoder_rjac_whisper-tiny-spanish.onnx
-> Éxito: Modelo simplificado y con info de forma guardado: whisper_decoder_rjac_whisper-tiny-spanish.onnx

=== Resumen Parte 1 ===
  Filtros Mel: Guardados en mel_80_filters.txt
  Vocabulario: Guardado en vocab_es.txt
  Encoder ONNX: Guardado en whisper_encoder_rjac_whisper-tiny-spanish.onnx
  Decoder ONNX: Guardado en whisper_decoder_rjac_whisper-tiny-spanish.onnx



## Paso 2 - Convertir archivos ONNX a RKNN

Imports para el proceso de exportación a RKNN

In [4]:
import builtins, sys
builtins.exit = sys.exit ## Requerido para evitar que RKNN llame a exit() y cierre el script

import glob
from rknn.api import RKNN

# Valores por defecto para los parámetros
DEFAULT_RKNN_DTYPE = 'fp'
DEFAULT_RKNN_PLATFORM = 'rk3588'

# Plataformas y tipos de datos soportados (ajusta según sea necesario)
SUPPORTED_RKNN_PLATFORMS = ['rk3562', 'rk3566', 'rk3568', 'rk3576', 'rk3588']
SUPPORTED_RKNN_DTYPES = ['i8', 'u8', 'fp']

RKNN_DATASET_PATH = './dataset.txt' # Archivo requerido para RKNN si se implementa cuantización a INT8


Funciones utilitarias para el paso 2 - Convertir el modelo ONNX a RKNN

1.  **Busca los archivos `.onnx`** generados en la Parte 1 (o colocados manualmente) en el directorio `MODEL_SAVE_DIR`.
2.  **Configura la conversión** para una plataforma Rockchip específica (ej. `rk3588`) y un tipo de datos (`fp` para punto flotante, `i8`/`u8` para cuantización entera de 8 bits).
3.  **Si se elige cuantización (`i8`/`u8`)**: Requiere un archivo `dataset.txt` que liste archivos de audio para la calibración. El script intentará usarlo.
4.  **Itera sobre cada archivo `.onnx` encontrado**:
     * Carga el modelo ONNX.
     * Construye el modelo RKNN (aplicando cuantización si se especificó).
     * Exporta el modelo resultante a formato `.rknn`.
5.  **Muestra un resumen** de la conversión.

In [5]:
def convert_onnx_to_rknn(onnx_model_path, platform, dtype, save_dir, dataset_path=None):
    """ Convierte un único modelo ONNX a formato RKNN. """
    if RKNN is None:
        print("Error: La biblioteca RKNN no está disponible. No se puede realizar la conversión.")
        return None

    model_name = os.path.basename(onnx_model_path)
    output_path = os.path.join(save_dir, model_name.replace('.onnx', '.rknn'))
    print(f"\n--- Procesando para RKNN: {model_name} ---")
    print(f"  Plataforma: {platform}, DType: {dtype}")

    # Crear objeto RKNN
    rknn = RKNN(verbose=False) # Poner verbose=True para más detalles

    # Configuración Previa
    print('--> Configurando modelo RKNN...')
    # Especificar la plataforma destino es crucial
    rknn.config(target_platform=platform)
    print('    Hecho.')

    # Cargar Modelo ONNX
    print('--> Cargando modelo ONNX...')
    ret = rknn.load_onnx(model=onnx_model_path)
    if ret != 0:
        print(f'    ¡ERROR al cargar {model_name}! Código: {ret}. ¿El archivo ONNX es válido y accesible?')
        rknn.release()
        return None
    print('    Hecho.')

    # Construir Modelo RKNN
    print('--> Construyendo modelo RKNN...')
    do_quant = dtype in ['i8', 'u8']
    build_args = {'do_quantization': do_quant}

    if do_quant:
        print(f"    Activando cuantización ({dtype}).")
        if dataset_path and os.path.exists(dataset_path):
            print(f"    Usando dataset de calibración: {dataset_path}")
            build_args['dataset'] = dataset_path
            # ¡IMPORTANTE! El dataset.txt para Whisper debe contener RUTAS A *ARCHIVOS DE AUDIO* (.wav, .mp3, etc.)
            # La herramienta RKNN internamente los procesará para generar los datos de calibración (espectrogramas Mel).
            # No pongas rutas a archivos .txt de espectrogramas directamente.
        else:
            print(f"    ¡ERROR! La cuantización ({dtype}) requiere un archivo 'dataset.txt' válido en '{dataset_path}'.")
            print(f"    Crea este archivo con rutas a archivos de audio (.wav) para calibración.")
            rknn.release()
            return None

    ret = rknn.build(**build_args)
    if ret != 0:
        print(f'    ¡ERROR al construir {model_name}! Código: {ret}. ¿Problemas de memoria, ops no soportadas, o dataset incorrecto?')
        rknn.release()
        return None
    print('    Hecho.')

    # Exportar Modelo RKNN
    print('--> Exportando modelo RKNN...')
    ret = rknn.export_rknn(output_path)
    if ret != 0:
        print(f'    ¡ERROR al exportar a {output_path}! Código: {ret}. ¿Permisos de escritura?')
        rknn.release()
        return None
    print(f'    Modelo exportado exitosamente a: {os.path.basename(output_path)}')
    print('    Hecho.')

    # Liberar recursos
    rknn.release()
    print(f"--- Fin Procesamiento RKNN (ÉXITO): {model_name} ---\n")
    return output_path

Ejecución de conversión

In [6]:
print("\n" + "="*60)
print("=== Iniciando Parte 2: Conversión de ONNX a RKNN ===")
print("="*60)

if RKNN is None:
    print("ADVERTENCIA: La biblioteca RKNN no está disponible. Saltando Parte 2.")
    print("="*60 + "\n")

# Validar plataforma y dtype
if DEFAULT_RKNN_PLATFORM not in SUPPORTED_RKNN_PLATFORMS:
    print(f"ERROR: Plataforma RKNN inválida '{DEFAULT_RKNN_PLATFORM}'. Soportadas: {SUPPORTED_RKNN_PLATFORMS}")

if DEFAULT_RKNN_DTYPE not in SUPPORTED_RKNN_DTYPES:
    print(f"ERROR: dtype RKNN inválido '{DEFAULT_RKNN_DTYPE}'. Soportados: {SUPPORTED_RKNN_DTYPES}")


print(f"Buscando archivos .onnx en: {MODEL_SAVE_DIR}")
onnx_files = glob.glob(os.path.join(MODEL_SAVE_DIR, '*.onnx'))

if not onnx_files:
    print(f"No se encontraron archivos .onnx en '{MODEL_SAVE_DIR}'. Asegúrate de que la Parte 1 se completó correctamente.")
    print("="*60 + "\n")


print(f"\nSe encontraron {len(onnx_files)} modelos ONNX para convertir:")
for model_path in onnx_files:
    print(f"  - {os.path.basename(model_path)}")

print(f"\nConfiguración de Conversión:")
print(f"  Plataforma Destino: {DEFAULT_RKNN_PLATFORM}")
print(f"  Tipo de Datos (dtype): {DEFAULT_RKNN_DTYPE}")
if DEFAULT_RKNN_DTYPE in ['i8', 'u8']:
        print(f"  Cuantización Activada. Dataset esperado en: {RKNN_DATASET_PATH}")
        if not os.path.exists(RKNN_DATASET_PATH):
            print(f"  ADVERTENCIA: ¡El archivo dataset '{RKNN_DATASET_PATH}' no existe!")
            print(f"               La conversión fallará si el dataset es requerido.")
else:
        print(f"  Cuantización Desactivada (FP16/BF16 en NPU o FP32 en CPU).")

print("\nIniciando proceso de conversión...\n")

rknn_success_files = []
rknn_fail_files = []

for onnx_path in onnx_files:
    rknn_output_path = convert_onnx_to_rknn(onnx_path, DEFAULT_RKNN_PLATFORM, DEFAULT_RKNN_DTYPE, MODEL_SAVE_DIR, RKNN_DATASET_PATH)
    if rknn_output_path:
        rknn_success_files.append(rknn_output_path)
    else:
        rknn_fail_files.append(os.path.basename(onnx_path)) # Guardar nombre del onnx que falló

print("\n" + "="*60)
print("=== Resumen Parte 2: Conversión RKNN ===")
print(f"  Modelos ONNX encontrados: {len(onnx_files)}")
print(f"  Conversiones Exitosas: {len(rknn_success_files)}")
for fpath in rknn_success_files: print(f"    - {os.path.basename(fpath)}")
print(f"  Conversiones Fallidas: {len(rknn_fail_files)}")
for fname in rknn_fail_files: print(f"    - {fname} (ONNX)")
print("="*60 + "\n")

if rknn_fail_files:
        print("ADVERTENCIA: Algunos modelos no pudieron ser convertidos a RKNN.")


I rknn-toolkit2 version: 2.3.0



=== Iniciando Parte 2: Conversión de ONNX a RKNN ===
Buscando archivos .onnx en: ./model

Se encontraron 2 modelos ONNX para convertir:
  - whisper_encoder_rjac_whisper-tiny-spanish.onnx
  - whisper_decoder_rjac_whisper-tiny-spanish.onnx

Configuración de Conversión:
  Plataforma Destino: rk3588
  Tipo de Datos (dtype): fp
  Cuantización Desactivada (FP16/BF16 en NPU o FP32 en CPU).

Iniciando proceso de conversión...


--- Procesando para RKNN: whisper_encoder_rjac_whisper-tiny-spanish.onnx ---
  Plataforma: rk3588, DType: fp
--> Configurando modelo RKNN...
    Hecho.
--> Cargando modelo ONNX...


I Loading : 100%|████████████████████████████████████████████████| 67/67 [00:00<00:00, 11808.49it/s]
[1;33mW[0m [1;33mload_onnx: The config.mean_values is None, zeros will be set for input 0![0m
[1;33mW[0m [1;33mload_onnx: The config.std_values is None, ones will be set for input 0![0m


    Hecho.
--> Construyendo modelo RKNN...


I OpFusing 0 :  27%|████████████▋                                  | 27/100 [00:00<00:01, 51.98it/s]





I OpFusing 2 : 100%|█████████████████████████████████████████████| 100/100 [00:00<00:00, 133.01it/s]








I OpFusing 2 : 100%|██████████████████████████████████████████████| 100/100 [00:01<00:00, 60.51it/s]
I rknn building ...
I rknn building done.
I rknn-toolkit2 version: 2.3.0


    Hecho.
--> Exportando modelo RKNN...
    Modelo exportado exitosamente a: whisper_encoder_rjac_whisper-tiny-spanish.rknn
    Hecho.
--- Fin Procesamiento RKNN (ÉXITO): whisper_encoder_rjac_whisper-tiny-spanish.onnx ---


--- Procesando para RKNN: whisper_decoder_rjac_whisper-tiny-spanish.onnx ---
  Plataforma: rk3588, DType: fp
--> Configurando modelo RKNN...
    Hecho.
--> Cargando modelo ONNX...


I Loading : 100%|████████████████████████████████████████████████| 102/102 [00:00<00:00, 993.08it/s]
[1;33mW[0m [1;33mload_onnx: The config.mean_values is None, zeros will be set for input 1![0m
[1;33mW[0m [1;33mload_onnx: The config.std_values is None, ones will be set for input 1![0m
[1;33mW[0m [1;33mbuild: For tensor ['/decoder/layers.0/self_attn/Slice_output_0'], the value smaller than -3e+38 has been corrected to -10000. Set opt_level to 2 or lower to disable this correction.[0m
[1;33mW[0m [1;33mbuild: For tensor ['/decoder/layers.0/self_attn/Slice_output_0_1'], the value smaller than -3e+38 has been corrected to -10000. Set opt_level to 2 or lower to disable this correction.[0m
[1;33mW[0m [1;33mbuild: For tensor ['/decoder/layers.0/self_attn/Slice_output_0_2'], the value smaller than -3e+38 has been corrected to -10000. Set opt_level to 2 or lower to disable this correction.[0m
[1;33mW[0m [1;33mbuild: For tensor ['/decoder/layers.0/self_attn/Slice_output_0_

    Hecho.
--> Construyendo modelo RKNN...


I OpFusing 1 :  98%|█████████████████████████████████████████████ | 98/100 [00:00<00:00, 138.66it/s]




I OpFusing 0 :   1%|▍                                               | 1/100 [00:00<01:36,  1.02it/s]




I OpFusing 2 : 100%|██████████████████████████████████████████████| 100/100 [00:01<00:00, 71.69it/s]








I OpFusing 2 : 100%|██████████████████████████████████████████████| 100/100 [00:01<00:00, 61.44it/s]
I rknn building ...
E RKNN: [13:29:05.012] channel is too large, may produce thousands of regtask, fallback to cpu!
E RKNN: [13:29:05.012] channel is too large, may produce thousands of regtask, fallback to cpu!
E RKNN: [13:29:05.012] channel is too large, may produce thousands of regtask, fallback to cpu!
E RKNN: [13:29:05.012] channel is too large, may produce thousands of regtask, fallback to cpu!
E RKNN: [13:29:05.044] channel is too large, may produce thousands of regtask, fallback to cpu!
I rknn building done.


    Hecho.
--> Exportando modelo RKNN...
    Modelo exportado exitosamente a: whisper_decoder_rjac_whisper-tiny-spanish.rknn
    Hecho.
--- Fin Procesamiento RKNN (ÉXITO): whisper_decoder_rjac_whisper-tiny-spanish.onnx ---


=== Resumen Parte 2: Conversión RKNN ===
  Modelos ONNX encontrados: 2
  Conversiones Exitosas: 2
    - whisper_encoder_rjac_whisper-tiny-spanish.rknn
    - whisper_decoder_rjac_whisper-tiny-spanish.rknn
  Conversiones Fallidas: 0



Si en este paso la conversión ha sido exitosa, es probable que el modelo RKNN esté listo para ser ejecutado en el chip. Sin embargo, en mi experiencia tuve muchos falsos positivos, en general los problemas se derivan de los shapes de los modelos ONNX, hice experimentos con Optimum y la preservación de los shapes de entrada y salida de los grafos y sus nodos son un dolor de cabeza. Si experimentas problemas sugiero revisar con Netron los grafos, puedes usar los ONNX proveidos como ejemplo por parte de Rockchip en rknn_model_zoo demo whisper para comparar, aunque este modelo está recortado de 20 segundos de audio, por tanto las dimensiones de muchos tensores van a diferir pero da una buena idea de lo que se espera por parte de RKNN Toolkit para hacer una conversión adecuada.

## Paso 3. Evaluar ONNX en local

Imports necesarios para esta fase

In [7]:
import onnxruntime
import scipy # Necesario para resample
import soundfile as sf

SAMPLE_RATE = 16000         # Tasa de muestreo esperada por Whisper (Hz)
N_FFT = 400                 # Tamaño de la ventana para la Transformada Rápida de Fourier (STFT)
HOP_LENGTH = 160            # Desplazamiento entre ventanas STFT consecutivas
CHUNK_LENGTH = 30           # Duración estándar de un segmento de audio procesado por Whisper (segundos)
N_SAMPLES = CHUNK_LENGTH * SAMPLE_RATE # Número total de muestras de audio en un chunk de 30s
# MAX_LENGTH: Longitud máxima de la secuencia de frames de Mel que espera el encoder.
# Se calcula como: (CHUNK_LENGTH * SAMPLE_RATE) / HOP_LENGTH -> 3000 frames.
MAX_LENGTH = 3000
N_MELS = 80   
TASK_CODE_SPANISH=50262

# Archivo de audio de ejemplo para inferencia (¡Cámbialo por tu archivo!)
DEFAULT_AUDIO_PATH_FOR_INFERENCE = "./test_es.wav" 


Funciones utilitarias

1.  **Define Constantes y Funciones de Preprocesamiento de Audio:**
     * Constantes como `SAMPLE_RATE`, `N_FFT`, `HOP_LENGTH`, `MAX_MEL_LENGTH`, `N_MELS`.
     * Funciones para asegurar que el audio esté en mono (`ensure_channels`) y a 16000 Hz (`ensure_sample_rate`).
2.  **Define Funciones de Procesamiento de Espectrograma:**
     * Función para cargar los filtros Mel (`load_mel_filters`) desde el archivo `.txt` generado en la Parte 1.
     * Función para calcular el espectrograma Log-Mel (`log_mel_spectrogram`) usando PyTorch y los filtros cargados.
     * Función para ajustar (padding o recorte) el espectrograma a la longitud fija `MAX_MEL_LENGTH` (`pad_or_trim`).
3.  **Define Funciones de Vocabulario e Inferencia:**
     * Función para cargar el vocabulario (`read_vocab`) desde el archivo `vocab_es.txt` generado en la Parte 1.
     * Funciones para inicializar (`init_model`) y liberar (`release_model`) los modelos, soportando tanto ONNX (`onnxruntime`) como RKNN (`rknn-toolkit2`).
     * Función para ejecutar el encoder (`run_encoder`) con el espectrograma Mel preprocesado.
     * Función para ejecutar el decoder (`run_decoder`) de forma auto-regresiva, generando tokens hasta predecir el token de fin de texto (`<|endoftext|>`) o alcanzar un límite. Utiliza el vocabulario para decodificar los IDs de token a texto.
4.  **Ejecución Principal de Inferencia:**
     * Carga el archivo de audio especificado.
     * Preprocesa el audio (mono, 16kHz).
     * Calcula y ajusta el espectrograma Log-Mel.
     * Carga el vocabulario.
     * Inicializa los modelos (RKNN si están disponibles y se prefiere, si no ONNX).
     * Ejecuta el encoder.
     * Ejecuta el decoder para obtener la transcripción.
     * Imprime la transcripción resultante.
     * Libera los recursos del modelo.

In [8]:
def ensure_sample_rate(waveform, original_sample_rate, desired_sample_rate=SAMPLE_RATE):
    """
    Remuestrea la forma de onda de audio a la tasa de muestreo deseada (SAMPLE_RATE).
    Utiliza scipy.signal.resample para alta calidad.
    """
    if original_sample_rate != desired_sample_rate:
        print(f"Remuestreando audio: {original_sample_rate} Hz -> {desired_sample_rate} Hz")
        desired_length = int(round(float(len(waveform)) / original_sample_rate * desired_sample_rate))
        waveform = scipy.signal.resample(waveform, desired_length)
        print("Remuestreo completado.")
    # Convierte a float32 por si acaso resample cambia el tipo
    return waveform.astype(np.float32), desired_sample_rate

def ensure_channels(waveform, original_channels, desired_channels=1):
    """
    Convierte el audio al número deseado de canales (generalmente a mono).
    Promedia los canales si hay más de los deseados.
    """
    # Primero, inferir el número de canales si no se dio explícitamente
    inferred_channels = 1
    if waveform.ndim > 1:
        # Asumir que el eje más corto es el de canales (más robusto)
        if waveform.shape[0] < waveform.shape[1]: # Formato (canales, muestras)
            inferred_channels = waveform.shape[0]
        else: # Formato (muestras, canales)
            inferred_channels = waveform.shape[1]
    elif waveform.ndim == 1:
        inferred_channels = 1
    else: # ndim == 0 (escalar) o > 2, raro para audio
        print(f"Advertencia: Forma de onda con dimensiones inesperadas: {waveform.shape}. Asumiendo 1 canal.")
        inferred_channels = 1

    if inferred_channels > desired_channels:
        print(f"Convirtiendo canales: {inferred_channels} -> {desired_channels} (Mono)")
        # Intenta promediar a lo largo del eje correcto
        if waveform.ndim > 1:
             if waveform.shape[0] == inferred_channels: # Formato (canales, muestras)
                  waveform = np.mean(waveform, axis=0)
             elif waveform.shape[1] == inferred_channels: # Formato (muestras, canales)
                  waveform = np.mean(waveform, axis=-1)
             else: # No coincide con ninguna suposición
                 print("Advertencia: No se pudo determinar el eje de canales para promediar. Se devolverá como está.")
                 return waveform, inferred_channels
        # Si waveform.ndim era 1 pero inferred_channels>1, algo iba mal antes.
        # Ya estamos en mono si ndim es 1.
        print("Conversión a mono completada.")
        return waveform, desired_channels

    elif inferred_channels < desired_channels:
         # No se puede crear canales de la nada de forma significativa
         print(f"Advertencia: El audio tiene {inferred_channels} canales, se requieren {desired_channels}. No se puede convertir.")
         return waveform, inferred_channels
    else:
        # El número de canales ya es el deseado
        return waveform, inferred_channels

# --- Funciones Relacionadas con Vocabulario ---

def read_vocab(vocab_path):
    """
    Lee el archivo de vocabulario (formato: ID<espacio>TOKEN por línea).
    Mapea los IDs (como string) a los tokens (string).
    """
    if not os.path.exists(vocab_path):
         raise FileNotFoundError(f"Archivo de vocabulario no encontrado en: {vocab_path}")
    vocab = {}
    print(f"Leyendo vocabulario desde: {vocab_path}...")
    with open(vocab_path, 'r', encoding='utf-8') as f:
        for line in f:
            parts = line.strip().split(' ', 1) # Divide en el primer espacio: ID TOKEN
            if len(parts) == 2:
                token_id_str = parts[0]
                token_str = parts[1]
                vocab[token_id_str] = token_str
            elif len(parts) == 1 and parts[0]: # Asegura que no sea una línea vacía
                 # Maneja casos donde podría haber solo un ID (ej. token vacío o especial sin espacio)
                 token_id_str = parts[0]
                 vocab[token_id_str] = "" # Asigna un string vacío
            # Ignora líneas vacías o mal formateadas silenciosamente
    print(f"Vocabulario leído. {len(vocab)} tokens cargados.")
    return vocab

# --- Funciones de Procesamiento de Espectrograma Mel ---

def pad_or_trim(mel_array, length=MAX_LENGTH):
    """
    Ajusta (rellena o recorta) el espectrograma Mel a una longitud fija (MAX_LENGTH).
    """
    original_length = mel_array.shape[1] # Longitud temporal original

    if original_length > length:
        # Recortar: Selecciona solo los primeros 'length' frames temporales
        print(f"Recortando espectrograma Mel de {original_length} a {length} frames.")
        return mel_array[:, :length]
    elif original_length < length:
        # Rellenar (Padding): Añade columnas de valores al final
        pad_width = length - original_length
        print(f"Rellenando espectrograma Mel de {original_length} a {length} frames.")
        # Usamos -1.0 como valor de padding, relacionado con la normalización de Whisper.
        return np.pad(mel_array, ((0, 0), (0, pad_width)), mode='constant', constant_values=-1.0)
    else:
        # La longitud ya es la correcta
        return mel_array

def load_mel_filters(n_mels=N_MELS, filters_path="./model/mel_80_filters.txt"):
    """
    Carga los pesos del banco de filtros Mel precalculados desde un archivo de texto.
    Espera un array plano que se remodela a [n_mels, n_fft // 2 + 1].
    """
    if not os.path.exists(filters_path):
        raise FileNotFoundError(f"Archivo de filtros Mel no encontrado en: {filters_path}")
    try:
        expected_columns = N_FFT // 2 + 1 # 201 para N_FFT=400
        mels_data = np.loadtxt(filters_path, dtype=np.float32)
        expected_size = n_mels * expected_columns
        if mels_data.size != expected_size:
             raise ValueError(f"Tamaño inesperado de datos en {filters_path}. "
                              f"Se esperaban {expected_size} elementos, se encontraron {mels_data.size}")
        mels_data = mels_data.reshape((n_mels, expected_columns))
        print(f"Filtros Mel ({mels_data.shape}) cargados desde {filters_path}")
        return torch.from_numpy(mels_data) # Devuelve Tensor PyTorch
    except Exception as e:
        print(f"Error cargando o remodelando filtros Mel desde {filters_path}: {e}")
        raise

def log_mel_spectrogram(audio, n_mels=N_MELS, filters=None, filters_path="./model/mel_80_filters.txt"):
    """
    Calcula el espectrograma Log-Mel usando PyTorch, replicando Whisper.
    """
    # Asegura que el audio sea un tensor de PyTorch
    if not torch.is_tensor(audio):
        audio = torch.from_numpy(audio)

    # Carga los filtros Mel si no se proporcionaron
    if filters is None:
        print("Cargando filtros Mel predeterminados...")
        filters = load_mel_filters(n_mels, filters_path) # Usa la función anterior para cargar

    filters = filters.to(audio.device) # Mismo dispositivo que el audio
    window = torch.hann_window(N_FFT).to(audio.device) # Ventana Hann

    # STFT
    stft_result = torch.stft(audio, n_fft=N_FFT, hop_length=HOP_LENGTH,
                             window=window, return_complex=True, center=True)

    # Espectrograma de Potencia (Magnitudes al cuadrado)
    # Whisper usa stft[..., :-1], que son N_FFT // 2 bins (400 // 2 = 200 bins)
    # Sin embargo, el filtro precalculado suele ser [80, 201]. Verifiquemos.
    # Si filters es [80, 201], necesitamos magnitudes [201, T]
    # stft_result es [201, T]. Entonces usamos stft_result.abs()**2
    # PERO: El código original usa stft[..., :-1].abs() ** 2 -> [200, T]
    # Y luego filters @ magnitudes. Esto implica que filters debe ser [80, 200].
    # Vamos a seguir el código original: stft[..., :-1] y asumir filtros [80, 200].
    # Si `load_mel_filters` carga [80, 201] y da error de tamaño, hay que ajustar
    # o `load_mel_filters` o esta línea. El error original no era aquí, así que
    # mantenemos la línea original por ahora.
    magnitudes = stft_result[..., :-1].abs() ** 2 # Shape: [..., 200, T]

    # Verifica compatibilidad de dimensiones (Defensivo)
    if filters.shape[1] != magnitudes.shape[-2]:
         print(f"Advertencia: Incompatibilidad de dimensiones entre Filtros Mel {filters.shape} y Magnitudes STFT {magnitudes.shape}. "
               f"Se esperaba Filtros[1] == Magnitudes[-2]. Intentando continuar...")
         # Podría fallar en la línea siguiente si las dimensiones son incompatibles.

    # Aplicar filtros Mel
    mel_spec = filters @ magnitudes

    # Logaritmo y Clamping
    log_spec = torch.clamp(mel_spec, min=1e-10).log10()

    # Normalización Whisper
    log_spec = torch.maximum(log_spec, log_spec.max() - 8.0)
    log_spec = (log_spec + 4.0) / 4.0

    print(f"Espectrograma Log-Mel calculado. Forma: {log_spec.shape}")
    return log_spec

# --- Funciones de Inferencia del Modelo ---

def run_encoder(encoder_model, mel_input):
    """
    Ejecuta la inferencia del modelo Encoder (ONNX o RKNN).
    """
    expected_shape = (1, N_MELS, MAX_LENGTH)
    if mel_input.shape != expected_shape:
         raise ValueError(f"Input de Encoder con forma inesperada: {mel_input.shape}. Se esperaba {expected_shape}")

    if isinstance(encoder_model, RKNN):
        print("Ejecutando inferencia RKNN (Encoder)...")
        outputs = encoder_model.inference(inputs=[mel_input.astype(np.float32)]) # Asegura float32
        out_encoder = outputs[0]
    elif isinstance(encoder_model, onnxruntime.InferenceSession):
        print("Ejecutando inferencia ONNX (Encoder)...")
        input_name = encoder_model.get_inputs()[0].name
        output_name = encoder_model.get_outputs()[0].name
        print(f"  Input ONNX: '{input_name}', Output ONNX: '{output_name}'")
        outputs = encoder_model.run([output_name], {input_name: mel_input.astype(np.float32)}) # Asegura float32
        out_encoder = outputs[0]
    else:
        raise TypeError("Tipo de modelo Encoder no soportado. Debe ser RKNN o ONNX InferenceSession.")

    print(f"Salida del Encoder obtenida. Forma: {out_encoder.shape}")
    return out_encoder

# def _decode_step(...):  # Eliminada porque no se usaba y la lógica estaba en _decode

# --- REVERTIDA A LA LÓGICA ORIGINAL ---
def _decode(decoder_model, tokens, out_encoder):
    """
    Realiza UN PASO de inferencia del modelo Decoder (ONNX o RKNN).
    Esta es la versión que coincide con la lógica original del script,
    usando chequeo de tipo por string y nombres de input hardcodeados.

    Args:
        decoder_model (RKNN or onnxruntime.InferenceSession): Modelo decoder cargado.
        tokens (list): Lista de IDs de token actuales (int).
        out_encoder (np.ndarray): Salida del modelo encoder.

    Returns:
        np.ndarray: Logits predichos por el decoder, con forma
                    (1, seq_len, vocab_size).
    """
    # Prepara el input de tokens como un array numpy (1, seq_len) de int64
    tokens_array = np.asarray([tokens], dtype=np.int64)

    # Ejecuta inferencia según el tipo de modelo (usando chequeo por string como en el original)
    if 'rknn' in str(type(decoder_model)).lower(): # Chequeo insensible a mayúsculas
        # RKNN espera una lista de inputs: [tokens_array, encoder_output]
        # print(f"  Input RKNN Decoder: tokens={tokens_array.shape}, encoder_out={out_encoder.shape}")
        outputs = decoder_model.inference(inputs=[tokens_array, out_encoder])
        out_decoder = outputs[0]
    elif 'onnxruntime' in str(type(decoder_model)).lower(): # Chequeo insensible a mayúsculas
        # ONNX espera un diccionario {nombre_input: valor_input}
        # Usa los nombres hardcodeados del script original ("tokens", "audio")
        # Asegúrate que estos nombres coinciden con los de TU modelo ONNX decoder.
        input_dict = {
            "tokens": tokens_array,
            "audio": out_encoder
        }
        # print(f"  Input ONNX Decoder: tokens={tokens_array.shape}, audio={out_encoder.shape}")
        # `run(None, ...)` ejecuta todas las salidas
        outputs = decoder_model.run(None, input_dict)
        out_decoder = outputs[0] # Asume que los logits son la primera salida
    else:
        # Si no es ni RKNN ni ONNX, lanza un error.
        # Usamos isinstance para chequeo futuro, aunque el if/elif anterior lo cubre.
        if not isinstance(decoder_model, (RKNN, onnxruntime.InferenceSession)):
             raise TypeError("Tipo de modelo Decoder no soportado. Debe ser RKNN o ONNX InferenceSession.")
        # Si llegara aquí por alguna razón inesperada
        print(f"Advertencia: Tipo de modelo decoder detectado como {type(decoder_model)}, no manejado explícitamente por el chequeo de string. Intentando continuar...")
        # Intenta como ONNX por defecto (o lanza excepción si falla)
        try:
            input_dict = {"tokens": tokens_array, "audio": out_encoder}
            outputs = decoder_model.run(None, input_dict)
            out_decoder = outputs[0]
        except Exception as e:
             raise TypeError(f"No se pudo ejecutar inferencia con el tipo de modelo {type(decoder_model)}: {e}")


    # print(f"  Logits obtenidos. Forma: {out_decoder.shape}")
    return out_decoder


def run_decoder(decoder_model, out_encoder, vocab, task_code):
    """
    Realiza el proceso de decodificación autoregresiva (greedy search).
    Genera texto token a token hasta encontrar <|eot|> o alcanzar límite.
    Usa la lógica específica de manejo de tokens del script original.
    """
    # Asegura que task_code sea un entero (Corrección del error de tupla)
    if not isinstance(task_code, int):
        raise TypeError(f"run_decoder esperaba task_code como int, pero recibió {type(task_code)}")

    # --- Inicialización de la Secuencia de Tokens (como en el original) ---
    eot_token_id = 50257      # <|endoftext|>
    sot_token_id = 50258      # <|startoftranscript|>
    # task_code                # Idioma/Tarea (ej. 50262 para <|es|>)
    token_50359 = 50359        # Token desconocido o específico del modelo (¿quizás <|transcribe|>?)
    no_timestamps_id = 50363 # <|notimestamps|>
    timestamp_begin = 50364    # Inicio de tokens de tiempo

    # Secuencia inicial [sot, lang, ???, no_timestamps]
    initial_tokens = [sot_token_id, task_code, token_50359, no_timestamps_id]
    tokens = list(initial_tokens) # Copia modificable

    # --- Lógica Específica de Padding/Pop del script original ---
    # El decoder parece esperar una longitud fija de entrada (12 en el original)
    max_tokens_input_len = 12
    pop_id = max_tokens_input_len # Índice desde donde se empieza a eliminar/reemplazar

    # Rellenar/Inicializar la lista `tokens` a longitud `max_tokens_input_len`
    # repitiendo la secuencia inicial. (Confirmado como causa del error si task_code no es int)
    if len(tokens) < max_tokens_input_len:
         num_repeats = max_tokens_input_len // len(initial_tokens)
         remainder = max_tokens_input_len % len(initial_tokens)
         # Construye la lista asegurando que todos los elementos sean enteros
         tokens = [int(t) for t in initial_tokens] * num_repeats + [int(t) for t in initial_tokens[:remainder]]
         print(f"Advertencia: Se inicializaron los tokens a una longitud fija de {max_tokens_input_len} "
               f"repitiendo la secuencia inicial. Esto es específico de este script.")
    elif len(tokens) > max_tokens_input_len:
         tokens = [int(t) for t in tokens[:max_tokens_input_len]] # Truncar si es más larga
         print(f"Advertencia: Se truncaron los tokens iniciales a {max_tokens_input_len}.")
    else:
        # Asegura que todos sean enteros incluso si la longitud era correcta
        tokens = [int(t) for t in tokens]

    # Verifica que la lista 'tokens' ahora solo contenga enteros
    if not all(isinstance(t, int) for t in tokens):
        raise TypeError(f"La inicialización de tokens falló, todavía contiene no enteros: {tokens}")

    # Almacenar el texto generado
    generated_text = ""
    max_decoding_steps = 224 # Límite de seguridad
    print(f"\nIniciando decodificación (máx {max_decoding_steps} pasos):")
    print(f"  Tokens iniciales (len={len(tokens)}): {tokens}") # Ahora deberían ser todos ints

    next_token_id = -1 # Inicializa para el bucle while

    # Bucle principal de decodificación
    for step in range(max_decoding_steps):
        # 1. Ejecutar un paso del decoder usando la función _decode (revertida)
        try:
            # Pasamos la LISTA 'tokens', _decode la convertirá a array internamente
            logits = _decode(decoder_model, tokens, out_encoder)
            # Logits tiene forma (1, seq_len, vocab_size)
        except Exception as e:
            print(f"\nError durante el paso de decodificación {step+1} en _decode: {e}")
            import traceback
            traceback.print_exc()
            break # Salir del bucle

        # 2. Seleccionar el siguiente token (Greedy Search)
        # El logit para el *siguiente* token está en la última posición temporal.
        next_token_logits = logits[0, -1, :] # Shape: (vocab_size,)
        next_token_id = next_token_logits.argmax()

        # 3. Convertir ID a token string y manejar Mojibake
        if str(next_token_id) in vocab:
            raw_token_str = vocab[str(next_token_id)]
            processed_token_str = raw_token_str
            # ---- Intento de Corrección de Mojibake ----
            try:
                starts_with_G = raw_token_str.startswith('\u0120')
                core_token_str = raw_token_str[1:] if starts_with_G else raw_token_str
                original_bytes = core_token_str.encode('latin-1')
                corrected_core = original_bytes.decode('utf-8')
                processed_token_str = ('\u0120' + corrected_core) if starts_with_G else corrected_core
                # if processed_token_str != raw_token_str:
                #      print(f"    (Corrección Mojibake: '{raw_token_str}' -> '{processed_token_str}')")
            except (UnicodeEncodeError, UnicodeDecodeError):
                 processed_token_str = raw_token_str # Usa el original si falla la corrección
            # ---- Fin Corrección Mojibake ----
            next_token_str = processed_token_str
        else:
             print(f"Advertencia: ID de token {next_token_id} no encontrado en el vocabulario.")
             next_token_str = f"<UNK_{next_token_id}>"

        print(f"  Paso {step+1}: Predicho ID={next_token_id}, Token='{next_token_str}'")

        # 4. Comprobar condición de parada (<|eot|>)
        if next_token_id == eot_token_id:
            print(f"  Token de fin de texto ({eot_token_id}) detectado. Terminando decodificación.")
            # No añadir <|eot|> a la secuencia 'tokens' final ni al texto.
            break

        # 5. Añadir el nuevo token ID a la secuencia `tokens`
        tokens.append(next_token_id) # Añade al final

        # 6. Saltar la adición al texto si es un timestamp
        if next_token_id >= timestamp_begin:
            print(f"    (Token de timestamp {next_token_id}, omitido del texto)")
            # PERO AÚN SE ACTUALIZA LA LISTA DE TOKENS con el pop siguiente

        # 7. Actualizar la lista `tokens` usando la lógica `pop_id` del original
        #    Esto mantiene la longitud de `tokens` fija en `max_tokens_input_len`.
        if pop_id > len(initial_tokens): # Si pop_id > 4
            pop_id -= 1

        # Elimina el token en la posición `pop_id` (ahora que ya se añadió el nuevo al final)
        try:
            removed_token = tokens.pop(pop_id)
            # print(f"    Token {removed_token} eliminado de índice {pop_id}. pop_id ahora {pop_id if pop_id <= len(initial_tokens) else pop_id -1}. Len tokens: {len(tokens)}")
        except IndexError:
            print(f"Error: Índice pop_id={pop_id} fuera de rango para tokens (len={len(tokens)}). Deteniendo.")
            break


        # 8. Añadir el token string al resultado (solo si no es timestamp)
        if next_token_id < timestamp_begin:
             generated_text += next_token_str
             # print(f"    Texto acumulado: '{generated_text}'")

    else: # Se ejecuta si el bucle for termina sin 'break' (límite alcanzado)
        print(f"\nAdvertencia: Se alcanzó el límite máximo de pasos de decodificación ({max_decoding_steps}).")

    # --- Post-procesamiento del Texto ---
    # Reemplaza 'Ġ' por espacio, limpia tokens especiales residuales.
    final_text = generated_text.replace('\u0120', ' ').replace('<|endoftext|>', '').replace('\n', '').strip()

    # Decodificación Base64 (si es Chino - task_code 50260)
    if task_code == 50260:
        print("Tarea detectada como Chino (ZH), intentando decodificación Base64...")
        try:
            missing_padding = len(final_text) % 4
            if missing_padding: final_text += '=' * (4 - missing_padding)
            final_text = base64.b64decode(final_text).decode('utf-8')
            print("Decodificación Base64 completada.")
        except Exception as e:
            print(f"Error durante la decodificación Base64: {e}. Devolviendo texto como está.")

    return final_text


# --- Funciones de Inicialización/Liberación del Modelo ---

def init_model(model_path, target=None, device_id=None):
    """
    Inicializa y carga un modelo ONNX o RKNN.
    """
    print(f"Inicializando modelo desde: {model_path}")
    if not os.path.exists(model_path):
        raise FileNotFoundError(f"Archivo de modelo no encontrado: {model_path}")

    if model_path.endswith(".rknn"):
        print("Detectado modelo RKNN.")
        model = RKNN(verbose=False)
        print('--> Cargando modelo RKNN...')
        ret = model.load_rknn(model_path)
        if ret != 0:
            raise RuntimeError(f"Fallo al cargar RKNN {model_path}: {ret}")
        print('    Modelo RKNN cargado con éxito.')
        print(f'--> Inicializando entorno de ejecución RKNN (Target: {target}, Device ID: {device_id})...')
        ret = model.init_runtime(target=target, device_id=device_id)
        if ret != 0:
            model.release()
            raise RuntimeError(f"Fallo al inicializar runtime RKNN: {ret}")
        print('    Entorno de ejecución RKNN inicializado con éxito.')
        return model

    elif model_path.endswith(".onnx"):
        print("Detectado modelo ONNX.")
        print('--> Creando sesión de inferencia ONNX Runtime...')
        try:
            providers = ['CPUExecutionProvider'] # Opcionalmente añadir 'CUDAExecutionProvider' si hay GPU
            print(f"    Usando proveedores: {providers}")
            model = onnxruntime.InferenceSession(model_path, providers=providers)
            print(f"    Sesión ONNX creada. Proveedor activo: {model.get_providers()}")
            return model
        except Exception as e:
            print(f"Error al crear la sesión ONNX para {model_path}: {e}")
            raise
    else:
        raise ValueError(f"Formato de modelo no soportado: {model_path}. Use .rknn o .onnx")


def release_model(model):
    """
    Libera los recursos asociados a un modelo cargado (RKNN o ONNX).
    """
    if model is None: return
    if isinstance(model, RKNN):
        print("Liberando recursos del modelo RKNN...")
        model.release()
        print("Modelo RKNN liberado.")
    elif isinstance(model, onnxruntime.InferenceSession):
        print("Liberando modelo ONNX (implícito al eliminar referencia)...")
        del model
        print("Referencia a sesión ONNX eliminada.")
    else:
        print(f"Tipo de modelo desconocido ({type(model)}), no se puede liberar explícitamente.")



Ejecución del la inferencia usando modelo en ONNX

In [9]:
audio_path = DEFAULT_AUDIO_PATH_FOR_INFERENCE
encoder_path = os.path.join(MODEL_SAVE_DIR, f"whisper_encoder_{DEFAULT_MODEL_NAME.replace('/', '_')}.onnx")
decoder_path = os.path.join(MODEL_SAVE_DIR, f"whisper_decoder_{DEFAULT_MODEL_NAME.replace('/', '_')}.onnx")
vocab_path = os.path.join(MODEL_SAVE_DIR, "vocab_es.txt")
filters_path = os.path.join(MODEL_SAVE_DIR, f"mel_{N_MELS_DEFAULT}_filters.txt") # Asume N_MELS=80
prefer_rknn=False, # Intentar usar RKNN si está disponible
rknn_target=DEFAULT_RKNN_PLATFORM,
rknn_device_id=None,
task_token_id=TASK_CODE_SPANISH
max_output_tokens = 100

print("\n" + "="*60)
print("=== Iniciando Parte 3: Inferencia Local ===")
print("="*60)

print(f"Archivo de Audio: {audio_path}")

print("\n--- Configuración de Transcripción ---")
print(f"  Encoder: {encoder_path}")
print(f"  Decoder: {decoder_path}")
print(f"  Filters: {filters_path}")
print(f"  Vocab:   {vocab_path}")

if prefer_rknn:
        print(f"  Target NPU: {rknn_target} (Device ID: {rknn_device_id or 'default'})")
else:
        print("  Target: CPU (ONNX Runtime)")
        print("-------------------------------------\n")

# Variables para los modelos (inicializadas a None)
encoder_model = None
decoder_model = None

try:
        # --- 1. Carga de Vocabulario ---
        print("[1/4] Cargando vocabulario...")
        vocab = read_vocab(vocab_path)

        # --- 2. Carga y Preprocesamiento del Audio ---
        print(f"\n[2/4] Cargando y preprocesando audio: {audio_path}...")
        audio_data, original_sr = sf.read(audio_path, dtype='float32', always_2d=False)
        print(f"  Audio original - SR: {original_sr} Hz, Canales: {audio_data.ndim}, Duración: {len(audio_data)/original_sr:.2f}s")
        audio_data, num_channels = ensure_channels(audio_data, audio_data.ndim)
        if num_channels != 1: raise ValueError("Se requiere audio mono.")
        audio_data, current_sr = ensure_sample_rate(audio_data, original_sr)
        if current_sr != SAMPLE_RATE: raise ValueError("Fallo en remuestreo.")
        print(f"  Audio preprocesado - SR: {current_sr} Hz, Canales: 1")

        print("  Calculando espectrograma Log-Mel...")
        log_mel_features = log_mel_spectrogram(audio_data, filters_path=filters_path)
        log_mel_numpy = log_mel_features.numpy()

        print(f"  Ajustando espectrograma a longitud fija {MAX_LENGTH}...")
        x_mel = pad_or_trim(log_mel_numpy, length=MAX_LENGTH)
        x_mel = np.expand_dims(x_mel, 0) # Añadir Batch dim -> (1, N_MELS, MAX_LENGTH)
        print(f"  Preprocesamiento completado. Forma final Mel: {x_mel.shape}")

        # --- 3. Inicialización de Modelos ---
        print("\n[3/4] Inicializando modelos Encoder y Decoder...")
        encoder_model = init_model(encoder_path, rknn_target, rknn_device_id)
        decoder_model = init_model(decoder_path, rknn_target, rknn_device_id)
        print("  Modelos inicializados.")

        # --- 4. Inferencia ---
        print("\n[4/4] Ejecutando inferencia...")
        print("\n--- Ejecutando Encoder ---")
        out_encoder = run_encoder(encoder_model, x_mel)

        print("\n--- Ejecutando Decoder ---")
        # Pasar el task_token_id asegurado como entero
        result = run_decoder(decoder_model, out_encoder, vocab, task_token_id)

        # --- Mostrar Resultado ---
        print("\n" + "="*30)
        print("   Transcripción Resultante")
        print("="*30)
        print(result)
        print("="*30 + "\n")

except FileNotFoundError as e:
         print(f"\nError Fatal: Archivo no encontrado - {e}")
         exit(1)
except (ValueError, TypeError) as e: # Captura ambos tipos de error comunes
         print(f"\nError Fatal: Problema con los datos o configuración - {e}")
         import traceback
         traceback.print_exc()
         exit(1)
except RuntimeError as e:
         print(f"\nError Fatal: Problema con el runtime (RKNN/ONNX) - {e}")
         exit(1)
except Exception as e:
        print(f"\nError Inesperado: {e}")
        import traceback
        print("\n--- Traceback ---"); traceback.print_exc(); print("-----------------\n")
        exit(1)
finally:
        # --- Liberar Recursos ---
        print("\n--- Limpieza de Recursos ---")
        release_model(encoder_model)
        release_model(decoder_model)
        print("--------------------------\n")
        print("Proceso finalizado.")


=== Iniciando Parte 3: Inferencia Local ===
Archivo de Audio: ./test_es.wav

--- Configuración de Transcripción ---
  Encoder: ./model/whisper_encoder_rjac_whisper-tiny-spanish.onnx
  Decoder: ./model/whisper_decoder_rjac_whisper-tiny-spanish.onnx
  Filters: ./model/mel_80_filters.txt
  Vocab:   ./model/vocab_es.txt
  Target NPU: ('rk3588',) (Device ID: (None,))
[1/4] Cargando vocabulario...
Leyendo vocabulario desde: ./model/vocab_es.txt...
Vocabulario leído. 51865 tokens cargados.

[2/4] Cargando y preprocesando audio: ./test_es.wav...
  Audio original - SR: 44100 Hz, Canales: 1, Duración: 9.11s
Remuestreando audio: 44100 Hz -> 16000 Hz
Remuestreo completado.
  Audio preprocesado - SR: 16000 Hz, Canales: 1
  Calculando espectrograma Log-Mel...
Cargando filtros Mel predeterminados...
Filtros Mel ((80, 201)) cargados desde ./model/mel_80_filters.txt
Espectrograma Log-Mel calculado. Forma: torch.Size([80, 910])
  Ajustando espectrograma a longitud fija 3000...
Rellenando espectrograma 