In [1]:
from pyspark.sql import SparkSession
import pandas as pd
from pyspark.sql.functions import date_format
from pyspark.sql import Window
import pyspark.sql.functions as F

pd.set_option('display.max_rows', None)
pd.set_option('display.max_columns', None)


# Create a Spark session
spark = SparkSession.builder \
    .master('local[*]') \
    .config("spark.driver.memory", "15g") \
    .appName('TimeSeries') \
    .getOrCreate()
spark.conf.set("spark.sql.repl.eagerEval.enabled", True)

# Load data from a CSV file into a DataFrame
df_merged = spark.read.csv('/Users/andrewrenga/Documents/GW Senior Year/Semester 2/Capstone/Sample_2023', header=True, inferSchema=True)
df_merged = df_merged.withColumn('MONTHLY REPORTING PERIOD', date_format('MONTHLY REPORTING PERIOD', 'yyyy-MM'))
#df_merged = df_merged.withColumn('FIRST PAYMENT DATE', date_format('FIRST PAYMENT DATE', 'yyyy-MM'))
df_merged.show(10, truncate=False, vertical=True)

  from pandas.core.computation.check import NUMEXPR_INSTALLED
  from pandas.core import (
24/04/15 12:23:38 WARN Utils: Your hostname, Andrews-MacBook-Pro.local resolves to a loopback address: 127.0.0.1; using 10.0.0.119 instead (on interface en0)
24/04/15 12:23:38 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/04/15 12:23:39 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
                                                                                

-RECORD 0----------------------------------------
 LOAN SEQUENCE NUMBER            | F23Q10000005  
 MONTHLY REPORTING PERIOD        | 2023-02       
 CURRENT ACTUAL UPB              | 284000.0      
 CURRENT LOAN DELINQUENCY STATUS | 0             
 LOAN AGE                        | 0             
 CURRENT INTEREST RATE           | 7.375         
 ESTIMATED LOAN TO VALUE (ELTV)  | Undefined     
 DEFAULT                         | 0             
 CREDIT SCORE                    | 775           
 FIRST TIME HOMEBUYER FLAG       | N             
 OCCUPANCY STATUS                | P             
 ORIGINAL INTEREST RATE          | 7.375         
 PROPERTY TYPE                   | PU            
 LOAN PURPOSE                    | N             
 SELLER NAME                     | Other sellers 
 OrigYear                        | 2023          
 OrigQuarter                     | Q1            
 OrigDate                        | 2023Q1        
 index_sa                        | 395.44        


In [2]:
windowSpec = Window.partitionBy("LOAN SEQUENCE NUMBER", "LOAN AGE").rowsBetween(Window.unboundedPreceding, Window.currentRow)

# Add a column to count occurrences of each LOAN AGE
df_merged = df_merged.withColumn("LOAN_AGE_COUNT", F.count("LOAN AGE").over(windowSpec))

# Check if LOAN AGE for a specific LSN resets
loan_age_reset = df_merged.groupby("LOAN SEQUENCE NUMBER").agg(F.max("LOAN_AGE_COUNT") > 1).\
    withColumnRenamed("max(LOAN_AGE_COUNT) > 1", "LOAN_AGE_RESET")

# Check if there is at least one true value in the column
has_true_value = loan_age_reset.select(F.max(F.col("(max(LOAN_AGE_COUNT) > 1)").cast("int"))).collect()[0][0] == 1

# Print "yes" if there is at least one true value, otherwise print "no"
if has_true_value:
    print("Multiple of the same loan age has been detected for a unique loan sequence number at two different points in time.")
else:
    print("Each loan increases in age across its lifespan, where age does not reset.")
    
df_merged = df_merged.drop("LOAN_AGE_COUNT")

[Stage 5:>                                                          (0 + 1) / 1]

Each loan increases in age across its lifespan, where age does not reset.


                                                                                

LOAN AGE does not continuously increase for all loans with respect to time due to a data entry error. This results in repeated loan age values for the same loan accross different points in time. The below code fixes this issue by hard-coding in loan age for all loans.


In [3]:
# Calculate the minimum loan age for each LSN
min_loan_age = df_merged.groupBy("LOAN SEQUENCE NUMBER").agg(F.min("LOAN AGE").alias("min_loan_age"))

# Join the minimum loan age back to the original DataFrame
df_merged = df_merged.join(min_loan_age, on="LOAN SEQUENCE NUMBER")

windowSpec = Window.partitionBy("LOAN SEQUENCE NUMBER").orderBy(F.lit(1))

# Calculate the monotonically increasing ID that starts at the minimum loan age for each LSN
df_merged = df_merged.withColumn("LOAN AGE", F.row_number().over(windowSpec) + F.first("min_loan_age").over(windowSpec) - 1) 
df_merged = df_merged.drop("min_loan_age")

In [4]:
from pyspark.sql.functions import lit, expr, row_number, monotonically_increasing_id, when, col, first
from pyspark.sql.window import Window
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
import numpy as np
#pip install tqdm
from tqdm import tqdm

df_merged_skinny = df_merged.select("LOAN SEQUENCE NUMBER", "CREDIT SCORE", "LOAN AGE", "DEFAULT")
df_merged_skinny = df_merged_skinny.withColumn("Group", monotonically_increasing_id())

# Calculate the number of rows needed
num_rows = df_merged_skinny.count()
new_num_rows = (num_rows * 24) + num_rows

In [5]:
# Define the schema of the DataFrame
schema = df_merged_skinny.schema

# Create a pandas DataFrame with the desired number of rows
pandas_df = pd.DataFrame(index=range(new_num_rows))

# Calculate the Horizon values
pandas_df['Horizon'] = (pandas_df.index % 25) 

# Calculate the Group values
pandas_df['Group'] = (pandas_df.index // 25)

# Find the Source values
pandas_df['Source'] = 'orig'
pandas_df.loc[pandas_df['Horizon'].between(1, 24), 'Source'] = 'Duplicated'

result_df = spark.createDataFrame(pandas_df)
df_merged_skinny = df_merged_skinny.join(result_df, on='Group', how='left')

In [6]:
# Calculating Correct LOAN AGE for Duplicated rows
from pyspark.sql.functions import when, col, lit
from pyspark.sql.window import Window

# Calculate the static LOAN AGE value when Source = 'orig' for each group
static_loan_age_df = df_merged_skinny.filter(col("Source") == "orig") \
    .groupBy("Group").agg({"LOAN AGE": "first"}) \
    .withColumnRenamed("first(LOAN AGE)", "static_LOAN_AGE")

# Join static_loan_age_df to df_merged_skinny based on the "Group" column
result_df1 = df_merged_skinny.join(static_loan_age_df, on="Group")

# Define a window specification
window_spec = Window.partitionBy("Group").orderBy("Horizon")

# Subtract the static LOAN AGE value by the 'Horizon' value for each row within the same group
result_df1 = result_df1.withColumn("LOAN AGE", 
                                 when(col("Source") == "Duplicated", 
                                      col("static_LOAN_AGE") - col("Horizon"))
                                 .otherwise(col("LOAN AGE")))

# Drop the static_LOAN_AGE column and keep rows where loan age >= 0 
result_df1 = result_df1.drop("static_LOAN_AGE")
result_df1 = result_df1[result_df1['LOAN AGE'] >= 0]


df_merged_skinny = result_df1

In [7]:
# Left join df_merged into df_merged_skinny based on matching columns
df_merged_skinny = df_merged_skinny.withColumnRenamed("LOAN SEQUENCE NUMBER", "LOAN SEQUENCE NUMBER 1") \
                                   .withColumnRenamed("CREDIT SCORE", "CREDIT SCORE 1") \
                                   .withColumnRenamed("LOAN AGE", "LOAN AGE 1") \
                                   .withColumnRenamed("DEFAULT", "DEFAULT_orig")

df_timeseries = df_merged_skinny.join(df_merged, 
                                   (df_merged_skinny["LOAN SEQUENCE NUMBER 1"] == df_merged["LOAN SEQUENCE NUMBER"]) &
                                   (df_merged_skinny["LOAN AGE 1"] == df_merged["LOAN AGE"]), 
                                   "left")

# Drop duplicated columns and rename DEFAULT_orig back to DEFAULT
df_timeseries = df_timeseries.drop("LOAN SEQUENCE NUMBER 1", "CREDIT SCORE 1", "LOAN AGE 1", "DEFAULT")
df_timeseries = df_timeseries.withColumnRenamed("DEFAULT_orig", "DEFAULT") 

df_timeseries = df_timeseries.filter(df_timeseries['LOAN SEQUENCE NUMBER'].isNotNull())


df_timeseries = df_timeseries.orderBy("Group", "Horizon")

In [8]:
from pyspark.sql.functions import isnan, when, count, col

# Checks number of NaN Values in each column
df_timeseries.select([count(when(isnan(c), c)).alias(c) for c in df_timeseries.columns])

                                                                                

Group,DEFAULT,Horizon,Source,LOAN SEQUENCE NUMBER,MONTHLY REPORTING PERIOD,CURRENT ACTUAL UPB,CURRENT LOAN DELINQUENCY STATUS,LOAN AGE,CURRENT INTEREST RATE,ESTIMATED LOAN TO VALUE (ELTV),CREDIT SCORE,FIRST TIME HOMEBUYER FLAG,OCCUPANCY STATUS,ORIGINAL INTEREST RATE,PROPERTY TYPE,LOAN PURPOSE,SELLER NAME,OrigYear,OrigQuarter,OrigDate,index_sa,UNRATE,inflation,% Change in UPB
0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0


In [9]:
# Checks number of NULL Values in each column
df_timeseries.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in df_timeseries.columns])

                                                                                

Group,DEFAULT,Horizon,Source,LOAN SEQUENCE NUMBER,MONTHLY REPORTING PERIOD,CURRENT ACTUAL UPB,CURRENT LOAN DELINQUENCY STATUS,LOAN AGE,CURRENT INTEREST RATE,ESTIMATED LOAN TO VALUE (ELTV),CREDIT SCORE,FIRST TIME HOMEBUYER FLAG,OCCUPANCY STATUS,ORIGINAL INTEREST RATE,PROPERTY TYPE,LOAN PURPOSE,SELLER NAME,OrigYear,OrigQuarter,OrigDate,index_sa,UNRATE,inflation,% Change in UPB
0,0,0,0,0,0,15,0,0,0,10,0,0,0,0,0,0,0,0,0,0,0,0,0,15


#### Export as .csv file
df_timeseries.coalesce(1).write.csv('/Users/andrewrenga/Documents/GW Senior Year/Semester 2/Capstone.csv', header=True)