In [1]:
from pyspark.sql import SparkSession
import getpass 
username=getpass.getuser()
spark=SparkSession. \
    builder. \
    config('spark.ui.port','0'). \
    config("spark.sql.warehouse.dir", f"/user/{username}/warehouse"). \
    config('spark.shuffle.useOldFetchProtocol', 'true'). \
    enableHiveSupport(). \
    master('yarn'). \
    getOrCreate()

In [2]:
customers_raw_df = spark.read \
.format("csv") \
.option("header",True) \
.option("inferSchema", True) \
.load("/public/trendytech/lendingclubproject/raw/customers_data_csv")

In [3]:
customers_raw_df

member_id,emp_title,emp_length,home_ownership,annual_inc,addr_state,zip_code,country,grade,sub_grade,verification_status,tot_hi_cred_lim,application_type,annual_inc_joint,verification_status_joint
b59d80da191f5b573...,,,RENT,50000.0,OR,973xx,USA,A,A5,Source Verified,8600.0,Individual,,
202d9f56ecb7c3bc9...,police officer,7 years,OWN,85000.0,TX,799xx,USA,A,A5,Source Verified,272384.0,Individual,,
e5a140c0922b554b9...,community living ...,6 years,RENT,48000.0,NY,146xx,USA,B,B2,Source Verified,85092.0,Individual,,
e12aefc548f750777...,Office,10+ years,OWN,33000.0,CT,067xx,USA,F,F1,Verified,7100.0,Individual,,
1b3a50d854fbbf97e...,Special Tooling I...,10+ years,MORTGAGE,81000.0,TX,791xx,USA,E,E5,Verified,190274.0,Individual,,
1c4329e5f17697127...,Mine ops tech 6,2 years,MORTGAGE,68000.0,AZ,855xx,USA,C,C3,Not Verified,182453.0,Individual,,
5026c86ad983175eb...,caregiver,4 years,RENT,76020.0,WA,993xx,USA,C,C2,Source Verified,15308.0,Individual,,
9847d8c1e9d0b2084...,,,OWN,65000.0,IL,624xx,USA,E,E3,Verified,128800.0,Individual,,
8340dbe1adea41fb4...,Vice President Re...,8 years,MORTGAGE,111000.0,CT,063xx,USA,A,A1,Not Verified,343507.0,Individual,,
d4de0de3ab7d79ad4...,FOREMAN,10+ years,MORTGAGE,67000.0,WA,992xx,USA,G,G2,Verified,211501.0,Individual,,


In [4]:
customers_raw_df.show(10)

+--------------------+--------------------+----------+--------------+----------+----------+--------+-------+-----+---------+-------------------+---------------+----------------+----------------+-------------------------+
|           member_id|           emp_title|emp_length|home_ownership|annual_inc|addr_state|zip_code|country|grade|sub_grade|verification_status|tot_hi_cred_lim|application_type|annual_inc_joint|verification_status_joint|
+--------------------+--------------------+----------+--------------+----------+----------+--------+-------+-----+---------+-------------------+---------------+----------------+----------------+-------------------------+
|b59d80da191f5b573...|                null|      null|          RENT|   50000.0|        OR|   973xx|    USA|    A|       A5|    Source Verified|         8600.0|      Individual|            null|                     null|
|202d9f56ecb7c3bc9...|      police officer|   7 years|           OWN|   85000.0|        TX|   799xx|    USA|    A|  

In [5]:
customer_df_renamed = customers_raw_df.withColumnRenamed("annual_inc", "annualincome") \
.withColumnRenamed("tot_hi_credit_lim", "total_high_credit_limit") \
.withColumnRenamed("annual_inc_joint", "join_annual_income")

In [6]:
customer_df_renamed

member_id,emp_title,emp_length,home_ownership,annualincome,addr_state,zip_code,country,grade,sub_grade,verification_status,tot_hi_cred_lim,application_type,join_annual_income,verification_status_joint
b59d80da191f5b573...,,,RENT,50000.0,OR,973xx,USA,A,A5,Source Verified,8600.0,Individual,,
202d9f56ecb7c3bc9...,police officer,7 years,OWN,85000.0,TX,799xx,USA,A,A5,Source Verified,272384.0,Individual,,
e5a140c0922b554b9...,community living ...,6 years,RENT,48000.0,NY,146xx,USA,B,B2,Source Verified,85092.0,Individual,,
e12aefc548f750777...,Office,10+ years,OWN,33000.0,CT,067xx,USA,F,F1,Verified,7100.0,Individual,,
1b3a50d854fbbf97e...,Special Tooling I...,10+ years,MORTGAGE,81000.0,TX,791xx,USA,E,E5,Verified,190274.0,Individual,,
1c4329e5f17697127...,Mine ops tech 6,2 years,MORTGAGE,68000.0,AZ,855xx,USA,C,C3,Not Verified,182453.0,Individual,,
5026c86ad983175eb...,caregiver,4 years,RENT,76020.0,WA,993xx,USA,C,C2,Source Verified,15308.0,Individual,,
9847d8c1e9d0b2084...,,,OWN,65000.0,IL,624xx,USA,E,E3,Verified,128800.0,Individual,,
8340dbe1adea41fb4...,Vice President Re...,8 years,MORTGAGE,111000.0,CT,063xx,USA,A,A1,Not Verified,343507.0,Individual,,
d4de0de3ab7d79ad4...,FOREMAN,10+ years,MORTGAGE,67000.0,WA,992xx,USA,G,G2,Verified,211501.0,Individual,,


In [7]:
from pyspark.sql.functions import current_timestamp

In [8]:
customer_df_renamed1 = customer_df_renamed.withColumn("ingest_date",current_timestamp())

In [9]:
customer_df_renamed1

member_id,emp_title,emp_length,home_ownership,annualincome,addr_state,zip_code,country,grade,sub_grade,verification_status,tot_hi_cred_lim,application_type,join_annual_income,verification_status_joint,ingest_date
b59d80da191f5b573...,,,RENT,50000.0,OR,973xx,USA,A,A5,Source Verified,8600.0,Individual,,,2025-01-09 06:23:...
202d9f56ecb7c3bc9...,police officer,7 years,OWN,85000.0,TX,799xx,USA,A,A5,Source Verified,272384.0,Individual,,,2025-01-09 06:23:...
e5a140c0922b554b9...,community living ...,6 years,RENT,48000.0,NY,146xx,USA,B,B2,Source Verified,85092.0,Individual,,,2025-01-09 06:23:...
e12aefc548f750777...,Office,10+ years,OWN,33000.0,CT,067xx,USA,F,F1,Verified,7100.0,Individual,,,2025-01-09 06:23:...
1b3a50d854fbbf97e...,Special Tooling I...,10+ years,MORTGAGE,81000.0,TX,791xx,USA,E,E5,Verified,190274.0,Individual,,,2025-01-09 06:23:...
1c4329e5f17697127...,Mine ops tech 6,2 years,MORTGAGE,68000.0,AZ,855xx,USA,C,C3,Not Verified,182453.0,Individual,,,2025-01-09 06:23:...
5026c86ad983175eb...,caregiver,4 years,RENT,76020.0,WA,993xx,USA,C,C2,Source Verified,15308.0,Individual,,,2025-01-09 06:23:...
9847d8c1e9d0b2084...,,,OWN,65000.0,IL,624xx,USA,E,E3,Verified,128800.0,Individual,,,2025-01-09 06:23:...
8340dbe1adea41fb4...,Vice President Re...,8 years,MORTGAGE,111000.0,CT,063xx,USA,A,A1,Not Verified,343507.0,Individual,,,2025-01-09 06:23:...
d4de0de3ab7d79ad4...,FOREMAN,10+ years,MORTGAGE,67000.0,WA,992xx,USA,G,G2,Verified,211501.0,Individual,,,2025-01-09 06:23:...


In [10]:
customer_df_renamed1.printSchema()

root
 |-- member_id: string (nullable = true)
 |-- emp_title: string (nullable = true)
 |-- emp_length: string (nullable = true)
 |-- home_ownership: string (nullable = true)
 |-- annualincome: string (nullable = true)
 |-- addr_state: string (nullable = true)
 |-- zip_code: string (nullable = true)
 |-- country: string (nullable = true)
 |-- grade: string (nullable = true)
 |-- sub_grade: string (nullable = true)
 |-- verification_status: string (nullable = true)
 |-- tot_hi_cred_lim: double (nullable = true)
 |-- application_type: string (nullable = true)
 |-- join_annual_income: string (nullable = true)
 |-- verification_status_joint: string (nullable = true)
 |-- ingest_date: timestamp (nullable = false)



In [11]:
customer_df_renamed1.count()

2260701

In [12]:
customers_distinct = customer_df_renamed1.distinct()

In [13]:
customers_distinct.count()

2260638

In [14]:
customers_distinct.createOrReplaceTempView("customers")

In [15]:
customers_income_filtered = spark.sql("select * from customers where annualincome is not null")

In [16]:
customers_income_filtered

member_id,emp_title,emp_length,home_ownership,annualincome,addr_state,zip_code,country,grade,sub_grade,verification_status,tot_hi_cred_lim,application_type,join_annual_income,verification_status_joint,ingest_date
4bc4ad1a0316cae20...,Table Games Super...,10+ years,RENT,60000.0,NV,891xx,USA,D,D1,Source Verified,56303.0,Individual,,,2025-01-09 06:23:...
0288c25190a653ddd...,Owner,5 years,MORTGAGE,40000.0,WA,981xx,USA,B,B1,Verified,118292.0,Individual,,,2025-01-09 06:23:...
fad773e0f4eeef14c...,Store manager,10+ years,MORTGAGE,84000.0,NC,281xx,USA,C,C5,Source Verified,194444.0,Individual,,,2025-01-09 06:23:...
06d55a7ec4a501d15...,nurse,10+ years,MORTGAGE,98978.0,NJ,073xx,USA,C,C4,Source Verified,482813.0,Individual,,,2025-01-09 06:23:...
edaa0691990b1060b...,Office Manager,10+ years,RENT,85000.0,CA,956xx,USA,B,B3,Source Verified,17400.0,Individual,,,2025-01-09 06:23:...
d2037dd155617dee4...,Maintenance Manager,8 years,MORTGAGE,130000.0,IN,471xx,USA,C,C5,Not Verified,567940.0,Individual,,,2025-01-09 06:23:...
2d4947df94f7e662c...,Supervisor,9 years,RENT,59776.0,CA,959xx,USA,B,B5,Source Verified,96464.0,Individual,,,2025-01-09 06:23:...
7b968fbc3f681f2d7...,Administrativr ma...,10+ years,RENT,76000.0,NY,112xx,USA,C,C4,Source Verified,22500.0,Individual,,,2025-01-09 06:23:...
92c7ae48c5430f8af...,Heavy Machine Ope...,3 years,MORTGAGE,90000.0,PA,190xx,USA,C,C5,Source Verified,288256.0,Individual,,,2025-01-09 06:23:...
e89e95ce26ddfdbcb...,Courier,3 years,MORTGAGE,106000.0,NJ,077xx,USA,B,B5,Verified,460296.0,Individual,,,2025-01-09 06:23:...


In [17]:
customers_income_filtered.count()

2260634

In [18]:
customers_income_filtered.createOrReplaceTempView("customers")

In [19]:
from pyspark.sql.functions import regexp_replace, col

In [20]:
customers_emplength_cleaned = customers_income_filtered.withColumn("emp_length", regexp_replace(col("emp_length"), "(\D)",""))

In [21]:
customers_emplength_cleaned

member_id,emp_title,emp_length,home_ownership,annualincome,addr_state,zip_code,country,grade,sub_grade,verification_status,tot_hi_cred_lim,application_type,join_annual_income,verification_status_joint,ingest_date
8ac391fc8a7d24518...,Customer support ...,3,RENT,105000.0,OR,971xx,USA,C,C2,Not Verified,65200.0,Individual,,,2025-01-09 06:23:...
db40e59a31b8f366c...,Workforce Coordin...,2,MORTGAGE,175000.0,IL,605xx,USA,B,B2,Source Verified,288757.0,Individual,,,2025-01-09 06:23:...
2eb69e84de3570cff...,Radiology assistant,10,MORTGAGE,100000.0,NM,870xx,USA,C,C5,Not Verified,309294.0,Individual,,,2025-01-09 06:23:...
d48591a49149ab42f...,CEO/CFO,10,OWN,60000.0,CA,956xx,USA,B,B1,Not Verified,100085.0,Individual,,,2025-01-09 06:23:...
8e58059bb3863d669...,Driver,10,RENT,190000.0,FL,334xx,USA,A,A4,Not Verified,75000.0,Individual,,,2025-01-09 06:23:...
54e665021732b61f4...,Delivery supervisor,10,MORTGAGE,75000.0,CA,930xx,USA,B,B1,Source Verified,353630.0,Individual,,,2025-01-09 06:23:...
fd53e5c5fc9ba4c54...,Accountant,1,RENT,83500.0,IL,604xx,USA,C,C1,Source Verified,129118.0,Individual,,,2025-01-09 06:23:...
8900cff0f248033b5...,Mechanic,1,RENT,110000.0,VA,236xx,USA,B,B5,Source Verified,57399.0,Individual,,,2025-01-09 06:23:...
a2821b2a819f45b01...,Supervisor,10,MORTGAGE,98000.0,MA,011xx,USA,E,E5,Source Verified,201939.0,Individual,,,2025-01-09 06:23:...
750569c5ba3870e24...,"Cable Installer, ...",9,MORTGAGE,61811.0,NC,283xx,USA,A,A1,Source Verified,179990.0,Individual,,,2025-01-09 06:23:...


In [22]:
customers_emplength_casted = customers_emplength_cleaned.withColumn("emp_length", customers_emplength_cleaned.emp_length.cast('int'))

In [23]:
customers_emplength_casted.printSchema()

root
 |-- member_id: string (nullable = true)
 |-- emp_title: string (nullable = true)
 |-- emp_length: integer (nullable = true)
 |-- home_ownership: string (nullable = true)
 |-- annualincome: string (nullable = true)
 |-- addr_state: string (nullable = true)
 |-- zip_code: string (nullable = true)
 |-- country: string (nullable = true)
 |-- grade: string (nullable = true)
 |-- sub_grade: string (nullable = true)
 |-- verification_status: string (nullable = true)
 |-- tot_hi_cred_lim: double (nullable = true)
 |-- application_type: string (nullable = true)
 |-- join_annual_income: string (nullable = true)
 |-- verification_status_joint: string (nullable = true)
 |-- ingest_date: timestamp (nullable = false)



In [24]:
customers_emplength_casted.createOrReplaceTempView("customers")

In [25]:
 spark.sql("select * from customers where emp_length is null")

member_id,emp_title,emp_length,home_ownership,annualincome,addr_state,zip_code,country,grade,sub_grade,verification_status,tot_hi_cred_lim,application_type,join_annual_income,verification_status_joint,ingest_date
85a9456290c267660...,,,MORTGAGE,95000.0,TX,777xx,USA,C,C4,Source Verified,204908.0,Individual,,,2025-01-09 06:23:...
b13d99e47a571fdcd...,,,MORTGAGE,105000.0,OK,743xx,USA,B,B2,Verified,407912.0,Individual,,,2025-01-09 06:23:...
a86d409ba90a49142...,,,RENT,32000.0,MO,630xx,USA,D,D5,Verified,25373.0,Individual,,,2025-01-09 06:23:...
47b660060cfb23954...,,,MORTGAGE,61000.0,TN,373xx,USA,C,C4,Verified,311209.0,Individual,,,2025-01-09 06:23:...
27724d4de65a0b134...,,,MORTGAGE,33563.0,AL,357xx,USA,D,D2,Verified,149332.0,Individual,,,2025-01-09 06:23:...
65d107d545aaccb20...,,,RENT,33000.0,PA,191xx,USA,E,E3,Verified,42668.0,Individual,,,2025-01-09 06:23:...
c072448b566da89eb...,,,MORTGAGE,37884.0,MO,657xx,USA,E,E3,Verified,57605.0,Individual,,,2025-01-09 06:23:...
7f25480753ed84c06...,,,RENT,132000.0,AZ,852xx,USA,C,C2,Verified,85467.0,Individual,,,2025-01-09 06:23:...
bd622b8383da49053...,,,RENT,45000.0,MI,490xx,USA,E,E2,Verified,19800.0,Individual,,,2025-01-09 06:23:...
f6f71710ec04347db...,,,MORTGAGE,58000.0,CA,902xx,USA,C,C3,Verified,58600.0,Individual,,,2025-01-09 06:23:...


In [26]:
avg_emp_length = spark.sql("select floor(avg(emp_length)) as avg_emp_length from customers").collect()

In [27]:
avg_emp_length

[Row(avg_emp_length=6)]

In [28]:
avg_emp_duration = avg_emp_length[0][0]

In [29]:
customers_emplength_replaced = customers_emplength_casted.na.fill(avg_emp_duration, subset=['emp_length'])

In [30]:
customers_emplength_replaced

member_id,emp_title,emp_length,home_ownership,annualincome,addr_state,zip_code,country,grade,sub_grade,verification_status,tot_hi_cred_lim,application_type,join_annual_income,verification_status_joint,ingest_date
7165a91acf0cc30a3...,mail assistant,10,MORTGAGE,40000.0,VA,220xx,USA,C,C2,Not Verified,223400.0,Individual,,,2025-01-09 06:23:...
d659f5f90ca551c32...,Social worker I'VE,10,RENT,48000.0,HI,967xx,USA,F,F1,Source Verified,39554.0,Individual,,,2025-01-09 06:23:...
7b759c21bb6ca4192...,,6,MORTGAGE,13400.0,TN,377xx,USA,C,C3,Verified,58583.0,Individual,,,2025-01-09 06:23:...
480f7c46dae9bc890...,Supervisor,10,MORTGAGE,106000.0,VA,235xx,USA,B,B3,Verified,409734.0,Individual,,,2025-01-09 06:23:...
597502e50abbbf05f...,Director of Sales,8,MORTGAGE,55000.0,FL,336xx,USA,A,A2,Not Verified,168228.0,Individual,,,2025-01-09 06:23:...
b9a2d4777f7e938c5...,cashier/ lead,10,RENT,32000.0,CA,940xx,USA,D,D5,Source Verified,40656.0,Individual,,,2025-01-09 06:23:...
7458381d45fbb3285...,RN,1,RENT,58000.0,NC,282xx,USA,C,C2,Not Verified,86065.0,Individual,,,2025-01-09 06:23:...
ef8545a3f099d560e...,Customer service ...,10,OWN,45000.0,MD,211xx,USA,C,C2,Not Verified,193934.0,Individual,,,2025-01-09 06:23:...
392a222e732ab6e22...,field clerk Hydro,10,OWN,91375.0,CA,956xx,USA,D,D2,Verified,276994.0,Individual,,,2025-01-09 06:23:...
d5a4dbe570f5fc41a...,Senior Analyst,6,RENT,105000.0,CA,926xx,USA,A,A2,Source Verified,96416.0,Individual,,,2025-01-09 06:23:...


In [31]:
customers_emplength_replaced.createOrReplaceTempView("customers")

In [32]:
spark.sql("select distinct(addr_state) from customers").limit(10)

addr_state
Helping Kenya's D...
223xx
175 (total projec...
SC
AZ
I am 56 yrs. old ...
"so Plan """"C"""" is ..."
financially I mad...
but no one will l...
LA


In [33]:
spark.sql("select count(addr_state) from customers where length(addr_state)>2")

count(addr_state)
255


In [34]:
from pyspark.sql.functions import when, col, length

In [35]:
customers_state_cleaned = customers_emplength_replaced.withColumn(
    "address_state",
    when(length(col("addr_state"))> 2, "NA").otherwise(col("addr_state"))
)

In [36]:
customers_state_cleaned

member_id,emp_title,emp_length,home_ownership,annualincome,addr_state,zip_code,country,grade,sub_grade,verification_status,tot_hi_cred_lim,application_type,join_annual_income,verification_status_joint,ingest_date,address_state
2876c04fcfc9f5c22...,Home care,7,MORTGAGE,48000.0,NV,890xx,USA,C,C1,Source Verified,225700.0,Joint App,98000.0,Source Verified,2025-01-09 06:24:...,NV
b882202a058500b40...,Technician,2,RENT,36000.0,CA,913xx,USA,B,B5,Not Verified,10300.0,Individual,,,2025-01-09 06:24:...,CA
3407259d06d62b74b...,Vice President,3,RENT,112000.0,CA,917xx,USA,A,A5,Source Verified,62967.0,Individual,,,2025-01-09 06:24:...,CA
7d1ae9f56877cd7f7...,Senior Constructi...,7,MORTGAGE,84000.0,MD,216xx,USA,B,B1,Not Verified,531828.0,Individual,,,2025-01-09 06:24:...,MD
c365eed5b21a6b8c1...,Assiant Cordinator,10,RENT,62000.0,TX,770xx,USA,D,D1,Not Verified,27500.0,Individual,,,2025-01-09 06:24:...,TX
36ab9c627f8b610a9...,Receptionist,10,RENT,15857.0,PA,191xx,USA,D,D4,Verified,16900.0,Individual,,,2025-01-09 06:24:...,PA
8ee121e3aa1238fe5...,truck driver,3,OWN,65000.0,MN,559xx,USA,D,D3,Verified,125618.0,Individual,,,2025-01-09 06:24:...,MN
b38fc9fe4515eba43...,MLS,10,MORTGAGE,56000.0,LA,713xx,USA,B,B3,Verified,242322.0,Individual,,,2025-01-09 06:24:...,LA
629d231432cc6fb39...,,6,MORTGAGE,41531.76,IL,604xx,USA,C,C2,Source Verified,15100.0,Individual,,,2025-01-09 06:24:...,IL
24f3de8999f8b4379...,,6,RENT,71000.0,CA,936xx,USA,D,D5,Source Verified,53774.0,Individual,,,2025-01-09 06:24:...,CA


In [37]:
customers_state_cleaned.write \
.format("parquet") \
.mode("overwrite") \
.option("path", "/user/itv016245/lendingclubproject/raw/cleaned/customers_parquet") \
.save()

In [38]:
customers_state_cleaned.write \
.option("header", True) \
.format("csv") \
.mode("overwrite") \
.option("path", "/user/itv016245/lendingclubproject/raw/cleaned/customers_csv") \
.save()