<a href="https://colab.research.google.com/github/catylyst/notebooks/blob/master/titanic_spark.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

In [2]:
# !pip install  --upgrade py4j

In [3]:
!java -version

openjdk version "11.0.21" 2023-10-17
OpenJDK Runtime Environment (build 11.0.21+9-post-Ubuntu-0ubuntu122.04)
OpenJDK 64-Bit Server VM (build 11.0.21+9-post-Ubuntu-0ubuntu122.04, mixed mode, sharing)


In [4]:
# Download Spark
!wget -q https://dlcdn.apache.org/spark/spark-3.5.0/spark-3.5.0-bin-hadoop3.tgz

# Unzip the file
!tar xf spark-3.5.0-bin-hadoop3.tgz

In [5]:
# # Download Spark
# !wget -q https://www.apache.org/dyn/closer.lua/spark/spark-3.2.4/spark-3.2.4-bin-hadoop2.7.tgz

# # Unzip the file
# !spark-3.2.4-bin-hadoop2.7.tgz

In [6]:
import os
import sys

os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = '/content/spark-3.5.0-bin-hadoop3'

In [7]:
# Install library for finding Spark
!pip install -q findspark

# Import the library
import findspark

# Initiate findspark
findspark.init()

# Check the location for Spark
findspark.find()

'/content/spark-3.5.0-bin-hadoop3'

In [8]:
# Import SparkSession
from pyspark.sql import SparkSession

# Create a Spark Session
spark = SparkSession.builder.master("local[*]").getOrCreate()
# Check  Spark Session Information
spark



In [29]:
from pyspark.sql import SparkSession
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression, DecisionTreeClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator
import numpy as np
from pyspark.sql.functions import col
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.feature import QuantileDiscretizer

In [10]:
# Create Spark Session

spark = SparkSession \
    .builder \
    .appName("Spark ML Titanic Classifier") \
    .getOrCreate()

In [11]:
# Load training data
train_data = spark.read.csv("titanic_train.csv", header=True, inferSchema=True)

# Load testing data
test_data = spark.read.csv("titanic_test.csv", header=True, inferSchema=True)

# OR

# test_data = "<insert vm path to dataset>"
# train_data = "<insert vm path to dataset>"

In [12]:
train_data.describe()

DataFrame[summary: string, PassengerId: string, Survived: string, Pclass: string, Name: string, Sex: string, Age: string, SibSp: string, Parch: string, Ticket: string, Fare: string, Cabin: string, Embarked: string]

In [13]:
train_data.head(10)

[Row(PassengerId=1, Survived=0, Pclass=3, Name='Braund, Mr. Owen Harris', Sex='male', Age=22.0, SibSp=1, Parch=0, Ticket='A/5 21171', Fare=7.25, Cabin=None, Embarked='S'),
 Row(PassengerId=2, Survived=1, Pclass=1, Name='Cumings, Mrs. John Bradley (Florence Briggs Thayer)', Sex='female', Age=38.0, SibSp=1, Parch=0, Ticket='PC 17599', Fare=71.2833, Cabin='C85', Embarked='C'),
 Row(PassengerId=3, Survived=1, Pclass=3, Name='Heikkinen, Miss. Laina', Sex='female', Age=26.0, SibSp=0, Parch=0, Ticket='STON/O2. 3101282', Fare=7.925, Cabin=None, Embarked='S'),
 Row(PassengerId=4, Survived=1, Pclass=1, Name='Futrelle, Mrs. Jacques Heath (Lily May Peel)', Sex='female', Age=35.0, SibSp=1, Parch=0, Ticket='113803', Fare=53.1, Cabin='C123', Embarked='S'),
 Row(PassengerId=5, Survived=0, Pclass=3, Name='Allen, Mr. William Henry', Sex='male', Age=35.0, SibSp=0, Parch=0, Ticket='373450', Fare=8.05, Cabin=None, Embarked='S'),
 Row(PassengerId=6, Survived=0, Pclass=3, Name='Moran, Mr. James', Sex='male',

In [14]:
train_data.limit(10).show()

+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|          Ticket|   Fare|Cabin|Embarked|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|          1|       0|     3|Braund, Mr. Owen ...|  male|22.0|    1|    0|       A/5 21171|   7.25| NULL|       S|
|          2|       1|     1|Cumings, Mrs. Joh...|female|38.0|    1|    0|        PC 17599|71.2833|  C85|       C|
|          3|       1|     3|Heikkinen, Miss. ...|female|26.0|    0|    0|STON/O2. 3101282|  7.925| NULL|       S|
|          4|       1|     1|Futrelle, Mrs. Ja...|female|35.0|    1|    0|          113803|   53.1| C123|       S|
|          5|       0|     3|Allen, Mr. Willia...|  male|35.0|    0|    0|          373450|   8.05| NULL|       S|
|          6|       0|     3|    Moran, Mr. James|  male|NULL|    0|    0|      

In [15]:
test_data.limit(10).show()

+-----------+------+--------------------+------+----+-----+-----+---------+-------+-----+--------+
|PassengerId|Pclass|                Name|   Sex| Age|SibSp|Parch|   Ticket|   Fare|Cabin|Embarked|
+-----------+------+--------------------+------+----+-----+-----+---------+-------+-----+--------+
|        892|     3|    Kelly, Mr. James|  male|34.5|    0|    0|   330911| 7.8292| NULL|       Q|
|        893|     3|Wilkes, Mrs. Jame...|female|47.0|    1|    0|   363272|    7.0| NULL|       S|
|        894|     2|Myles, Mr. Thomas...|  male|62.0|    0|    0|   240276| 9.6875| NULL|       Q|
|        895|     3|    Wirz, Mr. Albert|  male|27.0|    0|    0|   315154| 8.6625| NULL|       S|
|        896|     3|Hirvonen, Mrs. Al...|female|22.0|    1|    1|  3101298|12.2875| NULL|       S|
|        897|     3|Svensson, Mr. Joh...|  male|14.0|    0|    0|     7538|  9.225| NULL|       S|
|        898|     3|Connolly, Miss. Kate|female|30.0|    0|    0|   330972| 7.6292| NULL|       Q|
|        8

In [16]:
# defined method to print feature names with null count/values
def null_value_count(df):
  null_columns_counts = []
  numRows = df.count()
  for k in df.columns:
    nullRows = df.where(col(k).isNull()).count()
    if(nullRows > 0):
      temp = k,nullRows
      null_columns_counts.append(temp)
  return(null_columns_counts)

In [17]:
# call of method
null_columns_count_list = null_value_count(train_data)

In [18]:
for feature_name, null_count in null_columns_count_list:
    print("Feature:", feature_name)
    print("Null Count:", null_count)
    print()

Feature: Age
Null Count: 177

Feature: Cabin
Null Count: 687

Feature: Embarked
Null Count: 2



In [19]:
# Handle missing values
train_data = train_data.fillna(0, subset=["Age"])
test_data = test_data.fillna(0, subset=["Age"])
train_data = train_data.fillna("S", subset=["Embarked"])
test_data = test_data.fillna("S", subset=["Embarked"])

In [20]:
# Drop unwanted features
train_data = train_data.drop("PassengerId", "Name", "Ticket", "Cabin")
test_data = train_data.drop("PassengerId", "Name", "Ticket", "Cabin")

In [21]:
train_data.columns

['Survived', 'Pclass', 'Sex', 'Age', 'SibSp', 'Parch', 'Fare', 'Embarked']

In [22]:
# call of method
null_columns_count_list = null_value_count(train_data)


In [23]:
for feature_name, null_count in null_columns_count_list:
    print("Feature:", feature_name)
    print("Null Count:", null_count)
    print()

In [24]:
# # Handle categorical variables
# indexers = [StringIndexer(inputCol=col, outputCol=col+"_index").fit(train_data) for col in ["Sex", "Embarked"]]
# pipeline = Pipeline(stages=indexers)
# train_data = pipeline.fit(train_data).transform(train_data)
# test_data = pipeline.fit(test_data).transform(test_data)

# # Create feature vector
# feature_cols = ["Pclass", "Sex_index", "Age", "SibSp", "Parch", "Fare", "Embarked_index"]
# assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")
# train_data = assembler.transform(train_data)
# test_data = assembler.transform(test_data)

# # Select relevant columns
# train_data = train_data.select("Survived", "features")
# test_data = test_data.select("features")

In [25]:
# Handle categorical variables
indexers_train = [StringIndexer(inputCol=column, outputCol=column+"_index").fit(train_data) for column in ["Sex", "Embarked"]]
pipeline_train = Pipeline(stages=indexers_train)
indexers_test = [StringIndexer(inputCol=column, outputCol=column+"_index").fit(test_data) for column in ["Sex", "Embarked"]]
pipeline_test = Pipeline(stages=indexers_test)
train_data = pipeline_train.fit(train_data).transform(train_data)
test_data = pipeline_test.fit(test_data).transform(test_data)

# Create feature vector
feature_cols = ["Pclass", "Sex_index", "Age", "SibSp", "Parch", "Fare", "Embarked_index"]
#assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")
assembler = VectorAssembler(inputCols=feature_cols,outputCol="features")
train_data = assembler.transform(train_data)
test_data = assembler.transform(test_data)

# Select relevant columns
# train_data = train_data.select("Survived", "features")
# test_data = test_data.select("features")

In [26]:
train_data.show()

+--------+------+------+----+-----+-----+-------+--------+---------+--------------+--------------------+
|Survived|Pclass|   Sex| Age|SibSp|Parch|   Fare|Embarked|Sex_index|Embarked_index|            features|
+--------+------+------+----+-----+-----+-------+--------+---------+--------------+--------------------+
|       0|     3|  male|22.0|    1|    0|   7.25|       S|      0.0|           0.0|[3.0,0.0,22.0,1.0...|
|       1|     1|female|38.0|    1|    0|71.2833|       C|      1.0|           1.0|[1.0,1.0,38.0,1.0...|
|       1|     3|female|26.0|    0|    0|  7.925|       S|      1.0|           0.0|[3.0,1.0,26.0,0.0...|
|       1|     1|female|35.0|    1|    0|   53.1|       S|      1.0|           0.0|[1.0,1.0,35.0,1.0...|
|       0|     3|  male|35.0|    0|    0|   8.05|       S|      0.0|           0.0|(7,[0,2,5],[3.0,3...|
|       0|     3|  male| 0.0|    0|    0| 8.4583|       Q|      0.0|           2.0|(7,[0,5,6],[3.0,8...|
|       0|     1|  male|54.0|    0|    0|51.8625|      

In [27]:
test_data.show()

+--------+------+------+----+-----+-----+-------+--------+---------+--------------+--------------------+
|Survived|Pclass|   Sex| Age|SibSp|Parch|   Fare|Embarked|Sex_index|Embarked_index|            features|
+--------+------+------+----+-----+-----+-------+--------+---------+--------------+--------------------+
|       0|     3|  male|22.0|    1|    0|   7.25|       S|      0.0|           0.0|[3.0,0.0,22.0,1.0...|
|       1|     1|female|38.0|    1|    0|71.2833|       C|      1.0|           1.0|[1.0,1.0,38.0,1.0...|
|       1|     3|female|26.0|    0|    0|  7.925|       S|      1.0|           0.0|[3.0,1.0,26.0,0.0...|
|       1|     1|female|35.0|    1|    0|   53.1|       S|      1.0|           0.0|[1.0,1.0,35.0,1.0...|
|       0|     3|  male|35.0|    0|    0|   8.05|       S|      0.0|           0.0|(7,[0,2,5],[3.0,3...|
|       0|     3|  male| 0.0|    0|    0| 8.4583|       Q|      0.0|           2.0|(7,[0,5,6],[3.0,8...|
|       0|     1|  male|54.0|    0|    0|51.8625|      

In [30]:
from pyspark.ml.classification import LogisticRegression

# Logistic Regression model
lr = LogisticRegression(labelCol="Survived", featuresCol="features")
model_lr = lr.fit(train_data)
prediction_lr = model_lr.transform(test_data)
prediction_lr.select("prediction", "Survived", "features").show()
evaluator = MulticlassClassificationEvaluator(labelCol="Survived", predictionCol="prediction", metricName="accuracy")

+----------+--------+--------------------+
|prediction|Survived|            features|
+----------+--------+--------------------+
|       0.0|       0|[3.0,0.0,22.0,1.0...|
|       1.0|       1|[1.0,1.0,38.0,1.0...|
|       1.0|       1|[3.0,1.0,26.0,0.0...|
|       1.0|       1|[1.0,1.0,35.0,1.0...|
|       0.0|       0|(7,[0,2,5],[3.0,3...|
|       0.0|       0|(7,[0,5,6],[3.0,8...|
|       0.0|       0|(7,[0,2,5],[1.0,5...|
|       0.0|       0|[3.0,0.0,2.0,3.0,...|
|       1.0|       1|[3.0,1.0,27.0,0.0...|
|       1.0|       1|[2.0,1.0,14.0,1.0...|
|       1.0|       1|[3.0,1.0,4.0,1.0,...|
|       1.0|       1|[1.0,1.0,58.0,0.0...|
|       0.0|       0|(7,[0,2,5],[3.0,2...|
|       0.0|       0|[3.0,0.0,39.0,1.0...|
|       1.0|       0|[3.0,1.0,14.0,0.0...|
|       1.0|       1|[2.0,1.0,55.0,0.0...|
|       0.0|       0|[3.0,0.0,2.0,4.0,...|
|       0.0|       1|(7,[0,5],[2.0,13.0])|
|       1.0|       0|[3.0,1.0,31.0,1.0...|
|       1.0|       1|[3.0,1.0,0.0,0.0,...|
+----------

In [37]:

accuracy_lr = evaluator.evaluate(prediction_lr)
print(f"Accuracy of Logistic Regression is {accuracy_lr:.3f} %")
print(f"Test Error of Logistic Regression is {(1.0 - accuracy_lr):.3f} % ")

Accuracy of LogisticRegression is 0.793 %
Test Error of LogisticRegression is 0.207 % 


In [32]:
from pyspark.ml.classification import DecisionTreeClassifier

dt = DecisionTreeClassifier(featuresCol="features", labelCol="Survived", maxDepth=5)
model_dt = dt.fit(train_data)
prediction_dt = model_dt.transform(test_data)
prediction_dt.select("prediction", "Survived", "features").show()


+----------+--------+--------------------+
|prediction|Survived|            features|
+----------+--------+--------------------+
|       0.0|       0|[3.0,0.0,22.0,1.0...|
|       1.0|       1|[1.0,1.0,38.0,1.0...|
|       0.0|       1|[3.0,1.0,26.0,0.0...|
|       1.0|       1|[1.0,1.0,35.0,1.0...|
|       0.0|       0|(7,[0,2,5],[3.0,3...|
|       0.0|       0|(7,[0,5,6],[3.0,8...|
|       0.0|       0|(7,[0,2,5],[1.0,5...|
|       0.0|       0|[3.0,0.0,2.0,3.0,...|
|       0.0|       1|[3.0,1.0,27.0,0.0...|
|       1.0|       1|[2.0,1.0,14.0,1.0...|
|       1.0|       1|[3.0,1.0,4.0,1.0,...|
|       1.0|       1|[1.0,1.0,58.0,0.0...|
|       0.0|       0|(7,[0,2,5],[3.0,2...|
|       0.0|       0|[3.0,0.0,39.0,1.0...|
|       0.0|       0|[3.0,1.0,14.0,0.0...|
|       1.0|       1|[2.0,1.0,55.0,0.0...|
|       0.0|       0|[3.0,0.0,2.0,4.0,...|
|       0.0|       1|(7,[0,5],[2.0,13.0])|
|       0.0|       0|[3.0,1.0,31.0,1.0...|
|       1.0|       1|[3.0,1.0,0.0,0.0,...|
+----------

In [38]:
accuracy_dt = evaluator.evaluate(prediction_dt)
print(f"Accuracy of DecisionTree Classifier is {accuracy_dt:.3f} %")
print(f"Test Error of DecisionTree Classifier is {(1.0 - accuracy_dt):.3f} % ")


Accuracy of DecisionTree Classifier is 0.846 %
Test Error of DecisionTree Classifier is 0.154 % 


In [None]:
# # Model Evaluation for Logistic Regression
# predictions_lr = model_lr.transform(test_data)
# evaluator_lr = BinaryClassificationEvaluator(labelCol="Survived", metricName="areaUnderROC")
# auc_lr = evaluator_lr.evaluate(predictions_lr)
# print("Logistic Regression Area under ROC = ", auc_lr)



In [None]:
# # Model Evaluation for Decision Tree
# predictions_dt = model_dt.transform(test_data)
# evaluator_dt = BinaryClassificationEvaluator(labelCol="Survived", metricName="areaUnderROC")
# auc_dt = evaluator_dt.evaluate(predictions_dt)
# print("Decision Tree Area under ROC = ", auc_dt)