___
##### $Name:\,\color{blue}{Christopher\,J.\,Watson\,,Nathan\,Edwards\,,Paul\,Thai}$
##### $School:\,\color{blue}{Marcos\,School\,of\,Engineering,\,University\,of\,San\,Diego}$
##### $Class:\,\color{blue}{AAI\,511-\,Neural\,Networks\,and\,Learning}$
##### $Assignment:\,\color{blue}{MSAAI\,Final\,Project}$
##### $Date:\,\color{blue}{8/15/2023}$
___

### Libraries
____

In [1]:
# Libraries 

# File and Data Handling
import os
import time

# Data Visualization and Numerical Computing
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd

# MIDI Processing
import mido

# Machine Learning and Deep Learning
from torch.utils.data import Dataset, DataLoader
import torch
from torch import nn, optim

# Machine learning Metrics and Evaluations
from sklearn.metrics import accuracy_score, precision_score, f1_score, confusion_matrix

from sklearn.metrics import classification_report

from sklearn.metrics import f1_score
print('GPU Available: ', torch.cuda.is_available())
if torch.cuda.is_available():
    print('GPU Name: ', torch.cuda.get_device_name(torch.cuda.current_device()))
    device = torch.device("cuda:" + str(torch.cuda.current_device())) 

  from .autonotebook import tqdm as notebook_tqdm


GPU Available:  True
GPU Name:  NVIDIA GeForce RTX 4080


### Classes
___

In [2]:
class Dataset(Dataset):
    
    def __init__(self, sequences, labels):
        self.sequences = sequences
        self.labels = labels

    def enum(self,y):
        composers = ["bach", "bartok", "byrd", "chopin", "handel", "hummel", "mendelssohn","mozart", "schumann"]
        #print(y)
        for i in range(len(composers)):
            if composers[i] == y:
                slot = i
        out = [0] * 9
        out[slot] = 1
        
        return out

    def __len__(self):
        return len(self.sequences)

    def __getitem__(self, idx):
        sequence = self.sequences[idx]
        label = self.labels[idx]
        label = self.enum(label)
           
        
        sequence = torch.tensor(sequence)
        label = torch.tensor(label)
        if torch.cuda.is_available():
            label = label.to(device)
            #print('GPU Tensor Enabled(Labels):', label.is_cuda)
            sequence = sequence.to(device)
            #print('GPU Tensor Enabled(Sequences):', sequence.is_cuda)
        sequence = sequence[None, :, :] 
        return sequence, label

### Helper Functions
___

In [3]:
def get_children(a_dir):
    dirs = []
    files = []
    for name in os.listdir(a_dir):
        if os.path.isdir(os.path.join(a_dir, name)):
            dirs.append(name)
        else:
            files.append(name)
    return [dirs,files]

def create_files_table(top_level, out_file):
    temp_comps = []
    temp_songs = []
    temp_paths = []
    
    composer_names, songs = get_children(top_level)

    for composer in composer_names:
        temp_path = top_level + '/' + composer
        temp, songs = get_children(temp_path)
        for song in songs:
            if song != '.DS_Store':
                temp_comps.append(composer)
                temp_paths.append(temp_path + '/' + song)
                temp_songs.append(song.split(".")[0])

    temp_dict = {'Composers': temp_comps, 'Songs': temp_songs, 'Paths': temp_paths}

    table = pd.DataFrame.from_dict(temp_dict)

    table.to_csv('./' + out_file + '.csv',index=False)
    
    return table


# Function to extract the notes played in a MIDI file with timestamps
def extract_notes_with_meta(midi_filepath):
    notes = {}
    midi = mido.MidiFile(midi_filepath)
    max_time = 0
    time_counter = 0
    for track in midi.tracks:
        time_counter = 0
        for msg in track:
            time_counter += msg.time
            max_time = max(max_time,time_counter)
            if msg.type == 'note_on':
                if msg.velocity != 0:  # Ensure it's a Note On event
                    notes[msg.note] = notes.get(msg.note, []) + [(msg.velocity, time_counter, 1)]  # 1 represents Note On
            elif msg.type == 'note_off':
                notes[msg.note] = notes.get(msg.note, []) + [(msg.velocity,time_counter, 0)]  # 0 represents Note Off
                
    return notes, max_time


def create_single_sequences(notes, start, tick_count, seq_count): 
    VEL = 0
    TM = 1
    ON = 2
    
    temp_keys = notes.keys()

    seq =  [[0] * 128]* seq_count
    seq = np.array(seq)

    for x in temp_keys:
        temp_note = np.array(notes[x])
        time_store = 0
        for i in range(start, seq_count+start):
            temp_vel = 0
            for t in range(time_store,temp_note[:,TM].size):
                if (temp_note[t,TM]>tick_count*i):
                    break
                else:
                    if temp_note[t,ON] == 1:
                        if temp_note[t,VEL] > temp_vel:
                            temp_vel = temp_note[t,VEL]
                time_store = t
            seq[i-start,x] = temp_vel
    return seq

def sequence_songs(df_songs, tick_count, seq_count, jiggle_on=False):
    labels = []
    sequences = []
    
    
    shake_amount = [0]
    #num_jiggle = 3
    
    if jiggle_on:
       #for i in range(1,num_jiggle):
        #shake_amount.append(int(seq_count/num_jiggle)*i)
        #shake_amount = [0, seq_count/4, seq_count/2]
        shake_amount = [0, int(seq_count/2)]
    
    for j in shake_amount:
        for song in df_songs.iterrows():
            temp_count = tick_count
            song = song[1]
            notes, max_time = extract_notes_with_meta(song['Paths'])
            if max_time / tick_count < seq_count:
                temp_count = int((max_time / seq_count)*.8)
            for i in range(int((max_time-j)/(seq_count*temp_count))):
                sequences.append(create_single_sequences(notes, i*seq_count+j, temp_count, seq_count))
                labels.append(song['Composers'])
                
    return labels, sequences

### Preprocessing
___

In [4]:
devpath = './Composer_Dataset/NN_midi_files_extended/dev/'
testpath = './Composer_Dataset/NN_midi_files_extended/test/'
trainpath = './Composer_Dataset/NN_midi_files_extended/train/'

dev_table = create_files_table(devpath, 'dev_table')
test_table = create_files_table(testpath, 'test_table')
train_table = create_files_table(trainpath, 'train_table')

In [5]:
midi_file_path = './Composer_Dataset/NN_midi_files_extended/dev/bach/bach344.mid'

In [6]:
#1labels, sequences = sequence_songs(train_table, 200, 128,jiggle_on=True)
# A tick rate of 9999 will end up turning on dynamic sampling based on the length of the song
labels, sequences = sequence_songs(train_table, 300, 128,jiggle_on=true)
#2labels, sequences = sequence_songs(train_table,  128, 128,jiggle_on=True)

In [7]:
#1labels_test, sequences_test = sequence_songs(test_table, 200, 128,jiggle_on=True)
# A tick rate of 9999 will end up turning on dynamic sampling based on the length of the song
labels_test, sequences_test = sequence_songs(test_table, 300, 128,jiggle_on=true)
#2labels_test, sequences_test = sequence_songs(test_table, 128, 128,jiggle_on=True)

In [8]:
myset = set(labels_test)
myset

{'bach',
 'bartok',
 'byrd',
 'chopin',
 'handel',
 'hummel',
 'mendelssohn',
 'mozart',
 'schumann'}

In [9]:
#extract_notes_with_meta('./Composer_Dataset/NN_midi_files_extended/test/byrd/byrd150.mid')

In [10]:
#dsadasda
train_table.Composers.unique()

array(['bach', 'bartok', 'byrd', 'chopin', 'handel', 'hummel',
       'mendelssohn', 'mozart', 'schumann'], dtype=object)

In [11]:
# Creating the dataset for the dataloader
dataset = Dataset(sequences, labels)
dataset_test = Dataset(sequences_test, labels_test)

# Creating data loader
batch_size = 1

data_loader = DataLoader(dataset, batch_size=batch_size, shuffle=True)
test_loader = DataLoader(dataset_test, batch_size=batch_size, shuffle=False)

In [12]:
dataset[0][0].shape

torch.Size([1, 128, 128])

In [13]:
len(test_loader)

350

### Model Creation
___

In [14]:
# Model
class CNN(nn.Module):
    def __init__(self):
        super(CNN, self).__init__()
        self.conv1 = nn.Conv2d(1, 4, kernel_size=2, padding=1)
        self.conv2 = nn.Conv2d(4, 8, kernel_size=2, padding=1)
        self.conv3 = nn.Conv2d(8, 16, kernel_size=2, padding=1)
        self.conv4 = nn.Conv2d(16, 32, kernel_size=2, padding=1)
    
        self.soft = nn.Softmax(dim=1)

        self.pool = nn.MaxPool2d(kernel_size=2)

        self.LSTM = nn.LSTM(8, 300, 7, batch_first=True)

        self.fc1 = nn.Linear(2400, 1200)
        self.fc2 = nn.Linear(1200, 600)
        self.fc3 = nn.Linear(600, 100)
        self.fc4 = nn.Linear(100, 9)  

    def forward(self, x):
        x = self.pool(nn.functional.relu(self.conv1(x)))
        x = self.pool(nn.functional.relu(self.conv2(x)))
        x = self.pool(nn.functional.relu(self.conv3(x)))
        x = self.pool(nn.functional.relu(self.conv4(x)))

        x =  x[:,0,:,:]
        h0 = torch.zeros(7, x.size(0), 300)
        c0 = torch.zeros(7, x.size(0), 300)     
        if torch.cuda.is_available():
            h0 = h0.to(device)
            #print('GPU Tensor Enabled(labels):', labels.is_cuda)
            c0 = c0.to(device)
            #print('GPU Tensor Enabled(inputs):', inputs.is_cuda)
        x, _ = self.LSTM(x, (h0, c0)) 

        x = torch.flatten(x, 1)

        x = nn.functional.relu(self.fc1(x))
        x = nn.functional.relu(self.fc2(x))
        x = nn.functional.relu(self.fc3(x))
        x = self.fc4(x)
        # return nn.functional.sigmoid(x)
        return self.soft(x)

In [15]:
# Create model object
model = CNN() 
if torch.cuda.is_available():
    model.cuda()
    
# Define the loss function and optimizer
criterion = nn.MSELoss()
optimizer = optim.Adagrad(model.parameters(),lr=0.001)

In [16]:
model.load_state_dict(torch.load('model-4_something-Best717.pt'))

<All keys matched successfully>

### Training
___

In [17]:
def Test(dataloader, best_model):
    # Evaluating the model
    model.eval()

    real = []
    pred = []
    counter = 0
    # Disable gradient computation to save memory
    with torch.no_grad():
        for inputs, labels in dataloader:

            # Forward pass
            outputs = model(inputs.float())
            _, predicted = torch.max(outputs.data, 1)

            outputs = outputs.tolist()
            labels = labels.tolist()

            pred_labal = outputs[0].index(max(outputs[0]))

            real_label = labels[0].index(max(labels[0]))

            real.append(real_label)
            pred.append(pred_labal)
            
    
    score = f1_score(pred, real, average = "weighted")
    
    if score > best_model:
        print(classification_report(real, pred))
        best_model = score
        torch.save(model.state_dict(), 'model_something.pt')
    
    return score, best_model

In [18]:
best_model = 0.7175077123970345

In [19]:
# Training loop
num_epochs = 180
prev = time.time()
cur_model = 0
for epoch in range(num_epochs):
    model.train() 
    running_loss = 0.0
    for inputs, labels in data_loader:
        # Zero the parameter gradients
        optimizer.zero_grad()

        inputs = inputs.to(torch.float)
        labels = labels.to(torch.float)
        if torch.cuda.is_available():
            labels = labels.to(device)
            inputs = inputs.to(device)

        # Forward pass
        outputs = model(inputs)
        loss = criterion(outputs, labels)

        # Backward pass and optimization
        loss.backward()
        optimizer.step()

        running_loss += loss.item()

    # Print the average loss for the epoch
    epoch_loss = running_loss / len(data_loader)
    cur_model, best_model = Test(test_loader, best_model)
    now = time.time()
    print(f"Epoch [{epoch+1}/{num_epochs}] Loss: {epoch_loss:.4f} Time-Taken: {now-prev}s F1-Score: {cur_model} Best-Score: {best_model}")
    prev = now

print("Training complete!")

Epoch [1/180] Loss: 0.0707 Time-Taken: 24.286863327026367s F1-Score: 0.47531603724863325 Best-Score: 0.7175077123970345
Epoch [2/180] Loss: 0.0620 Time-Taken: 27.0036883354187s F1-Score: 0.5426699926996553 Best-Score: 0.7175077123970345
Epoch [3/180] Loss: 0.0593 Time-Taken: 25.322386503219604s F1-Score: 0.5606323098895507 Best-Score: 0.7175077123970345
Epoch [4/180] Loss: 0.0574 Time-Taken: 24.935598134994507s F1-Score: 0.5740809681320934 Best-Score: 0.7175077123970345
Epoch [5/180] Loss: 0.0558 Time-Taken: 26.416887760162354s F1-Score: 0.5855875658150573 Best-Score: 0.7175077123970345
Epoch [6/180] Loss: 0.0548 Time-Taken: 22.57597804069519s F1-Score: 0.5647704511199881 Best-Score: 0.7175077123970345
Epoch [7/180] Loss: 0.0537 Time-Taken: 27.9418625831604s F1-Score: 0.5823888806557918 Best-Score: 0.7175077123970345
Epoch [8/180] Loss: 0.0527 Time-Taken: 27.974105834960938s F1-Score: 0.5657158223241805 Best-Score: 0.7175077123970345
Epoch [9/180] Loss: 0.0521 Time-Taken: 28.3823361396

KeyboardInterrupt: 

### Evaulation and Results
___

In [None]:
# Evaluating the model
model.eval()

prediction_list = list()
labels_list = list()

real = []
pred = []
counter = 0
# Disable gradient computation to save memory
with torch.no_grad():
    for inputs, labels in test_loader:
        
        # Forward pass
        outputs = model(inputs.float())
        _, predicted = torch.max(outputs.data, 1)
        
        #print(outputs)
        
        outputs = outputs.tolist()
        labels = labels.tolist()
        
        pred_labal = outputs[0].index(max(outputs[0]))
        

        #print(pred_labal)
        
        #print(labels)
        
        real_label = labels[0].index(max(labels[0]))
        
        real.append(real_label)
        pred.append(pred_labal)


        
print(classification_report(real, pred))

In [None]:
temp = [0]
length = 256

for i in range(1,8):
    temp.append(int(length/8)*i)
    
print(temp)


pred = []

for inputs, labels in data_loader:
    inputs = inputs.to(torch.float)
    labels = labels.to(torch.float)
    

    pred.append(model(inputs).tolist())