In [1]:
import getpass

from pyspark.sql import SparkSession

# The OS login name scopes the Hive warehouse directory to /user/<name>/warehouse.
username = getpass.getuser()

# YARN-backed session with Hive support; all settings are passed as strings.
spark = (
    SparkSession.builder
    .config('spark.ui.port', '0')
    .config('spark.shuffle.useOldFetchProtocol', 'true')
    .config("spark.sql.warehouse.dir", f"/user/{username}/warehouse")
    .enableHiveSupport()
    .master('yarn')
    .getOrCreate()
)

In [2]:
# List the raw input folders; `!!` captures the command's output lines as a list (shown below).
!! hadoop fs -ls /public/trendytech/lendingclubproject/raw

['Found 4 items',
 'drwxr-xr-x   - itv005857 supergroup          0 2023-09-15 14:40 /public/trendytech/lendingclubproject/raw/customers_data_csv',
 'drwxr-xr-x   - itv005857 supergroup          0 2023-09-17 22:57 /public/trendytech/lendingclubproject/raw/loans_data_csv',
 'drwxr-xr-x   - itv005857 supergroup          0 2023-09-18 07:32 /public/trendytech/lendingclubproject/raw/loans_defaulters_csv',
 'drwxr-xr-x   - itv005857 supergroup          0 2023-09-18 07:31 /public/trendytech/lendingclubproject/raw/loans_repayments_csv']

In [3]:
 # Exploratory read with schema inference; printSchema below shows every column inferred as string.
 loan_repay_raw_df = (
     spark.read
     .format("csv")
     .option("header", True)
     .option("inferschema", True)
     .load("/public/trendytech/lendingclubproject/raw/loans_repayments_csv")
 )

In [4]:
# Preview the raw repayments data (columns still carry the CSV header names).
loan_repay_raw_df

loan_id,total_rec_prncp,total_rec_int,total_rec_late_fee,total_pymnt,last_pymnt_amnt,last_pymnt_d,next_pymnt_d
141581221,1055.81,2591.7,0.0,3647.51,709.23,Mar-2019,Apr-2019
141506948,1252.75,306.04,0.0,1558.79,312.63,Mar-2019,Apr-2019
141357400,626.37,354.96,0.0,981.33,197.27,Mar-2019,Apr-2019
139445427,1118.16,297.36,0.0,1415.52,283.95,Mar-2019,Apr-2019
141407409,1169.72,3605.3,0.0,4775.02,964.9,Mar-2019,Apr-2019
141360802,2313.98,2512.88,0.0,4826.86,952.02,Mar-2019,Apr-2019
141163960,4689.63,1994.93,0.0,6684.56,1342.57,Mar-2019,Apr-2019
141533932,585.29,640.53,15.0,1240.82,235.13,Mar-2019,Apr-2019
141441276,2030.82,762.81,0.0,2793.63,477.62,Mar-2019,Apr-2019
141569080,1803.55,1110.59,0.0,2914.14,585.91,Mar-2019,Apr-2019


In [5]:
# With inferschema every column, even loan_id and the amounts, came back as string,
# which is why an explicit schema is defined next.
loan_repay_raw_df.printSchema()

root
 |-- loan_id: string (nullable = true)
 |-- total_rec_prncp: string (nullable = true)
 |-- total_rec_int: string (nullable = true)
 |-- total_rec_late_fee: string (nullable = true)
 |-- total_pymnt: string (nullable = true)
 |-- last_pymnt_amnt: string (nullable = true)
 |-- last_pymnt_d: string (nullable = true)
 |-- next_pymnt_d: string (nullable = true)



In [6]:
# DDL schema applied to the repayments CSV. Because header=True skips the header row, these
# names replace the file's own header names position by position (compare the two previews).
# "intrest" is misspelled, but later cells reference the column by exactly this name.
# NOTE(review): float is 32-bit; recomputed totals later print with ~8 significant digits
# (e.g. 59746.258), so consider double or decimal for the money columns.
loans_repay_schema = '''loan_id string, total_principal_received float, total_intrest_received float, 
total_late_fee_received float, total_payment_received float, last_payment_amount float, last_payment_date string, next_payment_date string'''

In [7]:
 # Re-read the same files with the explicit DDL schema instead of inference.
 loan_repay_raw_df = (
     spark.read
     .format("csv")
     .option("header", True)
     .schema(loans_repay_schema)
     .load("/public/trendytech/lendingclubproject/raw/loans_repayments_csv")
 )

In [8]:
# Preview the typed read; column names now come from loans_repay_schema.
loan_repay_raw_df

loan_id,total_principal_received,total_intrest_received,total_late_fee_received,total_payment_received,last_payment_amount,last_payment_date,next_payment_date
141581221,1055.81,2591.7,0.0,3647.51,709.23,Mar-2019,Apr-2019
141506948,1252.75,306.04,0.0,1558.79,312.63,Mar-2019,Apr-2019
141357400,626.37,354.96,0.0,981.33,197.27,Mar-2019,Apr-2019
139445427,1118.16,297.36,0.0,1415.52,283.95,Mar-2019,Apr-2019
141407409,1169.72,3605.3,0.0,4775.02,964.9,Mar-2019,Apr-2019
141360802,2313.98,2512.88,0.0,4826.86,952.02,Mar-2019,Apr-2019
141163960,4689.63,1994.93,0.0,6684.56,1342.57,Mar-2019,Apr-2019
141533932,585.29,640.53,15.0,1240.82,235.13,Mar-2019,Apr-2019
141441276,2030.82,762.81,0.0,2793.63,477.62,Mar-2019,Apr-2019
141569080,1803.55,1110.59,0.0,2914.14,585.91,Mar-2019,Apr-2019


In [9]:
# Amount columns are now float; the two date columns stay strings (e.g. "Mar-2019").
loan_repay_raw_df.printSchema()

root
 |-- loan_id: string (nullable = true)
 |-- total_principal_received: float (nullable = true)
 |-- total_intrest_received: float (nullable = true)
 |-- total_late_fee_received: float (nullable = true)
 |-- total_payment_received: float (nullable = true)
 |-- last_payment_amount: float (nullable = true)
 |-- last_payment_date: string (nullable = true)
 |-- next_payment_date: string (nullable = true)



In [10]:
from pyspark.sql.functions import current_timestamp

In [11]:
from datetime import datetime

from pyspark.sql.functions import lit

# Stamp every row with ONE ingest time, fixed here on the driver.
# current_timestamp() is evaluated per query, and this lineage is lazy, so every action
# re-stamped the rows. The saved previews show 00:33 early on and 00:34 later for the same
# data, and the CSV and parquet writes at the end would each record a different ingest_date.
# lit() of a concrete datetime keeps the column a non-nullable timestamp, as before.
INGEST_TS = datetime.now()
loans_repay_df_ingest = loan_repay_raw_df.withColumn("ingest_date", lit(INGEST_TS))

In [12]:
# Preview with the added ingest_date column.
loans_repay_df_ingest

loan_id,total_principal_received,total_intrest_received,total_late_fee_received,total_payment_received,last_payment_amount,last_payment_date,next_payment_date,ingest_date
141581221,1055.81,2591.7,0.0,3647.51,709.23,Mar-2019,Apr-2019,2026-01-19 00:33:...
141506948,1252.75,306.04,0.0,1558.79,312.63,Mar-2019,Apr-2019,2026-01-19 00:33:...
141357400,626.37,354.96,0.0,981.33,197.27,Mar-2019,Apr-2019,2026-01-19 00:33:...
139445427,1118.16,297.36,0.0,1415.52,283.95,Mar-2019,Apr-2019,2026-01-19 00:33:...
141407409,1169.72,3605.3,0.0,4775.02,964.9,Mar-2019,Apr-2019,2026-01-19 00:33:...
141360802,2313.98,2512.88,0.0,4826.86,952.02,Mar-2019,Apr-2019,2026-01-19 00:33:...
141163960,4689.63,1994.93,0.0,6684.56,1342.57,Mar-2019,Apr-2019,2026-01-19 00:33:...
141533932,585.29,640.53,15.0,1240.82,235.13,Mar-2019,Apr-2019,2026-01-19 00:33:...
141441276,2030.82,762.81,0.0,2793.63,477.62,Mar-2019,Apr-2019,2026-01-19 00:33:...
141569080,1803.55,1110.59,0.0,2914.14,585.91,Mar-2019,Apr-2019,2026-01-19 00:33:...


In [13]:
# ingest_date is appended as a non-nullable timestamp.
loans_repay_df_ingest.printSchema()

root
 |-- loan_id: string (nullable = true)
 |-- total_principal_received: float (nullable = true)
 |-- total_intrest_received: float (nullable = true)
 |-- total_late_fee_received: float (nullable = true)
 |-- total_payment_received: float (nullable = true)
 |-- last_payment_amount: float (nullable = true)
 |-- last_payment_date: string (nullable = true)
 |-- next_payment_date: string (nullable = true)
 |-- ingest_date: timestamp (nullable = false)



In [14]:
# Total rows before any cleaning (2260701 on the saved run).
loans_repay_df_ingest.count()

2260701

In [15]:
# Expose the unfiltered, ingest-stamped data to Spark SQL as "loans_repayments".
loans_repay_df_ingest.createOrReplaceTempView(name="loans_repayments")

In [16]:
# Same data queried through the "loans_repayments" temp view.
spark.sql("select * from loans_repayments")

loan_id,total_principal_received,total_intrest_received,total_late_fee_received,total_payment_received,last_payment_amount,last_payment_date,next_payment_date,ingest_date
141581221,1055.81,2591.7,0.0,3647.51,709.23,Mar-2019,Apr-2019,2026-01-19 00:33:...
141506948,1252.75,306.04,0.0,1558.79,312.63,Mar-2019,Apr-2019,2026-01-19 00:33:...
141357400,626.37,354.96,0.0,981.33,197.27,Mar-2019,Apr-2019,2026-01-19 00:33:...
139445427,1118.16,297.36,0.0,1415.52,283.95,Mar-2019,Apr-2019,2026-01-19 00:33:...
141407409,1169.72,3605.3,0.0,4775.02,964.9,Mar-2019,Apr-2019,2026-01-19 00:33:...
141360802,2313.98,2512.88,0.0,4826.86,952.02,Mar-2019,Apr-2019,2026-01-19 00:33:...
141163960,4689.63,1994.93,0.0,6684.56,1342.57,Mar-2019,Apr-2019,2026-01-19 00:33:...
141533932,585.29,640.53,15.0,1240.82,235.13,Mar-2019,Apr-2019,2026-01-19 00:33:...
141441276,2030.82,762.81,0.0,2793.63,477.62,Mar-2019,Apr-2019,2026-01-19 00:33:...
141569080,1803.55,1110.59,0.0,2914.14,585.91,Mar-2019,Apr-2019,2026-01-19 00:33:...


## Remove rows that have nulls in the key amount columns used in later calculations

In [17]:
# Rows with a null principal in the unfiltered view (69 on the saved run). These are presumably
# values that were missing or not parseable as float under the explicit schema.
spark.sql("select count(*) from loans_repayments where total_principal_received is null")

count(1)
69


In [18]:
# Amount columns that must be non-null before any calculation is done on them.
columns_to_check = [
    'total_principal_received',
    'total_intrest_received',
    'total_late_fee_received',
    'total_payment_received',
    'last_payment_amount',
]

In [19]:
# Keep only rows that have a value in every column of columns_to_check (how="any" by default).
loans_repay_filtered_df = loans_repay_df_ingest.dropna(subset=columns_to_check)

In [20]:
# Preview after dropping rows with a null in any of columns_to_check.
loans_repay_filtered_df

loan_id,total_principal_received,total_intrest_received,total_late_fee_received,total_payment_received,last_payment_amount,last_payment_date,next_payment_date,ingest_date
141581221,1055.81,2591.7,0.0,3647.51,709.23,Mar-2019,Apr-2019,2026-01-19 00:33:...
141506948,1252.75,306.04,0.0,1558.79,312.63,Mar-2019,Apr-2019,2026-01-19 00:33:...
141357400,626.37,354.96,0.0,981.33,197.27,Mar-2019,Apr-2019,2026-01-19 00:33:...
139445427,1118.16,297.36,0.0,1415.52,283.95,Mar-2019,Apr-2019,2026-01-19 00:33:...
141407409,1169.72,3605.3,0.0,4775.02,964.9,Mar-2019,Apr-2019,2026-01-19 00:33:...
141360802,2313.98,2512.88,0.0,4826.86,952.02,Mar-2019,Apr-2019,2026-01-19 00:33:...
141163960,4689.63,1994.93,0.0,6684.56,1342.57,Mar-2019,Apr-2019,2026-01-19 00:33:...
141533932,585.29,640.53,15.0,1240.82,235.13,Mar-2019,Apr-2019,2026-01-19 00:33:...
141441276,2030.82,762.81,0.0,2793.63,477.62,Mar-2019,Apr-2019,2026-01-19 00:33:...
141569080,1803.55,1110.59,0.0,2914.14,585.91,Mar-2019,Apr-2019,2026-01-19 00:33:...


In [21]:
# 2260498 rows remain on the saved run, i.e. 203 rows were dropped.
loans_repay_filtered_df.count()

2260498

In [22]:
# Register the null-filtered data as "loan_repayments". Note the similar name: the unfiltered
# data is still available as "loans_repayments".
loans_repay_filtered_df.createOrReplaceTempView(name="loan_repayments")

In [23]:
# Rows whose recorded total payment is exactly 0.0, queried from the null-filtered view.
spark.sql("select * from loan_repayments where total_payment_received = 0.0 ")

loan_id,total_principal_received,total_intrest_received,total_late_fee_received,total_payment_received,last_payment_amount,last_payment_date,next_payment_date,ingest_date
141313188,0.0,0.0,0.0,0.0,0.0,,,2026-01-19 00:33:...
141013155,0.0,0.0,0.0,0.0,0.0,,,2026-01-19 00:33:...
141415504,0.0,0.0,0.0,0.0,0.0,,,2026-01-19 00:33:...
141242794,0.0,0.0,0.0,0.0,0.0,,,2026-01-19 00:33:...
141422742,0.0,0.0,0.0,0.0,0.0,,,2026-01-19 00:33:...
141305199,0.0,0.0,0.0,0.0,0.0,,,2026-01-19 00:33:...
140438338,0.0,0.0,0.0,0.0,0.0,,,2026-01-19 00:33:...
140621549,0.0,0.0,0.0,0.0,0.0,,,2026-01-19 00:33:...
141320790,0.0,0.0,0.0,0.0,0.0,,,2026-01-19 00:33:...
141241575,0.0,0.0,0.0,0.0,0.0,,,2026-01-19 00:33:...


In [24]:
# Number of zero-payment rows (995 on the saved run).
spark.sql("select count(*) from loan_repayments where total_payment_received = 0.0 ")

count(1)
995


## Recompute total_payment_received where it is 0.0
When total_principal_received is non-zero but total_payment_received is 0.0, set total_payment_received to the sum total_principal_received + total_intrest_received + total_late_fee_received.
Rows with nulls in these columns were already dropped above, so a missing total shows up here as 0.0, not as null.

In [25]:
# Zero total payment although principal was received: the candidates for recomputing the total.
spark.sql("select * from loan_repayments where total_payment_received = 0.0 and total_principal_received != 0.0 ")

loan_id,total_principal_received,total_intrest_received,total_late_fee_received,total_payment_received,last_payment_amount,last_payment_date,next_payment_date,ingest_date
1064185,11600.98,11600.98,10000.0,0.0,0.0,0.0,Dec-2014,2026-01-19 00:33:...
516382,21890.229,21856.03,16000.0,0.0,0.0,0.0,Mar-2014,2026-01-19 00:33:...
528899,3045.0364,3019.64,2500.0,0.0,0.0,0.0,Jan-2013,2026-01-19 00:33:...
527598,2398.9092,2220.51,2200.0,0.0,0.0,0.0,Jul-2011,2026-01-19 00:33:...
525697,21797.86,19894.9,15750.0,0.0,0.0,0.0,Jun-2015,2026-01-19 00:33:...
522641,3146.8193,3146.82,3000.0,0.0,0.0,0.0,Sep-2011,2026-01-19 00:33:...
515655,29938.576,29905.75,22800.0,0.0,0.0,0.0,May-2013,2026-01-19 00:33:...
501234,15219.313,15155.9,12000.0,0.0,0.0,0.0,May-2013,2026-01-19 00:33:...
498194,11642.714,11031.47,10000.0,0.0,0.0,0.0,Jan-2013,2026-01-19 00:33:...
495171,11138.843,10024.96,10000.0,0.0,0.0,0.0,Apr-2013,2026-01-19 00:33:...


In [26]:
# Number of candidates (46 on the saved run).
spark.sql("select count(*) from loan_repayments where total_payment_received = 0.0 and total_principal_received != 0.0 ")

count(1)
46


In [27]:
from pyspark.sql.functions import when, col

In [28]:
# Where the total payment is 0.0 but principal was received, rebuild the total as
# principal + interest + late fee. Every other row keeps its original value.
# NOTE(review): the affected rows (e.g. loan 1064185) also carry last_payment_date "0.0" and a
# month in next_payment_date, so they may be column-shifted records; confirm the sum is meaningful.
loans_payment_fixed_df = loans_repay_filtered_df.withColumn(
    "total_payment_received",
    when(
        (col("total_payment_received") == 0.0)
        & (col("total_principal_received") != 0.0),
        col("total_principal_received")
        + col("total_intrest_received")
        + col("total_late_fee_received"),
    ).otherwise(col("total_payment_received")),
)

In [29]:
# Preview after recomputing total_payment_received.
loans_payment_fixed_df

loan_id,total_principal_received,total_intrest_received,total_late_fee_received,total_payment_received,last_payment_amount,last_payment_date,next_payment_date,ingest_date
141581221,1055.81,2591.7,0.0,3647.51,709.23,Mar-2019,Apr-2019,2026-01-19 00:34:...
141506948,1252.75,306.04,0.0,1558.79,312.63,Mar-2019,Apr-2019,2026-01-19 00:34:...
141357400,626.37,354.96,0.0,981.33,197.27,Mar-2019,Apr-2019,2026-01-19 00:34:...
139445427,1118.16,297.36,0.0,1415.52,283.95,Mar-2019,Apr-2019,2026-01-19 00:34:...
141407409,1169.72,3605.3,0.0,4775.02,964.9,Mar-2019,Apr-2019,2026-01-19 00:34:...
141360802,2313.98,2512.88,0.0,4826.86,952.02,Mar-2019,Apr-2019,2026-01-19 00:34:...
141163960,4689.63,1994.93,0.0,6684.56,1342.57,Mar-2019,Apr-2019,2026-01-19 00:34:...
141533932,585.29,640.53,15.0,1240.82,235.13,Mar-2019,Apr-2019,2026-01-19 00:34:...
141441276,2030.82,762.81,0.0,2793.63,477.62,Mar-2019,Apr-2019,2026-01-19 00:34:...
141569080,1803.55,1110.59,0.0,2914.14,585.91,Mar-2019,Apr-2019,2026-01-19 00:34:...


In [30]:
# Spot-check loan 1064185: its total is now 11600.98 + 11600.98 + 10000.0 = 33201.96.
loans_payment_fixed_df.filter("loan_id == '1064185'")

loan_id,total_principal_received,total_intrest_received,total_late_fee_received,total_payment_received,last_payment_amount,last_payment_date,next_payment_date,ingest_date
1064185,11600.98,11600.98,10000.0,33201.96,0.0,0.0,Dec-2014,2026-01-19 00:34:...


In [31]:
# Rows whose total payment is still 0.0 after the recomputation.
loans_payment_fixed_df.filter("total_payment_received = 0.0")

loan_id,total_principal_received,total_intrest_received,total_late_fee_received,total_payment_received,last_payment_amount,last_payment_date,next_payment_date,ingest_date
141313188,0.0,0.0,0.0,0.0,0.0,,,2026-01-19 00:34:...
141013155,0.0,0.0,0.0,0.0,0.0,,,2026-01-19 00:34:...
141415504,0.0,0.0,0.0,0.0,0.0,,,2026-01-19 00:34:...
141242794,0.0,0.0,0.0,0.0,0.0,,,2026-01-19 00:34:...
141422742,0.0,0.0,0.0,0.0,0.0,,,2026-01-19 00:34:...
141305199,0.0,0.0,0.0,0.0,0.0,,,2026-01-19 00:34:...
140438338,0.0,0.0,0.0,0.0,0.0,,,2026-01-19 00:34:...
140621549,0.0,0.0,0.0,0.0,0.0,,,2026-01-19 00:34:...
141320790,0.0,0.0,0.0,0.0,0.0,,,2026-01-19 00:34:...
141241575,0.0,0.0,0.0,0.0,0.0,,,2026-01-19 00:34:...


In [32]:
# 949 on the saved run (= 995 - 46): exactly the zero-payment rows that also had zero principal.
loans_payment_fixed_df.filter("total_payment_received = 0.0").count()

949

In [33]:
# Drop the rows whose total payment is still 0.0 after the recomputation.
loans_payments_fixed2_df = loans_payment_fixed_df.where("total_payment_received != 0.0")

In [34]:
# Preview after dropping the zero-payment rows.
loans_payments_fixed2_df

loan_id,total_principal_received,total_intrest_received,total_late_fee_received,total_payment_received,last_payment_amount,last_payment_date,next_payment_date,ingest_date
141581221,1055.81,2591.7,0.0,3647.51,709.23,Mar-2019,Apr-2019,2026-01-19 00:34:...
141506948,1252.75,306.04,0.0,1558.79,312.63,Mar-2019,Apr-2019,2026-01-19 00:34:...
141357400,626.37,354.96,0.0,981.33,197.27,Mar-2019,Apr-2019,2026-01-19 00:34:...
139445427,1118.16,297.36,0.0,1415.52,283.95,Mar-2019,Apr-2019,2026-01-19 00:34:...
141407409,1169.72,3605.3,0.0,4775.02,964.9,Mar-2019,Apr-2019,2026-01-19 00:34:...
141360802,2313.98,2512.88,0.0,4826.86,952.02,Mar-2019,Apr-2019,2026-01-19 00:34:...
141163960,4689.63,1994.93,0.0,6684.56,1342.57,Mar-2019,Apr-2019,2026-01-19 00:34:...
141533932,585.29,640.53,15.0,1240.82,235.13,Mar-2019,Apr-2019,2026-01-19 00:34:...
141441276,2030.82,762.81,0.0,2793.63,477.62,Mar-2019,Apr-2019,2026-01-19 00:34:...
141569080,1803.55,1110.59,0.0,2914.14,585.91,Mar-2019,Apr-2019,2026-01-19 00:34:...


In [35]:
# Rows whose last_payment_date holds the placeholder "0.0". Some of them (e.g. loans 529353 and
# 500643) also look column-shifted, with numbers in next_payment_date.
loans_payments_fixed2_df.filter("last_payment_date = 0.0")

loan_id,total_principal_received,total_intrest_received,total_late_fee_received,total_payment_received,last_payment_amount,last_payment_date,next_payment_date,ingest_date
1064185,11600.98,11600.98,10000.0,33201.96,0.0,0.0,Dec-2014,2026-01-19 00:34:...
516382,21890.229,21856.03,16000.0,59746.258,0.0,0.0,Mar-2014,2026-01-19 00:34:...
529353,1.0,7288.0,47.0,109.0,0.0,0.0,21221.5644086071,2026-01-19 00:34:...
528899,3045.0364,3019.64,2500.0,8564.676,0.0,0.0,Jan-2013,2026-01-19 00:34:...
527598,2398.9092,2220.51,2200.0,6819.419,0.0,0.0,Jul-2011,2026-01-19 00:34:...
525697,21797.86,19894.9,15750.0,57442.758,0.0,0.0,Jun-2015,2026-01-19 00:34:...
522641,3146.8193,3146.82,3000.0,9293.64,0.0,0.0,Sep-2011,2026-01-19 00:34:...
515655,29938.576,29905.75,22800.0,82644.33,0.0,0.0,May-2013,2026-01-19 00:34:...
500643,17215.0,91.6,14.0,8.0,14166.7705,0.0,14135.97,2026-01-19 00:34:...
501234,15219.313,15155.9,12000.0,42375.215,0.0,0.0,May-2013,2026-01-19 00:34:...


In [36]:
# Placeholder last_payment_date rows: 48 on the saved run.
loans_payments_fixed2_df.filter("last_payment_date = 0.0").count()

48

In [37]:
# Placeholder next_payment_date rows: 24 on the saved run.
loans_payments_fixed2_df.filter("next_payment_date = 0.0").count()

24

In [38]:
# Genuinely missing last_payment_date: 1477 on the saved run.
loans_payments_fixed2_df.filter("last_payment_date is null").count()

1477

In [39]:
# Genuinely missing next_payment_date: 1344240 on the saved run.
loans_payments_fixed2_df.filter("next_payment_date is null").count()

1344240

In [40]:
# Replace the "0.0" placeholder in last_payment_date with a real null; keep all other values.
# NOTE(review): the string column is compared with the number 0.0, which relies on Spark
# coercing the string to a number. Presumably real dates such as "Mar-2019" become null in
# that comparison and fall through to otherwise(); under ANSI mode the cast may raise instead.
# Confirm the session setting.
loans_payment_last_date_fixed_df = loans_payments_fixed2_df.withColumn(
    "last_payment_date",
    when(col("last_payment_date") == 0.0, None)
    .otherwise(col("last_payment_date")),
)

In [41]:
# Same placeholder clean-up for next_payment_date: "0.0" becomes null, other values are kept.
# NOTE(review): same string-vs-number coercion caveat as for last_payment_date.
loans_payment_next_date_fixed_df = loans_payment_last_date_fixed_df.withColumn(
    "next_payment_date",
    when(col("next_payment_date") == 0.0, None)
    .otherwise(col("next_payment_date")),
)

In [42]:
# Expect 0: no "0.0" placeholders left in last_payment_date.
loans_payment_next_date_fixed_df.filter("last_payment_date = 0.0").count()

0

In [43]:
# Expect 0: no "0.0" placeholders left in next_payment_date.
loans_payment_next_date_fixed_df.filter("next_payment_date = 0.0").count()

0

In [46]:
# Write the cleaned repayments as CSV with a header row under the current user's /user directory.
# The path is built from `username` (obtained via getpass when the session was created) instead
# of the hard-coded /user/itv022692 prefix, so the notebook also works for other users.
# NOTE(review): without caching, this write presumably re-executes the whole lineage from the
# raw CSV; consider caching loans_payment_next_date_fixed_df if both writes are kept.
(
    loans_payment_next_date_fixed_df.write
    .option("header", True)
    .format("csv")
    .mode("overwrite")
    .option("path", f"/user/{username}/lendingclubproject/cleaned/loans_repayments_csv")
    .save()
)

In [47]:
# Write the cleaned repayments as parquet under the current user's /user directory.
# The path is built from `username` (obtained via getpass when the session was created) instead
# of the hard-coded /user/itv022692 prefix, so the notebook also works for other users.
(
    loans_payment_next_date_fixed_df.write
    .format("parquet")
    .mode("overwrite")
    .option("path", f"/user/{username}/lendingclubproject/cleaned/loans_repayments_parquet")
    .save()
)