In [1]:
# all imports at one place here
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

# Loading the Data from url

In [2]:
file_loc = "<data-url>"
file_type = "csv"

infer_schema = "true"
first_row_header = "true"
delimiter = ","

spark = SparkSession.builder.appName('SparkByExamples.com').getOrCreate()

spark_df = spark.read.format(file_type) \
     .option("inferSchema", infer_schema) \
     .option("header", first_row_header) \
     .option("sep", delimiter) \
     .load(file_loc)


# Exploratory Analysis

In [3]:
spark_df.printSchema()

root
 |-- disbursed_amount: integer (nullable = true)
 |-- asset_cost: integer (nullable = true)
 |-- ltv: double (nullable = true)
 |-- branch_id: integer (nullable = true)
 |-- Date.of.Birth: string (nullable = true)
 |-- Employment.Type: string (nullable = true)
 |-- DisbursalDate: string (nullable = true)
 |-- MobileNo_Avl_Flag: integer (nullable = true)
 |-- Aadhar_flag: integer (nullable = true)
 |-- PAN_flag: integer (nullable = true)
 |-- VoterID_flag: integer (nullable = true)
 |-- Driving_flag: integer (nullable = true)
 |-- Passport_flag: integer (nullable = true)
 |-- PERFORM_CNS.SCORE: integer (nullable = true)
 |-- DELINQUENT.ACCTS.IN.LAST.SIX.MONTHS: integer (nullable = true)
 |-- CREDIT.HISTORY.LENGTH: string (nullable = true)
 |-- NO.OF_INQUIRIES: integer (nullable = true)
 |-- loan_default: integer (nullable = true)



# Cleansing the columns, refactoring the columns which have dot

In [4]:
# replace dots in all the columns with _
spark_df1 = spark_df.withColumnRenamed("Date.of.Birth","Date_Of_Birth") \
.withColumnRenamed("Employment.Type","Employment_Type")  \
.withColumnRenamed("PERFORM_CNS.SCORE","PERFORM_CNS_CORE") \
.withColumnRenamed("DELINQUENT.ACCTS.IN.LAST.SIX.MONTHS", "DELINQUENT_ACCTS_IN_LAST_SIX_MONTHS") \
.withColumnRenamed("CREDIT.HISTORY.LENGTH", "CREDIT_HISTORY_LENGTH") \
.withColumnRenamed("NO.OF_INQUIRIES", "NO_OF_INQUIRIES") 

spark_df1.printSchema()

spark_df = spark_df1


root
 |-- disbursed_amount: integer (nullable = true)
 |-- asset_cost: integer (nullable = true)
 |-- ltv: double (nullable = true)
 |-- branch_id: integer (nullable = true)
 |-- Date_Of_Birth: string (nullable = true)
 |-- Employment_Type: string (nullable = true)
 |-- DisbursalDate: string (nullable = true)
 |-- MobileNo_Avl_Flag: integer (nullable = true)
 |-- Aadhar_flag: integer (nullable = true)
 |-- PAN_flag: integer (nullable = true)
 |-- VoterID_flag: integer (nullable = true)
 |-- Driving_flag: integer (nullable = true)
 |-- Passport_flag: integer (nullable = true)
 |-- PERFORM_CNS_CORE: integer (nullable = true)
 |-- DELINQUENT_ACCTS_IN_LAST_SIX_MONTHS: integer (nullable = true)
 |-- CREDIT_HISTORY_LENGTH: string (nullable = true)
 |-- NO_OF_INQUIRIES: integer (nullable = true)
 |-- loan_default: integer (nullable = true)



In [5]:
display(spark_df)

DataFrame[disbursed_amount: int, asset_cost: int, ltv: double, branch_id: int, Date_Of_Birth: string, Employment_Type: string, DisbursalDate: string, MobileNo_Avl_Flag: int, Aadhar_flag: int, PAN_flag: int, VoterID_flag: int, Driving_flag: int, Passport_flag: int, PERFORM_CNS_CORE: int, DELINQUENT_ACCTS_IN_LAST_SIX_MONTHS: int, CREDIT_HISTORY_LENGTH: string, NO_OF_INQUIRIES: int, loan_default: int]

In [6]:
spark_df.count()

23315

# Checking for Duplicates

In [7]:
df_duplicates = spark_df.groupBy(spark_df.columns).count().filter("count > 1")
df_duplicates.show()

+----------------+----------+---+---------+-------------+---------------+-------------+-----------------+-----------+--------+------------+------------+-------------+----------------+-----------------------------------+---------------------+---------------+------------+-----+
|disbursed_amount|asset_cost|ltv|branch_id|Date_Of_Birth|Employment_Type|DisbursalDate|MobileNo_Avl_Flag|Aadhar_flag|PAN_flag|VoterID_flag|Driving_flag|Passport_flag|PERFORM_CNS_CORE|DELINQUENT_ACCTS_IN_LAST_SIX_MONTHS|CREDIT_HISTORY_LENGTH|NO_OF_INQUIRIES|loan_default|count|
+----------------+----------+---+---------+-------------+---------------+-------------+-----------------+-----------+--------+------------+------------+-------------+----------------+-----------------------------------+---------------------+---------------+------------+-----+
+----------------+----------+---+---------+-------------+---------------+-------------+-----------------+-----------+--------+------------+------------+-------------+---

#### There are no duplicates as we can see the dataframe returned is empty

# Checking for Nulls

In [8]:
# checking null/na in all the columns
nulls_df = {col:spark_df.filter(spark_df[col].isNull()).count() for col in spark_df.columns}
nulls_df

{'disbursed_amount': 0,
 'asset_cost': 0,
 'ltv': 0,
 'branch_id': 0,
 'Date_Of_Birth': 0,
 'Employment_Type': 770,
 'DisbursalDate': 0,
 'MobileNo_Avl_Flag': 0,
 'Aadhar_flag': 0,
 'PAN_flag': 0,
 'VoterID_flag': 0,
 'Driving_flag': 0,
 'Passport_flag': 0,
 'PERFORM_CNS_CORE': 0,
 'DELINQUENT_ACCTS_IN_LAST_SIX_MONTHS': 0,
 'CREDIT_HISTORY_LENGTH': 0,
 'NO_OF_INQUIRIES': 0,
 'loan_default': 0}

#### We can observe the Employment_Type column has 770 nulls, we will now treat the null values

## Treating the null values

In [9]:
spark_df.groupBy('Employment_Type').count().show()

+---------------+-----+
|Employment_Type|count|
+---------------+-----+
|           null|  770|
|  Self employed|12724|
|       Salaried| 9821|
+---------------+-----+



#### Self employed is having high count compared to others so we will replace the null with Self employed

In [10]:
spark_df = spark_df.fillna("Self employed",["Employment_Type"])

In [11]:
#Validation
spark_df.groupBy('Employment_Type').count().show()

+---------------+-----+
|Employment_Type|count|
+---------------+-----+
|  Self employed|13494|
|       Salaried| 9821|
+---------------+-----+



# Cleansing the Date Of Birth

In [12]:
spark_df.select("Date_Of_Birth").show()

+-------------+
|Date_Of_Birth|
+-------------+
|     14-06-90|
|     1/1/1991|
|     16-08-93|
|     1/1/1989|
|     31-12-74|
|     23-11-64|
|    1/10/1989|
|     1/1/1995|
|     15-06-94|
|     23-11-82|
|     1/1/1984|
|     15-02-90|
|    5/12/1975|
|    11/9/1995|
|     29-10-75|
|     28-02-84|
|    2/11/1975|
|     21-10-90|
|   11/10/1994|
|     5/6/1987|
+-------------+
only showing top 20 rows



#### The date of birth has two formats of dates the dd-MM-yy is not giving proper date with defualt to_date() function we we will be using the spark.sql.legacy.timeParserPolicy to handle that dd-MM-yy format 

#### Note: Tried with spark.sql.legacy.timeParserPolicy but it was not working, so finally applied regex on the date for cleanup

In [13]:
new_df = spark_df.withColumn(
    'Date_Of_Birth',
    when(
        spark_df["Date_Of_Birth"].rlike(r'^.*-[^/]+$') & spark_df["Date_Of_Birth"].rlike(r'^.*-\d{2}$') & (spark_df["Date_Of_Birth"].substr(-2, 2) != '00'),
        regexp_replace(spark_df["Date_Of_Birth"], r'-(\d{2})$', '/19$1')
    ) 
    .when(
        spark_df["Date_Of_Birth"].rlike(r'^.*-[^/]+$') & spark_df["Date_Of_Birth"].rlike(r'^.*-\d{2}$') & (spark_df["Date_Of_Birth"].substr(-2, 2) == '00'),
        regexp_replace(spark_df["Date_Of_Birth"], r'-(\d{2})$', '/20$1')
    )
    .otherwise(spark_df["Date_Of_Birth"])    
)

In [14]:
new_df = new_df.withColumn('Date_Of_Birth',regexp_replace(new_df["Date_Of_Birth"], '-', '/'))

In [15]:
new_df = new_df.withColumn(
    'Date_Of_Birth',
    when(
        new_df["Date_Of_Birth"].contains('/'),
        when(
            split(new_df["Date_Of_Birth"], '/')[0].cast("integer").between(1, 9),
            concat(
                lpad(split(new_df["Date_Of_Birth"], '/')[0].cast("integer"), 2, '0'),
                lit('/'),
                split(new_df["Date_Of_Birth"], '/')[1],
                lit('/'),
                split(new_df["Date_Of_Birth"], '/')[2]
            )
        )
        .otherwise(new_df["Date_Of_Birth"])
    )
    .otherwise(new_df["Date_Of_Birth"])
)


In [16]:
new_df.select("Date_Of_Birth").show()

+-------------+
|Date_Of_Birth|
+-------------+
|   14/06/1990|
|    01/1/1991|
|   16/08/1993|
|    01/1/1989|
|   31/12/1974|
|   23/11/1964|
|   01/10/1989|
|    01/1/1995|
|   15/06/1994|
|   23/11/1982|
|    01/1/1984|
|   15/02/1990|
|   05/12/1975|
|    11/9/1995|
|   29/10/1975|
|   28/02/1984|
|   02/11/1975|
|   21/10/1990|
|   11/10/1994|
|    05/6/1987|
+-------------+
only showing top 20 rows



#### Similary claning up the DisbursalDate

In [17]:
new_df = spark_df.withColumn(
    'DisbursalDate',when(
        spark_df["DisbursalDate"].rlike(r'^.*-[^/]+$') & spark_df["DisbursalDate"].rlike(r'^.*-\d{2}$'),
        regexp_replace(spark_df["DisbursalDate"], r'-(\d{2})$', '/20$1')
    )
    .otherwise(spark_df["DisbursalDate"])    
)

new_df = new_df.withColumn('DisbursalDate',regexp_replace(new_df["DisbursalDate"], '-', '/'))

#spark_df1.withColumn("Date_Of_Birth", to_date("Date_Of_Birth", "dd-MM-yy"))park_df1.withColumn("Date_Of_Birth", to_date("Date_Of_Birth", "dd-MM-yy"))

In [18]:
new_df.select('DisbursalDate').show()

+-------------+
|DisbursalDate|
+-------------+
|   28/09/2018|
|    10/9/2018|
|   31/08/2018|
|   13/10/2018|
|   14/09/2018|
|   17/08/2018|
|   16/08/2018|
|   26/08/2018|
|   15/10/2018|
|   26/10/2018|
|     3/9/2018|
|   24/08/2018|
|   28/08/2018|
|   21/09/2018|
|   20/08/2018|
|   26/10/2018|
|   30/09/2018|
|   30/09/2018|
|   31/10/2018|
|   25/10/2018|
+-------------+
only showing top 20 rows



In [19]:
#new_df = new_df.withColumn("Date_Of_Birth", to_date("Date_Of_Birth", "d/M/y"))
# new_df = new_df.withColumn("DisbursalDate", to_date("DisbursalDate", "dd/MM/yyyy"))

In [20]:
spark.conf.get("spark.sql.legacy.timeParserPolicy")

'EXCEPTION'

In [21]:
#new_df.show()

# Converting the CREDIT_HISTORY_LENGTH to months (2yrs 6mon) will become 30 months

In [22]:
def convert_months(col):
    split_col = split(col, " ")
    years = split_col.getItem(0).cast("int")
    months = split_col.getItem(1).cast("int")
    total_months = years * 12 + months
    return total_months

new_df = new_df.withColumn("CREDIT_HISTORY_LENGTH_CLN", regexp_replace("CREDIT_HISTORY_LENGTH", "yrs", ""))
new_df = new_df.withColumn("CREDIT_HISTORY_LENGTH_CLN", regexp_replace("CREDIT_HISTORY_LENGTH_CLN", "mon", ""))

new_df = new_df.withColumn("CREDIT_HISTORY_LENGTH_CLN",convert_months("CREDIT_HISTORY_LENGTH_CLN"))

new_df = new_df.drop("CREDIT_HISTORY_LENGTH")
#spark_df.withColumnRenamed("CREDIT_HISTORY_LENGTH_CLN","CREDIT_HISTORY_LENGTH")



In [23]:
#validating the CREDIT_HISTORY_LENGTH_CLN

# spark_df.createOrReplaceTempView(temp_loans_table)
# spark_df.select("*").show()
# spark_df.select([col for col in spark_df.columns]).show()

#new_df.select("CREDIT_HISTORY_LENGTH_CLN").show()

In [24]:
new_df.write.mode('overwrite').parquet("<dest-url>")