### **Problem** 
**Our goal is to create a predictive model that can answer the following question:**

**What kind of people had a better chance of surviving?**

**Data about passengers includes, for example:**
*   Name
*   Age
*   Gender (`Sex`)


## Install and Import Libraries
Locate the local Spark installation with `findspark`, then import PySpark:

In [120]:
import findspark
findspark.init()
import pyspark
from pyspark.sql import SparkSession

## Build Spark Session

In [121]:
spark = SparkSession.builder.appName("Titanic Survival Prediction").getOrCreate()

## Data Loading


There are two datasets, train and test. Read both with the same explicit schema:



In [122]:
SCHEMA = """
PassengerId INT, Survived INT, Pclass INT, 
Name STRING, Sex STRING, Age INT, 
SibSp INT, Parch INT, Ticket STRING, 
Fare FLOAT, Cabin STRING, Embarked STRING"""
df_train = spark.read.format('csv')\
    .schema(SCHEMA)\
    .option("header", "true")\
    .load("train.csv")

# Note: this reads train.csv again (not test.csv), so df_test is a copy of df_train
df_test = spark.read.format('csv')\
    .schema(SCHEMA)\
    .option("header", "true")\
    .load("train.csv")
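The schema declares `Age INT`, but the Titanic data stores some ages as fractions (ages under one year, and estimated ages written as xx.5). Under Spark's default `PERMISSIVE` read mode, a field that cannot be parsed as its declared type is read as null, so those ages are counted as missing. The plain-Python sketch below mimics that rule for a single field; `parse_int_permissive` is a hypothetical helper, not a Spark API.

```python
def parse_int_permissive(text):
    """Parse a CSV field as int; return None if it is empty or not an integer.

    Hypothetical helper mimicking how a permissive reader nulls out a field
    that does not match the declared INT type.
    """
    if text is None or text == "":
        return None
    try:
        return int(text)
    except ValueError:
        # "0.42" or "28.5" cannot be parsed as an int, so it becomes None
        return None

raw_ages = ["22", "38", "", "0.42", "28.5"]
parsed = [parse_int_permissive(a) for a in raw_ages]
print(parsed)  # [22, 38, None, None, None]
```

Declaring `Age` as `DOUBLE` (or `FLOAT`) in the schema would keep those values.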

Let's work with the train dataset first:

**Confirm that this is a DataFrame:**

In [123]:
type(df_train)

pyspark.sql.dataframe.DataFrame

**Show the first 5 rows:**

In [124]:
df_train.show(5)

+-----------+--------+------+--------------------+------+---+-----+-----+----------------+-------+-----+--------+
|PassengerId|Survived|Pclass|                Name|   Sex|Age|SibSp|Parch|          Ticket|   Fare|Cabin|Embarked|
+-----------+--------+------+--------------------+------+---+-----+-----+----------------+-------+-----+--------+
|          1|       0|     3|Braund, Mr. Owen ...|  male| 22|    1|    0|       A/5 21171|   7.25| null|       S|
|          2|       1|     1|Cumings, Mrs. Joh...|female| 38|    1|    0|        PC 17599|71.2833|  C85|       C|
|          3|       1|     3|Heikkinen, Miss. ...|female| 26|    0|    0|STON/O2. 3101282|  7.925| null|       S|
|          4|       1|     1|Futrelle, Mrs. Ja...|female| 35|    1|    0|          113803|   53.1| C123|       S|
|          5|       0|     3|Allen, Mr. Willia...|  male| 35|    0|    0|          373450|   8.05| null|       S|
+-----------+--------+------+--------------------+------+---+-----+-----+----------------+-------+-----+--------+

**Display schema for the dataset:**

In [125]:
df_train.printSchema()

root
 |-- PassengerId: integer (nullable = true)
 |-- Survived: integer (nullable = true)
 |-- Pclass: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Age: integer (nullable = true)
 |-- SibSp: integer (nullable = true)
 |-- Parch: integer (nullable = true)
 |-- Ticket: string (nullable = true)
 |-- Fare: float (nullable = true)
 |-- Cabin: string (nullable = true)
 |-- Embarked: string (nullable = true)



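**Statistical summary** (`describe()` returns a new DataFrame, so the cell below only prints its schema; call `df_train.describe().show()` to print the statistics):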

In [126]:
df_train.describe()

DataFrame[summary: string, PassengerId: string, Survived: string, Pclass: string, Name: string, Sex: string, Age: string, SibSp: string, Parch: string, Ticket: string, Fare: string, Cabin: string, Embarked: string]

## EDA - Exploratory Data Analysis

**Count the rows in the train dataset:**

In [127]:
df_train.count()

891


**How many people survived, and how many didn't survive?** 

In [128]:
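# Explicit imports; note that Spark's `sum` aggregate shadows Python's built-in sum
from pyspark.sql.functions import avg, col, mean, sum, udf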

In [129]:
n_survived = df_train.filter(col('Survived') == 1).count()
n_died = df_train.filter(col('Survived') == 0).count()

In [130]:
print("Count of those who survived:", n_survived)
print("Count of those who didn't survive:", n_died)

Count of those who survived: 342
Count of those who didn't survive: 549


In [131]:
print("Ratio of those who survived:", n_survived / (n_survived + n_died))
print("Ratio of those who didn't survive:", n_died / (n_survived + n_died))

Ratio of those who survived: 0.3838383838383838
Ratio of those who didn't survive: 0.6161616161616161


In [132]:
print("Number of males: ", df_train.filter(col('Sex') == 'male').count())
print("Number of females: ", df_train.filter(col('Sex') == 'female').count())

Number of males:  577
Number of females:  314


**1. How many people of each gender survived?**

**2. What is the average number of survivors per gender (the mean of the two per-gender totals)?**

In [133]:
print('Number of survivors for each gender')
survived_by_gender = df_train.groupBy('Sex').agg(sum('Survived'))
survived_by_gender.show()

Number of survivors for each gender
+------+-------------+
|   Sex|sum(Survived)|
+------+-------------+
|female|          233|
|  male|          109|
+------+-------------+



In [134]:
print("Average number of survivors for both genders")
survived_by_gender.agg(avg('sum(Survived)')).show()

Average number of survivors for both genders
+------------------+
|avg(sum(Survived))|
+------------------+
|             171.0|
+------------------+
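The 171.0 above is just (233 + 109) / 2, the mean of the two per-gender totals, so it says little about who survived. Dividing each gender's survivors by that gender's passenger count gives a survival rate instead. A plain-Python sketch using the counts printed earlier (314 females, 577 males):

```python
# Counts taken from the outputs above
survivors = {"female": 233, "male": 109}
passengers = {"female": 314, "male": 577}

# Mean of the per-gender totals, as computed with avg(sum(Survived))
mean_of_totals = sum(survivors.values()) / len(survivors)

# Survival rate within each gender
survival_rate = {sex: survivors[sex] / passengers[sex] for sex in survivors}

print(mean_of_totals)  # 171.0
for sex, rate in survival_rate.items():
    print(f"{sex}: {rate:.3f}")  # female: 0.742, then male: 0.189
```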



**Create a temporary view:**

In [135]:
df_train.createOrReplaceTempView('table')

**How many people survived, and how many didn't? Using SQL:**

In [136]:
spark.sql(
    """
    SELECT Survived, count(Survived)
    FROM table
    GROUP BY Survived;""").show()

+--------+---------------+
|Survived|count(Survived)|
+--------+---------------+
|       1|            342|
|       0|            549|
+--------+---------------+



**Display each gender's survivors as a fraction of all passengers:**

In [137]:
spark.sql(
    """
    SELECT Sex, 
        sum(Survived)  / (SELECT count(Survived) FROM table) AS Ratio_of_Survivors
    FROM table
    GROUP BY Sex;""").show()

+------+------------------+
|   Sex|Ratio_of_Survivors|
+------+------------------+
|female|0.2615039281705948|
|  male| 0.122334455667789|
+------+------------------+
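Because the scalar subquery divides by the number of all passengers (891), these ratios are shares of the whole dataset rather than per-gender survival rates, and they add up to the overall survival ratio of 0.3838 computed earlier. A quick arithmetic check in plain Python:

```python
total_passengers = 891
survivors_by_sex = {"female": 233, "male": 109}

# Each gender's survivors as a share of *all* passengers
shares = {sex: n / total_passengers for sex, n in survivors_by_sex.items()}
print(shares)

# The shares sum to the overall survival ratio, 342 / 891
overall = sum(shares.values())
print(round(overall, 4))  # 0.3838
```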



**Display the survival ratio for each passenger class (`Pclass`):**

In [138]:
spark.sql(
    """
    SELECT Pclass, sum(Survived) / count(Pclass) AS Survival_Ratio
    FROM table
    GROUP BY Pclass;""").show()

+------+-------------------+
|Pclass|     Survival_Ratio|
+------+-------------------+
|     1| 0.6296296296296297|
|     3|0.24236252545824846|
|     2|0.47282608695652173|
+------+-------------------+
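Since `Survived` is 0 or 1, `sum(Survived) / count(Pclass)` is the mean of `Survived` within each class, so `AVG(Survived)` would return the same numbers. The query has no `ORDER BY`, which is why the classes come back in arbitrary order (1, 3, 2). A plain-Python sketch of the group-by mean over a few made-up rows (illustrative values, not real passengers):

```python
from collections import defaultdict

# Illustrative (Pclass, Survived) pairs, not taken from the dataset
rows = [(1, 1), (1, 0), (1, 1), (2, 0), (2, 1), (3, 0), (3, 0), (3, 1), (3, 0)]

totals = defaultdict(int)    # passengers per class
survived = defaultdict(int)  # survivors per class
for pclass, label in rows:
    totals[pclass] += 1
    survived[pclass] += label

# sum(label) / count(*) per class equals the mean of the 0/1 label
survival_ratio = {c: survived[c] / totals[c] for c in sorted(totals)}
print(survival_ratio)
```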



## Data Cleaning

In [139]:
combined = df_train.union(df_test)

**Count the rows in the combined dataset:**

In [140]:
combined.count()

1782
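The count of 1,782 is exactly 2 × 891: `df_test` was loaded from train.csv, so every passenger appears twice in `combined`. When `combined` is later split at random 80/20, most test rows have an identical copy in the training split, which tends to inflate the measured accuracy. The plain-Python simulation below illustrates the effect; it uses `random.Random` with an arbitrary seed and is only an analogy to Spark's `randomSplit`, not its implementation.

```python
import random

n_passengers = 891
# Every passenger id appears twice, as in `combined`
rows = list(range(n_passengers)) * 2

rng = random.Random(42)
train_ids, test_ids = set(), []
for pid in rows:
    # Assign each row independently: ~80% to train, ~20% to test
    if rng.random() < 0.8:
        train_ids.add(pid)
    else:
        test_ids.append(pid)

# Fraction of test rows whose duplicate landed in the training split
leaked = sum(pid in train_ids for pid in test_ids) / len(test_ids)
print(f"test rows: {len(test_ids)}, with a copy in train: {leaked:.0%}")
```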

**Count the null values in each column:**

In [141]:
combined.select(*(sum(col(c).isNull().cast("int")).alias(c) for c in combined.columns)).show()

+-----------+--------+------+----+---+---+-----+-----+------+----+-----+--------+
|PassengerId|Survived|Pclass|Name|Sex|Age|SibSp|Parch|Ticket|Fare|Cabin|Embarked|
+-----------+--------+------+----+---+---+-----+-----+------+----+-----+--------+
|          0|       0|     0|   0|  0|404|    0|    0|     0|   0| 1374|       4|
+-----------+--------+------+----+---+---+-----+-----+------+----+-----+--------+
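The expression casts `isNull()` to an integer (1 for null, 0 otherwise) and sums it per column. The same idea in plain Python, over a few illustrative rows stored as dicts with `None` for missing values:

```python
rows = [
    {"Age": 22, "Cabin": None, "Embarked": "S"},
    {"Age": 38, "Cabin": "C85", "Embarked": "C"},
    {"Age": None, "Cabin": None, "Embarked": None},
]
columns = ["Age", "Cabin", "Embarked"]

# Sum a 0/1 "is null" indicator per column
null_counts = {c: sum(int(row[c] is None) for row in rows) for c in columns}
print(null_counts)  # {'Age': 1, 'Cabin': 2, 'Embarked': 1}
```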



**Store the null counts in a DataFrame (`df_na`):**

In [142]:
df_na = combined.select(*(sum(col(c).isNull().cast("int")).alias(c) for c in combined.columns))
df_na.show()

+-----------+--------+------+----+---+---+-----+-----+------+----+-----+--------+
|PassengerId|Survived|Pclass|Name|Sex|Age|SibSp|Parch|Ticket|Fare|Cabin|Embarked|
+-----------+--------+------+----+---+---+-----+-----+------+----+-----+--------+
|          0|       0|     0|   0|  0|404|    0|    0|     0|   0| 1374|       4|
+-----------+--------+------+----+---+---+-----+-----+------+----+-----+--------+



## Preprocessing 

**Create a temporary view:**

In [143]:
combined.createOrReplaceTempView('full_table')

**Show the first 10 values of the "Name" column from the temporary view:**

In [144]:
spark.sql("""SELECT Name FROM full_table LIMIT 10;""").show()

+--------------------+
|                Name|
+--------------------+
|Braund, Mr. Owen ...|
|Cumings, Mrs. Joh...|
|Heikkinen, Miss. ...|
|Futrelle, Mrs. Ja...|
|Allen, Mr. Willia...|
|    Moran, Mr. James|
|McCarthy, Mr. Tim...|
|Palsson, Master. ...|
|Johnson, Mrs. Osc...|
|Nasser, Mrs. Nich...|
+--------------------+



In [145]:
import pyspark.sql.functions as F
# Title = the first word followed by a period, e.g. "Mr", "Mrs", "Miss"
combined = combined.withColumn('Title', F.regexp_extract(F.col("Name"), r"([A-Za-z]+)\.", 1))
combined.createOrReplaceTempView('combined')
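`regexp_extract` returns group 1 of the first match of `([A-Za-z]+)\.`, i.e. the first word followed by a period, which in these names is the title; it returns an empty string when there is no match. Python's `re.search` also returns the first match, so the pattern can be checked locally. The sample names below follow the dataset's `Last, Title. First` format; `extract_title` is a hypothetical helper.

```python
import re

TITLE_PATTERN = re.compile(r"([A-Za-z]+)\.")

def extract_title(name):
    """Return the first word followed by a period, or '' if there is none."""
    match = TITLE_PATTERN.search(name)
    return match.group(1) if match else ""

names = ["Braund, Mr. Owen Harris", "Heikkinen, Miss. Laina",
         "Moran, Mr. James", "Palsson, Master. Gosta Leonard"]
print([extract_title(n) for n in names])  # ['Mr', 'Miss', 'Mr', 'Master']
```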

**Count the passengers with each title:**

In [146]:
count_by_name = spark.sql("SELECT Title, count(Title) AS Counts FROM combined GROUP BY Title ORDER BY Counts ASC;")
count_by_name.show()

+--------+------+
|   Title|Counts|
+--------+------+
|Countess|     2|
|Jonkheer|     2|
|     Mme|     2|
|    Lady|     2|
|     Don|     2|
|     Sir|     2|
|      Ms|     2|
|    Capt|     2|
|     Col|     4|
|    Mlle|     4|
|   Major|     4|
|     Rev|    12|
|      Dr|    14|
|  Master|    80|
|     Mrs|   250|
|    Miss|   364|
|      Mr|  1034|
+--------+------+



**Dr, Rev, Major, Col, Mlle, Capt, Don, Jonkheer, Countess, Ms, Sir, Lady, and Mme are rare titles, so map each of them to "rare" with a dictionary:**

In [147]:
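# The 13 titles with fewer than 20 occurrences (Dr, with 14, is the most frequent of them)
titles_list = count_by_name.filter(F.col('Counts') < 20).select('Title').collect()
titles_map = {row['Title']: 'rare' for row in titles_list}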

In [148]:
def impute_title(title):
    """Look up the replacement for a rare title in titles_map."""
    return titles_map[title]

**Apply the function to the "Title" column with a UDF:**

In [149]:
from pyspark.sql.types import StringType

def impute_wrapper(title):
    """Return "rare" for rare titles; leave all other titles unchanged."""
    if title in titles_map:
        return impute_title(title)
    return title

impute_title_udf = udf(impute_wrapper, StringType())

# Apply function on `combined` dataframe
combined = combined.withColumn('Title', impute_title_udf(col('Title')))

**Count the titles again after collapsing the rare ones:**

In [150]:
combined.createOrReplaceTempView('combined')
spark.sql("SELECT Title, count(Title) AS Counts FROM combined GROUP BY Title ORDER BY Counts ASC;").show()

+------+------+
| Title|Counts|
+------+------+
|  rare|    54|
|Master|    80|
|   Mrs|   250|
|  Miss|   364|
|    Mr|  1034|
+------+------+
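The 54 in the `rare` row is the sum of the 13 counts that were mapped to "rare" (8 titles × 2, 3 titles × 4, plus 12 and 14). A plain-Python sketch of the same collapse, selecting rare titles with a count threshold; the cutoff of 20 is an assumption chosen so that exactly those 13 titles fall below it:

```python
from collections import Counter

# Title counts from the table before the collapse
title_counts = Counter({
    "Countess": 2, "Jonkheer": 2, "Mme": 2, "Lady": 2, "Don": 2, "Sir": 2,
    "Ms": 2, "Capt": 2, "Col": 4, "Mlle": 4, "Major": 4, "Rev": 12, "Dr": 14,
    "Master": 80, "Mrs": 250, "Miss": 364, "Mr": 1034,
})

RARE_THRESHOLD = 20  # assumption: titles seen fewer than 20 times are "rare"
titles_map = {t: "rare" for t, n in title_counts.items() if n < RARE_THRESHOLD}

# Re-count after replacing rare titles, keeping the others unchanged
collapsed = Counter()
for title, n in title_counts.items():
    collapsed[titles_map.get(title, title)] += n

print(len(titles_map), collapsed["rare"])  # 13 54
```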



## **Preprocessing Age**

In [151]:
age_avg = combined.select(mean(col('age'))).collect()[0][0]

**Fill missing ages with the mean age:**

In [152]:
combined = combined.fillna(age_avg, subset=['age'])
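Mean imputation replaces every missing age with the average of the ages that are present; nulls are left out of the average, just as Spark's `mean` ignores them. A plain-Python sketch on made-up values:

```python
ages = [22, 38, None, 35, None, 54]

# Average over the non-missing values only
known = [a for a in ages if a is not None]
age_avg = sum(known) / len(known)

# Replace each missing value with the mean
filled = [age_avg if a is None else a for a in ages]
print(age_avg, filled)  # 37.25 [22, 38, 37.25, 35, 37.25, 54]
```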

## **Preprocessing Embarked**

**Count each "Embarked" value, order by count descending, and save the result in `grouped_Embarked`:**

In [153]:
grouped_Embarked = spark.sql(
    """
    SELECT Embarked, count(Embarked) AS Counts 
    FROM combined 
    GROUP BY Embarked
    ORDER BY Counts DESC;""")

In [154]:
grouped_Embarked.show()

+--------+------+
|Embarked|Counts|
+--------+------+
|       S|  1288|
|       C|   336|
|       Q|   154|
|    null|     0|
+--------+------+



**Get the most frequent value in `grouped_Embarked`:**

In [155]:
grouped_Embarked.limit(1).collect()[0][:]

('S', 1288)

**Fill missing values with the most frequent port, 'S':**

In [156]:
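# Use the most frequent port from grouped_Embarked ('S') as the fill value
most_common_port = grouped_Embarked.first()['Embarked']
combined = combined.fillna(most_common_port, subset=['Embarked'])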
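The `null` row in `grouped_Embarked` shows a count of 0 because `count(Embarked)` counts only non-null values; the 4 missing ports found earlier are still there until `fillna` replaces them. A plain-Python sketch of that counting rule and of filling with the most frequent value (the list is illustrative):

```python
from collections import Counter

embarked = ["S", "C", "S", None, "Q", "S", None]

# Like SQL count(col): count non-null values only
counts = Counter(v for v in embarked if v is not None)
mode = counts.most_common(1)[0][0]

# Fill missing values with the mode
filled = [mode if v is None else v for v in embarked]
print(counts, mode, filled)
```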

## **Preprocessing Cabin**

**Replace each "Cabin" value with its first character (the deck letter):**

In [157]:
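from pyspark.sql.types import StringType

def first_letter(cabin):
    """Return the first character of the cabin (its deck letter), or None if missing."""
    if cabin is not None:
        return cabin[0]
    return None

first_letter_udf = udf(first_letter, StringType())
combined = combined.withColumn('cabin', first_letter_udf(col('cabin')))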

**Show the result:**

In [158]:
combined.select(col('cabin')).show()

+-----+
|cabin|
+-----+
| null|
|    C|
| null|
|    C|
| null|
| null|
|    E|
| null|
| null|
| null|
|    G|
|    C|
| null|
| null|
| null|
| null|
| null|
| null|
| null|
| null|
+-----+
only showing top 20 rows



**Create the temporary view:**

In [159]:
combined.createOrReplaceTempView('combined')

**Count the passengers per cabin letter, ordered by count descending:**

In [161]:
spark.sql(
    """
    SELECT cabin,
        count(cabin) AS Counts
    FROM combined
    GROUP BY cabin
    ORDER BY Counts DESC; 
    """
).show()

+-----+------+
|cabin|Counts|
+-----+------+
|    C|   118|
|    B|    94|
|    D|    66|
|    E|    64|
|    A|    30|
|    F|    26|
|    G|     8|
|    T|     2|
| null|     0|
+-----+------+



**Fill missing cabins with "U" (unknown):**

In [163]:
combined = combined.fillna('U', subset=['cabin'])
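Together, the two Cabin steps turn a cabin number into its leading letter and mark missing cabins as "U". A plain-Python sketch of the combined transformation on sample values; `cabin_to_deck` is a hypothetical helper that also treats an empty string as missing:

```python
def cabin_to_deck(cabin):
    """Return the first character of a cabin, or 'U' if the cabin is missing or empty."""
    return cabin[0] if cabin else "U"

cabins = ["C85", None, "C123", "E46", None, "B57 B59 B63 B66"]
print([cabin_to_deck(c) for c in cabins])  # ['C', 'U', 'C', 'E', 'U', 'B']
```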

## Model Training and Evaluation

In [176]:
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler

In [169]:
categorical = [c for (c, dtype) in combined.dtypes if dtype == 'string']
numerical = [c for (c, dtype) in combined.dtypes if dtype != 'string']

index_cols = [x + "_Index" for x in categorical]
ohe_cols = [x + "_OHE" for x in categorical]

string_indexer = StringIndexer(inputCols=categorical, outputCols=index_cols, handleInvalid='skip')
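As written, `numerical` keeps every non-string column of `combined`, which includes the label `Survived` and the row id `PassengerId`; both then reach `VectorAssembler` as features, and a label inside the feature vector leaks the answer to the model. A plain-Python sketch of splitting `(name, dtype)` pairs, shaped like `DataFrame.dtypes` (names and types assumed from the schema above), while excluding those columns; the exclusion set is an assumption:

```python
dtypes = [
    ("PassengerId", "int"), ("Survived", "int"), ("Pclass", "int"),
    ("Name", "string"), ("Sex", "string"), ("Age", "int"), ("SibSp", "int"),
    ("Parch", "int"), ("Ticket", "string"), ("Fare", "float"),
    ("Cabin", "string"), ("Embarked", "string"), ("Title", "string"),
]
EXCLUDE = {"Survived", "PassengerId"}  # assumption: label and row id are not features

categorical = [c for c, t in dtypes if t == "string" and c not in EXCLUDE]
numerical = [c for c, t in dtypes if t != "string" and c not in EXCLUDE]
print(categorical)  # ['Name', 'Sex', 'Ticket', 'Cabin', 'Embarked', 'Title']
print(numerical)    # ['Pclass', 'Age', 'SibSp', 'Parch', 'Fare']
```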

In [170]:
ohe_encoder = OneHotEncoder(inputCols=index_cols, outputCols=ohe_cols)
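By default, `StringIndexer` gives index 0 to the most frequent label (`stringOrderType="frequencyDesc"`; recent Spark versions break ties alphabetically), and `OneHotEncoder` drops the last category (`dropLast=True`), so a column with k distinct values becomes a vector of length k - 1. A plain-Python sketch of both steps on the `Embarked` counts shown earlier, using dense lists for readability where Spark produces sparse vectors; the helper names are hypothetical:

```python
from collections import Counter

def fit_string_indexer(values):
    """Map labels to indices by descending frequency, ties broken alphabetically."""
    counts = Counter(values)
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: i for i, label in enumerate(ordered)}

def one_hot(index, n_categories, drop_last=True):
    """Dense one-hot vector; with drop_last the last category is all zeros."""
    size = n_categories - 1 if drop_last else n_categories
    return [1.0 if i == index else 0.0 for i in range(size)]

embarked = ["S"] * 1288 + ["C"] * 336 + ["Q"] * 154
index = fit_string_indexer(embarked)
print(index)  # {'S': 0, 'C': 1, 'Q': 2}
print({label: one_hot(i, len(index)) for label, i in index.items()})
# {'S': [1.0, 0.0], 'C': [0.0, 1.0], 'Q': [0.0, 0.0]}
```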

In [171]:
vec_assembler = VectorAssembler(inputCols=ohe_cols + numerical, outputCol='features')

**Use `randomSplit` to split the data into `X_train` and `X_test` (80% and 20%, respectively):**

In [172]:
X_train, X_test = combined.randomSplit([.8, .2], seed=42)

**Build a `RandomForestClassifier` and chain the indexer, encoder, assembler, and model into a `Pipeline`:**

In [178]:
model = RandomForestClassifier(labelCol="Survived", seed=42, featuresCol='features')
pipeline = Pipeline(stages=[string_indexer, ohe_encoder, vec_assembler, model])

**Fit the pipeline on `X_train`, transform `X_test`, and display the feature vectors and predictions; then evaluate with `MulticlassClassificationEvaluator` (`labelCol="Survived"`, `predictionCol="prediction"`, `metricName="accuracy"`):**

In [179]:
pipeline_model = pipeline.fit(X_train)
df_pred = pipeline_model.transform(X_test)
df_pred.select('features', 'prediction').show()



+--------------------+----------+
|            features|prediction|
+--------------------+----------+
|(1540,[700,1516,1...|       1.0|
|(1540,[715,915,15...|       1.0|
|(1540,[621,856,85...|       0.0|
|(1540,[746,1404,1...|       1.0|
|(1540,[821,856,13...|       0.0|
|(1540,[837,856,14...|       0.0|
|(1540,[735,856,14...|       0.0|
|(1540,[775,1372,1...|       1.0|
|(1540,[624,956,15...|       0.0|
|(1540,[771,856,14...|       0.0|
|(1540,[695,856,96...|       0.0|
|(1540,[720,856,14...|       0.0|
|(1540,[765,856,14...|       0.0|
|(1540,[788,1450,1...|       0.0|
|(1540,[631,856,14...|       0.0|
|(1540,[662,856,14...|       0.0|
|(1540,[842,856,93...|       0.0|
|(1540,[633,856,97...|       0.0|
|(1540,[673,856,14...|       0.0|
|(1540,[754,856,14...|       0.0|
+--------------------+----------+
only showing top 20 rows



In [182]:
evaluator = MulticlassClassificationEvaluator(predictionCol='prediction', labelCol='Survived', metricName='accuracy')
evaluator.evaluate(df_pred)

0.8984375
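For context, 549 of the 891 passengers in train.csv (about 61.6%) did not survive, so a model that always predicts 0 would already reach roughly that accuracy; the reported accuracy is best read against this baseline. The exact baseline on `X_test` depends on the split, so the plain-Python sketch below (with a hypothetical `majority_baseline` helper) computes it from whatever labels it is given:

```python
from collections import Counter

def majority_baseline(labels):
    """Accuracy of always predicting the most common label."""
    counts = Counter(labels)
    return counts.most_common(1)[0][1] / len(labels)

labels = [0] * 549 + [1] * 342  # class counts from train.csv
print(round(majority_baseline(labels), 4))  # 0.6162
```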