# SQL to MongoDB ETL pipeline: Extract, Transform, Load

## Import packages 

In [86]:
import psycopg2 as pg
import pandas as pd
import pandas.io.sql as pd_sql
import pandas as pd
import pickle
import re
import json
from pymongo import MongoClient
import sys

## Create a login parameter file 

* Create an account on [AACT](https://aact.ctti-clinicaltrials.org/users/sign_up) to get access to the clinical trials database
* Create a login.txt file with the database and login credentials formatted as below, adding in your specific username and password:

`{"host": "aact-db.ctti-clinicaltrials.org", "user": "username", "password": "password", "dbname": "aact", "port": 5432}`

* Save the login.txt file in the same directory as the python scripts/notebooks

## Create functions for the SQL to MongoDB ETL pipeline

### Cleaning functions: creating a dictionary (document) from a SQL record

* The SQL cursor returns each record as an unlabeled tuple, rather than a dataframe with labeled columns as a standard query would. Each field therefore has to be picked out by its position in the tuple, which is determined from the [AACT database schema](https://aact.ctti-clinicaltrials.org/static/documentation/aact_schema.png)
* A different cleaning function will be specified for each SQL table queried from AACT
* Cleaning functions will be referenced later in the pipeline function

#### Eligibilities table cleaning function

In [82]:
def clean_eligibility_record(record):

    """Convert one eligibilities row from a SQL cursor into a document.

    record is a positional tuple as returned by the SQL cursor; fields are
    read by index (1: study id, 3: gender, 4: minimum age, 5: maximum age,
    8: free-text eligibility criteria).

    The criteria text is split at the first 'Exclusion Criteria:' marker.
    Records without that marker are treated as inclusion-only, and a
    missing (None) criteria field yields two empty lists, so neither case
    aborts the pipeline.

    Returns a dictionary in document form to be sent to mongodb, with the
    keys study_id, minimum_age, maximum_age, gender, inclusion_criteria and
    exclusion_criteria (the last two are lists of strings)."""

    document = {}
    document['study_id'] = record[1]
    document['minimum_age'] = record[4]
    document['maximum_age'] = record[5]
    document['gender'] = record[3]

    # The criteria column may be NULL in SQL; fall back to an empty string
    # so the string operations below are always valid.
    eligibility = record[8] or ''

    # Re-join criteria that were wrapped onto an indented continuation line.
    eligibility = eligibility.replace('\n             ', ' ')

    # partition() always returns three parts, so records that lack the
    # 'Exclusion Criteria:' marker (or repeat it) no longer raise on
    # unpacking; everything after the first marker counts as exclusion text.
    inclusion, _, exclusion = eligibility.partition('Exclusion Criteria:')

    # Each criterion is a '-  text' bullet terminated by a blank line.
    bullet_pattern = r'-\s\s(.+)\n\n'
    document['inclusion_criteria'] = re.findall(bullet_pattern, inclusion)
    document['exclusion_criteria'] = re.findall(bullet_pattern, exclusion)
    return document

###  Add document to MongoDB function

In [83]:
def send_to_mongodb(document, client_connection):

    """Insert a single document through client_connection.

    document is the dictionary to store. client_connection is an object
    exposing insert_one(document), such as an already opened MongoDB
    collection. Raises ValueError when the insert result is not
    acknowledged; returns nothing otherwise."""

    acknowledged = client_connection.insert_one(document).acknowledged

    # Guard clause: an acknowledged write needs no further handling.
    if acknowledged:
        return

    raise ValueError("Failed to add document to MongoDB. Check connection and document.")

###  SQL query to MongoDB pipeline function

In [84]:
def sql_to_mongo(query, cleaning_func, login, database, collection):

    """SQL to MongoDB pipeline.

    Runs query against the SQL database, converts each returned record
    into a dictionary with cleaning_func, and inserts that dictionary into
    MongoDB one record at a time.

    query is a SQL query string. cleaning_func is the cleaning function to
    use; it is called with one cursor record and must return a document.
    login is the path to a JSON text file whose contents are passed as
    keyword arguments to the SQL connection. database and collection are
    strings naming the MongoDB location to write to.

    Any exception raised by the query, by cleaning_func or by the insert
    propagates to the caller; both the SQL connection and the MongoDB
    client are closed before it does."""

    # Close the login file promptly instead of leaving it to the
    # garbage collector.
    with open(login) as login_file:
        connection_args = json.load(login_file)

    connection = pg.connect(**connection_args)
    try:
        cursor = connection.cursor()
        cursor.execute(query)

        client = MongoClient()
        try:
            mongo_loc = client[database][collection]

            # Iterate the cursor directly so records are cleaned and
            # inserted one at a time.
            for result in cursor:
                document = cleaning_func(result)
                send_to_mongodb(document, mongo_loc)
        finally:
            # Release the MongoDB client even if a record fails to
            # clean or insert.
            client.close()
    finally:
        connection.close()

## Test a query 

In [85]:
# Smoke test: run three eligibilities rows through the pipeline and store
# the resulting documents in the 'trials' collection of 'testdb'.
query = "SELECT * FROM eligibilities LIMIT 3;"

sql_to_mongo(
    query=query,
    cleaning_func=clean_eligibility_record,
    login='login.txt',
    database='testdb',
    collection='trials',
)

## Estimate average size of one document to determine whether to store the full set of 280k trials on local machine or AWS

Below is broken: need to update cleaning function to deal with records that only have inclusion data. 

In [89]:
# query = "SELECT * FROM eligibilities LIMIT 100;"
# sql_to_mongo(query, clean_eligibility_record, 'login.txt', 'file_size', 'trials')

# pull 100 records - format as dictionaries, look at average size
# sys.getsizeof

## Additional error checking and production level aspects to add 

* Unit testing: check the output of the functions