# In [None]:
"""
Audio Similarity Search with Resemblyzer + venv-based FFmpeg
------------------------------------------------------------
- Works offline with FFmpeg binary from imageio-ffmpeg inside your virtual environment
- Converts any input format (mp3, m4a, flac, etc.) to 16 kHz mono WAV before embedding
- Copies matches to output folder and writes CSV report

Requirements:
    pip install torch resemblyzer librosa soundfile tqdm pandas imageio-ffmpeg
"""

from pathlib import Path
import shutil, math, warnings, tempfile, subprocess
import numpy as np
import pandas as pd
from tqdm import tqdm
from resemblyzer import VoiceEncoder, preprocess_wav
import librosa
import imageio_ffmpeg as ffmpegio

# -----------------------------
# HARD-CODED PARAMETERS
# -----------------------------
# --- Input / output locations ---
REFERENCE_PATH = Path("../../datasets/audio/Voice 250810_182638.m4a")  # reference recording (known suspect voice)
SUSPECT_DIR = Path("../../datasets/audio/gallery")  # folder scanned for candidate audio
OUT_DIR = Path("../../datasets/audio/out_dir")  # where scanned files are copied to
REPORT_CSV = Path("../audio_similarity_report.csv")  # CSV report destination

# --- Matching behaviour ---
THRESHOLD = 0.80  # cosine similarity at or above this counts as a match
WINDOW_SEC = 0.0  # 0 => embed the whole clip; > 0 => sliding window size (e.g., 3.0)
HOP_SEC = 1.0  # hop between consecutive windows
COPY_ALL = True  # True: copy every file with a score prefix; False: copy matches only
MIN_DURATION = 0.5  # files shorter than this are skipped

# --- Audio handling ---
AUDIO_EXTS = {
    ".wav", ".mp3", ".m4a", ".flac", ".ogg",
    ".aac", ".wma", ".opus", ".aiff", ".aif",
}
TARGET_SR = 16000  # sample rate requested from FFmpeg and librosa

# -----------------------------
# Utility functions
# -----------------------------
# FFmpeg executable as reported by imageio-ffmpeg; looked up once at import
# time and reused as the first element of every conversion command.
FFMPEG_BIN = ffmpegio.get_ffmpeg_exe()

def convert_to_temp_wav_16k_mono(src_path: Path) -> Path:
    """Convert an audio file to a temporary mono WAV at ``TARGET_SR`` using venv FFmpeg.

    Every call creates its own temporary directory and writes the converted
    file inside it. On success the caller owns the result and should delete
    the returned file's parent directory once the WAV is no longer needed.
    On failure the temporary directory is removed before the error propagates.

    Args:
        src_path: Audio file to convert.

    Returns:
        Path to the converted WAV inside its dedicated temporary directory.

    Raises:
        subprocess.CalledProcessError: If FFmpeg exits with a non-zero status.
        OSError: If the FFmpeg binary cannot be launched.
    """
    tmpdir = Path(tempfile.mkdtemp(prefix="audio_conv_"))
    dst = tmpdir / (src_path.stem + "_16k_mono.wav")
    cmd = [
        FFMPEG_BIN, "-y", "-loglevel", "error",
        "-i", str(src_path),
        "-ar", str(TARGET_SR), "-ac", "1",
        str(dst),
    ]
    try:
        subprocess.run(cmd, check=True)
    except BaseException:
        # Nobody receives `dst` when conversion fails (or is interrupted), so
        # the directory would otherwise be orphaned on disk.
        shutil.rmtree(tmpdir, ignore_errors=True)
        raise
    return dst

def find_audio_files(root: Path):
    """Recursively collect the audio files under ``root``, sorted by path.

    An entry qualifies when it is a regular file whose lower-cased suffix is
    listed in ``AUDIO_EXTS``.
    """
    found = []
    for entry in root.rglob("*"):
        if not entry.is_file():
            continue
        if entry.suffix.lower() in AUDIO_EXTS:
            found.append(entry)
    found.sort()
    return found

def safe_mkdir(p: Path):
    """Create directory ``p`` with any missing parents; do nothing if it already exists."""
    p.mkdir(exist_ok=True, parents=True)

def cosine_similarity(a: np.ndarray, b: np.ndarray) -> float:
    """Return the cosine similarity of ``a`` and ``b`` as a Python float.

    A small epsilon is added to the product of the norms so that all-zero
    inputs yield 0.0 instead of a division by zero.
    """
    norm_a = np.linalg.norm(a)
    norm_b = np.linalg.norm(b)
    dot = np.dot(a, b)
    return float(dot / ((norm_a * norm_b) + 1e-9))

def embed_whole_clip(encoder: VoiceEncoder, audio_path: Path):
    """Embed an entire audio file as a single utterance.

    The file is always converted to a temporary WAV first to avoid decoder
    issues; that temporary directory is removed again before returning.

    Args:
        encoder: Encoder whose ``embed_utterance`` produces the embedding.
        audio_path: Audio file to embed.

    Returns:
        The result of ``encoder.embed_utterance``, or ``None`` when
        preprocessing leaves no samples or any step fails (a warning is
        issued on failure).
    """
    wav_path = None
    try:
        wav_path = convert_to_temp_wav_16k_mono(audio_path)
        wav = preprocess_wav(wav_path)  # handles VAD + loudness norm
        if wav.size == 0:
            return None
        return encoder.embed_utterance(wav)
    except Exception as e:
        warnings.warn(f"Failed to process {audio_path}: {e}")
        return None
    finally:
        # The converter writes into a directory created solely for this
        # file, so removing the parent discards exactly that conversion.
        if wav_path is not None:
            shutil.rmtree(wav_path.parent, ignore_errors=True)

def embed_sliding_windows(encoder: VoiceEncoder, audio_path: Path, window_sec: float, hop_sec: float):
    """Embed an audio file as the mean of fixed-size sliding-window embeddings.

    The file is converted to a temporary WAV, loaded at ``TARGET_SR``, and cut
    into windows of ``window_sec`` seconds spaced ``hop_sec`` seconds apart.
    Only full windows are used; a trailing partial window is dropped. The
    temporary directory is removed before returning.

    Args:
        encoder: Encoder whose ``embed_utterance`` produces each embedding.
        audio_path: Audio file to embed.
        window_sec: Window length in seconds.
        hop_sec: Step between window starts in seconds.

    Returns:
        The element-wise mean of the per-window embeddings. When the window or
        hop is non-positive, or the clip is shorter than one window, the whole
        clip is embedded via ``preprocess_wav`` instead. ``None`` when that
        fallback leaves no samples or any step fails (a warning is issued on
        failure).
    """
    wav_path = None
    try:
        wav_path = convert_to_temp_wav_16k_mono(audio_path)
        wav, sr = librosa.load(wav_path, sr=TARGET_SR, mono=True)
        win = int(window_sec * sr)
        hop = int(hop_sec * sr)
        if win <= 0 or hop <= 0 or wav.size < win:
            # Fall back to a whole-clip embedding, with the same empty-signal
            # guard that embed_whole_clip applies.
            whole = preprocess_wav(wav_path)
            if whole.size == 0:
                return None
            return encoder.embed_utterance(whole)
        # NOTE(review): windows are cut from the raw decoded signal, whereas
        # the whole-clip path goes through preprocess_wav; confirm intended.
        # wav.size >= win here, so the range always yields at least start=0
        # and every slice is a full, non-empty window.
        embs = [
            encoder.embed_utterance(wav[start:start + win])
            for start in range(0, len(wav) - win + 1, hop)
        ]
        return np.mean(np.vstack(embs), axis=0)
    except Exception as e:
        warnings.warn(f"Sliding-window embed failed for {audio_path}: {e}")
        return None
    finally:
        # The converter writes into a directory created solely for this
        # file, so removing the parent discards exactly that conversion.
        if wav_path is not None:
            shutil.rmtree(wav_path.parent, ignore_errors=True)

# -----------------------------
# Main workflow
# -----------------------------
def run_similarity_search():
    """Score every audio file in ``SUSPECT_DIR`` against the reference voice.

    Steps:
      1. Validate the configured paths, then create ``OUT_DIR``.
      2. Embed ``REFERENCE_PATH`` (sliding windows when ``WINDOW_SEC > 0``,
         otherwise the whole clip).
      3. For each audio file found, skip it if it is shorter than
         ``MIN_DURATION``, embed it the same way, and compute the cosine
         similarity to the reference. Files that cannot be embedded get a
         NaN score and are never a match.
      4. Copy files to ``OUT_DIR``: all of them with a score prefix when
         ``COPY_ALL`` is set, otherwise only those at or above ``THRESHOLD``.
      5. Write the results to ``REPORT_CSV``, highest similarity first.

    Returns:
        DataFrame with columns ``file``, ``filename``, ``similarity`` and
        ``is_match``, sorted by similarity descending with NaN rows last.
        It is empty (but still has those columns) when nothing was scanned.

    Raises:
        FileNotFoundError: If the reference file or suspect directory is missing.
        ValueError: If ``OUT_DIR`` and ``SUSPECT_DIR`` are the same directory.
        RuntimeError: If the reference audio cannot be embedded.
    """
    # Validate before touching the filesystem so a misconfigured run does
    # not leave an empty output directory behind.
    if not REFERENCE_PATH.exists():
        raise FileNotFoundError(f"Reference file not found: {REFERENCE_PATH}")
    if not SUSPECT_DIR.is_dir():
        raise FileNotFoundError(f"Suspect directory not found: {SUSPECT_DIR}")
    # Resolve both sides so differently spelled paths to the same directory
    # are still detected.
    if OUT_DIR.resolve() == SUSPECT_DIR.resolve():
        raise ValueError("OUT_DIR must be different from SUSPECT_DIR.")
    safe_mkdir(OUT_DIR)

    print("[*] Initializing VoiceEncoder...")
    encoder = VoiceEncoder()

    print(f"[*] Embedding reference: {REFERENCE_PATH.name}")
    if WINDOW_SEC > 0:
        ref_emb = embed_sliding_windows(encoder, REFERENCE_PATH, WINDOW_SEC, HOP_SEC)
    else:
        ref_emb = embed_whole_clip(encoder, REFERENCE_PATH)
    if ref_emb is None:
        raise RuntimeError("Failed to embed reference audio.")

    files = find_audio_files(SUSPECT_DIR)
    print(f"[*] Found {len(files)} audio files to scan.")
    results, matches = [], []

    for apath in tqdm(files, desc="Scanning"):
        # Best-effort duration probe: a failure here is reported but does not
        # skip the file; the embedding step below decides its fate.
        dur = None
        try:
            probe_wav = convert_to_temp_wav_16k_mono(apath)
            try:
                dur = librosa.get_duration(path=probe_wav)
            finally:
                # The probe WAV lives in its own temp directory; drop it now.
                shutil.rmtree(probe_wav.parent, ignore_errors=True)
        except Exception as e:
            warnings.warn(f"Duration probe failed for {apath}: {e}")
        if dur is not None and dur < MIN_DURATION:
            continue

        if WINDOW_SEC > 0:
            emb = embed_sliding_windows(encoder, apath, WINDOW_SEC, HOP_SEC)
        else:
            emb = embed_whole_clip(encoder, apath)

        if emb is None:
            score = float("nan")
            ok = False
        else:
            score = cosine_similarity(ref_emb, emb)
            ok = (score >= THRESHOLD)

        results.append({
            "file": str(apath),
            "filename": apath.name,
            "similarity": score,
            "is_match": bool(ok),
        })

        # NOTE: destination names are derived from the file name only, so two
        # scanned files with the same name (and score, when COPY_ALL) in
        # different subfolders overwrite each other in OUT_DIR.
        try:
            if COPY_ALL:
                score_str = "nan" if math.isnan(score) else f"{score:.3f}"
                shutil.copy2(apath, OUT_DIR / f"{score_str}__{apath.name}")
            elif ok:
                shutil.copy2(apath, OUT_DIR / apath.name)
                matches.append(apath)
        except Exception as e:
            warnings.warn(f"Copy failed {apath}: {e}")

    # Naming the columns keeps the frame well-formed when `results` is empty;
    # otherwise sorting by "similarity" raises KeyError.
    df = pd.DataFrame(results, columns=["file", "filename", "similarity", "is_match"])
    df = df.sort_values(by="similarity", ascending=False, na_position="last")
    df.to_csv(REPORT_CSV, index=False)

    print(f"[*] Report written: {REPORT_CSV}")
    if COPY_ALL:
        print(f"[*] Copied all files to: {OUT_DIR}")
    else:
        print(f"[*] Matches copied to: {OUT_DIR} ({len(matches)} files)")
    return df

# -----------------------------
# Run
# -----------------------------
# Execute the search with the parameters configured above and keep the report.
df_results = run_similarity_search()
# Preview the first rows of the report (the highest similarities, since the
# frame is sorted descending). Presumably relied on for notebook cell display;
# as a plain script this bare expression has no visible effect.
df_results.head()
