# Создание QSAR модели для предсказания коэффициента липофильности LogP

In [None]:
!pip install numpy pandas scikit-learn chython ipykernel openbabel-wheel pytorch-lightning rdkit torch

In [None]:
from sklearn.preprocessing import FunctionTransformer, StandardScaler
from sklearn.metrics import r2_score, mean_squared_error

from rdkit.Chem import Descriptors, MolFromSmiles, rdFingerprintGenerator as fp

from chython import smiles
from collections import defaultdict

import pandas as pd
import numpy as np

import torch
import torch.nn as nn
import pytorch_lightning as pyl

from torch.utils.data import Dataset, DataLoader, TensorDataset

import warnings
warnings.filterwarnings("ignore")


## 1. Разведочный анализ и подготовка данных

In [None]:
train_data = pd.read_csv('train_data.csv')
test_data = pd.read_csv('test_data.csv')

Создадим валидационную выборку (отберем случайным образом из обучающей): 

In [None]:
n = 1318
random_indices = np.random.choice(train_data.index, size=n, replace=False)

val_data = train_data.loc[random_indices]
train_data_remain = train_data.drop(random_indices)

print(f"Исходный DataFrame: {len(train_data)} строк")
print(f"Удалено строк: {len(val_data)}")
print(f"Осталось строк: {len(train_data_remain)}")
train_data = train_data_remain
print(val_data.head())


In [None]:
test_smiles_list = test_data['SMILES'].tolist()
train_smiles_list = train_data['SMILES'].tolist()
val_smiles_list = val_data['SMILES'].tolist()

In [None]:
train_mols = [smiles(m) for m in train_smiles_list]
test_mols = [smiles(m) for m in test_smiles_list]
val_mols = [smiles(m) for m in val_smiles_list]

In [None]:
len(train_mols)

#### Проведем стандартизацию химических структур:


In [None]:
def standardize(mol_list):
    for m in mol_list:
        try:
            m.clean_stereo()
            m.canonicalize()
        except:
            print(m)

In [None]:
standardize(train_mols)
standardize(test_mols)
standardize(val_mols)

#### Преобразуем молекулы в объекты библиотеки RDKit

In [None]:
train_rdkit_mols = [MolFromSmiles(str(m)) for m in train_mols]
test_rdkit_mols = [MolFromSmiles(str(m)) for m in test_mols]
val_rdkit_mols = [MolFromSmiles(str(m)) for m in val_mols]
val_rdkit_mols[0]

#### 1.1 Генерация дескрипторов

In [None]:
def calc_fingerprints(mols):
    """ генерация молекулярных отпечатков по методу Моргана с радиусом 3 и длиной 2048
    """
    morgan_fpgenerator = fp.GetMorganGenerator(radius=3, fpSize=2048)
    return pd.DataFrame([morgan_fpgenerator.GetFingerprintAsNumPy(m) for m in mols])

PhisChemDescriptors = {"MR": Descriptors.MolMR,
                       "TPSA": Descriptors.TPSA}

# функция для генерации дескрипторов из молекул
def mol_dsc_calc(mols): 
    return pd.DataFrame({k: f(m) for k, f in PhisChemDescriptors.items()} 
                     for m in mols)

descriptors_names = PhisChemDescriptors.keys()

Сгенерируем дескрипторы и молекулярные отпечатки и сохраним их в отдельные датафреймы:

In [None]:
descriptors_datasets = defaultdict(dict)

descriptors_transformer = FunctionTransformer(mol_dsc_calc, validate=False)
morgan_transformer = FunctionTransformer(calc_fingerprints, validate=False)

train_rdkit_data = pd.DataFrame(train_rdkit_mols, columns=['molecules'])
test_rdkit_data = pd.DataFrame(test_rdkit_mols, columns=['molecules'])
val_rdkit_data = pd.DataFrame(val_rdkit_mols, columns=['molecules'])


datasets = {'train': train_rdkit_mols, 'test': test_rdkit_mols, 'val': val_rdkit_mols}

for dataset_name, data in datasets.items():
    data_df = pd.DataFrame(data, columns=['structure'])
    print(dataset_name)
    X = morgan_transformer.transform(data_df.structure)

    X_desc = descriptors_transformer.fit_transform(data_df.structure)
    descriptors_datasets[dataset_name] = pd.concat([data_df.drop(columns=['structure']), X, X_desc], axis=1)


In [None]:
num_cols_to_drop = [x for x in descriptors_datasets['train'].columns.to_list() if not isinstance(x, int)]
bool_cols_to_drop = [x for x in descriptors_datasets['train'].columns.to_list() if x not in ['TPSA', 'MR']]

In [None]:
separated_features_df = {}
for dataset_name, df in descriptors_datasets.items():
    num_cols_to_drop = [x for x in descriptors_datasets[f'{dataset_name}'].columns.to_list() if not isinstance(x, int)]
    bool_cols_to_drop = [x for x in descriptors_datasets[f'{dataset_name}'].columns.to_list() if x not in ['TPSA', 'MR']]
    separated_features_df[f'Xd_{dataset_name}'] = df.drop(columns=bool_cols_to_drop)
    separated_features_df[f'Xfp_{dataset_name}'] = df.drop(columns=num_cols_to_drop)

In [None]:
list(separated_features_df.keys())

In [None]:
Xd_train, Xd_test, Xd_val, Xfp_train, Xfp_test, Xfp_val = \
separated_features_df['Xd_train'], separated_features_df['Xd_test'], separated_features_df['Xd_val'], \
separated_features_df['Xfp_train'], separated_features_df['Xfp_test'], separated_features_df['Xfp_val']

In [None]:
Xd_test

### 1.2 Нормализация численных значений

**Формула стандартизации (Z-score Normalization)**:

$$
Y_{\text{norm}} = \frac{Y - \mu}{\sigma}
$$


Где:


- $Y$ — исходные данные,
- $\mu$ — среднее значение \( Y \),
- $\sigma$ — стандартное отклонение \( Y \).

In [None]:
df_to_scale =  [Xd_train, Xd_test, Xd_val]
scaler = StandardScaler()
scaler.fit(Xd_train)
for n, df in enumerate(df_to_scale):
    df = pd.DataFrame(scaler.transform(df), columns=df.columns)
    df_to_scale[n] = np.array(df)

Xd_train, Xd_test, Xd_val = df_to_scale
Xd_train

Приведем датафреймы с отпечатками моргана в нужный формат:

In [None]:
Xfp_train, Xfp_test, Xfp_val = np.array(Xfp_train), np.array(Xfp_test), np.array(Xfp_val)

In [None]:
Y_train = np.array(train_data['LogP'])
# Генерируем данные LogP для тестовой и валидационной выборке, поскольку эти данные требуется предсказать
Y_test = np.array([0] * test_data.shape[0])
Y_val = np.array([0] * val_data.shape[0])

In [None]:
Xd_train.shape, Xd_test.shape

In [None]:
Xfp_train

## 2. Обучение и валидация модели

#### Создание модели на основе нейронной сети в виде многослойного персептрона с одним скрытым слоем

Подготовим данные для обучения в нужном формате:

In [None]:
Xfp_train = torch.Tensor(Xfp_train)
Xfp_test = torch.Tensor(Xfp_test)
Xfp_val = torch.Tensor(Xfp_val)
Xd_train = torch.Tensor(Xd_train)
Xd_test = torch.Tensor(Xd_test)
Xd_val = torch.Tensor(Xd_val)

In [None]:
Y_train = torch.Tensor(Y_train).unsqueeze(-1)
Y_test = torch.Tensor(Y_test).unsqueeze(-1)
Y_val = torch.Tensor(Y_val).unsqueeze(-1)

Y_train.shape, Y_test.shape

Определим конфигурацию:

In [None]:
BATCH_SIZE = 50
EPOCHS = 50
HIDDEN_SIZE = 256

Создадим объект DataLoader - загрузчик данных, который делит входные данные на партии определенного размера - батчи  и подает их на обучение нейронной сети. Создадим загрузчики для каждой из выборок:

In [None]:
class LogPDataset(Dataset):
    def __init__(self, X, y, device):
        self.X = torch.tensor(X, dtype=torch.float32).to(device)
        self.y = torch.tensor(y, dtype=torch.float32).to(device)

    def __len__(self):
        return len(self.X)

    def __getitem__(self, idx):
        return self.X[idx], self.y[idx]

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

train_dataset = TensorDataset(Xfp_train, Xd_train, Y_train)
test_dataset = TensorDataset(Xfp_test, Xd_test, Y_test)
val_dataset = TensorDataset(Xfp_val, Xd_val, Y_val)
train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True)
test_loader = DataLoader(test_dataset)
val_loader = DataLoader(val_dataset, batch_size=BATCH_SIZE, shuffle=False)

Создадим класс LogPModel для определения архитектуры нейронной сети с одним скрытым слоем и двумя входными потоками для обработки отдельно физико-химических дескрипторов и молекулярных отпечатков:

In [None]:
class LogPModel(pyl.LightningModule):
    def __init__(self, fingerprint_size, numeric_features_size, hidden_size=HIDDEN_SIZE):
        super(LogPModel, self).__init__()
        self.test_predictions = []
        self.targets = []

        self.fingerprint_fc = nn.Sequential(
            nn.Linear(fingerprint_size, hidden_size),
            nn.LeakyReLU(),
            nn.Linear(hidden_size, hidden_size),
            nn.LeakyReLU()
        )
        self.numeric_fc = nn.Sequential(
            nn.Linear(numeric_features_size, hidden_size),
            nn.LeakyReLU(),
            nn.Linear(hidden_size, hidden_size),
            nn.LeakyReLU()
        )
        
        self.combined_fc = nn.Sequential(
            nn.Linear(hidden_size * 2, hidden_size),
            nn.LeakyReLU(),
            nn.Linear(hidden_size, 1)
        )
        
        self.loss_fn = nn.MSELoss()

    def forward(self, fingerprint, numeric_features):
        fingerprint_out = self.fingerprint_fc(fingerprint)
        numeric_out = self.numeric_fc(numeric_features)
        
        combined = torch.cat([fingerprint_out, numeric_out], dim=1)
        
        return self.combined_fc(combined)

    def training_step(self, batch, batch_idx):
        fingerprint, numeric_features, y = batch
        y_pred = self(fingerprint, numeric_features)

        loss = self.loss_fn(y_pred, y)
        self.log('Train MSE', loss, on_step=False, on_epoch=True, prog_bar=True)

        r2 = r2_score(y.detach().cpu().numpy().reshape(-1), y_pred.detach().cpu().numpy().reshape(-1))
        self.log('Train R²', r2, on_step=False, on_epoch=True, prog_bar=True)

        return loss

    def validation_step(self, batch, batch_idx):
        fingerprint, numeric_features, y = batch
        y_pred = self(fingerprint, numeric_features)

        loss = self.loss_fn(y_pred, y)
        self.log('Validation MSE', loss, on_step=False, on_epoch=True, prog_bar=True)

        r2 = r2_score(y.detach().cpu().numpy().reshape(-1), y_pred.detach().cpu().numpy().reshape(-1))
        self.log('Validation R²', r2, on_step=False, on_epoch=True, prog_bar=True)

        return loss
    
    def test_step(self, batch, batch_idx):
        fingerprint, numeric_features, y = batch
        y_pred = self(fingerprint, numeric_features)

        loss = self.loss_fn(y_pred, y)
        self.log('Test MSE', loss, on_step=False, on_epoch=True, prog_bar=True)

        self.test_predictions.extend(y_pred.cpu().numpy())
        self.targets.extend(y.cpu().numpy())
        
        return loss

    def configure_optimizers(self):
        return torch.optim.Adam(self.parameters(), lr=1e-3)

Создадим и обучим модель:

In [None]:
model = LogPModel(fingerprint_size=Xfp_train.shape[1], numeric_features_size=Xd_train.shape[1]).to(device)

# metrics_callback = MetricsCallback(val_loader, train_loader, device)

trainer = pyl.Trainer(max_epochs=EPOCHS, accelerator="auto")
trainer.fit(model, train_loader, val_loader)

Оценим модель на внешней контрольной выборке:

In [None]:
trainer.test(model, test_loader)

y_pred_test = model.test_predictions
y_true = model.targets
mse = mean_squared_error(y_true, y_pred_test)
q2 = r2_score(y_true, y_pred_test)
rmse = np.sqrt(mse)
print(f'test MSE: {mse:.4f}, test PRMSE: {rmse:.4f}, test Q²: {q2:.4f}')


In [None]:
# Откроем файл sample_submission и запишем в него предсказание
sample_submission = pd.read_csv("sample_submission.csv")
# Добавим в качестве предсказаний тестовые данные
sample_submission['LogP'] = [round(float(i), 5) for i in y_pred_test]

In [None]:
# Сохраняем прежсказание 
sample_submission.to_csv("prediction.csv", index=False)