In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import StringType



In [0]:
payments = spark.table("default.staging_paymets")

In [0]:
display(payments.limit(10))

payments.printSchema

payment_id,date,amount,rider_id
2,2019-06-01,9.0,1000
3,2019-07-01,9.0,1000
4,2019-08-01,9.0,1000
5,2019-09-01,9.0,1000
6,2019-10-01,9.0,1000
7,2019-11-01,9.0,1000
8,2019-12-01,9.0,1000
9,2020-01-01,9.0,1000
10,2020-02-01,9.0,1000
11,2020-03-01,9.0,1000


Out[8]: <bound method DataFrame.printSchema of DataFrame[payment_id: string, date: string, amount: string, rider_id: string]>

In [0]:
spark.sql("DROP TABLE IF EXISTS fact_payments")

Out[9]: DataFrame[]

In [0]:
payments.write.format("delta").mode("overwrite").saveAsTable("fact_payments")

In [0]:
trips = spark.table("default.staging_trips")
riders = spark.table("default.staging_riders")



In [0]:
fact_trips = trips.join(riders, trips.rider_id == riders.rider_id,"inner") \
.withColumn('age', round(datediff( to_date("account_start_date"), to_date("birthday"))/365.25)) \
.withColumn('duration', round((unix_timestamp("ended_at") - unix_timestamp("started_at"))/60)) \
.select("trip_id", riders.rider_id, "rideable_type", "started_at", "ended_at", "start_station_id", "end_station_id", "age", "duration")
display(fact_trips.limit(10))

trip_id,rider_id,rideable_type,started_at,ended_at,start_station_id,end_station_id,age,duration
9506E0F317DE3528,47823,classic_bike,2021-08-06 22:49:29,2021-08-06 22:55:16,13146,TA1309000024,28.0,6.0
02F52EC184442971,71777,electric_bike,2021-08-23 09:51:29,2021-08-23 10:00:29,13146,TA1307000151,33.0,9.0
B47FF3EEB2039C65,39370,electric_bike,2021-08-24 11:02:39,2021-08-24 11:05:40,13206,13158,32.0,3.0
CA6B1F0DE566112F,14990,docked_bike,2021-08-12 12:03:00,2021-08-12 12:21:01,13430,TA1307000151,39.0,18.0
D97271CCFC2B5076,69717,classic_bike,2021-08-05 23:07:41,2021-08-05 23:13:31,13146,TA1309000024,17.0,6.0
7FFF0C8B55E9EAE8,67920,classic_bike,2021-08-29 15:11:37,2021-08-29 15:29:23,13146,13278,18.0,18.0
7CAE020D54FC25FC,74623,classic_bike,2021-08-02 22:23:44,2021-08-02 22:41:23,13146,13278,27.0,18.0
8F6B49548B56B371,42281,classic_bike,2021-08-22 01:49:26,2021-08-22 01:56:02,13146,TA1309000024,17.0,7.0
4EB138A939033459,9166,classic_bike,2021-08-18 16:59:10,2021-08-18 17:26:20,13146,13158,21.0,27.0
AF621584FA0FFDCA,24637,classic_bike,2021-08-28 19:52:49,2021-08-28 19:59:13,13146,TA1309000024,28.0,6.0


In [0]:
fact_trips.write.format("delta").mode("overwrite").saveAsTable("fact_trips")

In [0]:
stations = spark.table("default.staging_stations")

In [0]:
display(stations.limit(10))

station_id,name,latitude,longtitude
KA1503000012,Clark St & Lake St,41.88579466666667,-87.63110066666668
637,Wood St & Chicago Ave,41.895634,-87.672069
13216,State St & 33rd St,41.8347335,-87.6258275
18003,Fairbanks St & Superior St,41.89580766666667,-87.62025316666669
KP1705001026,LaSalle Dr & Huron St,41.894877,-87.632326
13253,Lincoln Ave & Waveland Ave,41.948797,-87.675278
KA1503000044,Rush St & Hubbard St,41.890173,-87.62618499999999
KA1504000140,Winchester Ave & Elston Ave,41.92403733333333,-87.67641483333334
TA1305000032,Clinton St & Madison St,41.882242,-87.64106600000001
TA1306000012,Wells St & Huron St,41.89475366666667,-87.63440200000001


In [0]:
spark.sql("DROP TABLE IF EXISTS dim_stations")

Out[16]: DataFrame[]

In [0]:
stations.write.format("delta").mode("overwrite").saveAsTable("dim_stations")

In [0]:
spark.sql("DROP TABLE IF EXISTS dim_riders")

Out[18]: DataFrame[]

In [0]:
riders.write.format("delta").mode("overwrite").saveAsTable("dim_riders")

In [0]:
trips

Out[20]: DataFrame[trip_id: string, rideable_type: string, started_at: string, ended_at: string, start_station_id: string, end_station_id: string, rider_id: string]

In [0]:
max_date = trips.selectExpr('MAX(ended_at) as ended_at').first().asDict() ['ended_at']
min_date = trips.selectExpr('MIN(started_at) as started_at').first().asDict() ['started_at']

dim_date = spark.createDataFrame([(1,)], ["time_id"])

dim_date = dim_date.withColumn("initial_date", explode(expr(f"sequence(to_date('{min_date}'), to_date('{max_date}'), interval 1 day)")))
dim_date = dim_date.withColumn("date", to_timestamp(dim_date.initial_date))


In [0]:
dim_date = dim_date.withColumn("day", dayofmonth(dim_date.date)) \
.withColumn("month", month(dim_date.date)) \
.withColumn("quarter", quarter(dim_date.date)) \
.withColumn("year", year(dim_date.date)) \
.withColumn("day_of_year", dayofyear(dim_date.date)) \
.withColumn("day_of_week", dayofweek(dim_date.date)) \
.withColumn("time_id", dim_date.date.cast(StringType())) \
.drop(col("initial_date"))

display(dim_date.limit(10))

time_id,date,day,month,quarter,year,day_of_year,day_of_week
2021-02-01 00:00:00,2021-02-01T00:00:00.000+0000,1,2,1,2021,32,2
2021-02-02 00:00:00,2021-02-02T00:00:00.000+0000,2,2,1,2021,33,3
2021-02-03 00:00:00,2021-02-03T00:00:00.000+0000,3,2,1,2021,34,4
2021-02-04 00:00:00,2021-02-04T00:00:00.000+0000,4,2,1,2021,35,5
2021-02-05 00:00:00,2021-02-05T00:00:00.000+0000,5,2,1,2021,36,6
2021-02-06 00:00:00,2021-02-06T00:00:00.000+0000,6,2,1,2021,37,7
2021-02-07 00:00:00,2021-02-07T00:00:00.000+0000,7,2,1,2021,38,1
2021-02-08 00:00:00,2021-02-08T00:00:00.000+0000,8,2,1,2021,39,2
2021-02-09 00:00:00,2021-02-09T00:00:00.000+0000,9,2,1,2021,40,3
2021-02-10 00:00:00,2021-02-10T00:00:00.000+0000,10,2,1,2021,41,4


In [0]:
spark.sql("DROP TABLE IF EXISTS dim_date")

Out[24]: DataFrame[]

In [0]:
dim_date.write.format("delta").mode("overwrite").saveAsTable("dim_date")