# **Chapter 8** - Multimedia and Deepfake Defense

This notebook demonstrates the integration of AI in multimedia, featuring object detection and video annotation using open-source libraries like `Librosa`, `OpenCV`, `SpeechT5` and `YOLOv5`. With these tools, you’ll learn how to analyze video frames, detect objects, and overlay annotations, creating enhanced, interactive visual content. The modular design makes it easy to follow, adapt, and extend for custom applications. Libraries such as pandas and matplotlib add support for structured data handling and visualization.
**Note**: GPU acceleration is recommended for optimal performance for some of the code cells that train AI models.

### Listing 8-1: Download Audio Samples from GitHub
This code prepares the environment by downloading required audio samples from GitHub. It includes Jerry’s podcast samples and non-Jerry audio files, ensuring they are available locally for training and testing purposes.

**Note 1:** The download process can take a few minutes.

**Note 2:** Using WAV files with a sampling rate of 16kHz and Signed 16-bit PCM encoding ensures compatibility with SpeechT5. Consistent format avoids processing errors, maintains audio quality, and allows the model to generate accurate spectrograms. Variations in format can disrupt training and degrade synthesized speech quality.

In [None]:
import requests
import os

# Root URL (GitHub Pages) that serves every audio sample used in this chapter
BASE_URL = "https://opensourceai-book.github.io/code/media/"

# Jerry's podcast clips (Label 1).
# Samples 01-10 are training samples; samples 11-12 are reserved for testing.
Jerry_Audio_Files = [
    f"L1-Sample{sample_no:02d}-Jerry.wav" for sample_no in range(1, 13)
]

# Speakers behind the non-Jerry clips (Label 0), listed in sample order.
# "Synth*" entries are synthetic voices; the last two clips (Chris, George)
# are the ones used for testing.
_NON_JERRY_SPEAKERS = (
    "Adolfo", "Rama", "Alex", "SynthGeorge", "SynthJerry", "SynthJerry",
    "Teresa", "Blaine", "Bill", "Brian", "Chris", "George",
)
Non_Jerry_Audio_Files = [
    f"L0-Sample{sample_no:02d}-{speaker}.wav"
    for sample_no, speaker in enumerate(_NON_JERRY_SPEAKERS, start=1)
]

def download_file(filename, timeout=30):
    """
    Download `filename` from BASE_URL into the current directory.

    The download is skipped when the file already exists locally. Failures
    (non-200 status, timeout, connection error) are reported with a printed
    message rather than raised, so a batch of downloads keeps going.

    Args:
        filename: File name, appended to BASE_URL and used as the local name.
        timeout: Seconds to wait for the server before giving up.

    Returns:
        The local path ("./" + filename). The path is returned even when the
        download failed, so callers should not assume the file exists.
    """
    filepath = os.path.join("./", filename)  # Local path in root

    # Nothing to do when a previous run already fetched the file
    if os.path.exists(filepath):
        print(f"{filename} already exists at {filepath}.")
        return filepath

    url = BASE_URL + filename
    print(f"Downloading {filename} to {filepath}...")
    try:
        # Without a timeout, requests can block indefinitely on a stalled server
        response = requests.get(url, timeout=timeout)
    except requests.RequestException as exc:
        print(f"Failed to download {filename}. Error: {exc}")
        return filepath

    if response.status_code == 200:
        with open(filepath, "wb") as f:
            f.write(response.content)
        print(f"Downloaded {filename} successfully!")
    else:
        print(f"Failed to download {filename}. "
              f"Status code: {response.status_code}")
    return filepath  # Return the full path

# Fetch both labelled sample sets: Label 1 (Jerry) first, then Label 0 (non-Jerry)
for banner, wav_names in (
    ("Processing Label 1 (Jerry's audio files)...", Jerry_Audio_Files),
    ("\nProcessing Label 0 (Non-Jerry audio files)...", Non_Jerry_Audio_Files),
):
    print(banner)
    for wav_name in wav_names:
        download_file(wav_name)

print("\nAll files are downloaded and ready!")

### Listing 8-2: Audio Feature Extraction and Visualization

The first cell defines extract_audio_features, which computes a compact audio
fingerprint used for speaker recognition and deepfake detection.
It summarizes timbre, brightness, bandwidth, and harmonic balance, along with
a light anti-spoof cue—spectral flatness—to flag overly synthetic tones.

In [None]:
import numpy as np
import librosa

def extract_audio_features(file_path):
    """
    Extract compact audio features for voice verification and light anti-spoof.

    The clip is loaded as mono at its native sample rate, silent stretches
    are removed, and frame-level librosa features are reduced to scalar
    summaries.

    Args:
        file_path: Path to an audio file that librosa can load.

    Returns:
        dict mapping feature name to float. The key order is part of the
        contract: callers in this notebook build feature vectors from
        `list(result.values())`.
    """

    # Load mono at native sample rate
    y, sr = librosa.load(file_path, sr=None, mono=True)

    # Simple voice-activity detection (drop silences for stable statistics)
    intervals = librosa.effects.split(y, top_db=30)
    if len(intervals):
        y = np.concatenate([y[s:e] for s, e in intervals])

    # Core spectral features
    mfccs = librosa.feature.mfcc(y=y, sr=sr, n_mfcc=13)
    spec_cent = librosa.feature.spectral_centroid(y=y, sr=sr)
    spec_roll = librosa.feature.spectral_rolloff(y=y, sr=sr)
    spec_bw = librosa.feature.spectral_bandwidth(y=y, sr=sr)
    spec_con = librosa.feature.spectral_contrast(y=y, sr=sr)
    zcr = librosa.feature.zero_crossing_rate(y)
    chroma = librosa.feature.chroma_stft(y=y, sr=sr)
    rmse = librosa.feature.rms(y=y)

    # Harmonic/percussive split for HNR proxy.
    # A single HPSS pass yields both components (no second STFT needed).
    harm, perc = librosa.effects.hpss(y)
    # Compare energies, not raw sample means: a waveform averages to ~0, so
    # a ratio of means is numerically unstable and can even flip sign.
    hnr = np.mean(harm ** 2) / (np.mean(perc ** 2) + 1e-10)

    # Anti-spoof cue (higher ≈ more noise-like / synthetic)
    flat = librosa.feature.spectral_flatness(y=y)

    # Return dict; each key includes a short description
    return {
        "mfcc_mean": float(np.mean(mfccs)),                 # Avg MFCCs (timbre)
        "mfcc_std": float(np.std(mfccs)),                   # MFCC variability
        "spectral_centroid_mean": float(np.mean(spec_cent)),# Brightness center
        "spectral_rolloff_mean": float(np.mean(spec_roll)), # High-end energy cut
        "spectral_bandwidth_mean": float(np.mean(spec_bw)), # Bandwidth/spread
        "spectral_contrast_mean": float(np.mean(spec_con)), # Peaks vs valleys
        "spectral_contrast_std": float(np.std(spec_con)),   # Contrast variability
        "zcr_mean": float(np.mean(zcr)),                    # Noisiness/frication
        "chroma_mean": float(np.mean(chroma)),              # Pitch-class energy
        "hnr": float(hnr),                                  # Harmonic/percussive energy
        "rmse_mean": float(np.mean(rmse)),                  # Loudness (RMS)
        "spectral_flatness_mean": float(np.mean(flat)),     # Tonality→low, noise→high
    }

#### Cell 2 - Plot Basic Audio Features

In [None]:
import matplotlib.pyplot as plt
import librosa.display

# Clip used for the demo plots
file_path = "L1-Sample01-Jerry.wav"

# Make sure the clip is on disk (skipped when it already exists)
download_file(file_path)

# Scalar feature summary for the clip
print("Extracting audio features...")
features = extract_audio_features(file_path)
print("Extracted Features:", features)

# Reload the raw waveform and compute the frame-level series to draw
y, sr = librosa.load(file_path, sr=None)
mfccs = librosa.feature.mfcc(y=y, sr=sr, n_mfcc=13)
spectral_centroid = librosa.feature.spectral_centroid(y=y, sr=sr)
zcr = librosa.feature.zero_crossing_rate(y)

# Figure 1: MFCC heat map over time
plt.figure(figsize=(10, 4))
librosa.display.specshow(mfccs, x_axis="time", sr=sr, cmap="coolwarm")
plt.colorbar()
plt.title("MFCCs")
plt.tight_layout()
plt.show()

# Figures 2 and 3: per-frame line plots (only the centroid has a unit label)
for curve, title, y_label in (
    (spectral_centroid[0], "Spectral Centroid", "Hz"),
    (zcr[0], "Zero-Crossing Rate", None),
):
    plt.figure(figsize=(10, 4))
    plt.plot(curve, label=title)
    plt.legend()
    plt.title(title)
    if y_label is not None:
        plt.ylabel(y_label)
    plt.xlabel("Frames")
    plt.tight_layout()
    plt.show()

### Listing 8-3: Train Jerry Audio Detection Model
This program trains a `logistic regression` model to distinguish Real Jerry audio from other voices. Each clip is converted into a compact fingerprint using MFCCs, spectral features, and harmonic cues, with silence removed for consistency. Features are standardized, then split into training and test sets. The model’s decision threshold is calibrated from the 95th percentile of Not Jerry scores to minimize false accepts. A lightweight anti-spoof rule adds a final check: if a “Real Jerry” prediction has a spectral flatness higher than Jerry’s natural range, it’s flipped to Not Real, catching clones without complicating the workflow.

In [None]:
# === Train/Test + threshold + flatness veto + single predict (globals) ===
import os
import numpy as np
from sklearn.preprocessing import StandardScaler
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import accuracy_score, classification_report

# ----------------------------- Helpers --------------------------------
def make_splits(jerry_list, non_list, holdout=2):
    """
    Split both file lists into train/test by holding out the last items.

    Args:
        jerry_list: Files labelled 1 (Jerry).
        non_list: Files labelled 0 (not Jerry).
        holdout: Number of trailing files of each list reserved for testing.
            0 keeps everything for training; values larger than a list put
            that whole list in the test set.

    Returns:
        (train_files, test_files, y_train, y_test, train_j) where the file
        lists are Jerry files followed by non-Jerry files, the label arrays
        are int arrays aligned with them, and train_j is the Jerry-only
        training list.

    Raises:
        ValueError: If holdout is negative.
    """
    if holdout < 0:
        raise ValueError(f"holdout must be >= 0, got {holdout}")

    jerry_list = list(jerry_list)
    non_list = list(non_list)

    # Use explicit cut indices: slicing with -holdout breaks for holdout == 0
    # (x[:-0] is empty and x[-0:] is the whole list).
    cut_j = max(len(jerry_list) - holdout, 0)
    cut_n = max(len(non_list) - holdout, 0)
    train_j, test_j = jerry_list[:cut_j], jerry_list[cut_j:]
    train_n, test_n = non_list[:cut_n], non_list[cut_n:]

    train_files = train_j + train_n
    test_files = test_j + test_n
    y_train = np.array([1] * len(train_j) + [0] * len(train_n), dtype=int)
    y_test = np.array([1] * len(test_j) + [0] * len(test_n), dtype=int)
    return train_files, test_files, y_train, y_test, train_j

def features_matrix(files):
    """Build a 2-D float32 matrix with one feature row per audio file.

    Column order follows the insertion order of the dict returned by
    extract_audio_features.
    """
    rows = [
        np.array(list(extract_audio_features(path).values()), np.float32)
        for path in files
    ]
    return np.stack(rows)

def train_speaker(X_train, y_train):
    """Standardize the features, then fit a class-balanced logistic regression.

    Returns the fitted (scaler, model) pair.
    """
    scaler = StandardScaler().fit(X_train)
    model = LogisticRegression(
        class_weight="balanced",
        max_iter=500,
        random_state=42,
    )
    model.fit(scaler.transform(X_train), y_train)
    return scaler, model

def neg_p95_threshold(model, scaler, X_train, y_train):
    """
    Cutoff from 95th percentile of negative (Not Jerry) scores.

    Args:
        model: Fitted classifier exposing predict_proba; column 1 is used
            as the score.
        scaler: Fitted transformer exposing transform.
        X_train: Feature matrix, one row per training file.
        y_train: Labels aligned with X_train (array or plain list); 0 marks
            negatives.

    Returns:
        The 95th percentile of the negatives' scores as a float, or 0.5 when
        there are no negative samples.
    """
    # Coerce to an array: with a plain list, `y_train == 0` is a single
    # False and the mask would silently select nothing.
    labels = np.asarray(y_train)
    scores = model.predict_proba(scaler.transform(X_train))[:, 1]
    neg = scores[labels == 0]
    if not neg.size:
        return 0.5
    return float(np.percentile(neg, 95))

def compute_flat_cap(train_jerry_files, pctl=99):
    """Upper bound for spectral_flatness_mean, taken from Jerry's training clips.

    Returns the `pctl`-th percentile of the per-file flatness values, or 1.0
    when no files are given.
    """
    flatness = np.array(
        [
            extract_audio_features(path)["spectral_flatness_mean"]
            for path in train_jerry_files
        ],
        dtype=np.float32,
    )
    if not flatness.size:
        return 1.0
    return float(np.percentile(flatness, pctl))

def evaluate_set(model, scaler, X_test, y_test, threshold):
    """Score the test set, print accuracy plus a classification report.

    A sample is predicted as class 1 when its class-1 probability is at
    least `threshold`. Returns (scores, y_pred).
    """
    scores = model.predict_proba(scaler.transform(X_test))[:, 1]
    y_pred = (scores >= threshold).astype(int)

    accuracy_pct = accuracy_score(y_test, y_pred) * 100
    print("Test Accuracy:", f"{accuracy_pct:.1f}%")

    report = classification_report(
        y_test,
        y_pred,
        target_names=["Not Real Jerry", "Real Jerry"],
        zero_division=1,
    )
    print("Classification Report:")
    print(report)
    return scores, y_pred

# --------------------- Single-file predictor (global state) -----------
def predict_new_wav(path_or_name):
    """
    Classify one WAV file as Real / Not Real Jerry with the trained globals.

    Needs these module-level names from the training cell: voice_model,
    voice_scaler, voice_threshold, voice_flat_cap. When any is absent a hint
    is printed and None is returned; otherwise the result is (pred, score).
    """
    required = (
        "voice_model", "voice_scaler", "voice_threshold", "voice_flat_cap"
    )
    if any(name not in globals() for name in required):
        print("Model not initialized. Please rerun the training cell "
              "to define: voice_model, voice_scaler, voice_threshold, "
              "voice_flat_cap.")
        return None

    # Fall back to the current directory when the path is not found as given
    if os.path.exists(path_or_name):
        path = path_or_name
    else:
        path = f"./{path_or_name}"

    feats = extract_audio_features(path)
    row = np.array(list(feats.values()), np.float32).reshape(1, -1)
    score = float(voice_model.predict_proba(voice_scaler.transform(row))[0, 1])
    flat = float(feats["spectral_flatness_mean"])

    # Accept on score, then veto when flatness exceeds Jerry's cap
    accepted = score >= voice_threshold
    vetoed = flat > voice_flat_cap
    pred = 1 if (accepted and not vetoed) else 0

    label = "Not Real Jerry" if pred == 0 else "Real Jerry"
    print(f"\nNew file: {os.path.basename(path)} | Score: {score:.3f} | "
          f"Flatness: {flat:.3f} (cap {voice_flat_cap:.3f}) | "
          f"Thr: {voice_threshold:.2f} | Pred: {label}")
    return pred, score

# --------------------------- Main control -----------------------------
# Step 1: hold out the last two clips of each class for testing
splits = make_splits(Jerry_Audio_Files, Non_Jerry_Audio_Files, holdout=2)
train_files, test_files, y_train, y_test, train_jerry_files = splits
print(f"Total training files: {len(train_files)}")
print(f"Total test files: {len(test_files)}")

# Step 2: turn every clip into a feature row
X_train = features_matrix(train_files)
X_test = features_matrix(test_files)
print("Extracted training features shape:", X_train.shape)
print("Extracted test features shape:", X_test.shape)

# Step 3: fit scaler + classifier (kept as globals for predict_new_wav)
voice_scaler, voice_model = train_speaker(X_train, y_train)
print("Model training completed.")

# Step 4: decision threshold = P95 of the negatives' training scores (global)
voice_threshold = neg_p95_threshold(
    voice_model, voice_scaler, X_train, y_train
)
print(f"Decision threshold from negatives (P95): {voice_threshold:.2f}")

# Step 5: flatness cap = P99 over Jerry's training clips (global, anti-spoof veto)
voice_flat_cap = compute_flat_cap(train_jerry_files, pctl=99)
print(f"Jerry flatness cap (P99): {voice_flat_cap:.3f}")

# Step 6: metrics on the held-out clips
scores, y_pred = evaluate_set(
    voice_model, voice_scaler, X_test, y_test, voice_threshold
)

# Step 7: one line per held-out clip
for wav, truth, prob, guess in zip(test_files, y_test, scores, y_pred):
    truth_name = "Real Jerry" if truth == 1 else "Not Real Jerry"
    guess_name = "Real Jerry" if guess == 1 else "Not Real Jerry"
    shown = os.path.basename(wav)
    print(f"File: {shown:24s} Score: {prob:.3f} "
          f"True: {truth_name:13s} Pred: {guess_name}")

In [None]:
# --- Acid test: two cloned clips, then one authentic control ---
# The two cloned samples should be flagged as Not Real Jerry; the real clip
# verifies that authentic audio still passes.
for wav_name in (
    "Jerry-Cloned-Sample01.wav",   # Test 1: cloned sample
    "Jerry-Cloned-Sample02.wav",   # Test 2: second cloned sample
    "L1-Sample01-Jerry.wav",       # Control: real Jerry clip
):
    download_file(wav_name)
    _ = predict_new_wav(wav_name)  # uses global voice_* vars

### Listing 8-4: Transcribe Jerry's Real Audio to Text
This program transcribes the Real Jerry audio files with OpenAI's Whisper model, saves each filename and its transcription to a CSV file (`transcriptions.csv`), and returns the results as a list for use in later steps.

In [None]:
# Import necessary libraries
from transformers import pipeline, WhisperProcessor, WhisperForConditionalGeneration
import librosa
import torch
import pandas as pd

# Function to transcribe audio files using Whisper model
def transcribe_audio_files(file_list, output_csv="transcriptions.csv"):
    """
    Transcribe audio files using Whisper model and save the filename and
    transcription to a CSV file.

    Args:
        file_list: Paths of the audio files to transcribe.
        output_csv: Destination CSV with columns 'filename', 'transcription'.

    Returns:
        A list of {"filename", "transcription"} dicts, one per input file
        (transcription is None for files that failed), or None when the
        Whisper model could not be loaded.
    """
    results = []

    # Load Whisper model and processor
    print("Loading Whisper model...")
    try:
        processor = WhisperProcessor.from_pretrained("openai/whisper-small")
        model = WhisperForConditionalGeneration.from_pretrained(
            "openai/whisper-small"
        )
        model = model.to("cuda" if torch.cuda.is_available() else "cpu")
        print("Whisper model loaded successfully!")
    except Exception as e:
        print(f"Error loading Whisper model: {e}")
        return None

    # Process each file
    for file in file_list:
        print(f"Processing {file}...")

        try:
            # Load and preprocess audio (resampled to 16 kHz on load)
            # NOTE(review): Whisper's feature extractor presumably works on
            # 30-second windows, so longer clips may be cut — confirm lengths.
            audio, _ = librosa.load(file, sr=16000)
            inputs = processor(
                audio, sampling_rate=16000, return_tensors="pt"
            ).input_features
            inputs = inputs.to(model.device)

            # Transcribe the audio. The language is a decoding option, so it
            # is given to generate(); passing it to the processor does not
            # steer the decoder.
            predicted_ids = model.generate(
                inputs, language="en", task="transcribe"
            )
            transcription = processor.batch_decode(
                predicted_ids, skip_special_tokens=True
            )[0]

            results.append({"filename": file, "transcription": transcription})
            print(f"Transcription for {file}: {transcription}")
        except Exception as e:
            # Best effort: keep a row for the file so failures stay visible
            print(f"Error transcribing {file}: {e}")
            results.append({"filename": file, "transcription": None})

    # Save results to CSV
    df = pd.DataFrame(results)
    df.to_csv(output_csv, index=False)
    print(f"Transcriptions saved to {output_csv}")

    return results

# Run Whisper over every Real Jerry clip
real_jerry_transcriptions = transcribe_audio_files(Jerry_Audio_Files)

# Show the results, or a notice when nothing came back
if not real_jerry_transcriptions:
    print("No transcriptions available due to an error.")
else:
    print("\n--- Transcriptions ---")
    for row in real_jerry_transcriptions:
        print(f"{row['filename']}: {row['transcription']}")

## Listing 8-5: Voice Cloning Listings

This section walks through cloning Jerry's voice by fine-tuning SpeechT5.
It includes the following steps:

1. **Step 1:** Installs the necessary libraries and checks for GPU availability.
2. **Step 2:** Dataset Preparation and Embedding.
3. **Step 3:** Fine-Tuning the SpeechT5 Model.
4. **Step 4:** Testing Jerry's cloned voice. How does it sound?
5. **Step 5:** Comparing Feature Differences: Real vs. Cloned

### Step 1 - Prerequisite Setup
This section installs the necessary libraries and checks for GPU availability to prepare the environment for using SpeechT5 and HiFi-GAN for text-to-speech synthesis.

It includes the following steps:

1. **Install Libraries:** Installs `datasets`, `soundfile`, `speechbrain`, `transformers`, and `accelerate` using `pip`.
2. **Check GPU:** Verifies the availability of a GPU using `torch.cuda.is_available()`.
3. **Import Libraries:** Imports the required modules from `transformers` and `torch`.
4. **Load Models:** Loads the SpeechT5 processor, model, and HiFi-GAN vocoder.
5. **Device Setup:** Checks for GPU availability and moves the model and vocoder to the appropriate device (GPU or CPU).

**Note:** Colab ships with TensorFlow and Keras 3 preinstalled, which can confuse
🤗 Transformers into loading the wrong backend. To avoid this, we disable
TensorFlow/Flax, install a small tf-keras shim, and sometimes restart the
runtime so changes take effect. If you later see errors about Keras 3 or
tf_keras, rerun the install cell, restart the runtime, or upgrade
Transformers. Future versions will likely remove the need for this workaround.

In [None]:
# REQUIRES: Colab (GPU optional), python>=3.9
# Run after a fresh runtime restart. Do NOT import transformers in this cell.
# Purpose: set backend environment variables, install pinned packages, print
# the installed versions, and remind the reader to restart the runtime.

# Keep Transformers on PyTorch only (disable TF/Flax backends)
%env TRANSFORMERS_NO_TF=1
%env TRANSFORMERS_NO_FLAX=1
%env USE_TF=0
%env USE_FLAX=0
# NOTE(review): presumably silences the tokenizers parallelism warning — confirm
%env TOKENIZERS_PARALLELISM=false

# Clean install; ignore unrelated resolver warnings. No transformers import.
# Uninstall output is discarded (stdout and stderr go to /dev/null).
!pip -q uninstall -y torch torchaudio torchvision >/dev/null 2>&1
# Pinned PyTorch 2.8.0 stack from the cu126 wheel index
!pip -q install torch==2.8.0 torchaudio==2.8.0 torchvision==0.23.0 \
  --index-url https://download.pytorch.org/whl/cu126
# CPU alt:
# !pip -q install torch==2.8.0+cpu torchaudio==2.8.0+cpu \
#   torchvision==0.23.0+cpu --index-url https://download.pytorch.org/whl/cpu

# Pinned support packages; the remaining libraries are upgraded to latest
!pip -q install tf-keras==2.15.0
!pip -q install --force-reinstall pyarrow==16.1.0
!pip -q install -U datasets==2.20.0 "fsspec>=2024.5.0,<2025.1"
!pip -q install -U transformers accelerate speechbrain soundfile librosa \
  peft sentencepiece

# Quick check without importing transformers
import torch, pyarrow, datasets, sys
print("torch:", torch.__version__, "cuda?", torch.cuda.is_available())
print("pyarrow:", pyarrow.__version__)
print("datasets:", datasets.__version__)

# Reminder banner: a restart is needed before the model-load cell is run
print("\n" + "="*72)
print("NEXT STEP REQUIRED:")
print("1) Go to: Runtime → Restart Session (IMPORTANT).")
print("2) After restart, run Step 1 (model load) and continue.")
print("="*72)

# Extra safety: warn if transformers got imported earlier by mistake
if "transformers" in sys.modules:
    print("\nWARNING: 'transformers' is already imported in this session.")
    print("Please Runtime → Restart runtime, then re-run Step 1.")

In [None]:
# REQUIRES: python>=3.9, torch, transformers, sentencepiece; GPU optional

from transformers import SpeechT5Processor, SpeechT5ForTextToSpeech
from transformers import SpeechT5HifiGan
import torch

# Pick the compute device first: GPU when present, otherwise CPU
device = "cuda" if torch.cuda.is_available() else "cpu"

# Processor plus the base text-to-acoustic-features model
processor = SpeechT5Processor.from_pretrained("microsoft/speecht5_tts")
model = SpeechT5ForTextToSpeech.from_pretrained(
    "microsoft/speecht5_tts"
).to(device)

# Caching is turned off so gradient checkpointing can be used later
model.config.use_cache = False

# HiFi-GAN vocoder: acoustic features -> waveform
vocoder = SpeechT5HifiGan.from_pretrained(
    "microsoft/speecht5_hifigan"
).to(device)

print(f"Models ready on {device}")

### Step 2 - Dataset Preparation and Embedding
This program processes audio files and their transcripts to create a dataset for voice cloning. It integrates audio features, transcriptions, and speaker embeddings into a Hugging Face Dataset, ready for training or testing voice cloning models. Long samples are filtered, and the dataset is split into train-test subsets.

In [None]:
# (Helper functions for Step 2 — keep hidden in book)

import os, numpy as np, pandas as pd, torch
from datasets import Dataset, Audio
from speechbrain.pretrained import EncoderClassifier

def load_csv_validate(csv_path: str):
    # Read the transcript CSV and verify it has the columns later steps use.
    # Raises FileNotFoundError for a missing file and ValueError when
    # 'filename' or 'transcription' is absent.
    if not os.path.exists(csv_path):
        raise FileNotFoundError(f"CSV not found: {csv_path}")
    df = pd.read_csv(csv_path)
    absent = {"filename", "transcription"} - set(df.columns)
    if absent:
        raise ValueError("CSV must have 'filename' and 'transcription'.")
    return df

def make_paths(df, audio_dir: str):
    # Join each CSV filename onto audio_dir and return (paths, texts).
    # Raises FileNotFoundError (listing up to five paths) if any audio file
    # is not on disk.
    file_paths = []
    missing = []
    for name in df["filename"]:
        full_path = os.path.join(audio_dir, name)
        file_paths.append(full_path)
        if not os.path.exists(full_path):
            missing.append(full_path)
    if missing:
        raise FileNotFoundError(f"Missing audio; first few: {missing[:5]}")
    return file_paths, df["transcription"].tolist()

def build_hf_dataset(file_paths, texts, sampling_rate: int):
    # Wrap paths and texts in a Hugging Face Dataset, then cast the
    # "file_path" column to an Audio feature at the requested sampling rate
    columns = {"file_path": file_paths, "text": texts}
    audio_feature = Audio(sampling_rate=sampling_rate)
    return Dataset.from_dict(columns).cast_column("file_path", audio_feature)

def load_speaker_model(device: str, savedir: str):
    # Load the SpeechBrain x-vector speaker encoder onto `device`,
    # using `savedir` as its save directory
    return EncoderClassifier.from_hparams(
        source="speechbrain/spkrec-xvect-voxceleb",
        savedir=savedir,
        run_opts={"device": device},
    )

def normalize_labels(arr: np.ndarray, mel_bins: int) -> np.ndarray:
    # Force labels to a float32 array of shape (T, mel_bins).
    #
    # Size-1 axes are squeezed away first, so (1, T, mel_bins) is accepted.
    # A (mel_bins, T) array is transposed. When both axes equal mel_bins the
    # array is returned as-is (treated as already (T, mel_bins)).
    # Raises ValueError when the squeezed array is not 2-D or neither axis
    # has length mel_bins.
    x = np.squeeze(np.asarray(arr, dtype=np.float32))  # handles (1, T, 80) etc.
    if x.ndim == 1:
        # (T,) -> (T, 1); this also covers a single frame squeezed down to
        # (mel_bins,), which the transpose below turns into (1, mel_bins)
        x = x[:, None]
    if x.ndim != 2:
        raise ValueError(f"labels must be 2D after squeeze, got {x.shape}")
    if x.shape[-1] == mel_bins:
        return x                      # already (T, mel_bins)
    if x.shape[0] == mel_bins:
        return x.T                    # (mel_bins, T) -> (T, mel_bins)
    # Neither axis matches: nothing sensible to reshape into
    raise ValueError(f"labels not (T,{mel_bins}); got {x.shape}")

def prepare_example(ex, processor, spk_model, device, mel_bins: int):
    # Turn one dataset row into model inputs: token ids + attention mask,
    # (T, mel_bins) labels, and a float32 speaker embedding
    audio = ex["file_path"]           # {'array', 'sampling_rate'}
    waveform = audio["array"]
    text = ex["text"]

    proc = processor(
        text=text,
        audio_target=waveform,
        sampling_rate=audio["sampling_rate"],
        return_attention_mask=True,
    )
    labels = normalize_labels(proc["labels"], mel_bins)

    # The speaker encoder is used for inference only, so no gradients
    with torch.no_grad():
        batch = torch.tensor(waveform).unsqueeze(0).to(device)
        emb = spk_model.encode_batch(batch)
        emb = emb.squeeze().detach().cpu().numpy().astype(np.float32)

    return {
        "input_ids": proc["input_ids"],
        "attention_mask": proc["attention_mask"],
        "labels": labels,
        "speaker_embeddings": emb,
        "text": text,
    }

def filter_long_texts(ex, max_tokens=200):
    # Keep an example only when it has fewer than max_tokens input ids
    token_count = len(ex["input_ids"])
    return token_count < max_tokens

def print_alignment_sample(ds, sr: int, mel_bins: int):
    # Print the lengths/shapes of the first example so the expectations of
    # Step 3 are visible
    first = ds[0]
    labels_shape = np.asarray(first["labels"]).shape
    emb_shape = np.asarray(first["speaker_embeddings"]).shape
    report = [
        "\n=== Alignment assumptions for Step 3 ===",
        f"- sampling_rate: {sr}",
        f"- mel_bins (labels last dim): {mel_bins}",
        f"- input_ids len: {len(first['input_ids'])}",
        f"- attention_mask len: {len(first['attention_mask'])}",
        f"- labels shape: {labels_shape}",
        f"- speaker_embeddings shape: {emb_shape}",
        "Step 3 pads text to batch max, labels on T with -100, "
        "and batches speaker embeddings as float32.",
    ]
    for line in report:
        print(line)

In [None]:
# REQUIRES: processor from Step 1; datasets, speechbrain, numpy, torch
#           'transcriptions.csv' with filename, transcription; audio at 16 kHz

import os, torch

print("Step 2: Starting data preparation...")

# Values that Step 3 relies on
TARGET_SR = 16000   # audio sampling rate
MEL_BINS = 80       # labels have 80 mel bins (shape = T, 80)

# Locations of the CSV, the audio files, and the output dataset
root_dir = "./"
audio_dir = root_dir
csv_path = "transcriptions.csv"
dataset_dir = os.path.join(root_dir, "processed_dataset")
os.makedirs(dataset_dir, exist_ok=True)

# CSV -> validated paths/texts -> Hugging Face dataset with audio casting
df = load_csv_validate(csv_path)
paths, texts = make_paths(df, audio_dir)
ds = build_hf_dataset(paths, texts, TARGET_SR)

# Speaker-embedding model on GPU when available, else CPU
device = "cuda" if torch.cuda.is_available() else "cpu"
spk = load_speaker_model(device, savedir="/tmp/spk_xvect_voxceleb")


# Per-example mapper: tokenize, build labels, and add embeddings
def _map_fn(ex):
    return prepare_example(ex, processor, spk, device, MEL_BINS)


# Map every row through _map_fn.
# - remove_columns drops the raw "file_path" once the audio has been used.
# - num_proc=1 keeps it single-process in Colab to avoid quirks.
print("[info] tokenizing and computing speaker embeddings...")
ds = ds.map(_map_fn, remove_columns=["file_path"], num_proc=1)

# Drop long texts so batches stay small and training is stable.
# filter_long_texts keeps examples with <200 tokens (tunable).
print("[info] filtering long texts...")
ds = ds.filter(filter_long_texts)

# Show one example's shapes so readers can see what Step 3 expects
# (sr = sampling rate used when casting audio, mel_bins = features per frame).
print_alignment_sample(ds, sr=TARGET_SR, mel_bins=MEL_BINS)

# Split into train/test (10% test, seed fixed for reproducibility),
# then write the result to the 'processed_dataset' directory for Step 3.
print("\n[info] splitting train/test and saving to disk...")
ds = ds.train_test_split(test_size=0.1, seed=42)
ds.save_to_disk(dataset_dir)
print("[done] Data preparation complete.")

### Step 3 - Fine-Tuning the Model
This program fine-tunes a SpeechT5 model for text-to-speech conversion using a processed dataset. It includes a custom data collator for speaker embeddings, trains the model with Hugging Face's Seq2SeqTrainer, and saves the fine-tuned model and processor to the Hugging Face Hub.

In [None]:
# (Helper(s) for Step 3 — keep hidden in book)

import torch
from dataclasses import dataclass
from typing import Dict, List

def ensure_even_T(x: torch.Tensor) -> torch.Tensor:
    # Return x with an even first dimension (T): an odd T loses its last
    # frame, an even T is returned unchanged
    n_frames = x.shape[0]
    if n_frames % 2 == 0:
        return x
    return x[:-1, :]

@dataclass
class TTSDataCollator:
    # Collates examples into one batch: text is padded to the longest
    # sequence, labels are padded along time with -100, and speaker
    # embeddings are stacked.
    pad_id: int = 0      # padding value for input_ids
    mel_bins: int = 80   # last dimension of the label tensors

    def __call__(self, feats: List[Dict]) -> Dict[str, torch.Tensor]:
        pad = torch.nn.utils.rnn.pad_sequence

        # Text side: ids padded with pad_id, attention mask padded with 0
        input_ids = pad(
            [torch.tensor(f["input_ids"], dtype=torch.long) for f in feats],
            batch_first=True,
            padding_value=self.pad_id,
        )
        attention = pad(
            [torch.tensor(f["attention_mask"], dtype=torch.long)
             for f in feats],
            batch_first=True,
            padding_value=0,
        )

        # Label side: trim each (T, mel_bins) tensor to an even T, then copy
        # it into a batch tensor pre-filled with -100
        frames = [
            ensure_even_T(torch.tensor(f["labels"], dtype=torch.float32))
            for f in feats
        ]
        longest = max(fr.shape[0] for fr in frames)
        labels = torch.full(
            (len(frames), longest, self.mel_bins), -100.0, dtype=torch.float32
        )
        for row, fr in enumerate(frames):
            labels[row, :fr.shape[0], :] = fr

        # Speaker embeddings: one float32 vector per example, stacked
        spk = torch.stack(
            [torch.tensor(f["speaker_embeddings"], dtype=torch.float32)
             for f in feats],
            dim=0,
        )

        return {
            "input_ids": input_ids,
            "attention_mask": attention,
            "labels": labels,
            "speaker_embeddings": spk,
        }

In [None]:
# REQUIRES: processor+model from Step 1; Step 2 saved dataset on disk
#           transformers, datasets, torch; GPU optional

import os

import torch
from datasets import load_from_disk
from transformers import Seq2SeqTrainer, Seq2SeqTrainingArguments

print("Step 3: Starting fine-tuning process...")

# Single source of truth for where the fine-tuned artifacts end up;
# Step 4 loads the model and processor from this same path.
OUTPUT_DIR = "./speecht5_finetuned_model"

# Load the dataset prepared in Step 2
dataset = load_from_disk("./processed_dataset")
print("Dataset loaded successfully.")

# Build the collator (pads text, pads labels on time, batches spk)
pad_id = getattr(getattr(processor, "tokenizer", None), "pad_token_id", 0)
data_collator = TTSDataCollator(pad_id=pad_id, mel_bins=80)

# Optional: Hugging Face Hub token (for push_to_hub)
# Best effort by design: outside Colab the import fails, and inside Colab
# the secret may be missing; in both cases fall back to the environment.
hf_token = None
try:
    from google.colab import userdata
    hf_token = userdata.get("HF_TOKEN")
except Exception:
    hf_token = os.getenv("HF_TOKEN")

# Configure training (quick tuning tips):
# - Start small: batch=2, accum=8; raise if memory allows.
# - If loss is noisy, lower LR (e.g., 5e-5) or raise warmup steps.
# - If training is slow, cut max_steps or save less often.
# - Enable fp16 on GPU to save memory; disable if you see NaNs.
# - Keep remove_unused_columns=False so speaker embeddings stay in.
# - Set label_names to "labels" so masking works as expected.
training_args = Seq2SeqTrainingArguments(
    output_dir=OUTPUT_DIR,                     # save checkpoints here
    per_device_train_batch_size=2,             # batch per GPU/CPU; lower if OOM
    gradient_accumulation_steps=8,             # virtual batch = 2*8 per update
    learning_rate=1e-4,                        # optimizer step size
    warmup_steps=200,                          # steps to ramp LR from 0
    max_steps=2000,                            # total training steps
    fp16=torch.cuda.is_available(),            # use half precision on GPU
    logging_steps=100,                         # log every N steps
    save_steps=500,                            # checkpoint every N steps
    report_to=[],                              # disable wandb/tensorboard
    push_to_hub=bool(hf_token),                # upload to HF Hub if token set
    hub_token=hf_token,                        # HF token (or None)
    remove_unused_columns=False,               # keep speaker_embeddings
    label_names=["labels"],                    # tell Trainer our label field
    dataloader_num_workers=0,                  # avoid Colab multi-proc issues
)

# Create the Trainer
# - processing_class: lets Trainer handle text post-proc via processor
trainer = Seq2SeqTrainer(
    model=model,                       # from Step 1
    args=training_args,
    train_dataset=dataset["train"],
    eval_dataset=dataset["test"],
    data_collator=data_collator,
    processing_class=processor,
)

# Train the model
print("Starting training...")
trainer.train()

# Save the final weights and the processor into the root of OUTPUT_DIR.
# Step 4 calls from_pretrained() on that folder; the periodic checkpoints
# written during training live in checkpoint-* subfolders, so without this
# explicit save there may be nothing loadable at the root.
print("Saving fine-tuned model and processor...")
trainer.save_model(OUTPUT_DIR)
processor.save_pretrained(OUTPUT_DIR)

# Optional: final upload of the saved model to the Hub (needs HF_TOKEN)
# if hf_token:
#     trainer.push_to_hub()

print("Fine-tuning process complete!")

### Step 4: Test and Play Synthesized Speech
This program tests the fine-tuned SpeechT5 model by generating a spectrogram and converting it to audio using a vocoder. It synthesizes speech from custom text input using the selected speaker embedding, visualizes the spectrogram, and saves the generated audio file for playback.

In [None]:
# REQUIRES: Step 3 outputs in ./speecht5_finetuned_model; Step 2 dataset;
#           torch, transformers, soundfile; GPU optional.

import os

import numpy as np
import soundfile as sf
import torch
from datasets import load_from_disk
from IPython.display import Audio, HTML, display
from transformers import (
    SpeechT5ForTextToSpeech,
    SpeechT5HifiGan,
    SpeechT5Processor,
)

print("Step 4: Testing the fine-tuned model...")

# Fine-tuned processor + model come from disk; the vocoder is the stock one
model_dir = "./speecht5_finetuned_model"
processor = SpeechT5Processor.from_pretrained(model_dir)
model = SpeechT5ForTextToSpeech.from_pretrained(model_dir)
vocoder = SpeechT5HifiGan.from_pretrained("microsoft/speecht5_hifigan")

# Run on the GPU when one is present, otherwise stay on the CPU
device = "cuda" if torch.cuda.is_available() else "cpu"
for module in (model, vocoder):
    module.to(device)

# Voice identity: the speaker embedding of the first test example from
# Step 2, with a leading batch dimension added
dataset = load_from_disk("./processed_dataset")
ex = dataset["test"][0]
speaker_embeddings = torch.tensor(ex["speaker_embeddings"], dtype=torch.float32)
speaker_embeddings = speaker_embeddings.unsqueeze(0).to(device)

# Text to synthesize (try a sentence the speaker would plausibly say)
text = ("Hey ladies and gentlemen, thank you for tuning in to the "
        "Wild Ducks podcast featuring your host, Jerry Cuomo.")

# Tokenize, then move every returned tensor next to the model
inputs = processor(text=text, return_tensors="pt")
inputs = {name: tensor.to(device) for name, tensor in inputs.items()}

# Text -> waveform in one call; the vocoder passed in turns the predicted
# spectrogram into audio samples
with torch.no_grad():
    speech = model.generate_speech(
        inputs["input_ids"],
        speaker_embeddings=speaker_embeddings,
        vocoder=vocoder,
    )

# --- Generate + SAVE the cloned audio ---
clone_wav = "Jerry-Cloned-Sample01.wav"
waveform = speech.cpu().numpy()
sf.write(clone_wav, waveform, 16000)  # 16 kHz output
print("Wrote cloned WAV:", clone_wav)

# Sanity: file sizes and durations should differ if content differs
real_wav = "L1-Sample11-Jerry.wav"
for wav_path in (real_wav, clone_wav):
    samples, rate = sf.read(wav_path)
    seconds = len(samples) / rate
    print(f"{wav_path}: sr={rate}, dur={seconds:.2f}s, bytes={os.path.getsize(wav_path)}")

# --- Side-by-side embedded players (shows proper durations) ---
a_real = Audio(filename=real_wav, rate=16000, embed=True)
a_clone = Audio(filename=clone_wav, rate=16000, embed=True)

html = f"""
<div style="display:flex;gap:24px;align-items:flex-start;">
  <div><div style="font:600 14px system-ui;margin-bottom:6px;">Real</div>
    {a_real._repr_html_()}
  </div>
  <div><div style="font:600 14px system-ui;margin-bottom:6px;">Cloned</div>
    {a_clone._repr_html_()}
  </div>
</div>
"""
display(HTML(html))

### Step 5 - Comparing Real Audio versus Memorex (Cloned)

This program extracts audio features from real and cloned samples, computes their differences, and visualizes them in a bar chart. The plot highlights subtle variations in the audio fingerprints, helping identify key features where synthetic audio deviates from real recordings.


In [None]:
# REQUIRES: librosa, pandas, matplotlib; download_file, predict_new_wav and
#           extract_audio_features defined in earlier cells of this notebook
import matplotlib.pyplot as plt
import pandas as pd

# Paths to real and cloned audio samples
real_audio_path = "L1-Sample11-Jerry.wav"
cloned_audio01_path = "Jerry-Cloned-Sample01.wav"
cloned_audio02_path = "Jerry-Cloned-Sample02.wav"

# --- Acid test: two cloned clips then one authentic control ---
# Order matters for reading the output:
#   1) cloned sample (should be flagged as Not Real Jerry)
#   2) another cloned sample to confirm consistent detection
#   3) a real Jerry clip to verify authentic detection still passes
# predict_new_wav uses the global voice_* variables set up earlier.
for wav_path in (cloned_audio01_path, cloned_audio02_path, real_audio_path):
    download_file(wav_path)
    _ = predict_new_wav(wav_path)

# Extract features for the real clip and the first cloned clip
real_features = extract_audio_features(real_audio_path)
cloned_features = extract_audio_features(cloned_audio01_path)

# One row per clip, one column per feature, for easy comparison
feature_df = pd.DataFrame(
    [real_features, cloned_features], index=["Real", "Cloned"]
)

# Feature-wise difference (Real minus Cloned) on the raw feature values
feature_differences = feature_df.loc["Real"] - feature_df.loc["Cloned"]

# Plot the feature differences as a bar chart
plt.figure(figsize=(12, 6))
feature_differences.plot(kind="bar", color="red", edgecolor="black")

# Add titles and labels
plt.title("Feature-wise Differences: Real vs. Cloned Audio")
plt.ylabel("Difference in Feature Value")
plt.xlabel("Audio Features")
plt.xticks(rotation=45, ha="right")

# Add grid for better readability
plt.grid(axis='y', linestyle='--', alpha=0.7)

# Show the plot
plt.tight_layout()
plt.show()

## Video Analysis

### Listing 8-6: Scene Detection with OpenCV
Detects video scenes using `OpenCV`. Outputs the scene number, with start and end times, and scene duration.

**Note:** This program attempts to download a sample video file from this book's GitHub repository. If you get an error downloading, simply rerun the first code cell in this notebook that defines the `download_file` function.

In [None]:
%pip install opencv-python numpy torch torchvision scenedetect yolov5

In [None]:
import os
from scenedetect import SceneManager
from scenedetect.detectors import ContentDetector
from scenedetect.backends.opencv import VideoCaptureAdapter
import cv2  # For OpenCV VideoCapture

# Input video file path
INPUT_VIDEO_FILE = "Jerry-Jose-SampleVideo01.mp4"
download_file(INPUT_VIDEO_FILE)  # Ensure the file is downloaded

# Scenes shorter than this many seconds are dropped from the report
MIN_SCENE_SECONDS = 4

# Fail with an exception rather than exit(): inside a notebook exit() ends
# the kernel session instead of just stopping this cell.
if not os.path.exists(INPUT_VIDEO_FILE):
    raise FileNotFoundError(f"File '{INPUT_VIDEO_FILE}' not found.")

# Initialize OpenCV VideoCapture for reading the video
video_capture = cv2.VideoCapture(INPUT_VIDEO_FILE)
if not video_capture.isOpened():
    raise RuntimeError(f"Unable to open video file '{INPUT_VIDEO_FILE}'.")

try:
    # Wrap the capture so SceneDetect can read frames from it
    video_adapter = VideoCaptureAdapter(video_capture)

    # ContentDetector flags scene transitions;
    # lower threshold (e.g., 8.0) = more sensitive to changes
    scene_manager = SceneManager()
    scene_manager.add_detector(ContentDetector(threshold=8.0))

    # Perform scene detection on the video
    scene_manager.detect_scenes(video_adapter)

    # Retrieve the list of detected scenes with start and end times
    scene_list = scene_manager.get_scene_list()
finally:
    # Release the capture even if detection raised
    video_capture.release()

# Filter scenes to exclude those shorter than the minimum length
filtered_scene_list = [
    (start, end)
    for start, end in scene_list
    if (end - start).get_seconds() >= MIN_SCENE_SECONDS
]

# Output the number of detected scenes
print(f"Detected {len(filtered_scene_list)} scenes.")

# Print start, end and duration of each remaining scene
for i, (start_time, end_time) in enumerate(filtered_scene_list, start=1):
    duration = (end_time - start_time).get_seconds()
    print(
        f"Scene {i}: Start - {start_time}, End - {end_time}, "
        f"Duration - {duration:.2f}s"
    )

### Listing 8-7: Video Object Detection and Annotation

The code demonstrates using `YOLOv5` for real-time object detection on video frames. It processes each frame, detects objects, annotates with bounding boxes and labels, and saves the output video.


In [None]:
import cv2
import torch
from torchvision.transforms import functional as F
import warnings

# Suppress specific warnings, such as FutureWarning
warnings.filterwarnings("ignore", category=FutureWarning)

# Load the pre-trained YOLOv5 model for object detection
# 'yolov5s' is a small, pre-trained YOLOv5 model optimized for speed
model = torch.hub.load('ultralytics/yolov5', 'yolov5s')

# Define input video file and output video file paths
INPUT_VIDEO_FILE = "Jerry-Jose-SampleVideo01.mp4"  # Input video to process
OUTPUT_VIDEO_FILE = "Jerry-Jose-SampleVideo02.mp4"  # Annotated output video

# Ensure the input video file is downloaded or exists
download_file(INPUT_VIDEO_FILE)

# Load the input video using OpenCV.
# Raise instead of exit(): inside a notebook exit() ends the kernel session
# rather than just stopping this cell.
cap = cv2.VideoCapture(INPUT_VIDEO_FILE)
if not cap.isOpened():
    raise RuntimeError(f"Unable to open video file '{INPUT_VIDEO_FILE}'.")

# Get video properties: width, height, and frames per second (FPS).
# FPS stays a float: truncating e.g. 29.97 to 29 would change the playback
# speed and length of the written video.
width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
fps = cap.get(cv2.CAP_PROP_FPS)

# Initialize the video writer to save the output video with annotations
out = cv2.VideoWriter(OUTPUT_VIDEO_FILE,
                      cv2.VideoWriter_fourcc(*'mp4v'),
                      fps, (width, height))

try:
    # Process the video frame by frame
    while cap.isOpened():
        ret, frame = cap.read()  # Read the next frame
        if not ret:  # If no more frames are available, exit the loop
            break

        # OpenCV delivers BGR frames; the model expects RGB for numpy
        # input, so convert a copy for inference and keep drawing on the
        # original BGR frame that is written to disk.
        rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        results = model(rgb_frame)  # Perform object detection on the frame

        # Get detection results as a pandas DataFrame
        detected_objects = results.pandas().xyxy[0]

        # Annotate the frame with bounding boxes and labels
        for _, row in detected_objects.iterrows():
            # Extract bounding box coordinates and object details
            x1, y1 = int(row['xmin']), int(row['ymin'])
            x2, y2 = int(row['xmax']), int(row['ymax'])
            conf, cls = row['confidence'], row['name']
            label = f"{cls} {conf:.2f}"  # Class name and confidence

            # Draw bounding box on the frame
            cv2.rectangle(frame, (x1, y1), (x2, y2), (0, 255, 0), 2)

            # Add label above the bounding box; clamp the baseline so the
            # text stays inside the frame for boxes touching the top edge
            label_y = max(y1 - 10, 15)
            cv2.putText(frame, label, (x1, label_y),
                        cv2.FONT_HERSHEY_SIMPLEX,
                        0.5, (255, 0, 0), 2)

        # Write the annotated frame to the output video
        out.write(frame)
finally:
    # Release video resources even if a frame failed to process;
    # releasing the writer is what finalizes the output file
    cap.release()
    out.release()

cv2.destroyAllWindows()  # Close OpenCV windows

### Listing 8-8 — Facial Landmark Extraction and Wireframe Rendering (Chapter Image Generation)

This program demonstrates how AI systems detect and represent facial structure.  
Using OpenCV’s 68-point landmark model, it identifies key reference points such as the eyes, nose, and mouth, then connects them into a simplified geometric mesh.  
The resulting output shows two views: one with the wireframe overlaid on the original image, and another displaying the mesh alone as a transparent feature map ready for analysis or visualization.

In [None]:
# Run this cell on its own and continue with the next cell once the kernel
# is back: step 2 below kills the current kernel process.

# 0) Remove conflicting packages you don't need for this listing
!pip -q uninstall -y opencv-python opencv-python-headless opencv-contrib-python opencv-contrib-python-headless \
  albumentations albucore dopamine-rl thinc

# 1) Install a stable set for the facemark demo
#    (the next cell calls cv2.face, hence the opencv-contrib build)
!pip -q install "numpy==1.26.4" "opencv-contrib-python==4.8.1.78" "matplotlib==3.8.4"

# 2) HARD restart the kernel so the new C-extensions load
#    Signal 9 is sent to this process's own PID, so nothing placed after
#    this line in the cell would run.
import os; os.kill(os.getpid(), 9)


In [None]:
import cv2
import numpy as np
import matplotlib.pyplot as plt
import urllib.request, os, tempfile

# -----------------------------
# Configuration (adjust as needed)
# -----------------------------
USE_REMOTE   = True               # True: download REMOTE_URL to IMAGE_PATH; False: use your own local image
REMOTE_URL   = "https://opensourceai-book.github.io/code/media/Superhero-FaceClone.png"
IMAGE_PATH   = "face_input.png"   # If using your own image, place it in the working dir with this name
CANVAS_SIZE  = 500                # Side length in pixels of the square aligned crop
MARGIN_FRAC  = 0.25               # Margin around the detected face box, as a fraction of its larger side

# Wireframe style
# Colors are (R, G, B) tuples: the mesh is drawn on RGB arrays (a copy of
# aligned_rgb and the blank wire_rgb canvas), not on BGR images.
LINE_COLOR   = (0, 220, 90)       # Lines: green
LINE_THICK   = 2                  # Line thickness passed to cv2.line
DOT_OUTLINE  = (30, 110, 255)     # Landmark outline: blue-ish
DOT_FILL     = (255, 255, 255)    # Landmark fill: white
DOT_R_OUT    = 6                  # Radius of the outlined landmark circle
DOT_R_FILL   = 4                  # Radius of the filled landmark circle

# -----------------------------
# 1) Load image (download or local)
# -----------------------------
if USE_REMOTE:
    # Fetch the sample under a generic name so readers can swap in their own
    urllib.request.urlretrieve(REMOTE_URL, IMAGE_PATH)

img_bgr = cv2.imread(IMAGE_PATH)
assert img_bgr is not None, f"Could not load {IMAGE_PATH}. If using a local image, ensure it is named {IMAGE_PATH}."

# Work in RGB from here on (matplotlib displays RGB)
img_rgb = cv2.cvtColor(img_bgr, cv2.COLOR_BGR2RGB)

# -----------------------------
# 2) Load 68-point LBF landmark model
# -----------------------------
# The model file is cached in the temp directory and downloaded only once
lbf_url  = "https://raw.githubusercontent.com/kurnianggoro/GSOC2017/master/data/lbfmodel.yaml"
lbf_path = os.path.join(tempfile.gettempdir(), "lbfmodel.yaml")
model_is_cached = os.path.exists(lbf_path)
if not model_is_cached:
    urllib.request.urlretrieve(lbf_url, lbf_path)

# -----------------------------
# 3) Detect face, crop with margin, and resize to a canonical square
# -----------------------------
cascade_file = cv2.data.haarcascades + "haarcascade_frontalface_default.xml"
face_cascade = cv2.CascadeClassifier(cascade_file)
faces = face_cascade.detectMultiScale(
    img_rgb, scaleFactor=1.1, minNeighbors=5, minSize=(80, 80)
)
assert len(faces) > 0, "No face found. Try a clearer, front-facing image."

# Keep the detection with the largest area (width * height)
x, y, w, h = max(faces, key=lambda box: box[2] * box[3])

# Grow the box by the margin on every side, clipped to the image bounds
H, W = img_rgb.shape[:2]
m = int(MARGIN_FRAC * max(w, h))
left = max(0, x - m)
top = max(0, y - m)
right = min(W, x + w + m)
bottom = min(H, y + h + m)

crop_rgb = img_rgb[top:bottom, left:right]
aligned_rgb = cv2.resize(
    crop_rgb, (CANVAS_SIZE, CANVAS_SIZE), interpolation=cv2.INTER_LINEAR
)
aligned_bgr = cv2.cvtColor(aligned_rgb, cv2.COLOR_RGB2BGR)

# -----------------------------
# 4) Fit landmarks (68 points in iBUG layout)
# -----------------------------
facemark = cv2.face.createFacemarkLBF()
facemark.loadModel(lbf_path)

# Detect again on the resized crop: the fit needs boxes in crop coordinates
faces_aligned = face_cascade.detectMultiScale(
    aligned_rgb, scaleFactor=1.1, minNeighbors=5, minSize=(80, 80)
)
assert len(faces_aligned) > 0, "No face found in the aligned crop."

ok, landmarks = facemark.fit(aligned_bgr, faces_aligned)
assert ok and len(landmarks) > 0, "Facemark fit failed."

# Landmarks of the first face as integer pixel coordinates
first_face = landmarks[0][0]
pts = first_face.astype(np.int32)  # shape: (68, 2)

# -----------------------------
# 5) Build a light mesh (brows, eyes, nose, mouth) with minimal cross-links
# -----------------------------
def chain(seq, closed=False):
    """Return (a, b) pairs linking each item of ``seq`` to its successor.

    With ``closed=True`` one extra pair joins the last item back to the
    first, turning the open polyline into a loop.
    """
    links = list(zip(seq, seq[1:]))
    if closed:
        links.append((seq[-1], seq[0]))
    return links

# Landmark index groups in the 68-point iBUG layout
browL = [*range(17, 22)]
browR = [*range(22, 27)]
eyeL  = [*range(36, 42)]
eyeR  = [*range(42, 48)]
noseB = [*range(27, 31)]
noseA = [*range(31, 36)]
lipO  = [*range(48, 60)]  # outer lip
lipI  = [*range(60, 68)]  # inner lip

# Each group becomes a polyline; eyes and lips are closed into loops.
# Tweak this table (or the cross-links below) to change mesh density.
edges = []
for group, is_loop in (
    (browL, False),
    (browR, False),
    (eyeL, True),
    (eyeR, True),
    (noseB, False),
    (noseA, False),
    (lipO, True),
    (lipI, True),
):
    edges.extend(chain(group, closed=is_loop))

# Minimal cross-links between the groups
edges.extend([(21, 22), (39, 42), (27, 33), (33, 51), (33, 57)])

# Landmarks that get a dot: indices 17..67
subset_idx = np.concatenate([
    np.arange(17, 27),
    np.arange(27, 36),
    np.arange(36, 48),
    np.arange(48, 68),
])

# -----------------------------
# 6) Draw mesh on photo
# -----------------------------
def _draw_mesh(canvas):
    """Draw the wireframe edges and landmark dots onto ``canvas`` in place.

    ``canvas`` is an RGB image array; the same array is returned so the
    call can be chained. Reads the module-level ``edges``, ``subset_idx``,
    ``pts`` and style constants.
    """
    for a, b in edges:
        # Plain Python ints keep cv2 happy regardless of numpy scalar type
        pa = tuple(int(v) for v in pts[a])
        pb = tuple(int(v) for v in pts[b])
        cv2.line(canvas, pa, pb, LINE_COLOR, LINE_THICK, cv2.LINE_AA)

    for i in subset_idx:
        center = (int(pts[i][0]), int(pts[i][1]))
        cv2.circle(canvas, center, DOT_R_OUT, DOT_OUTLINE, 2, cv2.LINE_AA)
        cv2.circle(canvas, center, DOT_R_FILL, DOT_FILL, -1, cv2.LINE_AA)
    return canvas


on_photo = _draw_mesh(aligned_rgb.copy())

# -----------------------------
# 7) Mesh-only (transparent PNG)
# -----------------------------
h, w = aligned_rgb.shape[:2]
wire_rgb = _draw_mesh(np.zeros((h, w, 3), dtype=np.uint8))

# Every pixel touched by a stroke becomes fully opaque, the rest transparent
alpha = cv2.cvtColor(wire_rgb, cv2.COLOR_RGB2GRAY)
wire_rgba = np.zeros((h, w, 4), dtype=np.uint8)
wire_rgba[..., :3] = wire_rgb
wire_rgba[..., 3]  = (alpha > 0).astype(np.uint8) * 255  # stroke becomes alpha

# -----------------------------
# 8) Display and save
# -----------------------------
fig, ax = plt.subplots(1, 2, figsize=(10, 4))
ax[0].imshow(on_photo)
ax[0].set_title("Light mesh on photo")
ax[0].axis("off")
ax[1].imshow(wire_rgba)
ax[1].set_title("Mesh-only (transparent)")
ax[1].axis("off")
plt.tight_layout()
plt.show()

# cv2.imwrite expects BGR / BGRA channel order, while both images here are
# RGB / RGBA; convert each one so the saved colors match what is displayed.
cv2.imwrite("mesh_on_photo.png", cv2.cvtColor(on_photo, cv2.COLOR_RGB2BGR))
cv2.imwrite("mesh_only.png", cv2.cvtColor(wire_rgba, cv2.COLOR_RGBA2BGRA))
print("Saved: mesh_on_photo.png, mesh_only.png")