In [0]:
from pyspark.sql.functions import *
from pyspark.sql import functions as f
from pyspark.sql.types import TimestampType
from pyspark.sql.types import StringType

# Fact Payments

In [0]:
df_staging_payments = spark.table("staging_payments")

In [0]:
display(df_staging_payments.limit(5))

payment_id,date,amount,rider_id
1064462,2020-06-01,9.0,42106
1064463,2020-07-01,9.0,42106
1064464,2020-08-01,9.0,42106
1064465,2020-09-01,9.0,42106
1064466,2020-10-01,9.0,42106


In [0]:
spark.sql("DROP TABLE IF EXISTS fact_payments")

df_staging_payments.dropDuplicates(["payment_id"]).write.format("delta").mode("overwrite").saveAsTable("fact_payments")

In [0]:
fact_payments = spark.sql("SELECT * FROM fact_payments")

display(fact_payments.limit(5))

payment_id,date,amount,rider_id
819,2021-08-01,20.9,1034
822,2021-11-01,14.76,1034
927,2019-12-01,9.0,1038
982,2017-02-01,9.0,1040
1312,2017-11-01,9.0,1048


# Dim Riders

In [0]:
df_staging_riders = spark.table("staging_riders")

In [0]:
display(df_staging_riders.limit(5))

rider_id,first_name,last_name,address,birthday,account_start_date,account_end_date,is_member
57257,Mark,Mcfarland,9928 Hunter Ranch,1982-02-01,2020-12-05,,False
57258,Mark,Davis,20036 Barrett Summit Apt. 714,1963-07-28,2017-07-12,,True
57259,Bryan,Manning,089 Sarah Square,1984-11-05,2018-08-10,,True
57260,Michele,Rowe,3157 Nicole Ferry Apt. 826,1997-09-21,2016-06-03,,True
57261,John,Mckenzie,312 Jessica Wells,2002-10-13,2016-02-01,2018-07-01,True


In [0]:
spark.sql("DROP TABLE IF EXISTS dim_riders")

df_staging_riders.dropDuplicates(["rider_id"]).write.format("delta").mode("overwrite").saveAsTable("dim_riders")

In [0]:
dim_riders = spark.sql("SELECT * FROM dim_riders")

display(dim_riders.limit(5))

rider_id,first_name,last_name,address,birthday,account_start_date,account_end_date,is_member
1144,Michael,Chaney,7751 Sandra Dale,1980-09-02,2021-10-01,,True
1444,Monica,Banks,6753 Rhonda Prairie,1984-10-17,2021-07-30,,False
1461,Bill,Craig,078 Richards Shoal,2000-10-27,2019-02-05,,True
1994,Jonathan,Cain,85710 Scott Island,1994-03-23,2016-03-16,,True
2026,Cheryl,Rodriguez,0434 Sarah Highway Suite 180,1997-03-04,2018-08-07,,False



# Dim Stations

In [0]:
df_staging_stations = spark.table("staging_stations")

In [0]:
display(df_staging_stations.limit(5))

station_id,name,latitude,longitude
525,Glenwood Ave & Touhy Ave,42.012701,-87.66605799999999
KA1503000012,Clark St & Lake St,41.88579466666667,-87.63110066666668
637,Wood St & Chicago Ave,41.895634,-87.672069
13216,State St & 33rd St,41.8347335,-87.6258275
18003,Fairbanks St & Superior St,41.89580766666667,-87.62025316666669


In [0]:
spark.sql("DROP TABLE IF EXISTS dim_stations")

df_staging_stations.dropDuplicates(["station_id"]).write.format("delta").mode("overwrite").saveAsTable("dim_stations")

In [0]:
dim_stations = spark.sql("SELECT * FROM dim_stations")

display(dim_stations.limit(5))

station_id,name,latitude,longitude
13192,Halsted St & Dickens Ave,41.919936,-87.64883
KA1504000162,Clark St & Lunt Ave,42.009074,-87.67419
15634,Western Ave & Roscoe St,41.943093,-87.6873335
15539,Desplaines St & Jackson Blvd,41.878161166666665,-87.64428766666668
TA1305000022,Orleans St & Merchandise Mart Plaza,41.888243,-87.63639



# Fact Trips

In [0]:
df_staging_trips = spark.table("staging_trips")

In [0]:
display(df_staging_trips.limit(5))

trip_id,rideable_type,start_at,ended_at,start_station_id,end_station_id,rider_id
222BB8E5059252D7,classic_bike,2021-06-13T09:48:47Z,2021-06-13T10:07:23Z,KA1503000064,13021,34062
1826E16CB5486018,classic_bike,2021-06-21T22:59:13Z,2021-06-21T23:04:29Z,TA1306000010,13021,5342
3D9B6A0A5330B04D,classic_bike,2021-06-18T16:06:42Z,2021-06-18T16:12:02Z,TA1305000030,13021,3714
07E82F5E9C9E490F,classic_bike,2021-06-17T16:46:23Z,2021-06-17T17:02:45Z,TA1305000034,13021,18793
A8E94BAECBF0C2DD,docked_bike,2021-06-13T17:36:29Z,2021-06-13T18:30:39Z,TA1308000009,TA1308000009,43342


In [0]:
fact_trips = spark.sql('SELECT * FROM staging_trips') \
                        .join(df_staging_riders,on='rider_id') \
                        .withColumn('duration', round((unix_timestamp("ended_at") - unix_timestamp('start_at'))/60)) \
                        .withColumn('rider_age', round((unix_timestamp("account_start_date") - unix_timestamp('birthday'))/3600/24)) \
                        .select("trip_id", "rider_id", "rideable_type", "start_station_id", "end_station_id", "start_at", "ended_at", "duration", "rider_age")
            

In [0]:
spark.sql("DROP TABLE IF EXISTS fact_trips")

df_staging_trips.write.format("delta").mode("overwrite").saveAsTable("fact_trips")

In [0]:
fact_trips = spark.sql("SELECT * FROM fact_trips")

display(fact_trips.limit(5))

trip_id,rideable_type,start_at,ended_at,start_station_id,end_station_id,rider_id
F6F309843C09CAAC,classic_bike,2021-09-02T18:36:00Z,2021-09-02T18:48:43Z,TA1306000026,TA1307000041,70456
BD496FA19316E89C,classic_bike,2021-09-02T17:23:28Z,2021-09-02T17:32:43Z,TA1306000026,TA1309000019,24732
657E606A35206CC1,classic_bike,2021-09-05T23:13:43Z,2021-09-05T23:30:26Z,15642,TA1305000041,11875
B10DEEC4A5D90EB3,electric_bike,2021-09-09T18:26:00Z,2021-09-09T18:40:17Z,TA1307000001,TA1308000049,33064
268A00298A05078B,classic_bike,2021-09-06T18:49:27Z,2021-09-06T19:02:22Z,TA1306000026,TA1308000049,71976



# Dim Time

In [0]:
min_date = df_staging_trips.selectExpr('MIN(start_at) AS started_at').first().asDict()['started_at']
max_date = df_staging_trips.selectExpr('MAX(start_at) AS started_at').first().asDict()['started_at']

expression = f"sequence(to_date('{min_date}'), to_date('{max_date}'), interval 1 day)"
dim_times = spark.createDataFrame([(1,)], ["time_id"])

dim_times = dim_times.withColumn("dateinit", f.explode(f.expr(expression)))
dim_times = dim_times.withColumn("date", f.to_timestamp(dim_times.dateinit, "yyyy-MM-dd"))

dim_times = dim_times \
            .withColumn("day_of_month", f.dayofmonth(dim_times.date)) \
            .withColumn("day_of_week", f.dayofweek(dim_times.date)) \
            .withColumn("year", f.year(dim_times.date)) \
            .withColumn("month", f.month(dim_times.date)) \
            .withColumn("quarter", f.quarter(dim_times.date)) \
            .withColumn("week_of_year", f.weekofyear(dim_times.date)) \
            .withColumn("time_id", dim_times.date.cast(StringType())) \
            .drop(f.col("dateinit"))

In [0]:
spark.sql("DROP TABLE IF EXISTS dim_times")

dim_times.write.format("delta").mode("overwrite").saveAsTable("dim_times")

In [0]:
dim_times = spark.sql("SELECT * FROM dim_times")

display(dim_times.limit(5))

time_id,date,day_of_month,day_of_week,year,month,quarter,week_of_year
2021-02-01 00:00:00,2021-02-01T00:00:00Z,1,2,2021,2,1,5
2021-02-02 00:00:00,2021-02-02T00:00:00Z,2,3,2021,2,1,5
2021-02-03 00:00:00,2021-02-03T00:00:00Z,3,4,2021,2,1,5
2021-02-04 00:00:00,2021-02-04T00:00:00Z,4,5,2021,2,1,5
2021-02-05 00:00:00,2021-02-05T00:00:00Z,5,6,2021,2,1,5
