## __🎧 Audio Diarization Project (pyannote.audio)__

#### 🧱 System requirements
- Python 3.12.12
- A Hugging Face account with access to the pyannote/ models

## 📌 Description
This project uses the pyannote/speaker-diarization-3.0 pipeline to:
- detect speech segments
- identify distinct speakers
- prepare the audio for later cuts

In [None]:
# 0. Check python version
!python3 --version

In [None]:
# 1. Clean up the previous environment
!pip uninstall -y numpy torch torchaudio pyannote.audio

In [None]:
# 2. Install ffmpeg
!apt-get update
!apt-get install -y ffmpeg

In [None]:
# 3. Install main libraries
!pip install "numpy<2.0" \
             "torch==2.3.1" \
             "torchvision==0.18.1" \
             "torchaudio==2.3.1" \
             "onnxruntime" \
             "huggingface-hub>=0.19.4" \
             "pyannote.audio==3.1.1" \
             "pydub" \
             "ipywidgets"

# IMPORTANT NOTE:
# When this finishes, go to the top menu: "Runtime" -> "Restart session".
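
After the restart, it can help to confirm that the pinned versions are the ones actually installed. The following is a minimal sketch, assuming the standard-library `importlib.metadata` is enough for this check. The helper name `installed_version` is hypothetical and not part of the original notebook.

```python
from importlib.metadata import PackageNotFoundError, version

# Pins taken from the install cell above
EXPECTED = {
    "torch": "2.3.1",
    "torchaudio": "2.3.1",
    "pyannote.audio": "3.1.1",
}

def installed_version(package):
    """Return the installed version string, or None if the package is missing."""
    try:
        return version(package)
    except PackageNotFoundError:
        return None

for package, expected in EXPECTED.items():
    found = installed_version(package)
    # Some wheels may report a local suffix (e.g. a CUDA tag), so compare the prefix only
    status = "ok" if found is not None and found.startswith(expected) else "check"
    print(f"{package}: expected {expected}, found {found} -> {status}")
```

A `check` status only flags a mismatch for manual inspection; it does not attempt to reinstall anything.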

In [None]:
import os
import torch

from pydub import AudioSegment
from pydub.effects import normalize
from pydub.silence import detect_silence

from pyannote.audio import Pipeline
import ipywidgets as widgets

In [None]:
# Get HF token from Colab userdata
from google.colab import userdata
HF_TOKEN = userdata.get('PYANNOTE_HF_TOKEN')

In [None]:
# Test diarization pipeline
pipeline = Pipeline.from_pretrained(
   'pyannote/speaker-diarization-3.0',
   use_auth_token=HF_TOKEN
)

_______________
1st attempt: Diarization
_______________

In [None]:
INPUT_DIR = "./audios"
OUTPUT_DIR = "./output"
PRE_CUT_SECONDS = 1.5  # seconds kept before the child starts speaking
MIN_CHILD_SEGMENT = 1.0  # minimum duration to count as "child's voice"
os.makedirs(OUTPUT_DIR, exist_ok=True)

In [None]:
# ==========================
# MAIN FUNCTION
# ==========================
def process_audio(audio_path):
    print(f"Processing: {os.path.basename(audio_path)}")

    # Load audio
    audio = AudioSegment.from_wav(audio_path)
    duration = len(audio) / 1000  # seconds

    # Diarization
    diarization = pipeline(audio_path)

    # Extract segments in chronological order
    segments = []
    for turn, _, speaker in diarization.itertracks(yield_label=True):
        segments.append({
            "speaker": speaker,
            "start": turn.start,
            "end": turn.end,
            "duration": turn.end - turn.start
        })

    segments.sort(key=lambda x: x["start"])

    if len(segments) < 2:
        print("⚠️ Not enough speech segments detected")
        return None

    # ==========================
    # Assumed to always hold:
    # - First speaker = teacher
    # - Second speaker with a long segment = child
    # ==========================
    first_speaker = segments[0]["speaker"]

    child_start = None
    for seg in segments:
        if seg["speaker"] != first_speaker and seg["duration"] >= MIN_CHILD_SEGMENT:
            child_start = seg["start"]
            break

    if child_start is None:
        print("⚠️ Child's start was not detected")
        return None

    # Apply the margin
    cut_time = max(0, child_start - PRE_CUT_SECONDS)

    # Cut the audio (pydub slices in milliseconds)
    cut_audio = audio[int(cut_time * 1000):]

    return cut_audio, cut_time
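
The selection rule inside `process_audio` can be exercised without running the model. The sketch below is an illustration on hand-made turns, not output from pyannote. The helper `find_child_start` and the toy data are hypothetical.

```python
PRE_CUT_SECONDS = 1.5
MIN_CHILD_SEGMENT = 1.0

def find_child_start(segments, min_child_segment=MIN_CHILD_SEGMENT):
    """Start of the first long-enough turn by anyone other than the first speaker."""
    ordered = sorted(segments, key=lambda s: s["start"])
    if not ordered:
        return None
    first_speaker = ordered[0]["speaker"]
    for seg in ordered:
        duration = seg["end"] - seg["start"]
        if seg["speaker"] != first_speaker and duration >= min_child_segment:
            return seg["start"]
    return None

# Hand-made turns (assumed, for illustration only)
toy_turns = [
    {"speaker": "SPEAKER_00", "start": 0.0, "end": 2.0},   # teacher
    {"speaker": "SPEAKER_01", "start": 2.5, "end": 3.0},   # too short, skipped
    {"speaker": "SPEAKER_00", "start": 3.0, "end": 3.5},
    {"speaker": "SPEAKER_01", "start": 4.0, "end": 9.0},   # first long turn
]

child_start = find_child_start(toy_turns)
cut_time = max(0, child_start - PRE_CUT_SECONDS)
print(child_start, cut_time)  # 4.0 2.5
```

The 0.5 s turn by `SPEAKER_01` is ignored because it is shorter than `MIN_CHILD_SEGMENT`, so the cut lands 1.5 s before the 4.0 s turn.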

In [None]:
!unzip audios.zip

In [None]:
# ==========================
# PROCESS FOLDER
# ==========================
for file in os.listdir(INPUT_DIR):
    if not file.lower().endswith(".wav"):
        continue

    input_path = os.path.join(INPUT_DIR, file)
    # splitext also handles upper-case extensions such as ".WAV",
    # which file.replace(".wav", ...) would leave unchanged
    stem = os.path.splitext(file)[0]
    output_path = os.path.join(OUTPUT_DIR, f"{stem}_cut.wav")

    result = process_audio(input_path)

    if result is None:
        continue

    cut_audio, cut_time = result
    cut_audio.export(output_path, format="wav")

    print(f"✔ Saved: {output_path} (cut at {cut_time:.2f}s)")

______________________
2nd attempt: VAD

❌ Real diarization problems with these recordings

- The teacher and the child are often classified as the same speaker
  - Voices close to the microphone
  - The teacher speaks only briefly
  - Ambient noise
  - Child's voice → unstable embeddings
- The child may start reading without a clear pause
  - The model does not detect a "speaker change"
  - Everything ends up as SPEAKER_00
- The child's segments come out fragmented
  - Many short segments < 1.0s
  - The condition duration >= MIN_CHILD_SEGMENT is never met
- Diarization ≠ detection of a "semantic start"
  - What we want is the start of the reading
  - The model only knows who speaks when

Therefore:

- The goal is not to detect a second speaker.
- The goal is to find the moment when a continuous voice, distinct from the initial code, begins.
- Voice activity detection (VAD) will be used.
______________________
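
One way to confirm these problems on a given file is to summarize its turns per speaker. The following is a minimal sketch, assuming turns in the same dict shape used by `process_audio` (`speaker`, `start`, `end`). The function `summarize_turns` and the sample turns are hypothetical.

```python
from collections import defaultdict

def summarize_turns(segments, min_duration=1.0):
    """Per speaker: total speech time, number of turns, and turns shorter than min_duration."""
    stats = defaultdict(lambda: {"total": 0.0, "turns": 0, "short_turns": 0})
    for seg in segments:
        duration = seg["end"] - seg["start"]
        entry = stats[seg["speaker"]]
        entry["total"] += duration
        entry["turns"] += 1
        if duration < min_duration:
            entry["short_turns"] += 1
    return dict(stats)

# Hand-made example of a fragmented second speaker (assumed data)
toy_turns = [
    {"speaker": "SPEAKER_00", "start": 0.0, "end": 2.0},
    {"speaker": "SPEAKER_01", "start": 2.5, "end": 3.0},
    {"speaker": "SPEAKER_01", "start": 3.25, "end": 4.0},
    {"speaker": "SPEAKER_01", "start": 4.5, "end": 5.25},
]

summary = summarize_turns(toy_turns)
for speaker, entry in summary.items():
    print(speaker, entry)
```

If every turn of the second speaker counts as short, or only one speaker key appears at all, the `MIN_CHILD_SEGMENT` rule has nothing to match.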

In [None]:
from pyannote.audio import Pipeline
from pydub import AudioSegment
import os

vad = Pipeline.from_pretrained(
    "pyannote/voice-activity-detection",
    use_auth_token=HF_TOKEN
)

MIN_READING_DURATION = 4.0   # seconds of continuous reading
PRE_CUT_SECONDS = 1.5

INPUT_DIR = "./audios"
OUTPUT_DIR = "./output"
os.makedirs(OUTPUT_DIR, exist_ok=True)

In [None]:
def process_audio_vad(audio_path):
    audio = AudioSegment.from_wav(audio_path)

    vad_result = vad(audio_path)

    segments = []
    for segment in vad_result.get_timeline():
        segments.append({
            "start": segment.start,
            "end": segment.end,
            "duration": segment.end - segment.start
        })

    # print('##############################################')
    # print(audio_path)
    # print(segments)
    # print('##############################################')

    # Merge nearby segments
    merged = []
    for seg in segments:
        if not merged:
            merged.append(seg.copy())
        else:
            last = merged[-1]
            if seg["start"] - last["end"] < 0.4:  # tolerate short silences
                last["end"] = seg["end"]
                last["duration"] = last["end"] - last["start"]
            else:
                merged.append(seg.copy())


    # print('##############################################')
    print(audio_path)
    print(merged)
    # print('##############################################')

    # Find the first block at which accumulated speech reaches MIN_READING_DURATION
    acc = 0.0
    for block in merged:
        acc += block["duration"]
        block["duration_acc"] = acc

        # if block["duration"] >= MIN_READING_DURATION:
        if block["duration_acc"] >= MIN_READING_DURATION:
            cut_time = max(0, block["start"] - PRE_CUT_SECONDS)
            cut_audio = audio[int(cut_time * 1000):]
            return cut_audio, cut_time

    print("⚠️ No continuous reading detected")
    return None
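
The merge-then-accumulate logic in `process_audio_vad` can be tried on hand-made segments without loading the VAD model. This is a sketch under that assumption. The helper names `merge_close_segments` and `block_reaching_total`, and the toy segments, are hypothetical.

```python
def merge_close_segments(segments, max_gap=0.4):
    """Join consecutive segments separated by a silence shorter than max_gap seconds."""
    merged = []
    for seg in segments:
        if merged and seg["start"] - merged[-1]["end"] < max_gap:
            merged[-1]["end"] = seg["end"]
        else:
            merged.append({"start": seg["start"], "end": seg["end"]})
    return merged

def block_reaching_total(blocks, min_total):
    """Return the block at which the accumulated speech first reaches min_total."""
    acc = 0.0
    for block in blocks:
        acc += block["end"] - block["start"]
        if acc >= min_total:
            return block
    return None

# Hand-made VAD output (assumed data)
toy_segments = [
    {"start": 0.5, "end": 1.5},
    {"start": 3.0, "end": 4.0},
    {"start": 4.25, "end": 6.0},   # 0.25 s gap: merged with the previous segment
    {"start": 6.5, "end": 9.0},    # 0.5 s gap: starts a new block
]

blocks = merge_close_segments(toy_segments)
hit = block_reaching_total(blocks, min_total=4.0)
print(blocks)
print(hit)
```

Because the durations are accumulated across blocks, the first 1.0 s block counts toward the 4.0 s total, and the block returned here is the one spanning 3.0 s to 6.0 s.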

In [None]:
# ==========================
# PROCESS FOLDER
# ==========================
for file in os.listdir(INPUT_DIR):
    if not file.lower().endswith(".wav"):
        continue

    input_path = os.path.join(INPUT_DIR, file)
    # splitext also handles upper-case extensions such as ".WAV",
    # which file.replace(".wav", ...) would leave unchanged
    stem = os.path.splitext(file)[0]
    output_path = os.path.join(OUTPUT_DIR, f"{stem}_cut.wav")

    result = process_audio_vad(input_path)

    if result is None:
        continue

    cut_audio, cut_time = result
    cut_audio.export(output_path, format="wav")

    print(f"✔ Saved: {output_path} (cut at {cut_time:.2f}s)")

In [None]:
# Download zip folder with files
import os
import shutil
from google.colab import files

download_dir = './output'
zip_path = './output.zip'

# Create a ZIP with all the files
shutil.make_archive(zip_path.replace('.zip', ''), 'zip', download_dir)

# Download the full ZIP
files.download(zip_path)

_________________________
3rd attempt: VAD + building a voice time series
_________________________

In [None]:
from pyannote.audio import Pipeline
from pydub import AudioSegment
import os

vad = Pipeline.from_pretrained(
    "pyannote/voice-activity-detection",
    use_auth_token=HF_TOKEN
)

INPUT_DIR = "./audios"
OUTPUT_DIR = "./output"
os.makedirs(OUTPUT_DIR, exist_ok=True)

In [None]:
WINDOW_SECONDS = 8.0     # maximum analysis window
MIN_TOTAL_VOICE = 4.0   # accumulated speech within the window
PRE_CUT_SECONDS = 1.5

In [None]:
def find_reading_start(segments):
    """
    segments: list of dicts with start, end, duration
    return: start_time or None
    """

    for i, seg in enumerate(segments):
        window_start = seg["start"]
        window_end = window_start + WINDOW_SECONDS

        total_voice = 0.0

        for s in segments[i:]:
            if s["start"] > window_end:
                break

            overlap_start = max(s["start"], window_start)
            overlap_end = min(s["end"], window_end)

            if overlap_end > overlap_start:
                total_voice += overlap_end - overlap_start

            if total_voice >= MIN_TOTAL_VOICE:
                return window_start

    return None
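
`find_reading_start` reads its thresholds from the globals `WINDOW_SECONDS` and `MIN_TOTAL_VOICE`. For quick experiments it may be easier to pass them in. The following sketches the same sliding-window rule with explicit parameters. The function name and the toy segments are hypothetical.

```python
def find_reading_start_in_window(segments, window_seconds=8.0, min_total_voice=4.0):
    """Start of the first segment whose following window holds enough speech."""
    for i, seg in enumerate(segments):
        window_start = seg["start"]
        window_end = window_start + window_seconds
        total_voice = 0.0
        for s in segments[i:]:
            if s["start"] > window_end:
                break
            # Only count the part of the segment that lies inside the window
            overlap = min(s["end"], window_end) - max(s["start"], window_start)
            if overlap > 0:
                total_voice += overlap
            if total_voice >= min_total_voice:
                return window_start
    return None

# Hand-made VAD output (assumed data)
toy_segments = [
    {"start": 0.5, "end": 1.5},     # short utterance followed by a long silence
    {"start": 10.0, "end": 12.0},
    {"start": 12.5, "end": 15.0},
]
print(find_reading_start_in_window(toy_segments))  # 10.0
```

The window opened at 0.5 s holds only 1.0 s of speech, so it is rejected. The window opened at 10.0 s holds 4.5 s and is accepted.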

In [None]:
# Build a voice time series
def build_voice_timeline(segments, resolution=0.1):
    timeline = []
    t = 0.0
    end_time = max(s["end"] for s in segments)

    while t < end_time:
        is_voice = any(s["start"] <= t < s["end"] for s in segments)
        timeline.append((t, 1 if is_voice else 0))
        t += resolution

    return timeline
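
`build_voice_timeline` advances `t` by repeated addition, so floating-point error accumulates: ten additions of `0.1` give `0.9999999999999999`, which means a segment list ending at exactly 1.0 s yields one extra sample. A possible variant derives each timestamp from an integer index instead. The name `build_voice_timeline_indexed` is hypothetical.

```python
import math

def build_voice_timeline_indexed(segments, resolution=0.1):
    """Sample the voice flag every `resolution` seconds using an integer step counter."""
    end_time = max(s["end"] for s in segments)
    n_samples = math.ceil(end_time / resolution)
    timeline = []
    for i in range(n_samples):
        t = i * resolution  # computed from the index, so error does not accumulate
        is_voice = any(s["start"] <= t < s["end"] for s in segments)
        timeline.append((t, 1 if is_voice else 0))
    return timeline

# Hand-made VAD output (assumed data)
toy_segments = [{"start": 0.25, "end": 0.55}, {"start": 0.75, "end": 1.0}]
timeline = build_voice_timeline_indexed(toy_segments)
print([flag for _, flag in timeline])  # [0, 0, 0, 1, 1, 1, 0, 0, 1, 1]
```

The output keeps the same `(time, flag)` tuple shape, so it can be passed to `detect_reading_start_adaptive` unchanged.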

In [None]:
# Detect a structural change (no fixed threshold)
import numpy as np

def detect_reading_start_adaptive(timeline, resolution=0.1):
    values = np.array([v for _, v in timeline])

    # A single sample has no change to measure; fall back to its timestamp
    if len(values) < 2:
        return timeline[0][0]

    # cumulative running mean of the voice flag
    cumulative_density = np.cumsum(values) / np.arange(1, len(values)+1)

    # first difference (change between consecutive samples)
    diff = np.diff(cumulative_density)

    # index of the largest positive change
    idx = np.argmax(diff)

    return timeline[idx][0]
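
To see what the cumulative density and its first difference look like, the arithmetic can be traced on a short flag sequence. This is a pure-Python sketch on assumed toy values, using `itertools.accumulate` in place of NumPy. The helper names are hypothetical.

```python
from itertools import accumulate

def cumulative_density(values):
    """Running mean of the 0/1 voice flags: cumsum(values)[k] / (k + 1)."""
    return [total / n for n, total in enumerate(accumulate(values), start=1)]

def largest_increase_index(density):
    """Index of the largest first difference (first one on ties, like np.argmax)."""
    diffs = [b - a for a, b in zip(density, density[1:])]
    return max(range(len(diffs)), key=diffs.__getitem__)

flags = [0, 0, 1, 1, 0, 0, 1, 1, 1, 1]   # toy timeline at 0.1 s resolution
density = cumulative_density(flags)
idx = largest_increase_index(density)
print([round(d, 3) for d in density])
print(idx)  # 1
```

On this toy input the largest jump sits at index 1, the sample just before the first voiced one, because early samples are divided by a small count. It may be worth checking whether the same happens on real files.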

In [None]:
def process_audio(audio_path):
    audio = AudioSegment.from_wav(audio_path)
    vad_result = vad(audio_path)

    segments = [{
        "start": s.start,
        "end": s.end
    } for s in vad_result.get_timeline()]

    if not segments:
        return None

    timeline = build_voice_timeline(segments)
    reading_start = detect_reading_start_adaptive(timeline)

    # dynamic margin (proportional to the detected start)
    pre_cut = max(0.5, reading_start * 0.15)

    cut_time = max(0, reading_start - pre_cut)
    cut_audio = audio[int(cut_time * 1000):]

    return cut_audio, cut_time
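
The proportional margin behaves differently for early and late starts. The following is a small sketch of just that arithmetic, with the constants taken from `process_audio`. The helper name `cut_time_for` is hypothetical.

```python
def cut_time_for(reading_start, min_margin=0.5, fraction=0.15):
    """Cut point: reading start minus max(min_margin, fraction * reading_start), floored at 0."""
    pre_cut = max(min_margin, reading_start * fraction)
    return max(0, reading_start - pre_cut)

for start in (0.3, 2.0, 10.0):
    print(start, cut_time_for(start))
```

The fixed 0.5 s floor applies until `reading_start` exceeds about 3.33 s (0.5 / 0.15). Beyond that point the margin grows with the start time.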


In [None]:
# ==========================
# PROCESS FOLDER
# ==========================
for file in os.listdir(INPUT_DIR):
    if not file.lower().endswith(".wav"):
        continue

    input_path = os.path.join(INPUT_DIR, file)
    # splitext also handles upper-case extensions such as ".WAV",
    # which file.replace(".wav", ...) would leave unchanged
    stem = os.path.splitext(file)[0]
    output_path = os.path.join(OUTPUT_DIR, f"{stem}_cut.wav")

    result = process_audio(input_path)

    if result is None:
        continue

    cut_audio, cut_time = result
    cut_audio.export(output_path, format="wav")

    print(f"✔ Saved: {output_path} (cut at {cut_time:.2f}s)")