In [None]:
import os  
import sys 
# Make the project-local modules importable (presumably GATModel / GCNModel /
# MLPModel imported further down live here -- confirm).
# The directory can be overridden with the AI_PREDICTION_PROPERTIES_DIR
# environment variable; when it is unset the original hard-coded location is used.
_PROJECT_MODULE_DIR = os.environ.get(
    "AI_PREDICTION_PROPERTIES_DIR",
    "/data/jupyternotebook_public/jifengwang/17_HTQC_analysis/AI_prediction_properties",
)
# Re-running the cell must not stack duplicate entries, but the directory
# should still end up first on the search path, so drop any old entry first.
while _PROJECT_MODULE_DIR in sys.path:
    sys.path.remove(_PROJECT_MODULE_DIR)
sys.path.insert(0, _PROJECT_MODULE_DIR)

In [None]:
import torch
import torch.nn.functional as F
from torch_geometric.nn import GATConv, global_mean_pool
from torch_geometric.loader import DataLoader  
from torch_geometric.nn import global_mean_pool, global_add_pool  
import matplotlib.pyplot as plt
from matplotlib.ticker import MaxNLocator
import numpy as np
from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score
from GATModel import GAT
from GCNModel import GCN
from MLPModel import MLP

In [None]:
# Apply the Matplotlib style sheet "style.mplstyle" to every figure drawn after this cell.
# NOTE(review): the path is relative, so it presumably resolves against the
# kernel's working directory -- confirm the file sits next to the notebook.
plt.style.use("style.mplstyle")

In [None]:
def set_seed(seed):
    """Seed the random number generators used in this notebook.

    Seeds Python's ``random`` module, NumPy's global generator and PyTorch
    (``torch.manual_seed`` and ``torch.cuda.manual_seed``), then puts cuDNN
    into deterministic mode with benchmarking switched off.

    Args:
        seed: Value handed unchanged to every seeding function.
    """
    import random

    # Same order as before: stdlib, NumPy, PyTorch, PyTorch CUDA.
    seeders = (
        random.seed,
        np.random.seed,
        torch.manual_seed,
        torch.cuda.manual_seed,
    )
    for apply_seed in seeders:
        apply_seed(seed)

    cudnn_backend = torch.backends.cudnn
    cudnn_backend.deterministic = True
    cudnn_backend.benchmark = False


# Fix the seed once for the whole notebook run.
set_seed(42)

In [None]:
import torch

def load_data_list(file_path):
    """Read back the object stored at ``file_path`` with ``torch.load``.

    ``torch.load`` is called with its default arguments only.

    NOTE(review): ``torch.load`` is pickle-based, so only point this at
    trusted files. On PyTorch releases whose default is ``weights_only=True``
    lists of arbitrary Python objects may be rejected -- verify against the
    installed version.

    Args:
        file_path: Location of the serialized file.

    Returns:
        Whatever object ``torch.load`` reconstructs from the file.
    """
    restored = torch.load(file_path)
    return restored

############## Tm ##############
# File locations: the complete Tm list first, then its train / val / test splits.
Tm_load_file_path = '../dataset/Tm_data_list.pt'
Tm_load_train_dataset_path = '../dataset/train_Tm_data_list.pt'
Tm_load_val_dataset_path = '../dataset/val_Tm_data_list.pt'
Tm_load_test_dataset_path = '../dataset/test_Tm_data_list.pt'

# Read the four files in the order listed above.
(
    Tm_loaded_data_list,
    Tm_loaded_train_data_list,
    Tm_loaded_val_data_list,
    Tm_loaded_test_data_list,
) = (
    load_data_list(Tm_load_file_path),
    load_data_list(Tm_load_train_dataset_path),
    load_data_list(Tm_load_val_dataset_path),
    load_data_list(Tm_load_test_dataset_path),
)

# Batches of 32; only the training loader shuffles.
Tm_train_loader = DataLoader(
    Tm_loaded_train_data_list, batch_size=32, shuffle=True
)
Tm_val_loader = DataLoader(
    Tm_loaded_val_data_list, batch_size=32, shuffle=False
)
Tm_test_loader = DataLoader(
    Tm_loaded_test_data_list, batch_size=32, shuffle=False
)


############## conductivity ##############
# File locations: the complete conductivity list first, then its train / val / test splits.
# NOTE(review): unlike the Tm and IL_ECW paths in this cell, these four carry
# no '../dataset/' prefix, so they resolve against the working directory --
# confirm that is intended.
conductivity_load_file_path = 'conductivity_data_list.pt'
conductivity_load_train_dataset_path = 'train_conductivity_data_list.pt'
conductivity_load_val_dataset_path = 'val_conductivity_data_list.pt'
conductivity_load_test_dataset_path = 'test_conductivity_data_list.pt'

# Read the four files in the order listed above.
(
    conductivity_loaded_data_list,
    conductivity_loaded_train_data_list,
    conductivity_loaded_val_data_list,
    conductivity_loaded_test_data_list,
) = (
    load_data_list(conductivity_load_file_path),
    load_data_list(conductivity_load_train_dataset_path),
    load_data_list(conductivity_load_val_dataset_path),
    load_data_list(conductivity_load_test_dataset_path),
)

# Batches of 32; only the training loader shuffles.
conductivity_train_loader = DataLoader(
    conductivity_loaded_train_data_list, batch_size=32, shuffle=True
)
conductivity_val_loader = DataLoader(
    conductivity_loaded_val_data_list, batch_size=32, shuffle=False
)
conductivity_test_loader = DataLoader(
    conductivity_loaded_test_data_list, batch_size=32, shuffle=False
)

######################ECW######################
# File locations: the complete IL_ECW list first, then its train / val / test splits.
# The variables are named "save" but are only used for loading in this cell.
IL_ECW_save_total_dataset_path = '../dataset/IL_ECW_data_list.pt'
IL_ECW_save_train_dataset_path = '../dataset/train_IL_ECW_data_list.pt'
IL_ECW_save_val_dataset_path = '../dataset/val_IL_ECW_data_list.pt'
IL_ECW_save_test_dataset_path = '../dataset/test_IL_ECW_data_list.pt'

# Read the four files in the order listed above.
(
    IL_ECW_loaded_data_list,
    IL_ECW_loaded_train_data_list,
    IL_ECW_loaded_val_data_list,
    IL_ECW_loaded_test_data_list,
) = (
    load_data_list(IL_ECW_save_total_dataset_path),
    load_data_list(IL_ECW_save_train_dataset_path),
    load_data_list(IL_ECW_save_val_dataset_path),
    load_data_list(IL_ECW_save_test_dataset_path),
)

# Batches of 32; only the training loader shuffles.
IL_ECW_train_loader = DataLoader(
    IL_ECW_loaded_train_data_list, batch_size=32, shuffle=True
)
IL_ECW_val_loader = DataLoader(
    IL_ECW_loaded_val_data_list, batch_size=32, shuffle=False
)
IL_ECW_test_loader = DataLoader(
    IL_ECW_loaded_test_data_list, batch_size=32, shuffle=False
)

In [None]:
import torch
import torch.nn.functional as F
import numpy as np
import math

# Use CUDA when PyTorch reports it as available, otherwise the CPU.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')


def training(index, loader, model, loss_fn, optimizer, scale=1):
    """Run one optimisation pass over ``loader`` and return the mean batch loss.

    Args:
        index: Not used inside this function.
        loader: Iterable of batches that also supports ``len()``. Every batch
            must offer ``to(device)`` plus ``x`` and ``y`` attributes.
        model: Network called with the whole batch object.
        loss_fn: Callable ``loss_fn(prediction, target)`` returning a scalar tensor.
        optimizer: Optimizer; ``zero_grad`` and ``step`` run once per batch.
        scale: Factor multiplied into ``y`` before the loss is computed.

    Returns:
        The sum of the per-batch loss values divided by ``len(loader)``
        (batches are not weighted by their size).
    """
    model.train()
    running_total = 0

    for batch in loader:
        batch = batch.to(device)
        optimizer.zero_grad()
        batch.x = batch.x.float()  # cast the inputs to float32

        prediction = model(batch)
        # y is reshaped into a single column before scaling.
        target = scale * batch.y.view(-1, 1)
        batch_loss = loss_fn(prediction, target)

        batch_loss.backward()
        optimizer.step()
        running_total += batch_loss.item()

    return running_total / len(loader)

def validation(index, loader, model, loss_fn, scale=1):
    """Evaluate ``model`` on ``loader`` without gradients and return the mean batch loss.

    Args:
        index: Not used inside this function.
        loader: Iterable of batches that also supports ``len()``. Every batch
            must offer ``to(device)`` plus ``x`` and ``y`` attributes.
        model: Network called with the whole batch object; switched to eval mode.
        loss_fn: Callable ``loss_fn(prediction, target)`` returning a scalar tensor.
        scale: Factor multiplied into ``y`` before the loss is computed.

    Returns:
        The sum of the per-batch loss values divided by ``len(loader)``
        (batches are not weighted by their size).
    """
    model.eval()
    running_total = 0

    with torch.no_grad():
        for batch in loader:
            batch = batch.to(device)
            batch.x = batch.x.float()  # cast the inputs to float32
            prediction = model(batch)
            # y is reshaped into a single column before scaling.
            target = scale * batch.y.view(-1, 1)
            running_total += loss_fn(prediction, target).item()

    return running_total / len(loader)

def train_epochs(index, epochs, model, train_loader, val_loader, test_loader, path, scale=1):
    """Train ``model``, checkpoint the best validation weights and score them on the test set.

    Every epoch runs ``training`` followed by ``validation``. When the
    validation loss beats the best value seen so far, the model's
    ``state_dict`` is written to ``path``. After ``epochs / 10`` consecutive
    epochs without improvement the learning rate is divided by 10; once the
    rate is no longer above ``initial_lr / 1000`` the next plateau stops
    training instead. The checkpoint at ``path`` is then reloaded and
    evaluated on ``test_loader``.

    NOTE(review): if no epoch ever improves on ``inf`` (for example a NaN
    validation loss) nothing is written during this run, and the reload reads
    whatever file already exists at ``path``.

    Args:
        index: Forwarded unchanged to ``training`` and ``validation``.
        epochs: Maximum number of epochs; also sets the patience (``epochs / 10``).
        model: Network to optimise; it is called with a whole batch object.
        train_loader: Batches used for optimisation.
        val_loader: Batches used for checkpoint selection and LR decay.
        test_loader: Batches scored once with the reloaded checkpoint.
        path: File the best ``state_dict`` is saved to and reloaded from.
        scale: Factor multiplied into ``y`` for the loss and the returned targets.

    Returns:
        Tuple ``(train_losses, val_losses, test_pred, test_true, test_mse,
        test_mae, test_r2)``. The two loss arrays always have length
        ``epochs``; entries for epochs skipped by early stopping stay 0.
        ``test_pred`` and ``test_true`` are the concatenated per-batch NumPy
        arrays (targets already multiplied by ``scale``).
    """
    initial_lr = 5e-4

    optimizer = torch.optim.Adam(model.parameters(), initial_lr, weight_decay=5e-4)
    loss_fn = torch.nn.MSELoss()

    train_losses = np.zeros(epochs)
    val_losses = np.zeros(epochs)
    best_loss = float('inf')

    patience_counter = 0
    max_patience = epochs/10

    current_lr = initial_lr

    for epoch in range(epochs):
        train_loss = training(index, train_loader, model, loss_fn, optimizer, scale)
        val_loss = validation(index, val_loader, model, loss_fn, scale)

        train_losses[epoch] = train_loss
        val_losses[epoch] = val_loss

        print(f"Epoch: {epoch}, Train loss: {train_loss:.4f}, Val loss: {val_loss:.4f}")

        if val_loss < best_loss:
            # New best: checkpoint and restart the patience window.
            best_loss = val_loss
            torch.save(model.state_dict(), path)
            print("Model has been saved!")
            patience_counter = 0
        else:
            patience_counter += 1
            print(f"Patience counter: {patience_counter}")
            if patience_counter >= max_patience:
                if current_lr > initial_lr / 1000:
                    # Plateau: decay the learning rate in place on the optimizer.
                    current_lr /= 10
                    for param_group in optimizer.param_groups:
                        param_group['lr'] = current_lr
                    print(f"Learning rate reduced to: {current_lr}")
                    patience_counter = 0
                else:
                    print(f"Early stopping triggered. Learning rate has reached its minimum threshold ({current_lr}).")
                    break

    # Evaluate the best checkpoint rather than the weights of the last epoch.
    model.load_state_dict(torch.load(path))
    model.to(device)
    model.eval()

    total_loss = 0
    test_pred = []
    test_true = []

    # Inference only: no autograd graph is needed, matching validation().
    with torch.no_grad():
        for data in test_loader:
            data = data.to(device)
            data.x = data.x.float()
            out = model(data)
            loss = loss_fn(out, scale*data.y.view(-1, 1))
            total_loss += loss.item()
            test_pred.append(out.detach().cpu().numpy())
            test_true.append(scale*data.y.cpu().numpy())

    test_pred = np.concatenate(test_pred, axis=0)
    test_true = np.concatenate(test_true, axis=0)
    average_loss = total_loss / len(test_loader)

    # The metric functions receive 1-D arrays.
    test_pred_flat = test_pred.flatten()
    test_true_flat = test_true.flatten()
    test_mse = mean_squared_error(test_true_flat, test_pred_flat)
    test_mae = mean_absolute_error(test_true_flat, test_pred_flat)
    test_r2 = r2_score(test_true_flat, test_pred_flat)

    print(f"Test Loss: {average_loss:.4f}, MSE: {test_mse:.4f}, MAE: {test_mae:.4f}, R2: {test_r2:.4f}")

    return train_losses, val_losses, test_pred, test_true, test_mse, test_mae, test_r2


In [None]:
import torch
import torch.nn.functional as F
import numpy as np
import math

# Use CUDA when PyTorch reports it as available, otherwise the CPU.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')


def training_mlp(index, loader, model, loss_fn, optimizer, scale=10, input_type="fp"):
    """Run one optimisation pass of the MLP over ``loader`` and return the mean batch loss.

    Args:
        index: Not used inside this function.
        loader: Iterable of batches that also supports ``len()``. Every batch
            must offer ``to(device)``, ``y`` and the attribute selected by
            ``input_type``.
        model: Network called with the whole batch object.
        loss_fn: Callable ``loss_fn(prediction, target)`` returning a scalar tensor.
        optimizer: Optimizer; ``zero_grad`` and ``step`` run once per batch.
        scale: Factor multiplied into ``y`` before the loss is computed.
        input_type: ``"fp"`` casts ``morgan_fp`` to float32, ``"2Ddescriptors"``
            casts ``moldescriptor``; any other value casts nothing.

    Returns:
        The sum of the per-batch loss values divided by ``len(loader)``
        (batches are not weighted by their size).
    """
    model.train()
    running_total = 0

    for batch in loader:
        batch = batch.to(device)
        optimizer.zero_grad()

        # Only the feature tensor named by input_type is converted.
        if input_type == "fp":
            batch.morgan_fp = batch.morgan_fp.float()
        elif input_type == "2Ddescriptors":
            batch.moldescriptor = batch.moldescriptor.float()

        prediction = model(batch)
        target = scale * batch.y.view(-1, 1)
        batch_loss = loss_fn(prediction, target)

        batch_loss.backward()
        optimizer.step()
        running_total += batch_loss.item()

    return running_total / len(loader)

def validation_mlp(index, loader, model, loss_fn, scale=10, input_type="fp"):
    """Evaluate the MLP on ``loader`` without gradients and return the mean batch loss.

    Args:
        index: Not used inside this function.
        loader: Iterable of batches that also supports ``len()``. Every batch
            must offer ``to(device)``, ``y`` and the attribute selected by
            ``input_type``.
        model: Network called with the whole batch object; switched to eval mode.
        loss_fn: Callable ``loss_fn(prediction, target)`` returning a scalar tensor.
        scale: Factor multiplied into ``y`` before the loss is computed.
        input_type: ``"fp"`` casts ``morgan_fp`` to float32, ``"2Ddescriptors"``
            casts ``moldescriptor``; any other value casts nothing.

    Returns:
        The sum of the per-batch loss values divided by ``len(loader)``
        (batches are not weighted by their size).
    """
    model.eval()
    running_total = 0

    with torch.no_grad():
        for batch in loader:
            batch = batch.to(device)

            # Only the feature tensor named by input_type is converted.
            if input_type == "fp":
                batch.morgan_fp = batch.morgan_fp.float()
            elif input_type == "2Ddescriptors":
                batch.moldescriptor = batch.moldescriptor.float()

            prediction = model(batch)
            target = scale * batch.y.view(-1, 1)
            running_total += loss_fn(prediction, target).item()

    return running_total / len(loader)

def train_epochs_mlp(index, epochs, model, train_loader, val_loader, test_loader, path, scale=10, input_type="fp"):
    """Train the MLP, checkpoint the best validation weights and score them on the test set.

    Every epoch runs ``training_mlp`` followed by ``validation_mlp``. When the
    validation loss beats the best value seen so far, the model's
    ``state_dict`` is written to ``path``. After ``epochs / 10`` consecutive
    epochs without improvement the learning rate is divided by 10; once the
    rate is no longer above ``initial_lr / 1000`` the next plateau stops
    training instead. The checkpoint at ``path`` is then reloaded and
    evaluated on ``test_loader``.

    ``input_type`` is forwarded to the training and validation helpers so that
    all three phases convert the same feature tensor (previously the helpers
    were always called with ``"fp"`` regardless of the argument).

    NOTE(review): if no epoch ever improves on ``inf`` (for example a NaN
    validation loss) nothing is written during this run, and the reload reads
    whatever file already exists at ``path``.

    Args:
        index: Forwarded unchanged to ``training_mlp`` and ``validation_mlp``.
        epochs: Maximum number of epochs; also sets the patience (``epochs / 10``).
        model: Network to optimise; it is called with a whole batch object.
        train_loader: Batches used for optimisation.
        val_loader: Batches used for checkpoint selection and LR decay.
        test_loader: Batches scored once with the reloaded checkpoint.
        path: File the best ``state_dict`` is saved to and reloaded from.
        scale: Factor multiplied into ``y`` for the loss and the returned targets.
        input_type: ``"fp"`` casts ``morgan_fp`` to float32, ``"2Ddescriptors"``
            casts ``moldescriptor``; any other value casts nothing.

    Returns:
        Tuple ``(train_losses, val_losses, test_pred, test_true, test_mse,
        test_mae, test_r2)``. The two loss arrays always have length
        ``epochs``; entries for epochs skipped by early stopping stay 0.
        ``test_pred`` and ``test_true`` are the concatenated per-batch NumPy
        arrays (targets already multiplied by ``scale``).
    """
    initial_lr = 5e-4
    optimizer = torch.optim.Adam(model.parameters(), lr=initial_lr, weight_decay=5e-4)
    loss_fn = torch.nn.MSELoss()

    train_losses = np.zeros(epochs)
    val_losses = np.zeros(epochs)
    best_loss = float('inf')

    patience_counter = 0
    max_patience = epochs/10
    current_lr = initial_lr

    for epoch in range(epochs):
        # Forward the caller's input_type instead of a hard-coded "fp".
        train_loss = training_mlp(index, train_loader, model, loss_fn, optimizer, scale, input_type=input_type)
        val_loss = validation_mlp(index, val_loader, model, loss_fn, scale, input_type=input_type)

        train_losses[epoch] = train_loss
        val_losses[epoch] = val_loss

        print(f"Epoch: {epoch}, Train loss: {train_loss:.4f}, Val loss: {val_loss:.4f}")

        if val_loss < best_loss:
            # New best: checkpoint and restart the patience window.
            best_loss = val_loss
            torch.save(model.state_dict(), path)
            print("Model has been saved!")
            patience_counter = 0
        else:
            patience_counter += 1
            print(f"Patience counter: {patience_counter}")
            if patience_counter >= max_patience:
                if current_lr > initial_lr / 1000:
                    # Plateau: decay the learning rate in place on the optimizer.
                    current_lr /= 10
                    for param_group in optimizer.param_groups:
                        param_group['lr'] = current_lr
                    print(f"Learning rate reduced to: {current_lr}")
                    patience_counter = 0
                else:
                    print(f"Early stopping triggered. Learning rate has reached its minimum threshold ({current_lr}).")
                    break

    # Evaluate the best checkpoint rather than the weights of the last epoch.
    model.load_state_dict(torch.load(path))
    model.to(device)
    model.eval()

    total_loss = 0
    test_pred = []
    test_true = []

    # Inference only: no autograd graph is needed, matching validation_mlp().
    with torch.no_grad():
        for data in test_loader:
            data = data.to(device)
            if input_type=="fp":
                data.morgan_fp = data.morgan_fp.float()
            elif input_type=="2Ddescriptors":
                data.moldescriptor = data.moldescriptor.float()
            out = model(data)
            loss = loss_fn(out, scale*data.y.view(-1, 1))
            total_loss += loss.item()
            test_pred.append(out.detach().cpu().numpy())
            test_true.append(scale*data.y.cpu().numpy())

    test_pred = np.concatenate(test_pred, axis=0)
    test_true = np.concatenate(test_true, axis=0)
    average_loss = total_loss / len(test_loader)

    # The metric functions receive 1-D arrays.
    test_pred_flat = test_pred.flatten()
    test_true_flat = test_true.flatten()
    test_mse = mean_squared_error(test_true_flat, test_pred_flat)
    test_mae = mean_absolute_error(test_true_flat, test_pred_flat)
    test_r2 = r2_score(test_true_flat, test_pred_flat)

    print(f"Test Loss: {average_loss:.4f}, MSE: {test_mse:.4f}, MAE: {test_mae:.4f}, R2: {test_r2:.4f}")

    return train_losses, val_losses, test_pred, test_true, test_mse, test_mae, test_r2



In [None]:
# One freshly initialised model per (target property, architecture) pair, all
# moved to `device`.
# NOTE(review): the models are built in a fixed order after set_seed(42), so
# reordering these lines presumably changes the initial weights each model
# receives -- keep the order if results must be reproduced.
# NOTE(review): the anion_ox / cation_red / IL_ECW models are not trained in the
# cells visible here; confirm they are used later in the notebook.
# GAT
# in_channels=5 is presumably the per-node feature width -- confirm against the dataset.
model_Tm_GAT = GAT(in_channels=5,hidden_channels=256).to(device)  
model_conductivity_GAT = GAT(in_channels=5,hidden_channels=256).to(device)  
model_anion_ox_GAT = GAT(in_channels=5,hidden_channels=256).to(device)  
model_cation_red_GAT = GAT(in_channels=5,hidden_channels=256).to(device)  
model_IL_ECW_GAT = GAT(in_channels=5,hidden_channels=256).to(device) 
# GCN
model_Tm_GCN = GCN(hidden_dim=128).to(device)
model_conductivity_GCN = GCN(hidden_dim=128).to(device) 
model_anion_ox_GCN = GCN(hidden_dim=128).to(device) 
model_cation_red_GCN = GCN(hidden_dim=128).to(device) 
model_IL_ECW_GCN = GCN(hidden_dim=128).to(device) 
# MLP
# input_size=2048 presumably matches the length of the morgan_fp vector -- confirm.
input_size = 2048
hidden_sizes = [256, 64]
output_size = 1
# `epochs` is assigned again (to the same value) at the top of each training cell.
epochs = 200
model_Tm_MLP = MLP(input_size=input_size, hidden_sizes=hidden_sizes, output_size=output_size).to(device) 
model_conductivity_MLP = MLP(input_size=input_size, hidden_sizes=hidden_sizes, output_size=output_size).to(device) 
model_anion_ox_MLP = MLP(input_size=input_size, hidden_sizes=hidden_sizes, output_size=output_size).to(device) 
model_cation_red_MLP = MLP(input_size=input_size, hidden_sizes=hidden_sizes, output_size=output_size).to(device) 
model_IL_ECW_MLP = MLP(input_size=input_size, hidden_sizes=hidden_sizes, output_size=output_size).to(device) 

# GAT Model

In [None]:
epochs = 200

# GAT on Tm: targets are left unscaled (train_epochs default scale=1).
(
    Tm_train_losses_gat,
    Tm_val_losses_gat,
    Tm_test_pred_gat,
    Tm_test_true_gat,
    Tm_test_mse_gat,
    Tm_test_mae_gat,
    Tm_test_r2_gat,
) = train_epochs(
    index=0,
    epochs=epochs,
    model=model_Tm_GAT,
    train_loader=Tm_train_loader,
    val_loader=Tm_val_loader,
    test_loader=Tm_test_loader,
    path="Tm_GAT_model.pt",
)

# GAT on conductivity: targets are multiplied by 10.
(
    conductivity_train_losses_gat,
    conductivity_val_losses_gat,
    conductivity_test_pred_gat,
    conductivity_test_true_gat,
    conductivity_test_mse_gat,
    conductivity_test_mae_gat,
    conductivity_test_r2_gat,
) = train_epochs(
    index=0,
    epochs=epochs,
    model=model_conductivity_GAT,
    train_loader=conductivity_train_loader,
    val_loader=conductivity_val_loader,
    test_loader=conductivity_test_loader,
    path="conductivity_GAT_model.pt",
    scale=10,
)

# GCN Model

In [None]:
epochs = 200

# GCN on Tm: targets are left unscaled (train_epochs default scale=1).
(
    Tm_train_losses_gcn,
    Tm_val_losses_gcn,
    Tm_test_pred_gcn,
    Tm_test_true_gcn,
    Tm_test_mse_gcn,
    Tm_test_mae_gcn,
    Tm_test_r2_gcn,
) = train_epochs(
    index=0,
    epochs=epochs,
    model=model_Tm_GCN,
    train_loader=Tm_train_loader,
    val_loader=Tm_val_loader,
    test_loader=Tm_test_loader,
    path="Tm_GCN_model.pt",
)

# GCN on conductivity: targets are multiplied by 10.
(
    conductivity_train_losses_gcn,
    conductivity_val_losses_gcn,
    conductivity_test_pred_gcn,
    conductivity_test_true_gcn,
    conductivity_test_mse_gcn,
    conductivity_test_mae_gcn,
    conductivity_test_r2_gcn,
) = train_epochs(
    index=0,
    epochs=epochs,
    model=model_conductivity_GCN,
    train_loader=conductivity_train_loader,
    val_loader=conductivity_val_loader,
    test_loader=conductivity_test_loader,
    path="conductivity_GCN_model.pt",
    scale=10,
)

# MLP Model

In [None]:
epochs = 200

# MLP on Tm. train_epochs_mlp defaults to scale=10, whereas the GAT and GCN Tm
# runs rely on train_epochs' default scale=1. Pass scale=1 explicitly so the Tm
# predictions, targets, MSE and MAE are on the same scale for all three models.
Tm_train_losses_mlp, Tm_val_losses_mlp, Tm_test_pred_mlp, Tm_test_true_mlp, Tm_test_mse_mlp, Tm_test_mae_mlp, Tm_test_r2_mlp = train_epochs_mlp(
    0, epochs, model_Tm_MLP, Tm_train_loader, Tm_val_loader, Tm_test_loader, "Tm_MLP_model.pt", scale=1, input_type="fp"
)

# MLP on conductivity: targets are multiplied by 10, as in the GAT and GCN runs.
conductivity_train_losses_mlp, conductivity_val_losses_mlp, conductivity_test_pred_mlp, conductivity_test_true_mlp, conductivity_test_mse_mlp, conductivity_test_mae_mlp, conductivity_test_r2_mlp = train_epochs_mlp(
    0, epochs, model_conductivity_MLP, conductivity_train_loader, conductivity_val_loader, conductivity_test_loader, "conductivity_MLP_model.pt", scale=10, input_type="fp"
)