In [1]:
from pyspark.sql import SparkSession
import sys
import os

# Point PYSPARK_PYTHON at this kernel's interpreter before the session starts.
os.environ.update(PYSPARK_PYTHON=sys.executable)

spark = SparkSession.builder.getOrCreate()

In [2]:
# Zero-based positions (Spark's default `_c{i}` names for a headerless CSV) of
# the fields kept from the pipe-delimited loan file, keyed by the name each
# column is given after selection. Insertion order matters: it is the output
# column order.
col_to_index = dict(
    orig_interest_rate=7,
    orig_upb=9,
    orig_loan_term=12,
    orig_date=13,
    amortization_type=34,
    interest_only=36,
    zero_balance_code=43,
    zero_balance_date=44,
    upb_at_removal=45,
    foreclosure_costs=53,
    preservation_and_repair_costs=54,
    asset_recovery_costs=55,
    misc_holding_costs=56,
    holding_taxes_costs=57,
    net_sales_proceeds=58,
    credit_enhancement_proceeds=59,
    repurchase_make_whole_proceeds=60,
    other_foreclosure_proceeds=61,
)

# Parallel, insertion-ordered views: the i-th name labels the i-th position.
col_names, col_indexes = col_to_index.keys(), col_to_index.values()

In [3]:
from pyspark.sql import DataFrame

def print_df(df: DataFrame):
    """Show the first rows of a Spark DataFrame, then print its shape.

    Both ``show()`` and ``count()`` are Spark actions, so the DataFrame's
    lineage is evaluated twice unless it has been cached.

    Args:
        df: The Spark DataFrame to display.
    """
    df.show()
    n_rows = df.count()
    n_cols = len(df.columns)
    print(f"shape: ({n_rows}, {n_cols})")

In [None]:
# STEP ONE: Load file with selected columns and column names

from pyspark.sql.functions import col
from pyspark.sql.types import IntegerType

# Headerless CSV columns come back as _c0, _c1, ...; keep only the mapped
# positions and give them their descriptive names (same order as col_to_index).
df1 = (
    spark.read
    .csv("data/2020Q1.csv", sep="|", header=False, inferSchema=True)
    .select(*(f"_c{i}" for i in col_indexes))
    .toDF(*col_names)
)

print_df(df1)

+------------------+--------+--------------+---------+-----------------+-------------+-----------------+-----------------+--------------+-----------------+-----------------------------+--------------------+------------------+-------------------+------------------+---------------------------+------------------------------+--------------------------+
|orig_interest_rate|orig_upb|orig_loan_term|orig_date|amortization_type|interest_only|zero_balance_code|zero_balance_date|upb_at_removal|foreclosure_costs|preservation_and_repair_costs|asset_recovery_costs|misc_holding_costs|holding_taxes_costs|net_sales_proceeds|credit_enhancement_proceeds|repurchase_make_whole_proceeds|other_foreclosure_proceeds|
+------------------+--------+--------------+---------+-----------------+-------------+-----------------+-----------------+--------------+-----------------+-----------------------------+--------------------+------------------+-------------------+------------------+---------------------------+------

In [16]:
# STEP TWO: Keep fixed-rate (FRM), non-interest-only loans that have a zero-balance code,
#           then cast the original UPB to integer

from pyspark.sql.functions import col

# Only terminated (zero-balance) fixed-rate, fully amortizing loans remain; the
# two selector columns are no longer needed once the filter has been applied.
df2 = (
    df1
    .where(
        (col("amortization_type") == "FRM")
        & (col("interest_only") == "N")
        & col("zero_balance_code").isNotNull()
    )
    .drop("amortization_type", "interest_only")
    .withColumn("orig_upb", col("orig_upb").cast(IntegerType()))
)

print_df(df2)

+------------------+--------+--------------+---------+-----------------+-----------------+--------------+-----------------+-----------------------------+--------------------+------------------+-------------------+------------------+---------------------------+------------------------------+--------------------------+
|orig_interest_rate|orig_upb|orig_loan_term|orig_date|zero_balance_code|zero_balance_date|upb_at_removal|foreclosure_costs|preservation_and_repair_costs|asset_recovery_costs|misc_holding_costs|holding_taxes_costs|net_sales_proceeds|credit_enhancement_proceeds|repurchase_make_whole_proceeds|other_foreclosure_proceeds|
+------------------+--------+--------------+---------+-----------------+-----------------+--------------+-----------------+-----------------------------+--------------------+------------------+-------------------+------------------+---------------------------+------------------------------+--------------------------+
|             3.375|  308000|           180

In [17]:
# STEP THREE: Compute cumulative interest paid

from pyspark.sql.functions import udf
from pyspark.sql.types import DoubleType

def get_months(date_num: float) -> int:
    """Convert an MMYYYY date number into an absolute month count.

    The date arrives as a number (the original code re-padded it with
    ``zfill(6)``, presumably because schema inference reads ``012020`` as the
    integer ``12020``), so month and year are split arithmetically.

    Args:
        date_num: Date encoded as MMYYYY (int or float).

    Returns:
        ``month + 12 * year``. Only differences between two results are
        meaningful (they give the number of months between the dates).
    """
    month, year = divmod(int(date_num), 10_000)
    return month + 12 * year


def compute_cum_interest(
    principal,
    interest_rate,
    loan_term,
    orig_date,
    zero_balance_date
):
    """Cumulative interest paid on a level-payment, fully amortizing loan.

    Steps the amortization schedule month by month from origination until the
    zero-balance date and sums the interest portion of each payment.

    Args:
        principal: Original unpaid principal balance.
        interest_rate: Annual note rate in percent (e.g. 3.375).
        loan_term: Original term in months.
        orig_date: Origination date as MMYYYY.
        zero_balance_date: Zero-balance effective date as MMYYYY.

    Returns:
        Total interest paid as a float. ``None`` (a Spark null) when any input
        is missing or the term is not positive, instead of failing the task.
    """
    # Spark hands SQL NULLs to Python UDFs as None.
    if any(v is None for v in (principal, interest_rate, loan_term, orig_date, zero_balance_date)):
        return None

    n = int(loan_term)
    if n <= 0:
        # The payment formula divides by (1 + i)**n - 1, which is 0 here.
        return None

    # No payments are made past the term; continuing would drive the balance
    # negative and accumulate negative "interest". Negative spans mean no months.
    months_paid = max(0, min(get_months(zero_balance_date) - get_months(orig_date), n))

    i = interest_rate / 12 / 100  # monthly rate as a fraction
    if i == 0:
        return 0.0  # zero-rate loan accrues no interest (and avoids 0/0 below)

    growth = (1 + i) ** n
    payment = principal * (i * growth) / (growth - 1)  # level monthly payment

    balance = principal
    cum_interest = 0.0
    for _ in range(months_paid):
        curr_interest = balance * i
        balance -= payment - curr_interest  # principal portion of the payment
        cum_interest += curr_interest

    return float(cum_interest)

# Wrap the pure-Python amortization routine as a Spark UDF returning doubles.
compute_cum_interest_udf = udf(compute_cum_interest, DoubleType())

# Inputs are passed positionally in the order compute_cum_interest expects;
# the rate/term/date columns are only needed for this computation.
df3 = (
    df2
    .withColumn(
        "cum_interest",
        compute_cum_interest_udf(
            *(df2[name] for name in (
                "orig_upb",
                "orig_interest_rate",
                "orig_loan_term",
                "orig_date",
                "zero_balance_date",
            ))
        ),
    )
    .drop("orig_interest_rate", "orig_loan_term", "orig_date", "zero_balance_date")
)

print_df(df3)

+--------+-----------------+--------------+-----------------+-----------------------------+--------------------+------------------+-------------------+------------------+---------------------------+------------------------------+--------------------------+------------------+
|orig_upb|zero_balance_code|upb_at_removal|foreclosure_costs|preservation_and_repair_costs|asset_recovery_costs|misc_holding_costs|holding_taxes_costs|net_sales_proceeds|credit_enhancement_proceeds|repurchase_make_whole_proceeds|other_foreclosure_proceeds|      cum_interest|
+--------+-----------------+--------------+-----------------+-----------------------------+--------------------+------------------+-------------------+------------------+---------------------------+------------------------------+--------------------------+------------------+
|  308000|                1|     295534.64|             null|                         null|                null|              null|               null|              null|  

In [18]:
# STEP FOUR: Compute LGDs for defaulted loans

from pyspark.sql.functions import lit, when, col, coalesce

from functools import reduce
from operator import add

# Cost / proceed columns are picked by suffix. `c` is used as the loop name so
# the `col` function name stays unambiguous when reading this cell.
defaulted_costs_cols = [c for c in df3.columns if c.endswith("_costs")]
defaulted_proceeds_cols = [c for c in df3.columns if c.endswith("_proceeds")]
# Zero-balance codes this analysis counts as defaults.
defaulted_codes = [2, 3, 9, 15, 16]
defaulted = df3["zero_balance_code"].isin(*defaulted_codes)


def _sum_filled(columns):
    """Row-wise sum of the named columns, counting nulls as 0 (lit(0) if empty)."""
    return reduce(add, (coalesce(col(c), lit(0)) for c in columns), lit(0))


# Both totals are null for non-defaulted loans, matching the LGD gating below.
defaulted_costs = when(defaulted, _sum_filled(defaulted_costs_cols))
defaulted_proceeds = when(defaulted, _sum_filled(defaulted_proceeds_cols))

# LGD = (UPB at removal + costs - proceeds) / UPB at removal, defaulted loans
# only (null otherwise). A zero removal UPB also yields null rather than a
# division-by-zero error, which Spark raises when ANSI mode is enabled.
df4 = (
    df3
    .withColumn(
        "lgd",
        when(
            defaulted & (col("upb_at_removal") != 0),
            (col("upb_at_removal") + defaulted_costs - defaulted_proceeds) / col("upb_at_removal"),
        ),
    )
    .withColumn("defaulted", defaulted)
    .drop("zero_balance_code", *defaulted_costs_cols, *defaulted_proceeds_cols)
)

print_df(df4)

+--------+--------------+------------------+----+---------+
|orig_upb|upb_at_removal|      cum_interest| lgd|defaulted|
+--------+--------------+------------------+----+---------+
|  308000|     295534.64| 9323.340003277925|null|    false|
|  207000|     202842.63|  5563.35528405256|null|    false|
|  215000|      200243.5| 15669.95314739169|null|    false|
|  319000|     314314.38|10563.773696668144|null|    false|
|  362000|     351264.51|26961.797749326128|null|    false|
|  383000|     377369.88|10693.319667300344|null|    false|
|  271000|     268531.75| 5901.199010963915|null|    false|
|  564000|      552940.1| 20420.17220073891|null|    false|
|  340000|     335639.62| 11259.19453563376|null|    false|
|  274000|      272000.0|6764.6738883807275|null|    false|
|  742000|     725787.62|36674.737877994296|null|    false|
|  384000|     375432.68|12317.083316826373|null|    false|
|  192000|      190000.0| 4180.923284520559|null|    false|
|  480000|     380016.99| 67180.94600324

In [21]:
# STEP FIVE: Compute each loan's net: cumulative interest, minus UPB at removal x LGD for defaulted loans

from pyspark.sql.types import FloatType
from pyspark.sql.functions import round

# Net = cumulative interest, minus the credit loss (UPB at removal * LGD) for
# defaulted loans; non-defaulted loans keep their full cumulative interest.
# `round` here is pyspark.sql.functions.round (imported above), not the builtin.
# The rounded value is kept as a double: casting to FloatType (float32) loses
# cent precision for amounts above 2**17 (131,072).
df5 = (
    df4
    .withColumn(
        "net",
        round(
            when(
                col("defaulted"),
                col("cum_interest") - col("upb_at_removal") * col("lgd"),
            ).otherwise(col("cum_interest")),
            2,
        ),
    )
    .drop("upb_at_removal", "cum_interest", "lgd")
)

print_df(df5)

+--------+---------+--------+
|orig_upb|defaulted|     net|
+--------+---------+--------+
|  308000|    false| 9323.34|
|  207000|    false| 5563.36|
|  215000|    false|15669.95|
|  319000|    false|10563.77|
|  362000|    false| 26961.8|
|  383000|    false|10693.32|
|  271000|    false|  5901.2|
|  564000|    false|20420.17|
|  340000|    false|11259.19|
|  274000|    false| 6764.67|
|  742000|    false|36674.74|
|  384000|    false|12317.08|
|  192000|    false| 4180.92|
|  480000|    false|67180.95|
|  250000|    false|19610.63|
|  350000|    false|57517.56|
|  131000|    false|19778.37|
|  630000|    false|32184.88|
|  371000|    false|17721.65|
|  276000|    false|22734.55|
+--------+---------+--------+
only showing top 20 rows

shape: (407195, 3)


In [22]:
# Bring the final (orig_upb, defaulted, net) result into pandas for local
# analysis; toPandas() collects every row onto the driver.
df5_pd = df5.toPandas()