In [0]:
# 1. Import libraries
import random
import uuid
from datetime import datetime, timedelta
import pandas as pd

# 2. Define simulation functions
cities = ['London', 'Manchester', 'Leeds', 'Birmingham', 'Glasgow']
trip_statuses = ['completed', 'cancelled', 'in_progress']
driver_statuses = ['active', 'inactive']

def simulate_event():
    return {
        'event_id': str(uuid.uuid4()),
        'timestamp': datetime.utcnow().isoformat(),
        'city': random.choice(cities),
        'driver_id': f"DRV-{random.randint(1000,9999)}",
        'rider_id': f"RID-{random.randint(1000,9999)}",
        'trip_status': random.choice(trip_statuses),
        'driver_status': random.choice(driver_statuses),
        'pickup_lat': round(random.uniform(51.3, 53.6), 6),
        'pickup_lon': round(random.uniform(-0.5, -3.2), 6),
        'trip_duration_mins': random.randint(2, 60),
        'fare_amount': round(random.uniform(3.0, 40.0), 2)
    }

# 3. Generate 1000 events
events = [simulate_event() for _ in range(1000)]
df = pd.DataFrame(events)

# 4. Convert to Spark DataFrame
spark_df = spark.createDataFrame(df)

# 5. Save as CSV in `/data/raw`
raw_path = "dbfs:/data/raw/ride_events/"
spark_df.write.mode("overwrite").option("header", True).csv(raw_path)

display(spark_df)


event_id,timestamp,city,driver_id,rider_id,trip_status,driver_status,pickup_lat,pickup_lon,trip_duration_mins,fare_amount
67d04126-d29a-4473-a6fb-dbf323dad5b9,2025-04-11T07:53:42.676570,Leeds,DRV-3835,RID-3939,in_progress,inactive,51.527952,-1.403238,54,28.58
810c2d38-ffa7-4d45-bdca-e6a769b8822d,2025-04-11T07:53:42.676649,London,DRV-5323,RID-8351,in_progress,inactive,52.696485,-1.718087,11,25.34
9b6fad1f-cb4d-4682-8ba1-73a4fe695264,2025-04-11T07:53:42.676698,London,DRV-4352,RID-1203,completed,inactive,51.726669,-2.681708,44,22.66
f3e7063b-c009-442a-93b2-6865cf1e4aea,2025-04-11T07:53:42.676743,Manchester,DRV-3690,RID-4317,cancelled,inactive,52.327217,-1.238609,39,37.73
a4dcb0f2-ef28-4aed-9fdc-32d37c8411b8,2025-04-11T07:53:42.676834,Birmingham,DRV-5636,RID-5029,cancelled,inactive,52.577677,-2.455446,40,4.29
e7b01db1-e3ea-4a82-9a67-5f2cb26afdb6,2025-04-11T07:53:42.676882,London,DRV-1563,RID-7193,in_progress,active,53.437724,-2.67467,40,32.02
b5a9ee04-c9f3-48eb-b683-00ffcfe04f9d,2025-04-11T07:53:42.676922,Birmingham,DRV-5839,RID-6970,completed,active,52.333225,-2.593877,51,37.2
3a62a956-1100-40ff-b576-f063c472a5ef,2025-04-11T07:53:42.676962,Leeds,DRV-5026,RID-4207,in_progress,inactive,53.076886,-0.818438,11,29.25
cc78a6f0-55de-4c8d-8abf-305535578b1f,2025-04-11T07:53:42.677002,Leeds,DRV-9587,RID-2649,in_progress,inactive,53.401465,-1.721043,35,19.97
dfdd0f7d-9e65-4533-8bf2-0d0a42575d78,2025-04-11T07:53:42.677041,Leeds,DRV-8830,RID-2876,completed,active,53.509161,-0.744501,35,18.03


In [0]:
from pyspark.sql.functions import col

# 1. Read raw CSV from DBFS
raw_path = "dbfs:/data/raw/ride_events/"
df_raw = spark.read.option("header", True).csv(raw_path)

# 2. Write to Bronze Delta Table
bronze_path = "dbfs:/data/lakehouse/bronze/ride_events"
df_raw.write.format("delta").mode("overwrite").save(bronze_path)

# 3. Create Bronze table (for querying)
spark.sql("DROP TABLE IF EXISTS bronze_ride_events")
spark.sql(f"""
    CREATE TABLE bronze_ride_events
    USING DELTA
    LOCATION '{bronze_path}'
""")

display(spark.sql("SELECT * FROM bronze_ride_events LIMIT 5"))


event_id,timestamp,city,driver_id,rider_id,trip_status,driver_status,pickup_lat,pickup_lon,trip_duration_mins,fare_amount
67d04126-d29a-4473-a6fb-dbf323dad5b9,2025-04-11T07:53:42.676570,Leeds,DRV-3835,RID-3939,in_progress,inactive,51.527952,-1.403238,54,28.58
810c2d38-ffa7-4d45-bdca-e6a769b8822d,2025-04-11T07:53:42.676649,London,DRV-5323,RID-8351,in_progress,inactive,52.696485,-1.718087,11,25.34
9b6fad1f-cb4d-4682-8ba1-73a4fe695264,2025-04-11T07:53:42.676698,London,DRV-4352,RID-1203,completed,inactive,51.726669,-2.681708,44,22.66
f3e7063b-c009-442a-93b2-6865cf1e4aea,2025-04-11T07:53:42.676743,Manchester,DRV-3690,RID-4317,cancelled,inactive,52.327217,-1.238609,39,37.73
a4dcb0f2-ef28-4aed-9fdc-32d37c8411b8,2025-04-11T07:53:42.676834,Birmingham,DRV-5636,RID-5029,cancelled,inactive,52.577677,-2.455446,40,4.29


In [0]:
from pyspark.sql.functions import to_timestamp

# 1. Read from Bronze
bronze_df = spark.read.format("delta").load(bronze_path)

# 2. Clean + cast columns
silver_df = bronze_df.withColumn("timestamp", to_timestamp("timestamp")) \
    .withColumn("trip_duration_mins", col("trip_duration_mins").cast("int")) \
    .withColumn("fare_amount", col("fare_amount").cast("float")) \
    .dropna()

# 3. Save as Silver Delta Table
silver_path = "dbfs:/data/lakehouse/silver/ride_events"
silver_df.write.format("delta").mode("overwrite").save(silver_path)

# 4. Create Silver Table
spark.sql("DROP TABLE IF EXISTS silver_ride_events")
spark.sql(f"""
    CREATE TABLE silver_ride_events
    USING DELTA
    LOCATION '{silver_path}'
""")

display(spark.sql("SELECT * FROM silver_ride_events LIMIT 5"))


event_id,timestamp,city,driver_id,rider_id,trip_status,driver_status,pickup_lat,pickup_lon,trip_duration_mins,fare_amount
67d04126-d29a-4473-a6fb-dbf323dad5b9,2025-04-11T07:53:42.676+0000,Leeds,DRV-3835,RID-3939,in_progress,inactive,51.527952,-1.403238,54,28.58
810c2d38-ffa7-4d45-bdca-e6a769b8822d,2025-04-11T07:53:42.676+0000,London,DRV-5323,RID-8351,in_progress,inactive,52.696485,-1.718087,11,25.34
9b6fad1f-cb4d-4682-8ba1-73a4fe695264,2025-04-11T07:53:42.676+0000,London,DRV-4352,RID-1203,completed,inactive,51.726669,-2.681708,44,22.66
f3e7063b-c009-442a-93b2-6865cf1e4aea,2025-04-11T07:53:42.676+0000,Manchester,DRV-3690,RID-4317,cancelled,inactive,52.327217,-1.238609,39,37.73
a4dcb0f2-ef28-4aed-9fdc-32d37c8411b8,2025-04-11T07:53:42.676+0000,Birmingham,DRV-5636,RID-5029,cancelled,inactive,52.577677,-2.455446,40,4.29


In [0]:
from pyspark.sql.functions import hour, date_format, count, avg, sum

# 1. Load Silver data
silver_df = spark.read.format("delta").load(silver_path)

# 2. Create hourly aggregates
gold_df = silver_df.withColumn("hour", hour("timestamp")) \
    .withColumn("date", date_format("timestamp", "yyyy-MM-dd")) \
    .groupBy("city", "date", "hour") \
    .agg(
        count("*").alias("total_trips"),
        avg("fare_amount").alias("avg_fare"),
        sum("fare_amount").alias("total_revenue")
    )

# 3. Save as Gold Delta Table
gold_path = "dbfs:/data/lakehouse/gold/hourly_city_metrics"
gold_df.write.format("delta").mode("overwrite").save(gold_path)

# 4. Create Gold Table
spark.sql("DROP TABLE IF EXISTS gold_hourly_metrics")
spark.sql(f"""
    CREATE TABLE gold_hourly_metrics
    USING DELTA
    LOCATION '{gold_path}'
""")

display(spark.sql("SELECT * FROM gold_hourly_metrics LIMIT 5"))


city,date,hour,total_trips,avg_fare,total_revenue
Glasgow,2025-04-11,7,185,21.86194602089959,4044.460013866425
London,2025-04-11,7,217,21.919677498153828,4756.5700170993805
Manchester,2025-04-11,7,199,20.882361849348747,4155.590008020401
Birmingham,2025-04-11,7,198,22.167020111372977,4389.069982051849
Leeds,2025-04-11,7,201,22.7400000297015,4570.740005970001
