In [0]:
import numpy as np
from pyspark.sql import SparkSession
from pyspark.ml.feature import Imputer
from pyspark.sql.types import FloatType, ArrayType
from pyspark.sql.functions import col, to_date, year, month, regexp_like, \
                                    regexp_extract, isnan, when, count, lit, udf, \
                                    pow as psf_pow, sum as psf_sum
import findspark
findspark.init()

In [0]:
import warnings
warnings.filterwarnings('ignore')

In [0]:
import os

storage_account_name = 'storageaccfinalproject'
# SECURITY: never hard-code the storage access key in the notebook (it ends up
# in version control and in exported copies). Any key that was previously
# written here should be treated as leaked and rotated in the Azure portal.
# Provide the key through the AZURE_STORAGE_ACCESS_KEY environment variable
# (or a secret store, e.g. a Databricks secret scope if running on Databricks).
storage_account_access_key = os.environ.get('AZURE_STORAGE_ACCESS_KEY')
if not storage_account_access_key:
    raise RuntimeError(
        'Set the AZURE_STORAGE_ACCESS_KEY environment variable to the access key '
        'of storage account ' + storage_account_name)
spark.conf.set('fs.azure.account.key.' + storage_account_name +
               '.blob.core.windows.net', storage_account_access_key)

In [0]:
blob_container = 'inputraw'
filePath = (f"wasbs://{blob_container}@{storage_account_name}"
            ".blob.core.windows.net/Loan_status_2007-2020Q3.csv")
# Load the raw CSV: the first row supplies column names, and column types are
# inferred by Spark (an extra pass over the file).
df = spark.read.csv(filePath, header=True, inferSchema=True)

Rename some columns so that related features share consistent names (e.g. `mths_since_*`, `*_delinq`).

In [0]:
# Harmonise abbreviations: 'rcnt' / 'mo_sin_rcnt' -> 'recent' / 'mths_since',
# 'dlq' -> 'delinq' (note: 'percent_tl_nvr_deliq' is the spelling used later),
# 'pct' -> 'percent', 'mo_sin_old' -> 'mths_since_old'.
column_renames = {
    'mths_since_rcnt_il': 'mths_since_recent_il',
    'mo_sin_rcnt_rev_tl_op': 'mths_since_recent_rev_tl_op',
    'mo_sin_rcnt_tl': 'mths_since_recent_tl',
    'mths_since_recent_bc_dlq': 'mths_since_recent_bc_delinq',
    'pct_tl_nvr_dlq': 'percent_tl_nvr_deliq',
    'mo_sin_old_il_acct': 'mths_since_old_il_acct',
    'mo_sin_old_rev_tl_op': 'mths_since_old_rev_tl_op',
}

for old_name, new_name in column_renames.items():
    df = df.withColumnRenamed(old_name, new_name)

del column_renames

Convert the date columns (stored as `MMM-yyyy` strings) to dates.

In [0]:
datetime_columns = ['issue_d', 'earliest_cr_line', 'last_pymnt_d', 'next_pymnt_d',
                    'last_credit_pull_d', 'sec_app_earliest_cr_line', 'hardship_start_date',
                    'hardship_end_date', 'payment_plan_start_date']

# Parse each month-year string (pattern "MMM-yyyy") into a DateType column.
# Every expression only reads its own column, so replacing them all in one
# withColumns call is equivalent to doing it one column at a time.
df = df.withColumns({name: to_date(col(name), "MMM-yyyy") for name in datetime_columns})

Remove loans issued before 2013-01-01.

In [0]:
# Keep loans issued in 2013 or later; rows with a null issue_d are dropped too.
df = df.where(year(col('issue_d')) >= 2013)

Convert `int_rate` and `revol_util` from percentage strings ("xx.xx%") to numbers (xx.xx).

In [0]:
# Extract the numeric part of a percentage string such as "10.65%" and cast it
# to float. The pattern accepts values with or without a decimal part
# ("7%" -> 7.0); requiring a decimal point would turn such values into null.
df = df.withColumn('int_rate', regexp_extract('int_rate', r'(\d*\.?\d+)', 1).cast('float'))

In [0]:
# Extract the numeric part of a percentage string such as "52.1%" and cast it
# to float. Whole-number values like "0%" or "100%" are accepted as well; the
# previous pattern required a decimal point and turned them into null (which
# the mean imputation further down would then overwrite).
df = df.withColumn('revol_util', regexp_extract('revol_util', r'(\d*\.?\d+)', 1).cast("float"))

Create a column flagging matured loans: a loan is treated as matured when `next_pymnt_d` is empty (null).

In [0]:
# A loan counts as matured when no next payment date is scheduled.
df = df.withColumn('is_matured_loan', ~col('next_pymnt_d').isNotNull())

Only matured loans are analyzed from here on.

In [0]:
# Restrict the dataset to matured loans.
df = df.where(col('is_matured_loan'))

Handle missing (NA / null) values in the DataFrame.

In [0]:
# Turn the "months since ..." columns into binary indicators:
# missing = 0; existing = 1.
# mths_since_recent_rev_tl_op and mths_since_recent_tl are excluded from the
# conversion. Each indicator is written to a new `<name>_binary` column. The
# drop step further down removes every `mths_since_*` column whose name does
# not contain '_binary' (plus everything in mths_since_to_drop), so
# overwriting the original columns in place would drop the indicators too.
mths_since_to_drop = [
    col_name for col_name in df.columns
    if col_name not in ['mths_since_recent_rev_tl_op', 'mths_since_recent_tl']
    and 'mths_since_' in col_name
]

for col_name in mths_since_to_drop:
    df = df.withColumn(col_name + '_binary',
                       when(col(col_name).isNull(), 0).otherwise(1))

In [0]:
# Missing deferral_term becomes 0; the column is stored as integer.
df = df.withColumn(
    'deferral_term',
    when(col('deferral_term').isNotNull(), col('deferral_term'))
    .otherwise(0)
    .cast("integer"),
)

In [0]:
# Fill these numeric columns with their mean.
col_fill_by_mean = ['il_util', 'all_util', 'total_bal_il', 'max_bal_bc',
                    'percent_bc_gt_75', 'bc_util', 'revol_util', 'dti',
                    'percent_tl_nvr_deliq', 'avg_cur_bal', 'bc_open_to_buy']

# Imputer works on numeric columns; dti is cast to float before fitting
# (presumably because it was not inferred as numeric - confirm).
df = df.withColumn('dti', col('dti').cast(FloatType()))

# Learn the means on the current data, then overwrite the columns in place
# (input and output names are identical).
imputer = Imputer(strategy='mean', inputCols=col_fill_by_mean,
                  outputCols=col_fill_by_mean).fit(df)
df = imputer.transform(df)

In [0]:
# Fill these count-like numeric columns with their most frequent value.
col_fill_by_mode = ['open_acc_6m', 'open_act_il', 'open_il_12m', 'open_il_24m',
                    'open_rv_12m', 'open_rv_24m', 'inq_fi', 'total_cu_tl', 'inq_last_12m',
                    'num_tl_120dpd_2m', 'num_rev_accts', 'inq_last_6mths']

# Fit the modes, then overwrite the columns in place.
imputer = Imputer(strategy='mode', inputCols=col_fill_by_mode,
                  outputCols=col_fill_by_mode).fit(df)
df = imputer.transform(df)

In [0]:
# Fill these string columns with their most common (non-null) value.
# DataFrame.freqItems returns an *unordered* set of frequent items, so its first
# element is not necessarily the mode; count the values explicitly instead.
for col_name in ['emp_length', 'zip_code', 'hardship_flag']:
    mode_row = (
        df.where(col(col_name).isNotNull())
          .groupBy(col_name)
          .count()
          # highest count first; ties broken by value so the result is deterministic
          .orderBy(col('count').desc(), col(col_name))
          .first()
    )
    if mode_row is None:
        # The column is entirely null: there is no value to impute from.
        continue
    most_common = mode_row[0]
    df = df \
        .withColumn(col_name, when(col(col_name).isNull(), lit(most_common)) \
        .otherwise(col(col_name)))

In [0]:
# Fill missing free-text fields with the placeholder 'Unknown' (not an empty string).
col_fill_by_empty_str = ['emp_title', 'title', 'purpose']

for text_col in col_fill_by_empty_str:
    df = df.withColumn(
        text_col,
        when(col(text_col).isNotNull(), col(text_col)).otherwise('Unknown'),
    )

# Alternative: df.na.fill('Unknown', subset=col_fill_by_empty_str), which only
# touches string columns (note it marks the filled columns as non-nullable).
# https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrame.fillna.html

Create columns for modelling the probability of default.

In [0]:
# Express selected balances/amounts relative to annual income. The "+ 1" keeps
# the denominator non-zero when annual_inc is 0. (annual_inc is a string column
# in the schema printed below; Spark casts it implicitly for the arithmetic.)
income_scaled_sources = ['loan_amnt', 'delinq_amnt', 'revol_bal', 'tot_cur_bal',
                         'max_bal_bc', 'avg_cur_bal', 'total_bal_ex_mort']
df = df.withColumns({
    f'{source}_to_annual_inc': col(source) / (col('annual_inc') + 1)
    for source in income_scaled_sources
})

# Share of the borrower's credit lines that are currently open
# (open_acc / total_acc).
df = df.withColumn('open_cl_ratio', col('open_acc') / col('total_acc'))

Create columns: number of months paid, and the "expected" (calculated) vs. realized installment.

In [0]:
# Whole calendar months between the issue date and the last payment date.
# Both dates are month-granular, so the day of month plays no role.
months_elapsed = (
    (year('last_pymnt_d') - year('issue_d')) * 12
    + (month('last_pymnt_d') - month('issue_d'))
)
df = df.withColumn('num_mths_to_pay', months_elapsed)

In [0]:
# Fill num_mths_to_pay where last_pymnt_d is missing:
#   - last_pymnt_d present          -> keep the month difference computed above
#   - otherwise, total_pymnt > 0    -> approximate as total_pymnt / installment
#   - otherwise (no payment made)   -> 0
# The final integer cast truncates the approximation.
estimated_months = (
    when(col('total_pymnt') > 0, col('total_pymnt') / col('installment'))
    .otherwise(0)
)
df = df.withColumn(
    'num_mths_to_pay',
    when(col('last_pymnt_d').isNotNull(), col('num_mths_to_pay'))
    .otherwise(estimated_months)
    .cast("integer"),
)

In [0]:
# Integer loan term: the first two consecutive digits found in `term`.
# NOTE(review): the original comment said this column would be dropped, but
# term_int is not in the drop list further down, so it is saved with the output.
df = df.withColumn('term_int', regexp_extract(col('term'), r'(\d\d)', 1).cast('integer'))

In [0]:
def calculate_installment(p, r, n):
    """Fixed monthly payment of an amortising loan.

    Works on plain numbers as well as on Spark Column expressions.

    :param p: principal (funded amount).
    :param r: annual interest rate in percent (e.g. 12 for 12%).
    :param n: number of monthly payments.
    :return: ``p * i * (1 + i) ** n / ((1 + i) ** n - 1)`` with ``i = r / 100 / 12``.
    """
    monthly_rate = r / 100 / 12
    compound = (1 + monthly_rate) ** n
    return p * monthly_rate * compound / (compound - 1)

# Scheduled installment implied by the funded amount, interest rate and term.
df = df.withColumn(
    'calculated_installment',
    calculate_installment(p=col('funded_amnt'), r=col('int_rate'), n=col('term_int')),
)

In [0]:
# Actual and calculated installments relative to annual income (+1 avoids a
# zero denominator).
df = df.withColumns({
    f'{source}_to_annual_inc': col(source) / (col('annual_inc') + 1)
    for source in ['installment', 'calculated_installment']
})

Create column: ROI (return on investment).

In [0]:
# Assumed annual discount ("depreciation") rate for the ROI calculation; the
# helper functions below also default to 0.02.
depreciation_rate: float = 0.02

In [0]:
def calculate_avg_pymnt(total_pymnt, num_mths_to_pay, last_pymnt_amnt):
    """Average payment per month as a Spark Column expression.

    :param total_pymnt: Column with the total amount paid.
    :param num_mths_to_pay: Column with the number of months paid.
    :param last_pymnt_amnt: Column with the last payment amount, used when
                            ``num_mths_to_pay`` is 0 (no division possible).
    :return: ``last_pymnt_amnt`` when ``num_mths_to_pay == 0``, otherwise
             ``total_pymnt / num_mths_to_pay``.
    """
    # Use the arguments rather than hard-coded column names, so the helper
    # computes what its caller passes in.
    return when(num_mths_to_pay == 0, last_pymnt_amnt) \
            .otherwise(total_pymnt / num_mths_to_pay)

In [0]:
# Average monthly payment per loan (last payment amount when num_mths_to_pay is 0).
df = df.withColumn(
    'avg_pymnt',
    calculate_avg_pymnt(total_pymnt=col('total_pymnt'),
                        num_mths_to_pay=col('num_mths_to_pay'),
                        last_pymnt_amnt=col('last_pymnt_amnt')),
)

In [0]:
def calculate_cdf(num, depreciation_rate=0.02):
    """Sum of the monthly discount factors for months 1..num.

    Computes ``sum((1 + depreciation_rate / 12) ** -t for t in 1..num)``, i.e.
    the present value of receiving 1 per month for ``num`` months.

    :param num: number of months (int); ``None`` (a Spark null) yields ``None``.
    :param depreciation_rate: annual discount rate (default 0.02).
    :return: the sum as a Python ``float`` (0.0 when ``num <= 0``), or ``None``.
             The result is always a float because this function backs a
             FloatType UDF, and Spark turns a Python ``int`` return value
             (the old ``sum([])`` result 0) into null for FloatType.
    """
    if num is None:
        return None
    monthly_rate = depreciation_rate / 12
    return float(sum((1 + monthly_rate) ** (-t) for t in range(1, num + 1)))

# Spark UDF wrapper around calculate_cdf; results are stored as 32-bit floats.
udf_calculate_cdf = udf(calculate_cdf, FloatType())

In [0]:
# Sum of monthly discount factors over the months paid. Pass the notebook-level
# depreciation_rate assumption explicitly; otherwise changing it has no effect
# because calculate_cdf would silently use its own default.
df = df.withColumn('cdf', udf_calculate_cdf(col('num_mths_to_pay'), lit(depreciation_rate)))

In [0]:
# CDP = average monthly payment x sum of monthly discount factors
# (presumably "cumulative discounted payments" - confirm).
df = df.withColumn('CDP', col('avg_pymnt') * col('cdf'))

In [0]:
# When no month elapsed but a positive payment exists, CDP is that payment
# itself (undiscounted), replacing the product computed above.
zero_months_with_payment = (col('num_mths_to_pay') == 0) & (col('avg_pymnt') > 0)
df = df.withColumn(
    'CDP',
    when(zero_months_with_payment, col('avg_pymnt')).otherwise(col('CDP')),
)

In [0]:
def calculate_discounted_net_recoveries(depreciation_rate=0.02):
    """Present value of net recoveries as a Spark Column expression.

    Net recoveries (``recoveries - collection_recovery_fee``) are discounted at
    the monthly rate ``depreciation_rate / 12`` over ``num_mths_to_pay + 6``
    months.
    NOTE(review): the extra 6 months presumably models the delay between the
    last payment and the recovery - confirm.

    :param depreciation_rate: annual discount rate (default 0.02).
    :return: Column with the discounted net recoveries.
    """
    net_recoveries = col('recoveries') - col('collection_recovery_fee')
    discount = (1 + depreciation_rate / 12) ** (col('num_mths_to_pay') + 6)
    return net_recoveries / discount

# Pass the notebook-level assumption explicitly instead of relying on the default.
df = df.withColumn('CDP', col('CDP') + calculate_discounted_net_recoveries(depreciation_rate))

In [0]:
# Return on investment: discounted amount received relative to the funded amount.
funded = col('funded_amnt')
roi = (col('CDP') - funded) / funded
df = df.withColumn('ROI', roi)

Create column: IRR (internal rate of return).

In [0]:
# findspark is imported and initialised once, in the first cell. It is not
# re-initialised here: the Spark session is already in use, so a second
# findspark.init() would be redundant.

In [0]:
# Report the PySpark version in use (regexp_like, used in the next cell, was
# added in PySpark 3.5).
import pyspark

installed_pyspark_version = pyspark.__version__
print(installed_pyspark_version)

3.5.1


In [0]:
import pyspark.sql.functions as sf  # NOTE(review): `sf` is not used in the visible code


# Loans whose loan_status contains "Fully" and whose last payment fell in the
# issue month count as one payment period instead of their current value, so
# the cash-flow list built below contains that payment.
# NOTE(review): this runs after ROI was computed, so only the IRR sees it.
paid_off_in_issue_month = (
    regexp_like(col('loan_status'), lit(r'(Fully)'))
    & (col('issue_d') == col('last_pymnt_d'))
)
df = df.withColumn(
    'num_mths_to_pay',
    when(paid_off_in_issue_month, 1).otherwise(col('num_mths_to_pay')),
)

In [0]:
def make_list_of_cashflow(f, p, n):
    """Build a loan's monthly cash-flow list.

    :param f: funded amount, paid out at t=0 (stored as a negative value).
    :param p: average payment received per month.
    :param n: number of monthly payments; 0 or negative gives no inflows.
    :return: ``[-f, p, p, ..., p]`` (``n`` copies of ``p``) as floats, or
             ``None`` when any input is missing (a Spark null), so the UDF
             produces a null row instead of failing the whole job.
    """
    if f is None or p is None or n is None:
        return None
    return [-float(f)] + [float(p)] * int(n)

# Spark UDF wrapper around make_list_of_cashflow producing array<float>
# (amounts are narrowed to 32-bit floats inside Spark).
udf_make_list_of_cashflow = udf(make_list_of_cashflow, ArrayType(FloatType()))

In [0]:
# Pandas equivalent:
# df.apply(lambda r: [-r['funded_amnt']] + [r['avg_pymnt']] * r['num_mths_to_pay'], axis=1)
cash_flow_inputs = [col('funded_amnt'), col('avg_pymnt'), col('num_mths_to_pay')]
df = df.withColumn('lst_of_cf', udf_make_list_of_cashflow(*cash_flow_inputs))

In [0]:
def irr(cash_flow):
    """
    Calculate the Internal Rate of Return (IRR) for a given cash flow.

    The IRR is the rate ``r`` at which ``sum(cf_t / (1 + r) ** t) == 0``.
    Substituting the discount factor ``v = 1 / (1 + r)`` turns this into the
    polynomial ``sum(cf_t * v ** t) == 0``. Its roots ``v`` are found with
    ``np.roots``, and every positive real root maps back to a rate via
    ``r = 1 / v - 1`` (the same approach numpy-financial's ``irr`` uses).
    The rate is per period of the cash flow; in this notebook the flows are
    monthly, so it is a monthly rate.

    :param cash_flow: A list or array of cash flow values; ``cash_flow[0]`` is
                      the flow at t=0.
    :return: The calculated IRR as a float. Returns ``-1.0`` for a single
             negative flow (nothing was paid back). Returns ``nan`` for missing
             or empty input, or when no rate in [-1, 1] solves the equation.
    :raises ValueError: if all flows share the same sign (other than the
                        single-negative-flow case above).
    """
    # Missing (Spark null) or empty input has no IRR; don't crash the UDF.
    if cash_flow is None or len(cash_flow) == 0:
        return float('nan')
    cash_flow = np.asarray(cash_flow, dtype=float)

    # Check if all cash flows have the same sign
    same_sign = np.all(cash_flow > 0) if cash_flow[0] > 0 else np.all(cash_flow < 0)
    if same_sign:
        if len(cash_flow) == 1 and cash_flow[0] < 0:
            return float(-1)
        raise ValueError("cash_flow must contain positive and negative values")

    # np.roots expects the highest-degree coefficient first, so the reversed
    # flows describe cf_N * v**N + ... + cf_1 * v + cf_0 (a polynomial in v).
    roots = np.roots(cash_flow[::-1])

    # Keep real, positive discount factors and convert them to rates.
    # The rate is 1/v - 1, not v - 1.
    discount_factors = roots[np.isreal(roots)].real
    discount_factors = discount_factors[discount_factors > 0]
    real_roots = 1.0 / discount_factors - 1.0

    # Only accept rates between -100% and +100% per period
    real_roots = real_roots[(real_roots <= 1) & (real_roots >= -1)]

    # If no real solution, return NaN
    if len(real_roots) == 0:
        return float('nan')

    # If only one real solution, return it
    if len(real_roots) == 1:
        return float(real_roots[0])

    # Check sign of all real roots
    same_sign = np.all(real_roots > 0) if real_roots[0] > 0 else np.all(real_roots < 0)

    # Mixed signs: if total inflows are at least the total outflows, the
    # investment broke even, so prefer a non-negative IRR; otherwise a negative one.
    if not same_sign:
        inflows = np.sum(cash_flow[cash_flow > 0])
        outflows = -np.sum(cash_flow[cash_flow < 0])  # magnitude of the outflows
        candidates = real_roots[real_roots >= 0] if inflows >= outflows else real_roots[real_roots < 0]
        if len(candidates) > 0:
            real_roots = candidates

    # Pick the smallest one in magnitude and return
    abs_IRR = np.abs(real_roots)
    return float(real_roots[np.argmin(abs_IRR)])


In [0]:
from pyspark.sql.types import StructType, StructField

In [0]:
# Spark UDF around irr; the IRR is stored as a 32-bit float.
calculat_irr_udf = udf(irr, FloatType())

# NOTE(review): cf_schema is not referenced anywhere in the visible code.
cf_schema = StructType([StructField('cf', ArrayType(FloatType()))])

# One IRR per loan, computed from its cash-flow list.
df = df.withColumn('IRR', calculat_irr_udf(col('lst_of_cf')))

In [0]:
# Replace missing IRR values with 0; on a float column the fill covers NaN as
# well as null.
df = df.fillna({'IRR': 0})

Drop irrelevant columns.

In [0]:
# Columns removed before saving:
#   - hardship-related columns, except hardship_flag
#   - mths_since_* columns, unless their name contains '_binary'
#   - secondary-applicant (sec_app) and joint-application columns
#   - an explicit list of identifiers / leftovers
#   - the original mths_since columns and the intermediate helper columns
# NOTE(review): the pattern also drops mths_since_recent_rev_tl_op and
# mths_since_recent_tl, which were kept out of the binary conversion - confirm
# that is intended. DataFrame.drop ignores names that are not present.
pattern_drops = [
    name for name in df.columns
    if ('hardship' in name and name != 'hardship_flag')
    or ('mths_since_' in name and '_binary' not in name)
    or 'sec_app' in name
    or 'joint' in name
]
explicit_drops = [
    'orig_projected_additional_accrued_interest',
    'payment_plan_start_date', 'pymnt_plan',
    'issue_d_year', 'policy_code',
    'next_pymnt_d', 'earliest_cr_line',
    'url', 'last_credit_pull_d', 'last_pymnt_d',
]
helper_drops = ['lst_of_cf', 'cdf', 'CDP', 'is_matured_loan']

to_drop = pattern_drops + explicit_drops + mths_since_to_drop + helper_drops

In [0]:
# Remove every column collected in to_drop; DataFrame.drop ignores names that
# are not present, and repeated names are harmless.
df = df.drop(*to_drop)

Check NA counts
- Tried: counting the missing values takes very long, so I collected the counts back to the driver as a checkpoint and converted them back into a DataFrame (this failed).
    - Potential reason: it cannot handle such a large list.

Binning

In [0]:
def binning_int_rate(int_rate):
    """Map an interest rate (in percent) to a band label.

    Bands: below 10, then [10, 15), [15, 20), [20, 25), [25, 30), and 30 or more.

    :param int_rate: interest rate in percent (e.g. 12.5 for 12.5%); may be
                     ``None`` (a Spark null).
    :return: the band label, or ``None`` when the rate is missing or NaN. This
             avoids a ``TypeError`` on ``None < 10`` (which would fail the whole
             Spark job) and avoids labelling NaN as 'greater than or equal to 30%'.
    """
    import math

    if int_rate is None:
        return None
    int_rate = float(int_rate)
    if math.isnan(int_rate):
        return None
    if int_rate < 10:
        return 'less than 10%'
    for i in range(2, 6):
        if i * 5 <= int_rate < (i + 1) * 5:
            return f'within {i*5}% and {i*5+5}%'
    return 'greater than or equal to 30%'

# Spark UDF wrapper returning the band label as a string.
udf_binning_int_rate = udf(binning_int_rate, 'string')

In [0]:
# Label each loan with its interest-rate band.
df = df.withColumn('int_rate_bin', udf_binning_int_rate('int_rate'))

In [0]:
def binning_income(income):
    """Map annual income to a band label.

    Bands: below 20k, then 10k-wide bands from [20k, 30k) to [150k, 160k), and
    160k or more.

    :param income: annual income as a number or numeric string (annual_inc is a
                   string column in this dataset); may be ``None`` (a Spark null).
    :return: the band label, or ``None`` when the income is missing, NaN, or not
             numeric, so a single bad value yields a null instead of failing the
             whole Spark job.
    """
    import math

    if income is None:
        return None
    try:
        income = float(income)
    except ValueError:
        return None
    if math.isnan(income):
        return None
    if income < 20000:
        return 'less than 20k'
    for i in range(2, 16):
        if i * 10000 <= income < (i + 1) * 10000:
            return f'within {i*10}k and {(i+1)*10}k'
    # Upper edge of the last band; stated explicitly instead of reading the
    # loop variable after the loop.
    return 'greater than or equal to 160k'

# Spark UDF wrapper returning the income band as a string.
udf_binning_income = udf(binning_income, 'string')

In [0]:
# Label each loan with its annual-income band.
df = df.withColumn('annual_inc_bin', udf_binning_income('annual_inc'))

In [0]:
def binning_dti(dti):
    """Map a debt-to-income value to a 10-point band label.

    Bands: below 10, then [10, 20), ..., [90, 100), and 100 or more.

    :param dti: debt-to-income value (number or numeric string).
    :return: the band label.
    """
    value = float(dti)
    if value < 10:
        return 'less than 10'
    for lower in range(10, 100, 10):
        upper = lower + 10
        if lower <= value < upper:
            return f'within {lower} and {upper}'
    return 'greater than or equal to 100'

# Spark UDF wrapper returning the DTI band as a string.
udf_binning_dti = udf(binning_dti, 'string')

In [0]:
# Label each loan with its debt-to-income band.
df = df.withColumn('dti_bin', udf_binning_dti('dti'))

In [0]:
def binning_fico_score(fico_range_low, fico_range_high):
    """Map the midpoint of a FICO range to a score-band label.

    Midpoints below 700 -> '660 - 700', below 750 -> '701 - 750',
    below 800 -> '751 - 800', otherwise '801 - 850'.

    :param fico_range_low: lower bound of the FICO range (number or numeric string).
    :param fico_range_high: upper bound of the FICO range (number or numeric string).
    :return: the band label.
    """
    midpoint = (float(fico_range_low) + float(fico_range_high)) / 2
    bands = ((700, '660 - 700'), (750, '701 - 750'), (800, '751 - 800'))
    for upper_bound, label in bands:
        if midpoint < upper_bound:
            return label
    return '801 - 850'


# Spark UDF wrapper returning the FICO band as a string.
udf_binning_fico_score = udf(binning_fico_score, 'string')

In [0]:
# Label each loan with the band of its FICO range midpoint.
fico_inputs = ['fico_range_low', 'fico_range_high']
df = df.withColumn('fico_score_bin', udf_binning_fico_score(*fico_inputs))

Inspect the query plan and schema, then save the cleaned data as CSV to the `inputcleaned` blob container.

In [0]:
# Print the physical plan in Spark's "formatted" explain mode.
df.explain("formatted")

== Physical Plan ==
Project (11)
+- BatchEvalPython (10)
   +- Project (9)
      +- Project (8)
         +- BatchEvalPython (7)
            +- Project (6)
               +- Project (5)
                  +- * Project (4)
                     +- * Project (3)
                        +- * Filter (2)
                           +- Scan csv  (1)


(1) Scan csv 
Output [97]: [_c0#68014, id#68015, loan_amnt#68016, funded_amnt#68017, funded_amnt_inv#68018, term#68019, int_rate#68020, installment#68021, grade#68022, sub_grade#68023, emp_title#68024, emp_length#68025, home_ownership#68026, annual_inc#68027, verification_status#68028, issue_d#68029, loan_status#68030, purpose#68033, title#68034, zip_code#68035, addr_state#68036, dti#68037, delinq_2yrs#68038, fico_range_low#68040, fico_range_high#68041, inq_last_6mths#68042, open_acc#68045, pub_rec#68046, revol_bal#68047, revol_util#68048, total_acc#68049, initial_list_status#68050, out_prncp#68051, out_prncp_inv#68052, total_pymnt#68053, total_pym

In [0]:
# Show the final schema
# NOTE(review): the output lists annual_inc and fico_range_low as string
# columns; the numeric uses above rely on implicit casts - consider casting
# them explicitly after loading.
df.printSchema()


root
 |-- _c0: integer (nullable = true)
 |-- id: string (nullable = true)
 |-- loan_amnt: integer (nullable = true)
 |-- funded_amnt: integer (nullable = true)
 |-- funded_amnt_inv: double (nullable = true)
 |-- term: string (nullable = true)
 |-- int_rate: float (nullable = true)
 |-- installment: double (nullable = true)
 |-- grade: string (nullable = true)
 |-- sub_grade: string (nullable = true)
 |-- emp_title: string (nullable = true)
 |-- emp_length: string (nullable = true)
 |-- home_ownership: string (nullable = true)
 |-- annual_inc: string (nullable = true)
 |-- verification_status: string (nullable = true)
 |-- issue_d: date (nullable = true)
 |-- loan_status: string (nullable = true)
 |-- purpose: string (nullable = true)
 |-- title: string (nullable = true)
 |-- zip_code: string (nullable = true)
 |-- addr_state: string (nullable = true)
 |-- dti: float (nullable = true)
 |-- delinq_2yrs: double (nullable = true)
 |-- fico_range_low: string (nullable = true)
 |-- fico_ran

In [0]:
# Save the cleaned data as CSV with a header row to the 'inputcleaned'
# container, replacing any previous output under /loan.
output_path = f"wasbs://inputcleaned@{storage_account_name}.blob.core.windows.net/loan"
(df.write
   .options(header="true", encoding="UTF-8")
   .mode("overwrite")
   .csv(output_path))