# **Chapter 7** - Multimedia and Deepfake Defense

This notebook demonstrates AI applied to multimedia, from audio fingerprinting, transcription, and voice cloning to scene detection, object detection, and video annotation, using open-source tools such as `Librosa`, `OpenCV`, `SpeechT5`, and `YOLOv5`. With these tools, you'll learn how to extract audio features, tell real speech from cloned speech, analyze video frames, detect objects, and overlay annotations. The modular design makes it easy to follow, adapt, and extend for custom applications. Libraries such as `pandas` and `matplotlib` add support for structured data handling and visualization.

**Note**: GPU acceleration is recommended for the code cells that train or fine-tune AI models.

### Listing 7-1: Download Audio Samples from GitHub
This code prepares the environment by downloading required audio samples from GitHub. It includes Jerry’s podcast samples and non-Jerry audio files, ensuring they are available locally for training and testing purposes.

**Note 1:** The download process can take a few minutes.

**Note 2:** Using WAV files with a sampling rate of 16kHz and Signed 16-bit PCM encoding ensures compatibility with SpeechT5. Consistent format avoids processing errors, maintains audio quality, and allows the model to generate accurate spectrograms. Variations in format can disrupt training and degrade synthesized speech quality.
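
To confirm that a file matches the format described in Note 2, the standard-library `wave` module can report its sampling rate and sample width. The following is a minimal sketch rather than part of the book's listings: the helper name `check_wav_format` and the throwaway demo file are assumptions made for illustration.

```python
import os
import tempfile
import wave

def check_wav_format(path, expected_rate=16000, expected_width_bytes=2):
    """Report whether a PCM WAV file matches the expected rate and bit depth."""
    # wave.open raises wave.Error for non-PCM encodings (for example float WAVs).
    with wave.open(path, "rb") as wav:
        details = {
            "rate": wav.getframerate(),
            "width_bytes": wav.getsampwidth(),  # 2 bytes per sample = 16-bit
            "channels": wav.getnchannels(),
        }
    ok = (details["rate"] == expected_rate
          and details["width_bytes"] == expected_width_bytes)
    return ok, details

# Demo on a throwaway file: one second of 16 kHz, 16-bit, mono silence.
demo_path = os.path.join(tempfile.mkdtemp(), "demo.wav")
with wave.open(demo_path, "wb") as wav:
    wav.setnchannels(1)
    wav.setsampwidth(2)
    wav.setframerate(16000)
    wav.writeframes(b"\x00\x00" * 16000)

ok, details = check_wav_format(demo_path)
print(ok, details)
```

Once the samples are downloaded, the same helper could be pointed at a file such as `L1-Sample01-Jerry.wav`.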

In [None]:
import requests
import os

# Base GitHub repository URL for audio files
BASE_URL = "https://opensourceai-book.github.io/code/media/"

# List of Jerry's podcast audio samples for training and testing (Label 1)
Jerry_Audio_Files = [
    "L1-Sample01-Jerry.wav",  # Training sample
    "L1-Sample02-Jerry.wav",  # Training sample
    "L1-Sample03-Jerry.wav",  # Training sample
    "L1-Sample04-Jerry.wav",  # Training sample
    "L1-Sample05-Jerry.wav",  # Training sample
    "L1-Sample06-Jerry.wav",  # Training sample
    "L1-Sample07-Jerry.wav",  # Training sample
    "L1-Sample08-Jerry.wav",  # Training sample
    "L1-Sample09-Jerry.wav",  # Training sample
    "L1-Sample10-Jerry.wav",  # Training sample
    "L1-Sample11-Jerry.wav",  # Reserved for test
    "L1-Sample12-Jerry.wav",  # Reserved for test
]

# List of non-Jerry audio samples for training and testing (Label 0)
Non_Jerry_Audio_Files = [
    "L0-Sample01-Adolfo.wav",  # Non-Jerry speaker
    "L0-Sample02-Rama.wav",    # Non-Jerry speaker
    "L0-Sample03-Alex.wav",    # Non-Jerry speaker
    "L0-Sample04-SynthGeorge.wav",  # Synthetic voice
    "L0-Sample05-SynthJerry.wav",   # Synthetic Jerry voice
    "L0-Sample06-SynthJerry.wav",   # Synthetic Jerry voice
    "L0-Sample07-Teresa.wav",  # Non-Jerry speaker
    "L0-Sample08-Blaine.wav",  # Non-Jerry speaker
    "L0-Sample09-Bill.wav",    # Non-Jerry speaker
    "L0-Sample10-Brian.wav",   # Non-Jerry speaker
    "L0-Sample11-Chris.wav",   # Non-Jerry speaker (test)
    "L0-Sample12-George.wav",  # Non-Jerry speaker (test)
]

# Download a file from BASE_URL and save it to the current directory
# if it does not already exist.
def download_file(filename):
    filepath = os.path.join("./", filename)  # Local path in the current directory
    url = BASE_URL + filename
    if not os.path.exists(filepath):  # Check if file exists
        print(f"Downloading {filename} to {filepath}...")
        response = requests.get(url, timeout=60)  # Avoid hanging on a stalled connection
        if response.status_code == 200:
            with open(filepath, 'wb') as f:
                f.write(response.content)
            print(f"Downloaded {filename} successfully!")
        else:
            print(f"Failed to download {filename}. "
                  f"Status code: {response.status_code}")
    else:
        print(f"{filename} already exists at {filepath}.")
    return filepath  # Return the full path

# Download files for Label 1 (Jerry's audio files)
print("Processing Label 1 (Jerry's audio files)...")
for file in Jerry_Audio_Files:
    download_file(file)

# Download files for Label 0 (Non-Jerry audio files)
print("\nProcessing Label 0 (Non-Jerry audio files)...")
for file in Non_Jerry_Audio_Files:
    download_file(file)

print("\nAll files are downloaded and ready!")

### Listing 7-2: Audio Feature Extraction and Visualization

The first code cell defines the `extract_audio_features` function, which computes key audio features like MFCC, spectral centroid, and zero-crossing rate. These features provide summary statistics for training. We refer to this as creating an audio **fingerprint**.

The second cell demonstrates how to load an audio file, extract features, and visualize them with plots for MFCCs, spectral centroid, and zero-crossing rate.
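
To make two of these features less abstract, here is a small pure-Python sketch of zero-crossing rate and RMS energy computed over a whole signal. It is an illustration only: `librosa` computes both per frame and uses its own conventions, and the 100 Hz test tone and 8 kHz sampling rate are made-up values.

```python
import math

def zero_crossing_rate(samples):
    """Fraction of adjacent sample pairs whose signs differ."""
    crossings = sum(
        1 for a, b in zip(samples, samples[1:]) if (a >= 0) != (b >= 0)
    )
    return crossings / (len(samples) - 1)

def rms(samples):
    """Root mean square energy of the signal."""
    return math.sqrt(sum(s * s for s in samples) / len(samples))

# One second of a 100 Hz sine tone sampled at 8 kHz (illustrative values)
sr = 8000
freq = 100
tone = [math.sin(2 * math.pi * freq * n / sr) for n in range(sr)]

zcr = zero_crossing_rate(tone)   # Close to 2 * freq / sr for a pure tone
energy = rms(tone)               # Close to 1 / sqrt(2) for a unit sine
print(f"ZCR: {zcr:.4f}, RMS: {energy:.4f}")
```

A pure tone crosses zero twice per cycle, so a higher-pitched or noisier signal yields a higher zero-crossing rate, which is one reason the feature is useful in a fingerprint.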

In [None]:
import os
import librosa
import librosa.display
import numpy as np
import matplotlib.pyplot as plt

# Function to extract audio features (summary statistics for training)
def extract_audio_features(file_path):
    # Load the audio file
    y, sr = librosa.load(file_path, sr=None)

    # Extract MFCCs (Mel Frequency Cepstral Coefficients)
    mfccs = librosa.feature.mfcc(y=y, sr=sr, n_mfcc=13)

    # Extract spectral centroid (frequency centroid of the spectrum)
    spectral_centroid = librosa.feature.spectral_centroid(y=y, sr=sr)

    # Extract spectral rolloff (frequency below which a set percentage of the total
    # spectral energy is contained)
    spectral_rolloff = librosa.feature.spectral_rolloff(y=y, sr=sr)

    # Extract spectral bandwidth (spread of the spectrum around the centroid)
    spectral_bandwidth = librosa.feature.spectral_bandwidth(y=y, sr=sr)

    # Extract spectral contrast (difference between peaks and valleys in a spectrum)
    spectral_contrast = librosa.feature.spectral_contrast(y=y, sr=sr)

    # Extract zero-crossing rate (rate of sign changes in the signal)
    zcr = librosa.feature.zero_crossing_rate(y)

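    # Approximate a Harmonics-to-Noise Ratio (HNR) from a harmonic/percussive
    # split, treating the percussive component as the "noise" part. The ratio
    # compares energies because the mean of a zero-centered waveform is near 0.
    harmonics, noise = librosa.effects.hpss(y)
    hnr = np.sum(harmonics**2) / (np.sum(noise**2) + 1e-6)  # Avoid division by zero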

    # Extract chroma (distribution of pitch classes)
    chroma = librosa.feature.chroma_stft(y=y, sr=sr)

    # Extract RMS (Root Mean Square energy)
    rmse = librosa.feature.rms(y=y)

    # Create feature dictionary with descriptive statistics
    features = {
        # Average MFCC values
        "mfcc_mean": np.mean(mfccs),
        # Standard deviation of MFCC values
        "mfcc_std": np.std(mfccs),
        # Average spectral centroid
        "spectral_centroid_mean": np.mean(spectral_centroid),
        # Average spectral rolloff
        "spectral_rolloff_mean": np.mean(spectral_rolloff),
        # Avg spectral bandwidth
        "spectral_bandwidth_mean": np.mean(spectral_bandwidth),
        # Avg spectral contrast
        "spectral_contrast_mean": np.mean(spectral_contrast),
        # Std dev of spectral contrast
        "spectral_contrast_std": np.std(spectral_contrast),
        # Average zero-crossing rate
        "zcr_mean": np.mean(zcr),
        # Average chroma features
        "chroma_mean": np.mean(chroma),
        # Harmonics-to-Noise Ratio
        "hnr": hnr,
        # Average RMS energy
        "rmse_mean": np.mean(rmse),
    }
    return features

#### Cell 2 - Plot Basic Audio Features

In [None]:
# Audio sample to plot basic features
file_path = "L1-Sample01-Jerry.wav"

# Download sample, if not done already
download_file(file_path)

# Extract features
print("Extracting audio features...")
features = extract_audio_features(file_path)
print("Extracted Features:", features)

# Load audio for visualization
y, sr = librosa.load(file_path, sr=None)

# Plot MFCCs
mfccs = librosa.feature.mfcc(y=y, sr=sr, n_mfcc=13)
plt.figure(figsize=(10, 4))
librosa.display.specshow(mfccs, x_axis="time", sr=sr, cmap="coolwarm")
plt.colorbar()
plt.title("MFCCs")
plt.tight_layout()
plt.show()

# Plot Spectral Centroid
spectral_centroid = librosa.feature.spectral_centroid(y=y, sr=sr)
plt.figure(figsize=(10, 4))
plt.plot(spectral_centroid[0], label="Spectral Centroid")
plt.legend()
plt.title("Spectral Centroid")
plt.ylabel("Hz")
plt.xlabel("Frames")
plt.tight_layout()
plt.show()

# Plot Zero-Crossing Rate
zcr = librosa.feature.zero_crossing_rate(y)
plt.figure(figsize=(10, 4))
plt.plot(zcr[0], label="Zero-Crossing Rate")
plt.legend()
plt.title("Zero-Crossing Rate")
plt.xlabel("Frames")
plt.tight_layout()
plt.show()

### Listing 7-3: Train Jerry Audio Detection Model
This program trains a logistic regression model to distinguish Real Jerry audio from other audio. It extracts audio features, holds out the last two samples of each class as a test set, standardizes the features using statistics from the training set, and evaluates accuracy on the held-out files.
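
The standardization step is worth a closer look: the mean and standard deviation are learned from the training rows only and then reused for the test rows, so no information from the test set leaks into training. Below is a hedged, pure-Python sketch of that idea. The helper names and feature values are invented for illustration; the listing itself relies on scikit-learn's `StandardScaler`.

```python
import statistics

def fit_standardizer(rows):
    """Compute per-column mean and population standard deviation."""
    columns = list(zip(*rows))
    means = [statistics.fmean(col) for col in columns]
    # Fall back to 1.0 for constant columns to avoid dividing by zero
    stds = [statistics.pstdev(col) or 1.0 for col in columns]
    return means, stds

def standardize(rows, means, stds):
    """Apply previously fitted statistics to any set of rows."""
    return [[(v - m) / s for v, m, s in zip(row, means, stds)] for row in rows]

# Made-up feature rows: [spectral centroid in Hz, zero-crossing rate]
train = [[1500.0, 0.05], [1700.0, 0.07], [1900.0, 0.09]]
test = [[1800.0, 0.08]]

means, stds = fit_standardizer(train)          # Fit on training data only
train_z = standardize(train, means, stds)
test_z = standardize(test, means, stds)        # Reuse the training statistics
print(train_z)
print(test_z)
```

After scaling, both columns sit on a comparable range even though the raw values differ by several orders of magnitude, which helps logistic regression converge.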

In [None]:
import numpy as np
from sklearn.preprocessing import StandardScaler
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import accuracy_score, classification_report

# Define training and test samples explicitly. Use all but last two as training
train_Jerry_Audio_Files = Jerry_Audio_Files[:-2]
train_Non_Jerry_Audio_Files = Non_Jerry_Audio_Files[:-2]

test_Jerry_Audio_Files = Jerry_Audio_Files[-2:]
test_Non_Jerry_Audio_Files = Non_Jerry_Audio_Files[-2:]

# Combine for labels
train_labels = [1] * len(train_Jerry_Audio_Files) + [0] * len(train_Non_Jerry_Audio_Files)
test_labels = [1] * len(test_Jerry_Audio_Files) + [0] * len(test_Non_Jerry_Audio_Files)

# Extract features for training and test files
train_files = train_Jerry_Audio_Files + train_Non_Jerry_Audio_Files
test_files = test_Jerry_Audio_Files + test_Non_Jerry_Audio_Files

print(f"Total training files: {len(train_files)}")
print(f"Total test files: {len(test_files)}")

# Extract features
train_features = np.array([
    list(extract_audio_features(file).values()) for file in train_files
])

test_features = np.array([
    list(extract_audio_features(file).values()) for file in test_files
])

print(f"Extracted training features shape: {train_features.shape}")
print(f"Extracted test features shape: {test_features.shape}")

# Standardize training and test features
scaler = StandardScaler()
train_features_normalized = scaler.fit_transform(train_features)
test_features_normalized = scaler.transform(test_features)
print("Feature normalization completed.")

# Train logistic regression model
model = LogisticRegression(random_state=42)
model.fit(train_features_normalized, train_labels)
print("Model training completed.")  # Debug: Training step

# Predict on the test set
y_pred = model.predict(test_features_normalized)

# Evaluate model on test set
print("Test Accuracy:", accuracy_score(test_labels, y_pred) * 100, "%")
print("Classification Report:")
print(
    classification_report(
        test_labels, y_pred, target_names=["Not Real Jerry", "Real Jerry"],
        zero_division=1
    )
)

# Print specific predictions
print("Test Set Predictions:")
for file, true_label, pred_label in zip(
    test_files, test_labels, y_pred
):
    true_class = "Real Jerry" if true_label == 1 else "Not Real Jerry"
    predicted_class = "Real Jerry" if pred_label == 1 else "Not Real Jerry"
    print(f"File: {file}, True: {true_class}, Predicted: {predicted_class}")


### Listing 7-4: Transcribe Jerry's Real Audio to Text
This program transcribes the Real Jerry audio files downloaded in Listing 7-1 using OpenAI's Whisper model. It saves each filename and its transcription to `transcriptions.csv` for use in later steps and also returns the results as a list of dictionaries.
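
Later steps read the CSV back, so it helps to see how the file can be consumed. The sketch below uses the standard-library `csv` module and skips rows whose transcription is empty, which is how a failed transcription would appear after being written as a missing value. The function name and the sample text are placeholders, not actual transcripts.

```python
import csv
import io

def load_transcriptions(csv_file):
    """Map filename -> transcription, skipping rows with no transcription."""
    reader = csv.DictReader(csv_file)
    return {
        row["filename"]: row["transcription"]
        for row in reader
        if row["transcription"]
    }

# Stand-in for transcriptions.csv; the text is placeholder content
sample_csv = io.StringIO(
    "filename,transcription\n"
    "L1-Sample01-Jerry.wav,Welcome to the show.\n"
    "L1-Sample02-Jerry.wav,\n"
)

transcripts = load_transcriptions(sample_csv)
print(transcripts)
```

With the real file, the same function could be called as `load_transcriptions(open("transcriptions.csv", newline=""))`.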

In [None]:
# Import necessary libraries
from transformers import WhisperProcessor, WhisperForConditionalGeneration
import librosa
import torch
import pandas as pd

# Function to transcribe audio files using Whisper model
def transcribe_audio_files(file_list, output_csv="transcriptions.csv"):
    """
    Transcribe audio files using Whisper model and save the filename and
    transcription to a CSV file.
    """
    results = []

    # Load Whisper model and processor
    print("Loading Whisper model...")
    try:
        processor = WhisperProcessor.from_pretrained("openai/whisper-small")
        model = WhisperForConditionalGeneration.from_pretrained(
            "openai/whisper-small"
        )
        model = model.to("cuda" if torch.cuda.is_available() else "cpu")
        print("Whisper model loaded successfully!")
    except Exception as e:
        print(f"Error loading Whisper model: {e}")
        return None

    # Process each file
    for file in file_list:
        print(f"Processing {file}...")

        try:
            # Load and preprocess audio
            audio, sr = librosa.load(file, sr=16000)  # Ensure 16 kHz sampling rate
            inputs = processor(
                audio, sampling_rate=16000, return_tensors="pt"
            ).input_features
            inputs = inputs.to(model.device)

            # Transcribe the audio; the language hint is passed to generate(),
            # since the feature extractor does not use it
            predicted_ids = model.generate(inputs, language="en", task="transcribe")
            transcription = processor.batch_decode(
                predicted_ids, skip_special_tokens=True
            )[0]

            results.append({"filename": file, "transcription": transcription})
            print(f"Transcription for {file}: {transcription}")
        except Exception as e:
            print(f"Error transcribing {file}: {e}")
            results.append({"filename": file, "transcription": None})

    # Save results to CSV
    df = pd.DataFrame(results)
    df.to_csv(output_csv, index=False)
    print(f"Transcriptions saved to {output_csv}")

    return results

# Transcribe Real Jerry files
real_jerry_transcriptions = transcribe_audio_files(Jerry_Audio_Files)

# Print transcriptions
if real_jerry_transcriptions:
    print("\n--- Transcriptions ---")
    for entry in real_jerry_transcriptions:
        print(f"{entry['filename']}: {entry['transcription']}")
else:
    print("No transcriptions available due to an error.")

## Listing 7-5: Voice Cloning Listings

This section walks through cloning Jerry's voice with SpeechT5.
It includes the following steps:

1. **Step 1:** Installs the necessary libraries and checks for GPU availability.
2. **Step 2:** Dataset Preparation and Embedding.
3. **Step 3:** Fine-Tuning the SpeechT5 Model.
4. **Step 4:** Testing Jerry's cloned voice. How does it sound?
5. **Step 5:** Comparing Feature Differences: Real vs. Cloned

### Step 1 - Prerequisite Setup
This section installs the necessary libraries and checks for GPU availability to prepare the environment for using SpeechT5 and HiFi-GAN for text-to-speech synthesis.

It includes the following steps:

1. **Install Libraries:** Installs `datasets`, `soundfile`, `speechbrain`, `transformers`, and `accelerate` using `pip`.
2. **Check GPU:** Verifies the availability of a GPU using `nvidia-smi`.
3. **Import Libraries:** Imports the required modules from `transformers` and `torch`.
4. **Load Models:** Loads the SpeechT5 processor, model, and HiFi-GAN vocoder.
5. **Device Setup:** Checks for GPU availability and moves the model and vocoder to the appropriate device (GPU or CPU).

In [None]:
# Install necessary libraries
!pip install datasets soundfile speechbrain
!pip install git+https://github.com/huggingface/transformers.git
!pip install --upgrade accelerate

# Check GPU availability
!nvidia-smi

In [None]:
# Import necessary libraries
from transformers import SpeechT5Processor, SpeechT5ForTextToSpeech
from transformers import SpeechT5HifiGan
import torch

# Load the SpeechT5 processor and model
processor = SpeechT5Processor.from_pretrained("microsoft/speecht5_tts")
model = SpeechT5ForTextToSpeech.from_pretrained("microsoft/speecht5_tts")

# Load the HiFi-GAN vocoder for converting spectrograms to audio
vocoder = SpeechT5HifiGan.from_pretrained("microsoft/speecht5_hifigan")

# Check if GPU is available and move the model to GPU if possible
device = "cuda" if torch.cuda.is_available() else "cpu"
model = model.to(device)
vocoder = vocoder.to(device)

print("Prerequisite setup is complete. SpeechT5 and HiFi-GAN are loaded.")

### Step 2 - Dataset Preparation and Embedding
This program processes audio files and their transcripts to create a dataset for voice cloning. It integrates audio features, transcriptions, and speaker embeddings into a Hugging Face Dataset, ready for training or testing voice cloning models. Long samples are filtered, and the dataset is split into train-test subsets.
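
The filtering and splitting logic can be sketched without any libraries. This is a simplified stand-in for the `filter` and `train_test_split` calls used in the cell below: the function name, the seed, and the choice to round the test size up are all assumptions made for the example.

```python
import math
import random

def filter_and_split(examples, max_tokens=200, test_size=0.1, seed=0):
    """Drop over-long examples, then split the rest into train and test lists."""
    kept = [ex for ex in examples if len(ex["input_ids"]) < max_tokens]
    removed = [ex for ex in examples if len(ex["input_ids"]) >= max_tokens]

    shuffled = kept[:]
    random.Random(seed).shuffle(shuffled)  # Seeded so the split is repeatable
    n_test = math.ceil(len(shuffled) * test_size)  # Rounding up is an assumption
    return {"train": shuffled[n_test:], "test": shuffled[:n_test]}, removed

# Toy examples: eleven short token sequences and one that is too long
examples = [{"text": f"clip {i}", "input_ids": [0] * 50} for i in range(11)]
examples.append({"text": "long clip", "input_ids": [0] * 250})

splits, removed = filter_and_split(examples)
print(len(splits["train"]), len(splits["test"]), len(removed))
```

With only a dozen recordings, the test split ends up holding just one or two clips, so evaluation numbers from fine-tuning should be read as a rough sanity check.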

In [None]:
import os
import pandas as pd
from datasets import Dataset, Audio
from speechbrain.pretrained import EncoderClassifier
import torch

# Step 2: Data Preparation
print("Step 2: Starting data preparation...")

# Define directories
root_dir = "./"  # Root directory for audio files and dataset
audio_files_dir = root_dir  # Audio files are now in the root directory
dataset_dir = os.path.join(root_dir, "processed_dataset")  # Directory to save processed dataset

# Ensure directories exist
os.makedirs(dataset_dir, exist_ok=True)
print(f"Directories set up: - Dataset: {dataset_dir}")

# Load transcriptions from CSV
csv_path = "transcriptions.csv"
if not os.path.exists(csv_path):
    raise FileNotFoundError(f"Transcriptions file not found: {csv_path}")

transcriptions_df = pd.read_csv(csv_path)
if "filename" not in transcriptions_df.columns or "transcription" not in transcriptions_df.columns:
    raise ValueError("CSV must contain 'filename' and 'transcription' columns.")

# Prepare data from the CSV
file_paths = [os.path.join(audio_files_dir, file) for file in transcriptions_df["filename"]]
transcriptions = transcriptions_df["transcription"].tolist()

# Create Hugging Face Dataset
print("Creating Hugging Face dataset...")
data = {"file_path": file_paths, "text": transcriptions}
dataset = Dataset.from_dict(data)

# Add audio information to the dataset
print("Casting audio files for dataset processing...")
dataset = dataset.cast_column("file_path", Audio(sampling_rate=16000))

# Load the SpeechBrain speaker embedding model
print("Loading speaker embedding model...")
spk_model_name = "speechbrain/spkrec-xvect-voxceleb"
speaker_model = EncoderClassifier.from_hparams(
    source=spk_model_name,
    run_opts={"device": "cuda" if torch.cuda.is_available() else "cpu"},
    savedir=os.path.join("/tmp", spk_model_name)
)

# Function to process each dataset example
def prepare_dataset(example):
    audio = example["file_path"]
    # Extract features and tokenize text
    example = processor(
        text=example["text"],
        audio_target=audio["array"],
        sampling_rate=audio["sampling_rate"],
        return_attention_mask=False,
    )
    example["labels"] = example["labels"][0]

    # Generate speaker embedding
    example["speaker_embeddings"] = speaker_model.encode_batch(
        torch.tensor(audio["array"]).unsqueeze(0).to("cuda" if torch.cuda.is_available() else "cpu")
    ).squeeze().cpu().numpy()

    return example

# Process the dataset
print("Processing dataset to tokenize and add speaker embeddings...")
dataset = dataset.map(prepare_dataset, remove_columns=["file_path"])

# List to store removed examples
removed_examples = []

# Adjusted filtering function
def is_not_too_long(example):
    input_length = len(example["input_ids"])
    if input_length >= 200:
        removed_examples.append({"text": example["text"], "input_length": input_length})
    return input_length < 200

# Filter dataset and log removed examples
print("Filtering out long examples and logging removed samples...")
dataset = dataset.filter(is_not_too_long)

# Log the removed examples
if removed_examples:
    print(f"Removed {len(removed_examples)} examples for exceeding the token limit:")
    for i, example in enumerate(removed_examples):
        print(f"{i+1}. Length: {example['input_length']} - Text: {example['text'][:50]}...")
else:
    print("No examples were removed for being too long.")

# Split dataset into train and test sets
print("Splitting dataset into train and test sets...")
dataset = dataset.train_test_split(test_size=0.1)

# Save processed dataset
print(f"Saving processed dataset to: {dataset_dir}")
dataset.save_to_disk(dataset_dir)

print("Data preparation complete!")


### Step 3 - Fine-Tuning the Model
This program fine-tunes a SpeechT5 model for text-to-speech conversion using a processed dataset. It includes a custom data collator for speaker embeddings, trains the model with Hugging Face's Seq2SeqTrainer, and saves the fine-tuned model and processor to the Hugging Face Hub.
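
The collator's two less obvious jobs are marking padded frames with -100 so the loss ignores them and trimming the batch to a length compatible with the model's reduction factor. The pure-Python sketch below mimics that behavior on nested lists. It is an illustration with invented names and tiny two-value "frames", not the collator used in the cell that follows.

```python
def pad_and_mask_labels(label_seqs, reduction_factor=2, pad_value=-100.0):
    """Pad frame sequences to a shared length and mark padding with pad_value.

    The shared length is the longest sequence length after rounding each one
    down to a multiple of reduction_factor.
    """
    n_mels = len(label_seqs[0][0])
    target_lengths = [
        len(seq) - len(seq) % reduction_factor for seq in label_seqs
    ]
    max_length = max(target_lengths)

    batch = []
    for seq in label_seqs:
        frames = [list(frame) for frame in seq[:max_length]]  # Trim long ones
        padding = [[pad_value] * n_mels for _ in range(max_length - len(frames))]
        batch.append(frames + padding)
    return batch, target_lengths

# Two toy "spectrograms" with 5 and 3 frames, 2 values per frame
seq_a = [[0.1, 0.2]] * 5
seq_b = [[0.3, 0.4]] * 3

batch, target_lengths = pad_and_mask_labels([seq_a, seq_b])
print(target_lengths)
print(batch[1])
```

Here the five-frame sequence is trimmed to four frames and the three-frame sequence gains one padding frame filled with -100.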

In [None]:
# Set Hugging Face token
import os
os.environ["HF_TOKEN"] = "Your_token_goes_here"
print("Hugging Face token set successfully.")

In [None]:
from transformers import Seq2SeqTrainer, Seq2SeqTrainingArguments
from datasets import DatasetDict
from dataclasses import dataclass
from typing import Any, Dict, List, Union
import torch
import os

# Step 3: Fine-Tuning the Model
print("Step 3: Starting fine-tuning process...")

# Load processed dataset as DatasetDict
print("Loading processed dataset...")
dataset_dir = "./processed_dataset"
dataset = DatasetDict.load_from_disk(dataset_dir)
print("Dataset loaded successfully.")

@dataclass
class TTSDataCollatorWithPadding:
    """
    Custom collator class to handle padding and preparing batches
    for SpeechT5 training.
    """
    processor: Any

    def __call__(
        self, features: List[Dict[str, Union[List[int], torch.Tensor]]]
    ) -> Dict[str, torch.Tensor]:
        input_ids = [{"input_ids": feature["input_ids"]} for feature in features]
        label_features = [{"input_values": feature["labels"]} for feature in features]
        speaker_features = [feature["speaker_embeddings"] for feature in features]

        # Collate inputs and labels into batches
        batch = self.processor.pad(
            input_ids=input_ids,
            labels=label_features,
            return_tensors="pt",
        )

        # Replace padding with -100 for correct loss masking
        batch["labels"] = batch["labels"].masked_fill(
            batch.decoder_attention_mask.unsqueeze(-1).ne(1), -100
        )

        # Remove unused keys
        del batch["decoder_attention_mask"]

        # Adjust target lengths for reduction factor
        if model.config.reduction_factor > 1:
            target_lengths = torch.tensor([
                len(feature["input_values"]) for feature in label_features
            ])
            target_lengths = target_lengths.new([
                length - length % model.config.reduction_factor
                for length in target_lengths
            ])
            max_length = max(target_lengths)
            batch["labels"] = batch["labels"][:, :max_length]

        # Add speaker embeddings to the batch
        batch["speaker_embeddings"] = torch.tensor(speaker_features)

        return batch

# Initialize the data collator
data_collator = TTSDataCollatorWithPadding(processor=processor)

# Configure training arguments
print("Configuring training arguments...")
training_args = Seq2SeqTrainingArguments(
    output_dir="./speecht5_finetuned_model",
    push_to_hub=True,
    hub_token=os.environ["HF_TOKEN"],
    per_device_train_batch_size=8,
    gradient_accumulation_steps=4,
    learning_rate=1e-4,
    warmup_steps=500,
    max_steps=2000,
    gradient_checkpointing=True,
    fp16=True,
    evaluation_strategy="steps",  # Recent transformers releases name this eval_strategy
    per_device_eval_batch_size=8,
    save_steps=500,
    eval_steps=500,
    logging_steps=50,
    report_to=["tensorboard"],
    load_best_model_at_end=True,
    greater_is_better=False,
    label_names=["labels"],
)

# Initialize the trainer
print("Initializing the trainer...")
trainer = Seq2SeqTrainer(
    model=model,
    args=training_args,
    train_dataset=dataset["train"],
    eval_dataset=dataset["test"],
    data_collator=data_collator,
    tokenizer=processor.tokenizer,
)

# Start training
print("Starting training...")
trainer.train()

# Save the fine-tuned model and processor
print("Saving fine-tuned model and processor...")
trainer.push_to_hub(
    dataset_tags="custom_speech_dataset",
    model_name="SpeechT5 Fine-Tuned on Custom Dataset",
    dataset="Custom Speech Dataset",
    language="en",
    tasks="text-to-speech",
)
processor.save_pretrained("./speecht5_finetuned_model")
print("Fine-tuning process complete!")

### Step 4 - Testing and Playing Synthesized Speech
This program tests the fine-tuned SpeechT5 model by generating a spectrogram and converting it to audio using a vocoder. It synthesizes speech from custom text input using the selected speaker embedding, visualizes the spectrogram, and saves the generated audio file for playback.

In [None]:
import torch
import matplotlib.pyplot as plt
from IPython.display import Audio
from transformers import (
    SpeechT5Processor,
    SpeechT5ForTextToSpeech,
    SpeechT5HifiGan
)

# Step 4: Testing the Fine-Tuned Model
print("Step 4: Testing the fine-tuned model...")

# Load the fine-tuned model and processor
print("Loading the fine-tuned model and vocoder...")
model_name = "./speecht5_finetuned_model"  # Path to fine-tuned model
processor = SpeechT5Processor.from_pretrained(model_name)
model = SpeechT5ForTextToSpeech.from_pretrained(model_name)
vocoder = SpeechT5HifiGan.from_pretrained("microsoft/speecht5_hifigan")

# Ensure model is on the correct device
device = "cuda" if torch.cuda.is_available() else "cpu"
model = model.to(device)
vocoder = vocoder.to(device)

# Select a speaker embedding
print("Using speaker embedding from the fine-tuned dataset...")
example = dataset["test"][0]  # Select the first test example
speaker_embeddings = torch.tensor(
    example["speaker_embeddings"]
).unsqueeze(0).to(device)

# Define the input text for synthesis
text = ("Hey ladies and gentlemen, thank you for tuning in to the "
        "Wild Ducks podcast featuring your host, Jerry Cuomo.")

# Tokenize the input text
inputs = processor(text=text, return_tensors="pt")

# Generate the spectrogram
print("Generating spectrogram...")
spectrogram = model.generate_speech(
    inputs["input_ids"].to(device), speaker_embeddings
)

# Visualize the spectrogram
plt.figure()
plt.imshow(
    spectrogram.squeeze().cpu().numpy().T,
    aspect="auto", origin="lower"
)
plt.title("Generated Spectrogram")
plt.show()

# Convert the spectrogram to audio using the vocoder
print("Converting spectrogram to audio...")
with torch.no_grad():
    audio = vocoder(spectrogram)

# Save and play the generated audio
audio_output_path = "Jerry-Cloned-Sample01.wav"
print(f"Saving synthesized audio to: {audio_output_path}")
import soundfile as sf
sf.write(
    audio_output_path,
    audio.squeeze().cpu().numpy(),
    samplerate=16000
)

print("Playing synthesized audio...")
Audio(audio_output_path, rate=16000)

### Step 5 - Comparing Real Audio versus Memorex (Cloned)

This program extracts audio features from real and cloned samples, computes their differences, and visualizes them in a bar chart. The plot highlights subtle variations in the audio fingerprints, helping identify key features where synthetic audio deviates from real recordings.
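
Because the features live on very different scales (spectral features are measured in Hz, while the zero-crossing rate is a small fraction), raw differences tend to be dominated by the largest-valued features. One possible alternative, sketched here with made-up numbers, is to compare percent differences relative to the real sample. The function name and values are illustrative, and the sign convention (cloned minus real) differs from the cell below.

```python
def relative_differences(real, cloned, eps=1e-12):
    """Percent difference of cloned vs. real for each shared feature."""
    return {
        name: 100.0 * (cloned[name] - real[name]) / (abs(real[name]) + eps)
        for name in real
        if name in cloned
    }

# Made-up feature values for illustration only
real = {"spectral_centroid_mean": 2000.0, "zcr_mean": 0.10}
cloned = {"spectral_centroid_mean": 2100.0, "zcr_mean": 0.12}

diffs = relative_differences(real, cloned)
for name, pct in diffs.items():
    print(f"{name}: {pct:+.1f}%")
```

In this toy example the raw centroid gap (100 Hz) dwarfs the zero-crossing gap (0.02), yet in relative terms the zero-crossing rate changed four times as much.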


In [None]:
import pandas as pd

# Function to extract audio features remains the same as provided above

# Paths to real and cloned audio samples
real_audio_path = "L1-Sample11-Jerry.wav"
cloned_audio_path = "Jerry-Cloned-Sample01.wav"

# Extract features for real and cloned audio
real_features = extract_audio_features(real_audio_path)
cloned_features = extract_audio_features(cloned_audio_path)

# Create a DataFrame for easy comparison
feature_df = pd.DataFrame([real_features, cloned_features], index=["Real", "Cloned"])

# Note: min-max scaling across only two rows would map every feature to 0 or 1,
# so the raw feature values are compared instead.
import matplotlib.pyplot as plt

# Compute feature-wise differences
feature_differences = feature_df.loc["Real"] - feature_df.loc["Cloned"]

# Plot the feature differences
plt.figure(figsize=(12, 6))
feature_differences.plot(kind="bar", color="red", edgecolor="black")

# Add titles and labels
plt.title("Feature-wise Differences: Real vs. Cloned Audio")
plt.ylabel("Difference in Feature Value")
plt.xlabel("Audio Features")
plt.xticks(rotation=45, ha="right")

# Add grid for better readability
plt.grid(axis='y', linestyle='--', alpha=0.7)

# Show the plot
plt.tight_layout()
plt.show()

## Video Analysis

### Listing 7-6: Scene Detection with OpenCV
Detects scenes in a video using PySceneDetect on top of `OpenCV`. Prints each scene's number with its start and end times, skipping scenes shorter than 3 seconds.

**Note:** This program attempts to download a sample video file from this book's GitHub site. If the download fails because `download_file` is not defined, rerun the first code cell in this notebook, which defines that function.
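
The idea behind content-based scene detection can be illustrated with a deliberately simplified stand-in: score each frame by how much it differs from the previous one and declare a cut when the score exceeds a threshold. This is not PySceneDetect's actual algorithm, and the toy frames, function names, and frame rate below are invented for the example.

```python
def find_scene_cuts(frames, threshold):
    """Return indices where a frame differs sharply from the previous frame.

    The score is the mean absolute pixel difference between the two frames.
    """
    cuts = []
    for i in range(1, len(frames)):
        prev, cur = frames[i - 1], frames[i]
        score = sum(abs(a - b) for a, b in zip(prev, cur)) / len(cur)
        if score > threshold:
            cuts.append(i)
    return cuts

def cuts_to_scenes(cuts, n_frames, fps, min_seconds=0.0):
    """Turn cut indices into (start_s, end_s) pairs, dropping short scenes."""
    bounds = [0] + cuts + [n_frames]
    scenes = [(s / fps, e / fps) for s, e in zip(bounds, bounds[1:])]
    return [(s, e) for s, e in scenes if e - s >= min_seconds]

# Toy video at 2 fps: 4 dark frames, 2 bright frames, 6 dark frames;
# each frame is a flat list of 4 grayscale pixel values
frames = [[10] * 4] * 4 + [[200] * 4] * 2 + [[12] * 4] * 6

cuts = find_scene_cuts(frames, threshold=15.0)
scenes = cuts_to_scenes(cuts, len(frames), fps=2.0, min_seconds=1.5)
print(cuts)
print(scenes)
```

Lowering the threshold makes the detector react to smaller changes, and the minimum-duration filter plays the same role as the 3-second filter in the listing.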

In [None]:
%pip install opencv-python numpy torch torchvision scenedetect

In [None]:
import os
from scenedetect import SceneManager
from scenedetect.detectors import ContentDetector
from scenedetect.backends.opencv import VideoCaptureAdapter
import cv2  # For OpenCV VideoCapture

# Input video file path
INPUT_VIDEO_FILE = "Jerry-Video-Sample01.mp4"
download_file(INPUT_VIDEO_FILE)  # Ensure the file is downloaded

# Check if the input video file exists
if not os.path.exists(INPUT_VIDEO_FILE):
    raise FileNotFoundError(f"File '{INPUT_VIDEO_FILE}' not found.")

# Initialize OpenCV VideoCapture for reading the video
video_capture = cv2.VideoCapture(INPUT_VIDEO_FILE)
if not video_capture.isOpened():
    raise IOError(f"Unable to open video file '{INPUT_VIDEO_FILE}'.")

# Create a VideoCaptureAdapter for SceneDetect compatibility
video_adapter = VideoCaptureAdapter(video_capture)

# Initialize SceneManager for scene detection
scene_manager = SceneManager()

# Add a ContentDetector to detect scene transitions
# Lower threshold (e.g., 15.0) = more sensitive to changes
scene_manager.add_detector(ContentDetector(threshold=15.0))

# Perform scene detection on the video
scene_manager.detect_scenes(video_adapter)

# Retrieve the list of detected scenes with start and end times
scene_list = scene_manager.get_scene_list()

# Filter scenes to exclude those shorter than 3 seconds
filtered_scene_list = [
    (start, end)
    for start, end in scene_list
    if (end - start).get_seconds() >= 3  # Minimum scene length filter
]

# Output the number of detected scenes
print(f"Detected {len(filtered_scene_list)} scenes.")

# Print details of each filtered scene
for i, (start_time, end_time) in enumerate(filtered_scene_list):
    print(f"Scene {i + 1}: Start - {start_time}, End - {end_time}")

# Release the video capture resource
video_capture.release()

### Listing 7-7: Video Object Detection and Annotation

The code uses `YOLOv5` to detect objects in video frames. It processes each frame, detects objects, annotates them with bounding boxes and labels, and saves the annotated output video.
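
A common refinement is to annotate only detections above a confidence threshold. The sketch below works on plain dictionaries that mirror the columns read from the detection results in the listing (`xmin`, `ymin`, `xmax`, `ymax`, `confidence`, `name`). The function name, the 0.5 threshold, and the sample rows are assumptions for illustration.

```python
def select_detections(rows, min_confidence=0.5):
    """Keep confident detections and build (box, label) pairs for drawing."""
    selected = []
    for row in rows:
        if row["confidence"] < min_confidence:
            continue  # Skip low-confidence detections
        box = (int(row["xmin"]), int(row["ymin"]),
               int(row["xmax"]), int(row["ymax"]))
        label = f"{row['name']} {row['confidence']:.2f}"
        selected.append((box, label))
    return selected

# Made-up detections in the same shape as the rows used in the listing
rows = [
    {"xmin": 10.2, "ymin": 20.7, "xmax": 110.9, "ymax": 220.1,
     "confidence": 0.91, "name": "person"},
    {"xmin": 5.0, "ymin": 5.0, "xmax": 50.0, "ymax": 50.0,
     "confidence": 0.30, "name": "chair"},
]

annotations = select_detections(rows)
print(annotations)
```

Each returned pair holds the integer box corners and the text label, which is what `cv2.rectangle` and `cv2.putText` need in the annotation loop.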


In [None]:
import cv2
import torch
import warnings

# Suppress specific warnings, such as FutureWarning
warnings.filterwarnings("ignore", category=FutureWarning)

# Load the pre-trained YOLOv5 model for object detection
# 'yolov5s' is a small, pre-trained YOLOv5 model optimized for speed
model = torch.hub.load('ultralytics/yolov5', 'yolov5s')

# Define input video file and output video file paths
INPUT_VIDEO_FILE = "Jerry-Video-Sample01.mp4"  # Input video to process
OUTPUT_VIDEO_FILE = "Jerry-Video-Sample02.mp4"  # Annotated output video

# Ensure the input video file is downloaded or exists
download_file(INPUT_VIDEO_FILE)

# Load the input video using OpenCV
cap = cv2.VideoCapture(INPUT_VIDEO_FILE)
if not cap.isOpened():
    raise IOError(f"Unable to open video file '{INPUT_VIDEO_FILE}'.")

# Get video properties: width, height, and frames per second (FPS)
width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
fps = cap.get(cv2.CAP_PROP_FPS)  # Keep fractional rates such as 29.97

# Initialize the video writer to save the output video with annotations
out = cv2.VideoWriter(OUTPUT_VIDEO_FILE,
                      cv2.VideoWriter_fourcc(*'mp4v'),
                      fps, (width, height))

# Process the video frame by frame
while cap.isOpened():
    ret, frame = cap.read()  # Read the next frame
    if not ret:  # If no more frames are available, exit the loop
        break

    # Convert the frame from OpenCV's BGR order to RGB (expected by YOLOv5)
    rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
    results = model(rgb_frame)  # Perform object detection on the frame

    # Get detection results as a pandas DataFrame
    detected_objects = results.pandas().xyxy[0]

    # Annotate the frame with bounding boxes and labels
    for _, row in detected_objects.iterrows():
        # Extract bounding box coordinates and object details
        x1, y1 = int(row['xmin']), int(row['ymin'])
        x2, y2 = int(row['xmax']), int(row['ymax'])
        conf, cls = row['confidence'], row['name']
        label = f"{cls} {conf:.2f}"  # Format label with class and confidence

        # Draw bounding box on the frame
        cv2.rectangle(frame, (x1, y1), (x2, y2), (0, 255, 0), 2)

        # Add label above the bounding box
        cv2.putText(frame, label, (x1, y1 - 10),
                    cv2.FONT_HERSHEY_SIMPLEX,
                    0.5, (255, 0, 0), 2)

    # Write the annotated frame to the output video
    out.write(frame)

# Release video resources after processing
cap.release()  # Release input video
out.release()  # Save the output video
cv2.destroyAllWindows()  # Close OpenCV windows