In [1]:
from rdkit import Chem
import numpy as np
from tqdm import tqdm
import torch
from torch_geometric.loader import DataLoader as GeometricDataLoader
from torch_geometric.data import Data
from torch.utils.data import Subset
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler, MinMaxScaler

# Find the unique atom types across all molecules

In [2]:
def find_unique_elements_and_atomic_numbers(smiles_series):
    """Map every element symbol found in the given SMILES to its atomic number.

    Each SMILES string is parsed with ``Chem.MolFromSmiles`` and explicit
    hydrogens are added with ``Chem.AddHs`` before the atoms are inspected.
    Entries for which parsing yields a falsy result are skipped.

    Args:
        smiles_series: Iterable of SMILES strings.

    Returns:
        dict: ``{element symbol: atomic number}``, keyed in order of first
        appearance.
    """
    symbol_to_number = {}

    # Parse lazily so molecules are handled one at a time, in input order.
    parsed_molecules = (Chem.MolFromSmiles(entry) for entry in smiles_series)
    for molecule in parsed_molecules:
        if not molecule:
            continue

        symbol_to_number.update(
            (atom.GetSymbol(), atom.GetAtomicNum())
            for atom in Chem.AddHs(molecule).GetAtoms()
        )

    return symbol_to_number

In [3]:
def one_hot_encode(val, categories):
    """Return a 0/1 list marking which entries of ``categories`` equal ``val``.

    Args:
        val: Value to encode.
        categories: Iterable of candidate values, compared with ``==``.

    Returns:
        list[int]: One entry per category; all zeros if ``val`` matches none.
    """
    encoding = []
    for candidate in categories:
        encoding.append(int(val == candidate))
    return encoding

In [4]:
def get_atom_number(smiles_series):
    """Return the atomic numbers of all elements present in the SMILES.

    Thin wrapper around ``find_unique_elements_and_atomic_numbers`` that
    keeps only the dictionary values.

    Args:
        smiles_series: Iterable of SMILES strings.

    Returns:
        list: One atomic number per distinct element symbol, in the order the
        symbols were first encountered.
    """
    symbol_to_number = find_unique_elements_and_atomic_numbers(smiles_series)
    return [number for number in symbol_to_number.values()]

# get atom features

In [5]:
def get_atom_features(atom,atom_numbers):
    """Build the float32 feature vector of a single RDKit atom.

    Layout of the returned vector:
        [degree, implicit valence, aromatic flag, total H count]
        + one-hot of the atomic number over ``atom_numbers``
        + one-hot of the hybridization over (SP, SP2, SP3)

    An atomic number or hybridization outside the listed categories encodes
    as all zeros in its one-hot section.

    Args:
        atom: RDKit atom object.
        atom_numbers: Sequence of atomic numbers defining the one-hot slots.

    Returns:
        np.ndarray: 1-D array of dtype float32.
    """
    hybridization = Chem.rdchem.HybridizationType

    scalar_part = [
        atom.GetDegree(),
        atom.GetImplicitValence(),
        int(atom.GetIsAromatic()),
        atom.GetTotalNumHs(),
    ]
    element_part = one_hot_encode(atom.GetAtomicNum(), atom_numbers)
    hybridization_part = one_hot_encode(
        atom.GetHybridization(),
        [hybridization.SP, hybridization.SP2, hybridization.SP3],
    )

    return np.array(scalar_part + element_part + hybridization_part, dtype=np.float32)


# get bond 2D Topological information

In [6]:
def get_bond_2d_features(bond):
    """Build the float32 feature vector of a single RDKit bond.

    Layout of the returned vector:
        [conjugated flag, in-ring flag]
        + one-hot of the bond type over (SINGLE, DOUBLE, TRIPLE, AROMATIC)

    A bond type outside the four listed ones encodes as all zeros in the
    one-hot section.

    Args:
        bond: RDKit bond object.

    Returns:
        np.ndarray: 1-D array of dtype float32 with 6 entries.
    """
    kinds = Chem.rdchem.BondType

    flag_part = [int(bond.GetIsConjugated()), int(bond.IsInRing())]
    type_part = one_hot_encode(
        bond.GetBondType(),
        [kinds.SINGLE, kinds.DOUBLE, kinds.TRIPLE, kinds.AROMATIC],
    )

    return np.array(flag_part + type_part, dtype=np.float32)

# Integrate 2D graph data information

In [7]:
def preprocess_2d_graph_data(smiles_list, labels, atom_numbers):
    """Convert SMILES strings and labels into per-molecule 2D graph dictionaries.

    Each SMILES is parsed with RDKit, explicit hydrogens are added, and atom
    and bond features are collected with ``get_atom_features`` and
    ``get_bond_2d_features``. SMILES that fail to parse are skipped, so the
    result may be shorter than ``smiles_list``.

    Args:
        smiles_list: Sized sequence of SMILES strings.
        labels: Iterable of labels aligned with ``smiles_list``; each must be
            ``'Negative'`` or ``'Positive'``.
        atom_numbers: Atomic numbers used for the atom-type one-hot encoding.

    Returns:
        list[dict]: One dict per successfully parsed molecule with keys
            ``'nodes_features'`` (num_atoms x F_node array),
            ``'edge_index'`` (2 x num_bonds int64 array),
            ``'edge_attr'`` (num_bonds x F_edge float32 array) and
            ``'label'`` (0 or 1).

    Raises:
        KeyError: If a label is neither ``'Negative'`` nor ``'Positive'``.
    """
    graph_data_2d = []

    label_map = {'Negative': 0, 'Positive': 1}

    for smiles, label in tqdm(zip(smiles_list, labels), total=len(smiles_list), desc="get graph_data"):
        mol = Chem.MolFromSmiles(smiles)
        if not mol:
            continue

        mol_with_h = Chem.AddHs(mol)
        atoms_features = [get_atom_features(atom, atom_numbers) for atom in mol_with_h.GetAtoms()]

        # NOTE(review): every bond is stored once (begin -> end). If the
        # downstream model treats edge_index as directed, reverse edges must
        # be added by the caller -- confirm against the model code.
        edge_index_list = []
        edge_attr_list = []
        for bond in mol_with_h.GetBonds():
            edge_index_list.append([bond.GetBeginAtomIdx(), bond.GetEndAtomIdx()])
            edge_attr_list.append(get_bond_2d_features(bond))

        # reshape(-1, 2) keeps the result 2-D even for a molecule without
        # bonds, giving shape (2, 0) instead of a 1-D empty float array.
        edge_index_array = np.array(edge_index_list, dtype=np.int64).reshape(-1, 2).T
        if edge_attr_list:
            edge_attr_array = np.array(edge_attr_list, dtype=np.float32)
        else:
            # Placeholder; the feature width is filled in after the loop.
            edge_attr_array = np.zeros((0, 0), dtype=np.float32)

        graph_data_2d.append({
            'nodes_features': np.array(atoms_features),
            'edge_index': edge_index_array,
            'edge_attr': edge_attr_array,
            'label': label_map[label]
        })

    # Give bond-free molecules an edge_attr of shape (0, F_edge), where F_edge
    # is taken from any molecule that does have bonds, so that all edge_attr
    # arrays can be concatenated along axis 0.
    edge_feature_dim = next(
        (graph['edge_attr'].shape[1] for graph in graph_data_2d if graph['edge_attr'].shape[0]),
        0,
    )
    for graph in graph_data_2d:
        if graph['edge_attr'].shape[0] == 0:
            graph['edge_attr'] = np.zeros((0, edge_feature_dim), dtype=np.float32)

    return graph_data_2d

# Fit (or load) the feature scalers for the 2D/3D graph data

In [8]:
import joblib
def preprocess_scaler(graph_data, mode, Preprocess, scale_path):
    """Fit and save, or load, the node and edge feature scalers.

    In ``'train'`` mode a pair of scalers is fitted on the node and edge
    features of all graphs and dumped to ``scale_path`` with joblib. In any
    other mode the previously saved pair is loaded from ``scale_path`` and
    ``graph_data`` / ``Preprocess`` are not used.

    Args:
        graph_data: Sequence of dicts with ``'nodes_features'`` and
            ``'edge_attr'`` 2-D arrays.
        mode: ``'train'`` to fit and save; anything else to load.
        Preprocess: ``'standardize'`` (StandardScaler) or ``'normalize'``
            (MinMaxScaler). Only consulted in train mode.
        scale_path: File path the scaler pair is written to / read from.

    Returns:
        tuple: ``(node_scaler, edge_scaler)``.

    Raises:
        ValueError: In train mode, if ``Preprocess`` is not a supported value
            or there are no node/edge features to fit on.
    """
    if mode == 'train':
        if Preprocess == 'standardize':
            node_scaler = StandardScaler()
            edge_scaler = StandardScaler()
        elif Preprocess == 'normalize':
            node_scaler = MinMaxScaler()
            edge_scaler = MinMaxScaler()
        else:
            # Previously this fell through and failed later with an
            # UnboundLocalError on node_scaler.
            raise ValueError(
                f"Unsupported Preprocess value {Preprocess!r}; "
                "expected 'standardize' or 'normalize'."
            )

        # Skip graphs that contribute no rows (e.g. molecules without bonds):
        # they add nothing to the fit and an empty 1-D array would make
        # np.concatenate fail on a dimension mismatch.
        node_blocks = [item['nodes_features'] for item in graph_data if len(item['nodes_features'])]
        edge_blocks = [item['edge_attr'] for item in graph_data if len(item['edge_attr'])]
        if not node_blocks or not edge_blocks:
            raise ValueError("Cannot fit scalers: graph_data contains no node or no edge features.")

        all_node_features = np.concatenate(node_blocks, axis=0)
        all_edge_features = np.concatenate(edge_blocks, axis=0)

        node_scaler.fit(all_node_features)
        edge_scaler.fit(all_edge_features)
        joblib.dump((node_scaler, edge_scaler), scale_path)
    else:
        # NOTE: joblib.load unpickles the file, which can execute arbitrary
        # code -- only point scale_path at files you trust.
        node_scaler, edge_scaler = joblib.load(scale_path)
    return node_scaler, edge_scaler

# Create the 2D/3D graph data list

In [9]:
def get_torch_graph_data_list(graph_data, mode, Preprocess, scale_path):
    """Turn graph dictionaries into a list of ``torch_geometric`` ``Data`` objects.

    When ``Preprocess`` is truthy, node and edge features are scaled with the
    scalers returned by ``preprocess_scaler`` (fitted in ``'train'`` mode,
    loaded otherwise). Any falsy ``Preprocess`` (``None``, ``''``, ``False``)
    disables scaling.

    Args:
        graph_data: Sequence of dicts with keys ``'nodes_features'``,
            ``'edge_index'``, ``'edge_attr'`` and ``'label'``.
        mode: Passed through to ``preprocess_scaler``.
        Preprocess: Scaling scheme name, or a falsy value for no scaling.
        scale_path: Passed through to ``preprocess_scaler``.

    Returns:
        list: One ``Data(x, edge_index, edge_attr, y)`` per input graph, with
        float ``x`` / ``edge_attr`` and long ``edge_index`` / ``y``.
    """
    # Single source of truth for "is scaling enabled". The original checked
    # `if Preprocess:` when creating the scalers but `is not None` when using
    # them, so '' or False reached undefined scalers.
    use_scaling = bool(Preprocess)
    if use_scaling:
        Pre_node, Pre_edge = preprocess_scaler(graph_data, mode, Preprocess, scale_path)

    torch_graph_data_list = []
    for item in graph_data:
        x = np.array(item['nodes_features'])
        edge_attr = np.array(item['edge_attr'])
        if use_scaling:
            # scikit-learn transformers reject arrays with zero samples, so
            # leave empty feature arrays (e.g. bond-free molecules) untouched.
            if x.size:
                x = Pre_node.transform(x)
            if edge_attr.size:
                edge_attr = Pre_edge.transform(edge_attr)
        x = torch.tensor(x, dtype=torch.float)
        edge_attr = torch.tensor(edge_attr, dtype=torch.float)

        edge_index = torch.tensor(item['edge_index'], dtype=torch.long)
        if edge_index.numel() == 0:
            # Keep the (2, num_edges) layout even when there are no edges.
            edge_index = edge_index.new_empty((2, 0))
        y = torch.tensor([item['label']], dtype=torch.long)

        data_object = Data(x=x, edge_index=edge_index, edge_attr=edge_attr, y=y)
        torch_graph_data_list.append(data_object)
    return torch_graph_data_list

# load data

In [10]:
def load_graph_data(torch_graph_data_list, batch_size, test_size=0.2, random_state=42):
    """Split the graphs into train/test sets and wrap each in a data loader.

    Args:
        torch_graph_data_list: List of graph ``Data`` objects.
        batch_size: Batch size used for both loaders.
        test_size: Held-out portion, forwarded to ``train_test_split``
            (default 0.2, the previously hard-coded value).
        random_state: Seed forwarded to ``train_test_split`` so the split is
            reproducible (default 42, the previously hard-coded value).

    Returns:
        tuple: ``(train_loader, test_loader)``; the train loader shuffles,
        the test loader does not.
    """
    # NOTE(review): if the graphs were scaled with scalers fitted on the full
    # list before this split, the test portion has influenced the scalers --
    # confirm whether that is intended.
    graph_data_train, graph_data_test = train_test_split(
        torch_graph_data_list, test_size=test_size, random_state=random_state
    )
    train_loader = GeometricDataLoader(graph_data_train, batch_size=batch_size, shuffle=True)
    test_loader = GeometricDataLoader(graph_data_test, batch_size=batch_size, shuffle=False)
    return train_loader, test_loader

In [11]:
def load_graph_data_10fold_cv(torch_graph_data_list, train_idx, test_idx, batch_size):
    """Build the train/test loaders for one cross-validation fold.

    Args:
        torch_graph_data_list: List of graph ``Data`` objects.
        train_idx: Indices into the list forming the training fold.
        test_idx: Indices into the list forming the test fold.
        batch_size: Batch size used for both loaders.

    Returns:
        tuple: ``(train_loader, test_loader)``; only the train loader shuffles.
    """
    def _fold_loader(indices, shuffle):
        # Subset gives an index-based view on the shared list.
        fold = Subset(torch_graph_data_list, indices)
        return GeometricDataLoader(fold, batch_size=batch_size, shuffle=shuffle)

    return _fold_loader(train_idx, True), _fold_loader(test_idx, False)
