In [1]:
import sys
sys.path.append('/code')

from database.models import (Protein, Organism, Classification, Molecule, Activity, ActivityType, Source, Quality, CID)

from sqlalchemy import create_engine
from sqlalchemy.orm import scoped_session
from sqlalchemy.orm import sessionmaker
import os
import glob
from rdkit import Chem
from razi.rdkit_postgresql.functions import morganbv_fp

import pandas as pd
# import modin.pandas as pd
import numpy as np
import json

import multiprocessing
import gc
from tqdm import tqdm
from copy import copy
import numpy as np


def get_db_session():
    engine = create_engine(
        'postgresql://postgres:postgres@papyrusdb/papyrus', convert_unicode=True,
        pool_recycle=3600, pool_size=10)
    db_session = scoped_session(sessionmaker(
        autocommit=False, autoflush=False, bind=engine))
    
    return db_session


def get_or_create(session, model, **kwargs):
    instance = session.query(model).filter_by(**kwargs).first()
    if instance:
#         created = False
        return instance
    else:
#         created = True
        instance = model(**kwargs)
        session.add(instance)
        session.commit()
        session.flush()
        session.refresh(instance)
        return instance
    
def get_or_instance(session, model, **kwargs):
    instance = session.query(model).filter_by(**kwargs).first()
    if instance:
        return instance
    else:
        instance = model(**kwargs)
        return instance
    
def sanitize_and_split(row, keyval, length, spl=';'):
    split = [v.rstrip() for v in str(row[keyval]).split(spl)]
    if len(split)!= length:
        split = [split[0] for i in range(0,length)]
    
    split = [None if x == '' else x for x in split]
    
    return split

def sanitize_and_split(row, length, spl=';'):
    split = [v.rstrip() for v in str(row).split(spl)]
    if len(split)!= length:
        split = [split[0] for i in range(0,length)]
    
    split = [None if x == '' else x for x in split]
    
    return split


class TypeDecoder(json.JSONDecoder):
    """Custom json decoder to support types as values."""

    def __init__(self, *args, **kwargs):
        """Simple json decoder handling types as values."""
        json.JSONDecoder.__init__(self, object_hook=self.object_hook, *args, **kwargs)

    def object_hook(self, obj):
        """Handle types."""
        if '__type__' not in obj:
            return obj
        module = obj['__type__']['module']
        type_ = obj['__type__']['type']
        if module == 'builtins':
            return getattr(__builtins__, type_)
        loaded_module = importlib.import_module(module)
        return getattr(loaded_module, type_)
    


dtype_file = '../.data/papyrus/05.5/data_types.json'
activity_data = '../.data/papyrus/05.5/05.5_combined_set_without_stereochemistry.tsv.xz'
protein_data = '../.data/papyrus/05.5/05.5_combined_set_protein_targets.tsv.xz'


In [2]:
with open(dtype_file, 'r') as jsonfile:
        dtypes = json.load(jsonfile, cls=TypeDecoder)['papyrus']

In [None]:
protein_df = pd.read_csv(protein_data, sep='\t', dtype=dtypes)

In [None]:
organisms = list(set(protein_df['Organism']))
classifications = []
for cstr in protein_df['Classification']:
    classifications.extend(str(cstr).split('->'))

classes = list(set(classifications))

In [None]:
db_session = get_db_session()

rows = []

for i, row in protein_df.iterrows():
    organism = get_or_create(session=db_session, model=Organism, organism=row['Organism'])
    classifications_list = str(row['Classification']).split('->')
    classifications = [get_or_create(session=db_session, model=Classification, classification=c) for c in classifications_list]
    
    review_mapping = {'reviewed':1, 'Unreviewed':0, 'unreviewed':0}
    
    prot = Protein(
        target_id = row['target_id'],
        HGNC_symbol = str(row['HGNC_symbol']),
        uniprot_id = row['UniProtID'],
        reviewed = review_mapping[row['Status']],
        organism = organism.id,
        length = row['Length'],
        sequence = row['Sequence'], 
        classifications = classifications
    )
    
    rows.append(prot)
    
db_session.add_all(rows)
db_session.commit()
db_session.remove()
    

In [3]:
def process_activity_frame(df):
#     df_obj = df.select_dtypes(['object'])
    db_session = get_db_session()
    rows = []
    
    activity_type_map = {
        '1000':'IC50',
        '0100':'EC50',
        '0010':'KD',
        '0001':'Ki',
        '0000':'other',
    }

    print('processing frame')
    
    # do this bit in parallel?
    for row in tqdm(df.itertuples()):
        sources_list = row.source.split(';')
        cids_list = row.CID.split(';')
        sources_cids_list = list(zip(sources_list, cids_list))
        cids = [      
            get_or_create(session=db_session, 
                          model=CID, 
                          cid=c[1], 
                          source=get_or_create(session=db_session, 
                                               model=Source, 
                                               source=c[0]).source) for c in sources_cids_list]

        mol = Chem.MolFromSmiles(row.SMILES)
        fp = morganbv_fp(row.SMILES)

        # change this to use InChI and/or SMILES
        smiles = Chem.CanonSmiles(row.SMILES)
        molecule = get_or_instance(session=db_session,model=Molecule,smiles=smiles,inchi=row.InChI)
#         molecule = get_or_instance(session=db_session,model=Molecule,mol=mol)
        
        if molecule.cids != cids:
            molecule.cids=cids
            molecule.smiles=smiles
            molecule.mol=mol
#             molecule.smiles=row.SMILES
            molecule.inchi_key=row.InChIKey
            molecule.inchi=row.InChI
            molecule.inchi_auxinfo=row.InChI_AuxInfo
            molecule.fp=fp
            molecule.connectivity=row.connectivity
            db_session.add(molecule)
            db_session.commit()
            db_session.flush()
            db_session.refresh(molecule)

        quality = get_or_create(session=db_session, model=Quality, quality=row.Quality).id
        target_id = get_or_create(session=db_session, model=Protein, target_id=row.target_id).target_id
        molecule_id = molecule.id

        slice_list = []
        if ';' in str(row.pchembl_value):

            pchembl_values = [v.rstrip() for v in row.pchembl_value.split(';')]
            length = len(pchembl_values)

            aids = sanitize_and_split(row=row.AID,length=length)        
            doc_ids = sanitize_and_split(row=row.all_doc_ids,length=length)
            years = sanitize_and_split(row=row.all_years,length=length)
            type_IC50s = sanitize_and_split(row=row.type_IC50,length=length)         
            type_EC50s = sanitize_and_split(row=row.type_EC50,length=length)
            type_KDs = sanitize_and_split(row=row.type_KD,length=length)
            type_Kis = sanitize_and_split(row=row.type_Ki,length=length)

            for j in range(0, len(pchembl_values)):
                update_dict = {
                    'pchembl_value': pchembl_values[j],
                    'AID': aids[j],
                    'doc_id': doc_ids[j],
                    'Year': years[j],
                    'type_IC50': type_IC50s[j],
                    'type_EC50': type_EC50s[j],
                    'type_KD': type_KDs[j],
                    'type_Ki': type_Kis[j]
                }
                row_copy = copy(row._asdict())

                row_copy.update(update_dict)

                slice_list.append(row_copy)

        else:
            slice_list.append(row._asdict())

        for s in slice_list:

            a = f"{s['type_IC50']}{s['type_EC50']}{s['type_KD']}{s['type_Ki']}"
            activity_type_str = activity_type_map[a]

            activity_type = get_or_create(session=db_session, model=ActivityType, type=activity_type_str).id

            try:
                y = int(s['Year'])
            except:
                y = None

            activity = Activity(
                papyrus_activity_id=s['Activity_ID'],
                quality=quality,
                target_id=target_id,
                molecule_id = molecule_id,
                accession=s['accession'],
                protein_type=s['Protein_Type'],
                aid = s['AID'],
                doc_id = s['doc_id'],
                year = y,
                type = activity_type, 
                relation = s['relation'],
                pchembl_value = s['pchembl_value'],
                pchembl_value_mean = s['pchembl_value_Mean'],
                pchembl_value_stdev = s['pchembl_value_StdDev'],
                pchembl_value_SEM = s['pchembl_value_SEM'],
                pchembl_value_n = s['pchembl_value_N'],
                pchembl_value_median = s['pchembl_value_Median'],
                pchembl_value_mad = s['pchembl_value_MAD'],   
            )

            rows.append(activity)
            
    print('processing complete')
            
    del(df)
    db_session.add_all(rows)
    print('committing activities')
    db_session.commit()
    db_session.close()
    db_session.remove()
    gc.collect()
            
    return True


In [None]:
reader = pd.read_csv(activity_data, sep='\t', compression='xz', chunksize = 10000, iterator=True, dtype=dtypes)

# out_dir = '/tmp'
# if not os.path.isdir(out_dir):
#     os.mkdir(out_dir)

# print('writing files')
# count = 0
# # for df in reader:
# #     df.to_csv(f'{out_dir}/activity-chunk_{str(count)}.csv', index=False)
# #     count += 1
    
# cores = 5    
# chunked = glob.glob(f'{out_dir}/*.csv')

# option 1: run each chunked file of 10000 rows separatley
# for fn in chunked:
#     df = pd.read_csv(fn, sep='\t')
#     process_activity_frame(df)
    
# def run(fn):
#     reader = pd.read_csv(fn, chunksize = 10000, iterator=True, dtype=dtypes)
#     for df in reader:
#         process_activity_frame(df)
    
# # option 2: run files in parallel
# print('running inserts')
# with multiprocessing.Pool(processes=cores) as p:
#         with tqdm(total=len(chunked)) as pbar:
#             for _ in p.imap_unordered(run, [fn for fn in chunked]):
#                 pbar.update()

pool = multiprocessing.Pool(5) # use 4 processes

for df in reader:
    # process each data frame
    pool.apply_async(process_activity_frame,[df])
    


  engine = create_engine(


processing frame


  instance = session.query(model).filter_by(**kwargs).first()
  instance = session.query(model).filter_by(**kwargs).first()
  instance = session.query(model).filter_by(**kwargs).first()
  instance = session.query(model).filter_by(**kwargs).first()
  engine = create_engine(


processing frame


  instance = session.query(model).filter_by(**kwargs).first()
  instance = session.query(model).filter_by(**kwargs).first()
  instance = session.query(model).filter_by(**kwargs).first()
  instance = session.query(model).filter_by(**kwargs).first()
  engine = create_engine(


processing frame


  instance = session.query(model).filter_by(**kwargs).first()
  instance = session.query(model).filter_by(**kwargs).first()
  instance = session.query(model).filter_by(**kwargs).first()
  instance = session.query(model).filter_by(**kwargs).first()
  engine = create_engine(


processing frame


  instance = session.query(model).filter_by(**kwargs).first()
  instance = session.query(model).filter_by(**kwargs).first()
  instance = session.query(model).filter_by(**kwargs).first()
  instance = session.query(model).filter_by(**kwargs).first()
  engine = create_engine(


processing frame


  instance = session.query(model).filter_by(**kwargs).first()
  instance = session.query(model).filter_by(**kwargs).first()
  instance = session.query(model).filter_by(**kwargs).first()
  instance = session.query(model).filter_by(**kwargs).first()
10000it [02:52, 57.86it/s]


processing complete


9705it [02:52, 53.87it/s]

committing activities


9824it [02:55, 39.58it/s]

processing frame


10000it [02:57, 56.45it/s]


processing complete


9656it [02:57, 38.64it/s]

committing activities


117it [00:02, 50.46it/s]]




10000it [02:57, 56.21it/s]


processing complete

123it [00:02, 46.91it/s]]

committing activities


10000it [02:59, 55.86it/s]


processing complete


9740it [02:59, 28.74it/s]

committing activities

159it [00:03, 27.78it/s]




183it [00:05, 17.71it/s]]

processing frame


275it [00:09, 35.29it/s]]

processing frame


10000it [03:08, 53.07it/s]


processing complete


94it [00:07,  6.06it/s]

committing activities


307it [00:28,  3.22it/s]

processing frame


63it [00:02, 33.30it/s]]

processing frame


4259it [03:39, 37.33it/s]