In [1]:
from pyspark.sql import SparkSession
import getpass 
# OS user running the notebook; used to build the per-user HDFS warehouse path
# (previously computed but unused while the path hardcoded one user).
username = getpass.getuser()

# Hive-enabled Spark session on YARN.
#   spark.ui.port = 0                    -> let Spark pick a free port for its UI
#   spark.sql.warehouse.dir              -> per-user Hive warehouse directory
#   spark.shuffle.useOldFetchProtocol    -> presumably needed for an older external
#                                           shuffle service on this cluster (TODO confirm)
# NOTE(review): the table-style DataFrame outputs in this notebook suggest
# spark.sql.repl.eagerEval.enabled is set outside this cell; confirm.
spark = (
    SparkSession.builder
    .config("spark.ui.port", "0")
    .config("spark.sql.warehouse.dir", f"/user/{username}/warehouse")
    .config("spark.shuffle.useOldFetchProtocol", "true")
    .enableHiveSupport()
    .master("yarn")
    .getOrCreate()
)

In [2]:
# DDL-style schema for the raw customers CSV, one "name type" pair per column,
# joined into the comma-separated string Spark's reader accepts.
customer_schema = ", ".join([
    "member_id string",
    "emp_title string",
    "emp_length string",
    "home_ownership string",
    "annual_inc float",
    "addr_state string",
    "zip_code string",
    "country string",
    "grade string",
    "sub_grade string",
    "verification_status string",
    "tot_hi_cred_lim float",
    "application_type string",
    "annual_inc_joint float",
    "verification_status_joint string",
])

In [3]:
# Load the raw customers CSV: the first line is a header, and column types
# come from the explicit customer_schema instead of being inferred.
customers_raw_df = spark.read.csv(
    "/user/itv012740/lendingclubproject/raw/customers_data_csv",
    schema=customer_schema,
    header=True,
)

In [5]:
# Preview the raw customer rows loaded with the explicit schema.
customers_raw_df

member_id,emp_title,emp_length,home_ownership,annual_inc,addr_state,zip_code,country,grade,sub_grade,verification_status,tot_hi_cred_lim,application_type,annual_inc_joint,verification_status_joint
707271898dcabc8b2...,Physician Service...,3 years,RENT,40400.0,CO,801xx,USA,A,A2,Not Verified,68759.0,Individual,,
8e1ea10aca3c4ad8f...,Operations,10+ years,MORTGAGE,53000.0,AR,720xx,USA,B,B2,Source Verified,63143.0,Individual,,
1d6546a2cbc1fd240...,Underwriter,2 years,RENT,65000.0,ME,040xx,USA,B,B4,Not Verified,66695.0,Individual,,
d6208beced388988f...,Crome restorer sp...,10+ years,MORTGAGE,60000.0,IL,606xx,USA,C,C1,Not Verified,68900.0,Individual,,
b4af936688c28c165...,Program Coordinator,1 year,RENT,38000.0,FL,322xx,USA,A,A5,Not Verified,76877.0,Individual,,
2c04e047879ada04e...,Executive Director,10+ years,MORTGAGE,166000.0,IL,601xx,USA,C,C2,Not Verified,217868.0,Individual,,
39dfcd293cb7b2c17...,Emergency Managme...,4 years,MORTGAGE,81000.0,TX,761xx,USA,C,C4,Not Verified,293276.0,Individual,,
5e6e1f8ad59c71a0b...,Clinical Applicat...,3 years,MORTGAGE,82000.0,CO,801xx,USA,A,A1,Not Verified,393500.0,Individual,,
afd3b57e55eb95ed8...,Systems Analyst 3,4 years,OWN,118030.0,MI,482xx,USA,A,A3,Not Verified,82137.0,Individual,,
8b5eed45ac53a0238...,Director of Front...,4 years,RENT,62000.0,NY,110xx,USA,A,A5,Not Verified,17400.0,Individual,,


In [6]:
# Confirm the explicit schema was applied (annual_inc, tot_hi_cred_lim and
# annual_inc_joint are float; everything else is string).
customers_raw_df.printSchema()

root
 |-- member_id: string (nullable = true)
 |-- emp_title: string (nullable = true)
 |-- emp_length: string (nullable = true)
 |-- home_ownership: string (nullable = true)
 |-- annual_inc: float (nullable = true)
 |-- addr_state: string (nullable = true)
 |-- zip_code: string (nullable = true)
 |-- country: string (nullable = true)
 |-- grade: string (nullable = true)
 |-- sub_grade: string (nullable = true)
 |-- verification_status: string (nullable = true)
 |-- tot_hi_cred_lim: float (nullable = true)
 |-- application_type: string (nullable = true)
 |-- annual_inc_joint: float (nullable = true)
 |-- verification_status_joint: string (nullable = true)



In [12]:
# Map terse source column names to descriptive ones (old name -> new name).
# NOTE(review): "join_annual_income" looks like a typo for "joint_annual_income";
# it is kept as-is because downstream consumers may rely on this name.
customer_column_renames = {
    "annual_inc": "annual_income",
    "addr_state": "address_state",
    "zip_code": "address_zipcode",
    "country": "address_country",
    "tot_hi_cred_lim": "total_high_credit_limit",
    "annual_inc_joint": "join_annual_income",
}

customer_df_renamed = customers_raw_df
for source_name, target_name in customer_column_renames.items():
    customer_df_renamed = customer_df_renamed.withColumnRenamed(source_name, target_name)

In [13]:
# Preview the data with the renamed columns.
customer_df_renamed

member_id,emp_title,emp_length,home_ownership,annual_income,address_state,address_zipcode,address_country,grade,sub_grade,verification_status,total_high_credit_limit,application_type,join_annual_income,verification_status_joint
707271898dcabc8b2...,Physician Service...,3 years,RENT,40400.0,CO,801xx,USA,A,A2,Not Verified,68759.0,Individual,,
8e1ea10aca3c4ad8f...,Operations,10+ years,MORTGAGE,53000.0,AR,720xx,USA,B,B2,Source Verified,63143.0,Individual,,
1d6546a2cbc1fd240...,Underwriter,2 years,RENT,65000.0,ME,040xx,USA,B,B4,Not Verified,66695.0,Individual,,
d6208beced388988f...,Crome restorer sp...,10+ years,MORTGAGE,60000.0,IL,606xx,USA,C,C1,Not Verified,68900.0,Individual,,
b4af936688c28c165...,Program Coordinator,1 year,RENT,38000.0,FL,322xx,USA,A,A5,Not Verified,76877.0,Individual,,
2c04e047879ada04e...,Executive Director,10+ years,MORTGAGE,166000.0,IL,601xx,USA,C,C2,Not Verified,217868.0,Individual,,
39dfcd293cb7b2c17...,Emergency Managme...,4 years,MORTGAGE,81000.0,TX,761xx,USA,C,C4,Not Verified,293276.0,Individual,,
5e6e1f8ad59c71a0b...,Clinical Applicat...,3 years,MORTGAGE,82000.0,CO,801xx,USA,A,A1,Not Verified,393500.0,Individual,,
afd3b57e55eb95ed8...,Systems Analyst 3,4 years,OWN,118030.0,MI,482xx,USA,A,A3,Not Verified,82137.0,Individual,,
8b5eed45ac53a0238...,Director of Front...,4 years,RENT,62000.0,NY,110xx,USA,A,A5,Not Verified,17400.0,Individual,,


In [14]:
from pyspark.sql.functions import current_timestamp

In [15]:
# Append an ingest_date column stamped with the load time. current_timestamp()
# gives one value per query, but it is re-evaluated on every action, so
# different previews show different timestamps.
customers_df_ingestd = customer_df_renamed.select(
    "*",
    current_timestamp().alias("ingest_date"),
)

In [16]:
# Preview with the ingest_date column appended at the end.
customers_df_ingestd

member_id,emp_title,emp_length,home_ownership,annual_income,address_state,address_zipcode,address_country,grade,sub_grade,verification_status,total_high_credit_limit,application_type,join_annual_income,verification_status_joint,ingest_date
707271898dcabc8b2...,Physician Service...,3 years,RENT,40400.0,CO,801xx,USA,A,A2,Not Verified,68759.0,Individual,,,2024-12-15 05:53:...
8e1ea10aca3c4ad8f...,Operations,10+ years,MORTGAGE,53000.0,AR,720xx,USA,B,B2,Source Verified,63143.0,Individual,,,2024-12-15 05:53:...
1d6546a2cbc1fd240...,Underwriter,2 years,RENT,65000.0,ME,040xx,USA,B,B4,Not Verified,66695.0,Individual,,,2024-12-15 05:53:...
d6208beced388988f...,Crome restorer sp...,10+ years,MORTGAGE,60000.0,IL,606xx,USA,C,C1,Not Verified,68900.0,Individual,,,2024-12-15 05:53:...
b4af936688c28c165...,Program Coordinator,1 year,RENT,38000.0,FL,322xx,USA,A,A5,Not Verified,76877.0,Individual,,,2024-12-15 05:53:...
2c04e047879ada04e...,Executive Director,10+ years,MORTGAGE,166000.0,IL,601xx,USA,C,C2,Not Verified,217868.0,Individual,,,2024-12-15 05:53:...
39dfcd293cb7b2c17...,Emergency Managme...,4 years,MORTGAGE,81000.0,TX,761xx,USA,C,C4,Not Verified,293276.0,Individual,,,2024-12-15 05:53:...
5e6e1f8ad59c71a0b...,Clinical Applicat...,3 years,MORTGAGE,82000.0,CO,801xx,USA,A,A1,Not Verified,393500.0,Individual,,,2024-12-15 05:53:...
afd3b57e55eb95ed8...,Systems Analyst 3,4 years,OWN,118030.0,MI,482xx,USA,A,A3,Not Verified,82137.0,Individual,,,2024-12-15 05:53:...
8b5eed45ac53a0238...,Director of Front...,4 years,RENT,62000.0,NY,110xx,USA,A,A5,Not Verified,17400.0,Individual,,,2024-12-15 05:53:...


#### 4. Remove complete duplicate rows

In [17]:
# Row count before removing duplicates (compare with the count after distinct()).
customers_df_ingestd.count()

2260701

In [18]:
# Drop rows that are identical across every column. dropDuplicates() with no
# subset behaves like distinct(). ingest_date does not keep duplicates apart
# because it holds a single value within one query.
customers_distinct = customers_df_ingestd.dropDuplicates()

In [19]:
# Row count after de-duplication; the difference from the previous count is
# the number of exact duplicate rows removed.
customers_distinct.count()

2260638

In [20]:
# Register the de-duplicated data as the `customers` temp view so the
# following cells can query it with SQL.
customers_distinct.createOrReplaceTempView("customers")

In [21]:
# Preview the `customers` view.
spark.sql("select * from customers")

member_id,emp_title,emp_length,home_ownership,annual_income,address_state,address_zipcode,address_country,grade,sub_grade,verification_status,total_high_credit_limit,application_type,join_annual_income,verification_status_joint,ingest_date
1d60164155b7d2a09...,Owner,10+ years,MORTGAGE,96000.0,TX,760xx,USA,A,A2,Verified,324931.0,Individual,,,2024-12-15 05:54:...
d009c76829b993a4a...,Family advocate,3 years,MORTGAGE,35000.0,PA,191xx,USA,C,C3,Verified,166796.0,Individual,,,2024-12-15 05:54:...
0481001f05c93b070...,,,RENT,30480.0,MI,482xx,USA,A,A4,Not Verified,38776.0,Individual,,,2024-12-15 05:54:...
b575f78cd5ab8ae86...,Operator,< 1 year,RENT,51000.0,NY,117xx,USA,C,C4,Verified,93641.0,Joint App,158000.0,Verified,2024-12-15 05:54:...
34bab7f2f87c39040...,Internet sales ma...,< 1 year,RENT,100000.0,CA,946xx,USA,B,B2,Source Verified,42869.0,Individual,,,2024-12-15 05:54:...
7d06b966f23bcf0dd...,,,MORTGAGE,45000.0,DE,199xx,USA,C,C3,Verified,354406.0,Individual,,,2024-12-15 05:54:...
519a1884da62a8217...,lead therapist,5 years,RENT,45000.0,MA,015xx,USA,B,B2,Source Verified,38928.0,Joint App,148000.0,Source Verified,2024-12-15 05:54:...
2d8242514989ed4dd...,Computer Operator,10+ years,MORTGAGE,750003.0,CA,921xx,USA,A,A2,Source Verified,362300.0,Individual,,,2024-12-15 05:54:...
110254b21bad4cd15...,Wellness Associate,< 1 year,OWN,24960.0,GA,300xx,USA,C,C4,Source Verified,11334.0,Individual,,,2024-12-15 05:54:...
25471cfaf8aaea4e9...,Executive Chef,8 years,MORTGAGE,79000.0,MS,386xx,USA,D,D2,Verified,149900.0,Individual,,,2024-12-15 05:54:...


In [22]:
# Count customers with a missing annual_income in the current `customers` view.
spark.sql("select count(*) from customers where annual_income is null")

count(1)
5


In [23]:
# Keep only customers whose annual income is known, querying the `customers` view.
income_not_null_query = "select * from customers where annual_income is not null"
customers_income_filtered = spark.sql(income_not_null_query)

In [24]:
# Re-point the `customers` view at the income-filtered data, replacing the
# earlier registration.
customers_income_filtered.createOrReplaceTempView("customers")

In [25]:
# Verify the filter: no rows with a null annual_income should remain.
spark.sql("select count(*) from customers where annual_income is null")

count(1)
0


In [26]:
# Inspect the distinct raw emp_length values (text such as "5 years" or "") to
# decide how to clean them.
spark.sql("select distinct(emp_length) from customers")

emp_length
5 years
9 years
""
1 year
2 years
7 years
8 years
4 years
6 years
3 years


In [27]:
from pyspark.sql.functions import regexp_replace, col

In [28]:
# Strip every non-digit character from emp_length, e.g. "10+ years" -> "10",
# "3 years" -> "3". The pattern is a raw string so "\D" reaches Spark's regex
# engine verbatim without Python's invalid-escape-sequence warning. The capture
# group in the old "(\D)" pattern was unnecessary when replacing with "".
# NOTE(review): "< 1 year" also becomes "1", merging it with "1 year";
# confirm whether it should map to 0 instead.
customers_emplength_cleaned = customers_income_filtered.withColumn(
    "emp_length",
    regexp_replace(col("emp_length"), r"\D", ""),
)

In [29]:
# Preview after stripping non-digit characters from emp_length.
customers_emplength_cleaned

member_id,emp_title,emp_length,home_ownership,annual_income,address_state,address_zipcode,address_country,grade,sub_grade,verification_status,total_high_credit_limit,application_type,join_annual_income,verification_status_joint,ingest_date
6e59482480a86baf1...,TRX,2.0,MORTGAGE,150000.0,SC,290xx,USA,A,A3,Not Verified,,Individual,,,2024-12-15 05:55:...
84e0ce11e550a6c0f...,mayo clinic roche...,4.0,MORTGAGE,41008.0,MN,559xx,USA,D,D2,Not Verified,,Individual,,,2024-12-15 05:55:...
f60dc956e619f9fb8...,,,MORTGAGE,25000.0,AZ,850xx,USA,A,A3,Source Verified,,Individual,,,2024-12-15 05:55:...
35de8763344168d36...,SC Education Lottery,10.0,MORTGAGE,50000.0,SC,296xx,USA,B,B3,Not Verified,,Individual,,,2024-12-15 05:55:...
69d359742b011aacb...,Raytheon Company,10.0,RENT,144720.0,CA,930xx,USA,D,D2,Verified,,Individual,,,2024-12-15 05:55:...
88e5c7b828ea3966e...,Northeast Redistr...,5.0,MORTGAGE,44000.0,VA,226xx,USA,E,E5,Verified,,Individual,,,2024-12-15 05:55:...
a6c046d5b4554ff5c...,nestle purina,5.0,OWN,60000.0,GA,302xx,USA,D,D3,Verified,,Individual,,,2024-12-15 05:55:...
7cf885603d3ab6792...,Redlands Unified ...,8.0,OWN,75000.0,CA,925xx,USA,B,B2,Verified,,Individual,,,2024-12-15 05:55:...
db870aadc97f30f3b...,Interstate hotels...,8.0,OWN,55000.0,TX,750xx,USA,C,C1,Source Verified,,Individual,,,2024-12-15 05:55:...
05f7a34bdcdaf3667...,Federal Aviation ...,9.0,MORTGAGE,116000.0,NM,871xx,USA,E,E2,Verified,,Individual,,,2024-12-15 05:55:...


In [30]:
# emp_length is still a string column at this point.
customers_emplength_cleaned.printSchema()

root
 |-- member_id: string (nullable = true)
 |-- emp_title: string (nullable = true)
 |-- emp_length: string (nullable = true)
 |-- home_ownership: string (nullable = true)
 |-- annual_income: float (nullable = true)
 |-- address_state: string (nullable = true)
 |-- address_zipcode: string (nullable = true)
 |-- address_country: string (nullable = true)
 |-- grade: string (nullable = true)
 |-- sub_grade: string (nullable = true)
 |-- verification_status: string (nullable = true)
 |-- total_high_credit_limit: float (nullable = true)
 |-- application_type: string (nullable = true)
 |-- join_annual_income: float (nullable = true)
 |-- verification_status_joint: string (nullable = true)
 |-- ingest_date: timestamp (nullable = false)



In [31]:
# emp_length now holds only digit strings (or ""), so cast it to an integer.
# Unparseable values such as "" become null (assumes ANSI mode is off; TODO confirm).
customers_emplength_casted = customers_emplength_cleaned.withColumn(
    "emp_length",
    col("emp_length").cast("int"),
)

In [32]:
# Preview after casting emp_length to int.
customers_emplength_casted

member_id,emp_title,emp_length,home_ownership,annual_income,address_state,address_zipcode,address_country,grade,sub_grade,verification_status,total_high_credit_limit,application_type,join_annual_income,verification_status_joint,ingest_date
95479952a09c82f81...,Retail personal b...,10.0,MORTGAGE,45000.0,MI,480xx,USA,C,C5,Verified,46402.0,Joint App,95000.0,Verified,2024-12-15 05:55:...
e490e5dd80f6eef89...,Music Teacher,1.0,RENT,45000.0,NY,135xx,USA,C,C4,Not Verified,71515.0,Individual,,,2024-12-15 05:55:...
9c4f39ba56bef4285...,Supervisor,10.0,MORTGAGE,78000.0,NY,147xx,USA,C,C2,Source Verified,309722.0,Individual,,,2024-12-15 05:55:...
6f928c148abe420cc...,Social Work,3.0,RENT,63000.0,SD,577xx,USA,C,C5,Verified,257651.0,Individual,,,2024-12-15 05:55:...
1ddb4436b7b0c1415...,Managing partner,3.0,MORTGAGE,120088.0,CA,945xx,USA,D,D5,Verified,63590.0,Individual,,,2024-12-15 05:55:...
0153910e4720863ab...,Chief instructor,10.0,MORTGAGE,62000.0,AZ,853xx,USA,E,E3,Verified,385367.0,Individual,,,2024-12-15 05:55:...
aed25052d608aa0b4...,Investigator,10.0,MORTGAGE,82000.0,TX,775xx,USA,B,B5,Verified,116004.0,Individual,,,2024-12-15 05:55:...
b52f17d70f81a1511...,,,MORTGAGE,32712.0,MS,389xx,USA,A,A5,Verified,106809.0,Individual,,,2024-12-15 05:55:...
e6bf7f9186532cd7a...,Teacher/Coach,10.0,OWN,78986.0,GA,319xx,USA,D,D5,Source Verified,215456.0,Individual,,,2024-12-15 05:55:...
b02d2397cf468b732...,Sergeant,10.0,MORTGAGE,190000.0,WA,982xx,USA,B,B3,Source Verified,682991.0,Individual,,,2024-12-15 05:55:...


In [33]:
# Confirm emp_length is now an integer column.
customers_emplength_casted.printSchema()

root
 |-- member_id: string (nullable = true)
 |-- emp_title: string (nullable = true)
 |-- emp_length: integer (nullable = true)
 |-- home_ownership: string (nullable = true)
 |-- annual_income: float (nullable = true)
 |-- address_state: string (nullable = true)
 |-- address_zipcode: string (nullable = true)
 |-- address_country: string (nullable = true)
 |-- grade: string (nullable = true)
 |-- sub_grade: string (nullable = true)
 |-- verification_status: string (nullable = true)
 |-- total_high_credit_limit: float (nullable = true)
 |-- application_type: string (nullable = true)
 |-- join_annual_income: float (nullable = true)
 |-- verification_status_joint: string (nullable = true)
 |-- ingest_date: timestamp (nullable = false)



#### 7. Replace all nulls in the emp_length column with the (floored) average of that column

In [34]:
# How many rows have no usable emp_length after the cast (e.g. blank values)?
null_emp_length_rows = customers_emplength_casted.filter("emp_length is null")
null_emp_length_rows.count()

146903

In [35]:
# Register the casted data as the `customers` view so the average can be computed in SQL.
customers_emplength_casted.createOrReplaceTempView("customers")

In [36]:
# Average emp_length, rounded down to a whole number of years. avg() ignores nulls.
# collect() returns a list holding a single Row.
avg_emp_length_query = "select floor(avg(emp_length)) as avg_emp_length from customers"
avg_emp_length = spark.sql(avg_emp_length_query).collect()

In [37]:
# Inspect the collected result: a list containing one Row.
print(avg_emp_length)

[Row(avg_emp_length=6)]


In [38]:
# Unpack the scalar from the first (and only) Row; the query selects exactly one column.
(avg_emp_duration,) = avg_emp_length[0]

In [39]:
# The scalar value that will replace null emp_length entries.
print(avg_emp_duration)

6


In [40]:
# Impute missing emp_length with the floored average. Only that column is
# touched; fillna is the DataFrame-level alias of na.fill.
customers_emplength_replaced = customers_emplength_casted.fillna(
    avg_emp_duration,
    subset=["emp_length"],
)

In [41]:
# Preview after filling null emp_length values.
customers_emplength_replaced

member_id,emp_title,emp_length,home_ownership,annual_income,address_state,address_zipcode,address_country,grade,sub_grade,verification_status,total_high_credit_limit,application_type,join_annual_income,verification_status_joint,ingest_date
c3d0ce3ab4edd215f...,owner,7,RENT,45000.0,PA,194xx,USA,B,B4,Not Verified,95410.0,Joint App,122500.0,Not Verified,2024-12-15 05:56:...
1490283fd9e87f568...,Technician,7,OWN,55000.0,NY,114xx,USA,B,B1,Not Verified,90390.0,Individual,,,2024-12-15 05:56:...
24f25160182f918f4...,Forklift driver,10,MORTGAGE,75000.0,CA,959xx,USA,D,D5,Source Verified,264393.0,Joint App,98000.0,Source Verified,2024-12-15 05:56:...
81ee6f7b78b493e7d...,Deputy Chief Exec...,5,MORTGAGE,146000.0,PA,191xx,USA,A,A2,Not Verified,321027.0,Individual,,,2024-12-15 05:56:...
efaf4fb854e3b2fbf...,Sous chef,3,RENT,40000.0,CO,803xx,USA,B,B1,Source Verified,97903.0,Individual,,,2024-12-15 05:56:...
7b823ac777337238e...,,6,RENT,37620.0,CA,956xx,USA,C,C1,Source Verified,18100.0,Individual,,,2024-12-15 05:56:...
95a33e46a8f59311f...,Sales,3,MORTGAGE,57000.0,NY,105xx,USA,E,E4,Verified,183735.0,Individual,,,2024-12-15 05:56:...
4b4ea7c591ae86271...,Office Manager,10,MORTGAGE,57000.0,MS,390xx,USA,D,D3,Source Verified,141564.0,Individual,,,2024-12-15 05:56:...
5f797a0c3eed500eb...,Conductor,10,MORTGAGE,150000.0,NJ,072xx,USA,C,C5,Source Verified,246081.0,Individual,,,2024-12-15 05:56:...
f39aab264023064a5...,,6,OWN,17000.0,PA,182xx,USA,C,C3,Verified,6700.0,Individual,,,2024-12-15 05:56:...


In [42]:
# Verify the imputation: no null emp_length should remain.
remaining_null_emp_length = customers_emplength_replaced.filter("emp_length is null")
remaining_null_emp_length.count()

0

In [43]:
# Register the imputed data as the `customers` view for the address_state checks.
customers_emplength_replaced.createOrReplaceTempView("customers")

In [44]:
# Inspect distinct address_state values. Some are free text or zip-like strings
# rather than two-letter state codes.
spark.sql("select distinct(address_state) from customers")

address_state
Helping Kenya's D...
223xx
175 (total projec...
AZ
SC
I am 56 yrs. old ...
"so Plan """"C"""" is ..."
financially I mad...
but no one will l...
LA


In [45]:
# Count address_state values longer than two characters, i.e. values that
# cannot be two-letter state codes.
spark.sql("select count(address_state) from customers where length(address_state)>2")

count(address_state)
254


In [46]:
from pyspark.sql.functions import when, col, length

In [47]:
# Replace address_state values longer than two characters (not state codes)
# with "NA". Shorter values, including "", and nulls are kept unchanged:
# length(null) is null, so `when` falls through to otherwise().
is_overlong_state = length(col("address_state")) > 2
customers_state_cleaned = customers_emplength_replaced.withColumn(
    "address_state",
    when(is_overlong_state, "NA").otherwise(col("address_state")),
)

In [48]:
# Preview after cleaning address_state.
customers_state_cleaned

member_id,emp_title,emp_length,home_ownership,annual_income,address_state,address_zipcode,address_country,grade,sub_grade,verification_status,total_high_credit_limit,application_type,join_annual_income,verification_status_joint,ingest_date
ce191f43f5c78b91a...,Supervisor,10,MORTGAGE,104000.0,MI,482xx,USA,C,C3,Source Verified,84852.0,Individual,,,2024-12-15 05:56:...
285ab34b0ca60ee0c...,,6,MORTGAGE,60000.0,TX,796xx,USA,B,B1,Not Verified,276696.0,Individual,,,2024-12-15 05:56:...
a3fb2bc90f651e2b4...,Rn,1,RENT,91000.0,TX,774xx,USA,C,C1,Source Verified,101704.0,Individual,,,2024-12-15 05:56:...
533ae0cb0749f1687...,Production Coordi...,3,MORTGAGE,40000.0,TX,761xx,USA,C,C2,Verified,155376.0,Individual,,,2024-12-15 05:56:...
02fa74c0ff360cc66...,Maintenance,3,RENT,42000.0,OR,970xx,USA,B,B5,Source Verified,49000.0,Individual,,,2024-12-15 05:56:...
860219576cd7a20a1...,,6,OWN,100000.0,UT,846xx,USA,C,C3,Verified,114195.0,Individual,,,2024-12-15 05:56:...
bd1cc32eb6ca527c8...,VP Signals and Co...,5,RENT,159000.0,TX,760xx,USA,B,B1,Not Verified,159398.0,Individual,,,2024-12-15 05:56:...
5fb03a5c551347897...,Assembly,10,OWN,35000.0,MI,480xx,USA,B,B2,Not Verified,155142.0,Individual,,,2024-12-15 05:56:...
9d09e0cdb6da70c82...,Lift Driver,2,RENT,31900.0,NC,274xx,USA,D,D2,Verified,31132.0,Individual,,,2024-12-15 05:56:...
a119ca5e1a567a6f3...,Technician,10,MORTGAGE,58000.0,WI,531xx,USA,C,C3,Not Verified,358951.0,Individual,,,2024-12-15 05:56:...


In [49]:
# Re-check distinct states after cleaning; over-long values should now be "NA".
# NOTE(review): "" still appears (its length is 0, so it is not replaced). The
# second "" entry may be a rendered null; confirm whether these should also become "NA".
customers_state_cleaned.select("address_state").distinct()

address_state
AZ
SC
LA
MN
NJ
DC
OR
""
VA
""


In [50]:
# Persist the cleaned customers as Parquet, replacing any previous output.
customers_state_cleaned.write.parquet(
    "/user/itv012740/lendingclubproject/raw/cleaned/customers_parquet",
    mode="overwrite",
)

In [51]:
# Also persist the cleaned customers as CSV with a header row, replacing any
# previous output.
customers_state_cleaned.write.csv(
    "/user/itv012740/lendingclubproject/raw/cleaned/customers_csv",
    mode="overwrite",
    header=True,
)