In [0]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import StructField,StructType,StringType,DoubleType,IntegerType
from pyspark.ml.feature import StringIndexer, VectorAssembler, Imputer, MinMaxScaler, OneHotEncoder
from pyspark.ml.classification import LogisticRegression, RandomForestClassifier, DecisionTreeClassifier, GBTClassifier
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

In [0]:
spark = SparkSession.Builder().appName('titanic').getOrCreate()

In [0]:
df = spark.read.csv('dbfs:/FileStore/tables/titanic.csv', header=True, inferSchema=True)

In [0]:
df = df.withColumn("alone_cast", df["alone"].cast(StringType()))
df = df.withColumn("adult_male_cast", df["adult_male"].cast(StringType()))

In [0]:
df.show(5)

+--------+------+------+----+-----+-----+-------+--------+-----+-----+----------+----+-----------+-----+-----+----------+---------------+
|survived|pclass|   sex| age|sibsp|parch|   fare|embarked|class|  who|adult_male|deck|embark_town|alive|alone|alone_cast|adult_male_cast|
+--------+------+------+----+-----+-----+-------+--------+-----+-----+----------+----+-----------+-----+-----+----------+---------------+
|       0|     3|  male|22.0|    1|    0|   7.25|       S|Third|  man|      true|null|Southampton|   no|false|     false|           true|
|       1|     1|female|38.0|    1|    0|71.2833|       C|First|woman|     false|   C|  Cherbourg|  yes|false|     false|          false|
|       1|     3|female|26.0|    0|    0|  7.925|       S|Third|woman|     false|null|Southampton|  yes| true|      true|          false|
|       1|     1|female|35.0|    1|    0|   53.1|       S|First|woman|     false|   C|Southampton|  yes|false|     false|          false|
|       0|     3|  male|35.0|    0

In [0]:
df.printSchema()

root
 |-- survived: integer (nullable = true)
 |-- pclass: integer (nullable = true)
 |-- sex: string (nullable = true)
 |-- age: double (nullable = true)
 |-- sibsp: integer (nullable = true)
 |-- parch: integer (nullable = true)
 |-- fare: double (nullable = true)
 |-- embarked: string (nullable = true)
 |-- class: string (nullable = true)
 |-- who: string (nullable = true)
 |-- adult_male: boolean (nullable = true)
 |-- deck: string (nullable = true)
 |-- embark_town: string (nullable = true)
 |-- alive: string (nullable = true)
 |-- alone: boolean (nullable = true)
 |-- alone_cast: string (nullable = true)
 |-- adult_male_cast: string (nullable = true)



In [0]:
for feature in df.columns:
    print(feature,df.where(df[feature].isNull()).count())
    print()

survived 0

pclass 0

sex 0

age 177

sibsp 0

parch 0

fare 0

embarked 2

class 0

who 0

adult_male 0

deck 688

embark_town 2

alive 0

alone 0

alone_cast 0

adult_male_cast 0



In [0]:
df.where(df['deck'].isNull()).count()/ df.count() * 100

Out[8]: 77.21661054994388

In [0]:
df.show(5)

+--------+------+------+----+-----+-----+-------+--------+-----+-----+----------+----+-----------+-----+-----+----------+---------------+
|survived|pclass|   sex| age|sibsp|parch|   fare|embarked|class|  who|adult_male|deck|embark_town|alive|alone|alone_cast|adult_male_cast|
+--------+------+------+----+-----+-----+-------+--------+-----+-----+----------+----+-----------+-----+-----+----------+---------------+
|       0|     3|  male|22.0|    1|    0|   7.25|       S|Third|  man|      true|null|Southampton|   no|false|     false|           true|
|       1|     1|female|38.0|    1|    0|71.2833|       C|First|woman|     false|   C|  Cherbourg|  yes|false|     false|          false|
|       1|     3|female|26.0|    0|    0|  7.925|       S|Third|woman|     false|null|Southampton|  yes| true|      true|          false|
|       1|     1|female|35.0|    1|    0|   53.1|       S|First|woman|     false|   C|Southampton|  yes|false|     false|          false|
|       0|     3|  male|35.0|    0

In [0]:
df.select('class').distinct().show()

+------+
| class|
+------+
| First|
|Second|
| Third|
+------+



In [0]:
class_dict = {'First':1,'Second':2,'Third':3}
ordinal_encoder = F.udf(lambda value: class_dict[value])
df = df.withColumn('class_encod',ordinal_encoder(F.col('class')))

In [0]:
features_to_drop = ['who','deck','embark_town','alive','class','embarked','alone','adult_male']
df = df.drop(*features_to_drop)

In [0]:
df.show(5)

+--------+------+------+----+-----+-----+-------+----------+---------------+-----------+
|survived|pclass|   sex| age|sibsp|parch|   fare|alone_cast|adult_male_cast|class_encod|
+--------+------+------+----+-----+-----+-------+----------+---------------+-----------+
|       0|     3|  male|22.0|    1|    0|   7.25|     false|           true|          3|
|       1|     1|female|38.0|    1|    0|71.2833|     false|          false|          1|
|       1|     3|female|26.0|    0|    0|  7.925|      true|          false|          3|
|       1|     1|female|35.0|    1|    0|   53.1|     false|          false|          1|
|       0|     3|  male|35.0|    0|    0|   8.05|      true|           true|          3|
+--------+------+------+----+-----+-----+-------+----------+---------------+-----------+
only showing top 5 rows



In [0]:
df.describe().show()

+-------+-------------------+------------------+------+------------------+------------------+-------------------+-----------------+----------+---------------+------------------+
|summary|           survived|            pclass|   sex|               age|             sibsp|              parch|             fare|alone_cast|adult_male_cast|       class_encod|
+-------+-------------------+------------------+------+------------------+------------------+-------------------+-----------------+----------+---------------+------------------+
|  count|                891|               891|   891|               714|               891|                891|              891|       891|            891|               891|
|   mean| 0.3838383838383838| 2.308641975308642|  null| 29.69911764705882|0.5230078563411896|0.38159371492704824| 32.2042079685746|      null|           null| 2.308641975308642|
| stddev|0.48659245426485753|0.8360712409770491|  null|14.526497332334035|1.1027434322934315| 0.80605722112994

In [0]:
# 'sex','alone','adult_male','embarked'
sex_indexer = StringIndexer(inputCol='sex',outputCol='sex_index')
sex_encoder = OneHotEncoder(inputCol='sex_index',outputCol='sex_vec')

alone_indexer = StringIndexer(inputCol='alone_cast',outputCol='alone_index')
alone_encoder = OneHotEncoder(inputCol='alone_index',outputCol='alone_vec')

adult_male_indexer = StringIndexer(inputCol='adult_male_cast',outputCol='adult_male_index')
adult_male_encoder = OneHotEncoder(inputCol='adult_male_index',outputCol='adult_male_vec')

In [0]:
data = sex_indexer.fit(df).transform(df)
sex_encoder.fit(data).transform(data).show(5)
del data

+--------+------+------+----+-----+-----+-------+----------+---------------+-----------+---------+-------------+
|survived|pclass|   sex| age|sibsp|parch|   fare|alone_cast|adult_male_cast|class_encod|sex_index|      sex_vec|
+--------+------+------+----+-----+-----+-------+----------+---------------+-----------+---------+-------------+
|       0|     3|  male|22.0|    1|    0|   7.25|     false|           true|          3|      0.0|(1,[0],[1.0])|
|       1|     1|female|38.0|    1|    0|71.2833|     false|          false|          1|      1.0|    (1,[],[])|
|       1|     3|female|26.0|    0|    0|  7.925|      true|          false|          3|      1.0|    (1,[],[])|
|       1|     1|female|35.0|    1|    0|   53.1|     false|          false|          1|      1.0|    (1,[],[])|
|       0|     3|  male|35.0|    0|    0|   8.05|      true|           true|          3|      0.0|(1,[0],[1.0])|
+--------+------+------+----+-----+-----+-------+----------+---------------+-----------+--------

In [0]:
age_imp = Imputer(inputCol='age', outputCol='age_imp').setStrategy('median')

assembler = VectorAssembler(inputCols=[
    'pclass',
    'sex_vec',
    'age_imp',
    'sibsp',
    'parch',
    'fare',
    'adult_male_vec',
    'alone_vec',
],outputCol='features')

scaler = MinMaxScaler(inputCol="features", outputCol="scaled_features")

log_reg_titanic = LogisticRegression(featuresCol='scaled_features',labelCol='survived')

In [0]:
Log_pipe = Pipeline(stages=[age_imp,
                 sex_indexer,
                 sex_encoder,
                 alone_indexer,
                 alone_encoder,
                 adult_male_indexer,
                 adult_male_encoder,
                 assembler,
                 scaler,
                 log_reg_titanic
                 ])

In [0]:
train, test = df.randomSplit([0.7,0.3])

In [0]:
Log_model_pipe = Log_pipe.fit(train)

In [0]:
train_results = Log_model_pipe.transform(train)
test_results = Log_model_pipe.transform(test)

In [0]:
my_eval = BinaryClassificationEvaluator(rawPredictionCol='prediction',
                                       labelCol='survived')
Train_AUC = my_eval.evaluate(train_results)
Test_AUC = my_eval.evaluate(test_results)

Out[145]: 0.7987393633785062

In [0]:
Train_AUC,Test_AUC

Out[146]: (0.8096424104966402, 0.7987393633785062)

In [0]:
rfc = RandomForestClassifier(featuresCol='scaled_features',labelCol='survived')

rfc_pipe = Pipeline(stages=[age_imp,
                sex_indexer,
                 sex_encoder,
                 alone_indexer,
                 alone_encoder,
                 adult_male_indexer,
                 adult_male_encoder,
                 assembler,
                 scaler,
                 rfc
                 ])

rfc_model_pipe = rfc_pipe.fit(train)

train_results = rfc_model_pipe.transform(train)
test_results = rfc_model_pipe.transform(test)

my_eval = BinaryClassificationEvaluator(rawPredictionCol='prediction',
                                       labelCol='survived')
Train_AUC = my_eval.evaluate(train_results)
Test_AUC = my_eval.evaluate(test_results)

Train_AUC,Test_AUC

Out[148]: (0.8272516268520678, 0.7919319256224393)

In [0]:
dtc = DecisionTreeClassifier(featuresCol='scaled_features',labelCol='survived')

dtc_pipe = Pipeline(stages=[age_imp,
                sex_indexer,
                 sex_encoder,
                 alone_indexer,
                 alone_encoder,
                 adult_male_indexer,
                 adult_male_encoder,
                 assembler,
                 scaler,
                 dtc
                 ])

dtc_model_pipe = dtc_pipe.fit(train)

train_results = dtc_model_pipe.transform(train)
test_results = dtc_model_pipe.transform(test)

my_eval = BinaryClassificationEvaluator(rawPredictionCol='prediction',
                                       labelCol='survived')
Train_AUC = my_eval.evaluate(train_results)
Test_AUC = my_eval.evaluate(test_results)

Train_AUC,Test_AUC

Out[150]: (0.8459577760349323, 0.7920264733690513)

In [0]:
gbtc = GBTClassifier(featuresCol='scaled_features',labelCol='survived')

gbtc_pipe = Pipeline(stages=[age_imp,
                sex_indexer,
                 sex_encoder,
                 alone_indexer,
                 alone_encoder,
                 adult_male_indexer,
                 adult_male_encoder,
                 assembler,
                 scaler,
                 gbtc
                 ])

gbtc_model_pipe = gbtc_pipe.fit(train)

train_results = gbtc_model_pipe.transform(train)
test_results = gbtc_model_pipe.transform(test)

my_eval = BinaryClassificationEvaluator(rawPredictionCol='prediction',
                                       labelCol='survived')
Train_AUC = my_eval.evaluate(train_results)
Test_AUC = my_eval.evaluate(test_results)

Train_AUC,Test_AUC

Out[151]: (0.8928503296097675, 0.769429561928774)

# Hyperparameter Tuning

## Logistic Regressor

In [0]:
params = ParamGridBuilder()\
    .addGrid(log_reg_titanic.regParam,[0.1,0.01,0.001])\
        .addGrid(log_reg_titanic.elasticNetParam,[0.1,0.01,0.001]).build()

crossval = CrossValidator(estimator=Log_pipe, estimatorParamMaps=params, evaluator=BinaryClassificationEvaluator(rawPredictionCol='prediction',labelCol='survived'))        

cvModel = crossval.fit(train)

In [0]:
cvModel.estimatorParamMaps

Out[60]: Param(parent='CrossValidatorModel_0a9eb3f4552f', name='estimatorParamMaps', doc='estimator param maps')

In [0]:
bestModel = cvModel.bestModel

bestModel.stages[-1].getRegParam(),bestModel.stages[-1].getElasticNetParam()

Out[67]: (0.01, 0.1)

In [0]:
result = bestModel.transform(test)

In [0]:
my_eval = BinaryClassificationEvaluator(rawPredictionCol='prediction',
                                       labelCol='survived')
my_eval.evaluate(result)

Out[71]: 0.7775250227479528

## Random Forest

In [0]:
rfc = RandomForestClassifier(featuresCol='scaled_features',labelCol='survived')

rfc_pipe = Pipeline(stages=[age_imp,
                sex_indexer,
                 sex_encoder,
                 alone_indexer,
                 alone_encoder,
                 adult_male_indexer,
                 adult_male_encoder,
                 assembler,
                 scaler,
                 rfc
                 ])

params = ParamGridBuilder()\
    .addGrid(rfc.maxDepth,[5,6,7,8])\
        .addGrid(rfc.numTrees,[30,50,70,100])\
            .addGrid(rfc.subsamplingRate,[0.20,0.30,0.50])\
                .addGrid(rfc.featureSubsetStrategy,['sqrt','log2'])\
                    .build()

crossval = CrossValidator(estimator=rfc_pipe, estimatorParamMaps=params, evaluator=BinaryClassificationEvaluator(rawPredictionCol='prediction',labelCol='survived'))        

cvModel = crossval.fit(train)

In [0]:
rdc_model = cvModel.bestModel

model = rdc_model.stages[-1]

print(f'MaxDepth : {model.getMaxDepth()}\n',
      f'NumTrees : {model.getNumTrees}\n',
      f'subsamplingRate : {model.getSubsamplingRate()}\n',
      f'featureSubsetStrategy : {model.getFeatureSubsetStrategy()}'
      )

MaxDepth : 7
 NumTrees : 100
 subsamplingRate : 0.5
 featureSubsetStrategy : sqrt


In [0]:
result = rdc_model.transform(test)

In [0]:
my_eval = BinaryClassificationEvaluator(rawPredictionCol='prediction',
                                       labelCol='survived')
my_eval.evaluate(result)

Out[82]: 0.7648468304519261