# ML classification model - Titanic Dataset using pyspark

### Data Description
- survival: Survival	0 = No, 1 = Yes
- pclass: Ticket class	1 = 1st, 2 = 2nd, 3 = 3rd
- Age: Age in years	
- sibsp: # of siblings / spouses aboard the Titanic	
- parch: # of parents / children aboard the Titanic	
- ticket: Ticket number	
- fare: Passenger fare	
- embarked: Port of Embarkation	C = Cherbourg, Q = Queenstown, S = Southampton

In [1]:
import findspark

findspark.init()
findspark.find()

'C:\\spark\\spark-3.2.1-bin-hadoop3.2'

In [2]:
# Import PySpark
from pyspark.sql import SparkSession

In [3]:
filename = "train.csv"

In [4]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()

In [5]:
df = spark.read.csv(filename, header=True, inferSchema=True)
df.show(2)

+-----------+--------+------+--------------------+------+----+-----+-----+---------+-------+-----+--------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|   Ticket|   Fare|Cabin|Embarked|
+-----------+--------+------+--------------------+------+----+-----+-----+---------+-------+-----+--------+
|          1|       0|     3|Braund, Mr. Owen ...|  male|22.0|    1|    0|A/5 21171|   7.25| null|       S|
|          2|       1|     1|Cumings, Mrs. Joh...|female|38.0|    1|    0| PC 17599|71.2833|  C85|       C|
+-----------+--------+------+--------------------+------+----+-----+-----+---------+-------+-----+--------+
only showing top 2 rows



In [6]:
df.count()

891

In [7]:
df.columns

['PassengerId',
 'Survived',
 'Pclass',
 'Name',
 'Sex',
 'Age',
 'SibSp',
 'Parch',
 'Ticket',
 'Fare',
 'Cabin',
 'Embarked']

In [8]:
# Subsetting the columns from the DataFrame
df.select(['Survived', 'Pclass', 'Name', 'Sex', 'Age', 'SibSp',]).show(5)

+--------+------+--------------------+------+----+-----+
|Survived|Pclass|                Name|   Sex| Age|SibSp|
+--------+------+--------------------+------+----+-----+
|       0|     3|Braund, Mr. Owen ...|  male|22.0|    1|
|       1|     1|Cumings, Mrs. Joh...|female|38.0|    1|
|       1|     3|Heikkinen, Miss. ...|female|26.0|    0|
|       1|     1|Futrelle, Mrs. Ja...|female|35.0|    1|
|       0|     3|Allen, Mr. Willia...|  male|35.0|    0|
+--------+------+--------------------+------+----+-----+
only showing top 5 rows



In [9]:
df.toPandas().info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 891 entries, 0 to 890
Data columns (total 12 columns):
 #   Column       Non-Null Count  Dtype  
---  ------       --------------  -----  
 0   PassengerId  891 non-null    int32  
 1   Survived     891 non-null    int32  
 2   Pclass       891 non-null    int32  
 3   Name         891 non-null    object 
 4   Sex          891 non-null    object 
 5   Age          714 non-null    float64
 6   SibSp        891 non-null    int32  
 7   Parch        891 non-null    int32  
 8   Ticket       891 non-null    object 
 9   Fare         891 non-null    float64
 10  Cabin        204 non-null    object 
 11  Embarked     889 non-null    object 
dtypes: float64(2), int32(5), object(5)
memory usage: 66.3+ KB


In [10]:
df.printSchema()

root
 |-- PassengerId: integer (nullable = true)
 |-- Survived: integer (nullable = true)
 |-- Pclass: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- SibSp: integer (nullable = true)
 |-- Parch: integer (nullable = true)
 |-- Ticket: string (nullable = true)
 |-- Fare: double (nullable = true)
 |-- Cabin: string (nullable = true)
 |-- Embarked: string (nullable = true)



In [11]:
from pyspark.sql.functions import col,isnan, when, count
    
df.select([count(when(col(c).isNull(), c)).alias(c) for c in df.columns]).show()

+-----------+--------+------+----+---+---+-----+-----+------+----+-----+--------+
|PassengerId|Survived|Pclass|Name|Sex|Age|SibSp|Parch|Ticket|Fare|Cabin|Embarked|
+-----------+--------+------+----+---+---+-----+-----+------+----+-----+--------+
|          0|       0|     0|   0|  0|177|    0|    0|     0|   0|  687|       2|
+-----------+--------+------+----+---+---+-----+-----+------+----+-----+--------+



In [12]:
from pyspark.sql.functions import mean
from pyspark.sql import functions as F

In [13]:
df.select('Age').summary().show()

+-------+------------------+
|summary|               Age|
+-------+------------------+
|  count|               714|
|   mean| 29.69911764705882|
| stddev|14.526497332334035|
|    min|              0.42|
|    25%|              20.0|
|    50%|              28.0|
|    75%|              38.0|
|    max|              80.0|
+-------+------------------+



In [14]:
mean_age = df.select(mean('Age')).collect()
# mean_age[0][0]
df = df.na.fill(mean_age[0][0],subset=['Age'])

In [15]:
df.dtypes

[('PassengerId', 'int'),
 ('Survived', 'int'),
 ('Pclass', 'int'),
 ('Name', 'string'),
 ('Sex', 'string'),
 ('Age', 'double'),
 ('SibSp', 'int'),
 ('Parch', 'int'),
 ('Ticket', 'string'),
 ('Fare', 'double'),
 ('Cabin', 'string'),
 ('Embarked', 'string')]

The Name and Ticket columns are not needed for out analysis, hence we will remove them from the dataset.

In [16]:
df = df.drop('Name', 'Ticket', 'Cabin')

In [17]:
df.show(2)

+-----------+--------+------+------+----+-----+-----+-------+--------+
|PassengerId|Survived|Pclass|   Sex| Age|SibSp|Parch|   Fare|Embarked|
+-----------+--------+------+------+----+-----+-----+-------+--------+
|          1|       0|     3|  male|22.0|    1|    0|   7.25|       S|
|          2|       1|     1|female|38.0|    1|    0|71.2833|       C|
+-----------+--------+------+------+----+-----+-----+-------+--------+
only showing top 2 rows



In [18]:
print((df.count(), len(df.columns)))

(891, 9)


In [19]:
from pyspark.sql.functions import corr
# df.stat.corr("Quantity", "UnitPrice")
df.select(corr('SibSp', 'Parch')).show()

+-------------------+
| corr(SibSp, Parch)|
+-------------------+
|0.41483769862015635|
+-------------------+



In [20]:
df.select('Embarked').distinct().show()

+--------+
|Embarked|
+--------+
|       Q|
|    null|
|       C|
|       S|
+--------+



In [21]:
from pyspark.sql.functions import asc, desc
em_mode = df.groupBy('Embarked').count().sort(desc('count')).collect()[0][1]

In [22]:
df = df.na.fill(em_mode,subset=['Embarked'])

In [23]:
df.select('Sex').distinct().show()

+------+
|   Sex|
+------+
|female|
|  male|
+------+



In [24]:
from pyspark.sql.functions import col, when

df = df.withColumn("Sex", when(col("Sex")=='male', 1).otherwise(0))

In [25]:
df.select([count(when(col(c).isNull(), c)).alias(c) for c in df.columns]).show()

+-----------+--------+------+---+---+-----+-----+----+--------+
|PassengerId|Survived|Pclass|Sex|Age|SibSp|Parch|Fare|Embarked|
+-----------+--------+------+---+---+-----+-----+----+--------+
|          0|       0|     0|  0|  0|    0|    0|   0|       2|
+-----------+--------+------+---+---+-----+-----+----+--------+



In [26]:
df=df.dropna(how='any')

In [27]:
df.select([count(when(col(c).isNull(), c)).alias(c) for c in df.columns]).show()

+-----------+--------+------+---+---+-----+-----+----+--------+
|PassengerId|Survived|Pclass|Sex|Age|SibSp|Parch|Fare|Embarked|
+-----------+--------+------+---+---+-----+-----+----+--------+
|          0|       0|     0|  0|  0|    0|    0|   0|       0|
+-----------+--------+------+---+---+-----+-----+----+--------+



In [37]:
df.show(5)

+-----------+--------+------+---+----+-----+-----+-------+--------+
|PassengerId|Survived|Pclass|Sex| Age|SibSp|Parch|   Fare|Embarked|
+-----------+--------+------+---+----+-----+-----+-------+--------+
|          1|       0|     3|  1|22.0|    1|    0|   7.25|       S|
|          2|       1|     1|  0|38.0|    1|    0|71.2833|       C|
|          3|       1|     3|  0|26.0|    0|    0|  7.925|       S|
|          4|       1|     1|  0|35.0|    1|    0|   53.1|       S|
|          5|       0|     3|  1|35.0|    0|    0|   8.05|       S|
+-----------+--------+------+---+----+-----+-----+-------+--------+
only showing top 5 rows



In [49]:
stages = []
target = 'Survived'

In [50]:
#One hot encoding
from pyspark.ml.feature import OneHotEncoder, StringIndexer
catCols = ["Sex", "Embarked"]
for c in catCols:
    indexer = StringIndexer(inputCol=c, outputCol=c+"_index").setHandleInvalid("skip")
    encoder = OneHotEncoder(inputCol=c+"_index", outputCol=c+"_vec", dropLast=False)
    stages += [indexer, encoder]

In [51]:
categoricalColumns_vec = [c + "_vec" for c in catCols]

In [52]:
numCols = ['Pclass', 'Age', 'SibSp', 'Parch', 'Fare']

In [47]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler

In [53]:
assemblerInputs = categoricalColumns_vec + numCols
assembler = VectorAssembler(inputCols = assemblerInputs, outputCol="features")
stages += [assembler]

In [54]:
prepPipeline = Pipeline().setStages(stages)
pipelineModel = prepPipeline.fit(df)
data = pipelineModel.transform(df)

## Save Transformation Pipeline
pipelineModel.save("/mnt/<YOURMOUNTEDSTORAGE>/pipeline")
    
display(dbutils.fs.ls("/mnt/<YOURMOUNTEDSTORAGE>/pipeline"))

## Read in Transformation Pipeline
    
from pyspark.ml import PipelineModel
    
pipelineModel = PipelineModel.load("/mnt/<YOURMOUNTEDSTORAGE>/pipeline")
    
dataset = pipelineModel.transform(dataset)
    
display(dataset)

In [65]:
selectedcols = ["Survived", "features"]# + cols
data = data.select(selectedcols)
##dataset.printSchema()
data.show(5)

+--------+--------------------+
|Survived|            features|
+--------+--------------------+
|       0|[1.0,0.0,1.0,0.0,...|
|       1|[0.0,1.0,0.0,1.0,...|
|       1|(10,[1,2,5,6,9],[...|
|       1|[0.0,1.0,1.0,0.0,...|
|       0|(10,[0,2,5,6,9],[...|
+--------+--------------------+
only showing top 5 rows



In [64]:
data.select('features').show(5)

+--------------------+
|            features|
+--------------------+
|[1.0,0.0,1.0,0.0,...|
|[0.0,1.0,0.0,1.0,...|
|(10,[1,2,5,6,9],[...|
|[0.0,1.0,1.0,0.0,...|
|(10,[0,2,5,6,9],[...|
+--------------------+
only showing top 5 rows



## Logistic Regression

In [None]:
from pyspark.ml.classification import LogisticRegression
train, test = data.randomSplit([0.75,0.25])

In [79]:
test.show(5)

+--------+--------------------+
|Survived|            features|
+--------+--------------------+
|       0|(10,[0,2,5,6],[1....|
|       0|(10,[0,2,5,6],[1....|
|       0|(10,[0,2,5,6],[1....|
|       0|(10,[0,2,5,6],[1....|
|       0|(10,[0,2,5,6,9],[...|
+--------+--------------------+
only showing top 5 rows



In [66]:
lr = LogisticRegression(featuresCol='features', labelCol='Survived')
lrModel = lr.fit(train)

In [67]:
lrModel.coefficients

DenseVector([-1.3555, 1.3555, -0.272, 0.353, 0.0062, -0.942, -0.0374, -0.4003, -0.1412, 0.0035])

In [68]:
lrModel.intercept

3.3536835811676875

In [69]:
pred = lrModel.evaluate(test)

In [70]:
pred.predictions.show(5)

+--------+--------------------+--------------------+--------------------+----------+
|Survived|            features|       rawPrediction|         probability|prediction|
+--------+--------------------+--------------------+--------------------+----------+
|       0|(10,[0,2,5,6],[1....|[0.67273131754455...|[0.66211447685265...|       0.0|
|       0|(10,[0,2,5,6],[1....|[1.26733272663847...|[0.78028581280714...|       0.0|
|       0|(10,[0,2,5,6],[1....|[1.26733272663847...|[0.78028581280714...|       0.0|
|       0|(10,[0,2,5,6],[1....|[2.44473591992502...|[0.92017564558902...|       0.0|
|       0|(10,[0,2,5,6,9],[...|[0.23518260931702...|[0.55852613975517...|       0.0|
+--------+--------------------+--------------------+--------------------+----------+
only showing top 5 rows



In [74]:
pred.fMeasureByLabel()

[0.853582554517134, 0.6928104575163399]

In [75]:
pred.weightedFMeasure()

0.7986351289599006

## Decision Trees

In [76]:
from pyspark.ml.classification import DecisionTreeClassifier
dt = DecisionTreeClassifier(featuresCol='features',
                           labelCol='Survived',
                           maxDepth=5,
                           maxBins=4,
                           impurity='gini')
dtModel = dt.fit(train)

In [81]:
pred = dtModel.transform(test)

In [82]:
pred.show(5)

+--------+--------------------+-------------+--------------------+----------+
|Survived|            features|rawPrediction|         probability|prediction|
+--------+--------------------+-------------+--------------------+----------+
|       0|(10,[0,2,5,6],[1....|  [46.0,21.0]|[0.68656716417910...|       0.0|
|       0|(10,[0,2,5,6],[1....| [249.0,37.0]|[0.87062937062937...|       0.0|
|       0|(10,[0,2,5,6],[1....| [249.0,37.0]|[0.87062937062937...|       0.0|
|       0|(10,[0,2,5,6],[1....| [249.0,37.0]|[0.87062937062937...|       0.0|
|       0|(10,[0,2,5,6,9],[...|  [46.0,21.0]|[0.68656716417910...|       0.0|
+--------+--------------------+-------------+--------------------+----------+
only showing top 5 rows



In [84]:
dtModel.featureImportances

SparseVector(10, {0: 0.6559, 2: 0.0275, 3: 0.0134, 5: 0.1863, 6: 0.0284, 7: 0.0454, 8: 0.0212, 9: 0.022})