In [None]:
import torch
import numpy as np
import pandas as pd
import os
import torch.nn as nn
import time
from torch.nn import Linear
import torch.nn.functional as F
from torch.utils.data import Dataset, TensorDataset, DataLoader, random_split
from prefetch_generator import BackgroundGenerator
from tqdm import trange
from torch.optim.lr_scheduler import StepLR
from sklearn.preprocessing import MinMaxScaler
import math
import scipy.constants as const  # Delete them when updating the data 
from scipy.integrate import quad # Delete them when updating the data 
import joblib

torch.cuda.empty_cache()
torch.manual_seed(0)
device = 'cuda:0' if torch.cuda.is_available() else 'cpu'

In [None]:
Properties = pd.read_csv('files/Data/Properties.csv')
Compositions = pd.read_csv('files/Data/Compositions.csv')

'''
Properties indices:
 0: 'Density'
 1: 'Young's modulus'
 2: 'Flexural modulus'
 3: 'Shear modulus'
 4: 'Bulk modulus'
 5: 'Poisson's ratio'
 6: 'Melting point'
 7: 'Thermal conductivity'
 8: 'Specific heat capacity'
 9: 'Thermal expansion coefficient'
10: 'Latent heat of fusion'
11: 'Electrical conductivity'
12: 'Acoustic velocity'
13: 'Average Atomic Weight'
'''

'''
Compositions indices:
  0: 'Ag (silver)'
  1: 'Al (aluminum)'
  2: 'As (arsenic)'
  3: 'Au (gold)'
  4: 'B (boron)'
  5: 'Be (beryllium)'
  6: 'BeO (beryllia)'
  7: 'Bi (bismuth)'
  8: 'C (carbon)'
  9: 'Ca (calcium)'
 10: 'Cd (cadmium)'
 11: 'Ce (cerium)'
 12: 'Co (cobalt)'
 13: 'Cr (chromium)'
 14: 'Cu (copper)'
 15: 'Dy (dysprosium)'
 16: 'Er (erbium)'
 17: 'Eu (europium)'
 18: 'Fe (iron)'
 19: 'Ga (gallium)'
 20: 'Gd (gadolinium)'
 21: 'Ge (germanium)'
 22: 'H (hydrogen)'
 23: 'Hf (hafnium)'
 24: 'Ho (holmium)'
 25: 'In (indium)'
 26: 'Ir (iridium)'
 27: 'La (lanthanum)'
 28: 'Li (lithium)'
 29: 'Lu (lutetium)'
 30: 'Mg (magnesium)'
 31: 'Mn (manganese)'
 32: 'Mo (molybdenum)'
 33: 'N (nitrogen)'
 34: 'Nb (niobium)'
 35: 'Nd (neodymium)'
 36: 'Ni (nickel)'
 37: 'O (oxygen)'
 38: 'O2 (oxygen gas)'
 39: 'Os (osmium)'
 40: 'P (phosphorus)'
 41: 'Pb (lead)'
 42: 'Pd (palladium)'
 43: 'Pr (praseodymium)'
 44: 'Pt (platinum)'
 45: 'Re (rhenium)'
 46: 'Rh (rhodium)'
 47: 'Ru (ruthenium)'
 48: 'S (sulfur)'
 49: 'Sb (antimony)'
 50: 'Sc (scandium)'
 51: 'Se (selenium)'
 52: 'Si (silicon)'
 53: 'Sm (samarium)'
 54: 'Sn (tin)'
 55: 'Sr (strontium)'
 56: 'Ta (tantalum)'
 57: 'Tb (terbium)'
 58: 'Te (tellurium)'
 59: 'ThO2 (thoria)'
 60: 'Ti (titanium)'
 61: 'Tl (thallium)'
 62: 'Tm (thulium)'
 63: 'U (uranium)'
 64: 'V (vanadium)'
 65: 'Y (Yttrium)'
 66: 'Yb (Ytterbium)'
 67: 'W (Tungsten)'
 68: 'Zn (Zinc)'
 69: 'Zr (Zirconium)'
'''


In [None]:
scaler_y = joblib.load('files/Scalers/scaler_Properties.pkl')

In [None]:
X = Compositions
Y = Properties

XX = X.values
YY = scaler_y.transform(Y)

In [None]:
latent_dim = 15

z_dim = latent_dim

c_hidden_dim = [latent_dim, 128, 128, 128]

epochs = 200
dropout = 0.
valid_size = 0.05
batch_size = 32
test_batch_size = batch_size

torch.backends.cudnn.benchmark = True

ResultsFolder = 'files/results'
savedModelFolder = 'files/Models'
load_pretrained_model = False

c_lr = 1e-3
learningRate = 5e-4

param_Dim = YY.shape[1]
add_noise = False
recon_criterion = nn.MSELoss(reduction = 'sum')

Comp_vec_dim = XX.shape[1]

In [None]:
# Prefetching wrapper for efficient data loading
class data_prefetcher():
    def __init__(self, loader):
        self.loader = iter(loader)
        self.preload()

    def preload(self):
        try:
            self.next_data = next(self.loader)
        except StopIteration:
            self.next_input = None
            return
           
    def next(self):
        data = self.next_data
        self.preload()
        return data

# Background prefetch-enabled DataLoader
class DataLoaderX(DataLoader):
    def __iter__(self):
        return BackgroundGenerator(super().__iter__())


# Custom Variational Autoencoder Model for property vector encoding and reconstruction
class vaeModel(nn.Module):
    def __init__(self):
        super(vaeModel, self).__init__()

        # ---- Encoder Network ----
        # Maps input vectors to latent distribution (mu and log_var)
        x_hidden_dim = [Comp_vec_dim, 128, 128, 128, latent_dim]

        # Feed-forward layers for encoding
        self.en_x_fc1 = nn.Linear(x_hidden_dim[0], x_hidden_dim[1])
        self.en_x_fc2 = nn.Linear(x_hidden_dim[1], x_hidden_dim[2])
        self.en_x_fc3 = nn.Linear(x_hidden_dim[2], x_hidden_dim[3])
        self.en_x_fc4 = nn.Linear(x_hidden_dim[3], x_hidden_dim[4])

        # Latent space parameters (mean and log variance)
        self.x_fc_mu = nn.Linear(latent_dim,  latent_dim)
        self.x_fc_std = nn.Linear(latent_dim,  latent_dim)

        # ---- Decoder Network ----
        # Maps latent vector z back to the original input space
        de_x_hidden_dim = [latent_dim, 128, 128, 128, Comp_vec_dim]

        # Feed-forward layers for decoding
        self.de_x_fc1 = nn.Linear(de_x_hidden_dim[0], de_x_hidden_dim[1])
        self.de_x_fc2 = nn.Linear(de_x_hidden_dim[1], de_x_hidden_dim[2])
        self.de_x_fc3 = nn.Linear(de_x_hidden_dim[2], de_x_hidden_dim[3])
        self.de_x_fc4 = nn.Linear(de_x_hidden_dim[3], de_x_hidden_dim[4])

        self.activation = nn.ReLU()

    def reparameterize(self, mu, logvar):
        """
        Reparameterization trick to sample z from N(mu, sigma^2) using
        a standard normal distribution.
        """
        sigma = torch.exp(logvar)
        eps = torch.randn_like(sigma)  # Sample epsilon from N(0, I)
        return mu + sigma * eps

    def encoder(self, x):
        """
        Encodes input vector into latent distribution parameters.
        """
        x = self.activation(self.en_x_fc1(x))
        x = self.activation(self.en_x_fc2(x))
        x = self.activation(self.en_x_fc3(x))
        x = self.en_x_fc4(x)

        x_mu = self.x_fc_mu(x)
        x_log_var = self.x_fc_std(x)

        x_z = self.reparameterize(x_mu, x_log_var)
        return x_z, x_mu, x_log_var

    def decoder(self, z):
        """
        Decodes the latent vector z back to the input space.
        """
        x = self.activation(self.de_x_fc1(z))
        x = self.activation(self.de_x_fc2(x))
        x = self.activation(self.de_x_fc3(x))
        x = self.de_x_fc4(x)

        # Apply softmax to produce normalized probability-like output
        return torch.softmax(x, dim=-1)


class pModel(nn.Module):
    def __init__(self):
        super(pModel, self).__init__()
        self.dropout_p = 0.0

        p_hidden_dim = [latent_dim, 128, 128, 128]  # Easy to modify

        # Define the layers explicitly
        self.e_fc1 = nn.Linear(latent_dim, p_hidden_dim[0], bias=False)
        self.e_relu1 = nn.ReLU()

        self.e_fc2 = nn.Linear(p_hidden_dim[0], p_hidden_dim[1], bias=False)
        self.e_relu2 = nn.ReLU()
        self.e_dropout2 = nn.Dropout(self.dropout_p)

        self.e_fc3 = nn.Linear(p_hidden_dim[1], p_hidden_dim[2], bias=False)
        self.e_relu3 = nn.ReLU()
        self.e_dropout3 = nn.Dropout(self.dropout_p)

        self.e_fc4 = nn.Linear(p_hidden_dim[2], p_hidden_dim[3], bias=False)
        self.e_relu4 = nn.ReLU()
        self.e_dropout4 = nn.Dropout(self.dropout_p)

        self.de_out = nn.Linear(p_hidden_dim[3], param_Dim, bias=False)
        self.sigmoid = nn.Sigmoid()

    def forward(self, z):
        x = self.e_relu1(self.e_fc1(z))
        x = self.e_relu2(self.e_fc2(x))
        x = self.e_dropout2(x)
        x = self.e_relu3(self.e_fc3(x))
        x = self.e_dropout3(x)
        x = self.e_relu4(self.e_fc4(x))
        x = self.e_dropout4(x)
        x = self.de_out(x)
        x = self.sigmoid(x)
        return x


def kld_loss(mu, logvar):
    """
    Compute the Kullback-Leibler Divergence (KLD) loss between a Gaussian
    posterior with mean `mu` and log-variance `logvar`, and the standard normal prior.
    """
    # Apply the closed-form expression for KLD between two Gaussians
    KLD = -0.5 * torch.sum(1 + logvar - mu.pow(2) - logvar.exp())
    return KLD


def weights_init(m):
    """
    Initialize weights of layers using Xavier uniform initialization.
    Bias terms (if present) are initialized to zero.
    """
    if isinstance(m, (nn.Linear, nn.Conv2d, nn.ConvTranspose2d)):
        # Initialize weights with Xavier uniform distribution
        torch.nn.init.xavier_uniform_(m.weight.data)
        # Zero-initialize biases if they exist
        if m.bias is not None:
            torch.nn.init.zeros_(m.bias)



In [None]:
def weighted_mse_loss(input, target, weight):
    return (weight * (input - target) ** 2).sum()


def properties_weighted_loss(p_pred, p):
    """
    Computes the sum of weighted MSE losses over all stiffness parameters.
    Currently, all weights are set to 1, but the structure allows future customization.
    """
    p_mse = 0.0
    weight = 1.0  # Uniform weight; can be customized per index below if needed

    for i in range(param_Dim):
        # Apply the same weight to all parameters for now.
        # Keep this structure if you plan to assign different weights later.
        p_mse += weighted_mse_loss(p_pred[:, i], p[:, i], weight)

    return p_mse


In [None]:
x_weight = 1
p_weight = np.ones([epochs])*100 # constant weight of stiffness prediction loss
model = vaeModel()
p_model = pModel()
model.to(device)
p_model.to(device)

In [None]:
def test(model, p_model, test_loader, saveResults, test_batch_size):
    # Set models to evaluation mode
    model.eval()
    p_model.eval()

    # Initialize test losses
    x_test_mse     = 0.0
    p_test_mse     = 0.0
    test_kld_loss  = 0.0

    # Containers for predictions and ground truths
    x_test = []
    x_pred = []
    p_test = []
    p_pred = []

    with torch.no_grad():
        for data in test_loader:
            X = data[0].to(device)
            Y = data[1].to(device)

            # Encode input
            encoded, mu, std = model.encoder(X)

            # Use either encoded vector or mean vector as input
            p_input = encoded if add_noise else mu

            # Predict properties and reconstruct input
            p_pred_batch = p_model(p_input)
            x_decoded    = model.decoder(encoded)

            # Compute reconstruction and KLD losses
            x_mse        = recon_criterion(x_decoded, X)
            test_kld     = kld_loss(mu, std)
            p_mse        = properties_weighted_loss(p_pred_batch, Y)

            # Accumulate losses
            x_test_mse  += x_mse.item()
            test_kld_loss += test_kld.item()
            p_test_mse  += p_mse.item()

            # Store batch results
            x_test.append(X.cpu().numpy())
            x_pred.append(x_decoded.cpu().numpy())
            p_test.append(Y.cpu().numpy())
            p_pred.append(p_pred_batch.cpu().numpy())

    # Merge all batches
    x_test = np.concatenate(x_test)
    x_pred = np.concatenate(x_pred)
    p_test = np.concatenate(p_test)
    p_pred = np.concatenate(p_pred)

    # Apply inverse transform on predicted and true property values
    x_test1 = x_test
    x_pred1 = x_pred
    p_test1 = scaler_y.inverse_transform(p_test)
    p_pred1 = scaler_y.inverse_transform(p_pred)

    # Save results if requested
    if saveResults:
        np.savetxt(ResultsFolder + '/x_test.csv',  x_test1, delimiter=",")
        np.savetxt(ResultsFolder + '/x_pred.csv',  x_pred1, delimiter=",")
        np.savetxt(ResultsFolder + '/p_test.csv',  p_test1, delimiter=",")
        np.savetxt(ResultsFolder + '/p_pred.csv',  p_pred1, delimiter=",")

    return x_test_mse, p_test_mse, test_kld_loss


In [None]:
def train(epoch, KLweight):
    # Set property model to training mode
    p_model.train()

    # Initialize loss trackers
    x_loss_mse = 0.0
    p_loss_mse = 0.0

    data    = train_prefetcher.next()
    n_batch = 0

    while data is not None:
        n_batch += 1
        if n_batch >= num_iters:
            break

        X = data[0].to(device)
        Y = data[1].to(device)

        data = train_prefetcher.next()

        # Set training or eval mode based on model loading condition
        if not load_pretrained_model:
            model.train()
            optimizer.zero_grad(set_to_none=True)
        else:
            model.eval()
            optimizer.zero_grad(set_to_none=True)

        # Encode input
        encoded, mu, std = model.encoder(X)
        p_input = encoded if add_noise else mu

        # Predict and decode
        p_pred    = p_model(p_input)
        x_decoded = model.decoder(encoded)

        # Compute losses
        x_train_mse = recon_criterion(x_decoded, X)
        train_kld   = kld_loss(mu, std)
        p_train_mse = properties_weighted_loss(p_pred, Y)

        # Final loss depending on pretrained flag
        if not load_pretrained_model:
            loss = train_kld + x_train_mse * x_weight + p_train_mse * p_weight[epoch]
        else:
            loss = p_train_mse

        # Backpropagation and optimizer step
        loss.backward()
        optimizer.step()

        # Track batch losses
        x_loss_mse += x_train_mse.item()
        p_loss_mse += p_train_mse.item()

    return x_loss_mse, p_loss_mse



In [None]:
# Convert numpy arrays to PyTorch tensors
XX = torch.tensor(XX, dtype=torch.float32)
YY = torch.tensor(YY, dtype=torch.float32)

In [None]:
# Enable anomaly detection for autograd debugging
torch.autograd.set_detect_anomaly(True)

# Initialize model weights
model.apply(weights_init)
p_model.apply(weights_init)

# Set model to evaluation mode before training loop starts
model.eval()
saveResults = None

# Prepare dataset and split into training and testing sets
dataset = TensorDataset(XX, YY)
num_train = len(dataset)
split = int(np.floor(valid_size * num_train))

train_dataset, test_dataset = random_split(dataset, [num_train - split, split])

# Create data loaders with pin_memory for better performance
train_loader = DataLoaderX(train_dataset, batch_size=batch_size, shuffle=True, pin_memory=True)
test_loader  = DataLoaderX(test_dataset, batch_size=test_batch_size, shuffle=True, pin_memory=True)
num_iters    = len(train_loader)

# Set optimizer depending on whether pre-trained model is used
if load_pretrained_model:
    optimizer = torch.optim.Adam(p_model.parameters(), lr=c_lr)
else:
    optimizer = torch.optim.Adam(list(model.parameters()) + list(p_model.parameters()), lr=learningRate)

# Learning rate scheduler
scheduler = StepLR(optimizer, step_size=20, gamma=0.5)

# Initialize tracking variables for best accuracy
best_p_accuracy      = 0.0
best_accuracy_file   = None

In [None]:
# Set number of training epochs
epochs = 200

# Initialize loss tracking matrix if not resuming from best model
if best_accuracy_file is None:
    epoch_loss = [[0 for _ in range(6)] for _ in range(epochs)]

    for epoch in trange(epochs):

        # Start timer
        start1 = time.time()

        # Prepare data prefetcher and KL divergence weight
        train_prefetcher = data_prefetcher(train_loader)
        KLweight = 1.0

        # Training step
        x_loss_mse, p_loss_mse = train(epoch, KLweight)

        # Update learning rate
        scheduler.step()

        # Enable result saving in final epoch
        if epoch == epochs - 1:
            saveResults = True

        # Evaluation step
        x_test_mse, p_test_mse, test_kld = test(model, p_model, test_loader, saveResults, test_batch_size)

        # Log metrics
        epoch_loss[epoch][0] = epoch
        epoch_loss[epoch][1] = x_test_mse
        epoch_loss[epoch][2] = p_test_mse
        epoch_loss[epoch][3] = x_loss_mse
        epoch_loss[epoch][4] = p_loss_mse
        epoch_loss[epoch][5] = optimizer.param_groups[0]['lr']

        # Print progress every 2 epochs or at last epoch
        if (epoch % 2 == 0) or (epoch == epochs - 1):
            print("Epoch:", '%03d' % (epoch + 1), '/', str(epochs),
                  ", x train =", "{:.5f}".format(x_loss_mse / len(train_loader) / batch_size),
                  ", x test =", "{:.5f}".format(x_test_mse / len(test_loader) / test_batch_size),
                  ", p train =", "{:.6f}".format(p_loss_mse / len(train_loader) / batch_size),
                  ", p test =", "{:.6f}".format(p_test_mse / len(test_loader) / test_batch_size),
                  ", KL weight =", "{:.5f}".format(KLweight),
                  ", time =", "{:.2f}".format(time.time() - start1))

            # Save best model based on property prediction loss
            current_p_accuracy = p_test_mse / len(test_loader) / test_batch_size
            if best_p_accuracy == 0.0 or current_p_accuracy < best_p_accuracy:
                print('updating best p accuracy: previous best = {:.6f} new best = {:.6f}'.format(
                    best_p_accuracy, current_p_accuracy))
                best_p_accuracy = current_p_accuracy
                torch.save(p_model.state_dict(), savedModelFolder + '/best_p_model.pt')
                torch.save(model.state_dict(), savedModelFolder + '/best_model.pt')

# Final model saving after training is done
model.cpu()
p_model.cpu()
model.eval()
p_model.eval()

torch.save(model.state_dict(), savedModelFolder + '/model.pt')
torch.save(p_model.state_dict(), savedModelFolder + '/p_model.pt')
