In [1]:
%load_ext autoreload
%autoreload 2

import numpy as np
import torch
import torch.nn as nn

import torch.utils.data as data
import os
import random
import numpy as np
from tqdm import tqdm

import pypianoroll

In [2]:
# Constants shared by the rest of the notebook.
DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")  # GPU when available, CPU otherwise
LEARNING_RATE = 0.001  # learning rate handed to the Adam optimizer
TRAIN_BATCH_SIZE = 60  # batch size of the training DataLoader
VAL_BATCH_SIZE = 30  # batch size of the validation DataLoader
DATA_PATH = '../data/lmd_aligned/'  # directory that is walked recursively to find the input files
NUM_EPOCHS = 5  # number of passes over the training loader
POSITIVE_WEIGHT = 2  # fill value of the pos_weight vector given to BCEWithLogitsLoss

In [4]:
def path_to_pianoroll(path):
    """Read the file at ``path`` and return it as a binary piano roll.

    All tracks are blended into one roll, only pitch columns 21..108
    (88 notes) are kept, and the result is transposed so that notes are on
    the first axis and time steps on the second. Every non-zero entry is
    set to 1.
    """
    # A low resolution (3) keeps the resulting sequences short.
    multitrack = pypianoroll.read(path, resolution=3)

    # Keep just the 88 useful notes and put them on the first axis.
    blended = multitrack.blend()
    piano_roll = blended[:, 21:109].transpose()

    # Each step is a multilabel classification target, so the roll is binarized.
    is_active = piano_roll > 0
    piano_roll[is_active] = 1

    return piano_roll
    

In [5]:
def collate(batch):
    """Collate function for the DataLoader.

    ``batch`` is a list of (input, target) tuples. Inputs and targets are
    each packed with ``pack_sequence`` (``enforce_sorted=False``), so the
    sequences do not have to be padded or pre-sorted by length.

    Returns a two-element list ``[packed_inputs, packed_targets]``.
    """
    pack = nn.utils.rnn.pack_sequence
    # Position 0 of every pair is the input sequence, position 1 the target sequence.
    return [
        pack([pair[position] for pair in batch], enforce_sorted=False)
        for position in (0, 1)
    ]

In [6]:
def calculate_num_of_samples(path: str) -> int:
    """Return how many files exist below ``path``, counting all subdirectories.

    Every file is counted regardless of its name or extension.
    """
    return sum(len(filenames) for _root, _subdirs, filenames in os.walk(path))

# Number of files found under DATA_PATH; used as the default dataset size of NotesGenerationDataset.
NUM_OF_SAMPLES = calculate_num_of_samples(DATA_PATH)

In [8]:
class NotesGenerationDataset(data.Dataset):
    """Dataset of next-step prediction pairs built from the files below ``path``.

    The files are listed once, in sorted order, and split into a leading
    training part and a trailing validation part. Sorting makes the split
    independent of the directory order reported by the file system, so a
    training and a validation instance created from the same arguments can
    never share a file.

    Args:
        path: root directory that is searched recursively for files.
        train: True selects the training part, False the validation part.
        num_of_samples: dataset size the split is computed from.
        train_percentage: fraction of ``num_of_samples`` that goes to training.

    Attributes:
        full_filenames: list of file paths belonging to the selected part.
    """

    def __init__(self, path, train=True, num_of_samples=NUM_OF_SAMPLES, train_percentage = 0.8):
        # Imported locally because the notebook's import cell does not provide math.
        import math

        self.path = path
        self.num_of_samples = num_of_samples
        self.train_percentage = train_percentage

        # Here we assume that all files are valid midi files, we do not check anything.
        all_filenames = self._list_files(path)

        # The first ceil(num_of_samples * train_percentage) files are the training part.
        # A non-positive volume yields an empty training part instead of leaking one
        # file into both splits.
        train_dataset_volume = num_of_samples * train_percentage
        split_index = max(0, math.ceil(train_dataset_volume))

        if train:
            self.full_filenames = all_filenames[:split_index]
        else:
            self.full_filenames = all_filenames[split_index:]

    @staticmethod
    def _list_files(path):
        """Return every file path below ``path`` in a deterministic (sorted) order."""
        filenames = []
        for root, subdirs, files in os.walk(path):
            # Sorting the directory list in place fixes the order of the top-down traversal.
            subdirs.sort()
            filenames.extend(os.path.join(root, f) for f in sorted(files))
        return filenames

    def __len__(self):
        """Return the number of files in the selected part."""
        return len(self.full_filenames)

    def __getitem__(self, index):
        """Return ``(input, target)`` float32 tensors for the file at ``index``.

        Both tensors have the layout [number of events, num_of_notes]; the target
        is the input shifted one step into the future.
        """
        full_filename = self.full_filenames[index]

        piano_roll = path_to_pianoroll(full_filename)

        # Input and ground truth are shifted by one step w.r.t. one another.
        # piano_roll has shape [num_of_notes, number_of_events]; it is transposed
        # to get [number_of_events, num_of_notes].
        input_sequence = piano_roll[:, :-1].transpose()
        ground_truth_sequence = piano_roll[:, 1:].transpose()

        return torch.tensor(input_sequence, dtype=torch.float32), torch.tensor(ground_truth_sequence, dtype=torch.float32)

In [9]:
# Training part of the files found under DATA_PATH.
trainset = NotesGenerationDataset(DATA_PATH, train=True)

# Of course we want a big batch size. However, one training sample takes quite a lot of memory.
# We will use torch.cuda.amp.autocast() so that we can make bigger batches.
# collate packs the variable-length sequences of a batch, so no padding is needed.
trainset_loader = torch.utils.data.DataLoader(trainset, batch_size=TRAIN_BATCH_SIZE,
                                              shuffle=True, drop_last=True, collate_fn=collate)

# Validation part of the files found under DATA_PATH.
valset = NotesGenerationDataset(DATA_PATH, train=False)

# Validation batches are not shuffled and the last, possibly smaller, batch is kept.
valset_loader = torch.utils.data.DataLoader(valset, batch_size=VAL_BATCH_SIZE, shuffle=False, drop_last=False, collate_fn=collate)

In [39]:
# Small sanity check: the training and validation sets must not share any file.
train_songs = set(trainset.full_filenames)
assert train_songs.isdisjoint(valset.full_filenames)

In [15]:
class RNN(nn.Module):
    """LSTM that predicts, for every time step, the logits of the notes of the next step.

    The network is a linear note encoder, followed by a multi-layer LSTM and a
    linear layer that maps the LSTM output to one logit per note.

    Args:
        input_size: number of different notes in an input vector.
        hidden_size: size of the encoded note vector and of the LSTM state.
        num_classes: number of logits produced per time step.
        n_layers: number of stacked LSTM layers.
    """

    def __init__(self, input_size, hidden_size, num_classes, n_layers=2):

        super().__init__()

        self.input_size = input_size # amount of different notes
        self.hidden_size = hidden_size
        self.num_classes = num_classes
        self.n_layers = n_layers

        # First a layer that encodes the multi-hot note vector into a better representation.
        self.notes_encoder = nn.Linear(in_features=input_size, out_features=hidden_size)

        self.lstm = nn.LSTM(hidden_size, hidden_size, n_layers)

        # At the end we want a vector with the logits of all notes.
        self.logits_fc = nn.Linear(hidden_size, num_classes)

    def forward(self, inp, hidden=None):
        """Run the network on a PackedSequence or on a plain tensor.

        Args:
            inp: either a PackedSequence of note vectors or a tensor that is
                passed to the LSTM as is.
            hidden: optional initial LSTM state; None starts from zeros.

        Returns:
            ``(logits, hidden)``. For a PackedSequence input, ``logits`` is a
            2-D tensor laid out like ``inp.data`` (one row per packed step);
            for a tensor input it has the layout of the LSTM output. ``hidden``
            is the final LSTM state, ordered like the sequences of the batch.
        """
        if isinstance(inp, nn.utils.rnn.PackedSequence):
            # The linear layers work on the flat data tensor of the packed sequence,
            # which has shape [samples, num_of_notes]; this avoids padding.
            notes_encoded = self.notes_encoder(inp.data)
            # Rebuild the PackedSequence around the encoded data. The sorting indices
            # are carried over so the LSTM maps the initial and final hidden states
            # to the original batch order (they were dropped before, which returned
            # the hidden state in length-sorted order for enforce_sorted=False input).
            rnn_in = nn.utils.rnn.PackedSequence(
                notes_encoded, inp.batch_sizes, inp.sorted_indices, inp.unsorted_indices
            )
            outputs, hidden = self.lstm(rnn_in, hidden)

            # Again go from PackedSequence to its flat data tensor.
            logits = self.logits_fc(outputs.data)
        else:
            # With a tensor at the input this is straightforward.
            notes_encoded = self.notes_encoder(inp)
            outputs, hidden = self.lstm(notes_encoded, hidden)
            logits = self.logits_fc(outputs)

        return logits, hidden

In [16]:
# Sanity check about PackedSequence: a PackedSequence rebuilt from the data tensor and the
# batch_sizes of an existing one must hold exactly the same data.
# NOTE(review): only data and batch_sizes are passed to the constructor here, so inp2 is not
# given inp's sorted_indices / unsorted_indices.
inp, targets = next(iter(trainset_loader))

batch_sizes = inp.batch_sizes
inp2 = nn.utils.rnn.PackedSequence(inp.data, batch_sizes)
assert torch.all(torch.eq(inp.data, inp2.data))



In [144]:
# Model with 88 input notes, 256 hidden units and 88 output logits, moved to DEVICE.
rnn = RNN(input_size=88, hidden_size=256, num_classes=88)
rnn = rnn.to(DEVICE)

# Binary cross-entropy on logits with one positive weight per note.
# The dtype is given explicitly: torch.full infers an integer dtype from the integer
# POSITIVE_WEIGHT, while the loss is computed in floating point.
criterion = nn.BCEWithLogitsLoss(
    pos_weight=torch.full((88,), POSITIVE_WEIGHT, dtype=torch.float32, device=DEVICE)
)

optimizer = torch.optim.Adam(rnn.parameters(), lr=LEARNING_RATE)

# Gradient scaler used together with autocast in the training loop.
scaler = torch.cuda.amp.GradScaler()

In [145]:
def validate():
    """Return the mean per-batch loss of the global ``rnn`` on ``valset_loader``.

    The model is switched to eval mode and evaluated without gradients. It is
    switched back to train mode afterwards, even when a batch raises, so an
    interrupted validation cannot leave the model in eval mode.

    Raises:
        ValueError: if the validation loader yields no batch, because the mean
            loss is undefined in that case.
    """
    rnn.eval()
    loop = tqdm(valset_loader, leave=True)

    losses = []

    try:
        with torch.no_grad():
            for inp, target in loop:
                inp, target = inp.to(DEVICE), target.to(DEVICE)
                logits, _ = rnn(inp)

                # The logits are compared with the data tensor of the target.
                batch_loss = criterion(logits, target.data).item()

                losses.append(batch_loss)
                loop.set_postfix(loss=batch_loss)
    finally:
        rnn.train()

    if not losses:
        raise ValueError("validation loader produced no batches, cannot compute a mean loss")

    return sum(losses) / len(losses)

In [146]:
clip = 1.0  # maximum gradient norm passed to clip_grad_norm_
best_val_loss = float("inf")  # lowest validation loss seen so far; decides when a checkpoint is written

loss_list = []  # mean training loss of every epoch
val_list = []  # validation loss of every epoch

for epoch_number in range(NUM_EPOCHS):
    loop = tqdm(trainset_loader, leave=True)
    losses = []  # per-batch training losses of the current epoch
    for idx, (inp, target) in enumerate(loop):
        
        inp, target = inp.to(DEVICE), target.to(DEVICE)
        optimizer.zero_grad()

        # Forward pass and loss are computed under autocast.
        with torch.cuda.amp.autocast(): 
            logits, _ = rnn(inp)
            # The logits are compared with the data tensor of the packed target.
            loss = criterion(logits, target.data)
             
        # Backward pass on the scaled loss.
        scaler.scale(loss).backward()
        # Unscales the gradients of optimizer's assigned params in-place
        scaler.unscale_(optimizer)
        # Since the gradients of optimizer's assigned params are unscaled, clips as usual:
        torch.nn.utils.clip_grad_norm_(rnn.parameters(), clip)
        
        # Optimizer step through the scaler, then update the scale for the next iteration.
        scaler.step(optimizer)
        scaler.update()
    
        
        losses.append(loss.item())
        loop.set_postfix(loss=loss.item())
    
    # Epoch summary: mean training loss and validation loss.
    train_loss = sum(losses)/len(losses)
    loss_list.append(train_loss)
    current_val_loss = validate()
    val_list.append(current_val_loss)
    
    print(f"Epoch {epoch_number}, train_loss: {train_loss}, val_loss: {current_val_loss}")
    # Keep only the weights of the epoch with the best validation loss.
    if current_val_loss < best_val_loss:
        
        torch.save(rnn.state_dict(), 'music_rnn.pth')
        best_val_loss = current_val_loss

100%|██████████| 13/13 [00:19<00:00,  1.54s/it, loss=0.205]
100%|██████████| 7/7 [00:04<00:00,  1.49it/s, loss=0.202]


Epoch 0, train_loss: 0.47092414590028614, val_loss: 0.18847319058009557


100%|██████████| 13/13 [00:17<00:00,  1.35s/it, loss=0.149]
100%|██████████| 7/7 [00:03<00:00,  1.85it/s, loss=0.162]


Epoch 1, train_loss: 0.16357416143784156, val_loss: 0.15431699369634902


100%|██████████| 13/13 [00:16<00:00,  1.27s/it, loss=0.148]
100%|██████████| 7/7 [00:04<00:00,  1.73it/s, loss=0.161]


Epoch 2, train_loss: 0.15196926433306474, val_loss: 0.15230111352034978


100%|██████████| 13/13 [00:15<00:00,  1.18s/it, loss=0.144]
100%|██████████| 7/7 [00:04<00:00,  1.61it/s, loss=0.16] 


Epoch 3, train_loss: 0.15008225234655234, val_loss: 0.15109907516411372


100%|██████████| 13/13 [00:20<00:00,  1.61s/it, loss=0.151]
100%|██████████| 7/7 [00:05<00:00,  1.27it/s, loss=0.158]

Epoch 4, train_loss: 0.14965867537718552, val_loss: 0.1498615379844393





In [185]:
def sample_from_piano_rnn(sample_length=4, temperature=1, starting_sequence=None, deterministic = False, threshold=0.5):
    """Generate a binary piano roll by feeding the global ``rnn`` its own predictions.

    Args:
        sample_length: number of new time steps to generate.
        temperature: positive divisor applied to the logits before the sigmoid.
            Values below 1 push the note probabilities away from 0.5 (towards
            0 or 1), values above 1 pull them towards 0.5.
        starting_sequence: optional seed as a tensor or array-like of shape
            [steps, 88] or [steps, 1, 88]. When None, a single step with the
            notes at indices 40, 50 and 56 switched on is used.
        deterministic: when True, notes are chosen by thresholding the
            probabilities once the first five generated steps have been drawn
            randomly; when False every step is drawn randomly.
        threshold: probability above which a note is switched on in
            deterministic mode.

    Returns:
        A numpy array of shape [seed steps + sample_length, 88] that starts
        with the seed and continues with the generated steps.

    Raises:
        ValueError: if ``temperature`` is not positive or ``starting_sequence``
            does not have one of the supported shapes.
    """
    if temperature <= 0:
        raise ValueError("temperature must be positive")

    if starting_sequence is None:
        seed = torch.zeros(1, 1, 88, dtype=torch.float32, device=DEVICE)
        seed[0, 0, 40] = 1
        seed[0, 0, 50] = 1
        seed[0, 0, 56] = 1
    else:
        # Previously a given starting sequence was ignored and the function crashed.
        seed = torch.as_tensor(starting_sequence, dtype=torch.float32, device=DEVICE)
        if seed.dim() == 2:
            seed = seed.unsqueeze(1)  # add the batch axis: [steps, 88] -> [steps, 1, 88]
        if seed.dim() != 3 or seed.shape[0] == 0 or seed.shape[1] != 1 or seed.shape[2] != 88:
            raise ValueError("starting_sequence must have shape [steps, 88] or [steps, 1, 88] with steps >= 1")

    current_sequence_input = seed
    final_output_sequence = [seed.squeeze(1)]

    hidden = None
    with torch.no_grad():
        for step in range(sample_length):

            output, hidden = rnn(current_sequence_input, hidden)
            # Only the prediction that follows the last fed step is needed
            # (the seed may span several steps; later inputs are single steps).
            output = output[-1:]

            probabilities = torch.sigmoid(output.div(temperature))
            if deterministic and step >= 5:
                current_sequence_input = (probabilities > threshold).to(torch.float32)
            else:
                # Every note is switched on independently with its own probability.
                current_sequence_input = torch.bernoulli(probabilities).to(torch.float32)

            final_output_sequence.append(current_sequence_input.squeeze(1))

    sampled_sequence = torch.cat(final_output_sequence, dim=0).cpu().numpy()

    return sampled_sequence

In [189]:
# Generate 200 new steps by random sampling; with deterministic=False the threshold is not used.
sample = sample_from_piano_rnn(sample_length=200, temperature=0.25, deterministic=False, threshold=0.3)

In [190]:
# Sum over the whole generated roll, i.e. the number of switched-on note cells.
np.sum(sample)

541.0

In [191]:
# Embed the generated 88-note roll into a 128-pitch piano roll and write it to a MIDI file.
# The number of rows follows the sample, so any sample_length works (it was hard-coded to 201).
num_steps = sample.shape[0]
roll = np.zeros((num_steps, 128))
roll[:, 21:109] = sample  # same pitch range that path_to_pianoroll cuts out
roll[roll == 1] = 100  # switched-on cells get the value 100
track = pypianoroll.Multitrack(resolution=3)
track.append(pypianoroll.StandardTrack(pianoroll=roll))
pypianoroll.write("sample2.mid", track)