Instalamos las librerias necesarias (Python 3.13.7)

In [1]:
!pip install openai-whisper librosa pydub scikit-learn speechrecognition ipykernel



Importamos las librerias necesarias y creamos las funciones que necesitamos para el proyecto

In [2]:
import whisper
import librosa
import numpy as np
from pydub import AudioSegment
import os
from sklearn.cluster import AgglomerativeClustering
from sklearn.metrics import silhouette_score
import glob
from datetime import datetime

def convert_mp3_to_wav(mp3_path):
    """Convierte MP3 a WAV manteniendo la calidad"""
    wav_path = mp3_path.replace('.mp3', '_converted.wav')
    audio = AudioSegment.from_mp3(mp3_path)
    # Convertir a mono y 16kHz para mejor procesamiento
    audio = audio.set_channels(1).set_frame_rate(16000)
    audio.export(wav_path, format='wav')
    return wav_path

def detect_number_of_speakers(audio_path, max_speakers=4):
    """
    Detecta automáticamente el número de hablantes usando análisis espectral
    """
    print("  Detectando número de hablantes...")
    
    # Cargar audio
    y, sr = librosa.load(audio_path, sr=16000)
    
    # Extraer características para clustering
    mfccs = librosa.feature.mfcc(y=y, sr=sr, n_mfcc=20, hop_length=512)
    mfccs = mfccs.T
    
    # Probar diferentes números de clusters
    best_score = -1
    best_n = 2  # Default a 2 hablantes
    
    for n in range(2, min(max_speakers + 1, len(mfccs))):
        try:
            clustering = AgglomerativeClustering(n_clusters=n)
            labels = clustering.fit_predict(mfccs[:1000])  # Usar subset para velocidad
            
            # Calcular score de silueta (métrica de calidad del clustering)
            if len(np.unique(labels)) > 1:
                score = silhouette_score(mfccs[:1000], labels)
                if score > best_score:
                    best_score = score
                    best_n = n
        except:
            continue
    
    print(f"  Número de hablantes detectados: {best_n}")
    return best_n

def extract_voice_embeddings(audio_path, segments):
    """
    Extrae embeddings de voz para cada segmento usando características MFCC avanzadas
    """
    y, sr = librosa.load(audio_path, sr=16000)
    
    embeddings = []
    valid_segments = []
    
    for segment in segments:
        start_sample = int(segment["start"] * sr)
        end_sample = int(segment["end"] * sr)
        
        # Asegurar que los índices estén dentro del rango
        if start_sample < len(y) and end_sample <= len(y) and (end_sample - start_sample) > 512:
            segment_audio = y[start_sample:end_sample]
            
            # Extraer características MFCC avanzadas
            mfcc = librosa.feature.mfcc(
                y=segment_audio, 
                sr=sr, 
                n_mfcc=20,
                n_fft=2048,
                hop_length=512
            )
            
            # Calcular estadísticas de las características
            mfcc_mean = np.mean(mfcc, axis=1)
            mfcc_std = np.std(mfcc, axis=1)
            
            # Combinar en un embedding
            embedding = np.concatenate([mfcc_mean, mfcc_std])
            embeddings.append(embedding)
            valid_segments.append(segment)
    
    return np.array(embeddings), valid_segments

def advanced_speaker_diarization(audio_path, segments, num_speakers):
    """
    Diarización avanzada usando embeddings de voz y clustering
    """
    print("  Realizando diarización avanzada...")
    
    # Extraer embeddings
    embeddings, valid_segments = extract_voice_embeddings(audio_path, segments)
    
    if len(embeddings) < 2:
        print("  No hay suficientes segmentos para diarización")
        for segment in segments:
            segment["speaker"] = "Persona_01"
        return segments
    
    # Clustering con número de hablantes detectado
    clustering = AgglomerativeClustering(n_clusters=min(num_speakers, len(embeddings)))
    speaker_labels = clustering.fit_predict(embeddings)
    
    # Asignar etiquetas a los segmentos válidos
    for i, segment in enumerate(valid_segments):
        segment["speaker"] = f"Persona_{speaker_labels[i] + 1:02d}"
    
    # Para segmentos no válidos, asignar basado en proximidad temporal
    last_valid_speaker = "Persona_01"
    for segment in segments:
        if "speaker" not in segment:
            segment["speaker"] = last_valid_speaker
        else:
            last_valid_speaker = segment["speaker"]
    
    return segments

def group_consecutive_segments(segments, pause_threshold=2.0):
    """
    Agrupa segmentos consecutivos del mismo hablante y añade "..." para pausas
    """
    if not segments:
        return []
    
    grouped_segments = []
    current_speaker = segments[0]["speaker"]
    current_text = segments[0]["text"].strip()
    current_start = segments[0]["start"]
    current_end = segments[0]["end"]
    
    for i in range(1, len(segments)):
        current_segment = segments[i]
        prev_segment = segments[i-1]
        
        # Calcular pausa entre segmentos
        pause_duration = current_segment["start"] - prev_segment["end"]
        
        # Si es el mismo hablante
        if current_segment["speaker"] == current_speaker:
            # Si hay una pausa significativa, añadir "..."
            if pause_duration > pause_threshold:
                current_text += " ... " + current_segment["text"].strip()
            else:
                current_text += " " + current_segment["text"].strip()
            
            # Actualizar el tiempo final
            current_end = current_segment["end"]
        
        # Si cambia de hablante
        else:
            # Guardar el segmento agrupado anterior
            grouped_segments.append({
                "speaker": current_speaker,
                "text": current_text,
                "start": current_start,
                "end": current_end
            })
            
            # Comenzar nuevo grupo
            current_speaker = current_segment["speaker"]
            current_text = current_segment["text"].strip()
            current_start = current_segment["start"]
            current_end = current_segment["end"]
    
    # Añadir el último segmento
    grouped_segments.append({
        "speaker": current_speaker,
        "text": current_text,
        "start": current_start,
        "end": current_end
    })
    
    return grouped_segments

def process_single_audio(audio_path):
    """Procesa un único archivo de audio y retorna los segmentos transcritos"""
    temp_wav_path = None
    
    try:
        print(f"Procesando: {os.path.basename(audio_path)}")
        
        # 1. Convertir a WAV si es MP3
        if audio_path.endswith('.mp3'):
            audio_path = convert_mp3_to_wav(audio_path)
            temp_wav_path = audio_path
        
        # 2. Detectar número de hablantes
        num_speakers = detect_number_of_speakers(audio_path)
        
        # 3. Cargar modelo de Whisper
        model = whisper.load_model("base")
        
        # 4. Transcribir el audio completo
        result = model.transcribe(audio_path)
        
        # 5. Filtrar segmentos muy cortos (probablemente ruido)
        segments = [
            segment for segment in result["segments"] 
            if segment["end"] - segment["start"] > 0.5  # Mínimo 0.5 segundos
        ]
        
        # 6. Diarización avanzada
        segments = advanced_speaker_diarization(audio_path, segments, num_speakers)
        
        # 7. Agrupar segmentos consecutivos del mismo hablante
        grouped_segments = group_consecutive_segments(segments)
        
        return grouped_segments
        
    except Exception as e:
        print(f"  Error procesando {audio_path}: {str(e)}")
        return None
        
    finally:
        # 8. Limpiar archivo temporal si existe
        if temp_wav_path and os.path.exists(temp_wav_path):
            os.remove(temp_wav_path)
            print(f"  Archivo temporal eliminado")

def save_transcription(segments, output_path, audio_filename):
    """Guarda la transcripción en formato legible con segmentos agrupados"""
    with open(output_path, "w", encoding="utf-8") as f:
        f.write(f"TRANSCRIPCIÓN: {audio_filename}\n")
        f.write(f"FECHA DE PROCESAMIENTO: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n")
        f.write("=" * 70 + "\n\n")
        
        for segment in segments:
            f.write(f"{segment['speaker']} ({segment['start']:.1f}s - {segment['end']:.1f}s):\n")
            f.write(f"  {segment['text']}\n\n")
    
    print(f"  Transcripción guardada en: {output_path}")

def analyze_speaker_statistics(segments):
    """Analiza estadísticas de los hablantes"""
    speakers = {}
    
    for segment in segments:
        speaker = segment["speaker"]
        duration = segment["end"] - segment["start"]
        
        if speaker not in speakers:
            speakers[speaker] = {
                "segments": 0,
                "total_duration": 0,
                "word_count": 0
            }
        
        speakers[speaker]["segments"] += 1
        speakers[speaker]["total_duration"] += duration
        speakers[speaker]["word_count"] += len(segment["text"].split())
    
    return speakers

def process_all_audios(input_folder="audios", output_folder="transcripcion"):
    """
    Procesa todos los archivos de audio en la carpeta de entrada
    y guarda las transcripciones en la carpeta de salida
    """
    # Crear carpeta de salida si no existe
    if not os.path.exists(output_folder):
        os.makedirs(output_folder)
        print(f"Carpeta '{output_folder}' creada")
    
    # Buscar archivos de audio
    audio_extensions = ['*.mp3', '*.wav', '*.m4a', '*.flac']
    audio_files = []
    
    for extension in audio_extensions:
        audio_files.extend(glob.glob(os.path.join(input_folder, extension)))
    
    if not audio_files:
        print(f"No se encontraron archivos de audio en la carpeta '{input_folder}'")
        return
    
    print(f"Se encontraron {len(audio_files)} archivos de audio para procesar")
    print("=" * 60)
    
    # Estadísticas generales
    total_files = len(audio_files)
    processed_files = 0
    failed_files = 0
    
    # Procesar cada archivo
    for audio_path in audio_files:
        audio_filename = os.path.basename(audio_path)
        output_filename = os.path.splitext(audio_filename)[0] + "_transcripcion.txt"
        output_path = os.path.join(output_folder, output_filename)
        
        print(f"\n[{processed_files + 1}/{total_files}] {audio_filename}")
        print("-" * 40)
        
        # Procesar el audio
        segments = process_single_audio(audio_path)
        
        if segments:
            # Guardar transcripción
            save_transcription(segments, output_path, audio_filename)
            
            # Mostrar estadísticas del archivo
            speaker_stats = analyze_speaker_statistics(segments)
            print("  --- ESTADÍSTICAS ---")
            for speaker, stats in speaker_stats.items():
                print(f"  {speaker}: {stats['segments']} segmentos, "
                      f"{stats['total_duration']:.1f}s, {stats['word_count']} palabras")
            
            processed_files += 1
        else:
            print(f"  ❌ Error al procesar {audio_filename}")
            failed_files += 1
        
        print("-" * 40)
    
    # Resumen final
    print(f"\n" + "=" * 60)
    print("PROCESAMIENTO COMPLETADO")
    print(f"Archivos procesados exitosamente: {processed_files}/{total_files}")
    print(f"Archivos con errores: {failed_files}")
    
    if processed_files > 0:
        print(f"Transcripciones guardadas en: {os.path.abspath(output_folder)}")

# Script principal
if __name__ == "__main__":
    # Configuración de carpetas
    INPUT_FOLDER = "Audios"
    OUTPUT_FOLDER = "Transcripciones"
    
    try:
        print("INICIANDO PROCESAMIENTO MASIVO DE AUDIOS")
        print("=" * 50)
        
        # Verificar que existe la carpeta de audios
        if not os.path.exists(INPUT_FOLDER):
            print(f"Error: No existe la carpeta '{INPUT_FOLDER}'")
            print("Creando estructura de carpetas...")
            os.makedirs(INPUT_FOLDER)
            print(f"Por favor, coloca tus archivos de audio en la carpeta '{INPUT_FOLDER}' y ejecuta el script nuevamente")
        else:
            # Procesar todos los audios
            process_all_audios(INPUT_FOLDER, OUTPUT_FOLDER)
        
    except Exception as e:
        print(f"Error durante el procesamiento: {str(e)}")
        import traceback
        traceback.print_exc()

INICIANDO PROCESAMIENTO MASIVO DE AUDIOS
Se encontraron 1 archivos de audio para procesar

[1/1] ejemplo.mp3
----------------------------------------
Procesando: ejemplo.mp3
  Detectando número de hablantes...
  Número de hablantes detectados: 2
  Realizando diarización avanzada...
  Archivo temporal eliminado
  Transcripción guardada en: Transcripciones/ejemplo_transcripcion.txt
  --- ESTADÍSTICAS ---
  Persona_02: 4 segmentos, 20.0s, 17 palabras
  Persona_01: 4 segmentos, 101.0s, 178 palabras
----------------------------------------

PROCESAMIENTO COMPLETADO
Archivos procesados exitosamente: 1/1
Archivos con errores: 0
Transcripciones guardadas en: /home/hm/Documents/Github/audio-to-text-conversion/audio-to-text-conversion/Transcripciones
