In [3]:
from pyspark.sql import SparkSession
import getpass
username = getpass.getuser()
spark = SparkSession. \
builder. \
config('spark.ui.port','0'). \
config('spark.shuffle.useOldFetchProtocol','true'). \
config("spark.sql.warehouse.dir", f"/user/itv007136/warehouse"). \
enableHiveSupport(). \
master('yarn'). \
getOrCreate()

In [4]:
spark

In [5]:
customers_df=spark.read \
.format("csv") \
.option("header","true") \
.option("inferSchema","true") \
.load("/public/trendytech/lendingclubproject/raw/customers_data_csv")

In [6]:
customers_df

member_id,emp_title,emp_length,home_ownership,annual_inc,addr_state,zip_code,country,grade,sub_grade,verification_status,tot_hi_cred_lim,application_type,annual_inc_joint,verification_status_joint
b59d80da191f5b573...,,,RENT,50000.0,OR,973xx,USA,A,A5,Source Verified,8600.0,Individual,,
202d9f56ecb7c3bc9...,police officer,7 years,OWN,85000.0,TX,799xx,USA,A,A5,Source Verified,272384.0,Individual,,
e5a140c0922b554b9...,community living ...,6 years,RENT,48000.0,NY,146xx,USA,B,B2,Source Verified,85092.0,Individual,,
e12aefc548f750777...,Office,10+ years,OWN,33000.0,CT,067xx,USA,F,F1,Verified,7100.0,Individual,,
1b3a50d854fbbf97e...,Special Tooling I...,10+ years,MORTGAGE,81000.0,TX,791xx,USA,E,E5,Verified,190274.0,Individual,,
1c4329e5f17697127...,Mine ops tech 6,2 years,MORTGAGE,68000.0,AZ,855xx,USA,C,C3,Not Verified,182453.0,Individual,,
5026c86ad983175eb...,caregiver,4 years,RENT,76020.0,WA,993xx,USA,C,C2,Source Verified,15308.0,Individual,,
9847d8c1e9d0b2084...,,,OWN,65000.0,IL,624xx,USA,E,E3,Verified,128800.0,Individual,,
8340dbe1adea41fb4...,Vice President Re...,8 years,MORTGAGE,111000.0,CT,063xx,USA,A,A1,Not Verified,343507.0,Individual,,
d4de0de3ab7d79ad4...,FOREMAN,10+ years,MORTGAGE,67000.0,WA,992xx,USA,G,G2,Verified,211501.0,Individual,,


In [7]:
customers_df.printSchema()

root
 |-- member_id: string (nullable = true)
 |-- emp_title: string (nullable = true)
 |-- emp_length: string (nullable = true)
 |-- home_ownership: string (nullable = true)
 |-- annual_inc: string (nullable = true)
 |-- addr_state: string (nullable = true)
 |-- zip_code: string (nullable = true)
 |-- country: string (nullable = true)
 |-- grade: string (nullable = true)
 |-- sub_grade: string (nullable = true)
 |-- verification_status: string (nullable = true)
 |-- tot_hi_cred_lim: double (nullable = true)
 |-- application_type: string (nullable = true)
 |-- annual_inc_joint: string (nullable = true)
 |-- verification_status_joint: string (nullable = true)



In [8]:
customers_schema="member_id string,emp_title string,emp_length string,home_ownership string,annual_inc float,addr_state string,zip_code string,country string,grade string,sub_grade string,verification_status string,tot_hi_cred_lim float,application_type string,annual_inc_joint float,verification_status_joint string"

In [9]:
customers_df=spark.read.format("csv").option("header","true").schema(customers_schema).load("/public/trendytech/lendingclubproject/raw/customers_data_csv")

In [10]:
customers_df

member_id,emp_title,emp_length,home_ownership,annual_inc,addr_state,zip_code,country,grade,sub_grade,verification_status,tot_hi_cred_lim,application_type,annual_inc_joint,verification_status_joint
b59d80da191f5b573...,,,RENT,50000.0,OR,973xx,USA,A,A5,Source Verified,8600.0,Individual,,
202d9f56ecb7c3bc9...,police officer,7 years,OWN,85000.0,TX,799xx,USA,A,A5,Source Verified,272384.0,Individual,,
e5a140c0922b554b9...,community living ...,6 years,RENT,48000.0,NY,146xx,USA,B,B2,Source Verified,85092.0,Individual,,
e12aefc548f750777...,Office,10+ years,OWN,33000.0,CT,067xx,USA,F,F1,Verified,7100.0,Individual,,
1b3a50d854fbbf97e...,Special Tooling I...,10+ years,MORTGAGE,81000.0,TX,791xx,USA,E,E5,Verified,190274.0,Individual,,
1c4329e5f17697127...,Mine ops tech 6,2 years,MORTGAGE,68000.0,AZ,855xx,USA,C,C3,Not Verified,182453.0,Individual,,
5026c86ad983175eb...,caregiver,4 years,RENT,76020.0,WA,993xx,USA,C,C2,Source Verified,15308.0,Individual,,
9847d8c1e9d0b2084...,,,OWN,65000.0,IL,624xx,USA,E,E3,Verified,128800.0,Individual,,
8340dbe1adea41fb4...,Vice President Re...,8 years,MORTGAGE,111000.0,CT,063xx,USA,A,A1,Not Verified,343507.0,Individual,,
d4de0de3ab7d79ad4...,FOREMAN,10+ years,MORTGAGE,67000.0,WA,992xx,USA,G,G2,Verified,211501.0,Individual,,


In [11]:
customers_df.printSchema()

root
 |-- member_id: string (nullable = true)
 |-- emp_title: string (nullable = true)
 |-- emp_length: string (nullable = true)
 |-- home_ownership: string (nullable = true)
 |-- annual_inc: float (nullable = true)
 |-- addr_state: string (nullable = true)
 |-- zip_code: string (nullable = true)
 |-- country: string (nullable = true)
 |-- grade: string (nullable = true)
 |-- sub_grade: string (nullable = true)
 |-- verification_status: string (nullable = true)
 |-- tot_hi_cred_lim: float (nullable = true)
 |-- application_type: string (nullable = true)
 |-- annual_inc_joint: float (nullable = true)
 |-- verification_status_joint: string (nullable = true)



### Question 2: Renaming the Columns

In [12]:
customers_df_renamed=customers_df \
.withColumnRenamed("annual_inc","annual_income") \
.withColumnRenamed("addr_state","address_state") \
.withColumnRenamed("zip_code","address_zipcode") \
.withColumnRenamed("country","address_country") \
.withColumnRenamed("tot_hi_cred_lim","total_high_credit_limit") \
.withColumnRenamed("annual_inc_joint","joint_annual_income")

In [13]:
customers_df_renamed

member_id,emp_title,emp_length,home_ownership,annual_income,address_state,address_zipcode,address_country,grade,sub_grade,verification_status,total_high_credit_limit,application_type,joint_annual_income,verification_status_joint
b59d80da191f5b573...,,,RENT,50000.0,OR,973xx,USA,A,A5,Source Verified,8600.0,Individual,,
202d9f56ecb7c3bc9...,police officer,7 years,OWN,85000.0,TX,799xx,USA,A,A5,Source Verified,272384.0,Individual,,
e5a140c0922b554b9...,community living ...,6 years,RENT,48000.0,NY,146xx,USA,B,B2,Source Verified,85092.0,Individual,,
e12aefc548f750777...,Office,10+ years,OWN,33000.0,CT,067xx,USA,F,F1,Verified,7100.0,Individual,,
1b3a50d854fbbf97e...,Special Tooling I...,10+ years,MORTGAGE,81000.0,TX,791xx,USA,E,E5,Verified,190274.0,Individual,,
1c4329e5f17697127...,Mine ops tech 6,2 years,MORTGAGE,68000.0,AZ,855xx,USA,C,C3,Not Verified,182453.0,Individual,,
5026c86ad983175eb...,caregiver,4 years,RENT,76020.0,WA,993xx,USA,C,C2,Source Verified,15308.0,Individual,,
9847d8c1e9d0b2084...,,,OWN,65000.0,IL,624xx,USA,E,E3,Verified,128800.0,Individual,,
8340dbe1adea41fb4...,Vice President Re...,8 years,MORTGAGE,111000.0,CT,063xx,USA,A,A1,Not Verified,343507.0,Individual,,
d4de0de3ab7d79ad4...,FOREMAN,10+ years,MORTGAGE,67000.0,WA,992xx,USA,G,G2,Verified,211501.0,Individual,,


### Question 3: Add column Ingestion date which will record the timestamp whenever data is processed

In [14]:
from pyspark.sql.functions import current_timestamp

In [15]:
customers_df_ingested=customers_df_renamed.withColumn("ingest_date",current_timestamp())

In [16]:
customers_df_ingested

member_id,emp_title,emp_length,home_ownership,annual_income,address_state,address_zipcode,address_country,grade,sub_grade,verification_status,total_high_credit_limit,application_type,joint_annual_income,verification_status_joint,ingest_date
b59d80da191f5b573...,,,RENT,50000.0,OR,973xx,USA,A,A5,Source Verified,8600.0,Individual,,,2024-04-03 22:37:...
202d9f56ecb7c3bc9...,police officer,7 years,OWN,85000.0,TX,799xx,USA,A,A5,Source Verified,272384.0,Individual,,,2024-04-03 22:37:...
e5a140c0922b554b9...,community living ...,6 years,RENT,48000.0,NY,146xx,USA,B,B2,Source Verified,85092.0,Individual,,,2024-04-03 22:37:...
e12aefc548f750777...,Office,10+ years,OWN,33000.0,CT,067xx,USA,F,F1,Verified,7100.0,Individual,,,2024-04-03 22:37:...
1b3a50d854fbbf97e...,Special Tooling I...,10+ years,MORTGAGE,81000.0,TX,791xx,USA,E,E5,Verified,190274.0,Individual,,,2024-04-03 22:37:...
1c4329e5f17697127...,Mine ops tech 6,2 years,MORTGAGE,68000.0,AZ,855xx,USA,C,C3,Not Verified,182453.0,Individual,,,2024-04-03 22:37:...
5026c86ad983175eb...,caregiver,4 years,RENT,76020.0,WA,993xx,USA,C,C2,Source Verified,15308.0,Individual,,,2024-04-03 22:37:...
9847d8c1e9d0b2084...,,,OWN,65000.0,IL,624xx,USA,E,E3,Verified,128800.0,Individual,,,2024-04-03 22:37:...
8340dbe1adea41fb4...,Vice President Re...,8 years,MORTGAGE,111000.0,CT,063xx,USA,A,A1,Not Verified,343507.0,Individual,,,2024-04-03 22:37:...
d4de0de3ab7d79ad4...,FOREMAN,10+ years,MORTGAGE,67000.0,WA,992xx,USA,G,G2,Verified,211501.0,Individual,,,2024-04-03 22:37:...


### Question 4 Remove Duplicate Rows from the dataset

In [17]:
customers_df_ingested.count()

2260701

In [18]:
customers_df_ingested.distinct().count()

2260638

In [19]:
customers_df_unique=customers_df_ingested.distinct()

In [20]:
customers_df_unique

member_id,emp_title,emp_length,home_ownership,annual_income,address_state,address_zipcode,address_country,grade,sub_grade,verification_status,total_high_credit_limit,application_type,joint_annual_income,verification_status_joint,ingest_date
d73add4840e38e9c7...,Teacher,4 years,RENT,72000.0,CA,946xx,USA,A,A4,Not Verified,140290.0,Individual,,,2024-04-03 22:38:...
7876aaa0f79c67b66...,Marketing Coordin...,2 years,MORTGAGE,68000.0,FL,328xx,USA,B,B1,Not Verified,42394.0,Individual,,,2024-04-03 22:38:...
8e5b970aca6dffc18...,Loan Officer,9 years,MORTGAGE,42000.0,MT,598xx,USA,A,A3,Verified,225922.0,Individual,,,2024-04-03 22:38:...
838fb4c658cbcc7af...,Waiver worker,< 1 year,MORTGAGE,15000.0,AR,719xx,USA,B,B2,Not Verified,229562.0,Joint App,68000.0,Not Verified,2024-04-03 22:38:...
4f754624e9751131e...,MEDICAL SUPPORT A...,< 1 year,RENT,44377.0,MN,554xx,USA,B,B2,Source Verified,28300.0,Joint App,92110.0,Source Verified,2024-04-03 22:38:...
5c49dad7059326288...,,,MORTGAGE,53000.0,WA,982xx,USA,A,A2,Not Verified,145100.0,Individual,,,2024-04-03 22:38:...
ad74843f3081ba459...,,,RENT,45540.0,CA,917xx,USA,B,B1,Source Verified,45989.0,Individual,,,2024-04-03 22:38:...
46cd8f2e6ebabc1bf...,Ophthalmologist A...,8 years,RENT,50000.0,CA,900xx,USA,D,D3,Verified,29163.0,Individual,,,2024-04-03 22:38:...
bd4db0131c343ee45...,Title Associate,1 year,MORTGAGE,40000.0,MD,211xx,USA,C,C4,Verified,65826.0,Individual,,,2024-04-03 22:38:...
d2fcab7eccb3ef342...,computer analyst,8 years,OWN,80000.0,TX,753xx,USA,A,A5,Not Verified,225642.0,Individual,,,2024-04-03 22:38:...


In [21]:
customers_df_unique.createOrReplaceTempView("customers")

In [22]:
spark.sql("select * from customers")

member_id,emp_title,emp_length,home_ownership,annual_income,address_state,address_zipcode,address_country,grade,sub_grade,verification_status,total_high_credit_limit,application_type,joint_annual_income,verification_status_joint,ingest_date
60cbc6ce5a397c670...,Teacher,10+ years,MORTGAGE,89000.0,CA,945xx,USA,B,B5,Source Verified,423905.0,Individual,,,2024-04-03 22:38:...
f644369bcc810a973...,Senior Manager,5 years,MORTGAGE,108000.0,NV,891xx,USA,A,A5,Source Verified,343100.0,Individual,,,2024-04-03 22:38:...
dacdc1a4145568fee...,,,RENT,37000.0,NJ,080xx,USA,C,C4,Verified,22607.0,Individual,,,2024-04-03 22:38:...
ec7b0b9620dc54126...,Associate Pastor,10+ years,MORTGAGE,73500.0,VA,244xx,USA,E,E1,Verified,89883.0,Individual,,,2024-04-03 22:38:...
2829f2b31bb578009...,Owner/Operator,10+ years,MORTGAGE,80000.0,IL,616xx,USA,B,B4,Source Verified,299322.0,Individual,,,2024-04-03 22:38:...
fd0db3755842db93c...,Process tech,4 years,OWN,55000.0,LA,707xx,USA,D,D3,Verified,67496.0,Individual,,,2024-04-03 22:38:...
e65091ed9dfb3c8a1...,Patient Experienc...,7 years,MORTGAGE,37420.0,NC,287xx,USA,E,E3,Source Verified,214816.0,Individual,,,2024-04-03 22:38:...
af99b513c97a99a9b...,Warehouse,4 years,RENT,35000.0,CA,917xx,USA,B,B2,Not Verified,35893.0,Individual,,,2024-04-03 22:38:...
5b3d622466579ac32...,Medical Office Co...,5 years,RENT,38000.0,KS,662xx,USA,E,E2,Verified,40024.0,Individual,,,2024-04-03 22:38:...
fdf9915c08f378530...,Business Anaylst,10+ years,MORTGAGE,75000.0,AZ,853xx,USA,F,F1,Source Verified,154368.0,Individual,,,2024-04-03 22:38:...


### Question 5 Remove rows where annual income is null

In [23]:
spark.sql("select count(*) as count from customers where annual_income is null")

count
5


In [24]:
customers_df_filtered=spark.sql("select * from customers where annual_income is not null")

In [25]:
customers_df_filtered.createOrReplaceTempView("customers")

In [26]:
customers_df_filtered

member_id,emp_title,emp_length,home_ownership,annual_income,address_state,address_zipcode,address_country,grade,sub_grade,verification_status,total_high_credit_limit,application_type,joint_annual_income,verification_status_joint,ingest_date
3f98a84d321e4a6f5...,LABORER,10+ years,MORTGAGE,67000.0,PA,179xx,USA,C,C1,Verified,48166.0,Individual,,,2024-04-03 22:38:...
668f6fe3f3b11ed5d...,,,OWN,32000.0,IL,604xx,USA,A,A4,Verified,36981.0,Individual,,,2024-04-03 22:38:...
b7fb17f661ea017e4...,welder,3 years,MORTGAGE,49000.0,TX,770xx,USA,C,C2,Source Verified,131640.0,Individual,,,2024-04-03 22:38:...
c1f8efe817e25c654...,Service Writer,1 year,RENT,35000.0,NV,891xx,USA,A,A5,Source Verified,86798.0,Individual,,,2024-04-03 22:38:...
224fdb0bb81297cc0...,production,10+ years,OWN,70000.0,MI,482xx,USA,D,D4,Source Verified,88684.0,Individual,,,2024-04-03 22:38:...
a6508d7359eec16e3...,Assistant Guest S...,5 years,MORTGAGE,45000.0,MO,657xx,USA,C,C1,Source Verified,164709.0,Individual,,,2024-04-03 22:38:...
0b883d4be09797273...,Do owner,5 years,RENT,45000.0,CA,920xx,USA,E,E1,Source Verified,40159.0,Individual,,,2024-04-03 22:38:...
2829f2b31bb578009...,Owner/Operator,10+ years,MORTGAGE,80000.0,IL,616xx,USA,B,B4,Source Verified,299322.0,Individual,,,2024-04-03 22:38:...
27f2b90138e817f03...,International Tea...,2 years,RENT,110000.0,GA,303xx,USA,C,C5,Source Verified,131116.0,Individual,,,2024-04-03 22:38:...
10c3d0ef2db9cfcd2...,Operations,5 years,MORTGAGE,170000.0,MD,206xx,USA,C,C5,Not Verified,496642.0,Individual,,,2024-04-03 22:38:...


In [27]:
### Question 6 Convert emp_length to integer

In [28]:
spark.sql("select distinct emp_length from customers")

emp_length
9 years
5 years
""
1 year
2 years
7 years
8 years
4 years
6 years
3 years


In [29]:
from pyspark.sql.functions import col,regexp_replace

In [30]:
customers_df_emp_length=customers_df_filtered.withColumn("emp_length",regexp_replace("emp_length","(\D)",""))

In [31]:
customers_df_emp_length.printSchema()

root
 |-- member_id: string (nullable = true)
 |-- emp_title: string (nullable = true)
 |-- emp_length: string (nullable = true)
 |-- home_ownership: string (nullable = true)
 |-- annual_income: float (nullable = true)
 |-- address_state: string (nullable = true)
 |-- address_zipcode: string (nullable = true)
 |-- address_country: string (nullable = true)
 |-- grade: string (nullable = true)
 |-- sub_grade: string (nullable = true)
 |-- verification_status: string (nullable = true)
 |-- total_high_credit_limit: float (nullable = true)
 |-- application_type: string (nullable = true)
 |-- joint_annual_income: float (nullable = true)
 |-- verification_status_joint: string (nullable = true)
 |-- ingest_date: timestamp (nullable = false)



In [32]:
customers_df_emp_length_casted=customers_df_emp_length.withColumn("emp_length",col('emp_length').cast("integer"))

In [33]:
customers_df_emp_length_casted.printSchema()

root
 |-- member_id: string (nullable = true)
 |-- emp_title: string (nullable = true)
 |-- emp_length: integer (nullable = true)
 |-- home_ownership: string (nullable = true)
 |-- annual_income: float (nullable = true)
 |-- address_state: string (nullable = true)
 |-- address_zipcode: string (nullable = true)
 |-- address_country: string (nullable = true)
 |-- grade: string (nullable = true)
 |-- sub_grade: string (nullable = true)
 |-- verification_status: string (nullable = true)
 |-- total_high_credit_limit: float (nullable = true)
 |-- application_type: string (nullable = true)
 |-- joint_annual_income: float (nullable = true)
 |-- verification_status_joint: string (nullable = true)
 |-- ingest_date: timestamp (nullable = false)



In [34]:
customers_df_emp_length_casted.createOrReplaceTempView("customers")

In [35]:
customers_df_emp_length_casted.where("emp_length is null").count()

146903

In [38]:
from pyspark.sql.functions import avg,floor

In [39]:
avg_emp_length=customers_df_emp_length_casted.agg(floor(avg("emp_length"))).collect()[0][0]

In [40]:
print(avg_emp_length)

6


In [41]:
customers_df_emp_length_replaced=customers_df_emp_length_casted.fillna(avg_emp_length,subset=["emp_length"])

In [42]:
customers_df_emp_length_replaced

member_id,emp_title,emp_length,home_ownership,annual_income,address_state,address_zipcode,address_country,grade,sub_grade,verification_status,total_high_credit_limit,application_type,joint_annual_income,verification_status_joint,ingest_date
690af1677f546cf59...,Co/owner,10,MORTGAGE,160000.0,GA,302xx,USA,A,A3,Verified,344686.0,Individual,,,2024-04-03 22:39:...
b1211ae57397145ed...,Training Director,9,MORTGAGE,57500.0,MO,648xx,USA,C,C4,Not Verified,184900.0,Joint App,89500.0,Not Verified,2024-04-03 22:39:...
71e636c49c7ed313d...,Loan Processor,2,RENT,46500.0,AZ,850xx,USA,B,B1,Source Verified,71469.0,Individual,,,2024-04-03 22:39:...
0c0ec688cf90b4fa3...,Engineer,1,RENT,118000.0,CA,945xx,USA,A,A3,Not Verified,266485.0,Individual,,,2024-04-03 22:39:...
fef83a2d59cb8471b...,,6,OWN,40000.0,CA,948xx,USA,B,B5,Verified,57000.0,Individual,,,2024-04-03 22:39:...
036e51aa0c87cbf35...,Insurance,1,MORTGAGE,65000.0,CA,907xx,USA,B,B1,Not Verified,464344.0,Individual,,,2024-04-03 22:39:...
70034f7b62a3cd8aa...,Owner,10,MORTGAGE,50000.0,TX,787xx,USA,C,C1,Not Verified,228042.0,Individual,,,2024-04-03 22:39:...
95d7905b793de05a3...,Personal Consultant,1,MORTGAGE,4500.0,TX,779xx,USA,C,C3,Not Verified,566114.0,Joint App,362500.0,Not Verified,2024-04-03 22:39:...
7cf3069cb055aaf74...,CORRECTIONAL OFFI...,5,RENT,41769.24,TX,760xx,USA,B,B3,Not Verified,294063.0,Joint App,100437.24,Not Verified,2024-04-03 22:39:...
f13260d426266ce4e...,Social Worker,1,RENT,43000.0,NY,112xx,USA,B,B4,Not Verified,123038.0,Individual,,,2024-04-03 22:39:...


### Question 8: Clean the address state, it should be 2 characters only replace all others with NA

In [43]:
customers_df_emp_length_replaced.select("address_state").distinct()

address_state
Helping Kenya's D...
223xx
175 (total projec...
AZ
SC
"so Plan """"C"""" is ..."
I am 56 yrs. old ...
financially I mad...
but no one will l...
LA


In [46]:
from pyspark.sql.functions import when,length

In [47]:
customers_cleaned=customers_df_emp_length_replaced.withColumn("address_state",when(length(col('address_state'))>2,"NA").otherwise(col('address_state')))

In [48]:
customers_cleaned.select("address_state").distinct()

address_state
SC
AZ
LA
MN
NJ
DC
OR
""
VA
""


### Write the files in parquet format back to the disk

In [49]:
customers_cleaned.write.format("parquet").mode("overwrite").option("path","/user/itv007136/lendingclubproject/cleaned/customers_parquet").save()

In [51]:
customers_cleaned.write.format("csv").mode("overwrite").option("path","/user/itv007136/lendingclubproject/cleaned/customers_csv").save()