# Milestone 1

**Group 10:** Nicolas Cubillo, Pablo Gomez, Sebastian Llobet, Pablo Jaime Rivera, Jorge Rodriguez

## Data Feeds + Schemas

### Feed 1: Passenger Requests

Contains passenger-side events: ride requests and ride cancellations.

In [1]:
# Avro record schema for Feed 1 (passenger requests), written as a plain
# Python dict so it can be handed directly to fastavro.
# One record describes one passenger event; `event_type` says whether it is a
# ride request or a cancellation.
# Every field is required (none is a union with "null"), and each enum field
# accepts only the symbols listed in its "symbols" array.
PASSENGER_REQUEST_SCHEMA = {
  "type": "record",
  "name": "PassengerRequest",
  "fields": [
    {
      "name": "request_id",
      "type": "string",
      "doc": "Unique identifier for the passenger request."
    },
    {
      "name": "user_id",
      "type": "string",
      "doc": "Unique identifier for the user making the request."
    },
    {
      "name": "event_type",
      "type": {
        "type": "enum",
        "name": "EventType",
        "symbols": ["request", "cancel"]
      },
      "doc": "Type of event, either a ride request or a cancellation."
    },
    {
      "name": "timestamp",
      "type": "long",
      "doc": "Epoch timestamp indicating when the request was made."
    },
    {
      "name": "pickup_latitude",
      "type": "double",
      "doc": "Latitude coordinate of the pickup location."
    },
    {
      "name": "pickup_longitude",
      "type": "double",
      "doc": "Longitude coordinate of the pickup location."
    },
    {
      "name": "pickup_address",
      "type": "string",
      "doc": "Readable address of the pickup location."
    },
    {
      "name": "dropoff_latitude",
      "type": "double",
      "doc": "Latitude coordinate of the dropoff location."
    },
    {
      "name": "dropoff_longitude",
      "type": "double",
      "doc": "Longitude coordinate of the dropoff location."
    },
    {
      "name": "dropoff_address",
      "type": "string",
      "doc": "Readable address of the dropoff location."
    },
    {
      "name": "payment_method",
      "type": {
        "type": "enum",
        "name": "PaymentMethod",
        "symbols": ["credit_card", "debit_card", "cash", "mobile_payment"]
      },
      "doc": "Payment method chosen by the user."
    },
    {
      "name": "vehicle_type",
      "type": {
        "type": "enum",
        "name": "VehicleType",
        "symbols": ["standard", "premium", "luxury", "pool"]
      },
      "doc": "Type of vehicle requested for the ride."
    },
    {
      "name": "num_passengers",
      "type": "int",
      "doc": "Number of passengers included in the ride request."
    },
    {
      # NOTE(review): Avro "float" is 32-bit, so values such as 28.05 read back
      # as 28.049999237060547; consider "double" if exact cents matter.
      "name": "fare_estimate",
      "type": "float",
      "doc": "Estimated fare for the requested ride."
    },
    {
      "name": "ride_purpose",
      "type": {
        "type": "enum",
        "name": "RidePurpose",
        "symbols": ["commute", "airport", "business", "leisure"]
      },
      "doc": "Purpose of the ride request."
    },
    {
      "name": "device_type",
      "type": {
        "type": "enum",
        "name": "DeviceType",
        "symbols": ["mobile", "web"]
      },
      "doc": "Type of device used to make the ride request."
    }
  ]
}



### Feed 2: Ride Status

Tracks the lifecycle of each ride through its status updates: accepted, ongoing, completed, or cancelled.

In [2]:
# Avro record schema for Feed 2 (ride status), written as a plain Python dict
# so it can be handed directly to fastavro.
# `request_id` ties a status record back to the passenger request it belongs to.
# Fields typed as ["null", X] with a default of None are optional: they stay
# null until the corresponding stage of the ride has happened (see each "doc").
RIDE_STATUS_SCHEMA = {
  "type": "record",
  "name": "RideStatus",
  "fields": [
    {
      "name": "ride_id",
      "type": "string",
      "doc": "Unique identifier for the ride."
    },
    {
      "name": "request_id",
      "type": "string",
      "doc": "Identifier linking this ride status to the original ride request."
    },
    {
      "name": "status",
      "type": {
        "type": "enum",
        "name": "RideStatusEnum",
        "symbols": ["accepted", "ongoing", "completed", "cancelled"]
      },
      "doc": "Current status of the ride."
    },
    {
      "name": "timestamp",
      "type": "long",
      "doc": "Epoch timestamp indicating when the status was recorded."
    },
    {
      "name": "driver_id",
      "type": "string",
      "doc": "Unique identifier for the driver assigned to the ride."
    },
    {
      "name": "estimated_arrival_time",
      "type": "long",
      "doc": "Estimated epoch timestamp of when the driver is expected to arrive at the pickup location."
    },
    {
      "name": "actual_arrival_time",
      "type": ["null", "long"],
      "default": None,
      "doc": "Actual epoch timestamp when the driver arrived at the pickup location. Null if not yet arrived."
    },
    {
      "name": "ride_duration",
      "type": ["null", "int"],
      "default": None,
      "doc": "Duration of the ride in minutes. Null if ride is not completed."
    },
    {
      "name": "distance_traveled",
      "type": ["null", "float"],
      "default": None,
      "doc": "Total distance traveled during the ride in kilometers. Null if ride is not completed."
    },
    {
      # Required (non-nullable): a fare value must be present for every status.
      "name": "fare",
      "type": "float",
      "doc": "Final fare amount for the ride."
    },
    {
      "name": "surge_multiplier",
      "type": "float",
      "doc": "Surge pricing multiplier applied to the fare."
    },
    {
      "name": "traffic_condition",
      "type": {
        "type": "enum",
        "name": "TrafficCondition",
        "symbols": ["low", "medium", "high"]
      },
      "doc": "Traffic conditions during the ride."
    },
    {
      "name": "weather_condition",
      "type": {
        "type": "enum",
        "name": "WeatherCondition",
        "symbols": ["clear", "rainy", "snowy", "foggy", "stormy"]
      },
      "doc": "Weather conditions at the time of the ride."
    },
    {
      # Symbols are capitalised English day names; producers must emit exactly these strings.
      "name": "day_of_week",
      "type": {
        "type": "enum",
        "name": "DayOfWeek",
        "symbols": ["Monday", "Tuesday", "Wednesday", "Thursday", "Friday", "Saturday", "Sunday"]
      },
      "doc": "Day of the week when the ride took place."
    },
    {
      "name": "driver_rating",
      "type": ["null", "float"],
      "default": None,
      "doc": "Driver's rating for the ride (scale 1-5). Null if ride not completed."
    },
    {
      "name": "cancellation_reason",
      "type": ["null", {
        "type": "enum",
        "name": "CancellationReason",
        "symbols": ["user_request", "driver_unavailable", "technical_issue"]
      }],
      "default": None,
      "doc": "Reason for ride cancellation, if applicable. Null if not cancelled."
    },
    {
      "name": "payment_status",
      "type": {
        "type": "enum",
        "name": "PaymentStatus",
        "symbols": ["pending", "completed", "failed"]
      },
      "doc": "Current status of the ride payment."
    }
  ]
}


## Data Generation

In [3]:
!pip install Faker

Collecting Faker
  Downloading faker-36.2.2-py3-none-any.whl.metadata (15 kB)
Downloading faker-36.2.2-py3-none-any.whl (1.9 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.9/1.9 MB[0m [31m20.5 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: Faker
Successfully installed Faker-36.2.2


In [4]:
!pip install mimesis

Collecting mimesis
  Downloading mimesis-18.0.0-py3-none-any.whl.metadata (5.7 kB)
Downloading mimesis-18.0.0-py3-none-any.whl (4.7 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m4.7/4.7 MB[0m [31m26.6 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: mimesis
Successfully installed mimesis-18.0.0


In [5]:
!pip install fastavro

Collecting fastavro
  Downloading fastavro-1.10.0-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (5.5 kB)
Downloading fastavro-1.10.0-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (3.3 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m3.3/3.3 MB[0m [31m33.6 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: fastavro
Successfully installed fastavro-1.10.0


In [6]:
import json
import random
import time
from datetime import datetime, timedelta, timezone

import fastavro
import numpy as np
import pandas as pd
from faker import Faker
from mimesis import Person, Address
from mimesis.locales import Locale

# Shared fake-data providers used by the generator functions below.
# NOTE(review): no random seed is set for `random` or Faker, so every run
# produces different data.
fake = Faker()
person = Person(locale=Locale.EN)  # NOTE(review): not referenced in the visible cells
address = Address(locale=Locale.EN)

# Peak-demand windows as (start_hour, end_hour) pairs on a 24-hour clock.
# get_peak_multiplier compares them inclusively (start <= hour <= end), so a
# fractional bound such as 10.5 (10:30) only takes effect when the caller
# passes an hour value that includes minutes.
PEAK_HOURS = [(7, 10.5), (16, 20)]  # Weekday rush hours
WEEKEND_PEAK_HOURS = [(12, 23)]  # Weekend peak hours
# (start_hour, end_hour) of the night window; start > end because the window
# wraps past midnight (22:00 through 05:xx).
NIGHT_HOURS = (22, 5)  # Nighttime range

# Fare multipliers keyed by condition name. The keys match the symbols of the
# TrafficCondition / WeatherCondition enums in RIDE_STATUS_SCHEMA.
TRAFFIC_CONDITIONS = {"low": 1.0, "medium": 1.2, "high": 1.5}
WEATHER_CONDITIONS = {"clear": 1.0, "rainy": 1.3, "snowy": 1.5, "foggy": 1.6, "stormy": 1.8}

# Define function to determine peak hour multiplier
def get_peak_multiplier(hour, weekday):
    """Return the peak-demand fare multiplier for a given time slot.

    Args:
        hour: Hour of the day; compared inclusively against each
            (start, end) window, so fractional values are accepted.
        weekday: Day index as returned by ``datetime.weekday()``
            (0 = Monday ... 6 = Sunday). Values below 5 use PEAK_HOURS,
            all others use WEEKEND_PEAK_HOURS.

    Returns:
        float: 1.5 when ``hour`` falls inside any applicable window, else 1.0.
    """
    windows = PEAK_HOURS if weekday < 5 else WEEKEND_PEAK_HOURS
    in_peak = any(lower <= hour <= upper for lower, upper in windows)
    return 1.5 if in_peak else 1.0

# Define function to determine night multiplier
def get_night_multiplier(hour, night_hours=None):
    """Return the night-time fare multiplier for ``hour``.

    Args:
        hour: Hour of the day on a 24-hour clock.
        night_hours: Optional ``(start, end)`` pair, both bounds inclusive.
            Defaults to the module-level NIGHT_HOURS. The window may wrap
            past midnight (start > end, e.g. ``(22, 5)``) or lie within a
            single day (start <= end, e.g. ``(0, 5)``).

    Returns:
        float: 1.1 when ``hour`` is inside the night window, else 1.0.
    """
    start, end = NIGHT_HOURS if night_hours is None else night_hours
    if start <= end:
        # Same-day window: both bounds must hold. An `or` here would match
        # every hour >= start.
        is_night = start <= hour <= end
    else:
        # Window wraps past midnight: late evening OR early morning.
        is_night = hour >= start or hour <= end
    return 1.1 if is_night else 1.0  # Slight increase at night

# Generate synthetic ride requests
def generate_ride_request():
    """Build one synthetic passenger-request record.

    The keys of the returned dict mirror the fields of
    PASSENGER_REQUEST_SCHEMA. The event time is the current UTC time shifted
    by a random whole number of hours in [-12, 12]. The fare estimate is a
    random base fare scaled by the peak-hour, night, traffic and weather
    multipliers; the traffic and weather draws are used only for pricing and
    are not stored in the record.

    Returns:
        dict: A single request record ready for Avro serialization.
    """
    # Use a timezone-aware UTC datetime. A naive datetime.utcnow() value is
    # interpreted as *local* time by .timestamp(), which shifts the epoch
    # value on any machine whose timezone is not UTC.
    timestamp = datetime.now(timezone.utc) + timedelta(hours=random.randint(-12, 12))  # Simulate past/future requests
    hour, weekday = timestamp.hour, timestamp.weekday()
    # Peak windows can have fractional bounds (10.5 means 10:30), so the peak
    # lookup receives the hour including minutes; the integer hour alone can
    # never honour a half-hour boundary.
    hour_with_minutes = hour + timestamp.minute / 60

    # Assign random weather and traffic conditions
    traffic_condition = random.choice(list(TRAFFIC_CONDITIONS.keys()))
    weather_condition = random.choice(list(WEATHER_CONDITIONS.keys()))

    # Calculate fare multiplier
    base_fare = random.uniform(5, 20)  # Base fare between $5 and $20
    multiplier = (
        get_peak_multiplier(hour_with_minutes, weekday) *
        get_night_multiplier(hour) *
        TRAFFIC_CONDITIONS[traffic_condition] *
        WEATHER_CONDITIONS[weather_condition]
    )
    fare = round(base_fare * multiplier, 2)

    request = {
        "request_id": fake.uuid4(),
        "user_id": fake.uuid4(),
        "event_type": random.choice(["request", "cancel"]),
        "timestamp": int(timestamp.timestamp()),
        # NOTE(review): coordinates are drawn uniformly over the whole globe
        # and are unrelated to the generated street addresses.
        "pickup_latitude": round(random.uniform(-90, 90), 6),
        "pickup_longitude": round(random.uniform(-180, 180), 6),
        "pickup_address": address.address(),
        "dropoff_latitude": round(random.uniform(-90, 90), 6),
        "dropoff_longitude": round(random.uniform(-180, 180), 6),
        "dropoff_address": address.address(),
        "payment_method": random.choice(["credit_card", "debit_card", "cash", "mobile_payment"]),
        "vehicle_type": random.choice(["standard", "premium", "luxury", "pool"]),
        "num_passengers": random.randint(1, 4),
        "fare_estimate": fare,
        "ride_purpose": random.choice(["commute", "airport", "business", "leisure"]),
        "device_type": random.choice(["mobile", "web"])
    }
    return request

# Generate synthetic ride status updates
def generate_ride_status(request_id):
    """Build one synthetic ride-status record for an existing request.

    The keys of the returned dict mirror the fields of RIDE_STATUS_SCHEMA.
    Fields that only make sense for a given status are filled conditionally:
    ``actual_arrival_time`` for ongoing/completed rides; ``ride_duration``,
    ``distance_traveled`` and ``driver_rating`` for completed rides;
    ``cancellation_reason`` for cancelled rides. Otherwise they are None.

    Args:
        request_id: Identifier of the passenger request this status refers to.

    Returns:
        dict: A single ride-status record ready for Avro serialization.
    """
    # English day names in datetime.weekday() order (0 = Monday). Indexing
    # this tuple instead of strftime("%A") keeps the value inside the
    # DayOfWeek enum regardless of the process locale.
    day_names = ("Monday", "Tuesday", "Wednesday", "Thursday", "Friday", "Saturday", "Sunday")

    # Use a timezone-aware UTC datetime. A naive datetime.utcnow() value is
    # interpreted as *local* time by .timestamp(), which shifts the epoch
    # value on any machine whose timezone is not UTC.
    timestamp = datetime.now(timezone.utc) + timedelta(hours=random.randint(-12, 12))
    epoch_seconds = int(timestamp.timestamp())

    status = random.choice(["accepted", "ongoing", "completed", "cancelled"])
    driver_id = fake.uuid4()
    estimated_arrival_time = epoch_seconds + random.randint(60, 600)  # 1-10 minutes after request
    # The driver may arrive up to two minutes early or late relative to the estimate.
    actual_arrival_time = estimated_arrival_time + random.randint(-120, 120) if status in ["ongoing", "completed"] else None
    ride_duration = random.randint(5, 60) if status == "completed" else None
    distance_traveled = round(random.uniform(1, 30), 2) if status == "completed" else None
    fare = round(random.uniform(5, 50), 2)
    surge_multiplier = round(random.uniform(1.0, 2.5), 2)
    traffic_condition = random.choice(list(TRAFFIC_CONDITIONS.keys()))
    weather_condition = random.choice(list(WEATHER_CONDITIONS.keys()))
    driver_rating = round(random.uniform(3.0, 5.0), 1) if status == "completed" else None
    cancellation_reason = random.choice(["user_request", "driver_unavailable", "technical_issue"]) if status == "cancelled" else None
    # NOTE(review): payment status is drawn independently of the ride status.
    payment_status = random.choice(["pending", "completed", "failed"])

    ride_status = {
        "ride_id": fake.uuid4(),
        "request_id": request_id,
        "status": status,
        "timestamp": epoch_seconds,
        "driver_id": driver_id,
        "estimated_arrival_time": estimated_arrival_time,
        "actual_arrival_time": actual_arrival_time,
        "ride_duration": ride_duration,
        "distance_traveled": distance_traveled,
        "fare": fare,
        "surge_multiplier": surge_multiplier,
        "traffic_condition": traffic_condition,
        "weather_condition": weather_condition,
        "day_of_week": day_names[timestamp.weekday()],
        "driver_rating": driver_rating,
        "cancellation_reason": cancellation_reason,
        "payment_status": payment_status
    }
    return ride_status

# Serialize data to AVRO
def serialize_to_avro(data, schema, file_name):
    """Write records to an Avro file.

    Args:
        data: Iterable of record dicts to write.
        schema: Avro schema (as a dict) describing each record.
        file_name: Path of the file to create; it is opened in binary write
            mode, so an existing file is overwritten.
    """
    with open(file_name, "wb") as avro_sink:
        fastavro.writer(avro_sink, schema, data)


# One-off batch generation (not a continuous stream): build the records in
# memory, then write each feed to its own Avro file in the working directory.
if __name__ == "__main__":
    # 1000 synthetic passenger requests.
    requests = [generate_ride_request() for _ in range(1000)]
    # Exactly one status record per request, linked through request_id.
    statuses = [generate_ride_status(req["request_id"]) for req in requests]
    serialize_to_avro(requests, PASSENGER_REQUEST_SCHEMA, "passenger_requests.avro")
    serialize_to_avro(statuses, RIDE_STATUS_SCHEMA, "ride_statuses.avro")
    print("Serialized to AVRO files.")


Serialized to AVRO files.


In [7]:
def display_first_ten_records(filename):
    """Print up to the first ten records of an Avro file.

    Each record is printed on its own line. If the file runs out before ten
    records have been printed, a notice is printed after the last record.

    Args:
        filename: Path of the Avro file to read.
    """
    with open(filename, 'rb') as avro_file:
        shown = 0
        for record in fastavro.reader(avro_file):
            print(record)
            shown += 1
            if shown == 10:
                break
        else:
            # Reached only when the reader is exhausted before the tenth record.
            print("File contains less than 10 records.")

In [8]:
display_first_ten_records('passenger_requests.avro')

{'request_id': 'b1c7c76f-09db-433e-87f5-de18ee81e80e', 'user_id': 'bff66f5a-45e0-4bf3-bf64-a2aa7d94d562', 'event_type': 'cancel', 'timestamp': 1741377359, 'pickup_latitude': 39.006253, 'pickup_longitude': 124.503267, 'pickup_address': '601 Grant Mall', 'dropoff_latitude': 20.406103, 'dropoff_longitude': 131.085288, 'dropoff_address': '551 Plaza Bridge', 'payment_method': 'cash', 'vehicle_type': 'luxury', 'num_passengers': 4, 'fare_estimate': 19.0, 'ride_purpose': 'commute', 'device_type': 'mobile'}
{'request_id': '74e0be89-15b2-43bf-877a-abc4a3fe1e5a', 'user_id': '69f7f016-1afb-4b12-b14b-4098656bfbc5', 'event_type': 'request', 'timestamp': 1741323359, 'pickup_latitude': -37.500585, 'pickup_longitude': 81.66645, 'pickup_address': '94 Palo Alto Bridge', 'dropoff_latitude': 37.400646, 'dropoff_longitude': 142.137955, 'dropoff_address': '814 Lettuce Plaza', 'payment_method': 'mobile_payment', 'vehicle_type': 'premium', 'num_passengers': 1, 'fare_estimate': 28.049999237060547, 'ride_purpose

In [9]:
display_first_ten_records('ride_statuses.avro')

{'ride_id': '3c7e215c-689e-4e9a-88d5-f1aebd4d0050', 'request_id': 'b1c7c76f-09db-433e-87f5-de18ee81e80e', 'status': 'completed', 'timestamp': 1741341359, 'driver_id': 'd9a23ad8-6d37-4731-b6a8-bce8fc1ac781', 'estimated_arrival_time': 1741341625, 'actual_arrival_time': 1741341678, 'ride_duration': 10, 'distance_traveled': 23.06999969482422, 'fare': 40.970001220703125, 'surge_multiplier': 1.1100000143051147, 'traffic_condition': 'medium', 'weather_condition': 'foggy', 'day_of_week': 'Friday', 'driver_rating': 4.900000095367432, 'cancellation_reason': None, 'payment_status': 'completed'}
{'ride_id': 'c9ae35a4-865c-4312-857a-7ed0068a786f', 'request_id': '74e0be89-15b2-43bf-877a-abc4a3fe1e5a', 'status': 'ongoing', 'timestamp': 1741384559, 'driver_id': '07379724-d0f6-4f06-ac78-ad44da8a8894', 'estimated_arrival_time': 1741384734, 'actual_arrival_time': 1741384790, 'ride_duration': None, 'distance_traveled': None, 'fare': 6.909999847412109, 'surge_multiplier': 2.3399999141693115, 'traffic_condi