Let's start with your project: 

### **Problem** 
**Our goal is to create a predictive model that can answer the following question:**

**What kind of people had a better chance of surviving?**

**Data about passengers:**
*   Name
*   Age
*   Gender.


## Install and Import Libraries
Let's install PySpark:

In [1]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

In [2]:
!wget -q https://dlcdn.apache.org/spark/spark-3.1.2/spark-3.1.2-bin-hadoop3.2.tgz

In [3]:
!tar xf spark-3.1.2-bin-hadoop3.2.tgz

In [4]:
!pip install -q findspark

In [5]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.1.2-bin-hadoop3.2"

In [6]:
import findspark
findspark.init()

In [7]:
pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.2.1.tar.gz (281.4 MB)
     |████████████████████████████████| 281.4 MB 35 kB/s 
Collecting py4j==0.10.9.3
  Downloading py4j-0.10.9.3-py2.py3-none-any.whl (198 kB)
     |████████████████████████████████| 198 kB 53.0 MB/s 
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... done
  Created wheel for pyspark: filename=pyspark-3.2.1-py2.py3-none-any.whl size=281853642 sha256=1200034760796f7170b88397a855549b4849c8076560f45530841f0f8a3b6cee
  Stored in directory: /root/.cache/pip/wheels/9f/f5/07/7cd8017084dce4e93e84e92efd1e1d5334db05f2e83bcef74f
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.3 pyspark-3.2.1


In [8]:
import pyspark
from pyspark.sql import *

In [9]:
from pyspark.sql.functions import *
from pyspark.sql import SQLContext

## Build Spark Session

In [10]:
from pyspark.sql import SparkSession
spark = SparkSession.builder\
        .appName("Colab")\
        .getOrCreate()

## Data Loading


You have two datasets to read:
* Train
* Test.



In [12]:
df = spark.read.csv("train_spark.csv",header=True,inferSchema=True)

Let's work with the train dataset:

**Confirm that this is a DataFrame by displaying its rows:**

In [14]:
df.show()

+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|          Ticket|   Fare|Cabin|Embarked|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|          1|       0|     3|Braund, Mr. Owen ...|  male|22.0|    1|    0|       A/5 21171|   7.25| null|       S|
|          2|       1|     1|Cumings, Mrs. Joh...|female|38.0|    1|    0|        PC 17599|71.2833|  C85|       C|
|          3|       1|     3|Heikkinen, Miss. ...|female|26.0|    0|    0|STON/O2. 3101282|  7.925| null|       S|
|          4|       1|     1|Futrelle, Mrs. Ja...|female|35.0|    1|    0|          113803|   53.1| C123|       S|
|          5|       0|     3|Allen, Mr. Willia...|  male|35.0|    0|    0|          373450|   8.05| null|       S|
|          6|       0|     3|    Moran, Mr. James|  male|null|    0|    0|      

**Show 5 rows.**

In [15]:
df.show(5, truncate=False)

+-----------+--------+------+---------------------------------------------------+------+----+-----+-----+----------------+-------+-----+--------+
|PassengerId|Survived|Pclass|Name                                               |Sex   |Age |SibSp|Parch|Ticket          |Fare   |Cabin|Embarked|
+-----------+--------+------+---------------------------------------------------+------+----+-----+-----+----------------+-------+-----+--------+
|1          |0       |3     |Braund, Mr. Owen Harris                            |male  |22.0|1    |0    |A/5 21171       |7.25   |null |S       |
|2          |1       |1     |Cumings, Mrs. John Bradley (Florence Briggs Thayer)|female|38.0|1    |0    |PC 17599        |71.2833|C85  |C       |
|3          |1       |3     |Heikkinen, Miss. Laina                             |female|26.0|0    |0    |STON/O2. 3101282|7.925  |null |S       |
|4          |1       |1     |Futrelle, Mrs. Jacques Heath (Lily May Peel)       |female|35.0|1    |0    |113803          |53

**Display schema for the dataset:**

In [16]:
df.printSchema()

root
 |-- PassengerId: integer (nullable = true)
 |-- Survived: integer (nullable = true)
 |-- Pclass: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- SibSp: integer (nullable = true)
 |-- Parch: integer (nullable = true)
 |-- Ticket: string (nullable = true)
 |-- Fare: double (nullable = true)
 |-- Cabin: string (nullable = true)
 |-- Embarked: string (nullable = true)



**Statistical summary:**

In [17]:
df.describe().show()

+-------+-----------------+-------------------+------------------+--------------------+------+------------------+------------------+-------------------+------------------+-----------------+-----+--------+
|summary|      PassengerId|           Survived|            Pclass|                Name|   Sex|               Age|             SibSp|              Parch|            Ticket|             Fare|Cabin|Embarked|
+-------+-----------------+-------------------+------------------+--------------------+------+------------------+------------------+-------------------+------------------+-----------------+-----+--------+
|  count|              891|                891|               891|                 891|   891|               714|               891|                891|               891|              891|  204|     889|
|   mean|            446.0| 0.3838383838383838| 2.308641975308642|                null|  null| 29.69911764705882|0.5230078563411896|0.38159371492704824|260318.54916792738| 32.20420

## EDA - Exploratory Data Analysis

**Display count for the train dataset:**

In [18]:
all=df.count()
# all=spark.sql("select count(*) from titanic").show()
print(all)


891


**Can you answer this question:** 

**How many people survived, and how many didn't survive?** 

**Please save data in a variable.**

**Display your result:**

In [19]:
Survived=df.filter(df['Survived'] == 1).count()
print(Survived)

342


**Can you display your answer in ratio form? (Hint: Use a "UDF" function. This is only a hint; you can use any method.)**






In [20]:
Survived/all

0.3838383838383838

**Can you get the number of males and females?**


**1. What is the average number of survivors of each gender?**

**2. What is the number of survivors of each gender?**

(Hint: Group by the "Sex" column. This is only a hint; you can use any method.)

In [21]:
males=df.filter(df['Sex']=='male').count()
print(males)

577


In [22]:
females=all-males
print(females)

314


**Create a temporary view in PySpark:**

In [23]:
# createOrReplaceTempView lets this cell be re-run without a "view already exists" error.
df.createOrReplaceTempView('titanic')

**How many people survived, and how many didn't survive? By SQL:**

In [24]:
survived_sql=df.select('Survived').where(col('Survived')==1).count()
not_survived_sql=df.select('Survived').where(col('Survived')==0).count()
print('survived:',survived_sql)
print("didn't survive:",not_survived_sql)

# Equivalent SQL against the temporary view:
# spark.sql("select Survived, count(*) from titanic group by Survived").show()

survived: 342
didn't survive: 549


**Can you display the number of survivors from each gender as a ratio?**

(Hint: Group by the "Sex" column. This is only a hint; you can use any method.)

**Can you do this via SQL?**

In [25]:
males_survivors=df.select('Sex').where(col('Sex')=='male').where(col('Survived')==1).count()/males
print('males_survivors:',males_survivors)

females_survivors=df.select('Sex').where(col('Sex')=='female').where(col('Survived')==1).count()/females
print('females_survivors:',females_survivors)

males_survivors: 0.18890814558058924
females_survivors: 0.7420382165605095


**Display the survival ratio for each "Pclass": SUM(Survived) / count per class.**


In [26]:
df.groupby(['Survived','Pclass']).count().show()

+--------+------+-----+
|Survived|Pclass|count|
+--------+------+-----+
|       1|     2|   87|
|       1|     1|  136|
|       1|     3|  119|
|       0|     1|   80|
|       0|     2|   97|
|       0|     3|  372|
+--------+------+-----+
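
The grouped counts above do not yet give the requested ratio. As a minimal sketch in plain Python (no Spark required), the counts copied from the table can be combined into SUM(Survived) / count per class. The names `counts` and `survival_ratio` are illustrative and not part of the original notebook.

```python
# Counts copied from the groupby output above: (Survived, Pclass) -> count.
counts = {
    (1, 1): 136, (1, 2): 87, (1, 3): 119,
    (0, 1): 80,  (0, 2): 97, (0, 3): 372,
}

def survival_ratio(counts, pclass):
    """Survivors divided by all passengers in the given class."""
    survived = counts[(1, pclass)]
    total = survived + counts[(0, pclass)]
    return survived / total

ratios = {pclass: survival_ratio(counts, pclass) for pclass in (1, 2, 3)}
for pclass, ratio in ratios.items():
    print(pclass, round(ratio, 4))
```

Because "Survived" is 0 or 1, the same ratio is simply the mean of "Survived" within each class, which is one way to express it in a grouped aggregation.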



**Let's take a break and continue after this.**

## Data Cleaning

In [32]:
df1 = spark.read.csv("test_spark.csv",header=True,inferSchema=True)

**First, merge the train and test datasets. (Hint: The union function can do this. It matches columns by position, so both DataFrames need the same column order.)**



In [33]:
df2=df.union(df1)
df2.show()

+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|          Ticket|   Fare|Cabin|Embarked|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|          1|       0|     3|Braund, Mr. Owen ...|  male|22.0|    1|    0|       A/5 21171|   7.25| null|       S|
|          2|       1|     1|Cumings, Mrs. Joh...|female|38.0|    1|    0|        PC 17599|71.2833|  C85|       C|
|          3|       1|     3|Heikkinen, Miss. ...|female|26.0|    0|    0|STON/O2. 3101282|  7.925| null|       S|
|          4|       1|     1|Futrelle, Mrs. Ja...|female|35.0|    1|    0|          113803|   53.1| C123|       S|
|          5|       0|     3|Allen, Mr. Willia...|  male|35.0|    0|    0|          373450|   8.05| null|       S|
|          6|       0|     3|    Moran, Mr. James|  male|null|    0|    0|      

**Display count:**

In [34]:
df2.count()

1329

**Can you count the number of null values in each column?**


In [35]:
nulls=df2.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in df2.columns])
nulls.show()

+-----------+--------+------+----+---+---+-----+-----+------+----+-----+--------+
|PassengerId|Survived|Pclass|Name|Sex|Age|SibSp|Parch|Ticket|Fare|Cabin|Embarked|
+-----------+--------+------+----+---+---+-----+-----+------+----+-----+--------+
|          0|       0|     0|   0|  0|265|    0|    0|     0|   0| 1021|       3|
+-----------+--------+------+----+---+---+-----+-----+------+----+-----+--------+



**Create Dataframe for null values**

1. Column
2. Number of missing values.

In [36]:
nulls.describe().show()

+-------+-----------+--------+------+----+----+-----+-----+-----+------+----+------+--------+
|summary|PassengerId|Survived|Pclass|Name| Sex|  Age|SibSp|Parch|Ticket|Fare| Cabin|Embarked|
+-------+-----------+--------+------+----+----+-----+-----+-----+------+----+------+--------+
|  count|          1|       1|     1|   1|   1|    1|    1|    1|     1|   1|     1|       1|
|   mean|        0.0|     0.0|   0.0| 0.0| 0.0|265.0|  0.0|  0.0|   0.0| 0.0|1021.0|     3.0|
| stddev|       null|    null|  null|null|null| null| null| null|  null|null|  null|    null|
|    min|          0|       0|     0|   0|   0|  265|    0|    0|     0|   0|  1021|       3|
|    max|          0|       0|     0|   0|   0|  265|    0|    0|     0|   0|  1021|       3|
+-------+-----------+--------+------+----+----+-----+-----+-----+------+----+------+--------+
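
The `describe()` output above summarises the one-row `nulls` DataFrame but does not produce the two-column layout the task asks for. A minimal plain-Python sketch of that reshaping, using the counts printed earlier, might look like this. The names `columns`, `null_counts`, and `missing_rows` are illustrative assumptions.

```python
# Column names and null counts copied from the one-row `nulls` output above.
columns = ["PassengerId", "Survived", "Pclass", "Name", "Sex", "Age",
           "SibSp", "Parch", "Ticket", "Fare", "Cabin", "Embarked"]
null_counts = [0, 0, 0, 0, 0, 265, 0, 0, 0, 0, 1021, 3]

# One (column, missing) pair per column, largest count first.
missing_rows = sorted(zip(columns, null_counts),
                      key=lambda row: row[1], reverse=True)

# Keep only the columns that actually have missing values.
missing_rows = [(name, n) for name, n in missing_rows if n > 0]
print(missing_rows)
```

In the notebook, a list of tuples like this could then be passed to `spark.createDataFrame(missing_rows, ["Column", "Missing"])` to get a Spark DataFrame with the two requested columns.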



## Preprocessing 

**Create Temporary view PySpark:**

In [37]:
df2.createTempView('Preprocessing_')

**Can you show the "name" column from your temporary table?**

In [38]:
df_name = spark.sql("SELECT Name FROM Preprocessing_")
df_name.show()

+--------------------+
|                Name|
+--------------------+
|Braund, Mr. Owen ...|
|Cumings, Mrs. Joh...|
|Heikkinen, Miss. ...|
|Futrelle, Mrs. Ja...|
|Allen, Mr. Willia...|
|    Moran, Mr. James|
|McCarthy, Mr. Tim...|
|Palsson, Master. ...|
|Johnson, Mrs. Osc...|
|Nasser, Mrs. Nich...|
|Sandstrom, Miss. ...|
|Bonnell, Miss. El...|
|Saundercock, Mr. ...|
|Andersson, Mr. An...|
|Vestrom, Miss. Hu...|
|Hewlett, Mrs. (Ma...|
|Rice, Master. Eugene|
|Williams, Mr. Cha...|
|Vander Planke, Mr...|
|Masselmani, Mrs. ...|
+--------------------+
only showing top 20 rows



**Run this code:**

In [39]:
import pyspark.sql.functions as F
# Raw string so that "\." reaches the regex engine as an escaped dot.
df2 = df2.withColumn('Title',F.regexp_extract(F.col("Name"),r"([A-Za-z]+)\.",1))
df2.createOrReplaceTempView('df2')
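
To see what the pattern captures, here is a small sketch that applies the same regular expression with Python's `re` module to names taken from the dataset. The helper `extract_title` is hypothetical; it returns an empty string when nothing matches, which mirrors how `regexp_extract` behaves for non-matching rows.

```python
import re

# Same pattern as in the Spark cell: a run of letters followed by a literal dot.
TITLE_PATTERN = re.compile(r"([A-Za-z]+)\.")

def extract_title(name):
    """Return the first run of letters followed by a dot, or "" if none."""
    match = TITLE_PATTERN.search(name)
    return match.group(1) if match else ""

names = [
    "Braund, Mr. Owen Harris",
    "Cumings, Mrs. John Bradley (Florence Briggs Thayer)",
    "Heikkinen, Miss. Laina",
]
titles = [extract_title(n) for n in names]
print(titles)
```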

**Display "Title" column and count "Title" column:**

In [40]:
df2.groupBy("Title").count().show()

+--------+-----+
|   Title|count|
+--------+-----+
|     Don|    1|
|    Miss|  257|
|Countess|    2|
|     Col|    4|
|     Rev|    9|
|    Lady|    2|
|  Master|   56|
|     Mme|    1|
|    Capt|    2|
|      Mr|  786|
|      Dr|   11|
|     Mrs|  186|
|     Sir|    2|
|Jonkheer|    2|
|    Mlle|    4|
|   Major|    3|
|      Ms|    1|
+--------+-----+



**We can see that Dr, Rev, Major, Col, Mlle, Capt, Don, Jonkheer, Countess, Ms, Sir, Lady, and Mme are really rare titles, so create a dictionary that maps each of them to "rare".**

In [41]:
rare_titles=['Dr', 'Rev', 'Major', 'Col', 'Mlle', 'Capt', 'Don', 'Jonkheer', 'Countess', 'Ms', 'Sir', 'Lady', 'Mme']
# Map every rare title to the single label "rare".
rares=dict.fromkeys(rare_titles,'rare')
print(rares)

{'Dr': 'rare', 'Rev': 'rare', 'Major': 'rare', 'Col': 'rare', 'Mlle': 'rare', 'Capt': 'rare', 'Don': 'rare', 'Jonkheer': 'rare', 'Countess': 'rare', 'Ms': 'rare', 'Sir': 'rare', 'Lady': 'rare', 'Mme': 'rare'}


**Run the function that maps rare titles to "rare":**

In [42]:
def impute_title(title):
    # Rare titles become "rare"; every other title is returned unchanged.
    return rares.get(title, title)

**Apply the function to the "Title" column using a UDF, then display "Title" grouped with its count:**

In [43]:
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

# Wrap the Python function as a Spark UDF that returns a string.
imputeUDF = udf(impute_title, StringType())

combined_df = df2.withColumn("Title", imputeUDF(col("Title")))
combined_df.groupby('Title').count().show()

+------+-----+
| Title|count|
+------+-----+
|  rare|   44|
|  Miss|  257|
|Master|   56|
|    Mr|  786|
|   Mrs|  186|
+------+-----+



In [44]:
combined_df.show()

+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|          Ticket|   Fare|Cabin|Embarked| Title|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+------+
|          1|       0|     3|Braund, Mr. Owen ...|  male|22.0|    1|    0|       A/5 21171|   7.25| null|       S|    Mr|
|          2|       1|     1|Cumings, Mrs. Joh...|female|38.0|    1|    0|        PC 17599|71.2833|  C85|       C|   Mrs|
|          3|       1|     3|Heikkinen, Miss. ...|female|26.0|    0|    0|STON/O2. 3101282|  7.925| null|       S|  Miss|
|          4|       1|     1|Futrelle, Mrs. Ja...|female|35.0|    1|    0|          113803|   53.1| C123|       S|   Mrs|
|          5|       0|     3|Allen, Mr. Willia...|  male|35.0|    0|    0|          373450|   8.05| null|       S|    Mr|
|          6|       0|  

## **Preprocessing Age**

**Fill in the missing "Age" values with the mean of the "Age" column:**

In [45]:
# Keep the result in mean_age so the imported mean() function is not overwritten.
mean_age = combined_df.select(mean(combined_df['Age'])).first()[0]
combined_df=combined_df.fillna(mean_age,subset=['Age'])
combined_df.show()

+-----------+--------+------+--------------------+------+------------------+-----+-----+----------------+-------+-----+--------+------+
|PassengerId|Survived|Pclass|                Name|   Sex|               Age|SibSp|Parch|          Ticket|   Fare|Cabin|Embarked| Title|
+-----------+--------+------+--------------------+------+------------------+-----+-----+----------------+-------+-----+--------+------+
|          1|       0|     3|Braund, Mr. Owen ...|  male|              22.0|    1|    0|       A/5 21171|   7.25| null|       S|    Mr|
|          2|       1|     1|Cumings, Mrs. Joh...|female|              38.0|    1|    0|        PC 17599|71.2833|  C85|       C|   Mrs|
|          3|       1|     3|Heikkinen, Miss. ...|female|              26.0|    0|    0|STON/O2. 3101282|  7.925| null|       S|  Miss|
|          4|       1|     1|Futrelle, Mrs. Ja...|female|              35.0|    1|    0|          113803|   53.1| C123|       S|   Mrs|
|          5|       0|     3|Allen, Mr. Willia..

In [46]:
combined_df.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in combined_df.columns]).show()
print(combined_df.dtypes)


+-----------+--------+------+----+---+---+-----+-----+------+----+-----+--------+-----+
|PassengerId|Survived|Pclass|Name|Sex|Age|SibSp|Parch|Ticket|Fare|Cabin|Embarked|Title|
+-----------+--------+------+----+---+---+-----+-----+------+----+-----+--------+-----+
|          0|       0|     0|   0|  0|  0|    0|    0|     0|   0| 1021|       3|    0|
+-----------+--------+------+----+---+---+-----+-----+------+----+-----+--------+-----+

[('PassengerId', 'int'), ('Survived', 'int'), ('Pclass', 'int'), ('Name', 'string'), ('Sex', 'string'), ('Age', 'double'), ('SibSp', 'int'), ('Parch', 'int'), ('Ticket', 'string'), ('Fare', 'double'), ('Cabin', 'string'), ('Embarked', 'string'), ('Title', 'string')]


## **Preprocessing Embarked**

**Select "Embarked" column, count them, order by count Desc, and save in grouped_Embarked variable:**




In [47]:
grouped_Embarked=combined_df.groupBy("Embarked").count().orderBy("count",ascending=False)

**Show your "grouped_Embarked" variable:**

In [48]:
grouped_Embarked.show()

+--------+-----+
|Embarked|count|
+--------+-----+
|       S|  962|
|       C|  253|
|       Q|  111|
|    null|    3|
+--------+-----+



**Get the max count of grouped_Embarked:**

In [49]:
import pyspark.sql.functions as F
# Pull the scalar out of the one-row result; avoid shadowing the built-in max().
max_count=grouped_Embarked.select(F.max('count')).first()[0]
print(max_count)

962


**Fill missing values with 'S', the most frequent value in grouped_Embarked:**



In [50]:
combined_df=combined_df.na.fill('S',subset=['Embarked'])
combined_df.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in combined_df.columns]).show()

+-----------+--------+------+----+---+---+-----+-----+------+----+-----+--------+-----+
|PassengerId|Survived|Pclass|Name|Sex|Age|SibSp|Parch|Ticket|Fare|Cabin|Embarked|Title|
+-----------+--------+------+----+---+---+-----+-----+------+----+-----+--------+-----+
|          0|       0|     0|   0|  0|  0|    0|    0|     0|   0| 1021|       0|    0|
+-----------+--------+------+----+---+---+-----+-----+------+----+-----+--------+-----+
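
The fill value 'S' was typed in by hand above. As a rough plain-Python sketch of the same idea (impute with the most frequent non-null label), using the counts shown in the grouped_Embarked output; the helper names `most_frequent_label` and `fill_missing` are assumptions made for this illustration.

```python
from collections import Counter

# Counts copied from the grouped_Embarked output above; None stands for null.
embarked_counts = Counter({"S": 962, "C": 253, "Q": 111, None: 3})

def most_frequent_label(counts):
    """Return the most common label, ignoring the null bucket."""
    non_null = {label: n for label, n in counts.items() if label is not None}
    return max(non_null, key=non_null.get)

def fill_missing(value, fill):
    """Replace a missing value with the chosen fill label."""
    return fill if value is None else value

fill_value = most_frequent_label(embarked_counts)
sample = ["S", None, "C", None]
filled = [fill_missing(v, fill_value) for v in sample]
print(fill_value, filled)
```

Deriving the label from the data rather than hard-coding it keeps the step valid if the dataset changes.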



## **Preprocessing Cabin**

**Extract the first character of the "Cabin" string into a "new_cabin" column:**

In [51]:
# Spark's substring() is 1-based, so the first character is at position 1.
combined_df=combined_df.withColumn('new_cabin',substring('Cabin',1,1))

**Show the result:**

In [52]:
combined_df.show()

+-----------+--------+------+--------------------+------+------------------+-----+-----+----------------+-------+-----+--------+------+---------+
|PassengerId|Survived|Pclass|                Name|   Sex|               Age|SibSp|Parch|          Ticket|   Fare|Cabin|Embarked| Title|new_cabin|
+-----------+--------+------+--------------------+------+------------------+-----+-----+----------------+-------+-----+--------+------+---------+
|          1|       0|     3|Braund, Mr. Owen ...|  male|              22.0|    1|    0|       A/5 21171|   7.25| null|       S|    Mr|     null|
|          2|       1|     1|Cumings, Mrs. Joh...|female|              38.0|    1|    0|        PC 17599|71.2833|  C85|       C|   Mrs|        C|
|          3|       1|     3|Heikkinen, Miss. ...|female|              26.0|    0|    0|STON/O2. 3101282|  7.925| null|       S|  Miss|     null|
|          4|       1|     1|Futrelle, Mrs. Ja...|female|              35.0|    1|    0|          113803|   53.1| C123|     

In [53]:
combined_df=combined_df.drop('Cabin')

In [54]:
combined_df.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in combined_df.columns]).show()

+-----------+--------+------+----+---+---+-----+-----+------+----+--------+-----+---------+
|PassengerId|Survived|Pclass|Name|Sex|Age|SibSp|Parch|Ticket|Fare|Embarked|Title|new_cabin|
+-----------+--------+------+----+---+---+-----+-----+------+----+--------+-----+---------+
|          0|       0|     0|   0|  0|  0|    0|    0|     0|   0|       0|    0|     1021|
+-----------+--------+------+----+---+---+-----+-----+------+----+--------+-----+---------+



**Create the temporary view:**

In [55]:
combined_df.createTempView('cabin_')

**Select the "new_cabin" column, group by it, count each value, and order by count DESC:**

In [56]:
combined_df.groupBy("new_cabin").count().orderBy("count",ascending=False).show()

+---------+-----+
|new_cabin|count|
+---------+-----+
|     null| 1021|
|        C|   82|
|        B|   77|
|        D|   52|
|        E|   51|
|        A|   23|
|        F|   18|
|        G|    4|
|        T|    1|
+---------+-----+



**Fill missing values with "U":**

In [57]:
combined_df=combined_df.na.fill('U',subset=['new_cabin'])
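
Taken together, the cabin steps reduce each cabin string to its first letter and label missing cabins "U". A minimal plain-Python sketch of that rule, with a hypothetical helper `cabin_letter` and a few values from the dataset:

```python
def cabin_letter(cabin, unknown="U"):
    """First character of the cabin string, or `unknown` when it is missing."""
    if cabin is None or cabin == "":
        return unknown
    return cabin[0]

# Cabin values of the first four passengers shown earlier (None stands for null).
cabins = ["C85", None, "C123", None]
letters = [cabin_letter(c) for c in cabins]
print(letters)
```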

In [58]:
combined_df.printSchema()

root
 |-- PassengerId: integer (nullable = true)
 |-- Survived: integer (nullable = true)
 |-- Pclass: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Age: double (nullable = false)
 |-- SibSp: integer (nullable = true)
 |-- Parch: integer (nullable = true)
 |-- Ticket: string (nullable = true)
 |-- Fare: double (nullable = true)
 |-- Embarked: string (nullable = false)
 |-- Title: string (nullable = true)
 |-- new_cabin: string (nullable = false)



**StringIndexer: A label indexer that maps a string column of labels to an ML column of label indices. If the input column is numeric, we cast it to string and index the string values. The indices are in [0, numLabels). By default, this is ordered by label frequencies so the most frequent label gets index 0. The ordering behavior is controlled by setting stringOrderType. Its default value is ‘frequencyDesc’.**
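
To make the "frequencyDesc" ordering concrete, here is a small plain-Python sketch of the mapping it describes. It is an illustration only, not Spark's implementation; the function name `frequency_desc_index` and the alphabetical tie-break are assumptions of this sketch.

```python
from collections import Counter

def frequency_desc_index(labels):
    """Map each label to an index, most frequent label first."""
    counts = Counter(labels)
    # Descending count; ties are broken alphabetically (assumption of this sketch).
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: float(i) for i, label in enumerate(ordered)}

labels = ["S", "C", "S", "Q", "S", "C"]
mapping = frequency_desc_index(labels)
indexed = [mapping[label] for label in labels]
print(mapping)
print(indexed)
```

The most frequent label "S" gets index 0, and every index falls in [0, numLabels).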

In [59]:
# 80% train / 20% test; the seed fixes the random sampling.
train,test=combined_df.randomSplit([0.8,0.2],seed=42)

**StringIndexer(inputCol=None, outputCol=None)**

In [60]:
import pyspark.ml.feature as ml
# String columns to encode; free-text columns are excluded.
categoricalCols = [field for (field, dataType) in combined_df.dtypes
                   if dataType == "string" and field not in ['Cabin','Name','Ticket']]
indexOutputCols = [x + "_Index" for x in categoricalCols]
oheOutputCols = [x + "_OHE" for x in categoricalCols]
indexOutputCols

['Sex_Index', 'Embarked_Index', 'Title_Index', 'new_cabin_Index']

In [61]:
print(oheOutputCols)

['Sex_OHE', 'Embarked_OHE', 'Title_OHE', 'new_cabin_OHE']


In [62]:
stringIndexer = ml.StringIndexer(inputCols=categoricalCols,
                             outputCols=indexOutputCols,
                             handleInvalid='skip')
print(indexOutputCols)

['Sex_Index', 'Embarked_Index', 'Title_Index', 'new_cabin_Index']


**OneHotEncoder(inputCols=None, outputCols=None)**

A one-hot encoder that maps a column of category indices to a column of binary vectors, with at most a single one-value per row that indicates the input category index. For example with 5 categories, an input value of 2.0 would map to an output vector of [0.0, 0.0, 1.0, 0.0]. The last category is not included by default (configurable via dropLast), because it makes the vector entries sum up to one, and hence linearly dependent. So an input value of 4.0 maps to [0.0, 0.0, 0.0, 0.0].
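
The worked example in the paragraph above can be reproduced with a few lines of plain Python. This is only a sketch of the encoding rule; Spark stores the result as a sparse vector, and the function `one_hot` with its `drop_last` parameter is a hypothetical stand-in for the encoder's `dropLast` option.

```python
def one_hot(index, num_categories, drop_last=True):
    """Encode a category index as a binary vector, optionally dropping the last slot."""
    size = num_categories - 1 if drop_last else num_categories
    vector = [0.0] * size
    position = int(index)
    # With drop_last, the last category has no slot and stays all zeros.
    if position < size:
        vector[position] = 1.0
    return vector

print(one_hot(2.0, 5))
print(one_hot(4.0, 5))
print(one_hot(4.0, 5, drop_last=False))
```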

In [63]:
oheEncoder = ml.OneHotEncoder(inputCols=indexOutputCols,
                          outputCols=oheOutputCols)

**VectorAssembler: VectorAssembler(*, inputCols=None, outputCol=None). A feature transformer that merges multiple columns into a vector column.**



In [64]:
# Numeric feature columns; the label and the row identifier are excluded.
numericCols = [field for (field,dataType) in combined_df.dtypes
              if dataType != 'string' and field not in ['Survived','PassengerId']]
assemblerInputs = oheOutputCols + numericCols
vecAssembler = ml.VectorAssembler(inputCols=assemblerInputs,outputCol='features')


**Use the randomSplit function to split the data into train and test sets with 80% and 20% of the rows, respectively.**

**Pipeline: ML Pipelines provide a uniform set of high-level APIs built on top of DataFrames that help users create and tune practical machine learning pipelines.**

**Build RandomForestClassifier model and use pipeline to fit and transform then display "prediction, Survived, features" columns**

In [65]:
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml import Pipeline

In [66]:
RFC=RandomForestClassifier(labelCol='Survived')
pipeline =Pipeline(stages = [stringIndexer,oheEncoder,vecAssembler,RFC])

In [67]:
pipelineModel = pipeline.fit(train)
predDF = pipelineModel.transform(test)

**Use MulticlassClassificationEvaluator and set the "labelCol" to "Survived",  "predictionCol" to "prediction", "metricName" to "accuracy"** 

In [68]:
predDF.select('features','Survived','prediction').show(5)
predDF.select('features').show(2,truncate=False)

+--------------------+--------+----------+
|            features|Survived|prediction|
+--------------------+--------+----------+
|(20,[1,4,7,15,16,...|       1|       1.0|
|(20,[0,1,3,11,15,...|       0|       0.0|
|(20,[1,5,7,15,16,...|       1|       1.0|
|(20,[0,1,3,7,15,1...|       0|       0.0|
|(20,[2,5,7,15,16,...|       1|       1.0|
+--------------------+--------+----------+
only showing top 5 rows

+-----------------------------------------------------------+
|features                                                   |
+-----------------------------------------------------------+
|(20,[1,4,7,15,16,19],[1.0,1.0,1.0,3.0,26.0,7.925])         |
|(20,[0,1,3,11,15,16,19],[1.0,1.0,1.0,1.0,1.0,54.0,51.8625])|
+-----------------------------------------------------------+
only showing top 2 rows
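
The `features` column is printed in sparse form, `(size, [indices], [values])`, where every position not listed is zero. A short plain-Python sketch that expands the first row shown above into a dense list; `to_dense` is a hypothetical helper written for this illustration.

```python
def to_dense(size, indices, values):
    """Expand a (size, indices, values) sparse triple into a dense list."""
    dense = [0.0] * size
    for i, v in zip(indices, values):
        dense[i] = v
    return dense

# First row of the `features` output above.
dense = to_dense(20, [1, 4, 7, 15, 16, 19],
                 [1.0, 1.0, 1.0, 3.0, 26.0, 7.925])
print(dense)
```

Given that the assembler inputs are the one-hot columns followed by the numeric columns, the trailing entries here appear to be the numeric features: 3.0, 26.0, and 7.925 line up with the Pclass, Age, and Fare of the third passenger in the dataset.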



In [69]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [70]:
evaluator=MulticlassClassificationEvaluator(predictionCol='prediction',
                                         labelCol='Survived',
                                         metricName='accuracy')

In [71]:
accuracy = evaluator.evaluate(predDF)
print(f"Accuracy is {accuracy}")

Accuracy is 0.8340425531914893
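
The "accuracy" metric is the fraction of rows whose prediction equals the label. A minimal plain-Python sketch of that calculation, applied to the five prediction rows displayed earlier; `accuracy_score` is a hypothetical helper, not the evaluator's own code.

```python
def accuracy_score(labels, predictions):
    """Fraction of predictions that match their labels."""
    if len(labels) != len(predictions):
        raise ValueError("labels and predictions must have the same length")
    correct = sum(1 for y, p in zip(labels, predictions) if y == p)
    return correct / len(labels)

# The five rows displayed in the predictions table above.
labels = [1, 0, 1, 0, 1]
predictions = [1.0, 0.0, 1.0, 0.0, 1.0]
print(accuracy_score(labels, predictions))
```

These five rows all happen to be predicted correctly; the figure reported by the evaluator covers the whole test split, so it is lower.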
