In [1]:
#!/usr/bin/env python
import os, json, argparse, datetime, math
import torch
import whisperx
import subprocess
import tempfile
import wave
import contextlib

In [2]:
def get_duration_sec(audio_path: str) -> float:
    try:
        with contextlib.closing(wave.open(audio_path,'r')) as f:
            frames = f.getnframes()
            rate = f.getframerate()
            return frames / float(rate)
    except Exception:
        # fallback con ffprobe si no es WAV
        try:
            out = subprocess.check_output([
                "ffprobe","-v","error","-show_entries","format=duration",
                "-of","default=noprint_wrappers=1:nokey=1", audio_path
            ]).decode().strip()
            return float(out)
        except Exception:
            return None

def merge_contiguous_turns(segments):
    """Une segmentos consecutivos del mismo speaker en un único turno."""
    if not segments: return []
    merged = []
    cur = dict(speaker=segments[0]["speaker"], start=segments[0]["start"], end=segments[0]["end"], text=segments[0]["text"])
    for seg in segments[1:]:
        if seg["speaker"] == cur["speaker"] and (seg["start"] - cur["end"]) <= 0.6:
            # pegar si están pegados o muy cercanos (0.6s)
            cur["end"] = seg["end"]
            cur["text"] += (" " if cur["text"] else "") + seg["text"]
        else:
            merged.append(cur)
            cur = dict(speaker=seg["speaker"], start=seg["start"], end=seg["end"], text=seg["text"])
    merged.append(cur)
    return merged

def guess_interviewer(speaker_stats, turns):
    # Heurística: entrevistador = quien hace más preguntas y habla menos tiempo.
    # 1) contar signos de interrogación y frases interrogativas por speaker
    q_words = ("¿", "?", "qué", "que", "quién", "quien", "cuándo", "cuando", "dónde", "donde",
               "por qué", "por que", "cómo", "como", "cuál", "cual", "cuáles", "cuales")
    q_score = {spk:0 for spk in speaker_stats}
    for t in turns:
        txt = t["text"].lower()
        if any(w in txt for w in q_words):
            q_score[t["speaker"]] += 1
    # 2) normalizar por tiempo total (quien pregunta más/tiempo) y habla menos
    best = None
    best_val = -1e9
    for spk, st in speaker_stats.items():
        time = st["total_sec"]
        asks = q_score.get(spk,0)
        # más preguntas por minuto y menos tiempo total => mayor score
        val = (asks / max(time,1e-6)) - 0.001*time
        if val > best_val:
            best_val = val
            best = spk
    return best

def build_qa(turns, interviewer):
    """Forma pares Q->A: pregunta del entrevistador y respuesta(s) hasta que el entrevistador hable de nuevo."""
    qa = []
    i = 0
    while i < len(turns):
        t = turns[i]
        if t["speaker"] == interviewer and ("?" in t["text"] or "¿" in t["text"]):
            q = {
                "q_speaker": interviewer,
                "q_start": t["start"],
                "q_end": t["end"],
                "question": t["text"].strip(),
                "answers": []
            }
            i += 1
            # recolecta todas las réplicas de otros speakers hasta que vuelva a hablar el entrevistador
            while i < len(turns) and turns[i]["speaker"] != interviewer:
                a = turns[i]
                if a["text"].strip():
                    q["answers"].append({
                        "a_speaker": a["speaker"],
                        "a_start": a["start"],
                        "a_end": a["end"],
                        "answer": a["text"].strip()
                    })
                i += 1
            qa.append(q)
        else:
            i += 1
    return qa

In [3]:
drive_path = os.getenv("DRIVE_PATH")
if not drive_path:
    raise ValueError("Debe definir la variable de entorno DRIVE_PATH con la ruta a Google Drive.")
drive_files = os.listdir(drive_path)

In [4]:
drive_files

['Maria Reyes.mp3',
 '60.m4a',
 'Antonio Jael.m4a',
 '77.m4a',
 'Yam jamett.m4a',
 'Casa 78.m4a',
 'Brigit Velasquez.mp4',
 'Casa 91.m4a',
 'Jessica Ulcuango.mp4',
 'Cecilia Corona.mp3',
 'Casa 68.m4a',
 'Maryory Valestrini.mp3',
 'Milexi Rodriguez 73.m4a',
 'Casa 71.m4a',
 'Casa 73.m4a',
 'Maria Fica.m4a',
 'Maryoribeth Isea.mp4',
 'Jonathan Pacheco.m4a',
 'Maria Acosta.m4a',
 'Tamara Lara.mp4',
 '75 -1.m4a',
 '69.m4a',
 'Nicolas Montanares.mp4']

In [None]:
ap = argparse.ArgumentParser()
# ap.add_argument("audio", help="Ruta del archivo de audio (wav/mp3/m4a/etc.)")
# ap.add_argument("-o","--out", help="Ruta del JSON de salida", default=None)
# ap.add_argument("--model", default="large-v3", help="Modelo WhisperX (p.ej., large-v3, medium, small)")
# ap.add_argument("--lang", default="es", help="Idioma esperado (es, en, etc.)")
# ap.add_argument("--hf_token", default=os.getenv("HF_TOKEN"), help="Token de Hugging Face para diarización")
# ap.add_argument("--device", default="cuda" if torch.cuda.is_available() else "cpu")
args = ap.parse_args()

# audio_path = args.audio
audio_path = os.path.join(drive_path, drive_files[0])
out_path = args.out or (os.path.splitext(audio_path)[0] + ".json")

device = args.device
compute_type = "float16" if (device=="cuda") else "int8"

# 1) Transcribir
model = whisperx.load_model(args.model, device=device, compute_type=compute_type, language=args.lang)
asr_result = model.transcribe(audio_path, batch_size=16)

# 2) Alinear palabras a tiempo exacto
align_model, metadata = whisperx.load_align_model(language_code=asr_result["language"], device=device)
aligned = whisperx.align(asr_result["segments"], align_model, metadata, audio_path, device)

# 3) Diarizar (requiere HF token y haber aceptado el modelo en HF)
diarize_pipeline = whisperx.DiarizationPipeline(use_auth_token=args.hf_token, device=device)
diarize_segments = diarize_pipeline(audio_path)

# 4) Asignar hablantes a palabras y recomponer segmentos
diarized = whisperx.assign_word_speakers(diarize_segments, aligned)

# Construir lista de segmentos (speaker, start, end, text)
# whisperx deja "segments" con palabras alineadas y speaker en "words"
segs = []
for seg in diarized["segments"]:
    # calcular ventana por min/max de words (start/end)
    if not seg.get("words"):
        continue
    words = [w for w in seg["words"] if "start" in w and "end" in w and "speaker" in w]
    if not words:
        continue
    # partir por cambios de speaker dentro del segmento
    current_spk = words[0]["speaker"]
    current_start = words[0]["start"]
    current_text = []
    for w in words:
        if w["speaker"] != current_spk:
            # cerrar tramo anterior
            segs.append({
                "speaker": current_spk,
                "start": current_start,
                "end": prev_end,
                "text": " ".join(current_text).strip()
            })
            # abrir nuevo tramo
            current_spk = w["speaker"]
            current_start = w["start"]
            current_text = [w.get("word","")]
        else:
            current_text.append(w.get("word",""))
        prev_end = w["end"]
    # cerrar último
    segs.append({
        "speaker": current_spk,
        "start": current_start,
        "end": prev_end,
        "text": " ".join(current_text).strip()
    })

# Unir tramos contiguos del mismo speaker
turns = merge_contiguous_turns(sorted(segs, key=lambda x: (x["start"], x["end"])))

# Estadísticas por speaker
speaker_stats = {}
for t in turns:
    d = t["end"] - t["start"]
    spk = t["speaker"]
    if spk not in speaker_stats:
        speaker_stats[spk] = {"total_sec":0.0, "num_utts":0}
    speaker_stats[spk]["total_sec"] += max(0.0, d)
    speaker_stats[spk]["num_utts"] += 1

# Detectar entrevistador
interviewer = guess_interviewer(speaker_stats, turns)

# Marcar entrevistador en stats
speakers_list = []
for spk, st in sorted(speaker_stats.items()):
    speakers_list.append({
        "id": spk,
        "total_sec": round(st["total_sec"], 3),
        "num_utts": st["num_utts"],
        "is_interviewer": (spk == interviewer)
    })

# Construir pares Q→A
qa = build_qa(turns, interviewer)

# Metadata
duration = get_duration_sec(audio_path) or 0
result_json = {
    "meta": {
        "source_audio": os.path.abspath(audio_path),
        "language": asr_result.get("language", args.lang),
        "duration_sec": round(duration, 3),
        "created_utc": datetime.datetime.utcnow().replace(microsecond=0).isoformat() + "Z",
        "toolchain": {
            "asr": f"whisperx-{args.model}",
            "diarization": "pyannote",
            "alignment": "mfa"
        }
    },
    "speakers": speakers_list,
    "turns": [
        {
            "speaker": t["speaker"],
            "start": round(t["start"], 3),
            "end": round(t["end"], 3),
            "text": t["text"]
        } for t in turns if t["text"]
    ],
    "qa": qa
}

with open(out_path, "w", encoding="utf-8") as f:
    json.dump(result_json, f, ensure_ascii=False, indent=2)

print(f"✅ Listo: {out_path}")