In [0]:
from pyspark.sql.functions import explode, sequence, to_date

In [0]:
# Read the bronze riders data from its Delta path.
df_riders = (
    spark.read
    .format("delta")
    .load("/delta/bronze_riders")
)

In [0]:
# Register the riders DataFrame as the temp view "riders" so SQL cells can query it.
df_riders.createOrReplaceTempView("riders")

In [0]:
# Create (or replace) the gold riders dimension at /delta/gold_riders.
# - first / last are renamed to first_name / last_name.
# - is_member equal to 'True' becomes 'Member'; every other value (including
#   NULL, via the ELSE branch) becomes 'Casual Rider'.
dim_riders_ctas = """
        CREATE OR REPLACE TABLE dim_riders
        USING DELTA LOCATION '/delta/gold_riders'
        AS
        SELECT rider_id,
            address,
            first as first_name,
            last as last_name,
            birthday,
            account_start_date,
            account_end_date,
            CASE is_member WHEN 'True' THEN 'Member'
                ELSE 'Casual Rider'
                END AS member_status
            FROM riders
        """
spark.sql(dim_riders_ctas)

In [0]:
# Read the riders dimension back from the table that was just written.
gold_riders = spark.sql(
    """
            SELECT * FROM dim_riders
            """
)

In [0]:
# Render the riders dimension in the notebook.
# NOTE(review): the rendered rows include names, addresses and birthdays —
# consider clearing this output before sharing the notebook.
display(gold_riders)

rider_id,address,first_name,last_name,birthday,account_start_date,account_end_date,member_status
1000,1200 Alyssa Squares,Diana,Clark,1989-02-13,2019-04-23,,Member
1001,397 Diana Ferry,Jennifer,Smith,1976-08-10,2019-11-01,2020-09-01,Member
1002,644 Brittany Row Apt. 097,Karen,Smith,1998-08-10,2022-02-04,,Member
1003,996 Dickerson Turnpike,Bryan,Roberts,1999-03-29,2019-08-26,,Casual Rider
1004,7009 Nathan Expressway,Jesse,Middleton,1969-04-11,2019-09-14,,Member
1005,224 Washington Mills Apt. 467,Christine,Rodriguez,1974-08-27,2020-03-24,,Casual Rider
1006,1137 Angela Locks,Alicia,Taylor,2004-01-30,2020-11-27,2021-12-01,Member
1007,979 Phillips Ways,Benjamin,Fernandez,1988-01-11,2016-12-11,,Casual Rider
1008,7691 Evans Court,John,Crawford,1987-02-21,2021-03-28,2021-07-01,Member
1009,9922 Jim Crest Apt. 319,Victoria,Ritter,1981-02-07,2020-06-12,2021-11-01,Member


In [0]:
# Read the bronze stations data from its Delta path.
df_stations = (
    spark.read
    .format("delta")
    .load("/delta/bronze_stations")
)

In [0]:
# Register the stations DataFrame as the temp view "stations" so SQL cells can query it.
df_stations.createOrReplaceTempView("stations")

In [0]:
# Create (or replace) the gold stations dimension at /delta/gold_stations.
# The only reshaping is renaming `name` to `station_name`.
dim_stations_ctas = """
        CREATE OR REPLACE TABLE dim_stations
        USING DELTA LOCATION '/delta/gold_stations'
        AS
        SELECT station_id,
            name AS station_name,
            latitude,
            longitude
            FROM stations
"""
spark.sql(dim_stations_ctas)

In [0]:
# Read the stations dimension back from the gold table and render it.
gold_stations = spark.sql(
    """ SELECT * FROM dim_stations"""
)
display(gold_stations)

station_id,station_name,latitude,longitude
525,Glenwood Ave & Touhy Ave,42.0127,-87.66606
KA1503000012,Clark St & Lake St,41.885796,-87.6311
637,Wood St & Chicago Ave,41.895634,-87.672066
13216,State St & 33rd St,41.834732,-87.625824
18003,Fairbanks St & Superior St,41.89581,-87.620255
KP1705001026,LaSalle Dr & Huron St,41.89488,-87.632324
13253,Lincoln Ave & Waveland Ave,41.948795,-87.67528
KA1503000044,Rush St & Hubbard St,41.890175,-87.62618
KA1504000140,Winchester Ave & Elston Ave,41.924038,-87.676414
TA1305000032,Clinton St & Madison St,41.88224,-87.64107


In [0]:
# Inclusive calendar range (yyyy-MM-dd) covered by the date dimension.
beginDate = '2013-01-31'
endDate = '2022-02-13'

# Generate one row per hour and expose it as the temp view `datetimes`.
# to_timestamp('<date>') resolves to 00:00 of that day, so the upper bound is
# pushed forward by 23 hours; otherwise the final day would contribute only
# its 00:00 row and hours 01-23 of endDate would be missing from the view.
(
  spark.sql(
      f"select explode(sequence("
      f"to_timestamp('{beginDate}'), "
      f"to_timestamp('{endDate}') + interval 23 hours, "
      f"interval 1 hour)) as calendarDateTime"
  )
  .createOrReplaceTempView('datetimes')
)

In [0]:
# Create (or replace) the gold date/time dimension at /delta/gold_dates from
# the `datetimes` temp view (one row per calendarDateTime value).
#
# - datetime_key (yyyyMMddHH) identifies the hour; date_key (yyyyMMdd) the day.
# - time_of_day buckets are evaluated on the 'HH:mm' string; anything outside
#   05:00-22:29 falls through to 'Night'.
# - day_of_year and quarter use dayofyear()/quarter() so they are stored as
#   integers, like month, year and week_of_year. date_format(...) would yield
#   strings and force the quarter_name CASE to rely on an implicit
#   string-to-integer comparison.
spark.sql("""
    CREATE OR REPLACE TABLE dim_date
    USING DELTA LOCATION '/delta/gold_dates'
    AS
    SELECT
        bigint(date_format(calendarDateTime, 'yyyyMMddHH')) AS datetime_key,
        calendarDateTime AS datetime_actual,
        int(date_format(calendarDateTime, 'yyyyMMdd')) AS date_key,
        to_date(calendarDateTime) AS date_actual,
        day(calendarDateTime) AS day,
        date_format(calendarDateTime, 'EEEE') AS day_name,
        hour(calendarDateTime) AS hour,
        CASE
            WHEN date_format(calendarDateTime, 'HH:mm') BETWEEN '05:00' AND '08:29' THEN 'Early Morning'
            WHEN date_format(calendarDateTime, 'HH:mm') BETWEEN '08:30' AND '11:59' THEN 'Late Morning'
            WHEN date_format(calendarDateTime, 'HH:mm') BETWEEN '12:00' AND '17:59' THEN 'Noon'
            WHEN date_format(calendarDateTime, 'HH:mm') BETWEEN '18:00' AND '22:29' THEN 'Evening'
            ELSE 'Night'
        END AS time_of_day,
        dayofweek(calendarDateTime) AS day_of_week,
        dayofyear(calendarDateTime) AS day_of_year,
        extract(week FROM calendarDateTime) AS week_of_year,
        date_format(calendarDateTime, 'MMMM') AS month_name,
        extract(month FROM calendarDateTime) AS month,
        CASE quarter(calendarDateTime)
            WHEN 1 THEN 'First'
            WHEN 2 THEN 'Second'
            WHEN 3 THEN 'Third'
            WHEN 4 THEN 'Fourth'
        END AS quarter_name,
        quarter(calendarDateTime) AS quarter,
        extract(year FROM calendarDateTime) AS year
    FROM datetimes
""")

In [0]:
# Read the date dimension back from the gold table and render it.
gold_dates = spark.sql(
    """ SELECT * FROM dim_date """
)

display(gold_dates)

datetime_key,datetime_actual,date_key,date_actual,day,day_name,hour,time_of_day,day_of_week,day_of_year,week_of_year,month_name,month,quarter_name,quarter,year
2013013100,2013-01-31T00:00:00.000+0000,20130131,2013-01-31,31,Thursday,0,Night,5,31,5,January,1,First,1,2013
2013013101,2013-01-31T01:00:00.000+0000,20130131,2013-01-31,31,Thursday,1,Night,5,31,5,January,1,First,1,2013
2013013102,2013-01-31T02:00:00.000+0000,20130131,2013-01-31,31,Thursday,2,Night,5,31,5,January,1,First,1,2013
2013013103,2013-01-31T03:00:00.000+0000,20130131,2013-01-31,31,Thursday,3,Night,5,31,5,January,1,First,1,2013
2013013104,2013-01-31T04:00:00.000+0000,20130131,2013-01-31,31,Thursday,4,Night,5,31,5,January,1,First,1,2013
2013013105,2013-01-31T05:00:00.000+0000,20130131,2013-01-31,31,Thursday,5,Early Morning,5,31,5,January,1,First,1,2013
2013013106,2013-01-31T06:00:00.000+0000,20130131,2013-01-31,31,Thursday,6,Early Morning,5,31,5,January,1,First,1,2013
2013013107,2013-01-31T07:00:00.000+0000,20130131,2013-01-31,31,Thursday,7,Early Morning,5,31,5,January,1,First,1,2013
2013013108,2013-01-31T08:00:00.000+0000,20130131,2013-01-31,31,Thursday,8,Early Morning,5,31,5,January,1,First,1,2013
2013013109,2013-01-31T09:00:00.000+0000,20130131,2013-01-31,31,Thursday,9,Late Morning,5,31,5,January,1,First,1,2013


In [0]:
# Read the bronze trips data from its Delta path.
df_trips = (
    spark.read
    .format("delta")
    .load("/delta/bronze_trips")
)

In [0]:
# Register the trips DataFrame as the temp view "trips" so SQL cells can query it.
df_trips.createOrReplaceTempView("trips")

In [0]:
# Read the bronze payments data from its Delta path.
df_payments = (
    spark.read
    .format("delta")
    .load("/delta/bronze_payments")
)

In [0]:
# Register the payments DataFrame as the temp view "payments" so SQL cells can query it.
df_payments.createOrReplaceTempView("payments")

In [0]:
# Create (or replace) the payments fact table at /delta/gold_fact_payment_details.
# - payment_id is carried over as fact_payment_details_key.
# - paymentdate_key formats payments.date as yyyyMMddHH, the same pattern used
#   for dim_date.datetime_key.
# NOTE(review): rider_id is read from the dim_riders side of the LEFT JOIN, so
# a payment whose rider is absent from dim_riders is kept but gets a NULL
# rider_id — confirm this is intended.
fact_payment_details_ctas = """
          CREATE OR REPLACE TABLE fact_payment_details
          USING DELTA LOCATION '/delta/gold_fact_payment_details'
          AS 
          SELECT 
            payment_id AS fact_payment_details_key,
            dim_riders.rider_id AS rider_id,
            bigint(date_format(payments.date, 'yyyyMMddHH')) AS paymentdate_key,
            payments.amount as payment_amount
            FROM payments
            LEFT JOIN dim_riders
            ON payments.rider_id = dim_riders.rider_id;
         """
spark.sql(fact_payment_details_ctas)

In [0]:
# Create (or replace) the trip fact table at /delta/gold_fact_trip_rider_details
# by LEFT JOINing trips to dim_riders on rider_id.
#
# - trip_rider_key is trip_id concatenated with the rider id. The rider part
#   prefers dim_riders.rider_id and falls back to trips.rider_id, so a trip
#   with no matching rider still gets a non-NULL key (CONCAT returns NULL if
#   any argument is NULL). Matched rows produce the same key as before.
# - rider_id itself still comes from dim_riders and stays NULL for unmatched
#   trips.
# - trip_start_key formats start_at as yyyyMMddHH, the same pattern used for
#   dim_date.datetime_key.
# - The two age columns are whole years: months_between(...) / 12 cast to INT.
# - trip_duration_seconds subtracts the BIGINT casts of ended_at and start_at;
#   presumably both are timestamps, giving epoch seconds — TODO confirm the
#   bronze column types.
spark.sql("""
          CREATE OR REPLACE TABLE fact_trip_rider_details
          USING DELTA LOCATION '/delta/gold_fact_trip_rider_details'
          AS
          SELECT
            CONCAT(
                trips.trip_id,
                COALESCE(
                    CAST(dim_riders.rider_id AS varchar(10)),
                    CAST(trips.rider_id AS varchar(10))
                )
            ) AS trip_rider_key,
            trips.start_station_id AS start_station_id,
            trips.end_station_id AS end_station_id,
            dim_riders.rider_id AS rider_id,
            BIGINT(date_format(trips.start_at, 'yyyyMMddHH')) AS trip_start_key,
            INT(months_between(trips.start_at, dim_riders.birthday) / 12) AS rider_age_at_trip_years,
            INT(months_between(dim_riders.account_start_date, dim_riders.birthday) / 12) AS rider_age_at_account_start_years,
            BIGINT(trips.ended_at) - BIGINT(trips.start_at) AS trip_duration_seconds
          FROM trips
          LEFT JOIN dim_riders
            ON trips.rider_id = dim_riders.rider_id
         """)