In [None]:
#----Import necessary libraries and modules----
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
import math, copy, time
from torch.autograd import Variable
from torch.utils.data import Dataset, DataLoader
import matplotlib.pyplot as plt
import seaborn
import pretty_midi
import time

seaborn.set_context(context="talk")
%matplotlib inline

The following cell parses both data set files and records some statistics on the training data.

In [None]:
#----Dataset exploration and vocabulary determination----

# Sets only keep unique elements, so every note/chord is recorded exactly once
mel_vocab = set()   # all unique melody notes
harm_vocab = set()  # all unique harmony chords (3-note tuples)
# Train and validation numpy data files to scan
paths = ["train_data.npy",
         "val_data.npy"]

# Transposition distances applied to every chord (0 means "no transposition").
# Use [-2, -1, 0, 1, 2] instead to also count transposed copies.
offsets = [0]

for f in paths:
    data = np.load(f)
    # First axis: chorales, second axis: chords of one chorale
    for piece in data:
        for step in piece:
            # Rows after the end of the piece are encoded as zeros; a zero melody
            # note means there is nothing to add for this row, whatever the offset
            if step[0] == 0:
                continue
            for o in offsets:
                # Shift the melody note and record it
                mel_vocab.add(step[0] + o)
                # Shift the three harmony notes by the same amount and record the chord
                harm_vocab.add((step[1] + o, step[2] + o, step[3] + o))

print("Unique notes after complete data set augmentation: ", len(mel_vocab))
print("Unique chords after complete data set augmentation: ", len(harm_vocab))

We must create a **Vocabulary** for the harmony and melody. This is a mapping between unique notes/chords and the values that we input to the transformer. Transformers use an *embedding* layer at the input to transform input data into a dense vector in the "embedding" space, and relations between different input data items are encoded in the embedding space. We need to have integer **tokens** for each unique item in both the melody and harmony vocabularies.\
\
We also need to include three more items in each vocabulary:
- A **Start-of-Sequence** token (SOS), indicating the start of the piece.
- An **End-of-Sequence** token (EOS), indicating the end of the piece.
- A **padding** token, used to make all chorales in the dataset a uniform size to store them together in arrays.


In [None]:
#----Use python dictionaries to map between notes/chords and tokens----

# Special values in the data: 0 is padding, 1 is SOS and 100 is EOS.
# Their vocabulary indices come right after the regular entries.
n_chords = len(harm_vocab)
n_notes = len(mel_vocab)

# Harmony vocabulary: keys are tuples of MIDI numbers (one chord), values are integer indices
harm_dict_seq_2_idx = {(0,0,0): n_chords, (1,1,1): n_chords+1, (100, 100, 100): n_chords+2}
# Melody vocabulary: keys are MIDI numbers, values are integer indices
mel_dict_note_2_idx = {0: n_notes, 1: n_notes+1, 100: n_notes+2}

# Number the regular chords and notes in set-iteration order
harm_dict_seq_2_idx.update((seq, i) for i, seq in enumerate(harm_vocab))
mel_dict_note_2_idx.update((note, i) for i, note in enumerate(mel_vocab))

# Reverse lookups (index -> chord / note) for decoding model output later
harm_dict_idx_2_seq = dict((idx, seq) for seq, idx in harm_dict_seq_2_idx.items())
mel_dict_idx_2_note = dict((idx, note) for note, idx in mel_dict_note_2_idx.items())

#----Define functions to perform mapping on arrays----
def get_harm_index_from_vocab(data):
    """Translate harmony chords into vocabulary indices.

    Each element of ``data`` is converted to a tuple and looked up in
    ``harm_dict_seq_2_idx``; the indices are returned as a tensor in the same order.
    A chord that is not in the dictionary raises ``KeyError``.
    """
    indices = []
    for chord in data:
        indices.append(harm_dict_seq_2_idx[tuple(chord)])
    return torch.tensor(indices)

def get_harm_seq_from_idx(data):
    """Translate a sequence of harmony vocabulary indices back into chords.

    Only the first row of ``data`` (``data[0]``) is decoded. Every index in that row
    is looked up in ``harm_dict_idx_2_seq`` and the chords are returned as a numpy
    array with one chord per row.

    The whole row is decoded whatever its length (previously exactly 642 entries
    were read, which failed on shorter sequences and dropped the tail of longer ones).
    An index that is not in the dictionary raises ``KeyError``.
    """
    row = data[0]
    return np.asarray([list(harm_dict_idx_2_seq[idx.item()]) for idx in row])
    
def get_mel_index_from_vocab(data):
    """Translate melody notes into vocabulary indices.

    Each element of ``data`` is unwrapped with ``.item()`` and looked up in
    ``mel_dict_note_2_idx``; the indices are returned as a tensor in the same order.
    A note that is not in the dictionary raises ``KeyError``.
    """
    indices = []
    for note in data:
        indices.append(mel_dict_note_2_idx[note.item()])
    return torch.tensor(indices)

def get_mel_note_from_idx(data):
    """Translate a sequence of melody vocabulary indices back into notes.

    Only the first row of ``data`` (``data[0]``) is decoded. Every index in that row
    is looked up in ``mel_dict_idx_2_note`` and the notes are returned as a numpy array.

    The whole row is decoded whatever its length (previously exactly 642 entries
    were read, which failed on shorter sequences and dropped the tail of longer ones).
    An index that is not in the dictionary raises ``KeyError``.
    """
    row = data[0]
    return np.asarray([mel_dict_idx_2_note[idx.item()] for idx in row])

To load the training data from files, we'll need a data set class to pass to our PyTorch Dataloader. This class will implement the particulars of extracting the melody and harmony from the data array, insertion of special tokens and padding, data transformations, and setting the size of an individual training sequence.

In [None]:
# Define a custom class which inherits from torch.utils.data.Dataset
class MusicDataSetMIDI(Dataset):
    """Dataset of chorales that yields (melody, harmony) index sequences.

    The array loaded from ``data_array_file`` is indexed as ``[chorale, step, voice]``;
    column 0 of a chorale is used as the melody and the remaining columns as the
    harmony. Rows made only of zeros are treated as padding.

    Args:
        data_array_file: path of the ``.npy`` file to load.
        sos: value written into every column of the start-of-sequence row.
        eos: value written into every column of the end-of-sequence row.
        transform: optional callable applied to a chorale before tokens are inserted.
        seq_length: length of the window returned by ``__getitem__``. When left as
            ``None`` the module-level ``train_seq_length`` is used, as before.
    """

    def __init__(self, data_array_file, sos, eos, transform=None, seq_length=None):
        # read data into array from file
        self.data_arr = np.load(data_array_file)
        # start of sequence token
        self.sos = sos
        # end of sequence token
        self.eos = eos
        # any data augmentation functions
        self.transform = transform
        # window length; None means "use the global train_seq_length"
        self.seq_length = seq_length

    def __len__(self):
        # first dimension of the data array is the number of chorales
        return np.shape(self.data_arr)[0]

    @staticmethod
    def _first_padding_row(chorale):
        """Return the index of the first all-zero row, or the row count if there is none."""
        zero_rows = np.where(np.all(chorale == 0, axis=1))[0]
        if zero_rows.size == 0:
            return chorale.shape[0]
        return int(zero_rows[0])

    @staticmethod
    def _count_sections(content_len, seq_len):
        """Return how many windows of ``seq_len`` rows contain at least one non-padding row."""
        # Ceiling division: an exact multiple must not produce an extra, all-padding window
        return max(1, -(-content_len // seq_len))

    def insert_tokens(self, chorale):
        """Return a copy of ``chorale`` with an SOS row first and an EOS row after the last note.

        The EOS row is placed in front of the first all-zero (padding) row. When the
        chorale has no padding row at all, the EOS row is appended at the end instead
        of raising ``IndexError``. The result always has two more rows than the input.
        """
        # Rows filled with the SOS / EOS value, one entry per voice
        row_sos = np.full((1, chorale.shape[1]), self.sos, dtype=chorale.dtype)
        row_eos = np.full((1, chorale.shape[1]), self.eos, dtype=chorale.dtype)
        # SOS goes in front of the piece
        chorale = np.vstack((row_sos, chorale))
        # EOS goes where the padding starts (or at the very end when there is no padding)
        first_zero_row = self._first_padding_row(chorale)
        return np.insert(chorale, first_zero_row, row_eos, axis=0)

    def __getitem__(self, idx):
        """Return one randomly chosen window of chorale ``idx`` as ``(melody, harmony)`` indices.

        The chorale is optionally transformed, SOS/EOS rows are inserted, and one of
        the windows that still contain notes is picked at random. Melody and harmony
        are mapped to vocabulary indices with ``get_mel_index_from_vocab`` and
        ``get_harm_index_from_vocab``.
        """
        seq_len = self.seq_length if self.seq_length is not None else train_seq_length

        # select chorale 'idx' (all steps, all voices)
        chorale = self.data_arr[idx, :, :]
        # If we have passed transformation functions, apply them to the chorale
        if self.transform:
            chorale = self.transform(chorale)
        chorale = self.insert_tokens(chorale)

        # Rows before the first padding row are the actual content (SOS, notes, EOS)
        content_len = self._first_padding_row(chorale)
        sections = self._count_sections(content_len, seq_len)
        # Randomly select a section
        sec_sel = np.random.randint(sections)

        start = sec_sel * seq_len
        window = chorale[start:start + seq_len]
        # The last window can run off the end of the array; fill it up with padding
        # rows so that every sample has the same length and can be batched
        if window.shape[0] < seq_len:
            filler = np.zeros((seq_len - window.shape[0], chorale.shape[1]), dtype=chorale.dtype)
            window = np.vstack((window, filler))

        # Column 0 is the melody, the remaining columns form the harmony chord
        m = get_mel_index_from_vocab(window[:, 0])
        h = get_harm_index_from_vocab(window[:, 1:])

        return m, h

In [None]:
#----Set up data loaders----
# Absolute file paths for data files



# Define special tokens to pass to data set object



# Create data set objects with data file paths



# Define training/validation sequence length


# Define batch size to pass to data loaders


# Instantiate data loader objects



Now we'll define the Transformer model in the following two cells as follows:
- PositionalEncoding is a class which adds a position-dependent encoding to the embedded input and target sequences.
  This gives the transformer additional context about the temporal relationship between elements in the sequences,
  which the attention mechanism would otherwise not see.
- Transformer is a class which defines the operations of the forward pass of the Transformer model. This includes token embedding, positional encoding, and the Transformer model itself. This class also contains a function to generate target masks, which we give to the transformer so that the decoder cannot attend to future time steps when predicting the current one (during training as well as inference).

In [None]:
class PositionalEncoding(nn.Module):
    """Add fixed sinusoidal positional encodings to token embeddings, then apply dropout.

    Args:
        dim_model: dimension of the embedded representations used as inputs to the
            multi-head attention blocks. Odd values are supported.
        dropout_p: probability of dropout applied to the sum of embedding and encoding.
        max_length: number of positions for which an encoding is precomputed; inputs
            may not be longer than this along their first dimension.

    The encoding table has shape ``(max_length, 1, dim_model)`` and is indexed along
    its first dimension in ``forward``, so the input's first dimension is the one
    that is treated as the position in the sequence.
    """

    def __init__(self, dim_model, dropout_p, max_length):
        super().__init__()
        self.dropout = nn.Dropout(dropout_p)

        # Encoding from formula
        pos_encoding = torch.zeros(max_length, 1, dim_model)
        positions_list = torch.arange(max_length).unsqueeze(1)
        division_term = torch.exp(torch.arange(0, dim_model, 2).float() * (-math.log(10000.0)) / dim_model)
        angles = positions_list * division_term

        # pos_encoding(pos, 2i) = sin(pos/10000^(2i/dim_model))
        pos_encoding[:, 0, 0::2] = torch.sin(angles)
        # pos_encoding(pos, 2i+1) = cos(pos/10000^(2i/dim_model))
        # For an odd dim_model there is one odd slot fewer than even slots, so drop the
        # surplus frequency instead of failing on a shape mismatch
        pos_encoding[:, 0, 1::2] = torch.cos(angles[:, : dim_model // 2])

        # save as buffer (moves with the module like a parameter, but has no gradients)
        self.register_buffer('pos_encoding', pos_encoding)

    def forward(self, token_embedding: torch.Tensor) -> torch.Tensor:
        """Return ``dropout(token_embedding + encoding)``.

        The first dimension of ``token_embedding`` selects the positions; the encoding
        is broadcast over the second dimension.

        Raises:
            RuntimeError: if the first dimension is longer than ``max_length``.
        """
        seq_len = token_embedding.size(0)
        if seq_len > self.pos_encoding.size(0):
            raise RuntimeError(
                f"input has {seq_len} positions but positional encodings were only "
                f"built for {self.pos_encoding.size(0)}"
            )
        # residual connection + positional encoding, then dropout
        return self.dropout(token_embedding + self.pos_encoding[:seq_len])

In [None]:
class Transformer(nn.Module):
    """Sequence-to-sequence model mapping melody token indices to harmony logits.

    The model embeds source (melody) and target (harmony) indices, adds positional
    encodings, runs them through ``nn.Transformer`` and projects the decoder output
    onto the harmony vocabulary.

    Args:
        dim_model: size of the embedding vectors and of the transformer layers.
        num_heads: number of attention heads.
        num_encoder_layers: number of stacked encoder layers.
        num_decoder_layers: number of stacked decoder layers.
        melody_vocab_size: number of rows of the source embedding table.
        harmony_vocab_size: number of rows of the target embedding table and size of
            the output layer.
        dropout_p: dropout probability used by the positional encoder and the transformer.
        pos_enc_max_len: number of positions for which positional encodings are built.
    """

    #Constructor
    def __init__(
        self,
        dim_model,
        num_heads,
        num_encoder_layers,
        num_decoder_layers,
        melody_vocab_size,
        harmony_vocab_size,
        dropout_p,
        pos_enc_max_len
    ):
        super().__init__()

        self.dim_model = dim_model

        # LAYERS

        #Give transformer model an object of our positional encoding class
        self.positional_encoder = PositionalEncoding(
            dim_model=dim_model, dropout_p=dropout_p, max_length=pos_enc_max_len
        )

        #Create embedding layers to turn index sequences into dense, continuous vectors
        self.source_embedding = nn.Embedding(melody_vocab_size, dim_model)
        self.target_embedding = nn.Embedding(harmony_vocab_size, dim_model)

        #Create transformer layer
        self.transformer = nn.Transformer(
            d_model=dim_model,
            nhead=num_heads,
            num_encoder_layers=num_encoder_layers,
            num_decoder_layers=num_decoder_layers,
            dropout=dropout_p
        )
        #Project decoder output onto the harmony vocabulary
        self.out = nn.Linear(dim_model, harmony_vocab_size)

        self.init_weights()

    def init_weights(self):
        """Initialise embeddings and output weights uniformly in [-0.1, 0.1]; zero the output bias."""
        initrange = 0.1
        self.source_embedding.weight.data.uniform_(-initrange, initrange)
        self.target_embedding.weight.data.uniform_(-initrange, initrange)
        self.out.bias.data.zero_()
        self.out.weight.data.uniform_(-initrange, initrange)

    def forward(self, src, tgt, tgt_mask=None, src_pad_mask=None, tgt_pad_mask=None):
        """Run the model and return logits over the harmony vocabulary.

        Args:
            src: source indices, size (batch_size, src sequence length).
            tgt: target indices, size (batch_size, tgt sequence length).
            tgt_mask: optional attention mask for the decoder (see ``get_tgt_mask``).
            src_pad_mask: optional mask forwarded as ``src_key_padding_mask``.
            tgt_pad_mask: optional mask forwarded as ``tgt_key_padding_mask``.

        Returns:
            Tensor of size (tgt sequence length, batch_size, harmony_vocab_size).
        """
        #Embedding: output size = (batch size, sequence length, dim_model)
        src = self.source_embedding(src) * math.sqrt(self.dim_model)
        tgt = self.target_embedding(tgt) * math.sqrt(self.dim_model)

        #Permute to (sequence length, batch size, dim_model) BEFORE positional encoding.
        #The positional encoder picks its encodings by the first dimension, so that
        #dimension has to be the time step; encoding the batch-first tensor would give
        #every time step of a sample the same encoding, chosen by its batch index.
        src = src.permute(1, 0, 2)
        tgt = tgt.permute(1, 0, 2)

        src = self.positional_encoder(src)
        tgt = self.positional_encoder(tgt)

        #Pass embedded/encoded sequences to transformer
        transformer_out = self.transformer(
            src, tgt, tgt_mask=tgt_mask, src_key_padding_mask=src_pad_mask, tgt_key_padding_mask=tgt_pad_mask
        )
        out = self.out(transformer_out)

        return out

    def get_tgt_mask(self, size) -> torch.Tensor:
        """Return a (size, size) float mask: 0 on and below the diagonal, -inf above it.

        Each row allows one more element of the sequence to be seen.
        """
        # EX for size=5:
        # [[0., -inf, -inf, -inf, -inf],
        #  [0.,   0., -inf, -inf, -inf],
        #  [0.,   0.,   0., -inf, -inf],
        #  [0.,   0.,   0.,   0., -inf],
        #  [0.,   0.,   0.,   0.,   0.]]
        return torch.triu(torch.full((size, size), float('-inf')), diagonal=1)

    def create_pad_mask(self, matrix: torch.Tensor, pad_token: int) -> torch.Tensor:
        """Return a boolean mask that is True wherever ``matrix`` equals ``pad_token``."""
        # If matrix = [1,2,3,0,0,0] where pad_token=0, the result mask is
        # [False, False, False, True, True, True]
        return (matrix == pad_token)

Now we will instantiate the model and create a training loop

In [None]:
# Select device for training and testing the model


#--------Arguments for transformer model-----------

# Size of the melody and harmony vocabularies to pass to the embedding layers in the model


# Size of latent embedded vectors passed to the model. Hyperparameter open to tuning

# Number of attention heads in multi headed attention modules. Hyperparameter open to tuning

# Number of stacked encoder/decoder layers. Hyperparameter open to tuning.


# Probability of dropout layers

# Maximum sequence length. Make sure this is higher than the selected sequence length for the data loader

# Instantiate Transformer model
# TODO: Transformer.__init__ declares no default values, so all eight arguments
# (dim_model, num_heads, num_encoder_layers, num_decoder_layers, melody_vocab_size,
# harmony_vocab_size, dropout_p, pos_enc_max_len) have to be passed here; the bare
# call below raises TypeError until they are filled in.
model = Transformer()

                    






# Adam optimizer

# Since this is fundamentally a multiclass classification along the harmony vocabulary, use cross entropy loss function,
# which is ideally suited for multiclass classification


In [None]:
#---- Training loop ----
def training_loop(model, opt, loss_fn, dataloader):
    """Train ``model`` for one pass over ``dataloader`` and return the mean batch loss.

    Args:
        model: model providing ``get_tgt_mask(size)`` and called as
            ``model(melody, target_input, tgt_mask)``.
        opt: optimizer whose ``zero_grad``/``step`` are called once per batch.
        loss_fn: called as ``loss_fn(pred, target_expected)`` where ``pred`` has been
            permuted with ``(1, 2, 0)``.
        dataloader: iterable of ``(input_melody, target_harmony)`` batches.

    Returns:
        Sum of the per-batch losses divided by ``len(dataloader)``.

    Reads the module-level ``device``.
    """
    # Set model to train mode to enable dropout layers
    model.train()
    # Running sum of the batch losses
    total_loss = 0

    # Iterate through the entire training dataloader
    for input_melody, target_harmony in dataloader:

        # Send data to device. as_tensor leaves existing tensors as they are (no
        # copy-construct warning, no extra copy) and still accepts plain sequences.
        input_melody = torch.as_tensor(input_melody, device=device)
        target_harmony = torch.as_tensor(target_harmony, device=device)

        # Shift target sequences so that when model sees SOS in input sequence, it predicts the token at pos. 1
        target_input = target_harmony[:, :-1]
        target_expected = target_harmony[:, 1:]

        # Generate target mask matching the decoder input length
        sequence_length = target_input.size(1)
        tgt_mask = model.get_tgt_mask(sequence_length).to(device)

        # Forward pass of melody, target harmony, and mask through model
        pred = model(input_melody, target_input, tgt_mask)

        # Move the first dimension to the end so the second dimension comes first
        # (batch first when the model output is sequence-first) for the loss function
        pred = pred.permute(1, 2, 0)

        # Pass through loss function and perform backpropagation
        loss = loss_fn(pred, target_expected)
        opt.zero_grad()
        loss.backward()
        opt.step()

        # Add to running epoch loss
        total_loss += loss.detach().item()

    return total_loss / len(dataloader)
        
#----Validation loop----
def validation_loop(model, loss_fn, dataloader):
    """Evaluate ``model`` on every batch of ``dataloader`` and return the mean batch loss.

    The model is put in eval mode and no gradients are recorded. Batches are
    ``(input_melody, target_harmony)`` pairs; the module-level ``device`` is used.
    """
    # Eval mode switches the dropout layers off
    model.eval()
    running_loss = 0

    # No gradients are needed for evaluation, which saves memory and time
    with torch.no_grad():
        for melody_batch, harmony_batch in dataloader:
            melody_batch = melody_batch.to(device)
            harmony_batch = harmony_batch.to(device)

            # Decoder sees the sequence without its last element and has to
            # produce the sequence without its first element
            decoder_in = harmony_batch[:, :-1]
            decoder_target = harmony_batch[:, 1:]

            # Mask sized to the decoder input
            causal_mask = model.get_tgt_mask(decoder_in.size(1)).to(device)

            # Forward pass, then reorder the dimensions for the loss function
            logits = model(melody_batch, decoder_in, causal_mask).permute(1, 2, 0)

            batch_loss = loss_fn(logits, decoder_target)
            running_loss += batch_loss.detach().item()

    return running_loss / len(dataloader)
    
# Number of training epochs
# TODO: assign num_epochs here; the loop below and the plotting cell both read it.

# Collect list of epoch losses for plotting
train_loss_list, val_loss_list = [], []
# Wall-clock timestamps around the whole run, used for the training-time report below
start = time.time()
for n in range(num_epochs):
    print('-' * 30)
    print("Epoch | ", n+1)
    
    # Train for one epoch
    # TODO: assign train_loss here (presumably from training_loop) - the print below reads it.

    # Add training loss to list
    # TODO: append train_loss to train_loss_list, which the plotting cell reads.

    
    # Run inference on validation data
    # TODO: assign val_loss here (presumably from validation_loop) - the print below reads it.

    # Add validation loss to list
    # TODO: append val_loss to val_loss_list, which the plotting cell reads.

    
    #Print statistics
    print(f'Training loss: {train_loss:.4f}  |  Validation loss: {val_loss:.4f}')
    print('-' * 30)
    
stop = time.time()
train_time = stop-start
print(f'Training time: {train_time:.4f} seconds')

In [None]:
#----Plotting training loss----

# x-axis values: epoch numbers counted from 1
epochs = list(range(1, num_epochs + 1))

# One curve per loss history
for losses, line_color, line_label in (
    (train_loss_list, 'blue', "Training loss"),
    (val_loss_list, 'red', "Validation loss"),
):
    plt.plot(epochs, losses, color=line_color, label=line_label)

# Title, axis labels and legend
plt.title('Loss per epoch')
plt.xlabel('Epochs')
plt.ylabel('Loss')
plt.legend()

plt.show()

Now with the model, we need to define a function which takes a melody sequence and performs an inference (forward pass) on the trained model.

In [None]:
def run_inference(model, melody_seq, sos_index=None, top_k=5):
    """Generate one harmony vocabulary index per melody element with top-k sampling.

    Args:
        model: trained model providing ``get_tgt_mask(size)`` and called as
            ``model(melody_seq, harm_input, tgt_mask)``; its output is read as
            (target length, batch, harmony vocabulary size).
        melody_seq: melody indices of shape (1, L). Tensors created here are placed
            on the same device as ``melody_seq``.
        sos_index: vocabulary index used as the first decoder input. When ``None``
            the index of the SOS chord ``(1, 1, 1)`` is looked up in
            ``harm_dict_seq_2_idx``.
        top_k: number of highest-scoring candidates to sample from at each step
            (capped at the vocabulary size).

    Returns:
        List of L harmony vocabulary indices (plain ints).
    """
    # Evaluation mode turns off Dropout and other training-only behaviour
    model.eval()

    # melody tensor is of shape (1, L), so L is the second element of its size
    seq_length = melody_seq.size()[1]
    print("Length of melody sequence: ", seq_length)

    # The decoder has to start from the vocabulary index of the SOS chord, which is
    # not the raw SOS value 1 (index 1 is just some regular chord of the vocabulary)
    if sos_index is None:
        sos_index = harm_dict_seq_2_idx[(1, 1, 1)]

    run_device = melody_seq.device

    # Predicted harmony index for each time step
    harmony = []

    # "Previous outputs" fed to the decoder; grows by one prediction per step
    harm_input = torch.tensor([[sos_index]], dtype=torch.long, device=run_device)

    # No gradients are needed for generation
    with torch.no_grad():
        for _ in range(seq_length):
            # Mask that stops the decoder from attending to later positions
            tgt_mask = model.get_tgt_mask(harm_input.size(1)).to(run_device)

            # Forward pass: one row of vocabulary scores per decoder input position
            prediction = model(melody_seq, harm_input, tgt_mask)

            # Only the scores of the newest position predict the next chord
            last_scores = prediction[-1, 0]

            # Keep the k best candidates (never more than the vocabulary holds)
            k = min(top_k, last_scores.size(-1))
            top_values, top_indices = torch.topk(last_scores, k)

            # The scores are unnormalised and may be negative; softmax turns them
            # into the probabilities torch.multinomial requires
            probabilities = torch.softmax(top_values, dim=-1)
            choice = torch.multinomial(probabilities, 1)

            # multinomial returns a position inside the top-k list; map it back to
            # the vocabulary index
            next_index = top_indices[choice]

            harmony.append(next_index.item())

            # Extend the decoder input with the new prediction for the next step
            harm_input = torch.cat((harm_input, next_index.view(1, 1)), dim=1)

    # After the loop is done, return the harmony list
    return harmony
    

In [None]:
from melody_harm_utilities import melody_sequence_to_index, reassemble_sequences
import os

# Test melody - Twinkle Twinkle Little Star
# Plain list of integers; presumably MIDI note numbers, since it is mapped through
# mel_dict_note_2_idx below - TODO confirm against melody_sequence_to_index
melody = [67, 67, 67, 67, 74, 74, 74, 74, 76, 76, 72, 72, 74, 74, 71, 71, 
          72, 72, 69, 69, 71, 71, 67, 67, 69, 69, 66, 66, 67, 67, 66, 66]


# Directory the notebook is running in; only used by the optional load below
nb_dir = os.getcwd()
# Change path and uncomment to load melody array from song requests
#melody = np.load(os.path.join(nb_dir, 'samples', 'superstition.npy'))

#----Use helper functions to run inference on melody sequence----
# convert from MIDI to vocab index
# NOTE(review): `melody` is rebound from a list to the helper's return value here;
# run_inference reads size()[1] from it, so it presumably is a (1, L) tensor - confirm
melody = melody_sequence_to_index(melody, mel_dict_note_2_idx)
# send melody tensor to compute device
melody = melody.to(device) 
# Call function to run inference on melody
harmony = run_inference(model, melody) 
# Reassemble melody and predicted harmony into 4 part harmony sequence
chorale = reassemble_sequences(melody, harmony, mel_dict_idx_2_note, harm_dict_idx_2_seq)
# Print melody and harmony sequences (in indices rather than MIDI numbers) and reassembled piece
print("Melody: ", melody)
print("Harmony: ", harmony)
print("Reassembled piece: ", chorale)

In [None]:
from melody_harm_utilities import piano_roll_from_sequence, piano_roll_to_midi

# Use helper functions to convert symbolic music into MIDI format
chorale_piano_roll = piano_roll_from_sequence(chorale, 10)
chorale_midi = piano_roll_to_midi(chorale_piano_roll, 2)

# Synthesize generated music into audio
import IPython.display
