###Transform the data into the star schema for a Gold data store

In [0]:
import pyspark.sql.functions as f
from pyspark.sql.types import TimestampType
from pyspark.sql.types import StringType
from pyspark.sql.functions import *

#### Create **fact_payment** table

In [0]:
payments = spark.table("staging_payments")
display(payments.limit(5))

payment_id,date,amount,rider_id
2,2019-06-01,9.0,1000
3,2019-07-01,9.0,1000
4,2019-08-01,9.0,1000
5,2019-09-01,9.0,1000
6,2019-10-01,9.0,1000


In [0]:
spark.sql('DROP TABLE IF EXISTS fact_payments')
payments.dropDuplicates(['payment_id']).write.format('delta').mode('overwrite').saveAsTable('fact_payments')

In [0]:
fact_payments = spark.table('fact_payments')
display(fact_payments.limit(5))

payment_id,date,amount,rider_id
10000,2021-03-01,12.81,1393
1000001,2018-08-01,9.0,39647
1000011,2019-06-01,9.0,39647
1000012,2019-07-01,9.0,39647
1000015,2019-10-01,9.0,39647


####Create **dim_station** table

In [0]:
stations = spark.table('staging_stations')
display(stations.limit(10))

station_id,name,latitude,longitude
KA1503000012,Clark St & Lake St,41.88579466666667,-87.63110066666668
637,Wood St & Chicago Ave,41.895634,-87.672069
13216,State St & 33rd St,41.8347335,-87.6258275
18003,Fairbanks St & Superior St,41.89580766666667,-87.62025316666669
KP1705001026,LaSalle Dr & Huron St,41.894877,-87.632326
13253,Lincoln Ave & Waveland Ave,41.948797,-87.675278
KA1503000044,Rush St & Hubbard St,41.890173,-87.62618499999999
KA1504000140,Winchester Ave & Elston Ave,41.92403733333333,-87.67641483333334
TA1305000032,Clinton St & Madison St,41.882242,-87.64106600000001
TA1306000012,Wells St & Huron St,41.89475366666667,-87.63440200000001


In [0]:
spark.sql('DROP TABLE IF EXISTS dim_station')
stations.dropDuplicates(['station_id']).write.format('delta').mode('overwrite').saveAsTable('dim_station')

In [0]:
dim_station = spark.table('dim_station')
display(dim_station.limit(5))

station_id,name,latitude,longitude
13001,Michigan Ave & Washington St,41.8839840647265,-87.6246839761734
13006,LaSalle St & Washington St,41.882664,-87.63253
13008,Millennium Park,41.8810317,-87.62408432
13011,Canal St & Adams St,41.879255,-87.639904
13016,St. Clair St & Erie St,41.89434513742426,-87.62279838323593


####Create **dim_rider** table

In [0]:
riders = spark.table('staging_riders')
display(riders.limit(5))

rider_id,first,last,address,birthday,account_start_date,account_end_date,is_member
1001,Jennifer,Smith,397 Diana Ferry,1976-08-10,2019-11-01,2020-09-01,True
1002,Karen,Smith,644 Brittany Row Apt. 097,1998-08-10,2022-02-04,,True
1003,Bryan,Roberts,996 Dickerson Turnpike,1999-03-29,2019-08-26,,False
1004,Jesse,Middleton,7009 Nathan Expressway,1969-04-11,2019-09-14,,True
1005,Christine,Rodriguez,224 Washington Mills Apt. 467,1974-08-27,2020-03-24,,False


In [0]:
spark.sql('DROP TABLE IF EXISTS dim_rider')
riders.dropDuplicates(['rider_id']).write.format('delta').mode('overwrite').saveAsTable('dim_rider')

In [0]:
dim_rider = spark.table('dim_rider')
display(dim_rider.limit(5))

rider_id,first,last,address,birthday,account_start_date,account_end_date,is_member
10000,Kenneth,Foster,03514 Bruce Lock,1980-08-02,2017-01-15,,True
10009,Brenda,Cook,1430 Stephanie Ferry Apt. 296,1984-08-28,2021-08-01,,True
10010,Scott,Hamilton,1632 Griffin Highway,1981-03-24,2020-02-19,,True
10011,Erica,Lindsey,74456 Hall Mountains,1991-06-16,2017-12-29,2018-12-01,True
10012,Natasha,James,94890 April Cove,1999-10-29,2018-07-17,,True


#### Create **fact_trips** table

In [0]:
trips = spark.table('staging_trips')
display(trips.limit(5))

trip_id,rideable_type,started_at,ended_at,start_station_id,end_station_id,rider_id
0FEFDE2603568365,classic_bike,2021-02-14 17:52:38,2021-02-14 18:12:09,525,16806,47854
E6159D746B2DBB91,electric_bike,2021-02-09 19:10:18,2021-02-09 19:19:10,KA1503000012,TA1305000029,70870
B32D3199F1C2E75B,classic_bike,2021-02-02 17:49:41,2021-02-02 17:54:06,637,TA1305000034,58974
83E463F23575F4BF,electric_bike,2021-02-23 15:07:23,2021-02-23 15:22:37,13216,TA1309000055,39608
BDAA7E3494E8D545,electric_bike,2021-02-24 15:43:33,2021-02-24 15:49:05,18003,KP1705001026,36267


In [0]:
fact_trip = trips.alias('trip').join(riders.alias('rider'), f.col('trip.rider_id') == f.col('rider.rider_id'), 'left')  \
    .withColumn('rider_age', round((unix_timestamp('started_at') - unix_timestamp('rider.birthday','yyyy-MM-dd'))/3600/24/365)) \
    .withColumn('duration', round((unix_timestamp("ended_at") - unix_timestamp('started_at'))/60)) \
    .withColumnRenamed('started_at', 'start_time_id') \
    .withColumnRenamed('ended_at', 'end_time_id') \
    .select('trip_id', col('trip.rider_id'), 'rideable_type', 'start_station_id', 'end_station_id', 'start_time_id', 'duration', 'rider_age')  

In [0]:
display(fact_trip.limit(5))

trip_id,rider_id,rideable_type,start_station_id,end_station_id,start_time_id,duration,rider_age
0FEFDE2603568365,47854,classic_bike,525,16806,2021-02-14 17:52:38,20.0,38.0
E6159D746B2DBB91,70870,electric_bike,KA1503000012,TA1305000029,2021-02-09 19:10:18,9.0,33.0
B32D3199F1C2E75B,58974,classic_bike,637,TA1305000034,2021-02-02 17:49:41,4.0,19.0
83E463F23575F4BF,39608,electric_bike,13216,TA1309000055,2021-02-23 15:07:23,15.0,72.0
BDAA7E3494E8D545,36267,electric_bike,18003,KP1705001026,2021-02-24 15:43:33,6.0,27.0


In [0]:
riders.columns

Out[15]: ['rider_id',
 'first',
 'last',
 'address',
 'birthday',
 'account_start_date',
 'account_end_date',
 'is_member']

In [0]:
spark.sql('DROP TABLE IF EXISTS fact_trips')
fact_trip.dropDuplicates(['trip_id']).write.format('delta').mode('overwrite').saveAsTable('fact_trips')

#### Create **dim_time** table
[How to calculate Max(Date) and Min(Date) for DateType in pyspark dataframe?](https://stackoverflow.com/questions/50069061/how-to-calculate-maxdate-and-mindate-for-datetype-in-pyspark-dataframe)

[Expand PySpark dataframe for missing dates](https://stackoverflow.com/questions/71564254/expand-pyspark-dataframe-for-missing-dates)

[How to add ten days to existing date column in Pyspark](https://stackoverflow.com/questions/50703284/how-to-add-ten-days-to-existing-date-column-in-pyspark)

In [0]:
max_date = trips.selectExpr('MAX(started_at) AS started_at').first().asDict()['started_at']
min_date = trips.selectExpr('MIN(started_at) AS started_at').first().asDict()['started_at']
expression = f"sequence(to_date('{min_date}'), to_date('{max_date}'), interval 1 day)"
dim_date = spark.createDataFrame([(1,)], ["time_id"])

dim_date = dim_date.withColumn("dateinit", f.explode(f.expr(expression)))
dim_date = dim_date.withColumn("date", f.to_timestamp(dim_date.dateinit, "yyyy-MM-dd"))

dim_date = dim_date \
            .withColumn("week_of_year", f.weekofyear(dim_date.date)) \
            .withColumn("day_of_month", f.dayofmonth(dim_date.date)) \
            .withColumn("day_of_week", f.dayofweek(dim_date.date)) \
            .withColumn("year", f.year(dim_date.date)) \
            .withColumn("month", f.month(dim_date.date)) \
            .withColumn("quarter", f.quarter(dim_date.date)) \
            .withColumn("time_id", dim_date.date.cast(StringType())) \
            .drop(f.col("dateinit"))

In [0]:
spark.sql("DROP TABLE IF EXISTS dim_date")
dim_date.write.format("delta").mode("overwrite").saveAsTable("default.dim_date")
     