In [0]:
# Import necessary libraries
from pyspark.sql.types import *
from pyspark.sql.functions import *

### I. Bronze tables

#### 1. Ingest raw data

In [0]:
# Raw source files on DBFS. They are read with header=False, so every line is
# treated as data and column names/types come solely from the schemas below.
payments_path = "dbfs:/FileStore/data/payments.csv"
riders_path = "dbfs:/FileStore/data/riders.csv"
stations_path = "dbfs:/FileStore/data/stations.csv"
trips_path = "dbfs:/FileStore/data/trips.csv"

# --- payments ---------------------------------------------------------------
# NOTE(review): FloatType is lossy for currency amounts; DecimalType is the
# usual choice. Confirm before changing, since it alters downstream schemas.
payments_schema = StructType(
    [
        StructField("payment_id", IntegerType(), nullable=True),
        StructField("payment_date", TimestampType(), nullable=True),
        StructField("payment_amount", FloatType(), nullable=True),
        StructField("acc_number", IntegerType(), nullable=True),
    ]
)
payments_df = spark.read.schema(payments_schema).csv(payments_path, header=False)

# --- riders -----------------------------------------------------------------
riders_schema = StructType(
    [
        StructField("acc_number", IntegerType(), nullable=True),
        StructField("fname", StringType(), nullable=True),
        StructField("lname", StringType(), nullable=True),
        StructField("home_address", StringType(), nullable=True),
        StructField("dob", TimestampType(), nullable=True),
        StructField("start_date", TimestampType(), nullable=True),
        StructField("end_date", TimestampType(), nullable=True),
        StructField("membership_status", BooleanType(), nullable=True),
    ]
)
riders_df = spark.read.schema(riders_schema).csv(riders_path, header=False)

# --- stations ---------------------------------------------------------------
# NOTE(review): FloatType keeps ~7 significant digits, which may round
# fine-grained latitude/longitude values; DoubleType may be intended.
stations_schema = StructType(
    [
        StructField("station_id", StringType(), nullable=True),
        StructField("station_name", StringType(), nullable=True),
        StructField("latitude", FloatType(), nullable=True),
        StructField("longitude", FloatType(), nullable=True),
    ]
)
stations_df = spark.read.schema(stations_schema).csv(stations_path, header=False)

# --- trips ------------------------------------------------------------------
# presumably start_station/end_station hold station_id values — verify.
trips_schema = StructType(
    [
        StructField("trip_id", StringType(), nullable=True),
        StructField("type_of_rideable", StringType(), nullable=True),
        StructField("start_time", TimestampType(), nullable=True),
        StructField("end_time", TimestampType(), nullable=True),
        StructField("start_station", StringType(), nullable=True),
        StructField("end_station", StringType(), nullable=True),
        StructField("acc_number", IntegerType(), nullable=True),
    ]
)
trips_df = spark.read.schema(trips_schema).csv(trips_path, header=False)


#### 2. Write data as Delta tables

In [0]:
# Persist each raw DataFrame as a bronze Delta table at a fixed path.
# overwriteSchema lets the full overwrite succeed even if the schema differs
# from the table written by a previous run.
# NOTE(review): these paths have no scheme, so they resolve against the default
# filesystem; the silver stage reads them back as dbfs:/delta/... — presumably
# the same location on Databricks.
for bronze_df, bronze_path in [
    (payments_df, "/delta/bronze_payments"),
    (riders_df, "/delta/bronze_riders"),
    (stations_df, "/delta/bronze_stations"),
    (trips_df, "/delta/bronze_trips"),
]:
    (
        bronze_df.write
        .format("delta")
        .mode("overwrite")
        .option("overwriteSchema", "true")
        .save(bronze_path)
    )


### II. Silver tables

#### 1. Read data from bronze

In [0]:
# Locations of the bronze Delta tables.
payments_delta_path = "dbfs:/delta/bronze_payments"
riders_delta_path = "dbfs:/delta/bronze_riders"
stations_delta_path = "dbfs:/delta/bronze_stations"
trips_delta_path = "dbfs:/delta/bronze_trips"

# Load the bronze tables. A Delta table carries its own schema in its
# transaction log, so CSV reader options such as "header" and "inferSchema"
# do not apply here and are intentionally not passed.
payments_df = spark.read.format("delta").load(payments_delta_path)
riders_df = spark.read.format("delta").load(riders_delta_path)
stations_df = spark.read.format("delta").load(stations_delta_path)
trips_df = spark.read.format("delta").load(trips_delta_path)


#### 2. Remove nulls and duplicates

In [0]:
# Discard rows whose identifying column(s) are null (a row is dropped if ANY of
# the listed columns is null). All other columns may still contain nulls.
payments_clean_df = payments_df.dropna(subset=["payment_id"])
riders_clean_df = riders_df.dropna(subset=["acc_number"])
stations_clean_df = stations_df.dropna(subset=["station_id", "station_name"])
trips_clean_df = trips_df.dropna(subset=["trip_id"])


In [0]:
# Collapse rows that are identical across ALL columns.
# NOTE(review): rows sharing a key (e.g. acc_number) but differing in any other
# column are kept; key-level dedup may have been intended.
payments_clean_df, riders_clean_df, stations_clean_df, trips_clean_df = [
    frame.dropDuplicates()
    for frame in (payments_clean_df, riders_clean_df, stations_clean_df, trips_clean_df)
]


#### 3. Save as Delta tables

In [0]:
# Persist the cleaned DataFrames as silver Delta tables registered by name.
# overwriteSchema matches the bronze and gold writes: without it, an overwrite
# fails when the table already exists with a different schema (e.g. after an
# upstream schema change).
payments_clean_df.write.format("delta")\
        .mode("overwrite")\
        .option("overwriteSchema", "true")\
        .saveAsTable("silver_payments")

riders_clean_df.write.format("delta")\
        .mode("overwrite")\
        .option("overwriteSchema", "true")\
        .saveAsTable("silver_riders")

stations_clean_df.write.format("delta")\
        .mode("overwrite")\
        .option("overwriteSchema", "true")\
        .saveAsTable("silver_stations")

trips_clean_df.write.format("delta")\
        .mode("overwrite")\
        .option("overwriteSchema", "true")\
        .saveAsTable("silver_trips")

### III. Gold tables

#### 1. Read data from silver

In [0]:
# Re-read the silver tables by name, so the gold stage works from the persisted
# tables rather than from DataFrames left in memory by earlier cells.
silver_payments_df = spark.read.table("silver_payments")
silver_riders_df = spark.read.table("silver_riders")
silver_stations_df = spark.read.table("silver_stations")
silver_trips_df = spark.read.table("silver_trips")

#### 2. Create fact and dimension tables

In [0]:
# Payments, riders and stations are promoted to gold unchanged: the silver
# DataFrames are written straight through as fact/dimension tables.

# factPayment
(
    silver_payments_df.write
    .format("delta")
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .saveAsTable("gold_fact_payments")
)

# dimRiders
(
    silver_riders_df.write
    .format("delta")
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .saveAsTable("gold_dim_riders")
)

# dimStations
(
    silver_stations_df.write
    .format("delta")
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .saveAsTable("gold_dim_stations")
)

# Create and save factTrips as Delta table.
# Each silver trip is enriched with the rider's age at trip start and the trip
# duration. The LEFT JOIN keeps trips with no matching rider (their age is NULL).
# If silver_riders holds more than one row per acc_number (dedup upstream is
# full-row only), such trips will appear more than once.
# acc_number is taken from the trips side: for unmatched trips every r.* column
# is NULL, so selecting r.acc_number would lose the trip's account number.
# NOTE(review): DATEDIFF(unit, start, end) is the three-argument
# Databricks SQL / timestampdiff form — confirm the runtime supports it.
gold_trips_df = spark.sql("""
    SELECT tr.trip_id,
           tr.start_time,
           tr.end_time,
           tr.type_of_rideable,
           tr.start_station,
           tr.end_station,
           DATEDIFF(YEAR, r.dob, tr.start_time) AS rider_age_at_trip_start,
           DATEDIFF(SECOND, tr.start_time, tr.end_time) AS trip_duration_in_seconds,
           tr.acc_number
    FROM silver_trips tr
    LEFT JOIN silver_riders r ON tr.acc_number = r.acc_number
""")

gold_trips_df.write.format("delta")\
        .mode("overwrite")\
        .option("overwriteSchema", "true")\
        .saveAsTable("gold_fact_trips")


# Create and save dimTime as Delta table.
# Collect every distinct timestamp that appears as a trip start, trip end or
# payment date (UNION, not UNION ALL, so duplicates are removed).
temp_time_df = spark.sql("""
    SELECT start_time AS syncTime FROM silver_trips
    UNION
    SELECT end_time AS syncTime FROM silver_trips
    UNION
    SELECT payment_date AS syncTime FROM silver_payments
""")

# Break each timestamp into calendar and clock parts.
gold_time_df = (
    temp_time_df
    # Silver only drops rows with a null id, so these timestamp columns can
    # still be null. A NULL key row in a dimension can never be joined to.
    .where(col("syncTime").isNotNull())
    .withColumn("datetime", col("syncTime"))
    # "EEEE" renders the full English day name ("Sunday" .. "Saturday"),
    # the same labels as mapping dayofweek() 1..7.
    .withColumn("dayofweek", date_format(col("syncTime"), "EEEE"))
    .withColumn("day", dayofmonth(col("syncTime")))
    .withColumn("month", month(col("syncTime")))
    .withColumn("quarter", quarter(col("syncTime")))
    .withColumn("year", year(col("syncTime")))
    .withColumn("hourOfDay", hour(col("syncTime")))
    .withColumn("minute", minute(col("syncTime")))
    .withColumn("second", second(col("syncTime")))
)

gold_time_df.write.format("delta")\
        .mode("overwrite")\
        .option("overwriteSchema", "true")\
        .saveAsTable("gold_dim_time")
