## Importing Libraries

In [2]:
import os
import pandas as pd
import numpy as np

from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession, SQLContext

from pyspark.sql.types import *
import pyspark.sql.functions as fn
from pyspark.sql.functions import col, udf

from pyspark.ml.stat import Correlation

from pyspark.ml.classification import LogisticRegression

from pyspark.ml.tuning import ParamGridBuilder, CrossValidator, CrossValidatorModel

from pyspark.ml.feature import Bucketizer, StringIndexer, OneHotEncoder, StandardScaler, VectorAssembler

from pyspark.ml import Pipeline, PipelineModel
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator, CrossValidatorModel

from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.mllib.evaluation import MulticlassMetrics

from pyspark.ml import regression
from pyspark.ml import feature
from pyspark.ml import Pipeline

In [3]:
import seaborn as sns
import matplotlib.pyplot as plt

## Starting a new Spark Session

In [5]:
spark = (SparkSession
         .builder
         .master("local[*]")
         .appName("predict mortgage approval")
         .getOrCreate())
spark

In [6]:
sc = spark.sparkContext
sc

In [7]:
sqlContext = SQLContext(spark.sparkContext)
sqlContext

In [8]:
# Load training data and cache it. We will be using this data set over and over again.
mortgage = (spark.read.csv(path='/FileStore/tables/ny_hmda_2015.csv',header=True,inferSchema=True, ignoreLeadingWhiteSpace=True, ignoreTrailingWhiteSpace=True).cache())

In [9]:
display(mortgage)

In [10]:
mortgage.printSchema()

In [11]:
mortgage=mortgage.toPandas()

In [12]:
mortgage.info()

In [13]:
mortgage[['action_taken_name','action_taken']]

In [14]:
mortgage['purchaser_type_name']

In [15]:
mortgage.columns

In [16]:
cols=['action_taken_name','agency_abbr','agency_name','applicant_ethnicity_name','applicant_race_2','applicant_race_3','applicant_race_4','applicant_race_5',\
     'applicant_race_name_1', 'applicant_race_name_2','applicant_race_name_3', 'applicant_race_name_4','applicant_race_name_5','applicant_sex_name','co_applicant_ethnicity_name','co_applicant_race_2','co_applicant_race_3',\
      'co_applicant_race_4','co_applicant_race_5','co_applicant_race_name_1', 'co_applicant_race_name_2','co_applicant_race_name_3', 'co_applicant_race_name_4',\
       'co_applicant_race_name_5','co_applicant_sex_name','county_name',\
       'denial_reason_1', 'denial_reason_2', 'denial_reason_3',\
       'denial_reason_name_1', 'denial_reason_name_2',\
       'denial_reason_name_3', 'edit_status', 'edit_status_name','hoepa_status_name','lien_status_name','loan_purpose_name','loan_type_name','msamd_name',\
     'owner_occupancy_name','preapproval_name','property_type_name','purchaser_type_name','state_abbr','state_name','rate_spread']

In [17]:
mtg=mortgage.drop(cols,axis=1)

## Checking missing values

In [19]:
total = mtg.isnull().sum().sort_values(ascending=False)
percent = (mtg.isnull().sum()/mtg.isnull().count()).sort_values(ascending=False)
missing_data = pd.concat([total, percent], axis=1, keys=['Total', 'Percent'])
missing_data.head(30)

In [20]:
# Keep only the rows whose action_taken is 1 or 3 (the two classes modelled below).
# .copy() makes mtg_1 an independent frame, so the column updates performed on it
# later do not act on a slice of mtg (SettingWithCopyWarning / lost writes).
mtg_1=mtg[mtg['action_taken'].isin([1, 3])].copy()

In [21]:
#Checking class imbalance
mtg_1['action_taken'].value_counts()/len(mtg_1['action_taken'])

## Checking missing data after restricting to the 2 classes

In [23]:
# Missing-value summary for the two-class subset mtg_1.
# The share of missing values is taken against mtg_1's own row count; dividing by
# the row count of the unfiltered mtg frame understated every percentage.
null_mask = mtg_1.isnull()
total = null_mask.sum().sort_values(ascending=False)
percent = (null_mask.sum()/null_mask.count()).sort_values(ascending=False)
missing_data = pd.concat([total, percent], axis=1, keys=['Total', 'Percent'])
missing_data.head(30)

## Checking correlation

In [25]:
# Correlation heat map over the numeric columns of mtg_1.
# Non-numeric columns are excluded explicitly: recent pandas versions raise when
# corr() meets a string column instead of silently skipping it.
corr1=mtg_1.select_dtypes(include=['number', 'bool']).corr()
f, ax = plt.subplots(figsize=(25, 10))
# Draw on the axes created above rather than on whatever the current axes are.
sns.heatmap(corr1, ax=ax)
display(f)

In [26]:
remcol=['county_code','number_of_owner_occupied_units','number_of_1_to_4_family_units','census_tract_number','property_type','population','owner_occupancy','hoepa_status','application_date_indicator','as_of_year','sequence_number','respondent_id','state_code','msamd','tract_to_msamd_income']

In [27]:
mtg_999=mtg_1.drop(remcol,axis=1)

In [28]:
mtg_999.info()

In [29]:
# Correlation heat map after dropping the columns listed in remcol.
# Non-numeric columns are excluded explicitly: recent pandas versions raise when
# corr() meets a string column instead of silently skipping it.
corr2=mtg_999.select_dtypes(include=['number', 'bool']).corr()
f9, ax = plt.subplots(figsize=(25, 10))
# Draw on the axes created above rather than on whatever the current axes are.
sns.heatmap(corr2,cmap="YlGnBu", ax=ax)
plt.tight_layout()
display(f9)

## Filling missing values in Income by Linear regression

In [33]:
filtered_df = mtg_1[mtg_1['applicant_income_000s'].notnull()]

In [34]:
filtered_df['action_taken'].value_counts()

In [35]:
#fig, ax = plt.subplots()
#ax=sns.distplot(filtered_df['applicant_income_000s'])
#display(fig)

In [36]:
#Checking outliers
#f, ax = plt.subplots(figsize=(15, 6))
#ax=sns.boxplot(x="action_taken", y="applicant_income_000s", data=mtg_1)
#display(f)

In [37]:
#mtg_1[(mtg_1['applicant_income_000s']>2000) & (mtg_1['action_taken']==3)].count()

In [38]:
#mtg_2=mtg_1[mtg_1['applicant_income_000s']<600]

In [39]:
#f, ax = plt.subplots(figsize=(15, 6))
#ax=sns.boxplot(x="action_taken", y="applicant_income_000s", data=mtg_2)
#display(f)

In [40]:
# Replace missing msamd values with 0.
# The result is assigned back as a new frame: fillna(inplace=True) on a column
# selected from a filtered frame may operate on a temporary copy and leave
# mtg_1 unchanged.
mtg_1 = mtg_1.assign(msamd=mtg_1['msamd'].fillna(value=0))

In [41]:
# Missing-value summary for mtg_1 after the msamd fill.
# The share of missing values is taken against mtg_1's own row count; dividing by
# the row count of the unfiltered mtg frame understated every percentage.
null_mask = mtg_1.isnull()
total = null_mask.sum().sort_values(ascending=False)
percent = (null_mask.sum()/null_mask.count()).sort_values(ascending=False)
missing_data = pd.concat([total, percent], axis=1, keys=['Total', 'Percent'])
missing_data.head(30)

In [42]:
mtg_3=mtg_1
mtg_3.head()

In [43]:
# Keep only the rows where every one of these columns has a value.
required_columns = [
    'number_of_1_to_4_family_units',
    'number_of_owner_occupied_units',
    'tract_to_msamd_income',
    'minority_population',
    'population',
    'hud_median_family_income',
    'census_tract_number',
    'county_code',
]
mtg_3 = mtg_3.dropna(subset=required_columns)

In [44]:
mtg_4= mtg_3[mtg_3['applicant_income_000s'].notnull()]

In [45]:
# Missing-value summary for mtg_3.
# The share of missing values is taken against mtg_3's own row count; dividing by
# the row count of the unfiltered mtg frame understated every percentage.
null_mask = mtg_3.isnull()
total = null_mask.sum().sort_values(ascending=False)
percent = (null_mask.sum()/null_mask.count()).sort_values(ascending=False)
missing_data = pd.concat([total, percent], axis=1, keys=['Total', 'Percent'])
missing_data.head(30)

In [46]:
mtg_3.info()

In [47]:
#train test split
test1=mtg_3[mtg_3.applicant_income_000s.isnull()]
train1= mtg_3[mtg_3.applicant_income_000s.notnull()]
test1.head()

In [48]:
train_1=spark.createDataFrame(train1)
test_1=spark.createDataFrame(test1)


In [49]:
display(train_1)

In [50]:
a=train_1.columns
a.remove('applicant_income_000s')
a.remove('respondent_id')
a

In [51]:
train_1.printSchema()

In [52]:
from pyspark.ml.feature import StandardScaler
va = feature.VectorAssembler(inputCols=['loan_amount_000s'], outputCol='features')
assembled_train_df = va.transform(train_1)
assembled_test_df = va.transform(test_1)

In [53]:
# 70/30 split of the rows with a known income into fit and validation sets.
# A fixed seed keeps the split (and the RMSE reported below) reproducible across
# runs; 123 matches the seed used for the classifier split later in the notebook.
training_df, validation_df = assembled_train_df.randomSplit([0.7, 0.3], seed=123)

In [54]:
scaler = StandardScaler(inputCol='features', outputCol="scaledFeatures",withStd=True, withMean=False)
# Compute summary statistics by fitting the StandardScaler
scalerModel = scaler.fit(assembled_train_df)
# Normalize each feature to have unit standard deviation.
scaled_train = scalerModel.transform(assembled_train_df)
scaled_test = scalerModel.transform(assembled_test_df)

In [55]:
display(scaled_test)

In [56]:
lr = regression.LinearRegression(featuresCol='scaledFeatures', labelCol='applicant_income_000s')
pipe = Pipeline(stages=[scaler, lr])

In [57]:
pipe_model = pipe.fit(training_df)

In [58]:
pipe_model.transform(validation_df)

In [59]:
display(pipe_model.transform(validation_df))

In [60]:
rmse = fn.sqrt(fn.avg((fn.col('applicant_income_000s') - fn.col('prediction'))**2))

In [61]:
pipe_model.transform(validation_df).select(rmse).show()

In [62]:
pipe_model.transform(validation_df).select(fn.col('prediction'),fn.col('applicant_income_000s')).show()

In [63]:
hmda_new_01= pipe_model.transform(assembled_test_df)

In [64]:
display(hmda_new_01)

In [65]:
test2= hmda_new_01.toPandas()

In [66]:
test2.head()

In [67]:
# Drop the two vector columns and the income column in one call; the income
# column name is reused for the renamed prediction in a later cell.
test2.drop(['features', 'scaledFeatures', 'applicant_income_000s'], axis=1, inplace=True)

In [68]:
test2.info()

In [69]:
test2 = test2.rename(columns={'prediction' : 'applicant_income_000s'})

In [70]:
hmda_imp=pd.concat([train1, test2], ignore_index=True)

In [71]:
hmda_imp.info()

## Removing all the missing values

In [73]:
# starting off with all values removed
# Missing-value summary for mtg_4. Both columns are computed from mtg_4 itself:
# previously `total` was left over from the mtg_3 summary and the percentage was
# divided by the row count of the unfiltered mtg frame.
null_mask = mtg_4.isnull()
total = null_mask.sum().sort_values(ascending=False)
percent = (null_mask.sum()/null_mask.count()).sort_values(ascending=False)
missing_data = pd.concat([total, percent], axis=1, keys=['Total', 'Percent'])
missing_data.head(30)

In [74]:
mtg_4['action_taken'].value_counts()

In [75]:
# Recode the label to 0/1: action_taken == 3 becomes 0, every other value is kept.
# The result is assigned back as a new frame: replace(inplace=True) on a column
# selected from a filtered frame may operate on a temporary copy and leave
# mtg_4 unchanged.
mtg_4 = mtg_4.assign(
    action_taken=mtg_4['action_taken'].replace(to_replace=[3], value=0)
)

## Creating spark df

In [77]:
#hmda=spark.createDataFrame(mtg_4)

In [78]:
#display(hmda)

## Keeping all the features

In [80]:
#columns
#b=hmda.columns

In [81]:
#b.remove('respondent_id')
b.remove('sequence_number')
b.remove('action_taken')

In [82]:
# Train test split
#training_df_hmda, test_df_hmda = hmda.randomSplit([0.7, 0.3],123)

## Simple Logistic Regression without cross validation and regularization

In [84]:
#vector assembler
#va3= feature.VectorAssembler(inputCols=b, outputCol='features')


In [85]:
# Logistic regression with all features
# The `classification` module imported here is also what the random-forest
# cells further down use (classification.RandomForestClassifier).
from pyspark.ml import classification
# Estimator reading the assembled 'features' vector and the 'action_taken' label.
logis = classification.LogisticRegression(featuresCol='features', labelCol='action_taken')


In [86]:
#pipe_hmda = Pipeline(stages=[va3, logis])

In [87]:
#pipe_hmda_fitted=pipe_hmda.fit(training_df_hmda)

In [88]:
#pipe_hmda_fitted.transform(test_df_hmda).select(fn.col('probability'),fn.col('action_taken')).show(100)

## Defining Evaluators

In [90]:
def binary_evaluation(model_pipeline, model_fitted, data):
  """Score a fitted pipeline on `data` with a BinaryClassificationEvaluator.

  The label and raw-prediction column names are read from the last stage of
  `model_pipeline`; `model_fitted` produces the predictions that are scored.
  Returns whatever the evaluator's `evaluate` returns (its metric is left at
  the evaluator's default).
  """
  final_stage = model_pipeline.getStages()[-1]
  scorer = BinaryClassificationEvaluator(
      labelCol=final_stage.getLabelCol(),
      rawPredictionCol=final_stage.getRawPredictionCol())
  scored = model_fitted.transform(data)
  return scorer.evaluate(scored)

In [91]:
def accuracy(predlbls, label_col='action_taken'):
    """Print the number of correct and wrong predictions and the accuracy.

    Parameters
    ----------
    predlbls : Spark DataFrame holding a "prediction" column and the label column.
    label_col : name of the label column; defaults to 'action_taken', the label
        used throughout this notebook.

    Prints one summary line and returns None. An empty frame is reported
    explicitly instead of failing with ZeroDivisionError.
    """
    counttotal = predlbls.count()
    if counttotal == 0:
        print("No rows to evaluate.")
        return
    correct = predlbls.filter(col(label_col) == col("prediction")).count()
    wrong = predlbls.filter(col(label_col) != col("prediction")).count()
    ratioCorrect = float(correct)/counttotal
    print("Correct: {0}, Wrong: {1}, Model Accuracy: {2}".format(correct, wrong, np.round(ratioCorrect, 2)))

In [92]:
#binary_evaluation(pipe_hmda,pipe_hmda_fitted,test_df_hmda)

In [93]:
#accuracy(pipe_hmda_fitted.transform(test_df_hmda))

In [94]:
#training_df_hmda.groupby('action_taken').count().show()

In [95]:
#scaled_feat = StandardScaler(inputCol='features', outputCol="scaledFeatures",withStd=True, withMean=False)
# Compute summary statistics by fitting the StandardScaler
#scalerModel = scaler.fit(scaled_hmda)

In [96]:
#from pyspark.ml import classification
#logis = classification.LogisticRegression(featuresCol='scaledFeatures', labelCol='action_taken')
#pipe_logis = Pipeline(stages=[scaled_feat,logis])

## Removing unrelated columns

In [98]:
# Feature list for the models below: every hmda column except the ones removed
# here. list.remove raises ValueError if a name is absent.
cols=hmda.columns
for excluded_name in (
    'county_code',
    'number_of_owner_occupied_units',
    'number_of_1_to_4_family_units',
    'census_tract_number',
    'property_type',
    'population',
    'owner_occupancy',
    'hoepa_status',
    'application_date_indicator',
    'as_of_year',
    'sequence_number',
    'respondent_id',
    'action_taken',
):
    cols.remove(excluded_name)

In [99]:
cols

## Using some features

In [101]:
# Disabled experiment: single-feature logistic regression. Every line of the
# statement is commented out; previously only the first line was, which left the
# continuation lines as a syntax error.
#model1 = Pipeline(stages=[feature.VectorAssembler(inputCols=['purchaser_type'],
#                                        outputCol='features'),
#                 classification.LogisticRegression(labelCol='action_taken', featuresCol='features',regParam=0,elasticNetParam=0)])

In [102]:
#model1_fitted = model1.fit(training_df_hmda)

In [103]:
#accuracy(model1_fitted.transform(test_df_hmda))

In [104]:
#binary_evaluation(model1,model1_fitted,test_df_hmda)

## Fitting features present in cols

In [106]:
#model2 = Pipeline(stages=[feature.VectorAssembler(inputCols=cols,
                                        #outputCol='features'),
                 #logis])

In [107]:
#model2_fitted = model2.fit(training_df_hmda)

In [108]:
#binary_evaluation(model2,model2_fitted,test_df_hmda)

In [109]:
#accuracy(model2_fitted.transform(test_df_hmda))

In [110]:
# Disabled experiment: parameter grid for model2. Every line of the statement is
# commented out; previously only the first line was, which left the continuation
# lines as a syntax error.
#paramGrid = ParamGridBuilder() \
#    .addGrid(logis.elasticNetParam, [0.0, 0.1, 0.01]) \
#    .addGrid(logis.regParam, [0.0,0.1, 0.01]) \
#    .build()

In [111]:
#Cross validation
#crossval_01 = CrossValidator(estimator=model2, 
                          #estimatorParamMaps=paramGrid, 
                          #evaluator= evaluator, 
                          #numFolds=2)

In [112]:
#crossval_fitted_01 = crossval_01.fit(training_df_hmda)

In [113]:
#accuracy(crossval_fitted_01.transform(test_df_hmda))

In [114]:
#evaluator.evaluate(crossval_fitted_01.transform(test_df_hmda))

## Random Forest Classifier

In [116]:
from pyspark.ml.classification import RandomForestClassifier

In [117]:
rf=classification.RandomForestClassifier(labelCol='action_taken', featuresCol='features',maxDepth=8,numTrees=200)
pipe_rf= Pipeline(stages=[feature.VectorAssembler(inputCols=cols,
                                        outputCol='features'),rf])

In [118]:
pipe_rf_fitted=pipe_rf.fit(training_df_hmda)

In [119]:
binary_evaluation(pipe_rf,pipe_rf_fitted,test_df_hmda)

In [120]:
accuracy(pipe_rf_fitted.transform(test_df_hmda))

In [121]:
#evaluator_rf = BinaryClassificationEvaluator(labelCol="action_taken")

## Using more trees

In [123]:
# Disabled experiment: random forest with 400 trees. Every line of the statement
# is commented out; previously the last line was live, which was a syntax error.
#rf_01=classification.RandomForestClassifier(labelCol='action_taken', featuresCol='features',maxDepth=8,numTrees=400)
#pipe_rf_01= Pipeline(stages=[feature.VectorAssembler(inputCols=cols,
#                                        outputCol='features'),rf_01])

In [124]:
#pipe_rf_fitted_01=pipe_rf_01.fit(training_df_hmda)

In [125]:
#binary_evaluation(pipe_rf_01,pipe_rf_fitted_01,test_df_hmda)

In [126]:
#accuracy(pipe_rf_fitted_01.transform(test_df_hmda))

## Using data from dataframe that has imputed values for Income

In [128]:
# Recode the label to 0/1: action_taken == 3 becomes 0, every other value is kept.
# The column is assigned back explicitly: replace(inplace=True) on a selected
# column may operate on a temporary copy and leave hmda_imp unchanged.
hmda_imp['action_taken'] = hmda_imp['action_taken'].replace(
    to_replace=[3],
    value=0
)

In [129]:
hmda_imp['action_taken'].value_counts()

In [130]:
#creating spark df
hmda_2=spark.createDataFrame(hmda_imp)

In [131]:
display(hmda_2)

In [132]:
# Feature list for the imputed data set: every hmda_2 column except the ones
# removed here. list.remove raises ValueError if a name is absent.
cols1=hmda_2.columns
for excluded_name in (
    'county_code',
    'number_of_owner_occupied_units',
    'number_of_1_to_4_family_units',
    'census_tract_number',
    'property_type',
    'population',
    'owner_occupancy',
    'hoepa_status',
    'application_date_indicator',
    'as_of_year',
    'sequence_number',
    'respondent_id',
    'action_taken',
):
    cols1.remove(excluded_name)


In [133]:
training_df_hmda_1, test_df_hmda_1 = hmda_2.randomSplit([0.7, 0.3],123)

In [135]:
va001= feature.VectorAssembler(inputCols=cols1,
                                        outputCol='features')

In [136]:
assembled_train=va001.transform(training_df_hmda_1)
assembled_test= va001.transform(test_df_hmda_1)

In [137]:
log_reg = classification.LogisticRegression(featuresCol='features', labelCol='action_taken')

pipeline = Pipeline(stages=[feature.VectorAssembler(inputCols=cols1,
                                        outputCol='features'),
                logis])


In [138]:
paramGrid = ParamGridBuilder() \
    .addGrid(logis.elasticNetParam, [0.0, 0.1, 0.01]) \
    .addGrid(logis.regParam, [0.0,0.1, 0.01,.001 ]) \
    .build()

In [139]:
evaluator = BinaryClassificationEvaluator(labelCol='action_taken')

In [140]:
#Cross validation
crossVal = CrossValidator(estimator=pipeline, 
                          estimatorParamMaps=paramGrid, 
                          evaluator= evaluator, 
                          numFolds=2)

In [141]:
cvModel = crossVal.fit(training_df_hmda_1)

In [142]:
cvModel.bestModel.stages

In [143]:
# Show the cross-validated model's predictions on the held-out split.
# (test22 is only created much further down, so referencing it here raised
# NameError on a top-to-bottom run.)
display(cvModel.transform(test_df_hmda_1))

In [144]:
cvModel.bestModel.stages[1]


In [145]:
best_log_reg_model = cvModel.bestModel.stages[1]
best_log_reg_model

In [146]:
best_log_reg_model.extractParamMap()

In [147]:
# explainParam returns a description string; inside a loop the value is not
# echoed by the notebook, so it has to be printed to be seen.
for param in ['regParam', 'elasticNetParam', 'maxIter', 'tol']:
    print(best_log_reg_model.explainParam(param))

In [148]:
# One record per grid point: [regParam, elasticNetParam, average CV metric].
# Values are looked up by parameter name instead of relying on the order of
# param_map.values(): the grid adds elasticNetParam before regParam, so the
# positional order did not match the column labels applied in the next cell.
grid_score_param_records = []
for metric, param_map in zip(cvModel.avgMetrics, paramGrid):
    value_by_name = {param.name: value for param, value in param_map.items()}
    grid_score_param_records.append(
        [value_by_name['regParam'], value_by_name['elasticNetParam'], metric]
    )

In [149]:
grid_score_param_df = pd.DataFrame(grid_score_param_records, columns=['regParam', 'elasticNetParam', 'areaUnderROC'])

In [150]:
grid_score_param_df

In [151]:
grid_score_param_df = pd.pivot_table(grid_score_param_df, index=['regParam', 'elasticNetParam'], values=['areaUnderROC'])

In [152]:
grid_score_param_df

In [153]:
grid_score_param_df['areaUnderROC'].unstack()

In [154]:
# Compute the heat map
hmap = grid_score_param_df['areaUnderROC'].unstack()
f2, ax = plt.subplots(figsize=(6, 4))
# Set up the matplotlib figure
sns.heatmap(hmap, square=False, annot=True, cmap='viridis', fmt='.4g', linewidths=1)
plt.title('Heat Map of Grid Search Parameters')
plt.tight_layout()
display(f2)

In [155]:
model = log_reg.fit(assembled_train)

In [156]:
test_summary = model.evaluate(assembled_test)

In [157]:
test_summary.accuracy

In [158]:
# The summary object is not callable; show its area under the ROC curve, the
# same attribute the ROC plot below puts in its legend.
test_summary.areaUnderROC

In [159]:
test_summary.roc.limit(10).toPandas()

In [160]:
test_roc_pdf = test_summary.roc.toPandas()

In [161]:
# ROC curve of the logistic model on the held-out split, with the diagonal as
# the no-skill reference. The figure handle is kept and passed to display(),
# as every other plotting cell in this notebook does.
f_roc = plt.figure(figsize=(6,4))
plt.plot(test_roc_pdf['FPR'], test_roc_pdf['TPR'], lw=1, label='logistic Classifier AUC = %0.2f' % (test_summary.areaUnderROC))
plt.plot([0, 1], [0, 1], '--', color=(0.6, 0.6, 0.6), label='NULL Accuracy')
plt.title('ROC AUC Curve')
plt.tight_layout()
plt.legend(loc="best" )
display(f_roc)

In [162]:
test_preds = cvModel.transform(test_df_hmda_1)

In [163]:
cv_test_areaUnderROC = evaluator.evaluate(test_preds)
cv_test_areaUnderROC

In [164]:
testpredlbls = test_preds.select("prediction", "action_taken")

In [165]:
cvModel.bestModel.stages[1]

In [166]:
print("Coeff :" + str(cvModel.bestModel.stages[1].coefficientMatrix))
print("intercepts :" + str(cvModel.bestModel.stages[1].interceptVector))

In [167]:
coeff=(cvModel.bestModel.stages[1].coefficientMatrix)
coeff=coeff.toArray()
coeff1=np.array(coeff).tolist()
coeff1

In [168]:
coeff1=[-0.114871852712418,
  0.0009155153068500655,
  0.0008821364731822005,
  0.038890706161297174,
  0.014961989368882204,
  -0.1826686237195676,
  0.0016804793278324965,
  0.06817042688115463,
  2.6589042009618525e-07,
  -0.5250199678929605,
  -0.000279462784692149,
  -0.49518827654134034,
  -0.08355833183166382,
  -0.0066203628512083,
  7.1984832166973485e-06,
  -0.1348451600724103,
  0.0,
  0.0019111227562408969]

In [169]:
d={'Features': cols1 , 'Coeff': coeff1}
coeffdf= pd.DataFrame(data=d)
coeffdf

In [170]:
f8, ax = plt.subplots(figsize=(15, 6))
ax = sns.barplot(x="Features", y="Coeff", data=coeffdf)
ax.set_xticklabels(ax.get_xticklabels(), rotation = 90)
plt.title("Logistic regression coefficients")
plt.tight_layout()
display(f8)

## Random Forest

In [172]:
rf=classification.RandomForestClassifier(labelCol='action_taken', featuresCol='features',maxDepth=8,numTrees=200)
pipe_rf= Pipeline(stages=[feature.VectorAssembler(inputCols=cols1,
                                        outputCol='features'),rf])

In [173]:
pipe_rf_fitted=pipe_rf.fit(training_df_hmda_1)

In [174]:
binary_evaluation(pipe_rf,pipe_rf_fitted,test_df_hmda_1)

In [175]:
accuracy(pipe_rf_fitted.transform(test_df_hmda_1))

In [176]:
display(pipe_rf_fitted.transform(test_df_hmda_1))

In [177]:
display(training_df_hmda_1)

In [178]:
d = {'agency_code': 3, 'applicant_ethnicity2': 2,'applicant_race_1': 3, 'applicant_sex': 2 ,'co_applicant_ethnicity':2,'co_applicant_race_1':4 ,'co_applicant_sex':1 \
   ,'hud_median_family_income':70,'lien_status':4, 'loan_amount_000s':10,'loan_purpose':1,'loan_type':3,'minority_population':5,'msamd':15380, 'preapproval':3,'state_code':36,'tract_to_msamd_income':80 }

In [179]:
# Hand-written single application used to try out the fitted models.
# 'purchaser_type' used to appear twice in this literal (0, then 2); Python keeps
# the last value, so the single remaining entry carries 2 at the position of the
# first occurrence, leaving the resulting dict unchanged.
d2= {
    'respondent_id': u'0000451965',
    'loan_purpose': 1,
    'minority_population': 18878.770000457763672,
    'tract_to_msamd_income': 138.44000244140625,
    'as_of_year': 2015,
    'co_applicant_ethnicity': 2,
    'county_code': 64.0,
    'property_type': 1,
    'purchaser_type': 2,
    'agency_code': 4,
    'hoepa_status': 2,
    'applicant_ethnicity': 2,
    'number_of_1_to_4_family_units': 249.0,
    'msamd': 30824.0,
    'co_applicant_sex': 2,
    'loan_amount_000s': 10,
    'preapproval': 1,
    'co_applicant_race_1': 5,
    'loan_type': 3,
    'state_code': 36,
    'number_of_owner_occupied_units': 2077.0,
    'population': 7442.0,
    'owner_occupancy': 1,
    'application_date_indicator': 0,
    'census_tract_number': 319.0,
    'lien_status': 2,
    'applicant_sex': 1,
    'hud_median_family_income': 100.0,
    'applicant_income_000s': 760.0,
    'applicant_race_1': 5,
    'sequence_number': 155,
}

In [180]:
test=pd.DataFrame(d2, index=[0])

In [181]:
test22=spark.createDataFrame(test)

## Trying to predict

In [183]:
pred22=pipe_rf_fitted.transform(test22)

In [184]:
display(pred22)

In [186]:
#hmda_imp1=hmda_imp.iloc[7]

In [187]:
#print(hmda_imp1.to_dict())

In [188]:
# The metrics class is imported here under the alias used below; with the import
# commented out, `metric(...)` raised NameError.
from pyspark.mllib.evaluation import BinaryClassificationMetrics as metric
results = pipe_rf_fitted.transform(test_df_hmda_1).select(['probability', 'action_taken'])

## prepare score-label set
# Each pair is (first entry of the probability vector, 1 - label): the score and
# the label are flipped together.
results_collect = results.collect()
results_list = [(float(i[0][0]), 1.0-float(i[1])) for i in results_collect]
scoreAndLabels = sc.parallelize(results_list)

metrics = metric(scoreAndLabels)
print("The ROC score is (@numTrees=200): ", metrics.areaUnderROC)


## Gradient Boosting

In [190]:
from pyspark.ml.classification import GBTClassifier
gbt = GBTClassifier(labelCol="action_taken", featuresCol="features", maxIter=10)
pipe_gbt= Pipeline(stages=[feature.VectorAssembler(inputCols=cols1,
                                        outputCol='features'),gbt])

In [191]:
pipe_gbt_fitted=pipe_gbt.fit(training_df_hmda_1)

In [192]:
evaluator.evaluate(pipe_gbt_fitted.transform(test_df_hmda_1))

In [193]:
accuracy(pipe_gbt_fitted.transform(test_df_hmda_1))

## Try to predict

In [195]:
display(pipe_gbt_fitted.transform(test22))

In [197]:
# `trans` was never defined anywhere in the notebook, so this cell raised
# NameError. NOTE(review): it is assumed to be the gradient-boosting prediction
# for the hand-written test22 row shown in the previous cell; confirm.
trans = pipe_gbt_fitted.transform(test22)
display(trans)