In [None]:
import pandas as pd
from delta import configure_spark_with_delta_pip
from pyspark.sql import SparkSession
from pyspark.sql.functions import (
    col,
    when,
    count,
)
from pyspark.sql.window import Window
from config.config import Config

pd.options.display.max_columns = None

In [4]:
# Initialise configurations
config = Config()

# Initialise Spark session
builder = (
    SparkSession.builder.appName("CuratedData")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config(
        "spark.sql.catalog.spark_catalog",
        "org.apache.spark.sql.delta.catalog.DeltaCatalog",
    )
    .config("spark.jars.packages", "io.delta:delta-spark_2.12:3.0.0")
    .config("spark.driver.memory", "4g")
    .config("spark.executor.memory", "4g")
    .config("spark.memory.fraction", "0.6")
    .config("spark.memory.storageFraction", "0.5")
)

# Configure Spark with Delta Lake
spark = configure_spark_with_delta_pip(builder).getOrCreate()

In [None]:
# Load data
dim_date = spark.read.format("delta").load(f"{config.GOLD_DATA_PATH}/dim_date")
dim_time_bucket = spark.read.format("delta").load(f"{config.GOLD_DATA_PATH}/dim_time_bucket")
dim_train = spark.read.format("delta").load(f"{config.GOLD_DATA_PATH}/dim_train")
dim_station = spark.read.format("delta").load(f"{config.GOLD_DATA_PATH}/dim_station")
dim_line = spark.read.format("delta").load(f"{config.GOLD_DATA_PATH}/dim_line")
dim_direction = spark.read.format("delta").load(f"{config.GOLD_DATA_PATH}/dim_direction")
dim_mode = spark.read.format("delta").load(f"{config.GOLD_DATA_PATH}/dim_mode")
fact_train = spark.read.format("delta").load(f"{config.GOLD_DATA_PATH}/fact_train")

In [None]:
# Basic Counts & Null Checks

tables = [
    (dim_date, "dim_date"),
    (dim_time_bucket, "dim_time_bucket"),
    (dim_train, "dim_train"),
    (dim_station, "dim_station"),
    (dim_line, "dim_line"),
    (dim_direction, "dim_direction"),
    (dim_mode, "dim_mode"),
    (fact_train, "fact_train"),
]

for df, name in tables:
    print(f"{name} row count: {df.count()}")
    print(f"{name} null counts per column:")
    df.select([count(when(col(c).isNull(), c)).alias(c) for c in df.columns]).show(
        truncate=False
    )

dim_date row count: 366
dim_date null counts per column:
+--------+-------------+-----------+--------+----+-----+---+
|Date_Key|Business_Date|Day_of_Week|Day_Type|year|month|day|
+--------+-------------+-----------+--------+----+-----+---+
|0       |0            |0          |0       |0   |0    |0  |
+--------+-------------+-----------+--------+----+-----+---+

dim_time_bucket row count: 48
dim_time_bucket null counts per column:
+---------------+-----------+----------------------+
|Time_Bucket_Key|Time_Bucket|Time_Range_Description|
+---------------+-----------+----------------------+
|0              |0          |0                     |
+---------------+-----------+----------------------+

dim_train row count: 21847
dim_train null counts per column:
+---------+------------+----+---------+-----+---------+--------------+-------------------+
|Train_Key|Train_Number|Mode|Line_Name|Group|Direction|Origin_Station|Destination_Station|
+---------+------------+----+---------+-----+---------+---

In [None]:
# Check Referential Integrity
# Verifying that foreign keys in fact_train exist in dimension tables

def check_fk_integrity(fact_df, dim_df, fk_col, pk_col, dim_name):
    missing_fk = fact_df.select(fk_col).subtract(dim_df.select(pk_col))
    count_missing = missing_fk.count()
    print(f"Foreign key '{fk_col}' missing in {dim_name}: {count_missing}")
    if count_missing > 0:
        missing_fk.show()

fk_checks = [
    ("Date_Key", dim_date, "Date_Key", "dim_date"),
    ("Train_Key", dim_train, "Train_Key", "dim_train"),
    ("Station_Key", dim_station, "Station_Key", "dim_station"),
    ("Line_Key", dim_line, "Line_Key", "dim_line"),
    ("Direction_Key", dim_direction, "Direction_Key", "dim_direction"),
    ("Mode_Key", dim_mode, "Mode_Key", "dim_mode"),
    ("Arrival_Time_Bucket_Key", dim_time_bucket, "Time_Bucket_Key", "dim_time_bucket"),
    ("Departure_Time_Bucket_Key", dim_time_bucket, "Time_Bucket_Key", "dim_time_bucket"),
]

for fk_col, dim_df, pk_col, dim_name in fk_checks:
    if fk_col in fact_train.columns:
        check_fk_integrity(fact_train, dim_df, fk_col, pk_col, dim_name)

Foreign key 'Date_Key' missing in dim_date: 0
Foreign key 'Train_Key' missing in dim_train: 0
Foreign key 'Station_Key' missing in dim_station: 0
Foreign key 'Line_Key' missing in dim_line: 0
Foreign key 'Direction_Key' missing in dim_direction: 0
Foreign key 'Mode_Key' missing in dim_mode: 0
Foreign key 'Arrival_Time_Bucket_Key' missing in dim_time_bucket: 0
Foreign key 'Departure_Time_Bucket_Key' missing in dim_time_bucket: 0


In [9]:
# Sample Spot Checks
# Random sample rows from fact with joined dimension data for human-readable validation

fact_train.alias("f").join(dim_train.alias("t"), "Train_Key") \
    .join(dim_station.alias("s"), "Station_Key") \
    .join(dim_date.alias("d"), "Date_Key") \
    .select("f.Passenger_Boardings", "f.Passenger_Alightings", "t.Train_Number", "s.Station_Name", "d.Business_Date") \
    .show(10, truncate=False)

+-------------------+--------------------+------------+------------+-------------+
|Passenger_Boardings|Passenger_Alightings|Train_Number|Station_Name|Business_Date|
+-------------------+--------------------+------------+------------+-------------+
|10                 |0                   |5008        |Merlynston  |2023-12-21   |
|10                 |0                   |5008        |Merlynston  |2023-12-21   |
|10                 |0                   |5008        |Merlynston  |2023-12-21   |
|10                 |0                   |5008        |Merlynston  |2023-12-21   |
|10                 |0                   |5008        |Merlynston  |2023-12-21   |
|10                 |0                   |5008        |Merlynston  |2023-12-21   |
|10                 |0                   |5008        |Merlynston  |2023-12-21   |
|10                 |0                   |5008        |Merlynston  |2023-12-21   |
|10                 |0                   |5008        |Merlynston  |2023-12-21   |
|10 

In [None]:
# Statistical Checks on Measures
# Look for unexpected zero or negative values again (should be none now)

fact_train.filter((col("Passenger_Boardings") < 0) | (col("Passenger_Alightings") < 0)).count()

0

In [11]:
# Time Bucket Consistency
# Checking time bucket keys only contain expected values

dim_time_bucket.select("Time_Bucket_Key", "Time_Range_Description").show(truncate=False)

+---------------+----------------------+
|Time_Bucket_Key|Time_Range_Description|
+---------------+----------------------+
|0              |00:00 - 00:00         |
|30             |00:30 - 00:00         |
|60             |01:00 - 01:01         |
|90             |01:30 - 01:01         |
|120            |02:00 - 02:02         |
|150            |02:30 - 02:02         |
|180            |03:00 - 03:03         |
|210            |03:30 - 03:03         |
|240            |04:00 - 04:04         |
|270            |04:30 - 04:04         |
|300            |05:00 - 05:05         |
|330            |05:30 - 05:05         |
|360            |06:00 - 06:06         |
|390            |06:30 - 06:06         |
|420            |07:00 - 07:07         |
|450            |07:30 - 07:07         |
|480            |08:00 - 08:08         |
|510            |08:30 - 08:08         |
|540            |09:00 - 09:09         |
|570            |09:30 - 09:09         |
+---------------+----------------------+
only showing top