In [6]:
import os
import pandas as pd
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torch_geometric.data import Data
from torch_geometric.nn import GATConv, global_mean_pool
from transformers import AutoTokenizer, AutoModelForSequenceClassification
from rdkit import Chem
from rdkit.Chem import rdmolops
from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_absolute_error, r2_score
from torch_geometric.loader import DataLoader

In [4]:
# ------- 1. import data 
df_train = pd.read_excel('TrainingSet.xlsx', sheet_name=1, header=1)
df_train = df_train.loc[:,['DTXSID','Canonical_QSARr','LD50_mgkg','EPA_category','GHS_category']]
print(df_train.isna().sum())

df_test = pd.read_excel('EvaluationSet.xlsx', sheet_name=1, header=1)
df_test = df_test.loc[:,['DTXSID','Canonical_QSARr','LD50_mgkg','EPA_category','GHS_category']]
print(df_test.isna().sum())

# --(1) train data
## response: LD50 
train_LD50 = df_train.loc[:,['Canonical_QSARr', 'LD50_mgkg']]
train_LD50 = train_LD50.dropna()
print(train_LD50.info())

## response: CHS_category 
train_GHS = df_train.loc[:,['Canonical_QSARr', 'GHS_category']]
train_GHS = train_GHS.dropna()
print(train_GHS.info())

# --(2) test data
## response: LD50 
test_LD50 = df_test.loc[:,['Canonical_QSARr', 'LD50_mgkg']]
test_LD50 = test_LD50.dropna()
print(test_LD50.info())

## response: CHS_category 
test_GHS = df_test.loc[:,['Canonical_QSARr', 'GHS_category']]
test_GHS = test_GHS.dropna()
print(test_GHS.info())

DTXSID             1912
Canonical_QSARr       0
LD50_mgkg          2260
EPA_category        104
GHS_category         34
dtype: int64
DTXSID             619
Canonical_QSARr      0
LD50_mgkg          753
EPA_category        32
GHS_category        11
dtype: int64
<class 'pandas.core.frame.DataFrame'>
Index: 6734 entries, 2082 to 8993
Data columns (total 2 columns):
 #   Column           Non-Null Count  Dtype  
---  ------           --------------  -----  
 0   Canonical_QSARr  6734 non-null   object 
 1   LD50_mgkg        6734 non-null   float64
dtypes: float64(1), object(1)
memory usage: 157.8+ KB
None
<class 'pandas.core.frame.DataFrame'>
Index: 8960 entries, 1 to 8993
Data columns (total 2 columns):
 #   Column           Non-Null Count  Dtype  
---  ------           --------------  -----  
 0   Canonical_QSARr  8960 non-null   object 
 1   GHS_category     8960 non-null   float64
dtypes: float64(1), object(1)
memory usage: 210.0+ KB
None
<class 'pandas.core.frame.DataFrame'>
Index: 224

In [9]:
# ------- 2. model  

# -- (1) GNN

## data preprocess
def smiles_to_graph(smiles):
    mol = Chem.MolFromSmiles(smiles)
    if not mol:
        return None
    
    # 原子特征（10维）
    atom_features = []
    for atom in mol.GetAtoms():
        feature = [
            atom.GetAtomicNum(),          # 原子序数
            atom.GetDegree(),             # 连接度
            atom.GetImplicitValence(),    # 隐式价
            int(atom.GetIsAromatic()),    # 芳香性
            atom.GetFormalCharge(),       # 形式电荷
            atom.GetNumRadicalElectrons(), # 自由基电子
            atom.GetHybridization().real, # 杂化类型
            atom.GetTotalNumHs(),         # 连接氢数
            int(atom.IsInRing()),         # 是否在环中
            atom.GetChiralTag().real      # 手性
        ]
        atom_features.append(feature)
    
    # 边索引（化学键）
    edge_index = []
    for bond in mol.GetBonds():
        i = bond.GetBeginAtomIdx()
        j = bond.GetEndAtomIdx()
        edge_index.extend([[i, j], [j, i]])  # 无向图
    
    # 边特征（3维）
    edge_attr = []
    for bond in mol.GetBonds():
        bond_features = [
            bond.GetBondTypeAsDouble(),   # 键类型
            int(bond.IsInRing()),         # 是否在环中
            int(bond.GetIsConjugated())   # 是否共轭
        ]
        edge_attr.extend([bond_features, bond_features])
    
    return Data(
        x=torch.tensor(atom_features, dtype=torch.float),
        edge_index=torch.tensor(edge_index, dtype=torch.long).t().contiguous(),
        edge_attr=torch.tensor(edge_attr, dtype=torch.float)
    )

# test 
# smiles = "CCO"  
# graph = smiles_to_graph(smiles)
# print(graph)   
    
## model
class GNNModel(nn.Module):
    def __init__(self, num_node_features=10, hidden_dim=128, output_dim=1):
        super().__init__()
        # 图注意力层
        self.conv1 = GATConv(num_node_features, hidden_dim, heads=3)
        self.conv2 = GATConv(hidden_dim*3, hidden_dim)  # 注意heads=3的输出维度是hidden_dim*3
        self.lin = nn.Linear(hidden_dim, output_dim)
    
    def forward(self, data):
        x, edge_index, batch = data.x, data.edge_index, data.batch
        
        # 1. 图卷积
        x = F.leaky_relu(self.conv1(x, edge_index))
        x = F.dropout(x, p=0.5, training=self.training)
        x = F.leaky_relu(self.conv2(x, edge_index))
        
        # 2. 全局池化
        x = global_mean_pool(x, batch)
        
        # 3. 回归输出
        return self.lin(x).squeeze(-1)


# 初始化模型
model = GNNModel(num_node_features=10, hidden_dim=128)
print(model)

graphs = []
labels = []
for smiles, ld50 in zip(train_LD50['Canonical_QSARr'], train_LD50['LD50_mgkg']):
    g = smiles_to_graph(smiles)
    if g is not None:
        graphs.append(g)
        labels.append(ld50)
labels = torch.tensor(labels, dtype=torch.float)


# 划分训练集/测试集
train_graphs, test_graphs, train_labels, test_labels = train_test_split(
    graphs, labels, test_size=0.2, random_state=42
)

# 创建DataLoader
train_loader = DataLoader(
    list(zip(train_graphs, train_labels)),
    batch_size=32,
    shuffle=True
)

test_loader = DataLoader(
    list(zip(test_graphs, test_labels)),
    batch_size=32
)

# 训练配置
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = model.to(device)
optimizer = torch.optim.Adam(model.parameters(), lr=0.001, weight_decay=1e-4)
# criterion = nn.HuberLoss()  # 对异常值鲁棒的损失函数
criterion = nn.MSELoss()  # 回归任务用均方误差

os.makedirs("results", exist_ok=True)
best_mae = float('inf')

for epoch in range(100):
    
    # 训练
    model.train()
    train_loss = 0
    for data in train_loader:
        graph, label = data
        graph, label = graph.to(device), label.to(device)
        
        optimizer.zero_grad()
        pred = model(graph)
        loss = criterion(pred, label)
        loss.backward()
        optimizer.step()
        train_loss += loss.item()
    
    # 测试
    model.eval()
    preds, truths = [], []
    with torch.no_grad():
        for data in test_loader:
            graph, label = data
            graph = graph.to(device)
            preds.extend(model(graph).cpu().tolist())
            truths.extend(label.tolist())
    
    # 计算指标
    mae = mean_absolute_error(truths, preds)
    r2 = r2_score(truths, preds)
    
    # 保存最佳模型
    if mae < best_mae:
        best_mae = mae
        torch.save({
            'epoch': epoch,
            'model_state_dict': model.state_dict(),
            'metrics': {'mae': mae, 'r2': r2}
        }, "results/best_model.pth")
    
    # 打印进度
    print(f"Epoch {epoch:03d} | "
          f"Train Loss: {train_loss/len(train_loader):.4f} | "
          f"Test MAE: {mae:.4f} | "
          f"R²: {r2:.4f}")

GNNModel(
  (conv1): GATConv(10, 128, heads=3)
  (conv2): GATConv(384, 128, heads=1)
  (lin): Linear(in_features=128, out_features=1, bias=True)
)


[17:28:20] Explicit valence for atom # 1 Si, 5, is greater than permitted


Epoch 000 | Train Loss: 19864507.1257 | Test MAE: 2327.3761 | R²: -0.0145
Epoch 001 | Train Loss: 17174114.4364 | Test MAE: 2337.4070 | R²: -0.0025
Epoch 002 | Train Loss: 17080467.8757 | Test MAE: 2316.0731 | R²: -0.0031
Epoch 003 | Train Loss: 17008167.9046 | Test MAE: 2347.4375 | R²: 0.0004
Epoch 004 | Train Loss: 17037111.5192 | Test MAE: 2372.4215 | R²: 0.0023
Epoch 005 | Train Loss: 16992522.5636 | Test MAE: 2325.8087 | R²: -0.0002
Epoch 006 | Train Loss: 16984820.3447 | Test MAE: 2337.5240 | R²: 0.0020
Epoch 007 | Train Loss: 16957552.3047 | Test MAE: 2360.9991 | R²: 0.0071
Epoch 008 | Train Loss: 17037846.3743 | Test MAE: 2319.4877 | R²: 0.0049
Epoch 009 | Train Loss: 16958678.3964 | Test MAE: 2370.3591 | R²: 0.0092
Epoch 010 | Train Loss: 16921384.6494 | Test MAE: 2325.5972 | R²: 0.0074
Epoch 011 | Train Loss: 16922429.6553 | Test MAE: 2347.9585 | R²: 0.0100
Epoch 012 | Train Loss: 16947939.3003 | Test MAE: 2337.4515 | R²: 0.0102
Epoch 013 | Train Loss: 16966936.5074 | Test MA

In [None]:
# (2) transformer


# 加载预训练化学 Transformer（如 ChemBERTa）
tokenizer = AutoTokenizer.from_pretrained("DeepChem/ChemBERTa-77M-MLM")
model = AutoModelForSequenceClassification.from_pretrained(
    "DeepChem/ChemBERTa-77M-MLM", 
    num_labels=1  # 回归任务
)

# 输入处理
smiles = "CCO"  # 示例
inputs = tokenizer(smiles, return_tensors="pt", padding=True, truncation=True)
outputs = model(**inputs)
ld50_pred = outputs.logits.squeeze()  # 预测值

# (3) mixed model（GNN + describor）
class HybridModel(nn.Module):
    def __init__(self, gnn_hidden_dim=64, desc_dim=200):
        super().__init__()
        # GNN 分支
        self.gnn = GNNModel(hidden_dim=gnn_hidden_dim)
        # 描述符分支
        self.desc_fc = nn.Sequential(
            nn.Linear(desc_dim, 64),
            nn.ReLU(),
            nn.Linear(64, 32)
        )
        # 联合回归头
        self.fc = nn.Linear(gnn_hidden_dim + 32, 1)

    def forward(self, graph_data, descriptors):
        gnn_out = self.gnn(graph_data)            # GNN 特征
        desc_out = self.desc_fc(descriptors)      # 描述符特征
        combined = torch.cat([gnn_out, desc_out], dim=1)
        return self.fc(combined).squeeze()

In [None]:
# ------- 3. train  


In [7]:
# ------- 4. test 



Unnamed: 0,Canonical_QSARr,GHS_category
1,O=C(NC1C=C(Cl)C(Cl)=CC=1)NC1C=CC(Cl)=CC=1,5.0
2,CCCCCCCCC(O)=O,5.0
3,CC12CC(C)CC(CCCCCCCC(O)CC3OC(O)(CC(O)C(C)C=CC(...,1.0
4,CC1(C)OC2CC3C4CC(F)C5=CC(=O)C=CC5(C)C4C(O)CC3(...,1.0
5,[H]N=C(N)NCCCCC1NC(=O)C(C)C(NC(=O)C(CC(C)C)NC(...,2.0
...,...,...
8989,OC(=O)CN(CC(O)=O)CC(O)=O,4.0
8990,CCO,5.0
8991,OCCOC1C=CC=CC=1,4.0
8992,OCCO,5.0
