In [1]:
from IPython.display import HTML, display

# Inject a CSS rule so preformatted (<pre>) output, such as the wide .show() tables
# printed below, is not line-wrapped by the notebook front end.
_NO_WRAP_CSS = "<style>pre { white-space: pre !important; }</style>"
display(HTML(_NO_WRAP_CSS))

In [2]:
from pyspark.sql import SparkSession

# Reuse the active Spark session if there is one; otherwise getOrCreate() builds a new one.
spark = SparkSession.builder.getOrCreate()

# Titanic passenger CSV: the first line holds the column names, and Spark infers
# each column's type from the values.
TITANIC_CSV = './titanic.csv'
data = spark.read.csv(TITANIC_CSV, header=True, inferSchema=True)

In [3]:
# Preview the raw table (show()'s defaults spelled out: first 20 rows, long cells truncated).
data.show(n=20, truncate=True)

+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|PassengerId|Survived|Pclass|                Name|Gender| Age|SibSp|Parch|          Ticket|   Fare|Cabin|Embarked|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|          1|       0|     3|Braund, Mr. Owen ...|  male|22.0|    1|    0|       A/5 21171|   7.25| null|       S|
|          2|       1|     1|Cumings, Mrs. Joh...|female|38.0|    1|    0|        PC 17599|71.2833|  C85|       C|
|          3|       1|     3|Heikkinen, Miss. ...|female|26.0|    0|    0|STON/O2. 3101282|  7.925| null|       S|
|          4|       1|     1|Futrelle, Mrs. Ja...|female|35.0|    1|    0|          113803|   53.1| C123|       S|
|          5|       0|     3|Allen, Mr. Willia...|  male|35.0|    0|    0|          373450|   8.05| null|       S|
|          6|       0|     3|    Moran, Mr. James|  male|null|    0|    0|      

In [4]:
# Keep the label plus the candidate model inputs; every other column
# (PassengerId, Name, Ticket, Cabin, Embarked) is dropped from here on.
selected_columns = ['Survived', 'Pclass', 'Gender', 'Age', 'SibSp', 'Parch', 'Fare']
data = data.select(*selected_columns)

In [5]:
from pyspark.ml.feature import Imputer

# Age contains nulls. Write a copy with nulls replaced by the column mean into a new
# AgeImputed column; the original Age column is kept unchanged.
# NOTE(review): the mean is computed over every row. If a train/test split is added
# downstream, fitting this on the training rows only avoids leaking holdout statistics.
imputer = Imputer(inputCols=['Age'], outputCols=['AgeImputed'], strategy='mean')
imputer_model = imputer.fit(data)
data = imputer_model.transform(data)

# Inspect the imputed values next to the originals.
data.show()

+--------+------+------+----+-----+-----+-------+-----------------+
|Survived|Pclass|Gender| Age|SibSp|Parch|   Fare|       AgeImputed|
+--------+------+------+----+-----+-----+-------+-----------------+
|       0|     3|  male|22.0|    1|    0|   7.25|             22.0|
|       1|     1|female|38.0|    1|    0|71.2833|             38.0|
|       1|     3|female|26.0|    0|    0|  7.925|             26.0|
|       1|     1|female|35.0|    1|    0|   53.1|             35.0|
|       0|     3|  male|35.0|    0|    0|   8.05|             35.0|
|       0|     3|  male|null|    0|    0| 8.4583|29.69911764705882|
|       0|     1|  male|54.0|    0|    0|51.8625|             54.0|
|       0|     3|  male| 2.0|    3|    1| 21.075|              2.0|
|       1|     3|female|27.0|    0|    2|11.1333|             27.0|
|       1|     2|female|14.0|    1|    0|30.0708|             14.0|
|       1|     3|female| 4.0|    1|    1|   16.7|              4.0|
|       1|     1|female|58.0|    0|    0|  26.55

In [6]:
from pyspark.ml.feature import StringIndexer

# Map the Gender strings to numeric indices in GenderIndexed. StringIndexer's default
# ordering gives index 0.0 to the most frequent label (male -> 0.0, female -> 1.0 in the
# preview that follows).
gender_indexer = StringIndexer(outputCol='GenderIndexed', inputCol='Gender')
gender_indexer_model = gender_indexer.fit(data)
data = gender_indexer_model.transform(data)

In [7]:
# Preview the table with the new GenderIndexed column (first 20 rows, cells truncated).
data.show(n=20, truncate=True)

+--------+------+------+----+-----+-----+-------+-----------------+-------------+
|Survived|Pclass|Gender| Age|SibSp|Parch|   Fare|       AgeImputed|GenderIndexed|
+--------+------+------+----+-----+-----+-------+-----------------+-------------+
|       0|     3|  male|22.0|    1|    0|   7.25|             22.0|          0.0|
|       1|     1|female|38.0|    1|    0|71.2833|             38.0|          1.0|
|       1|     3|female|26.0|    0|    0|  7.925|             26.0|          1.0|
|       1|     1|female|35.0|    1|    0|   53.1|             35.0|          1.0|
|       0|     3|  male|35.0|    0|    0|   8.05|             35.0|          0.0|
|       0|     3|  male|null|    0|    0| 8.4583|29.69911764705882|          0.0|
|       0|     1|  male|54.0|    0|    0|51.8625|             54.0|          0.0|
|       0|     3|  male| 2.0|    3|    1| 21.075|              2.0|          0.0|
|       1|     3|female|27.0|    0|    2|11.1333|             27.0|          1.0|
|       1|     2

In [8]:
from pyspark.ml.feature import VectorAssembler

# The classifier reads a single vector column ('features'). Pack the model inputs into
# it in this order, using AgeImputed/GenderIndexed instead of the raw Age/Gender columns.
feature_columns = ['Pclass', 'SibSp', 'Parch', 'Fare', 'AgeImputed', 'GenderIndexed']
assembler = VectorAssembler(outputCol='features', inputCols=feature_columns)
data = assembler.transform(data)

In [9]:
# Preview the assembled feature vectors (first 20 rows, cells truncated).
data.show(n=20, truncate=True)

+--------+------+------+----+-----+-----+-------+-----------------+-------------+--------------------+
|Survived|Pclass|Gender| Age|SibSp|Parch|   Fare|       AgeImputed|GenderIndexed|            features|
+--------+------+------+----+-----+-----+-------+-----------------+-------------+--------------------+
|       0|     3|  male|22.0|    1|    0|   7.25|             22.0|          0.0|[3.0,1.0,0.0,7.25...|
|       1|     1|female|38.0|    1|    0|71.2833|             38.0|          1.0|[1.0,1.0,0.0,71.2...|
|       1|     3|female|26.0|    0|    0|  7.925|             26.0|          1.0|[3.0,0.0,0.0,7.92...|
|       1|     1|female|35.0|    1|    0|   53.1|             35.0|          1.0|[1.0,1.0,0.0,53.1...|
|       0|     3|  male|35.0|    0|    0|   8.05|             35.0|          0.0|[3.0,0.0,0.0,8.05...|
|       0|     3|  male|null|    0|    0| 8.4583|29.69911764705882|          0.0|[3.0,0.0,0.0,8.45...|
|       0|     1|  male|54.0|    0|    0|51.8625|             54.0|      

In [10]:
from pyspark.ml.classification import RandomForestClassifier
algo = RandomForestClassifier(featuresCol='features', labelCol='Survived')
model = algo.fit(data)

In [11]:
predictions = model.transform(data)

In [12]:
# Side-by-side view of the true label, the predicted class and the class probabilities.
predictions.select('Survived', 'prediction', 'probability').show()

+--------+----------+--------------------+
|Survived|prediction|         probability|
+--------+----------+--------------------+
|       0|       0.0|[0.88581129986537...|
|       1|       1.0|[0.02965627489007...|
|       1|       1.0|[0.46283114188206...|
|       1|       1.0|[0.02965627489007...|
|       0|       0.0|[0.88274431920774...|
|       0|       0.0|[0.87604647878125...|
|       0|       0.0|[0.74960596821151...|
|       0|       0.0|[0.75383132908710...|
|       1|       1.0|[0.44610561224946...|
|       1|       1.0|[0.08766474563101...|
|       1|       1.0|[0.34366208799566...|
|       1|       1.0|[0.12087878691075...|
|       0|       0.0|[0.87604647878125...|
|       0|       0.0|[0.84226301203838...|
|       0|       1.0|[0.36660823393805...|
|       1|       1.0|[0.22708347214493...|
|       0|       0.0|[0.79547900240144...|
|       1|       0.0|[0.86199199264428...|
|       0|       1.0|[0.48931177170447...|
|       1|       1.0|[0.23818349688173...|
+--------+-

In [13]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Area under the ROC curve, computed from the model's rawPrediction column (the
# evaluator's default input column, spelled out here) against the Survived label.
evaluator = BinaryClassificationEvaluator(
    rawPredictionCol='rawPrediction',
    labelCol='Survived',
    metricName='areaUnderROC',
)

In [14]:
# ROC AUC of `predictions`; shown as the cell's output value.
evaluator.evaluate(dataset=predictions)

0.8975889176493146

In [15]:
# Bring labels and predictions back to the driver in ONE collect() so y_true[i] is
# guaranteed to pair with y_pred[i]. Two separate collect() actions carry no row-order
# guarantee. Flatten the Row objects into plain 1-D lists and cast to int so both lists
# use the same label type (prediction is a double such as 1.0; Survived is an int).
label_rows = predictions.select('Survived', 'prediction').collect()
y_true = [int(row['Survived']) for row in label_rows]
y_pred = [int(row['prediction']) for row in label_rows]

In [16]:
from sklearn.metrics import classification_report, confusion_matrix

In [17]:
# Per-class precision / recall / F1. classification_report returns a formatted string,
# so print it rather than letting the notebook display its escaped repr.
report = classification_report(y_true, y_pred)
print(report)

              precision    recall  f1-score   support

           0       0.83      0.94      0.88       549
           1       0.88      0.69      0.78       342

    accuracy                           0.85       891
   macro avg       0.86      0.82      0.83       891
weighted avg       0.85      0.85      0.84       891



In [18]:
# Confusion matrix: in scikit-learn's convention, rows are true classes and columns are
# predicted classes.
confusion = confusion_matrix(y_true=y_true, y_pred=y_pred)
print(confusion)

[[518  31]
 [106 236]]
