#### Cleaning Customer Dataset - Plan of Action

1. **Initialize SparkSession**

2. **Define customer schema**

3. **Load raw customers CSV**
   - Set `header=True`
   - Apply predefined schema

4. **Inspect raw DataFrame**
   - Show sample rows
   - Print schema

5. **Rename columns for clarity**

6. **Add ingestion timestamp** (`ingest_date`)

7. **Deduplicate rows**

8. **Create temp view** `"customers"`

9. **Filter out rows with NULL** `annual_income`

10. **Clean `emp_length`**
    - Show distinct raw values
    - Remove non-digits via `regexp_replace`
    - Cast to `Integer`
    - Count resulting NULLs
    - Compute `floor(avg(emp_length))`
    - Fill NULL `emp_length` with average

11. **Clean `address_state`**
    - Inspect distinct state codes
    - Count codes where length > 2
    - Replace invalid codes (>2 chars) with `"NA"`

12. **Save final DataFrame**
    - Parquet format to `/cleaned/customers_parquet`
    - CSV format to `/cleaned/customers_csv`


#### 1. SET UP SPARK SESSION

In [1]:
# Build a SparkSession with Hive support, running on YARN

import getpass

from pyspark.sql import SparkSession

username = getpass.getuser()

spark = (
    SparkSession.builder
    .config("spark.ui.port", "0")  # 0 = bind the UI to any free port
    .config("spark.sql.warehouse.dir", f"/user/{username}/warehouse")
    .config("spark.shuffle.useOldFetchProtocol", "true")
    .enableHiveSupport()
    .master("yarn")
    .getOrCreate()
)

#### 2. DEFINE AND LOAD CUSTOMERS DATA 

In [2]:
# Define schema for customers CSV 
customer_schema = 'member_id string, emp_title string, emp_length string, home_ownership string, annual_inc float, addr_state string, zip_code string, country string, grade string, sub_grade string, verification_status string, tot_hi_cred_lim float, application_type string, annual_inc_joint float, verification_status_joint string'

In [3]:
# Read the raw customers CSV into a DataFrame
customers_raw_df = spark.read \
.format("csv") \
.option("header",True) \
.schema(customer_schema) \
.load("/public/trendytech/lendingclubproject/raw/customers_data_csv")

In [4]:
# Peek at the loaded DataFrame
customers_raw_df.show(5)

+--------------------+--------------------+----------+--------------+----------+----------+--------+-------+-----+---------+-------------------+---------------+----------------+----------------+-------------------------+
|           member_id|           emp_title|emp_length|home_ownership|annual_inc|addr_state|zip_code|country|grade|sub_grade|verification_status|tot_hi_cred_lim|application_type|annual_inc_joint|verification_status_joint|
+--------------------+--------------------+----------+--------------+----------+----------+--------+-------+-----+---------+-------------------+---------------+----------------+----------------+-------------------------+
|b59d80da191f5b573...|                null|      null|          RENT|   50000.0|        OR|   973xx|    USA|    A|       A5|    Source Verified|         8600.0|      Individual|            null|                     null|
|202d9f56ecb7c3bc9...|      police officer|   7 years|           OWN|   85000.0|        TX|   799xx|    USA|    A|  

In [5]:
# Verify that the DataFrame picked up the declared schema
customers_raw_df.printSchema()

root
 |-- member_id: string (nullable = true)
 |-- emp_title: string (nullable = true)
 |-- emp_length: string (nullable = true)
 |-- home_ownership: string (nullable = true)
 |-- annual_inc: float (nullable = true)
 |-- addr_state: string (nullable = true)
 |-- zip_code: string (nullable = true)
 |-- country: string (nullable = true)
 |-- grade: string (nullable = true)
 |-- sub_grade: string (nullable = true)
 |-- verification_status: string (nullable = true)
 |-- tot_hi_cred_lim: float (nullable = true)
 |-- application_type: string (nullable = true)
 |-- annual_inc_joint: float (nullable = true)
 |-- verification_status_joint: string (nullable = true)



#### 3. RENAME COLUMNS FOR CLARITY

In [6]:
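# Give abbreviated columns more descriptive names.
# Caution: the schema calls the credit-limit column `tot_hi_cred_lim`, so the
# rename of "tot_hi_credit_lim" matches nothing and is silently skipped;
# the outputs below therefore still show `tot_hi_cred_lim`.
customer_df_renamed = customers_raw_df.withColumnRenamed("annual_inc", "annual_income") \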
.withColumnRenamed("addr_state", "address_state") \
.withColumnRenamed("zip_code", "address_zipcode") \
.withColumnRenamed("country", "address_country") \
.withColumnRenamed("tot_hi_credit_lim", "total_high_credit_limit") \
.withColumnRenamed("annual_inc_joint", "join_annual_income")
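
`withColumnRenamed` does nothing when the source column is absent, so a misspelled name goes unnoticed until someone reads the output. A minimal sketch of a pre-flight check follows, in plain Python so it runs without a cluster. The helper names `schema_columns` and `missing_sources` are hypothetical, and the parser assumes the simple `name type, name type` DDL form used in this notebook.

```python
# Schema string and rename pairs copied from the cells above.
customer_schema = 'member_id string, emp_title string, emp_length string, home_ownership string, annual_inc float, addr_state string, zip_code string, country string, grade string, sub_grade string, verification_status string, tot_hi_cred_lim float, application_type string, annual_inc_joint float, verification_status_joint string'

renames = {
    "annual_inc": "annual_income",
    "addr_state": "address_state",
    "zip_code": "address_zipcode",
    "country": "address_country",
    "tot_hi_credit_lim": "total_high_credit_limit",
    "annual_inc_joint": "join_annual_income",
}


def schema_columns(ddl):
    """Return column names from a simple 'name type, name type' DDL string."""
    return [field.split()[0] for field in ddl.split(",")]


def missing_sources(ddl, renames):
    """Return rename sources that do not exist in the schema."""
    known = set(schema_columns(ddl))
    return [old for old in renames if old not in known]


missing = missing_sources(customer_schema, renames)
print(missing)  # ['tot_hi_credit_lim']
```

Against the live DataFrame, the same check could compare the rename sources with `customers_raw_df.columns` instead of parsing the DDL string.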

In [7]:
customer_df_renamed.show(2)

+--------------------+--------------+----------+--------------+-------------+-------------+---------------+---------------+-----+---------+-------------------+---------------+----------------+------------------+-------------------------+
|           member_id|     emp_title|emp_length|home_ownership|annual_income|address_state|address_zipcode|address_country|grade|sub_grade|verification_status|tot_hi_cred_lim|application_type|join_annual_income|verification_status_joint|
+--------------------+--------------+----------+--------------+-------------+-------------+---------------+---------------+-----+---------+-------------------+---------------+----------------+------------------+-------------------------+
|b59d80da191f5b573...|          null|      null|          RENT|      50000.0|           OR|          973xx|            USA|    A|       A5|    Source Verified|         8600.0|      Individual|              null|                     null|
|202d9f56ecb7c3bc9...|police officer|   7 years|

#### 4. ADD INGESTION TIMESTAMP

In [8]:
from pyspark.sql.functions import current_timestamp

# Tag each row with the timestamp when we ingested it
customers_df_ingestd = customer_df_renamed.withColumn("ingest_date", current_timestamp())
customers_df_ingestd.show(5)

+--------------------+--------------------+----------+--------------+-------------+-------------+---------------+---------------+-----+---------+-------------------+---------------+----------------+------------------+-------------------------+--------------------+
|           member_id|           emp_title|emp_length|home_ownership|annual_income|address_state|address_zipcode|address_country|grade|sub_grade|verification_status|tot_hi_cred_lim|application_type|join_annual_income|verification_status_joint|         ingest_date|
+--------------------+--------------------+----------+--------------+-------------+-------------+---------------+---------------+-----+---------+-------------------+---------------+----------------+------------------+-------------------------+--------------------+
|b59d80da191f5b573...|                null|      null|          RENT|      50000.0|           OR|          973xx|            USA|    A|       A5|    Source Verified|         8600.0|      Individual|       

In [9]:
# Quick counts before deduplication
print("Total rows before dedup:", customers_df_ingestd.count())

Total rows before dedup: 2260701


#### 5. DEDUPLICATE

In [10]:
customers_distinct = customers_df_ingestd.distinct()
print("Total rows after dedup:", customers_distinct.count())

Total rows after dedup: 2260638
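
`distinct()` compares whole rows, so two records collapse only when every column matches. Rows that share a `member_id` but differ in any other column are both kept. The sketch below mimics that behaviour in plain Python on made-up tuples; the sample rows are illustrative assumptions, not values from the dataset, and unlike this sketch Spark does not guarantee row order after `distinct()`.

```python
# Hypothetical rows: (member_id, emp_title, emp_length)
rows = [
    ("m1", "police officer", "7 years"),
    ("m1", "police officer", "7 years"),  # exact duplicate: removed
    ("m2", "nurse", "3 years"),
    ("m2", "nurse", "4 years"),           # same id, different value: kept
]


def distinct_rows(rows):
    """Drop rows that are identical in every column, keeping first occurrences."""
    return list(dict.fromkeys(rows))


unique = distinct_rows(rows)
print(len(rows), "->", len(unique))  # 4 -> 3
```

In the run above the counts drop from 2260701 to 2260638, so 63 fully identical rows were removed. Whether `member_id` is unique afterwards is a separate question that `distinct()` does not answer.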


In [11]:
# Make a SQL view for the next steps
customers_distinct.createOrReplaceTempView("customers")

#### 6. FILTER OUT MISSING ANNUAL INCOME

In [12]:
# How many rows have a NULL annual_income?
spark.sql("select count(*) from customers where annual_income is null").show()

+--------+
|count(1)|
+--------+
|       5|
+--------+



In [13]:
# Keep only those with a valid annual_income
customers_income_filtered = spark.sql("""
    select * from customers
    where annual_income is not null
""")

In [14]:
customers_income_filtered.createOrReplaceTempView("customers")

In [15]:
spark.sql('select * from customers limit 5')

member_id,emp_title,emp_length,home_ownership,annual_income,address_state,address_zipcode,address_country,grade,sub_grade,verification_status,tot_hi_cred_lim,application_type,join_annual_income,verification_status_joint,ingest_date
c4859e0784796249c...,Maintenance,5 years,MORTGAGE,35000.0,WA,985xx,USA,C,C3,Verified,100738.0,Individual,,,2025-04-29 05:58:...
7b1944fca4bfea51d...,Assistant Store M...,8 years,MORTGAGE,59000.0,AZ,857xx,USA,D,D4,Source Verified,81431.0,Individual,,,2025-04-29 05:58:...
2f8508959b2c820c7...,PURCHASING AGENT,10+ years,MORTGAGE,58000.0,FL,321xx,USA,B,B3,Source Verified,101925.0,Individual,,,2025-04-29 05:58:...
94f56a6900da95ab7...,,,OWN,13842.0,OR,971xx,USA,B,B4,Verified,13100.0,Individual,,,2025-04-29 05:58:...
c8ece305f5d43e4fd...,Legal Assistant,7 years,RENT,57000.0,FL,331xx,USA,D,D4,Verified,37114.0,Individual,,,2025-04-29 05:58:...


#### 7. CLEAN EMPLOYMENT LENGTH

In [16]:
# See what values exist
spark.sql("select distinct(emp_length) from customers").show()

+----------+
|emp_length|
+----------+
|   5 years|
|   9 years|
|      null|
|    1 year|
|   2 years|
|   7 years|
|   8 years|
|   4 years|
|   6 years|
|   3 years|
| 10+ years|
|  < 1 year|
+----------+



In [17]:
from pyspark.sql.functions import regexp_replace, col

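# Strip out every non-digit character (e.g. '10+ years' → '10', '< 1 year' → '1').
# The pattern is a raw string so Python does not treat \D as a string escape.
customers_emplength_cleaned = customers_income_filtered.withColumn(
    "emp_length",
    regexp_replace(col("emp_length"), r"\D", ""))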

In [18]:
# Cast the cleaned string to integer
customers_emplength_casted = customers_emplength_cleaned.withColumn(
    "emp_length",
    customers_emplength_cleaned.emp_length.cast("int")
)
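
To see what the strip-then-cast steps do to each distinct value listed earlier, here is a plain-Python sketch of the same logic using `re.sub` in place of Spark's `regexp_replace`. The helper name `clean_emp_length` is hypothetical. Note two consequences of the rule: `< 1 year` becomes `1` and `10+ years` becomes `10`, so those two buckets lose their "less than" and "or more" meaning.

```python
import re

# Distinct raw values shown in the output above (None stands in for NULL).
raw_values = [
    "5 years", "9 years", None, "1 year", "2 years", "7 years",
    "8 years", "4 years", "6 years", "3 years", "10+ years", "< 1 year",
]


def clean_emp_length(value):
    """Strip non-digits, then convert to int; NULL or digit-free input gives None."""
    if value is None:
        return None
    digits = re.sub(r"\D", "", value)
    return int(digits) if digits else None


cleaned = {value: clean_emp_length(value) for value in raw_values}
for raw, result in cleaned.items():
    print(repr(raw), "->", result)
```

The `None` results correspond to the rows that the next cell counts as NULL after the cast.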

In [19]:
# Check how many ended up null after cast
print("Null emp_length after cast:", 
      customers_emplength_casted.filter("emp_length is null").count())


Null emp_length after cast: 146903


In [20]:
# Re-register the temp view so SQL queries see the integer emp_length
customers_emplength_casted.createOrReplaceTempView("customers")

In [21]:
# Compute the floored average of emp_length (avg() skips NULLs); the next cell uses it to fill NULLs
avg_emp_length = spark.sql("select floor(avg(emp_length)) as avg_emp_length from customers") \
                     .collect()[0][0]

In [22]:
customers_emplength_replaced = customers_emplength_casted.na.fill(
    avg_emp_length, subset=["emp_length"]
)
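
The fill step relies on SQL `avg` skipping NULLs, so the average is taken over known values only and then floored to keep the column an integer. A small plain-Python sketch of that arithmetic follows; the function name `fill_with_floor_avg` and the sample list are made up for illustration.

```python
import math


def fill_with_floor_avg(values):
    """Replace None entries with floor(mean of the non-None entries)."""
    present = [v for v in values if v is not None]
    if not present:
        # Nothing to average, so leave the values untouched.
        return None, list(values)
    avg = math.floor(sum(present) / len(present))
    return avg, [avg if v is None else v for v in values]


sample = [4, 2, None, 10, 1, None, 6]  # hypothetical emp_length values
avg, filled = fill_with_floor_avg(sample)
print(avg)     # 4  (23 / 5 = 4.6, floored)
print(filled)  # [4, 2, 4, 10, 1, 4, 6]
```

Imputing a single value for every NULL concentrates rows at that value, which is worth keeping in mind if `emp_length` is later used as a model feature.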

In [23]:
customers_emplength_replaced.createOrReplaceTempView("customers")
spark.sql('select * from customers limit 5')

member_id,emp_title,emp_length,home_ownership,annual_income,address_state,address_zipcode,address_country,grade,sub_grade,verification_status,tot_hi_cred_lim,application_type,join_annual_income,verification_status_joint,ingest_date
15488f69316884648...,Project Manager,4,RENT,105000.0,CT,067xx,USA,C,C4,Not Verified,195676.0,Individual,,,2025-04-29 05:59:...
0cf35c04342ac1525...,F&B Event Manager,2,RENT,45000.0,HI,968xx,USA,C,C5,Not Verified,33090.0,Individual,,,2025-04-29 05:59:...
a77753d35eeb079e1...,senior digital sp...,6,MORTGAGE,52000.0,MD,212xx,USA,B,B4,Not Verified,40703.0,Individual,,,2025-04-29 05:59:...
f4d2acc9dc6faf0a1...,kitchen designer,10,MORTGAGE,40000.0,FL,349xx,USA,A,A5,Not Verified,100511.0,Individual,,,2025-04-29 05:59:...
475d66adaa01cfa83...,Administrator,1,OWN,25000.0,NY,105xx,USA,A,A2,Not Verified,36551.0,Individual,,,2025-04-29 05:59:...


In [24]:
# Confirm that no NULLs remain in the emp_length column
customers_emplength_replaced.filter('emp_length is null').count()

0

#### 8. CLEAN ADDRESS STATE CODES

In [25]:
# Inspect distinct state codes and length issues
spark.sql("select distinct(address_state) from customers").show()

+--------------------+
|       address_state|
+--------------------+
|Helping Kenya's D...|
|175 (total projec...|
|               223xx|
|                  AZ|
|                  SC|
|I am 56 yrs. old ...|
|so Plan ""C"" is ...|
|financially I mad...|
|but no one will l...|
|                  LA|
|         etc.  First|
|                  MN|
|               850xx|
|yet Capital One n...|
|               499xx|
|Advocate business...|
|Eliminating Credi...|
|and MBA's are ove...|
|               951xx|
|I the credit card...|
+--------------------+
only showing top 20 rows



In [26]:
spark.sql("select count(*) from customers where length(address_state) > 2").show()

+--------+
|count(1)|
+--------+
|     254|
+--------+



In [27]:
from pyspark.sql.functions import when, length

# Replace any code longer than 2 chars with 'NA'
customers_state_cleaned = customers_emplength_replaced.withColumn(
    "address_state",
    when(length(col("address_state")) > 2, "NA")
     .otherwise(col("address_state"))
)
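
The length rule catches free text and zip-code fragments, but it leaves NULLs, empty strings, and one-character values untouched, because `length(NULL)` is NULL and the `when` condition then falls through to `otherwise`. The sketch below compares that rule with a stricter alternative that accepts only two uppercase letters. This is an assumption about what a valid code looks like, not something the notebook does, and both function names are hypothetical.

```python
import re

STATE_CODE = re.compile(r"[A-Z]{2}")


def clean_state_by_length(code):
    """Mirror of the when/otherwise rule above: only values longer than 2 become 'NA'."""
    if code is not None and len(code) > 2:
        return "NA"
    return code


def clean_state_by_pattern(code):
    """Stricter variant: anything that is not exactly two uppercase letters becomes 'NA'."""
    if code is not None and STATE_CODE.fullmatch(code):
        return code
    return "NA"


# 'AZ', '223xx' and 'etc.  First' appear in the output above; '' and None are hypothetical.
samples = ["AZ", "223xx", "etc.  First", "", None]
by_length = [clean_state_by_length(s) for s in samples]
by_pattern = [clean_state_by_pattern(s) for s in samples]
print(by_length)   # ['AZ', 'NA', 'NA', '', None]
print(by_pattern)  # ['AZ', 'NA', 'NA', 'NA', 'NA']
```

Whether NULL and empty values should also map to `"NA"` is a design choice; the stricter variant simply makes that choice explicit.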

In [28]:
customers_state_cleaned.select("address_state").distinct()

address_state
AZ
SC
LA
MN
NJ
DC
OR
""
VA
""


#### 9. SAVE CLEANED DATA

In [29]:
# Write out parquet (efficient for downstream processing)
customers_state_cleaned.write \
    .format("parquet") \
    .mode("overwrite") \
    .option("path", "/user/itv017499/lendingclubproject/cleaned/customers_parquet") \
    .save()