In [None]:
# Import library
import pyspark.sql.types as T 
import pyspark.sql.functions as F
from datetime import datetime

## Gold Layer: Curated, Business-Ready Data
<img src="../img/star_schema.png" alt="star_schema" style="width: 600px;"/>

This star schema is designed to support key business outcomes by enabling detailed analysis of ride durations and revenue trends.

- **Ride Analysis**: Insights into ride duration by date/time factors, station usage, rider age, and membership type.  
- **Revenue Analysis**: Breakdown of spending by time periods and customer demographics.  

The schema includes fact tables (`fact_trip` and `fact_payments`) linked to dimension tables (`dim_date`, `dim_station`, and `dim_rider`) for streamlined reporting and analysis.

### Create Dimension tables

**Dimension table ``dim_station``**

In [None]:
# The station dimension reuses the `stations` DataFrame as-is: no columns are selected or renamed.
# NOTE(review): `stations` is not defined in the visible part of this notebook -- confirm an
# earlier cell loads it, otherwise this cell fails on Restart & Run All.
dim_station = stations

**Dimension table ``dim_rider``**

In [None]:
# Rider dimension: a projection of `riders` in which the account start/end dates
# are exposed under the shorter names `start_date` and `end_date`.
dim_rider = riders.select(
    "rider_id",
    "birthday",
    "is_member",
    riders["account_start_date"].alias("start_date"),
    riders["account_end_date"].alias("end_date"),
    "first",
    "last",
)

**Dimension table ``dim_date``**

In [None]:
# Collect every timestamp the fact tables reference: payment dates plus trip
# start and end times, all exposed under the single column name `date_id`.
payment_dates = payments.select(F.col("date_id"))
trip_end_dates = trips.select(F.col("ended_at").alias("date_id"))
trip_start_dates = trips.select(F.col("started_at").alias("date_id"))

# De-duplicate the combined keys, then derive the calendar attributes in one pass.
date_key = F.col("date_id")
dim_date = (
    payment_dates
    .union(trip_end_dates)
    .union(trip_start_dates)
    .distinct()
    .select(
        date_key,
        F.dayofmonth(date_key).alias("day_of_month"),
        F.month(date_key).alias("month"),
        F.year(date_key).alias("year"),
    )
)

display(dim_date)

date_id,day_of_month,month,year
2019-12-01T00:00:00.000+0000,1,12,2019
2014-02-01T00:00:00.000+0000,1,2,2014
2018-06-01T00:00:00.000+0000,1,6,2018
2016-02-01T00:00:00.000+0000,1,2,2016
2017-01-01T00:00:00.000+0000,1,1,2017
2014-11-01T00:00:00.000+0000,1,11,2014
2021-07-01T00:00:00.000+0000,1,7,2021
2019-01-01T00:00:00.000+0000,1,1,2019
2014-09-01T00:00:00.000+0000,1,9,2014
2020-08-01T00:00:00.000+0000,1,8,2020


### Create Fact tables

**Fact table ``fact_trip``**

In [None]:
# Fact table fact_trip: one row per trip, carrying the date, station and rider
# keys plus two derived measures (`duration` and `rider_age`).
fact_trip = trips.select(
    F.col("trip_id"),
    F.col("started_at").alias("start_date_id"),
    F.col("ended_at").alias("end_date_id"),
    F.col("start_station_id"),
    F.col("end_station_id"),
    F.col("rider_id"),
).withColumn(
    # Duration in minutes, rounded to 2 decimals: casting a timestamp to long
    # yields epoch seconds, so the difference is in seconds before the / 60.
    "duration",
    F.round(
        (F.col("end_date_id").cast("long") - F.col("start_date_id").cast("long")) / 60,
        2,
    ),
)

# Look up each rider's birthday. The key is renamed so the joined frame does not
# end up with two `rider_id` columns; the helper key is dropped after the join.
rider_birthdays = riders.select(
    F.col("birthday"),
    F.col("rider_id").alias("r_rider_id"),
)

# Left join: trips whose rider has no match in `riders` are kept, with a null
# `birthday` and therefore a null `rider_age`.
# `birthday` stays on the fact table so the saved schema is unchanged; it is also
# available in dim_rider.
fact_trip = (
    fact_trip
    .join(rider_birthdays, fact_trip["rider_id"] == rider_birthdays["r_rider_id"], "left")
    .drop("r_rider_id")
    .withColumn(
        # Age in completed years at the start of the trip. months_between is
        # calendar-aware, unlike dividing a day count by an approximate year
        # length (the previous `/ 365.2` could roll the age over a couple of
        # days before the actual birthday). Both sides are truncated to dates so
        # the time of day cannot affect the result.
        "rider_age",
        F.floor(
            F.months_between(F.to_date(F.col("start_date_id")), F.to_date(F.col("birthday"))) / 12
        ),
    )
)

display(fact_trip)

trip_id,start_date_id,end_date_id,start_station_id,end_station_id,rider_id,duration,birthday,rider_age
7E1E50AC37E2DAD3,2021-08-14T14:01:36.000+0000,2021-08-14T14:34:49.000+0000,TA1309000007,13089,2644.0,33.22,1975-08-27T00:00:00.000+0000,48
ADFF32195521E952,2021-08-29T16:16:36.000+0000,2021-08-29T16:24:43.000+0000,13288,TA1308000031,37747.0,8.12,2002-01-21T00:00:00.000+0000,21
7C59843DB8D13CC7,2021-08-27T11:06:34.000+0000,2021-08-27T11:12:52.000+0000,TA1307000062,TA1305000020,63224.0,6.3,1986-09-27T00:00:00.000+0000,37
5B788004F8A5204C,2021-08-27T07:35:33.000+0000,2021-08-27T07:59:35.000+0000,13353,13242,45050.0,24.03,1993-06-30T00:00:00.000+0000,30
078629DD14B634AE,2021-08-08T15:00:30.000+0000,2021-08-08T15:22:57.000+0000,13353,13242,33762.0,22.45,1976-06-01T00:00:00.000+0000,47
5E98DA99CB0B52E4,2021-08-15T18:01:33.000+0000,2021-08-15T18:26:52.000+0000,13353,13242,33902.0,25.32,1984-06-28T00:00:00.000+0000,39
6A3F6243C9164889,2021-08-14T02:22:42.000+0000,2021-08-14T02:26:10.000+0000,13033,TA1305000020,47737.0,3.47,1997-01-13T00:00:00.000+0000,26
F034B9F0C7194317,2021-08-20T14:28:33.000+0000,2021-08-20T14:55:42.000+0000,TA1307000130,13089,28123.0,27.15,1977-12-02T00:00:00.000+0000,45
74EE09157161558A,2021-08-21T18:17:28.000+0000,2021-08-21T19:03:19.000+0000,15578,TA1308000031,60078.0,45.85,1992-08-23T00:00:00.000+0000,31
7EF8ED3865996053,2021-08-16T14:54:06.000+0000,2021-08-16T15:09:13.000+0000,13109,519,34360.0,15.12,1970-08-06T00:00:00.000+0000,53


**Fact table ``fact_payments``**

In [None]:
# Preview the source `payments` DataFrame that the payment fact table is built from.
display(payments)

payment_id,date_id,rider_id,amount
1064462.0,2020-06-01T00:00:00.000+0000,9.0,42106.0
1064463.0,2020-07-01T00:00:00.000+0000,9.0,42106.0
1064464.0,2020-08-01T00:00:00.000+0000,9.0,42106.0
1064465.0,2020-09-01T00:00:00.000+0000,9.0,42106.0
1064466.0,2020-10-01T00:00:00.000+0000,9.0,42106.0
1064467.0,2020-11-01T00:00:00.000+0000,9.0,42106.0
1064468.0,2020-12-01T00:00:00.000+0000,9.0,42106.0
1064469.0,2021-01-01T00:00:00.000+0000,9.0,42106.0
1064470.0,2021-02-01T00:00:00.000+0000,9.0,42106.0
1064471.0,2021-03-01T00:00:00.000+0000,9.0,42106.0


In [None]:
# Fact table fact_payments: the `payments` DataFrame is used unchanged
# (no columns are added, renamed or dropped).
fact_payments = payments

### Save these tables to Delta in overwrite mode and register them as Delta tables.

In [None]:
# Gold-layer tables to persist, keyed by the name they are registered under.
gold_tables = {
    "dim_station": dim_station,
    "dim_rider": dim_rider,
    "dim_date": dim_date,
    "fact_trip": fact_trip,
    "fact_payments": fact_payments,
}

# Drop any existing table first, so every table is removed before any is rewritten.
for table_name in gold_tables:
    spark.sql(f"DROP TABLE IF EXISTS {table_name}")

# Write each DataFrame in Delta format and register it under its table name.
for table_name, table_df in gold_tables.items():
    table_df.write.format("delta").mode("overwrite").saveAsTable(table_name)