# Overview

The description is available via: https://www.kaggle.com/competitions/titanic/data

The data has been split into two groups:

* training set (train.csv)
* test set (test.csv)

**The training set** should be used to build your machine learning models. For the training set, we provide the outcome (also known as the “ground truth”) for each passenger. Your model will be based on “features” like passengers’ gender and class. You can also use feature engineering to create new features.

**The test set** should be used to see how well your model performs on unseen data. For the test set, we do not provide the ground truth for each passenger. It is your job to predict these outcomes. For each passenger in the test set, use the model you trained to predict whether or not they survived the sinking of the Titanic.

We also include **gender_submission.csv**, a set of predictions that assume all and only female passengers survive, as an example of what a submission file should look like.

# Step 1 - Prepare the Necessary Datasets

In [0]:
# Load data directly from the DBFS FileStore
df_train = spark.read.csv('/FileStore/tables/train.csv',header=True,inferSchema=True)
df_test = spark.read.csv('/FileStore/tables/test.csv',header=True,inferSchema=True)

# Compare the differences, the test file does not have the column 'Survived' 
df_train.show(10)
df_test.show(10)

+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|          Ticket|   Fare|Cabin|Embarked|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|          1|       0|     3|Braund, Mr. Owen ...|  male|22.0|    1|    0|       A/5 21171|   7.25| null|       S|
|          2|       1|     1|Cumings, Mrs. Joh...|female|38.0|    1|    0|        PC 17599|71.2833|  C85|       C|
|          3|       1|     3|Heikkinen, Miss. ...|female|26.0|    0|    0|STON/O2. 3101282|  7.925| null|       S|
|          4|       1|     1|Futrelle, Mrs. Ja...|female|35.0|    1|    0|          113803|   53.1| C123|       S|
|          5|       0|     3|Allen, Mr. Willia...|  male|35.0|    0|    0|          373450|   8.05| null|       S|
|          6|       0|     3|    Moran, Mr. James|  male|null|    0|    0|      

In [0]:
# Check the datatypes
df_train.printSchema()
df_test.printSchema()

root
 |-- PassengerId: integer (nullable = true)
 |-- Survived: integer (nullable = true)
 |-- Pclass: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- SibSp: integer (nullable = true)
 |-- Parch: integer (nullable = true)
 |-- Ticket: string (nullable = true)
 |-- Fare: double (nullable = true)
 |-- Cabin: string (nullable = true)
 |-- Embarked: string (nullable = true)

root
 |-- PassengerId: integer (nullable = true)
 |-- Pclass: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- SibSp: integer (nullable = true)
 |-- Parch: integer (nullable = true)
 |-- Ticket: string (nullable = true)
 |-- Fare: double (nullable = true)
 |-- Cabin: string (nullable = true)
 |-- Embarked: string (nullable = true)



# Step 2 - Handle the Missing Data

In [0]:
from pyspark.sql.functions import count, when

# Count the number of null values in each column
train_null_counts = df_train.select([count(when(isnull(c), c)).alias(c) for c in df_train.columns])
test_null_counts = df_test.select([count(when(isnull(c), c)).alias(c) for c in df_test.columns])

# Show the count of null values
print('Null Counts in Train Data:')
train_null_counts.show()

print('Null Counts in Test Data:')
test_null_counts.show()

Null Counts in Train Data:
+-----------+--------+------+----+---+---+-----+-----+------+----+-----+--------+
|PassengerId|Survived|Pclass|Name|Sex|Age|SibSp|Parch|Ticket|Fare|Cabin|Embarked|
+-----------+--------+------+----+---+---+-----+-----+------+----+-----+--------+
|          0|       0|     0|   0|  0|177|    0|    0|     0|   0|  687|       2|
+-----------+--------+------+----+---+---+-----+-----+------+----+-----+--------+

Null Counts in Test Data:
+-----------+------+----+---+---+-----+-----+------+----+-----+--------+
|PassengerId|Pclass|Name|Sex|Age|SibSp|Parch|Ticket|Fare|Cabin|Embarked|
+-----------+------+----+---+---+-----+-----+------+----+-----+--------+
|          0|     0|   0|  0| 86|    0|    0|     0|   1|  327|       0|
+-----------+------+----+---+---+-----+-----+------+----+-----+--------+



## Handle Column "Fare"

In [0]:
# Filter rows where 'Fare' is null in the test DataFrame
df_test_fare_null = df_test.filter(isnull(col('Fare')))

# Show the rows with null 'Fare' in the test DataFrame
print("Rows with null 'Fare' in the test DataFrame:")
df_test_fare_null.show()

Rows with null 'Fare' in the test DataFrame:
+-----------+------+------------------+----+----+-----+-----+------+----+-----+--------+
|PassengerId|Pclass|              Name| Sex| Age|SibSp|Parch|Ticket|Fare|Cabin|Embarked|
+-----------+------+------------------+----+----+-----+-----+------+----+-----+--------+
|       1044|     3|Storey, Mr. Thomas|male|60.5|    0|    0|  3701|null| null|       S|
+-----------+------+------------------+----+----+-----+-----+------+----+-----+--------+



In [0]:
from pyspark.sql.functions import col, when, coalesce, expr

# Calculate the median fare for each combination of 'Embarked' and 'Pclass'
Fare_Medians = df_train.groupBy("Pclass", "Embarked").agg(expr("percentile_approx(Fare, 0.5)").alias("Fare_Median"))

# Show the median values
print("Median Fare per Pclass and Embarked :")
Fare_Medians.show()

Median Fare per Pclass and Embarked :
+------+--------+-----------+
|Pclass|Embarked|Fare_Median|
+------+--------+-----------+
|     3|       C|     7.8958|
|     2|       C|       24.0|
|     1|    null|       80.0|
|     1|       Q|       90.0|
|     3|       Q|       7.75|
|     2|       Q|      12.35|
|     1|       C|    78.2667|
|     1|       S|       52.0|
|     3|       S|       8.05|
|     2|       S|       13.5|
+------+--------+-----------+



In [0]:
# Join the original DataFrame with the median values
df_joined = df_test.join(medians, on=["Embarked", "Pclass"], how="left")

# Create a flag column to mark rows where 'Fare' was originally null
df_flagged = df_joined.withColumn("Fare_Replaced", when(col("Fare").isNull(), 1).otherwise(0))

# Fill null values in 'Fare' column using the median values
df_test = df_flagged.withColumn("Fare", coalesce(col("Fare"), col("Fare_Median"))).drop('Fare_Median','Fare_Replaced')

# Filter and show the rows where 'Fare' was replaced
print("Rows where 'Fare' was replaced:")
df_replaced = df_test.filter(col("Fare_replaced") == 1)
df_replaced.show()

Rows where 'Fare' was replaced:
+--------+------+-----------+------------------+----+----+-----+-----+------+----+-----+
|Embarked|Pclass|PassengerId|              Name| Sex| Age|SibSp|Parch|Ticket|Fare|Cabin|
+--------+------+-----------+------------------+----+----+-----+-----+------+----+-----+
|       S|     3|       1044|Storey, Mr. Thomas|male|60.5|    0|    0|  3701|8.05| null|
+--------+------+-----------+------------------+----+----+-----+-----+------+----+-----+



## Handle Column "Embarked"

In [0]:
# Calculate the mode of the 'Embarked' column
mode_embarked = df_train.groupBy("Embarked").count().orderBy(col("count").desc()).first()["Embarked"]

# Create a flag column to indicate which rows are being replaced
df_flagged = df_train.withColumn("Embarked_Replaced", when(col("Embarked").isNull(), 1).otherwise(0))

# Fill null values in 'Embarked' column with the mode value
df_train = df_flagged.withColumn("Embarked", coalesce(col("Embarked"), expr(f"'{mode_embarked}'"))).drop("Embarked_Replaced")

# Filter and show the rows where 'Embarked' was replaced
print("Rows where 'Embarked' was replaced:")
df_replaced = df_train.filter(col("Embarked_Replaced") == 1)
df_replaced.show()
#df_replaced.show()

Rows where 'Embarked' was replaced:
+-----------+--------+------+--------------------+------+----+-----+-----+------+----+-----+--------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|Ticket|Fare|Cabin|Embarked|
+-----------+--------+------+--------------------+------+----+-----+-----+------+----+-----+--------+
|         62|       1|     1| Icard, Miss. Amelie|female|38.0|    0|    0|113572|80.0|  B28|       S|
|        830|       1|     1|Stone, Mrs. Georg...|female|62.0|    0|    0|113572|80.0|  B28|       S|
+-----------+--------+------+--------------------+------+----+-----+-----+------+----+-----+--------+



## Handle Column "Age"

In [0]:
# Define the grouping columns
# group = ['Pclass', 'Sex', 'Sibsp', 'Parch', 'Embarked']
group = ['Pclass', 'Sibsp', 'Parch']

# Calculate the median fare for each combination of 'Pclass', 'Sex', 'Sibsp', 'Parch', and 'Embarked'
train_medians = df_train.groupBy(group).agg(expr('percentile_approx(Fare, 0.5)').alias('Age_Median'))
test_medians = df_test.groupBy(group).agg(expr('percentile_approx(Fare, 0.5)').alias('Age_Median'))

# Show the median values for train and test
print('Median Fare per Pclass, Sex, Sibsp, Parch, and Embarked (Train):')
# train_medians.count()
train_medians.show()

print('Median Fare per Pclass, Sex, Sibsp, Parch, and Embarked (Test):')
# test_medians.count()
test_medians.show()

Median Fare per Pclass, Sex, Sibsp, Parch, and Embarked (Train):
+------+-----+-----+----------+
|Pclass|Sibsp|Parch|Age_Median|
+------+-----+-----+----------+
|     3|    3|    1|   25.4667|
|     3|    1|    4|      27.9|
|     3|    0|    0|    7.8958|
|     1|    3|    2|     263.0|
|     3|    1|    1|      15.5|
|     1|    0|    1|   77.2875|
|     1|    1|    1|     79.65|
|     2|    0|    0|      13.0|
|     3|    4|    1|   39.6875|
|     3|    2|    0|      18.0|
|     1|    0|    0|      31.0|
|     1|    1|    0|   76.7292|
|     2|    2|    0|      73.5|
|     2|    1|    0|      26.0|
|     3|    1|    5|    31.275|
|     3|    3|    0|     15.85|
|     2|    2|    1|      27.0|
|     2|    3|    0|      21.0|
|     1|    2|    2|   262.375|
|     3|    2|    2|    34.375|
+------+-----+-----+----------+
only showing top 20 rows

Median Fare per Pclass, Sex, Sibsp, Parch, and Embarked (Test):
+------+-----+-----+----------+
|Pclass|Sibsp|Parch|Age_Median|
+------+-----

In [0]:
# Join the original DataFrame with the median values
df_train_joined = df_train.join(train_medians, on=group, how="left")
df_test_joined = df_test.join(test_medians, on=group, how="left")

# Create a flag column to mark rows where 'Fare' was originally null
df_train_flagged = df_train_joined.withColumn("Age_Replaced", when(col("Age").isNull(), 1).otherwise(0))
df_test_flagged = df_test_joined.withColumn("Age_Replaced", when(col("Age").isNull(), 1).otherwise(0))

# Fill null values in 'Fare' column using the median values
df_train = df_train_flagged.withColumn("Age", coalesce(col("Age"), col("Age_Median"))).drop("Age_Median",'Age_Replaced')
df_test = df_test_flagged.withColumn("Age", coalesce(col("Age"), col("Age_Median"))).drop("Age_Median", 'Age_Replaced')

# Filter and show the rows where 'Fare' was replaced
print("Rows where 'Age' was replaced in Train Data:")
df_train_replaced = df_train.filter(col("Age_replaced") == 1)
df_train_replaced.show()

print("Rows where 'Age' was replaced in Test Data:")
df_test_replaced = df_test.filter(col("Age_replaced") == 1)
df_test_replaced.show()

Rows where 'Age' was replaced in Train Data:
+------+-----+-----+-----------+--------+--------------------+------+-------+---------------+--------+-----+--------+
|Pclass|SibSp|Parch|PassengerId|Survived|                Name|   Sex|    Age|         Ticket|    Fare|Cabin|Embarked|
+------+-----+-----+-----------+--------+--------------------+------+-------+---------------+--------+-----+--------+
|     3|    0|    0|          6|       0|    Moran, Mr. James|  male| 7.8958|         330877|  8.4583| null|       Q|
|     2|    0|    0|         18|       1|Williams, Mr. Cha...|  male|   13.0|         244373|    13.0| null|       S|
|     3|    0|    0|         20|       1|Masselmani, Mrs. ...|female| 7.8958|           2649|   7.225| null|       C|
|     3|    0|    0|         27|       0|Emir, Mr. Farred ...|  male| 7.8958|           2631|   7.225| null|       C|
|     3|    0|    0|         29|       1|"O'Dwyer, Miss. E...|female| 7.8958|         330959|  7.8792| null|       Q|
|     3|   

In [0]:
# Again, count the number of null values in each column, check whether the above processing have been applied successfully
train_null_counts = df_train.select([count(when(isnull(c), c)).alias(c) for c in df_train.columns])
test_null_counts = df_test.select([count(when(isnull(c), c)).alias(c) for c in df_test.columns])

# Show the count of null values
print('Null Counts in Train Data:')
train_null_counts.show()

print('Null Counts in Test Data:')
test_null_counts.show()

Null Counts in Train Data:
+------+-----+-----+-----------+--------+----+---+---+------+----+-----+--------+
|Pclass|SibSp|Parch|PassengerId|Survived|Name|Sex|Age|Ticket|Fare|Cabin|Embarked|
+------+-----+-----+-----------+--------+----+---+---+------+----+-----+--------+
|     0|    0|    0|          0|       0|   0|  0|  0|     0|   0|  687|       0|
+------+-----+-----+-----------+--------+----+---+---+------+----+-----+--------+

Null Counts in Test Data:
+------+-----+-----+--------+-----------+----+---+---+------+----+-----+
|Pclass|SibSp|Parch|Embarked|PassengerId|Name|Sex|Age|Ticket|Fare|Cabin|
+------+-----+-----+--------+-----------+----+---+---+------+----+-----+
|     0|    0|    0|       0|          0|   0|  0|  0|     0|   0|  327|
+------+-----+-----+--------+-----------+----+---+---+------+----+-----+



# Step 3 - Adding Labels and Features

In [0]:
from pyspark.ml.feature import StringIndexer

# Define the columns to index and their corresponding indexed column names
index_columns = {'Sex': 'Sex_Indexed', 'Embarked': 'Embarked_Indexed'}

# Create StringIndexer instances for both train and test data
train_indexers = [StringIndexer(inputCol=col, outputCol=index_columns[col]).fit(df_train) for col in index_columns]
test_indexers = [StringIndexer(inputCol=col, outputCol=index_columns[col]).fit(df_test) for col in index_columns]

# Transform the dataframes using the indexers
df_train = df_train
for indexer in train_indexers:
    df_train = indexer.transform(df_train)

df_test = df_test
for indexer in test_indexers:
    df_test = indexer.transform(df_test)

# Show the transformed data
print("Train Data after adding 'Sex_Indexed' and 'Embarked_Indexed':")
df_train.select('Sex', 'Embarked', 'Sex_Indexed', 'Embarked_Indexed').show(5)

print("Test Data after adding 'Sex_Indexed' and 'Embarked_Indexed':")
df_test.select('Sex', 'Embarked', 'Sex_Indexed', 'Embarked_Indexed').show(5)

Train Data after adding 'Sex_Indexed' and 'Embarked_Indexed':
+------+--------+-----------+----------------+
|   Sex|Embarked|Sex_Indexed|Embarked_Indexed|
+------+--------+-----------+----------------+
|  male|       S|        0.0|             0.0|
|female|       C|        1.0|             1.0|
|female|       S|        1.0|             0.0|
|female|       S|        1.0|             0.0|
|  male|       S|        0.0|             0.0|
+------+--------+-----------+----------------+
only showing top 5 rows

Test Data after adding 'Sex_Indexed' and 'Embarked_Indexed':
+------+--------+-----------+----------------+
|   Sex|Embarked|Sex_Indexed|Embarked_Indexed|
+------+--------+-----------+----------------+
|  male|       Q|        0.0|             2.0|
|female|       S|        1.0|             0.0|
|  male|       Q|        0.0|             2.0|
|  male|       S|        0.0|             0.0|
|female|       S|        1.0|             0.0|
+------+--------+-----------+----------------+
only s

In [0]:
# Add the 'FamilySize' column to df_train
df_train = df_train.withColumn("FamilySize", col("SibSp") + col("Parch") + 1)

# Add the 'FamilySize' column to df_test
df_test = df_test.withColumn("FamilySize", col("SibSp") + col("Parch") + 1)

# Show the df_train DataFrame after adding 'FamilySize'
print("Train Data after adding 'FamilySize':")
df_train.select('SibSp', 'Parch', 'FamilySize').show(5)

# Show the df_test DataFrame after adding 'FamilySize'
print("Test Data after adding 'FamilySize':")
df_test.select('SibSp', 'Parch', 'FamilySize').show(5)

Train Data after adding 'FamilySize':
+-----+-----+----------+
|SibSp|Parch|FamilySize|
+-----+-----+----------+
|    1|    0|         2|
|    1|    0|         2|
|    0|    0|         1|
|    1|    0|         2|
|    0|    0|         1|
+-----+-----+----------+
only showing top 5 rows

Test Data after adding 'FamilySize':
+-----+-----+----------+
|SibSp|Parch|FamilySize|
+-----+-----+----------+
|    0|    0|         1|
|    1|    0|         2|
|    0|    0|         1|
|    0|    0|         1|
|    1|    1|         3|
+-----+-----+----------+
only showing top 5 rows



In [0]:
from pyspark.ml.feature import VectorAssembler

# Define the feature columns
feature_cols = ['Pclass', 'Sex_Indexed', 'Age', 'Fare', 'FamilySize', 'Embarked_Indexed']

# Create VectorAssembler instances for both train and test data
train_feature_assembler = VectorAssembler(inputCols=feature_cols, outputCol='Independent_Features')
df_train = train_feature_assembler.transform(df_train)

test_feature_assembler = VectorAssembler(inputCols=feature_cols, outputCol='Independent_Features')
df_test = test_feature_assembler.transform(df_test)

# Show the transformed data
print("Train Data after adding 'Independent_Features':")
df_train.select('Independent_Features').show(5, truncate=False)

print("Test Data after adding 'Independent_Features':")
df_test.select('Independent_Features').show(5, truncate=False)


Train Data after adding 'Independent_Features':
+------------------------------+
|Independent_Features          |
+------------------------------+
|[3.0,0.0,22.0,7.25,2.0,0.0]   |
|[1.0,1.0,38.0,71.2833,2.0,1.0]|
|[3.0,1.0,26.0,7.925,1.0,0.0]  |
|[1.0,1.0,35.0,53.1,2.0,0.0]   |
|[3.0,0.0,35.0,8.05,1.0,0.0]   |
+------------------------------+
only showing top 5 rows

Test Data after adding 'Independent_Features':
+------------------------------+
|Independent_Features          |
+------------------------------+
|[3.0,0.0,34.5,7.8292,1.0,2.0] |
|[3.0,1.0,47.0,7.0,2.0,0.0]    |
|[2.0,0.0,62.0,9.6875,1.0,2.0] |
|[3.0,0.0,27.0,8.6625,1.0,0.0] |
|[3.0,1.0,22.0,12.2875,3.0,0.0]|
+------------------------------+
only showing top 5 rows



In [0]:
# show the data after data cleaning
print('Train Data:')
df_train.show(5, truncate=False)

print('Test Data:')
df_test.show(5, truncate=False)

Train Data:
+------+-----+-----+-----------+--------+---------------------------------------------------+------+----+----------------+-------+-----+--------+-----------+----------------+----------+------------------------------+
|Pclass|SibSp|Parch|PassengerId|Survived|Name                                               |Sex   |Age |Ticket          |Fare   |Cabin|Embarked|Sex_Indexed|Embarked_Indexed|FamilySize|Independent_Features          |
+------+-----+-----+-----------+--------+---------------------------------------------------+------+----+----------------+-------+-----+--------+-----------+----------------+----------+------------------------------+
|3     |1    |0    |1          |0       |Braund, Mr. Owen Harris                            |male  |22.0|A/5 21171       |7.25   |null |S       |0.0        |0.0             |2         |[3.0,0.0,22.0,7.25,2.0,0.0]   |
|1     |1    |0    |2          |1       |Cumings, Mrs. John Bradley (Florence Briggs Thayer)|female|38.0|PC 17599       

# Step 4 - Prepare for Submission

In [0]:
# Select the label and features columns
train_data = df_train.select(col("Independent_Features").alias('features'), col("Survived").alias("label"))
test_data = df_test.select(col("Independent_Features").alias('features'), "PassengerId")

In [0]:
# Train RandomForest model
rf = RandomForestClassifier(numTrees=100, seed=45)
model = rf.fit(train_data)

In [0]:
# Make predictions
predictions = model.transform(test_data)

# Select PassengerId and prediction (as Survived)
submission = predictions.select(col("PassengerId"), col("prediction").alias("Survived"))

# You can right click and download the submission, public score 0.77990
display(submission)

PassengerId,Survived
892,0.0
893,0.0
894,0.0
895,0.0
896,1.0
897,0.0
898,0.0
899,0.0
900,1.0
901,0.0
