In [56]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("milestone3").getOrCreate()
sc = spark.sparkContext

In [57]:
spark = SparkSession.builder \
    .config("spark.hadoop.fs.local.impl", "org.apache.hadoop.fs.LocalFileSystem") \
    .getOrCreate()


# `Part 1: Loading the dataset`

In [58]:
df= spark.read.parquet('fintech_data_21_52_23665.parquet')

`initializing the lookup table  and a track changes functions that adds the new encodings to it`

In [59]:
from pyspark.sql.types import StructType, StructField, StringType

# Create an empty tracking DataFrame
schema = StructType([
    StructField("Feature", StringType(), True),
    StructField("Original Value", StringType(), True),
    StructField("Imputed/Encoded Value", StringType(), True)
])

lookup_table = spark.createDataFrame([], schema)

# Function to track changes
def track_changes(feature, original_value, imputed_value):
    global lookup_table
    # Create a new row as a DataFrame
    new_row = spark.createDataFrame(
        [(feature, original_value, imputed_value)],
        schema
    )
    # Union the new row with the existing lookup_table
    lookup_table = lookup_table.union(new_row)

In [60]:
df.show(20)

+--------------------+--------------------+----------+--------------+----------+----------------+-------------------+--------+----------+-----------+-----------+-------+---------------+-----------+-----+-------------+----------+--------+-----+-----------------+----------+----------+------------------+--------------------+
|         Customer Id|           Emp Title|Emp Length|Home Ownership|Annual Inc|Annual Inc Joint|Verification Status|Zip Code|Addr State|Avg Cur Bal|Tot Cur Bal|Loan Id|    Loan Status|Loan Amount|State|Funded Amount|      Term|Int Rate|Grade|       Issue Date|Pymnt Plan|      Type|           Purpose|         Description|
+--------------------+--------------------+----------+--------------+----------+----------------+-------------------+--------+----------+-----------+-----------+-------+---------------+-----------+-----+-------------+----------+--------+-----+-----------------+----------+----------+------------------+--------------------+
|Yid9bVx4OGNceGQxX...|     T

## Partitioning

In [61]:
import os

num_cores = os.cpu_count()
print(f"Number of CPU cores: {num_cores}")

Number of CPU cores: 4


In [62]:
df.rdd.getNumPartitions()

1

In [63]:
import psutil

logical_cores = psutil.cpu_count(logical=True)  # Logical cores
physical_cores = psutil.cpu_count(logical=False)  # Physical cores

print(f"Logical cores: {logical_cores}")
print(f"Physical cores: {physical_cores}")


Logical cores: 4
Physical cores: 2


In [64]:
df = df.repartition(logical_cores)
df.rdd.getNumPartitions()

4

In [65]:
df.show(20)

+--------------------+--------------------+----------+--------------+----------+----------------+-------------------+--------+----------+-----------+-----------+-------+-----------+-----------+-----+-------------+----------+--------+-----+-----------------+----------+----------+------------------+--------------------+
|         Customer Id|           Emp Title|Emp Length|Home Ownership|Annual Inc|Annual Inc Joint|Verification Status|Zip Code|Addr State|Avg Cur Bal|Tot Cur Bal|Loan Id|Loan Status|Loan Amount|State|Funded Amount|      Term|Int Rate|Grade|       Issue Date|Pymnt Plan|      Type|           Purpose|         Description|
+--------------------+--------------------+----------+--------------+----------+----------------+-------------------+--------+----------+-----------+-----------+-------+-----------+-----------+-----+-------------+----------+--------+-----+-----------------+----------+----------+------------------+--------------------+
|YidceGE2XHhmYlx4O...|                NU

# `Part 2: Cleaning`

In [66]:
df_new = df.toDF(*[col.replace(" ", "_").lower() for col in df.columns])


In [67]:
df_new.summary().show()

+-------+--------------------+------------+----------+--------------+-----------------+------------------+-------------------+--------+----------+------------------+------------------+----------------+------------------+------------------+-----+------------------+----------+-------------------+-----------------+-----------------+----------+-------+--------------------+
|summary|         customer_id|   emp_title|emp_length|home_ownership|       annual_inc|  annual_inc_joint|verification_status|zip_code|addr_state|       avg_cur_bal|       tot_cur_bal|         loan_id|       loan_status|       loan_amount|state|     funded_amount|      term|           int_rate|            grade|       issue_date|      type|purpose|         description|
+-------+--------------------+------------+----------+--------------+-----------------+------------------+-------------------+--------+----------+------------------+------------------+----------------+------------------+------------------+-----+-----------

## columns renaming

## Detect missing

In [68]:
def missing_percentage(df):
    total_rows = df.count()
    missing = {col: df.filter(df[col].isNull()).count() / total_rows * 100 for col in df.columns}
    return missing

missing_info = missing_percentage(df_new)
missing_info

{'customer_id': 0.0,
 'emp_title': 8.96411394746578,
 'emp_length': 7.1809100998890125,
 'home_ownership': 0.0,
 'annual_inc': 0.0,
 'annual_inc_joint': 93.21494635590085,
 'verification_status': 0.0,
 'zip_code': 0.0,
 'addr_state': 0.0,
 'avg_cur_bal': 0.0,
 'tot_cur_bal': 0.0,
 'loan_id': 0.0,
 'loan_status': 0.0,
 'loan_amount': 0.0,
 'state': 0.0,
 'funded_amount': 0.0,
 'term': 0.0,
 'int_rate': 4.661487236403995,
 'grade': 0.0,
 'issue_date': 0.0,
 'pymnt_plan': 0.0,
 'type': 0.0,
 'purpose': 0.0,
 'description': 0.9470958194598594}

In [69]:
df_new = df_new.fillna(value = 0,subset=['annual_inc_joint','int_rate'])

In [70]:
missing_info = missing_percentage(df_new)
missing_info

{'customer_id': 0.0,
 'emp_title': 8.96411394746578,
 'emp_length': 7.1809100998890125,
 'home_ownership': 0.0,
 'annual_inc': 0.0,
 'annual_inc_joint': 0.0,
 'verification_status': 0.0,
 'zip_code': 0.0,
 'addr_state': 0.0,
 'avg_cur_bal': 0.0,
 'tot_cur_bal': 0.0,
 'loan_id': 0.0,
 'loan_status': 0.0,
 'loan_amount': 0.0,
 'state': 0.0,
 'funded_amount': 0.0,
 'term': 0.0,
 'int_rate': 0.0,
 'grade': 0.0,
 'issue_date': 0.0,
 'pymnt_plan': 0.0,
 'type': 0.0,
 'purpose': 0.0,
 'description': 0.9470958194598594}

## handling emp length missing


In [71]:
from pyspark.sql.functions import col
mode_emp_length = df_new.groupBy('emp_length').count()\
                      .orderBy(col('count').desc())\
                      .limit(1)
mode_emp_length.show()

+----------+-----+
|emp_length|count|
+----------+-----+
| 10+ years| 8970|
+----------+-----+



In [72]:
mode_emp_length = mode_emp_length.select('emp_length').collect()[0][0]
mode_emp_length

'10+ years'

In [73]:
df_new = df_new.fillna(value = mode_emp_length,subset=['emp_length'])
df_new.filter(df_new.emp_length.isNull()).count()

0

## handling emp title missing

In [74]:
mode_emp_title = df_new.groupBy('emp_title').count().orderBy(col('count').desc()).limit(2)
mode_emp_title.show()

+---------+-----+
|emp_title|count|
+---------+-----+
|     NULL| 2423|
|  Teacher|  479|
+---------+-----+



In [75]:
mode_emp_title = mode_emp_title.select('emp_title').collect()[1][0]
mode_emp_title

'Teacher'

In [76]:
df_new= df_new.fillna(value = mode_emp_title,subset=['emp_title'])
df_new.filter(df_new.emp_title.isNull()).count()

0

## handling description missing

In [77]:
mode_desc= df_new.groupBy('description').count().orderBy(col('count').desc()).limit(2)
mode_desc.show()

+--------------------+-----+
|         description|count|
+--------------------+-----+
|  Debt consolidation|14316|
|Credit card refin...| 6013|
+--------------------+-----+



In [78]:
mode_desc = mode_desc.select('description').collect()[1][0]


In [79]:
df_new= df_new.fillna(value = mode_desc,subset=['description'])


## checking that there are'nt any missing values any more

In [80]:
missing_info = missing_percentage(df_new)
missing_info

{'customer_id': 0.0,
 'emp_title': 0.0,
 'emp_length': 0.0,
 'home_ownership': 0.0,
 'annual_inc': 0.0,
 'annual_inc_joint': 0.0,
 'verification_status': 0.0,
 'zip_code': 0.0,
 'addr_state': 0.0,
 'avg_cur_bal': 0.0,
 'tot_cur_bal': 0.0,
 'loan_id': 0.0,
 'loan_status': 0.0,
 'loan_amount': 0.0,
 'state': 0.0,
 'funded_amount': 0.0,
 'term': 0.0,
 'int_rate': 0.0,
 'grade': 0.0,
 'issue_date': 0.0,
 'pymnt_plan': 0.0,
 'type': 0.0,
 'purpose': 0.0,
 'description': 0.0}

# `Part 3: Encoding`

## converting emp length to int

In [81]:
from pyspark.sql.types import IntegerType
from pyspark.sql.functions import udf

# Define the convert_emp_length function
def convert_emp_length(emp_length):
    if emp_length is None:
        return None  # Handle null values
    emp_length = emp_length.strip().lower()  # Normalize the input
    if emp_length == '10+ years':
        return 11
    elif emp_length == '< 1 year':
        return 0
    elif emp_length == '1 year':
        return 1
    elif emp_length == '2 years':
        return 2
    elif emp_length == '3 years':
        return 3
    elif emp_length == '4 years':
        return 4
    elif emp_length == '5 years':
        return 5
    elif emp_length == '6 years':
        return 6
    elif emp_length == '7 years':
        return 7
    elif emp_length == '8 years':
        return 8
    elif emp_length == '9 years':
        return 9
    else:
        return None  # Return None for unexpected values

# Register the UDF
convert_emp_length_udf = udf(convert_emp_length, IntegerType())

# Apply the UDF to overwrite the column
df_new = df_new.withColumn("emp_length_imputed", convert_emp_length_udf(df_new["emp_length"]))
df_new_distinct = df_new.select("emp_length", "emp_length_imputed").distinct()
for row in df_new_distinct.collect():
    emp_length = row['emp_length']
    emp_length_imputed = row['emp_length_imputed']
    track_changes("emp_length", str(emp_length), emp_length_imputed)

df_new = df_new.withColumn("emp_length", convert_emp_length_udf(df_new["emp_length"]))
df_new=df_new.drop("emp_length_imputed")

df_new.show()


+--------------------+--------------------+----------+--------------+----------+----------------+-------------------+--------+----------+-----------+-----------+-------+-----------+-----------+-----+-------------+----------+--------+-----+-----------------+----------+----------+------------------+--------------------+
|         customer_id|           emp_title|emp_length|home_ownership|annual_inc|annual_inc_joint|verification_status|zip_code|addr_state|avg_cur_bal|tot_cur_bal|loan_id|loan_status|loan_amount|state|funded_amount|      term|int_rate|grade|       issue_date|pymnt_plan|      type|           purpose|         description|
+--------------------+--------------------+----------+--------------+----------+----------------+-------------------+--------+----------+-----------+-----------+-------+-----------+-----------+-----+-------------+----------+--------+-----+-----------------+----------+----------+------------------+--------------------+
|YidceGE2XHhmYlx4O...|             Teach

## encoding verification status


In [82]:
from pyspark.sql.functions import when

# Dynamically fetch unique values
unique_values = df_new.select("verification_status").distinct().rdd.flatMap(lambda x: x).collect()

# Add one-hot encoded columns to the original dataset
for value in unique_values:
    df_new = df_new.withColumn(f"verification_status_{value}", when(df_new["verification_status"] == value, 1).otherwise(0))
    track_changes("verification_status_",value,f"verification_status_{value}")


# Show the result
df_new.show()


+--------------------+--------------------+----------+--------------+----------+----------------+-------------------+--------+----------+-----------+-----------+-------+-----------+-----------+-----+-------------+----------+--------+-----+-----------------+----------+----------+------------------+--------------------+----------------------------+-----------------------------------+--------------------------------+
|         customer_id|           emp_title|emp_length|home_ownership|annual_inc|annual_inc_joint|verification_status|zip_code|addr_state|avg_cur_bal|tot_cur_bal|loan_id|loan_status|loan_amount|state|funded_amount|      term|int_rate|grade|       issue_date|pymnt_plan|      type|           purpose|         description|verification_status_Verified|verification_status_Source Verified|verification_status_Not Verified|
+--------------------+--------------------+----------+--------------+----------+----------------+-------------------+--------+----------+-----------+-----------+---

## encoding state

In [83]:
from pyspark.ml.feature import StringIndexer

state_indexer = StringIndexer(inputCol="state", outputCol="state_encoded")

indexer_model = state_indexer.fit(df_new)

labels = indexer_model.labels  # List of original values

[track_changes(state_indexer.getInputCol(), label, float(encoded)) for label, encoded in zip(labels, range(len(labels)))]

df_new = indexer_model.transform(df_new)

df_new.show()


+--------------------+--------------------+----------+--------------+----------+----------------+-------------------+--------+----------+-----------+-----------+-------+-----------+-----------+-----+-------------+----------+--------+-----+-----------------+----------+----------+------------------+--------------------+----------------------------+-----------------------------------+--------------------------------+-------------+
|         customer_id|           emp_title|emp_length|home_ownership|annual_inc|annual_inc_joint|verification_status|zip_code|addr_state|avg_cur_bal|tot_cur_bal|loan_id|loan_status|loan_amount|state|funded_amount|      term|int_rate|grade|       issue_date|pymnt_plan|      type|           purpose|         description|verification_status_Verified|verification_status_Source Verified|verification_status_Not Verified|state_encoded|
+--------------------+--------------------+----------+--------------+----------+----------------+-------------------+--------+----------

## encoding home ownership

In [84]:
from pyspark.sql.functions import when

unique_values = df_new.select("home_ownership").distinct().rdd.flatMap(lambda x: x).collect()

# Add one-hot encoded columns to the original dataset
for value in unique_values:
    df_new = df_new.withColumn(f"home_ownership_{value}", when(df_new["home_ownership"] == value, 1).otherwise(0))
    track_changes("home_ownership",value,f"home_ownership_{value}")


# Show the result
df_new.show()


+--------------------+--------------------+----------+--------------+----------+----------------+-------------------+--------+----------+-----------+-----------+-------+-----------+-----------+-----+-------------+----------+--------+-----+-----------------+----------+----------+------------------+--------------------+----------------------------+-----------------------------------+--------------------------------+-------------+------------------+-------------------+-----------------------+------------------+--------------------+-------------------+
|         customer_id|           emp_title|emp_length|home_ownership|annual_inc|annual_inc_joint|verification_status|zip_code|addr_state|avg_cur_bal|tot_cur_bal|loan_id|loan_status|loan_amount|state|funded_amount|      term|int_rate|grade|       issue_date|pymnt_plan|      type|           purpose|         description|verification_status_Verified|verification_status_Source Verified|verification_status_Not Verified|state_encoded|home_ownership

## encoding type

In [85]:
from pyspark.sql.functions import when

unique_values = df_new.select("type").distinct().rdd.flatMap(lambda x: x).collect()

# Add one-hot encoded columns to the original dataset
for value in unique_values:
    df_new = df_new.withColumn(f"type_{value}", when(df_new["type"] == value, 1).otherwise(0))
    track_changes("type",value,f"type_{value}")

# Show the result
df_new.show()


+--------------------+--------------------+----------+--------------+----------+----------------+-------------------+--------+----------+-----------+-----------+-------+-----------+-----------+-----+-------------+----------+--------+-----+-----------------+----------+----------+------------------+--------------------+----------------------------+-----------------------------------+--------------------------------+-------------+------------------+-------------------+-----------------------+------------------+--------------------+-------------------+--------------+---------------+---------------+----------+
|         customer_id|           emp_title|emp_length|home_ownership|annual_inc|annual_inc_joint|verification_status|zip_code|addr_state|avg_cur_bal|tot_cur_bal|loan_id|loan_status|loan_amount|state|funded_amount|      term|int_rate|grade|       issue_date|pymnt_plan|      type|           purpose|         description|verification_status_Verified|verification_status_Source Verified|ver

## encoding purpose

In [86]:
from pyspark.ml.feature import StringIndexer

purpose_indexer = StringIndexer(inputCol="purpose", outputCol="purpose_encoded")

indexer_model = purpose_indexer.fit(df_new)

labels = indexer_model.labels  # List of original values

# Track changes for each state label and its encoded value
for label, encoded in zip(labels, range(len(labels))):
    track_changes(purpose_indexer.getInputCol(), label, float(encoded))

df_new = indexer_model.transform(df_new)

df_new.show()



+--------------------+--------------------+----------+--------------+----------+----------------+-------------------+--------+----------+-----------+-----------+-------+-----------+-----------+-----+-------------+----------+--------+-----+-----------------+----------+----------+------------------+--------------------+----------------------------+-----------------------------------+--------------------------------+-------------+------------------+-------------------+-----------------------+------------------+--------------------+-------------------+--------------+---------------+---------------+----------+---------------+
|         customer_id|           emp_title|emp_length|home_ownership|annual_inc|annual_inc_joint|verification_status|zip_code|addr_state|avg_cur_bal|tot_cur_bal|loan_id|loan_status|loan_amount|state|funded_amount|      term|int_rate|grade|       issue_date|pymnt_plan|      type|           purpose|         description|verification_status_Verified|verification_status_Sou

In [87]:
df_new.select("purpose", "purpose_encoded").distinct().show()


+------------------+---------------+
|           purpose|purpose_encoded|
+------------------+---------------+
|       credit_card|            1.0|
|           wedding|           12.0|
|             other|            3.0|
|    small_business|            6.0|
|               car|            7.0|
|            moving|           10.0|
|             house|            8.0|
|          vacation|            9.0|
|           medical|            5.0|
|    major_purchase|            4.0|
|  home_improvement|            2.0|
|debt_consolidation|            0.0|
|  renewable_energy|           11.0|
+------------------+---------------+



## encoding  grade to letter grade

In [88]:
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

# Define the function
def categorize_grade(numeric_value):
    if 1 <= numeric_value <= 5:
        return 'A'
    elif 6 <= numeric_value <= 10:
        return 'B'
    elif 11 <= numeric_value <= 15:
        return 'C'
    elif 16 <= numeric_value <= 20:
        return 'D'
    elif 21 <= numeric_value <= 25:
        return 'E'
    elif 26 <= numeric_value <= 30:
        return 'F'
    elif 31 <= numeric_value <= 35:
        return 'G'
    else:
        return 'Unknown'

categorize_grade_udf = udf(categorize_grade, StringType())

df_new = df_new.withColumn("grade_encoded", categorize_grade_udf(df_new["grade"]))
df_new_distinct = df_new.select("grade", "grade_encoded").distinct()

for row in df_new_distinct.collect():
    grade = row['grade']
    grade_encoded = row['grade_encoded']
    track_changes("grade", str(grade), grade_encoded)

df_new.select("grade","grade_encoded").distinct().show()


+-----+-------------+
|grade|grade_encoded|
+-----+-------------+
|   21|            E|
|   29|            F|
|    9|            B|
|   35|            G|
|    8|            B|
|   33|            G|
|   17|            D|
|   10|            B|
|   25|            E|
|   16|            D|
|   19|            D|
|   20|            D|
|   22|            E|
|   24|            E|
|   23|            E|
|   12|            C|
|   30|            F|
|    4|            A|
|   13|            C|
|   11|            C|
+-----+-------------+
only showing top 20 rows



# `Part 4: Feature Engineering`

In [89]:
from pyspark.sql import functions as fn
from pyspark.sql import Window
from pyspark.sql.functions import to_date
df_new = df_new.withColumn("issue_date1", fn.to_date(df_new["issue_date"], "dd MMMM yyyy"))

window1=Window.partitionBy("grade").orderBy("issue_date1")
prev_date=fn.lag("issue_date1").over(window1)
prev_loan=fn.lag("loan_amount").over(window1)
df_new = df_new.withColumn(
    "prev_loan_date", prev_date,
).withColumn(    "prev_loan_amount", prev_loan
)

window2=Window.partitionBy("state", "grade").orderBy("issue_date1")
prev_date_stateDate=fn.lag("issue_date1").over(window2)
prev_amount_stateDate=fn.lag("loan_amount").over(window2)

df_new=df_new.withColumn("prev_amount_stateDate",prev_amount_stateDate)
df_new=df_new.withColumn("prev_date_stateDate",prev_date_stateDate)

In [90]:
df_new.select("loan_amount","issue_date","grade_encoded","prev_loan_date","prev_loan_amount").filter((fn.col("grade_encoded") == "B")).show(40)

+-----------+-----------------+-------------+--------------+----------------+
|loan_amount|       issue_date|grade_encoded|prev_loan_date|prev_loan_amount|
+-----------+-----------------+-------------+--------------+----------------+
|     9450.0|   12 August 2012|            B|          NULL|            NULL|
|    10000.0|12 September 2012|            B|    2012-08-12|          9450.0|
|     6000.0|12 September 2012|            B|    2012-09-12|         10000.0|
|     7000.0|12 September 2012|            B|    2012-09-12|          6000.0|
|    10625.0|12 September 2012|            B|    2012-09-12|          7000.0|
|    28000.0|  12 October 2012|            B|    2012-09-12|         10625.0|
|    11200.0|  12 October 2012|            B|    2012-10-12|         28000.0|
|     5650.0|  12 October 2012|            B|    2012-10-12|         11200.0|
|     8000.0|  12 October 2012|            B|    2012-10-12|          5650.0|
|    14400.0| 12 November 2012|            B|    2012-10-12|    

In [91]:
df_new.select("loan_amount","issue_date","grade_encoded","state","prev_amount_stateDate","prev_date_stateDate").show(20)

+-----------+-----------------+-------------+-----+---------------------+-------------------+
|loan_amount|       issue_date|grade_encoded|state|prev_amount_stateDate|prev_date_stateDate|
+-----------+-----------------+-------------+-----+---------------------+-------------------+
|    10000.0|    16 March 2016|            A|   AK|                 NULL|               NULL|
|    14500.0|     18 June 2018|            A|   AK|              10000.0|         2016-03-16|
|    30000.0| 19 February 2019|            A|   AK|              14500.0|         2018-06-18|
|    21000.0|   19 August 2019|            A|   AK|              30000.0|         2019-02-19|
|    24000.0| 19 December 2019|            A|   AK|              21000.0|         2019-08-19|
|     2000.0|  17 October 2017|            A|   AK|                 NULL|               NULL|
|    10000.0| 17 November 2017|            A|   AK|               2000.0|         2017-10-17|
|    10000.0|18 September 2018|            A|   AK|         

# `Part 5: Analysis SQL vs Spark`

Identify the average loan amount and interest rate for loans marked as
"Default" in the Loan Status, grouped by Emp Length and annual income ranges.

In [92]:
from pyspark.sql import functions as F

# Calculate quantile value
quantiles = df_new.approxQuantile("annual_inc", [0.25, 0.5, 0.75], 0.01)
q1, median, q3 = quantiles 

print(f"Quantiles: Q1={q1}, Median={median}, Q3={q3}")


Quantiles: Q1=46000.0, Median=65000.0, Q3=93700.0


In [93]:
df_new.createOrReplaceTempView("Loans")
import pyspark.sql.utils
from pyspark.errors import AnalysisException


# Register the DataFrame as a SQL temporary view

# Define SQL query using the calculated quantiles
query = f"""
SELECT 
    emp_length,
    CASE
        WHEN annual_inc <= {q1} THEN 'Low'
        WHEN annual_inc > {q1} AND annual_inc <= {q3} THEN 'Mid'
        WHEN annual_inc > {q3} THEN 'High'
    END AS income_range,
    AVG(loan_amount) AS avg_loan_amount,
    AVG(int_rate) AS avg_interest_rate
FROM Loans
WHERE loan_status = 'Default'
GROUP BY emp_length, income_range
"""

result=spark.sql(query)
result.show()


+----------+------------+---------------+-------------------+
|emp_length|income_range|avg_loan_amount|  avg_interest_rate|
+----------+------------+---------------+-------------------+
|         0|         Low|        14087.5|0.19285000000000002|
|         2|         Mid|        22075.0|             0.2734|
|         1|         Low|         4200.0|             0.1825|
+----------+------------+---------------+-------------------+



In [94]:
df_binned = df_new.withColumn(
    "income_range",
    F.when(F.col("annual_inc") <= q1, "Low")
    .when((F.col("annual_inc") > q1) & (F.col("annual_inc") <= q3), "Mid")
    .otherwise("High")
)

df_filter1=df_binned.filter((fn.col("loan_status") == "Default"))
result_df = (
    df_filter1.groupBy("emp_length", "income_range")
    .agg(
        F.avg("loan_amount").alias("avg_loan_amount"),
        F.avg("int_rate").alias("avg_interest_rate"),
    )
)
result_df.show()

+----------+------------+---------------+-------------------+
|emp_length|income_range|avg_loan_amount|  avg_interest_rate|
+----------+------------+---------------+-------------------+
|         0|         Low|        14087.5|0.19285000000000002|
|         2|         Mid|        22075.0|             0.2734|
|         1|         Low|         4200.0|             0.1825|
+----------+------------+---------------+-------------------+



Calculate the average difference between Loan Amount and Funded Amount for each
loan Grade and sort by the grades with the largest differences.

In [95]:

query = """
SELECT 
    grade,
    AVG(loan_amount - funded_amount) AS avg_difference
FROM Loans
GROUP BY grade
ORDER BY avg_difference DESC
"""

result_df = spark.sql(query)

result_df.show(50)


+-----+--------------+
|grade|avg_difference|
+-----+--------------+
|   29|           0.0|
|   26|           0.0|
|   19|           0.0|
|   22|           0.0|
|    7|           0.0|
|   34|           0.0|
|   32|           0.0|
|   31|           0.0|
|   25|           0.0|
|    6|           0.0|
|    9|           0.0|
|   27|           0.0|
|   17|           0.0|
|   28|           0.0|
|   33|           0.0|
|    5|           0.0|
|    1|           0.0|
|   10|           0.0|
|    3|           0.0|
|   12|           0.0|
|    8|           0.0|
|   11|           0.0|
|   35|           0.0|
|    2|           0.0|
|    4|           0.0|
|   13|           0.0|
|   18|           0.0|
|   14|           0.0|
|   21|           0.0|
|   15|           0.0|
|   30|           0.0|
|   23|           0.0|
|   20|           0.0|
|   16|           0.0|
|   24|           0.0|
+-----+--------------+



In [96]:
diff= F.col("loan_amount") - F.col("funded_amount")
avg=F.avg("amount_difference").alias("avg_difference")
avg_diff=F.col("avg_difference").desc()

result_df = (
    df_new.withColumn("amount_difference",diff).
    groupBy("grade")
    .agg(avg)
    .orderBy(avg_diff)
)

result_df.show()

+-----+--------------+
|grade|avg_difference|
+-----+--------------+
|   29|           0.0|
|   26|           0.0|
|   19|           0.0|
|   22|           0.0|
|    7|           0.0|
|   34|           0.0|
|   32|           0.0|
|   31|           0.0|
|   25|           0.0|
|    6|           0.0|
|    9|           0.0|
|   27|           0.0|
|   17|           0.0|
|   28|           0.0|
|   33|           0.0|
|    5|           0.0|
|    1|           0.0|
|   10|           0.0|
|    3|           0.0|
|   12|           0.0|
+-----+--------------+
only showing top 20 rows



Compare the total Loan Amount for loans with "Verified" and "Not Verified"
Verification Status across each state

In [97]:

query = """
SELECT 
    addr_state,
    verification_status,
    SUM(loan_amount) AS total_loan_amount
FROM Loans
WHERE verification_status IN ('Verified', 'Not Verified')
GROUP BY addr_state, verification_status
ORDER BY addr_state
"""

# Execute the query
result_df = spark.sql(query)

# Show the result
result_df.show()


+----------+-------------------+-----------------+
|addr_state|verification_status|total_loan_amount|
+----------+-------------------+-----------------+
|        AK|       Not Verified|         408575.0|
|        AK|           Verified|         286375.0|
|        AL|       Not Verified|        1343075.0|
|        AL|           Verified|        1322250.0|
|        AR|       Not Verified|         963875.0|
|        AR|           Verified|         624225.0|
|        AZ|       Not Verified|        2926750.0|
|        AZ|           Verified|        2910750.0|
|        CA|       Not Verified|      1.8705525E7|
|        CA|           Verified|      1.5857925E7|
|        CO|           Verified|        2398400.0|
|        CO|       Not Verified|        2881075.0|
|        CT|       Not Verified|        2246700.0|
|        CT|           Verified|        1789600.0|
|        DC|           Verified|         257625.0|
|        DC|       Not Verified|         351475.0|
|        DE|           Verified

In [98]:

filtered_df = df_new.filter(F.col("verification_status").isin("Verified", "Not Verified"))

result_df = (
    filtered_df.groupBy("addr_state", "verification_status")
    .agg(F.sum("loan_amount").alias("total_loan_amount"))
    .orderBy("addr_state")
)

# Show the result
result_df.show()


+----------+-------------------+-----------------+
|addr_state|verification_status|total_loan_amount|
+----------+-------------------+-----------------+
|        AK|       Not Verified|         408575.0|
|        AK|           Verified|         286375.0|
|        AL|       Not Verified|        1343075.0|
|        AL|           Verified|        1322250.0|
|        AR|       Not Verified|         963875.0|
|        AR|           Verified|         624225.0|
|        AZ|       Not Verified|        2926750.0|
|        AZ|           Verified|        2910750.0|
|        CA|       Not Verified|      1.8705525E7|
|        CA|           Verified|      1.5857925E7|
|        CO|           Verified|        2398400.0|
|        CO|       Not Verified|        2881075.0|
|        CT|       Not Verified|        2246700.0|
|        CT|           Verified|        1789600.0|
|        DC|           Verified|         257625.0|
|        DC|       Not Verified|         351475.0|
|        DE|           Verified

Calculate the average time gap (in days) between consecutive loans for each
grade using the new features you added in the feature engineering phase

In [99]:
query = """
SELECT
    grade_encoded,
    AVG(DATEDIFF(DAY,prev_loan_date,issue_date1 )) AS avg_diff
FROM 
    Loans
WHERE 
    prev_loan_date IS NOT NULL AND issue_date1 IS NOT NULL
GROUP BY 
    grade_encoded

"""
result_df = spark.sql(query)
result_df.show()



+-------------+------------------+
|grade_encoded|          avg_diff|
+-------------+------------------+
|            F| 25.72705314009662|
|            E| 8.644578313253012|
|            B|1.7088235294117646|
|            D| 3.382756866734486|
|            C|1.7757059136920619|
|            A|2.2646509257686427|
|            G| 83.48113207547169|
+-------------+------------------+



In [100]:
result_df = (
    df_new.filter(F.col("prev_loan_date").isNotNull()) 
    .groupBy("grade_encoded")
    .agg(F.avg(F.datediff(F.col("issue_date1"), F.col("prev_loan_date"))).alias("avg_diff"))
    .orderBy("grade_encoded")
)
# Show the result
result_df.show()

+-------------+------------------+
|grade_encoded|          avg_diff|
+-------------+------------------+
|            A|2.2646509257686427|
|            B|1.7088235294117646|
|            C|1.7757059136920619|
|            D| 3.382756866734486|
|            E| 8.644578313253012|
|            F| 25.72705314009662|
|            G| 83.48113207547169|
+-------------+------------------+



In [101]:
# Register the DataFrame as a temporary view if it's not a table already

# Write the query
query = """
SELECT
    grade_encoded,
    state,
    AVG(loan_amount - prev_amount_stateDate) AS loan_difference
FROM 
    Loans
WHERE 
    prev_amount_stateDate IS NOT NULL
GROUP BY 
    grade_encoded, state
ORDER BY grade_encoded

"""

# Run the query
result_df = spark.sql(query)
result_df.show()



+-------------+-----+-------------------+
|grade_encoded|state|    loan_difference|
+-------------+-----+-------------------+
|            A|   TX| -46.00409836065574|
|            A|   VT|              475.0|
|            A|   DE|-1745.4545454545455|
|            A|   LA|         392.578125|
|            A|   UT|  511.8421052631579|
|            A|   ND| 3466.6666666666665|
|            A|   NM|             1917.0|
|            A|   WV|              850.0|
|            A|   NY|  8.094713656387665|
|            A|   IL|-218.14671814671814|
|            A|   PA|-43.646408839779006|
|            A|   AK|             2640.0|
|            A|   MD| -46.25984251968504|
|            A|   MS|  2079.310344827586|
|            A|   FL| 123.10513447432763|
|            A|   OR|   161.231884057971|
|            A|   TN|  391.9642857142857|
|            A|   NV| 226.62337662337663|
|            A|   IN|  308.3333333333333|
|            A|   ID|-1291.6666666666667|
+-------------+-----+-------------

In [102]:
avg=F.avg("avg_loan_difference")
diff= F.col("loan_amount") - F.col("prev_amount_stateDate")
result_df = (
    df_new.filter(F.col("prev_amount_stateDate").isNotNull())
    .withColumn("avg_loan_difference",diff)
    .groupBy("grade_encoded","state")
    .agg(avg)
    .orderBy("grade_encoded")
)
# Show the result
result_df.show()

+-------------+-----+------------------------+
|grade_encoded|state|avg(avg_loan_difference)|
+-------------+-----+------------------------+
|            A|   TX|      -46.00409836065574|
|            A|   VT|                   475.0|
|            A|   DE|     -1745.4545454545455|
|            A|   LA|              392.578125|
|            A|   UT|       511.8421052631579|
|            A|   ND|      3466.6666666666665|
|            A|   NM|                  1917.0|
|            A|   WV|                   850.0|
|            A|   NY|       8.094713656387665|
|            A|   IL|     -218.14671814671814|
|            A|   PA|     -43.646408839779006|
|            A|   AK|                  2640.0|
|            A|   MD|      -46.25984251968504|
|            A|   MS|       2079.310344827586|
|            A|   FL|      123.10513447432763|
|            A|   OR|        161.231884057971|
|            A|   TN|       391.9642857142857|
|            A|   NV|      226.62337662337663|
|            

# `Part 6: Lookup Table & Saving the dataset`

In [109]:
lookup_table= lookup_table.coalesce(1)


In [110]:
lookup_table.rdd.getNumPartitions()

1

In [111]:
df_new= df_new.coalesce(1)
df_new.rdd.getNumPartitions()

1

In [112]:
df_new.write.mode("overwrite").parquet("fintech_spark_52_23665_clean.parquet")

In [113]:

lookup_table.write.mode("overwrite").parquet("lookup_spark_52_23665.parquet")
print("done")

done


In [115]:
df_new.show()

+--------------------+--------------------+----------+--------------+----------+----------------+-------------------+--------+----------+-----------+-----------+-------+-----------+-----------+-----+-------------+----------+--------+-----+-----------------+----------+----------+------------------+--------------------+----------------------------+-----------------------------------+--------------------------------+-------------+------------------+-------------------+-----------------------+------------------+--------------------+-------------------+--------------+---------------+---------------+----------+---------------+-------------+-----------+--------------+----------------+---------------------+-------------------+
|         customer_id|           emp_title|emp_length|home_ownership|annual_inc|annual_inc_joint|verification_status|zip_code|addr_state|avg_cur_bal|tot_cur_bal|loan_id|loan_status|loan_amount|state|funded_amount|      term|int_rate|grade|       issue_date|pymnt_plan|   

In [116]:
lookup_table.show()

+--------------------+---------------+---------------------+
|             Feature| Original Value|Imputed/Encoded Value|
+--------------------+---------------+---------------------+
|          emp_length|        4 years|                    4|
|          emp_length|        2 years|                    2|
|          emp_length|        8 years|                    8|
|          emp_length|        9 years|                    9|
|          emp_length|        6 years|                    6|
|          emp_length|         1 year|                    1|
|          emp_length|        7 years|                    7|
|          emp_length|        5 years|                    5|
|          emp_length|      10+ years|                   11|
|          emp_length|        3 years|                    3|
|          emp_length|       < 1 year|                    0|
|verification_status_|       Verified| verification_stat...|
|verification_status_|Source Verified| verification_stat...|
|verification_status_|  

In [None]:
sc.stop()