In [0]:
from pyspark.sql.types import *
from pyspark.sql import functions as f



### Payment_fact

In [0]:
# Load the raw payments table and preview its first two rows.
payments = spark.read.table("default.payments_load")
display(payments.limit(2))

payment_id,date,amount,rider_id
1574726,2021-02-01,9.0,61831
1574727,2021-03-01,9.0,61831


In [0]:
# Rebuild default.payment_fact: drop any previous version of the table, then
# persist the payments load, de-duplicated on payment_id, as a Delta table.
# NOTE(review): when a payment_id repeats, which of its rows is kept is not
# controlled here — confirm duplicates are exact copies.
spark.sql("drop table if exists default.payment_fact")
(
    payments
    .dropDuplicates(subset=["payment_id"])
    .write
    .format("delta")
    .mode("overwrite")
    .saveAsTable("default.payment_fact")
)

### Rider_dim

In [0]:
# Load the raw riders table and preview its first two rows.
riders = spark.read.table("default.riders_load")
display(riders.limit(2))

rider_id,first_name,last_name,address,birthday,account_start_date,account_end_date,is_member
57257,Mark,Mcfarland,9928 Hunter Ranch,1982-02-01,2020-12-05,,False
57258,Mark,Davis,20036 Barrett Summit Apt. 714,1963-07-28,2017-07-12,,True


In [0]:
# Rebuild default.rider_dim: drop any previous version of the table, then
# persist the riders load, de-duplicated on rider_id, as a Delta table.
spark.sql("drop table if exists default.rider_dim")
(
    riders
    .dropDuplicates(subset=["rider_id"])
    .write
    .format("delta")
    .mode("overwrite")
    .saveAsTable("default.rider_dim")
)

### Station_dim

In [0]:
# Load the raw stations table and preview its first two rows.
stations = spark.read.table("default.stations_load")
display(stations.limit(2))

station_id,name,latitude,longitude
525,Glenwood Ave & Touhy Ave,42.012701,-87.66605799999999
KA1503000012,Clark St & Lake St,41.88579466666667,-87.63110066666668


In [0]:
# Rebuild default.station_dim: drop any previous version of the table, then
# persist the stations load, de-duplicated on station_id, as a Delta table.
spark.sql("drop table if exists default.station_dim")
(
    stations
    .dropDuplicates(subset=["station_id"])
    .write
    .format("delta")
    .mode("overwrite")
    .saveAsTable("default.station_dim")
)

### Trip_fact

In [0]:
# Load the raw trips table and preview its first two rows.
trips = spark.read.table("default.trips_load")
display(trips.limit(2))


trip_id,rideable_type,start_at,ended_at,start_station_id,end_station_id,rider_id
89E7AA6C29227EFF,classic_bike,2021-02-12T16:14:56Z,2021-02-12T16:21:43Z,525,660,71934
0FEFDE2603568365,classic_bike,2021-02-14T17:52:38Z,2021-02-14T18:12:09Z,525,16806,47854


In [0]:
# Build the trip fact: one row per trip from the trips load, left-joined to
# rider attributes to derive the rider's age, plus the trip duration.
# All Spark functions go through the `f` alias imported at the top of the
# notebook — bare `col` / `unix_timestamp` / `datediff` are never imported,
# and Python's builtin `round` cannot round a Spark Column.

# Join against riders de-duplicated exactly like rider_dim, so a repeated
# rider_id cannot multiply trip rows.
rider_lookup = riders.dropDuplicates(["rider_id"])

trip_fact = (
    trips.alias("t1")
    .join(rider_lookup.alias("t2"), f.col("t1.rider_id") == f.col("t2.rider_id"), "left")
    # Trip length in minutes, rounded to a whole number (still a double).
    .withColumn("duration", f.round((f.unix_timestamp("ended_at") - f.unix_timestamp("start_at")) / 60))
    # Age in years between birthday and account_start_date.
    # NOTE(review): this is the rider's age when the account was opened, not
    # at the time of the trip — confirm this is the intended definition.
    .withColumn("rider_age", f.round(f.datediff(f.col("account_start_date"), f.col("birthday")) / 365.25).cast("int"))
    .withColumnRenamed("start_at", "start_time_id")
    .withColumnRenamed("ended_at", "end_time_id")
    .select(
        "trip_id",
        f.col("t1.rider_id"),
        "rideable_type",
        "start_station_id",
        "end_station_id",
        "start_time_id",
        "end_time_id",
        "duration",
        "rider_age",
    )
)

display(trip_fact.limit(10))

trip_id,rider_id,rideable_type,start_station_id,end_station_id,start_time_id,end_time_id,duration,rider_age
89E7AA6C29227EFF,71934,classic_bike,525,660,2021-02-12T16:14:56Z,2021-02-12T16:21:43Z,7.0,37
0FEFDE2603568365,47854,classic_bike,525,16806,2021-02-14T17:52:38Z,2021-02-14T18:12:09Z,20.0,37
E6159D746B2DBB91,70870,electric_bike,KA1503000012,TA1305000029,2021-02-09T19:10:18Z,2021-02-09T19:19:10Z,9.0,33
B32D3199F1C2E75B,58974,classic_bike,637,TA1305000034,2021-02-02T17:49:41Z,2021-02-02T17:54:06Z,4.0,19
83E463F23575F4BF,39608,electric_bike,13216,TA1309000055,2021-02-23T15:07:23Z,2021-02-23T15:22:37Z,15.0,71
BDAA7E3494E8D545,36267,electric_bike,18003,KP1705001026,2021-02-24T15:43:33Z,2021-02-24T15:49:05Z,6.0,28
A772742351171257,50104,classic_bike,KP1705001026,KP1705001026,2021-02-01T17:47:42Z,2021-02-01T17:48:33Z,1.0,29
295476889D9B79F8,19618,classic_bike,18003,18003,2021-02-11T18:33:53Z,2021-02-11T18:35:09Z,1.0,21
362087194BA4CC9A,16732,classic_bike,KP1705001026,KP1705001026,2021-02-27T15:13:39Z,2021-02-27T15:36:36Z,23.0,15
21630F715038CCB0,57068,classic_bike,KP1705001026,KP1705001026,2021-02-20T08:59:42Z,2021-02-20T09:17:04Z,17.0,45


In [0]:
# Persist the trip fact as a Delta table, replacing any previous version.
spark.sql("drop table if exists default.trip_fact")
(
    trip_fact.write
    .format("delta")
    .mode("overwrite")
    .saveAsTable("default.trip_fact")
)

### Date_dim

In [0]:
# Calendar dimension: one row per day, from the earliest trip start date
# through five years past the latest trip start.

# Both bounds come from a single pass over `trips`.
trip_date_bounds = trips.selectExpr(
    "MIN(start_at) AS first_start",
    "DATEADD(year, 5, MAX(start_at)) AS last_start_plus_5y",
).first()
min_date = trip_date_bounds["first_start"]
max_date = trip_date_bounds["last_start_plus_5y"]

# SQL expression producing the array of every date between the two bounds, step 1 day.
expression = f"sequence(to_date('{min_date}'), to_date('{max_date}'), interval 1 day)"

date_dim = (
    # Single seed row; exploding the date array fans it out to one row per day.
    spark.createDataFrame([(1,)], ["date_id"])
    .select(f.explode(f.expr(expression)).alias("dateinit"))
    .withColumn("date", f.to_timestamp("dateinit", "yyyy-MM-dd"))
    .select(
        # NOTE(review): casting the timestamp gives a string that includes a
        # time-of-day part (e.g. '2021-02-01 00:00:00') — confirm key format.
        f.col("date").cast(StringType()).alias("date_id"),
        "date",
        f.dayofweek("date").alias("day_of_week"),  # Spark: 1 = Sunday ... 7 = Saturday
        f.quarter("date").alias("quarter"),
        f.month("date").alias("month"),
        f.year("date").alias("year"),
    )
)

In [0]:
# Persist the date dimension as a Delta table, replacing any previous version.
spark.sql("drop table if exists default.date_dim")
(
    date_dim.write
    .format("delta")
    .mode("overwrite")
    .saveAsTable("default.date_dim")
)