### **Problem** 
**Our goal is to create a predictive model that can answer the following question:**

**What kind of people had a better chance of surviving?**

**We use the popular Titanic dataset.**

**Data about passengers:**
*   Name
*   Age
*   Gender.


## Import Libraries
Let's import PySpark:

In [1]:
import findspark
findspark.init()
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.functions import *

## Build Spark Session

In [2]:
spark = SparkSession.builder.getOrCreate()

## Data Loading


We have two datasets, which we read one at a time:
* Train
* Test



In [3]:
df_train = spark.read.csv("train (1).csv",header=True,inferSchema=True)


In [4]:
df_test = spark.read.csv("test.csv",header=True,inferSchema=True)


Let's work with train dataset:

**Let's confirm if this is a dataframe or not:**

In [5]:
df_train

DataFrame[PassengerId: int, Survived: int, Pclass: int, Name: string, Sex: string, Age: double, SibSp: int, Parch: int, Ticket: string, Fare: double, Cabin: string, Embarked: string]

**Show 5 rows.**

In [6]:
df_train.show(5)

+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|          Ticket|   Fare|Cabin|Embarked|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|          1|       0|     3|Braund, Mr. Owen ...|  male|22.0|    1|    0|       A/5 21171|   7.25| null|       S|
|          2|       1|     1|Cumings, Mrs. Joh...|female|38.0|    1|    0|        PC 17599|71.2833|  C85|       C|
|          3|       1|     3|Heikkinen, Miss. ...|female|26.0|    0|    0|STON/O2. 3101282|  7.925| null|       S|
|          4|       1|     1|Futrelle, Mrs. Ja...|female|35.0|    1|    0|          113803|   53.1| C123|       S|
|          5|       0|     3|Allen, Mr. Willia...|  male|35.0|    0|    0|          373450|   8.05| null|       S|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+

**Display schema for the dataset:**

In [15]:
df_train.printSchema()

root
 |-- PassengerId: integer (nullable = true)
 |-- Survived: integer (nullable = true)
 |-- Pclass: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- SibSp: integer (nullable = true)
 |-- Parch: integer (nullable = true)
 |-- Ticket: string (nullable = true)
 |-- Fare: double (nullable = true)
 |-- Cabin: string (nullable = true)
 |-- Embarked: string (nullable = true)



**Statistical summary:**

In [19]:
df_train.summary().show()

+-------+-----------------+-------------------+------------------+--------------------+------+------------------+------------------+-------------------+------------------+-----------------+-----+--------+
|summary|      PassengerId|           Survived|            Pclass|                Name|   Sex|               Age|             SibSp|              Parch|            Ticket|             Fare|Cabin|Embarked|
+-------+-----------------+-------------------+------------------+--------------------+------+------------------+------------------+-------------------+------------------+-----------------+-----+--------+
|  count|              891|                891|               891|                 891|   891|               714|               891|                891|               891|              891|  204|     889|
|   mean|            446.0| 0.3838383838383838| 2.308641975308642|                null|  null| 29.69911764705882|0.5230078563411896|0.38159371492704824|260318.54916792738| 32.20420

## EDA - Exploratory Data Analysis

**Display count for the train dataset:**

In [20]:
df_train.count()

891

**Let's answer this question:** 

**How many people survived, and how many didn't survive?** 

In [23]:
survived_num = df_train.groupBy('Survived').count()

**Displaying result:**

In [24]:
survived_num.show()

+--------+-----+
|Survived|count|
+--------+-----+
|       1|  342|
|       0|  549|
+--------+-----+



**Ratio of passengers who survived (the mean of the 0/1 `Survived` column), followed by the ratio of those who did not:**

In [43]:
df_train.select(mean('Survived')).show()

+------------------+
|     avg(Survived)|
+------------------+
|0.3838383838383838|
+------------------+



In [48]:
1-df_train.select(mean('Survived')).take(1)[0][0]

0.6161616161616161
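
As a quick cross-check, both ratios follow directly from the counts in the `groupBy('Survived')` output. The sketch below is plain Python (no Spark needed), and the variable names are illustrative only:

```python
# Counts copied from the groupBy('Survived') output above.
survived, died = 342, 549
total = survived + died  # should match df_train.count()

# The mean of a 0/1 column is the fraction of ones.
survival_rate = survived / total
death_rate = 1 - survival_rate

print(total, survival_rate, death_rate)
```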

**Let's get the number of males and females:**


In [49]:
df_train.groupBy('Sex').count().show()

+------+-----+
|   Sex|count|
+------+-----+
|female|  314|
|  male|  577|
+------+-----+



**1. How many passengers of each gender survived?**

**2. What is the survival rate for each gender?**

In [51]:
df_train.groupBy('Sex','Survived').count().show()

+------+--------+-----+
|   Sex|Survived|count|
+------+--------+-----+
|  male|       0|  468|
|female|       1|  233|
|female|       0|   81|
|  male|       1|  109|
+------+--------+-----+



In [127]:
df_train.groupBy('Sex').agg(avg('Survived')).show()

+------+-------------------+
|   Sex|      avg(Survived)|
+------+-------------------+
|female| 0.7420382165605095|
|  male|0.18890814558058924|
+------+-------------------+
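
The per-gender averages can be derived by hand from the `groupBy('Sex','Survived')` counts: survivors divided by all passengers of that gender. A plain-Python sketch of that arithmetic, where `survival_rate` is a helper name assumed for illustration:

```python
# (Sex, Survived) -> count, copied from the groupBy('Sex', 'Survived') output.
counts = {
    ("male", 0): 468,
    ("male", 1): 109,
    ("female", 0): 81,
    ("female", 1): 233,
}

def survival_rate(sex):
    """Survivors divided by all passengers of the given sex."""
    survivors = counts[(sex, 1)]
    total = survivors + counts[(sex, 0)]
    return survivors / total

rates = {sex: survival_rate(sex) for sex in ("female", "male")}
print(rates)
```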



**Now we create temporary views in PySpark:**

In [100]:
df_train.createOrReplaceTempView("df1")
df_test.createOrReplaceTempView("df2")


**How many people survived, and how many didn't survive? By SQL:**

In [101]:
spark.sql("""SELECT Survived,count(Survived) 
          FROM df1 
          group by Survived  
          """).show()

+--------+---------------+
|Survived|count(Survived)|
+--------+---------------+
|       1|            342|
|       0|            549|
+--------+---------------+



**Can you display the survival rate of each gender?**


In [102]:
spark.sql("""SELECT Sex,avg(survived)
          FROM df1 
          group by Sex
         
          """).show()

+------+-------------------+
|   Sex|      avg(survived)|
+------+-------------------+
|female| 0.7420382165605095|
|  male|0.18890814558058924|
+------+-------------------+



**Display the share of passengers in each Pclass:**


In [103]:
spark.sql("""SELECT Pclass,count(Pclass)/(select count(*) from df1) as ratio
          FROM df1 
          group by Pclass
         
          """).show()

+------+-------------------+
|Pclass|              ratio|
+------+-------------------+
|     1|0.24242424242424243|
|     3| 0.5510662177328844|
|     2|0.20650953984287318|
+------+-------------------+



**Let's take a break and continue after this.**

## Data Cleaning

**First and foremost, we must merge both the train and test datasets.**



In [251]:
combined= df_train.unionAll(df_test)

**Display count:**

In [252]:
combined.count()

1329

**Temporary view PySpark:**

In [254]:
combined.createOrReplaceTempView("combined")


**Let's count the null values in each column:**


In [255]:
from pyspark.sql.functions import when, count, col, isnull
combined.select([count(when(isnull(c), c)).alias(c) for c in combined.columns]).show()

+-----------+--------+------+----+---+---+-----+-----+------+----+-----+--------+
|PassengerId|Survived|Pclass|Name|Sex|Age|SibSp|Parch|Ticket|Fare|Cabin|Embarked|
+-----------+--------+------+----+---+---+-----+-----+------+----+-----+--------+
|          0|       0|     0|   0|  0|265|    0|    0|     0|   0| 1021|       3|
+-----------+--------+------+----+---+---+-----+-----+------+----+-----+--------+
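
The expression `count(when(isnull(c), c))` can look puzzling. Because `c` is a Python string here, `when` treats it as a literal value rather than a column reference, so the expression yields a non-null literal for null cells and null otherwise; `count` then counts only the non-null results. Below is a plain-Python sketch of the same logic on a few hypothetical rows (not the real data), with `null_counts` as an assumed helper name:

```python
# Hypothetical rows for illustration; None plays the role of SQL null.
rows = [
    {"Age": 22.0, "Cabin": None, "Embarked": "S"},
    {"Age": None, "Cabin": "C85", "Embarked": "C"},
    {"Age": None, "Cabin": None, "Embarked": None},
]

def null_counts(rows, columns):
    """Count the None values in each column."""
    result = {}
    for c in columns:
        # Mirror when(isnull(c), <literal>): a marker for null cells, None otherwise.
        flagged = [c if row[c] is None else None for row in rows]
        # Mirror count(...): only non-None entries are counted.
        result[c] = sum(1 for value in flagged if value is not None)
    return result

print(null_counts(rows, ["Age", "Cabin", "Embarked"]))
```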



**Creating Dataframe for null values**

1. Column
2. Number of missing values.

In [256]:
null_table= combined.select([count(when(isnull(c), c)).alias(c) for c in combined.columns])

## Preprocessing 

**Show the column names of the null-count table:**

In [257]:
null_table.columns

['PassengerId',
 'Survived',
 'Pclass',
 'Name',
 'Sex',
 'Age',
 'SibSp',
 'Parch',
 'Ticket',
 'Fare',
 'Cabin',
 'Embarked']

In [258]:
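# Extract the title (letters followed by a dot) from "Name"; the raw string avoids an invalid "\." escape.
combined = combined.withColumn('Title', F.regexp_extract(F.col("Name"), r"([A-Za-z]+)\.", 1))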
combined.createOrReplaceTempView('df_all')
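
To see what the pattern captures, here is a plain-Python sketch using the `re` module on the names as they appear (truncated) in the `show(5)` output above. The helper name `extract_title` is an assumption for illustration. Like `regexp_extract`, it returns an empty string when nothing matches.

```python
import re

# One or more letters immediately followed by a literal dot, e.g. "Mr."
TITLE_PATTERN = re.compile(r"([A-Za-z]+)\.")

def extract_title(name):
    """Return the first capture group of the first match, or '' if none."""
    match = TITLE_PATTERN.search(name)
    return match.group(1) if match else ""

# Names exactly as displayed (truncated) by df_train.show(5).
names = [
    "Braund, Mr. Owen ...",
    "Cumings, Mrs. Joh...",
    "Heikkinen, Miss. ...",
    "Futrelle, Mrs. Ja...",
    "Allen, Mr. Willia...",
]

titles = [extract_title(n) for n in names]
print(titles)
```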

**Display each title and its count:**

In [259]:
spark.sql("""SELECT Title,count(*) as count

          FROM df_all 
          group by Title
         
          """).show()

+--------+-----+
|   Title|count|
+--------+-----+
|     Don|    1|
|    Miss|  257|
|Countess|    2|
|     Col|    4|
|     Rev|    9|
|    Lady|    2|
|  Master|   56|
|     Mme|    1|
|    Capt|    2|
|      Mr|  786|
|      Dr|   11|
|     Mrs|  186|
|     Sir|    2|
|Jonkheer|    2|
|    Mlle|    4|
|   Major|    3|
|      Ms|    1|
+--------+-----+



**We can see that Dr, Rev, Major, Col, Mlle, Capt, Don, Jonkheer, Countess, Ms, Sir, Lady, and Mme are rare titles (each appears fewer than 56 times), so we build a dictionary that maps each of them to "rare".**

In [260]:
t=spark.sql("""SELECT Title,count(*) as count

          FROM df_all 
          group by Title
         
          """)

In [261]:
lst=['Dr', 'Rev', 'Major', 'Col', 'Mlle', 'Capt', 'Don', 'Jonkheer', 'Countess', 'Ms', 'Sir', 'Lady',  'Mme']

In [262]:
table= t.where(col('count') <56 )

In [263]:
table.show()

+--------+-----+
|   Title|count|
+--------+-----+
|     Don|    1|
|Countess|    2|
|     Col|    4|
|     Rev|    9|
|    Lady|    2|
|     Mme|    1|
|    Capt|    2|
|      Dr|   11|
|     Sir|    2|
|Jonkheer|    2|
|    Mlle|    4|
|   Major|    3|
|      Ms|    1|
+--------+-----+



In [264]:
t1= table.select('Title').collect()
t2= table.select('count').collect()


In [199]:
# Iterate over the collected rows directly instead of re-running table.count().
l1 = [row['Title'] for row in t1]
l2 = [row['count'] for row in t2]

In [200]:
my_dict= dict(zip(l1,l2))
my_dict

{'Don': 1,
 'Countess': 2,
 'Col': 4,
 'Rev': 9,
 'Lady': 2,
 'Mme': 1,
 'Capt': 2,
 'Dr': 11,
 'Sir': 2,
 'Jonkheer': 2,
 'Mlle': 4,
 'Major': 3,
 'Ms': 1}

In [203]:
for i in my_dict.keys():
    my_dict[i] = 'rare'

In [204]:
my_dict

{'Don': 'rare',
 'Countess': 'rare',
 'Col': 'rare',
 'Rev': 'rare',
 'Lady': 'rare',
 'Mme': 'rare',
 'Capt': 'rare',
 'Dr': 'rare',
 'Sir': 'rare',
 'Jonkheer': 'rare',
 'Mlle': 'rare',
 'Major': 'rare',
 'Ms': 'rare'}

**Creating user defined function:**

In [14]:
from pyspark.sql.types import *


In [216]:
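# `udf` comes from the wildcard import of pyspark.sql.functions above.
@udf(returnType=StringType())
def impute_title(title):
    # Collapse every title listed in `lst` into the single category 'rare'.
    if title in lst:
        return 'rare'
    return title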

**Apply the UDF to the "Title" column:**

In [272]:
combined= combined.withColumn('Title',impute_title('Title'))

**Group by the "Title" column and display the counts:**

In [276]:
combined.select('Title').groupBy('Title').count().show()

+------+-----+
| Title|count|
+------+-----+
|  rare|   44|
|  Miss|  257|
|Master|   56|
|    Mr|  786|
|   Mrs|  186|
+------+-----+
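
The grouped counts can be cross-checked without Spark. This plain-Python sketch applies the same rule as the UDF to the title counts printed earlier; `impute_title_py` is a hypothetical stand-in for the UDF, not part of the notebook:

```python
from collections import Counter

# Title counts copied from the SQL output before grouping.
title_counts = {
    "Don": 1, "Miss": 257, "Countess": 2, "Col": 4, "Rev": 9, "Lady": 2,
    "Master": 56, "Mme": 1, "Capt": 2, "Mr": 786, "Dr": 11, "Mrs": 186,
    "Sir": 2, "Jonkheer": 2, "Mlle": 4, "Major": 3, "Ms": 1,
}

rare_titles = {"Dr", "Rev", "Major", "Col", "Mlle", "Capt", "Don",
               "Jonkheer", "Countess", "Ms", "Sir", "Lady", "Mme"}

def impute_title_py(title):
    """Same rule as the UDF: rare titles collapse to 'rare'."""
    return "rare" if title in rare_titles else title

grouped = Counter()
for title, n in title_counts.items():
    grouped[impute_title_py(title)] += n

print(dict(grouped))
```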



## **Preprocessing Age**

**Based on the age mean, we will fill in the missing age values:**

In [277]:
m=combined.select(avg('Age')).collect()[0][0]

**Filling missing age with age mean:**

In [287]:
combined=combined.na.fill(m,'Age')

In [288]:
combined.select('age').show()

+------------------+
|               age|
+------------------+
|              22.0|
|              38.0|
|              26.0|
|              35.0|
|              35.0|
|30.079501879699244|
|              54.0|
|               2.0|
|              27.0|
|              14.0|
|               4.0|
|              58.0|
|              20.0|
|              39.0|
|              14.0|
|              55.0|
|               2.0|
|30.079501879699244|
|              31.0|
|30.079501879699244|
+------------------+
only showing top 20 rows
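
Mean imputation replaces every missing age with the average of the known ages (Spark's `avg` ignores nulls when computing it). A minimal plain-Python sketch on hypothetical values, with `fill_with_mean` as an assumed helper name:

```python
# Hypothetical ages for illustration; None marks a missing value.
ages = [22.0, 38.0, None, 30.0]

def fill_with_mean(values):
    """Replace None with the mean of the non-missing values."""
    known = [v for v in values if v is not None]
    mean = sum(known) / len(known)
    return [mean if v is None else v for v in values]

filled = fill_with_mean(ages)
print(filled)
```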



## **Preprocessing Embarked**

**Let's group by Embarked, count each value, and order by count descending:**

In [289]:
combined.groupBy('Embarked').count().orderBy('count', ascending=False).show()

+--------+-----+
|Embarked|count|
+--------+-----+
|       S|  962|
|       C|  253|
|       Q|  111|
|    null|    3|
+--------+-----+



**Get the most frequent Embarked value (the mode):**

In [290]:
mode=combined.groupBy('Embarked').count().orderBy('count', ascending=False).select('Embarked').first()[0]

In [291]:
mode

'S'
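
One caveat worth noting: the `groupBy` above also counts the null group, so if nulls were the most frequent value, `first()` would return `None`. The plain-Python sketch below skips nulls explicitly, using the counts from the output above; `most_frequent` is a hypothetical helper:

```python
# Counts copied from the groupBy('Embarked') output; None is the null group.
embarked_counts = {"S": 962, "C": 253, "Q": 111, None: 3}

def most_frequent(counts):
    """Return the most frequent non-null key."""
    non_null = {k: v for k, v in counts.items() if k is not None}
    return max(non_null, key=non_null.get)

mode_value = most_frequent(embarked_counts)
print(mode_value)
```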

**Fill missing values with the mode:**

In [292]:
combined= combined.na.fill(mode,'Embarked')

In [293]:
combined.select('Embarked').show()

+--------+
|Embarked|
+--------+
|       S|
|       C|
|       S|
|       S|
|       S|
|       Q|
|       S|
|       S|
|       S|
|       C|
|       S|
|       S|
|       S|
|       S|
|       S|
|       S|
|       Q|
|       S|
|       S|
|       C|
+--------+
only showing top 20 rows



## **Preprocessing Cabin**

**Replace the "Cabin" column with the first character of its value:**



In [295]:
combined=combined.withColumn('cabin',substring('cabin',1,1))

In [299]:
combined.select('cabin').show()

+-----+
|cabin|
+-----+
| null|
|    C|
| null|
|    C|
| null|
| null|
|    E|
| null|
| null|
| null|
|    G|
|    C|
| null|
| null|
| null|
| null|
| null|
| null|
| null|
| null|
+-----+
only showing top 20 rows



**Create the temporary view:**

In [297]:
combined.createOrReplaceTempView('df_all')

**Select the "cabin" column, group by it, and order by count descending. Because `count(cabin)` ignores nulls, the null group shows 0:**

In [298]:
spark.sql("""SELECT cabin,count(cabin) 

          FROM df_all 
          group by cabin
          
          order by count(cabin) DESC
         
          """).show()

+-----+------------+
|cabin|count(cabin)|
+-----+------------+
|    C|          82|
|    B|          77|
|    D|          52|
|    E|          51|
|    A|          23|
|    F|          18|
|    G|           4|
|    T|           1|
| null|           0|
+-----+------------+



**Fill missing values with "U":**

In [300]:
combined = combined.na.fill('U','cabin')

In [301]:
combined.select('cabin').show()

+-----+
|cabin|
+-----+
|    U|
|    C|
|    U|
|    C|
|    U|
|    U|
|    E|
|    U|
|    U|
|    U|
|    G|
|    C|
|    U|
|    U|
|    U|
|    U|
|    U|
|    U|
|    U|
|    U|
+-----+
only showing top 20 rows
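
The two cabin steps (keep the first character, then fill missing values with "U") can be sketched in plain Python as one function. `deck_letter` is a name assumed for illustration; the sample values are the cabin entries visible in `df_train.show(5)`:

```python
def deck_letter(cabin, placeholder="U"):
    """First character of the cabin code, or the placeholder if missing."""
    # substring('cabin', 1, 1) is 1-based; cabin[:1] is the Python equivalent.
    return placeholder if cabin is None else cabin[:1]

# Cabin values as shown in the first rows of df_train.show(5).
cabins = [None, "C85", None, "C123", None]
decks = [deck_letter(c) for c in cabins]
print(decks)
```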



**Pipeline: ML Pipelines provide a uniform set of high-level APIs built on top of DataFrames that help users create and tune practical machine learning pipelines.**

In [312]:
combined=combined.drop(*['PassengerId','Name','Ticket'])

In [342]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, OneHotEncoder
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator


____________________________________________

**Now we use the randomSplit function to split the data into trainDF and testDF with 80% and 20% of the rows, respectively:**

In [348]:
trainDF, testDF = combined.randomSplit([.8,.2],seed=42)
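
Note that `randomSplit` assigns rows at random, so the 80/20 proportions are approximate rather than exact. The plain-Python sketch below illustrates that idea only; it is not Spark's actual implementation, and `random_split` is a hypothetical helper:

```python
import random

def random_split(rows, train_weight=0.8, seed=42):
    """Send each row to train with probability train_weight, else to test."""
    rng = random.Random(seed)
    train, test = [], []
    for row in rows:
        (train if rng.random() < train_weight else test).append(row)
    return train, test

rows = list(range(1329))  # stand-in for the 1329 combined rows
train, test = random_split(rows)
print(len(train), len(test))  # roughly 80% / 20%, not exactly
```

Fixing the seed makes the split reproducible, which is why the notebook passes `seed=42`.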

**Collect the string-typed columns, which StringIndexer will encode:**

In [349]:
categoricalCols = [field for (field, dataType) in trainDF.dtypes
                   if dataType == "string"]

In [350]:
categoricalCols

['Sex', 'cabin', 'Embarked', 'Title']

In [351]:
indexOutputCols = [x + "_Index" for x in categoricalCols]
indexOutputCols

['Sex_Index', 'cabin_Index', 'Embarked_Index', 'Title_Index']

In [352]:
oheOutputCols = [x + "_OHE" for x in categoricalCols]
oheOutputCols

['Sex_OHE', 'cabin_OHE', 'Embarked_OHE', 'Title_OHE']

**StringIndexer: A label indexer that maps a string column of labels to an ML column of label indices. If the input column is numeric, we cast it to string and index the string values. The indices are in [0, numLabels). By default, this is ordered by label frequencies so the most frequent label gets index 0. The ordering behavior is controlled by setting stringOrderType. Its default value is ‘frequencyDesc’.**
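
To illustrate the default `frequencyDesc` ordering, this plain-Python sketch ranks labels by descending count. It uses the title counts from the combined data purely as an example; the real indexer is fitted on `trainDF`, whose counts will differ. `frequency_desc_index` is a hypothetical helper:

```python
# Title counts from the combined data, used here only for illustration.
title_counts = {"rare": 44, "Miss": 257, "Master": 56, "Mr": 786, "Mrs": 186}

def frequency_desc_index(counts):
    """Map each label to its rank when sorted by descending frequency."""
    ordered = sorted(counts, key=lambda label: counts[label], reverse=True)
    return {label: float(i) for i, label in enumerate(ordered)}

title_index = frequency_desc_index(title_counts)
print(title_index)
```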

In [353]:
stringIndexer = StringIndexer(inputCols=categoricalCols,
                             outputCols=indexOutputCols,
                             handleInvalid='skip')
oheEncoder = OneHotEncoder(inputCols=indexOutputCols,
                          outputCols=oheOutputCols)

In [354]:
numericCols = [field for (field, dataType) in trainDF.dtypes
               if dataType in ('double', 'int') and field != 'Survived']
numericCols

['Pclass', 'Age', 'SibSp', 'Parch', 'Fare']

In [355]:
assemblerInputs = oheOutputCols + numericCols
assemblerInputs

['Sex_OHE',
 'cabin_OHE',
 'Embarked_OHE',
 'Title_OHE',
 'Pclass',
 'Age',
 'SibSp',
 'Parch',
 'Fare']

**VectorAssembler: VectorAssembler(*, inputCols=None, outputCol=None) A feature transformer that merges multiple columns into a vector column.**



In [356]:
vecAssembler = VectorAssembler(inputCols=assemblerInputs,outputCol='features')
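
To make the encoder and assembler steps concrete, here is a plain-Python sketch for a single row. It assumes `OneHotEncoder`'s default `dropLast=True`, under which a column with n categories becomes a vector of length n - 1 and the last category is the all-zeros vector. The helper names and the sample values are hypothetical.

```python
def one_hot(index, num_categories, drop_last=True):
    """One-hot encode a category index as a list of 0.0/1.0 values."""
    size = num_categories - 1 if drop_last else num_categories
    vector = [0.0] * size
    if index < size:  # the dropped last category stays all zeros
        vector[index] = 1.0
    return vector

def assemble(*parts):
    """Concatenate vectors and scalars into one flat feature list."""
    features = []
    for part in parts:
        features.extend(part if isinstance(part, list) else [float(part)])
    return features

# Hypothetical row: Embarked index 1 of 3 categories, Pclass 3, Age 22.0.
embarked_ohe = one_hot(1, 3)
features = assemble(embarked_ohe, 3, 22.0)
print(features)
```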

**Let's build a RandomForestClassifier model and assemble the pipeline. We will fit and transform with it, then display the "prediction", "Survived", and "features" columns:**

In [357]:
rf = RandomForestClassifier(labelCol='Survived',featuresCol='features')
pipeline =Pipeline(stages = [stringIndexer,oheEncoder,vecAssembler,rf])

**Use the Pipeline to fit and transform:**

In [358]:
pipelineModel = pipeline.fit(trainDF)

In [359]:
predDF = pipelineModel.transform(testDF)

In [360]:
predDF.select('features','Survived','prediction').show(5)

+--------------------+--------+----------+
|            features|Survived|prediction|
+--------------------+--------+----------+
|(19,[2,9,11,14,15...|       0|       1.0|
|(19,[0,5,8,10,14,...|       0|       0.0|
|(19,[0,3,9,10,14,...|       0|       0.0|
|(19,[0,5,8,10,14,...|       0|       0.0|
|(19,[0,6,8,10,14,...|       0|       0.0|
+--------------------+--------+----------+
only showing top 5 rows



**Use MulticlassClassificationEvaluator and set the "labelCol" to "Survived",  "predictionCol" to "prediction", "metricName" to "accuracy"** 

In [361]:
Evaluator = MulticlassClassificationEvaluator(predictionCol='prediction',
                                         labelCol='Survived',
                                         metricName='accuracy')

In [362]:
Evaluator.evaluate(predDF)

0.8247863247863247
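
The accuracy metric is the fraction of rows where the prediction equals the label. A plain-Python sketch on the five rows shown by `predDF.show(5)` above (a tiny sample, so its value differs from the full-test-set score); `accuracy` is a helper name assumed for illustration:

```python
def accuracy(labels, predictions):
    """Fraction of predictions that equal the true label."""
    correct = sum(1 for y, p in zip(labels, predictions) if y == p)
    return correct / len(labels)

# The five rows shown by predDF.show(5) above.
labels = [0, 0, 0, 0, 0]
predictions = [1.0, 0.0, 0.0, 0.0, 0.0]

score = accuracy(labels, predictions)
print(score)
```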

**We are done!!! Congratulations!!!**