# Working with Dask and chemical data

Some notes here:

- the starting dataset here is the SDF from ChEMBL25: ~1.8million records, 555M compressed
- calculations are done on a t3.xlarge AWS EC2 instance (4 vCPUs, 16GB of RAM, relatively cheap)

In [1]:
from rdkit import Chem
from rdkit.Chem import rdinchi
from rdkit import RDLogger
RDLogger.DisableLog('rdApp.warning')
RDLogger.DisableLog('rdApp.info')
import time
import sys

from rdkit.Chem import Draw
from rdkit.Chem.Draw import IPythonConsole
%pylab inline



Populating the interactive namespace from numpy and matplotlib


In [2]:
from IPython.display import SVG   
def show_molblock(molb):
    m1 = Chem.MolFromMolBlock(molb,sanitize=False)
    m1.UpdatePropertyCache(False)
    Chem.FastFindRings(m1)
    wedge_based_on_mol_props(m1)
    d2d = Draw.MolDraw2DSVG(300,350)
    d2d.DrawMolecule(m1)
    d2d.FinishDrawing()
    return SVG(d2d.GetDrawingText())

In [38]:
import dask
import dask.bag as db
import gzip

@dask.delayed
def records_from_sdf(fname):
    with gzip.open(fname,'rt') as inf:
        return [x+'$$$$\n' for x in inf.read().split('$$$$\n')]
def mol_from_record(blk,**kwargs):
    r = Chem.MolFromMolBlock(blk,**kwargs)
    return r

In [29]:
import dask.distributed
client = dask.distributed.Client()

# Create a bag from a collection of separate files

In [39]:
import glob
# Create the readers:
fns = sorted(glob.glob('../chembl_25.block*.sdf.gz'))
dfs2 = [records_from_sdf(x) for x in fns]

# now the dask bag that will actually do the work:
b2 = db.from_delayed(dfs2)

# construct molecules:
mols2 = b2.map(lambda x:mol_from_record(x,sanitize=False,removeHs=False))

# and now compose a couple of queries and count the results.
# Note that no actual computation is done until the .compute() method is called
q = Chem.MolFromSmarts('C(F)(F)F')
t1=time.time();mols2.filter(lambda x: x is not None).filter(lambda x,q=q: x.HasSubstructMatch(q)).count().compute();t2=time.time()
print(f"{t2-t1 : .2f}")

 97.70


# Generate a bunch of properties and write to avro

In [26]:
from rdkit.Chem import rdMolDescriptors
from rdkit.Chem import Descriptors
import glob

# construct the bag the same way we did before
fns = sorted(glob.glob('../chembl_25.block*.sdf.gz'))
dfs2 = [records_from_sdf(x) for x in fns]
b2 = db.from_delayed(dfs2)
mols2 = b2.map(mol_from_record)

# this time we're replacing the records with dicts that include the molecules
# and ChEMBL IDs:
tmols = b2.map(lambda x,y:{'sdf':x,
                           'chembl_id':x.split('\n')[0],
                           'mol':y},mols2).filter(lambda x:x['mol'] is not None)

# add a bunch of molecular properties:
def get_mol_res(row):
    ''' calculate a bunch of molecular properties '''
    m = row['mol']
    return {
        'canonical_smiles':Chem.MolToSmiles(m),
        'inchi':Chem.MolToInchi(m),
        'num_atoms':m.GetNumAtoms(),
        'num_heavy_atoms':m.GetNumHeavyAtoms(),
        'num_rotatable_bonds':rdMolDescriptors.CalcNumRotatableBonds(m),
        'num_rings':rdMolDescriptors.CalcNumRings(m),
        'tpsa':rdMolDescriptors.CalcTPSA(m),
        'mollogp':Descriptors.MolLogP(m),
        'molwt':Descriptors.MolWt(m)
    }
tmols = tmols.map(lambda x:dict(get_mol_res(x),**x))

# write to avro files:
schema = {'name':'chembl25_test','type':'record',
          'fields':[{'name':'chembl_id','type':'string'},
                   {'name':'sdf','type':'string'},
                   {'name':'canonical_smiles','type':'string'},
                   {'name':'inchi','type':'string'},
                   {'name':'num_atoms','type':'int'},
                           {'name':'num_heavy_atoms','type':'int'},
                           {'name':'num_rotatable_bonds','type':'int'},
                           {'name':'num_rings','type':'int'},
                           {'name':'tpsa','type':'double'},
                           {'name':'mollogp','type':'double'},
                           {'name':'molwt','type':'double'}]}
tmols.to_avro('chembl25_output.*.avro',schema)


['/data/ChEMBL/notebooks/chembl25_output.00.avro',
 '/data/ChEMBL/notebooks/chembl25_output.01.avro',
 '/data/ChEMBL/notebooks/chembl25_output.02.avro',
 '/data/ChEMBL/notebooks/chembl25_output.03.avro',
 '/data/ChEMBL/notebooks/chembl25_output.04.avro',
 '/data/ChEMBL/notebooks/chembl25_output.05.avro',
 '/data/ChEMBL/notebooks/chembl25_output.06.avro',
 '/data/ChEMBL/notebooks/chembl25_output.07.avro',
 '/data/ChEMBL/notebooks/chembl25_output.08.avro',
 '/data/ChEMBL/notebooks/chembl25_output.09.avro',
 '/data/ChEMBL/notebooks/chembl25_output.10.avro',
 '/data/ChEMBL/notebooks/chembl25_output.11.avro',
 '/data/ChEMBL/notebooks/chembl25_output.12.avro',
 '/data/ChEMBL/notebooks/chembl25_output.13.avro',
 '/data/ChEMBL/notebooks/chembl25_output.14.avro',
 '/data/ChEMBL/notebooks/chembl25_output.15.avro',
 '/data/ChEMBL/notebooks/chembl25_output.16.avro',
 '/data/ChEMBL/notebooks/chembl25_output.17.avro',
 '/data/ChEMBL/notebooks/chembl25_output.18.avro',
 '/data/ChEMBL/notebooks/chembl

## chemical queries against avro

In [41]:
nb = db.read_avro('./chembl25_output.*.avro')
mols2 = nb.map(lambda x:mol_from_record(x['sdf'],sanitize=False,removeHs=False))

q = Chem.MolFromSmarts('C(F)(F)F')
t1=time.time();mols2.filter(lambda x: x is not None).filter(lambda x,q=q: x.HasSubstructMatch(q)).count().compute();t2=time.time()
print(f"{t2-t1 : .2f}")

 104.47


It's faster to construct the molecules from the SMILES:

In [36]:
nb = db.read_avro('./chembl25_output.*.avro')
mols2 = nb.map(lambda x:Chem.MolFromSmiles(x['canonical_smiles'],sanitize=False))

q = Chem.MolFromSmarts('C(F)(F)F')
t1=time.time();mols2.filter(lambda x: x is not None).filter(lambda x,q=q: x.HasSubstructMatch(q)).count().compute();t2=time.time()
print(f"{t2-t1 : .2f}")

 40.52


# Create a parquet file and try using dask.DataFrame

In [28]:
nb = db.read_avro('./chembl25_output.*.avro')
df = nb.to_dataframe()
df.set_index('inchi')
df.to_parquet('./chembl25_parquet')

In [7]:
from dask import dataframe as dd

In [35]:
df = dd.read_parquet('./chembl25_parquet')
df['mol'] = df['canonical_smiles'].map(
    lambda x:Chem.MolFromSmiles(x,sanitize=False),meta=('mol',object))
q = Chem.MolFromSmarts('C(F)(F)F')
t1=time.time();df[df['mol'] != None].count().compute();t2=time.time()
print(f"{t2-t1 : .2f}")

 37.12


I haven't managed to get substructure queries to work correctly with `dask.DataFrame` yet. :-(

Try doing string queries:

In [30]:
sdf = dd.read_parquet('./chembl25_parquet')
t1=time.time();len(sdf[sdf.inchi.map(lambda i:i.startswith('InChI=1S/C10H13N5S'))]);t2=time.time()
print(f'{t2-t1:.2f}')

7.27


In [31]:
sdf = dd.read_parquet('./chembl25_parquet',columns=['chembl_id','inchi'])
t1=time.time();len(sdf[sdf.inchi.map(lambda i:i.startswith('InChI=1S/C10H13N5S'))]);t2=time.time()
print(f'{t2-t1:.2f}')

1.98


In [32]:
nb = db.read_avro('./chembl25_output.*.avro')
t1=time.time();nb.filter(lambda x:x['inchi'].startswith('InChI=1S/C10H13N5S')).count().compute();t2=time.time()
print(f'{t2-t1:.2f}')

7.26


In [33]:
sdf = dd.read_parquet('./chembl25_parquet')
t1=time.time();len(sdf.inchi);t2=time.time()
print(f'{t2-t1:.2f}')

1.28


In [34]:
nb = db.read_avro('./chembl25_output.*.avro')
t1=time.time();nb.count().compute();t2=time.time()
print(f'{t2-t1:.2f}')

6.85


## Take a look at using pyarrow directly

In [19]:
import pyarrow as pa
import pyarrow.parquet as pq
dataset = pq.ParquetDataset('./chembl25_parquet/')
t1=time.time();len([1 for x in dataset.read(columns=['inchi']).column('inchi') if x.as_py().startswith('InChI=1S/C10H13N5S')]);t2=time.time()
print(f'{t2-t1:.2f}')

1.86


A lot of the win here seems to be that we are only reading the columns we need from the parquet file

In [23]:
dataset = pq.ParquetDataset('./chembl25_parquet/')
t1=time.time();len([1 for x in dataset.read().column('inchi') if x.as_py().startswith('InChI=1S/C10H13N5S')]);t2=time.time()
print(f'{t2-t1:.2f}')

6.38
