In [20]:
import torch
import torch.nn as nn
import numpy as np    
import matplotlib.pyplot as plt

In [21]:
''' Utils for creating data + training 
'''
import numpy as np
import torch
import torch.nn as nn
import math
import copy


####################################
#  Data generation + manipulation  #
####################################
def batchify(data, target):
    ''' Split data and target into aligned batches of roughly 100 rows each.
    
        Args:
            data (tensor): inputs, split along dim 0
            target (tensor): targets, split along dim 0 in the same way
            
        Out:
            num_batch (int): number of batches actually produced
            input_batches (tuple of tensors)
            output_batches (tuple of tensors)
    '''
    # Always request at least one batch so inputs with fewer than 100 rows
    # do not end up calling chunk(0)
    requested = max(1, data.shape[0] // 100)
    input_batches = data.chunk(requested)
    output_batches = target.chunk(requested)
    
    # torch.chunk may return fewer chunks than requested, so report the number
    # of batches that really exist instead of the requested count
    num_batch = min(len(input_batches), len(output_batches))
    
    return num_batch, input_batches, output_batches

def generateSeqs(N, in_len, data_range = (-1, 1), operation = 'subtraction', 
    method = 'neighbors', SOS_token = np.pi, EOS_token = -np.pi):
    ''' Generate random float sequences together with the result of applying a
        binary operation to every pair of neighboring entries.
    
        Args:
            N (int): number of sequences
            in_len (int): number of random floats per input sequence (>= 1)
            data_range (tuple): the two bounds the random floats lie between
            operation (str): 'subtraction', 'multiplication', 'division' or 'power'
            method (str): currently unused; entries are always combined with their
                right-hand neighbor
            SOS_token (float): value prepended to every data and target sequence
            EOS_token (float): value appended to every data and target sequence
            
        Out
            data (tensor): N x (in_len + 2) x 1
            target (tensor): N x (in_len + 1) x 1
            
        Raises
            ValueError: if in_len < 1 or operation is not a supported name
            
        NOTE(review): with the default data_range, 'power' raises negative bases to
        fractional exponents and 'division' divides by values close to zero, so the
        targets can contain nan or very large values -- confirm this is acceptable.
    '''
    if in_len < 1:
        raise ValueError(f'in_len must be at least 1, got {in_len}')
    
    # Resolve the operation first so an unknown name fails with a clear error
    # instead of an unbound local variable further down
    if operation == 'subtraction':
        op_func = subtractTensors
    elif operation == 'multiplication':
        op_func = multiplyTensors
    elif operation == 'division':
        op_func = divideTensors
    elif operation == 'power':
        op_func = exponentiateTensors
    else:
        raise ValueError(
            f"Unknown operation {operation!r}; expected 'subtraction', "
            "'multiplication', 'division' or 'power'")
    
    # Pull random floats in range (data_range[0], data_range[1])
    data = (data_range[0] - data_range[1]) * torch.rand((N, in_len)) + data_range[1]
    
    # Combine every entry with its right-hand neighbor in a single vectorized call:
    # target[:, i] = op(data[:, i], data[:, i + 1])
    target = op_func(data[:, :-1], data[:, 1:])
        
    # Append SOS and EOS token
    SOS_ = SOS_token*torch.ones((N, 1))
    EOS_ = EOS_token*torch.ones((N, 1))

    data = torch.cat((SOS_, data, EOS_), dim=1)
    target = torch.cat((SOS_, target, EOS_), dim=1)

    # Unsqueeze so each entry becomes its own dimension, needed for the nn.Linear embedding
    data = torch.unsqueeze(data, dim=2)
    target = torch.unsqueeze(target, dim=2)
    
    return data, target
     
def subtractTensors(ten1, ten2):
    ''' Elementwise difference: ten1 minus ten2. '''
    difference = ten1 - ten2
    return difference
    
def multiplyTensors(ten1, ten2):
    ''' Elementwise product of ten1 and ten2. '''
    product = torch.mul(ten1, ten2)
    return product
    
def divideTensors(ten1, ten2):
    ''' Elementwise quotient, computed as ten1 times the reciprocal of ten2. '''
    reciprocal = 1/ten2
    return torch.mul(ten1, reciprocal)

def exponentiateTensors(ten1, ten2):
    ''' Elementwise power: ten1 raised to ten2. '''
    powered = torch.pow(ten1, ten2)
    return powered
    
   


###############################
#       Training              #
###############################   

def train_epoch(model, opt, criterion, scheduler, device, data_batches, target_batches, num_batch, verbose = False):
    ''' Run one pass over the batches, updating the model after every batch.
    
        Args:
            model (FloatTransformer): model to train; must provide get_tgt_mask
            opt: optimizer, stepped once per batch
            criterion: loss applied to (prediction, expected target)
            scheduler: learning-rate scheduler, stepped once at the end of the pass
            device (str): device the batches and masks are moved to
            data_batches (tuple of tensors)
            target_batches (tuple of tensors)
            num_batch (int): maximum number of batches to use
            verbose (bool): print the tensors of the first batch
            
        Out:
            total_loss: sum of the per-batch losses (not averaged)
    '''
    model.train()
    total_loss = 0
    
    # Zipping with range(num_batch) caps the pass at num_batch batches and also
    # stops cleanly when fewer batches exist than num_batch claims (torch.chunk
    # can return fewer chunks than requested), instead of raising IndexError
    for i, data, target in zip(range(num_batch), data_batches, target_batches):
        data = data.to(device)
        target = target.to(device)
        
        # Take SOS to last token before EOS as target input
        target_in = target[:, :-1]
        
        # Take first input to EOS as target expected from transformer
        target_expected = target[:, 1:]
        
        # Create masks
        # NOTE(review): the same causal mask is applied to the source sequence as
        # well -- confirm that masking the encoder input this way is intended
        tgt_mask = model.get_tgt_mask(target_in.size(1)).to(device)
        src_mask = model.get_tgt_mask(data.size(1)).to(device)
        
        pred = model(data, target_in, src_mask, tgt_mask)
        
        if i == 0 and verbose:
            print('data', data)
            print('tgt in', target_in)
            print('pred', pred)
            print('expected', target_expected)
        
        loss = (criterion(pred, target_expected).type(torch.float))
        
        opt.zero_grad()
        loss.backward()
        opt.step()

        total_loss += loss.item()
        
    # The learning rate is decayed once per pass, not once per batch
    scheduler.step()
    return total_loss
    
def train(model, n_epochs, opt, criterion, scheduler, device, data_batches, target_batches, num_batch):
    ''' Train for n_epochs and keep a copy of the model from the lowest-loss epoch.
    
        Args:
            model (FloatTransformer): model to train (updated in place)
            n_epochs (int): number of passes over the batches
            opt, criterion, scheduler, device, data_batches, target_batches,
            num_batch: forwarded unchanged to train_epoch
            
        Out:
            best_model: deep copy of the model taken after the epoch with the lowest total loss
            loss_ (np.ndarray): total loss of every epoch
            epochs (np.ndarray): epoch indices 0 .. n_epochs - 1 (as floats)
    '''
    # Start from +inf so the first epoch always counts as the best so far,
    # however large its loss is
    best_loss = float('inf')
    best_model = copy.deepcopy(model).to(device)
    
    # Collect in a list and convert once; np.append re-copies the array on every call
    loss_history = []
    
    for i in range(n_epochs):
        loss = train_epoch(model, opt, criterion, scheduler, device, data_batches, target_batches, num_batch)
        loss_history.append(loss)
        
        if loss < best_loss:
            best_loss = loss
            best_model = copy.deepcopy(model).to(device)
            
        if i % 10 == 0:
            print(f'Epoch: {i}\nTotal Loss: {loss}')
            print(f'-----------------------------------')
            
    loss_ = np.array(loss_history, dtype=float)
    epochs = np.arange(len(loss_history), dtype=float)
    
    return best_model, loss_, epochs
    
if __name__ == "__main__":
    # Smoke check: build a handful of multiplication sequences and show the
    # first input sequence followed by its target sequence
    src, tgt = generateSeqs(N = 10, in_len = 3, operation = 'multiplication')
    print(src[0], tgt[0], sep = '\n')

tensor([[ 3.1416],
        [-0.1277],
        [-0.5596],
        [ 0.0442],
        [-3.1416]])
tensor([[ 3.1416],
        [ 0.0715],
        [-0.0247],
        [-3.1416]])


In [22]:

  
''' Module containing class definitions for a transformer implemented to 
    manipulate sequences of floats
''' 

import torch
import torch.nn as nn
import numpy as np
import math

class PositionalEncoding(nn.Module):
    ''' Sinusoidal positional encoding added to a sequence of embeddings.
    
        The encoding table is stored with shape max_len x 1 x d_model and is sliced
        by the size of dim 0 of the input, so the input is expected to have the
        sequence position along dim 0.
    
        Args:
            d_model (int): embedding dimension (even or odd)
            dropout_p (float): dropout probability used when forward is called with train=True
            max_len (int): number of positions the table is built for
    '''
    def __init__(self, d_model, dropout_p, max_len):
        super().__init__()

        # Info
        self.dropout = nn.Dropout(dropout_p)
        
        # Encoding - From formula
        positions_list = torch.arange(0, max_len).view(-1, 1) # 0, 1, 2, 3, 4, 5
        division_term = torch.exp(torch.arange(0, d_model, 2) * (-math.log(10000.0)) / d_model) # 1 / 10000^(2i/d_model)
        
        # max_len x ceil(d_model / 2) table of pos / 10000^(2i/d_model)
        angles = positions_list * division_term
        
        pos_encoding = torch.zeros(max_len, d_model)
        
        # PE(pos, 2i) = sin(pos/10000^(2i/d_model))
        pos_encoding[:, 0::2] = torch.sin(angles)
        
        # PE(pos, 2i + 1) = cos(pos/10000^(2i/d_model))
        # For odd d_model there is one fewer odd column than even column, so trim
        pos_encoding[:, 1::2] = torch.cos(angles[:, :d_model // 2])
        
        # Saving buffer (same as parameter without gradients needed)
        # Shape: max_len x 1 x d_model, so it broadcasts over dim 1 of the input
        self.register_buffer("pos_encoding", pos_encoding.unsqueeze(1))
        
    def forward(self, token_embedding: torch.Tensor, train=False) -> torch.Tensor:
        ''' Add the positional encoding to token_embedding.
        
            Args:
                token_embedding (tensor): embeddings with the sequence position along dim 0
                train (bool): if True, pass the result through the dropout layer
        '''
        # Residual connection + pos encoding
        encoded = token_embedding + self.pos_encoding[:token_embedding.size(0)]
        
        # Dropout is only applied when explicitly requested
        return self.dropout(encoded) if train else encoded
            
class FloatTransformer(nn.Module):
    def __init__(self, d_model, n_head, n_layers, device, in_element_dim = 1, out_element_dim = 1, pos_encoding = False):
        ''' Initialize transformer model. Right now, dropout probability is set to 0 as that is what is found to work
        with float manipulation
        
            Args:
                d_model (int): dimension of embedding
                n_head (int): number of attention heads
                n_layers (int): number of encoders/decoders in encoder and decoder blocks
                device: device the model is meant to run on; only stored on the instance, the
                caller is responsible for moving the module and its inputs there
                in_element_dim (int): The dimensionality of each element of a sequence inputted to the transformer. Default
                set to 1
                out_element_dim (int): The dimensionality of each element of a sequence outputted by the transformer. Default 
                set to 1
                pos_encoding (bool): if True, add sinusoidal positional encodings to the embeddings. Default set to False
        '''
        super().__init__()
        
        ## Define dimensionality of each object of transformer
        # N: Batch num
        # S: Sequence length
        # T: Target length
        
        # Initialize parameters + objects
        self.d_model = d_model
        self.device = device
        
        # Input dimensions: N x S x in_element_dim ---> Output dimensions: N x S x d_model
        # NOTE(review): src and tgt share this layer, so tgt elements must also have
        # size in_element_dim
        self.embedding = nn.Linear(in_element_dim, d_model)
        
        # If doing positional encoding, do so. If not, leave as identity
        if pos_encoding:
        
            # Input dimensions: S x N x d_model ---> Output dimensions: S x N x d_model
            self.positional_encoder = PositionalEncoding(
                d_model = d_model, 
                dropout_p = 0.0,
                max_len = 100)
                
        else:
        
            # Returns its input unchanged
            self.positional_encoder = nn.Identity()

        # Input dimensions: src: S x N x d_model & tgt: T x N x d_model ---> Output dimensions: T x N x d_model
        self.transformer = nn.Transformer(
            d_model=d_model,
            nhead=n_head,
            num_encoder_layers=n_layers,
            num_decoder_layers=n_layers,
            dropout = 0 )
        
        # Input dimensions: T x N x d_model ---> Output dimensions: T x N x out_element_dim
        self.out = nn.Linear(d_model, out_element_dim) 
        
    def get_tgt_mask(self, size) -> torch.Tensor:
        ''' Generate square tensor of length size, where the entries on and below the diagonal
        are set to 0 (position may be attended to) and the rest are set to -inf (position is hidden)
        
            Args:
                size (int): size of square tensor
                
            Out:
                mask
        '''
        
        # Start from an all -inf matrix and keep only the part strictly above the
        # diagonal; triu fills everything else with 0
        blocked = torch.full((size, size), float('-inf'))
        mask = torch.triu(blocked, diagonal=1)
        
        return mask
        
    def _encode_positions(self, emb, train):
        ''' Apply the positional encoder to a length-first embedding tensor. '''
        # nn.Identity takes no extra arguments, so the train flag is only
        # forwarded to a real positional encoder
        if isinstance(self.positional_encoder, nn.Identity):
            return self.positional_encoder(emb)
        return self.positional_encoder(emb, train=train)
        
    def forward(self, src, tgt, src_mask=None, tgt_mask=None, src_key_padding_mask=None, 
                tgt_key_padding_mask=None, verbose=False, train=False):
        ''' Forward method of transformer. Let the following symbols be defined as below:
            N - batch num
            S - src sequence length
            A - src element size
            T - tgt sequence length
            B - tgt element size
            Important: src and tgt tensors must have three dimensions in order to use the forward
            method properly.
            
            Args:
                src (tensor): input sequence(s)   N x S x A
                tgt (tensor): target sequence(s)  N x T x B
                src_mask (tensor): mask for src input  S x S
                tgt_mask (tensor): mask for tgt input  T x T
                src_key_padding_mask (tensor): mask for any padding after EOS  N x S
                tgt_key_padding_mask (tensor): mask for any padding after EOS  N x T
                verbose (bool): boolean for printing out intermediate values
                train (bool): boolean for whether or not to apply dropout w/ positional encoding
                
            Out:
                output (tensor): N x T x out_element_dim
        '''
        
        # Apply embeddings (batch first: N x S x d_model and N x T x d_model)
        scale = math.sqrt(self.d_model)
        src_emb = self.embedding(src) * scale
        tgt_emb = self.embedding(tgt) * scale
        
        # Put the sequence length first BEFORE encoding positions: the positional
        # encoder slices its table by dim 0, so dim 0 must be the position in the
        # sequence and not the batch index. The transformer consumes the same layout.
        src_pemb = self._encode_positions(src_emb.permute(1, 0, 2), train)
        tgt_pemb = self._encode_positions(tgt_emb.permute(1, 0, 2), train)
        
        transformer_output = self.transformer(
            src_pemb, 
            tgt_pemb, 
            src_mask=src_mask, 
            tgt_mask=tgt_mask, 
            src_key_padding_mask=src_key_padding_mask, 
            tgt_key_padding_mask=tgt_key_padding_mask)
        
        output = self.out(transformer_output)
    
        # Repermute so that batch size N is first again
        output = output.permute(1, 0, 2)
        
        if verbose:
            print(f'\nTracing through the Transformer\n')
            print(f'Input: {src}')
            print(f'Target: {tgt}')
            
            print(f'\nAfter Embeddings')
            print(f'Embedded input: {src_emb}')
            print(f'Embedded target: {tgt_emb}')
            
            print(f'\nAfter Positional Encoding')
            print(f'Positionally encoded input: {src_pemb}')
            print(f'Positionally encoded target: {tgt_pemb}')
            
            print(f'\nAfter Encoder + Decoder Blocks')
            print(f'Output from transformer: {transformer_output}')
            
            print(f'Final output: {output}')
        
        return output

In [23]:
# Seed the RNG so the generated data and the initial weights are identical on every run
SEED = 0
torch.manual_seed(SEED)

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# Model hyperparameters
d_model = 16
n_head = 1
n_layers = 2

# Data: N sequences of seq_len random floats each
N = 30000
seq_len = 3
model = FloatTransformer(d_model, n_head, n_layers, device, pos_encoding = False).to(device)
src, tgt = generateSeqs(N, seq_len, operation = "multiplication")
opt = torch.optim.Adam(model.parameters(), lr=0.0001)
criterion = nn.MSELoss()
scheduler = torch.optim.lr_scheduler.ExponentialLR(opt, gamma=0.95)
num_batch, src_batches, tgt_batches = batchify(src, tgt)
    
# Show one input sequence and its target as a sanity check
print(src_batches[0][0])
print(tgt_batches[0][0])
    
n_epochs = 500
best_model, loss, epochs = train(model, n_epochs, opt, criterion, scheduler, device, src_batches, tgt_batches, num_batch)

# Loss curve: one point per epoch, each the loss summed over all batches
fig, ax = plt.subplots()
ax.plot(epochs, loss)
ax.set(xlabel='Epoch', ylabel='Total loss (summed over batches)', title='Training loss')
plt.show()

tensor([[ 3.1416],
        [ 0.5828],
        [ 0.7396],
        [-0.0918],
        [-3.1416]])
tensor([[ 3.1416],
        [ 0.4310],
        [-0.0679],
        [-3.1416]])
Epoch: 0
Total Loss: 161.44000736624002
-----------------------------------
Epoch: 10
Total Loss: 0.3978856448084116
-----------------------------------
Epoch: 20
Total Loss: 0.120656288236205
-----------------------------------
Epoch: 30
Total Loss: 0.04845082746032858
-----------------------------------
Epoch: 40
Total Loss: 0.02357302640848502
-----------------------------------
Epoch: 50
Total Loss: 0.012955369385963422
-----------------------------------
Epoch: 60
Total Loss: 0.008210779610635655
-----------------------------------
Epoch: 70
Total Loss: 0.005929385823037592
-----------------------------------
Epoch: 80
Total Loss: 0.004735058361802658
-----------------------------------
Epoch: 90
Total Loss: 0.003958543666158221
-----------------------------------
Epoch: 100
Total Loss: 0.0035652544052027224
--

KeyboardInterrupt: ignored

In [38]:
# Encoder input: SOS, 0.5, 0.5, 0.2, EOS
trial_in = torch.tensor([[[np.pi], [0.5], [0.5], [0.2], [-np.pi]]]).to(device)

# Decoder input: SOS followed by the two neighbor products (0.5*0.5 and 0.5*0.2)
trial_out = torch.tensor([[[np.pi], [0.25], [0.1]]]).to(device)
src_mask = model.get_tgt_mask(5).to(device)
tgt_mask = model.get_tgt_mask(3).to(device)

# Inference only: no autograd graph is needed for a forward pass we just inspect
with torch.no_grad():
    trial_pred = model(trial_in, trial_out, src_mask, tgt_mask)
trial_pred

tensor([[[ 0.2547],
         [ 0.1039],
         [-3.1427]]], grad_fn=<PermuteBackward0>)

In [39]:
## Convention
# File name of model: OPERATION_NAME_tfmr_dmodel-nhead-layer
# The name is built from the hyperparameter variables so it cannot drift from
# the configuration the model was created with
torch.save(model.state_dict(), f'mult_tfmr_{d_model}-{n_head}-{n_layers}')