In [None]:
import numpy as np
import torch
import torch.nn as nn
from torch.utils.data import DataLoader
from sklearn.model_selection import train_test_split
import pandas as pd
from sklearn.model_selection import KFold
from sklearn.metrics import accuracy_score, f1_score, recall_score, roc_auc_score, precision_score, average_precision_score
from sklearn.metrics import r2_score, mean_squared_error, root_mean_squared_error, mean_absolute_error, mean_absolute_percentage_error
from scipy.stats import pearsonr
from utils import EarlyStopping, load_fingerprints, MLP, CustomDataset
from deepchem.data import NumpyDataset
from deepchem.splits import ScaffoldSplitter

# --- Experiment configuration -------------------------------------------------
nBits = 1024       # passed to load_fingerprints(...) and MLP(...) in later cells
num_epochs = 300   # upper bound on training epochs per fold
k_folds = 5        # number of folds
patience = 10      # handed to EarlyStopping(patience=...)

# Input locations; left empty here and expected to be filled in before the
# data-loading cell is run.
file_path = ''
file_fingerprint = ''

# --- Device selection ---------------------------------------------------------
device = 'cuda' if torch.cuda.is_available() else 'cpu'

# --- Reproducibility ----------------------------------------------------------
# Seed the global NumPy and PyTorch random number generators.
np.random.seed(777)
torch.manual_seed(777)

# Dedicated, fixed-seed generator intended for reproducible shuffling.
g = torch.Generator()
g.manual_seed(777)

if device == 'cuda':
    # Seed the current GPU and every other visible GPU (multi-GPU setups),
    # and ask cuDNN for deterministic kernels instead of auto-tuned ones.
    torch.cuda.manual_seed(777)
    torch.cuda.manual_seed_all(777)
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = False

In [None]:
def calculate_metrics(y_true, y_pred, threshold=0.5):
    """Compute binary-classification metrics from predicted scores.

    Args:
        y_true: Ground-truth binary labels. Any array-like is accepted
            (list, NumPy array, column vector); it is flattened to 1-D.
        y_pred: Predicted scores for the positive class, one per label.
            Any array-like is accepted; it is flattened to 1-D.
        threshold: Scores strictly greater than this value are counted as
            the positive class for the thresholded metrics.

    Returns:
        dict mapping "Accuracy", "Precision", "Recall", "F1 Score",
        "ROC AUC" and "PR AUC" to their values. The first four use the
        thresholded class predictions; the two AUC metrics use the raw scores.
    """
    # Coerce up front so plain lists and (N, 1) model outputs work the same
    # way as flat NumPy arrays (a list has no .astype, and scikit-learn does
    # not treat a column vector like a 1-D label array).
    y_true = np.asarray(y_true).ravel()
    y_pred = np.asarray(y_pred).ravel()

    # Turn scores into hard 0/1 predictions.
    y_pred_class = (y_pred > threshold).astype(int)

    accuracy = accuracy_score(y_true, y_pred_class)
    precision = precision_score(y_true, y_pred_class)
    recall = recall_score(y_true, y_pred_class)
    f1 = f1_score(y_true, y_pred_class)
    # Ranking metrics are threshold-free and therefore use the raw scores.
    roc_auc = roc_auc_score(y_true, y_pred)
    pr_auc = average_precision_score(y_true, y_pred)

    return {
        "Accuracy": accuracy,
        "Precision": precision,
        "Recall": recall,
        "F1 Score": f1,
        "ROC AUC": roc_auc,
        "PR AUC": pr_auc
    }

In [None]:
# Model / optimizer hyperparameters consumed by the training cell.
drop_rate = 0.2
lr = 1e-2
weight_decay = 1e-3

# Load the raw table and the precomputed fingerprints.
# NOTE(review): the literal 2 is presumably the Morgan radius — confirm
# against utils.load_fingerprints.
data = pd.read_csv(file_path, low_memory=False)
fingerprints = load_fingerprints(nBits, 2, file_fingerprint)

target_endpoint = 'BBB_logbb(cls)'
smiles_column = 'st_smiles'  # column holding the SMILES strings

# Keep only rows that have both a label and a SMILES string.
data_task = data.dropna(subset=[target_endpoint, smiles_column])
# Fingerprint rows are selected with the surviving row labels, which assumes
# `fingerprints` is aligned row-for-row with the CSV — TODO confirm.
X_morgan_np = fingerprints[data_task.index]
y = np.array(data_task[target_endpoint])
smiles = data_task[smiles_column].tolist()

# Wrap in a DeepChem NumpyDataset; the SMILES go in as ids so the scaffold
# splitter can use them.
dataset = NumpyDataset(X_morgan_np, y, ids=smiles)

splitter = ScaffoldSplitter()

# Scaffold-based hold-out split (train : test = 9 : 1).
train_dataset, test_dataset = splitter.train_test_split(dataset=dataset, frac_train=0.9, seed=42)

# Scaffold-based train/validation folds built from the training portion.
# The fold count comes from the `k_folds` setting in the config cell instead
# of a second hard-coded 5.
# NOTE(review): `generate_scaffold_splits` is neither imported nor defined in
# the cells visible here; it must exist in the kernel or this line raises
# NameError on a fresh Restart & Run All.
train_valid_folds = generate_scaffold_splits(train_dataset, n_splits=k_folds, seed=42)

# Held-out test arrays, shared by every fold.
X_test, y_test = test_dataset.X, test_dataset.y

print(f"Test size: {len(test_dataset)}")

# Report the size of each train/validation fold.
for i, (train_fold, valid_fold) in enumerate(train_valid_folds):
    print(f"Fold {i + 1}: Train size = {len(train_fold)}, Valid size = {len(valid_fold)}")

def _evaluate(model, loader, criterion=None):
    """Run `model` over `loader` in eval mode without tracking gradients.

    Args:
        model: Network to evaluate; it is switched to eval mode.
        loader: DataLoader yielding (inputs, labels) batches.
        criterion: Optional loss function. When given, the per-batch loss is
            averaged over the number of batches.

    Returns:
        Tuple (preds, targets, mean_loss): two flat NumPy arrays with the
        model outputs and the labels, and the mean batch loss (None when no
        criterion was supplied).
    """
    model.eval()
    preds, targets = [], []
    total_loss = 0.0
    with torch.no_grad():
        for inputs, labels in loader:
            inputs = inputs.to(device)
            # Labels get a trailing dimension to line up with the model
            # output (assumes the model emits shape (batch, 1) — TODO confirm).
            labels = labels.to(device).float().unsqueeze(1)
            outputs = model(inputs)
            if criterion is not None:
                total_loss += criterion(outputs, labels).item()
            preds.extend(outputs.cpu().numpy().flatten())
            targets.extend(labels.cpu().numpy().flatten())
    mean_loss = total_loss / len(loader) if criterion is not None else None
    return np.array(preds), np.array(targets), mean_loss


# Train one model per train/validation fold and score each on the shared test set.
fold_results = []    # per-fold test metrics
valid_results = []   # per-fold validation metrics

# The test set is the same for every fold, so its loader is built once.
test_loader = DataLoader(CustomDataset(X_test, y_test), batch_size=64, shuffle=False)

for fold_idx, (train_fold, valid_fold) in enumerate(train_valid_folds):
    print(f"\n=== Fold {fold_idx + 1} ===")

    # Data loaders. The seeded generator `g` from the config cell drives the
    # training shuffle so the batch order is reproducible.
    train_loader = DataLoader(
        CustomDataset(train_fold.X, train_fold.y),
        batch_size=64,
        shuffle=True,
        generator=g,
    )
    val_loader = DataLoader(
        CustomDataset(valid_fold.X, valid_fold.y),
        batch_size=64,
        shuffle=False,
    )

    # Fresh model, optimizer and loss for every fold.
    model = MLP(nBits, drop_rate).to(device)
    optimizer = torch.optim.Adam(model.parameters(), lr=lr, weight_decay=weight_decay)
    # Binary cross-entropy on probabilities (assumes MLP ends in a sigmoid — TODO confirm).
    criterion = nn.BCELoss().to(device)

    early_stopping = EarlyStopping(patience=patience, delta=0.001)

    # Snapshot of the weights with the lowest validation loss seen so far.
    # Early stopping only fires `patience` epochs after the best epoch, so
    # without this the reported metrics would come from a worse model.
    best_val_loss = float('inf')
    best_state = None

    for epoch in range(num_epochs):
        # ---- training pass ----
        model.train()
        running_loss = 0.0

        for inputs, labels in train_loader:
            optimizer.zero_grad()
            outputs = model(inputs.to(device))
            loss = criterion(outputs, labels.to(device).float().unsqueeze(1))
            loss.backward()
            optimizer.step()
            running_loss += loss.item()

        # ---- validation pass ----
        _, _, val_loss = _evaluate(model, val_loader, criterion)
        print(f'Epoch {epoch + 1}, Validation Loss: {val_loss:.4f}, Train Loss: {running_loss / len(train_loader):.4f}')

        if val_loss < best_val_loss:
            best_val_loss = val_loss
            # state_dict() returns references to the live tensors; clone them
            # so later optimizer steps cannot overwrite the snapshot.
            best_state = {name: tensor.detach().clone() for name, tensor in model.state_dict().items()}

        # Early-stopping check.
        early_stopping(val_loss)
        if early_stopping.early_stop:
            print(f"Early stopping at epoch {epoch + 1}")
            break

    # Evaluate the best checkpoint rather than the last-epoch weights.
    if best_state is not None:
        model.load_state_dict(best_state)

    # Validation metrics.
    val_preds, val_targets, _ = _evaluate(model, val_loader)
    val_metrics = calculate_metrics(val_targets, val_preds)
    valid_results.append(val_metrics)
    print(f"Validation Metrics (Fold {fold_idx + 1}): {val_metrics}")

    # Test metrics.
    test_preds, test_targets, _ = _evaluate(model, test_loader)
    test_metrics = calculate_metrics(test_targets, test_preds)
    fold_results.append(test_metrics)
    print(f"Test Metrics (Fold {fold_idx + 1}): {test_metrics}")

# Collect the per-fold test metrics as: metric name -> list of per-fold values.
metrics_keys = fold_results[0].keys()
summary_results = {
    key: [result[key] for result in fold_results]
    for key in metrics_keys
}

# Mean and standard deviation of every test metric across folds.
print("\n=== Test Results Summary ===")
for key in metrics_keys:
    values = summary_results[key]
    mean_val, std_val = np.mean(values), np.std(values)
    print(f"{key}: {mean_val:.4f} ± {std_val:.4f}")

# Same summary for the validation metrics.
print("\n=== Validation Results Summary ===")
for key in valid_results[0]:
    fold_values = [res[key] for res in valid_results]
    mean_val = np.mean(fold_values)
    std_val = np.std(fold_values)
    print(f"{key}: {mean_val:.4f} ± {std_val:.4f}")
