# Ainu Speech: Recorded Audio vs. Sample Set

This notebook analyzes recorded audio and compares it against a sample Ainu audio set using MFCCs, mel features, DTW-based similarity, and optional pretrained speech embeddings.

## 1) Install and Import Dependencies

In [None]:
# If running in a fresh environment, uncomment and run this cell once
# !pip install -r ../requirements.txt

import os
import json
import math
from pathlib import Path
from typing import List, Dict, Tuple

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from tqdm import tqdm

import librosa
import soundfile as sf

# Local utilities
import sys
ROOT = Path("..").resolve()
sys.path.append(str((ROOT / 'src').resolve()))
from audio_utils import load_audio, trim_silence, extract_features, zscore, dtw_distance, cosine_similarity_matrix, compare_recording_to_samples

# Optional torch/transformers (guarded)
try:
    import torch
    import torchaudio
    from transformers import AutoProcessor, AutoModel
    HAS_TORCH = True
except Exception:
    HAS_TORCH = False

sns.set_theme(context="notebook", style="whitegrid")
plt.rcParams["figure.dpi"] = 120

## 2) Configure Paths and Parameters

In [None]:
DATA_DIR = (ROOT / 'data').resolve()
SAMPLES_DIR = DATA_DIR / 'samples'
RECORDINGS_DIR = DATA_DIR / 'recordings'
OUTPUT_DIR = (ROOT / 'outputs').resolve()
OUTPUT_DIR.mkdir(parents=True, exist_ok=True)

# Audio parameters
TARGET_SR = 16000
N_MELS = 64
N_MFCC = 13
HOP_LENGTH = 160    # 10 ms @ 16 kHz
N_FFT = 400         # 25 ms window @ 16 kHz
TRIM_DB = 30.0

# Ranking
TOP_K = 5
DTW_METRIC = 'cosine'  # could be 'euclidean'

# Embedding model (optional)
EMBED_MODEL_ID = 'facebook/wav2vec2-base-960h'  # multilingual alternative: 'facebook/wav2vec2-large-xlsr-53'

print('Samples dir:', SAMPLES_DIR)
print('Recordings dir:', RECORDINGS_DIR)

# Helpers
AUDIO_EXTS = {'.wav', '.flac', '.mp3', '.ogg', '.m4a'}

def list_audio_files(folder: Path) -> List[Path]:
    return [p for p in folder.rglob('*') if p.suffix.lower() in AUDIO_EXTS]

def latest_file(folder: Path) -> Path | None:
    files = list_audio_files(folder)
    if not files:
        return None
    return max(files, key=lambda p: p.stat().st_mtime)


## 3) Load Sample Ainu Audio Dataset

In [None]:
def infer_label_from_path(p: Path) -> str:
    # e.g., data/samples/<label>/<file>.wav -> use the parent folder as label
    try:
        return p.parent.name
    except Exception:
        return "unknown"

sample_files = list_audio_files(SAMPLES_DIR)
if not sample_files:
    print(f"No sample files found in {SAMPLES_DIR}. Please add WAV/FLAC/etc.")

samples_df = pd.DataFrame({
    'path': [str(p) for p in sample_files],
    'label': [infer_label_from_path(p) for p in sample_files],
    'relpath': [str(p.relative_to(SAMPLES_DIR)) for p in sample_files]
})
print(f"Indexed {len(samples_df)} sample files.")
samples_df.head(10)

## 4) Load and Preprocess Recorded Audio

In [None]:
# Choose a recorded file: either specify manually or take the latest in the recordings folder
REC_FILE = latest_file(RECORDINGS_DIR)
if REC_FILE is None:
    print(f"No recorded audio found in {RECORDINGS_DIR}. Place a WAV/FLAC/MP3 file there.")
else:
    print("Using recording:", REC_FILE)

rec_audio = None
if REC_FILE is not None:
    rec_audio = load_audio(str(REC_FILE), target_sr=TARGET_SR, mono=True)
    # Peak normalize to -1 dBFS
    peak = np.max(np.abs(rec_audio.y)) if rec_audio.y.size else 0
    if peak > 1e-6:
        rec_audio = rec_audio.__class__(y=rec_audio.y / peak * 0.89, sr=rec_audio.sr, path=rec_audio.path)
    print(f"Recording duration: {rec_audio.y.shape[0]/rec_audio.sr:.2f} s @ {rec_audio.sr} Hz")

## 5) Voice Activity Detection and Silence Trimming

In [None]:
trimmed_audio = None
if rec_audio is not None:
    trimmed_audio = trim_silence(rec_audio, top_db=TRIM_DB)
    dur = trimmed_audio.y.shape[0] / trimmed_audio.sr
    print(f"Trimmed duration: {dur:.2f} s (top_db={TRIM_DB})")

# Optional: segment long files into voiced chunks using energy-based VAD
# For simplicity, we keep a single trimmed utterance here.


## 6) Feature Extraction (Log-Mel Spectrograms, MFCC)

In [None]:
# Feature extraction for recording
F_rec = None
if trimmed_audio is not None:
    F_rec = extract_features(trimmed_audio, n_mfcc=N_MFCC, hop_length=HOP_LENGTH, n_fft=N_FFT,
                             add_deltas=True, add_prosody=True)
    print('Recording features:', F_rec.shape)

# Feature extraction for samples (on-the-fly, small sets; for large sets, see caching section)
sample_feats = []
for p in tqdm(sample_files, desc='Extracting sample features'):
    try:
        a = trim_silence(load_audio(str(p), target_sr=TARGET_SR))
        F = extract_features(a, n_mfcc=N_MFCC, hop_length=HOP_LENGTH, n_fft=N_FFT,
                             add_deltas=True, add_prosody=True)
        sample_feats.append((str(p), F))
    except Exception as e:
        print('Error processing', p, e)

print(f"Extracted features for {len(sample_feats)} / {len(sample_files)} samples")

## 7) Feature Normalization and Length Handling

In [None]:
if F_rec is not None:
    F_rec_z = zscore(F_rec)
else:
    F_rec_z = None

sample_feats_z = []
for path, F in sample_feats:
    sample_feats_z.append((path, zscore(F)))

print('Normalized features. Ready for DTW.')

## 8) DTW-Based Similarity Scoring

In [None]:
dtw_results = []
if F_rec_z is not None:
    for path, Fz in tqdm(sample_feats_z, desc='DTW scoring'):
        try:
            dist, _ = dtw_distance(F_rec_z, Fz, metric=DTW_METRIC)
            # Normalize by combined length to reduce bias
            norm = (F_rec_z.shape[0] + Fz.shape[0])
            score = dist / max(norm, 1)
            dtw_results.append({
                'path': path,
                'label': infer_label_from_path(Path(path)),
                'dtw_dist': dist,
                'dtw_score': score,
                'n_frames_sample': Fz.shape[0]
            })
        except Exception as e:
            print('DTW failed for', path, e)

    dtw_df = pd.DataFrame(dtw_results).sort_values(['dtw_score', 'dtw_dist'], ascending=[True, True])
    display(dtw_df.head(TOP_K))
else:
    dtw_df = pd.DataFrame()


## 9) Embedding-Based Similarity (Optional, Pretrained Model)

In [None]:
embed_df = pd.DataFrame()
if HAS_TORCH:
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    print('Embedding device:', device)
    try:
        processor = AutoProcessor.from_pretrained(EMBED_MODEL_ID)
        model = AutoModel.from_pretrained(EMBED_MODEL_ID).to(device).eval()
        def embed_audio(y: np.ndarray, sr: int) -> np.ndarray:
            # Resample to model's expected input if needed
            target_sr = getattr(processor, 'sampling_rate', 16000)
            if sr != target_sr:
                y_res = librosa.resample(y, orig_sr=sr, target_sr=target_sr)
                sr_use = target_sr
            else:
                y_res = y
                sr_use = sr
            with torch.no_grad():
                inputs = processor(y_res, sampling_rate=sr_use, return_tensors='pt', padding=True)
                for k in inputs:
                    inputs[k] = inputs[k].to(device)
                out = model(**inputs)
                # Last hidden state: [B, T, C] -> mean over time
                h = out.last_hidden_state.mean(dim=1).squeeze(0).detach().cpu().numpy()
                return h
        
        rec_emb = None
        if trimmed_audio is not None:
            rec_emb = embed_audio(trimmed_audio.y, trimmed_audio.sr)
        emb_rows = []
        if rec_emb is not None:
            rec_norm = rec_emb / (np.linalg.norm(rec_emb) + 1e-8)
            for p in tqdm(sample_files, desc='Embedding scoring'):
                try:
                    a = load_audio(str(p), target_sr=TARGET_SR)
                    e = embed_audio(a.y, a.sr)
                    e_norm = e / (np.linalg.norm(e) + 1e-8)
                    cos = float(np.dot(rec_norm, e_norm))
                    emb_rows.append({'path': str(p), 'label': infer_label_from_path(Path(p)), 'cos_sim': cos})
                except Exception as e:
                    print('Embedding failed for', p, e)
            embed_df = pd.DataFrame(emb_rows).sort_values('cos_sim', ascending=False)
            display(embed_df.head(TOP_K))
    except Exception as e:
        print('Embedding section skipped:', e)
else:
    print('Torch/transformers not available. Skipping embedding section. To enable, install torch, torchaudio, transformers.')

## 10) Ranking and Thresholding of Matches

In [None]:
# Combine DTW and embedding (if available). If no embeddings, use DTW only.
rank_df = pd.DataFrame()
if not dtw_df.empty:
    rank_df = dtw_df[['path', 'label', 'dtw_score']].copy()
    if not embed_df.empty:
        rank_df = rank_df.merge(embed_df[['path', 'cos_sim']], on='path', how='left')
        # Convert to a unified score: lower dtw better, higher cos better
        # Normalize dtw to [0,1] by min-max over candidates; invert for similarity
        dtw_min, dtw_max = rank_df['dtw_score'].min(), rank_df['dtw_score'].max()
        if dtw_max > dtw_min:
            rank_df['dtw_sim'] = 1.0 - (rank_df['dtw_score'] - dtw_min) / (dtw_max - dtw_min)
        else:
            rank_df['dtw_sim'] = 1.0
        rank_df['cos_sim'] = rank_df['cos_sim'].fillna(rank_df['cos_sim'].min() if not rank_df['cos_sim'].isna().all() else 0.0)
        rank_df['score'] = 0.6 * rank_df['dtw_sim'] + 0.4 * rank_df['cos_sim']
        rank_df = rank_df.sort_values('score', ascending=False)
    else:
        # Use 1/dtw as similarity proxy
        rank_df['score'] = -rank_df['dtw_score']
        rank_df = rank_df.sort_values('score', ascending=False)

    display(rank_df.head(TOP_K))
else:
    print('No DTW results to rank.')

## 11) Visualization of Waveforms, Spectrograms, and DTW Path

In [None]:
from IPython.display import Audio, display as ipydisplay

if rec_audio is not None:
    ipydisplay(Audio(rec_audio.y, rate=rec_audio.sr))

if F_rec_z is not None and not dtw_df.empty:
    top_path = dtw_df.iloc[0]['path']
    samp = trim_silence(load_audio(top_path, target_sr=TARGET_SR))
    F_samp = extract_features(samp, n_mfcc=N_MFCC, hop_length=HOP_LENGTH, n_fft=N_FFT, add_deltas=True, add_prosody=True)
    F_samp_z = zscore(F_samp)

    # Waveforms
    fig, axes = plt.subplots(2, 1, figsize=(10, 6), constrained_layout=True)
    librosa.display.waveshow(trimmed_audio.y, sr=trimmed_audio.sr, ax=axes[0])
    axes[0].set_title(f'Recording: {Path(rec_audio.path).name}')
    librosa.display.waveshow(samp.y, sr=samp.sr, ax=axes[1], color='orange')
    axes[1].set_title(f'Top Sample: {Path(samp.path).name}')
    plt.show()

    # Similarity matrix and DTW path
    cost = 1.0 - cosine_similarity_matrix(F_rec_z, F_samp_z)
    dist, path = dtw_distance(F_rec_z, F_samp_z, metric=DTW_METRIC)

    fig, ax = plt.subplots(figsize=(6,5))
    im = ax.imshow(cost, origin='lower', aspect='auto', cmap='magma')
    ax.set_title(f'DTW Cost Matrix (dist={dist:.2f})')
    ax.set_xlabel('Sample frames')
    ax.set_ylabel('Recording frames')
    # Plot path
    ax.plot(path['index2'], path['index1'], color='cyan', linewidth=1)
    fig.colorbar(im, ax=ax)
    plt.show()

## 12) Batch Processing and Caching of Features

In [None]:
CACHE_DIR = OUTPUT_DIR / 'cache'
CACHE_DIR.mkdir(parents=True, exist_ok=True)
CACHE_FILE = CACHE_DIR / 'samples_features_mfcc_prosody_npz.npz'

if not sample_files:
    print('No samples to cache.')
else:
    if not CACHE_FILE.exists():
        print('Caching sample features to', CACHE_FILE)
        arrays = {}
        meta = {}
        for i, (p, F) in enumerate(sample_feats):
            key = f'F_{i}'
            arrays[key] = F.astype(np.float32)
            meta[key] = {'path': p}
        np.savez_compressed(CACHE_FILE, **arrays)
        with open(CACHE_FILE.with_suffix('.json'), 'w') as f:
            json.dump(meta, f, indent=2)
    else:
        print('Cache already exists:', CACHE_FILE)


## 13) Results Export (CSV/JSON) and Artifact Saving

In [None]:
RESULTS_DIR = OUTPUT_DIR / 'results'
RESULTS_DIR.mkdir(parents=True, exist_ok=True)

if not rank_df.empty:
    rec_name = Path(rec_audio.path).stem if rec_audio is not None else 'unknown_recording'
    out_csv = RESULTS_DIR / f'{rec_name}_top{TOP_K}.csv'
    rank_df.head(TOP_K).to_csv(out_csv, index=False)
    print('Saved results to', out_csv)

    # Save similarity matrix plot for the top result
    if not dtw_df.empty:
        top_path = dtw_df.iloc[0]['path']
        samp = trim_silence(load_audio(top_path, target_sr=TARGET_SR))
        F_samp = extract_features(samp, n_mfcc=N_MFCC, hop_length=HOP_LENGTH, n_fft=N_FFT, add_deltas=True, add_prosody=True)
        F_samp_z = zscore(F_samp)
        cost = 1.0 - cosine_similarity_matrix(F_rec_z, F_samp_z)
        fig, ax = plt.subplots(figsize=(6,5))
        im = ax.imshow(cost, origin='lower', aspect='auto', cmap='magma')
        ax.set_title('Cost matrix')
        ax.set_xlabel('Sample frames')
        ax.set_ylabel('Recording frames')
        fig.colorbar(im, ax=ax)
        fig_path = RESULTS_DIR / f'{rec_name}_top1_cost_matrix.png'
        fig.savefig(fig_path, dpi=150)
        plt.close(fig)
        print('Saved plot to', fig_path)
else:
    print('No ranking to export.')

## 14) Reusable Functions and Simple Widgets

In [None]:
import ipywidgets as W


def run_comparison(recording_path: Path, samples_dir: Path) -> pd.DataFrame:
    if not recording_path.exists():
        print('Recording not found:', recording_path)
        return pd.DataFrame()
    sample_paths = list_audio_files(samples_dir)
    if not sample_paths:
        print('No samples found in', samples_dir)
        return pd.DataFrame()
    results = compare_recording_to_samples(str(recording_path), [str(p) for p in sample_paths], target_sr=TARGET_SR)
    df = pd.DataFrame(results)
    return df

rec_opts = ['<latest>'] + [str(p) for p in list_audio_files(RECORDINGS_DIR)]
widgets = {
    'rec': W.Dropdown(options=rec_opts, description='Recording:'),
    'run': W.Button(description='Run', button_style='primary'),
    'out': W.Output()
}

@widgets['run'].on_click
def _(_btn):
    with widgets['out']:
        widgets['out'].clear_output()
        sel = widgets['rec'].value
        path = latest_file(RECORDINGS_DIR) if sel == '<latest>' else Path(sel)
        if path is None:
            print('No recording available.')
            return
        df = run_comparison(path, SAMPLES_DIR)
        if not df.empty:
            display(df.head(TOP_K))
        else:
            print('No results.')

W.VBox([widgets['rec'], widgets['run'], widgets['out']])

## 15) Basic Sanity Checks

In [None]:
# Sanity checks (safe to run multiple times)
if rec_audio is not None:
    assert rec_audio.sr == TARGET_SR, f"Expected SR {TARGET_SR}, got {rec_audio.sr}"
    assert rec_audio.y.ndim == 1, 'Audio should be mono'
    assert rec_audio.y.size > 0, 'Audio is empty'

if F_rec is not None:
    assert F_rec.ndim == 2 and F_rec.shape[0] > 0 and F_rec.shape[1] > 0, 'Invalid feature shape'

if trimmed_audio is not None:
    voiced_ratio = trimmed_audio.y.size / rec_audio.y.size if rec_audio is not None else 0
    print(f"Voiced ratio after trim: {voiced_ratio:.2f}")

print('Diagnostics complete.')