Import modules

In [None]:
from torch.utils.data import TensorDataset, DataLoader
from basic_FFN import get_batch_datasets
from trial_encoding_block import TrialEncodingBlock
from json import load
import torch
import torch.nn as nn
import torch.optim as optim
import math
import numpy as np
import matplotlib.pyplot as plt

Load some data

In [None]:
def process_nans(in_vec: torch.Tensor) -> torch.Tensor:
    """
    *Assuming (Number of sources, number of rows in each source, sources size). This function gets rid of nan values and their positions from the data tensor.
    
    :param in_vec: torch.Tensor
    :return: torch.Tensor
    """
    
    out_vec = []
    for sub_vec in in_vec:
        non_nan_indices = ~torch.isnan(sub_vec[1])
        filtered_sub_vec = sub_vec[:, non_nan_indices]
        out_vec.append(filtered_sub_vec)
        
    filtered_in_vec = torch.cat(out_vec, dim=1)
    
    return filtered_in_vec
        

In [None]:
# Get raw datasets.
train_ds = get_batch_datasets(0,
                              800,
                        "agn_{}_synthetic.csv",
                        "/Users/jackhu/PycharmProjects/pytorchSelflearn/data/agn_synthetic")
validate_ds = get_batch_datasets(800,
                                 899,
                             "agn_{}_synthetic.csv",
                             "/Users/jackhu/PycharmProjects/pytorchSelflearn/data/validate")

with open("/Users/jackhu/PycharmProjects/pytorchSelflearn/dataTransformer/trial_configs.json", 'r') as f:
    options = load(f)

# Initialize transformer object, make dataset list a Tensor.
train_ds_tensor = process_nans(torch.Tensor(np.array(train_ds)).requires_grad_(True))
valid_ds_tensor = process_nans(torch.Tensor(np.array(validate_ds)).requires_grad_(True))

# # Rework the training and validation tensors to only have the data dimension.
# train_ds_tensor = train_ds_tensor[:, 1, :].unsqueeze(2)
# valid_ds_tensor = valid_ds_tensor[:, 1, :].unsqueeze(2)

train_dataset = TensorDataset(train_ds_tensor)
train_loader = DataLoader(dataset=train_dataset, batch_size=10, shuffle=True)

valid_dataset = TensorDataset(valid_ds_tensor)
valid_loader = DataLoader(dataset=valid_dataset, batch_size=10, shuffle=True)

In [None]:
train_ds_tensor.size()

Positional Encoder

In [None]:
class PositionalEncoding(nn.Module):
    """
    Positional Encoding Module
    """

    def __init__(self, d_model, max_len=5000):
        """
        
        :param d_model: dimension of embedding.
        :param max_len: maximum length of input sequence.
        """
        super(PositionalEncoding, self).__init__()       
        pe = torch.zeros(max_len, d_model)
        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model))
        pe[:, 0::2] = torch.sin(position * div_term)
        pe[:, 1::2] = torch.cos(position * div_term)
        pe = pe.unsqueeze(0).transpose(0, 1)
        self.register_buffer('pe', pe)

    def forward(self, x):
        """
        "x" is expected to be of size [seq_len, d_model].
        :param x: embedded Tensor.
        :return: Combined positional embedding Tensor.
        """
        
        return x + self.pe[:x.size(0), :]

In [None]:
class Transformer(nn.Module):
    """
    Transformer Module. 
    """
    
    def __init__(self, **configs):
        super(Transformer, self).__init__()
        self.__d_input = configs['input_dim']
        self.__d_output = configs['output_dim']
        self.__d_emb = configs['embed_dim']
        self.__n_enc_layers = configs['encoder_layers']
        
        self.__model_type = 'Transformer'
        self.__src_mask = None
        self.__pos_enc = PositionalEncoding(self.__d_emb)
        self.__ini_enc = nn.Embedding(self.__d_input, self.__d_emb)
        
        # My implementation of encoder.
        self.__transformer_encoder = nn.ModuleList([TrialEncodingBlock(**configs) 
                                                    for _ in range(self.__n_enc_layers)])
        
        self.__encoder = nn.Linear(1, self.__d_emb)
        self.__decoder = nn.Linear(self.__d_emb, self.__d_output)
        self.init_weights()
        
    def init_weights(self, init_range=0.1):
        self.__encoder.weight.data.uniform_(-init_range, init_range)
        self.__decoder.weight.data.uniform_(-init_range, init_range)
        
    def forward(self, src, src_mask=None):
        """
        
        :param src: 
        :param src_mask: 
        :return: 
        """
        
        if src_mask is None or src_mask.size(0) != src.size(1):
            device = src.device
            mask = self.__generate_mask(src.size(1)).to(device)
            self.__src_mask = mask
            
        src = self.__encoder(src)
        src = self.__pos_enc(src)
        output = None
        for layer in self.__transformer_encoder:
            output = layer(src, self.__src_mask)
        output = self.__decoder(output)
        
        return output
    
    def __generate_mask(self, size):
        mask = (torch.triu(torch.ones(size, size)) == 1).transpose(0, 1)
        mask = mask.float().masked_fill(mask == 0, float('-inf')).masked_fill(
            mask == 1, float(0.0))
        return mask
        
# For debugging
test_transform = Transformer(**options)
test_transform.train()

out = None
for batch in train_loader:
    batch_tensor = batch[0]
    
    out = test_transform.forward(batch_tensor)

print(out.size())
