###### 1. dataframe with proper datatype 2.renaming few columns as per our convienience 3.Inserting ingestion date column as a current timestamp. 4.removing duplicate rows
###### 5. Remove rows where annual income is NULL 6.convert emp_length to INTEGER 6.Average of emp_length with NULLS 8.Only need 2 letters for address state remove the misc.

In [None]:
from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("LendingClubProject_DC") \
    .master("local[*]") \
    .getOrCreate()

In [2]:
spark

In [6]:
cust_raw_df = spark.read.format("csv").option("header","true").option("inferSchema","true").load("Lending_club_project/raw/customers_data_csv")

In [8]:
cust_raw_df.printSchema()

root
 |-- member_id: string (nullable = true)
 |-- emp_title: string (nullable = true)
 |-- emp_length: string (nullable = true)
 |-- home_ownership: string (nullable = true)
 |-- annual_inc: string (nullable = true)
 |-- addr_state: string (nullable = true)
 |-- zip_code: string (nullable = true)
 |-- country: string (nullable = true)
 |-- grade: string (nullable = true)
 |-- sub_grade: string (nullable = true)
 |-- verification_status: string (nullable = true)
 |-- tot_hi_cred_lim: double (nullable = true)
 |-- application_type: string (nullable = true)
 |-- annual_inc_joint: string (nullable = true)
 |-- verification_status_joint: string (nullable = true)



##### Need schema with proper datatypes

In [18]:
from pyspark.sql.types import StructType, StructField, StringType, FloatType

cust_schema = StructType([
    StructField("member_id", StringType(), True),
    StructField("emp_title", StringType(), True),
    StructField("emp_length", StringType(), True),
    StructField("home_ownership", StringType(), True),
    StructField("annual_inc", FloatType(), True),
    StructField("addr_state", StringType(), True),
    StructField("zip_code", StringType(), True),
    StructField("country", StringType(), True),
    StructField("grade", StringType(), True),
    StructField("sub_grade", StringType(), True),
    StructField("verification_status", StringType(), True),
    StructField("tot_hi_cred_lim", FloatType(), True),
    StructField("application_type", StringType(), True),
    StructField("annual_inc_joint", FloatType(), True),
    StructField("verification_status_joint", StringType(), True)
])


In [19]:
cust_raw_df = spark.read.format("csv").option("header","true").schema(cust_schema).load("Lending_club_project/raw/customers_data_csv")

In [20]:
cust_raw_df.printSchema()

root
 |-- member_id: string (nullable = true)
 |-- emp_title: string (nullable = true)
 |-- emp_length: string (nullable = true)
 |-- home_ownership: string (nullable = true)
 |-- annual_inc: float (nullable = true)
 |-- addr_state: string (nullable = true)
 |-- zip_code: string (nullable = true)
 |-- country: string (nullable = true)
 |-- grade: string (nullable = true)
 |-- sub_grade: string (nullable = true)
 |-- verification_status: string (nullable = true)
 |-- tot_hi_cred_lim: float (nullable = true)
 |-- application_type: string (nullable = true)
 |-- annual_inc_joint: float (nullable = true)
 |-- verification_status_joint: string (nullable = true)



In [23]:
# Renaming the Columns
# Defining  a dictionary mapping old column names to new names
rename_map = {
    "annual_inc": "annual_income",
    "addr_state": "address_state",
    "zip_code":"address_zipcode",
    "country":"address_country",
    "tot_hi_cred_lim":"total_high_credit_limit",
    "annual_inc_joint":"join_annual_income"
}

# Apply renames cumulatively
renamed_df = cust_raw_df
for old_name, new_name in rename_map.items():
    renamed_df = renamed_df.withColumnRenamed(old_name, new_name)

renamed_df.show()

+--------------------+--------------------+----------+--------------+-------------+-------------+---------------+---------------+-----+---------+-------------------+-----------------------+----------------+------------------+-------------------------+
|           member_id|           emp_title|emp_length|home_ownership|annual_income|address_state|address_zipcode|address_country|grade|sub_grade|verification_status|total_high_credit_limit|application_type|join_annual_income|verification_status_joint|
+--------------------+--------------------+----------+--------------+-------------+-------------+---------------+---------------+-----+---------+-------------------+-----------------------+----------------+------------------+-------------------------+
|6d5091b3fcaaeb4ea...|             leadman| 10+ years|      MORTGAGE|      55000.0|           PA|          190xx|            USA|    C|       C4|       Not Verified|               178050.0|      Individual|              null|                   

In [24]:
# Adding the Current ingestion timestamp
from pyspark.sql.functions import current_timestamp

time_df = renamed_df.withColumn("ingest_date",current_timestamp())

In [26]:
time_df.show(1)

+--------------------+---------+----------+--------------+-------------+-------------+---------------+---------------+-----+---------+-------------------+-----------------------+----------------+------------------+-------------------------+--------------------+
|           member_id|emp_title|emp_length|home_ownership|annual_income|address_state|address_zipcode|address_country|grade|sub_grade|verification_status|total_high_credit_limit|application_type|join_annual_income|verification_status_joint|         ingest_date|
+--------------------+---------+----------+--------------+-------------+-------------+---------------+---------------+-----+---------+-------------------+-----------------------+----------------+------------------+-------------------------+--------------------+
|6d5091b3fcaaeb4ea...|  leadman| 10+ years|      MORTGAGE|      55000.0|           PA|          190xx|            USA|    C|       C4|       Not Verified|               178050.0|      Individual|              null|

In [27]:
time_df.count()

717922

In [31]:
time_df.distinct().count()

717912

In [32]:
unq_cust = time_df.distinct()

In [33]:
unq_cust.createOrReplaceTempView("customers")

In [34]:
spark.sql("select count(*) from customers").show()

+--------+
|count(1)|
+--------+
|  717912|
+--------+



In [39]:
spark.sql("select count(*) AS Null_income_values from customers where annual_income is NULL").show()

+------------------+
|Null_income_values|
+------------------+
|                 2|
+------------------+



In [36]:
customers_income_filtered = spark.sql("SELECT * FROM customers where annual_income is NOT NULL ")

In [38]:
customers_income_filtered.count()

717910

In [40]:
customers_income_filtered.createOrReplaceTempView("customers")

In [45]:
spark.sql("SELECT DISTINCT(emp_length) FROM customers").show()

+----------+
|emp_length|
+----------+
|   5 years|
|   9 years|
|      null|
|    1 year|
|   2 years|
|   7 years|
|   8 years|
|   4 years|
|   6 years|
|   3 years|
| 10+ years|
|  < 1 year|
+----------+



In [53]:
#Replace Nulls with avg of emp_length , and make it integer 
from pyspark.sql.functions import regexp_replace , col
emp_cleaned = customers_income_filtered.withColumn("emp_length", regexp_replace(col("emp_length"),"(\D)","").cast("int")) # If its not a digit it will be replaced with 
emp_cleaned.printSchema

<bound method DataFrame.printSchema of DataFrame[member_id: string, emp_title: string, emp_length: int, home_ownership: string, annual_income: float, address_state: string, address_zipcode: string, address_country: string, grade: string, sub_grade: string, verification_status: string, total_high_credit_limit: float, application_type: string, join_annual_income: float, verification_status_joint: string, ingest_date: timestamp]>

In [54]:
emp_cleaned.filter("emp_length IS NULL").count()

45932

In [55]:
emp_cleaned.createOrReplaceTempView("customers")

In [58]:
spark.sql("SELECT floor(AVG(emp_length)) as avg_emp_length FROM customers").show()

+--------------+
|avg_emp_length|
+--------------+
|             6|
+--------------+



In [59]:
#Collecting the average emp_length
avg_emp_length = spark.sql("SELECT floor(AVG(emp_length)) as avg_emp_length FROM customers").collect()

In [61]:
avg_emp_length

[Row(avg_emp_length=6)]

In [68]:
row = avg_emp_length[0]  # to access only the value we can also use avg_emp_length[0][0]
avg_emp_duration = row['avg_emp_length']

In [75]:
#Filling nulls with the average emp_length
cust_emp_length_fixed_df =emp_cleaned.fillna(avg_emp_duration,subset=['emp_length'])
cust_emp_length_fixed_df.createOrReplaceTempView("customers")
spark.sql("Select count(*) as null_salary from customers where emp_length IS NULL ").show()

+-----------+
|null_salary|
+-----------+
|          0|
+-----------+



In [78]:
spark.sql("select DISTINCT(address_state) FROM customers where length(address_state) > 2").show()

+------------------+
|     address_state|
+------------------+
|debt_consolidation|
+------------------+



In [89]:
from pyspark.sql.functions import when, col,length
cust_state_fixed=cust_emp_length_fixed_df.withColumn("address_state",when(length(col("address_state"))>2,"NA").otherwise(col("address_state"))
                                                     )
cust_state_fixed.select("address_state").filter(length("address_state")>2).show()

+-------------+
|address_state|
+-------------+
+-------------+



In [90]:
cust_state_fixed.write.format("parquet").mode("overwrite")\
.option("path","Lending_club_project/cleaned/customers_data_parquet").save()

In [91]:
cust_state_fixed.repartition(1).write.format("csv").mode("overwrite").option("header","True")\
.option("path","Lending_club_project/cleaned/csv/customers_data_csv").save()