In [1]:
from pyspark.sql import SparkSession

In [2]:
import numpy as np
import pandas as pd

### Lets begin

- Read the dataset

In [4]:
spark =SparkSession.builder.appName('titanic').getOrCreate()

In [5]:
spark

In [6]:
df_train = spark.read.csv('train.csv', header = True, inferSchema=True)
df_test = spark.read.csv('test.csv', header = True, inferSchema=True)

In [7]:
titanic_train = df_train.alias("titanic_train")

In [8]:
type(df_train)

pyspark.sql.dataframe.DataFrame

In [9]:
df_train.show()

+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|          Ticket|   Fare|Cabin|Embarked|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|          1|       0|     3|Braund, Mr. Owen ...|  male|22.0|    1|    0|       A/5 21171|   7.25| null|       S|
|          2|       1|     1|Cumings, Mrs. Joh...|female|38.0|    1|    0|        PC 17599|71.2833|  C85|       C|
|          3|       1|     3|Heikkinen, Miss. ...|female|26.0|    0|    0|STON/O2. 3101282|  7.925| null|       S|
|          4|       1|     1|Futrelle, Mrs. Ja...|female|35.0|    1|    0|          113803|   53.1| C123|       S|
|          5|       0|     3|Allen, Mr. Willia...|  male|35.0|    0|    0|          373450|   8.05| null|       S|
|          6|       0|     3|    Moran, Mr. James|  male|null|    0|    0|      

In [10]:
df_train.describe()

DataFrame[summary: string, PassengerId: string, Survived: string, Pclass: string, Name: string, Sex: string, Age: string, SibSp: string, Parch: string, Ticket: string, Fare: string, Cabin: string, Embarked: string]

In [11]:
df_train.count()

891

In [12]:
df_train.printSchema()

root
 |-- PassengerId: integer (nullable = true)
 |-- Survived: integer (nullable = true)
 |-- Pclass: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- SibSp: integer (nullable = true)
 |-- Parch: integer (nullable = true)
 |-- Ticket: string (nullable = true)
 |-- Fare: double (nullable = true)
 |-- Cabin: string (nullable = true)
 |-- Embarked: string (nullable = true)



In [18]:
df_train.head()

Row(PassengerId=1, Survived=0, Pclass=3, Name='Braund, Mr. Owen Harris', Sex='male', Age=22.0, SibSp=1, Parch=0, Ticket='A/5 21171', Fare=7.25, Cabin=None, Embarked='S')

In [19]:
df_train[['Age']].show()

+----+
| Age|
+----+
|22.0|
|38.0|
|26.0|
|35.0|
|35.0|
|null|
|54.0|
| 2.0|
|27.0|
|14.0|
| 4.0|
|58.0|
|20.0|
|39.0|
|14.0|
|55.0|
| 2.0|
|null|
|31.0|
|null|
+----+
only showing top 20 rows



In [20]:
from pyspark.sql.functions import mean
fare_mean = df_train.select(mean("Fare")).collect()
fare_mean[0][0]

32.2042079685746

In [21]:
fare_mean = fare_mean[0][0]
fare_mean

32.2042079685746

### Filter

In [22]:
df_train.filter('Fare > 32.20').limit(5).show()

+-----------+--------+------+--------------------+------+----+-----+-----+--------+-------+-----------+--------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|  Ticket|   Fare|      Cabin|Embarked|
+-----------+--------+------+--------------------+------+----+-----+-----+--------+-------+-----------+--------+
|          2|       1|     1|Cumings, Mrs. Joh...|female|38.0|    1|    0|PC 17599|71.2833|        C85|       C|
|          4|       1|     1|Futrelle, Mrs. Ja...|female|35.0|    1|    0|  113803|   53.1|       C123|       S|
|          7|       0|     1|McCarthy, Mr. Tim...|  male|54.0|    0|    0|   17463|51.8625|        E46|       S|
|         24|       1|     1|Sloper, Mr. Willi...|  male|28.0|    0|    0|  113788|   35.5|         A6|       S|
|         28|       0|     1|Fortune, Mr. Char...|  male|19.0|    3|    2|   19950|  263.0|C23 C25 C27|       S|
+-----------+--------+------+--------------------+------+----+-----+-----+--------+-------+-----

In [23]:
#similarly
df_train[df_train.Fare > 32.20].limit(3).show()

+-----------+--------+------+--------------------+------+----+-----+-----+--------+-------+-----+--------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|  Ticket|   Fare|Cabin|Embarked|
+-----------+--------+------+--------------------+------+----+-----+-----+--------+-------+-----+--------+
|          2|       1|     1|Cumings, Mrs. Joh...|female|38.0|    1|    0|PC 17599|71.2833|  C85|       C|
|          4|       1|     1|Futrelle, Mrs. Ja...|female|35.0|    1|    0|  113803|   53.1| C123|       S|
|          7|       0|     1|McCarthy, Mr. Tim...|  male|54.0|    0|    0|   17463|51.8625|  E46|       S|
+-----------+--------+------+--------------------+------+----+-----+-----+--------+-------+-----+--------+



In [24]:
## What if we want to filter by multiple columns.
# passenger with below average fare with a sex equals male
temp_df = df_train.filter((df_train['Fare'] < fare_mean) &
          (df_train['Sex'] ==  'male')
         )
temp_df.show(5)

+-----------+--------+------+--------------------+----+----+-----+-----+---------+------+-----+--------+
|PassengerId|Survived|Pclass|                Name| Sex| Age|SibSp|Parch|   Ticket|  Fare|Cabin|Embarked|
+-----------+--------+------+--------------------+----+----+-----+-----+---------+------+-----+--------+
|          1|       0|     3|Braund, Mr. Owen ...|male|22.0|    1|    0|A/5 21171|  7.25| null|       S|
|          5|       0|     3|Allen, Mr. Willia...|male|35.0|    0|    0|   373450|  8.05| null|       S|
|          6|       0|     3|    Moran, Mr. James|male|null|    0|    0|   330877|8.4583| null|       Q|
|          8|       0|     3|Palsson, Master. ...|male| 2.0|    3|    1|   349909|21.075| null|       S|
|         13|       0|     3|Saundercock, Mr. ...|male|20.0|    0|    0|A/5. 2151|  8.05| null|       S|
+-----------+--------+------+--------------------+----+----+-----+-----+---------+------+-----+--------+
only showing top 5 rows



In [25]:
# passenger with below average fare and are not male
filter1_less_than_mean_fare = df_train['Fare'] < fare_mean
filter2_sex_not_male = df_train['Sex'] != "male"
df_train.filter((filter1_less_than_mean_fare) &
                (filter2_sex_not_male)).show(10)

+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|          Ticket|   Fare|Cabin|Embarked|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|          3|       1|     3|Heikkinen, Miss. ...|female|26.0|    0|    0|STON/O2. 3101282|  7.925| null|       S|
|          9|       1|     3|Johnson, Mrs. Osc...|female|27.0|    0|    2|          347742|11.1333| null|       S|
|         10|       1|     2|Nasser, Mrs. Nich...|female|14.0|    1|    0|          237736|30.0708| null|       C|
|         11|       1|     3|Sandstrom, Miss. ...|female| 4.0|    1|    1|         PP 9549|   16.7|   G6|       S|
|         12|       1|     1|Bonnell, Miss. El...|female|58.0|    0|    0|          113783|  26.55| C103|       S|
|         15|       0|     3|Vestrom, Miss. Hu...|female|14.0|    0|    0|      

In [26]:
df_train.select("PassengerID", df_train.Fare.between(10,40)).show()

+-----------+-------------------------------+
|PassengerID|((Fare >= 10) AND (Fare <= 40))|
+-----------+-------------------------------+
|          1|                          false|
|          2|                          false|
|          3|                          false|
|          4|                          false|
|          5|                          false|
|          6|                          false|
|          7|                          false|
|          8|                           true|
|          9|                           true|
|         10|                           true|
|         11|                           true|
|         12|                           true|
|         13|                          false|
|         14|                           true|
|         15|                          false|
|         16|                           true|
|         17|                           true|
|         18|                           true|
|         19|                     

In [27]:
# contains
df_train.select("PassengerId", "Name").filter(df_train.Name.contains("Mr")).show()

+-----------+--------------------+
|PassengerId|                Name|
+-----------+--------------------+
|          1|Braund, Mr. Owen ...|
|          2|Cumings, Mrs. Joh...|
|          4|Futrelle, Mrs. Ja...|
|          5|Allen, Mr. Willia...|
|          6|    Moran, Mr. James|
|          7|McCarthy, Mr. Tim...|
|          9|Johnson, Mrs. Osc...|
|         10|Nasser, Mrs. Nich...|
|         13|Saundercock, Mr. ...|
|         14|Andersson, Mr. An...|
|         16|Hewlett, Mrs. (Ma...|
|         18|Williams, Mr. Cha...|
|         19|Vander Planke, Mr...|
|         20|Masselmani, Mrs. ...|
|         21|Fynney, Mr. Joseph J|
|         22|Beesley, Mr. Lawr...|
|         24|Sloper, Mr. Willi...|
|         26|Asplund, Mrs. Car...|
|         27|Emir, Mr. Farred ...|
|         28|Fortune, Mr. Char...|
+-----------+--------------------+
only showing top 20 rows



In [28]:
# startswith 
df_train.select("PassengerID", 'Sex').filter(df_train.Sex.startswith("fe")).show()

+-----------+------+
|PassengerID|   Sex|
+-----------+------+
|          2|female|
|          3|female|
|          4|female|
|          9|female|
|         10|female|
|         11|female|
|         12|female|
|         15|female|
|         16|female|
|         19|female|
|         20|female|
|         23|female|
|         25|female|
|         26|female|
|         29|female|
|         32|female|
|         33|female|
|         39|female|
|         40|female|
|         41|female|
+-----------+------+
only showing top 20 rows



### Groupby

In [33]:
## Let's group by Pclass and get the average fare price per Pclass.  
#df_train.groupBy("Pclass").mean().show()
df_train.groupBy("Pclass").mean().toPandas()

Unnamed: 0,Pclass,avg(PassengerId),avg(Survived),avg(Pclass),avg(Age),avg(SibSp),avg(Parch),avg(Fare)
0,1,461.597222,0.62963,1.0,38.233441,0.416667,0.356481,84.154687
1,3,439.154786,0.242363,3.0,25.14062,0.615071,0.393075,13.67555
2,2,445.956522,0.472826,2.0,29.87763,0.402174,0.380435,20.662183


In [30]:
## let's just look at the Pclass and avg(Fare)
df_train.groupBy("Pclass").mean().select('Pclass', 'avg(Fare)').show()

+------+------------------+
|Pclass|         avg(Fare)|
+------+------------------+
|     1| 84.15468749999992|
|     3|13.675550101832997|
|     2| 20.66218315217391|
+------+------------------+



In [34]:
# Alternative way
df_train.groupBy("Pclass").mean("Fare").show()

+------+------------------+
|Pclass|         avg(Fare)|
+------+------------------+
|     1| 84.15468749999992|
|     3|13.675550101832997|
|     2| 20.66218315217391|
+------+------------------+



### Order By

In [35]:
df_train.orderBy("Fare").limit(20).toPandas()


Unnamed: 0,PassengerId,Survived,Pclass,Name,Sex,Age,SibSp,Parch,Ticket,Fare,Cabin,Embarked
0,180,0,3,"Leonard, Mr. Lionel",male,36.0,0,0,LINE,0.0,,S
1,482,0,2,"""Frost, Mr. Anthony Wood """"Archie""""""",male,,0,0,239854,0.0,,S
2,414,0,2,"Cunningham, Mr. Alfred Fleming",male,,0,0,239853,0.0,,S
3,303,0,3,"Johnson, Mr. William Cahoone Jr",male,19.0,0,0,LINE,0.0,,S
4,264,0,1,"Harrison, Mr. William",male,40.0,0,0,112059,0.0,B94,S
5,675,0,2,"Watson, Mr. Ennis Hastings",male,,0,0,239856,0.0,,S
6,733,0,2,"Knight, Mr. Robert J",male,,0,0,239855,0.0,,S
7,807,0,1,"Andrews, Mr. Thomas Jr",male,39.0,0,0,112050,0.0,A36,S
8,816,0,1,"Fry, Mr. Richard",male,,0,0,112058,0.0,B102,S
9,823,0,1,"Reuchlin, Jonkheer. John George",male,38.0,0,0,19972,0.0,,S


In [36]:
df_train.orderBy(df_train.Fare.asc()).show()

+-----------+--------+------+--------------------+----+----+-----+-----+------+------+-----------+--------+
|PassengerId|Survived|Pclass|                Name| Sex| Age|SibSp|Parch|Ticket|  Fare|      Cabin|Embarked|
+-----------+--------+------+--------------------+----+----+-----+-----+------+------+-----------+--------+
|        272|       1|     3|Tornquist, Mr. Wi...|male|25.0|    0|    0|  LINE|   0.0|       null|       S|
|        180|       0|     3| Leonard, Mr. Lionel|male|36.0|    0|    0|  LINE|   0.0|       null|       S|
|        414|       0|     2|Cunningham, Mr. A...|male|null|    0|    0|239853|   0.0|       null|       S|
|        278|       0|     2|"Parkes, Mr. Fran...|male|null|    0|    0|239853|   0.0|       null|       S|
|        675|       0|     2|Watson, Mr. Ennis...|male|null|    0|    0|239856|   0.0|       null|       S|
|        733|       0|     2|Knight, Mr. Robert J|male|null|    0|    0|239855|   0.0|       null|       S|
|        807|       0|     1

In [37]:
df_train.orderBy(df_train.Fare.desc()).limit(5).show()


+-----------+--------+------+--------------------+------+----+-----+-----+--------+--------+-----------+--------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|  Ticket|    Fare|      Cabin|Embarked|
+-----------+--------+------+--------------------+------+----+-----+-----+--------+--------+-----------+--------+
|        738|       1|     1|Lesurer, Mr. Gust...|  male|35.0|    0|    0|PC 17755|512.3292|       B101|       C|
|        680|       1|     1|Cardeza, Mr. Thom...|  male|36.0|    0|    1|PC 17755|512.3292|B51 B53 B55|       C|
|        259|       1|     1|    Ward, Miss. Anna|female|35.0|    0|    0|PC 17755|512.3292|       null|       C|
|        439|       0|     1|   Fortune, Mr. Mark|  male|64.0|    1|    4|   19950|   263.0|C23 C25 C27|       S|
|         89|       1|     1|Fortune, Miss. Ma...|female|23.0|    3|    2|   19950|   263.0|C23 C25 C27|       S|
+-----------+--------+------+--------------------+------+----+-----+-----+--------+-----

In [38]:
df_train.filter(df_train.Embarked.isNull()).count()

2

In [39]:
df_train.select('PassengerID','Embarked').orderBy(df_train.Embarked.asc_nulls_first()).show()

+-----------+--------+
|PassengerID|Embarked|
+-----------+--------+
|         62|    null|
|        830|    null|
|        204|       C|
|         74|       C|
|         97|       C|
|         98|       C|
|         66|       C|
|        115|       C|
|         65|       C|
|        123|       C|
|         31|       C|
|        112|       C|
|         35|       C|
|         40|       C|
|        119|       C|
|         44|       C|
|         53|       C|
|        126|       C|
|         58|       C|
|        129|       C|
+-----------+--------+
only showing top 20 rows



In [40]:
df_train.na.drop(how="any").limit(5).toPandas()

Unnamed: 0,PassengerId,Survived,Pclass,Name,Sex,Age,SibSp,Parch,Ticket,Fare,Cabin,Embarked
0,2,1,1,"Cumings, Mrs. John Bradley (Florence Briggs Th...",female,38.0,1,0,PC 17599,71.2833,C85,C
1,4,1,1,"Futrelle, Mrs. Jacques Heath (Lily May Peel)",female,35.0,1,0,113803,53.1,C123,S
2,7,0,1,"McCarthy, Mr. Timothy J",male,54.0,0,0,17463,51.8625,E46,S
3,11,1,3,"Sandstrom, Miss. Marguerite Rut",female,4.0,1,1,PP 9549,16.7,G6,S
4,12,1,1,"Bonnell, Miss. Elizabeth",female,58.0,0,0,113783,26.55,C103,S


### Dealing with missing values

In [41]:
df_train = df_train.na.fill('N', subset=['Cabin'])
df_test = df_test.na.fill('N', subset=['Cabin'])

In [42]:
df_test.where(df_test['Fare'].isNull()).show()

+-----------+------+------------------+----+----+-----+-----+------+----+-----+--------+
|PassengerId|Pclass|              Name| Sex| Age|SibSp|Parch|Ticket|Fare|Cabin|Embarked|
+-----------+------+------------------+----+----+-----+-----+------+----+-----+--------+
|       1044|     3|Storey, Mr. Thomas|male|60.5|    0|    0|  3701|null|    N|       S|
+-----------+------+------------------+----+----+-----+-----+------+----+-----+--------+



In [43]:
missing_value = df_test.filter(
    (df_test['Pclass'] == 3) &
    (df_test.Embarked == 'S') &
    (df_test.Sex == "male")
)
## filling in the null value in the fare column using Fare mean. 
df_test = df_test.na.fill(
    missing_value.select(mean('Fare')).collect()[0][0],
    subset=['Fare']
)

In [44]:
# Checking
df_test.where(df_test['Fare'].isNull()).show()

+-----------+------+----+---+---+-----+-----+------+----+-----+--------+
|PassengerId|Pclass|Name|Sex|Age|SibSp|Parch|Ticket|Fare|Cabin|Embarked|
+-----------+------+----+---+---+-----+-----+------+----+-----+--------+
+-----------+------+----+---+---+-----+-----+------+----+-----+--------+



In [45]:
df_train.where(df_train['Embarked'].isNull()).show()

+-----------+--------+------+--------------------+------+----+-----+-----+------+----+-----+--------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|Ticket|Fare|Cabin|Embarked|
+-----------+--------+------+--------------------+------+----+-----+-----+------+----+-----+--------+
|         62|       1|     1| Icard, Miss. Amelie|female|38.0|    0|    0|113572|80.0|  B28|    null|
|        830|       1|     1|Stone, Mrs. Georg...|female|62.0|    0|    0|113572|80.0|  B28|    null|
+-----------+--------+------+--------------------+------+----+-----+-----+------+----+-----+--------+



In [46]:
df_train = df_train.na.fill('C', subset=['Embarked'])

In [47]:
df_train.where(df_train['Embarked'].isNull()).show()

+-----------+--------+------+----+---+---+-----+-----+------+----+-----+--------+
|PassengerId|Survived|Pclass|Name|Sex|Age|SibSp|Parch|Ticket|Fare|Cabin|Embarked|
+-----------+--------+------+----+---+---+-----+-----+------+----+-----+--------+
+-----------+--------+------+----+---+---+-----+-----+------+----+-----+--------+



In [48]:
df_test.where(df_test.Embarked.isNull()).show()

+-----------+------+----+---+---+-----+-----+------+----+-----+--------+
|PassengerId|Pclass|Name|Sex|Age|SibSp|Parch|Ticket|Fare|Cabin|Embarked|
+-----------+------+----+---+---+-----+-----+------+----+-----+--------+
+-----------+------+----+---+---+-----+-----+------+----+-----+--------+

