# Titanic:Machine Learning From Disaster 

## Analysing Titanic Dataset with PySpark

### Importing the Libriaries

In [1]:
import pyspark #pyspark package
from pyspark.sql import SparkSession #sparksession object to build the spark session
from pyspark.sql.types import * #spark-sql datatypes
from pyspark.sql.functions import * #spark-sql functions
import os # os module to work with the files
import matplotlib.pyplot as plt # matplotlib.pyplot for plotting 
import seaborn as sns # seaborn for aditional plotting options
from statsmodels.graphics.mosaicplot import mosaic # for mosaic Plot
from pyspark.ml.feature import (StringIndexer, OneHotEncoder, 
                                VectorAssembler, StandardScaler) #To Process the data for ML Model
from pyspark.ml.classification import RandomForestClassifier #Random Forest Clasiifier Model for Prediction
from pyspark.ml.evaluation import BinaryClassificationEvaluator #For Evaluating the Classification Model
from pyspark.ml import Pipeline #To define the model pipeline

#to plot the images in the jupyter notebook
%matplotlib inline 

In [2]:
#create spark session
spark = (SparkSession.builder
         #.config("spark.sql.execution.arrow.enabled", "true")
         .appName("Titanic Dataset Analysis").master('local')
         .getOrCreate())
spark

In [3]:
# Enable Arrow-based columnar data transfers
spark.conf.set("spark.sql.execution.arrow.enabled", "true")

### Read Data from csv files

In [4]:
DATASET_PATH = '../DATASETS/titanic/'
os.listdir(DATASET_PATH)

['gender_submission.csv',
 'pyspark_rf_submission',
 'pyspark_rf_submission.csv',
 'PyTorchTitanicModel.pt',
 'PyTorchTitanicSubmission.csv',
 'test.csv',
 'test_preprocessed.csv',
 'train.csv',
 'train_preprocessed.csv']

In [5]:
#Read train DataFrame from CSV File
train = spark.read.csv(DATASET_PATH + 'train.csv', header = True, inferSchema = True)
train.printSchema()

root
 |-- PassengerId: integer (nullable = true)
 |-- Survived: integer (nullable = true)
 |-- Pclass: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- SibSp: integer (nullable = true)
 |-- Parch: integer (nullable = true)
 |-- Ticket: string (nullable = true)
 |-- Fare: double (nullable = true)
 |-- Cabin: string (nullable = true)
 |-- Embarked: string (nullable = true)



In [6]:
#Read Test DataFrame from CSV File
test = spark.read.csv(DATASET_PATH + 'test.csv', header = True, inferSchema = True)
test.printSchema()

root
 |-- PassengerId: integer (nullable = true)
 |-- Pclass: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- SibSp: integer (nullable = true)
 |-- Parch: integer (nullable = true)
 |-- Ticket: string (nullable = true)
 |-- Fare: double (nullable = true)
 |-- Cabin: string (nullable = true)
 |-- Embarked: string (nullable = true)



Using few columns at a time for readability

In [7]:
train.describe(train.columns[:6]).show(truncate = False)

+-------+-----------------+-------------------+------------------+------------------------------------------------+------+------------------+
|summary|PassengerId      |Survived           |Pclass            |Name                                            |Sex   |Age               |
+-------+-----------------+-------------------+------------------+------------------------------------------------+------+------------------+
|count  |891              |891                |891               |891                                             |891   |714               |
|mean   |446.0            |0.3838383838383838 |2.308641975308642 |null                                            |null  |29.69911764705882 |
|stddev |257.3538420152301|0.48659245426485753|0.8360712409770491|null                                            |null  |14.526497332334035|
|min    |1                |0                  |1                 |"Andersson, Mr. August Edvard (""Wennerstrom"")"|female|0.42              |
|max  

In [8]:
train.describe(train.columns[6:]).show(truncate = False)

+-------+------------------+-------------------+------------------+-----------------+-----+--------+
|summary|SibSp             |Parch              |Ticket            |Fare             |Cabin|Embarked|
+-------+------------------+-------------------+------------------+-----------------+-----+--------+
|count  |891               |891                |891               |891              |204  |889     |
|mean   |0.5230078563411896|0.38159371492704824|260318.54916792738|32.2042079685746 |null |null    |
|stddev |1.1027434322934315|0.8060572211299488 |471609.26868834975|49.69342859718089|null |null    |
|min    |0                 |0                  |110152            |0.0              |A10  |C       |
|max    |8                 |6                  |WE/P 5735         |512.3292         |T    |S       |
+-------+------------------+-------------------+------------------+-----------------+-----+--------+



In [9]:
test.describe(test.columns[:5]).show(truncate = False)

+-------+------------------+------------------+-----------------------------------------+------+------------------+
|summary|PassengerId       |Pclass            |Name                                     |Sex   |Age               |
+-------+------------------+------------------+-----------------------------------------+------+------------------+
|count  |418               |418               |418                                      |418   |332               |
|mean   |1100.5            |2.2655502392344498|null                                     |null  |30.272590361445783|
|stddev |120.81045760473994|0.8418375519640503|null                                     |null  |14.181209235624424|
|min    |892               |1                 |"Assaf Khalil, Mrs. Mariana (Miriam"")"""|female|0.17              |
|max    |1309              |3                 |van Billiard, Master. Walter John        |male  |76.0              |
+-------+------------------+------------------+-------------------------

In [10]:
test.describe(test.columns[5:]).show(truncate = False)

+-------+------------------+------------------+------------------+------------------+-----+--------+
|summary|SibSp             |Parch             |Ticket            |Fare              |Cabin|Embarked|
+-------+------------------+------------------+------------------+------------------+-----+--------+
|count  |418               |418               |418               |417               |91   |418     |
|mean   |0.4473684210526316|0.3923444976076555|223850.98986486485|35.6271884892086  |null |null    |
|stddev |0.8967595611217135|0.9814288785371694|369523.7764694362 |55.907576179973844|null |null    |
|min    |0                 |0                 |110469            |0.0               |A11  |C       |
|max    |8                 |9                 |W.E.P. 5734       |512.3292          |G6   |S       |
+-------+------------------+------------------+------------------+------------------+-----+--------+



In [11]:
train.show(5, truncate = True)

+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|          Ticket|   Fare|Cabin|Embarked|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|          1|       0|     3|Braund, Mr. Owen ...|  male|22.0|    1|    0|       A/5 21171|   7.25| null|       S|
|          2|       1|     1|Cumings, Mrs. Joh...|female|38.0|    1|    0|        PC 17599|71.2833|  C85|       C|
|          3|       1|     3|Heikkinen, Miss. ...|female|26.0|    0|    0|STON/O2. 3101282|  7.925| null|       S|
|          4|       1|     1|Futrelle, Mrs. Ja...|female|35.0|    1|    0|          113803|   53.1| C123|       S|
|          5|       0|     3|Allen, Mr. Willia...|  male|35.0|    0|    0|          373450|   8.05| null|       S|
+-----------+--------+------+--------------------+------+----+-----+-----+------

In [12]:
test.show(10, False)

+-----------+------+--------------------------------------------+------+----+-----+-----+---------+-------+-----+--------+
|PassengerId|Pclass|Name                                        |Sex   |Age |SibSp|Parch|Ticket   |Fare   |Cabin|Embarked|
+-----------+------+--------------------------------------------+------+----+-----+-----+---------+-------+-----+--------+
|892        |3     |Kelly, Mr. James                            |male  |34.5|0    |0    |330911   |7.8292 |null |Q       |
|893        |3     |Wilkes, Mrs. James (Ellen Needs)            |female|47.0|1    |0    |363272   |7.0    |null |S       |
|894        |2     |Myles, Mr. Thomas Francis                   |male  |62.0|0    |0    |240276   |9.6875 |null |Q       |
|895        |3     |Wirz, Mr. Albert                            |male  |27.0|0    |0    |315154   |8.6625 |null |S       |
|896        |3     |Hirvonen, Mrs. Alexander (Helga E Lindqvist)|female|22.0|1    |1    |3101298  |12.2875|null |S       |
|897        |3  

Combining Train and Test DataFrames into a Single DataFrame

In [13]:
total = train.unionAll(test.withColumn("Survived", lit("null")).select(train.columns))
total.show(10, False)

+-----------+--------+------+---------------------------------------------------+------+----+-----+-----+----------------+-------+-----+--------+
|PassengerId|Survived|Pclass|Name                                               |Sex   |Age |SibSp|Parch|Ticket          |Fare   |Cabin|Embarked|
+-----------+--------+------+---------------------------------------------------+------+----+-----+-----+----------------+-------+-----+--------+
|1          |0       |3     |Braund, Mr. Owen Harris                            |male  |22.0|1    |0    |A/5 21171       |7.25   |null |S       |
|2          |1       |1     |Cumings, Mrs. John Bradley (Florence Briggs Thayer)|female|38.0|1    |0    |PC 17599        |71.2833|C85  |C       |
|3          |1       |3     |Heikkinen, Miss. Laina                             |female|26.0|0    |0    |STON/O2. 3101282|7.925  |null |S       |
|4          |1       |1     |Futrelle, Mrs. Jacques Heath (Lily May Peel)       |female|35.0|1    |0    |113803          |53

### Feature Engineering

Extracting Title from name Column

In [14]:
total.select("Name").show(10, truncate = False)

+---------------------------------------------------+
|Name                                               |
+---------------------------------------------------+
|Braund, Mr. Owen Harris                            |
|Cumings, Mrs. John Bradley (Florence Briggs Thayer)|
|Heikkinen, Miss. Laina                             |
|Futrelle, Mrs. Jacques Heath (Lily May Peel)       |
|Allen, Mr. William Henry                           |
|Moran, Mr. James                                   |
|McCarthy, Mr. Timothy J                            |
|Palsson, Master. Gosta Leonard                     |
|Johnson, Mrs. Oscar W (Elisabeth Vilhelmina Berg)  |
|Nasser, Mrs. Nicholas (Adele Achem)                |
+---------------------------------------------------+
only showing top 10 rows



In [15]:
#Extracting the Title an replacing the similar words 
total = (total.withColumn("Title", regexp_extract(col("Name"), "(,.*\.)|(\\..*) ", 0))
         .withColumn("Title", regexp_replace("Title", ",", ""))
         .withColumn("Title", regexp_replace("Title", "\.", ""))
         .withColumn("Title", trim("Title"))
         .withColumn("Title", regexp_replace("Title", 'Mlle|Ms', 'Miss'))
         .withColumn("Title", regexp_replace("Title", 'Mme', 'Mrs'))
         .withColumn("Title", regexp_replace("Title", 'Mrs Martin .*', 'Mrs'))
        )
total.select("PassengerId", "Name", "Title").show(10, truncate = False)

+-----------+---------------------------------------------------+------+
|PassengerId|Name                                               |Title |
+-----------+---------------------------------------------------+------+
|1          |Braund, Mr. Owen Harris                            |Mr    |
|2          |Cumings, Mrs. John Bradley (Florence Briggs Thayer)|Mrs   |
|3          |Heikkinen, Miss. Laina                             |Miss  |
|4          |Futrelle, Mrs. Jacques Heath (Lily May Peel)       |Mrs   |
|5          |Allen, Mr. William Henry                           |Mr    |
|6          |Moran, Mr. James                                   |Mr    |
|7          |McCarthy, Mr. Timothy J                            |Mr    |
|8          |Palsson, Master. Gosta Leonard                     |Master|
|9          |Johnson, Mrs. Oscar W (Elisabeth Vilhelmina Berg)  |Mrs   |
|10         |Nasser, Mrs. Nicholas (Adele Achem)                |Mrs   |
+-----------+--------------------------------------

Extracting Rare Titles and replacing them with 'Rare' Keyword

In [16]:
total.groupby("Title").count().orderBy('count', ascending = False).show()

+------------+-----+
|       Title|count|
+------------+-----+
|          Mr|  757|
|        Miss|  264|
|         Mrs|  198|
|      Master|   61|
|         Rev|    8|
|          Dr|    8|
|         Col|    4|
|       Major|    2|
|        Dona|    1|
|         Don|    1|
|    Jonkheer|    1|
|        Capt|    1|
|        Lady|    1|
|the Countess|    1|
|         Sir|    1|
+------------+-----+



In [17]:
rare_titles = total.groupby("Title").count().filter("count<10").select("Title").toPandas()["Title"].values.tolist()
rare_titles = '|'.join(rare_titles)
rare_titles

  An error occurred while calling o138.getResult.
: org.apache.spark.SparkException: Exception thrown in awaitResult: 
	at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:302)
	at org.apache.spark.security.SocketAuthServer.getResult(SocketAuthServer.scala:88)
	at org.apache.spark.security.SocketAuthServer.getResult(SocketAuthServer.scala:84)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:564)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.Ca

Py4JJavaError: An error occurred while calling o138.getResult.
: org.apache.spark.SparkException: Exception thrown in awaitResult: 
	at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:302)
	at org.apache.spark.security.SocketAuthServer.getResult(SocketAuthServer.scala:88)
	at org.apache.spark.security.SocketAuthServer.getResult(SocketAuthServer.scala:84)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:564)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.base/java.lang.Thread.run(Thread.java:832)
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 13 in stage 20.0 failed 1 times, most recent failure: Lost task 13.0 in stage 20.0 (TID 221, NarendraBabuOggu, executor driver): org.apache.spark.util.TaskCompletionListenerException: Memory was leaked by query. Memory leaked: (49152)
Allocator(toBatchIterator) 0/49152/49152/9223372036854775807 (res/actual/peak/limit)


Previous exception in task: sun.misc.Unsafe or java.nio.DirectByteBuffer.<init>(long, int) not available
	io.netty.util.internal.PlatformDependent.directBuffer(PlatformDependent.java:490)
	io.netty.buffer.NettyArrowBuf.getDirectBuffer(NettyArrowBuf.java:243)
	io.netty.buffer.NettyArrowBuf.nioBuffer(NettyArrowBuf.java:233)
	io.netty.buffer.ArrowBuf.nioBuffer(ArrowBuf.java:245)
	org.apache.arrow.vector.ipc.message.ArrowRecordBatch.computeBodyLength(ArrowRecordBatch.java:222)
	org.apache.arrow.vector.ipc.message.MessageSerializer.serialize(MessageSerializer.java:240)
	org.apache.arrow.vector.ipc.message.MessageSerializer.serialize(MessageSerializer.java:226)
	org.apache.spark.sql.execution.arrow.ArrowConverters$$anon$1.$anonfun$next$1(ArrowConverters.scala:118)
	scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
	org.apache.spark.sql.execution.arrow.ArrowConverters$$anon$1.next(ArrowConverters.scala:121)
	org.apache.spark.sql.execution.arrow.ArrowConverters$$anon$1.next(ArrowConverters.scala:97)
	scala.collection.Iterator.foreach(Iterator.scala:941)
	scala.collection.Iterator.foreach$(Iterator.scala:941)
	org.apache.spark.sql.execution.arrow.ArrowConverters$$anon$1.foreach(ArrowConverters.scala:97)
	scala.collection.generic.Growable.$plus$plus$eq(Growable.scala:62)
	scala.collection.generic.Growable.$plus$plus$eq$(Growable.scala:53)
	scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:105)
	scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:49)
	scala.collection.TraversableOnce.to(TraversableOnce.scala:315)
	scala.collection.TraversableOnce.to$(TraversableOnce.scala:313)
	org.apache.spark.sql.execution.arrow.ArrowConverters$$anon$1.to(ArrowConverters.scala:97)
	scala.collection.TraversableOnce.toBuffer(TraversableOnce.scala:307)
	scala.collection.TraversableOnce.toBuffer$(TraversableOnce.scala:307)
	org.apache.spark.sql.execution.arrow.ArrowConverters$$anon$1.toBuffer(ArrowConverters.scala:97)
	scala.collection.TraversableOnce.toArray(TraversableOnce.scala:294)
	scala.collection.TraversableOnce.toArray$(TraversableOnce.scala:288)
	org.apache.spark.sql.execution.arrow.ArrowConverters$$anon$1.toArray(ArrowConverters.scala:97)
	org.apache.spark.sql.Dataset.$anonfun$collectAsArrowToPython$6(Dataset.scala:3560)
	org.apache.spark.SparkContext.$anonfun$runJob$6(SparkContext.scala:2187)
	org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	org.apache.spark.scheduler.Task.run(Task.scala:127)
	org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:444)
	org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
	org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:447)
	java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1130)
	java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:630)
	java.base/java.lang.Thread.run(Thread.java:832)
	at org.apache.spark.TaskContextImpl.invokeListeners(TaskContextImpl.scala:145)
	at org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:124)
	at org.apache.spark.scheduler.Task.run(Task.scala:137)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:444)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:447)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1130)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:630)
	at java.base/java.lang.Thread.run(Thread.java:832)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2023)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:1972)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:1971)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1971)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:950)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:950)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:950)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2203)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2152)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2141)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:752)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2093)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2188)
	at org.apache.spark.sql.Dataset.$anonfun$collectAsArrowToPython$5(Dataset.scala:3558)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
	at org.apache.spark.sql.Dataset.$anonfun$collectAsArrowToPython$2(Dataset.scala:3562)
	at org.apache.spark.sql.Dataset.$anonfun$collectAsArrowToPython$2$adapted(Dataset.scala:3539)
	at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3616)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:100)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:160)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:87)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:763)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3614)
	at org.apache.spark.sql.Dataset.$anonfun$collectAsArrowToPython$1(Dataset.scala:3539)
	at org.apache.spark.sql.Dataset.$anonfun$collectAsArrowToPython$1$adapted(Dataset.scala:3538)
	at org.apache.spark.security.SocketAuthServer$.$anonfun$serveToStream$2(SocketAuthServer.scala:130)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
	at org.apache.spark.security.SocketAuthServer$.$anonfun$serveToStream$1(SocketAuthServer.scala:132)
	at org.apache.spark.security.SocketAuthServer$.$anonfun$serveToStream$1$adapted(SocketAuthServer.scala:127)
	at org.apache.spark.security.SocketFuncServer.handleConnection(SocketAuthServer.scala:104)
	at org.apache.spark.security.SocketFuncServer.handleConnection(SocketAuthServer.scala:98)
	at org.apache.spark.security.SocketAuthServer$$anon$1.$anonfun$run$1(SocketAuthServer.scala:60)
	at scala.util.Try$.apply(Try.scala:213)
	at org.apache.spark.security.SocketAuthServer$$anon$1.run(SocketAuthServer.scala:60)
Caused by: org.apache.spark.util.TaskCompletionListenerException: Memory was leaked by query. Memory leaked: (49152)
Allocator(toBatchIterator) 0/49152/49152/9223372036854775807 (res/actual/peak/limit)


Previous exception in task: sun.misc.Unsafe or java.nio.DirectByteBuffer.<init>(long, int) not available
	io.netty.util.internal.PlatformDependent.directBuffer(PlatformDependent.java:490)
	io.netty.buffer.NettyArrowBuf.getDirectBuffer(NettyArrowBuf.java:243)
	io.netty.buffer.NettyArrowBuf.nioBuffer(NettyArrowBuf.java:233)
	io.netty.buffer.ArrowBuf.nioBuffer(ArrowBuf.java:245)
	org.apache.arrow.vector.ipc.message.ArrowRecordBatch.computeBodyLength(ArrowRecordBatch.java:222)
	org.apache.arrow.vector.ipc.message.MessageSerializer.serialize(MessageSerializer.java:240)
	org.apache.arrow.vector.ipc.message.MessageSerializer.serialize(MessageSerializer.java:226)
	org.apache.spark.sql.execution.arrow.ArrowConverters$$anon$1.$anonfun$next$1(ArrowConverters.scala:118)
	scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
	org.apache.spark.sql.execution.arrow.ArrowConverters$$anon$1.next(ArrowConverters.scala:121)
	org.apache.spark.sql.execution.arrow.ArrowConverters$$anon$1.next(ArrowConverters.scala:97)
	scala.collection.Iterator.foreach(Iterator.scala:941)
	scala.collection.Iterator.foreach$(Iterator.scala:941)
	org.apache.spark.sql.execution.arrow.ArrowConverters$$anon$1.foreach(ArrowConverters.scala:97)
	scala.collection.generic.Growable.$plus$plus$eq(Growable.scala:62)
	scala.collection.generic.Growable.$plus$plus$eq$(Growable.scala:53)
	scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:105)
	scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:49)
	scala.collection.TraversableOnce.to(TraversableOnce.scala:315)
	scala.collection.TraversableOnce.to$(TraversableOnce.scala:313)
	org.apache.spark.sql.execution.arrow.ArrowConverters$$anon$1.to(ArrowConverters.scala:97)
	scala.collection.TraversableOnce.toBuffer(TraversableOnce.scala:307)
	scala.collection.TraversableOnce.toBuffer$(TraversableOnce.scala:307)
	org.apache.spark.sql.execution.arrow.ArrowConverters$$anon$1.toBuffer(ArrowConverters.scala:97)
	scala.collection.TraversableOnce.toArray(TraversableOnce.scala:294)
	scala.collection.TraversableOnce.toArray$(TraversableOnce.scala:288)
	org.apache.spark.sql.execution.arrow.ArrowConverters$$anon$1.toArray(ArrowConverters.scala:97)
	org.apache.spark.sql.Dataset.$anonfun$collectAsArrowToPython$6(Dataset.scala:3560)
	org.apache.spark.SparkContext.$anonfun$runJob$6(SparkContext.scala:2187)
	org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	org.apache.spark.scheduler.Task.run(Task.scala:127)
	org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:444)
	org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
	org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:447)
	java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1130)
	java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:630)
	java.base/java.lang.Thread.run(Thread.java:832)
	at org.apache.spark.TaskContextImpl.invokeListeners(TaskContextImpl.scala:145)
	at org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:124)
	at org.apache.spark.scheduler.Task.run(Task.scala:137)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:444)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:447)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1130)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:630)
	at java.base/java.lang.Thread.run(Thread.java:832)


In [None]:
total = total.withColumn("Title", regexp_replace("Title", rare_titles, 'Rare'))
total.show(10)

Extracting SurName from Name column

In [None]:
total = (total.withColumn("SurName", regexp_extract("Name", '.*,', 0))
         .withColumn("SurName", regexp_replace("Surname", ',', ''))
         .withColumn("SurName", trim("Surname"))
        )
total.show(10)

In [None]:
print("Total Unique Surnames in Data : ", total.select("SurName").distinct().count())
total.groupby("SurName").count().orderBy("count", ascending = False).show()

Computing Family Size Column

In [None]:
total = total.withColumn("FamilySize", col("SibSp") + col("Parch") + lit(1))
family_size_df = total.select("PassengerId", "FamilySize", "Survived").toPandas()
family_size_df

In [None]:
#Count Plot of Survival details with respect to Family Size
sns.countplot(data = family_size_df, x = 'FamilySize', hue = 'Survived')

Reducing the categories in the Family Size and Including them in Family Group Column

In [None]:
total = total.withColumn("FamilyGroup", (when(col("FamilySize")==1, 'single')
                            .when((col("FamilySize") > 1) & (col("FamilySize")<5), 'small')
                            .otherwise('large')))

In [None]:
family_group_df = total.filter("Survived != 'null'").select("FamilyGroup", "Survived").toPandas()

In [None]:
sns.countplot(data = family_group_df, x = 'FamilyGroup', hue = 'Survived')

In [None]:
#Mosaic Plot for readability
mosaic(family_group_df, ['FamilyGroup', 'Survived']);

<p>Extracting Deck Column from The Cabin Details</p>
<p>Extracting children and Mother Details</p>

In [None]:
total = (total.withColumn("Deck", substring(col("cabin"), 1, 1))
         .withColumn("ChildOrNot", when(col("Age")<18, "Child").otherwise("Adult"))
         .withColumn("MotherOrNot", when((col("Age")>18) & (col("Sex")=='female') & (col("Parch") > 0) & 
                                     (col("Title") != 'Miss'), "Mother").otherwise("NotAMother"))
        )

In [None]:
(total.filter("Survived != 'null'")
 .selectExpr("CAST(Survived as INT) AS Survived", "MotherOrNot")
 .groupby("MotherOrNot").mean("Survived").show()
)

We can see that Mothers have a high probability of Surviving.

In [None]:
(total.filter("Survived != 'null'")
 .selectExpr("CAST(Survived as INT) AS Survived", "ChildOrNot")
 .groupby("ChildOrNot").mean("Survived").show()
)

# Checking Missing Values

In [None]:
total.toPandas().isnull().sum()

<p>Filling Embarked Column</p>

In [None]:
total.filter("Embarked IS NULL").show()

In [None]:
(total.filter("Embarked IS NOT NULL").groupBy(["Pclass", "Embarked"])
 .agg(expr("percentile_approx(Fare, 0.5)").alias('median')).show()
)

We can see that for Pclass = 1 and Fare =~ 80, the embarked is 'C', so for missing records we can replaec embarked with the value 'C'

In [None]:
total = total.na.fill('C', subset = 'Embarked')

<p>Filling Fare Column </p>

In [None]:
total.filter("Fare IS NULL").show()

In [None]:
missing_fare = (total.groupBy(["Pclass", "Embarked"])
 .agg(expr("percentile_approx(Fare, 0.5)").alias('MEDIAN'), expr("mean(Fare)").alias("MEAN"))
 .filter("Pclass = 3 AND Embarked = 'S'")
 .selectExpr("ROUND((MEAN + MEDIAN)/2, 2)")
).collect()[0][0]

In [None]:
total = total.na.fill(missing_fare, subset = 'Fare')
total.show(10, False)

<p>Filling Cabin and Deck Columns with 'UNK' Value</p>

In [None]:
total = total.na.fill('UNK', subset = ["Cabin", "Deck"])
total = total.na.fill(-1, subset = "Age")

In [None]:
#Using pandas for easy analysis and understandability
total.toPandas().isnull().sum()

In [None]:
test_preprocessed = total.filter("Survived = 'null'")
train_preprocessed = total.filter("Survived != 'null'").withColumn("label", expr("CAST(Survived AS DOUBLE)"))
train_preprocessed.count(), test_preprocessed.count()

<p>Saving The preprocessed files </p>

In [None]:
#Converting into pandas and saving the file into csv
train_preprocessed.toPandas().reset_index(drop = True).to_csv(DATASET_PATH + "train_preprocessed.csv", index = False)
test_preprocessed.toPandas().reset_index(drop = True).to_csv(DATASET_PATH + "test_preprocessed.csv", index = False)

<p> Defining Categorical and Continuous Variables for Modelling </p>

In [None]:
train_columns = ["Pclass", "Sex", "Age", "SibSp", "Parch", "Fare", 
                 "Deck", "Embarked", "Title", "SurName", "FamilyGroup", 
                 "ChildOrNot", "MotherOrNot"]
continuous_columns = ["Age", "Fare"]
categorical_columns = [column_name for column_name in train_columns if column_name not in continuous_columns]
categorical_columns, continuous_columns

<p> Defining StringIndexers, OneHotEncoders for Categorical Columns </p>

In [None]:
#Storing StringIndexers, OneHotEncoders to include them in a pipeline
string_indexer = []
onehot_encoder = []
for column_name in categorical_columns : 
    #Defining String Indexer
    stringindexer = StringIndexer(inputCol = column_name, outputCol = column_name + "_indexed").fit(train_preprocessed).setHandleInvalid("keep")
    train_preprocessed = stringindexer.transform(train_preprocessed)
    #Defining OneHotEncoder
    onehotencoder = OneHotEncoder(inputCol = column_name + "_indexed", outputCol = column_name + "_encoded").fit(train_preprocessed).setHandleInvalid("keep")
    train_preprocessed = onehotencoder.transform(train_preprocessed)
    #Transforming Test data
    test_preprocessed = stringindexer.transform(test_preprocessed)
    test_preprocessed = onehotencoder.transform(test_preprocessed)
    string_indexer.append(stringindexer)
    onehot_encoder.append(onehotencoder)
string_indexer, onehot_encoder                   

In [None]:
#Defining Categorical feature Name after preprocessing(StringIndexing + OneHotEncoding)
transformed_categorical_features = [column_name + "_encoded" for column_name in categorical_columns]
transformed_categorical_features

In [None]:
#Combinig All the columns
train_columns = continuous_columns + transformed_categorical_features
train_columns

<p>Defining Vector Assembler to Wrap all the columns into a single Column and Standard Scaler to Scale the features </p>

In [None]:
vector_assembler = VectorAssembler(inputCols = train_columns, outputCol = "features")
train_preprocessed = vector_assembler.transform(train_preprocessed)
standard_scaler = StandardScaler(inputCol = "features", outputCol = "scaled_features").fit(train_preprocessed)
train_preprocessed = standard_scaler.transform(train_preprocessed)
test_preprocessed = vector_assembler.transform(test_preprocessed)
test_preprocessed = standard_scaler.transform(test_preprocessed)

In [None]:
train_data, val_data = train_preprocessed.randomSplit([0.8, 0.2])
train_data.count(), val_data.count()

In [None]:
train_data.show(5, False)

<p>Defining the Random Forest Calssifier with input, output features and other default parameters</p> 

In [None]:
rf_clf = RandomForestClassifier(featuresCol = "scaled_features", labelCol = "label")
rf_model = rf_clf.fit(train_data)

In [None]:
val_pred = rf_model.transform(val_data)
val_pred.select("Survived", "prediction").show()

In [None]:
rf_model.transform(test_preprocessed)

In [None]:
evaluator=BinaryClassificationEvaluator(rawPredictionCol="label",labelCol="prediction")
print("The area under ROC for train set is {}".format(evaluator.evaluate(val_pred)))

In [None]:
#Defining the stages for model pipeline
stages = string_indexer + onehot_encoder + [vector_assembler, standard_scaler, rf_clf]
model_pipeline = Pipeline(stages = stages)

In [None]:
pipeline_model = model_pipeline.fit(total.filter("Survived != 'null'").withColumn("label", expr("CAST(Survived AS DOUBLE)")))

In [None]:
test_pred = pipeline_model.transform(total.filter("Survived = 'null'"))

In [None]:
test_pred.selectExpr("PassengerId", "CAST(prediction AS INT)").show(10)

In [None]:
submission = spark.read.csv(DATASET_PATH + 'gender_submission.csv', header = True, inferSchema = True)
submission.show(20, False)

In [None]:
#Writing the preditions to submission file
(test_pred.selectExpr("PassengerId", "CAST(prediction AS INT) AS Survived")
 .repartition(1)
 .write.option("header","true")
 .format("csv")
 .mode("overwrite")
 .save(DATASET_PATH + 'pyspark_rf_submission')
)