**Data about passengers:**
*   Name
*   Age
*   Gender


## Install and Import Libraries
Let's install PySpark:

In [None]:
# 1. Install the dependencies in the Colab environment: Java 8, Apache Spark 3.0.1 (built for Hadoop 2.7), and findspark to locate Spark on the system
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://archive.apache.org/dist/spark/spark-3.0.1/spark-3.0.1-bin-hadoop2.7.tgz
!tar xf spark-3.0.1-bin-hadoop2.7.tgz
!pip install -q findspark

# 2. Setup Environment Variables
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.0.1-bin-hadoop2.7"

## Build Spark Session

In [None]:
import findspark
findspark.init()

In [None]:
import pyspark

In [None]:
from pyspark.sql import SparkSession

In [None]:
spark = SparkSession.builder.appName('Ml').getOrCreate()

## Data Loading


You have two datasets to read:
* Train
* Test



In [None]:
train_filepath = 'train.csv'
test_filepath = 'test.csv'

In [None]:
train_df = spark.read.csv(train_filepath,header=True,inferSchema=True)
train_df.printSchema()

root
 |-- PassengerId: integer (nullable = true)
 |-- Survived: integer (nullable = true)
 |-- Pclass: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- SibSp: integer (nullable = true)
 |-- Parch: integer (nullable = true)
 |-- Ticket: string (nullable = true)
 |-- Fare: double (nullable = true)
 |-- Cabin: string (nullable = true)
 |-- Embarked: string (nullable = true)



In [None]:
train_df.show(5)

+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|          Ticket|   Fare|Cabin|Embarked|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|          1|       0|     3|Braund, Mr. Owen ...|  male|22.0|    1|    0|       A/5 21171|   7.25| null|       S|
|          2|       1|     1|Cumings, Mrs. Joh...|female|38.0|    1|    0|        PC 17599|71.2833|  C85|       C|
|          3|       1|     3|Heikkinen, Miss. ...|female|26.0|    0|    0|STON/O2. 3101282|  7.925| null|       S|
|          4|       1|     1|Futrelle, Mrs. Ja...|female|35.0|    1|    0|          113803|   53.1| C123|       S|
|          5|       0|     3|Allen, Mr. Willia...|  male|35.0|    0|    0|          373450|   8.05| null|       S|
+-----------+--------+------+--------------------+------+----+-----+-----+------

In [None]:
test_df = spark.read.csv(test_filepath,header=True,inferSchema=True)
test_df.printSchema()

root
 |-- PassengerId: integer (nullable = true)
 |-- Survived: integer (nullable = true)
 |-- Pclass: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- SibSp: integer (nullable = true)
 |-- Parch: integer (nullable = true)
 |-- Ticket: string (nullable = true)
 |-- Fare: double (nullable = true)
 |-- Cabin: string (nullable = true)
 |-- Embarked: string (nullable = true)



In [None]:
test_df.show(5)

+-----------+--------+------+--------------------+------+----+-----+-----+--------+-------+-----+--------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|  Ticket|   Fare|Cabin|Embarked|
+-----------+--------+------+--------------------+------+----+-----+-----+--------+-------+-----+--------+
|          1|       1|     1|Goldenberg, Mr. S...|  male|49.0|    1|    0|   17453|89.1042|  C92|       C|
|          2|       0|     3| Peduzzi, Mr. Joseph|  male|null|    0|    0|A/5 2817|   8.05| null|       S|
|          3|       1|     3|  Jalsevac, Mr. Ivan|  male|29.0|    0|    0|  349240| 7.8958| null|       C|
|          4|       0|     1|Millet, Mr. Franc...|  male|65.0|    0|    0|   13509|  26.55|  E38|       S|
|          5|       1|     1|Kenyon, Mrs. Fred...|female|null|    1|    0|   17464|51.8625|  D21|       S|
+-----------+--------+------+--------------------+------+----+-----+-----+--------+-------+-----+--------+
only showing top 5 rows



Let's work with the train dataset:

**Confirm if this is a dataframe or not:**

In [None]:
type(train_df)

pyspark.sql.dataframe.DataFrame

**Show 5 rows.**

In [None]:
train_df.show(5)

+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|          Ticket|   Fare|Cabin|Embarked|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|          1|       0|     3|Braund, Mr. Owen ...|  male|22.0|    1|    0|       A/5 21171|   7.25| null|       S|
|          2|       1|     1|Cumings, Mrs. Joh...|female|38.0|    1|    0|        PC 17599|71.2833|  C85|       C|
|          3|       1|     3|Heikkinen, Miss. ...|female|26.0|    0|    0|STON/O2. 3101282|  7.925| null|       S|
|          4|       1|     1|Futrelle, Mrs. Ja...|female|35.0|    1|    0|          113803|   53.1| C123|       S|
|          5|       0|     3|Allen, Mr. Willia...|  male|35.0|    0|    0|          373450|   8.05| null|       S|
+-----------+--------+------+--------------------+------+----+-----+-----+------

**Display schema for the dataset:**

In [None]:
train_df.printSchema()

root
 |-- PassengerId: integer (nullable = true)
 |-- Survived: integer (nullable = true)
 |-- Pclass: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- SibSp: integer (nullable = true)
 |-- Parch: integer (nullable = true)
 |-- Ticket: string (nullable = true)
 |-- Fare: double (nullable = true)
 |-- Cabin: string (nullable = true)
 |-- Embarked: string (nullable = true)



**Statistical summary:**

In [None]:
train_df.describe(train_df.columns).show(truncate = False)

+-------+-----------------+-------------------+------------------+------------------------------------------------+------+------------------+------------------+-------------------+------------------+-----------------+-----+--------+
|summary|PassengerId      |Survived           |Pclass            |Name                                            |Sex   |Age               |SibSp             |Parch              |Ticket            |Fare             |Cabin|Embarked|
+-------+-----------------+-------------------+------------------+------------------------------------------------+------+------------------+------------------+-------------------+------------------+-----------------+-----+--------+
|count  |891              |891                |891               |891                                             |891   |714               |891               |891                |891               |891              |204  |889     |
|mean   |446.0            |0.3838383838383838 |2.308641975308642 |nu

## EDA - Exploratory Data Analysis

**Display count for the train dataset:**

In [None]:
train_df.count()

891

**Can you answer this question:** 

**How many people survived, and how many didn't survive?** 

**Please save data in a variable.**

In [None]:
survived_df = train_df.groupby("Survived").count()

**Display your result:**

In [None]:
survived_df.show()

+--------+-----+
|Survived|count|
+--------+-----+
|       1|  342|
|       0|  549|
+--------+-----+



**Can you display your answer in ratio form? (Hint: Use a UDF.)**






In [None]:
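import pyspark.sql.functions as f
# Sum the per-group counts to get the total number of passengers
total = survived_df.agg(f.sum("count")).collect()[0][0]
# show() returns None, so keep the DataFrame in `result` and display it separately
result = survived_df.withColumn('percent', survived_df['count'] / total)
result.show()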

+--------+-----+------------------+
|Survived|count|           percent|
+--------+-----+------------------+
|       1|  342|0.3838383838383838|
|       0|  549|0.6161616161616161|
+--------+-----+------------------+



In [None]:
total = train_df.select('survived').count()
total

891

In [None]:
import pyspark.sql.functions as F     

survived_df.agg(F.sum("count")).collect()[0][0]

891

**Another method:**

In [None]:
survived_df.withColumn('ratio',survived_df['count'] / total).show()

+--------+-----+------------------+
|Survived|count|             ratio|
+--------+-----+------------------+
|       1|  342|0.3838383838383838|
|       0|  549|0.6161616161616161|
+--------+-----+------------------+



In [None]:
from pyspark.sql.types import FloatType, IntegerType, StringType
from pyspark.sql.functions import udf,col

In [None]:
def ratio(number):
    return number/total

**Using a UDF:**

In [None]:
ratio_udf = udf(lambda z: ratio(z),FloatType())

In [None]:
survived_df.withColumn("ratio", ratio_udf(col("count"))).show(truncate=False)

+--------+-----+----------+
|Survived|count|ratio     |
+--------+-----+----------+
|1       |342  |0.3838384 |
|0       |549  |0.61616164|
+--------+-----+----------+
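
The UDF output shows fewer digits than the earlier ratio tables because `FloatType` is a 32-bit float, whereas plain column division produces a 64-bit double. The plain-Python sketch below (no Spark required) reproduces that rounding; `to_float32` is a helper name introduced here for illustration only.

```python
import struct

def to_float32(x: float) -> float:
    """Round a 64-bit Python float to the nearest 32-bit float."""
    return struct.unpack("f", struct.pack("f", x))[0]

survived, total = 342, 891       # counts taken from the tables above
ratio64 = survived / total       # double precision, as with column division
ratio32 = to_float32(ratio64)    # what a FloatType result can actually hold

print(ratio64)  # full double precision
print(ratio32)  # only about 7 significant digits are meaningful
```

Declaring the UDF with `DoubleType()` instead of `FloatType()` would likely keep the full precision.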



**Can you get the number of males and females?**


In [None]:
count_df = train_df.groupby("Sex").count()
count_df.show()

+------+-----+
|   Sex|count|
+------+-----+
|female|  314|
|  male|  577|
+------+-----+



In [None]:
count_df.where(count_df.Sex == 'female').select('count').show()

+-----+
|count|
+-----+
|  314|
+-----+



In [None]:
total_female = train_df.select('Sex').where(train_df.Sex == 'female').count()
total_female

314

In [None]:
total_male = train_df.select('Sex').where(train_df.Sex == 'male').count()
total_male

577

**1. What is the survival rate for each gender?**

**2. How many passengers of each gender survived?**

(Hint: Group by the "Sex" column.)

In [None]:
survivors = train_df.groupby(["Sex","Survived"]).count()
survivors.show()

+------+--------+-----+
|   Sex|Survived|count|
+------+--------+-----+
|  male|       0|  468|
|female|       1|  233|
|female|       0|   81|
|  male|       1|  109|
+------+--------+-----+



In [None]:
survivors_df = survivors.select("Sex","Survived","count").where(survivors.Survived == 1)
survivors_df.show()

+------+--------+-----+
|   Sex|Survived|count|
+------+--------+-----+
|female|       1|  233|
|  male|       1|  109|
+------+--------+-----+



In [None]:
from pyspark.sql.functions import when
from pyspark.sql.functions import col
# show() returns None, so build the DataFrame first and display it afterwards
survivors_avg = survivors_df.withColumn("avg", when(col("Sex") == "female", survivors_df['count'] / total_female)
      .when(col("Sex") == "male", survivors_df['count'] / total_male))
survivors_avg.show()

+------+--------+-----+-------------------+
|   Sex|Survived|count|                avg|
+------+--------+-----+-------------------+
|female|       1|  233| 0.7420382165605095|
|  male|       1|  109|0.18890814558058924|
+------+--------+-----+-------------------+



**Create temporary view PySpark:**

In [None]:
train_df.createOrReplaceTempView('train_df_view')

**How many people survived, and how many didn't survive? By SQL:**

In [None]:
spark.sql('SELECT Survived,COUNT(Survived) AS Count FROM train_df_view GROUP BY Survived').show()

+--------+-----+
|Survived|Count|
+--------+-----+
|       1|  342|
|       0|  549|
+--------+-----+



**Can you display the number of survivors from each gender as a ratio?**

(Hint: Group by "sex" column.)

**Can you do this via SQL?**

In [None]:
spark.sql('SELECT Sex,round(SUM(Survived) / COUNT(Survived),2) AS Gender_ratio FROM train_df_view GROUP BY Sex').show() 

+------+------------+
|   Sex|Gender_ratio|
+------+------------+
|female|        0.74|
|  male|        0.19|
+------+------------+
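
Because `Survived` contains only 0 and 1, `SUM(Survived) / COUNT(Survived)` is simply survivors divided by passengers within each group. A small plain-Python check, using the counts shown in the earlier outputs (the dictionary names are illustrative):

```python
# Counts copied from the groupby outputs earlier in the notebook.
passengers = {"female": 314, "male": 577}
survivors = {"female": 233, "male": 109}

# Survival rate per gender = survivors / passengers.
survival_rate = {sex: survivors[sex] / passengers[sex] for sex in passengers}

for sex, rate in survival_rate.items():
    print(sex, round(rate, 2))
```

The rounded values agree with the SQL result above.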



**Display the survival ratio for each passenger class (Pclass):**

In [None]:
spark.sql('SELECT Pclass,round(SUM(Survived) / COUNT(Survived),2) AS Pclass_ratio FROM train_df_view GROUP BY Pclass').show()

+------+------------+
|Pclass|Pclass_ratio|
+------+------------+
|     1|        0.63|
|     3|        0.24|
|     2|        0.47|
+------+------------+



**Let's take a break and continue after this.**

## Data Cleaning

**First and foremost, we must merge both the train and test datasets. (Hint: The union function can do this.)**



In [None]:
all_data = train_df.union(test_df)
all_data.show(5)

+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|          Ticket|   Fare|Cabin|Embarked|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|          1|       0|     3|Braund, Mr. Owen ...|  male|22.0|    1|    0|       A/5 21171|   7.25| null|       S|
|          2|       1|     1|Cumings, Mrs. Joh...|female|38.0|    1|    0|        PC 17599|71.2833|  C85|       C|
|          3|       1|     3|Heikkinen, Miss. ...|female|26.0|    0|    0|STON/O2. 3101282|  7.925| null|       S|
|          4|       1|     1|Futrelle, Mrs. Ja...|female|35.0|    1|    0|          113803|   53.1| C123|       S|
|          5|       0|     3|Allen, Mr. Willia...|  male|35.0|    0|    0|          373450|   8.05| null|       S|
+-----------+--------+------+--------------------+------+----+-----+-----+------

**Display count:**

In [None]:
all_data.count()

1329

**Temporary view PySpark:**

In [None]:
all_data.createOrReplaceTempView('all_df_view')

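**Can you find the number of null values in each column?**

In [None]:
# Explicit imports avoid shadowing Python built-ins such as sum and round
from pyspark.sql.functions import col, count, isnull, regexp_extract, when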
all_data.select([count(when(isnull(c), c)).alias(c) for c in all_data.columns]).show()

+-----------+--------+------+----+---+---+-----+-----+------+----+-----+--------+
|PassengerId|Survived|Pclass|Name|Sex|Age|SibSp|Parch|Ticket|Fare|Cabin|Embarked|
+-----------+--------+------+----+---+---+-----+-----+------+----+-----+--------+
|          0|       0|     0|   0|  0|265|    0|    0|     0|   0| 1021|       3|
+-----------+--------+------+----+---+---+-----+-----+------+----+-----+--------+



**Create Dataframe for null values**

1. Column
2. Number of missing values.

In [None]:
new_df = all_data.select([count(when(isnull(c), c)).alias(c) for c in all_data.columns])
new_df.show()

+-----------+--------+------+----+---+---+-----+-----+------+----+-----+--------+
|PassengerId|Survived|Pclass|Name|Sex|Age|SibSp|Parch|Ticket|Fare|Cabin|Embarked|
+-----------+--------+------+----+---+---+-----+-----+------+----+-----+--------+
|          0|       0|     0|   0|  0|265|    0|    0|     0|   0| 1021|       3|
+-----------+--------+------+----+---+---+-----+-----+------+----+-----+--------+
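
The result above is still a single wide row. One way to reach the two-column layout (column name, number of missing values) is to reshape that row. The sketch below does this in plain Python with the counts hard-coded from the output; in the notebook the dictionary could instead come from `new_df.first().asDict()`, and `rows` could be passed to `spark.createDataFrame(rows, ["Column", "Missing"])`. The names `null_counts`, `rows`, "Column" and "Missing" are illustrative assumptions.

```python
# Null counts copied from the table above.
null_counts = {
    "PassengerId": 0, "Survived": 0, "Pclass": 0, "Name": 0, "Sex": 0,
    "Age": 265, "SibSp": 0, "Parch": 0, "Ticket": 0, "Fare": 0,
    "Cabin": 1021, "Embarked": 3,
}

# One (column, missing) pair per column, largest counts first.
rows = sorted(null_counts.items(), key=lambda kv: kv[1], reverse=True)

# Print only the columns that actually have missing values.
for column, missing in rows:
    if missing > 0:
        print(f"{column:<10}{missing:>6}")
```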



## Preprocessing 

**Can you show me the name column from your temporary table?**

In [None]:
spark.sql('SELECT Name FROM all_df_view').show(5,truncate = False)

+---------------------------------------------------+
|Name                                               |
+---------------------------------------------------+
|Braund, Mr. Owen Harris                            |
|Cumings, Mrs. John Bradley (Florence Briggs Thayer)|
|Heikkinen, Miss. Laina                             |
|Futrelle, Mrs. Jacques Heath (Lily May Peel)       |
|Allen, Mr. William Henry                           |
+---------------------------------------------------+
only showing top 5 rows



**Run this code:**

In [None]:
# Raw string so that "\." reaches the regex engine unchanged
combined = all_data.withColumn('Title',regexp_extract(col("Name"),r"([A-Za-z]+)\.",1))
combined.createOrReplaceTempView('combined_view')
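
To see what the pattern captures, here is a plain-Python sketch that applies the same regular expression with the `re` module to three names from the dataset. `extract_title` is a helper introduced for illustration; it returns an empty string when nothing matches, which mirrors what `regexp_extract` does in that case.

```python
import re

# Same pattern as in the cell above: a run of letters followed by a period.
TITLE_PATTERN = re.compile(r"([A-Za-z]+)\.")

def extract_title(name: str) -> str:
    """Return the first run of letters that is followed by a period, or ""."""
    match = TITLE_PATTERN.search(name)
    return match.group(1) if match else ""

names = [
    "Braund, Mr. Owen Harris",
    "Cumings, Mrs. John Bradley (Florence Briggs Thayer)",
    "Heikkinen, Miss. Laina",
]
titles = [extract_title(n) for n in names]
print(titles)  # ['Mr', 'Mrs', 'Miss']
```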

**Display each title and its count:**

In [None]:
spark.sql('SELECT Title,COUNT(Title) FROM combined_view GROUP BY Title').show(truncate = False)

+--------+------------+
|Title   |count(Title)|
+--------+------------+
|Don     |1           |
|Miss    |257         |
|Countess|2           |
|Col     |4           |
|Rev     |9           |
|Lady    |2           |
|Master  |56          |
|Mme     |1           |
|Capt    |2           |
|Mr      |786         |
|Dr      |11          |
|Mrs     |186         |
|Sir     |2           |
|Jonkheer|2           |
|Mlle    |4           |
|Major   |3           |
|Ms      |1           |
+--------+------------+



**We can see that Dr, Rev, Major, Col, Mlle, Capt, Don, Jonkheer, Countess, Ms, Sir, Lady, and Mme are rare titles, so create a dictionary that maps each of them to "rare".**

In [None]:
rare_list = ["Dr", "Rev", "Major", "Col", "Mlle", "Capt", "Don", "Jonkheer", "Countess", "Ms", "Sir", "Lady", "Mme"]

In [None]:
title_dictionary = {"Dr":"rare","Rev":"rare","Major":"rare","Col":"rare", "Mlle":"rare", "Capt":"rare", "Don":"rare", "Jonkheer":"rare", "Countess":"rare", "Ms":"rare", "Sir":"rare", "Lady":"rare", "Mme":"rare"}

**Run the function:**

In [None]:
def impute_title(title):
    # Look the title up in the dictionary; titles that are not rare stay unchanged
    return title_dictionary.get(title, title)

In [None]:
def impute_title(title):
    if title in rare_list:
        return "rare"
    else:
        return title

**Apply the function on "Title" column using UDF:**

In [None]:
udf_data = udf(lambda z: impute_title(z),StringType())

In [None]:
#udf_data = udf(impute_title)
title_df = combined.withColumn("Title",udf_data(combined['Title']))

**Display "Title" from table and group by "Title" column:**

In [None]:
title_df.groupBy("Title").count().show()

+------+-----+
| Title|count|
+------+-----+
|  rare|   44|
|  Miss|  257|
|Master|   56|
|    Mr|  786|
|   Mrs|  186|
+------+-----+
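
As a sanity check, the grouped counts can be reproduced in plain Python from the per-title counts shown earlier. This is only a sketch with the numbers hard-coded from that output; `title_counts` and `grouped` are illustrative names.

```python
from collections import Counter

rare_list = ["Dr", "Rev", "Major", "Col", "Mlle", "Capt", "Don",
             "Jonkheer", "Countess", "Ms", "Sir", "Lady", "Mme"]

# Per-title counts copied from the SQL output earlier in the notebook.
title_counts = {
    "Don": 1, "Miss": 257, "Countess": 2, "Col": 4, "Rev": 9, "Lady": 2,
    "Master": 56, "Mme": 1, "Capt": 2, "Mr": 786, "Dr": 11, "Mrs": 186,
    "Sir": 2, "Jonkheer": 2, "Mlle": 4, "Major": 3, "Ms": 1,
}

def impute_title(title):
    """Collapse rare titles into the single label "rare"."""
    return "rare" if title in rare_list else title

# Re-aggregate the counts under the collapsed labels.
grouped = Counter()
for title, n in title_counts.items():
    grouped[impute_title(title)] += n

print(dict(grouped))
```

The thirteen rare titles add up to 44 rows, and the five groups together cover all 1329 rows.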



In [None]:
combined.show(truncate = False)

+-----------+--------+------+-------------------------------------------------------+------+----+-----+-----+----------------+-------+-----+--------+------+
|PassengerId|Survived|Pclass|Name                                                   |Sex   |Age |SibSp|Parch|Ticket          |Fare   |Cabin|Embarked|Title |
+-----------+--------+------+-------------------------------------------------------+------+----+-----+-----+----------------+-------+-----+--------+------+
|1          |0       |3     |Braund, Mr. Owen Harris                                |male  |22.0|1    |0    |A/5 21171       |7.25   |null |S       |Mr    |
|2          |1       |1     |Cumings, Mrs. John Bradley (Florence Briggs Thayer)    |female|38.0|1    |0    |PC 17599        |71.2833|C85  |C       |Mrs   |
|3          |1       |3     |Heikkinen, Miss. Laina                                 |female|26.0|0    |0    |STON/O2. 3101282|7.925  |null |S       |Miss  |
|4          |1       |1     |Futrelle, Mrs. Jacques Heath 

## **Preprocessing Age**

**Compute the mean age, which will be used to fill in the missing age values:**

In [None]:
avg_age = combined.select(F.mean("Age")).collect()
avg_age[0][0]

30.079501879699244
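
Mean imputation itself is simple: average the known values (Spark's `mean` ignores nulls) and substitute that average wherever a value is missing. A plain-Python sketch on the five `Age` values from the `test_df` preview, with illustrative variable names:

```python
# Age values from the first five rows of test_df; None marks a missing value.
ages = [49.0, None, 29.0, 65.0, None]

# Average only the values that are present.
known = [a for a in ages if a is not None]
mean_age = sum(known) / len(known)

# Replace each missing value with that average.
filled = [mean_age if a is None else a for a in ages]

print(mean_age)
print(filled)
```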

**Fill missing age with age mean:**

In [None]:
df_filled = combined.na.fill(avg_age[0][0],subset = ['Age'])
df_filled.show(truncate= False)

+-----------+--------+------+-------------------------------------------------------+------+------------------+-----+-----+----------------+-------+-----+--------+------+
|PassengerId|Survived|Pclass|Name                                                   |Sex   |Age               |SibSp|Parch|Ticket          |Fare   |Cabin|Embarked|Title |
+-----------+--------+------+-------------------------------------------------------+------+------------------+-----+-----+----------------+-------+-----+--------+------+
|1          |0       |3     |Braund, Mr. Owen Harris                                |male  |22.0              |1    |0    |A/5 21171       |7.25   |null |S       |Mr    |
|2          |1       |1     |Cumings, Mrs. John Bradley (Florence Briggs Thayer)    |female|38.0              |1    |0    |PC 17599        |71.2833|C85  |C       |Mrs   |
|3          |1       |3     |Heikkinen, Miss. Laina                                 |female|26.0              |0    |0    |STON/O2. 3101282|7.925

## **Preprocessing Embarked**

**Select Embarked, count them, order by count Desc, and save in grouped_Embarked variable:**




In [None]:
grouped_Embarked = df_filled.groupBy("Embarked").count().orderBy('count',ascending = False)

**Show grouped_Embarked:**

In [None]:
grouped_Embarked.show()

+--------+-----+
|Embarked|count|
+--------+-----+
|       S|  962|
|       C|  253|
|       Q|  111|
|    null|    3|
+--------+-----+



**Get the mode (the most frequent value) of Embarked:**

In [None]:
mode = grouped_Embarked.orderBy("count",ascending = False).first()[0]
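
Taking the first row works here because "S" is far more frequent than the null group. The plain-Python sketch below shows the same idea while excluding nulls explicitly, using the counts from the table above; `embarked_counts`, `mode_value` and `mode_count` are illustrative names.

```python
from collections import Counter

# Counts copied from the grouped_Embarked output; None stands for null.
embarked_counts = {"S": 962, "C": 253, "Q": 111, None: 3}

# Ignore the null group so that it can never be chosen as the mode.
non_null = Counter({k: v for k, v in embarked_counts.items() if k is not None})
mode_value, mode_count = non_null.most_common(1)[0]

print(mode_value, mode_count)  # S 962
```

In Spark, a comparable guard would be to filter with `col("Embarked").isNotNull()` before calling `first()`.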

**Fill missing values with the mode:**

In [None]:
df_filled2 = df_filled.na.fill(mode,subset = ['Embarked'])
df_filled2.show()

+-----------+--------+------+--------------------+------+------------------+-----+-----+----------------+-------+-----+--------+------+
|PassengerId|Survived|Pclass|                Name|   Sex|               Age|SibSp|Parch|          Ticket|   Fare|Cabin|Embarked| Title|
+-----------+--------+------+--------------------+------+------------------+-----+-----+----------------+-------+-----+--------+------+
|          1|       0|     3|Braund, Mr. Owen ...|  male|              22.0|    1|    0|       A/5 21171|   7.25| null|       S|    Mr|
|          2|       1|     1|Cumings, Mrs. Joh...|female|              38.0|    1|    0|        PC 17599|71.2833|  C85|       C|   Mrs|
|          3|       1|     3|Heikkinen, Miss. ...|female|              26.0|    0|    0|STON/O2. 3101282|  7.925| null|       S|  Miss|
|          4|       1|     1|Futrelle, Mrs. Ja...|female|              35.0|    1|    0|          113803|   53.1| C123|       S|   Mrs|
|          5|       0|     3|Allen, Mr. Willia..

## **Preprocessing Cabin**

**Replace "cabin" column with first char from the string:**



In [None]:
from pyspark.sql.functions import substring
df_filled3 = df_filled2.withColumn(
  'Cabin', substring('Cabin', 1, 1))

**Show the result:**

In [None]:
df_filled3.show()

+-----------+--------+------+--------------------+------+------------------+-----+-----+----------------+-------+-----+--------+------+
|PassengerId|Survived|Pclass|                Name|   Sex|               Age|SibSp|Parch|          Ticket|   Fare|Cabin|Embarked| Title|
+-----------+--------+------+--------------------+------+------------------+-----+-----+----------------+-------+-----+--------+------+
|          1|       0|     3|Braund, Mr. Owen ...|  male|              22.0|    1|    0|       A/5 21171|   7.25| null|       S|    Mr|
|          2|       1|     1|Cumings, Mrs. Joh...|female|              38.0|    1|    0|        PC 17599|71.2833|    C|       C|   Mrs|
|          3|       1|     3|Heikkinen, Miss. ...|female|              26.0|    0|    0|STON/O2. 3101282|  7.925| null|       S|  Miss|
|          4|       1|     1|Futrelle, Mrs. Ja...|female|              35.0|    1|    0|          113803|   53.1|    C|       S|   Mrs|
|          5|       0|     3|Allen, Mr. Willia..

**Create the temporary view:**

In [None]:
df_filled3.createOrReplaceTempView("df_view")

**Select "Cabin" column, count Cabin column, Group by "Cabin" column, Order By count DESC**  

In [None]:
df_filled3.select("cabin").groupBy("cabin").count().orderBy("count",ascending = False).show()

+-----+-----+
|cabin|count|
+-----+-----+
| null| 1021|
|    C|   82|
|    B|   77|
|    D|   52|
|    E|   51|
|    A|   23|
|    F|   18|
|    G|    4|
|    T|    1|
+-----+-----+



In [None]:
spark.sql('''SELECT Cabin, count(*) AS count
             FROM df_view
             GROUP BY Cabin
             ORDER BY count DESC''').show()

+-----+-----+
|Cabin|count|
+-----+-----+
| null| 1021|
|    C|   82|
|    B|   77|
|    D|   52|
|    E|   51|
|    A|   23|
|    F|   18|
|    G|    4|
|    T|    1|
+-----+-----+



**Fill missing values with "U":**

In [None]:
df_filled4 = df_filled3.na.fill("U",subset = ["Cabin"])
df_filled4.show(5,truncate = False)

+-----------+--------+------+---------------------------------------------------+------+----+-----+-----+----------------+-------+-----+--------+-----+
|PassengerId|Survived|Pclass|Name                                               |Sex   |Age |SibSp|Parch|Ticket          |Fare   |Cabin|Embarked|Title|
+-----------+--------+------+---------------------------------------------------+------+----+-----+-----+----------------+-------+-----+--------+-----+
|1          |0       |3     |Braund, Mr. Owen Harris                            |male  |22.0|1    |0    |A/5 21171       |7.25   |U    |S       |Mr   |
|2          |1       |1     |Cumings, Mrs. John Bradley (Florence Briggs Thayer)|female|38.0|1    |0    |PC 17599        |71.2833|C    |C       |Mrs  |
|3          |1       |3     |Heikkinen, Miss. Laina                             |female|26.0|0    |0    |STON/O2. 3101282|7.925  |U    |S       |Miss |
|4          |1       |1     |Futrelle, Mrs. Jacques Heath (Lily May Peel)       |female|

In [None]:
df_filled4.select("cabin").groupBy("cabin").count().orderBy("count",ascending = False).show()

+-----+-----+
|cabin|count|
+-----+-----+
|    U| 1021|
|    C|   82|
|    B|   77|
|    D|   52|
|    E|   51|
|    A|   23|
|    F|   18|
|    G|    4|
|    T|    1|
+-----+-----+



**StringIndexer: A label indexer that maps a string column of labels to an ML column of label indices. If the input column is numeric, we cast it to string and index the string values. The indices are in [0, numLabels). By default, this is ordered by label frequencies so the most frequent label gets index 0. The ordering behavior is controlled by setting stringOrderType. Its default value is ‘frequencyDesc’.**

**StringIndexer(inputCol=None, outputCol=None)**
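
The default `frequencyDesc` ordering can be sketched in plain Python: count each label, sort by descending count, and number the labels from 0. This is an illustration of the idea on toy data, not Spark's implementation; `string_index` is a hypothetical helper, and the alphabetical tie-break is an assumption of this sketch.

```python
from collections import Counter

def string_index(labels):
    """Map each distinct label to an index, most frequent label first."""
    counts = Counter(labels)
    # Descending frequency; ties are broken alphabetically in this sketch.
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    # Spark stores the indices as doubles, hence float(i).
    return {label: float(i) for i, label in enumerate(ordered)}

embarked = ["S", "C", "S", "Q", "S", "C"]  # toy data, not the real column
mapping = string_index(embarked)

print(mapping)                         # {'S': 0.0, 'C': 1.0, 'Q': 2.0}
print([mapping[v] for v in embarked])  # the indexed column
```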

**Pipeline: ML Pipelines provide a uniform set of high-level APIs built on top of DataFrames that help users create and tune practical machine learning pipelines.**

____________________________________________

**Use a list comprehension and StringIndexer to convert the "Sex", "Embarked", "Title", and "Cabin" columns into index columns named after the original column plus "_Index", such as "Title_Index":**

In [None]:
df_filled4.show()

+-----------+--------+------+--------------------+------+------------------+-----+-----+----------------+-------+-----+--------+------+
|PassengerId|Survived|Pclass|                Name|   Sex|               Age|SibSp|Parch|          Ticket|   Fare|Cabin|Embarked| Title|
+-----------+--------+------+--------------------+------+------------------+-----+-----+----------------+-------+-----+--------+------+
|          1|       0|     3|Braund, Mr. Owen ...|  male|              22.0|    1|    0|       A/5 21171|   7.25|    U|       S|    Mr|
|          2|       1|     1|Cumings, Mrs. Joh...|female|              38.0|    1|    0|        PC 17599|71.2833|    C|       C|   Mrs|
|          3|       1|     3|Heikkinen, Miss. ...|female|              26.0|    0|    0|STON/O2. 3101282|  7.925|    U|       S|  Miss|
|          4|       1|     1|Futrelle, Mrs. Ja...|female|              35.0|    1|    0|          113803|   53.1|    C|       S|   Mrs|
|          5|       0|     3|Allen, Mr. Willia..

In [None]:
trainDF, testDF = df_filled4.randomSplit([.8,.2],seed=42)
print(f"There are {trainDF.count()} rows in the training set, and {testDF.count()} in the test set")

There are 1094 rows in the training set, and 235 in the test set
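
The split is not exactly 80/20 (1094 of 1329 rows is about 82%) because each row is assigned to a side at random rather than by position. The plain-Python sketch below illustrates that behaviour; it is not Spark's sampling code, `random_split` is a hypothetical helper, and its sizes will not match the numbers above.

```python
import random

def random_split(rows, train_fraction, seed):
    """Assign each row independently to train or test with the given probability."""
    rng = random.Random(seed)
    train, test = [], []
    for row in rows:
        (train if rng.random() < train_fraction else test).append(row)
    return train, test

rows = list(range(1329))  # stand-in for the 1329 merged rows
train_rows, test_rows = random_split(rows, 0.8, seed=42)

# The sizes land near 80% / 20% but are rarely exact.
print(len(train_rows), len(test_rows))
```

Fixing the seed makes the split reproducible from run to run, which is why `seed=42` is passed in the cell above.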


**Use Pipeline to fit and transform:**

In [None]:
categoricalCols = ["Sex", "Embarked", "Title", "Cabin"]

In [None]:
indexOutputCols = [x + "_Index" for x in categoricalCols]
indexOutputCols

['Sex_Index', 'Embarked_Index', 'Title_Index', 'Cabin_Index']

In [None]:
from pyspark.ml.feature import StringIndexer
stringIndexer = StringIndexer(inputCols=categoricalCols,
                             outputCols=indexOutputCols,
                             handleInvalid='skip')

In [None]:
# Keep numeric columns only, and exclude the label column "Survived"
numericCols = [field for (field, dataType) in trainDF.dtypes
               if dataType in ('double', 'int') and field != 'Survived']
numericCols

['PassengerId', 'Pclass', 'Age', 'SibSp', 'Parch', 'Fare']

In [None]:
assemblerInputs = indexOutputCols + numericCols
assemblerInputs

['Sex_Index',
 'Embarked_Index',
 'Title_Index',
 'Cabin_Index',
 'PassengerId',
 'Pclass',
 'Age',
 'SibSp',
 'Parch',
 'Fare']

In [None]:
from pyspark.ml.feature import VectorAssembler
vecAssembler = VectorAssembler(inputCols=assemblerInputs,outputCol='features')

In [None]:
from pyspark.ml.classification import LogisticRegression
lr = LogisticRegression(labelCol='Survived',featuresCol='features')
from pyspark.ml import Pipeline
pipeline =Pipeline(stages = [stringIndexer,vecAssembler,lr])

In [None]:
pipelineModel = pipeline.fit(trainDF)

In [None]:
predDF = pipelineModel.transform(testDF)

In [None]:
predDF.show()

+-----------+--------+------+--------------------+------+------------------+-----+-----+----------------+-------+-----+--------+-----+---------+--------------+-----------+-----------+--------------------+--------------------+--------------------+----------+
|PassengerId|Survived|Pclass|                Name|   Sex|               Age|SibSp|Parch|          Ticket|   Fare|Cabin|Embarked|Title|Sex_Index|Embarked_Index|Title_Index|Cabin_Index|            features|       rawPrediction|         probability|prediction|
+-----------+--------+------+--------------------+------+------------------+-----+-----+----------------+-------+-----+--------+-----+---------+--------------+-----------+-----------+--------------------+--------------------+--------------------+----------+
|          3|       1|     3|Heikkinen, Miss. ...|female|              26.0|    0|    0|STON/O2. 3101282|  7.925|    U|       S| Miss|      1.0|           0.0|        1.0|        0.0|[1.0,0.0,1.0,0.0,...|[-0.5415354743287...|[

**Drop "Sex, PassengerId, Name, Title, SibSp, Parch, Ticket, Cabin, Embarked" columns**

In [None]:
drop_columns = ["Sex", "PassengerId", "Name", "Title", "SibSp", "Parch", "Ticket", "Cabin", "Embarked"]
df = df_filled4.drop(*drop_columns)
df.show(5)

+--------+------+----+-------+
|Survived|Pclass| Age|   Fare|
+--------+------+----+-------+
|       0|     3|22.0|   7.25|
|       1|     1|38.0|71.2833|
|       1|     3|26.0|  7.925|
|       1|     1|35.0|   53.1|
|       0|     3|35.0|   8.05|
+--------+------+----+-------+
only showing top 5 rows



**Convert to pandas**

In [None]:
df_panda = df.toPandas()

**Display result**

In [None]:
df_panda

      Survived  Pclass        Age     Fare
0            0       3  22.000000   7.2500
1            1       1  38.000000  71.2833
2            1       3  26.000000   7.9250
3            1       1  35.000000  53.1000
4            0       3  35.000000   8.0500
...        ...     ...        ...      ...
1324         0       2  27.000000  13.0000
1325         1       1  19.000000  30.0000
1326         0       3  30.079502  23.4500
1327         1       1  26.000000  30.0000


**VectorAssembler: VectorAssembler(*, inputCols=None, outputCol=None) A feature transformer that merges multiple columns into a vector column.**
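
Conceptually, the assembler takes the chosen columns of each row and places their values, in order, into one numeric vector. A plain-Python sketch using the first row of `df` shown above; `assemble` is a hypothetical helper, not part of PySpark.

```python
def assemble(row, input_cols):
    """Collect the chosen columns of one row into a list of floats."""
    return [float(row[c]) for c in input_cols]

# First row of df, copied from the output above.
row = {"Survived": 0, "Pclass": 3, "Age": 22.0, "Fare": 7.25}

# The label column "Survived" is deliberately left out of the features.
features = assemble(row, ["Pclass", "Age", "Fare"])
print(features)  # [3.0, 22.0, 7.25]
```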



**Use VectorAssembler with "inputCols" set to every column after the first (that is, all columns except "Survived") and "outputCol" set to "features":**

In [None]:
vector_input = list(df_panda.columns[1:])

In [None]:
vecAssembler = VectorAssembler(inputCols=vector_input,outputCol='features')

**Use VectorAssembler with "inputCols" set to all columns and "outputCol" set to "features":**

In [None]:
vecAssembler2 = VectorAssembler(inputCols=list(df_panda.columns),outputCol='features')

**Use the randomSplit function to split the data into x_train and X_test, with 80% and 20% of the rows respectively:**

In [None]:
x_train, X_test = df.randomSplit([.8,.2],seed=42)

**Use RandomForestClassifier to fit and transform then display "prediction, Survived, features" columns**

In [None]:
from pyspark.ml.classification import RandomForestClassifier
rf = RandomForestClassifier(labelCol='Survived',featuresCol='features')
pipeline =Pipeline(stages = [vecAssembler,rf])

In [None]:
pipelineModel = pipeline.fit(x_train)
predDF = pipelineModel.transform(X_test)
predDF.select("prediction", "Survived", "features").show(5)

+----------+--------+--------------------+
|prediction|Survived|            features|
+----------+--------+--------------------+
|       1.0|       0|     [1.0,19.0,53.1]|
|       1.0|       0|     [1.0,24.0,79.2]|
|       1.0|       0|   [1.0,25.0,151.55]|
|       1.0|       0|     [1.0,29.0,66.6]|
|       0.0|       0|[1.0,30.079501879...|
+----------+--------+--------------------+
only showing top 5 rows



**Use MulticlassClassificationEvaluator and set the "labelCol" to "Survived",  "predictionCol" to "prediction", "metricName" to "accuracy"** 

In [None]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
classificationEvaluator = MulticlassClassificationEvaluator(predictionCol='prediction',
                                         labelCol='Survived',
                                         metricName='accuracy')

**Evaluate the predictions:**

In [None]:
acc = classificationEvaluator.evaluate(predDF)

In [None]:
acc

0.723404255319149
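
The "accuracy" metric is the share of rows where the prediction equals the label. A plain-Python sketch using only the five prediction rows displayed earlier (so the result describes those five rows, not the whole test split); `accuracy` is a helper written for this illustration.

```python
def accuracy(predictions, labels):
    """Fraction of positions where the prediction equals the label."""
    correct = sum(1 for p, y in zip(predictions, labels) if p == y)
    return correct / len(labels)

# The five rows shown in the prediction table above.
predictions = [1.0, 1.0, 1.0, 1.0, 0.0]
labels = [0, 0, 0, 0, 0]

sample_accuracy = accuracy(predictions, labels)
print(sample_accuracy)  # 0.2
```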

**When you are finished, send the project via Google Classroom.**

**Please let me know if you have any questions.**
* nabieh.mostafa@yahoo.com
* +201015197566 (Whatsapp)

**Don't hate me; I push you to learn.**

**I will help you to become an awesome data engineer.**

**Why did I say "data engineer"?**

**That is a tricky but optional question. If you would like to know the answer, ask me.**
