
# DIVVY - ETL

## Ingest the Data to the Bronze Layer

In [0]:
from pyspark.sql.functions import unix_timestamp, col, floor, months_between, expr
import pyspark.sql.functions as F

In [0]:
# Read the four raw CSV extracts into Spark DataFrames.
def _read_raw_csv(path):
    """Load one header-less, comma-separated CSV file with schema inference enabled."""
    reader = spark.read.format("csv")
    reader = reader.option("inferSchema", "true")
    reader = reader.option("header", "false")
    reader = reader.option("sep", ",")
    return reader.load(path)


df_payment = _read_raw_csv("/FileStore/raw_data/payments.csv")
df_rider = _read_raw_csv("/FileStore/raw_data/riders.csv")
df_station = _read_raw_csv("/FileStore/raw_data/stations.csv")
df_trip = _read_raw_csv("/FileStore/raw_data/trips.csv")


In [0]:
# Persist the untouched raw DataFrames as Delta files in the bronze layer.
# Each target path is fully overwritten on every run.
for bronze_df, bronze_path in (
    (df_payment, "/delta/bronze/payment"),
    (df_rider, "/delta/bronze/rider"),
    (df_station, "/delta/bronze/station"),
    (df_trip, "/delta/bronze/trip"),
):
    bronze_df.write.format("delta").mode("overwrite").save(bronze_path)

## Cleanse and Conform Data for the Silver Layer

In [0]:
# Give the positional '_c<i>' columns (the files were read without a header)
# meaningful names.
def _rename_positional(frame, names):
    """Rename column '_c<i>' to names[i] for every position i, one rename at a time."""
    for position, new_name in enumerate(names):
        frame = frame.withColumnRenamed(f"_c{position}", new_name)
    return frame


df_payment = _rename_positional(
    df_payment,
    ["payment_id", "payment_date", "amount", "rider_id"],
)

df_rider = _rename_positional(
    df_rider,
    [
        "rider_id",
        "first_name",
        "last_name",
        "address",
        "birthday",
        "account_start_date",
        "account_end_date",
        "is_member",
    ],
)

df_station = _rename_positional(
    df_station,
    ["station_id", "station_name", "latitude", "longitude"],
)

df_trip = _rename_positional(
    df_trip,
    [
        "trip_id",
        "rideable_type",
        "start_at",
        "ended_at",
        "start_station_id",
        "end_station_id",
        "rider_id",
    ],
)

In [0]:
# Remove rows that are exact duplicates across every column
# (dropDuplicates without a subset considers all columns).
df_payment = df_payment.dropDuplicates()
df_rider = df_rider.dropDuplicates()
df_station = df_station.dropDuplicates()
df_trip = df_trip.dropDuplicates()

In [0]:
# Persist the renamed, de-duplicated DataFrames as Delta files in the silver layer.
# Each target path is fully overwritten on every run.
for silver_df, silver_path in (
    (df_payment, "/delta/silver/payment"),
    (df_rider, "/delta/silver/rider"),
    (df_station, "/delta/silver/station"),
    (df_trip, "/delta/silver/trip"),
):
    silver_df.write.format("delta").mode("overwrite").save(silver_path)

## Transform the data into the Star Schema for the Gold Layer

In [0]:
# Trip duration: elapsed seconds between 'start_at' and 'ended_at', converted to
# minutes and rounded to the nearest whole minute.
seconds_elapsed = F.unix_timestamp(F.col("ended_at")) - F.unix_timestamp(F.col("start_at"))
df_trip = df_trip.withColumn("trip_duration_minutes", F.round(seconds_elapsed / 60))

# Rider age at trip time: bring in each rider's birthday, derive the trip date
# from 'start_at', and count the completed years between the two dates.
# The inner join keeps only trips whose 'rider_id' exists in 'df_rider'.
rider_birthdays = df_rider.select("rider_id", "birthday")
age_at_trip = F.floor(F.months_between(F.col("trip_date"), F.col("birthday")) / 12)

trips_with_birthday = df_trip.join(rider_birthdays, on="rider_id", how="inner")
trips_with_birthday = trips_with_birthday.withColumn(
    "trip_date", F.col("start_at").cast("date")
)
# 'birthday' was only needed for the age calculation, so it is dropped again.
df_trip = trips_with_birthday.withColumn("rider_age_at_trip", age_at_trip).drop("birthday")

# Rider age at signup: completed years between birthday and account start date.
age_at_signup = F.floor(
    F.months_between(F.col("account_start_date"), F.col("birthday")) / 12
)
df_rider = df_rider.withColumn("rider_age_at_signup", age_at_signup)

In [0]:
# Create Date Dimension DataFrame
#
# The dimension holds one row per calendar day between the earliest and the
# latest date found in the trip and payment data (both ends inclusive).

# Get minimum and maximum dates from 'df_trip' and 'df_payment'.
# 'trip_date' is created with an explicit cast to DATE. 'payment_date' comes
# straight from CSV schema inference, so it is cast to DATE here as well: that
# way the driver always receives datetime.date values. Without the cast, an
# inferred timestamp (datetime.datetime) or string column would make the
# min()/max() comparison below raise TypeError.
trip_min_date, trip_max_date = df_trip.agg(
    F.min("trip_date"), F.max("trip_date")
).first()
payment_min_date, payment_max_date = df_payment.agg(
    F.min(col("payment_date").cast("date")),
    F.max(col("payment_date").cast("date")),
).first()

# Determine overall min and max dates.
# An empty (or all-NULL) table aggregates to None; ignore those bounds instead
# of letting min()/max() fail on a None comparison.
lower_bounds = [d for d in (trip_min_date, payment_min_date) if d is not None]
upper_bounds = [d for d in (trip_max_date, payment_max_date) if d is not None]
if not lower_bounds or not upper_bounds:
    raise ValueError(
        "Cannot build the date dimension: no trip or payment dates were found"
    )
min_date = min(lower_bounds)
max_date = max(upper_bounds)

# Convert datetime.date objects to strings in 'YYYY-MM-DD' format
min_date_str = min_date.strftime("%Y-%m-%d")
max_date_str = max_date.strftime("%Y-%m-%d")

# Generate date range DataFrame: 'id' is the day offset from 'min_date'.
# The range end is exclusive, hence the '+ 1' to include 'max_date' itself.
df_date = (
    spark.range(0, (max_date - min_date).days + 1, 1)
    .withColumn("id", col("id").cast("int"))  # spark.range yields bigint ids
    .withColumn("date", expr(f"date_add('{min_date_str}', id)"))
)

# Add other date attributes
df_date = (
    df_date.withColumn("year", F.year(col("date")))
    .withColumn("quarter", F.quarter(col("date")))
    .withColumn("month", F.month(col("date")))
    .withColumn("day", F.dayofmonth(col("date")))
    .withColumn("week", F.weekofyear(col("date")))
    # dayofweek: 1 = Sunday, 7 = Saturday
    .withColumn("is_weekend", F.dayofweek(col("date")).isin([1, 7]).cast("boolean"))
)

In [0]:
# Save the dimension and fact DataFrames as Delta tables for the gold layer.
# Each table is fully overwritten on every run.
for gold_df, gold_table in (
    (df_date, "gold_dim_date"),
    (df_station, "gold_dim_station"),
    (df_rider, "gold_dim_rider"),
    (df_trip, "gold_fact_trip"),
    (df_payment, "gold_fact_payment"),
):
    gold_df.write.format("delta").mode("overwrite").saveAsTable(gold_table)

In [0]:
# Verification: read every gold-layer table back and show its first 5 rows,
# each preceded by a '<table name>:' heading.
for gold_table_name in (
    "gold_dim_date",
    "gold_dim_station",
    "gold_dim_rider",
    "gold_fact_trip",
    "gold_fact_payment",
):
    print(f"{gold_table_name}:")
    spark.read.table(gold_table_name).show(5)

gold_dim_date:
+---+----------+----+-------+-----+---+----+----------+
| id|      date|year|quarter|month|day|week|is_weekend|
+---+----------+----+-------+-----+---+----+----------+
|  0|2013-02-01|2013|      1|    2|  1|   5|     false|
|  1|2013-02-02|2013|      1|    2|  2|   5|      true|
|  2|2013-02-03|2013|      1|    2|  3|   5|      true|
|  3|2013-02-04|2013|      1|    2|  4|   6|     false|
|  4|2013-02-05|2013|      1|    2|  5|   6|     false|
+---+----------+----+-------+-----+---+----+----------+
only showing top 5 rows

gold_dim_station:
+------------+--------------------+------------------+------------------+
|  station_id|        station_name|          latitude|         longitude|
+------------+--------------------+------------------+------------------+
|       15550|Canal St & Taylor St|         41.870257|-87.63947399999999|
|TA1307000150|Pine Grove Ave & ...| 41.94947274088333|-87.64645278453827|
|TA1307000156|Lincoln Ave & Sun...|         41.963004|        -87.68

In [0]:
%sql
-- Cleanup: uncomment the statements below to drop the gold-layer Delta tables
--DROP TABLE IF EXISTS gold_dim_date;
--DROP TABLE IF EXISTS gold_dim_station;
--DROP TABLE IF EXISTS gold_dim_rider;
--DROP TABLE IF EXISTS gold_fact_trip;
--DROP TABLE IF EXISTS gold_fact_payment;

In [0]:
# Cleanup: uncomment to recursively delete the '/delta' directory, which holds the bronze and silver Delta files written by this notebook
#dbutils.fs.rm("/delta", recurse=True)