# Training Using Deep Learning

### Import Dependencies

In [None]:
import os
import numpy as np
import pandas as pd
import librosa
import matplotlib.pyplot as plt
import seaborn as sns
from IPython.display import Audio
import soundfile as sf
from multiprocessing import Pool
import random
import torchvision.transforms as transforms
from torch.utils.data import Dataset, DataLoader
import torch
import torchvision
from sklearn.metrics import classification_report
import sklearn

Check if GPU acceleration is available.

In [None]:
# Prefer the CUDA backend when PyTorch reports one; otherwise fall back to CPU.
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")
print(f"Using device: {device}")

### Load & Extract Data
This time we extract Mel spectrograms from the audio files to use as input features.

In [None]:
from mfcc_extraction import make_spectrogram

def multiprocess_spectrograms(paths):
    """Compute one spectrogram per audio file using a pool of worker processes.

    Args:
        paths: Sequence of audio file paths. Each one is passed to
            ``make_spectrogram`` in a worker process.

    Returns:
        A float32 array of shape ``(len(paths), H, W)`` where ``(H, W)`` is
        the shape of the first spectrogram returned. Every file is assumed to
        yield a spectrogram of that same 2-D shape; a mismatch raises when the
        result is copied into the output array. For an empty ``paths`` an
        empty ``(0, 0, 0)`` float32 array is returned.
    """
    total = len(paths)
    if total == 0:
        # Nothing to probe for a shape, so return an empty 3-D array instead
        # of failing on paths[0].
        return np.empty((0, 0, 0), dtype=np.float32)

    spectrograms = None
    with Pool(os.cpu_count()) as p:
        # imap yields results in input order, so row i belongs to paths[i].
        for i, spec in enumerate(p.imap(make_spectrogram, paths, chunksize=128)):
            if spectrograms is None:
                # Allocate the output once the first result reveals the shape;
                # this avoids decoding the first file a second time.
                spectrograms = np.empty(
                    (total, spec.shape[0], spec.shape[1]), dtype=np.float32
                )
            spectrograms[i] = spec
            if i % 1000 == 0:
                print(f'{i}/{total} spectrograms computed')
    print(f'{total}/{total} spectrograms computed')
    return spectrograms

def file_to_spec_tensor(path):
    """Load one audio file as a float32 spectrogram tensor with a leading axis.

    The spectrogram comes from ``make_spectrogram(path)``; a singleton
    dimension is inserted at position 0 of the resulting tensor.
    """
    spec = make_spectrogram(path)
    as_tensor = torch.tensor(spec, dtype=torch.float32)
    return torch.unsqueeze(as_tensor, 0)

def check_corrupted(path):
    """Check whether an audio file can be decoded by ``soundfile``.

    NOTE: despite the name, the return value is True when the file is
    readable (i.e. NOT corrupted) and False when reading it fails.

    Args:
        path: Path of the audio file to probe.

    Returns:
        True if ``sf.read`` succeeds, False otherwise (the path is printed).
    """
    try:
        sf.read(path, dtype=np.float32)
    except Exception:
        # Catch Exception rather than using a bare ``except`` so that
        # KeyboardInterrupt / SystemExit still abort a long scan.
        print("Corrupted:", path)
        return False
    return True

def read_FoR_array(path, corrupted=None):
    """Collect file paths and labels from a Fake-or-Real style directory.

    ``path`` must contain the sub-directories ``fake`` and ``real``. Files in
    ``fake`` whose name is in the skip set are left out.

    Args:
        path: Directory holding the ``fake`` and ``real`` sub-directories.
        corrupted: Optional collection of file names to skip in ``fake``.
            Defaults to the module-level ``corrupted_files`` set.

    Returns:
        ``(paths, labels)`` as NumPy arrays of equal length. Fake files come
        first with label 0, followed by real files with label 1. Within each
        group the files are sorted by name so the order is reproducible.
    """
    skip = corrupted_files if corrupted is None else corrupted

    fake_dir = os.path.join(path, "fake")
    real_dir = os.path.join(path, "real")

    fake_paths = [
        os.path.join(fake_dir, name)
        for name in sorted(os.listdir(fake_dir))
        if name not in skip
    ]
    real_paths = [
        os.path.join(real_dir, name)
        for name in sorted(os.listdir(real_dir))
    ]

    paths = np.array(fake_paths + real_paths)
    # One label per collected path: skipped files are already excluded from
    # fake_paths, and the real count comes from real_paths.
    labels = np.array([0] * len(fake_paths) + [1] * len(real_paths))

    return paths, labels

class log_mel_spect_dataset(Dataset):
    """In-memory dataset pairing spectrograms with their labels.

    Both inputs are converted to float32 tensors up front and given a
    singleton axis at position 1, so indexing returns ready-made tensors.
    """

    def __init__(self, spectrograms, labels):
        spec_tensor = torch.tensor(spectrograms, dtype=torch.float32)
        label_tensor = torch.tensor(labels, dtype=torch.float32)
        # Insert the extra axis right after the sample axis.
        self.spectrograms = torch.unsqueeze(spec_tensor, 1)
        self.labels = torch.unsqueeze(label_tensor, 1)

    def __len__(self):
        return self.spectrograms.shape[0]

    def __getitem__(self, idx):
        sample = self.spectrograms[idx]
        target = self.labels[idx]
        return sample, target


In [None]:
# Locations of the ASVspoof2019 LA training protocol/audio and of the
# Fake-or-Real (FoR) training directory.
train_protocol = '../data/ASVspoof_Dataset/ASVspoof2019_LA_cm_protocols/ASVspoof2019.LA.cm.train.trn.txt'
training_file_path = '../data/ASVspoof_Dataset/ASVspoof2019_LA_train/flac/'
training_file_path2 = '../data/The Fake-or-Real (FoR) Dataset (deepfake audio)/training/'
training_data_df = pd.read_csv(train_protocol, delimiter=" ", names=["SPEAKER_ID", "AUDIO_FILE_NAME", "SYSTEM_ID", "-", "KEY"])

# The Fake-or-Real dataset contains some corrupted files in its training set.
# Their names are listed here so that read_FoR_array can skip them.
corrupted_files = {"file13424.mp3","file15746.mp3","file16643.mp3","file17407.mp3","file17450.mp3","file19851.mp3","file27206.mp3","file27643.mp3","file27839.mp3","file30959.mp3","file31017.mp3","file32972.mp3","file5323.mp3","file9875.mp3", "file9904.mp3"} 


# Same layout for the evaluation split.
test_protocol = '../data/ASVspoof_Dataset/ASVspoof2019_LA_cm_protocols/ASVspoof2019.LA.cm.eval.trl.txt'
test_file_path = '../data/ASVspoof_Dataset/ASVspoof2019_LA_eval/flac/'
test_file_path2 = '../data/The Fake-or-Real (FoR) Dataset (deepfake audio)/testing/'
test_data_df = pd.read_csv(test_protocol, delimiter=" ", names=["SPEAKER_ID", "AUDIO_FILE_NAME", "SYSTEM_ID", "-", "KEY"])

batch_size = 32

# Build "<dir><AUDIO_FILE_NAME>.flac" for every protocol row; the label is
# True for rows whose KEY is 'bonafide' and False otherwise.
training_paths = training_file_path + training_data_df['AUDIO_FILE_NAME'].to_numpy() + ".flac" 
training_labels = np.array(training_data_df['KEY'].map(lambda x: x == 'bonafide'))
# The string literal below is disabled code: it would append the FoR training
# files to the ASVspoof ones.
"""
FoR_dataset_training = read_FoR_array(training_file_path2)
training_paths = np.concatenate([training_paths, FoR_dataset_training[0]])
training_labels = np.concatenate([training_labels, FoR_dataset_training[1]])
"""

# Sanity check: show the last two paths. Negative indices replace the former
# hard-coded positions 52223/52222, which raise IndexError whenever the list
# is shorter (e.g. with the FoR concatenation above disabled).
print(f"{len(training_paths)} training files")
if len(training_paths) >= 2:
    print(training_paths[-1], training_paths[-2])

test_paths = test_file_path + test_data_df['AUDIO_FILE_NAME'].to_numpy() + ".flac" 
test_labels = np.array(test_data_df['KEY'].map(lambda x: x == 'bonafide'))

# Disabled code: would append the FoR testing files to the evaluation set.
"""
FoR_Dataset_testing = read_FoR_array(test_file_path2)
test_paths = np.concatenate([test_paths, FoR_Dataset_testing[0]])
test_labels = np.concatenate([test_labels, FoR_Dataset_testing[1]])
"""

print("Computing Training Data:")

# All spectrograms are computed up front and held in memory by the dataset.
training_spectrograms = multiprocess_spectrograms(training_paths)
training_dataset = log_mel_spect_dataset(training_spectrograms, training_labels)
training_loader = DataLoader(training_dataset,
                            batch_size=batch_size,
                            num_workers=0,
                            pin_memory=True,
                            shuffle=True)

print("Computing Test Data:")
test_spectrograms = multiprocess_spectrograms(test_paths)
test_dataset = log_mel_spect_dataset(test_spectrograms, test_labels)
# No shuffling here, so predictions stay aligned with test_labels.
test_loader = DataLoader(test_dataset,
                            batch_size=batch_size,
                            num_workers=0,
                            pin_memory=True)

### Build the Model
Convolutional Neural Network

In [None]:
# Number of passes over the training data (bound of the training loop's range()).
n_epoch = 25

import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim

class Net(nn.Module):
    """Small CNN producing one sigmoid probability per input sample.

    Two convolutional stages (conv -> batch norm -> ReLU -> 2x2 max-pool)
    feed three linear layers with dropout after the first. ``fc1`` expects
    43648 flattened features, i.e. 64 channels x 682 spatial positions after
    the second stage, so the input spectrogram size must match that.

    NOTE(review): there is no nonlinearity between fc1, fc2 and fc3 — confirm
    this is intended.
    """

    def __init__(self):
        super().__init__()

        # Attribute names and creation order are kept so that state_dict keys
        # (layer1.*, layer2.*, fc1..fc3) match previously saved checkpoints.
        self.layer1 = self._conv_stage(1, 32, padding=1)
        self.layer2 = self._conv_stage(32, 64, padding=0)

        self.fc1 = nn.Linear(43648, 600)
        self.drop = nn.Dropout(p=0.25)
        self.fc2 = nn.Linear(600, 120)
        self.fc3 = nn.Linear(120, 1)

    @staticmethod
    def _conv_stage(in_channels, out_channels, padding):
        """Build one 3x3 conv -> batch norm -> ReLU -> 2x2 max-pool stage."""
        return nn.Sequential(
            nn.Conv2d(in_channels, out_channels, kernel_size=3, padding=padding),
            nn.BatchNorm2d(out_channels),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2),
        )

    def forward(self, x):
        features = self.layer2(self.layer1(x))
        # Flatten everything except the batch axis for the linear layers.
        flat = features.view(features.size(0), -1)
        hidden = self.fc2(self.drop(self.fc1(flat)))
        scores = self.fc3(hidden)

        return torch.sigmoid(scores)  # TODO: use logits

In [None]:
net = Net()
net.to(device)

# Net ends in a sigmoid, so the matching loss is plain binary cross-entropy
# on probabilities.
criterion = nn.BCELoss()
optimizer = optim.SGD(net.parameters(), 
                    lr=0.001,
                    weight_decay=0.0001)


loss_hist = []  # mean batch loss of each epoch, used for the plot below
net.train()
print("Beginning Training:")
for epoch in range(n_epoch):
    running_loss = 0.0

    for inputs, labels in training_loader:
        inputs, labels = inputs.to(device), labels.to(device)

        optimizer.zero_grad()

        outputs = net(inputs)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

        running_loss += loss.item()

    # Report the same per-batch mean that is plotted, rather than the raw
    # sum, which scales with the number of batches. max(..., 1) avoids a
    # division by zero on an empty loader.
    epoch_loss = running_loss / max(len(training_loader), 1)
    loss_hist.append(epoch_loss)
    print(f'Epoch: {epoch+1}, loss: {epoch_loss:.4f}')
print('Finished Training')

plt.plot(loss_hist)
plt.title('Loss Curve')
plt.xlabel('Epoch')
plt.ylabel('Log Loss')
plt.show()

### Save the model

In [None]:
# File the trained weights are written to and later reloaded from.
PATH = './melspec1_net.pth'

In [None]:
# Persist only the parameters/buffers (state_dict), not the module object.
torch.save(net.state_dict(), PATH)

### Evaluating Model Accuracy

In [None]:
# Rebuild the architecture and restore the saved weights. map_location remaps
# the stored tensors onto the current device, so a checkpoint written from a
# CUDA session also loads on a CPU-only machine.
net = Net()
net.load_state_dict(torch.load(PATH, map_location=device, weights_only=True))
net.to(device)
# eval() switches dropout and batch norm to inference behaviour.
net.eval()


In [None]:
# Decision threshold applied to the network's sigmoid output.
threshold = 0.5

def predict(path):
    """Classify a single audio file with the loaded network.

    Args:
        path: Path of the audio file; it is converted with
            ``file_to_spec_tensor`` and given a batch axis before inference.

    Returns:
        True if the network output is at least ``threshold``, else False.
        True corresponds to the label used for 'bonafide' in training.
    """
    # Inference only: skip autograd bookkeeping.
    with torch.no_grad():
        outputs = net(file_to_spec_tensor(path).to(device).unsqueeze(0))
    return (outputs >= threshold).item()

In [None]:
correct = 0
total = 0
predictions = []  # flat list of bool predictions, in test_loader order

with torch.no_grad():
    for inputs, labels in test_loader:
        inputs, labels = inputs.to(device), labels.to(device)
        outputs = net(inputs)
        predicted = (outputs >= threshold)

        total += labels.size(0)
        correct += (predicted == labels).sum().item()

        # Flatten the (batch, 1) column to one scalar per sample so the
        # collected predictions form a 1-D sequence.
        predictions.extend(predicted.reshape(-1).cpu().tolist())

# True division keeps the fractional part (// truncated to whole percent);
# guard against an empty test set.
accuracy = 100 * correct / total if total else float("nan")
print(f'Accuracy: {accuracy:.2f} %')


In [None]:
# Convert the collected predictions to an array and print the per-class
# metrics against the ground-truth test labels.
predictions = np.array(predictions)
print(classification_report(test_labels, predictions))

In [None]:
# print(predict("../data/The Fake-or-Real (FoR) Dataset (deepfake audio)/training/file36.wav"))

### Generating a Confusion Matrix

In [None]:
# Count true-vs-predicted classes on the test set and draw them as a grid.
cm = sklearn.metrics.confusion_matrix(test_labels, predictions)
disp = sklearn.metrics.ConfusionMatrixDisplay(
    confusion_matrix=cm,
    display_labels=["Spoof", "Bonafide"],
)

disp.plot()
# plot() has just created the figure, so its axes are the ones being titled.
disp.ax_.set_title("Confusion Matrix")
plt.show()