In [1]:
from pyspark.sql import SparkSession

# Create a Spark session
spark = SparkSession.builder \
    .appName("LoanDataProcessing") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

# Verify the session
print(spark)

<pyspark.sql.session.SparkSession object at 0x78eabed29890>


In [2]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("LoanDataProcessing").getOrCreate()

df = spark.read.csv("loan_2014_18.csv", header=True, inferSchema=True)

df.show(5)


+----------+---------+---------+-----------+---------------+----------+--------+-----------+-----+---------+-------------------+----------+--------------+----------+-------------------+----------+------------------+----------+--------------------+------------------+--------------------+--------+----------+-----+-----------+----------------+--------------+---------------+--------------+----------------------+----------------------+--------+-------+---------+----------+---------+-------------------+---------+-------------+-----------+---------------+---------------+-------------+------------------+----------+-----------------------+------------+---------------+------------+------------------+--------------------+-------------------+--------------------------+---------------------------+-----------+----------------+----------------+---------+-------------------------+--------------+------------+-----------+-----------+-----------+-----------+-----------+------------------+------------+---

In [4]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

spark = SparkSession.builder.appName("LoanDataProcessing").getOrCreate()

df = df.fillna({"emp_title": "self_employed"})

df.show()


+----------+---------+---------+-----------+---------------+----------+--------+-----------+-----+---------+--------------------+----------+--------------+----------+-------------------+----------+------------------+----------+--------------------+------------------+--------------------+--------+----------+-----+-----------+----------------+--------------+---------------+--------------+----------------------+----------------------+--------+-------+---------+----------+---------+-------------------+---------+-------------+----------------+---------------+---------------+-------------+------------------+----------+-----------------------+------------+---------------+------------+------------------+--------------------+-------------------+--------------------------+---------------------------+-----------+----------------+----------------+---------+-------------------------+--------------+------------+-----------+-----------+-----------+-----------+-----------+------------------+----------

In [6]:


selected_columns = [
    "id", "borrower_key_pk_sk", "loan_product_key_pk", "sk_date", "loan_amnt",
    "funded_amnt_inv", "int_rate", "installment", "loan_status", "dti",
    "revol_bal", "revol_util", "tot_cur_bal", "total_bal_il", "max_bal_bc",
    "delinq_amnt", "annual_inc", "emp_title", "emp_length", "home_ownership",
    "open_acc", "pub_rec", "delinq_2yrs", "earliest_cr_line", "total_acc",
    "verification_status", "zip_code", "addr_state", "derived", "delinq_2yrs",
    "open_acc", "pub_rec", "total_acc", "last_pymnt_d", "last_pymnt_amnt",
    "next_pymnt_d", "mths_since_last_major_derog", "open_il_24m", "mort_acc",
    "hardship_flag", "grade", "sub_grade", "term", "purpose", "initial_list_status",
    "out_prncp", "application_type", "debt_settlement_flag", "hardship_flag",
    "hardship_type", "hardship_reason", "hardship_status", "hardship_amount",
    "hardship_start_date", "hardship_end_date", "hardship_length", "hardship_dpd",
    "hardship_loan_status", "hardship_payoff_balance_amount", "loan_product_key_pk",
    "application_type", "annual_inc_joint", "dti_joint", "sec_app_fico_range_low",
    "sec_app_earliest_cr_line", "sec_app_inq_last_6mths", "sec_app_mort_acc",
    "sec_app_open_acc", "sec_app_revol_util", "borrower_key_pk_sk", "issue_d"
]

selected_columns = list(set(selected_columns))

df = df.toDF(*[col.strip() for col in df.columns])

existing_columns = [col for col in selected_columns if col in df.columns]

df_selected = df.select(*existing_columns)

df_selected.show()



+--------------------+---------+---------+---------+------------------+---------+-------------+-----------------+-----------+---------+-----+----------------+----------+------------+--------------+---------------+-------+------------------+----------------+-----------+----------+--------------------+--------+-----+-------------+----------------------+--------+-----------+-------------------+----------------+---------------+------------+----------+---------------+--------------------+---------------------------+----------------------+--------+------------+---------------+---------------+----------+-----------+----------------+----------+------------+---------------+-----------+---------+---------+------------------+------------------------+----------------+------------------------------+-------------------+-------------------+--------+----------+----------+
|debt_settlement_flag|sub_grade|dti_joint|revol_bal|           purpose|total_acc|hardship_flag|hardship_end_date|delinq_amnt|out_pr

In [7]:
from pyspark.sql.functions import regexp_replace, when, col

df_selected = df_selected.withColumn(
    "emp_length",
    when(col("emp_length").isNull(), "0")
    .when(col("emp_length") == "10+ years", "+10")
    .when(col("emp_length") == "< 1 year", "<1")
    .otherwise(regexp_replace(col("emp_length"), "[^0-9]", ""))
)

df_selected.select("emp_length").show(20)


+----------+
|emp_length|
+----------+
|       +10|
|       +10|
|         9|
|         5|
|        <1|
|       +10|
|         3|
|        <1|
|         5|
|       +10|
|        <1|
|       +10|
|         4|
|       +10|
|        <1|
|         0|
|        <1|
|        <1|
|         6|
|       +10|
+----------+
only showing top 20 rows



In [8]:
from pyspark.sql.functions import col, when, expr

df_selected = df_selected.withColumn(
    "dti",
    when(
        col("dti").isNull(),
        col("installment") / (col("annual_inc") / 12)
    ).otherwise(col("dti"))
)

df_selected.select("dti", "installment", "annual_inc").show(20)


+-----+-----------+----------+
|  dti|installment|annual_inc|
+-----+-----------+----------+
|27.74|     375.88|   42000.0|
|12.35|    1062.71|  155000.0|
|31.11|     952.65|  120000.0|
|15.94|     501.17|   79077.0|
|19.06|    1022.12|  107000.0|
|12.27|     219.26|   32000.0|
|18.79|     320.05|   55000.0|
|20.36|     281.34|  120000.0|
|22.63|     434.93|   90000.0|
|17.91|     265.68|   90000.0|
|22.22|      699.3|   79000.0|
|24.03|     555.04|  100000.0|
|38.07|     509.69|  180000.0|
|14.37|     810.49|  200000.0|
| 9.53|     437.92|  125000.0|
| 42.2|      78.34|   24000.0|
|24.72|     398.42|   60000.0|
| 6.53|      67.96|  125000.0|
|20.18|     498.02|   71400.0|
| 3.07|     288.05|   72000.0|
+-----+-----------+----------+
only showing top 20 rows



In [9]:
from pyspark.sql.functions import col

df = df.withColumn("avg_fico_range", (col("fico_range_low") + col("fico_range_high")) / 2)

df_selected = df_selected.join(df.select("id", "avg_fico_range"), on="id", how="left")

df_selected.select("id", "avg_fico_range").show(10)


+---------+--------------+
|       id|avg_fico_range|
+---------+--------------+
|120122535|         717.0|
|119374887|         717.0|
|119321612|         767.0|
|120122034|         702.0|
|118659541|         787.0|
|119246018|         712.0|
|119183129|         697.0|
|120086024|         702.0|
|119417303|         662.0|
|120020939|         662.0|
+---------+--------------+
only showing top 10 rows



In [14]:
from pyspark.sql.functions import col, when, isnan
from pyspark.sql.types import DoubleType

df = df.withColumn("revol_util", col("revol_util").cast(DoubleType()))

valid_values = df.filter((col("revol_util").isNotNull()) & (~isnan(col("revol_util"))))

if valid_values.count() > 0:
    median_value = df.approxQuantile("revol_util", [0.5], 0.01)[0]
else:
    median_value = 0.0

df_selected = df_selected.withColumn(
    "revol_util",
    when(col("revol_util").isNull(), median_value).otherwise(col("revol_util"))
)

df_selected.select("revol_util").show(10)




+----------+
|revol_util|
+----------+
|       37%|
|     34.1%|
|     20.7%|
|     57.7%|
|     16.1%|
|     22.8%|
|     57.5%|
|     92.3%|
|       86%|
|     29.9%|
+----------+
only showing top 10 rows



In [15]:
from pyspark.sql.types import StringType

df_selected = df_selected.withColumn("revol_util", col("revol_util").cast(StringType()))

df_selected.select("revol_util").show(10)


+----------+
|revol_util|
+----------+
|       37%|
|     34.1%|
|     20.7%|
|     57.7%|
|     16.1%|
|     22.8%|
|     57.5%|
|     92.3%|
|       86%|
|     29.9%|
+----------+
only showing top 10 rows



In [19]:
from pyspark.sql.functions import abs, col

df_selected = df_selected.withColumn("last_pymnt_amnt", abs(col("last_pymnt_amnt")))

df_selected.select("last_pymnt_amnt").show(10)


+---------------+
|last_pymnt_amnt|
+---------------+
|        2591.95|
|            0.0|
|         952.65|
|            0.0|
|            0.0|
|         219.26|
|       10091.46|
|        3418.89|
|         4572.7|
|         265.68|
+---------------+
only showing top 10 rows



In [20]:
from pyspark.sql.functions import col, when

df_selected = df_selected.withColumn(
    "next_pymnt_d",
    when(col("next_pymnt_d").isNull(), "finished").otherwise(col("next_pymnt_d"))
)

df_selected.select("next_pymnt_d").show(10)


+------------+
|next_pymnt_d|
+------------+
|    finished|
|    Jul-2020|
|    Jun-2020|
|    Jun-2020|
|    Jun-2020|
|    Jun-2020|
|    finished|
|    finished|
|    finished|
|    Jun-2020|
+------------+
only showing top 10 rows



In [25]:
from pyspark.sql.functions import col, when

df_selected = df_selected.withColumn(
    "last_pymnt_d",
    when(col("last_pymnt_d").isNull(), "first month").otherwise(col("last_pymnt_d"))
)

df_selected.select("last_pymnt_d").show(10)

+------------+
|last_pymnt_d|
+------------+
|    May-2020|
|    May-2020|
|    May-2020|
|    May-2020|
|    Apr-2020|
|    May-2020|
|    Nov-2017|
|    Oct-2019|
|    Dec-2019|
|    May-2020|
+------------+
only showing top 10 rows



In [26]:
from pyspark.sql.functions import when

# Replace nulls with -1 in mths_since_last_major_derog
df_selected = df_selected.withColumn(
    "mths_since_last_major_derog",
    when(df_selected.mths_since_last_major_derog.isNull(), -1).otherwise(df_selected.mths_since_last_major_derog)
)

# Show transformed values
df_selected.select("mths_since_last_major_derog").distinct().show()

+---------------------------+
|mths_since_last_major_derog|
+---------------------------+
|                       70.0|
|                       67.0|
|                        8.0|
|                       69.0|
|                        0.0|
|                        7.0|
|                      112.0|
|                      108.0|
|                       88.0|
|                       49.0|
|                       -1.0|
|                      101.0|
|                       98.0|
|                       29.0|
|                      107.0|
|                       64.0|
|                       75.0|
|                       47.0|
|                       42.0|
|                       44.0|
+---------------------------+
only showing top 20 rows



In [28]:
from pyspark.sql.functions import col

# Step 1: Compute sec_app_fico_range_avg in the original df
df = df.withColumn(
    "sec_app_fico_range_avg",
    (col("sec_app_fico_range_low") + col("sec_app_fico_range_high")) / 2
)

# Step 2: Join df_selected with the computed sec_app_fico_range_avg column
df_selected = df_selected.join(df.select("id", "sec_app_fico_range_avg"), on="id", how="left")

# Step 3: Show results
df_selected.select("id", 'sec_app_fico_range_avg').show(10)



+---------+----------------------+
|       id|sec_app_fico_range_avg|
+---------+----------------------+
|120122535|                  NULL|
|119374887|                  NULL|
|119321612|                  NULL|
|120122034|                  NULL|
|118659541|                 707.0|
|119246018|                  NULL|
|119183129|                  NULL|
|120086024|                 602.0|
|119417303|                  NULL|
|120020939|                  NULL|
+---------+----------------------+
only showing top 10 rows



In [30]:
df_selected = df_selected.drop("sec_app_fico_range_low", "sec_app_fico_range_high")

df_selected.show()


+---------+--------------------+---------+---------+---------+------------------+---------+-------------+-----------------+-----------+---------+-----+----------------+----------+------------+--------------+---------------+-------+------------------+----------------+-----------+----------+--------------------+--------+-----+-------------+--------+-----------+-------------------+----------------+---------------+------------+----------+---------------+--------------------+---------------------------+----------------------+--------+------------+---------------+---------------+----------+-----------+----------------+----------+------------+---------------+-----------+---------+------------------+------------------------+----------------+------------------------------+-------------------+-------------------+--------+----------+----------+--------------+----------------------+
|       id|debt_settlement_flag|sub_grade|dti_joint|revol_bal|           purpose|total_acc|hardship_flag|hardship_en

In [31]:
from pyspark.sql.functions import when, col

# Update hardship_flag based on hardship_type
df_selected = df_selected.withColumn(
    "hardship_flag",
    when(col("hardship_type").isNull(), "n").otherwise("y")
)

# Verify the changes
df_selected.select("hardship_type", "hardship_flag").show(10)

+-------------+-------------+
|hardship_type|hardship_flag|
+-------------+-------------+
|         NULL|            n|
|    CVD19SKIP|            y|
|         NULL|            n|
|         NULL|            n|
|   ST0650PV01|            y|
|         NULL|            n|
|         NULL|            n|
|         NULL|            n|
|         NULL|            n|
|         NULL|            n|
+-------------+-------------+
only showing top 10 rows

