In [13]:
!pip insrall -r requirements.txt

In [22]:
!mkdir gen_algo_cache

mkdir: cannot create directory ‘gen_algo_cache’: File exists


In [23]:
from rdkit.Chem import RDConfig
import os
import sys
sys.path.append(os.path.join(RDConfig.RDContribDir, 'SA_Score'))
import sascorer

from rdkit import Chem
from rdkit.Chem.rdchem import Mol
from rdkit.Chem import Descriptors


def smiles_valid(smiles_str: str) -> bool:
    try:
        mol = Chem.MolFromSmiles(smiles_str)
    except Exception:
        return False
    
    if mol is None:
        return False

    return True

aromatic_amine_smarts = Chem.MolFromSmarts('[N,n;H1,H0;$(N-[a])]-[a]') # этот шаблон ищет ароматические амины, в которых азот связан с ароматическим кольцом
phenol_smarts = Chem.MolFromSmarts('[OH]-[c,C]1:[c,C]:[c,C]:[c,C]:[c,C]:[c,C]:1') # этот шаблон ищет фенолы, в которых гидроксильная группа связана с ароматическим кольцом


def is_aromatic_amine_or_phenol(smiles_str: str) -> bool:
    mol = Chem.MolFromSmiles(smiles_str)
    return mol.HasSubstructMatch(phenol_smarts) or \
           mol.HasSubstructMatch(aromatic_amine_smarts)


def has_radical(mol: Mol) -> bool:
    for atom in mol.GetAtoms():
        if atom.GetNumRadicalElectrons() > 0:
            return True
    return False


def neutral_and_no_radical(smiles_str: str) -> bool:
    mol = Chem.MolFromSmiles(smiles_str)
    return Chem.GetFormalCharge(mol) == 0 and not has_radical(mol)

def mol_weight(smiles_str: str) -> float:
    mol = Chem.MolFromSmiles(smiles_str)
    return Descriptors.MolWt(mol)

allowed_atoms = {'C', 'H', 'O', 'N', 'P', 'S'}

def only_allowed_atoms(smiles_str: str) -> bool:
    mol = Chem.MolFromSmiles(smiles_str)
    for atom in mol.GetAtoms():
        if atom.GetSymbol() not in allowed_atoms:
            return False
    return True

def log_p(smiles_str: str) -> float:
    mol = Chem.MolFromSmiles(smiles_str)
    logp = Descriptors.MolLogP(mol)
    return logp


def all_rools_valid(smiles_str) -> bool:
    return smiles_valid(smiles_str) and \
        neutral_and_no_radical(smiles_str) and \
        mol_weight(smiles_str) <= 1000 and \
        only_allowed_atoms(smiles_str) and \
        log_p(smiles_str) > 1 and \
        '@' not in smiles_str and \
        '.' not in smiles_str
    


def sa_score(smiles_str) -> float:
    mol = Chem.MolFromSmiles(smiles_str)
    return sascorer.calculateScore(mol)

In [24]:
import random
from rdkit import Chem
from rdkit.Chem import AllChem, Descriptors
from rdkit.Chem import rdmolops
import joblib
from tqdm.auto import tqdm
import pandas as pd


from sklearn.model_selection import GridSearchCV
from sklearn.model_selection import cross_val_score, StratifiedKFold
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.linear_model import ElasticNet
from sklearn.impute import SimpleImputer
import numpy as np
from joblib import dump, load

In [25]:
df = pd.concat(
    [
        pd.read_csv('./data/quantitive_neftekod25_data.csv').drop_duplicates('SMILES', keep='first')[['SMILES', 'PDSC']],
        pd.read_csv('./data/export.csv')[['SMILES', 'PDSC']]
    ]
)
df = df[df['PDSC'] > 0]


def get_descriptors(smiles):
    mol = Chem.MolFromSmiles(smiles)
    descriptors = {}
    for descriptor_name, descriptor_func in Descriptors.descList:
        descriptors[descriptor_name] = descriptor_func(mol)

    return descriptors

descriptors_list = []
for smi in df['SMILES']:
    desc = get_descriptors(smi)
    descriptors_list.append(desc)
    
descriptors_df = pd.DataFrame(descriptors_list)

df = pd.concat([df.reset_index(drop=True), descriptors_df.reset_index(drop=True)], axis=1).dropna()
df = df.drop(columns=df.columns[df.nunique() <= 1])


df['bins'] = pd.qcut(df['PDSC'], q=5, labels=False, duplicates='drop')

In [26]:
X = df.drop(columns=['SMILES', 'PDSC', 'bins'])
y = df['PDSC']


pipeline = Pipeline([
    ('scaler', StandardScaler()),
    ('elastic', ElasticNet(max_iter=500_000, tol=1e-3))
])

param_grid = {
    'elastic__alpha': np.logspace(-3, 1, 10),
    'elastic__l1_ratio': [0.1, 0.3, 0.5, 0.7, 0.9]
}

stratified_cv = StratifiedKFold(n_splits=3, shuffle=True, random_state=42)


grid_search = GridSearchCV(
    estimator=pipeline,
    param_grid=param_grid,
    cv=stratified_cv.split(X, df['bins']),
    scoring='r2',
    n_jobs=-1,
    verbose=1
)


grid_search.fit(X, y)

print(f"Лучший R2: {grid_search.best_score_:.3f}")
print(f"Лучшие параметры: {grid_search.best_params_}")

Fitting 3 folds for each of 50 candidates, totalling 150 fits
Лучший R2: 0.987
Лучшие параметры: {'elastic__alpha': 0.05994842503189409, 'elastic__l1_ratio': 0.5}


In [27]:
best_params = {'alpha': 1.3, 'l1_ratio': 0.9}

model = Pipeline([
    ('imputer', SimpleImputer(strategy='mean')),
    ('scaler', StandardScaler()),
    ('elastic', ElasticNet(**best_params, max_iter=50000))
])

model.fit(X, y)

coefficients = model.named_steps['elastic'].coef_
importance = pd.DataFrame({
    'Feature': X.columns.tolist(),
    'Coefficient': coefficients,
    'Abs_Effect': np.abs(coefficients)
}).sort_values('Abs_Effect', ascending=False)

In [28]:
importance['Abs_Effect'] /= importance['Abs_Effect'].sum()
importance = importance[importance['Abs_Effect'].cumsum() < 0.95]

In [29]:
best_params = {'alpha': 1.3, 'l1_ratio': 0.9}


model = Pipeline([
    ('imputer', SimpleImputer(strategy='mean')),
    ('scaler', StandardScaler()),
    ('elastic', ElasticNet(**best_params, max_iter=50000))
])

top_features = importance['Feature'].tolist()
X_top = X[top_features]

model.fit(X_top, y)


model_data = {
    'model': model,
    'features': top_features
}

dump(model_data, './models/model_iter_3')

['./models/model_iter_3']

In [30]:
estimation_model, feature_names = list(joblib.load('models/model_iter_3').values())

desc_dict = dict(Descriptors.descList)

descriptor_funcs = []
for f_name in feature_names:
    descriptor_funcs.append((f_name, desc_dict[f_name]))

In [31]:
initial_molecules = \
pd.read_csv('data/quantitive_neftekod25_data.csv')['SMILES'].drop_duplicates().tolist() +\
pd.read_csv('data/export.csv')['SMILES'].tolist()

In [32]:
initial_molecules[:10]

['C1=CC=C(C=C1)NC2=CC=CC=C2',
 'CC(C)(C)CC(C)(C)C1=CC=CC=C1NC2=CC=CC3=CC=CC=C32',
 'C1(=CC=CC=C1N(C2=CC=CC=C2CCCCCCCCC)[H])CCCCCCCCC',
 'C1=CC=C(C=C1)NC2=CC=CC3=CC=CC=C32',
 'CC1=C(C(=CC=C1)O)C',
 'CC1=CC(=C(C=C1)C)O',
 'CC1=C(C=C(C=C1)O)C',
 'CC1=CC(=CC(=C1)O)C',
 'CCC1=CC=C(C=C1)O',
 'CC1=CC(=C(C(=C1)C(C)(C)C)O)C(C)(C)C']

In [33]:
class GeneticAlgorithm:
    def __init__(self, mutation_func, crossover_func, fitness_func, population_size, mutation_prob, crossover_prob):

        self.mutation_func = mutation_func
        self.crossover_func = crossover_func
        self.fitness_func = fitness_func
        self.population_size = population_size
        self.mutation_prob = mutation_prob
        self.crossover_prob = crossover_prob
        self.log = []

    def run(self, initial_population, iterations):

        population = initial_population.copy()

        for _ in range(iterations):

            fitness_values = [self.fitness_func(ind) for ind in population]
            
            min_fitness = min(fitness_values)
            max_fitness = max(fitness_values)
            avg_fitness = sum(fitness_values) / len(fitness_values)
            sorted_fitness = sorted(fitness_values)
            median_fitness = sorted_fitness[len(sorted_fitness) // 2] if len(sorted_fitness) % 2 == 1 else (
                sorted_fitness[len(sorted_fitness) // 2 - 1] + sorted_fitness[len(sorted_fitness) // 2])/ 2
            
            self.log.append({
                'min': min_fitness,
                'max': max_fitness,
                'avg': avg_fitness,
                'median': median_fitness
            })

            print(self.log[-1])
            pd.DataFrame({'SMILES': population, 'res_fitness': fitness_values}).to_csv('gen_algo_cache/population_mol_based.csv', index=False)
            
            offspring = []
            for i in tqdm(range(min(len(population), self.population_size))):

                if random.random() < self.crossover_prob:
                    parent1 = self._tournament_selection(population, fitness_values)
                    parent2 = self._tournament_selection(population, fitness_values)
                    child1, child2 = self.crossover_func(parent1, parent2)
    
                    offspring.extend([child1, child2])
                

                if random.random() < self.mutation_prob:
                    mut_child = self.mutation_func(population[i])
                    offspring.append(mut_child)

            combined = population + offspring
            combined = list(set(combined))
            combined.sort(key=lambda x: -self.fitness_func(x))
            population = combined[:self.population_size]
        
        return population, self.log

    def _tournament_selection(self, population, fitness_values, tournament_size=3):
        participants = random.sample(list(zip(population, fitness_values)), tournament_size)
        participants.sort(key=lambda x: -x[1])
        return participants[0][0]

In [34]:
firness_cache = {}

groups = [
    'O',                 # -OH (гидроксил)
    'N',                 # -NH2 (аминогруппа)
    'OC(=O)C',           # Ацетилокси-группа
    'c1ccccc1',          # Бензольное кольцо
    'c1cc(-c2ccccc2)cc(-c2ccccc2)c1',  # Бифенильные структуры
    'c1ccc2c(c1)ccc1c2ccccc1',           # Нафталиновые ядра
    'C(C)(C)C',          # Трет-бутильная группа
    'C#C',               # Алкиновые группы
    'C=O',               # Карбонильная группа
    'NC(=O)',            # Амидная группа
    'S(=O)(=O)',         # Сульфоновая группа
    'CSC',               # Тиоэфир
    'COC(=O)',           # Сложноэфирная группа
    'C1CCCCC1',          # Циклогексильное кольцо
    'C(C)=C',            # Алкеновая группа
    'CN(C)C',            # Диметиламиногруппа
    'c1ccncc1',          # Пиридиновое кольцо
    'Oc1ccccc1',         # Феноксильная группа
    'C(C)(O)N',          # Аминоспиртовая группа
    'C1=CC=C(C=C1)O'     # Замещённое фенольное кольцо
]

def mol_based_mutation(smiles: str, max_attempts=100):

    mol = Chem.MolFromSmiles(smiles)
    
    def _add_group_to_bond(mol):
        if len(mol.GetBonds()) == 0:
            return mol
        bond = random.choice(mol.GetBonds())
        mol = Chem.RWMol(mol)
        mol.RemoveBond(bond.GetBeginAtomIdx(), bond.GetEndAtomIdx())
        
        group = random.choice(groups)
        group_mol = Chem.MolFromSmiles(group)
        mol.InsertMol(group_mol)

        return Chem.MolFromSmiles(Chem.MolToSmiles(mol))
    
    
    def _add_functional_group(mol):
        
        atom_idx = random.choice([a.GetIdx() for a in mol.GetAtoms()])
        
        rw_mol = Chem.RWMol(mol)
        
        new_group = Chem.MolFromSmiles(random.choice(groups))
        new_group_atoms = list(new_group.GetAtoms())
        
        combo = Chem.CombineMols(rw_mol, new_group)
        
        rw_mol.AddBond(
            atom_idx,
            rw_mol.GetNumAtoms() - 1,
            Chem.BondType.SINGLE
        )
        
        new_mol = rw_mol.GetMol()

        return Chem.MolFromSmiles(Chem.MolToSmiles(new_mol))
        
    transform = random.choice((_add_group_to_bond, _add_functional_group))

    for attempt in range(max_attempts):
        try:
            new_mol = transform(mol)
            new_smiles = Chem.MolToSmiles(new_mol)

            new_smiles = new_smiles.replace('.', '')
            
            assert all_rools_valid(new_smiles)
            return new_smiles
            
        except Exception as e:
            continue

    return smiles


def mol_based_crossover(smiles1: str, smiles2: str, max_attempts=100):

    def _get_breakable_bonds(mol):
        breakable = []
        for bond in mol.GetBonds():
            if bond.GetBondType() == Chem.BondType.SINGLE:
                if bond.GetBeginAtom().GetSymbol() in ['C','O','N'] and \
                   bond.GetEndAtom().GetSymbol() in ['C','O','N']:
                    breakable.append(bond.GetIdx())
        return breakable
    
    def _get_fragments(mol, bond_idx):

        broken = rdmolops.FragmentOnBonds(mol, [bond_idx], addDummies=True)
        return Chem.GetMolFrags(broken, asMols=True, sanitizeFrags=False)

    def crossover(mol1, mol2):        
        bond1 = random.choice(_get_breakable_bonds(mol1))
        bond2 = random.choice(_get_breakable_bonds(mol2))
        
        frags1 = rdmolops.FragmentOnBonds(mol1, [bond1])
        frags2 = rdmolops.FragmentOnBonds(mol2, [bond2])

        frags1 = _get_fragments(mol1, bond1)
        frags2 = _get_fragments(mol2, bond2)
        
        frag1 = random.choice(frags1)
        frag2 = random.choice(frags2)
        
        combined = Chem.CombineMols(frag1, frag2)
        editable = Chem.EditableMol(combined)
        
        dummies = [atom.GetIdx() for atom in combined.GetAtoms() if atom.GetAtomicNum() == 0]

        if len(dummies) >= 2:
            editable.AddBond(dummies[-2], dummies[-1], Chem.BondType.SINGLE)
            editable.RemoveAtom(dummies[-1])
            editable.RemoveAtom(dummies[-2])


        tmp_mol = editable.GetMol()

        for atom in reversed(list(tmp_mol.GetAtoms())):
            if atom.GetAtomicNum() == 0:
                editable.RemoveAtom(atom.GetIdx())

        new_mol = editable.GetMol()
        
        return editable.GetMol()

    mol1 = Chem.MolFromSmiles(smiles1)
    mol2 = Chem.MolFromSmiles(smiles2)

    for attempt in range(max_attempts):
        
        try:
            new_mol = crossover(mol1, mol2)
            new_smiles = Chem.MolToSmiles(new_mol)
            
            new_smiles = new_smiles.replace('.', '')
            
            assert all_rools_valid(new_smiles)
            return new_smiles, new_smiles
            
        except Exception as e:
            continue

    return smiles1, smiles2


def calculate_fitness(smiles: str) -> float:

    if smiles in firness_cache:
        return firness_cache[smiles]
    
    mol = Chem.MolFromSmiles(smiles)
    descriptors = {desc_name: desc_f(mol) for desc_name, desc_f in descriptor_funcs}
    X = pd.DataFrame([descriptors])
    prediction = estimation_model.predict(X)[0]

    raw_sa = sa_score(smiles)

    penalty = (raw_sa - 3) ** 4

    res = prediction - penalty 
    
    firness_cache[smiles] = res
    
    return res

In [35]:
from rdkit import rdBase

rdBase.DisableLog('rdApp.error')  
rdBase.DisableLog('rdApp.warning')

In [36]:
mol_based_genetic_algo = GeneticAlgorithm(
    mol_based_mutation,
    mol_based_crossover,
    calculate_fitness,
    population_size=200,
    mutation_prob=0.8,
    crossover_prob=0.8
)

_ = mol_based_genetic_algo.run(initial_molecules, 30)

{'min': -28.465769436119295, 'max': 458.4979940436838, 'avg': 234.14290310492663, 'median': 268.25504434345606}


100%|██████████| 135/135 [00:07<00:00, 17.01it/s]


{'min': 238.5622537521617, 'max': 462.120288946964, 'avg': 342.83133235661, 'median': 341.3160801790832}


100%|██████████| 200/200 [00:15<00:00, 12.56it/s]


{'min': 349.94379963375246, 'max': 475.3577773955745, 'avg': 396.90297648047346, 'median': 394.7042411917972}


100%|██████████| 200/200 [00:18<00:00, 10.88it/s]


{'min': 393.43543271013465, 'max': 482.724092664125, 'avg': 421.6097912839787, 'median': 417.0793749024175}


100%|██████████| 200/200 [00:25<00:00,  7.76it/s]


{'min': 409.78594121144545, 'max': 488.4397864167414, 'avg': 431.13422331086133, 'median': 428.7509364427507}


100%|██████████| 200/200 [00:24<00:00,  8.09it/s]


{'min': 419.0671258575051, 'max': 488.4397864167414, 'avg': 438.7307441518398, 'median': 437.37341152568615}


100%|██████████| 200/200 [00:23<00:00,  8.47it/s]


{'min': 427.7835470741492, 'max': 507.6507862428064, 'avg': 444.1597929834705, 'median': 441.50442350737296}


100%|██████████| 200/200 [00:26<00:00,  7.43it/s]


{'min': 433.05068603507476, 'max': 526.8233912033072, 'avg': 449.2766284579443, 'median': 445.90942513909465}


100%|██████████| 200/200 [00:27<00:00,  7.26it/s]


{'min': 439.7397695625081, 'max': 526.8233912033072, 'avg': 454.69701847767794, 'median': 451.4699621236889}


100%|██████████| 200/200 [00:27<00:00,  7.34it/s]


{'min': 444.69763147184506, 'max': 526.8233912033072, 'avg': 458.34464799108827, 'median': 454.49338282593}


100%|██████████| 200/200 [00:27<00:00,  7.30it/s]


{'min': 448.2076030604062, 'max': 526.8233912033072, 'avg': 462.0536923224074, 'median': 458.1243091313605}


100%|██████████| 200/200 [00:25<00:00,  7.77it/s]


{'min': 451.4112257035769, 'max': 526.8233912033072, 'avg': 465.3821437910533, 'median': 460.8561378996311}


100%|██████████| 200/200 [00:28<00:00,  7.04it/s]


{'min': 455.34502377490344, 'max': 526.8233912033072, 'avg': 469.72366382874293, 'median': 463.88484397416744}


100%|██████████| 200/200 [00:28<00:00,  6.96it/s]


{'min': 459.1972689222973, 'max': 526.8233912033072, 'avg': 474.7506029735952, 'median': 471.6540302883625}


100%|██████████| 200/200 [00:29<00:00,  6.70it/s]


{'min': 462.1878553597524, 'max': 526.8233912033072, 'avg': 477.4125595183212, 'median': 473.7216459689513}


100%|██████████| 200/200 [00:26<00:00,  7.52it/s]


{'min': 464.7233918125655, 'max': 538.779654660198, 'avg': 482.1710247618175, 'median': 477.7481516664602}


100%|██████████| 200/200 [00:29<00:00,  6.75it/s]


{'min': 471.4395009183096, 'max': 538.779654660198, 'avg': 486.51130442488136, 'median': 482.1426081215616}


100%|██████████| 200/200 [00:32<00:00,  6.23it/s]


{'min': 474.72770798602915, 'max': 538.779654660198, 'avg': 490.4348217925501, 'median': 486.5719531288365}


 18%|█▊        | 35/200 [00:06<00:29,  5.56it/s]


KeyboardInterrupt: 

In [37]:
new_mols_df = pd.read_csv('gen_algo_cache/population_mol_based.csv')
new_mols_df = new_mols_df[~new_mols_df.SMILES.isin(initial_molecules)]

In [38]:
new_mols_df.head(10)

Unnamed: 0,SMILES,res_fitness
0,CNNc1ccc(-c2c(N)cccc2-c2ccc(N)c(Oc3c(N)cc4c(-c...,538.779655
1,NNc1c2cc(NCc3c(ONc4cccc(N)c4-c4ccccc4N)ccc(N)c...,536.671553
2,CNN1c2ccc(-c3c(N)cccc3-c3ccc(N)c(Oc4c(N)cc5c(-...,527.391875
3,CCNc1ccc(-c2cc(N)cc(-c3ccc(N)c(Oc4cc(-c5cccc(N...,526.823391
4,Nc1ccc(ONc2cccc(N)c2-c2ccccc2N)c(CNc2ccc(N)cc2...,526.462235
5,Nc1ccc(-c2ccc3c(c2)-c2cc-3ccc2N)c(-c2cccc(Oc3c...,525.768291
6,CCCNc1ccc(N)cc1-c1c2cc(N)c(Oc3cc(-c4cccc(N)c4-...,521.270945
7,CCCNNc1cc2ccc1-c1ccccc1-c1c(-c3ccccc3Nc3ccc4cc...,521.0512
8,CNNc1ccc(-c2c(N)cccc2-c2ccc(N)c(Oc3c(N)cc4c(-c...,521.002329
9,C#CNNc1cccc(NOc2ccc(N)cc2CNc2c3cc(N)c(c2-c2c4c...,518.473663


In [41]:
new_mols_df = new_mols_df[(~new_mols_df['SMILES'].apply(lambda el: '.' in el)).tolist()]
new_mols_df = new_mols_df[~new_mols_df['SMILES'].isin(initial_molecules)]

In [42]:
new_mols_df.shape

(200, 2)

In [50]:
new_mols_df['SMILES'].head(15).to_csv('submit.csv', encoding='UTF-8', index=False)

In [51]:
import os
import zipfile
with zipfile.ZipFile('submission.zip', 'w', zipfile.ZIP_DEFLATED) as zipf:
    zipf.write('requirements.txt', os.path.basename('requirements.txt'))
    zipf.write('submit.csv', os.path.basename('submit.csv'))
    zipf.write('data', 'data')    
    zipf.write('7_genetic_algo_graph_based_final.ipynb', os.path.basename('7_genetic_algo_graph_based_final.ipynb'))