## Data Cleaning and Analysis using PySpark 
* Dataset - https://www.kaggle.com/wordsforthewise/lending-club
* Steps involved:
  - Cleaning the data in the column using regex expressions
  - Analysing the correlation between different features
  - Pattern understanding from inter column frequency table
  - Handling null values
  - Merging of columns
  - Transforming multi label data into binary data
  
** Note - The notebook was practised on databricks community version and hence doesnt contain any sparksession initialization.

In [None]:
# File location and type
file_location = "/FileStore/tables/LoanStats_2018Q4.csv"
file_type = "csv"

# CSV options
infer_schema = "true"
first_row_is_header = "true"
delimiter = ","

# The applied options are for CSV files. For other file types, these will be ignored.
df = spark.read.format(file_type) \
  .option("inferSchema", infer_schema) \
  .option("header", first_row_is_header) \
  .option("sep", delimiter) \
  .load(file_location)

display(df)

In [None]:
df.printSchema()

In [None]:
df_sel = df.select('term', 'home_ownership', 'grade', 'purpose', 'int_rate', 'addr_state',
                  'loan_status', 'application_type', 'loan_amnt', 'emp_length', 'annual_inc',
                  'dti', 'delinq_2yrs', 'revol_util', 'total_acc', 'num_tl_90g_dpd_24m','dti_joint')
df_sel.describe().show()

In [None]:
df_sel.cache()

In [None]:
from pyspark.sql.functions import regexp_replace, regexp_extract
from pyspark.sql.functions import *

In [None]:
#Cleaning the columns emp_length, term
df_sel = df_sel.withColumn('emp_cleaned', regexp_extract(col('emp_length'), '\\d+', 0)).withColumn('term_cleaned', regexp_replace(col('term'), 'months', ''))
df_sel.show(5)

In [None]:
# Create a view or table
temp_table_name = 'loan_status_intermediate'
df_sel.createOrReplaceTempView(temp_table_name)

In [None]:
spark.sql('''
          select * from loan_status_intermediate
          ''').show(5)

In [None]:
#Verify the correlation between Annual inc and loan amount
'''
Correlation is a normalized measure of covariance that is easier to understand, as it provides quantitative measurements of the statistical dependence between two random variables.
'''
df_sel.stat.corr('annual_inc', 'loan_amnt')

In [None]:
#Analysing the frequency distribution between two features i.e loan_status and grade using cross tab.
'''
Cross Tabulation provides a table of the frequency distribution for a set of variables.
'''
df_sel.stat.crosstab('loan_status', 'grade').show()

In [None]:
#Checking the frequent occurences in the column 'purpose' and 'grade' 
freq = df_sel.stat.freqItems(['purpose', 'grade'], 0.3)
freq.collect()

In [None]:
df_sel.groupBy(df_sel.purpose).count().orderBy(desc('count')).show()

In [None]:
#Tradeoff between statiscal accurcy and time of operation. 
#The more lenient the allowable_err the more faster the execution
quants =[0.25, 0.5, 0.75]
allowable_err = 0.05
df_sel.approxQuantile('loan_amnt', quants, allowable_err)

In [None]:
#Analysing null values in the columns
df_sel.select([count(when(isnan(c)|col(c).isNull(), c)).alias(c) for c in df_sel.columns]).show()

In [None]:
df_sel = df_sel.dropna(how='all', subset=['loan_status'])
df_sel.select([count(when(isnan(c)|col(c).isNull(), c)).alias(c) for c in df_sel.columns]).show()

In [None]:
df_sel = df_sel.withColumn('revol_util_cleaned', regexp_extract(col('revol_util'), '\\d+', 0))
df_sel.select(['revol_util','revol_util_cleaned']).show(7)

In [None]:
def fill_avg(df, column_name):
  return df.select(column_name).agg(avg(column_name))

In [None]:
rev_avg = fill_avg(df_sel, 'revol_util_cleaned').first()[0]
df_sel = df_sel.withColumn('revol_util_avg', lit(rev_avg))
df_sel.show(5)

In [None]:
#Replace the Null value in the revol util column with the average value
df_sel = df_sel.withColumn('revol_util_cleaned', coalesce(col('revol_util_cleaned'), col('revol_util_avg')))
df_sel = df_sel.withColumn('revol_util_cleaned', col('revol_util_cleaned').cast('double'))
df_sel.describe('revol_util_cleaned').show()

In [None]:
spark.sql('''
          select application_type, dti, dti_joint from loan_status_intermediate where dti is null
          ''').show(10)

In [None]:
#replace the null values from dti column with the values from dti_joint along that row.
df_sel= df_sel.withColumn('dti_cleaned', coalesce(col('dti'), col('dti_joint')))

In [None]:
distinct_ids = [x.loan_status for x in df_sel.select('loan_status').distinct().collect()]
distinct_ids

In [None]:
#Converting multi label data into final by transforming different types of loan into two categories as Yes and No. Yes means bad loan and No mean good loan.
df_sel = df_sel.withColumn('bad_loan', when(df_sel.loan_status.isin(['Fully Paid', 'Current']), 'No').otherwise('Yes'))
df_sel.groupBy('bad_loan').count().show()

In [None]:
df_sel.drop('dti_joint', 'revol_util', 'dti')
df_sel.printSchema()

In [None]:
df_sel.stat.crosstab('bad_loan', 'grade').show()
#inference - grade A is very good loan and the goodness decreases along B, C, D....Reason - The Yes and No ratio is seen to increase from A to G.

In [None]:
permanent_table = 'processes_loan'
df_sel.write.format('parquet').saveAsTable(permanent_table)

In [None]:
spark.sql('''
          SELECT * from processes_loan
          ''').show()

In [None]:
# Create a view or table

temp_table_name = "LoanStats_2018Q4_csv"

df.createOrReplaceTempView(temp_table_name)