diff --git a/.dockerignore b/.dockerignore
index f5397e7e..0af3f7ce 100644
--- a/.dockerignore
+++ b/.dockerignore
@@ -1,3 +1,25 @@
 ./data
 ./cache
 ./cddd
+
+.bash_history
+.cache
+.chembl_ws_client__0.10.2.sqlite
+.cheminf_local_environment
+.config
+.cudf
+.cupy
+dask-worker-space
+.flake8
+.git
+.gitignore
+.ipynb_checkpoints
+.ipython
+.jupyter
+.keras
+.local
+.npm
+.nv
+.pytest_cache
+.python_history
+.vscode
diff --git a/Dockerfile b/Dockerfile
index 58daec70..b8818dfd 100644
--- a/Dockerfile
+++ b/Dockerfile
@@ -1,7 +1,9 @@
 # Copyright 2020 NVIDIA Corporation
 # SPDX-License-Identifier: Apache-2.0
 FROM nvidia/cuda:11.0-base
-RUN apt update && DEBIAN_FRONTEND=noninteractive apt-get install -y wget git
+
+RUN apt update && DEBIAN_FRONTEND=noninteractive apt-get install -y wget git \
+    && rm -rf /var/lib/apt/lists/*
 
 SHELL ["/bin/bash", "-c"]
 
 RUN wget --quiet -O /tmp/miniconda.sh \
@@ -9,22 +11,24 @@ RUN wget --quiet -O /tmp/miniconda.sh \
     && /bin/bash /tmp/miniconda.sh -b -p /opt/conda \
     && rm /tmp/miniconda.sh \
     && ln -s /opt/conda/etc/profile.d/conda.sh /etc/profile.d/conda.sh
+
 ENV PATH /opt/conda/bin:$PATH
 
 # Copy conda env spec.
 COPY setup/cuchem_rapids_0.17.yml /tmp
 
-RUN conda env create --name cuchem -f /tmp/cuchem_rapids_0.17.yml
+RUN conda env create --name cuchem -f /tmp/cuchem_rapids_0.17.yml \
+    && rm /tmp/cuchem_rapids_0.17.yml \
+    && conda clean -ay
 ENV PATH /opt/conda/envs/cuchem/bin:$PATH
 
-RUN conda clean -afy
-RUN rm /tmp/cuchem_rapids_0.17.yml
-
 RUN source activate cuchem && python3 -m ipykernel install --user --name=cuchem
 
 RUN echo "source activate cuchem" > /etc/bash.bashrc
 
-COPY launch.sh /opt/nvidia/cheminfomatics/
-COPY *.py /opt/nvidia/cheminfomatics/
-COPY nbs/*.ipynb /opt/nvidia/cheminfomatics/
+RUN mkdir -p /opt/nvidia/ \
+    && cd /opt/nvidia/ \
+    && git clone https://github.com/NVIDIA/cheminformatics.git cheminfomatics \
+    && rm -rf /opt/nvidia/cheminfomatics/.git
 
 ENV UCX_LOG_LEVEL error
diff --git a/nvidia/cheminformatics/data/__init__.py b/nvidia/cheminformatics/data/__init__.py
index 5fa74c7a..1888e0e7 100644
--- a/nvidia/cheminformatics/data/__init__.py
+++ b/nvidia/cheminformatics/data/__init__.py
@@ -5,16 +5,29 @@ class ClusterWfDAO(object):
     Base class for all DAO for fetching data for Clustering Workflows
     """
 
+    def meta_df(self):
+        """
+        Returns a df with dtypes set for the structure, without any column filter.
+        """
+        raise NotImplementedError
+
     def fetch_molecular_embedding(self, n_molecules:int, cache_directory:str=None):
         """
         Fetch molecular properties from database/cache into a dask array.
        """
-        pass
+        raise NotImplementedError
+
+    def fetch_molecular_embedding_by_id(self, molecule_id:List):
+        """
+        Fetch molecular properties from the database for the given ids. The id
+        depends on the backend database. For the ChEMBL DB it is the molregno.
+        """
+        raise NotImplementedError
 
-    def fetch_new_molecules(self, new_molecules: List):
+    def fetch_id_from_chembl(self, new_molecules: List):
        """
         Fetch molecular details for a list of molecules. The values in the list
         of molecules depend on the database/service used, e.g. a ChEMBL ID or
         molregno for the ChEMBL database.
        """
-        pass
+        raise NotImplementedError
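ChemblClusterWfDao below and the workflow classes later in this patch are declared with metaclass=Singleton from nvidia.cheminformatics.utils.singleton, a module this diff does not touch. A minimal metaclass consistent with that usage, offered as a sketch rather than the repository's actual implementation:

    # Hypothetical sketch of a Singleton metaclass compatible with
    # "class ChemblClusterWfDao(ClusterWfDAO, metaclass=Singleton)".
    class Singleton(type):
        _instances = {}

        def __call__(cls, *args, **kwargs):
            # Construct the instance on the first call only; reuse it afterwards.
            if cls not in cls._instances:
                cls._instances[cls] = super().__call__(*args, **kwargs)
            return cls._instances[cls]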
""" - pass + NotImplemented diff --git a/nvidia/cheminformatics/data/cluster_wf.py b/nvidia/cheminformatics/data/cluster_wf.py index 645bc050..8e8df983 100644 --- a/nvidia/cheminformatics/data/cluster_wf.py +++ b/nvidia/cheminformatics/data/cluster_wf.py @@ -1,7 +1,12 @@ +from nvidia.cheminformatics.config import Context +from nvidia.cheminformatics.utils.singleton import Singleton import os +import cudf +import dask_cudf import dask import logging -import dask_cudf +import sqlite3 +from contextlib import closing from typing import List @@ -13,9 +18,23 @@ FINGER_PRINT_FILES = 'filter_*.h5' -class ChemblClusterWfDao(ClusterWfDAO): +class ChemblClusterWfDao(ClusterWfDAO, metaclass=Singleton): + + def __init__(self): + + context = Context() + db_file = context.get_config('data_mount_path', default='/data') + + self.chembl_db = 'file:%s/db/chembl_27.db?mode=ro' % db_file + logger.info('Reading ChEMBL database at %s...' % self.chembl_db) + + def meta_df(self): + chem_data = ChEmblData() + return chem_data._meta_df() - def fetch_molecular_embedding(self, n_molecules:int, cache_directory:str=None): + def fetch_molecular_embedding(self, + n_molecules:int, + cache_directory:str=None): chem_data = ChEmblData() if cache_directory: hdf_path = os.path.join(cache_directory, FINGER_PRINT_FILES) @@ -30,5 +49,28 @@ def fetch_molecular_embedding(self, n_molecules:int, cache_directory:str=None): return mol_df - def fetch_new_molecules(self, new_molecules: List): - pass + def fetch_molecular_embedding_by_id(self, molecule_id:List): + chem_data = ChEmblData() + meta = chem_data._meta_df() + fp_df = chem_data._fetch_mol_embedding(molregnos=molecule_id) \ + .astype(meta.dtypes) + + fp_df = cudf.from_pandas(fp_df) + fp_df = dask_cudf.from_cudf(fp_df, npartitions=1).reset_index() + return fp_df + + def fetch_id_from_chembl(self, new_molecules: List): + logger.debug('Fetch ChEMBL ID using molregno...') + + with closing(sqlite3.connect(self.chembl_db, uri=True)) as con, con, \ + closing(con.cursor()) as cur: + select_stmt = ''' + SELECT cs.molregno as molregno, md.chembl_id as chembl_id + FROM compound_structures cs, + molecule_dictionary md + WHERE md.molregno = cs.molregno + AND md.chembl_id in (%s) + ''' % "'%s'" %"','".join(new_molecules) + cur.execute(select_stmt) + + return cur.fetchall() \ No newline at end of file diff --git a/nvidia/cheminformatics/data/helper/chembldata.py b/nvidia/cheminformatics/data/helper/chembldata.py index 001bf669..b1998621 100644 --- a/nvidia/cheminformatics/data/helper/chembldata.py +++ b/nvidia/cheminformatics/data/helper/chembldata.py @@ -46,6 +46,8 @@ ADDITIONAL_FEILD_TYPE = [pandas.Series([], dtype='object'), pandas.Series([], dtype='object')] + +# DEPRECATED. Please add code to DAO classes. 
diff --git a/nvidia/cheminformatics/data/helper/chembldata.py b/nvidia/cheminformatics/data/helper/chembldata.py
index 001bf669..b1998621 100644
--- a/nvidia/cheminformatics/data/helper/chembldata.py
+++ b/nvidia/cheminformatics/data/helper/chembldata.py
@@ -46,6 +46,8 @@
 ADDITIONAL_FEILD_TYPE = [pandas.Series([], dtype='object'),
                          pandas.Series([], dtype='object')]
 
+
+# DEPRECATED. Please add code to DAO classes.
 class ChEmblData(object, metaclass=Singleton):
 
     def __init__(self,
@@ -102,6 +104,19 @@ def fetch_molregno_by_chemblId(self, chemblIds):
             cur.execute(select_stmt)
             return cur.fetchall()
 
+    def fetch_chemblId_by_molregno(self, molregnos):
+        logger.debug('Fetch ChEMBL ID using molregno...')
+        with closing(sqlite3.connect(self.chembl_db, uri=True)) as con, con, \
+                closing(con.cursor()) as cur:
+            select_stmt = '''
+                SELECT md.chembl_id as chembl_id
+                  FROM molecule_dictionary md
+                 WHERE md.molregno in (%s)
+            ''' % ", ".join(list(map(str, molregnos)))
+            cur.execute(select_stmt)
+            return cur.fetchall()
+
     def fetch_molecule_cnt(self):
         logger.debug('Finding number of molecules...')
         with closing(sqlite3.connect(self.chembl_db, uri=True)) as con, con, \
@@ -118,11 +133,21 @@ def fetch_molecule_cnt(self):
 
         return cur.fetchone()[0]
 
-    @delayed
+    def _meta_df(self, **transformation_kwargs):
+        transformation = self.fp_type(**transformation_kwargs)
+
+        prop_meta = {'id': pandas.Series([], dtype='int64')}
+        prop_meta.update(dict(zip(IMP_PROPS + ADDITIONAL_FEILD,
+                                  IMP_PROPS_TYPE + ADDITIONAL_FEILD_TYPE)))
+        prop_meta.update({i: pandas.Series([], dtype='float32') for i in range(len(transformation))})
+
+        return pandas.DataFrame(prop_meta)
+
     def _fetch_mol_embedding(self,
-                             start,
+                             start=0,
                              batch_size=5000,
                              smiles_transforms=SMILES_TRANSFORMS,
+                             molregnos=None,
                              **transformation_kwargs):
         """
         Returns compound properties and structure for the first N number of
@@ -132,15 +157,29 @@ def _fetch_mol_embedding(self,
         logger.info('Fetching %d records starting %d...' % (batch_size, start))
 
         imp_cols = [ 'cp.' + col for col in IMP_PROPS]
-        select_stmt = '''
-            SELECT md.molregno, %s, cs.canonical_smiles
-              FROM compound_properties cp,
-                   molecule_dictionary md,
-                   compound_structures cs
-             WHERE cp.molregno = md.molregno
-               AND md.molregno = cs.molregno
-             LIMIT %d, %d
-        ''' % (', '.join(imp_cols), start, batch_size)
+
+        if molregnos is None:
+            select_stmt = '''
+                SELECT md.molregno, %s, cs.canonical_smiles
+                  FROM compound_properties cp,
+                       molecule_dictionary md,
+                       compound_structures cs
+                 WHERE cp.molregno = md.molregno
+                   AND md.molregno = cs.molregno
+                 LIMIT %d, %d
+            ''' % (', '.join(imp_cols), start, batch_size)
+        else:
+            select_stmt = '''
+                SELECT md.molregno, %s, cs.canonical_smiles
+                  FROM compound_properties cp,
+                       molecule_dictionary md,
+                       compound_structures cs
+                 WHERE cp.molregno = md.molregno
+                   AND md.molregno = cs.molregno
+                   AND md.molregno in (%s)
+                 LIMIT %d, %d
+            ''' % (', '.join(imp_cols), ", ".join(list(map(str, molregnos))), start, batch_size)
 
         df = pandas.read_sql(select_stmt,
                              sqlite3.connect(self.chembl_db, uri=True))
@@ -168,6 +207,7 @@ def _fetch_mol_embedding(self,
     def fetch_mol_embedding(self,
                             num_recs=None,
                             batch_size=5000,
+                            molregnos=None,
                             **transformation_kwargs):
         """
         Returns compound properties and structure for the first N number of
@@ -178,25 +218,18 @@ def fetch_mol_embedding(self,
         if not num_recs or num_recs < 0:
             num_recs = self.fetch_molecule_cnt()
 
-        transformation = self.fp_type(**transformation_kwargs)
-
-        prop_meta = {'id': pandas.Series([], dtype='int64')}
-        prop_meta.update(dict(zip(IMP_PROPS + ADDITIONAL_FEILD,
-                                  IMP_PROPS_TYPE + ADDITIONAL_FEILD_TYPE)))
-        prop_meta.update({i: pandas.Series([], dtype='float32') for i in range(len(transformation))})
+        meta_df = self._meta_df(**transformation_kwargs)
 
-        meta_df = pandas.DataFrame(prop_meta)
         dls = []
         for start in range(0, num_recs, batch_size):
             bsize = min(num_recs - start, batch_size)
-            dls.append(self._fetch_mol_embedding(
-                start,
-                batch_size=bsize,
-                **transformation_kwargs))
+            dl_data = delayed(self._fetch_mol_embedding) \
+                (start=start, batch_size=bsize, molregnos=molregnos, **transformation_kwargs)
+            dls.append(dl_data)
 
         return dataframe.from_delayed(dls, meta=meta_df)
 
-    def save_fingerprints(self, hdf_path='data/filter_*.h5', num_recs=None,):
+    def save_fingerprints(self, hdf_path='data/filter_*.h5', num_recs=None):
         """
         Generates fingerprints for all ChEMBL ID's in the database
         """
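The change above drops the @delayed decorator from _fetch_mol_embedding and wraps it with delayed(...) at the call site instead, so the same method can also run eagerly (as fetch_molecular_embedding_by_id in cluster_wf.py does). The meta= frame gives dask the schema of each delayed partition without computing it. A self-contained sketch of the pattern, using a stand-in fetch function:

    import pandas
    from dask import delayed, dataframe

    def fetch_batch(start=0, batch_size=3):
        # Stand-in for ChEmblData._fetch_mol_embedding; one partition per call.
        ids = list(range(start, start + batch_size))
        return pandas.DataFrame({'id': ids, 'alogp': [0.5] * batch_size})

    # Empty frame describing the dtypes, like ChEmblData._meta_df().
    meta_df = pandas.DataFrame({'id': pandas.Series([], dtype='int64'),
                                'alogp': pandas.Series([], dtype='float64')})

    # delayed() at the call site keeps fetch_batch usable eagerly as well.
    dls = [delayed(fetch_batch)(start=s) for s in range(0, 9, 3)]
    ddf = dataframe.from_delayed(dls, meta=meta_df)
    print(ddf.compute())  # 9 rows, schema taken from meta_df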
diff --git a/nvidia/cheminformatics/interactive/chemvisualize.py b/nvidia/cheminformatics/interactive/chemvisualize.py
index bc1633d7..6cd35b76 100644
--- a/nvidia/cheminformatics/interactive/chemvisualize.py
+++ b/nvidia/cheminformatics/interactive/chemvisualize.py
@@ -41,14 +41,12 @@ def __init__(self, workflow):
         self.workflow = workflow
         self.n_clusters = workflow.n_clusters
 
-        self.chem_data = ChEmblData()
-
         self.wf = 'nvidia.cheminformatics.wf.cluster.gpukmeansumap.GpuKmeansUmap'
 
         # Store colors to avoid plots changes colors on events such as
         # molecule selection, etc.
-        self.cluster_colors = None
+        self.cluster_colors = generate_colors(self.n_clusters)
 
         # Construct the UI
         self.app.layout = self.constuct_layout()
@@ -111,6 +109,12 @@ def __init__(self, workflow):
             [State('north_star', 'value'),
              State('hidden_northstar', 'value')])(self.handle_mark_north_star)
 
+        self.app.callback(
+            [Output('error_msg', 'children'),
+             Output('md_error', 'is_open')],
+            [Input('recluster_error', 'children'),
+             Input('bt_close_err', 'n_clicks')])(self.handle_error)
+
     def _fetch_event_data(self):
         if not dash.callback_context.triggered:
             raise dash.exceptions.PreventUpdate
@@ -136,14 +140,13 @@ def handle_reset(self, bt_reset, bt_apply_wf, refresh_main_fig, sl_wf):
         # Change the refresh variable to force main-figure refresh
         return refresh_main_fig + 1
 
-    def recluster(self, filter_values=None, filter_column=None,
-                  new_figerprints=None, new_chembl_ids=None, reload_data=False):
+    def recluster(self, filter_values=None, filter_column=None, reload_data=False):
+
+        self.workflow.n_clusters = self.n_clusters
         if reload_data:
             return self.workflow.cluster()
         else:
-            return self.workflow.re_cluster(filter_column, filter_values,
-                                            new_figerprints=new_figerprints,
-                                            new_chembl_ids=new_chembl_ids,
+            return self.workflow.recluster(filter_column, filter_values,
                                            n_clusters=self.n_clusters)
 
     def recluster_selection(self,
@@ -152,14 +155,16 @@ def recluster_selection(self,
                             gradient_prop=None,
                             north_stars=None,
                             reload_data=False,
+                            recluster_data=True,
                             color_col='cluster'):
 
-        if filter_value is not None:
-            self.recluster(filter_values=filter_value,
-                           filter_column=filter_column,
-                           reload_data=reload_data)
+        df_embedding = self.workflow.df_embedding
+        if recluster_data:
+            df_embedding = self.recluster(filter_values=filter_value,
+                                          filter_column=filter_column,
+                                          reload_data=reload_data)
 
-        return self.create_graph(self.workflow.df_embedding,
+        return self.create_graph(df_embedding,
                                  color_col=color_col,
                                  gradient_prop=gradient_prop,
                                  north_stars=north_stars)
@@ -178,6 +183,9 @@ def create_graph(self, df, color_col='cluster', north_stars=None, gradient_prop=
             moi_filter = ldf['id'].isin(moi_molregno)
             northstar_df = ldf[moi_filter]
 
+        if hasattr(ldf, 'compute'):
+            ldf = ldf.compute()
+
         # Create a map with MoI and cluster to which they belong
         chemble_cluster_map = {}
         northstar_cluster = []
@@ -228,9 +236,6 @@ def create_graph(self, df, color_col='cluster', north_stars=None, gradient_prop=
         if self.workflow.is_gpu_enabled():
             clusters = clusters.values_host
 
-        if self.cluster_colors is None or len(self.cluster_colors) != len(clusters):
-            self.cluster_colors = generate_colors(len(clusters))
-
         scatter_traces = []
         for cluster_id in clusters:
             cdf = ldf.query('cluster == ' + str(cluster_id))
@@ -389,6 +394,7 @@ def constuct_layout(self):
             html.Div(children=[
                 dcc.Dropdown(id='sl_wf', multi=False,
                              options=[{'label': 'Gpu KmeansUmap', 'value': 'nvidia.cheminformatics.wf.cluster.gpukmeansumap.GpuKmeansUmap'},
+                                      {'label': 'GPU Random Projection - Single GPU', 'value': 'nvidia.cheminformatics.wf.cluster.gpurandomprojection.GpuWorkflowRandomProjection'},
                                       {'label': 'Cpu KmeansUmap', 'value': 'nvidia.cheminformatics.wf.cluster.cpukmeansumap.CpuKmeansUmap'},],
                              value='alogp', clearable=False),
@@ -424,6 +430,7 @@ def constuct_layout(self):
             html.Div(children=[
                 dcc.Markdown("""
                     **Cluster Selection**
+
                     Click a point to select a cluster.
                 """)],
                 style={'marginTop': 18, 'marginLeft': 6},
@@ -484,8 +491,7 @@ def constuct_layout(self):
                            style={"height": "25px"})
             ],
                 className='three columns',
-                style={
-                    'paddingRight': 60, 'verticalAlign': 'text-bottom', 'text-align': 'right'}
+                style={'paddingRight': 60, 'verticalAlign': 'text-bottom', 'text-align': 'right'}
             ),
         ]),
 
@@ -502,9 +508,30 @@ def constuct_layout(self):
             html.Div(id='hidden_northstar', style={'display': 'none'}),
             html.Div(id='north_star_clusterid_map', style={'display': 'none'}),
             html.Div(id='recluster_error'),
-            html.Div(id='mol_selection_error')
+            html.Div(id='mol_selection_error'),
+            html.Div(className='row', children=[
+                dbc.Modal([
+                    dbc.ModalHeader("Error"),
+                    dbc.ModalBody(
+                        html.Div(id='error_msg', style={'color': 'red'}),
+                    ),
+                    dbc.ModalFooter(
+                        dbc.Button("Close", id="bt_close_err", className="ml-auto")
+                    ),
+                ], id="md_error"),
+            ]),
         ])
 
+    def handle_error(self, recluster_error, bt_close_err):
+        comp_id, event_type = self._fetch_event_data()
+
+        if comp_id == 'bt_close_err' and event_type == 'n_clicks':
+            return '', False
+
+        if not recluster_error:
+            raise dash.exceptions.PreventUpdate
+        return recluster_error, True
+
     @report_ui_error(5)
     def handle_molecule_selection(self, mf_selected_data, selected_columns,
                                   sl_prop_gradient, prev_click, next_click,
@@ -555,9 +582,8 @@ def handle_data_selection(self, mf_click_data, mf_selected_data,
         if comp_id == 'main-figure' and event_type == 'clickData':
             # Event - On selecting cluster on the main scatter plot
-            if not curr_clusters:
-                clusters = []
-            else:
+            clusters = []
+            if curr_clusters:
                 clusters = list(map(int, curr_clusters.split(",")))
 
             points = mf_click_data['points']
@@ -616,14 +642,21 @@ def handle_mark_north_star(self, bt_north_star_click, north_star, hidden_northst
 
     @report_ui_error(3)
     def handle_re_cluster(self, bt_cluster_clicks, bt_point_clicks, bt_north_star_clicks,
-                          north_star_hidden, sl_prop_gradient, sl_nclusters, refresh_main_fig, 
+                          north_star_hidden, sl_prop_gradient, sl_nclusters, refresh_main_fig,
                           selected_clusters, selected_points, north_star):
-        comp_id, event_type = self._fetch_event_data()
 
-        self.n_clusters = int(sl_nclusters)
+        comp_id, event_type = self._fetch_event_data()
+        if comp_id == 'sl_nclusters':
+            if sl_nclusters:
+                self.n_clusters = int(sl_nclusters)
+                self.cluster_colors = generate_colors(self.n_clusters)
+
+            raise dash.exceptions.PreventUpdate
+
         filter_values = None
         filter_column = None
         reload_data = False
+        recluster_data = True
+        error_msg = ''
 
         if selected_clusters and comp_id == 'bt_recluster_clusters' and event_type == 'n_clicks':
             filter_values = list(map(int, selected_clusters.split(",")))
@@ -637,11 +670,18 @@ def handle_re_cluster(self, bt_cluster_clicks, bt_point_clicks, bt_north_star_cl
             filter_column = 'id'
         elif comp_id == 'bt_north_star' and event_type == 'n_clicks':
-            north_star_hidden = self.update_new_chembl(north_star)
-            if not north_star_hidden:
-                raise dash.exceptions.PreventUpdate
+            if north_star:
+                north_star = north_star.split(',')
+                missing_mols, molregnos, _ = self.workflow.add_molecules(north_star)
+                recluster_data = len(missing_mols) > 0
+
+                north_star_hidden = ", ".join(list(map(str, molregnos)))
+            else:
+                north_star_hidden = ''
+                recluster_data = False
 
         elif comp_id == 'hidden_northstar' and event_type == 'value':
+            recluster_data = False
             if not north_star_hidden:
                 raise dash.exceptions.PreventUpdate
 
@@ -654,42 +694,7 @@ def handle_re_cluster(self, bt_cluster_clicks, bt_point_clicks, bt_north_star_cl
             gradient_prop=sl_prop_gradient,
             north_stars=north_star_hidden,
             color_col='cluster',
-            reload_data = reload_data)
-
-        return figure, ','.join(northstar_cluster), chembl_clusterid_map, ''
-
-    def update_new_chembl(self, north_stars, radius=2, nBits=512):
-        north_stars = list(map(str.strip, north_stars.split(',')))
-        north_stars = list(map(str.upper, north_stars))
-        molregnos = [row[0] for row in self.chem_data.fetch_molregno_by_chemblId(north_stars)]
-
-        # TODO: Avoid using self.workflow.df_embedding
-        df = self.workflow.df_embedding
-        df['id_exists'] = df['id'].isin(molregnos)
-
-        ldf = df.query('id_exists == True')
-        ldf = ldf.compute()
-        df.drop(['id_exists'], axis=1)
-
-        missing_molregno = set(molregnos).difference(ldf['id'].to_array())
-        # CHEMBL10307, CHEMBL103071, CHEMBL103072
-        if missing_molregno:
-            missing_molregno = list(missing_molregno)
-            ldf = self.chem_data.fetch_props_df_by_molregno(missing_molregno)
-
-            if ldf.shape[0] > 0:
-                smiles = []
-                for i in range(0, ldf.shape[0]):
-                    smiles.append(
-                        ldf.iloc[i]['canonical_smiles'].to_array()[0])
-
-                morgan_fingerprint = MorganFingerprint()
-                results = list(morgan_fingerprint.transform_many(smiles))
-                fingerprints = cupy.stack(results).astype(np.float32)
-                tdf = self.recluster(df, fingerprints, missing_molregno)
-                if tdf:
-                    df = tdf
-                else:
-                    return None
-
-        return " ,".join(list(map(str, molregnos)))
+            reload_data=reload_data,
+            recluster_data=recluster_data)
+
+        return figure, ','.join(northstar_cluster), chembl_clusterid_map, error_msg
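handle_error and the other handlers funnel several Inputs into a single callback and rely on _fetch_event_data, which inspects dash.callback_context to decide which component fired. A minimal standalone rendering of that dispatch idiom (the prop_id split mirrors what _fetch_event_data presumably does; it is not copied from the app):

    import dash

    def fetch_event_data():
        # Must run inside a Dash callback: identify the triggering component.
        if not dash.callback_context.triggered:
            raise dash.exceptions.PreventUpdate
        prop_id = dash.callback_context.triggered[0]['prop_id']
        comp_id, event_type = prop_id.split('.')  # e.g. 'bt_close_err', 'n_clicks'
        return comp_id, event_type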
diff --git a/nvidia/cheminformatics/utils/dask.py b/nvidia/cheminformatics/utils/dask.py
index dd788b8e..a6a98938 100644
--- a/nvidia/cheminformatics/utils/dask.py
+++ b/nvidia/cheminformatics/utils/dask.py
@@ -4,6 +4,7 @@
 
 import rmm
 import cupy
+import cudf
 
 from dask_cuda import initialize, LocalCUDACluster
 from dask.distributed import Client, LocalCluster
@@ -13,9 +14,6 @@
 
 def initialize_cluster(use_gpu=True, n_cpu=None, n_gpu=-1):
-    rmm.reinitialize(managed_memory=True)
-    cupy.cuda.set_allocator(rmm.rmm_cupy_allocator)
-
     enable_tcp_over_ucx = True
     enable_nvlink = False
     enable_infiniband = False
@@ -23,29 +21,35 @@ def initialize_cluster(use_gpu=True, n_cpu=None, n_gpu=-1):
     logger.info('Starting dash cluster...')
     if use_gpu:
         initialize.initialize(create_cuda_context=True,
-                              enable_tcp_over_ucx=enable_tcp_over_ucx,
-                              enable_nvlink=enable_nvlink,
-                              enable_infiniband=enable_infiniband)
+                              enable_tcp_over_ucx=enable_tcp_over_ucx,
+                              enable_nvlink=enable_nvlink,
+                              enable_infiniband=enable_infiniband)
         if n_gpu == -1:
-            n_gpu = get_n_gpus() - 1
+            n_gpu = get_n_gpus()
 
-        CUDA_VISIBLE_DEVICES = cuda_visible_devices(1, range(n_gpu)).split(',')
-        CUDA_VISIBLE_DEVICES = [int(x) for x in CUDA_VISIBLE_DEVICES]
-        # CUDA_VISIBLE_DEVICES = [0, 1]
+        device_list = cuda_visible_devices(1, range(n_gpu)).split(',')
+        CUDA_VISIBLE_DEVICES = []
+        for device in device_list:
+            try:
+                CUDA_VISIBLE_DEVICES.append(int(device))
+            except ValueError as vex:
+                logger.warning(vex)
+                CUDA_VISIBLE_DEVICES = [0, 1]
 
         logger.info('Using GPUs {} ...'.format(CUDA_VISIBLE_DEVICES))
 
         cluster = LocalCUDACluster(protocol="ucx",
-                                   dashboard_address=':9001',
-                                   # TODO: automate visible device list
+                                   dashboard_address=':8787',
                                    CUDA_VISIBLE_DEVICES=CUDA_VISIBLE_DEVICES,
                                    enable_tcp_over_ucx=enable_tcp_over_ucx,
                                    enable_nvlink=enable_nvlink,
                                    enable_infiniband=enable_infiniband)
     else:
         logger.info('Using {} CPUs ...'.format(n_cpu))
-        cluster = LocalCluster(dashboard_address=':9001',
+        cluster = LocalCluster(dashboard_address=':8787',
                                n_workers=n_cpu,
                                threads_per_worker=4)
 
-    return Client(cluster)
+    client = Client(cluster)
+    client.run(cupy.cuda.set_allocator)
+    return client
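Note that the client-side rmm/cupy allocator setup was removed, and client.run(cupy.cuda.set_allocator) now invokes set_allocator() with no argument on every worker, which installs CuPy's default allocator rather than an RMM-backed one. If RMM-managed memory on the workers is the intent, a sketch (assuming dask-cuda workers with rmm available, as the existing imports suggest):

    import cupy
    import rmm

    def _set_rmm_allocator():
        # Route CuPy allocations through RMM on this worker.
        cupy.cuda.set_allocator(rmm.rmm_cupy_allocator)

    # client.run executes the function once on each worker:
    # client.run(_set_rmm_allocator)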
diff --git a/nvidia/cheminformatics/utils/logger.py b/nvidia/cheminformatics/utils/logger.py
index fac137cf..d931a96e 100644
--- a/nvidia/cheminformatics/utils/logger.py
+++ b/nvidia/cheminformatics/utils/logger.py
@@ -37,6 +37,7 @@ def initialize_logfile(benchmark_file=BENCHMARK_FILE):
         fh.write(f'# {config_message}\n')
         fh.write('date,benchmark_type,step,time(hh:mm:ss.ms),n_molecules,n_workers,metric_name,metric_value\n')
 
+
 class MetricsLogger(object):
 
     def __init__(self,
@@ -81,7 +82,16 @@ def __exit__(self, type, value, traceback):
                     metric_value=self.metric_value,
                     benchmark_file=context.benchmark_file)
 
-def log_results(date, benchmark_type, step, time, n_molecules, n_workers, metric_name='', metric_value='', benchmark_file=BENCHMARK_FILE):
+
+def log_results(date,
+                benchmark_type,
+                step,
+                time,
+                n_molecules,
+                n_workers,
+                metric_name='',
+                metric_value='',
+                benchmark_file=BENCHMARK_FILE):
     """Log benchmark results to a file"""
     out_list = [date, benchmark_type, step, time, n_molecules, n_workers, metric_name, metric_value]
diff --git a/nvidia/cheminformatics/utils/plot_benchmark_results.py b/nvidia/cheminformatics/utils/plot_benchmark_results.py
index 619054fb..caadd9fc 100644
--- a/nvidia/cheminformatics/utils/plot_benchmark_results.py
+++ b/nvidia/cheminformatics/utils/plot_benchmark_results.py
@@ -15,6 +15,7 @@
 # limitations under the License.
 
 import os
+import logging
 import argparse
 import pandas as pd
 import numpy as np
@@ -22,6 +23,10 @@
 import matplotlib
 import sys
 
+
+logger = logging.getLogger(__name__)
+
+
 # defaults to categorize steps for sorting
 STEP_TYPE_DICT = {'dim_reduction': ['pca', 'svd'],
                   'clustering': ['kmeans'],
@@ -55,6 +60,7 @@ def parse_args():
 def prepare_benchmark_df(benchmark_file, step_type_dict=STEP_TYPE_DICT, step_type_cat=STEP_TYPE_CAT):
     """Read and prepare the benchmark data as a dataframe"""
 
+    logger.info('Processing %s...', benchmark_file)
     # Load and format data
     with open(benchmark_file, 'r') as fh:
         machine_config = pd.Series({'Machine Config': fh.readlines()[0].replace('#', '').strip()})
@@ -133,7 +139,7 @@ def prepare_acceleration_stacked_plot(df, machine_config, output_path, palette=N
 
     if (n_rows, n_cols) == (2, 1):
         n_rows, n_cols = n_cols, n_rows
-    
+
     fig, axList = plt.subplots(nrows=n_rows, ncols=n_cols)
     fig.set_size_inches(6 * n_cols, 6 * n_rows)
@@ -152,10 +158,10 @@ def prepare_acceleration_stacked_plot(df, machine_config, output_path, palette=N
     for ax, (n_molecules, dat) in zip(axList, df_plot.iterrows()):
         dat.plot(kind='bar', ax=ax, color=palette, width=bar_width)
-    
+
         bars = [rect for rect in ax.get_children() if isinstance(rect, matplotlib.patches.Rectangle)]
         indexes = [tuple([n_molecules] + list(x)) for x in dat.index.to_list()]
-    
+
         # Assemble index and label bars
         for bar, index in zip(bars, indexes):
             total = df.loc[index, ('stats', 'total')]
@@ -168,7 +174,7 @@ def prepare_acceleration_stacked_plot(df, machine_config, output_path, palette=N
             xpos = bar.get_x() + (bar.get_width() / 2.0)
             ax.text(xpos, ypos, label, horizontalalignment='center', verticalalignment='bottom')
-    
+
     xticklabels = [f'{x[1]} CPU cores' if x[0] == 'CPU' else f'{x[1]} GPU(s)' for x in dat.index.to_list()]
     ax.set_xticklabels(xticklabels, rotation=25)
     ax.set(title=f'{n_molecules:,} Molecules', xlabel='')
@@ -187,6 +193,6 @@
     args = parse_args()
 
     # Read and prepare the dataframe then plot
-    bench_df, machine_config = prepare_benchmark_df(benchmark_file=args.benchmark_file, step_type_dict=STEP_TYPE_DICT, 
+    bench_df, machine_config = prepare_benchmark_df(benchmark_file=args.benchmark_file, step_type_dict=STEP_TYPE_DICT,
                                                     step_type_cat=STEP_TYPE_CAT)
     prepare_acceleration_stacked_plot(bench_df, machine_config, output_path=args.output_path)
diff --git a/nvidia/cheminformatics/wf/cluster/__init__.py b/nvidia/cheminformatics/wf/cluster/__init__.py
index ef3a7f45..c59d1ee8 100644
--- a/nvidia/cheminformatics/wf/cluster/__init__.py
+++ b/nvidia/cheminformatics/wf/cluster/__init__.py
@@ -1,5 +1,51 @@
+import cupy
+import numpy
+from typing import List
+
+from cuml.metrics import pairwise_distances
+
+from nvidia.cheminformatics.data.helper.chembldata import ADDITIONAL_FEILD, IMP_PROPS
+from nvidia.cheminformatics.utils.distance import tanimoto_calculate
+from nvidia.cheminformatics.utils.metrics import batched_silhouette_scores, spearmanr
+
+
 class BaseClusterWorkflow:
 
+    def _remove_ui_columns(self, embedding):
+        for col in ['x', 'y', 'cluster', 'filter_col', 'index', 'molregno']:
+            if col in embedding.columns:
+                embedding = embedding.drop([col], axis=1)
+
+        return embedding
+
+    def _remove_non_numerics(self, embedding):
+        embedding = self._remove_ui_columns(embedding)
+
+        other_props = ['id'] + IMP_PROPS + ADDITIONAL_FEILD
+        # Temporarily store columns not required during processing
+        prop_series = {}
+        for col in other_props:
+            if col in embedding.columns:
+                prop_series[col] = embedding[col]
+        if len(prop_series) > 0:
+            embedding = embedding.drop(other_props, axis=1)
+
+        return embedding, prop_series
+
+    def _compute_spearman_rho(self, embedding, X_train, Xt):
+        n_indexes = min(self.n_spearman, X_train.shape[0])
+        numpy.random.seed(self.seed)
+        indexes = numpy.random.choice(numpy.array(range(X_train.shape[0])),
+                                      size=n_indexes,
+                                      replace=False)
+        fp_sample = embedding.compute().values[indexes]
+        Xt_sample = Xt.compute().values[indexes]
+
+        dist_array_tani = tanimoto_calculate(fp_sample, calc_distance=True)
+        dist_array_eucl = pairwise_distances(Xt_sample)
+        return cupy.nanmean(spearmanr(dist_array_tani, dist_array_eucl, top_k=100))
+
     def is_gpu_enabled(self):
         return True
 
@@ -7,19 +53,28 @@ def cluster(self, embedding):
         """
         Runs clustering workflow on the data fetched from database/cache.
         """
-        pass
+        raise NotImplementedError
 
-    def recluster(self, new_df=None, new_molecules=None):
+    def recluster(self,
+                  filter_column=None,
+                  filter_values=None,
+                  n_clusters=None):
         """
         Runs reclustering on the original dataframe or on the new dataframe
         passed. The new dataframe is usually a subset of the original
         dataframe. The caller may ask to include additional molecules.
         """
-        pass
+        raise NotImplementedError
+
+    def add_molecules(self, chemblids:List):
+        """
+        Adds the given ChEMBL IDs to the existing dataset. Duplicates must be
+        ignored.
+        """
+        raise NotImplementedError
 
     def compute_qa_matric(self):
         """
         Collects all quality metrics and logs them.
         """
-        pass
+        raise NotImplementedError
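_compute_spearman_rho samples rows, then rank-correlates Tanimoto distances in fingerprint space against Euclidean distances in the projected space; the spearmanr helper (and its top_k argument) lives in utils.metrics, outside this patch. For intuition only, a CPU reference that row-wise correlates two distance matrices with scipy:

    import numpy
    from scipy.stats import spearmanr as scipy_spearmanr

    def mean_rowwise_spearman(dist_a, dist_b):
        # Rank-correlate matching rows of two square distance matrices
        # and average the coefficients.
        rhos = [scipy_spearmanr(dist_a[i], dist_b[i]).correlation
                for i in range(dist_a.shape[0])]
        return numpy.nanmean(rhos)

    a = numpy.random.rand(10, 10)
    b = a + 0.01 * numpy.random.rand(10, 10)  # nearly identical ordering
    print(mean_rowwise_spearman(a, b))        # close to 1.0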
diff --git a/nvidia/cheminformatics/wf/cluster/cpukmeansumap.py b/nvidia/cheminformatics/wf/cluster/cpukmeansumap.py
index 6ffb668b..dc2413ad 100644
--- a/nvidia/cheminformatics/wf/cluster/cpukmeansumap.py
+++ b/nvidia/cheminformatics/wf/cluster/cpukmeansumap.py
@@ -69,10 +69,9 @@ def _compute_spearman_rho(self, mol_df, X_train):
         return cupy.nanmean(spearmanr(dist_array_tani, dist_array_eucl, top_k=100))
 
     def cluster(self,
-                df_molecular_embedding=None,
-                cache_directory=None):
-
+                df_molecular_embedding=None):
         logger.info("Executing CPU workflow...")
+        cache_directory = Context().cache_directory
 
         if df_molecular_embedding is None:
             self.n_molecules = Context().n_molecule
diff --git a/nvidia/cheminformatics/wf/cluster/gpukmeansumap.py b/nvidia/cheminformatics/wf/cluster/gpukmeansumap.py
index 1150bece..966dc569 100644
--- a/nvidia/cheminformatics/wf/cluster/gpukmeansumap.py
+++ b/nvidia/cheminformatics/wf/cluster/gpukmeansumap.py
@@ -15,34 +15,32 @@
 # limitations under the License.
 
 import logging
-from nvidia.cheminformatics.data.helper.chembldata import ADDITIONAL_FEILD, IMP_PROPS
 
 from . import BaseClusterWorkflow
 from nvidia.cheminformatics.utils.singleton import Singleton
 from nvidia.cheminformatics.config import Context
-from nvidia.cheminformatics.utils.metrics import batched_silhouette_scores, spearman_rho
-from nvidia.cheminformatics.utils.distance import tanimoto_calculate
+from nvidia.cheminformatics.utils.metrics import batched_silhouette_scores
 from nvidia.cheminformatics.data import ClusterWfDAO
 from nvidia.cheminformatics.data.cluster_wf import ChemblClusterWfDao
 from nvidia.cheminformatics.utils.logger import MetricsLogger
 
-import numpy
-
 import cudf
-import cupy
 import dask
 import dask_cudf
 from functools import singledispatch
+from typing import List
 
 from cuml.manifold import UMAP as cuUMAP
 from cuml.dask.decomposition import PCA as cuDaskPCA
 from cuml.dask.cluster import KMeans as cuDaskKMeans
 from cuml.dask.manifold import UMAP as cuDaskUMAP
-from cuml.metrics import pairwise_distances
 
 logger = logging.getLogger(__name__)
 
+MIN_RECLUSTER_SIZE = 200
+
+
 @singledispatch
 def _gpu_cluster_wrapper(embedding, n_pca, self):
     return NotImplemented
@@ -62,7 +60,7 @@ def _(embedding, n_pca, self):
 @_gpu_cluster_wrapper.register(cudf.DataFrame)
 def _(embedding, n_pca, self):
     embedding = dask_cudf.from_cudf(embedding,
-                                    int(chunksize=embedding.shape * 0.1))
+                                    chunksize=int(embedding.shape[0] * 0.1))
     return self._cluster(embedding, n_pca)
 
@@ -78,25 +76,13 @@ def __init__(self,
         self.dao = dao
         self.n_molecules = n_molecules
         self.pca_comps = pca_comps
+        self.pca = None
         self.n_clusters = n_clusters
         self.df_embedding = None
 
         self.seed = seed
         self.n_spearman = 5000
 
-    def _compute_spearman_rho(self, embedding, X_train, Xt):
-        n_indexes = min(self.n_spearman, X_train.shape[0])
-        numpy.random.seed(self.seed)
-        indexes = numpy.random.choice(numpy.array(range(X_train.shape[0])),
-                                      size=n_indexes,
-                                      replace=False)
-        fp_sample = cupy.fromDlpack(embedding.compute().to_dlpack())[indexes]
-        Xt_sample = cupy.fromDlpack(Xt.compute().to_dlpack())[indexes]
-
-        dist_array_tani = tanimoto_calculate(fp_sample, calc_distance=True)
-        dist_array_eucl = pairwise_distances(Xt_sample)
-        return cupy.nanmean(spearmanr(dist_array_tani, dist_array_eucl, top_k=100))
-
     def _cluster(self, embedding, n_pca):
         """
         Generates UMAP transformation on Kmeans labels generated from
@@ -105,30 +91,24 @@ def _cluster(self, embedding, n_pca):
 
         dask_client = Context().dask_client
         embedding = embedding.reset_index()
+        n_molecules = embedding.compute().shape[0]
 
         # Before reclustering remove all columns that may interfere
-        ids = embedding['id']
-        for col in ['x', 'y', 'cluster', 'id', 'filter_col', 'index', 'molregno']:
-            if col in embedding.columns:
-                embedding = embedding.drop([col], axis=1)
-
-        other_props = IMP_PROPS + ADDITIONAL_FEILD
-        # Tempraryly store columns not required during processesing
-        prop_series = {}
-        for col in other_props:
-            if col in embedding.columns:
-                prop_series[col] = embedding[col]
-        if len(prop_series) > 0:
-            embedding = embedding.drop(other_props, axis=1)
-
-        if n_pca:
-            with MetricsLogger('pca', self.n_molecules) as ml:
-                pca = cuDaskPCA(client=dask_client, n_components=n_pca)
-                embedding = pca.fit_transform(embedding)
-
-        with MetricsLogger('kmeans', self.n_molecules) as ml:
+        embedding, prop_series = self._remove_non_numerics(embedding)
+
+        if n_pca and embedding.shape[1] > n_pca:
+            with MetricsLogger('pca', n_molecules) as ml:
+                if self.pca is None:
+                    self.pca = cuDaskPCA(client=dask_client, n_components=n_pca)
+                    self.pca.fit(embedding)
+                embedding = self.pca.transform(embedding)
+
+        with MetricsLogger('kmeans', n_molecules) as ml:
+            if n_molecules < MIN_RECLUSTER_SIZE:
+                raise Exception('Reclustering less than %d molecules is not supported.' % MIN_RECLUSTER_SIZE)
+
             kmeans_cuml = cuDaskKMeans(client=dask_client,
-                                       n_clusters=self.n_clusters)
+                                       n_clusters=self.n_clusters)
             kmeans_cuml.fit(embedding)
             kmeans_labels = kmeans_cuml.predict(embedding)
 
@@ -137,7 +117,7 @@ def _cluster(self, embedding, n_pca):
             ml.metric_func_args = (embedding, kmeans_labels)
             ml.metric_func_kwargs = {'on_gpu': True, 'seed': self.seed}
 
-        with MetricsLogger('umap', self.n_molecules) as ml:
+        with MetricsLogger('umap', n_molecules) as ml:
             X_train = embedding.compute()
 
             local_model = cuUMAP()
@@ -160,10 +140,9 @@ def _cluster(self, embedding, n_pca):
         embedding['cluster'] = kmeans_labels
         embedding['x'] = Xt[0]
         embedding['y'] = Xt[1]
-        embedding['id'] = ids
 
         # Add back the prop columns
-        for col in other_props:
+        for col in prop_series.keys():
             embedding[col] = prop_series[col]
 
         return embedding
@@ -175,24 +154,21 @@ def cluster(self, df_mol_embedding=None):
 
         if df_mol_embedding is None:
             self.n_molecules = Context().n_molecule
 
-            cache_directory = Context().cache_directory
-
             df_mol_embedding = self.dao.fetch_molecular_embedding(
                 self.n_molecules,
-                cache_directory=cache_directory)
+                cache_directory=Context().cache_directory)
 
             df_mol_embedding = df_mol_embedding.persist()
-            self.n_molecules = df_mol_embedding.compute().shape[0]
 
         self.df_embedding = _gpu_cluster_wrapper(df_mol_embedding,
                                                  self.pca_comps,
                                                  self)
         return self.df_embedding
 
-    def re_cluster(self, filter_column, filter_values,
-                   new_figerprints=None,
-                   new_chembl_ids=None,
+    def recluster(self,
+                  filter_column=None,
+                  filter_values=None,
                   n_clusters=None):
 
         df_embedding = self.df_embedding
         if filter_values is not None:
@@ -207,3 +183,36 @@ def re_cluster(self, filter_column, filter_values,
 
         self.df_embedding = _gpu_cluster_wrapper(df_embedding, None, self)
         return self.df_embedding
+
+    def add_molecules(self, chemblids:List):
+
+        chem_mol_map = {row[0]: row[1] for row in self.dao.fetch_id_from_chembl(chemblids)}
+        molregnos = list(chem_mol_map.keys())
+
+        self.df_embedding['id_exists'] = self.df_embedding['id'].isin(molregnos)
+
+        ldf = self.df_embedding.query('id_exists == True')
+        if hasattr(ldf, 'compute'):
+            ldf = ldf.compute()
+
+        self.df_embedding = self.df_embedding.drop(['id_exists'], axis=1)
+        missing_mol = set(molregnos).difference(ldf['id'].to_array())
+
+        chem_mol_map = {id: chem_mol_map[id] for id in missing_mol}
+
+        missing_molregno = chem_mol_map.keys()
+        if self.pca and len(missing_molregno) > 0:
+            new_fingerprints = self.dao.fetch_molecular_embedding_by_id(missing_molregno)
+            new_fingerprints, prop_series = self._remove_non_numerics(new_fingerprints)
+            new_fingerprints = self.pca.transform(new_fingerprints)
+
+            # Add back the prop columns
+            for col in prop_series.keys():
+                new_fingerprints[col] = prop_series[col]
+
+            self.df_embedding = self._remove_ui_columns(self.df_embedding)
+
+            # TODO: Should we maintain the original PCA result for use here?
+            self.df_embedding = self.df_embedding.append(new_fingerprints)
+
+        return chem_mol_map, molregnos, self.df_embedding
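With the DAO additions above, the new add_molecules flow can be driven as in tests/test_workflow.py further below; an abbreviated usage sketch (a Context with a dask client must already be initialized, as the tests do via _create_context):

    from nvidia.cheminformatics.wf.cluster.gpukmeansumap import GpuKmeansUmap

    wf = GpuKmeansUmap(n_molecules=1000, pca_comps=64)
    wf.cluster()  # initial clustering; fits PCA once and caches it in wf.pca

    # Returns the {molregno: chembl_id} map of molecules that were missing,
    # all molregnos, and the embedding with the new fingerprints appended.
    chem_mol_map, molregnos, df_embedding = wf.add_molecules(['CHEMBL25'])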
diff --git a/nvidia/cheminformatics/wf/cluster/gpurandomprojection.py b/nvidia/cheminformatics/wf/cluster/gpurandomprojection.py
new file mode 100644
index 00000000..630e199b
--- /dev/null
+++ b/nvidia/cheminformatics/wf/cluster/gpurandomprojection.py
@@ -0,0 +1,185 @@
+#!/opt/conda/envs/rapids/bin/python3
+#
+# Copyright (c) 2020, NVIDIA CORPORATION.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
+from functools import singledispatch
+from nvidia.cheminformatics.utils.singleton import Singleton
+from nvidia.cheminformatics.wf.cluster import BaseClusterWorkflow
+from typing import List
+
+import cupy
+import cudf
+import dask
+import pandas
+import dask_cudf
+
+from cuml import SparseRandomProjection, KMeans
+
+from nvidia.cheminformatics.utils.logger import MetricsLogger
+from nvidia.cheminformatics.data import ClusterWfDAO
+from nvidia.cheminformatics.data.cluster_wf import ChemblClusterWfDao
+from nvidia.cheminformatics.config import Context
+from nvidia.cheminformatics.utils.metrics import batched_silhouette_scores
+
+
+logger = logging.getLogger(__name__)
+
+
+@singledispatch
+def _gpu_random_proj_wrapper(embedding, self):
+    return NotImplemented
+
+
+@_gpu_random_proj_wrapper.register(dask.dataframe.core.DataFrame)
+def _(embedding, self):
+    logger.info('Converting from dask.dataframe.core.DataFrame...')
+    embedding = embedding.compute()
+    return _gpu_random_proj_wrapper(embedding, self)
+
+
+@_gpu_random_proj_wrapper.register(dask_cudf.core.DataFrame)
+def _(embedding, self):
+    logger.info('Converting from dask_cudf.core.DataFrame...')
+    embedding = embedding.compute()
+    return _gpu_random_proj_wrapper(embedding, self)
+
+
+@_gpu_random_proj_wrapper.register(pandas.DataFrame)
+def _(embedding, self):
+    logger.info('Converting from pandas.DataFrame...')
+    embedding = cudf.from_pandas(embedding)
+    return self._cluster(embedding)
+
+
+@_gpu_random_proj_wrapper.register(cudf.DataFrame)
+def _(embedding, self):
+    return self._cluster(embedding)
+
+
+class GpuWorkflowRandomProjection(BaseClusterWorkflow, metaclass=Singleton):
+
+    def __init__(self,
+                 n_molecules: int = None,
+                 dao: ClusterWfDAO = ChemblClusterWfDao(),
+                 n_clusters=7,
+                 seed=0):
+
+        self.dao = dao
+        self.n_molecules = n_molecules
+        self.n_clusters = n_clusters
+        self.pca = None
+        self.seed = seed
+        self.srp_embedding = SparseRandomProjection(n_components=2)
+
+    def rand_jitter(self, arr):
+        stdev = .023 * cupy.subtract(cupy.max(arr), cupy.min(arr))
+        for i in range(arr.shape[1]):
+            rnd = cupy.multiply(cupy.random.randn(len(arr)), stdev)
+            arr[:, i] = cupy.add(arr[:, i], rnd)
+
+        return arr
+
+    def _cluster(self, embedding):
+        logger.info('Computing cluster...')
+        embedding = embedding.reset_index()
+        n_molecules = embedding.shape[0]
+
+        # Before reclustering remove all columns that may interfere
+        embedding, prop_series = self._remove_non_numerics(embedding)
+
+        with MetricsLogger('random_proj', n_molecules) as ml:
+            srp = self.srp_embedding.fit_transform(embedding.values)
+
+            ml.metric_name = 'spearman_rho'
+            ml.metric_func = self._compute_spearman_rho
+            ml.metric_func_args = (embedding, embedding, srp)
+
+        with MetricsLogger('kmeans', n_molecules) as ml:
+            kmeans_cuml = KMeans(n_clusters=self.n_clusters)
+            kmeans_cuml.fit(srp)
+            kmeans_labels = kmeans_cuml.predict(srp)
+
+            ml.metric_name = 'silhouette_score'
+            ml.metric_func = batched_silhouette_scores
+            ml.metric_func_args = (srp, kmeans_labels)
+            ml.metric_func_kwargs = {'on_gpu': True, 'seed': self.seed}
+
+        # Add back the columns required for plotting and to correlate data
+        # between re-clusterings
+        srp = self.rand_jitter(srp)
+        embedding['cluster'] = kmeans_labels
+        embedding['x'] = srp[:, 0]
+        embedding['y'] = srp[:, 1]
+
+        # Add back the prop columns
+        for col in prop_series.keys():
+            embedding[col] = prop_series[col]
+
+        return embedding
+
+    def cluster(self, df_mol_embedding=None):
+        logger.info("Executing GPU workflow...")
+
+        if df_mol_embedding is None:
+            self.n_molecules = Context().n_molecule
+
+            df_mol_embedding = self.dao.fetch_molecular_embedding(
+                self.n_molecules,
+                cache_directory=Context().cache_directory)
+            df_mol_embedding = df_mol_embedding.persist()
+
+        self.df_embedding = _gpu_random_proj_wrapper(df_mol_embedding, self)
+        return self.df_embedding
+
+    def recluster(self,
+                  filter_column=None,
+                  filter_values=None,
+                  n_clusters=None):
+
+        if filter_values is not None:
+            self.df_embedding['filter_col'] = self.df_embedding[filter_column].isin(filter_values)
+            self.df_embedding = self.df_embedding.query('filter_col == True')
+
+        if n_clusters is not None:
+            self.n_clusters = n_clusters
+
+        self.df_embedding = _gpu_random_proj_wrapper(self.df_embedding, self)
+        return self.df_embedding
+
+    def add_molecules(self, chemblids:List):
+
+        chem_mol_map = {row[0]: row[1] for row in self.dao.fetch_id_from_chembl(chemblids)}
+        molregnos = list(chem_mol_map.keys())
+
+        self.df_embedding['id_exists'] = self.df_embedding['id'].isin(molregnos)
+
+        ldf = self.df_embedding.query('id_exists == True')
+        if hasattr(ldf, 'compute'):
+            ldf = ldf.compute()
+
+        self.df_embedding = self.df_embedding.drop(['id_exists'], axis=1)
+        missing_mol = set(molregnos).difference(ldf['id'].to_array())
+
+        chem_mol_map = {id: chem_mol_map[id] for id in missing_mol}
+        missing_molregno = chem_mol_map.keys()
+        if len(missing_molregno) > 0:
+            new_fingerprints = self.dao.fetch_molecular_embedding_by_id(missing_molregno)
+            new_fingerprints = new_fingerprints.compute()
+
+            self.df_embedding = self._remove_ui_columns(self.df_embedding)
+            self.df_embedding = self.df_embedding.append(new_fingerprints)
+
+        return chem_mol_map, molregnos, self.df_embedding
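This workflow swaps PCA+UMAP for a single cuML SparseRandomProjection to two components, plus jitter for plotting. A minimal standalone use of the estimator on mock data (assuming cuML 0.17 as pinned below in the conda environment):

    import cupy
    from cuml import SparseRandomProjection

    X = cupy.random.rand(1000, 512, dtype=cupy.float32)  # mock fingerprints
    srp = SparseRandomProjection(n_components=2)
    X2 = srp.fit_transform(X)  # 2-D projection used as plot coordinates
    print(X2.shape)            # (1000, 2)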
diff --git a/setup/cuchem_rapids_0.17.yml b/setup/cuchem_rapids_0.17.yml
index cbc17c32..ede00e11 100644
--- a/setup/cuchem_rapids_0.17.yml
+++ b/setup/cuchem_rapids_0.17.yml
@@ -12,6 +12,8 @@ dependencies:
   - cudatoolkit=11.0 # Ensure version matches with base image in Dockerfile
   - rdkit==2020.09.1
   - plotly==4.9.0
+  - pytest==6.2.2
+  - umap-learn==0.5.1
   - pip
   - pip:
     - tensorflow-gpu==1.15.5
@@ -30,5 +32,4 @@ dependencies:
     - chembl_webresource_client==0.10.2
     - dask_ml==1.8.0
     - matplotlib==3.3.4
-    - umap
     - git+https://github.com/jrwnter/cddd.git
\ No newline at end of file
diff --git a/tests/test_benchmark.py b/tests/test_benchmark.py
index f09dbb09..83b12558 100644
--- a/tests/test_benchmark.py
+++ b/tests/test_benchmark.py
@@ -15,82 +15,92 @@
 # limitations under the License.
 
 import pytest
-import unittest
 import tempfile
 import os
 import shutil
-import sys
-import shlex
 import glob
+import logging
+from pydoc import locate
 
 import pandas as pd
 
-# Define paths for the tests
-_this_directory = os.path.dirname(os.path.realpath(__file__))
-_parent_directory = os.path.dirname(_this_directory)
-_data_dir = os.path.join(_this_directory, 'data')
+from tests.utils import _create_context
+from nvidia.cheminformatics.utils.logger import initialize_logfile
+from nvidia.cheminformatics.utils.plot_benchmark_results \
+    import prepare_benchmark_df, prepare_acceleration_stacked_plot
 
-sys.path.insert(0, _parent_directory) # TODO better way to add this directory to the path
-from startdash import Launcher
-from nvidia.cheminformatics.utils.plot_benchmark_results import prepare_benchmark_df, prepare_acceleration_stacked_plot
 
-# Output directory
-temp_dir = tempfile.mkdtemp()
+logger = logging.getLogger(__name__)
 
-# Parameter lists
-run_benchmark_params = [ ([{'test_type': 'gpu', 'n_workers': 1, 'n_mol': -1},
-                           {'test_type': 'cpu', 'n_workers': 19, 'n_mol': -1}], _data_dir, temp_dir) ]
-load_benchmark_params = [(temp_dir)]
 
+# Parameter lists
+run_benchmark_params = [([{'test_type': 'nvidia.cheminformatics.wf.cluster.gpukmeansumap.GpuKmeansUmap',
+                           'use_gpu': True,
+                           'n_workers': 1,
+                           'n_mol': 5000},
+                          {'test_type': 'nvidia.cheminformatics.wf.cluster.cpukmeansumap.CpuKmeansUmap',
+                           'use_gpu': False,
+                           'n_workers': 10,
+                           'n_mol': 5000}])]
 
-@pytest.mark.parametrize('benchmark_config_list, data_dir, output_dir', run_benchmark_params)
-def test_run_benchmark(benchmark_config_list, data_dir, output_dir):
 
+@pytest.mark.parametrize('benchmark_config_list', run_benchmark_params)
+def test_run_benchmark(benchmark_config_list):
+
+    output_dir = tempfile.gettempdir()
     output_file = os.path.join(output_dir, 'benchmark.csv')
-    if os.path.exists(output_file):
-        os.remove(output_file)
+    initialize_logfile(output_file)
 
+    max_n_mol = 0
     for config in benchmark_config_list:
         test_type = config['test_type']
+        use_gpu = config['use_gpu']
         n_workers = config['n_workers']
         n_mol = config['n_mol']
-
-        # Create run command and inject into sys.argv
-        command = f'startdash.py analyze -b --cache {data_dir} '
-        if test_type == 'cpu':
-            command += f'--{test_type} '
-        command += f'--n_{test_type} {n_workers} --n_mol {n_mol} --output_dir {output_dir}'
+        max_n_mol = max(max_n_mol, n_mol)
+
+        context = _create_context(use_gpu=use_gpu,
+                                  n_workers=n_workers,
+                                  benchmark_file=output_file)
+        context.n_molecule = n_mol
+        context.cache_directory = None
+        context.is_benchmark = True
+
+        wf_class = locate(test_type)
+        workflow = wf_class()
 
-        sys_argv = shlex.split(command)
-        with unittest.mock.patch('sys.argv', sys_argv):
-            Launcher()
+        workflow.cluster()
+        workflow.compute_qa_matric()
+
+        context.dask_client.cluster.close()
+        context.dask_client.close()
+        context.dask_client = None
     # Filename is set in workflow -- move to create randomized name
-    temp_file = tempfile.NamedTemporaryFile(prefix='benchmark_', suffix='.csv', dir=output_dir, delete=False).name
+    temp_file = tempfile.NamedTemporaryFile(prefix='benchmark_',
+                                            suffix='.csv',
+                                            dir=output_dir,
+                                            delete=False).name
     shutil.move(output_file, temp_file)
     assert os.path.exists(temp_file)
 
     benchmark_results = pd.read_csv(temp_file, comment='#')
+    logger.info(benchmark_results)
+
     nrows, ncols = benchmark_results.shape
-    assert nrows == len(benchmark_config_list) * 5
     assert ncols == 8
+    assert nrows >= len(benchmark_config_list)
     assert benchmark_results['n_molecules'].min() > 0
-
-
-@pytest.mark.parametrize('output_dir', load_benchmark_params)
-def test_load_benchmarks(output_dir):
-
-    csv_path = os.path.join(output_dir, 'benchmark_*.csv')
-    for benchmark_file in glob.glob(csv_path):
-        df, machine_config = prepare_benchmark_df(benchmark_file)
-        basename = os.path.splitext(benchmark_file)[0]
-        excel_file = basename + '.xlsx'
-        assert os.path.exists(excel_file)
-        md_file = basename + '.md'
-        assert os.path.exists(md_file)
-
-        png_file = basename + '.png'
-        prepare_acceleration_stacked_plot(df, machine_config, output_path=png_file)
-        assert os.path.exists(png_file)
+    assert benchmark_results['n_molecules'].min() <= max_n_mol
+
+    df, machine_config = prepare_benchmark_df(temp_file)
+    basename = os.path.splitext(temp_file)[0]
+    excel_file = basename + '.xlsx'
+    assert os.path.exists(excel_file)
+    md_file = basename + '.md'
+    assert os.path.exists(md_file)
+
+    png_file = basename + '.png'
+    prepare_acceleration_stacked_plot(df, machine_config, output_path=png_file)
+    assert os.path.exists(png_file)
 
 # TODO add test for metrics and values
\ No newline at end of file
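The rewritten benchmark test resolves workflow classes from their dotted-path strings with pydoc.locate, which imports and returns the named object, or None when the path cannot be resolved:

    from pydoc import locate

    wf_class = locate('nvidia.cheminformatics.wf.cluster.gpukmeansumap.GpuKmeansUmap')
    if wf_class is None:
        raise ImportError('workflow class not found')
    workflow = wf_class()  # instantiated exactly as test_run_benchmark does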
+ """ + n_molecules, dao, mol_df = _fetch_chembl_test_dataset() - return context + context = _create_context() + wf = GpuWorkflowRandomProjection(n_molecules=n_molecules, + dao=dao) + wf.cluster(df_mol_embedding=mol_df) def test_gpukmeansumap_dask(): """ Verify fetching data from chemblDB when the input is a pandas df. """ - n_molecules = 1000 - - dao = ChemblClusterWfDao() - mol_df = dao.fetch_molecular_embedding(n_molecules=n_molecules) - assert isinstance(mol_df, dask.dataframe.core.DataFrame),\ - 'Incorrect data structure from DAO' + n_molecules, dao, mol_df = _fetch_chembl_test_dataset() context = _create_context() - with context.dask_client: - wf = GpuKmeansUmap(n_molecules=n_molecules, - dao=dao, pca_comps=64) - wf.cluster(df_mol_embedding=mol_df) + wf = GpuKmeansUmap(n_molecules=n_molecules, + dao=dao, pca_comps=64) + wf.cluster(df_mol_embedding=mol_df) def test_gpukmeansumap_cudf(): """ Verify fetching data from chemblDB when the input is a cudf df. """ - n_molecules = 1000 + context = _create_context() - dao = ChemblClusterWfDao() - mol_df = dao.fetch_molecular_embedding(n_molecules=n_molecules) - assert isinstance(mol_df, dask.dataframe.core.DataFrame),\ - 'Incorrect data structure from DAO' + n_molecules, dao, mol_df = _fetch_chembl_test_dataset() + wf = GpuKmeansUmap(n_molecules=n_molecules, + dao=dao, pca_comps=64) + mol_df = mol_df.compute() + wf.cluster(df_mol_embedding=mol_df) + +def test_add_molecule(): + """ + Verify fetching data from chemblDB when the input is a cudf df. + """ context = _create_context() - with context.dask_client: - wf = GpuKmeansUmap(n_molecules=n_molecules, - dao=dao, pca_comps=64) + n_molecules, dao, mol_df = _fetch_chembl_test_dataset() + + if hasattr(mol_df, 'compute'): mol_df = mol_df.compute() - wf.cluster(df_mol_embedding=mol_df) + mol_df = cudf.from_pandas(mol_df) + n_molecules = mol_df.shape[0] + + # test mol should container aviable and new molecules + test_mol = mol_df[n_molecules - 20:] + mols_tobe_added = test_mol['id'].to_array().tolist() + + chData = ChEmblData() + logger.info('Fetching ChEMBLLE id for %s', mols_tobe_added) + mols_tobe_added = [str(row[0]) for row in chData.fetch_chemblId_by_molregno(mols_tobe_added)] + logger.info('ChEMBL ids to be added %s', mols_tobe_added) + + # Molecules to be used for clustering + mol_df = mol_df[:n_molecules - 10] + + wf = GpuKmeansUmap(n_molecules=n_molecules, dao=dao, pca_comps=64) + wf.cluster(df_mol_embedding=mol_df) + + missing_mols, molregnos, df_embedding = wf.add_molecules(mols_tobe_added) + assert len(missing_mols) == 10, 'Expected 10 missing molecules found %d' % len(missing_mols) + + # TODO: Once the issue with add_molecule in multi-gpu env. 
diff --git a/tests/test_workflow.py b/tests/test_workflow.py
index ca7a5cd9..976d5d93 100644
--- a/tests/test_workflow.py
+++ b/tests/test_workflow.py
@@ -1,61 +1,87 @@
-import os
-import dask
+import cudf
 import logging
-import tempfile
 
-from nvidia.cheminformatics.data.cluster_wf import ChemblClusterWfDao
+from tests.utils import _fetch_chembl_test_dataset, _create_context
+
 from nvidia.cheminformatics.wf.cluster.gpukmeansumap import GpuKmeansUmap
-from nvidia.cheminformatics.utils.dask import initialize_cluster
-from nvidia.cheminformatics.config import Context
+from nvidia.cheminformatics.wf.cluster.gpurandomprojection import GpuWorkflowRandomProjection
+from nvidia.cheminformatics.data.helper.chembldata import ChEmblData
 
 logger = logging.getLogger(__name__)
 
 
-def _create_context():
-    context = Context()
-    if context.dask_client is None:
-        context.dask_client = initialize_cluster()
-    context.is_benchmark = False
-    context.benchmark_file = os.path.join(tempfile.tempdir, 'benchmark.csv')
-    context.cache_directory=tempfile.tempdir
+def test_random_proj():
+    """
+    Verify the GPU random-projection workflow on a ChEMBL test sample.
+    """
+    n_molecules, dao, mol_df = _fetch_chembl_test_dataset()
 
-    return context
+    context = _create_context()
+    wf = GpuWorkflowRandomProjection(n_molecules=n_molecules,
+                                     dao=dao)
+    wf.cluster(df_mol_embedding=mol_df)
 
 
 def test_gpukmeansumap_dask():
     """
-    Verify fetching data from chemblDB when the input is a pandas df.
+    Verify fetching data from chemblDB when the input is a dask df.
     """
-    n_molecules = 1000
-
-    dao = ChemblClusterWfDao()
-    mol_df = dao.fetch_molecular_embedding(n_molecules=n_molecules)
-    assert isinstance(mol_df, dask.dataframe.core.DataFrame),\
-        'Incorrect data structure from DAO'
+    n_molecules, dao, mol_df = _fetch_chembl_test_dataset()
 
     context = _create_context()
-    with context.dask_client:
-        wf = GpuKmeansUmap(n_molecules=n_molecules,
-                           dao=dao, pca_comps=64)
-        wf.cluster(df_mol_embedding=mol_df)
+    wf = GpuKmeansUmap(n_molecules=n_molecules,
+                       dao=dao, pca_comps=64)
+    wf.cluster(df_mol_embedding=mol_df)
 
 
 def test_gpukmeansumap_cudf():
     """
     Verify fetching data from chemblDB when the input is a cudf df.
     """
-    n_molecules = 1000
+    context = _create_context()
 
-    dao = ChemblClusterWfDao()
-    mol_df = dao.fetch_molecular_embedding(n_molecules=n_molecules)
-    assert isinstance(mol_df, dask.dataframe.core.DataFrame),\
-        'Incorrect data structure from DAO'
+    n_molecules, dao, mol_df = _fetch_chembl_test_dataset()
 
-    context = _create_context()
-    with context.dask_client:
-        wf = GpuKmeansUmap(n_molecules=n_molecules,
-                           dao=dao, pca_comps=64)
+    wf = GpuKmeansUmap(n_molecules=n_molecules,
+                       dao=dao, pca_comps=64)
+    mol_df = mol_df.compute()
+    wf.cluster(df_mol_embedding=mol_df)
+
+
+def test_add_molecule():
+    """
+    Verify adding new molecules to an existing clustering.
+    """
+    context = _create_context()
+
+    n_molecules, dao, mol_df = _fetch_chembl_test_dataset()
+
+    if hasattr(mol_df, 'compute'):
         mol_df = mol_df.compute()
-        wf.cluster(df_mol_embedding=mol_df)
+    mol_df = cudf.from_pandas(mol_df)
+    n_molecules = mol_df.shape[0]
+
+    # The test set should contain both available and new molecules
+    test_mol = mol_df[n_molecules - 20:]
+    mols_tobe_added = test_mol['id'].to_array().tolist()
+
+    chData = ChEmblData()
+    logger.info('Fetching ChEMBL IDs for %s', mols_tobe_added)
+    mols_tobe_added = [str(row[0]) for row in chData.fetch_chemblId_by_molregno(mols_tobe_added)]
+    logger.info('ChEMBL ids to be added %s', mols_tobe_added)
+
+    # Molecules to be used for clustering
+    mol_df = mol_df[:n_molecules - 10]
+
+    wf = GpuKmeansUmap(n_molecules=n_molecules, dao=dao, pca_comps=64)
+    wf.cluster(df_mol_embedding=mol_df)
+
+    missing_mols, molregnos, df_embedding = wf.add_molecules(mols_tobe_added)
+    assert len(missing_mols) == 10, 'Expected 10 missing molecules found %d' % len(missing_mols)
+
+    # TODO: Once the issue with add_molecule in a multi-GPU env. is fixed, the
+    # number of missing_molregno found should be 0
+    missing_mols, molregnos, df_embedding = wf.add_molecules(mols_tobe_added)
+    assert len(missing_mols) == 0, 'Expected no missing molecules found %d' % len(missing_mols)
+    # assert len(missing_mols) == 10, 'Expected 10 missing molecules found %d' % len(missing_mols)
diff --git a/tests/utils.py b/tests/utils.py
new file mode 100644
index 00000000..69592755
--- /dev/null
+++ b/tests/utils.py
@@ -0,0 +1,45 @@
+import os
+import dask
+import tempfile
+import logging
+
+from nvidia.cheminformatics.data.cluster_wf import ChemblClusterWfDao
+from nvidia.cheminformatics.utils.dask import initialize_cluster
+from nvidia.cheminformatics.config import Context
+
+
+logger = logging.getLogger(__name__)
+
+
+def _fetch_chembl_test_dataset(n_molecules=None):
+    if n_molecules is None:
+        n_molecules = 1000
+
+    dao = ChemblClusterWfDao()
+    mol_df = dao.fetch_molecular_embedding(n_molecules=n_molecules)
+    assert isinstance(mol_df, dask.dataframe.core.DataFrame),\
+        'Incorrect data structure from DAO'
+
+    return n_molecules, dao, mol_df
+
+
+def _create_context(use_gpu=True,
+                    n_workers=-1,
+                    benchmark_file=None,
+                    cache_directory=None):
+    context = Context()
+    if context.dask_client is None:
+        context.dask_client = initialize_cluster(use_gpu=use_gpu,
+                                                 n_gpu=n_workers,
+                                                 n_cpu=n_workers)
+    context.is_benchmark = False
+
+    context.cache_directory = cache_directory
+    if cache_directory is None:
+        context.cache_directory = tempfile.gettempdir()
+
+    context.benchmark_file = benchmark_file
+    if benchmark_file is None:
+        context.benchmark_file = os.path.join(tempfile.gettempdir(), 'benchmark.csv')
+
+    return context