In [1]:
from pyspark.sql import SparkSession, Window, Row, Column, functions as fn
import psutil
from typing import List, Dict, Union, Tuple
import pyspark

# Part 1: Loading the dataset

Simply load the dataset from the parquet format given in the google drive above
- Load the dataset.
- Preview first 20 rows.
- How many partitions is this dataframe split into?
- Change partitions to be equal to the number of your logical cores

In [2]:
# Initialize Spark session
spark = SparkSession.builder.appName("M3").getOrCreate()
#.config("spark.jars", "/opt/spark/jars/postgresql-42.7.3.jar").master("local")
# spark.conf.set("spark.sql.execution.arrow.enabled", 'true')
# spark.config("spark.memory.offHeap.enabled","true") 
# spark.config("spark.memory.offHeap.size","10g")
# spark.conf.set("spark.executor.memory", "2g")
# spark context to interact with the driver
sc = spark.sparkContext

In [3]:
import os
os.getcwd()

'/home/jovyan'

In [4]:
dataset_path = './work/fintech_data_29_52_1008.parquet'
fintech_df = spark.read.parquet(dataset_path)


In [5]:
fintech_df.show(20)

+--------------------+--------------------+----------+--------------+----------+----------------+-------------------+--------+----------+-----------+-----------+-------+-----------+-----------+-----+-------------+----------+--------+-----+-----------------+----------+----------+------------------+--------------------+
|         Customer Id|           Emp Title|Emp Length|Home Ownership|Annual Inc|Annual Inc Joint|Verification Status|Zip Code|Addr State|Avg Cur Bal|Tot Cur Bal|Loan Id|Loan Status|Loan Amount|State|Funded Amount|      Term|Int Rate|Grade|       Issue Date|Pymnt Plan|      Type|           Purpose|         Description|
+--------------------+--------------------+----------+--------------+----------+----------------+-------------------+--------+----------+-----------+-----------+-------+-----------+-----------+-----+-------------+----------+--------+-----+-----------------+----------+----------+------------------+--------------------+
|YidceDkzalx4YmRce...|     president/own

In [6]:
print(fintech_df.rdd.getNumPartitions())

1


In [7]:
logical_cores = psutil.cpu_count(logical=True)  # Logical cores
physical_cores = psutil.cpu_count(logical=False)  # Physical cores

print(f"Logical cores: {logical_cores}")
print(f"Physical cores: {physical_cores}")

Logical cores: 24
Physical cores: 12


In [8]:
repartitioned_fintech_df = fintech_df.repartition(logical_cores)

In [9]:
repartitioned_fintech_df.rdd.getNumPartitions()

24

# Part 2: Cleaning


- Rename all columns (replacing a space with an underscore, and making it lowercase)
- Detect missing    
  -  Create a function that takes in the df and returns any data structrue of your choice(df/dict,list,tuple,etc) which has the name of the column and percentage of missing entries from the whole dataset.
  - Tip : storing the missing info as dict where the key is the column name and value is the percentage would be the easiest.
  - Prinout the missing info
- Handle missing
  - For numerical features replace with 0.
  - For categorical/strings replace with mode
- Check missing
  - Afterwards, check that there are no missing values

In [10]:
repartitioned_fintech_df.columns

['Customer Id',
 'Emp Title',
 'Emp Length',
 'Home Ownership',
 'Annual Inc',
 'Annual Inc Joint',
 'Verification Status',
 'Zip Code',
 'Addr State',
 'Avg Cur Bal',
 'Tot Cur Bal',
 'Loan Id',
 'Loan Status',
 'Loan Amount',
 'State',
 'Funded Amount',
 'Term',
 'Int Rate',
 'Grade',
 'Issue Date',
 'Pymnt Plan',
 'Type',
 'Purpose',
 'Description']

In [11]:
renamed_fintech_df = repartitioned_fintech_df.toDF(*[col.replace(" ", "_").lower() for col in repartitioned_fintech_df.columns])



In [12]:
renamed_fintech_df.columns

['customer_id',
 'emp_title',
 'emp_length',
 'home_ownership',
 'annual_inc',
 'annual_inc_joint',
 'verification_status',
 'zip_code',
 'addr_state',
 'avg_cur_bal',
 'tot_cur_bal',
 'loan_id',
 'loan_status',
 'loan_amount',
 'state',
 'funded_amount',
 'term',
 'int_rate',
 'grade',
 'issue_date',
 'pymnt_plan',
 'type',
 'purpose',
 'description']

In [13]:
renamed_fintech_df.show(20)

+--------------------+--------------------+----------+--------------+----------+----------------+-------------------+--------+----------+-----------+-----------+-------+---------------+-----------+-----+-------------+----------+--------+-----+-----------------+----------+----------+------------------+--------------------+
|         customer_id|           emp_title|emp_length|home_ownership|annual_inc|annual_inc_joint|verification_status|zip_code|addr_state|avg_cur_bal|tot_cur_bal|loan_id|    loan_status|loan_amount|state|funded_amount|      term|int_rate|grade|       issue_date|pymnt_plan|      type|           purpose|         description|
+--------------------+--------------------+----------+--------------+----------+----------------+-------------------+--------+----------+-----------+-----------+-------+---------------+-----------+-----+-------------+----------+--------+-----+-----------------+----------+----------+------------------+--------------------+
|YicgXHgxOVx4ODU+X...| barte

In [14]:
def detect_missing(df: pyspark.sql.dataframe.DataFrame) -> Dict[str, float]:
    """
    Detect missing values in a PySpark DataFrame and calculate the percentage of missing entries.
    
    Args:
    df (DataFrame): Input PySpark DataFrame.
    
    Returns:
    dict: Dictionary where keys are column names and values are percentages of missing values.
    """
    total_rows = df.count()
    missing_info = {}
    
    for column in df.columns:
        missing_count = df.filter(fn.col(column).isNull()).count()
        missing_percentage = (missing_count / total_rows) * 100
        missing_info[column] = missing_percentage
    
    return missing_info

In [15]:
missing_dict = detect_missing(renamed_fintech_df)

In [16]:
missing_dict

{'customer_id': 0.0,
 'emp_title': 8.612652608213097,
 'emp_length': 6.777654458009619,
 'home_ownership': 0.0,
 'annual_inc': 0.0,
 'annual_inc_joint': 93.11875693673696,
 'verification_status': 0.0,
 'zip_code': 0.0,
 'addr_state': 0.0,
 'avg_cur_bal': 0.0,
 'tot_cur_bal': 0.0,
 'loan_id': 0.0,
 'loan_status': 0.0,
 'loan_amount': 0.0,
 'state': 0.0,
 'funded_amount': 0.0,
 'term': 0.0,
 'int_rate': 4.384017758046615,
 'grade': 0.0,
 'issue_date': 0.0,
 'pymnt_plan': 0.0,
 'type': 0.0,
 'purpose': 0.0,
 'description': 0.8139104698483166}

In [17]:
renamed_fintech_df.printSchema()

root
 |-- customer_id: string (nullable = true)
 |-- emp_title: string (nullable = true)
 |-- emp_length: string (nullable = true)
 |-- home_ownership: string (nullable = true)
 |-- annual_inc: double (nullable = true)
 |-- annual_inc_joint: double (nullable = true)
 |-- verification_status: string (nullable = true)
 |-- zip_code: string (nullable = true)
 |-- addr_state: string (nullable = true)
 |-- avg_cur_bal: double (nullable = true)
 |-- tot_cur_bal: double (nullable = true)
 |-- loan_id: long (nullable = true)
 |-- loan_status: string (nullable = true)
 |-- loan_amount: double (nullable = true)
 |-- state: string (nullable = true)
 |-- funded_amount: double (nullable = true)
 |-- term: string (nullable = true)
 |-- int_rate: double (nullable = true)
 |-- grade: long (nullable = true)
 |-- issue_date: string (nullable = true)
 |-- pymnt_plan: boolean (nullable = true)
 |-- type: string (nullable = true)
 |-- purpose: string (nullable = true)
 |-- description: string (nullable = t

In [18]:
numerical_types = (pyspark.sql.types.DoubleType, pyspark.sql.types.FloatType, pyspark.sql.types.IntegerType, pyspark.sql.types.LongType)
numerical_columns = [field.name for field in renamed_fintech_df.schema.fields if isinstance(field.dataType, numerical_types)]
numerical_columns

['annual_inc',
 'annual_inc_joint',
 'avg_cur_bal',
 'tot_cur_bal',
 'loan_id',
 'loan_amount',
 'funded_amount',
 'int_rate',
 'grade']

In [19]:
from pyspark.sql import functions as F

def handle_missing_numerical(df: pyspark.sql.dataframe.DataFrame) -> pyspark.sql.dataframe.DataFrame:
    """
    Replace missing values in numerical columns with 0.
    
    Args:
    df (DataFrame): Input PySpark DataFrame.
    
    Returns:
    DataFrame: DataFrame with missing values in numerical columns replaced by 0.
    """
    missing_dict = detect_missing(df)
    numerical_types = (pyspark.sql.types.DoubleType, pyspark.sql.types.FloatType, pyspark.sql.types.IntegerType, pyspark.sql.types.LongType)
    numerical_columns = [field.name for field in df.schema.fields if isinstance(field.dataType, numerical_types)]
    for column in numerical_columns:
        if missing_dict[column] > 0:
            df = df.fillna({column: 0})
    return df


In [20]:
numerically_imputed_df = handle_missing_numerical(renamed_fintech_df)

In [21]:
missing_dict_updated = detect_missing(numerically_imputed_df)
missing_dict_updated

{'customer_id': 0.0,
 'emp_title': 8.612652608213097,
 'emp_length': 6.777654458009619,
 'home_ownership': 0.0,
 'annual_inc': 0.0,
 'annual_inc_joint': 0.0,
 'verification_status': 0.0,
 'zip_code': 0.0,
 'addr_state': 0.0,
 'avg_cur_bal': 0.0,
 'tot_cur_bal': 0.0,
 'loan_id': 0.0,
 'loan_status': 0.0,
 'loan_amount': 0.0,
 'state': 0.0,
 'funded_amount': 0.0,
 'term': 0.0,
 'int_rate': 0.0,
 'grade': 0.0,
 'issue_date': 0.0,
 'pymnt_plan': 0.0,
 'type': 0.0,
 'purpose': 0.0,
 'description': 0.8139104698483166}

In [22]:
def handle_missing_categorical(df: pyspark.sql.dataframe.DataFrame) -> pyspark.sql.dataframe.DataFrame:
    """
    Replace missing values in categorical/string columns with the mode (most frequent value).
    
    Args:
    df (DataFrame): Input PySpark DataFrame.
    
    Returns:
    DataFrame: DataFrame with missing values in categorical columns replaced by mode.
    """
    missing_dict = detect_missing(df)
    categorical_columns = [field.name for field in df.schema.fields if isinstance(field.dataType, pyspark.sql.types.StringType)]
    for column in categorical_columns:
        if missing_dict[column] > 0:
            mode_col = df.filter(fn.col(column).isNotNull()).groupBy(column).count().orderBy(fn.col('count').desc()).limit(1)
            mode_col.show()
            mode_value = mode_col.select(column).collect()[0][0]
            print(f"Mode value for {column}: {mode_value}")
            df = df.fillna(value= mode_value, subset=[column])
            print('-'* 50)
    return df


In [23]:
categorically_imputed_df = handle_missing_categorical(numerically_imputed_df)

+---------+-----+
|emp_title|count|
+---------+-----+
|  Teacher|  468|
+---------+-----+

Mode value for emp_title: Teacher
--------------------------------------------------
+----------+-----+
|emp_length|count|
+----------+-----+
| 10+ years| 8855|
+----------+-----+

Mode value for emp_length: 10+ years
--------------------------------------------------
+------------------+-----+
|       description|count|
+------------------+-----+
|Debt consolidation|14421|
+------------------+-----+

Mode value for description: Debt consolidation
--------------------------------------------------


In [24]:
missing_dict_updated = detect_missing(categorically_imputed_df)
missing_dict_updated

{'customer_id': 0.0,
 'emp_title': 0.0,
 'emp_length': 0.0,
 'home_ownership': 0.0,
 'annual_inc': 0.0,
 'annual_inc_joint': 0.0,
 'verification_status': 0.0,
 'zip_code': 0.0,
 'addr_state': 0.0,
 'avg_cur_bal': 0.0,
 'tot_cur_bal': 0.0,
 'loan_id': 0.0,
 'loan_status': 0.0,
 'loan_amount': 0.0,
 'state': 0.0,
 'funded_amount': 0.0,
 'term': 0.0,
 'int_rate': 0.0,
 'grade': 0.0,
 'issue_date': 0.0,
 'pymnt_plan': 0.0,
 'type': 0.0,
 'purpose': 0.0,
 'description': 0.0}

no missing values are remaining

# Part 3: Encoding

Encode only the following categorical values
- Emp Length: Change to numerical
- Home Ownership: One Hot Encoding
- Verification Status: One Hot Encoding
- State: Label Encoding
- Type: One Hot Encoding
- Purpose: Label Encoding
- For the grade, only descretize it to be letter grade, not need to label encode it further

DO NOT Encode the employment title of description or any other column that is not
mentioned above

In [25]:
GLOBAL_LOOKUP_TABLE = spark.createDataFrame([], schema="original_column STRING, original_value STRING, encoded_column STRING")

## OHE

In [26]:
categorically_imputed_df.select('home_ownership').distinct().collect()[0][0]

'OWN'

In [27]:
categorically_imputed_df.select("home_ownership").distinct().show()

+--------------+
|home_ownership|
+--------------+
|           OWN|
|          RENT|
|      MORTGAGE|
|           ANY|
+--------------+



In [28]:
def one_hot_encode(df: pyspark.sql.dataframe.DataFrame, columns: List[str]) -> pyspark.sql.dataframe.DataFrame:
    """
    Apply one-hot encoding to the specified categorical columns in the DataFrame.
    
    Args:
    df (DataFrame): Input PySpark DataFrame.
    columns (list): List of column names to one-hot encode.
    
    Returns:
    DataFrame: DataFrame with one-hot encoded columns.
    """
    global GLOBAL_LOOKUP_TABLE
    lookup_data = []
    for column in columns:
        unique_values = df.select(column).distinct().collect()
        
        for row in unique_values:
            curr_value = row[0]
            encoded_col_name = f"{column}_{curr_value}"
            df = df.withColumn(encoded_col_name, fn.when(fn.col(column) == curr_value, 1).otherwise(0))
            df = df.withColumn(encoded_col_name, df[encoded_col_name].cast("int"))
            lookup_data.append(Row(original_column=column, original_value=curr_value, encoded_column=encoded_col_name))
            
    lookup_table = spark.createDataFrame(lookup_data)
    print(lookup_data)
    GLOBAL_LOOKUP_TABLE = GLOBAL_LOOKUP_TABLE.union(lookup_table)
    df = df.drop(*columns)
    return df


In [29]:
data = [("Alice", 28), ("Bob", 35), ("Charlie", 40)]

# Create DataFrame
ff = spark.createDataFrame(data, ["Name", "Age"])

ff.show()

+-------+---+
|   Name|Age|
+-------+---+
|  Alice| 28|
|    Bob| 35|
|Charlie| 40|
+-------+---+



In [30]:
ohe_columns = ["home_ownership", "verification_status", "type"]
ohe_fintech_df = one_hot_encode(categorically_imputed_df, ohe_columns)

[Row(original_column='home_ownership', original_value='OWN', encoded_column='home_ownership_OWN'), Row(original_column='home_ownership', original_value='RENT', encoded_column='home_ownership_RENT'), Row(original_column='home_ownership', original_value='MORTGAGE', encoded_column='home_ownership_MORTGAGE'), Row(original_column='home_ownership', original_value='ANY', encoded_column='home_ownership_ANY'), Row(original_column='verification_status', original_value='Verified', encoded_column='verification_status_Verified'), Row(original_column='verification_status', original_value='Source Verified', encoded_column='verification_status_Source Verified'), Row(original_column='verification_status', original_value='Not Verified', encoded_column='verification_status_Not Verified'), Row(original_column='type', original_value='Joint App', encoded_column='type_Joint App'), Row(original_column='type', original_value='Individual', encoded_column='type_Individual'), Row(original_column='type', original_

In [31]:
ohe_fintech_df.columns, len(ohe_fintech_df.columns)

(['customer_id',
  'emp_title',
  'emp_length',
  'annual_inc',
  'annual_inc_joint',
  'zip_code',
  'addr_state',
  'avg_cur_bal',
  'tot_cur_bal',
  'loan_id',
  'loan_status',
  'loan_amount',
  'state',
  'funded_amount',
  'term',
  'int_rate',
  'grade',
  'issue_date',
  'pymnt_plan',
  'purpose',
  'description',
  'home_ownership_OWN',
  'home_ownership_RENT',
  'home_ownership_MORTGAGE',
  'home_ownership_ANY',
  'verification_status_Verified',
  'verification_status_Source Verified',
  'verification_status_Not Verified',
  'type_Joint App',
  'type_INDIVIDUAL',
  'type_DIRECT_PAY',
  'type_JOINT'],
 32)

In [32]:
ohe_fintech_df.select(['home_ownership_MORTGAGE']).distinct().show()

+-----------------------+
|home_ownership_MORTGAGE|
+-----------------------+
|                      1|
|                      0|
+-----------------------+



## Label Encode

In [33]:
ohe_fintech_df.select("state").distinct().sort("state").collect()

[Row(state='AK'),
 Row(state='AL'),
 Row(state='AR'),
 Row(state='AZ'),
 Row(state='CA'),
 Row(state='CO'),
 Row(state='CT'),
 Row(state='DC'),
 Row(state='DE'),
 Row(state='FL'),
 Row(state='GA'),
 Row(state='HI'),
 Row(state='ID'),
 Row(state='IL'),
 Row(state='IN'),
 Row(state='KS'),
 Row(state='KY'),
 Row(state='LA'),
 Row(state='MA'),
 Row(state='MD'),
 Row(state='ME'),
 Row(state='MI'),
 Row(state='MN'),
 Row(state='MO'),
 Row(state='MS'),
 Row(state='MT'),
 Row(state='NC'),
 Row(state='ND'),
 Row(state='NE'),
 Row(state='NH'),
 Row(state='NJ'),
 Row(state='NM'),
 Row(state='NV'),
 Row(state='NY'),
 Row(state='OH'),
 Row(state='OK'),
 Row(state='OR'),
 Row(state='PA'),
 Row(state='RI'),
 Row(state='SC'),
 Row(state='SD'),
 Row(state='TN'),
 Row(state='TX'),
 Row(state='UT'),
 Row(state='VA'),
 Row(state='VT'),
 Row(state='WA'),
 Row(state='WI'),
 Row(state='WV'),
 Row(state='WY')]

In [34]:
def label_encode(df: pyspark.sql.dataframe.DataFrame, columns: List[str]) -> pyspark.sql.dataframe.DataFrame:
    """
    Apply label encoding to the specified categorical columns in the DataFrame.
    
    Args:
    df (DataFrame): Input PySpark DataFrame.
    columns (list): List of column names to label encode.
    
    Returns:
    DataFrame: DataFrame with label-encoded columns.
    """
    global GLOBAL_LOOKUP_TABLE
    lookup_data = []
    for column in columns:
        state_values = df.select(column).distinct().sort(column).collect()
        for i, row in enumerate(state_values):
            value = row[0]
            df = df.withColumn(column, fn.when(df[column] == value, i).otherwise(df[column]))
            lookup_data.append(Row(original_column=column, original_value=value, encoded_column=i))
        df = df.withColumn(column, df[column].cast("int"))
    print(lookup_data)
    lookup_table = spark.createDataFrame(lookup_data)
    GLOBAL_LOOKUP_TABLE = GLOBAL_LOOKUP_TABLE.union(lookup_table)
    return df


In [35]:
label_columns = ["state", "purpose"]
label_fintech_df = label_encode(ohe_fintech_df, label_columns)

[Row(original_column='state', original_value='AK', encoded_column=0), Row(original_column='state', original_value='AL', encoded_column=1), Row(original_column='state', original_value='AR', encoded_column=2), Row(original_column='state', original_value='AZ', encoded_column=3), Row(original_column='state', original_value='CA', encoded_column=4), Row(original_column='state', original_value='CO', encoded_column=5), Row(original_column='state', original_value='CT', encoded_column=6), Row(original_column='state', original_value='DC', encoded_column=7), Row(original_column='state', original_value='DE', encoded_column=8), Row(original_column='state', original_value='FL', encoded_column=9), Row(original_column='state', original_value='GA', encoded_column=10), Row(original_column='state', original_value='HI', encoded_column=11), Row(original_column='state', original_value='ID', encoded_column=12), Row(original_column='state', original_value='IL', encoded_column=13), Row(original_column='state', 

In [36]:
len(label_fintech_df.columns)

32

In [37]:
label_fintech_df.select("state").distinct().sort('state').show(53)

+-----+
|state|
+-----+
|    0|
|    1|
|    2|
|    3|
|    4|
|    5|
|    6|
|    7|
|    8|
|    9|
|   10|
|   11|
|   12|
|   13|
|   14|
|   15|
|   16|
|   17|
|   18|
|   19|
|   20|
|   21|
|   22|
|   23|
|   24|
|   25|
|   26|
|   27|
|   28|
|   29|
|   30|
|   31|
|   32|
|   33|
|   34|
|   35|
|   36|
|   37|
|   38|
|   39|
|   40|
|   41|
|   42|
|   43|
|   44|
|   45|
|   46|
|   47|
|   48|
|   49|
+-----+



## Letter Grade

- A (1-5)
- B (6-10)
- C (11-15)
- D (16-20)
- E (21-25)
- F (26-30)
- G (31-35)

In [38]:

def create_letter_grade(df: pyspark.sql.dataframe.DataFrame, grade_column: str="grade") -> pyspark.sql.dataframe.DataFrame:
    """
    Add a new column `letter_grade` based on the numerical range of `grade`.

    Args:
    df (DataFrame): Input PySpark DataFrame.
    grade_column (str): Name of the column containing numerical grades.

    Returns:
    DataFrame: DataFrame with an additional `letter_grade` column.
    """
    global GLOBAL_LOOKUP_TABLE
    letters = ["A", "B", "C", "D", "E", "F", "G"]
    lookup_data = []
    for i, letter in enumerate(letters):
        for j in range((i * 5)+1, ((i + 1) * 5)+1):
            lookup_data.append(Row(original_column=grade_column, original_value=str(j), encoded_value=letter))
    lookup_table = spark.createDataFrame(lookup_data)
    print(lookup_data)
    GLOBAL_LOOKUP_TABLE = GLOBAL_LOOKUP_TABLE.union(lookup_table)

    
    grade_mapping = [
        (fn.col(grade_column).between(1, 5), "A"),
        (fn.col(grade_column).between(6, 10), "B"),
        (fn.col(grade_column).between(11, 15), "C"),
        (fn.col(grade_column).between(16, 20), "D"),
        (fn.col(grade_column).between(21, 25), "E"),
        (fn.col(grade_column).between(26, 30), "F"),
        (fn.col(grade_column).between(31, 35), "G"),
    ]
    
    letter_grade_column = fn.when(*grade_mapping[0])
    for condition, letter in grade_mapping[1:]:
        letter_grade_column = letter_grade_column.when(condition, letter)
    letter_grade_column = letter_grade_column.otherwise("Unknown")

    df = df.withColumn("letter_grade", letter_grade_column)
    return df


In [39]:
encoded_fintech_df = create_letter_grade(label_fintech_df, "grade")

[Row(original_column='grade', original_value='1', encoded_value='A'), Row(original_column='grade', original_value='2', encoded_value='A'), Row(original_column='grade', original_value='3', encoded_value='A'), Row(original_column='grade', original_value='4', encoded_value='A'), Row(original_column='grade', original_value='5', encoded_value='A'), Row(original_column='grade', original_value='6', encoded_value='B'), Row(original_column='grade', original_value='7', encoded_value='B'), Row(original_column='grade', original_value='8', encoded_value='B'), Row(original_column='grade', original_value='9', encoded_value='B'), Row(original_column='grade', original_value='10', encoded_value='B'), Row(original_column='grade', original_value='11', encoded_value='C'), Row(original_column='grade', original_value='12', encoded_value='C'), Row(original_column='grade', original_value='13', encoded_value='C'), Row(original_column='grade', original_value='14', encoded_value='C'), Row(original_column='grade',

In [40]:
encoded_fintech_df.select("grade", "letter_grade").distinct().sort('grade').show(40)

+-----+------------+
|grade|letter_grade|
+-----+------------+
|    1|           A|
|    2|           A|
|    3|           A|
|    4|           A|
|    5|           A|
|    6|           B|
|    7|           B|
|    8|           B|
|    9|           B|
|   10|           B|
|   11|           C|
|   12|           C|
|   13|           C|
|   14|           C|
|   15|           C|
|   16|           D|
|   17|           D|
|   18|           D|
|   19|           D|
|   20|           D|
|   21|           E|
|   22|           E|
|   23|           E|
|   24|           E|
|   25|           E|
|   26|           F|
|   27|           F|
|   28|           F|
|   29|           F|
|   30|           F|
|   31|           G|
|   32|           G|
|   33|           G|
|   34|           G|
|   35|           G|
+-----+------------+



## Emp_length

In [41]:
encoded_fintech_df.select("emp_length").distinct().show()

+----------+
|emp_length|
+----------+
|   5 years|
|   9 years|
|    1 year|
|   2 years|
|   7 years|
|   8 years|
|   4 years|
|   6 years|
|   3 years|
| 10+ years|
|  < 1 year|
+----------+



In [42]:
from pyspark.sql import functions as fn

def convert_emp_length_to_numeric(df: pyspark.sql.dataframe.DataFrame, column: str = "emp_length") -> pyspark.sql.dataframe.DataFrame:
    """
    Converts the 'emp_length' column to numeric values using string manipulation.
    
    Args:
    df (DataFrame): Input PySpark DataFrame.
    column (str): Column name containing employment length data.
    
    Returns:
    DataFrame: DataFrame with 'emp_length' column converted to numeric values.
    """
    df = df.withColumn(column, fn.regexp_replace(fn.col(column), "years|year", ""))
    df = df.withColumn(column, fn.when(fn.col(column).like("%<%"), "0.5")
                                 .when(fn.col(column).like("%+%"), "11")
                                 .otherwise(fn.col(column)))
    df = df.withColumn(column, fn.col(column).cast("float"))
    
    return df


In [43]:
encoded_fintech_df_2 = convert_emp_length_to_numeric(encoded_fintech_df, "emp_length")

In [44]:
encoded_fintech_df_2.select("emp_length").distinct().sort("emp_length").show()

+----------+
|emp_length|
+----------+
|       0.5|
|       1.0|
|       2.0|
|       3.0|
|       4.0|
|       5.0|
|       6.0|
|       7.0|
|       8.0|
|       9.0|
|      11.0|
+----------+



In [45]:
encoded_fintech_df_2.show()

+--------------------+--------------------+----------+----------+----------------+--------+----------+-----------+-----------+-------+---------------+-----------+-----+-------------+----------+--------+-----+-----------------+----------+-------+--------------------+------------------+-------------------+-----------------------+------------------+----------------------------+-----------------------------------+--------------------------------+--------------+---------------+---------------+----------+------------+
|         customer_id|           emp_title|emp_length|annual_inc|annual_inc_joint|zip_code|addr_state|avg_cur_bal|tot_cur_bal|loan_id|    loan_status|loan_amount|state|funded_amount|      term|int_rate|grade|       issue_date|pymnt_plan|purpose|         description|home_ownership_OWN|home_ownership_RENT|home_ownership_MORTGAGE|home_ownership_ANY|verification_status_Verified|verification_status_Source Verified|verification_status_Not Verified|type_Joint App|type_INDIVIDUAL|type_

In [46]:
GLOBAL_LOOKUP_TABLE.show(50)

+-------------------+---------------+--------------------+
|    original_column| original_value|      encoded_column|
+-------------------+---------------+--------------------+
|     home_ownership|            OWN|  home_ownership_OWN|
|     home_ownership|           RENT| home_ownership_RENT|
|     home_ownership|       MORTGAGE|home_ownership_MO...|
|     home_ownership|            ANY|  home_ownership_ANY|
|verification_status|       Verified|verification_stat...|
|verification_status|Source Verified|verification_stat...|
|verification_status|   Not Verified|verification_stat...|
|               type|      Joint App|      type_Joint App|
|               type|     Individual|     type_Individual|
|               type|     DIRECT_PAY|     type_DIRECT_PAY|
|               type|          JOINT|          type_JOINT|
|               type|     INDIVIDUAL|     type_INDIVIDUAL|
|              state|             AK|                   0|
|              state|             AL|                   

# Part 4: Feature Engineering

Write a function that adds the 3 following features. Try as much as you can to use
built in fucntions in PySpark (from the functions library) check lab 8, Avoid writing
UDFs from scratch.
- Previous loan issue date from the same grade
- Previoius Loan amount from the same grade
- Previous loan date from the same state and grade combined
- Previous loan amount from the same state and grade combined

## Previous loan issue date from the same grade

In [47]:
def add_previous_loan_issue_date_form_same_grade(df: pyspark.sql.dataframe.DataFrame) -> pyspark.sql.dataframe.DataFrame:
    """
    Add a new column 'previous_loan_issue_date_same_grade' that contains the issue date of the previous loan with the same grade.
    
    Args:
    df (DataFrame): Input PySpark DataFrame.
    
    Returns:
    DataFrame: DataFrame with the new column 'previous_loan_issue_date_same_grade'.
    """
    df = df.withColumn("issue_date_preprocessed", fn.to_date(fn.col("issue_date"), "dd MMMM yyyy"))
    window_spec = Window.partitionBy("grade").orderBy("issue_date_preprocessed")
    prev_issue_date = fn.lag("issue_date", 1).over(window_spec)
    df = df.withColumn("previous_loan_issue_date_same_grade", prev_issue_date)
    df = df.drop("issue_date_preprocessed")
    return df

In [48]:
lagged_fintech_df = add_previous_loan_issue_date_form_same_grade(encoded_fintech_df_2)

In [49]:
lagged_fintech_df.select("issue_date", "grade", "previous_loan_issue_date_same_grade").show()

+-----------------+-----+-----------------------------------+
|       issue_date|grade|previous_loan_issue_date_same_grade|
+-----------------+-----+-----------------------------------+
|12 September 2012|    1|                               NULL|
|12 September 2012|    1|                  12 September 2012|
|12 September 2012|    1|                  12 September 2012|
|  12 October 2012|    1|                  12 September 2012|
| 12 November 2012|    1|                    12 October 2012|
| 12 December 2012|    1|                   12 November 2012|
| 12 December 2012|    1|                   12 December 2012|
|  13 January 2013|    1|                   12 December 2012|
|  13 January 2013|    1|                    13 January 2013|
| 13 February 2013|    1|                    13 January 2013|
| 13 February 2013|    1|                   13 February 2013|
|    13 March 2013|    1|                   13 February 2013|
|    13 March 2013|    1|                      13 March 2013|
|    13 

## Previoius Loan amount from the same grade

In [50]:
def add_prev_loan_amount_from_same_grade(df: pyspark.sql.dataframe.DataFrame) -> pyspark.sql.dataframe.DataFrame:
    """
    Add a new column 'previous_loan_amount_same_grade' that contains the loan amount of the previous loan with the same grade.
    
    Args:
    df (DataFrame): Input PySpark DataFrame.
    
    Returns:
    DataFrame: DataFrame with the new column 'previous_loan_amount_same_grade'.
    """
    df = df.withColumn("issue_date_preprocessed", fn.to_date(fn.col("issue_date"), "dd MMMM yyyy"))
    window_spec = Window.partitionBy("grade").orderBy("issue_date_preprocessed")
    prev_loan_amount = fn.lag("loan_amount", 1).over(window_spec)
    df = df.withColumn("previous_loan_amount_same_grade", prev_loan_amount)
    df = df.drop("issue_date_preprocessed")
    return df

In [51]:
lagged_fintech_df_2 = add_prev_loan_amount_from_same_grade(lagged_fintech_df)

In [52]:
lagged_fintech_df_2.select("issue_date","loan_amount", "grade", "previous_loan_amount_same_grade").show()

+-----------------+-----------+-----+-------------------------------+
|       issue_date|loan_amount|grade|previous_loan_amount_same_grade|
+-----------------+-----------+-----+-------------------------------+
|12 September 2012|    12000.0|    1|                           NULL|
|12 September 2012|    11200.0|    1|                        12000.0|
|12 September 2012|     6500.0|    1|                        11200.0|
|  12 October 2012|     8000.0|    1|                         6500.0|
| 12 November 2012|    10000.0|    1|                         8000.0|
| 12 December 2012|    24000.0|    1|                        10000.0|
| 12 December 2012|    24000.0|    1|                        24000.0|
|  13 January 2013|    16850.0|    1|                        24000.0|
|  13 January 2013|    12000.0|    1|                        16850.0|
| 13 February 2013|    10000.0|    1|                        12000.0|
| 13 February 2013|    28000.0|    1|                        10000.0|
|    13 March 2013| 

## Previous loan date from the same state and grade combined

In [53]:
def add_prev_loan_date_from_same_state_and_grade(df: pyspark.sql.dataframe.DataFrame) -> pyspark.sql.dataframe.DataFrame:
    """
    Add a new column 'previous_loan_date_same_state_and_grade' that contains the issue date of the previous loan with the same state and grade.
    
    Args:
    df (DataFrame): Input PySpark DataFrame.
    
    Returns:
    DataFrame: DataFrame with the new column 'previous_loan_amount_same_grade'.
    """
    df = df.withColumn("issue_date_preprocessed", fn.to_date(fn.col("issue_date"), "dd MMMM yyyy"))
    window_spec = Window.partitionBy("state","grade").orderBy("issue_date_preprocessed")
    prev_loan_amount = fn.lag("issue_date", 1).over(window_spec)
    df = df.withColumn("previous_loan_date_same_state_and_grade", prev_loan_amount)
    df = df.drop("issue_date_preprocessed")
    return df

In [54]:
lagged_fintech_df_3 = add_prev_loan_date_from_same_state_and_grade(lagged_fintech_df_2)

In [55]:
lagged_fintech_df_3.select("issue_date","state","grade", "previous_loan_date_same_state_and_grade").show()

+-----------------+-----+-----+---------------------------------------+
|       issue_date|state|grade|previous_loan_date_same_state_and_grade|
+-----------------+-----+-----+---------------------------------------+
|  14 October 2014|    0|    1|                                   NULL|
|  19 January 2019|    0|    1|                        14 October 2014|
| 19 November 2019|    0|    1|                        19 January 2019|
|  14 October 2014|    0|    2|                                   NULL|
|     17 June 2017|    0|    2|                        14 October 2014|
| 17 November 2017|    0|    2|                           17 June 2017|
|    18 March 2018|    0|    2|                       17 November 2017|
|15 September 2015|    0|    3|                                   NULL|
|  18 October 2018|    0|    3|                      15 September 2015|
|    19 April 2019|    0|    3|                        18 October 2018|
|     16 June 2016|    0|    4|                                 

## Previous loan amount from the same state and grade combined

In [56]:
def add_prev_loan_amount_from_same_state_and_grade(df: pyspark.sql.dataframe.DataFrame) -> pyspark.sql.dataframe.DataFrame:
    """
    Add a new column 'previous_loan_amount_same_state_and_grade' that contains the loan amount of the previous loan with the same state and grade.
    
    Args:
    df (DataFrame): Input PySpark DataFrame.
    
    Returns:
    DataFrame: DataFrame with the new column 'previous_loan_amount_same_state_and_grade'.
    """
    df = df.withColumn("issue_date_preprocessed", fn.to_date(fn.col("issue_date"), "dd MMMM yyyy"))
    window_spec = Window.partitionBy("state","grade").orderBy("issue_date_preprocessed")
    prev_loan_amount = fn.lag("loan_amount", 1).over(window_spec)
    df = df.withColumn("previous_loan_amount_same_state_and_grade", prev_loan_amount)
    df = df.drop("issue_date_preprocessed")
    return df

In [57]:
lagged_fintech_df_4 = add_prev_loan_amount_from_same_state_and_grade(lagged_fintech_df_3)

In [58]:
lagged_fintech_df_4.select("issue_date","state","loan_amount", "grade", "previous_loan_amount_same_state_and_grade").show()

+-----------------+-----+-----------+-----+-----------------------------------------+
|       issue_date|state|loan_amount|grade|previous_loan_amount_same_state_and_grade|
+-----------------+-----+-----------+-----+-----------------------------------------+
|  14 October 2014|    0|    10000.0|    1|                                     NULL|
|  19 January 2019|    0|    10000.0|    1|                                  10000.0|
| 19 November 2019|    0|     6000.0|    1|                                  10000.0|
|  14 October 2014|    0|    24000.0|    2|                                     NULL|
|     17 June 2017|    0|    30000.0|    2|                                  24000.0|
| 17 November 2017|    0|    21375.0|    2|                                  30000.0|
|    18 March 2018|    0|    35000.0|    2|                                  21375.0|
|15 September 2015|    0|    15000.0|    3|                                     NULL|
|  18 October 2018|    0|    25000.0|    3|           

# Part 5: Analysis SQL vs Spark

Answer each of the following questions using both SQL and Spark:
1. Identify the average loan amount and interest rate for loans marked as
"Default" in the Loan Status, grouped by Emp Length and annual income ranges.
Hint: Use SQL Cases to bin Annual Income into Income Ranges
2. Calculate the average difference between Loan Amount and Funded Amount for each
loan Grade and sort by the grades with the largest differences.
3. Compare the total Loan Amount for loans with "Verified" and "Not Verified"
Verification Status across each state (Addr State).
4. Calculate the average time gap (in days) between consecutive loans for each
grade using the new features you added in the feature engineering phase
5. Identify the average difference in loan amounts between consecutive loans
within the same state and grade combination.

In [59]:
lagged_fintech_df_4.createOrReplaceTempView("fintech_data")

In [103]:
def check_equality_of_dfs(df1: pyspark.sql.dataframe.DataFrame, df2: pyspark.sql.dataframe.DataFrame) -> bool:
    if df1.exceptAll(df2).isEmpty() and df1.exceptAll(df2).isEmpty():
        print("The DataFrames are exactly equal.")
    else:
        print("The DataFrames are not exactly equal.")

## Question 1
Identify the average loan amount and interest rate for loans marked as "Default" in the Loan Status, grouped by Emp Length and annual income ranges.

Hint: Use SQL Cases to bin Annual Income into Income Ranges

### SQL

In [73]:
query_1 = """
SELECT 
    AVG(int_rate) AS avg_int_rate, 
    AVG(loan_amount) AS avg_loan_amount, 
    emp_length, 
    CASE 
        WHEN annual_inc < 50000 THEN 'Low'
        WHEN annual_inc BETWEEN 50000 AND 200000 THEN 'Medium'
        ELSE 'High'
    END AS annual_inc_category
FROM fintech_data
WHERE loan_status = 'Default'
GROUP BY emp_length, annual_inc_category
"""

q1_sql_df = spark.sql(query_1)


In [74]:
q1_sql_df.show()

+------------+---------------+----------+-------------------+
|avg_int_rate|avg_loan_amount|emp_length|annual_inc_category|
+------------+---------------+----------+-------------------+
|      0.0789|        11900.0|       6.0|                Low|
|      0.2075|        15000.0|       1.0|             Medium|
|      0.2589|        35000.0|       1.0|               High|
+------------+---------------+----------+-------------------+



### Spark

In [106]:

default_loans_df = lagged_fintech_df_4.filter(fn.col("loan_status") == "Default")

categorized_loans_df = default_loans_df.withColumn(
    "annual_inc_category",
    fn.when(fn.col("annual_inc") < 50000, "Low")
    .when((fn.col("annual_inc") >= 50000) & (fn.col("annual_inc") <= 200000), "Medium")
    .otherwise("High"),
)

grouped_loans_df = categorized_loans_df.groupBy("emp_length", "annual_inc_category")

q1_spark_df = grouped_loans_df.agg(
    fn.avg("loan_amount").alias("avg_loan_amount"),
    fn.avg("int_rate").alias("avg_int_rate"),
)

q1_spark_df = q1_spark_df.select(
    "avg_int_rate", 
    "avg_loan_amount", 
    "emp_length", 
    "annual_inc_category"
)

q1_spark_df.show()


+------------+---------------+----------+-------------------+
|avg_int_rate|avg_loan_amount|emp_length|annual_inc_category|
+------------+---------------+----------+-------------------+
|      0.0789|        11900.0|       6.0|                Low|
|      0.2075|        15000.0|       1.0|             Medium|
|      0.2589|        35000.0|       1.0|               High|
+------------+---------------+----------+-------------------+



In [107]:
check_equality_of_dfs(q1_sql_df, q1_spark_df)

The DataFrames are exactly equal.


## Question 2
Calculate the average difference between Loan Amount and Funded Amount for each
loan Grade and sort by the grades with the largest differences.

### SQL

In [87]:
query_2 = """
SELECT 
    grade,
    AVG(loan_amount - funded_amount) AS avg_amount_difference
FROM fintech_data
GROUP BY grade
ORDER BY avg_amount_difference DESC 
"""

q2_sql_df = spark.sql(query_2)
q2_sql_df.show(q2_sql_df.count(), truncate=False)

+-----+---------------------+
|grade|avg_amount_difference|
+-----+---------------------+
|29   |0.0                  |
|26   |0.0                  |
|19   |0.0                  |
|22   |0.0                  |
|7    |0.0                  |
|34   |0.0                  |
|32   |0.0                  |
|31   |0.0                  |
|25   |0.0                  |
|6    |0.0                  |
|9    |0.0                  |
|27   |0.0                  |
|17   |0.0                  |
|28   |0.0                  |
|33   |0.0                  |
|5    |0.0                  |
|1    |0.0                  |
|10   |0.0                  |
|3    |0.0                  |
|12   |0.0                  |
|8    |0.0                  |
|11   |0.0                  |
|2    |0.0                  |
|4    |0.0                  |
|13   |0.0                  |
|18   |0.0                  |
|14   |0.0                  |
|21   |0.0                  |
|15   |0.0                  |
|30   |0.0                  |
|23   |0.0

### Spark

In [88]:
loan_amount_diff = fn.col('loan_amount') - fn.col('funded_amount')
grouped_by_grade_df = lagged_fintech_df_4.groupBy('grade')
q2_spark_df = grouped_by_grade_df \
    .agg(fn.avg(loan_amount_diff).alias('avg_amount_difference')) \
    .orderBy(fn.desc('avg_amount_difference'))
q2_spark_df.show(q2_spark_df.count(), truncate=False)

+-----+---------------------+
|grade|avg_amount_difference|
+-----+---------------------+
|29   |0.0                  |
|26   |0.0                  |
|19   |0.0                  |
|22   |0.0                  |
|7    |0.0                  |
|34   |0.0                  |
|32   |0.0                  |
|31   |0.0                  |
|25   |0.0                  |
|6    |0.0                  |
|9    |0.0                  |
|27   |0.0                  |
|17   |0.0                  |
|28   |0.0                  |
|33   |0.0                  |
|5    |0.0                  |
|1    |0.0                  |
|10   |0.0                  |
|3    |0.0                  |
|12   |0.0                  |
|8    |0.0                  |
|11   |0.0                  |
|2    |0.0                  |
|4    |0.0                  |
|13   |0.0                  |
|18   |0.0                  |
|14   |0.0                  |
|21   |0.0                  |
|15   |0.0                  |
|30   |0.0                  |
|23   |0.0

In [None]:
check_equality_of_dfs(q2_sql_df, q2_spark_df)

The DataFrames are exactly equal.


## Question 3
Compare the total Loan Amount for loans with "Verified" and "Not Verified"
Verification Status across each state (Addr State).

### SQL

In [138]:
query_3_1 = """
SELECT 
    addr_state, 
    SUM(loan_amount) AS total_loan_amount
FROM fintech_data
WHERE verification_status_Verified = 1
GROUP BY addr_state
"""
# spark.sql(query_3_1).show()
spark.sql(query_3_1).createOrReplaceTempView("verified_total_loan_amount")

query_3_2 = """
SELECT 
    addr_state, 
    SUM(loan_amount) AS total_loan_amount
FROM fintech_data
WHERE `verification_status_Not Verified` = 1
GROUP BY addr_state
"""
# spark.sql(query_3_2).show()
spark.sql(query_3_2).createOrReplaceTempView("not_verified_total_loan_amount")


query_3_3 = """
SELECT 
    verified_total_loan_amount.addr_state, 
    COALESCE(verified_total_loan_amount.total_loan_amount, 0) AS verified_loan_amount, 
    COALESCE(not_verified_total_loan_amount.total_loan_amount,0) AS not_verified_loan_amount
FROM verified_total_loan_amount LEFT JOIN not_verified_total_loan_amount ON verified_total_loan_amount.addr_state = not_verified_total_loan_amount.addr_state
"""
q3_sql_df = spark.sql(query_3_3)

q3_sql_df.show(q3_sql_df.count(), truncate=False)


+----------+--------------------+------------------------+
|addr_state|verified_loan_amount|not_verified_loan_amount|
+----------+--------------------+------------------------+
|AZ        |2637100.0           |3098950.0               |
|SC        |1627475.0           |1752025.0               |
|LA        |1309425.0           |1160100.0               |
|MN        |1519050.0           |2337500.0               |
|NJ        |5230225.0           |4557250.0               |
|DC        |156350.0            |243925.0                |
|OR        |1229700.0           |1401775.0               |
|VA        |3419325.0           |3982500.0               |
|RI        |529125.0            |768675.0                |
|KY        |1243300.0           |1105025.0               |
|WY        |235825.0            |252925.0                |
|NH        |638650.0            |631100.0                |
|MI        |3008450.0           |3668700.0               |
|NV        |1684200.0           |1916150.0              

### Spark

In [139]:
sum_verified_total_loans = lagged_fintech_df_4.filter(fn.col('verification_status_Verified')==1)\
    .groupby('addr_state')\
    .agg(fn.sum('loan_amount').alias('verified_total_loan_amount'))

sum_not_verified_total_loans = lagged_fintech_df_4.filter(fn.col('verification_status_Not Verified')==1)\
    .groupby('addr_state')\
    .agg(fn.sum('loan_amount').alias('not_verified_total_loan_amount'))

q3_spark_df = sum_verified_total_loans.join(sum_not_verified_total_loans, 'addr_state', 'left')\
    .select('addr_state', 'verified_total_loan_amount', 'not_verified_total_loan_amount')\

q3_spark_df = q3_spark_df.fillna(0)
q3_spark_df.show(q3_spark_df.count(), truncate=False)

+----------+--------------------------+------------------------------+
|addr_state|verified_total_loan_amount|not_verified_total_loan_amount|
+----------+--------------------------+------------------------------+
|AZ        |2637100.0                 |3098950.0                     |
|SC        |1627475.0                 |1752025.0                     |
|LA        |1309425.0                 |1160100.0                     |
|MN        |1519050.0                 |2337500.0                     |
|NJ        |5230225.0                 |4557250.0                     |
|DC        |156350.0                  |243925.0                      |
|OR        |1229700.0                 |1401775.0                     |
|VA        |3419325.0                 |3982500.0                     |
|RI        |529125.0                  |768675.0                      |
|KY        |1243300.0                 |1105025.0                     |
|WY        |235825.0                  |252925.0                      |
|NH   

In [140]:
check_equality_of_dfs(q3_sql_df, q3_spark_df)

The DataFrames are exactly equal.


## Question 4
Calculate the average time gap (in days) between consecutive loans for each
grade using the new features you added in the feature engineering phase

### SQL

In [141]:
query_4 = """
SELECT
    grade,
    AVG(DATEDIFF(
        TO_DATE(issue_date, 'dd MMMM yyyy'),
        TO_DATE(previous_loan_issue_date_same_grade, 'dd MMMM yyyy')
    )) AS avg_days_between_loans
FROM fintech_data
WHERE previous_loan_issue_date_same_grade IS NOT NULL
GROUP BY grade
"""

q4_sql_df = spark.sql(query_4)
q4_sql_df.show(q4_sql_df.count(), truncate=False)

+-----+----------------------+
|grade|avg_days_between_loans|
+-----+----------------------+
|1    |2.235888795282224     |
|2    |2.2988013698630136    |
|3    |2.2548853016142734    |
|4    |2.273497036409822     |
|5    |2.3368146214099217    |
|6    |1.6947637292464879    |
|7    |1.7606557377049181    |
|8    |1.756452680344143     |
|9    |1.6566791510611736    |
|10   |1.669776119402985     |
|11   |1.7526109660574412    |
|12   |1.742613263296126     |
|13   |1.6765634870499053    |
|14   |1.742613263296126     |
|15   |1.7716955941255006    |
|16   |3.477979274611399     |
|17   |3.6283783783783785    |
|18   |3.638211382113821     |
|19   |3.4480946123521683    |
|20   |3.372299872935197     |
|21   |8.338129496402878     |
|22   |8.926923076923076     |
|23   |9.309236947791165     |
|24   |8.307420494699647     |
|25   |7.685430463576159     |
|26   |25.69736842105263     |
|27   |30.955882352941178    |
|28   |27.384615384615383    |
|29   |34.04347826086956     |
|30   |2

### Spark

In [142]:
preprocessed_issue_date = fn.to_date(fn.col("issue_date"), "dd MMMM yyyy")
preprocessed_prev_issue_date = fn.to_date(fn.col("previous_loan_issue_date_same_grade"), "dd MMMM yyyy")

q4_spark_df = lagged_fintech_df_4.filter(fn.col("previous_loan_issue_date_same_grade").isNotNull()) \
    .withColumn("days_between_loans", fn.datediff(preprocessed_issue_date, preprocessed_prev_issue_date)) \
    .groupBy("grade") \
    .agg(fn.avg("days_between_loans").alias("avg_days_between_loans"))

q4_spark_df.show(q4_spark_df.count(), truncate=False)

+-----+----------------------+
|grade|avg_days_between_loans|
+-----+----------------------+
|1    |2.235888795282224     |
|2    |2.2988013698630136    |
|3    |2.2548853016142734    |
|4    |2.273497036409822     |
|5    |2.3368146214099217    |
|6    |1.6947637292464879    |
|7    |1.7606557377049181    |
|8    |1.756452680344143     |
|9    |1.6566791510611736    |
|10   |1.669776119402985     |
|11   |1.7526109660574412    |
|12   |1.742613263296126     |
|13   |1.6765634870499053    |
|14   |1.742613263296126     |
|15   |1.7716955941255006    |
|16   |3.477979274611399     |
|17   |3.6283783783783785    |
|18   |3.638211382113821     |
|19   |3.4480946123521683    |
|20   |3.372299872935197     |
|21   |8.338129496402878     |
|22   |8.926923076923076     |
|23   |9.309236947791165     |
|24   |8.307420494699647     |
|25   |7.685430463576159     |
|26   |25.69736842105263     |
|27   |30.955882352941178    |
|28   |27.384615384615383    |
|29   |34.04347826086956     |
|30   |2

In [143]:
check_equality_of_dfs(q4_sql_df, q4_spark_df)

The DataFrames are exactly equal.


## Question 5
Identify the average difference in loan amounts between consecutive loans
within the same state and grade combination.

### SQL

In [154]:
query_5 = """
SELECT
    state,
    grade,
    AVG(loan_amount - COALESCE(previous_loan_amount_same_state_and_grade, 0)) AS avg_loan_amount_difference
FROM fintech_data
GROUP BY state, grade
"""
# WHERE previous_loan_amount_same_state_and_grade IS NOT NULL

q5_sql_df = spark.sql(query_5)
q5_sql_df.show(50)

+-----+-----+--------------------------+
|state|grade|avg_loan_amount_difference|
+-----+-----+--------------------------+
|    0|    1|                    2000.0|
|    0|    2|                    8750.0|
|    0|    3|        13333.333333333334|
|    0|    4|                    8000.0|
|    0|    5|                    8400.0|
|    0|    6|                   11000.0|
|    0|    7|        3333.3333333333335|
|    0|    8|                    2200.0|
|    0|    9|                    1600.0|
|    0|   10|                   10000.0|
|    0|   11|         3889.285714285714|
|    0|   12|                   18000.0|
|    0|   13|                    5500.0|
|    0|   14|                    1500.0|
|    0|   15|         1142.857142857143|
|    0|   16|        11666.666666666666|
|    0|   17|                    3500.0|
|    0|   19|                   10637.5|
|    0|   20|                   19225.0|
|    0|   21|                   11200.0|
|    0|   22|                   23000.0|
|    0|   24|   

### Spark

In [156]:
loan_prev_diff = fn.col('loan_amount') - fn.coalesce(fn.col('previous_loan_amount_same_state_and_grade'), fn.lit(0))
# .filter(fn.col('previous_loan_amount_same_state_and_grade').isNotNull())
q5_spark_df = lagged_fintech_df_4 \
    .groupBy('state', 'grade') \
    .agg(fn.avg(loan_prev_diff).alias('avg_loan_amount_difference'))

q5_spark_df.show(50)

+-----+-----+--------------------------+
|state|grade|avg_loan_amount_difference|
+-----+-----+--------------------------+
|    0|    1|                    2000.0|
|    0|    2|                    8750.0|
|    0|    3|        13333.333333333334|
|    0|    4|                    8000.0|
|    0|    5|                    8400.0|
|    0|    6|                   11000.0|
|    0|    7|        3333.3333333333335|
|    0|    8|                    2200.0|
|    0|    9|                    1600.0|
|    0|   10|                   10000.0|
|    0|   11|         3889.285714285714|
|    0|   12|                   18000.0|
|    0|   13|                    5500.0|
|    0|   14|                    1500.0|
|    0|   15|         1142.857142857143|
|    0|   16|        11666.666666666666|
|    0|   17|                    3500.0|
|    0|   19|                   10637.5|
|    0|   20|                   19225.0|
|    0|   21|                   11200.0|
|    0|   22|                   23000.0|
|    0|   24|   

In [157]:
check_equality_of_dfs(q5_sql_df, q5_spark_df)

The DataFrames are exactly equal.


# Part 6: Lookup Table & Saving the dataset

## load (save) the cleaned PySpark df and the lookup table to parquet files

In [59]:
def save_df_as_parquet(df: pyspark.sql.dataframe.DataFrame, path: str) -> None:
    """
    Save a PySpark DataFrame to disk in Parquet format.
    
    Args:
    df (DataFrame): Input PySpark DataFrame.
    path (str): Output path for the Parquet file.
    
    Returns:
    None
    """
    df.write.mode("overwrite").parquet(path)
    print(f"DataFrame saved to {path}")

In [60]:
save_df_as_parquet(lagged_fintech_df_4, "./work/fintech_spark_52_1008_clean.parquet")
save_df_as_parquet(GLOBAL_LOOKUP_TABLE, "./work/lookup_spark_52_1008.parquet")

DataFrame saved to ./work/fintech_spark_52_1008_clean.parquet
DataFrame saved to ./work/lookup_spark_52_1008.parquet


In [61]:
def load_df_from_parquet(path: str) -> pyspark.sql.dataframe.DataFrame:
    """
    Load a PySpark DataFrame from disk in Parquet format.
    
    Args:
    path (str): Path to the Parquet file.
    
    Returns:
    DataFrame: Loaded PySpark DataFrame.
    """
    return spark.read.parquet(path)

In [62]:
loaded_fintech_df = load_df_from_parquet("./work/fintech_spark_52_1008_clean.parquet")
loaded_lookup_table = load_df_from_parquet("./work/lookup_spark_52_1008.parquet")

In [63]:
loaded_fintech_df.show(5)

+--------------------+-------------------+----------+----------+----------------+--------+----------+-----------+-----------+-------+-----------+-----------+-----+-------------+----------+--------+-----+----------------+----------+-------+--------------------+------------------+-------------------+-----------------------+------------------+----------------------------+-----------------------------------+--------------------------------+--------------+---------------+---------------+----------+------------+-----------------------------------+-------------------------------+---------------------------------------+-----------------------------------------+
|         customer_id|          emp_title|emp_length|annual_inc|annual_inc_joint|zip_code|addr_state|avg_cur_bal|tot_cur_bal|loan_id|loan_status|loan_amount|state|funded_amount|      term|int_rate|grade|      issue_date|pymnt_plan|purpose|         description|home_ownership_OWN|home_ownership_RENT|home_ownership_MORTGAGE|home_ownership_

In [64]:
loaded_lookup_table.show(5)

+-------------------+---------------+--------------------+
|    original_column| original_value|      encoded_column|
+-------------------+---------------+--------------------+
|verification_status|Source Verified|verification_stat...|
|verification_status|   Not Verified|verification_stat...|
|verification_status|       Verified|verification_stat...|
|     home_ownership|       MORTGAGE|home_ownership_MO...|
|     home_ownership|           RENT| home_ownership_RENT|
+-------------------+---------------+--------------------+
only showing top 5 rows



# BONUS: Loading to Postgres

- Load the cleaned parquet file and lookup table into a Postgres database.
- Take Screenshots showing the newly added features in the feature engineering section
- Take a screenshot from the lookup table

In [65]:
import pandas as pd
from sqlalchemy import create_engine


def save_to_db(df_path: str, table_name: str):
    spark_df = load_df_from_parquet(df_path)
    pd_df = spark_df.toPandas()
    engine = create_engine('postgresql://root:root@pgdatabase:5432/testdb')
    if(engine.connect()):
        print('Connected to Database')
        try:
            print('Writing cleaned dataset to database')
            pd_df.to_sql(table_name, con=engine, if_exists='replace')
            print('Done writing to database')
        except ValueError as vx:
            print('Cleaned Table already exists.')
        except Exception as ex:
            print(ex)
    else:
        print('Failed to connect to Database')

In [66]:
save_to_db(df_path='./work/fintech_spark_52_1008_clean.parquet', table_name="fintech_data")

Connected to Database
Writing cleaned dataset to database
Done writing to database


In [67]:
save_to_db(df_path='./work/lookup_spark_52_1008.parquet', table_name="lookup_table")

Connected to Database
Writing cleaned dataset to database
Done writing to database


----------------------------------------
Exception occurred during processing of request from ('127.0.0.1', 44808)
Traceback (most recent call last):
  File "/opt/conda/lib/python3.11/socketserver.py", line 317, in _handle_request_noblock
    self.process_request(request, client_address)
  File "/opt/conda/lib/python3.11/socketserver.py", line 348, in process_request
    self.finish_request(request, client_address)
  File "/opt/conda/lib/python3.11/socketserver.py", line 361, in finish_request
    self.RequestHandlerClass(request, client_address, self)
  File "/opt/conda/lib/python3.11/socketserver.py", line 755, in __init__
    self.handle()
  File "/usr/local/spark/python/pyspark/accumulators.py", line 295, in handle
    poll(accum_updates)
  File "/usr/local/spark/python/pyspark/accumulators.py", line 267, in poll
    if self.rfile in r and func():
                           ^^^^^^
  File "/usr/local/spark/python/pyspark/accumulators.py", line 271, in accum_updates
    num_updates =

lookup table screenshot

![image.png](attachment:image.png)

cleaned dataset shot

![image.png](attachment:image.png)

![image.png](attachment:image.png)

![image.png](attachment:image.png)

![image.png](attachment:image.png)

![image.png](attachment:image.png)

![image.png](attachment:image.png)