In [1]:
import pandas as pd
import numpy as np
import torch
from sklearn.model_selection import StratifiedKFold, StratifiedShuffleSplit
from sklearn.metrics import accuracy_score, roc_auc_score, average_precision_score, precision_score, recall_score, f1_score
from sklearn.preprocessing import StandardScaler
from sklearn.svm import SVC
from rdkit import Chem
from rdkit.Chem import AllChem, QED, DataStructs
import joblib
import os

# Load the raw dataset from CSV (preprocessing happens in the steps below)
df = pd.read_csv(
    filepath_or_buffer='/data/home/dbswn0814/2025JCM/data/single task/sto_data.csv',
)

def preprocess_dataframe(df):
    """Remove rows whose SMILES cannot be turned into a molecule.

    Each value of the 'SMILES' column is converted to ``str`` and passed to
    ``Chem.MolFromSmiles``; molecules that parse are additionally run through
    ``Chem.SanitizeMol``. Rows that fail either step are reported with
    ``print`` and dropped.

    Parameters
    ----------
    df : pandas.DataFrame
        Input table with a 'SMILES' column. It is not modified.

    Returns
    -------
    tuple
        ``(df_clean, mols)``: ``df_clean`` is a re-indexed (0..n-1) copy of
        ``df`` without the rejected rows, and ``mols`` holds the molecule of
        every kept row in the same order as ``df_clean``.
    """
    # Work on a fresh, positionally indexed frame so that the positions
    # collected below can be used directly as index labels for drop().
    df_copy = df.reset_index(drop=True)
    mols, none_list = [], []
    for i, raw_smiles in enumerate(df_copy['SMILES']):
        mol = Chem.MolFromSmiles(str(raw_smiles))
        if mol is None:
            none_list.append(i)
            print(f'Index {i}: None SMILES, 제외')
            continue
        try:
            Chem.SanitizeMol(mol)
        except Exception:
            # Catch Exception rather than using a bare except, so that
            # KeyboardInterrupt / SystemExit still stop the notebook instead
            # of being reported as an invalid SMILES.
            none_list.append(i)
            print(f'Index {i}: 유효하지 않은 SMILES, 제외')
        else:
            mols.append(mol)
    if none_list:
        df_copy = df_copy.drop(index=none_list).reset_index(drop=True)
    return df_copy, mols

# Run preprocessing: keep only the rows whose SMILES parse, plus their molecules
df_clean, mols = preprocess_dataframe(df)
# Every step below pairs mols[i] with row i of df_clean, so the two must line up.
assert len(mols) == len(df_clean), 'mols and df_clean are misaligned'

# Morgan fingerprint settings, defined once so the requested bit-vector size
# and the NumPy buffer size used below cannot drift apart.
FP_RADIUS = 2
FP_N_BITS = 1024

# Build one Morgan fingerprint bit vector per molecule and convert it to NumPy
fps = [AllChem.GetMorganFingerprintAsBitVect(mol, FP_RADIUS, nBits=FP_N_BITS) for mol in mols]
arr_list = []
for bit in fps:
    arr = np.zeros((FP_N_BITS,), dtype=np.int8)
    DataStructs.ConvertToNumpyArray(bit, arr)
    arr_list.append(arr)
x_fingerprint = np.array(arr_list, dtype=np.float32)

# Compute QED.properties for every molecule and standardise the columns.
# NOTE(review): the scaler is fitted on all molecules here, i.e. before the
# cross-validation split performed later in this file, so rows that end up in
# validation/test folds contribute to the mean/std (data leakage). Consider
# fitting the scaler on the training part of each fold instead.
qe_props = [QED.properties(mol) for mol in mols]
qe_df = pd.DataFrame(qe_props)
scaler = StandardScaler()
qe_scaled = scaler.fit_transform(qe_df)
qe_scaled_df = pd.DataFrame(qe_scaled, columns=qe_df.columns)

# Concatenate fingerprint bits and scaled QED columns, then attach the label
features = np.hstack((x_fingerprint, qe_scaled_df.values))
final_df = pd.concat(
    [pd.DataFrame(features), df_clean['sto'].reset_index(drop=True)],
    axis=1,
).dropna()  # drop rows with a missing value in any feature or in 'sto'

X = final_df.drop('sto', axis=1).values
y = final_df['sto'].values

# Outer stratified K-fold setup
n_splits = 20
outer_cv = StratifiedKFold(n_splits=n_splits, shuffle=True, random_state=42)

save_dir = './processed'
os.makedirs(save_dir, exist_ok=True)

# Inner 90/10 train/validation splitter. It has an integer random_state, so
# every split() call is deterministic and one instance can serve all folds.
sss = StratifiedShuffleSplit(n_splits=1, test_size=0.1, random_state=0)

for fold, (train_idx, test_idx) in enumerate(outer_cv.split(X, y), start=1):
    print(f'=== Fold {fold} ===')
    X_train, X_test = X[train_idx], X[test_idx]
    y_train, y_test = y[train_idx], y[test_idx]

    # Save the held-out test fold (it is only stored here, not scored)
    test_data = {'features': torch.tensor(X_test), 'labels': torch.tensor(y_test)}
    test_save_path = os.path.join(save_dir, f'SVM_test_fold{fold}.pt')
    torch.save(test_data, test_save_path)
    print(f'Test : {test_save_path}')

    # Train/validation split; n_splits=1 means split() yields exactly one pair
    tr_idx, val_idx = next(sss.split(X_train, y_train))
    X_tr, X_val = X_train[tr_idx], X_train[val_idx]
    y_tr, y_val = y_train[tr_idx], y_train[val_idx]

    # Save the train and validation parts
    train_save_path = os.path.join(save_dir, f'SVM_train_fold{fold}.pt')
    torch.save({'features': torch.tensor(X_tr), 'labels': torch.tensor(y_tr)}, train_save_path)
    print(f'Train : {train_save_path}')

    val_save_path = os.path.join(save_dir, f'SVM_val_fold{fold}.pt')
    torch.save({'features': torch.tensor(X_val), 'labels': torch.tensor(y_val)}, val_save_path)
    print(f'Validation : {val_save_path}')

    # Class imbalance: weight the positive class by the negative/positive ratio
    neg_count = int(np.sum(y_tr == 0))
    pos_count = int(np.sum(y_tr == 1))
    if neg_count == 0 or pos_count == 0:
        # Without this guard the ratio below would be 0 or a division by zero,
        # producing a meaningless class weight instead of a clear error.
        raise ValueError(
            f'Fold {fold}: training split needs both labels 0 and 1 '
            f'(found {neg_count} negatives, {pos_count} positives)'
        )
    scale_pos_weight = neg_count / pos_count

    # Train the SVM
    svm_model = SVC(
        C=1,
        kernel='rbf',
        gamma='scale',
        probability=True,
        class_weight={0: 1, 1: scale_pos_weight},
        random_state=42
    )

    svm_model.fit(X_tr, y_tr)
    print('Trained SVM with params:', svm_model.get_params())

    # Evaluate on the validation part
    y_val_pred = svm_model.predict(X_val)
    y_val_proba = svm_model.predict_proba(X_val)[:, 1]  # column 1 = second entry of classes_
    val_metrics = {
        'accuracy': accuracy_score(y_val, y_val_pred),
        'roc_auc': roc_auc_score(y_val, y_val_proba),
        'avg_precision': average_precision_score(y_val, y_val_proba),
        'precision': precision_score(y_val, y_val_pred),
        'recall': recall_score(y_val, y_val_pred),
        'f1': f1_score(y_val, y_val_pred)
    }
    print(f'Validation Metrics: {val_metrics}')
    joblib.dump(val_metrics, os.path.join(save_dir, f'SVM_val_metrics_fold{fold}.pkl'))

    # Save the fitted model
    model_path = os.path.join(save_dir, f'svm_model_fold{fold}.pkl')
    joblib.dump(svm_model, model_path)
    print(f'Fold {fold} save: {model_path}\n')

print('fin.')

=== Fold 1 ===
Test : ./processed/SVM_test_fold1.pt
Train : ./processed/SVM_train_fold1.pt
Validation : ./processed/SVM_val_fold1.pt
Trained SVM with params: {'C': 1, 'break_ties': False, 'cache_size': 200, 'class_weight': {0: 1, 1: 0.8481012658227848}, 'coef0': 0.0, 'decision_function_shape': 'ovr', 'degree': 3, 'gamma': 'scale', 'kernel': 'rbf', 'max_iter': -1, 'probability': True, 'random_state': 42, 'shrinking': True, 'tol': 0.001, 'verbose': False}
Validation Metrics: {'accuracy': 0.696969696969697, 'roc_auc': 0.7833333333333333, 'avg_precision': 0.8165615191119597, 'precision': 0.75, 'recall': 0.6666666666666666, 'f1': 0.7058823529411765}
Fold 1 save: ./processed/svm_model_fold1.pkl

=== Fold 2 ===
Test : ./processed/SVM_test_fold2.pt
Train : ./processed/SVM_train_fold2.pt
Validation : ./processed/SVM_val_fold2.pt
Trained SVM with params: {'C': 1, 'break_ties': False, 'cache_size': 200, 'class_weight': {0: 1, 1: 0.8481012658227848}, 'coef0': 0.0, 'decision_function_shape': 'ovr',