In [1]:
import findspark

In [2]:
# Point findspark at the local Spark installation before `pyspark` is imported.
# NOTE(review): this is a machine-specific absolute path; anyone else running
# the notebook has to change it (or rely on SPARK_HOME instead).
findspark.init("/home/hp/spark-3.0.0-bin-hadoop2.7") 

In [3]:
import pyspark

In [4]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler
from pyspark.sql.types import *
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.mllib.evaluation import MulticlassMetrics
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.classification import NaiveBayes
from pyspark.ml.classification import LinearSVC



In [5]:
# Create (or reuse) a Spark session named "titanic" running on the 'local' master.
spark = (
    SparkSession.builder
    .master('local')
    .appName("titanic")
    .getOrCreate()
)

# Loading the data

In [6]:
# Read the Titanic CSV; the first line is the header and column types are
# inferred from the data.
titanic_df = spark.read.csv(
    '/home/hp/sparkmllib/titanic.csv',
    header=True,
    inferSchema=True,
)

In [7]:
# Preview the raw data (show() prints the first 20 rows by default).
titanic_df.show()

+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|          Ticket|   Fare|Cabin|Embarked|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|          1|       0|     3|Braund, Mr. Owen ...|  male|22.0|    1|    0|       A/5 21171|   7.25| null|       S|
|          2|       1|     1|Cumings, Mrs. Joh...|female|38.0|    1|    0|        PC 17599|71.2833|  C85|       C|
|          3|       1|     3|Heikkinen, Miss. ...|female|26.0|    0|    0|STON/O2. 3101282|  7.925| null|       S|
|          4|       1|     1|Futrelle, Mrs. Ja...|female|35.0|    1|    0|          113803|   53.1| C123|       S|
|          5|       0|     3|Allen, Mr. Willia...|  male|35.0|    0|    0|          373450|   8.05| null|       S|
|          6|       0|     3|    Moran, Mr. James|  male|null|    0|    0|      

In [8]:
# Total number of rows in the raw data set.
print(titanic_df.count())

891


In [9]:
# Inspect the column types that inferSchema produced.
titanic_df.printSchema()

root
 |-- PassengerId: integer (nullable = true)
 |-- Survived: integer (nullable = true)
 |-- Pclass: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- SibSp: integer (nullable = true)
 |-- Parch: integer (nullable = true)
 |-- Ticket: string (nullable = true)
 |-- Fare: double (nullable = true)
 |-- Cabin: string (nullable = true)
 |-- Embarked: string (nullable = true)



# Checking for missing values

In [10]:
# Count the nulls of every column in ONE aggregation (a single Spark job)
# instead of launching a separate filter/count job per column.
# when(...) without otherwise() yields null for non-matching rows, and
# count() only counts non-null values, so each aggregate is the null count.
null_counts = titanic_df.select(
    [count(when(col(c).isNull(), 1)).alias(c) for c in titanic_df.columns]
).first()
for c in titanic_df.columns:
    print(c, null_counts[c])

PassengerId 0
Survived 0
Pclass 0
Name 0
Sex 0
Age 177
SibSp 0
Parch 0
Ticket 0
Fare 0
Cabin 687
Embarked 2


In [11]:
# Most frequent embarkation port, used below to impute missing values.
# Nulls are excluded first: the value is meant to *replace* nulls, so the
# null group must never be able to win the vote.
# The secondary sort key makes the result deterministic when counts tie.
mode_embarked = (
    titanic_df
    .filter(col('Embarked').isNotNull())
    .groupBy('Embarked')
    .count()
    .orderBy(col('count').desc(), col('Embarked'))
    .first()['Embarked']  # read by name rather than by position
)
print(mode_embarked)

S


In [12]:
# Mean of the non-null ages; the integer part is what gets used for imputation.
age_mean = titanic_df.agg(mean('Age')).first()[0]
print(int(age_mean))

29


# Imputation of Missing Values

In [13]:
# Replacement value per column with missing data.
# NOTE(review): int() truncates the mean age rather than rounding it — confirm
# that is intended.
imputation_values = {
    'Age': int(age_mean),
    'Embarked': mode_embarked,
}
titanic_df = titanic_df.na.fill(imputation_values)


In [14]:
# Re-check the null counts after imputation, using a single aggregation
# (one Spark job) rather than one filter/count job per column.
# count() ignores the nulls that when(...) produces for non-matching rows.
null_counts = titanic_df.select(
    [count(when(col(c).isNull(), 1)).alias(c) for c in titanic_df.columns]
).first()
for c in titanic_df.columns:
    print(c, null_counts[c])

PassengerId 0
Survived 0
Pclass 0
Name 0
Sex 0
Age 0
SibSp 0
Parch 0
Ticket 0
Fare 0
Cabin 687
Embarked 0


In [15]:
# Cabin is null for 687 of the 891 rows (see the null counts above), which is
# presumably why it is dropped instead of imputed.
titanic_df=titanic_df.drop('Cabin')
titanic_df.show()

+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+--------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|          Ticket|   Fare|Embarked|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+--------+
|          1|       0|     3|Braund, Mr. Owen ...|  male|22.0|    1|    0|       A/5 21171|   7.25|       S|
|          2|       1|     1|Cumings, Mrs. Joh...|female|38.0|    1|    0|        PC 17599|71.2833|       C|
|          3|       1|     3|Heikkinen, Miss. ...|female|26.0|    0|    0|STON/O2. 3101282|  7.925|       S|
|          4|       1|     1|Futrelle, Mrs. Ja...|female|35.0|    1|    0|          113803|   53.1|       S|
|          5|       0|     3|Allen, Mr. Willia...|  male|35.0|    0|    0|          373450|   8.05|       S|
|          6|       0|     3|    Moran, Mr. James|  male|29.0|    0|    0|          330877| 8.4583|       Q|
|          7|      

# Creating a new column named Title from existing columns

In [16]:
def designation(name):
    """Extract the honorific/title from a passenger name.

    Names are expected to look like ``"Surname, Title. Given names"``; the
    text between the first comma and the following period is returned with
    surrounding whitespace removed (e.g. ``"Braund, Mr. Owen"`` -> ``"Mr"``).

    Args:
        name: The full passenger name, or ``None``.

    Returns:
        The stripped title, or ``None`` when ``name`` is ``None`` or contains
        no comma (so a malformed row yields a null instead of crashing the UDF).
    """
    if name is None:
        return None
    parts = name.split(',')
    if len(parts) < 2:
        return None
    # parts[1] starts with the space that follows the comma; strip it so the
    # category is "Mr" rather than " Mr".
    return parts[1].split('.')[0].strip()
# Wrap the Python function as a Spark UDF returning a string (StringType is
# also udf()'s default return type) and derive the Title column from Name.
title_extract_udf = udf(designation, StringType())
titanic_df = titanic_df.withColumn('Title', title_extract_udf(col('Name')))
titanic_df.show()

+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+--------+-------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|          Ticket|   Fare|Embarked|  Title|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+--------+-------+
|          1|       0|     3|Braund, Mr. Owen ...|  male|22.0|    1|    0|       A/5 21171|   7.25|       S|     Mr|
|          2|       1|     1|Cumings, Mrs. Joh...|female|38.0|    1|    0|        PC 17599|71.2833|       C|    Mrs|
|          3|       1|     3|Heikkinen, Miss. ...|female|26.0|    0|    0|STON/O2. 3101282|  7.925|       S|   Miss|
|          4|       1|     1|Futrelle, Mrs. Ja...|female|35.0|    1|    0|          113803|   53.1|       S|    Mrs|
|          5|       0|     3|Allen, Mr. Willia...|  male|35.0|    0|    0|          373450|   8.05|       S|     Mr|
|          6|       0|     3|    Moran, Mr. James|  male|29.0|  

In [17]:
# Frequency of each extracted title.
titanic_df.groupBy('Title').count().show()

+-------------+-----+
|        Title|count|
+-------------+-----+
|       Master|   40|
|          Rev|    6|
|         Capt|    1|
|          Mrs|  125|
|         Miss|  182|
|         Lady|    1|
|     Jonkheer|    1|
|           Mr|  517|
|          Sir|    1|
|          Col|    2|
| the Countess|    1|
|         Mlle|    2|
|           Dr|    7|
|        Major|    2|
|          Don|    1|
|           Ms|    1|
|          Mme|    1|
+-------------+-----+



#  Converting Sex, Embarked & Title columns from string to number using StringIndexer

In [18]:
# Categorical string columns to encode as numeric indices ("<column>_index").
categorical_cols = ["Sex", "Embarked", "Title"]

# Hand the *unfitted* StringIndexer estimators to the Pipeline and let
# Pipeline.fit() fit each one exactly once. Fitting them manually first and
# then fitting the pipeline again was a redundant extra pass over the data.
indexers = [
    StringIndexer(inputCol=column, outputCol=column + "_index")
    for column in categorical_cols
]
pipeline = Pipeline(stages=indexers)
titanic_df = pipeline.fit(titanic_df).transform(titanic_df)


In [19]:
# Confirm that the three *_index columns were added by the indexers.
titanic_df.printSchema()

root
 |-- PassengerId: integer (nullable = true)
 |-- Survived: integer (nullable = true)
 |-- Pclass: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Age: double (nullable = false)
 |-- SibSp: integer (nullable = true)
 |-- Parch: integer (nullable = true)
 |-- Ticket: string (nullable = true)
 |-- Fare: double (nullable = true)
 |-- Embarked: string (nullable = false)
 |-- Title: string (nullable = true)
 |-- Sex_index: double (nullable = false)
 |-- Embarked_index: double (nullable = false)
 |-- Title_index: double (nullable = false)



# Drop columns which are not required


In [20]:
# Identifier/free-text columns and the string columns that now have an indexed
# counterpart. "Cabin" was already removed earlier; drop() ignores column names
# that are not present, so listing it again is a no-op.
columns_to_drop = ["PassengerId", "Name", "Ticket", "Cabin", "Embarked", "Sex", "Title"]
titanic_df = titanic_df.drop(*columns_to_drop)

In [21]:
# Preview the remaining, fully numeric columns.
titanic_df.show()

+--------+------+----+-----+-----+-------+---------+--------------+-----------+
|Survived|Pclass| Age|SibSp|Parch|   Fare|Sex_index|Embarked_index|Title_index|
+--------+------+----+-----+-----+-------+---------+--------------+-----------+
|       0|     3|22.0|    1|    0|   7.25|      0.0|           0.0|        0.0|
|       1|     1|38.0|    1|    0|71.2833|      1.0|           1.0|        2.0|
|       1|     3|26.0|    0|    0|  7.925|      1.0|           0.0|        1.0|
|       1|     1|35.0|    1|    0|   53.1|      1.0|           0.0|        2.0|
|       0|     3|35.0|    0|    0|   8.05|      0.0|           0.0|        0.0|
|       0|     3|29.0|    0|    0| 8.4583|      0.0|           2.0|        0.0|
|       0|     1|54.0|    0|    0|51.8625|      0.0|           0.0|        0.0|
|       0|     3| 2.0|    3|    1| 21.075|      0.0|           0.0|        3.0|
|       1|     3|27.0|    0|    2|11.1333|      1.0|           0.0|        2.0|
|       1|     2|14.0|    1|    0|30.070

# Convert features into vector using Vector Assembler



In [22]:
# Every column except the label becomes a feature. Selecting by name (rather
# than columns[1:]) keeps this correct even if "Survived" is not the first
# column of the DataFrame.
feature_cols = [c for c in titanic_df.columns if c != "Survived"]
feature = VectorAssembler(inputCols=feature_cols, outputCol="features")
feature_vector = feature.transform(titanic_df)


In [23]:
# Preview the assembled "features" vector next to its source columns.
feature_vector.show()

+--------+------+----+-----+-----+-------+---------+--------------+-----------+--------------------+
|Survived|Pclass| Age|SibSp|Parch|   Fare|Sex_index|Embarked_index|Title_index|            features|
+--------+------+----+-----+-----+-------+---------+--------------+-----------+--------------------+
|       0|     3|22.0|    1|    0|   7.25|      0.0|           0.0|        0.0|(8,[0,1,2,4],[3.0...|
|       1|     1|38.0|    1|    0|71.2833|      1.0|           1.0|        2.0|[1.0,38.0,1.0,0.0...|
|       1|     3|26.0|    0|    0|  7.925|      1.0|           0.0|        1.0|[3.0,26.0,0.0,0.0...|
|       1|     1|35.0|    1|    0|   53.1|      1.0|           0.0|        2.0|[1.0,35.0,1.0,0.0...|
|       0|     3|35.0|    0|    0|   8.05|      0.0|           0.0|        0.0|(8,[0,1,4],[3.0,3...|
|       0|     3|29.0|    0|    0| 8.4583|      0.0|           2.0|        0.0|(8,[0,1,4,6],[3.0...|
|       0|     1|54.0|    0|    0|51.8625|      0.0|           0.0|        0.0|(8,[0,1,4],[

# Splitting into Test and Train Data sets

In [24]:
# 80/20 train/test split; the fixed seed is passed so the split can be repeated.
trainingData, testData = feature_vector.randomSplit([0.8, 0.2], seed=11)


# MODELLING

# Logistic Regression

In [25]:
# Fit logistic regression on the training split and score the test split.
lr = LogisticRegression(featuresCol="features", labelCol="Survived")
lrModel = lr.fit(trainingData)
lr_prediction = lrModel.transform(testData)
lr_prediction.select(["prediction", "Survived", "features"]).show()

# Accuracy evaluator; the evaluation cells of the other models reuse this object.
evaluator = MulticlassClassificationEvaluator(
    labelCol="Survived",
    predictionCol="prediction",
    metricName="accuracy",
)




+----------+--------+--------------------+
|prediction|Survived|            features|
+----------+--------+--------------------+
|       1.0|       0|(8,[0,1,4,6],[1.0...|
|       1.0|       0|(8,[0,1,4,6],[1.0...|
|       0.0|       0|(8,[0,1],[1.0,29.0])|
|       0.0|       0|(8,[0,1],[1.0,29.0])|
|       0.0|       0|(8,[0,1,4],[1.0,2...|
|       0.0|       0|(8,[0,1,4],[1.0,2...|
|       1.0|       0|(8,[0,1,4,6],[1.0...|
|       0.0|       0|(8,[0,1,4],[1.0,2...|
|       1.0|       0|(8,[0,1,4],[1.0,2...|
|       0.0|       0|(8,[0,1,2,4],[1.0...|
|       0.0|       0|(8,[0,1,2,4],[1.0...|
|       0.0|       0|(8,[0,1,2,4],[1.0...|
|       0.0|       0|(8,[0,1,3,4],[1.0...|
|       0.0|       0|(8,[0,1,2,4],[1.0...|
|       0.0|       0|(8,[0,1,4],[1.0,4...|
|       0.0|       0|[1.0,58.0,0.0,2.0...|
|       0.0|       0|(8,[0,1,4],[1.0,6...|
|       0.0|       0|(8,[0,1,4],[1.0,6...|
|       0.0|       0|(8,[0,1,4,6],[1.0...|
|       0.0|       0|[2.0,19.0,1.0,1.0...|
+----------

In [26]:
# Accuracy on the test split, and its complement as the test error.
lr_accuracy = evaluator.evaluate(lr_prediction)
lr_test_error = 1.0 - lr_accuracy
print("Accuracy of LogisticRegression is = %g" % lr_accuracy)
print("Test Error of LogisticRegression = %g " % lr_test_error)


Accuracy of LogisticRegression is = 0.829787
Test Error of LogisticRegression = 0.170213 


# Decision Tree Classifier 

In [27]:
# Fit a decision tree on the training split and score the test split.
dt = DecisionTreeClassifier(featuresCol="features", labelCol="Survived")
dt_model = dt.fit(trainingData)
dt_prediction = dt_model.transform(testData)
dt_prediction.select(["prediction", "Survived", "features"]).show()


+----------+--------+--------------------+
|prediction|Survived|            features|
+----------+--------+--------------------+
|       0.0|       0|(8,[0,1,4,6],[1.0...|
|       0.0|       0|(8,[0,1,4,6],[1.0...|
|       0.0|       0|(8,[0,1],[1.0,29.0])|
|       0.0|       0|(8,[0,1],[1.0,29.0])|
|       0.0|       0|(8,[0,1,4],[1.0,2...|
|       1.0|       0|(8,[0,1,4],[1.0,2...|
|       0.0|       0|(8,[0,1,4,6],[1.0...|
|       0.0|       0|(8,[0,1,4],[1.0,2...|
|       0.0|       0|(8,[0,1,4],[1.0,2...|
|       0.0|       0|(8,[0,1,2,4],[1.0...|
|       0.0|       0|(8,[0,1,2,4],[1.0...|
|       0.0|       0|(8,[0,1,2,4],[1.0...|
|       0.0|       0|(8,[0,1,3,4],[1.0...|
|       0.0|       0|(8,[0,1,2,4],[1.0...|
|       0.0|       0|(8,[0,1,4],[1.0,4...|
|       0.0|       0|[1.0,58.0,0.0,2.0...|
|       1.0|       0|(8,[0,1,4],[1.0,6...|
|       1.0|       0|(8,[0,1,4],[1.0,6...|
|       0.0|       0|(8,[0,1,4,6],[1.0...|
|       0.0|       0|[2.0,19.0,1.0,1.0...|
+----------

In [28]:
# Accuracy on the test split, and its complement as the test error.
dt_accuracy = evaluator.evaluate(dt_prediction)
dt_test_error = 1.0 - dt_accuracy
print("Accuracy of DecisionTreeClassifier is = %g" % dt_accuracy)
print("Test Error of DecisionTreeClassifier = %g " % dt_test_error)

Accuracy of DecisionTreeClassifier is = 0.845745
Test Error of DecisionTreeClassifier = 0.154255 


# Random Forest Classifier

In [29]:
# Fit a random forest on the training split and score the test split.
rf = RandomForestClassifier(featuresCol="features", labelCol="Survived")
rf_model = rf.fit(trainingData)
rf_prediction = rf_model.transform(testData)
rf_prediction.select(["prediction", "Survived", "features"]).show()


+----------+--------+--------------------+
|prediction|Survived|            features|
+----------+--------+--------------------+
|       0.0|       0|(8,[0,1,4,6],[1.0...|
|       0.0|       0|(8,[0,1,4,6],[1.0...|
|       0.0|       0|(8,[0,1],[1.0,29.0])|
|       0.0|       0|(8,[0,1],[1.0,29.0])|
|       0.0|       0|(8,[0,1,4],[1.0,2...|
|       0.0|       0|(8,[0,1,4],[1.0,2...|
|       0.0|       0|(8,[0,1,4,6],[1.0...|
|       0.0|       0|(8,[0,1,4],[1.0,2...|
|       0.0|       0|(8,[0,1,4],[1.0,2...|
|       0.0|       0|(8,[0,1,2,4],[1.0...|
|       0.0|       0|(8,[0,1,2,4],[1.0...|
|       0.0|       0|(8,[0,1,2,4],[1.0...|
|       0.0|       0|(8,[0,1,3,4],[1.0...|
|       0.0|       0|(8,[0,1,2,4],[1.0...|
|       0.0|       0|(8,[0,1,4],[1.0,4...|
|       0.0|       0|[1.0,58.0,0.0,2.0...|
|       0.0|       0|(8,[0,1,4],[1.0,6...|
|       0.0|       0|(8,[0,1,4],[1.0,6...|
|       0.0|       0|(8,[0,1,4,6],[1.0...|
|       0.0|       0|[2.0,19.0,1.0,1.0...|
+----------

In [30]:
# Accuracy on the test split, and its complement as the test error.
rf_accuracy = evaluator.evaluate(rf_prediction)
rf_test_error = 1.0 - rf_accuracy
print("Accuracy of RandomForestClassifier is = %g" % rf_accuracy)
print("Test Error of RandomForestClassifier  = %g " % rf_test_error)

Accuracy of RandomForestClassifier is = 0.861702
Test Error of RandomForestClassifier  = 0.138298 


# Naive Bayes

In [31]:
# Fit Naive Bayes on the training split and score the test split.
# NOTE(review): no modelType is passed, so Spark's default variant is used —
# confirm it suits continuous features such as Age and Fare.
nb = NaiveBayes(featuresCol="features", labelCol="Survived")
nb_model = nb.fit(trainingData)
nb_prediction = nb_model.transform(testData)
nb_prediction.select(["prediction", "Survived", "features"]).show()


+----------+--------+--------------------+
|prediction|Survived|            features|
+----------+--------+--------------------+
|       1.0|       0|(8,[0,1,4,6],[1.0...|
|       1.0|       0|(8,[0,1,4,6],[1.0...|
|       0.0|       0|(8,[0,1],[1.0,29.0])|
|       0.0|       0|(8,[0,1],[1.0,29.0])|
|       0.0|       0|(8,[0,1,4],[1.0,2...|
|       0.0|       0|(8,[0,1,4],[1.0,2...|
|       0.0|       0|(8,[0,1,4,6],[1.0...|
|       0.0|       0|(8,[0,1,4],[1.0,2...|
|       1.0|       0|(8,[0,1,4],[1.0,2...|
|       1.0|       0|(8,[0,1,2,4],[1.0...|
|       1.0|       0|(8,[0,1,2,4],[1.0...|
|       1.0|       0|(8,[0,1,2,4],[1.0...|
|       1.0|       0|(8,[0,1,3,4],[1.0...|
|       1.0|       0|(8,[0,1,2,4],[1.0...|
|       0.0|       0|(8,[0,1,4],[1.0,4...|
|       1.0|       0|[1.0,58.0,0.0,2.0...|
|       0.0|       0|(8,[0,1,4],[1.0,6...|
|       0.0|       0|(8,[0,1,4],[1.0,6...|
|       0.0|       0|(8,[0,1,4,6],[1.0...|
|       1.0|       0|[2.0,19.0,1.0,1.0...|
+----------

In [32]:
# Accuracy on the test split, and its complement as the test error.
nb_accuracy = evaluator.evaluate(nb_prediction)
nb_test_error = 1.0 - nb_accuracy
print("Accuracy of NaiveBayes is  = %g" % nb_accuracy)
print("Test Error of NaiveBayes  = %g " % nb_test_error)


Accuracy of NaiveBayes is  = 0.734043
Test Error of NaiveBayes  = 0.265957 


# Support Vector Classifier

In [33]:
# Fit a linear support vector classifier on the training split and score the
# test split.
svm = LinearSVC(featuresCol="features", labelCol="Survived")
svm_model = svm.fit(trainingData)
svm_prediction = svm_model.transform(testData)
svm_prediction.select(["prediction", "Survived", "features"]).show()



+----------+--------+--------------------+
|prediction|Survived|            features|
+----------+--------+--------------------+
|       0.0|       0|(8,[0,1,4,6],[1.0...|
|       0.0|       0|(8,[0,1,4,6],[1.0...|
|       0.0|       0|(8,[0,1],[1.0,29.0])|
|       0.0|       0|(8,[0,1],[1.0,29.0])|
|       0.0|       0|(8,[0,1,4],[1.0,2...|
|       0.0|       0|(8,[0,1,4],[1.0,2...|
|       0.0|       0|(8,[0,1,4,6],[1.0...|
|       0.0|       0|(8,[0,1,4],[1.0,2...|
|       0.0|       0|(8,[0,1,4],[1.0,2...|
|       0.0|       0|(8,[0,1,2,4],[1.0...|
|       0.0|       0|(8,[0,1,2,4],[1.0...|
|       0.0|       0|(8,[0,1,2,4],[1.0...|
|       0.0|       0|(8,[0,1,3,4],[1.0...|
|       0.0|       0|(8,[0,1,2,4],[1.0...|
|       0.0|       0|(8,[0,1,4],[1.0,4...|
|       0.0|       0|[1.0,58.0,0.0,2.0...|
|       0.0|       0|(8,[0,1,4],[1.0,6...|
|       0.0|       0|(8,[0,1,4],[1.0,6...|
|       0.0|       0|(8,[0,1,4,6],[1.0...|
|       0.0|       0|[2.0,19.0,1.0,1.0...|
+----------

In [34]:
# Accuracy on the test split, and its complement as the test error.
svm_accuracy = evaluator.evaluate(svm_prediction)
svm_test_error = 1.0 - svm_accuracy
print("Accuracy of Support Vector Machine is = %g" % svm_accuracy)
print("Test Error of Support Vector Machine = %g " % svm_test_error)

Accuracy of Support Vector Machine is = 0.824468
Test Error of Support Vector Machine = 0.175532 


# Dictionary to compare accuracies shown by the models

In [35]:
# Collect the test accuracy of every model in one place for comparison.
accuracy_dict = {
    'Accuracy of LogisticRegression is': lr_accuracy,
    'Accuracy of DecisionTreeClassifier is': dt_accuracy,
    'Accuracy of RandomForestClassifier is': rf_accuracy,
    'Accuracy of NaiveBayes is': nb_accuracy,
    'Accuracy of Support Vector Machine is': svm_accuracy,
}
accuracy_dict

{'Accuracy of LogisticRegression is': 0.8297872340425532,
 'Accuracy of DecisionTreeClassifier is': 0.8457446808510638,
 'Accuracy of RandomForestClassifier is': 0.8617021276595744,
 'Accuracy of NaiveBayes is': 0.7340425531914894,
 'Accuracy of Support Vector Machine is': 0.824468085106383}

# Random Forest Classifier has the highest accuracy (0.8617, versus 0.8457 for the Decision Tree), so it is selected as the final model for future predictions