In [None]:
import os  
import sys  
# Put every model directory at the front of sys.path, presumably so the
# project-local model modules imported further down can be resolved.
# Each insert goes to position 0, so the last directory listed ends up first.
_AI_PREDICTION_ROOT = "/home/jifengwang/data/25_polymer_database/AI_prediction"
for _model_dir in ("GAT", "GCN", "MLP", "PolyBERT", "Spherenet"):
    sys.path.insert(0, f"{_AI_PREDICTION_ROOT}/{_model_dir}")

In [None]:
import torch
import torch.nn.functional as F
from torch_geometric.nn import GATConv, global_mean_pool
from torch_geometric.loader import DataLoader  
from torch_geometric.nn import global_mean_pool, global_add_pool  
import matplotlib.pyplot as plt
from matplotlib.ticker import MaxNLocator
import numpy as np
from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score
from GATModel import GAT
from GCNModel import GCN
from MLPModel import MLP
from MLP_regression_head_model_without_QCdescriptor import MLPRegression
from sentence_transformers import SentenceTransformer
import pandas as pd
from Spherenet import SphereNet

In [None]:
# plt.style.use("~/style.mplstyle")

In [None]:

def set_seed(seed):
    """Seed the random number generators used in this notebook.

    Seeds Python's ``random`` module, NumPy's global generator and PyTorch
    (``torch.manual_seed`` and ``torch.cuda.manual_seed``), then sets the
    cuDNN ``deterministic`` flag and clears the ``benchmark`` flag.

    Args:
        seed: Value passed unchanged to every seeding call.
    """
    import random

    seeders = (
        random.seed,
        np.random.seed,
        torch.manual_seed,
        torch.cuda.manual_seed,
    )
    for apply_seed in seeders:
        apply_seed(seed)

    cudnn = torch.backends.cudnn
    cudnn.deterministic = True
    cudnn.benchmark = False

In [None]:
import torch
from torch_geometric.loader import DataLoader  
from torch_geometric.data import Batch

def load_data_list(file_path):
    """
    Load a saved data list from the given file path.

    Args:
        file_path (str): Path of the file that holds the data list.

    Returns:
        list: The loaded data list, exactly as returned by ``torch.load``.
    """
    # NOTE(review): torch.load unpickles the file, so only load trusted files.
    data_list = torch.load(file_path)
    return data_list

def data_loader(property, N=None, batch_size=32):
    """Build the train/validation/test loaders for one target property.

    The pre-split lists are read from ``experiment_dataset/`` when ``N`` is
    falsy (``None`` or ``0``), otherwise from ``experiment_dataset_N={N}/``.
    Each split is expected in ``{split}_{property}_data_list.pt``.

    The full, unsplit ``{property}_data_list.pt`` file is deliberately not
    read: nothing here uses it, so loading it only cost time and memory and
    made the call fail if that file was missing.

    Args:
        property (str): Property name used to build the dataset file names.
        N: Optional dataset variant; selects the ``experiment_dataset_N={N}``
            directory when truthy.
        batch_size (int): Batch size used by all three loaders (default 32).

    Returns:
        tuple: ``(train_loader, val_loader, test_loader)``. Only the training
        loader shuffles and drops the last incomplete batch.
    """
    dataset_dir = f'experiment_dataset_N={N}' if N else 'experiment_dataset'

    train_data_list = load_data_list(f'{dataset_dir}/train_{property}_data_list.pt')
    val_data_list = load_data_list(f'{dataset_dir}/val_{property}_data_list.pt')
    test_data_list = load_data_list(f'{dataset_dir}/test_{property}_data_list.pt')

    # drop_last=True means a training split smaller than batch_size yields an
    # empty loader.
    train_loader = DataLoader(train_data_list, batch_size=batch_size, shuffle=True, drop_last=True)
    val_loader = DataLoader(val_data_list, batch_size=batch_size, shuffle=False)
    test_loader = DataLoader(test_data_list, batch_size=batch_size, shuffle=False)

    return train_loader, val_loader, test_loader

In [None]:
import torch
import torch.nn.functional as F
import numpy as np
import math

# Use the GPU when PyTorch reports one as available, otherwise the CPU.
device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')


def training(loader, model, loss_fn, optimizer):
    """Run one optimisation pass over ``loader`` and return the mean batch loss.

    Args:
        loader: Iterable of batches exposing ``to(device)``, ``x`` and ``y``;
            it must also support ``len()``.
        model: Module called with the whole batch object.
        loss_fn: Callable taking ``(prediction, target)``; the target is
            ``batch.y`` reshaped to a single column.
        optimizer: Optimizer that is zeroed and stepped once per batch.

    Returns:
        float: Sum of the per-batch losses divided by ``len(loader)``.
    """
    model.train()
    batch_losses = []

    for batch in loader:
        batch = batch.to(device)
        optimizer.zero_grad()
        batch.x = batch.x.float()  # node features are cast to float32 before the forward pass
        prediction = model(batch)
        batch_loss = loss_fn(prediction, batch.y.view(-1, 1))
        batch_loss.backward()
        optimizer.step()
        batch_losses.append(batch_loss.item())

    return sum(batch_losses) / len(loader)

def validation(loader, model, loss_fn):
    """Evaluate ``model`` on ``loader`` without gradients and return the mean batch loss.

    Args:
        loader: Iterable of batches exposing ``to(device)``, ``x`` and ``y``;
            it must also support ``len()``.
        model: Module called with the whole batch object; put into eval mode.
        loss_fn: Callable taking ``(prediction, target)``; the target is
            ``batch.y`` reshaped to a single column.

    Returns:
        float: Sum of the per-batch losses divided by ``len(loader)``.
    """
    model.eval()

    with torch.no_grad():
        batch_losses = []
        for batch in loader:
            batch = batch.to(device)
            batch.x = batch.x.float()  # same float32 cast as in training
            batch_loss = loss_fn(model(batch), batch.y.view(-1, 1))
            batch_losses.append(batch_loss.item())

    return sum(batch_losses) / len(loader)

def train_epochs(epochs, model, train_loader, val_loader, test_loader, path):
    """Train ``model``, checkpoint the best validation epoch and score it on the test set.

    Training uses Adam (lr 5e-4, weight decay 5e-4) with an MSE loss. The
    state dict is saved to ``path`` whenever the validation loss improves.
    After 20 consecutive epochs without improvement the learning rate is
    divided by 10; once it is no longer above ``initial_lr / 1000`` training
    stops instead. The saved checkpoint is then reloaded and evaluated on
    ``test_loader``, and the metrics are printed.

    Args:
        epochs (int): Maximum number of epochs.
        model: Module to train; called with whole batch objects.
        train_loader: Loader passed to ``training``.
        val_loader: Loader passed to ``validation``.
        test_loader: Loader used for the final evaluation.
        path: Checkpoint location. When it is a filesystem path, its parent
            directory is created if missing.

    Returns:
        tuple: ``(train_losses, val_losses, test_pred, test_true, test_mse,
        test_mae, test_r2)``. Both loss arrays have length ``epochs``;
        entries after an early stop remain zero.
    """
    initial_lr = 5e-4
    max_patience = 20

    # torch.save does not create missing parent directories; without this a
    # fresh checkout fails on the first checkpoint write.
    if isinstance(path, (str, os.PathLike)):
        checkpoint_dir = os.path.dirname(path)
        if checkpoint_dir:
            os.makedirs(checkpoint_dir, exist_ok=True)

    optimizer = torch.optim.Adam(model.parameters(), initial_lr, weight_decay=5e-4)
    loss_fn = torch.nn.MSELoss()

    train_losses = np.zeros(epochs)
    val_losses = np.zeros(epochs)
    best_loss = float('inf')

    patience_counter = 0
    current_lr = initial_lr

    for epoch in range(epochs):
        train_loss = training(train_loader, model, loss_fn, optimizer)
        val_loss = validation(val_loader, model, loss_fn)

        train_losses[epoch] = train_loss
        val_losses[epoch] = val_loss

        if val_loss < best_loss:
            # New best validation loss: checkpoint and reset the patience window.
            best_loss = val_loss
            torch.save(model.state_dict(), path)
            patience_counter = 0
        else:
            patience_counter += 1
            if patience_counter >= max_patience:
                if current_lr > initial_lr / 1000:
                    # Plateau: decay the learning rate tenfold and keep going.
                    current_lr /= 10
                    for param_group in optimizer.param_groups:
                        param_group['lr'] = current_lr
                    patience_counter = 0
                else:
                    # Learning rate is already at its floor: stop early.
                    break

    # map_location keeps the reload working whatever device the checkpoint was written from.
    model.load_state_dict(torch.load(path, map_location=device))
    model.to(device)
    model.eval()

    total_loss = 0
    test_pred = []
    test_true = []

    # Evaluation needs no gradients; no_grad avoids building autograd graphs.
    with torch.no_grad():
        for data in test_loader:
            data = data.to(device)
            data.x = data.x.float()
            out = model(data)
            loss = loss_fn(out, data.y.view(-1, 1))
            total_loss += loss.item()
            test_pred.append(out.detach().cpu().numpy())
            test_true.append(data.y.cpu().numpy())

    test_pred = np.concatenate(test_pred, axis=0)
    test_true = np.concatenate(test_true, axis=0)
    # Mean of per-batch losses; test_mse below is computed over all samples.
    average_loss = total_loss / len(test_loader)

    test_pred_flat = test_pred.flatten()
    test_true_flat = test_true.flatten()
    test_mse = mean_squared_error(test_true_flat, test_pred_flat)
    test_mae = mean_absolute_error(test_true_flat, test_pred_flat)
    test_r2 = r2_score(test_true_flat, test_pred_flat)

    print(f"Test Loss: {average_loss:.4f}, MSE: {test_mse:.4f}, MAE: {test_mae:.4f}, R2: {test_r2:.4f}")

    return train_losses, val_losses, test_pred, test_true, test_mse, test_mae, test_r2



In [None]:
import torch
import torch.nn.functional as F
import numpy as np
import math

# Use the GPU when PyTorch reports one as available, otherwise the CPU.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# Directory of the PolyBERT model files on this machine. The encoder is built
# once at module level, with local_files_only=True and on the device chosen
# above, and is reused by the PolyBERT training/validation functions below.
local_model_path = "/home/jifengwang/data/25_polymer_database/AI_prediction/PolyBERT/PolyBERT"
polyBERT = SentenceTransformer(local_model_path, local_files_only=True, device=device)

def _embed_batch(data, model_type):
    """Return the float32 input tensor for one batch, placed on ``device``.

    ``model_type`` is matched case-insensitively, so both ``"PolyBERT"`` and
    the ``"polyBERT"`` spelling used by ``batch_train_model`` select the
    PolyBERT branch.

    Raises:
        ValueError: If ``model_type`` is neither PolyBERT nor unimol2.
    """
    kind = str(model_type).lower()
    if kind == "polybert":
        # Encode the batch's pSMILES strings with the module-level PolyBERT encoder.
        embeddings = polyBERT.encode(data.psmiles)
        return torch.tensor(embeddings, dtype=torch.float32).to(device)
    if kind == "unimol2":
        # Use the embeddings already stored on the batch as ``unimol``.
        return torch.tensor(data.unimol, dtype=torch.float32).to(device)
    raise ValueError(f"Unknown model type: {model_type}")


def training_PolyBERT_unimol2(loader, model, loss_fn, optimizer, model_type):
    """Run one optimisation pass of the regression head and return the mean batch loss.

    Args:
        loader: Iterable of batches exposing ``to(device)`` and ``y``, plus
            ``psmiles`` (PolyBERT) or ``unimol`` (unimol2); must support ``len()``.
        model: Module called with the embedding tensor.
        loss_fn: Callable taking ``(prediction, target)``; the target is
            ``batch.y`` reshaped to a single column.
        optimizer: Optimizer that is zeroed and stepped once per batch.
        model_type (str): ``"PolyBERT"`` or ``"unimol2"`` (case-insensitive).

    Returns:
        float: Sum of the per-batch losses divided by ``len(loader)``.

    Raises:
        ValueError: If ``model_type`` is not recognised.
    """
    model.train()
    total_loss = 0
    for data in loader:
        data = data.to(device)
        optimizer.zero_grad()
        out = model(_embed_batch(data, model_type))
        loss = loss_fn(out, data.y.view(-1, 1))
        loss.backward()
        optimizer.step()
        total_loss += loss.item()
    return total_loss / len(loader)


def validation_PolyBERT_unimol2(loader, model, loss_fn, model_type):
    """Evaluate the regression head on ``loader`` without gradients.

    Args:
        loader: Iterable of batches, as for ``training_PolyBERT_unimol2``.
        model: Module called with the embedding tensor; put into eval mode.
        loss_fn: Callable taking ``(prediction, target)``.
        model_type (str): ``"PolyBERT"`` or ``"unimol2"`` (case-insensitive).

    Returns:
        float: Sum of the per-batch losses divided by ``len(loader)``.

    Raises:
        ValueError: If ``model_type`` is not recognised.
    """
    model.eval()
    total_loss = 0
    with torch.no_grad():
        for data in loader:
            data = data.to(device)
            out = model(_embed_batch(data, model_type))
            loss = loss_fn(out, data.y.view(-1, 1))
            total_loss += loss.item()
    return total_loss / len(loader)


def train_epochs_PolyBERT_unimol2(epochs, model, train_loader, val_loader, test_loader, path, model_type):
    """Train an embedding-based regression head, checkpoint the best epoch and test it.

    Training uses Adam (lr 5e-4, weight decay 5e-4) with an MSE loss. The
    state dict is saved to ``path`` whenever the validation loss improves.
    After 20 consecutive epochs without improvement the learning rate is
    divided by 10; once it is no longer above ``initial_lr / 1000`` training
    stops instead. The saved checkpoint is then reloaded and evaluated on
    ``test_loader``, and the metrics are printed.

    Args:
        epochs (int): Maximum number of epochs.
        model: Regression head called with the embedding tensor.
        train_loader: Loader for training batches.
        val_loader: Loader for validation batches.
        test_loader: Loader used for the final evaluation.
        path: Checkpoint location. When it is a filesystem path, its parent
            directory is created if missing.
        model_type (str): ``"PolyBERT"`` or ``"unimol2"`` (case-insensitive).

    Returns:
        tuple: ``(train_losses, val_losses, test_pred, test_true, test_mse,
        test_mae, test_r2)``. Both loss arrays have length ``epochs``;
        entries after an early stop remain zero.

    Raises:
        ValueError: If ``model_type`` is not recognised.
    """
    initial_lr = 5e-4
    max_patience = 20

    # torch.save does not create missing parent directories; without this a
    # fresh checkout fails on the first checkpoint write.
    if isinstance(path, (str, os.PathLike)):
        checkpoint_dir = os.path.dirname(path)
        if checkpoint_dir:
            os.makedirs(checkpoint_dir, exist_ok=True)

    optimizer = torch.optim.Adam(model.parameters(), lr=initial_lr, weight_decay=5e-4)
    loss_fn = torch.nn.MSELoss()

    train_losses = np.zeros(epochs)
    val_losses = np.zeros(epochs)
    best_loss = float('inf')

    patience_counter = 0
    current_lr = initial_lr

    for epoch in range(epochs):
        train_loss = training_PolyBERT_unimol2(train_loader, model, loss_fn, optimizer, model_type)
        val_loss = validation_PolyBERT_unimol2(val_loader, model, loss_fn, model_type)

        train_losses[epoch] = train_loss
        val_losses[epoch] = val_loss

        if val_loss < best_loss:
            # New best validation loss: checkpoint and reset the patience window.
            best_loss = val_loss
            torch.save(model.state_dict(), path)
            patience_counter = 0
        else:
            patience_counter += 1
            if patience_counter >= max_patience:
                if current_lr > initial_lr / 1000:
                    # Plateau: decay the learning rate tenfold and keep going.
                    current_lr /= 10
                    for param_group in optimizer.param_groups:
                        param_group['lr'] = current_lr
                    patience_counter = 0
                else:
                    # Learning rate is already at its floor: stop early.
                    break

    # map_location keeps the reload working whatever device the checkpoint was written from.
    model.load_state_dict(torch.load(path, map_location=device))
    model.to(device)
    model.eval()

    total_loss = 0
    test_pred = []
    test_true = []

    # Evaluate on the test set; no gradients are needed here.
    with torch.no_grad():
        for data in test_loader:
            data = data.to(device)
            out = model(_embed_batch(data, model_type))
            loss = loss_fn(out, data.y.view(-1, 1))
            total_loss += loss.item()
            test_pred.append(out.detach().cpu().numpy())
            test_true.append(data.y.cpu().numpy())

    test_pred = np.concatenate(test_pred, axis=0)
    test_true = np.concatenate(test_true, axis=0)
    # Mean of per-batch losses; test_mse below is computed over all samples.
    average_loss = total_loss / len(test_loader)

    test_pred_flat = test_pred.flatten()
    test_true_flat = test_true.flatten()
    test_mse = mean_squared_error(test_true_flat, test_pred_flat)
    test_mae = mean_absolute_error(test_true_flat, test_pred_flat)
    test_r2 = r2_score(test_true_flat, test_pred_flat)

    print(f"Test Loss: {average_loss:.4f}, MSE: {test_mse:.4f}, MAE: {test_mae:.4f}, R2: {test_r2:.4f}")

    return train_losses, val_losses, test_pred, test_true, test_mse, test_mae, test_r2

In [None]:
def batch_train_model(
    target_property_col_list,
    model_type_list,
    N=None,
    epochs=500,
    seed=42
) -> pd.DataFrame:
    """Train every requested model on every requested property and collect test metrics.

    A fresh model is built for each (model type, property) pair and trained
    with ``train_epochs`` or, for the embedding-based models, with
    ``train_epochs_PolyBERT_unimol2``. Checkpoints go to
    ``model/{prop}_{model_type}.pt``, or ``model_N={N}/...`` when ``N`` is truthy.

    Training is best-effort: if a run raises, the error is printed and that
    row's metrics are recorded as NaN so the remaining runs still execute.

    Args:
        target_property_col_list: Property names; each is passed to ``data_loader``.
        model_type_list: Any of ``"GAT"``, ``"GCN"``, ``"MLP"``, ``"polyBERT"``
            (``"PolyBERT"`` is accepted too), ``"unimol2"``, ``"Spherenet"``.
        N: Optional dataset variant forwarded to ``data_loader``.
        epochs (int): Maximum epochs per run.
        seed (int): Seed applied once, before the first run.

    Returns:
        pd.DataFrame: One row per run with columns ``property``,
        ``model_type``, ``test_mse``, ``test_mae`` and ``test_r2``.

    Raises:
        ValueError: If a model type is not recognised.
    """
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    set_seed(seed)

    # train_epochs_PolyBERT_unimol2 recognises "PolyBERT" (capital P), while
    # this notebook lists the model as "polyBERT". Passing the notebook
    # spelling through unchanged made every PolyBERT run fail and report NaN.
    embedding_trainer_names = {
        "polyBERT": "PolyBERT",
        "PolyBERT": "PolyBERT",
        "unimol2": "unimol2",
    }
    model_dir = f"model_N={N}" if N else "model"

    results = []
    for model_type in model_type_list:
        uses_embeddings = model_type in embedding_trainer_names

        for prop in target_property_col_list:
            print(f"Training {model_type} on property {prop}")

            if model_type == "GAT":
                model = GAT(in_channels=5, hidden_channels=256).to(device)
            elif model_type == "GCN":
                model = GCN(hidden_dim=128).to(device)
            elif model_type == "MLP":
                model = MLP(input_size=2048, hidden_sizes=[256,64], output_size=1).to(device)
            elif uses_embeddings:
                model = MLPRegression().to(device)
            elif model_type == "Spherenet":
                model = SphereNet(use_extra_node_feature=False).to(device)
            else:
                raise ValueError(f"Unknown model type: {model_type}")

            train_loader, val_loader, test_loader = data_loader(prop, N)

            # The checkpoint name keeps the caller's spelling of model_type.
            model_path = f"{model_dir}/{prop}_{model_type}.pt"

            try:
                if uses_embeddings:
                    _, _, _, _, mse, mae, r2 = train_epochs_PolyBERT_unimol2(
                        epochs, model,
                        train_loader, val_loader, test_loader,
                        model_path, embedding_trainer_names[model_type]
                    )
                else:
                    _, _, _, _, mse, mae, r2 = train_epochs(
                        epochs, model,
                        train_loader, val_loader, test_loader,
                        model_path
                    )
            except Exception as e:
                # Deliberate best-effort: report the failure and continue the sweep.
                print(f"⚠️  {model_type} on {prop} failed: {e}")
                mse = mae = r2 = np.nan

            results.append({
                "property": prop,
                "model_type": model_type,
                "test_mse": mse,
                "test_mae": mae,
                "test_r2": r2
            })

    return pd.DataFrame(results)

In [None]:

# Property names to benchmark. batch_train_model passes each one to
# data_loader, which interpolates it into the dataset file names it reads.
target_property_col_list = ['Bandgap', 'CO2_Permeability',
       'Compressive_Strength', 'Crystallization_Temperature',
       'Elongation_at_Break', 'Flexural_Strength', 'Tg', 'H2_Permeability',
       'Hardness', 'Impact_Strength', 'Ion_Exchange_Capacity',
       'Limiting_Oxygen_Index', 'Lower_Critical_Solution_Temperature', 'Tm',
       'Methanol_Permeability', 'O2_Permeability', 'Refractive_Index',
       'Swelling_Degree', 'Thermal_Conductivity', 'Tensile_Strength', 'Td',
       'Upper_Critical_Solution_Temperature', 'Water_Contact_Angle',
       'Water_Uptake', 'Youngs_Modulus', 'Dielectric_Constant_Total'
       ]
# Bare string literal used as a block comment: a single-property list kept
# for reference. It is never assigned, so the full list above stays in effect.
'''
target_property_col_list = ['Tg']
'''
# Every model name batch_train_model recognises, kept for reference.
# model_type_list = ["GCN", "GAT", "MLP", "polyBERT", "Spherenet", "unimol2"]
# Models trained by the cells below that read this global.
model_type_list = ["Spherenet"]

In [None]:
#df_test_performance_summary = batch_train_model(target_property_col_list, model_type_list, epochs=500, seed=42)
#df_test_performance_summary.to_csv("deep_learning_model_test_performance_summary.csv", index=None)

In [None]:
# Train the models in model_type_list on the N=2 and N=4 dataset variants and
# write one CSV of test metrics per variant.
# NOTE(review): a later cell repeats these N=2 / N=4 runs and writes them to
# "spherenet_model_test_performance_summary_N=*.csv" — confirm both are needed.
df_test_performance_summary_N2 = batch_train_model(target_property_col_list, model_type_list, N=2, epochs=500, seed=42)
df_test_performance_summary_N2.to_csv("spherenet_deep_learning_model_test_performance_summary_N=2.csv", index=None)
df_test_performance_summary_N4 = batch_train_model(target_property_col_list, model_type_list, N=4, epochs=500, seed=42)
df_test_performance_summary_N4.to_csv("spherenet_deep_learning_model_test_performance_summary_N=4.csv", index=None)

In [None]:
# These CSVs are labelled "unimol2", so the model list is pinned here instead
# of being read from the mutable global model_type_list. With that global set
# to ["Spherenet"], a Restart & Run All would otherwise store Spherenet
# metrics under unimol2 file names.
# NOTE(review): pinning to "unimol2" is inferred from the file names — confirm.
unimol2_model_type_list = ["unimol2"]

# Default dataset first, then the N=2 and N=4 variants; one CSV per run.
df_test_performance_summary_N1 = batch_train_model(target_property_col_list, unimol2_model_type_list, epochs=500, seed=42)
df_test_performance_summary_N1.to_csv("unimol2_model_test_performance_summary.csv", index=None)
df_test_performance_summary_N2 = batch_train_model(target_property_col_list, unimol2_model_type_list, N=2, epochs=500, seed=42)
df_test_performance_summary_N2.to_csv("unimol2_model_test_performance_summary_N=2.csv", index=None)
df_test_performance_summary_N4 = batch_train_model(target_property_col_list, unimol2_model_type_list, N=4, epochs=500, seed=42)
df_test_performance_summary_N4.to_csv("unimol2_model_test_performance_summary_N=4.csv", index=None)

In [None]:
# Train the models in model_type_list on the default dataset and on the N=2
# and N=4 variants, writing one CSV of test metrics per run.
# NOTE(review): the file names say "spherenet", but the models actually trained
# are whatever model_type_list holds when this cell runs.
df_test_performance_summary_N1 = batch_train_model(target_property_col_list, model_type_list, epochs=500, seed=42)
df_test_performance_summary_N1.to_csv("spherenet_model_test_performance_summary.csv", index=None) # spherenet_
df_test_performance_summary_N2 = batch_train_model(target_property_col_list, model_type_list, N=2, epochs=500, seed=42)
df_test_performance_summary_N2.to_csv("spherenet_model_test_performance_summary_N=2.csv", index=None)
df_test_performance_summary_N4 = batch_train_model(target_property_col_list, model_type_list, N=4, epochs=500, seed=42)
df_test_performance_summary_N4.to_csv("spherenet_model_test_performance_summary_N=4.csv", index=None)