# Kaggle: titanic dataset with databricks
## Objective
### The Challenge 

The sinking of the Titanic is one of the most infamous shipwrecks in history.

On April 15, 1912, during her maiden voyage, the widely considered “unsinkable” RMS Titanic sank after colliding with an iceberg. Unfortunately, there weren’t enough lifeboats for everyone onboard, resulting in the death of 1502 out of 2224 passengers and crew.

While there was some element of luck involved in surviving, it seems some groups of people were more likely to survive than others.

### The investigation
In this notebook we will be looking at predicting which passengers are likely to survive the disaster by building a prediction model, as well as exploring the main factors that may have helped people to survive.

## Setup
The titanic dataset has been downloaded from kaggle: https://www.kaggle.com/c/titanic/overview. It was then uploaded to my Databricks Community Edition account and added in two tables (using data -> add data ). The datasets are available to me in titanic_train and titanic_test.

## Load data and explore
We can use spark.sql to load data from the tables in a train and test set:

In [2]:
train = spark.sql("SELECT * FROM titanic_train")
test = spark.sql("SELECT * FROM titanic_test")
train_exploration = train

There are 12 columns in this dataset:
- PassengerId: unique identifier for a passenger (int)
- Survived: whether the passenger survived,	0 = No, 1 = Yes
- Pclass: ticket class,	1 = 1st, 2 = 2nd, 3 = 3rd
- Sex: male of female	
- Age: Age in years	(float)
- SibSp: number of siblings / spouses aboard the Titanic for that passenger
- Parch: number of parents / children aboard the Titanic	
- Ticket: ticket number	
- Fare: ticket price
- Cabin: cabin number	
- Embarked: which port the passenger embarked from. C = Cherbourg, Q = Queenstown, S = Southampton

In [4]:
train_exploration.show(5)

### Ideas before statistical analysis and modeling 
- Sex and age: women and children may have had a priority to board the lifeboats and had more chances of survival
- Socio-economics: has the "upper" class been given a priority in boarding the life boats? Socio-economic descripters may be found in the fare, where they embarked, what (P)class they were in (1, 2, 3), maybe what Cabin they were staying in, and Name (Master...)
- Where they were staying on the ship: people closer to life boats may have had an easier access. This information may be in Cabin.

### Methodology
We will do the following in order:
- Look at missing values and how to impute them
- Look at transforming variables based on basic "common sense" logic
- Apply tree based methods for classification with cross validation and apply predictions
- Look at the important features and inspect the model to verify if it makes sense
- Review the model if needed and generate predictions (this is to get a score from Kaggle)
- Report on why certain passengers / groups of passengers had more chances of survival than others

### Missing values
Let's identify which columns have missing values and how many they have:

In [6]:
from pyspark.sql import functions as fns
cols_with_nan = {c: train_exploration.where(fns.col(c).isNull()).count() for c in train_exploration.columns if train.where(fns.col(c).isNull()).count() > 0 }
print(cols_with_nan)

##### The following columns have missing values:
- Age: 177
- Cabin: 687
- Embarked: 2

Age may be an important variable hence we will replace the value.
Cabin has many missing values. However, many people embarked with their families: can we re-create the cabin based on what family they were part of?
Embarked: we may be able to retrieve this from their relatives. If not, it may be better to drop both rows for the training. 


##### Strategy
We could replace by mean or mode, but if age for example is an important variable and we assign a child an age much greater than their own, we'll introduce a bias. 
- Add a column for Age, Cabin and Embarked to know if the data was originally missing. Maybe there is a reason it is missing ? Maybe the dataset creator applied rules before removing values?
- Identify families (and add a column for family is)
- Identify who is the father, mother, children in the families
- Create a column for "has someone in their family survived", and see later if a strategy of generating predictions in a recursive way could yield better results.

###### Add the Na flag
Adding an Na flag could be interesting if we impute the missing values for the model to take into account that it was missing. That is if the fact that it was missing is important and not noise.

In [8]:
def get_nan_flag(df):
  for c in cols_with_nan:
    print(c)
    df = df.withColumn(c + "_null_flag", fns.when((df[c].isNull()), 1).otherwise(0))
  return df
train_exploration = get_nan_flag(train)
train_exploration.show(5)

###### Get the titles
This will tell us if the person is the master, mrs, etc. and can help us regrouping by families later. They may contain information on social "ranking".

In [10]:
def get_title(df):
  return df.withColumn("title",fns.regexp_extract(fns.col("Name"),"([A-Za-z]+)\.",1))

train_exploration = get_title(train_exploration)
train_exploration.groupBy("title").count().sort(fns.col("count").desc()).show(100)

###### There are quite a few different titles and most of them have very few occurences. Let's bin them!
Lets first bin the ones who logically should be similar:

We are left with:
- Doctors: Dr
- People from the christian cleric: Rev
- Army / ranked: Major, Col, Capt

We should group these by correlation to the target variable, whether or not they usually survived. We will see correlation, but as we have a very imbalanced dataset, we'll also see for each title group what is the percentage who survived.

In [13]:
from pyspark.ml.stat import Correlation
from pyspark.ml.feature import VectorAssembler
import pandas as pd

df = train_exploration[["title", "Survived"]]
df = df.withColumn("Miss", fns.when(train_exploration["title"].isin(["Miss", "Mlle"]), 1).otherwise(0)) 
df = df.withColumn("Mrs", fns.when(train_exploration["title"].isin(["Ms", "Mrs", "Mme"]), 1).otherwise(0)) 
df = df.withColumn("Mr", fns.when(train_exploration["title"].isin(["Mr", "Master"]), 1).otherwise(0))
df = df.withColumn("Nobility", fns.when(train_exploration["title"].isin(["Don", "Lady", "Countess", "Jonkheer", "Sir"]), 1).otherwise(0))
df = df.withColumn("Doctors", fns.when(df["title"] == "Dr", 1).otherwise(0))
df = df.withColumn("Cleric", fns.when(df["title"] == "Rev", 1).otherwise(0))
df = df.withColumn("Army", fns.when(df["title"].isin(["Major", "Col", "Capt"]), 1).otherwise(0))
df = df.drop("title")

for col in df.columns:
  if col != "Survived":
    print(col, df.stat.corr(col, "Survived"))
    
# Compute percentage of survival
for col in df.columns:
  if col != "Survived":
    print("\n", col, "\n")
    grouped = df.filter(f"{col} = 1").groupBy("Survived").count().toPandas()
    grouped = grouped.set_index("Survived")
    print(grouped["count"] / grouped["count"].sum())

We can see that married women have more chances of survival than single woman, who in turn have more chances than men. This isn't surprising as women with children were probably given priority.
We can also see that the reverends never made it: **we will mix rules and modeling to set every reverend to Survived=0**.
As for doctors, members of nobility and army, we don't really see a particular pattern, so we'll bin them together.

In [15]:
def get_title_flag(df):
  df = df.withColumn("Other", fns.when(df["title"].isin(["Dr", "Major", "Col", "Capt", "Don", "Lady", "Countess", "Jonkheer", "Sir"]), 1).otherwise(0))
  df = df.withColumn("Miss", fns.when(df["title"].isin(["Miss", "Mlle"]), 1).otherwise(0))
  df = df.withColumn("Mrs", fns.when(df["title"].isin(["Ms", "Mrs", "Mme"]), 1).otherwise(0))
  df = df.withColumn("Mr", fns.when(df["title"].isin(["Mr", "Master"]), 1).otherwise(0))
  df = df.withColumn("Rev", fns.when(df["title"] == "Rev", 1).otherwise(0))
  return df

train_exploration = get_title_flag(train_exploration)

##### Infilling age
Classical infilling methods usually involve infilling with mode, median, mean... But if we give a child an age of over 18, we will be probably misclassifying as age is an important feature. We need to:
- Bin age
- Get an approximation of age to put into the bin based on the other variables and some common sense (e.g. if "master" the person is under 18)

Let's start by showing the distribution of age vs survived.

In [17]:
from pyspark.sql.window import Window
df1 = train_exploration.select("Survived", "Age").dropna().orderBy("age")
windowSpec = Window.orderBy(fns.col("Age")).rangeBetween(-50, 0)
df2 = df1.withColumn('Average survival rate', fns.avg("Survived").over(windowSpec))
display(df2)

Survived,Age,Average survival rate
1,0.42,1.0
1,0.67,1.0
1,0.75,1.0
1,0.75,1.0
1,0.83,1.0
1,0.83,1.0
1,0.92,1.0
0,1.0,0.8571428571428571
1,1.0,0.8571428571428571
1,1.0,0.8571428571428571


We see several cut off points:
- 1 year old: Everyone survives. We'll create a bin 0-1. These children would probably have survived with their mother, so we will need to build a feature that indicates that a family had a yound child among them
- 1+ -> 15: survival rate is relatively stable and higher than average
- 15 - 21: it gradually gows down on average there
- 21+: survival rate is relatively stable accross all age groups

In [19]:
def get_age_bins(df, age_col="Age"):
  df = df.withColumn(f"{age_col}_0_1", fns.when((df[age_col] <= 1), 1).otherwise(0))
  df = df.withColumn(f"{age_col}_1_15", fns.when((df[age_col] > 1) & (df[age_col] <= 15), 1).otherwise(0))
  df = df.withColumn(f"{age_col}_15_21", fns.when((df[age_col] > 15) & (df[age_col] <= 21), 1).otherwise(0))
  df = df.withColumn(f"{age_col}_21_all", fns.when((df[age_col] > 21), 1).otherwise(0))
  return df

train_exploration = get_age_bins(train_exploration)  
train_exploration.select("Survived", "Age", "Age_0_1", "Age_1_15", "Age_15_21", "Age_21_all").show(10)

In [20]:
train_exploration.show()

Note: I'm not infilling age for now as I will be using xgboost. In the future, I will replace this model with spark, and we will replace missing age values by their average given their title, or even see if they are travelling with a partner (if so, they could be likely to be of the same age).

##### Figure out if the person was traveling alone
The underlying assumption is that people who were traveling alone may have less chances of survival than people traveling with kids.

In [23]:
def get_traveler_group(df):
  df = df.withColumn("Family_Size", fns.col('SibSp') + fns.col('Parch'))
  df = df.withColumn("Alone", fns.when(df["Family_Size"] == 0, 1).otherwise(0))
  return df

train_exploration = get_traveler_group(train_exploration)
train_exploration.show(5)

##### How old was the youngest child in the family
The underlying assumption is that women traveling with yound children may have more chances of survival.
For this, we first need to group by family, hence we create a family id with the last name and the port of embarkation.

In [25]:
def get_family_id(df):
  df = df.withColumn("last_name",fns.regexp_extract(fns.col("Name"),"^(.+?),", 1))
  df = df.withColumn("family_id", fns.concat(fns.col('last_name'), fns.col('Embarked')))
  return df

train_exploration = get_family_id(train_exploration)
train_exploration.select("family_id").show(5)

In [26]:
def get_youngest_age(df):
  temp = df.select("Age", "family_id").groupBy("family_id").min().withColumnRenamed("MIN(Age)", "fam_youngest_age_temp")
  df = df.alias("df")
  temp = temp.alias("temp")
  df = df.join(temp, df.family_id == temp.family_id, "left").selectExpr("df.*", "temp.fam_youngest_age_temp")
  df = df.withColumn("fam_youngest_age", fns.when((fns.col("fam_youngest_age_temp") <= 21) & (fns.col("Alone") == 0), fns.col("fam_youngest_age_temp")).otherwise(None))
  df = df.drop("fam_youngest_age_temp")
  return df

train_exploration = get_youngest_age(train_exploration)
train_exploration.select("fam_youngest_age").show()

#### Cabin info
The underlying assumption is that the closer to life boats the easier it would be to get in one. However, the dataset has many missing values, and cabins are all different. We look for a way to group them and infill missing values.

In [28]:
train_exploration.withColumn("cabin_area",fns.regexp_extract(fns.col("Cabin"),"[A-Z]", 0)).select("cabin_area").distinct().sort(fns.col("cabin_area").desc()).show()

We can see there are 8  Cabins: A-G and T. Others are NULL. People of the same last name may have been traveling together, hence the cabin area should be the same. We may be able to retrieve the information when it is missing by grouping by last name, hoping that unrelated people do not have the same last name.

In [30]:
from pyspark.sql.functions import monotonically_increasing_id
from pyspark.ml.feature import StringIndexer

def impute_cabins(df):
  temp = df.withColumn("cabin_area",fns.regexp_extract(fns.col("Cabin"),"[A-Z]", 0)).select("cabin_area", "family_id")
  indexer = StringIndexer(inputCol="cabin_area", outputCol="cabin_area_indexed", handleInvalid="keep")
  indexer = indexer.fit(temp)
  temp = (indexer
     .transform(temp)
     .withColumn("cabin_area_indexed", fns.when(fns.col("cabin_area_indexed") == len(indexer.labels), None).otherwise(fns.col("cabin_area_indexed")))).distinct()
  temp = temp.select("family_id", "cabin_area_indexed").groupBy("family_id").max().withColumnRenamed("MAX(cabin_area_indexed)", "cabin_area_indexed")
  df = df.alias("df")
  temp = temp.alias("temp")
  df = df.join(temp, df.family_id == temp.family_id, "left").selectExpr("df.*", "temp.cabin_area_indexed")
  df.select("cabin_area_indexed").show()
  print(df.count())
  print(df.select("cabin_area_indexed").where(fns.col("cabin_area_indexed").isNull()).count())
  print(temp.where(~fns.col("cabin_area_indexed").isNull()).distinct().count())
  return df

train_exploration = impute_cabins(train_exploration)
train_exploration.show()

After having overfitted on Fare quite a bit, it seems that creating bins could be a good idea. Lets look at the distribution.

In [32]:
from pyspark.sql.window import Window
df1 = train_exploration.select("Survived", "Fare").dropna().orderBy("Fare")
windowSpec = Window.orderBy(fns.col("Fare")).rangeBetween(-50, 0)
df2 = df1.withColumn('Average survival rate', fns.avg("Survived").over(windowSpec))
display(df2)

Survived,Fare,Average survival rate
0,0.0,0.0666666666666666
0,0.0,0.0666666666666666
1,0.0,0.0666666666666666
0,0.0,0.0666666666666666
0,0.0,0.0666666666666666
0,0.0,0.0666666666666666
0,0.0,0.0666666666666666
0,0.0,0.0666666666666666
0,0.0,0.0666666666666666
0,0.0,0.0666666666666666


Lets create 3 bins:
- 0 -> 7.8
- 7.8 -> 58
- 58 +

In [34]:
def get_fare_bins(df):
  df = df.withColumn(f"fare_0_7_8", fns.when((df["Fare"] <= 7.8), 1).otherwise(0))
  df = df.withColumn(f"fare_7_8_48", fns.when((df["Fare"] > 7.8) & (df["Fare"] <= 58), 1).otherwise(0))
  df = df.withColumn(f"fare_58_all", fns.when(df["Fare"] > 58, 1).otherwise(0))
  return df

train_exploration = get_fare_bins(train_exploration)  
train_exploration.select("Survived", "fare_0_7_8", "fare_7_8_48", "fare_58_all").show(10)

In [35]:
from xgboost import XGBClassifier
from xgboost import plot_importance
import pandas as pd
import pyspark
from sklearn.model_selection import GridSearchCV


class TitanicModel:
  def __init__(self):
    self.model = GridSearchCV(
      XGBClassifier(),
      param_grid={
        "max_depth": [2, 3, 4],
        "n_estimators": [50, 100, 150, 500],
        "learning_rate":[0.1, 0.05]
      } , 
      n_jobs=-1,
      scoring="precision")
  
  def fit_predict(self, x_train: pyspark.sql.DataFrame, x_test: pyspark.sql.DataFrame,
                 cols_to_drop: list) -> pd.DataFrame:
    # Preprocess all data as families could be split accross train and test
    x = self.preprocessing(x_train, x_test)
    # Go to pandas and set index to passenger id
    x = x.toPandas()
    x = x.set_index("PassengerId")
    x = x.drop(columns=cols_to_drop)
    # Split back to train and test
    x_train, x_test = self.train_test_split(x)
    # Elimitate rev as they are set to 0
    x_train_fit, _ = self.split_rev_non_rev(x_train)
    # Fit
    self.model.fit(x_train_fit.drop("Survived", axis=1), x_train_fit.loc[:, "Survived"])
    print(self.model.best_params_)
    self.best_score = self.model.best_score_
    # Make predictions
    # First split rev
    x_test_predict, x_test_rev = self.split_rev_non_rev(x_test)
    # Predict
    x_test_predict["Survived"] = self.model.predict(x_test_predict)
    x_test_rev["Survived"] = 0
    # Keep only the predition and merge
    x_test_predict = x_test_predict.loc[:, "Survived"]
    x_test_rev = x_test_rev.loc[:, "Survived"]
    pred = x_test_predict.append(x_test_rev).reset_index()
    return pred
  
  def display_plot_importance(self):
    display(plot_importance(self.model.best_estimator_))
  
  def print_best_score(self):
    print(self.best_score)
    
  @staticmethod
  def split_rev_non_rev(x):
    cond = (x.loc[:, "Rev"] == 1)
    return x.loc[~cond, :].drop("Rev", axis=1), x.loc[cond, :]
                                        
  @staticmethod
  def preprocessing(x_train, x_test):
    x_test = x_test.withColumn("Survived", fns.lit(None))
    x = x_train.unionByName(x_test)
    x = get_nan_flag(x)
    x = get_title(x)
    x = get_title_flag(x)
    x = get_age_bins(x)
    x = get_traveler_group(x)
    x = get_family_id(x)
    x = get_youngest_age(x)
    #x = get_age_bins(x, "fam_youngest_age")
    x = get_fare_bins(x)
    x = impute_cabins(x)
    x = x.withColumn("Sex", fns.when(fns.col("Sex") == "female", 1).otherwise(0))
    return x
  
  @staticmethod
  def train_test_split(x):
    cond = x.loc[:, "Survived"].isna()
    return x.loc[~cond, :], x.loc[cond, :].drop("Survived", axis=1)
    
    

In [36]:
model = TitanicModel()
to_drop = ["Cabin", "Ticket", "Name", "last_name", "family_id", "title", "Embarked", "Age_null_flag", "Cabin_null_flag", "Embarked_null_flag", "Age", "Fare", "Mr", "Mrs", "Miss", "Other"]
pred = model.fit_predict(train, test, to_drop)

In [37]:
display(pred)

PassengerId,Survived
1011,1.0
934,0.0
1198,1.0
1279,0.0
1072,0.0
962,1.0
938,0.0
1221,0.0
1068,1.0
1241,1.0


In [38]:
model.display_plot_importance()

In [39]:
model.best_score

#### Interpretation
As expected, women traveling with young children seem to be the most likely to survive. 

In addition, people who paid a faire > 58 seem to have a higher chance of survival, although the model isn't picking this up as an important variable. Maybe ranking could be a better option than one hot encoding for this feature. However, the model does pick up Pclass as an important variable, which correlates with the fact that richer travelers would have a better chance of survival.

In [41]:
display(train_exploration.select("Survived", "Pclass").groupBy(fns.col("Pclass")).avg().orderBy("Pclass"))

Pclass,avg(Survived),avg(Pclass)
1,0.6296296296296297,1.0
2,0.4728260869565217,2.0
3,0.2423625254582484,3.0


We do see that 3rd class had the least chances of survival and that first class survived more often. Hence wealth was also a determining factor.

In [43]:
train_exploration = spark.sql("SELECT * FROM titanic_train")
df = train_exploration.withColumn("cabin_area",fns.regexp_extract(fns.col("Cabin"),"[A-Z]", 0)).sort(fns.col("cabin_area").desc())
display(df.select("Survived", "cabin_area").groupBy(fns.col("cabin_area")).avg())

cabin_area,avg(Survived)
T,0.0
G,0.5
F,0.6153846153846154
E,0.75
D,0.7575757575757576
C,0.5932203389830508
B,0.7446808510638298
A,0.4666666666666667
,0.2998544395924308


Although this would look very drastic for cabin T, we have to put this into perspective with the counts. In addition, even with the processing to re-create the missing cabins, we ended up with many missing values. The model above may be overfitting on this feature as it deems it important, when we could not have enough data for the features to actually be relevant.

In [45]:
display(df.select("cabin_area").where(~fns.col("cabin_area").isNull()).groupBy("cabin_area").count())

cabin_area,count
T,1
G,4
F,13
E,32
D,33
C,59
B,47
A,15


In [46]:
model = TitanicModel()
to_drop = ["Cabin", "Ticket", "Name", "last_name", "family_id", "title", "Embarked", "Age_null_flag", "Cabin_null_flag", "Embarked_null_flag", "Fare", "Mr", "Mrs", "Miss", "Other", "cabin_area_indexed",
          "Age_21_all", "Age_0_1", "Age_1_15"]
pred = model.fit_predict(train, test, to_drop)

In [47]:
model.display_plot_importance()

In [48]:
model.best_score

In [49]:
display(pred)

PassengerId,Survived
1011,1.0
934,0.0
1198,0.0
1279,0.0
1072,0.0
962,1.0
938,0.0
1221,0.0
1068,1.0
1241,1.0
