In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import current_timestamp,current_date,when,col

In [2]:
spark=SparkSession.builder.config("'spark.shuffle.useOldFetchProtocol', 'true'")\
.enableHiveSupport()\
.appName("Project_Pyspark")\
.master('yarn')\
.getOrCreate()

In [3]:
spark

In [172]:
loans_repay_raw_df=spark.read.format("csv").option("header",True).option("inferSchema",True)\
.load("/user/itv012010/lendingclubproject/raw/loans_repayment_csv")

In [173]:
loans_repay_raw_df.show(2)

+--------+---------------+-------------+------------------+-----------+---------------+------------+------------+
| loan_id|total_rec_prncp|total_rec_int|total_rec_late_fee|total_pymnt|last_pymnt_amnt|last_pymnt_d|next_pymnt_d|
+--------+---------------+-------------+------------------+-----------+---------------+------------+------------+
|69642551|        3246.03|       990.53|               0.0|    4965.02|         326.35|    Feb-2017|        null|
|69336966|        12000.0|       1080.5|               0.0|    13080.5|        9501.81|    Nov-2016|        null|
+--------+---------------+-------------+------------------+-----------+---------------+------------+------------+
only showing top 2 rows



In [174]:
loans_repay_raw_df.printSchema() #we can see all are string we need to define schema

root
 |-- loan_id: string (nullable = true)
 |-- total_rec_prncp: string (nullable = true)
 |-- total_rec_int: string (nullable = true)
 |-- total_rec_late_fee: string (nullable = true)
 |-- total_pymnt: string (nullable = true)
 |-- last_pymnt_amnt: string (nullable = true)
 |-- last_pymnt_d: string (nullable = true)
 |-- next_pymnt_d: string (nullable = true)



In [175]:
loans_repay_schema='loan_id string,total_principal_received float,total_interest_received \
float,total_late_fee_received float,total_payment_received \
float,last_payment_amount float,last_payment_date string,next_payment_date \
string '

In [176]:
loans_repay_raw_df=spark.read.format("csv").option("header",True).schema(loans_repay_schema)\
.load("/user/itv012010/lendingclubproject/raw/loans_repayment_csv")

In [177]:
loans_repay_raw_df.printSchema() #we can see names & datatypes are changed

root
 |-- loan_id: string (nullable = true)
 |-- total_principal_received: float (nullable = true)
 |-- total_interest_received: float (nullable = true)
 |-- total_late_fee_received: float (nullable = true)
 |-- total_payment_received: float (nullable = true)
 |-- last_payment_amount: float (nullable = true)
 |-- last_payment_date: string (nullable = true)
 |-- next_payment_date: string (nullable = true)



In [178]:
loans_repay_ingest_date=loans_repay_raw_df.withColumn("ingest_date",current_timestamp()) #to generate new column

In [179]:
loans_repay_ingest_date.count() #total count 

2260701

In [181]:
loans_repay_ingest_date.createOrReplaceTempView('loans_repayments')

In [182]:
spark.sql("select count(*) from loans_repayments \
          where total_principal_received is null") #to check nulls in principal amount received

count(1)
69


In [183]:
columns_to_check =["total_principal_received","total_interest_received","total_late_fee_received","\
total_payment_received","last_payment_amount"]

In [184]:
loans_repay_filtered_df=loans_repay_ingest_date.na.drop(subset=columns_to_check)

In [185]:
loans_repay_filtered_df.count() #count after droping columns with NA

2260498

In [188]:
loans_repay_filtered_df.createOrReplaceTempView('loans_repayments')

In [189]:
spark.sql("select * from loans_repayments where total_payment_received =0.0 and total_principal_received !=0")
#total_payment_received = total_principal_received + total_interest_received +  total_late_fee_received

#we can see there are records where total_payment_received is zero but total_principal_received,total_interest_received,total_late_fee_received
#are not

loan_id,total_principal_received,total_interest_received,total_late_fee_received,total_payment_received,last_payment_amount,last_payment_date,next_payment_date,ingest_date
1064185,11600.98,11600.98,10000.0,0.0,0.0,0.0,Dec-2014,2024-03-30 09:52:...
516382,21890.229,21856.03,16000.0,0.0,0.0,0.0,Mar-2014,2024-03-30 09:52:...
528899,3045.0364,3019.64,2500.0,0.0,0.0,0.0,Jan-2013,2024-03-30 09:52:...
527598,2398.9092,2220.51,2200.0,0.0,0.0,0.0,Jul-2011,2024-03-30 09:52:...
525697,21797.86,19894.9,15750.0,0.0,0.0,0.0,Jun-2015,2024-03-30 09:52:...
522641,3146.8193,3146.82,3000.0,0.0,0.0,0.0,Sep-2011,2024-03-30 09:52:...
515655,29938.576,29905.75,22800.0,0.0,0.0,0.0,May-2013,2024-03-30 09:52:...
501234,15219.313,15155.9,12000.0,0.0,0.0,0.0,May-2013,2024-03-30 09:52:...
498194,11642.714,11031.47,10000.0,0.0,0.0,0.0,Jan-2013,2024-03-30 09:52:...
495171,11138.843,10024.96,10000.0,0.0,0.0,0.0,Apr-2013,2024-03-30 09:52:...


In [190]:
loans_payments_fixed=loans_repay_filtered_df.withColumn( 
    "total_payment_received", 
    when( 
        (col("total_principal_received")!=0.0) & 
        (col("total_payment_received")==0.0), 
        col("total_principal_received")+col("total_interest_received")+col("total_late_fee_received")).otherwise(col("total_payment_received"))
) #this checks if there is a condition like above them add all three columns else keep as it is

In [194]:
loans_payments_fixed.filter("loan_id = '1064185'") #we can see here now the data is changed total_payment_received is not zero now

loan_id,total_principal_received,total_interest_received,total_late_fee_received,total_payment_received,last_payment_amount,last_payment_date,next_payment_date,ingest_date
1064185,11600.98,11600.98,10000.0,33201.96,0.0,0.0,Dec-2014,2024-03-30 09:53:...


In [195]:
#now we need to remove rows where total_payment_received is zero 

In [196]:
loans_payment_fixed2=loans_payments_fixed.filter("total_payment_received!='0.0'")

In [197]:
loans_payment_fixed2#this has data where payments has been cleaned as per above logics

loan_id,total_principal_received,total_interest_received,total_late_fee_received,total_payment_received,last_payment_amount,last_payment_date,next_payment_date,ingest_date
69642551,3246.03,990.53,0.0,4965.02,326.35,Feb-2017,,2024-03-30 09:53:...
69336966,12000.0,1080.5,0.0,13080.5,9501.81,Nov-2016,,2024-03-30 09:53:...
69226186,28000.0,3049.36,0.0,31049.361,9306.81,Mar-2018,,2024-03-30 09:53:...
69084024,16500.0,900.32,0.0,17400.32,14085.82,Aug-2016,,2024-03-30 09:53:...
69422635,21000.0,4452.42,0.0,25452.418,12705.78,Jun-2018,,2024-03-30 09:53:...
69542436,1220.15,1252.23,0.0,4533.19,497.94,Jul-2016,,2024-03-30 09:53:...
69492501,29000.0,5405.32,47.8,34453.12,771.08,Mar-2019,Apr-2019,2024-03-30 09:53:...
69522527,33000.0,2092.69,0.0,35092.69,65.34,Jul-2016,,2024-03-30 09:53:...
68781653,18000.0,4167.34,0.0,22167.344,13021.32,Jan-2018,,2024-03-30 09:53:...
69371334,16000.0,1140.7,0.0,17140.701,13910.33,Dec-2016,,2024-03-30 09:53:...


In [198]:
#we can see last_payment_amount & next_payment_date is zero in many columns we need to replace them with nulls 

In [201]:
loans_payment_fixed2.filter("last_payment_date=0.0").count()

48

In [202]:
loans_payment_fixed2.filter("next_payment_date=0.0").count()

24

In [140]:
#need to replace above with nulls bcz date cannot be zero

In [203]:
loans_paymentl_date_fixed=loans_payment_fixed2.withColumn(
    "last_payment_date", 
    when(
     (col("last_payment_date")==0.0),
     None
     ).otherwise(col("last_payment_date"))
)

In [204]:
loans_paymentn_date_fixed=loans_paymentl_date_fixed.withColumn(
    "next_payment_date", 
    when(
     (col("next_payment_date")==0.0),
     None
     ).otherwise(col("next_payment_date"))
)

In [205]:
loans_paymentn_date_fixed.filter("next_payment_date = 0.0").count()

0

In [159]:
loans_paymentn_date_fixed.filter("last_payment_date = 0.0").count() #we can see now there are no last_payment_date & next_payment_date with zeros

0

In [206]:
loans_paymentn_date_fixed.write.mode("overwrite").option("header",True).format("csv")\
.option("path","/user/itv012010/lendingclubproject/cleaned/loans_repayment_csv").save() #saving as csv

In [207]:
loans_paymentn_date_fixed.write.mode("overwrite").option("header",True).format("parquet")\
.option("path","/user/itv012010/lendingclubproject/cleaned/loans_repayment_parquet").save() #saving as parquet

In [209]:
loans_paymentn_date_fixed.rdd.getNumPartitions() #there would be two files created 

2