In [1]:
from pyspark.sql import SparkSession 

In [2]:
# Build (or reuse) a Hive-enabled SparkSession on YARN, using the user's
# directory as the SQL warehouse location.
spark = (
    SparkSession.builder
    .master("yarn")
    .appName("spark")
    .config("spark.sql.warehouse.dir", "/user/itv016843/warehouse")
    .enableHiveSupport()
    .getOrCreate()
)

In [3]:
# Display the session object to confirm it was created.
spark

In [4]:
from pyspark.sql.functions import *

In [5]:
from pyspark.sql import *

In [6]:
# List the raw input folders of the Lending Club project on HDFS.
! hadoop fs -ls /public/trendytech/lendingclubproject/raw

Found 4 items
drwxr-xr-x   - itv005857 supergroup          0 2023-09-15 14:40 /public/trendytech/lendingclubproject/raw/customers_data_csv
drwxr-xr-x   - itv005857 supergroup          0 2023-09-17 22:57 /public/trendytech/lendingclubproject/raw/loans_data_csv
drwxr-xr-x   - itv005857 supergroup          0 2023-09-18 07:32 /public/trendytech/lendingclubproject/raw/loans_defaulters_csv
drwxr-xr-x   - itv005857 supergroup          0 2023-09-18 07:31 /public/trendytech/lendingclubproject/raw/loans_repayments_csv


In [7]:
# DDL-style schema for the repayments CSV: monetary columns are read as float,
# while the two payment-date columns are kept as raw strings (values look like
# "Mar-2019").
_repayment_fields = (
    ("loan_id", "string"),
    ("total_rec_prncp", "float"),
    ("total_rec_int", "float"),
    ("total_rec_late_fee", "float"),
    ("total_pymnt", "float"),
    ("last_pymnt_amnt", "float"),
    ("last_pymnt_d", "string"),
    ("next_pymnt_d", "string"),
)
loan_schema = ",".join(f"{name} {dtype}" for name, dtype in _repayment_fields)


In [8]:
# Load the raw repayment CSVs using the explicit `loan_schema`; the first line
# of each file is treated as a header.
df1 = (
    spark.read
    .format("csv")
    .schema(loan_schema)
    .option("header", True)
    .load("/public/trendytech/lendingclubproject/raw/loans_repayments_csv")
)

In [9]:
# Preview a handful of raw rows.
df1.limit(num=5)

loan_id,total_rec_prncp,total_rec_int,total_rec_late_fee,total_pymnt,last_pymnt_amnt,last_pymnt_d,next_pymnt_d
141581221,1055.81,2591.7,0.0,3647.51,709.23,Mar-2019,Apr-2019
141506948,1252.75,306.04,0.0,1558.79,312.63,Mar-2019,Apr-2019
141357400,626.37,354.96,0.0,981.33,197.27,Mar-2019,Apr-2019
139445427,1118.16,297.36,0.0,1415.52,283.95,Mar-2019,Apr-2019
141407409,1169.72,3605.3,0.0,4775.02,964.9,Mar-2019,Apr-2019


In [10]:
# Append a `Time` column holding current_timestamp(); it is evaluated when the
# query runs, not when this cell is executed.
df2 = df1.select("*", current_timestamp().alias("Time"))

In [11]:
# Preview rows after adding the timestamp column.
df2.limit(num=5)

loan_id,total_rec_prncp,total_rec_int,total_rec_late_fee,total_pymnt,last_pymnt_amnt,last_pymnt_d,next_pymnt_d,Time
28674390,7884.28,5178.41,0.0,14329.62,482.23,Nov-2016,,2025-07-26 04:22:...
28563513,9000.0,955.61,0.0,9955.61,6420.29,Nov-2015,,2025-07-26 04:22:...
28694706,22125.0,751.39,0.0,22876.39,22073.85,Dec-2014,,2025-07-26 04:22:...
28722876,3196.23,1259.77,0.0,4639.79,178.24,Nov-2016,,2025-07-26 04:22:...
28644402,26400.0,2150.7,0.0,28550.7,19292.99,Oct-2015,,2025-07-26 04:22:...


In [12]:
# Columns that must be non-null for a repayment row to be kept; used as the
# `subset` for na.drop in the next cell.
# The payment-date columns are deliberately NOT included. Rows with a blank
# next_pymnt_d (e.g. loans whose last payment was Nov-2016 in the preview above)
# still carry valid repayment totals, and dropping them silently loses data.
# Invalid date values are handled by the date-cleaning cells further down.
drop_columns = [
    "loan_id", "total_rec_prncp", "total_rec_int", "total_rec_late_fee",
    "total_pymnt", "last_pymnt_amnt",
]


In [13]:
# Discard rows that have a null in any of the columns listed in drop_columns.
df3 = df2.dropna(how="any", subset=drop_columns)

In [14]:
# Sanity check: rows whose total_pymnt is still null (expected to be empty).
df3.where(col("total_pymnt").isNull())

loan_id,total_rec_prncp,total_rec_int,total_rec_late_fee,total_pymnt,last_pymnt_amnt,last_pymnt_d,next_pymnt_d,Time


In [15]:
# Back-fill total_pymnt where it is 0.0 even though principal was received:
# rebuild it as principal + interest + late fees. Every other row keeps its
# original total_pymnt.
principal_received = col("total_rec_prncp") != 0.0
payment_missing = col("total_pymnt") == 0.0
recomputed_payment = (
    col("total_rec_prncp") + col("total_rec_int") + col("total_rec_late_fee")
)
df4 = df3.withColumn(
    "total_pymnt",
    when(principal_received & payment_missing, recomputed_payment)
    .otherwise(col("total_pymnt")),
)

In [16]:
# Preview rows after the total_pymnt back-fill.
df4.limit(num=5)

loan_id,total_rec_prncp,total_rec_int,total_rec_late_fee,total_pymnt,last_pymnt_amnt,last_pymnt_d,next_pymnt_d,Time
28654395,18134.86,10868.86,0.0,29003.72,547.24,Mar-2019,Apr-2019,2025-07-26 04:22:...
28714308,10134.68,4648.3,0.0,14782.98,279.1,Mar-2019,Apr-2019,2025-07-26 04:22:...
7383129,14472.21,5487.4,0.0,19959.61,375.33,Mar-2019,Apr-2019,2025-07-26 04:22:...
28613524,12062.42,6639.6,0.0,18702.02,352.41,Mar-2019,Apr-2019,2025-07-26 04:22:...
27512265,10157.19,4456.22,0.0,14613.41,275.2,Mar-2019,Apr-2019,2025-07-26 04:22:...


In [17]:
# Verify the back-fill: rows that still report a zero total payment
# (expected to be empty).
df4.where(col("total_pymnt") == 0.0)

loan_id,total_rec_prncp,total_rec_int,total_rec_late_fee,total_pymnt,last_pymnt_amnt,last_pymnt_d,next_pymnt_d,Time


In [18]:

# Null out invalid last_pymnt_d values. Valid entries look like "Mar-2019", so
# anything starting with a digit is treated as invalid.
# Use a real SQL NULL rather than the literal text 'NULL'. A string 'NULL'
# is not null, so `is null` checks and na.drop would not see it as missing.
df5 = df4.withColumn(
    "last_pymnt_d",
    when(col("last_pymnt_d").rlike("^[0-9]"), lit(None).cast("string"))
    .otherwise(col("last_pymnt_d")),
)

In [19]:
# Null out invalid next_pymnt_d values. Valid entries look like "Apr-2019", so
# anything starting with a digit is treated as invalid.
# Build on df5 (not df4) so the last_pymnt_d cleanup from the previous cell is
# kept in the frame that gets written out. Use a real SQL NULL instead of the
# literal text 'NULL' so the value is actually treated as missing.
df6 = df5.withColumn(
    "next_pymnt_d",
    when(col("next_pymnt_d").rlike("^[0-9]"), lit(None).cast("string"))
    .otherwise(col("next_pymnt_d")),
)

In [20]:
# Persist the cleaned repayments as CSV with a header row, replacing any
# previous output at the target path.
# `.save(...)` is what actually triggers the write. Without it, the chain only
# builds a DataFrameWriter and nothing is written (the stale output below
# shows exactly that).
(
    df6.write
    .format("csv")
    .option("header", True)
    .mode("overwrite")
    .save("/user/itv016843/cleaned/loans_1")
)

<pyspark.sql.readwriter.DataFrameWriter at 0x7f8004f7d0b8>