In [1]:
import torch
from torch import nn, Tensor
from torch.nn import TransformerEncoder, TransformerEncoderLayer
from torch.utils.data import dataset
import math

In [20]:
# ``nn.Module`` is the base class for every PyTorch model.
# It registers the sub-modules assigned as attributes (embedding, encoder and
# linear layers here) together with their parameters, so that calls such as
# ``.parameters()``, ``.to(device)`` and ``.state_dict()`` reach all of them.
# The forward pass is defined by overriding ``forward``.
class TransformerModel(nn.Module):
    """Transformer-encoder language model: embedding -> positional encoding
    -> ``nlayers`` encoder layers -> linear projection onto the vocabulary.

    Arguments:
        ntoken: number of unique tokens (including special tokens such as
            padding or end-of-sequence).
        d_model: embedding dimension, i.e. the size of the vector every
            vocabulary entry is mapped to. The embedding is learned during
            training.
        nhead: number of attention heads.
            Every token gets a Query, a Key and a Value vector, computed by
            passing the sequence embedding through three separate linear
            layers (each of size Emb x Emb). Self-attention scores are the dot
            products of a token's Query with every token's Key, which gives a
            vector of size |T| (number of tokens in the input) per token. A
            softmax turns the scores into weights, and the weighted sum of the
            Value vectors is the output for that token. In multi-head
            attention these matrices are split across the heads and attention
            is computed per head. More details:
            https://towardsdatascience.com/transformers-explained-visually-part-3-multi-head-attention-deep-dive-1c1ff1024853
        d_hid: dimension of the feed-forward network that follows the
            attention step inside each encoder layer.
        nlayers: number of stacked encoder layers.
        dropout: dropout rate, passed both to the positional encoding and to
            every encoder layer.
    """

    def __init__(self, ntoken: int, d_model: int, nhead: int,
                 d_hid: int, nlayers: int, dropout: float = 0.5):
        # nn.Module must be initialised before any sub-module is assigned,
        # otherwise the attribute assignments below cannot be registered.
        super().__init__()
        self.model_type = 'Transformer'
        self.pos_encoder = PositionalEncoding(d_model, dropout)
        encoder_layers = TransformerEncoderLayer(d_model, nhead, d_hid, dropout)
        self.transformer_encoder = TransformerEncoder(encoder_layers, nlayers)
        self.embedding = nn.Embedding(ntoken, d_model)
        self.d_model = d_model
        # Projects every encoder output vector onto ``ntoken`` unnormalised
        # scores (logits) for the next token. No softmax is applied here.
        self.linear = nn.Linear(d_model, ntoken)

        self.init_weights()

    def init_weights(self) -> None:
        """Initialise embedding and output weights uniformly in [-0.1, 0.1]
        and zero the output bias.

        ``nn.init`` functions modify the tensors in place without recording
        the operation for autograd, which a bare ``bias.zero_()`` on a
        parameter would not allow.
        """
        initrange = 0.1
        nn.init.uniform_(self.embedding.weight, -initrange, initrange)
        nn.init.zeros_(self.linear.bias)
        nn.init.uniform_(self.linear.weight, -initrange, initrange)

    def forward(self, src: Tensor, src_mask: Tensor = None) -> Tensor:
        """Run the forward pass, i.e. the steps applied to input data.

        Arguments:
            src: Tensor, shape ``[seq_len, batch_size]``
            src_mask: Tensor, shape ``[seq_len, seq_len]`` - masks attention so
                that future tokens have no influence on a prediction. When
                omitted, a square causal mask is generated.

        Returns:
            output Tensor of shape ``[seq_len, batch_size, ntoken]``
        """
        # Scale the embeddings by sqrt(d_model), following "Attention Is All
        # You Need". NOTE(review): the usual explanation is that this keeps
        # the embeddings from being dwarfed by the positional encoding; the
        # paper itself does not state the reason.
        src = self.embedding(src) * math.sqrt(self.d_model)
        # Add the positional encoding.
        src = self.pos_encoder(src)

        if src_mask is None:
            # Without a mask every position could attend to the token it is
            # supposed to predict. Build the mask on the input's own device.
            src_mask = nn.Transformer.generate_square_subsequent_mask(
                src.size(0)).to(src.device)

        # Pass through the ``nlayers`` encoder layers, then project to logits.
        output = self.transformer_encoder(src, src_mask)
        output = self.linear(output)
        return output

In [2]:
import math
import os
from tempfile import TemporaryDirectory
from typing import Tuple

import torch
from torch import nn, Tensor
from torch.nn import TransformerEncoder, TransformerEncoderLayer
from torch.utils.data import dataset

class TransformerModel(nn.Module):
    """Transformer-encoder language model.

    Pipeline: token embedding (scaled by ``sqrt(d_model)``) -> positional
    encoding -> ``nlayers`` encoder layers -> linear projection onto
    ``ntoken`` logits.

    Arguments:
        ntoken: vocabulary size.
        d_model: embedding dimension.
        nhead: number of attention heads per encoder layer.
        d_hid: dimension of the feed-forward network in each encoder layer.
        nlayers: number of encoder layers.
        dropout: dropout rate for the positional encoding and encoder layers.
    """

    def __init__(self, ntoken: int, d_model: int, nhead: int, d_hid: int,
                 nlayers: int, dropout: float = 0.5):
        super().__init__()
        self.model_type = 'Transformer'
        self.pos_encoder = PositionalEncoding(d_model, dropout)
        encoder_layers = TransformerEncoderLayer(d_model, nhead, d_hid, dropout)
        self.transformer_encoder = TransformerEncoder(encoder_layers, nlayers)
        self.embedding = nn.Embedding(ntoken, d_model)
        self.d_model = d_model
        self.linear = nn.Linear(d_model, ntoken)

        self.init_weights()

    def init_weights(self) -> None:
        """Initialise embedding and output weights uniformly in [-0.1, 0.1]
        and zero the output bias."""
        initrange = 0.1
        nn.init.uniform_(self.embedding.weight, -initrange, initrange)
        nn.init.zeros_(self.linear.bias)
        nn.init.uniform_(self.linear.weight, -initrange, initrange)

    def forward(self, src: Tensor, src_mask: Tensor = None) -> Tensor:
        """
        Arguments:
            src: Tensor, shape ``[seq_len, batch_size]``
            src_mask: Tensor, shape ``[seq_len, seq_len]``. When omitted, a
                square causal mask is generated.

        Returns:
            output Tensor of shape ``[seq_len, batch_size, ntoken]``
        """
        src = self.embedding(src) * math.sqrt(self.d_model)
        src = self.pos_encoder(src)
        if src_mask is None:
            # Square causal mask: masked positions are float('-inf'), unmasked
            # positions are 0.0. It is moved to the input's own device rather
            # than to a notebook-level ``device`` global, so the model does
            # not depend on a variable defined in a later cell.
            src_mask = nn.Transformer.generate_square_subsequent_mask(len(src)).to(src.device)
        output = self.transformer_encoder(src, src_mask)
        output = self.linear(output)
        return output

In [3]:
# Since we have inheritance, we can change the super class and inherit the changes down the line
# This is what I will need to edit to get the same encoding that is used in 
# "musicautobot"

class PositionalEncoding(nn.Module):
    """Add a fixed sinusoidal positional encoding to the input, then apply dropout.

    For position ``pos`` and even embedding index ``2i``::

        pe[pos, 0, 2i]     = sin(pos / 10000^(2i / d_model))
        pe[pos, 0, 2i + 1] = cos(pos / 10000^(2i / d_model))

    Arguments:
        d_model: embedding dimension (even or odd).
        dropout: dropout probability applied after adding the encoding.
        max_len: number of positions for which the encoding is precomputed.
    """

    def __init__(self, d_model: int, dropout: float = 0.1, max_len: int = 5000):
        super().__init__()
        # Dropout layer applied to the encoded output.
        self.dropout = nn.Dropout(p=dropout)

        # torch.arange(max_len) is the 1D tensor 0 .. max_len - 1;
        # unsqueeze(1) turns it into shape [max_len, 1]: [[0], [1], ...].
        position = torch.arange(max_len).unsqueeze(1)

        # torch.arange(0, d_model, 2) holds the even indices 2i.
        # exp(2i * (-ln(10000) / d_model)) == 10000^(-2i / d_model), the
        # geometrically decreasing frequencies used in the original paper.
        div_term = torch.exp(torch.arange(0, d_model, 2) * (-math.log(10000.0) / d_model))

        # Shape [max_len, 1, d_model]; the middle axis broadcasts over the batch.
        pe = torch.zeros(max_len, 1, d_model)
        # Even embedding indices get the sine ...
        pe[:, 0, 0::2] = torch.sin(position * div_term)
        # ... odd indices the cosine. For an odd d_model there is one odd
        # index fewer than even ones, so the last frequency is dropped here.
        pe[:, 0, 1::2] = torch.cos(position * div_term[: d_model // 2])

        # A buffer is part of the module's state (moved by ``.to()`` and saved
        # in ``state_dict``) but is not a parameter, so it is not trained.
        self.register_buffer('pe', pe)

    def forward(self, x: Tensor) -> Tensor:
        '''Arguments:
            x: Tensor, shape: [seq_len, batch_size, embedding_size]
        '''
        # Add the encoding of the first ``seq_len`` positions to x.
        x = x + self.pe[:x.size(0)]
        return self.dropout(x)

In [4]:
from torchtext.datasets import WikiText2
from torchtext.data.utils import get_tokenizer
from torchtext.vocab import build_vocab_from_iterator


In [5]:
# Build the vocabulary from the tokenized training split only.
tokenizer = get_tokenizer('basic_english')
train_iter = WikiText2(split='train')
vocab = build_vocab_from_iterator(
    (tokenizer(line) for line in train_iter),
    specials=['unk'],
)
# Tokens missing from the vocabulary map to the index of the 'unk' special.
vocab.set_default_index(vocab['unk'])


# Tokenize every item of the iterator and map the tokens to vocabulary
# indices, producing one 1-D tensor per item. Items that yield no tokens
# (t.numel() == 0) are dropped, and the remaining tensors are concatenated
# along dimension 0 into a single flat tensor.
def data_process(raw_text_iter: dataset.IterableDataset) -> Tensor:
    '''Converts raw text into a flat Tensor.'''
    pieces = []
    for line in raw_text_iter:
        token_ids = torch.tensor(vocab(tokenizer(line)), dtype=torch.long)
        if token_ids.numel() > 0:
            pieces.append(token_ids)
    return torch.cat(tuple(pieces))

# ``train_iter`` was consumed while building the vocabulary, so the splits
# are created again before being converted to flat tensors.
train_iter, val_iter, test_iter = WikiText2()
train_data, val_data, test_data = map(data_process, (train_iter, val_iter, test_iter))

# Use the GPU when one is available, otherwise fall back to the CPU.
device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')

def batchify(data: Tensor, bsz: int) -> Tensor:
    """Split a flat tensor into ``bsz`` parallel sequences laid out as columns.

    Trailing elements that do not fill a complete row are discarded, and the
    result is moved to the notebook-level ``device``.

    Arguments:
        data: Tensor, shape ``[N]``
        bsz: int, batch size

    Returns:
        Tensor of shape ``[N // bsz, bsz]``
    """
    rows = data.size(0) // bsz
    usable = data[:rows * bsz]
    # Each of the ``bsz`` contiguous chunks becomes one column.
    columns = usable.view(bsz, rows).t().contiguous()
    return columns.to(device)

batch_size, eval_batch_size = 20, 10

# Each result has shape ``[seq_len, batch_size]``.
# NOTE: the flat tensors are rebound to their batched form, so re-running this
# cell without re-running the ``data_process`` calls would feed already-batched
# tensors back into ``batchify``.
train_data = batchify(train_data, bsz=batch_size)
val_data = batchify(val_data, bsz=eval_batch_size)
test_data = batchify(test_data, bsz=eval_batch_size)

In [7]:
bptt = 35  # maximum number of time steps per training chunk
def get_batch(source: Tensor, i: int) -> Tuple[Tensor, Tensor]:
    """Slice one input chunk and its next-token targets out of ``source``.

    Args:
        source: Tensor, shape ``[full_seq_len, batch_size]``
        i: int, row index at which the chunk starts

    Returns:
        tuple (data, target), where data has shape ``[seq_len, batch_size]`` and
        target has shape ``[seq_len * batch_size]``
    """
    # The chunk holds at most ``bptt`` rows and must leave one row after it,
    # because the targets are the inputs shifted by one position.
    stop = i + min(bptt, len(source) - 1 - i)
    inputs = source[i:stop]
    shifted = source[i + 1:stop + 1]
    return inputs, shifted.reshape(-1)

In [8]:
# Model hyperparameters.
ntokens = len(vocab)  # vocabulary size
emsize = 200          # embedding dimension
d_hid = 200           # feed-forward dimension inside each ``nn.TransformerEncoderLayer``
nlayers = 2           # number of ``nn.TransformerEncoderLayer`` in ``nn.TransformerEncoder``
nhead = 2             # number of heads in ``nn.MultiheadAttention``
dropout = 0.2         # dropout probability

# Build the model, then move it to the selected device.
model = TransformerModel(ntokens, emsize, nhead, d_hid, nlayers, dropout)
model = model.to(device)



In [9]:
import time

lr = 5.0  # initial learning rate
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.SGD(model.parameters(), lr=lr)
# Multiply the learning rate by 0.95 on every ``scheduler.step()`` call.
scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=1.0, gamma=0.95)

def train(model: nn.Module) -> None:
    """Train ``model`` for one pass over ``train_data``.

    Reads the notebook-level ``train_data``, ``bptt``, ``ntokens``,
    ``criterion``, ``optimizer`` and ``scheduler``, plus ``epoch`` for the
    log line, so it must be called from inside the epoch loop.
    """
    model.train()  # switch to training mode
    log_interval = 200
    num_batches = len(train_data) // bptt
    total_loss = 0
    start_time = time.time()

    for batch, offset in enumerate(range(0, train_data.size(0) - 1, bptt)):
        data, targets = get_batch(train_data, offset)
        logits = model(data).view(-1, ntokens)
        loss = criterion(logits, targets)

        optimizer.zero_grad()
        loss.backward()
        # Clip the gradient norm to 0.5 before the parameter update.
        torch.nn.utils.clip_grad_norm_(model.parameters(), 0.5)
        optimizer.step()

        total_loss += loss.item()
        if batch <= 0 or batch % log_interval != 0:
            continue

        # Report the running average once every ``log_interval`` batches.
        lr = scheduler.get_last_lr()[0]
        ms_per_batch = (time.time() - start_time) * 1000 / log_interval
        cur_loss = total_loss / log_interval
        ppl = math.exp(cur_loss)
        print(f'| epoch {epoch:3d} | {batch:5d}/{num_batches:5d} batches | '
              f'lr {lr:02.2f} | ms/batch {ms_per_batch:5.2f} | '
              f'loss {cur_loss:5.2f} | ppl {ppl:8.2f}')
        total_loss = 0
        start_time = time.time()

def evaluate(model: nn.Module, eval_data: Tensor) -> float:
    """Return the average per-position loss of ``model`` on ``eval_data``.

    Each chunk's loss is weighted by its number of rows, and the total is
    divided by ``len(eval_data) - 1``. Reads the notebook-level ``bptt``,
    ``ntokens`` and ``criterion``.
    """
    model.eval()  # switch to evaluation mode
    weighted_loss = 0
    last_start = eval_data.size(0) - 1
    with torch.no_grad():
        for offset in range(0, last_start, bptt):
            data, targets = get_batch(eval_data, offset)
            logits = model(data).view(-1, ntokens)
            weighted_loss += data.size(0) * criterion(logits, targets).item()
    return weighted_loss / (len(eval_data) - 1)

In [10]:
# Train for ``epochs`` epochs, checkpointing the parameters with the lowest
# validation loss and restoring them at the end.
best_val_loss = float('inf')
epochs = 3

# The checkpoint lives in a temporary directory that is removed when the
# ``with`` block exits, so the best parameters are reloaded inside the block.
with TemporaryDirectory() as tempdir:
    best_model_params_path = os.path.join(tempdir, "best_model_params.pt")

    # ``epoch`` is also read as a global by ``train`` for its log line.
    for epoch in range(1, epochs + 1):
        epoch_start_time = time.time()
        train(model)
        val_loss = evaluate(model, val_data)
        # Perplexity is the exponential of the average loss.
        val_ppl = math.exp(val_loss)
        elapsed = time.time() - epoch_start_time
        print('-' * 89)
        print(f'| end of epoch {epoch:3d} | time: {elapsed:5.2f}s | '
            f'valid loss {val_loss:5.2f} | valid ppl {val_ppl:8.2f}')
        print('-' * 89)

        # Keep only the parameters that gave the lowest validation loss so far.
        if val_loss < best_val_loss:
            best_val_loss = val_loss
            torch.save(model.state_dict(), best_model_params_path)

        # Advance the learning-rate schedule once per epoch.
        scheduler.step()
    model.load_state_dict(torch.load(best_model_params_path)) # load best model states

| epoch   1 |   200/ 2928 batches | lr 5.00 | ms/batch 10.39 | loss  8.10 | ppl  3298.93
| epoch   1 |   400/ 2928 batches | lr 5.00 | ms/batch  7.16 | loss  6.85 | ppl   943.26
| epoch   1 |   600/ 2928 batches | lr 5.00 | ms/batch  6.40 | loss  6.42 | ppl   613.67
| epoch   1 |   800/ 2928 batches | lr 5.00 | ms/batch  6.29 | loss  6.30 | ppl   543.63
| epoch   1 |  1000/ 2928 batches | lr 5.00 | ms/batch  6.36 | loss  6.19 | ppl   485.90
| epoch   1 |  1200/ 2928 batches | lr 5.00 | ms/batch  6.31 | loss  6.15 | ppl   470.32
| epoch   1 |  1400/ 2928 batches | lr 5.00 | ms/batch  6.88 | loss  6.12 | ppl   452.76
| epoch   1 |  1600/ 2928 batches | lr 5.00 | ms/batch  7.23 | loss  6.11 | ppl   450.02
| epoch   1 |  1800/ 2928 batches | lr 5.00 | ms/batch  6.66 | loss  6.03 | ppl   413.91
| epoch   1 |  2000/ 2928 batches | lr 5.00 | ms/batch  6.41 | loss  6.02 | ppl   410.00
| epoch   1 |  2200/ 2928 batches | lr 5.00 | ms/batch  6.41 | loss  5.90 | ppl   365.50
| epoch   1 |  2400/ 

## Now that we understand how a PyTorch transformer works, we can extend the application to music
The first step is modifying our MIDI files into a form that can be processed by the transformer model

musicautobot did this with its numpy encoder, but I may try a different way here