In [1]:
!pip install Faker fastavro pandas

Collecting Faker
  Downloading faker-37.1.0-py3-none-any.whl.metadata (15 kB)
Collecting fastavro
  Downloading fastavro-1.10.0-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (5.5 kB)
Downloading faker-37.1.0-py3-none-any.whl (1.9 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.9/1.9 MB[0m [31m25.1 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading fastavro-1.10.0-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (3.3 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m3.3/3.3 MB[0m [31m47.2 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: fastavro, Faker
Successfully installed Faker-37.1.0 fastavro-1.10.0


- **Passenger Requests and Cancellations**: Events emitted when a passenger requests a ride or cancels a request.
- **Driver Availability Updates**: Events emitted when a driver's availability status changes.


In [2]:
import random
from faker import Faker
import fastavro
import datetime

In [3]:
from datetime import datetime, timedelta
from typing import Tuple, Dict, Any
import uuid

# Named pickup/dropoff zones, each mapped to the (latitude, longitude) of its centroid
ZONES = dict(
    Downtown=(40.7128, -74.0060),
    Suburbs=(40.7891, -73.1350),
    Airport=(40.6413, -73.7781),
)

def sample_zone_coordinates(zone: str) -> Tuple[float, float]:
    """
    Draw a jittered (latitude, longitude) pair near a zone's centroid.

    Each coordinate is the centroid value plus independent Gaussian noise
    (mean 0, standard deviation 0.005). A zone name that is not a key of
    ``ZONES`` uses the "Downtown" centroid instead.
    """
    import random
    fallback_centroid = ZONES["Downtown"]
    centre_lat, centre_lon = ZONES.get(zone, fallback_centroid)
    # Latitude noise is drawn before longitude noise.
    jittered_lat = centre_lat + random.gauss(0, 0.005)
    jittered_lon = centre_lon + random.gauss(0, 0.005)
    return jittered_lat, jittered_lon

def get_base_vehicle_price_range(vehicle_type: str) -> Tuple[int,int]:
    """
    Look up the pre-surge (min, max) price band for a vehicle type.

    Vehicle types without an entry get the (5, 20) band.
    """
    bands = dict(standard=(5, 20), premium=(15, 40), shared=(3, 15))
    try:
        return bands[vehicle_type]
    except KeyError:
        # Unknown vehicle type: use the same band as "standard".
        return (5, 20)

def adjust_price_for_demand(base_price: float, demand_level: str) -> float:
    """
    Scale a base price by a random surge multiplier for the given demand level.

    Multiplier ranges: low 1.0-1.1, medium 1.2-1.5, high 1.5-2.0.
    The result is rounded to two decimals. A demand level other than
    "low", "medium" or "high" raises KeyError.
    """
    import random
    surge_bounds = (
        ("low", 1.0, 1.1),
        ("medium", 1.2, 1.5),
        ("high", 1.5, 2.0),
    )
    # A multiplier is drawn for every level (in this order) on each call,
    # then the one for the requested level is picked.
    drawn = {level: random.uniform(lower, upper) for level, lower, upper in surge_bounds}
    surge = drawn[demand_level]
    return round(base_price * surge, 2)

## Passenger Requests and Cancellations Schema:

In [4]:
# Avro record schema for passenger ride requests and cancellations.
# Nullable fields are unions that list "null" first and default to None.
passenger_request_schema = {
    "type": "record",
    "name": "PassengerRequest",
    "fields": [
        # Identity
        {"name": "request_id", "type": "string"},
        {"name": "passenger_id", "type": "string"},
        {"name": "passenger_name", "type": "string"},
        # Trip endpoints and lifecycle
        {"name": "pickup_location", "type": "string"},
        {"name": "dropoff_location", "type": "string"},
        {"name": "request_time", "type": "string"},
        {"name": "status",
         "type": {"type": "enum", "name": "Status",
                  "symbols": ["requested", "accepted", "canceled", "completed"]}},
        {"name": "cancellation_time", "type": ["null", "string"], "default": None},
        # Ride details
        {"name": "ride_duration", "type": "int", "default": 30},
        {"name": "vehicle_type", "type": "string", "default": "standard"},
        {"name": "estimated_eta", "type": "string", "default": "20 minutes"},
        {"name": "demand_level",
         "type": {"type": "enum", "name": "DemandLevel",
                  "symbols": ["High", "Medium", "Low"]},
         "default": "Medium"},
        {"name": "price", "type": "float"},
        # Ratings
        {"name": "driver_rating", "type": "float"},
        {"name": "passenger_rating", "type": "float"},
        # Optional extras
        {"name": "favorite_location", "type": ["null", "string"], "default": None},
        {"name": "is_wheelchair_accessible", "type": "boolean", "default": False},
        {"name": "scheduled_time", "type": ["null", "string"], "default": None},
        {"name": "multiple_stops",
         "type": ["null", {"type": "array", "items": "string"}],
         "default": None},
        {"name": "donation_amount", "type": ["null", "float"], "default": None},
        {"name": "vehicle_license_plate", "type": "string"},
    ],
}


- **request_id**: A unique ID for the request.
- **passenger_id**: A unique ID for the passenger.
- **passenger_name**: The passenger's full name.
- **pickup_location**: Where the passenger is requesting the ride from.
- **dropoff_location**: The destination of the ride.
- **request_time**: The time when the passenger requests the ride.
- **status**: The status of the ride (requested, accepted, canceled, or completed).
- **cancellation_time**: If the request is canceled, this field records the cancellation time.
- **ride_duration**: The duration of the ride in minutes. The default is set to 30 minutes, but this can be updated based on the ride specifics.
- **vehicle_type**: The type of vehicle requested by the passenger.
- **estimated_eta**: The estimated time of arrival for the driver, based on current traffic conditions and proximity. The default is set to 20 minutes.
- **demand_level**: The level of demand for rides in the area. This helps adjust the pricing or availability of vehicles based on whether it's high, medium, or low demand.
- **price**: The price of the ride based on the vehicle type, distance, and demand
- **driver_rating**: The rating given to the driver, which could be between 3.5 and 5 stars. A low rating (below 3.5) could cause the ride to be canceled.
- **passenger_rating**: The rating given by the driver to the passenger. It typically ranges from 4 to 5 stars, as passengers are generally rated positively.
- **favorite_location**: A location the passenger frequently uses (e.g., "Home", "Work"). This field is optional, and if not set, it defaults to None.
- **is_wheelchair_accessible**: A flag indicating whether the passenger requested a wheelchair-accessible vehicle. This is a boolean value (True or False).
- **scheduled_time**: The time at which the passenger wants to schedule the ride in advance. If not scheduled, it defaults to None. This allows passengers to set a pickup time in the future.
- **multiple_stops**: An array of strings representing multiple stops during the ride. This allows for additional destinations to be added along the way.
- **donation_amount**: The optional donation amount the passenger can choose to add to the fare, typically in increments of 5, 10, or 15. This field is nullable (None by default), allowing passengers who do not wish to donate to leave it empty.
- **vehicle_license_plate**: The license plate of the vehicle associated with the request.


## Driver Availability Updates Schema

In [5]:
# Avro record schema for driver availability updates.
driver_availability_schema = {
    "type": "record",
    "name": "DriverAvailability",
    "fields": [
        {"name": "driver_id", "type": "string"},
        {"name": "driver_name", "type": "string"},
        {"name": "status",
         "type": {"type": "enum", "name": "Status",
                  "symbols": ["available", "unavailable"]}},
        {"name": "update_time", "type": "string"},
        {"name": "driver_rating", "type": "float"},
        # Wheelchair-accessible vehicle flag; False unless set explicitly
        {"name": "is_wheelchair_accessible", "type": "boolean", "default": False},
        {"name": "vehicle_license_plate", "type": "string"},
    ],
}


- **driver_id**: A unique ID for the driver.
- **driver_name**: The driver's full name.
- **status**: The status of the driver (either "available" or "unavailable").
- **update_time**: The time when the driver’s availability status is updated.
- **driver_rating**: The rating given to the driver by passengers. This typically ranges from 1 to 5, with a higher rating indicating better service.
- **is_wheelchair_accessible**: A flag indicating whether the driver’s vehicle is wheelchair accessible. The default value is set to False, meaning the vehicle is not accessible unless explicitly marked as True.
- **vehicle_license_plate**: The license plate of the driver's vehicle.

## Generate synthetic data

In [11]:
import random
from faker import Faker
import datetime
import fastavro
import json

# Shared Faker instance; the generator functions below call it for names and license plates.
# NOTE(review): no seed is set in this cell, so the generated data presumably differs on every run.
fake = Faker()

def generate_realistic_time(base: "datetime.datetime | None" = None, variance_minutes: int = 30) -> "datetime.datetime":
    """
    Return ``base`` shifted by a random whole number of minutes.

    Args:
        base: Anchor time. When omitted, the current time is used as a
            timezone-aware UTC datetime.
        variance_minutes: Largest shift in either direction; the offset is
            drawn uniformly from the integers in
            [-variance_minutes, variance_minutes].

    Returns:
        ``base`` plus the drawn offset (same awareness/tzinfo as ``base``).

    Raises:
        ValueError: If ``variance_minutes`` is negative (from ``random.randint``).
    """
    # Imported locally under an alias: different cells of this notebook bind the
    # bare name `datetime` to the module and to the class, and `timedelta` is
    # imported in yet another cell, so relying on globals is order-dependent.
    import datetime as _dt

    if base is None:
        # utcnow() yields a naive datetime; .timestamp() treats naive values as
        # local time, which skews epoch conversion on any non-UTC machine.
        base = _dt.datetime.now(_dt.timezone.utc)
    offset_minutes = random.randint(-variance_minutes, variance_minutes)
    return base + _dt.timedelta(minutes=offset_minutes)

# Define price range based on vehicle type
def get_base_vehicle_price_range(vehicle_type):
    """
    Return the (min, max) base price in dollars for a vehicle type, before
    any demand adjustment.

    Args:
        vehicle_type: Vehicle category name (case-sensitive).

    Returns:
        A ``(min, max)`` tuple. Unknown vehicle types get the Taxi range (20, 30).
    """
    price_range_map = {
        "Black/Executive": (100, 150),  # $100 to $150
        "Van XL": (90, 140),  # $90 to $140
        "Van": (80, 120),  # $80 to $120
        "Priority": (70, 110),  # $70 to $110
        "Baby": (60, 100),  # $60 to $100
        "Kids": (50, 90),  # $50 to $90
        "Comfort": (40, 80),  # $40 to $80
        "Pet": (35, 70),  # $35 to $70
        "Electric": (30, 60),  # $30 to $60
        "Taxi": (20, 30),  # $20 to $30
        "Share": (10, 20),  # $10 to $20
        "Wheelchair": (60, 100),  # Wheelchair accessible vehicle price range
        # Tiers that generate_passenger_request() actually requests. This
        # definition replaces an earlier one in the notebook that priced them;
        # without these entries every generated request silently fell back to
        # the Taxi range.
        "standard": (5, 20),  # $5 to $20
        "premium": (15, 40),  # $15 to $40
        "shared": (3, 15),  # $3 to $15
    }
    return price_range_map.get(vehicle_type, (20, 30))  # Default to Taxi price range if not found

# Simulate a demand multiplier and price a ride inside the given base range
def adjust_price_for_demand(base_min, base_max):
    """
    Pick a random base fare in [base_min, base_max] and scale it by a random
    demand factor in [1.0, 2.0].

    Returns:
        ``(price, demand_level)`` where ``price`` is rounded to two decimals and
        ``demand_level`` is "High" (factor >= 1.5), "Medium" (factor >= 1.2)
        or "Low" (anything below 1.2).
    """
    # The demand factor is drawn first, the base fare second.
    surge = random.uniform(1.0, 2.0)
    base_fare = random.uniform(base_min, base_max)

    if surge < 1.2:
        level = "Low"
    elif surge < 1.5:
        level = "Medium"
    else:
        level = "High"

    return round(base_fare * surge, 2), level

# Driver rating generator: usually 3.5-5, occasionally drawn from the 1-3.5 range
def generate_driver_rating():
    """
    Return a driver rating rounded to one decimal.

    Nine times out of ten the rating comes from a uniform draw over 3.5-5;
    otherwise it is re-drawn uniformly over 1-3.5.
    """
    typical = random.uniform(3.5, 5)
    if random.random() >= 0.1:
        return round(typical, 1)
    # Rare case: discard the typical draw and sample the low range instead.
    return round(random.uniform(1, 3.5), 1)

# Passenger rating generator (4 to 5 stars)
def generate_passenger_rating():
    """Return a uniformly drawn rating between 4 and 5, rounded to one decimal."""
    stars = random.uniform(4, 5)
    return round(stars, 1)

# Function to generate one passenger request event (multiple_stops is always an empty list)
def generate_passenger_request(demand_level: str = "medium") -> Dict[str, Any]:
    """
    Generate one synthetic passenger ride-request event.

    Args:
        demand_level: Label stored verbatim in the event's ``demand_level`` field.

    Returns:
        A dict holding request/passenger identifiers, pickup and dropoff
        coordinates, epoch-millisecond ``request_time``, ``status``
        ("requested" or "canceled"), ``cancellation_time`` (epoch milliseconds,
        or None when not canceled), vehicle type, ETA, price, and several
        optional fields that are left as None or empty.
    """
    # Local import so this function does not depend on which notebook cell
    # last bound `timedelta` at module level.
    from datetime import timedelta

    # IDs & basic info
    request_id = str(uuid.uuid4())
    passenger_id = str(uuid.uuid4())
    passenger_name = fake.name()

    # Zones & coords
    zone_names = list(ZONES)
    pickup_zone = random.choice(zone_names)
    dropoff_zone = random.choice(zone_names)
    pickup_lat, pickup_lon = sample_zone_coordinates(pickup_zone)
    dropoff_lat, dropoff_lon = sample_zone_coordinates(dropoff_zone)

    # Times
    request_time = generate_realistic_time()
    eta = random.randint(5, 30)

    # Status & possible cancellation: only long ETAs (> 20) are ever canceled,
    # and then with 40% probability.
    status = "requested"
    cancellation_time = None
    if eta > 20 and random.random() < 0.4:
        status = "canceled"
        # A cancellation must come after the request. Drawing a symmetric
        # offset around request_time could place it up to 30 minutes earlier,
        # so a strictly positive delay of 1-30 minutes is used instead.
        cancel_delay = timedelta(minutes=random.randint(1, 30))
        cancellation_time = int((request_time + cancel_delay).timestamp() * 1000)

    # Pricing
    vehicle_type = random.choice(["standard","premium","shared"])
    base_min, base_max = get_base_vehicle_price_range(vehicle_type)  # Get price range for the vehicle type
    # NOTE(review): the demand level simulated by adjust_price_for_demand() is
    # discarded, while the caller-supplied `demand_level` is what gets recorded,
    # so the stored label and the price are not derived from the same draw.
    price, _ = adjust_price_for_demand(base_min, base_max)  # Pass min/max to adjust_price_for_demand

    return {
        "request_id": request_id,
        "passenger_id": passenger_id,
        "passenger_name": passenger_name,
        "pickup_lat": pickup_lat,
        "pickup_lon": pickup_lon,
        "dropoff_lat": dropoff_lat,
        "dropoff_lon": dropoff_lon,
        "request_time": int(request_time.timestamp() * 1000),
        "status": status,
        "cancellation_time": cancellation_time,
        "ride_duration": None,
        "vehicle_type": vehicle_type,
        "estimated_eta": eta,
        "demand_level": demand_level,
        "price": price,
        "driver_rating": None,
        "passenger_rating": None,
        "favorite_location": random.choice([None, "Home", "Work"]),
        "is_wheelchair_accessible": random.random() < 0.1,
        "scheduled_time": None,
        "multiple_stops": [],
        "donation_amount": None,
        "vehicle_license_plate": fake.license_plate(),
    }


# Build one synthetic driver availability event
def generate_driver_availability() -> Dict[str, Any]:
    """
    Generate one synthetic driver availability update.

    The status is a weighted pick among "available" (0.6), "on_trip" (0.3)
    and "offline" (0.1); ``update_time`` is stored as epoch milliseconds;
    ``driver_rating`` is uniform over 3.5-5.0 rounded to two decimals; and
    ``is_wheelchair_accessible`` is always False.
    """
    possible_states = ["available","on_trip","offline"]
    state_weights = [0.6,0.3,0.1]

    event = {
        "driver_id": str(uuid.uuid4()),
        "driver_name": fake.name(),
    }
    # random.choices returns a list; take its first element.
    event["status"] = random.choices(possible_states, weights=state_weights)[0]
    stamp = generate_realistic_time()
    rating = round(random.uniform(3.5, 5.0), 2)
    event["update_time"] = int(stamp.timestamp() * 1000)
    event["driver_rating"] = rating
    event["is_wheelchair_accessible"] = False
    event["vehicle_license_plate"] = fake.license_plate()
    return event

# Produce a batch of each event type, sized by the caller
def generate_events(passenger_count, driver_count):
    """
    Generate ``passenger_count`` passenger requests followed by
    ``driver_count`` driver availability events.

    Returns:
        ``(passenger_events, driver_availability_events)`` as two lists of dicts.
    """
    passenger_events = []
    for _ in range(passenger_count):
        passenger_events.append(generate_passenger_request())

    driver_availability_events = []
    for _ in range(driver_count):
        driver_availability_events.append(generate_driver_availability())

    return passenger_events, driver_availability_events

# Example run: 200 passenger requests and 100 driver availability events
passenger_events, driver_availability_events = generate_events(200, 100)

# Dump each batch to its own JSON file as a single indented array
for output_name, batch in (
    ('passenger_requests.json', passenger_events),
    ('driver_availability.json', driver_availability_events),
):
    with open(output_name, 'w') as json_file:
        json.dump(batch, json_file, indent=4)


In [12]:
# Avro schema whose field names mirror the keys returned by generate_passenger_request().
# Timestamps are longs carrying the timestamp-millis logical type.
passenger_request_schema = {
    "namespace": "ride_hailing",
    "type": "record",
    "name": "PassengerRequest",
    "fields": [
        # Identity
        {"name": "request_id", "type": "string"},
        {"name": "passenger_id", "type": "string"},
        {"name": "passenger_name", "type": "string"},
        # Pickup / dropoff coordinates
        {"name": "pickup_lat", "type": "double"},
        {"name": "pickup_lon", "type": "double"},
        {"name": "dropoff_lat", "type": "double"},
        {"name": "dropoff_lon", "type": "double"},
        # Lifecycle
        {"name": "request_time", "type": {"type": "long", "logicalType": "timestamp-millis"}},
        {"name": "status", "type": "string"},
        {
            "name": "cancellation_time",
            "type": ["null", {"type": "long", "logicalType": "timestamp-millis"}],
            "default": None,
        },
        # Ride details
        {"name": "ride_duration", "type": ["null", "int"], "default": None},
        {"name": "vehicle_type", "type": "string"},
        {"name": "estimated_eta", "type": "int"},
        {"name": "demand_level", "type": "string"},
        {"name": "price", "type": "double"},
        # Ratings and optional extras
        {"name": "driver_rating", "type": ["null", "double"], "default": None},
        {"name": "passenger_rating", "type": ["null", "double"], "default": None},
        {"name": "favorite_location", "type": ["null", "string"], "default": None},
        {"name": "is_wheelchair_accessible", "type": "boolean"},
        {
            "name": "scheduled_time",
            "type": ["null", {"type": "long", "logicalType": "timestamp-millis"}],
            "default": None,
        },
        {"name": "multiple_stops", "type": {"type": "array", "items": "string"}},
        {"name": "donation_amount", "type": ["null", "double"], "default": None},
        {"name": "vehicle_license_plate", "type": "string"},
    ],
}

# Avro schema whose field names mirror the keys returned by generate_driver_availability().
driver_availability_schema = {
    "namespace": "ride_hailing",
    "type": "record",
    "name": "DriverAvailability",
    "fields": [
        {"name": "driver_id", "type": "string"},
        {"name": "driver_name", "type": "string"},
        {"name": "status", "type": "string"},
        # Stored as a long with the timestamp-millis logical type
        {"name": "update_time", "type": {"type": "long", "logicalType": "timestamp-millis"}},
        {"name": "driver_rating", "type": "double"},
        {"name": "is_wheelchair_accessible", "type": "boolean"},
        {"name": "vehicle_license_plate", "type": "string"},
    ],
}

In [13]:
import json
import fastavro

# Number of events to simulate for each stream
NUM_EVENTS = 1000

# Build both batches with the generator functions (passengers first, then drivers)
passenger_events = [generate_passenger_request(demand_level="medium") for _ in range(NUM_EVENTS)]
driver_events = [generate_driver_availability() for _ in range(NUM_EVENTS)]

# 1) JSON output: one serialized event per line (newline-delimited for easy streaming)
for json_path, batch in (
    ("passenger_requests.json", passenger_events),
    ("driver_availability.json", driver_events),
):
    with open(json_path, "w") as jf:
        jf.writelines(json.dumps(evt) + "\n" for evt in batch)

# 2) AVRO output: each batch is written with its matching schema
for avro_path, schema, batch in (
    ("passenger_requests.avro", passenger_request_schema, passenger_events),
    ("driver_availability.avro", driver_availability_schema, driver_events),
):
    with open(avro_path, "wb") as af:
        fastavro.writer(af, schema, batch)

print(f"Wrote {NUM_EVENTS} passenger + {NUM_EVENTS} driver events in JSON & AVRO.")


Wrote 1000 passenger + 1000 driver events in JSON & AVRO.


### Output Sample Data for Verification

In [15]:
import json
import pandas as pd
import fastavro
from IPython.display import display

# Preview the first few records of every output file to verify the writes.
from itertools import islice


def _preview_ndjson(path, limit=5):
    """Parse at most `limit` leading lines of a newline-delimited JSON file."""
    with open(path, 'r') as f:
        return [json.loads(line) for line in islice(f, limit)]


def _preview_avro(path, limit=5):
    """Decode at most `limit` leading records of an Avro container file."""
    with open(path, 'rb') as f:
        # islice stops pulling from the reader after `limit` records, so the
        # rest of the file is not decoded just to be thrown away.
        return list(islice(fastavro.reader(f), limit))


# 1) JSON (newline-delimited)
passenger_json = _preview_ndjson('passenger_requests.json')
passenger_json_df = pd.DataFrame(passenger_json)
print("First 5 Passenger Requests (JSON):")
display(passenger_json_df)

driver_json = _preview_ndjson('driver_availability.json')
driver_json_df = pd.DataFrame(driver_json)
print("\nFirst 5 Driver Availability Entries (JSON):")
display(driver_json_df)

# 2) AVRO
passenger_avro = _preview_avro('passenger_requests.avro')
passenger_avro_df = pd.DataFrame(passenger_avro)
print("\nFirst 5 Passenger Requests (AVRO):")
display(passenger_avro_df)

driver_avro = _preview_avro('driver_availability.avro')
driver_avro_df = pd.DataFrame(driver_avro)
print("\nFirst 5 Driver Availability Entries (AVRO):")
display(driver_avro_df)


First 5 Passenger Requests (JSON):


Unnamed: 0,request_id,passenger_id,passenger_name,pickup_lat,pickup_lon,dropoff_lat,dropoff_lon,request_time,status,cancellation_time,...,demand_level,price,driver_rating,passenger_rating,favorite_location,is_wheelchair_accessible,scheduled_time,multiple_stops,donation_amount,vehicle_license_plate
0,ab954730-8de5-4f99-8e62-687d2ec8c602,f4c360d7-63d9-435c-acad-47370083d31a,Miranda Fischer,40.641636,-73.778733,40.641855,-73.78502,1745074520525,canceled,1745076000000.0,...,medium,25.04,,,,True,,[],,729Z1
1,d5df56e6-0762-4775-9ab6-f07ebf3d8358,91bb1e4b-3187-4465-ac30-0182e221c6c2,Jerry Torres,40.796234,-73.135529,40.707753,-74.008369,1745073260525,requested,,...,medium,29.98,,,Work,False,,[],,652-SUW
2,5abc2538-cf76-463e-aa5f-b4841cacd259,97491cfe-c6e5-4ca1-8ef6-dfc000aa14b4,Monica Hill,40.717264,-73.999539,40.717833,-74.00509,1745075000525,requested,,...,medium,41.14,,,,False,,[],,818-QFX
3,0be4b414-d0e1-4e0d-bb0c-56cc23ccc05e,0f1afbef-7e16-488b-a272-11f6398c4ea1,Andrew Riley,40.63858,-73.777747,40.646556,-73.778873,1745075720526,requested,,...,medium,36.8,,,,False,,[],,LMW 513
4,7fdf3ddd-f6b4-4762-af2c-7c1cec1fbca7,421b0a49-498b-439d-8339-83e056cb8a86,Dominique Rose,40.710898,-74.003647,40.705898,-74.011464,1745074880526,requested,,...,medium,52.68,,,Home,False,,[],,25N J09



First 5 Driver Availability Entries (JSON):


Unnamed: 0,driver_id,driver_name,status,update_time,driver_rating,is_wheelchair_accessible,vehicle_license_plate
0,c21c9ded-0d9c-4a30-84f7-ac864431ce04,Jennifer Davis,available,1745072720742,3.89,False,16T T95
1,de2915cf-b29e-44da-8912-bcf32d4a51a6,Jeremiah Jones,available,1745073500743,4.12,False,019 SYL
2,8f006be2-145e-4420-850d-47416b118795,Susan Nelson DDS,available,1745072600743,4.2,False,NO 21020
3,791a0b92-989a-4b0c-82f4-35bea99265e2,Manuel Hall,available,1745074880743,4.62,False,1912 IX
4,cd09d21c-d77a-49d8-a442-2c23418eb543,Alexander Best,available,1745073620743,4.88,False,RJH-8362



First 5 Passenger Requests (AVRO):


Unnamed: 0,request_id,passenger_id,passenger_name,pickup_lat,pickup_lon,dropoff_lat,dropoff_lon,request_time,status,cancellation_time,...,demand_level,price,driver_rating,passenger_rating,favorite_location,is_wheelchair_accessible,scheduled_time,multiple_stops,donation_amount,vehicle_license_plate
0,ab954730-8de5-4f99-8e62-687d2ec8c602,f4c360d7-63d9-435c-acad-47370083d31a,Miranda Fischer,40.641636,-73.778733,40.641855,-73.78502,2025-04-19 14:55:20.525000+00:00,canceled,2025-04-19 15:24:20.525000+00:00,...,medium,25.04,,,,True,,[],,729Z1
1,d5df56e6-0762-4775-9ab6-f07ebf3d8358,91bb1e4b-3187-4465-ac30-0182e221c6c2,Jerry Torres,40.796234,-73.135529,40.707753,-74.008369,2025-04-19 14:34:20.525000+00:00,requested,NaT,...,medium,29.98,,,Work,False,,[],,652-SUW
2,5abc2538-cf76-463e-aa5f-b4841cacd259,97491cfe-c6e5-4ca1-8ef6-dfc000aa14b4,Monica Hill,40.717264,-73.999539,40.717833,-74.00509,2025-04-19 15:03:20.525000+00:00,requested,NaT,...,medium,41.14,,,,False,,[],,818-QFX
3,0be4b414-d0e1-4e0d-bb0c-56cc23ccc05e,0f1afbef-7e16-488b-a272-11f6398c4ea1,Andrew Riley,40.63858,-73.777747,40.646556,-73.778873,2025-04-19 15:15:20.526000+00:00,requested,NaT,...,medium,36.8,,,,False,,[],,LMW 513
4,7fdf3ddd-f6b4-4762-af2c-7c1cec1fbca7,421b0a49-498b-439d-8339-83e056cb8a86,Dominique Rose,40.710898,-74.003647,40.705898,-74.011464,2025-04-19 15:01:20.526000+00:00,requested,NaT,...,medium,52.68,,,Home,False,,[],,25N J09



First 5 Driver Availability Entries (AVRO):


Unnamed: 0,driver_id,driver_name,status,update_time,driver_rating,is_wheelchair_accessible,vehicle_license_plate
0,c21c9ded-0d9c-4a30-84f7-ac864431ce04,Jennifer Davis,available,2025-04-19 14:25:20.742000+00:00,3.89,False,16T T95
1,de2915cf-b29e-44da-8912-bcf32d4a51a6,Jeremiah Jones,available,2025-04-19 14:38:20.743000+00:00,4.12,False,019 SYL
2,8f006be2-145e-4420-850d-47416b118795,Susan Nelson DDS,available,2025-04-19 14:23:20.743000+00:00,4.2,False,NO 21020
3,791a0b92-989a-4b0c-82f4-35bea99265e2,Manuel Hall,available,2025-04-19 15:01:20.743000+00:00,4.62,False,1912 IX
4,cd09d21c-d77a-49d8-a442-2c23418eb543,Alexander Best,available,2025-04-19 14:40:20.743000+00:00,4.88,False,RJH-8362
