In [0]:
# Switch the session's current database to the Bronze Store so that unqualified table names
# (the `trip` table read in the next cell) resolve against bronze_store
spark.sql("USE DATABASE bronze_store")

In [0]:
# Read the raw `trip` table (resolved against the session's current database, bronze_store)
# into a DataFrame and render it for inspection
trip_df = spark.table("trip")
display(trip_df)

trip_id,rideable_type,start_at,ended_at,start_station_id,end_station_id,rider_id
89E7AA6C29227EFF,classic_bike,2021-02-12 16:14:56,2021-02-12 16:21:43,525,660,71934
0FEFDE2603568365,classic_bike,2021-02-14 17:52:38,2021-02-14 18:12:09,525,16806,47854
E6159D746B2DBB91,electric_bike,2021-02-09 19:10:18,2021-02-09 19:19:10,KA1503000012,TA1305000029,70870
B32D3199F1C2E75B,classic_bike,2021-02-02 17:49:41,2021-02-02 17:54:06,637,TA1305000034,58974
83E463F23575F4BF,electric_bike,2021-02-23 15:07:23,2021-02-23 15:22:37,13216,TA1309000055,39608
BDAA7E3494E8D545,electric_bike,2021-02-24 15:43:33,2021-02-24 15:49:05,18003,KP1705001026,36267
A772742351171257,classic_bike,2021-02-01 17:47:42,2021-02-01 17:48:33,KP1705001026,KP1705001026,50104
295476889D9B79F8,classic_bike,2021-02-11 18:33:53,2021-02-11 18:35:09,18003,18003,19618
362087194BA4CC9A,classic_bike,2021-02-27 15:13:39,2021-02-27 15:36:36,KP1705001026,KP1705001026,16732
21630F715038CCB0,classic_bike,2021-02-20 08:59:42,2021-02-20 09:17:04,KP1705001026,KP1705001026,57068


In [0]:
# Switch the session's current database to the Gold Store: from here on, unqualified table names
# (the dim_rider, dim_date and dim_station reads below, which are joined against the trip data) resolve against gold_store
spark.sql("USE DATABASE gold_store")

In [0]:
# Read the `dim_rider` dimension (resolved against the session's current database, gold_store)
# into a DataFrame and render it for inspection
rider_df = spark.table("dim_rider")
display(rider_df)

rider_id,first_name,last_name,address,birth_dt,account_start_dt,account_end_dt,is_member,rider_age_at_acc_start
1000,Diana,Clark,1200 Alyssa Squares,1989-02-13,2019-04-23,,True,30
1001,Jennifer,Smith,397 Diana Ferry,1976-08-10,2019-11-01,2020-09-01,True,43
1002,Karen,Smith,644 Brittany Row Apt. 097,1998-08-10,2022-02-04,,True,23
1003,Bryan,Roberts,996 Dickerson Turnpike,1999-03-29,2019-08-26,,False,20
1004,Jesse,Middleton,7009 Nathan Expressway,1969-04-11,2019-09-14,,True,50
1005,Christine,Rodriguez,224 Washington Mills Apt. 467,1974-08-27,2020-03-24,,False,46
1006,Alicia,Taylor,1137 Angela Locks,2004-01-30,2020-11-27,2021-12-01,True,17
1007,Benjamin,Fernandez,979 Phillips Ways,1988-01-11,2016-12-11,,False,29
1008,John,Crawford,7691 Evans Court,1987-02-21,2021-03-28,2021-07-01,True,34
1009,Victoria,Ritter,9922 Jim Crest Apt. 319,1981-02-07,2020-06-12,2021-11-01,True,39


In [0]:
# Load data from the dim_date table (resolved against the current database, gold_store) to a dataframe
# and render it for inspection; its date_id column is used below to validate trip start/end dates
date_df = (
    spark
        .read
        .table(tableName = "dim_date")
)

display(date_df)

date_id,date_value,month,month_name,year,quarter,day_of_week
20130201,2013-02-01,2,February,2013,1,Fri
20130202,2013-02-02,2,February,2013,1,Sat
20130203,2013-02-03,2,February,2013,1,Sun
20130204,2013-02-04,2,February,2013,1,Mon
20130205,2013-02-05,2,February,2013,1,Tue
20130206,2013-02-06,2,February,2013,1,Wed
20130207,2013-02-07,2,February,2013,1,Thu
20130208,2013-02-08,2,February,2013,1,Fri
20130209,2013-02-09,2,February,2013,1,Sat
20130210,2013-02-10,2,February,2013,1,Sun


In [0]:
# Read the `dim_station` dimension (resolved against the session's current database, gold_store)
# into a DataFrame and render it for inspection
station_df = spark.table("dim_station")
display(station_df)

station_id,station_name,latitude,longitude
525,Glenwood Ave & Touhy Ave,42.0127,-87.66606
KA1503000012,Clark St & Lake St,41.885796,-87.6311
637,Wood St & Chicago Ave,41.895634,-87.672066
13216,State St & 33rd St,41.834732,-87.625824
18003,Fairbanks St & Superior St,41.89581,-87.620255
KP1705001026,LaSalle Dr & Huron St,41.89488,-87.632324
13253,Lincoln Ave & Waveland Ave,41.948795,-87.67528
KA1503000044,Rush St & Hubbard St,41.890175,-87.62618
KA1504000140,Winchester Ave & Elston Ave,41.924038,-87.676414
TA1305000032,Clinton St & Madison St,41.88224,-87.64107


In [0]:
# Import required functions and data types
from pyspark.sql.functions import col, months_between, to_date, to_timestamp, lit, round, regexp_replace, substring, unix_timestamp, hour, floor
from pyspark.sql.types import IntegerType, FloatType

# Derive the fact_trip columns from the raw trip rows:
# - start_date_id / end_date_id: the first 10 characters ("yyyy-MM-dd") of start_at / ended_at with the dashes
#   removed, cast to an integer (e.g. "2021-02-12 16:14:56" -> 20210212), matching the dim_date.date_id format
# - rider_id: cast to an integer
# - trip_duration_minutes: seconds between start_at and ended_at divided by 60, truncated to an integer
# - hour_of_day: hour component of start_at
# - rider_age_at_trip_time: months between birth_dt and the start_at date divided by 12, truncated to whole years
# NOTE(review): this cell overwrites trip_df with columns that no longer include start_at / ended_at,
# so re-running it requires re-running the trip load cell first.
trip_df = (
    trip_df
        # Inner join drops trips whose rider_id has no dim_rider row (referential integrity)
        # and brings in birth_dt for the rider_age_at_trip_time calculation.
        # NOTE(review): assumes rider_id is unique in dim_rider; duplicates there would duplicate trips — confirm.
        .join(
            other = rider_df.select(col("rider_id"), col("birth_dt")),
            on = "rider_id",
            how = "inner"
        )
        .select(
            col("trip_id"),
            col("start_station_id"),
            col("end_station_id"),
            regexp_replace(substring(col("start_at"), 1, 10), "-", "")
                .cast(IntegerType())
                .alias("start_date_id"),
            regexp_replace(substring(col("ended_at"), 1, 10), "-", "")
                .cast(IntegerType())
                .alias("end_date_id"),
            col("rider_id").cast(IntegerType()),
            col("rideable_type"),
            (
                (unix_timestamp(to_timestamp(col("ended_at"))) - unix_timestamp(to_timestamp(col("start_at"))))
                / 60
            )
                .cast(IntegerType())
                .alias("trip_duration_minutes"),
            hour(to_timestamp(col("start_at")))
                .cast(IntegerType())
                .alias("hour_of_day"),
            (
                months_between(to_date(col("start_at")), to_date(col("birth_dt")))
                / lit(12)
            )
                .cast(IntegerType())
                .alias("rider_age_at_trip_time")
        )
)

# Enforce referential integrity against the Gold Store dimensions. Keep only trips whose start/end dates
# exist in dim_date and whose start/end stations exist in dim_station.
# A left-semi join is a pure existence filter on the left side:
# - it adds no dimension columns, so nothing has to be dropped to avoid ambiguous names afterwards;
# - it cannot multiply a trip row when a dimension key appears more than once, which an inner join
#   followed by drop() would do.
# The key columns are renamed so the unqualified join conditions below are unambiguous.
valid_date_ids = date_df.select(col("date_id").alias("valid_date_id"))
valid_station_ids = station_df.select(col("station_id").alias("valid_station_id"))

trip_df = (
    trip_df
        .join(other = valid_date_ids, on = col("start_date_id") == col("valid_date_id"), how = "left_semi")
        .join(other = valid_date_ids, on = col("end_date_id") == col("valid_date_id"), how = "left_semi")
        .join(other = valid_station_ids, on = col("start_station_id") == col("valid_station_id"), how = "left_semi")
        .join(other = valid_station_ids, on = col("end_station_id") == col("valid_station_id"), how = "left_semi")
)

trip_df.printSchema()

In [0]:
# Render trip_df as produced by the transformation cell above (derived columns, rows filtered by the dimension joins)
display(trip_df)

trip_id,start_station_id,end_station_id,start_date_id,end_date_id,rider_id,rideable_type,trip_duration_minutes,hour_of_day,rider_age_at_trip_time
222BB8E5059252D7,KA1503000064,13021,20210613,20210613,34062,classic_bike,18,9,30
1826E16CB5486018,TA1306000010,13021,20210621,20210621,5342,classic_bike,5,22,26
3D9B6A0A5330B04D,TA1305000030,13021,20210618,20210618,3714,classic_bike,5,16,26
07E82F5E9C9E490F,TA1305000034,13021,20210617,20210617,18793,classic_bike,16,16,18
A8E94BAECBF0C2DD,TA1308000009,TA1308000009,20210613,20210613,43342,docked_bike,54,17,28
378F4AB323AA1D14,TA1308000009,TA1308000009,20210613,20210613,6693,docked_bike,46,13,28
38AD311DC2EB1FBE,KA1503000019,KA1503000019,20210616,20210616,71480,docked_bike,14,17,56
1D466737F0B18097,TA1308000009,TA1308000009,20210627,20210627,50846,docked_bike,34,14,40
27E1142E1ACFAEFB,13257,13257,20210621,20210621,18951,electric_bike,0,13,21
67F2A115DAE77924,TA1308000009,TA1308000009,20210622,20210622,63987,classic_bike,16,0,37


In [0]:
# Persist the trip rows to the fact_trip Delta table in the Gold Store, replacing any previous contents.
# The columns are selected explicitly so the table's column order is pinned here rather than inherited
# from however trip_df was built.
# The database is named explicitly instead of relying on the session's current database,
# which an earlier `USE DATABASE gold_store` cell sets.
(
    trip_df
        .select(
            col("trip_id"),
            col("start_station_id"),
            col("end_station_id"),
            col("start_date_id"),
            col("end_date_id"),
            col("rider_id"),
            col("rideable_type"),
            col("trip_duration_minutes"),
            col("hour_of_day"),
            col("rider_age_at_trip_time")
        )
        .write
        .format("delta")
        .mode("overwrite")
        .saveAsTable("gold_store.fact_trip")
)