# Assignment 2 - Convolutions with MIDI

In this assignment, you're going to play around with the MIDI notebook we've been building in class.

The code should run on mltgpu at the time of submission, but you do not need to use the GPU for this assignment.  (You can if you want to.)

When testing on your own machine, in addition to the full PyTorch stack, you'll need the mido module.  Installing the scamp module is necessary if you want to listen to anything. If using Linux, you will have to install fluidsynth.

You will use the [lakh](https://colinraffel.com/projects/lmd/) MIDI corpus.  A copy will be placed in the scratch directory of mltgpu; information will be provided via Canvas announcement.

This assignment is due on November 1, 2022, at 23:59.  There are **25 points** and **29 bonus points** (!!!) available on this assignment.

In [2]:
import sys
import os
import mido

In [3]:
from mido import MidiFile
import os
import sys
from torch.utils.data import Dataset, DataLoader
import numpy as np

## Part 1 -- improve data handling and representation (4 points)

Here you will take the `MessageSequence` we created in class and make the following improvements:

1. Change the representation so that it can accommodate start and end symbols, as appropriate for your modeling in part 2.

2. Allow for the loading of multiple channels (2 or more, possibly randomly selected), with a reasonable cutoff.  To make things simple, you can make the very wrong assumption that every note is of the same duration and therefore aligned one-by-one, and you can thus ignore duration and offset information.
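
The two requirements above can be illustrated without any MIDI library. What follows is only a minimal sketch, assuming notes are plain integers 0-127 and that ids 128 and 129 stand for the start and end symbols. The names `SOS`, `EOS`, `PAD`, and `align_channels` are hypothetical and not part of the class notebook.

```python
SOS, EOS, PAD = 128, 129, -1  # assumed ids; PAD marks positions past a channel's end

def align_channels(channels, cutoff):
    """Wrap each channel in SOS/EOS, truncate to `cutoff` notes, and pad to equal length."""
    wrapped = [[SOS] + list(notes[:cutoff]) + [EOS] for notes in channels]
    longest = max(len(ch) for ch in wrapped)
    return [ch + [PAD] * (longest - len(ch)) for ch in wrapped]

# Two channels with different numbers of notes, cut off after two notes each.
aligned = align_channels([[60, 62, 64], [48]], cutoff=2)
# Each row is one channel; column t holds the notes treated as simultaneous at step t.
for row in aligned:
    print(row)
```

Because every note is assumed to have the same duration, column `t` of the result is treated as one time step across all channels.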

In [4]:
import torch
import torch.nn.functional as functional

class MIDITrackError(Exception):
    pass

class MessageSequence:
    def __init__(self, mid, number_of_channels=None):
        self.messages = []
        self.max_time = 0
        try:
            # With no cutoff (None) the slice below takes every track after track 0.
            # A cutoff larger than the file falls back to -1, i.e. tracks[1:-1].
            if number_of_channels is not None and number_of_channels > len(mid.tracks):
                number_of_channels = -1
            # TODO: channels should be fixed across songs (e.g. piano, guitar, drums),
            # which means checking each track's meta messages for its instrument.
            for u in mid.tracks[1:number_of_channels]:
                channel = []
                for k in u:
                    if k.type in ['note_on', 'note_off']:
                        channel.append(k)
                self.messages.append(channel)
        except IndexError:
            raise MIDITrackError
        #calculate note durations
        timecounter = 0
        notedict = {}
        real_sequence = []
        for channel in self.messages:
            channel_2 = []
#             channel_2.append('SOS')
            for message in channel:
                timecounter += message.time
                if message.type == "note_on":
                    notedict[message.note] = timecounter

                if message.type == "note_off":
                    duration = timecounter - notedict[message.note]
                    channel_2.append((message.note, notedict[message.note], message.time, duration))
            real_sequence.append(channel_2)
        self.sequence = real_sequence
        
    def midi_reencode(self):
        reencoded = []
        active_notes = {}
        timecounter = 0
        for channel in self.sequence:
            channel_3 = []
#             channel_3.append("SOS")
            for (note, timestamp, offset, duration) in channel: # requires another level of iteration
                note_order = sorted(active_notes.keys(), key=lambda x: active_notes[x][0]) #timestamp is tuple item 0
                for active_note in note_order:
                    if active_notes[active_note][0] < timestamp:
                        channel_3.append(mido.Message("note_off", 
                                                      channel=2, #add
                                                      note=active_note, 
                                                      velocity=95, 
                                                      time=active_notes[active_note][1]))
                        timecounter += active_notes[active_note][1]
                        del active_notes[active_note]
                channel_3.append(mido.Message("note_on", 
                                              channel=2,  #channels
                                              note=note,
                                              velocity=95, 
                                              time=timestamp-timecounter))
                active_notes[note] = (timestamp+duration, offset, duration)
                timecounter = timestamp


            note_order = sorted(active_notes.keys(), key=lambda x: active_notes[x][0]) #timestamp is tuple item 0
            for active_note in note_order:
                channel_3.append(mido.Message("note_off", 
                                              channel=2, #add
                                              note=active_note, 
                                              velocity=95, 
                                              time=active_notes[active_note][1]))
                del active_notes[active_note]
            # The start symbol belongs at the beginning of the channel, the end symbol at the end.
            channel_3.insert(0, "SOS")
            channel_3.append("EOS")
            reencoded.append(channel_3)
        return reencoded
    
    def vector_encode(self):
        # One-hot table: indices 0-127 are MIDI notes, 128 is the start token, 129 the end token.
        note_db = functional.one_hot(torch.arange(0, 130))
        encoded = []

        self.start_vector_token = note_db[128]
        self.end_vector_token = note_db[129]
        for channel in self.sequence:
            # Each channel starts with the start token plus zero offset and zero duration.
            channel_4 = [torch.cat((self.start_vector_token, torch.zeros(1,), torch.zeros(1,)))]
            for (note, _, offset, duration) in channel:
                note_vec = note_db[note].clone().detach()
                # Clip offset and duration to their caps, then scale by those caps.
                offset = min(offset, 100) / 100
                duration = min(duration, 4000) / 4000
                channel_4.append(torch.cat((note_vec, torch.Tensor([offset]), torch.Tensor([duration]))))

            # Shape per channel: (number of notes + 1, 132).
            encoded.append(torch.stack(channel_4))

        return encoded

In [5]:
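def padding(song):
    # Pad every channel of the song to the length of its longest channel.
    data = torch.nn.utils.rnn.pad_sequence(song, batch_first=True)
    # Mark the final time step of each channel with the end token
    # (feature index -3 of 132 is one-hot index 129).
    data[:, -1, -3] = 1
    print(data.shape)
    return data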


In [7]:
store = []
# for filename in os.listdir("""../lt2326-h22-resources/clean_midi/"Weird Al" Yankovic"""):
#     midi_file = MessageSequence(mido.MidiFile("""../lt2326-h22-resources/clean_midi/"Weird Al" Yankovic/"""+ filename), 100)
#     store.append(midi_file)

for filename in os.listdir("""../Wagner"""):
    midi_file = MessageSequence(mido.MidiFile("""../Wagner/"""+ filename), 100)
    store.append(midi_file)
    
vault = []
for song in store:
    vector_raw = song.vector_encode()
    
    t_tensor = padding(vector_raw)
    vault.append(t_tensor)
    


torch.Size([7, 1542, 132])


### Describe your changes and any special motivations for them here (in notebook Markdown):

Changed `MessageSequence` to accept a specific number of channels rather than always loading all of them.

Changed `MessageSequence` to concatenate each note's one-hot vector with its offset and duration, and to stack the resulting vectors per channel.

"Start" and "end" tokens are added to the one-hot vocabulary (indices 128 and 129). The start token is prepended to each channel, with two zero values in place of offset and duration. The end token is set after padding, at the third-from-last feature of the final time step.

Within each song, all channels are padded to the length of the longest channel.

Stored all in a container per Weird Al Yanko

WHAT I WOULD HAVE LIKED TO DO:
- Specify the instrument for each channel, also with future training in mind.
- Pad according to real time rather than wrongly assumed beginnings and ends.
- Retrieve more information from the notes; there is certainly more useful material in there.


## Part 2 - Convolutional Model (8 points)

Replace the model below with a model with the following characteristics:

1. It should include an ensemble of parallel 1D-convolutional layers (2 or more)
2. The layers should combine into a single output representation.
3. The layers should be different (have different kernels, windows, or strides).
4. The layers should be able to handle multiple channels. 
5. The input will be the song representation up to time step n, and the output will be a representation of notes for a single time step across the channels at n+1. (This means that an instance will be a prediction of the next note, and a song will have to be run n times to predict n notes.)
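
One possible way to meet the five requirements above is sketched below. It is an illustration under assumptions, not the model used in this notebook: the class name `ParallelConvSketch`, the hidden size, and the use of adaptive max pooling to handle prefixes of any length are all choices made for the example, and only the note output is modeled (offset and duration heads are left out).

```python
import torch
import torch.nn as nn

class ParallelConvSketch(nn.Module):
    """Hypothetical sketch: parallel Conv1d branches pooled to a fixed size, then merged."""
    def __init__(self, midi_channels=2, features=132, notes=130, hidden=32, kernel_sizes=(3, 5)):
        super().__init__()
        self.midi_channels, self.notes = midi_channels, notes
        in_ch = midi_channels * features  # MIDI channels are stacked along the conv channel axis
        self.branches = nn.ModuleList(
            [nn.Conv1d(in_ch, hidden, kernel_size=k, padding=k - 1) for k in kernel_sizes]
        )
        self.pool = nn.AdaptiveMaxPool1d(1)  # collapses any prefix length to one value per filter
        self.out = nn.Linear(hidden * len(kernel_sizes), midi_channels * notes)

    def forward(self, x):
        # x: (batch, midi_channels, time, features)
        b, c, t, f = x.shape
        x = x.permute(0, 1, 3, 2).reshape(b, c * f, t)  # -> (batch, c * f, time)
        pooled = [self.pool(torch.relu(conv(x))).squeeze(-1) for conv in self.branches]
        merged = torch.cat(pooled, dim=1)  # single combined representation
        logits = self.out(merged).view(b, self.midi_channels, self.notes)
        return torch.log_softmax(logits, dim=-1)  # one note distribution per MIDI channel

model = ParallelConvSketch()
prefix = torch.zeros(4, 2, 10, 132)  # batch of 4 prefixes, 2 MIDI channels, 10 time steps
log_probs = model(prefix)
print(log_probs.shape)
```

Pooling each branch down to a fixed size means the linear layer never depends on the prefix length, which avoids having to compute a combined length by hand.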

Training the model will take longer than the n-gram model, especially if you're not using the GPU.

You have a free hand in all other aspects of the model, as long as you explain any significant design decisions (i.e., not every minor choice, but ones with real design impact).

(A bit of advice: the biggest problem here will be keeping the matrix/tensor dimensions straight...)
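
To help with the dimension bookkeeping, the output length of a 1D convolution can be computed ahead of time. The helper below is a small sketch that uses the output-length formula from the PyTorch `Conv1d` documentation; the function name `conv1d_out_len` and the example sequence length are assumptions made for illustration.

```python
def conv1d_out_len(length, kernel_size, stride=1, padding=0, dilation=1):
    """Output length of a 1D convolution over an input of the given length."""
    return (length + 2 * padding - dilation * (kernel_size - 1) - 1) // stride + 1

seq_len = 1542  # example sequence length
# Four branches whose stride equals their kernel size (non-overlapping windows).
branch_lengths = [conv1d_out_len(seq_len, k, stride=k) for k in (3, 5, 7, 10)]
# Concatenating the branches along the length axis gives the summed length.
print(branch_lengths, sum(branch_lengths))
```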

In [6]:
import torch.nn as nn

In [7]:
import torch

In [8]:
import torch.optim as optim

In [9]:
class MIDIModel(nn.Module):
    def __init__(self):
        super().__init__()
        # `combined_length` is the total length of the concatenated convolution
        # outputs; it must be defined before the model is instantiated.

        self.conv1 = nn.Conv1d(5, 5, kernel_size=3, stride=3)
        self.conv2 = nn.Conv1d(5, 5, kernel_size=5, stride=5)
        self.conv3 = nn.Conv1d(10, 10, kernel_size=7, stride=7)
        self.conv4 = nn.Conv1d(10, 10, kernel_size=10, stride=10)

        self.dropout = nn.Dropout(0.2)

        self.fc1 = nn.Linear(combined_length, combined_length//2)
        self.fc2 = nn.Linear(combined_length//2, combined_length//4)
        self.fc3 = nn.Linear(combined_length//4, combined_length//8)

        self.Sigmoid_1 = nn.Sigmoid()
        self.Sigmoid_2 = nn.Sigmoid()
        self.Sigmoid_3 = nn.Sigmoid()

        # Normalize over the note dimension (the last one), not over dim 0.
        self.logsoftmax = nn.LogSoftmax(dim=-1)
        self.Sigmoid_offset = nn.Sigmoid()
        self.Sigmoid_duration = nn.Sigmoid()

        self.note_fc = nn.Linear(combined_length//8, 130)
        self.offset_fc = nn.Linear(combined_length//8, 1)
        self.duration_fc = nn.Linear(combined_length//8, 1)

    def forward(self, data):
        note_final = []
        dur_final = []
        off_final = []

        for channel in data:
            # Four parallel convolutions over the same channel input.
            conv1 = self.dropout(self.conv1(channel))
            conv2 = self.dropout(self.conv2(channel))
            conv3 = self.dropout(self.conv3(channel))
            conv4 = self.dropout(self.conv4(channel))
            parallel_ensemble = torch.cat((conv1, conv2, conv3, conv4), dim=2)

            parallel_ensemble = self.fc1(parallel_ensemble)
            parallel_ensemble = self.Sigmoid_1(parallel_ensemble)
            parallel_ensemble = self.fc2(parallel_ensemble)
            parallel_ensemble = self.Sigmoid_2(parallel_ensemble)
            parallel_ensemble = self.fc3(parallel_ensemble)
            parallel_ensemble = self.Sigmoid_3(parallel_ensemble)

            # Split into note, duration, and offset parts.
            note_out = parallel_ensemble[:, :-2]
            dur_out = parallel_ensemble[:, -1]
            off_out = parallel_ensemble[:, -2]

            # Each part goes through its own linear (fully connected) layer.
            note_out = self.note_fc(note_out)
            dur_out = self.duration_fc(dur_out)
            off_out = self.offset_fc(off_out)

            # Notes get a log-softmax; duration and offset get a sigmoid.
            note_out = self.logsoftmax(note_out)
            dur_out = self.Sigmoid_duration(dur_out)
            off_out = self.Sigmoid_offset(off_out)

            # Collect the per-channel outputs; they are stacked after the loop.
            note_final.append(note_out)
            dur_final.append(dur_out)
            off_final.append(off_out)

        note_final = torch.stack(note_final)
        dur_final = torch.stack(dur_final)
        off_final = torch.stack(off_final)

        # Returned in the order the training loop unpacks them: note, offset, duration.
        return note_final, off_final, dur_final
            
            
        
        
        

### Explain your design choices below.

Each song is padded to the length of its longest channel. Each note in a channel is concatenated with its own offset and duration.

The CNN model is initialized with four convolutional layers, three fully connected layers, and a sigmoid activation after each fully connected layer.

The CNN model handles channels by iterating over them individually. For each channel it runs four parallel convolutional layers, as the assignment requires, and concatenates the resulting feature maps. The result is then reduced to a note-sized (130) output and passed through an activation function.



## Part 3 - Dataset sampling (4 points)

Consider how the model is designed above and design a dataset generator capable of producing sample prefixes and next-characters for each time step for each song.  You can replace all the code from the original MIDI notebook with whatever you want.  Consider that there are more and less efficient ways of doing this, and that it may also be worth seeing if it's easier to do this in iterator mode where you can select random prefixes from random songs at each iteration.  You can even choose not to use the torch Dataset class at all, though it means you will have to rewrite the training loop not to use it.
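
The "iterator mode" mentioned above could look roughly like the following. This is only a sketch under assumptions: songs are represented as lists of time steps, each time step is a tuple with one note per channel, and the name `iter_prefix_samples` is hypothetical.

```python
import random

def iter_prefix_samples(songs, n_samples, min_prefix=1, seed=0):
    """Yield (prefix, next_step) pairs drawn from random songs at random cut points.

    Each song is a list of time steps; each time step is a tuple with one note per channel.
    """
    rng = random.Random(seed)
    # Only songs long enough to give a prefix plus one following step are usable.
    usable = [s for s in songs if len(s) > min_prefix]
    for _ in range(n_samples):
        song = rng.choice(usable)
        n = rng.randrange(min_prefix, len(song))  # prefix covers steps 0..n-1
        yield song[:n], song[n]

songs = [[(60, 48), (62, 50), (64, 52)], [(55, 43), (57, 45)]]
samples = list(iter_prefix_samples(songs, n_samples=5))
for prefix, target in samples:
    print(len(prefix), target)
```

Sampling prefixes lazily avoids materializing every prefix of every song in memory, at the cost of not guaranteeing that each time step is visited exactly once per epoch.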

In [10]:
def generate_samples_per_song(song):
    vectors = song.vector_encode()
    samples = []
    for i in range(2, len(vectors)):
        samples.append((torch.cat((vectors[i-2], vectors[i-1])), (torch.LongTensor([song.sequence[i][0]]), vectors[i][-2], vectors[i][-1])))
        
    return samples

def generate_samples(songlist):
    samples = []
    for song in songlist:
        samples += generate_samples_per_song(song)
        
    return samples

In [11]:
class MIDINotesDataset(Dataset):
    def __init__(self, mididir, maximum=500):
        items = os.walk(mididir)
        
        self.filenames = []
        for (directory, _, files) in items:
            self.filenames += [os.path.join(directory, x) for x in files]
        
        mss = []
        count = 0
        for x in self.filenames:
            try:
                midifile = MidiFile(x)
                ms = MessageSequence(midifile)
            except:
                continue
                
            mss.append(ms)
            
            count += 1
            if count == maximum:
                break
            
        self.notes = generate_samples(mss)
        
    def __getitem__(self, i):
        return self.notes[i]
    
    def __len__(self):
        return len(self.notes)

In [12]:
class MIDISongsDataset(Dataset):
    def __init__(self, mididir, maximum=500):
        items = os.walk(mididir)
        
        self.filenames = []
        for (directory, _, files) in items:
            self.filenames += [os.path.join(directory, x) for x in files]
        
        mss = []
        count = 0
        for x in self.filenames:
            try:
                midifile = MidiFile(x)
                ms = MessageSequence(midifile)
            except:
                continue
                
            mss.append(ms)
            
            count += 1
            if count == maximum:
                break
                
        self.songs = [[x[0] for x in y.sequence] for y in mss]
        
    def __getitem__(self, i):
        return self.songs[i]
        
    def __len__(self):
        return len(self.songs)

### Describe any significant choices you made in designing the mode of access to the dataset.

## Part 4 - Training loop (2 points)

Adapt the training loop to the way you organized access to the dataset and to the model you wrote.  Make any other improvements, such as trying out a different optimizer.  Make sure it is possible to vary the batch size as well as the epochs.

In [13]:
def train(data, epochs=10, batch_size=25):
    mm = MIDIModel()
    optimizer = optim.SGD(mm.parameters(), lr=0.001, momentum=0.9)
    note_criterion = nn.NLLLoss()
    loader = DataLoader(data, batch_size=batch_size, shuffle=True, drop_last=True)
    for epoch in range(epochs):
        losses = []
        for i, o in loader:
            optimizer.zero_grad()
            (note_output, offset_output, duration_output) = mm(i)
            # Minimize the NLL directly; exp(-NLL) would reward a low probability for the true note.
            note_loss = note_criterion(note_output, o[0].reshape((i.shape[0])))
            offset_loss = torch.abs(o[1] - offset_output)
            duration_loss = torch.abs(o[2] - duration_output)
            # Reduce to a scalar so that backward() can be called on it.
            loss = (note_loss + offset_loss + duration_loss).sum()
            losses.append(loss.detach())
            loss.backward()
            optimizer.step()
        print("mean loss in epoch {} is {}".format(epoch, float(torch.mean(torch.stack(losses)))))
    return mm

### If there are any remarks you have on the training loop, put them here:

## Part 5 - Evaluation (7 points)

Measuring the accuracy of note prediction over a set of songs is unlikely to work well.  So instead we will calculate the perplexity of your model under different training assumptions (for example, epochs, dropout probability -- if you used dropout -- and/or hidden layer size).  Divide your dataset into training and validation sets and use the validation set for the perplexity calculation.  (Note that you are predicting notes across multiple channels, so you will have to combine perplexities across the channels.)
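
As a reference point, perplexity is the exponential of the average negative log-probability that the model assigns to the true notes. The sketch below pools the log-probabilities of all channels before averaging, which is one possible way to combine channels and an assumption of this example; the function name `perplexity` and its input layout are hypothetical.

```python
import math

def perplexity(log_probs_per_channel):
    """Perplexity over all channels, given natural-log probabilities of the true notes.

    `log_probs_per_channel` is a list (one entry per channel) of lists of log-probabilities.
    """
    flat = [lp for channel in log_probs_per_channel for lp in channel]
    return math.exp(-sum(flat) / len(flat))

# Two channels, two validation steps each; the true note got p=0.5 or p=0.25.
example = [[math.log(0.5), math.log(0.5)], [math.log(0.25), math.log(0.25)]]
print(perplexity(example))
```

A model that spreads its probability uniformly over all 130 symbols would score a perplexity of 130 under this definition, which gives a simple baseline to compare against.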

In [None]:
# code here, you can add more notebook cells of course.

### Your remarks on your evaluation here:

## Bonus Part 1 -- "Music" (3 points)

You will have to properly install [scamp](http://scamp.marcevanstein.com/) to do this bonus. You can rewrite the mode of song generation here to take into account your convolutional process.  Then use scamp to play the (multi-channel/simultaneous-note) music back.  Try to see if you get any quality improvement at all by using better parameters. (It will probably sound awful no matter what.)  If you want to train on mltgpu and play music on your own computer, you'll have to also write a way to save and load the model.

In [None]:
from numpy.random import choice

# This is just to get the first two notes out of the development song.
vecs = x.vector_encode()

def generate_music(model, note1, note2, length=30, diversity=5):
    note_db = functional.one_hot(torch.arange(0, 128))
    newsong = [note1, note2]
    model.eval()
    with torch.no_grad():
        for i in range(length):
            notepair = torch.cat((note1, note2))
            fake_batch = torch.stack([notepair] + [torch.randn(260) for _ in range(24)])
            (note_output, offset_output, duration_output) = model(fake_batch)
            note_output = note_output[0]
            offset_output = offset_output[0]
            duration_output = duration_output[0]
            print("note_output: {}".format(note_output))
            notesort = torch.argsort(note_output, descending=True)
            print("notesort: {}".format(notesort))
            noteset = notesort[:diversity]
            print("noteset: {}".format(noteset))
            notenum = int(choice(noteset.numpy()))
            print("notenum: {}".format(notenum))
            note1 = note2
            print("testgen {} {} {}".format(note_db[notenum].clone().detach(), offset_output, duration_output))
            note2 = torch.cat((note_db[notenum].clone().detach(), torch.Tensor([offset_output]), 
                                                                               torch.Tensor([duration_output])))
            newsong.append(note2)
    return newsong

In [None]:
def reconvert_song(notetensors):
    return [(int(torch.argmax(x[0:128])), int(torch.floor(x[128] * 100)), int(torch.floor(x[129] * 4000))) for x in notetensors]

In [None]:
def get_sequence_back(model_output, starting_time):
    sequence = []
    for (note, offset, duration) in model_output:
        sequence.append((note, starting_time, offset, duration))
        starting_time += duration - offset
        
    return sequence

In [None]:
from scamp import *
import time

In [None]:
sess = Session().run_as_server()

In [None]:
clarinet = sess.new_part("clarinet")

In [None]:
for n in converted_song:
    clarinet.play_note(n[0], 0.8, n[2]/1000)
    time.sleep(n[2]/1000 + 0.01)

### Your remarks on the quality of the music.

## Bonus Part 2 - 2D-convolutions (6 points)

Define a model as in part 2 that restructures your representation as an ensemble of 2D convolutional models (using the additional dimension to handle multiple MIDI channels).  This will probably require that you rebuild other parts of the pipeline to accommodate it.

Do an evaluation of the output in terms of perplexity (and, optionally, musical quality).

### Your code here (in as many cells as you need):

### Your remarks:

## Bonus Part 3 - Durations (20 points)

Starting from the song representation, find a way to properly handle durations across multiple channels so that your code is not reliant on an incorrect alignment of the sequence of notes.  Evaluate as in Bonus Part 2.
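
One common way to align channels by time rather than by note index is to quantize every channel onto a shared time grid. The sketch below is an illustration under assumptions, not a prescribed solution: events are simplified to `(note, start, duration)` tuples in integer ticks, overlapping notes within a channel overwrite each other, and the name `to_time_grid` is hypothetical.

```python
def to_time_grid(channels, step, rest=-1):
    """Quantize (note, start, duration) events of each channel onto a shared time grid.

    Returns one row per channel; cell t holds a note sounding during
    [t * step, (t + 1) * step), or `rest` if the channel is silent there.
    """
    end = max((start + dur for ch in channels for _, start, dur in ch), default=0)
    n_steps = -(-end // step)  # ceiling division
    grid = [[rest] * n_steps for _ in channels]
    for row, ch in zip(grid, channels):
        for note, start, dur in ch:
            for t in range(start // step, -(-(start + dur) // step)):
                row[t] = note  # later notes overwrite earlier ones (monophonic simplification)
    return grid

channels = [[(60, 0, 200), (62, 200, 100)], [(48, 100, 200)]]
grid = to_time_grid(channels, step=100)
for row in grid:
    print(row)
```

With this layout, column `t` really does describe what every channel is doing at the same moment, so the per-step prediction no longer relies on all notes having equal duration.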

### Your code here:

### Your remarks:

## Submission

Submit a filled-out version of this notebook via Canvas.