In [0]:
## Create fact payment table
payments_df = spark.table("default.payments")
     
payments_df.write.format("delta").mode("overwrite").saveAsTable("default.fact_payments")

In [0]:
display(payments_df.limit(5))

payment_id,date,amount,rider_id
2,2019-06-01,9.0,1000
3,2019-07-01,9.0,1000
4,2019-08-01,9.0,1000
5,2019-09-01,9.0,1000
6,2019-10-01,9.0,1000


In [0]:
## Create dim riders table 

riders_df = spark.table("default.riders")
     
riders_df.dropDuplicates(["rider_id"]).write.format("delta").mode("overwrite").saveAsTable("default.dim_riders")

In [0]:
display(riders_df.limit(5))

rider_id,first_name,last_name,address,birthday,account_start_date,account_end_date,is_member
1001,Jennifer,Smith,397 Diana Ferry,1976-08-10,2019-11-01,2020-09-01,True
1002,Karen,Smith,644 Brittany Row Apt. 097,1998-08-10,2022-02-04,,True
1003,Bryan,Roberts,996 Dickerson Turnpike,1999-03-29,2019-08-26,,False
1004,Jesse,Middleton,7009 Nathan Expressway,1969-04-11,2019-09-14,,True
1005,Christine,Rodriguez,224 Washington Mills Apt. 467,1974-08-27,2020-03-24,,False


In [0]:
## Create dim stations table 
stations_df = spark.table("default.stations")
     
stations_df.dropDuplicates(["station_id"]).write.format("delta").mode("overwrite").saveAsTable("default.dim_station")

In [0]:
display(stations_df.limit(5))

station_id,name,latitude,longitude
KA1503000012,Clark St & Lake St,41.88579466666667,-87.63110066666668
637,Wood St & Chicago Ave,41.895634,-87.672069
13216,State St & 33rd St,41.8347335,-87.6258275
18003,Fairbanks St & Superior St,41.89580766666667,-87.62025316666669
KP1705001026,LaSalle Dr & Huron St,41.894877,-87.632326


In [0]:
## Create dim time table 

from pyspark.sql import functions as F 
from pyspark.sql import types as T 
from pyspark.sql.types import StringType

beginDate = spark.sql('select min(start_at) as start_date FROM trips').first()['start_date'] 
endDate = spark.sql('select dateadd(year,5,max(start_at)) as end_date FROM trips').first()['end_date'] 

( spark.sql(f"select explode(sequence(to_date('{beginDate}'), to_date('{endDate}'), interval 1 day)) as calendarDate") .createOrReplaceTempView('dim_times') ) 

df=spark.sql('select * from dim_times') 

df = df.withColumn("time_id", F.to_timestamp(df.calendarDate, "yyyy-MM-dd"))\
  .withColumn("d_date", df.calendarDate.cast(StringType()))\
  .withColumn("day_of_week", F.dayofweek(df.calendarDate))\
  .withColumn("day_of_month", F.dayofmonth(df.calendarDate))\
  .withColumn("week_of_year", F.weekofyear(df.calendarDate))\
  .withColumn("quarter", F.quarter(df.calendarDate))\
  .withColumn("month", F.month(df.calendarDate))\
  .withColumn("year", F.year(df.calendarDate))


df=df.drop(df.calendarDate)

df.write.format('delta').mode('overwrite').saveAsTable('dim_time')

In [0]:
display(df.limit(5))

time_id,d_date,day_of_week,day_of_month,week_of_year,quarter,month,year
2021-02-01T00:00:00.000+0000,2021-02-01,2,1,5,1,2,2021
2021-02-02T00:00:00.000+0000,2021-02-02,3,2,5,1,2,2021
2021-02-03T00:00:00.000+0000,2021-02-03,4,3,5,1,2,2021
2021-02-04T00:00:00.000+0000,2021-02-04,5,4,5,1,2,2021
2021-02-05T00:00:00.000+0000,2021-02-05,6,5,5,1,2,2021


In [0]:
## Create fact trips table 

df=spark.sql('SELECT trip_id,rideable_type, trips.start_at, trips.ended_at,\
  trips.start_station_id,trips.end_station_id,\
  dim_riders.rider_id,DATEDIFF(YEAR,dim_riders.birthday,trips.start_at) AS rider_age,\
  DATEDIFF(MINUTE,trips.start_at,trips.ended_at) duration\
  FROM trips JOIN dim_riders ON trips.rider_id=dim_riders.rider_id') 

df=df.dropDuplicates(df.columns)
df.write.format('delta').mode('overwrite').saveAsTable('fact_trips')
     

In [0]:
display(df.limit(5))

trip_id,rideable_type,start_at,ended_at,start_station_id,end_station_id,rider_id,rider_age,duration
93A6EDCF2894D658,docked_bike,2021-03-30T15:04:48.000+0000,2021-03-30T16:17:42.000+0000,TA1307000048,TA1307000048,70076,27,72
03C7A5392B64DBAE,docked_bike,2021-03-07T17:47:08.000+0000,2021-03-07T18:23:37.000+0000,TA1307000048,TA1307000048,51643,23,36
14CFBFBFBEB3F925,classic_bike,2021-03-23T08:00:53.000+0000,2021-03-23T08:42:42.000+0000,TA1307000048,TA1307000048,53408,29,41
22FDFB604B2104D0,classic_bike,2021-03-13T07:26:27.000+0000,2021-03-13T07:26:42.000+0000,TA1307000048,TA1307000048,42232,32,0
599FE144646D3EC3,classic_bike,2021-03-08T17:52:33.000+0000,2021-03-08T18:18:13.000+0000,13022,TA1309000010,31710,29,25
