In [12]:
import torch
import torch.nn as nn
from tqdm import tqdm
import numpy as np
import matplotlib.pyplot as plt

# Custom modules
import DataUtils
import Masking
import Models

## Loading Data

In [14]:
# Fetch the train and test loaders from the project-local DataUtils module.
# NOTE(review): the positional arguments (64, 1) are presumably the train and
# test batch sizes — confirm against DataUtils.get_dataloaders.
trainloader, testloader = DataUtils.get_dataloaders(64, 1)

## Loading in Models

Loading the "frozen" imputation model head (self-supervised)

In [4]:
# Build the latent imputation model and restore its saved weights.
# map_location="cpu" lets a checkpoint that was written from a GPU session be
# deserialized on a CPU-only machine; load_state_dict then copies the values
# into the model's own parameters, wherever those live.
# NOTE: torch.load unpickles the file — only load checkpoints you trust.
latent_imputation = Models.ImputationTransformer(embed_dim=128)
latent_imputation.load_state_dict(
    torch.load('./saved_models/imputation_transformer_prototype.pt', map_location="cpu")
)

<All keys matched successfully>

Loading the pretrained imputation model

In [9]:
# Build the reconstruction imputation model and restore its saved weights.
# map_location="cpu" lets a checkpoint that was written from a GPU session be
# deserialized on a CPU-only machine.
# NOTE: torch.load unpickles the file — only load checkpoints you trust.
imputation_model = Models.ReconstructionImputationTransformer()
imputation_model.load_state_dict(
    torch.load('./saved_models/recons_imputation_transformer_prototype.pt', map_location="cpu")
)

# This model is only used for inference below: switch it to eval mode so any
# dropout / normalisation layers behave deterministically ...
imputation_model.eval()

# ... and freeze every weight so no gradients are tracked for it.
for param in imputation_model.parameters():
    param.requires_grad = False

## Creating Imputation RUL Transformer Model

In [22]:
class ImputationRULTransformer(nn.Module):
    """RUL regressor that fuses a projected input window with a frozen latent.

    The input is projected to one ``embed_dim`` vector per sample, summed with a
    learned positional vector and the output of ``latent_model``, passed through
    a Transformer encoder and finally regressed by an MLP head.

    Args:
        latent_model: Module called as ``latent_model(x)``; its output is added
            to a ``(batch, embed_dim)`` tensor, so it must broadcast with that
            shape. All of its parameters are frozen and it is kept in eval mode.
        input_len: Size of dimension 1 of the input (used as the ``Conv1d``
            channel count).
        n_cols: Stored for reference only; no layer depends on it.
        embed_dim: Width of the fused representation (must be divisible by
            ``n_heads``).
        prediction_window: Number of values produced by the regression head.
        n_heads: Attention heads per encoder layer.
        n_layers: Number of stacked encoder layers.
        dropout: Dropout probability inside the encoder layers.
        hidden_dim: Hidden width of the regression head.
    """

    def __init__(self, latent_model, input_len=542, n_cols=24, embed_dim=128,
                 prediction_window=1, n_heads=4, n_layers=2, dropout=0.1,
                 hidden_dim=2048):
        super().__init__()

        self.prediction_window = prediction_window
        self.input_len = input_len
        self.n_cols = n_cols
        self.embed_dim = embed_dim
        self.latent_model = latent_model

        # The latent model is a fixed feature extractor: no gradients, and eval
        # mode so its dropout / normalisation layers are deterministic.
        for param in self.latent_model.parameters():
            param.requires_grad = False
        self.latent_model.eval()

        # Conv1d treats dimension 1 of the input as channels, so the channel
        # count must equal input_len (previously a second hard-coded 542).
        self.input_projection = nn.Sequential(
            nn.Conv1d(in_channels=self.input_len, out_channels=self.embed_dim, kernel_size=1),
            nn.ReLU(),
            nn.AdaptiveMaxPool1d(1),
            nn.Flatten(),
        )

        self.positional_embed = nn.Parameter(torch.randn(self.embed_dim))

        # NOTE: nn.TransformerEncoder deep-copies this layer, so the attribute
        # itself is an unused template. It is kept so the state_dict keys stay
        # identical to checkpoints saved with the earlier definition.
        self.encoder_layer = nn.TransformerEncoderLayer(
            d_model=self.embed_dim, nhead=n_heads, activation="gelu", dropout=dropout
        )
        self.transformer_blocks = nn.TransformerEncoder(self.encoder_layer, num_layers=n_layers)

        self.rul_head = nn.Sequential(
            nn.Linear(self.embed_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, self.prediction_window),
        )

    def train(self, mode=True):
        """Set the training mode, but always keep the frozen latent model in eval."""
        super().train(mode)
        self.latent_model.eval()
        return self

    def forward(self, x):
        """Return one prediction per sample.

        Args:
            x: Tensor of shape ``(batch, input_len, n_features)``.

        Returns:
            Tensor of shape ``(batch,)`` when ``prediction_window == 1``,
            otherwise ``(batch, prediction_window)``.
        """
        imputation_latent = self.latent_model(x)
        z = self.input_projection(x)
        z = z + self.positional_embed + imputation_latent  # simply adding for now

        # The encoder is built with batch_first=False, so a 2-D (batch, embed)
        # tensor would be read as ONE unbatched sequence and the samples of a
        # mini-batch would attend to each other. Add an explicit length-1
        # sequence axis -> (seq=1, batch, embed) so samples stay independent.
        z = self.transformer_blocks(z.unsqueeze(0)).squeeze(0)
        z = self.rul_head(z)

        return z.squeeze(1)

### Only Get Masked Outputs

In [8]:
def get_masked_indices(masked_X):
    """Flag every entry of ``masked_X`` that equals the value -1.

    Args:
        masked_X: Object supporting elementwise ``==`` against a float
            (a tensor in this notebook).

    Returns:
        The result of ``masked_X == -1.0``: for a tensor, a boolean tensor of
        the same shape that is True wherever the entry is exactly -1.
    """
    return masked_X == -1.0

In [7]:
# Column positions dropped from every sample.
# Make sure to mask first and THEN take these out.
columns_excluded = [0, 1, 26]

# Boolean selector over the 27 raw columns: True for each column that is kept,
# False at the excluded positions listed above.
columns_kept = [position not in columns_excluded for position in range(27)]

### Model Training

In [None]:
# Train the RUL model on inputs whose masked entries are filled in by the
# frozen reconstruction imputation model.
objective = nn.MSELoss()

rul_tran = ImputationRULTransformer(latent_imputation)

lr = 1e-4
n_epochs = 25
# Only hand the optimizer parameters that can actually receive gradients
# (the latent model inside rul_tran is frozen).
optim = torch.optim.Adam([p for p in rul_tran.parameters() if p.requires_grad], lr=lr)
losses = []  # per-batch training loss across all epochs

# Trainable parts in training mode; both frozen models in eval mode so their
# dropout / normalisation layers do not inject noise into fixed features.
rul_tran.train()
latent_imputation.eval()
imputation_model.eval()

for n in range(n_epochs):
    epoch_start = len(losses)  # index of this epoch's first recorded loss
    for X, y in tqdm(trainloader):
        optim.zero_grad()

        # Mask on the full input, THEN drop the excluded columns.
        masked_X = Masking.mask_input(X)
        masked_kept = masked_X[:, :, columns_kept]
        missing_idx = get_masked_indices(masked_kept)

        # The imputation model is frozen: no autograd graph is needed for it.
        with torch.no_grad():
            xhat = imputation_model(masked_kept.float())

        # Keep the observed values and overwrite only the masked positions.
        filled_X = X[:, :, columns_kept].clone().float()
        filled_X[missing_idx] = xhat[missing_idx]

        yhat = rul_tran(filled_X)
        # NOTE(review): yhat is (batch,); assumes y has the same shape — confirm,
        # otherwise MSELoss will broadcast.
        loss = objective(yhat, y.float())
        loss.backward()
        optim.step()
        losses.append(loss.item())

    # Mean over every batch of this epoch (previously only the first batch
    # loss of the epoch was reported).
    epoch_losses = losses[epoch_start:]
    print("Epoch:", n+1, "Loss:", np.mean(epoch_losses) if epoch_losses else float("nan"))

 10%|████████                                                                         | 32/323 [01:16<12:47,  2.64s/it]