In [1]:
from google.colab import files
uploaded = files.upload()

Saving fintech_data_30_52_3224.parquet to fintech_data_30_52_3224.parquet


In [36]:
from pyspark.sql import SparkSession
#Load the dataset.
spark = SparkSession.builder.appName("Milestone3").getOrCreate()
df = spark.read.parquet("fintech_data_30_52_3224.parquet")
df.printSchema()
df.summary().show()

root
 |-- Customer Id: string (nullable = true)
 |-- Emp Title: string (nullable = true)
 |-- Emp Length: string (nullable = true)
 |-- Home Ownership: string (nullable = true)
 |-- Annual Inc: double (nullable = true)
 |-- Annual Inc Joint: double (nullable = true)
 |-- Verification Status: string (nullable = true)
 |-- Zip Code: string (nullable = true)
 |-- Addr State: string (nullable = true)
 |-- Avg Cur Bal: double (nullable = true)
 |-- Tot Cur Bal: double (nullable = true)
 |-- Loan Id: long (nullable = true)
 |-- Loan Status: string (nullable = true)
 |-- Loan Amount: double (nullable = true)
 |-- State: string (nullable = true)
 |-- Funded Amount: double (nullable = true)
 |-- Term: string (nullable = true)
 |-- Int Rate: double (nullable = true)
 |-- Grade: long (nullable = true)
 |-- Issue Date: string (nullable = true)
 |-- Pymnt Plan: boolean (nullable = true)
 |-- Type: string (nullable = true)
 |-- Purpose: string (nullable = true)
 |-- Description: string (nullable = t

In [37]:
#Preview first 20 rows.
df.show(20)

+--------------------+--------------------+----------+--------------+----------+----------------+-------------------+--------+----------+-----------+-----------+-------+-----------+-----------+-----+-------------+----------+--------+-----+-----------------+----------+----------+------------------+--------------------+
|         Customer Id|           Emp Title|Emp Length|Home Ownership|Annual Inc|Annual Inc Joint|Verification Status|Zip Code|Addr State|Avg Cur Bal|Tot Cur Bal|Loan Id|Loan Status|Loan Amount|State|Funded Amount|      Term|Int Rate|Grade|       Issue Date|Pymnt Plan|      Type|           Purpose|         Description|
+--------------------+--------------------+----------+--------------+----------+----------------+-------------------+--------+----------+-----------+-----------+-------+-----------+-----------+-----+-------------+----------+--------+-----+-----------------+----------+----------+------------------+--------------------+
|YidceGFjblx4YzVeX...|  Principal Examin

In [38]:
#How many partitions is this dataframe split into?
print(df.rdd.getNumPartitions())

1


In [39]:
import os
#Change partitions to be equal to the number of your logical cores
num_of_logical_cores = os.cpu_count()
df = df.repartition(num_of_logical_cores)

In [40]:
print(df.rdd.getNumPartitions())

2


In [41]:
#Rename all columns (replacing a space with an underscore, and making it lowercase)
df = df.selectExpr([f"`{col}` as {col.lower().replace(' ', '_')}" for col in df.columns])

In [42]:
df.show(20)

+--------------------+--------------------+----------+--------------+----------+----------------+-------------------+--------+----------+-----------+-----------+-------+-----------+-----------+-----+-------------+----------+--------+-----+-----------------+----------+----------+------------------+--------------------+
|         customer_id|           emp_title|emp_length|home_ownership|annual_inc|annual_inc_joint|verification_status|zip_code|addr_state|avg_cur_bal|tot_cur_bal|loan_id|loan_status|loan_amount|state|funded_amount|      term|int_rate|grade|       issue_date|pymnt_plan|      type|           purpose|         description|
+--------------------+--------------------+----------+--------------+----------+----------------+-------------------+--------+----------+-----------+-----------+-------+-----------+-----------+-----+-------------+----------+--------+-----+-----------------+----------+----------+------------------+--------------------+
|YidceGRiIFx4Zjh3X...|                NU

In [43]:
#Create a function that takes in the df and returns any data structure of your choice(df/dict,list,tuple,etc)
#which has the name of the column and percentage of missing entries from the whole dataset.
def detect_missing(df):
    return {col: (df.filter(df[col].isNull()).count() / df.count()) * 100 for col in df.columns}

missing_info = detect_missing(df)
#Printout the missing info
print(missing_info)

{'customer_id': 0.0, 'emp_title': 8.809056938843463, 'emp_length': 7.066484146657294, 'home_ownership': 0.0, 'annual_inc': 0.0, 'annual_inc_joint': 93.08520478005106, 'verification_status': 0.0, 'zip_code': 0.0, 'addr_state': 0.0, 'avg_cur_bal': 0.0, 'tot_cur_bal': 0.0, 'loan_id': 0.0, 'loan_status': 0.0, 'loan_amount': 0.0, 'state': 0.0, 'funded_amount': 0.0, 'term': 0.0, 'int_rate': 4.435976173739317, 'grade': 0.0, 'issue_date': 0.0, 'pymnt_plan': 0.0, 'type': 0.0, 'purpose': 0.0, 'description': 0.8398386917755005}


In [44]:
#Handle missing:
from pyspark.sql.functions import col

# 1. Replace numeric columns with 0
numeric_columns = ["annual_inc_joint", "int_rate"]
df = df.fillna({col_name: 0 for col_name in numeric_columns})

# 2. Replace categorical columns with their mode
categorical_columns = ["emp_title", "emp_length", "description"]
for col_name in categorical_columns:
    mode_df = df.groupBy(col_name).count().orderBy(col('count').desc())
    mode_values = mode_df.select(col_name).collect()

    # Get the first mode (if available)
    first_mode = mode_values[0][0] if len(mode_values) > 0 else None
    # Get the second mode if the first mode is None
    second_mode = mode_values[1][0] if len(mode_values) > 1 else "unknown"

    # Use first mode if it's not None, otherwise use second mode
    mode_value = first_mode if first_mode is not None else second_mode

    print(f"Replacing missing values in {col_name} with mode: {mode_value}")
    df = df.fillna(value=mode_value, subset=[col_name])
#The emp_title feature is replaced with the second mode because the first mode in None

Replacing missing values in emp_title with mode: Teacher
Replacing missing values in emp_length with mode: 10+ years
Replacing missing values in description with mode: Debt consolidation


In [45]:
#Afterwards, check that there are no missing values
missing_info = detect_missing(df)
print(missing_info)

{'customer_id': 0.0, 'emp_title': 0.0, 'emp_length': 0.0, 'home_ownership': 0.0, 'annual_inc': 0.0, 'annual_inc_joint': 0.0, 'verification_status': 0.0, 'zip_code': 0.0, 'addr_state': 0.0, 'avg_cur_bal': 0.0, 'tot_cur_bal': 0.0, 'loan_id': 0.0, 'loan_status': 0.0, 'loan_amount': 0.0, 'state': 0.0, 'funded_amount': 0.0, 'term': 0.0, 'int_rate': 0.0, 'grade': 0.0, 'issue_date': 0.0, 'pymnt_plan': 0.0, 'type': 0.0, 'purpose': 0.0, 'description': 0.0}


In [46]:
df.show(20)

+--------------------+--------------------+----------+--------------+----------+----------------+-------------------+--------+----------+-----------+-----------+-------+-----------+-----------+-----+-------------+----------+--------+-----+-----------------+----------+----------+------------------+--------------------+
|         customer_id|           emp_title|emp_length|home_ownership|annual_inc|annual_inc_joint|verification_status|zip_code|addr_state|avg_cur_bal|tot_cur_bal|loan_id|loan_status|loan_amount|state|funded_amount|      term|int_rate|grade|       issue_date|pymnt_plan|      type|           purpose|         description|
+--------------------+--------------------+----------+--------------+----------+----------------+-------------------+--------+----------+-----------+-----------+-------+-----------+-----------+-----+-------------+----------+--------+-----+-----------------+----------+----------+------------------+--------------------+
|YidceGRiIFx4Zjh3X...|             Teach

In [47]:
#Encoding:
from pyspark.sql.functions import col, when, regexp_extract
from pyspark.ml.feature import StringIndexer, OneHotEncoder
from pyspark.sql.functions import lower


#Emp Length: Change to numerical
emp_length_temp = df.select("emp_length")
df = df.withColumn("emp_length_temp", emp_length_temp["emp_length"])
df = df.withColumn(
    "emp_length",
    when(col("emp_length").like("< 1 year"), 0.5)
    .when(col("emp_length").like("1 year"), 1)
    .when(col("emp_length").like("2 years"), 2)
    .when(col("emp_length").like("3 years"), 3)
    .when(col("emp_length").like("4 years"), 4)
    .when(col("emp_length").like("5 years"), 5)
    .when(col("emp_length").like("6 years"), 6)
    .when(col("emp_length").like("7 years"), 7)
    .when(col("emp_length").like("8 years"), 8)
    .when(col("emp_length").like("9 years"), 9)
    .when(col("emp_length").like("10+ years"), 10)
)
#State: Label Encoding
state_indexer = StringIndexer(inputCol="state", outputCol="state_encoded")
df = state_indexer.fit(df).transform(df)

# Convert values in the "home_ownership" column to lowercase because there are INDIVIDUAL and Individual categories, so this let them both individual
df = df.withColumn("type", lower(col("type")))

#Home Ownership, Verification Status, and Type: One Hot Encoding
columns_to_encode = ["home_ownership", "verification_status", "type"]

for column in columns_to_encode:
    unique_values = [row[0] for row in df.select(column).distinct().collect()]

    for value in unique_values:
        new_col_name = f"{column}_{value}"
        df = df.withColumn(
            new_col_name,
            when(col(column) == value, 1).otherwise(0)
        )

# Purpose: Label Encoding
purpose_indexer = StringIndexer(inputCol="purpose", outputCol="purpose_encoded")
df = purpose_indexer.fit(df).transform(df)

#Grade - Discretize to letter grade
df = df.withColumn(
    "grade_letter",
    when((col("grade") >= 1) & (col("grade") <= 5), "A")
    .when((col("grade") >= 6) & (col("grade") <= 10), "B")
    .when((col("grade") >= 11) & (col("grade") <= 15), "C")
    .when((col("grade") >= 16) & (col("grade") <= 20), "D")
    .when((col("grade") >= 21) & (col("grade") <= 25), "E")
    .when((col("grade") >= 26) & (col("grade") <= 30), "F")
    .when((col("grade") >= 31) & (col("grade") <= 35), "G")
)

# Standardize column names
df = df.selectExpr([f"`{col}` as {col.lower().replace(' ', '_')}" for col in df.columns])
df.show(20)


+--------------------+--------------------+----------+--------------+----------+----------------+-------------------+--------+----------+-----------+-----------+-------+-----------+-----------+-----+-------------+----------+--------+-----+-----------------+----------+----------+------------------+--------------------+---------------+-------------+------------------+-------------------+-----------------------+------------------+-------------------+----------------------------+-----------------------------------+--------------------------------+----------+---------------+--------------+---------------+---------------+------------+
|         customer_id|           emp_title|emp_length|home_ownership|annual_inc|annual_inc_joint|verification_status|zip_code|addr_state|avg_cur_bal|tot_cur_bal|loan_id|loan_status|loan_amount|state|funded_amount|      term|int_rate|grade|       issue_date|pymnt_plan|      type|           purpose|         description|emp_length_temp|state_encoded|home_ownership

In [48]:
from pyspark.sql.window import Window
from pyspark.sql.functions import lag, last

# Create window specifications
window_by_grade = Window.partitionBy("grade").orderBy("issue_date")
window_by_state_grade = Window.partitionBy("state", "grade").orderBy("issue_date")

# Add the required features
df = df.withColumn("prev_loan_issue_date_grade", lag("issue_date").over(window_by_grade)) \
       .withColumn("prev_loan_amount_grade", lag("loan_amount").over(window_by_grade)) \
       .withColumn("prev_loan_issue_date_state_grade", lag("issue_date").over(window_by_state_grade)) \
       .withColumn("prev_loan_amount_state_grade", lag("loan_amount").over(window_by_state_grade))


# Preview the updated DataFrame
df.select("state", "grade", "issue_date", "prev_loan_issue_date_grade", "prev_loan_amount_grade", "prev_loan_issue_date_state_grade", "prev_loan_amount_state_grade").show(20)

+-----+-----+-----------------+--------------------------+----------------------+--------------------------------+----------------------------+
|state|grade|       issue_date|prev_loan_issue_date_grade|prev_loan_amount_grade|prev_loan_issue_date_state_grade|prev_loan_amount_state_grade|
+-----+-----+-----------------+--------------------------+----------------------+--------------------------------+----------------------------+
|   AK|    1|  16 January 2016|           16 January 2016|                9600.0|                            NULL|                        NULL|
|   AK|    1| 18 November 2018|          18 November 2018|               24000.0|                 16 January 2016|                     20000.0|
|   AK|    2|     14 July 2014|              14 July 2014|               16000.0|                            NULL|                        NULL|
|   AK|    2|  18 October 2018|           18 October 2018|               13000.0|                    14 July 2014|                     1

In [49]:
df.show()

+--------------------+--------------------+----------+--------------+----------+----------------+-------------------+--------+----------+-----------+-----------+-------+-----------+-----------+-----+-------------+----------+--------+-----+-----------------+----------+----------+------------------+--------------------+---------------+-------------+------------------+-------------------+-----------------------+------------------+-------------------+----------------------------+-----------------------------------+--------------------------------+----------+---------------+--------------+---------------+---------------+------------+--------------------------+----------------------+--------------------------------+----------------------------+
|         customer_id|           emp_title|emp_length|home_ownership|annual_inc|annual_inc_joint|verification_status|zip_code|addr_state|avg_cur_bal|tot_cur_bal|loan_id|loan_status|loan_amount|state|funded_amount|      term|int_rate|grade|       issue

In [50]:
count_value = df.filter(df['loan_status'] == 'Default').count()
print(f"Count of 'Default': {count_value}")

Count of 'Default': 1


In [51]:
min = df.agg({"annual_inc": "min"}).collect()[0][0]
max = df.agg({"annual_inc": "max"}).collect()[0][0]
print(f"Min: {min}, Max: {max}")

Min: 34.0, Max: 6200000.0


In [52]:
#Identify the average loan amount and interest rate for loans marked as ”Default” in the Loan Status,
#grouped by Emp Length and annual income ranges

#SQL:
df.createOrReplaceGlobalTempView("loans")
average = spark.sql("""
SELECT
    emp_length,
    CASE
        WHEN annual_inc < 10000 THEN 'Low'
        WHEN annual_inc BETWEEN 10000 AND 50000 THEN 'Mid-low'
        WHEN annual_inc BETWEEN 50000 AND 500000 THEN 'Medium'
        WHEN annual_inc BETWEEN 500000 AND 1000000 THEN 'Mid-High'
        WHEN annual_inc BETWEEN 1000000 AND 5000000 THEN 'High'
        ELSE 'Very High'

    END AS income_range,
    AVG(loan_amount) AS avg_loan_amount,
    AVG(int_rate) AS avg_interest_rate
FROM global_temp.loans
WHERE loan_status = 'Default'
GROUP BY emp_length, income_range;""")
average.show()


+----------+------------+---------------+-----------------+
|emp_length|income_range|avg_loan_amount|avg_interest_rate|
+----------+------------+---------------+-----------------+
|       1.0|    Mid-High|        35000.0|           0.2589|
+----------+------------+---------------+-----------------+



In [53]:
#Spark:
from pyspark.sql.functions import when, avg, col

# Add an 'income_range' column using a CASE-like logic
loans_with_income_range = df.withColumn(
    "income_range",
    when(col("annual_inc") < 10000, "Low")
    .when((col("annual_inc") >= 10000) & (col("annual_inc") <= 50000), "Mid-low")
    .when((col("annual_inc") > 50000) & (col("annual_inc") <= 500000), "Midium")
    .when((col("annual_inc") > 500000) & (col("annual_inc") <= 1000000), "Mid-High")
    .when((col("annual_inc") > 1000000) & (col("annual_inc") <= 5000000), "High")
    .otherwise("Very High")
)

# Filter rows where loan_status = 'Default'
filtered_loans = loans_with_income_range.filter(col("loan_status") == "Default")

# Group by 'emp_length' and 'income_range', then calculate averages
average = filtered_loans.groupBy("emp_length", "income_range").agg(
    avg("loan_amount").alias("avg_loan_amount"),
    avg("int_rate").alias("avg_interest_rate")
)

# Show the results
average.show()


+----------+------------+---------------+-----------------+
|emp_length|income_range|avg_loan_amount|avg_interest_rate|
+----------+------------+---------------+-----------------+
|       1.0|    Mid-High|        35000.0|           0.2589|
+----------+------------+---------------+-----------------+



In [54]:
#Calculate the average difference between Loan Amount and Funded Amount for each loan Grade and
#sort by the grades with the largest differences
#SQL:
average_difference = spark.sql("""
SELECT
    grade,
    AVG(loan_amount - funded_amount) AS avg_difference
FROM global_temp.loans
GROUP BY grade
ORDER BY avg_difference DESC;
""")
average_difference.show()
#The difference is zero because the loan_amount feature is equal to the funced_amount feature.

+-----+--------------+
|grade|avg_difference|
+-----+--------------+
|   29|           0.0|
|   26|           0.0|
|   19|           0.0|
|   22|           0.0|
|    7|           0.0|
|   34|           0.0|
|   32|           0.0|
|   31|           0.0|
|   25|           0.0|
|    6|           0.0|
|    9|           0.0|
|   27|           0.0|
|   17|           0.0|
|   33|           0.0|
|   28|           0.0|
|    5|           0.0|
|    1|           0.0|
|   10|           0.0|
|    3|           0.0|
|   12|           0.0|
+-----+--------------+
only showing top 20 rows



In [55]:
#Spark:
from pyspark.sql.functions import col, avg

average_difference = df.withColumn("difference", col("loan_amount") - col("funded_amount")) \
    .groupBy("grade") \
    .agg(avg("difference").alias("avg_difference")) \
    .orderBy(col("avg_difference").desc())
average_difference.show()

+-----+--------------+
|grade|avg_difference|
+-----+--------------+
|   29|           0.0|
|   26|           0.0|
|   19|           0.0|
|   22|           0.0|
|    7|           0.0|
|   34|           0.0|
|   32|           0.0|
|   31|           0.0|
|   25|           0.0|
|    6|           0.0|
|    9|           0.0|
|   27|           0.0|
|   17|           0.0|
|   33|           0.0|
|   28|           0.0|
|    5|           0.0|
|    1|           0.0|
|   10|           0.0|
|    3|           0.0|
|   12|           0.0|
+-----+--------------+
only showing top 20 rows



In [56]:
#Compare the total Loan Amount for loans with ”Verified” and ”Not Verified” Verification Status across
#each state (Addr State).
#SQL:
total_loan = spark.sql("""
SELECT
    addr_state,
    verification_status,
    SUM(loan_amount) AS total_loan_amount
FROM global_temp.loans
WHERE verification_status IN ('Verified', 'Not Verified')
GROUP BY addr_state, verification_status
ORDER BY addr_state, verification_status;
""")
total_loan.show()

+----------+-------------------+-----------------+
|addr_state|verification_status|total_loan_amount|
+----------+-------------------+-----------------+
|        AK|       Not Verified|         330150.0|
|        AK|           Verified|         249400.0|
|        AL|       Not Verified|        1628575.0|
|        AL|           Verified|        1394150.0|
|        AR|       Not Verified|         903150.0|
|        AR|           Verified|         834775.0|
|        AZ|       Not Verified|        3774750.0|
|        AZ|           Verified|        2545150.0|
|        CA|       Not Verified|      1.8343025E7|
|        CA|           Verified|      1.5368725E7|
|        CO|       Not Verified|        2672250.0|
|        CO|           Verified|        2266475.0|
|        CT|       Not Verified|        2195050.0|
|        CT|           Verified|        1759150.0|
|        DC|       Not Verified|         217500.0|
|        DC|           Verified|         402875.0|
|        DE|       Not Verified

In [57]:
#Spark:
from pyspark.sql.functions import col, sum

total_loan = df.filter(col("verification_status").isin("Verified", "Not Verified")) \
    .groupBy("addr_state", "verification_status") \
    .agg(sum("loan_amount").alias("total_loan_amount")) \
    .orderBy("addr_state", "verification_status")

total_loan.show()

+----------+-------------------+-----------------+
|addr_state|verification_status|total_loan_amount|
+----------+-------------------+-----------------+
|        AK|       Not Verified|         330150.0|
|        AK|           Verified|         249400.0|
|        AL|       Not Verified|        1628575.0|
|        AL|           Verified|        1394150.0|
|        AR|       Not Verified|         903150.0|
|        AR|           Verified|         834775.0|
|        AZ|       Not Verified|        3774750.0|
|        AZ|           Verified|        2545150.0|
|        CA|       Not Verified|      1.8343025E7|
|        CA|           Verified|      1.5368725E7|
|        CO|       Not Verified|        2672250.0|
|        CO|           Verified|        2266475.0|
|        CT|       Not Verified|        2195050.0|
|        CT|           Verified|        1759150.0|
|        DC|       Not Verified|         217500.0|
|        DC|           Verified|         402875.0|
|        DE|       Not Verified

In [58]:
#. Calculate the average time gap (in days) between consecutive loans for each grade using the new
#features you added in the feature engineering phase
#SQL:
avg_difference = spark.sql("""
SELECT
    grade,

    AVG(DATEDIFF(TO_DATE(issue_date, 'dd MMMM yyyy'), TO_DATE(prev_loan_issue_date_grade, 'dd MMMM yyyy'))) AS avg_time_gap
FROM global_temp.loans
GROUP BY grade;
""")
avg_difference.show(35)


+-----+------------------+
|grade|      avg_time_gap|
+-----+------------------+
|    1| 2.109612141652614|
|    2|2.2952646239554317|
|    3| 2.097233864207879|
|    4| 2.149484536082474|
|    5|2.2091152815013406|
|    6|1.5598316295850871|
|    7| 1.618860510805501|
|    8|1.6059050064184852|
|    9|               1.6|
|   10|1.5979314802844216|
|   11|1.6263157894736842|
|   12|1.6114732724902217|
|   13|1.6976439790575917|
|   14|1.5876685934489403|
|   15|1.7020997375328084|
|   16|3.0977443609022557|
|   17|3.1291139240506327|
|   18|3.2474358974358974|
|   19|3.3816689466484267|
|   20|3.1480582524271843|
|   21| 8.928853754940711|
|   22| 9.188755020080322|
|   23|   8.8212927756654|
|   24| 7.752508361204013|
|   25| 8.460674157303371|
|   26|24.256410256410255|
|   27|              32.4|
|   28| 27.78205128205128|
|   29|20.666666666666668|
|   30|              24.5|
|   31| 95.95238095238095|
|   32| 80.45454545454545|
|   33|           110.625|
|   34|  79.6086956521739|
|

In [59]:
#Spark:
from pyspark.sql.functions import to_date, datediff, avg
from pyspark.sql.window import Window

# Convert the date columns to proper date format
df = df.withColumn("issue_date", to_date(col("issue_date"), "dd MMMM yyyy")) \
       .withColumn("prev_loan_issue_date_grade", to_date(col("prev_loan_issue_date_grade"), "dd MMMM yyyy"))

# Calculate the time difference between dates
df = df.withColumn("time_diff", datediff(col("issue_date"), col("prev_loan_issue_date_grade")))

# Calculate the average time gap grouped by grade
avg_difference = df.groupBy("grade") \
                   .agg(avg("time_diff").alias("avg_time_gap"))

# Show the results
avg_difference.show(35)


+-----+------------------+
|grade|      avg_time_gap|
+-----+------------------+
|    1| 2.109612141652614|
|    2|2.2952646239554317|
|    3| 2.097233864207879|
|    4| 2.149484536082474|
|    5|2.2091152815013406|
|    6|1.5598316295850871|
|    7| 1.618860510805501|
|    8|1.6059050064184852|
|    9|               1.6|
|   10|1.5979314802844216|
|   11|1.6263157894736842|
|   12|1.6114732724902217|
|   13|1.6976439790575917|
|   14|1.5876685934489403|
|   15|1.7020997375328084|
|   16|3.0977443609022557|
|   17|3.1291139240506327|
|   18|3.2474358974358974|
|   19|3.3816689466484267|
|   20|3.1480582524271843|
|   21| 8.928853754940711|
|   22| 9.188755020080322|
|   23|   8.8212927756654|
|   24| 7.752508361204013|
|   25| 8.460674157303371|
|   26|24.256410256410255|
|   27|              32.4|
|   28| 27.78205128205128|
|   29|20.666666666666668|
|   30|              24.5|
|   31| 95.95238095238095|
|   32| 80.45454545454545|
|   33|           110.625|
|   34|  79.6086956521739|
|

In [60]:
#. Identify the average difference in loan amounts between consecutive loans within the same state and
#grade combination.
#SQL:
state_grade_diff = spark.sql("""
WITH lagged_loans AS (
    SELECT
        addr_state,
        grade,
        issue_date,
        loan_amount,
        LAG(loan_amount, 1) OVER (PARTITION BY addr_state, grade ORDER BY issue_date) AS prev_loan_amount
    FROM global_temp.loans
)
SELECT
    addr_state,
    grade,
    AVG(loan_amount - prev_loan_amount) AS avg_difference
FROM lagged_loans
WHERE prev_loan_amount IS NOT NULL
GROUP BY addr_state, grade;
""")
state_grade_diff.show()


+----------+-----+-------------------+
|addr_state|grade|     avg_difference|
+----------+-----+-------------------+
|        AK|    1|                0.0|
|        AK|    2|            19450.0|
|        AK|    3|                0.0|
|        AK|    4|            26400.0|
|        AK|    5|            27000.0|
|        AK|    6|             -955.0|
|        AK|    9|            15600.0|
|        AK|   10| 141.66666666666666|
|        AK|   11| -7333.333333333333|
|        AK|   14|            4056.25|
|        AK|   15|             1225.0|
|        AK|   16|            16250.0|
|        AK|   19|            21000.0|
|        AK|   20|             5500.0|
|        AK|   22|            29250.0|
|        AL|    1| 1555.5555555555557|
|        AL|    2|             1375.0|
|        AL|    3| 2533.3333333333335|
|        AL|    4|-472.72727272727275|
|        AL|    5| 135.71428571428572|
+----------+-----+-------------------+
only showing top 20 rows



In [61]:
#Spark:
from pyspark.sql.window import Window
from pyspark.sql.functions import lag, col, avg

# Define the window specification
window_spec = Window.partitionBy("addr_state", "grade").orderBy("issue_date")

# Add the previous loan amount using the LAG function
df = df.withColumn("prev_loan_amount", lag("loan_amount", 1).over(window_spec))

# Calculate the average difference for each addr_state and grade
state_grade_diff = df.filter(col("prev_loan_amount").isNotNull()) \
    .withColumn("difference", col("loan_amount") - col("prev_loan_amount")) \
    .groupBy("addr_state", "grade") \
    .agg(avg("difference").alias("avg_difference"))

# Display the results
state_grade_diff.show()


+----------+-----+-------------------+
|addr_state|grade|     avg_difference|
+----------+-----+-------------------+
|        AK|    1|                0.0|
|        AK|    2|            19450.0|
|        AK|    3|                0.0|
|        AK|    4|            26400.0|
|        AK|    5|            27000.0|
|        AK|    6|             -955.0|
|        AK|    9|            15600.0|
|        AK|   10| 3058.3333333333335|
|        AK|   11| -7333.333333333333|
|        AK|   14|            4056.25|
|        AK|   15|             1225.0|
|        AK|   16|            16250.0|
|        AK|   19|            21000.0|
|        AK|   20|             3537.5|
|        AK|   22|           -29250.0|
|        AL|    1| 222.22222222222223|
|        AL|    2|                0.0|
|        AL|    3|  583.3333333333334|
|        AL|    4|-472.72727272727275|
|        AL|    5| 1821.4285714285713|
+----------+-----+-------------------+
only showing top 20 rows



In [62]:
#Lookup Table:
from pyspark.sql.functions import col
from pyspark.sql import DataFrame
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, StringType

# Initialize an empty DataFrame with the same schema as the lookup table
empty_schema = StructType([StructField("original_col", StringType(), True), StructField("encoded_col", StringType(), True)])
df_lookup = spark.createDataFrame(spark.sparkContext.emptyRDD(), empty_schema)

# Function to create a lookup table for a given column
def create_lookup_table(df: DataFrame, original_col: str, encoded_col: str) -> DataFrame:
    if encoded_col == "Encoded with One_hot":
        lookup_df = df.select(original_col).distinct()
        lookup_df = lookup_df.withColumn("encoded_col", F.lit("one-hot encoded"))
    else:
        # Select distinct values from the original and encoded columns
        lookup_df = df.select(original_col, encoded_col).distinct()
        lookup_df = lookup_df.select(
            col(original_col).cast("string").alias("original_col"),
            col(encoded_col).cast("string").alias("encoded_col")
        )

    return lookup_df


df_lookup = df_lookup.union(create_lookup_table(df, "emp_length_temp", "emp_length"))
df_lookup = df_lookup.union(create_lookup_table(df, "home_ownership", "Encoded with One_hot"))
df_lookup = df_lookup.union(create_lookup_table(df, "verification_status", "Encoded with One_hot"))
df_lookup = df_lookup.union(create_lookup_table(df, "state", "state_encoded"))
df_lookup = df_lookup.union(create_lookup_table(df, "type", "Encoded with One_hot"))
df_lookup = df_lookup.union(create_lookup_table(df, "purpose", "purpose_encoded"))
df_lookup = df_lookup.union(create_lookup_table(df, "grade", "grade_letter"))
df = df.drop("emp_length_temp")

#lookup table
df_lookup.show(n=122)


+------------------+---------------+
|      original_col|    encoded_col|
+------------------+---------------+
|           5 years|            5.0|
|         10+ years|           10.0|
|           4 years|            4.0|
|           9 years|            9.0|
|           7 years|            7.0|
|          < 1 year|            0.5|
|           2 years|            2.0|
|           3 years|            3.0|
|            1 year|            1.0|
|           6 years|            6.0|
|           8 years|            8.0|
|               OWN|one-hot encoded|
|              RENT|one-hot encoded|
|          MORTGAGE|one-hot encoded|
|               ANY|one-hot encoded|
|              NONE|one-hot encoded|
|          Verified|one-hot encoded|
|   Source Verified|one-hot encoded|
|      Not Verified|one-hot encoded|
|                NV|           22.0|
|                WI|           23.0|
|                NJ|            5.0|
|                AR|           31.0|
|                AL|           25.0|
|

In [63]:
#Saving the dataset:
df.write.parquet("fintech_spark_52_3224_clean.parquet", mode="overwrite")
df_lookup.write.parquet("lookup_spark_52_3224_clean.parquet", mode="overwrite")