**Fact Table: Trip**
- Grain: One row per trip.
- Stores trip measures (trip duration in minutes, rider age at trip) and foreign keys to the rider, start-station, end-station, and date dimensions.

In [0]:
from pyspark.sql.functions import (
    col, floor, monotonically_increasing_id, months_between, unix_timestamp, year
)

# Source tables. The alias strings are what the qualified col("alias.column")
# references below resolve against; dim_station is read twice so that the start
# and end station of a trip can be joined independently.
r = spark.table("gold.dim_rider").alias("r")
s_start = spark.table("gold.dim_station").alias("s_start")
s_end = spark.table("gold.dim_station").alias("s_end")
d = spark.table("gold.dim_date").alias("d")
t = spark.table("bronze.trips").alias("t")

# Build one fact row per trip that matches a rider, both stations, and a date.
# NOTE(review): every join below uses DataFrame.join's default inner join, so a
# trip without a matching rider, start/end station, or dim_date row is dropped
# from the fact table -- confirm that is the intended behaviour.
fact_trip = (
    t
    .join(r, col("t.rider_id") == col("r.rider_id"))
    .join(
        s_start,
        col("t.start_station_id") == col("s_start.station_id")
    )
    .join(
        s_end,
        col("t.end_station_id") == col("s_end.station_id")
    )
    .join(
        d,
        # The trip is dated by the calendar day on which it started.
        col("t.started_at").cast("date") == col("d.full_date")
    )
    .withColumn(
        "trip_duration_minutes",
        # Difference of epoch seconds, converted to (fractional) minutes.
        (unix_timestamp(col("t.ended_at")) - unix_timestamp(col("t.started_at"))) / 60
    )
    .withColumn(
        "rider_age_at_trip",
        # Completed years between the rider's birthday and the trip start.
        # Subtracting calendar years alone overstates the age by one for any
        # trip taken before the rider's birthday in that year. floor() yields a
        # long, so cast back to int to keep the column type of the year-based
        # calculation.
        # Assumes r.birthday is a date/timestamp (or a string Spark can cast to
        # one) -- TODO confirm against gold.dim_rider.
        floor(
            months_between(col("t.started_at"), col("r.birthday")) / 12
        ).cast("int")
    )
    .select(
        # Surrogate key: unique but not consecutive, and generated afresh on
        # every run, so values are not stable across overwrites of the table.
        monotonically_increasing_id().alias("trip_key"),
        col("t.trip_id"),
        col("r.rider_key"),
        col("s_start.station_key").alias("start_station_key"),
        col("s_end.station_key").alias("end_station_key"),
        col("d.date_key"),
        col("t.started_at").alias("start_time"),
        col("t.ended_at").alias("end_time"),
        col("trip_duration_minutes"),
        col("rider_age_at_trip")
    )
)

# Replace the contents of gold.fact_trip with the freshly built rows.
fact_trip.write.format("delta") \
    .mode("overwrite") \
    .saveAsTable("gold.fact_trip")


**Fact Table: Payment**
- Grain: One row per payment.
- Stores payment amount and foreign keys to rider and date dimensions.


In [0]:
from pyspark.sql.functions import col, monotonically_increasing_id

# Inputs: raw payments plus the rider and date dimensions. The alias strings
# are what the qualified col("alias.column") references below resolve against.
p = spark.table("bronze.payments").alias("p")
r = spark.table("gold.dim_rider").alias("r")
d = spark.table("gold.dim_date").alias("d")

# Join predicates, named so the pipeline below reads as a sequence of steps.
payment_rider_match = col("p.rider_id") == col("r.rider_id")
payment_date_match = col("p.date").cast("date") == col("d.full_date")

# Output columns in their final order: generated surrogate key, the source
# payment id, the two dimension keys, then the amount measure.
fact_payment_columns = [
    monotonically_increasing_id().alias("payment_key"),
    col("p.payment_id"),
    col("r.rider_key"),
    col("d.date_key"),
    col("p.amount"),
]

# One row per payment that matches both a rider and a date (default inner joins).
fact_payment = (
    p
    .join(r, payment_rider_match)
    .join(d, payment_date_match)
    .select(*fact_payment_columns)
)

# Replace the contents of gold.fact_payment with the freshly built rows.
(
    fact_payment.write
    .format("delta")
    .mode("overwrite")
    .saveAsTable("gold.fact_payment")
)
