In [0]:
loan_repayment_raw=spark.read\
    .format("csv")\
    .option("header","true")\
    .option("inferSchema","true")\
    .load("dbfs:/user/lendingclub_rawfiles/temp/loans_repayments_csv")

In [0]:
loan_repayment_raw.show()

+--------+---------------+-------------+------------------+------------------+---------------+------------+------------+
| loan_id|total_rec_prncp|total_rec_int|total_rec_late_fee|       total_pymnt|last_pymnt_amnt|last_pymnt_d|next_pymnt_d|
+--------+---------------+-------------+------------------+------------------+---------------+------------+------------+
|68407277|         3600.0|       821.72|               0.0| 4421.723916800001|         122.67|    Jan-2019|        null|
|68355089|        24700.0|       979.66|               0.0|          25679.66|         926.35|    Jun-2016|        null|
|68341763|        20000.0|      2705.92|               0.0|22705.924293878397|        15813.3|    Jun-2017|        null|
|66310712|       19102.35|     12361.66|               0.0|          31464.01|          829.9|    Feb-2019|    Apr-2019|
|68476807|        10400.0|       1340.5|               0.0|           11740.5|       10128.96|    Jul-2016|        null|
|68426831|        11950.0|      

##1. Changing Datatypes and column name

In [0]:
loan_repayment_schema='loan_id string,total_principal_received float,total_interest_recieved float,total_late_fee_received float,total_payment_received float,last_payment_amount float,last_payment_date string,next_payment_date string'

In [0]:
loan_repayment_raw=spark.read\
    .format("csv")\
    .option("header","true")\
    .schema(loan_repayment_schema)\
    .load("dbfs:/user/lendingclub_rawfiles/temp/loans_repayments_csv")

In [0]:
  loan_repayment_raw.printSchema()

root
 |-- loan_id: string (nullable = true)
 |-- total_principal_received: float (nullable = true)
 |-- total_interest_recieved: float (nullable = true)
 |-- total_late_fee_received: float (nullable = true)
 |-- total_payment_received: float (nullable = true)
 |-- last_payment_amount: float (nullable = true)
 |-- last_payment_date: string (nullable = true)
 |-- next_payment_date: string (nullable = true)



##2. Ingesting current timestamp

In [0]:
from pyspark.sql.functions import current_timestamp

loan_repay_ingestd=loan_repayment_raw.withColumn("ingest_date",current_timestamp())

In [0]:
loan_repay_ingestd.display()

loan_id,total_principal_received,total_interest_recieved,total_late_fee_received,total_payment_received,last_payment_amount,last_payment_date,next_payment_date,ingest_date
68407277,3600.0,821.72,0.0,4421.724,122.67,Jan-2019,,2025-04-04T22:36:59.209+0000
68355089,24700.0,979.66,0.0,25679.66,926.35,Jun-2016,,2025-04-04T22:36:59.209+0000
68341763,20000.0,2705.92,0.0,22705.924,15813.3,Jun-2017,,2025-04-04T22:36:59.209+0000
66310712,19102.35,12361.66,0.0,31464.01,829.9,Feb-2019,Apr-2019,2025-04-04T22:36:59.209+0000
68476807,10400.0,1340.5,0.0,11740.5,10128.96,Jul-2016,,2025-04-04T22:36:59.209+0000
68426831,11950.0,1758.95,0.0,13708.948,7653.56,May-2017,,2025-04-04T22:36:59.209+0000
68476668,20000.0,1393.8,0.0,21393.8,15681.05,Nov-2016,,2025-04-04T22:36:59.209+0000
67275481,20000.0,1538.51,0.0,21538.51,14618.23,Jan-2017,,2025-04-04T22:36:59.209+0000
68466926,10000.0,998.97,0.0,10998.972,1814.48,Aug-2018,,2025-04-04T22:36:59.209+0000
68616873,8000.0,939.58,0.0,8939.58,4996.24,Apr-2017,,2025-04-04T22:36:59.209+0000


In [0]:
loan_repay_ingestd.createOrReplaceTempView("loan_repayments")

In [0]:
%sql
select count(*) from loan_repayments

count(1)
100


In [0]:
loan_repay_ingestd.printSchema()

root
 |-- loan_id: string (nullable = true)
 |-- total_principal_received: float (nullable = true)
 |-- total_interest_recieved: float (nullable = true)
 |-- total_late_fee_received: float (nullable = true)
 |-- total_payment_received: float (nullable = true)
 |-- last_payment_amount: float (nullable = true)
 |-- last_payment_date: string (nullable = true)
 |-- next_payment_date: string (nullable = true)
 |-- ingest_date: timestamp (nullable = false)



##2. Removing Nulls


In [0]:
%sql
select count(*) from loan_repayments where total_payment_received
 is null

count(1)
0


In [0]:
loan_repay_ingestd.count()

Out[12]: 100

In [0]:
coloumns_to_check=["total_principal_received","total_interest_recieved","total_payment_received","last_payment_amount"]

In [0]:
loan_repay_filtered=loan_repay_ingestd.na.drop(subset=coloumns_to_check)

In [0]:
loan_repay_filtered.count()

Out[15]: 100

In [0]:
loan_repay_filtered.createOrReplaceTempView("loan_repayments")

In [0]:
%sql
select * from loan_repayments where total_payment_received=0.0 and total_principal_received !=0.0



loan_id,total_principal_received,total_interest_recieved,total_late_fee_received,total_payment_received,last_payment_amount,last_payment_date,next_payment_date,ingest_date


##5.Fixing Loans total payment recieved coloumn


In [0]:
from pyspark.sql.functions import when,col
loans_payment_fixed=loan_repay_filtered.withColumn(
    "total_payment_received",
    when(
        (col("total_principal_received") !=0.0)&
        (col("total_payment_received")==0.0),
        col("total_principal_received")+col("total_interest_recieved")+
        col("total_late_fee_received")
        ).otherwise(col("total_payment_received"))
)



In [0]:
loans_payment_fixed.display()

loan_id,total_principal_received,total_interest_recieved,total_late_fee_received,total_payment_received,last_payment_amount,last_payment_date,next_payment_date,ingest_date
68407277,3600.0,821.72,0.0,4421.724,122.67,Jan-2019,,2025-04-04T22:38:53.593+0000
68355089,24700.0,979.66,0.0,25679.66,926.35,Jun-2016,,2025-04-04T22:38:53.593+0000
68341763,20000.0,2705.92,0.0,22705.924,15813.3,Jun-2017,,2025-04-04T22:38:53.593+0000
66310712,19102.35,12361.66,0.0,31464.01,829.9,Feb-2019,Apr-2019,2025-04-04T22:38:53.593+0000
68476807,10400.0,1340.5,0.0,11740.5,10128.96,Jul-2016,,2025-04-04T22:38:53.593+0000
68426831,11950.0,1758.95,0.0,13708.948,7653.56,May-2017,,2025-04-04T22:38:53.593+0000
68476668,20000.0,1393.8,0.0,21393.8,15681.05,Nov-2016,,2025-04-04T22:38:53.593+0000
67275481,20000.0,1538.51,0.0,21538.51,14618.23,Jan-2017,,2025-04-04T22:38:53.593+0000
68466926,10000.0,998.97,0.0,10998.972,1814.48,Aug-2018,,2025-04-04T22:38:53.593+0000
68616873,8000.0,939.58,0.0,8939.58,4996.24,Apr-2017,,2025-04-04T22:38:53.593+0000


In [0]:
loans_payment_fixed.filter("loan_id==1064185").display()

loan_id,total_principal_received,total_interest_recieved,total_late_fee_received,total_payment_received,last_payment_amount,last_payment_date,next_payment_date,ingest_date


In [0]:
loans_payment_fixed2=loans_payment_fixed.filter("total_payment_received!=0.0")

In [0]:
loans_payment_fixed2.filter("total_payment_received=0.0").display()

loan_id,total_principal_received,total_interest_recieved,total_late_fee_received,total_payment_received,last_payment_amount,last_payment_date,next_payment_date,ingest_date


In [0]:
loans_payment_fixed2.createOrReplaceTempView("loan_repayments")

In [0]:
%sql
select count(*) from loan_repayments where total_payment_received=0.0 

count(1)
0


##6.Fixing Date Column

In [0]:
loans_payment_ndate_fixed = loans_payment_fixed2.withColumn(
    "last_payment_date",
    when(col("last_payment_date") == 0.0, None).otherwise(col("last_payment_date"))
)


In [0]:
loans_payment_ldate_fixed = loans_payment_ndate_fixed.withColumn(
    "next_payment_date",
    when(col("next_payment_date") == 0.0, None).otherwise(col("next_payment_date"))
)

In [0]:
loans_payment_ldate_fixed.filter("last_payment_date= 0.0").count()

Out[27]: 0

##7.Writing to file

In [0]:
loans_payment_ldate_fixed.write\
    .option("header","true")\
    .format("parquet")\
    .mode("overwrite")\
    .option("path","dbfs:/user/lendingclubproject/newcleaned/loans_repayment_parquet")\
    .save()

In [0]:
loans_payment_ldate_fixed.write\
    .option("header","true")\
    .format("csv")\
    .mode("overwrite")\
    .option("path","dbfs:/user/lendingclubproject/newcleaned/loans_repayment_csv")\
    .save()

In [0]:
%sql
create database pranithdb_lending_club

In [0]:
%sql
CREATE TABLE Pranithdb_lending_club.loans_repayment
USING PARQUET
LOCATION 'dbfs:/user/lendingclubproject/newcleaned/loans_repayment_parquet';


In [0]:
%sql
select * from Pranithdb_lending_club.loans_repayment limit 5

loan_id,total_principal_received,total_interest_recieved,total_late_fee_received,total_payment_received,last_payment_amount,last_payment_date,next_payment_date,ingest_date
68407277,3600.0,821.72,0.0,4421.724,122.67,Jan-2019,,2025-04-04T22:40:28.103+0000
68355089,24700.0,979.66,0.0,25679.66,926.35,Jun-2016,,2025-04-04T22:40:28.103+0000
68341763,20000.0,2705.92,0.0,22705.924,15813.3,Jun-2017,,2025-04-04T22:40:28.103+0000
66310712,19102.35,12361.66,0.0,31464.01,829.9,Feb-2019,Apr-2019,2025-04-04T22:40:28.103+0000
68476807,10400.0,1340.5,0.0,11740.5,10128.96,Jul-2016,,2025-04-04T22:40:28.103+0000
