In [None]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://www-us.apache.org/dist/spark/spark-3.0.0-preview2/spark-3.0.0-preview2-bin-hadoop3.2.tgz
!tar -xvf spark-3.0.0-preview2-bin-hadoop3.2.tgz
!pip install -q findspark
!pip install pyspark

import os 
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.0.0-preview2-bin-hadoop3.2"

In [2]:
import findspark

findspark.init()

In [45]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
  .master("local") \
  .appName("Titanic") \
  .config("spark.executor.memory", "1gb") \
  .getOrCreate()

sc = spark.sparkContext

In [46]:
rdd = sc.textFile('titanic.csv')
rdd = rdd.map(lambda line: line.split(","))

In [87]:
from pyspark.sql import Row
from pyspark.sql.types import *

df = rdd.map(lambda line: Row(survived=line[0],
                              pclass=line[1],
                              name=line[2],
                              sex=line[3],
                              age=line[4],
                              siblings_spouses_aboard=line[5],
                              parents_children_aboard=line[6],
                              fare=line[7])).toDF()

df.show()

+---+-------+--------------------+-----------------------+------+------+-----------------------+--------+
|age|   fare|                name|parents_children_aboard|pclass|   sex|siblings_spouses_aboard|survived|
+---+-------+--------------------+-----------------------+------+------+-----------------------+--------+
| 22|   7.25|Mr. Owen Harris B...|                      0|     3|  male|                      1|       0|
| 38|71.2833|Mrs. John Bradley...|                      0|     1|female|                      1|       1|
| 26|  7.925|Miss. Laina Heikk...|                      0|     3|female|                      0|       1|
| 35|   53.1|Mrs. Jacques Heat...|                      0|     1|female|                      1|       1|
| 35|   8.05|Mr. William Henry...|                      0|     3|  male|                      0|       0|
| 27| 8.4583|     Mr. James Moran|                      0|     3|  male|                      0|       0|
| 54|51.8625|Mr. Timothy J McC...|            

**Assignment 3a**
Calculate the conditional probability that a person survives based on their sex and passenger-class.

***P(S=true | G=female, C=1)***

***P(S=true | G=female, C=2)***

***P(S=true | G=female, C=3)***

***P(S=true | G=male, C=1)***

***P(S=true | G=male, C=2)***

***P(S=true | G=male, C=3)***


In [72]:
def calculateSurvivalProbability(df, sex, pclass):
  filtered_data = df.rdd.filter(lambda line: line['sex'] == sex and line['pclass'] == pclass)

  survivors = filtered_data.filter(lambda line: line['survived'] == '1')

  amount_survivors = (spark.createDataFrame(survivors)).count()
  total_people = filtered_data.count()

  return amount_survivors, total_people

In [81]:
data = calculateSurvivalProbability(df, 'female', '1')
print ( "P(S=true | G=female, C=1) = " + str(data[0]) + "/" + str(data[1]) )

data = calculateSurvivalProbability(df, 'female', '2')
print ( "P(S=true | G=female, C=1) = " + str(data[0]) + "/" + str(data[1]) )

data = calculateSurvivalProbability(df, 'female', '3')
print ( "P(S=true | G=female, C=1) = " + str(data[0]) + "/" + str(data[1]) )

data = calculateSurvivalProbability(df, 'male', '1')
print ( "P(S=true | G=female, C=1) = " + str(data[0]) + "/" + str(data[1]) )

data = calculateSurvivalProbability(df, 'male', '2')
print ( "P(S=true | G=female, C=1) = " + str(data[0]) + "/" + str(data[1]) )

data = calculateSurvivalProbability(df, 'male', '3')
print ( "P(S=true | G=female, C=1) = " + str(data[0]) + "/" + str(data[1]) )

P(S=true | G=female, C=1) = 91/94
P(S=true | G=female, C=1) = 70/76
P(S=true | G=female, C=1) = 72/144
P(S=true | G=female, C=1) = 45/122
P(S=true | G=female, C=1) = 17/108
P(S=true | G=female, C=1) = 47/343


**Assignment 3b**
What is the probability that a child in third class and younger then 10 survives? Since the number of data points that satisfy the condition is small use the "bayesian" approach and represent your probability as a beta distribution. Calculate a belief distribution for

***S = true | A < 10, C=3*** 

In [157]:
df = df.withColumn("age", df["age"].cast(IntegerType()))
df = df.withColumn("fare", df["fare"].cast(FloatType()))

filtered_data = df.rdd.filter(lambda line: line['age'] <= 10 and line['pclass'] == '3')

survivors = filtered_data.filter(lambda line: line['survived'] == '1')

total_survivors = (spark.createDataFrame(survivors)).count()
total_children = filtered_data.count()

print ( "P(S=true|A<10, C=3) = " + str(total_survivors) + "/" + str(total_children) )
print ( "Beta(⍺=" + str(total_children) + ", β=" + str(total_children) + ")" )

P(S=true|A<10, C=3) = 22/53
Beta(⍺=53, β=53)


**Assignment 3c**. How much did people pay to be on the ship? Calculate the expectation of fare conditioned on passenger-class:

***E[X|C=1]***

***E[X|C=2]***

***E[X|C=3]***

In [171]:
def calculateFarePerClass(df, pclass):
  filtered_data = df.rdd.filter(lambda line: line['pclass'] == pclass)

  paid_df = spark.createDataFrame(filtered_data)
  avg = paid_df.groupBy('pclass').avg().collect()[0][1]

  return avg

In [172]:
print( "Average fare: " + str(calculateFarePerClass(df, "1")) )
print( "Average fare: " + str(calculateFarePerClass(df, "2")) )
print( "Average fare: " + str(calculateFarePerClass(df, "3")) )

Average fare: 38.782407407407405
Average fare: 29.847826086956523
Average fare: 25.170431211498972
