# PREDICTING THE SURVIVAL OF TITANIC DISASTER

In [1]:
#Importing neccessary libraries
from pyspark.sql import SparkSession
from pyspark.ml import Pipeline
from pyspark.sql.functions import mean,col,split, col, regexp_extract, when, lit
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.feature import QuantileDiscretizer

Starting the spark session
* The spark session class is the entry point to all functionality in the spark.
* Use sparksession.builder to create a basic spark session.

In [2]:
spark = SparkSession \
    .builder \
    .appName("Spark ML example on titanic data ") \
    .getOrCreate()

In [3]:
#Loading the dataset from the given path
s3_bucket_path = "C:/users/Dell/DATA/train.csv"

In [4]:
#Giving a name for the dataframe as STD=Survival of titanic disaster
STD_df = spark.read.csv(s3_bucket_path,header = 'True',inferSchema='True')

In [5]:
#Displaying the dataframe
display(STD_df)

DataFrame[PassengerId: int, Survived: int, Pclass: int, Name: string, Sex: string, Age: double, SibSp: int, Parch: int, Ticket: string, Fare: double, Cabin: string, Embarked: string]

In [6]:
#Checking schema of the dataset
STD_df.printSchema()

root
 |-- PassengerId: integer (nullable = true)
 |-- Survived: integer (nullable = true)
 |-- Pclass: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- SibSp: integer (nullable = true)
 |-- Parch: integer (nullable = true)
 |-- Ticket: string (nullable = true)
 |-- Fare: double (nullable = true)
 |-- Cabin: string (nullable = true)
 |-- Embarked: string (nullable = true)



In [7]:
#Displaying count of the passengers 
passengers_count = STD_df.count()
print(passengers_count)

891


In [8]:
STD_df.show(5)

+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|          Ticket|   Fare|Cabin|Embarked|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|          1|       0|     3|Braund, Mr. Owen ...|  male|22.0|    1|    0|       A/5 21171|   7.25| null|       S|
|          2|       1|     1|Cumings, Mrs. Joh...|female|38.0|    1|    0|        PC 17599|71.2833|  C85|       C|
|          3|       1|     3|Heikkinen, Miss. ...|female|26.0|    0|    0|STON/O2. 3101282|  7.925| null|       S|
|          4|       1|     1|Futrelle, Mrs. Ja...|female|35.0|    1|    0|          113803|   53.1| C123|       S|
|          5|       0|     3|Allen, Mr. Willia...|  male|35.0|    0|    0|          373450|   8.05| null|       S|
+-----------+--------+------+--------------------+------+----+-----+-----+------

# Now lets do some Exploratory Data Analysis
* Knowing the survival rate of the passengers
* Checking the survival rate using the sex feature
* Checking the Survival rate using the passenger class feature

In [9]:
#selecting the features like Pclass and embarked 
STD_df.select("Survived","Pclass", "Embarked").show(5)

+--------+------+--------+
|Survived|Pclass|Embarked|
+--------+------+--------+
|       0|     3|       S|
|       1|     1|       C|
|       1|     3|       S|
|       1|     1|       S|
|       0|     3|       S|
+--------+------+--------+
only showing top 5 rows



In [10]:
#Grouping the above selected features to know the survival rate accordingly
STD_df.groupBy("Survived").count().show()

+--------+-----+
|Survived|count|
+--------+-----+
|       1|  342|
|       0|  549|
+--------+-----+



In [11]:
gropuBy_output = STD_df.groupBy("Survived").count()
display(gropuBy_output)

DataFrame[Survived: int, count: bigint]

In [12]:
#Displaying the count of survivors depending upon the Sex
STD_df.groupBy("Sex","Survived").count().show()

+------+--------+-----+
|   Sex|Survived|count|
+------+--------+-----+
|  male|       0|  468|
|female|       1|  233|
|female|       0|   81|
|  male|       1|  109|
+------+--------+-----+



In [13]:
#Displaying the count of survivors depending upon the passengers class
STD_df.groupBy("Pclass","Survived").count().show()

+------+--------+-----+
|Pclass|Survived|count|
+------+--------+-----+
|     1|       0|   80|
|     3|       1|  119|
|     1|       1|  136|
|     2|       1|   87|
|     2|       0|   97|
|     3|       0|  372|
+------+--------+-----+



# Data Cleaning and Preprocessing 
* To deal with the null values an extensive data cleaning is done by finding the mean values for the age.
* Mean of the age is identified by the intials before the names of the passengers.
* Inorder to proceed further we need to first replace the misspelled intials

In [14]:
# This function is used to print feature with null values and null count 
def null_value_count(df):
  null_columns_counts = []
  numRows = df.count()
  for k in df.columns:
    nullRows = df.where(col(k).isNull()).count()
    if(nullRows > 0):
      temp = k,nullRows
      null_columns_counts.append(temp)
  return(null_columns_counts)

In [15]:
# Calling function
null_columns_count_list = null_value_count(STD_df)
#Displaying the Null values count from columns containing null values 
spark.createDataFrame(null_columns_count_list, ['Column_With_Null_Value', 'Null_Values_Count']).show()

+----------------------+-----------------+
|Column_With_Null_Value|Null_Values_Count|
+----------------------+-----------------+
|                   Age|              177|
|                 Cabin|              687|
|              Embarked|                2|
+----------------------+-----------------+



* To replace these NaN values, we can assign them the mean age of the dataset.But the problem is, there were many people with many different ages.We just cant assign a 4 year kid with the mean age that is 29 years
* We should search the Feature Description. Looking at the function, we can see that the names have a titles like Mr or Mrs. So we can allocate to the respective groups the mean values of Mr and Mrs.

In [16]:
#Calculating the mean age to handle the null values in the age feature
mean_age = STD_df.select(mean('Age')).collect()[0][0]
print(mean_age)

29.69911764705882


In [17]:
STD_df.select("Name").show(5)

+--------------------+
|                Name|
+--------------------+
|Braund, Mr. Owen ...|
|Cumings, Mrs. Joh...|
|Heikkinen, Miss. ...|
|Futrelle, Mrs. Ja...|
|Allen, Mr. Willia...|
+--------------------+
only showing top 5 rows



Using the Regex ""[A-Za-z]+)." we extract the initials from the Name. It looks for strings which lie between A-Z or a-z and followed by a .(dot).

In [18]:
STD_df = STD_df.withColumn("Initial",regexp_extract(col("Name"),"([A-Za-z]+)\.",1))
STD_df.show(5)

+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+-------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|          Ticket|   Fare|Cabin|Embarked|Initial|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+-------+
|          1|       0|     3|Braund, Mr. Owen ...|  male|22.0|    1|    0|       A/5 21171|   7.25| null|       S|     Mr|
|          2|       1|     1|Cumings, Mrs. Joh...|female|38.0|    1|    0|        PC 17599|71.2833|  C85|       C|    Mrs|
|          3|       1|     3|Heikkinen, Miss. ...|female|26.0|    0|    0|STON/O2. 3101282|  7.925| null|       S|   Miss|
|          4|       1|     1|Futrelle, Mrs. Ja...|female|35.0|    1|    0|          113803|   53.1| C123|       S|    Mrs|
|          5|       0|     3|Allen, Mr. Willia...|  male|35.0|    0|    0|          373450|   8.05| null|       S|     Mr|
+-----------+---

In [19]:
#displaying the intials for the names
STD_df.select("Initial").distinct().show()

+--------+
| Initial|
+--------+
|     Don|
|    Miss|
|Countess|
|     Col|
|     Rev|
|    Lady|
|  Master|
|     Mme|
|    Capt|
|      Mr|
|      Dr|
|     Mrs|
|     Sir|
|Jonkheer|
|    Mlle|
|   Major|
|      Ms|
+--------+



Some intials are mispelled like mlle and mme are related to miss. so I have replaced the misspelled intials with the correct ones. 

In [20]:
#Replacing the misspelled Intials
STD_df = STD_df.replace(['Mlle','Mme', 'Ms', 'Dr','Major','Lady','Countess','Jonkheer','Col','Rev','Capt','Sir','Don'],
               ['Miss','Miss','Miss','Mr','Mr',  'Mrs',  'Mrs',  'Other',  'Other','Other','Mr','Mr','Mr'])
STD_df.select("Initial").distinct().show()

+-------+
|Initial|
+-------+
|   Miss|
|  Other|
| Master|
|     Mr|
|    Mrs|
+-------+



In [21]:
#Now calculating the average age of the passengers depending on the Initials
STD_df.groupby('Initial').avg('Age').collect()

[Row(Initial='Miss', avg(Age)=21.86),
 Row(Initial='Other', avg(Age)=45.888888888888886),
 Row(Initial='Master', avg(Age)=4.574166666666667),
 Row(Initial='Mr', avg(Age)=32.73960880195599),
 Row(Initial='Mrs', avg(Age)=35.981818181818184)]

In [22]:
#Now lets assign the missing values in the columns based on the average age of the Initials
STD_df = STD_df.withColumn("Age",when((STD_df["Initial"] == "Miss") & (STD_df["Age"].isNull()), 22).otherwise(STD_df["Age"]))
STD_df = STD_df.withColumn("Age",when((STD_df["Initial"] == "Other") & (STD_df["Age"].isNull()), 46).otherwise(STD_df["Age"]))
STD_df = STD_df.withColumn("Age",when((STD_df["Initial"] == "Master") & (STD_df["Age"].isNull()), 5).otherwise(STD_df["Age"]))
STD_df = STD_df.withColumn("Age",when((STD_df["Initial"] == "Mr") & (STD_df["Age"].isNull()), 33).otherwise(STD_df["Age"]))
STD_df = STD_df.withColumn("Age",when((STD_df["Initial"] == "Mrs") & (STD_df["Age"].isNull()), 36).otherwise(STD_df["Age"]))

In [23]:
STD_df.filter(STD_df.Age==46).select("Initial").show()

+-------+
|Initial|
+-------+
|     Mr|
|     Mr|
|     Mr|
+-------+



In [24]:
STD_df.select("Age").show(5)

+----+
| Age|
+----+
|22.0|
|38.0|
|26.0|
|35.0|
|35.0|
+----+
only showing top 5 rows



In [25]:
#Checking the null values in the 
STD_df.groupBy("Embarked").count().show()

+--------+-----+
|Embarked|count|
+--------+-----+
|       Q|   77|
|    null|    2|
|       C|  168|
|       S|  644|
+--------+-----+



In [26]:
#Filling the null values in the Embarked column
STD_df = STD_df.na.fill({"Embarked" : 'S'})
#Dropping the cabin column as it is not an important feature which effects the model and has lot of null values
STD_df = STD_df.drop("Cabin")
STD_df.printSchema()

root
 |-- PassengerId: integer (nullable = true)
 |-- Survived: integer (nullable = true)
 |-- Pclass: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- SibSp: integer (nullable = true)
 |-- Parch: integer (nullable = true)
 |-- Ticket: string (nullable = true)
 |-- Fare: double (nullable = true)
 |-- Embarked: string (nullable = false)
 |-- Initial: string (nullable = true)



We will build and evaluate a new function called "Family size" and "Alone." This functionality is combined sum of Parch(parents / children) and SibSp(siblings / spouses). This provides us a combined data so we can test if survival rate has anything to do with passenger family size.

In [27]:
#creating family_size column
STD_df = STD_df.withColumn("Family_Size",col('SibSp')+col('Parch'))
STD_df.groupBy("Family_Size").count().show()

+-----------+-----+
|Family_Size|count|
+-----------+-----+
|          1|  161|
|          6|   12|
|          3|   29|
|          5|   22|
|          4|   15|
|          7|    6|
|         10|    7|
|          2|  102|
|          0|  537|
+-----------+-----+



In [28]:
#creating the Alone column and assign some values 0 means alone and 1 means family size
STD_df = STD_df.withColumn('Alone',lit(0))
STD_df = STD_df.withColumn("Alone",when(STD_df["Family_Size"] == 0, 1).otherwise(STD_df["Alone"]))
STD_df.columns

['PassengerId',
 'Survived',
 'Pclass',
 'Name',
 'Sex',
 'Age',
 'SibSp',
 'Parch',
 'Ticket',
 'Fare',
 'Embarked',
 'Initial',
 'Family_Size',
 'Alone']

# Pipeline

In [29]:
# converting Sex, Embarked & Initial columns from string to number using StringIndexer
indexers = [StringIndexer(inputCol=column, outputCol=column+"_index").fit(STD_df) for column in ["Sex","Embarked","Initial"]]
pipeline = Pipeline(stages=indexers)
STD_df = pipeline.fit(STD_df).transform(STD_df)
STD_df.show(5)

+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+--------+-------+-----------+-----+---------+--------------+-------------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|          Ticket|   Fare|Embarked|Initial|Family_Size|Alone|Sex_index|Embarked_index|Initial_index|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+--------+-------+-----------+-----+---------+--------------+-------------+
|          1|       0|     3|Braund, Mr. Owen ...|  male|22.0|    1|    0|       A/5 21171|   7.25|       S|     Mr|          1|    0|      0.0|           0.0|          0.0|
|          2|       1|     1|Cumings, Mrs. Joh...|female|38.0|    1|    0|        PC 17599|71.2833|       C|    Mrs|          1|    0|      1.0|           1.0|          2.0|
|          3|       1|     3|Heikkinen, Miss. ...|female|26.0|    0|    0|STON/O2. 3101282|  7.925|       S|   Miss|          0|  

In [30]:
#dropping columns which are not required
STD_df = STD_df.drop("PassengerId","Name","Ticket","Cabin","Embarked","Sex","Initial")
STD_df.show(5)

+--------+------+----+-----+-----+-------+-----------+-----+---------+--------------+-------------+
|Survived|Pclass| Age|SibSp|Parch|   Fare|Family_Size|Alone|Sex_index|Embarked_index|Initial_index|
+--------+------+----+-----+-----+-------+-----------+-----+---------+--------------+-------------+
|       0|     3|22.0|    1|    0|   7.25|          1|    0|      0.0|           0.0|          0.0|
|       1|     1|38.0|    1|    0|71.2833|          1|    0|      1.0|           1.0|          2.0|
|       1|     3|26.0|    0|    0|  7.925|          0|    1|      1.0|           0.0|          1.0|
|       1|     1|35.0|    1|    0|   53.1|          1|    0|      1.0|           0.0|          2.0|
|       0|     3|35.0|    0|    0|   8.05|          0|    1|      0.0|           0.0|          0.0|
+--------+------+----+-----+-----+-------+-----------+-----+---------+--------------+-------------+
only showing top 5 rows



# Feature Engineering

In [31]:
#Converting the features into one single vector
feature = VectorAssembler(inputCols=STD_df.columns[1:],outputCol="features")
feature_vector= feature.transform(STD_df)
feature_vector.show(5)

+--------+------+----+-----+-----+-------+-----------+-----+---------+--------------+-------------+--------------------+
|Survived|Pclass| Age|SibSp|Parch|   Fare|Family_Size|Alone|Sex_index|Embarked_index|Initial_index|            features|
+--------+------+----+-----+-----+-------+-----------+-----+---------+--------------+-------------+--------------------+
|       0|     3|22.0|    1|    0|   7.25|          1|    0|      0.0|           0.0|          0.0|(10,[0,1,2,4,5],[...|
|       1|     1|38.0|    1|    0|71.2833|          1|    0|      1.0|           1.0|          2.0|[1.0,38.0,1.0,0.0...|
|       1|     3|26.0|    0|    0|  7.925|          0|    1|      1.0|           0.0|          1.0|[3.0,26.0,0.0,0.0...|
|       1|     1|35.0|    1|    0|   53.1|          1|    0|      1.0|           0.0|          2.0|[1.0,35.0,1.0,0.0...|
|       0|     3|35.0|    0|    0|   8.05|          0|    1|      0.0|           0.0|          0.0|(10,[0,1,4,6],[3....|
+--------+------+----+-----+----

# Modelling 

In [32]:
# Now spliting the training and testing set by utilizing 80% of the train data
(trainingData, testData) = feature_vector.randomSplit([0.8, 0.2],seed = 11)

In [33]:
from pyspark.ml.classification import RandomForestClassifier
rf = RandomForestClassifier(labelCol="Survived", featuresCol="features")
rf_model = rf.fit(trainingData)
rf_prediction = rf_model.transform(testData)
rf_prediction.select("prediction", "Survived", "features").show()
evaluator = MulticlassClassificationEvaluator(labelCol="Survived", predictionCol="prediction", metricName="accuracy")

+----------+--------+--------------------+
|prediction|Survived|            features|
+----------+--------+--------------------+
|       0.0|       0|[1.0,19.0,3.0,2.0...|
|       1.0|       0|[1.0,27.0,0.0,2.0...|
|       0.0|       0|(10,[0,1,4,6],[1....|
|       0.0|       0|[1.0,28.0,1.0,0.0...|
|       0.0|       0|(10,[0,1,4,6],[1....|
|       0.0|       0|(10,[0,1,4,6,8],[...|
|       0.0|       0|(10,[0,1,3,4,5],[...|
|       0.0|       0|(10,[0,1,6],[1.0,...|
|       0.0|       0|(10,[0,1,4,6,8],[...|
|       0.0|       0|(10,[0,1,2,4,5],[...|
|       0.0|       0|[1.0,51.0,0.0,1.0...|
|       0.0|       0|(10,[0,1,4,6,8],[...|
|       0.0|       0|(10,[0,1,4,6,8],[...|
|       0.0|       0|(10,[0,1,4,6],[2....|
|       0.0|       0|(10,[0,1,4,6],[2....|
|       0.0|       0|(10,[0,1,4,6],[2....|
|       0.0|       0|(10,[0,1,2,4,5],[...|
|       0.0|       0|(10,[0,1,2,4,5],[...|
|       0.0|       0|(10,[0,1,2,4,5],[...|
|       0.0|       0|(10,[0,1,4,6],[2....|
+----------

# Performance Evaluation

In [34]:
#Checking the accuracy of the random forest calssifier
rf_accuracy = evaluator.evaluate(rf_prediction)
print("Accuracy of RandomForestClassifier is = %g"% (rf_accuracy))
print("Test Error of RandomForestClassifier  = %g " % (1.00 - rf_accuracy))

Accuracy of RandomForestClassifier is = 0.836257
Test Error of RandomForestClassifier  = 0.163743 


In [35]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator
evaluator = BinaryClassificationEvaluator(labelCol="prediction")
print("Test Area Under ROC: " + str(evaluator.evaluate(rf_prediction, 
                                    {evaluator.metricName: "areaUnderROC"})))

Test Area Under ROC: 1.0
