In [None]:
import numpy as np
import torch
import torch.nn as nn
from IPython.display import Audio

import os
from sklearn.model_selection import train_test_split
import random
from random import randint
from torch.utils.data import DataLoader, TensorDataset
import matplotlib.pyplot as plt
import gc
from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay, classification_report

In [None]:
# Dataset configuration: the sampling rate used for the audio clips and the
# language codes, each mapped to an integer id given by its list position.
sampling_rate = 8_000
languages = ["de", "en", "es", "fr", "nl", "pt"]
language_dict = {code: index for index, code in enumerate(languages)}

# Fixed seed so the train/validation split is identical on every
# "Restart Kernel -> Run All"; without it the split changes on each run and
# validation numbers are not comparable between runs.
split_seed = 42

X_train, y_train = np.load("dataset/inputs_train_fp16.npy"), np.load(
    "dataset/targets_train_int8.npy")

# Hold out 20% of the training data for validation, stratified by label.
X_train, X_val, y_train, y_val = train_test_split(
    X_train, y_train, test_size=0.2, stratify=y_train, random_state=split_seed)

X_test, y_test = np.load("dataset/inputs_test_fp16.npy"), np.load(
    "dataset/targets_test_int8.npy")

# Cast every input split to float32 (the file names suggest float16 storage).
X_train, X_val, X_test = X_train.astype(np.float32), X_val.astype(np.float32), X_test.astype(np.float32)

In [None]:
# Replace the integer labels of every split with one-hot float tensors
# (6 classes). `targets_full` ends up holding the integer test labels.
encoded_splits = []
for split_labels in (y_train, y_val, y_test):
    targets_full = torch.from_numpy(split_labels)
    encoded_splits.append(
        torch.nn.functional.one_hot(targets_full.long(), 6).float())

y_train, y_val, y_test = encoded_splits

In [None]:
#https://librosa.org/doc/main/generated/librosa.feature.melspectrogram.html
#https://dsp.stackexchange.com/questions/75017/generating-log-mel-spectrogram-using-librosa
#https://importchris.medium.com/how-to-create-understand-mel-spectrograms-ff7634991056
# Mel-spectrogram feature layer. We first tried a log-mel spectrogram, but it did not perform well, so we switched to a plain mel spectrogram.
import librosa

class Gspectogram(torch.nn.Module):
    """Layer that turns a batch of waveforms into mel spectrograms with librosa.

    Each batch item is detached, moved to the CPU and converted to NumPy
    before being passed to ``librosa.feature.melspectrogram``, so no gradient
    flows through this layer.

    Args:
        sample_rate: Value passed to librosa as ``sr``.
        n_fft: FFT window size passed to librosa.
        hop_length: Hop length passed to librosa.
        n_mels: Number of mel bands passed to librosa.

    Note:
        The defaults were slightly tuned, but changing them did not make much
        of a difference.
    """

    def __init__(self, sample_rate=8000, n_fft=2048, hop_length=256, n_mels=32):
        super().__init__()

        self.sample_rate = sample_rate
        self.n_fft = n_fft
        self.hop_length = hop_length
        self.n_mels = n_mels

    def forward(self, audio):
        """Compute one mel spectrogram per item along dimension 0 of ``audio``.

        Args:
            audio: Tensor whose first dimension indexes the batch; each
                ``audio[i]`` is handed to librosa as the signal ``y``.

        Returns:
            Tensor of the stacked librosa outputs, placed on ``audio.device``.
            An empty batch yields an empty 1-D tensor.
        """
        mels = [
            librosa.feature.melspectrogram(
                y=clip.detach().cpu().numpy(),
                sr=self.sample_rate,
                n_fft=self.n_fft,
                hop_length=self.hop_length,
                n_mels=self.n_mels,
            )
            for clip in audio
        ]

        if not mels:
            # Nothing to stack (np.stack rejects an empty list).
            return torch.tensor([]).to(audio.device)

        # Stack in NumPy first: torch.tensor() on a list of ndarrays converts
        # element by element, which is extremely slow and triggers a warning.
        return torch.from_numpy(np.stack(mels)).to(audio.device)

In [None]:
#https://bioacoustics.stackexchange.com/questions/846/should-we-normalize-audio-before-training-a-ml-model
# Mean-variance normalizer that normalizes the raw data before it is passed to the mel-spectrogram layer
class MVNormalizer(torch.nn.Module):
    """Normalize inputs with a running per-feature mean and variance.

    In training mode the running statistics are first updated from the current
    batch with an exponential moving average, then used to normalize. In eval
    mode the stored statistics are used unchanged.

    Args:
        num_features: Size of the last input dimension.
        eps: Small constant added to the variance before the square root.
        momentum: Weight of the current batch in the moving average
            (``new = (1 - momentum) * old + momentum * batch``).
    """

    def __init__(self, num_features=40000, eps=1e-6, momentum=0.01):
        super().__init__()
        self.num_features = num_features
        self.eps = eps
        self.momentum = momentum

        # Non-trainable running statistics; stored as frozen parameters so
        # they are saved in the state dict under the keys "mean" and "var".
        self.mean = torch.nn.Parameter(torch.zeros(num_features), requires_grad=False)
        self.var = torch.nn.Parameter(torch.ones(num_features), requires_grad=False)

    def forward(self, x):
        """Return ``(x - mean) / sqrt(var + eps)`` using the running statistics.

        Args:
            x: Tensor whose last dimension has ``num_features`` entries.

        Returns:
            Tensor with the same shape as ``x``.
        """
        if self.training:
            # Collapse every leading dimension so the batch statistics always
            # have shape (num_features,). Reducing only dim 0 of a
            # (batch, 1, features) input gives (1, features), which used to
            # silently reshape the stored statistics and made saved
            # checkpoints unloadable into a freshly constructed model.
            flat = x.detach().reshape(-1, x.shape[-1])
            batch_mean = flat.mean(dim=0)
            batch_var = flat.var(dim=0, unbiased=False)
            keep = 1.0 - self.momentum
            with torch.no_grad():
                self.mean.copy_(keep * self.mean + self.momentum * batch_mean)
                self.var.copy_(keep * self.var + self.momentum * batch_var)
        return (x - self.mean) / torch.sqrt(self.var + self.eps)


In [None]:
# Pair each split's inputs with its one-hot targets.
X_trainset = torch.utils.data.TensorDataset(torch.from_numpy(X_train), y_train)
X_valset = torch.utils.data.TensorDataset(torch.from_numpy(X_val), y_val)
X_testset = torch.utils.data.TensorDataset(torch.from_numpy(X_test), y_test)

# Training batches are shuffled and the ragged final batch is dropped.
dataloader_train = torch.utils.data.DataLoader(X_trainset, batch_size=64, shuffle=True, drop_last=True)
# Evaluation loaders keep the final partial batch: dropping it would silently
# exclude samples from the reported validation/test metrics.
dataloader_val = torch.utils.data.DataLoader(X_valset, batch_size=64, shuffle=False, drop_last=False)
dataloader_test = torch.utils.data.DataLoader(X_testset, batch_size=64, shuffle=False, drop_last=False)

In [None]:
class CustomLC(nn.Module):
  """CNN language classifier operating on mel spectrograms of raw audio.

  Pipeline: MVNormalizer -> Gspectogram -> three Conv2d/LeakyReLU/MaxPool2d
  stages -> flatten -> two linear layers with dropout.

  ``forward`` returns raw class scores (logits). ``nn.CrossEntropyLoss``
  applies log-softmax itself, so feeding it softmax outputs squashes the
  scores twice and flattens the gradients. Use ``self.softmax(logits)`` when
  probabilities are needed; ``argmax`` predictions are the same either way.

  Args:
    num_classes: Number of output classes.
  """

  def __init__(self, num_classes=6):
    super().__init__()

    # Tried with and without each normalizer; the best accuracy came from
    # using both.
    self.norm = MVNormalizer()
    self.norm1 = Gspectogram()

    # The layers used to be 1-D; with both normalizers in place the input is
    # image-like, so 2-D convolution and pooling are used.
    self.conv1 = nn.Conv2d(in_channels=1, out_channels=16, kernel_size=3, stride=1, padding=1)
    self.relu1 = nn.LeakyReLU()  # LeakyReLU rather than ReLU because the input data contains negative values
    self.pool1 = nn.MaxPool2d(kernel_size=2)

    self.conv2 = nn.Conv2d(in_channels=16, out_channels=32, kernel_size=3, stride=1, padding=1)
    self.relu2 = nn.LeakyReLU()
    self.pool2 = nn.MaxPool2d(kernel_size=2)

    self.conv3 = nn.Conv2d(in_channels=32, out_channels=64, kernel_size=3, stride=1, padding=1)
    self.relu3 = nn.LeakyReLU()
    self.pool3 = nn.MaxPool2d(kernel_size=2)

    self.flatten = nn.Flatten()
    # NOTE(review): 4864 input features are hard-coded; presumably
    # 64 channels x 4 x 19 for the default clip length and spectrogram
    # settings -- confirm if either changes.
    self.fc1 = nn.Linear(4864, 128)
    self.drop1 = nn.Dropout(0.5)  # prevent overfitting
    self.relu5 = nn.LeakyReLU()
    self.fc2 = nn.Linear(128, num_classes)
    # A second dropout layer was needed: otherwise overfitting after a few
    # epochs made the model guess a single language.
    self.drop2 = nn.Dropout(0.5)
    # Not applied in forward(); kept for turning logits into probabilities.
    self.softmax = nn.Softmax(dim=1)

  def forward(self, x):
    """Return class logits of shape (batch, num_classes) for input ``x``."""
    x = self.norm(x)
    x = self.norm1(x)

    x = self.pool1(self.relu1(self.conv1(x)))
    x = self.pool2(self.relu2(self.conv2(x)))
    x = self.pool3(self.relu3(self.conv3(x)))

    x = self.flatten(x)
    x = self.fc1(x)
    x = self.drop1(x)
    x = self.relu5(x)
    x = self.fc2(x)
    x = self.drop2(x)

    # No softmax here: the loss function expects unnormalized scores.
    return x

In [None]:
# Prefer the first CUDA GPU when one is available; otherwise run on the CPU.
if torch.cuda.is_available():
    device = torch.device("cuda:0")
else:
    device = torch.device("cpu")
print(device)

In [None]:
# Train CustomLC for 30 epochs with Adam, recording the per-epoch loss and
# accuracy on the training and validation loaders. Checkpoints are written
# for every epoch after epoch 15.
model = CustomLC()
crossloss = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.001, weight_decay=0.01)  # weight decay added to limit overfitting
model = model.to(device)

loss_list_train = []
loss_list_val = []
accuracy_list_train = []
accuracy_list_val = []

for epoch in range(30):
  # ---- training pass ----
  # Re-enter train mode every epoch: the validation pass below switches the
  # model to eval mode.
  model.train()
  total_loss = 0
  total_correct = 0
  total_samples = 0
  for clips, target in dataloader_train:
    clips = clips.view(clips.shape[0], 1, -1)  # add a singleton axis after the batch axis
    target = target.to(device).float()
    clips = clips.to(device).float()
    optimizer.zero_grad()

    predictions = model(clips)
    # Targets are one-hot; the loss is given class indices.
    loss = crossloss(predictions, target.argmax(dim=1))

    loss.backward()
    optimizer.step()

    # Weight the batch-mean loss by the batch size so the epoch figure is a
    # per-sample average.
    total_loss += loss.item() * clips.size(0)
    total_correct += torch.eq(predictions.argmax(dim=1), target.argmax(dim=1)).sum().item()
    total_samples += clips.size(0)

  epoch_loss = total_loss / total_samples
  epoch_acc = total_correct / total_samples
  print(f"Epoch {epoch}: loss={epoch_loss:.4f}, accuracy={epoch_acc:.4f}")
  loss_list_train.append(round(epoch_loss, 4))
  accuracy_list_train.append(round(epoch_acc, 4))

  # ---- validation pass ----
  # Eval mode disables dropout and stops MVNormalizer from updating its
  # running statistics on validation data. Previously validation ran in train
  # mode, which made the metrics noisy and leaked validation statistics into
  # the model.
  model.eval()
  total_loss = 0
  total_correct = 0
  total_samples = 0
  with torch.no_grad():
    for clips, target in dataloader_val:
      clips = clips.view(clips.shape[0], 1, -1)
      target = target.to(device).float()
      clips = clips.to(device).float()

      predictions = model(clips)
      loss = crossloss(predictions, target.argmax(dim=1))

      total_loss += loss.item() * clips.size(0)
      total_correct += torch.eq(predictions.argmax(dim=1), target.argmax(dim=1)).sum().item()
      total_samples += clips.size(0)

  epoch_loss = total_loss / total_samples
  epoch_acc = total_correct / total_samples
  print(f"Epoch {epoch}: loss={epoch_loss:.4f}, accuracy={epoch_acc:.4f}")
  loss_list_val.append(round(epoch_loss, 4))
  accuracy_list_val.append(round(epoch_acc, 4))

  # Keep a checkpoint for each of the later epochs.
  if epoch > 15:
    torch.save(model.state_dict(), f"model_epoch_{epoch}.pth")

  gc.collect()
  torch.cuda.empty_cache()

In [None]:
# Evaluate the saved model on the test loader: loss, accuracy, per-class
# report and confusion matrix.
# NOTE(review): "model_final.pth" is not written by any code visible in this
# notebook -- presumably one of the model_epoch_*.pth checkpoints renamed by
# hand; confirm.
# map_location lets a checkpoint saved on a GPU load on a CPU-only machine.
model.load_state_dict(torch.load("model_final.pth", map_location=device))

class_names = ["German", "English", "Spanish", "French", "Dutch", "Portuguese"]
confusion_matrix_test = None

with torch.no_grad():
  model.eval()
  total_loss = 0
  total_correct = 0
  total_samples = 0
  all_targets = []       # class indices per batch
  all_predictions = []   # predicted class indices per batch
  all_targets1 = []      # one-hot targets per batch
  all_predictions1 = []  # raw model outputs per batch
  for clips, target in dataloader_test:
    clips = clips.view(clips.shape[0], 1, -1)
    target = target.to(device).float()
    clips = clips.to(device).float()

    predictions = model(clips)
    loss = crossloss(predictions, target.argmax(dim=1))

    total_loss += loss.item() * clips.size(0)
    total_correct += torch.eq(predictions.argmax(dim=1), target.argmax(dim=1)).sum().item()
    total_samples += clips.size(0)
    all_targets.append(target.argmax(dim=1).cpu())
    all_predictions.append(predictions.argmax(dim=1).cpu())
    all_predictions1.append(predictions.cpu())
    all_targets1.append(target.cpu())

  epoch_loss = total_loss / total_samples
  epoch_acc = total_correct / total_samples
  # Previously this printed "Epoch {epoch}", reading a variable leaked from the
  # training cell: wrong label, and a NameError if training was not run.
  print(f"Test: loss={epoch_loss:.4f}, accuracy={epoch_acc:.4f}")
  all_targets = torch.cat(all_targets).numpy()
  all_predictions = torch.cat(all_predictions).numpy()
  confusion_matrix_test = confusion_matrix(all_targets, all_predictions)
  print(classification_report(all_targets, all_predictions, target_names=class_names))
  disp = ConfusionMatrixDisplay(confusion_matrix=confusion_matrix_test, display_labels=class_names)
  disp.plot()
  plt.show()