In [0]:
# Import required libraries
from pyspark.sql.functions import *
from pyspark.sql.types import *
from datetime import datetime, date

# Create bronze database
spark.sql("CREATE DATABASE IF NOT EXISTS bronze")

# Column mappings for the bronze tables. Each source CSV is queried directly as
# csv.`/FileStore/tables/<table>.csv` and its columns are addressed by position
# (_c0, _c1, ...), so every entry renames (and where needed casts) them in order.
BRONZE_TABLE_COLUMNS = {
    "payments": [
        "_c0 as payment_id",
        "cast(_c1 as date) as payment_date",
        "cast(_c2 as decimal(10,2)) as amount",
        "cast(_c3 as int) as rider_id",
    ],
    "trips": [
        "_c0 as trip_id",
        "_c1 as rideable_type",
        "cast(_c2 as timestamp) as start_time",
        "cast(_c3 as timestamp) as end_time",
        "_c4 as start_station_id",
        "_c5 as end_station_id",
        "cast(_c6 as int) as rider_id",
    ],
    "riders": [
        "cast(_c0 as int) as rider_id",
        "_c1 as first_name",
        "_c2 as last_name",
        "_c3 as address",
        "cast(_c4 as date) as birthday",
        "cast(_c5 as date) as account_start_date",
        "cast(_c6 as date) as account_end_date",
        "cast(_c7 as boolean) as is_member",
    ],
    "stations": [
        "_c0 as station_id",
        "_c1 as name",
        "cast(_c2 as double) as latitude",
        "cast(_c3 as double) as longitude",
    ],
}

# Audit columns appended to every bronze table: load time and originating file.
BRONZE_AUDIT_COLUMNS = [
    "current_timestamp() as ingestion_timestamp",
    "input_file_name() as source_file",
]


def create_bronze_table(table, column_exprs):
    """Create ``bronze.<table>`` as a Delta table loaded from its CSV file.

    The statement uses ``CREATE TABLE IF NOT EXISTS``, so re-running it is a
    no-op for a table that already exists.

    Args:
        table: Bare table name. It is used for the table itself, for the Delta
            location ``/FileStore/tables/bronze/<table>`` and for the source
            file ``/FileStore/tables/<table>.csv``.
        column_exprs: List of SQL select expressions mapping the positional
            CSV columns to named columns; the audit columns are appended.
    """
    select_list = ",\n        ".join(column_exprs + BRONZE_AUDIT_COLUMNS)
    # NOTE(review): the OPTIONS clause below appears to attach to the Delta
    # table being created, not to the CSV read in the FROM clause. It is kept
    # unchanged here; confirm whether it is needed at all.
    spark.sql(f"""
    CREATE TABLE IF NOT EXISTS bronze.{table}
    USING DELTA
    LOCATION '/FileStore/tables/bronze/{table}'
    OPTIONS (
        header = false
    )
    AS
    SELECT
        {select_list}
    FROM csv.`/FileStore/tables/{table}.csv`
    """)


# Create bronze layer tables
try:
    # Tables are created in the order they are declared above.
    for table, column_exprs in BRONZE_TABLE_COLUMNS.items():
        create_bronze_table(table, column_exprs)
        print(f"Created {table} table")

    # Set Delta Lake properties for all tables
    for table in BRONZE_TABLE_COLUMNS:
        spark.sql(f"""
        ALTER TABLE bronze.{table}
        SET TBLPROPERTIES (
            'delta.autoOptimize.optimizeWrite' = 'true',
            'delta.autoOptimize.autoCompact' = 'true'
        )
        """)

    # Verify bronze layer tables
    print("\nVerifying bronze layer tables:")
    for table in BRONZE_TABLE_COLUMNS:
        # Named row_count (not count) so the `count` function pulled in by
        # `from pyspark.sql.functions import *` is not overwritten.
        row_count = spark.sql(f"SELECT COUNT(*) as count FROM bronze.{table}").collect()[0]["count"]
        print(f"bronze.{table}: {row_count} records")
        # Show sample data
        print(f"\nSample data from bronze.{table}:")
        spark.sql(f"SELECT * FROM bronze.{table} LIMIT 5").show()

except Exception as e:
    print(f"Error in bronze layer setup: {str(e)}")
    # Re-raise so the cell fails visibly; otherwise later cells would run
    # against a missing or partially built bronze layer.
    raise

Created payments table
Created trips table
Created riders table
Created stations table

Verifying bronze layer tables:
bronze.payments: 1946607 records

Sample data from bronze.payments:
+----------+------------+------+--------+--------------------+--------------------+
|payment_id|payment_date|amount|rider_id| ingestion_timestamp|         source_file|
+----------+------------+------+--------+--------------------+--------------------+
|    539256|  2020-08-01|  9.00|   21826|2024-12-06 04:18:...|dbfs:/FileStore/t...|
|    539257|  2020-09-01|  9.00|   21826|2024-12-06 04:18:...|dbfs:/FileStore/t...|
|    539258|  2020-10-01|  9.00|   21826|2024-12-06 04:18:...|dbfs:/FileStore/t...|
|    539259|  2020-11-01|  9.00|   21826|2024-12-06 04:18:...|dbfs:/FileStore/t...|
|    539260|  2020-12-01|  9.00|   21826|2024-12-06 04:18:...|dbfs:/FileStore/t...|
+----------+------------+------+--------+--------------------+--------------------+

bronze.trips: 4584921 records

Sample data from bronze.t

In [0]:
# Create dimension tables for dates and times
DIMENSION_TABLES = ["dim_date", "dim_time"]

try:
    # Create DimDate: one row per calendar day from 2019-01-01 through
    # 2022-12-31. date_key is the yyyyMMdd value cast to int, because
    # date_format() returns a string and the gold fact tables in this notebook
    # cast their yyyyMMdd date keys to int; the key types now agree.
    # is_weekend relies on dayofweek() numbering Sunday as 1 and Saturday as 7.
    # is_holiday is a constant false placeholder.
    spark.sql("""
    CREATE TABLE IF NOT EXISTS bronze.dim_date
    USING DELTA
    LOCATION '/FileStore/tables/bronze/dim_date'
    AS
    WITH date_cte AS (
        SELECT explode(sequence(
            to_date('2019-01-01'),
            to_date('2022-12-31'),
            interval 1 day
        )) as full_date
    )
    SELECT
        cast(date_format(full_date, 'yyyyMMdd') as int) as date_key,
        full_date,
        year(full_date) as year,
        quarter(full_date) as quarter,
        month(full_date) as month,
        dayofmonth(full_date) as day,
        date_format(full_date, 'EEEE') as day_of_week,
        date_format(full_date, 'MMMM') as month_name,
        concat('Q', quarter(full_date)) as quarter_name,
        dayofweek(full_date) IN (1, 7) as is_weekend,
        false as is_holiday
    FROM date_cte
    """)
    print("Created DimDate successfully")

    # Create DimTime: one row per minute of the day (0..1439). time_key is the
    # minute-of-day number. hour_12 maps hour 0 to 12 and hours 13-23 to 1-11.
    # time_of_day buckets hours as Night (<6), Morning (<12), Afternoon (<17),
    # Evening (<21) and Night again for 21-23.
    spark.sql("""
    CREATE TABLE IF NOT EXISTS bronze.dim_time
    USING DELTA
    LOCATION '/FileStore/tables/bronze/dim_time'
    AS
    WITH time_cte AS (
        SELECT explode(sequence(0, 1439)) as minute_of_day
    )
    SELECT
        minute_of_day as time_key,
        concat(
            lpad(cast(floor(minute_of_day/60) as string), 2, '0'),
            ':',
            lpad(cast(minute_of_day % 60 as string), 2, '0'),
            ':00'
        ) as full_time,
        floor(minute_of_day/60) as hour_24,
        case
            when floor(minute_of_day/60) = 0 then 12
            when floor(minute_of_day/60) <= 12 then floor(minute_of_day/60)
            else floor(minute_of_day/60) - 12
        end as hour_12,
        minute_of_day % 60 as minute,
        case when floor(minute_of_day/60) < 12 then 'AM' else 'PM' end as am_pm,
        case
            when floor(minute_of_day/60) < 6 then 'Night'
            when floor(minute_of_day/60) < 12 then 'Morning'
            when floor(minute_of_day/60) < 17 then 'Afternoon'
            when floor(minute_of_day/60) < 21 then 'Evening'
            else 'Night'
        end as time_of_day
    FROM time_cte
    ORDER BY minute_of_day
    """)
    print("Created DimTime successfully")

    # Set Delta properties for dimension tables
    for table in DIMENSION_TABLES:
        spark.sql(f"""
        ALTER TABLE bronze.{table}
        SET TBLPROPERTIES (
            'delta.autoOptimize.optimizeWrite' = 'true',
            'delta.autoOptimize.autoCompact' = 'true'
        )
        """)

    # Verify dimension tables
    print("\nVerifying dimension tables:")
    for table in DIMENSION_TABLES:
        # Named row_count (not count) so the `count` function pulled in by
        # `from pyspark.sql.functions import *` is not overwritten.
        row_count = spark.sql(f"SELECT COUNT(*) as count FROM bronze.{table}").collect()[0]["count"]
        print(f"bronze.{table}: {row_count} records")
        print(f"\nSample data from bronze.{table}:")
        spark.sql(f"SELECT * FROM bronze.{table} LIMIT 5").show()

except Exception as e:
    print(f"Error creating dimension tables: {str(e)}")
    # Re-raise so the cell fails visibly; otherwise the gold layer would be
    # built from missing or partial dimension tables.
    raise

Created DimDate successfully
Created DimTime successfully

Verifying dimension tables:
bronze.dim_date: 1461 records

Sample data from bronze.dim_date:
+--------+----------+----+-------+-----+---+-----------+----------+------------+----------+----------+
|date_key| full_date|year|quarter|month|day|day_of_week|month_name|quarter_name|is_weekend|is_holiday|
+--------+----------+----+-------+-----+---+-----------+----------+------------+----------+----------+
|20190101|2019-01-01|2019|      1|    1|  1|    Tuesday|   January|          Q1|     false|     false|
|20190102|2019-01-02|2019|      1|    1|  2|  Wednesday|   January|          Q1|     false|     false|
|20190103|2019-01-03|2019|      1|    1|  3|   Thursday|   January|          Q1|     false|     false|
|20190104|2019-01-04|2019|      1|    1|  4|     Friday|   January|          Q1|     false|     false|
|20190105|2019-01-05|2019|      1|    1|  5|   Saturday|   January|          Q1|      true|     false|
+--------+----------+---

In [0]:
# Create gold database and implement star schema
spark.sql("CREATE DATABASE IF NOT EXISTS gold")

# Every table that makes up the gold star schema, dimensions first.
GOLD_TABLES = ["dim_station", "dim_rider", "dim_date", "dim_time", "fact_trip", "fact_payment"]

try:
    # Create DimStation: bronze.stations plus a uuid() surrogate key and a
    # city_district derived from the first letter of station_id
    # ('C' -> Central District, 'N' -> North District, anything else Unknown).
    spark.sql("""
    CREATE TABLE IF NOT EXISTS gold.dim_station
    USING DELTA
    LOCATION '/FileStore/tables/gold/dim_station'
    AS
    SELECT 
        uuid() as station_key,
        station_id,
        name,
        latitude,
        longitude,
        CASE 
            WHEN station_id LIKE 'C%' THEN 'Central District'
            WHEN station_id LIKE 'N%' THEN 'North District'
            ELSE 'Unknown'
        END as city_district
    FROM bronze.stations
    """)

    # Create DimRider: bronze.riders plus a uuid() surrogate key, ages in whole
    # years, and a status flag. current_age is computed against current_date()
    # when the table is created. A rider with no account_end_date is 'Active'.
    spark.sql("""
    CREATE TABLE IF NOT EXISTS gold.dim_rider
    USING DELTA
    LOCATION '/FileStore/tables/gold/dim_rider'
    AS
    SELECT 
        uuid() as rider_key,
        rider_id,
        first_name,
        last_name,
        birthday,
        floor(months_between(current_date(), birthday)/12) as current_age,
        account_start_date,
        account_end_date,
        is_member,
        floor(months_between(account_start_date, birthday)/12) as age_at_account_start,
        CASE 
            WHEN account_end_date IS NULL THEN 'Active'
            ELSE 'Inactive'
        END as rider_status
    FROM bronze.riders
    """)

    # Move dimension tables to gold: straight copies of the bronze date and
    # time dimensions under the gold storage location.
    for table in ["dim_date", "dim_time"]:
        spark.sql(f"""
        CREATE TABLE IF NOT EXISTS gold.{table}
        USING DELTA
        LOCATION '/FileStore/tables/gold/{table}'
        AS SELECT * FROM bronze.{table}
        """)

    # Create FactTrip: one row per bronze trip. Date keys are yyyyMMdd ints and
    # time keys are minute-of-day (hour * 60 + minute). The LEFT JOINs keep
    # trips whose station or rider has no dimension row; those keys are NULL.
    # NOTE(review): PARTITIONED BY (start_date_key) creates one partition per
    # start day; confirm this granularity is intended for the data volume.
    spark.sql("""
    CREATE TABLE IF NOT EXISTS gold.fact_trip
    USING DELTA
    LOCATION '/FileStore/tables/gold/fact_trip'
    PARTITIONED BY (start_date_key)
    AS
    SELECT 
        uuid() as trip_key,
        r.rider_key,
        ss.station_key as start_station_key,
        es.station_key as end_station_key,
        cast(date_format(t.start_time, 'yyyyMMdd') as int) as start_date_key,
        (hour(t.start_time) * 60 + minute(t.start_time)) as start_time_key,
        cast(date_format(t.end_time, 'yyyyMMdd') as int) as end_date_key,
        (hour(t.end_time) * 60 + minute(t.end_time)) as end_time_key,
        round((unix_timestamp(t.end_time) - unix_timestamp(t.start_time))/60, 2) as trip_duration_minutes,
        t.rideable_type
    FROM bronze.trips t
    LEFT JOIN gold.dim_station ss ON t.start_station_id = ss.station_id
    LEFT JOIN gold.dim_station es ON t.end_station_id = es.station_id
    LEFT JOIN gold.dim_rider r ON t.rider_id = r.rider_id
    """)

    # Create FactPayment: one row per bronze payment, keyed to the rider
    # dimension (NULL rider_key when no rider matches) and a yyyyMMdd date key.
    spark.sql("""
    CREATE TABLE IF NOT EXISTS gold.fact_payment
    USING DELTA
    LOCATION '/FileStore/tables/gold/fact_payment'
    AS
    SELECT 
        uuid() as payment_key,
        r.rider_key,
        cast(date_format(p.payment_date, 'yyyyMMdd') as int) as date_key,
        p.amount
    FROM bronze.payments p
    LEFT JOIN gold.dim_rider r ON p.rider_id = r.rider_id
    """)

    # Set Delta properties for gold tables
    for table in GOLD_TABLES:
        spark.sql(f"""
        ALTER TABLE gold.{table}
        SET TBLPROPERTIES (
            'delta.autoOptimize.optimizeWrite' = 'true',
            'delta.autoOptimize.autoCompact' = 'true',
            'delta.enableChangeDataFeed' = 'true'
        )
        """)

    # Optimize fact tables (without partition columns)
    spark.sql("""
    OPTIMIZE gold.fact_trip
    ZORDER BY (rider_key, trip_duration_minutes)
    """)
    
    spark.sql("""
    OPTIMIZE gold.fact_payment
    ZORDER BY (rider_key, amount)
    """)

    # Verify gold layer tables
    print("\nVerifying gold layer tables:")
    for table in GOLD_TABLES:
        # Named row_count (not count) so the `count` function pulled in by
        # `from pyspark.sql.functions import *` is not overwritten.
        row_count = spark.sql(f"SELECT COUNT(*) as count FROM gold.{table}").collect()[0]["count"]
        print(f"gold.{table}: {row_count} records")
        
        # Show schema
        print(f"\nSchema for gold.{table}:")
        spark.sql(f"DESCRIBE gold.{table}").show()

except Exception as e:
    print(f"Error in gold layer implementation: {str(e)}")
    # Re-raise so the cell fails visibly instead of reporting success with a
    # missing or partially built gold layer.
    raise

finally:
    # Final verification: runs whether or not the build succeeded, so the
    # table overview is still printed after a failure.
    print("\nFinal database overview:")
    for database in ["bronze", "gold"]:
        print(f"\nTables in {database} database:")
        spark.sql(f"SHOW TABLES IN {database}").show()


Verifying gold layer tables:
gold.dim_station: 838 records

Schema for gold.dim_station:
+-------------+---------+-------+
|     col_name|data_type|comment|
+-------------+---------+-------+
|  station_key|   string|   NULL|
|   station_id|   string|   NULL|
|         name|   string|   NULL|
|     latitude|   double|   NULL|
|    longitude|   double|   NULL|
|city_district|   string|   NULL|
+-------------+---------+-------+

gold.dim_rider: 75000 records

Schema for gold.dim_rider:
+--------------------+---------+-------+
|            col_name|data_type|comment|
+--------------------+---------+-------+
|           rider_key|   string|   NULL|
|            rider_id|      int|   NULL|
|          first_name|   string|   NULL|
|           last_name|   string|   NULL|
|            birthday|     date|   NULL|
|         current_age|   bigint|   NULL|
|  account_start_date|     date|   NULL|
|    account_end_date|     date|   NULL|
|           is_member|  boolean|   NULL|
|age_at_account_sta