In [4]:
import re
import csv
import sqlite3
from pathlib import Path
import rdkit
from rdkit.Chem import Lipinski
from rdkit.Chem import Descriptors
from rdkit.Chem import Crippen
import pandas as pd

In [2]:
class DatabaseManager():
    def __init__(self, database_path: str):
        """
        Parameters
        ----------
        database_path: str
            Full path of existing SQLite database or one to be created automatically
        """

        # Store SQLite database path for reference
        self._database_path = database_path

        # Connect to and Create (if doesn't already exist) SQLite database
        self._conn = sqlite3.connect(database_path)

        # List of tables that need to be dropped to reset the SQLite database
        self._drop_order = ('assays', 'compounds',)


    def get_conn(self):
        """
        get_conn returns a connection to the SQLite database self._database_path

        Returns
        -------
            SQLite connection object
        """
        return self._conn
    
    def drop_all(self):
        """
        drop_all drops all tables created by this class to reset the SQLite database
        """

        # Get connection to SQLite database
        conn = self.get_conn()

        # Drop all tables in dependency order
        for table_name in self._drop_order:
            conn.execute('DROP TABLE IF EXISTS ' + table_name)


    def create(self):
        """
        create - creates all tables required by this class in the SQLite database
        """

        # Get connection to SQLite database
        conn = self.get_conn()

        # Create a table to store all COVID Moonshot assay data
        conn.execute('''
CREATE TABLE assays
(
    CID VARCHAR(20) PRIMARY KEY,
    r_avg_IC50 DECIMAL,
    f_avg_IC50 DECIMAL,
    trypsin_IC50 DECIMAL,
    acrylamide VARCHAR(5),
    chloroacetamide VARCHAR(5),
    series VARCHAR(30),
    frag_id VARCHAR(6),
    FOREIGN KEY(CID) REFERENCES compounds(CID) 
)
        ''')

        # Create a table to store all COVID Moonshot compound submissions
        conn.execute('''
CREATE TABLE compounds
(
    CID VARCHAR(20) PRIMARY KEY,
    smiles VARCHAR(250) not null,
    NumHDonors REAL,
    NumHAcceptors REAL,
    ExactMolWt REAL,
    MolLogP REAL
)
        ''')


    def populate_compounds_table(self, all_data_file: Path):
        """
        Populate the table compounds by reading out all of the unique submissions from $all_data_file and 
        adding features for Lipinski's rule of 5
        """

        compounds_list = []
        with open(all_data_file, mode = 'r') as csv_file:
            csv_reader = csv.DictReader(csv_file)
            for row in csv_reader:
                compound_id = row["CID"]
                smiles = row["SMILES"]
                mol = rdkit.Chem.MolFromSmiles(smiles)
                NumHDonors = Descriptors.NumHDonors(mol)
                NumHAcceptors = Descriptors.NumHAcceptors(mol)
                ExactMolWt = Descriptors.ExactMolWt(mol)
                MolLogP = Descriptors.MolLogP(mol)
                comp_tuple = (compound_id, smiles, NumHDonors,
                                NumHAcceptors, ExactMolWt,
                                MolLogP)
                if comp_tuple not in compounds_list:
                    compounds_list.append(comp_tuple)

        conn = self.get_conn()
        conn.executemany('''INSERT INTO compounds (CID, SMILES, NumHDonors,
                                NumHAcceptors, ExactMolWt,
                                MolLogP) VALUES(?,?,?,?,?,?)''', compounds_list)

    def populate_assays_table(self, all_data_file: Path):
        """
        Populate the table assays by reading out all of the unique submissions from $all_data_file
        """

        assays_list = []
        with open(all_data_file, mode = 'r') as csv_file:
            csv_reader = csv.DictReader(csv_file)
            for row in csv_reader:
                compound_id = row["CID"]
                r_avg_IC50 = row["r_avg_IC50"]
                f_avg_IC50 = row["f_avg_IC50"]
                trypsin_IC50 = row["trypsin_IC50"]
                acrylamide = row["acrylamide"]
                chloroacetamide = row["chloroacetamide"]
                series = row["series"]
                frag_id = row["frag_id"]
                assay_tuple = (compound_id,
                                r_avg_IC50,
                                f_avg_IC50,
                                trypsin_IC50,
                                acrylamide,
                                chloroacetamide,
                                series,
                                frag_id)
                if assay_tuple not in assays_list:
                    assays_list.append(assay_tuple)

        conn = self.get_conn()
        conn.executemany('INSERT INTO assays (CID, r_avg_IC50, f_avg_IC50,trypsin_IC50,acrylamide,chloroacetamide,series,frag_id) VALUES(?,?,?,?,?,?,?,?)', assays_list)

    def lipinski_compounds(self):
        """
        Extract all compounds compliant with the Lipinski rule of 5 from the DB
        """
        conn = self.get_conn()

        cur = conn.execute('''
            SELECT c.*,
            a.r_avg_IC50, a.f_avg_IC50, a.trypsin_IC50, a.acrylamide, a.chloroacetamide, a.series, a.frag_id
            FROM
            compounds c LEFT JOIN assays a
            ON
            c.CID = a.CID
            WHERE
            c.NumHDonors <= 5 AND c.NumHAcceptors <= 10 AND c.ExactMolWt < 500 AND c.MolLogP < 5
            
        ''')
        
        # Print the outputs to check query works as intended
        # print("Lipinski compounds:\n")
        # for row in cur:
        #     print(row)

        # Return Lipinski compounds as list
        lipinski_comps = pd.DataFrame(cur.fetchall(), columns=['CID', 'smiles','NumHDonors','NumHAcceptors',
                                        'ExactMolWt', 'MolLogP', 'r_avg_IC50', 'f_avg_IC50', 'trypsin_IC50',
                                        'acrylamide', 'chloroacetamide', 'series', 'frag_id'])


    def print_table_tops(self):
        """
        REMOVE LATER - Print the first 15 rows of both tables created to test they have been created correctly - REMOVE LATER
        """
        conn = self.get_conn()
        cur_comps = conn.execute('''
            SELECT * FROM compounds LIMIT 15
        ''')

        for row in cur_comps:
            print(row)

        print("\n")

        cur_assays = conn.execute('''
            SELECT * FROM assays LIMIT 15
        ''')

        for row in cur_assays:
            print(row)


In [3]:
if __name__ == '__main__':
    all_data_file = Path('activity_data.csv')

    manager = DatabaseManager(database_path='activity_data.db')
    manager.drop_all()
    manager.create()
    manager.populate_compounds_table(all_data_file=all_data_file)
    manager.populate_assays_table(all_data_file=all_data_file)

    # print_table_tops shows first 15 rows of each table - can be used to check tables are generated correctly
    # manager.print_table_tops()

    lipinski_comps = manager.lipinski_compounds()

    manager.get_conn().commit()