<a href="https://colab.research.google.com/github/Utkarshmishra2k2/LoanLens-HMDA-Loan-Data-Analysis-and-Modeling/blob/main/HMDA_PySpark.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, isnan, when, count
from pyspark.sql.functions import expr
from pyspark.ml.feature import VectorAssembler, StandardScaler, OneHotEncoder, StringIndexer
from pyspark.ml import Pipeline
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
import numpy as np
import json
import warnings
# Silence every Python warning for cleaner notebook output.
# NOTE(review): this also hides deprecation warnings raised by the libraries used below.
warnings.filterwarnings("ignore")
import shutil

In [None]:
import plotly.express as px
import plotly.graph_objects as go
from plotly.subplots import make_subplots
from scipy.stats import gaussian_kde
import pandas as pd

In [None]:
import kagglehub

# Download the dataset through kagglehub and keep the returned location of
# the downloaded files in `path`.
path = kagglehub.dataset_download(
    "utkarshmishra2k2/hyderabad-metropolitan-development-authority"
)

print("Path to dataset files:", path)

In [None]:
# Copy the CSV out of the kagglehub download directory into /content.
# The source directory comes from `path` (the value returned by
# kagglehub.dataset_download) rather than a hard-coded
# "/root/.cache/.../versions/1" location, which stops matching as soon as the
# dataset version or the cache root changes.
shutil.copy(f"{path}/HMDA Data Set For Use.csv", "/content/HMDA Data Set For Use.csv")

In [None]:
# Build the Spark session named "Project", or reuse one that already exists.
spark = (
    SparkSession.builder
    .appName("Project")
    .getOrCreate()
)

In [None]:
# Load the HMDA CSV: the first row supplies the column names and Spark infers
# each column's type from the data.
df = spark.read.csv(
    "HMDA Data Set For Use.csv",
    header=True,
    inferSchema=True,
)

In [None]:
# Show the column names and types of the loaded DataFrame.
print("DataFrame Schema:")
df.printSchema()

In [None]:
# Report the DataFrame's dimensions as (rows, columns).
num_rows, num_cols = df.count(), len(df.columns)
print("DataFrame Shape: ({}, {})".format(num_rows, num_cols))

In [None]:
# Print Spark's summary statistics for the DataFrame.
print("\nDescriptive Statistics:")
df.describe().show()

In [None]:
# Split the columns by Spark dtype: 'int'/'double' columns are treated as
# numerical, 'string' columns as categorical.
dtypes = df.dtypes
num_columns = [
    field for field, kind in dtypes
    if kind in ('int', 'double')
]
cat_columns = [
    field for field, kind in dtypes
    if kind == 'string'
]

In [None]:
# Columns that the next step casts to string and adds to the categorical list.
code_columns = [
    'agency_code',
    'loan_type',
    'property_type',
    'loan_purpose',
    'owner_occupancy',
    'preapproval',
    'action_taken',
    'applicant_ethnicity',
    'co_applicant_ethnicity',
    'applicant_race_1',
    'co_applicant_race_1',
    'applicant_sex',
    'co_applicant_sex',
    'purchaser_type',
    'hoepa_status',
    'lien_status',
]

In [None]:
# Treat the coded columns as categorical: every one that is present in the
# DataFrame is cast to string and registered in cat_columns.
for col_name in code_columns:
    if col_name not in df.columns:
        continue
    df = df.withColumn(col_name, col(col_name).cast("string"))
    cat_columns.append(col_name)

In [None]:
# Remove duplicate names while keeping first-seen order. A set would also
# de-duplicate, but string hashing is randomized per interpreter run, so the
# resulting order (and the order of everything derived from cat_columns)
# would change from one run to the next.
cat_columns = list(dict.fromkeys(cat_columns))

In [None]:
# Re-read the dtypes after the string casts and rebuild the numerical column list.
dtypes = df.dtypes
num_columns = [
    field for field, kind in dtypes
    if kind in ('int', 'double')
]

In [None]:
# Print a value-frequency table, most frequent value first, for every
# categorical column.
for col_name in cat_columns:
    print(f"\nFrequency Distribution for '{col_name}':")
    (
        df.groupBy(col_name)
        .count()
        .orderBy("count", ascending=False)
        .show()
    )

In [None]:
# Count missing values per column in a single aggregation.
# NULL is checked for every column. NaN is only checked for float/double
# columns: isnan() is defined for floating-point input, and applying it to
# other column types relies on an implicit cast that can fail (unsupported
# types such as dates, or non-numeric text when ANSI SQL mode is enabled).
missing_df = df.select([
    count(
        when(
            (col(c).isNull() | isnan(c)) if dtype in ("float", "double") else col(c).isNull(),
            c,
        )
    ).alias(c)
    for c, dtype in df.dtypes
])
print("Missing Values per Column:")
missing_df.show()

In [None]:
# Impute missing numerical values with each column's approximate median
# (relative error 0.01). All medians come from one approxQuantile call and are
# applied with a single fill, instead of one Spark job and one new DataFrame
# per column.
# NOTE(review): this fills every int/double column, including any column later
# used as a model label — confirm that imputing the label is intended.
if num_columns:
    median_lists = df.approxQuantile(num_columns, [0.5], 0.01)
    # approxQuantile yields an empty list for a column with no usable values;
    # such a column has no median, so it is left untouched instead of raising
    # IndexError on `[0]`.
    median_fill = {
        name: medians[0]
        for name, medians in zip(num_columns, median_lists)
        if medians
    }
    if median_fill:
        df = df.na.fill(median_fill)

In [None]:
# Impute missing categorical values with each column's most frequent value.
for col_name in cat_columns:
    if col_name not in df.columns:
        continue
    # Rank only non-null values: if NULL were the largest group it would be
    # picked as the "mode" and the column would never be filled. The secondary
    # sort key makes the choice deterministic when counts tie.
    mode_row = (
        df.filter(col(col_name).isNotNull())
        .groupBy(col_name)
        .count()
        .orderBy(col("count").desc(), col(col_name).asc())
        .first()
    )
    # first() returns None when the column holds no non-null value at all.
    if mode_row is not None:
        df = df.na.fill({col_name: mode_row[0]})

In [None]:
# Derived ratio features. Each division is guarded so that a zero denominator
# yields NULL explicitly, rather than depending on the session's
# division-by-zero behaviour (an error when ANSI SQL mode is enabled).
# NOTE(review): both ratios are computed from loan_amount_000s; if that column
# is also the prediction target, using these as model inputs leaks the target.
df = df.withColumn(
    "loan_income_ratio",
    when(
        col("applicant_income_000s") != 0,
        col("loan_amount_000s") / col("applicant_income_000s"),
    ),
)
df = df.withColumn(
    "loan_property_ratio",
    when(
        col("hud_median_family_income") != 0,
        col("loan_amount_000s") / (col("hud_median_family_income") / 1000),
    ),
)

In [None]:
# Find the categorical columns that hold at most one distinct value, report
# them, and drop them from the DataFrame.
low_variance_cols = [
    name
    for name in cat_columns
    if df.select(name).distinct().count() <= 1
]
print(f"Dropping low-variance columns: {low_variance_cols}")
df = df.drop(*low_variance_cols)

In [None]:
# Convert the numerical columns to a pandas DataFrame for the plotting cells.
pdf = df.select(*num_columns).toPandas()

In [None]:
# (a) Distribution histograms with KDE overlays for the numerical features.
# The grid is 2x3, so only the first six numerical columns are plotted.
fig_hist_kde = make_subplots(
    rows=2, cols=3,
    subplot_titles=[f'Distribution of {feature}' for feature in num_columns[:6]]
)

for i, feature in enumerate(num_columns[:6]):
    # Map the running index onto 1-based (row, column) grid coordinates.
    row = i // 3 + 1
    col_loc = i % 3 + 1

    # Histogram normalised to a probability density so it shares a y-scale
    # with the KDE curve. It is added first so the KDE line is drawn on top
    # of the bars rather than underneath them.
    hist_trace = go.Histogram(
        x=pdf[feature],
        name=f'{feature} Histogram',
        histnorm='probability density',
        opacity=0.75
    )
    fig_hist_kde.add_trace(hist_trace, row=row, col=col_loc)

    # KDE trace using scipy. gaussian_kde needs at least two distinct values:
    # constant data has a singular covariance matrix and makes it raise, so
    # such a column gets the histogram only.
    data = pdf[feature].dropna()
    if data.nunique() > 1:
        kde = gaussian_kde(data)
        x_vals = np.linspace(data.min(), data.max(), 1000)
        y_vals = kde(x_vals)
        kde_trace = go.Scatter(
            x=x_vals,
            y=y_vals,
            mode='lines',
            name=f'{feature} KDE',
            line=dict(color='black', dash='dot')
        )
        fig_hist_kde.add_trace(kde_trace, row=row, col=col_loc)

fig_hist_kde.update_layout(
    title_text="Histograms and KDEs of Numerical Features",
    height=800,
    width=1000,
    showlegend=False,
    template="plotly_dark",
    title_x=0.5,
)
fig_hist_kde.show()

In [None]:
# (b) Boxplots of the first six numerical features on a 2x3 grid.
fig_box = make_subplots(
    rows=2,
    cols=3,
    subplot_titles=[f'Boxplot of {feature}' for feature in num_columns[:6]],
)

for i, feature in enumerate(num_columns[:6]):
    # 1-based grid position of the i-th plot, filled row by row.
    row, col_loc = i // 3 + 1, i % 3 + 1
    box_trace = go.Box(
        y=pdf[feature],
        name=feature,
        boxmean='sd',
        jitter=0.05,
        whiskerwidth=0.5,
        marker={'color': 'lightblue'},
        line={'width': 1},
    )
    fig_box.add_trace(box_trace, row=row, col=col_loc)

fig_box.update_layout(
    title_text="Boxplots of Numerical Features",
    title_x=0.5,
    template="plotly_dark",
    showlegend=False,
    height=800,
    width=1000,
)
fig_box.show()

In [None]:
# Pairwise correlations of the numerical features, drawn as an annotated heatmap.
corr_matrix = pdf.corr()
fig_corr = px.imshow(
    corr_matrix,
    text_auto=True,
    aspect="auto",
    title="Correlation Heatmap of Numerical Features",
    template="plotly_dark",
)
fig_corr.show()

In [None]:
# Columns excluded from modelling; they are reported and then dropped.
irrelevant_cols = [
    # Additional applicant race fields
    'applicant_race_name_2',
    'applicant_race_name_3',
    'applicant_race_name_4',
    'applicant_race_4',
    'applicant_race_name_5',
    'applicant_race_5',
    # Additional co-applicant race fields
    'co_applicant_race_name_3',
    'co_applicant_race_3',
    'co_applicant_race_name_4',
    'co_applicant_race_4',
    'co_applicant_race_name_5',
    'co_applicant_race_5',
    # Record bookkeeping fields
    'edit_status_name',
    'edit_status',
    'sequence_number',
    'application_date_indicator',
    'respondent_id',
    'co_applicant_sex_name',
    # Denial reason names
    'denial_reason_name_1',
    'denial_reason_name_2',
    'denial_reason_name_3',
    # Remaining name columns
    'hoepa_status_name',
    'lien_status_name',
    'applicant_sex_name',
    'co_applicant_race_name_2',
]
print(f"Dropping irrelevant columns: {irrelevant_cols}")
df = df.drop(*irrelevant_cols)

In [None]:
# Split the rows roughly 70/30 into training and test sets with a fixed seed.
train_df, test_df = df.randomSplit(weights=[0.7, 0.3], seed=42)

In [None]:
# Report how many rows landed in each split (each count() runs a Spark job).
print(f"Training Rows: {train_df.count()}, Test Rows: {test_df.count()}")

In [None]:
# Keep only the categorical columns that are still present in the DataFrame
# and that hold more than one distinct value.
existing_columns = set(df.columns)
valid_cat_columns = [
    name
    for name in cat_columns
    if name in existing_columns
    and df.select(name).distinct().count() > 1
]
print("Valid Categorical Columns:", valid_cat_columns)

In [None]:
# Column the regressor is trained to predict.
target = "loan_amount_000s"
# Numerical model inputs. loan_income_ratio and loan_property_ratio are left
# out on purpose: both are computed as loan_amount_000s divided by another
# column, so feeding them to the model next to that column
# (applicant_income_000s, hud_median_family_income) lets it reconstruct the
# target directly (target leakage) and makes the evaluation metrics meaningless.
numerical_features = ['applicant_income_000s', 'hud_median_family_income', 'population',
                      'minority_population']

In [None]:
# One StringIndexer per valid categorical column, writing "<column>_index";
# each is configured with handleInvalid="keep".
indexers = [
    StringIndexer(
        inputCol=name,
        outputCol=f"{name}_index",
        handleInvalid="keep",
    )
    for name in valid_cat_columns
]

In [None]:
# One OneHotEncoder per valid categorical column, reading "<column>_index" and
# writing "<column>_encoded".
encoders = [
    OneHotEncoder(
        inputCol=f"{name}_index",
        outputCol=f"{name}_encoded",
    )
    for name in valid_cat_columns
]

In [None]:
# Final model inputs: the numerical columns followed by each categorical
# column's one-hot encoded output.
assembled_features = [
    *numerical_features,
    *(f"{col_name}_encoded" for col_name in valid_cat_columns),
]

In [None]:
# Combine all model input columns into a single vector column named "features".
assembler = VectorAssembler(
    inputCols=assembled_features,
    outputCol="features",
)

In [None]:
# Scaler that reads "features" and writes "scaled_features".
# NOTE(review): the regressor in this file is configured with
# featuresCol="features", so nothing visible here consumes "scaled_features".
scaler = StandardScaler(
    inputCol="features",
    outputCol="scaled_features",
)

In [None]:
# Random forest regressor: 100 trees of depth at most 10, fixed seed, trained
# on the "features" vector to predict the target column.
rf = RandomForestRegressor(
    featuresCol="features",
    labelCol=target,
    numTrees=100,
    maxDepth=10,
    seed=42,
)

In [None]:
# Chain the stages: index and one-hot encode the categorical columns, assemble
# the feature vector, then fit the random forest.
# The StandardScaler stage is intentionally not included: the regressor reads
# the unscaled "features" column, so the scaler's "scaled_features" output was
# computed on every fit/transform and never used.
pipeline = Pipeline(stages=indexers + encoders + [assembler, rf])

In [None]:
# Fit every pipeline stage on the training split.
model = pipeline.fit(train_df)

In [None]:
# Apply the fitted pipeline to the held-out test split.
predictions = model.transform(test_df)

In [None]:
# Root mean squared error of the test predictions against the target column.
rmse_evaluator = RegressionEvaluator(
    labelCol=target,
    predictionCol="prediction",
    metricName="rmse",
)
rmse = rmse_evaluator.evaluate(predictions)
print(f"Root Mean Squared Error (RMSE): {rmse}")

In [None]:
# Mean absolute error of the test predictions against the target column.
mae_evaluator = RegressionEvaluator(
    labelCol=target,
    predictionCol="prediction",
    metricName="mae",
)
mae = mae_evaluator.evaluate(predictions)
print(f"Mean Absolute Error (MAE): {mae}")

In [None]:
# R-squared of the test predictions against the target column.
r2_evaluator = RegressionEvaluator(
    labelCol=target,
    predictionCol="prediction",
    metricName="r2",
)
r2 = r2_evaluator.evaluate(predictions)
print(f"R-squared (R2): {r2}")

In [None]:
# Shut down the Spark session; DataFrames created from it cannot be used afterwards.
spark.stop()

In [None]:
# Marker printed when the notebook has run to completion.
print("The End")