from google.colab import drive
drive.mount('/content/drive')

#Imports

In [1]:
# necessay libraries
from pyspark.sql import functions as fn
from pyspark.sql import Window
from datetime import date,timedelta
from random import uniform
import os
from pyspark.sql.functions import when, col, mean, avg, sum

In [2]:
## start the session
from pyspark.sql import SparkSession
# Initialize Spark session
spark = SparkSession.builder.appName("Mil3").getOrCreate()
# spark context to interact with the driver
sc = spark.sparkContext

#Loading the dataset
- Load the dataset.
- Preview first 20 rows.
- Howmany partitions is this dataframe split into?
- Change partitions to be equal to the number of your logical cores

In [3]:
df=spark.read.parquet("/content/drive/MyDrive/Colab Notebooks/Datasets/fintech_data_29_52_23411.parquet")

In [4]:
df.printSchema()

root
 |-- Customer Id: string (nullable = true)
 |-- Emp Title: string (nullable = true)
 |-- Emp Length: string (nullable = true)
 |-- Home Ownership: string (nullable = true)
 |-- Annual Inc: double (nullable = true)
 |-- Annual Inc Joint: double (nullable = true)
 |-- Verification Status: string (nullable = true)
 |-- Zip Code: string (nullable = true)
 |-- Addr State: string (nullable = true)
 |-- Avg Cur Bal: double (nullable = true)
 |-- Tot Cur Bal: double (nullable = true)
 |-- Loan Id: long (nullable = true)
 |-- Loan Status: string (nullable = true)
 |-- Loan Amount: double (nullable = true)
 |-- State: string (nullable = true)
 |-- Funded Amount: double (nullable = true)
 |-- Term: string (nullable = true)
 |-- Int Rate: double (nullable = true)
 |-- Grade: long (nullable = true)
 |-- Issue Date: string (nullable = true)
 |-- Pymnt Plan: boolean (nullable = true)
 |-- Type: string (nullable = true)
 |-- Purpose: string (nullable = true)
 |-- Description: string (nullable = t

In [5]:
df.summary().show()

+-------+--------------------+---------+----------+--------------+-----------------+-----------------+-------------------+--------+----------+------------------+------------------+------------------+------------------+-----------------+-----+-----------------+----------+-------------------+------------------+-----------------+----------+-------+-----------------+
|summary|         Customer Id|Emp Title|Emp Length|Home Ownership|       Annual Inc| Annual Inc Joint|Verification Status|Zip Code|Addr State|       Avg Cur Bal|       Tot Cur Bal|           Loan Id|       Loan Status|      Loan Amount|State|    Funded Amount|      Term|           Int Rate|             Grade|       Issue Date|      Type|Purpose|      Description|
+-------+--------------------+---------+----------+--------------+-----------------+-----------------+-------------------+--------+----------+------------------+------------------+------------------+------------------+-----------------+-----+-----------------+--------

In [6]:
df.limit(20).show()

+--------------------+--------------------+----------+--------------+----------+----------------+-------------------+--------+----------+-----------+-----------+-------+------------------+-----------+-----+-------------+----------+--------+-----+-----------------+----------+----------+------------------+--------------------+
|         Customer Id|           Emp Title|Emp Length|Home Ownership|Annual Inc|Annual Inc Joint|Verification Status|Zip Code|Addr State|Avg Cur Bal|Tot Cur Bal|Loan Id|       Loan Status|Loan Amount|State|Funded Amount|      Term|Int Rate|Grade|       Issue Date|Pymnt Plan|      Type|           Purpose|         Description|
+--------------------+--------------------+----------+--------------+----------+----------------+-------------------+--------+----------+-----------+-----------+-------+------------------+-----------+-----+-------------+----------+--------+-----+-----------------+----------+----------+------------------+--------------------+
|YidceGZlWFx4ZTBSX.

In [7]:
df.rdd.getNumPartitions()

1

In [8]:
# Change the number of partitions to no. cpu logical cores
num_partitions = os.cpu_count()
df = df.repartition(num_partitions)
df.rdd.getNumPartitions()

2

#Cleaning
- Rename all columns (replacing a space with an underscore, and making it lowercase)
- Detect missing:

  – Createafunctionthat takes in the df and returns any data structure of your choice(df/dict,list,tuple,etc)
  which has the name of the column and percentage of missing entries from the whole dataset.

  – Tip : storing the missing info as dict where the key is the column name and value is the percentage would be the easiest.
- Printout the missing info
- Handle missing:– For numerical features replace with 0.– For categorical/strings replace with mode
- Check missing:– Afterwards, check that there are no missing values

##Column Renaming

In [9]:
# Rename columns: replace spaces with underscores and lowercase
df = df.select([fn.col(col).alias(col.replace(' ', '_').lower()) for col in df.columns])
df.limit(5).show()

+--------------------+--------------------+----------+--------------+----------+----------------+-------------------+--------+----------+-----------+-----------+-------+-----------+-----------+-----+-------------+----------+--------+-----+---------------+----------+----------+------------------+--------------------+
|         customer_id|           emp_title|emp_length|home_ownership|annual_inc|annual_inc_joint|verification_status|zip_code|addr_state|avg_cur_bal|tot_cur_bal|loan_id|loan_status|loan_amount|state|funded_amount|      term|int_rate|grade|     issue_date|pymnt_plan|      type|           purpose|         description|
+--------------------+--------------------+----------+--------------+----------+----------------+-------------------+--------+----------+-----------+-----------+-------+-----------+-----------+-----+-------------+----------+--------+-----+---------------+----------+----------+------------------+--------------------+
|YidceGIzXHhhMDtce...| Senior Art Director|   

##Handeling messing Data

In [10]:
def detect_missing(df):
  missing_info = {}
  total_rows = df.count()
  for col in df.columns:
    missing_count = df.filter(fn.col(col).isNull()).count()
    missing_percentage = (missing_count / total_rows) * 100 if total_rows > 0 else 0
    missing_info[col] = missing_percentage
  return missing_info

In [11]:
missing_values = detect_missing(df)
missing_values

{'customer_id': 0.0,
 'emp_title': 9.019607843137255,
 'emp_length': 7.114317425083241,
 'home_ownership': 0.0,
 'annual_inc': 0.0,
 'annual_inc_joint': 92.91897891231964,
 'verification_status': 0.0,
 'zip_code': 0.0,
 'addr_state': 0.0,
 'avg_cur_bal': 0.0,
 'tot_cur_bal': 0.0,
 'loan_id': 0.0,
 'loan_status': 0.0,
 'loan_amount': 0.0,
 'state': 0.0,
 'funded_amount': 0.0,
 'term': 0.0,
 'int_rate': 4.3248242693303744,
 'grade': 0.0,
 'issue_date': 0.0,
 'pymnt_plan': 0.0,
 'type': 0.0,
 'purpose': 0.0,
 'description': 0.8879023307436182}

In [12]:
df.dtypes

[('customer_id', 'string'),
 ('emp_title', 'string'),
 ('emp_length', 'string'),
 ('home_ownership', 'string'),
 ('annual_inc', 'double'),
 ('annual_inc_joint', 'double'),
 ('verification_status', 'string'),
 ('zip_code', 'string'),
 ('addr_state', 'string'),
 ('avg_cur_bal', 'double'),
 ('tot_cur_bal', 'double'),
 ('loan_id', 'bigint'),
 ('loan_status', 'string'),
 ('loan_amount', 'double'),
 ('state', 'string'),
 ('funded_amount', 'double'),
 ('term', 'string'),
 ('int_rate', 'double'),
 ('grade', 'bigint'),
 ('issue_date', 'string'),
 ('pymnt_plan', 'boolean'),
 ('type', 'string'),
 ('purpose', 'string'),
 ('description', 'string')]

In [13]:
# Identify numerical and categorical columns
categorical_cols = [col_name for col_name, col_type in df.dtypes if col_type in ('string', 'boolean')]
numerical_cols = [col_name for col_name, col_type in df.dtypes if not col_type in ('string', 'boolean')]

In [14]:
# Handle missing
df_no_messing=df.fillna( 0, subset=numerical_cols)

for col_name in categorical_cols:
    mode = df.filter(fn.col(col_name).isNotNull()).groupBy(col_name).count().orderBy(col('count').desc()).first()[0]
    #print(f"Mode for column '{col_name}': {mode}")
    df_no_messing = df_no_messing.fillna(mode, subset=[col_name])

In [15]:
# check messing again
missing_values_after = detect_missing(df_no_messing)
missing_values_after

{'customer_id': 0.0,
 'emp_title': 0.0,
 'emp_length': 0.0,
 'home_ownership': 0.0,
 'annual_inc': 0.0,
 'annual_inc_joint': 0.0,
 'verification_status': 0.0,
 'zip_code': 0.0,
 'addr_state': 0.0,
 'avg_cur_bal': 0.0,
 'tot_cur_bal': 0.0,
 'loan_id': 0.0,
 'loan_status': 0.0,
 'loan_amount': 0.0,
 'state': 0.0,
 'funded_amount': 0.0,
 'term': 0.0,
 'int_rate': 0.0,
 'grade': 0.0,
 'issue_date': 0.0,
 'pymnt_plan': 0.0,
 'type': 0.0,
 'purpose': 0.0,
 'description': 0.0}

##Checking for Inconsestancy

In [16]:
exclude_cols = ['customer_id', 'emp_title','zip_code','issue_date','purpose','description']
for col in categorical_cols:
  #if col not in exclude_cols:
    distinct_values = [row[col] for row in df_no_messing.select(col).distinct().collect()]
    print(f"Distinct values for {col}: {distinct_values}")

Output hidden; open in https://colab.research.google.com to view.

In [17]:
# Form milestone 1 we know that the type can be Individual,JOINT or DIRECT_PAY so there is inconsistantncy here
df_no_messing=df_no_messing.withColumn("type", when(df_no_messing["type"] == "Joint App", "JOINT").otherwise(df_no_messing["type"]))
distinct_values = [row['type'] for row in df_no_messing.select('type').distinct().collect()]
print(f"Distinct values for type: {distinct_values}")

Distinct values for type: ['Individual', 'DIRECT_PAY', 'JOINT', 'INDIVIDUAL']


In [18]:
def unique(df):
  #exclude_cols = ['customer_id', 'emp_title','zip_code','issue_date','purpose','description']
  distinct_info = []  # Initialize an empty list to store distinct value counts

  for col in categorical_cols:
    #if col not in exclude_cols:
      num_distinct = df.select(fn.countDistinct(col)).collect()[0][0]  # Calculate distinct values
      distinct_info.append((col, num_distinct))  # Add to the list

  # Create a DataFrame from the list of distinct value counts
  df_distinct = spark.createDataFrame(distinct_info, ["Column", "NumDistinct"])
  return df_distinct


In [19]:
unique(df_no_messing).show()

+-------------------+-----------+
|             Column|NumDistinct|
+-------------------+-----------+
|        customer_id|      27030|
|          emp_title|      13211|
|         emp_length|         11|
|     home_ownership|          5|
|verification_status|          3|
|           zip_code|        853|
|         addr_state|         50|
|        loan_status|          7|
|              state|         50|
|               term|          2|
|         issue_date|         89|
|         pymnt_plan|          2|
|               type|          4|
|            purpose|         13|
|        description|        816|
+-------------------+-----------+



In [20]:
columns_to_lowercase = ['zip_code','emp_title','type', 'description']
for column in columns_to_lowercase:
    df_no_messing = df_no_messing.withColumn(column, fn.lower(df_no_messing[column]))

In [21]:
unique(df_no_messing).show()

+-------------------+-----------+
|             Column|NumDistinct|
+-------------------+-----------+
|        customer_id|      27030|
|          emp_title|      11726|
|         emp_length|         11|
|     home_ownership|          5|
|verification_status|          3|
|           zip_code|        853|
|         addr_state|         50|
|        loan_status|          7|
|              state|         50|
|               term|          2|
|         issue_date|         89|
|         pymnt_plan|          2|
|               type|          3|
|            purpose|         13|
|        description|        701|
+-------------------+-----------+



##Change to Date format

In [22]:
df_no_messing=df_no_messing.withColumn("issue_date", fn.to_date(df_no_messing["issue_date"], "dd MMMM yyyy"))
df_no_messing.limit(5).show()

+--------------------+--------------------+----------+--------------+----------+----------------+-------------------+--------+----------+-----------+-----------+-------+-----------+-----------+-----+-------------+----------+--------+-----+----------+----------+----------+------------------+--------------------+
|         customer_id|           emp_title|emp_length|home_ownership|annual_inc|annual_inc_joint|verification_status|zip_code|addr_state|avg_cur_bal|tot_cur_bal|loan_id|loan_status|loan_amount|state|funded_amount|      term|int_rate|grade|issue_date|pymnt_plan|      type|           purpose|         description|
+--------------------+--------------------+----------+--------------+----------+----------------+-------------------+--------+----------+-----------+-----------+-------+-----------+-----------+-----+-------------+----------+--------+-----+----------+----------+----------+------------------+--------------------+
|YidceGIzXHhhMDtce...| senior art director|    1 year|       

#Encoding

 Encode only the following categorical values

1.  EmpLength: Change to numerical
2.   Verification Status: One Hot Encoding

1.   HomeOwnership: One Hot Encoding
2.   State: Label Encoding

1.   Type: One Hot Encoding
2.   Purpose: Label Encoding

1.   For the grade, only descretize it to be letter grade, no need to label encode it further


 DONOT Encode the employment title or description or any other column that is not mentioned above

In [23]:
from pyspark.sql.functions import when, col, mean, avg, sum

In [24]:
from pyspark.ml.feature import StringIndexer

def label_encode(df, column):
    indexer = StringIndexer(inputCol=column, outputCol=column + "_encoded")
    indexed = indexer.fit(df).transform(df)
    indexed = indexed.withColumn(column + "_encoded", fn.col(column + "_encoded").cast("int"))
    #df = indexed.drop(column)
    return indexed

In [25]:
def one_hot_encode(df, column):
  # Get unique categories in the column
  unique_values = [row[column] for row in df.select(column).distinct().collect()]
  for value in unique_values:
    df = df.withColumn(f"{column}_{value}", when(fn.col(column)== value, 1).otherwise (0))
  final=df.drop(column)
  return final

##EmpLength

In [26]:
df_encoded = df_no_messing.withColumn("emp_length_encoded", when(fn.col("emp_length") == "< 1 year", 0).otherwise(fn.col("emp_length")))
df_encoded=df_encoded.withColumn("emp_length_encoded", fn.regexp_extract(df_encoded["emp_length_encoded"], r"(\d+)", 1).cast("int"))
df_encoded.limit(5).show()

+--------------------+--------------------+----------+--------------+----------+----------------+-------------------+--------+----------+-----------+-----------+-------+-----------+-----------+-----+-------------+----------+--------+-----+----------+----------+----------+------------------+--------------------+------------------+
|         customer_id|           emp_title|emp_length|home_ownership|annual_inc|annual_inc_joint|verification_status|zip_code|addr_state|avg_cur_bal|tot_cur_bal|loan_id|loan_status|loan_amount|state|funded_amount|      term|int_rate|grade|issue_date|pymnt_plan|      type|           purpose|         description|emp_length_encoded|
+--------------------+--------------------+----------+--------------+----------+----------------+-------------------+--------+----------+-----------+-----------+-------+-----------+-----------+-----+-------------+----------+--------+-----+----------+----------+----------+------------------+--------------------+------------------+
|Yid

##Verification Status

In [27]:
df_encoded = one_hot_encode(df_encoded, "verification_status")
df_encoded.limit(5).show()

+--------------------+--------------------+----------+--------------+----------+----------------+--------+----------+-----------+-----------+-------+-----------+-----------+-----+-------------+----------+--------+-----+----------+----------+----------+------------------+--------------------+------------------+----------------------------+-----------------------------------+--------------------------------+
|         customer_id|           emp_title|emp_length|home_ownership|annual_inc|annual_inc_joint|zip_code|addr_state|avg_cur_bal|tot_cur_bal|loan_id|loan_status|loan_amount|state|funded_amount|      term|int_rate|grade|issue_date|pymnt_plan|      type|           purpose|         description|emp_length_encoded|verification_status_Verified|verification_status_Source Verified|verification_status_Not Verified|
+--------------------+--------------------+----------+--------------+----------+----------------+--------+----------+-----------+-----------+-------+-----------+-----------+-----+-

##home_ownership

In [28]:
df_encoded = one_hot_encode(df_encoded, "home_ownership")
df_encoded.limit(5).show()

+--------------------+--------------------+----------+----------+----------------+--------+----------+-----------+-----------+-------+-----------+-----------+-----+-------------+----------+--------+-----+----------+----------+----------+------------------+--------------------+------------------+----------------------------+-----------------------------------+--------------------------------+------------------+-------------------+-----------------------+------------------+--------------------+
|         customer_id|           emp_title|emp_length|annual_inc|annual_inc_joint|zip_code|addr_state|avg_cur_bal|tot_cur_bal|loan_id|loan_status|loan_amount|state|funded_amount|      term|int_rate|grade|issue_date|pymnt_plan|      type|           purpose|         description|emp_length_encoded|verification_status_Verified|verification_status_Source Verified|verification_status_Not Verified|home_ownership_OWN|home_ownership_RENT|home_ownership_MORTGAGE|home_ownership_ANY|home_ownership_OTHER|
+---

##state

In [29]:
df_encoded=label_encode(df_encoded, "state")
df_encoded.limit(5).show()

+--------------------+--------------------+----------+----------+----------------+--------+----------+-----------+-----------+-------+-----------+-----------+-----+-------------+----------+--------+-----+----------+----------+----------+------------------+--------------------+------------------+----------------------------+-----------------------------------+--------------------------------+------------------+-------------------+-----------------------+------------------+--------------------+-------------+
|         customer_id|           emp_title|emp_length|annual_inc|annual_inc_joint|zip_code|addr_state|avg_cur_bal|tot_cur_bal|loan_id|loan_status|loan_amount|state|funded_amount|      term|int_rate|grade|issue_date|pymnt_plan|      type|           purpose|         description|emp_length_encoded|verification_status_Verified|verification_status_Source Verified|verification_status_Not Verified|home_ownership_OWN|home_ownership_RENT|home_ownership_MORTGAGE|home_ownership_ANY|home_ownersh

##type

In [30]:
df_encoded = one_hot_encode(df_encoded, "type")
df_encoded.limit(5).show()

+--------------------+--------------------+----------+----------+----------------+--------+----------+-----------+-----------+-------+-----------+-----------+-----+-------------+----------+--------+-----+----------+----------+------------------+--------------------+------------------+----------------------------+-----------------------------------+--------------------------------+------------------+-------------------+-----------------------+------------------+--------------------+-------------+----------+---------------+---------------+
|         customer_id|           emp_title|emp_length|annual_inc|annual_inc_joint|zip_code|addr_state|avg_cur_bal|tot_cur_bal|loan_id|loan_status|loan_amount|state|funded_amount|      term|int_rate|grade|issue_date|pymnt_plan|           purpose|         description|emp_length_encoded|verification_status_Verified|verification_status_Source Verified|verification_status_Not Verified|home_ownership_OWN|home_ownership_RENT|home_ownership_MORTGAGE|home_owner

##purpose

In [31]:
df_encoded=label_encode(df_encoded, "purpose")
df_encoded.limit(5).show()

+--------------------+--------------------+----------+----------+----------------+--------+----------+-----------+-----------+-------+-----------+-----------+-----+-------------+----------+--------+-----+----------+----------+------------------+--------------------+------------------+----------------------------+-----------------------------------+--------------------------------+------------------+-------------------+-----------------------+------------------+--------------------+-------------+----------+---------------+---------------+---------------+
|         customer_id|           emp_title|emp_length|annual_inc|annual_inc_joint|zip_code|addr_state|avg_cur_bal|tot_cur_bal|loan_id|loan_status|loan_amount|state|funded_amount|      term|int_rate|grade|issue_date|pymnt_plan|           purpose|         description|emp_length_encoded|verification_status_Verified|verification_status_Source Verified|verification_status_Not Verified|home_ownership_OWN|home_ownership_RENT|home_ownership_MOR

##grade

In [32]:
from pyspark.ml.feature import Bucketizer

# Define bin edges and labels from milstone 1
bins = [0, 5, 10, 15, 20, 25, 30, 35]
labels = ['A', 'B', 'C', 'D', 'E', 'F', 'G']

bucketizer = Bucketizer(splits=bins, inputCol="grade", outputCol="Grade_Bucket")

df_bucketed = bucketizer.transform(df_encoded)

df_letter_grade = df_bucketed.withColumn(
    "letter_grade",
    when(col("Grade_Bucket") == 0.0, labels[0])
    .when(col("Grade_Bucket") == 1.0, labels[1])
    .when(col("Grade_Bucket") == 2.0, labels[2])
    .when(col("Grade_Bucket") == 3.0, labels[3])
    .when(col("Grade_Bucket") == 4.0, labels[4])
    .when(col("Grade_Bucket") == 5.0, labels[5])
    .when(col("Grade_Bucket") == 6.0, labels[6])
    .otherwise("Unknown")
)

df_encoded = df_letter_grade.drop("Grade_Bucket")

In [33]:
df_encoded.limit(2).show()

+--------------------+-------------------+----------+----------+----------------+--------+----------+-----------+-----------+-------+-----------+-----------+-----+-------------+----------+--------+-----+----------+----------+------------------+--------------------+------------------+----------------------------+-----------------------------------+--------------------------------+------------------+-------------------+-----------------------+------------------+--------------------+-------------+----------+---------------+---------------+---------------+------------+
|         customer_id|          emp_title|emp_length|annual_inc|annual_inc_joint|zip_code|addr_state|avg_cur_bal|tot_cur_bal|loan_id|loan_status|loan_amount|state|funded_amount|      term|int_rate|grade|issue_date|pymnt_plan|           purpose|         description|emp_length_encoded|verification_status_Verified|verification_status_Source Verified|verification_status_Not Verified|home_ownership_OWN|home_ownership_RENT|home_ow

#Feature Engineering

Write a function that adds the 3 following features. Try as much as you can to use built in functions in
 PySpark from the functions library (check lab 8). Avoid writing UDFs from scratch.

*   Previous loan issue date from the same grade
*   Previous Loan amount from the same grade


*   Previous loan date from the same state and grade combined
*   Previous loan amount from the same state and grade combined




In [34]:
from pyspark.sql import Window

def add_previous_loan_features(df):

  # Window specification for previous loan based on grade
  window_grade = Window.partitionBy("letter_grade").orderBy("issue_date")

  # Window specification for previous loan based on state and grade
  window_state_grade = Window.partitionBy("state_encoded", "letter_grade").orderBy("issue_date")

  # Add previous loan features based on grade
  df = df.withColumn("prev_loan_issue_date_grade", fn.lag("issue_date", 1).over(window_grade))
  df = df.withColumn("prev_loan_amount_grade", fn.lag("loan_amount", 1).over(window_grade))

  # Add previous loan features based on state and grade
  df = df.withColumn("prev_loan_issue_date_state_grade", fn.lag("issue_date", 1).over(window_state_grade))
  df = df.withColumn("prev_loan_amount_state_grade", fn.lag("loan_amount", 1).over(window_state_grade))

  return df

In [35]:
df_engineered=add_previous_loan_features(df_encoded)
df_engineered.limit(5).show()

+--------------------+--------------------+----------+----------+----------------+--------+----------+-----------+-----------+-------+-----------+-----------+-----+-------------+----------+--------+-----+----------+----------+------------------+--------------------+------------------+----------------------------+-----------------------------------+--------------------------------+------------------+-------------------+-----------------------+------------------+--------------------+-------------+----------+---------------+---------------+---------------+------------+--------------------------+----------------------+--------------------------------+----------------------------+
|         customer_id|           emp_title|emp_length|annual_inc|annual_inc_joint|zip_code|addr_state|avg_cur_bal|tot_cur_bal|loan_id|loan_status|loan_amount|state|funded_amount|      term|int_rate|grade|issue_date|pymnt_plan|           purpose|         description|emp_length_encoded|verification_status_Verified|v

#Analysis SQL vs Spark
 Answer each of the following questions using both SQL and Spark:
 1. Identify the average loan amount and interest rate for loans marked as ”Default” in the Loan Status,
 grouped by Emp Length and annual income ranges.
 Hint: Use SQL Cases to bin Annual Income into Income Ranges
 2. Calculate the average difference between Loan Amount and Funded Amount for each loan Grade and
 sort by the grades with the largest differences.
 3. Compare the total Loan Amount for loans with ”Verified” and ”Not Verified” Verification Status across
 each state (Addr State).
 4. Calculate the average time gap (in days) between consecutive loans for each grade using the new
 features you added in the feature engineering phase
 5. Identify the average difference in loan amounts between consecutive loans within the same state and
 grade combination.

In [36]:
#Temp Views
df_no_messing.createOrReplaceTempView("df_no_messing")
df_encoded.createOrReplaceTempView("df_encoded")
df_engineered.createOrReplaceTempView("df_engineered")

##Query 1

In [37]:
#SQL
query = """
SELECT
    emp_length,
    CASE
        WHEN annual_inc BETWEEN 0 AND 50000 THEN '0-50k'
        WHEN annual_inc BETWEEN 50001 AND 100000 THEN '50k-100k'
        WHEN annual_inc BETWEEN 100001 AND 150000 THEN '100k-150k'
        ELSE '>150k'
    END AS income_range,
    AVG(loan_amount) AS avg_loan_amount,
    AVG(int_rate) AS avg_interest_rate
FROM
    df_no_messing
WHERE
    loan_status = 'Default'
GROUP BY
    emp_length,
    income_range;
"""
spark.sql(query).show()

+----------+------------+---------------+-----------------+
|emp_length|income_range|avg_loan_amount|avg_interest_rate|
+----------+------------+---------------+-----------------+
|   3 years|   100k-150k|        35000.0|           0.1344|
|    1 year|       0-50k|         4200.0|           0.1825|
| 10+ years|       0-50k|        13925.0|           0.2215|
+----------+------------+---------------+-----------------+



In [38]:
#Spark
default_loans_df = df_no_messing.filter(col("loan_status") == "Default") \
    .withColumn("income_range",
                when(col("annual_inc").between(0, 50000), "0-50k")
                .when(col("annual_inc").between(50001, 100000), "50k-100k")
                .when(col("annual_inc").between(100001, 150000), "100k-150k")
                .otherwise(">150k")) \
    .groupBy("emp_length", "income_range") \
    .agg(avg("loan_amount").alias("avg_loan_amount"),
         avg("int_rate").alias("avg_interest_rate"))
default_loans_df.show()

+----------+------------+---------------+-----------------+
|emp_length|income_range|avg_loan_amount|avg_interest_rate|
+----------+------------+---------------+-----------------+
|   3 years|   100k-150k|        35000.0|           0.1344|
|    1 year|       0-50k|         4200.0|           0.1825|
| 10+ years|       0-50k|        13925.0|           0.2215|
+----------+------------+---------------+-----------------+



In [39]:
df_no_messing.filter(col("loan_status") == "Default").show()

+--------------------+--------------------+----------+--------------+----------+----------------+-------------------+--------+----------+-----------+-----------+-------+-----------+-----------+-----+-------------+----------+--------+-----+----------+----------+----------+------------------+--------------------+
|         customer_id|           emp_title|emp_length|home_ownership|annual_inc|annual_inc_joint|verification_status|zip_code|addr_state|avg_cur_bal|tot_cur_bal|loan_id|loan_status|loan_amount|state|funded_amount|      term|int_rate|grade|issue_date|pymnt_plan|      type|           purpose|         description|
+--------------------+--------------------+----------+--------------+----------+----------------+-------------------+--------+----------+-----------+-----------+-------+-----------+-----------+-----+-------------+----------+--------+-----+----------+----------+----------+------------------+--------------------+
|YidUeyVOXHg5Nlx4Z...|pharmacy technician |    1 year|       

Only 3 Rows

##Query 2

In [40]:
#SQL
query = """
SELECT
    letter_grade,
    AVG(loan_amount - funded_amount) AS avg_diff
FROM
    df_encoded
GROUP BY
    letter_grade
ORDER BY
    avg_diff DESC;
"""

spark.sql(query).show()

+------------+--------+
|letter_grade|avg_diff|
+------------+--------+
|           F|     0.0|
|           E|     0.0|
|           B|     0.0|
|           D|     0.0|
|           C|     0.0|
|           A|     0.0|
|           G|     0.0|
+------------+--------+



In [41]:
#Spark
avg_diff_df = df_encoded.groupBy("letter_grade") \
    .agg(avg(col("loan_amount") - col("funded_amount")).alias("avg_diff")) \
    .orderBy(col("avg_diff").desc())
avg_diff_df.show()

+------------+--------+
|letter_grade|avg_diff|
+------------+--------+
|           F|     0.0|
|           E|     0.0|
|           B|     0.0|
|           D|     0.0|
|           C|     0.0|
|           A|     0.0|
|           G|     0.0|
+------------+--------+



##Query 3

In [42]:
#SQL
query = """
SELECT
    state,
    verification_status,
    SUM(loan_amount) AS total_loan_amount
FROM
    df_no_messing
GROUP BY
    state,
    verification_status;
"""
spark.sql(query).show()

+-----+-------------------+-----------------+
|state|verification_status|total_loan_amount|
+-----+-------------------+-----------------+
|   MS|       Not Verified|         443475.0|
|   GA|    Source Verified|        5815225.0|
|   NJ|       Not Verified|        4876500.0|
|   KS|           Verified|         680150.0|
|   WV|           Verified|         585575.0|
|   MN|       Not Verified|        2314575.0|
|   RI|           Verified|         359575.0|
|   CO|       Not Verified|        3004125.0|
|   MT|           Verified|         372475.0|
|   ME|           Verified|         232275.0|
|   TX|           Verified|        9968975.0|
|   WI|       Not Verified|        1596150.0|
|   NY|           Verified|        9574750.0|
|   AK|       Not Verified|         545950.0|
|   SD|           Verified|         229225.0|
|   MA|           Verified|        2682800.0|
|   IN|       Not Verified|        2263025.0|
|   NC|           Verified|        3328000.0|
|   WA|       Not Verified|       

In [43]:
#spark
total_loan_amount_df = df_no_messing.groupBy("state", "verification_status") \
    .agg(sum("loan_amount").alias("total_loan_amount"))
total_loan_amount_df.show()

+-----+-------------------+-----------------+
|state|verification_status|total_loan_amount|
+-----+-------------------+-----------------+
|   MS|       Not Verified|         443475.0|
|   GA|    Source Verified|        5815225.0|
|   NJ|       Not Verified|        4876500.0|
|   KS|           Verified|         680150.0|
|   WV|           Verified|         585575.0|
|   MN|       Not Verified|        2314575.0|
|   RI|           Verified|         359575.0|
|   CO|       Not Verified|        3004125.0|
|   MT|           Verified|         372475.0|
|   ME|           Verified|         232275.0|
|   TX|           Verified|        9968975.0|
|   WI|       Not Verified|        1596150.0|
|   NY|           Verified|        9574750.0|
|   AK|       Not Verified|         545950.0|
|   SD|           Verified|         229225.0|
|   MA|           Verified|        2682800.0|
|   IN|       Not Verified|        2263025.0|
|   NC|           Verified|        3328000.0|
|   WA|       Not Verified|       

##Query 4

In [44]:
#SQL
query="""
SELECT
    letter_grade,
    AVG(DATEDIFF(issue_date, prev_loan_issue_date_grade)) AS avg_time_gap
FROM
    df_engineered
WHERE
    prev_loan_issue_date_grade IS NOT NULL
GROUP BY
    letter_grade;
"""
spark.sql(query).show()

+------------+-------------------+
|letter_grade|       avg_time_gap|
+------------+-------------------+
|           A| 0.5857329842931938|
|           B| 0.3617135928869729|
|           C|0.34725814795654425|
|           D| 0.5806660899653979|
|           E| 1.4494811578372475|
|           F|  3.721036585365854|
|           G| 11.647398843930636|
+------------+-------------------+



In [45]:
from pyspark.sql.functions import datediff, avg

df_result = df_engineered.filter(col("prev_loan_issue_date_grade").isNotNull()) \
    .groupBy("letter_grade") \
    .agg(avg(datediff("issue_date", "prev_loan_issue_date_grade")).alias("avg_time_gap(Days)"))
df_result.show()

+------------+-------------------+
|letter_grade| avg_time_gap(Days)|
+------------+-------------------+
|           A| 0.5857329842931938|
|           B| 0.3617135928869729|
|           C|0.34725814795654425|
|           D| 0.5806660899653979|
|           E| 1.4494811578372475|
|           F|  3.721036585365854|
|           G| 11.647398843930636|
+------------+-------------------+



##Query 5

In [46]:
query="""
SELECT
    state,
    letter_grade,
    AVG(loan_amount - prev_loan_amount_state_grade) AS avg_diff
FROM
    df_engineered
WHERE
    prev_loan_amount_state_grade IS NOT NULL
GROUP BY
    state_encoded,
    state,
    letter_grade
ORDER BY
    state_encoded;
"""
spark.sql(query).show()

+-----+------------+-------------------+
|state|letter_grade|           avg_diff|
+-----+------------+-------------------+
|   CA|           A|-12.801204819277109|
|   CA|           B|-16.599597585513077|
|   CA|           C|-16.901750972762645|
|   CA|           D|-32.414910858995135|
|   CA|           E|  -60.3448275862069|
|   CA|           F| -54.57746478873239|
|   CA|           G|             -184.0|
|   NY|           A| 53.513513513513516|
|   NY|           B| 12.691131498470948|
|   NY|           C| -8.265139116202946|
|   NY|           D| -43.19371727748691|
|   NY|           E| -86.60130718954248|
|   NY|           F|  301.2987012987013|
|   NY|           G|  117.6470588235294|
|   TX|           A|  1.112565445026178|
|   TX|           B|   3.33889816360601|
|   TX|           C|    6.0882800608828|
|   TX|           D| 10.519480519480519|
|   TX|           E| -38.32116788321168|
|   TX|           F| -72.72727272727273|
+-----+------------+-------------------+
only showing top

In [47]:
avg_diff_df = df_engineered.filter(col("prev_loan_amount_state_grade").isNotNull()) \
    .groupBy("state", "letter_grade","state_encoded") \
    .agg(avg(col("loan_amount") - col("prev_loan_amount_state_grade")).alias("avg_diff")).orderBy("state_encoded")
avg_diff_df.select("state", "letter_grade","avg_diff").show()

+-----+------------+-------------------+
|state|letter_grade|           avg_diff|
+-----+------------+-------------------+
|   CA|           A|-12.801204819277109|
|   CA|           B|-16.599597585513077|
|   CA|           C|-16.901750972762645|
|   CA|           D|-32.414910858995135|
|   CA|           E|  -60.3448275862069|
|   CA|           F| -54.57746478873239|
|   CA|           G|             -184.0|
|   NY|           A| 53.513513513513516|
|   NY|           B| 12.691131498470948|
|   NY|           C| -8.265139116202946|
|   NY|           D| -43.19371727748691|
|   NY|           E| -86.60130718954248|
|   NY|           F|  301.2987012987013|
|   NY|           G|  117.6470588235294|
|   TX|           A|  1.112565445026178|
|   TX|           B|   3.33889816360601|
|   TX|           C|    6.0882800608828|
|   TX|           D| 10.519480519480519|
|   TX|           E| -38.32116788321168|
|   TX|           F| -72.72727272727273|
+-----+------------+-------------------+
only showing top

#Lookup Table & Saving the dataset
 - Create a lookup table for the encodings only
 - Finally load (save) the cleaned PySpark df and the lookup table to parquet files

In [48]:
df_engineered.limit(2).show()

+--------------------+--------------------+----------+----------+----------------+--------+----------+-----------+-----------+-------+-----------+-----------+-----+-------------+----------+--------+-----+----------+----------+------------------+------------------+------------------+----------------------------+-----------------------------------+--------------------------------+------------------+-------------------+-----------------------+------------------+--------------------+-------------+----------+---------------+---------------+---------------+------------+--------------------------+----------------------+--------------------------------+----------------------------+
|         customer_id|           emp_title|emp_length|annual_inc|annual_inc_joint|zip_code|addr_state|avg_cur_bal|tot_cur_bal|loan_id|loan_status|loan_amount|state|funded_amount|      term|int_rate|grade|issue_date|pymnt_plan|           purpose|       description|emp_length_encoded|verification_status_Verified|verif

##Lookup Table

In [49]:
from pyspark.sql.types import StructType, StructField, StringType
def create_lookup_table(df_encoded,columns,encoded_cols):
  # Define the schema
  schema = StructType([
      StructField("Column", StringType(), True),
      StructField("Original", StringType(), True),
      StructField("Imputed", StringType(), True)
  ])
  lookup_table = spark.createDataFrame([], schema)

  # Loop through each pair of columns
  for col, col_encoded in zip(Columns, encoded_cols):
        temp_df = (
            df_encoded.select(fn.col(col).alias("Original"),fn.col(col_encoded).alias("Imputed"))
            .dropDuplicates(["Original", "Imputed"])  # Ensure distinct mappings
            .withColumn("Column", fn.lit(col)).orderBy("Imputed")  # Add the column name
        )

        lookup_table = lookup_table.union(temp_df.select('Column',"Original", "Imputed"))

  return lookup_table

In [50]:
Columns=["emp_length","state","grade", "purpose"]
encoded_cols = ["emp_length_encoded","state_encoded","letter_grade","purpose_encoded"]
lookup_table=create_lookup_table(df_encoded,Columns,encoded_cols)
lookup_table.show()

+----------+---------+-------+
|    Column| Original|Imputed|
+----------+---------+-------+
|emp_length| < 1 year|      0|
|emp_length|   1 year|      1|
|emp_length|  2 years|      2|
|emp_length|  3 years|      3|
|emp_length|  4 years|      4|
|emp_length|  5 years|      5|
|emp_length|  6 years|      6|
|emp_length|  7 years|      7|
|emp_length|  8 years|      8|
|emp_length|  9 years|      9|
|emp_length|10+ years|     10|
|     state|       CA|      0|
|     state|       NY|      1|
|     state|       TX|      2|
|     state|       FL|      3|
|     state|       IL|      4|
|     state|       PA|      5|
|     state|       NJ|      6|
|     state|       OH|      7|
|     state|       GA|      8|
+----------+---------+-------+
only showing top 20 rows



##Dropping unneccessary excess columns


In [51]:
df_engineered.limit(2).show()

+--------------------+--------------------+----------+----------+----------------+--------+----------+-----------+-----------+-------+-----------+-----------+-----+-------------+----------+--------+-----+----------+----------+------------------+------------------+------------------+----------------------------+-----------------------------------+--------------------------------+------------------+-------------------+-----------------------+------------------+--------------------+-------------+----------+---------------+---------------+---------------+------------+--------------------------+----------------------+--------------------------------+----------------------------+
|         customer_id|           emp_title|emp_length|annual_inc|annual_inc_joint|zip_code|addr_state|avg_cur_bal|tot_cur_bal|loan_id|loan_status|loan_amount|state|funded_amount|      term|int_rate|grade|issue_date|pymnt_plan|           purpose|       description|emp_length_encoded|verification_status_Verified|verif

In [52]:
#The ones that were encoded
df_final=df_engineered.drop("emp_length","state","grade","purpose")
df_final.limit(2).show()

+--------------------+--------------------+----------+----------------+--------+----------+-----------+-----------+-------+-----------+-----------+-------------+----------+--------+----------+----------+------------------+------------------+----------------------------+-----------------------------------+--------------------------------+------------------+-------------------+-----------------------+------------------+--------------------+-------------+----------+---------------+---------------+---------------+------------+--------------------------+----------------------+--------------------------------+----------------------------+
|         customer_id|           emp_title|annual_inc|annual_inc_joint|zip_code|addr_state|avg_cur_bal|tot_cur_bal|loan_id|loan_status|loan_amount|funded_amount|      term|int_rate|issue_date|pymnt_plan|       description|emp_length_encoded|verification_status_Verified|verification_status_Source Verified|verification_status_Not Verified|home_ownership_OWN|h

##Saving

In [53]:
# Save cleaned DataFrame (df_engineered)
df_final.write.parquet("/content/drive/MyDrive/Colab Notebooks/Datasets/fintech_spark_52_23411_clean.parquet", mode="overwrite")

# Save lookup table DataFrame (lookup_table_df)
lookup_table.write.parquet("/content/drive/MyDrive/Colab Notebooks/Datasets/lookup_52_23411.parquet", mode="overwrite")

print("Cleaned DataFrame and lookup table saved to Parquet files.")

Cleaned DataFrame and lookup table saved to Parquet files.


In [54]:
lookup_table.show(n=1000, truncate=False)

+----------+------------------+-------+
|Column    |Original          |Imputed|
+----------+------------------+-------+
|emp_length|< 1 year          |0      |
|emp_length|1 year            |1      |
|emp_length|2 years           |2      |
|emp_length|3 years           |3      |
|emp_length|4 years           |4      |
|emp_length|5 years           |5      |
|emp_length|6 years           |6      |
|emp_length|7 years           |7      |
|emp_length|8 years           |8      |
|emp_length|9 years           |9      |
|emp_length|10+ years         |10     |
|state     |CA                |0      |
|state     |NY                |1      |
|state     |TX                |2      |
|state     |FL                |3      |
|state     |IL                |4      |
|state     |PA                |5      |
|state     |NJ                |6      |
|state     |OH                |7      |
|state     |GA                |8      |
|state     |NC                |9      |
|state     |VA                |10     |


In [55]:
df_test=spark.read.parquet("/content/drive/MyDrive/Colab Notebooks/Datasets/fintech_spark_52_23411_clean.parquet")
df_test.show()

+--------------------+--------------------+----------+----------------+--------+----------+-----------+-----------+-------+-----------+-----------+-------------+----------+--------+----------+----------+--------------------+------------------+----------------------------+-----------------------------------+--------------------------------+------------------+-------------------+-----------------------+------------------+--------------------+-------------+----------+---------------+---------------+---------------+------------+--------------------------+----------------------+--------------------------------+----------------------------+
|         customer_id|           emp_title|annual_inc|annual_inc_joint|zip_code|addr_state|avg_cur_bal|tot_cur_bal|loan_id|loan_status|loan_amount|funded_amount|      term|int_rate|issue_date|pymnt_plan|         description|emp_length_encoded|verification_status_Verified|verification_status_Source Verified|verification_status_Not Verified|home_ownership_O