In [0]:
# Date range for dim_time: from the earliest trip start up to five years after
# the latest trip start.
min_date_qry = spark.sql('''
    SELECT MIN(StartTimeId) as StartTimeId FROM trips
''')
min_date = min_date_qry.first()['StartTimeId']

max_date_qry = spark.sql('''
    SELECT DATEADD(year, 5, MAX(StartTimeId)) as StartTimeId FROM trips
''')
max_date = max_date_qry.first()['StartTimeId']

# MIN/MAX over an empty (or all-NULL) column return NULL. Interpolating None
# below would produce to_date('None') -> a NULL/invalid sequence and a silently
# empty dim_time (and therefore empty fact joins), so fail loudly instead.
if min_date is None or max_date is None:
    raise ValueError(
        "trips.StartTimeId has no non-NULL values; cannot derive the dim_time date range"
    )

# Spark SQL expression producing one DATE per day in [min_date, max_date];
# it is evaluated with f.expr() when dim_time is built.
expression = f"sequence(to_date('{min_date}'), to_date('{max_date}'), interval 1 day)"

In [0]:
from pyspark.sql import functions as f
from pyspark.sql.types import StringType


# Build the date dimension: one row per calendar day generated by `expression`
# (min trip start date .. max trip start date + 5 years).
dim_time = (
    # Single seed row. Its placeholder TimeId is replaced at the end, which keeps
    # TimeId as the first column of the table.
    spark.createDataFrame([(1,)], ["TimeId"])
    # explode() turns the generated array of DATEs into one row per day.
    .withColumn("dateinit", f.explode(f.expr(expression)))
    .withColumn("Date", f.to_timestamp(f.col("dateinit"), "yyyy-MM-dd"))
    .withColumn("Year", f.year(f.col("Date")))
    .withColumn("Quarter", f.quarter(f.col("Date")))
    .withColumn("Month", f.month(f.col("Date")))
    # NOTE: dayofweek() returns 1 (Sunday) .. 7 (Saturday), so "Day" is the
    # day of the week, not the day of the month.
    .withColumn("Day", f.dayofweek(f.col("Date")))
    # TimeId is the timestamp rendered as a string (e.g. "2021-02-01 00:00:00");
    # the fact tables copy this value as their time key.
    .withColumn("TimeId", f.col("Date").cast(StringType()))
)

dim_time.show(5)

+-------------------+----------+-------------------+----+-------+-----+---+
|             TimeId|  dateinit|               Date|Year|Quarter|Month|Day|
+-------------------+----------+-------------------+----+-------+-----+---+
|2021-02-01 00:00:00|2021-02-01|2021-02-01 00:00:00|2021|      1|    2|  2|
|2021-02-02 00:00:00|2021-02-02|2021-02-02 00:00:00|2021|      1|    2|  3|
|2021-02-03 00:00:00|2021-02-03|2021-02-03 00:00:00|2021|      1|    2|  4|
|2021-02-04 00:00:00|2021-02-04|2021-02-04 00:00:00|2021|      1|    2|  5|
|2021-02-05 00:00:00|2021-02-05|2021-02-05 00:00:00|2021|      1|    2|  6|
+-------------------+----------+-------------------+----+-------+-----+---+
only showing top 5 rows



In [0]:
# Sanity check: the latest year in the dimension should be ~5 years past the last trip.
dim_time.agg(dict(year="max")).show()

+---------+
|max(year)|
+---------+
|     2027|
+---------+



In [0]:
# Persist the date dimension as a Delta table, replacing it on every run.
# overwriteSchema (as already used for dim_rider) lets the stored schema follow
# the DataFrame. Without it, Delta keeps the existing table schema on overwrite
# and rejects incompatible changes.
# NOTE(review): the preview of this table shows a column named `date` while the
# DataFrame has `Date`, which suggests an older schema was being retained.
dim_time.write.format("delta") \
    .option("overwriteSchema", "true") \
    .mode("overwrite") \
    .saveAsTable("dim_time")

In [0]:
%sql
-- Spot-check the persisted table (LIMIT without ORDER BY: rows are arbitrary).
SELECT *
FROM dim_time
LIMIT 5

TimeId,dateinit,date,Year,Quarter,Month,Day
2021-02-01 00:00:00,2021-02-01,2021-02-01T00:00:00.000+0000,2021,1,2,2
2021-02-02 00:00:00,2021-02-02,2021-02-02T00:00:00.000+0000,2021,1,2,3
2021-02-03 00:00:00,2021-02-03,2021-02-03T00:00:00.000+0000,2021,1,2,4
2021-02-04 00:00:00,2021-02-04,2021-02-04T00:00:00.000+0000,2021,1,2,5
2021-02-05 00:00:00,2021-02-05,2021-02-05T00:00:00.000+0000,2021,1,2,6


In [0]:
# Print the dim_time schema: TimeId is a string, dateinit a date, Date a
# timestamp, and the calendar parts are integers.
dim_time.printSchema()

root
 |-- TimeId: string (nullable = true)
 |-- dateinit: date (nullable = false)
 |-- Date: timestamp (nullable = true)
 |-- Year: integer (nullable = true)
 |-- Quarter: integer (nullable = true)
 |-- Month: integer (nullable = true)
 |-- Day: integer (nullable = true)



In [0]:
# Payment fact table: each payment tagged with the dim_time key (TimeId) of its
# date. This is an inner JOIN, so payments whose date has no dim_time row are dropped.
fact_payment = spark.sql(sqlQuery='''
    SELECT
        PaymentId,
        payments.Amount,
        payments.RiderId,
        dim_time.TimeId
    FROM payments
    JOIN dim_time ON dim_time.date = payments.date
''')

In [0]:
# Preview the first 20 payment facts.
fact_payment.show(n=20)

+---------+------+-------+-------------------+
|PaymentId|Amount|RiderId|             TimeId|
+---------+------+-------+-------------------+
|       22|   9.0|   1000|2021-02-01 00:00:00|
|       23|   9.0|   1000|2021-03-01 00:00:00|
|       24|   9.0|   1000|2021-04-01 00:00:00|
|       25|   9.0|   1000|2021-05-01 00:00:00|
|       26|   9.0|   1000|2021-06-01 00:00:00|
|       27|   9.0|   1000|2021-07-01 00:00:00|
|       28|   9.0|   1000|2021-08-01 00:00:00|
|       29|   9.0|   1000|2021-09-01 00:00:00|
|       30|   9.0|   1000|2021-10-01 00:00:00|
|       31|   9.0|   1000|2021-11-01 00:00:00|
|       32|   9.0|   1000|2021-12-01 00:00:00|
|       33|   9.0|   1000|2022-01-01 00:00:00|
|       34|   9.0|   1000|2022-02-01 00:00:00|
|       62|  6.56|   1003|2021-02-01 00:00:00|
|       63|   7.4|   1003|2021-03-01 00:00:00|
|       64| 18.31|   1003|2021-04-01 00:00:00|
|       65| 14.48|   1003|2021-05-01 00:00:00|
|       66|   6.5|   1003|2021-06-01 00:00:00|
|       67| 1

In [0]:
# Persist the payment facts as a Delta table, replacing any previous contents.
fact_payment.write.saveAsTable("fact_payment", format="delta", mode="overwrite")

In [0]:
%sql
-- Spot-check the persisted table (LIMIT without ORDER BY: rows are arbitrary).
SELECT *
FROM fact_payment
LIMIT 10

PaymentId,Amount,RiderId,TimeId
22,9.0,1000,2021-02-01 00:00:00
23,9.0,1000,2021-03-01 00:00:00
24,9.0,1000,2021-04-01 00:00:00
25,9.0,1000,2021-05-01 00:00:00
26,9.0,1000,2021-06-01 00:00:00
27,9.0,1000,2021-07-01 00:00:00
28,9.0,1000,2021-08-01 00:00:00
29,9.0,1000,2021-09-01 00:00:00
30,9.0,1000,2021-10-01 00:00:00
31,9.0,1000,2021-11-01 00:00:00


In [0]:
# Trip fact table: trips inner-joined to riders and to dim_time (once for the
# start date, once for the end date), carrying dim_time TimeId keys plus
# derived measures.
# NOTE(review): DATEDIFF(hour, ...) returns whole hours with the fraction
# truncated, so sub-hour trips get Duration = 0 (as in the preview below);
# confirm hours is the intended grain.
# RiderAge is the whole number of years between BirthDay and the trip start.
fact_trip = spark.sql(sqlQuery='''
    SELECT 
        trips.TripId,
        riders.RiderId,
        trips.StartStationId, 
        trips.EndStationId, 
        StartTime.TimeId    AS StartTimeId,
        EndTime.TimeId      AS EndTimeId,
        trips.Type,
        DATEDIFF(hour, trips.StartTimeId, trips.EndTimeId)    AS Duration,
        DATEDIFF(year, riders.BirthDay, trips.StartTimeId)  AS RiderAge
    FROM trips JOIN riders ON riders.RiderId = trips.RiderId
    JOIN dim_time AS StartTime ON StartTime.Date = DATE_FORMAT(trips.StartTimeId, 'yyyy-MM-dd')
    JOIN dim_time AS EndTime ON EndTime.Date = DATE_FORMAT(trips.EndTimeId, 'yyyy-MM-dd')
''')

In [0]:
# Preview the first 20 trip facts.
fact_trip.show(n=20)

+----------------+-------+--------------+------------+-------------------+-------------------+-------------+--------+--------+
|          TripId|RiderId|StartStationId|EndStationId|        StartTimeId|          EndTimeId|         Type|Duration|RiderAge|
+----------------+-------+--------------+------------+-------------------+-------------------+-------------+--------+--------+
|222BB8E5059252D7|  34062|  KA1503000064|       13021|2021-06-13 00:00:00|2021-06-13 00:00:00| classic_bike|       0|      30|
|1826E16CB5486018|   5342|  TA1306000010|       13021|2021-06-21 00:00:00|2021-06-21 00:00:00| classic_bike|       0|      26|
|3D9B6A0A5330B04D|   3714|  TA1305000030|       13021|2021-06-18 00:00:00|2021-06-18 00:00:00| classic_bike|       0|      26|
|07E82F5E9C9E490F|  18793|  TA1305000034|       13021|2021-06-17 00:00:00|2021-06-17 00:00:00| classic_bike|       0|      18|
|A8E94BAECBF0C2DD|  43342|  TA1308000009|TA1308000009|2021-06-13 00:00:00|2021-06-13 00:00:00|  docked_bike|   

In [0]:
# Persist the trip facts as a Delta table, replacing any previous contents.
fact_trip.write.saveAsTable("fact_trip", format="delta", mode="overwrite")

In [0]:
%sql
-- Spot-check the persisted table (LIMIT without ORDER BY: rows are arbitrary).
SELECT *
FROM fact_trip
LIMIT 5


TripId,RiderId,StartStationId,EndStationId,StartTimeId,EndTimeId,Type,Duration,RiderAge
89E7AA6C29227EFF,71934,525,660,2021-02-12 00:00:00,2021-02-12 00:00:00,classic_bike,0,37
0FEFDE2603568365,47854,525,16806,2021-02-14 00:00:00,2021-02-14 00:00:00,classic_bike,0,38
E6159D746B2DBB91,70870,KA1503000012,TA1305000029,2021-02-09 00:00:00,2021-02-09 00:00:00,electric_bike,0,33
B32D3199F1C2E75B,58974,637,TA1305000034,2021-02-02 00:00:00,2021-02-02 00:00:00,classic_bike,0,19
83E463F23575F4BF,39608,13216,TA1309000055,2021-02-23 00:00:00,2021-02-23 00:00:00,electric_bike,0,71


In [0]:
# Rider dimension: descriptive rider attributes taken as-is from `riders`.
dim_rider = spark.sql(sqlQuery='''
    SELECT 
        RiderId, 
        FirstName, 
        LastName, 
        Address, 
        BirthDay, 
        StartDate, 
        EndDate, 
        IsMember
    FROM riders
''')

In [0]:
# Preview two riders.
dim_rider.show(n=2)

+-------+---------+--------+-------------------+----------+----------+----------+--------+
|RiderId|FirstName|LastName|            Address|  BirthDay| StartDate|   EndDate|IsMember|
+-------+---------+--------+-------------------+----------+----------+----------+--------+
|   1000|    Diana|   Clark|1200 Alyssa Squares|1989-02-13|2019-04-23|      null|    true|
|   1001| Jennifer|   Smith|    397 Diana Ferry|1976-08-10|2019-11-01|2020-09-01|    true|
+-------+---------+--------+-------------------+----------+----------+----------+--------+
only showing top 2 rows



In [0]:
# Persist the rider dimension as a Delta table; overwriteSchema lets the stored
# schema be replaced by the DataFrame's schema on each full refresh.
dim_rider.write.saveAsTable(
    "dim_rider",
    format="delta",
    mode="overwrite",
    overwriteSchema="true",
)

In [0]:
%sql
-- Spot-check the persisted table (LIMIT without ORDER BY: rows are arbitrary).
SELECT *
FROM dim_rider
LIMIT 5

RiderId,FirstName,LastName,Address,BirthDay,StartDate,EndDate,IsMember
1000,Diana,Clark,1200 Alyssa Squares,1989-02-13,2019-04-23,,True
1001,Jennifer,Smith,397 Diana Ferry,1976-08-10,2019-11-01,2020-09-01,True
1002,Karen,Smith,644 Brittany Row Apt. 097,1998-08-10,2022-02-04,,True
1003,Bryan,Roberts,996 Dickerson Turnpike,1999-03-29,2019-08-26,,False
1004,Jesse,Middleton,7009 Nathan Expressway,1969-04-11,2019-09-14,,True


In [0]:
# Station dimension: station id, name and coordinates from `stations`.
dim_station = spark.sql(sqlQuery="""
    SELECT 
        StationId, Name, Latitude, Longitude 
    FROM stations
""")

In [0]:
# Preview two stations.
dim_station.show(n=2)

+------------+--------------------+-----------------+------------------+
|   StationId|                Name|         Latitude|         Longitude|
+------------+--------------------+-----------------+------------------+
|         525|Glenwood Ave & To...|        42.012701|-87.66605799999999|
|KA1503000012|  Clark St & Lake St|41.88579466666667|-87.63110066666668|
+------------+--------------------+-----------------+------------------+
only showing top 2 rows



In [0]:
# Persist the station dimension as a Delta table, replacing any previous contents.
dim_station.write.saveAsTable("dim_station", format="delta", mode="overwrite")

In [0]:
%sql
-- Spot-check the persisted table (LIMIT without ORDER BY: rows are arbitrary).
SELECT *
FROM dim_station
LIMIT 2

StationId,Name,Latitude,Longitude
525,Glenwood Ave & Touhy Ave,42.012701,-87.66605799999999
KA1503000012,Clark St & Lake St,41.88579466666667,-87.63110066666668


In [0]:
# This cell used to repeat the fact_payment query verbatim (same columns, same
# payments JOIN dim_time on date). Reuse that DataFrame so the two tables cannot
# silently drift apart when one query is edited.
# NOTE(review): payments are measures keyed by rider and time, so a separate
# "dim_payment" table duplicates fact_payment; consider dropping it.
dim_payment = fact_payment

In [0]:
# Preview two payment rows.
dim_payment.show(n=2)

+---------+------+-------+-------------------+
|PaymentId|Amount|RiderId|             TimeId|
+---------+------+-------+-------------------+
|       22|   9.0|   1000|2021-02-01 00:00:00|
|       23|   9.0|   1000|2021-03-01 00:00:00|
+---------+------+-------+-------------------+
only showing top 2 rows



In [0]:
# Persist dim_payment as a Delta table, replacing any previous contents.
dim_payment.write.saveAsTable("dim_payment", format="delta", mode="overwrite")

In [0]:
%sql
-- Spot-check the persisted table (LIMIT without ORDER BY: rows are arbitrary).
SELECT *
FROM dim_payment
LIMIT 2

PaymentId,Amount,RiderId,TimeId
22,9.0,1000,2021-02-01 00:00:00
23,9.0,1000,2021-03-01 00:00:00
