Import Packages

In [None]:
import numpy as np 
import os 
import sys
import pandas as pd
import pickle
from skimage.measure import regionprops
import torch
import torch.nn as nn
import torch.nn.functional as F
import dgl
from dgl.nn import GraphConv
from torch.optim import Adam
import optuna
import torch.optim as optim
from torch.optim.lr_scheduler import ReduceLROnPlateau
from sklearn.model_selection import KFold
from sklearn.metrics import mean_squared_error
from sklearn.metrics import r2_score
import networkx as nx
from collections import defaultdict
from scipy.stats import gaussian_kde

Set Random Seeds

In [None]:
torch.manual_seed(1)
np.random.seed(1) 

Import Pore Graphs

In [None]:
RAG_3D=pickle.load(open(('Pore_Graph_for_H2.pkl'), 'rb'))

In [None]:
#Transfer 3D geometric pore graph (RAG_3D) to the reduced topological pore graph(RAG_2D)
RAG_2D=[]
for g in RAG_3D:
        nodes_2d=[]
        features_2d=[]
        cell_volume=[]
        radius=[]
        max_pbc_group = max(data["pbc_group"] for _, data in g.nodes(data=True))
        rag_2d=nx.MultiGraph()
        for node in g.nodes:
            node_2d=g.nodes[node]['pbc_group']

            if node_2d not in nodes_2d:
                nodes_2d.append(node_2d)
                rag_2d.add_nodes_from(nodes_2d)
                features_2d.append(g.nodes[node]['vdw_hist_y_pbc'])
                cell_volume.append(g.nodes[node]['cell_volume'])
                radius.append(g.nodes[node]['maxima_radii'])
                node_labels= {i: {'labels': attr} for i, attr in enumerate(nodes_2d)}
                node_eh= {i: {'eh': attr} for i, attr in enumerate(features_2d)}
                node_cell_volume= {i: {'cell_volume': attr} for i, attr in enumerate(cell_volume)}
                nx.set_node_attributes(rag_2d, node_cell_volume)
                nx.set_node_attributes(rag_2d, node_labels)
                nx.set_node_attributes(rag_2d, node_eh)

            else:
                if g.nodes[node]['maxima_radii']>radius[node_2d]:
                    radius[node_2d]=g.nodes[node]['maxima_radii']
                    
            node_radius= {i: {'radius': attr} for i, attr in enumerate(radius)}
            nx.set_node_attributes(rag_2d, node_radius)

        pixel_ratios_2d=[0]*(max_pbc_group+1) 
        for node in g.nodes:
            node_2d=g.nodes[node]['pbc_group']
            pixel_ratio=g.nodes[node]['pixel_ratio']
            pixel_ratios_2d[(node_2d)]=pixel_ratios_2d[(node_2d)]+pixel_ratio
            
        node_pixel_ratio= {i: {'pixel_ratio': attr} for i, attr in enumerate(pixel_ratios_2d)}
        nx.set_node_attributes(rag_2d, node_pixel_ratio)
            
        RAG_2D.append(rag_2d)     

In [None]:
#Devide the pore graphs into training, validation, and testing dataset
RAG_Training=RAG_2D[0:900]
RAG_Validation=RAG_2D[900:990]
RAG_Testing=RAG_2D[990:1990] 
print([len(RAG_Training),len(RAG_Validation),len(RAG_Testing)])

Process Graphs and Extract Features

In [None]:
# Convert graphs to DGL graphs and set node features
def Process_Graphs_Labels(RAGs):
  DGL_Graphs = []
  for g in RAGs:
    if g is not None:
        DGL_g = dgl.from_networkx(g)
        DGL_g = dgl.add_self_loop(DGL_g)  # Add self-loops
        DGL_g.ndata['feat'] = torch.tensor([data['eh'] for _, data in g.nodes(data=True)])
        DGL_g.ndata['maxima_radii'] = torch.tensor([data['radius'] for _, data in g.nodes(data=True)])
        DGL_g.ndata['cell_volume'] = torch.tensor([data['cell_volume'] for _, data in g.nodes(data=True)])
        DGL_g.ndata['pixel_ratio'] = torch.tensor([data['pixel_ratio'] for _, data in g.nodes(data=True)])
        DGL_Graphs.append(DGL_g)
  return DGL_Graphs

# Get DGL graphs and Labels
DGL_Graphs_Training=Process_Graphs_Labels(RAG_Training)
DGL_Graphs_Validation=Process_Graphs_Labels(RAG_Validation)
DGL_Graphs_Testing=Process_Graphs_Labels(RAG_Testing)

# Convert list of DGL graphs into a single batched graph
Batched_Graph_Training = dgl.batch(DGL_Graphs_Training)
Batched_Graph_Validation=dgl.batch(DGL_Graphs_Validation)
Batched_Graph_Testing = dgl.batch(DGL_Graphs_Testing)

# Extract node features for all graphs
Features_Training = Batched_Graph_Training.ndata['feat'].float()
Features_Validation = Batched_Graph_Validation.ndata['feat'].float()
Features_Testing = Batched_Graph_Testing.ndata['feat'].float()

Grab Cell Volumes, Pore Volume, and Pore Pixel Ratio

In [None]:
def Grab_Cell_Volume(RAGs):
    Cell_Volume=[]
    for i,g in enumerate(RAGs):
        vol=g.nodes[0]['cell_volume']
        Cell_Volume.append(vol)
    return torch.tensor(Cell_Volume).unsqueeze(1)

Cell_Volume_Training=Grab_Cell_Volume(RAG_Training)
Cell_Volume_Validation=Grab_Cell_Volume(RAG_Validation) 
Cell_Volume_Testing=Grab_Cell_Volume(RAG_Testing)

In [None]:
def Grab_Pore_Volume(RAGs):
    Pore_Volume=[]
    for i,g in enumerate(RAGs):
        for x in g.nodes:
            cell_vol=g.nodes[x]['cell_volume']
            pore_ratio=g.nodes[x]['pixel_ratio']
            pore_vol=cell_vol*pore_ratio
            Pore_Volume.append(pore_vol)
    return torch.tensor(Pore_Volume).unsqueeze(1)

Pore_Volume_Training=Grab_Pore_Volume(RAG_Training)
Pore_Volume_Validation=Grab_Pore_Volume(RAG_Validation)
Pore_Volume_Testing=Grab_Pore_Volume(RAG_Testing)

In [None]:
def Extract_Pixel_Ratio(RAGs):
 pixel_ratio_list=[]
 for i, G in enumerate(RAGs):
    for n in sorted(G.nodes):
        pixel_ratio_list.append(G.nodes[n]['pixel_ratio'])
 return pixel_ratio_list 

Pixel_Ratio_List_Training=torch.tensor(Extract_Pixel_Ratio(RAG_Training)).float().view(-1,1)
Pixel_Ratio_List_Validation=torch.tensor(Extract_Pixel_Ratio(RAG_Validation)).float().view(-1,1)
Pixel_Ratio_List_Testing=torch.tensor(Extract_Pixel_Ratio(RAG_Testing)).float().view(-1,1)

Import MOF-level and Pore-level Labels

In [None]:
Data_all=pd.read_excel('.../PoroNet/GCMC_output/160K_5bar-mof-gL-0609.xlsx').to_numpy()[:,1].reshape(-1,1)

Labels_Training_mof=Data_all[0:900]
Labels_Validation_mof=Data_all[900:990]
Labels_Testing_mof=Data_all[990:1990]

Labels_Training_mof = np.array([[float(x[0])] for x in Labels_Training_mof])
Labels_Validation_mof = np.array([[float(x[0])] for x in Labels_Validation_mof])
Labels_Testing_mof = np.array([[float(x[0])] for x in Labels_Testing_mof])

Labels_Training_mof=torch.tensor(Labels_Training_mof)
Labels_Validation_mof=torch.tensor(Labels_Validation_mof)
Labels_Testing_mof=torch.tensor(Labels_Testing_mof)

In [None]:
#Check the number of pores in the training, validation, and testing datasets
Batched_Graph_Training,Batched_Graph_Validation, Batched_Graph_Testing

In [None]:
Data_pore_all=pd.read_excel('.../PoroNet/GCMC_output/160K_5bar-pore-molecule-0609.xlsx').to_numpy()
Data_Training_pore_molecule=Data_pore_all[0:6078]
Data_Validation_pore_molecule=Data_pore_all[6078:6563]
Data_Testing_pore_molecule=Data_pore_all[6563:13343]

In [None]:
def Process_Labels(Data):
  Labels = []
  for data in Data:
        Labels.append(data)
  return Labels 

Labels_Training_pore_molecule=Process_Labels(Data_Training_pore_molecule)
Labels_Validation_pore_molecule=Process_Labels(Data_Validation_pore_molecule)
Labels_Testing_pore_molecule=Process_Labels(Data_Testing_pore_molecule)

Labels_Training_pore_molecule = torch.tensor(Labels_Training_pore_molecule).float()
Labels_Validation_pore_molecule = torch.tensor(Labels_Validation_pore_molecule).float()
Labels_Testing_pore_molecule=torch.tensor(Labels_Testing_pore_molecule).float()

Labels_Training_pore_gL=((Labels_Training_pore_molecule/(6.022E23))*2.01588)/Pore_Volume_Training
Labels_Validation_pore_gL=((Labels_Validation_pore_molecule/(6.022E23))*2.01588)/Pore_Volume_Validation
Labels_Testing_pore_gL=((Labels_Testing_pore_molecule/(6.022E23))*2.01588)/Pore_Volume_Testing


Define PoroNet

In [None]:
class PoroNet(nn.Module):
    def __init__(self,input_size,hidden_sizes,hidden_activations,dropout):
        super(PoroNet,self).__init__()
        layers=[]
        in_feat=input_size
         
        for i,hidden_size in enumerate(hidden_sizes):
            layers.append(nn.Linear(in_feat,hidden_size))
            hidden_activation=hidden_activations[i]
            if hidden_activation=='relu':
                layers.append(nn.ReLU())
            in_feat=hidden_size

        layers.append(nn.Dropout(p=dropout))
        layers.append(nn.Linear(in_feat,1))
        self.model = nn.Sequential(*layers)
         
         
    def forward(self,g,input_feature):
        x=self.model(input_feature)
        return x

Determine the Hyperparameters

In [None]:
#5 fold
cv=5

#maximum of training epoch
num_epoch=5000

In [None]:
#Determine the pore index of each MOF in the training set
rag_label_dict = defaultdict(list)
label_index=0
for i, rag in enumerate(RAG_Training):
    node_amounts=rag.number_of_nodes()
    label = Labels_Training_pore_gL[label_index:label_index+node_amounts] 
    label_index += node_amounts
    rag_label_dict[i].append(label)

In [None]:
def objective(trial):
    # Suggest values for the hyperparameters
    num_patience=trial.suggest_int('num_patience', 10, 2000)
    num_layers=trial.suggest_int('num_layers', 1, 3)
    hidden_sizes = [trial.suggest_int(f'hidden_size{i}', 16, 128) for i in range(num_layers)]
    hidden_activations=[trial.suggest_categorical(f'hidden_activation{i}',['relu','none'])for i in range(num_layers)]
    dropout = trial.suggest_uniform('dropout', 0.2, 0.4)
    initial_lr = trial.suggest_loguniform('lr', 1e-5, 1e-1)
    weight_decay = trial.suggest_loguniform('weight_decay', 1e-9, 1e-3)
    factor=trial.suggest_uniform('factor',0.1, 0.9)
    

    # K-Fold CV
    kf = KFold(n_splits=cv, shuffle=False)
    mae_list = []
    for fold, (training_index, validation_index) in enumerate(kf.split(range(len(RAG_Training)))):
    
     # Extract pore graphs and labels according to index
     RAG_training = [RAG_Training[i] for i in training_index]
     RAG_validation = [RAG_Training[i] for i in validation_index]
     Labels_training = [rag_label_dict[i] for i in training_index]
     Labels_validation = [rag_label_dict[i] for i in validation_index]
     
     Labels_training_mof=Labels_Training_mof[training_index]
     Labels_validation_mof=Labels_Training_mof[validation_index]
     
     # Obtain the dgl graphs
     DGL_Graphs_training = Process_Graphs_Labels(RAG_training)
     DGL_Graphs_validation = Process_Graphs_Labels(RAG_validation)
     
     #Revise the format of training labels
     squeezed_tensors_training = []
     for sublist in Labels_training:
       for tensor in sublist:
        squeezed_tensor = tensor.squeeze()  
        if squeezed_tensor.ndimension() == 0: 
            squeezed_tensor = squeezed_tensor.unsqueeze(0).unsqueeze(1)
        elif squeezed_tensor.ndimension() == 1: 
            squeezed_tensor = squeezed_tensor.unsqueeze(1)
        squeezed_tensors_training.append(squeezed_tensor)

     combined_tensor_training = torch.cat(squeezed_tensors_training, dim=0)
     Labels_training = combined_tensor_training

     #Revise the format of validation labels
     squeezed_tensors_validation = []
     for sublist in Labels_validation:
       for tensor in sublist:
        squeezed_tensor = tensor.squeeze()  
        if squeezed_tensor.ndimension() == 0:  
            squeezed_tensor = squeezed_tensor.unsqueeze(0).unsqueeze(1)
        elif squeezed_tensor.ndimension() == 1: 
            squeezed_tensor = squeezed_tensor.unsqueeze(1)
        squeezed_tensors_validation.append(squeezed_tensor)

     combined_tensor_validation = torch.cat(squeezed_tensors_validation, dim=0)
     Labels_validation = combined_tensor_validation

     # Convert list of DGL graphs into a single batched graph
     Batched_Graph_training = dgl.batch(DGL_Graphs_training)
     Batched_Graph_validation=dgl.batch(DGL_Graphs_validation)

     # Extract node features for all graphs
     Features_training = Batched_Graph_training.ndata['feat'].float()
     Features_validation = Batched_Graph_validation.ndata['feat'].float()
     
     Pixel_Ratio_List_training=torch.tensor(Extract_Pixel_Ratio(RAG_training)).float().view(-1,1)
     Pixel_Ratio_List_validation=torch.tensor(Extract_Pixel_Ratio(RAG_validation)).float().view(-1,1)
     
     Cell_Volume_training=Grab_Cell_Volume(RAG_training)
     Cell_Volume_validation=Grab_Cell_Volume(RAG_validation) 

     # Initialize the model and optimizer
     model = PoroNet(Features_training.size(1),hidden_sizes,hidden_activations,dropout)
     optimizer = torch.optim.Adam(model.parameters(), lr=initial_lr, weight_decay=weight_decay)
     scheduler = ReduceLROnPlateau(optimizer, mode='min', factor=factor, patience=0.9* num_patience, verbose=True)
        
     # Initialize early stopping parameters
     best_val_loss = float('inf') 
     stop_patience = num_patience  
     counter = 0  
     early_stopped = False  
     best_epoch_loss = None      
     
     # Training loop
     for epoch in range(num_epoch):
         model.train()
         Embeddings_training_1 = model(Batched_Graph_training,Features_training)  # Forward propagation, get embeddings
         Embeddings_training_2 = Embeddings_training_1 * Pixel_Ratio_List_training
         Batched_Graph_training.ndata['h'] = Embeddings_training_2  # Assign embeddings to 'h' field
         Logits_training = dgl.sum_nodes(Batched_Graph_training, 'h')  # Sum the embeddings across nodes
         Loss_training_pore = F.l1_loss(Embeddings_training_1, Labels_training)  # Compute loss
         Loss_training_mof = F.l1_loss(Logits_training, Labels_training_mof)  # Compute loss
         Loss_training=(100*Loss_training_pore)+Loss_training_mof
         optimizer.zero_grad()
         Loss_training.backward()
         optimizer.step()
        
         # Set model to evaluation mode
         model.eval()  
         with torch.no_grad():
          Embeddings_validation_1 = model(Batched_Graph_validation,Features_validation) 
          Embeddings_validation_2 = Embeddings_validation_1 * Pixel_Ratio_List_validation
          Batched_Graph_validation.ndata['h'] = Embeddings_validation_2
          Logits_validation = dgl.sum_nodes(Batched_Graph_validation, 'h')  
          Loss_validation_pore = F.l1_loss(Embeddings_validation_1, Labels_validation)  
          Loss_validation_mof = F.l1_loss(Logits_validation, Labels_validation_mof) 
          Loss_validation=(100*Loss_validation_pore)+Loss_validation_mof
    
          scheduler.step(Loss_validation)
             
         # Early stopping
         if Loss_validation < best_val_loss:
          best_val_loss = Loss_validation
          best_epoch_loss = Loss_validation.item()
          counter = 0
         else:
            counter += 1
            
         if counter >= stop_patience:
            print(f'Early stopping at epoch {epoch+1}')
            early_stopped = True
            mae_list.append(best_epoch_loss)
            break
        
     if not early_stopped:  # If not early stopped, record the loss of the last epoch
            mae_list.append(Loss_validation.item())

    mae_average = np.mean(mae_list) 
    

    if trial.should_prune():
     raise optuna.exceptions.TrialPruned()
    mae_average=np.mean(mae_list)
    trial.set_user_attr('mae_list', mae_list)

    return mae_average

In [None]:
sampler = optuna.samplers.TPESampler(seed=1)
study = optuna.create_study(sampler=sampler,direction='minimize', pruner=optuna.pruners.HyperbandPruner())
study.optimize(objective, n_trials=100)
print('Best trial:')
trial = study.best_trial
print('  Trial Number: ', trial.number)
print('  Value: ', trial.value)
print('  Params: ')
for key, value in trial.params.items():
    print('    {}: {}'.format(key, value)) 
print('  Fold MAEs: ', trial.user_attrs['mae_list'])

Train the Model

In [None]:
#Determined optimal hyperparameters
num_patience=1070
model = PoroNet(Features_Testing.size(1),hidden_sizes=[19,123],hidden_activations=['none','relu'],dropout=0.33571087092969193)
optimizer = torch.optim.Adam(model.parameters(), lr=0.05905695348998198, weight_decay=3.3558614446106994e-08)
scheduler = ReduceLROnPlateau(optimizer, mode='min', factor=0.79717142906145, patience=0.9*num_patience, verbose=True)

In [None]:
# Initialize early stopping parameters
best_val_loss = float('inf') 
stop_patience =num_patience # Number of epochs to wait for improvement
counter = 0  # Counter for patience  
# Initialize the loss of training and validation set
Loss_Training_List=[]
Loss_Validation_List=[]

# Training loop
for epoch in range(num_epoch):
        model.train()
        Embeddings_Training_1 = model(Batched_Graph_Training,Features_Training) # Forward propagation, get embeddings
        Embeddings_Training_2 = Embeddings_Training_1 * Pixel_Ratio_List_Training
        Batched_Graph_Training.ndata['h'] = Embeddings_Training_2  # Assign embeddings to 'h' field
        Logits_Training = dgl.sum_nodes(Batched_Graph_Training, 'h')  # Sum the embeddings across nodes
        Loss_Training_pore = F.l1_loss(Embeddings_Training_1, Labels_Training_pore_gL)  # Compute loss
        Loss_Training_mof = F.l1_loss(Logits_Training, Labels_Training_mof)  # Compute loss
        Loss_Training=(100*Loss_Training_pore)+Loss_Training_mof
        optimizer.zero_grad()
        Loss_Training_List.append(Loss_Training.item())
        Loss_Training.backward()
        optimizer.step()
        
    
        model.eval()  # Set model to evaluation mode
        with torch.no_grad():
         Embeddings_Validation_1 = model(Batched_Graph_Validation,Features_Validation)  # Forward propagation, get embeddings
         Embeddings_Validation_2 = Embeddings_Validation_1 * Pixel_Ratio_List_Validation
         Batched_Graph_Validation.ndata['h'] = Embeddings_Validation_2
         Logits_Validation = dgl.sum_nodes(Batched_Graph_Validation, 'h')
         Loss_Validation_pore = F.l1_loss(Embeddings_Validation_1, Labels_Validation_pore_gL)
         Loss_Validation_mof = F.l1_loss(Logits_Validation, Labels_Validation_mof)
         Loss_Validation=(100*Loss_Validation_pore)+Loss_Validation_mof
         Loss_Validation_List.append(Loss_Validation.item())
         scheduler.step(Loss_Validation)
        # Early stopping
        if Loss_Validation < best_val_loss:
         best_val_loss = Loss_Validation
         counter = 0
        else:
         counter += 1
        if counter >= stop_patience:
            print(f'Early stopping at epoch {epoch+1}')
            break
       
          

Test the model at the MOF level

In [None]:
model.eval()  
with torch.no_grad():
    Predictions_Testing_pore = model(Batched_Graph_Testing,Features_Testing)  
    Predictions_Testing_pore_2 = Predictions_Testing_pore * Pixel_Ratio_List_Testing
    Batched_Graph_Testing.ndata['h'] = Predictions_Testing_pore_2  
    Predictions_Testing_mof = dgl.sum_nodes(Batched_Graph_Testing, 'h')
    

In [None]:
#Calculate R² and MAE at the MOF level
r2_mof = r2_score(Labels_Testing_mof, Predictions_Testing_mof)
Loss_Testing_mof = F.l1_loss(Predictions_Testing_mof, Labels_Testing_mof)

In [None]:
r2_mof,Loss_Testing_mof

Test the model at the pore level

In [None]:
#Select pores with a diameter bigger than 3 Å(radius > 1.5 Å)
def Screen_Big_Pore(Batch_Graph):
    Big_Pore=[]
    for i in range(Batch_Graph.num_nodes()):
            pore_radius = Batch_Graph.ndata['maxima_radii'][i].item()
            if pore_radius>1.5:
                Big_Pore.append(i)
    return Big_Pore

In [None]:
Big_Pore_Testing=Screen_Big_Pore(Batched_Graph_Testing)

In [None]:
Predictions_Testing_big_pore=Predictions_Testing_pore[Big_Pore_Testing]
Labels_Testing_big_pore=Labels_Testing_pore_gL[Big_Pore_Testing]

In [None]:
#Calculate R² and MAE at the pore level
r2_big_pore = r2_score(Predictions_Testing_big_pore, Labels_Testing_big_pore)
Loss_Testing_big_pore=F.l1_loss(Predictions_Testing_big_pore, Labels_Testing_big_pore)

In [None]:
r2_big_pore,Loss_Testing_big_pore