In [45]:
from pyspark.sql import SparkSession
import getpass
username = getpass.getuser()
spark = SparkSession. \
    builder. \
    appName("SparkDemo"). \
    config('spark.ui.port','0'). \
    config('spark.shuffle.useOldFethProtocol','true'). \
    config('spark.sql.warehouse.dir',f'/user/{username}/warehouse'). \
    enableHiveSupport(). \
    master('yarn'). \
    getOrCreate()

In [46]:
loans_repay_schema = """loan_id string,total_principal_received float,total_interest_received float,
total_late_fee_received float,total_payment_received float,last_payment_amount float,
last_payment_date string,next_payment_date string """

In [47]:
loans_repay_raw_df = spark.read \
.format("csv") \
.option("header",True) \
.schema(loans_repay_schema) \
.load("/user/itv009959/lendingclubproject/raw/loans_repayments_csv")

In [48]:
loans_repay_raw_df

loan_id,total_principal_received,total_interest_received,total_late_fee_received,total_payment_received,last_payment_amount,last_payment_date,next_payment_date
6300587,12000.0,1864.87,20.86,13885.727,108.83,Nov-2014,
6311418,7125.0,95.92,0.0,7220.9243,7220.52,Sep-2013,
6321535,24000.0,7418.84,0.0,31418.84,13979.48,Apr-2016,
6300526,24000.0,2296.28,0.0,26296.275,730.17,Aug-2016,
6301469,18000.0,9239.79,0.0,27239.795,452.24,Aug-2018,
6306881,20000.0,2469.32,0.0,22469.322,16744.64,Oct-2014,
6301579,12000.0,2089.07,0.0,14089.07,4189.07,Oct-2015,
6291661,28000.0,12441.42,0.0,40441.42,10660.91,Apr-2017,
6311612,3629.9,847.39,0.0,4478.25,179.13,Sep-2015,
6311704,23600.0,2077.3,0.0,25677.297,21445.23,Apr-2014,


In [49]:
loans_repay_raw_df.printSchema()

root
 |-- loan_id: string (nullable = true)
 |-- total_principal_received: float (nullable = true)
 |-- total_interest_received: float (nullable = true)
 |-- total_late_fee_received: float (nullable = true)
 |-- total_payment_received: float (nullable = true)
 |-- last_payment_amount: float (nullable = true)
 |-- last_payment_date: string (nullable = true)
 |-- next_payment_date: string (nullable = true)



In [50]:
from pyspark.sql.functions import current_timestamp

In [51]:
ingest_repay_ingest_df = loans_repay_raw_df.withColumn("ingest_date",current_timestamp())

In [52]:
ingest_repay_ingest_df

loan_id,total_principal_received,total_interest_received,total_late_fee_received,total_payment_received,last_payment_amount,last_payment_date,next_payment_date,ingest_date
6300587,12000.0,1864.87,20.86,13885.727,108.83,Nov-2014,,2024-03-12 20:56:...
6311418,7125.0,95.92,0.0,7220.9243,7220.52,Sep-2013,,2024-03-12 20:56:...
6321535,24000.0,7418.84,0.0,31418.84,13979.48,Apr-2016,,2024-03-12 20:56:...
6300526,24000.0,2296.28,0.0,26296.275,730.17,Aug-2016,,2024-03-12 20:56:...
6301469,18000.0,9239.79,0.0,27239.795,452.24,Aug-2018,,2024-03-12 20:56:...
6306881,20000.0,2469.32,0.0,22469.322,16744.64,Oct-2014,,2024-03-12 20:56:...
6301579,12000.0,2089.07,0.0,14089.07,4189.07,Oct-2015,,2024-03-12 20:56:...
6291661,28000.0,12441.42,0.0,40441.42,10660.91,Apr-2017,,2024-03-12 20:56:...
6311612,3629.9,847.39,0.0,4478.25,179.13,Sep-2015,,2024-03-12 20:56:...
6311704,23600.0,2077.3,0.0,25677.297,21445.23,Apr-2014,,2024-03-12 20:56:...


In [53]:
ingest_repay_ingest_df.createOrReplaceTempView("loan_repayments")

In [54]:
spark.sql("select count(*) from loan_repayments where total_principal_received is null")

count(1)
69


In [55]:
columns_to_be_checked = ["total_principal_received","total_interest_received","total_late_fee_received","total_payment_received","last_payment_amount"]

In [57]:
loans_repay_filtered_df = ingest_repay_ingest_df.na.drop(subset=columns_to_be_checked)

In [58]:
loans_repay_filtered_df.count()

2260498

In [59]:
loans_repay_filtered_df

loan_id,total_principal_received,total_interest_received,total_late_fee_received,total_payment_received,last_payment_amount,last_payment_date,next_payment_date,ingest_date
6300587,12000.0,1864.87,20.86,13885.727,108.83,Nov-2014,,2024-03-12 20:56:...
6311418,7125.0,95.92,0.0,7220.9243,7220.52,Sep-2013,,2024-03-12 20:56:...
6321535,24000.0,7418.84,0.0,31418.84,13979.48,Apr-2016,,2024-03-12 20:56:...
6300526,24000.0,2296.28,0.0,26296.275,730.17,Aug-2016,,2024-03-12 20:56:...
6301469,18000.0,9239.79,0.0,27239.795,452.24,Aug-2018,,2024-03-12 20:56:...
6306881,20000.0,2469.32,0.0,22469.322,16744.64,Oct-2014,,2024-03-12 20:56:...
6301579,12000.0,2089.07,0.0,14089.07,4189.07,Oct-2015,,2024-03-12 20:56:...
6291661,28000.0,12441.42,0.0,40441.42,10660.91,Apr-2017,,2024-03-12 20:56:...
6311612,3629.9,847.39,0.0,4478.25,179.13,Sep-2015,,2024-03-12 20:56:...
6311704,23600.0,2077.3,0.0,25677.297,21445.23,Apr-2014,,2024-03-12 20:56:...


In [60]:
loans_repay_filtered_df.createOrReplaceTempView("loans")

In [61]:
spark.sql("select count(*) from loans where total_payment_received=0.0 and total_principal_received > 0.0")

count(1)
46


In [63]:
from pyspark.sql.functions import when, col

In [67]:
loans_payments_fixed_df = loans_repay_filtered_df.withColumn(
                                    "total_payment_received",
                                   when(
                                       (col("total_payment_received")==0.0) &
                                        (col("total_principal_received")!=0.0),
                                   col("total_principal_received")+col("total_interest_received")+col("total_late_fee_received"))
.otherwise(col("total_payment_received")))

In [68]:
loans_payments_fixed_df

loan_id,total_principal_received,total_interest_received,total_late_fee_received,total_payment_received,last_payment_amount,last_payment_date,next_payment_date,ingest_date
6300587,12000.0,1864.87,20.86,13885.727,108.83,Nov-2014,,2024-03-12 21:02:...
6311418,7125.0,95.92,0.0,7220.9243,7220.52,Sep-2013,,2024-03-12 21:02:...
6321535,24000.0,7418.84,0.0,31418.84,13979.48,Apr-2016,,2024-03-12 21:02:...
6300526,24000.0,2296.28,0.0,26296.275,730.17,Aug-2016,,2024-03-12 21:02:...
6301469,18000.0,9239.79,0.0,27239.795,452.24,Aug-2018,,2024-03-12 21:02:...
6306881,20000.0,2469.32,0.0,22469.322,16744.64,Oct-2014,,2024-03-12 21:02:...
6301579,12000.0,2089.07,0.0,14089.07,4189.07,Oct-2015,,2024-03-12 21:02:...
6291661,28000.0,12441.42,0.0,40441.42,10660.91,Apr-2017,,2024-03-12 21:02:...
6311612,3629.9,847.39,0.0,4478.25,179.13,Sep-2015,,2024-03-12 21:02:...
6311704,23600.0,2077.3,0.0,25677.297,21445.23,Apr-2014,,2024-03-12 21:02:...


In [69]:
loans_payments_fixed_df.createOrReplaceTempView("loans")

In [70]:
spark.sql("select count(*) from loans where total_payment_received=0.0 and total_principal_received > 0.0")

count(1)
0


In [72]:
loans_payments_fixed_df.filter("total_payment_received =0.0").count()

949

In [74]:
loans_payments_fixed_df2 = loans_payments_fixed_df.filter("total_payment_received !=0.0")

In [76]:
loans_payments_fixed_df2.filter("total_payment_received =0.0").count()

0

In [78]:
loan_payment_ldate_fdf = loans_payments_fixed_df2.withColumn("last_payment_date",when(
                                   (col("last_payment_date")==0.0),None
                                        ).otherwise(col("last_payment_date"))
                                   )

In [79]:
loan_payment_ldate_fdf

loan_id,total_principal_received,total_interest_received,total_late_fee_received,total_payment_received,last_payment_amount,last_payment_date,next_payment_date,ingest_date
6300587,12000.0,1864.87,20.86,13885.727,108.83,Nov-2014,,2024-03-12 21:17:...
6311418,7125.0,95.92,0.0,7220.9243,7220.52,Sep-2013,,2024-03-12 21:17:...
6321535,24000.0,7418.84,0.0,31418.84,13979.48,Apr-2016,,2024-03-12 21:17:...
6300526,24000.0,2296.28,0.0,26296.275,730.17,Aug-2016,,2024-03-12 21:17:...
6301469,18000.0,9239.79,0.0,27239.795,452.24,Aug-2018,,2024-03-12 21:17:...
6306881,20000.0,2469.32,0.0,22469.322,16744.64,Oct-2014,,2024-03-12 21:17:...
6301579,12000.0,2089.07,0.0,14089.07,4189.07,Oct-2015,,2024-03-12 21:17:...
6291661,28000.0,12441.42,0.0,40441.42,10660.91,Apr-2017,,2024-03-12 21:17:...
6311612,3629.9,847.39,0.0,4478.25,179.13,Sep-2015,,2024-03-12 21:17:...
6311704,23600.0,2077.3,0.0,25677.297,21445.23,Apr-2014,,2024-03-12 21:17:...


In [80]:
loan_payment_nextandlastdate_fixed_df = loan_payment_ldate_fdf.withColumn("next_payment_date",when(
                                   (col("next_payment_date")==0.0),None
                                        ).otherwise(col("next_payment_date"))
                                   )

In [81]:
loan_payment_nextandlastdate_fixed_df

loan_id,total_principal_received,total_interest_received,total_late_fee_received,total_payment_received,last_payment_amount,last_payment_date,next_payment_date,ingest_date
6300587,12000.0,1864.87,20.86,13885.727,108.83,Nov-2014,,2024-03-12 21:20:...
6311418,7125.0,95.92,0.0,7220.9243,7220.52,Sep-2013,,2024-03-12 21:20:...
6321535,24000.0,7418.84,0.0,31418.84,13979.48,Apr-2016,,2024-03-12 21:20:...
6300526,24000.0,2296.28,0.0,26296.275,730.17,Aug-2016,,2024-03-12 21:20:...
6301469,18000.0,9239.79,0.0,27239.795,452.24,Aug-2018,,2024-03-12 21:20:...
6306881,20000.0,2469.32,0.0,22469.322,16744.64,Oct-2014,,2024-03-12 21:20:...
6301579,12000.0,2089.07,0.0,14089.07,4189.07,Oct-2015,,2024-03-12 21:20:...
6291661,28000.0,12441.42,0.0,40441.42,10660.91,Apr-2017,,2024-03-12 21:20:...
6311612,3629.9,847.39,0.0,4478.25,179.13,Sep-2015,,2024-03-12 21:20:...
6311704,23600.0,2077.3,0.0,25677.297,21445.23,Apr-2014,,2024-03-12 21:20:...


In [83]:
loan_payment_nextandlastdate_fixed_df.write \
.format("csv") \
.option("header",True) \
.mode('overwrite') \
.option("path","/user/itv009959/lendingclubproject/cleaned/loans_repayment_csv") \
.save()

In [84]:
loan_payment_nextandlastdate_fixed_df.write \
.format("parquet") \
.mode('overwrite') \
.option("path","/user/itv009959/lendingclubproject/cleaned/loans_repayment_parquet") \
.save()

In [85]:
spark.stop()