Take the cleaned silver table of titanic survival data and aggregate into gold tables (aggregated metrics ready for dashboards).

In [0]:
from pyspark.sql import functions as F

In [0]:
titanic_df = spark.table('silver.titanic')
titanic_df.limit(5).display()

## Group by sex and passenger class

Group data by sex and passenger class to see how survival and other numeric features average.

In [0]:
titanic_df_sex_class = (
    titanic_df
    .groupBy("Sex", "Pclass")
    #   Average numeric columns
    .agg(
        F.mean("Survived").alias("SurvivalRate"),
        F.mean("Age").alias("AvgAge"),
        F.mean("SibSp").alias("AvgSibSp"),
        F.mean("Parch").alias("AvgParch"),
        F.mean("FamilyAboard").alias("AvgFamilyAboard")
    )
    #   Sort
    .orderBy("Sex", "Pclass")
)
titanic_df_sex_class.display()

## Port-level statistics

In [0]:
#   To later map port abbreviations to their full name, create a
#   Spark column expression/ map
mapping = {
    "C": "Cherbourg",
    "Q": "Queenstown",
    "S": "Southampton"
}

mapping_expr = F.create_map(
    [F.lit(x) for pair in mapping.items() for x in pair]
)

In [0]:
titanic_df_port = (
    titanic_df
    #   Drop null port
    .filter(F.col("Embarked").isNotNull())
    #   Count passengers and average survival rate and fare
    .groupBy("Embarked")
    .agg(
        F.count("*").alias("NumPassengers"),
        F.mean("Survived").alias("SurvivalRate"),
        F.mean("Fare").alias("AvgFare")
    )
    #   Map abbreviations to ports' full names
    .withColumn("Embarked", mapping_expr[F.col("Embarked")])
)
titanic_df_port.display()

## Write gold tables

In [0]:
#   Write to Delta tables. Ensure the schema exists in the workspace
spark.sql("CREATE DATABASE IF NOT EXISTS gold")

#   Stats grouped by sex and class
(
    titanic_df_sex_class
    .write
    .format("delta")
    .mode("overwrite")
    .saveAsTable("gold.titanic_sex_class")
)

#   Port-level stats
(
    titanic_df_port
    .write
    .format("delta")
    .mode("overwrite")
    .saveAsTable("gold.titanic_port")
)