# Main Code

In [1]:
#import findspark
#findspark.init()
import pyspark
import os
import sys

os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, DateType, IntegerType, DecimalType
from pyspark.sql.functions import when, col, collect_set, expr, min, to_date, add_months, date_trunc, date_format, lag, date_sub, first
from pyspark.sql import functions as F
from pyspark.sql.window import Window

import pandas as pd

#import datetime
#from dateutil.relativedelta import relativedelta

spark = SparkSession.builder \
    .master('local[*]') \
    .config("spark.driver.memory", "100g") \
    .appName('my-cool-app') \
    .getOrCreate()

spark.conf.set("spark.sql.repl.eagerEval.enabled", True)


## Origination Dataset

In [2]:
schema = StructType([
StructField("CREDIT SCORE", IntegerType(), True),
StructField("FIRST PAYMENT DATE", StringType(), True),
StructField("FIRST TIME HOMEBUYER FLAG", StringType(), True),
StructField("MATURITY DATE", StringType(), True ),
StructField("METROPOLITAN STATISTICAL AREA (MSA) OR METROPOLITAN DIVISION", IntegerType(),True),
StructField("MORTGAGE INSURANCE PERCENTAGE (MI %)", IntegerType(),True),
StructField("NUMBER OF UNITS", IntegerType(),True),
StructField("OCCUPANCY STATUS", StringType(),True),
StructField("ORIGINAL COMBINED LOAN-TO-VALUE (CLTV)", IntegerType(),True),
StructField("ORIGINAL DEBT-TO-INCOME (DTI) RATIO", IntegerType(),True),
StructField("ORIGINAL UPB" ,IntegerType(),True),
StructField("ORIGINAL LOAN-TO-VALUE (LTV)", IntegerType(),True),
StructField("ORIGINAL INTEREST RATE", DecimalType(10,7),True),
StructField("CHANNEL", StringType(),True),
StructField("PREPAYMENT PENALTY MORTGAGE (PPM) FLAG", StringType(),True),
StructField("AMORTIZATION TYPE", StringType(),True),
StructField("PROPERTY STATE", StringType(),True),
StructField("PROPERTY TYPE", StringType(),True),
StructField("POSTAL CODE", IntegerType(),True),
StructField("LOAN SEQUENCE NUMBER", StringType(),True),
StructField("LOAN PURPOSE", StringType(),True),
StructField("ORIGINAL LOAN TERM", IntegerType(),True),
StructField("NUMBER OF BORROWERS", IntegerType(),True),
StructField("SELLER NAME", StringType(),True),
StructField("SERVICER NAME", StringType(),True),
StructField("SUPER CONFORMING FLAG", StringType(),True),
StructField("PRE-RELIEF REFINANCE LOAN SEQUENCE NUMBER", StringType(),True),
StructField("PROGRAM INDICATOR", StringType(),True),
StructField("RELIEF REFINANCE INDICATOR", StringType(),True),
StructField("PROPERTY VALUATION METHOD", IntegerType(),True),
StructField("INTEREST ONLY INDICATOR (I/O INDICATOR)", StringType(),True),
StructField("MI CANCELLATION INDICATOR", StringType(),True)])

# Read a CSV file with a specific delimiter
df_orig = spark.read.option("delimiter", "|").csv("sample_orig_2003.txt", schema=schema)

#Removing loans which are not 30 years
df_orig = df_orig.filter(col("ORIGINAL LOAN TERM") == 360)
df_orig = df_orig.filter(col("PROPERTY TYPE") == 'SF')
#Changing First Payment Date to YYYY-MM format
df_orig = df_orig.withColumn("FIRST PAYMENT DATE", expr("concat_ws('-', substring(`FIRST PAYMENT DATE`, 1, 4), substring(`FIRST PAYMENT DATE`, 5, 2))"))

#Removing loans which are not FRM
df_orig = df_orig.filter(col("AMORTIZATION TYPE") == "FRM")
df_orig.show(10, truncate=False, vertical=True)

-RECORD 0-----------------------------------------------------------------------
 CREDIT SCORE                                                 | 773             
 FIRST PAYMENT DATE                                           | 2002-04         
 FIRST TIME HOMEBUYER FLAG                                    | N               
 MATURITY DATE                                                | 203203          
 METROPOLITAN STATISTICAL AREA (MSA) OR METROPOLITAN DIVISION | NULL            
 MORTGAGE INSURANCE PERCENTAGE (MI %)                         | 0               
 NUMBER OF UNITS                                              | 1               
 OCCUPANCY STATUS                                             | P               
 ORIGINAL COMBINED LOAN-TO-VALUE (CLTV)                       | 80              
 ORIGINAL DEBT-TO-INCOME (DTI) RATIO                          | 22              
 ORIGINAL UPB                                                 | 68000           
 ORIGINAL LOAN-TO-VALUE (LTV

## Performance Dataset

In [3]:
schema1 = StructType([
StructField("LOAN SEQUENCE NUMBER", StringType(), True),
StructField("MONTHLY REPORTING PERIOD", IntegerType(), True),
StructField("CURRENT ACTUAL UPB", DecimalType(10,4), True),
StructField("CURRENT LOAN DELINQUENCY STATUS", StringType(), True ),
StructField("LOAN AGE", IntegerType(),True),
StructField("REMAINING MONTHS TO LEGAL MATURITY", IntegerType(),True),
StructField("DEFECT SETTLEMENT DATE", IntegerType(),True),
StructField("MODIFICATION FLAG", StringType(),True),
StructField("ZERO BALANCE CODE", IntegerType(),True),
StructField("ZERO BALANCE EFFECTIVE DATE", IntegerType(),True),
StructField("CURRENT INTEREST RATE", DecimalType(10,7),True),
StructField("CURRENT NON-INTEREST BEARING UPB", DecimalType(10,5),True),
StructField("DUE DATE OF LAST PAID INSTALLMENT (DDLPI)", IntegerType(),True),
StructField("MI RECOVERIES", DecimalType(10,4),True),
StructField("NET SALE PROCEEDS", StringType(),True),
StructField("NON MI RECOVERIES", DecimalType(10,4),True),
StructField("TOTAL EXPENSES", DecimalType(10,4),True),
StructField("LEGAL COSTS", DecimalType(10,4),True),
StructField("MAINTENANCE AND PRESERVATION COSTS", DecimalType(10,4),True),
StructField("TAXES AND INSURANCE", DecimalType(10,4),True),
StructField("MISCELLANEOUS EXPENSES", DecimalType(10,4),True),
StructField("ACTUAL LOSS CALCULATION", DecimalType(10,4),True),
StructField("CUMULATIVE MODIFICATION COST", DecimalType(10,4),True),
StructField("STEP MODIFICATION FLAG", StringType(),True),
StructField("PAYMENT DEFERRAL", StringType(),True),
StructField("ESTIMATED LOAN TO VALUE (ELTV)", IntegerType(),True),
StructField("ZERO BALANCE REMOVAL UPB", DecimalType(10,4),True),
StructField("DELINQUENT ACCRUED INTEREST", DecimalType(10,4),True),
StructField("DELINQUENCY DUE TO DISASTER", StringType(),True),
StructField("BORROWER ASSISTANCE STATUS CODE", StringType(),True),
StructField("CURRENT MONTH MODIFICATION COST", DecimalType(10,4),True),
StructField("INTEREST BEARING UPB", DecimalType(10,4),True),
StructField("DEFAULT", StringType(),True)])

# Read a CSV file with a specific delimiter
df_perf1 = spark.read.option("delimiter", "|").csv("sample_svcg_2002.txt", schema=schema1)
df_perf1.show(10, truncate=False, vertical=True)



#Changing MONTHLY REPORTING PERIOD to YYYY-MM format
df_perf1 = df_perf1.withColumn("MONTHLY REPORTING PERIOD", expr("concat_ws('-', substring(`MONTHLY REPORTING PERIOD`, 1, 4), substring(`MONTHLY REPORTING PERIOD`, 5, 2))"))


-RECORD 0-------------------------------------------------
 LOAN SEQUENCE NUMBER                      | F02Q10000115 
 MONTHLY REPORTING PERIOD                  | 200203       
 CURRENT ACTUAL UPB                        | 60000.0000   
 CURRENT LOAN DELINQUENCY STATUS           | 0            
 LOAN AGE                                  | 0            
 REMAINING MONTHS TO LEGAL MATURITY        | 180          
 DEFECT SETTLEMENT DATE                    | NULL         
 MODIFICATION FLAG                         | NULL         
 ZERO BALANCE CODE                         | NULL         
 ZERO BALANCE EFFECTIVE DATE               | NULL         
 CURRENT INTEREST RATE                     | 6.6250000    
 CURRENT NON-INTEREST BEARING UPB          | 0.00000      
 DUE DATE OF LAST PAID INSTALLMENT (DDLPI) | NULL         
 MI RECOVERIES                             | NULL         
 NET SALE PROCEEDS                         | NULL         
 NON MI RECOVERIES                         | NULL       

In [4]:
# Deleting Loan Sequence Numbers which are not 30 year or FRM loans from Performance dataset

# Find unique loan sequence numbers in df_orig
loan_sequence_numbers_orig = df_orig.select("LOAN SEQUENCE NUMBER").distinct()

# Filter df_pref to keep rows where 'LOAN SEQUENCE NUMBER' is in df_orig
df_perf = df_perf1.join(loan_sequence_numbers_orig,
                                on="LOAN SEQUENCE NUMBER",
                                how="left_semi")

# Show how many loans were removed from Performance dataset
deleted_rows = df_perf1.count() - df_perf.count()
total_perf_rows = df_perf1.count()
if deleted_rows > 0:
    print(f"{deleted_rows} amount of loans which are not not 30 year or FRM have been deleted out of the {total_perf_rows} total.")
else:
    print("No rows have been deleted.")

# Show the result
df_perf.show(10, truncate=False, vertical=True)

1517673 amount of loans which are not not 30 year or FRM have been deleted out of the 2487610 total.
-RECORD 0-------------------------------------------------
 LOAN SEQUENCE NUMBER                      | F02Q10000361 
 MONTHLY REPORTING PERIOD                  | 2002-03      
 CURRENT ACTUAL UPB                        | 68000.0000   
 CURRENT LOAN DELINQUENCY STATUS           | 0            
 LOAN AGE                                  | 0            
 REMAINING MONTHS TO LEGAL MATURITY        | 360          
 DEFECT SETTLEMENT DATE                    | NULL         
 MODIFICATION FLAG                         | NULL         
 ZERO BALANCE CODE                         | NULL         
 ZERO BALANCE EFFECTIVE DATE               | NULL         
 CURRENT INTEREST RATE                     | 6.8750000    
 CURRENT NON-INTEREST BEARING UPB          | 0.00000      
 DUE DATE OF LAST PAID INSTALLMENT (DDLPI) | NULL         
 MI RECOVERIES                             | NULL         
 NET SALE PROC

In [5]:
#Sets Default (Y/N) for Performance Dataset
df_perf = df_perf.withColumn("DEFAULT",
                             when(col("CURRENT LOAN DELINQUENCY STATUS") >= 6, "1")
                             .when(col("CURRENT LOAN DELINQUENCY STATUS") == "RA", "1")
                             .otherwise("0"))
df_perf.show(10, truncate=False, vertical=True)

-RECORD 0-------------------------------------------------
 LOAN SEQUENCE NUMBER                      | F02Q10000361 
 MONTHLY REPORTING PERIOD                  | 2002-03      
 CURRENT ACTUAL UPB                        | 68000.0000   
 CURRENT LOAN DELINQUENCY STATUS           | 0            
 LOAN AGE                                  | 0            
 REMAINING MONTHS TO LEGAL MATURITY        | 360          
 DEFECT SETTLEMENT DATE                    | NULL         
 MODIFICATION FLAG                         | NULL         
 ZERO BALANCE CODE                         | NULL         
 ZERO BALANCE EFFECTIVE DATE               | NULL         
 CURRENT INTEREST RATE                     | 6.8750000    
 CURRENT NON-INTEREST BEARING UPB          | 0.00000      
 DUE DATE OF LAST PAID INSTALLMENT (DDLPI) | NULL         
 MI RECOVERIES                             | NULL         
 NET SALE PROCEEDS                         | NULL         
 NON MI RECOVERIES                         | NULL       

In [6]:
# Find count of unique loan sequence numbers in df_orig
loan_sequence_numbers_orig = df_orig.select("LOAN SEQUENCE NUMBER").distinct()
loan_sequence_numbers_orig.count()

# Find count of unique loan sequence numbers in df_perf
loan_sequence_numbers_perf = df_perf.select("LOAN SEQUENCE NUMBER").distinct()
loan_sequence_numbers_perf.count()

unique_rows = loan_sequence_numbers_orig.count() - loan_sequence_numbers_perf.count()
if unique_rows > 0:
    print(f"{unique_rows} The amount of unique loans in the origination and performance dataset are not the same.")
else:
    print("The amount of unique loans in the origination and performance dataset are the same.")


The amount of unique loans in the origination and performance dataset are the same.


### Transferring DEFAULT values from Performance dataset to Origination dataset

In [7]:
#Shows which LSN defaulted in performance dataset
result_perf = df_perf.filter(df_perf["DEFAULT"] == "1").select("LOAN SEQUENCE NUMBER", "DEFAULT")
result_perf.show(10)

#Shows which LSN did not default in performance dataset
result_perf1 = df_perf.filter(df_perf["DEFAULT"] == "0").select("LOAN SEQUENCE NUMBER", "DEFAULT")
result_perf1.show(10)


+--------------------+-------+
|LOAN SEQUENCE NUMBER|DEFAULT|
+--------------------+-------+
|        F02Q10002863|      1|
|        F02Q10002863|      1|
|        F02Q10008200|      1|
|        F02Q10008200|      1|
|        F02Q10008200|      1|
|        F02Q10008200|      1|
|        F02Q10008200|      1|
|        F02Q10008200|      1|
|        F02Q10008200|      1|
|        F02Q10008200|      1|
+--------------------+-------+
only showing top 10 rows

+--------------------+-------+
|LOAN SEQUENCE NUMBER|DEFAULT|
+--------------------+-------+
|        F02Q10000361|      0|
|        F02Q10000361|      0|
|        F02Q10000361|      0|
|        F02Q10000361|      0|
|        F02Q10000361|      0|
|        F02Q10000361|      0|
|        F02Q10000361|      0|
|        F02Q10000361|      0|
|        F02Q10000361|      0|
|        F02Q10000361|      0|
+--------------------+-------+
only showing top 10 rows



In [8]:
#Calculate the true DEFAULT value for each loan sequence number in df_perf
windowSpec = Window.partitionBy("LOAN SEQUENCE NUMBER")
df_perf_aggregated = df_perf.withColumn("TRUE_DEFAULT", F.max("DEFAULT").over(windowSpec))

# Filter the DataFrame to include only loan_sequence_number and true_default columns
df_filtered = df_perf_aggregated.select("LOAN SEQUENCE NUMBER", "TRUE_DEFAULT")

# Drop duplicates to get unique loan_sequence_numbers and true_defaults
df_unique = df_filtered.dropDuplicates()
# Group by loan_sequence_number and collect_list of unique true_default values
df_grouped = df_unique.groupby("LOAN SEQUENCE NUMBER") \
    .agg(F.first("TRUE_DEFAULT").alias("unique_true_defaults"))
df_grouped = df_grouped.withColumnRenamed("LOAN SEQUENCE NUMBER", "LOAN SEQUENCE NUMBER_xtra")
# Show the DataFrame
df_grouped.show(5, truncate=False, vertical=True)

#Checking to see if "unique_true_defaults" contains both "1" and "0" for DEFAULT
#df_filtered = df_grouped.filter(df_grouped["LOAN SEQUENCE NUMBER_xtra"] == "F00Q10000231")
#df_filtered.show(15, truncate=False, vertical=True)

-RECORD 0---------------------------------
 LOAN SEQUENCE NUMBER_xtra | F02Q10000361 
 unique_true_defaults      | 0            
-RECORD 1---------------------------------
 LOAN SEQUENCE NUMBER_xtra | F02Q10000385 
 unique_true_defaults      | 0            
-RECORD 2---------------------------------
 LOAN SEQUENCE NUMBER_xtra | F02Q10000582 
 unique_true_defaults      | 0            
-RECORD 3---------------------------------
 LOAN SEQUENCE NUMBER_xtra | F02Q10000728 
 unique_true_defaults      | 0            
-RECORD 4---------------------------------
 LOAN SEQUENCE NUMBER_xtra | F02Q10000863 
 unique_true_defaults      | 0            
only showing top 5 rows



In [9]:
#Checking to see number of rows match in origination vs. grouped
#row_count = df_grouped.rdd.count()
#print("Number of rows in the DataFrame:", row_count)
#row_count1 = df_orig.rdd.count()
#print("Number of rows in the DataFrame:", row_count1)
#if row_count == row_count1:
    #print("The number of rows in df_grouped and df_orig is the same.")
#else:
    #print("The number of rows in df_grouped and df_orig is different.")

In [10]:
#Checking to see if every Loan Sequence Number has a DEFAULT value
count_df = df_grouped.groupby("LOAN SEQUENCE NUMBER_xtra","unique_true_defaults").count()
# Check if there are any loan sequence numbers with missing values in the default column
missing_values = count_df.filter(F.col("count") == 0)
# If missing_values is not empty, there are loan sequence numbers without values in the default column
if missing_values.count() > 0:
    print("There are loan sequence numbers without values in the default column.")
else:
    print("Every loan sequence number has a value in its corresponding default column.")

Every loan sequence number has a value in its corresponding default column.


In [11]:
# Update df_orig based on the true DEFAULT values from df_grouped

# Join the two DataFrames on loan_sequence_number
df_updated = df_orig.join(df_grouped, df_orig["LOAN SEQUENCE NUMBER"] == df_grouped["LOAN SEQUENCE NUMBER_xtra"], "left") \
    .drop("LOAN SEQUENCE NUMBER_xtra")  # Drop the duplicate column

# Update the DEFAULT column in df_orig with the values from df_grouped["unique_true_defaults"]
df_orig = df_updated.withColumn("TRUE_DEFAULT", col("unique_true_defaults")) \
    .drop("unique_true_defaults")  # Drop the duplicate column

# Show the updated DataFrame
df_orig.show(10, truncate=False, vertical=True)

-RECORD 0-----------------------------------------------------------------------
 CREDIT SCORE                                                 | 773             
 FIRST PAYMENT DATE                                           | 2002-04         
 FIRST TIME HOMEBUYER FLAG                                    | N               
 MATURITY DATE                                                | 203203          
 METROPOLITAN STATISTICAL AREA (MSA) OR METROPOLITAN DIVISION | NULL            
 MORTGAGE INSURANCE PERCENTAGE (MI %)                         | 0               
 NUMBER OF UNITS                                              | 1               
 OCCUPANCY STATUS                                             | P               
 ORIGINAL COMBINED LOAN-TO-VALUE (CLTV)                       | 80              
 ORIGINAL DEBT-TO-INCOME (DTI) RATIO                          | 22              
 ORIGINAL UPB                                                 | 68000           
 ORIGINAL LOAN-TO-VALUE (LTV

In [12]:
df_defaulted_orig = df_orig.filter(col("TRUE_DEFAULT") == "1")

# Show the result
df_defaulted_orig.show(10, truncate=False, vertical=True)


-RECORD 0-------------------------------------------------------------------------------------
 CREDIT SCORE                                                 | 478                           
 FIRST PAYMENT DATE                                           | 2002-03                       
 FIRST TIME HOMEBUYER FLAG                                    | N                             
 MATURITY DATE                                                | 203202                        
 METROPOLITAN STATISTICAL AREA (MSA) OR METROPOLITAN DIVISION | 20260                         
 MORTGAGE INSURANCE PERCENTAGE (MI %)                         | 0                             
 NUMBER OF UNITS                                              | 1                             
 OCCUPANCY STATUS                                             | P                             
 ORIGINAL COMBINED LOAN-TO-VALUE (CLTV)                       | 69                            
 ORIGINAL DEBT-TO-INCOME (DTI) RATIO              

## Sample 200 Origination Dataset with True Default

In [13]:
from pyspark.sql import functions as F

# Create Quarter and Year columns based on LOAN SEQUENCE NUMBER
df_orig = df_orig.withColumn("OrigYear", F.expr("substring(`LOAN SEQUENCE NUMBER`, 2, 2)").cast("integer") + 2000)
df_orig = df_orig.withColumn("OrigQuarter", F.expr("substring(`LOAN SEQUENCE NUMBER`, 5, 1)").cast("string"))
df_orig = df_orig.withColumn("OrigQuarter", F.concat(F.lit("Q"), "OrigQuarter"))

# Create Data column by concatenating Year and Quarter
df_orig = df_orig.withColumn(
    "OrigDate",
    F.concat(
        F.col("OrigYear").cast("string"),
        F.col("OrigQuarter")
    )
)

def sample_exact_per_quarter(df, quarter, n_samples):
    # Filter for the quarter and limit to the exact number of samples
    df_filtered = df.filter(F.col("OrigQuarter") == quarter)
    return df_filtered.limit(n_samples)

# Initialize an empty DataFrame to store results
sample_orig = spark.createDataFrame([], df_orig.schema)

# Define quarters and sample size per category
quarters = ["Q1", "Q2", "Q3", "Q4"]
sample_size_per_category = 25

# Loop through each quarter
for q in quarters:
    df_default_Y = sample_exact_per_quarter(df_orig.filter(F.col("TRUE_DEFAULT") == '1'), q, sample_size_per_category)
    df_default_N = sample_exact_per_quarter(df_orig.filter(F.col("TRUE_DEFAULT") == '0'), q, sample_size_per_category)

    sample_orig = sample_orig.union(df_default_Y).union(df_default_N)


#sample_orig.show(1, truncate=False, vertical=True)
#sample_orig.groupby("OrigQuarter", "TRUE_DEFAULT").count().show()

In [14]:
#unique_loan_count = sample_orig.select('LOAN SEQUENCE NUMBER').distinct().count()
#print("Count of unique LOAN SEQUENCE NUMBER:", unique_loan_count)

## Merge Sample Origination Dataset and Performance Dataset

In [15]:
# Dropping unnecessary columns from performance dataset
df_perf_final = df_perf.select("LOAN SEQUENCE NUMBER","MONTHLY REPORTING PERIOD","CURRENT ACTUAL UPB","CURRENT LOAN DELINQUENCY STATUS","LOAN AGE","CURRENT INTEREST RATE","CURRENT NON-INTEREST BEARING UPB","ZERO BALANCE REMOVAL UPB","INTEREST BEARING UPB", "ESTIMATED LOAN TO VALUE (ELTV)","DEFAULT")
df_perf_final.show(1, truncate=False, vertical=True)
df_merged = df_perf_final.join(sample_orig, 'LOAN SEQUENCE NUMBER', 'inner')
#df_merged = df_merged.repartition(100, "LOAN SEQUENCE NUMBER")


# Checks to see if the same number of LSN are in df_merged and the sample_orig
#unique_loan_count1 = df_merged.select('LOAN SEQUENCE NUMBER').distinct().count()
#print("Count of unique LOAN SEQUENCE NUMBER:", unique_loan_count1)

-RECORD 0----------------------------------------
 LOAN SEQUENCE NUMBER             | F02Q10000361 
 MONTHLY REPORTING PERIOD         | 2002-03      
 CURRENT ACTUAL UPB               | 68000.0000   
 CURRENT LOAN DELINQUENCY STATUS  | 0            
 LOAN AGE                         | 0            
 CURRENT INTEREST RATE            | 6.8750000    
 CURRENT NON-INTEREST BEARING UPB | 0.00000      
 ZERO BALANCE REMOVAL UPB         | NULL         
 INTEREST BEARING UPB             | 68000.0000   
 ESTIMATED LOAN TO VALUE (ELTV)   | NULL         
 DEFAULT                          | 0            
only showing top 1 row



#### Note: Data regarding Loan Sequence Number "F02Q10000139" at origination where Loan Age = 0 is not present in the unadulterated performance dataset. This is why the merged dataset has a first record beginning with Loan Age = 1.

df_merged.show(1, truncate=False, vertical=True)

test = df_perf1.filter((col("LOAN SEQUENCE NUMBER") == "F00Q10005912") & (col("LOAN AGE") == 0))
test.show(1, truncate=False, vertical=True)

In [16]:
sample_orig_loan_numbers = sample_orig.select('LOAN SEQUENCE NUMBER').distinct()
df_merged_loan_numbers = df_merged.select('LOAN SEQUENCE NUMBER').distinct()

# Check if there are any loan sequence numbers in df_merged that are not in sample_orig
diff_df = df_merged_loan_numbers.exceptAll(sample_orig_loan_numbers)
#if diff_df.count() == 0:
    #print("All loan sequence numbers in df_merged_loan_numbers are present in sample_orig")
#else:
    #print("There are loan sequence numbers in df_merged_loan_numbers that are not present in sample_orig")

In [17]:
#Checks to see if the same amount of loans are present in df_merged as sample_orig
#unique_loan_count1 = df_merged.select('LOAN SEQUENCE NUMBER').distinct().count()
#print("Count of unique LOAN SEQUENCE NUMBER:", unique_loan_count1)

#if unique_loan_count1 == unique_loan_count1:
    #print("There are the same amount of loans present in df_merged as sample_orig")
#else:
    #print("There are not the same amount of loans present in df_merged as sample_orig")

## Macroeconomics Variables

## Step 1: Load csvfile into each DataFrame

In [18]:
# Load Macroeconomic datasets into a DataFrame

hpi_full_df = spark.read.csv("HPI.csv", header=True, inferSchema=True)
#unemployment_df = spark.read.csv("/GWSB/home/g35026169/Desktop/Datasets/Macro/Unemployment.csv", header=True, inferSchema=True)
#unemployment_df = unemployment_df.withColumn("Date", expr("concat_ws('-', substring(`Date`, 1, 4), substring(`Date`, 5, 2))"))
hpi_full_df.show(1, truncate=False, vertical=True)

-RECORD 0-------------------
 frequency  | monthly       
 place_name | United States 
 index_nsa  | 128.21        
 index_sa   | 128.4         
 Date       | 199901        
only showing top 1 row



## Step 2: Merge with Merged Dataframe

### HPI

In [19]:
unique_freq_values = hpi_full_df.select('frequency').distinct()
print("Unique values in frequency column:", unique_freq_values)

monthly_freq_values = hpi_full_df.filter(hpi_full_df['frequency'] == 'monthly')
monthly_freq_values = monthly_freq_values.select('place_name').distinct()
monthly_freq_values.show(3, truncate=False, vertical=True)

Unique values in frequency column: +---------+
|frequency|
+---------+
|  monthly|
+---------+

-RECORD 0-------------------
 place_name | United States 



#### To keep consistency between all macroeconomic data, HPI data is filtered to include only monthly data. Yet, state information from the Federal Housing Finance Agency HPI dataset is incomplete for monthly analysis.

https://www.fhfa.gov/DataTools/Downloads/Pages/House-Price-Index-Datasets.aspx

#### Due to this issue, each monthly value for HPI is to be view from a national perspective.

In [20]:
hpi_df = spark.read.csv("HPI.csv", header=True, inferSchema=True)
hpi_df = hpi_df.withColumn("Date", expr("concat_ws('-', substring(`Date`, 1, 4), substring(`Date`, 5, 2))"))
hpi_df.show(1, truncate=False, vertical=True)

-RECORD 0-------------------
 frequency  | monthly       
 place_name | United States 
 index_nsa  | 128.21        
 index_sa   | 128.4         
 Date       | 1999-01       
only showing top 1 row



In [21]:
# Count Number of null values of index_sa and index_nsa in hpi_df
sa_null_check_df = hpi_df.filter(col('index_sa').isNull())
nsa_null_check_df = hpi_df.filter(col('index_nsa').isNull())

num_null_values = sa_null_check_df.count()
num_null_values1 = nsa_null_check_df.count()

# Display the result
print("Number of null values in index_sa column of hpi_df:", num_null_values)
print("Number of null values in index_nsa column of hpi_df:", num_null_values1)

Number of null values in index_sa column of hpi_df: 0
Number of null values in index_nsa column of hpi_df: 0


In [22]:
# Perform the join with only Date
df_merged_hpi = df_merged.join(hpi_df,
                           (df_merged['MONTHLY REPORTING PERIOD'] == hpi_df['Date']),
                           'left')

# Drop unnecessary columns from hpi_df:
df_merged_hpi = df_merged_hpi.drop("frequency","place_name","Date")


# Check for null values in index_nsa and index_sa column
#null_nsa_check = df_merged_hpi.filter(col('index_nsa').isNull())
#null_sa_check = df_merged_hpi.filter(col('index_sa').isNull())

#print("There are", null_nsa_check.count(), "null values in index_nsa")
#print("There are", null_sa_check.count(), "null values in index_sa")

# Count the number of null values
#num_null_values = null_nsa_check.count() + null_sa_check.count()

# Display the result
#if num_null_values > 0:
    #print("There are", num_null_values, "total null values in index_nsa and index_sa columns.")
#else:
    #print("There are no null values in index_nsa and index_sa columns.")

In [23]:
#df_merged_hpi.show(2, truncate=False, vertical=True)

#### The amount of null values in index_nsa and index_sa is 0 when state is removed and only monthly data is used. It is important to note that this leaves makes the nonseasonal and seaonal HPI data nationally based.

In [24]:
#Checks to see if the same amount of loans are present in df_merged as df_merged_hpi
#num_records_merged = df_merged.count()
#print("Number of records:", num_records_merged)

#num_records_merged_hpi = df_merged_hpi.count()
#print("Number of records:", num_records_merged_hpi)

#if num_records_merged == num_records_merged_hpi:
    #print("There are the same amount of loans present in df_merged as df_merged_hpi")
#else:
    #print("There are not the same amount of loans present in df_merged as df_merged_hpi")

In [25]:
#Confirms that the correct index_nsa and index_sa is matched based on the period
df_merged_hpi.filter((df_merged_hpi['MONTHLY REPORTING PERIOD'] == '2000-02')).show(1, truncate=False, vertical=True)

filtered_hpi = hpi_df.filter(hpi_df['Date'] == '2000-02')
filtered_hpi.select('Date', 'index_nsa', 'index_sa').show()

(0 rows)

+-------+---------+--------+
|   Date|index_nsa|index_sa|
+-------+---------+--------+
|2000-02|   136.61|  136.85|
+-------+---------+--------+



In [26]:
df_merged = df_merged_hpi

In [27]:
unemployment_df = spark.read.csv("UNRATE.csv", header=True, inferSchema=True)

# Convert and format the DATE column
unemployment_df = unemployment_df.withColumn("DATE", date_format(to_date(unemployment_df["DATE"], "M/d/yyyy"), "yyyy-MM"))
unemployment_df.show()

+-------+------+
|   DATE|UNRATE|
+-------+------+
|1999-01|   4.3|
|1999-02|   4.4|
|1999-03|   4.2|
|1999-04|   4.3|
|1999-05|   4.2|
|1999-06|   4.3|
|1999-07|   4.3|
|1999-08|   4.2|
|1999-09|   4.2|
|1999-10|   4.1|
|1999-11|   4.1|
|1999-12|   4.0|
|2000-01|   4.0|
|2000-02|   4.1|
|2000-03|   4.0|
|2000-04|   3.8|
|2000-05|   4.0|
|2000-06|   4.0|
|2000-07|   4.0|
|2000-08|   4.1|
+-------+------+
only showing top 20 rows



### Unemployment

In [28]:

# Join the DataFrames
df_merged_un= df_merged.join(
    unemployment_df.select("Date", "UNRATE"),  # Select only the necessary columns from unemployment_df
    df_merged["MONTHLY REPORTING PERIOD"] == unemployment_df["Date"],  # Join condition
    "left"
)

df_merged_un = df_merged_un.select(df_merged["*"], unemployment_df["UNRATE"])

#Confirms that the correct UNRATE is matched
df_merged_un.filter((df_merged_un['MONTHLY REPORTING PERIOD'] == '2002-02')).show(2, truncate=False, vertical=True)

#filtered_unemployment_df = unemployment_df.filter(unemployment_df['Date'] == '2002-02')
#filtered_unemployment_df.select('Date', 'UNRATE').show()

-RECORD 0-----------------------------------------------------------------------
 LOAN SEQUENCE NUMBER                                         | F02Q10002863    
 MONTHLY REPORTING PERIOD                                     | 2002-02         
 CURRENT ACTUAL UPB                                           | 78000.0000      
 CURRENT LOAN DELINQUENCY STATUS                              | 0               
 LOAN AGE                                                     | 0               
 CURRENT INTEREST RATE                                        | 7.1250000       
 CURRENT NON-INTEREST BEARING UPB                             | 0.00000         
 ZERO BALANCE REMOVAL UPB                                     | NULL            
 INTEREST BEARING UPB                                         | 78000.0000      
 ESTIMATED LOAN TO VALUE (ELTV)                               | NULL            
 DEFAULT                                                      | 0               
 CREDIT SCORE               

In [29]:
df_merged = df_merged_un

### Inflation

In [30]:
inflation_df = spark.read.csv("Inflationd.csv", header=True, inferSchema=True)
#inflation_df = inflation_df.withColumn("Date", expr("concat_ws('-', substring(`Date`, 1, 4), substring(`Date`, 5, 2))"))
# Join the DataFrames
df_merged_in= df_merged.join(
    inflation_df.select("Date", "inflation"),
    df_merged["MONTHLY REPORTING PERIOD"] == inflation_df["Date"],
    "left"
)
# Optionally, you can drop the 'Date' column if it's redundant after the join
df_merged_in = df_merged_in.select(df_merged["*"], inflation_df["inflation"])

#Confirms that the correct inflation is matched
#df_merged_in.filter((df_merged_in['MONTHLY REPORTING PERIOD'] == '2000-02')).show(2, truncate=False, vertical=True)

#filtered_inflation_df = inflation_df.filter(inflation_df['Date'] == '2000-02')
#filtered_inflation_df.select('Date', 'inflation').show()

In [31]:
df_merged = df_merged_in

In [32]:
#df_merged.show(2, truncate=False, vertical=True)

### ELTV

####  The ratio obtained by the Current Actual UPB by the Housing Price, calculated as the current actual unpaid balance divided by percent change in the HPI from origination to the prediction month times original unpaid balance.

In [33]:
# Finds origination date and the index at origination

# Truncate the 'MONTHLY REPORTING PERIOD' column to the month level
df_merged_copy = df_merged.withColumn("truncated_monthly_reporting_period", date_trunc("month", col("MONTHLY REPORTING PERIOD")))

# Calculate the new date by subtracting 'LOAN AGE' months (min Loan Age is not 0 for all loans so origination is an estimate)
df_merged_copy = df_merged_copy.withColumn("origination", expr("add_months(`truncated_monthly_reporting_period`, -`LOAN AGE`)"))
df_merged_copy = df_merged_copy.withColumn("origination", date_format("origination", "yyyy-MM")).drop("truncated_monthly_reporting_period")

# Define a window specification partitioned by 'LOAN SEQUENCE NUMBER' and ordered by 'MONTHLY REPORTING PERIOD'
window_spec = Window.partitionBy("LOAN SEQUENCE NUMBER")

# Rename column to prevent duplicate confusion before joining
hpi_df = hpi_df.withColumnRenamed("index_nsa", "index_nsa1")
hpi_df = hpi_df.withColumnRenamed("index_sa", "index_sa1")

# Calculate the index_sa value corresponding to the origination date using hpi_df
df_merged_2 = df_merged_copy.join(hpi_df,
                           (df_merged_copy['origination'] == hpi_df['Date']),
                           'left')
df_merged_2 = df_merged_2.withColumnRenamed("index_sa1", "index_sa_origination")

# Drop the additional columns from hpi_df
df_merged_2 = df_merged_2.drop("frequency", "place_name", "index_nsa1", "Date")

In [34]:
# Calculate Current Housing Price
df_merged_2a = df_merged_2.withColumn(
    "Current Housing Price",
    F.col("ORIGINAL UPB") * (1 + (((F.col("index_sa") - F.col("index_sa_origination")) / F.col("index_sa_origination")))))

# Calculate ELTV Ratio
df_merged_ELTV = df_merged_2a.withColumn(
    "ESTIMATED LOAN TO VALUE (ELTV)",
    F.when(
        F.col("index_sa") != F.col("index_sa_origination"),
        F.round(F.col("CURRENT ACTUAL UPB") / F.col("Current Housing Price"), 4)
    ).otherwise("Undefined")
)

In [35]:
df_merged_ELTV = df_merged_ELTV.withColumn('% Change in UPB', F.round(((F.col('CURRENT ACTUAL UPB') - F.col('ORIGINAL UPB'))/ F.col('ORIGINAL UPB')), 4))

#### Because some LSN have a row where Loan Age = 0 (which represents their date of origination), the corresponding HPI index and HPI index at origination will be the same. Therefore, the percent change will be 0 and the ELTV Ratio will be 1 for that specific row.

In [36]:
# Verify that there are no null values in the new index_sa_origination column
index_sa_origination_null_check = df_merged_ELTV.filter(col('index_sa_origination').isNull())
num_null_values = index_sa_origination_null_check.count()
print("Number of null values in index_sa_origination column of df_merged_ELTV:", num_null_values)

Number of null values in index_sa_origination column of df_merged_ELTV: 0


In [37]:
# Join df_merged_ELTV with hpi_df on origination and Date
joined_df = df_merged_ELTV.join(hpi_df, (df_merged_ELTV["origination"] == hpi_df["Date"]), "inner")

# Filter to check if index_sa_origination is equal to index_sa
matched_rows = joined_df.filter(col("index_sa_origination") == col("index_sa"))

matched_rows_loan_numbers = matched_rows.select('LOAN SEQUENCE NUMBER').distinct()
df_merged_ELTV_loan_numbers = df_merged_ELTV.select('LOAN SEQUENCE NUMBER').distinct()

# Check if there are any loan sequence numbers in df_merged that are not in sample_orig
diff_df = df_merged_loan_numbers.exceptAll(df_merged_ELTV_loan_numbers)

if diff_df.count() == 0:
    print("All loan sequence numbers in matched_rows are present in df_merged_ELTV_loan_numbers. This confirms that the correct origination HPI index was applied to each LSN, using its origination date and hpi_df.")
else:
    print("There are loan sequence numbers in matched_rows that are not present in df_merged_ELTV_loan_numbers")

All loan sequence numbers in matched_rows are present in df_merged_ELTV_loan_numbers. This confirms that the correct origination HPI index was applied to each LSN, using its origination date and hpi_df.


### Dataframe Cleaning

In [38]:
### Calculate the number of nulls in each column
#null_counts = [df_merged_ELTV.where(col(c).isNull()).count() for c in df_merged_ELTV.columns]

### Filter columns with more than 0 nulls
#columns_with_nulls = [df_merged_ELTV.columns[i] for i, count in enumerate(null_counts) if count > 0]

### Print columns with more than 0 nulls
#print("Columns with more than 0 nulls:")
#for col_name in columns_with_nulls:
    #print(col_name)

In [39]:
#df_merged_loan_numbers = df_merged.select('LOAN SEQUENCE NUMBER').distinct()
#df_merged_ELTV_loan_numbers = df_merged_ELTV.select('LOAN SEQUENCE NUMBER').distinct()

# Check if there are any loan sequence numbers in df_merged that are not in sample_orig
#diff_df = df_merged_loan_numbers.exceptAll(df_merged_ELTV_loan_numbers)

#if diff_df.count() == 0:
    #print("All loan sequence numbers in df_merged are present in df_merged_ELTV")
#else:
    #print("There are loan sequence numbers in df_merged that are not present in df_merged_ELTV")

In [40]:
#Checking to see number of rows match in origination vs. grouped
#row_count = df_merged_ELTV.rdd.count()
#print("Number of rows in the DataFrame:", row_count)

#row_count1 = df_merged.rdd.count()
#print("Number of rows in the DataFrame:", row_count1)

#if row_count == row_count1:
    #print("The number of rows in df_merged_ELTV and df_merged is the same.")
#else:
    #print("The number of rows in df_merged_ELTV and df_merged is different.")

In [41]:
df_merged_final = df_merged_ELTV

In [42]:
# Drop the additional columns from hpi_df and origination dataset (index_nsa is dropped because it is non-seasonal)
df_merged_final = df_merged_final.drop("ZERO BALANCE REMOVAL UPB", "METROPOLITAN STATISTICAL AREA (MSA) OR METROPOLITAN DIVISION","SUPER CONFORMING FLAG", "PRE-RELIEF REFINANCE LOAN SEQUENCE NUMBER","RELIEF REFINANCE INDICATOR","index_nsa", "index_sa_origination", "origination","Current Housing Price")
#df_merged_final

In [43]:
df_final = df_merged_final.drop("TRUE_DEFAULT", "INTEREST BEARING UPB", "CURRENT INTEREST BEARING UPB", "MATURITY DATE", "ORIGINAL UPB", "SERVICER NAME", "CURRENT NON-INTEREST BEARING UPB", "PREPAYMENT PENALTY MORTGAGE (PPM) FLAG", "CURRENT NON-INTEREST BEARING UPBCURRENT NON-INTEREST BEARING UPB", "MORTGAGE INSURANCE PERCENTAGE (MI %)","POSTAL CODE", "SERVICE NAME", "PROGRAM INDICATOR", "PROPERTY VALUATION METHOD", "INTEREST ONLY INDICATOR (I/O INDICATOR)", "MI CANCELLATION INDICATOR", 'ORIGINAL INTEREST RATE', 'SELLER NAME')
df_final.printSchema()

root
 |-- LOAN SEQUENCE NUMBER: string (nullable = true)
 |-- MONTHLY REPORTING PERIOD: string (nullable = false)
 |-- CURRENT ACTUAL UPB: decimal(10,4) (nullable = true)
 |-- CURRENT LOAN DELINQUENCY STATUS: string (nullable = true)
 |-- LOAN AGE: integer (nullable = true)
 |-- CURRENT INTEREST RATE: decimal(10,7) (nullable = true)
 |-- ESTIMATED LOAN TO VALUE (ELTV): string (nullable = true)
 |-- DEFAULT: string (nullable = false)
 |-- CREDIT SCORE: integer (nullable = true)
 |-- FIRST PAYMENT DATE: string (nullable = false)
 |-- FIRST TIME HOMEBUYER FLAG: string (nullable = true)
 |-- NUMBER OF UNITS: integer (nullable = true)
 |-- OCCUPANCY STATUS: string (nullable = true)
 |-- ORIGINAL COMBINED LOAN-TO-VALUE (CLTV): integer (nullable = true)
 |-- ORIGINAL DEBT-TO-INCOME (DTI) RATIO: integer (nullable = true)
 |-- ORIGINAL LOAN-TO-VALUE (LTV): integer (nullable = true)
 |-- CHANNEL: string (nullable = true)
 |-- AMORTIZATION TYPE: string (nullable = true)
 |-- PROPERTY STATE: strin

In [44]:
#df_final.coalesce(1).write.option("header", "true").csv('Sampled_2002')

In [45]:
df_final.show(1,truncate=False, vertical=True)

-RECORD 0----------------------------------------------
 LOAN SEQUENCE NUMBER                   | F02Q10000728 
 MONTHLY REPORTING PERIOD               | 2002-04      
 CURRENT ACTUAL UPB                     | 85000.0000   
 CURRENT LOAN DELINQUENCY STATUS        | 0            
 LOAN AGE                               | 0            
 CURRENT INTEREST RATE                  | 6.8750000    
 ESTIMATED LOAN TO VALUE (ELTV)         | Undefined    
 DEFAULT                                | 0            
 CREDIT SCORE                           | 777          
 FIRST PAYMENT DATE                     | 2002-05      
 FIRST TIME HOMEBUYER FLAG              | N            
 NUMBER OF UNITS                        | 1            
 OCCUPANCY STATUS                       | P            
 ORIGINAL COMBINED LOAN-TO-VALUE (CLTV) | 61           
 ORIGINAL DEBT-TO-INCOME (DTI) RATIO    | 24           
 ORIGINAL LOAN-TO-VALUE (LTV)           | 61           
 CHANNEL                                | R     

In [46]:
dfs = df_final.toPandas()

In [47]:
dfs.shape

(16718, 30)

In [49]:
dfs.to_csv(r'C:\Users\LIB6\Downloads\Avinash\Final data\Sample_finale\Sample_2002.csv', encoding='utf-8', header=True)

In [51]:
null_counts = dfs.isnull().sum()

# Print the number of null values in each column
print("Number of null values in each column:")
print(null_counts)

Number of null values in each column:
LOAN SEQUENCE NUMBER                      0
MONTHLY REPORTING PERIOD                  0
CURRENT ACTUAL UPB                        0
CURRENT LOAN DELINQUENCY STATUS           0
LOAN AGE                                  0
CURRENT INTEREST RATE                     0
ESTIMATED LOAN TO VALUE (ELTV)            0
DEFAULT                                   0
CREDIT SCORE                              0
FIRST PAYMENT DATE                        0
FIRST TIME HOMEBUYER FLAG                 0
NUMBER OF UNITS                           0
OCCUPANCY STATUS                          0
ORIGINAL COMBINED LOAN-TO-VALUE (CLTV)    0
ORIGINAL DEBT-TO-INCOME (DTI) RATIO       0
ORIGINAL LOAN-TO-VALUE (LTV)              0
CHANNEL                                   0
AMORTIZATION TYPE                         0
PROPERTY STATE                            0
PROPERTY TYPE                             0
LOAN PURPOSE                              0
ORIGINAL LOAN TERM                    