In [1]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [2]:
%%capture
%cd drive/MyDrive/genrecog/
%pip install speechbrain
%pip install torchlibrosa

In [3]:
from genrecog.preprocess.preprocessor import Preprocessor
from genrecog.nnet.CNN import Conv1d
from genrecog.tools.trainer import FbankTrainer
import torch
from torch.utils.data import TensorDataset, DataLoader
from importlib import reload
import matplotlib.pyplot as plt

import speechbrain as sb



In [4]:
# Load dataset

train_preprcessor = Preprocessor('dataset/npz_files/train.npz')
test_preprcessor = Preprocessor('dataset/npz_files/test.npz')
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

In [5]:
X, y = train_preprcessor.as_shuffled_torch()
X_test, y_test = test_preprcessor.as_shuffled_torch()

dataset = TensorDataset(X.to(device), y.to(device))
validation_dataset, train_dataset = torch.utils.data.random_split(dataset, (400, 3200))
test_dataset = TensorDataset(X_test.to(device), y_test.to(device))

train_dataloader = DataLoader(train_dataset, shuffle=True, batch_size=400)
validation_dataloader = DataLoader(validation_dataset, shuffle=True, batch_size=400)
test_dataloader = DataLoader(test_dataset, shuffle=True, batch_size=400)

In [6]:
class RNN(torch.nn.Module):
  def __init__(self, input_size=40, time_sequence=702, hidden_size=128, num_layers=5, output_dim=10):
    super(RNN, self).__init__()

    self.batch_norm_input = torch.nn.BatchNorm1d(time_sequence)

    self.rnn = torch.nn.RNN(
        input_size=input_size, 
        hidden_size=hidden_size, 
        num_layers=num_layers,
        batch_first=True,
        bias=True
        )
    
    self.batch_norm_hidden = torch.nn.BatchNorm1d(hidden_size)
    self.linear = torch.nn.Linear(hidden_size,output_dim)

  def forward(self, X, hidden=None, use_mean=False):
    X = self.batch_norm_input(X)
    Z, hidden = self.rnn(X, hidden)
    self.Z =Z
    if use_mean:
      z = torch.mean(self.Z, 1)
    else:
      z =  self.Z[:, -1, :]
    z = self.batch_norm_hidden(z)
    out = self.linear(z)
    return out



In [7]:
hidden_size = 128
num_layers = 5
input_size = 40
output_dim = 10
time_sequence = 702
lr = 0.001
model = RNN(
    input_size=input_size, 
    time_sequence=time_sequence,
    hidden_size=hidden_size, 
    num_layers=num_layers, 
    output_dim=output_dim
    ).to(device)
loss = torch.nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=lr)
model

RNN(
  (batch_norm_input): BatchNorm1d(702, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (rnn): RNN(40, 128, num_layers=5, batch_first=True)
  (batch_norm_hidden): BatchNorm1d(128, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (linear): Linear(in_features=128, out_features=10, bias=True)
)

In [8]:
# from genrecog.preprocess.feature import Feature
# import numpy as np

# # feature extractor module
# feature_maker = Feature()


# X_validation, y_validation = next(iter(validation_dataloader))
# X_validation_features = feature_maker.torch_fbank_features(X_validation)
# train_losses = []
# validation_losses = []

# for epoch in range(100):
#     model.train()
#     epoch_losses = []
#     for X_train, y_train in train_dataloader:
#         model.zero_grad()
#         X_features = feature_maker.torch_fbank_features(X_train)
#         y_hat = model(X_features)
#         l = loss(y_hat, y_train)
#         l.backward()
#         optimizer.step()
#         epoch_losses.append(l.item())
#         # print("Epoch %2d final minibatch had loss %.4f" % (epoch, l.item()))
#     print(epoch, np.average(epoch_losses))
#     train_losses.append(np.average(epoch_losses))
#     y_hat_validation = model(X_validation_features)
#     l_validation = loss(y_hat_validation, y_validation)
#     validation_losses.append(l_validation.item())

# plt.plot(train_losses)
# plt.plot(validation_losses)
# plt.show()

    
    # train_losses.append(sum(epoch_losses) / len(epoch_losses))
    # y_pred, y_eval, validation_loss = self.eval()
    # print("Epoch %2d final minibatch had test loss %.4f" % (epoch, validation_loss))
    # self.validation_losses.append(validation_loss)

In [9]:
# model.eval()
# X, y = next(iter(test_dataloader))
# # print(X.shape)
# X_features = feature_maker.torch_fbank_features(X)
# # X_features = torch.nn.functional.normalize(X_features, dim=0)
# # rnn(X_features).softmax(dim=0)
# y_pred = rnn(X_features)
# # print(y_pred[0])
# y_pred_1 = torch.argmax(y_pred, dim=1)
# y_pred_1

In [10]:
# from sklearn.metrics import accuracy_score
# accuracy_score(y_pred_1.cpu(), y.cpu())

In [11]:
# import torchlibrosa as tl
# X, y = next(iter(test_dataloader))


# batch_size = 16
# sample_rate = 22050
# win_length = 702
# hop_length = 512
# n_mels = 128

# spectrogram_extractor = tl.Spectrogram(n_fft=win_length, hop_length=hop_length)
# sp = spectrogram_extractor.forward(X[:10].cpu())   # (batch_size, 1, time_steps, freq_bins)


In [12]:
# sp.shape

In [13]:
# # sb.nnet.RNN.RNN
# net = sb.nnet.RNN.RNN(hidden_size=128, input_shape=X_features.shape)
# out, _ = net(X_features.cpu())
# # out[:,-1,:]
# linear = torch.nn.Linear(128,10)
# out = linear(out[:,-1,:])
# print(out.shape)
# out.argmax(dim=1)

In [14]:
# import torch
# import torchlibrosa as tl


# batch_size = 16
# sample_rate = 22050
# win_length = 2048
# hop_length = 512
# n_mels = 40

# spectrogram_extractor = tl.Spectrogram(n_fft=win_length, hop_length=hop_length)
# sp = spectrogram_extractor.forward(X_train.cpu())
# logmel_extractor = tl.LogmelFilterBank(sr=sample_rate, n_fft=win_length, n_mels=n_mels)
# logmel = logmel_extractor.forward(sp) 

In [15]:
# logmel.squeeze(1).shape
# plt.imshow(logmel.squeeze(1).cpu().transpose(1,2)[0])

In [16]:
# rnn2 = RNN(input_size=40, hidden_size=hidden_size, num_layers=num_layers).to(device)
# from genrecog.preprocess.feature import Feature
# import numpy as np


# spectrogram_extractor = tl.Spectrogram(n_fft=win_length, hop_length=hop_length).cuda()
# logmel_extractor = tl.LogmelFilterBank(sr=sample_rate, n_fft=win_length, n_mels=n_mels).cuda()

# feature_maker = Feature()
# for epoch in range(500):
#     rnn2.train()
#     epoch_losses = []
#     for X_train, y_train in train_dataloader:
#         rnn.zero_grad()
#         sp = spectrogram_extractor.forward(X_train)
#         logmel = logmel_extractor.forward(sp).squeeze(1)
#         y_hat = rnn2(logmel)
#         # print(y_hat.argmax(dim=1))
#         l = loss(y_hat, y_train)
#         l.backward()
#         optimizer.step()
#         epoch_losses.append(l.item())
#         # print("Epoch %2d final minibatch had loss %.4f" % (epoch, l.item()))
#     print(epoch, np.average(epoch_losses))
    

In [20]:
import seaborn as sn
import pandas as pd
import matplotlib.pyplot as plt
from sklearn.metrics import confusion_matrix
from sklearn.metrics import classification_report
import pickle

class FbankTrainer():
    def __init__(self, model, optimizer, loss, train_dataloader, validation_dataloader, num_epochs):
      self.model = model
      self.train_dataloader = train_dataloader
      self.num_epochs = num_epochs
      self.optimizer = optimizer
      self.loss = loss
      self.train_losses = []
      self.validation_losses = []
      self.train_accuracies = []
      self.validation_accuracies = []
      self.feature_maker = Feature()
      self.validation_dataloader = validation_dataloader


    def accuracy(self, y_true, y_pred):
        return (torch.sum(y_true == y_pred) / y_pred.shape[0])

  
    def plot_loss(self):
        plt.plot(self.train_losses)
        plt.plot(self.validation_losses)
        plt.legend(['Training loss', 'Validation loss'])
        plt.xlabel('loss')
        plt.ylabel('epoch')
    
    def plot_accuracies(self):
        plt.plot(self.train_accuracies)
        plt.plot(self.validation_accuracies)
        plt.legend(['Training Accuracy', 'Validation Accuracy'])
        plt.xlabel('accuracy %')
        plt.ylabel('epoch')
    
    def plot_confusion_matrix(self, eval_loader):
      y_pred, y_eval, validation_loss, validation_accuracy = self.eval(eval_loader)
      array = confusion_matrix(y_eval.cpu(), y_pred.cpu(), normalize='true')*100
      genres = ['country', 'reggae', 'metal', 'pop', 'classical', 'disco', 'hiphop', 'blues', 'jazz', 'rock']
      df_cm = pd.DataFrame(array, index = genres, columns = genres)
      plt.figure(figsize = (10,7))
      sn.heatmap(df_cm, annot=True, cmap="YlGnBu")

    def classification_report(self, eval_loader):
      y_pred, y_eval, validation_loss, validation_accuracy = self.eval(eval_loader)
      genres = ['country', 'reggae', 'metal', 'pop', 'classical', 'disco', 'hiphop', 'blues', 'jazz', 'rock']
      print(classification_report(y_eval.cpu(), y_pred.cpu(), target_names=genres))

    def save(self, name=""):
      with open(f"./samples/trained_models/FbankTrainer_{name}.pkl", 'wb') as handle:
        pickle.dump(self, handle, protocol=pickle.HIGHEST_PROTOCOL)

In [21]:
from genrecog.preprocess.feature import Feature
import torch
import matplotlib.pyplot as plt
from sklearn.metrics import accuracy_score


class RNNFbankTrainer(FbankTrainer):

    def train(self):
        for epoch in range(self.num_epochs):
            self.model.train()
            epoch_losses = []
            epoch_accuracies = []
            for X_train, y_train in self.train_dataloader:
                self.model.zero_grad()
                X_features = self.feature_maker.torch_fbank_features(X_train)
                y_hat, embedding = self.model(X_features)
                l = self.loss(y_hat, y_train)
                l.backward()
                self.optimizer.step()
                epoch_losses.append(l.item())
                epoch_accuracies.append(self.accuracy(y_train, torch.argmax(y_hat, dim=1)).item())
            
            training_loss = sum(epoch_losses) / len(epoch_losses)
            training_accuracy = sum(epoch_accuracies) / len(epoch_accuracies)
            self.train_accuracies.append(training_accuracy)
            self.train_losses.append(training_loss)
            y_pred, y_eval, validation_loss, validation_accuracy = self.eval()
            self.validation_losses.append(validation_loss)
            self.validation_accuracies.append(validation_accuracy)
            print(f"============================== EPOCH {epoch+1} =================================")
            print("Training accuracy %.2f" % (training_accuracy * 100))
            print("Training loss %.4f" % training_loss)
            print("Validation accuracy %.2f" % (validation_accuracy * 100))
            print("Validation loss %.4f" % validation_loss)

            if epoch == self.num_epochs:
              torch.save(embedding, './samples/trained_models/RNN_embedding.pt')

    def eval(self, eval_loader=None):
        self.model.eval()
        if eval_loader is None:
          X_val, y_val = next(iter(self.validation_dataloader))
        else:
          X_val, y_val = next(iter(eval_loader))
        with torch.no_grad():
            X_features = self.feature_maker.torch_fbank_features(X_val)
            y_pred = torch.argmax(self.model(X_features), dim=1)
            l = self.loss(self.model(X_features), y_val)
            accuracy = self.accuracy(y_val, y_pred)
        return y_pred, y_val, l.item(), accuracy.item()



In [23]:
trainer = RNNFbankTrainer(model, optimizer, loss, train_dataloader, validation_dataloader, 40)
trainer.train()

ValueError: ignored

In [None]:
trainer.plot_loss()

In [None]:
trainer.plot_accuracies()

In [22]:
class LSTM(torch.nn.Module):
  def __init__(self, input_size=40, time_sequence=702, hidden_size=128, num_layers=5, output_dim=10, use_mean=False):
    super(LSTM, self).__init__()

    self.batch_norm_input = torch.nn.BatchNorm1d(time_sequence)

    self.lstm = torch.nn.LSTM(
        input_size=input_size, 
        hidden_size=hidden_size, 
        num_layers=num_layers,
        batch_first=True,
        bias=True
        )
    
    self.batch_norm_hidden = torch.nn.BatchNorm1d(hidden_size)
    self.linear = torch.nn.Linear(hidden_size,output_dim)
    self.use_mean = use_mean

  def forward(self, X, hidden=None):
    X = self.batch_norm_input(X)
    Z, hidden = self.lstm(X, hidden)
    self.Z =Z
    if self.use_mean:
      z = torch.mean(self.Z, 1)
    else:
      z =  self.Z[:, -1, :]
    z = self.batch_norm_hidden(z)
    out = self.linear(z)
    return out

In [None]:
hidden_size = 128
num_layers = 5
input_size = 40
output_dim = 10
time_sequence = 702
lr = 0.001
model = LSTM(
    input_size=input_size, 
    time_sequence=time_sequence,
    hidden_size=hidden_size, 
    num_layers=num_layers, 
    output_dim=output_dim,
    use_mean=True

    ).to(device)
loss = torch.nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=lr)
model

In [None]:
lstm_trainer = RNNFbankTrainer(
    model=model, 
    optimizer=optimizer, 
    loss=loss, 
    train_dataloader=train_dataloader, 
    validation_dataloader=validation_dataloader, 
    num_epochs=40)
lstm_trainer.train()

In [None]:
lstm_trainer.plot_loss()

In [None]:
lstm_trainer.plot_accuracies()

In [None]:
# y_pred, y_val, loss, accuracy =
# lstm_trainer.accuracy()

lstm_trainer.eval(test_dataloader)

In [None]:
lstm_trainer.plot_confusion_matrix(test_dataloader)

In [None]:
lstm_trainer.classification_report(test_dataloader)

In [30]:
class GRU(torch.nn.Module):
  def __init__(self, input_size=40, time_sequence=702, hidden_size=128, num_layers=5, output_dim=10, use_mean=False):
    super(GRU, self).__init__()

    # self.batch_norm_input = torch.nn.BatchNorm1d(time_sequence)

    self.glu = torch.nn.GRU(
        input_size=input_size, 
        hidden_size=hidden_size, 
        num_layers=num_layers,
        batch_first=True,
        bias=True
        )
    
    self.embedder = torch.nn.Embedding(num_embeddings=time_sequence, embedding_dim=input_size)
    
    # self.batch_norm_hidden = torch.nn.BatchNorm1d(hidden_size)
    self.linear = torch.nn.Linear(hidden_size,output_dim)
    self.use_mean = use_mean

  def forward(self, X, hidden=None):
    embeddings = self.embedder(X.to(device))
    Z, hidden = self.glu(X, hidden)
    self.Z =Z
    if self.use_mean:
      z = torch.mean(self.Z, 1)
    else:
      z =  self.Z[:, -1, :]
    # z = self.batch_norm_hidden(z)
    out = self.linear(z)
    return out, embeddings

In [28]:
hidden_size = 128
num_layers = 5
input_size = 40
output_dim = 10
time_sequence = 702
lr = 0.001
model = GRU(
    input_size=input_size, 
    time_sequence=time_sequence,
    hidden_size=hidden_size, 
    num_layers=num_layers, 
    output_dim=output_dim,
    use_mean=True
    ).to(device)
loss = torch.nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=lr)
model

GRU(
  (glu): GRU(40, 128, num_layers=5, batch_first=True)
  (embedder): Embedding(702, 40)
  (linear): Linear(in_features=128, out_features=10, bias=True)
)

In [31]:
gru_trainer = RNNFbankTrainer(
    model=model, 
    optimizer=optimizer, 
    loss=loss, 
    train_dataloader=train_dataloader, 
    validation_dataloader=validation_dataloader, 
    num_epochs=40)
gru_trainer.train()

RuntimeError: ignored

In [None]:
gru_trainer.plot_loss()

In [None]:
gru_trainer.plot_accuracies()

In [None]:
# y_pred, y_val, loss, accuracy =
# lstm_trainer.accuracy()

gru_trainer.eval(test_dataloader)

In [None]:
gru_trainer.plot_confusion_matrix(test_dataloader)

In [None]:

gru_trainer.classification_report(test_dataloader)

In [None]:
gru_trainer.save("GRU_with_mean")

In [None]:
with open('./samples/trained_models/FbankTrainer_GRU_with_mean.pkl', 'rb') as handle:
    loaded_model = pickle.load(handle)

In [None]:
loaded_model.plot_accuracies()