In [0]:
# Drop if the trips facts exists
spark.sql("DROP TABLE IF EXISTS facts.trips")

Out[1]: DataFrame[]

In [0]:
from pyspark.sql.types import StringType
from pyspark.sql.functions import *
from pyspark.sql.types import DateType

# read the trips and riders staging data.
trips = spark.table("staging.trips")
rider = spark.table("staging.riders")

# Adding the time colum, in order to provide context to the time dimension.
# These time values are extracted from started_at & ended_at datetime columns
trips = trips.withColumn("trip_start_time", date_format(to_timestamp("started_at"),'HH:MM:ss'))
trips = trips.withColumn("trip_end_time", date_format(to_timestamp("ended_at"),'HH:MM:ss'))

# transform SQL statement to prepare the trips facts
final_dim = trips.join(rider,'rider_id').select(trips["trip_id"], 
                                                rider["rider_id"], 
                                                trips["start_station_id"], 
                                                trips["end_station_id"],
                                                to_date(trips.started_at).alias('trip_start_date'),
                                                to_date(trips.ended_at).alias('trip_end_date'), 
                                                trips["trip_start_time"], 
                                                trips["trip_end_time"], 
                                                (to_timestamp(trips.ended_at) - to_timestamp(trips.started_at)).alias("trip_duration"),
                                                round(months_between(to_date(trips.started_at),to_date(rider.birthday))/lit(12)).alias("rider_age").cast("Integer")
                                               )
print(final_dim.show())

# Saves the data as a table in delta location.
final_dim.write.format("delta").mode("overwrite").saveAsTable("facts.trips")

+----------------+--------+----------------+--------------+---------------+-------------+---------------+-------------+--------------------+---------+
|         trip_id|rider_id|start_station_id|end_station_id|trip_start_date|trip_end_date|trip_start_time|trip_end_time|       trip_duration|rider_age|
+----------------+--------+----------------+--------------+---------------+-------------+---------------+-------------+--------------------+---------+
|9506E0F317DE3528|   47823|           13146|  TA1309000024|     2021-08-06|   2021-08-06|       22:08:29|     22:08:16|INTERVAL '0 00:05...|       36|
|02F52EC184442971|   71777|           13146|  TA1307000151|     2021-08-23|   2021-08-23|       09:08:29|     10:08:29|INTERVAL '0 00:09...|       35|
|B47FF3EEB2039C65|   39370|           13206|         13158|     2021-08-24|   2021-08-24|       11:08:39|     11:08:40|INTERVAL '0 00:03...|       36|
|CA6B1F0DE566112F|   14990|           13430|  TA1307000151|     2021-08-12|   2021-08-12|     