Deep artificial neural network for expressive timing and dynamics predictions in musical pieces
---------------

This notebook loads a sequential dataset with score and performance information and uses it to train and test a deep artificial neural network that predicts the onset timing deviation and peak loudness level of each note in a musical piece.


In [None]:
### Parameters to set:

# Selects where the dataset files are read from: True uses the relative
# 'data/' directory, False uses the mounted Google Drive folder (Google Colab).
runLocal = True  # False for using Google Colab


#### Load and preprocess training data

In [None]:
import os
import numpy as np
import pandas as pd
import pickle

#  read dataset

# Root folder of the pickled dataset files: a local relative directory, or the
# mounted Google Drive folder when running on Google Colab.
if runLocal:
    pathRoot = 'data/'
else:
    pathRoot = '/content/drive/My Drive/colab_data/'


def _load_pickle(file_name):
    """Unpickle and return the object stored in `file_name` under `pathRoot`."""
    # NOTE: pickle can execute arbitrary code while loading; only open dataset
    # files that come from a trusted source.
    # os.path.join keeps the path valid whether or not pathRoot ends with '/'.
    with open(os.path.join(pathRoot, file_name), 'rb') as filehandle:
        return pickle.load(filehandle)


# Training and validation sequences.
train = _load_pickle('LvB_train_sequences.data')
val = _load_pickle('LvB_val_sequences.data')

# Pitch dictionary and its inverse mapping (value -> key).
lex_to_ix = _load_pickle('LvB_pitch_dict.data')
ix_to_lex = {v: k for k, v in lex_to_ix.items()}

# The normalizer file stores a pair (moments, cols); index the moments by
# column name.
moments, cols = _load_pickle('LvB_normalizer.data')
moments = dict(zip(cols, moments))

#### Define the neural network

In [None]:
import math
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import pytorch_lightning as pl

import dataloader as dl

# Seed the NumPy and PyTorch random number generators with a fixed value.
np.random.seed(1728)
torch.manual_seed(1728)

class Encoder(nn.Module):
    """Bidirectional GRU encoder over embedded pitch and projected features.

    One feature column (`vocab_col`) holds an integer pitch index that is
    embedded; the remaining `n_x - 1` columns are projected linearly. Both
    results have `hidden_size` dimensions and are concatenated before being
    fed to the GRU.

    Args:
        n_x: Number of input feature columns, including the pitch column.
        vocab_col: Position of the pitch column on the last axis of the input.
        vocab_size: Number of entries in the pitch embedding (index 0 is the
            padding index).
        hidden_size: Size of the pitch embedding and of the feature projection.
    """

    def __init__(self, n_x, vocab_col, vocab_size, hidden_size):
        super().__init__()

        self.vocab_col = vocab_col
        self.pitchEmbedding = nn.Embedding(num_embeddings=vocab_size, embedding_dim=hidden_size, padding_idx=0)
        self.harmonyRhythmProjector = nn.Linear(in_features=n_x - 1, out_features=hidden_size)
        self.rnn = nn.GRU(2 * hidden_size, 2 * hidden_size, num_layers=1, bidirectional=True)

    def forward(self, x, lengths):
        """Encode a padded batch of sequences.

        Args:
            x: 3-D array or tensor whose last axis holds the `n_x` features.
                NOTE(review): packing and the GRU use the default
                batch_first=False, so axis 0 is treated as time and axis 1 as
                batch -- confirm this matches what the data generator yields.
            lengths: Valid length of every sequence in the batch.

        Returns:
            The `(output, lengths)` pair produced by `pad_packed_sequence`;
            the last axis of `output` has `4 * hidden_size` entries (two
            directions of `2 * hidden_size` each).
        """
        # torch.as_tensor accepts NumPy arrays as well as tensors of any dtype,
        # and places the data on the same device as the module parameters.
        x = torch.as_tensor(x, device=self.pitchEmbedding.weight.device)
        col = self.vocab_col

        pitch = x[:, :, col].long()
        # Every column except the pitch column, in the original order.
        harmRhythm = torch.cat([x[:, :, :col], x[:, :, col + 1:]], dim=2).float()

        pitch = self.pitchEmbedding(pitch)
        harmRhythm = self.harmonyRhythmProjector(harmRhythm)
        src_vec = torch.cat([pitch, harmRhythm], dim=2)
        sequence = nn.utils.rnn.pack_padded_sequence(src_vec, lengths, enforce_sorted=False)
        output, _ = self.rnn(sequence)
        return nn.utils.rnn.pad_packed_sequence(output)


class Attention(nn.Module):
    """Placeholder for the attention module; the computation is not written yet."""

    def __init__(self):
        super().__init__()

    def forward(self):
        # An explicit failure keeps the file importable while making any
        # accidental use of the unfinished module obvious.
        raise NotImplementedError("Attention.forward is not implemented yet")
    

class Decoder(nn.Module):
    """Placeholder for the decoder module; the computation is not written yet."""

    def __init__(self):
        super().__init__()

    def forward(self):
        # An explicit failure keeps the file importable while making any
        # accidental use of the unfinished module obvious.
        raise NotImplementedError("Decoder.forward is not implemented yet")
        

class Net(pl.LightningModule):
    """Lightning module holding an encoder, attention, decoder and output head.

    The module also provides its own data loaders (built from the module-level
    `train` and `val` sequences) and its optimizer/scheduler configuration.
    """

    def __init__(self, seq_length, n_x, n_y, vocab_col, vocab_size, batch_size, 
                 hidden_size, dropout_rate, output_cols, lr):
        """Store the hyper-parameters and build the sub-modules.

        Args:
            seq_length: Sequence length handed to the data generators.
            n_x: Number of input feature columns (forwarded to the encoder).
            n_y: Number of values produced by the final linear layer.
            vocab_col: Position of the pitch column (forwarded to the encoder).
            vocab_size: Pitch embedding size (forwarded to the encoder).
            batch_size: Batch size handed to the data generators.
            hidden_size: Base width of the encoder and of the output head.
            dropout_rate: Dropout probability used inside the output head.
            output_cols: Target column names handed to the data generators.
            lr: Learning rate of the Adam optimizer.
        """
        super(Net, self).__init__()

        self.seq_length = seq_length
        self.batch_size = batch_size
        self.output_cols = output_cols
        self.lr = lr

        self.encoder = Encoder(n_x, vocab_col, vocab_size, hidden_size)
        self.att = Attention()
        self.decoder = Decoder()

        # Output head: 2*hidden -> 4*hidden -> n_y, with dropout in between.
        self.ff1 = nn.Linear(2*hidden_size, 4*hidden_size)
        self.drop1 = nn.Dropout(dropout_rate)
        self.ff2 = nn.Linear(4*hidden_size, n_y, bias=False)

    def forward(self, x, y):
        """Apply the output head (ff1 -> ReLU -> dropout -> ff2).

        NOTE(review): this method is unfinished. `out_vec` is read on the
        right-hand side before anything assigns it, and the encoder, attention
        and decoder are never called, so the method raises as written. The
        blank lines below appear to be where that code is meant to go.
        """




        out_vec = self.ff2(self.drop1(F.relu(self.ff1(out_vec))))
        return out_vec

    def training_step(self, batch, batch_idx):
        """Return the MSE between the prediction and the target of one batch."""
        # `lengths` is unpacked but not used by this method.
        x, y, lengths = batch
        y_hat = self.forward(x, y)
        # Debug output: one slice of the prediction is printed on every step.
        print(y_hat[1,1,:])
        return {'loss': F.mse_loss(y_hat, torch.FloatTensor(y))}

    def validation_step(self, batch, batch_idx):
        """Return the MSE between the prediction and the target of one batch."""
        # `lengths` is unpacked but not used by this method.
        x, y, lengths = batch
        y_hat = self.forward(x, y)
        return {'loss': F.mse_loss(y_hat, torch.FloatTensor(y))}

    def train_dataloader(self):
        """Build the training data generator from the module-level `train`."""
        return dl.DataGenerator(train, self.seq_length, batch_size=self.batch_size, output_cols=self.output_cols)

    def val_dataloader(self):
        """Build the validation data generator from the module-level `val`."""
        return dl.DataGenerator(val, self.seq_length, batch_size=self.batch_size, output_cols=self.output_cols)

    def configure_optimizers(self):
        """Return Adam plus a StepLR schedule (step_size=1, gamma=0.25)."""
        optimizer = optim.Adam(self.parameters(), lr=self.lr)
        # The learning rate is multiplied by 0.25 on every scheduler step.
        scheduler = optim.lr_scheduler.StepLR(optimizer, step_size=1, gamma=0.25)
        return [optimizer], [scheduler]

    def generate_mask(self, sz, for_input=False):
        """Return a boolean lower-triangular mask of shape (sz, sz).

        With `for_input=True` the main diagonal is True as well; otherwise only
        the entries strictly below the diagonal are True.
        """
        diag = 0 if for_input else 1 
        # Upper-triangular ones, compared to 1 and transposed -> lower-triangular.
        mask = (torch.triu(torch.ones(sz, sz), diagonal=diag) == 1).transpose(0, 1)
#         mask = mask.float().masked_fill(mask == 0, float('-inf')).masked_fill(mask == 1, float(0.0))
        return mask


In [None]:
# Named once here because both values are read again by the evaluation code
# further down in the notebook.
seq_length = 200
output_cols = ['ioiRatio', 'peakLevel']

# The number of input features and the position of the pitch column are taken
# from the first training sequence.
model = Net(seq_length, train[0][0].shape[1], len(output_cols),
            vocab_col=train[0][0].columns.get_loc("pitch"),
            vocab_size=len(ix_to_lex) + 3,
            batch_size=64,
            hidden_size=32,
            dropout_rate=0.1,
            output_cols=output_cols,
            lr=3e-3)

#### Train the model

In [None]:
# Fit for a single epoch; the model supplies its own data loaders.
trainer = pl.Trainer(max_epochs=1)
trainer.fit(model)

#  Save model
checkpoint_path = pathRoot + '2021-01-11-test0.pth'
trained_weights = model.state_dict()
torch.save(trained_weights, checkpoint_path)

#### Results

In [None]:
# Load model
# model.load_state_dict(torch.load(pathRoot + '2021-01-11-test0.pth'))

# Switch the model to evaluation mode.
model.eval()

#  Compute note-level error

# use validation data
# test = val

# use test data
with open(os.path.join(pathRoot, 'LvB_test_sequences.data'), 'rb') as seq_path:
    test = pickle.load(seq_path)

# The error computation and the plots below read the data through the name
# `test_sequences`, so bind it to whichever set was selected above.
test_sequences = test

### CORRECT BELOW
    
def evaluation(sequences, sequence_length, model, pad_value=0.):
    """Run `model` over every piece, cut into consecutive fixed-length chunks.

    The feature table of each piece (`S[0]`) is split into chunks of
    `sequence_length` rows; the last chunk is filled up with `pad_value`.
    All chunks of a piece are passed to `model` as one batch.

    Args:
        sequences: Iterable of items whose first element is a DataFrame of
            input features (one row per note).
        sequence_length: Number of rows per chunk.
        model: Callable taking an array of shape
            (n_chunks, sequence_length, n_features) and returning a 3-D result.
        pad_value: Fill value for the unused rows of the last chunk.

    Returns:
        A list with one 2-D result per piece, obtained by flattening the first
        two axes of the model output. Rows beyond the piece length belong to
        padding.
    """
    Yhat = []
    for S in sequences:
        features = S[0].to_numpy()
        tx, n_feat = features.shape
        # Ceiling division: a partially filled last chunk still counts.
        n_chunks = -(-tx // sequence_length)
        # Pad in 2-D first, then fold into chunks; this also copes with an
        # empty piece (zero chunks) instead of indexing past the array.
        padded = np.full((n_chunks * sequence_length, n_feat), pad_value)
        padded[:tx] = features
        x = padded.reshape(n_chunks, sequence_length, n_feat)
        y = model(x)
        Yhat.append(y.reshape((-1, y.shape[2])))
    return Yhat

def sliding_evaluation(sequences, sequence_length, model, pad_value=0., pad_start=True):
    """Run `model.predict` over every piece using windows that advance one row.

    With `pad_start=True` there is one window per row; window `w` ends at row
    `w` and is left-padded with `pad_value` while fewer than `sequence_length`
    rows are available. With `pad_start=False` only full windows are built,
    i.e. `tx - sequence_length + 1` of them.

    Args:
        sequences: Iterable of items whose first element is a DataFrame of
            input features (one row per note).
        sequence_length: Number of rows per window.
        model: Object with a `predict` method that accepts an array of shape
            (n_windows, sequence_length, n_features).
        pad_value: Fill value for the left padding.
        pad_start: Whether to build left-padded windows for the first rows.

    Returns:
        A list with one prediction array per piece. For 3-D model output the
        last step of every window is kept; without `pad_start` all steps of
        the first window are kept as well, so that every row has a prediction.

    Raises:
        ValueError: If `pad_start` is False and a piece has fewer rows than
            `sequence_length`.
    """
    Yhat = []
    for S in sequences:
        values = S[0].to_numpy()
        tx, n_feat = values.shape
        if pad_start:
            n_windows = tx
        else:
            if tx < sequence_length:
                raise ValueError(
                    'sequence with %d rows is shorter than sequence_length=%d'
                    % (tx, sequence_length))
            n_windows = tx - sequence_length + 1
        x = np.full((n_windows, sequence_length, n_feat), pad_value)
        # Every window is filled, including the final one (the previous loop
        # bound stopped one window early and left it as pure padding).
        for w in range(n_windows):
            if pad_start:
                first = max(0, w - sequence_length + 1)
                rows = values[first:w + 1]
                # Right-align the available rows; the head stays padded.
                x[w, sequence_length - len(rows):, :] = rows
            else:
                x[w, :, :] = values[w:w + sequence_length]
        y = model.predict(x)
        if y.ndim < 3:  # single timestep prediction
            Yhat.append(y)
        elif pad_start:
            Yhat.append(y[:, -1, :])
        else:
            Yhat.append(np.concatenate((y[0, :, :], y[1:, -1, :])))
    return Yhat

Yhat = evaluation(test_sequences, seq_length, model)
n_outputs = Yhat[0].shape[1]
# One row per piece, one column per predicted attribute.
mse = np.zeros((len(test_sequences), n_outputs))
ms = np.zeros((len(test_sequences), n_outputs))
for i, (_, Y, _, _, _) in enumerate(test_sequences):
    target = Y.loc[:, output_cols].to_numpy()
    # Predictions are padded to whole chunks; keep only rows that have a target.
    residual = Yhat[i][:target.shape[0], :] - target
    # Reduce over notes only (axis 0) so every output column keeps its own
    # statistic, independent of how the installed pandas version handles
    # np.mean on a DataFrame.
    mse[i, :] = np.mean(residual ** 2, axis=0)
    ms[i, :] = np.mean(target ** 2, axis=0)

print('Validation set MSE for y_0: ' + str(np.mean(mse[:,0])) + '     mean square val: ' + str(np.mean(ms[:,0])))
print('Minimum y_0 MSE among pieces: ' + str(mse[:,0].min()))

In [None]:
import matplotlib.pyplot as plt

# Per-piece squared error of the first output column (mse) next to the mean
# square of the corresponding target values (ms).
plt.plot(mse[:,0])
plt.plot(ms[:,0])

In [None]:
import matplotlib.pyplot as plt

piece = 0
attr = ['peakLevel']
# The predictions are compared column by column with the targets selected via
# the model's output columns, so look up the column that belongs to the plotted
# attribute instead of always taking column 0.
attr_col = model.output_cols.index(attr[0])
plt.figure(figsize=(21, 5))
plt.plot(Yhat[piece][:, attr_col])
plt.plot(test_sequences[piece][1].loc[:, attr].to_numpy())
plt.show()

#### Listen to a synthesized predicted expression

In [None]:
import pretty_midi
import IPython.display

# NOTE(review): `Yhat` is whatever was computed in an earlier cell; confirm it
# was produced from the same set that is selected here.
test_sequences = val

# piece to synthesize:
pieceNum = 27
selected = test_sequences[pieceNum]
sel_x, sel_y = selected[0], selected[1]
pieceId = selected[2]
print(pieceId)

# Per-piece scaling table: row 2 is applied to ioiRatio values and row 0 to
# localTempo; column 1 is the multiplier, column 0 the offset.
sel_moments = selected[4]
ioi_offset, ioi_scale = sel_moments[2, 0], sel_moments[2, 1]
n_notes = sel_y.shape[0]

deviations_pred = Yhat[pieceNum][:,0] * ioi_scale + ioi_offset
deviations_perf = sel_y.ioiRatio * ioi_scale + ioi_offset
tempo = sel_y.localTempo.iloc[0] * sel_moments[0, 1] + sel_moments[0, 0]
no_dev = [ioi_offset] * n_notes
dev_rand = np.random.normal(size=n_notes) * ioi_scale + ioi_offset

pm = pretty_midi.PrettyMIDI(initial_tempo=60 * tempo)
inst = pretty_midi.Instrument(program=selected[3], is_drum=False, name='melody_inst')
pm.instruments.append(inst)

# NOTE(review): the loop renders `deviations_perf`; `deviations_pred`,
# `no_dev` and `dev_rand` are computed above but not used in this cell.
start = 0.
lastNote = None
for x, y, dev in zip(sel_x.itertuples(), sel_y.itertuples(), deviations_perf):
    (pitch, _) = ix_to_lex[x.melody]
    # Cut the previous note short if it would overlap the new onset.
    if lastNote and start < lastNote.end:
        lastNote.end = start
    note_length = x.duration * moments['duration'][1] + moments['duration'][0]
    end = start + note_length * dev
    lastNote = pretty_midi.Note(100, pitch, start, end)
    inst.notes.append(lastNote)
    onset_gap = x.ioi * moments['ioi'][1] + moments['ioi'][0]
    start += onset_gap * dev
IPython.display.Audio(pm.fluidsynth(fs=44100), rate=44100)

### Building conducting input from the generated performance

This step uses the predicted timing information to build a local tempo signal that can be used as input to a virtual conductor. That signal is then compared with the local tempo vector obtained from the chosen reference performance in the dataset.