In [1]:
from mido import MidiFile
import pandas as pd
import numpy as np
import os
from tqdm import tqdm

from music21 import converter, instrument, note, chord, stream
import pypianoroll
import sys

sys.path.append("/home/skynet/research/music-generation/pre-processing")

from note_representation import NoteRepresentation

In [2]:
root_dir = '/home/skynet/data/music-generation'
data_dir = root_dir + '/Lakh Piano Dataset/lpd_5/lpd_5_cleansed'
midi_dir = f'{root_dir}/Lakh Piano Dataset/lpd_5_midi'
lakh_dir = f'{root_dir}/Lakh Piano Dataset/'


In [3]:
note_rep = NoteRepresentation(midi_dir)
net_in, net_out = note_rep.get_data()

In [4]:
note_to_int = note_rep.note_to_int


In [5]:
import torch
import torch.nn as nn
from torch.nn import functional as F
import random

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

In [6]:
# Take a random observation from the network input, return (input, target), each shifted by 1
# NOT NEEDED ANYMORE - each epoch just using entire dataset
def random_training_set(network_input):    
    chunk = network_input[random.randint(0, network_input.shape[0] - 1), : , :]
    input = torch.tensor(chunk[:-1], dtype = torch.long).squeeze()
    target = torch.tensor(chunk[1:], dtype = torch.long).squeeze()
    return input, target


def grad_clipping(net, theta):  
    """Clip the gradient."""
    params = [p for p in net.parameters() if p.requires_grad]

    norm = torch.sqrt(sum(torch.sum((p.grad ** 2)) for p in params))
    
    if norm > theta:
        for param in params:
            param.grad[:] *= theta / norm

In [7]:
class GenerationRNN(nn.Module):
  # input_size: number of possible pitches
  # hidden_size: embedding size of each pitch
  # output_size: number of possible pitches (probability distribution)
    def __init__(self, input_size, hidden_size, output_size, n_layers=1):
        super(GenerationRNN, self).__init__()
        self.input_size = input_size
        self.hidden_size = hidden_size
        self.output_size = output_size
        self.n_layers = n_layers
        
        self.embedding = nn.Embedding(input_size, hidden_size)
        self.gru = nn.GRU(hidden_size, hidden_size, n_layers)
        self.decoder = nn.Linear(hidden_size * n_layers, output_size)
    
    def forward(self, input, hidden):
        # Creates embedding of the input texts
        #print('initial input', input.size())
        input = self.embedding(input.view(1, -1))
        #print('input after embedding', input.size())
        output, hidden = self.gru(input, hidden)
        #print('output after gru', output.size())
        #print('hidden after gru', hidden.size())
        output = self.decoder(hidden.view(1, -1))
        #print('output after decoder', output.size())
        return output, hidden

    def init_hidden(self):
        return torch.zeros(self.n_layers, 1, self.hidden_size).to(device)

In [8]:
# Single training step for ONE sequence
def train_sequence(input, target, model, optimizer, criterion):
    # Initialize hidden state, zero the gradients of model 
    hidden = model.init_hidden()
    model.zero_grad()
    loss = 0
    # For each character in our chunk (except last), compute the hidden and ouput
    # Using each output, compute the loss with the corresponding target 
    for i in range(len(input)):
        output, hidden = model(input[i], hidden)
        loss += criterion(output, target[i].unsqueeze(0))
    
    # Backpropagate, clip gradient and optimize
    loss.backward()
    grad_clipping(model, 1)
    optimizer.step()

    # Return average loss for the input sequence
    return loss.data.item() / len(input)

def test_sequence(input, target, model, criterion):
    # Initialize hidden state, zero the gradients of model 
    hidden = model.init_hidden()
    model.zero_grad()
    loss = 0
    # For each character in our chunk (except last), compute the hidden and ouput
    # Using each output, compute the loss with the corresponding target 
    for i in range(len(input)):
        output, hidden = model(input[i], hidden)
        loss += criterion(output, target[i].unsqueeze(0))

    # Return average loss for the input sequence
    return loss.data.item() / len(input)

In [9]:
# Overall training loop
def training_loop(model, optimizer, scheduler, criterion, train_input, test_input):

  train_losses = []
  test_losses = []

  for epoch in range(1, n_epochs + 1):
    running_loss = 0
    model.train()

    # Training - sample 2000
    sampled_train_ids = random.choices(range(train_input.shape[0]), k = 2000)
    print(scheduler.get_last_lr())
    for i in range(train_input.shape[0]):
      sequence = train_input[i, : , :]
      input = torch.tensor(sequence[:-1], dtype = torch.long).squeeze().to(device)
      target = torch.tensor(sequence[1:], dtype = torch.long).squeeze().to(device)
      loss = train_sequence(input, target, model, optimizer, criterion)
      running_loss += loss

    train_epoch_loss = running_loss / 2000
    train_losses.append(train_epoch_loss)
    scheduler.step()

    running_loss = 0
    # model.eval()
    # # Testing
    # for i in range(test_input.shape[0]):
    #   sequence = test_input[i, : , :]
    #   input = torch.tensor(sequence[:-1], dtype = torch.long).squeeze().to(device)
    #   target = torch.tensor(sequence[1:], dtype = torch.long).squeeze().to(device)
    #   loss = test_sequence(input, target, model, criterion)
    #   running_loss += loss

    # test_epoch_loss = running_loss / 1000
    # test_losses.append(test_epoch_loss)
    test_epoch_loss = 0

    print('Epoch {}, Train Loss: {}, Test Loss: {}, Time: {}'.format(epoch, train_epoch_loss, test_epoch_loss, datetime.now()))

  return train_losses, test_losses

In [13]:
from datetime import datetime

In [14]:
n_pitches = len(note_to_int)
hidden_size = 96
n_layers = 2
n_epochs = 40
lr = 0.002
lr_lambda = 0.99

model = GenerationRNN(input_size = n_pitches, hidden_size = hidden_size, output_size = n_pitches, n_layers = n_layers).to(device)
optimizer = torch.optim.Adam(model.parameters(), lr = lr)
scheduler = torch.optim.lr_scheduler.LambdaLR(optimizer, lr_lambda = lambda epoch: lr_lambda ** epoch)
criterion = nn.CrossEntropyLoss()
train_losses, test_losses = training_loop(model, optimizer, scheduler, criterion, net_in, net_in)

[0.002]
Epoch 1, Train Loss: 2.366385728668786, Test Loss: 0, Time: 2023-04-02 16:57:05.041006
[0.00198]
Epoch 2, Train Loss: 1.8761272995558023, Test Loss: 0, Time: 2023-04-02 16:57:56.753868
[0.0019602]
Epoch 3, Train Loss: 1.6099545242245414, Test Loss: 0, Time: 2023-04-02 16:58:22.955195
[0.0019405980000000002]
Epoch 4, Train Loss: 1.4200630006631214, Test Loss: 0, Time: 2023-04-02 16:59:02.216330
[0.0019211920199999999]
Epoch 5, Train Loss: 1.2661156891129022, Test Loss: 0, Time: 2023-04-02 16:59:26.245750
[0.0019019800997999998]
Epoch 6, Train Loss: 1.1523321756714335, Test Loss: 0, Time: 2023-04-02 17:00:11.057461
[0.001882960298802]
Epoch 7, Train Loss: 1.0555116254273116, Test Loss: 0, Time: 2023-04-02 17:00:49.316427
[0.0018641306958139799]
Epoch 8, Train Loss: 0.9782372354767294, Test Loss: 0, Time: 2023-04-02 17:01:24.296086
[0.0018454893888558402]
