# Chapter 2 에서 배울 내용

1. **분자 데이터 수집**  
   - 공개 데이터셋 다운로드 및 데이터 구조 확인
     
2. **분자 데이터 변환**  
   - SMILES 코드를 활용하여 Task에 적합한 형태로 데이터 변환  
     
3. **데이터 로더 제작**  
   - 모델 학습을 위한 데이터 로더 설계 및 구현
   - 분자 데이터 분류를 위한 사전 지식 이해



### Ignore warnings
코드 실행에서 불필요한 경고 문구 해제 

In [1]:
import warnings
from rdkit import RDLogger

warnings.simplefilter(action='ignore', category=FutureWarning)
RDLogger.DisableLog('rdApp.*')

### Library Import
코드 실행에 필요한 라이브러리를 호출

In [4]:
import os
import rdkit
import torch
import numpy as np
import pandas as pd

from rdkit import Chem
from deepchem.feat.smiles_tokenizer import BasicSmilesTokenizer
from rdkit.Chem import AllChem, Descriptors, rdDepictor, rdDistGeom, MACCSkeys, rdMolDescriptors

from torch.utils.data import Dataset
from torch_geometric import utils as pyg_utils
from torch_geometric.data import InMemoryDataset, download_url, extract_gz, Data, DataLoader, Batch

# 작업을 위한 별도의 함수 불러오기
from utils.splitters import random_split, scaffold_split ## split
from utils.download_preprocess import CustomMoleculeNet, atom_features, EDGE_FEATURES

print(rdkit.__version__)

2024.03.6


## 1) 분자 데이터 수집

**MoleculeNet**은 화학 및 생물학적 데이터 분석과 모델링을 위한 벤치마크 데이터셋 모음이다.

주로 분자 특성 예측 및 약물 설계와 같은 분야에서 활용되며, 다양한 분자 특성과 데이터 분포를 포함하고 있다.

torch_geometric은 MoleculeNet 데이터를 간단하게 다운로드하고 처리할 수 있도록 지원한다.

이를 편리하게 이용할 수 있도록 본 예제 코드에서는 별도로 제작한 **CustomMoleculeNet** 함수를 이용한다.

MoleculeNet에서 제공하는 데이터셋은 목적에 따라 **Classification**과 **Regression**으로 분류할 수 있다.

아래 표는 MoleculeNet에서 제공하는 주요 데이터셋에 대해 정리한 표다.

---

| 데이터셋 이름   | 문제 유형      | 설명                           |
|-----------------|---------------|--------------------------------|
| **HIV**         | Classification | 항바이러스 물질 활성 예측 (이진 분류) |
| **BACE**        | Classification | 알츠하이머 치료 물질 활성 예측 (이진 분류) |
| **BBBP**        | Classification | 혈액-뇌 장벽 투과성 예측 (이진 분류) |
| **Tox21**       | Classification | 화합물 독성 예측 (이진 분류)   |
| **ToxCast**     | Classification | 생물학적 독성 예측 (이진 분류) |
| **SIDER**       | Classification | 부작용 예측 (다중 클래스 분류) |
| **ClinTox**     | Classification | 독성 및 허가 여부 예측 (이진 분류) |
| **ESOL**        | Regression     | 분자의 용해도 예측            |
| **FreeSolv**    | Regression     | 분자의 수화 에너지 예측       |
| **Lipo**        | Regression     | 소수성(LogD) 예측             |

---

이번 실습에서는 **Classification 작업**을 위한 **BACE 데이터셋**과 **Regression 작업**을 위한 **ESOL 데이터셋**을 다운로드하여 활용한다.

### Data Download

CustomMoleculeNet을 이용하여 원하는 데이터셋을 다운로드 받아보자.

In [6]:
# dataset 다운로드 - smiles와 label을 저장
bace = CustomMoleculeNet('dataset', name='BACE')

Data processed and saved successfully.


Processing...
Done!


BACE(1)

In [None]:
# dataset 다운로드 - smiles와 label을 저장
esol = CustomMoleculeNet('dataset', name='ESOL')

In [4]:
data_list = torch.load('dataset/' + dataset_name + '/processed/smiles_labels.pt')

In [5]:
len(data_list), data_list[0]['smiles'], data_list[0]['label']

(1513,
 'O1CC[C@@H](NC(=O)[C@@H](Cc2cc3cc(ccc3nc2N)-c2ccccc2C)C)CC1(C)C',
 [[1.0]])

### Graph Data로 변환

In [7]:
# smiles를 graph data로 변환
def smiles_to_graph(data_list: list, with_hydrogen: bool = False, kekulize: bool = False) :

    graph_list = []
    for data in data_list:
        smiles = data['smiles']
        label = data['label']
        
        mol = Chem.MolFromSmiles(smiles)
        
        # smiles가 객체로 변환되지 않는 경우 filtering
        if mol is None :
            continue
        else: 
            # mol -> graph
            if with_hydrogen:
                mol = Chem.AddHs(mol)
            if kekulize:
                Chem.Kekulize(mol)
        
            xs: List[List[int]] = []
            for atom in mol.GetAtoms():
                current_atom_feat = atom_features(atom)
                xs.append(current_atom_feat)
        
            x = torch.tensor(xs, dtype=torch.long).view(-1, 133)
        
            edge_indices, edge_attrs = [], []
            for bond in mol.GetBonds():
                i = bond.GetBeginAtomIdx()
                j = bond.GetEndAtomIdx()
        
                edge_feature = [EDGE_FEATURES['possible_bonds'].index(bond.GetBondType())] + [
                    EDGE_FEATURES['possible_bond_dirs'].index(bond.GetBondDir())]
        
                edge_indices += [[i, j], [j, i]]
                edge_attrs += [edge_feature, edge_feature]
        
            edge_index = torch.tensor(edge_indices)
            edge_index = edge_index.t().to(torch.long).view(2, -1)
            edge_attr = torch.tensor(edge_attrs, dtype=torch.long).view(-1, 2)
        
            if edge_index.numel() > 0:  # Sort indices.
                perm = (edge_index[0] * x.size(0) + edge_index[1]).argsort()
                edge_index, edge_attr = edge_index[:, perm], edge_attr[perm]

            # label -> y
            y = torch.tensor([label], dtype=torch.float).view(1, -1)
            graph_list.append(Data(x=x, edge_index=edge_index, edge_attr=edge_attr, smiles=smiles, y=y))
    
    return graph_list

In [8]:
graph_list = smiles_to_graph(data_list)

In [9]:
len(data_list),len(graph_list)

(1513, 1513)

In [10]:
seed = 1
train_dataset, valid_dataset, test_dataset = random_split(graph_list, null_value=0, frac_train=0.8, frac_valid=0.1, frac_test=0.1, seed=seed)

In [11]:
def loader_dataset(data_list, batch_size, shuffle=False):
    """
    DataLoader로 변경
    """
    collate = Batch.from_data_list(data_list)
    loader = DataLoader(data_list, batch_size=batch_size, collate_fn=collate, shuffle=shuffle)

    return loader

In [12]:
batch_size = 256
train_loader = loader_dataset(data_list=train_dataset, batch_size=batch_size, shuffle=False)
valid_loader = loader_dataset(data_list=valid_dataset, batch_size=batch_size, shuffle=False) 
test_loader = loader_dataset(data_list=test_dataset, batch_size=batch_size, shuffle=False)

In [13]:
# loader 확인
for batch in train_loader:
    print(batch)
    break

DataBatch(x=[8647, 133], edge_index=[2, 18706], edge_attr=[18706, 2], y=[256, 1], smiles=[256], batch=[8647], ptr=[257])


### 문자열 token으로 변환

In [14]:
def smiles_to_token(data_list):
    # token 종류 수집
    vocab = []
    max_len = 0
    tokenizer = BasicSmilesTokenizer()
    for data in data_list:
        tokens = tokenizer.tokenize(data['smiles'])
        max_len = max(max_len, len(tokens))
        vocab += tokens
        
    uniq_vocab = sorted(set(vocab))
    smiles_vocab = {v: i for i, v in enumerate(uniq_vocab)}
    smiles_vocab['Unk'] = len(smiles_vocab)

    # token으로 변환
    tokens_list = []
    for data in data_list :
        label = data['label']
        tokens = [smiles_vocab[token] for token in tokenizer.tokenize(data['smiles'])]
        pad_len = max_len -len(tokens)
        tokens = tokens + ([0]*pad_len) 
        
        x = torch.tensor(tokens, dtype=torch.float).unsqueeze(1)
        y = torch.tensor([label], dtype=torch.float).view(1, -1)
        tokens_list.append(Data(x=x ,y=y))
        
    return tokens_list

In [15]:
tokens_list = smiles_to_token(data_list)
for tokens in tokens_list:
    print(f"length of each tokens: {len(tokens.x)}")
    print(tokens)
    break

length of each tokens: 178
Data(x=[178, 1], y=[1, 1])


In [16]:
seed = 1
train_dataset, valid_dataset, test_dataset = random_split(tokens_list, null_value=0, frac_train=0.8, frac_valid=0.1, frac_test=0.1, seed=seed)

In [17]:
batch_size = 256
train_loader = loader_dataset(data_list=train_dataset, batch_size=batch_size, shuffle=False)
valid_loader = loader_dataset(data_list=valid_dataset, batch_size=batch_size, shuffle=False) 
test_loader = loader_dataset(data_list=test_dataset, batch_size=batch_size, shuffle=False)

In [18]:
# loader 확인
for batch in train_loader:
    print(batch)
    break

DataBatch(x=[45568, 1], y=[256, 1], batch=[45568], ptr=[257])


### Fingerprint로 변환

In [19]:
# choice : 'rdkit', 'maccs', 'morgan' 중 변환할  fingerprint 선택
def smiles_to_fingerprint(data_list, choice):
    fp_list = []
    for data in data_list:
        smiles = data['smiles']
        label = data['label']
        
        molecule = Chem.MolFromSmiles(smiles)
        
        if molecule is None :
            continue
        else:     
            if choice == 'rdkit':
                rdkit_fp = Chem.RDKFingerprint(molecule)
                x = rdkit_fp
            elif choice == 'maccs':
                maccs_fp = MACCSkeys.GenMACCSKeys(molecule)
                x = maccs_fp
            elif choice == 'morgan':
                morgan_fp = AllChem.GetMorganFingerpirntAsBitVect(molecule, radius=2, nBits=1024)
                x = morgan_fp
                
            x = torch.tensor(x, dtype=torch.float).unsqueeze(1)
            y = torch.tensor([label], dtype=torch.float).view(1, -1)
            fp_list.append(Data(x=x, y=y))
                           
    return fp_list

In [20]:
# 데이터 불러오기
fp_list = smiles_to_fingerprint(data_list, 'rdkit')

In [21]:
seed = 1
train_dataset, valid_dataset, test_dataset = random_split(fp_list, null_value=0, frac_train=0.8, frac_valid=0.1, frac_test=0.1, seed=seed)

In [22]:
batch_size = 256
train_loader = loader_dataset(data_list=train_dataset, batch_size=batch_size, shuffle=False)
valid_loader = loader_dataset(data_list=valid_dataset, batch_size=batch_size, shuffle=False) 
test_loader = loader_dataset(data_list=test_dataset, batch_size=batch_size, shuffle=False)

In [23]:
# loader 확인
for batch in train_loader:
    print(batch)
    break

DataBatch(x=[524288, 1], y=[256, 1], batch=[524288], ptr=[257])


### Descriptors로 변환

In [25]:
def smiles_to_descriptors(data_list):

    des_list = []
    for data in data_list:
        smiles = data['smiles']
        label = data['label']
        
        molecule = Chem.MolFromSmiles(smiles)

        # filtering
        if molecule is None :
            continue
        else:
            descriptors_dict= Descriptors.CalcMolDescriptors(molecule)
            descriptor_vec = np.array([value for value in descriptors_dict.values()]) # dictionary의 value만 추출하여 vector 생성
            x = torch.tensor(descriptor_vec, dtype=torch.float).unsqueeze(1)
            y = torch.tensor([label], dtype=torch.float).view(1, -1)
            des_list.append(Data(x=x, y=y))
        
    return des_list

In [26]:
des_list = smiles_to_descriptors(data_list)

In [27]:
des_list[0]

Data(x=[210, 1], y=[1, 1])

In [28]:
seed = 1
train_dataset, valid_dataset, test_dataset = random_split(des_list, null_value=0, frac_train=0.8, frac_valid=0.1, frac_test=0.1, seed=seed)

In [29]:
batch_size = 256
train_loader = loader_dataset(data_list=train_dataset, batch_size=batch_size, shuffle=False)
valid_loader = loader_dataset(data_list=valid_dataset, batch_size=batch_size, shuffle=False) 
test_loader = loader_dataset(data_list=test_dataset, batch_size=batch_size, shuffle=False)

In [30]:
for batch in train_loader:
    print(batch)
    break

DataBatch(x=[53760, 1], y=[256, 1], batch=[53760], ptr=[257])


### 3D Graph로 변환

In [31]:
def molecule_to_3d(data_list):
    
    graph3d_list = []
    for data in data_list:
        smiles = data['smiles']
        label = data['label']
        
        molecule = Chem.MolFromSmiles(smiles)

        # filtering
        if molecule is None :
            continue
        else:              
            atom_info = [(atom.GetIdx(), atom.GetSymbol()) for atom in molecule.GetAtoms()]             
            status = rdDistGeom.EmbedMolecule(molecule)
            
            # 3d graph 변환 filtering
            if status != 0: # 0이 아닌 경우 변환 실패
                continue
            else:
                conf = molecule.GetConformer()
                pos = np.array([conf.GetAtomPosition(idx) for idx, symbol in atom_info])
            
                graph_data = pyg_utils.from_smiles(smiles)
                graph_data.pos = pos
                graph_data.y = torch.tensor([label], dtype=torch.float).view(1, -1)
                graph3d_list.append(graph_data)
        
    return graph3d_list

In [32]:
# 데이터 불러오기
graph3d_list = molecule_to_3d(data_list)

In [33]:
seed = 1
train_dataset, valid_dataset, test_dataset = random_split(graph3d_list, null_value=0, frac_train=0.8, frac_valid=0.1, frac_test=0.1, seed=seed)

In [34]:
batch_size = 256
train_loader = loader_dataset(data_list=train_dataset, batch_size=batch_size, shuffle=False)
valid_loader = loader_dataset(data_list=valid_dataset, batch_size=batch_size, shuffle=False) 
test_loader = loader_dataset(data_list=test_dataset, batch_size=batch_size, shuffle=False)

In [35]:
for batch in train_loader:
    print(batch)
    break

DataBatch(x=[8588, 9], edge_index=[2, 18586], edge_attr=[18586, 3], smiles=[256], pos=[256], y=[256, 1], batch=[8588], ptr=[257])
