In [0]:
###dim_date###

# Banner printed once at the start of the dimension-table build:
# a title line followed by a 70-character "=" rule.
print("  dimtable")
print("="*70)

from pyspark.sql.functions import *
from pyspark.sql.types import *

#  dim_date
# Builds gold_dim_date: one row per calendar date that appears in either
# bronze_trips.start_time or bronze_payments.payment_date.
# NOTE: only dates actually observed in the bronze tables are included;
# calendar gaps between them are NOT filled in.
print("\n creat dim_date...")

# Dates on which at least one trip started.
trips_dates = spark.sql("""
    SELECT DISTINCT date(start_time) AS trip_date
    FROM bronze_trips
    WHERE start_time IS NOT NULL
""")

# Dates on which at least one payment was made. Normalised through date() so
# both sides of the union below carry the same DATE type.
payment_dates = spark.sql("""
    SELECT DISTINCT date(payment_date) AS payment_date
    FROM bronze_payments
    WHERE payment_date IS NOT NULL
""")

# Combine both sources. date() can yield NULL for a value it cannot convert
# even when the raw column is non-NULL, so NULLs are dropped again here to
# keep a NULL date_key out of the dimension.
all_dates = (
    trips_dates.select(col("trip_date").alias("date"))
    .union(payment_dates.select(col("payment_date").alias("date")))
    .filter(col("date").isNotNull())
    .distinct()
)

# Add date dimension attributes
dim_date = all_dates.select(
    # Primary key: YYYYMMDD format
    date_format(col("date"), "yyyyMMdd").cast("int").alias("date_key"),
    col("date"),
    year(col("date")).alias("year"),
    quarter(col("date")).alias("quarter"),
    month(col("date")).alias("month"),
    date_format(col("date"), "MMMM").alias("month_name"),
    dayofmonth(col("date")).alias("day"),
    # dayofweek: 1 = Sunday ... 7 = Saturday
    dayofweek(col("date")).alias("day_of_week"),
    date_format(col("date"), "EEEE").alias("day_name"),
    # "date" is guaranteed non-NULL above, so isin() is always true/false.
    dayofweek(col("date")).isin(1, 7).alias("is_weekend"),
)

# Save as Delta table
dim_date.write \
    .mode("overwrite") \
    .option("overwriteSchema", "true") \
    .saveAsTable("gold_dim_date")

# Report from the persisted table instead of re-running the bronze scans and
# the distinct for every action on the lazy DataFrame.
saved_dim_date = spark.table("gold_dim_date")
print(f" dim_date created: {saved_dim_date.count():,} rows")
# Row order of a table read is not guaranteed, so sort only the preview.
saved_dim_date.orderBy("date").show(5)




###dim_time###
# Builds gold_dim_time: 24 rows, one per hour of the day (minute is always 0).

print("\n Creatig dim_time...")

from pyspark.sql import Row


def _hour_group(hour_of_day):
    """Return the part-of-day label for an hour in the range 0-23."""
    if 6 <= hour_of_day < 12:
        return "Morning"
    if 12 <= hour_of_day < 18:
        return "Afternoon"
    if 18 <= hour_of_day < 22:
        return "Evening"
    return "Night"


# One row per hour (hourly buckets, not per-minute rows).
# The iteration variable lives inside the comprehension on purpose: a
# module-level loop variable named `hour` would overwrite the `hour` function
# brought in by `from pyspark.sql.functions import *` for the rest of the
# notebook session.
time_data = [
    Row(
        time_key=hour_of_day * 100,  # 0, 100, 200, ..., 2300
        hour=hour_of_day,
        minute=0,  # Simplified to hour level
        hour_group=_hour_group(hour_of_day),
        # Business hours: 09:00 up to (not including) 17:00
        is_business_hours=9 <= hour_of_day < 17,
        # Rush hour: 7-9 AM and 5-7 PM
        is_rush_hour=(7 <= hour_of_day < 9) or (17 <= hour_of_day < 19),
    )
    for hour_of_day in range(24)
]

dim_time = spark.createDataFrame(time_data)

dim_time.write \
    .mode("overwrite") \
    .option("overwriteSchema", "true") \
    .saveAsTable("gold_dim_time")

print(f" dim_time created: {dim_time.count()} rows")
dim_time.show(8)


###dim_rider-station###
# Builds gold_dim_rider: one row per row of bronze_riders, with derived
# full_name, age_at_account_start and account_tenure_years columns.

print("\n Creating dim_rider...")

dim_rider = spark.sql("""
    SELECT 
        rider_id as rider_key,
        rider_id,
        first_name,
        last_name,
        -- CONCAT_WS skips NULL parts, so a missing first or last name no
        -- longer turns the whole full_name into NULL; NULLIF keeps NULL
        -- (rather than an empty string) when both parts are missing
        NULLIF(CONCAT_WS(' ', first_name, last_name), '') as full_name,
        address,
        birthday,
        account_start_date,
        is_member,
        -- Age in whole years at account start. MONTHS_BETWEEN is
        -- calendar-aware; dividing a day count by 365.25 lands just under
        -- the integer on exact birthdays and would report one year too few
        FLOOR(MONTHS_BETWEEN(account_start_date, birthday) / 12) as age_at_account_start,
        -- Approximate account tenure in years, as of the day this runs
        ROUND(DATEDIFF(CURRENT_DATE(), account_start_date) / 365.25, 2) as account_tenure_years
    FROM bronze_riders
""")

dim_rider.write \
    .mode("overwrite") \
    .option("overwriteSchema", "true") \
    .saveAsTable("gold_dim_rider")

# Count the persisted table rather than re-running the query against bronze.
print(f" dim_rider created: {spark.table('gold_dim_rider').count():,} rows")

# CREATE DIM_STATION - Station information
# Builds gold_dim_station: one row per row of bronze_stations, plus a coarse
# station_area label derived from latitude.
print("\n Creating dim_station...")

dim_station = spark.sql("""
    SELECT 
        station_id as station_key,
        station_id,
        station_name,
        latitude,
        longitude,
        -- Simple area classification based on latitude only.
        -- A NULL latitude fails both comparisons and would otherwise fall
        -- through to ELSE and be labelled 'Central Chicago'
        CASE 
            WHEN latitude IS NULL THEN 'Unknown'
            WHEN latitude > 41.9 THEN 'North Chicago'
            WHEN latitude < 41.8 THEN 'South Chicago'  
            ELSE 'Central Chicago'
        END as station_area
    FROM bronze_stations
""")

dim_station.write \
    .mode("overwrite") \
    .option("overwriteSchema", "true") \
    .saveAsTable("gold_dim_station")

# Count the persisted table rather than re-running the query against bronze.
print(f" dim_station created: {spark.table('gold_dim_station').count()} rows")

print("\n ALL DIMENSION TABLES CREATED!")
print("="*70)

  dimtable

 creat dim_date...
 dim_date created: 462 rows
+--------+----------+----+-------+-----+----------+---+-----------+---------+----------+
|date_key|      date|year|quarter|month|month_name|day|day_of_week| day_name|is_weekend|
+--------+----------+----+-------+-----+----------+---+-----------+---------+----------+
|20130201|2013-02-01|2013|      1|    2|  February|  1|          6|   Friday|     false|
|20130301|2013-03-01|2013|      1|    3|     March|  1|          6|   Friday|     false|
|20130401|2013-04-01|2013|      2|    4|     April|  1|          2|   Monday|     false|
|20130501|2013-05-01|2013|      2|    5|       May|  1|          4|Wednesday|     false|
|20130601|2013-06-01|2013|      2|    6|      June|  1|          7| Saturday|      true|
+--------+----------+----+-------+-----+----------+---+-----------+---------+----------+
only showing top 5 rows

 Creatig dim_time...
 dim_time created: 24 rows
+--------+----+------+----------+-----------------+------------+
|t