In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import StringType

In [0]:
# Load the staged payments data from the metastore.
# NOTE(review): the table name is spelled "paymets"; kept as-is because it has to
# match the existing staging table — confirm the spelling upstream.
payments = spark.read.table("default.staging_paymets")

In [0]:
# Preview the first rows of the staged payments data.
display(payments.limit(10))

# Print the column names and types. printSchema is a method: without the call
# parentheses the expression only references the bound method and prints nothing.
payments.printSchema()

payment_id,date,amount,rider_id
2,2019-06-01,9.0,1000
3,2019-07-01,9.0,1000
4,2019-08-01,9.0,1000
5,2019-09-01,9.0,1000
6,2019-10-01,9.0,1000
7,2019-11-01,9.0,1000
8,2019-12-01,9.0,1000
9,2020-01-01,9.0,1000
10,2020-02-01,9.0,1000
11,2020-03-01,9.0,1000


In [0]:
# Drop any existing fact_payments table (no-op when it does not exist).
spark.sql("DROP TABLE IF EXISTS fact_payments")

In [0]:
# Persist the payments data as the fact_payments Delta table, replacing prior contents.
(
    payments.write
    .format("delta")
    .mode("overwrite")
    .saveAsTable("fact_payments")
)

In [0]:
# Load the staged trip and rider tables used to build the trip fact and rider dimension.
trips = spark.read.table("default.staging_trips")
riders = spark.read.table("default.staging_riders")

In [0]:
# Build the trip fact: every trip matched to its rider, plus two derived measures.
# `round` here is the Spark SQL function pulled in by the star import, not the builtin.
rider_match = trips.rider_id == riders.rider_id

# Years between birthday and account_start_date, using 365.25-day years.
# NOTE(review): round() yields the nearest whole year, not completed years — confirm intent.
age_years = round(datediff(to_date("account_start_date"), to_date("birthday")) / 365.25)

# Trip length in minutes, rounded to the nearest whole minute.
trip_minutes = round((unix_timestamp("ended_at") - unix_timestamp("started_at")) / 60)

fact_trips = (
    trips.join(riders, rider_match, "inner")
    .withColumn("age", age_years)
    .withColumn("duration", trip_minutes)
    .select(
        "trip_id",
        riders.rider_id,  # both inputs carry rider_id; take the riders side explicitly
        "rideable_type",
        "started_at",
        "ended_at",
        "start_station_id",
        "end_station_id",
        "age",
        "duration",
    )
)
display(fact_trips.limit(10))

trip_id,rider_id,rideable_type,started_at,ended_at,start_station_id,end_station_id,age,duration
222BB8E5059252D7,34062,classic_bike,2021-06-13 09:48:47,2021-06-13 10:07:23,KA1503000064,13021,30.0,19.0
1826E16CB5486018,5342,classic_bike,2021-06-21 22:59:13,2021-06-21 23:04:29,TA1306000010,13021,25.0,5.0
3D9B6A0A5330B04D,3714,classic_bike,2021-06-18 16:06:42,2021-06-18 16:12:02,TA1305000030,13021,26.0,5.0
07E82F5E9C9E490F,18793,classic_bike,2021-06-17 16:46:23,2021-06-17 17:02:45,TA1305000034,13021,18.0,16.0
A8E94BAECBF0C2DD,43342,docked_bike,2021-06-13 17:36:29,2021-06-13 18:30:39,TA1308000009,TA1308000009,28.0,54.0
378F4AB323AA1D14,6693,docked_bike,2021-06-13 13:20:10,2021-06-13 14:06:14,TA1308000009,TA1308000009,28.0,46.0
38AD311DC2EB1FBE,71480,docked_bike,2021-06-16 17:14:30,2021-06-16 17:28:34,KA1503000019,KA1503000019,56.0,14.0
1D466737F0B18097,50846,docked_bike,2021-06-27 14:51:52,2021-06-27 15:26:39,TA1308000009,TA1308000009,40.0,35.0
27E1142E1ACFAEFB,18951,electric_bike,2021-06-21 13:58:26,2021-06-21 13:58:53,13257,13257,20.0,0.0
67F2A115DAE77924,63987,classic_bike,2021-06-22 00:51:43,2021-06-22 01:08:25,TA1308000009,TA1308000009,37.0,17.0


In [0]:
# Drop any previous fact_trips table first, as is done for every other table in
# this notebook, so a changed column set or type does not make the Delta
# overwrite fail on a schema mismatch.
spark.sql("DROP TABLE IF EXISTS fact_trips")

# Persist the trip fact as a Delta table.
fact_trips.write.format("delta").mode("overwrite").saveAsTable("fact_trips")

In [0]:
# Load the staged stations table; it is written out unchanged as the station dimension.
stations = spark.read.table("default.staging_stations")

In [0]:
# Preview the first ten station rows.
stations_preview = stations.limit(10)
display(stations_preview)

station_id,name,latitude,longtitude
KA1503000012,Clark St & Lake St,41.88579466666667,-87.63110066666668
637,Wood St & Chicago Ave,41.895634,-87.672069
13216,State St & 33rd St,41.8347335,-87.6258275
18003,Fairbanks St & Superior St,41.89580766666667,-87.62025316666669
KP1705001026,LaSalle Dr & Huron St,41.894877,-87.632326
13253,Lincoln Ave & Waveland Ave,41.948797,-87.675278
KA1503000044,Rush St & Hubbard St,41.890173,-87.62618499999999
KA1504000140,Winchester Ave & Elston Ave,41.92403733333333,-87.67641483333334
TA1305000032,Clinton St & Madison St,41.882242,-87.64106600000001
TA1306000012,Wells St & Huron St,41.89475366666667,-87.63440200000001


In [0]:
# Drop any existing dim_stations table (no-op when it does not exist).
spark.sql("DROP TABLE IF EXISTS dim_stations")

In [0]:
# Persist the stations data as the dim_stations Delta table, replacing prior contents.
(
    stations.write
    .format("delta")
    .mode("overwrite")
    .saveAsTable("dim_stations")
)

In [0]:
# Drop any existing dim_riders table (no-op when it does not exist).
spark.sql("DROP TABLE IF EXISTS dim_riders")

In [0]:
# Persist the riders data as the dim_riders Delta table, replacing prior contents.
(
    riders.write
    .format("delta")
    .mode("overwrite")
    .saveAsTable("dim_riders")
)

In [0]:
# Bare expression: the notebook shows the trips DataFrame's repr as this cell's output.
# NOTE(review): looks like a leftover inspection cell — confirm whether it is still needed.
trips

In [0]:
# Find the earliest trip start and latest trip end in ONE aggregation pass
# (previously two separate scans of `trips`). A global aggregate always returns
# exactly one row, so first() is safe here.
date_bounds = trips.selectExpr(
    "MIN(started_at) AS started_at",
    "MAX(ended_at) AS ended_at",
).first()
min_date = date_bounds["started_at"]
max_date = date_bounds["ended_at"]

# Single-row seed frame. Its placeholder time_id column is overwritten later,
# when the calendar attributes are derived, but it fixes time_id as the first column.
dim_date = spark.createDataFrame([(1,)], ["time_id"])

# sequence() builds an array with one entry per day between the two bounds
# (inclusive); explode() turns that array into one row per day.
# NOTE(review): the range comes from trips only. The payments preview shows
# dates from 2019, earlier than the first dim_date row (2021-02-01), so those
# payment dates have no matching dim_date row — confirm whether the range
# should also cover payments.
dim_date = dim_date.withColumn(
    "initial_date",
    explode(expr(f"sequence(to_date('{min_date}'), to_date('{max_date}'), interval 1 day)")),
)
# Keep a timestamp-typed copy of each day for deriving the calendar columns.
dim_date = dim_date.withColumn("date", to_timestamp(dim_date.initial_date))

In [0]:
# Derive the calendar attributes from each day's timestamp.
day_ts = col("date")

dim_date = (
    dim_date
    .withColumn("day", dayofmonth(day_ts))
    .withColumn("month", month(day_ts))
    .withColumn("quarter", quarter(day_ts))
    .withColumn("year", year(day_ts))
    .withColumn("day_of_year", dayofyear(day_ts))
    # dayofweek numbers Sunday as 1 through Saturday as 7.
    .withColumn("day_of_week", dayofweek(day_ts))
    # Replace the placeholder key with the timestamp rendered as a string.
    .withColumn("time_id", day_ts.cast(StringType()))
    # The raw exploded date is no longer needed once `date` exists.
    .drop("initial_date")
)

display(dim_date.limit(10))

time_id,date,day,month,quarter,year,day_of_year,day_of_week
2021-02-01 00:00:00,2021-02-01T00:00:00.000+0000,1,2,1,2021,32,2
2021-02-02 00:00:00,2021-02-02T00:00:00.000+0000,2,2,1,2021,33,3
2021-02-03 00:00:00,2021-02-03T00:00:00.000+0000,3,2,1,2021,34,4
2021-02-04 00:00:00,2021-02-04T00:00:00.000+0000,4,2,1,2021,35,5
2021-02-05 00:00:00,2021-02-05T00:00:00.000+0000,5,2,1,2021,36,6
2021-02-06 00:00:00,2021-02-06T00:00:00.000+0000,6,2,1,2021,37,7
2021-02-07 00:00:00,2021-02-07T00:00:00.000+0000,7,2,1,2021,38,1
2021-02-08 00:00:00,2021-02-08T00:00:00.000+0000,8,2,1,2021,39,2
2021-02-09 00:00:00,2021-02-09T00:00:00.000+0000,9,2,1,2021,40,3
2021-02-10 00:00:00,2021-02-10T00:00:00.000+0000,10,2,1,2021,41,4


In [0]:
# Drop any existing dim_date table (no-op when it does not exist).
spark.sql("DROP TABLE IF EXISTS dim_date")

In [0]:
# Persist the date dimension as the dim_date Delta table, replacing prior contents.
(
    dim_date.write
    .format("delta")
    .mode("overwrite")
    .saveAsTable("dim_date")
)