# SQL to MongoDB ETL pipeline: Extract, Transform, Load

## Import packages 

In [1]:
import psycopg2 as pg
import pandas as pd
import pandas.io.sql as pd_sql
import pandas as pd
import pickle
import re
import json
from pymongo import MongoClient
import sys
import numpy as np

## Create a login parameter file 

* Create an account on [AACT](https://aact.ctti-clinicaltrials.org/users/sign_up) to get access to the clinical trials database
* Create a login.txt file with the database and login credentials formatted as below, adding in your specific username and password:

`{"host": "aact-db.ctti-clinicaltrials.org", "user": "username", "password": "password", "dbname": "aact", "port": 5432}`

* Save the login.txt file in the same directory as the python scripts/notebooks

## Create functions for the SQL to MongoDB ETL pipeline

### Cleaning functions: creating a dictionary (document) from a SQL record

* The SQL cursor returns each record as an unlabeled tuple, rather than a dataframe with labeled columns like a standard query would return. Each field therefore has to be read by its position in the tuple, which follows the column order in the [AACT database schema](https://aact.ctti-clinicaltrials.org/static/documentation/aact_schema.png)
* A different cleaning function will be specified for each SQL table queried from AACT
* Cleaning functions will be referenced later in the pipeline function

#### Eligibilities table cleaning function

In [2]:
def clean_eligibility_record(record):
    """Build a MongoDB document from one eligibility row read from a SQL cursor.

    Parameters
    ----------
    record : sequence
        One row as returned by the cursor. Fields are read by position:
        ``record[1]`` study id, ``record[3]`` gender, ``record[4]`` minimum
        age, ``record[5]`` maximum age, ``record[8]`` eligibility criteria
        text. The last column (``record[-1]``) is stored under ``'keyword'``.

    Returns
    -------
    dict
        Document with the keys ``study_id``, ``minimum_age``, ``maximum_age``,
        ``gender``, ``keyword``, ``inclusion_criteria`` and
        ``exclusion_criteria``. The two criteria values are lists of the
        dash-bulleted items found in each section (possibly empty), or
        ``None`` when the text does not contain exactly one 'Inclusion' and
        exactly one 'Exclusion'.

    Raises
    ------
    IndexError
        If ``record`` has fewer than nine columns.
    """

    document = {}
    document['study_id'] = record[1]
    document['minimum_age'] = record[4]
    document['maximum_age'] = record[5]
    document['gender'] = record[3]
    document['keyword'] = record[-1]

    # A NULL criteria column arrives as None; treat it as empty text so the
    # record falls through to the "skipped" branch instead of raising
    # AttributeError on .replace().
    eligibility = record[8] or ''
    # Re-join criteria that were wrapped onto an indented continuation line.
    eligibility = eligibility.replace('\n             ', ' ')

    # Focus on records with 1 set of inclusion criteria and 1 set of exclusion criteria
    # Raw string so that \s reaches the regex engine without an invalid-escape warning.
    regex = r'-\s\s(.+)\n\n'
    if eligibility.count('Inclusion') == 1 and eligibility.count('Exclusion') == 1:
        # Exactly one 'Exclusion' guarantees the split yields two parts.
        inclusion, exclusion = eligibility.split('Exclusion')
        clean_inclusion = re.findall(regex, inclusion)
        clean_exclusion = re.findall(regex, exclusion)
        # Currently allows empty list values to be passed to dictionary
        # Many records have numbered lists of criteria, so regex fails
        # These empty values are caught in the sql_to_mongo pipeline
        document['inclusion_criteria'] = clean_inclusion
        document['exclusion_criteria'] = clean_exclusion
    else:
        document['inclusion_criteria'] = None
        document['exclusion_criteria'] = None
        print(f"Skipped study {document['study_id']}: Inclusion/Exclusion set != 1\n")
    return document

###  Add document to MongoDB function

In [3]:
def send_to_mongodb(document, client_connection):
    """Insert one document through an already opened MongoDB collection handle.

    Parameters
    ----------
    document : dict
        The document to store.
    client_connection : object
        Handle pointing at the target location; only its ``insert_one``
        method is used.

    Raises
    ------
    ValueError
        If the insert result reports that the write was not acknowledged.
    """
    acknowledged = client_connection.insert_one(document).acknowledged

    # Guard clause: an acknowledged write needs no further handling.
    if acknowledged:
        return

    raise ValueError("Failed to add document to MongoDB. Check connection and document.")

###  SQL query to MongoDB pipeline function

In [4]:
def sql_to_mongo(query, cleaning_func, login, database, collection, check_size=None):
    """Run a SQL query and load the cleaned rows into a MongoDB collection.

    Each row returned by the query is converted to a document with
    ``cleaning_func``. Documents with a truthy ``'inclusion_criteria'`` value
    are inserted; the others are reported as skipped.

    Parameters
    ----------
    query : str
        SQL query to execute.
    cleaning_func : callable
        Takes one cursor row and returns a dict that contains the keys
        ``'inclusion_criteria'`` and ``'study_id'``. Which function to use
        depends on the SQL table being queried.
    login : str
        Path to a JSON file holding the keyword arguments for the SQL
        connection.
    database : str
        Name of the MongoDB database.
    collection : str
        Name of the MongoDB collection.
    check_size : bool, optional
        When truthy, print the ``dbstats`` figures for ``database`` after
        loading.

    Raises
    ------
    ValueError
        Propagated from ``send_to_mongodb`` when an insert is not acknowledged.

    Notes
    -----
    Both connections are closed even if cleaning or inserting a row raises.
    """

    print("Connecting to SQL database...\n")
    # Context manager so the credentials file handle is not leaked.
    with open(login) as login_file:
        connection_args = json.load(login_file)
    connection = pg.connect(**connection_args)
    client = None

    try:
        cursor = connection.cursor()
        print("Executing query...\n")
        cursor.execute(query)

        print("Connecting to Mongo database...\n")
        client = MongoClient()
        db = client[database]
        mongo_loc = db[collection]

        print("Cleaning data and sending to Mongo...\n")
        for result in cursor:
            document = cleaning_func(result)
            # add extra line to check which cleaning function - do different data check depending on that
            # None and an empty list are both skipped here.
            if document['inclusion_criteria']:
                send_to_mongodb(document, mongo_loc)
            else:
                print(f"Skipped study {document['study_id']}: formatted differently\n")

        if check_size:
            size_data = db.command("dbstats")
            print("------ Database Stats ------\n",
                  "\nNumber of collections:", size_data['collections'],
                  "\nNumber of objects:", size_data['objects'],
                  "\nAverage object size:", size_data['avgObjSize'], 'bytes',
                  "\nData Size:", size_data['dataSize'], 'bytes',
                  "\nStorage Size:", size_data['storageSize'], 'bytes\n')
        print("Done.")
    finally:
        # Release both connections on success and on failure alike.
        connection.close()
        if client is not None:
            client.close()

## Query test for keywords 

In [22]:
# client = MongoClient()
# db = client['clinical_trials']
# mongo_loc = db['eligibilities']

# mongo_loc.update_many({}, {"$set" : {"keyword": None}})

<pymongo.results.UpdateResult at 0x10dfe3188>

In [30]:
import pymongo
pymongo.version

'3.6.1'

In [None]:
# Back-fill the 'keyword' field of the stored eligibility documents from the
# SQL keywords table.
query = "SELECT nct_id, downcase_name FROM keywords;"

# query = "SELECT * FROM keywords LIMIT 10"

# Read the SQL credentials with a context manager so the file handle is closed.
with open('login.txt') as login_file:
    connection_args = json.load(login_file)

connection = pg.connect(**connection_args)
client = None
try:
    cursor = connection.cursor()
    cursor.execute(query)

    client = MongoClient()
    db = client['clinical_trials']
    mongo_loc = db['eligibilities']
    i = 0

    for result in cursor:
        i += 1
        # Progress message every 5000 rows.
        if i % 5000 == 0:
            print('On study', i)
        nct_id = result[0]
        keyword = result[1]
        # NOTE(review): $set replaces the stored value, so if a study has
        # several keyword rows only the last one processed is kept - confirm
        # whether that is intended.
        mongo_loc.update_one({"study_id": nct_id}, {"$set": {"keyword": keyword}})
finally:
    # Close both connections even if the query or an update fails.
    connection.close()
    if client is not None:
        client.close()

In [46]:
result

('NCT02553811', 'diagnosis')

## Query test for recruitment status 

In [8]:
# Open the eligibilities collection through a default MongoClient() connection.
# client, db and mongo_loc stay defined for the cells that follow.
client = MongoClient()
db = client['clinical_trials']
mongo_loc = db['eligibilities']

# The empty filter matches every document, so each one gets the placeholder
# status 'none'.
mongo_loc.update_many({}, {"$set" : {"status": 'none'}})

<pymongo.results.UpdateResult at 0x10d012308>

In [9]:
# Copy each study's overall_status from the SQL studies table into the
# 'status' field of the matching MongoDB document. Relies on mongo_loc being
# defined by an earlier cell.
query = "SELECT nct_id, overall_status FROM studies;"

# query = "SELECT * FROM keywords LIMIT 10"

# Read the SQL credentials with a context manager so the file handle is closed.
with open('login.txt') as login_file:
    connection_args = json.load(login_file)

connection = pg.connect(**connection_args)
try:
    cursor = connection.cursor()
    cursor.execute(query)
    i = 0

    for status in cursor:
        i += 1
        # Progress message every 5000 rows.
        if i % 5000 == 0:
            print('On study', i)
        nct_id = status[0]
        study_status = status[1]
        mongo_loc.update_one({"study_id": nct_id}, {"$set": {"status": study_status}})
finally:
    # Close the SQL connection even if the query or an update fails.
    connection.close()

On study 5000
On study 10000
On study 15000
On study 20000
On study 25000
On study 30000
On study 35000
On study 40000
On study 45000
On study 50000
On study 55000
On study 60000
On study 65000
On study 70000
On study 75000
On study 80000
On study 85000
On study 90000
On study 95000
On study 100000
On study 105000
On study 110000
On study 115000
On study 120000
On study 125000
On study 130000
On study 135000
On study 140000
On study 145000
On study 150000
On study 155000
On study 160000
On study 165000
On study 170000
On study 175000
On study 180000
On study 185000
On study 190000
On study 195000
On study 200000
On study 205000
On study 210000
On study 215000
On study 220000
On study 225000
On study 230000
On study 235000
On study 240000
On study 245000
On study 250000
On study 255000
On study 260000
On study 265000
On study 270000
On study 275000
On study 280000


## Test a query 

In [9]:
# Check that joining eligibilities with keywords puts a keyword in the last
# column of each row.
query = "SELECT * FROM eligibilities JOIN keywords ON eligibilities.nct_id = keywords.nct_id LIMIT 3;"

# query = "SELECT * FROM keywords LIMIT 10"

# Read the SQL credentials with a context manager so the file handle is closed.
with open('login.txt') as login_file:
    connection_args = json.load(login_file)

connection = pg.connect(**connection_args)
try:
    cursor = connection.cursor()
    cursor.execute(query)

    for result in cursor:
        print(result[-1])
finally:
    # Close the SQL connection even if the query fails.
    connection.close()

social anxiety disorder
quetiapine xr
prevention


In [11]:
# Try the pipeline on the keywords table while reusing the eligibility cleaner.
# The recorded run of this cell ended in "IndexError: tuple index out of range".
query = "SELECT * FROM keywords LIMIT 3;"
sql_to_mongo(
    query=query,
    cleaning_func=clean_eligibility_record,
    login='login.txt',
    database='testdb',
    collection='keywords',
)

Connecting to SQL database...

Executing query...

Connecting to Mongo database...

Cleaning data and sending to Mongo...



IndexError: tuple index out of range

## Estimate average size of one document to determine whether to store the full set of 280k trials on local machine or AWS

In [372]:
# Load a 100-row sample into a scratch database and print its dbstats.
query = "SELECT * FROM eligibilities LIMIT 100;"
sql_to_mongo(
    query=query,
    cleaning_func=clean_eligibility_record,
    login='login.txt',
    database='file_size',
    collection='trials',
    check_size=True,
)

Connecting to SQL database...

Executing query...

Connecting to Mongo database...

Cleaning data and sending to Mongo...

Skipped study NCT00864357: formatted differently

Skipped study NCT00864890: formatted differently

Skipped study NCT00864877: formatted differently

Skipped study NCT00864825: formatted differently

Skipped study NCT00864747: formatted differently

Skipped study NCT00864604: formatted differently

Skipped study NCT00864552: Inclusion/Exclusion set != 1

Skipped study NCT00864552: formatted differently

Skipped study NCT00864526: formatted differently

Skipped study NCT00864487: Inclusion/Exclusion set != 1

Skipped study NCT00864487: formatted differently

Skipped study NCT00864435: formatted differently

Skipped study NCT00858208: formatted differently

Skipped study NCT00864331: Inclusion/Exclusion set != 1

Skipped study NCT00864331: formatted differently

Skipped study NCT00864318: formatted differently

Skipped study NCT00864279: formatted differently

Skippe

##  Get all eligibility data from the AACT clinical trials database

In [None]:
# Full load: every row of the eligibilities table goes through the pipeline.
query = "SELECT * FROM eligibilities;"
sql_to_mongo(
    query=query,
    cleaning_func=clean_eligibility_record,
    login='login.txt',
    database='clinical_trials',
    collection='eligibilities',
    check_size=True,
)

# Next steps 

## Improving the pipeline

* Open a branch to work on including studies currently skipped due to eligibility formatting.
   Roughly 30% of studies are currently excluded: 
> * some have numbered lists of criteria instead of bulleted lists, so they are not matched by the current regex for dashed bullets. As a result, document\['inclusion_criteria'\] is sometimes an empty (falsy) list and the record is not sent to MongoDB
> * some have multiple sets of inclusion and exclusion criteria for different cohorts, treatments, control vs. treatment, etc.
> * some have the word 'inclusion' or 'exclusion' used more than once within a single set of criteria
> * a small percent (<0.05%) have INCLUSION or EXCLUSION listed in caps instead of lower case (use a regex and then only split on the first instance of 'exclusion')
> * Some studies have exclusion criteria before inclusion criteria, so splitting on exclusion leads to no inclusion criteria  
* The snippet below counts 'inclusion' and 'exclusion' case-insensitively, but it does worse overall: in addition to the issues above, it excludes records that use a capitalized 'Inclusion' as the criteria heading and a lower-case 'inclusion' elsewhere in the text

```python   
    regex_crit = '-\s\s(.+)\n\n'
    regex_inc = 'inclusion'
    regex_exc = 'exclusion'
    
    num_inc_matches = len(re.findall(regex_inc, eligibility, re.IGNORECASE))
    num_exc_matches = len(re.findall(regex_exc, eligibility, re.IGNORECASE))

    # if 'inclusion' or 'exclusion' appears more than once (counted case-insensitively), the record gets dropped
    if num_inc_matches == 1 and num_exc_matches == 1:
        inclusion, exclusion = re.split('Exclusion', eligibility, flags=re.IGNORECASE)
        clean_inclusion = re.findall(regex_crit, inclusion)
        clean_exclusion = re.findall(regex_crit, exclusion)
        document['inclusion_criteria'] = clean_inclusion
        document['exclusion_criteria'] = clean_exclusion
    # looking at studies even if they don't have exclusion criteria
    elif num_inc_matches == 1 and num_exc_matches == 0:
        clean_inclusion = re.findall(regex_crit, eligibility)
        document['inclusion_criteria'] = clean_inclusion
        document['exclusion_criteria'] = None
    else:
        document['inclusion_criteria'] = None
        document['exclusion_criteria'] = None
        print(f"Skipped study {document['study_id']}\n")
    return document
```

* Create a list of skipped studies so they can be returned and printed for the user at the end if requested

Querying the conditions categorization for each study is challenging. The categories aren't clearly defined; I would need to do topic modeling or keyword analysis on the categories as well to group them

In [2]:
# Print a sample of rows from the conditions table.
query = "SELECT * FROM conditions LIMIT 20;"

# Read the SQL credentials with a context manager so the file handle is closed.
with open('login.txt') as login_file:
    connection_args = json.load(login_file)

connection = pg.connect(**connection_args)
try:
    cursor = connection.cursor()
    cursor.execute(query)
    for result in cursor:
        print(result)
finally:
    # Close the SQL connection even if the query fails.
    connection.close()

(2447850, 'NCT01071824', 'Colon Cancer', 'colon cancer')
(2274109, 'NCT02495987', 'ICP', 'icp')
(2614334, 'NCT03608072', 'Healthy', 'healthy')
(2340732, 'NCT01940796', 'GVHD', 'gvhd')
(2300873, 'NCT02266394', 'Renovascular Disease', 'renovascular disease')
(2300903, 'NCT02266199', 'Pain', 'pain')
(2274296, 'NCT02494492', 'Uveitis', 'uveitis')
(2267381, 'NCT02554201', 'Dysuria', 'dysuria')
(2274399, 'NCT02493647', 'HIV', 'hiv')
(2301080, 'NCT02264665', 'Pancreatic Neuroendocrine Tumor, Well Differentiated and Progressive', 'pancreatic neuroendocrine tumor, well differentiated and progressive')
(2336121, 'NCT01977521', 'Mood Disorders', 'mood disorders')
(2301533, 'NCT02260466', 'Heart Disease', 'heart disease')
(2301534, 'NCT02260466', 'Aging', 'aging')
(2166734, 'NCT03364608', 'Asthma', 'asthma')
(2268033, 'NCT02548624', 'Pressure Ulcer', 'pressure ulcer')
(2301880, 'NCT02257489', 'Musculoskeletal Diseases', 'musculoskeletal diseases')
(2275161, 'NCT02486562', 'Multiple Sclerosis', 'mu

## Error checking and production aspects to add 

* One of the skip messages is redundant - every study that prints 'Inclusion/Exclusion set != 1' also prints 'formatted differently'
* Unit testing: check the output of the functions
* add Parameters separated by ------ and Returns followed by ----- to docstrings
* add comments to explain difficult to interpret blocks
* return skip_list in cleaning function so the user can view all the skipped records at the end of the pipeline - maybe return document, skip_list