### Imports

In [None]:
import pandas as pd
import numpy as np
from pandas import json_normalize
from functools import partial
from geopy.geocoders import Nominatim
from utils.mongo_conn import MongoConnect
from IPython.core.display import display
import logging
import datetime as dt
import ipywidgets as widgets
from utils.postgre_conn import PostgresConnect
import warnings
warnings.filterwarnings('ignore')

In [None]:
# logging.basicConfig opens the log file immediately and raises
# FileNotFoundError when the target directory is missing, so create it first.
import os
os.makedirs('./logs', exist_ok=True)
# Records at INFO level and above are written to the file below.
logging.basicConfig(filename='./logs/drugs_errors.log', level=logging.INFO)

### Connecting to Mongo

In [None]:
# All three source collections are read from the same Mongo database.
MONGO_DB_NAME = 'team_G_db_mongo_final_1'

# Recall / enforcement reports.
recall_mongo_conn = MongoConnect(MONGO_DB_NAME, 'drugs_recalled')
recall_collection = recall_mongo_conn.connect()

# Adverse drug events.
drugs_event_conn = MongoConnect(MONGO_DB_NAME, 'drugs_event')
event_collection = drugs_event_conn.connect()

# Product labelling.
label_mongo_conn = MongoConnect(MONGO_DB_NAME, 'Drugs_Product_Label')
drug_product_labelling = label_mongo_conn.connect()

### Drugs Enforcement

In [None]:
# Status line; later cells overwrite its text as the pipeline advances.
status_label = widgets.Label('Starting Data Cleaning... Working on Recalls Enforced Data!')
display(status_label)

# Progress bar with 21 steps; the cells below bump `progress_bar.value`.
progress_bar = widgets.IntProgress(min=0, max=21, bar_style='info')
progress_row = widgets.HBox([widgets.Label('Progress:'), progress_bar])
display(progress_row)

# Load every recall document and flatten it into a DataFrame.
recall_documents = recall_collection.find()
drugs_recalled_df = json_normalize(recall_documents)


#### Replacing Blanks and dropping Nulls

In [None]:
# Turn empty strings into NaN so that null-based checks treat them as missing.
drugs_recalled_df = drugs_recalled_df.replace("", np.nan)
def drop_cols(df, null_percentage):
    '''Drop, in place, the columns of df whose share of nulls exceeds a threshold.

    A column is removed when its percentage of null values is strictly greater
    than null_percentage. The 'brand_name' column is never dropped.

    INPUTS:
        df = The dataframe from which columns need to be dropped (modified in place)
        null_percentage = A numerical figure provided by the user which acts as the
                          threshold for the null percentage (0-100)
    OUTPUT
        True if the operation completed, False if an exception occurred
        (the exception is written to the log).
    '''
    try:
        total_rows = len(df)
        if total_rows == 0:
            # No rows means no null percentage to compute; keep every column
            # instead of dividing by zero.
            return True
        # Null percentage for every column in one pass.
        null_share = df.isnull().sum() / total_rows * 100
        columns_to_drop = [
            column
            for column, share in null_share.items()
            if share > null_percentage and column != 'brand_name'
        ]
        # One drop call instead of one in-place drop per column.
        df.drop(columns=columns_to_drop, inplace=True)
        return True
    except Exception:
        logging.error("Exception occurred at drop_cols", exc_info=True)
        return False
# Remove recall columns that are more than 85% null ('brand_name' is always kept).
# NOTE(review): the True/False result is ignored here; a failure only shows up in the log.
drop_cols(drugs_recalled_df, 85)
progress_bar.value+=1

#### Getting values from lists in columns

In [None]:
# The openfda.* columns hold unstructured data in the form of lists. Each value
# is rendered as text and the list punctuation (quotes and square brackets) is
# stripped, leaving a plain string.
_LIST_PUNCTUATION = str.maketrans('', '', "'[]")


def _list_text(raw):
    """Return raw without quotes/brackets; the 'nan' placeholder becomes None."""
    if raw == 'nan':
        return None
    return raw.translate(_LIST_PUNCTUATION)


try:
    col_list = [name for name in drugs_recalled_df.columns if 'openfda' in name]

    for column in col_list:
        # Keep only the part after the first dot as the new column name.
        col = column.split('.')[1]
        as_text = drugs_recalled_df[column].astype('str')
        drugs_recalled_df[col] = as_text.apply(_list_text)
        drugs_recalled_df.drop(columns=column, inplace=True)

    # Dropping redundant columns
    drugs_recalled_df.drop(columns=['_id', 'product_type'], inplace=True)
except Exception as e:
    logging.error("Exception occurred at list to data convetrsion", exc_info=True)
    
    

In [None]:
# Missing country / city / address_1 values are back-filled with the values of
# the first 'Pfizer Inc.' record.
# NOTE(review): this assumes every such gap belongs to Pfizer - confirm against the data.
pfizer_rows = drugs_recalled_df[drugs_recalled_df['recalling_firm'] == 'Pfizer Inc.']
if pfizer_rows.empty:
    # Taking element [0] of an empty selection would raise IndexError; log and
    # leave the gaps untouched instead.
    logging.error("No 'Pfizer Inc.' rows found; country/city/address_1 left unfilled")
else:
    reference_row = pfizer_rows.iloc[0]
    for column in ('country', 'city', 'address_1'):
        fill_value = reference_row[column]
        if pd.isna(fill_value):
            # Nothing usable to fill with for this column.
            continue
        # Assign the result back rather than calling fillna(inplace=True) on a
        # column selection: the chained in-place form does not update the
        # frame when pandas Copy-on-Write is active.
        drugs_recalled_df[column] = drugs_recalled_df[column].fillna(fill_value)

progress_bar.value+=1

#### Converting Datetime Columns

In [None]:
try:
    # Select columns whose name contains 'date', explicitly excluding
    # 'voluntary_mandated' (it contains the substring "date" inside "mandated").
    # This replaces a positional [:-1] slice that only worked while that
    # column happened to be the last match.
    date_cols = [
        col for col in drugs_recalled_df.columns
        if 'date' in col and col != 'voluntary_mandated'
    ]
    for column in date_cols:
        # Dates are parsed as compact YYYYMMDD values.
        drugs_recalled_df[column] = pd.to_datetime(drugs_recalled_df[column], format = '%Y%m%d')
except Exception:
    logging.error("Exception occurred at conversion to datetime!", exc_info=True)
    
progress_bar.value+=1

#### Getting Labels from reason_for_recall

In [None]:
# Split the free-text reason on ':', ';' or '.' once and reuse the pieces.
reason_parts = drugs_recalled_df['reason_for_recall'].str.split('[:;.]')

# First piece, upper-cased, serves as the label.
drugs_recalled_df['reason_main'] = reason_parts.str.get(0).str.upper()

# Remaining pieces are concatenated as the description. Rows with a null
# reason are not lists, so they yield None instead of raising TypeError.
# NOTE(review): joining with "" discards the ':', ';' and '.' separators from
# the description - confirm this is intended.
drugs_recalled_df['reason_description'] = reason_parts.apply(
    lambda parts: "".join(parts[1:]) if isinstance(parts, list) else None
)
drugs_recalled_df.drop(columns = ['reason_for_recall'], inplace=True)
progress_bar.value+=1

#### Filling blank states with Actual State

In [None]:
# The state column contains nulls, but those records still hold a city.
# get_state() sends that value to the Nominatim geocoder (via geopy) and pulls
# the state out of the address text that comes back.
# NOTE(review): Nominatim's usage policy asks for an application-specific
# user_agent and a low request rate - confirm "Google" and the unthrottled
# calls are acceptable.

geolocator = Nominatim(user_agent="Google")
def get_state(x):
    '''Return the state-level component of the geocoded address for x.

    INPUTS:
        x = The text to geocode (the caller passes a city name)
    OUTPUT
        The second-to-last comma-separated component of the returned address,
        or the third-to-last one when that component is purely numeric.
        None when the geocoder finds nothing or the address has too few
        components.
    '''
    # NOTE(review): language="es" asks the geocoder for results in that
    # language - confirm this is intended.
    geocode = partial(geolocator.geocode, language="es")
    loc = geocode(x)
    if loc is None:
        return None

    # loc[0] is the address text; split it into trimmed components.
    parts = [part.strip() for part in loc[0].split(',')]
    if len(parts) < 2:
        # Too short to contain a state component; avoid IndexError.
        return None

    state = parts[-2]
    if state.isnumeric():
        # Presumably a postal code sits in that slot; step one component back.
        state = parts[-3] if len(parts) >= 3 else None
    return state
# Distinct cities of the rows whose state is missing. Null cities are skipped
# because there is nothing to geocode.
missing_state = drugs_recalled_df['state'].isnull()
city_list = drugs_recalled_df.loc[missing_state, 'city'].dropna().unique()
state_dict={}

for city in city_list:
    try:
        state_dict[city] = get_state(city)
    except Exception:
        # A failed lookup for one city must not stop the remaining cities.
        logging.error("Exception occurred while finding states!", exc_info=True)
        continue

    if state_dict[city] is None:
        # The geocoder found nothing; leave the state empty.
        continue

    # Fill only this city's rows that still have no state. The assignment runs
    # once per city, not once per city for every city resolved so far.
    drugs_recalled_df.loc[
        (drugs_recalled_df['city']==city) & (drugs_recalled_df['state'].isnull()),
        'state',
    ] = state_dict[city]
    
progress_bar.value+=1

#### Creating Time_delta column

In [None]:
# Duration of each recall: termination date minus initiation date.
recall_end = drugs_recalled_df['termination_date']
recall_start = drugs_recalled_df['recall_initiation_date']
drugs_recalled_df['time_diff'] = recall_end - recall_start

# Upper-case the voluntary/mandated text.
upper_cased = drugs_recalled_df['voluntary_mandated'].str.upper()
drugs_recalled_df['voluntary_mandated'] = upper_cased
progress_bar.value+=1

### Drug Events

In [None]:
status_label.value = 'Cleaning Drug Events data... Kindly wait!'
df=pd.DataFrame(event_collection.find())

# Normalizing data to retrieve qualification and reporter country, nested inside primarysource.
df2 = pd.json_normalize(df["primarysource"])
df = pd.concat([df, df2.reindex(df.index)], axis=1)

# Normalizing to retrieve the patient's age and sex from patient.
df3 = pd.json_normalize(df["patient"])
# Keep the first three flattened patient columns as before, and additionally
# request 'patientonsetage' and 'patientsex' by name: both are read by name
# later, and column position depends on the key order of the documents.
# reindex() creates an all-NaN column for a requested name that is absent.
patient_columns = list(dict.fromkeys([*df3.columns[:3], 'patientonsetage', 'patientsex']))
df = pd.concat([df, df3.reindex(index=df.index, columns=patient_columns)], axis=1)

progress_bar.value+=1

In [None]:
# Pull the columns needed for the events table out of the flattened frame.
# (The empty-list initialisations that used to precede these assignments were
# dead stores: every name was rebound immediately.)
companynumb = df["companynumb"]
safety_report_id = df["safetyreportid"]
serious = df["serious"]
qualification = df["qualification"]
country = df["reportercountry"]
seriousness_disabling = df["seriousnessdisabling"]
seriousness_other = df["seriousnessother"]
seriousness_hospitalization = df["seriousnesshospitalization"]
seriousness_lifethreatening = df["seriousnesslifethreatening"]
seriousness_congenitalanomali = df["seriousnesscongenitalanomali"]
patient_age = df["patientonsetage"]
patient_sex = df["patientsex"]
# The event date comes from 'receivedate'. The previous code first assigned
# 'receiptdate' and then overwrote it with this value, so only this
# assignment ever took effect.
drug_event_date = df["receivedate"]

In [None]:
# Output column name -> source Series, in the order the table should have.
event_columns = {
    "safety_report_id": safety_report_id,
    "drug_event_date": drug_event_date,
    "qualification": qualification,
    "country": country,
    "companynumb": companynumb,
    "patient_age": patient_age,
    "patient_sex": patient_sex,
    "serious": serious,
    "seriousness_disabling": seriousness_disabling,
    "seriousness_other": seriousness_other,
    "seriousness_hospitalization": seriousness_hospitalization,
    "seriousness_lifethreatening": seriousness_lifethreatening,
    "seriousness_congenitalanomali": seriousness_congenitalanomali,
}
all_event_rows = pd.DataFrame(event_columns)

# Identical rows are collapsed into one.
drug_event_details = all_event_rows.drop_duplicates()

progress_bar.value+=1

In [None]:
# Retrieve the medicinal product names reported in each record, together with
# the record's safety report id and event date so every drug can be traced back.
drug=[]
safety_drug_id=[]
drug_event_date_merge=[]
# Walk the three columns in lockstep instead of indexing with df.iloc[i],
# which builds a whole row object on every access.
# NOTE(review): the lookups by position used here match the earlier label
# lookups as long as the frame keeps its default 0..n-1 index.
for patient, report_id, event_date in zip(df["patient"], safety_report_id, drug_event_date):
    # A record without a 'drug' list contributes no rows instead of raising KeyError.
    for drug_entry in patient.get("drug") or []:
        drug_event_date_merge.append(event_date)
        safety_drug_id.append(report_id)
        drug.append(drug_entry["medicinalproduct"])
progress_bar.value+=1

In [None]:
# Retrieve the reactions reported in each record, together with the record's
# safety report id so every reaction can be traced back.
reactions=[]
safety_reaction_id=[]

# Walk the columns in lockstep instead of indexing with df.iloc[i], which
# builds a whole row object on every access.
# NOTE(review): the lookups by position used here match the earlier label
# lookups as long as the frame keeps its default 0..n-1 index.
for patient, report_id in zip(df["patient"], safety_report_id):
    # A record without a 'reaction' list contributes no rows instead of raising KeyError.
    for reaction_entry in patient.get("reaction") or []:
        safety_reaction_id.append(report_id)
        reactions.append(reaction_entry["reactionmeddrapt"])
progress_bar.value+=1

In [None]:
def _most_common(series, placeholders=()):
    """Return the most frequent value of series, ignoring nulls and placeholders.

    Returns None when no usable value exists. When several values tie, the
    first one reported by Series.mode() is used.
    """
    candidates = series[~series.isin(list(placeholders))]
    modes = candidates.mode()
    return None if modes.empty else modes.iloc[0]


def _fill_missing(series, fill_value, placeholders=('nan',)):
    """Return series with nulls and placeholder strings replaced by fill_value."""
    missing = series.isnull() | series.isin(list(placeholders))
    return series.mask(missing, fill_value)


# --- country: swap the 'COUNTRY NOT SPECIFIED' placeholder for the most common country.
# mode() returns a Series; passing it whole as the replacement value fails as
# soon as there is more than one mode, so a single scalar is used here.
mode_country = _most_common(drug_event_details["country"], placeholders=['COUNTRY NOT SPECIFIED'])
if mode_country is not None:
    drug_event_details['country'] = drug_event_details['country'].replace(['COUNTRY NOT SPECIFIED'], mode_country)

# --- patient_age: fill missing ages with the (truncated) median age.
# replace(['nan'], ...) only matches the literal text 'nan' and never touches
# real NaN values, so both forms of "missing" are handled here.
median_age = pd.to_numeric(drug_event_details["patient_age"], errors='coerce').median()
if pd.notna(median_age):
    drug_event_details['patient_age'] = _fill_missing(drug_event_details['patient_age'], int(median_age))

# --- patient_sex: fill missing values with the most common code, then label the codes.
mode_sex = _most_common(drug_event_details["patient_sex"], placeholders=['nan'])
if mode_sex is not None:
    drug_event_details['patient_sex'] = _fill_missing(drug_event_details['patient_sex'], str(mode_sex))
# NOTE(review): code '1' is left unmapped - confirm whether it should get a label as well.
drug_event_details['patient_sex'] = drug_event_details['patient_sex'].replace(
    {'2': "FEMALE", '0': "NOT SPECIFIED"}
)

# --- qualification: fill missing values with the most common code, then label the codes.
mode_qualification = _most_common(drug_event_details["qualification"], placeholders=['nan'])
if mode_qualification is not None:
    drug_event_details['qualification'] = _fill_missing(drug_event_details['qualification'], str(mode_qualification))
drug_event_details['qualification'] = drug_event_details['qualification'].replace(
    {
        '5': "CONSUMER",
        '1': "HEALTHCARE PROFESSIONAL",
        '3': "NOT SPECIFIED",
        '4': "NOT SPECIFIED",
        '2': "OTHERS",
    }
)

# --- dates: parse the event date and derive the year (this was computed twice before).
# NOTE(review): the recalls data is parsed with '%Y%m%d'; confirm the event
# dates really use the dashed '%Y-%m-%d' layout.
drug_event_details["drug_event_date"]=pd.to_datetime(drug_event_details["drug_event_date"],format='%Y-%m-%d')
drug_event_details["drug_event_year"] = drug_event_details["drug_event_date"].dt.year

# Keep the current row labels as an explicit column.
drug_event_details["index"] = drug_event_details.index


progress_bar.value+=1

In [None]:
# drug_list: one row per (report, drug) pair, since a patient can report
# multiple drugs. Each row carries the patient's safety_report_id and the
# event date, so the patient's details can be looked up in drug_event_details.
drug_list = pd.DataFrame(
    {
        "safety_report_id": safety_drug_id,
        "brand_name": drug,
        "drug_event_date": drug_event_date_merge,
    }
).drop_duplicates()

In [None]:

# Parse the event date of each drug row and keep its year separately.
parsed_event_dates = pd.to_datetime(drug_list["drug_event_date"], format='%Y-%m-%d')
drug_list["drug_event_date"] = parsed_event_dates
drug_list["year"] = drug_list["drug_event_date"].dt.year

# Reactions frame: one row per distinct (report, reaction) pair.
reaction_rows = pd.DataFrame(
    {
        "safety_report_id": safety_reaction_id,
        "reactions": reactions,
    }
)
drug_event_reactions = reaction_rows.drop_duplicates()

progress_bar.value+=1

### Drugs Labels

In [None]:
status_label.value = 'Data Preparation for Events Dataset completed! Starting cleaning process for Labels Dataset...'
label_documents = list(drug_product_labelling.find())
drug_df = json_normalize(label_documents)

# Extract brand_name, manufacturer_name and product_type from the openfda
# columns: take the first element of each value, then drop the source column.
openfda_sources = {
    'brand_name': 'openfda.brand_name',
    'manufacturer_name': 'openfda.manufacturer_name',
    'product_type': 'openfda.product_type',
}
for target_column, source_column in openfda_sources.items():
    drug_df[target_column] = drug_df[source_column].str.get(0)
    drug_df.drop(columns=[source_column], inplace=True)
progress_bar.value+=1

In [None]:
status_label.value = 'Cleaning Labels Dataset... Kindly wait!'

# Characters removed from the text form of each value: quotes and square brackets.
_LABEL_PUNCTUATION = str.maketrans('', '', "'[]")


def _label_text(raw):
    """Return raw without quotes/brackets; the 'nan' placeholder becomes None."""
    if raw == 'nan':
        return None
    return raw.translate(_LABEL_PUNCTUATION)


# The same cleanup is applied to every free-text label column, in this order.
label_text_columns = (
    'inactive_ingredient',
    'pregnancy_or_breast_feeding',
    'indications_and_usage',
    'keep_out_of_reach_of_children',
    'warnings',
    'stop_use',
    'do_not_use',
    'precautions',
)
for text_column in label_text_columns:
    drug_df[text_column] = drug_df[text_column].astype('str').apply(_label_text)

progress_bar.value+=1

In [None]:
# Work on an explicit copy: the columns assigned below would otherwise be set
# on a selection of drug_df, which triggers pandas' SettingWithCopy warning.
final_drug = drug_df[['brand_name','effective_time','pregnancy_or_breast_feeding','manufacturer_name','product_type','indications_and_usage','keep_out_of_reach_of_children','warnings','stop_use','do_not_use','precautions']].copy()

# Classify these sections as True / False (section text present or not) to
# make them easier to analyse.
presence_flag_columns = (
    'pregnancy_or_breast_feeding',
    'keep_out_of_reach_of_children',
    'stop_use',
    'do_not_use',
)
for flag_column in presence_flag_columns:
    final_drug[flag_column] = final_drug[flag_column].notnull()


progress_bar.value+=1

In [None]:
# Extract year, month and day from the effective_time column into three
# separate columns, then drop the original column.
effective_dates = pd.to_datetime(final_drug["effective_time"])
final_drug["effective_year"] = effective_dates.dt.year
final_drug["effective_month"] = effective_dates.dt.month
final_drug["effective_day"] = effective_dates.dt.day

final_drug.drop(["effective_time"], axis = 1, inplace = True)


def _space_separated_count(value):
    """Return the number of space-separated pieces in the text form of value."""
    return len(str(value).split(' '))


# Size of the warnings text, measured in space-separated pieces.
final_drug['warning_s'] = final_drug['warnings'].apply(_space_separated_count)

# Keep only the rows that have no null in any column.
final_drug = final_drug.dropna()


progress_bar.value+=1
status_label.value = 'Data Cleaning complete! Loading data to PostgreSQL'

### Dumping Data to Postgres

In [None]:
# Connection settings handed to PostgresConnect.
# NOTE(review): host and credentials are hard-coded in the notebook - consider
# moving them to configuration.
db_connection_dict = {
    'drivername': 'postgres',
    'host': '192.168.56.30',
    'port': '5432',
    'database': 'team_G_db',
    'username': 'dap',
    'password': 'dap',
}
postgres_conn = PostgresConnect(db_connection_dict)

# (table name, cleaned frame) pairs, written in this order; the progress bar
# advances one step after each table.
tables_to_load = (
    ('drugs_recalled', drugs_recalled_df),
    ('drugs_event', drug_event_details),
    ('drugs_event_lists', drug_list),
    ('drugs_event_reaction', drug_event_reactions),
    ('drug_labels', final_drug),
)
for table_name, table_frame in tables_to_load:
    postgres_conn.createDB_table(table_name, table_frame)
    progress_bar.value+=1

# Mark the bar as finished.
progress_bar.bar_style = 'success'