In [1]:
import pandas as pd
import numpy as np
import torch
from torch import nn
from tqdm import tqdm
from torch.utils.data import TensorDataset
from torch.utils.data import random_split
from torch.utils.data import DataLoader
from music21 import note, chord, instrument, stream

In [2]:
data = pd.read_csv('data.txt', header = None, sep = ";")
data = data[data[1]=='Johann Sebastian Bach']
data = data.reset_index(drop=True)

In [3]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

In [4]:
data[2] = data[2].str.strip('[]')
data[2] = data[2].str.split(',')
data[2] = data[2].apply(np.array)
data[3] = data[2].apply(set)
total_set = set.union(*data[3])
pitchnames = sorted(set(item for item in total_set))
note_to_int = dict((note, number) for number, note in enumerate(pitchnames))
int_to_note = dict((number, note) for number, note in enumerate(pitchnames))
seq_len = 100

In [5]:
all_notes = np.concatenate(data[2])
note_counts = np.unique(all_notes, return_counts = True)[1] # Sorted by default
weights = 1 / note_counts
weights = weights / np.sum(weights)
weights = -1/np.log(weights)
weights = torch.Tensor(weights).to(device)

In [6]:
def get_inp_out(data):
    network_input_np = []
    network_output_np = []

    for j in tqdm(range(data.shape[0])):
        for i in range(0, len(data[2][j]) - seq_len, 1):
            sequence_in = data[2][j][i:i + seq_len]
            sequence_out = data[2][j][i + seq_len]
            network_input_np.append([note_to_int[char] for char in sequence_in])
            network_output_np.append(note_to_int[sequence_out])
    return network_input_np, network_output_np

In [7]:
data_in, data_out = get_inp_out(data)

100%|██████████████████████████████████████████████████████████████████████████████████| 89/89 [00:21<00:00,  2.98it/s]


In [8]:
batch_size = 128
data_in = torch.Tensor(data_in)
data_out = torch.Tensor(data_out)
dataset = TensorDataset(data_in, data_out)
train_dataset, val_dataset = random_split(dataset, [int(np.ceil(len(data_in)*0.99)), int(np.floor(len(data_in)*0.01))])
train_data_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True, drop_last=True)
val_data_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False, drop_last=True)

In [9]:
class LSTMModel(nn.Module):
    def __init__(self, input_dim, hidden_dim, layer_dim, output_dim):
        super(LSTMModel, self).__init__()
        # Hidden dimensions
        self.hidden_dim = hidden_dim

        # Number of hidden layers
        self.layer_dim = layer_dim

        # Building your LSTM
        # batch_first=True causes input/output tensors to be of shape
        # (batch_dim, seq_dim, feature_dim)
        self.lstm = nn.LSTM(input_dim, hidden_dim, layer_dim, batch_first=True, dropout = 0.2)
        
        #####self.lstm2 = nn.LSTM(hidden_dim, hidden_dim2, layer_dim2, batch_first=True)
        # Readout layer
        self.fc0 = nn.Linear(hidden_dim, 256)
        self.drop = nn.Dropout(p = 0.3)
        self.fc1 = nn.Linear(256, output_dim)

    def forward(self, x):
        # Initialize hidden state with zeros
        h0 = torch.zeros(self.layer_dim, x.size(0), self.hidden_dim).requires_grad_().to(device)

        # Initialize cell state
        c0 = torch.zeros(self.layer_dim, x.size(0), self.hidden_dim).requires_grad_().to(device)

        # 28 time steps
        # We need to detach as we are doing truncated backpropagation through time (BPTT)
        # If we don't, we'll backprop all the way to the start even after going through another batch
        out, (hn, cn) = self.lstm(x, (h0.detach(), c0.detach()))

        # Index hidden state of last time step
        # out.size() --> 100, 28, 100
        # out[:, -1, :] --> 100, 100 --> just want last time step hidden states! 
        out = self.fc0(out[:, -1, :])
        out = self.drop(out)
        out = self.fc1(out)
        # out.size() --> 100, 10
        return out

In [11]:
model = LSTMModel(1, 256, 3, len(pitchnames)).to(device)
model.load_state_dict(torch.load('model_1', map_location=torch.device('cpu')))

<All keys matched successfully>

In [11]:
learning_rate = 0.001

optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)  
criterion = nn.CrossEntropyLoss(weight = weights)
num_epochs = 100

19.33

In [12]:
count = 0
for epoch in range(num_epochs):
    for i, (batch_x, batch_y) in enumerate(train_data_loader):

        # Put data in the correct device
        batch_x = batch_x.to(device)
        batch_y = batch_y.to(device).long()
        # Clear gradients w.r.t. parameters
        optimizer.zero_grad()

        # Forward pass to get output/logits
        # outputs.size() --> 100, 10
        outputs = model(batch_x.view(batch_size, seq_len, 1))

        # Calculate Loss: softmax --> cross entropy loss
        loss = criterion(outputs, batch_y)

        # Getting gradients w.r.t. parameters
        loss.backward()

        # Updating parameters
        optimizer.step()
        count += 1

        #if count % 5000 == 0:
    correct = 0
    total = 0
    v_loss = 0

    for val_x, val_y in val_data_loader:

        # Put data in the correct device
        val_x = val_x.to(device)
        val_y = val_y.to(device).long()
        # Forward pass only to get logits/output
        with torch.no_grad():
            output = model(val_x.view(batch_size, seq_len, 1))

        # Get predictions from the maximum value
        _, predicted = torch.max(output, 1)
        val_bloss = criterion(output, val_y)
        v_loss += val_bloss*batch_size

        # Total correct predictions
        total += batch_size
        correct += (predicted == val_y).sum()
    accuracy = 100 * correct / total
    v_loss = v_loss/total

    # Print Loss
    print('Epoch: {}. Loss: {}. ValLoss: {}. Accuracy: {} %'.format(epoch, loss.item(), v_loss, accuracy))

Epoch: 0. Loss: 4.424532413482666. ValLoss: 4.216208457946777. Accuracy: 3 %
Epoch: 1. Loss: 4.255472660064697. ValLoss: 4.1815643310546875. Accuracy: 3 %
Epoch: 2. Loss: 4.010770320892334. ValLoss: 4.147602558135986. Accuracy: 4 %
Epoch: 3. Loss: 4.128831386566162. ValLoss: 4.150794982910156. Accuracy: 4 %
Epoch: 4. Loss: 3.879819631576538. ValLoss: 4.07977819442749. Accuracy: 5 %
Epoch: 5. Loss: 3.9912993907928467. ValLoss: 4.053655624389648. Accuracy: 5 %
Epoch: 6. Loss: 3.784130334854126. ValLoss: 3.986482620239258. Accuracy: 7 %
Epoch: 7. Loss: 3.7824018001556396. ValLoss: 3.972188949584961. Accuracy: 6 %
Epoch: 8. Loss: 3.980056047439575. ValLoss: 3.949808120727539. Accuracy: 7 %
Epoch: 9. Loss: 3.8413937091827393. ValLoss: 3.9255802631378174. Accuracy: 6 %
Epoch: 10. Loss: 3.570657253265381. ValLoss: 3.9147305488586426. Accuracy: 7 %
Epoch: 11. Loss: 3.6698853969573975. ValLoss: 3.8973217010498047. Accuracy: 8 %
Epoch: 12. Loss: 3.650193214416504. ValLoss: 3.8238728046417236. Ac

In [16]:
torch.save(model.state_dict(), 'model_1')

In [None]:
model

In [13]:
pred_len = 200
seq = [note_to_int[note] for note in data[2][0][:100]]
seq = torch.Tensor(seq).view(1,100,1).to(device)
prediction = []
for i in range(pred_len):
    with torch.no_grad():
        new_note = model(seq)
    _, new_note = torch.max(new_note, 1)
    seq = torch.cat((seq, new_note.view(1,1,1).float()), 1)[:, 1:,:]
    new_note = new_note.cpu().numpy()
    prediction.append(new_note)

In [14]:
np.savetxt('train.txt', prediction)

In [15]:
pred_len = 200
seq1 = val_dataset[2][0]
seq = seq1.view(1,100,1).to(device)
prediction = []
for i in range(pred_len):
    with torch.no_grad():
        new_note = model(seq)
    _, new_note = torch.max(new_note, 1)
    seq = torch.cat((seq, new_note.view(1,1,1).float()), 1)[:,1:,:]
    new_note = new_note.cpu().numpy()
    prediction.append(new_note)

In [16]:
np.savetxt('val.txt', prediction)

In [17]:
predicted_train = np.loadtxt('train.txt')
predicted_val = np.loadtxt('val.txt')
predicted_train = [int_to_note[note] for note in predicted_train]
predicted_val = [int_to_note[note] for note in predicted_val]

In [20]:
offset = 0
output_notes = []
# create note and chord objects based on the values generated by the model
for pattern in predicted_val:
    # pattern is a chord
    if ('.' in pattern) or pattern.isdigit():
        notes_in_chord = pattern.split('.')
        notes = []
        for current_note in notes_in_chord:
            new_note = note.Note(int(current_note))
            new_note.storedInstrument = instrument.Piano()
            notes.append(new_note)
        new_chord = chord.Chord(notes)
        new_chord.offset = offset
        output_notes.append(new_chord)
    # pattern is a note
    else:
        new_note = note.Note(pattern)
        new_note.offset = offset
        new_note.storedInstrument = instrument.Piano()
        output_notes.append(new_note)
    # increase offset each iteration so that notes do not stack
    offset += 0.5

In [21]:
midi_stream = stream.Stream(output_notes)
midi_stream.write('midi', fp = "val_3.mid")

'val_3.mid'