In [1]:
from pyspark.sql import SparkSession
import getpass
username = getpass.getuser()
spark = SparkSession. \
builder. \
config("spark.sql.warehouse.dir", f"/user/itv010698/warehouse"). \
enableHiveSupport(). \
master('yarn'). \
getOrCreate()

In [2]:
cust_raw_df = spark.read \
.format("csv") \
.option("inferSchema", "true") \
.option("header", "true") \
.load("lending_club_project/raw_data/customers_data_csv")

In [3]:
cust_raw_df

member_id,emp_title,emp_length,home_ownership,annual_inc,addr_state,zip_code,country,grade,sub_grade,verification_status,tot_hi_cred_lim,application_type,annual_inc_joint,verification_status_joint
90828734b5f4150d0...,Supervisor,10+ years,MORTGAGE,125000.0,KY,424xx,USA,E,E3,Verified,594540.0,Individual,,
148f7f93997c46a1f...,Security,< 1 year,RENT,54000.0,WI,532xx,USA,A,A4,Not Verified,57729.0,Individual,,
7190fbf030fe6cdda...,Manager,5 years,MORTGAGE,28000.0,AR,721xx,USA,C,C4,Verified,82780.0,Individual,,
115721e01855eec21...,Teller,3 years,RENT,26000.0,WI,532xx,USA,A,A5,Not Verified,41114.0,Individual,,
1477fb5f4e5e8c96e...,Quality Control A...,3 years,RENT,67000.0,TX,760xx,USA,F,F2,Verified,43449.0,Individual,,
577b9141f4d3e00f1...,Technician,2 years,RENT,65000.0,CA,900xx,USA,C,C3,Verified,127916.0,Joint App,118000.0,Verified
ed5ba1091dba4fa84...,Host,< 1 year,MORTGAGE,325000.0,NY,111xx,USA,B,B5,Source Verified,41000.0,Individual,,
6f138cafd3e520190...,Guest Services Su...,2 years,MORTGAGE,60000.0,AZ,850xx,USA,C,C2,Source Verified,280890.0,Individual,,
de0ba908fc731ac67...,general manager,10+ years,MORTGAGE,74000.0,GA,310xx,USA,A,A3,Source Verified,306897.0,Individual,,
f8417d62d055055ff...,Sheet Metal Mechanic,9 years,MORTGAGE,54496.0,GA,307xx,USA,B,B1,Verified,153345.0,Joint App,119496.0,Verified


In [4]:
cust_raw_df.printSchema()

root
 |-- member_id: string (nullable = true)
 |-- emp_title: string (nullable = true)
 |-- emp_length: string (nullable = true)
 |-- home_ownership: string (nullable = true)
 |-- annual_inc: string (nullable = true)
 |-- addr_state: string (nullable = true)
 |-- zip_code: string (nullable = true)
 |-- country: string (nullable = true)
 |-- grade: string (nullable = true)
 |-- sub_grade: string (nullable = true)
 |-- verification_status: string (nullable = true)
 |-- tot_hi_cred_lim: double (nullable = true)
 |-- application_type: string (nullable = true)
 |-- annual_inc_joint: string (nullable = true)
 |-- verification_status_joint: string (nullable = true)



### Change schema

In [5]:
schema = "member_id string, emp_title string, emp_length string, home_ownership string, annual_inc float, addr_state string, zip_code string, country string, grade string, subgrade string, verification_status string, tot_hi_cred_lim float, application_type string, annual_inc_joint float, verification_status_joint string"

In [6]:
cust_raw_df = spark.read \
.format("csv") \
.schema(schema) \
.option("header", "true") \
.load("lending_club_project/raw_data/customers_data_csv")

In [7]:
cust_raw_df.printSchema()

root
 |-- member_id: string (nullable = true)
 |-- emp_title: string (nullable = true)
 |-- emp_length: string (nullable = true)
 |-- home_ownership: string (nullable = true)
 |-- annual_inc: float (nullable = true)
 |-- addr_state: string (nullable = true)
 |-- zip_code: string (nullable = true)
 |-- country: string (nullable = true)
 |-- grade: string (nullable = true)
 |-- subgrade: string (nullable = true)
 |-- verification_status: string (nullable = true)
 |-- tot_hi_cred_lim: float (nullable = true)
 |-- application_type: string (nullable = true)
 |-- annual_inc_joint: float (nullable = true)
 |-- verification_status_joint: string (nullable = true)



## Rename Column

### withColumnRenamed()

In [8]:
cust_raw_renamed = cust_raw_df.withColumnRenamed("annual_inc", "annual_income") \
.withColumnRenamed("addr_state", "address_state") \
.withColumnRenamed("zip_code", "address_zipcode") \
.withColumnRenamed("country", "address_country") \
.withColumnRenamed("annual_inc_joint", "joint_annual_income") \
.withColumnRenamed("annual_inc", "annual_income")

In [9]:
cust_raw_renamed

member_id,emp_title,emp_length,home_ownership,annual_income,address_state,address_zipcode,address_country,grade,subgrade,verification_status,tot_hi_cred_lim,application_type,joint_annual_income,verification_status_joint
90828734b5f4150d0...,Supervisor,10+ years,MORTGAGE,125000.0,KY,424xx,USA,E,E3,Verified,594540.0,Individual,,
148f7f93997c46a1f...,Security,< 1 year,RENT,54000.0,WI,532xx,USA,A,A4,Not Verified,57729.0,Individual,,
7190fbf030fe6cdda...,Manager,5 years,MORTGAGE,28000.0,AR,721xx,USA,C,C4,Verified,82780.0,Individual,,
115721e01855eec21...,Teller,3 years,RENT,26000.0,WI,532xx,USA,A,A5,Not Verified,41114.0,Individual,,
1477fb5f4e5e8c96e...,Quality Control A...,3 years,RENT,67000.0,TX,760xx,USA,F,F2,Verified,43449.0,Individual,,
577b9141f4d3e00f1...,Technician,2 years,RENT,65000.0,CA,900xx,USA,C,C3,Verified,127916.0,Joint App,118000.0,Verified
ed5ba1091dba4fa84...,Host,< 1 year,MORTGAGE,325000.0,NY,111xx,USA,B,B5,Source Verified,41000.0,Individual,,
6f138cafd3e520190...,Guest Services Su...,2 years,MORTGAGE,60000.0,AZ,850xx,USA,C,C2,Source Verified,280890.0,Individual,,
de0ba908fc731ac67...,general manager,10+ years,MORTGAGE,74000.0,GA,310xx,USA,A,A3,Source Verified,306897.0,Individual,,
f8417d62d055055ff...,Sheet Metal Mechanic,9 years,MORTGAGE,54496.0,GA,307xx,USA,B,B1,Verified,153345.0,Joint App,119496.0,Verified


## 

### insert a new column named as ingestion date (current time)

In [10]:
from pyspark.sql.functions import *

In [11]:
cust_df_ingest_date = cust_raw_renamed.withColumn("ingest_date", current_timestamp())

In [12]:
cust_df_ingest_date

member_id,emp_title,emp_length,home_ownership,annual_income,address_state,address_zipcode,address_country,grade,subgrade,verification_status,tot_hi_cred_lim,application_type,joint_annual_income,verification_status_joint,ingest_date
90828734b5f4150d0...,Supervisor,10+ years,MORTGAGE,125000.0,KY,424xx,USA,E,E3,Verified,594540.0,Individual,,,2024-03-20 10:07:...
148f7f93997c46a1f...,Security,< 1 year,RENT,54000.0,WI,532xx,USA,A,A4,Not Verified,57729.0,Individual,,,2024-03-20 10:07:...
7190fbf030fe6cdda...,Manager,5 years,MORTGAGE,28000.0,AR,721xx,USA,C,C4,Verified,82780.0,Individual,,,2024-03-20 10:07:...
115721e01855eec21...,Teller,3 years,RENT,26000.0,WI,532xx,USA,A,A5,Not Verified,41114.0,Individual,,,2024-03-20 10:07:...
1477fb5f4e5e8c96e...,Quality Control A...,3 years,RENT,67000.0,TX,760xx,USA,F,F2,Verified,43449.0,Individual,,,2024-03-20 10:07:...
577b9141f4d3e00f1...,Technician,2 years,RENT,65000.0,CA,900xx,USA,C,C3,Verified,127916.0,Joint App,118000.0,Verified,2024-03-20 10:07:...
ed5ba1091dba4fa84...,Host,< 1 year,MORTGAGE,325000.0,NY,111xx,USA,B,B5,Source Verified,41000.0,Individual,,,2024-03-20 10:07:...
6f138cafd3e520190...,Guest Services Su...,2 years,MORTGAGE,60000.0,AZ,850xx,USA,C,C2,Source Verified,280890.0,Individual,,,2024-03-20 10:07:...
de0ba908fc731ac67...,general manager,10+ years,MORTGAGE,74000.0,GA,310xx,USA,A,A3,Source Verified,306897.0,Individual,,,2024-03-20 10:07:...
f8417d62d055055ff...,Sheet Metal Mechanic,9 years,MORTGAGE,54496.0,GA,307xx,USA,B,B1,Verified,153345.0,Joint App,119496.0,Verified,2024-03-20 10:07:...


### remove complete duplicate rows

In [13]:
cust_df_ingest_date.count()

2260701

In [14]:
cust_df_ingest_date.distinct().count()

2260638

In [15]:
cust_distinct_df = cust_df_ingest_date.distinct()   # This is the dataframe with removed duplicates

In [16]:
cust_distinct_df.createOrReplaceTempView("customers")

In [17]:
spark.sql("SELECT * FROM customers")

member_id,emp_title,emp_length,home_ownership,annual_income,address_state,address_zipcode,address_country,grade,subgrade,verification_status,tot_hi_cred_lim,application_type,joint_annual_income,verification_status_joint,ingest_date
181dd9a754bfae9be...,Senior Technician,10+ years,MORTGAGE,76000.0,LA,700xx,USA,B,B3,Source Verified,70245.0,Individual,,,2024-03-20 10:07:...
84559eaec5cb3444a...,Teacher/coach,10+ years,MORTGAGE,107000.0,GA,310xx,USA,D,D2,Source Verified,229245.0,Individual,,,2024-03-20 10:07:...
1b4bffb31a7f1d3cd...,Fishing tool supe...,3 years,MORTGAGE,200000.0,OH,446xx,USA,D,D1,Verified,147750.0,Individual,,,2024-03-20 10:07:...
be22c3147ef63c7d7...,Resource R.N.,10+ years,MORTGAGE,72000.0,MT,594xx,USA,A,A4,Verified,175573.0,Individual,,,2024-03-20 10:07:...
db67ea24adc7aa838...,Recruitment/Reten...,6 years,RENT,28000.0,NJ,088xx,USA,B,B5,Not Verified,47288.0,Individual,,,2024-03-20 10:07:...
28a57b5e5f95c8154...,,,RENT,30000.0,AL,350xx,USA,D,D4,Source Verified,16464.0,Individual,,,2024-03-20 10:07:...
b183aea67237243e4...,Sales,5 years,MORTGAGE,71000.0,IN,469xx,USA,C,C4,Verified,135866.0,Individual,,,2024-03-20 10:07:...
264f46dfd76aadccd...,Store Manager,10+ years,OWN,110000.0,FL,326xx,USA,D,D3,Verified,123456.0,Individual,,,2024-03-20 10:07:...
742fc1d6a6eeba680...,controller,10+ years,MORTGAGE,89000.0,FL,320xx,USA,C,C4,Verified,218268.0,Individual,,,2024-03-20 10:07:...
64880b7e92672dca2...,President,10+ years,MORTGAGE,150000.0,PA,170xx,USA,B,B3,Source Verified,176990.0,Individual,,,2024-03-20 10:07:...


### remove the rows where annual_income is null

In [18]:
spark.sql("SELECT COUNT(*) FROM customers WHERE annual_income is null")

count(1)
5


In [19]:
cust_income_filtered = spark.sql("SELECT * FROM customers WHERE annual_income is not null")

### convert emp_length to integer

In [20]:
spark.sql("SELECT DISTINCT(emp_length) FROM customers").show()

+----------+
|emp_length|
+----------+
|   5 years|
|   9 years|
|      null|
|    1 year|
| reactors"|
|   2 years|
|   7 years|
|   8 years|
|   4 years|
|   6 years|
|   3 years|
| 10+ years|
|  < 1 year|
+----------+



In [21]:
cust_emp_length_cleaned = cust_income_filtered.withColumn("emp_length", regexp_replace(col("emp_length"), "(\D)", ""))

In [22]:
cust_emp_length_cleaned

member_id,emp_title,emp_length,home_ownership,annual_income,address_state,address_zipcode,address_country,grade,subgrade,verification_status,tot_hi_cred_lim,application_type,joint_annual_income,verification_status_joint,ingest_date
90794bb907d643882...,Brand Manager,2.0,OWN,175000.0,NY,120xx,USA,C,C4,Not Verified,226600.0,Individual,,,2024-03-20 10:08:...
298c08bf2e7802054...,Teller,2.0,RENT,23784.0,GA,309xx,USA,B,B1,Source Verified,14700.0,Individual,,,2024-03-20 10:08:...
64b7c5929c9aa06f8...,teacher,10.0,OWN,55000.0,TX,754xx,USA,B,B4,Source Verified,83341.0,Individual,,,2024-03-20 10:08:...
b244e54ac03470c15...,Senior Field Supe...,2.0,MORTGAGE,85000.0,LA,704xx,USA,C,C3,Not Verified,306488.0,Individual,,,2024-03-20 10:08:...
e34dacbddec7da893...,Lineman,1.0,RENT,75000.0,LA,707xx,USA,D,D2,Source Verified,43516.0,Individual,,,2024-03-20 10:08:...
3c870307110176f6f...,SVP,4.0,RENT,550000.0,CT,068xx,USA,C,C1,Verified,37269.0,Individual,,,2024-03-20 10:08:...
5b850f49003590f0c...,Director of Quality,1.0,MORTGAGE,118300.0,OK,740xx,USA,D,D4,Source Verified,148783.0,Individual,,,2024-03-20 10:08:...
dc2dca6e21b2162c0...,secretary,4.0,OWN,55000.0,TX,754xx,USA,B,B4,Source Verified,164000.0,Individual,,,2024-03-20 10:08:...
c6e3b607b2deb6d92...,Vice President,10.0,MORTGAGE,153000.0,FL,339xx,USA,B,B1,Source Verified,306270.0,Individual,,,2024-03-20 10:08:...
a22f2114212cdead0...,,1.0,RENT,100000.0,MO,631xx,USA,D,D1,Not Verified,186373.0,Individual,,,2024-03-20 10:08:...


In [23]:
cust_emp_length_cleaned.printSchema()

root
 |-- member_id: string (nullable = true)
 |-- emp_title: string (nullable = true)
 |-- emp_length: string (nullable = true)
 |-- home_ownership: string (nullable = true)
 |-- annual_income: float (nullable = true)
 |-- address_state: string (nullable = true)
 |-- address_zipcode: string (nullable = true)
 |-- address_country: string (nullable = true)
 |-- grade: string (nullable = true)
 |-- subgrade: string (nullable = true)
 |-- verification_status: string (nullable = true)
 |-- tot_hi_cred_lim: float (nullable = true)
 |-- application_type: string (nullable = true)
 |-- joint_annual_income: float (nullable = true)
 |-- verification_status_joint: string (nullable = true)
 |-- ingest_date: timestamp (nullable = false)



In [24]:
cust_emp_length_casted = cust_emp_length_cleaned.withColumn("emp_length", cust_emp_length_cleaned.emp_length.cast("int"))

In [25]:
cust_emp_length_casted.printSchema()

root
 |-- member_id: string (nullable = true)
 |-- emp_title: string (nullable = true)
 |-- emp_length: integer (nullable = true)
 |-- home_ownership: string (nullable = true)
 |-- annual_income: float (nullable = true)
 |-- address_state: string (nullable = true)
 |-- address_zipcode: string (nullable = true)
 |-- address_country: string (nullable = true)
 |-- grade: string (nullable = true)
 |-- subgrade: string (nullable = true)
 |-- verification_status: string (nullable = true)
 |-- tot_hi_cred_lim: float (nullable = true)
 |-- application_type: string (nullable = true)
 |-- joint_annual_income: float (nullable = true)
 |-- verification_status_joint: string (nullable = true)
 |-- ingest_date: timestamp (nullable = false)



### replace all the nulls in emp_length column with average of this column

In [26]:
cust_emp_length_casted.filter("emp_length is null").count()

146903

In [27]:
cust_emp_length_casted.createOrReplaceTempView("customers")

In [28]:
avg_emp_length = spark.sql("SELECT floor(AVG(emp_length)) as avg_emp_length FROM customers").collect()
print(avg_emp_length)

[Row(avg_emp_length=6)]


In [29]:
avg_emp_duration = avg_emp_length[0][0]

In [30]:
cust_nulls_filled = cust_emp_length_casted.na.fill(avg_emp_duration, subset = ['emp_length'])

In [31]:
cust_nulls_filled

member_id,emp_title,emp_length,home_ownership,annual_income,address_state,address_zipcode,address_country,grade,subgrade,verification_status,tot_hi_cred_lim,application_type,joint_annual_income,verification_status_joint,ingest_date
b3a78dda1fb4da92f...,STORE MANAGER,2,RENT,75000.0,CA,927xx,USA,D,D1,Source Verified,98130.0,Individual,,,2024-03-20 10:08:...
2578dd8447002d070...,Police Officer,10,RENT,50932.0,TX,774xx,USA,D,D2,Source Verified,76818.0,Individual,,,2024-03-20 10:08:...
1d7adf0390924b2ba...,Security Guard,10,MORTGAGE,39000.0,MN,554xx,USA,A,A5,Source Verified,252250.0,Individual,,,2024-03-20 10:08:...
b83fa13f8387883c8...,,6,RENT,53226.32,CA,950xx,USA,C,C4,Verified,150368.0,Individual,,,2024-03-20 10:08:...
342511465d352751f...,Benefits Analyst,10,MORTGAGE,63000.0,PA,170xx,USA,B,B4,Not Verified,324215.0,Individual,,,2024-03-20 10:08:...
9a13f0c6c1ad480f3...,Medical Consultant,2,RENT,45000.0,NY,114xx,USA,B,B2,Source Verified,95700.0,Individual,,,2024-03-20 10:08:...
7e87003387fcf91e3...,District Manager,10,OWN,98000.0,CA,932xx,USA,B,B1,Not Verified,417490.0,Individual,,,2024-03-20 10:08:...
4b0fe664b2852815b...,Agent,5,MORTGAGE,0.0,FL,325xx,USA,D,D1,Verified,153771.0,Joint App,56000.0,Not Verified,2024-03-20 10:08:...
9d53d5a87d902cb5e...,Manager,3,RENT,80000.0,NY,104xx,USA,A,A3,Not Verified,10000.0,Individual,,,2024-03-20 10:08:...
878b5d7c026988719...,Claims Adjuster,3,RENT,54900.0,NJ,080xx,USA,D,D2,Source Verified,17241.0,Individual,,,2024-03-20 10:08:...


In [32]:
cust_nulls_filled.filter("emp_length is null")

member_id,emp_title,emp_length,home_ownership,annual_income,address_state,address_zipcode,address_country,grade,subgrade,verification_status,tot_hi_cred_lim,application_type,joint_annual_income,verification_status_joint,ingest_date


In [33]:
cust_nulls_filled.createOrReplaceTempView("customers")

###  clean the address_state (it should be 2 characters only), replace all others with NA

In [34]:
spark.sql("SELECT DISTINCT(address_state) FROM customers")

address_state
Helping Kenya's D...
223xx
175 (total projec...
AZ
SC
I am 56 yrs. old ...
"so Plan """"C"""" is ..."
financially I mad...
but no one will l...
LA


In [36]:
spark.sql("SELECT COUNT(address_state) FROM customers WHERE length(address_state) > 2")

count(address_state)
254


In [43]:
cust_cleaned = cust_nulls_filled.withColumn(
    "address_state",
    when(length(col("address_state")) > 2, "NA").otherwise(col("address_state"))
)

In [45]:
cust_cleaned.select("address_state").distinct()

address_state
AZ
SC
LA
MN
NJ
DC
OR
""
VA
""


In [50]:
cust_cleaned.write \
.format("parquet") \
.mode("overwrite") \
.option("path", "lending_club_project/cleaned_data/customers_cleaned_parquet") \
.save()

In [54]:
cust_cleaned.write \
.option("header","true") \
.format("csv") \
.mode("overwrite") \
.option("path", "lending_club_project/cleaned_data/customers_cleaned_csv") \
.save()