Let's start with your project: 

Are you a data scientist? 

I think you are an awesome data scientist.

### **Problem** 
**Our goal is to create a predictive model that can answer the following question:**

**What kind of people had a better chance of surviving?**

**Data about passengers:**
*   Name
*   Age
*   Gender.


## Build Spark Session

In [2]:
import warnings
warnings.filterwarnings("ignore")

import findspark
findspark.init()
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.types import *
import pyspark.sql.functions as F

spark = SparkSession.builder.appName('Titanic_Survival_Predection').getOrCreate()
spark

21/11/01 08:27:02 WARN Utils: Your hostname, TheDragon resolves to a loopback address: 127.0.1.1; using 192.168.1.4 instead (on interface wlo1)
21/11/01 08:27:02 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
21/11/01 08:27:04 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [3]:
from pyspark.ml.feature import VectorAssembler # Transformer
from pyspark.ml.classification import RandomForestClassifier # Estimator
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, OneHotEncoder
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

## Data Loading


You have two datasets:
* Train
* Test

Read both datasets:



In [4]:
trainDF = spark.read.csv('./train.csv', header=True, inferSchema=True)
testDF = spark.read.csv('./test.csv', header=True, inferSchema=True)

                                                                                

Let's work with the train dataset:

**Confirm that this is a DataFrame:**

In [5]:
type(trainDF)

pyspark.sql.dataframe.DataFrame

**Show 5 rows.**

In [6]:
trainDF.show(5)

+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|          Ticket|   Fare|Cabin|Embarked|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|          1|       0|     3|Braund, Mr. Owen ...|  male|22.0|    1|    0|       A/5 21171|   7.25| null|       S|
|          2|       1|     1|Cumings, Mrs. Joh...|female|38.0|    1|    0|        PC 17599|71.2833|  C85|       C|
|          3|       1|     3|Heikkinen, Miss. ...|female|26.0|    0|    0|STON/O2. 3101282|  7.925| null|       S|
|          4|       1|     1|Futrelle, Mrs. Ja...|female|35.0|    1|    0|          113803|   53.1| C123|       S|
|          5|       0|     3|Allen, Mr. Willia...|  male|35.0|    0|    0|          373450|   8.05| null|       S|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+

**Display schema for the dataset:**

In [7]:
trainDF.printSchema()

root
 |-- PassengerId: integer (nullable = true)
 |-- Survived: integer (nullable = true)
 |-- Pclass: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- SibSp: integer (nullable = true)
 |-- Parch: integer (nullable = true)
 |-- Ticket: string (nullable = true)
 |-- Fare: double (nullable = true)
 |-- Cabin: string (nullable = true)
 |-- Embarked: string (nullable = true)



**Statistical summary:**

In [8]:
trainDF.describe().show()

                                                                                

+-------+-----------------+-------------------+------------------+--------------------+------+------------------+------------------+-------------------+------------------+-----------------+-----+--------+
|summary|      PassengerId|           Survived|            Pclass|                Name|   Sex|               Age|             SibSp|              Parch|            Ticket|             Fare|Cabin|Embarked|
+-------+-----------------+-------------------+------------------+--------------------+------+------------------+------------------+-------------------+------------------+-----------------+-----+--------+
|  count|              891|                891|               891|                 891|   891|               714|               891|                891|               891|              891|  204|     889|
|   mean|            446.0| 0.3838383838383838| 2.308641975308642|                null|  null| 29.69911764705882|0.5230078563411896|0.38159371492704824|260318.54916792738| 32.20420

## EDA - Exploratory Data Analysis

**Display count for the train dataset:**

In [9]:
trainDF.count()

891

**Can you answer this question:** 

**How many people survived, and how many didn't survive?** 

**Please save data in a variable.**

In [10]:
surv_and_no_surv = trainDF.groupBy('Survived').count()

**Display your result:**

In [11]:
surv_and_no_surv.show()



+--------+-----+
|Survived|count|
+--------+-----+
|       1|  342|
|       0|  549|
+--------+-----+



                                                                                

**Can you display your answer in ratio form? (Hint: Use a UDF.)**






In [12]:
train_count = trainDF.count()

In [13]:
surv_and_no_surv_ratio = surv_and_no_surv.withColumn('Ratio', F.format_number((F.col('count') / train_count), 2))

In [14]:
surv_and_no_surv_ratio.show()

                                                                                

+--------+-----+-----+
|Survived|count|Ratio|
+--------+-----+-----+
|       1|  342| 0.38|
|       0|  549| 0.62|
+--------+-----+-----+



                                                                                

In [15]:
# OR: compute the ratio with a Python UDF (returns a double, whereas format_number returns a string)
getRatio = F.udf(lambda x: round(x / train_count, 2), DoubleType())
surv_and_no_surv_ratio1 = surv_and_no_surv.withColumn("Ratio", getRatio('count'))
surv_and_no_surv_ratio1.show()



+--------+-----+-----+
|Survived|count|Ratio|
+--------+-----+-----+
|       1|  342| 0.38|
|       0|  549| 0.62|
+--------+-----+-----+
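
Both approaches above display the same numbers, but the column types differ: format_number produces a formatted string, while the UDF produces a double. The arithmetic can be checked in plain Python. This is only a sketch using the counts from the output above; the names counts, total, ratios, and formatted are illustrative and not part of the notebook.

```python
# Counts taken from the groupBy('Survived') output above.
counts = {1: 342, 0: 549}
total = sum(counts.values())  # 891 rows in the train dataset

# Same arithmetic as the UDF: count / total, rounded to 2 decimals (a float).
ratios = {label: round(n / total, 2) for label, n in counts.items()}

# Roughly what format_number gives: a string with 2 decimal places.
formatted = {label: f"{n / total:.2f}" for label, n in counts.items()}

print(ratios)
print(formatted)
```

If the ratio is needed for further numeric work, the double-typed version is usually the more convenient one.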



                                                                                

**Can you get the number of males and females?**


In [16]:
males_felmales = trainDF.groupBy('sex').count()
males_felmales.show()

                                                                                

+------+-----+
|   sex|count|
+------+-----+
|female|  314|
|  male|  577|
+------+-----+



                                                                                

In [17]:
males_felmales_ratio = males_felmales.withColumn("Ratio", getRatio('count'))
males_felmales_ratio.show()



+------+-----+-----+
|   sex|count|Ratio|
+------+-----+-----+
|female|  314| 0.35|
|  male|  577| 0.65|
+------+-----+-----+



                                                                                

**1. What is the survival rate (the average of the "Survived" column) for each gender?**

**2. How many passengers of each gender survived?**

(Hint: Group by the "sex" column.)

In [18]:
trainDF.groupBy('sex').agg(F.avg('Survived'), F.sum('Survived')).show()

+------+-------------------+-------------+
|   sex|      avg(Survived)|sum(Survived)|
+------+-------------------+-------------+
|female| 0.7420382165605095|          233|
|  male|0.18890814558058924|          109|
+------+-------------------+-------------+
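
Because "Survived" only holds 0 and 1, its average is the fraction of passengers who survived, and its sum is the number of survivors. A small plain-Python sketch of that relationship, using the counts from the outputs above (mean_of_binary is a hypothetical helper written for this illustration):

```python
# Survivor and passenger counts per gender, from the outputs above.
survivors = {"female": 233, "male": 109}
passengers = {"female": 314, "male": 577}

def mean_of_binary(ones, n):
    """Mean of a 0/1 column that contains `ones` ones among n values."""
    values = [1] * ones + [0] * (n - ones)
    return sum(values) / len(values)

rates = {sex: mean_of_binary(survivors[sex], passengers[sex]) for sex in survivors}
print(rates)
```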



**Create a temporary view in PySpark:**

In [19]:
trainDF.createOrReplaceTempView("train_view")

**What is the survival rate for each gender? Answer with SQL:**


In [20]:
spark.sql("""select sex, avg(Survived)
from train_view 
group by sex""").show()

+------+-------------------+
|   sex|      avg(Survived)|
+------+-------------------+
|female| 0.7420382165605095|
|  male|0.18890814558058924|
+------+-------------------+



**How many people survived, and how many didn't? Answer with SQL:**

In [21]:
spark.sql("""select Survived, count(Survived)
from train_view 
group by Survived""").show()

+--------+---------------+
|Survived|count(Survived)|
+--------+---------------+
|       1|            342|
|       0|            549|
+--------+---------------+



**Can you display the number of survivors from each gender as a ratio?**

(Hint: Group by "sex" column.)

**Can you do this via SQL?**

In [22]:
spark.sql("""select sex, round(avg(Survived), 2) as Ratio
from train_view 
group by sex""").show()

+------+-----+
|   sex|Ratio|
+------+-----+
|female| 0.74|
|  male| 0.19|
+------+-----+



In [23]:
#OR
spark.sql("SELECT Sex, round(SUM(Survived)/count(1),2) as ratio  FROM train_view GROUP BY Sex").show()

+------+-----+
|   Sex|ratio|
+------+-----+
|female| 0.74|
|  male| 0.19|
+------+-----+



**Display the survival ratio for each passenger class (Pclass):**


In [24]:
spark.sql("""select Pclass, round(avg(Survived), 2) as Ratio
from train_view 
group by Pclass""").show()

                                                                                

+------+-----+
|Pclass|Ratio|
+------+-----+
|     1| 0.63|
|     3| 0.24|
|     2| 0.47|
+------+-----+



                                                                                

**Let's take a break and continue after this.**

## Data Cleaning

**First, merge the train and test datasets. (Hint: The union function can do this. Note that union matches columns by position, not by name, so both DataFrames must have the same columns in the same order.)**



In [25]:
data = trainDF.union(testDF)

**Display count:**

In [26]:
data.count()

1329

**Can you find the number of null values in each column?**


In [27]:
Null_columns = []
for column in data.columns:
    Null_count = data.filter(F.col(column).isNull()).count()
    if Null_count != 0:
        Null_columns.append((column, Null_count))

In [28]:
Null_columns

[('Age', 265), ('Cabin', 1021), ('Embarked', 3)]

**Create Dataframe for null values**

1. Column
2. Number of missing values.

In [29]:
Null_columns_df  = spark.createDataFrame(Null_columns, ['Column', 'Missing_value'])
Null_columns_df.show()

+--------+-------------+
|  Column|Missing_value|
+--------+-------------+
|     Age|          265|
|   Cabin|         1021|
|Embarked|            3|
+--------+-------------+



## Preprocessing 

**Create a temporary view:**

In [30]:
data.createOrReplaceTempView('data_view')

**Can you show me the name column from your temporary table?**

In [31]:
spark.sql("""select Name
from data_view""").show(truncate=False)

+-------------------------------------------------------+
|Name                                                   |
+-------------------------------------------------------+
|Braund, Mr. Owen Harris                                |
|Cumings, Mrs. John Bradley (Florence Briggs Thayer)    |
|Heikkinen, Miss. Laina                                 |
|Futrelle, Mrs. Jacques Heath (Lily May Peel)           |
|Allen, Mr. William Henry                               |
|Moran, Mr. James                                       |
|McCarthy, Mr. Timothy J                                |
|Palsson, Master. Gosta Leonard                         |
|Johnson, Mrs. Oscar W (Elisabeth Vilhelmina Berg)      |
|Nasser, Mrs. Nicholas (Adele Achem)                    |
|Sandstrom, Miss. Marguerite Rut                        |
|Bonnell, Miss. Elizabeth                               |
|Saundercock, Mr. William Henry                         |
|Andersson, Mr. Anders Johan                            |
|Vestrom, Miss

**Run this code:**

In [32]:
data = data.withColumn('Title', F.regexp_extract(F.col("Name"), r"([A-Za-z]+)\.", 1))  # the word before a period, like Mr. or Dr.
data.createOrReplaceTempView('data_view')
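
To see what the pattern in the regexp_extract call above captures, the same expression can be tried with Python's re module. This is a sketch under the assumption that this simple pattern behaves the same in Python and in Spark's Java regex engine; extract_title is a hypothetical helper. It returns an empty string when nothing matches, which mirrors what regexp_extract does.

```python
import re

# Same pattern as the regexp_extract call above: letters followed by a period.
TITLE_PATTERN = re.compile(r"([A-Za-z]+)\.")

def extract_title(name):
    """Return the first run of letters that ends with a period, or '' if none."""
    match = TITLE_PATTERN.search(name)
    return match.group(1) if match else ""

# Names taken from the output above.
names = [
    "Braund, Mr. Owen Harris",
    "Heikkinen, Miss. Laina",
    "Palsson, Master. Gosta Leonard",
]
titles = [extract_title(n) for n in names]
print(titles)
```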

**Display the "Title" column:**

In [33]:
spark.sql("""select Title
from data_view""").show(truncate=False)

+------+
|Title |
+------+
|Mr    |
|Mrs   |
|Miss  |
|Mrs   |
|Mr    |
|Mr    |
|Mr    |
|Master|
|Mrs   |
|Mrs   |
|Miss  |
|Miss  |
|Mr    |
|Mr    |
|Miss  |
|Mrs   |
|Master|
|Mr    |
|Mrs   |
|Mrs   |
+------+
only showing top 20 rows



In [34]:
spark.sql("""select distinct(Title)
from data_view""").show(truncate=False)

                                                                                

+--------+
|Title   |
+--------+
|Don     |
|Miss    |
|Countess|
|Col     |
|Rev     |
|Lady    |
|Master  |
|Mme     |
|Capt    |
|Mr      |
|Dr      |
|Mrs     |
|Sir     |
|Jonkheer|
|Mlle    |
|Major   |
|Ms      |
+--------+



**Display each title with its count:**

In [35]:
spark.sql("""select Title, count(Title)
from data_view
group by Title""").show(truncate=False)

                                                                                

+--------+------------+
|Title   |count(Title)|
+--------+------------+
|Don     |1           |
|Miss    |257         |
|Countess|2           |
|Col     |4           |
|Rev     |9           |
|Lady    |2           |
|Master  |56          |
|Mme     |1           |
|Capt    |2           |
|Mr      |786         |
|Dr      |11          |
|Mrs     |186         |
|Sir     |2           |
|Jonkheer|2           |
|Mlle    |4           |
|Major   |3           |
|Ms      |1           |
+--------+------------+



**We can see that Dr, Rev, Major, Col, Mlle, Capt, Don, Jonkheer, Countess, Ms, Sir, Lady, and Mme are rare titles, so create a dictionary that maps each of them to "Rare" and keeps the common titles (Mr, Mrs, Miss, Master) unchanged.**

In [36]:
titles_map = {
 'Capt': 'Rare',
 'Col': 'Rare',
 'Don': 'Rare',
 'Dona': 'Rare',
 'Dr': 'Rare',
 'Jonkheer': 'Rare',
 'Lady': 'Rare',
 'Major': 'Rare',
 'Master': 'Master',
 'Miss': 'Miss',
 'Mlle': 'Rare',
 'Mme': 'Rare',
 'Mr': 'Mr',
 'Mrs': 'Mrs',
 'Ms': 'Rare',
 'Rev': 'Rare',
 'Sir': 'Rare',
 'Countess': 'Rare'
}

**Define the mapping function:**

In [37]:
def impute_title(title):
    return titles_map[title]
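
Note that titles_map[title] raises a KeyError for any title that is missing from the dictionary, including the empty string that regexp_extract returns when a name contains no match. A more defensive variant is sketched below in plain Python. Treating unknown titles as "Rare" is an assumption, not something the notebook specifies, and impute_title_safe is a hypothetical name.

```python
# A shortened copy of titles_map, for illustration only.
titles_map = {"Mr": "Mr", "Mrs": "Mrs", "Miss": "Miss", "Master": "Master", "Dr": "Rare"}

def impute_title_safe(title, default="Rare"):
    """Look up a title, falling back to `default` for unmapped or missing values."""
    return titles_map.get(title, default)

print(impute_title_safe("Mr"))
print(impute_title_safe("Dr"))
print(impute_title_safe(""))
```

The same function could be wrapped in F.udf exactly like impute_title.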

**Apply the function on "Title" column using UDF:**

In [38]:
title_map_func = F.udf(impute_title, StringType())
data = data.withColumn('Title', title_map_func('Title'))

**Display "Title" from table and group by "Title" column:**

In [39]:
data.createOrReplaceTempView('data_view')
spark.sql("SELECT Title FROM data_view GROUP BY Title").show()

+------+
| Title|
+------+
|  Miss|
|Master|
|    Mr|
|   Mrs|
|  Rare|
+------+



## **Preprocessing Age**

**Compute the mean age, truncated to an integer, to use as the fill value for the missing ages:**

In [40]:
age_mean = int(data.select(F.mean(F.col('Age'))).collect()[0][0])
age_mean

30

**Fill the missing ages with the mean age:**

In [41]:
data = data.fillna(age_mean, subset=['Age'])

## **Preprocessing Embarked**

**Select Embarked, count the rows for each value, order by count descending, and save the result in the grouped_Embarked variable:**




In [42]:
data.createOrReplaceTempView('data_view')

In [43]:
grouped_Embarked = spark.sql("""select Embarked, count(1) as count
from data_view
group by Embarked
order by count desc""")

**Show grouped_Embarked:**

In [44]:
grouped_Embarked.show()



+--------+-----+
|Embarked|count|
+--------+-----+
|       S|  962|
|       C|  253|
|       Q|  111|
|    null|    3|
+--------+-----+



                                                                                

**Get the most frequent value (the first row of grouped_Embarked):**

In [45]:
Embarked_fill_null = grouped_Embarked.collect()[0][0]
Embarked_fill_null

                                                                                

'S'
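
Taking the first row works here because the null group is the smallest one. If nulls were the most frequent group, the first row would be null instead. The sketch below shows the same idea in plain Python while skipping nulls explicitly. It uses the counts from the output above, and most_frequent_non_null is a hypothetical helper.

```python
# Embarked counts from the output above; None stands for null.
embarked_counts = {"S": 962, "C": 253, "Q": 111, None: 3}

def most_frequent_non_null(counts):
    """Return the key with the highest count, ignoring the None key."""
    non_null = {value: n for value, n in counts.items() if value is not None}
    return max(non_null, key=non_null.get)

fill_value = most_frequent_non_null(embarked_counts)
print(fill_value)
```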

**Fill missing values with the most frequent Embarked value:**

In [46]:
data = data.fillna(Embarked_fill_null, subset=['Embarked'])

## **Preprocessing Cabin**

**Replace the "Cabin" column with the first character of its value:**



In [47]:
data = data.withColumn('Cabin', F.substring(F.col('Cabin'), 1, 1))  # Spark string positions are 1-based
# OR data = data.withColumn("Cabin", data.Cabin.substr(1, 1))

**Show the result:**

In [48]:
data.show()

+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|          Ticket|   Fare|Cabin|Embarked| Title|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+------+
|          1|       0|     3|Braund, Mr. Owen ...|  male|22.0|    1|    0|       A/5 21171|   7.25| null|       S|    Mr|
|          2|       1|     1|Cumings, Mrs. Joh...|female|38.0|    1|    0|        PC 17599|71.2833|    C|       C|   Mrs|
|          3|       1|     3|Heikkinen, Miss. ...|female|26.0|    0|    0|STON/O2. 3101282|  7.925| null|       S|  Miss|
|          4|       1|     1|Futrelle, Mrs. Ja...|female|35.0|    1|    0|          113803|   53.1|    C|       S|   Mrs|
|          5|       0|     3|Allen, Mr. Willia...|  male|35.0|    0|    0|          373450|   8.05| null|       S|    Mr|
|          6|       0|  

**Create the temporary view:**

In [49]:
data.createOrReplaceTempView('data_view')

**Select the "Cabin" column, count the rows for each value, group by "Cabin", and order by count descending:**

In [50]:
grouped_Cabin = spark.sql("""select Cabin, count(1) as count
from data_view
group by Cabin
order by count desc""")
grouped_Cabin.show()



+-----+-----+
|Cabin|count|
+-----+-----+
| null| 1021|
|    C|   82|
|    B|   77|
|    D|   52|
|    E|   51|
|    A|   23|
|    F|   18|
|    G|    4|
|    T|    1|
+-----+-----+



                                                                                

**Fill missing values with "U":**

In [51]:
data = data.fillna('U', subset=['Cabin'])

**StringIndexer: A label indexer that maps a string column of labels to an ML column of label indices. If the input column is numeric, we cast it to string and index the string values. The indices are in [0, numLabels). By default, this is ordered by label frequencies so the most frequent label gets index 0. The ordering behavior is controlled by setting stringOrderType. Its default value is ‘frequencyDesc’.**
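
The frequencyDesc ordering described above can be sketched in plain Python. This is an illustration of the idea, not Spark's implementation: fit_string_index is a hypothetical helper, the sample values are made up, and the alphabetical tie-break is an assumption.

```python
from collections import Counter

def fit_string_index(labels):
    """Map each label to an index, most frequent label first (frequencyDesc).

    Ties are broken alphabetically here; treat that detail as an assumption.
    """
    counts = Counter(labels)
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: float(i) for i, label in enumerate(ordered)}

# Small made-up sample of Embarked values.
sample = ["S", "S", "S", "C", "C", "Q"]
index_map = fit_string_index(sample)
print(index_map)
```

The indices are floats because StringIndexer writes its output as a double column.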

**Use the randomSplit function to split the data into trainDF and testDF with 80% and 20% of the rows, respectively:**

In [52]:
trainDF, testDF = data.randomSplit([0.8, 0.2], seed = 42)
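
randomSplit assigns rows at random, so the 80/20 weights are targets rather than exact sizes, and the seed makes the split repeatable for the same input. The idea can be sketched in plain Python as follows. This is not Spark's implementation, and random_split is a hypothetical helper.

```python
import random

def random_split(rows, train_weight=0.8, seed=42):
    """Send each row to train with probability train_weight, else to test."""
    rng = random.Random(seed)
    train, test = [], []
    for row in rows:
        (train if rng.random() < train_weight else test).append(row)
    return train, test

rows = list(range(1329))  # same row count as the merged dataset
train, test = random_split(rows)
print(len(train), len(test))
```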

**StringIndexer(inputCol=None, outputCol=None)**

In [53]:
trainDF.dtypes

[('PassengerId', 'int'),
 ('Survived', 'int'),
 ('Pclass', 'int'),
 ('Name', 'string'),
 ('Sex', 'string'),
 ('Age', 'double'),
 ('SibSp', 'int'),
 ('Parch', 'int'),
 ('Ticket', 'string'),
 ('Fare', 'double'),
 ('Cabin', 'string'),
 ('Embarked', 'string'),
 ('Title', 'string')]

In [54]:
cat_features = [col for (col, dtype) in trainDF.dtypes if dtype == 'string' and col not in ('Ticket', 'Name')]

In [55]:
cat_features

['Sex', 'Cabin', 'Embarked', 'Title']

In [56]:
indexed_out_cols = [x + "_index" for x in cat_features] 

In [57]:
stringIndexObject = StringIndexer(inputCols=cat_features, outputCols=indexed_out_cols, handleInvalid='skip')

**OneHotEncoder(inputCols=None, outputCols=None)**

A one-hot encoder that maps a column of category indices to a column of binary vectors, with at most a single one-value per row that indicates the input category index. For example with 5 categories, an input value of 2.0 would map to an output vector of [0.0, 0.0, 1.0, 0.0]. The last category is not included by default (configurable via dropLast), because it makes the vector entries sum up to one, and hence linearly dependent. So an input value of 4.0 maps to [0.0, 0.0, 0.0, 0.0].
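
The dropLast behavior described above can be reproduced with a few lines of plain Python. This is a sketch of the encoding rule only: one_hot is a hypothetical helper that returns a dense list, whereas Spark stores the result as a sparse vector.

```python
def one_hot(index, num_categories, drop_last=True):
    """Encode a category index as a binary vector.

    With drop_last=True the vector has num_categories - 1 entries and the
    last category is encoded as all zeros.
    """
    size = num_categories - 1 if drop_last else num_categories
    vector = [0.0] * size
    if index < size:
        vector[int(index)] = 1.0
    return vector

print(one_hot(2.0, 5))  # [0.0, 0.0, 1.0, 0.0]
print(one_hot(4.0, 5))  # [0.0, 0.0, 0.0, 0.0]
```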

In [58]:
ohe_out_cols = [x + "_OHE" for x in cat_features] 

In [59]:
OHEObject = OneHotEncoder(inputCols=indexed_out_cols, outputCols=ohe_out_cols)

In [60]:
numCols = [col for (col, dtype) in trainDF.dtypes
           if dtype in ('double', 'int') and col not in ('Survived', 'PassengerId')]

In [61]:
numCols

['Pclass', 'Age', 'SibSp', 'Parch', 'Fare']

In [62]:
assembler_cols = numCols + ohe_out_cols 
assembler_cols

['Pclass',
 'Age',
 'SibSp',
 'Parch',
 'Fare',
 'Sex_OHE',
 'Cabin_OHE',
 'Embarked_OHE',
 'Title_OHE']

**VectorAssembler: VectorAssembler(*, inputCols=None, outputCol=None) A feature transformer that merges multiple columns into a vector column.**



In [63]:
vectorAssemberObj = VectorAssembler(inputCols=assembler_cols, outputCol='features')

**Build a RandomForestClassifier model, use a pipeline to fit and transform, then display the "prediction", "Survived", and "features" columns:**

In [64]:
rf = RandomForestClassifier(featuresCol = 'features', labelCol = 'Survived')

**Pipeline: ML Pipelines provide a uniform set of high-level APIs built on top of DataFrames that help users create and tune practical machine learning pipelines.**

In [65]:
pipeline = Pipeline(stages=[stringIndexObject, OHEObject, vectorAssemberObj, rf])

**Use the Pipeline to fit and transform:**

In [66]:
pipelineModel = pipeline.fit(trainDF) # Transformer

                                                                                

In [67]:
predDF = pipelineModel.transform(testDF) 

In [68]:
predDF.printSchema()

root
 |-- PassengerId: integer (nullable = true)
 |-- Survived: integer (nullable = true)
 |-- Pclass: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Age: double (nullable = false)
 |-- SibSp: integer (nullable = true)
 |-- Parch: integer (nullable = true)
 |-- Ticket: string (nullable = true)
 |-- Fare: double (nullable = true)
 |-- Cabin: string (nullable = false)
 |-- Embarked: string (nullable = false)
 |-- Title: string (nullable = true)
 |-- Sex_index: double (nullable = false)
 |-- Cabin_index: double (nullable = false)
 |-- Embarked_index: double (nullable = false)
 |-- Title_index: double (nullable = false)
 |-- Sex_OHE: vector (nullable = true)
 |-- Cabin_OHE: vector (nullable = true)
 |-- Embarked_OHE: vector (nullable = true)
 |-- Title_OHE: vector (nullable = true)
 |-- features: vector (nullable = true)
 |-- rawPrediction: vector (nullable = true)
 |-- probability: vector (nullable = true)
 |-- prediction: double (nul

In [72]:
predDF.select("features", "Survived", "prediction").show(truncate=False)

+--------------------------------------------------------------------+--------+----------+
|features                                                            |Survived|prediction|
+--------------------------------------------------------------------+--------+----------+
|(20,[0,1,4,6,14,17],[3.0,26.0,7.925,1.0,1.0,1.0])                   |1       |1.0       |
|(20,[0,1,4,5,10,14,16],[1.0,54.0,51.8625,1.0,1.0,1.0,1.0])          |0       |0.0       |
|(20,[0,1,3,4,6,14,18],[3.0,27.0,2.0,11.1333,1.0,1.0,1.0])           |1       |1.0       |
|(20,[0,1,2,3,4,5,6,14,16],[3.0,39.0,1.0,5.0,31.275,1.0,1.0,1.0,1.0])|0       |0.0       |
|(20,[0,1,4,6,15,18],[3.0,30.0,7.225,1.0,1.0,1.0])                   |1       |1.0       |
|(20,[0,1,4,5,11,14,16],[1.0,28.0,35.5,1.0,1.0,1.0,1.0])             |1       |0.0       |
|(20,[0,1,4,5,6,14,16],[3.0,30.0,7.8958,1.0,1.0,1.0,1.0])            |0       |0.0       |
|(20,[0,1,2,4,5,6,14,16],[1.0,42.0,1.0,52.0,1.0,1.0,1.0,1.0])        |0       |0.0       |
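
The features column is printed in sparse form: (size, [indices], [values]). Every position that is not listed is zero. The sketch below expands the first row shown above into a dense list. to_dense is a hypothetical helper, not a Spark function. Reading positions 0 to 4 as the numeric columns in assembler_cols order, with the remaining 15 positions holding the one-hot columns, is inferred from the pipeline definition and should be treated as an assumption.

```python
def to_dense(size, indices, values):
    """Expand a sparse (size, indices, values) triple into a dense list."""
    dense = [0.0] * size
    for i, v in zip(indices, values):
        dense[i] = v
    return dense

# First row of the output above.
row = to_dense(20, [0, 1, 4, 6, 14, 17], [3.0, 26.0, 7.925, 1.0, 1.0, 1.0])

# Assumed layout: the first five slots follow the order of numCols.
pclass, age, sibsp, parch, fare = row[:5]
print(pclass, age, sibsp, parch, fare)
```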

**Use MulticlassClassificationEvaluator and set the "labelCol" to "Survived",  "predictionCol" to "prediction", "metricName" to "accuracy"** 

In [70]:
evaluator = MulticlassClassificationEvaluator(labelCol="Survived", predictionCol="prediction", metricName="accuracy")
print("Accuracy : " + str(evaluator.evaluate(predDF)))

Accuracy : 0.8340425531914893
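
Accuracy is the fraction of rows whose prediction equals the label. As a sanity check on the definition, here is a plain-Python sketch applied to the eight rows visible in the prediction output above. It covers those eight rows only, so it does not reproduce the accuracy reported for the whole test split. accuracy is a hypothetical helper.

```python
# Labels and predictions from the eight rows visible in the output above.
survived = [1, 0, 1, 0, 1, 1, 0, 0]
prediction = [1.0, 0.0, 1.0, 0.0, 1.0, 0.0, 0.0, 0.0]

def accuracy(labels, predictions):
    """Fraction of rows where the prediction equals the label."""
    correct = sum(1 for y, p in zip(labels, predictions) if y == p)
    return correct / len(labels)

print(accuracy(survived, prediction))  # 0.875 for these eight rows only
```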
