In [7]:
from pyspark.sql import SparkSession 
import getpass
username = getpass.getuser ()
spark = SparkSession.\
    builder. \
    config('spark-ui.port', '0'). \
    config('spark-shuffle.use0ldFetchProtocol', 'true'). \
    config ("spark.sql warehouse.dir", f"/user/(username)/warehouse"). \
    enableHiveSupport(). \
    master ('yarn'). \
    getOrCreate()


#### Creating a dataframe


In [8]:
customers_schema = 'member_id string , emp_title string, emp_length string, home_ownership string, annual_inc float, addr_state string, zip_code string, country string, grade string, sub_grade string, verification_status string, tot_hi_cred_lim float, application_type string, annual_inc_joint float , verification_status_joint string'

In [9]:
customers_raw_df = spark.read \
.format("csv") \
.option("header","True") \
.schema(customers_schema) \
.option("inferSchema","True") \
.load("/public/trendytech/lendingclubproject/raw/customers_data_csv")

In [10]:
customers_raw_df

member_id,emp_title,emp_length,home_ownership,annual_inc,addr_state,zip_code,country,grade,sub_grade,verification_status,tot_hi_cred_lim,application_type,annual_inc_joint,verification_status_joint
b59d80da191f5b573...,,,RENT,50000.0,OR,973xx,USA,A,A5,Source Verified,8600.0,Individual,,
202d9f56ecb7c3bc9...,police officer,7 years,OWN,85000.0,TX,799xx,USA,A,A5,Source Verified,272384.0,Individual,,
e5a140c0922b554b9...,community living ...,6 years,RENT,48000.0,NY,146xx,USA,B,B2,Source Verified,85092.0,Individual,,
e12aefc548f750777...,Office,10+ years,OWN,33000.0,CT,067xx,USA,F,F1,Verified,7100.0,Individual,,
1b3a50d854fbbf97e...,Special Tooling I...,10+ years,MORTGAGE,81000.0,TX,791xx,USA,E,E5,Verified,190274.0,Individual,,
1c4329e5f17697127...,Mine ops tech 6,2 years,MORTGAGE,68000.0,AZ,855xx,USA,C,C3,Not Verified,182453.0,Individual,,
5026c86ad983175eb...,caregiver,4 years,RENT,76020.0,WA,993xx,USA,C,C2,Source Verified,15308.0,Individual,,
9847d8c1e9d0b2084...,,,OWN,65000.0,IL,624xx,USA,E,E3,Verified,128800.0,Individual,,
8340dbe1adea41fb4...,Vice President Re...,8 years,MORTGAGE,111000.0,CT,063xx,USA,A,A1,Not Verified,343507.0,Individual,,
d4de0de3ab7d79ad4...,FOREMAN,10+ years,MORTGAGE,67000.0,WA,992xx,USA,G,G2,Verified,211501.0,Individual,,


In [11]:
customers_raw_df.printSchema()

root
 |-- member_id: string (nullable = true)
 |-- emp_title: string (nullable = true)
 |-- emp_length: string (nullable = true)
 |-- home_ownership: string (nullable = true)
 |-- annual_inc: float (nullable = true)
 |-- addr_state: string (nullable = true)
 |-- zip_code: string (nullable = true)
 |-- country: string (nullable = true)
 |-- grade: string (nullable = true)
 |-- sub_grade: string (nullable = true)
 |-- verification_status: string (nullable = true)
 |-- tot_hi_cred_lim: float (nullable = true)
 |-- application_type: string (nullable = true)
 |-- annual_inc_joint: float (nullable = true)
 |-- verification_status_joint: string (nullable = true)



####RENAME COLUMNS

In [12]:
customers_raw_renamed = customers_raw_df.withColumnRenamed("annual_inc","annual_income") \
.withColumnRenamed("addr_state","address_state") \
.withColumnRenamed("zip_code","address_zipcode") \
.withColumnRenamed("country","address_country") \
.withColumnRenamed("tot_hi_cred_lim","total_high_credit_limit") \
.withColumnRenamed("annual_inc_joint","join_annual_income") 

In [13]:
customers_raw_renamed

member_id,emp_title,emp_length,home_ownership,annual_income,address_state,address_zipcode,address_country,grade,sub_grade,verification_status,total_high_credit_limit,application_type,join_annual_income,verification_status_joint
b59d80da191f5b573...,,,RENT,50000.0,OR,973xx,USA,A,A5,Source Verified,8600.0,Individual,,
202d9f56ecb7c3bc9...,police officer,7 years,OWN,85000.0,TX,799xx,USA,A,A5,Source Verified,272384.0,Individual,,
e5a140c0922b554b9...,community living ...,6 years,RENT,48000.0,NY,146xx,USA,B,B2,Source Verified,85092.0,Individual,,
e12aefc548f750777...,Office,10+ years,OWN,33000.0,CT,067xx,USA,F,F1,Verified,7100.0,Individual,,
1b3a50d854fbbf97e...,Special Tooling I...,10+ years,MORTGAGE,81000.0,TX,791xx,USA,E,E5,Verified,190274.0,Individual,,
1c4329e5f17697127...,Mine ops tech 6,2 years,MORTGAGE,68000.0,AZ,855xx,USA,C,C3,Not Verified,182453.0,Individual,,
5026c86ad983175eb...,caregiver,4 years,RENT,76020.0,WA,993xx,USA,C,C2,Source Verified,15308.0,Individual,,
9847d8c1e9d0b2084...,,,OWN,65000.0,IL,624xx,USA,E,E3,Verified,128800.0,Individual,,
8340dbe1adea41fb4...,Vice President Re...,8 years,MORTGAGE,111000.0,CT,063xx,USA,A,A1,Not Verified,343507.0,Individual,,
d4de0de3ab7d79ad4...,FOREMAN,10+ years,MORTGAGE,67000.0,WA,992xx,USA,G,G2,Verified,211501.0,Individual,,


#### add a new column

In [14]:
from pyspark.sql.functions import current_timestamp

In [15]:
customers_df_ingested = customers_raw_renamed.withColumn("ingest_date",current_timestamp())

In [16]:
customers_df_ingested

member_id,emp_title,emp_length,home_ownership,annual_income,address_state,address_zipcode,address_country,grade,sub_grade,verification_status,total_high_credit_limit,application_type,join_annual_income,verification_status_joint,ingest_date
b59d80da191f5b573...,,,RENT,50000.0,OR,973xx,USA,A,A5,Source Verified,8600.0,Individual,,,2024-02-19 06:39:...
202d9f56ecb7c3bc9...,police officer,7 years,OWN,85000.0,TX,799xx,USA,A,A5,Source Verified,272384.0,Individual,,,2024-02-19 06:39:...
e5a140c0922b554b9...,community living ...,6 years,RENT,48000.0,NY,146xx,USA,B,B2,Source Verified,85092.0,Individual,,,2024-02-19 06:39:...
e12aefc548f750777...,Office,10+ years,OWN,33000.0,CT,067xx,USA,F,F1,Verified,7100.0,Individual,,,2024-02-19 06:39:...
1b3a50d854fbbf97e...,Special Tooling I...,10+ years,MORTGAGE,81000.0,TX,791xx,USA,E,E5,Verified,190274.0,Individual,,,2024-02-19 06:39:...
1c4329e5f17697127...,Mine ops tech 6,2 years,MORTGAGE,68000.0,AZ,855xx,USA,C,C3,Not Verified,182453.0,Individual,,,2024-02-19 06:39:...
5026c86ad983175eb...,caregiver,4 years,RENT,76020.0,WA,993xx,USA,C,C2,Source Verified,15308.0,Individual,,,2024-02-19 06:39:...
9847d8c1e9d0b2084...,,,OWN,65000.0,IL,624xx,USA,E,E3,Verified,128800.0,Individual,,,2024-02-19 06:39:...
8340dbe1adea41fb4...,Vice President Re...,8 years,MORTGAGE,111000.0,CT,063xx,USA,A,A1,Not Verified,343507.0,Individual,,,2024-02-19 06:39:...
d4de0de3ab7d79ad4...,FOREMAN,10+ years,MORTGAGE,67000.0,WA,992xx,USA,G,G2,Verified,211501.0,Individual,,,2024-02-19 06:39:...


In [17]:
customers_df_ingested.count()

2260701

In [18]:
customers_distinct = customers_df_ingested.distinct()

In [19]:
customers_distinct.count()

2260638

In [20]:
customers_distinct.createOrReplaceTempView("customers")

In [21]:
spark.sql("select * from customers")

member_id,emp_title,emp_length,home_ownership,annual_income,address_state,address_zipcode,address_country,grade,sub_grade,verification_status,total_high_credit_limit,application_type,join_annual_income,verification_status_joint,ingest_date
4591211237ab1fe81...,Director,1 year,RENT,160000.0,NY,100xx,USA,B,B4,Not Verified,57400.0,Individual,,,2024-02-19 06:40:...
5607fe14c7c596bca...,Treasury Manager,2 years,MORTGAGE,67100.0,LA,705xx,USA,A,A1,Not Verified,249350.0,Individual,,,2024-02-19 06:40:...
202714291ec771c57...,Software Engineer...,4 years,RENT,86058.0,KY,405xx,USA,A,A5,Source Verified,30700.0,Individual,,,2024-02-19 06:40:...
9bb7a085f039e3584...,Coach and Teacher,6 years,RENT,48000.0,TX,755xx,USA,C,C1,Source Verified,93380.0,Individual,,,2024-02-19 06:40:...
bcd6b23ef6af70379...,Customer Service,2 years,RENT,25324.8,NE,685xx,USA,F,F5,Verified,33000.0,Individual,,,2024-02-19 06:40:...
74f4e22ac220be66e...,,,MORTGAGE,40632.0,NV,890xx,USA,A,A2,Not Verified,329525.0,Individual,,,2024-02-19 06:40:...
635aded968f324605...,Behavior Interven...,1 year,OWN,20000.0,CA,917xx,USA,C,C1,Source Verified,9900.0,Individual,,,2024-02-19 06:40:...
29382e0a9253cafd7...,Assistant Manager,8 years,MORTGAGE,77000.0,FL,330xx,USA,B,B4,Source Verified,178277.0,Individual,,,2024-02-19 06:40:...
9eb9bc2ee637b3441...,Field operations,3 years,MORTGAGE,72000.0,AL,357xx,USA,C,C2,Source Verified,34600.0,Individual,,,2024-02-19 06:40:...
60c7b2ea84a17218b...,Supervisor,3 years,MORTGAGE,64000.0,CA,917xx,USA,C,C1,Not Verified,434959.0,Individual,,,2024-02-19 06:40:...


In [22]:
spark.sql("select count(*) from customers where annual_income is null")

count(1)
5


In [23]:
customers_income_filtered = spark.sql("select * from customers where annual_income is not null")

In [24]:
customers_income_filtered.createOrReplaceTempView("customers")

In [25]:
spark.sql("select distinct (emp_length) from customers").show()

+----------+
|emp_length|
+----------+
|   9 years|
|   5 years|
|      null|
|    1 year|
|   2 years|
|   7 years|
|   8 years|
|   4 years|
|   6 years|
|   3 years|
| 10+ years|
|  < 1 year|
+----------+



In [26]:
from pyspark.sql.functions import regexp_replace,col

In [27]:
customers_emplength_cleaned = customers_income_filtered.withColumn("emp_length",regexp_replace(col("emp_length"),"(\D)",""))

In [28]:
customers_emplength_cleaned

member_id,emp_title,emp_length,home_ownership,annual_income,address_state,address_zipcode,address_country,grade,sub_grade,verification_status,total_high_credit_limit,application_type,join_annual_income,verification_status_joint,ingest_date
47f1d894253e49028...,Vendor Management...,1.0,RENT,38000.0,GA,303xx,USA,D,D5,Verified,51040.0,Individual,,,2024-02-19 06:40:...
1401e244d62c3f7ad...,Customer Service ...,1.0,RENT,32000.0,VA,223xx,USA,E,E4,Source Verified,47676.0,Individual,,,2024-02-19 06:40:...
2e893fd171ce59df3...,Senior Underwriter,3.0,MORTGAGE,115000.0,WA,981xx,USA,C,C1,Verified,1346864.0,Individual,,,2024-02-19 06:40:...
f8b45c11dcac74a60...,Pharmacy technician,2.0,RENT,24000.0,CT,061xx,USA,C,C5,Source Verified,45350.0,Individual,,,2024-02-19 06:40:...
68b3e17a15db33079...,facilities,5.0,OWN,60000.0,MI,486xx,USA,B,B5,Verified,44700.0,Individual,,,2024-02-19 06:40:...
38f4563bf310120d1...,Field Supervisor,10.0,RENT,52000.0,KY,412xx,USA,D,D5,Source Verified,42499.0,Individual,,,2024-02-19 06:40:...
81e31927252bb7f63...,Infants Teacher,4.0,RENT,19500.0,PA,184xx,USA,D,D4,Verified,25600.0,Joint App,79500.0,Verified,2024-02-19 06:40:...
d2f3047232a3ffacb...,Senior Payroll Sp...,10.0,RENT,64709.0,MD,217xx,USA,C,C2,Source Verified,65615.0,Individual,,,2024-02-19 06:40:...
e2d535b3233726ef5...,,,OWN,30696.0,NY,104xx,USA,A,A4,Not Verified,70617.0,Individual,,,2024-02-19 06:40:...
61f07d32460bd41eb...,Structual Enginee...,10.0,MORTGAGE,83471.0,IN,467xx,USA,C,C4,Not Verified,218934.0,Individual,,,2024-02-19 06:40:...


In [29]:
customers_emplength_cleaned.printSchema()

root
 |-- member_id: string (nullable = true)
 |-- emp_title: string (nullable = true)
 |-- emp_length: string (nullable = true)
 |-- home_ownership: string (nullable = true)
 |-- annual_income: float (nullable = true)
 |-- address_state: string (nullable = true)
 |-- address_zipcode: string (nullable = true)
 |-- address_country: string (nullable = true)
 |-- grade: string (nullable = true)
 |-- sub_grade: string (nullable = true)
 |-- verification_status: string (nullable = true)
 |-- total_high_credit_limit: float (nullable = true)
 |-- application_type: string (nullable = true)
 |-- join_annual_income: float (nullable = true)
 |-- verification_status_joint: string (nullable = true)
 |-- ingest_date: timestamp (nullable = false)



In [30]:
customers_emplength_casted = customers_emplength_cleaned.withColumn("emp_length",customers_emplength_cleaned.emp_length.cast('int'))

In [31]:
customers_emplength_casted

member_id,emp_title,emp_length,home_ownership,annual_income,address_state,address_zipcode,address_country,grade,sub_grade,verification_status,total_high_credit_limit,application_type,join_annual_income,verification_status_joint,ingest_date
3a02ff0f7e8bc5de2...,Manager,6.0,RENT,43000.0,CA,926xx,USA,C,C3,Not Verified,48495.0,Individual,,,2024-02-19 06:41:...
0d5ff84859788a028...,Supply Chain Manager,7.0,MORTGAGE,82000.0,TX,786xx,USA,B,B2,Not Verified,288520.0,Individual,,,2024-02-19 06:41:...
82c015a2d80c824b6...,physical therapis...,7.0,MORTGAGE,66500.0,WA,982xx,USA,B,B3,Not Verified,259926.0,Individual,,,2024-02-19 06:41:...
9e5c868073dee96c1...,Salon Manager,1.0,RENT,50000.0,AR,724xx,USA,B,B1,Source Verified,99445.0,Individual,,,2024-02-19 06:41:...
f9e5f70b0280a894c...,,,RENT,28000.0,SC,294xx,USA,C,C3,Verified,12940.0,Individual,,,2024-02-19 06:41:...
396e2d77a876e9f06...,intake coordinator,1.0,RENT,38340.0,UT,840xx,USA,B,B2,Not Verified,33328.0,Individual,,,2024-02-19 06:41:...
d9af6aba979703a44...,Automation Engineer,2.0,MORTGAGE,60000.0,OH,452xx,USA,A,A5,Verified,235000.0,Individual,,,2024-02-19 06:41:...
98153da9f9e254bbe...,Administrative As...,10.0,MORTGAGE,80000.0,TX,786xx,USA,D,D1,Verified,249317.0,Individual,,,2024-02-19 06:41:...
9deb2919b478ab578...,Tech Support Rep,6.0,MORTGAGE,41000.0,TX,799xx,USA,C,C1,Not Verified,107615.0,Individual,,,2024-02-19 06:41:...
867d45f628805225a...,Director of Insti...,1.0,MORTGAGE,86000.0,CT,060xx,USA,A,A5,Verified,231591.0,Individual,,,2024-02-19 06:41:...


In [32]:
customers_emplength_casted.printSchema()

root
 |-- member_id: string (nullable = true)
 |-- emp_title: string (nullable = true)
 |-- emp_length: integer (nullable = true)
 |-- home_ownership: string (nullable = true)
 |-- annual_income: float (nullable = true)
 |-- address_state: string (nullable = true)
 |-- address_zipcode: string (nullable = true)
 |-- address_country: string (nullable = true)
 |-- grade: string (nullable = true)
 |-- sub_grade: string (nullable = true)
 |-- verification_status: string (nullable = true)
 |-- total_high_credit_limit: float (nullable = true)
 |-- application_type: string (nullable = true)
 |-- join_annual_income: float (nullable = true)
 |-- verification_status_joint: string (nullable = true)
 |-- ingest_date: timestamp (nullable = false)



In [33]:
customers_emplength_casted.filter("emp_length is null ").count()

146903

In [34]:
customers_emplength_casted.createOrReplaceTempView("customers")

In [35]:
spark.sql("select avg(emp_length) as avg_emp_length from customers")

avg_emp_length
6.021258628112389


In [36]:
avg_emp_length = spark.sql("select floor(avg(emp_length)) as avg_emp_length from customers").collect()

In [37]:
print(avg_emp_length )

[Row(avg_emp_length=6)]


In [38]:
avg_emp_duration = print( avg_emp_length)

[Row(avg_emp_length=6)]
