In [1]:
import torch
import torch.nn as nn
import torch.nn.init as init

In [2]:
MODEL_PATH = './model/autoencoder-2/regressor_freezed_checkpoint.pth'

# Validation Dataset

In [3]:
import torch
import numpy as np
from typing import List
from torch.utils.data import Dataset
from sklearn.preprocessing import StandardScaler
import random

class CustomDataset(Dataset):
    def __init__(self, large_file_path, chunk_size, subset_size=50000):
        self.large_file_path = large_file_path
        self.chunk_size = chunk_size
        self.subset_size = subset_size
        self.line_offsets = self.get_line_offsets(large_file_path, chunk_size)
        self.scaler = StandardScaler()
        print("Calculating mean and std...")
        self.mean, self.std = self.calculate_mean_std()
        print(f"Mean: {self.mean}, Std: {self.std}")

    def get_line_offsets(self, path: str, chunk_size: int) -> List[int]:
        offsets = [0]
        with open(path, "rb") as file:
            chunk = file.readlines(chunk_size)
            while chunk:
                for line in chunk:
                    offsets.append(offsets[-1] + len(line))
                chunk = file.readlines(chunk_size)
                print(f"Lines found: {len(offsets)}", end='\r')
        offsets = offsets[:-1]
        print(f"Lines found: {len(offsets)}", end='\n')
        return offsets

    def calculate_mean_std(self):
        selected_offsets = random.sample(self.line_offsets, min(self.subset_size, len(self.line_offsets)))
        features = []
        for offset in selected_offsets:
            with open(self.large_file_path, 'r', encoding='utf-8') as f:
                f.seek(offset)
                line = f.readline()
                numbers = [float(num) for num in line.strip().split()]
                features.append(numbers[:4])
        features = np.array(features)
        mean = np.mean(features, axis=0, dtype=np.float32)
        std = np.std(features, axis=0, dtype=np.float32)
        return mean, std

    def standardize_features(self, features):
        standardized_features = (features - self.mean) / self.std
        return standardized_features

    def __len__(self):
        return len(self.line_offsets)

    def __getitem__(self, line):
        offset = self.line_offsets[line]
        with open(self.large_file_path, 'r', encoding='utf-8') as f:
            f.seek(offset)
            line = f.readline()
            numbers = [float(num) for num in line.strip().split()]
            features, targets = numbers[:4], numbers[4:]
            standardized_features = self.standardize_features(np.array(features))
            return torch.tensor(standardized_features, dtype=torch.float32), torch.tensor(targets, dtype=torch.float32)


In [4]:
filename = "./data/sph_100_10_20.txt"
full_dataset = CustomDataset(filename, 2**20)

Lines found: 26620
Calculating mean and std...
Mean: [5.       9.5      1.570781 2.984502], Std: [3.1622777  5.766281   0.99348164 1.8115467 ]


In [5]:
dataloader = torch.utils.data.DataLoader(full_dataset, batch_size=32, shuffle=True)

# Model

In [6]:
class Block(nn.Module):
    def __init__(self, in_layers, out_layers):
        super(Block, self).__init__()
        self.layers = nn.Sequential(
            nn.Linear(in_layers, out_layers),
            nn.ReLU(),
            nn.Dropout1d(p=0.2),
        )
        
    def forward(self, x):
        x = self.layers(x)
        return x

In [7]:
class Encoder(nn.Module):
    def __init__(self):
        super(Encoder, self).__init__()
        self.blocks = nn.ModuleList([
            Block(4, 512),
            Block(512, 512),
            Block(512, 512),
        ])
        self.bottleneck = nn.ModuleList([
            Block(512, 512),
            nn.Linear(512, 4),
        ])
        
        self.__init_weights()

    def __init_weights(self):
        layers = [self.blocks, self.bottleneck]
        # Initialize linear layers using Kaiming (He) uniform initialization
        for m in layers:
            for layer in m:
                self.__init_layer(layer)
                        
    def __init_layer(self, layer):
        if isinstance(layer, nn.Linear):
            init.kaiming_uniform_(layer.weight, mode='fan_in', nonlinearity='tanh')
            if layer.bias is not None:
                init.zeros_(layer.bias)

    def forward(self, x):
        x = self.blocks[0](x)
        for block in self.blocks[1:]:
            y = block(x)
            x = x + y
        for btl in self.bottleneck:
            x = btl(x)
        return x

In [8]:
class Decoder(nn.Module):
    def __init__(self):
        super(Decoder, self).__init__()
        self.blocks = nn.ModuleList([
            Block(4, 512),
            Block(512, 512),
            Block(512, 512),
            Block(512, 512),
        ])
        self.out = nn.Linear(512, 2)
        self.__init_weights()

    def __init_weights(self):
        layers = [self.blocks]
        # Initialize linear layers using Kaiming (He) uniform initialization
        for m in layers:
            for layer in m:
                self.__init_layer(layer)
        self.__init_layer(self.out)
                        
    def __init_layer(self, layer):
        if isinstance(layer, nn.Linear):
            init.kaiming_uniform_(layer.weight, mode='fan_in', nonlinearity='tanh')
            if layer.bias is not None:
                init.zeros_(layer.bias)

    def forward(self, x):
        x = self.blocks[0](x)
        for block in self.blocks[1:]:
            y = block(x)
            x = x + y
        x = self.out(x)
        return x

In [9]:
class RegressionAutoencoder(nn.Module):
    def __init__(self, encoder, decoder, freeze):
        super(RegressionAutoencoder, self).__init__()
        self.encoder = encoder
        self.decoder = decoder

        if self.encoder is not None:
            self.gradient(self.encoder, freeze)
        
    def gradient(self, model, freeze: bool):
        for parameter in model.parameters():
            parameter.requires_grad_(not freeze)
        
    def forward(self, x):
        x = self.encoder(x)
        x = self.decoder(x)
        return x

In [10]:
encoder = Encoder()
decoder = Decoder()
autoencoder_model = RegressionAutoencoder(encoder=encoder, decoder=decoder, freeze=True)

In [11]:
checkpoint = torch.load(MODEL_PATH, map_location=torch.device('cpu'))
autoencoder_model.load_state_dict(checkpoint['autoencoder_model_state_dict'])

<All keys matched successfully>

In [12]:
autoencoder_model.eval()

RegressionAutoencoder(
  (encoder): Encoder(
    (blocks): ModuleList(
      (0): Block(
        (layers): Sequential(
          (0): Linear(in_features=4, out_features=512, bias=True)
          (1): ReLU()
          (2): Dropout1d(p=0.2, inplace=False)
        )
      )
      (1-2): 2 x Block(
        (layers): Sequential(
          (0): Linear(in_features=512, out_features=512, bias=True)
          (1): ReLU()
          (2): Dropout1d(p=0.2, inplace=False)
        )
      )
    )
    (bottleneck): ModuleList(
      (0): Block(
        (layers): Sequential(
          (0): Linear(in_features=512, out_features=512, bias=True)
          (1): ReLU()
          (2): Dropout1d(p=0.2, inplace=False)
        )
      )
      (1): Linear(in_features=512, out_features=4, bias=True)
    )
  )
  (decoder): Decoder(
    (blocks): ModuleList(
      (0): Block(
        (layers): Sequential(
          (0): Linear(in_features=4, out_features=512, bias=True)
          (1): ReLU()
          (2): Dropout1d

In [13]:
def validate(model, dataloader, loss_fn):
    model.eval()
    total_loss = 0
    total_diff = 0

    with torch.no_grad():
        for inputs, targets in dataloader:
            inputs, targets = inputs.to(torch.float32), targets.to(torch.float32)
            
            outputs = model(inputs)

            loss = loss_fn(outputs, targets)
            diff = torch.abs(outputs - targets).mean()
            
            total_loss += loss.item()
            total_diff += diff.item()

    average_loss = total_loss / len(dataloader)
    average_diff = total_diff / len(dataloader)

    print(f"Test Loss: {average_loss:.4f}, Test Diff: {average_diff:.4f}")

In [14]:
validate(autoencoder_model, dataloader, nn.MSELoss())

Test Loss: 0.0398, Test Diff: 0.1214


In [16]:
torch_input = torch.randn(1, 4)
onnx_program = torch.onnx.dynamo_export(autoencoder_model, torch_input)



In [19]:
onnx_program.save("onnx/autoencoder2_freezed.onnx")

In [20]:
import onnx
onnx_model = onnx.load("onnx/autoencoder2_freezed.onnx")
onnx.checker.check_model(onnx_model)