In [None]:
%matplotlib inline
import os
import math
import dgl
import dgl.function as fn
from dgl import DGLGraph
from collections import namedtuple
import networkx as nx
import torch
import torch.nn.functional as F
from torch.nn.functional import cosine_similarity
import torch.optim as optim
import torch.nn as nn
from torch.nn import Linear
import numpy as np
import pandas as pd
from torch_geometric.data import Data, Batch
from torch_geometric.nn.conv import GCNConv
from rdkit import Chem
from rdkit import DataStructs
from rdkit.Chem import AllChem, Draw, DataStructs, RDConfig
from rdkit.Chem.Draw import IPythonConsole
from rdkit.Chem.rdmolops import GetAdjacencyMatrix, Get3DDistanceMatrix
from rdkit.Chem.Descriptors import rdMolDescriptors
from sklearn.preprocessing import normalize
from sklearn.preprocessing import StandardScaler
from sklearn.ensemble import RandomForestClassifier
from e3fp.fingerprint.generate import fp, fprints_dict_from_mol
from e3fp.conformer.generate import generate_conformers
import matplotlib.pyplot as plt
from torch.utils.data import DataLoader, Dataset, TensorDataset
from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_squared_error, mean_absolute_error
from sklearn.decomposition import PCA
import seaborn as sns
from sklearn.metrics import roc_curve
from sklearn import metrics
IPythonConsole.ipython_useSVG=True


In [None]:
# Run on the GPU when CUDA is available, otherwise fall back to the CPU.
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")

In [None]:
# Load the three CSV inputs from ./dataset:
#   df               - main table (later cells read its 'mol_weight' and 'IC50 (nM)' columns)
#   df_E3FPTC        - pair table (later cells read its 'i', 'j' and 'E3FPTC' columns)
#   df_E3FP_features - feature table (later turned, as a whole, into the node feature tensor)
df = pd.read_csv(filepath_or_buffer='./dataset/BindingDB_PubChem_3D_dataset.csv')
df_E3FPTC = pd.read_csv(filepath_or_buffer='./dataset/BindingDB_PubChem_3D_E3FPTC_dataset.csv')
df_E3FP_features = pd.read_csv(
    filepath_or_buffer='./dataset/BindingDB_PubChem_3D_E3FP_features_dataset.csv'
)
# Show the feature table as the cell output.
df_E3FP_features

In [None]:
# Display the main table read from BindingDB_PubChem_3D_dataset.csv.
df

In [None]:
# Display the pair table read from BindingDB_PubChem_3D_E3FPTC_dataset.csv.
df_E3FPTC

In [None]:
# Build the edge list of the graph.
# Every row of df_E3FPTC names two endpoints in its 'i' and 'j' columns; an edge
# between them is stored as the pair [i, j].
# The list is later converted into the `edge_index` connectivity of a PyG `Data` object.
# Both columns are extracted in one vectorised step instead of doing a label-based
# Series lookup per row inside a Python loop (slow, and dependent on the frame
# having a default 0..n-1 index).
adj = df_E3FPTC[['i', 'j']].values.tolist()
# Preview only the first few pairs; the complete list can be very long.
adj[:5]

In [None]:
class GCN(nn.Module):
    """Two-layer graph convolutional network that returns per-node log-probabilities.

    Args:
        in_features: width of the node feature vectors fed to the first layer.
        hidden_features: width of the hidden representation between the layers.
        num_classes: width of the output, over which the log-softmax is taken.
    """

    def __init__(self, in_features, hidden_features, num_classes):
        super().__init__()
        # First convolution: input features -> hidden representation.
        self.gc1 = GCNConv(in_features, hidden_features)
        # Second convolution: hidden representation -> one score per class.
        self.gc2 = GCNConv(hidden_features, num_classes)

    def forward(self, data):
        """Run both convolutions on `data` and return log-softmax scores.

        Only `data.x` and `data.edge_index` are read.
        NOTE(review): `data.edge_attr` is not passed to the convolutions, so any
        edge weights stored there do not influence the output - confirm this is intended.

        Returns:
            Tensor with one row per node, log-softmax-normalised along dim 1.
        """
        hidden = self.gc1(data.x, data.edge_index)
        # Dropout is only active while the module is in training mode.
        hidden = F.dropout(F.relu(hidden), training=self.training)
        scores = self.gc2(hidden, data.edge_index)
        return F.log_softmax(scores, dim=1)

In [None]:
# Extract the 'mol_weight' and 'IC50 (nM)' columns of `df` as float32 tensors.
mol_weight_values = df['mol_weight'].values.astype(float)
ic50_values = df['IC50 (nM)'].values.astype(float)
mol_weight = torch.tensor(mol_weight_values, dtype=torch.float32)
ic50 = torch.tensor(ic50_values, dtype=torch.float32)


In [None]:
# Integer tensor with one [i, j] pair per row, i.e. shape (number of pairs, 2) at this point.
edge_index = torch.tensor(adj, dtype=torch.int64)
# Show the tensor as the cell output.
edge_index

In [None]:
# 'E3FPTC' value of every pair as a float32 tensor, in the same row order as
# the [i, j] pairs taken from the same frame.
e3fptc_values = df_E3FPTC['E3FPTC'].values.astype(float)
edge_attr = torch.tensor(e3fptc_values, dtype=torch.float32)
edge_attr

In [None]:
# Node feature matrix: every column of df_E3FP_features cast to float32.
# NOTE(review): all columns are used as features - confirm the CSV carries no
# identifier/index column that should be dropped first.
feature_values = df_E3FP_features.values.astype(float)
x = torch.tensor(feature_values, dtype=torch.float32)
x.shape

In [None]:
# Labels: the IC50 tensor is used as the per-node target `y`.
y = ic50
y

In [None]:
# Convert the pair list to the layout accepted by the PyG `Data` class:
# shape (2, number of pairs), stored contiguously.
# The tensor is rebuilt from `adj` instead of transposing `edge_index` in place,
# so running this cell more than once cannot flip it back to (number of pairs, 2).
edge_index = torch.tensor(adj, dtype=torch.long).t().contiguous()
edge_index

In [None]:
x_len = x.shape[0]
# Random split of the nodes: 80% train, 10% test, remaining ~10% validation.
# One seeded permutation is cut into three consecutive chunks, so the subsets are
# disjoint by construction. Drawing the test indices independently from all nodes
# (as done previously) let most test nodes also be training nodes, and left
# validation with whatever was in neither draw rather than 10%.
SPLIT_SEED = 42  # fixed so that the split is reproducible across kernel restarts
split_rng = np.random.default_rng(SPLIT_SEED)
shuffled_idx = split_rng.permutation(x_len)

n_train = int(0.8 * x_len)
n_test = int(0.1 * x_len)

train_idx = shuffled_idx[:n_train]
train_mask = torch.zeros(x_len, dtype=torch.bool)
train_mask[train_idx] = True

test_idx = shuffled_idx[n_train:n_train + n_test]
test_mask = torch.zeros(x_len, dtype=torch.bool)
test_mask[test_idx] = True

# Validation takes every node that is in neither of the other two subsets.
val_mask = torch.zeros(x_len, dtype=torch.bool)
val_mask[~train_mask & ~test_mask] = True

# Sanity checks: the three masks must partition the node set.
assert not (train_mask & test_mask).any(), "train and test nodes overlap"
assert int(train_mask.sum() + test_mask.sum() + val_mask.sum()) == x_len

train_mask, val_mask, test_mask

In [None]:
# Print the shape of each mask, in the order train, test, validation.
print(*(mask.shape for mask in (train_mask, test_mask, val_mask)))

In [None]:
# Bundle node features, labels, connectivity, edge attributes and the three
# split masks into a single PyG `Data` object.
# The object is moved to `device` because the model is moved there as well;
# leaving the data on the CPU makes the first forward pass fail on a CUDA machine.
data = Data(
    x=x,
    y=y,
    edge_index=edge_index,
    edge_attr=edge_attr,
    train_mask=train_mask,
    test_mask=test_mask,
    val_mask=val_mask,
).to(device)

In [None]:
# Print a summary of the graph: the object itself, three structural checks,
# then node / edge counts and feature widths.
print(data)
for structural_check in (data.is_directed, data.has_self_loops, data.has_isolated_nodes):
    print(structural_check())
for size_attr in ('num_nodes', 'num_edges', 'num_node_features', 'num_edge_features'):
    print(getattr(data, size_attr))

In [None]:
# Build the model from the input feature width, the hidden width and the output width.
# NOTE(review): the output width is 2 (log-softmax over two classes) while the
# label `y` is taken from the 'IC50 (nM)' column - confirm that the labels
# really are class indices 0/1.
model = GCN(
    in_features=data.num_features,
    hidden_features=16,
    num_classes=2,
)
model = model.to(device)
# Optimizer with learning rate `lr`:
# too small -> convergence is slow; too large -> a local optimum can be overshot.
# `weight_decay` is the regularisation coefficient, used against overfitting.
optimizer = optim.Adam(params=model.parameters(), lr=0.01, weight_decay=5e-4)

# Number of training epochs.
epochs = 100

In [None]:
# Train the model
def train(model, data, mask):
    """Run one optimisation step on the nodes selected by `mask`.

    Uses the module-level `optimizer`. The loss is the negative log-likelihood
    between the model output and `data.y` cast to integer class indices.
    NOTE(review): `data.y` is cast with `.long()`; if the labels are continuous
    values rather than class indices this truncates them - confirm.

    Args:
        model: module called as `model(data)`; its output is indexed by `mask`.
        data: object providing the label tensor `y`.
        mask: index/boolean mask choosing the nodes that contribute to the loss.

    Returns:
        The loss of this step as a Python float.
    """
    model.train()
    optimizer.zero_grad()
    log_probs = model(data)
    targets = data.y[mask].long()
    loss = F.nll_loss(log_probs[mask], targets)
    loss.backward()
    optimizer.step()
    return loss.item()

In [None]:
# Evaluation function
def evaluate(model, data, mask):
    """Compute RMSE and MAE between the model output and `data.y` on `mask`.

    The model is switched to eval mode and called without gradient tracking.
    The metrics are computed on CPU float64 copies of the selected values, so
    the function also works when the model and data live on a GPU (tensors on
    a CUDA device cannot be handed to NumPy-based metric functions).

    Args:
        model: module called as `model(data)`; must return a 2-D tensor.
        data: object providing the label tensor `y`.
        mask: index/boolean mask choosing the nodes to score.

    Returns:
        Tuple `(rmse, mae)` of Python floats.

    Raises:
        ValueError: if `mask` selects no nodes, or if the numbers of selected
            predictions and labels differ.
    """
    model.eval()
    with torch.no_grad():
        y_pred = model(data)
    y_true = data.y
    # Keep only column 0 so that the prediction has the same shape as y_true.
    # NOTE(review): when the model ends in log-softmax, column 0 is the
    # log-probability of class 0, not a predicted label value - confirm that
    # this is the quantity the metrics should compare against `y`.
    y_pred = y_pred[:, 0]

    predicted = y_pred[mask].detach().cpu().double().reshape(-1)
    expected = y_true[mask].detach().cpu().double().reshape(-1)
    if expected.numel() == 0:
        raise ValueError("mask selects no nodes; RMSE/MAE are undefined")
    if predicted.shape != expected.shape:
        raise ValueError(
            "prediction/label size mismatch: {} vs {}".format(
                tuple(predicted.shape), tuple(expected.shape)))

    error = predicted - expected
    mae = error.abs().mean().item()
    rmse = math.sqrt(error.pow(2).mean().item())

    return rmse, mae

In [None]:
# Per-epoch training loss; the training loop appends one value per epoch.
train_losses = list()

# Per-epoch validation / test metrics, filled by the training loop as well.
val_rmse_list, test_rmse_list = list(), list()
val_mae_list, test_mae_list = list(), list()

In [None]:

# Train for `epochs` epochs; after every epoch score the validation and test
# nodes and record all values in the history lists.
for epoch in range(1, epochs + 1):
    train_loss = train(model, data, data.train_mask)
    train_losses.append(train_loss)

    val_rmse, val_mae = evaluate(model, data, data.val_mask)
    val_rmse_list.append(val_rmse)
    val_mae_list.append(val_mae)

    test_rmse, test_mae = evaluate(model, data, data.test_mask)
    test_rmse_list.append(test_rmse)
    test_mae_list.append(test_mae)

    # Report only every fifth epoch.
    if epoch % 5 != 0:
        continue
    print('Epoch {:03d} ———— Train Loss: {:.4f}'.format(epoch, train_loss))
    for template, value in (
        ('Val RMSE: {:.4f}', val_rmse),
        ('Test RMSE: {:.4f}', test_rmse),
        ('Val MAE: {:.4f}', val_mae),
        ('Test MAE: {:.4f}', test_mae),
    ):
        print(template.format(value))

    

In [None]:
# Plot the training loss recorded for every epoch.
fig, ax = plt.subplots(figsize=(10, 8))
ax.plot(train_losses, label='Train Loss')
# Axis labels and title.
ax.set_xlabel('Epoch')
ax.set_ylabel('Loss')
ax.set_title('Train Loss Curve')
ax.legend()
plt.show()

In [None]:
# Validation and test error curves per epoch: MAE on the left, RMSE on the right.
fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(12, 4))

# One entry per panel: axes, y-axis label, title, and the (label, history) curves to draw.
panels = (
    (ax1, 'MAE', 'MAE value in validation and test',
     (('val_MAE', val_mae_list), ('test_MAE', test_mae_list))),
    (ax2, 'RMSE', 'RMSE value in validation and test',
     (('val_RMSE', val_rmse_list), ('test_RMSE', test_rmse_list))),
)
for axis, metric_name, panel_title, curves in panels:
    for curve_label, history in curves:
        axis.plot(np.arange(len(history)), history, label=curve_label)
    axis.set_title(panel_title)
    axis.set_xlabel('Epoch')
    axis.set_ylabel(metric_name)
    axis.legend()

plt.show()