In [1]:
from google.colab import drive
drive.mount("/content/gdrive", force_remount=True)

Mounted at /content/gdrive


In [2]:
cd /content/gdrive/MyDrive/pyspark/Titanic

/content/gdrive/MyDrive/pyspark/Titanic


In [3]:
!pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


In [4]:
from pyspark.sql import SparkSession
import pyspark.sql as sparksql
import pyspark.sql.functions as f
from pyspark.sql.window import Window
import matplotlib.pyplot as plt
import seaborn as sns
import numpy as np
import pandas as pd
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler

In [5]:
spark = SparkSession.builder.getOrCreate()

In [6]:
df = spark.read.csv("/content/gdrive/MyDrive/pyspark/Titanic/titanic.csv",header=True)

In [7]:
df.show(5)

+-----------+--------+------+--------------------+------+---+-----+-----+----------------+-------+-----+--------+
|PassengerId|Survived|Pclass|                Name|   Sex|Age|SibSp|Parch|          Ticket|   Fare|Cabin|Embarked|
+-----------+--------+------+--------------------+------+---+-----+-----+----------------+-------+-----+--------+
|          1|       0|     3|Braund, Mr. Owen ...|  male| 22|    1|    0|       A/5 21171|   7.25| null|       S|
|          2|       1|     1|Cumings, Mrs. Joh...|female| 38|    1|    0|        PC 17599|71.2833|  C85|       C|
|          3|       1|     3|Heikkinen, Miss. ...|female| 26|    0|    0|STON/O2. 3101282|  7.925| null|       S|
|          4|       1|     1|Futrelle, Mrs. Ja...|female| 35|    1|    0|          113803|   53.1| C123|       S|
|          5|       0|     3|Allen, Mr. Willia...|  male| 35|    0|    0|          373450|   8.05| null|       S|
+-----------+--------+------+--------------------+------+---+-----+-----+---------------

In [8]:
df.count()

891

In [9]:
# casting
df = df.withColumn("Survived",df.Survived.cast("int"))
df = df.withColumn("Age",df.Age.cast("int"))

df = df.withColumn("Parch",df.Parch.cast("int"))
df = df.withColumn("SibSp",df.SibSp.cast("float"))

df = df.withColumn("Fare",df.Fare.cast("int"))


In [10]:
df.printSchema()

root
 |-- PassengerId: string (nullable = true)
 |-- Survived: integer (nullable = true)
 |-- Pclass: string (nullable = true)
 |-- Name: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Age: integer (nullable = true)
 |-- SibSp: float (nullable = true)
 |-- Parch: integer (nullable = true)
 |-- Ticket: string (nullable = true)
 |-- Fare: integer (nullable = true)
 |-- Cabin: string (nullable = true)
 |-- Embarked: string (nullable = true)



In [11]:
df.groupby('Survived').agg(
    (f.count('Survived')).alias('count'),
    (f.count('Survived') / df.count()).alias('percentage')*100
).show()

+--------+-----+---------------------------------------------+
|Survived|count|((count(Survived) / 891) AS percentage * 100)|
+--------+-----+---------------------------------------------+
|       1|  342|                            38.38383838383838|
|       0|  549|                            61.61616161616161|
+--------+-----+---------------------------------------------+



In [12]:
df.describe().show()

+-------+-----------------+-------------------+------------------+--------------------+------+------------------+------------------+-------------------+------------------+-----------------+-----+--------+
|summary|      PassengerId|           Survived|            Pclass|                Name|   Sex|               Age|             SibSp|              Parch|            Ticket|             Fare|Cabin|Embarked|
+-------+-----------------+-------------------+------------------+--------------------+------+------------------+------------------+-------------------+------------------+-----------------+-----+--------+
|  count|              891|                891|               891|                 891|   891|               714|               891|                891|               891|              891|  204|     889|
|   mean|            446.0| 0.3838383838383838| 2.308641975308642|                null|  null|29.679271708683473|0.5230078563411896|0.38159371492704824|260318.54916792738|31.785634

In [13]:
#finding the count of missing values

In [14]:
df.select([f.count(f.when(f.isnull(column), column )).alias(column) for column in df.columns]).show()

+-----------+--------+------+----+---+---+-----+-----+------+----+-----+--------+
|PassengerId|Survived|Pclass|Name|Sex|Age|SibSp|Parch|Ticket|Fare|Cabin|Embarked|
+-----------+--------+------+----+---+---+-----+-----+------+----+-----+--------+
|          0|       0|     0|   0|  0|177|    0|    0|     0|   0|  687|       2|
+-----------+--------+------+----+---+---+-----+-----+------+----+-----+--------+



Age , Cabin and Embarked contains null values

In [15]:
# filtering not null age                                        

In [16]:
df.filter('Age is not Null').show(5)

+-----------+--------+------+--------------------+------+---+-----+-----+----------------+----+-----+--------+
|PassengerId|Survived|Pclass|                Name|   Sex|Age|SibSp|Parch|          Ticket|Fare|Cabin|Embarked|
+-----------+--------+------+--------------------+------+---+-----+-----+----------------+----+-----+--------+
|          1|       0|     3|Braund, Mr. Owen ...|  male| 22|  1.0|    0|       A/5 21171|   7| null|       S|
|          2|       1|     1|Cumings, Mrs. Joh...|female| 38|  1.0|    0|        PC 17599|  71|  C85|       C|
|          3|       1|     3|Heikkinen, Miss. ...|female| 26|  0.0|    0|STON/O2. 3101282|   7| null|       S|
|          4|       1|     1|Futrelle, Mrs. Ja...|female| 35|  1.0|    0|          113803|  53| C123|       S|
|          5|       0|     3|Allen, Mr. Willia...|  male| 35|  0.0|    0|          373450|   8| null|       S|
+-----------+--------+------+--------------------+------+---+-----+-----+----------------+----+-----+--------+
o

filtering null values

In [17]:
df.filter('Age is null').show(5)

+-----------+--------+------+--------------------+------+----+-----+-----+------+----+-----+--------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|Ticket|Fare|Cabin|Embarked|
+-----------+--------+------+--------------------+------+----+-----+-----+------+----+-----+--------+
|          6|       0|     3|    Moran, Mr. James|  male|null|  0.0|    0|330877|   8| null|       Q|
|         18|       1|     2|Williams, Mr. Cha...|  male|null|  0.0|    0|244373|  13| null|       S|
|         20|       1|     3|Masselmani, Mrs. ...|female|null|  0.0|    0|  2649|   7| null|       C|
|         27|       0|     3|Emir, Mr. Farred ...|  male|null|  0.0|    0|  2631|   7| null|       C|
|         29|       1|     3|"O'Dwyer, Miss. E...|female|null|  0.0|    0|330959|   7| null|       Q|
+-----------+--------+------+--------------------+------+----+-----+-----+------+----+-----+--------+
only showing top 5 rows



In [18]:
# filling missing values

In [19]:
mean_Age=df.select(f.mean(f.col('Age'))).take(1)[0][0]

In [20]:
df = df.fillna({'Age':mean_Age})

In [21]:
(687/891)*100

77.10437710437711

we are going to drop cabin as it contains missing values around 77 %

In [22]:
df = df.drop('Cabin')
df=df.drop('PassengerId')

In [23]:
#filling embarked by mode
Embarked_mode=df.groupby("Embarked").count().orderBy("count", ascending=False).first()[0]
Embarked_mode

'S'

In [24]:
df=df.na.fill(value=Embarked_mode,subset=["Embarked"])

In [25]:
df.show(5)

+--------+------+--------------------+------+---+-----+-----+----------------+----+--------+
|Survived|Pclass|                Name|   Sex|Age|SibSp|Parch|          Ticket|Fare|Embarked|
+--------+------+--------------------+------+---+-----+-----+----------------+----+--------+
|       0|     3|Braund, Mr. Owen ...|  male| 22|  1.0|    0|       A/5 21171|   7|       S|
|       1|     1|Cumings, Mrs. Joh...|female| 38|  1.0|    0|        PC 17599|  71|       C|
|       1|     3|Heikkinen, Miss. ...|female| 26|  0.0|    0|STON/O2. 3101282|   7|       S|
|       1|     1|Futrelle, Mrs. Ja...|female| 35|  1.0|    0|          113803|  53|       S|
|       0|     3|Allen, Mr. Willia...|  male| 35|  0.0|    0|          373450|   8|       S|
+--------+------+--------------------+------+---+-----+-----+----------------+----+--------+
only showing top 5 rows



In [26]:
df.groupby('Sex').agg(f.mean('Fare')).show()

+------+------------------+
|   Sex|         avg(Fare)|
+------+------------------+
|female| 44.06369426751592|
|  male|25.103986135181977|
+------+------------------+



from here we came to know that avg fare paid by woman is higher

In [27]:
df.groupby('Sex').agg(f.mean('AGE')).show()

+------+------------------+
|   Sex|          avg(AGE)|
+------+------------------+
|female|28.089171974522294|
|  male|30.336221837088388|
+------+------------------+



In [28]:
df.groupby('Survived').count().show()

+--------+-----+
|Survived|count|
+--------+-----+
|       1|  342|
|       0|  549|
+--------+-----+



In [29]:
df.groupby('Survived','Sex').count().show()

+--------+------+-----+
|Survived|   Sex|count|
+--------+------+-----+
|       0|female|   81|
|       1|  male|  109|
|       1|female|  233|
|       0|  male|  468|
+--------+------+-----+



from here we can see feamles survive more than men

In [30]:
df.createOrReplaceTempView("data")

In [31]:
spark.sql('''select Survived ,Pclass , count(*) from
 data group by Survived,Pclass order by Survived''').show()

+--------+------+--------+
|Survived|Pclass|count(1)|
+--------+------+--------+
|       0|     2|      97|
|       0|     1|      80|
|       0|     3|     372|
|       1|     3|     119|
|       1|     1|     136|
|       1|     2|      87|
+--------+------+--------+



from here we can see people from passenger class 1 survive more where as People from Pclass 3 died more

In [32]:
df.show(5)

+--------+------+--------------------+------+---+-----+-----+----------------+----+--------+
|Survived|Pclass|                Name|   Sex|Age|SibSp|Parch|          Ticket|Fare|Embarked|
+--------+------+--------------------+------+---+-----+-----+----------------+----+--------+
|       0|     3|Braund, Mr. Owen ...|  male| 22|  1.0|    0|       A/5 21171|   7|       S|
|       1|     1|Cumings, Mrs. Joh...|female| 38|  1.0|    0|        PC 17599|  71|       C|
|       1|     3|Heikkinen, Miss. ...|female| 26|  0.0|    0|STON/O2. 3101282|   7|       S|
|       1|     1|Futrelle, Mrs. Ja...|female| 35|  1.0|    0|          113803|  53|       S|
|       0|     3|Allen, Mr. Willia...|  male| 35|  0.0|    0|          373450|   8|       S|
+--------+------+--------------------+------+---+-----+-----+----------------+----+--------+
only showing top 5 rows



In [33]:
df = df \
      .withColumnRenamed("Pclass","PassengerClass") \
      .withColumnRenamed("SibSp","SiblingsSpouses") \
      .withColumnRenamed("Parch","ParentsChildren") \

In [34]:
df.printSchema()

root
 |-- Survived: integer (nullable = true)
 |-- PassengerClass: string (nullable = true)
 |-- Name: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Age: integer (nullable = true)
 |-- SiblingsSpouses: float (nullable = true)
 |-- ParentsChildren: integer (nullable = true)
 |-- Ticket: string (nullable = true)
 |-- Fare: integer (nullable = true)
 |-- Embarked: string (nullable = false)



In [35]:
df = df \
        .drop("Name") \
        .drop("Ticket") \
        .drop("Cabin")

In [36]:
df.show(5)

+--------+--------------+------+---+---------------+---------------+----+--------+
|Survived|PassengerClass|   Sex|Age|SiblingsSpouses|ParentsChildren|Fare|Embarked|
+--------+--------------+------+---+---------------+---------------+----+--------+
|       0|             3|  male| 22|            1.0|              0|   7|       S|
|       1|             1|female| 38|            1.0|              0|  71|       C|
|       1|             3|female| 26|            0.0|              0|   7|       S|
|       1|             1|female| 35|            1.0|              0|  53|       S|
|       0|             3|  male| 35|            0.0|              0|   8|       S|
+--------+--------------+------+---+---------------+---------------+----+--------+
only showing top 5 rows



In [37]:
df.select([f.count(f.when(f.isnull(column), column )).alias(column) for column in df.columns]).show()

+--------+--------------+---+---+---------------+---------------+----+--------+
|Survived|PassengerClass|Sex|Age|SiblingsSpouses|ParentsChildren|Fare|Embarked|
+--------+--------------+---+---+---------------+---------------+----+--------+
|       0|             0|  0|  0|              0|              0|   0|       0|
+--------+--------------+---+---+---------------+---------------+----+--------+



In [38]:
from pyspark.ml import Pipeline

In [39]:
cols= df.columns

In [40]:
stages=[]
categoricalColumns = ["Sex","Embarked","PassengerClass"]
for categoricalCol in categoricalColumns:
    stringIndexer = StringIndexer(inputCol = categoricalCol, outputCol = categoricalCol + 'Index')
    encoder = OneHotEncoder(inputCols=[stringIndexer.getOutputCol()],outputCols=[categoricalCol + "classVec"])
    stages += [stringIndexer, encoder]
numericCols = [ "Age","SiblingsSpouses","ParentsChildren","Fare"]
assemblerInputs= [c + "classVec" for c in categoricalColumns] + numericCols 
assembler = VectorAssembler(inputCols = assemblerInputs,outputCol="features")
stages += [assembler]
pipeline = Pipeline(stages = stages)
pipelineModel = pipeline.fit(df)
df_encoded = pipelineModel.transform(df)
selectCols = ['features'] + cols
df_encoded = df_encoded.select(selectCols)

In [41]:
df_encoded.show(2)

+--------------------+--------+--------------+------+---+---------------+---------------+----+--------+
|            features|Survived|PassengerClass|   Sex|Age|SiblingsSpouses|ParentsChildren|Fare|Embarked|
+--------------------+--------+--------------+------+---+---------------+---------------+----+--------+
|[1.0,1.0,0.0,1.0,...|       0|             3|  male| 22|            1.0|              0|   7|       S|
|[0.0,0.0,1.0,0.0,...|       1|             1|female| 38|            1.0|              0|  71|       C|
+--------------------+--------+--------------+------+---+---------------+---------------+----+--------+
only showing top 2 rows



In [42]:
# Train Test Split

In [43]:
train , test = df_encoded.randomSplit([0.80,0.30],seed=99)
print("There are %d training examples and %d test examples"%(train.count(), test.count()))

There are 638 training examples and 253 test examples


# Machine Learning
# Logistic Regression

In [44]:
from pyspark.ml.classification import LogisticRegression
lr = LogisticRegression(featuresCol='features',labelCol='Survived')
model_lr= lr.fit(train)
lr_pred = model_lr.transform(test)


In [45]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
# Select (prediction, true label) and compute test error
acc_evaluator = MulticlassClassificationEvaluator(labelCol="Survived", predictionCol="prediction", metricName="accuracy")
f1_eval= MulticlassClassificationEvaluator(labelCol="Survived", predictionCol="prediction", metricName="f1")
prec_eval= MulticlassClassificationEvaluator(labelCol="Survived", predictionCol="prediction", metricName="weightedPrecision")
recall_eval= MulticlassClassificationEvaluator(labelCol="Survived", predictionCol="prediction", metricName="weightedRecall")


In [46]:
# Logistric Regression Evaluaion
lr_acc = acc_evaluator.evaluate(lr_pred)
lr_f1= f1_eval.evaluate(lr_pred)
lr_prec = prec_eval.evaluate(lr_pred)
lr_recall=recall_eval.evaluate(lr_pred)


print('Logistic Regression Accuracy: {0:2.2f}%'.format(lr_acc*100))
print('Logistic Regression F1 score of: {}'.format(lr_f1))
print('Logistic Regression Precision score : {}'.format(lr_prec))
print('Logistic Regression Recall score of: {}'.format(lr_recall))

Logistic Regression Accuracy: 80.24%
Logistic Regression F1 score of: 0.7974544053498442
Logistic Regression Precision score : 0.8035424615595512
Logistic Regression Recall score of: 0.8023715415019763


In [47]:
from pyspark.ml.classification import DecisionTreeClassifier

dt = DecisionTreeClassifier(featuresCol = 'features', labelCol = 'Survived')
dtModel = dt.fit(train)
dtPreds = dtModel.transform(test)
dtc_predictions = dtModel.transform(test)
# Logistric Regression Evaluaion
dt_acc = acc_evaluator.evaluate(dtc_predictions)
dt_f1= f1_eval.evaluate(dtc_predictions)
dt_prec = prec_eval.evaluate(dtc_predictions)
dt_recall=recall_eval.evaluate(dtc_predictions)


print('Decision Tree Accuracy: {0:2.2f}%'.format(dt_acc*100))
print('Decision Tree F1 score of: {}'.format(dt_f1))
print('Decision Tree Precision score : {}'.format(dt_prec))
print('Decision Tree Recall score of: {}'.format(dt_recall))

Decision Tree Accuracy: 80.24%
Decision Tree F1 score of: 0.7967863305448348
Decision Tree Precision score : 0.8046521594347681
Decision Tree Recall score of: 0.8023715415019763


# Random Forest

In [48]:
from pyspark.ml.classification import RandomForestClassifier
rf = RandomForestClassifier(featuresCol = 'features', labelCol = 'Survived')
rfModel = rf.fit(train)
rfPreds = rfModel.transform(test)
rf_acc = acc_evaluator.evaluate(rfPreds)
rf_f1= f1_eval.evaluate(rfPreds)
rf_prec = prec_eval.evaluate(rfPreds)
rf_recall=recall_eval.evaluate(rfPreds)


print('Random Forest Accuracy: {0:2.2f}%'.format(rf_acc*100))
print('Random Forest F1 score of: {}'.format(rf_f1))
print('Random Forest Precision score : {}'.format(rf_prec))
print('Random Forest Recall score of: {}'.format(rf_recall))

Random Forest Accuracy: 77.47%
Random Forest F1 score of: 0.7653511100623366
Random Forest Precision score : 0.779183156096812
Random Forest Recall score of: 0.7747035573122529


# Gradient Boosting

In [49]:
from pyspark.ml.classification import GBTClassifier
gb = GBTClassifier(featuresCol = 'features', labelCol = 'Survived')
gbModel = gb.fit(train)
gbPreds = gbModel.transform(test)
gb_acc = acc_evaluator.evaluate(gbPreds)
gb_f1= f1_eval.evaluate(gbPreds)
gb_prec = prec_eval.evaluate(gbPreds)
gb_recall=recall_eval.evaluate(gbPreds)


print('Gradient Boosting Accuracy: {0:2.2f}%'.format(gb_acc*100))
print('Gradient Boosting F1 score of: {}'.format(gb_f1))
print('Gradient Boosting Precision score : {}'.format(gb_prec))
print('Gradient Recall score of: {}'.format(gb_recall))

Gradient Boosting Accuracy: 83.00%
Gradient Boosting F1 score of: 0.8249377636479973
Gradient Boosting Precision score : 0.8352786047944151
Gradient Recall score of: 0.8300395256916996


# Linear SVC

In [50]:
from pyspark.ml.classification import LinearSVC
svc = LinearSVC(featuresCol = 'features', labelCol = 'Survived')
svcModel = svc.fit(train)
svcPreds = svcModel.transform(test)
svc_acc = acc_evaluator.evaluate(svcPreds)
svc_f1= f1_eval.evaluate(svcPreds)
svc_prec = prec_eval.evaluate(svcPreds)
svc_recall=recall_eval.evaluate(svcPreds)


print('Linear SVC Accuracy: {0:2.2f}%'.format(svc_acc*100))
print('Linear SVC  F1 score of: {}'.format(svc_f1))
print('Linear SVC  Precision score : {}'.format(svc_prec))
print('Linear SVC Recall score of: {}'.format(svc_recall))

Linear SVC Accuracy: 79.05%
Linear SVC  F1 score of: 0.7875007561848635
Linear SVC  Precision score : 0.7889163304437594
Linear SVC Recall score of: 0.7905138339920948


we have applied 5 baseline machine algorithms and evaluated them and come to a conclusion that Gradient Boosting Classification  works best with accuracy score: 83.00% , F1 score of: 0.8249 , Precision score : 0.8352 ,Recall score of: 0.8300