## 그래프 임베딩 GNN

In [None]:
import os
import random
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset
from torch_geometric.data import Data, DataLoader as GeometricDataLoader
from torch_geometric.nn import GCNConv, global_mean_pool
from rdkit import Chem
from sklearn.metrics import mean_squared_error

# Run configuration: hyperparameters and settings read via CFG[...] by the cells below.
CFG = {
    'BATCH_SIZE': 32,  # Batch size passed to the train/validation/test data loaders
    'HIDDEN_DIM': 128,  # Hidden width handed to GNNModel
    'OUT_DIM': 1,  # Model output size (one regression target per molecule)
    'LR': 0.001,  # Learning rate for the Adam optimizer
    'EPOCHS': 100,  # Number of passes over the training loader
    'SEED': 42,  # Seed passed to seed_everything
    # 'mean', 'max' or 'add'.
    # NOTE(review): no visible code reads this key, so changing it currently has no effect.
    'POOLING': 'mean'
}

def seed_everything(seed):
    """Seed every random number generator this notebook relies on.

    Seeds Python's ``random``, NumPy and PyTorch (including the CUDA
    generator), exports ``PYTHONHASHSEED``, and puts cuDNN into
    deterministic mode with autotuning switched off.

    Args:
        seed: Integer seed applied to all generators.
    """
    os.environ['PYTHONHASHSEED'] = str(seed)

    # Every seeding function takes the same single argument, so apply them in a row.
    for apply_seed in (random.seed, np.random.seed, torch.manual_seed, torch.cuda.manual_seed):
        apply_seed(seed)

    # Trade cuDNN speed for run-to-run reproducibility.
    torch.backends.cudnn.benchmark = False
    torch.backends.cudnn.deterministic = True

seed_everything(CFG['SEED'])  # Fix all random seeds for reproducibility

In [None]:
import torch.nn.functional as F
from torch_geometric.nn import GINConv, global_mean_pool

class AtomEmbedding(nn.Module):
    """Lookup table mapping integer atom indices to embedding vectors.

    Args:
        num_embeddings: Number of rows in the table; valid indices are
            ``0 .. num_embeddings - 1``.
        embedding_dim: Length of each embedding vector.
    """

    def __init__(self, num_embeddings, embedding_dim):
        super().__init__()
        self.embedding = nn.Embedding(num_embeddings=num_embeddings, embedding_dim=embedding_dim)

    def forward(self, atom):
        """Return the embedding row(s) selected by the integer tensor ``atom``."""
        table = self.embedding
        return table(atom)

class BondEmbedding(nn.Module):
    """Lookup table mapping integer bond indices to embedding vectors.

    Args:
        num_embeddings: Number of rows in the table; valid indices are
            ``0 .. num_embeddings - 1``.
        embedding_dim: Length of each embedding vector.
    """

    def __init__(self, num_embeddings, embedding_dim):
        super().__init__()
        self.embedding = nn.Embedding(num_embeddings=num_embeddings, embedding_dim=embedding_dim)

    def forward(self, bond):
        """Return the embedding row(s) selected by the integer tensor ``bond``."""
        table = self.embedding
        return table(bond)

In [None]:
def smiles_to_graph(smiles, atom_embedding, bond_embedding):
    """Convert a SMILES string into a ``torch_geometric`` ``Data`` graph.

    Node features are the ``atom_embedding`` rows indexed by atomic number.
    Every bond contributes two directed edges (i -> j and j -> i), each
    carrying the ``bond_embedding`` row indexed by the truncated bond order.

    The features are computed under ``torch.no_grad()``: they are constant
    inputs, so they must not stay attached to the embedding layers' autograd
    graph (attached features force ``retain_graph=True`` during training
    because the same per-molecule graph is back-propagated every epoch).

    Args:
        smiles: SMILES string of the molecule.
        atom_embedding: Module mapping a 1-D long tensor of atomic numbers to
            a ``(num_atoms, dim)`` tensor.
        bond_embedding: Module mapping a 1-D long tensor of bond indices to a
            ``(num_edges, dim)`` tensor.

    Returns:
        A ``Data`` object with ``x``, ``edge_index`` (shape ``(2, num_edges)``)
        and ``edge_attr``, or ``None`` when RDKit cannot parse ``smiles``.
    """
    mol = Chem.MolFromSmiles(smiles)
    if mol is None:
        return None

    # Nodes (atoms): the atomic number is the row index into the atom table.
    atomic_nums = [atom.GetAtomicNum() for atom in mol.GetAtoms()]

    # Edges (bonds): store both directions so the graph is undirected.
    pairs = []
    bond_indices = []
    for bond in mol.GetBonds():
        i = bond.GetBeginAtomIdx()
        j = bond.GetEndAtomIdx()
        pairs.extend(([i, j], [j, i]))

        # NOTE(review): int() truncates fractional bond orders, so aromatic
        # bonds (reported by RDKit as 1.5) share index 1 with single bonds.
        bond_index = int(bond.GetBondTypeAsDouble())
        bond_indices.extend((bond_index, bond_index))

    # One batched lookup per table; also yields correctly shaped empty
    # tensors for molecules without atoms or bonds (torch.stack([]) raises).
    with torch.no_grad():
        atom_features = atom_embedding(torch.tensor(atomic_nums, dtype=torch.long))
        edge_attr = bond_embedding(torch.tensor(bond_indices, dtype=torch.long))

    # reshape(-1, 2) keeps the (2, 0) layout when there are no bonds.
    edge_index = torch.tensor(pairs, dtype=torch.long).reshape(-1, 2).t().contiguous()

    return Data(x=atom_features, edge_index=edge_index, edge_attr=edge_attr)

In [None]:
class MoleculeDataset(Dataset):
    """Dataset of molecular graphs built from the ``'Smiles'`` column of a dataframe.

    All graphs are created once, at construction time, by calling
    ``smiles_to_graph`` on every row. Whatever that call returns is stored
    unchanged, so rows whose SMILES cannot be parsed are kept as ``None``.

    Args:
        dataframe: Table with a ``'Smiles'`` column and, when ``is_train`` is
            true, a ``'pIC50'`` column.
        atom_embedding: Atom embedding module forwarded to ``smiles_to_graph``.
        bond_embedding: Bond embedding module forwarded to ``smiles_to_graph``.
        is_train: When true, ``'pIC50'`` is loaded as float32 labels and items
            are ``(graph, label)`` pairs; otherwise items are graphs only.
    """

    def __init__(self, dataframe, atom_embedding, bond_embedding, is_train=True):
        self.df = dataframe
        self.atom_embedding = atom_embedding
        self.bond_embedding = bond_embedding

        self.graphs = []
        for smiles in dataframe['Smiles']:
            self.graphs.append(smiles_to_graph(smiles, atom_embedding, bond_embedding))

        self.labels = (
            torch.tensor(dataframe['pIC50'].values, dtype=torch.float32)
            if is_train
            else None
        )

    def __len__(self):
        """Return the number of rows in the underlying dataframe."""
        return len(self.df)

    def __getitem__(self, idx):
        """Return ``(graph, label)`` when labels exist, otherwise just the graph."""
        graph = self.graphs[idx]
        if self.labels is None:
            return graph
        return graph, self.labels[idx]

In [None]:
class GNNModel(nn.Module):
    """Two-layer GIN network that regresses one value per molecular graph.

    Each ``GINConv`` wraps a two-layer MLP. After both convolutions the node
    states are pooled per graph and passed through a linear output layer.
    Edge attributes are not used by this model.

    Args:
        atom_embedding_dim: Size of the input node features.
        hidden_dim: Width of the hidden node representation.
        out_dim: Size of the per-graph output.
        pooling: Graph readout, one of ``'mean'`` (default, the original
            behaviour), ``'max'`` or ``'add'``.

    Raises:
        ValueError: If ``pooling`` is not one of the supported names.
    """

    def __init__(self, atom_embedding_dim, hidden_dim, out_dim, pooling='mean'):
        super().__init__()

        # Imported locally because the file-level imports only provide global_mean_pool.
        from torch_geometric.nn import global_add_pool, global_max_pool

        readouts = {
            'mean': global_mean_pool,
            'max': global_max_pool,
            'add': global_add_pool,
        }
        if pooling not in readouts:
            raise ValueError(
                f"Unsupported pooling {pooling!r}; expected one of {sorted(readouts)}"
            )
        self._pool = readouts[pooling]

        nn1 = nn.Sequential(nn.Linear(atom_embedding_dim, hidden_dim), nn.ReLU(), nn.Linear(hidden_dim, hidden_dim))
        nn2 = nn.Sequential(nn.Linear(hidden_dim, hidden_dim), nn.ReLU(), nn.Linear(hidden_dim, hidden_dim))
        self.conv1 = GINConv(nn1)
        self.conv2 = GINConv(nn2)
        self.fc = nn.Linear(hidden_dim, out_dim)

    def forward(self, data):
        """Return a ``(num_graphs, out_dim)`` prediction for a batched graph.

        Reads ``data.x``, ``data.edge_index`` and ``data.batch``.
        """
        x, edge_index = data.x, data.edge_index
        x = F.relu(self.conv1(x, edge_index))
        x = F.relu(self.conv2(x, edge_index))
        x = self._pool(x, data.batch)  # Collapse node states into one vector per graph
        return self.fc(x)

In [None]:
# Utility: map pIC50 values back to IC50 concentrations
def pIC50_to_IC50(pic50_values):
    """Convert pIC50 values to IC50 (nM).

    Computes ``10 ** (9 - pic50_values)``; works on anything that supports
    subtraction from an int and being used as an exponent (scalars, arrays).

    Args:
        pic50_values: pIC50 value or array of values.

    Returns:
        The corresponding IC50 value(s) in nM, in the same container type.
    """
    # pIC50 is -log10 of the molar IC50; the offset of 9 rescales M to nM.
    exponent = 9 - pic50_values
    return 10 ** exponent


In [None]:
# Embedding tables used to featurise atoms and bonds (both produce 64-d vectors)
atom_embedding_dim = bond_embedding_dim = 64

# Table sizes. smiles_to_graph indexes the atom table by atomic number and the
# bond table by the truncated bond order, so each size must exceed the largest
# index that occurs in the data (adjust if needed).
num_atom_embeddings, num_bond_embeddings = 100, 10

# Atom table is created first, then the bond table (keeps the RNG draw order).
atom_embedding = AtomEmbedding(num_atom_embeddings, atom_embedding_dim)
bond_embedding = BondEmbedding(num_bond_embeddings, bond_embedding_dim)

In [None]:
# Load the training table from Drive
train_data = pd.read_csv('/content/drive/MyDrive/신약개발/train.csv')

# Build a graph for every row, then hold out 30% of the rows for validation
train_dataset = MoleculeDataset(train_data, atom_embedding, bond_embedding)
n_train = int(len(train_dataset) * 0.7)
n_val = len(train_dataset) - n_train
train_dataset, val_dataset = torch.utils.data.random_split(train_dataset, [n_train, n_val])

# Only the training loader shuffles; validation keeps a fixed order
batch_size = CFG['BATCH_SIZE']
train_loader = GeometricDataLoader(train_dataset, batch_size=batch_size, shuffle=True)
val_loader = GeometricDataLoader(val_dataset, batch_size=batch_size, shuffle=False)

In [None]:
# Initialize the model, optimizer, and loss function
model = GNNModel(atom_embedding_dim, hidden_dim=CFG['HIDDEN_DIM'], out_dim=CFG['OUT_DIM'])
optimizer = optim.Adam(model.parameters(), lr=CFG['LR'])
criterion = nn.MSELoss()

# Training loop: one optimizer step per batch, mean batch loss reported per epoch
for epoch in range(CFG['EPOCHS']):
    model.train()
    total_loss = 0
    for data, labels in train_loader:
        # The node features are precomputed inputs. If they are still attached
        # to the embedding layers' autograd graph, the same graph would be
        # back-propagated every epoch, which only works with retain_graph=True.
        # Those layers are not in the optimizer, so detaching changes no result
        # and lets a plain backward() be used.
        data.x = data.x.detach()

        optimizer.zero_grad()  # Clear gradients left over from the previous step
        # squeeze(-1) drops only the output dimension; a bare squeeze() would
        # also drop the batch dimension when the last batch holds one sample.
        preds = model(data).squeeze(-1)
        loss = criterion(preds, labels)
        loss.backward()
        optimizer.step()
        total_loss += loss.item()

    avg_loss = total_loss / len(train_loader)
    print(f'Epoch [{epoch+1}/{CFG["EPOCHS"]}], Loss: {avg_loss:.4f}')


In [None]:
# Validation: predict the held-out split and report RMSE on the IC50 (nM) scale
model.eval()
val_preds = []
val_labels = []
with torch.no_grad():
    for data, labels in val_loader:
        # squeeze(-1) keeps the batch dimension. A bare squeeze() turns a final
        # batch of one sample into a 0-dim tensor, which torch.cat rejects.
        preds = model(data).squeeze(-1)
        val_preds.append(preds)
        val_labels.append(labels)

val_preds = torch.cat(val_preds).numpy()
val_labels = torch.cat(val_labels).numpy()

# Both labels and predictions are pIC50; convert to IC50 before scoring.
mse = mean_squared_error(pIC50_to_IC50(val_labels), pIC50_to_IC50(val_preds))
rmse = np.sqrt(mse)
print(f'Validation RMSE: {rmse}')

In [None]:
# Testing: build unlabeled graphs for the test table and predict pIC50 for each row
test_data = pd.read_csv('/content/drive/MyDrive/신약개발/test.csv')
test_dataset = MoleculeDataset(test_data, atom_embedding, bond_embedding, is_train=False)
# shuffle=False keeps predictions in the same order as the rows of test.csv
test_loader = GeometricDataLoader(test_dataset, batch_size=CFG['BATCH_SIZE'], shuffle=False)

test_preds = []
model.eval()
with torch.no_grad():
    for data in test_loader:
        # squeeze(-1) keeps the batch dimension. A bare squeeze() turns a final
        # batch of one sample into a 0-dim tensor, which torch.cat rejects.
        preds = model(data).squeeze(-1)
        test_preds.append(preds)

test_preds = torch.cat(test_preds).numpy()

In [None]:
# Fill the sample submission with the predicted IC50 (nM) values and save it
submission = pd.read_csv('/content/drive/MyDrive/신약개발/sample_submission.csv')
submission = submission.assign(IC50_nM=pIC50_to_IC50(test_preds))
submission.to_csv('/content/drive/MyDrive/신약개발/gcn_submit_graph.csv', index=False)