In [35]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import sha2, concat_ws
from pyspark.sql.functions import *
spark  = SparkSession.builder\
                    .appName("lendingClubProject")\
                    .config("spark.sql.warehouse.dir","/user/anil/warehouse")\
                    .enableHiveSupport()\
                    .master('yarn')\
                    .getOrCreate()

# Adding Proper Schema

In [36]:
loansRepaymentSchema = 'loan_id_ string, total_principal_received float, total_interest_received float, total_late_fee_received float, total_payment_received float, last_payment_amount float, last_payment_date string, next_payment_date string'

In [37]:
loansRepayment = spark.read \
.format("csv") \
.option("header",True) \
.schema(loansRepaymentSchema) \
.load("/user/anil/lendingClubProject/loans_repayments")
loansRepayment

loan_id_,total_principal_received,total_interest_received,total_late_fee_received,total_payment_received,last_payment_amount,last_payment_date,next_payment_date
68407277,3600.0,821.72,0.0,4421.724,122.67,Jan-2019,
68355089,24700.0,979.66,0.0,25679.66,926.35,Jun-2016,
68341763,20000.0,2705.92,0.0,22705.924,15813.3,Jun-2017,
66310712,19102.35,12361.66,0.0,31464.01,829.9,Feb-2019,Apr-2019
68476807,10400.0,1340.5,0.0,11740.5,10128.96,Jul-2016,
68426831,11950.0,1758.95,0.0,13708.948,7653.56,May-2017,
68476668,20000.0,1393.8,0.0,21393.8,15681.05,Nov-2016,
67275481,20000.0,1538.51,0.0,21538.51,14618.23,Jan-2017,
68466926,10000.0,998.97,0.0,10998.972,1814.48,Aug-2018,
68616873,8000.0,939.58,0.0,8939.58,4996.24,Apr-2017,


In [5]:
loansRepayment.printSchema()

root
 |-- loan_id_: string (nullable = true)
 |-- total_principal_received: float (nullable = true)
 |-- total_interest_received: float (nullable = true)
 |-- total_late_fee_received: float (nullable = true)
 |-- total_payment_received: float (nullable = true)
 |-- last_payment_amount: float (nullable = true)
 |-- last_payment_date: string (nullable = true)
 |-- next_payment_date: string (nullable = true)



# Adding Ingested Timestamp

In [6]:
loansRepaymentIngested = loansRepayment.withColumn('IngestedDateTime',current_timestamp())

In [7]:
loansRepaymentIngested

loan_id_,total_principal_received,total_interest_received,total_late_fee_received,total_payment_received,last_payment_amount,last_payment_date,next_payment_date,IngestedDateTime
68407277,3600.0,821.72,0.0,4421.724,122.67,Jan-2019,,2024-11-15 01:51:...
68355089,24700.0,979.66,0.0,25679.66,926.35,Jun-2016,,2024-11-15 01:51:...
68341763,20000.0,2705.92,0.0,22705.924,15813.3,Jun-2017,,2024-11-15 01:51:...
66310712,19102.35,12361.66,0.0,31464.01,829.9,Feb-2019,Apr-2019,2024-11-15 01:51:...
68476807,10400.0,1340.5,0.0,11740.5,10128.96,Jul-2016,,2024-11-15 01:51:...
68426831,11950.0,1758.95,0.0,13708.948,7653.56,May-2017,,2024-11-15 01:51:...
68476668,20000.0,1393.8,0.0,21393.8,15681.05,Nov-2016,,2024-11-15 01:51:...
67275481,20000.0,1538.51,0.0,21538.51,14618.23,Jan-2017,,2024-11-15 01:51:...
68466926,10000.0,998.97,0.0,10998.972,1814.48,Aug-2018,,2024-11-15 01:51:...
68616873,8000.0,939.58,0.0,8939.58,4996.24,Apr-2017,,2024-11-15 01:51:...


In [8]:
loansRepaymentIngested.printSchema()

root
 |-- loan_id_: string (nullable = true)
 |-- total_principal_received: float (nullable = true)
 |-- total_interest_received: float (nullable = true)
 |-- total_late_fee_received: float (nullable = true)
 |-- total_payment_received: float (nullable = true)
 |-- last_payment_amount: float (nullable = true)
 |-- last_payment_date: string (nullable = true)
 |-- next_payment_date: string (nullable = true)
 |-- IngestedDateTime: timestamp (nullable = false)



# Drop records if any columns are null

In [9]:
loansRepaymentIngested.count()

2260701

In [10]:
columnsLookup = ["total_principal_received", "total_interest_received", "total_late_fee_received", "total_payment_received", "last_payment_amount"]

In [11]:
loansRepaymentcleansed = loansRepaymentIngested.na.drop(subset = columnsLookup )

In [12]:
loansRepaymentcleansed.count()

2260498

# total_payment_received should n't be zero when total principle recieved

In [13]:
loansRepaymentcleansed.filter(col('total_payment_received') == 0 ).count()

995

In [14]:
loansRepaymentcleansed.filter((col('total_payment_received') == 0) & (col('total_principal_received') != 0) ).count()

46

In [16]:
loansRepaymentModified = loansRepaymentcleansed.withColumn('total_payment_received', when((col('total_payment_received') == 0 ) & (col('total_principal_received') != 0) ,col('total_principal_received') + col('total_interest_received') + col('total_late_fee_received')).otherwise('total_payment_received'))

In [17]:
loansRepaymentModified.filter((col('total_payment_received') == 0) & (col('total_principal_received') != 0) ).count()

0

In [23]:
loansRepaymentModified.filter(col('last_payment_date') == 0.0 ).count()

48

In [24]:
loansRepaymentModified.filter(col('next_payment_date') == 0.0 ).count()

24

In [25]:
loansRepaymentModified.filter("last_payment_date is null").count()

2426

In [26]:
loansRepaymentModified.filter("next_payment_date is null").count()

1345084

In [27]:
loansPaymentsLdateFixed = loansRepaymentModified.withColumn(
  "last_payment_date",
   when(
       (col("last_payment_date") == 0.0),
       None
       ).otherwise(col("last_payment_date"))
)

In [28]:
loansPaymentsNdateFixed = loansPaymentsLdateFixed.withColumn(
  "last_payment_date",
   when(
       (col("next_payment_date") == 0.0),
       None
       ).otherwise(col("next_payment_date"))
)

In [29]:
loansPaymentsNdateFixed.filter("last_payment_date = 0.0").count()

0

In [30]:
loansPaymentsNdateFixed.filter("next_payment_date = 0.0").count()

24

In [31]:
loansPaymentsNdateFixed.write \
.format("parquet") \
.mode("overwrite") \
.option("path", "/user/anil/lendingClubProject/cleansed/loansRepaymentParquet") \
.save()

In [32]:
loansPaymentsNdateFixed.write \
.format("csv") \
.mode("overwrite") \
.option("path", "/user/anil/lendingClubProject/cleansed/loansRepaymentcsv") \
.save()

In [38]:
spark.stop()