In [0]:

from pyspark.sql.functions import *
from pyspark.sql.types import TimestampType
from pyspark.sql.types import StringType
import pyspark.sql.functions as f

     
# Create the star schema (fact and dimension tables)


# Fact table: payments.
# Rows are read from the staging table and written out with one row per payment_id.
payments = spark.table("default.staging_payments")

# Show a five-row sample of the staging data.
display(payments.limit(5))

# Rebuild default.fact_payments from scratch: drop any existing table, then
# write the de-duplicated payments as a Delta table.
spark.sql("drop table if exists default.fact_payments")
unique_payments = payments.dropDuplicates(["payment_id"])
(
    unique_payments.write
    .format("delta")
    .mode("overwrite")
    .saveAsTable("default.fact_payments")
)
     
# Fact trips table
trips = spark.table("default.staging_trips")

display(trips.limit(5))

# The join below needs each rider's birthday, so the riders staging table is
# loaded here. (Previously `riders` was referenced before it was assigned,
# which raises NameError when the script runs top to bottom.)
riders = spark.table("default.staging_riders")

# Seconds between trip start and end, and days between the rider's birthday
# and the trip start date.
trip_seconds = f.unix_timestamp("ended_at") - f.unix_timestamp("start_at")
days_since_birth = f.datediff(f.to_date("start_at"), f.to_date("birthday"))

fact_trips = (
    # Inner join: trips whose rider_id has no match in riders are dropped.
    trips.join(riders, trips.rider_id == riders.rider_id, "inner")
    # Trip duration in minutes, rounded to the nearest whole minute.
    .withColumn("duration", f.round(trip_seconds / 60))
    # Rider age in years at trip start, rounded to the nearest year.
    .withColumn("rider_age", f.round(days_since_birth / 365.25))
    # rider_id exists on both sides of the join; take the riders' column
    # explicitly to avoid an ambiguous reference.
    .select(
        "trip_id",
        riders.rider_id,
        "rideable_type",
        "start_station_id",
        "end_station_id",
        "start_at",
        "ended_at",
        "duration",
        "rider_age",
    )
)

display(fact_trips.limit(5))

# Write data to fact_trip
spark.sql("drop table if exists default.fact_trip")
fact_trips.write.format("delta").mode("overwrite").saveAsTable("default.fact_trip")

# Dimension table: riders.
riders = spark.table("default.staging_riders")

# Preview the first five staging rows.
display(riders.limit(5))

# Recreate default.dim_rider: remove any existing table, then store the
# riders as a Delta table with a single row per rider_id.
spark.sql("drop table if exists default.dim_rider")
(
    riders.dropDuplicates(["rider_id"])
    .write.format("delta")
    .mode("overwrite")
    .saveAsTable("default.dim_rider")
)
     

# Dimension table: stations.
stations = spark.table("default.staging_stations")

# Preview the first five staging rows.
display(stations.limit(5))

# Recreate default.dim_station: remove any existing table, then store the
# stations as a Delta table with a single row per station_id.
spark.sql("drop table if exists default.dim_station")
unique_stations = stations.dropDuplicates(["station_id"])
unique_stations.write.format("delta").mode("overwrite").saveAsTable(
    "default.dim_station"
)
     
# Dim date table

# Date range covered by the trips: earliest start_at and latest ended_at,
# fetched together in a single aggregation over `trips`.
date_bounds = trips.selectExpr(
    "MIN(start_at) as started_at",
    "MAX(ended_at) as ended_at",
).first()
min_date = date_bounds["started_at"]
max_date = date_bounds["ended_at"]

# Spark SQL expression producing one date per day between the two bounds.
expression = f"sequence(to_date('{min_date}'), to_date('{max_date}'), interval 1 day)"

# One-row seed DataFrame; its placeholder date_id column is overwritten below
# and only serves to give explode() a row to expand.
dim_date = spark.createDataFrame([(1,)], ["date_id"])

# Expand the sequence into one row per day, then derive a timestamp column.
dim_date = dim_date.withColumn("dateinit", f.explode(f.expr(expression)))
dim_date = dim_date.withColumn("date", f.to_timestamp(dim_date.dateinit, "yyyy-MM-dd"))

# Calendar attributes derived from `date`; date_id becomes the timestamp
# rendered as a string.
dim_date = dim_date \
            .withColumn("day", f.dayofmonth(dim_date.date)) \
            .withColumn("month", f.month(dim_date.date)) \
            .withColumn("quarter", f.quarter(dim_date.date)) \
            .withColumn("year", f.year(dim_date.date)) \
            .withColumn("day_of_year", f.dayofyear(dim_date.date)) \
            .withColumn("day_of_week", f.dayofweek(dim_date.date)) \
            .withColumn("date_id", dim_date.date.cast(StringType())) \
            .drop(f.col("dateinit"))

display(dim_date.limit(5))

# Write data to dim_date
# Drop the table that is actually written below; the previous code dropped
# default.dim_time, which is never written by this script.
spark.sql("drop table if exists default.dim_date")
dim_date.write.format("delta").mode("overwrite").saveAsTable("default.dim_date")
     


payment_id,date,amount,rider_id
539256,2020-08-01,9.0,21826
539257,2020-09-01,9.0,21826
539258,2020-10-01,9.0,21826
539259,2020-11-01,9.0,21826
539260,2020-12-01,9.0,21826


trip_id,rideable_type,start_at,ended_at,start_station_id,end_station_id,rider_id
F6F309843C09CAAC,classic_bike,2021-09-02 18:36:00,2021-09-02 18:48:43,TA1306000026,TA1307000041,70456
BD496FA19316E89C,classic_bike,2021-09-02 17:23:28,2021-09-02 17:32:43,TA1306000026,TA1309000019,24732
657E606A35206CC1,classic_bike,2021-09-05 23:13:43,2021-09-05 23:30:26,15642,TA1305000041,11875
B10DEEC4A5D90EB3,electric_bike,2021-09-09 18:26:00,2021-09-09 18:40:17,TA1307000001,TA1308000049,33064
268A00298A05078B,classic_bike,2021-09-06 18:49:27,2021-09-06 19:02:22,TA1306000026,TA1308000049,71976


trip_id,rider_id,rideable_type,start_station_id,end_station_id,start_at,ended_at,duration,rider_age
222BB8E5059252D7,34062,classic_bike,KA1503000064,13021,2021-06-13 09:48:47,2021-06-13 10:07:23,19.0,30.0
1826E16CB5486018,5342,classic_bike,TA1306000010,13021,2021-06-21 22:59:13,2021-06-21 23:04:29,5.0,26.0
3D9B6A0A5330B04D,3714,classic_bike,TA1305000030,13021,2021-06-18 16:06:42,2021-06-18 16:12:02,5.0,26.0
07E82F5E9C9E490F,18793,classic_bike,TA1305000034,13021,2021-06-17 16:46:23,2021-06-17 17:02:45,16.0,19.0
A8E94BAECBF0C2DD,43342,docked_bike,TA1308000009,TA1308000009,2021-06-13 17:36:29,2021-06-13 18:30:39,54.0,28.0


rider_id,first_name,last_name,address,birthday,account_start_date,account_end_date,is_member
1001,Jennifer,Smith,397 Diana Ferry,1976-08-10,2019-11-01,2020-09-01,True
1002,Karen,Smith,644 Brittany Row Apt. 097,1998-08-10,2022-02-04,,True
1003,Bryan,Roberts,996 Dickerson Turnpike,1999-03-29,2019-08-26,,False
1004,Jesse,Middleton,7009 Nathan Expressway,1969-04-11,2019-09-14,,True
1005,Christine,Rodriguez,224 Washington Mills Apt. 467,1974-08-27,2020-03-24,,False


station_id,name,latitude,longitude
KA1503000012,Clark St & Lake St,41.88579466666667,-87.63110066666668
637,Wood St & Chicago Ave,41.895634,-87.672069
13216,State St & 33rd St,41.8347335,-87.6258275
18003,Fairbanks St & Superior St,41.89580766666667,-87.62025316666669
KP1705001026,LaSalle Dr & Huron St,41.894877,-87.632326


date_id,date,day,month,quarter,year,day_of_year,day_of_week
2021-02-01 00:00:00,2021-02-01T00:00:00Z,1,2,1,2021,32,2
2021-02-02 00:00:00,2021-02-02T00:00:00Z,2,2,1,2021,33,3
2021-02-03 00:00:00,2021-02-03T00:00:00Z,3,2,1,2021,34,4
2021-02-04 00:00:00,2021-02-04T00:00:00Z,4,2,1,2021,35,5
2021-02-05 00:00:00,2021-02-05T00:00:00Z,5,2,1,2021,36,6
