Let's start with your project: 

Are you a data scientist? 

I think you are an awesome data scientist.

### **Problem** 
**Our goal is to create a predictive model that can answer the following question:**

**What kind of people had a better chance of surviving?**

**Data about passengers:**
*   Name
*   Age
*   Gender.


## Install and Import Libraries
Let's set up PySpark. `findspark` locates the local Spark installation and makes `pyspark` importable:

In [5]:
import findspark
findspark.init()
from pyspark.sql import *
from pyspark.sql.functions import col,countDistinct,expr,concat,regexp_extract

## Build Spark Session

In [6]:
spark = (SparkSession
         .builder
         .appName('PracticalSession')
         .getOrCreate())

## Data Loading


You have two datasets: 
* Train  
* Test.

Read both datasets:



In [3]:
Train_df = spark.read.csv('train.csv',header=True,inferSchema=True)
Test_df  = spark.read.csv('test.csv',header=True,inferSchema=True) 

Let's work with train dataset:

**Confirm if this is a dataframe or not:**

In [4]:
print(type(Train_df))

<class 'pyspark.sql.dataframe.DataFrame'>


**Show 5 rows.**

In [5]:
Train_df.show(5)

+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|          Ticket|   Fare|Cabin|Embarked|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|          1|       0|     3|Braund, Mr. Owen ...|  male|22.0|    1|    0|       A/5 21171|   7.25| null|       S|
|          2|       1|     1|Cumings, Mrs. Joh...|female|38.0|    1|    0|        PC 17599|71.2833|  C85|       C|
|          3|       1|     3|Heikkinen, Miss. ...|female|26.0|    0|    0|STON/O2. 3101282|  7.925| null|       S|
|          4|       1|     1|Futrelle, Mrs. Ja...|female|35.0|    1|    0|          113803|   53.1| C123|       S|
|          5|       0|     3|Allen, Mr. Willia...|  male|35.0|    0|    0|          373450|   8.05| null|       S|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+

**Display schema for the dataset:**

In [6]:
Train_df.printSchema()

root
 |-- PassengerId: integer (nullable = true)
 |-- Survived: integer (nullable = true)
 |-- Pclass: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- SibSp: integer (nullable = true)
 |-- Parch: integer (nullable = true)
 |-- Ticket: string (nullable = true)
 |-- Fare: double (nullable = true)
 |-- Cabin: string (nullable = true)
 |-- Embarked: string (nullable = true)



**Statistical summary:**

In [7]:
Train_df.summary().toPandas().T

Unnamed: 0,0,1,2,3,4,5,6,7
summary,count,mean,stddev,min,25%,50%,75%,max
PassengerId,891,446.0,257.3538420152301,1,223,446,669,891
Survived,891,0.3838383838383838,0.48659245426485753,0,0,0,1,1
Pclass,891,2.308641975308642,0.8360712409770491,1,2,3,3,3
Name,891,,,"""Andersson, Mr. August Edvard (""""Wennerstrom"""")""",,,,"van Melkebeke, Mr. Philemon"
Sex,891,,,female,,,,male
Age,714,29.69911764705882,14.526497332334035,0.42,20.0,28.0,38.0,80.0
SibSp,891,0.5230078563411896,1.1027434322934315,0,0,0,1,8
Parch,891,0.38159371492704824,0.8060572211299488,0,0,0,0,6
Ticket,891,260318.54916792738,471609.26868834975,110152,19996.0,236171.0,347743.0,WE/P 5735


## EDA - Exploratory Data Analysis

**Display count for the train dataset:**

In [8]:
Train_df.count()


891

**Can you answer this question:** 

**How many people survived, and how many didn't survive?** 

**Please save data in a variable.**

In [9]:
Train_df.select('Survived').show()

+--------+
|Survived|
+--------+
|       0|
|       1|
|       1|
|       1|
|       0|
|       0|
|       0|
|       0|
|       1|
|       1|
|       1|
|       1|
|       0|
|       0|
|       0|
|       1|
|       0|
|       1|
|       0|
|       1|
+--------+
only showing top 20 rows



In [10]:

# Survived is an integer column, so compare against integers rather than strings
Survived = Train_df.filter(Train_df['Survived'] == 1).count()

In [11]:
Not_Survived = Train_df.filter(Train_df['Survived'] == 0).count()

**Display your result:**

In [12]:
# Reuse the counts saved above instead of running the filters again
print('Survived', Survived)
print('Not Survived', Not_Survived)

Survived 342
Not Survived 549


**Can you display your answer as a percentage? (Hint: Define your own helper function.)**






In [13]:
def ratio(x,a):
    return (x/a)*100
    

In [14]:
total = Train_df.count()  # count the rows once and reuse the result
print('Survived ratio ', ratio(Survived, total))
print('Not Survived ratio ', ratio(Not_Survived, total))



Survived ratio  38.38383838383838
Not Survived ratio  61.61616161616161
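
As a quick sanity check, the two percentages should add up to 100. The sketch below repeats the calculation in plain Python with the counts printed above (342 and 549); the variable names are only illustrative.

```python
def ratio(part, total):
    """Return `part` as a percentage of `total`."""
    return (part / total) * 100


# Counts taken from the output above
survived, not_survived = 342, 549
total = survived + not_survived  # 891 rows in the train dataset

survived_pct = ratio(survived, total)
not_survived_pct = ratio(not_survived, total)

print(round(survived_pct, 2), round(not_survived_pct, 2))
```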


**Can you get the number of males and females?**


In [15]:
male_num = Train_df.filter(Train_df['Sex']=='male').count()

print('male number is ',male_num)

female_num = Train_df.filter(Train_df['Sex']=='female').count()

print('female number is ',female_num)


male number is  577
female number is  314


**1. What is the average survival rate for each gender?**

**2. What is the number of survivors of each gender?**

(Hint: Group by the "sex" column.)

In [16]:
# show() returns None, so keep the DataFrame in the variable and display it separately
gender_sur = Train_df.groupBy("Sex","Survived").count()
gender_sur.show()

+------+--------+-----+
|   Sex|Survived|count|
+------+--------+-----+
|  male|       0|  468|
|female|       1|  233|
|female|       0|   81|
|  male|       1|  109|
+------+--------+-----+



In [17]:
# Rebuild the grouped counts by hand, keeping Survived and count as integers
cms = ["Sex", "Survived", "count"]
data = [("female", 0, 81),
        ("female", 1, 233),
        ("male", 1, 109),
        ("male", 0, 468)]
gender_sur = spark.createDataFrame(data=data, schema=cms)
gender_sur.show()

+------+--------+-----+
|   Sex|Survived|count|
+------+--------+-----+
|female|       0|   81|
|female|       1|  233|
|  male|       1|  109|
|  male|       0|  468|
+------+--------+-----+
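
The grouped counts answer question 2 directly. For question 1, the average of the 0/1 `Survived` column within each gender is simply survivors divided by group size. The following is a plain Python sketch using the counts from the table above; `survival_rate` is a hypothetical helper, not part of the notebook. In Spark, something like `Train_df.groupBy("Sex").avg("Survived")` should give the same figures.

```python
# (sex, survived) -> count, copied from the grouped output above
counts = {
    ("male", 0): 468,
    ("female", 1): 233,
    ("female", 0): 81,
    ("male", 1): 109,
}


def survival_rate(sex):
    """Fraction of passengers of the given sex who survived."""
    survived = counts[(sex, 1)]
    group_size = survived + counts[(sex, 0)]
    return survived / group_size


rates = {sex: survival_rate(sex) for sex in ("female", "male")}
print(rates)
```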



**Create temporary view PySpark:**

In [18]:
Train_df.createOrReplaceTempView('Train')

**How many people survived, and how many didn't survive? By SQL:**

In [19]:
Sur_Sql = spark.sql("""SELECT Survived 
          FROM Train 
          WHERE Survived == '1' 
          """).count()
Sur_Sql

342

In [20]:
Not_Sur_Sql = spark.sql("""SELECT Survived 
          FROM Train 
          WHERE Survived == '0' 
          """).count()
Not_Sur_Sql

549

**Can you display the number of survivors from each gender as a ratio?**

(Hint: Group by "sex" column.)

**Can you do this via SQL?**

In [21]:
# show() returns None, so keep the DataFrame in the variable and display it separately
survivors = spark.sql("""SELECT  Sex ,(SUM(Survived)/891)*100
                           FROM Train
                           WHERE Survived == '1'
                           GROUP BY Survived, Sex
                        """)
survivors.show()

+------+---------------------------------------------------------------------------------------------+
|   Sex|((CAST(sum(CAST(Survived AS BIGINT)) AS DOUBLE) / CAST(891 AS DOUBLE)) * CAST(100 AS DOUBLE))|
+------+---------------------------------------------------------------------------------------------+
|  male|                                                                             12.2334455667789|
|female|                                                                            26.15039281705948|
+------+---------------------------------------------------------------------------------------------+



**Display the survivor ratio for each passenger class ("Pclass"):**


In [22]:
# show() returns None, so keep the DataFrame in the variable and display it separately
p_class = spark.sql("""SELECT Pclass,(SUM(Survived) /891)*100
                        FROM Train 
                        WHERE Survived == '1'
                        GROUP BY Survived,Pclass
          """)
p_class.show()

+------+---------------------------------------------------------------------------------------------+
|Pclass|((CAST(sum(CAST(Survived AS BIGINT)) AS DOUBLE) / CAST(891 AS DOUBLE)) * CAST(100 AS DOUBLE))|
+------+---------------------------------------------------------------------------------------------+
|     2|                                                                            9.764309764309765|
|     1|                                                                            15.26374859708193|
|     3|                                                                            13.35578002244669|
+------+---------------------------------------------------------------------------------------------+



**Let's take a break and continue after this.**

## Data Cleaning

**First and foremost, we must merge both the train and test datasets. (Hint: The union function can do this.)**



In [23]:
df = Train_df.union(Test_df)


**Display count:**

In [24]:
df.count()

1329

**Temporary view PySpark:**

In [25]:
df.createOrReplaceTempView('Data')

**Can you find the number of null values in each column?**


In [26]:
null_colum = []
for i in df.columns:
    x = df.filter(col(i).isNull()).count()
    null_colum.append((i,x))
    
null_colum  



[('PassengerId', 0),
 ('Survived', 0),
 ('Pclass', 0),
 ('Name', 0),
 ('Sex', 0),
 ('Age', 265),
 ('SibSp', 0),
 ('Parch', 0),
 ('Ticket', 0),
 ('Fare', 0),
 ('Cabin', 1021),
 ('Embarked', 3)]

**Create Dataframe for null values**

1. Column
2. Number of missing values.

In [27]:
cms=['Column','Number of missing values']

df_null = spark.createDataFrame(data=null_colum,schema=cms)
df_null.show()

+-----------+------------------------+
|     Column|Number of missing values|
+-----------+------------------------+
|PassengerId|                       0|
|   Survived|                       0|
|     Pclass|                       0|
|       Name|                       0|
|        Sex|                       0|
|        Age|                     265|
|      SibSp|                       0|
|      Parch|                       0|
|     Ticket|                       0|
|       Fare|                       0|
|      Cabin|                    1021|
|   Embarked|                       3|
+-----------+------------------------+
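
The loop above asks one question per column: how many rows have no value? The same logic in plain Python, on a few made-up rows (the third row is invented for illustration), looks roughly like this. `count_missing` is a hypothetical helper.

```python
# Made-up sample rows; None plays the role of a Spark null
rows = [
    {"Age": 22.0, "Cabin": None, "Embarked": "S"},
    {"Age": 38.0, "Cabin": "C85", "Embarked": "C"},
    {"Age": None, "Cabin": None, "Embarked": None},
]


def count_missing(rows):
    """Return (column, number of missing values) pairs, one per column."""
    columns = list(rows[0].keys())
    return [(c, sum(1 for r in rows if r[c] is None)) for c in columns]


missing = count_missing(rows)
print(missing)
```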



## Preprocessing 

**Can you show me the name column from your temporary table?**

In [28]:
# show() returns None, so keep the DataFrame in the variable and display it separately
name = spark.sql("""SELECT Name
                        FROM Data 
                        
          """)
name.show(891,False)

+----------------------------------------------------------------------------------+
|Name                                                                              |
+----------------------------------------------------------------------------------+
|Braund, Mr. Owen Harris                                                           |
|Cumings, Mrs. John Bradley (Florence Briggs Thayer)                               |
|Heikkinen, Miss. Laina                                                            |
|Futrelle, Mrs. Jacques Heath (Lily May Peel)                                      |
|Allen, Mr. William Henry                                                          |
|Moran, Mr. James                                                                  |
|McCarthy, Mr. Timothy J                                                           |
|Palsson, Master. Gosta Leonard                                                    |
|Johnson, Mrs. Oscar W (Elisabeth Vilhelmina Berg)               

**Run this code:**

In [29]:
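# Raw string so that "\." reaches the regex engine unchanged; group 1 is the word before the first period
df = df.withColumn('Title',regexp_extract(col("Name"),r"([A-Za-z]+)\.",1))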
df.createOrReplaceTempView('Data')
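
To see what the pattern `([A-Za-z]+)\.` captures, here is a small sketch with Python's `re` module on names from the output above. It assumes Python's regex engine behaves like Spark's for this simple pattern; `extract_title` is a hypothetical helper that returns an empty string when nothing matches, which is also how `regexp_extract` reports a miss.

```python
import re

# One or more letters immediately followed by a literal period
TITLE_PATTERN = re.compile(r"([A-Za-z]+)\.")


def extract_title(name):
    """Return the first word that ends with a period, or '' if there is none."""
    match = TITLE_PATTERN.search(name)
    return match.group(1) if match else ""


names = [
    "Braund, Mr. Owen Harris",
    "Cumings, Mrs. John Bradley (Florence Briggs Thayer)",
    "Heikkinen, Miss. Laina",
    "Palsson, Master. Gosta Leonard",
]
titles = [extract_title(n) for n in names]
print(titles)
```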


**Display the "Title" column and count its rows:**

In [30]:
df.select('Title').show()

+------+
| Title|
+------+
|    Mr|
|   Mrs|
|  Miss|
|   Mrs|
|    Mr|
|    Mr|
|    Mr|
|Master|
|   Mrs|
|   Mrs|
|  Miss|
|  Miss|
|    Mr|
|    Mr|
|  Miss|
|   Mrs|
|Master|
|    Mr|
|   Mrs|
|   Mrs|
+------+
only showing top 20 rows



In [31]:
df.select('Title').count()

1329

**We can see that Dr, Rev, Major, Col, Mlle, Capt, Don, Jonkheer, Countess, Ms, Sir, Lady, and Mme are rare titles, so create a dictionary that maps each of them to "rare" and every other title to itself.**

In [32]:
tit = df.select('Title')

In [33]:
tit = spark.sql('select Title,count(Title) as Count from Data group by Title').collect()

In [34]:
print(tit)

[Row(Title='Don', Count=1), Row(Title='Miss', Count=257), Row(Title='Countess', Count=2), Row(Title='Col', Count=4), Row(Title='Rev', Count=9), Row(Title='Lady', Count=2), Row(Title='Master', Count=56), Row(Title='Mme', Count=1), Row(Title='Capt', Count=2), Row(Title='Mr', Count=786), Row(Title='Dr', Count=11), Row(Title='Mrs', Count=186), Row(Title='Sir', Count=2), Row(Title='Jonkheer', Count=2), Row(Title='Mlle', Count=4), Row(Title='Major', Count=3), Row(Title='Ms', Count=1)]


In [35]:
rare_tit = set(['Dr','Rev','Major','Col','Mlle','Capt','Don','Jonkheer','Countess','Ms','Sir','Lady','Mme'])

In [36]:
new_title = {t[0]: (t[0] if t[0] not in rare_tit else 'rare') for t in tit}
new_title

{'Don': 'rare',
 'Miss': 'Miss',
 'Countess': 'rare',
 'Col': 'rare',
 'Rev': 'rare',
 'Lady': 'rare',
 'Master': 'Master',
 'Mme': 'rare',
 'Capt': 'rare',
 'Mr': 'Mr',
 'Dr': 'rare',
 'Mrs': 'Mrs',
 'Sir': 'rare',
 'Jonkheer': 'rare',
 'Mlle': 'rare',
 'Major': 'rare',
 'Ms': 'rare'}

**Run the function:**

In [37]:
def impute_title(title):
    return new_title[title]
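
One caveat: `new_title[title]` raises `KeyError` for any title that is not a key of the dictionary, for example the empty string that `regexp_extract` returns when a name has no matching title. A more forgiving variant, sketched below in plain Python, falls back to "rare". `impute_title_safe` and the short `seen_titles` list are hypothetical and only serve the illustration.

```python
rare_tit = {"Dr", "Rev", "Major", "Col", "Mlle", "Capt", "Don",
            "Jonkheer", "Countess", "Ms", "Sir", "Lady", "Mme"}

# A few titles standing in for the ones collected from the data
seen_titles = ["Mr", "Mrs", "Miss", "Master", "Dr", "Rev", "Don"]
new_title = {t: ("rare" if t in rare_tit else t) for t in seen_titles}


def impute_title_safe(title):
    """Look up the title; anything unknown (or missing) is treated as 'rare'."""
    return new_title.get(title, "rare")


print([impute_title_safe(t) for t in ["Mr", "Dr", "Dona", ""]])
```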

**Apply the function on "Title" column using UDF:**

In [38]:
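# Import only the names used from here on; a wildcard import would shadow Python built-ins
from pyspark.sql.functions import udf, desc, substring
from pyspark.sql.types import StringType

imputeudf = udf(impute_title, StringType())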

**Display "Title" from table and group by "Title" column:**

In [39]:
df = df.withColumn('Title',imputeudf('Title'))

In [40]:
df.select('Title').groupBy('Title').count().show()


+------+-----+
| Title|count|
+------+-----+
|  rare|   44|
|  Miss|  257|
|Master|   56|
|    Mr|  786|
|   Mrs|  186|
+------+-----+



## **Preprocessing Age**

**Based on the age mean, you will fill in the missing age values:**

In [41]:
mean_age = spark.sql('select avg(Age) from Data ').collect()

In [42]:
mean_age[0][0]

30.079501879699244

**Fill missing age with age mean:**

In [43]:
df = df.na.fill({'Age':mean_age[0][0]})
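
The SQL `avg` skips nulls, so the mean is computed over the known ages only and then written into the gaps. A plain Python sketch of the same idea on made-up values:

```python
from statistics import mean

# Made-up ages; None plays the role of a Spark null
ages = [22.0, 38.0, None, 35.0, None]

# The mean is computed over the known values only
known_ages = [a for a in ages if a is not None]
mean_age = mean(known_ages)

# Every missing age receives that same mean
filled_ages = [mean_age if a is None else a for a in ages]
print(filled_ages)
```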

## **Preprocessing Embarked**

**Select Embarked, count them, order by count Desc, and save in grouped_Embarked variable:**




In [44]:
grouped_Embarked = df.select('Embarked').groupBy('Embarked').count().orderBy(desc('count'))

**Show grouped_Embarked:**

In [45]:
grouped_Embarked.show()

+--------+-----+
|Embarked|count|
+--------+-----+
|       S|  962|
|       C|  253|
|       Q|  111|
|    null|    3|
+--------+-----+



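**Get the most frequent value from grouped_Embarked:**

In [46]:
# Rows are ordered by count (descending), so the first non-null row holds the most frequent port
S = grouped_Embarked.na.drop().first()['Embarked']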

**Fill missing values with the most frequent "Embarked" value:**

In [47]:
df =  df.na.fill({'Embarked':S})
df.show()

+-----------+--------+------+--------------------+------+------------------+-----+-----+----------------+-------+-----+--------+------+
|PassengerId|Survived|Pclass|                Name|   Sex|               Age|SibSp|Parch|          Ticket|   Fare|Cabin|Embarked| Title|
+-----------+--------+------+--------------------+------+------------------+-----+-----+----------------+-------+-----+--------+------+
|          1|       0|     3|Braund, Mr. Owen ...|  male|              22.0|    1|    0|       A/5 21171|   7.25| null|       S|    Mr|
|          2|       1|     1|Cumings, Mrs. Joh...|female|              38.0|    1|    0|        PC 17599|71.2833|  C85|       C|   Mrs|
|          3|       1|     3|Heikkinen, Miss. ...|female|              26.0|    0|    0|STON/O2. 3101282|  7.925| null|       S|  Miss|
|          4|       1|     1|Futrelle, Mrs. Ja...|female|              35.0|    1|    0|          113803|   53.1| C123|       S|   Mrs|
|          5|       0|     3|Allen, Mr. Willia..

In [48]:
show_em = df.select('Embarked').groupBy('Embarked').count().orderBy(desc('count'))
show_em.show()

+--------+-----+
|Embarked|count|
+--------+-----+
|       S|  965|
|       C|  253|
|       Q|  111|
+--------+-----+
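
Filling a categorical column with its most frequent value (the mode) can be sketched in plain Python with `collections.Counter`. The sample values are made up.

```python
from collections import Counter

# Made-up embarkation ports; None plays the role of a Spark null
embarked = ["S", "C", "S", None, "Q", "S", None]

# Count only the known values, then take the most frequent one
port_counts = Counter(v for v in embarked if v is not None)
most_common_port, _ = port_counts.most_common(1)[0]

filled_embarked = [most_common_port if v is None else v for v in embarked]
print(most_common_port, filled_embarked)
```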



## **Preprocessing Cabin**

**Replace the "Cabin" column with the first character of its value:**



In [49]:
# substring positions are 1-based: start at position 1 and take 1 character
df = df.withColumn('Cabin',substring('Cabin',1,1))

**Show the result:**

In [50]:
df.show()

+-----------+--------+------+--------------------+------+------------------+-----+-----+----------------+-------+-----+--------+------+
|PassengerId|Survived|Pclass|                Name|   Sex|               Age|SibSp|Parch|          Ticket|   Fare|Cabin|Embarked| Title|
+-----------+--------+------+--------------------+------+------------------+-----+-----+----------------+-------+-----+--------+------+
|          1|       0|     3|Braund, Mr. Owen ...|  male|              22.0|    1|    0|       A/5 21171|   7.25| null|       S|    Mr|
|          2|       1|     1|Cumings, Mrs. Joh...|female|              38.0|    1|    0|        PC 17599|71.2833|    C|       C|   Mrs|
|          3|       1|     3|Heikkinen, Miss. ...|female|              26.0|    0|    0|STON/O2. 3101282|  7.925| null|       S|  Miss|
|          4|       1|     1|Futrelle, Mrs. Ja...|female|              35.0|    1|    0|          113803|   53.1|    C|       S|   Mrs|
|          5|       0|     3|Allen, Mr. Willia..

**Create the temporary view:**

In [51]:
df.createOrReplaceTempView('Data')

**Select "Cabin" column, count Cabin column, Group by "Cabin" column, Order By count DESC**  

In [52]:
spark.sql('select Cabin,count(Cabin) as CabinCount from Data group by Cabin order by CabinCount desc').show()

+-----+----------+
|Cabin|CabinCount|
+-----+----------+
|    C|        82|
|    B|        77|
|    D|        52|
|    E|        51|
|    A|        23|
|    F|        18|
|    G|         4|
|    T|         1|
| null|         0|
+-----+----------+



**Fill missing values with "U":**

In [53]:
df = df.na.fill({'Cabin':'U'})

In [54]:
df.show()

+-----------+--------+------+--------------------+------+------------------+-----+-----+----------------+-------+-----+--------+------+
|PassengerId|Survived|Pclass|                Name|   Sex|               Age|SibSp|Parch|          Ticket|   Fare|Cabin|Embarked| Title|
+-----------+--------+------+--------------------+------+------------------+-----+-----+----------------+-------+-----+--------+------+
|          1|       0|     3|Braund, Mr. Owen ...|  male|              22.0|    1|    0|       A/5 21171|   7.25|    U|       S|    Mr|
|          2|       1|     1|Cumings, Mrs. Joh...|female|              38.0|    1|    0|        PC 17599|71.2833|    C|       C|   Mrs|
|          3|       1|     3|Heikkinen, Miss. ...|female|              26.0|    0|    0|STON/O2. 3101282|  7.925|    U|       S|  Miss|
|          4|       1|     1|Futrelle, Mrs. Ja...|female|              35.0|    1|    0|          113803|   53.1|    C|       S|   Mrs|
|          5|       0|     3|Allen, Mr. Willia..

**StringIndexer: A label indexer that maps a string column of labels to an ML column of label indices. If the input column is numeric, we cast it to string and index the string values. The indices are in [0, numLabels). By default, this is ordered by label frequencies so the most frequent label gets index 0. The ordering behavior is controlled by setting stringOrderType. Its default value is ‘frequencyDesc’.**
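
The "frequencyDesc" ordering can be illustrated without Spark. The sketch below builds a label-to-index mapping from the "Embarked" counts shown earlier (S 965, C 253, Q 111). `fit_string_indexer` is a hypothetical helper, and breaking ties alphabetically is an assumption of this sketch.

```python
from collections import Counter


def fit_string_indexer(values):
    """Map each label to an index; the most frequent label gets 0.0."""
    counts = Counter(values)
    # Descending frequency; ties broken alphabetically (an assumption of this sketch)
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: float(i) for i, label in enumerate(ordered)}


embarked = ["S"] * 965 + ["C"] * 253 + ["Q"] * 111
mapping = fit_string_indexer(embarked)
print(mapping)
```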

In [55]:
from pyspark.ml.feature import StringIndexer

**StringIndexer(inputCol=None, outputCol=None)**

In [56]:
cat = [c[0] for c in df.dtypes if c[1] == 'string']
cat_indexed = [c + '_index' for c in cat]
cat, cat_indexed

(['Name', 'Sex', 'Ticket', 'Cabin', 'Embarked', 'Title'],
 ['Name_index',
  'Sex_index',
  'Ticket_index',
  'Cabin_index',
  'Embarked_index',
  'Title_index'])

In [57]:
numer = [c[0] for c in df.dtypes if c[1] != 'string']
numer

['PassengerId', 'Survived', 'Pclass', 'Age', 'SibSp', 'Parch', 'Fare']

In [58]:
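# Note: 'skip' drops rows whose label was not seen during fit. Because Name and Ticket
# are indexed too, rows with an unseen name or ticket are filtered out at transform time.
stringIndexer = StringIndexer(inputCols=cat, outputCols=cat_indexed, handleInvalid='skip')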

In [59]:
st = stringIndexer.fit(df).transform(df)
st.show()

+-----------+--------+------+--------------------+------+------------------+-----+-----+----------------+-------+-----+--------+------+------------+---------+--------------+-----------+-----------+----------+
|PassengerId|Survived|Pclass|                Name|   Sex|               Age|SibSp|Parch|          Ticket|   Fare|Cabin|Embarked| Title|Ticket_index|Sex_index|Embarked_index|Title_index|Cabin_index|Name_index|
+-----------+--------+------+--------------------+------+------------------+-----+-----+----------------+-------+-----+--------+------+------------+---------+--------------+-----------+-----------+----------+
|          1|       0|     3|Braund, Mr. Owen ...|  male|              22.0|    1|    0|       A/5 21171|   7.25|    U|       S|    Mr|       610.0|      0.0|           0.0|        0.0|        0.0|     525.0|
|          2|       1|     1|Cumings, Mrs. Joh...|female|              38.0|    1|    0|        PC 17599|71.2833|    C|       C|   Mrs|       641.0|      1.0|      

**Pipeline: ML Pipelines provide a uniform set of high-level APIs built on top of DataFrames that help users create and tune practical machine learning pipelines.**

____________________________________________

**Use a list comprehension and StringIndexer to convert the "Sex", "Embarked", "Title", and "Cabin" columns into indexed columns named column name + "_index", such as "Title_index":**

**Use Pipeline to fit and transform:**

In [60]:
from pyspark.ml import Pipeline

**Drop the "Sex", "PassengerId", "Name", "Title", "SibSp", "Ticket", "Cabin", and "Embarked" columns:**

In [61]:
df_ = df.drop('Sex','PassengerId','Name','Title','SibSp','Ticket','Cabin','Embarked')

**Convert to pandas**

In [62]:
df_pd = df_.toPandas()

**Display result**

In [63]:
df_pd

Unnamed: 0,Survived,Pclass,Age,Parch,Fare
0,0,3,22.000000,0,7.2500
1,1,1,38.000000,0,71.2833
2,1,3,26.000000,0,7.9250
3,1,1,35.000000,0,53.1000
4,0,3,35.000000,0,8.0500
...,...,...,...,...,...
1324,0,2,27.000000,0,13.0000
1325,1,1,19.000000,0,30.0000
1326,0,3,30.079502,2,23.4500
1327,1,1,26.000000,0,30.0000


**VectorAssembler: VectorAssembler(*, inputCols=None, outputCol=None) A feature transformer that merges multiple columns into a vector column.**



**Use VectorAssembler: set "inputCols" to the train columns (from the first column to the last) and "outputCol" to "features".**

In [64]:
from pyspark.ml.feature import VectorAssembler

**Use VectorAssembler: set "inputCols" to all test columns and "outputCol" to "features":**

In [65]:
df.select([countDistinct(c) for c in df.columns]).toPandas()

Unnamed: 0,count(DISTINCT PassengerId),count(DISTINCT Survived),count(DISTINCT Pclass),count(DISTINCT Name),count(DISTINCT Sex),count(DISTINCT Age),count(DISTINCT SibSp),count(DISTINCT Parch),count(DISTINCT Ticket),count(DISTINCT Fare),count(DISTINCT Cabin),count(DISTINCT Embarked),count(DISTINCT Title)
0,891,2,3,891,2,89,7,7,681,248,9,3,5


**Use the randomSplit function to split the data into x_train and x_test with 80% and 20% of the rows, respectively:**

In [66]:
features = ['Pclass', 'Age', 'SibSp', 'Parch', 'Fare'
,'Sex_index','Cabin_index','Embarked_index','Title_index']

In [67]:
vecAssem = VectorAssembler(inputCols=features,outputCol='features')

In [68]:
vecAssem.transform(st).show()

+-----------+--------+------+--------------------+------+------------------+-----+-----+----------------+-------+-----+--------+------+------------+---------+--------------+-----------+-----------+----------+--------------------+
|PassengerId|Survived|Pclass|                Name|   Sex|               Age|SibSp|Parch|          Ticket|   Fare|Cabin|Embarked| Title|Ticket_index|Sex_index|Embarked_index|Title_index|Cabin_index|Name_index|            features|
+-----------+--------+------+--------------------+------+------------------+-----+-----+----------------+-------+-----+--------+------+------------+---------+--------------+-----------+-----------+----------+--------------------+
|          1|       0|     3|Braund, Mr. Owen ...|  male|              22.0|    1|    0|       A/5 21171|   7.25|    U|       S|    Mr|       610.0|      0.0|           0.0|        0.0|        0.0|     525.0|(9,[0,1,2,4],[3.0...|
|          2|       1|     1|Cumings, Mrs. Joh...|female|              38.0|    
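
The value `(9,[0,1,2,4],[3.0...` in the "features" column is a sparse vector: its size, the positions of the non-zero entries, and their values. The plain Python sketch below rebuilds it for the first passenger from the values shown in the table; `assemble` and `to_sparse` are hypothetical helpers, not Spark functions.

```python
features = ['Pclass', 'Age', 'SibSp', 'Parch', 'Fare',
            'Sex_index', 'Cabin_index', 'Embarked_index', 'Title_index']

# First passenger, copied from the table above
row = {'Pclass': 3, 'Age': 22.0, 'SibSp': 1, 'Parch': 0, 'Fare': 7.25,
       'Sex_index': 0.0, 'Cabin_index': 0.0, 'Embarked_index': 0.0,
       'Title_index': 0.0}


def assemble(row, columns):
    """Merge the chosen columns into one dense list of floats."""
    return [float(row[c]) for c in columns]


def to_sparse(dense):
    """Return (size, indices of non-zero entries, non-zero values)."""
    indices = [i for i, v in enumerate(dense) if v != 0.0]
    values = [dense[i] for i in indices]
    return len(dense), indices, values


dense = assemble(row, features)
sparse = to_sparse(dense)
print(sparse)
```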

**Use RandomForestClassifier to fit and transform, then display the "prediction", "Survived", and "features" columns:**

In [69]:
x_train,x_test = df.randomSplit([0.8,0.2],42)

In [70]:
x_train.printSchema()

root
 |-- PassengerId: integer (nullable = true)
 |-- Survived: integer (nullable = true)
 |-- Pclass: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Age: double (nullable = false)
 |-- SibSp: integer (nullable = true)
 |-- Parch: integer (nullable = true)
 |-- Ticket: string (nullable = true)
 |-- Fare: double (nullable = true)
 |-- Cabin: string (nullable = false)
 |-- Embarked: string (nullable = false)
 |-- Title: string (nullable = true)



**Use MulticlassClassificationEvaluator and set the "labelCol" to "Survived",  "predictionCol" to "prediction", "metricName" to "accuracy"** 

In [71]:
from pyspark.ml.classification import RandomForestClassifier

In [72]:
rfc = RandomForestClassifier(featuresCol='features',labelCol='Survived')

In [73]:
stages = [stringIndexer,vecAssem,rfc]
pipeline = Pipeline(stages=stages)
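
Keep in mind that the `stringIndexer` stage was created with `handleInvalid='skip'` and also indexes "Name" and "Ticket". With "skip", rows whose label was not seen while fitting are dropped during transform, so the evaluated test set can be smaller than `x_test`. The plain Python sketch below imitates that behaviour on made-up ticket values; `fit_labels` and `transform_skip` are hypothetical helpers.

```python
def fit_labels(values):
    """Learn a label -> index mapping from the training values."""
    return {label: i for i, label in enumerate(sorted(set(values)))}


def transform_skip(rows, column, mapping):
    """Index `column`; rows whose label is not in `mapping` are dropped."""
    kept = []
    for row in rows:
        if row[column] in mapping:
            kept.append({**row, column + "_index": mapping[row[column]]})
    return kept


# Made-up train and test rows
train_rows = [{"Ticket": "113803"}, {"Ticket": "373450"}]
test_rows = [{"Ticket": "113803"}, {"Ticket": "PC 17599"}]

mapping = fit_labels(r["Ticket"] for r in train_rows)
kept_rows = transform_skip(test_rows, "Ticket", mapping)
print(len(test_rows), "test rows in,", len(kept_rows), "rows out")
```

If this matters for your evaluation, one option is to index only the columns that feed the "features" vector.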

In [74]:
pipelineModel = pipeline.fit(x_train)


In [75]:
predDF = pipelineModel.transform(x_test)

In [76]:
predDF.printSchema()

root
 |-- PassengerId: integer (nullable = true)
 |-- Survived: integer (nullable = true)
 |-- Pclass: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Age: double (nullable = false)
 |-- SibSp: integer (nullable = true)
 |-- Parch: integer (nullable = true)
 |-- Ticket: string (nullable = true)
 |-- Fare: double (nullable = true)
 |-- Cabin: string (nullable = false)
 |-- Embarked: string (nullable = false)
 |-- Title: string (nullable = true)
 |-- Ticket_index: double (nullable = false)
 |-- Sex_index: double (nullable = false)
 |-- Embarked_index: double (nullable = false)
 |-- Title_index: double (nullable = false)
 |-- Cabin_index: double (nullable = false)
 |-- Name_index: double (nullable = false)
 |-- features: vector (nullable = true)
 |-- rawPrediction: vector (nullable = true)
 |-- probability: vector (nullable = true)
 |-- prediction: double (nullable = false)



**Use the model to predict**

In [77]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator


In [78]:
rf_evaluator = MulticlassClassificationEvaluator(metricName='accuracy',labelCol='Survived',predictionCol='prediction')

In [79]:
rf_evaluator.evaluate(predDF)

0.8391608391608392
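
The "accuracy" metric is the share of rows where the prediction equals the label. A minimal plain Python sketch on made-up labels and predictions (`accuracy` is a hypothetical helper, not the Spark evaluator):

```python
def accuracy(labels, predictions):
    """Fraction of positions where the prediction equals the label."""
    if len(labels) != len(predictions):
        raise ValueError("labels and predictions must have the same length")
    correct = sum(1 for y, p in zip(labels, predictions) if y == p)
    return correct / len(labels)


# Made-up values: integer labels, float predictions as in the "prediction" column
labels = [0, 1, 1, 0]
predictions = [0.0, 1.0, 0.0, 0.0]

score = accuracy(labels, predictions)
print(score)
```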

In [86]:
import pyspark.sql.functions as F

In [80]:
prediction = pipelineModel.transform(x_test)

In [81]:
prediction.select('PassengerId').show()

+-----------+
|PassengerId|
+-----------+
|        464|
|        474|
|        477|
|        480|
|        485|
|        490|
|        497|
|        504|
|        505|
|        507|
|        512|
|        515|
|        518|
|        519|
|        537|
|        549|
|        555|
|        558|
|        572|
|        577|
+-----------+
only showing top 20 rows



In [87]:
prediction.select('PassengerId','prediction').where(F.col('PassengerId')==558).show()

+-----------+----------+
|PassengerId|prediction|
+-----------+----------+
|        558|       0.0|
+-----------+----------+



**When you are finished, send the project via Google Classroom.**
**Please let me know if you have any questions.**
* nabieh.mostafa@yahoo.com
* +201015197566 (Whatsapp)

**Don't hate me; I push you so that you learn.**

**I will help you to become an awesome data engineer.**

**Why did I say "data engineer"?**

**It is a tricky but optional question. If you would like to know the answer, ask me.**
