In [93]:
from pyspark.sql import SparkSession
import getpass 
username=getpass.getuser()
spark=SparkSession. \
    builder. \
    config('spark.ui.port','0'). \
    config("spark.sql.warehouse.dir", f"/user/{username}/warehouse"). \
    config('spark.shuffle.useOldFetchProtocol', 'true'). \
    enableHiveSupport(). \
    master('yarn'). \
    getOrCreate()

### Cleaning of customers data

#### 1 create dataframe with proper datatypes

In [94]:
customer_schema = 'member_id string,emp_title  string,emp_length string,home_ownership string,annual_inc float,addr_state string,zip_code string,country string,grade string,sub_grade string,verification_status string,tot_hi_cred_lim float,application_type string,annual_inc_joint float,verification_status_joint string' 

In [95]:
customer_raw_df = spark.read \
.format('csv') \
.schema(customer_schema) \
.option('header',True) \
.load('/user/itv014325/lendingclubproject/raw/customers_data_csv')

In [96]:
customer_raw_df

member_id,emp_title,emp_length,home_ownership,annual_inc,addr_state,zip_code,country,grade,sub_grade,verification_status,tot_hi_cred_lim,application_type,annual_inc_joint,verification_status_joint
bfcec8da7b3f1fe83...,Regional Manager,8 years,RENT,130000.0,CT,069xx,USA,C,C3,Verified,135904.0,Individual,,
36399458db4295868...,,,RENT,12000.0,CA,953xx,USA,C,C1,Verified,7700.0,Joint App,62000.0,Verified
e4912b650878ca941...,Manager,6 years,MORTGAGE,103000.0,CA,922xx,USA,A,A2,Not Verified,511251.0,Individual,,
3d6f35ad2e2be4572...,Clinical Coordinator,6 years,MORTGAGE,64300.0,NM,871xx,USA,D,D1,Source Verified,233646.0,Individual,,
478e1cd37e35c20f7...,Warehouse supervisor,3 years,RENT,32000.0,NJ,073xx,USA,C,C4,Not Verified,44200.0,Individual,,
a5d42e35cc0be2789...,Sales Manager,5 years,MORTGAGE,140000.0,CA,917xx,USA,C,C5,Source Verified,455867.0,Individual,,
7f3ef55c784bf6b16...,Owner,10+ years,MORTGAGE,98000.0,NV,891xx,USA,A,A3,Source Verified,265000.0,Individual,,
beb30abaeca08f49b...,Alternate Media S...,2 years,RENT,56000.0,CA,940xx,USA,D,D1,Source Verified,75023.0,Individual,,
233e5e6797dd0a68b...,Hammerman,7 years,MORTGAGE,70000.0,OH,440xx,USA,C,C4,Source Verified,233341.0,Individual,,
c4b178ffaf80ed472...,Civil Engineer,< 1 year,RENT,120000.0,FL,321xx,USA,B,B3,Source Verified,97065.0,Individual,,


In [97]:
customer_raw_df.printSchema()

root
 |-- member_id: string (nullable = true)
 |-- emp_title: string (nullable = true)
 |-- emp_length: string (nullable = true)
 |-- home_ownership: string (nullable = true)
 |-- annual_inc: float (nullable = true)
 |-- addr_state: string (nullable = true)
 |-- zip_code: string (nullable = true)
 |-- country: string (nullable = true)
 |-- grade: string (nullable = true)
 |-- sub_grade: string (nullable = true)
 |-- verification_status: string (nullable = true)
 |-- tot_hi_cred_lim: float (nullable = true)
 |-- application_type: string (nullable = true)
 |-- annual_inc_joint: float (nullable = true)
 |-- verification_status_joint: string (nullable = true)



#### 2 Rename columns

In [98]:
customer_df_renamed = customer_raw_df.withColumnRenamed("annual_inc","annual_income") \
.withColumnRenamed("addr_state","address_state") \
.withColumnRenamed("zip_code","address_zipcode") \
.withColumnRenamed("country","address_country") \
.withColumnRenamed("tot_hi_cred_lim","total_high_credit_limit") \
.withColumnRenamed("annual_inc_joint","join_annual_income")

In [99]:
customer_df_renamed

member_id,emp_title,emp_length,home_ownership,annual_income,address_state,address_zipcode,address_country,grade,sub_grade,verification_status,total_high_credit_limit,application_type,join_annual_income,verification_status_joint
bfcec8da7b3f1fe83...,Regional Manager,8 years,RENT,130000.0,CT,069xx,USA,C,C3,Verified,135904.0,Individual,,
36399458db4295868...,,,RENT,12000.0,CA,953xx,USA,C,C1,Verified,7700.0,Joint App,62000.0,Verified
e4912b650878ca941...,Manager,6 years,MORTGAGE,103000.0,CA,922xx,USA,A,A2,Not Verified,511251.0,Individual,,
3d6f35ad2e2be4572...,Clinical Coordinator,6 years,MORTGAGE,64300.0,NM,871xx,USA,D,D1,Source Verified,233646.0,Individual,,
478e1cd37e35c20f7...,Warehouse supervisor,3 years,RENT,32000.0,NJ,073xx,USA,C,C4,Not Verified,44200.0,Individual,,
a5d42e35cc0be2789...,Sales Manager,5 years,MORTGAGE,140000.0,CA,917xx,USA,C,C5,Source Verified,455867.0,Individual,,
7f3ef55c784bf6b16...,Owner,10+ years,MORTGAGE,98000.0,NV,891xx,USA,A,A3,Source Verified,265000.0,Individual,,
beb30abaeca08f49b...,Alternate Media S...,2 years,RENT,56000.0,CA,940xx,USA,D,D1,Source Verified,75023.0,Individual,,
233e5e6797dd0a68b...,Hammerman,7 years,MORTGAGE,70000.0,OH,440xx,USA,C,C4,Source Verified,233341.0,Individual,,
c4b178ffaf80ed472...,Civil Engineer,< 1 year,RENT,120000.0,FL,321xx,USA,B,B3,Source Verified,97065.0,Individual,,


#### 3 ingestion of new column with current timestamp

In [100]:
from pyspark.sql.functions import current_timestamp

In [101]:
customer_df_ingested = customer_df_renamed.withColumn("ingestion_date",current_timestamp())

In [102]:
customer_df_ingested

member_id,emp_title,emp_length,home_ownership,annual_income,address_state,address_zipcode,address_country,grade,sub_grade,verification_status,total_high_credit_limit,application_type,join_annual_income,verification_status_joint,ingestion_date
bfcec8da7b3f1fe83...,Regional Manager,8 years,RENT,130000.0,CT,069xx,USA,C,C3,Verified,135904.0,Individual,,,2024-11-27 02:31:...
36399458db4295868...,,,RENT,12000.0,CA,953xx,USA,C,C1,Verified,7700.0,Joint App,62000.0,Verified,2024-11-27 02:31:...
e4912b650878ca941...,Manager,6 years,MORTGAGE,103000.0,CA,922xx,USA,A,A2,Not Verified,511251.0,Individual,,,2024-11-27 02:31:...
3d6f35ad2e2be4572...,Clinical Coordinator,6 years,MORTGAGE,64300.0,NM,871xx,USA,D,D1,Source Verified,233646.0,Individual,,,2024-11-27 02:31:...
478e1cd37e35c20f7...,Warehouse supervisor,3 years,RENT,32000.0,NJ,073xx,USA,C,C4,Not Verified,44200.0,Individual,,,2024-11-27 02:31:...
a5d42e35cc0be2789...,Sales Manager,5 years,MORTGAGE,140000.0,CA,917xx,USA,C,C5,Source Verified,455867.0,Individual,,,2024-11-27 02:31:...
7f3ef55c784bf6b16...,Owner,10+ years,MORTGAGE,98000.0,NV,891xx,USA,A,A3,Source Verified,265000.0,Individual,,,2024-11-27 02:31:...
beb30abaeca08f49b...,Alternate Media S...,2 years,RENT,56000.0,CA,940xx,USA,D,D1,Source Verified,75023.0,Individual,,,2024-11-27 02:31:...
233e5e6797dd0a68b...,Hammerman,7 years,MORTGAGE,70000.0,OH,440xx,USA,C,C4,Source Verified,233341.0,Individual,,,2024-11-27 02:31:...
c4b178ffaf80ed472...,Civil Engineer,< 1 year,RENT,120000.0,FL,321xx,USA,B,B3,Source Verified,97065.0,Individual,,,2024-11-27 02:31:...


#### 4 remove duplicates

In [103]:
customer_df_ingested.count()

2260701

In [104]:
customer_df_distinct = customer_df_ingested.distinct()

In [105]:
customer_df_distinct.count()

2260638

#### 5 remove rows where annual income is null 

In [106]:
customer_df_distinct.createOrReplaceTempView("customers")

In [107]:
spark.sql("select * from customers limit 5")

member_id,emp_title,emp_length,home_ownership,annual_income,address_state,address_zipcode,address_country,grade,sub_grade,verification_status,total_high_credit_limit,application_type,join_annual_income,verification_status_joint,ingestion_date
dc450b16903e42804...,Design Supervisor,10+ years,MORTGAGE,110000.0,MI,480xx,USA,B,B5,Source Verified,220741.0,Individual,,,2024-11-27 02:32:...
503072e98746b4984...,PTA,2 years,RENT,94000.0,AZ,855xx,USA,A,A3,Source Verified,234083.0,Individual,,,2024-11-27 02:32:...
4d55b61ff26df21b9...,inspection,10+ years,OWN,60840.0,KY,402xx,USA,D,D3,Not Verified,182418.0,Individual,,,2024-11-27 02:32:...
f075aa38a51d20294...,Claims Examiner,4 years,RENT,27000.0,FL,337xx,USA,A,A3,Not Verified,66835.0,Individual,,,2024-11-27 02:32:...
6bd5e77c47d99bac4...,President,5 years,OWN,55000.0,MS,390xx,USA,D,D1,Source Verified,30820.0,Individual,,,2024-11-27 02:32:...


In [108]:
spark.sql(""" select count(1) from customers where annual_income is null
""")

count(1)
5


In [109]:
spark.sql(""" select count(*) from from customers where annual_income is not null""")

from
2260633


In [110]:
customers_income_filtered = spark.sql("select * from customers where annual_income is not null")

In [111]:
customers_income_filtered

member_id,emp_title,emp_length,home_ownership,annual_income,address_state,address_zipcode,address_country,grade,sub_grade,verification_status,total_high_credit_limit,application_type,join_annual_income,verification_status_joint,ingestion_date
4e4a66a1a44845fcc...,Simulation Coordi...,10+ years,MORTGAGE,75000.0,NV,895xx,USA,A,A5,Not Verified,353171.0,Individual,,,2024-11-27 02:32:...
36e6676aba97d33d4...,Mortgage Loan Pro...,2 years,RENT,50000.0,FL,323xx,USA,C,C1,Not Verified,57834.0,Individual,,,2024-11-27 02:32:...
63286baa7acdd1cd5...,Medical Coder,4 years,MORTGAGE,65000.0,AZ,857xx,USA,C,C3,Not Verified,264289.0,Individual,,,2024-11-27 02:32:...
e36e30e0f0ac81074...,Material specialist,10+ years,RENT,47174.0,PA,172xx,USA,E,E5,Not Verified,57493.0,Individual,,,2024-11-27 02:32:...
dddfb95150c484bc0...,,,MORTGAGE,74000.0,NV,891xx,USA,B,B2,Not Verified,259606.0,Individual,,,2024-11-27 02:32:...
9ab08ee3e41e0d595...,Server,1 year,RENT,15000.0,FL,334xx,USA,F,F5,Verified,14336.0,Individual,,,2024-11-27 02:32:...
2a2c5b05dd792ab14...,Line lead,< 1 year,RENT,32000.0,WI,532xx,USA,A,A1,Source Verified,87944.0,Individual,,,2024-11-27 02:32:...
68beb834846f21561...,Franchisee/ Owner,4 years,MORTGAGE,98300.0,TX,770xx,USA,E,E4,Not Verified,185272.0,Individual,,,2024-11-27 02:32:...
8dbaa75f49b9b4ae4...,commercial desk,10+ years,MORTGAGE,58000.0,MI,497xx,USA,C,C1,Source Verified,168471.0,Individual,,,2024-11-27 02:32:...
79d76a3fd5f54fd73...,Security Specialist,< 1 year,RENT,118000.0,NY,142xx,USA,A,A4,Not Verified,75800.0,Individual,,,2024-11-27 02:32:...


In [112]:
customers_income_filtered.createOrReplaceTempView("customers_income_filtered")

In [113]:
spark.sql("select count(*) from customers_income_filtered")

count(1)
2260633


#### convert emp_length to integer

In [114]:
spark.sql("select distinct(emp_length) from customers_income_filtered").show()

+----------+
|emp_length|
+----------+
|   5 years|
|   9 years|
|      null|
|    1 year|
|   2 years|
|   7 years|
|   8 years|
|   4 years|
|   6 years|
|   3 years|
| 10+ years|
|  < 1 year|
+----------+



In [115]:
from pyspark.sql.functions import regexp_replace, col

In [116]:
customers_emplength_cleaned = customers_income_filtered.withColumn("emp_length",regexp_replace(col("emp_length"),"(\D)",""))

In [117]:
customers_emplength_cleaned

member_id,emp_title,emp_length,home_ownership,annual_income,address_state,address_zipcode,address_country,grade,sub_grade,verification_status,total_high_credit_limit,application_type,join_annual_income,verification_status_joint,ingestion_date
fdc7159945735c5c9...,Estee Lauder,2.0,RENT,113000.0,NY,100xx,USA,C,C1,Not Verified,,Individual,,,2024-11-27 02:33:...
8ba603fd5fb4e99b9...,Livable Forest Fence,6.0,MORTGAGE,42000.0,TX,773xx,USA,A,A5,Not Verified,,Individual,,,2024-11-27 02:33:...
35c7b1f45bbb91d5e...,FunGoPlay,3.0,MORTGAGE,132000.0,NJ,070xx,USA,C,C1,Not Verified,,Individual,,,2024-11-27 02:33:...
e7bc49dd0dc8b149d...,EBMS,7.0,MORTGAGE,78000.0,MT,591xx,USA,B,B1,Not Verified,,Individual,,,2024-11-27 02:33:...
185e2391829026f1a...,USAA,10.0,MORTGAGE,57000.0,AZ,853xx,USA,C,C5,Verified,,Individual,,,2024-11-27 02:33:...
5856bab1b1fb0dfa8...,Duke Energy,10.0,MORTGAGE,135000.0,NC,287xx,USA,A,A4,Source Verified,,Individual,,,2024-11-27 02:33:...
59ab6fd8219efc969...,DTCC,10.0,RENT,63700.0,NY,112xx,USA,C,C4,Not Verified,,Individual,,,2024-11-27 02:33:...
6505d0990adbb1ae2...,Stonehenge Advisors,1.0,RENT,30000.0,NJ,080xx,USA,A,A3,Source Verified,,Individual,,,2024-11-27 02:33:...
36adee5ac261d7b5a...,,,RENT,86400.0,CA,920xx,USA,B,B2,Verified,,Individual,,,2024-11-27 02:33:...
a423c37785e2b8762...,The Sherwin-Willi...,1.0,MORTGAGE,115200.0,FL,323xx,USA,A,A2,Source Verified,,Individual,,,2024-11-27 02:33:...


In [118]:
customers_emplength_cleaned.printSchema()

root
 |-- member_id: string (nullable = true)
 |-- emp_title: string (nullable = true)
 |-- emp_length: string (nullable = true)
 |-- home_ownership: string (nullable = true)
 |-- annual_income: float (nullable = true)
 |-- address_state: string (nullable = true)
 |-- address_zipcode: string (nullable = true)
 |-- address_country: string (nullable = true)
 |-- grade: string (nullable = true)
 |-- sub_grade: string (nullable = true)
 |-- verification_status: string (nullable = true)
 |-- total_high_credit_limit: float (nullable = true)
 |-- application_type: string (nullable = true)
 |-- join_annual_income: float (nullable = true)
 |-- verification_status_joint: string (nullable = true)
 |-- ingestion_date: timestamp (nullable = false)



In [119]:
customers_emp_lenght_casted = customers_emplength_cleaned.withColumn("emp_length",customers_emplength_cleaned.emp_length.cast('int'))

In [120]:
customers_emp_lenght_casted

member_id,emp_title,emp_length,home_ownership,annual_income,address_state,address_zipcode,address_country,grade,sub_grade,verification_status,total_high_credit_limit,application_type,join_annual_income,verification_status_joint,ingestion_date
4bd94ddd4bcb36da0...,,,MORTGAGE,28000.0,KY,400xx,USA,C,C4,Verified,38505.0,Individual,,,2024-11-27 02:33:...
8e3b5d19e9ae66187...,Portfolio Manager,10.0,MORTGAGE,150000.0,CT,066xx,USA,B,B1,Not Verified,340526.0,Individual,,,2024-11-27 02:33:...
bee05b905fcac97d5...,Transportation Su...,2.0,MORTGAGE,65000.0,NJ,087xx,USA,A,A5,Verified,85124.0,Individual,,,2024-11-27 02:33:...
dd3f8f0eefc43c649...,Abf,5.0,RENT,48000.0,VA,234xx,USA,C,C1,Verified,84441.0,Individual,,,2024-11-27 02:33:...
b8612e97733ea8b0b...,Ops Manager,10.0,OWN,188000.0,NY,117xx,USA,E,E1,Not Verified,130493.0,Individual,,,2024-11-27 02:33:...
8d0e471f3a1ebe8b1...,Unit Manager,10.0,MORTGAGE,98000.0,PA,160xx,USA,A,A3,Verified,265854.0,Individual,,,2024-11-27 02:33:...
e255601833e59fe57...,Plans review Tech,9.0,RENT,53000.0,CO,802xx,USA,B,B3,Source Verified,40373.0,Individual,,,2024-11-27 02:33:...
5d7e7089f85c307c5...,Director Mail Ser...,10.0,MORTGAGE,75000.0,CA,935xx,USA,B,B1,Not Verified,353480.0,Individual,,,2024-11-27 02:33:...
6feeeca8d783a17ce...,,,MORTGAGE,18984.0,GA,302xx,USA,D,D3,Verified,162422.0,Individual,,,2024-11-27 02:33:...
8cfaaad65b7c80eaf...,Analyst,10.0,RENT,75000.0,GA,300xx,USA,D,D5,Source Verified,82874.0,Individual,,,2024-11-27 02:33:...


In [121]:
customers_emp_lenght_casted.printSchema()

root
 |-- member_id: string (nullable = true)
 |-- emp_title: string (nullable = true)
 |-- emp_length: integer (nullable = true)
 |-- home_ownership: string (nullable = true)
 |-- annual_income: float (nullable = true)
 |-- address_state: string (nullable = true)
 |-- address_zipcode: string (nullable = true)
 |-- address_country: string (nullable = true)
 |-- grade: string (nullable = true)
 |-- sub_grade: string (nullable = true)
 |-- verification_status: string (nullable = true)
 |-- total_high_credit_limit: float (nullable = true)
 |-- application_type: string (nullable = true)
 |-- join_annual_income: float (nullable = true)
 |-- verification_status_joint: string (nullable = true)
 |-- ingestion_date: timestamp (nullable = false)



#### 7 we need to replace all the nulls in emp_length column with average of this column

In [122]:
customers_emp_lenght_casted.filter("emp_length is null").count()

146903

In [123]:
customers_emp_lenght_casted.createOrReplaceTempView("customers")

In [124]:
avg_emp_length = spark.sql("select floor(avg(emp_length)) as avg_emp_length from customers").collect()

In [125]:
print(avg_emp_length)

[Row(avg_emp_length=6)]


In [126]:
avg_emp_duration = avg_emp_length[0][0]

In [127]:
print(avg_emp_duration)

6


In [128]:
customers_emp_lenght_replaced = customers_emp_lenght_casted.na.fill(avg_emp_duration,subset=['emp_length'])

In [129]:
customers_emp_lenght_replaced.filter("emp_length is null").count()

0

#### 8 clean the address_state (it should be 2 characters only), replace all others with NA

In [130]:
customers_emp_lenght_replaced.createOrReplaceTempView("customers")

In [131]:
spark.sql("select distinct(address_state) from customers").show()

+--------------------+
|       address_state|
+--------------------+
|Helping Kenya's D...|
|               223xx|
|175 (total projec...|
|                  AZ|
|                  SC|
|so Plan ""C"" is ...|
|I am 56 yrs. old ...|
|financially I mad...|
|but no one will l...|
|                  LA|
|         etc.  First|
|                  MN|
|yet Capital One n...|
|               499xx|
|               850xx|
|Advocate business...|
|and MBA's are ove...|
|Eliminating Credi...|
|               662xx|
|               951xx|
+--------------------+
only showing top 20 rows



In [132]:
spark.sql("select count(address_state) from customers where length(address_state)>2")

count(address_state)
254


In [133]:
from pyspark.sql.functions import when, col, length

In [134]:
customers_state_cleaned = customers_emp_lenght_replaced.withColumn(
    "address_state",
    when(length(col("address_state"))>2,"NA").otherwise(col("address_state"))
)

In [135]:
customers_state_cleaned

member_id,emp_title,emp_length,home_ownership,annual_income,address_state,address_zipcode,address_country,grade,sub_grade,verification_status,total_high_credit_limit,application_type,join_annual_income,verification_status_joint,ingestion_date
e2b2c42b0a212a2e2...,personal banker,5,MORTGAGE,90000.0,MN,554xx,USA,C,C3,Not Verified,362093.0,Individual,,,2024-11-27 02:35:...
662b630bb1dff9a65...,Software Engineer...,10,MORTGAGE,79458.68,PA,153xx,USA,B,B3,Not Verified,205945.0,Individual,,,2024-11-27 02:35:...
7d38d72b728a56cc1...,SUPERVISOR,5,MORTGAGE,65500.0,TN,385xx,USA,B,B5,Source Verified,564171.0,Individual,,,2024-11-27 02:35:...
c6bc0f71045c1ac94...,LPN,1,OWN,37500.0,AL,351xx,USA,C,C2,Verified,59116.0,Individual,,,2024-11-27 02:35:...
5f9eec6bbd7092dac...,Staff nurse,8,RENT,130000.0,CA,950xx,USA,C,C1,Source Verified,103070.0,Individual,,,2024-11-27 02:35:...
e94489557b4ff7c81...,Utilities Service,10,MORTGAGE,37960.0,IN,473xx,USA,C,C2,Not Verified,25386.0,Individual,,,2024-11-27 02:35:...
5472a7ba8c894e407...,Manager,5,MORTGAGE,60000.0,DE,199xx,USA,C,C1,Verified,192581.0,Individual,,,2024-11-27 02:35:...
a0fca4b34084e7075...,Operations Manager,3,MORTGAGE,125000.0,CA,956xx,USA,C,C5,Verified,560455.0,Individual,,,2024-11-27 02:35:...
f6bef55b4a1ed18a6...,Operator,10,MORTGAGE,110000.0,NJ,080xx,USA,E,E5,Verified,450917.0,Individual,,,2024-11-27 02:35:...
7f9f496d75559e8eb...,Admin Services/Se...,4,RENT,32000.0,NH,038xx,USA,C,C1,Source Verified,39366.0,Individual,,,2024-11-27 02:35:...


In [136]:
customers_state_cleaned.select("address_state").distinct()

address_state
AZ
SC
LA
MN
NJ
DC
OR
""
VA
""


In [137]:
customers_state_cleaned.write \
.format("parquet") \
.mode("overwrite") \
.option("path","/user/itv014325/lendingclubproject/cleaned/customers_parquet") \
.save()

In [None]:
customers_state_cleaned.write \
.option("header",True) \
.format("csv") \
.mode("overwrite") \
.option("path","/user/itv014325/lendingclubproject/cleaned/customers_csv") \
.save()