### Mount Google Drive

In [1]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


### Import Libraries

In [2]:
!pip install pyspark



In [3]:
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.types import DoubleType
from pyspark.sql.functions import *
from pyspark.sql.window import Window
spark = SparkSession.builder.appName("Titanic").getOrCreate()

In [4]:
pd_df = pd.read_excel('/content/drive/MyDrive/Phutthabut_test/titanic 1.xlsx')

In [5]:
pyspark_df = spark.createDataFrame(pd_df)

In [6]:
pyspark_df.limit(5).show()

+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|          Ticket|   Fare|Cabin|Embarked|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|          1|       0|     3|Braund, Mr. Owen ...|  male|22.0|    1|    0|       A/5 21171|   7.25|  NaN|       S|
|          2|       1|     1|Cumings, Mrs. Joh...|female|38.0|    1|    0|        PC 17599|71.2833|  C85|       C|
|          3|       1|     3|Heikkinen, Miss. ...|female|26.0|    0|    0|STON/O2. 3101282|  7.925|  NaN|       S|
|          4|       1|     1|Futrelle, Mrs. Ja...|female|35.0|    1|    0|          113803|   53.1| C123|       S|
|          5|       0|     3|Allen, Mr. Willia...|  male|35.0|    0|    0|          373450|   8.05|  NaN|       S|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+

In [7]:
pyspark_df.printSchema()

root
 |-- PassengerId: long (nullable = true)
 |-- Survived: long (nullable = true)
 |-- Pclass: long (nullable = true)
 |-- Name: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- SibSp: long (nullable = true)
 |-- Parch: long (nullable = true)
 |-- Ticket: string (nullable = true)
 |-- Fare: double (nullable = true)
 |-- Cabin: string (nullable = true)
 |-- Embarked: string (nullable = true)



In [8]:
# Cast the long columns to integer (Age and Fare are already double, so those two casts change nothing)
pyspark_df = pyspark_df.withColumn("PassengerId", col("PassengerId").cast("integer")) \
       .withColumn("Survived", col("Survived").cast("integer")) \
       .withColumn("Pclass", col("Pclass").cast("integer")) \
       .withColumn("SibSp", col("SibSp").cast("integer")) \
       .withColumn("Parch", col("Parch").cast("integer")) \
       .withColumn("Age", col("Age").cast("double")) \
       .withColumn("Fare", col("Fare").cast("double"))

In [9]:
print(f'rows : {pyspark_df.count()}')
print(f'columns : {len(pyspark_df.columns)}')

rows : 891
columns : 12


## Exploratory Data Analysis

In [10]:
pyspark_df.groupBy('Survived').count().show()

+--------+-----+
|Survived|count|
+--------+-----+
|       1|  342|
|       0|  549|
+--------+-----+



This dataset contains 891 passengers, of whom only 342 (about 38%) survived.

In [11]:
pyspark_df.select('Age', 'Fare').summary().show()

+-------+----+------------------+
|summary| Age|              Fare|
+-------+----+------------------+
|  count| 891|               891|
|   mean| NaN|32.204207968574615|
| stddev| NaN| 49.69342859718091|
|    min|0.42|               0.0|
|    25%|22.0|            7.8958|
|    50%|32.0|           14.4542|
|    75%|54.0|              31.0|
|    max| NaN|          512.3292|
+-------+----+------------------+



In [12]:
pyspark_df.groupBy('Survived').pivot('Sex').count().show()

+--------+------+----+
|Survived|female|male|
+--------+------+----+
|       1|   233| 109|
|       0|    81| 468|
+--------+------+----+



In [13]:
pyspark_df.groupBy('Survived').pivot('Pclass').count().show()

+--------+---+---+---+
|Survived|  1|  2|  3|
+--------+---+---+---+
|       1|136| 87|119|
|       0| 80| 97|372|
+--------+---+---+---+



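Women survived at a far higher rate than men (233 of 314, about 74%, versus 109 of 577, about 19%). Similarly, first-class passengers were more likely to survive than second-class passengers (136 of 216, about 63%, versus 87 of 184, about 47%), while third-class passengers fared much worse (119 of 491, about 24%).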
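The survival rates behind these comparisons come straight from the pivot counts: each group's survivors divided by its total. A minimal plain-Python sketch (not part of the original notebook; `survival_rates` is a hypothetical helper) using the counts printed by the Sex and Pclass pivots:

```python
def survival_rates(survived, died):
    """Return {group: fraction survived} from the two rows of a Survived pivot.

    `survived` and `died` map each group label to its count in the
    Survived = 1 and Survived = 0 rows respectively.
    """
    rates = {}
    for group in survived:
        total = survived[group] + died.get(group, 0)
        rates[group] = survived[group] / total
    return rates


# Counts copied from the pivot outputs above.
by_sex = survival_rates({"female": 233, "male": 109}, {"female": 81, "male": 468})
by_class = survival_rates({1: 136, 2: 87, 3: 119}, {1: 80, 2: 97, 3: 372})

for name, rates in (("Sex", by_sex), ("Pclass", by_class)):
    for group, rate in rates.items():
        print(f"{name}={group}: {rate:.1%} survived")
```

Inside Spark, the same numbers should come from something like `pyspark_df.groupBy('Pclass').agg(avg('Survived')).show()`, since the mean of a 0/1 label is exactly the survival rate.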

In [14]:
pyspark_df.groupBy('Survived').pivot('SibSp').count().show()

+--------+---+---+---+---+---+----+----+
|Survived|  0|  1|  2|  3|  4|   5|   8|
+--------+---+---+---+---+---+----+----+
|       1|210|112| 13|  4|  3|NULL|NULL|
|       0|398| 97| 15| 12| 15|   5|   7|
+--------+---+---+---+---+---+----+----+



In [15]:
pyspark_df.groupBy('Survived').pivot('Embarked').count().show()

+--------+---+----+---+---+
|Survived|  C| NaN|  Q|  S|
+--------+---+----+---+---+
|       1| 93|   2| 30|217|
|       0| 75|NULL| 47|427|
+--------+---+----+---+---+



## Feature Engineering and Handling Missing Values

In [16]:
pyspark_df.filter(col("Age") == "NaN").count()

177

In [17]:
for column in pyspark_df.columns:
    print(f'{column} : {pyspark_df.filter(col(column) == "NaN").count()}')

PassengerId : 0
Survived : 0
Pclass : 0
Name : 0
Sex : 0
Age : 177
SibSp : 0
Parch : 0
Ticket : 0
Fare : 0
Cabin : 687
Embarked : 2


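The missing values are not real nulls: they are stored as NaN placeholders (a floating-point NaN in Age, the literal string "NaN" in Cabin and Embarked).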
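Why one string comparison catches both kinds of placeholder: in Spark SQL, `NaN = NaN` evaluates to true, so `col("Age") == "NaN"` matches the floating-point NaNs in Age, while in the string columns it matches the literal text "NaN". Spark also sorts NaN above every other number, which is why the Age summary earlier showed a NaN mean, stddev, and max, with its percentiles pushed upward. Plain Python follows IEEE semantics instead (`float('nan') == float('nan')` is `False`), so the sketch below, which mirrors the cleanup done in the next cell on a single hypothetical row, needs `math.isnan`; `to_null` is a hypothetical helper.

```python
import math


def to_null(value):
    """Return None for NaN placeholders, otherwise the value unchanged.

    Handles both forms seen in this dataset: a float NaN (Age) and the
    literal string "NaN" (Cabin, Embarked).
    """
    if isinstance(value, float) and math.isnan(value):
        return None
    if value == "NaN":
        return None
    return value


row = {"Age": float("nan"), "Cabin": "NaN", "Embarked": "S", "Fare": 7.25}
clean = {key: to_null(val) for key, val in row.items()}
print(clean)  # {'Age': None, 'Cabin': None, 'Embarked': 'S', 'Fare': 7.25}
```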

In [18]:
# Replace the "NaN" placeholders with real nulls (None)
for column in pyspark_df.columns:
    pyspark_df = pyspark_df.withColumn(column, when(col(column) == 'NaN', None).otherwise(col(column)))

In [19]:
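# Use a loop variable other than `col` so the imported pyspark col() function is not shadowed
for column_name in pyspark_df.columns:
    print(f'{column_name} : {pyspark_df.filter(pyspark_df[column_name].isNull()).count()}')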

PassengerId : 0
Survived : 0
Pclass : 0
Name : 0
Sex : 0
Age : 177
SibSp : 0
Parch : 0
Ticket : 0
Fare : 0
Cabin : 687
Embarked : 2


The Cabin feature is missing 687 of its 891 values (about 77%), so I decided to drop it.

In [20]:
pyspark_df.select('Fare').summary('mean', '50%', 'max').show()

+-------+------------------+
|summary|              Fare|
+-------+------------------+
|   mean|32.204207968574615|
|    50%|           14.4542|
|    max|          512.3292|
+-------+------------------+



In [21]:
pyspark_df.groupBy('Embarked').count().orderBy('count', ascending=False).show(1)

+--------+-----+
|Embarked|count|
+--------+-----+
|       S|  644|
+--------+-----+
only showing top 1 row



The most frequent value in Embarked is 'S' (644 passengers).

In [22]:
# Fill nulls in Embarked with its most frequent value ('S') and in Fare with its median (about 14.45).
# Fare has no missing values in this data, so the Fare fill is only a safeguard.
pyspark_df = pyspark_df.fillna({'Embarked': 'S', 'Fare':14.45})
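The two fill values follow a common rule of thumb: the most frequent value for a categorical column, and the median for a skewed numeric column. Fare's mean (32.20) is more than twice its median (14.45) and its maximum is 512.33, so the median is the more representative fill. A small plain-Python sketch of both choices; the sample below is only a handful of values taken from the rows shown earlier plus the Fare maximum, not the full column, and `mode_of` is a hypothetical helper.

```python
from collections import Counter
from statistics import mean, median


def mode_of(values):
    """Most frequent non-null value (ties go to the value seen first)."""
    return Counter(v for v in values if v is not None).most_common(1)[0][0]


# Illustrative sample: Fare values from the first rows shown above plus the
# column maximum (512.3292); not the full column.
fares = [7.25, 7.925, 8.05, 53.1, 512.3292]
embarked = ["S", "C", "S", "S", "S", None]

print(f"mean fare   = {mean(fares):.2f}")    # one very large fare drags the mean up
print(f"median fare = {median(fares):.2f}")  # the median stays near typical fares
print(f"mode of Embarked = {mode_of(embarked)}")
```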

For the missing values in Age, I impute the average age of passengers who share the same title, because titles carry age information (for example, women titled Mrs tend to be older than those titled Miss).
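The first step is pulling the title out of Name. `regexp_extract` with the pattern `([A-Za-z]+)\.` returns the first run of letters immediately followed by a period, which in this dataset's "Last, Title. First" name format is the title; if nothing matches, Spark returns an empty string rather than null. Python's `re.search` also returns the first match, so the pattern can be checked locally without Spark. The names below are made up, and `extract_title` is a hypothetical helper; the pattern is written as a raw string so `\.` reaches the regex engine unchanged.

```python
import re

# Same pattern as the regexp_extract call: letters followed by a literal period.
TITLE_PATTERN = re.compile(r"([A-Za-z]+)\.")


def extract_title(name):
    """Return the first 'Word.' token in a name, or '' when there is none
    (mirroring regexp_extract, which returns an empty string on no match)."""
    match = TITLE_PATTERN.search(name)
    return match.group(1) if match else ""


# Hypothetical names in the dataset's "Last, Title. First" format.
for name in ["Doe, Mr. John", "Smith, Mrs. Jane (Jane Roe)", "Roe, Master. Tom"]:
    print(extract_title(name))
```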

In [23]:
# Raw string so that \. is passed to the regex engine instead of being treated as a string escape
pyspark_df = pyspark_df.withColumn('Title', regexp_extract(pyspark_df['Name'], r'([A-Za-z]+)\.', 1))

pyspark_df.groupBy('Title').agg(count('Age'), mean('Age')).sort('count(Age)').show()

+--------+----------+------------------+
|   Title|count(Age)|          avg(Age)|
+--------+----------+------------------+
|     Don|         1|              40.0|
|     Mme|         1|              24.0|
|      Ms|         1|              28.0|
|Countess|         1|              33.0|
|    Lady|         1|              48.0|
|    Capt|         1|              70.0|
|     Sir|         1|              49.0|
|Jonkheer|         1|              38.0|
|     Col|         2|              58.0|
|    Mlle|         2|              24.0|
|   Major|         2|              48.5|
|     Rev|         6|43.166666666666664|
|      Dr|         6|              42.0|
|  Master|        36| 4.574166666666667|
|     Mrs|       108|35.898148148148145|
|    Miss|       146|21.773972602739725|
|      Mr|       398|32.368090452261306|
+--------+----------+------------------+



Only the last four titles (Master, Mrs, Miss, Mr) occur frequently, so the rare titles are mapped onto these four groups.

In [24]:
from itertools import chain

# Collapse rare titles into the four common groups
title_dict = {'Mr': 'Mr', 'Miss': 'Miss', 'Mrs': 'Mrs', 'Master': 'Master',
              'Mlle': 'Miss', 'Major': 'Mr', 'Col': 'Mr', 'Sir': 'Mr',
              'Don': 'Mr', 'Mme': 'Miss', 'Jonkheer': 'Mr', 'Lady': 'Mrs',
              'Capt': 'Mr', 'Countess': 'Mrs', 'Ms': 'Miss', 'Dona': 'Mrs',
              'Dr': 'Mr', 'Rev': 'Mr'}

# create_map expects alternating key, value columns: lit('Mr'), lit('Mr'), lit('Miss'), ...
mapping = create_map([lit(x) for x in chain(*title_dict.items())])

pyspark_df = pyspark_df.withColumn('Title', mapping[pyspark_df['Title']])

In [25]:
pyspark_df.groupBy('Title').mean('Age').show()

+------+------------------+
| Title|          avg(Age)|
+------+------------------+
|  Miss|             21.86|
|Master| 4.574166666666667|
|    Mr| 33.02272727272727|
|   Mrs|35.981818181818184|
+------+------------------+



In [26]:
mean_age_dict = {row['Title']: row['avg(Age)'] for row in pyspark_df.groupBy('Title').mean('Age').collect()}

In [27]:
mean_age_dict

{'Miss': 21.86,
 'Master': 4.574166666666667,
 'Mr': 33.02272727272727,
 'Mrs': 35.981818181818184}

In [28]:
def imputer(df, title, age):
    # Replace a missing Age with `age`, but only in rows whose Title equals `title`
    return df.withColumn('Age',
                         when(df['Age'].isNull() & (df['Title'] == title), age)
                         .otherwise(df['Age']))

In [29]:
for key, value in mean_age_dict.items():
    pyspark_df = imputer(pyspark_df, key, value)
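Cells In [24] to In [29] together implement group-mean imputation: collapse rare titles into four groups, average the known ages per group, then fill each missing age with its group's average. The plain-Python sketch below reproduces that logic on a few hypothetical passenger records; `impute_age_by_title` and `TITLE_GROUPS` are illustrative names, and the mapping is only a subset of `title_dict`.

```python
from collections import defaultdict

# Subset of title_dict above: rare titles collapse into four common groups.
TITLE_GROUPS = {"Mr": "Mr", "Mrs": "Mrs", "Miss": "Miss", "Master": "Master",
                "Dr": "Mr", "Mlle": "Miss", "Lady": "Mrs"}


def impute_age_by_title(passengers):
    """Fill missing 'age' values with the mean age of the same title group.

    Each passenger is a dict with 'title' and 'age' (None when unknown).
    Returns new dicts; the input list is not modified.
    """
    # 1. Mean of the known ages in each title group.
    totals = defaultdict(float)
    counts = defaultdict(int)
    for p in passengers:
        group = TITLE_GROUPS.get(p["title"])
        if p["age"] is not None and group is not None:
            totals[group] += p["age"]
            counts[group] += 1
    group_mean = {g: totals[g] / counts[g] for g in counts}

    # 2. Replace each missing age with its group's mean
    #    (stays None if the group has no known ages).
    filled = []
    for p in passengers:
        age = p["age"]
        if age is None:
            age = group_mean.get(TITLE_GROUPS.get(p["title"]))
        filled.append({**p, "age": age})
    return filled


# Hypothetical records, not rows from the dataset.
sample = [
    {"title": "Mr", "age": 30.0}, {"title": "Dr", "age": 50.0},
    {"title": "Mr", "age": None}, {"title": "Master", "age": 4.0},
    {"title": "Master", "age": None}, {"title": "Miss", "age": None},
]
for row in impute_age_by_title(sample):
    print(row)
```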

Create a new column, FamilySize, as the sum of Parch (parents/children aboard) and SibSp (siblings/spouses aboard).

In [30]:
pyspark_df = (pyspark_df.withColumn('FamilySize', pyspark_df['Parch'] + pyspark_df['SibSp'])
                        .drop('Parch', 'SibSp'))

Drop unnecessary columns

In [31]:
pyspark_df = pyspark_df.drop('PassengerId', 'Cabin', 'Name', 'Ticket', 'Title')

In [32]:
pyspark_df.limit(5).show()

+--------+------+------+----+-------+--------+----------+
|Survived|Pclass|   Sex| Age|   Fare|Embarked|FamilySize|
+--------+------+------+----+-------+--------+----------+
|       0|     3|  male|22.0|   7.25|       S|         1|
|       1|     1|female|38.0|71.2833|       C|         1|
|       1|     3|female|26.0|  7.925|       S|         0|
|       1|     1|female|35.0|   53.1|       S|         1|
|       0|     3|  male|35.0|   8.05|       S|         0|
+--------+------+------+----+-------+--------+----------+



In [33]:
for column_name in pyspark_df.columns:
    print(column_name.ljust(20), pyspark_df.filter(pyspark_df[column_name].isNull()).count())

Survived             0
Pclass               0
Sex                  0
Age                  0
Fare                 0
Embarked             0
FamilySize           0


## Build the Models

In [34]:
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

In [35]:
stringIndex = StringIndexer(inputCols=['Sex', 'Embarked'],
                            outputCols=['SexNum', 'EmbNum'])

stringIndex_model = stringIndex.fit(pyspark_df)

pyspark_df_ = stringIndex_model.transform(pyspark_df).drop('Sex', 'Embarked')

In [36]:
pyspark_df_.show(5)

+--------+------+----+-------+----------+------+------+
|Survived|Pclass| Age|   Fare|FamilySize|SexNum|EmbNum|
+--------+------+----+-------+----------+------+------+
|       0|     3|22.0|   7.25|         1|   0.0|   0.0|
|       1|     1|38.0|71.2833|         1|   1.0|   1.0|
|       1|     3|26.0|  7.925|         0|   1.0|   0.0|
|       1|     1|35.0|   53.1|         1|   1.0|   0.0|
|       0|     3|35.0|   8.05|         0|   0.0|   0.0|
+--------+------+----+-------+----------+------+------+
only showing top 5 rows
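`StringIndexer` assigns indices by descending label frequency by default (`stringOrderType='frequencyDesc'`, with ties broken alphabetically), which is why the more common `male` became 0.0 and `S` became 0.0. A plain-Python sketch of that ordering, with a hypothetical helper name, using label counts from the outputs above (Sex: 577 male, 314 female; Embarked after filling: 646 S, 168 C, 77 Q):

```python
from collections import Counter


def fit_string_index(values):
    """Map each distinct label to a float index, most frequent first.

    Ties in frequency are broken alphabetically, matching StringIndexer's
    default 'frequencyDesc' ordering.
    """
    counts = Counter(values)
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: float(i) for i, label in enumerate(ordered)}


# Label frequencies taken from the outputs above (Embarked after filling with 'S').
sex = ["male"] * 577 + ["female"] * 314
embarked = ["S"] * 646 + ["C"] * 168 + ["Q"] * 77

print(fit_string_index(sex))       # {'male': 0.0, 'female': 1.0}
print(fit_string_index(embarked))  # {'S': 0.0, 'C': 1.0, 'Q': 2.0}
```

These indices look ordinal to the model even though ports have no natural order; tree ensembles such as the random forest used here tend to cope with that, whereas linear models usually call for a `OneHotEncoder` step afterwards.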



In [37]:
# Every column except the first one (the label, Survived) becomes a feature
vec_asmbl = VectorAssembler(inputCols=pyspark_df_.columns[1:],
                            outputCol='features')

pyspark_df_ = vec_asmbl.transform(pyspark_df_).select('features', 'Survived')
pyspark_df_.show(5, truncate=False)

+------------------------------+--------+
|features                      |Survived|
+------------------------------+--------+
|[3.0,22.0,7.25,1.0,0.0,0.0]   |0       |
|[1.0,38.0,71.2833,1.0,1.0,1.0]|1       |
|[3.0,26.0,7.925,0.0,1.0,0.0]  |1       |
|[1.0,35.0,53.1,1.0,1.0,0.0]   |1       |
|[3.0,35.0,8.05,0.0,0.0,0.0]   |0       |
+------------------------------+--------+
only showing top 5 rows



In [38]:
# No seed is given, so the split (and every score below) changes from run to run
train_df, test_df = pyspark_df_.randomSplit([0.7, 0.3])
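`randomSplit` decides each row's split independently, so the result is only approximately 70/30: the test set evaluated below ends up with 244 rows, about 27% of 891. The sketch mimics that per-row sampling in plain Python under the simplifying assumption of a single partition; `random_split` is a hypothetical helper, not Spark's implementation.

```python
import random


def random_split(rows, weights, seed=None):
    """Assign each row independently to a split with probability proportional
    to its weight, so split sizes are only approximately proportional."""
    total = sum(weights)
    bounds, running = [], 0.0
    for w in weights:
        running += w / total
        bounds.append(running)  # cumulative upper bound for each split
    rng = random.Random(seed)
    splits = [[] for _ in weights]
    for row in rows:
        u = rng.random()
        for i, bound in enumerate(bounds):
            # The last split also catches any floating-point shortfall below 1.0.
            if u < bound or i == len(bounds) - 1:
                splits[i].append(row)
                break
    return splits


train, test = random_split(range(891), [0.7, 0.3], seed=1)
print(len(train), len(test))  # sizes vary with the seed; they always sum to 891
```

Spark's `randomSplit` also accepts a `seed` argument, which makes the split repeatable between runs.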

In [39]:
train_df.show(5, truncate=False)

+---------------------------------+--------+
|features                         |Survived|
+---------------------------------+--------+
|(6,[0,1],[2.0,33.02272727272727])|0       |
|(6,[0,1],[2.0,33.02272727272727])|0       |
|(6,[0,1],[3.0,19.0])             |0       |
|(6,[0,1],[3.0,25.0])             |1       |
|(6,[0,1],[3.0,36.0])             |0       |
+---------------------------------+--------+
only showing top 5 rows
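These rows print differently from the ones above because `VectorAssembler` stores each row in whichever of dense or sparse form is more compact, so rows with many zeros appear in sparse notation. `(6,[0,1],[2.0,33.02272727272727])` means a vector of length 6 whose only non-zero entries sit at positions 0 and 1 (Pclass = 2.0 and Age = 33.02..., the imputed mean for the Mr group); Fare, FamilySize, SexNum, and EmbNum are all 0. A plain-Python sketch of the expansion, with a hypothetical helper name (in PySpark itself, `SparseVector.toArray()` does this):

```python
def sparse_to_dense(size, indices, values):
    """Expand Spark's (size, [indices], [values]) sparse notation into a list."""
    dense = [0.0] * size
    for index, value in zip(indices, values):
        dense[index] = value
    return dense


# First training row shown above: Pclass=2, Age=33.02..., everything else 0.
row = sparse_to_dense(6, [0, 1], [2.0, 33.02272727272727])
print(row)  # [2.0, 33.02272727272727, 0.0, 0.0, 0.0, 0.0]
```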



### Ensemble Trees (Random Forest)

In [40]:
rf = RandomForestClassifier(labelCol='Survived')
model = rf.fit(train_df)
pred = model.transform(test_df)

# Evaluate the model
evaluator = BinaryClassificationEvaluator(labelCol='Survived')

# Calculate evaluation metrics
auroc = evaluator.evaluate(pred, {evaluator.metricName: "areaUnderROC"})
aupr = evaluator.evaluate(pred, {evaluator.metricName: "areaUnderPR"})

print('Area under ROC curve:', auroc)
print('Area under PR curve:', aupr)

Area under ROC curve: 0.8823888529770885
Area under PR curve: 0.7806890892271653


Hyperparameter tuning with CrossValidator and evaluation:

In [41]:
rfparamGrid = (ParamGridBuilder()
               .addGrid(rf.maxDepth, [2, 4, 5])
               .addGrid(rf.numTrees, [5, 10, 20, 100])
               .build())

In [42]:
evaluatorRF = BinaryClassificationEvaluator(labelCol='Survived')

cv = CrossValidator(estimator=rf,
                    estimatorParamMaps=rfparamGrid,
                    evaluator=evaluatorRF,
                    numFolds=5)

rfcv = cv.fit(train_df)
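`CrossValidator` fits one model per (parameter combination, fold) pair: here 3 `maxDepth` values × 4 `numTrees` values = 12 combinations, times 5 folds = 60 fits. Each fit is scored on its held-out fold with the evaluator (areaUnderROC by default), the scores are averaged per combination, and the best combination is refit on all of `train_df` to produce `bestModel`. The plain-Python sketch below only enumerates that schedule; the fold assignment (shuffle, then deal rows out round-robin) is a simplification rather than Spark's exact splitting, and `kfold_indices` is a hypothetical helper.

```python
import random
from itertools import product


def kfold_indices(n_rows, k, seed=0):
    """Yield (train_idx, validation_idx) pairs; each row is validated exactly once."""
    idx = list(range(n_rows))
    random.Random(seed).shuffle(idx)
    folds = [idx[i::k] for i in range(k)]  # deal shuffled rows out round-robin
    for i, validation in enumerate(folds):
        train = [j for f, fold in enumerate(folds) if f != i for j in fold]
        yield train, validation


# Same grid as rfparamGrid: 3 maxDepth values x 4 numTrees values.
param_grid = [{"maxDepth": d, "numTrees": t}
              for d, t in product([2, 4, 5], [5, 10, 20, 100])]
folds = list(kfold_indices(n_rows=20, k=5))  # toy row count, not train_df

# One fit per (parameter combination, fold).
fits = [(params, fold_no) for params in param_grid for fold_no in range(len(folds))]
print(len(param_grid), "combinations x", len(folds), "folds =", len(fits), "fits")
```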

In [43]:
# Predict on the test set with the best model found by cross-validation
predictions = rfcv.bestModel.transform(test_df)
evaluatorRF.evaluate(predictions)  # default metric: areaUnderROC

0.8854772678302091

In [44]:
auroc = evaluatorRF.evaluate(predictions, {evaluatorRF.metricName: "areaUnderROC"})
aupr = evaluatorRF.evaluate(predictions, {evaluatorRF.metricName: "areaUnderPR"})
print('Area under ROC curve: ', auroc)
print('Area under PR curve: ', aupr)

Area under ROC curve:  0.8854772678302092
Area under PR curve:  0.8438690592755584


In [45]:
# MulticlassClassificationEvaluator computes accuracy here; metricName="f1" would give the F1 score instead
multi_evaluator = MulticlassClassificationEvaluator(labelCol="Survived", predictionCol="prediction", metricName="accuracy")
rf_accuracy = multi_evaluator.evaluate(predictions)
print("Accuracy of RandomForestClassifier is = ", (rf_accuracy))

Accuracy of RandomForestClassifier is =  0.8401639344262295


In [46]:
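# Use sklearn for a per-class classification report
from sklearn.metrics import classification_report
# Collect both columns in one pass so labels and predictions stay aligned,
# then unpack the Row objects into plain lists
rows = predictions.select('Survived', 'prediction').collect()
y_true = [row['Survived'] for row in rows]
y_pred = [row['prediction'] for row in rows]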
print(classification_report(y_true, y_pred))

              precision    recall  f1-score   support

           0       0.87      0.88      0.87       153
           1       0.80      0.77      0.78        91

    accuracy                           0.84       244
   macro avg       0.83      0.83      0.83       244
weighted avg       0.84      0.84      0.84       244
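Every number in this report follows from the four confusion-matrix counts. Working backwards from the supports (153 and 91), the per-class recalls, and the overall accuracy (205 of 244 correct), the counts must be TN = 135, FP = 18, FN = 21, TP = 70, with Survived = 1 as the positive class. The sketch below recomputes the report from them; `report_from_counts` is a hypothetical helper.

```python
def report_from_counts(tn, fp, fn, tp):
    """Per-class (precision, recall, F1) plus accuracy for a binary confusion matrix.

    Class 1 (survived) is the positive class; class 0 uses the mirrored counts.
    """
    def prf(true_pos, false_pos, false_neg):
        precision = true_pos / (true_pos + false_pos)
        recall = true_pos / (true_pos + false_neg)
        f1 = 2 * precision * recall / (precision + recall)
        return precision, recall, f1

    return {
        # For class 0, survivors predicted as 0 (FN) play the false-positive role.
        0: prf(tn, fn, fp),
        1: prf(tp, fp, fn),
        "accuracy": (tp + tn) / (tn + fp + fn + tp),
    }


report = report_from_counts(tn=135, fp=18, fn=21, tp=70)
for label in (0, 1):
    p, r, f = report[label]
    print(f"{label}: precision={p:.2f} recall={r:.2f} f1={f:.2f}")
print(f"accuracy={report['accuracy']:.4f}")
```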



## Conclusion

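- Evaluation metrics: AUROC measures how well the model separates the two classes, i.e. how consistently it scores survivors above non-survivors across all classification thresholds. AUPR summarizes the trade-off between precision and recall for the positive class (Survived = 1); a high AUPR means the model finds most true survivors without producing many false positives.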

- Cross-validation splits the training data into folds, trains on all but one fold, validates on the held-out fold, and averages the scores. Here 5-fold cross-validation selects maxDepth and numTrees from 12 candidate combinations, which reduces (but does not eliminate) the risk of tuning the model to one lucky validation split.

- Hyperparameter tuning with CrossValidator gave higher scores than the baseline model: AUROC rose only slightly (from 0.882 to 0.885, just a bit 😂), while AUPR improved more noticeably (from 0.781 to 0.844).
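The AUROC in the first bullet has a concrete interpretation: it is the probability that a randomly chosen survivor receives a higher score than a randomly chosen non-survivor, with ties counting half. The plain-Python sketch below computes it exactly that way, by comparing every positive/negative pair; it is O(P×N) and only illustrates the definition, it is not a replacement for `BinaryClassificationEvaluator`. The scores are made-up toy values.

```python
def auroc(scores, labels):
    """Probability that a random positive outscores a random negative (ties = 0.5)."""
    positives = [s for s, y in zip(scores, labels) if y == 1]
    negatives = [s for s, y in zip(scores, labels) if y == 0]
    wins = 0.0
    for p in positives:
        for n in negatives:
            if p > n:
                wins += 1.0
            elif p == n:
                wins += 0.5
    return wins / (len(positives) * len(negatives))


# Toy scores (e.g. predicted survival probabilities) and true labels.
scores = [0.1, 0.4, 0.35, 0.8]
labels = [0, 0, 1, 1]
print(auroc(scores, labels))  # 0.75
```

Three of the four survivor/non-survivor pairs are ranked correctly (only 0.35 < 0.4 is wrong), hence 0.75.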