### A notebook for the Data Engineering Course Project - Milestone 3

#### In this notebook, we will be experimenting with spark to manipulate data and perform some basic operations on it.

##### Imports and setup

In [1]:
from pyspark.sql import functions as fn , SparkSession, Window, DataFrame, types
from datetime import date,timedelta
from random import uniform
import psutil

In [2]:
spark = SparkSession.builder.appName("Milestone3").getOrCreate()
sc = spark.sparkContext

24/11/22 00:36:40 WARN Utils: Your hostname, Omars-MacBook-Pro-4.local resolves to a loopback address: 127.0.0.1; using 192.168.107.154 instead (on interface en0)
24/11/22 00:36:40 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/11/22 00:36:41 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
24/11/22 00:36:41 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


In [3]:
DATA_PATH = "./data/fintech_data_17_52_4509.parquet"
CLEANED_DATA_PATH = "./data/fintech_spark_52_4509.parquet"
LOOKUP_PATH = "./data/lookup_spark_52_4509.parquet"

- Create Lookup dataframe

In [4]:
lookup_table = spark.createDataFrame([], schema=types.StructType([
    types.StructField("Column", types.StringType(), False),
    types.StructField("Original", types.StringType(), False),
    types.StructField("Encoded", types.StringType(), False),
    types.StructField("Type", types.StringType(), False)
]))

##### Part 1: Loading the data

- Load the dataset

In [5]:
fintech_df = spark.read.parquet(DATA_PATH)

                                                                                

- preview the first 20 rows of the data

In [6]:
fintech_df.show(20)

+--------------------+--------------------+----------+--------------+----------+----------------+-------------------+--------+----------+-----------+-----------+-------+-----------+-----------+-----+-------------+----------+--------+-----+-----------------+----------+----------+------------------+--------------------+
|         Customer Id|           Emp Title|Emp Length|Home Ownership|Annual Inc|Annual Inc Joint|Verification Status|Zip Code|Addr State|Avg Cur Bal|Tot Cur Bal|Loan Id|Loan Status|Loan Amount|State|Funded Amount|      Term|Int Rate|Grade|       Issue Date|Pymnt Plan|      Type|           Purpose|         Description|
+--------------------+--------------------+----------+--------------+----------+----------------+-------------------+--------+----------+-----------+-----------+-------+-----------+-----------+-----+-------------+----------+--------+-----+-----------------+----------+----------+------------------+--------------------+
|YiJaJVx4YTNceGZiS...|                NU

- Display the number of partitions the data is split into

In [7]:
print(f"Number of partitions: {fintech_df.rdd.getNumPartitions()}")

Number of partitions: 1


- Change partitioning to match the number of cores in the machine

In [8]:
logical_cores = psutil.cpu_count(logical=True)  
physical_cores = psutil.cpu_count(logical=False)

print(f"Logical cores: {logical_cores}")
print(f"Physical cores: {physical_cores}")

Logical cores: 8
Physical cores: 8


> NB: Apple silicon ships number of logical cores matches the number of physical cores.
<br>
<br>
Explanation : After some digging i found that hyperthreading is actually a flaw that intel turned into a marketing item quoted by the random guy on some internet forum i found so basically due to the higher complexity of the ISA of the x86 architecture, the CPU has to do more complex decoding so having multiple threads on the same core would help in keeping the CPU busy while waiting for the data to be fetched from the memory. Which is not the case with ARM architecture which is simpler and more efficient. So, although hyperthreading can still be added the cpu is able to keep itself(mainly ALU units) busy without it.

In [9]:
fintech_df = fintech_df.repartition(logical_cores)

print(f"Number of partitions after repartition: {fintech_df.rdd.getNumPartitions()}")

Number of partitions after repartition: 8


##### Part 2: Cleaning the data

- Rename the columns

In [10]:
fintech_df = fintech_df.toDF(*[c.lower().replace(" ", "_") for c in fintech_df.columns])

fintech_df.schema.names


['customer_id',
 'emp_title',
 'emp_length',
 'home_ownership',
 'annual_inc',
 'annual_inc_joint',
 'verification_status',
 'zip_code',
 'addr_state',
 'avg_cur_bal',
 'tot_cur_bal',
 'loan_id',
 'loan_status',
 'loan_amount',
 'state',
 'funded_amount',
 'term',
 'int_rate',
 'grade',
 'issue_date',
 'pymnt_plan',
 'type',
 'purpose',
 'description']

- Detect Missing Values

In [11]:
def missing_values(df: DataFrame) -> dict[str, float]:
    missing = {}
    total_rows = df.count() 
    for col in df.columns:
        missing_count = df.filter(fn.col(col).isNull()).count()
        missing_percentage = missing_count / total_rows * 100
        if missing_percentage > 0:
            missing[col] = missing_percentage
    missing = dict(sorted(missing.items(), key=lambda item: item[1], reverse=True))
    return missing

In [12]:
missing = missing_values(fintech_df)
missing

{'annual_inc_joint': 92.89678135405104,
 'emp_title': 8.812430632630411,
 'emp_length': 7.081021087680354,
 'int_rate': 4.62819089900111,
 'description': 0.8657047724750279}

- Handle Missing Values

In [13]:
for col in missing.keys():
    print(f"{col}: {fintech_df.schema[col].dataType}")


annual_inc_joint: DoubleType()
emp_title: StringType()
emp_length: StringType()
int_rate: DoubleType()
description: StringType()


In [14]:
def fill_missing(df: DataFrame, missing: dict[str, float]) -> DataFrame:
    mode_df = df.dropna()
    for col in missing.keys():
        if mode_df.schema[col].dataType == types.StringType():
            mode = mode_df.groupBy(col).count().orderBy(fn.desc("count")).first()[col]
            print(f"Mode for column {col} is {mode} and filled missing values with it")
            df = df.fillna(mode, subset=[col])
        else:
            df = df.fillna(0, subset=[col])
            print(f"Filled missing values in column {col} with 0")
    return df

In [15]:
def get_missing_count(df: DataFrame, missing_columns: list[str]) -> DataFrame:
    num_missing = 0
    for col in missing_columns:
        num_missing += df.filter(fn.col(col).isNull()).count()
    return num_missing

In [16]:
get_missing_count(fintech_df, missing.keys())

30891

In [17]:
fintech_df = fill_missing(fintech_df, missing)

Filled missing values in column annual_inc_joint with 0
Mode for column emp_title is Teacher and filled missing values with it
Mode for column emp_length is 10+ years and filled missing values with it
Filled missing values in column int_rate with 0
Mode for column description is Debt consolidation and filled missing values with it


- Check the missing values have been handled

In [18]:
get_missing_count(fintech_df, missing.keys())

0

##### Part 3: Encoding the data

- Change `emp_length` to numerical

In [19]:
fintech_df.select("emp_length").distinct().show()

+----------+
|emp_length|
+----------+
|   5 years|
|   9 years|
|    1 year|
|   2 years|
|   7 years|
|   8 years|
|   4 years|
|   6 years|
|   3 years|
| 10+ years|
|  < 1 year|
+----------+



replace years and year with empty string , replace < 1 with 0.5 and 10+ with 11 and convert to float

In [20]:
fintech_df = fintech_df.withColumn("emp_length", fn.regexp_replace("emp_length", " years?|year", ""))
fintech_df = fintech_df.withColumn("emp_length", fn.regexp_replace("emp_length", "< 1", "0.5"))
fintech_df = fintech_df.withColumn("emp_length", fn.regexp_replace("emp_length", "10\\+", "11"))
fintech_df = fintech_df.withColumn("emp_length", fintech_df["emp_length"].cast(types.FloatType()))
fintech_df.select("emp_length").distinct().show()

+----------+
|emp_length|
+----------+
|       9.0|
|       5.0|
|      11.0|
|       7.0|
|       2.0|
|       3.0|
|       0.5|
|       1.0|
|       6.0|
|       8.0|
|       4.0|
+----------+



- One Hot Encoding for `home_ownership` , `verification_status` , `type`  and and update the lookup table

In [21]:
def one_hot_encoding(df: DataFrame, column_name: str, lookup_table: DataFrame) -> DataFrame:
    distinct_values = df.select(column_name).distinct().rdd.flatMap(lambda x: [str(value).lower().replace(" ", "_") for value in x]).collect()
    new_rows = []
    for value in distinct_values:
        new_column_name = f"{column_name}_{value}"
        df = df.withColumn(new_column_name, (fn.col(column_name) == value).cast(types.IntegerType()))
        new_rows.append((column_name, value, new_column_name, "one-hot"))
    #  append to lookup table
    new_df = spark.createDataFrame(new_rows, schema=lookup_table.schema)
    lookup_table = lookup_table.union(new_df)
    return df, lookup_table

In [22]:
one_hot_encoding_columns = ["home_ownership", "verification_status", "type"]
for column in one_hot_encoding_columns:
    fintech_df, lookup_table = one_hot_encoding(fintech_df, column, lookup_table)

                                                                                

In [23]:
fintech_df.show(5)

+--------------------+------------------+----------+--------------+----------+----------------+-------------------+--------+----------+-----------+-----------+-------+-----------+-----------+-----+-------------+----------+--------+-----+----------------+----------+----------+------------------+--------------------+------------------+-------------------+-----------------------+------------------+----------------------------+-----------------------------------+--------------------------------+--------------+---------------+---------------+----------+
|         customer_id|         emp_title|emp_length|home_ownership|annual_inc|annual_inc_joint|verification_status|zip_code|addr_state|avg_cur_bal|tot_cur_bal|loan_id|loan_status|loan_amount|state|funded_amount|      term|int_rate|grade|      issue_date|pymnt_plan|      type|           purpose|         description|home_ownership_own|home_ownership_rent|home_ownership_mortgage|home_ownership_any|verification_status_verified|verification_statu

24/11/22 00:36:53 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


In [24]:
lookup_table.show()

+-------------------+---------------+--------------------+-------+
|             Column|       Original|             Encoded|   Type|
+-------------------+---------------+--------------------+-------+
|     home_ownership|            own|  home_ownership_own|one-hot|
|     home_ownership|           rent| home_ownership_rent|one-hot|
|     home_ownership|       mortgage|home_ownership_mo...|one-hot|
|     home_ownership|            any|  home_ownership_any|one-hot|
|verification_status|       verified|verification_stat...|one-hot|
|verification_status|source_verified|verification_stat...|one-hot|
|verification_status|   not_verified|verification_stat...|one-hot|
|               type|      joint_app|      type_joint_app|one-hot|
|               type|     individual|     type_individual|one-hot|
|               type|     direct_pay|     type_direct_pay|one-hot|
|               type|          joint|          type_joint|one-hot|
|               type|     individual|     type_individual|one-

- Label Encoding for `state` and `purpose` and update the lookup table

In [25]:
from pyspark.ml.feature import StringIndexer

def label_encoding(df: DataFrame, column_name: str, lookup_table: DataFrame) -> DataFrame:
    # Initialize StringIndexer to encode the specified column
    indexer = StringIndexer(inputCol=column_name, outputCol=f"{column_name}_encoded")
    model = indexer.fit(df)
    df = model.transform(df)
    
    labels = model.labels
    encoded_values = list(map(str, range(len(labels))))
    
    new_rows = [(column_name, original, encoded, "Label") for original, encoded in zip(labels, encoded_values)]
    new_df = spark.createDataFrame(new_rows, schema=lookup_table.schema)

    lookup_table = lookup_table.union(new_df).distinct()

    return df, lookup_table

In [26]:
label_encoding_columns = ["state", "purpose"]
for column in label_encoding_columns:
    fintech_df, lookup_table = label_encoding(fintech_df, column, lookup_table)

fintech_df.show(5)

+--------------------+------------------+----------+--------------+----------+----------------+-------------------+--------+----------+-----------+-----------+-------+-----------+-----------+-----+-------------+----------+--------+-----+----------------+----------+----------+------------------+--------------------+------------------+-------------------+-----------------------+------------------+----------------------------+-----------------------------------+--------------------------------+--------------+---------------+---------------+----------+-------------+---------------+
|         customer_id|         emp_title|emp_length|home_ownership|annual_inc|annual_inc_joint|verification_status|zip_code|addr_state|avg_cur_bal|tot_cur_bal|loan_id|loan_status|loan_amount|state|funded_amount|      term|int_rate|grade|      issue_date|pymnt_plan|      type|           purpose|         description|home_ownership_own|home_ownership_rent|home_ownership_mortgage|home_ownership_any|verification_stat

In [27]:
lookup_table.show()



+-------------------+---------------+--------------------+-------+
|             Column|       Original|             Encoded|   Type|
+-------------------+---------------+--------------------+-------+
|     home_ownership|            own|  home_ownership_own|one-hot|
|     home_ownership|           rent| home_ownership_rent|one-hot|
|     home_ownership|       mortgage|home_ownership_mo...|one-hot|
|     home_ownership|            any|  home_ownership_any|one-hot|
|verification_status|       verified|verification_stat...|one-hot|
|verification_status|source_verified|verification_stat...|one-hot|
|verification_status|   not_verified|verification_stat...|one-hot|
|               type|      joint_app|      type_joint_app|one-hot|
|               type|     individual|     type_individual|one-hot|
|               type|     direct_pay|     type_direct_pay|one-hot|
|               type|          joint|          type_joint|one-hot|
|              state|             NY|                   2|  La

                                                                                

- Discretize the grade to be a letter grade

In [28]:
fintech_df.createOrReplaceTempView("grades_table")

query = """
SELECT *,
    CASE 
        WHEN grade BETWEEN 1 AND 5 THEN 'A'
        WHEN grade BETWEEN 6 AND 10 THEN 'B'
        WHEN grade BETWEEN 11 AND 15 THEN 'C'
        WHEN grade BETWEEN 16 AND 20 THEN 'D'
        WHEN grade BETWEEN 21 AND 25 THEN 'E'
        WHEN grade BETWEEN 26 AND 30 THEN 'F'
        WHEN grade BETWEEN 31 AND 35 THEN 'G'
        ELSE NULL
    END AS grade_letter
FROM grades_table
"""

fintech_df = spark.sql(query)

In [29]:
fintech_df.show(5)

+--------------------+------------------+----------+--------------+----------+----------------+-------------------+--------+----------+-----------+-----------+-------+-----------+-----------+-----+-------------+----------+--------+-----+----------------+----------+----------+------------------+--------------------+------------------+-------------------+-----------------------+------------------+----------------------------+-----------------------------------+--------------------------------+--------------+---------------+---------------+----------+-------------+---------------+------------+
|         customer_id|         emp_title|emp_length|home_ownership|annual_inc|annual_inc_joint|verification_status|zip_code|addr_state|avg_cur_bal|tot_cur_bal|loan_id|loan_status|loan_amount|state|funded_amount|      term|int_rate|grade|      issue_date|pymnt_plan|      type|           purpose|         description|home_ownership_own|home_ownership_rent|home_ownership_mortgage|home_ownership_any|veri

drop grade column

In [30]:
fintech_df = fintech_df.drop("grade")
fintech_df.show(5)

+--------------------+------------------+----------+--------------+----------+----------------+-------------------+--------+----------+-----------+-----------+-------+-----------+-----------+-----+-------------+----------+--------+----------------+----------+----------+------------------+--------------------+------------------+-------------------+-----------------------+------------------+----------------------------+-----------------------------------+--------------------------------+--------------+---------------+---------------+----------+-------------+---------------+------------+
|         customer_id|         emp_title|emp_length|home_ownership|annual_inc|annual_inc_joint|verification_status|zip_code|addr_state|avg_cur_bal|tot_cur_bal|loan_id|loan_status|loan_amount|state|funded_amount|      term|int_rate|      issue_date|pymnt_plan|      type|           purpose|         description|home_ownership_own|home_ownership_rent|home_ownership_mortgage|home_ownership_any|verification_sta

##### Part 4: Feature Engineering

In [31]:
print(fintech_df.schema["issue_date"].dataType)

StringType()


- Parse the issue date column to date type

In [32]:
fintech_df = fintech_df.withColumn("issue_date", fn.to_date("issue_date", "dd MMMM yyyy"))

In [33]:
fintech_df.select("issue_date").show(5)

+----------+
|issue_date|
+----------+
|2013-08-13|
|2013-08-13|
|2013-08-13|
|2013-08-13|
|2013-08-13|
+----------+
only showing top 5 rows



- Previous loan issue date from the same grade letter

In [34]:
window = Window.partitionBy("grade_letter").orderBy("issue_date")

lag_fn = fn.lag("issue_date", 1).over(window)

fintech_df = fintech_df.withColumn("prev_issue_date_grade", lag_fn)

fintech_df.show(5)

+--------------------+--------------------+----------+--------------+----------+----------------+-------------------+--------+----------+-----------+-----------+-------+-----------+-----------+-----+-------------+----------+--------+----------+----------+----------+------------------+--------------------+------------------+-------------------+-----------------------+------------------+----------------------------+-----------------------------------+--------------------------------+--------------+---------------+---------------+----------+-------------+---------------+------------+---------------------+
|         customer_id|           emp_title|emp_length|home_ownership|annual_inc|annual_inc_joint|verification_status|zip_code|addr_state|avg_cur_bal|tot_cur_bal|loan_id|loan_status|loan_amount|state|funded_amount|      term|int_rate|issue_date|pymnt_plan|      type|           purpose|         description|home_ownership_own|home_ownership_rent|home_ownership_mortgage|home_ownership_any|ve

In [35]:
fintech_df.select("grade_letter", "issue_date", "prev_issue_date_grade").orderBy("grade_letter", "issue_date").show(20)

+------------+----------+---------------------+
|grade_letter|issue_date|prev_issue_date_grade|
+------------+----------+---------------------+
|           A|2012-09-12|                 NULL|
|           A|2012-09-12|           2012-09-12|
|           A|2012-09-12|           2012-09-12|
|           A|2012-09-12|           2012-09-12|
|           A|2012-09-12|           2012-09-12|
|           A|2012-09-12|           2012-09-12|
|           A|2012-09-12|           2012-09-12|
|           A|2012-09-12|           2012-09-12|
|           A|2012-09-12|           2012-09-12|
|           A|2012-09-12|           2012-09-12|
|           A|2012-09-12|           2012-09-12|
|           A|2012-09-12|           2012-09-12|
|           A|2012-09-12|           2012-09-12|
|           A|2012-09-12|           2012-09-12|
|           A|2012-09-12|           2012-09-12|
|           A|2012-09-12|           2012-09-12|
|           A|2012-09-12|           2012-09-12|
|           A|2012-10-12|           2012

- Previous loan issue amount from the same grade letter

In [36]:
window = Window.partitionBy("grade_letter").orderBy("loan_amount")

lag_fn = fn.lag("loan_amount", 1).over(window)

fintech_df = fintech_df.withColumn("prev_loan_amount_grade", lag_fn)

fintech_df.show(5)

+--------------------+--------------------+----------+--------------+----------+----------------+-------------------+--------+----------+-----------+-----------+-------+-----------+-----------+-----+-------------+----------+--------+----------+----------+----------+------------------+------------------+------------------+-------------------+-----------------------+------------------+----------------------------+-----------------------------------+--------------------------------+--------------+---------------+---------------+----------+-------------+---------------+------------+---------------------+----------------------+
|         customer_id|           emp_title|emp_length|home_ownership|annual_inc|annual_inc_joint|verification_status|zip_code|addr_state|avg_cur_bal|tot_cur_bal|loan_id|loan_status|loan_amount|state|funded_amount|      term|int_rate|issue_date|pymnt_plan|      type|           purpose|       description|home_ownership_own|home_ownership_rent|home_ownership_mortgage|ho

In [37]:
fintech_df.select("grade_letter", "loan_amount", "prev_loan_amount_grade").orderBy("grade_letter", "loan_amount").show(20)

+------------+-----------+----------------------+
|grade_letter|loan_amount|prev_loan_amount_grade|
+------------+-----------+----------------------+
|           A|     1000.0|                  NULL|
|           A|     1000.0|                1000.0|
|           A|     1000.0|                1000.0|
|           A|     1000.0|                1000.0|
|           A|     1000.0|                1000.0|
|           A|     1000.0|                1000.0|
|           A|     1000.0|                1000.0|
|           A|     1000.0|                1000.0|
|           A|     1000.0|                1000.0|
|           A|     1000.0|                1000.0|
|           A|     1100.0|                1000.0|
|           A|     1125.0|                1100.0|
|           A|     1200.0|                1125.0|
|           A|     1200.0|                1200.0|
|           A|     1200.0|                1200.0|
|           A|     1200.0|                1200.0|
|           A|     1200.0|                1200.0|


- Previous loan date from the same state and grade combined

In [38]:
window = Window.partitionBy("state", "grade_letter").orderBy("issue_date")

lag_fn = fn.lag("issue_date", 1).over(window)

fintech_df = fintech_df.withColumn("prev_issue_date_state_grade", lag_fn)

fintech_df.show(5)

+--------------------+--------------------+----------+--------------+----------+----------------+-------------------+--------+----------+-----------+-----------+-------+-----------+-----------+-----+-------------+----------+--------+----------+----------+----------+------------------+--------------------+------------------+-------------------+-----------------------+------------------+----------------------------+-----------------------------------+--------------------------------+--------------+---------------+---------------+----------+-------------+---------------+------------+---------------------+----------------------+---------------------------+
|         customer_id|           emp_title|emp_length|home_ownership|annual_inc|annual_inc_joint|verification_status|zip_code|addr_state|avg_cur_bal|tot_cur_bal|loan_id|loan_status|loan_amount|state|funded_amount|      term|int_rate|issue_date|pymnt_plan|      type|           purpose|         description|home_ownership_own|home_ownership

In [39]:
fintech_df.select("state", "grade_letter", "issue_date", "prev_issue_date_state_grade").orderBy("state", "grade_letter", "issue_date").show(20)

+-----+------------+----------+---------------------------+
|state|grade_letter|issue_date|prev_issue_date_state_grade|
+-----+------------+----------+---------------------------+
|   AK|           A|2012-09-12|                       NULL|
|   AK|           A|2013-12-13|                 2012-09-12|
|   AK|           A|2015-06-15|                 2013-12-13|
|   AK|           A|2016-01-16|                 2015-06-15|
|   AK|           A|2016-03-16|                 2016-01-16|
|   AK|           A|2016-04-16|                 2016-03-16|
|   AK|           A|2018-04-18|                 2016-04-16|
|   AK|           A|2018-05-18|                 2018-04-18|
|   AK|           A|2018-06-18|                 2018-05-18|
|   AK|           A|2018-07-18|                 2018-06-18|
|   AK|           A|2018-10-18|                 2018-07-18|
|   AK|           A|2019-05-19|                 2018-10-18|
|   AK|           A|2019-06-19|                 2019-05-19|
|   AK|           B|2013-06-13|         

- Previous loan amount from the same state and grade combined

In [40]:
window = Window.partitionBy("state", "grade_letter").orderBy("loan_amount")

lag_fn = fn.lag("loan_amount", 1).over(window)

fintech_df = fintech_df.withColumn("prev_loan_amount_state_grade", lag_fn)

fintech_df.show(5)

+--------------------+--------------------+----------+--------------+----------+----------------+-------------------+--------+----------+-----------+-----------+-------+-----------+-----------+-----+-------------+----------+--------+----------+----------+----------+------------------+--------------------+------------------+-------------------+-----------------------+------------------+----------------------------+-----------------------------------+--------------------------------+--------------+---------------+---------------+----------+-------------+---------------+------------+---------------------+----------------------+---------------------------+----------------------------+
|         customer_id|           emp_title|emp_length|home_ownership|annual_inc|annual_inc_joint|verification_status|zip_code|addr_state|avg_cur_bal|tot_cur_bal|loan_id|loan_status|loan_amount|state|funded_amount|      term|int_rate|issue_date|pymnt_plan|      type|           purpose|         description|home

In [41]:
fintech_df.select("state", "grade_letter", "loan_amount", "prev_loan_amount_state_grade").orderBy("state", "grade_letter", "loan_amount").show()

+-----+------------+-----------+----------------------------+
|state|grade_letter|loan_amount|prev_loan_amount_state_grade|
+-----+------------+-----------+----------------------------+
|   AK|           A|     3000.0|                        NULL|
|   AK|           A|     7000.0|                      3000.0|
|   AK|           A|     8000.0|                      7000.0|
|   AK|           A|     8000.0|                      8000.0|
|   AK|           A|    12750.0|                      8000.0|
|   AK|           A|    15000.0|                     12750.0|
|   AK|           A|    16000.0|                     15000.0|
|   AK|           A|    18000.0|                     16000.0|
|   AK|           A|    18000.0|                     18000.0|
|   AK|           A|    20000.0|                     18000.0|
|   AK|           A|    24000.0|                     20000.0|
|   AK|           A|    30000.0|                     24000.0|
|   AK|           A|    35000.0|                     30000.0|
|   AK| 

##### Part 5: Analysis SQL Queries

Q1 : Identify the average loan amount and interest rate for loans marked as
"Default" in the Loan Status, grouped by Emp Length and annual income ranges.

In [42]:
fintech_df.describe("annual_inc").show()

+-------+-----------------+
|summary|       annual_inc|
+-------+-----------------+
|  count|            27030|
|   mean|78853.63337077321|
| stddev| 67437.6383616835|
|    min|            200.0|
|    max|        6500031.0|
+-------+-----------------+



In [43]:
fintech_df.select("loan_status").distinct().show()

+------------------+
|       loan_status|
+------------------+
|        Fully Paid|
|   In Grace Period|
|       Charged Off|
|Late (31-120 days)|
|           Current|
| Late (16-30 days)|
+------------------+



In [44]:
fintech_df.groupBy("loan_status").count().show()

+------------------+-----+
|       loan_status|count|
+------------------+-----+
|        Fully Paid| 7637|
|   In Grace Period|  169|
|       Charged Off| 1824|
|Late (31-120 days)|  302|
|           Current|17016|
| Late (16-30 days)|   82|
+------------------+-----+



In [45]:
fintech_df.createOrReplaceTempView("fintech_table")

query = """
SELECT emp_length,
    CASE 
        WHEN annual_inc < 50000 THEN '0-50k'
        WHEN annual_inc BETWEEN 50000 AND 100000 THEN '50k-100k'
        WHEN annual_inc BETWEEN 100000 AND 150000 THEN '100k-150k'
        WHEN annual_inc BETWEEN 150000 AND 200000 THEN '150k-200k'
        WHEN annual_inc BETWEEN 200000 AND 250000 THEN '200k-250k'
        WHEN annual_inc BETWEEN 250000 AND 300000 THEN '250k-300k'
        WHEN annual_inc BETWEEN 300000 AND 350000 THEN '300k-350k'
        WHEN annual_inc BETWEEN 350000 AND 400000 THEN '350k-400k'
        WHEN annual_inc BETWEEN 400000 AND 450000 THEN '400k-450k'
        WHEN annual_inc BETWEEN 450000 AND 500000 THEN '450k-500k'
        WHEN annual_inc BETWEEN 500000 AND 1000000 THEN '500k-1M'
        ELSE '1M+'
    END AS annual_inc_range,
    AVG(loan_amount) AS avg_loan_amount,
    AVG(int_rate) AS avg_int_rate
FROM fintech_table
WHERE loan_status = 'Current'
GROUP BY emp_length, annual_inc_range
ORDER BY emp_length, annual_inc_range
"""

spark.sql(query).show()

+----------+----------------+------------------+-------------------+
|emp_length|annual_inc_range|   avg_loan_amount|       avg_int_rate|
+----------+----------------+------------------+-------------------+
|       0.5|           0-50k|10934.776264591439|0.13107782101167312|
|       0.5|       100k-150k| 21878.16742081448|0.11494389140271495|
|       0.5|       150k-200k|23934.736842105263|0.10905894736842105|
|       0.5|       200k-250k|           23887.5|0.10276153846153847|
|       0.5|       250k-300k|28763.636363636364|0.10363636363636365|
|       0.5|       300k-350k|           15750.0|0.10975000000000001|
|       0.5|       350k-400k|           31085.0|            0.11686|
|       0.5|       400k-450k|            2000.0|                0.0|
|       0.5|       450k-500k|           37500.0|0.08785000000000001|
|       0.5|         500k-1M|           35000.0|0.09634999999999999|
|       0.5|        50k-100k|16510.104302477183|0.12019634941329857|
|       1.0|           0-50k|10581

Q2: Calculate the average difference between Loan Amount and Funded Amount for each
loan Grade and sort by the grades with the largest differences.

In [46]:
query = """
SELECT grade_letter,
    AVG(loan_amount - funded_amount) AS avg_diff
FROM fintech_table
GROUP BY grade_letter
ORDER BY avg_diff DESC
"""

spark.sql(query).show()

+------------+--------+
|grade_letter|avg_diff|
+------------+--------+
|           F|     0.0|
|           E|     0.0|
|           B|     0.0|
|           D|     0.0|
|           C|     0.0|
|           A|     0.0|
|           G|     0.0|
+------------+--------+



Q3: Compare the total Loan Amount for loans with "Verified" and "Not Verified" Verification Status across each state (Addr State).

In [47]:
query = """
SELECT addr_state, verification_status, SUM(loan_amount) AS total_loan_amount
FROM fintech_table
GROUP BY addr_state, verification_status
ORDER BY addr_state, verification_status
"""

spark.sql(query).show()

+----------+-------------------+-----------------+
|addr_state|verification_status|total_loan_amount|
+----------+-------------------+-----------------+
|        AK|       Not Verified|         383400.0|
|        AK|    Source Verified|         436200.0|
|        AK|           Verified|         239325.0|
|        AL|       Not Verified|        1264625.0|
|        AL|    Source Verified|        1485950.0|
|        AL|           Verified|        1601325.0|
|        AR|       Not Verified|         761450.0|
|        AR|    Source Verified|        1310625.0|
|        AR|           Verified|         747750.0|
|        AZ|       Not Verified|        2894725.0|
|        AZ|    Source Verified|        3661375.0|
|        AZ|           Verified|        2435050.0|
|        CA|       Not Verified|       1.797605E7|
|        CA|    Source Verified|        2.35413E7|
|        CA|           Verified|       1.629185E7|
|        CO|       Not Verified|        3021250.0|
|        CO|    Source Verified

Q5: Calculate the average time gap (in days) between consecutive loans for each
grade using the new features you added in the feature engineering phase


In [48]:
query = """
SELECT grade_letter,
    AVG(DATEDIFF(issue_date, prev_issue_date_grade)) AS avg_days_between_consec_loans
FROM fintech_table
WHERE prev_issue_date_grade IS NOT NULL
GROUP BY grade_letter
ORDER BY grade_letter
"""

spark.sql(query).show()

+------------+-----------------------------+
|grade_letter|avg_days_between_consec_loans|
+------------+-----------------------------+
|           A|           0.4618865297598329|
|           B|          0.34273678835843757|
|           C|           0.3501781237630294|
|           D|           0.6788874841972187|
|           E|             1.81158357771261|
|           F|            5.738095238095238|
|           G|                       17.576|
+------------+-----------------------------+



Q5: Identify the average difference in loan amounts between consecutive loans within the same state and grade combination.

In [49]:
query = """
SELECT state, grade_letter,
    AVG(loan_amount - prev_loan_amount_state_grade) AS avg_diff_consec_loan_amounts
FROM fintech_table
WHERE prev_loan_amount_state_grade IS NOT NULL
GROUP BY state, grade_letter
ORDER BY state, grade_letter
"""

spark.sql(query).show()


+-----+------------+----------------------------+
|state|grade_letter|avg_diff_consec_loan_amounts|
+-----+------------+----------------------------+
|   AK|           A|          2666.6666666666665|
|   AK|           B|          3083.3333333333335|
|   AK|           C|          1677.7777777777778|
|   AK|           D|          2384.6153846153848|
|   AK|           E|                     13900.0|
|   AL|           A|           484.8484848484849|
|   AL|           B|           571.6417910447761|
|   AL|           C|           464.2857142857143|
|   AL|           D|                       700.0|
|   AL|           E|          1560.5263157894738|
|   AL|           F|                      1400.0|
|   AL|           G|                      3650.0|
|   AR|           A|           941.1764705882352|
|   AR|           B|            675.438596491228|
|   AR|           C|            587.719298245614|
|   AR|           D|                   1976.5625|
|   AR|           E|           3677.777777777778|


##### Part 6: Save the data

- save the data to a new parquet file

In [50]:
fintech_df.write.parquet(CLEANED_DATA_PATH , mode="overwrite")
lookup_table.write.parquet(LOOKUP_PATH, mode="overwrite")

                                                                                

In [51]:
sc.stop()
spark.stop()