Let's start with your project: 

Are you a data scientist? 

I think you are an awesome data scientist.

### **Problem** 
**Our goal is to create a predictive model that can answer the following question:**

**What kind of people had a better chance of surviving?**

**Data about passengers:**
*   Name
*   Age
*   Gender.


## Install and Import Libraries
Let's install PySpark:

In [1]:
# pip install pyspark

## Build Spark Session

In [2]:
# importing libraries
from pyspark import SparkContext
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('Lab01').getOrCreate()

## Data Loading


You have two datasets to read:
* Train
* Test



## Train

In [3]:
path='/content/train.csv'
df_train=spark.read.csv(path, header="true", inferSchema="true")



## Test

In [4]:
path='/content/test (1).csv'
df_test=spark.read.csv(path, header="true", inferSchema="true")



Let's work with the train dataset:

**Confirm if this is a dataframe or not:**

In [5]:
print("df_train is {}".format(type(df_train)))
print("df_test  is {}".format(type(df_test)))

df_train is <class 'pyspark.sql.dataframe.DataFrame'>
df_test  is <class 'pyspark.sql.dataframe.DataFrame'>


**Show 5 rows.**

In [6]:
df_train.show(5)

+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|          Ticket|   Fare|Cabin|Embarked|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|          1|       0|     3|Braund, Mr. Owen ...|  male|22.0|    1|    0|       A/5 21171|   7.25| null|       S|
|          2|       1|     1|Cumings, Mrs. Joh...|female|38.0|    1|    0|        PC 17599|71.2833|  C85|       C|
|          3|       1|     3|Heikkinen, Miss. ...|female|26.0|    0|    0|STON/O2. 3101282|  7.925| null|       S|
|          4|       1|     1|Futrelle, Mrs. Ja...|female|35.0|    1|    0|          113803|   53.1| C123|       S|
|          5|       0|     3|Allen, Mr. Willia...|  male|35.0|    0|    0|          373450|   8.05| null|       S|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+

**Display schema for the dataset:**

In [7]:
df_train.printSchema()

root
 |-- PassengerId: integer (nullable = true)
 |-- Survived: integer (nullable = true)
 |-- Pclass: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- SibSp: integer (nullable = true)
 |-- Parch: integer (nullable = true)
 |-- Ticket: string (nullable = true)
 |-- Fare: double (nullable = true)
 |-- Cabin: string (nullable = true)
 |-- Embarked: string (nullable = true)



**Statistical summary:**

In [8]:
df_train.summary().show()

+-------+-----------------+-------------------+------------------+--------------------+------+------------------+------------------+-------------------+------------------+-----------------+-----+--------+
|summary|      PassengerId|           Survived|            Pclass|                Name|   Sex|               Age|             SibSp|              Parch|            Ticket|             Fare|Cabin|Embarked|
+-------+-----------------+-------------------+------------------+--------------------+------+------------------+------------------+-------------------+------------------+-----------------+-----+--------+
|  count|              891|                891|               891|                 891|   891|               714|               891|                891|               891|              891|  204|     889|
|   mean|            446.0| 0.3838383838383838| 2.308641975308642|                null|  null| 29.69911764705882|0.5230078563411896|0.38159371492704824|260318.54916792738| 32.20420

## EDA - Exploratory Data Analysis

**Display the row count for the train and test datasets:**

In [9]:
print("count of train dataset is {}".format(df_train.count()))
print("count of test  dataset is {}".format(df_test.count()))

count of train dataset is 891
count of test  dataset is 438


**Can you answer this question:** 

**How many people survived, and how many didn't survive?** 

**Please save data in a variable.**

In [10]:
life=df_train.groupBy("Survived").count()


**Display your result:**

In [11]:
life.show()

+--------+-----+
|Survived|count|
+--------+-----+
|       1|  342|
|       0|  549|
+--------+-----+



**Can you display your answer in ratio form? (Hint: Use a "UDF" function. This is only a hint; you can use any method.)**






In [12]:
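# Import udf and the column types (StringType is reused by a later cell)
from pyspark.sql.functions import udf
from pyspark.sql.types import DoubleType, StringType
from pyspark.sql import functions as F
total = df_train.count()  # 891 rows; avoids hard-coding the denominator
countUDF = udf(lambda z: z / total, DoubleType())  # DoubleType keeps the ratio numeric
life.select(countUDF(F.col("count")).alias("ratio")).show(truncate=False)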

+------------------+
|ratio             |
+------------------+
|0.3838383838383838|
|0.6161616161616161|
+------------------+



**Can you get the number of males and females?**


In [13]:
df_train.groupBy("Sex").count().show()

+------+-----+
|   Sex|count|
+------+-----+
|female|  314|
|  male|  577|
+------+-----+



**1. What is the survival rate (the average of "Survived") for each gender?**

**2. What is the number of survivors of each gender?**

(Hint: Group by the "Sex" column. This is only a hint; you can use any method.)

In [14]:
df_train.createOrReplaceTempView("train_data")
spark.sql("""SELECT count(*) as count ,Sex ,Survived
                FROM train_data
                GROUP BY Sex,Survived
          """).show()

+-----+------+--------+
|count|   Sex|Survived|
+-----+------+--------+
|  468|  male|       0|
|  233|female|       1|
|   81|female|       0|
|  109|  male|       1|
+-----+------+--------+
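The four counts in this table are enough to work out the survival rate per gender as well. The following is a minimal sketch in plain Python rather than Spark; the dictionary and the helper name `survival_rate_by_sex` are illustrative assumptions, and the numbers are copied from the output above.

```python
# Counts copied from the Sex/Survived table above: (sex, survived) -> passengers
counts = {
    ("male", 0): 468,
    ("female", 1): 233,
    ("female", 0): 81,
    ("male", 1): 109,
}

def survival_rate_by_sex(counts):
    """Return {sex: survivors / passengers} for each sex in the table."""
    totals, survivors = {}, {}
    for (sex, survived), n in counts.items():
        totals[sex] = totals.get(sex, 0) + n
        if survived == 1:
            survivors[sex] = survivors.get(sex, 0) + n
    return {sex: survivors.get(sex, 0) / totals[sex] for sex in totals}

rates = survival_rate_by_sex(counts)
for sex, rate in sorted(rates.items()):
    print(f"{sex}: {rate:.3f}")
```

With these counts, roughly 74% of the women and 19% of the men in the train dataset survived.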



**Create a temporary view in PySpark:**

In [15]:
df_train.createOrReplaceTempView("train_data")

**How many people survived, and how many didn't survive? By SQL:**

In [16]:
spark.sql("""SELECT count(Survived),Survived as Survived
                FROM train_data
                GROUP BY Survived
          """).show()

+---------------+--------+
|count(Survived)|Survived|
+---------------+--------+
|            342|       1|
|            549|       0|
+---------------+--------+



**Can you display the number of survivors from each gender as a ratio?**

(Hint: Group by the "Sex" column. This is only a hint; you can use any method.)

**Can you do this via SQL?**

In [17]:
spark.sql("""SELECT count(Sex) as count,Sex
                FROM train_data
                GROUP BY Sex
          """).show()

+-----+------+
|count|   Sex|
+-----+------+
|  314|female|
|  577|  male|
+-----+------+



**Display the survival ratio for each "Pclass": SUM(Survived) / COUNT(*) per class.**
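One possible shape for this query is sketched below. It runs against an in-memory SQLite table as a stand-in for the Spark temporary view, so it can be tried without a cluster; the toy rows are invented for illustration and are not the Titanic data. The same SELECT text is expected to carry over to `spark.sql(...)` on the `train_data` view, although that is an assumption to check in your own session.

```python
import sqlite3

# Toy rows (Pclass, Survived) invented for illustration; not the Titanic data.
rows = [(1, 1), (1, 1), (1, 0), (2, 1), (2, 0), (3, 0), (3, 0), (3, 1), (3, 0)]

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE train_data (Pclass INTEGER, Survived INTEGER)")
conn.executemany("INSERT INTO train_data VALUES (?, ?)", rows)

# Multiplying by 1.0 forces floating-point division in SQLite.
query = """
    SELECT Pclass,
           COUNT(*) AS passengers,
           1.0 * SUM(Survived) / COUNT(*) AS ratio
    FROM train_data
    GROUP BY Pclass
    ORDER BY Pclass
"""
pclass_ratios = conn.execute(query).fetchall()
for pclass, passengers, ratio in pclass_ratios:
    print(pclass, passengers, round(ratio, 3))
conn.close()
```

Because "Survived" is 0 or 1, the sum of the column is the number of survivors, so dividing by the row count gives the survival ratio of each class.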


**Let's take a break and continue after this.**

## Data Cleaning

**First and foremost, we must merge both the train and test datasets. (Hint: The union function can do this.)**



In [18]:
unionDF = df_train.union(df_test)


**Display count:**

In [19]:
unionDF.count()

1329

**Can you count the number of null values in each column?**


In [20]:
from pyspark.sql.functions import when, count, col, isnull
unionDF.select([count(when(isnull(c), c)).alias(c) for c in unionDF.columns]).show()

+-----------+--------+------+----+---+---+-----+-----+------+----+-----+--------+
|PassengerId|Survived|Pclass|Name|Sex|Age|SibSp|Parch|Ticket|Fare|Cabin|Embarked|
+-----------+--------+------+----+---+---+-----+-----+------+----+-----+--------+
|          0|       0|     0|   0|  0|265|    0|    0|     0|   0| 1021|       3|
+-----------+--------+------+----+---+---+-----+-----+------+----+-----+--------+



**Create a DataFrame of null values with two columns:**

1. Column
2. Number of missing values.
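A minimal sketch of the reshaping step, in plain Python: it turns the single wide row of null counts into one (column, missing) pair per column. The counts are copied from the output above, and the variable names are illustrative. The resulting list could then be handed to `spark.createDataFrame(missing_rows, ["Column", "Missing"])`, which is left out here so the sketch runs without a Spark session.

```python
# Null counts copied from the one-row output above (column -> missing values)
null_counts = {
    "PassengerId": 0, "Survived": 0, "Pclass": 0, "Name": 0, "Sex": 0,
    "Age": 265, "SibSp": 0, "Parch": 0, "Ticket": 0, "Fare": 0,
    "Cabin": 1021, "Embarked": 3,
}

# Reshape the wide row into long (column, missing) rows, largest count first,
# keeping only the columns that actually have missing values.
missing_rows = sorted(
    ((name, n) for name, n in null_counts.items() if n > 0),
    key=lambda pair: pair[1],
    reverse=True,
)

for name, n in missing_rows:
    print(f"{name:<10}{n:>6}")
```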

## Preprocessing 

**Create a temporary view in PySpark:**

In [21]:
unionDF.createOrReplaceTempView("union_data")

**Can you show the "name" column from your temporary table?**

In [22]:
spark.sql("""SELECT name
             FROM union_data
             
          """).show(truncate=False)

+-------------------------------------------------------+
|name                                                   |
+-------------------------------------------------------+
|Braund, Mr. Owen Harris                                |
|Cumings, Mrs. John Bradley (Florence Briggs Thayer)    |
|Heikkinen, Miss. Laina                                 |
|Futrelle, Mrs. Jacques Heath (Lily May Peel)           |
|Allen, Mr. William Henry                               |
|Moran, Mr. James                                       |
|McCarthy, Mr. Timothy J                                |
|Palsson, Master. Gosta Leonard                         |
|Johnson, Mrs. Oscar W (Elisabeth Vilhelmina Berg)      |
|Nasser, Mrs. Nicholas (Adele Achem)                    |
|Sandstrom, Miss. Marguerite Rut                        |
|Bonnell, Miss. Elizabeth                               |
|Saundercock, Mr. William Henry                         |
|Andersson, Mr. Anders Johan                            |
|Vestrom, Miss

**Run this code:**

In [23]:
import pyspark.sql.functions as F
# Raw string so that "\." reaches the regex engine as a literal dot
unionDF = unionDF.withColumn('Title', F.regexp_extract(F.col("Name"), r"([A-Za-z]+)\.", 1))
unionDF.createOrReplaceTempView('unionDF')
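To see what the pattern captures, here is a small sketch that applies the same regular expression with Python's `re` module to a few names from the output above. The helper `extract_title` is a hypothetical name used only for this illustration; like `regexp_extract`, it returns an empty string when nothing matches.

```python
import re

# Same pattern as the regexp_extract call: letters immediately followed by a dot
TITLE_PATTERN = re.compile(r"([A-Za-z]+)\.")

def extract_title(name):
    """Return the first title-like token in name, or "" when nothing matches."""
    match = TITLE_PATTERN.search(name)
    return match.group(1) if match else ""

names = [
    "Braund, Mr. Owen Harris",
    "Cumings, Mrs. John Bradley (Florence Briggs Thayer)",
    "Heikkinen, Miss. Laina",
    "Palsson, Master. Gosta Leonard",
]
titles = [extract_title(n) for n in names]
print(titles)
```

The surname is skipped because it is followed by a comma, not a dot, so the first match is the title.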

**Display "Title" column and count "Title" column:**

In [24]:
spark.sql("""SELECT count(*) as count,Title
             FROM unionDF
             GROUP BY Title
             ORDER BY count
          """).show()



+-----+--------+
|count|   Title|
+-----+--------+
|    1|     Don|
|    1|      Ms|
|    1|     Mme|
|    2|Jonkheer|
|    2|Countess|
|    2|    Capt|
|    2|     Sir|
|    2|    Lady|
|    3|   Major|
|    4|     Col|
|    4|    Mlle|
|    9|     Rev|
|   11|      Dr|
|   56|  Master|
|  186|     Mrs|
|  257|    Miss|
|  786|      Mr|
+-----+--------+



**Dr, Rev, Major, Col, Mlle, Capt, Don, Jonkheer, Countess, Ms, Sir, Lady, and Mme are rare titles, so create a dictionary that maps each of them to "rare" and every common title to itself.**

In [25]:
titles_map={'Dr':"rare", 
           'Rev':"rare",
           'Major':"rare", 
           'Col':"rare", 
           'Mlle':"rare", 
           'Capt':"rare", 
           'Don':"rare", 
           'Jonkheer':"rare", 
           'Countess':"rare", 
           'Ms':"rare", 
           'Sir':"rare", 
           'Lady':"rare",
           'Mme':"rare",
           'Master':'Master',
           'Mrs':'Mrs',
           'Mr':'Mr',
           'Miss':'Miss'}



**Run the function:**

In [26]:
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
@udf(returnType=StringType())
def impute_title(title):
    # titles_map is the dictionary defined above; rename it here if yours differs.
    return titles_map[title]
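A direct lookup raises `KeyError` for any title that is missing from the dictionary, including the empty string produced when the regex finds no match. The sketch below shows one way to make the lookup tolerant; the name `impute_title_safe`, the abbreviated dictionary, and the choice of "rare" as the fallback are all assumptions made for illustration.

```python
# Abbreviated copy of the title dictionary, for illustration only
titles_map_demo = {"Dr": "rare", "Rev": "rare", "Master": "Master",
                   "Mrs": "Mrs", "Mr": "Mr", "Miss": "Miss"}

def impute_title_safe(title, default="rare"):
    """Look up title, falling back to default for unknown or missing values."""
    if title is None:
        return default
    return titles_map_demo.get(title, default)

print(impute_title_safe("Mr"))    # listed title is kept
print(impute_title_safe("Dr"))    # listed rare title
print(impute_title_safe("Dona"))  # hypothetical unlisted title falls back
```

The body of such a function could be wrapped with the same `@udf(returnType=StringType())` decorator if the fallback behaviour is wanted in Spark.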

**Apply the function on "Title" column using UDF:**

In [27]:
# Import the helpers used below, then map each title through the UDF
from pyspark.sql.functions import udf
from pyspark.sql.types import *
from pyspark.sql import functions as F
unionDF=unionDF.withColumn("Title", impute_title(F.col("Title")))
unionDF.createOrReplaceTempView('unionDF')


**Display "Title" from table and group by "Title" column:**

In [28]:
spark.sql("""SELECT count(*) as count,Title
             FROM unionDF
             GROUP BY Title
             ORDER BY count
          """).show()

+-----+------+
|count| Title|
+-----+------+
|   44|  rare|
|   56|Master|
|  186|   Mrs|
|  257|  Miss|
|  786|    Mr|
+-----+------+



## **Preprocessing Age**

**Based on the "age" column mean, you will fill in the missing age values:**

In [29]:
from pyspark.sql.functions import mean as _mean
unionDF.select(_mean(col('Age'))).show()



+------------------+
|          avg(Age)|
+------------------+
|30.079501879699244|
+------------------+
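Mean imputation itself is simple arithmetic: average the known values, then substitute that average wherever a value is missing. A minimal sketch in plain Python, using a toy list of ages invented for illustration (the helper name `fill_with_mean` is also an assumption):

```python
def fill_with_mean(values):
    """Replace None entries with the mean of the non-missing entries."""
    known = [v for v in values if v is not None]
    mean = sum(known) / len(known)
    return [mean if v is None else v for v in values]

# Toy ages invented for illustration; None marks a missing value
ages = [22.0, 38.0, None, 35.0]
filled = fill_with_mean(ages)
print(filled)
```

The missing entry receives the mean of the three known ages, and the known ages are left unchanged.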



**Fill missing with "age" mean:**

In [47]:
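# Compute the mean instead of hard-coding it, then fill the missing ages
mean_age = unionDF.select(_mean(col('Age'))).first()[0]
unionDF = unionDF.na.fill(mean_age, subset=['Age'])
unionDF.show()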

+-----------+--------+------+--------------------+------+------------------+-----+-----+----------------+-------+-----+--------+------+
|PassengerId|Survived|Pclass|                Name|   Sex|               Age|SibSp|Parch|          Ticket|   Fare|Cabin|Embarked| Title|
+-----------+--------+------+--------------------+------+------------------+-----+-----+----------------+-------+-----+--------+------+
|          1|       0|     3|Braund, Mr. Owen ...|  male|              22.0|    1|    0|       A/5 21171|   7.25|    U|       S|    Mr|
|          2|       1|     1|Cumings, Mrs. Joh...|female|              38.0|    1|    0|        PC 17599|71.2833|    C|       C|   Mrs|
|          3|       1|     3|Heikkinen, Miss. ...|female|              26.0|    0|    0|STON/O2. 3101282|  7.925|    U|       S|  Miss|
|          4|       1|     1|Futrelle, Mrs. Ja...|female|              35.0|    1|    0|          113803|   53.1|    C|       S|   Mrs|
|          5|       0|     3|Allen, Mr. Willia..

## **Preprocessing Embarked**

**Select the "Embarked" column, count the rows per value, order by count descending, and save the result in the grouped_Embarked variable:**




In [48]:
grouped_Embarked=spark.sql("""SELECT count(*) as count,Embarked
             FROM unionDF
             GROUP BY Embarked
             ORDER BY count DESC
          """)

**Show your "grouped_Embarked" variable:**

In [49]:
grouped_Embarked.show()


+-----+--------+
|count|Embarked|
+-----+--------+
|  965|       S|
|  253|       C|
|  111|       Q|
+-----+--------+



**Get the most frequent value in grouped_Embarked:**
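The most frequent port is simply the value with the largest count. A tiny sketch in plain Python, with the counts copied from the grouped_Embarked output above (the variable names are illustrative):

```python
# Embarked counts copied from the grouped_Embarked output above
embarked_counts = {"S": 965, "Q": 111, "C": 253}

# The most frequent port is the key with the largest count
most_common_port = max(embarked_counts, key=embarked_counts.get)
print(most_common_port)
```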

**Fill missing values with max 'S' of grouped_Embarked:**

In [50]:
unionDF=unionDF.na.fill(value="S",subset=["Embarked"])
unionDF.createOrReplaceTempView('unionDF')
grouped_Embarked=spark.sql("""SELECT count(*) as count,Embarked
             FROM unionDF
             GROUP BY Embarked
             ORDER BY count DESC
          """)
grouped_Embarked.show()

+-----+--------+
|count|Embarked|
+-----+--------+
|  965|       S|
|  253|       C|
|  111|       Q|
+-----+--------+



## **Preprocessing Cabin**

**Replace the "Cabin" column with the first character of its string:**



In [51]:
# Keep only the first character of Cabin (the deck letter); substr positions are 1-based
unionDF = unionDF.withColumn("Cabin", unionDF.Cabin.substr(1, 1))

**Show the result:**

In [52]:
unionDF.show(5)

+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+-----+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|          Ticket|   Fare|Cabin|Embarked|Title|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+-----+
|          1|       0|     3|Braund, Mr. Owen ...|  male|22.0|    1|    0|       A/5 21171|   7.25|    U|       S|   Mr|
|          2|       1|     1|Cumings, Mrs. Joh...|female|38.0|    1|    0|        PC 17599|71.2833|    C|       C|  Mrs|
|          3|       1|     3|Heikkinen, Miss. ...|female|26.0|    0|    0|STON/O2. 3101282|  7.925|    U|       S| Miss|
|          4|       1|     1|Futrelle, Mrs. Ja...|female|35.0|    1|    0|          113803|   53.1|    C|       S|  Mrs|
|          5|       0|     3|Allen, Mr. Willia...|  male|35.0|    0|    0|          373450|   8.05|    U|       S|   Mr|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+-----+

**Create the temporary view:**

In [53]:
unionDF.createOrReplaceTempView('unionDF')

**Select the "Cabin" column, count the rows per value, group by "Cabin", and order by count descending:**

In [54]:
spark.sql("""SELECT count(*) as count,Cabin
             FROM unionDF
             GROUP BY Cabin
             ORDER BY count DESC
          """).show()

+-----+-----+
|count|Cabin|
+-----+-----+
| 1021|    U|
|   82|    C|
|   77|    B|
|   52|    D|
|   51|    E|
|   23|    A|
|   18|    F|
|    4|    G|
|    1|    T|
+-----+-----+



**Fill missing values with "U":**

In [55]:
unionDF=unionDF.na.fill(value="U",subset=["Cabin"])
unionDF.createOrReplaceTempView('unionDF')
spark.sql("""SELECT count(*) as count,Cabin
             FROM unionDF
             GROUP BY Cabin
             ORDER BY count DESC
          """).show()

+-----+-----+
|count|Cabin|
+-----+-----+
| 1021|    U|
|   82|    C|
|   77|    B|
|   52|    D|
|   51|    E|
|   23|    A|
|   18|    F|
|    4|    G|
|    1|    T|
+-----+-----+



In [56]:
unionDF.show()

+-----------+--------+------+--------------------+------+------------------+-----+-----+----------------+-------+-----+--------+------+
|PassengerId|Survived|Pclass|                Name|   Sex|               Age|SibSp|Parch|          Ticket|   Fare|Cabin|Embarked| Title|
+-----------+--------+------+--------------------+------+------------------+-----+-----+----------------+-------+-----+--------+------+
|          1|       0|     3|Braund, Mr. Owen ...|  male|              22.0|    1|    0|       A/5 21171|   7.25|    U|       S|    Mr|
|          2|       1|     1|Cumings, Mrs. Joh...|female|              38.0|    1|    0|        PC 17599|71.2833|    C|       C|   Mrs|
|          3|       1|     3|Heikkinen, Miss. ...|female|              26.0|    0|    0|STON/O2. 3101282|  7.925|    U|       S|  Miss|
|          4|       1|     1|Futrelle, Mrs. Ja...|female|              35.0|    1|    0|          113803|   53.1|    C|       S|   Mrs|
|          5|       0|     3|Allen, Mr. Willia..

**StringIndexer: A label indexer that maps a string column of labels to an ML column of label indices. If the input column is numeric, we cast it to string and index the string values. The indices are in [0, numLabels). By default, this is ordered by label frequencies so the most frequent label gets index 0. The ordering behavior is controlled by setting stringOrderType. Its default value is ‘frequencyDesc’.**

**StringIndexer(inputCol=None, outputCol=None)**
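The frequency-based ordering can be mimicked in a few lines of plain Python. This is only a sketch of the idea, not Spark's implementation: the helper name `frequency_desc_index`, the toy column, and the alphabetical tie-breaking are assumptions made for illustration.

```python
from collections import Counter

def frequency_desc_index(labels):
    """Map each label to an index, most frequent first (ties broken alphabetically)."""
    counts = Counter(labels)
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: float(i) for i, label in enumerate(ordered)}

# Toy Embarked column: S appears most often, then C, then Q
embarked = ["S", "C", "S", "Q", "S", "C"]
index_of = frequency_desc_index(embarked)
print(index_of)
print([index_of[v] for v in embarked])
```

The most frequent label receives index 0.0, matching the 'frequencyDesc' behaviour described above.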

In [78]:
from pyspark.ml.feature import StringIndexer, VectorAssembler,OneHotEncoder
from pyspark.ml import Pipeline

categoricalCols = [field for (field, dataType) in unionDF.dtypes
                   if dataType == "string"]

indexOutputCols = [x + "_Index" for x in categoricalCols]


stringIndexer = StringIndexer(inputCols=categoricalCols,
                             outputCols=indexOutputCols,
                             handleInvalid='skip')

**OneHotEncoder(inputCols=None, outputCols=None)**

A one-hot encoder that maps a column of category indices to a column of binary vectors, with at most a single one-value per row that indicates the input category index. For example with 5 categories, an input value of 2.0 would map to an output vector of [0.0, 0.0, 1.0, 0.0]. The last category is not included by default (configurable via dropLast), because it makes the vector entries sum up to one, and hence linearly dependent. So an input value of 4.0 maps to [0.0, 0.0, 0.0, 0.0].
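The example in the description can be reproduced with a short plain-Python sketch. It builds dense lists for readability, whereas Spark stores the result as a sparse vector; the function name `one_hot` and its `drop_last` argument are illustrative stand-ins for the encoder and its dropLast setting.

```python
def one_hot(index, num_categories, drop_last=True):
    """Return a dense one-hot list for a category index, mimicking dropLast."""
    size = num_categories - 1 if drop_last else num_categories
    vector = [0.0] * size
    if index < size:
        vector[int(index)] = 1.0
    return vector

print(one_hot(2.0, 5))                   # third of five categories
print(one_hot(4.0, 5))                   # last category is all zeros
print(one_hot(4.0, 5, drop_last=False))  # last category kept explicitly
```

With the last category dropped, five categories need only four slots, and the all-zero vector stands for the fifth.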

In [79]:

oheOutputCols = [x + "_OHE" for x in categoricalCols]

oheEncoder = OneHotEncoder(inputCols=indexOutputCols,
                          outputCols=oheOutputCols)



**VectorAssembler: VectorAssembler(*, inputCols=None, outputCol=None). A feature transformer that merges multiple columns into a vector column.**



In [83]:
from pyspark.ml.feature import VectorAssembler

numericCols = [field for (field, dataType) in unionDF.dtypes
              if dataType == 'double' and field != 'Survived']

assemblerInputs = oheOutputCols + numericCols

vecAssembler = VectorAssembler(inputCols=assemblerInputs,outputCol='features')

**Use the randomSplit function to split the data into X_train and X_test with 80% and 20% of the rows, respectively:**

In [84]:
X_train, X_test = unionDF.randomSplit([0.8, 0.2],seed = 10)

**Pipeline: ML Pipelines provide a uniform set of high-level APIs built on top of DataFrames that help users create and tune practical machine learning pipelines.**

**Build a RandomForestClassifier model, use the pipeline to fit and transform, then display the "prediction", "Survived", and "features" columns:**

In [85]:
from pyspark.ml.classification import RandomForestClassifier
rf = RandomForestClassifier(featuresCol = 'features', labelCol = 'Survived')
pipeline = Pipeline(stages=[stringIndexer,oheEncoder,vecAssembler,rf])

predictions = pipeline.fit(X_train).transform(X_test)

predictions.select("prediction", "Survived", "features").show()

+----------+--------+--------------------+
|prediction|Survived|            features|
+----------+--------+--------------------+
|       0.0|       0|(1405,[607,779,12...|
|       0.0|       0|(1405,[576,779,13...|
|       1.0|       1|(1405,[524,1356,1...|
|       1.0|       1|(1405,[512,893,13...|
|       0.0|       0|(1405,[542,779,11...|
|       0.0|       1|(1405,[279,779,13...|
|       0.0|       0|(1405,[701,779,13...|
|       0.0|       0|(1405,[634,1191,1...|
|       0.0|       1|(1405,[667,835,13...|
|       0.0|       1|(1405,[275,779,10...|
|       1.0|       1|(1405,[504,864,13...|
|       0.0|       1|(1405,[675,1385,1...|
|       0.0|       0|(1405,[441,779,13...|
|       0.0|       0|(1405,[376,1182,1...|
|       0.0|       0|(1405,[374,779,10...|
|       0.0|       0|(1405,[676,779,12...|
|       1.0|       1|(1405,[414,862,13...|
|       0.0|       0|(1405,[428,779,85...|
|       0.0|       0|(1405,[623,779,12...|
|       0.0|       0|(1405,[699,779,11...|
+----------+--------+--------------------+

**Use MulticlassClassificationEvaluator with "labelCol" set to "Survived", "predictionCol" set to "prediction", and "metricName" set to "accuracy":**

In [86]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
evaluator = MulticlassClassificationEvaluator(labelCol="Survived", predictionCol="prediction", metricName="accuracy")
print("Accuracy : " + str(evaluator.evaluate(predictions)))

Accuracy : 0.821917808219178
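Accuracy is the share of rows where the prediction equals the label. As a sanity check, the sketch below computes it by hand in plain Python for the 20 prediction rows displayed earlier; it covers only those rows, so it is not expected to match the evaluator's figure for the whole test split. The helper name `accuracy` is an assumption.

```python
# (prediction, Survived) pairs copied from the 20 rows displayed earlier
pairs = [
    (0, 0), (0, 0), (1, 1), (1, 1), (0, 0),
    (0, 1), (0, 0), (0, 0), (0, 1), (0, 1),
    (1, 1), (0, 1), (0, 0), (0, 0), (0, 0),
    (0, 0), (1, 1), (0, 0), (0, 0), (0, 0),
]

def accuracy(pairs):
    """Fraction of rows where the prediction equals the label."""
    correct = sum(1 for predicted, actual in pairs if predicted == actual)
    return correct / len(pairs)

print(accuracy(pairs))
```

Sixteen of the twenty displayed rows are predicted correctly, and all four misses are survivors predicted as non-survivors.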
