#Titanic Dataset Exercise

## Import libraries and Data
- Import the libraries you'll need to perform the exercises
- Import the data, producing spark dataframes. NB the options used e.g. header. Make sure to display your dataframes to check they are as expected

In [3]:
""" Import Libraries """
import pyspark.sql.functions as f
import pyspark.sql.types as t
from pyspark.sql.window import Window

""" Import the data """
# File location and type
file_location = "/FileStore/tables/"
file_type = "csv"

# CSV options
infer_schema = "true"
first_row_is_header = "true"
delimiter = ","

# The applied options are for CSV files. For other file types, these will be ignored.
df_train = spark.read.format(file_type) \
  .option("inferSchema", infer_schema) \
  .option("header", first_row_is_header) \
  .option("sep", delimiter) \
  .load(file_location + 'train.csv')

df_test = spark.read.format(file_type) \
  .option("inferSchema", infer_schema) \
  .option("header", first_row_is_header) \
  .option("sep", delimiter) \
  .load(file_location + 'test.csv')

df_gender_schema = t.StructType([
                    t.StructField("PassengerId", t.IntegerType()),
                    t.StructField("Survived", t.IntegerType())
                ])

df_gender = spark.read.format(file_type) \
  .option("inferSchema", False) \
  .option("header", first_row_is_header) \
  .option("sep", delimiter) \
  .schema(df_gender_schema) \
  .load(file_location + 'gender_submission.csv')



In [4]:
df_train.show(n=5, truncate = False)

##Questions
### 1) Excluding the child passengers (Age < 18) find the the minimum, maximum, and average fares for each passenger class.

1) Create a complete dataset by: <br>
<Tab> a. Join `df_gender` with `df_test` on `PassengeId` <br>
<Tab> b. Append the dataframe from a with `df_train`
  
Tip: Check that the transformations you are doing produce the expected output e.g. by checking number of rows, columns, schema.
  
2) Filter the complete dataframe `df` selecting only passenger with `Age < 18`, aggregate by `Pclass` calculating the minimum, maximum and average fares.

In [7]:
# Join df_gender to df_test
df_join = df_test.join(df_gender, df_test.PassengerId == df_gender.PassengerId, 'left_outer').drop(df_gender.PassengerId)

#Check we have the same number of rows in df_join as df_test since we are using a left outer join.
assert df_join.count() == df_gender.count()

# Union the df_train with df_join
df = df_train.unionAll(df_join.select(*df_train.columns))

#Check we get the correct number of rows and the schema is correct
assert df.count() == df_train.count() + df_join.count()
assert df.schema == df_train.schema
df.show(n = 5, truncate = False)

In [8]:
# Filter for over 18s, Use groupBy and agg functions to aggregate over class using pyspark sql functions min, max, mean
df_agg_fare = df.where(df.Age>=18).groupBy(['Pclass']).agg(f.round(f.min('Fare'),2).alias('min_fare'),
                                                 f.round(f.max('Fare'),2).alias('max_fare'),
                                                 f.round(f.mean('Fare'),2).alias('mean_fare'))
df_agg_fare.show()

#Using sql
df.createOrReplaceTempView('table_df')
sql_query = 'SELECT Pclass, round(min(Fare),2) as min_fare, round(max(Fare),2) as max_fare, round(avg(Fare),2) as mean_fare FROM table_df WHERE Age >= 18 GROUP BY Pclass'
spark.sql(sql_query).show()

### 2) Produce a summary with the survival rates for each passenger gender, with survival rate as a %?

In [10]:
# Aggregate by by gender and survived indicator
df_survival = df.groupBy(['Sex','Survived']).agg(f.count('Sex').alias('count')) \
                .withColumn('gender_total',f.sum(f.col('count')).over(Window.partitionBy(f.col('Sex'))))
df_survival.show()

# Calculate the survival rate
df_survival_rate = df_survival.withColumn('survival_rate (%)',f.round((f.col('count')/f.col('gender_total'))*100,2))\
                              .where(f.col('Survived') == 1)\
                              .select('Sex','survival_rate (%)')
df_survival_rate.show()

### BONUS - Produce a summary of the passenger cabins. How will you handle missing values?

In [12]:
df_cabin = df.select('PassengerId','Pclass','Fare',
                    f.when(f.col('Cabin').isNull(),'N/A').otherwise(f.col('Cabin').substr(0,1)).alias('Cabin_Letter')
                    )
df_cabin.show(n=5)

In [13]:
# Create a pivot table to show the classes who stay in each floor
df_cabin_summary = df_cabin.groupBy(['Cabin_Letter']).pivot('Pclass').agg(f.count("PassengerId")).orderBy('Cabin_Letter')

# Create 
df_cabin_fare = df_cabin.groupBy('Cabin_Letter').agg(f.round(f.mean('Fare'),2).alias('Avg_Price'))
df_cabin_summary = df_cabin_summary.join(df_cabin_fare,df_cabin_summary.Cabin_Letter == df_cabin_fare.Cabin_Letter,'left_outer').select(df_cabin_summary.Cabin_Letter,'1','2','3','Avg_Price')
df_cabin_summary.show()
