In [None]:
import pandas as pd
import numpy as np 
import os 
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import torchaudio
from torch.utils.data import Dataset
from torch.utils.data import DataLoader
import sys
import time
import matplotlib.pyplot as plt
import librosa
from IPython.display import Audio
from sklearn.metrics import roc_auc_score
from sklearn import metrics

import warnings
warnings.simplefilter(action='ignore', category=FutureWarning)

# Run on the GPU when PyTorch can see one; otherwise fall back to the CPU.
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
print(device)

In [2]:
class Audio_Data(Dataset):
    """Dataset of fixed-length audio clips listed in a metadata CSV.

    Every item is loaded from disk, moved to ``device``, resampled to
    ``target_rate``, mixed down to a single channel, cut or zero-padded to
    exactly ``num_samples`` samples and finally passed through ``transform``.

    The CSV is read positionally: column 0 is the file name (relative to
    ``dir``) and column 1 is the label.
    """

    def __init__(self, metadata, dir, transform, target_rate, num_samples, device):
        """
        Args:
            metadata: Path of the CSV file describing the clips.
            dir: Directory that the file names in the CSV are relative to.
            transform: Module applied to the prepared waveform; it is moved to
                ``device`` here.
            target_rate: Sample rate every clip is resampled to.
            num_samples: Exact number of samples each clip is cut/padded to.
            device: Device on which loading-time processing runs.
        """
        self.metadata = pd.read_csv(metadata)
        self.dir = dir
        self.device = device
        self.transform = transform.to(self.device)
        self.target_rate = target_rate
        self.num_samples = num_samples

    def __len__(self):
        """Return the number of rows in the metadata table."""
        return len(self.metadata)

    def __getitem__(self, index):
        """Load clip ``index`` and return ``(transformed_signal, label)``."""
        audio_sample_path = self._get_audio_path(index)
        label = self._get_audio_label(index)
        signal, sr = torchaudio.load(audio_sample_path)
        signal = signal.to(self.device)
        signal = self._resample_if_necessary(signal, sr)
        signal = self._mix_down_if_necessary(signal)
        signal = self._cut_if_necessary(signal)
        signal = self._right_pad_if_necessary(signal)
        signal = self.transform(signal)
        return signal, label

    def _get_audio_path(self, index):
        """Build the path of clip ``index`` from column 0 of the metadata."""
        filename = f"{self.metadata.iloc[index, 0]}"
        return os.path.join(self.dir, filename)

    def _get_audio_label(self, index):
        """Return the label stored in column 1 of the metadata."""
        return self.metadata.iloc[index, 1]

    def _resample_if_necessary(self, signal, sr):
        """Resample ``signal`` to ``target_rate`` when its rate differs."""
        if sr != self.target_rate:
            # Place the resampler on the dataset's device instead of forcing
            # CUDA, so the pipeline also runs on CPU-only machines.
            resampler = torchaudio.transforms.Resample(sr, self.target_rate).to(self.device)
            signal = resampler(signal)
        return signal

    def _mix_down_if_necessary(self, signal):
        """Average multi-channel audio into one channel (keeps dim 0)."""
        if signal.shape[0] > 1:
            signal = torch.mean(signal, dim=0, keepdim=True)
        return signal

    def _cut_if_necessary(self, signal):
        """Drop samples beyond ``num_samples``."""
        if signal.shape[1] > self.num_samples:
            signal = signal[:, :self.num_samples]
        return signal

    def _right_pad_if_necessary(self, signal):
        """Zero-pad the end of the signal up to ``num_samples``."""
        signal_len = signal.shape[1]
        if signal_len < self.num_samples:
            difference = self.num_samples - signal_len
            # (left, right) padding of the last dimension
            last_dim_padding = (0, difference)
            signal = torch.nn.functional.pad(signal, last_dim_padding)
        return signal

In [3]:
def plot_spectrogram(specgram, title=None, ylabel="freq_bin", ax=None):
    """Show a power spectrogram on a decibel scale.

    Args:
        specgram: Power spectrogram handed to ``librosa.power_to_db``.
        title: Optional axes title; skipped when ``None``.
        ylabel: Label for the vertical axis.
        ax: Axes to draw on; a new single-axes figure is created when ``None``.
    """
    axes = plt.subplots(1, 1)[1] if ax is None else ax
    if title is not None:
        axes.set_title(title)
    axes.set_ylabel(ylabel)
    spec_db = librosa.power_to_db(specgram)
    # origin="lower" puts the first row at the bottom of the image
    axes.imshow(spec_db, origin="lower", aspect="auto", interpolation="nearest")

In [4]:
class DNN(nn.Module):
    """Convolutional front-end followed by a GRU and a small MLP head.

    Input is a batch of single-channel spectrograms shaped
    ``(batch, 1, n_mels, n_frames)``. The output is a ``(batch, 2)`` tensor of
    raw, unnormalised class scores (logits), which is what
    ``nn.CrossEntropyLoss`` expects.

    Args:
        num_fc_layers: Number of stacked GRU layers. Despite the name, it is
            passed to ``nn.GRU(num_layers=...)``; the fully connected head
            always has three linear layers.
    """

    def __init__(self, num_fc_layers):
        super().__init__()
        self.num_fc_layers = num_fc_layers

        # Two conv -> pool -> batch-norm -> ReLU stages. Spectrograms are
        # treated as grayscale images, hence one input channel.
        self.conv_block = nn.Sequential(
            nn.Conv2d(in_channels=1, out_channels=32, kernel_size=5),
            nn.MaxPool2d(kernel_size=2),
            nn.BatchNorm2d(32),
            nn.ReLU(),
            nn.Conv2d(in_channels=32, out_channels=64, kernel_size=5),
            nn.MaxPool2d(kernel_size=2),
            nn.BatchNorm2d(64),
            nn.ReLU(),
            nn.Dropout(p=0.25),
            # Collapse the two spatial dims: (batch, 64, H, W) -> (batch, 64, H*W)
            nn.Flatten(start_dim=2),
        )

        # GRU input size equals the channel count of the last conv layer.
        self.gru = nn.GRU(64, 128, num_layers=self.num_fc_layers, batch_first=True)

        self.fc_block = nn.Sequential(
            nn.Linear(128, 64),
            nn.ReLU(),
            nn.Linear(64, 32),
            nn.ReLU(),
            nn.Linear(32, 2),
        )

    def forward(self, input_data):
        """Return raw class logits of shape ``(batch, 2)``."""
        x = self.conv_block(input_data)  # (batch, 64, positions)

        # Swap axes so each timestep holds the 64 channel values of one
        # position: (batch, positions, 64). A plain reshape would only
        # reinterpret memory and mix values from different positions.
        # NOTE: weights trained with the earlier reshape-based layout load
        # without error but should be retrained.
        x = x.transpose(1, 2)

        # The GRU starts from a zero hidden state by default, created on the
        # input's own device, so no notebook-level `device` global is needed.
        out, _ = self.gru(x)
        out = out[:, -1, :]  # hidden output at the last timestep

        # No sigmoid here: CrossEntropyLoss applies log-softmax itself, and
        # argmax-based predictions do not change under a monotonic squashing.
        return self.fc_block(out)

In [5]:
def create_data_loader(data, batch_size, shuffle=False):
    """Wrap a dataset in a ``DataLoader``.

    Args:
        data: Dataset (or any indexable sequence) to batch.
        batch_size: Number of samples per batch.
        shuffle: When true, the sample order is reshuffled every epoch.
            Defaults to ``False`` so existing two-argument calls keep
            iterating in dataset order.

    Returns:
        The configured ``DataLoader``.
    """
    return DataLoader(data, batch_size=batch_size, shuffle=shuffle)

In [6]:
def train(model, train_dataloader, val_dataloader, loss_fn, optimizer, device, epochs):
    """Run ``epochs`` rounds of training plus validation.

    The model is switched to training mode once up front; each round is
    delegated to ``train_single_epoch``.

    Returns:
        ``(train_loss, val_loss)``: two lists with one entry per epoch.
    """
    model.train()
    train_history, val_history = [], []
    for epoch_number in range(1, epochs + 1):
        print(f"Epoch {epoch_number}")
        train_value, val_value = train_single_epoch(
            model, train_dataloader, val_dataloader, loss_fn, optimizer, device
        )
        train_history.append(train_value)
        val_history.append(val_value)
        print("--------------------------")
    print("Finished training")
    return train_history, val_history

def train_single_epoch(model, train_dataloader, val_dataloader, loss_fn, optimizer, device):
    """Train for one pass over the training data, then score the validation data.

    Args:
        model: Network to optimise.
        train_dataloader: Iterable of ``(input, target)`` training batches.
        val_dataloader: Iterable of ``(input, target)`` validation batches.
        loss_fn: Callable ``loss_fn(prediction, target)`` returning a scalar tensor.
        optimizer: Optimiser that updates ``model``'s parameters.
        device: Device the batches are moved to.

    Returns:
        ``(epoch_train_loss, epoch_val_loss)``: the mean per-batch loss of each
        pass, or ``0.0`` for a pass whose loader produced no batches.
    """
    # ---- training pass ----
    # Set the mode explicitly: the validation pass below leaves the model in
    # eval mode, so the next epoch must switch it back.
    model.train()
    train_loss_sum = 0.0
    num_train_batches = 0

    for inputs, targets in train_dataloader:
        inputs, targets = inputs.to(device), targets.to(device)

        # Calculate loss
        prediction = model(inputs)
        loss = loss_fn(prediction, targets)

        # Backpropagate error and update weights
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        train_loss_sum += loss.item()
        num_train_batches += 1

    # Divide once, after the loop; dividing the running sum on every
    # iteration does not produce the mean.
    epoch_train_loss = train_loss_sum / num_train_batches if num_train_batches else 0.0
    print(f"Train loss: {epoch_train_loss:.4f}")

    # ---- validation pass ----
    # eval mode disables dropout and makes batch norm use its running statistics.
    model.eval()
    val_loss_sum = 0.0
    num_val_batches = 0

    with torch.no_grad():
        for inputs, targets in val_dataloader:
            inputs, targets = inputs.to(device), targets.to(device)

            prediction = model(inputs)
            loss = loss_fn(prediction, targets)

            val_loss_sum += loss.item()
            num_val_batches += 1

    epoch_val_loss = val_loss_sum / num_val_batches if num_val_batches else 0.0
    print(f"Val loss: {epoch_val_loss:.4f}")

    return epoch_train_loss, epoch_val_loss

In [7]:
def check_accuracy(data_loader, model):
    """Print sample count, correct count and integer accuracy (%) for a loader.

    The model is put in eval mode. Batches are moved to the device of the
    model's first parameter, so the function does not depend on a
    notebook-level ``device`` variable. A model without parameters receives
    the batches as they are.

    Args:
        data_loader: Iterable of ``(input, target)`` batches.
        model: Network producing per-class scores along dimension 1.
    """
    model.eval()
    first_param = next(model.parameters(), None)
    model_device = first_param.device if first_param is not None else None

    num_correct = 0
    num_samples = 0

    with torch.no_grad():
        for inputs, targets in data_loader:
            if model_device is not None:
                inputs = inputs.to(model_device)

            scores = model(inputs)
            _, predictions = scores.max(1)
            # Compare on the device the predictions live on.
            targets = targets.to(predictions.device)
            num_correct += int((predictions == targets).sum())
            num_samples += predictions.size(0)

    # Integer arithmetic avoids float32 rounding (e.g. 29% printed as 28);
    # an empty loader reports 0 instead of dividing by zero.
    accuracy = (100 * num_correct) // num_samples if num_samples else 0

    print("Num of samples ", num_samples)
    print("Num of correct ", num_correct)
    print("Accuracy: ", accuracy)

In [8]:
def predict(model, input, target):
    """Classify the first sample of ``input`` and name both classes.

    Args:
        model: Network returning per-class scores; it is put in eval mode.
        input: Batched input; only row 0 of the model output is used.
        target: Integer index of the true class.

    Returns:
        ``(predicted, expected)`` as class-name strings.
    """
    model.eval()
    class_mapping = ['LJ', 'AI']  # index 0 -> 'LJ', index 1 -> 'AI'

    with torch.no_grad():
        scores = model(input)
        best_index = scores[0].argmax(0)
        predicted_name = class_mapping[best_index]
        expected_name = class_mapping[target]
    return predicted_name, expected_name

In [9]:
def confusion_matrix(audio_data, model):
    """Print a 2x2 confusion matrix with 'AI' as the positive class.

    Each sample of ``audio_data`` is classified individually via ``predict``.
    The output layout is::

        TP FP
        FN TN

    Args:
        audio_data: Indexable dataset yielding ``(input, target)`` pairs.
        model: Network passed through to ``predict``.
    """
    TP, FP, FN, TN = 0, 0, 0, 0

    for i in range(len(audio_data)):
        # Index the dataset once per sample: every lookup re-runs the whole
        # load/transform pipeline, so indexing twice doubles the work.
        sample = audio_data[i]
        input, target = sample[0], sample[1]
        input = input.unsqueeze(0)  # add the batch dimension

        predicted, expected = predict(model, input, target)

        if expected == 'AI' and predicted == 'AI':
            TP += 1
        elif expected == 'LJ' and predicted == 'AI':
            FP += 1
        elif expected == 'AI' and predicted == 'LJ':
            FN += 1
        elif expected == 'LJ' and predicted == 'LJ':
            TN += 1

    print("Confusion Matrix")
    print(TP, FP)
    print(FN, TN)

In [10]:
def evaluation_matrix(model, audio_data):
    """Print the confusion matrix plus accuracy, precision, recall and F1.

    'AI' is the positive class. Each sample of ``audio_data`` is classified
    individually via ``predict``. A metric whose denominator is zero (for
    example precision when nothing was predicted 'AI') is reported as ``0.0``
    instead of raising ``ZeroDivisionError``.

    Args:
        model: Network passed through to ``predict``.
        audio_data: Indexable dataset yielding ``(input, target)`` pairs.
    """
    TP, FP, FN, TN = 0, 0, 0, 0

    for i in range(len(audio_data)):
        # Index the dataset once per sample: every lookup re-runs the whole
        # load/transform pipeline, so indexing twice doubles the work.
        sample = audio_data[i]
        input, target = sample[0], sample[1]
        input = input.unsqueeze(0)  # add the batch dimension

        predicted, expected = predict(model, input, target)

        if expected == 'AI' and predicted == 'AI':
            TP += 1
        elif expected == 'LJ' and predicted == 'AI':
            FP += 1
        elif expected == 'AI' and predicted == 'LJ':
            FN += 1
        elif expected == 'LJ' and predicted == 'LJ':
            TN += 1

    print("Confusion Matrix")
    print(TP, FP)
    print(FN, TN)

    total = TP + FP + FN + TN
    precision = TP / (TP + FP) if (TP + FP) else 0.0
    recall = TP / (TP + FN) if (TP + FN) else 0.0
    accuracy = (TP + TN) / total if total else 0.0
    if precision + recall:
        f1_score = (2 * (precision * recall)) / (precision + recall)
    else:
        f1_score = 0.0

    print("Accuracy:", accuracy)
    print("Precision:", precision)
    print("Recall:", recall)
    print("F1-score:", f1_score)

    

In [11]:
def get_roc_auc_score(model, audio_data):
    """Compute ROC AUC of ``model`` over ``audio_data``.

    The score for each sample is the softmax probability of class index 1,
    taken from row 0 of the model output. A continuous score is used because
    AUC measures ranking quality across all thresholds; thresholded 0/1
    predictions would reduce the curve to a single operating point.

    Args:
        model: Network returning two class scores per sample; put in eval mode.
        audio_data: Indexable dataset yielding ``(input, target)`` pairs.

    Returns:
        The value returned by ``sklearn.metrics.roc_auc_score``.
    """
    model.eval()
    y_true = []
    y_score = []

    with torch.no_grad():
        for i in range(len(audio_data)):
            # Index once: each lookup re-runs the load/transform pipeline.
            sample = audio_data[i]
            input, target = sample[0].unsqueeze(0), sample[1]
            y_true.append(target)

            outputs = model(input)
            positive_prob = torch.softmax(outputs[0], dim=0)[1]
            y_score.append(positive_prob.item())

    y_true = np.array(y_true)
    y_score = np.array(y_score)
    roc_auc = roc_auc_score(y_true, y_score)

    return roc_auc

In [12]:
def get_roc(model, audio_data):
    """Compute the ROC curve of ``model`` over ``audio_data``.

    The score for each sample is the softmax probability of class index 1,
    taken from row 0 of the model output. Continuous scores let
    ``roc_curve`` sweep thresholds; thresholded 0/1 predictions would give
    only one interior point.

    Args:
        model: Network returning two class scores per sample; put in eval mode.
        audio_data: Indexable dataset yielding ``(input, target)`` pairs.

    Returns:
        ``(fpr, tpr)`` arrays as returned by ``sklearn.metrics.roc_curve``
        (the thresholds array is discarded).
    """
    model.eval()
    y_true = []
    y_score = []

    with torch.no_grad():
        for i in range(len(audio_data)):
            # Index once: each lookup re-runs the load/transform pipeline.
            sample = audio_data[i]
            input, target = sample[0].unsqueeze(0), sample[1]
            y_true.append(target)

            outputs = model(input)
            positive_prob = torch.softmax(outputs[0], dim=0)[1]
            y_score.append(positive_prob.item())

    y_true = np.array(y_true)
    y_score = np.array(y_score)
    nn_fpr, nn_tpr, _ = metrics.roc_curve(y_true, y_score)

    return nn_fpr, nn_tpr


In [13]:
# Hyper-parameters for the mel-spectrogram transformation

sample_rate = 22050
num_samples = 7 * sample_rate  # 154350 samples, i.e. 7 seconds at 22050 Hz
n_fft = 1024
hop_length = 512
win_length = None
n_mels = 64

# One transform instance, handed to every Audio_Data dataset below.
mel_spec = torchaudio.transforms.MelSpectrogram(
    sample_rate=sample_rate,
    n_fft=n_fft,
    hop_length=hop_length,
    win_length=win_length,
    n_mels=n_mels,
    mel_scale="htk",
    norm="slaney",
    power=2.0,
    center=True,
    pad_mode="reflect",
)

In [None]:
# Dataloader initialization
# One Audio_Data dataset and one DataLoader are built per split.
# The original project used train, validation and test splits.

Train_metadata_path = 'insert training metadata file path'
Train_dir_path = 'insert training dataset path'

Valid_metadata_path = 'insert validation metadata file path'
Valid_dir_path = 'insert validation dataset path'

Test_metadata_path = 'insert testing metadata file path'
Test_dir_path = 'insert testing dataset path'

batch_size = 128

# Training audio data instance
Train_audio_data = Audio_Data(Train_metadata_path, Train_dir_path, mel_spec, sample_rate, num_samples, device)
print("Training set length:", f"{len(Train_audio_data)}")
# Shuffle the training set every epoch so batch composition does not follow
# the row order of the metadata CSV (e.g. rows grouped by class).
train_dataloader = DataLoader(Train_audio_data, batch_size=batch_size, shuffle=True)

# Validation audio data instance (kept in dataset order)
Valid_audio_data = Audio_Data(Valid_metadata_path, Valid_dir_path, mel_spec, sample_rate, num_samples, device)
print("Validation set length:", f"{len(Valid_audio_data)}")
val_dataloader = create_data_loader(Valid_audio_data, batch_size)

# Testing audio data instance (kept in dataset order)
Test_audio_data = Audio_Data(Test_metadata_path, Test_dir_path, mel_spec, sample_rate, num_samples, device)
print("Test set length:", f"{len(Test_audio_data)}")
test_dataloader = create_data_loader(Test_audio_data, batch_size)

In [None]:
loss_fn = nn.CrossEntropyLoss()

# The constructor argument is forwarded to nn.GRU(num_layers=...), so it sets
# the number of stacked GRU layers.
model = DNN(4).to(device)
model_opt = optim.Adam(model.parameters(), lr = 0.0001)
print(model)

# Accuracy of the untrained network on each split, as a baseline
check_accuracy(train_dataloader, model)
check_accuracy(val_dataloader, model)
check_accuracy(test_dataloader, model)

EPOCHS = 100

train_loss, val_loss = train(model, train_dataloader, val_dataloader, loss_fn, model_opt, device, EPOCHS)  # Training the model

# One loss history per trained model; only one model is trained here.
train_loss_results = [train_loss]
val_loss_results = [val_loss]

model_name = 'name.pth'  # provide model name for each model trained
torch.save(model.state_dict(), model_name)
print("Trained neural network saved at", model_name)

# Plain-float copies of the per-epoch losses for plotting
model_train_loss = [float(train_loss_results[0][i]) for i in range(EPOCHS)]
model_val_loss = [float(val_loss_results[0][i]) for i in range(EPOCHS)]

plt.plot(model_train_loss, label='Train Loss')
plt.plot(model_val_loss, label='Validation Loss')
plt.legend(loc='upper right')
plt.title('Training Performance')
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.show()

print('Evaluation Matrix')
evaluation_matrix(model, Test_audio_data)
score = get_roc_auc_score(model, Test_audio_data)
print('ROC_AUC_Score:', score)


In [23]:
# input, target = Test_audio_data[350][0], Test_audio_data[350][1] # [batch size, num_channels, fr, time]
# input.unsqueeze_(0)

# # make an inference
# predicted, expected = predict(model, input, target)
# print(f"Predicted: '{predicted}', expected: '{expected}'")

In [None]:
# Rebuild the architecture with the same GRU depth used for training, then
# restore the trained weights.
saved_model = DNN(4).to(device)
# map_location remaps the stored tensors onto the current device, so a
# checkpoint saved on a GPU can be restored on a CPU-only machine.
state_dict = torch.load("insert path of trained model", map_location=device)
saved_model.load_state_dict(state_dict)

In [None]:
# Evaluate the reloaded checkpoint on the test dataset: print the confusion
# matrix and derived metrics, then the ROC AUC score.
evaluation_matrix(saved_model, Test_audio_data)
score = get_roc_auc_score(saved_model, Test_audio_data)
print('ROC_AUC_Score:', score)