## Import

In [1]:
# from pyspark.sql import SparkSession
import pandas as pd
import warnings
import sys
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when, lead, to_date, lpad, min, max
from pyspark.sql.window import Window
sys.path.append('../src')
from data import run, load_and_concat_csv, drop_na_columns, process_data, add_y_label,lppub_column_names, lppub_column_classes
warnings.simplefilter(action='ignore', category=pd.errors.DtypeWarning)
print("Package Imported")

Package Imported


In [2]:
# file_path = '../data/raw/2016Q1.csv'
# new_directory = '../data/processed'

# df = run(file_path, new_directory)


In [3]:
# Initialize Spark session
spark = SparkSession.builder.appName("MortgageDelinquency").getOrCreate()

# Load the dataset (replace 'path_to_your_data.csv' with the actual path to your dataset)
df = spark.read.csv('../data/processed/2016Q1.csv', header=True, inferSchema=True)

# Convert 'ACT_PERIOD' from MMYYYY integer format to timestamp
df = df.withColumn('ACT_PERIOD', lpad(df['ACT_PERIOD'].cast('string'), 6, '0'))
df = df.withColumn('ACT_PERIOD', to_date(df['ACT_PERIOD'].cast('string'), 'MMyyyy'))
df = df.withColumn('default_status', col('default_status').cast('int'))

# Define a window specification to partition by LOAN_ID and order by ACT_PERIOD
window_spec = Window.partitionBy('LOAN_ID').orderBy('ACT_PERIOD')

# Create a column 'next_8_quarters' to look ahead for the next 8 quarters
# Use the lag function to check DLQ_STATUS for the next 8 quarters for each LOAN_ID
df = df.withColumn(
    'next_8_quarters_default',
    when(col('default_status') >= 3, 1).otherwise(0)
)

# Use the window spec to look at the next 8 quarters' default status
df = df.withColumn(
    'next_8_quarters_default',
    lead('next_8_quarters_default', 1).over(window_spec)
)

# Create the 'y_label' column based on next 8 quarters' default status
df = df.withColumn('y_label', when(col('next_8_quarters_default') == 1, 1).otherwise(0))

# Drop the intermediate 'next_8_quarters_default' column
df = df.drop('next_8_quarters_default')

# Show the results
df.select('LOAN_ID', 'ACT_PERIOD', 'default_status', 'y_label').show()

# Optionally, save the processed DataFrame to a new CSV or Parquet file
df_single = df.coalesce(1)
# df_single.write.option("header", "true").csv('../data/processed/2016Q1_ylabel.csv')


24/12/03 00:37:09 WARN Utils: Your hostname, Fengs-Laptop.local resolves to a loopback address: 127.0.0.1; using 10.91.162.124 instead (on interface en0)
24/12/03 00:37:09 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/12/03 00:37:09 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


CodeCache: size=131072Kb used=21576Kb max_used=21672Kb free=109495Kb
 bounds [0x0000000108984000, 0x0000000109ed4000, 0x0000000110984000]
 total_blobs=8857 nmethods=7941 adapters=828
 compilation: disabled (not enough contiguous free space left)


[Stage 4:>                                                          (0 + 1) / 1]

+------------+----------+--------------+-------+
|     LOAN_ID|ACT_PERIOD|default_status|y_label|
+------------+----------+--------------+-------+
|100009919815|2016-01-01|             0|      0|
|100009919815|2016-02-01|             0|      0|
|100009919815|2016-03-01|             0|      0|
|100009919815|2016-04-01|             0|      0|
|100009919815|2016-05-01|             0|      0|
|100009919815|2016-06-01|             0|      0|
|100009919815|2016-07-01|             0|      0|
|100009919815|2016-08-01|             0|      0|
|100009919815|2016-09-01|             0|      0|
|100009919815|2016-10-01|             0|      0|
|100009919815|2016-11-01|             0|      0|
|100009919815|2016-12-01|             0|      0|
|100009919815|2017-01-01|             0|      0|
|100009919815|2017-02-01|             0|      0|
|100009919815|2017-03-01|             0|      0|
|100009919815|2017-04-01|             0|      0|
|100009919815|2017-05-01|             0|      0|
|100009919815|2017-0

                                                                                

In [4]:
from pyspark.ml.feature import VectorAssembler, StringIndexer
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator

train_df = df.filter(df['ACT_PERIOD'] <= '2016-12-30')
test_df = df.filter(df['ACT_PERIOD'] > '2018-12-30')
categorical_columns = [
    'seller_type', 'servicer_type', 'channel_type',
    'purpose', 'property_type', 'occupancy_status', 'state', 
    'default_status', 'high_balance_loan_indicator', 'mod_indicator', 
    'homeready_indicator', 'relocation_mortgage_indicator', 'htlv_indicator', 
    'payment_deferral'
    ]
indexers = [StringIndexer(inputCol=col, outputCol=col + '_index').fit(df) for col in categorical_columns]
for indexer in indexers:
    df = indexer.transform(df)
    train_df = indexer.transform(train_df)
    test_df = indexer.transform(test_df)

# Assemble features (you can choose the features you want to use)
feature_columns = [
    'adjusted_remaining_time',  # Numeric column
    'num_borrowers', # Numeric column
    'seller_type_index', 'servicer_type_index', 'channel_type_index', 
    'purpose_index', 'property_type_index', 'occupancy_status_index', 
    'state_index', 'high_balance_loan_indicator_index', 'mod_indicator_index', 
    'homeready_indicator_index', 'relocation_mortgage_indicator_index', 
    'htlv_indicator_index', 'payment_deferral_index'
    ]

assembler = VectorAssembler(inputCols=feature_columns, outputCol='features', handleInvalid='skip')
train_df = assembler.transform(train_df)
test_df = assembler.transform(test_df)

# Initialize LogisticRegression model
lr = LogisticRegression(labelCol='y_label', featuresCol='features', family='binomial')

# Train the model on the training data
lr_model = lr.fit(train_df)

# Make predictions on the test data
predictions = lr_model.transform(test_df)

# Evaluate the model's performance
evaluator = BinaryClassificationEvaluator(labelCol='y_label')
accuracy = evaluator.evaluate(predictions)
print(f"Test Accuracy: {accuracy}")


24/12/03 00:40:11 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
24/12/03 00:40:43 WARN MemoryStore: Not enough space to cache rdd_157_0 in memory! (computed 17.0 MiB so far)
24/12/03 00:40:43 WARN BlockManager: Persisting block rdd_157_0 to disk instead.
24/12/03 00:40:44 WARN MemoryStore: Not enough space to cache rdd_157_5 in memory! (computed 17.0 MiB so far)
24/12/03 00:40:44 WARN BlockManager: Persisting block rdd_157_5 to disk instead.
24/12/03 00:40:44 WARN MemoryStore: Not enough space to cache rdd_157_1 in memory! (computed 17.0 MiB so far)
24/12/03 00:40:44 WARN BlockManager: Persisting block rdd_157_1 to disk instead.
24/12/03 00:40:54 WARN MemoryStore: Not enough space to cache rdd_157_4 in memory! (computed 33.0 MiB so far)
24/12/03 00:40:54 WARN BlockManager: Persisting block rdd_157_4 to disk instead.
24/12/03 00:40:54 WARN MemoryStore: Not enough space to cache rdd_157_6 in memory! (computed 33.0 MiB so far)
24/12/03 00:40:54 W

Test Accuracy: 0.6864054922399269


                                                                                

In [5]:
# string_columns = final_df.select_dtypes(include='string').columns
# unique_values = {col: final_df[col].unique() for col in string_columns}
# for col, values in unique_values.items():
#     print(f"Unique values in column '{col}': {values}")

In [6]:
def classify_servicer_type(seller_name):
    # Convert to lowercase for consistent matching
    seller_name = seller_name.lower()
    
    # Define keywords for each category
    bank_keywords = [
        'bank', 'national association', 'credit union', 'fifth third', 
        'pnc', 'citizens bank', 'wells fargo', 'regions bank', 
        'suntrust', 'truist', 'jpmorgan', 'citi'
    ]
    
    mortgage_company_keywords = [
        'mortgage', 'lending', 'loan', 'servicing', 'financial', 
        'homeloans', 'loandepot', 'pennymac', 'roundpoint', 
        'freedom', 'quicken', 'amerihome', 'guild', 'caliber'
    ]
    
    # Check for keywords in the seller name
    if any(keyword in seller_name for keyword in bank_keywords):
        return 'Bank'
    elif any(keyword in seller_name for keyword in mortgage_company_keywords):
        return 'Mortgage Company'
    else:
        return 'Other'

# Apply the classification function to the SELLER column
final_df['servicer_type'] = final_df['SELLER'].apply(classify_servicer_type)
print(final_df['servicer_type'].unique())


NameError: name 'final_df' is not defined

In [None]:
print(final_df['CHANNEL'].dtype)

In [None]:
channel_map = {'C': 0, 'B': 1, 'R': 2}
final_df['CHANNEL1'] = final_df['CHANNEL'].map(channel_map)
print(final_df['CHANNEL1'].unique())

In [None]:
## firstpayment date - orginal date = time_to_first_payment
## ADJ Rem month / ORIGNAL Term
## Num of borrow stirng to int
## First Flag N: 0 Y: 1
## Purpose: P: 0  C R U: 1
## PROP: SF: 0, PU: 1, CO: 2, MH: 3, CP: 4
## Occupancy Status: U: 0 P: 1 I: 2 S: 3
## State: Alphabet sorted 
## MSA/ZIP leave for comment
## Product abandon
## "DLQ_STATUS" (string to int)
## MOD FLAG remove NA (N: 0 Y: 1)
## SERV_IND  (N: 0 Y: 1)
## HOMEREADY_PROGRAM_INDICATOR: 7: 0 F: 1 H:2
## RELOCATION_MORTGAGE_INDICATOR N: 0 Y:1
## HIGH_BALANCE_LOAN_INDICATOR N: 0 Y:1
## ACT Period -> date time format YYYY MM

In [None]:
final_df['adjusted_remaining_ratio'] = final_df['ADJ_REM_MONTHS'] / final_df['ORIG_TERM']
print(final_df['adjusted_remaining_ratio'].unique())

In [None]:
final_df['NUM_BO'] = pd.to_numeric(final_df['NUM_BO'], errors='coerce')
print(final_df['NUM_BO'].unique())

In [None]:
final_df[''] = pd.to_numeric(final_df['NUM_BO'], errors='coerce')
final

In [None]:
final_df['purpose'] = final_df['PURPOSE'].map({'P': 0, 'C': 1, 'R': 1, 'U': 1})
print(final_df['purpose'].unique())

In [None]:
final_df['high_balance_loan_indicator'] = final_df['HIGH_BALANCE_LOAN_INDICATOR'].map({'N': 0, 'Y': 1})
print(final_df['high_balance_loan_indicator'].unique())

In [None]:
prop_map = {'SF': 0, 'PU': 1, 'CO': 2, 'MH': 3, 'CP': 4}
final_df['PROP'] = final_df['PROP'].map(prop_map)

In [None]:
occupancy_map = {'U': 0, 'P': 1, 'I': 2, 'S': 3}
final_df['OCC_STAT'] = final_df['OCC_STAT'].map(occupancy_map)
print(final_df['OCC_STAT'].unique())

In [None]:
final_df['SERV_IND'] = final_df['SERV_IND'].map({'N': 0, 'Y': 1})

In [None]:
final_df['MOD_FLAG'] = final_df['MOD_FLAG'].map({'N': 0, 'Y': 1}).fillna(0)
print(final_df['MOD_FLAG'].unique())

In [None]:

# ORIG_DATE FIRST_PAY MATR_DT not in datetime format right now, need to change later to


channel_map = {'C': 0, 'B': 1, 'R': 2}
final_df['CHANNEL'] = final_df['CHANNEL'].map(channel_map)


# 1. Calculate `time_to_first_payment` as the difference between `firstpayment` and `origination date`
# need do it later 

# 2. Calculate `ADJ Rem month / ORIG_TERM`
final_df['adjusted_remaining_ratio'] = final_df['ADJ_REM_MONTHS'] / final_df['ORIG_TERM']

# 3. Convert `NUM_BO` (number of borrowers) from string to integer
final_df['NUM_BO'] = pd.to_numeric(final_df['NUM_BO'], errors='coerce')

# 4. Convert `FIRST_FLAG` (N: 0, Y: 1)
final_df['FIRST_FLAG'] = final_df['FIRST_FLAG'].map({'N': 0, 'Y': 1})

# 5. Convert `PURPOSE` (P: 0, C/R/U: 1)
final_df['PURPOSE'] = final_df['PURPOSE'].map({'P': 0, 'C': 1, 'R': 1, 'U': 1})

# 6. Convert `PROP` (SF: 0, PU: 1, CO: 2, MH: 3, CP: 4)
prop_map = {'SF': 0, 'PU': 1, 'CO': 2, 'MH': 3, 'CP': 4}
final_df['PROP'] = final_df['PROP'].map(prop_map)

# 7. Convert `OCCUPANCY_STATUS` (U: 0, P: 1, I: 2, S: 3)
occupancy_map = {'U': 0, 'P': 1, 'I': 2, 'S': 3}
final_df['OCC_STAT'] = final_df['OCC_STAT'].map(occupancy_map)

# 8. Convert `STATE` to integers by alphabetically sorting unique values and assigning them numbers
# Provided list of states in the dataset
states = ['GA', 'KS', 'IL', 'IN', 'TX', 'UT', 'MO', 'IA', 'OR', 'DE', 'CA', 'MI', 'KY',
          'CO', 'NY', 'PA', 'WI', 'WA', 'VA', 'AZ', 'MD', 'TN', 'MA', 'OH', 'SC', 'AK',
          'AL', 'LA', 'MN', 'NC', 'AR', 'MS', 'OK', 'NE', 'NJ', 'ID', 'FL', 'ND', 'NV',
          'NM', 'CT', 'VT', 'WV', 'DC', 'ME', 'SD', 'NH', 'MT', 'HI', 'PR', 'RI', 'WY',
          'VI', 'GU']

# Sort states alphabetically and assign rank values starting from 1
state_mapping = {state: idx + 1 for idx, state in enumerate(sorted(states))}
final_df['STATE'] = final_df['STATE'].map(state_mapping)


# 9. Leave `MSA` and `ZIP` columns as-is for now, as instructed, need further exploration

# 10. Drop the `PRODUCT` column
final_df.drop(columns=['PRODUCT'], inplace=True)

# 11. Convert `DLQ_STATUS` from string to integer, treating non-numeric as NaN
final_df['DLQ_STATUS'] = pd.to_numeric(final_df['DLQ_STATUS'], errors='coerce')

# 12. Convert `MOD_FLAG` (N: 0, Y: 1), removing rows where `MOD_FLAG` is NA
final_df = final_df.dropna(subset=['MOD_FLAG'])
final_df['MOD_FLAG'] = final_df['MOD_FLAG'].map({'N': 0, 'Y': 1})

# 13. Convert `SERV_IND` (N: 0, Y: 1)
final_df['SERV_IND'] = final_df['SERV_IND'].map({'N': 0, 'Y': 1})

# 14. Convert `HOMEREADY_PROGRAM_INDICATOR` (7: 0, F: 1, H: 2)
final_df['HOMEREADY_PROGRAM_INDICATOR'] = final_df['HOMEREADY_PROGRAM_INDICATOR'].map({'7': 0, 'F': 1, 'H': 2})

# 15. Convert `RELOCATION_MORTGAGE_INDICATOR` (N: 0, Y: 1)
final_df['RELOCATION_MORTGAGE_INDICATOR'] = final_df['RELOCATION_MORTGAGE_INDICATOR'].map({'N': 0, 'Y': 1})

# 16. Convert `HIGH_BALANCE_LOAN_INDICATOR` (N: 0, Y: 1)
final_df['HIGH_BALANCE_LOAN_INDICATOR'] = final_df['HIGH_BALANCE_LOAN_INDICATOR'].map({'N': 0, 'Y': 1})





In [None]:
df['PPMT_FLG'] = df['PPMT_FLG'].map({'N': 0, 'Y': 1})
print(df['interst_only_loan'].unique())

In [None]:
df['prepayment_penalty'] = df['PPMT_FLG'].map({'N': 0, 'Y': 1})
print(df['prepayment_penalty'].unique())

In [None]:
df['interst_only_loan'] = df['IO'].map({'N': 0, 'Y': 1})
print(df['interst_only_loan'].unique())

In [None]:
#17 HIGH_LOAN_TO_VALUE_HLTV_REFINANCE_OPTION_INDICATOR
final_df['HIGH_LOAN_TO_VALUE_HLTV_REFINANCE_OPTION_INDICATOR'] = final_df['HIGH_LOAN_TO_VALUE_HLTV_REFINANCE_OPTION_INDICATOR'].map({'N': 0, 'Y': 1})

#18 PAYMENT_DEFERRAL_MOD_EVENT_FLAG
final_df['PAYMENT_DEFERRAL_MOD_EVENT_FLAG'] = final_df['PAYMENT_DEFERRAL_MOD_EVENT_FLAG'].map({'N': 0, 'Y': 1,'7': 2})
# Checking the data types and first few rows to confirm transformations

#19 PPMT_FLG
final_df['PPMT_FLG'] = final_df['PPMT_FLG'].map({'N': 0, 'Y': 1})

#20 IO
final_df['IO'] = final_df['IO'].map({'N': 0, 'Y': 1})

# Map classification categories to integers (Bank: 0, Mortgage Company: 1, Investor: 2, Other: 3)
seller_type_mapping = {'Bank': 0, 'Mortgage Company': 1, 'Other': 2, 'Investor': 3}
final_df['seller_type'] = final_df['seller_type'].map(seller_type_mapping)


Service_type_mapping = {'Bank': 0, 'Mortgage Company': 1, 'Other': 2}
final_df['servicer_type'] = final_df['servicer_type'].map(Service_type_mapping)



print(final_df.dtypes)



In [None]:
print(final_df.head(50))

In [None]:
# Count the number of NaN values in each column in the dataset
nan_counts = final_df.isna().sum()

# Display columns with NaN values and their counts
nan_counts[nan_counts > 0]

In [None]:
# Drop rows with NaN values in the specified columns and save the cleaned DataFrame to a CSV file

# Specify columns with NaN values to drop
columns_with_nan = [
    'SERVICER', 'LOAN_AGE', 'REM_MONTHS', 'ADJ_REM_MONTHS', 'MATR_DT',
    'DTI', 'CSCORE_B', 'ZIP', 'PPMT_FLG', 'IO', 'DLQ_STATUS', 
    'SERV_IND', 'adjusted_remaining_ratio'
]

# Drop rows with NaN values in these columns
cleaned_df = final_df.dropna(subset=columns_with_nan)
final_df = final_df.drop(columns=['SELLER','SERVICER' 'ORIG_DATE','FIRST_PAY','MSA','ZIP'], errors='ignore')


In [None]:
final_df.head(10)

In [None]:
output_path = "../data/final_16Q1.csv"
final_df.to_csv(output_path, index=False)


In [None]:
df['hltv_INDICATOR'] = df['HIGH_LOAN_TO_VALUE_HLTV_REFINANCE_OPTION_INDICATOR'].map({'N': 0, 'Y': 1})
print(df['hltv_INDICATOR'].unique())

In [None]:
print("update")