In [5]:
!pip install datasets[audio] torchaudio librosa pyannote.audio kaggle onnxruntime



## Loading dataset from kaggle

Ensure that your [Kaggle API key](https://www.kaggle.com/docs/api) (`kaggle.json`) is in `/content`; the next cell copies it into `~/.kaggle`.

In [None]:
# Copy the Kaggle API key into ~/.kaggle and restrict it to owner read/write (mode 600)
!mkdir -p ~/.kaggle
!cp /content/kaggle.json ~/.kaggle/ # path to your kaggle key
!chmod 600 ~/.kaggle/kaggle.json

In [None]:
# Download the dataset archive with the Kaggle CLI (about 3.7 GB according to the output below)
!kaggle datasets download -d birdy654/deep-voice-deepfake-voice-recognition

Downloading deep-voice-deepfake-voice-recognition.zip to /content
100% 3.68G/3.69G [02:59<00:00, 21.1MB/s]
100% 3.69G/3.69G [02:59<00:00, 22.1MB/s]


In [None]:
!unzip /content/deep-voice-deepfake-voice-recognition.zip

Archive:  /content/deep-voice-deepfake-voice-recognition.zip
  inflating: DEMONSTRATION/DEMONSTRATION/linus-original-DEMO.mp3  
  inflating: DEMONSTRATION/DEMONSTRATION/linus-to-musk-DEMO.mp3  
  inflating: KAGGLE/AUDIO/FAKE/Obama-to-Biden.wav  
  inflating: KAGGLE/AUDIO/FAKE/Obama-to-Trump.wav  
  inflating: KAGGLE/AUDIO/FAKE/biden-to-Obama.wav  
  inflating: KAGGLE/AUDIO/FAKE/biden-to-Trump.wav  
  inflating: KAGGLE/AUDIO/FAKE/biden-to-linus.wav  
  inflating: KAGGLE/AUDIO/FAKE/biden-to-margot.wav  
  inflating: KAGGLE/AUDIO/FAKE/biden-to-musk.wav  
  inflating: KAGGLE/AUDIO/FAKE/biden-to-ryan.wav  
  inflating: KAGGLE/AUDIO/FAKE/biden-to-taylor.wav  
  inflating: KAGGLE/AUDIO/FAKE/linus-to-biden.wav  
  inflating: KAGGLE/AUDIO/FAKE/linus-to-margot.wav  
  inflating: KAGGLE/AUDIO/FAKE/linus-to-musk.wav  
  inflating: KAGGLE/AUDIO/FAKE/linus-to-obama.wav  
  inflating: KAGGLE/AUDIO/FAKE/linus-to-ryan.wav  
  inflating: KAGGLE/AUDIO/FAKE/linus-to-taylor.wav  
  inflating: KAGGLE/AUDIO/

## Augmenting real data
Since there are fewer files with real (human) audio than with fake audio, we generate augmented copies of the real recordings.

In [6]:
import librosa
import numpy as np
import os
import random
import soundfile as sf

def load_audio(file_path, target_sr=16000):
    """Load ``file_path`` with librosa at sample rate ``target_sr`` and return only the samples."""
    waveform = librosa.load(file_path, sr=target_sr)[0]
    return waveform

def add_noise(audio, noise_level=0.005):
    """Return ``audio`` plus zero-mean Gaussian noise with standard deviation ``noise_level``.

    One noise value is drawn per sample from NumPy's global random state.
    """
    return audio + np.random.normal(loc=0, scale=noise_level, size=len(audio))

def time_stretch(audio, rate=1.2):
    """Return ``audio`` passed through ``librosa.effects.time_stretch`` with the given ``rate``."""
    return librosa.effects.time_stretch(audio, rate=rate)

def pitch_shift(audio, semitone_steps=2, sr=16000):
    """Return ``audio`` pitch-shifted by ``semitone_steps`` using librosa.

    Args:
        audio: Waveform to shift.
        semitone_steps: Value passed to librosa as ``n_steps``.
        sr: Sample rate of ``audio``. Defaults to 16000, the rate that was previously
            hard-coded here and that ``load_audio`` uses by default.
    """
    return librosa.effects.pitch_shift(audio, sr=sr, n_steps=semitone_steps)

def save_audio(audio, output_path, sr=16000):
    """Write ``audio`` to ``output_path`` with soundfile, at sample rate ``sr`` and subtype PCM_16."""
    sf.write(file=output_path, data=audio, samplerate=sr, subtype='PCM_16')


def _is_augmented_copy(filename):
    """Return True when ``filename`` has the ``<stem>_aug_<number>`` form written by ``augment_and_save``."""
    stem = os.path.splitext(filename)[0]
    _, marker, index = stem.rpartition("_aug_")
    return bool(marker) and index.isdigit()


def augment_and_save(input_folder, output_folder, num_augmentations=5):
    """Write ``num_augmentations`` randomly augmented copies of every ``.wav`` file in ``input_folder``.

    For each copy one of noise / time-stretch / pitch-shift is chosen at random (with its
    default parameters) and the result is saved as ``<stem>_aug_<i>.wav`` in ``output_folder``.

    Files that already look like outputs of this function are skipped. This keeps the
    function idempotent when ``input_folder`` and ``output_folder`` are the same directory:
    running it again overwrites the existing augmented files instead of augmenting them
    a second time.

    Args:
        input_folder: Directory containing the source ``.wav`` files.
        output_folder: Directory to write to; created if it does not exist.
        num_augmentations: Number of augmented copies per source file.
    """
    os.makedirs(output_folder, exist_ok=True)

    augmenters = {
        'noise': add_noise,
        'time_stretch': time_stretch,
        'pitch_shift': pitch_shift,
    }

    # Sorted so that, with seeded RNGs, each file gets the same augmentations on every run
    # (os.listdir order is arbitrary).
    for filename in sorted(os.listdir(input_folder)):
        if not filename.endswith(".wav") or _is_augmented_copy(filename):
            continue

        audio = load_audio(os.path.join(input_folder, filename))
        stem = os.path.splitext(filename)[0]

        for i in range(num_augmentations):
            # Exactly one augmentation type is applied per copy
            augmentation_type = random.choice(['noise', 'time_stretch', 'pitch_shift'])
            augmented_audio = augmenters[augmentation_type](audio)

            output_path = os.path.join(output_folder, f"{stem}_aug_{i+1}.wav")
            save_audio(augmented_audio, output_path)

# Example usage
# Input and output are the same folder, so the augmented clips land next to the original
# recordings in REAL/, the folder that is later loaded as the label-1 (real) class.
input_folder = "/content/KAGGLE/AUDIO/REAL"
output_folder = "/content/KAGGLE/AUDIO/REAL"
augment_and_save(input_folder, output_folder, num_augmentations=3)


## Loading audio files
The code block below loads the audio files from their respective directories using the `load_dataset()` function from Hugging Face. Each dataset is converted to a pandas DataFrame to add the labels (this can also be done directly on the dataset with `map()`, but that is very slow) and then converted back. Next, the two datasets are concatenated and the audio is resampled to 16 kHz mono. Finally, the combined dataset is split into training, validation and test sets, which are wrapped in PyTorch DataLoaders; the collate function zero-pads the audio arrays in each batch to the length of the longest one.

In [8]:
# Imports needed by this cell; without them it fails with NameError on a fresh kernel.
from pathlib import Path

from datasets import Audio, Dataset, concatenate_datasets, load_dataset

# set the paths
real_data_dir = Path("/content/KAGGLE/AUDIO/REAL")
fake_data_dir = Path("/content/KAGGLE/AUDIO/FAKE")

# Load the datasets (the "audiofolder" builder puts everything in a single 'train' split)
dataset_real = load_dataset("audiofolder", data_dir=str(real_data_dir))['train']
dataset_fake = load_dataset("audiofolder", data_dir=str(fake_data_dir))['train']

# Convert to pandas DataFrame
df_real = dataset_real.to_pandas()
df_fake = dataset_fake.to_pandas()

# Add label column: 1 = real, 0 = fake
df_real['label'] = 1
df_fake['label'] = 0

# Convert back to datasets
dataset_real = Dataset.from_pandas(df_real)
dataset_fake = Dataset.from_pandas(df_fake)

# Combine the datasets
combined_dataset = concatenate_datasets([dataset_real, dataset_fake])

# Convert audio to a sample rate of 16000, mono
combined_dataset = combined_dataset.cast_column("audio", Audio(sampling_rate=16000, mono=True))

def pad_collate(batch):
    """Collate samples into one batch, zero-padding each audio array on the right.

    Every array is padded along its last dimension to the longest length found in ``batch``.

    Returns:
        dict with ``'audio'`` (the stacked padded tensors) and ``'label'`` (a tensor of the
        samples' labels).
    """
    waveforms = [torch.tensor(item['audio']['array']) for item in batch]
    longest = max(wave.shape[-1] for wave in waveforms)

    padded = []
    for wave in waveforms:
        missing = longest - wave.shape[-1]
        padded.append(torch.nn.functional.pad(wave, (0, missing), mode='constant', value=0))

    targets = torch.tensor([item['label'] for item in batch])
    return {'audio': torch.stack(padded), 'label': targets}

# Imports needed by this cell; without them it fails with NameError on a fresh kernel.
import torch
from torch.utils.data import DataLoader, random_split

# Fixed seed so the train/validation/test membership is the same on every run
split_generator = torch.Generator().manual_seed(42)

# NOTE(review): the augmented clips sit in the same folder as their source recordings, so a
# purely random split can place a recording and its augmented copies in different subsets.
# Consider splitting by source file if that overlap matters for the reported metrics.

# Perform the random split: 80% training, 20% held out
train_size = int(0.8 * len(combined_dataset))
holdout_size = len(combined_dataset) - train_size
train_dataset, holdout_dataset = random_split(
    combined_dataset, [train_size, holdout_size], generator=split_generator
)

# Split the held-out part in half into validation and test sets
val_size = int(0.5 * len(holdout_dataset))
test_size = len(holdout_dataset) - val_size
val_dataset, test_dataset = random_split(
    holdout_dataset, [val_size, test_size], generator=split_generator
)

# Create DataLoaders for the training, validation, and test datasets.
# Only the training loader is shuffled, so every epoch sees the samples in a new order.
train_dataloader = DataLoader(train_dataset, batch_size=1, shuffle=True, collate_fn=pad_collate)
val_dataloader = DataLoader(val_dataset, batch_size=1, collate_fn=pad_collate)
test_dataloader = DataLoader(test_dataset, batch_size=1, collate_fn=pad_collate)

Resolving data files:   0%|          | 0/32 [00:00<?, ?it/s]

Generating train split: 0 examples [00:00, ? examples/s]

Resolving data files:   0%|          | 0/56 [00:00<?, ?it/s]

Generating train split: 0 examples [00:00, ? examples/s]

### Log in to HuggingFace
Logging in allows the pre-trained model to be downloaded from the Hugging Face Hub.

In [None]:
from huggingface_hub import notebook_login

# Shows an interactive login widget in the notebook (see the VBox output below)
notebook_login()

VBox(children=(HTML(value='<center> <img\nsrc=https://huggingface.co/front/assets/huggingface_logo-noborder.sv…

## Download the pre-trained model
For this example, we are using the WeSpeaker model, pre-trained on the VoxCeleb dataset and wrapped by the pyannote-audio library.

In [9]:
from pyannote.audio import Model

# Pre-trained checkpoint that the classifier classes below wrap and freeze as `base_model`
base_model = Model.from_pretrained("pyannote/wespeaker-voxceleb-resnet34-LM")

  torchaudio.set_audio_backend("soundfile")
  torchaudio.set_audio_backend("soundfile")


pytorch_model.bin:   0%|          | 0.00/26.6M [00:00<?, ?B/s]

config.yaml:   0%|          | 0.00/221 [00:00<?, ?B/s]

In [10]:
import torch
# Use the GPU when CUDA is available, otherwise fall back to the CPU
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
device

device(type='cuda')

## Preparing for fine-tuning
Freeze all the layers of the base model.
Add a classifier head for Binary Classification.

In [11]:
from torch import nn

class WeSpeakerResNet34WithClassifier(nn.Module):
    """Frozen pre-trained base model followed by a small trainable MLP classification head.

    Args:
        model: Base model whose output is fed to the head. All of its parameters are frozen.
        num_classes: Number of output classes.
        embedding_dim: Size of the flattened base-model output per sample (256 by default,
            the value that was previously hard-coded).
        device: Device for the base model and the head. Defaults to CUDA when available,
            otherwise CPU.
    """

    def __init__(self, model: nn.Module, num_classes=2, embedding_dim=256, device=None):
        super().__init__()

        if device is None:
            device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

        # Move the model to GPU if available
        self.base_model = model.to(device)

        # Freeze the parameters of the base model
        for param in self.base_model.parameters():
            param.requires_grad = False
        # Frozen weights alone do not stop BatchNorm running statistics (or dropout) from
        # changing; that is controlled by train/eval mode, so pin the base model to eval.
        self.base_model.eval()

        self.classifier = nn.Sequential(
            nn.Linear(embedding_dim, 64),
            nn.ReLU(),
            nn.Dropout(p=0.5),
            nn.Linear(64, 32),
            nn.ReLU(),
            nn.Dropout(p=0.5),
            nn.Linear(32, num_classes)
        ).to(device)

    def train(self, mode: bool = True):
        """Switch the head between train and eval mode while keeping the frozen base in eval."""
        super().train(mode)
        self.base_model.eval()
        return self

    def forward(self, x):
        """Run ``x`` through the base model, flatten per sample, and return the class logits."""
        x = self.base_model(x)
        x = x.flatten(1)
        x = self.classifier(x)
        return x

# Select the device (same expression as in the earlier device cell)
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Instantiate the custom network with the base model and specify the number of classes
model = WeSpeakerResNet34WithClassifier(num_classes=2, model=base_model)


In [12]:
import torch
from torch import nn

class WeSpeakerResNet34WithLSTMClassifier(nn.Module):
    """Frozen pre-trained base model followed by a trainable LSTM and linear classifier.

    The flattened base-model output is fed to the LSTM as a sequence of length 1, and the
    LSTM output of that single step goes to the classifier.

    Args:
        model: Base model whose output is fed to the LSTM. All of its parameters are frozen.
        hidden_size: Hidden size of the LSTM.
        num_classes: Number of output classes.
        embedding_dim: Size of the flattened base-model output per sample (256 by default,
            the value that was previously hard-coded).
        device: Device for all sub-modules. Defaults to CUDA when available, otherwise CPU.
    """

    def __init__(self, model: nn.Module, hidden_size=128, num_classes=2, embedding_dim=256, device=None):
        super().__init__()

        if device is None:
            device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

        # Move the model to GPU if available
        self.base_model = model.to(device)

        # Freeze the parameters of the base model
        for param in self.base_model.parameters():
            param.requires_grad = False
        # Frozen weights alone do not stop BatchNorm running statistics (or dropout) from
        # changing; that is controlled by train/eval mode, so pin the base model to eval.
        self.base_model.eval()

        # Add an LSTM layer before the linear classifier (placed on the same device as the
        # other sub-modules so the module is usable without an extra .to(device) call)
        self.lstm = nn.LSTM(embedding_dim, hidden_size, batch_first=True).to(device)

        self.classifier = nn.Sequential(
            nn.Linear(hidden_size, 64),
            nn.ReLU(),
            nn.Linear(64, num_classes)
        ).to(device)

    def train(self, mode: bool = True):
        """Switch the LSTM and head between train and eval mode while keeping the frozen base in eval."""
        super().train(mode)
        self.base_model.eval()
        return self

    def forward(self, x):
        """Run ``x`` through base model, LSTM and classifier, and return the class logits."""
        x = self.base_model(x)
        x = x.flatten(1)

        # LSTM layer: unsqueeze(1) adds a time dimension of length 1 (batch_first=True)
        lstm_out, _ = self.lstm(x.unsqueeze(1))

        # Get the output of the last time step from the LSTM
        lstm_last_output = lstm_out[:, -1, :]

        # Linear classifier
        return self.classifier(lstm_last_output)

# Select the device (same expression as in the earlier device cell)
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Define hidden size for the LSTM (overrides the class default of 128)
hidden_size = 256  # Adjust according to your desired hidden size

# Instantiate the custom network with the base model and LSTM classifier
model_with_lstm = WeSpeakerResNet34WithLSTMClassifier(model=base_model, hidden_size=hidden_size, num_classes=2).to(device)


In [None]:
# Peek at one batch. pad_collate returns a dict, so index it by key; tuple-unpacking the
# dict would yield the key strings 'audio' and 'label' instead of the tensors.
batch = next(iter(train_dataloader))
audio, label = batch["audio"].to(device), batch["label"].to(device)
print(audio.dtype)
print(label.dtype)

torch.float32
torch.int64


# Run the model through a training loop

In [13]:
import torch
import torch.nn as nn

def train_model(model, train_dataloader, num_epochs=5, learning_rate=0.001):
    """Train the non-frozen parameters of ``model`` with cross-entropy loss and AdamW.

    Args:
        model: Module mapping a ``(batch, 1, samples)`` float tensor to class logits. If it
            has a ``base_model`` attribute, that sub-module is kept in eval mode.
        train_dataloader: Iterable with ``len()`` that yields either dicts with ``'audio'``
            and ``'label'`` tensors or ``(audio, label)`` tuples, where ``audio`` has shape
            ``(batch, samples)``.
        num_epochs: Number of passes over ``train_dataloader``.
        learning_rate: AdamW learning rate.

    Prints progress plus the average loss and accuracy of every epoch; returns nothing.
    """
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model.to(device)

    criterion = nn.CrossEntropyLoss()
    # Optimise every parameter that is not frozen, so trainable layers outside
    # `model.classifier` (e.g. an LSTM) are updated too.
    trainable_params = [p for p in model.parameters() if p.requires_grad]
    optimizer = torch.optim.AdamW(trainable_params, lr=learning_rate)

    frozen_base = getattr(model, "base_model", None)
    num_batches = len(train_dataloader)

    for epoch in range(num_epochs):
        print(f"Epoch number: {epoch + 1}")

        # Enable dropout etc. in the trainable part, but keep a frozen base model in eval
        # mode so its BatchNorm running statistics are not altered by training batches.
        model.train()
        if frozen_base is not None:
            frozen_base.eval()

        running_loss = 0.0
        correct = 0
        total = 0

        # Training loop using the DataLoader
        for i, batch in enumerate(train_dataloader):
            if isinstance(batch, dict):
                audio, label = batch["audio"], batch["label"]
            else:
                audio, label = batch

            # (batch, samples) -> (batch, 1, samples): add the channel dimension, then move
            # the data to the GPU if available
            audio = audio.unsqueeze(1).float().to(device)
            label = label.long().to(device)

            # Zero the gradients to prevent accumulation
            optimizer.zero_grad()
            # Forward pass through the model
            output = model(audio)
            # Compute the loss between the model's output and the ground truth labels
            loss = criterion(output, label)
            # Backward pass to compute gradients
            loss.backward()
            # Update model parameters using the optimizer
            optimizer.step()

            # Track running loss for monitoring training progress
            running_loss += loss.item()

            # Compute accuracy metrics
            predicted = output.argmax(dim=1)
            total += label.size(0)  # Accumulate the total number of samples processed
            correct += (predicted == label).sum().item()  # Accumulate the number of correct predictions

            if i % 9 == 0:
                print(f"{i+1} out of {num_batches}")

        # Calculate average loss and accuracy at the end of each epoch
        average_loss = running_loss / num_batches if num_batches else 0.0
        accuracy = correct / total * 100 if total != 0 else 0

        print(f"Epoch [{epoch + 1}/{num_epochs}], Average Loss: {average_loss:.4f}, Accuracy: {accuracy:.2f}%")


In [None]:
# train_model(model_with_lstm, train_dataloader, num_epochs=10)

In [None]:
# Train the MLP-head model (`model`) for 3 epochs with the default learning rate
train_model(model, train_dataloader, num_epochs=3)

Epoch number: 1


In [None]:
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, confusion_matrix
import seaborn as sns
import matplotlib.pyplot as plt


def plot_confusion_matrix(y_true, y_pred, labels):
    """Compute the confusion matrix of ``y_true`` vs ``y_pred`` and show it as an annotated heatmap.

    ``labels`` is used both as the ``labels`` argument of ``confusion_matrix`` and as the
    tick labels of the two axes.
    """
    matrix = confusion_matrix(y_true, y_pred, labels=labels)

    plt.figure(figsize=(8, 6))
    sns.heatmap(
        matrix,
        annot=True,
        fmt='d',
        cmap='Blues',
        xticklabels=labels,
        yticklabels=labels,
    )

    # Title and axis captions
    plt.title('Confusion Matrix')
    plt.ylabel('True Labels')
    plt.xlabel('Predicted Labels')

    plt.show()


def process_audio_batch(model, dataloader, labels):
    """Run ``model`` over ``dataloader``, plot a confusion matrix and compute metrics.

    Args:
        model: Module mapping a ``(batch, 1, samples)`` float tensor to class logits.
        dataloader: Iterable yielding either dicts with ``'audio'`` and ``'label'`` tensors
            or ``(audio, label)`` tuples, where ``audio`` has shape ``(batch, samples)``.
        labels: Class names ordered by class index (``labels[0]`` names class 0, ...).

    Returns:
        ``(predictions, ground_truths, accuracy, precision, recall, f1)``; the first two are
        lists of integer class indices, and precision/recall/F1 are weighted averages.
    """
    model.eval()  # Set model to evaluation mode

    # Run on whatever device the model lives on rather than relying on a global variable
    first_param = next(model.parameters(), None)
    run_device = first_param.device if first_param is not None else torch.device("cpu")

    # Lists that collect predictions and ground truth over all batches
    predictions = []
    ground_truths = []

    # Pass each batch through the model without gradient calculation
    with torch.no_grad():
        for batch in dataloader:
            if isinstance(batch, dict):
                audio, label = batch["audio"], batch["label"]
            else:
                audio, label = batch

            # (batch, samples) -> (batch, 1, samples), same preprocessing as in training
            audio = audio.unsqueeze(1).float().to(run_device)
            output = model(audio)

            predictions.extend(output.argmax(dim=1).tolist())
            ground_truths.extend(label.tolist())

    # Calculate metrics after processing all batches
    accuracy = accuracy_score(ground_truths, predictions)
    precision = precision_score(ground_truths, predictions, average='weighted')
    recall = recall_score(ground_truths, predictions, average='weighted')
    f1 = f1_score(ground_truths, predictions, average='weighted')

    # Create confusion matrix. The model works with integer class indices, so translate
    # them to the given class names for plotting; if an index has no name, fall back to
    # plotting the raw indices that actually occurred.
    class_ids = sorted(set(ground_truths) | set(predictions))
    if all(isinstance(c, int) and 0 <= c < len(labels) for c in class_ids):
        named_truths = [labels[c] for c in ground_truths]
        named_predictions = [labels[c] for c in predictions]
        plot_confusion_matrix(named_truths, named_predictions, list(labels))
    else:
        plot_confusion_matrix(ground_truths, predictions, class_ids)

    return predictions, ground_truths, accuracy, precision, recall, f1

In [None]:
# Class names ordered by class index: 0 -> "fake", 1 -> "real" (the labels assigned when the data was loaded)
labels = ["fake", "real"]
_, _, train_accuracy, train_precision, train_recall, train_f1 = process_audio_batch(model, train_dataloader, labels)
print(f"Accuracy: {train_accuracy}\nPrecision: {train_precision}\nRecall: {train_recall}\nF1 Score: {train_f1}")

KeyboardInterrupt: 

In [None]:
# Same evaluation on the held-out test split; `labels` comes from the previous cell
prediction, ground_truth, test_accuracy, test_precision, test_recall, test_f1 = process_audio_batch(model, test_dataloader, labels)
print(f"Accuracy: {test_accuracy}\nPrecision: {test_precision}\nRecall: {test_recall}\nF1 Score: {test_f1}")

In [None]:
# id2label was never defined in this notebook; build it from `labels`, whose position is the
# class index (0 -> "fake", 1 -> "real"). Keys are strings because the lookups use str(...).
id2label = {str(index): name for index, name in enumerate(labels)}

prediction_list = [id2label[str(class_id)] for class_id in prediction]
ground_truth_list = [id2label[str(class_id)] for class_id in ground_truth]

In [None]:
# Display the decoded predictions next to the decoded ground truth
prediction_list, ground_truth_list

## Saving the model

In [None]:
# Saves the whole module object rather than a state_dict.
# NOTE(review): loading such a file later needs the same class definitions to be importable;
# consider saving model.state_dict() instead — confirm what the consumer of model.pt expects.
torch.save(model, '/content/model.pt')

In [None]:
from google.colab import files
# Colab-only helper: hands the saved model file to the browser as a download
files.download('/content/model.pt')