In [0]:
# Read the source payments table and persist it, untransformed, as the
# Delta fact table `default.fact_payments` (replacing any previous contents).
payments = spark.table("default.payments")

(
    payments.write
    .format("delta")
    .mode("overwrite")
    .saveAsTable("default.fact_payments")
)

In [0]:
# Preview up to 10 rows of the source payments DataFrame (no ordering is
# applied, so which rows appear is not guaranteed).
display(payments.limit(10))

payment_id,date,amount,rider_id
2,2019-06-01,9.0,1000
3,2019-07-01,9.0,1000
4,2019-08-01,9.0,1000
5,2019-09-01,9.0,1000
6,2019-10-01,9.0,1000
7,2019-11-01,9.0,1000
8,2019-12-01,9.0,1000
9,2020-01-01,9.0,1000
10,2020-02-01,9.0,1000
11,2020-03-01,9.0,1000


In [0]:
# Read the source riders table and write the rider dimension with one row per
# rider_id. No ordering is applied before de-duplication, so which of several
# rows sharing a rider_id survives is not controlled here.
riders = spark.table("default.riders")

(
    riders
    .dropDuplicates(["rider_id"])
    .write
    .format("delta")
    .mode("overwrite")
    .saveAsTable("default.dim_riders")
)

In [0]:
# Preview up to 10 rows of the source riders DataFrame (before de-duplication).
# NOTE(review): the rendered output includes name, address and birthday
# columns — confirm the data is synthetic before sharing this notebook.
display(riders.limit(10))

rider_id,first_name,last_name,address,birthday,account_start_date,account_end_date,is_member
1001,Jennifer,Smith,397 Diana Ferry,1976-08-10,2019-11-01,2020-09-01,True
1002,Karen,Smith,644 Brittany Row Apt. 097,1998-08-10,2022-02-04,,True
1003,Bryan,Roberts,996 Dickerson Turnpike,1999-03-29,2019-08-26,,False
1004,Jesse,Middleton,7009 Nathan Expressway,1969-04-11,2019-09-14,,True
1005,Christine,Rodriguez,224 Washington Mills Apt. 467,1974-08-27,2020-03-24,,False
1006,Alicia,Taylor,1137 Angela Locks,2004-01-30,2020-11-27,2021-12-01,True
1007,Benjamin,Fernandez,979 Phillips Ways,1988-01-11,2016-12-11,,False
1008,John,Crawford,7691 Evans Court,1987-02-21,2021-03-28,2021-07-01,True
1009,Victoria,Ritter,9922 Jim Crest Apt. 319,1981-02-07,2020-06-12,2021-11-01,True
1010,Tracy,Austin,92973 Mary Ville,1996-04-07,2019-12-27,,True


In [0]:
# Read the source stations table and write the station dimension with one row
# per station_id. As no ordering is applied, the surviving row among
# duplicates of a station_id is not controlled here.
stations = spark.table("default.stations")

(
    stations
    .dropDuplicates(["station_id"])
    .write
    .format("delta")
    .mode("overwrite")
    .saveAsTable("default.dim_station")
)

In [0]:
# Preview up to 10 rows of the source stations DataFrame (before de-duplication).
display(stations.limit(10))

station_id,name,latitude,longitude
KA1503000012,Clark St & Lake St,41.88579466666667,-87.63110066666668
637,Wood St & Chicago Ave,41.895634,-87.672069
13216,State St & 33rd St,41.8347335,-87.6258275
18003,Fairbanks St & Superior St,41.89580766666667,-87.62025316666669
KP1705001026,LaSalle Dr & Huron St,41.894877,-87.632326
13253,Lincoln Ave & Waveland Ave,41.948797,-87.675278
KA1503000044,Rush St & Hubbard St,41.890173,-87.62618499999999
KA1504000140,Winchester Ave & Elston Ave,41.92403733333333,-87.67641483333334
TA1305000032,Clinton St & Madison St,41.882242,-87.64106600000001
TA1306000012,Wells St & Huron St,41.89475366666667,-87.63440200000001


In [0]:
from pyspark.sql import functions as F 
from pyspark.sql import types as T 
from pyspark.sql.types import StringType

# Build the calendar dimension `default.dim_time`: one row per day from the
# earliest trip start through five years past the latest trip start.
#
# Tables are schema-qualified like the other cells of this notebook so the
# result does not depend on the session's current schema.

# Fetch both bounds in a single pass over trips.
trip_bounds = spark.sql(
    "select min(start_at) as start_date, "
    "dateadd(year,5,max(start_at)) as end_date FROM default.trips"
).first()
beginDate = trip_bounds["start_date"]
endDate = trip_bounds["end_date"]

# With no start_at values both bounds are NULL, and the sequence below would
# yield zero rows and silently overwrite dim_time with an empty table.
if beginDate is None or endDate is None:
    raise ValueError(
        "default.trips has no start_at values; cannot derive the dim_time date range"
    )

# One row per calendar day, exposed through the `dim_times` temp view
# (kept so anything that queries that view keeps working).
spark.sql(
    f"select explode(sequence(to_date('{beginDate}'), to_date('{endDate}'), "
    "interval 1 day)) as calendarDate"
).createOrReplaceTempView("dim_times")

calendar_date = F.col("calendarDate")

# Derive every attribute from the calendar date in one projection; the helper
# calendarDate column itself is not part of the output.
# NOTE(review): time_id is a midnight timestamp per day, while fact_trips
# stores full trip timestamps in start_at_id / ended_at_id — confirm how
# consumers are expected to join the two (e.g. by truncating to the day).
df = spark.table("dim_times").select(
    F.to_timestamp(calendar_date, "yyyy-MM-dd").alias("time_id"),
    calendar_date.cast("string").alias("d_date"),
    F.dayofmonth(calendar_date).alias("day_of_month"),
    F.dayofweek(calendar_date).alias("day_of_week"),  # 1 = Sunday ... 7 = Saturday
    F.month(calendar_date).alias("month"),
    F.date_format(calendar_date, "MMMM").alias("month_name"),
    F.year(calendar_date).alias("year"),
    F.quarter(calendar_date).alias("quarter"),
    F.weekofyear(calendar_date).alias("week_of_year"),
)

df.write.format("delta").mode("overwrite").saveAsTable("default.dim_time")

In [0]:
# Preview up to 10 rows of `df`. The name `df` is reused by several cells, so
# this shows whatever the most recently executed cell assigned to it.
display(df.limit(10))

time_id,d_date,day_of_month,day_of_week,month,month_name,year,quarter,week_of_year
2021-02-01T00:00:00.000+0000,2021-02-01,1,2,2,February,2021,1,5
2021-02-02T00:00:00.000+0000,2021-02-02,2,3,2,February,2021,1,5
2021-02-03T00:00:00.000+0000,2021-02-03,3,4,2,February,2021,1,5
2021-02-04T00:00:00.000+0000,2021-02-04,4,5,2,February,2021,1,5
2021-02-05T00:00:00.000+0000,2021-02-05,5,6,2,February,2021,1,5
2021-02-06T00:00:00.000+0000,2021-02-06,6,7,2,February,2021,1,5
2021-02-07T00:00:00.000+0000,2021-02-07,7,1,2,February,2021,1,5
2021-02-08T00:00:00.000+0000,2021-02-08,8,2,2,February,2021,1,6
2021-02-09T00:00:00.000+0000,2021-02-09,9,3,2,February,2021,1,6
2021-02-10T00:00:00.000+0000,2021-02-10,10,4,2,February,2021,1,6


In [0]:
# Build the trip fact table `default.fact_trips`.
#
# Each trip is inner-joined to the rider dimension on rider_id, so trips
# without a matching rider are dropped. Derived measures:
#   rider_age - DATEDIFF in YEAR units from the rider's birthday to trip start
#   duration  - DATEDIFF in MINUTE units from trip start to trip end
#
# Tables are schema-qualified to match the cell that writes
# `default.dim_riders`, so the join reads that table regardless of the
# session's current schema. Fully identical rows are removed before writing.
df = spark.sql(
    """
    SELECT
        t.trip_id,
        t.rideable_type,
        t.start_at AS start_at_id,
        t.ended_at AS ended_at_id,
        t.start_station_id,
        t.end_station_id,
        r.rider_id,
        DATEDIFF(YEAR, r.birthday, t.start_at) AS rider_age,
        DATEDIFF(MINUTE, t.start_at, t.ended_at) AS duration
    FROM default.trips AS t
    JOIN default.dim_riders AS r
        ON t.rider_id = r.rider_id
    """
).dropDuplicates()

df.write.format("delta").mode("overwrite").saveAsTable("default.fact_trips")

In [0]:
# Preview up to 10 rows of `df`. The name `df` is reused by several cells, so
# this shows whatever the most recently executed cell assigned to it.
display(df.limit(10))

trip_id,rideable_type,start_at_id,ended_at_id,start_station_id,end_station_id,rider_id,rider_age,duration
AB969616E25817D7,classic_bike,2021-06-13 00:27:17,2021-06-13 00:32:30,13409,13021,28101,37,5
5C1CD2814C6C8A59,classic_bike,2021-06-02 09:49:19,2021-06-02 10:00:44,TA1308000035,TA1308000009,72899,28,11
E188A33CEFFB17CA,classic_bike,2021-06-09 20:44:26,2021-06-09 20:56:40,KA1503000014,TA1308000009,74173,50,12
7F99CCC47AAF282C,classic_bike,2021-06-01 23:57:43,2021-06-02 00:54:00,13008,TA1308000009,35258,30,56
74EA07DD1D5B06B8,classic_bike,2021-06-28 17:48:41,2021-06-28 17:58:11,WL-012,13434,33871,32,9
1BA39075CCDED785,classic_bike,2021-06-27 16:35:46,2021-06-27 17:06:04,KA1503000049,TA1308000009,47457,23,30
D8E138EDB9488C6D,classic_bike,2021-06-06 15:27:21,2021-06-06 15:34:38,TA1306000015,13434,1917,51,7
6CE2D4199C20EFE2,classic_bike,2021-06-08 08:49:03,2021-06-08 09:00:49,13017,13021,47296,25,11
22FEF26D39047393,classic_bike,2021-06-17 20:02:30,2021-06-17 20:31:45,TA1306000011,13257,15460,15,29
B6AA7FE8CC7CCB6F,classic_bike,2021-06-19 08:27:35,2021-06-19 08:57:48,TA1306000013,TA1308000022,63404,44,30
