In [0]:

from pyspark.sql import functions as F
from pyspark.sql import types as T
from pyspark.sql.types import StringType

In [0]:

## DimStation: de-duplicated copy of the `stations` table, saved as a Delta table.
stations_raw = spark.table('stations')
display(stations_raw.limit(10))  # preview is taken before duplicates are removed

# With no subset given, dropDuplicates() compares rows across every column.
df = stations_raw.dropDuplicates()
(
    df.write
    .format('delta')
    .mode('overwrite')  # replaces any existing DimStation contents
    .saveAsTable('DimStation')
)

station_id,name,latitude,longitude
KA1503000012,Clark St & Lake St,41.88579466666667,-87.63110066666668
637,Wood St & Chicago Ave,41.895634,-87.672069
13216,State St & 33rd St,41.8347335,-87.6258275
18003,Fairbanks St & Superior St,41.89580766666667,-87.62025316666669
KP1705001026,LaSalle Dr & Huron St,41.894877,-87.632326


In [0]:
## DimRider: de-duplicated copy of the `riders` table, saved as a Delta table.
riders_raw = spark.table('riders')
display(riders_raw.limit(10))  # preview is taken before duplicates are removed

# With no subset given, dropDuplicates() compares rows across every column.
df = riders_raw.dropDuplicates()
(
    df.write
    .format('delta')
    .mode('overwrite')  # replaces any existing DimRider contents
    .saveAsTable('DimRider')
)

rider_id,first_name,last_name,address,birthday,account_start_date,account_end_date,is_member
1001,Jennifer,Smith,397 Diana Ferry,1976-08-10,2019-11-01,2020-09-01,True
1002,Karen,Smith,644 Brittany Row Apt. 097,1998-08-10,2022-02-04,,True
1003,Bryan,Roberts,996 Dickerson Turnpike,1999-03-29,2019-08-26,,False
1004,Jesse,Middleton,7009 Nathan Expressway,1969-04-11,2019-09-14,,True
1005,Christine,Rodriguez,224 Washington Mills Apt. 467,1974-08-27,2020-03-24,,False
1006,Alicia,Taylor,1137 Angela Locks,2004-01-30,2020-11-27,2021-12-01,True
1007,Benjamin,Fernandez,979 Phillips Ways,1988-01-11,2016-12-11,,False
1008,John,Crawford,7691 Evans Court,1987-02-21,2021-03-28,2021-07-01,True
1009,Victoria,Ritter,9922 Jim Crest Apt. 319,1981-02-07,2020-06-12,2021-11-01,True
1010,Tracy,Austin,92973 Mary Ville,1996-04-07,2019-12-27,,True


In [0]:
## DimDate: one row per calendar day, derived from the span of trips.start_at.
# Lower bound is the earliest start_at; upper bound is the latest start_at plus five years.
beginDate = spark.sql('select min(start_at) as start_date FROM trips').first()['start_date']
endDate = spark.sql('select dateadd(year,5,max(start_at)) as end_date FROM trips').first()['end_date']

# Expand the bounds into a daily sequence and expose it as the temp view `DimeDates`.
calendar_days = spark.sql(
    f"select explode(sequence(to_date('{beginDate}'), to_date('{endDate}'), interval 1 day)) as calendarDate"
)
calendar_days.createOrReplaceTempView('DimeDates')

# Every attribute of the dimension is computed from the single calendarDate column.
calendar_date = F.col("calendarDate")
derived_columns = [
    ("time_id", F.to_timestamp(calendar_date, "yyyy-MM-dd")),
    ("d_date", calendar_date.cast(StringType())),
    ("day_of_month", F.dayofmonth(calendar_date)),
    ("day_of_week", F.dayofweek(calendar_date)),
    ("month", F.month(calendar_date)),
    ("month_name", F.date_format(calendar_date, 'MMMM')),
    ("year", F.year(calendar_date)),
    ("quarter", F.quarter(calendar_date)),
    ("week_of_year", F.weekofyear(calendar_date)),
]

df = spark.sql('select * from DimeDates')
for column_name, column_expr in derived_columns:
    # Columns are appended in list order, so the table layout follows the list above.
    df = df.withColumn(column_name, column_expr)

# The helper column is dropped once all derived attributes exist.
df = df.drop("calendarDate")
(
    df.write
    .format('delta')
    .mode('overwrite')  # replaces any existing DimDate contents
    .saveAsTable('DimDate')
)

In [0]:
## FactPayment: payments with `date` converted by to_timestamp, de-duplicated, saved as Delta.
payments_typed = spark.sql('select payment_id,to_timestamp(date) as date,amount,rider_id from payments')
display(payments_typed.limit(5))  # preview is taken before duplicates are removed

# With no subset given, dropDuplicates() compares rows across every column.
df = payments_typed.dropDuplicates()
(
    df.write
    .format('delta')
    .mode('overwrite')  # replaces any existing FactPayment contents
    .saveAsTable('FactPayment')
)

payment_id,date,amount,rider_id
2,2019-06-01T00:00:00.000+0000,9.0,1000
3,2019-07-01T00:00:00.000+0000,9.0,1000
4,2019-08-01T00:00:00.000+0000,9.0,1000
5,2019-09-01T00:00:00.000+0000,9.0,1000
6,2019-10-01T00:00:00.000+0000,9.0,1000


In [0]:
## FactTrip: trips joined to riders on rider_id, with two derived measures:
##   rider_age = DATEDIFF(YEAR, riders.birthday, trips.start_at)
##   duration  = DATEDIFF(MINUTE, trips.start_at, trips.ended_at)
# The backslash continuations sit inside the string literal, so the leading
# spaces of each continued line are part of the SQL text sent to Spark.
trip_query = 'select tr.trip_id,tr.rideable_type,tr.start_at ,tr.ended_at,\
                    tr.start_station_id,tr.end_station_id,\
             r.rider_id,DATEDIFF(YEAR,r.birthday,tr.start_at) as rider_age,\
             DATEDIFF(MINUTE,tr.start_at,tr.ended_at) duration\
             from trips as tr join riders as r on tr.rider_id=r.rider_id'

trips_with_riders = spark.sql(trip_query)
display(trips_with_riders.limit(5))  # preview is taken before duplicates are removed

# With no subset given, dropDuplicates() compares rows across every column.
df = trips_with_riders.dropDuplicates()
(
    df.write
    .format('delta')
    .mode('overwrite')  # replaces any existing FactTrip contents
    .saveAsTable('FactTrip')
)

trip_id,rideable_type,start_at,ended_at,start_station_id,end_station_id,rider_id,rider_age,duration
0FEFDE2603568365,classic_bike,2021-02-14T17:52:38.000+0000,2021-02-14T18:12:09.000+0000,525,16806,47854,38,19
E6159D746B2DBB91,electric_bike,2021-02-09T19:10:18.000+0000,2021-02-09T19:19:10.000+0000,KA1503000012,TA1305000029,70870,33,8
B32D3199F1C2E75B,classic_bike,2021-02-02T17:49:41.000+0000,2021-02-02T17:54:06.000+0000,637,TA1305000034,58974,19,4
83E463F23575F4BF,electric_bike,2021-02-23T15:07:23.000+0000,2021-02-23T15:22:37.000+0000,13216,TA1309000055,39608,71,15
BDAA7E3494E8D545,electric_bike,2021-02-24T15:43:33.000+0000,2021-02-24T15:49:05.000+0000,18003,KP1705001026,36267,27,5
