In [0]:
# Raw LendingClub input locations on DBFS. All four paths use the explicit
# "dbfs:" scheme so they are written consistently and do not depend on the
# cluster's default filesystem to resolve.
customers_data_path = "dbfs:/FileStore/tables/lendingclubproject/raw/customers_data_csv"
loans_path = "dbfs:/FileStore/tables/lendingclubproject/raw/loans_data_csv"
loans_repayment_path = "dbfs:/FileStore/tables/lendingclubproject/raw/loans_repayments_csv"
loans_default_path = "dbfs:/FileStore/tables/lendingclubproject/raw/loans_defaulters_csv"


In [0]:
from pyspark.sql import SparkSession

In [0]:
# Obtain a SparkSession: getOrCreate() returns the active session if there is
# one, otherwise it builds a new one from these settings.
# NOTE(review): on Databricks a session already exists, so the master setting
# presumably has no effect there — confirm.
spark = (
    SparkSession.builder
    .appName("MySparkApp")
    .master("local[*]")
    .getOrCreate()
)

# Report which Spark version the session is running.
print(spark.version)

3.3.2


In [0]:
# DDL schema for the raw customers CSV, one "name type" pair per line so each
# column's declared type is easy to audit. Implicit string concatenation yields
# the single comma-separated string Spark's .schema() accepts.
customer_schema = (
    "member_id string, "
    "emp_title string, "
    "emp_length string, "
    "home_ownership string, "
    "annual_inc float, "
    "addr_state string, "
    "zip_code string, "
    "country string, "
    "grade string, "
    "sub_grade string, "
    "verification_status string, "
    "tot_hi_cred_lim float, "
    "application_type string, "
    "annual_inc_joint float, "
    "verification_status_joint string"
)

In [0]:
# Load the raw customers CSV using the explicit schema above rather than
# inferring types; the first line of each file is treated as the header.
customers_raw_df = (
    spark.read
    .option("header", True)
    .schema(customer_schema)
    .csv(customers_data_path)
)


In [0]:
# Verify the explicit schema was applied to the raw customers data.
customers_raw_df.printSchema()

root
 |-- member_id: string (nullable = true)
 |-- emp_title: string (nullable = true)
 |-- emp_length: string (nullable = true)
 |-- home_ownership: string (nullable = true)
 |-- annual_inc: float (nullable = true)
 |-- addr_state: string (nullable = true)
 |-- zip_code: string (nullable = true)
 |-- country: string (nullable = true)
 |-- grade: string (nullable = true)
 |-- sub_grade: string (nullable = true)
 |-- verification_status: string (nullable = true)
 |-- tot_hi_cred_lim: float (nullable = true)
 |-- application_type: string (nullable = true)
 |-- annual_inc_joint: float (nullable = true)
 |-- verification_status_joint: string (nullable = true)



In [0]:
# Map terse raw column names to clearer, spelled-out names.
# NOTE(review): "join_annual_income" looks like a typo for "joint_annual_income";
# kept unchanged because the cleaned output written later uses this column name.
columns_renamed_dict = {
    "annual_inc": "annual_income",
    "addr_state": "address_state",
    "zip_code": "address_zip_code",
    "country": "address_country",
    "tot_hi_cred_lim": "total_high_credit_limit",
    "annual_inc_joint": "join_annual_income",
}

# Apply renames. The loop rebinds customers_df_renamed on every iteration, so it
# must first be initialised from the raw frame; otherwise the first iteration
# reads an undefined name and the cell fails on a fresh kernel.
customers_df_renamed = customers_raw_df
for old_name, new_name in columns_renamed_dict.items():
    customers_df_renamed = customers_df_renamed.withColumnRenamed(old_name, new_name)




In [0]:
# Confirm the columns now carry their renamed names.
customers_df_renamed.printSchema()


root
 |-- member_id: string (nullable = true)
 |-- emp_title: string (nullable = true)
 |-- emp_length: string (nullable = true)
 |-- home_ownership: string (nullable = true)
 |-- annual_income: float (nullable = true)
 |-- address_state: string (nullable = true)
 |-- address_zip_code: string (nullable = true)
 |-- address_country: string (nullable = true)
 |-- grade: string (nullable = true)
 |-- sub_grade: string (nullable = true)
 |-- verification_status: string (nullable = true)
 |-- total_high_credit_limit: float (nullable = true)
 |-- application_type: string (nullable = true)
 |-- join_annual_income: float (nullable = true)
 |-- verification_status_joint: string (nullable = true)



In [0]:
from pyspark.sql.functions import current_timestamp

# Stamp each row with the load time. current_timestamp() is evaluated once per
# query, so every row produced by a given action receives the same value.
customers_ingested_df = (
    customers_df_renamed
    .withColumn("ingest_date", current_timestamp())
)

In [0]:
# Confirm ingest_date was appended as a (non-nullable) timestamp column.
customers_ingested_df.printSchema()

root
 |-- member_id: string (nullable = true)
 |-- emp_title: string (nullable = true)
 |-- emp_length: string (nullable = true)
 |-- home_ownership: string (nullable = true)
 |-- annual_income: float (nullable = true)
 |-- address_state: string (nullable = true)
 |-- address_zip_code: string (nullable = true)
 |-- address_country: string (nullable = true)
 |-- grade: string (nullable = true)
 |-- sub_grade: string (nullable = true)
 |-- verification_status: string (nullable = true)
 |-- total_high_credit_limit: float (nullable = true)
 |-- application_type: string (nullable = true)
 |-- join_annual_income: float (nullable = true)
 |-- verification_status_joint: string (nullable = true)
 |-- ingest_date: timestamp (nullable = false)



In [0]:
# Row count before de-duplication (compare with the distinct count in the next cell).
customers_ingested_df.count()

Out[53]: 2260701

In [0]:
# Drop rows that are exact duplicates across every column. With no column
# subset, dropDuplicates() is equivalent to distinct().
customers_distinct = customers_ingested_df.dropDuplicates()
customers_distinct.count()

Out[57]: 2260638

In [0]:
# Inspect the physical plan: the de-duplication runs as a HashAggregate keyed on all columns.
customers_distinct.explain()

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=true
+- == Final Plan ==
   *(2) HashAggregate(keys=[emp_title#311, zip_code#316, home_ownership#313, sub_grade#319, tot_hi_cred_lim#321, emp_length#312, verification_status#320, country#317, join_annual_income#808, grade#318, addr_state#315, 2025-09-11 04:38:40.591#1459, member_id#310, annual_inc#314, application_type#322, verification_status_joint#324], functions=[])
   +- AQEShuffleRead coalesced
      +- ShuffleQueryStage 0, Statistics(sizeInBytes=659.9 MiB, rowCount=2.26E+6, isRuntime=true)
         +- Exchange hashpartitioning(emp_title#311, zip_code#316, home_ownership#313, sub_grade#319, tot_hi_cred_lim#321, emp_length#312, verification_status#320, country#317, join_annual_income#808, grade#318, addr_state#315, 2025-09-11 04:38:40.591#1459, member_id#310, annual_inc#314, application_type#322, verification_status_joint#324, 200), ENSURE_REQUIREMENTS, [plan_id=798]
            +- *(1) HashAggregate(keys=[emp_title#311, zip_code#31

In [0]:
# Total rows:    2260701
# Distinct rows: 2260638
# De-duplication removed 63 exact-duplicate rows.


In [0]:
# Register the de-duplicated customers as the "customers" view for Spark SQL queries.
customers_distinct.createOrReplaceTempView("customers")

In [0]:
# Preview the first 5 rows of the "customers" view.
spark.sql("select * from customers").show(5)

+--------------------+--------------------+----------+--------------+-------------+-------------+----------------+---------------+-----+---------+-------------------+-----------------------+----------------+------------------+-------------------------+--------------------+
|           member_id|           emp_title|emp_length|home_ownership|annual_income|address_state|address_zip_code|address_country|grade|sub_grade|verification_status|total_high_credit_limit|application_type|join_annual_income|verification_status_joint|         ingest_date|
+--------------------+--------------------+----------+--------------+-------------+-------------+----------------+---------------+-----+---------+-------------------+-----------------------+----------------+------------------+-------------------------+--------------------+
|f74e401c1ab0adf78...| Contract Specialist|   3 years|      MORTGAGE|     104433.0|           PA|           174xx|            USA|    F|       F1|    Source Verified|            

In [0]:
# Keep only customers with a known annual income, reading from the
# "customers" view (currently the de-duplicated data).
customers_inc_fil = spark.table("customers").where("annual_income is not null")
customers_inc_fil.count()

Out[60]: 2260633

In [0]:
# Re-point the "customers" view at the income-filtered data, then list the raw
# emp_length text values that must be converted to numbers.
customers_inc_fil.createOrReplaceTempView("customers")
spark.sql("select distinct(emp_length) from customers").show()


+----------+
|emp_length|
+----------+
|   5 years|
|   9 years|
|      null|
|    1 year|
|   2 years|
|   7 years|
|   8 years|
|   4 years|
|   6 years|
|   3 years|
| 10+ years|
|  < 1 year|
+----------+



In [0]:
# Convert emp_length from text (e.g. "10+ years", "< 1 year", "3 years") to an
# integer by removing every non-digit character and casting the rest.
# NOTE(review): this maps "< 1 year" to 1 (same as "1 year") and "10+ years" to
# 10; confirm that bucketing is intended.
from pyspark.sql.functions import regexp_replace, col

cust_emp_clean = customers_inc_fil.withColumn(
    "emp_length",
    # Raw string: "\D" inside a normal literal is an invalid escape sequence
    # (DeprecationWarning, SyntaxWarning on Python 3.12+).
    regexp_replace(col("emp_length"), r"\D", "").cast("integer"),
)
cust_emp_clean.show(10)

+--------------------+--------------------+----------+--------------+-------------+-------------+----------------+---------------+-----+---------+-------------------+-----------------------+----------------+------------------+-------------------------+--------------------+
|           member_id|           emp_title|emp_length|home_ownership|annual_income|address_state|address_zip_code|address_country|grade|sub_grade|verification_status|total_high_credit_limit|application_type|join_annual_income|verification_status_joint|         ingest_date|
+--------------------+--------------------+----------+--------------+-------------+-------------+----------------+---------------+-----+---------+-------------------+-----------------------+----------------+------------------+-------------------------+--------------------+
|b24d55f21390533c5...|         road driver|        10|      MORTGAGE|      85000.0|           SC|           293xx|            USA|    B|       B1|       Not Verified|            

In [0]:
# Confirm emp_length is now an integer column.
cust_emp_clean.printSchema()

root
 |-- member_id: string (nullable = true)
 |-- emp_title: string (nullable = true)
 |-- emp_length: integer (nullable = true)
 |-- home_ownership: string (nullable = true)
 |-- annual_income: float (nullable = true)
 |-- address_state: string (nullable = true)
 |-- address_zip_code: string (nullable = true)
 |-- address_country: string (nullable = true)
 |-- grade: string (nullable = true)
 |-- sub_grade: string (nullable = true)
 |-- verification_status: string (nullable = true)
 |-- total_high_credit_limit: float (nullable = true)
 |-- application_type: string (nullable = true)
 |-- join_annual_income: float (nullable = true)
 |-- verification_status_joint: string (nullable = true)
 |-- ingest_date: timestamp (nullable = false)



In [0]:
# Count rows whose emp_length is null after conversion; these are imputed in the next cell.
cust_emp_clean.filter("emp_length is null").count()

Out[68]: 146903

In [0]:
from pyspark.sql.functions import col, avg

# 1. Average emp_length over the non-null rows (avg() skips nulls).
avg_emp_length = cust_emp_clean.select(avg(col("emp_length"))).collect()[0][0]
if avg_emp_length is None:
    # avg() yields null when every emp_length is null (or there are no rows);
    # fillna cannot take None as a replacement value, so fail with a clear message.
    raise ValueError("emp_length has no non-null values to average for imputation")

# 2. Replace nulls with this average. emp_length is an integer column, so the
#    float average is truncated to int explicitly; fillna casts the replacement
#    to the column's type anyway, so the filled values are the same.
cust_emp_imputed = cust_emp_clean.fillna({"emp_length": int(avg_emp_length)})

cust_emp_imputed.show(10)


+--------------------+--------------------+----------+--------------+-------------+-------------+----------------+---------------+-----+---------+-------------------+-----------------------+----------------+------------------+-------------------------+--------------------+
|           member_id|           emp_title|emp_length|home_ownership|annual_income|address_state|address_zip_code|address_country|grade|sub_grade|verification_status|total_high_credit_limit|application_type|join_annual_income|verification_status_joint|         ingest_date|
+--------------------+--------------------+----------+--------------+-------------+-------------+----------------+---------------+-----+---------+-------------------+-----------------------+----------------+------------------+-------------------------+--------------------+
|b5e7938b0a2da4cea...|            Engineer|        10|      MORTGAGE|      65000.0|           SD|           577xx|            USA|    C|       C1|       Not Verified|            

In [0]:
# Sanity check: after imputation no emp_length values should be null.
cust_emp_imputed.filter("emp_length is null").count()

Out[70]: 0

In [0]:
# Re-point the "customers" view at the imputed data and count address_state
# values longer than two characters (these are replaced in the next cell).
cust_emp_imputed.createOrReplaceTempView("customers")
spark.sql("select count(address_state) from customers where length (address_state)>2").show()

+--------------------+
|count(address_state)|
+--------------------+
|                 254|
+--------------------+



In [0]:
from pyspark.sql.functions import when, col, length

# Build a new DataFrame (cust_emp_imputed itself is left unchanged) in which any
# address_state longer than two characters is replaced by the placeholder "NA".
# Null states pass through: length(null) > 2 is null, so otherwise() applies.
cust_emp_state = cust_emp_imputed.withColumn(
    "address_state",
    when(length(col("address_state")) > 2, "NA").otherwise(col("address_state")),
)

In [0]:
# Re-register the view on the corrected data; the count of address_state values
# longer than two characters should now be 0.
cust_emp_state.createOrReplaceTempView("customers")
spark.sql("select count(address_state) from customers where length (address_state)>2").show()

+--------------------+
|count(address_state)|
+--------------------+
|                   0|
+--------------------+



In [0]:
# Spot-check the distinct address_state values that remain after cleaning.
cust_emp_state.select("address_state").distinct().show()

+-------------+
|address_state|
+-------------+
|           SC|
|           AZ|
|           MN|
|           NJ|
|           VA|
|           RI|
|           CA|
|           NC|
|           MD|
|           MO|
|           IL|
|           WA|
|           AL|
|           IN|
|           NM|
|           PA|
|           SD|
|           NY|
|           TX|
|           GA|
+-------------+
only showing top 20 rows



In [0]:
# Persist the cleaned customers data as Parquet, replacing any previous output
# at the target location.
(
    cust_emp_state.write
    .mode("overwrite")
    .parquet("dbfs:/FileStore/tables/lendingclubproject/cleaned/customers_data_parquet")
)

In [0]:
%fs ls dbfs:/FileStore/tables/lendingclubproject/cleaned/customers_data_parquet


path,name,size,modificationTime
dbfs:/FileStore/tables/lendingclubproject/cleaned/customers_data_parquet/_SUCCESS,_SUCCESS,0,1757569373000
dbfs:/FileStore/tables/lendingclubproject/cleaned/customers_data_parquet/_committed_2330909018883528478,_committed_2330909018883528478,924,1757569373000
dbfs:/FileStore/tables/lendingclubproject/cleaned/customers_data_parquet/_started_2330909018883528478,_started_2330909018883528478,0,1757569345000
dbfs:/FileStore/tables/lendingclubproject/cleaned/customers_data_parquet/part-00000-tid-2330909018883528478-38cb962c-beea-44c6-9aa9-a208d80cd79d-297-1-c000.snappy.parquet,part-00000-tid-2330909018883528478-38cb962c-beea-44c6-9aa9-a208d80cd79d-297-1-c000.snappy.parquet,24057378,1757569372000
dbfs:/FileStore/tables/lendingclubproject/cleaned/customers_data_parquet/part-00001-tid-2330909018883528478-38cb962c-beea-44c6-9aa9-a208d80cd79d-301-1-c000.snappy.parquet,part-00001-tid-2330909018883528478-38cb962c-beea-44c6-9aa9-a208d80cd79d-301-1-c000.snappy.parquet,23054327,1757569372000
dbfs:/FileStore/tables/lendingclubproject/cleaned/customers_data_parquet/part-00002-tid-2330909018883528478-38cb962c-beea-44c6-9aa9-a208d80cd79d-303-1-c000.snappy.parquet,part-00002-tid-2330909018883528478-38cb962c-beea-44c6-9aa9-a208d80cd79d-303-1-c000.snappy.parquet,23071480,1757569371000
dbfs:/FileStore/tables/lendingclubproject/cleaned/customers_data_parquet/part-00003-tid-2330909018883528478-38cb962c-beea-44c6-9aa9-a208d80cd79d-299-1-c000.snappy.parquet,part-00003-tid-2330909018883528478-38cb962c-beea-44c6-9aa9-a208d80cd79d-299-1-c000.snappy.parquet,24007068,1757569371000
dbfs:/FileStore/tables/lendingclubproject/cleaned/customers_data_parquet/part-00004-tid-2330909018883528478-38cb962c-beea-44c6-9aa9-a208d80cd79d-300-1-c000.snappy.parquet,part-00004-tid-2330909018883528478-38cb962c-beea-44c6-9aa9-a208d80cd79d-300-1-c000.snappy.parquet,22990794,1757569370000
dbfs:/FileStore/tables/lendingclubproject/cleaned/customers_data_parquet/part-00005-tid-2330909018883528478-38cb962c-beea-44c6-9aa9-a208d80cd79d-296-1-c000.snappy.parquet,part-00005-tid-2330909018883528478-38cb962c-beea-44c6-9aa9-a208d80cd79d-296-1-c000.snappy.parquet,23971367,1757569372000
dbfs:/FileStore/tables/lendingclubproject/cleaned/customers_data_parquet/part-00006-tid-2330909018883528478-38cb962c-beea-44c6-9aa9-a208d80cd79d-298-1-c000.snappy.parquet,part-00006-tid-2330909018883528478-38cb962c-beea-44c6-9aa9-a208d80cd79d-298-1-c000.snappy.parquet,23935151,1757569372000
