### PySpark documentation (testing guide)
https://spark.apache.org/docs/latest/api/python/getting_started/testing_pyspark.html

### PySpark tutorial (`when` / `otherwise`)
https://sparkbyexamples.com/pyspark/pyspark-when-otherwise/

### Titanic dataset metadata
https://www.kaggle.com/competitions/titanic/data

In [21]:
from pyspark.sql import SparkSession

In [4]:
# Start a Spark session named 'spark-trial', or reuse one that is already running.
spark = (
    SparkSession.builder
    .appName('spark-trial')
    .getOrCreate()
)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/02/06 20:13:08 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [5]:
# Load the Titanic CSV: the first line holds the column names and the column
# types are inferred from the data.
df = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv('titanic.csv')
)

In [6]:
# Inspect the inferred column types, then preview the first 20 rows
# (long cell values are truncated in the printed table).
df.printSchema()
df.show(n=20, truncate=True)

root
 |-- PassengerId: integer (nullable = true)
 |-- Survived: integer (nullable = true)
 |-- Pclass: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- SibSp: integer (nullable = true)
 |-- Parch: integer (nullable = true)
 |-- Ticket: string (nullable = true)
 |-- Fare: double (nullable = true)
 |-- Cabin: string (nullable = true)
 |-- Embarked: string (nullable = true)

+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|          Ticket|   Fare|Cabin|Embarked|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|        892|       0|     3|    Kelly, Mr. James|  male|34.5|    0|    0|          330911| 7.8292| NULL|       Q|
|        893|       1|     3|Wilkes, Mrs. Jame...|female|47.0|    1|    0|   

In [8]:
# Re-type the 0/1 integer flag in "Survived" as a boolean column.
from pyspark.sql.functions import col

df2 = df.withColumn(
    "Survived",
    df["Survived"].cast("boolean"),
)

In [9]:
# Confirm the cast: "Survived" should now be listed as boolean.
df2.printSchema()

root
 |-- PassengerId: integer (nullable = true)
 |-- Survived: boolean (nullable = true)
 |-- Pclass: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- SibSp: integer (nullable = true)
 |-- Parch: integer (nullable = true)
 |-- Ticket: string (nullable = true)
 |-- Fare: double (nullable = true)
 |-- Cabin: string (nullable = true)
 |-- Embarked: string (nullable = true)



In [13]:
# Converting the Spark DataFrame to a pandas DataFrame by hand.
# Step 1: collect() pulls every row of `df` into this Python process as a list of Row objects.
import pandas as pd
rows = df.collect()

In [14]:
# Print the first ten collected Row objects, one per line.
first_ten = rows[:10]
for record in first_ten:
    print(record)

Row(PassengerId=892, Survived=0, Pclass=3, Name='Kelly, Mr. James', Sex='male', Age=34.5, SibSp=0, Parch=0, Ticket='330911', Fare=7.8292, Cabin=None, Embarked='Q')
Row(PassengerId=893, Survived=1, Pclass=3, Name='Wilkes, Mrs. James (Ellen Needs)', Sex='female', Age=47.0, SibSp=1, Parch=0, Ticket='363272', Fare=7.0, Cabin=None, Embarked='S')
Row(PassengerId=894, Survived=0, Pclass=2, Name='Myles, Mr. Thomas Francis', Sex='male', Age=62.0, SibSp=0, Parch=0, Ticket='240276', Fare=9.6875, Cabin=None, Embarked='Q')
Row(PassengerId=895, Survived=0, Pclass=3, Name='Wirz, Mr. Albert', Sex='male', Age=27.0, SibSp=0, Parch=0, Ticket='315154', Fare=8.6625, Cabin=None, Embarked='S')
Row(PassengerId=896, Survived=1, Pclass=3, Name='Hirvonen, Mrs. Alexander (Helga E Lindqvist)', Sex='female', Age=22.0, SibSp=1, Parch=1, Ticket='3101298', Fare=12.2875, Cabin=None, Embarked='S')
Row(PassengerId=897, Survived=0, Pclass=3, Name='Svensson, Mr. Johan Cervin', Sex='male', Age=14.0, SibSp=0, Parch=0, Ticket

In [16]:
# Step 2: turn each Row into a plain {column name: value} dict.
data1 = []
for record in rows:
    data1.append(record.asDict())

In [18]:
# Step 3: build the pandas DataFrame from the list of row dicts.
# Passing the Spark column list pins the column set and order to the Spark
# schema, so an empty result still yields a frame with the expected columns
# (pd.DataFrame([]) alone would have no columns at all).
pandas_df = pd.DataFrame(data1, columns=df.columns)

In [20]:
# Rich display of the first five rows of the pandas copy.
display(pandas_df.head(n=5))

Unnamed: 0,PassengerId,Survived,Pclass,Name,Sex,Age,SibSp,Parch,Ticket,Fare,Cabin,Embarked
0,892,0,3,"Kelly, Mr. James",male,34.5,0,0,330911,7.8292,,Q
1,893,1,3,"Wilkes, Mrs. James (Ellen Needs)",female,47.0,1,0,363272,7.0,,S
2,894,0,2,"Myles, Mr. Thomas Francis",male,62.0,0,0,240276,9.6875,,Q
3,895,0,3,"Wirz, Mr. Albert",male,27.0,0,0,315154,8.6625,,S
4,896,1,3,"Hirvonen, Mrs. Alexander (Helga E Lindqvist)",female,22.0,1,1,3101298,12.2875,,S


In [23]:
# Back to Spark: expand the one-letter "Embarked" code into the full port name.
# Per the Kaggle data description: C = Cherbourg, Q = Queenstown, S = Southampton.
# Any other value (including a missing one) is labelled "Unknown".
from pyspark.sql.functions import when

port_code = df2["Embarked"]
df3 = df2.withColumn(
    "Embarked_full",
    (
        when(port_code == "Q", "Queenstown")
        .when(port_code == "C", "Cherbourg")
        .when(port_code == "S", "Southampton")
        .otherwise("Unknown")
    ),
)

In [25]:
# Preview five rows to check the new "Embarked_full" column.
df3.show(n=5)

+-----------+--------+------+--------------------+------+----+-----+-----+-------+-------+-----+--------+-------------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch| Ticket|   Fare|Cabin|Embarked|Embarked_full|
+-----------+--------+------+--------------------+------+----+-----+-----+-------+-------+-----+--------+-------------+
|        892|   false|     3|    Kelly, Mr. James|  male|34.5|    0|    0| 330911| 7.8292| NULL|       Q|   Queenstown|
|        893|    true|     3|Wilkes, Mrs. Jame...|female|47.0|    1|    0| 363272|    7.0| NULL|       S|  Southampton|
|        894|   false|     2|Myles, Mr. Thomas...|  male|62.0|    0|    0| 240276| 9.6875| NULL|       Q|   Queenstown|
|        895|   false|     3|    Wirz, Mr. Albert|  male|27.0|    0|    0| 315154| 8.6625| NULL|       S|  Southampton|
|        896|    true|     3|Hirvonen, Mrs. Al...|female|22.0|    1|    1|3101298|12.2875| NULL|       S|  Southampton|
+-----------+--------+------+-----------

In [26]:
# Add a one-letter "Gender" column: 'male' -> "M", 'female' -> "F",
# anything else -> NULL.
sex = df3["Sex"]
df4 = df3.withColumn(
    "Gender",
    when(sex == 'male', "M")
    .when(sex == 'female', "F")
    .otherwise(None),
)

In [29]:
# Preview the first 20 rows, now including the "Gender" column.
df4.show(n=20)

+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+-------------+------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|          Ticket|   Fare|Cabin|Embarked|Embarked_full|Gender|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+-------------+------+
|        892|   false|     3|    Kelly, Mr. James|  male|34.5|    0|    0|          330911| 7.8292| NULL|       Q|   Queenstown|     M|
|        893|    true|     3|Wilkes, Mrs. Jame...|female|47.0|    1|    0|          363272|    7.0| NULL|       S|  Southampton|     F|
|        894|   false|     2|Myles, Mr. Thomas...|  male|62.0|    0|    0|          240276| 9.6875| NULL|       Q|   Queenstown|     M|
|        895|   false|     3|    Wirz, Mr. Albert|  male|27.0|    0|    0|          315154| 8.6625| NULL|       S|  Southampton|     M|
|        896|    true|     3|Hirvonen, Mrs. Al..

In [31]:
# Turn the boolean "Survived" flag back into an integer 0/1 indicator.
# A direct cast maps false -> 0 and true -> 1 and keeps NULL as NULL. The
# previous when(Survived == 'false', 0).otherwise(1) compared a boolean column
# with a string literal and reported unknown (NULL) outcomes as survivors.
df5 = df4.withColumn("Survived", col("Survived").cast("int"))

In [32]:
# Preview the first 20 rows to check that "Survived" is 0/1 again.
df5.show(n=20)

+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+-------------+------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|          Ticket|   Fare|Cabin|Embarked|Embarked_full|Gender|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+-------------+------+
|        892|       0|     3|    Kelly, Mr. James|  male|34.5|    0|    0|          330911| 7.8292| NULL|       Q|   Queenstown|     M|
|        893|       1|     3|Wilkes, Mrs. Jame...|female|47.0|    1|    0|          363272|    7.0| NULL|       S|  Southampton|     F|
|        894|       0|     2|Myles, Mr. Thomas...|  male|62.0|    0|    0|          240276| 9.6875| NULL|       Q|   Queenstown|     M|
|        895|       0|     3|    Wirz, Mr. Albert|  male|27.0|    0|    0|          315154| 8.6625| NULL|       S|  Southampton|     M|
|        896|       1|     3|Hirvonen, Mrs. Al..

In [54]:
# Survival rate per passenger class: total passengers, number of survivors and
# the share of survivors as a whole-number percentage for each class.
print('Survivability per Passenger Class')

# Aliased import: a bare `from pyspark.sql.functions import round` would shadow
# Python's built-in round() for every later cell in the notebook.
from pyspark.sql.functions import round as spark_round

total_passenger_per_class = (
    df5.groupby('PClass')
    .count()
    .withColumnRenamed("count", "total_passengers")
)
survivability_per_class = (
    df5.filter(df5.Survived == 1)
    .groupby('PClass')
    .count()
    .withColumnRenamed("count", "total_survivors")
    .withColumnRenamed("PClass", "PClass_x")  # distinct name so the join key is unambiguous
)

# Left join + fill: a class with no survivors has no row in
# survivability_per_class, so an inner join would silently drop it from the
# report. Keep it and report 0 survivors instead.
complete_df = (
    total_passenger_per_class.join(
        survivability_per_class,
        total_passenger_per_class["PClass"] == survivability_per_class["PClass_x"],
        how="left",
    )
    .drop("PClass_x")
    .fillna(0, subset=["total_survivors"])
)

# Scale to percent before rounding: rounding the ratio first and multiplying
# afterwards re-introduces float noise (e.g. 0.29 * 100 -> 28.999999999999996).
complete_df = complete_df.withColumn(
    "survivability rate",
    spark_round(col("total_survivors") / col("total_passengers") * 100, 0),
).orderBy("PClass")  # deterministic row order in the printed table
complete_df.show()

Survivability per Passenger Class
+------+----------------+---------------+------------------+
|PClass|total_passengers|total_survivors|survivability rate|
+------+----------------+---------------+------------------+
|     1|             107|             50|              47.0|
|     3|             218|             72|              33.0|
|     2|              93|             30|              32.0|
+------+----------------+---------------+------------------+

