Let's start with your project: 

Are you a data scientist? 

I think you are an awesome a data scientist.

### **Problem** 
**Our goal is to create a predictive model that can answer the following question:**

**What kind of people had a better chance of surviving?**

## Install and Import Libraries
Let's install PySpark:

In [1]:
# pip install spark

In [2]:
# pip install findspark

## Build Spark Session

In [3]:

from pyspark.sql import SparkSession
from pyspark.sql.functions import count

spark = (SparkSession
.builder
.appName("DataFrame")
.getOrCreate())


## Data Loading


You have two datasets: 
* Train  
* Test.

Read two datasets: 
* Train
* Test.



In [4]:
train_df=spark.read.csv('./test.csv',inferSchema=True,header=True)
test_df=spark.read.csv('./train.csv',inferSchema=True,header=True)

Let's work with train dataset:

**Confirm if this is a dataframe or not:**

In [5]:
print(type(train_df))
print(type(test_df))

<class 'pyspark.sql.dataframe.DataFrame'>
<class 'pyspark.sql.dataframe.DataFrame'>


**Show 5 rows.**

In [6]:
train_df.head(5)

[Row(PassengerId=1, Survived=1, Pclass=1, Name='Goldenberg, Mr. Samuel L', Sex='male', Age=49.0, SibSp=1, Parch=0, Ticket='17453', Fare=89.1042, Cabin='C92', Embarked='C'),
 Row(PassengerId=2, Survived=0, Pclass=3, Name='Peduzzi, Mr. Joseph', Sex='male', Age=None, SibSp=0, Parch=0, Ticket='A/5 2817', Fare=8.05, Cabin=None, Embarked='S'),
 Row(PassengerId=3, Survived=1, Pclass=3, Name='Jalsevac, Mr. Ivan', Sex='male', Age=29.0, SibSp=0, Parch=0, Ticket='349240', Fare=7.8958, Cabin=None, Embarked='C'),
 Row(PassengerId=4, Survived=0, Pclass=1, Name='Millet, Mr. Francis Davis', Sex='male', Age=65.0, SibSp=0, Parch=0, Ticket='13509', Fare=26.55, Cabin='E38', Embarked='S'),
 Row(PassengerId=5, Survived=1, Pclass=1, Name='Kenyon, Mrs. Frederick R (Marion)', Sex='female', Age=None, SibSp=1, Parch=0, Ticket='17464', Fare=51.8625, Cabin='D21', Embarked='S')]

**Display schema for the dataset:**

In [7]:
train_df.printSchema()

root
 |-- PassengerId: integer (nullable = true)
 |-- Survived: integer (nullable = true)
 |-- Pclass: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- SibSp: integer (nullable = true)
 |-- Parch: integer (nullable = true)
 |-- Ticket: string (nullable = true)
 |-- Fare: double (nullable = true)
 |-- Cabin: string (nullable = true)
 |-- Embarked: string (nullable = true)



**Statistical summary:**

In [8]:
train_df.summary()

DataFrame[summary: string, PassengerId: string, Survived: string, Pclass: string, Name: string, Sex: string, Age: string, SibSp: string, Parch: string, Ticket: string, Fare: string, Cabin: string, Embarked: string]

## EDA - Exploratory Data Analysis

**Display count for the train dataset:**

In [9]:
print(train_df.count())

438


**Can you answer this question:** 

**How many people survived, and how many didn't survive?** 

**Please save data in a variable.**

In [10]:
survived =  train_df.filter('Survived == 1').count()
not_survived =  train_df.filter('Survived == 0').count()

**Display your result:**

In [11]:
print(survived,"person survived ")
print(not_survived,"person didnot survive ")

163 person survived 
275 person didnot survive 


**Can you display your answer in ratio form?(Hint: Use UDF.)**



In [12]:

def ratio_form(num):
    return num / float(train_df.count())


In [13]:
df1 = train_df.groupby('Survived').count()

In [14]:
df1.show()

+--------+-----+
|Survived|count|
+--------+-----+
|       1|  163|
|       0|  275|
+--------+-----+



In [15]:
print("survived ratio",ratio_form(survived) )
print("not survived ratio",ratio_form(not_survived) )

survived ratio 0.3721461187214612
not survived ratio 0.6278538812785388


**Can you get the number of males and females?**


In [16]:
train_df.head()

Row(PassengerId=1, Survived=1, Pclass=1, Name='Goldenberg, Mr. Samuel L', Sex='male', Age=49.0, SibSp=1, Parch=0, Ticket='17453', Fare=89.1042, Cabin='C92', Embarked='C')

In [17]:
df2 = train_df.groupby('Sex').count()
df2.show()

+------+-----+
|   Sex|count|
+------+-----+
|female|  141|
|  male|  297|
+------+-----+



**1. What is the average number of survivors of each gender?**

**2. What is the number of survivors of each gender?**

(Hint: Group by the "sex" column.)

In [18]:
train_df.groupby('Sex').avg('Survived').show()

+------+-------------------+
|   Sex|      avg(Survived)|
+------+-------------------+
|female| 0.7304964539007093|
|  male|0.20202020202020202|
+------+-------------------+



In [19]:
train_df.head()

Row(PassengerId=1, Survived=1, Pclass=1, Name='Goldenberg, Mr. Samuel L', Sex='male', Age=49.0, SibSp=1, Parch=0, Ticket='17453', Fare=89.1042, Cabin='C92', Embarked='C')

**Create temporary view PySpark:**

In [20]:
train_df.createOrReplaceTempView('DF_view')
spark.sql('SELECT PassengerId, Survived, Pclass, Name,Sex, Age,SibSp,Parch,Ticket,Fare,Cabin,Embarked FROM DF_view').show(5)

+-----------+--------+------+--------------------+------+----+-----+-----+--------+-------+-----+--------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|  Ticket|   Fare|Cabin|Embarked|
+-----------+--------+------+--------------------+------+----+-----+-----+--------+-------+-----+--------+
|          1|       1|     1|Goldenberg, Mr. S...|  male|49.0|    1|    0|   17453|89.1042|  C92|       C|
|          2|       0|     3| Peduzzi, Mr. Joseph|  male|null|    0|    0|A/5 2817|   8.05| null|       S|
|          3|       1|     3|  Jalsevac, Mr. Ivan|  male|29.0|    0|    0|  349240| 7.8958| null|       C|
|          4|       0|     1|Millet, Mr. Franc...|  male|65.0|    0|    0|   13509|  26.55|  E38|       S|
|          5|       1|     1|Kenyon, Mrs. Fred...|female|null|    1|    0|   17464|51.8625|  D21|       S|
+-----------+--------+------+--------------------+------+----+-----+-----+--------+-------+-----+--------+
only showing top 5 rows



**How many people survived, and how many didn't survive? By SQL:**

In [21]:
spark.sql("""SELECT Survived,COUNT(Survived) 
          FROM DF_view 
          GROUP BY Survived
          """).show()



+--------+---------------+
|Survived|count(Survived)|
+--------+---------------+
|       1|            163|
|       0|            275|
+--------+---------------+



**Can you display the number of survivors from each gender as a ratio?**

(Hint: Group by "sex" column.)

**Can you do this via SQL?**

In [22]:
spark.sql("""SELECT Sex ,SUM(case when Survived =1 then 1 else 0 end) / count(*) as ratio
          FROM DF_view 
          GROUP BY Sex
          """).show()



+------+-------------------+
|   Sex|              ratio|
+------+-------------------+
|female| 0.7304964539007093|
|  male|0.20202020202020202|
+------+-------------------+



**Display a ratio for p-class:**


In [23]:

spark.sql("""SELECT Pclass ,count (Pclass) / (select count (*) from DF_view ) as RATIO from DF_view group by Pclass
          """).show()



+------+------------------+
|Pclass|             RATIO|
+------+------------------+
|     1|0.2602739726027397|
|     3|  0.54337899543379|
|     2|0.1963470319634703|
+------+------------------+



**Let's take a break and continue after this.**

## Data Cleaning

**First and foremost, we must merge both the train and test datasets. (Hint: The union function can do this.)**



In [24]:
total_df = train_df.union(test_df)

**Display count:**

In [25]:
total_df.count()

1329

**Temporary view PySpark:**

In [26]:
total_df.createOrReplaceTempView('total_view')
spark.sql('SELECT PassengerId, Survived, Pclass, Name,Sex, Age,SibSp,Parch,Ticket,Fare,Cabin,Embarked FROM DF_view').show(5)

+-----------+--------+------+--------------------+------+----+-----+-----+--------+-------+-----+--------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|  Ticket|   Fare|Cabin|Embarked|
+-----------+--------+------+--------------------+------+----+-----+-----+--------+-------+-----+--------+
|          1|       1|     1|Goldenberg, Mr. S...|  male|49.0|    1|    0|   17453|89.1042|  C92|       C|
|          2|       0|     3| Peduzzi, Mr. Joseph|  male|null|    0|    0|A/5 2817|   8.05| null|       S|
|          3|       1|     3|  Jalsevac, Mr. Ivan|  male|29.0|    0|    0|  349240| 7.8958| null|       C|
|          4|       0|     1|Millet, Mr. Franc...|  male|65.0|    0|    0|   13509|  26.55|  E38|       S|
|          5|       1|     1|Kenyon, Mrs. Fred...|female|null|    1|    0|   17464|51.8625|  D21|       S|
+-----------+--------+------+--------------------+------+----+-----+-----+--------+-------+-----+--------+
only showing top 5 rows



**Can you define the number of null values in each column?**


In [27]:
from pyspark.sql.functions import isnan, when, count, col,mean

total_df.select([count(when(isnan(c) |col(c).isNull(), c)).alias(c) for c in total_df.columns]).show()

+-----------+--------+------+----+---+---+-----+-----+------+----+-----+--------+
|PassengerId|Survived|Pclass|Name|Sex|Age|SibSp|Parch|Ticket|Fare|Cabin|Embarked|
+-----------+--------+------+----+---+---+-----+-----+------+----+-----+--------+
|          0|       0|     0|   0|  0|265|    0|    0|     0|   0| 1021|       3|
+-----------+--------+------+----+---+---+-----+-----+------+----+-----+--------+



**Create Dataframe for null values**

1. Column
2. Number of missing values.

In [28]:
missing_df = total_df.select([count(when(isnan(c) |col(c).isNull(), c)).alias(c) for c in total_df.columns])

## Preprocessing 

**Can you show me the name column from your temporary table?**

In [29]:
print(total_df.columns)

['PassengerId', 'Survived', 'Pclass', 'Name', 'Sex', 'Age', 'SibSp', 'Parch', 'Ticket', 'Fare', 'Cabin', 'Embarked']


**Run this code:**

In [30]:
from pyspark.sql import functions as F

combined = total_df.withColumn('Title',F.regexp_extract(F.col("Name"),"([A-Za-z]+)\.",1))
combined.createOrReplaceTempView('combined')

**Display the title and count "Title" column:**

In [31]:

spark.sql("""SELECT Title,COUNT(Title) 
          FROM combined 
          GROUP BY Title
          """).show()


+--------+------------+
|   Title|count(Title)|
+--------+------------+
|     Don|           1|
|    Miss|         257|
|Countess|           2|
|     Col|           4|
|    Lady|           2|
|     Rev|           9|
|  Master|          56|
|    Capt|           2|
|     Mme|           1|
|      Mr|         786|
|      Dr|          11|
|     Mrs|         186|
|     Sir|           2|
|Jonkheer|           2|
|    Mlle|           4|
|   Major|           3|
|      Ms|           1|
+--------+------------+



**We can see that Dr, Rev, Major, Col, Mlle, Capt, Don, Jonkheer, Countess, Ms, Sir, Lady, and Mme are really rare titles, so create Dictionary and set the value to "rare".**

In [32]:
rare = {'Dr':'rare', 'Rev':'rare', 'Major':'rare', 'Col':'rare', 'Mlle':'rare', 'Capt':'rare', 'Don':'rare', 'Jonkheer':'rare', 'Countess':'rare', 'Ms':'rare', 'Sir':'rare', 'Lady':'rare',  'Mme':'rare','Miss':'Miss','Master':'Master','Mr':'Mr','Mrs':'Mrs'}

**Run the function:**

In [33]:
def impute_title(title):
    return rare[title]

**Apply the function on "Title" column using UDF:**

In [34]:
from pyspark.sql.functions import udf

convertUDF = udf(lambda z: impute_title(z))
combined = combined.withColumn("Title", convertUDF(F.col("Title")))

In [35]:
combined.createOrReplaceTempView('combined')

**Display "Title" from table and group by "Title" column:**

In [36]:
spark.sql("""SELECT Title,COUNT(Title) 
          FROM combined 
          GROUP BY Title
          """).show()



+------+------------+
| Title|count(Title)|
+------+------------+
|  rare|          44|
|  Miss|         257|
|Master|          56|
|    Mr|         786|
|   Mrs|         186|
+------+------------+



## **Preprocessing Age**

**Based on the age mean, you will fill in the missing age values:**

In [37]:
mean_of_age= total_df.select(mean('Age')).collect()[0][0]
mean_of_age

30.079501879699244

**Fill missing age with age mean:**

In [38]:
total_df.fillna(mean_of_age, subset= ['Age'])

DataFrame[PassengerId: int, Survived: int, Pclass: int, Name: string, Sex: string, Age: double, SibSp: int, Parch: int, Ticket: string, Fare: double, Cabin: string, Embarked: string]

## **Preprocessing Embarked**

**Select Embarked, count them, order by count Desc, and save in grouped_Embarked variable:**




In [39]:
grouped_Embarked= total_df.select('Embarked').groupBy('Embarked').agg(count('Embarked').alias('group_of_Embarked')).orderBy('group_of_Embarked', ascending= False)

**Show groupped_Embarked:**

In [40]:
spark.sql(""" SELECT Embarked, count(Embarked) AS group_of_Embarked
FROM combined
GROUP BY Embarked
ORDER BY group_of_Embarked DESC
 """).show()

+--------+-----------------+
|Embarked|group_of_Embarked|
+--------+-----------------+
|       S|              962|
|       C|              253|
|       Q|              111|
|    null|                0|
+--------+-----------------+



**Get the groupped_Embarked:** 

In [41]:
grouped_Embarked.show(1)

+--------+-----------------+
|Embarked|group_of_Embarked|
+--------+-----------------+
|       S|              962|
+--------+-----------------+
only showing top 1 row



**Fill missing values with Top 'S' of grouped_Embarked:**

In [42]:
grouped_Embarked.fillna(value= 'S', subset= 'Embarked').show()

+--------+-----------------+
|Embarked|group_of_Embarked|
+--------+-----------------+
|       S|              962|
|       C|              253|
|       Q|              111|
|       S|                0|
+--------+-----------------+



## **Preprocessing Cabin**

**Replace "cabin" column with first char from the string:**



In [43]:
import numpy

In [44]:
def convertCabin(x):
  if not x:
    return x
  return x[0]
# total_df = total_df.dropna(subset='cabin')
convertCabinUDF = udf(lambda z: convertCabin(z))
total_df=total_df.withColumn("cabin", convertCabinUDF(F.col("cabin")))

# total_df.cabin.map(lambda x: convertCabin(x)).show()

**Show the result:**

In [45]:
total_df.show()

+-----------+--------+------+--------------------+------+----+-----+-----+------------------+-------+-----+--------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|            Ticket|   Fare|cabin|Embarked|
+-----------+--------+------+--------------------+------+----+-----+-----+------------------+-------+-----+--------+
|          1|       1|     1|Goldenberg, Mr. S...|  male|49.0|    1|    0|             17453|89.1042|    C|       C|
|          2|       0|     3| Peduzzi, Mr. Joseph|  male|null|    0|    0|          A/5 2817|   8.05| null|       S|
|          3|       1|     3|  Jalsevac, Mr. Ivan|  male|29.0|    0|    0|            349240| 7.8958| null|       C|
|          4|       0|     1|Millet, Mr. Franc...|  male|65.0|    0|    0|             13509|  26.55|    E|       S|
|          5|       1|     1|Kenyon, Mrs. Fred...|female|null|    1|    0|             17464|51.8625|    D|       S|
|          6|       1|     2| Toomey, Miss. Ellen|female|50.0|  

**Create the temporary view:**

In [46]:
total_df.createOrReplaceTempView('total')

**Select "Cabin" column, count Cabin column, Group by "Cabin" column, Order By count DESC**  

In [47]:
spark.sql(""" SELECT cabin, count(cabin) AS count
FROM total
GROUP BY cabin
ORDER BY count DESC
 """).show()

+-----+-----+
|cabin|count|
+-----+-----+
|    C|   82|
|    B|   77|
|    D|   52|
|    E|   51|
|    A|   23|
|    F|   18|
|    G|    4|
|    T|    1|
| null|    0|
+-----+-----+



**Fill missing values with "U":**

In [48]:
total_df = total_df.fillna(value= 'U', subset= 'cabin')
total_df.show()

+-----------+--------+------+--------------------+------+----+-----+-----+------------------+-------+-----+--------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|            Ticket|   Fare|cabin|Embarked|
+-----------+--------+------+--------------------+------+----+-----+-----+------------------+-------+-----+--------+
|          1|       1|     1|Goldenberg, Mr. S...|  male|49.0|    1|    0|             17453|89.1042|    C|       C|
|          2|       0|     3| Peduzzi, Mr. Joseph|  male|null|    0|    0|          A/5 2817|   8.05|    U|       S|
|          3|       1|     3|  Jalsevac, Mr. Ivan|  male|29.0|    0|    0|            349240| 7.8958|    U|       C|
|          4|       0|     1|Millet, Mr. Franc...|  male|65.0|    0|    0|             13509|  26.55|    E|       S|
|          5|       1|     1|Kenyon, Mrs. Fred...|female|null|    1|    0|             17464|51.8625|    D|       S|
|          6|       1|     2| Toomey, Miss. Ellen|female|50.0|  

**StringIndexer: A label indexer that maps a string column of labels to an ML column of label indices. If the input column is numeric, we cast it to string and index the string values. The indices are in [0, numLabels). By default, this is ordered by label frequencies so the most frequent label gets index 0. The ordering behavior is controlled by setting stringOrderType. Its default value is ‘frequencyDesc’.**

**StringIndexer(inputCol=None, outputCol=None)**

**Pipeline: ML Pipelines provide a uniform set of high-level APIs built on top of DataFrames that help users create and tune practical machine learning pipelines.**

____________________________________________

**Use Pipline to fit and transform:**

In [49]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer

In [50]:
total_df = total_df.dropna()


In [51]:
stage_1 = StringIndexer(inputCol= 'Name', outputCol= 'Name_index')
stage_2 = StringIndexer(inputCol= 'Sex', outputCol= 'Sex_index')
stage_3 = StringIndexer(inputCol= 'cabin', outputCol= 'cabin_index')
stage_4 = StringIndexer(inputCol= 'Embarked', outputCol= 'Embarked_index')

pipeline = Pipeline(stages=[stage_1, stage_2, stage_3, stage_4])



In [52]:
pipeline_model = pipeline.fit(total_df)

In [53]:
total_df_updated = pipeline_model.transform(total_df)


In [54]:
total_df_updated.show()


+-----------+--------+------+--------------------+------+----+-----+-----+------------------+-------+-----+--------+----------+---------+-----------+--------------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|            Ticket|   Fare|cabin|Embarked|Name_index|Sex_index|cabin_index|Embarked_index|
+-----------+--------+------+--------------------+------+----+-----+-----+------------------+-------+-----+--------+----------+---------+-----------+--------------+
|          1|       1|     1|Goldenberg, Mr. S...|  male|49.0|    1|    0|             17453|89.1042|    C|       C|     139.0|      0.0|        2.0|           1.0|
|          3|       1|     3|  Jalsevac, Mr. Ivan|  male|29.0|    0|    0|            349240| 7.8958|    U|       C|     180.0|      0.0|        0.0|           1.0|
|          4|       0|     1|Millet, Mr. Franc...|  male|65.0|    0|    0|             13509|  26.55|    E|       S|     234.0|      0.0|        3.0|           0.0|
|         

**VectorAssembler: VectorAssembler(*, inputCols=None, outputCol=None) A feature transformer that merges multiple columns into a vector column.**



In [55]:
from pyspark.ml.feature import VectorAssembler
numericCols = [ 'Pclass', 'SibSp', 'Parch', 'Sex_index', 'cabin_index', 'Embarked_index']
assembler = VectorAssembler(inputCols=numericCols, outputCol="features")
total_df_updated = assembler.transform(total_df_updated)
total_df_updated.show()

+-----------+--------+------+--------------------+------+----+-----+-----+------------------+-------+-----+--------+----------+---------+-----------+--------------+--------------------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|            Ticket|   Fare|cabin|Embarked|Name_index|Sex_index|cabin_index|Embarked_index|            features|
+-----------+--------+------+--------------------+------+----+-----+-----+------------------+-------+-----+--------+----------+---------+-----------+--------------+--------------------+
|          1|       1|     1|Goldenberg, Mr. S...|  male|49.0|    1|    0|             17453|89.1042|    C|       C|     139.0|      0.0|        2.0|           1.0|[1.0,1.0,0.0,0.0,...|
|          3|       1|     3|  Jalsevac, Mr. Ivan|  male|29.0|    0|    0|            349240| 7.8958|    U|       C|     180.0|      0.0|        0.0|           1.0| (6,[0,5],[3.0,1.0])|
|          4|       0|     1|Millet, Mr. Franc...|  male|65.0|    0|  

**Use randomSplit function and split data to x_train, and X_test with 80% and 20% Consecutive**

In [56]:
x_train, X_test = total_df_updated.randomSplit([0.8, 0.2],seed = 42)

**Build RandomForestClassifier model and use pipeline to fit and transform then display "prediction, Survived, features" columns**

In [57]:
from pyspark.ml.classification import RandomForestClassifier

rf = RandomForestClassifier(featuresCol = 'features', labelCol = 'Survived')
rfModel = rf.fit(x_train)
predictions = rfModel.transform(X_test)
predictions.select('PassengerId', 'Pclass', 'SibSp', 'Parch', 'Name_index', 'Sex_index', 'cabin_index', 'Embarked_index').show(10)


+-----------+------+-----+-----+----------+---------+-----------+--------------+
|PassengerId|Pclass|SibSp|Parch|Name_index|Sex_index|cabin_index|Embarked_index|
+-----------+------+-----+-----+----------+---------+-----------+--------------+
|          4|     1|    0|    0|     234.0|      0.0|        3.0|           0.0|
|         10|     1|    0|    0|     134.0|      0.0|        3.0|           0.0|
|         13|     3|    0|    0|     141.0|      0.0|        0.0|           0.0|
|         21|     2|    0|    0|     185.0|      1.0|        4.0|           1.0|
|         28|     3|    5|    2|     142.0|      0.0|        0.0|           0.0|
|         34|     1|    1|    0|     175.0|      1.0|        2.0|           0.0|
|         41|     1|    0|    0|      34.0|      0.0|        0.0|           1.0|
|         49|     3|    0|    0|      70.0|      1.0|        0.0|           2.0|
|         62|     3|    0|    0|      86.0|      0.0|        0.0|           0.0|
|         63|     1|    0|  

**Use MulticlassClassificationEvaluator and set the "labelCol" to "Survived",  "predictionCol" to "prediction", "metricName" to "accuracy"** 

In [62]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator

# Create both evaluators
evaluatorMulti = MulticlassClassificationEvaluator(labelCol="Survived", predictionCol="predictions")
evaluator = BinaryClassificationEvaluator(labelCol="Survived", rawPredictionCol="predictions", metricName='accuracy')
