## Imports

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import IntegerType, DateType
from pprint import pprint

## Variables

In [2]:
path = f'../data/raw/loans_raw.csv'

## Spark Session

In [3]:
spark = SparkSession.builder.appName('Loans').getOrCreate()

## Spark functions

In [4]:
def drop_na_cols(data, pct):
    rows = data.count()
    null_counts = data.select(
        [F.count(
            F.when(
                F.isnull(col), col)
        ).alias(col) for col in data.columns]
    )
    
    null_counts = null_counts.toPandas()
    null_counts = (null_counts/rows).ge(pct).all()
    null_cols = null_counts[null_counts == True].keys()
    
    
    
    return data.select([col for col in data.columns if col not in null_cols])

def lower_case_cols(data):
    data_dtypes = {col[0]: col[1] for col in data.dtypes}
    
    for column in data_dtypes.keys():
        if data_dtypes[column] == 'string':
            data = data.withColumn(column, F.lower(F.col(column)))
    
    return data
    
def remove_whitespace(data):
    data_dtypes = {col[0]: col[1] for col in data.dtypes}
    
    for column in data_dtypes.keys():
        if data_dtypes[column] == 'string':
            data = data.withColumn(column, F.lower(F.col(column)))
        
    return data

def make_col_numeric(data, column):
    return data.withColumn(column, data[column].cast(IntegerType()))

def truncate_credit_line(data, column):
    return data.withColumn(column, F.split(F.col(column), '-')[1])

def truncate_term(data, column):
    return data.withColumn(column, F.split(F.col(column), ' ')[1])

def categorise_employment_length(data, spark_session):
    name = 'df_el'
    data.createTempView(name)
    
    data = spark_session.sql(
    f"""
    
    select
    account_id,
    installment,
    loan_amount,
    interest_rate,
    term,
    purpose,
    issue_date,
    title,
    home_ownership,
    annual_income,

    case when employment_length in ('10+ years') then '10plus'
    when employment_length in ('< 1 year') then 'less1'
    when employment_length in ('1 year', '2 years', '3 years') then '1to3'
    when employment_length in ('4 years', '5 years', '6 years') then '4to6'
    when employment_length in ('7 years', '8 years', '9 years') then '7to9'
    else null
    end as employment_length,

    job_title,
    earliest_credit_line,
    public_records,
    delinquency_2y,
    inquiries_6m,
    open_accounts,
    debt_to_income,
    credit_card_usage,
    credit_card_balance,
    total_current_balance,
    nr_accounts,
    loan_status,
    amount_payed,
    year,
    district,
    postcode_district,
    credit_score

    from {name}
    """
    )
    
    return data

def categorise_home_ownership(data, spark):
    name = 'df_ho'
    data.createTempView(name)
    
    data = spark.sql(
    f"""
    select
    account_id,
    installment,
    loan_amount,
    interest_rate,
    term,
    purpose,
    issue_date,
    title,
    
    case when home_ownership in ('mortgage', 'rent', 'own') then home_ownership
    else 'other'
    end as home_ownership,
    
    annual_income,
    employment_length,
    job_title,
    earliest_credit_line,
    public_records,
    delinquency_2y,
    inquiries_6m,
    open_accounts,
    debt_to_income,
    credit_card_usage,
    credit_card_balance,
    total_current_balance,
    nr_accounts,
    loan_status,
    amount_payed,
    year,
    district,
    postcode_district,
    credit_score
    
    
    from {name}
    """
    )
    
    return data

def categorise_inquiry(data, spark):
    name = 'df_inq'
    data.createTempView(name)
    
    data = spark.sql(f"""
    select
    
    account_id,
    installment,
    loan_amount,
    interest_rate,
    term,
    purpose,
    issue_date,
    title,
    home_ownership,
    annual_income,
    employment_length,
    job_title,
    earliest_credit_line,
    public_records,
    delinquency_2y,
    
    case when inquiries_6m = 0 then 'no_inquiry'
    when inquiries_6m = 1 then '1_inquiry'
    else '2plus_inquiry'
    end as inquiries_6m,
    
    open_accounts,
    debt_to_income,
    credit_card_usage,
    credit_card_balance,
    total_current_balance,
    nr_accounts,
    loan_status,
    amount_payed,
    year,
    district,
    postcode_district,
    credit_score
    
    from {name}
    """
    )
    return data

def categorise_purpose(data, spark):
    name = 'df_purpose'
    data.createTempView(name)
    
    data = spark.sql(f"""
    select
    
    account_id,
    installment,
    loan_amount,
    interest_rate,
    term,
    
    case when purpose in ('debt_consolidation', 'credit_card') then purpose
    else 'other'
    end as purpose,
    
    issue_date,
    title,
    home_ownership,
    annual_income,
    employment_length,
    job_title,
    earliest_credit_line,
    public_records,
    delinquency_2y,
    inquiries_6m,
    open_accounts,
    debt_to_income,
    credit_card_usage,
    credit_card_balance,
    total_current_balance,
    nr_accounts,
    loan_status,
    amount_payed,
    year,
    district,
    postcode_district,
    credit_score
    
    from {name}    
    """
    )
    
    return data

def impute_column(data, spark_session, grouper, column_to_fill):
    name = 'df_tcb'
    data.createTempView(name)
    
    medians = data.groupBy(grouper).agg(F.expr(f'percentile_approx({column_to_fill}, 0.5)').alias(f'median_{column_to_fill}'))
    medians_name = 'df_median'
    medians.createTempView(medians_name)
    
    imputed_data = spark_session.sql(f"""
    select
    
    account_id,
    installment,
    loan_amount,
    interest_rate,
    term,
    purpose,
    issue_date,
    title,
    home_ownership,
    annual_income,
    employment_length,
    job_title,
    earliest_credit_line,
    public_records,
    delinquency_2y,
    inquiries_6m,
    open_accounts,
    debt_to_income,
    credit_card_usage,
    credit_card_balance,
    
    case when total_current_balance is null and {medians_name}.median_{column_to_fill} is not null then {medians_name}.median_{column_to_fill}
    else total_current_balance
    end as total_current_balance,
    
    nr_accounts,
    loan_status,
    amount_payed,
    year,
    {name}.district,
    postcode_district,
    credit_score
    
    from {name}
    left join {medians_name} on {medians_name}.district = {name}.district
    
    """
    )
    
    return imputed_data

def create_credit_age(data, spark_session, year):
    name = 'df_credit_age'
    data.createTempView(name)
    
    data = spark.sql(f"""
    select
    *,
    
    ({year} - earliest_credit_line) as credit_age_years
    
    from {name}
    """
    )
    
    return data

def create_binary_class(data, spark_session):
    name = 'df_binary_class'
    data.createTempView(name)
    
    data = spark_session.sql(f"""
    select
    
    *,
    
    case when loan_status in ('fully paid') then 0
    when loan_status in ('ongoing') then null
    
    when loan_status in ('default', 'charged_off', 'late (> 90 days)') then 1
    else null
    end as class
    
    from {name}
    """
    )
    return data

## Read data

In [5]:
data = spark.read.csv(
    path,
    inferSchema=True,
    header=True
)

In [6]:
data.printSchema()

root
 |-- account_id: integer (nullable = true)
 |-- installment: double (nullable = true)
 |-- loan_amount: double (nullable = true)
 |-- interest_rate: double (nullable = true)
 |-- term: string (nullable = true)
 |-- purpose: string (nullable = true)
 |-- issue_date: string (nullable = true)
 |-- description: string (nullable = true)
 |-- title: string (nullable = true)
 |-- home_ownership: string (nullable = true)
 |-- annual_income: string (nullable = true)
 |-- employment_length: string (nullable = true)
 |-- job_title: string (nullable = true)
 |-- earliest_credit_line: string (nullable = true)
 |-- public_records: string (nullable = true)
 |-- last_record_months: string (nullable = true)
 |-- last_delinquency_months: string (nullable = true)
 |-- last_derog_months: string (nullable = true)
 |-- delinquency_2y: string (nullable = true)
 |-- inquiries_6m: string (nullable = true)
 |-- open_accounts: string (nullable = true)
 |-- debt_to_income: string (nullable = true)
 |-- credi

## Data Cleansing

In [7]:
data = drop_na_cols(data=data, pct=0.5)

In [8]:
data = lower_case_cols(data)

In [9]:
data = remove_whitespace(data)

In [10]:
data = make_col_numeric(data, 'credit_score')

In [11]:
data = make_col_numeric(data, 'annual_income')

In [12]:
data = truncate_credit_line(data, 'earliest_credit_line')

In [13]:
data = categorise_employment_length(data, spark)

In [14]:
data = categorise_home_ownership(data, spark)

In [15]:
data.createTempView('dft')

In [16]:
data = categorise_inquiry(data, spark)

In [17]:
data = categorise_purpose(data, spark)

In [18]:
data = impute_column(data, spark, 'district', 'total_current_balance')

In [19]:
data = create_credit_age(data, spark, 2015)

In [20]:
data = create_binary_class(data, spark)

In [21]:
data = data.na.drop()

In [22]:
data.createTempView('df')

In [24]:
data = truncate_term(data, 'term')