Import findspark with spark installation path 

In [1]:
import findspark

Initialize findspark

In [49]:
findspark.init("/usr/local/spark")

Import pyspark

In [2]:
import pyspark

Instantiate SparkSession with Hive support

In [3]:
from pyspark.sql import SparkSession

In [34]:
spark = SparkSession.builder.appName("capstone").config("spark.sql.warehouse.dir","https://localhost:50070/user/hive/warehouse").enableHiveSupport().getOrCreate()

Import necessary libraries

In [27]:
from pyspark.sql import Row
from pyspark.sql import *
from pyspark.sql.types import StructField
from pyspark.sql.types import StructType,StringType
from pyspark.sql import functions
import pandas as pd
from pyspark.sql.functions import isnan, when, count, col

Pyspark show dataframe with horizontal scroll

In [46]:
from IPython.core.display import HTML
display(HTML("<style>pre { white-space: pre !important; }</style>"))

# Task 3.1 - Data Analysis using Big Data Tools
## Big Data technologies like HDFS, Hive and PySpark need to be used as the historical data increases in size. As part of this task the following activities need to be done.
## ●       Develop a PySpark application to load data Spark DataFrames and save it into Hive tables on a Hadoop cluster in an optimized format.
## ●       Perform profiling of the data through PySpark and ensure that it is migrated correctly whereever the source is an RDBMS
## ●       Write PySpark routines to cleanse the data, prepare the data to handle missing values, and the data transformations identified in task 1.1 again making sure that the data is written into Hive tables in an efficient format
## ●       If the predictive model identified in task 2.4 available in Spark MLlib then develop a PySpark application to implement and evaluate the ML model identified with appropriate metrics\
## ●       Ensure that the best practices are followed and the design & code use the features of Spark and take advantage thereof.

#### PySpark application to load data Spark DataFrames and infer the schema

In [12]:
loan_df = spark.read.load('Loan_details_datasets_P5/Loan_details.csv',format='csv',sep=',',inferSchema="true",header="true")
status_df = spark.read.load('Loan_details_datasets_P5/Loan_status.csv',format='csv',sep=',',inferSchema="true",header="true")
Branch_df = spark.read.load('Loan_details_datasets_P5/Branch_region_mapping.csv',format='csv',sep=',',inferSchema="true",header="true")

Print the schema of loan_df with its features and their data types

In [13]:
loan_df.printSchema()

root
 |-- Loan_id: integer (nullable = true)
 |-- disbursed_amount: integer (nullable = true)
 |-- asset_cost: integer (nullable = true)
 |-- ltv: double (nullable = true)
 |-- branch_id: integer (nullable = true)
 |-- Date.of.Birth: string (nullable = true)
 |-- Employment.Type: string (nullable = true)
 |-- DisbursalDate: string (nullable = true)
 |-- MobileNo_Avl_Flag: integer (nullable = true)
 |-- Aadhar_flag: integer (nullable = true)
 |-- PAN_flag: integer (nullable = true)
 |-- VoterID_flag: integer (nullable = true)
 |-- Driving_flag: integer (nullable = true)
 |-- Passport_flag: integer (nullable = true)
 |-- PERFORM_CNS.SCORE: integer (nullable = true)
 |-- DELINQUENT.ACCTS.IN.LAST.SIX.MONTHS: integer (nullable = true)
 |-- CREDIT.HISTORY.LENGTH: string (nullable = true)
 |-- NO.OF_INQUIRIES: integer (nullable = true)



Show first 3 records from loan_df

In [14]:
loan_df.show(3)

+-------+----------------+----------+-----+---------+-------------+---------------+-------------+-----------------+-----------+--------+------------+------------+-------------+-----------------+-----------------------------------+---------------------+---------------+
|Loan_id|disbursed_amount|asset_cost|  ltv|branch_id|Date.of.Birth|Employment.Type|DisbursalDate|MobileNo_Avl_Flag|Aadhar_flag|PAN_flag|VoterID_flag|Driving_flag|Passport_flag|PERFORM_CNS.SCORE|DELINQUENT.ACCTS.IN.LAST.SIX.MONTHS|CREDIT.HISTORY.LENGTH|NO.OF_INQUIRIES|
+-------+----------------+----------+-----+---------+-------------+---------------+-------------+-----------------+-----------+--------+------------+------------+-------------+-----------------+-----------------------------------+---------------------+---------------+
|      1|           36439|     65850|56.19|       64|   14-06-1990|  Self employed|   28-09-2018|                1|          1|       0|           0|           0|            0|                0

Print the schema of status_df with its features and their data types

In [15]:
status_df.printSchema()

root
 |-- Loan_id: integer (nullable = true)
 |-- loan_default: integer (nullable = true)



Show first 3 records from status_df

In [16]:
status_df.show(3)

+-------+------------+
|Loan_id|loan_default|
+-------+------------+
|      1|           0|
|      2|           0|
|      3|           1|
+-------+------------+
only showing top 3 rows



Print the schema of Branch_df with its features and their data types

In [17]:
Branch_df.printSchema()

root
 |-- branch_id: integer (nullable = true)
 |-- region: string (nullable = true)



Show first 3 records from Branch_df

In [18]:
Branch_df.show(3)

+---------+------+
|branch_id|region|
+---------+------+
|        1|  East|
|        2|  East|
|        3|  East|
+---------+------+
only showing top 3 rows



Creating master dataframe by combining all subsidiary data

In [36]:
master_loan_df=loan_df.join(Branch_df,"branch_id").join(status_df,"loan_id")

Print the schema of master_loan_df with its features and their data types

In [37]:
master_loan_df.printSchema()

root
 |-- Loan_id: integer (nullable = true)
 |-- branch_id: integer (nullable = true)
 |-- disbursed_amount: integer (nullable = true)
 |-- asset_cost: integer (nullable = true)
 |-- ltv: double (nullable = true)
 |-- Date.of.Birth: string (nullable = true)
 |-- Employment.Type: string (nullable = true)
 |-- DisbursalDate: string (nullable = true)
 |-- MobileNo_Avl_Flag: integer (nullable = true)
 |-- Aadhar_flag: integer (nullable = true)
 |-- PAN_flag: integer (nullable = true)
 |-- VoterID_flag: integer (nullable = true)
 |-- Driving_flag: integer (nullable = true)
 |-- Passport_flag: integer (nullable = true)
 |-- PERFORM_CNS.SCORE: integer (nullable = true)
 |-- DELINQUENT.ACCTS.IN.LAST.SIX.MONTHS: integer (nullable = true)
 |-- CREDIT.HISTORY.LENGTH: string (nullable = true)
 |-- NO.OF_INQUIRIES: integer (nullable = true)
 |-- region: string (nullable = true)
 |-- loan_default: integer (nullable = true)



Show the records of master_loan_df

In [38]:
master_loan_df.show()

+-------+---------+----------------+----------+-----+-------------+---------------+-------------+-----------------+-----------+--------+------------+------------+-------------+-----------------+-----------------------------------+---------------------+---------------+------+------------+
|Loan_id|branch_id|disbursed_amount|asset_cost|  ltv|Date.of.Birth|Employment.Type|DisbursalDate|MobileNo_Avl_Flag|Aadhar_flag|PAN_flag|VoterID_flag|Driving_flag|Passport_flag|PERFORM_CNS.SCORE|DELINQUENT.ACCTS.IN.LAST.SIX.MONTHS|CREDIT.HISTORY.LENGTH|NO.OF_INQUIRIES|region|loan_default|
+-------+---------+----------------+----------+-----+-------------+---------------+-------------+-----------------+-----------+--------+------------+------------+-------------+-----------------+-----------------------------------+---------------------+---------------+------+------------+
|      1|       64|           36439|     65850|56.19|   14-06-1990|  Self employed|   28-09-2018|                1|          1|      

In [43]:
type(master_loan_df)

pyspark.sql.dataframe.DataFrame

Datatypes of all features of master_loan_df

In [42]:
master_loan_df.describe

<bound method DataFrame.describe of DataFrame[Loan_id: int, branch_id: int, disbursed_amount: int, asset_cost: int, ltv: double, Date.of.Birth: string, Employment.Type: string, DisbursalDate: string, MobileNo_Avl_Flag: int, Aadhar_flag: int, PAN_flag: int, VoterID_flag: int, Driving_flag: int, Passport_flag: int, PERFORM_CNS.SCORE: int, DELINQUENT.ACCTS.IN.LAST.SIX.MONTHS: int, CREDIT.HISTORY.LENGTH: string, NO.OF_INQUIRIES: int, region: string, loan_default: int]>

#### Save the master_loan_df into Hive tables on a Hadoop cluster in an optimized format

In [39]:
master_loan_df.write.saveAsTable("master")

In [40]:
spark.sql('show tables').show()

+--------+---------+-----------+
|database|tableName|isTemporary|
+--------+---------+-----------+
| default|   master|      false|
+--------+---------+-----------+



In [52]:
master_loan_df.toPandas().describe()

Unnamed: 0,Loan_id,branch_id,disbursed_amount,asset_cost,ltv,MobileNo_Avl_Flag,Aadhar_flag,PAN_flag,VoterID_flag,Driving_flag,Passport_flag,PERFORM_CNS.SCORE,DELINQUENT.ACCTS.IN.LAST.SIX.MONTHS,NO.OF_INQUIRIES,loan_default
count,23315.0,23315.0,23315.0,23315.0,23315.0,23315.0,23315.0,23315.0,23315.0,23315.0,23315.0,23315.0,23315.0,23315.0,23315.0
mean,11658.0,72.079262,54297.647309,75842.182887,74.701607,1.0,0.845078,0.075531,0.141068,0.023161,0.002059,291.234956,0.100622,0.20579,0.219858
std,6730.605099,69.095008,13061.877434,18988.525635,11.462722,0.0,0.361838,0.264252,0.348099,0.150418,0.045328,338.490214,0.388782,0.699901,0.414159
min,1.0,1.0,13369.0,37230.0,17.13,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
25%,5829.5,13.0,46949.0,65629.0,68.83,1.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
50%,11658.0,61.0,53759.0,70929.0,76.71,1.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
75%,17486.5,121.0,60379.0,79354.5,83.63,1.0,1.0,0.0,0.0,0.0,0.0,679.0,0.0,0.0,0.0
max,23315.0,261.0,592460.0,715186.0,94.98,1.0,1.0,1.0,1.0,1.0,1.0,890.0,7.0,23.0,1.0


Storing the dataframe in parquet

In [20]:
loan_df.write.parquet("Loan_details_saved.parquet")

In [21]:
status_df.write.parquet("status_details_saved.parquet")

In [22]:
Branch_df.write.parquet("branch_details_saved.parquet")

Profiling the data

In [30]:
def dataprofile(data_all_df,data_cols):
    data_df = data_all_df.select(data_cols)
    columns2Bprofiled = data_df.columns
    global schema_name, table_name
    if not 'schema_name' in globals():
        schema_name = 'schema_name'
    if not 'table_name' in globals():
        table_name = 'table_name' 
    dprof_df = pd.DataFrame({'schema_name':[schema_name] * len(data_df.columns),\
                             'table_name':[table_name] * len(data_df.columns),\
                             'column_names':data_df.columns,\
                             'data_types':[x[1] for x in data_df.dtypes]}) 
    dprof_df = dprof_df[['schema_name','table_name','column_names', 'data_types']]
    dprof_df.set_index('column_names', inplace=True, drop=False)
    # ======================
    num_rows = data_df.count()
    dprof_df['num_rows'] = num_rows
    # ======================    
    # number of rows with nulls and nans   
    df_nacounts = data_df.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in data_df.columns \
                                  if data_df.select(c).dtypes[0][1]!='timestamp']).toPandas().transpose()
    df_nacounts = df_nacounts.reset_index()  
    df_nacounts.columns = ['column_names','num_null']
    dprof_df = pd.merge(dprof_df, df_nacounts, on = ['column_names'], how = 'left')
    # ========================
    # number of rows with white spaces (one or more space) or blanks
    num_spaces = [data_df.where(F.col(c).rlike('^\\s+$')).count() for c in data_df.columns]
    dprof_df['num_spaces'] = num_spaces
    num_blank = [data_df.where(F.col(c)=='').count() for c in data_df.columns]
    dprof_df['num_blank'] = num_blank
    # =========================
    # using the in built describe() function 
    desc_df = data_df.describe().toPandas().transpose()
    desc_df.columns = ['count', 'mean', 'stddev', 'min', 'max']
    desc_df = desc_df.iloc[1:,:]  
    desc_df = desc_df.reset_index()  
    desc_df.columns.values[0] = 'column_names'  
    desc_df = desc_df[['column_names','count', 'mean', 'stddev']] 
    dprof_df = pd.merge(dprof_df, desc_df , on = ['column_names'], how = 'left')
    # ===========================================
    allminvalues = [data_df.select(F.min(x)).limit(1).toPandas().iloc[0][0] for x in columns2Bprofiled]
    allmaxvalues = [data_df.select(F.max(x)).limit(1).toPandas().iloc[0][0] for x in columns2Bprofiled]
    allmincounts = [data_df.where(col(x) == y).count() for x,y in zip(columns2Bprofiled, allminvalues)]
    allmaxcounts = [data_df.where(col(x) == y).count() for x,y in zip(columns2Bprofiled, allmaxvalues)]    
    df_counts = dprof_df[['column_names']]
    df_counts.insert(loc=0, column='min', value=allminvalues)
    df_counts.insert(loc=0, column='counts_min', value=allmincounts)
    df_counts.insert(loc=0, column='max', value=allmaxvalues)
    df_counts.insert(loc=0, column='counts_max', value=allmaxcounts)
    df_counts = df_counts[['column_names','min','counts_min','max','counts_max']]
    dprof_df = pd.merge(dprof_df, df_counts , on = ['column_names'], how = 'left')  
    # ==========================================
    # number of distinct values in each column
    dprof_df['num_distinct'] = [data_df.select(x).distinct().count() for x in columns2Bprofiled]
    # ============================================
    # most frequently occuring value in a column and its count
    dprof_df['most_freq_valwcount'] = [data_df.groupBy(x).count().sort("count",ascending=False).limit(1).\
                                       toPandas().iloc[0].values.tolist() for x in columns2Bprofiled]
    dprof_df['most_freq_value'] = [x[0] for x in dprof_df['most_freq_valwcount']]
    dprof_df['most_freq_value_count'] = [x[1] for x in dprof_df['most_freq_valwcount']]
    dprof_df = dprof_df.drop(['most_freq_valwcount'],axis=1)
    # least frequently occuring value in a column and its count
    dprof_df['least_freq_valwcount'] = [data_df.groupBy(x).count().sort("count",ascending=True).limit(1).\
                                        toPandas().iloc[0].values.tolist() for x in columns2Bprofiled]
    dprof_df['least_freq_value'] = [x[0] for x in dprof_df['least_freq_valwcount']]
    dprof_df['least_freq_value_count'] = [x[1] for x in dprof_df['least_freq_valwcount']]
    dprof_df = dprof_df.drop(['least_freq_valwcount'],axis=1)

    return dprof_df

In [31]:
profile_cols=loan_df.columns

In [44]:
dprofile=dataprofile(loan_df,profile_cols)

AnalysisException: "cannot resolve '`Date.of.Birth`' given input columns: [DELINQUENT.ACCTS.IN.LAST.SIX.MONTHS, Employment.Type, CREDIT.HISTORY.LENGTH, NO.OF_INQUIRIES, disbursed_amount, branch_id, DisbursalDate, MobileNo_Avl_Flag, PAN_flag, Passport_flag, PERFORM_CNS.SCORE, VoterID_flag, asset_cost, Driving_flag, Date.of.Birth, ltv, Aadhar_flag, Loan_id];;\n'Project [Loan_id#246, disbursed_amount#247, asset_cost#248, ltv#249, branch_id#250, 'Date.of.Birth, 'Employment.Type, DisbursalDate#253, MobileNo_Avl_Flag#254, Aadhar_flag#255, PAN_flag#256, VoterID_flag#257, Driving_flag#258, Passport_flag#259, 'PERFORM_CNS.SCORE, 'DELINQUENT.ACCTS.IN.LAST.SIX.MONTHS, 'CREDIT.HISTORY.LENGTH, 'NO.OF_INQUIRIES]\n+- Relation[Loan_id#246,disbursed_amount#247,asset_cost#248,ltv#249,branch_id#250,Date.of.Birth#251,Employment.Type#252,DisbursalDate#253,MobileNo_Avl_Flag#254,Aadhar_flag#255,PAN_flag#256,VoterID_flag#257,Driving_flag#258,Passport_flag#259,PERFORM_CNS.SCORE#260,DELINQUENT.ACCTS.IN.LAST.SIX.MONTHS#261,CREDIT.HISTORY.LENGTH#262,NO.OF_INQUIRIES#263] csv\n"

In [None]:
dprofile