In [1]:
import os
import sys

import matplotlib.pyplot as plt
import numpy as np
from rich.console import Console
import torch
import torch.nn as nn
from sklearn.preprocessing import StandardScaler
from copy import deepcopy

sys.path.append(os.path.dirname(os.getcwd()))
from tools.utils import (
    StructureEncoding,
    Target,
)
from tools.data_loader import TestSet, TestSplit, data_loader
from tools.transform import CustomLogTargetTransformer
from tools.train import evaluate_models
%load_ext autoreload
%load_ext rich
%autoreload 2

In [2]:
# Rich Console instance; it is passed to data_loader() as its `console` argument.
console = Console()

In [3]:
# Use the first CUDA device when PyTorch reports one as available, else the CPU.
if torch.cuda.is_available():
    device = torch.device("cuda:0")
else:
    device = torch.device("cpu")

In [4]:
def magnitude(x):
    """Return the decimal order of magnitude of ``x`` as a built-in ``int``.

    The value is ``floor(log10(x))`` computed with NumPy, e.g. ``123`` gives
    ``2`` and ``0.05`` gives ``-2``.
    """
    exponent = np.floor(np.log10(x))
    return int(exponent)

def magnitude_transform(a):
    """Map positive values to their negated base-10 order of magnitude.

    Each element ``v`` becomes ``-floor(log10(v))`` (``0.05`` -> ``2``,
    ``123`` -> ``-2``), computed with whole-array NumPy operations instead of
    a per-element Python call.

    Args:
        a: Scalar or array-like of strictly positive, finite numbers. May be
            empty.

    Returns:
        Integer NumPy array with the same shape as ``a`` (a NumPy integer
        scalar for scalar input).

    Raises:
        ValueError: If any element is zero, negative, NaN or infinite.
    """
    values = np.asarray(a, dtype=float)
    # Validate up front: log10 of such values yields NaN/inf, and the integer
    # cast below would silently turn those into arbitrary integers.
    if not np.all(np.isfinite(values) & (values > 0)):
        raise ValueError(
            "magnitude_transform expects strictly positive, finite values"
        )
    # Size-0 input simply yields a size-0 integer array here.
    return -np.floor(np.log10(values)).astype(int)

# Loading Data

In [5]:
# The data folder sits two directory levels above the current working directory.
_cwd_grandparent = os.path.dirname(os.path.dirname(os.getcwd()))
DATA_DIR = os.path.join(_cwd_grandparent, "data/")
DATA_PATH = os.path.join(DATA_DIR, "data.csv")

In [6]:
# Prediction target and structure encoding selected for this run.
target = Target.DELTA_E
encoding = StructureEncoding.ATOMIC

# Two held-out sets, both declared with size=0.1; they differ only in the
# split strategy passed to TestSet.
test_sets_cfg = [
    TestSet(label, size=0.1, split=split_kind)
    for label, split_kind in (
        ("Parameter gen.", TestSplit.ROW),
        ("Structure gen.", TestSplit.STRUCTURE),
    )
]

# Data Loading
X_train_raw, y_train_raw, test_sets_raw = data_loader(
    data_path=DATA_PATH,
    target=target,
    encoding=encoding,
    test_sets_cfg=test_sets_cfg,
    remove_ref_rows=True,
    console=console,
)

In [7]:
# Fit the feature scaler on the raw training features only; the same fitted
# scaler is then applied to the training set and to every test set.
std_scaler = StandardScaler().fit(X_train_raw)
# Fit the project's log target transformer on the raw training targets.
# NOTE(review): target_transformer is not referenced again in the visible
# cells — confirm it is still needed.
target_transformer = CustomLogTargetTransformer().fit(y_train_raw)

In [8]:
def _to_tensors(features_raw, targets_raw):
    """Convert one raw (features, targets) pair into model-ready tensors.

    Features are scaled with the already fitted ``std_scaler`` and stored as
    float32; targets are mapped through ``magnitude_transform`` and stored as
    int64 class indices. Both tensors are moved to ``device``.

    Args:
        features_raw: Raw feature table accepted by ``std_scaler.transform``.
        targets_raw: Raw targets exposing a ``.values`` array.

    Returns:
        Tuple ``(features, labels)`` of tensors on ``device``.
    """
    features = torch.as_tensor(
        std_scaler.transform(features_raw), dtype=torch.float32
    )
    # Build the integer labels directly instead of round-tripping them
    # through a float32 tensor.
    labels = torch.as_tensor(
        magnitude_transform(targets_raw.values), dtype=torch.long
    )
    return features.to(device), labels.to(device)


X_train, y_train = _to_tensors(X_train_raw, y_train_raw)

# Each entry is (name, features, labels), in the order of test_sets_raw.
test_sets = []
for name, X_test_raw, y_test_raw in test_sets_raw:
    X_test, y_test = _to_tensors(X_test_raw, y_test_raw)
    test_sets.append((name, X_test, y_test))

# Model definition

In [38]:
class MLP(nn.Module):
    """Fully connected network with two hidden ReLU layers.

    Args:
        input_size: Number of input features per sample.
        output_size: Number of output units.
        hidden_size: Width of both hidden layers. Defaults to 200.
    """

    def __init__(self, input_size, output_size, hidden_size=200):
        super().__init__()
        self.layers = nn.Sequential(
            nn.Linear(input_size, hidden_size),
            nn.ReLU(),
            nn.Linear(hidden_size, hidden_size),
            nn.ReLU(),
            nn.Linear(hidden_size, output_size),
        )

    def forward(self, x):
        """Return the element-wise absolute value of the last layer's output."""
        # NOTE(review): the training cell feeds these outputs to
        # nn.CrossEntropyLoss as class scores; taking abs() of the scores
        # forces them to be non-negative — confirm this is intentional.
        return torch.abs(self.layers(x))

    def predict(self, x):
        """Run ``forward`` without gradient tracking.

        Returns the same values as ``forward(x)`` as a tensor that is not
        attached to an autograd graph; no graph is built in the first place.
        """
        with torch.no_grad():
            return self.forward(x)

In [39]:
SEED = 0
NUM_CLASSES = 10  # number of output units; labels must lie in [0, NUM_CLASSES)

# Fail early with a readable message: nn.CrossEntropyLoss rejects class
# indices outside [0, NUM_CLASSES), and that error would otherwise only
# surface inside the training loop.
for _split_name, _labels in [("train", y_train)] + [
    (name, y) for name, _, y in test_sets
]:
    if _labels.numel() == 0:
        continue
    _lo, _hi = int(_labels.min()), int(_labels.max())
    if _lo < 0 or _hi >= NUM_CLASSES:
        raise ValueError(
            f"labels of '{_split_name}' span [{_lo}, {_hi}] but the model "
            f"only has classes 0..{NUM_CLASSES - 1}"
        )

# Seed PyTorch's global RNG so the weight initialisation is reproducible.
torch.manual_seed(SEED)
model = MLP(X_train.shape[1], NUM_CLASSES)
model = model.to(device)
print(model)

MLP(
  (layers): Sequential(
    (0): Linear(in_features=89, out_features=200, bias=True)
    (1): ReLU()
    (2): Linear(in_features=200, out_features=200, bias=True)
    (3): ReLU()
    (4): Linear(in_features=200, out_features=10, bias=True)
  )
)


In [40]:
optimizer = torch.optim.Adam(model.parameters(), lr=1e-2)

loss_fn= nn.CrossEntropyLoss()

epochs = 500


def _accuracy(logits, labels):
    """Return the fraction of rows whose arg-max column equals the label."""
    return (logits.argmax(dim=1) == labels).sum().item() / labels.shape[0]


for epoch in range(epochs):
    # train phase: one full-batch gradient step per epoch. The reported train
    # loss/accuracy come from this forward pass, i.e. before the update.
    model.train()
    y_pred_train = model(X_train)
    loss = loss_fn(y_pred_train, y_train)
    optimizer.zero_grad()
    loss.backward()
    optimizer.step()

    # eval phase: no gradients are needed, so skip building autograd graphs
    # for the test-set forward passes and losses.
    model.eval()
    with torch.no_grad():
        y_preds_test = [model(X_test) for _, X_test, _ in test_sets]
        test_report = "\t".join(
            f"{loss_fn(y_pred_test, y_test).item():.4E} {_accuracy(y_pred_test, y_test):.4E}"
            for y_pred_test, (_, _, y_test) in zip(y_preds_test, test_sets)
        )
    # end="\r" keeps the progress on a single, continuously overwritten line.
    print(
        f"[Epoch {epoch+1}/{epochs}]: train {loss.item():.4E} {_accuracy(y_pred_train, y_train):.4e}"
        +"\ttests "
        + test_report,
        end="\r"
    )

[Epoch 500/500]: train 2.4849E-01 9.1292e-01	tests 2.7119E-01 9.0665E-01	2.4216E+00 5.7435E-01