In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import TimestampType


<h2>Load </h2>

In [0]:
payments = spark.table("default.staging_payments")

In [0]:
display(payments.limit(10))

payment_id,date,amount,rider_id
1574726,2021-02-01,9.0,61831
1574727,2021-03-01,9.0,61831
1574728,2021-04-01,9.0,61831
1574729,2021-05-01,9.0,61831
1574730,2021-06-01,9.0,61831
1574731,2021-07-01,9.0,61831
1574732,2021-08-01,9.0,61831
1574733,2021-09-01,9.0,61831
1574734,2021-10-01,9.0,61831
1574735,2021-11-01,9.0,61831


In [0]:
# Write data to fact_payments
spark.sql("drop table if exists default.fact_payments")
payments.dropDuplicates(["payment_id"]).write.format("delta").mode("overwrite").saveAsTable("default.fact_payments")

<h3> Load 

In [0]:
riders = spark.table("default.staging_riders")

In [0]:
display(riders.limit(10))

rider_id,first_name,last_name,address,birthday,account_start_date,account_end_date,is_member
57257,Mark,Mcfarland,9928 Hunter Ranch,1982-02-01,2020-12-05,,False
57258,Mark,Davis,20036 Barrett Summit Apt. 714,1963-07-28,2017-07-12,,True
57259,Bryan,Manning,089 Sarah Square,1984-11-05,2018-08-10,,True
57260,Michele,Rowe,3157 Nicole Ferry Apt. 826,1997-09-21,2016-06-03,,True
57261,John,Mckenzie,312 Jessica Wells,2002-10-13,2016-02-01,2018-07-01,True
57262,Tami,Rivera,910 Lopez Pass Apt. 426,2001-06-29,2020-11-12,,True
57263,Joseph,Hodge,411 Mccoy Haven,1993-04-30,2020-05-29,,False
57264,Lauren,Brown,667 Rodriguez Ramp,2002-10-20,2020-05-10,,False
57265,Stephanie,Reed,90789 Fowler Circle,1993-03-20,2018-09-05,2019-09-01,True
57266,Brittney,Lamb,55635 Valerie Falls,1993-12-09,2020-10-10,,True


In [0]:
# Write data to dim_rider
spark.sql("drop table if exists default.dim_rider")
riders.dropDuplicates(["rider_id"]).write.format("delta").mode("overwrite").saveAsTable("default.dim_rider")

<h3>Load  </h3>

In [0]:
stations = spark.table("default.staging_stations")


In [0]:
display(stations.limit(9))

station_id,name,latitude,longitude
525,Glenwood Ave & Touhy Ave,42.012701,-87.66605799999999
KA1503000012,Clark St & Lake St,41.88579466666667,-87.63110066666668
637,Wood St & Chicago Ave,41.895634,-87.672069
13216,State St & 33rd St,41.8347335,-87.6258275
18003,Fairbanks St & Superior St,41.89580766666667,-87.62025316666669
KP1705001026,LaSalle Dr & Huron St,41.894877,-87.632326
13253,Lincoln Ave & Waveland Ave,41.948797,-87.675278
KA1503000044,Rush St & Hubbard St,41.890173,-87.62618499999999
KA1504000140,Winchester Ave & Elston Ave,41.92403733333333,-87.67641483333334


In [0]:
# Write data to dim_station
spark.sql("drop table if exists default.dim_station")
stations.dropDuplicates(["station_id"]).write.format("delta").mode("overwrite").saveAsTable("default.dim_station")

<h3>Load </h3>

In [0]:
trips = spark.table("default.staging_trips")


In [0]:
display(trips.limit(10))

trip_id,rideable_type,start_at,ended_at,start_station_id,end_station_id,rider_id
7E1E50AC37E2DAD3,classic_bike,2021-08-14T14:01:36Z,2021-08-14T14:34:49Z,TA1309000007,13089,2644
ADFF32195521E952,classic_bike,2021-08-29T16:16:36Z,2021-08-29T16:24:43Z,13288,TA1308000031,37747
7C59843DB8D13CC7,electric_bike,2021-08-27T11:06:34Z,2021-08-27T11:12:52Z,TA1307000062,TA1305000020,63224
5B788004F8A5204C,classic_bike,2021-08-27T07:35:33Z,2021-08-27T07:59:35Z,13353,13242,45050
078629DD14B634AE,classic_bike,2021-08-08T15:00:30Z,2021-08-08T15:22:57Z,13353,13242,33762
5E98DA99CB0B52E4,classic_bike,2021-08-15T18:01:33Z,2021-08-15T18:26:52Z,13353,13242,33902
6A3F6243C9164889,classic_bike,2021-08-14T02:22:42Z,2021-08-14T02:26:10Z,13033,TA1305000020,47737
F034B9F0C7194317,classic_bike,2021-08-20T14:28:33Z,2021-08-20T14:55:42Z,TA1307000130,13089,28123
74EE09157161558A,classic_bike,2021-08-21T18:17:28Z,2021-08-21T19:03:19Z,15578,TA1308000031,60078
7EF8ED3865996053,electric_bike,2021-08-16T14:54:06Z,2021-08-16T15:09:13Z,13109,519,34360


In [0]:
fact_trips = trips.alias("tbl1").join(riders.alias("tbl2"), col("tbl1.rider_id") == col("tbl2.rider_id"), "left") \
            .withColumn('duration', round((unix_timestamp("ended_at") - unix_timestamp('start_at'))/60)) \
            .withColumn('rider_age', round((unix_timestamp("account_start_date") - unix_timestamp('birthday'))/3600/24)) \
            .withColumnRenamed('start_at', 'start_time_id') \
            .withColumnRenamed('ended_at', 'end_time_id') \
            .select("trip_id", col("tbl1.rider_id"), "rideable_type", "start_station_id", "end_station_id", "start_time_id", "end_time_id", "duration", "rider_age")
            
display(fact_trips.limit(10))

trip_id,rider_id,rideable_type,start_station_id,end_station_id,start_time_id,end_time_id,duration,rider_age
7E1E50AC37E2DAD3,2644,classic_bike,TA1309000007,13089,2021-08-14T14:01:36Z,2021-08-14T14:34:49Z,33.0,16081.0
ADFF32195521E952,37747,classic_bike,13288,TA1308000031,2021-08-29T16:16:36Z,2021-08-29T16:24:43Z,8.0,7306.0
7C59843DB8D13CC7,63224,electric_bike,TA1307000062,TA1305000020,2021-08-27T11:06:34Z,2021-08-27T11:12:52Z,6.0,12635.0
5B788004F8A5204C,45050,classic_bike,13353,13242,2021-08-27T07:35:33Z,2021-08-27T07:59:35Z,24.0,10321.0
078629DD14B634AE,33762,classic_bike,13353,13242,2021-08-08T15:00:30Z,2021-08-08T15:22:57Z,22.0,16062.0
5E98DA99CB0B52E4,33902,classic_bike,13353,13242,2021-08-15T18:01:33Z,2021-08-15T18:26:52Z,25.0,13272.0
6A3F6243C9164889,47737,classic_bike,13033,TA1305000020,2021-08-14T02:22:42Z,2021-08-14T02:26:10Z,3.0,8867.0
F034B9F0C7194317,28123,classic_bike,TA1307000130,13089,2021-08-20T14:28:33Z,2021-08-20T14:55:42Z,27.0,13814.0
74EE09157161558A,60078,classic_bike,15578,TA1308000031,2021-08-21T18:17:28Z,2021-08-21T19:03:19Z,46.0,10274.0
7EF8ED3865996053,34360,electric_bike,13109,519,2021-08-16T14:54:06Z,2021-08-16T15:09:13Z,15.0,18105.0


In [0]:
# Write data to fact_trip
spark.sql("drop table if exists default.fact_trip")
fact_trips.write.format("delta").mode("overwrite").saveAsTable("default.fact_trip")

<h3>Load </h3>

In [0]:
from pyspark.sql import functions as f
from pyspark.sql.types import StringType
# Get min date from trips
min_date = trips.selectExpr('MIN(start_at) AS started_at').first().asDict()['started_at']
# Add date range as max start_at from trip plus 5 years
max_date = trips.selectExpr('DATEADD(year, 5, MAX(start_at)) AS started_at').first().asDict()['started_at']
expression = f"sequence(to_date('{min_date}'), to_date('{max_date}'), interval 1 day)"
dim_time = spark.createDataFrame([(1,)], ["time_id"])

dim_time = dim_time.withColumn("dateinit", f.explode(f.expr(expression)))
dim_time = dim_time.withColumn("date", f.to_timestamp(dim_time.dateinit, "yyyy-MM-dd"))

dim_time = dim_time \
            .withColumn("day_of_week", f.dayofweek(dim_time.date)) \
            .withColumn("day_of_month", f.dayofmonth(dim_time.date)) \
            .withColumn("week_of_year", f.weekofyear(dim_time.date)) \
            .withColumn("year", f.year(dim_time.date)) \
            .withColumn("quarter", f.quarter(dim_time.date)) \
            .withColumn("month", f.month(dim_time.date)) \
            .withColumn("time_id", dim_time.date.cast(StringType())) \
            .drop(f.col("dateinit"))

In [0]:
display(dim_time.selectExpr('MIN(date)').show())
display(dim_time.selectExpr('MAX(date)').show())

+-------------------+
|          min(date)|
+-------------------+
|2021-02-01 00:00:00|
+-------------------+

+-------------------+
|          max(date)|
+-------------------+
|2027-01-31 00:00:00|
+-------------------+



In [0]:
# Write data to dim_time
spark.sql("drop table if exists default.dim_time")
dim_time.write.format("delta").mode("overwrite").saveAsTable("default.dim_time")