**Data about passengers:**
*   Name
*   Age
*   Gender


## Install and Import Libraries
Let's install PySpark and its dependencies, then import the libraries:

In [None]:
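# Install Java 8, then download and unpack Spark 3.0.1 (pre-built for Hadoop 2.7)
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://archive.apache.org/dist/spark/spark-3.0.1/spark-3.0.1-bin-hadoop2.7.tgz
!tar xf spark-3.0.1-bin-hadoop2.7.tgz
!pip install -q findspark

# Tell findspark where Java and Spark are installed
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.0.1-bin-hadoop2.7"

# findspark.init() puts the libraries from SPARK_HOME at the front of sys.path,
# so they should take precedence over this pip-installed PySpark
!pip install pyspark
import findspark
findspark.init()

import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *  # may shadow Python built-ins such as sum, max and round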

Collecting pyspark
  Downloading pyspark-3.1.2.tar.gz (212.4 MB)
Collecting py4j==0.10.9
  Downloading py4j-0.10.9-py2.py3-none-any.whl (198 kB)
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... done
  Created wheel for pyspark: filename=pyspark-3.1.2-py2.py3-none-any.whl size=212880768 sha256=c69a1705b3d88ea639859e4e09b21f959476c899eebdc65ceda68c9ff7388653
  Stored in directory: /root/.cache/pip/wheels/a5/0a/c1/9561f6fecb759579a7d863dcd846daaa95f598744e71b02c77
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9 pyspark-3.1.2


## Build Spark Session

In [None]:
spark = SparkSession.builder.getOrCreate()

In [None]:
# Use the first row as the header and let Spark infer the column types
train = spark.read.csv('train.csv', header=True, inferSchema=True)
test = spark.read.csv('test.csv', header=True, inferSchema=True)

Let's work with the train dataset:

**Confirm whether this is a DataFrame:**

In [None]:
print(type(train))

<class 'pyspark.sql.dataframe.DataFrame'>


**Show the first 5 rows:**

In [None]:
train.head(5)

[Row(PassengerId=1, Survived=0, Pclass=3, Name='Braund, Mr. Owen Harris', Sex='male', Age=22.0, SibSp=1, Parch=0, Ticket='A/5 21171', Fare=7.25, Cabin=None, Embarked='S'),
 Row(PassengerId=2, Survived=1, Pclass=1, Name='Cumings, Mrs. John Bradley (Florence Briggs Thayer)', Sex='female', Age=38.0, SibSp=1, Parch=0, Ticket='PC 17599', Fare=71.2833, Cabin='C85', Embarked='C'),
 Row(PassengerId=3, Survived=1, Pclass=3, Name='Heikkinen, Miss. Laina', Sex='female', Age=26.0, SibSp=0, Parch=0, Ticket='STON/O2. 3101282', Fare=7.925, Cabin=None, Embarked='S'),
 Row(PassengerId=4, Survived=1, Pclass=1, Name='Futrelle, Mrs. Jacques Heath (Lily May Peel)', Sex='female', Age=35.0, SibSp=1, Parch=0, Ticket='113803', Fare=53.1, Cabin='C123', Embarked='S'),
 Row(PassengerId=5, Survived=0, Pclass=3, Name='Allen, Mr. William Henry', Sex='male', Age=35.0, SibSp=0, Parch=0, Ticket='373450', Fare=8.05, Cabin=None, Embarked='S')]

**Display schema for the dataset:**

In [None]:
train.printSchema()

root
 |-- PassengerId: integer (nullable = true)
 |-- Survived: integer (nullable = true)
 |-- Pclass: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- SibSp: integer (nullable = true)
 |-- Parch: integer (nullable = true)
 |-- Ticket: string (nullable = true)
 |-- Fare: double (nullable = true)
 |-- Cabin: string (nullable = true)
 |-- Embarked: string (nullable = true)



**Statistical summary:**

In [None]:
train.describe().show()

+-------+-----------------+-------------------+------------------+--------------------+------+------------------+------------------+-------------------+------------------+-----------------+-----+--------+
|summary|      PassengerId|           Survived|            Pclass|                Name|   Sex|               Age|             SibSp|              Parch|            Ticket|             Fare|Cabin|Embarked|
+-------+-----------------+-------------------+------------------+--------------------+------+------------------+------------------+-------------------+------------------+-----------------+-----+--------+
|  count|              891|                891|               891|                 891|   891|               714|               891|                891|               891|              891|  204|     889|
|   mean|            446.0| 0.3838383838383838| 2.308641975308642|                null|  null| 29.69911764705882|0.5230078563411896|0.38159371492704824|260318.54916792738| 32.20420

## EDA - Exploratory Data Analysis

**Display the row count of the train dataset:**

In [None]:
train.count()

891

**Can you answer this question?**

**How many people survived, and how many didn't survive?**

**Save the results in variables.**

In [None]:
train.select('Survived').show()
# Count the rows for each outcome (1 = survived, 0 = did not survive)
survived = train.filter('Survived = 1').count()
not_survived = train.filter('Survived = 0').count()

+--------+
|Survived|
+--------+
|       0|
|       1|
|       1|
|       1|
|       0|
|       0|
|       0|
|       0|
|       1|
|       1|
|       1|
|       1|
|       0|
|       0|
|       0|
|       1|
|       0|
|       1|
|       0|
|       1|
+--------+
only showing top 20 rows



**Display your result:**

In [None]:
print(survived)
print(not_survived)

342
549


**Can you display your answer as a ratio? (Hint: Use a UDF.)**






In [None]:
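from pyspark.sql.functions import udf
from pyspark.sql import functions as f
from pyspark.sql.types import FloatType

def ratio(total):
    # Return a UDF that divides a column value by `total`
    return f.udf(lambda value: value / total, FloatType())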





In [None]:
train.select('Survived').groupby('Survived').count().withColumn('Ratio', ratio(train.count())('count')).show()


+--------+-----+----------+
|Survived|count|     Ratio|
+--------+-----+----------+
|       1|  342| 0.3838384|
|       0|  549|0.61616164|
+--------+-----+----------+
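The UDF above only divides each group's count by the total number of rows. A minimal plain-Python sketch of the same arithmetic, using the counts printed earlier (342 survivors and 549 non-survivors); the helper name `ratios` is hypothetical. In PySpark the same column could also be built without a UDF as `f.col('count') / train.count()`, which avoids shipping every value through Python.

```python
def ratios(counts):
    """Divide each group count by the total, as the `ratio` UDF does."""
    total = sum(counts.values())
    return {group: n / total for group, n in counts.items()}

# Counts of the Survived column in train.csv (1 = survived, 0 = did not survive)
survival_counts = {1: 342, 0: 549}
survival_ratios = ratios(survival_counts)
for group, share in survival_ratios.items():
    print(group, round(share, 4))
```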



**Can you get the number of males and females?**


In [None]:
train.select('Sex').groupby('Sex').count().show()


+------+-----+
|   Sex|count|
+------+-----+
|female|  314|
|  male|  577|
+------+-----+



**1. What proportion of the survivors is of each gender?**

**2. How many survivors are there of each gender?**

(Hint: Group by the "Sex" column.)

In [None]:
train.filter('Survived=1').groupBy('Survived','Sex').count().withColumn('Ratio', ratio(train.filter('Survived=1').count())('count')).show()



+--------+------+-----+----------+
|Survived|   Sex|count|     Ratio|
+--------+------+-----+----------+
|       1|  male|  109|0.31871346|
|       1|female|  233| 0.6812866|
+--------+------+-----+----------+



In [None]:
train.groupBy('Survived','Sex').count().show()

+--------+------+-----+
|Survived|   Sex|count|
+--------+------+-----+
|       0|female|   81|
|       1|  male|  109|
|       1|female|  233|
|       0|  male|  468|
+--------+------+-----+
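The gender questions involve two different ratios. Dividing each gender's survivors by all survivors gives that gender's share of the survivors (the UDF cell above), while dividing by all passengers of that gender gives the survival rate within the gender (the `avg(Survived)` SQL query further down). A plain-Python sketch using the counts from the table above; the variable names are illustrative only.

```python
# (Survived, Sex) -> count, copied from the groupBy output above
counts = {
    (0, 'female'): 81,
    (1, 'male'): 109,
    (1, 'female'): 233,
    (0, 'male'): 468,
}

total_survivors = sum(n for (survived, _), n in counts.items() if survived == 1)

# Share of all survivors that each gender accounts for
share_of_survivors = {
    sex: counts[(1, sex)] / total_survivors for sex in ('female', 'male')
}

# Fraction of each gender's passengers who survived (mean of Survived per gender)
survival_rate = {
    sex: counts[(1, sex)] / (counts[(0, sex)] + counts[(1, sex)])
    for sex in ('female', 'male')
}

print(share_of_survivors)
print(survival_rate)
```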






**Create a temporary view:**

In [None]:
train.createOrReplaceTempView("TrainData")

**How many people survived, and how many didn't survive? Answer with SQL:**

In [None]:
spark.sql(''' SELECT Survived,count(Survived)
From TrainData
group by Survived ''').show()

+--------+---------------+
|Survived|count(Survived)|
+--------+---------------+
|       1|            342|
|       0|            549|
+--------+---------------+



**Can you display the survival rate (the proportion of passengers who survived) for each gender?**

(Hint: Group by the "Sex" column.)

**Can you do this via SQL?**

In [None]:
spark.sql(''' SELECT Sex,count(Sex),avg(Survived)
From TrainData
group by Sex''').show()

+------+----------+-------------------+
|   Sex|count(Sex)|      avg(Survived)|
+------+----------+-------------------+
|female|       314| 0.7420382165605095|
|  male|       577|0.18890814558058924|
+------+----------+-------------------+



**Display the proportion of passengers in each passenger class (Pclass):**


In [None]:
spark.sql(''' SELECT Pclass,count(Pclass) Each_Class , count(Pclass)/891
From TrainData
group by Pclass''').show()

+------+----------+-----------------------------------------------------+
|Pclass|Each_Class|(CAST(count(Pclass) AS DOUBLE) / CAST(891 AS DOUBLE))|
+------+----------+-----------------------------------------------------+
|     1|       216|                                  0.24242424242424243|
|     3|       491|                                   0.5510662177328844|
|     2|       184|                                  0.20650953984287318|
+------+----------+-----------------------------------------------------+
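The query above hard-codes 891, so the ratio silently goes wrong if the row count changes. A scalar subquery can compute the total instead. The sketch below demonstrates the pattern with Python's built-in `sqlite3` module on a synthetic table that reproduces the Pclass counts shown above; it is an illustration, not the notebook's own code. Spark SQL also supports scalar subqueries, so `count(Pclass) / (SELECT count(*) FROM TrainData)` should work there too; the explicit `CAST` is only needed because SQLite, unlike Spark SQL, performs integer division on integers.

```python
import sqlite3

conn = sqlite3.connect(':memory:')
conn.execute('CREATE TABLE TrainData (Pclass INTEGER)')

# Synthetic rows that reproduce the class counts from the output above
rows = [(1,)] * 216 + [(2,)] * 184 + [(3,)] * 491
conn.executemany('INSERT INTO TrainData VALUES (?)', rows)

query = '''
SELECT Pclass,
       COUNT(Pclass) AS Each_Class,
       CAST(COUNT(Pclass) AS REAL) / (SELECT COUNT(*) FROM TrainData) AS Ratio
FROM TrainData
GROUP BY Pclass
ORDER BY Pclass
'''
result = conn.execute(query).fetchall()
for pclass, each_class, share in result:
    print(pclass, each_class, round(share, 4))
conn.close()
```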



**Let's take a break and continue after this.**

## Data Cleaning

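**First, merge the train and test datasets. (Hint: The union function can do this. It matches columns by position, so both DataFrames need the same columns in the same order; unionByName matches them by name instead.)**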



In [None]:
df = train.union(test)

**Display the row count of the merged DataFrame:**

In [None]:
df.count()

1329

**Create a temporary view:**

In [None]:
df.createOrReplaceTempView("dataframe")

**Can you count the null values in each column?**


In [None]:
df.select([count(when(col(c).isNull(), c)).alias(c) for c in df.columns]).show()


+-----------+--------+------+----+---+---+-----+-----+------+----+-----+--------+
|PassengerId|Survived|Pclass|Name|Sex|Age|SibSp|Parch|Ticket|Fare|Cabin|Embarked|
+-----------+--------+------+----+---+---+-----+-----+------+----+-----+--------+
|          0|       0|     0|   0|  0|265|    0|    0|     0|   0| 1021|       3|
+-----------+--------+------+----+---+---+-----+-----+------+----+-----+--------+
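The expression works because `when(condition, value)` without `otherwise()` yields null whenever the condition is false, and `count()` ignores nulls, so only the null cells are counted. A plain-Python sketch of the same logic over the Age, Cabin and Embarked values of the five rows printed by `train.head(5)`; `count_nulls` is a hypothetical helper.

```python
def count_nulls(rows, columns):
    """Count missing (None) values per column, mimicking count(when(col(c).isNull(), c))."""
    counts = {}
    for c in columns:
        # when(...) yields a value only for null cells; every other cell becomes None
        flagged = [c if row.get(c) is None else None for row in rows]
        # count() skips None, so only the flagged (null) cells are counted
        counts[c] = sum(1 for v in flagged if v is not None)
    return counts

# Age, Cabin and Embarked of the first five passengers shown by train.head(5)
rows = [
    {'Age': 22.0, 'Cabin': None, 'Embarked': 'S'},
    {'Age': 38.0, 'Cabin': 'C85', 'Embarked': 'C'},
    {'Age': 26.0, 'Cabin': None, 'Embarked': 'S'},
    {'Age': 35.0, 'Cabin': 'C123', 'Embarked': 'S'},
    {'Age': 35.0, 'Cabin': None, 'Embarked': 'S'},
]
null_counts = count_nulls(rows, ['Age', 'Cabin', 'Embarked'])
print(null_counts)
```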



**Create a DataFrame of null-value counts containing:**

1. Column
2. Number of missing values

In [None]:
new = df.select([count(when(col(c).isNull(), c)).alias(c) for c in df.columns])
new.show()

+-----------+--------+------+----+---+---+-----+-----+------+----+-----+--------+
|PassengerId|Survived|Pclass|Name|Sex|Age|SibSp|Parch|Ticket|Fare|Cabin|Embarked|
+-----------+--------+------+----+---+---+-----+-----+------+----+-----+--------+
|          0|       0|     0|   0|  0|265|    0|    0|     0|   0| 1021|       3|
+-----------+--------+------+----+---+---+-----+-----+------+----+-----+--------+
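The cell above keeps the wide layout (one column per field). To get the two-column layout the task describes (column name and number of missing values), the single row can be turned into (column, count) pairs. A minimal plain-Python sketch using the counts shown above; in PySpark the dictionary could come from `new.first().asDict()`, and the pairs could be passed to `spark.createDataFrame(pairs, ['Column', 'Missing'])`. The helper `to_long_format`, the sorting, and dropping zero counts are choices of this sketch.

```python
def to_long_format(null_counts, drop_zero=True):
    """Turn {column: missing_count} into (column, missing_count) pairs, largest first."""
    pairs = [(column, n) for column, n in null_counts.items() if n > 0 or not drop_zero]
    return sorted(pairs, key=lambda pair: pair[1], reverse=True)

# Null counts from the output above
null_counts = {
    'PassengerId': 0, 'Survived': 0, 'Pclass': 0, 'Name': 0, 'Sex': 0,
    'Age': 265, 'SibSp': 0, 'Parch': 0, 'Ticket': 0, 'Fare': 0,
    'Cabin': 1021, 'Embarked': 3,
}
missing = to_long_format(null_counts)
for column, n in missing:
    print(f'{column:<10}{n}')
```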



## Preprocessing 

**Show the Name column from the temporary view:**

In [None]:
spark.sql('''SELECT Name from dataframe ''' ).show()

+--------------------+
|                Name|
+--------------------+
|Braund, Mr. Owen ...|
|Cumings, Mrs. Joh...|
|Heikkinen, Miss. ...|
|Futrelle, Mrs. Ja...|
|Allen, Mr. Willia...|
|    Moran, Mr. James|
|McCarthy, Mr. Tim...|
|Palsson, Master. ...|
|Johnson, Mrs. Osc...|
|Nasser, Mrs. Nich...|
|Sandstrom, Miss. ...|
|Bonnell, Miss. El...|
|Saundercock, Mr. ...|
|Andersson, Mr. An...|
|Vestrom, Miss. Hu...|
|Hewlett, Mrs. (Ma...|
|Rice, Master. Eugene|
|Williams, Mr. Cha...|
|Vander Planke, Mr...|
|Masselmani, Mrs. ...|
+--------------------+
only showing top 20 rows



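**Extract each passenger's title (e.g., Mr, Mrs, Miss) from the Name column:**

In [None]:
# Group 1 captures the letters just before a period, e.g. 'Mr' in 'Braund, Mr. Owen Harris'
combined = df.withColumn('Title', f.regexp_extract(f.col('Name'), r'([A-Za-z]+)\.', 1))
combined.createOrReplaceTempView('combined')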
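`regexp_extract` returns capture group 1 of the first match, so the pattern picks the first run of letters immediately followed by a period, which in these names is the title; when nothing matches it returns an empty string. Python's `re.search` also returns the first match, so the same pattern can be checked in plain Python on names from the dataset (Spark uses Java regular expressions, but this simple pattern behaves the same in both). `extract_title` is a hypothetical helper.

```python
import re

TITLE_PATTERN = re.compile(r'([A-Za-z]+)\.')

def extract_title(name):
    """Return the first run of letters followed by a period, or '' if there is none."""
    match = TITLE_PATTERN.search(name)
    return match.group(1) if match else ''

names = [
    'Braund, Mr. Owen Harris',
    'Cumings, Mrs. John Bradley (Florence Briggs Thayer)',
    'Heikkinen, Miss. Laina',
    'Rice, Master. Eugene',
]
titles = [extract_title(n) for n in names]
print(titles)
```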

**Display each title and its count:**

In [None]:
spark.sql(''' SELECT Title ,count(Title) 
FROM combined
group by Title ''').show()

+--------+------------+
|   Title|count(Title)|
+--------+------------+
|     Don|           1|
|    Miss|         257|
|Countess|           2|
|     Col|           4|
|     Rev|           9|
|    Lady|           2|
|  Master|          56|
|     Mme|           1|
|    Capt|           2|
|      Mr|         786|
|      Dr|          11|
|     Mrs|         186|
|     Sir|           2|
|Jonkheer|           2|
|    Mlle|           4|
|   Major|           3|
|      Ms|           1|
+--------+------------+



**Dr, Rev, Major, Col, Mlle, Capt, Don, Jonkheer, Countess, Ms, Sir, Lady, and Mme are rare titles, so create a dictionary that maps them to "rare" and the common titles (Mr, Mrs, Miss, and Master) to "not rare":**

In [None]:
titles_map = {
    # Common titles
    'Mr': 'not rare', 'Mrs': 'not rare', 'Miss': 'not rare', 'Master': 'not rare',
    # Rare titles
    'Dr': 'rare', 'Rev': 'rare', 'Major': 'rare', 'Col': 'rare', 'Mlle': 'rare',
    'Capt': 'rare', 'Don': 'rare', 'Jonkheer': 'rare', 'Countess': 'rare',
    'Ms': 'rare', 'Sir': 'rare', 'Lady': 'rare', 'Mme': 'rare',
}

**Define a function that looks up a title in the dictionary, and wrap it in a UDF:**

In [None]:
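from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

def impute_title(title):
    # Titles missing from the dictionary (e.g. an empty match) count as rare instead of raising KeyError
    return titles_map.get(title, 'rare')

udf_new = udf(impute_title, StringType())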



**Apply the UDF to the "Title" column:**

In [None]:
new_df = combined.withColumn('New_Titles', udf_new(f.col('Title')))

**Count the rows in each "New_Titles" group:**

In [None]:
new_df.select('New_Titles').groupBy('New_Titles').count().show()

+----------+-----+
|New_Titles|count|
+----------+-----+
|      rare|   44|
|  not rare| 1285|
+----------+-----+
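As a sanity check of the mapping, summing the title counts from the earlier table by their dictionary value reproduces the 44 / 1285 split above. A plain-Python sketch with the counts copied from that table; `collapse_counts` is a hypothetical helper. For a UDF-free alternative in PySpark, `f.when(f.col('Title').isin(*rare_titles), 'rare').otherwise('not rare')`, with a hypothetical `rare_titles` list, would produce the same labels for these titles.

```python
from collections import Counter

titles_map = {
    'Mr': 'not rare', 'Mrs': 'not rare', 'Miss': 'not rare', 'Master': 'not rare',
    'Dr': 'rare', 'Rev': 'rare', 'Major': 'rare', 'Col': 'rare', 'Mlle': 'rare',
    'Capt': 'rare', 'Don': 'rare', 'Jonkheer': 'rare', 'Countess': 'rare',
    'Ms': 'rare', 'Sir': 'rare', 'Lady': 'rare', 'Mme': 'rare',
}

# Title counts from the grouped output above
title_counts = {
    'Don': 1, 'Miss': 257, 'Countess': 2, 'Col': 4, 'Rev': 9, 'Lady': 2,
    'Master': 56, 'Mme': 1, 'Capt': 2, 'Mr': 786, 'Dr': 11, 'Mrs': 186,
    'Sir': 2, 'Jonkheer': 2, 'Mlle': 4, 'Major': 3, 'Ms': 1,
}

def collapse_counts(counts, mapping, default='rare'):
    """Add up the counts of all titles that map to the same group."""
    grouped = Counter()
    for title, n in counts.items():
        grouped[mapping.get(title, default)] += n
    return grouped

grouped = collapse_counts(title_counts, titles_map)
print(dict(grouped))
```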



## **Preprocessing Age**

**Fill in the missing age values with the mean age. First, compute the mean:**

In [None]:
mean_age = new_df.select(f.mean(col('Age')).alias('mean')).collect()
mean = mean_age[0]['mean']
print(mean)

30.079501879699244


**Fill the missing ages with the mean age:**

In [None]:
new_df1=new_df.na.fill(value=mean,subset=["Age"])

In [None]:
new_df1.select('Age').show()

+------------------+
|               Age|
+------------------+
|              22.0|
|              38.0|
|              26.0|
|              35.0|
|              35.0|
|30.079501879699244|
|              54.0|
|               2.0|
|              27.0|
|              14.0|
|               4.0|
|              58.0|
|              20.0|
|              39.0|
|              14.0|
|              55.0|
|               2.0|
|30.079501879699244|
|              31.0|
|30.079501879699244|
+------------------+
only showing top 20 rows
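Spark's `mean` aggregate skips nulls, so the fill value is the average of the known ages only. A plain-Python sketch of that behaviour on the ages of the first eight rows shown above (the sixth one was missing); `mean_impute` is a hypothetical helper. In a full ML workflow, `pyspark.ml.feature.Imputer` (whose default strategy is `'mean'`) can do the same as a pipeline stage; fitting it on the training split only would keep test-set ages out of the fill value, whereas this notebook computes the mean on the merged data before splitting.

```python
def mean_impute(values):
    """Replace None with the mean of the non-missing values."""
    known = [v for v in values if v is not None]
    fill = sum(known) / len(known)  # missing values are excluded, as in Spark's mean()
    return [fill if v is None else v for v in values], fill

# Ages of the first eight passengers (the sixth one is missing)
ages = [22.0, 38.0, 26.0, 35.0, 35.0, None, 54.0, 2.0]
filled, fill_value = mean_impute(ages)
print(fill_value)
print(filled)
```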



## **Preprocessing Embarked**

**Count each Embarked value, sort the counts in descending order, and save the result in the grouped_Embarked variable:**




In [None]:
grouped_Embarked=new_df1.select('Embarked').groupBy('Embarked').count().orderBy('count',ascending=False)

**Show grouped_Embarked:**

In [None]:
grouped_Embarked.show()

+--------+-----+
|Embarked|count|
+--------+-----+
|       S|  962|
|       C|  253|
|       Q|  111|
|    null|    3|
+--------+-----+



**Get the most frequent port (the first row of grouped_Embarked):**

In [None]:
# The table is sorted by count, so the first non-null row holds the most frequent port
grouped = grouped_Embarked.dropna().head(1)[0][0]

**Fill the missing Embarked values with the most frequent port:**

In [None]:
new_df1=new_df1.na.fill(value=grouped,subset=["Embarked"])
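The previous cells fill Embarked with its most frequent value (the mode). A plain-Python sketch of the same idea with `collections.Counter`, ignoring missing values when choosing the mode; the `mode_impute` helper and the sample list are illustrative only.

```python
from collections import Counter

def mode_impute(values):
    """Replace None with the most frequent non-missing value."""
    counts = Counter(v for v in values if v is not None)
    mode, _ = counts.most_common(1)[0]
    return [mode if v is None else v for v in values], mode

# Illustrative ports of embarkation, with one missing entry
ports = ['S', 'C', 'S', 'Q', None, 'S']
filled_ports, most_frequent = mode_impute(ports)
print(most_frequent, filled_ports)
```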



## **Preprocessing Cabin**

**Create a new_cabin column that holds the first character of the Cabin string (the deck letter):**



In [None]:
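from pyspark.sql.types import StringType

def cabin_function(x):
    # Keep only the first character (the deck letter); missing or empty cabins stay null
    if x:
        return x[0]
    return None

convertUDF = udf(cabin_function, StringType())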


**Apply the UDF to create the new_cabin column:**

In [None]:
new_df1 = new_df1.withColumn('new_cabin', convertUDF(f.col('Cabin')))


**Create the temporary view:**

In [None]:
new_df1.createOrReplaceTempView("Temp3")

**Count each new_cabin value and sort the counts in descending order:**

In [None]:
new_df1.select('new_cabin').groupBy('new_cabin').count().orderBy('count',ascending=False).show()

+---------+-----+
|new_cabin|count|
+---------+-----+
|     null| 1021|
|        C|   82|
|        B|   77|
|        D|   52|
|        E|   51|
|        A|   23|
|        F|   18|
|        G|    4|
|        T|    1|
+---------+-----+



**Fill the missing new_cabin values with "U" (unknown):**

In [None]:
new_df1=new_df1.na.fill(value='U',subset=["new_cabin"])
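Taken together, the Cabin steps map each cabin to its deck letter and every missing cabin to "U". A plain-Python sketch of that mapping (`cabin_to_deck` is a hypothetical name; the inputs are illustrative). In PySpark the same column could likely be built without a UDF as `f.coalesce(f.substring('Cabin', 1, 1), f.lit('U'))`, since `substring` returns null for a null input.

```python
def cabin_to_deck(cabin):
    """Return the deck letter (first character) of a cabin, or 'U' when it is unknown."""
    return cabin[0] if cabin else 'U'

cabins = ['C85', None, 'C123', '']
decks = [cabin_to_deck(c) for c in cabins]
print(decks)
```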


**Drop every column that still contains null values:**

In [None]:
def drop_null_columns(df):
    """
    This function drops all columns which contain null values.
    :param df: A PySpark DataFrame
    """
    null_counts = df.select([f.count(f.when(f.col(c).isNull(), c)).alias(c) for c in df.columns]).collect()[0].asDict()
    to_drop = [k for k, v in null_counts.items() if v > 0]
    df = df.drop(*to_drop)
    return df


last_df = drop_null_columns(new_df1)
last_df.show()
print(last_df.dtypes)

+-----------+--------+------+--------------------+------+------------------+-----+-----+----------------+-------+--------+------+----------+---------+
|PassengerId|Survived|Pclass|                Name|   Sex|               Age|SibSp|Parch|          Ticket|   Fare|Embarked| Title|New_Titles|new_cabin|
+-----------+--------+------+--------------------+------+------------------+-----+-----+----------------+-------+--------+------+----------+---------+
|          1|       0|     3|Braund, Mr. Owen ...|  male|              22.0|    1|    0|       A/5 21171|   7.25|       S|    Mr|  not rare|        U|
|          2|       1|     1|Cumings, Mrs. Joh...|female|              38.0|    1|    0|        PC 17599|71.2833|       C|   Mrs|  not rare|        C|
|          3|       1|     3|Heikkinen, Miss. ...|female|              26.0|    0|    0|STON/O2. 3101282|  7.925|       S|  Miss|  not rare|        U|
|          4|       1|     1|Futrelle, Mrs. Ja...|female|              35.0|    1|    0|      

**Split the data into training and test sets (about 80% and 20%):**

In [None]:
trainDF, testDF = last_df.randomSplit([.8,.2],seed=42)
print(f"There are {trainDF.count()} rows in the training set, and {testDF.count()} in the test set")

There are 1094 rows in the training set, and 235 in the test set
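`randomSplit` does not cut the data at exactly 80/20: each row is assigned at random according to the normalized weights, which is why the split above is 1094/235 rather than exactly 80% of 1329 rows. A simplified plain-Python sketch of weighted random splitting; it is not Spark's actual implementation (which samples within each partition), and `random_split` is a hypothetical helper.

```python
import random

def random_split(rows, weights, seed=None):
    """Assign each row to one split with probability proportional to its weight."""
    total = sum(weights)
    bounds = []
    cumulative = 0.0
    for w in weights:
        cumulative += w / total
        bounds.append(cumulative)
    rng = random.Random(seed)
    splits = [[] for _ in weights]
    for row in rows:
        x = rng.random()
        for i, upper in enumerate(bounds):
            # The last split also catches any floating-point rounding at the top end
            if x < upper or i == len(bounds) - 1:
                splits[i].append(row)
                break
    return splits

train_rows, test_rows = random_split(range(1329), [0.8, 0.2], seed=42)
print(len(train_rows), len(test_rows))
```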


**StringIndexer: A label indexer that maps a string column of labels to an ML column of label indices. If the input column is numeric, we cast it to string and index the string values. The indices are in [0, numLabels). By default, this is ordered by label frequencies so the most frequent label gets index 0. The ordering behavior is controlled by setting stringOrderType. Its default value is ‘frequencyDesc’.**

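**StringIndexer(inputCol=None, outputCol=None, ...): since Spark 3.0, it also accepts inputCols and outputCols to index several columns in one stage, as the code below does.**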
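A plain-Python sketch of the default `frequencyDesc` ordering: labels are ranked by how often they occur, so the most frequent label gets index 0, and labels with equal frequency are ordered alphabetically, as the Spark documentation describes for the frequency-based orderings. `string_index` is a hypothetical helper, not part of PySpark, and the labels are illustrative.

```python
from collections import Counter

def string_index(labels):
    """Map labels to indices: most frequent first, ties broken alphabetically."""
    counts = Counter(labels)
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: index for index, label in enumerate(ordered)}

# Illustrative Embarked values
embarked = ['S', 'C', 'S', 'Q', 'C', 'S']
mapping = string_index(embarked)
print(mapping)
print([mapping[label] for label in embarked])
```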

**Pipeline: ML Pipelines provide a uniform set of high-level APIs built on top of DataFrames that help users create and tune practical machine learning pipelines.**

In [None]:
# String columns to encode, excluding the free-text Ticket and Name columns
categorical_columns = [field for (field, dataType) in trainDF.dtypes
                       if dataType == 'string' and field not in ('Ticket', 'Name')]
print(type(categorical_columns))

<class 'list'>


In [None]:
indexOutput= [x + "_Index" for x in categorical_columns]

In [None]:
OneHotColumns = [x + "_OHE" for x in categorical_columns]

In [None]:
from pyspark.ml.feature import VectorAssembler ,StringIndexer,OneHotEncoder


In [None]:
# handleInvalid='skip' drops rows with null or unseen labels when the model is applied
stringIndexer = StringIndexer(inputCols=categorical_columns, outputCols=indexOutput, handleInvalid='skip')
onehotEncoder = OneHotEncoder(inputCols=indexOutput, outputCols=OneHotColumns)

# Numeric feature columns, excluding the ID and the label
numericColumns = [field for (field, dataType) in trainDF.dtypes
                  if dataType in ('double', 'int') and field not in ('PassengerId', 'Survived')]
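`OneHotEncoder` turns each index into a vector. By default (`dropLast=True`) the last category is dropped, so a column with n categories produces n - 1 slots and the last category is encoded as all zeros; this keeps the encoded slots from always summing to one, which would make them linearly dependent. Spark stores these as sparse vectors; the plain-Python sketch below uses dense lists for readability (`one_hot` is a hypothetical helper).

```python
def one_hot(index, num_categories, drop_last=True):
    """Dense one-hot encoding; with drop_last the last category becomes all zeros."""
    size = num_categories - 1 if drop_last else num_categories
    vector = [0.0] * size
    if index < size:
        vector[index] = 1.0
    return vector

# Three Embarked categories indexed as S=0, C=1, Q=2
for label, index in [('S', 0), ('C', 1), ('Q', 2)]:
    print(label, one_hot(index, 3))
```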


In [None]:
assembler_Inputs = OneHotColumns + numericColumns

In [None]:
vecAssembler = VectorAssembler(inputCols=assembler_Inputs , outputCol='features')

In [None]:
from pyspark.ml.classification import RandomForestClassifier
rf = RandomForestClassifier(labelCol='Survived', featuresCol='features')

In [None]:
from pyspark.ml import Pipeline

# Index -> one-hot encode -> assemble features -> train the random forest
pipeline = Pipeline(stages=[stringIndexer, onehotEncoder, vecAssembler, rf])
pipeline_Model = pipeline.fit(trainDF)


In [None]:
predictions = pipeline_Model.transform(testDF)

In [None]:
predictions.select('features','Survived','prediction').show()

+--------------------+--------+----------+
|            features|Survived|prediction|
+--------------------+--------+----------+
|(32,[1,4,18,19,27...|       1|       0.0|
|(32,[0,1,3,18,23,...|       0|       0.0|
|(32,[1,5,18,19,27...|       1|       1.0|
|(32,[0,1,3,18,19,...|       0|       0.0|
|(32,[2,5,18,19,27...|       1|       1.0|
|(32,[0,1,3,18,24,...|       1|       0.0|
|(32,[0,1,3,18,19,...|       0|       0.0|
|(32,[0,1,3,18,19,...|       0|       0.0|
|(32,[0,1,3,18,19,...|       0|       0.0|
|(32,[0,3,18,19,27...|       0|       0.0|
|(32,[4,18,19,27,2...|       1|       1.0|
|(32,[1,5,18,19,27...|       0|       1.0|
|(32,[0,1,3,18,19,...|       0|       0.0|
|(32,[0,1,3,18,20,...|       1|       0.0|
|(32,[0,1,3,18,20,...|       0|       0.0|
|(32,[0,1,3,18,19,...|       0|       0.0|
|(32,[0,1,3,18,19,...|       0|       0.0|
|(32,[1,4,18,19,27...|       0|       0.0|
|(32,[0,1,3,18,19,...|       0|       0.0|
|(32,[0,3,18,19,27...|       0|       0.0|
+--------------------+--------+----------+


In [None]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

evaluator = MulticlassClassificationEvaluator(labelCol='Survived', predictionCol='prediction', metricName='accuracy')

In [None]:
evaluator.evaluate(predictions)

0.8034188034188035
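Accuracy is the fraction of rows whose prediction equals the label. A plain-Python sketch using the 20 (Survived, prediction) pairs printed in the predictions table above; this covers only those 20 rows, not the whole test set, and `accuracy` is a hypothetical helper. `MulticlassClassificationEvaluator` also accepts other `metricName` values such as `'f1'`, `'weightedPrecision'` and `'weightedRecall'`.

```python
def accuracy(labels, predictions):
    """Fraction of positions where the prediction matches the label."""
    correct = sum(1 for y, p in zip(labels, predictions) if y == p)
    return correct / len(labels)

# (Survived, prediction) pairs from the 20 rows shown above
pairs = [
    (1, 0.0), (0, 0.0), (1, 1.0), (0, 0.0), (1, 1.0),
    (1, 0.0), (0, 0.0), (0, 0.0), (0, 0.0), (0, 0.0),
    (1, 1.0), (0, 1.0), (0, 0.0), (1, 0.0), (0, 0.0),
    (0, 0.0), (0, 0.0), (0, 0.0), (0, 0.0), (0, 0.0),
]
labels = [y for y, _ in pairs]
preds = [p for _, p in pairs]
print(accuracy(labels, preds))
```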