### Imports

In [2]:
import logging
import os
from re import search

import ipywidgets as widgets
import numpy as np
import pandas as pd
import plotly.express as px
import plotly.graph_objects as go
import psycopg2
from IPython.core.display import display
from pandas import json_normalize
from plotly.subplots import make_subplots
from psycopg2.errors import *
from pymongo import MongoClient
from pymongo.errors import ConnectionFailure, CollectionInvalid
from sqlalchemy import create_engine

In [19]:
# Route INFO-and-above log records to a file; the helper functions in this
# notebook write their error tracebacks there.
# NOTE(review): the file name mentions 'yelp_business' although this notebook
# processes drug-recall data -- confirm the intended log file.
logging.basicConfig(level=logging.INFO, filename='yelp_business_errors.log')

### Getting Data and Dumping in MongoDB

In [6]:
def mongo_connection(db_name, collection_name):
    '''Open a collection on the local MongoDB server.

    INPUTS:
        db_name: Name of the database holding the collection
        collection_name: Name of the collection to retrieve
    OUTPUT:
        collection: The pymongo collection object, or None when the server
            cannot be reached or the lookup fails (details go to the log).
    '''
    try:
        client = MongoClient('mongodb://localhost:27017')
        # MongoClient connects lazily, so without a round trip an unreachable
        # server would only surface later, on the first real operation.
        client.admin.command('ping')
        db = client[db_name]
        collection = db[collection_name]
        return collection

    except ConnectionFailure:
        print('Unable to connect to Database, check Database name!')
        logging.error("Exception occurred at mongo_connection", exc_info=True)
    except CollectionInvalid:
        print("Invalid Collection Name provided, check Collection name!")
        logging.error("Exception occurred at mongo_connection", exc_info=True)
    except Exception:
        print('Something went wrong check logs for more info!')
        logging.error("Exception occurred at mongo_connection", exc_info=True)
    # Every failure path ends here: make the "no collection" result explicit.
    return None
    

In [7]:
# Handle to the 'drugs_recalled' collection of the 'dap' database.
collection = mongo_connection(db_name='dap', collection_name='drugs_recalled')

In [10]:
import os
import requests
import time

# Page through the openFDA drug-enforcement endpoint, 1000 records per request,
# and store every page in MongoDB.
OPENFDA_ENFORCEMENT_URL = 'https://api.fda.gov/drug/enforcement.json'
PAGE_SIZE = 1000
# The API key is a credential and must not live in the notebook. It is optional:
# without it openFDA still answers, only with lower rate limits.
openfda_api_key = os.environ.get('OPENFDA_API_KEY')

for skip_records in range(0, 13000, PAGE_SIZE):
    params = {'limit': PAGE_SIZE, 'skip': skip_records}
    if openfda_api_key:
        params['api_key'] = openfda_api_key
    response = requests.get(OPENFDA_ENFORCEMENT_URL, params=params, timeout=30)
    payload = response.json()
    json_to_dump = payload.get('results')
    if not json_to_dump:
        # Past the last page (or on any API error) the payload carries an
        # 'error' object instead of 'results'; stop instead of raising KeyError.
        logging.warning('openFDA returned no results at skip=%s: %s',
                        skip_records, payload.get('error'))
        break
    collection.insert_many(json_to_dump)
    
    


{'error': {'code': 'NOT_FOUND', 'message': 'No matches found!'}}


In [8]:
# Pull every stored recall document and flatten nested fields
# (e.g. the 'openfda' sub-document) into dotted column names.
recall_documents = collection.find()
drugs_recalled_df = json_normalize(recall_documents)

### Drop columns if null percentage greater than given percentage

In [108]:
def drop_cols(df, null_percentage):
    '''Remove, in place, every column whose share of nulls exceeds a threshold.

    INPUTS:
        df = The dataframe from which columns need to be dropped (modified in place)
        null_percentage = Threshold in percent; a column is dropped only when its
            null share is strictly greater than this figure
    OUTPUT
        True when the columns were dropped successfully, False when an error
        occurred (the traceback is written to the log)
    '''
    try:
        total_rows = len(df)
        # Null share of every column, in percent, computed in one pass.
        null_share = df.isnull().sum() / total_rows * 100
        sparse_columns = null_share[null_share > null_percentage].index
        df.drop(columns=sparse_columns, inplace=True)
        return True
    except Exception:
        logging.error("Exception occurred at drop_cols", exc_info=True)
        return False
# Drop columns that are more than 85% null and report the outcome.
drop_succeeded = drop_cols(drugs_recalled_df, 85)
print('Columns Dropped Successfully' if drop_succeeded
      else 'Unable to drop columns check logs for more info')

Columns Dropped Successfully


### Data Cleaning

#### Retrieving values from lists inside columns

In [89]:
# The openfda.* fields hold lists of values. Flatten each one into a plain
# comma-separated string stored under the short name (text after 'openfda.'),
# then drop the original dotted column.
col_list = ['openfda.application_number',
       'openfda.brand_name', 'openfda.generic_name',
       'openfda.manufacturer_name', 'openfda.product_ndc',
       'openfda.product_type', 'openfda.route', 'openfda.substance_name',
       'openfda.rxcui', 'openfda.spl_id', 'openfda.spl_set_id',
       'openfda.package_ndc', 'openfda.is_original_packager', 'openfda.upc',
       'openfda.nui', 'openfda.pharm_class_epc', 'openfda.unii']


def _flatten_list_cell(value):
    '''Return a list cell as a comma-separated string; None for missing values.'''
    if isinstance(value, (list, tuple)):
        # Join the items directly rather than stripping quotes and brackets from
        # str(list), which also mangles values containing those characters.
        return ', '.join(str(item) for item in value)
    if pd.isna(value):
        return None
    return str(value)


for column in col_list:
    # drop_cols runs earlier in the notebook and may already have removed
    # sparsely populated openfda columns; skip those instead of raising KeyError.
    if column not in drugs_recalled_df.columns:
        continue
    col = column.split('.')[1]
    drugs_recalled_df[col] = drugs_recalled_df[column].apply(_flatten_list_cell)
    drugs_recalled_df.drop(columns=column, inplace=True)


0        None
1        None
2        None
3        None
4        None
         ... 
12994    None
12995    None
12996    None
12997    None
12998    None
Name: application_number, Length: 12999, dtype: object
0        None
1        None
2        None
3        None
4        None
         ... 
12994    None
12995    None
12996    None
12997    None
12998    None
Name: brand_name, Length: 12999, dtype: object
0        None
1        None
2        None
3        None
4        None
         ... 
12994    None
12995    None
12996    None
12997    None
12998    None
Name: generic_name, Length: 12999, dtype: object
0        None
1        None
2        None
3        None
4        None
         ... 
12994    None
12995    None
12996    None
12997    None
12998    None
Name: manufacturer_name, Length: 12999, dtype: object
0        None
1        None
2        None
3        None
4        None
         ... 
12994    None
12995    None
12996    None
12997    None
12998    None
Name: product_ndc, Length

In [92]:
# Dropping redundant columns
redundant_columns = ['_id', 'product_type']
drugs_recalled_df.drop(columns=redundant_columns, inplace=True)

#### Converting Date columns

In [101]:
# Every column whose name contains 'date' is treated as a date column.
date_cols = list(filter(lambda name: 'date' in name, drugs_recalled_df.columns))

In [107]:
# Parse the YYYYMMDD values of each date column into pandas datetimes.
for column in date_cols:
    raw_dates = drugs_recalled_df[column]
    drugs_recalled_df[column] = pd.to_datetime(raw_dates, format='%Y%m%d')

AttributeError: module 'pandas' has no attribute 'to_date'

#### Getting Labels out of the reason_for_recall column

In [111]:
# The label is the text before the first ':', ';' or '-' in the recall reason.
reason_parts = drugs_recalled_df['reason_for_recall'].str.split('[:;-]')
drugs_recalled_df['reason_main'] = reason_parts.str[0]

In [112]:
# The description is everything after the first ':', ';' or '-' delimiter,
# re-joined without the delimiters. A missing reason splits to NaN rather than
# a list, so guard before slicing instead of raising TypeError.
drugs_recalled_df['reason_description'] = (
    drugs_recalled_df['reason_for_recall']
    .str.split('[:;-]')
    .apply(lambda parts: "".join(parts[1:]) if isinstance(parts, list) else None)
)

### Data Transformation

In [None]:
from functools import partial
from geopy.geocoders import Nominatim
# Nominatim geocoder client used by get_state.
# NOTE(review): Nominatim expects a user agent that identifies the calling
# application; "Google" looks like a placeholder -- confirm.
geolocator = Nominatim(user_agent="Google")
def get_state(x):
    '''Look up the state-level component of a place name via the geocoder.

    INPUTS:
        x: Free-text place name to geocode (the notebook passes city names)
    OUTPUT:
        The second-to-last comma-separated component of the geocoded address,
        or the third-to-last when the second-to-last is purely numeric.
        None when the geocoder finds no match or the address has too few
        components to contain that part.
    '''
    # NOTE(review): results are requested in Spanish (language="es"), so region
    # names come back localized -- confirm this is intended.
    geocode = partial(geolocator.geocode, language="es")
    loc = geocode(x)
    if loc is None:
        return None
    parts = [part.strip() for part in loc[0].split(',')]
    if len(parts) < 2:
        # e.g. a country-only address: there is no state component to return
        return None
    state = parts[-2]
    if state.isnumeric():
        # presumably a postal code; the state is then one position earlier
        state = parts[-3] if len(parts) >= 3 else None
    return state
# Geocode each distinct city that has an empty state, then back-fill the state
# on the matching rows. The lookup reads drugs_recalled_df directly: new_df is
# only created further down the notebook, so using it here fails on a fresh run.
missing_state = drugs_recalled_df['state'] == ''
# dropna: a missing city cannot be geocoded
city_list = drugs_recalled_df.loc[missing_state, 'city'].dropna().unique()
state_dict = {city: get_state(city) for city in city_list}
for city, state in state_dict.items():
    drugs_recalled_df.loc[(drugs_recalled_df['city']==city) & (drugs_recalled_df['state']==''), 'state']= state

### Connection to Postgres

In [184]:
def connect(user, password, host, port, db_name):
    '''Open a Postgres connection and hand back a cursor on it.
    INPUTS:
        user: Name of the user
        password: User's password to connect user to the service
        host: IP address of the host
        port: port number of postgres service
        db_name: name of the database to connect to
    OUTPUTS:
        A psycopg2 cursor bound to the given database, or None when the
        connection attempt fails (the error is printed and logged).
    '''
    connection_settings = dict(
        user=user,
        password=password,
        host=host,
        port=port,
        database=db_name,
    )
    try:
        pg_connection = psycopg2.connect(**connection_settings)
        # Level 0 is psycopg2's autocommit isolation level.
        pg_connection.set_isolation_level(0)
        return pg_connection.cursor()
    except OperationalError:
        print('Operational Error occured, check logs for detailed info!')
        logging.error("Exception occurred at connect", exc_info=True)
    except Exception:
        logging.error("Exception occurred at connect", exc_info=True)
        print('Something went wrong, check logs for more info!')

In [185]:
def execute_statement(cursor, sql_statement):
    '''Execute one SQL statement on the given cursor.

    INPUTS:
        cursor: A database cursor, e.g. the one returned by connect
        sql_statement: The SQL text to execute
    OUTPUTS:
        True on successful execution. False on any failure; the error is
        printed and its traceback written to the log.
    '''
    try:
        cursor.execute(sql_statement)
        # The cursor is closed after a successful run, so it cannot be reused
        # and no result rows can be fetched from it afterwards.
        cursor.close()
        return True
    except OperationalError:
        print('Operational Error occured, check logs for detailed info!')
        logging.error("Exception occurred at execute_statement", exc_info=True)
        return False
    except InFailedSqlTransaction:
        print('Incorrect SQL Statement, check the sql statement!')
        logging.error("Exception occurred at execute_statement", exc_info=True)
        return False
    except Exception:
        logging.error("Exception occurred at execute_statement", exc_info=True)
        print('Something went wrong, check logs for more info!')
        return False

In [186]:
def insert_into_table(engine, df, table_name):
    '''Write a pandas dataframe to a new SQL table.
    INPUTS:
        engine: An SQL Alchemy connection object
        df: dataframe to insert in Postgres
        table_name: Name of the table to be created
    OUTPUTS:
        True if the load succeeds, False otherwise (the error is printed
        and its traceback written to the log)
    '''
    log_message = "Exception occurred at insert_into_table"
    try:
        df.to_sql(table_name, engine, index=False)
    except ValueError:
        # raised by to_sql when the table already exists (it is not replaced)
        print('Table already exists, change table name or drop the preivous table.')
        logging.error(log_message, exc_info=True)
        return False
    except Exception:
        print('Something went wrong, check logs for more info!')
        logging.error(log_message, exc_info=True)
        return False
    else:
        return True
        

In [189]:
# Connection settings are read from the standard libpq environment variables
# when present, so real credentials need not be written into the notebook; the
# fallbacks are the previous local-development values.
user = os.environ.get('PGUSER', 'postgres')
password = os.environ.get('PGPASSWORD', 'root')
host = os.environ.get('PGHOST', 'localhost')
port = os.environ.get('PGPORT', '5432')
database = os.environ.get('PGDATABASE', 'postgres')
engine = create_engine('postgresql://{0}:{1}@{2}:{3}/{4}'.format(user, password, host, port, database))
# Load the frame built in this notebook. The previous `business_df` is never
# defined here, so the cell raised NameError on a fresh run.
if insert_into_table(engine, drugs_recalled_df, 'drugs_recalled'):
    print('Load Successfull')
else:
    print('Unable to Load Data in table')

### Creating TreeMap Graph to explain Reason and Event relation

In [133]:
# Count recall records per (event, main reason, description) combination.
treemap_group_keys = ['event_id', 'reason_main', 'reason_description']
grouped_df = drugs_recalled_df.groupby(by=treemap_group_keys).agg({'reason_main': 'count'})

In [134]:
# Name the aggregated column "Count", then turn the group keys back into columns.
grouped_df = grouped_df.rename(columns={'reason_main': "Count"}).reset_index()

In [175]:
# Constant column used as the single root node of the treemap hierarchy.
grouped_df['reason'] = 'Reasons for Recall'

In [182]:
# Treemap hierarchy: root label -> main reason -> event -> description.
# Tiles are both sized and coloured by the record count.
treemap_path = ['reason', 'reason_main', 'event_id', 'reason_description']
fig = px.treemap(
    grouped_df,
    path=treemap_path,
    values='Count',
    color='Count',
    maxdepth=2,
    color_continuous_scale='blues',
)
fig.show()


### Drugs Recalled with Respect to States and Classification

In [317]:
# Work on an independent copy: columns are assigned on new_df afterwards, and
# doing that on a slice of drugs_recalled_df triggers SettingWithCopyWarning.
new_df = drugs_recalled_df[['country', 'classification','state', 'report_date']].copy()

In [368]:
# Derive the reporting year and truncate report_date to the first day of its
# month. assign() builds a new frame, so nothing is written into a slice.
new_df = new_df.assign(
    year=new_df['report_date'].dt.year,
    report_date=pd.to_datetime(new_df['report_date'].dt.strftime('%Y-%m')),
)

figure_dict = {}
new_df = new_df[new_df['country']=='United States']


# One pair of figures per year: monthly recall counts per classification (lines)
# and recall counts per state and classification (grouped bars).
for year in sorted(new_df['year'].unique()):
    figure = go.FigureWidget(layout = dict(width=400, height=400, title=str(year)))
    for classification in new_df['classification'].unique():
        temp_df = new_df[(new_df['classification']==classification)& (new_df['year']==year)]
        # Sorting by index and reading index/values directly avoids relying on
        # the column names produced by value_counts().reset_index(), which
        # changed in pandas 2.0.
        monthly_counts = temp_df['report_date'].value_counts().sort_index()
        trace = go.Scatter(mode = 'lines', x = monthly_counts.index.astype('str'), y = monthly_counts.values, name=classification)
        figure.add_trace(trace)
    figure.update_layout(xaxis={'title':'report_date'}, yaxis={'title':'Number of Drugs Recalled'})
    temp_df_2 = new_df[new_df['year']==year][['state', 'classification']].value_counts()
    # Name the count column explicitly; its default name differs across pandas versions.
    temp_df_2 = temp_df_2.reset_index(name='Drugs_Recalled')
    fig = px.bar(temp_df_2, x = 'classification', y='Drugs_Recalled', color ='state', barmode='group', width=400, height=400)
    figure_2 = go.FigureWidget(fig)
    figure_dict[year] = [figure, figure_2]
    


In [369]:
# Show each year's line chart and bar chart side by side.
for year, figure_list in figure_dict.items():
    figure_row = widgets.HBox(figure_list)
    display(figure_row)

HBox(children=(FigureWidget({
    'data': [{'mode': 'lines',
              'name': 'Class II',
              '…

HBox(children=(FigureWidget({
    'data': [{'mode': 'lines',
              'name': 'Class II',
              '…

HBox(children=(FigureWidget({
    'data': [{'mode': 'lines',
              'name': 'Class II',
              '…

HBox(children=(FigureWidget({
    'data': [{'mode': 'lines',
              'name': 'Class II',
              '…

HBox(children=(FigureWidget({
    'data': [{'mode': 'lines',
              'name': 'Class II',
              '…

HBox(children=(FigureWidget({
    'data': [{'mode': 'lines',
              'name': 'Class II',
              '…

HBox(children=(FigureWidget({
    'data': [{'mode': 'lines',
              'name': 'Class II',
              '…

HBox(children=(FigureWidget({
    'data': [{'mode': 'lines',
              'name': 'Class II',
              '…

HBox(children=(FigureWidget({
    'data': [{'mode': 'lines',
              'name': 'Class II',
              '…

HBox(children=(FigureWidget({
    'data': [{'mode': 'lines',
              'name': 'Class II',
              '…

In [371]:
# List the frame's current column names.
drugs_recalled_df.columns

Index(['country', 'city', 'address_1', 'reason_for_recall', 'address_2',
       'product_quantity', 'code_info', 'center_classification_date',
       'distribution_pattern', 'state', 'product_description', 'report_date',
       'classification', 'recalling_firm', 'recall_number',
       'initial_firm_notification', 'event_id', 'termination_date',
       'recall_initiation_date', 'postal_code', 'voluntary_mandated', 'status',
       'application_number', 'brand_name', 'generic_name', 'manufacturer_name',
       'product_ndc', 'route', 'substance_name', 'rxcui', 'spl_id',
       'spl_set_id', 'package_ndc', 'is_original_packager', 'unii',
       'reason_main', 'reason_description'],
      dtype='object')

In [373]:
# Export the rows that have a brand name, keeping only the four listed columns.
export_columns = ['brand_name', 'report_date', 'event_id', 'status']
has_brand_name = drugs_recalled_df['brand_name'].notnull()
drugs_recalled_df.loc[has_brand_name, export_columns].to_csv('recalled_drugs.csv', index=False)