#### Import packages and start Spark Session

In [23]:
from pyspark.sql import SparkSession
from pyspark import SparkFiles
from pyspark.sql.functions import *
import pandas as pd 
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler, Imputer, StandardScaler
from pyspark.ml import Pipeline
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
spark = SparkSession.builder.appName('random_forest_spark').getOrCreate()

#### Get External dataset

In [2]:
url = "https://storage.googleapis.com/bdt-spark-store/external_sources.csv"
spark.sparkContext.addFile(url)
df_ext = spark.read.csv("file:///"+SparkFiles.get("external_sources.csv"), header=True, inferSchema= True)
print((df_ext.count(), len(df_ext.columns)))

(307511, 4)


#### Get Data dataset

In [3]:
url = "https://storage.googleapis.com/bdt-spark-store/internal_data.csv"
spark.sparkContext.addFile(url)
df_data  = spark.read.csv("file:///"+SparkFiles.get("internal_data.csv"), header=True, inferSchema= True)
print((df_data.count(), len(df_data.columns)))

(307511, 119)


#### Join datasets

In [102]:
df_full = df_data.join(df_ext, on='SK_ID_CURR', how='inner')
print((df_full.count(), len(df_full.columns)))

(307511, 122)


#### Filter for columns required

In [103]:
columns_extract = ['EXT_SOURCE_1', 'EXT_SOURCE_2', 'EXT_SOURCE_3',
                  'DAYS_BIRTH', 'DAYS_EMPLOYED', 'NAME_EDUCATION_TYPE',
                  'DAYS_ID_PUBLISH', 'CODE_GENDER', 'AMT_ANNUITY',
                  'DAYS_REGISTRATION', 'AMT_GOODS_PRICE', 'AMT_CREDIT',
                  'ORGANIZATION_TYPE', 'DAYS_LAST_PHONE_CHANGE',
                  'NAME_INCOME_TYPE', 'AMT_INCOME_TOTAL', 'OWN_CAR_AGE', 'TARGET']
df = df_full[columns_extract]

#### One-hot Encoding Categorical Variables

In [134]:
categorical_variables = ['NAME_EDUCATION_TYPE', 'CODE_GENDER', 'ORGANIZATION_TYPE', 'NAME_INCOME_TYPE']
indexers = [StringIndexer(inputCol=column, outputCol=column+"-index") for column in categorical_variables]
encoder = OneHotEncoder(
    inputCols=[indexer.getOutputCol() for indexer in indexers],
    outputCols=["{0}-encoded".format(indexer.getOutputCol()) for indexer in indexers]
)

assembler = VectorAssembler(
    inputCols=encoder.getOutputCols(),
    outputCol="categorical-features"
)

pipeline = Pipeline(stages=indexers + [encoder, assembler])
df_oh = pipeline.fit(df).transform(df)

#### Split into test and train

In [105]:
train, test = df_oh.randomSplit([0.8, 0.2], seed=101)
print('Training data shape: ', (train.count(), len(train.columns)))
print('Testing data shape: ', (test.count(), len(test.columns)))

Training data shape:  (246051, 27)
Testing data shape:  (61460, 27)


#### Calculate target variable proportions in each set

In [106]:
train_prop = train.groupBy('TARGET').count()
test_prop = test.groupBy('TARGET').count()
print(train_prop.withColumn('train_proportion', train_prop['count'] / train.count()).show())
print(test_prop.withColumn('test_proportion', test_prop['count'] / test.count()).show())

+------+------+-------------------+
|TARGET| count|   train_proportion|
+------+------+-------------------+
|     1| 19861|0.08071903792303221|
|     0|226190| 0.9192809620769677|
+------+------+-------------------+

None
+------+-----+-------------------+
|TARGET|count|    test_proportion|
+------+-----+-------------------+
|     1| 4964|0.08076797917344615|
|     0|56496| 0.9192320208265539|
+------+-----+-------------------+

None


#### Impute missing values

In [107]:
imputer = Imputer()

imputer.setInputCols(['EXT_SOURCE_1', 'EXT_SOURCE_2', 'EXT_SOURCE_3', 'AMT_ANNUITY', 'AMT_GOODS_PRICE', 'DAYS_LAST_PHONE_CHANGE', 'OWN_CAR_AGE'])

imputer.setOutputCols(['out_EXT_SOURCE_1', 'out_EXT_SOURCE_2', 'out_EXT_SOURCE_3', 'out_AMT_ANNUITY', 'out_AMT_GOODS_PRICE', 'out_DAYS_LAST_PHONE_CHANGE', 'out_OWN_CAR_AGE'])

imputer.getRelativeError()

imputer.setStrategy('median')

model = imputer.fit(train)
model.setInputCols(['EXT_SOURCE_1', 'EXT_SOURCE_2', 'EXT_SOURCE_3', 'AMT_ANNUITY', 'AMT_GOODS_PRICE', 'DAYS_LAST_PHONE_CHANGE', 'OWN_CAR_AGE'])
train_filled = model.transform(train)

model = imputer.fit(test)
model.setInputCols(['EXT_SOURCE_1', 'EXT_SOURCE_2', 'EXT_SOURCE_3', 'AMT_ANNUITY', 'AMT_GOODS_PRICE', 'DAYS_LAST_PHONE_CHANGE', 'OWN_CAR_AGE'])
test_filled = model.transform(test)

#### Drop redundant columns

In [108]:
columns_to_drop = ['EXT_SOURCE_1', 'EXT_SOURCE_2', 'EXT_SOURCE_3', 'AMT_ANNUITY', 'AMT_GOODS_PRICE', 'DAYS_LAST_PHONE_CHANGE', 'OWN_CAR_AGE']
train_filled_dropped = train_filled.drop(*columns_to_drop)
test_filled_dropped = test_filled.drop(*columns_to_drop)

In [109]:
continuous_variables = ['DAYS_BIRTH', 'DAYS_EMPLOYED', 'DAYS_ID_PUBLISH', 'DAYS_REGISTRATION',
                        'AMT_CREDIT', 'AMT_INCOME_TOTAL', 'TARGET', 'out_AMT_ANNUITY',
                        'out_DAYS_LAST_PHONE_CHANGE', 'out_EXT_SOURCE_1', 'out_OWN_CAR_AGE',
                        'out_EXT_SOURCE_2', 'out_AMT_GOODS_PRICE', 'out_EXT_SOURCE_3']

assembler = VectorAssembler(inputCols=[*continuous_variables], outputCol='continuous_features')

train_filled_dropped = assembler.transform(train_filled_dropped)
test_filled_dropped = assembler.transform(test_filled_dropped)

#### Scale numerical variables

In [110]:
scaler = StandardScaler(inputCol='continuous_features', outputCol='scaled_continuous_features')

scaler_model = scaler.fit(train_filled_dropped)
train_final_scaled = scaler_model.transform(train_filled_dropped)

scaler_model = scaler.fit(test_filled_dropped)
test_final_scaled = scaler_model.transform(test_filled_dropped)

#### Combine categorical vector with continuous variables

In [111]:
assembler = VectorAssembler(inputCols=['categorical-features', 'continuous_features'], outputCol='features')

var_names = assembler
train_final = assembler.transform(train_final_scaled)
test_final = assembler.transform(test_final_scaled)

#### Train Random Forest Classifier

In [139]:
from pyspark.ml.classification import RandomForestClassifier

rf = RandomForestClassifier(labelCol="TARGET", featuresCol="features", numTrees=100, seed=50, impurity='gini')

model = rf.fit(train_final)

#### Make predictions

In [140]:
predictions = model.transform(test_final)

#### Model Evaluation

In [114]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

evaluator = ulticlassClassificationEvaluator(labelCol="TARGET", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
# acc = evaluator.evaluate(predictionAndTarget, {evaluatorMulti.metricName: "accuracy"})
print("Accuracy = %g" % (accuracy))

Accuracy = 0.963651


#### Calculate feature importances

In [119]:
print(model.featureImportances)

(84,[0,1,2,3,4,5,6,7,8,9,10,12,13,14,15,16,18,19,20,21,22,25,27,28,30,32,33,34,35,37,38,40,41,42,45,49,52,53,57,58,59,62,63,64,65,66,67,69,70,71,72,73,74,75,76,77,78,79,80,81,82,83],[0.0014837734904038855,0.003965600742873514,6.6428243489286556e-06,5.169548769134106e-06,0.00541254103793424,0.0015421031043257598,2.2697431107516686e-05,0.0006918241034137973,5.717870628748703e-05,1.0317759049543155e-05,0.0002515782990983191,1.9603061927268928e-05,9.73355533651983e-05,4.315486706163916e-06,7.1796237686768594e-06,2.486547883743997e-05,1.5728814025436644e-06,4.750877017655322e-05,3.888380461296613e-05,5.679880321673615e-06,2.814413002235665e-05,7.090293593677337e-06,1.842839977285682e-05,6.447604116689151e-06,9.823526484480058e-06,6.625462502996261e-06,8.237518074385378e-06,2.4547770570924784e-05,1.026811354010322e-05,4.25454849148211e-05,1.2472352066861696e-05,1.379770475713538e-05,3.111562013481721e-05,3.736865482333044e-06,4.758518560535429e-06,2.4816574512289112e-05,2.4058726655644958e-0