### Rider Dimension

In [0]:
# Rider dimension: project the descriptive rider attributes straight from the
# `riders` table, one output row per input row, in a fixed column order.
dim_rider = spark.table("riders").select(
    "rider_id",
    "firstName",
    "lastName",
    "address",
    "birthday",
    "startDate",
    "endDate",
    "isMember",
)

In [0]:
# Persist the rider dimension as the Delta table `dim_rider`, replacing its previous contents.
dim_rider.write.saveAsTable("dim_rider", format="delta", mode="overwrite")

In [0]:
%sql
-- Spot-check five rows of the rider dimension.
SELECT * FROM dim_rider LIMIT 5

rider_id,firstName,lastName,address,birthday,startDate,endDate,isMember
1000,Diana,Clark,1200 Alyssa Squares,1989-02-13T00:00:00.000+0000,2019-04-23T00:00:00.000+0000,,True
1001,Jennifer,Smith,397 Diana Ferry,1976-08-10T00:00:00.000+0000,2019-11-01T00:00:00.000+0000,2020-09-01T00:00:00.000+0000,True
1002,Karen,Smith,644 Brittany Row Apt. 097,1998-08-10T00:00:00.000+0000,2022-02-04T00:00:00.000+0000,,True
1003,Bryan,Roberts,996 Dickerson Turnpike,1999-03-29T00:00:00.000+0000,2019-08-26T00:00:00.000+0000,,False
1004,Jesse,Middleton,7009 Nathan Expressway,1969-04-11T00:00:00.000+0000,2019-09-14T00:00:00.000+0000,,True


### Station Dimension

In [0]:
# Station dimension: station key, display name and coordinates from `stations`.
dim_station = spark.table("stations").select(
    "station_id", "name", "latitude", "longitude"
)

In [0]:
# Persist the station dimension as the Delta table `dim_station`, replacing its previous contents.
dim_station.write.saveAsTable("dim_station", format="delta", mode="overwrite")

In [0]:
%sql
-- Spot-check five rows of the station dimension.
SELECT * FROM dim_station LIMIT 5

station_id,name,latitude,longitude
525,Glenwood Ave & Touhy Ave,42.012701,-87.66605799999999
KA1503000012,Clark St & Lake St,41.88579466666667,-87.63110066666668
637,Wood St & Chicago Ave,41.895634,-87.672069
13216,State St & 33rd St,41.8347335,-87.6258275
18003,Fairbanks St & Superior St,41.89580766666667,-87.62025316666669


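### Time Dimension

One row per calendar day, from the date of the earliest trip start through five years after the latest trip start.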

In [0]:
# Calendar bounds: the earliest trip start, and the latest trip start pushed
# five years forward so the calendar also covers dates after the last trip.
min_date_qry = spark.sql("SELECT MIN(started_at) AS started_at FROM trips")
max_date_qry = spark.sql("SELECT DATEADD(year, 5, MAX(started_at)) AS started_at FROM trips")

min_date = min_date_qry.first()["started_at"]
max_date = max_date_qry.first()["started_at"]

# Spark SQL expression yielding an array with one DATE per day in [min_date, max_date].
expression = "sequence(to_date('{}'), to_date('{}'), interval 1 day)".format(min_date, max_date)

In [0]:
from pyspark.sql import functions as f
from pyspark.sql.types import StringType


# Seed a single-row frame, explode the day sequence from `expression` into one
# row per calendar day, then derive the calendar attributes from each day.
dim_time = (
    spark.createDataFrame([(1,)], ["time_id"])
    .select("time_id", f.explode(f.expr(expression)).alias("dateinit"))
    .withColumn("date", f.to_timestamp(f.col("dateinit"), "yyyy-MM-dd"))
    .withColumn("year", f.year("date"))
    .withColumn("quarter", f.quarter("date"))
    .withColumn("month", f.month("date"))
    # NOTE: `day` holds the day of the WEEK (1 = Sunday ... 7 = Saturday),
    # not the day of the month.
    .withColumn("day", f.dayofweek("date"))
    # Key: the midnight timestamp rendered as a string, e.g. "2021-02-01 00:00:00".
    .withColumn("time_id", f.col("date").cast(StringType()))
    .drop("dateinit")
)

dim_time.show(5)

In [0]:
# Persist the calendar as the Delta table `dim_time`, replacing its previous contents.
dim_time.write.saveAsTable("dim_time", format="delta", mode="overwrite")

In [0]:
%sql
-- Spot-check five rows of the time dimension.
SELECT * FROM dim_time LIMIT 5

time_id,date,year,quarter,month,day
2021-02-01 00:00:00,2021-02-01T00:00:00.000+0000,2021,1,2,2
2021-02-02 00:00:00,2021-02-02T00:00:00.000+0000,2021,1,2,3
2021-02-03 00:00:00,2021-02-03T00:00:00.000+0000,2021,1,2,4
2021-02-04 00:00:00,2021-02-04T00:00:00.000+0000,2021,1,2,5
2021-02-05 00:00:00,2021-02-05T00:00:00.000+0000,2021,1,2,6


### Trip Fact

In [0]:
# Trip fact: one row per trip, keyed to the rider, station and time dimensions.
#
# dim_time has one row per calendar day whose `date` is that day at midnight, so
# the trip timestamps are truncated to the day before joining. Comparing the raw
# timestamps would only match trips starting/ending exactly at 00:00:00, and the
# inner joins would silently drop every other trip.
fact_trip = spark.sql('''
    SELECT
        trips.trip_id,
        riders.rider_id,
        trips.start_station_id,
        trips.end_station_id,
        start_time.time_id                                  AS start_time_id,
        end_time.time_id                                    AS end_time_id,
        trips.rideable_type,
        -- NOTE(review): DATEDIFF counts whole units, so trips shorter than an
        -- hour get duration 0; confirm hours is the intended unit.
        DATEDIFF(hour, trips.started_at, trips.ended_at)    AS duration,
        DATEDIFF(year, riders.birthday, trips.started_at)   AS rider_age
    FROM trips
    JOIN riders                 ON riders.rider_id = trips.rider_id
    JOIN dim_time AS start_time ON start_time.date = DATE_TRUNC('DAY', trips.started_at)
    JOIN dim_time AS end_time   ON end_time.date   = DATE_TRUNC('DAY', trips.ended_at)
''')

In [0]:
# Persist the trip fact as the Delta table `fact_trip`, replacing its previous contents.
fact_trip.write.saveAsTable("fact_trip", format="delta", mode="overwrite")

### Payment Fact

In [0]:
# Payment fact: one row per payment, keyed to the rider and the payment day.
# NOTE(review): the equality join assumes payments.date is day-granular (a DATE
# or a midnight timestamp); if it carries a time of day, truncate it with
# DATE_TRUNC('DAY', ...) before joining, or those payments will be dropped.
fact_payment = spark.sql('''
    SELECT
        p.payment_id,
        p.amount,
        p.rider_id,
        t.time_id
    FROM payments AS p
    INNER JOIN dim_time AS t
        ON t.date = p.date
''')

In [0]:
# Persist the payment fact as the Delta table `fact_payment`, replacing its previous contents.
fact_payment.write.saveAsTable("fact_payment", format="delta", mode="overwrite")