## Building Pipeline to create the Data Model (Prototype)

In [21]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import re
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.functions import isnan, when, count, col

In [5]:
accepted = pd.read_csv('accepted_2007_to_2018Q4.csv')
accepted.head()

  has_raised = await self.run_ast_nodes(code_ast.body, cell_name,


Unnamed: 0,id,member_id,loan_amnt,funded_amnt,funded_amnt_inv,term,int_rate,installment,grade,sub_grade,...,hardship_payoff_balance_amount,hardship_last_payment_amount,disbursement_method,debt_settlement_flag,debt_settlement_flag_date,settlement_status,settlement_date,settlement_amount,settlement_percentage,settlement_term
0,68407277,,3600.0,3600.0,3600.0,36 months,13.99,123.03,C,C4,...,,,Cash,N,,,,,,
1,68355089,,24700.0,24700.0,24700.0,36 months,11.99,820.28,C,C1,...,,,Cash,N,,,,,,
2,68341763,,20000.0,20000.0,20000.0,60 months,10.78,432.66,B,B4,...,,,Cash,N,,,,,,
3,66310712,,35000.0,35000.0,35000.0,60 months,14.85,829.9,C,C5,...,,,Cash,N,,,,,,
4,68476807,,10400.0,10400.0,10400.0,60 months,22.45,289.91,F,F1,...,,,Cash,N,,,,,,


In [7]:
rejected = pd.read_csv('rejected_2007_to_2018Q4.csv')
rejected.head()

Unnamed: 0,Amount Requested,Application Date,Loan Title,Risk_Score,Debt-To-Income Ratio,Zip Code,State,Employment Length,Policy Code
0,1000.0,2007-05-26,Wedding Covered but No Honeymoon,693.0,10%,481xx,NM,4 years,0.0
1,1000.0,2007-05-26,Consolidating Debt,703.0,10%,010xx,MA,< 1 year,0.0
2,11000.0,2007-05-27,Want to consolidate my debt,715.0,10%,212xx,MD,1 year,0.0
3,6000.0,2007-05-27,waksman,698.0,38.64%,017xx,MA,< 1 year,0.0
4,1500.0,2007-05-27,mdrigo,509.0,9.43%,209xx,MD,< 1 year,0.0


## Creating Fact and Dimension Tables and writing the data to parquet files for future consumption 

In [10]:
accepted_loans_table = accepted[['id', 'member_id', 'loan_amnt', 'term', 'int_rate', 'grade', 
                                 'sub_grade','issue_d', 'loan_status', 'purpose', 'earliest_cr_line'
]]
accepted_loans_table.write.parquet("results/accepted_loans_table.parquet", "append")

In [11]:
accepted_time_table = accepted[['datetime', 'hour', 'day', 'week', 'month', 'year', 
]]
accepted_time_table.write.parquet("results/accepted_time_table.parquet", "append")

In [12]:
accepted_user_table = accepted[['member_id', 'issue_d', 'fico_range_high', 'home_ownership', 'annual_inc', 
]]
accepted_user_table.write.parquet("results/accepted_user_table.parquet", "append")

In [86]:
rejected_loans_table = rejected[['Amount Requested', 'Application Date', 'Loan Title', 'State', 'Policy Code', 
]]
rejected_loans_table.write.parquet("results/rejected_loans_table.parquet", "append")

In [15]:
rejecetd_time_table = rejected[['Application Date', 'State', 
]]
rejected_time_table.write.parquet("results/rejected_time_table.parquet", "append")

In [84]:
rejected_user_table = rejected[['Amount Requested', 'Risk_Score', 'Debt-To-Income Ratio', 'State', 'Zip Code' 
]]
rejected_user_table.write.parquet("results/rejected_user_table.parquet", "append")

In [17]:
rejecetd_location_table = rejected[['State', 'Zip Code' 
]]
rejected_location_table.write.parquet("results/rejected_location_table.parquet", "append")

## Data Quality Checks

In [77]:
def quality_check(df, description):
    '''
        Input: Panda dataframe, desciption the the data
        This function performs a data check to make sure the data was successfuly stored in the tables.
        It returns the number of records in the table.
    '''
    
    data = df.count()
    if data.empty == True:        
        print("Data quality check failed for {} with zero records".format(description))
    else:
        print("Data quality check passed for {} with {} records".format(description, data))

In [78]:
quality_check(accepted_loans_table, 'accepted loans table')

Data quality check passed for accepted loans table with id                  2260701
member_id                 0
loan_amnt           2260668
term                2260668
int_rate            2260668
grade               2260668
sub_grade           2260668
issue_d             2260668
loan_status         2260668
purpose             2260668
earliest_cr_line    2260639
dtype: int64 records


In [80]:
quality_check(accepted_time_table, 'accepted time table')

In [81]:
quality_check(accepted_user_table, 'accepted user table')

Data quality check passed for accepted user table with member_id                0
issue_d            2260668
fico_range_high    2260668
home_ownership     2260668
annual_inc         2260664
dtype: int64 records


In [87]:
quality_check(rejected_loans_table, 'rejected loans table')

Data quality check passed for rejected loans table with Amount Requested    27648741
Application Date    27648741
Loan Title          27647438
State               27648719
Policy Code         27647823
dtype: int64 records


In [88]:
quality_check(rejecetd_time_table, 'rejected time table')

Data quality check passed for rejected time table with Application Date    27648741
State               27648719
dtype: int64 records


In [91]:
quality_check(rejected_user_table, 'rejected user table')

Data quality check passed for rejected user table with Amount Requested        27648741
Risk_Score               9151111
Debt-To-Income Ratio    27648741
State                   27648719
Zip Code                27648448
dtype: int64 records


In [93]:
quality_check(rejecetd_location_table, 'rejected location table')

Data quality check passed for rejected location table with State       27648719
Zip Code    27648448
dtype: int64 records
