# 22th trial / 240923
- Chemberta-v1
- new train unique / test canonical
- loss function: **CUSTOM**
- relu (X)
- descriptor 10개
- batch 64 / learning rate 1e-6
- o1-preview
- epoch 1000

In [None]:
import numpy as np
import torch
import torch.nn as nn
from transformers import RobertaTokenizer, RobertaForSequenceClassification, get_cosine_schedule_with_warmup
from torch.utils.data import DataLoader, Dataset
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from rdkit import Chem
from rdkit.Chem import Descriptors
from tqdm import tqdm
import pandas as pd

# 데이터 로드
train_data = pd.read_csv('../data/train_new_canonical_unique.csv')
test_data = pd.read_csv('../data/test_canonical.csv')

# 다양한 분자 설명자를 계산하는 함수
def compute_descriptors(smiles):
    mol = Chem.MolFromSmiles(smiles)
    descriptors = []
    if mol is not None:
        descriptors.extend([
            Descriptors.MolWt(mol),
            Descriptors.NumHAcceptors(mol),
            Descriptors.NumHDonors(mol),
            Descriptors.TPSA(mol),
            Descriptors.MolLogP(mol),
            Descriptors.NumRotatableBonds(mol),
            Descriptors.NumAliphaticRings(mol),
            Descriptors.NumAromaticRings(mol),
            Descriptors.FractionCSP3(mol),
            Descriptors.HeavyAtomCount(mol)
            # 필요한 다른 설명자 추가 가능
        ])
    else:
        descriptors = [0] * 10  # 설명자 수에 맞게 0으로 채움
    return descriptors

# 커스텀 데이터셋 클래스 정의
class SMILESDataset(Dataset):
    def __init__(self, smiles, descriptors, labels=None, tokenizer=None, max_length=512):
        self.smiles = smiles
        self.descriptors = descriptors
        self.labels = labels
        self.tokenizer = tokenizer
        self.max_length = max_length

    def __len__(self):
        return len(self.smiles)

    def __getitem__(self, idx):
        smile = self.smiles[idx]
        inputs = self.tokenizer(smile, max_length=self.max_length, padding='max_length', truncation=True, return_tensors='pt')
        inputs = {key: val.squeeze(0) for key, val in inputs.items()}

        descriptor = torch.tensor(self.descriptors[idx], dtype=torch.float)
        inputs['descriptors'] = descriptor

        if self.labels is not None:
            label = torch.tensor(self.labels[idx], dtype=torch.float)
            return inputs, label
        return inputs

# 커스텀 모델 정의 (ReLU 제거)
class CustomChemBERTaModel(nn.Module):
    def __init__(self, base_model, descriptor_size):
        super(CustomChemBERTaModel, self).__init__()
        self.base_model = base_model  # RobertaForSequenceClassification
        
        # 분자 설명자 처리용 FC 레이어
        self.descriptor_fc = nn.Sequential(
            nn.Linear(descriptor_size, 128),
            nn.ReLU(),
            nn.LayerNorm(128)
        )
        
        # BERT 출력과 분자 설명자를 결합하는 FC 레이어
        self.fc = nn.Sequential(
            nn.Linear(base_model.config.hidden_size + 128, 64),
            nn.ReLU(),
            nn.LayerNorm(64),
            nn.Linear(64, 1)
        )
        
        # 출력 활성화 함수 제거 (ReLU 제거)
    
    def forward(self, input_ids, attention_mask, descriptors):
        # BERT 모델의 출력 얻기
        bert_outputs = self.base_model.roberta(input_ids=input_ids, attention_mask=attention_mask).last_hidden_state
        bert_cls_output = bert_outputs[:, 0, :]  # [CLS] 토큰의 표현
        
        # 분자 설명자 처리
        descriptor_outputs = self.descriptor_fc(descriptors)
        
        # BERT 출력과 분자 설명자 결합
        combined = torch.cat((bert_cls_output, descriptor_outputs), dim=1)
        output = self.fc(combined)
        
        return output  # ReLU 제거로 음수 값도 예측 가능

# model list
chembert = 'seyonec/ChemBERTa-zinc-base-v1'
chembert77m = 'DeepChem/ChemBERTa-77M-MTR'

# 데이터 전처리
tokenizer = RobertaTokenizer.from_pretrained(chembert)
train_smiles = train_data['can_smiles'].values
train_labels = train_data['IC50_nM'].values  # IC50 값은 nM 단위로 제공
test_smiles = test_data['can_smiles'].values

# 레이블 로그 변환 (epsilon 추가)
epsilon = 1e-6
train_labels_log = np.log(train_labels + epsilon)
train_labels_ic50 = np.exp(train_labels_log) - epsilon  # IC50_nM 값 복원

# 전체 훈련 데이터에서 y_range_global 계산
y_range_global = train_labels_ic50.max() - train_labels_ic50.min()

# 분자 설명자 계산
train_descriptors = [compute_descriptors(s) for s in train_smiles]
test_descriptors = [compute_descriptors(s) for s in test_smiles]

# 설명자에 대한 정규화/표준화
scaler = StandardScaler()
train_descriptors = scaler.fit_transform(train_descriptors)  # Train descriptors를 기준으로 fit
test_descriptors = scaler.transform(test_descriptors)  # Test descriptors는 같은 스케일로 변환

# 데이터셋 분할
train_smiles, val_smiles, train_descriptors, val_descriptors, train_labels_log, val_labels_log = train_test_split(
    train_smiles, train_descriptors, train_labels_log, test_size=0.2, random_state=42)

train_dataset = SMILESDataset(train_smiles, train_descriptors, train_labels_log, tokenizer)
val_dataset = SMILESDataset(val_smiles, val_descriptors, val_labels_log, tokenizer)
test_dataset = SMILESDataset(test_smiles, test_descriptors, None, tokenizer)

train_loader = DataLoader(train_dataset, batch_size=46, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=46)
test_loader = DataLoader(test_dataset, batch_size=46)

In [None]:
# 모델 로드 및 초기화
descriptor_size = 10  # 사용한 설명자의 수에 맞게 설정
base_model = RobertaForSequenceClassification.from_pretrained(chembert, num_labels=1)
model = CustomChemBERTaModel(base_model, descriptor_size)

# 모델을 디바이스로 이동
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model.to(device)

# 옵티마이저 및 학습률 스케줄러 정의
optimizer = torch.optim.AdamW(model.parameters(), lr=5e-4)
num_epochs = 500
num_training_steps = num_epochs * len(train_loader)
scheduler = get_cosine_schedule_with_warmup(
    optimizer=optimizer, num_warmup_steps=int(0.1 * num_training_steps), num_training_steps=num_training_steps
)

# 커스텀 손실 함수 수정
def custom_loss(predictions_log_ic50, targets_log_ic50, y_range_global, k=10):
    # IC50_nM 단위로 변환
    predictions_ic50 = torch.exp(predictions_log_ic50)
    targets_ic50 = torch.exp(targets_log_ic50)
    
    # A 계산 (NRMSE)
    mse = torch.mean((predictions_ic50 - targets_ic50) ** 2)
    rmse = torch.sqrt(mse)
    y_range = torch.tensor(y_range_global, device=targets_ic50.device)
    epsilon = 1e-6  # 분모가 0이 되는 것을 방지
    nrmse = rmse / (y_range + epsilon)
    A = nrmse
    
    # B 계산 (Correct Ratio)
    # pIC50로 변환
    predictions_pIC50 = 9 - torch.log10(predictions_ic50 + epsilon)
    targets_pIC50 = 9 - torch.log10(targets_ic50 + epsilon)
    errors = torch.abs(predictions_pIC50 - targets_pIC50)
    B_values = torch.sigmoid(k * (0.5 - errors))
    B = torch.mean(B_values)
    
    # 손실 함수 계산 (수정됨)
    loss = A + (1 - B)
    return loss



# 검증 함수 정의
def validate(model, val_loader, device, y_range_global):
    model.eval()
    val_loss = 0
    with torch.no_grad():
        for batch in val_loader:
            inputs, labels_log = batch
            for key in inputs:
                inputs[key] = inputs[key].to(device)
            descriptors = inputs.pop('descriptors').to(device)
            labels_log = labels_log.to(device)
            outputs = model(**inputs, descriptors=descriptors).squeeze()
            loss = custom_loss(outputs, labels_log, y_range_global)
            val_loss += loss.item()
    return val_loss / len(val_loader)

# 학습 루프 (조기 종료 포함)
best_val_loss = float('inf')
patience_counter = 0
patience = 20

In [None]:
for epoch in range(num_epochs):
    model.train()
    train_loss = 0

    for batch in tqdm(train_loader, desc=f"Epoch {epoch+1}/{num_epochs}"):
        inputs, labels_log = batch
        for key in inputs:
            inputs[key] = inputs[key].to(device)
        descriptors = inputs.pop('descriptors').to(device)
        labels_log = labels_log.to(device)

        outputs = model(**inputs, descriptors=descriptors).squeeze()
        loss = custom_loss(outputs, labels_log, y_range_global)
        train_loss += loss.item()

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

    # 스케줄러 스텝
    scheduler.step()

    # 평균 학습 손실 계산
    avg_train_loss = train_loss / len(train_loader)

    # 검증 손실 계산
    val_loss = validate(model, val_loader, device, y_range_global)

    # 현재 에포크의 손실 출력
    print(f"Epoch {epoch+1}/{num_epochs}, Training Loss: {avg_train_loss:.6f}, Validation Loss: {val_loss:.6f}")

    # 조기 종료 로직
    if val_loss < best_val_loss:
        best_val_loss = val_loss
        patience_counter = 0
        torch.save(model.state_dict(), 'chembert_22th.pt')  # 베스트 모델 저장
        print(f"Validation loss improved, saving model at epoch {epoch+1}")
    else:
        patience_counter += 1
        print(f"No improvement in validation loss, patience counter: {patience_counter}/{patience}")
        if patience_counter >= patience:
            print(f"Early stopping at epoch {epoch+1}")
            break

In [None]:
# 테스트 데이터에 대한 예측
model.load_state_dict(torch.load('chembert_22th.pt'))  # 베스트 모델 로드

model.eval()
predictions = []

with torch.no_grad():
    for batch in tqdm(test_loader):
        inputs = batch
        for key in inputs:
            inputs[key] = inputs[key].to(device)
        descriptors = inputs.pop('descriptors').to(device)
        outputs = model(**inputs, descriptors=descriptors)
        preds = outputs.squeeze().cpu().numpy()
        predictions.extend(np.atleast_1d(preds))

# 예측값을 지수화하여 IC50_nM 단위로 복원
predictions_ic50 = np.exp(predictions) - epsilon  # epsilon을 빼줌

# 예측 결과를 테스트 데이터에 저장
test_data['IC50_nM'] = predictions_ic50
test_data[['ID', 'IC50_nM']].to_csv('../data/results/chembert_22th.csv', index=False) #0.5694023363	

print("====================================Prediction Complete====================================")