# Functions for data pipline 

## Import Statements

In [None]:
import sqlalchemy
import numpy as np
import pandas as pd
import psycopg2
import seaborn as sns
import yaml

import matplotlib.pyplot as plt
%matplotlib inline

# Functions from Yoni (to be replaced with calls to util functions)

In [None]:
# List of functions

def open_config_file():
    #Read in config file
    with open("../conf/local/db.yaml") as f:
        conf=yaml.safe_load(f)
    return conf

def create_connection(conf):
    # set up sqlalchemy engine
    host = conf['host']
    DB = conf['DB']
    user = conf['user']
    pw = conf['pw']
    connection_string = "postgresql://{}:{}@{}/{}".format(user, pw, host, DB)
    conn = sqlalchemy.create_engine(connection_string)
    return conn

def get_outlier_thresholds(boxplot):
    outlier_min, outlier_max = [item.get_ydata()[0] for item in boxplot['caps']]
    return outlier_min, outlier_max

# Functions to merge

In [None]:
def get_sql_table(table_name, connection):
    '''
    Fetches a sql table from the postgres database
    :param table_name: name of postgres database to return
    :param connection: sql alchemy engine connection 
    :return: a pandas dataframe of the sql table
    '''
    query = ("""
    select * 
    from {};
    """).format(table_name)

    sql_table = pd.read_sql(query, connection)
    return sql_table 

In [None]:
def replace_all_in_column(df_column, what_to_replace, replace_with):
    '''
    replaces all dataframe column values with a standard value
    :param df_column: a one column dataframe
    :param what_to_replace: value you would like replaced eg. ''
    :param replace_with: value you would like to replace with eg. '1'
    :return replaced_df_column: a pandas dataframe column with replaced values
    '''
    replaced_df_column = df_column.replace(what_to_replace, replace_with)
    return replaced_df_column

In [None]:
def convert_column(df_column, type_to_convert_to):
    '''
    Converts a dataframe column to the specified type
    :param df_column: a one column dataframe
    :param type_to_convert_to: datatype, eg 'int', 'float64'
    :return converted_df_column: a converted pandas dataframe column
    '''
    
    #error handling required!!
    converted_df_column = df_column.astype(type_to_convert_to)
    return converted_df_column

In [None]:
def create_boxplot(df_column, boxplot_title):
    '''
    creates and displays a boxplot
    :param df_column: a one column dataframe
    :param boxplot_title: a string for the boxplot heading
    :return resulting_boxplot
    '''
    resulting_boxplot = plt.boxplot(df_column)
    plt.title(boxplot_title)
    plt.show() #might turn this off?
    return resulting_boxplot

In [None]:
def replace_outliers(df_column, outlier_min, outlier_max, replace_with):
    '''
    creates and displays a boxplot
    :param df_column: a one column dataframe
    :param outlier_min, outlier_max: figures derived from a boxplot
    :param replace_with: value you would like to replace with eg. '1'
    :return replaced_df_column: a pandas dataframe column with replaced values
    '''
    replaced_df_column= df_column.apply(lambda x: replace_with if x > outlier_max else x)
    replaced_df_column= replaced_df_column.apply(lambda x: replace_with if x < outlier_min else x)
    return replaced_df_column

In [None]:
def calc_bmi(weight_kg, height_cm):
    '''
    calculates BMI based on formula: [weight (kg) / height (cm) / height (cm)] x 10,000
    formula from https://www.cdc.gov/nccdphp/dnpao/growthcharts/training/bmiage/page5_1.html
    :param weight_kg: weight in kilos
    :param height_cm: height in centimenters
    :return BMI
    '''
    return ((weight_kg/height_cm/height_cm)*10000)

In [None]:
def create_BMI_column(df):
    '''
    creates a BMI column based on the results from the calc_bmi function
    :param df: dataframe where you would like the BMI column
    :return df: a dataframe with a BMI column
    '''
    # Create BMI Columns
    df['BMI'] = df.apply(lambda x: calc_bmi(x.patientweight, x.patientheight), axis=1)
    return df

In [None]:
def report_column_cleaning_required(df_name, df_column_name, desired_data_type):
    '''
    this function takes a numerical column and prints out the statistics demonstrating the amount of cleaning required
    
    #Doesn't report on outliers?
    
    
    :param df_name: a one column dataframe
    :param df_column_name:
    :param: desired data type
    :return nothing
    '''
    #Check how many are blank
    print('Number of cells in the {} column that are blank'.format(df_column_name))
    print(df_name[df_column_name][df_name[df_column_name]==''].count())
    print('Number of cells in the {} column that are np.nan'.format(df_column_name))
    print(df_name[df_column_name][df_name[df_column_name]==np.nan].count())
    
    #Check how many cells contain commas
    print('Number of cells in the {} column that contain comma'.format(df_column_name))
    print(df_name[df_column_name][df_name[df_column_name].str.contains(',', na=False)].count())
    
    #Doesn't report on outliers?

In [None]:
def clean_numerical_column(df_name, df_column_name, desired_data_type, replace_with):
    '''
    this function takes a numerical column returns a cleaned version with the following
    - blank and NaN cells replaced with specified replace_with value
    - cells containing commas replaced with decimals
    - remove outliers based on a boxplot
    
    Intended for use on the age, weight, height and BMI columns in the study summary table
    
    :param df_name: a one column dataframe
    :param df_column_name:
    :param: desired data type
    :param replace_with: value you would like to replace with eg. '1'
    :return: cleaned_column
    '''

    #Replace blanks in the column with 1
    df_name[df_column_name] = replace_all_in_column(df_name[df_column_name], '', replace_with)
    
    #Replace comma in the column with decimal points
    df_name[df_column_name] = df_name[df_column_name].str.replace(',','.')
    
    #Fill NA
    df_name[df_column_name] = df_name[df_column_name].fillna(replace_with)

    #convert column to desired data type
    df_name[df_column_name] = convert_column(df_name[df_column_name], desired_data_type) 
    # NEED ERROR HANDLING HERE (or in function?)!!!

    #Remove outliers
    boxplot = create_boxplot(df_name[df_column_name], 'Distribution of {} (pre-removal of outliers)'.format(df_column_name))
    outlier_min, outlier_max = get_outlier_thresholds(boxplot)
    df_name[df_column_name] = replace_outliers(df_name[df_column_name], outlier_min, outlier_max, replace_with)
    boxplot = create_boxplot(df_name[df_column_name], 'Distribution of {} (post-removal of outliers)'.format(df_column_name))
    
    return df_name[df_column_name]   

In [None]:
def convert_comma_sep_str_column_to_list_column(df_column):
    df_column = df_column.apply(lambda x: x.split(","))
    return df_column

In [None]:
def create_single_pathology_column(study_table, path_codes, path_name):
    #puts the code of interest in a new column
    study_table[path_name] = study_table.findingcode.apply(lambda x: intersection_two_lists(x, path_codes)) 
    #replace the codes with 0 or 1 (NOTE: could return higher than 1 error?  needs to be handled)
    study_table[path_name] = study_table[path_name].apply(lambda x : len(x))
    return study_table

In [None]:
def intersection_two_lists(lst1, lst2): 
#https://www.geeksforgeeks.org/python-intersection-of-multiple-lists/      
    return [item for item in lst1 if item in lst2] 

def deliminate_list_to_many_to_many_table(df, df_row_id_colum_name, df_list_colum_name):
    '''
    From a two column dataframe; containing one an row_id and another containing a list, produce a new 
    table with a row for every item of the list, linked to the row_id
    :param df: a dataframe containing columns labelled with the 
    :param df_row_id_colum_name:
    :param df_list_colum_name:
    :return many_to_many_table
    '''
    many_to_many_table = df[[df_row_id_colum_name, df_list_colum_name]].copy()
    temp = many_to_many_table.apply(lambda x: pd.Series(x[df_list_colum_name]),axis=1).stack().reset_index(level=1, drop=True)
    temp.name = ('{}_deliminated'.format(df_list_colum_name))
    many_to_many_table = many_to_many_table.drop(df_list_colum_name, axis=1).join(temp)
    many_to_many_table = many_to_many_table.reset_index(drop=True)
    print(many_to_many_table.head())
    
    return many_to_many_table

In [None]:
# Pathology codes variables - how do i merge these?
HC_T_codes = {'LV-0144', 'LV-0068'}
HC_F_codes = {'LV-0062', 'LV-0065', 'LV-0061'}
HC_C_codes = {'LV-0069', 'LV-0070'}

RLVEF_T_codes = {'LV-0080'}
RLVEF_F_codes = {'LV-0061', 'LV-0077', 'LV-0078'}

DLA_T_codes = {'LA-0016'}
DLA_F_codes = {'LA-0013'}

Norm_codes = {'SU-0032'}

# Script to run functions above

In [None]:
sql_table_name ='DM_Spain_VIEW_study_summary'

In [None]:
# Open config file and create connections
configuration = open_config_file()
connection = create_connection(configuration);

In [None]:
# Get summary table from sql database
summary_table = get_sql_table(sql_table_name, connection)
print('Got Summary Table')
#print(sum_table.head(2))

In [None]:
#Make a version of the summary table for cleaned values
summary_table_cleaned = summary_table.copy()

In [None]:
#Clean gender column: USAL confirmed that all blank gender values can be coded as 'U' for "Unsure"
print(summary_table['gender'].value_counts())
summary_table_cleaned['gender'] = replace_all_in_column(summary_table['gender'], '', 'U')
print(summary_table_cleaned['gender'].value_counts())
print("Gender column clean")

In [None]:
report_column_cleaning_required(summary_table_cleaned, 'age', 'int')
report_column_cleaning_required(summary_table_cleaned, 'patientweight', 'float')
report_column_cleaning_required(summary_table_cleaned, 'patientheight', 'float')

In [None]:
clean_numerical_column(summary_table_cleaned, 'age', 'int', 1)
clean_numerical_column(summary_table_cleaned, 'patientweight', 'float', 1)
clean_numerical_column(summary_table_cleaned, 'patientheight', 'float', 1)

In [None]:
summary_table_cleaned = create_BMI_column(summary_table_cleaned)
print(summary_table_cleaned.head(2))

In [None]:
# convert finding codes from string to a list
summary_table_cleaned['findingcode'] = convert_comma_sep_str_column_to_list_column(summary_table_cleaned['findingcode'])
summary_table_cleaned.head(1)

In [None]:
# Create the pathology columns
summary_table_cleaned = create_single_pathology_column(summary_table_cleaned, HC_T_codes, "HC_T")
summary_table_cleaned = create_single_pathology_column(summary_table_cleaned, HC_F_codes, "HC_F")
summary_table_cleaned = create_single_pathology_column(summary_table_cleaned, HC_C_codes, "HC_C")
summary_table_cleaned = create_single_pathology_column(summary_table_cleaned, RLVEF_T_codes, "RLVEF_T")
summary_table_cleaned = create_single_pathology_column(summary_table_cleaned, RLVEF_F_codes, "RLVEF_F")
summary_table_cleaned = create_single_pathology_column(summary_table_cleaned, DLA_T_codes, "DLA_T")
summary_table_cleaned = create_single_pathology_column(summary_table_cleaned, DLA_F_codes, "DLA_F")
summary_table_cleaned = create_single_pathology_column(summary_table_cleaned, Norm_codes, "Norm")
summary_table_cleaned.head()

In [None]:
#Write table to csv
summary_table_cleaned.to_csv('clean_summary_table.csv')