Datasets taken from https://github.com/rxn4chemistry/rxn_yields/tree/master/data/

In [2]:
import pandas as pd
import numpy as np
import os
import random

In [3]:
from rdkit.Chem import rdChemReactions, Draw
from rdkit import Chem
from rdkit.Chem import AllChem
from descriptastorus.descriptors import rdDescriptors, rdNormalizedDescriptors

In [4]:
def gen_rdkit2d_features(sm):
    generator = rdNormalizedDescriptors.RDKit2DNormalized()
    try:
        features = generator.process(sm)[1:]
    except TypeError as e:
        return [0.] * 200
    return features

In [5]:
from indigo import *
indigo = Indigo()
from rdkit.Chem import Draw

In [6]:
was_correctly_mapped = []
def map_r(row):
    try:
        r = indigo.loadReaction(row)
        was_correctly_mapped.append(r.automap("discard"))
    except IndigoException as e:
        print(e)
        print(row)
        was_correctly_mapped.append(0)
        return None
    return r.smiles()

In [7]:
def get_num_unmapped_atoms(r):
    if not r:
        return [0,0]
    num_unmapped = []
    for s in r.split('>>'):
        m = Chem.MolFromSmiles(s)
        i = 0
        for a in m.GetAtoms():
            if a.GetAtomMapNum() == 0:
                i += 1
        num_unmapped.append(i)
    return num_unmapped

In [8]:
def draw_reaction(r, fn):
    rxn = AllChem.ReactionFromSmarts(r)
    d2d = Draw.MolDraw2DCairo(1080,500)
    d2d.DrawReaction(rxn)
    png = d2d.GetDrawingText()
    open(fn,'wb+').write(png)

In [9]:
count = 0
def complete_atom_mapping(r):
    """
    Add atoms to the reaction SMILES which aren't in the main product but are present in the reactants or vice versa 
    and asigns indices to them. A workaround which enables to use difference D-MPNN with incomplete reaction SMILES.
    """
    global count
    if pd.isnull(r):
        return None
    str_r, str_p = r.split('>>')
    mr = Chem.MolFromSmiles(str_r)
    mp = Chem.MolFromSmiles(str_p)
    
    for m in [mr, mp]:
        unique_indices = [0]
        for a in m.GetAtoms():
            if a.GetAtomMapNum() in unique_indices:
                a.SetAtomMapNum(0)
            else:
                unique_indices.append(a.GetAtomMapNum())

    unmapped_symbols_r = [a  for a in mr.GetAtoms() if a.GetAtomMapNum() == 0]
    unmapped_symbols_p = [a  for a in mp.GetAtoms() if a.GetAtomMapNum() == 0]
    #maybe add some kind of substucture mapping
    str_to_add_r, str_to_add_p = '', ''
    max_indx = max(max([a.GetAtomMapNum() for a in mr.GetAtoms()]), max([a.GetAtomMapNum() for a in mp.GetAtoms()]))
    for a_r in unmapped_symbols_r:
        was_mapped = False
        s_r = a_r.GetSymbol()
        a_r.SetAtomMapNum(max_indx + 1)
        for i, a_p in enumerate(unmapped_symbols_p):
            if s_r == a_p.GetSymbol():
                a_p.SetAtomMapNum(max_indx + 1)
                unmapped_symbols_p.pop(i)
                was_mapped = True
                break
        
        if not was_mapped:
            str_to_add_p += f'.[{a_r.GetSymbol()}:{max_indx + 1}]'
        max_indx += 1
    
    for a_p in unmapped_symbols_p:
        a_p.SetAtomMapNum(max_indx + 1)
        str_to_add_r += f'.[{a_p.GetSymbol()}:{max_indx + 1}]'
        max_indx += 1
    
    str_r = Chem.MolToSmiles(mr) + str_to_add_r
    str_p = Chem.MolToSmiles(mp) + str_to_add_p
    
    if (not Chem.MolFromSmiles(str_r)) or (not Chem.MolFromSmiles(str_p)):
        return None
    
    if Chem.MolFromSmiles(str_r).GetNumAtoms() !=  Chem.MolFromSmiles(str_p).GetNumAtoms():
        count += 1
#         print('>>'.join([str_r, str_p]))
#         print(Chem.MolFromSmiles(str_r).GetNumAtoms(), Chem.MolFromSmiles(str_p).GetNumAtoms())
#         print()
#         return None

    return '>>'.join([str_r, str_p])


# Buchwald-Hartwig reactions

In [10]:
def canonicalize_with_dict(smi, can_smi_dict={}):
    if smi not in can_smi_dict.keys():
        return Chem.MolToSmiles(Chem.MolFromSmiles(smi))
    else:
        return can_smi_dict[smi]

def generate_buchwald_hartwig_rxns(df):
    """
    Converts the entries in the excel files from Sandfort et al. to reaction SMILES.
    """
    l  = ['IC1=NC=CC=C1', 'BrC1=NC=CC=C1', 'ClC1=NC=CC=C1']
    rdkit_rxns = []
    df = df.copy()
    fwd_template = '[F,Cl,Br,I]-[c;H0;D3;+0:1](:[c,n:2]):[c,n:3].[NH2;D1;+0:4]-[c:5]>>[c,n:2]:[c;H0;D3;+0:1](:[c,n:3])-[NH;D2;+0:4]-[c:5]'
    methylaniline = 'Cc1ccc(N)cc1'
    pd_catalyst = Chem.MolToSmiles(Chem.MolFromSmiles('O=S(=O)(O[Pd]1~[NH2]C2C=CC=CC=2C2C=CC=CC1=2)C(F)(F)F'))
    methylaniline_mol = Chem.MolFromSmiles(methylaniline)
    rxn = rdChemReactions.ReactionFromSmarts(fwd_template)
    products = []
    for i, row in df.iterrows():
        reacts = (Chem.MolFromSmiles(row['Aryl halide']), methylaniline_mol)
        rxn_products = rxn.RunReactants(reacts)

        rxn_products_smiles = set([Chem.MolToSmiles(mol[0]) for mol in rxn_products])
        assert len(rxn_products_smiles) == 1
        p  = list(rxn_products_smiles)[0]
        if row['Aryl halide'] in l:
            p = p.replace('c2c', 'c2n')
            
        products.append(p)
    df['product'] = products
    rxns = []
    can_smiles_dict = {}
    for i, row in df.iterrows():
        aryl_halide = canonicalize_with_dict(row['Aryl halide'], can_smiles_dict)
        can_smiles_dict[row['Aryl halide']] = aryl_halide
        ligand = canonicalize_with_dict(row['Ligand'], can_smiles_dict)
        can_smiles_dict[row['Ligand']] = ligand
        base = canonicalize_with_dict(row['Base'], can_smiles_dict)
        can_smiles_dict[row['Base']] = base
        additive = canonicalize_with_dict(row['Additive'], can_smiles_dict)
        can_smiles_dict[row['Additive']] = additive

        reactants = f"{aryl_halide}.{methylaniline}.{pd_catalyst}.{ligand}.{base}.{additive}"
        rxns.append(f"{reactants}>>{row['product']}")
    return rxns, rdkit_rxns

In [14]:
all_rdkit_features_bh  = {}
df_bh_0 = pd.read_excel('data/Buchwald_Hartwig.xlsx', sheet_name=0)

ligands = [v for v in df_bh_0['Ligand'].value_counts().index]
base = [v for v in df_bh_0['Base'].value_counts().index]
additive = [v for v in df_bh_0['Additive'].value_counts().index]
cat = ['O=S(=O)(O[Pd]1~[NH2]C2C=CC=CC=2C2C=CC=CC1=2)C(F)(F)F']

for l in [ligands, base, additive, cat]:
    for sm in l:
        if not sm:
            f = [0.] * 200
        else:
            f = gen_rdkit2d_features(sm)
        if not f or f == 'nan':
            print(n)
            continue
        all_rdkit_features_bh[sm] = f

In [11]:
!mkdir data/mapped_Buchwald_Hartwig

In [17]:
n = 'mapped_Buchwald_Hartwig_clean'

for indx, s in  zip([2768] * 10 + [3058, 3056, 3059,  3056], list(range(14))):
    
    df_b = pd.read_excel('data/Buchwald_Hartwig.xlsx', sheet_name=s)
    df_b['REACTION'], rdkit_rxns = pd.Series(generate_buchwald_hartwig_rxns(df_b))
    df_b['REACTION SHORT']  = df_b['REACTION'].apply(lambda x: '>>'.join(['.'.join(x.split('.')[:2]), x.split('>>')[1]]))
    mapped_reactions = df_b['REACTION SHORT'].apply(map_r)
    mapped_reactions_sm_all_atoms = [complete_atom_mapping(r) for r in mapped_reactions]
    
    df_b['MAPPED REACTION'] = pd.Series(mapped_reactions_sm_all_atoms)

    df_b['MAPPED REAGENTS'] = df_b['MAPPED REACTION'].apply(lambda x: x.split('>>')[0])
    df_b['MAPPED PRODUCTS'] = df_b['MAPPED REACTION'].apply(lambda x: x.split('>>')[1])
    df_b['YIELD'] = df_b['Output'] / 100.

    train_df_b = df_b.iloc[:indx-1] # paper has starting index 1 not 0
    test_df_b = df_b.iloc[indx-1:] # paper has starting index 1 not 0
    

    train_df_b = train_df_b.reset_index(drop=True)
    test_df_b = test_df_b.reset_index(drop=True)
    
    assert len(train_df_b) + len(test_df_b) == len(pd.concat((train_df_b, test_df_b)).drop_duplicates())
    
    train_df_b[['MAPPED REAGENTS', 'MAPPED PRODUCTS','YIELD']].to_csv(f'data/{n}/train_cv_{s}.csv', index=False)
    test_df_b[['MAPPED REAGENTS', 'MAPPED PRODUCTS', 'YIELD']].to_csv(f'data/{n}/test_cv_{s}.csv', index=False)
    
    with open(f'data/{n}/test_cv_{s}_feat_rdkit.csv', 'w') as f:
        f.write("HEADER\n")
        for i, r in test_df_b.iterrows():
            f.write(','.join([','.join(map(str, all_rdkit_features_bh[r[c]])) for c in ['Ligand', 'Base', 'Additive']]))
            f.write(',')
            f.write(','.join(map(str, all_rdkit_features_bh[cat[0]])))
            f.write('\n')
            
    with open(f'data/{n}/train_cv_{s}_feat_rdkit.csv', 'w') as f:
        f.write("HEADER\n")
        for i, r in train_df_b.iterrows():
            f.write(','.join([','.join(map(str, all_rdkit_features_bh[r[c]])) for c in ['Ligand', 'Base', 'Additive']]))
            f.write(',')
            f.write(','.join(map(str, all_rdkit_features_bh[cat[0]])))
            f.write('\n')


    if s == 0: # for hp tuning
        df_b_hp = df_b.sample(frac=1, random_state=0)
        df_b_hp = df_b_hp.reset_index(drop=True)
        
        train_df_bh = df_b_hp.head(round(len(df_b_hp) * 0.7))
        test_df_bh = train_df_bh.sample(frac=0.14, random_state=0)
        train_df_bh = train_df_bh.drop(test_df_bh.index)
        train_df_bh = train_df_bh.reset_index(drop=True)
        test_df_bh = test_df_bh.reset_index(drop=True)
        
        train_df_bh[['MAPPED REAGENTS', 'MAPPED PRODUCTS','YIELD']].to_csv(f'data/{n}/train_for_hp_tune.csv', index=False)
        test_df_bh[['MAPPED REAGENTS', 'MAPPED PRODUCTS', 'YIELD']].to_csv(f'data/{n}/test_for_hp_tune.csv', index=False)
        
        with open(f'data/{n}/train_for_hp_tune_feat_rdkit.csv', 'w') as f:
            f.write("HEADER\n")
            for i, r in train_df_bh.iterrows():
                f.write(','.join([','.join(map(str, all_rdkit_features_bh[r[c]])) for c in ['Ligand', 'Base', 'Additive']]))
                f.write(',')
                f.write(','.join(map(str, all_rdkit_features_bh[cat[0]])))

                f.write('\n')
              
        with open(f'data/{n}/test_for_hp_tune_feat_rdkit.csv', 'w') as f:
            f.write("HEADER\n")
            for i, r in test_df_bh.iterrows():
                f.write(','.join([','.join(map(str, all_rdkit_features_bh[r[c]])) for c in ['Ligand', 'Base', 'Additive']]))
                f.write(',')
                f.write(','.join(map(str, all_rdkit_features_bh[cat[0]])))
                f.write('\n')


# Suzuki-Miyaura reactions

In [18]:
reactant_1_smiles = {
    '6-chloroquinoline': 'C1=C(Cl)C=CC2=NC=CC=C12.CCC1=CC(=CC=C1)CC',  #CCC1=CC(=CC=C1)CC isn't used
    '6-Bromoquinoline': 'C1=C(Br)C=CC2=NC=CC=C12.CCC1=CC(=CC=C1)CC', 
    '6-triflatequinoline': 'C1C2C(=NC=CC=2)C=CC=1OS(C(F)(F)F)(=O)=O.CCC1=CC(=CC=C1)CC',
    '6-Iodoquinoline': 'C1=C(I)C=CC2=NC=CC=C12.CCC1=CC(=CC=C1)CC', 
    '6-quinoline-boronic acid hydrochloride': 'C1C(B(O)O)=CC=C2N=CC=CC=12.Cl.O',  #CL and O aren't used
    'Potassium quinoline-6-trifluoroborate': '[B-](C1=CC2=C(C=C1)N=CC=C2)(F)(F)F.[K+].O',
    '6-Quinolineboronic acid pinacol ester': 'B1(OC(C(O1)(C)C)(C)C)C2=CC3=C(C=C2)N=CC=C3.O'
}

reactant_2_smiles = {
    '2a, Boronic Acid': 'CC1=CC=C2C(C=NN2C3OCCCC3)=C1B(O)O', 
    '2b, Boronic Ester': 'CC1=CC=C2C(C=NN2C3OCCCC3)=C1B4OC(C)(C)C(C)(C)O4', 
    '2c, Trifluoroborate': 'CC1=CC=C2C(C=NN2C3OCCCC3)=C1[B-](F)(F)F.[K+]',
    '2d, Bromide': 'CC1=CC=C2C(C=NN2C3OCCCC3)=C1Br' 
}

catalyst_smiles = {
    'Pd(OAc)2': 'CC(=O)O~CC(=O)O~[Pd]'
}

ligand_smiles = {
    'P(tBu)3': 'CC(C)(C)P(C(C)(C)C)C(C)(C)C', 
    'P(Ph)3 ': 'c3c(P(c1ccccc1)c2ccccc2)cccc3', 
    'AmPhos': 'CC(C)(C)P(C1=CC=C(C=C1)N(C)C)C(C)(C)C', 
    'P(Cy)3': 'C1(CCCCC1)P(C2CCCCC2)C3CCCCC3', 
    'P(o-Tol)3': 'CC1=CC=CC=C1P(C2=CC=CC=C2C)C3=CC=CC=C3C',
    'CataCXium A': 'CCCCP(C12CC3CC(C1)CC(C3)C2)C45CC6CC(C4)CC(C6)C5', 
    'SPhos': 'COc1cccc(c1c2ccccc2P(C3CCCCC3)C4CCCCC4)OC', 
    'dtbpf': 'CC(C)(C)P(C1=CC=C[CH]1)C(C)(C)C.CC(C)(C)P(C1=CC=C[CH]1)C(C)(C)C.[Fe]', 
    'XPhos': 'P(c2ccccc2c1c(cc(cc1C(C)C)C(C)C)C(C)C)(C3CCCCC3)C4CCCCC4', 
    'dppf': 'C1=CC=C(C=C1)P([C-]2C=CC=C2)C3=CC=CC=C3.C1=CC=C(C=C1)P([C-]2C=CC=C2)C3=CC=CC=C3.[Fe+2]', 
    'Xantphos': 'O6c1c(cccc1P(c2ccccc2)c3ccccc3)C(c7cccc(P(c4ccccc4)c5ccccc5)c67)(C)C',
    'None': ''
}

reagent_1_smiles = {
    'NaOH': '[OH-].[Na+]', 
    'NaHCO3': '[Na+].OC([O-])=O', 
    'CsF': '[F-].[Cs+]', 
    'K3PO4': '[K+].[K+].[K+].[O-]P([O-])([O-])=O', 
    'KOH': '[K+].[OH-]', 
    'LiOtBu': '[Li+].[O-]C(C)(C)C', 
    'Et3N': 'CCN(CC)CC', 
    'None': ''
}

solvent_1_smiles = {
    'MeCN': 'CC#N.O', 
    'THF': 'C1CCOC1.O', 
    'DMF': 'CN(C)C=O.O', 
    'MeOH': 'CO.O', 
    'MeOH/H2O_V2 9:1': 'CO.O', 
    'THF_V2': 'C1CCOC1.O'
}

def make_reaction_smiles(row):
    precursors = f" {reactant_1_smiles[row['Reactant_1_Name']]}.{reactant_2_smiles[row['Reactant_2_Name']]}.{catalyst_smiles[row['Catalyst_1_Short_Hand']]}.{ligand_smiles[row['Ligand_Short_Hand']]}.{reagent_1_smiles[row['Reagent_1_Short_Hand']]}.{solvent_1_smiles[row['Solvent_1_Short_Hand']]} "
    product = 'C1=C(C2=C(C)C=CC3N(C4OCCCC4)N=CC2=3)C=CC2=NC=CC=C12'
    can_precursors = Chem.MolToSmiles(Chem.MolFromSmiles(precursors.replace('...', '.').replace('..', '.').replace(' .', '').replace('. ', '').replace(' ', '')))
    can_product = Chem.MolToSmiles(Chem.MolFromSmiles(product))
    
    return f"{can_precursors}>>{can_product}"

def make_short_reaction_smiles(row):
    precursors = f" {reactant_1_smiles[row['Reactant_1_Name']].split('.')[0]}.{reactant_2_smiles[row['Reactant_2_Name']].split('.')[0]}" 
    product = 'C1=C(C2=C(C)C=CC3N(C4OCCCC4)N=CC2=3)C=CC2=NC=CC=C12'
    can_precursors = Chem.MolToSmiles(Chem.MolFromSmiles(precursors.replace('...', '.').replace('..', '.').replace(' .', '').replace('. ', '').replace(' ', '')))
    can_product = Chem.MolToSmiles(Chem.MolFromSmiles(product))
    
    return f"{can_precursors}>>{can_product}"

In [24]:
df_sm = pd.read_excel('data/Suzuki-Miyaura.xlsx')
df_sm['REACTION'] = df_sm.apply(make_reaction_smiles, axis=1)
df_sm['REACTION SHORT'] = df_sm.apply(make_short_reaction_smiles, axis=1)
mapped_reactions_sm = df_sm['REACTION SHORT'].apply(map_r)
mapped_reactions_sm_all_atoms = [complete_atom_mapping(r) for r in mapped_reactions_sm]

df_sm['MAPPED REACTION'] = pd.Series(mapped_reactions_sm_all_atoms)
df_sm['MAPPED REAGENTS'] = df_sm['MAPPED REACTION'].apply(lambda x: x.split('>>')[0])
df_sm['MAPPED PRODUCTS'] = df_sm['MAPPED REACTION'].apply(lambda x: x.split('>>')[1])
df_sm['YIELD'] = df_sm['Product_Yield_PCT_Area_UV'] / 100.

In [29]:
all_rdkit_features_sm  = {}
for d in [reagent_1_smiles, solvent_1_smiles, ligand_smiles, catalyst_smiles]:
    for n, sm in d.items():
        if not sm:
            f = [0.] * 200
        else:
            f = gen_rdkit2d_features(sm)
        if not f:
            print(n)
            continue
        all_rdkit_features_sm[n] = f

In [11]:
!mkdir data/mapped_Suzuki_Miyaura

In [31]:
columns_to_encode = ['Ligand_Short_Hand','Reagent_1_Short_Hand','Solvent_1_Short_Hand', 'Catalyst_1_Short_Hand']
n = 'mapped_Suzuki_Miyaura'

for s in range(10):
    df_sm_f_r = df_sm.sample(frac=1, random_state=s)
    df_sm_f_r = df_sm_f_r.reset_index(drop=True)
    train_df_sm_f_r = df_sm_f_r.head(round(len(df_sm_f_r) * 0.7))
    test_df_sm_f_r = df_sm_f_r.tail(round(len(df_sm_f_r) * 0.3))

    train_df_sm_f_r = train_df_sm_f_r.reset_index(drop=True)
    test_df_sm_f_r = test_df_sm_f_r.reset_index(drop=True)
    
    
    assert len(df_sm_f_r) == len(train_df_sm_f_r) + len(test_df_sm_f_r)
    assert len(train_df_sm_f_r) + len(test_df_sm_f_r) == len(pd.concat((train_df_sm_f_r, test_df_sm_f_r)).drop_duplicates())
    
    train_df_sm_f_r[['MAPPED REAGENTS', 'MAPPED PRODUCTS','YIELD']].to_csv(f'data/{n}/train_cv_{s}.csv', index=False)
    test_df_sm_f_r[['MAPPED REAGENTS', 'MAPPED PRODUCTS', 'YIELD']].to_csv(f'data/{n}/test_cv_{s}.csv', index=False)
    
    with open(f'data/{n}/train_f{s}_feat_rdkit_reactant.csv', 'w') as f:
        f.write("HEADER\n")
        for i, r in train_df_sm_f_r.iterrows():
            f.write(','.join([','.join(map(str, all_rdkit_features_sm[r[c]])) for c in columns_to_encode]))
            f.write('\n')
        
    with open(f'data/{n}/test_f{s}_feat_rdkit_reactant.csv', 'w') as f:
        f.write("HEADER\n")
        for i, r in test_df_sm_f_r.iterrows():
            f.write(','.join([','.join(map(str, all_rdkit_features_sm[r[c]])) for c in columns_to_encode]))
            f.write('\n')
            
    if s == 0: #for hp tu 
        df_sm_f_r = df_sm_f_r.sample(frac=1, random_state=0)
        df_sm_f_r = df_sm_f_r.reset_index(drop=True)
        train_df_sm_f_r = df_sm_f_r.head(round(len(df_sm_f_r) * 0.7))
        test_df_sm_f_r = train_df_sm_f_r.sample(frac=0.14, random_state=0)
        train_df_sm_f_r = train_df_sm_f_r.drop(test_df_sm_f_r.index)
        train_df_sm_f_r = train_df_sm_f_r.reset_index(drop=True)
        test_df_sm_f_r = test_df_sm_f_r.reset_index(drop=True)
        train_df_sm_f_r[['MAPPED REAGENTS', 'MAPPED PRODUCTS','YIELD']].to_csv(f'data/{n}/train_for_hp_tune.csv', index=False)
        test_df_sm_f_r[['MAPPED REAGENTS', 'MAPPED PRODUCTS', 'YIELD']].to_csv(f'data/{n}/test_for_hp_tune.csv', index=False)
        
        with open(f'data/{n}/train_for_hp_tune_feat_rdkit.csv', 'w') as f:
            f.write("HEADER\n")
            for i, r in train_df_sm_f_r.iterrows():
                f.write(','.join([','.join(map(str, all_rdkit_features_sm[r[c]])) for c in columns_to_encode]))
                f.write('\n')
        
        with open(f'data/{n}/test_for_hp_tune_feat_rdkit.csv', 'w') as f:
            f.write("HEADER\n")
            for i, r in test_df_sm_f_r.iterrows():
                f.write(','.join([','.join(map(str, all_rdkit_features_sm[r[c]])) for c in columns_to_encode]))
                f.write('\n')

        
        