# Assignment Spark | Hadoop

Author: <b>Job Vermeulen</b> <br>
Student nr: <b>616184</b> <br>
Date: <b>1-7-2020</b>

Import all the necessary packages.

In [111]:
import findspark
from pyspark.sql import SQLContext
from pyspark.sql import SparkSession
from pyspark.sql import Row
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.ml.feature import StringIndexer
import statistics

We need to initialize Spark. `findspark.init()` locates the Spark installation, and `SparkSession.builder` creates the session that we store in the variable `spark`.

In [124]:
findspark.init()

spark = SparkSession.builder.master("local").appName("Linear Regression Model").config("spark.executor.memory", "1gb").getOrCreate()

sc = spark.sparkContext
sqlContext = SQLContext(sc)

## Load dataset
We use a Titanic dataset, which is included in the folder.

In [125]:
dataset = sc.textFile('titanic.csv')

We can see what the dataset contains by calling `take` on it.

In [126]:
dataset.take(10)

['0,3,Mr. Owen Harris Braund,male,22,1,0,7.25',
 '1,1,Mrs. John Bradley (Florence Briggs Thayer) Cumings,female,38,1,0,71.2833',
 '1,3,Miss. Laina Heikkinen,female,26,0,0,7.925',
 '1,1,Mrs. Jacques Heath (Lily May Peel) Futrelle,female,35,1,0,53.1',
 '0,3,Mr. William Henry Allen,male,35,0,0,8.05',
 '0,3,Mr. James Moran,male,27,0,0,8.4583',
 '0,1,Mr. Timothy J McCarthy,male,54,0,0,51.8625',
 '0,3,Master. Gosta Leonard Palsson,male,2,3,1,21.075',
 '1,3,Mrs. Oscar W (Elisabeth Vilhelmina Berg) Johnson,female,27,0,2,11.1333',
 '1,2,Mrs. Nicholas (Adele Achem) Nasser,female,14,1,0,30.0708']

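Next we need to split each line of the dataset into a list of fields. Note that `top(5)` returns the five largest rows in descending order, not the first five, which is why the output differs from the `take(10)` result above.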

In [115]:
dataset = dataset.map(lambda line: line.split(","))
dataset.top(5)

[['1',
  '3',
  'Mrs. Thomas Henry (Mary E Finck) Davison',
  'female',
  '34',
  '1',
  '0',
  '16.1'],
 ['1',
  '3',
  "Mrs. Thomas (Johanna Godfrey) O'Brien",
  'female',
  '26',
  '1',
  '0',
  '15.5'],
 ['1',
  '3',
  'Mrs. Stanton (Rosa Hunt) Abbott',
  'female',
  '35',
  '1',
  '1',
  '20.25'],
 ['1',
  '3',
  'Mrs. Solomon (Latifa Qurban) Baclini',
  'female',
  '24',
  '0',
  '3',
  '19.2583'],
 ['1', '3', 'Mrs. Sam (Leah Rosen) Aks', 'female', '18', '0', '1', '9.35']]
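
Splitting on every comma works for this file because no field shown above contains a comma. As a minimal sketch, assuming a file in which names could be quoted and contain commas (the quoted sample line below is made up for illustration), Python's built-in `csv` module splits a line while respecting the quotes. The helper name `parse_line` is hypothetical.

```python
import csv

def parse_line(line):
    """Split one CSV line into fields, honouring double-quoted fields."""
    return next(csv.reader([line]))

# Made-up example line with a comma inside a quoted name (not from titanic.csv).
quoted = '0,3,"Braund, Mr. Owen Harris",male,22,1,0,7.25'

print(parse_line(quoted))   # the name stays one field
print(quoted.split(","))    # the naive split breaks the name into two fields
```

Under the same assumption, such a function could be passed to `dataset.map(parse_line)` in place of the lambda.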

To work with this dataset we need to convert the dataset into a dataframe. This can be done by mapping each row of the dataset to a `Row` with a lambda function and then calling `toDF()`.

In [116]:
dataframe = dataset.map(lambda line: Row(survived_indicator = line[0], 
                                         passenger_class = line[1], 
                                         name = line[2], 
                                         sex = line[3],
                                         age = line[4],
                                         siblings_aboard = line[5],
                                         parents_aboard = line[6],
                                         fare_paid_in_pounds = line[7])).toDF()
dataframe.show()

+------------------+---------------+--------------------+------+---+---------------+--------------+-------------------+
|survived_indicator|passenger_class|                name|   sex|age|siblings_aboard|parents_aboard|fare_paid_in_pounds|
+------------------+---------------+--------------------+------+---+---------------+--------------+-------------------+
|                 0|              3|Mr. Owen Harris B...|  male| 22|              1|             0|               7.25|
|                 1|              1|Mrs. John Bradley...|female| 38|              1|             0|            71.2833|
|                 1|              3|Miss. Laina Heikk...|female| 26|              0|             0|              7.925|
|                 1|              1|Mrs. Jacques Heat...|female| 35|              1|             0|               53.1|
|                 0|              3|Mr. William Henry...|  male| 35|              0|             0|               8.05|
|                 0|              3|    

Now the dataframe looks more like a table instead of loose data.
<br>
All columns are still strings, so we need to cast the numeric columns to float type before we can calculate with them.

In [117]:
dataframe = dataframe.withColumn("survived_indicator", dataframe["survived_indicator"].cast(FloatType())) \
.withColumn("passenger_class", dataframe["passenger_class"].cast(FloatType())) \
.withColumn("age", dataframe["age"].cast(FloatType())) \
.withColumn("siblings_aboard", dataframe["siblings_aboard"].cast(FloatType())) \
.withColumn("parents_aboard", dataframe["parents_aboard"].cast(FloatType())) \
.withColumn("fare_paid_in_pounds", dataframe["fare_paid_in_pounds"].cast(FloatType())) 
dataframe.show()

+------------------+---------------+--------------------+------+----+---------------+--------------+-------------------+
|survived_indicator|passenger_class|                name|   sex| age|siblings_aboard|parents_aboard|fare_paid_in_pounds|
+------------------+---------------+--------------------+------+----+---------------+--------------+-------------------+
|               0.0|            3.0|Mr. Owen Harris B...|  male|22.0|            1.0|           0.0|               7.25|
|               1.0|            1.0|Mrs. John Bradley...|female|38.0|            1.0|           0.0|            71.2833|
|               1.0|            3.0|Miss. Laina Heikk...|female|26.0|            0.0|           0.0|              7.925|
|               1.0|            1.0|Mrs. Jacques Heat...|female|35.0|            1.0|           0.0|               53.1|
|               0.0|            3.0|Mr. William Henry...|  male|35.0|            0.0|           0.0|               8.05|
|               0.0|            

We also need to convert the categorical `sex` column to a numeric index, because we can't calculate with text.

In [118]:
indexer = StringIndexer(inputCol="sex", outputCol="sexIndex")
dataframe = indexer.fit(dataframe).transform(dataframe)
dataframe.show()

+------------------+---------------+--------------------+------+----+---------------+--------------+-------------------+--------+
|survived_indicator|passenger_class|                name|   sex| age|siblings_aboard|parents_aboard|fare_paid_in_pounds|sexIndex|
+------------------+---------------+--------------------+------+----+---------------+--------------+-------------------+--------+
|               0.0|            3.0|Mr. Owen Harris B...|  male|22.0|            1.0|           0.0|               7.25|     0.0|
|               1.0|            1.0|Mrs. John Bradley...|female|38.0|            1.0|           0.0|            71.2833|     1.0|
|               1.0|            3.0|Miss. Laina Heikk...|female|26.0|            0.0|           0.0|              7.925|     1.0|
|               1.0|            1.0|Mrs. Jacques Heat...|female|35.0|            1.0|           0.0|               53.1|     1.0|
|               0.0|            3.0|Mr. William Henry...|  male|35.0|            0.0|     
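
By default `StringIndexer` orders labels by descending frequency, so the most frequent label gets index 0.0. That would explain why `male` maps to 0.0 and `female` to 1.0 in the output above. The plain-Python sketch below mimics that ordering to make the rule concrete. `frequency_index` is a hypothetical helper, not part of Spark, the label list is made up, and the alphabetical tie-breaking is an assumption.

```python
from collections import Counter

def frequency_index(labels):
    """Map each label to an index, most frequent label first."""
    counts = Counter(labels)
    # Sort by descending count; ties are broken alphabetically (assumption).
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: float(position) for position, label in enumerate(ordered)}

# Made-up column values: three males and two females.
sex_column = ["male", "female", "male", "male", "female"]
print(frequency_index(sex_column))
```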

Make the dataframe more specific by removing the unnecessary columns. Now we have the correct dataset to work with.

In [119]:
dataframe = dataframe.select("survived_indicator", "passenger_class", "sexIndex", "age", "fare_paid_in_pounds")
dataframe.show()

+------------------+---------------+--------+----+-------------------+
|survived_indicator|passenger_class|sexIndex| age|fare_paid_in_pounds|
+------------------+---------------+--------+----+-------------------+
|               0.0|            3.0|     0.0|22.0|               7.25|
|               1.0|            1.0|     1.0|38.0|            71.2833|
|               1.0|            3.0|     1.0|26.0|              7.925|
|               1.0|            1.0|     1.0|35.0|               53.1|
|               0.0|            3.0|     0.0|35.0|               8.05|
|               0.0|            3.0|     0.0|27.0|             8.4583|
|               0.0|            1.0|     0.0|54.0|            51.8625|
|               0.0|            3.0|     0.0| 2.0|             21.075|
|               1.0|            3.0|     1.0|27.0|            11.1333|
|               1.0|            2.0|     1.0|14.0|            30.0708|
|               1.0|            3.0|     1.0| 4.0|               16.7|
|     

### Assignment A
For this assignment we need to calculate the probability that a person survives given their sex and passenger class. <br>
person 1: Survival = true / Gender = female, passenger_class = 1 <br>
person 2: Survival = true / Gender = female, passenger_class = 2 <br>
person 3: Survival = true / Gender = female, passenger_class = 3 <br>
person 4: Survival = true / Gender = male, passenger_class = 1 <br>
person 5: Survival = true / Gender = male, passenger_class = 2 <br>
person 6: Survival = true / Gender = male, passenger_class = 3 <br>

First select the columns we need and add a where clause (like SQL) to make a more specific selection. Afterwards filter the current selection on survivors and divide the two counts.

In [120]:
print('-------- Assignment A --------')

prob = dataframe.select('survived_indicator', "passenger_class", "sexIndex").where('sexIndex = 1 AND passenger_class = 1')
probA = prob.where('survived_indicator = 1').count() / prob.count()
print("Person 1 =",str(probA * 100)+" %")

prob = dataframe.select('survived_indicator', "passenger_class", "sexIndex").where('sexIndex = 1 AND passenger_class = 2')
probB = prob.where('survived_indicator = 1').count() / prob.count()
print("Person 2 =",str(probB * 100)+" %")

prob = dataframe.select('survived_indicator', "passenger_class", "sexIndex").where('sexIndex = 1 AND passenger_class = 3')
probC = prob.where('survived_indicator = 1').count() / prob.count()
print("Person 3 =",str(probC * 100)+" %")

prob = dataframe.select('survived_indicator', "passenger_class", "sexIndex").where('sexIndex = 0 AND passenger_class = 1')
probD = prob.where('survived_indicator = 1').count() / prob.count()
print("Person 4 =",str(probD * 100)+" %")

prob = dataframe.select('survived_indicator', "passenger_class", "sexIndex").where('sexIndex = 0 AND passenger_class = 2')
probE = prob.where('survived_indicator = 1').count() / prob.count()
print("Person 5 =",str(probE * 100)+" %")

prob = dataframe.select('survived_indicator', "passenger_class", "sexIndex").where('sexIndex = 0 AND passenger_class = 3')
probF = prob.where('survived_indicator = 1').count() / prob.count()
print("Person 6 =",str(probF * 100)+" %")


-------- Assignment A --------
Person 1 = 96.80851063829788 %
Person 2 = 92.10526315789474 %
Person 3 = 50.0 %
Person 4 = 36.885245901639344 %
Person 5 = 15.74074074074074 %
Person 6 = 13.702623906705538 %
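
Each of the six numbers is a conditional probability, estimated as the number of survivors in a group divided by the number of passengers in that group. This equals the mean of the 0/1 survival indicator within the group. The sketch below shows that calculation in plain Python on the first ten passengers printed earlier, so its values are only illustrative and do not match the full-dataset results. `survival_by_group` is a hypothetical helper.

```python
from collections import defaultdict

# (survived, passenger_class, sex) for the ten rows shown by dataset.take(10).
sample = [
    (0, 3, "male"), (1, 1, "female"), (1, 3, "female"), (1, 1, "female"),
    (0, 3, "male"), (0, 3, "male"), (0, 1, "male"), (0, 3, "male"),
    (1, 3, "female"), (1, 2, "female"),
]

def survival_by_group(rows):
    """Return survivors / total for every (sex, passenger_class) group."""
    totals = defaultdict(int)
    survivors = defaultdict(int)
    for survived, passenger_class, sex in rows:
        key = (sex, passenger_class)
        totals[key] += 1
        survivors[key] += survived
    return {key: survivors[key] / totals[key] for key in totals}

for key, probability in sorted(survival_by_group(sample).items()):
    print(key, f"{probability * 100:.1f} %")
```

Computing all groups in one pass avoids repeating the same select/where block six times.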


### Assignment B
For this assignment, the probability that a child in third class who is 10 years or younger survives needs to be calculated.
This is also done like a SQL query. First a selection is made with the necessary columns and a where clause. Afterwards a more specific selection is made to calculate the probability.

In [121]:
print('-------- Assignment B --------')

probChild = dataframe.select('survived_indicator', "passenger_class", "age").where('age <= 10 AND passenger_class = 3')
probChildAnswer = probChild.where('survived_indicator = 1').count() / probChild.count()
print('Child in third class (10 or younger) probability = ', str(probChildAnswer * 100) + " %")

-------- Assignment B --------
Child in third class (10 or younger) probability =  41.509433962264154 %
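
Dividing two counts raises a `ZeroDivisionError` when the where clause matches no rows. A minimal sketch of a guarded version in plain Python, again using only the ten sample passengers shown earlier. `survival_rate` and the tuple layout are assumptions made for illustration.

```python
# (survived, passenger_class, age) for the ten rows shown by dataset.take(10).
sample = [
    (0, 3, 22), (1, 1, 38), (1, 3, 26), (1, 1, 35), (0, 3, 35),
    (0, 3, 27), (0, 1, 54), (0, 3, 2), (1, 3, 27), (1, 2, 14),
]

def survival_rate(rows, predicate):
    """Fraction of survivors among rows matching predicate, or None if none match."""
    selected = [row for row in rows if predicate(row)]
    if not selected:
        return None  # avoid dividing by zero on an empty selection
    return sum(row[0] for row in selected) / len(selected)

# Same condition as the where clause: age <= 10 AND passenger_class = 3.
print(survival_rate(sample, lambda row: row[2] <= 10 and row[1] == 3))
# No second-class child in the sample, so this selection is empty.
print(survival_rate(sample, lambda row: row[2] <= 10 and row[1] == 2))
```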


### Assignment C
For this assignment the expected ticket price needs to be calculated for the different passenger classes.
First we create a separate dataframe for each passenger class (1, 2 or 3) that contains only the fare_paid_in_pounds column.
Afterwards we can calculate the average and print it.

In [122]:
print('-------- Assignment C --------')

group1DataFrame = dataframe.select("fare_paid_in_pounds").where('passenger_class = 1')
print("Class 1 fare expectation = ", group1DataFrame.groupBy().avg().collect()[0][0])

group2DataFrame = dataframe.select("fare_paid_in_pounds").where('passenger_class = 2')
print("Class 2 fare expectation = ", group2DataFrame.groupBy().avg().collect()[0][0])

group3DataFrame = dataframe.select("fare_paid_in_pounds").where('passenger_class = 3')
print("Class 3 fare expectation = ", group3DataFrame.groupBy().avg().collect()[0][0])

-------- Assignment C --------
Class 1 fare expectation =  84.15468752825701
Class 2 fare expectation =  20.66218318109927
Class 3 fare expectation =  13.707707501045244
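
The expected fare per class is the arithmetic mean of the fares paid in that class. As a small plain-Python illustration, using the `statistics` module and only the ten sample rows shown earlier (so the numbers differ from the Spark output), the grouping can be sketched as follows. `mean_fare_by_class` is a hypothetical name.

```python
import statistics
from collections import defaultdict

# (passenger_class, fare_paid_in_pounds) for the ten rows shown by dataset.take(10).
sample = [
    (3, 7.25), (1, 71.2833), (3, 7.925), (1, 53.1), (3, 8.05),
    (3, 8.4583), (1, 51.8625), (3, 21.075), (3, 11.1333), (2, 30.0708),
]

def mean_fare_by_class(rows):
    """Group fares by passenger class and return the mean fare of each class."""
    fares = defaultdict(list)
    for passenger_class, fare in rows:
        fares[passenger_class].append(fare)
    return {cls: statistics.mean(values) for cls, values in fares.items()}

for cls, fare in sorted(mean_fare_by_class(sample).items()):
    print("Class", cls, "mean fare in sample =", round(fare, 4))
```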


The only thing left to do is to stop Spark.

In [123]:
spark.stop()