In [1]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt


from tqdm import tqdm
import pickle

from sklearn.model_selection import train_test_split

In [2]:
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch import optim
from torch.utils.data import Dataset, DataLoader

In [11]:
# Use the GPU when one is available, otherwise fall back to the CPU so that the
# notebook (checkpoint loading and generation in particular) still runs on
# machines without CUDA.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

### ***Dataset***

In [3]:
class MusicData(Dataset):
    """Sliding-window next-symbol dataset built from a pickled note sequence.

    Every window of ``sequence_length`` consecutive symbols is one sample and
    the symbol that immediately follows the window is its label.

    Args:
        pkl_path: Path of the pickle file holding the note sequence. It is
            assumed to be a flat, sliceable sequence of hashable, sortable
            symbols (e.g. strings) -- TODO confirm against the data pipeline.
            NOTE: ``pickle.load`` executes arbitrary code; only open trusted files.
        sequence_length: Number of symbols in each input window.
        use_embedding: When true, samples are returned as raw integer indices;
            otherwise the indices are divided by the vocabulary size.
        subset: Optional number of leading symbols to keep (falsy keeps all).

    Attributes:
        notes: The (possibly truncated) raw symbol sequence.
        n: Number of distinct symbols.
        pitchnames: Sorted list of the distinct symbols.
        note_to_int: Mapping from symbol to its position in ``pitchnames``.
        data: Tensor of shape ``(n_patterns, sequence_length, 1)`` with the
            encoded input windows.
        label: List with the encoded target symbol of every window.
    """

    def __init__(self, pkl_path, sequence_length, use_embedding=False, subset=None):
        self.sequence_length = sequence_length
        self.use_embedding = use_embedding

        with open(pkl_path, 'rb') as handle:
            self.notes = pickle.load(handle)
        if subset:
            self.notes = self.notes[:subset]

        # Vocabulary: every distinct symbol, indexed by its sorted position.
        vocabulary = set(self.notes)
        self.n = len(vocabulary)
        self.pitchnames = sorted(vocabulary)
        self.note_to_int = {name: idx for idx, name in enumerate(self.pitchnames)}

        # Encode the whole sequence once, then cut it into overlapping windows
        # (stride 1) together with the symbol that follows each window.
        encoded = [self.note_to_int[name] for name in self.notes]
        window_starts = range(len(self.notes) - sequence_length)
        windows = [encoded[first:first + sequence_length] for first in window_starts]
        targets = [encoded[first + sequence_length] for first in window_starts]

        self.data = torch.tensor(np.reshape(windows, (len(windows), sequence_length, 1)))
        self.label = targets

    def __len__(self):
        """Return the number of (window, label) samples."""
        return len(self.data)

    def __getitem__(self, index):
        """Return the window and label stored at ``index``.

        The window is scaled by ``1 / n`` unless ``use_embedding`` is set.
        """
        window = self.data[index]
        target = self.label[index]
        if not self.use_embedding:
            window = window / float(self.n)
        return window, target

In [4]:
# Full dataset (no subset): windows of 100 symbols, returned as raw integer
# indices because the model embeds them itself.
all_ds = MusicData(
    pkl_path="../data/notes_final.pkl",
    sequence_length=100,
    use_embedding=True,
    subset=None,
)

In [None]:
# Fixed seed so the 80 / 10 / 10 partition is identical after every kernel
# restart; otherwise a checkpoint trained on one split would later be evaluated
# on a different, overlapping one.
SPLIT_SEED = 42

# First carve off 20% of the samples, then halve that part into validation and test.
# NOTE(review): consecutive windows overlap almost entirely (stride 1), so a random
# split still places near-duplicate sequences in different partitions.
train_ds, test_val_ds = train_test_split(all_ds, test_size=0.2, random_state=SPLIT_SEED)
val_ds, test_ds = train_test_split(test_val_ds, test_size=0.5, random_state=SPLIT_SEED)

In [None]:
# One loader per split, all with batches of 128 samples. The training and test
# loaders reshuffle on every pass; the validation loader keeps a fixed order.
trainloader = DataLoader(
    train_ds,
    batch_size=128,
    shuffle=True,
)
valloader = DataLoader(
    val_ds,
    batch_size=128,
    shuffle=False,
)
testloader = DataLoader(
    test_ds,
    batch_size=128,
    shuffle=True,
)

In [None]:
# Vocabulary size: the number of distinct symbols found in the note sequence.
all_ds.n

### ***Model***

In [5]:
class MusicEmbeddingBaseline(nn.Module):
    """Three stacked LSTMs over embedded note indices, followed by a linear classifier.

    Pipeline: ``Embedding -> base LSTM (optionally bidirectional) -> hidden LSTM
    -> output LSTM -> Linear``. The classifier consumes the final hidden state of
    the output LSTM and produces one unnormalised score per vocabulary entry
    (no softmax is applied here).

    Args:
        embedding_size: Dimension of each embedding vector.
        hidden_size: Hidden size (per direction) of the first LSTM.
        post_embedding: Hidden size of the last LSTM, i.e. the classifier input size.
        vocab_size: Number of distinct symbols; sizes both the embedding table
            and the classifier output.
        bidirectional: Whether the first LSTM runs in both directions.
    """

    def __init__(self, embedding_size, hidden_size, post_embedding, vocab_size, bidirectional):
        super().__init__()

        self.embedding_size = embedding_size
        self.hidden_size = hidden_size
        self.vocab_size = vocab_size
        self.post_embedding = post_embedding
        self.bidirectional = bidirectional

        # A bidirectional first layer concatenates both directions, doubling
        # the feature size seen by the following layers.
        self.hidden_lstm_input = hidden_size * 2 if bidirectional else hidden_size

        # Size the embedding table from the constructor argument instead of a
        # notebook-level dataset object, so the model can be built (and a
        # checkpoint reloaded) without the dataset being in memory.
        self.embedding_layer = nn.Embedding(self.vocab_size, self.embedding_size)

        self.base_lstm = nn.LSTM(embedding_size, hidden_size, bidirectional=bidirectional, batch_first=True)
        self.hidden_lstm = nn.LSTM(self.hidden_lstm_input, self.hidden_lstm_input, batch_first=True)
        self.output_lstm = nn.LSTM(self.hidden_lstm_input, self.post_embedding, batch_first=True)
        self.classifier = nn.Linear(self.post_embedding, self.vocab_size)

    def init_hidden(self, size, bidirectional):
        """Return zero-filled ``(h, c)`` states for a single-layer LSTM.

        Each state has shape ``(num_directions, self.batch, size)`` and lives on
        the same device / dtype as the model parameters. ``self.batch`` is set
        by ``forward`` and must exist before this method is called.
        """
        num_directions = 2 if bidirectional else 1
        shape = (num_directions, self.batch, size)
        # Follow the parameters rather than a global device so the module works
        # wherever it has been moved with ``.to(...)``.
        reference = self.embedding_layer.weight
        h = torch.zeros(shape, dtype=reference.dtype, device=reference.device)
        c = torch.zeros(shape, dtype=reference.dtype, device=reference.device)
        return (h, c)

    def forward(self, x):
        """Score the symbol that follows each input window.

        Args:
            x: Integer index tensor whose first dimension is the batch. The
                squeeze below removes a size-1 axis sitting just before the
                embedding axis, which matches inputs of shape
                ``(batch, seq_len, 1)``.

        Returns:
            ``(logits, (h0, h1, h2))`` where ``logits`` has shape
            ``(batch, vocab_size)`` and ``h0``/``h1``/``h2`` are the final
            hidden states of the three LSTMs.
        """
        self.batch = x.shape[0]

        # The first state must match the configured directionality of
        # ``base_lstm``; the two following LSTMs are always unidirectional.
        h0, c0 = self.init_hidden(self.hidden_size, bidirectional=self.bidirectional)
        h1, c1 = self.init_hidden(self.hidden_lstm_input, bidirectional=False)
        h2, c2 = self.init_hidden(self.post_embedding, bidirectional=False)

        x = self.embedding_layer(x)
        # (batch, seq_len, 1, emb) -> (batch, seq_len, emb)
        x = torch.squeeze(x, dim=-2)

        o1, (h0, c0) = self.base_lstm(x, (h0, c0))
        o2, (h1, c1) = self.hidden_lstm(o1, (h1, c1))
        _, (h2, c2) = self.output_lstm(o2, (h2, c2))

        # h2 is (1, batch, post_embedding); drop the leading direction axis.
        x = self.classifier(h2.view(self.batch, self.post_embedding))

        return x, (h0, h1, h2)



In [None]:
# Instantiate the network with a vocabulary as large as the dataset's and move
# it to the selected device.
model = MusicEmbeddingBaseline(
    embedding_size=50,
    hidden_size=512,
    post_embedding=128,
    vocab_size=all_ds.n,
    bidirectional=True,
).to(device)

In [None]:
# Cross-entropy over the raw classifier scores (the model applies no softmax).
loss_function = nn.CrossEntropyLoss()

In [None]:
# Adam over every model parameter with a learning rate of 1e-4.
optimizer = torch.optim.Adam(params=model.parameters(), lr=1e-4)

In [None]:
# Number of full passes over the training loader.
epochs = 50

### ***Training***

In [None]:
# Per-epoch average losses, kept for the learning-curve plot.
running_loss = []
running_valid_loss = []

for i in range(epochs):

    # ---- training pass ----
    model.train()
    epoch_loss = 0

    pbar = tqdm(trainloader)
    pbar.set_description(f"Epoch - {i + 1} / {epochs}")
    for data, label in pbar:

        data, label = data.to(device), label.to(device)

        optimizer.zero_grad()
        pred, _ = model(data)
        loss = loss_function(pred, label)
        loss.backward()
        optimizer.step()

        # The criterion returns a batch mean; weight it by the batch size so the
        # epoch average stays exact even when the last batch is smaller.
        epoch_loss += loss.item() * data.size(0)
        pbar.set_postfix({"loss" : loss.item()})

    train_average = epoch_loss / len(trainloader.dataset)
    print(f"Average CrossEntropyLoss of Epoch {i + 1} : {train_average}")
    running_loss.append(train_average)

    # ---- validation pass ----
    model.eval()
    valid_loss = 0

    # No gradients are needed for evaluation; skipping them saves memory and time.
    with torch.no_grad():
        for data, label in valloader:

            data, label = data.to(device), label.to(device)

            pred, _ = model(data)
            loss = loss_function(pred, label)

            valid_loss += loss.item() * data.size(0)

    # Report the validation accumulator (not the training one) for this epoch.
    valid_average = valid_loss / len(valloader.dataset)
    print(f"Validation of Epoch {i + 1} : {valid_average}")
    running_valid_loss.append(valid_average)


print("Finish training !")

In [None]:
# Persist only the trained parameters (the state dict), not the whole module object.
torch.save(model.state_dict() ,"embedded_model_fulldata_50_epochs.pt")

In [None]:
import matplotlib.pyplot as plt


# Build each x axis from the length of the recorded history so x and y always
# have matching sizes (one point per completed epoch, numbered from 1).
train_epochs = np.arange(1, len(running_loss) + 1)
valid_epochs = np.arange(1, len(running_valid_loss) + 1)

fig, ax = plt.subplots()
ax.plot(train_epochs, running_loss, label="train")
ax.plot(valid_epochs, running_valid_loss, label="validation")
ax.set(xlabel="epoch", ylabel="average cross-entropy loss", title="Learning curves")
ax.legend();

### ***Reload pre-trained Model***

In [6]:
# Rebuild the architecture with the hyper-parameters used for training; the
# weights are filled in from the checkpoint in the next cell.
loaded = MusicEmbeddingBaseline(
    embedding_size=50,
    hidden_size=512,
    post_embedding=128,
    vocab_size=1140,
    bidirectional=True,
)

In [7]:
# `loaded` still lives on the CPU at this point, so map the checkpoint tensors
# to the CPU as well. Without `map_location`, a checkpoint written from a CUDA
# model cannot be restored on a machine that has no GPU.
# NOTE: torch.load unpickles the file; only load checkpoints from trusted sources.
loaded.load_state_dict(torch.load("embedded_model_fulldata_50_epochs.pt", map_location="cpu"))

<All keys matched successfully>

In [8]:
# Display the layer summary of the restored model.
loaded

MusicEmbeddingBaseline(
  (embedding_layer): Embedding(1140, 50)
  (base_lstm): LSTM(50, 512, batch_first=True, bidirectional=True)
  (hidden_lstm): LSTM(1024, 1024, batch_first=True)
  (output_lstm): LSTM(1024, 128, batch_first=True)
  (classifier): Linear(in_features=128, out_features=1140, bias=True)
)

### ***Evaluation***

In [17]:
### utils function that writes .midi files

def create_midi(prediction_output, fp="test_midi_fulldata_random_02.mid", step=0.25):
    """Convert a sequence of predicted symbols into a MIDI file.

    Args:
        prediction_output: Iterable of strings. A string containing ``'.'`` or
            made only of digits is treated as a chord whose dot-separated parts
            are integers passed to ``note.Note``; any other string is passed to
            ``note.Note`` as-is to build a single note.
        fp: Path of the MIDI file to write. Defaults to the file name this
            notebook has always used.
        step: Offset added between consecutive symbols so that they do not
            stack on top of each other.

    Returns:
        None. The function only writes the file at ``fp``.
    """
    # Imported lazily so that music21 is only required when a file is written.
    from music21 import note, chord, instrument, stream

    output_notes = []

    # Create note and chord objects based on the values generated by the model.
    for position, pattern in enumerate(prediction_output):
        # Each symbol is placed one `step` after the previous one.
        offset = position * step

        if ('.' in pattern) or pattern.isdigit():
            # Pattern is a chord: build one Note per dot-separated integer.
            chord_notes = []
            for current_note in pattern.split('.'):
                new_note = note.Note(int(current_note))
                new_note.storedInstrument = instrument.Piano()
                chord_notes.append(new_note)
            new_chord = chord.Chord(chord_notes)
            new_chord.offset = offset
            output_notes.append(new_chord)
        else:
            # Pattern is a single note.
            new_note = note.Note(pattern)
            new_note.offset = offset
            new_note.storedInstrument = instrument.Piano()
            output_notes.append(new_note)

    midi_stream = stream.Stream(output_notes)

    midi_stream.write("midi", fp=fp)

In [None]:
testing_model = loaded.eval().to(device)

# Pick a random dataset window as the seed. `randint` excludes its upper bound,
# so passing the length itself makes every window (including the last) eligible.
start = np.random.randint(0, len(all_ds.data))

pitchnames = all_ds.pitchnames
int_to_note = dict(enumerate(pitchnames))
pattern = all_ds.data[start]
window_size = all_ds.sequence_length
prediction_output = []

# Auto-regressive generation: predict one symbol, append it to the running
# sequence, and feed the most recent `window_size` symbols back in.
# Gradients are never needed here, so disable autograd for the whole loop.
with torch.no_grad():
    for note_indx in tqdm(range(500)):
        # Always take the trailing window; this does not rely on the seed
        # having exactly `window_size` entries.
        prediction_input = pattern[-window_size:].reshape(1, -1, 1).to(device)

        prediction, _ = testing_model(prediction_input)
        # Greedy decoding: keep the highest-scoring symbol.
        index = torch.argmax(prediction, dim=-1)

        prediction_output.append(int_to_note[index.item()])
        # Extend the running sequence (kept on the CPU) with the new symbol.
        pattern = torch.cat((pattern.cpu(), index.cpu().unsqueeze(0)), dim=0)


create_midi(prediction_output)

In [18]:
import random
testing_model = loaded.eval().to(device)

pitchnames = all_ds.pitchnames
int_to_note = dict(enumerate(pitchnames))
window_size = all_ds.sequence_length

# Seed the generator with uniformly random symbol indices, one full window long,
# shaped (window_size, 1) like a dataset sample.
seed_indices = random.choices(range(len(int_to_note)), k=window_size)
pattern = torch.tensor(seed_indices).reshape(window_size, 1)
prediction_output = []

# Auto-regressive generation: predict one symbol, append it to the running
# sequence, and feed the most recent `window_size` symbols back in.
# Gradients are never needed here, so disable autograd for the whole loop.
with torch.no_grad():
    for note_indx in tqdm(range(200)):
        prediction_input = pattern[-window_size:].reshape(1, -1, 1).to(device)

        prediction, _ = testing_model(prediction_input)
        # Greedy decoding: keep the highest-scoring symbol.
        index = torch.argmax(prediction, dim=-1)

        prediction_output.append(int_to_note[index.item()])
        # Extend the running sequence (kept on the CPU) with the new symbol.
        pattern = torch.cat((pattern.cpu(), index.cpu().unsqueeze(0)), dim=0)


create_midi(prediction_output)

100%|██████████| 200/200 [00:03<00:00, 53.81it/s]


In [None]:
# Inspect the generated symbols (the strings that were passed to create_midi).
prediction_output