## EC523 HW#5: Music Generation with LSTMs

In this homework, you will be applying autoregressive models to the problem of generating symbolic music.

In [None]:
!sudo apt install -y fluidsynth

In [None]:
!pip install --upgrade pyfluidsynth

In [None]:
!pip install pretty_midi

In [None]:
### IMPORTS ###
import os
import librosa
import pretty_midi
import matplotlib.pyplot as plt
from tqdm import tqdm
import math
import statistics
from IPython.display import Audio

import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
import scipy

### Part 1: Loading & Understanding the Data (12 points, 2 per part)
The dataset we will be using is a subset of the [Lakh MIDI Dataset](https://colinraffel.com/projects/lmd/). MIDI (Musical Instrument Digital Interface) is a widely used communication protocol and file format that allows communication between electronic musical instruments and computers. It essentially stores the same information that is communicated in sheet music (what notes play when and how loudly). As opposed to a wav file, which stores the actual waveform of a musical performance, MIDI files carry much less information and thus are significantly smaller.

The dataset you will be using for this homework has been filtered and preprocessed in the following ways:
- Standardized to the same tempo (120bpm) and key (C/Am)
- Each MIDI file contains exactly one (non-drum) track
- Dataset only contains monophonic tracks, i.e. there is a maximum of one note playing at any given time.
- Removed excessively sparse tracks (tracks with a note playing <30% of the time)

We can use the [pretty_midi](https://craffel.github.io/pretty-midi/) library to manipulate MIDI files in Python. In this section of the homework, you will familiarize yourself with the training data and ways to view, listen to, and manipulate it.

**Problem 1:** The dataset is uploaded to this scc location: `/projectnb/ec523kb/materials/Copy of ec523_MIDI_DATA.zip`. Unzip it first. Load a pretty_midi object for the file 'ABBA_Mamma_Mia_t7.mid' and answer the following questions about it:

1.1 How long is the MIDI in seconds?

1.2 What is the name of the instrument that the track is set to?

1.3 How many total notes are played?

1.4 What are the names of the pitches of the first 10 notes in the MIDI?

In [None]:
#### YOUR CODE HERE ####
# Consult the pretty_midi documentation page for help with answering any of these questions :)
data_directory = '/path/to/midi/data/'
pm = pretty_midi.PrettyMIDI(data_directory + 'ABBA_Mamma_Mia_t7.mid')

Now that we understand the PrettyMIDI class a little bit better, it would also be helpful for us to be able to visualize and listen to the MIDI data.

For visualization, we first need to convert our MIDI object into a **piano roll**. A piano roll is a 2-D representation of MIDI where the X axis represents time and the Y axis represents pitch. When we represent MIDI this way, we can easily graph it. Luckily, the pretty_midi library has a built-in function for converting MIDI objects into piano rolls. The code below uses librosa's specshow library to display a piano roll, which is nice because you can automatically have the y-axis display note names.

In order to create a piano roll for a given MIDI, we need to choose a time subdivision to use for the x-axis. Since the MIDIs are standardized to 120bpm, we know that the length of 1 "beat" (typically a quarter note) is 1/2 a second. We will choose a time subdivision of 1/16th of a second, which will allow us to represent 32nd notes.

1.5 Try using the function below to plot the first 15 seconds of the Mamma Mia MIDI object from above.

In [None]:
def plot_piano_roll(pm, length, fs=16):
    plt.figure(figsize=(12, 6))
    # Use librosa's specshow function for displaying the piano roll
    pianoroll = pm.get_piano_roll(fs) #=fs)
    pianoroll[pianoroll > 0] = 1 # sets all the velocities to be the same
    pianoroll = pianoroll[:, :fs*length]
    nonzero_row_indices = np.nonzero(np.count_nonzero(pianoroll, axis=1))
    start_pitch = max(np.min(nonzero_row_indices) - 3, 0)
    end_pitch = min(np.max(nonzero_row_indices) + 3, 127)
    librosa.display.specshow(pianoroll[start_pitch:end_pitch],
                             hop_length=1, sr=fs, x_axis='time', y_axis='cqt_note',
                             fmin=pretty_midi.note_number_to_hz(start_pitch))


### YOUR CODE HERE

It would also be nice to be listen to our MIDIs! We can do that using pretty_midi's fluidsynth method.

1.6 Use the code below to render 60 seconds of audio for our Mamma Mia MIDI.

In [None]:
def display_audio(pm, seconds=None, fs=16000):
  waveform = pm.fluidsynth(fs=fs)
  if seconds:
    waveform = waveform[:seconds*fs]
  return Audio(waveform, rate=fs)

### YOUR CODE HERE

### Part 2: Creating our Representation. (15 points)
An LSTM will not be able to understand a MIDI object or a piano roll object. LSTMs take as input sequences of integers, so we need to convert our MIDIs into this kind of representation. Since we are working strictly with monophonic MIDIs, there is a very natural way to represent our dataset. We will create a dictionary in which each integer key corresponds to a distinct pitch (0-127) and then add three extra tokens for rests, beginning-of-sequence, and end-of-sequence.

2.1 Define a token dictionary with integer keys to represent the 128 MIDI pitches, a `<rest>` token, a `<bos>` token, and an `<eos>` token. Create an inverse token dictionary to be used for going from sequence representation back to MIDI. (10 points)

In [None]:
### YOUR CODE HERE
# Need a token dictionary and an inverse token dictionary

We will also chunk the MIDI data into shorter segments to make everything more manageable. Let's consider chunks that are 4 bars long. With a 32nd note subdivision, each chunk will be 128 tokens long, plus a `<bos>` and `<eos>` token, so 130 total. We will want to exclude chunks that are majority rests. Let's write some code to create our token sequences.

In [None]:
import random

def create_token_sequences(midi, token_dict, fs=16, seq_len=128, add_bos_eos=True):
  pianoroll = midi.get_piano_roll(fs)
  pianoroll[pianoroll>0]=1
  total_len = pianoroll.shape[1] // 128
  seqs = []
  for i in range(total_len):
    vel_sum = np.sum(pianoroll[:,i*seq_len:(i+1)*seq_len])
    if vel_sum > seq_len*(0.3):
      new_seq = []
      if add_bos_eos:
        new_seq.append(token_dict['<bos>'])
      for j in range(seq_len):
        nonzero_indices = np.nonzero(pianoroll[:,i*seq_len + j])
        if len(nonzero_indices[0]) != 0:
          new_seq.append(nonzero_indices[0][0])
        else:
          new_seq.append(token_dict['<rest>'])
      if add_bos_eos:
        new_seq.append(token_dict['<eos>'])
      seqs.append(new_seq)
  return seqs

def create_all_token_sequences(data_dir, token_dict, fs=16, seq_len=128, test_prop=0.10):
  file_names = os.listdir(data_directory)
  all_sequences = []
  for f in tqdm(file_names):
    pm = pretty_midi.PrettyMIDI(data_directory + f)
    cur_seqs = create_token_sequences(pm, token_dict, fs=fs, seq_len=seq_len)
    all_sequences += cur_seqs

  # shuffle list
  random.shuffle(all_sequences)
  test_seqs = all_sequences[:int(len(all_sequences)*test_prop)]
  train_seqs = all_sequences[int(len(all_sequences)*test_prop):]
  return train_seqs, test_seqs

### YOUR CODE HERE
# Call create_all_token_sequences to create test and train sequences
train_seqs, test_seqs = [],[]
print(len(train_seqs), len(test_seqs))

What if we want to go back the other way? We need an inverse function so we can convert any sequences that our model generates into MIDI objects! It's pretty straightforward to go from sequence to pianoroll. Going from pianoroll to MIDI is a little more complicated, so we provided code for that.

2.2 Finish the function below by creating a pianoroll given a sequence of pitch tokens. (5 points)

In [None]:
def piano_roll_to_pretty_midi(piano_roll, fs=16, program=0):
    '''
    Parameters
    ----------
    piano_roll : np.ndarray
    fs : int
        Sampling frequency of the columns
    Returns
    -------
    midi_object : pretty_midi.PrettyMIDI
    '''
    notes, frames = piano_roll.shape
    pm = pretty_midi.PrettyMIDI()
    instrument = pretty_midi.Instrument(program=program)

    # pad 1 column of zeros so we can acknowledge inital and ending events
    piano_roll = np.pad(piano_roll, [(0, 0), (1, 1)], 'constant')

    # use changes in velocities to find note on / note off events
    velocity_changes = np.nonzero(np.diff(piano_roll).T)

    # keep track on velocities and note on times
    prev_velocities = np.zeros(notes, dtype=int)
    note_on_time = np.zeros(notes)

    for time, note in zip(*velocity_changes):
        # use time + 1 because of padding above
        velocity = piano_roll[note, time + 1]
        time = time / fs
        if velocity > 0:
            if prev_velocities[note] == 0:
                note_on_time[note] = time
                prev_velocities[note] = velocity
        else:
            pm_note = pretty_midi.Note(
                velocity=prev_velocities[note],
                pitch=note,
                start=note_on_time[note],
                end=time)
            instrument.notes.append(pm_note)
            prev_velocities[note] = 0
    pm.instruments.append(instrument)
    return pm

def mono_seq_to_pretty_midi(seq, inv_token_dict):
    ### YOUR CODE HERE! Take a sequence, create a pianoroll
    #   to pass to piano_roll_to_pretty_midi
    #   check for BOS, EOS!
    pianoroll = None

    midi = piano_roll_to_pretty_midi(pianoroll)

    for note in midi.instruments[0].notes:
        note.velocity = 100

    return pianoroll,midi

In [None]:
plot_piano_roll(pm)

In [None]:
### Unit test: go from MIDI to pianoroll and plot. Then go to sequence, back to MIDI then plot
import itertools

tokens = create_token_sequences(pm, fs=16, seq_len=128, add_bos_eos=False)
print(len(tokens))
all_tokens = list(itertools.chain.from_iterable(tokens)) # concatenate all the token list chunks
print(len(all_tokens))

In [None]:
new_pr, new_pm = mono_seq_to_pretty_midi(all_tokens)
plot_piano_roll(new_pm)

### Part 3: LSTMs (30 Points)

Now we have to define an autoregressive model. We have provided the init function and supporting weight initialization functions.

3.1 Fill in the forward pass of the LSTM that should embed the source sequence, send it through the LSTM, and then generate a prediction with the fully connected layer. You can also add dropout layers to try to reduce overfitting/create a more robust model. (10 points)

In [None]:
class LSTM(nn.Module):
    def __init__(self, vocab_size, embedding_dim, hidden_dim, num_layers, dropout_rate,
                tie_weights):

        super().__init__()
        self.num_layers = num_layers
        self.hidden_dim = hidden_dim
        self.embedding_dim = embedding_dim

        self.embedding = nn.Embedding(vocab_size, embedding_dim)
        self.lstm = nn.LSTM(embedding_dim, hidden_dim, num_layers=num_layers,
                    dropout=dropout_rate, batch_first=True)
        self.dropout = nn.Dropout(dropout_rate)
        self.fc = nn.Linear(hidden_dim, vocab_size)

        if tie_weights:
            assert embedding_dim == hidden_dim, 'cannot tie, check dims'
            self.embedding.weight = self.fc.weight
        self.init_weights()

    def forward(self, src, hidden): # TO DO
        prediction, hidden = None, None
        return prediction, hidden

    def init_weights(self):
        init_range_emb = 0.1
        init_range_other = 1/math.sqrt(self.hidden_dim)
        self.embedding.weight.data.uniform_(-init_range_emb, init_range_emb)
        self.fc.weight.data.uniform_(-init_range_other, init_range_other)
        self.fc.bias.data.zero_()
        for i in range(self.num_layers):
            self.lstm.all_weights[i][0] = torch.FloatTensor(self.embedding_dim,
                    self.hidden_dim).uniform_(-init_range_other, init_range_other)
            self.lstm.all_weights[i][1] = torch.FloatTensor(self.hidden_dim,
                    self.hidden_dim).uniform_(-init_range_other, init_range_other)

    def init_hidden(self, bs):
        hidden = torch.zeros(self.num_layers, self.hidden_dim)
        cell = torch.zeros(self.num_layers, self.hidden_dim)
        return hidden, cell

    def detach_hidden(self, hidden):
        hidden, cell = hidden
        hidden = hidden.detach()
        cell = cell.detach()
        return hidden, cell

**3.2** Define an LSTM and initialize the hidden states. Also define an Adam optimizer and use cross entropy loss as the criterion. (5 points)

In [None]:
### YOUR CODE HERE
model = None
hidden = None
optimizer = None
criterion = None
# Leave the rest of these alone
batch_size = 1
seq_len = 129
best_test_loss = float('inf')
best_model_params = None

**3.3** Write a training loop for the model. Note that we are using a batch size of one. You can use `torch.nn.utils.clip_grad_norm_` with a clip value of 0.25 to try to improve performance. (10 points)

In [None]:
num_epochs = 5
for e in range(num_epochs): # TO DO
  epoch_loss = 0
  epoch_test_loss = 0
  model.train()
  for seq in tqdm(train_seqs):
    # WRITE TRAINING LOOP
    print("Training!")

  model.eval()
  for seq in tqdm(test_seqs): # TO DO
    # WRITE EVAL LOOP
    print("Eval!")

  print(e, epoch_loss/len(train_seqs), epoch_test_loss/len(test_seqs))

  if epoch_test_loss < best_test_loss:
    print("MODEL UPDATED")
    best_test_loss = epoch_test_loss
    best_model_params = {
        'model_state_dict':model.state_dict(),
        'optimizer_state_dict':optimizer.state_dict()}


**3.4** Finish this function that takes an LSTM model and the beginning of a sequence and generates a continuation of the sequence. You can divide predictions by the `temp` variable before using softmax and sampling to encourage more variation in the output. (5 points)

In [None]:
def lstm_generate(model, prompt, max_seq_len=130, temp=0.5, seed=None):
    # BATCH SIZE MUST BE 1!!! #
    model.eval()
    if seed is not None:
        torch.manual_seed(seed)
    indices = [t for t in prompt]
    hidden = model.init_hidden(1) # batch_size = 1
    with torch.no_grad():
        while len(indices) < max_seq_len: # every time through this loop we predict the next token
            # YOUR CODE HERE
            prediction = None
            if prediction == 130: #eos
                break
            indices.append(prediction)
    return indices

In [None]:
# TEST GENERATION!
generation = lstm_generate(model, [129,80,80,80,80,60,60,60,60], max_seq_len=130, temp=0.5)
print(generation)
new_pr, new_pm = mono_seq_to_pretty_midi(generation[1:])
plot_piano_roll(new_pm)


In [None]:
display_audio(new_pm)

## Part 4 Transformers (30 points)

4.1 Fill in the forward function of the transformer model below. You will need to embed the source sequence, add the positional encoding, send it through the transformer with a square causal mask (see `nn.Transformer.generate_square_subsequent_mask`), and then use a linear layer afterward to get predictions similar to the LSTM model. (10 points)

In [None]:
from torch import nn, Tensor
import torch
import torch.nn as nn
from torch.nn import TransformerEncoder, TransformerEncoderLayer
import math
import os
from tempfile import TemporaryDirectory
from typing import Tuple
from torch.utils.data import dataset

BOS_IDX, EOS_IDX = 129, 130 # 131

# helper Module that adds positional encoding to the token embedding to introduce a notion of word order.
class PositionalEncoding(nn.Module):

    def __init__(self, d_model: int, dropout: float = 0.1, max_len: int = 5000):
        super().__init__()
        self.dropout = nn.Dropout(p=dropout)

        position = torch.arange(max_len).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2) * (-math.log(10000.0) / d_model))
        pe = torch.zeros(max_len, 1, d_model)
        pe[:, 0, 0::2] = torch.sin(position * div_term)
        pe[:, 0, 1::2] = torch.cos(position * div_term)
        self.register_buffer('pe', pe)

    def forward(self, x: Tensor) -> Tensor:
        """
        Arguments:
            x: Tensor, shape ``[seq_len, batch_size, embedding_dim]``
        """
        x = x + self.pe[:x.size(0)]
        return self.dropout(x)

class TransformerModel(nn.Module):

    def __init__(self, ntoken: int, d_model: int, nhead: int, d_hid: int,
                 nlayers: int, dropout: float = 0.5):
        super().__init__()
        self.model_type = 'Transformer'
        self.pos_encoder = PositionalEncoding(d_model, dropout)
        encoder_layers = TransformerEncoderLayer(d_model, nhead, d_hid, dropout)
        self.transformer_encoder = TransformerEncoder(encoder_layers, nlayers)
        self.embedding = nn.Embedding(ntoken, d_model)
        self.d_model = d_model
        self.linear = nn.Linear(d_model, ntoken)

        self.init_weights()

    def init_weights(self) -> None:
        initrange = 0.1
        self.embedding.weight.data.uniform_(-initrange, initrange)
        self.linear.bias.data.zero_()
        self.linear.weight.data.uniform_(-initrange, initrange)

    def forward(self, src): # TO DO
        """
        Arguments:
            src: Tensor of shape ``[seq_len, batch_size]``
        Returns:
            output Tensor of shape ``[seq_len, batch_size, ntoken]``
        """
        ### TO DO, don't forget src_mask!
        output = None
        return output

torch.manual_seed(0)

In [None]:
### HELPER FUNCTION FOR BATCH STUFF ###
def get_batch(train_seqs, i, bs=4):
  seqs = []
  for j in range(bs):
    seqs.append(train_seqs[i*bs + j])
  seqs = torch.LongTensor(seqs).T
  source = seqs[:-1]
  target = seqs[1:].reshape(-1)

  return source, target

**4.2** Define a transformer using the class constructor above. Also define an Adam optimizer and use cross entropy loss as the criterion. (5 points)

In [None]:
ntokens = 131  # size of vocabulary
emsize = 512  # embedding dimension
d_hid = 512 # dimension of the feedforward network model in ``nn.TransformerEncoder``
nlayers = 4  # number of ``nn.TransformerEncoderLayer`` in ``nn.TransformerEncoder``
nhead = 8  # number of heads in ``nn.MultiheadAttention``
dropout = 0.2  # dropout probability
transformer = None
criterion = None
optimizer = None
# Leave the rest of these alone
bs = 1
seq_len = 129
num_epochs = 10
best_test_loss = float('inf')
best_model_params = None
log_interval=200 # for progress updates

**4.3** Write a training loop for the model. Use the `get_batch` function above to generate the data and targets for the predictions. You can use `torch.nn.utils.clip_grad_norm_` with a clip value of 0.25 to try to improve performance. (10 points)

In [None]:
import time

for e in range(num_epochs): # TO DO
  epoch_loss = 0
  epoch_test_loss = 0
  transformer.train()
  for i in range(len(train_seqs)):
    ### TO DO: TRAINING LOOP ###

    if i % log_interval == 0 and i > 0:
      #lr = scheduler.get_last_lr()[0]
      cur_loss = epoch_loss / i
      print(f'| epoch {e:3d} | i {i:3d} |'
            f'loss {cur_loss:5.2f}')

  transformer.eval()
  for i in tqdm(range(len(test_seqs))):
    ### TO DO: EVAL LOOP ###
    print("Eval!")

  print(e, epoch_loss/len(train_seqs), epoch_test_loss/len(test_seqs))

  if epoch_test_loss < best_test_loss:
    print("BEAT OLD ONES!")
    best_test_loss = epoch_test_loss
    best_model_params = {
        'model_state_dict':model.state_dict(),
        'optimizer_state_dict':optimizer.state_dict()}

**4.4** Finish this function that takes a transformer model and the beginning of a sequence and generates a continuation of the sequence. (5 points)

In [None]:
def transformer_generate(model, prompt, max_seq_len, temp=1.0, seed=None):
    # BATCH SIZE MUST BE 1 #
    model.eval()
    if seed is not None:
        torch.manual_seed(seed)
    indices = [t for t in prompt]
    with torch.no_grad():
        for _ in range(max_seq_len-len(indices)):
            ### YOUR CODE HERE
            prediction=None
            if prediction == 130: #eos
                break
            indices.append(prediction)
    return indices

generation = transformer_generate(transformer, [129,120,120,120], 65, temp=.8)
print(generation)
new_pr, new_pm = mono_seq_to_pretty_midi(generation[1:])
plot_piano_roll(new_pm)

In [None]:
display_audio(new_pm)