In [211]:
import pandas as pd
import json
import numpy as np

from rdkit import Chem
from rdkit.Chem import rdchem, AllChem, MACCSkeys

from openbabel import pybel
from torch_geometric.data import Data

from sklearn.linear_model import LogisticRegression, LinearRegression
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report, confusion_matrix, mean_absolute_error, mean_squared_error, r2_score
from rdkit import Chem
from rdkit.Chem import AllChem
from sklearn.preprocessing import StandardScaler

import torch
import torch.nn as nn
import torch.nn.functional as F
from torch_geometric.data import Batch
from torch_geometric.nn import global_max_pool

In [212]:
class GNNLayerSimple(nn.Module):
    """Residual message-passing layer that updates node and edge features.

    For every directed edge (first row of ``edge_index`` -> second row) a node
    message is computed from the features of the first endpoint and the edge
    attributes, and an edge update is computed from both endpoints plus the
    edge attributes. Node messages are summed onto the first endpoint of each
    edge; both updates are applied as residuals.

    Args:
        node_dim: Width of the node feature vectors.
        edge_dim: Width of the edge attribute vectors.
        hidden_dim: Width of the hidden layers inside both MLPs.
    """

    def __init__(self, node_dim, edge_dim, hidden_dim):
        super().__init__()
        # Attribute names and Sequential positions define the state_dict keys,
        # so they must stay stable for saved checkpoints to load.
        node_in = node_dim + edge_dim
        wide = hidden_dim * 2
        self.node_mlp = nn.Sequential(
            nn.Linear(node_in, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, wide),
            nn.ReLU(),
            nn.Linear(wide, node_dim),
        )

        edge_in = 2 * node_dim + edge_dim
        self.edge_mlp = nn.Sequential(
            nn.Linear(edge_in, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, edge_dim),
        )

    def forward(self, h, edge_index, edge_attr):
        """Returns the updated ``(h, edge_attr)`` pair.

        ``edge_index`` is unpacked along its first axis, so it has to be laid
        out as two rows of node indices.
        """
        src, dst = edge_index
        h_src = h[src]

        node_update = self.node_mlp(torch.cat((h_src, edge_attr), dim=1))
        edge_update = self.edge_mlp(torch.cat((h_src, h[dst], edge_attr), dim=1))

        # Sum every message onto the node it was computed from (index `src`).
        scatter_idx = src.unsqueeze(1).expand_as(node_update)
        aggregated = torch.zeros_like(h).scatter_add(0, scatter_idx, node_update)

        return h + aggregated, edge_attr + edge_update


class AttentionPooling(nn.Module):
    """Pools a set of node vectors into one vector with a learned query.

    A single trainable query attends over linearly projected keys; the output
    is the attention-weighted sum of linearly projected values.

    Args:
        dim: Width of the incoming node vectors (and of the pooled output).
    """

    def __init__(self, dim):
        super().__init__()
        # One learned query row shared by every graph.
        self.q = nn.Parameter(torch.randn(1, dim))
        self.k_proj = nn.Linear(dim, dim)
        self.v_proj = nn.Linear(dim, dim)
        # Scaled dot-product attention factor.
        self.scale = dim ** -0.5

    def forward(self, h):
        """Returns a ``(1, dim)`` tensor pooled over the rows of ``h``."""
        keys, values = self.k_proj(h), self.v_proj(h)

        logits = torch.matmul(self.q, keys.T) * self.scale
        weights = torch.softmax(logits, dim=-1)

        return torch.matmul(weights, values)

class GNNFingerprintSimple(nn.Module):
    """Graph encoder that turns a molecular graph into a fixed-size fingerprint.

    The graph is passed through a stack of ``GNNLayerSimple`` layers, node
    features are pooled per graph with ``AttentionPooling``, and the pooled
    vector is mapped to ``out_dim`` values by a small projection head.

    Args:
        node_dim: Width of the node feature vectors.
        edge_dim: Width of the edge attribute vectors.
        hidden_dim: Hidden width used by the layers and the projection head.
        num_layers: Number of stacked message-passing layers.
        out_dim: Length of the produced fingerprint.
    """

    def __init__(self, node_dim, edge_dim, hidden_dim=256, num_layers=6, out_dim=1024):
        super().__init__()
        self.hidden_dim = hidden_dim

        # Sub-module names are checkpoint keys; keep them as they are.
        self.egnn_layers = nn.ModuleList(
            GNNLayerSimple(node_dim, edge_dim, hidden_dim) for _ in range(num_layers)
        )

        self.projection_head = nn.Sequential(
            nn.Linear(node_dim, hidden_dim),
            nn.ReLU(),
            nn.LayerNorm(hidden_dim),
            nn.Linear(hidden_dim, out_dim),
        )

        self.attn_pool = AttentionPooling(node_dim)

    def forward(self, data):
        """Encodes one graph, or a list of graphs, into fingerprints.

        Args:
            data: Either a single graph object exposing ``x``, ``edge_index``
                and ``edge_attr``, or a list of such graphs (which is batched
                with ``Batch.from_data_list``).

        Returns:
            A ``(1, out_dim)`` tensor for a single graph, or one row per graph
            when a list was given.

        Note:
            An ``edge_index`` whose first dimension is not 2 is transposed and
            written back onto the input object(s).
        """
        is_list = isinstance(data, list)
        if is_list:
            for graph in data:
                if graph.edge_index.shape[0] != 2:
                    graph.edge_index = graph.edge_index.t()
            data = Batch.from_data_list(data)
        elif data.edge_index.shape[0] != 2:
            data.edge_index = data.edge_index.t()

        h, edge_attr = data.x, data.edge_attr
        connectivity = data.edge_index
        for layer in self.egnn_layers:
            h, edge_attr = layer(h, connectivity, edge_attr)

        if not is_list:
            return self.projection_head(self.attn_pool(h))

        # Pool each graph of the batch separately, then drop the singleton axis
        # left behind by the (1, dim) pooled outputs.
        per_graph = [
            self.attn_pool(h[data.batch == graph_id])
            for graph_id in range(data.num_graphs)
        ]
        pooled = torch.stack(per_graph).squeeze(1)
        return self.projection_head(pooled)

In [213]:
def get_edge_features(mol):
    """Extracts per-bond features from the molecule's conformer.

    Each returned row is
    ``[begin_idx, end_idx, bond_order, length, angle_x, angle_y, angle_z]``
    where ``bond_order`` is a fixed code per bond type (0 for unlisted types),
    ``length`` is the distance between the two atom positions and the three
    angles are the angles between the bond vector and the X/Y/Z axes, in
    degrees divided by 180.

    Args:
        mol: Molecule providing ``GetConformer()`` and ``GetBonds()``.

    Returns:
        A float array of shape ``(num_bonds, 7)``. Bonds whose two atoms share
        the same position are skipped. When no bond remains, an empty
        ``(0, 7)`` array is returned instead of failing on the column slice.
    """
    # Built once instead of once per bond.
    bond_order_by_type = {
        rdchem.BondType.SINGLE: 0.33,
        rdchem.BondType.DOUBLE: 0.66,
        rdchem.BondType.TRIPLE: 1.0,
        rdchem.BondType.AROMATIC: 0.5,
    }

    conf = mol.GetConformer()
    edges = []

    for bond in mol.GetBonds():
        i, j = bond.GetBeginAtomIdx(), bond.GetEndAtomIdx()
        pos_i, pos_j = np.array(conf.GetAtomPosition(i)), np.array(conf.GetAtomPosition(j))
        vector_ij = pos_j - pos_i
        norm_vector_ij = np.linalg.norm(vector_ij)

        if norm_vector_ij == 0:
            continue  # Avoid division by zero

        # The dot product with a unit axis is just the matching component, so
        # the three direction cosines are the normalised vector itself.
        cosines = np.clip(vector_ij / norm_vector_ij, -1.0, 1.0)
        angle_x, angle_y, angle_z = np.degrees(np.arccos(cosines)) / 180  # Normalize angles

        bond_order = bond_order_by_type.get(bond.GetBondType(), 0)
        edges.append([i, j, bond_order, norm_vector_ij, angle_x, angle_y, angle_z])

    if not edges:
        return np.empty((0, 7), dtype=float)

    return np.array(edges, dtype=float)

def add_conformer_to_mol(mol, df):
    """Attaches a conformer built from the ``x``/``y``/``z`` columns of ``df``.

    The index label of each row is used as the atom index whose position is
    set, so ``df`` is expected to be indexed by atom index.

    Args:
        mol: Molecule that receives the new conformer (modified in place).
        df: DataFrame with ``x``, ``y`` and ``z`` columns.

    Returns:
        The same ``mol`` object, with the conformer added.
    """
    conformer = Chem.Conformer(mol.GetNumAtoms())

    coords = df[["x", "y", "z"]]
    for atom_idx, x, y, z in coords.itertuples(index=True, name=None):
        conformer.SetAtomPosition(atom_idx, (x, y, z))

    mol.AddConformer(conformer, assignId=True)
    return mol

In [214]:
# Load the coordinate statistics (the "<axis>_mean" / "<axis>_std" entries read
# by normalize_df) into the module-level `scaler` used by preprocess_df.
with open("data/means_and_stds.json") as f:
    scaler = json.load(f)

def normalize_df(df, scaler):
    """Standardises the ``x``, ``y`` and ``z`` columns of ``df`` in place.

    Each axis column is replaced by ``(value - <axis>_mean) / <axis>_std``
    using the matching entries of ``scaler``.

    Args:
        df: DataFrame with ``x``, ``y`` and ``z`` columns (modified in place).
        scaler: Mapping with ``x_mean``, ``x_std``, ``y_mean``, ``y_std``,
            ``z_mean`` and ``z_std`` entries.

    Returns:
        The same ``df`` object.
    """
    for axis in ("x", "y", "z"):
        centre = scaler[f"{axis}_mean"]
        spread = scaler[f"{axis}_std"]
        df[axis] = (df[axis] - centre) / spread
    return df

def preprocess_df(df, positions):
    """Preprocesses atomic and positional data for a molecule.

    The atomic number is one-hot encoded into ``C``/``N``/``O``/``F`` columns
    (any other element ends up with all four set to 0), the remaining atom
    features are clipped to ``[0, max]`` and divided by ``max``, and the
    standardised coordinates are joined on by row index.

    Both inputs are copied first, so the caller's frames are left untouched
    and the function can safely be called again on the same objects.

    Args:
        df: Per-atom features with the columns ``atomic_num``, ``num_bonds``,
            ``hybridization``, ``aromatic``, ``chirality``, ``valence`` and
            ``in_ring``.
        positions: Per-atom coordinates with ``x``, ``y`` and ``z`` columns.

    Returns:
        ``(nodes, positions_df)``: the feature table including the
        standardised ``x``/``y``/``z`` columns, and the standardised
        coordinates alone. Atoms without a matching position row get NaN
        coordinates from the index join.
    """
    df = df.copy()
    positions = positions.copy()

    # Define atomic number mappings
    atom_options = {6: "C", 7: "N", 8: "O", 9: "F"}

    # One-hot encoding for atomic numbers
    for key, label in atom_options.items():
        df[label] = (df["atomic_num"] == key).astype(int)
    df = df.drop(columns=["atomic_num"])

    # Normalize features: clip to the listed maximum, then scale into [0, 1]
    features = {"num_bonds": 4, "hybridization": 4, "aromatic": 1, "chirality": 2, "valence": 4, "in_ring": 1}
    for feature, val in features.items():
        df[feature] = df[feature].clip(0, val) / val

    # Process positional data; the old index is dropped rather than turned
    # into a throwaway column.
    positions = normalize_df(positions, scaler).reset_index(drop=True)
    positions_df = positions[["x", "y", "z"]]
    df = df.join(positions_df)

    return df, positions_df

def get_atom_features(atom):
    """Collects the raw per-atom descriptors used as node features.

    Args:
        atom: Atom object exposing the RDKit-style accessors called below.

    Returns:
        A dict with the keys ``atomic_num``, ``num_bonds``, ``hybridization``,
        ``aromatic``, ``chirality``, ``valence`` and ``in_ring`` (in that
        order); enum- and bool-valued descriptors are converted with ``int``.
    """
    features = dict(
        atomic_num=atom.GetAtomicNum(),
        num_bonds=len(atom.GetBonds()),
        hybridization=int(atom.GetHybridization()),
        aromatic=int(atom.GetIsAromatic()),
        chirality=int(atom.GetChiralTag()),
        valence=atom.GetTotalValence(),
        in_ring=int(atom.IsInRing()),
    )
    return features

In [215]:
def get_atoms_coordinates(smiles):
    """Generates 3D coordinates for a SMILES string with Open Babel.

    Args:
        smiles: SMILES string of the molecule.

    Returns:
        A list with one ``[label, x, y, z]`` entry per atom of the 3D
        structure, where ``label`` is the first character of the atom's type
        string (``craete_pos_df`` uses it to filter out ``"H"`` entries).
    """
    mol = pybel.readstring("smiles", smiles)
    mol.make3D()

    return [[atom.type[0], *atom.coords[:3]] for atom in mol.atoms]

In [216]:
def craete_pos_df(atoms):
    """Builds the coordinate table of all atoms not labelled ``"H"``.

    The (misspelled) name is kept because callers refer to it.

    Args:
        atoms: Iterable of ``[label, x, y, z]`` entries.

    Returns:
        A DataFrame with the columns ``x``, ``y`` and ``z``, one row per
        non-``"H"`` entry in input order. The columns are present even when no
        entry qualifies, so downstream column access does not fail.
    """
    heavy_atoms = [
        {"x": atom[1], "y": atom[2], "z": atom[3]}
        for atom in atoms
        if atom[0] != "H"
    ]
    return pd.DataFrame(heavy_atoms, columns=["x", "y", "z"])

def create_df_from_mol(smiles):
    """Builds the graph inputs (node table and edge array) for a SMILES string.

    The molecule is parsed with RDKit for its atom and bond features, while
    the 3D coordinates come from ``get_atoms_coordinates``; the standardised
    coordinates are attached to the RDKit molecule as a conformer before the
    edge features are computed.

    Args:
        smiles: SMILES string of the molecule.

    Returns:
        ``(mol, nodes, edges)``: the RDKit molecule with the conformer added,
        the node feature DataFrame and the edge feature array.
    """
    mol = Chem.MolFromSmiles(smiles)
    raw_coordinates = get_atoms_coordinates(smiles)

    atom_records = [get_atom_features(atom) for atom in mol.GetAtoms()]
    heavy_positions = craete_pos_df(raw_coordinates)

    nodes, positions = preprocess_df(pd.DataFrame(atom_records), heavy_positions)
    mol = add_conformer_to_mol(mol, positions)

    return mol, nodes, get_edge_features(mol)

In [217]:
def read_graph(nodes, edges, device=None):
    """Converts a node table and an edge array into a graph ``Data`` object.

    Every input edge ``[i, j, attrs...]`` is emitted in both directions
    (``i -> j`` followed by ``j -> i``) with the same attributes.

    Args:
        nodes: DataFrame of numeric node features, one row per node.
        edges: Sequence or array of rows ``[i, j, attr_0, attr_1, ...]``.
            An empty sequence gives a graph without edges.
        device: Device for the created tensors. Defaults to CUDA when it is
            available and to the CPU otherwise.

    Returns:
        ``Data`` with ``x`` of shape ``(num_nodes, num_features)``,
        ``edge_index`` of shape ``(2 * num_edges, 2)`` and ``edge_attr`` of
        shape ``(2 * num_edges, num_attrs)``. ``edge_index`` keeps this
        one-pair-per-row layout; ``GNNFingerprintSimple.forward`` transposes
        it when needed.

    Raises:
        ValueError: If ``edges`` is non-empty but not two-dimensional.
    """
    if device is None:
        device = "cuda" if torch.cuda.is_available() else "cpu"

    x = torch.tensor(nodes.to_numpy(), dtype=torch.float, device=device)

    edge_array = np.asarray(edges, dtype=float)
    if edge_array.ndim != 2:
        if edge_array.size:
            raise ValueError("edges must be a 2-D sequence of [i, j, attrs...] rows")
        # No bonds at all: keep well-formed 2-D (empty) tensors.
        edge_array = edge_array.reshape(0, 2)

    endpoints = edge_array[:, :2]
    # Interleave forward and reversed pairs: (E, 2, 2) -> (2E, 2).
    both_directions = np.stack([endpoints, endpoints[:, ::-1]], axis=1).reshape(-1, 2)
    # Each attribute row is shared by the two directions of its bond.
    attributes = np.repeat(edge_array[:, 2:], 2, axis=0)

    # Built from single ndarrays, which avoids the slow list-of-arrays path.
    edge_index = torch.tensor(both_directions, dtype=torch.long, device=device)
    edge_attr = torch.tensor(attributes, dtype=torch.float, device=device)

    return Data(x=x, edge_index=edge_index, edge_attr=edge_attr)

In [218]:
# Instantiate the encoder (13 node features, 5 edge features, hidden width 256)
# and restore its trained weights. `map_location="cpu"` lets the checkpoint
# load on machines without the device it was saved from, and
# `weights_only=True` restricts unpickling to tensors and plain containers,
# which is all a state_dict needs.
final_model = GNNFingerprintSimple(13, 5, 256)
final_model.load_state_dict(
    torch.load('models/FINAL_GNN_NE_SMALLER.pth', map_location="cpu", weights_only=True)
)
final_model.eval()

  final_model.load_state_dict(torch.load('models/FINAL_GNN_NE_SMALLER.pth'))


GNNFingerprintSimple(
  (egnn_layers): ModuleList(
    (0-5): 6 x GNNLayerSimple(
      (node_mlp): Sequential(
        (0): Linear(in_features=18, out_features=256, bias=True)
        (1): ReLU()
        (2): Linear(in_features=256, out_features=512, bias=True)
        (3): ReLU()
        (4): Linear(in_features=512, out_features=13, bias=True)
      )
      (edge_mlp): Sequential(
        (0): Linear(in_features=31, out_features=256, bias=True)
        (1): ReLU()
        (2): Linear(in_features=256, out_features=5, bias=True)
      )
    )
  )
  (projection_head): Sequential(
    (0): Linear(in_features=13, out_features=256, bias=True)
    (1): ReLU()
    (2): LayerNorm((256,), eps=1e-05, elementwise_affine=True)
    (3): Linear(in_features=256, out_features=1024, bias=True)
  )
  (attn_pool): AttentionPooling(
    (k_proj): Linear(in_features=13, out_features=13, bias=True)
    (v_proj): Linear(in_features=13, out_features=13, bias=True)
  )
)

In [219]:
def filter_and_split_data_r(df, ki_threshold=1000):
    """Prepares an 80/20 train/test split for the regression task.

    Only rows whose ``'Standard Value'`` is below ``ki_threshold`` are kept;
    rows with a missing ``'Smiles'`` are dropped. The input frame is not
    modified.

    Args:
        df: DataFrame with ``'Smiles'`` and ``'Standard Value'`` columns.
        ki_threshold: Exclusive upper bound on ``'Standard Value'``.

    Returns:
        ``(X_train, X_test, y_train, y_test)`` as Series with fresh 0-based
        indices; X holds SMILES strings, y the ``'Standard Value'`` targets.
    """
    below_threshold = df['Standard Value'] < ki_threshold
    selected = df.loc[below_threshold, ['Smiles', 'Standard Value']].dropna()

    parts = train_test_split(
        selected['Smiles'], selected['Standard Value'], test_size=0.2, random_state=42
    )

    return tuple(part.reset_index(drop=True) for part in parts)

In [220]:
def filter_and_split_data_c(df, ki_threshold=1000):
    """Prepares an 80/20 train/test split for the classification task.

    Rows with a missing ``'Smiles'`` or ``'Standard Value'`` are dropped
    *before* labelling. Labelling first would turn every missing measurement
    into class 0, because ``NaN < threshold`` is False, and the later
    ``dropna`` could never remove those rows. The input frame is not modified.

    Args:
        df: DataFrame with ``'Smiles'`` and ``'Standard Value'`` columns.
        ki_threshold: Values strictly below this threshold are labelled 1,
            all others 0.

    Returns:
        ``(X_train, X_test, y_train, y_test)`` as Series with fresh 0-based
        indices; X holds SMILES strings, y the 0/1 ``'Activity'`` labels.
    """
    measured = df[['Smiles', 'Standard Value']].dropna()
    labelled = measured.assign(
        Activity=(measured['Standard Value'] < ki_threshold).astype(int)
    )

    X_train, X_test, y_train, y_test = train_test_split(
        labelled['Smiles'], labelled['Activity'], test_size=0.2, random_state=42
    )

    return X_train.reset_index(drop=True), X_test.reset_index(drop=True), y_train.reset_index(drop=True), y_test.reset_index(drop=True)

In [221]:
def smiles_to_ecfp(smiles, radius=2, nbits=1024):
    """Computes a Morgan bit-vector fingerprint for a SMILES string.

    Args:
        smiles: SMILES string of the molecule.
        radius: Morgan radius passed to RDKit.
        nbits: Number of bits requested from RDKit.

    Returns:
        A float NumPy array holding the fingerprint bits, or ``nbits`` zeros
        when the SMILES cannot be parsed.
    """
    mol = Chem.MolFromSmiles(smiles)
    if mol is not None:
        bit_vector = AllChem.GetMorganFingerprintAsBitVect(mol, radius, nBits=nbits)
        # NOTE(review): the buffer starts with length 1, not nbits; this
        # presumably relies on ConvertToNumpyArray resizing it - confirm.
        bits = np.zeros((1,))
        AllChem.DataStructs.ConvertToNumpyArray(bit_vector, bits)
        return bits
    return np.zeros(nbits)

In [222]:
def smiles_to_maccs(smiles):
    """Computes the MACCS keys fingerprint for a SMILES string.

    Args:
        smiles: SMILES string of the molecule.

    Returns:
        An integer NumPy array of length 167 with the key bits, or 167 float
        zeros when the SMILES cannot be parsed.
    """
    n_keys = 167
    mol = Chem.MolFromSmiles(smiles)
    if mol is not None:
        keys = MACCSkeys.GenMACCSKeys(mol)
        bits = np.zeros((n_keys,), dtype=int)
        Chem.DataStructs.ConvertToNumpyArray(keys, bits)
        return bits
    return np.zeros(n_keys)

In [223]:
def smiles_to_rdkit_fp(smiles, nbits=2048):
    """Computes the RDKit (``Chem.RDKFingerprint``) fingerprint for a SMILES string.

    Args:
        smiles: SMILES string of the molecule.
        nbits: Fingerprint size passed to RDKit as ``fpSize``.

    Returns:
        An integer NumPy array of length ``nbits`` with the fingerprint bits,
        or ``nbits`` float zeros when the SMILES cannot be parsed.
    """
    mol = Chem.MolFromSmiles(smiles)
    if mol is not None:
        bit_vector = Chem.RDKFingerprint(mol, fpSize=nbits)
        bits = np.zeros((nbits,), dtype=int)
        Chem.DataStructs.ConvertToNumpyArray(bit_vector, bits)
        return bits
    return np.zeros(nbits)

In [224]:
def fingerprint_3d(smiles, model):
    """Computes the learned 3D fingerprint of a molecule.

    The graph is moved to the device that holds the model's parameters, so
    the function works wherever the model lives instead of always assuming
    CUDA. If the device cannot be determined (the model has no
    ``parameters()`` or no parameters at all), ``"cuda"` ` is used as before.

    Args:
        smiles: SMILES string of the molecule.
        model: Callable model that accepts the graph built by ``read_graph``.

    Returns:
        Whatever ``model`` returns for the graph (for
        ``GNNFingerprintSimple`` a ``(1, out_dim)`` tensor).
    """
    _, nodes, edges = create_df_from_mol(smiles)

    try:
        device = next(model.parameters()).device
    except (AttributeError, StopIteration):
        device = "cuda"

    record = read_graph(nodes, edges).to(device)

    return model(record)

In [225]:
# Semicolon-separated ChEMBL export. Forward slashes keep the path valid on
# every OS; the backslash form only resolves on Windows.
data = pd.read_csv("data/CHEMBL/CHEMBL224_5HT2A.csv", sep=";")

In [226]:
# Regression split: SMILES as inputs, 'Standard Value' as target, keeping only
# rows below the default threshold of 1000 (80/20 split, random_state=42).
X_train, X_test, y_train, y_test = filter_and_split_data_r(data)

In [231]:
def get_score(X_train, y_train, X_test, y_test, task="c"):
    """Fits a linear baseline on standardised features and prints its metrics.

    Features are standardised with statistics estimated on the training set
    only. With ``task == "c"`` a logistic regression is trained and the
    classification report and confusion matrix are printed; any other value
    trains a linear regression and prints MAE and R².

    Args:
        X_train: Training feature matrix.
        y_train: Training targets.
        X_test: Test feature matrix.
        y_test: Test targets.
        task: ``"c"`` for classification, anything else for regression.

    Returns:
        None. The metrics are only printed.
    """
    feature_scaler = StandardScaler().fit(X_train)
    train_features = feature_scaler.transform(X_train)
    test_features = feature_scaler.transform(X_test)

    classify = task == "c"
    estimator = LogisticRegression(random_state=42) if classify else LinearRegression()
    estimator.fit(train_features, y_train)
    y_pred = estimator.predict(test_features)

    if classify:
        print("Classification Report:")
        print(classification_report(y_test, y_pred))

        print("Confusion Matrix:")
        print(confusion_matrix(y_test, y_pred))
        return

    print("Regression Metrics:")
    print(f"MAE:  {mean_absolute_error(y_test, y_pred):.4f}")
    print(f"R²:   {r2_score(y_test, y_pred):.4f}")

In [228]:
# Classical fingerprint matrices: one row per SMILES, train and test built
# with the same featuriser.
X_train_ecfp = np.array(list(map(smiles_to_ecfp, X_train)))
X_test_ecfp = np.array(list(map(smiles_to_ecfp, X_test)))

X_train_maccs = np.array(list(map(smiles_to_maccs, X_train)))
X_test_maccs = np.array(list(map(smiles_to_maccs, X_test)))

X_train_rdk = np.array(list(map(smiles_to_rdkit_fp, X_train)))
X_test_rdk = np.array(list(map(smiles_to_rdkit_fp, X_test)))

#### 3D FINGERPRINT ####
# Restore the trained encoder. The forward-slash path works on every OS,
# `map_location="cpu"` makes loading independent of the device the checkpoint
# was saved from, and `weights_only=True` limits unpickling to what a
# state_dict needs.
fingerprint_model = GNNFingerprintSimple(13, 5, 256)
fingerprint_model.load_state_dict(
    torch.load("models/FINAL_GNN_NE_SMALLER.pth", map_location="cpu", weights_only=True)
)
fingerprint_model = fingerprint_model.to("cuda")
fingerprint_model.eval()

# Inference only: skip autograd bookkeeping and hand NumPy arrays (not
# tensors) to np.array. Each fingerprint is (1, out_dim), hence squeeze(1).
with torch.no_grad():
    X_train_3D = np.array(
        [fingerprint_3d(smiles, fingerprint_model).detach().cpu().numpy() for smiles in X_train]
    ).squeeze(1)
    X_test_3D = np.array(
        [fingerprint_3d(smiles, fingerprint_model).detach().cpu().numpy() for smiles in X_test]
    ).squeeze(1)

# Molecules whose fingerprint is entirely NaN are removed, together with
# their targets, so the linear models receive finite inputs.
rows_all_nan_train = np.isnan(X_train_3D).all(axis=1)
nan_indices_train = np.where(rows_all_nan_train)[0]

rows_all_nan_test = np.isnan(X_test_3D).all(axis=1)
nan_indices_test = np.where(rows_all_nan_test)[0]

X_train_3D = np.delete(X_train_3D, nan_indices_train, axis=0)
y_train_3D = np.delete(y_train, nan_indices_train, axis=0)
X_test_3D = np.delete(X_test_3D, nan_indices_test, axis=0)
y_test_3D = np.delete(y_test, nan_indices_test, axis=0)

  fingerprint_model.load_state_dict(torch.load(r"models\FINAL_GNN_NE_SMALLER.pth"))


In [232]:
## ECFP (Morgan bit-vector) features: linear-regression baseline

In [233]:
# Task "r": fit the linear regression on the ECFP features and print MAE / R².
get_score(X_train_ecfp, y_train, X_test_ecfp, y_test, "r")

Regression Metrics:
MAE:  141.6878
R²:   0.1061


In [234]:
## MACCS keys features: linear-regression baseline

In [235]:
# Task "r": fit the linear regression on the MACCS features and print MAE / R².
get_score(X_train_maccs, y_train, X_test_maccs, y_test, "r")

Regression Metrics:
MAE:  133.0207
R²:   0.2341


In [236]:
## RDKit fingerprint features: linear-regression baseline

In [237]:
# Task "r": fit the linear regression on the RDKit fingerprints and print MAE / R².
get_score(X_train_rdk, y_train, X_test_rdk, y_test, "r")

Regression Metrics:
MAE:  288.3204
R²:   -4.6206


In [238]:
## Learned 3D (GNN) fingerprint features: linear-regression baseline

In [239]:
# Task "r": fit the linear regression on the GNN fingerprints (NaN rows already
# removed, hence the *_3D targets) and print MAE / R².
get_score(X_train_3D, y_train_3D, X_test_3D, y_test_3D, "r")

Regression Metrics:
MAE:  153.9368
R²:   0.0472
