In [1]:
##*************************Task 1*********************#
#
#   (Class "Critic" should be a subclass of the class CriticBase. You must use the exact class name.)
#   You should implement a multi-layer (2 or 3 layers) LSTM model in this class.
#   The Model (the score function) takes a sequence of envents as input and outputs a score judging
#   whether the piano music corresponding to the sequence is good music or bad music.
#   A function to generate random music is provided in the "midi2seq.py".
#   Use the function to create a collection of random piano plays as examples of bad music.
#   Use the piano plays in the downloaded data as example of good music.
#   (You don't need to use all the downloaded data. A sufficiently large subset will be enough.)
#   Train the model in this class using both the good and the bad examples.

In [38]:
## Import libraries
import numpy as np
import os, sys, time, datetime, pickle, copy, random, glob

from sklearn.preprocessing import MinMaxScaler
from sklearn.model_selection import train_test_split

import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from midi2seq import piano2seq, random_piano, process_midi_seq
from torch.utils.data import DataLoader, TensorDataset
from model_base import ComposerBase, CriticBase

In [50]:
device = torch.device("cuda" if torch.cuda.is_available() else ("mps" if torch.backends.mps.is_available() else "cpu"))
all_midis = glob.glob(f'./maestro-v1.0.0/**/*.midi')

maxlen  = 100
good_music_midi = process_midi_seq(all_midis=all_midis, n=10000, maxlen=maxlen)
bad_music_midi = [random_piano(n=maxlen) for _ in range(len(all_midis))]
bad_music_midi = process_midi_seq(all_midis=bad_music_midi, n=10000, maxlen=maxlen)

good_music = torch.tensor(good_music_midi, dtype=torch.float32)
bad_music = torch.tensor(bad_music_midi, dtype=torch.float32)

good_labels = torch.ones((len(good_music), 1))
bad_labels = torch.zeros((len(bad_music), 1))

all_data = torch.cat([good_music, bad_music], dim=0)
all_labels = torch.cat([good_labels, bad_labels], dim=0)

features_train, features_test, label_train, label_test = train_test_split(
                                                                            all_data, all_labels,
                                                                                    test_size=0.2,
                                                                                    shuffle=True)

print(f"Size before reshaping:- features, {features_train.shape}; labels, {label_train.shape}")                                                                                    
features_train = features_train.reshape((-1, features_train.shape[1], 1))
features_test = features_test.reshape((-1, features_test.shape[1], 1))
label_train = label_train.reshape((-1, 1))
label_test = label_test.reshape((-1, 1))
print(f"Size after reshaping:- features,{features_train.shape}; labels, {label_train.shape}")



Size before reshaping:- features, torch.Size([16136, 101]); labels, torch.Size([16136, 1])
Size after reshaping:- features,torch.Size([16136, 101, 1]); labels, torch.Size([16136, 1])


In [40]:
features_train.shape, features_test.shape, label_train.shape, label_test.shape

(torch.Size([16139, 101, 1]),
 torch.Size([4035, 101, 1]),
 torch.Size([16139, 1]),
 torch.Size([4035, 1]))

In [41]:
def convert_labels(labels):
    converted = torch.zeros(labels.size(0), 2)
    converted[labels.view(-1) == 1, 0] = 1
    converted[labels.view(-1) == 0, 1] = 1
    return converted

In [42]:
label_train_ = convert_labels(label_train)
label_test_ = convert_labels(label_test)

In [43]:
label_train

tensor([[0.],
        [0.],
        [0.],
        ...,
        [0.],
        [0.],
        [0.]])

In [44]:
label_train_

tensor([[0., 1.],
        [0., 1.],
        [0., 1.],
        ...,
        [0., 1.],
        [0., 1.],
        [0., 1.]])

In [45]:
features_train.shape, features_test.shape, label_train_.shape, label_test_.shape

(torch.Size([16139, 101, 1]),
 torch.Size([4035, 101, 1]),
 torch.Size([16139, 2]),
 torch.Size([4035, 2]))

In [46]:
train_dataset = TensorDataset(features_train, label_train_)
test_dataset = TensorDataset(features_test, label_test_)

train_loader = DataLoader(train_dataset, shuffle=True, batch_size=128)
test_loader = DataLoader(test_dataset, shuffle=True, batch_size=128)

In [47]:
for _, batch in enumerate(train_loader):
    print(batch[0].shape, batch[1].shape)
    break

torch.Size([128, 101, 1]) torch.Size([128, 2])


In [48]:
class Critic(nn.Module, CriticBase):
    def __init__(self, input_dim, hidden_size, num_layers=3, n_classes=2):
        super(Critic, self).__init__()
        self.input_dim = input_dim
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        self.output_size = n_classes
        self.lstm = nn.LSTM(input_size = self.input_dim, hidden_size = self.hidden_size, num_layers = self.num_layers, batch_first=True)
        self.fc = nn.Linear(self.hidden_size, self.output_size)

    def forward(self, x):
        device = torch.device("cuda" if torch.cuda.is_available() else ("mps" if torch.backends.mps.is_available() else "cpu"))
        batch_size = x.size(0)
        # Initialize hidden state with zeros
        h0 = torch.zeros(self.num_layers, batch_size, self.hidden_size).to(device)
        c0 = torch.zeros(self.num_layers, batch_size, self.hidden_size).to(device)
        # LSTM forward pass
        out, _ = self.lstm(x, (h0, c0))
        print(out.shape)
        # Decode the hidden state of the last time step
        out = out[:, -1, :]
        # linear layer
        out = self.fc(out)
        return out

    def score(self,x):
        return(self.forward(x))

    def train_model(self, dataloader, epochs=10, lr=0.0001):
        device = torch.device("cuda" if torch.cuda.is_available() else ("mps" if torch.backends.mps.is_available() else "cpu"))
        self.to(device)

        criterion = nn.CrossEntropyLoss()
        optimizer = optim.Adam(self.parameters(), lr=lr)

        print("Start training")
        running_loss = 0.0
        for epoch in range(epochs):
            for batch_idx, (data, target) in enumerate(dataloader):
                data, target = data.to(device), target.to(device)

                optimizer.zero_grad()
                outputs = self.score(data)
                loss = criterion(outputs, target)
                loss.backward()
                optimizer.step()
                break

            print(f"Epoch {epoch+1}/{epochs}, Loss: {loss.item()}")
        print("Finished training")

        #torch.save(self.state_dict(), 'critic.pth')

In [49]:
input_dim = 1
hidden_dim = 64
num_layers = 3
learning_rate = 0.001
num_epochs = 10
label_dim = 2

# Initialize model, loss and optimizer
critic_model = Critic(input_dim = input_dim, hidden_size=hidden_dim, n_classes=label_dim)
critic_model.train_model(train_loader, epochs=100)

Start training
torch.Size([128, 101, 64])
Epoch 1/100, Loss: 0.6914892196655273
torch.Size([128, 101, 64])
Epoch 2/100, Loss: 0.690436065196991
torch.Size([128, 101, 64])
Epoch 3/100, Loss: 0.6933046579360962
torch.Size([128, 101, 64])
Epoch 4/100, Loss: 0.69371497631073
torch.Size([128, 101, 64])
Epoch 5/100, Loss: 0.6907191872596741
torch.Size([128, 101, 64])
Epoch 6/100, Loss: 0.6914653778076172
torch.Size([128, 101, 64])
Epoch 7/100, Loss: 0.6904364228248596
torch.Size([128, 101, 64])
Epoch 8/100, Loss: 0.6922365427017212
torch.Size([128, 101, 64])
Epoch 9/100, Loss: 0.6902852058410645
torch.Size([128, 101, 64])
Epoch 10/100, Loss: 0.689089298248291
torch.Size([128, 101, 64])
Epoch 11/100, Loss: 0.6884942054748535
torch.Size([128, 101, 64])
Epoch 12/100, Loss: 0.6930064558982849
torch.Size([128, 101, 64])
Epoch 13/100, Loss: 0.6890537738800049
torch.Size([128, 101, 64])
Epoch 14/100, Loss: 0.6877013444900513
torch.Size([128, 101, 64])
Epoch 15/100, Loss: 0.692158579826355
torch.Siz

# Merge into Class

In [22]:
class MidiDataProcessor:

    def __init__(self, data_directory, maxlen=100, test_size=0.2, random_state=42, batch_size=32):
        self.data_directory = data_directory
        self.maxlen = maxlen
        self.test_size = test_size
        self.random_state = random_state
        self.batch_size = batch_size

    def __get__(self, idx):
        return self.all_data[idx], self.all_labels[idx]

    def prepare_data(self, is_scale=False):
        device = torch.device("cuda" if torch.cuda.is_available() else ("mps" if torch.backends.mps.is_available() else "cpu"))
        all_midis = glob.glob(f'{self.data_directory}/maestro-v1.0.0/**/*.midi')

        good_music_midi = process_midi_seq(all_midis=all_midis, datadir=self.data_directory, n=10000, maxlen=self.maxlen)
        bad_music_midi = [random_piano(n=self.maxlen) for _ in range(len(all_midis))]
        bad_music_midi = process_midi_seq(all_midis=bad_music_midi, datadir=self.data_directory, n=10000, maxlen=self.maxlen)

        good_music = torch.tensor(good_music_midi, dtype=torch.float32)
        bad_music = torch.tensor(bad_music_midi, dtype=torch.float32)

        good_labels = torch.ones((len(good_music), 1))
        bad_labels = torch.zeros((len(bad_music), 1))

        self.all_data = torch.cat([good_music, bad_music], dim=0)
        self.all_labels = torch.cat([good_labels, bad_labels], dim=0)

        features_train, features_test, label_train, label_test = train_test_split(
                                                                                    self.all_data, self.all_labels,
                                                                                    test_size=self.test_size,
                                                                                    random_state=self.random_state,
                                                                                    shuffle=True)
        features_train = features_train.reshape((-1, features_train.shape[1], 1))
        features_test = features_test.reshape((-1, features_test.shape[1], 1))

        label_train = label_train.reshape((-1, 1))
        label_test = label_test.reshape((-1, 1))

        def convert_labels(labels):
            converted = torch.zeros(labels.size(0), 2)
            converted[labels.view(-1) == 1, 0] = 1
            converted[labels.view(-1) == 0, 1] = 1
            return converted
        
        label_train = convert_labels(label_train)
        label_test = convert_labels(label_test)
        

        features_train = torch.Tensor(features_train).to(device)
        features_test = torch.Tensor(features_test).to(device)
        label_train = torch.Tensor(label_train).to(device)
        label_test = torch.Tensor(label_test).to(device)

        train_dataset = TensorDataset(features_train, label_train)
        test_dataset = TensorDataset(features_test, label_test)

        train_loader = DataLoader(train_dataset, shuffle=True, batch_size=self.batch_size)
        test_loader = DataLoader(test_dataset, shuffle=True, batch_size=self.batch_size)

        return train_loader, test_loader

    def __repr__(self):
        return f'MidiDataProcessor(data_directory={self.data_directory!r}, maxlen={self.maxlen}, test_size={self.test_size}, random_state={self.random_state}, batch_size={self.batch_size})'


In [23]:
processor = MidiDataProcessor(data_directory='.')
train_loader, test_loader = processor.prepare_data()

In [34]:
class LSTMCritic(nn.Module):
    def __init__(self, input_dim=1, hidden_size=64, num_layers=3, n_classes=2):
        super(LSTMCritic, self).__init__()
        self.input_dim = input_dim
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        self.output_size = n_classes
        self.device = torch.device("cuda" if torch.cuda.is_available() else ("mps" if torch.backends.mps.is_available() else "cpu"))
        self.lstm = nn.LSTM(input_size = self.input_dim, hidden_size = self.hidden_size, num_layers = self.num_layers, batch_first=True)
        self.fc = nn.Linear(self.hidden_size, self.output_size)

    def forward(self, x):
        batch_size = x.size(0)
        # Initialize hidden state with zeros
        h0 = torch.zeros(self.num_layers, batch_size, self.hidden_size).to(self.device)
        c0 = torch.zeros(self.num_layers, batch_size, self.hidden_size).to(self.device)
        # LSTM forward pass
        out, _ = self.lstm(x, (h0, c0))
        # Decode the hidden state of the last time step
        out = out[:, -1, :]
        # linear layer
        out = self.fc(out)
        return out

In [108]:
import torch

model_ = LSTMCritic(input_dim=1, hidden_size=64, num_layers=3, n_classes=2).to( device)
model_ = torch.load('criticR111.pth')
model_.eval()


LSTMCritic(
  (criterion): BCELoss()
  (lstm): LSTM(1, 64, num_layers=3, batch_first=True)
  (fc): Linear(in_features=64, out_features=2, bias=True)
)

In [105]:
import logging

class Critic(CriticBase):
    def __init__(self, load_trained=False):
        '''
        :param load_trained
            If load_trained is True, load a trained model from a file.
            Should include code to download the file from Google drive if necessary.
            else, construct the model
        '''    
    
        self.load_trained = load_trained
        self.device = torch.device("cuda" if torch.cuda.is_available() else ("mps" if torch.backends.mps.is_available() else "cpu"))
        self.model = LSTMCritic(input_dim=1, hidden_size=64, num_layers=3, n_classes=2).to(self.device)
        self.criterion = nn.CrossEntropyLoss()
        if self.load_trained:
            logging.info('load model from file ...')

            self.model = torch.load('critic.pth')
            self.model.eval()

    def score(self, x):
        '''
        Compute the score of a music sequence
        :param x: a music sequence
        :return: the score between 0 and 1 that reflects the quality of the music: the closer to 1, the better
        '''
        with torch.set_grad_enabled(False):
            logging.info('Compute score ...')
        
            outputs = self.model(x.to(self.device))
            outputs = torch.argmax(outputs, dim=1)
            outputs ^= 1 # index 0 is good and index 1 is bad 
                    
        return outputs  

    def train(self, x, epochs=10, lr=1e-5):
        '''
        Train the model on one batch of data
        :param x: train data. For critic training, x will be a tuple of two tensors (data, label). expect a batch of dataloader
        :return: (mean) loss of the model on the batch
        '''
            
        optimizer = optim.Adam(self.model.parameters(), lr=lr)

        logging.info('Start training ...')
        self.model.train()
        total_loss = 0
        for epoch in range(epochs):
            for idx, (feature, label) in enumerate(x):
                feature, label = feature.to(self.device), label.to(self.device)

                optimizer.zero_grad()
                outputs = self.model(feature)
                loss = self.criterion(outputs, label)
                total_loss += loss.item()
                loss.backward()
                optimizer.step()

            print(f"Epoch {epoch+1}/{epochs}, Loss: {loss.item()}")
        
        logging.info("Finished training ...")

        torch.save(self.model, 'critic.pth')
        
        return total_loss/feature.size(0)

In [41]:
Critics = Critic(load_trained=False)

Critics.train(train_loader, epochs=100)

INFO:Start training ...


Epoch 1/100, Loss: 0.6983073949813843
Epoch 2/100, Loss: 0.24658606946468353
Epoch 3/100, Loss: 0.06094246357679367
Epoch 4/100, Loss: 0.037189848721027374
Epoch 5/100, Loss: 0.06825444847345352
Epoch 6/100, Loss: 0.030911851674318314
Epoch 7/100, Loss: 0.029649732634425163
Epoch 8/100, Loss: 0.07281500101089478
Epoch 9/100, Loss: 0.007318323012441397
Epoch 10/100, Loss: 0.006132175680249929
Epoch 11/100, Loss: 0.008992061018943787
Epoch 12/100, Loss: 0.004087052308022976
Epoch 13/100, Loss: 0.005067975260317326
Epoch 14/100, Loss: 0.0032931193709373474
Epoch 15/100, Loss: 0.0028923728968948126
Epoch 16/100, Loss: 0.0022897967137396336
Epoch 17/100, Loss: 0.01976427435874939
Epoch 18/100, Loss: 0.01067756675183773
Epoch 19/100, Loss: 0.0037330188788473606
Epoch 20/100, Loss: 0.0015168313402682543
Epoch 21/100, Loss: 0.001699704211205244
Epoch 22/100, Loss: 0.0021748384460806847
Epoch 23/100, Loss: 0.00195964309386909
Epoch 24/100, Loss: 0.002166065853089094
Epoch 25/100, Loss: 0.001215

INFO:Finished training ...


Epoch 100/100, Loss: 0.0001916042238008231


1482.1336252746405

In [106]:
# Test the load_trained model
CriticsTest = Critic(load_trained=True)

CriticsTest.score(features_test)

INFO:load model from file ...
INFO:Compute score ...


tensor([0, 0, 1,  ..., 1, 1, 1], device='mps:0')

In [107]:
device = torch.device("cuda" if torch.cuda.is_available() else ("mps" if torch.backends.mps.is_available() else "cpu"))
        
torch_model = torch.load('critic.pth')
torch_model.eval()

with torch.set_grad_enabled(False):
    x = features_test
    print(x.shape)
    outputs = torch_model(x.to(device))
    outputs = torch.argmax(outputs, dim=1)
    outputs ^= 1 # index 0 is good and index 1 is bad 

outputs == torch.flatten(label_test).to(device)

arr = (outputs == torch.flatten(label_test).to(device)).to('cpu').numpy() #copy to cpu before convert to numpy
final_test_acc = sum(arr)/len(arr)
final_test_acc

torch.Size([4036, 101, 1])


0.5037165510406343

# Post Tuesday Oct 10, 2023

In [1]:
# Importing libraries
import numpy as np
import os, sys, time, datetime, pickle, copy, random, glob, logging

from sklearn.preprocessing import MinMaxScaler
from sklearn.model_selection import train_test_split

import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from midi2seq import piano2seq, random_piano, process_midi_seq
from torch.utils.data import DataLoader, TensorDataset
from model_base import ComposerBase, CriticBase

from google_drive_downloader import GoogleDriveDownloader as gdd

In [2]:
## Training data
class MidiDataProcessor:

    def __init__(self, data_directory, maxlen=100, test_size=0.2, random_state=42, batch_size=32):
        self.data_directory = data_directory
        self.maxlen = maxlen
        self.test_size = test_size
        self.random_state = random_state
        self.batch_size = batch_size

    def __get__(self, idx):
        return self.all_data[idx], self.all_labels[idx]

    def prepare_data(self):
        device = torch.device("cuda" if torch.cuda.is_available() else ("mps" if torch.backends.mps.is_available() else "cpu"))
        all_midis = glob.glob(f'{self.data_directory}/maestro-v1.0.0/**/*.midi')

        good_music_midi = process_midi_seq(all_midis=all_midis, datadir=self.data_directory, n=10000, maxlen=self.maxlen)
        bad_music_midi = [random_piano(n=self.maxlen) for _ in range(len(all_midis))]
        bad_music_midi = process_midi_seq(all_midis=bad_music_midi, datadir=self.data_directory, n=10000, maxlen=self.maxlen)

        good_music = torch.tensor(good_music_midi, dtype=torch.float32)
        bad_music = torch.tensor(bad_music_midi, dtype=torch.float32)

        good_labels = torch.ones((len(good_music), 1))
        bad_labels = torch.zeros((len(bad_music), 1))

        self.all_data = torch.cat([good_music, bad_music], dim=0)
        self.all_labels = torch.cat([good_labels, bad_labels], dim=0)

        features_train, features_test, label_train, label_test = train_test_split(
                                                                                    self.all_data, self.all_labels,
                                                                                    test_size=self.test_size,
                                                                                    random_state=self.random_state,
                                                                                    shuffle=True)
        

        def convert_labels(labels):
            converted = torch.zeros(labels.size(0), 2)
            converted[labels.view(-1) == 1, 0] = 1
            converted[labels.view(-1) == 0, 1] = 1
            return converted
        
        label_train = convert_labels(label_train)
        label_test = convert_labels(label_test)

        features_train = torch.Tensor(features_train).to(device)
        features_test = torch.Tensor(features_test).to(device)

        label_train = torch.Tensor(label_train).to(device)
        label_test = torch.Tensor(label_test).to(device)

        train_dataset = TensorDataset(features_train, label_train)
        test_dataset = TensorDataset(features_test, label_test)

        self.train_loader = DataLoader(train_dataset, shuffle=True, batch_size=self.batch_size)
        self.test_loader = DataLoader(test_dataset, shuffle=True, batch_size=self.batch_size)

        return self.train_loader, self.test_loader

    def __repr__(self):
        return f'MidiDataProcessor(data_directory={self.data_directory!r}, maxlen={self.maxlen}, test_size={self.test_size}, random_state={self.random_state}, batch_size={self.batch_size}, train_loader size={len(self.train_loader.dataset)}, test_loader size={len(self.test_loader.dataset)})'


In [3]:
# good_music_midi = process_midi_seq(datadir='.', n=10000, maxlen=100, rd_seed=1000)
# good_music_midi.reshape((-1, good_music_midi.shape[1], 1)).min() # ((-1, features_train.shape[1], 1))

In [4]:
processor = MidiDataProcessor(data_directory='.')
train_loader, test_loader = processor.prepare_data()

In [5]:
# Critic model

class LSTMCritic(nn.Module):
    def __init__(self, vocab_size, hidden_dim=128, num_layers=3, n_classes=2):
        super(LSTMCritic, self).__init__()
        self.vocab_size = vocab_size
        self.hidden_dim = hidden_dim
        self.num_layers = num_layers
        self.device = torch.device("cuda" if torch.cuda.is_available() else ("mps" if torch.backends.mps.is_available() else "cpu"))
        
        self.embedding = nn.Embedding(self.vocab_size, self.hidden_dim).to(self.device)
        self.lstm = nn.LSTM(self.hidden_dim, self.hidden_dim, num_layers=self.num_layers, batch_first=True).to(self.device)
        self.fc = nn.Linear(self.hidden_dim, n_classes).to(self.device)
        

    def forward(self, x):
        # Embedding layer
        x = self.embedding(x).to(self.device)
        # LSTM forward pass
        batch_size = x.size(0)
        # Initialize hidden and cell states with zeros
        h0 = torch.zeros(self.num_layers, batch_size, self.hidden_dim).to(self.device)
        c0 = torch.zeros(self.num_layers, batch_size, self.hidden_dim).to(self.device)
        lstm_out, _ = self.lstm(x, (h0, c0))
        # Decode the hidden state of the last time step
        out = lstm_out[:, -1, :]
        # Linear layer
        out = self.fc(out)
        return out

In [None]:
# Critic model

class LSTMCritic(nn.Module):
    def __init__(self, vocab_size, hidden_dim=128, num_layers=3, n_classes=2):
        super(LSTMCritic, self).__init__()
        self.vocab_size = vocab_size
        self.hidden_dim = hidden_dim
        self.num_layers = num_layers
        self.device = torch.device("cuda" if torch.cuda.is_available() else ("mps" if torch.backends.mps.is_available() else "cpu"))
        
        self.embedding = nn.Embedding(self.vocab_size, self.hidden_dim).to(self.device)
        self.lstm = nn.LSTM(self.hidden_dim, self.hidden_dim, num_layers=self.num_layers, batch_first=True).to(self.device)
        self.fc = nn.Linear(self.hidden_dim, n_classes).to(self.device)
        

    def forward(self, x):
        # Embedding layer
        x = self.embedding(x).to(self.device)
        # LSTM forward pass
        batch_size = x.size(0)
        # Initialize hidden and cell states with zeros
        h0 = torch.zeros(self.num_layers, batch_size, self.hidden_dim).to(self.device)
        c0 = torch.zeros(self.num_layers, batch_size, self.hidden_dim).to(self.device)
        lstm_out, _ = self.lstm(x, (h0, c0))
        # Decode the hidden state of the last time step
        out = lstm_out[:, -1, :]
        # Linear layer
        out = self.fc(out)
        return out

In [57]:
model = LSTMCritic(vocab_size=2, hidden_dim=128, num_layers=3, n_classes=2)
model = model.to(model.device)  # Move the model to the desired device

lr = 0.001
epochs = 100
optimizer = optim.Adam(model.parameters(), lr=lr)
criterion = nn.CrossEntropyLoss()

logging.info('Start training ...')
model.train()
total_loss = 0
for epoch in range(epochs):
    for idx, (feature, label) in enumerate(train_loader):
        feature, label = feature.to(device).long(), label.to(device)
        optimizer.zero_grad()
        outputs = model(feature)
        loss = criterion(outputs, label)
        total_loss += loss.item()
        loss.backward()
        optimizer.step()

        print(f"Epoch {epoch+1}/{epochs}, Loss: {loss.item()}")

In [63]:
class AccumulationMeter(object):
    """Computes and stores the average and current value"""

    def __init__(self):
        self.reset()

    def reset(self):
        self.value = 0.0
        self.avg = 0.0
        self.sum = 0
        self.count = 0.0

    def update(self, value, n=1):
        self.value = value
        self.sum += value * n
        self.count += n
        self.avg = self.sum / self.count
        self.sqrt = self.value ** 0.5
        self.rmse = self.avg ** 0.5

class EarlyStopping:
    def __init__(self, tolerance=5, min_delta=0):

        self.tolerance = tolerance
        self.min_delta = min_delta
        self.counter = 0
        self.early_stop = False

    def __call__(self, train_loss, validation_loss):
        if (validation_loss - train_loss) > self.min_delta:
            self.counter +=1
            if self.counter >= self.tolerance:  
                self.early_stop = True

In [91]:
# Critic class
from torch.utils.data.sampler import SubsetRandomSampler
from sklearn.model_selection import KFold

class Critic(CriticBase):
    def __init__(self, load_trained=False):
        '''
        :param load_trained
            If load_trained is True, load a trained model from a file.
            Should include code to download the file from Google drive if necessary.
            else, construct the model
        '''    
    
        self.load_trained = load_trained
        self.device = torch.device("cuda" if torch.cuda.is_available() else ("mps" if torch.backends.mps.is_available() else "cpu"))
        self.model = LSTMCritic(vocab_size=500, hidden_dim=128, num_layers=3, n_classes=2).to(self.device)
        self.criterion = nn.CrossEntropyLoss()
        if self.load_trained:
            logging.info('load model from file ...')
            gdd.download_file_from_google_drive(file_id='18YkTrsqa0dWCVC4PpE2_7q8nN3jxdzhD',
                                    dest_path='./critic.pth',
                                    unzip=True)
            self.model = torch.load('critic.pth')
            self.model.eval()

    def score(self, x):
        '''
        Compute the score of a music sequence
        :param x: a music sequence
        :return: the score between 0 and 1 that reflects the quality of the music: the closer to 1, the better
        '''
        with torch.set_grad_enabled(False):
            logging.info('Compute score ...')
        
            outputs = self.model(x.to(self.device))
            outputs = torch.argmax(outputs, dim=1)
            outputs ^= 1 # index 0 is good and index 1 is bad 
                    
        return outputs  
    
    def validate(self, val_loader, model):
        """Evaluate the network on the entire validation set."""

        loss_accum = AccumulationMeter()
        model.eval()
        with torch.set_grad_enabled(False):

            for i, (feature, label) in enumerate(val_loader):
                feature, label = feature.to(self.device).long(), label.to(self.device)

                outputs = self.model(feature)

                loss = self.criterion(outputs, label)
                loss_accum.update(loss.item(), label.size(0))
 

        return loss_accum.rmse

    def train(self, x, epochs=10, lr=1e-5):
        '''
        Train the model on one batch of data
        :param x: train data. For critic training, x will be a tuple of two tensors (data, label). expect a batch of dataloader
        :return: (mean) loss of the model on the batch
        '''
            
        optimizer = optim.Adam(self.model.parameters(), lr=lr)
        loss_accum_train = AccumulationMeter()

        # split data for K-fold cross validation to avoid overfitting
        indices = list(range(len(x.dataset)))
        kf = KFold(n_splits=5, shuffle=True)
        cv_index = 0
        index_list_train = []
        index_list_valid = []
        for train_indices, valid_indices in kf.split(indices):
            index_list_train.append(train_indices)
            index_list_valid.append(valid_indices)

            train_sampler = SubsetRandomSampler(train_indices)
            valid_sampler = SubsetRandomSampler(valid_indices)

            train_loader = DataLoader(x.dataset, batch_size=32,
                                                       sampler=train_sampler,
                                                       shuffle=False)
            val_loader = DataLoader(x.dataset, batch_size=32,
                                                     sampler=valid_sampler,
                                                     shuffle=False)

            logging.info('Start training ...')
            self.model.train()
            early_stopping = EarlyStopping(tolerance=5, min_delta=10)
            epoch_train_loss = []
            epoch_validate_loss = []
            for epoch in range(epochs):
                for idx, (feature, label) in enumerate(train_loader):
                    feature, label = feature.to(self.device).long(), label.to(self.device)

                    outputs = self.model(feature)
                    loss = self.criterion(outputs, label)
                    #total_loss += loss.item()
                    loss_accum_train.update(loss.item(), label.size(0))
                    optimizer.zero_grad()
                    loss.backward()
                    optimizer.step()
                # training loss
                epoch_train_loss.append(loss_accum_train.avg)
                
                # validation loss
                val_loss_avg = self.validate(val_loader, self.model)
                epoch_validate_loss.append(val_loss_avg)

                print(f"Epoch {epoch+1}/{epochs}, Train Loss: {loss_accum_train.avg}, Val Loss: {val_loss_avg}")

                # early stopping
                early_stopping(loss_accum_train.avg, val_loss_avg)
                if early_stopping.early_stop:
                    print("We are at epoch:", epoch)
                    logging.info("Finished training ...Model saved")
                    torch.save(self.model, 'critic.pth') 
                    break
        
            cv_index += 1   # increment cv index
        
        return loss_accum_train.avg

In [92]:
rt = Critic(load_trained=False)
rt.train(train_loader, epochs=100)

INFO:Start training ...


Epoch 1/100, Train Loss: 0.6917002614877992, Val Loss: 0.8294968625707562
Epoch 2/100, Train Loss: 0.6691033990720859, Val Loss: 0.6616604525014153
Epoch 3/100, Train Loss: 0.4930773457685144, Val Loss: 0.26094386346398285
Epoch 4/100, Train Loss: 0.3847082929710066, Val Loss: 0.23794809452955532
Epoch 5/100, Train Loss: 0.31714117089455124, Val Loss: 0.22171827681297912
Epoch 6/100, Train Loss: 0.2714204840186782, Val Loss: 0.21149176010041
Epoch 7/100, Train Loss: 0.23789563396844443, Val Loss: 0.20193328822853523
Epoch 8/100, Train Loss: 0.212300402979822, Val Loss: 0.19181441002380759
Epoch 9/100, Train Loss: 0.19182193755770774, Val Loss: 0.18392870302532874
Epoch 10/100, Train Loss: 0.17528449735721488, Val Loss: 0.1728595726870176
Epoch 11/100, Train Loss: 0.16144992917491977, Val Loss: 0.16551293523815055
Epoch 12/100, Train Loss: 0.14978490334799432, Val Loss: 0.17387458425859195
Epoch 13/100, Train Loss: 0.1397782665165306, Val Loss: 0.1522921362799879
Epoch 14/100, Train Los

KeyboardInterrupt: 

In [84]:
len(train_loader.dataset)

12907