## Spark Initialization

In [1]:
# Starting a Spark session
import pyspark
import findspark
findspark.init()
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('Titanic-ML').getOrCreate()

In [2]:

from pyspark.sql.functions import count, mean, when, lit, create_map, regexp_extract, col, split
from itertools import chain

In [3]:
# Loading the data
train = spark.read.csv('./data/train.csv', header=True, inferSchema=True)
test  = spark.read.csv('./data/test.csv', header=True, inferSchema=True)

In [4]:
train.count()

891

In [5]:
# We can use printSchema() to visualize column names, data types and whether they're nullable or not
train.printSchema()


root
 |-- PassengerId: integer (nullable = true)
 |-- Survived: integer (nullable = true)
 |-- Pclass: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- SibSp: integer (nullable = true)
 |-- Parch: integer (nullable = true)
 |-- Ticket: string (nullable = true)
 |-- Fare: double (nullable = true)
 |-- Cabin: string (nullable = true)
 |-- Embarked: string (nullable = true)



In [7]:
# Visualizing the first 5 rows of the training set
train.show(5)


+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|          Ticket|   Fare|Cabin|Embarked|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|          1|       0|     3|Braund, Mr. Owen ...|  male|22.0|    1|    0|       A/5 21171|   7.25| null|       S|
|          2|       1|     1|Cumings, Mrs. Joh...|female|38.0|    1|    0|        PC 17599|71.2833|  C85|       C|
|          3|       1|     3|Heikkinen, Miss. ...|female|26.0|    0|    0|STON/O2. 3101282|  7.925| null|       S|
|          4|       1|     1|Futrelle, Mrs. Ja...|female|35.0|    1|    0|          113803|   53.1| C123|       S|
|          5|       0|     3|Allen, Mr. Willia...|  male|35.0|    0|    0|          373450|   8.05| null|       S|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+

Right away, we can see that we have some null values in our datasets. We'll deal with them later on...

In [8]:
# Counting the number of classes
train.groupBy('Survived').count().show()

+--------+-----+
|Survived|count|
+--------+-----+
|       1|  342|
|       0|  549|
+--------+-----+



Most passengers didn't survive: 549 of 891 (about 61.6%) died.

In [9]:
# Counting average Fare value and average age for each class
train.groupBy('Survived').mean('Age', 'Fare').show()

+--------+------------------+------------------+
|Survived|          avg(Age)|         avg(Fare)|
+--------+------------------+------------------+
|       1|28.343689655172415| 48.39540760233917|
|       0| 30.62617924528302|22.117886885245877|
+--------+------------------+------------------+



On average, passengers who survived paid a higher fare (about 48.40 vs. 22.12) and were slightly younger (about 28.3 vs. 30.6 years) than those who died.

In [10]:
# Counting survivors by sex
train.groupBy('Survived').pivot('Sex').count().show()

+--------+------+----+
|Survived|female|male|
+--------+------+----+
|       1|   233| 109|
|       0|    81| 468|
+--------+------+----+



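Most survivors were female (233 of 342, about 68.1%), while most of those who died were male (468 of 549, about 85.2%). Put differently, about 74.2% of women survived, whereas about 81.1% of men died.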
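
The pivot table above can be read in two directions: as the share of each outcome made up by one sex, or as the survival rate within each sex. The following plain-Python sketch (not part of the original notebook; the variable names are illustrative) recomputes both from the printed counts.

```python
# Counts copied from the Survived x Sex pivot table above
counts = {
    "female": {"survived": 233, "died": 81},
    "male": {"survived": 109, "died": 468},
}

total_survived = sum(c["survived"] for c in counts.values())
total_died = sum(c["died"] for c in counts.values())

# Reading the table by row: composition of each outcome
female_share_of_survivors = counts["female"]["survived"] / total_survived
male_share_of_dead = counts["male"]["died"] / total_died

# Reading the table by column: survival rate within each sex
survival_rate = {
    sex: c["survived"] / (c["survived"] + c["died"])
    for sex, c in counts.items()
}

print(f"{female_share_of_survivors:.1%} of survivors were female")
print(f"{male_share_of_dead:.1%} of those who died were male")
for sex, rate in survival_rate.items():
    print(f"{sex}: {rate:.1%} survived")
```

The two readings answer different questions, so it helps to state which denominator a percentage uses.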

In [11]:
# Counting survivors by ticket class
train.groupBy('Survived').pivot('Pclass').count().show()

+--------+---+---+---+
|Survived|  1|  2|  3|
+--------+---+---+---+
|       1|136| 87|119|
|       0| 80| 97|372|
+--------+---+---+---+



First class accounts for the largest group of survivors (136 of 342), while most of those who died (372 of 549) were travelling in third class.
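
Because the three classes differ a lot in size, a per-class survival rate is easier to compare than raw counts. This is a small sketch in plain Python using the counts printed above; the dictionary names are illustrative.

```python
# Counts copied from the Survived x Pclass pivot table above
survived_by_class = {1: 136, 2: 87, 3: 119}
died_by_class = {1: 80, 2: 97, 3: 372}

# Survival rate = survivors / all passengers in that class
class_survival_rate = {
    pclass: survived_by_class[pclass]
    / (survived_by_class[pclass] + died_by_class[pclass])
    for pclass in survived_by_class
}

for pclass, rate in class_survival_rate.items():
    print(f"Class {pclass}: {rate:.1%} survived")
```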

In [12]:
# Counting survivors by the number of siblings/spouses aboard the Titanic
train.groupBy('Survived').pivot('SibSp').count().show()

+--------+---+---+---+---+---+----+----+
|Survived|  0|  1|  2|  3|  4|   5|   8|
+--------+---+---+---+---+---+----+----+
|       1|210|112| 13|  4|  3|null|null|
|       0|398| 97| 15| 12| 15|   5|   7|
+--------+---+---+---+---+---+----+----+



In [13]:
# Counting survivors by the number of parents/children aboard the Titanic
train.groupBy('Survived').pivot('Parch').count().show()

+--------+---+---+---+---+----+---+----+
|Survived|  0|  1|  2|  3|   4|  5|   6|
+--------+---+---+---+---+----+---+----+
|       1|233| 65| 40|  3|null|  1|null|
|       0|445| 53| 40|  2|   4|  4|   1|
+--------+---+---+---+---+----+---+----+



The number of siblings, spouses, parents or children aboard does not appear to separate survivors from non-survivors clearly: passengers travelling without relatives form the largest group among both the survivors and those who died.

The one visible pattern is that large families fared badly: no passenger with five or more siblings/spouses aboard survived.
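
In the pivot output, `null` means that no row exists for that combination, so it can be treated as a count of zero. The sketch below (plain Python, illustrative names) applies that reading to the SibSp table and derives a survival rate per group.

```python
# Columns and counts copied from the Survived x SibSp pivot table above;
# None stands for the `null` cells (no passengers in that combination)
sibsp_values = [0, 1, 2, 3, 4, 5, 8]
survived_counts = [210, 112, 13, 4, 3, None, None]
died_counts = [398, 97, 15, 12, 15, 5, 7]

sibsp_survival_rate = {}
for sibsp, survived, died in zip(sibsp_values, survived_counts, died_counts):
    survived = survived or 0  # treat a missing cell as zero passengers
    died = died or 0
    sibsp_survival_rate[sibsp] = survived / (survived + died)

for sibsp, rate in sibsp_survival_rate.items():
    print(f"SibSp={sibsp}: {rate:.1%} survived")
```

Note that the groups with three or more siblings/spouses are small, so their rates are noisy.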

In [14]:
# Counting survivors by the port of embarkation
train.groupBy('Survived').pivot('Embarked').count().show()

+--------+----+---+---+---+
|Survived|null|  C|  Q|  S|
+--------+----+---+---+---+
|       1|   2| 93| 30|217|
|       0|null| 75| 47|427|
+--------+----+---+---+---+



After this first look at the survivors, sex and ticket class (a proxy for socioeconomic status) stand out as the features most strongly associated with survival.

Next, let's check the dataset for anything that needs cleaning. We already know some values are null, so let's count them.

In [15]:
train.printSchema()

root
 |-- PassengerId: integer (nullable = true)
 |-- Survived: integer (nullable = true)
 |-- Pclass: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- SibSp: integer (nullable = true)
 |-- Parch: integer (nullable = true)
 |-- Ticket: string (nullable = true)
 |-- Fare: double (nullable = true)
 |-- Cabin: string (nullable = true)
 |-- Embarked: string (nullable = true)



In [16]:
# Counting nulls per column (using `c` to avoid shadowing the imported `col` function)
for c in train.columns:
    print(c.ljust(20), train.filter(train[c].isNull()).count())

PassengerId          0
Survived             0
Pclass               0
Name                 0
Sex                  0
Age                  177
SibSp                0
Parch                0
Ticket               0
Fare                 0
Cabin                687
Embarked             2


The Age, Cabin and Embarked columns have missing values.

Cabin is missing for 687 of 891 rows, and it is closely tied to ticket class, since cabins were allocated by class. We'll drop this column.

For Age and Embarked, we will fill in the missing values.

In [17]:
# Filling in Embarked with 'S', most repeated value
train = train.fillna({'Embarked':'S'})
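
The choice of 'S' can be checked against the Embarked pivot table shown earlier. This is a plain-Python sketch with illustrative names that adds the survivor and non-survivor counts per port and picks the most frequent one.

```python
from collections import Counter

# Per-port totals from the Survived x Embarked pivot table (survived + died)
embarked_counts = Counter({
    "S": 217 + 427,
    "C": 93 + 75,
    "Q": 30 + 47,
})

# most_common(1) returns a list with the single most frequent (value, count) pair
most_common_port, port_count = embarked_counts.most_common(1)[0]
print(most_common_port, port_count)
```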

For age imputation, we extract each passenger's title from the Name column and fill missing ages with the average age of passengers sharing that title. Passengers titled Mrs, for instance, tend to be older than those titled Miss.
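
To see what a title-extraction pattern does before running it in Spark, it can be tried with Python's `re` module. The sketch below assumes the pattern `([A-Za-z]+)\.` (a run of letters followed by a period); the first two names appear in the test data, while the "Doe" names are made up for illustration.

```python
import re

# A run of letters immediately followed by a literal period, e.g. "Mr."
TITLE_PATTERN = re.compile(r"([A-Za-z]+)\.")

def extract_title(name):
    """Return the first title-like token in a name, or '' if none is found."""
    match = TITLE_PATTERN.search(name)
    return match.group(1) if match else ""

names = [
    "Kelly, Mr. James",
    "Wirz, Mr. Albert",
    "Doe, Mrs. Jane",      # hypothetical name
    "Doe, Master. John",   # hypothetical name
    "No title here",       # hypothetical name without a title
]
extracted = [extract_title(n) for n in names]
print(extracted)
```

Returning an empty string for a non-matching name mirrors how Spark's `regexp_extract` behaves when the pattern does not match.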

In [18]:
# Extracting the title from Name, then counting known ages and averaging age per title
train = train.withColumn('Title', regexp_extract(train.Name, r'([A-Za-z]+)\.', 1))
train.groupBy('Title').agg(count('Age'), mean('Age')).sort('count(Age)').show()

+--------+----------+------------------+
|   Title|count(Age)|          avg(Age)|
+--------+----------+------------------+
|     Don|         1|              40.0|
|    Capt|         1|              70.0|
|    Lady|         1|              48.0|
|     Sir|         1|              49.0|
|Jonkheer|         1|              38.0|
|Countess|         1|              33.0|
|     Mme|         1|              24.0|
|      Ms|         1|              28.0|
|   Major|         2|              48.5|
|    Mlle|         2|              24.0|
|     Col|         2|              58.0|
|     Rev|         6|43.166666666666664|
|      Dr|         6|              42.0|
|  Master|        36| 4.574166666666667|
|     Mrs|       108|35.898148148148145|
|    Miss|       146|21.773972602739725|
|      Mr|       398|32.368090452261306|
+--------+----------+------------------+



In [19]:
# Mr, Miss, Mrs and Master are far more frequent than the other titles, so we map
# each rare title onto one of them. For example, Lady becomes Mrs and Capt becomes Mr.

titles = {'Mr': 'Mr', 'Miss': 'Miss', 'Mrs': 'Mrs', 'Master': 'Master',
          'Mlle': 'Miss', 'Major': 'Mr', 'Col': 'Mr', 'Sir': 'Mr',
          'Don': 'Mr', 'Mme': 'Miss', 'Jonkheer': 'Mr', 'Lady': 'Mrs',
          'Capt': 'Mr', 'Countess': 'Mrs', 'Ms': 'Miss', 'Dona': 'Mrs',
          'Dr': 'Mr', 'Rev': 'Mr'}

# Building a Spark map column from the dict (note: the name `map` shadows Python's built-in)
map = create_map([lit(x) for x in chain(*titles.items())])
train = train.withColumn('Title', map[train['Title']])
train.groupBy('Title').mean('Age').show() # Showing new averages for each title

+------+------------------+
| Title|          avg(Age)|
+------+------------------+
|  Miss|             21.86|
|Master| 4.574166666666667|
|    Mr| 33.02272727272727|
|   Mrs|35.981818181818184|
+------+------------------+
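
The merged averages can be cross-checked without Spark: each merged group's mean is the count-weighted mean of the raw titles it absorbs. The sketch below is plain Python with illustrative names, using the per-title counts and averages printed earlier and the same title mapping.

```python
# (number of known ages, average age) per raw title, copied from the earlier table
raw_stats = {
    "Don": (1, 40.0), "Capt": (1, 70.0), "Lady": (1, 48.0), "Sir": (1, 49.0),
    "Jonkheer": (1, 38.0), "Countess": (1, 33.0), "Mme": (1, 24.0),
    "Ms": (1, 28.0), "Major": (2, 48.5), "Mlle": (2, 24.0), "Col": (2, 58.0),
    "Rev": (6, 43.166666666666664), "Dr": (6, 42.0),
    "Master": (36, 4.574166666666667), "Mrs": (108, 35.898148148148145),
    "Miss": (146, 21.773972602739725), "Mr": (398, 32.368090452261306),
}

# Same mapping as in the notebook cell above
titles = {"Mr": "Mr", "Miss": "Miss", "Mrs": "Mrs", "Master": "Master",
          "Mlle": "Miss", "Major": "Mr", "Col": "Mr", "Sir": "Mr",
          "Don": "Mr", "Mme": "Miss", "Jonkheer": "Mr", "Lady": "Mrs",
          "Capt": "Mr", "Countess": "Mrs", "Ms": "Miss", "Dona": "Mrs",
          "Dr": "Mr", "Rev": "Mr"}

# Accumulate (sum of ages, number of ages) per merged title
totals = {}
for raw_title, (n, avg_age) in raw_stats.items():
    group = titles[raw_title]
    age_sum, age_count = totals.get(group, (0.0, 0))
    totals[group] = (age_sum + n * avg_age, age_count + n)

merged_avg = {group: s / c for group, (s, c) in totals.items()}
for group, avg_age in merged_avg.items():
    print(f"{group}: {avg_age:.2f}")
```

The results agree with the Spark output above, which suggests the mapping was applied as intended.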



In [20]:
# Creating a function for age imputation: fills missing ages for one title with the given value
def age_imputer(data, title, age):
    return data.withColumn('Age', when(data.Age.isNull() & (data.Title == title), age).otherwise(data.Age))

In [21]:
train = age_imputer(train, 'Miss', 21.86)
train = age_imputer(train, 'Master', 4.57)
train = age_imputer(train,'Mr', 33.02)
train = age_imputer(train, 'Mrs', 35.98)
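
Typing the averages by hand is easy to get wrong, so it can help to derive them from the data. The following is a plain-Python illustration of the same imputation logic on a handful of made-up rows; it is not Spark code, and `impute_age_by_title` is a hypothetical helper name.

```python
from statistics import mean

# Made-up rows for illustration; None marks a missing age
rows = [
    {"Title": "Mr", "Age": 22.0},
    {"Title": "Mr", "Age": None},
    {"Title": "Mr", "Age": 35.0},
    {"Title": "Miss", "Age": 26.0},
    {"Title": "Miss", "Age": None},
]

def impute_age_by_title(rows):
    """Fill missing ages with the mean of the known ages for the same title."""
    known_ages = {}
    for row in rows:
        if row["Age"] is not None:
            known_ages.setdefault(row["Title"], []).append(row["Age"])
    title_means = {title: mean(ages) for title, ages in known_ages.items()}

    imputed = []
    for row in rows:
        if row["Age"] is None:
            # A title with no known ages stays None rather than raising an error
            row = {**row, "Age": title_means.get(row["Title"])}
        imputed.append(row)
    return imputed

imputed_rows = impute_age_by_title(rows)
for row in imputed_rows:
    print(row)
```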

In [22]:
# Creating a FamilySize column combining Parch and SibSp
train = train.withColumn('FamilySize', train.Parch + train.SibSp).drop('Parch', 'SibSp')

In [23]:
# Removing other unnecessary columns
train = train.drop('PassengerId','Cabin','Name','Ticket','Title')
# Visualizing data
train.show(5)

+--------+------+------+----+-------+--------+----------+
|Survived|Pclass|   Sex| Age|   Fare|Embarked|FamilySize|
+--------+------+------+----+-------+--------+----------+
|       0|     3|  male|22.0|   7.25|       S|         1|
|       1|     1|female|38.0|71.2833|       C|         1|
|       1|     3|female|26.0|  7.925|       S|         0|
|       1|     1|female|35.0|   53.1|       S|         1|
|       0|     3|  male|35.0|   8.05|       S|         0|
+--------+------+------+----+-------+--------+----------+
only showing top 5 rows



# Creating a Machine Learning Model

In [24]:
from pyspark.ml.classification import LogisticRegression,RandomForestClassifier,GBTClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml import Pipeline
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

In [25]:
# Converting Sex and Embarked columns to numeric index
indexer = StringIndexer(inputCols=['Sex','Embarked'],outputCols=['SexIndex', 'EmbarkedIndex'])
indexer_model = indexer.fit(train)
train_1 = indexer_model.transform(train).drop('Sex','Embarked') # Transforming and dropping original columns
train_1.show(5)

+--------+------+----+-------+----------+--------+-------------+
|Survived|Pclass| Age|   Fare|FamilySize|SexIndex|EmbarkedIndex|
+--------+------+----+-------+----------+--------+-------------+
|       0|     3|22.0|   7.25|         1|     0.0|          0.0|
|       1|     1|38.0|71.2833|         1|     1.0|          1.0|
|       1|     3|26.0|  7.925|         0|     1.0|          0.0|
|       1|     1|35.0|   53.1|         1|     1.0|          0.0|
|       0|     3|35.0|   8.05|         0|     0.0|          0.0|
+--------+------+----+-------+----------+--------+-------------+
only showing top 5 rows
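
The index values in this output follow from `StringIndexer`'s default ordering, which assigns 0.0 to the most frequent label. The sketch below imitates that rule in plain Python; it assumes the default `frequencyDesc` ordering with alphabetical tie-breaking, and uses label counts taken from the pivot tables above (with the two missing Embarked values already filled as 'S').

```python
from collections import Counter

def frequency_index(values):
    """Map each label to an index, most frequent label first (ties alphabetical)."""
    counts = Counter(values)
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: float(i) for i, label in enumerate(ordered)}

# 577 male and 314 female passengers; 646 S, 168 C and 77 Q after the fill
sex_index = frequency_index(["male"] * 577 + ["female"] * 314)
embarked_index = frequency_index(["S"] * 646 + ["C"] * 168 + ["Q"] * 77)

print(sex_index)
print(embarked_index)
```

This matches the rows shown above, where male passengers get `SexIndex` 0.0 and passengers who embarked at C get `EmbarkedIndex` 1.0.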



In [26]:
# Separating variables into independent and dependent variables with VectorAssembler
assembler = VectorAssembler(inputCols=train_1.columns[1:], outputCol = 'features')
train_1 = assembler.transform(train_1).select('features','Survived')
train_1.show(5, False)

+------------------------------+--------+
|features                      |Survived|
+------------------------------+--------+
|[3.0,22.0,7.25,1.0,0.0,0.0]   |0       |
|[1.0,38.0,71.2833,1.0,1.0,1.0]|1       |
|[3.0,26.0,7.925,0.0,1.0,0.0]  |1       |
|[1.0,35.0,53.1,1.0,1.0,0.0]   |1       |
|[3.0,35.0,8.05,0.0,0.0,0.0]   |0       |
+------------------------------+--------+
only showing top 5 rows



In [27]:
# Splitting data into training and validation sets
train_ds, valid_ds = train_1.randomSplit([0.7,0.3]) # 70% of data will be used for training

In [28]:
train_ds.show(5, False)

+---------------------+--------+
|features             |Survived|
+---------------------+--------+
|(6,[0,1],[1.0,33.02])|0       |
|(6,[0,1],[1.0,33.02])|0       |
|(6,[0,1],[1.0,39.0]) |0       |
|(6,[0,1],[1.0,40.0]) |0       |
|(6,[0,1],[2.0,33.02])|0       |
+---------------------+--------+
only showing top 5 rows



In [29]:
valid_ds.show(5, False)

+------------------------------+--------+
|features                      |Survived|
+------------------------------+--------+
|(6,[0,1],[1.0,38.0])          |0       |
|(6,[0,1],[2.0,33.02])         |0       |
|[1.0,16.0,39.4,1.0,1.0,0.0]   |1       |
|[1.0,16.0,57.9792,1.0,1.0,1.0]|1       |
|[1.0,18.0,79.65,2.0,1.0,0.0]  |1       |
+------------------------------+--------+
only showing top 5 rows
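
Some rows above print as `(6,[0,1],[1.0,33.02])` rather than a bracketed list. That is Spark's sparse vector notation: the vector size, the indices of the non-zero entries, and their values. The sketch below expands it in plain Python; `to_dense` is a hypothetical helper, and the feature order is the one passed to the assembler.

```python
def to_dense(size, indices, values):
    """Expand a (size, indices, values) sparse vector into a full list."""
    dense = [0.0] * size
    for index, value in zip(indices, values):
        dense[index] = value
    return dense

feature_names = ["Pclass", "Age", "Fare", "FamilySize", "SexIndex", "EmbarkedIndex"]

# The first row of train_ds as printed above
dense_row = to_dense(6, [0, 1], [1.0, 33.02])
named_row = dict(zip(feature_names, dense_row))
print(named_row)
```

So this row describes a first-class passenger with an imputed age of 33.02 and zeros for every other feature.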



In [30]:
accuracy = MulticlassClassificationEvaluator(labelCol='Survived', metricName = 'accuracy')

In [31]:
lr = LogisticRegression(labelCol='Survived')
model = lr.fit(train_ds)
prediction = model.transform(valid_ds)
accuracy.evaluate(prediction)

0.7976190476190477

In [32]:
rf = RandomForestClassifier(labelCol='Survived')
model = rf.fit(train_ds)
prediction = model.transform(valid_ds)
accuracy.evaluate(prediction)

0.8492063492063492

In [33]:
gbt = GBTClassifier(labelCol='Survived')
model = gbt.fit(train_ds)
predictions = model.transform(valid_ds)
accuracy.evaluate(predictions)

0.8253968253968254

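Random Forest achieved the highest validation accuracy (about 0.849), ahead of gradient-boosted trees (about 0.825) and logistic regression (about 0.798).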
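
For context, these accuracies can be compared with a trivial baseline that always predicts the majority class. The sketch below is plain Python with illustrative names; the baseline uses the class counts of the full training data (549 of 891 did not survive), so it only approximates the baseline on the random validation split.

```python
# Validation accuracies copied from the three cells above
validation_accuracy = {
    "LogisticRegression": 0.7976190476190477,
    "RandomForestClassifier": 0.8492063492063492,
    "GBTClassifier": 0.8253968253968254,
}

# Accuracy of always predicting "did not survive" on the full training data
majority_baseline = 549 / 891

best_model = max(validation_accuracy, key=validation_accuracy.get)

print(f"Majority-class baseline: {majority_baseline:.3f}")
for name, acc in validation_accuracy.items():
    print(f"{name}: {acc:.3f} (+{acc - majority_baseline:.3f} over baseline)")
print("Best:", best_model)
```

Since `randomSplit` was called without a seed, these numbers will vary from run to run.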

Before testing and tuning our models, we first need to clean the test dataset.

In [34]:
test.show(5)

+-----------+------+--------------------+------+----+-----+-----+-------+-------+-----+--------+
|PassengerId|Pclass|                Name|   Sex| Age|SibSp|Parch| Ticket|   Fare|Cabin|Embarked|
+-----------+------+--------------------+------+----+-----+-----+-------+-------+-----+--------+
|        892|     3|    Kelly, Mr. James|  male|34.5|    0|    0| 330911| 7.8292| null|       Q|
|        893|     3|Wilkes, Mrs. Jame...|female|47.0|    1|    0| 363272|    7.0| null|       S|
|        894|     2|Myles, Mr. Thomas...|  male|62.0|    0|    0| 240276| 9.6875| null|       Q|
|        895|     3|    Wirz, Mr. Albert|  male|27.0|    0|    0| 315154| 8.6625| null|       S|
|        896|     3|Hirvonen, Mrs. Al...|female|22.0|    1|    1|3101298|12.2875| null|       S|
+-----------+------+--------------------+------+----+-----+-----+-------+-------+-----+--------+
only showing top 5 rows



In [35]:
# Checking for missing data
for c in test.columns:
    print(c.ljust(20), test.filter(test[c].isNull()).count())

PassengerId          0
Pclass               0
Name                 0
Sex                  0
Age                  86
SibSp                0
Parch                0
Ticket               0
Fare                 1
Cabin                327
Embarked             0


In [36]:
# Filling the single missing Fare value with 14.45
test = test.fillna({'Fare': 14.45})
# Creating FamilySize column
test = test.withColumn('FamilySize', test.Parch + test.SibSp).drop('Parch', 'SibSp')

In [37]:
# For age imputation, we repeat the same process on the test set: extract titles, map them, and average age per title
test = test.withColumn('Title', regexp_extract(test.Name, r'([A-Za-z]+)\.', 1))
test = test.withColumn('Title', map[test.Title])
test.groupBy('Title').agg(count('Age'), mean('Age')).sort('count(Age)').show()

+------+----------+------------------+
| Title|count(Age)|          avg(Age)|
+------+----------+------------------+
|Master|        17| 7.406470588235294|
|   Mrs|        63|38.904761904761905|
|  Miss|        64|21.774843750000002|
|    Mr|       188|32.340425531914896|
+------+----------+------------------+



In [38]:
test = age_imputer(test, 'Master', 7.40)
test = age_imputer(test, 'Mrs', 38.90)
test = age_imputer(test, 'Miss', 21.77)
test = age_imputer(test, 'Mr',32.34)
test = test.drop('Cabin', 'Name', 'Ticket', 'Title') # Keeping PassengerId for submission
test.show(5)

+-----------+------+------+----+-------+--------+----------+
|PassengerId|Pclass|   Sex| Age|   Fare|Embarked|FamilySize|
+-----------+------+------+----+-------+--------+----------+
|        892|     3|  male|34.5| 7.8292|       Q|         0|
|        893|     3|female|47.0|    7.0|       S|         1|
|        894|     2|  male|62.0| 9.6875|       Q|         0|
|        895|     3|  male|27.0| 8.6625|       S|         0|
|        896|     3|female|22.0|12.2875|       S|         2|
+-----------+------+------+----+-------+--------+----------+
only showing top 5 rows



In [39]:
# Chaining indexing, feature assembly and the classifier so the same steps apply to train and test
pipeline = Pipeline(stages=[indexer, assembler, rf])

# Hyperparameter grid: 3 * 3 * 1 * 2 * 3 * 3 = 162 combinations
params = (ParamGridBuilder()
          .addGrid(rf.maxDepth, [3, 4, 5])
          .addGrid(rf.minInfoGain, [0., 0.01, 0.1])
          .addGrid(rf.numTrees, [1000])
          .addGrid(rf.impurity, ["gini", "entropy"])
          .addGrid(rf.minInstancesPerNode, [1, 5, 10])
          .addGrid(rf.maxBins, [20, 32, 50])
          .build())

cv = CrossValidator(estimator=pipeline,
                    estimatorParamMaps=params,
                    evaluator=accuracy,
                    numFolds=5)

model_final = cv.fit(train)
# Note: this accuracy is measured on the data the model was fitted on, so it is optimistic
pred_train = model_final.transform(train)
accuracy.evaluate(pred_train)

0.8529741863075196
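
This search is expensive, and its cost can be estimated up front. The sketch below counts the grid combinations in plain Python with `itertools.product`; the dictionary simply restates the values from the cell above, and the variable names are illustrative.

```python
from itertools import product

# Same values as the ParamGridBuilder calls above
grid = {
    "maxDepth": [3, 4, 5],
    "minInfoGain": [0.0, 0.01, 0.1],
    "numTrees": [1000],
    "impurity": ["gini", "entropy"],
    "minInstancesPerNode": [1, 5, 10],
    "maxBins": [20, 32, 50],
}
num_folds = 5

# Every combination of one value per parameter
combinations = list(product(*grid.values()))
n_combinations = len(combinations)

# Each combination is fitted once per fold during cross-validation
cv_fits = n_combinations * num_folds

print(f"{n_combinations} combinations x {num_folds} folds = {cv_fits} pipeline fits")
print(f"At 1000 trees per fit, that is {cv_fits * 1000} trees")
```

`CrossValidator` also refits the best combination on the full dataset afterwards, so one more fit is added on top. Reducing `numTrees` or the number of grid values shrinks the run time proportionally.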

In [41]:
# Predicting on the test set (this must run before pred_test is inspected)
pred_test = model_final.transform(test)
pred_test.printSchema()

In [42]:
# Keeping only the columns required for submission, with the prediction cast to integer
predictions = pred_test.select('PassengerId', 'prediction')
predictions = predictions.withColumn('Survived', predictions['prediction'].cast('integer')).drop('prediction')
predictions.show(5)

+-----------+--------+
|PassengerId|Survived|
+-----------+--------+
|        892|       0|
|        893|       0|
|        894|       0|
|        895|       0|
|        896|       1|
+-----------+--------+
only showing top 5 rows



In [47]:
# Saving submission (adjust the output path to your environment)
predictions.toPandas().to_csv('C:/Users/yyani/Desktop/submiss.csv', index=False)