
### **Problem** 
**Our goal is to create a predictive model that can answer the following question:**

**What kind of people had a better chance of surviving?**

**Data about passengers:**
*   Name
*   Age
*   Gender.


## Install and Import Libraries
Let's install PySpark:

In [None]:
pip install pyspark

Collecting pyspark
  Downloading pyspark-3.1.2.tar.gz (212.4 MB)
Collecting py4j==0.10.9
  Downloading py4j-0.10.9-py2.py3-none-any.whl (198 kB)
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... done
  Created wheel for pyspark: filename=pyspark-3.1.2-py2.py3-none-any.whl size=212880768 sha256=e767c7b2c376b516ef036da2efdabf1a514fdc6972fb49cdc33618cf4846730f
  Stored in directory: /root/.cache/pip/wheels/a5/0a/c1/9561f6fecb759579a7d863dcd846daaa95f598744e71b02c77
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9 pyspark-3.1.2


In [None]:

import pyspark

from pyspark.sql import SparkSession 

from pyspark.sql.functions import col,isnan,when,count,mean



## Build Spark Session

In [None]:
spark = SparkSession.builder.getOrCreate()

## Data Loading


You have two datasets: 
* Train  
* Test.

In [None]:
train_path='/content/train (1).csv'

test_path='/content/test (1).csv'

In [None]:
train_df=spark.read.csv(train_path,header=True,inferSchema=True)
test_df=spark.read.csv(test_path,header=True,inferSchema=True)

Let's work with the train dataset:

In [None]:
train_df

DataFrame[PassengerId: int, Survived: int, Pclass: int, Name: string, Sex: string, Age: double, SibSp: int, Parch: int, Ticket: string, Fare: double, Cabin: string, Embarked: string]

In [None]:
train_df.show(5)

+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|          Ticket|   Fare|Cabin|Embarked|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|          1|       0|     3|Braund, Mr. Owen ...|  male|22.0|    1|    0|       A/5 21171|   7.25| null|       S|
|          2|       1|     1|Cumings, Mrs. Joh...|female|38.0|    1|    0|        PC 17599|71.2833|  C85|       C|
|          3|       1|     3|Heikkinen, Miss. ...|female|26.0|    0|    0|STON/O2. 3101282|  7.925| null|       S|
|          4|       1|     1|Futrelle, Mrs. Ja...|female|35.0|    1|    0|          113803|   53.1| C123|       S|
|          5|       0|     3|Allen, Mr. Willia...|  male|35.0|    0|    0|          373450|   8.05| null|       S|
+-----------+--------+------+--------------------+------+----+-----+-----+------

**Display schema for the dataset:**

In [None]:
train_df.printSchema()

root
 |-- PassengerId: integer (nullable = true)
 |-- Survived: integer (nullable = true)
 |-- Pclass: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- SibSp: integer (nullable = true)
 |-- Parch: integer (nullable = true)
 |-- Ticket: string (nullable = true)
 |-- Fare: double (nullable = true)
 |-- Cabin: string (nullable = true)
 |-- Embarked: string (nullable = true)



**Statistical summary:**

In [None]:
train_df.summary().show()

+-------+-----------------+-------------------+------------------+--------------------+------+------------------+------------------+-------------------+------------------+-----------------+-----+--------+
|summary|      PassengerId|           Survived|            Pclass|                Name|   Sex|               Age|             SibSp|              Parch|            Ticket|             Fare|Cabin|Embarked|
+-------+-----------------+-------------------+------------------+--------------------+------+------------------+------------------+-------------------+------------------+-----------------+-----+--------+
|  count|              891|                891|               891|                 891|   891|               714|               891|                891|               891|              891|  204|     889|
|   mean|            446.0| 0.3838383838383838| 2.308641975308642|                null|  null| 29.69911764705882|0.5230078563411896|0.38159371492704824|260318.54916792738| 32.20420

## EDA - Exploratory Data Analysis

**Display count for the train dataset:**

In [None]:
train_df.count()

891

In [None]:
total_count=train_df.count()

**Can you answer this question:** 

**How many people survived, and how many didn't survive?** 


In [None]:
# Survived is an integer column, so compare against the integer 0.
Not_Survived_Count = train_df.filter(train_df['Survived'] == 0).count()
Not_Survived_Count

549

In [None]:
Survived_Count = train_df.filter(train_df['Survived'] == 1).count()
Survived_Count

342

**Display the answer as a ratio (hint: use a UDF):**

In [None]:
from pyspark.sql.functions import udf
from pyspark.sql.types import *


In [None]:
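@udf(returnType=DoubleType())
def Percentage(part, total):
    # Ratio of two numeric columns; None when a value is missing or total is 0.
    if part is None or not total:
        return None
    return part / total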

In [None]:
percentage_lived=Survived_Count/total_count
percentage_lived

0.3838383838383838

In [None]:
percentage_Not_lived=Not_Survived_Count/total_count
percentage_Not_lived

0.6161616161616161

**How many males and females are there?**


In [None]:
train_df.filter(train_df['Sex']== 'male').count()

577

In [None]:
train_df.filter(train_df['Sex']== 'female').count()

314

**1. What is the survival rate for each gender?**

**2. What is the number of survivors of each gender?**


In [None]:
train_df.groupBy('Sex').agg({'Survived':'avg'}).show()

+------+-------------------+
|   Sex|      avg(Survived)|
+------+-------------------+
|female| 0.7420382165605095|
|  male|0.18890814558058924|
+------+-------------------+



In [None]:
train_df.groupBy('Sex').agg({'Survived':'count'}).show()

+------+---------------+
|   Sex|count(Survived)|
+------+---------------+
|female|            314|
|  male|            577|
+------+---------------+
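
Note that `count(Survived)` counts every row with a non-null `Survived` value, so the table above reports passengers per gender rather than survivors. Because `Survived` is a 0/1 column, summing it gives the number of survivors and averaging it gives the survival rate. The plain-Python sketch below illustrates the difference on a small hypothetical list of records (not rows from the dataset); in PySpark the corresponding aggregate would likely be `agg({'Survived': 'sum'})`.

```python
# Hypothetical toy records (sex, survived); not taken from the Titanic data.
records = [
    ("female", 1), ("female", 1), ("female", 0),
    ("male", 0), ("male", 1), ("male", 0), ("male", 0),
]

def summarize(rows):
    """Return {sex: (passenger_count, survivor_count, survival_rate)}."""
    totals = {}
    for sex, survived in rows:
        count, survivors = totals.get(sex, (0, 0))
        # count grows for every row, survivors only when survived == 1
        totals[sex] = (count + 1, survivors + survived)
    return {sex: (c, s, s / c) for sex, (c, s) in totals.items()}

summary = summarize(records)
for sex, (count, survivors, rate) in summary.items():
    print(sex, count, survivors, rate)
```

Here the count, the sum, and the mean answer three different questions: how many passengers, how many survivors, and what fraction survived.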



**Create a temporary view in PySpark:**

In [None]:
train_df.createOrReplaceTempView('trainDf_View')

**How many people survived, and how many didn't survive? By SQL:**

In [None]:
spark.sql("""SELECT Survived,COUNT(Survived)
          FROM trainDf_View 
          GROUP BY Survived """).show()

+--------+---------------+
|Survived|count(Survived)|
+--------+---------------+
|       1|            342|
|       0|            549|
+--------+---------------+



**Can you display the number of survivors from each gender as a ratio?**


In [None]:
spark.sql("""SELECT Sex,AVG(Survived) as ratioOfSurvived
          FROM trainDf_View 
          GROUP BY Sex """).show()

+------+-------------------+
|   Sex|    ratioOfSurvived|
+------+-------------------+
|female| 0.7420382165605095|
|  male|0.18890814558058924|
+------+-------------------+



**Display the survival ratio for each "Pclass": SUM(Survived)/COUNT(Pclass)**


In [None]:
spark.sql("""SELECT Pclass,SUM(Survived)/COUNT(Pclass) as ratioOfSurvived_of_P_class
          FROM trainDf_View 
          GROUP BY Pclass
           """).show()

+------+--------------------------+
|Pclass|ratioOfSurvived_of_P_class|
+------+--------------------------+
|     1|        0.6296296296296297|
|     3|       0.24236252545824846|
|     2|       0.47282608695652173|
+------+--------------------------+



## Data Cleaning

**First, merge the train and test datasets.**



In [None]:
# union() matches columns by position, so both DataFrames must share the same column order.
all_df = train_df.union(test_df)
all_df.show()

+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|          Ticket|   Fare|Cabin|Embarked|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|          1|       0|     3|Braund, Mr. Owen ...|  male|22.0|    1|    0|       A/5 21171|   7.25| null|       S|
|          2|       1|     1|Cumings, Mrs. Joh...|female|38.0|    1|    0|        PC 17599|71.2833|  C85|       C|
|          3|       1|     3|Heikkinen, Miss. ...|female|26.0|    0|    0|STON/O2. 3101282|  7.925| null|       S|
|          4|       1|     1|Futrelle, Mrs. Ja...|female|35.0|    1|    0|          113803|   53.1| C123|       S|
|          5|       0|     3|Allen, Mr. Willia...|  male|35.0|    0|    0|          373450|   8.05| null|       S|
|          6|       0|     3|    Moran, Mr. James|  male|null|    0|    0|      

**Display count:**

In [None]:
all_df.count()

1329

**Can you find the number of null values in each column?**


In [None]:
from pyspark.sql.functions import isnull, when, count, col
# show() returns None, so print the result directly instead of assigning it.
all_df.select([count(when(isnull(c), c)).alias(c) for c in all_df.columns]).show()


+-----------+--------+------+----+---+---+-----+-----+------+----+-----+--------+
|PassengerId|Survived|Pclass|Name|Sex|Age|SibSp|Parch|Ticket|Fare|Cabin|Embarked|
+-----------+--------+------+----+---+---+-----+-----+------+----+-----+--------+
|          0|       0|     0|   0|  0|265|    0|    0|     0|   0| 1021|       3|
+-----------+--------+------+----+---+---+-----+-----+------+----+-----+--------+



**Create Dataframe for null values**

1. Column
2. Number of missing values.

In [None]:
import pyspark.sql.functions as F

In [None]:
null_cols=[]
for col_name in all_df.columns:
    null_values= all_df.where(F.col(col_name).isNull()).count()
    if (null_values>0 ):
        null_cols.append((col_name,null_values))
print(null_cols)

[('Age', 265), ('Cabin', 1021), ('Embarked', 3)]


In [None]:
spark.createDataFrame(null_cols,['column','missing_value']).show()

+--------+-------------+
|  column|missing_value|
+--------+-------------+
|     Age|          265|
|   Cabin|         1021|
|Embarked|            3|
+--------+-------------+



## Preprocessing 

**Create a temporary view in PySpark:**

In [None]:
all_df.createOrReplaceTempView('allDf_View')

**Show the "Name" column from your temporary view:**

In [None]:
spark.sql("""SELECT name
          FROM allDf_view 
           """).show()

+--------------------+
|                name|
+--------------------+
|Braund, Mr. Owen ...|
|Cumings, Mrs. Joh...|
|Heikkinen, Miss. ...|
|Futrelle, Mrs. Ja...|
|Allen, Mr. Willia...|
|    Moran, Mr. James|
|McCarthy, Mr. Tim...|
|Palsson, Master. ...|
|Johnson, Mrs. Osc...|
|Nasser, Mrs. Nich...|
|Sandstrom, Miss. ...|
|Bonnell, Miss. El...|
|Saundercock, Mr. ...|
|Andersson, Mr. An...|
|Vestrom, Miss. Hu...|
|Hewlett, Mrs. (Ma...|
|Rice, Master. Eugene|
|Williams, Mr. Cha...|
|Vander Planke, Mr...|
|Masselmani, Mrs. ...|
+--------------------+
only showing top 20 rows



In [None]:
import pyspark.sql.functions as F

# Raw string so that "\." reaches the regex engine as an escaped period.
combined_df = all_df.withColumn('Title', F.regexp_extract(F.col("Name"), r"([A-Za-z]+)\.", 1))
combined_df.createOrReplaceTempView('combined_df')
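
To see what the pattern `([A-Za-z]+)\.` captures, here is a minimal plain-Python sketch using the standard `re` module on a few names that appear in full in the outputs of this notebook. The helper `extract_title` is hypothetical and only mimics the behaviour on these simple inputs: it returns the first run of letters that is immediately followed by a period, and an empty string when nothing matches (which is also what `regexp_extract` returns for a non-matching row).

```python
import re

# Same pattern as in the notebook: letters followed by a literal period.
TITLE_PATTERN = re.compile(r"([A-Za-z]+)\.")

def extract_title(name):
    """Return the first letter run followed by '.', or '' if there is none."""
    match = TITLE_PATTERN.search(name)
    return match.group(1) if match else ""

names = [
    "Moran, Mr. James",
    "Rice, Master. Eugene",
    "Pain, Dr. Alfred",
    "No title here",  # hypothetical input without a period
]
titles = [extract_title(n) for n in names]
print(titles)
```

The surname is skipped because it is followed by a comma rather than a period, so the first match is the title.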


In [None]:
spark.sql("""SELECT Title
          FROM combined_df 
           """).show()

+------+
| Title|
+------+
|    Mr|
|   Mrs|
|  Miss|
|   Mrs|
|    Mr|
|    Mr|
|    Mr|
|Master|
|   Mrs|
|   Mrs|
|  Miss|
|  Miss|
|    Mr|
|    Mr|
|  Miss|
|   Mrs|
|Master|
|    Mr|
|   Mrs|
|   Mrs|
+------+
only showing top 20 rows



**We can see that Dr, Rev, Major, Col, Mlle, Capt, Don, Jonkheer, Countess, Ms, Sir, Lady, and Mme are really rare titles, so set their value to "rare".**

In [None]:

titles_map={'Dr':'rare', 'Rev':'rare', 'Major':'rare', 'Col':'rare', 'Mlle':'rare', 'Capt':'rare', 'Don':'rare', 'Jonkheer':'rare', 'Countess':'rare', 'Ms':'rare', 'Sir':'rare', 'Lady':'rare','Mme':'rare'}

**Define a UDF that maps rare titles to "rare", then apply it to the "Title" column:**

In [None]:
@udf(returnType=StringType())
def impute_title(title):
    """
    Map rare titles to "rare" and return every other title unchanged.
    """
    return titles_map.get(title, title)

In [None]:
combined_df = combined_df.withColumn("Title_up", impute_title(combined_df["Title"]))
combined_df.show()

+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+------+--------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|          Ticket|   Fare|Cabin|Embarked| Title|Title_up|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+------+--------+
|          1|       0|     3|Braund, Mr. Owen ...|  male|22.0|    1|    0|       A/5 21171|   7.25| null|       S|    Mr|      Mr|
|          2|       1|     1|Cumings, Mrs. Joh...|female|38.0|    1|    0|        PC 17599|71.2833|  C85|       C|   Mrs|     Mrs|
|          3|       1|     3|Heikkinen, Miss. ...|female|26.0|    0|    0|STON/O2. 3101282|  7.925| null|       S|  Miss|    Miss|
|          4|       1|     1|Futrelle, Mrs. Ja...|female|35.0|    1|    0|          113803|   53.1| C123|       S|   Mrs|     Mrs|
|          5|       0|     3|Allen, Mr. Willia...|  male|35.0|    0|    0|         

In [None]:
combined_df.filter(combined_df['Title'] == 'Dr').show()

+-----------+--------+------+--------------------+------+----+-----+-----+--------+-------+-----+--------+-----+--------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|  Ticket|   Fare|Cabin|Embarked|Title|Title_up|
+-----------+--------+------+--------------------+------+----+-----+-----+--------+-------+-----+--------+-----+--------+
|        246|       0|     1|Minahan, Dr. Will...|  male|44.0|    2|    0|   19928|   90.0|  C78|       Q|   Dr|    rare|
|        318|       0|     2|Moraweck, Dr. Ernest|  male|54.0|    0|    0|   29011|   14.0| null|       S|   Dr|    rare|
|        399|       0|     2|    Pain, Dr. Alfred|  male|23.0|    0|    0|  244278|   10.5| null|       S|   Dr|    rare|
|        633|       1|     1|Stahelin-Maeglin,...|  male|32.0|    0|    0|   13214|   30.5|  B50|       C|   Dr|    rare|
|        661|       1|     1|Frauenthal, Dr. H...|  male|50.0|    2|    0|PC 17611| 133.65| null|       S|   Dr|    rare|
|        767|       0|  

**Display the distinct values of the "Title_up" column by grouping on it:**

In [None]:
# Register a view so the unique values of the new title column can be queried with SQL.
combined_df.createOrReplaceTempView('df_view')

In [None]:
spark.sql("""SELECT Title_up
          FROM df_view 
          GROUP BY Title_up """).show()

+--------+
|Title_up|
+--------+
|    rare|
|    Miss|
|  Master|
|      Mr|
|     Mrs|
+--------+



## **Preprocessing Age**

**Fill the missing "Age" values with the column mean. First, compute the mean:**

In [None]:
combined_df.select(mean('age')).collect()[0][0]

30.079501879699244

In [None]:
combined_df = combined_df.na.fill(
    combined_df.select(mean('age')).collect()[0][0],
    subset=['age']
)
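
The idea behind mean imputation can be sketched in plain Python: compute the mean over the known values only, then substitute it wherever a value is missing. The list of ages below is hypothetical and the helper `fill_with_mean` is an illustration, not part of PySpark.

```python
def fill_with_mean(values):
    """Replace None entries with the mean of the non-missing entries."""
    known = [v for v in values if v is not None]
    mean = sum(known) / len(known)  # missing values do not affect the mean
    filled = [mean if v is None else v for v in values]
    return filled, mean

ages = [22.0, 38.0, None, 35.0, None]  # hypothetical sample, None = missing
filled_ages, mean_age = fill_with_mean(ages)
print(mean_age)
print(filled_ages)
```

Known values are left untouched, and every missing entry receives the same number.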

**Check that "Age" has no remaining null values:**

In [None]:
# Count nulls per column; Age should now be 0.
combined_df.select([count(when(isnull(c), c)).alias(c) for c in combined_df.columns]).show()


+-----------+--------+------+----+---+---+-----+-----+------+----+-----+--------+-----+--------+
|PassengerId|Survived|Pclass|Name|Sex|Age|SibSp|Parch|Ticket|Fare|Cabin|Embarked|Title|Title_up|
+-----------+--------+------+----+---+---+-----+-----+------+----+-----+--------+-----+--------+
|          0|       0|     0|   0|  0|  0|    0|    0|     0|   0| 1021|       3|    0|       0|
+-----------+--------+------+----+---+---+-----+-----+------+----+-----+--------+-----+--------+



## **Preprocessing Embarked**

**Group by "Embarked", count each value, order by count descending, and save the result in the groupped_Embarked variable:**




In [None]:
combined_df.select('Embarked').distinct().show()

+--------+
|Embarked|
+--------+
|       Q|
|    null|
|       C|
|       S|
+--------+



In [None]:
# Order by the count column (not by the port code) so the most frequent value comes first.
groupped_Embarked = (combined_df.groupBy('Embarked')
                     .agg({'Embarked': 'count'})
                     .orderBy(F.desc('count(Embarked)')))


**Show the "groupped_Embarked" variable:**

In [None]:
groupped_Embarked.show()

+--------+---------------+
|Embarked|count(Embarked)|
+--------+---------------+
|       S|            962|
|       C|            253|
|       Q|            111|
|    null|              0|
+--------+---------------+



**Get the most frequent value from groupped_Embarked:**

In [None]:
groupped_Embarked.collect()[0][0]

'S'

**Fill missing values with the most frequent value, 'S':**

In [None]:
combined_df = combined_df.na.fill(
    'S',
    subset=['Embarked']
)

In [None]:
combined_df.groupBy('Embarked').agg({'Embarked':'count'}).show()


+--------+---------------+
|Embarked|count(Embarked)|
+--------+---------------+
|       Q|            111|
|       C|            253|
|       S|            965|
+--------+---------------+



## **Preprocessing Cabin**

**Create a "Cabin_Prev" column that holds the first character of "Cabin":**



In [None]:
from pyspark.sql.functions import udf, col

In [None]:
@udf(returnType=StringType())
def first_char(value):
    # Return the first character, or None when the value is missing or empty.
    if not value:
        return None
    return value[0]



In [None]:
combined_df = combined_df.withColumn('Cabin_Prev', first_char(combined_df['Cabin']))

**Show the result:**

In [None]:
combined_df.select('Cabin','Cabin_Prev').show()

+-----+----------+
|Cabin|Cabin_Prev|
+-----+----------+
| null|      null|
|  C85|         C|
| null|      null|
| C123|         C|
| null|      null|
| null|      null|
|  E46|         E|
| null|      null|
| null|      null|
| null|      null|
|   G6|         G|
| C103|         C|
| null|      null|
| null|      null|
| null|      null|
| null|      null|
| null|      null|
| null|      null|
| null|      null|
| null|      null|
+-----+----------+
only showing top 20 rows



**Create the temporary view:**

In [None]:
combined_df.createOrReplaceTempView('combined_df')

**Select "Cabin_Prev", count the rows for each value, group by "Cabin_Prev", and order by count descending:**

In [None]:
spark.sql("""SELECT Cabin_Prev,COUNT(1) as count
          FROM combined_df
          GROUP BY Cabin_Prev
          ORDER BY count DESC""").show()

+----------+-----+
|Cabin_Prev|count|
+----------+-----+
|      null| 1021|
|         C|   82|
|         B|   77|
|         D|   52|
|         E|   51|
|         A|   23|
|         F|   18|
|         G|    4|
|         T|    1|
+----------+-----+



**Fill missing values with "U":**

In [None]:
combined_df = combined_df.na.fill(
    'U',
    subset=['Cabin_Prev']
)

In [None]:
#checking:
combined_df.groupBy('Cabin_Prev').agg({'Cabin_Prev':'count'}).show()


+----------+-----------------+
|Cabin_Prev|count(Cabin_Prev)|
+----------+-----------------+
|         F|               18|
|         E|               51|
|         T|                1|
|         B|               77|
|         U|             1021|
|         D|               52|
|         C|               82|
|         A|               23|
|         G|                4|
+----------+-----------------+



**StringIndexer: A label indexer that maps a string column of labels to an ML column of label indices. If the input column is numeric, we cast it to string and index the string values. The indices are in [0, numLabels). By default, this is ordered by label frequencies so the most frequent label gets index 0. The ordering behavior is controlled by setting stringOrderType. Its default value is ‘frequencyDesc’.**

**StringIndexer(inputCol=None, outputCol=None)**
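
The default `frequencyDesc` ordering can be illustrated with a short plain-Python sketch. The helper `fit_string_index` is hypothetical and only mimics the documented behaviour: the most frequent label gets index 0, and labels with equal frequency are assumed to be ordered alphabetically. The input list is made up for the example.

```python
from collections import Counter

def fit_string_index(labels):
    """Map each label to an index, most frequent label first."""
    counts = Counter(labels)
    # Sort by descending frequency, then alphabetically to break ties.
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: float(i) for i, label in enumerate(ordered)}

# Hypothetical cabin letters: "U" is the most frequent, "B" and "C" are tied.
cabin_letters = ["U"] * 5 + ["C"] * 3 + ["B"] * 3 + ["D"]
mapping = fit_string_index(cabin_letters)
print(mapping)
```

With the real data, this ordering means the very common "U" (unknown cabin) would receive index 0.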

In [None]:
combined_df.printSchema()

root
 |-- PassengerId: integer (nullable = true)
 |-- Survived: integer (nullable = true)
 |-- Pclass: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Age: double (nullable = false)
 |-- SibSp: integer (nullable = true)
 |-- Parch: integer (nullable = true)
 |-- Ticket: string (nullable = true)
 |-- Fare: double (nullable = true)
 |-- Cabin: string (nullable = true)
 |-- Embarked: string (nullable = false)
 |-- Title: string (nullable = true)
 |-- Title_up: string (nullable = true)
 |-- Cabin_Prev: string (nullable = false)



In [None]:
from pyspark.ml.feature import StringIndexer

In [None]:
categoricalCols = ['Sex','Cabin_Prev','Title_up']

indexOutputCols = [x + "_Index" for x in categoricalCols]



In [None]:
stringIndexer = StringIndexer(inputCols=categoricalCols,
                             outputCols=indexOutputCols,
                             handleInvalid='skip')

**OneHotEncoder(inputCols=None, outputCols=None)**

A one-hot encoder that maps a column of category indices to a column of binary vectors, with at most a single one-value per row that indicates the input category index. For example with 5 categories, an input value of 2.0 would map to an output vector of [0.0, 0.0, 1.0, 0.0]. The last category is not included by default (configurable via dropLast), because it makes the vector entries sum up to one, and hence linearly dependent. So an input value of 4.0 maps to [0.0, 0.0, 0.0, 0.0].
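
The example in the paragraph above can be reproduced with a small plain-Python sketch. The function `one_hot` is a hypothetical helper that returns a dense list; Spark itself stores the result as a vector column, so this only illustrates the mapping and the effect of `dropLast`.

```python
def one_hot(index, num_categories, drop_last=True):
    """Encode a category index as a list of 0.0/1.0 values."""
    size = num_categories - 1 if drop_last else num_categories
    vector = [0.0] * size
    position = int(index)
    # With drop_last, the last category has no slot and stays all zeros.
    if position < size:
        vector[position] = 1.0
    return vector

print(one_hot(2.0, 5))         # index 2 of 5 categories
print(one_hot(4.0, 5))         # last category, dropped by default
print(one_hot(4.0, 5, False))  # last category kept
```

Dropping the last category keeps the encoded columns from summing to one in every row.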

In [None]:
from pyspark.ml.feature import OneHotEncoder


In [None]:
oheOutputCols = [x + "_OHE" for x in categoricalCols]

In [None]:
oheEncoder = OneHotEncoder(inputCols=indexOutputCols,
                          outputCols=oheOutputCols)

**VectorAssembler: VectorAssembler(*, inputCols=None, outputCol=None). A feature transformer that merges multiple columns into a vector column.**
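
Conceptually, the assembler concatenates the chosen columns, in order, into one flat feature vector per row. The plain-Python sketch below shows that idea with a hypothetical helper `assemble` and a hypothetical row; the encoded value for `Sex_OHE` is an assumption, and Spark may store the real result as a sparse vector.

```python
def assemble(row, input_cols):
    """Concatenate scalar and vector-valued columns into one flat list."""
    features = []
    for name in input_cols:
        value = row[name]
        if isinstance(value, (list, tuple)):
            features.extend(float(v) for v in value)  # vector column
        else:
            features.append(float(value))             # numeric column
    return features

# Hypothetical row: an already one-hot-encoded column plus numeric columns.
row = {"Sex_OHE": [1.0], "Age": 22.0, "Fare": 7.25, "Pclass": 3}
features = assemble(row, ["Sex_OHE", "Age", "Fare", "Pclass"])
print(features)
```

The order of `input_cols` determines the position of each feature in the output.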



In [None]:
numericCols = [field for (field, dataType) in combined_df.dtypes
               if dataType == 'double' and field != 'Survived']
numericCols.append('Pclass')
numericCols.append('SibSp')
numericCols.append('Parch')

numericCols

['Age', 'Fare', 'Pclass', 'SibSp', 'Parch']

In [None]:
assemblerInputs = oheOutputCols + numericCols
assemblerInputs

['Sex_OHE',
 'Cabin_Prev_OHE',
 'Title_up_OHE',
 'Age',
 'Fare',
 'Pclass',
 'SibSp',
 'Parch']

In [None]:
from pyspark.ml.feature import VectorAssembler

In [None]:
vecAssembler = VectorAssembler(inputCols=assemblerInputs,outputCol='features')

**Use the randomSplit function to split the data into x_train and X_test, with 80% and 20% of the rows respectively:**

In [None]:
x_train, X_test = combined_df.randomSplit([.8,.2],seed=42)

**Pipeline: ML Pipelines provide a uniform set of high-level APIs built on top of DataFrames that help users create and tune practical machine learning pipelines.**

**Build a RandomForestClassifier model, use a pipeline to fit and transform, then display the "prediction", "Survived", and "features" columns:**

In [None]:
from pyspark.ml.classification import RandomForestClassifier

In [None]:
RF_model = RandomForestClassifier(labelCol='Survived',featuresCol='features')


In [None]:
# Building the pipeline
from pyspark.ml import Pipeline

In [None]:
pipeline =Pipeline(stages = [stringIndexer,oheEncoder,vecAssembler,RF_model])
pipelineModel = pipeline.fit(x_train)

In [None]:
predDF = pipelineModel.transform(X_test)
predDF.select('prediction', 'Survived', 'features').show(5)

**Use MulticlassClassificationEvaluator and set the "labelCol" to "Survived",  "predictionCol" to "prediction", "metricName" to "accuracy"** 

In [None]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [None]:
evaluator=MulticlassClassificationEvaluator(labelCol="Survived",predictionCol="prediction",metricName='accuracy')


# Accuracy of the model on the test split

evaluator.evaluate(predDF)

0.825531914893617
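
The accuracy metric is the fraction of rows whose prediction equals the label. A minimal plain-Python sketch of that calculation follows; the helper `accuracy` and the two lists are hypothetical and are not the model's actual predictions.

```python
def accuracy(labels, predictions):
    """Fraction of positions where the prediction matches the label."""
    if len(labels) != len(predictions):
        raise ValueError("labels and predictions must have the same length")
    correct = sum(1 for y, p in zip(labels, predictions) if y == p)
    return correct / len(labels)

labels = [0, 1, 1, 0, 1]       # hypothetical true values of Survived
predictions = [0, 1, 0, 0, 1]  # hypothetical model output
acc = accuracy(labels, predictions)
print(acc)
```

Accuracy alone can hide class imbalance, so it may be worth checking other metrics as well.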