# Assignment 3 - Spark

In [1]:
import findspark
import pyspark
from pyspark.sql import SQLContext
from pyspark.sql import SparkSession
from pyspark.sql import Row
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.ml.feature import StringIndexer
import statistics

First, Spark has to be initialized. This is done by finding the Spark package and building Spark into a local variable.

In [2]:
# init spark
findspark.init()

# declare a local variable to work with
spark = SparkSession.builder.master("local").appName("Linear Regression Model").config("spark.executor.memory", "1gb").getOrCreate()

# declare sql context for using queries without a DB
sc = spark.sparkContext
sqlContext = SQLContext(sc)

The dataset has to be loaded in order to expermiment with

In [3]:
# loading the dataset
dataset = sc.textFile('titanic.csv')

For example, the first five records can be shown using the `take` function

In [56]:
# show first five rows of the dataset in readible format
dataset.take(5)

[['0', '3', 'Mr. Owen Harris Braund', 'male', '22', '1', '0', '7.25'],
 ['1',
  '1',
  'Mrs. John Bradley (Florence Briggs Thayer) Cumings',
  'female',
  '38',
  '1',
  '0',
  '71.2833'],
 ['1', '3', 'Miss. Laina Heikkinen', 'female', '26', '0', '0', '7.925'],
 ['1',
  '1',
  'Mrs. Jacques Heath (Lily May Peel) Futrelle',
  'female',
  '35',
  '1',
  '0',
  '53.1'],
 ['0', '3', 'Mr. William Henry Allen', 'male', '35', '0', '0', '8.05']]

The dataset can be converted into readible rows. We won't do this in our assignment. We are going to work with dataframes.

In [5]:
# split the dataset using a lambda function
dataset = dataset.map(lambda line: line.split(","))

# showing the top five rows from the dataset
dataset.top(5)

[['Survived',
  'Pclass',
  'Name',
  'Sex',
  'Age',
  'Siblings/Spouses Aboard',
  'Parents/Children Aboard',
  'Fare'],
 ['1',
  '3',
  'Mrs. Thomas Henry (Mary E Finck) Davison',
  'female',
  '34',
  '1',
  '0',
  '16.1'],
 ['1',
  '3',
  "Mrs. Thomas (Johanna Godfrey) O'Brien",
  'female',
  '26',
  '1',
  '0',
  '15.5'],
 ['1',
  '3',
  'Mrs. Stanton (Rosa Hunt) Abbott',
  'female',
  '35',
  '1',
  '1',
  '20.25'],
 ['1',
  '3',
  'Mrs. Solomon (Latifa Qurban) Baclini',
  'female',
  '24',
  '0',
  '3',
  '19.2583']]

The following code wil convert the dataset into a dataframe. How does this work?
The dataset is mapped with a lambda function to get the data from each row.

In [11]:
# generate the dataframe using the right columns
dataframe = dataset.map(lambda line: Row(survived_indicator = line[0], 
                                         passenger_class = line[1], 
                                         name = line[2], 
                                         sex = line[3],
                                         age = line[4],
                                         siblings_aboard = line[5],
                                         parents_aboard = line[6],
                                         fare_paid_in_pounds = line[7])).toDF()

The dataframe can be shown using the `show()` function.

In [12]:
dataframe.show()

+------------------+---------------+--------------------+------+---+---------------+--------------+-------------------+
|survived_indicator|passenger_class|                name|   sex|age|siblings_aboard|parents_aboard|fare_paid_in_pounds|
+------------------+---------------+--------------------+------+---+---------------+--------------+-------------------+
|                 0|              3|Mr. Owen Harris B...|  male| 22|              1|             0|               7.25|
|                 1|              1|Mrs. John Bradley...|female| 38|              1|             0|            71.2833|
|                 1|              3|Miss. Laina Heikk...|female| 26|              0|             0|              7.925|
|                 1|              1|Mrs. Jacques Heat...|female| 35|              1|             0|               53.1|
|                 0|              3|Mr. William Henry...|  male| 35|              0|             0|               8.05|
|                 0|              3|    

Before we can do any magic on these data, the integers have to be converted into floats. It will be easier to work with the same data types later on.

In [13]:
# convert the columns
dataframe = dataframe.withColumn("survived_indicator", dataframe["survived_indicator"].cast(FloatType())) \
.withColumn("passenger_class", dataframe["passenger_class"].cast(FloatType())) \
.withColumn("age", dataframe["age"].cast(FloatType())) \
.withColumn("siblings_aboard", dataframe["siblings_aboard"].cast(FloatType())) \
.withColumn("parents_aboard", dataframe["parents_aboard"].cast(FloatType())) \
.withColumn("fare_paid_in_pounds", dataframe["fare_paid_in_pounds"].cast(FloatType())) 

Also, the sex has to be converted into numeric values. Spark can't read text.

In [14]:
# convert sex to sexIndex
indexer = StringIndexer(inputCol="sex", outputCol="sexIndex")

# transform the data
dataframe = indexer.fit(dataframe).transform(dataframe)

# select the right columns
dataframe = dataframe.select("survived_indicator", "passenger_class", "sexIndex", "age", "fare_paid_in_pounds")

In [15]:
dataframe.show()

+------------------+---------------+--------+----+-------------------+
|survived_indicator|passenger_class|sexIndex| age|fare_paid_in_pounds|
+------------------+---------------+--------+----+-------------------+
|               0.0|            3.0|     0.0|22.0|               7.25|
|               1.0|            1.0|     1.0|38.0|            71.2833|
|               1.0|            3.0|     1.0|26.0|              7.925|
|               1.0|            1.0|     1.0|35.0|               53.1|
|               0.0|            3.0|     0.0|35.0|               8.05|
|               0.0|            3.0|     0.0|27.0|             8.4583|
|               0.0|            1.0|     0.0|54.0|            51.8625|
|               0.0|            3.0|     0.0| 2.0|             21.075|
|               1.0|            3.0|     1.0|27.0|            11.1333|
|               1.0|            2.0|     1.0|14.0|            30.0708|
|               1.0|            3.0|     1.0| 4.0|               16.7|
|     

Right now, the data consists only of floating point values and is ready for processing with Spark.

# Question A
Calculate the conditional probability that a person survives given their sex and passenger-class: <br>
<br>
$\dot{P}(S = True \; | \; G = Female, C = 1) $ <br>
$\dot{P}(S = True \; | \; G = Female, C = 2) $ <br>
$\dot{P}(S = True \; | \; G = Female, C = 3) $ <br>
$\dot{P}(S = True \; | \; G = Male, C = 1) $ <br>
$\dot{P}(S = True \; | \; G = Male, C = 2) $ <br>
$\dot{P}(S = True \; | \; G = Male, C = 3) $ <br>

In [55]:
print("Question A")
print("")

# selecting the right information from the dataframe and query it
prob = dataframe.select('survived_indicator', "passenger_class", "sexIndex").where('sexIndex = 1 AND passenger_class = 1')
prob_sf1 = prob.where('survived_indicator = 1').count() / prob.count()
# print prob_sf1
print("Probability of a survived female in 1st class =",str(prob_sf1 * 100)+" %")

# selecting the right information from the dataframe and query it
prob = dataframe.select('survived_indicator', "passenger_class", "sexIndex").where('sexIndex = 1 AND passenger_class = 2')
prob_sf2 = prob.where('survived_indicator = 1').count() / prob.count()
# print prob_sf2
print("Probability of a survived female in 2st class =",str(prob_sf2 * 100)+" %")

# selecting the right information from the dataframe and query it
prob = dataframe.select('survived_indicator', "passenger_class", "sexIndex").where('sexIndex = 1 AND passenger_class = 3')
prob_sf3 = prob.where('survived_indicator = 1').count() / prob.count()
# print prob_sf3
print("Probability of a survived female in 3st class =",str(prob_sf3 * 100)+" %")

print("")

# selecting the right information from the dataframe and query it
prob = dataframe.select('survived_indicator', "passenger_class", "sexIndex").where('sexIndex = 0 AND passenger_class = 1')
prob_sm1 = prob.where('survived_indicator = 1').count() / prob.count()
# print prob_sm1
print("Probability of a survived male in 1st class =",str(prob_sm1 * 100)+" %")

# selecting the right information from the dataframe and query it
prob = dataframe.select('survived_indicator', "passenger_class", "sexIndex").where('sexIndex = 0 AND passenger_class = 2')
prob_sm2 = prob.where('survived_indicator = 1').count() / prob.count()
# print prob_sm2
print("Probability of a survived male in 2st class =",str(prob_sm2 * 100)+" %")

# selecting the right information from the dataframe and query it
prob = dataframe.select('survived_indicator', "passenger_class", "sexIndex").where('sexIndex = 0 AND passenger_class = 3')
prob_sm3 = prob.where('survived_indicator = 1').count() / prob.count()
# print prob_sm3
print("Probability of a survived male in 3st class =",str(prob_sm3 * 100)+" %")

Question A

Probability of a survived female in 1st class = 96.80851063829788 %
Probability of a survived female in 2st class = 92.10526315789474 %
Probability of a survived female in 3st class = 50.0 %

Probability of a survived male in 1st class = 36.885245901639344 %
Probability of a survived male in 2st class = 15.74074074074074 %
Probability of a survived male in 3st class = 13.702623906705538 %


# Question B
What is the probability that a child who is in third class and is 10 years old or younger survives? <br>
Since the nbumber of data points that satisfy the condition is small, use the "bayesian" approach and represent your probability as a beta distribution. 
calculate a beflief distribution  for: <br>
<br>
$\dot S = True \;|\; A \leqslant 10, \;C = 3) $
<br>
<br>
You can express your answer as a parametarized distribution.

In [42]:
print("Question B")
print("")

# the probability of a child that is less or equal than 10 years old and in 3st class
prob_child = dataframe.select('survived_indicator', "passenger_class", "age").where('age <= 10 AND passenger_class = 3')

# the result is compared to the actual survived childs
prob_child_answer = prob_child.where('survived_indicator = 1').count() / prob_child.count()

# print result
print('probability of a child in third class (equal or younger than 10) = ', str(prob_child_answer * 100) + " %")

Question B

probability of a child in third class (equal or younger than 10) =  41.509433962264154 %


# Question C
How much did people pay to be on the ship? Calculate the expectation of fare conditioned on passenger-class: <br>
<br>
$\dot{E}(X \; | \; C = 1) $ <br>
$\dot{E}(X \; | \; C = 2) $ <br>
$\dot{E}(X \; | \; C = 3) $ <br>

In [54]:
print("Question C")
print("")

# data from 1st class fare
class_1_data = dataframe.select("fare_paid_in_pounds").where('passenger_class = 1')
print("Class 1 fare expectation = ", class_1_data.groupBy().avg().collect()[0][0], " pounds")

# data from 2st class fare
class_2_data = dataframe.select("fare_paid_in_pounds").where('passenger_class = 2')
print("Class 2 fare expectation = ", class_2_data.groupBy().avg().collect()[0][0], " pounds")

# data from 3st class fare
class_3_data = dataframe.select("fare_paid_in_pounds").where('passenger_class = 3')
print("Class 3 fare expectation = ", class_3_data.groupBy().avg().collect()[0][0], " pounds")

Question C

Class 1 fare expectation =  84.15468752825701  pounds
Class 2 fare expectation =  20.66218318109927  pounds
Class 3 fare expectation =  13.707707501045244  pounds


The last step is to stop Spark. This can be done by the following command: <br>
`spark.stop()`