In [39]:
import pandas as pd
import numpy as np
import random
from datetime import datetime

import torch
from lightning import pytorch as pl
from lightning.pytorch.callbacks import EarlyStopping
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import train_test_split

import optuna
from optuna.pruners import MedianPruner
from optuna.integration import PyTorchLightningPruningCallback

from chemprop.data import MoleculeDatapoint, MoleculeDataset
from chemprop.nn import BondMessagePassing, MeanAggregation, SumAggregation, NormAggregation, AttentiveAggregation, RegressionFFN
from chemprop.models import MPNN
from chemprop.data import build_dataloader
from chemprop.nn.transforms import UnscaleTransform, ScaleTransform
from chemprop.nn.metrics import RMSE
from chemprop.featurizers import SimpleMoleculeMolGraphFeaturizer

from rdkit import Chem
from rdkit.Chem import Descriptors, rdMolDescriptors
from mordred import Calculator, descriptors

# Оптимизация гиперпараметров и параметрического пространства

Этот Jupyter notebook посвящён оптимизации гиперпараметров и выбора дескрипторов для модели Chemprop, используемой для предсказания молекулярного свойства LogP. Применяется библиотека Optuna для автоматического подбора оптимальных гиперпараметров и подмножеств дескрипторов, чтобы минимизировать метрику RMSE.

In [40]:
# Самые влиятельные дескрипторы, отобраны в feature_selection.ipynb
DESCRIPTORS = [
    'FpDensityMorgan1', 'FpDensityMorgan2', 'HeavyAtomMolWt', 'MolWt', 'NumHDonors',
    'NumHAcceptors', 'fr_COO', 'fr_Al_COO', 'fr_Ar_N','fr_Al_OH', 'fr_Ar_NH', 'fr_quatN', 'BertzCT','NumRings', 'NumAromaticRings', 'NumAromaticCarbocycles', 'NumHeteroatoms', 'TPSA', 'LabuteASA', 'CalcKappa3', 'SLogP', 'PEOE_VSA6', 'PEOE_VSA1', 'C2SP2', 'ZMIC1', 'ZMIC2', 'NaasC', 'NsNh2', 'NsOH', 'NdO', 'nX', 'nN', 'VSA_EState6', 'SMR_VSA7', 'AATS1p', 'AATS1v', 'AATS1i', 'AATS2p', 'AATS2v', 'AATS1se', 'AATS0se', 'AATS4s', 'MATS1Z', 'MATS2d', 'MATS1m', 'MATS1pe', 'MATS1se', 'SlogP_VSA7', 'SlogP_VSA8', 'nAcid', 'nBase', 'FilterItLogS', 'nBondsM', 'MIC0', 'EState_VSA5', 'IC5', 'IC0'
    ]

## Очистка данных
Стандартный pipeline очистки, включающий:
1) Удаление выбросов IQR-методом по целевому признаку LogP.
2) Фильтрация некорректных SMILES.
3) Удаление дубликатов SMILES.

In [41]:
def iqr_remove_outliers(df):
    logp_values = df['LogP']

    Q1 = np.percentile(logp_values, 25)
    Q3 = np.percentile(logp_values, 75)
    IQR = Q3 - Q1

    lower_bound = Q1 - 1.5 * IQR
    upper_bound = Q3 + 1.5 * IQR

    df_clear = df[(logp_values >= lower_bound) & (logp_values <= upper_bound)]
    print(f"Number of outliers removed: {len(df) - len(df_clear)}")
    return df_clear


def remove_invalid_molecules(df):
    invalid_smiles_indices = []
    for index, row in df.iterrows():
        try:
            MoleculeDatapoint.from_smi(row['SMILES'], row['LogP'])
        except Exception as e:
            invalid_smiles_indices.append(index)

    df_clear = df.drop(invalid_smiles_indices)
    print(f"Number of invalid SMILES removed: {len(invalid_smiles_indices)}")
    return df_clear

def remove_duplicate_molecules(df):
   smiles_counts = df['SMILES'].value_counts()
   duplicates = smiles_counts[smiles_counts > 1].index
   df_clear = df[~df['SMILES'].isin(duplicates)]

   print(f"Number of duplicate SMILES removed: {len(duplicates)}")
   return df_clear

In [42]:
df = pd.read_csv('./data/final_train_data80.csv')
df = iqr_remove_outliers(df)
df = remove_invalid_molecules(df)
df = remove_duplicate_molecules(df)

Number of outliers removed: 401


[02:13:57] SMILES Parse Error: syntax error while parsing: CC(C)MC1=C(C(N=C2NNN=N2)=O)SC(OC)=C1Br
[02:13:57] SMILES Parse Error: check for mistakes around position 6:
[02:13:57] CC(C)MC1=C(C(N=C2NNN=N2)=O)SC(OC)=C1Br
[02:13:57] ~~~~~^
[02:13:57] SMILES Parse Error: Failed parsing SMILES 'CC(C)MC1=C(C(N=C2NNN=N2)=O)SC(OC)=C1Br' for input: 'CC(C)MC1=C(C(N=C2NNN=N2)=O)SC(OC)=C1Br'
[02:13:57] Explicit valence for atom # 8 N, 4, is greater than permitted
[02:13:57] Explicit valence for atom # 9 C, 5, is greater than permitted
[02:13:57] SMILES Parse Error: extra open parentheses while parsing: N(C(N=S)CC
[02:13:57] SMILES Parse Error: check for mistakes around position 2:
[02:13:57] N(C(N=S)CC
[02:13:57] ~^
[02:13:57] SMILES Parse Error: Failed parsing SMILES 'N(C(N=S)CC' for input: 'N(C(N=S)CC'
[02:13:57] SMILES Parse Error: syntax error while parsing: C1=CC2(C3C(CC4(C)CmO)(C(=O)CO)CCC4C3CCC2=CC1=O)=O)C
[02:13:57] SMILES Parse Error: check for mistakes around position 19:
[02:13:57] C1=CC2

Number of invalid SMILES removed: 430
Number of duplicate SMILES removed: 963


## Вычисление молекулярных дескрипторов
В данном блоке подгружаем необходимые дескрипторы

In [43]:
def add_top_descriptors(df, selected_descriptors):
    rdkit_funcs = {
        'FpDensityMorgan1': Descriptors.FpDensityMorgan1,
        'FpDensityMorgan2': Descriptors.FpDensityMorgan2,
        'HeavyAtomMolWt': Descriptors.HeavyAtomMolWt,
        'MolWt': Descriptors.MolWt,
        'NumHDonors': Descriptors.NumHDonors,
        'NumHAcceptors': Descriptors.NumHAcceptors,
        'fr_COO': Chem.Fragments.fr_COO,
        'fr_Al_COO': Chem.Fragments.fr_Al_COO,
        'fr_Ar_N': Chem.Fragments.fr_Ar_N,
        'fr_Al_OH': Chem.Fragments.fr_Al_OH,
        'fr_Ar_NH': Chem.Fragments.fr_Ar_NH,
        'fr_quatN': Chem.Fragments.fr_quatN,
        'BertzCT': Chem.GraphDescriptors.BertzCT,
    }

    rdmol_funcs = {
        'NumRings': rdMolDescriptors.CalcNumRings,
        'NumAromaticRings': rdMolDescriptors.CalcNumAromaticRings,
        'NumAromaticCarbocycles': rdMolDescriptors.CalcNumAromaticCarbocycles,
        'NumHeteroatoms': rdMolDescriptors.CalcNumHeteroatoms,
        'TPSA': rdMolDescriptors.CalcTPSA,
        'LabuteASA': rdMolDescriptors.CalcLabuteASA,
        'CalcKappa3': rdMolDescriptors.CalcKappa3,
    }

    mordred_dict = {
        'SLogP': descriptors.SLogP,
        'PEOE_VSA6': descriptors.MoeType.PEOE_VSA(6),
        'PEOE_VSA1': descriptors.MoeType.PEOE_VSA(1),
        'C2SP2': descriptors.CarbonTypes.CarbonTypes(2, 2),
        'ZMIC1': descriptors.InformationContent.ZModifiedIC(1),
        'ZMIC2': descriptors.InformationContent.ZModifiedIC(2),
        'NaasC': descriptors.EState.AtomTypeEState ('count', 'aasC'),
        'NsNh2': descriptors.EState.AtomTypeEState ('count', 'sNH2'),
        'NsOH': descriptors.EState.AtomTypeEState ('count', 'sOH'),
        'NdO': descriptors.EState.AtomTypeEState ('count', 'dO'),
        'nX': descriptors.AtomCount.AtomCount('X'),
        'nN': descriptors.AtomCount.AtomCount('N'),
        'VSA_EState6': descriptors.MoeType.VSA_EState(6),
        'SMR_VSA7': descriptors.MoeType.SMR_VSA(7),
        'AATS1p': descriptors.Autocorrelation.AATS(1, 'p'),
        'AATS1v': descriptors.Autocorrelation.AATS(1, 'v'),
        'AATS1i': descriptors.Autocorrelation.AATS(1, 'i'),
        'AATS2p': descriptors.Autocorrelation.AATS(2, 'p'),
        'AATS2v': descriptors.Autocorrelation.AATS(2, 'v'),
        'AATS1se': descriptors.Autocorrelation.AATS(1, 'se'),
        'AATS0se': descriptors.Autocorrelation.AATS(0, 'se'),
        'AATS4s': descriptors.Autocorrelation.AATS(4, 's'),
        'MATS1Z': descriptors.Autocorrelation.MATS(1, 'Z'),
        'MATS2d': descriptors.Autocorrelation.MATS(2, 'd'),
        'MATS1m': descriptors.Autocorrelation.MATS(1, 'm'),
        'MATS1pe': descriptors.Autocorrelation.MATS(1, 'pe'),
        'MATS1se': descriptors.Autocorrelation.MATS(1, 'se'),
        'SlogP_VSA7': descriptors.MoeType.SlogP_VSA(7),
        'SlogP_VSA8': descriptors.MoeType.SlogP_VSA(8),
        'nAcid': descriptors.AcidBase.AcidicGroupCount(),
        'nBase': descriptors.AcidBase.BasicGroupCount(),
        'FilterItLogS': descriptors.LogS.LogS(),
        'nBondsM': descriptors.BondCount.BondCount('multiple', False),
        'MIC0': descriptors.InformationContent.ModifiedIC(0),
        'EState_VSA5': descriptors.MoeType.EState_VSA(5),
        'IC5': descriptors.InformationContent.InformationContent(5),
        'IC0': descriptors.InformationContent.InformationContent(0)
    }

    rdkit_selected = {k: f for k, f in rdkit_funcs.items() if k in selected_descriptors}
    rdmol_selected = {k: f for k, f in rdmol_funcs.items() if k in selected_descriptors}
    mordred_selected = {k: f for k, f in mordred_dict.items() if k in selected_descriptors}

    # Подготовка Mordred-калькулятора
    mordred_calc = Calculator(list(mordred_selected.values()), ignore_3D=True) if mordred_selected else None

    def compute_all(smiles):
        mol = Chem.MolFromSmiles(smiles)
        if mol is None:
            return {name: np.nan for name in selected_descriptors}

        result = {}

        for name, func in rdkit_selected.items():
            result[name] = func(mol)

        for name, func in rdmol_selected.items():
            result[name] = func(mol)

        if mordred_calc:
            mordred_vals = mordred_calc(mol)
            for i, name in enumerate(mordred_selected.keys()):
                result[name] = mordred_vals[i]

        return result

    descriptors_df = df['SMILES'].apply(compute_all).apply(pd.Series)
    df_final = pd.concat([df, descriptors_df], axis=1)

    return df_final

def clean_data(df):
   df = df.replace([np.inf, -np.inf], np.nan).dropna(axis=1)
   df_clear = df.rename(str, axis="columns") 
   return df_clear

In [None]:
df = add_top_descriptors(df, DESCRIPTORS)
df = clean_data(df)

Так как заранее подготовленных валидационных данных у нас нет, создаём набор из тренировочных данных. Так как таргетная переменная - непрерывные значения, то используем псевдо-стратификацию данных, чтобы не было дисбаланса. Дробим данные на 20 групп (квантилей) и проводим разделение на train и valid, используя train_test_split.

In [None]:
# Создаём квазикатегориальную переменную для стратификации
df['stratify_bins'] = pd.qcut(df['LogP'], q=20)  # q=кол-во квантилей (групп)

train_df, val_df = train_test_split(
    df, test_size=0.2, random_state=666, stratify=df['stratify_bins']
)

train_df = train_df.drop(columns='stratify_bins')
val_df = val_df.drop(columns='stratify_bins')

Создаём объекты MoleculeDatapoint. Chemprop требует данные в формате MoleculeDatapoint для обработки молекулярных графов.

In [None]:
def create_molecule_datapoints(df, smiles_column, target_columns=None, descriptor_columns=None):
    if target_columns is not None:
        ys = df[target_columns].to_numpy()

    all_data = []
    for i, (_, row) in enumerate(df.iterrows()):
        if descriptor_columns is not None and descriptor_columns != []:
            descriptors = np.array(row[descriptor_columns], dtype=float).tolist()

            if target_columns is not None:
                datapoint = MoleculeDatapoint.from_smi(row[smiles_column], ys[i], x_d=np.array(descriptors))
            else:
                datapoint = MoleculeDatapoint.from_smi(row[smiles_column], x_d=np.array(descriptors))
        else:
            if target_columns is not None:
                datapoint = MoleculeDatapoint.from_smi(row[smiles_column], ys[i])
            else:
                datapoint = MoleculeDatapoint.from_smi(row[smiles_column])
        all_data.append(datapoint)

    return all_data

## Optuna
Функция оптимизирует гиперпараметры и дескрипторы, используя Optuna с TPE и MedianPruner.
Гиперпараметры:
agg_type: Тип агрегации (mean, sum, norm, attentive).
bias: Использование смещения в слоях (True/False).
d_h: Размер скрытых слоёв в MPNN (300–2000).
hidden_dim: Размер скрытых слоёв в FFN (1000–4000).
depth: Глубина MPNN (2–8).
ffn_num_layers: Количество слоёв FFN (2–10).
batch_norm: Использование нормализации пакета (True/False).
selected_descriptors: Бинарный выбор дескрипторов.

TPE с multivariate=True: Учитывает корреляции между параметрами, улучшая эффективность поиска.
MedianPruner: Останавливает неперспективные испытания.
EarlyStopping: Предотвращает переобучение, останавливая обучение, если validation loss не улучшается 7 эпох.

In [None]:
def optimize_chemprop_model(train_df, val_df, descriptor_cols, target_col='LogP', n_trials=100, seed=666):
    """
    Оптимизирует гиперпараметры модели Chemprop и выбор дескрипторов с помощью Optuna.

    Параметры:
    - train_df: pandas DataFrame с тренировочными данными (SMILES, target_col, descriptor_cols)
    - val_df: pandas DataFrame с валидационными данными (SMILES, target_col, descriptor_cols)
    - descriptor_cols: список дескрипторов для выбора
    - target_col: имя целевой колонки
    - n_trials: количество испытаний Optuna
    - seed: начальное число для воспроизводимости

    Возвращает:
    - best_params: словарь с лучшими гиперпараметрами и выбранными дескрипторами
    - best_value: лучшая метрика (RMSE на валидационной выборке)
    """

    def objective(trial):
        # Установка seed для воспроизводимости
        random.seed(seed)
        np.random.seed(seed)
        torch.manual_seed(seed)
        torch.cuda.manual_seed_all(seed)
        pl.seed_everything(seed)

        # Подбор гиперпараметров
        agg_type = trial.suggest_categorical('agg_type', ['mean', 'sum', 'norm', 'attentive'])
        bias = trial.suggest_categorical('bias', [True, False])
        d_h = trial.suggest_categorical('d_h', [300, 400, 500, 600, 700, 800, 900, 1000, 1250, 1500, 1750, 2000])
        hidden_dim = trial.suggest_categorical('hidden_dim', [1000, 1250, 1500, 1600, 1700, 1800, 1900, 2000, 2250, 2500, 2750, 3000, 3250, 3500, 3750, 4000])
        depth = trial.suggest_int('depth', 2, 8)
        ffn_num_layers = trial.suggest_int('ffn_num_layers', 2, 10)
        batch_norm = trial.suggest_categorical('batch_norm', [True, False])

        # Подбор дескрипторов
        selected_descriptors = [desc for desc in descriptor_cols if trial.suggest_int(f'{desc}', 0, 1)]

        # Нормализация целевой переменной
        logp_scaler = StandardScaler()
        logp_scaler.fit(train_df[[target_col]])

        # Нормализация дескрипторов, если они выбраны
        if selected_descriptors:
            desc_scaler = StandardScaler()
            desc_scaler.fit(train_df[selected_descriptors])
            X_d_transform = ScaleTransform.from_standard_scaler(desc_scaler)
        else:
            desc_scaler = None
            X_d_transform = None

        # Создание датасетов
        train_datapoints = create_molecule_datapoints(train_df, 'SMILES', ['LogP'], selected_descriptors)
        val_datapoints = create_molecule_datapoints(val_df, 'SMILES', ['LogP'], selected_descriptors)

        featurizer = SimpleMoleculeMolGraphFeaturizer()

        train_dataset = MoleculeDataset(train_datapoints, featurizer)
        target_scaler = train_dataset.normalize_targets(logp_scaler)
        descriptors_scaler = train_dataset.normalize_inputs("X_d", desc_scaler)

        val_dataset = MoleculeDataset(val_datapoints, featurizer)
        val_dataset.normalize_targets(target_scaler)
        val_dataset.normalize_inputs("X_d", descriptors_scaler)
        
        num_workers = 10 if torch.cuda.is_available() else 0
        train_loader = build_dataloader(train_dataset, num_workers=num_workers)
        val_loader = build_dataloader(val_dataset, num_workers=num_workers, shuffle=False)

        # Настройка модели
        mp = BondMessagePassing(d_h=d_h, depth=depth, bias=bias)

        if agg_type == 'attentive':
            agg = AttentiveAggregation(output_size=d_h)
        else:
            agg_map = {
                'mean': MeanAggregation,
                'sum': SumAggregation,
                'norm': NormAggregation
            }
            agg = agg_map[agg_type]()

        input_dim = mp.output_dim + len(selected_descriptors) if selected_descriptors else mp.output_dim

        ffn = RegressionFFN(
            input_dim=input_dim,
            hidden_dim=hidden_dim,
            n_layers=ffn_num_layers,
            criterion=RMSE(),
            output_transform=UnscaleTransform.from_standard_scaler(target_scaler)
        )

        mpnn = MPNN(mp, agg, ffn, batch_norm, [RMSE()], X_d_transform=X_d_transform)
        
        # Настройка тренера
        early_stop_callback = EarlyStopping(monitor='val_loss', patience=7, mode='min')
        
        # Класс-обёртка, без неё может подтекать
        class OptunaPruning(PyTorchLightningPruningCallback, pl.Callback):
            def __init__(self, *args, **kwargs):
                super().__init__(*args, **kwargs)
        
        pruner = OptunaPruning(trial, monitor='val_loss')
        trainer = pl.Trainer(
            max_epochs=40,
            accelerator='auto',
            devices=1,
            enable_progress_bar=False,
            enable_checkpointing=False,
            callbacks=[early_stop_callback, pruner],
            logger=False,
            deterministic=True
        )

        # Обучение
        trainer.fit(mpnn, train_loader, val_loader)

        # Получение метрики
        val_rmse = trainer.callback_metrics['val_loss'].item()
        print(f'Validation RMSE: {val_rmse}')
        return val_rmse

    # Создание и запуск оптимизации
    study = optuna.create_study(direction='minimize', sampler=optuna.samplers.TPESampler(seed=seed, multivariate=True, group=True), pruner=MedianPruner(n_startup_trials=10, n_warmup_steps=5, interval_steps=1))
    study.optimize(objective, n_trials=n_trials)

    # Получение лучших параметров
    best_trial = study.best_trial
    best_params = best_trial.params
    selected_descriptors = [desc for desc in descriptor_cols if best_params.get(f'{desc}', 0) == 1]
    best_params['selected_descriptors'] = selected_descriptors
    best_params['agg_type'] = best_trial.params['agg_type']
    best_params['bias'] = best_trial.params['bias']
    best_params['batch_norm'] = best_trial.params['batch_norm']
    best_params['hidden_dim'] = best_trial.params['hidden_dim']
    best_params['depth'] = best_trial.params['depth']
    best_params['ffn_num_layers'] = best_trial.params['ffn_num_layers']
    best_params['d_h'] = best_trial.params['d_h']

    return best_params, best_trial.value

In [None]:
best_params, best_value = optimize_chemprop_model(train_df, val_df, DESCRIPTORS, target_col='LogP', n_trials=1000, seed=666)
print(f"Лучшая RMSE: {best_value}")
print(f"Лучшие параметры: {best_params}")

## Сохранение результатов
Выводит словарик лучших параметров, который можно скопировать и перенести в model_train_and_inference.ipynb. На всякий случай сохраняет .txt файл с результатами в ./data

In [2]:
timestamp = datetime.now().strftime('%Y-%m-%d_%H-%M-%S')
filename = f'./data/optuna_optimization_log_{timestamp}.txt'

with open(filename, 'w') as f:
    f.write(f'Best RMSE: {best_value:.4f}\n')
    f.write("best_params = {\n")
    for key, value in best_params.items():
        f.write(f"    {repr(key)}: {repr(value)},\n")
    f.write("}\n")