Let's start with your project: 

Are you a data scientist? 

I think you are an awesome data scientist.

### **Problem** 
**Our goal is to create a predictive model that can answer the following question:**

**What kind of people had a better chance of surviving?**

**Data about passengers:**
*   Name
*   Age
*   Gender.


## Install and Import Libraries
Let's install PySpark:

In [None]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.2.0.tar.gz (281.3 MB)
Collecting py4j==0.10.9.2
  Downloading py4j-0.10.9.2-py2.py3-none-any.whl (198 kB)
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... done
  Created wheel for pyspark: filename=pyspark-3.2.0-py2.py3-none-any.whl size=281805912 sha256=f91b72bdeab689959b5ff323a955c0c2e59df433e3b297c0a24690b1871be53c
  Stored in directory: /root/.cache/pip/wheels/0b/de/d2/9be5d59d7331c6c2a7c1b6d1a4f463ce107332b1ecd4e80718
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.2 pyspark-3.2.0


## Build Spark Session

In [None]:
# importing libraries
from pyspark import SparkContext
from pyspark.sql import SparkSession

In [None]:
spark = SparkSession.builder.appName('Lab01').getOrCreate()

## Data Loading


You have two datasets to read:
* Train
* Test



In [None]:
# read test dataset
df_test = spark.read.csv("test.csv",header=True,inferSchema=True)

In [None]:
# read train dataset
df_train = spark.read.csv("train.csv",header=True,inferSchema=True)

Let's look at both datasets before working with the train dataset:

**Confirm that each one is a DataFrame:**

In [None]:
# Type of test dataset
type(df_test)

pyspark.sql.dataframe.DataFrame

In [None]:
# Type of train dataset
type(df_train)

pyspark.sql.dataframe.DataFrame

**Show 5 rows.**

In [None]:
# Show 5 rows of the test dataset
df_test.show(5)

+-----------+--------+------+--------------------+------+----+-----+-----+--------+-------+-----+--------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|  Ticket|   Fare|Cabin|Embarked|
+-----------+--------+------+--------------------+------+----+-----+-----+--------+-------+-----+--------+
|          1|       1|     1|Goldenberg, Mr. S...|  male|49.0|    1|    0|   17453|89.1042|  C92|       C|
|          2|       0|     3| Peduzzi, Mr. Joseph|  male|null|    0|    0|A/5 2817|   8.05| null|       S|
|          3|       1|     3|  Jalsevac, Mr. Ivan|  male|29.0|    0|    0|  349240| 7.8958| null|       C|
|          4|       0|     1|Millet, Mr. Franc...|  male|65.0|    0|    0|   13509|  26.55|  E38|       S|
|          5|       1|     1|Kenyon, Mrs. Fred...|female|null|    1|    0|   17464|51.8625|  D21|       S|
+-----------+--------+------+--------------------+------+----+-----+-----+--------+-------+-----+--------+
only showing top 5 rows



In [None]:
# Show 5 rows of the train dataset
df_train.show(5)

+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|          Ticket|   Fare|Cabin|Embarked|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|          1|       0|     3|Braund, Mr. Owen ...|  male|22.0|    1|    0|       A/5 21171|   7.25| null|       S|
|          2|       1|     1|Cumings, Mrs. Joh...|female|38.0|    1|    0|        PC 17599|71.2833|  C85|       C|
|          3|       1|     3|Heikkinen, Miss. ...|female|26.0|    0|    0|STON/O2. 3101282|  7.925| null|       S|
|          4|       1|     1|Futrelle, Mrs. Ja...|female|35.0|    1|    0|          113803|   53.1| C123|       S|
|          5|       0|     3|Allen, Mr. Willia...|  male|35.0|    0|    0|          373450|   8.05| null|       S|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
only showing top 5 rows

**Display schema for the dataset:**

In [None]:
#schema of test dataset
df_test.printSchema()

root
 |-- PassengerId: integer (nullable = true)
 |-- Survived: integer (nullable = true)
 |-- Pclass: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- SibSp: integer (nullable = true)
 |-- Parch: integer (nullable = true)
 |-- Ticket: string (nullable = true)
 |-- Fare: double (nullable = true)
 |-- Cabin: string (nullable = true)
 |-- Embarked: string (nullable = true)



In [None]:
#schema of train dataset
df_train.printSchema()

root
 |-- PassengerId: integer (nullable = true)
 |-- Survived: integer (nullable = true)
 |-- Pclass: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- SibSp: integer (nullable = true)
 |-- Parch: integer (nullable = true)
 |-- Ticket: string (nullable = true)
 |-- Fare: double (nullable = true)
 |-- Cabin: string (nullable = true)
 |-- Embarked: string (nullable = true)



**Statistical summary:**

In [None]:
# Summary of test dataset
df_test.summary().show()

+-------+------------------+------------------+------------------+--------------------+------+------------------+-----------------+------------------+------------------+-----------------+-----+--------+
|summary|       PassengerId|          Survived|            Pclass|                Name|   Sex|               Age|            SibSp|             Parch|            Ticket|             Fare|Cabin|Embarked|
+-------+------------------+------------------+------------------+--------------------+------+------------------+-----------------+------------------+------------------+-----------------+-----+--------+
|  count|               438|               438|               438|                 438|   438|               350|              438|               438|               438|              438|  104|     437|
|   mean|             219.5|0.3721461187214612|2.2831050228310503|                null|  null|30.855485714285713| 0.45662100456621| 0.365296803652968|228762.85970149253|32.04006803652967| 

In [None]:
# Summary of train dataset
df_train.summary().show()

+-------+-----------------+-------------------+------------------+--------------------+------+------------------+------------------+-------------------+------------------+-----------------+-----+--------+
|summary|      PassengerId|           Survived|            Pclass|                Name|   Sex|               Age|             SibSp|              Parch|            Ticket|             Fare|Cabin|Embarked|
+-------+-----------------+-------------------+------------------+--------------------+------+------------------+------------------+-------------------+------------------+-----------------+-----+--------+
|  count|              891|                891|               891|                 891|   891|               714|               891|                891|               891|              891|  204|     889|
|   mean|            446.0| 0.3838383838383838| 2.308641975308642|                null|  null| 29.69911764705882|0.5230078563411896|0.38159371492704824|260318.54916792738| 32.20420

## EDA - Exploratory Data Analysis

**Display count for the train dataset:**

In [None]:
total_count=df_train.count()
total_count

891

**Can you answer this question:** 

**How many people survived, and how many didn't survive?** 

**Please save data in a variable.**

In [None]:
survived_count = df_train[df_train['Survived']==1].count()
unsurvive_count = df_train[df_train['Survived']==0].count()

**Display your result:**

In [None]:
print("num of people survived: ", survived_count)
print("num of people didn't survive: ", unsurvive_count)

num of people survived:  342
num of people didn't survive:  549


In [None]:
survived_df = df_train.groupby('Survived').count()
survived_df.show()

+--------+-----+
|Survived|count|
+--------+-----+
|       1|  342|
|       0|  549|
+--------+-----+



**Can you display your answer in ratio form? (Hint: Use a "UDF" function. This is only a hint; you can use any method.)**






In [None]:
from pyspark.sql.functions import udf
from pyspark.sql.types import *
from pyspark.sql import functions as F

In [None]:
survived_UDF = udf(lambda x: x/total_count, DoubleType())  # return a double; udf() defaults to StringType

In [None]:
survived_df.withColumn("Survived col", survived_UDF(F.col("count"))) \
  .show(truncate=False)

+--------+-----+------------------+
|Survived|count|Survived col      |
+--------+-----+------------------+
|1       |342  |0.3838383838383838|
|0       |549  |0.6161616161616161|
+--------+-----+------------------+
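As a quick sanity check on the ratios above, the same arithmetic can be reproduced in plain Python without Spark. This is only a sketch: the counts are copied by hand from the `groupby('Survived')` output, and the names `counts` and `ratios` are illustrative.

```python
# Counts copied from the groupby('Survived') output above.
counts = {1: 342, 0: 549}
total = sum(counts.values())  # should equal total_count (891)

# Share of each outcome among all passengers.
ratios = {label: n / total for label, n in counts.items()}

for label, ratio in ratios.items():
    print(f"Survived={label}: {ratio:.4f}")
```

The two ratios add up to 1, which is a cheap way to confirm that no rows were lost in the grouping.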



**Can you get the number of males and females?**


In [None]:
Sex_df = df_train.groupby('Sex').count()
Sex_df.show()

+------+-----+
|   Sex|count|
+------+-----+
|female|  314|
|  male|  577|
+------+-----+



In [None]:
males_count = df_train[df_train['Sex']=='male'].count()
females_count = df_train[df_train['Sex']=='female'].count()

In [None]:
print("num of people males: ", males_count)
print("num of people females: ", females_count)

num of people males:  577
num of people females:  314


**1. What is the average survival rate for each gender?**

**2. What is the number of survivors of each gender?**

(Hint: Group by the "Sex" column. This is only a hint; you can use any method.)

In [None]:
avgSurvives_df=df_train.select('Survived', 'Sex').groupBy('Sex').avg('Survived')
avgSurvives_df.show()

+------+-------------------+
|   Sex|      avg(Survived)|
+------+-------------------+
|female| 0.7420382165605095|
|  male|0.18890814558058924|
+------+-------------------+



In [None]:
countSurvives_df = df_train.groupby('Sex','Survived').count()
countSurvives_df.show()

+------+--------+-----+
|   Sex|Survived|count|
+------+--------+-----+
|  male|       0|  468|
|female|       1|  233|
|female|       0|   81|
|  male|       1|  109|
+------+--------+-----+
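Because `Survived` only holds 0 or 1, its average per gender is the same as survivors divided by passengers of that gender. A small plain-Python check, assuming the counts are copied by hand from the table above (the names `counts` and `rates` are illustrative):

```python
# (sex, survived) -> count, copied from the groupby('Sex', 'Survived') output above.
counts = {
    ("male", 0): 468,
    ("female", 1): 233,
    ("female", 0): 81,
    ("male", 1): 109,
}

rates = {}
for sex in ("female", "male"):
    survivors = counts[(sex, 1)]
    passengers = survivors + counts[(sex, 0)]
    # Mean of a 0/1 column == share of ones.
    rates[sex] = survivors / passengers

for sex, rate in rates.items():
    print(f"{sex}: {rate:.4f}")
```

The results should agree with the `avg(Survived)` column shown earlier, which ties the two questions together.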



**Create a temporary view in PySpark:**

In [None]:
df_train.createOrReplaceTempView("df_view")


**How many people survived, and how many didn't survive? By SQL:**

In [None]:
spark.sql("select Survived , count(Survived) as Count_Survived from df_view group by Survived ") \
     .show(truncate=False)

+--------+--------------+
|Survived|Count_Survived|
+--------+--------------+
|1       |342           |
|0       |549           |
+--------+--------------+



**Can you display the number of survivors from each gender as a ratio?**

(Hint: Group by the "Sex" column. This is only a hint; you can use any method.)

**Can you do this via SQL?**

In [None]:
spark.sql("select Sex , avg(Survived) from df_view group by Sex") \
     .show(truncate=False) 

+------+-------------------+
|Sex   |avg(Survived)      |
+------+-------------------+
|female|0.7420382165605095 |
|male  |0.18890814558058924|
+------+-------------------+



**Display the survival ratio for each "Pclass": SUM(Survived)/COUNT(Survived) per class.**


In [None]:
spark.sql('SELECT Pclass, sum(Survived)/count(Survived) as pclass_ratio FROM df_view  GROUP BY Pclass ORDER BY Pclass') \
     .show(truncate=False)

+------+-------------------+
|Pclass|pclass_ratio       |
+------+-------------------+
|1     |0.6296296296296297 |
|2     |0.47282608695652173|
|3     |0.24236252545824846|
+------+-------------------+



**Let's take a break and continue after this.**

## Data Cleaning

**First and foremost, we must merge both the train and test datasets. (Hint: The union function can do this.)**



In [None]:
df_merge = df_train.union(df_test)
df_merge.show(5)

+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|          Ticket|   Fare|Cabin|Embarked|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|          1|       0|     3|Braund, Mr. Owen ...|  male|22.0|    1|    0|       A/5 21171|   7.25| null|       S|
|          2|       1|     1|Cumings, Mrs. Joh...|female|38.0|    1|    0|        PC 17599|71.2833|  C85|       C|
|          3|       1|     3|Heikkinen, Miss. ...|female|26.0|    0|    0|STON/O2. 3101282|  7.925| null|       S|
|          4|       1|     1|Futrelle, Mrs. Ja...|female|35.0|    1|    0|          113803|   53.1| C123|       S|
|          5|       0|     3|Allen, Mr. Willia...|  male|35.0|    0|    0|          373450|   8.05| null|       S|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
only showing top 5 rows

**Display count:**

In [None]:
print("# of rows in dataset after merge: ",df_merge.count())

# of rows in dataset after merge:  1329


**Can you find the number of null values in each column?**


In [None]:
null_count=df_merge.select([F.count(F.when(F.col(c).isNull(), c)).alias(c) for c in df_merge.columns])

In [None]:
null_count.show()

+-----------+--------+------+----+---+---+-----+-----+------+----+-----+--------+
|PassengerId|Survived|Pclass|Name|Sex|Age|SibSp|Parch|Ticket|Fare|Cabin|Embarked|
+-----------+--------+------+----+---+---+-----+-----+------+----+-----+--------+
|          0|       0|     0|   0|  0|265|    0|    0|     0|   0| 1021|       3|
+-----------+--------+------+----+---+---+-----+-----+------+----+-----+--------+



**Create Dataframe for null values**

1. Column
2. Number of missing values.

In [None]:
null_count.createOrReplaceTempView("null_count_view")


In [None]:
spark.sql('select Age,Cabin,Embarked from null_count_view').show()

+---+-----+--------+
|Age|Cabin|Embarked|
+---+-----+--------+
|265| 1021|       3|
+---+-----+--------+
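The task above asks for a two-column layout (column name, number of missing values), while the query returns one wide row. One possible way to reshape it is sketched below in plain Python; the dictionary is copied by hand from the `null_count.show()` output, and `null_counts` and `missing_rows` are illustrative names.

```python
# One-row result copied from null_count.show() above (column -> number of nulls).
null_counts = {
    "PassengerId": 0, "Survived": 0, "Pclass": 0, "Name": 0, "Sex": 0, "Age": 265,
    "SibSp": 0, "Parch": 0, "Ticket": 0, "Fare": 0, "Cabin": 1021, "Embarked": 3,
}

# Turn the wide row into (column, missing) pairs, keep only columns that
# actually have missing values, and sort with the largest count first.
missing_rows = sorted(
    ((col, n) for col, n in null_counts.items() if n > 0),
    key=lambda pair: pair[1],
    reverse=True,
)

for col, n in missing_rows:
    print(f"{col:<10}{n:>6}")
```

If a Spark DataFrame is preferred, a list of pairs like this could be passed to `spark.createDataFrame(missing_rows, ["Column", "Missing"])`.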



## Preprocessing 

**Create a temporary view in PySpark:**

In [None]:
df_merge.createOrReplaceTempView("df_merge_view")

**Can you show the "name" column from your temporary table?**

In [None]:
spark.sql('select  PassengerId,Name from df_merge_view').show(5)

+-----------+--------------------+
|PassengerId|                Name|
+-----------+--------------------+
|          1|Braund, Mr. Owen ...|
|          2|Cumings, Mrs. Joh...|
|          3|Heikkinen, Miss. ...|
|          4|Futrelle, Mrs. Ja...|
|          5|Allen, Mr. Willia...|
+-----------+--------------------+
only showing top 5 rows



**Run this code:**

In [None]:
import pyspark.sql.functions as F
combined = df_merge.withColumn('Title', F.regexp_extract(F.col("Name"), r"([A-Za-z]+)\.", 1))
combined.createOrReplaceTempView('combined')
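To see what the pattern `([A-Za-z]+)\.` captures, here is a small plain-Python sketch using the standard `re` module. It assumes Python and Spark agree on this simple pattern; `extract_title` is an illustrative helper, and "Doe, Mrs. Jane" is a made-up name used only for the example.

```python
import re

# Same pattern as in the regexp_extract call: letters immediately followed by a period.
TITLE_PATTERN = re.compile(r"([A-Za-z]+)\.")

def extract_title(name):
    """Return the first run of letters that ends with a period, or "" if there is none."""
    match = TITLE_PATTERN.search(name)
    # regexp_extract yields an empty string when nothing matches, so mimic that here.
    return match.group(1) if match else ""

names = [
    "Peduzzi, Mr. Joseph",   # from the test dataset shown earlier
    "Jalsevac, Mr. Ivan",    # from the test dataset shown earlier
    "Doe, Mrs. Jane",        # hypothetical name
    "No title here",         # hypothetical name without a title
]
titles = [extract_title(n) for n in names]
print(titles)
```

Group 1 is the part inside the parentheses, so the trailing period is not included in the extracted title.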

**Display "Title" column and count "Title" column:**

In [None]:
spark.sql('select Title, count(Title) from combined group by Title').show()

+--------+------------+
|   Title|count(Title)|
+--------+------------+
|     Don|           1|
|    Miss|         257|
|Countess|           2|
|     Col|           4|
|     Rev|           9|
|    Lady|           2|
|  Master|          56|
|     Mme|           1|
|    Capt|           2|
|      Mr|         786|
|      Dr|          11|
|     Mrs|         186|
|     Sir|           2|
|Jonkheer|           2|
|    Mlle|           4|
|   Major|           3|
|      Ms|           1|
+--------+------------+



**We can see that Dr, Rev, Major, Col, Mlle, Capt, Don, Jonkheer, Countess, Ms, Sir, Lady, and Mme are rare titles, so create a dictionary that maps each of them to "rare" and keeps the common titles unchanged.**

In [None]:
titles_map = {'Don': 'rare', 'Miss': 'Miss', 'Countess': 'rare','Col':'rare', 'Rev':'rare', 'Lady':'rare', 'Master':'Master', 'Mme':'rare', 'Capt':'rare', 'Mr':'Mr', 'Dr':'rare', 'Mrs':'Mrs', 'Sir':'rare', 'Jonkheer':'rare', 'Mlle':'rare', 'Major':'rare','Ms':'rare'  }
print(titles_map)

{'Don': 'rare', 'Miss': 'Miss', 'Countess': 'rare', 'Col': 'rare', 'Rev': 'rare', 'Lady': 'rare', 'Master': 'Master', 'Mme': 'rare', 'Capt': 'rare', 'Mr': 'Mr', 'Dr': 'rare', 'Mrs': 'Mrs', 'Sir': 'rare', 'Jonkheer': 'rare', 'Mlle': 'rare', 'Major': 'rare', 'Ms': 'rare'}


**Run the function:**

In [None]:
def impute_title(title):
    return titles_map[title]  # titles_map is the dictionary defined above; replace the name if yours differs.
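A plain dictionary lookup raises `KeyError` for any title that is not in the dictionary, which can happen if new data contains a title never seen before. A minimal sketch of a more forgiving variant follows; `impute_title_safe`, `COMMON_TITLES` and the title "Dona" are assumptions made for illustration, not part of the notebook.

```python
# Titles that the dictionary above keeps unchanged; every other title maps to "rare".
COMMON_TITLES = {"Mr", "Mrs", "Miss", "Master"}

def impute_title_safe(title, default="rare"):
    """Keep common titles; map anything else (including unseen or empty titles) to default."""
    return title if title in COMMON_TITLES else default

print(impute_title_safe("Mr"))     # common title, kept as-is
print(impute_title_safe("Dr"))     # rare title from the table above
print(impute_title_safe("Dona"))   # hypothetical unseen title
```

For the seventeen titles listed above this gives the same result as the dictionary, so it could be wrapped in `udf(...)` in the same way.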

**Apply the function on "Title" column using UDF:**

In [None]:
impute_titleUDF = udf(lambda z:impute_title(z)) 

In [None]:
combined.select(
    impute_titleUDF(F.col("Title")).alias("Title") ) \
   .show(truncate=False)

+------+
|Title |
+------+
|Mr    |
|Mrs   |
|Miss  |
|Mrs   |
|Mr    |
|Mr    |
|Mr    |
|Master|
|Mrs   |
|Mrs   |
|Miss  |
|Miss  |
|Mr    |
|Mr    |
|Miss  |
|Mrs   |
|Master|
|Mr    |
|Mrs   |
|Mrs   |
+------+
only showing top 20 rows



**Display "Title" from table and group by "Title" column:**

In [None]:
spark.udf.register("convertUDF2", impute_title,StringType())
spark.sql("select Title, convertUDF2(Title) as impute_Title from combined group by Title") \
     .show(truncate=False)

+--------+------------+
|Title   |impute_Title|
+--------+------------+
|Don     |rare        |
|Miss    |Miss        |
|Countess|rare        |
|Col     |rare        |
|Rev     |rare        |
|Lady    |rare        |
|Master  |Master      |
|Mme     |rare        |
|Capt    |rare        |
|Mr      |Mr          |
|Dr      |rare        |
|Mrs     |Mrs         |
|Sir     |rare        |
|Jonkheer|rare        |
|Mlle    |rare        |
|Major   |rare        |
|Ms      |rare        |
+--------+------------+



In [None]:
spark.sql("SELECT Title,count(Title) FROM combined group by Title").show()

+--------+------------+
|   Title|count(Title)|
+--------+------------+
|     Don|           1|
|    Miss|         257|
|Countess|           2|
|     Col|           4|
|     Rev|           9|
|    Lady|           2|
|  Master|          56|
|     Mme|           1|
|    Capt|           2|
|      Mr|         786|
|      Dr|          11|
|     Mrs|         186|
|     Sir|           2|
|Jonkheer|           2|
|    Mlle|           4|
|   Major|           3|
|      Ms|           1|
+--------+------------+



## **Preprocessing Age**

**Based on the "age" column mean, you will fill in the missing age values:**

In [None]:
avg_age= combined.select(F.avg(combined.Age))
avg_age.show()


+------------------+
|          avg(Age)|
+------------------+
|30.079501879699244|
+------------------+



**Fill missing with "age" mean:**

In [None]:
mean_age = avg_age.first()[0]  # reuse the mean computed above instead of hard-coding it
combined = combined.na.fill({'Age': mean_age})
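The idea behind the fill step can be shown on a tiny list in plain Python. This is only an illustration: the five ages are the `Age` values from the first five test rows shown earlier, and the variable names are made up for the example.

```python
# Age values from the first five rows of the test dataset (None marks a missing age).
ages = [49.0, None, 29.0, 65.0, None]

# The mean is computed from the known values only, as avg() does in Spark.
known = [a for a in ages if a is not None]
mean_age = sum(known) / len(known)

# Replace every missing value with that mean.
filled = [mean_age if a is None else a for a in ages]
print(filled)
```

Mean imputation keeps the column average unchanged but shrinks its variance, which is worth keeping in mind when many values are missing.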

## **Preprocessing Embarked**

**Select "Embarked" column, count them, order by count Desc, and save in grouped_Embarked variable:**




In [None]:
grouped_Embarked=spark.sql("SELECT Embarked,count(Embarked) FROM combined group by Embarked order by count(Embarked) DESC")


**Show your "grouped_Embarked" variable:**

In [None]:
grouped_Embarked.show()

+--------+---------------+
|Embarked|count(Embarked)|
+--------+---------------+
|       S|            962|
|       C|            253|
|       Q|            111|
|    null|              0|
+--------+---------------+



**Get the max count from grouped_Embarked:**

In [None]:
max_groupped_Embarked= grouped_Embarked.select(F.max(F.col('count(Embarked)')))
max_groupped_Embarked.show()

+--------------------+
|max(count(Embarked))|
+--------------------+
|                 962|
+--------------------+



**Fill missing values with the most frequent value in grouped_Embarked, 'S':**

In [None]:
combined=combined.na.fill({'Embarked': 'S' })

## **Preprocessing Cabin**

**Replace each "Cabin" value with the first character of the string:**



In [None]:
def select_first(x):
    # Keep nulls as nulls; otherwise return the first character of the string.
    if x is None:
        return None
    return x[0]

replaceUDF = udf(select_first)

In [None]:
df_cabin=combined.withColumn("Cabin", replaceUDF(F.col("Cabin")))

**Show the result:**

**Create the temporary view:**

In [None]:
df_cabin.createOrReplaceTempView('df_cabin2')

**Select "Cabin" column, count "Cabin" column, Group by "Cabin" column, Order By count DESC**  

In [None]:
grouped_Cabin=spark.sql("SELECT Cabin,count(Cabin) FROM df_cabin2 group by Cabin order by count(Cabin) DESC")


**Fill missing values with "U":**

In [None]:
combined= combined.na.fill({'Cabin': 'U' })

**StringIndexer: A label indexer that maps a string column of labels to an ML column of label indices. If the input column is numeric, we cast it to string and index the string values. The indices are in [0, numLabels). By default, this is ordered by label frequencies so the most frequent label gets index 0. The ordering behavior is controlled by setting stringOrderType. Its default value is ‘frequencyDesc’.**

**StringIndexer(inputCol=None, outputCol=None)**
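To make the `frequencyDesc` ordering concrete, the sketch below mimics it in plain Python. It is not Spark code: the counts are copied by hand from the `grouped_Embarked` output, and the alphabetical tie-break is an assumption based on the documented Spark 3.x behaviour.

```python
# Label frequencies copied from the grouped_Embarked output above.
label_counts = {"C": 253, "Q": 111, "S": 962}

# Most frequent label first; ties (none here) are broken alphabetically.
ordered = sorted(label_counts, key=lambda label: (-label_counts[label], label))

# StringIndexer emits indices as doubles, so floats are used here as well.
label_to_index = {label: float(i) for i, label in enumerate(ordered)}
print(label_to_index)
```

The most frequent label, "S", receives index 0.0, which matches the description above.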

In [None]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer
combined.printSchema()

root
 |-- PassengerId: integer (nullable = true)
 |-- Survived: integer (nullable = true)
 |-- Pclass: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Age: double (nullable = false)
 |-- SibSp: integer (nullable = true)
 |-- Parch: integer (nullable = true)
 |-- Ticket: string (nullable = true)
 |-- Fare: double (nullable = true)
 |-- Cabin: string (nullable = false)
 |-- Embarked: string (nullable = false)
 |-- Title: string (nullable = true)



In [None]:

null_count=combined.select([F.count(F.when(F.col(c).isNull(), c)).alias(c) for c in combined.columns])
null_count.show()

+-----------+--------+------+----+---+---+-----+-----+------+----+-----+--------+-----+
|PassengerId|Survived|Pclass|Name|Sex|Age|SibSp|Parch|Ticket|Fare|Cabin|Embarked|Title|
+-----------+--------+------+----+---+---+-----+-----+------+----+-----+--------+-----+
|          0|       0|     0|   0|  0|  0|    0|    0|     0|   0|    0|       0|    0|
+-----------+--------+------+----+---+---+-----+-----+------+----+-----+--------+-----+



In [None]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import StringIndexer, OneHotEncoder
from pyspark.ml import Pipeline

In [None]:
trainDF, testDF = combined.randomSplit([.8,.2],seed=42)

In [None]:
categoricalCols = [field for (field, dataType) in trainDF.dtypes if dataType == "string"]
indexOutputCols = [x + "_Index" for x in categoricalCols]
oheOutputCols = [x + "_OHE" for x in categoricalCols]
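# Note: DataFrame.dtypes reports IntegerType as 'int' (and DoubleType as 'double'),
# so the 'integer' test below matches no columns and numericCols ends up empty.
numericCols = [field for (field, dataType) in trainDF.dtypes if dataType == 'integer' and field != 'Survived']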

In [None]:
stringIndexer = StringIndexer(inputCols=categoricalCols,
                             outputCols=indexOutputCols,
                             handleInvalid='skip')
oheEncoder = OneHotEncoder(inputCols=indexOutputCols,
                          outputCols=oheOutputCols)
assemblerInputs = oheOutputCols + numericCols


**OneHotEncoder(inputCols=None, outputCols=None)**

A one-hot encoder that maps a column of category indices to a column of binary vectors, with at most a single one-value per row that indicates the input category index. For example with 5 categories, an input value of 2.0 would map to an output vector of [0.0, 0.0, 1.0, 0.0]. The last category is not included by default (configurable via dropLast), because it makes the vector entries sum up to one, and hence linearly dependent. So an input value of 4.0 maps to [0.0, 0.0, 0.0, 0.0].
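The two examples in the description can be reproduced with a few lines of plain Python. This is a dense-list illustration only (Spark's encoder emits sparse vectors), and `one_hot` is a hypothetical helper, not a Spark API.

```python
def one_hot(index, num_categories, drop_last=True):
    """Dense one-hot vector for a category index (illustrative helper)."""
    i = int(index)
    if not 0 <= i < num_categories:
        raise ValueError(f"index {index} is outside [0, {num_categories})")
    # With drop_last, the vector has one slot fewer than there are categories.
    size = num_categories - 1 if drop_last else num_categories
    vector = [0.0] * size
    if i < size:  # the dropped last category stays all zeros
        vector[i] = 1.0
    return vector

print(one_hot(2.0, 5))                    # third category
print(one_hot(4.0, 5))                    # last category, dropped by default
print(one_hot(4.0, 5, drop_last=False))   # last category kept
```

With `drop_last=True` the last category is encoded as all zeros, so it stays distinguishable from the others while the vector entries no longer always sum to one.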

**VectorAssembler: VectorAssembler(*, inputCols=None, outputCol=None). A feature transformer that merges multiple columns into a vector column.**



In [None]:
vecAssembler = VectorAssembler(inputCols=assemblerInputs,outputCol='features')

**Use the randomSplit function to split the data into train and test sets with 80% and 20% of the rows, respectively.**

**Pipeline: ML Pipelines provide a uniform set of high-level APIs built on top of DataFrames that help users create and tune practical machine learning pipelines.**

**Build a RandomForestClassifier model, use the pipeline to fit and transform, then display the "prediction", "Survived", and "features" columns.**

In [None]:
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.feature import IndexToString, StringIndexer, VectorIndexer
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

rf = RandomForestClassifier(labelCol="Survived", featuresCol="features", numTrees=32)

In [None]:
pipeline =Pipeline(stages = [stringIndexer,oheEncoder,vecAssembler,rf])

In [None]:
pipelineModel = pipeline.fit(trainDF)

In [None]:
predDF = pipelineModel.transform(testDF)

In [None]:
predDF.select('features','Survived','prediction').show(5)

+--------------------+--------+----------+
|            features|Survived|prediction|
+--------------------+--------+----------+
|(1600,[600,811,11...|       0|       0.0|
|(1600,[533,1414,1...|       1|       1.0|
|(1600,[684,811,87...|       0|       0.0|
|(1600,[515,1225,1...|       1|       0.0|
|(1600,[366,811,11...|       1|       0.0|
+--------------------+--------+----------+
only showing top 5 rows



**Use MulticlassClassificationEvaluator and set the "labelCol" to "Survived",  "predictionCol" to "prediction", "metricName" to "accuracy"** 

In [None]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

evaluator = MulticlassClassificationEvaluator(
    labelCol="Survived", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predDF)
print("accuracy: ",accuracy)
print("Test Error = %g" % (1.0 - accuracy))

accuracy:  0.6713286713286714
Test Error = 0.328671
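The accuracy metric is simply the share of rows where the prediction equals the label. A plain-Python sketch on the five rows displayed by `predDF.show(5)` above (a tiny sample, so its value differs from the accuracy on the full test split; the variable names are illustrative):

```python
# Survived and prediction values copied from the five rows shown above.
labels = [0, 1, 0, 1, 1]
predictions = [0.0, 1.0, 0.0, 0.0, 0.0]

# Count the rows where the predicted class matches the true class.
correct = sum(1 for y, p in zip(labels, predictions) if y == p)
sample_accuracy = correct / len(labels)

print("accuracy on the 5 displayed rows:", sample_accuracy)
print("error on the 5 displayed rows = %g" % (1.0 - sample_accuracy))
```

Three of the five displayed rows are classified correctly; the evaluator applies the same calculation to every row of `predDF`.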


**When you are finished, send the project via Google Classroom.**
**Please let me know if you have any questions.**
* nabieh.mostafa@yahoo.com
* +201015197566 (Whatsapp)

**Don't hate me; I push you to learn.**

**I will help you to become an awesome data engineer.**

**Why did I say "Data Engineer"?**

**It is a tricky but optional question. If you would like to know the answer, ask me.**
