Let's start with your project: 

Are you a data scientist? 

I think you are an awesome data scientist.

### **Problem** 
**Our goal is to create a predictive model that can answer the following question:**

**What kind of people had a better chance of surviving?**

**Data about passengers:**
*   Name
*   Age
*   Gender.


## Install and Import Libraries
Let's install PySpark:

In [1]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.1.2.tar.gz (212.4 MB)
Collecting py4j==0.10.9
  Downloading py4j-0.10.9-py2.py3-none-any.whl (198 kB)
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... done
  Created wheel for pyspark: filename=pyspark-3.1.2-py2.py3-none-any.whl size=212880768 sha256=3ed695e5af4015c89f26e37158f7a71c7a2532823196a1b82759b26642ce2a66
  Stored in directory: /root/.cache/pip/wheels/a5/0a/c1/9561f6fecb759579a7d863dcd846daaa95f598744e71b02c77
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9 pyspark-3.1.2


In [16]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

## Build Spark Session

In [3]:
spark = SparkSession.builder.appName("App1").getOrCreate()

## Data Loading


You have two datasets: 
* Train  
* Test

Read both datasets:



In [6]:
train_df = spark.read.csv('train.csv', header=True, inferSchema=True)
train_df.show()

+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|          Ticket|   Fare|Cabin|Embarked|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|          1|       0|     3|Braund, Mr. Owen ...|  male|22.0|    1|    0|       A/5 21171|   7.25| null|       S|
|          2|       1|     1|Cumings, Mrs. Joh...|female|38.0|    1|    0|        PC 17599|71.2833|  C85|       C|
|          3|       1|     3|Heikkinen, Miss. ...|female|26.0|    0|    0|STON/O2. 3101282|  7.925| null|       S|
|          4|       1|     1|Futrelle, Mrs. Ja...|female|35.0|    1|    0|          113803|   53.1| C123|       S|
|          5|       0|     3|Allen, Mr. Willia...|  male|35.0|    0|    0|          373450|   8.05| null|       S|
|          6|       0|     3|    Moran, Mr. James|  male|null|    0|    0|      

In [7]:
test_df = spark.read.csv('test.csv', header=True, inferSchema=True)
test_df.show()

+-----------+--------+------+--------------------+------+----+-----+-----+------------------+-------+-----+--------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|            Ticket|   Fare|Cabin|Embarked|
+-----------+--------+------+--------------------+------+----+-----+-----+------------------+-------+-----+--------+
|          1|       1|     1|Goldenberg, Mr. S...|  male|49.0|    1|    0|             17453|89.1042|  C92|       C|
|          2|       0|     3| Peduzzi, Mr. Joseph|  male|null|    0|    0|          A/5 2817|   8.05| null|       S|
|          3|       1|     3|  Jalsevac, Mr. Ivan|  male|29.0|    0|    0|            349240| 7.8958| null|       C|
|          4|       0|     1|Millet, Mr. Franc...|  male|65.0|    0|    0|             13509|  26.55|  E38|       S|
|          5|       1|     1|Kenyon, Mrs. Fred...|female|null|    1|    0|             17464|51.8625|  D21|       S|
|          6|       1|     2| Toomey, Miss. Ellen|female|50.0|  

Let's work with the train dataset:

**Confirm if this is a dataframe or not:**

In [8]:
type(train_df)

pyspark.sql.dataframe.DataFrame

**Show 5 rows.**

In [9]:
train_df.show(5)

+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|          Ticket|   Fare|Cabin|Embarked|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|          1|       0|     3|Braund, Mr. Owen ...|  male|22.0|    1|    0|       A/5 21171|   7.25| null|       S|
|          2|       1|     1|Cumings, Mrs. Joh...|female|38.0|    1|    0|        PC 17599|71.2833|  C85|       C|
|          3|       1|     3|Heikkinen, Miss. ...|female|26.0|    0|    0|STON/O2. 3101282|  7.925| null|       S|
|          4|       1|     1|Futrelle, Mrs. Ja...|female|35.0|    1|    0|          113803|   53.1| C123|       S|
|          5|       0|     3|Allen, Mr. Willia...|  male|35.0|    0|    0|          373450|   8.05| null|       S|
+-----------+--------+------+--------------------+------+----+-----+-----+------

**Display schema for the dataset:**

In [10]:
train_df.printSchema()

root
 |-- PassengerId: integer (nullable = true)
 |-- Survived: integer (nullable = true)
 |-- Pclass: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- SibSp: integer (nullable = true)
 |-- Parch: integer (nullable = true)
 |-- Ticket: string (nullable = true)
 |-- Fare: double (nullable = true)
 |-- Cabin: string (nullable = true)
 |-- Embarked: string (nullable = true)



**Statistical summary:**

In [12]:
train_df.describe().show()

+-------+-----------------+-------------------+------------------+--------------------+------+------------------+------------------+-------------------+------------------+-----------------+-----+--------+
|summary|      PassengerId|           Survived|            Pclass|                Name|   Sex|               Age|             SibSp|              Parch|            Ticket|             Fare|Cabin|Embarked|
+-------+-----------------+-------------------+------------------+--------------------+------+------------------+------------------+-------------------+------------------+-----------------+-----+--------+
|  count|              891|                891|               891|                 891|   891|               714|               891|                891|               891|              891|  204|     889|
|   mean|            446.0| 0.3838383838383838| 2.308641975308642|                null|  null| 29.69911764705882|0.5230078563411896|0.38159371492704824|260318.54916792738| 32.20420

## EDA - Exploratory Data Analysis

**Display count for the train dataset:**

In [13]:
train_df.count()

891

**Can you answer this question:** 

**How many people survived, and how many didn't survive?** 

**Please save data in a variable.**

In [14]:
train_df.show(5)

+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|          Ticket|   Fare|Cabin|Embarked|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|          1|       0|     3|Braund, Mr. Owen ...|  male|22.0|    1|    0|       A/5 21171|   7.25| null|       S|
|          2|       1|     1|Cumings, Mrs. Joh...|female|38.0|    1|    0|        PC 17599|71.2833|  C85|       C|
|          3|       1|     3|Heikkinen, Miss. ...|female|26.0|    0|    0|STON/O2. 3101282|  7.925| null|       S|
|          4|       1|     1|Futrelle, Mrs. Ja...|female|35.0|    1|    0|          113803|   53.1| C123|       S|
|          5|       0|     3|Allen, Mr. Willia...|  male|35.0|    0|    0|          373450|   8.05| null|       S|
+-----------+--------+------+--------------------+------+----+-----+-----+------

In [21]:
train_df.select('Survived')\
 .groupBy('Survived')\
 .count()\
 .show()


+--------+-----+
|Survived|count|
+--------+-----+
|       1|  342|
|       0|  549|
+--------+-----+



In [22]:
survived = train_df.select('Survived').where(train_df['Survived'] == 1).count()
not_survived =train_df.select('Survived').where(train_df['Survived'] == 0).count()

**Display your result:**

In [24]:
print(survived)
print(not_survived)

342
549


**Can you display your answer in ratio form? (Hint: use a "UDF" function. This is only a hint; you can use any method.)**






In [30]:
from pyspark.sql.functions import udf
from pyspark.sql.types import *
from pyspark.sql.types import IntegerType

In [25]:
total = train_df.select('Survived').count()
total

891

In [27]:
Survived_ratio = survived / total
Not_Survived_ratio = not_survived / total
print("Ratio of survived : {}".format(Survived_ratio))
print("Ratio of not survived : {}".format(Not_Survived_ratio))

Ratio of survived : 0.3838383838383838
Ratio of not survived : 0.6161616161616161
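
The same ratios can be computed for any set of class counts with a small helper. This is a plain-Python sketch rather than part of the original notebook; the function name `class_ratios` is hypothetical, and the counts are the ones printed above.

```python
def class_ratios(counts):
    """Return each class's share of the total, given a {label: count} mapping."""
    total = sum(counts.values())
    if total == 0:
        raise ValueError("counts must contain at least one observation")
    return {label: n / total for label, n in counts.items()}


# Counts taken from the groupBy('Survived') output above.
ratios = class_ratios({1: 342, 0: 549})
print(ratios)
```

Working from the aggregated counts avoids a per-row UDF, since only two numbers are needed to get both ratios.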


In [37]:
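@udf(returnType=DoubleType())
def getRatio(number):
  # The division yields a float, so the declared return type must be DoubleType.
  # With IntegerType(), as in the original run, Spark returns null for every row
  # (which is what the recorded output below shows).
  return number / total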

In [40]:
spark.udf.register("getRatioudf", getRatio)

<function __main__.getRatio>

In [44]:
from pyspark.sql.functions import *
from pyspark.sql.types import *
train_df.withColumn("ratio", getRatio(col("Survived"))).show(truncate=False)

+-----------+--------+------+-------------------------------------------------------+------+----+-----+-----+----------------+-------+-----+--------+-----+
|PassengerId|Survived|Pclass|Name                                                   |Sex   |Age |SibSp|Parch|Ticket          |Fare   |Cabin|Embarked|ratio|
+-----------+--------+------+-------------------------------------------------------+------+----+-----+-----+----------------+-------+-----+--------+-----+
|1          |0       |3     |Braund, Mr. Owen Harris                                |male  |22.0|1    |0    |A/5 21171       |7.25   |null |S       |null |
|2          |1       |1     |Cumings, Mrs. John Bradley (Florence Briggs Thayer)    |female|38.0|1    |0    |PC 17599        |71.2833|C85  |C       |null |
|3          |1       |3     |Heikkinen, Miss. Laina                                 |female|26.0|0    |0    |STON/O2. 3101282|7.925  |null |S       |null |
|4          |1       |1     |Futrelle, Mrs. Jacques Heath (Lily 

**Can you get the number of males and females?**


In [45]:
train_df.select('Sex')\
 .groupBy('Sex')\
 .count()\
 .show()

+------+-----+
|   Sex|count|
+------+-----+
|female|  314|
|  male|  577|
+------+-----+



**1. What is the average number of survivors of each gender?**

**2. What is the number of survivors of each gender?**

(Hint: Group by the "Sex" column. This is only a hint; you can use any method.)

In [51]:
train_df.groupby(["Sex"]).agg(count('Survived')).show()

+------+---------------+
|   Sex|count(Survived)|
+------+---------------+
|female|            314|
|  male|            577|
+------+---------------+



In [80]:
from pyspark.sql.functions import col
(train_df.where(col('Survived') == 1).groupBy('Sex').count().show()
)

+------+-----+
|   Sex|count|
+------+-----+
|female|  233|
|  male|  109|
+------+-----+



In [50]:
train_df.groupBy("Sex").agg(avg("Survived")).show()

+------+-------------------+
|   Sex|      avg(Survived)|
+------+-------------------+
|female| 0.7420382165605095|
|  male|0.18890814558058924|
+------+-------------------+



**Create a temporary view in PySpark:**

In [53]:
train_df.createOrReplaceTempView("DF")

**How many people survived, and how many didn't survive? By SQL:**

In [55]:
spark.sql("SELECT Survived, COUNT(Survived) FROM DF GROUP BY Survived").show()

+--------+---------------+
|Survived|count(Survived)|
+--------+---------------+
|       1|            342|
|       0|            549|
+--------+---------------+



**Can you display the number of survivors from each gender as a ratio?**

(Hint: Group by the "Sex" column. This is only a hint; you can use any method.)

**Can you do this via SQL?**

In [70]:
spark.sql("SELECT Sex, SUM(Survived)/COUNT(Survived) as count FROM DF GROUP BY Sex").show()

+------+-------------------+
|   Sex|              count|
+------+-------------------+
|female| 0.7420382165605095|
|  male|0.18890814558058924|
+------+-------------------+



In [72]:
spark.sql("SELECT Sex, AVG(Survived) FROM DF GROUP BY Sex").show()

+------+-------------------+
|   Sex|      avg(Survived)|
+------+-------------------+
|female| 0.7420382165605095|
|  male|0.18890814558058924|
+------+-------------------+
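
The two SQL queries above return identical values because the mean of a 0/1 column is the same as its sum divided by its count. A plain-Python sketch of that arithmetic, rebuilding each group from the counts shown earlier (the helper name `survival_rate` is hypothetical):

```python
def survival_rate(flags):
    """Mean of a list of 0/1 survival flags, i.e. sum(flags) / count(flags)."""
    return sum(flags) / len(flags)


# Rebuild each group from the counts shown earlier in the notebook:
# 233 of 314 females survived, 109 of 577 males survived.
female_flags = [1] * 233 + [0] * (314 - 233)
male_flags = [1] * 109 + [0] * (577 - 109)

female_rate = survival_rate(female_flags)
male_rate = survival_rate(male_flags)
print(female_rate, male_rate)
```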



**Display a ratio for "p-class": SUM(Survived)/count for p-class**


In [75]:
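# Survival ratio per class (left commented out in the original run):
#spark.sql("SELECT Pclass, SUM(Survived)/COUNT(Survived) as count FROM DF GROUP BY Pclass").show()
# Query actually run: each class's share of all passengers (Pclass is not selected, so rows are unlabeled).
spark.sql("SELECT COUNT(Survived)/(SELECT COUNT(SURVIVED) FROM DF) FROM DF GROUP BY Pclass").show()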

+--------------------------------------------------------------------+
|(CAST(count(Survived) AS DOUBLE) / CAST(scalarsubquery() AS DOUBLE))|
+--------------------------------------------------------------------+
|                                                 0.24242424242424243|
|                                                  0.5510662177328844|
|                                                 0.20650953984287318|
+--------------------------------------------------------------------+



**Let's take a break and continue after this.**

## Data Cleaning

**First and foremost, we must merge both the train and test datasets. (Hint: The union function can do this.)**



In [266]:
df = train_df.union(test_df)

**Display count:**

In [267]:
df.count()

1329

**Can you find the number of null values in each column?**


In [268]:
from pyspark.sql.functions import isnan, when, count, col

df.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in df.columns]).show()

+-----------+--------+------+----+---+---+-----+-----+------+----+-----+--------+
|PassengerId|Survived|Pclass|Name|Sex|Age|SibSp|Parch|Ticket|Fare|Cabin|Embarked|
+-----------+--------+------+----+---+---+-----+-----+------+----+-----+--------+
|          0|       0|     0|   0|  0|265|    0|    0|     0|   0| 1021|       3|
+-----------+--------+------+----+---+---+-----+-----+------+----+-----+--------+
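
The expression `count(when(isnan(c) | col(c).isNull(), c))` counts, per column, the rows whose value is either null or NaN. A plain-Python sketch of the same idea on a list of dict rows (the helper `count_missing` and the toy rows are hypothetical, not taken from the dataset):

```python
import math


def count_missing(rows, columns):
    """Count values that are None or NaN in each column of a list of dict rows."""
    def is_missing(value):
        return value is None or (isinstance(value, float) and math.isnan(value))

    return {c: sum(1 for row in rows if is_missing(row.get(c))) for c in columns}


# Hypothetical toy rows, not taken from the dataset.
rows = [
    {"Age": 22.0, "Cabin": None, "Embarked": "S"},
    {"Age": None, "Cabin": "C85", "Embarked": "C"},
    {"Age": float("nan"), "Cabin": None, "Embarked": None},
]
missing = count_missing(rows, ["Age", "Cabin", "Embarked"])
print(missing)
```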



**Create Dataframe for null values**

1. Column
2. Number of missing values.

In [269]:
nulls = df.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in df.columns])
print("---------------------------------------------------")

import pyspark.sql.functions as F
nulldf = nulls.select([F.col('Age'), F.col('Cabin'), F.col('Embarked')])
nulldf.show()

---------------------------------------------------
+---+-----+--------+
|Age|Cabin|Embarked|
+---+-----+--------+
|265| 1021|       3|
+---+-----+--------+



## Preprocessing 

**Create a temporary view in PySpark:**

In [270]:
df.createOrReplaceTempView("DF")

**Can you show the "name" column from your temporary table?**

In [271]:
spark.sql('SELECT Name FROM DF').show(truncate=False)

+-------------------------------------------------------+
|Name                                                   |
+-------------------------------------------------------+
|Braund, Mr. Owen Harris                                |
|Cumings, Mrs. John Bradley (Florence Briggs Thayer)    |
|Heikkinen, Miss. Laina                                 |
|Futrelle, Mrs. Jacques Heath (Lily May Peel)           |
|Allen, Mr. William Henry                               |
|Moran, Mr. James                                       |
|McCarthy, Mr. Timothy J                                |
|Palsson, Master. Gosta Leonard                         |
|Johnson, Mrs. Oscar W (Elisabeth Vilhelmina Berg)      |
|Nasser, Mrs. Nicholas (Adele Achem)                    |
|Sandstrom, Miss. Marguerite Rut                        |
|Bonnell, Miss. Elizabeth                               |
|Saundercock, Mr. William Henry                         |
|Andersson, Mr. Anders Johan                            |
|Vestrom, Miss

**Run this code:**

In [272]:
import pyspark.sql.functions as F
# Raw string so that "\." reaches the regex engine as an escaped period.
combined = df.withColumn('Title',F.regexp_extract(F.col("Name"),r"([A-Za-z]+)\.",1))
combined.createOrReplaceTempView('combined')
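
To see what the pattern `([A-Za-z]+)\.` captures, here is a plain-Python sketch using the standard `re` module. It only approximates `regexp_extract` for this simple pattern (Spark uses Java regular expressions); the helper name `extract_title` is hypothetical, and the sample names come from the output above.

```python
import re

TITLE_PATTERN = re.compile(r"([A-Za-z]+)\.")


def extract_title(name):
    """Return the first run of letters that is followed by a period, or "" if none."""
    match = TITLE_PATTERN.search(name)
    return match.group(1) if match else ""


print(extract_title("Braund, Mr. Owen Harris"))
print(extract_title("Palsson, Master. Gosta Leonard"))
```

Returning an empty string when nothing matches mirrors the documented behavior of `regexp_extract`.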

**Display "Title" column and count "Title" column:**

In [273]:
combined.show()

+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|          Ticket|   Fare|Cabin|Embarked| Title|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+------+
|          1|       0|     3|Braund, Mr. Owen ...|  male|22.0|    1|    0|       A/5 21171|   7.25| null|       S|    Mr|
|          2|       1|     1|Cumings, Mrs. Joh...|female|38.0|    1|    0|        PC 17599|71.2833|  C85|       C|   Mrs|
|          3|       1|     3|Heikkinen, Miss. ...|female|26.0|    0|    0|STON/O2. 3101282|  7.925| null|       S|  Miss|
|          4|       1|     1|Futrelle, Mrs. Ja...|female|35.0|    1|    0|          113803|   53.1| C123|       S|   Mrs|
|          5|       0|     3|Allen, Mr. Willia...|  male|35.0|    0|    0|          373450|   8.05| null|       S|    Mr|
|          6|       0|  

In [274]:
spark.sql("SELECT Title FROM combined").show()

+------+
| Title|
+------+
|    Mr|
|   Mrs|
|  Miss|
|   Mrs|
|    Mr|
|    Mr|
|    Mr|
|Master|
|   Mrs|
|   Mrs|
|  Miss|
|  Miss|
|    Mr|
|    Mr|
|  Miss|
|   Mrs|
|Master|
|    Mr|
|   Mrs|
|   Mrs|
+------+
only showing top 20 rows



In [275]:
spark.sql("SELECT Title, COUNT(Title) FROM combined GROUP BY Title").show()

+--------+------------+
|   Title|count(Title)|
+--------+------------+
|     Don|           1|
|    Miss|         257|
|Countess|           2|
|     Col|           4|
|     Rev|           9|
|    Lady|           2|
|  Master|          56|
|     Mme|           1|
|    Capt|           2|
|      Mr|         786|
|      Dr|          11|
|     Mrs|         186|
|     Sir|           2|
|Jonkheer|           2|
|    Mlle|           4|
|   Major|           3|
|      Ms|           1|
+--------+------------+



**We can see that Dr, Rev, Major, Col, Mlle, Capt, Don, Jonkheer, Countess, Ms, Sir, Lady, and Mme are really rare titles, so create Dictionary and set the value to "rare".**

In [276]:
titles_map = {"rare": ["Dr", "Rev", "Major", "Col", "Mlle", "Capt", "Don", "Jonkheer", "Countess", "Ms", "Sir", "Lady", "Mme"]}
titles_list = ["Dr", "Rev", "Major", "Col", "Mlle", "Capt", "Don", "Jonkheer", "Countess", "Ms", "Sir", "Lady", "Mme"]

**Run the function:**

In [111]:
def impute_title(title):
    # titles_map is your dictionary; replace this name with your own dictionary's name.
    # This lookup expects a dict keyed by title (title -> replacement).
    return titles_map[title]

In [277]:
@udf(returnType=StringType())
def impute_title(title):
  # Collapse every rare title into the single category "rare"; keep the rest unchanged.
  return "rare" if title in titles_list else title
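
Because `titles_map` is keyed by category (`"rare"`) rather than by title, a direct `titles_map[title]` lookup cannot find a title such as `"Dr"`. One option, sketched here in plain Python, is to invert the dictionary once into a title-to-category lookup. The names `title_to_category` and `impute_title_plain` are hypothetical.

```python
titles_map = {"rare": ["Dr", "Rev", "Major", "Col", "Mlle", "Capt", "Don",
                       "Jonkheer", "Countess", "Ms", "Sir", "Lady", "Mme"]}

# Invert {category: [titles]} into {title: category} for direct lookups.
title_to_category = {
    title: category
    for category, titles in titles_map.items()
    for title in titles
}


def impute_title_plain(title):
    """Map rare titles to their category; leave every other title unchanged."""
    return title_to_category.get(title, title)


print(impute_title_plain("Dr"), impute_title_plain("Mr"))
```

With this shape, a single dictionary can replace both `titles_map` and `titles_list`.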

**Apply the function on "Title" column using UDF:**

In [278]:
combined.show(5)

+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+-----+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|          Ticket|   Fare|Cabin|Embarked|Title|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+-----+
|          1|       0|     3|Braund, Mr. Owen ...|  male|22.0|    1|    0|       A/5 21171|   7.25| null|       S|   Mr|
|          2|       1|     1|Cumings, Mrs. Joh...|female|38.0|    1|    0|        PC 17599|71.2833|  C85|       C|  Mrs|
|          3|       1|     3|Heikkinen, Miss. ...|female|26.0|    0|    0|STON/O2. 3101282|  7.925| null|       S| Miss|
|          4|       1|     1|Futrelle, Mrs. Ja...|female|35.0|    1|    0|          113803|   53.1| C123|       S|  Mrs|
|          5|       0|     3|Allen, Mr. Willia...|  male|35.0|    0|    0|          373450|   8.05| null|       S|   Mr|
+-----------+--------+------+---

In [279]:
new_combined = combined.withColumn("NewTitle", impute_title(F.col("Title")))
new_combined.show()

+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+------+--------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|          Ticket|   Fare|Cabin|Embarked| Title|NewTitle|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+------+--------+
|          1|       0|     3|Braund, Mr. Owen ...|  male|22.0|    1|    0|       A/5 21171|   7.25| null|       S|    Mr|      Mr|
|          2|       1|     1|Cumings, Mrs. Joh...|female|38.0|    1|    0|        PC 17599|71.2833|  C85|       C|   Mrs|     Mrs|
|          3|       1|     3|Heikkinen, Miss. ...|female|26.0|    0|    0|STON/O2. 3101282|  7.925| null|       S|  Miss|    Miss|
|          4|       1|     1|Futrelle, Mrs. Ja...|female|35.0|    1|    0|          113803|   53.1| C123|       S|   Mrs|     Mrs|
|          5|       0|     3|Allen, Mr. Willia...|  male|35.0|    0|    0|         

In [323]:
combined.dtypes

[('PassengerId', 'int'),
 ('Survived', 'int'),
 ('Pclass', 'int'),
 ('Name', 'string'),
 ('Sex', 'string'),
 ('Age', 'double'),
 ('SibSp', 'int'),
 ('Parch', 'int'),
 ('Ticket', 'string'),
 ('Fare', 'double'),
 ('Cabin', 'string'),
 ('Embarked', 'string'),
 ('Title', 'string')]

**Display "Title" from table and group by "Title" column:**

In [280]:
new_combined.createOrReplaceTempView("new_combined")

In [281]:
spark.sql("SELECT NewTitle FROM new_combined").show()

+--------+
|NewTitle|
+--------+
|      Mr|
|     Mrs|
|    Miss|
|     Mrs|
|      Mr|
|      Mr|
|      Mr|
|  Master|
|     Mrs|
|     Mrs|
|    Miss|
|    Miss|
|      Mr|
|      Mr|
|    Miss|
|     Mrs|
|  Master|
|      Mr|
|     Mrs|
|     Mrs|
+--------+
only showing top 20 rows



In [282]:
spark.sql("SELECT NewTitle, COUNT(NewTitle) FROM new_combined GROUP BY NewTitle").show()

+--------+---------------+
|NewTitle|count(NewTitle)|
+--------+---------------+
|    rare|             44|
|    Miss|            257|
|  Master|             56|
|      Mr|            786|
|     Mrs|            186|
+--------+---------------+



## **Preprocessing Age**

**Based on the "age" column mean, you will fill in the missing age values:**

In [283]:
mean_age = new_combined.select(F.avg("Age"))
mean_age.show()

+------------------+
|          avg(Age)|
+------------------+
|30.079501879699244|
+------------------+



In [284]:
from pyspark.sql.functions import avg
mean_age = new_combined.select(avg('Age')).collect()[0][0]

**Fill missing with "age" mean:**

In [285]:
new_combined = new_combined.na.fill(value=mean_age,subset=["Age"])
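
Mean imputation computes the average over the observed values only (Spark's `avg` ignores nulls) and writes it into the gaps. A plain-Python sketch of that logic, with a hypothetical helper `fill_with_mean` and made-up ages:

```python
def fill_with_mean(values):
    """Replace None entries with the mean of the non-missing entries."""
    present = [v for v in values if v is not None]
    if not present:
        raise ValueError("cannot compute a mean with no observed values")
    mean = sum(present) / len(present)
    return [mean if v is None else v for v in values]


# Hypothetical ages; None marks a missing value.
ages = [22.0, 38.0, None, 35.0, None]
filled = fill_with_mean(ages)
print(filled)
```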

## **Preprocessing Embarked**

**Select "Embarked" column, count them, order by count Desc, and save in grouped_Embarked variable:**




In [286]:
grouped_Embarked = spark.sql("""SELECT Embarked,
                     COUNT(Embarked) AS count_embarked 
                     FROM new_combined 
                     GROUP BY (Embarked)
                     ORDER BY count_embarked DESC """)


**Show your "grouped_Embarked" variable:**

In [287]:
grouped_Embarked.show()

+--------+--------------+
|Embarked|count_embarked|
+--------+--------------+
|       S|           962|
|       C|           253|
|       Q|           111|
|    null|             0|
+--------+--------------+



**Get the max of grouped_Embarked:**

In [288]:
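# F.max is Spark's aggregate; a bare max() only works here because the wildcard import shadows Python's built-in.
max_embarked = grouped_Embarked.select(F.max('count_embarked')).collect()[0][0]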
max_embarked

962

In [289]:
grouped_Embarked.select(F.max("count_embarked")).show()

+-------------------+
|max(count_embarked)|
+-------------------+
|                962|
+-------------------+



**Fill missing values with 'S', the most frequent value in grouped_Embarked:**

In [290]:
type(new_combined)

pyspark.sql.dataframe.DataFrame

In [291]:
new_combined = new_combined.fillna(value="S",subset=["Embarked"])
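
Instead of hard-coding `"S"`, the most frequent value can be derived from the data. A plain-Python sketch of mode imputation, with a hypothetical helper `fill_with_mode` and made-up port values:

```python
from collections import Counter


def fill_with_mode(values):
    """Replace None entries with the most frequent non-missing value."""
    counts = Counter(v for v in values if v is not None)
    if not counts:
        raise ValueError("cannot compute a mode with no observed values")
    mode, _ = counts.most_common(1)[0]
    return [mode if v is None else v for v in values]


# Hypothetical embarkation ports; None marks a missing value.
ports = ["S", "C", None, "S", "Q", None, "S"]
filled_ports = fill_with_mode(ports)
print(filled_ports)
```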

In [292]:
new_combined.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in new_combined.columns]).show()

+-----------+--------+------+----+---+---+-----+-----+------+----+-----+--------+-----+--------+
|PassengerId|Survived|Pclass|Name|Sex|Age|SibSp|Parch|Ticket|Fare|Cabin|Embarked|Title|NewTitle|
+-----------+--------+------+----+---+---+-----+-----+------+----+-----+--------+-----+--------+
|          0|       0|     0|   0|  0|  0|    0|    0|     0|   0| 1021|       0|    0|       0|
+-----------+--------+------+----+---+---+-----+-----+------+----+-----+--------+-----+--------+



## **Preprocessing Cabin**

**Replace the "Cabin" column with the first character of each string:**



In [293]:
new_combined.show(5)

+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+-----+--------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|          Ticket|   Fare|Cabin|Embarked|Title|NewTitle|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+-----+--------+
|          1|       0|     3|Braund, Mr. Owen ...|  male|22.0|    1|    0|       A/5 21171|   7.25| null|       S|   Mr|      Mr|
|          2|       1|     1|Cumings, Mrs. Joh...|female|38.0|    1|    0|        PC 17599|71.2833|  C85|       C|  Mrs|     Mrs|
|          3|       1|     3|Heikkinen, Miss. ...|female|26.0|    0|    0|STON/O2. 3101282|  7.925| null|       S| Miss|    Miss|
|          4|       1|     1|Futrelle, Mrs. Ja...|female|35.0|    1|    0|          113803|   53.1| C123|       S|  Mrs|     Mrs|
|          5|       0|     3|Allen, Mr. Willia...|  male|35.0|    0|    0|          373450

In [294]:
new_combined2 = new_combined.withColumn('Cabin', substring('Cabin', 1, 1))

**Show the result:**

In [295]:
new_combined2.show()

+-----------+--------+------+--------------------+------+------------------+-----+-----+----------------+-------+-----+--------+------+--------+
|PassengerId|Survived|Pclass|                Name|   Sex|               Age|SibSp|Parch|          Ticket|   Fare|Cabin|Embarked| Title|NewTitle|
+-----------+--------+------+--------------------+------+------------------+-----+-----+----------------+-------+-----+--------+------+--------+
|          1|       0|     3|Braund, Mr. Owen ...|  male|              22.0|    1|    0|       A/5 21171|   7.25| null|       S|    Mr|      Mr|
|          2|       1|     1|Cumings, Mrs. Joh...|female|              38.0|    1|    0|        PC 17599|71.2833|    C|       C|   Mrs|     Mrs|
|          3|       1|     3|Heikkinen, Miss. ...|female|              26.0|    0|    0|STON/O2. 3101282|  7.925| null|       S|  Miss|    Miss|
|          4|       1|     1|Futrelle, Mrs. Ja...|female|              35.0|    1|    0|          113803|   53.1|    C|       S|  

**Create the temporary view:**

In [296]:
new_combined2.createOrReplaceTempView("new_combined2")

**Select "Cabin" column, count "Cabin" column, Group by "Cabin" column, Order By count DESC**  

In [298]:
spark.sql("""SELECT Cabin, COUNT(Cabin)
              FROM new_combined2
              GROUP BY(Cabin)
              ORDER BY COUNT(Cabin)
              DESC""").show()

+-----+------------+
|Cabin|count(Cabin)|
+-----+------------+
|    C|          82|
|    B|          77|
|    D|          52|
|    E|          51|
|    A|          23|
|    F|          18|
|    G|           4|
|    T|           1|
| null|           0|
+-----+------------+



**Fill missing values with "U":**

In [299]:
new_combined2 = new_combined2.fillna(value="U",subset=["Cabin"])

In [321]:
new_combined2.dtypes

[('PassengerId', 'int'),
 ('Survived', 'int'),
 ('Pclass', 'int'),
 ('Name', 'string'),
 ('Sex', 'string'),
 ('Age', 'double'),
 ('SibSp', 'int'),
 ('Parch', 'int'),
 ('Ticket', 'string'),
 ('Fare', 'double'),
 ('Cabin', 'string'),
 ('Embarked', 'string'),
 ('Title', 'string'),
 ('NewTitle', 'string')]

**StringIndexer: A label indexer that maps a string column of labels to an ML column of label indices. If the input column is numeric, we cast it to string and index the string values. The indices are in [0, numLabels). By default, this is ordered by label frequencies so the most frequent label gets index 0. The ordering behavior is controlled by setting stringOrderType. Its default value is ‘frequencyDesc’.**

**StringIndexer(inputCol=None, outputCol=None)**
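
The default `frequencyDesc` ordering can be illustrated in plain Python. This is only a sketch of the ordering rule, not Spark's implementation; the helper name `frequency_desc_index` is hypothetical, and the alphabetical tie-break is an assumption of this sketch.

```python
from collections import Counter


def frequency_desc_index(labels):
    """Assign index 0.0 to the most frequent label, 1.0 to the next, and so on."""
    counts = Counter(labels)
    # Ties are broken alphabetically here; treat that as an assumption of this sketch.
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: float(i) for i, label in enumerate(ordered)}


mapping = frequency_desc_index(["S", "C", "S", "Q", "S", "C"])
print(mapping)
```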

**OneHotEncoder(inputCols=None, outputCols=None)**

A one-hot encoder that maps a column of category indices to a column of binary vectors, with at most a single one-value per row that indicates the input category index. For example with 5 categories, an input value of 2.0 would map to an output vector of [0.0, 0.0, 1.0, 0.0]. The last category is not included by default (configurable via dropLast), because it makes the vector entries sum up to one, and hence linearly dependent. So an input value of 4.0 maps to [0.0, 0.0, 0.0, 0.0].
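
The `dropLast` behavior described above can be checked with a short plain-Python sketch. It returns dense lists for readability, whereas Spark produces sparse vectors; the function name `one_hot` is hypothetical.

```python
def one_hot(index, num_categories, drop_last=True):
    """Encode a category index as a dense list of 0.0/1.0 values."""
    if not 0 <= index < num_categories:
        raise ValueError("index out of range")
    # With drop_last, the last category is represented by the all-zeros vector.
    size = num_categories - 1 if drop_last else num_categories
    vector = [0.0] * size
    if index < size:
        vector[index] = 1.0
    return vector


print(one_hot(2, 5))
print(one_hot(4, 5))
```

Dropping the last category removes the redundancy between the columns while still giving every category a distinct encoding.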

**VectorAssembler: VectorAssembler(*, inputCols=None, outputCol=None). A feature transformer that merges multiple columns into a vector column.**



**Use the randomSplit function to split the data into trainDF and testDF with 80% and 20% of the rows, respectively:**

In [300]:
trainDF, testDF = new_combined2.randomSplit([.8,.2], seed=42)
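
`randomSplit` assigns rows at random according to the weights, so the resulting sizes are only approximately 80% and 20%. The plain-Python sketch below illustrates the idea of weight-proportional assignment; it is not Spark's implementation, will not reproduce Spark's split for the same seed, and the helper name `random_split` is hypothetical.

```python
import random


def random_split(rows, weights, seed=None):
    """Assign each row independently to a split, with probability proportional to its weight."""
    rng = random.Random(seed)
    total = sum(weights)
    # Cumulative boundaries in (0, 1], e.g. [0.8, 1.0] for weights [0.8, 0.2].
    bounds, running = [], 0.0
    for w in weights:
        running += w / total
        bounds.append(running)
    splits = [[] for _ in weights]
    for row in rows:
        draw = rng.random()
        for i, bound in enumerate(bounds):
            # The last split also catches any floating-point rounding at the upper edge.
            if draw < bound or i == len(bounds) - 1:
                splits[i].append(row)
                break
    return splits


train_part, test_part = random_split(list(range(1000)), [0.8, 0.2], seed=42)
print(len(train_part), len(test_part))
```

Fixing the seed makes the split repeatable, which matters when comparing models across runs.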

In [301]:
trainDF.dtypes

[('PassengerId', 'int'),
 ('Survived', 'int'),
 ('Pclass', 'int'),
 ('Name', 'string'),
 ('Sex', 'string'),
 ('Age', 'double'),
 ('SibSp', 'int'),
 ('Parch', 'int'),
 ('Ticket', 'string'),
 ('Fare', 'double'),
 ('Cabin', 'string'),
 ('Embarked', 'string'),
 ('Title', 'string'),
 ('NewTitle', 'string')]

In [302]:
categoricalCols = ['Sex','Cabin','Embarked','NewTitle']
indexOutputCols = [x + "_Index" for x in categoricalCols]
indexOutputCols

['Sex_Index', 'Cabin_Index', 'Embarked_Index', 'NewTitle_Index']

In [303]:
oheOutputCols = [x + "_OHE" for x in categoricalCols]
oheOutputCols

['Sex_OHE', 'Cabin_OHE', 'Embarked_OHE', 'NewTitle_OHE']

In [306]:
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler

stringIndexer = StringIndexer(inputCols=categoricalCols,
                             outputCols=indexOutputCols,
                             handleInvalid='skip')
oheEncoder = OneHotEncoder(inputCols=indexOutputCols,
                          outputCols=oheOutputCols)

In [308]:
numericCols = ['Pclass','Parch','SibSp','Age']

In [309]:
assemblerInputs = oheOutputCols + numericCols
assemblerInputs

['Sex_OHE',
 'Cabin_OHE',
 'Embarked_OHE',
 'NewTitle_OHE',
 'Pclass',
 'Parch',
 'SibSp',
 'Age']

In [307]:
vecAssembler = VectorAssembler(inputCols=assemblerInputs,outputCol='features')

In [322]:
train_df.dtypes

[('PassengerId', 'int'),
 ('Survived', 'int'),
 ('Pclass', 'int'),
 ('Name', 'string'),
 ('Sex', 'string'),
 ('Age', 'double'),
 ('SibSp', 'int'),
 ('Parch', 'int'),
 ('Ticket', 'string'),
 ('Fare', 'double'),
 ('Cabin', 'string'),
 ('Embarked', 'string')]

**Pipeline: ML Pipelines provide a uniform set of high-level APIs built on top of DataFrames that help users create and tune practical machine learning pipelines.**

**Build RandomForestClassifier model and use pipeline to fit and transform then display "prediction, Survived, features" columns**

In [310]:
from pyspark.ml.classification import RandomForestClassifier
RC = RandomForestClassifier(labelCol='Survived',featuresCol='features')

In [311]:
from pyspark.ml import Pipeline
pipeline =Pipeline(stages = [stringIndexer,oheEncoder,vecAssembler,RC])

In [312]:
pipelineModel = pipeline.fit(trainDF)

In [313]:
predDF = pipelineModel.transform(testDF)

**Use MulticlassClassificationEvaluator and set the "labelCol" to "Survived",  "predictionCol" to "prediction", "metricName" to "accuracy"** 

In [317]:
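from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Note: metricName='areaUnderROC' yields the area under the ROC curve, not accuracy,
# and it is computed here from the hard 0/1 'prediction' column rather than 'rawPrediction'.
accuracy = BinaryClassificationEvaluator(rawPredictionCol='prediction',
                                         labelCol='Survived',
                                         metricName='areaUnderROC').evaluate(predDF)
print(f"Accuracy is {accuracy}")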

Accuracy is 1.0


In [320]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

accuracy = MulticlassClassificationEvaluator(predictionCol='prediction',
                                             labelCol='Survived',
                                             metricName='accuracy').evaluate(predDF)
print(f"Accuracy is {accuracy}")

Exception ignored in: <function JavaWrapper.__del__ at 0x7f6b1e0d7f80>
Traceback (most recent call last):
  File "/usr/local/lib/python3.7/dist-packages/pyspark/ml/wrapper.py", line 39, in __del__
    if SparkContext._active_spark_context and self._java_obj is not None:
AttributeError: 'MulticlassClassificationEvaluator' object has no attribute '_java_obj'


Accuracy is 1.0
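
For reference, the `accuracy` metric is the fraction of rows whose prediction equals the label. A plain-Python sketch of that definition, using hypothetical labels and predictions rather than the notebook's results:

```python
def accuracy(labels, predictions):
    """Fraction of rows where the prediction equals the label."""
    if len(labels) != len(predictions) or not labels:
        raise ValueError("labels and predictions must be non-empty and equal in length")
    correct = sum(1 for y, p in zip(labels, predictions) if y == p)
    return correct / len(labels)


# Hypothetical labels and predictions, not taken from the notebook's results.
y_true = [1, 0, 1, 1, 0]
y_pred = [1, 0, 0, 1, 0]
score = accuracy(y_true, y_pred)
print(score)
```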


**When you are finished, send the project via Google Classroom.**
**Please let me know if you have any questions.**
* nabieh.mostafa@yahoo.com
* +201015197566 (Whatsapp)

**Don't hate me; I push you to learn.**

**I will help you to become an awesome data engineer.**

**Why did I say "Data Engineer"?**

**It's a tricky but optional question. If you would like to know the answer, ask me.**
