<a id="1"></a>
# Import data

In [None]:
# Spark Session, Pipeline, Functions, and Metrics
from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext, SparkSession
from pyspark.ml.feature import OneHotEncoder, StringIndexer, StandardScaler, VectorAssembler
from pyspark.ml import Pipeline
from pyspark.ml.stat import Correlation
from pyspark.sql.functions import rand
from pyspark.mllib.evaluation import MulticlassMetrics

# Keras / Deep Learning
from keras.models import Sequential
from keras.layers.core import Dense, Dropout, Activation
from keras import optimizers, regularizers
from keras.optimizers import Adam

# Elephas for Deep Learning on Spark
from elephas.ml_model import ElephasEstimator

from pyspark.sql.functions import col,isnan, when, count
from pyspark.sql.functions import col, create_map, lit
from itertools import chain

In [None]:
# Spark Session
# getOrCreate reuses an already-running context, so re-executing this cell does
# not fail with "Cannot run multiple SparkContexts at once".
conf = SparkConf().setAppName('Predict Loan Payback').setMaster('local[6]')  # 6 local cores
sc = SparkContext.getOrCreate(conf=conf)
sql_context = SQLContext(sc)

In [None]:
# Display the SparkContext as the cell output.
# Uncomment the next line to stop the context instead:
# sc.stop()
sc

In [None]:
# Read the CSV into a Spark DataFrame: the first row supplies the column
# names and Spark infers each column's type.
df = sql_context.read.csv(
    path="./data/accepted_2007_to_2018Q4.csv",
    header=True,
    inferSchema=True,
)

In [None]:
# Print the column names and inferred types of the loaded DataFrame.
df.printSchema()

<a id="2"></a>
# Preprocessing

In [None]:
# Preview the first five rows as a pandas DataFrame.
df.limit(5).toPandas()

In [None]:
# Count rows per distinct loan_status value.
df.groupBy("loan_status").count().toPandas()

In [None]:
# Keep only rows whose loan_status is "Fully Paid" or "Charged Off".
df = df.filter(col("loan_status").isin("Fully Paid", "Charged Off"))

In [None]:
# Count rows per loan_status value in the current DataFrame.
df.groupBy("loan_status").count().toPandas()

In [None]:
# Remove columns that are not used in the rest of the notebook.
df = df.drop(
    *[
        'id',
        'pymnt_plan',
        'hardship_flag',
        'out_prncp',
        'out_prncp_inv',
        'policy_code',
    ]
)

In [None]:
def spark_df_shape(df):
    """Return ``(row_count, column_count)`` for *df*.

    The row count comes from ``df.count()`` and the column count from the
    length of ``df.columns``.
    """
    n_rows = df.count()
    n_cols = len(df.columns)
    return n_rows, n_cols

In [None]:
# Print (row count, column count) of df.
print(spark_df_shape(df))

Let's drop these columns, as they will not contribute to our goal.

In [None]:
# For every column, count the rows whose value is null or NaN, and collect
# the single-row result as a pandas DataFrame.
to_drop = df.select(
    [
        count(when(col(name).isNull() | isnan(name), name)).alias(name)
        for name in df.columns
    ]
).toPandas()

In [None]:
# Display to_drop as the cell output.
to_drop

In [None]:
# Column totals sorted from largest to smallest; show the first 50.
to_drop.sum().sort_values(ascending=False).head(50)

In [None]:
# Same descending ranking, showing positions 51-100.
to_drop.sum().sort_values(ascending=False)[50:].head(50)

The 95 columns with the most null values are mostly empty, so I will drop them.

In [None]:
# Rank columns by their total (largest first) and keep the names of the top 95.
to_drop = to_drop.sum().sort_values(ascending=False).index[:95].tolist()

In [None]:
# Print the list of column names selected for removal.
print(to_drop)

In [None]:
# Remove the selected columns, then report the resulting (rows, columns) shape.
df = df.drop(*to_drop)
print(spark_df_shape(df))

In [None]:
# Discard every row that still contains a null value.
df = df.dropna()

In [None]:
# Print (row count, column count) of df.
print(spark_df_shape(df))

In [None]:
# Further columns to remove from df.
to_drop = [
    'fico_range_low',
    'funded_amnt_inv',
    'funded_amnt',
    'total_pymnt_inv',
    'total_pymnt',
    'installment',
    'collection_recovery_fee',
    'total_rec_prncp',
    'last_fico_range_low',
]
df = df.drop(*to_drop)

In [None]:
# Print (row count, column count) of df.
print(spark_df_shape(df))

In [None]:
# Binary label: 1 for "Fully Paid", 0 for "Charged Off".
mapping = {'Fully Paid': 1, 'Charged Off': 0}

# Flatten the dict into key, value, key, value, ... literals for a Spark map column.
mapping_expr = create_map([lit(item) for item in chain.from_iterable(mapping.items())])

# Look each loan_status up in the map and store the result as loan_is_paid.
df = df.withColumn("loan_is_paid", mapping_expr[col("loan_status")])

In [None]:
# Count rows per loan_is_paid value.
df.groupBy("loan_is_paid").count().toPandas()

In [None]:
# Remove the loan_status column from df.
df = df.drop('loan_status')

In [None]:
# Collect the distinct values of the term column into a Python list.
distinct_terms = [row['term'] for row in df.select('term').distinct().collect()]

In [None]:
# Display the distinct term values.
distinct_terms

In [None]:
# Derive the month count from each term label itself rather than zipping the
# labels with [36, 60]: distinct() gives no ordering guarantee, so a positional
# zip could silently assign 36 to the 60-month label and vice versa.
# Assumes each label starts with the number of months (e.g. "36 months") --
# int() raises loudly if that assumption is ever violated.
mapping = {term: int(term.split()[0]) for term in distinct_terms}
mapping_expr = create_map([lit(x) for x in chain(*mapping.items())])
df = df.withColumn("term_months", mapping_expr[col("term")])

In [None]:
# Show the distinct values of term_months.
df.select("term_months").distinct().show()

In [None]:
# Remove the term column from df.
df = df.drop('term')

In [None]:
# Print the current schema of df.
df.printSchema()

In [None]:
# Count rows per home_ownership value.
df.groupBy('home_ownership').count().toPandas()

In [None]:
# Rewrite 'NONE' as 'ANY' in the home_ownership column only.
df = df.replace(to_replace='NONE', value='ANY', subset='home_ownership')

In [None]:
# Count rows per home_ownership value in the current DataFrame.
df.groupBy('home_ownership').count().toPandas()

In [None]:
# Additional columns to remove from df.
to_drop = [
    'grade',
    'issue_d',
    'url',
    'last_pymnt_d',
    'last_credit_pull_d',
    'zip_code',
    'addr_state',
    'earliest_cr_line',
]
df = df.drop(*to_drop)

In [None]:
# Print the current schema of df.
df.printSchema()

In [None]:
# Columns to convert to Spark's float type.
cols = [
    'annual_inc', 'dti', 'delinq_2yrs', 'fico_range_high', 'inq_last_6mths',
    'open_acc', 'pub_rec', 'revol_bal', 'revol_util', 'total_acc',
    'total_rec_int', 'total_rec_late_fee', 'recoveries', 'last_pymnt_amnt',
    'last_fico_range_high', 'collections_12_mths_ex_med', 'acc_now_delinq',
]
# Replace each listed column, one at a time, with its float-cast version.
for col_name in cols:
    as_float = col(col_name).cast('float')
    df = df.withColumn(col_name, as_float)

In [None]:
# Print the current schema of df.
df.printSchema()

In [None]:
# Preview the first five rows as a pandas DataFrame.
df.limit(5).toPandas()

In [None]:
# Spark Pipeline
# Categorical columns that will be indexed and one-hot encoded.
cat_features = ['sub_grade', 'verification_status', 'application_type', 'initial_list_status',
                'purpose', 'home_ownership', 'disbursement_method', 'debt_settlement_flag']

# Inputs for the VectorAssembler: every remaining non-categorical column except
# the label, followed by the one-hot vector produced for each categorical column.
# The list is built in DataFrame column order (instead of via a set difference,
# whose iteration order is arbitrary) so the feature vector layout is the same
# on every run.
assembler_inputs = [
    name for name in df.columns
    if name not in cat_features and name != 'loan_is_paid'
] + [feature + "_class_vec" for feature in cat_features]
assembler_inputs

In [None]:
# Ordered list of pipeline stages.
stages = []

# For each categorical column add two stages: a StringIndexer writing
# "<column>_index", then a OneHotEncoder turning that index into "<column>_class_vec".
for cat_col in cat_features:
    indexer = StringIndexer(inputCol=cat_col, outputCol=cat_col + "_index")
    one_hot = OneHotEncoder(
        inputCols=[indexer.getOutputCol()],
        outputCols=[cat_col + "_class_vec"],
    )
    stages.extend([indexer, one_hot])

# Final stage: combine all assembler inputs into a single "features" vector column.
assembler_final = VectorAssembler(inputCols=assembler_inputs, outputCol="features")
stages.append(assembler_final)

In [None]:
# Display the list of pipeline stages.
stages

In [None]:
# Build a Spark ML Pipeline from the stages list.
pipeline = Pipeline(stages=stages)

In [None]:
# Fit the pipeline on df, producing a fitted pipeline model.
pipeline_model = pipeline.fit(df)

In [None]:
# Apply the fitted pipeline to df and keep the transformed result as df.
df = pipeline_model.transform(df)

In [None]:
# Preview the first five rows of the transformed data as a pandas DataFrame.
df.limit(5).toPandas()

In [None]:
# Print the schema of the transformed DataFrame.
df.printSchema()

In [None]:
# Keep only the assembled feature vector and the label column,
# then preview the first 20 rows as a pandas DataFrame.
df = df.select('features','loan_is_paid')
df.limit(20).toPandas()

In [None]:
# Shuffle Data
# Seed the random sort key. With an unseeded rand() every re-evaluation of df
# produces a different ordering, so lazily computed downstream results (such as
# the two halves of a randomSplit) may be derived from inconsistent views of the
# data and are not reproducible between runs.
df = df.orderBy(rand(seed=42))

In [None]:
# Split Data into Train / Test Sets (90% / 10%).
# A fixed seed makes the split reproducible across runs.
train_data, test_data = df.randomSplit([.9, .1], seed=42)

In [None]:
# Print (row count, column count) of df.
print(spark_df_shape(df))

In [None]:
# Input dimension of the network: the length of the "features" vector,
# read from the first training row.
input_dim = len(train_data.first()["features"])
input_dim

In [None]:
# Set up Deep Learning Model / Architecture
# Fully connected network: six Dense layers narrowing from 78 units to 2 outputs.
model = Sequential()
model.add(Dense(units=78, input_shape=(input_dim,), activation='relu'))
model.add(Dense(units=39, activation='relu'))
model.add(Dense(units=19, activation='relu'))
model.add(Dense(units=8, activation='relu'))
model.add(Dense(units=4, activation='relu'))
# The output layer uses softmax so the two outputs form a probability
# distribution over the classes, which is what categorical_crossentropy
# expects; independent sigmoid outputs do not sum to 1.
model.add(Dense(units=2, activation='softmax'))
model.compile(loss='categorical_crossentropy', optimizer='adam', metrics=['accuracy'])

In [None]:
# Print the Keras summary of the model.
model.summary()

In [None]:
# Adam optimizer with lr=0.01, serialized so it can be handed to the estimator.
adam = optimizers.Adam(lr=0.01)
opt_conf = optimizers.serialize(adam)

# Initialize the Elephas Spark ML estimator and set all relevant properties.
estimator = ElephasEstimator()
estimator.setFeaturesCol("features")              # Input column; this setter and the next come from
estimator.setLabelCol("loan_is_paid")             # PySpark, hence the camelCase names.
estimator.set_keras_model_config(model.to_yaml()) # Provide the Keras model serialized as YAML
estimator.set_num_workers(1)                      # A single worker is used here; adjust as needed.
estimator.set_epochs(10)
estimator.set_batch_size(512)
estimator.set_verbosity(1)
estimator.set_validation_split(0.1)
estimator.set_categorical_labels(True)
estimator.set_nb_classes(2)
estimator.set_optimizer_config(opt_conf)          # Use the serialized Adam configuration from above
estimator.set_mode("synchronous")
estimator.set_loss("categorical_crossentropy")
estimator.set_metrics(['acc'])

In [None]:
# Print the schema of the training set.
train_data.printSchema()

In [None]:
# Print the schema of the test set.
test_data.printSchema()

In [None]:
# Wrap the Elephas estimator in a single-stage Spark ML Pipeline.
dl_pipeline = Pipeline(stages=[estimator])

In [None]:
# Fit the deep-learning pipeline on the training set.
fit_dl_pipeline = dl_pipeline.fit(train_data)

In [None]:
# Apply the fitted pipeline to the test set.
pred_test = fit_dl_pipeline.transform(test_data)

In [None]:
# Preview the first ten rows of the test-set output as a pandas DataFrame.
pred_test.limit(10).toPandas()

In [None]:
# Keep only the label column and the prediction column.
pnl_test = pred_test.select("loan_is_paid", "prediction")

In [None]:
# MulticlassMetrics expects an RDD of (prediction, label) pairs of floats.
# Emitting (label, prediction) instead transposes the confusion matrix and
# swaps precision with recall.
pred_and_label_test = pnl_test.rdd.map(
    lambda row: (float(row['prediction']), float(row["loan_is_paid"]))
)

In [None]:
# Build multiclass evaluation metrics from the pair RDD.
metrics_test = MulticlassMetrics(pred_and_label_test)

In [None]:
# Report accuracy (the fraction of correctly classified rows). The message says
# "Accuracy", so print the accuracy metric rather than weighted precision.
print("\nTest Data Accuracy: {}".format(metrics_test.accuracy))

In [None]:
# Cross-tabulate label against prediction and display the table via pandas.
print("Test Data Confusion Matrix")
display(pnl_test.crosstab('loan_is_paid', 'prediction').toPandas())