In [116]:
import h5_getter
import numpy as np
import os
import pypianoroll
import pandas as pd
import pretty_midi
import matplotlib.pyplot as plt
import librosa
import music21
import random
import json
import itertools
import datetime
import torch
import torch.nn as nn
from torch.nn import functional as F

# Run on the GPU when CUDA is available, otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = torch.device('cuda')
else:
    device = torch.device('cpu')

In [78]:
# Dataset locations, all relative to the notebook's working directory.
root_dir = '.'
data_dir = root_dir + '/dataset/lpd_5_cleansed'
music_dataset_lpd_dir = root_dir + '/dataset/lmd_matched_h5'

# cleansed_ids.txt has two columns separated by four spaces; column 0 is used
# as the LPD id and column 1 as the MSD id (see the two lookup tables below).
# A multi-character delimiter is only supported by pandas' python engine, so
# request it explicitly instead of relying on the fallback + ParserWarning.
cleansed_ids = pd.read_csv(
    os.path.join(root_dir, 'dataset', 'cleansed_ids.txt'),
    delimiter = '    ',
    header = None,
    engine = 'python',
)
# Lookup tables in both directions between LPD ids and MSD ids.
lpd_to_msd_ids = {a:b for a, b in zip(cleansed_ids[0], cleansed_ids[1])}
msd_to_lpd_ids = {a:b for a, b in zip(cleansed_ids[1], cleansed_ids[0])}

# Root folder under which the lmd_matched_h5 tree is looked up.
RESULTS_PATH = os.path.join(root_dir, 'dataset')

# Utility functions for retrieving paths
def msd_id_to_dirs(msd_id):
    """Build the relative path prefix of an MSD ID.

    The 3rd, 4th and 5th characters of the ID become nested directory
    names, followed by the full ID itself,
    e.g. TRABCD12345678 -> A/B/C/TRABCD12345678
    """
    first, second, third = (msd_id[pos] for pos in (2, 3, 4))
    return os.path.join(first, second, third, msd_id)


def msd_id_to_h5(msd_id):
    """Return the path of the .h5 file that belongs to the given MSD ID.

    The file is looked up under RESULTS_PATH/lmd_matched_h5, inside the
    nested directory produced by msd_id_to_dirs.
    """
    h5_root = os.path.join(RESULTS_PATH, 'lmd_matched_h5')
    relative_h5 = msd_id_to_dirs(msd_id) + '.h5'
    return os.path.join(h5_root, relative_h5)

# Build the path of a song's .npz file inside the LPD cleansed folder
def get_midi_npz_path(msd_id, midi_md5):
    """Return data_dir/<MSD id directories>/<midi_md5>.npz (path only, no I/O)."""
    song_dir = os.path.join(data_dir, msd_id_to_dirs(msd_id))
    return os.path.join(song_dir, midi_md5 + '.npz')
    
# Build the path of a song's .mid file inside the Music Dataset folder
def get_midi_path(msd_id, midi_md5):
    """Return music_dataset_lpd_dir/<MSD id directories>/<midi_md5>.mid (path only, no I/O)."""
    song_dir = os.path.join(music_dataset_lpd_dir, msd_id_to_dirs(msd_id))
    return os.path.join(song_dir, midi_md5 + '.mid')

  cleansed_ids = pd.read_csv(os.path.join('dataset', 'cleansed_ids.txt'), delimiter = '    ', header = None)


In [79]:
# Parse the genre annotation file into parallel (track id, genre) lists
genre_file_dir = os.path.join('dataset', 'msd_tagtraum_cd1.cls')
ids = []
genres = []
with open(genre_file_dir) as f:
    for line in f:
        # Lines starting with '#' are comments in the annotation file
        if line.startswith('#'):
            continue
        fields = line.strip().split("\t")
        # Only rows with one or two genres after the track id are used;
        # a two-genre row contributes two (id, genre) pairs
        if len(fields) in (2, 3):
            for genre in fields[1:]:
                ids.append(fields[0])
                genres.append(genre)
genre_df = pd.DataFrame(data={"TrackID": ids, "Genre": genres})

# Track id -> list of all genres annotated for that track
genres_by_track = genre_df.groupby('TrackID')['Genre']
genre_dict = genres_by_track.apply(lambda track_genres: track_genres.tolist()).to_dict()


In [80]:
# MSD track ids of every song annotated with the Pop_Rock genre
is_pop_rock = genre_df['Genre'] == 'Pop_Rock'
pop_ids = genre_df[is_pop_rock]['TrackID'].tolist()

# Translate to LPD ids, skipping tracks that have no entry in the mapping
pop_lpd_ids = []
for msd_id in pop_ids:
    if msd_id in msd_to_lpd_ids:
        pop_lpd_ids.append(msd_to_lpd_ids[msd_id])

In [81]:
# Take a random subset of (at most) 50 distinct pop songs.
# replace=False: np.random.choice samples WITH replacement by default, which
# would put the same song into the subset several times.
n_pop_samples = min(50, len(pop_lpd_ids))
pop_lpd_ids = np.random.choice(pop_lpd_ids, n_pop_samples, replace=False)

2423

In [None]:
# Extract the piano notes/chords of randomly drawn songs: `notes` ends up
# holding one list of note/chord strings per drawn song (possibly empty).
notes = []
n_draws = 50  # number of songs to draw from pop_lpd_ids

for i in range(1, n_draws + 1):
    # np.random.randint excludes its upper bound, so pass len(...) itself;
    # `len(...) - 1` would make the last song impossible to select.
    lpd_file_name = pop_lpd_ids[np.random.randint(0, len(pop_lpd_ids))]
    msd_file_name = lpd_to_msd_ids[lpd_file_name]

    # Get the NPZ path
    npz_path = get_midi_npz_path(msd_file_name, lpd_file_name)

    # Write the pianoroll next to the .npz as a .mid so music21 can parse it
    multitrack = pypianoroll.load(npz_path)
    new_midi_path = npz_path[:-4] + '.mid'
    pypianoroll.write(new_midi_path, multitrack)
    midi = music21.converter.parse(new_midi_path)

    s2 = music21.instrument.partitionByInstrument(midi)
    piano_part = None
    # Filter for only the piano part (if several match, the last one wins).
    # NOTE(review): partitionByInstrument appears to return None when it
    # cannot partition the score - treated here as "no piano part"; confirm.
    if s2 is not None:
        for part in s2:
            if isinstance(part.getInstrument(), music21.instrument.Piano):
                piano_part = part

    notes_song = []
    if piano_part is not None: # Some songs somehow have no piano parts
        for element in piano_part:
            if isinstance(element, music21.note.Note):
                # Single note: store its pitch name
                notes_song.append(str(element.pitch))
            elif isinstance(element, music21.chord.Chord):
                # Chord: store its normal order (list of integers) joined by '.'
                notes_song.append('.'.join(str(n) for n in element.normalOrder))

    notes.append(notes_song)
    print(i)

In [82]:
# Keep only the songs for which at least one note/chord was extracted
filtered_notes = []
for song_notes in notes:
    if len(song_notes) != 0:
        filtered_notes.append(song_notes)
len(filtered_notes)

480

In [96]:
# Hold out 10% of the non-empty songs for testing; the rest is for training.
random.seed(42)
n_filtered = len(filtered_notes)
# random.sample draws WITHOUT replacement, so every held-out id is distinct
# (random.choices could return the same song several times).
test_ids = random.sample(range(n_filtered), k = n_filtered // 10)
held_out = set(test_ids)  # set membership keeps the filter below linear
train_ids = [e for e in range(n_filtered) if e not in held_out]

# Index the filtered list: the ids above were drawn from its length, and the
# unfiltered `notes` list still contains songs without any extracted notes.
notes_train = [filtered_notes[i] for i in train_ids]
notes_test = [filtered_notes[i] for i in test_ids]

with open('train_notes.json', 'w') as f:
    json.dump(notes_train, f)

with open('test_notes.json', 'w') as f:
    json.dump(notes_test, f)

In [101]:
# Prepare input and output sequences
def prepare_sequences(notes, note_to_int = None, sequence_length = 32, step = 5):
    """Cut every song into fixed-length windows plus the note that follows.

    Parameters
    ----------
    notes : list of songs, each song being a list of note/chord strings.
    note_to_int : optional dict mapping note string -> integer id. When it is
        missing (or empty) a new mapping is built from the sorted set of all
        strings found in `notes`.
    sequence_length : number of notes in each input window.
    step : distance between the starts of two consecutive windows of a song.

    Returns
    -------
    network_input : numpy array of shape (n_patterns, sequence_length, 1)
        holding the integer ids of each window.
    network_output : list with the integer id of the note following each window.
    note_to_int : the mapping that was used (given or newly built).

    Notes that are absent from `note_to_int` are encoded as 0, i.e. they
    share the id of whichever note is mapped to 0.

    Raises
    ------
    ValueError : if `step` is smaller than 1.
    """
    if step < 1:
        raise ValueError('step must be a positive integer')

    if not note_to_int:
        # Vocabulary: every distinct note/chord across all songs, sorted
        pitch_names = sorted(set(itertools.chain.from_iterable(notes)))
        note_to_int = {note: number for number, note in enumerate(pitch_names)}

    network_input = []
    network_output = []
    for song in notes:
        # A window is only used when one more note follows it (the target),
        # hence the exclusive bound len(song) - sequence_length.
        for start in range(0, len(song) - sequence_length, step):
            window = song[start: start + sequence_length]
            next_note = song[start + sequence_length]
            network_input.append([note_to_int.get(note, 0) for note in window])
            network_output.append(note_to_int.get(next_note, 0))

    n_patterns = len(network_input)

    # Add a trailing feature axis: (n_patterns, sequence_length, 1)
    network_input = np.reshape(network_input, (n_patterns, sequence_length, 1))

    return network_input, network_output, note_to_int

# Build the training windows (64 notes each) and the note vocabulary
train_input, train_output, note_to_int = prepare_sequences(
    notes_train,
    sequence_length = 64,
)
# Test windows are currently not built:
#test_input, test_output, _ = prepare_sequences(notes_test, note_to_int = note_to_int, sequence_length = 64)

In [106]:
# Reverse lookup: integer id -> note/chord string
int_to_note = dict((number, note) for note, number in note_to_int.items())

In [112]:
# Pick one random window of the network input and return (input, target),
# the target being the same window shifted by one position
# NOT NEEDED ANYMORE - each epoch just using entire dataset
def random_training_set(network_input):
    """Return an (input, target) pair of long tensors from a random row.

    `network_input` is indexed as [row, :, :]; the input is the selected row
    without its last step, the target the row without its first step, both
    squeezed.
    """
    row = random.randint(0, network_input.shape[0] - 1)
    chunk = network_input[row, :, :]

    def as_long(values):
        return torch.tensor(values, dtype = torch.long).squeeze()

    return as_long(chunk[:-1]), as_long(chunk[1:])


def grad_clipping(net, theta):
    """Rescale the gradients of `net` in place so their global L2 norm is at most `theta`.

    Only parameters that require gradients AND currently hold one are
    considered: a parameter that took no part in the last backward pass has
    `grad is None` and is skipped. When no gradient exists at all the
    function returns without doing anything.
    """
    grads = [p.grad for p in net.parameters()
             if p.requires_grad and p.grad is not None]
    if not grads:
        return

    # Global norm over all gradients taken together
    norm = torch.sqrt(sum(torch.sum(g ** 2) for g in grads))

    if norm > theta:
        scale = theta / norm
        for g in grads:
            # In-place multiply; unlike `g[:] *= ...` this also works for 0-d gradients
            g.mul_(scale)

In [113]:
class GenerationRNN(nn.Module):
    """Embedding -> GRU -> linear decoder producing one score per pitch.

    input_size: number of possible pitches (size of the embedding table)
    hidden_size: embedding size of each pitch, also used as GRU hidden size
    output_size: number of scores returned per step (number of possible pitches)
    n_layers: number of stacked GRU layers

    The model works on one token at a time with batch size 1: `forward`
    reshapes its input to (1, -1) and flattens the hidden state of all
    layers into a single row before decoding it.
    """

    def __init__(self, input_size, hidden_size, output_size, n_layers=1):
        super(GenerationRNN, self).__init__()
        self.input_size = input_size
        self.hidden_size = hidden_size
        self.output_size = output_size
        self.n_layers = n_layers

        self.embedding = nn.Embedding(input_size, hidden_size)
        self.gru = nn.GRU(hidden_size, hidden_size, n_layers)
        # The decoder reads the hidden state of every layer, concatenated
        self.decoder = nn.Linear(hidden_size * n_layers, output_size)

    def forward(self, input, hidden):
        """Advance the GRU by one token.

        Returns (output, hidden): `output` holds the decoder scores with
        shape (1, output_size) - no softmax is applied - and `hidden` is the
        updated GRU state to feed into the next call.
        """
        # Embed the token id(s); view(1, -1) adds the leading sequence axis
        embedded = self.embedding(input.view(1, -1))
        _, hidden = self.gru(embedded, hidden)
        # Decode from the flattened hidden state of all layers
        output = self.decoder(hidden.view(1, -1))
        return output, hidden

    def init_hidden(self):
        """Return an all-zero initial hidden state of shape (n_layers, 1, hidden_size).

        The state is created on the device (and with the dtype) of the
        model's own parameters, so it matches wherever the model was moved
        to instead of depending on a module-level `device` variable.
        """
        reference = next(self.parameters())
        return torch.zeros(self.n_layers, 1, self.hidden_size,
                           device=reference.device, dtype=reference.dtype)

In [114]:
# One optimisation step on ONE sequence
def train_sequence(input, target, model, optimizer, criterion):
    """Run the model over `input` step by step, then update the weights once.

    The loss is the sum of `criterion` over all positions, each output being
    compared with the matching element of `target`. After the backward pass
    the gradients are clipped (grad_clipping with theta 1) and the optimizer
    takes a step. Returns the summed loss divided by the sequence length.
    """
    # Fresh hidden state and cleared gradients for this sequence
    hidden = model.init_hidden()
    model.zero_grad()

    n_steps = len(input)
    loss = 0
    for step in range(n_steps):
        output, hidden = model(input[step], hidden)
        loss += criterion(output, target[step].unsqueeze(0))

    # Backpropagate, clip gradient and optimize
    loss.backward()
    grad_clipping(model, 1)
    optimizer.step()

    # Average loss per position of the sequence
    return loss.item() / n_steps

def test_sequence(input, target, model, criterion):
    # Initialize hidden state, zero the gradients of model 
    hidden = model.init_hidden()
    model.zero_grad()
    loss = 0
    # For each character in our chunk (except last), compute the hidden and ouput
    # Using each output, compute the loss with the corresponding target 
    for i in range(len(input)):
        output, hidden = model(input[i], hidden)
        loss += criterion(output, target[i].unsqueeze(0))

    # Return average loss for the input sequence
    return loss.data.item() / len(input)

In [115]:
# Overall training loop
def training_loop(model, optimizer, scheduler, criterion, train_input, test_input):
    """Train `model` for `n_epochs` epochs (module-level setting) on every training sequence.

    Each row of `train_input` is one sequence; the model input is the row
    without its last step and the target the row without its first step.
    After every epoch the scheduler is stepped and a progress line is printed.

    `test_input` is accepted for interface compatibility but evaluation is
    currently disabled: the reported test loss is always 0 and the returned
    `test_losses` list stays empty.
    TODO: re-enable by averaging test_sequence(...) over the rows of test_input.

    Returns (train_losses, test_losses), where train_losses holds the mean
    per-sequence loss of each epoch.
    """
    train_losses = []
    test_losses = []
    n_sequences = train_input.shape[0]

    for epoch in range(1, n_epochs + 1):
        running_loss = 0
        model.train()

        print(scheduler.get_last_lr())
        # One optimisation step per training sequence, over the whole dataset
        for i in range(n_sequences):
            sequence = train_input[i, : , :]
            input = torch.tensor(sequence[:-1], dtype = torch.long).squeeze().to(device)
            target = torch.tensor(sequence[1:], dtype = torch.long).squeeze().to(device)
            running_loss += train_sequence(input, target, model, optimizer, criterion)

        # Average over the sequences actually processed (not a fixed 2000);
        # max(..., 1) avoids a division by zero on an empty training set
        train_epoch_loss = running_loss / max(n_sequences, 1)
        train_losses.append(train_epoch_loss)
        scheduler.step()

        # Evaluation disabled, see docstring
        test_epoch_loss = 0

        # `datetime` is imported as a module, so the class must be qualified
        print('Epoch {}, Train Loss: {}, Test Loss: {}, Time: {}'.format(epoch, train_epoch_loss, test_epoch_loss, datetime.datetime.now()))

    return train_losses, test_losses

In [117]:
# Hyperparameters
n_pitches = len(note_to_int)  # vocabulary size: one id per note/chord
hidden_size = 96
n_layers = 2
n_epochs = 40
lr = 0.002
lr_lambda = 0.99  # per-epoch multiplicative learning-rate decay


def _lr_decay(epoch):
    """Learning-rate factor applied at `epoch`: lr_lambda ** epoch."""
    return lr_lambda ** epoch


# Model, optimizer, LR schedule and loss
model = GenerationRNN(
    input_size = n_pitches,
    hidden_size = hidden_size,
    output_size = n_pitches,
    n_layers = n_layers,
).to(device)
optimizer = torch.optim.Adam(model.parameters(), lr = lr)
scheduler = torch.optim.lr_scheduler.LambdaLR(optimizer, lr_lambda = _lr_decay)
criterion = nn.CrossEntropyLoss()

# Train; the training windows are also passed in the test_input slot
train_losses, test_losses = training_loop(
    model, optimizer, scheduler, criterion, train_input, train_input
)

[0.002]


KeyboardInterrupt: 