BRONZE LAYER

In [0]:
# Bronze layer: land a raw copy of each source table as a Delta table named
# bronze_<name>, stamped with audit columns so every row can be traced back to
# its source table and load time.
from pyspark.sql import functions as F

spark.sql("USE CATALOG main")
spark.sql("USE SCHEMA `divvy-bikeshare`")

# Logical name -> fully-qualified source table.
sources = {
  "payments": "main.`divvy-bikeshare`.payments",
  "riders":   "main.`divvy-bikeshare`.riders",
  "stations": "main.`divvy-bikeshare`.stations",
  "trips":    "main.`divvy-bikeshare`.trips"
}

for name, src in sources.items():
    df = (spark.table(src)
          # Audit columns: when this batch was loaded and where it came from.
          .withColumn("_ingest_ts", F.current_timestamp())
          .withColumn("_source_table", F.lit(src)))

    target = f"main.`divvy-bikeshare`.bronze_{name}"
    # Full reload: each run replaces the previous bronze snapshot.
    (df.write
       .format("delta")
       .mode("overwrite")
       .saveAsTable(target))

    # The message previously contained the mojibake "âœ…" (the UTF-8 bytes of
    # the check-mark emoji mis-decoded as Windows-1252); restored here.
    print(f"✅ Created {target}")

SILVER LAYER

In [0]:
from pyspark.sql import functions as F

spark.sql("USE CATALOG main")
spark.sql("USE SCHEMA `divvy-bikeshare`")
#-------------
# Silver trips: parse start/end to timestamps, derive the ride length in whole
# seconds, keep only rides with a known, non-negative length, and keep a single
# row per trip_id.
trips = spark.table("main.`divvy-bikeshare`.bronze_trips")

silver_trips = (
    trips
    .withColumn("started_at", F.to_timestamp("started_at"))
    .withColumn("ended_at", F.to_timestamp("ended_at"))
    # Epoch seconds of the end minus epoch seconds of the start.
    .withColumn(
        "ride_duration_seconds",
        F.col("ended_at").cast("long") - F.col("started_at").cast("long"),
    )
    # Discard NULL durations (e.g. a missing timestamp) and rides that end
    # before they start.
    .filter(
        F.col("ride_duration_seconds").isNotNull()
        & (F.col("ride_duration_seconds") >= 0)
    )
    .dropDuplicates(["trip_id"])
)

silver_trips.write.saveAsTable(
    "main.`divvy-bikeshare`.silver_trips", format="delta", mode="overwrite"
)
#-----------------
# Silver riders: bronze riders reduced to one row per riders_id.
riders = spark.table("main.`divvy-bikeshare`.bronze_riders")
silver_riders = riders.dropDuplicates(subset=["riders_id"])

silver_riders.write.saveAsTable(
    "main.`divvy-bikeshare`.silver_riders", format="delta", mode="overwrite"
)
#-----------------
# Silver stations: bronze stations reduced to one row per station_id.
stations = spark.table("main.`divvy-bikeshare`.bronze_stations")

silver_stations = stations.dropDuplicates(subset=["station_id"])

silver_stations.write.saveAsTable(
    "main.`divvy-bikeshare`.silver_stations", format="delta", mode="overwrite"
)
#---------------
# Silver payments: `date` converted to a DateType column, one row per payment_id.
payments = spark.table("main.`divvy-bikeshare`.bronze_payments")

silver_payments = (
    payments.withColumn("date", F.to_date("date")).dropDuplicates(["payment_id"])
)

silver_payments.write.saveAsTable(
    "main.`divvy-bikeshare`.silver_payments", format="delta", mode="overwrite"
)


DIMENSION TABLES (GOLD LAYER)

In [0]:
#-----------
# dim_rider: silver riders plus a surrogate key `rider_key`.
# monotonically_increasing_id() gives unique but non-consecutive ids that are
# not guaranteed to be the same from one run to the next, so anything keyed on
# rider_key must be rebuilt from this table after it is rewritten.
riders = spark.table("main.`divvy-bikeshare`.silver_riders")

dim_rider = riders.withColumn("rider_key", F.monotonically_increasing_id())

dim_rider.write.saveAsTable(
    "main.`divvy-bikeshare`.dim_rider", format="delta", mode="overwrite"
)
#-----------
# dim_station: silver stations plus a surrogate key `station_key`.
# As with dim_rider, monotonically_increasing_id() keys are unique but may
# differ between runs; rebuild dependent fact tables after rewriting this.
stations = spark.table("main.`divvy-bikeshare`.silver_stations")

dim_station = stations.withColumn("station_key", F.monotonically_increasing_id())

dim_station.write.saveAsTable(
    "main.`divvy-bikeshare`.dim_station", format="delta", mode="overwrite"
)
#-----------
#date
# dim_date: one row per calendar day, keyed by date_key = yyyyMMdd (int).
# Both fact tables point here. fact_trip keys on the trip start date and
# fact_payment keys on the payment date. The calendar therefore spans every day
# from the earliest to the latest date seen in EITHER source. Deriving it from
# trip dates alone left payment days with no matching row, so inner joins from
# fact_payment to dim_date silently dropped those payments.
# The date column keeps its existing name `ride_date` for compatibility with
# downstream queries, even though it now also covers payment-only days.
trips = spark.table("main.`divvy-bikeshare`.silver_trips")
payments_for_calendar = spark.table("main.`divvy-bikeshare`.silver_payments")

calendar_bounds = (
    trips.select(F.to_date("started_at").alias("ride_date"))
    .unionByName(payments_for_calendar.select(F.col("date").alias("ride_date")))
    .agg(
        F.min("ride_date").alias("first_day"),
        F.max("ride_date").alias("last_day"),
    )
)

dim_date = (
    calendar_bounds
    # Expand [first_day, last_day] into one row per day (a gap-free calendar).
    # If both sources are empty the bounds are NULL and this yields no rows.
    .select(
        F.explode(
            F.sequence("first_day", "last_day", F.expr("INTERVAL 1 DAY"))
        ).alias("ride_date")
    )
    .withColumn("date_key", F.date_format("ride_date", "yyyyMMdd").cast("int"))
    .withColumn("year", F.year("ride_date"))
    .withColumn("month", F.month("ride_date"))
    .withColumn("day_of_week", F.date_format("ride_date", "E"))
)

(
    dim_date.write
    .format("delta")
    .mode("overwrite")
    .saveAsTable("main.`divvy-bikeshare`.dim_date")
)
#----------
#time
# dim_time: one row per minute-of-day observed in trip start times, keyed by
# time_key = hour * 100 + minute (e.g. 08:15 -> 815). This matches how
# fact_trip derives its time_key from started_at.
# Start times are truncated to the minute BEFORE de-duplicating. Keeping the
# seconds produced up to 60 rows per time_key, so every join from fact_trip on
# time_key multiplied trip rows and skewed aggregates such as AVG.
# `trips` is the silver trips DataFrame loaded for dim_date above.
dim_time = (
    trips
    .select(
        F.date_format(F.date_trunc("minute", "started_at"), "HH:mm:ss").alias("time")
    )
    .distinct()
    .withColumn("hour", F.substring("time", 1, 2).cast("int"))
    .withColumn("minute", F.substring("time", 4, 2).cast("int"))
    .withColumn("time_key", (F.col("hour") * 100 + F.col("minute")).cast("int"))
)

(
    dim_time.write
    .format("delta")
    .mode("overwrite")
    .saveAsTable("main.`divvy-bikeshare`.dim_time")
)

FACT TABLES

In [0]:
#------------
#fact_trip
# fact_trip: one row per silver trip with foreign keys into dim_date, dim_time,
# dim_station (start and end) and dim_rider, plus the ride duration and a
# constant trip_count of 1 for additive counting.
# The dimension lookups are left joins, so a trip whose station/rider is missing
# from the dimension keeps its row with a NULL key.
from pyspark.sql import functions as F

# Load every input explicitly. Previously `trips` was the variable leaked from
# the dimensions cell, so this cell failed with a NameError on a fresh session
# (or silently used whatever `trips` happened to hold).
trips = spark.table("main.`divvy-bikeshare`.silver_trips")
dim_station = spark.table("main.`divvy-bikeshare`.dim_station")
dim_rider = spark.table("main.`divvy-bikeshare`.dim_rider")

fact_trip = (
    trips
    # Same key formulas as dim_date (yyyyMMdd) and dim_time (hour*100+minute).
    .withColumn("date_key", F.date_format(F.to_date("started_at"), "yyyyMMdd").cast("int"))
    .withColumn("time_key", (F.hour("started_at") * 100 + F.minute("started_at")).cast("int"))

    # dim_station is joined twice (role-playing dimension): once for the start
    # station and once for the end station, with columns renamed per role.
    .join(
        dim_station.select(
            F.col("station_id").alias("start_station_id"),
            F.col("station_key").alias("start_station_key")
        ),
        "start_station_id",
        "left"
    )
    .join(
        dim_station.select(
            F.col("station_id").alias("end_station_id"),
            F.col("station_key").alias("end_station_key")
        ),
        "end_station_id",
        "left"
    )
    # Trips carry `rider_id`; the rider dimension's natural key is `riders_id`.
    .join(
        dim_rider.select("riders_id", "rider_key"),
        F.col("rider_id") == F.col("riders_id"),
        "left"
    )
    .select(
        "trip_id",
        "date_key",
        "time_key",
        "start_station_key",
        "end_station_key",
        "rider_key",
        "ride_duration_seconds"
    )
    .withColumn("trip_count", F.lit(1))
)

(
    fact_trip.write
    .format("delta")
    .mode("overwrite")
    .saveAsTable("main.`divvy-bikeshare`.fact_trip")
)
#------------
#payments
# fact_payment: one row per silver payment, with the payment date encoded as
# date_key (yyyyMMdd int) and the rider resolved to dim_rider.rider_key.
# The rider lookup is a left join, so payments whose rider is absent from
# dim_rider keep their row with a NULL rider_key.
# Uses the `dim_rider` DataFrame loaded earlier in this cell.
payments = spark.table("main.`divvy-bikeshare`.silver_payments")

fact_payment = (
    payments
    .withColumn("date_key", F.date_format("date", "yyyyMMdd").cast("int"))
    .join(
        dim_rider.select("riders_id", "rider_key"),
        on=F.col("rider_id") == F.col("riders_id"),
        how="left",
    )
    .select("payment_id", "date_key", "rider_key", "amount")
)

fact_payment.write.saveAsTable(
    "main.`divvy-bikeshare`.fact_payment", format="delta", mode="overwrite"
)


Check that all the tables were created

In [0]:
%sql
-- List every table in the schema to confirm that the bronze, silver, dimension and fact tables exist.
SHOW TABLES FROM main.`divvy-bikeshare`;

## All tables are created — now let's analyze the data

Analyze how much time is spent per ride based on:

- date and time factors (day of week, time of day)
- the starting station
- the rider's age at the time of the ride
- whether the rider is a member or a casual rider


In [0]:
%sql
--time based on date and time factors (day of week, time of day)
-- Average ride duration (seconds) per day of week and hour of the ride start.
-- dim_time is collapsed to one row per time_key before joining. If it holds
-- several rows per key (e.g. one per distinct second), a direct join would
-- duplicate trips and skew AVG.
-- Rows are ordered Sunday..Saturday (dayofweek: 1 = Sunday), then by hour.
SELECT d.day_of_week, t.hour,
       AVG(f.ride_duration_seconds) AS avg_duration
FROM fact_trip f
JOIN dim_date d ON f.date_key = d.date_key
JOIN (SELECT DISTINCT time_key, hour FROM dim_time) t ON f.time_key = t.time_key
GROUP BY d.day_of_week, t.hour
ORDER BY MIN(dayofweek(d.ride_date)), t.hour;

In [0]:
%sql
-- Average ride duration (seconds) grouped by the name of the station where the ride began.
-- Inner join: trips with no matching start station (NULL start_station_key) are excluded.
SELECT
  ss.name                      AS start_station,
  AVG(f.ride_duration_seconds) AS avg_duration
FROM fact_trip AS f
INNER JOIN dim_station AS ss
  ON ss.station_key = f.start_station_key
GROUP BY ss.name;

In [0]:
%sql
-- Average ride duration (seconds) by the rider's age in whole years on the day of the ride.
WITH trip_ages AS (
  SELECT
    FLOOR(months_between(d.ride_date, r.birthday) / 12) AS rider_age,
    f.ride_duration_seconds
  FROM fact_trip AS f
  INNER JOIN dim_rider AS r ON r.rider_key = f.rider_key
  INNER JOIN dim_date  AS d ON d.date_key  = f.date_key
)
SELECT
  rider_age,
  AVG(ride_duration_seconds) AS avg_duration
FROM trip_ages
GROUP BY rider_age;

In [0]:
%sql
-- Average ride duration (seconds) for members vs. casual riders, split on dim_rider.is_member.
SELECT
  rider.is_member,
  AVG(f.ride_duration_seconds) AS avg_duration
FROM fact_trip AS f
INNER JOIN dim_rider AS rider
  ON rider.rider_key = f.rider_key
GROUP BY rider.is_member;

Analyze how much money is spent:

- per month and year
- per member, based on the rider's age at account start


In [0]:
%sql
-- Total payment amount per calendar year and month.
-- Inner join: payments whose date_key has no dim_date row are not counted.
SELECT
  cal.year,
  cal.month,
  SUM(pay.amount) AS total_spent
FROM fact_payment AS pay
INNER JOIN dim_date AS cal
  ON cal.date_key = pay.date_key
GROUP BY cal.year, cal.month;

In [0]:
%sql
-- Total payment amount grouped by the rider's age (whole years) when their account was opened.
-- NOTE(review): this sums over all riders in each age group; if "per member" is meant as
-- spend per rider, divide by the number of distinct riders in each group — confirm intent.
WITH payments_with_age AS (
  SELECT
    FLOOR(months_between(r.account_start_date, r.birthday) / 12) AS age_at_account_start,
    p.amount
  FROM fact_payment AS p
  INNER JOIN dim_rider AS r
    ON r.rider_key = p.rider_key
)
SELECT
  age_at_account_start,
  SUM(amount) AS total_spent
FROM payments_with_age
GROUP BY age_at_account_start;