**Data about passengers:**
*   Name
*   Age
*   Gender


In [1]:
import pyspark

In [12]:
from pyspark import SparkContext
from pyspark.sql import SparkSession
import seaborn as sns

## Building Spark Session

In [4]:
spark = SparkSession.builder.getOrCreate()

## Data Loading


In [13]:
df_train = spark.read.csv("train.csv",header=True, inferSchema=True)

Let's work with the train dataset.

**Confirm that this is a Spark DataFrame:**

In [14]:
type(df_train)

pyspark.sql.dataframe.DataFrame

**Showing 5 rows.**

In [15]:
df_train.show(5)

+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|          Ticket|   Fare|Cabin|Embarked|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|          1|       0|     3|Braund, Mr. Owen ...|  male|22.0|    1|    0|       A/5 21171|   7.25| null|       S|
|          2|       1|     1|Cumings, Mrs. Joh...|female|38.0|    1|    0|        PC 17599|71.2833|  C85|       C|
|          3|       1|     3|Heikkinen, Miss. ...|female|26.0|    0|    0|STON/O2. 3101282|  7.925| null|       S|
|          4|       1|     1|Futrelle, Mrs. Ja...|female|35.0|    1|    0|          113803|   53.1| C123|       S|
|          5|       0|     3|Allen, Mr. Willia...|  male|35.0|    0|    0|          373450|   8.05| null|       S|
+-----------+--------+------+--------------------+------+----+-----+-----+------

**Displaying schema for the dataset:**

In [16]:
df_train.printSchema()

root
 |-- PassengerId: integer (nullable = true)
 |-- Survived: integer (nullable = true)
 |-- Pclass: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- SibSp: integer (nullable = true)
 |-- Parch: integer (nullable = true)
 |-- Ticket: string (nullable = true)
 |-- Fare: double (nullable = true)
 |-- Cabin: string (nullable = true)
 |-- Embarked: string (nullable = true)



**Statistical summary:**

In [17]:
df_train.summary().show()

+-------+-----------------+-------------------+------------------+--------------------+------+------------------+------------------+-------------------+------------------+-----------------+-----+--------+
|summary|      PassengerId|           Survived|            Pclass|                Name|   Sex|               Age|             SibSp|              Parch|            Ticket|             Fare|Cabin|Embarked|
+-------+-----------------+-------------------+------------------+--------------------+------+------------------+------------------+-------------------+------------------+-----------------+-----+--------+
|  count|              891|                891|               891|                 891|   891|               714|               891|                891|               891|              891|  204|     889|
|   mean|            446.0| 0.3838383838383838| 2.308641975308642|                null|  null| 29.69911764705882|0.5230078563411896|0.38159371492704824|260318.54916792738| 32.20420

## EDA - Exploratory Data Analysis

In [22]:
import pyspark.sql.functions as F

In [18]:
df_train.count()

891

**How many people survived, and how many didn't survive?** 

In [19]:
df_train.columns

['PassengerId',
 'Survived',
 'Pclass',
 'Name',
 'Sex',
 'Age',
 'SibSp',
 'Parch',
 'Ticket',
 'Fare',
 'Cabin',
 'Embarked']

In [27]:
survivor_count =  df_train.groupBy(F.col("Survived")).agg(F.count("Survived"))

**Displaying results:**

In [28]:
survivor_count.show()

+--------+---------------+
|Survived|count(Survived)|
+--------+---------------+
|       1|            342|
|       0|            549|
+--------+---------------+



In [32]:
ls = [4, 6]

In [40]:
from pyspark.sql.types import *
from pyspark.sql.functions import udf

In [55]:
def ratio(count):
    # 891 is the number of rows in df_train (see df_train.count() above)
    return count / 891

ratioUDF = udf(lambda z: ratio(z), DoubleType())

In [56]:
survivor_count.select(ratioUDF(F.col("count(Survived)"))).show()

+-------------------------+
|<lambda>(count(Survived))|
+-------------------------+
|       0.3838383838383838|
|       0.6161616161616161|
+-------------------------+
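
As a sanity check, the two ratios above are simply the survivor counts divided by the 891 training rows. A minimal plain-Python sketch of that arithmetic (the variable names are illustrative and not part of the notebook):

```python
# Counts copied from survivor_count.show() above
survivor_counts = {1: 342, 0: 549}
total = sum(survivor_counts.values())  # 891 rows in df_train

# Share of each outcome among all passengers
ratios = {label: count / total for label, count in survivor_counts.items()}
print(ratios)
```

In Spark the same division can also be written as plain column arithmetic, for example `F.col("count(Survived)") / 891`, which avoids the overhead of a Python UDF.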



**Number of males and females**


In [58]:
df_train.groupBy("Sex").agg(F.count("Sex")).show()

+------+----------+
|   Sex|count(Sex)|
+------+----------+
|female|       314|
|  male|       577|
+------+----------+



**What is the survival rate for each gender?**

In [61]:
# 1 
df_train.groupBy("Sex").agg(F.avg("Survived")).show()

+------+-------------------+
|   Sex|      avg(Survived)|
+------+-------------------+
|female| 0.7420382165605095|
|  male|0.18890814558058924|
+------+-------------------+



**What is the number of survivors of each gender?**

In [60]:
# 2 
df_train.groupBy("Sex").agg(F.sum("Survived")).show()

+------+-------------+
|   Sex|sum(Survived)|
+------+-------------+
|female|          233|
|  male|          109|
+------+-------------+
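
Because "Survived" is coded as 0 or 1, its average per group equals the number of survivors divided by the number of passengers in that group. A small plain-Python sketch, using only the counts printed above, shows how the two results relate (the dictionary names are illustrative):

```python
# Values copied from the groupBy("Sex") outputs above
passengers = {"female": 314, "male": 577}
survivors = {"female": 233, "male": 109}

# Survival rate per gender = survivors / passengers
survival_rate = {sex: survivors[sex] / passengers[sex] for sex in passengers}
print(survival_rate)
```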



**Creating a view:**

In [63]:
df_train.createOrReplaceTempView("view1")

**How many passengers of each gender are there? (SQL version):**

In [76]:
spark.sql("""
SELECT count(Survived) FROM view1 GROUP BY Sex
""").show(5)

+---------------+
|count(Survived)|
+---------------+
|            314|
|            577|
+---------------+



In [78]:
spark.udf.register("ratioUDF", ratioUDF)

<function __main__.<lambda>(z)>

**Share of survivors and non-survivors among all passengers:**

In [80]:
spark.sql("""
SELECT ratioUDF(count(Survived)) FROM view1 GROUP BY Survived
""").show(5)

+-------------------------+
|ratioUDF(count(Survived))|
+-------------------------+
|       0.3838383838383838|
|       0.6161616161616161|
+-------------------------+



In [81]:
df_train.show(5)

+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|          Ticket|   Fare|Cabin|Embarked|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|          1|       0|     3|Braund, Mr. Owen ...|  male|22.0|    1|    0|       A/5 21171|   7.25| null|       S|
|          2|       1|     1|Cumings, Mrs. Joh...|female|38.0|    1|    0|        PC 17599|71.2833|  C85|       C|
|          3|       1|     3|Heikkinen, Miss. ...|female|26.0|    0|    0|STON/O2. 3101282|  7.925| null|       S|
|          4|       1|     1|Futrelle, Mrs. Ja...|female|35.0|    1|    0|          113803|   53.1| C123|       S|
|          5|       0|     3|Allen, Mr. Willia...|  male|35.0|    0|    0|          373450|   8.05| null|       S|
+-----------+--------+------+--------------------+------+----+-----+-----+------

**Passenger counts and survivor share for each "Pclass":**


In [85]:
df_train.groupBy("Pclass").agg(F.count("Pclass")).show()

+------+-------------+
|Pclass|count(Pclass)|
+------+-------------+
|     1|          216|
|     3|          491|
|     2|          184|
+------+-------------+



In [93]:
spark.sql("""
SELECT ratioUDF(sum(Survived)) FROM view1 GROUP BY Pclass
""").show(5)

+---------------------------------------+
|ratioUDF(sum(cast(Survived as bigint)))|
+---------------------------------------+
|                     0.1526374859708193|
|                     0.1335578002244669|
|                    0.09764309764309764|
+---------------------------------------+
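
Note that the query above divides each class's survivor count by all 891 passengers, so the values are shares of the whole dataset rather than survival rates within a class. The sketch below recovers the survivor counts and computes per-class rates in plain Python. It assumes the rows come back in the same class order (1, 3, 2) as the count table, which the output itself does not confirm.

```python
# Shares printed by the ratioUDF(sum(Survived)) query above
shares = [0.1526374859708193, 0.1335578002244669, 0.09764309764309764]
# ASSUMPTION: rows are in the same Pclass order as the count table (1, 3, 2)
class_order = [1, 3, 2]
class_sizes = {1: 216, 3: 491, 2: 184}

# Undo the division by the total row count to get survivors per class
survivors_by_class = {p: round(s * 891) for p, s in zip(class_order, shares)}
# Survival rate within each class
rate_by_class = {p: survivors_by_class[p] / class_sizes[p] for p in class_order}
print(survivors_by_class)
print(rate_by_class)
```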



## Data Cleaning

In [94]:
df_test = spark.read.csv("test.csv",header=True, inferSchema=True)

In [97]:
df_test.show(5)

+-----------+--------+------+--------------------+------+----+-----+-----+--------+-------+-----+--------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|  Ticket|   Fare|Cabin|Embarked|
+-----------+--------+------+--------------------+------+----+-----+-----+--------+-------+-----+--------+
|          1|       1|     1|Goldenberg, Mr. S...|  male|49.0|    1|    0|   17453|89.1042|  C92|       C|
|          2|       0|     3| Peduzzi, Mr. Joseph|  male|null|    0|    0|A/5 2817|   8.05| null|       S|
|          3|       1|     3|  Jalsevac, Mr. Ivan|  male|29.0|    0|    0|  349240| 7.8958| null|       C|
|          4|       0|     1|Millet, Mr. Franc...|  male|65.0|    0|    0|   13509|  26.55|  E38|       S|
|          5|       1|     1|Kenyon, Mrs. Fred...|female|null|    1|    0|   17464|51.8625|  D21|       S|
+-----------+--------+------+--------------------+------+----+-----+-----+--------+-------+-----+--------+
only showing top 5 rows



In [105]:
# union() matches columns by position, so both frames must share the same column order
df = df_train.union(df_test)

**Displaying count:**

In [107]:
df.count()

1329

In [120]:
df.summary().select("summary","Age","Cabin","Embarked").show()

+-------+------------------+-----+--------+
|summary|               Age|Cabin|Embarked|
+-------+------------------+-----+--------+
|  count|              1064|  308|    1326|
|   mean|30.079501879699244| null|    null|
| stddev|14.648220239867674| null|    null|
|    min|              0.42|  A10|       C|
|    25%|              21.0| null|    null|
|    50%|              29.0| null|    null|
|    75%|              39.0| null|    null|
|    max|              80.0|    T|       S|
+-------+------------------+-----+--------+



**Number of null values in each column:**


In [167]:
temp_tuples = []
for col in df.columns:
    nan_count = df.where(F.col(col).isNull()).count()
    if nan_count > 0 :
        temp_tuples.append((col, nan_count))

In [173]:
temp_tuples

[('Age', 265), ('Cabin', 1021), ('Embarked', 3)]

In [172]:
spark.createDataFrame(temp_tuples, ["Col_Name" , "Num_of_Nans"]).show()

+--------+-----------+
|Col_Name|Num_of_Nans|
+--------+-----------+
|     Age|        265|
|   Cabin|       1021|
|Embarked|          3|
+--------+-----------+
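
These null counts can be cross-checked against the "count" row of `df.summary()`, which reports non-null values only. A minimal plain-Python sketch of that check, using the numbers printed above (the variable names are illustrative):

```python
total_rows = 1329  # df.count() after the union
# "count" row of df.summary() for the three columns with missing values
non_null = {"Age": 1064, "Cabin": 308, "Embarked": 1326}

# Nulls = total rows minus non-null values
null_counts = {col: total_rows - n for col, n in non_null.items()}
print(null_counts)
```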



## Preprocessing 

In [176]:
df.createOrReplaceTempView("view_df")

In [179]:
spark.sql("""
select name from view_df
""").show(5)

+--------------------+
|                name|
+--------------------+
|Braund, Mr. Owen ...|
|Cumings, Mrs. Joh...|
|Heikkinen, Miss. ...|
|Futrelle, Mrs. Ja...|
|Allen, Mr. Willia...|
+--------------------+
only showing top 5 rows



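**Extract the title (e.g. "Mr", "Mrs", "Miss") from the "Name" column:**

In [258]:
import pyspark.sql.functions as F
# Capture the run of letters right before the first period, e.g. "Mr" in "Braund, Mr. Owen ..."
combined = df.withColumn('Title', F.regexp_extract(F.col("Name"), r"([A-Za-z]+)\.", 1))
combined.createOrReplaceTempView('combined')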
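
To see what the pattern `([A-Za-z]+)\.` captures, it can be tried in plain Python with the `re` module. Spark uses Java regular expressions, but for a pattern this simple the two engines should behave the same. The helper name `extract_title` is hypothetical, and the names are used exactly as they are displayed (truncated) in the table above.

```python
import re

TITLE_PATTERN = re.compile(r"([A-Za-z]+)\.")

def extract_title(name):
    # Mirror regexp_extract: group 1 of the first match, or "" when nothing matches
    match = TITLE_PATTERN.search(name)
    return match.group(1) if match else ""

# Names exactly as displayed (truncated) in the table above
for name in ["Braund, Mr. Owen ...", "Cumings, Mrs. Joh...", "Heikkinen, Miss. ..."]:
    print(name, "->", extract_title(name))
```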

In [182]:
combined.printSchema()

root
 |-- PassengerId: integer (nullable = true)
 |-- Survived: integer (nullable = true)
 |-- Pclass: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- SibSp: integer (nullable = true)
 |-- Parch: integer (nullable = true)
 |-- Ticket: string (nullable = true)
 |-- Fare: double (nullable = true)
 |-- Cabin: string (nullable = true)
 |-- Embarked: string (nullable = true)
 |-- Title: string (nullable = true)



In [190]:
title_grouped_df = combined.groupBy("Title").agg(F.count("Title"))

In [192]:
title_grouped_df.show()

+--------+------------+
|   Title|count(Title)|
+--------+------------+
|     Don|           1|
|    Miss|         257|
|Countess|           2|
|     Col|           4|
|     Rev|           9|
|    Lady|           2|
|  Master|          56|
|     Mme|           1|
|    Capt|           2|
|      Mr|         786|
|      Dr|          11|
|     Mrs|         186|
|     Sir|           2|
|Jonkheer|           2|
|    Mlle|           4|
|   Major|           3|
|      Ms|           1|
+--------+------------+



In [228]:
# Titles with fewer than 50 occurrences are grouped as "rare"; all others keep their name
title_israre_df = title_grouped_df.withColumn("is_rare", 
                            F.when(F.col("count(Title)") < 50,"rare" )
                            .otherwise(F.col("Title"))
).select("Title", "is_rare")

In [248]:
temp_df = title_israre_df.toPandas()
# Map each original title either to itself (common titles) or to "rare"
dict_vals = dict(zip(temp_df.Title, temp_df.is_rare))

print(dict_vals)

{'Don': 'rare', 'Miss': 'Miss', 'Countess': 'rare', 'Col': 'rare', 'Rev': 'rare', 'Lady': 'rare', 'Master': 'Master', 'Mme': 'rare', 'Capt': 'rare', 'Mr': 'Mr', 'Dr': 'rare', 'Mrs': 'Mrs', 'Sir': 'rare', 'Jonkheer': 'rare', 'Mlle': 'rare', 'Major': 'rare', 'Ms': 'rare'}


**Define the mapping function:**

In [249]:
def impute_title(title):
    # dict_vals is the title -> group dictionary built above
    return dict_vals[title]

**Apply the function on "Title" column using UDF:**

In [250]:
impute_titleUDF = udf(lambda z: impute_title(z))

In [259]:
#combined.select(impute_titleUDF(F.col("Title"))).show(5)



combined = combined.\
withColumn("Title",impute_titleUDF(F.col("Title")))

In [264]:
combined.createOrReplaceTempView("combined2")

**Display "Title" from table and group by "Title" column:**

In [265]:
spark.sql("""
select Title, count('Title') from combined2 group by Title
""").show(10)

+------+------------+
| Title|count(Title)|
+------+------------+
|  rare|          44|
|  Miss|         257|
|Master|          56|
|    Mr|         786|
|   Mrs|         186|
+------+------------+
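
The grouped result can be verified by hand from the per-title counts shown earlier: every title with fewer than 50 occurrences should fold into "rare". A plain-Python sketch of that check (the names `title_counts`, `RARE_THRESHOLD`, and `grouped` are illustrative):

```python
# Counts copied from title_grouped_df.show() above
title_counts = {
    "Don": 1, "Miss": 257, "Countess": 2, "Col": 4, "Rev": 9, "Lady": 2,
    "Master": 56, "Mme": 1, "Capt": 2, "Mr": 786, "Dr": 11, "Mrs": 186,
    "Sir": 2, "Jonkheer": 2, "Mlle": 4, "Major": 3, "Ms": 1,
}
RARE_THRESHOLD = 50  # same cut-off as the is_rare column

# Fold every title below the threshold into a single "rare" bucket
grouped = {}
for title, count in title_counts.items():
    key = "rare" if count < RARE_THRESHOLD else title
    grouped[key] = grouped.get(key, 0) + count
print(grouped)
```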



## **Preprocessing Age**

**Compute the mean of the "Age" column, which is then used to fill in the missing values:**

In [277]:
mean = spark.sql("""
select sum(age) / count(age) from combined2 
""").collect()[0][0]

In [428]:
combined.select(F.mean(F.col("Age"))).collect()[0][0]

30.0795018796993

**Fill the missing "Age" values with the mean:**

In [294]:
combined = combined.na.fill(mean,"Age")


# combined.na.fill(mean,"Age").select("Age").show()

In [295]:
combined.select("Age").show()

+------------------+
|               Age|
+------------------+
|              22.0|
|              38.0|
|              26.0|
|              35.0|
|              35.0|
|30.079501879699244|
|              54.0|
|               2.0|
|              27.0|
|              14.0|
|               4.0|
|              58.0|
|              20.0|
|              39.0|
|              14.0|
|              55.0|
|               2.0|
|30.079501879699244|
|              31.0|
|30.079501879699244|
+------------------+
only showing top 20 rows
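
The idea behind mean imputation can be sketched in plain Python on the first six rows. The sixth value is the one that was missing before the fill. The mean here is computed from this six-row sample only, so it differs from the 30.08 used for the full dataset.

```python
# First six "Age" values before imputation (None marks the missing value)
ages = [22.0, 38.0, 26.0, 35.0, 35.0, None]

# Nulls are excluded from the mean, just like sum(age) / count(age) in SQL
known = [a for a in ages if a is not None]
mean_age = sum(known) / len(known)

# Replace only the missing entries; known ages stay untouched
filled = [mean_age if a is None else a for a in ages]
print(mean_age, filled)
```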



## **Preprocessing Embarked**

In [300]:
grouped_Embarked = spark.sql("""
select Embarked,count(Embarked) as count from combined2 group by Embarked order by count desc  
""")

In [301]:
grouped_Embarked.show(5)

+--------+-----+
|Embarked|count|
+--------+-----+
|       S|  962|
|       C|  253|
|       Q|  111|
|    null|    0|
+--------+-----+



In [305]:
grouped_Embarked.select(F.max(F.col("count"))).show()

+----------+
|max(count)|
+----------+
|       962|
+----------+



In [308]:
#max_val = grouped_Embarked.select(F.max(F.col("count"))).collect()[0][0]

In [309]:
combined = combined.na.fill("S","Embarked")
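
The fill value "S" is hard-coded above, but it is simply the most frequent port in the grouped counts. A minimal plain-Python sketch of picking it programmatically (variable names are illustrative):

```python
# Counts copied from grouped_Embarked.show() above (the null row is left out)
embarked_counts = {"S": 962, "C": 253, "Q": 111}

# The mode is the key with the largest count
most_common_port = max(embarked_counts, key=embarked_counts.get)
print(most_common_port)
```

In the notebook itself, the same value could probably be read with `grouped_Embarked.first()["Embarked"]`, since that query already orders by count in descending order.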

## **Preprocessing Cabin**

In [310]:
combined.select("cabin").show(5)

+-----+
|cabin|
+-----+
| null|
|  C85|
| null|
| C123|
| null|
+-----+
only showing top 5 rows



In [325]:
def replace_with_first(s):
    # Keep nulls as they are; otherwise keep only the first character (the deck letter)
    if s is None:
        return s
    return s[0]
replace_with_firstUDF = udf(lambda z: replace_with_first(z),StringType())

In [328]:
combined =  combined.withColumn("cabin", replace_with_firstUDF(F.col("cabin")))

In [329]:
combined.show(5)

+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+-----+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|          Ticket|   Fare|cabin|Embarked|Title|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+-----+
|          1|       0|     3|Braund, Mr. Owen ...|  male|22.0|    1|    0|       A/5 21171|   7.25| null|       S|   Mr|
|          2|       1|     1|Cumings, Mrs. Joh...|female|38.0|    1|    0|        PC 17599|71.2833|    C|       C|  Mrs|
|          3|       1|     3|Heikkinen, Miss. ...|female|26.0|    0|    0|STON/O2. 3101282|  7.925| null|       S| Miss|
|          4|       1|     1|Futrelle, Mrs. Ja...|female|35.0|    1|    0|          113803|   53.1|    C|       S|  Mrs|
|          5|       0|     3|Allen, Mr. Willia...|  male|35.0|    0|    0|          373450|   8.05| null|       S|   Mr|
+-----------+--------+------+---

In [330]:
combined.createOrReplaceTempView("view_df3")

In [332]:
spark.sql("""
select cabin, count('cabin') from view_df3 group by cabin order by count('cabin') desc 
""").show(5)

+-----+------------+
|cabin|count(cabin)|
+-----+------------+
| null|        1021|
|    C|          82|
|    B|          77|
|    D|          52|
|    E|          51|
+-----+------------+
only showing top 5 rows



In [333]:
combined = combined.na.fill("U","cabin")
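
Taken together, the two cabin steps reduce each value to its deck letter and put missing cabins into an "unknown" bucket. A plain-Python sketch of the combined logic (the helper `cabin_to_deck` is hypothetical and not part of the notebook):

```python
def cabin_to_deck(cabin, unknown="U"):
    # First character is the deck letter; missing cabins go to the "U" (unknown) bucket
    return unknown if cabin is None else cabin[0]

# Cabin values from the first five rows shown above
decks = [cabin_to_deck(c) for c in [None, "C85", None, "C123", None]]
print(decks)
```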

## Importing important packages

In [342]:
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler

In [356]:
cat_cols = [c for (c, d) in combined.dtypes if (d == "string") ]  # string columns

In [357]:
cat_cols

['Name', 'Sex', 'Ticket', 'cabin', 'Embarked', 'Title']

In [358]:
indexed_Col = [col_name + "_indexed" for col_name in cat_cols] 

In [359]:
ohe_Col = [col_name + "_ohe" for col_name in cat_cols]   

In [376]:
num_col = [c for (c, d) in combined.dtypes if d in ("int", "double")]  # numeric columns

In [377]:
num_col.remove("Survived")

In [429]:
strInd = StringIndexer(inputCols= cat_cols , outputCols= indexed_Col, handleInvalid='skip') 

**OneHotEncoding**

In [430]:
ohe = OneHotEncoder(inputCols= indexed_Col , outputCols= ohe_Col)
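
To make the two encoding stages concrete, here is a rough plain-Python sketch of what they do to one column, assuming the default settings (`stringOrderType="frequencyDesc"` for `StringIndexer` and `dropLast=True` for `OneHotEncoder`). It uses the "Embarked" counts from the full dataset before the fill, whereas the pipeline is fitted on the training split only. Dense lists stand in for Spark's sparse vectors, and the helper names are hypothetical.

```python
def string_index(counts):
    # Most frequent label gets index 0; ties are broken alphabetically
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: i for i, label in enumerate(ordered)}

def one_hot_drop_last(index, n_categories):
    # n_categories - 1 slots; the last category is encoded as all zeros
    vec = [0.0] * (n_categories - 1)
    if index < n_categories - 1:
        vec[index] = 1.0
    return vec

embarked_counts = {"S": 962, "C": 253, "Q": 111}
indices = string_index(embarked_counts)
encoded = {label: one_hot_drop_last(i, len(indices)) for label, i in indices.items()}
print(indices)
print(encoded)
```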

**VectorAssembler:**


In [431]:
assemb_col = ohe_Col + num_col 
assemb_col

['Name_ohe',
 'Sex_ohe',
 'Ticket_ohe',
 'cabin_ohe',
 'Embarked_ohe',
 'Title_ohe',
 'PassengerId',
 'Pclass',
 'Age',
 'SibSp',
 'Parch',
 'Fare']

In [432]:
vec_assembler = VectorAssembler(inputCols=assemb_col, outputCol="features")

In [433]:
combined = combined.na.drop(how = "any")

In [434]:
temp_tuples = []
for col in combined.columns:
    nan_count = combined.where(F.col(col).isNull()).count()
    if nan_count > 0 :
        temp_tuples.append((col, nan_count))

In [435]:
temp_tuples

[]

**Splitting the data**

In [436]:
X_train , X_test = combined.randomSplit([.8,.2])
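
`randomSplit` also accepts an optional `seed` argument. Without it, each run produces a different split, so the accuracy reported later will vary between runs. The plain-Python sketch below illustrates the general idea of a seeded, row-by-row random split. It is not Spark's implementation, and the seed value 42 is an arbitrary assumption.

```python
import random

def random_split(rows, train_fraction=0.8, seed=42):
    # Each row is assigned independently, so split sizes are only approximately 80/20
    rng = random.Random(seed)
    train, test = [], []
    for row in rows:
        (train if rng.random() < train_fraction else test).append(row)
    return train, test

# 1329 stand-in row ids, matching the size of the combined dataset before na.drop
train, test = random_split(list(range(1329)))
print(len(train), len(test))
```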

**Pipeline**

In [437]:
from pyspark.ml import Pipeline
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

**Building RandomForestClassifier**

In [438]:
clf = RandomForestClassifier(featuresCol='features',labelCol='Survived')

In [439]:
pipeline =  Pipeline(stages=[strInd , ohe, vec_assembler, clf])

In [440]:
clf_pipeline = pipeline.fit(X_train)

In [441]:
clf_pipeline.transform(X_test).printSchema()

root
 |-- PassengerId: integer (nullable = true)
 |-- Survived: integer (nullable = true)
 |-- Pclass: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Age: double (nullable = false)
 |-- SibSp: integer (nullable = true)
 |-- Parch: integer (nullable = true)
 |-- Ticket: string (nullable = true)
 |-- Fare: double (nullable = true)
 |-- cabin: string (nullable = false)
 |-- Embarked: string (nullable = false)
 |-- Title: string (nullable = true)
 |-- Name_indexed: double (nullable = false)
 |-- Sex_indexed: double (nullable = false)
 |-- cabin_indexed: double (nullable = false)
 |-- Title_indexed: double (nullable = false)
 |-- Embarked_indexed: double (nullable = false)
 |-- Ticket_indexed: double (nullable = false)
 |-- Embarked_ohe: vector (nullable = true)
 |-- Title_ohe: vector (nullable = true)
 |-- Sex_ohe: vector (nullable = true)
 |-- cabin_ohe: vector (nullable = true)
 |-- Name_ohe: vector (nullable = true)
 |-- Ticket_ohe:

**Evaluating the model using MulticlassClassificationEvaluator**

In [442]:
clf_pipeline.transform(X_test).select("Survived","prediction").show()

+--------+----------+
|Survived|prediction|
+--------+----------+
|       0|       0.0|
|       0|       0.0|
|       0|       0.0|
|       0|       0.0|
|       0|       0.0|
|       0|       0.0|
|       0|       0.0|
|       0|       0.0|
|       0|       0.0|
|       1|       1.0|
|       1|       1.0|
|       1|       0.0|
|       1|       1.0|
|       0|       0.0|
|       1|       0.0|
|       0|       0.0|
|       1|       0.0|
|       1|       0.0|
|       0|       0.0|
|       1|       1.0|
+--------+----------+
only showing top 20 rows



In [443]:
evaluator = MulticlassClassificationEvaluator(labelCol="Survived", predictionCol="prediction",metricName='accuracy')
#evaluator.setPredictionCol("prediction")
evaluator.evaluate(clf_pipeline.transform(X_test).select("Survived","prediction"))

0.8028169014084507
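
Accuracy is the fraction of rows where the prediction equals the label. As an illustration, the sketch below recomputes it in plain Python for just the 20 rows displayed earlier, so the result is close to, but not the same as, the figure for the full test split.

```python
# (Survived, prediction) values from the 20 rows displayed above
labels      = [0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 1, 1, 0, 1, 0, 1, 1, 0, 1]
predictions = [0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 0, 1, 0, 0, 0, 0, 0, 0, 1]

# Count the rows where the prediction matches the label
correct = sum(1 for y, p in zip(labels, predictions) if y == p)
accuracy = correct / len(labels)
print(correct, accuracy)
```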