In [0]:
from pyspark.sql import functions as F
from pyspark.sql import types as T
from pyspark.sql.types import StringType

In [0]:
# DimStation: the `stations` table with exact-duplicate rows removed,
# persisted as a Delta table (overwritten on every run).
stations_raw = spark.table('stations')
display(stations_raw.limit(5))  # preview of the raw, pre-dedup rows

# dropDuplicates() with no subset considers every column.
df = stations_raw.dropDuplicates()
df.write.format('delta').mode('overwrite').saveAsTable('DimStation')

station_id,name,latitude,longitude
525,Glenwood Ave & Touhy Ave,42.0127,-87.66606
KA1503000012,Clark St & Lake St,41.885796,-87.6311
637,Wood St & Chicago Ave,41.895634,-87.672066
13216,State St & 33rd St,41.834732,-87.625824
18003,Fairbanks St & Superior St,41.89581,-87.620255


In [0]:
# DimRider: the `riders` table with exact-duplicate rows removed, plus the
# rider's age in whole years on the day their account started. Persisted as a
# Delta table (overwritten on every run).
riders_raw = spark.table('riders')
display(riders_raw.limit(5))  # preview of the raw, pre-dedup rows

riders_dedup = riders_raw.dropDuplicates(riders_raw.columns)

# Age = whole elapsed years between birthday and account_start_date.
# The previous floor(datediff(...) / 365.25) is off by one on the birthday
# itself whenever the span holds fewer leap days than the 0.25/year average
# assumes, e.g. 2001-01-01 -> 2002-01-01 is 365 days -> 0.999 -> age 0.
# months_between() is calendar-aware (it returns a whole number of months when
# the day-of-month matches), so floor(months / 12) increments exactly on the
# birthday. This is the same "whole elapsed years" idea as the
# DATEDIFF(YEAR, birthday, start_at) used for FactTrip.rider_age
# (NOTE(review): the two may still disagree for Feb-29 birthdays).
df = riders_dedup.withColumn(
    "rider_age_at_acc_start",
    F.floor(F.months_between(F.col("account_start_date"), F.col("birthday")) / 12),
)
df.write.format('delta').mode('overwrite').saveAsTable('DimRider')

rider_id,first,last,address,birthday,account_start_date,account_end_date,is_member
1000,Diana,Clark,1200 Alyssa Squares,1989-02-13,2019-04-23,,True
1001,Jennifer,Smith,397 Diana Ferry,1976-08-10,2019-11-01,2020-09-01,True
1002,Karen,Smith,644 Brittany Row Apt. 097,1998-08-10,2022-02-04,,True
1003,Bryan,Roberts,996 Dickerson Turnpike,1999-03-29,2019-08-26,,False
1004,Jesse,Middleton,7009 Nathan Expressway,1969-04-11,2019-09-14,,True


In [0]:


# DimDate: one row per calendar day with common calendar attributes.
# The range runs from the earliest trip start or payment date through
# DIM_DATE_YEARS_AHEAD years past the latest trip start. The helper column
# calendarDate is dropped before writing.
DIM_DATE_YEARS_AHEAD = 5  # how far past the last trip start the calendar extends

# Both bounds come from one query. The lower bound used to be min(trips.start_at)
# only, but FactPayment is date-keyed as well (its sample rows start in 2019
# while the trip sample is from 2021), so payments older than the first trip
# would have had no DimDate row. least() skips NULLs, so an empty payments
# table falls back to the trips minimum.
date_bounds = spark.sql(f"""
    SELECT to_date(least(t.first_start, p.first_payment))               AS start_date,
           to_date(dateadd(year, {DIM_DATE_YEARS_AHEAD}, t.last_start)) AS end_date
    FROM (SELECT min(start_at) AS first_start, max(start_at) AS last_start FROM trips) t
    CROSS JOIN (SELECT min(to_timestamp(date)) AS first_payment FROM payments) p
""").first()
beginDate = date_bounds['start_date']
endDate = date_bounds['end_date']

# Expand [beginDate, endDate] into one row per day. The temp view keeps its
# original name so any other cell that queries it keeps working.
(
    spark.sql(f"select explode(sequence(to_date('{beginDate}'), to_date('{endDate}'), interval 1 day)) as calendarDate")
    .createOrReplaceTempView('DimeDates')
)

# A parenthesized chain replaces the old backslash continuations, the last of
# which dangled into a whitespace-only line.
calendar_day = F.col("calendarDate")
df = (
    spark.sql('select * from DimeDates')
    .withColumn("time_id", F.to_timestamp(calendar_day, "yyyy-MM-dd"))
    .withColumn("d_date", calendar_day.cast(StringType()))
    .withColumn("day_of_month", F.dayofmonth(calendar_day))
    .withColumn("day_of_week", F.dayofweek(calendar_day))  # 1 = Sunday ... 7 = Saturday
    .withColumn("month", F.month(calendar_day))
    .withColumn("month_name", F.date_format(calendar_day, 'MMMM'))
    .withColumn("year", F.year(calendar_day))
    .withColumn("quarter", F.quarter(calendar_day))
    .withColumn("week_of_year", F.weekofyear(calendar_day))
    .drop("calendarDate")
)
df.write.format('delta').mode('overwrite').saveAsTable('DimDate')


In [0]:
# FactPayment: payments with the `date` column parsed to a timestamp and
# exact-duplicate rows removed. Persisted as a Delta table (overwritten on
# every run).
payments_raw = spark.sql('select payment_id,to_timestamp(date) as date,amount,rider_id from payments')
display(payments_raw.limit(5))  # preview of the pre-dedup rows

# dropDuplicates() with no subset considers every column.
df = payments_raw.dropDuplicates()
df.write.format('delta').mode('overwrite').saveAsTable('FactPayment')

payment_id,date,amount,rider_id
1,2019-05-01T00:00:00.000+0000,9.0,1000
2,2019-06-01T00:00:00.000+0000,9.0,1000
3,2019-07-01T00:00:00.000+0000,9.0,1000
4,2019-08-01T00:00:00.000+0000,9.0,1000
5,2019-09-01T00:00:00.000+0000,9.0,1000


In [0]:
# FactTrip: trips inner-joined to DimRider, so trips whose rider_id has no
# DimRider row are dropped. Adds the rider's age, via DATEDIFF(YEAR, birthday,
# start_at), and the ride duration, via DATEDIFF(MINUTE, start_at, ended_at).
# The sample output shows the duration truncating to whole minutes (6m47s -> 6).
# NOTE(review): start_at_id / ended_at_id hold the full trip timestamps, while
# DimDate.time_id is midnight of each day, so these columns will not
# equality-join to DimDate. Confirm whether a to_date()/date key is intended.
trip_query = 'select trip_id,rideable_type,trips.start_at as start_at_id,trips.ended_at as ended_at_id,\
                    trips.start_station_id,trips.end_station_id,\
             dimrider.rider_id,DATEDIFF(YEAR,dimrider.birthday,trips.start_at) as rider_age,\
             DATEDIFF(MINUTE,trips.start_at,trips.ended_at) duration\
             from trips join dimrider on trips.rider_id=dimrider.rider_id'

trips_joined = spark.sql(trip_query)
display(trips_joined.limit(5))  # preview of the pre-dedup rows

# dropDuplicates() with no subset considers every column.
df = trips_joined.dropDuplicates()
df.write.format('delta').mode('overwrite').saveAsTable('FactTrip')

trip_id,rideable_type,start_at_id,ended_at_id,start_station_id,end_station_id,rider_id,rider_age,duration
89E7AA6C29227EFF,classic_bike,2021-02-12T16:14:56.000+0000,2021-02-12T16:21:43.000+0000,525,660,71934,37,6
0FEFDE2603568365,classic_bike,2021-02-14T17:52:38.000+0000,2021-02-14T18:12:09.000+0000,525,16806,47854,38,19
E6159D746B2DBB91,electric_bike,2021-02-09T19:10:18.000+0000,2021-02-09T19:19:10.000+0000,KA1503000012,TA1305000029,70870,33,8
B32D3199F1C2E75B,classic_bike,2021-02-02T17:49:41.000+0000,2021-02-02T17:54:06.000+0000,637,TA1305000034,58974,19,4
83E463F23575F4BF,electric_bike,2021-02-23T15:07:23.000+0000,2021-02-23T15:22:37.000+0000,13216,TA1309000055,39608,71,15
