In [0]:
dfTrips = spark.read.format("delta").load("/delta/bronze_trips")

dfPayments = spark.read.format("delta").load("/delta/bronze_payments")

dfRiders = spark.read.format("delta").load("/delta/bronze_riders")

dfStations = spark.read.format("delta").load("/delta/bronze_stations")

In [0]:
from pyspark.sql.functions import year, quarter, month, dayofmonth, hour, minute, second, monotonically_increasing_id

dfTripsStart = dfTrips.select(year("started_at").alias("year"), \
                            quarter("started_at").alias("quarter"), \
                            month("started_at").alias("month"), \
                            dayofmonth("started_at").alias("day"), \
                            hour("started_at").alias("hour"), \
                            minute("started_at").alias("minute"), \
                            second("started_at").alias("second"))

dfTripsEnd = dfTrips.select(year("ended_at").alias("year"), \
                            quarter("ended_at").alias("quarter"), \
                            month("ended_at").alias("month"), \
                            dayofmonth("ended_at").alias("day"), \
                            hour("ended_at").alias("hour"), \
                            minute("ended_at").alias("minute"), \
                            second("ended_at").alias("second"))

dfPaymentsDate = dfPayments.select(year("date").alias("year"), \
                            quarter("date").alias("quarter"), \
                            month("date").alias("month"), \
                            dayofmonth("date").alias("day"), \
                            hour("date").alias("hour"), \
                            minute("date").alias("minute"), \
                            second("date").alias("second"))

dfDimDates = dfTripsStart.union(dfTripsEnd).union(dfPaymentsDate) \
                            .dropDuplicates().withColumn("date_id", monotonically_increasing_id()) \
                            .select("date_id", "year", "quarter", "month", "day", "hour", "minute", "second")

dfDimDates.show()

+-------+----+-------+-----+---+----+------+------+
|date_id|year|quarter|month|day|hour|minute|second|
+-------+----+-------+-----+---+----+------+------+
|      0|2021|      2|    6| 23|  17|    56|    16|
|      1|2021|      2|    6|  8|  22|    25|    58|
|      2|2021|      2|    6| 13|  21|    12|    44|
|      3|2021|      2|    6| 30|  15|    15|    36|
|      4|2021|      2|    6| 20|  17|    47|    47|
|      5|2021|      2|    6|  5|   9|    40|     1|
|      6|2021|      2|    6|  3|  15|    16|    21|
|      7|2021|      2|    6| 20|  12|    18|    26|
|      8|2021|      2|    6|  6|   9|     9|    48|
|      9|2021|      2|    6| 19|  14|    56|    59|
|     10|2021|      2|    6| 16|  21|    32|    48|
|     11|2021|      2|    6| 20|  15|    47|    34|
|     12|2021|      2|    6|  3|  21|    49|     4|
|     13|2021|      2|    6| 10|  20|    36|    42|
|     14|2021|      2|    6| 18|  13|    50|    57|
|     15|2021|      2|    6| 16|  20|    55|     0|
|     16|202

In [0]:
dfTrips.createOrReplaceTempView("trips")
dfDimDates.createOrReplaceTempView("dim_dates")
dfRiders.createOrReplaceTempView("riders")

dfFactTrips = spark.sql("""
    SELECT t.trip_id, 
           UNIX_TIMESTAMP(t.ended_at) - UNIX_TIMESTAMP(t.started_at) AS trip_duration, 
           t.rideable_type, 
           d1.date_id AS start_date_id, 
           d2.date_id AS end_date_id, 
           t.start_station_id, 
           t.end_station_id, 
           t.rider_id, 
           DATEDIFF(t.started_at, r.birthday) / 365 AS rider_age
    FROM trips t
    JOIN dim_dates d1 ON YEAR(t.started_at) = d1.year 
                      AND QUARTER(t.started_at) = d1.quarter 
                      AND MONTH(t.started_at) = d1.month 
                      AND DAY(t.started_at) = d1.day 
                      AND HOUR(t.started_at) = d1.hour 
                      AND MINUTE(t.started_at) = d1.minute 
                      AND SECOND(t.started_at) = d1.second
    JOIN dim_dates d2 ON YEAR(t.ended_at) = d2.year 
                      AND QUARTER(t.ended_at) = d2.quarter 
                      AND MONTH(t.ended_at) = d2.month 
                      AND DAY(t.ended_at) = d2.day 
                      AND HOUR(t.ended_at) = d2.hour 
                      AND MINUTE(t.ended_at) = d2.minute 
                      AND SECOND(t.ended_at) = d2.second
    JOIN riders r ON t.rider_id = r.rider_id
    """)
dfFactTrips = dfFactTrips.withColumn("rider_age", dfFactTrips["rider_age"].cast("integer"))

dfFactTrips.show()

+----------------+-------------+-------------+-------------+-----------+----------------+--------------+--------+---------+
|         trip_id|trip_duration|rideable_type|start_date_id|end_date_id|start_station_id|end_station_id|rider_id|rider_age|
+----------------+-------------+-------------+-------------+-----------+----------------+--------------+--------+---------+
|E221EF1C4A86FAEE|         3308| classic_bike|          186|     806235|           13068|         13221|   28996|       42|
|78D438849F1CA541|          251|electric_bike|          335|     806357|    TA1308000050|           331|   41271|       40|
|CE968992CD5A9CA0|         2881|electric_bike|          776|     806659|    TA1307000142|        SL-013|   56450|       20|
|55284DD595F08B14|          431| classic_bike|          285|       1535|           13332|         13158|   33218|       17|
|B426A5704A7FAF7E|         1985|electric_bike|         1782|     807348|           13075|         13292|   41219|       26|
|B4032AF

In [0]:
dfFactPayments = dfPayments.join(dfDimDates, 
                              (dfDimDates["year"] == year(dfPayments["date"])) & 
                              (dfDimDates["quarter"] == quarter(dfPayments["date"])) & 
                              (dfDimDates["month"] == month(dfPayments["date"])) & 
                              (dfDimDates["day"] == dayofmonth(dfPayments["date"])) & 
                              (dfDimDates["hour"] == hour(dfPayments["date"])) & 
                              (dfDimDates["minute"] == minute(dfPayments["date"])) & 
                              (dfDimDates["second"] == second(dfPayments["date"])), 
                              "inner") \
                          .drop("year", "quarter", "month", "day", "hour", "minute", "second") \
                          .drop("date") \
                          .select("payment_id", "date_id", "amount", "rider_id")

display(dfFactPayments)

payment_id,date_id,amount,rider_id
1946486,1686593,9.0,75998
1946513,1686594,9.0,75998
1946522,1686595,9.0,75998
1946492,1686596,9.0,75998
1944395,1686597,9.0,75906
1946575,1686598,9.0,75999
1946599,1686599,9.0,75999
1946531,1686600,9.0,75998
1946474,1686601,9.0,75998
1946598,1686602,9.0,75999


In [0]:
dfFactTrips.write.format("delta").mode("overwrite").save("/delta/star_fact_trips")

dfFactPayments.write.format("delta").mode("overwrite").save("/delta/star_fact_payments")

dfRiders.write.format("delta").mode("overwrite").save("/delta/star_dim_riders")

dfStations.write.format("delta").mode("overwrite").save("/delta/star_dim_stations")

dfDimDates.write.format("delta").mode("overwrite").save("/delta/star_dim_dates")