# Milestone 1 - Data Feed Generation

## Step 1: Set up the project structure and define AVRO schemas

In [1]:
# Step 1: Set up the project structure and define AVRO schemas

import numpy as np
import pandas as pd
import fastavro
import json
import datetime
import random
import time
import os
import uuid
from faker import Faker
from geopy.distance import geodesic
import math

# AVRO schema for a single ride-request event.
# The nested "Location" record is declared inline on `pickup_location` and
# then referenced by name for `destination`.
ride_request_schema = {
    "namespace": "ridehailing.avro",
    "type": "record",
    "name": "RideRequest",
    "fields": [
        {"name": field_name, "type": field_type}
        for field_name, field_type in (
            ("request_id", "string"),
            ("user_id", "string"),
            ("timestamp", "long"),
            (
                "pickup_location",
                {
                    "type": "record",
                    "name": "Location",
                    "fields": [
                        {"name": "latitude", "type": "double"},
                        {"name": "longitude", "type": "double"},
                    ],
                },
            ),
            ("destination", "Location"),
            (
                "status",
                {
                    "type": "enum",
                    "name": "RequestStatus",
                    "symbols": ["REQUESTED", "ACCEPTED", "CANCELLED", "COMPLETED"],
                },
            ),
            ("vehicle_type", "string"),
            ("estimated_fare", "double"),
            ("estimated_duration", "int"),
            ("estimated_distance", "double"),
            ("passenger_count", "int"),
        )
    ],
}

# Define AVRO schema for driver status
#
# Every named type (record / enum / fixed) inside one Avro schema must have a
# unique full name. The status enum therefore must NOT reuse the enclosing
# record's name "DriverStatus": both would resolve to
# "ridehailing.avro.DriverStatus". The enum is named "DriverStatusType"
# instead; its symbols are unchanged.
driver_status_schema = {
    "namespace": "ridehailing.avro",
    "type": "record",
    "name": "DriverStatus",
    "fields": [
        {"name": "driver_id", "type": "string"},
        {"name": "timestamp", "type": "long"},
        {"name": "current_location", "type": {"type": "record", "name": "Location",
            "fields": [
                {"name": "latitude", "type": "double"},
                {"name": "longitude", "type": "double"}
            ]}
        },
        {"name": "status", "type": {"type": "enum", "name": "DriverStatusType",
            "symbols": ["AVAILABLE", "BUSY", "OFFLINE", "EN_ROUTE_TO_PICKUP", "WITH_PASSENGER"]}},
        {"name": "vehicle_info", "type": {"type": "record", "name": "Vehicle",
            "fields": [
                {"name": "vehicle_id", "type": "string"},
                {"name": "vehicle_type", "type": "string"},
                {"name": "capacity", "type": "int"}
            ]}
        },
        # Nullable fields: "null" is listed first in each union
        {"name": "current_ride_id", "type": ["null", "string"]},
        {"name": "last_update", "type": "long"},
        {"name": "battery_level", "type": ["null", "double"]}
    ]
}

# Create the output directory tree (JSON and AVRO, one folder per feed).
# exist_ok=True makes the cell safe to re-run.
for output_dir in (
    'data/json/ride_requests',
    'data/json/driver_status',
    'data/avro/ride_requests',
    'data/avro/driver_status',
):
    os.makedirs(output_dir, exist_ok=True)

print("Project structure and AVRO schemas set up successfully!")

Project structure and AVRO schemas set up successfully!


## Step 2: Implement the CityGrid class

In [2]:
# Step 2: Implement the CityGrid class

class CityGrid:
    """
    Rectangular city area plus a catalogue of named demand hotspots.

    Hotspots are grouped by category ('business_districts',
    'residential_areas', 'entertainment_zones', 'airports'); each one is a
    dict with a 'center' (lat, lon) tuple, a 'radius' in degrees and a 'name'.
    """

    def __init__(self, lat_range=(40.70, 40.85), lon_range=(-74.05, -73.90)):
        """
        Store the bounding box and build the hotspot catalogue
        Default coordinates approximate New York City
        """
        def spot(name, lat, lon, radius):
            # Same key order as the rest of the notebook expects to read
            return {'center': (lat, lon), 'radius': radius, 'name': name}

        self.lat_range = lat_range
        self.lon_range = lon_range

        # Hotspots by category (business districts, residential areas, entertainment zones, airports)
        self.hotspots = {
            'business_districts': [
                spot('Midtown', 40.75, -73.98, 0.02),
                spot('Financial District', 40.71, -74.01, 0.015),
            ],
            'residential_areas': [
                spot('Upper East Side', 40.78, -73.95, 0.025),
                spot('Greenwich Village', 40.73, -73.99, 0.02),
                spot('Upper West Side', 40.80, -73.96, 0.025),
            ],
            'entertainment_zones': [
                spot('Times Square', 40.76, -73.98, 0.015),
                spot('Chelsea', 40.74, -73.99, 0.015),
            ],
            'airports': [
                spot('JFK Airport', 40.64, -73.78, 0.03),
                spot('LaGuardia Airport', 40.77, -73.87, 0.02),
            ],
        }

    def get_random_location(self, hotspot_bias=0.8):
        """
        Draw a random (latitude, longitude) tuple.

        With probability `hotspot_bias` the point is placed inside a randomly
        chosen hotspot; otherwise it is uniform over the bounding box.
        """
        if not random.random() < hotspot_bias:
            # Uniform draw over the whole bounding box (latitude first)
            south, north = self.lat_range[0], self.lat_range[1]
            latitude = random.uniform(south, north)
            west, east = self.lon_range[0], self.lon_range[1]
            longitude = random.uniform(west, east)
            return (latitude, longitude)

        # Pick a category, then one hotspot inside that category
        chosen_category = random.choice(list(self.hotspots.keys()))
        chosen_spot = random.choice(self.hotspots[chosen_category])

        # Polar offset around the hotspot centre, expressed in degrees
        theta = random.uniform(0, 2 * math.pi)
        rho = random.uniform(0, chosen_spot['radius'])
        center_lat, center_lon = chosen_spot['center']

        return (
            center_lat + rho * math.cos(theta),
            center_lon + rho * math.sin(theta)
        )

    @staticmethod
    def _category_bonuses(category, hour):
        """
        Return the demand increments (to be added in order) for a hotspot of
        the given category at the given hour of day; empty when none apply.
        """
        if category == 'airports':
            # Steady baseline, plus a small peak in the morning / late afternoon
            if 7 <= hour < 10 or 16 <= hour < 20:
                return (1.0, 0.5)
            return (1.0,)

        # (start_hour, end_hour_exclusive, bonus) windows, then an optional
        # late-night bonus that wraps around midnight
        if category == 'business_districts':
            windows = ((7, 10, 2.0), (16, 19, 2.5), (12, 14, 1.5))
            late_night = 0.2 if (22 <= hour or hour < 5) else None
        elif category == 'residential_areas':
            windows = ((6, 9, 1.8), (17, 20, 1.2))
            late_night = None
        elif category == 'entertainment_zones':
            windows = ((18, 22, 2.0),)
            late_night = 3.0 if (22 <= hour or hour < 3) else None
        else:
            return ()

        for start, end, bonus in windows:
            if start <= hour < end:
                return (bonus,)
        return () if late_night is None else (late_night,)

    def get_hotspot_demand_factor(self, location, time_of_day):
        """
        Demand multiplier for a location at a given time.

        Starts at 1.0 and accumulates a time-dependent bonus for every
        hotspot whose radius contains the location. Never returns less
        than 0.1.
        """
        lat, lon = location
        demand_factor = 1.0

        for category, spots in self.hotspots.items():
            for candidate in spots:
                km_away = geodesic((lat, lon), candidate['center']).kilometers

                # Radius is in degrees; ~111 km per degree converts it to km
                if not km_away <= candidate['radius'] * 111:
                    continue

                for bonus in self._category_bonuses(category, time_of_day.hour):
                    demand_factor += bonus

        return max(0.1, demand_factor)

    def calculate_trip_details(self, pickup, destination):
        """
        Estimate a trip between two (lat, lon) points.

        Returns a dict with 'distance' (km, 2 decimals), 'duration'
        (whole minutes) and 'fare' (2 decimals).
        """
        BASE_FARE = 2.50
        RATE_PER_KM = 1.75
        RATE_PER_MINUTE = 0.35

        trip_km = geodesic(pickup, destination).kilometers

        # Random cruising speed between 15 and 35 km/h stands in for traffic
        speed_kmh = random.uniform(15, 35)
        trip_minutes = (trip_km / speed_kmh) * 60

        # Extra random delay of 0-10 minutes
        trip_minutes += random.uniform(0, 10)

        # Fare uses the unrounded distance and duration
        fare = BASE_FARE + (trip_km * RATE_PER_KM) + (trip_minutes * RATE_PER_MINUTE)

        return {
            'distance': round(trip_km, 2),
            'duration': int(round(trip_minutes)),
            'fare': round(fare, 2)
        }

# Test the CityGrid class
# Smoke test only: no random seed is set and the demand factor is evaluated at
# the wall-clock time, so the printed values differ from run to run.
if __name__ == "__main__":
    city = CityGrid()
    # Default hotspot_bias (0.8): most draws land inside a hotspot
    test_location = city.get_random_location()
    print(f"Random location: {test_location}")
    
    # Demand factor depends on the current hour of the machine running the cell
    test_time = datetime.datetime.now()
    demand = city.get_hotspot_demand_factor(test_location, test_time)
    print(f"Demand factor at {test_time.hour}:00: {demand}")
    
    # Estimate one trip between two independent random points
    pickup = city.get_random_location()
    destination = city.get_random_location()
    trip = city.calculate_trip_details(pickup, destination)
    print(f"Trip from {pickup} to {destination}:")
    print(f"  Distance: {trip['distance']} km")
    print(f"  Duration: {trip['duration']} minutes")
    print(f"  Fare: ${trip['fare']}")

Random location: (40.75601220134024, -73.99056262201339)
Demand factor at 11:00: 1.0
Trip from (40.75578599882364, -73.9552958230431) to (40.77101308675731, -73.94244789690988):
  Distance: 2.01 km
  Duration: 12 minutes
  Fare: $10.08


## Step 3: Implement the core RideHailingDataGenerator class

In [3]:
# Step 3: Implement the core RideHailingDataGenerator class

class RideHailingDataGenerator:
    """
    Simulation state for the ride-hailing data feeds.

    Holds the simulated clock (`current_time`), the driver and user pools
    (dicts keyed by ID), the vehicle-type catalogue and the `active_rides`
    registry (request_id -> ride details).
    """

    def __init__(self, config):
        """
        Initialize the ride-hailing data generator with configuration
        
        Parameters:
        -----------
        config : dict
            Configuration dictionary with parameters like:
            - num_drivers: Number of drivers to simulate
            - num_users: Number of users to simulate
            - start_time: Simulation start time (format: 'YYYY-MM-DD HH:MM:SS')
            - base_request_rate: Average number of requests per minute
        """
        self.config = config
        self.faker = Faker()
        self.city_grid = CityGrid()
        # Naive datetime: .timestamp() further down interprets it in the
        # machine's local timezone.
        self.current_time = datetime.datetime.strptime(config['start_time'], '%Y-%m-%d %H:%M:%S')
        
        # Vehicle types with their capacities - define this BEFORE initializing drivers
        # (both pool initializers read it)
        self.vehicle_types = {
            'Economy': 4,
            'Comfort': 4,
            'Premium': 4,
            'SUV': 6,
            'Van': 8
        }
        
        # Initialize drivers and users
        self.drivers = self._initialize_drivers(config['num_drivers'])
        self.users = self._initialize_users(config['num_users'])
        
        # Active rides tracking
        self.active_rides = {}  # request_id -> ride details
        
        print(f"Initialized RideHailingDataGenerator with {len(self.drivers)} drivers and {len(self.users)} users")

    @staticmethod
    def _new_unique_id(prefix, taken):
        """
        Build an ID of the form '<prefix>-<first 8 chars of a UUID4>' that is
        not already a key of `taken`.

        The 8-character prefix carries only 32 bits, so two draws can collide;
        without this retry a collision would silently overwrite an existing
        pool entry and leave the pool smaller than requested.
        """
        while True:
            candidate = f"{prefix}-{str(uuid.uuid4())[:8]}"
            if candidate not in taken:
                return candidate
        
    def _initialize_drivers(self, num_drivers):
        """
        Initialize driver pool with random attributes
        
        Parameters:
        -----------
        num_drivers : int
            Number of drivers to initialize
            
        Returns:
        --------
        dict
            Dictionary of driver_id -> driver_details; always contains
            exactly `num_drivers` entries
        """
        drivers = {}
        
        for _ in range(num_drivers):
            driver_id = self._new_unique_id("D", drivers)
            location = self.city_grid.get_random_location()
            
            # Randomly select vehicle type (uniform over all types)
            vehicle_type = random.choice(list(self.vehicle_types.keys()))
            
            # Initial status distribution: 60% available, 30% busy, 10% offline
            status = random.choices(
                ['AVAILABLE', 'BUSY', 'OFFLINE'], 
                weights=[0.6, 0.3, 0.1]
            )[0]
            
            drivers[driver_id] = {
                'driver_id': driver_id,
                'current_location': {
                    'latitude': location[0],
                    'longitude': location[1]
                },
                'status': status,
                'vehicle_info': {
                    'vehicle_id': f"V-{str(uuid.uuid4())[:8]}",
                    'vehicle_type': vehicle_type,
                    'capacity': self.vehicle_types[vehicle_type]
                },
                'current_ride_id': None,
                'last_update': int(self.current_time.timestamp()),
                'battery_level': random.uniform(0.3, 1.0)
            }
        
        return drivers
    
    def _initialize_users(self, num_users):
        """
        Initialize user pool with random attributes
        
        Parameters:
        -----------
        num_users : int
            Number of users to initialize
            
        Returns:
        --------
        dict
            Dictionary of user_id -> user_details; always contains exactly
            `num_users` entries
        """
        users = {}
        
        for _ in range(num_users):
            user_id = self._new_unique_id("U", users)
            
            # Generate home and work locations (strongly biased towards hotspots)
            home_location = self.city_grid.get_random_location(hotspot_bias=0.9)
            work_location = self.city_grid.get_random_location(hotspot_bias=0.9)
            
            # Preferred vehicle type (weighted towards economy).
            # The weights are positional: they line up with the insertion
            # order of self.vehicle_types (Economy, Comfort, Premium, SUV, Van).
            preferred_vehicle_type = random.choices(
                list(self.vehicle_types.keys()), 
                weights=[0.5, 0.25, 0.15, 0.07, 0.03]
            )[0]
            
            users[user_id] = {
                'user_id': user_id,
                'name': self.faker.name(),
                'home_location': home_location,
                'work_location': work_location,
                'preferred_vehicle_type': preferred_vehicle_type,
                'cancellation_probability': random.uniform(0.02, 0.08),  # 2-8% chance to cancel
                'last_activity': int(self.current_time.timestamp())
            }
        
        return users

# Test the RideHailingDataGenerator initialization
# Builds a small generator and prints one driver and one user record.
if __name__ == "__main__":
    config = {
        'num_drivers': 10,  # Small number for testing
        'num_users': 50,    # Small number for testing
        'start_time': '2023-03-01 08:00:00',
        'base_request_rate': 5  # Average requests per minute
    }
    
    generator = RideHailingDataGenerator(config)
    
    # Print a sample driver
    sample_driver_id = list(generator.drivers.keys())[0]
    print("\nSample Driver:")
    print(json.dumps(generator.drivers[sample_driver_id], indent=2))
    
    # Print a sample user
    # (home/work locations are tuples; json.dumps renders them as lists)
    sample_user_id = list(generator.users.keys())[0]
    print("\nSample User:")
    print(json.dumps(generator.users[sample_user_id], indent=2))


Initialized RideHailingDataGenerator with 10 drivers and 50 users

Sample Driver:
{
  "driver_id": "D-1abb8413",
  "current_location": {
    "latitude": 40.7486351276492,
    "longitude": -73.98574113010186
  },
  "status": "BUSY",
  "vehicle_info": {
    "vehicle_id": "V-791389af",
    "vehicle_type": "Premium",
    "capacity": 4
  },
  "current_ride_id": null,
  "last_update": 1677654000,
  "battery_level": 0.3741492170311884
}

Sample User:
{
  "user_id": "U-a37ecc9c",
  "name": "Jesus Palmer",
  "home_location": [
    40.77249442631335,
    -73.87157506799412
  ],
  "work_location": [
    40.75997029257859,
    -73.98706024920546
  ],
  "preferred_vehicle_type": "SUV",
  "cancellation_probability": 0.07186629781386039,
  "last_activity": 1677654000
}


## Step 4: Implement ride request generation

In [4]:
# Step 4: Implement ride request generation

def _get_time_factors(self):
    """
    Build the demand multipliers that apply at the current simulated time.

    Returns:
    --------
    dict
        Keys 'hourly', 'weekday', 'special_event' and 'weather'; each value
        is a multiplier (1.0 means "no effect")
    """
    now = self.current_time
    is_weekend = now.weekday() >= 5  # Monday is 0, so 5/6 are Sat/Sun

    # Hour-of-day profile, listed from 00:00 through 23:00
    hourly_profile = dict(enumerate((
        0.4, 0.3, 0.2, 0.1, 0.1, 0.2,
        0.5, 1.5, 2.0, 1.8, 1.0, 1.0,
        1.2, 1.2, 1.0, 1.0, 1.5, 2.0,
        1.8, 1.5, 1.3, 1.2, 1.0, 0.7,
    )))

    factors = {
        'hourly': hourly_profile.get(now.hour, 1.0),
        'weekday': 1.3 if is_weekend else 1.0,
        'special_event': 1.0,
        'weather': 1.0
    }

    # Special event: 5% chance per call, boosts demand by 1.5x-3.0x
    if random.random() < 0.05:
        special_event_factor = random.uniform(1.5, 3.0)
        print(f"Special event occurring at {self.current_time}! Demand multiplier: {special_event_factor:.2f}")
        factors['special_event'] = special_event_factor

    # Bad weather: 10% chance per call, boosts demand by 1.2x-1.8x
    if random.random() < 0.1:
        weather_factor = random.uniform(1.2, 1.8)
        print(f"Bad weather at {self.current_time}! Demand multiplier: {weather_factor:.2f}")
        factors['weather'] = weather_factor

    return factors

def generate_ride_requests(self):
    """
    Generate the ride requests for the current simulated minute.

    The request count is drawn from a Poisson distribution whose rate is
    `config['base_request_rate']` scaled by the factors returned from
    `_get_time_factors()`. Every generated request is also registered in
    `self.active_rides` under its request_id.

    Returns:
    --------
    list
        List of generated ride request events. Returns an empty list when
        the user pool is empty.

    Notes:
    ------
    The dict placed in the returned list is the same object stored under
    active_rides[request_id]['request'], so later changes to that entry are
    visible through the returned list as well.
    """
    time_factors = self._get_time_factors()
    
    # Calculate base request rate per minute
    base_rate = self.config['base_request_rate']
    adjusted_rate = base_rate * time_factors['hourly'] * time_factors['weekday'] * \
                   time_factors['special_event'] * time_factors['weather']
    
    # Determine number of requests to generate this minute
    num_requests = np.random.poisson(adjusted_rate)
    
    # Loop-invariant values, computed once instead of once per request
    user_ids = list(self.users.keys())
    hour = self.current_time.hour
    timestamp = int(self.current_time.timestamp())
    
    # Without users nobody can request a ride (random.choice would raise
    # IndexError on an empty list)
    if not user_ids:
        return []
    
    requests = []
    for _ in range(num_requests):
        # Select a random user
        user_id = random.choice(user_ids)
        user = self.users[user_id]
        
        # Determine pickup location (biased towards user's home/work during certain hours)
        if 6 <= hour < 10 and random.random() < 0.7:  # Morning commute
            pickup = user['home_location']
            destination = user['work_location']
        elif 16 <= hour < 19 and random.random() < 0.7:  # Evening commute
            pickup = user['work_location']
            destination = user['home_location']
        else:
            # Random locations with hotspot bias
            pickup = self.city_grid.get_random_location()
            destination = self.city_grid.get_random_location()
            
            # Ensure pickup and destination are not too close (at least 1 km apart)
            while geodesic(pickup, destination).kilometers < 1.0:
                destination = self.city_grid.get_random_location()
        
        # Calculate trip details
        trip_details = self.city_grid.calculate_trip_details(pickup, destination)
        
        # Generate request (full UUID, unlike the shortened driver/user IDs)
        request_id = f"R-{str(uuid.uuid4())}"
        preferred_type = user['preferred_vehicle_type']
        
        request = {
            'request_id': request_id,
            'user_id': user_id,
            'timestamp': timestamp,
            'pickup_location': {
                'latitude': pickup[0],
                'longitude': pickup[1]
            },
            'destination': {
                'latitude': destination[0],
                'longitude': destination[1]
            },
            'status': 'REQUESTED',
            'vehicle_type': preferred_type,
            'estimated_fare': trip_details['fare'],
            'estimated_duration': trip_details['duration'],
            'estimated_distance': trip_details['distance'],
            # Party size is capped by the capacity of the preferred vehicle type
            'passenger_count': random.randint(1, self.vehicle_types[preferred_type])
        }
        
        # Add to active rides
        self.active_rides[request_id] = {
            'request': request,
            'assigned_driver': None,
            'status': 'REQUESTED',
            'created_at': self.current_time,
            'pickup_time': None,
            'completion_time': None
        }
        
        requests.append(request)
    
    if requests:
        print(f"Generated {len(requests)} ride requests at {self.current_time}")
    
    return requests

# Add these methods to the RideHailingDataGenerator class
# The two functions above are defined at cell level with an explicit `self`
# parameter; assigning them as class attributes makes them callable as
# regular methods on any RideHailingDataGenerator instance.
RideHailingDataGenerator._get_time_factors = _get_time_factors
RideHailingDataGenerator.generate_ride_requests = generate_ride_requests

# Test ride request generation
# Generates one minute of requests on a fresh generator; no random seed is
# set, so the number and content of requests vary between runs.
if __name__ == "__main__":
    config = {
        'num_drivers': 10,
        'num_users': 50,
        'start_time': '2023-03-01 08:00:00',  # Morning rush hour
        'base_request_rate': 5  # Average requests per minute
    }
    
    generator = RideHailingDataGenerator(config)
    
    # Generate ride requests for the current time
    ride_requests = generator.generate_ride_requests()
    
    # Print a sample ride request
    # (the Poisson draw can legitimately be zero, hence the else branch)
    if ride_requests:
        print("\nSample Ride Request:")
        print(json.dumps(ride_requests[0], indent=2))
        
        print(f"\nTotal active rides: {len(generator.active_rides)}")
    else:
        print("No ride requests generated in this iteration.")


Initialized RideHailingDataGenerator with 10 drivers and 50 users
Generated 4 ride requests at 2023-03-01 08:00:00

Sample Ride Request:
{
  "request_id": "R-939722f9-0d36-4a5c-ab2a-c06c6a2915bf",
  "user_id": "U-79aaa849",
  "timestamp": 1677654000,
  "pickup_location": {
    "latitude": 40.629296252415465,
    "longitude": -73.79278698281433
  },
  "destination": {
    "latitude": 40.77132018876232,
    "longitude": -73.86657707425255
  },
  "status": "REQUESTED",
  "vehicle_type": "Premium",
  "estimated_fare": 45.49,
  "estimated_duration": 38,
  "estimated_distance": 16.96,
  "passenger_count": 4
}

Total active rides: 4


## Step 5: Implement driver status updates and ride matching

In [5]:
# Step 5: Implement driver status updates and ride matching

def update_driver_statuses(self):
    """
    Advance every driver by one simulation tick and emit status events.

    Steps, in order:
    1. Run `_match_drivers_to_requests()` for pending requests.
    2. For each driver: possibly bring an OFFLINE driver online, move the
       driver according to its status, advance the ride state when a pickup
       or destination is reached, and possibly send an AVAILABLE driver
       offline.

    OFFLINE drivers that stay offline emit no event. Drivers in any status
    without a movement branch (e.g. 'BUSY') keep their position and only
    get their timestamp refreshed.

    Returns:
    --------
    list
        List of driver status update events. Each event carries its own
        copy of the location and vehicle dicts, so events already returned
        are not altered when the driver moves on a later tick.
    """
    # Step sizes and thresholds, all in degrees of latitude/longitude
    CRUISE_STEP_DEG = 0.0005     # ~50 meters per tick while looking for rides
    TRIP_STEP_DEG = 0.001        # faster than cruising when on a job
    ARRIVAL_RADIUS_DEG = 0.0001  # ~10 meters counts as "arrived"
    
    driver_updates = []
    
    # First, process ride matching for pending requests
    self._match_drivers_to_requests()
    
    # The simulated clock does not move during this call
    timestamp = int(self.current_time.timestamp())
    
    # Then update all drivers
    for driver_id, driver in self.drivers.items():
        # Skip drivers who are offline
        if driver['status'] == 'OFFLINE':
            # 5% chance of coming online
            if random.random() < 0.05:
                driver['status'] = 'AVAILABLE'
                print(f"Driver {driver_id} came online")
            else:
                continue
        
        # Update timestamp
        driver['last_update'] = timestamp
        
        location = driver['current_location']
        current_lat = location['latitude']
        current_lon = location['longitude']
        
        if driver['status'] == 'AVAILABLE':
            # Move randomly, with bias towards hotspots
            if random.random() < 0.7:  # 70% chance to move towards hotspots
                # Get a random hotspot
                category = random.choice(list(self.city_grid.hotspots.keys()))
                hotspot = random.choice(self.city_grid.hotspots[category])
                
                # Move slightly towards the hotspot
                target_lat, target_lon = hotspot['center']
                direction_lat = target_lat - current_lat
                direction_lon = target_lon - current_lon
                
                distance = ((direction_lat ** 2) + (direction_lon ** 2)) ** 0.5
                if distance > 0:
                    # Unit vector scaled to at most one cruising step
                    scale = min(distance, CRUISE_STEP_DEG) / distance
                    new_lat = current_lat + (direction_lat * scale)
                    new_lon = current_lon + (direction_lon * scale)
                else:
                    # If already at hotspot, move randomly
                    new_lat = current_lat + random.uniform(-CRUISE_STEP_DEG, CRUISE_STEP_DEG)
                    new_lon = current_lon + random.uniform(-CRUISE_STEP_DEG, CRUISE_STEP_DEG)
            else:
                # Random movement
                new_lat = current_lat + random.uniform(-CRUISE_STEP_DEG, CRUISE_STEP_DEG)
                new_lon = current_lon + random.uniform(-CRUISE_STEP_DEG, CRUISE_STEP_DEG)
            
            # Ensure we stay within city bounds.
            # NOTE(review): a driver that becomes AVAILABLE outside the grid's
            # lat/lon range (e.g. after a ride that ended beyond it) is snapped
            # onto the boundary here in a single tick - confirm this is intended.
            new_lat = max(self.city_grid.lat_range[0], min(new_lat, self.city_grid.lat_range[1]))
            new_lon = max(self.city_grid.lon_range[0], min(new_lon, self.city_grid.lon_range[1]))
            
            location['latitude'] = new_lat
            location['longitude'] = new_lon
            
        elif driver['status'] == 'EN_ROUTE_TO_PICKUP':
            # Driver is heading to pickup a passenger
            ride_id = driver['current_ride_id']
            if ride_id and ride_id in self.active_rides:
                ride = self.active_rides[ride_id]
                pickup = ride['request']['pickup_location']
                
                arrived = _move_driver_towards(
                    location, pickup['latitude'], pickup['longitude'],
                    TRIP_STEP_DEG, ARRIVAL_RADIUS_DEG
                )
                if arrived:
                    # Update ride status
                    ride['status'] = 'PICKUP_ARRIVED'
                    ride['pickup_time'] = self.current_time
                    ride['request']['status'] = 'ACCEPTED'
                    
                    # Update driver status
                    driver['status'] = 'WITH_PASSENGER'
                    print(f"Driver {driver_id} arrived at pickup for ride {ride_id}")
            else:
                # Something went wrong, reset driver
                driver['status'] = 'AVAILABLE'
                driver['current_ride_id'] = None
                print(f"Driver {driver_id} reset due to invalid ride reference")
                
        elif driver['status'] == 'WITH_PASSENGER':
            # Driver is taking passenger to destination
            ride_id = driver['current_ride_id']
            if ride_id and ride_id in self.active_rides:
                ride = self.active_rides[ride_id]
                dropoff = ride['request']['destination']
                
                arrived = _move_driver_towards(
                    location, dropoff['latitude'], dropoff['longitude'],
                    TRIP_STEP_DEG, ARRIVAL_RADIUS_DEG
                )
                if arrived:
                    # Update ride status
                    ride['status'] = 'COMPLETED'
                    ride['completion_time'] = self.current_time
                    ride['request']['status'] = 'COMPLETED'
                    
                    # Update driver status
                    driver['status'] = 'AVAILABLE'
                    driver['current_ride_id'] = None
                    print(f"Driver {driver_id} completed ride {ride_id}")
            else:
                # Something went wrong, reset driver
                driver['status'] = 'AVAILABLE'
                driver['current_ride_id'] = None
                print(f"Driver {driver_id} reset due to invalid ride reference")
        
        # Random chance for driver to go offline if available
        if driver['status'] == 'AVAILABLE' and random.random() < 0.01:  # 1% chance per minute
            driver['status'] = 'OFFLINE'
            print(f"Driver {driver_id} went offline")
        
        # Create driver status update event.
        # The nested dicts are copied: driver['current_location'] is mutated
        # in place on every tick, so sharing it would rewrite the location of
        # every event emitted earlier for this driver.
        driver_updates.append({
            'driver_id': driver_id,
            'timestamp': timestamp,
            'current_location': dict(driver['current_location']),
            'status': driver['status'],
            'vehicle_info': dict(driver['vehicle_info']),
            'current_ride_id': driver['current_ride_id'],
            'last_update': driver['last_update'],
            'battery_level': driver['battery_level']
        })
    
    return driver_updates


def _move_driver_towards(location, target_lat, target_lon, max_step, arrival_radius):
    """
    Move a location dict one step towards a target, in place.

    Parameters:
    -----------
    location : dict
        Dict with 'latitude' and 'longitude' keys; updated in place
    target_lat, target_lon : float
        Target coordinates
    max_step : float
        Largest straight-line move allowed, in degrees
    arrival_radius : float
        Distance (degrees) below which the target counts as reached

    Returns:
    --------
    bool
        True when the target is already within `arrival_radius` (the
        location is left untouched); False after taking a step
    """
    direction_lat = target_lat - location['latitude']
    direction_lon = target_lon - location['longitude']
    distance = ((direction_lat ** 2) + (direction_lon ** 2)) ** 0.5
    
    if distance < arrival_radius:
        return True
    
    # Scale the direction vector so the step never overshoots the target
    scale = min(distance, max_step) / distance
    location['latitude'] = location['latitude'] + (direction_lat * scale)
    location['longitude'] = location['longitude'] + (direction_lon * scale)
    return False

def _match_drivers_to_requests(self):
    """
    Pair waiting ride requests with idle drivers.

    Requests still in status 'REQUESTED' with no assigned driver are served
    oldest-first (by 'created_at'). For each one the nearest AVAILABLE driver
    is picked by geodesic distance to the pickup point. Drivers whose vehicle
    type equals the requested type are preferred; when no available driver
    has that type, every available driver is considered instead. A match is
    only made when the chosen driver is at most 10 km away.

    Side effects: updates the matched ride ('assigned_driver', 'status') and
    driver ('status', 'current_ride_id') records in place and prints one line
    per match. Returns None.
    """
    # Rides that are still waiting for a driver
    waiting = {}
    for ride_key, ride_rec in self.active_rides.items():
        if ride_rec['status'] == 'REQUESTED' and ride_rec['assigned_driver'] is None:
            waiting[ride_key] = ride_rec
    if len(waiting) == 0:
        return

    # Drivers that can take a ride right now
    idle_pool = {}
    for candidate_id, candidate in self.drivers.items():
        if candidate['status'] == 'AVAILABLE':
            idle_pool[candidate_id] = candidate
    if len(idle_pool) == 0:
        return

    # Oldest request gets first pick of the pool
    queue = list(waiting.items())
    queue.sort(key=lambda entry: entry[1]['created_at'])

    for request_id, ride in queue:
        request = ride['request']
        wanted_type = request['vehicle_type']
        pickup_point = (
            request['pickup_location']['latitude'],
            request['pickup_location']['longitude'],
        )

        # Prefer an exact vehicle-type match, otherwise fall back to anyone idle
        same_type = {
            candidate_id: candidate
            for candidate_id, candidate in idle_pool.items()
            if candidate['vehicle_info']['vehicle_type'] == wanted_type
        }
        candidates = same_type if same_type else idle_pool

        # Linear scan for the nearest candidate (first one wins on ties)
        best_id, best_km = None, float('inf')
        for candidate_id, candidate in candidates.items():
            position = candidate['current_location']
            km = geodesic(
                (position['latitude'], position['longitude']),
                pickup_point
            ).kilometers
            if km < best_km:
                best_id, best_km = candidate_id, km

        # Skip the request when nobody is close enough (10 km cut-off)
        if not (best_id and best_km <= 10):
            continue

        # Bind ride and driver to each other
        ride['assigned_driver'] = best_id
        ride['status'] = 'DRIVER_ASSIGNED'

        chosen = self.drivers[best_id]
        chosen['status'] = 'EN_ROUTE_TO_PICKUP'
        chosen['current_ride_id'] = request_id

        # A matched driver cannot be offered to later requests in this pass
        idle_pool.pop(best_id)

        print(f"Matched driver {best_id} to ride {request_id} (distance: {best_km:.2f} km)")

# Add these methods to the RideHailingDataGenerator class
# (the functions above are plain module-level functions; assigning them as
# class attributes makes them available as bound methods on every instance)
RideHailingDataGenerator.update_driver_statuses = update_driver_statuses
RideHailingDataGenerator._match_drivers_to_requests = _match_drivers_to_requests

# Test driver updates and matching
# NOTE: no random seed is set in this cell, so ids and distances in the
# printed output differ from run to run.
if __name__ == "__main__":
    config = {
        'num_drivers': 20,  # More drivers for better matching
        'num_users': 50,
        'start_time': '2023-03-01 08:00:00',
        'base_request_rate': 5
    }

    # Fresh generator so this smoke test does not depend on state left by earlier cells
    generator = RideHailingDataGenerator(config)

    # Generate some ride requests
    ride_requests = generator.generate_ride_requests()

    # Update driver statuses and match with requests
    driver_updates = generator.update_driver_statuses()

    # Print matching results
    # (rides still shown as REQUESTED with Driver=None were not matched in this pass)
    print("\nActive Rides Status:")
    for ride_id, ride in generator.active_rides.items():
        print(f"Ride {ride_id}: Status={ride['status']}, Driver={ride['assigned_driver']}")

    # Print a sample driver update
    if driver_updates:
        print("\nSample Driver Update:")
        print(json.dumps(driver_updates[0], indent=2))


Initialized RideHailingDataGenerator with 20 drivers and 50 users
Generated 6 ride requests at 2023-03-01 08:00:00
Matched driver D-56583691 to ride R-1635c21b-a7ab-4830-830f-eb9d63b88964 (distance: 1.37 km)
Matched driver D-f931fda0 to ride R-38c7a13f-355f-471c-8a3e-8e68fad1d767 (distance: 3.98 km)
Matched driver D-0b625d05 to ride R-2b208e4b-7aba-41b1-b806-f13a5008f318 (distance: 0.34 km)
Matched driver D-2bf5976e to ride R-1dd17352-832a-4ff6-9d6c-0b1d327ec89c (distance: 2.52 km)

Active Rides Status:
Ride R-1635c21b-a7ab-4830-830f-eb9d63b88964: Status=DRIVER_ASSIGNED, Driver=D-56583691
Ride R-38c7a13f-355f-471c-8a3e-8e68fad1d767: Status=DRIVER_ASSIGNED, Driver=D-f931fda0
Ride R-1ad9eaa5-9772-4582-aaf6-d67d70ad9327: Status=REQUESTED, Driver=None
Ride R-1cda35ef-9820-4e5a-a943-21af5cb2274f: Status=REQUESTED, Driver=None
Ride R-2b208e4b-7aba-41b1-b806-f13a5008f318: Status=DRIVER_ASSIGNED, Driver=D-0b625d05
Ride R-1dd17352-832a-4ff6-9d6c-0b1d327ec89c: Status=DRIVER_ASSIGNED, Driver=D-2b

## Step 6: Implement ride lifecycle management


In [6]:
# Step 6: Implement ride lifecycle management

def process_ride_lifecycle_events(self):
    """
    Advance every active ride by one lifecycle step.

    Handling depends on the ride's current status:

    * REQUESTED       - the user may cancel; the chance is the user's
                        'cancellation_probability' scaled by (1 + wait/5),
                        with wait in minutes. If the user does not cancel and
                        the wait exceeds 25 minutes, the ride is auto-cancelled.
    * DRIVER_ASSIGNED - 2% chance that the driver cancels; the driver becomes
                        AVAILABLE and the ride returns to REQUESTED.
    * PICKUP_ARRIVED  - 5% chance of a user no-show (driver released, ride
                        cancelled); otherwise the ride becomes IN_PROGRESS.
    * COMPLETED       - a final update is emitted and the ride is removed.

    Rides in any other status are left untouched. Cancelled and completed
    rides are removed from self.active_rides once the loop has finished.

    Returns:
    --------
    list
        Shallow copies of the request dict of every ride that changed,
        in processing order
    """
    emitted = []
    finished_ids = []

    def _free_driver(assigned_id):
        # Return the assigned driver (if known) to the available pool
        if assigned_id and assigned_id in self.drivers:
            released = self.drivers[assigned_id]
            released['status'] = 'AVAILABLE'
            released['current_ride_id'] = None

    def _close_as_cancelled(ride_key, ride_rec, req, message):
        # Flag ride and request as cancelled, log it, emit the update, queue removal
        ride_rec['status'] = 'CANCELLED'
        req['status'] = 'CANCELLED'
        print(message)
        emitted.append(req.copy())
        finished_ids.append(ride_key)

    for ride_id, ride in self.active_rides.items():
        request = ride['request']
        phase = ride['status']
        user_id = request['user_id']
        rider = self.users[user_id]

        if phase == 'REQUESTED':
            # Still waiting for a driver: patience drops the longer the wait
            minutes_waiting = (self.current_time - ride['created_at']).total_seconds() / 60
            abandon_chance = rider['cancellation_probability'] * (1 + minutes_waiting / 5)

            if random.random() < abandon_chance:
                _close_as_cancelled(
                    ride_id, ride, request,
                    f"User {user_id} cancelled ride {ride_id} after waiting {minutes_waiting:.1f} minutes"
                )
            elif minutes_waiting > 25:
                # Hard limit: the system gives up after more than 25 minutes
                _close_as_cancelled(
                    ride_id, ride, request,
                    f"System auto-cancelled ride {ride_id} after excessive wait time ({minutes_waiting:.1f} minutes)"
                )

        elif phase == 'DRIVER_ASSIGNED':
            # Driver is on the way; 2% chance per call that the driver backs out
            if random.random() < 0.02:
                driver_id = ride['assigned_driver']
                _free_driver(driver_id)

                # Put the ride back in the queue for re-matching
                ride['status'] = 'REQUESTED'
                ride['assigned_driver'] = None
                print(f"Driver {driver_id} cancelled assignment to ride {ride_id}")

                emitted.append(request.copy())

        elif phase == 'PICKUP_ARRIVED':
            # Driver is at the pickup point; 5% chance the user never shows up
            if random.random() < 0.05:
                _free_driver(ride['assigned_driver'])
                _close_as_cancelled(
                    ride_id, ride, request,
                    f"User {user_id} no-show for ride {ride_id}"
                )
            else:
                # Passenger boards and the trip begins
                ride['status'] = 'IN_PROGRESS'
                print(f"Ride {ride_id} started with driver {ride['assigned_driver']}")

                emitted.append(request.copy())

        elif phase == 'COMPLETED':
            # Trip is over: emit the final update and retire the ride
            emitted.append(request.copy())
            finished_ids.append(ride_id)

            print(f"Ride {ride_id} completed and removed from active rides")

    # Removal is deferred so the dict is not resized while it is being iterated
    for ride_id in finished_ids:
        self.active_rides.pop(ride_id)

    return emitted

def get_active_ride_stats(self):
    """
    Summarise the rides currently held in self.active_rides

    Returns:
    --------
    dict
        'total_active' is the number of active rides; 'by_status' maps each
        ride status to its count. The six known statuses are always present
        (zero when unused); any other status found is added after them.
    """
    # Pre-seed the known statuses so they are reported even when their count is zero
    tally = dict.fromkeys(
        (
            'REQUESTED',
            'DRIVER_ASSIGNED',
            'PICKUP_ARRIVED',
            'IN_PROGRESS',
            'COMPLETED',
            'CANCELLED',
        ),
        0,
    )

    for record in self.active_rides.values():
        label = record['status']
        if label in tally:
            tally[label] += 1
        else:
            # Unexpected status: still counted rather than dropped
            tally[label] = 1

    return {'total_active': len(self.active_rides), 'by_status': tally}

def get_driver_stats(self):
    """
    Summarise the drivers held in self.drivers

    Returns:
    --------
    dict
        'total_drivers' is the number of drivers; 'by_status' maps each
        driver status to its count. The five known statuses are always
        present (zero when unused); any other status found is added after them.
    """
    # Pre-seed the known statuses so they are reported even when their count is zero
    tally = dict.fromkeys(
        (
            'AVAILABLE',
            'BUSY',
            'OFFLINE',
            'EN_ROUTE_TO_PICKUP',
            'WITH_PASSENGER',
        ),
        0,
    )

    for record in self.drivers.values():
        label = record['status']
        if label in tally:
            tally[label] += 1
        else:
            # Unexpected status: still counted rather than dropped
            tally[label] = 1

    return {'total_drivers': len(self.drivers), 'by_status': tally}

# Add these methods to the RideHailingDataGenerator class
# (the functions above are plain module-level functions; assigning them as
# class attributes makes them available as bound methods on every instance)
RideHailingDataGenerator.process_ride_lifecycle_events = process_ride_lifecycle_events
RideHailingDataGenerator.get_active_ride_stats = get_active_ride_stats
RideHailingDataGenerator.get_driver_stats = get_driver_stats

# Test ride lifecycle management
# NOTE: no random seed is set in this cell, so the printed counts differ
# from run to run.
if __name__ == "__main__":
    config = {
        'num_drivers': 20,
        'num_users': 50,
        'start_time': '2023-03-01 08:00:00',
        'base_request_rate': 5
    }

    # Fresh generator so this smoke test does not depend on state left by earlier cells
    generator = RideHailingDataGenerator(config)

    # Generate some ride requests
    ride_requests = generator.generate_ride_requests()

    # Update driver statuses and match with requests
    driver_updates = generator.update_driver_statuses()

    # Process ride lifecycle events
    ride_updates = generator.process_ride_lifecycle_events()

    # Print statistics
    print("\nRide Statistics:")
    print(json.dumps(generator.get_active_ride_stats(), indent=2))

    print("\nDriver Statistics:")
    print(json.dumps(generator.get_driver_stats(), indent=2))

    # Advance time by 5 minutes and process again
    # Only the clock moves here: update_driver_statuses() is not called again,
    # so the second pass sees the same driver state with longer wait times.
    print("\nAdvancing time by 5 minutes...")
    generator.current_time += datetime.timedelta(minutes=5)

    # Process ride lifecycle again
    ride_updates = generator.process_ride_lifecycle_events()

    # Print updated statistics
    print("\nUpdated Ride Statistics:")
    print(json.dumps(generator.get_active_ride_stats(), indent=2))


Initialized RideHailingDataGenerator with 20 drivers and 50 users
Generated 12 ride requests at 2023-03-01 08:00:00
Matched driver D-e219d168 to ride R-42dd5231-14ee-41c0-a6ee-ccdbf15559aa (distance: 2.87 km)
Matched driver D-db1b9daa to ride R-23e8af22-4e8d-4d99-a614-1c25b258a1d8 (distance: 4.80 km)
Matched driver D-4bd6bcbd to ride R-3f7e5dcf-534e-41c2-ad65-772b47ce69bb (distance: 7.99 km)
Matched driver D-ffc695e4 to ride R-2b91e532-00d8-4678-bd0b-8aec12d10dfd (distance: 2.60 km)
Matched driver D-c0ea640f to ride R-45c30ec8-2554-429d-87e9-3194764adc00 (distance: 1.35 km)
Matched driver D-99e7ed14 to ride R-d1eff3e2-75c8-4262-b07a-05cc3d95db77 (distance: 3.10 km)
Matched driver D-52993c8e to ride R-6f72f98b-efc7-4b31-ace3-d6a5b05bffaf (distance: 1.35 km)
Matched driver D-d282f030 to ride R-6f081966-4061-4a3e-b987-865f0c585f15 (distance: 8.24 km)
Matched driver D-3f14045d to ride R-808621de-4688-4f08-b741-22606bdd55e9 (distance: 3.26 km)
Matched driver D-afb9c630 to ride R-c82920b0-9f

## Step 7: Implement data serialization

In [7]:
# Step 7: Implement data serialization

# Redefine schemas with unique type names to avoid conflicts
# (each latitude/longitude record gets its own name: PickupLocation,
# DestinationLocation, DriverLocation; the driver status enum is named
# DriverStatusEnum so it does not share a name with the DriverStatus record)

def _avro_field(field_name, avro_type):
    """Build one AVRO field entry: a name plus its (possibly nested) type."""
    return {"name": field_name, "type": avro_type}


def _lat_lon_record(record_name):
    """Build an AVRO record type holding a latitude/longitude pair of doubles."""
    return {
        "type": "record",
        "name": record_name,
        "fields": [
            _avro_field("latitude", "double"),
            _avro_field("longitude", "double"),
        ],
    }


# Schema for ride request events
ride_request_schema = {
    "namespace": "ridehailing.avro",
    "type": "record",
    "name": "RideRequest",
    "fields": [
        _avro_field("request_id", "string"),
        _avro_field("user_id", "string"),
        _avro_field("timestamp", "long"),
        _avro_field("pickup_location", _lat_lon_record("PickupLocation")),
        _avro_field("destination", _lat_lon_record("DestinationLocation")),
        _avro_field("status", {
            "type": "enum",
            "name": "RequestStatus",
            "symbols": ["REQUESTED", "ACCEPTED", "CANCELLED", "COMPLETED"],
        }),
        _avro_field("vehicle_type", "string"),
        _avro_field("estimated_fare", "double"),
        _avro_field("estimated_duration", "int"),
        _avro_field("estimated_distance", "double"),
        _avro_field("passenger_count", "int"),
    ],
}

# Schema for driver status update events
driver_status_schema = {
    "namespace": "ridehailing.avro",
    "type": "record",
    "name": "DriverStatus",
    "fields": [
        _avro_field("driver_id", "string"),
        _avro_field("timestamp", "long"),
        _avro_field("current_location", _lat_lon_record("DriverLocation")),
        _avro_field("status", {
            "type": "enum",
            "name": "DriverStatusEnum",
            "symbols": ["AVAILABLE", "BUSY", "OFFLINE", "EN_ROUTE_TO_PICKUP", "WITH_PASSENGER"],
        }),
        _avro_field("vehicle_info", {
            "type": "record",
            "name": "VehicleInfo",
            "fields": [
                _avro_field("vehicle_id", "string"),
                _avro_field("vehicle_type", "string"),
                _avro_field("capacity", "int"),
            ],
        }),
        # Union with "null": these two fields are optional
        _avro_field("current_ride_id", ["null", "string"]),
        _avro_field("last_update", "long"),
        _avro_field("battery_level", ["null", "double"]),
    ],
}

def serialize_to_json(self, data, data_type, timestamp=None):
    """
    Write a batch of records to a JSON file under data/json/<data_type>/

    The file always contains a JSON array: a list is written as-is, any
    other value is wrapped in a one-element list.

    Parameters:
    -----------
    data : list or dict
        Records to write; an empty or None value writes nothing
    data_type : str
        Category of the data ('ride_requests' or 'driver_status'); used for
        both the sub-directory and the file name prefix
    timestamp : datetime, optional
        Moment encoded in the file name, defaults to self.current_time

    Returns:
    --------
    str or None
        Path of the written file, or None when there was nothing to write
    """
    # Nothing to persist
    if not data:
        return None

    moment = self.current_time if timestamp is None else timestamp

    # File name carries the moment at one-second resolution
    stamp = moment.strftime('%Y%m%d_%H%M%S')

    out_dir = f'data/json/{data_type}'
    os.makedirs(out_dir, exist_ok=True)
    out_path = f'{out_dir}/{data_type}_{stamp}.json'

    # Normalise to a list so the file layout is the same for one or many records
    payload = data if isinstance(data, list) else [data]

    with open(out_path, 'w') as sink:
        json.dump(payload, sink, indent=2)

    print(f"Saved {len(payload)} {data_type} records to {out_path}")

    return out_path

def serialize_to_avro(self, data, data_type, schema, timestamp=None):
    """
    Write a batch of records to an AVRO file under data/avro/<data_type>/

    A list is written as-is; any other value is treated as a single record.

    Parameters:
    -----------
    data : list or dict
        Records to write; an empty or None value writes nothing
    data_type : str
        Category of the data ('ride_requests' or 'driver_status'); used for
        both the sub-directory and the file name prefix
    schema : dict
        AVRO schema handed to fastavro.writer together with the records
    timestamp : datetime, optional
        Moment encoded in the file name, defaults to self.current_time

    Returns:
    --------
    str or None
        Path of the written file, or None when there was nothing to write
    """
    # Nothing to persist
    if not data:
        return None

    moment = self.current_time if timestamp is None else timestamp

    # File name carries the moment at one-second resolution
    stamp = moment.strftime('%Y%m%d_%H%M%S')

    out_dir = f'data/avro/{data_type}'
    os.makedirs(out_dir, exist_ok=True)
    out_path = f'{out_dir}/{data_type}_{stamp}.avro'

    # fastavro.writer is given a sequence of records, so wrap a lone record
    records = data if isinstance(data, list) else [data]

    # Binary mode: the writer emits bytes
    with open(out_path, 'wb') as sink:
        fastavro.writer(sink, schema, records)

    print(f"Saved {len(records)} {data_type} records to {out_path}")

    return out_path

def save_data_batch(self, ride_requests, driver_updates):
    """
    Persist one batch of events in both JSON and AVRO form

    Each non-empty input is written twice, first via serialize_to_json and
    then via serialize_to_avro (with the matching module-level schema).
    Empty inputs are skipped and contribute no keys to the result.

    Parameters:
    -----------
    ride_requests : list
        Ride request events of this batch
    driver_updates : list
        Driver status update events of this batch

    Returns:
    --------
    dict
        Maps '<data_type>_json' / '<data_type>_avro' to the written file path,
        where <data_type> is 'ride_requests' or 'driver_status'
    """
    written = {}

    def _persist(records, label, schema):
        # JSON first, then AVRO, for one category of events
        written[f'{label}_json'] = self.serialize_to_json(records, label)
        written[f'{label}_avro'] = self.serialize_to_avro(records, label, schema)

    if ride_requests:
        _persist(ride_requests, 'ride_requests', ride_request_schema)

    if driver_updates:
        _persist(driver_updates, 'driver_status', driver_status_schema)

    return written

# Add these methods to the RideHailingDataGenerator class
# (the functions above are plain module-level functions; assigning them as
# class attributes makes them available as bound methods on every instance)
RideHailingDataGenerator.serialize_to_json = serialize_to_json
RideHailingDataGenerator.serialize_to_avro = serialize_to_avro
RideHailingDataGenerator.save_data_batch = save_data_batch

# Test data serialization
# NOTE: this cell writes files under data/json/ and data/avro/ relative to
# the current working directory.
if __name__ == "__main__":
    config = {
        'num_drivers': 10,
        'num_users': 30,
        'start_time': '2023-03-01 08:00:00',
        'base_request_rate': 3
    }

    # Fresh generator so this smoke test does not depend on state left by earlier cells
    generator = RideHailingDataGenerator(config)

    # Generate some ride requests
    ride_requests = generator.generate_ride_requests()

    # Update driver statuses
    driver_updates = generator.update_driver_statuses()

    # Save data
    # (save_data_batch skips whichever of the two inputs is empty)
    if ride_requests or driver_updates:
        saved_files = generator.save_data_batch(ride_requests, driver_updates)
        print("\nSaved files:")
        for file_type, file_path in saved_files.items():
            print(f"  {file_type}: {file_path}")
    else:
        print("No data to save in this batch")


Initialized RideHailingDataGenerator with 10 drivers and 30 users
Generated 5 ride requests at 2023-03-01 08:00:00
Matched driver D-2734d58e to ride R-b4a596b7-da4e-4e8e-9cdb-859bb63b865b (distance: 3.73 km)
Matched driver D-19eb04cd to ride R-d97dcef2-5b7b-4c14-b3b5-7340bc56cace (distance: 1.43 km)
Matched driver D-e4d976f8 to ride R-c312c327-8f07-4fda-8526-48e634f03238 (distance: 2.90 km)
Matched driver D-03d17ae2 to ride R-93765352-5e0f-4f33-8a17-d1cdff2992ff (distance: 0.73 km)
Matched driver D-0de2dfd7 to ride R-ff5ccdca-b380-46a1-beb5-f202c0a02e22 (distance: 4.39 km)
Saved 5 ride_requests records to data/json/ride_requests/ride_requests_20230301_080000.json
Saved 5 ride_requests records to data/avro/ride_requests/ride_requests_20230301_080000.avro
Saved 10 driver_status records to data/json/driver_status/driver_status_20230301_080000.json
Saved 10 driver_status records to data/avro/driver_status/driver_status_20230301_080000.avro

Saved files:
  ride_requests_json: data/json/ride

## Step 8: Implement the simulation runner


In [8]:
# Step 8: Implement the simulation runner with fixed statistics tracking

def run_simulation(self, duration_minutes, time_step_seconds=60, verbose=True):
    """
    Run the ride-hailing simulation for a specified duration

    Each step generates ride requests, updates drivers, records rides that
    have just completed, processes ride lifecycle events, saves the batch
    and advances self.current_time by one time step.

    Parameters:
    -----------
    duration_minutes : int or float
        Simulated time span to cover, counted from self.current_time
    time_step_seconds : int or float, optional
        Simulated seconds advanced per loop iteration; must be positive
    verbose : bool, optional
        When True, print a status block whenever the simulated clock is on
        a whole 5-minute mark

    Returns:
    --------
    dict
        Aggregate statistics: 'total_requests', 'completed_rides',
        'cancelled_rides', 'total_fare', 'avg_wait_time' and
        'avg_ride_duration' (both in minutes), 'ride_requests_by_hour'
        and 'saved_files'

    Raises:
    -------
    ValueError
        If time_step_seconds is not positive (the clock would never reach
        the end time and the loop would not terminate)
    """
    if time_step_seconds <= 0:
        raise ValueError("time_step_seconds must be positive")

    start_time = self.current_time
    end_time = start_time + datetime.timedelta(minutes=duration_minutes)

    # Statistics tracking
    stats = {
        'total_requests': 0,
        'completed_rides': 0,
        'cancelled_rides': 0,
        'total_fare': 0.0,
        'avg_wait_time': 0.0,
        'avg_ride_duration': 0.0,
        'ride_requests_by_hour': {},
        'saved_files': []
    }

    # Temporary storage for wait times and ride durations (minutes)
    wait_times = []
    ride_durations = []

    # Request ids already counted as completed, so no ride is counted twice
    completed_ride_ids = set()

    print(f"Starting simulation at {start_time} for {duration_minutes} minutes")
    print(f"Initial state: {len(self.drivers)} drivers, {len(self.users)} users")

    # Main simulation loop
    while self.current_time < end_time:
        # Generate ride requests
        ride_requests = self.generate_ride_requests()
        stats['total_requests'] += len(ride_requests)

        # Update driver statuses
        driver_updates = self.update_driver_statuses()

        # Record freshly completed rides BEFORE lifecycle processing.
        # process_ride_lifecycle_events() removes every COMPLETED ride from
        # self.active_rides, so pickup/completion times are only readable here.
        for ride_id, ride in list(self.active_rides.items()):
            if ride['status'] != 'COMPLETED':
                continue

            # De-duplicate on the request's own id, the same key the update
            # events carry; fall back to the dict key if it is absent.
            completed_key = ride['request'].get('request_id', ride_id)
            if completed_key in completed_ride_ids:
                continue

            stats['completed_rides'] += 1
            stats['total_fare'] += ride['request']['estimated_fare']
            completed_ride_ids.add(completed_key)

            # Calculate wait time and ride duration when both marks exist
            pickup_time = ride.get('pickup_time')
            completion_time = ride.get('completion_time')
            if pickup_time and completion_time:
                wait_times.append(
                    (pickup_time - ride['created_at']).total_seconds() / 60
                )
                ride_durations.append(
                    (completion_time - pickup_time).total_seconds() / 60
                )

        # Process ride lifecycle events
        ride_updates = self.process_ride_lifecycle_events()

        # Count cancellations, plus any completion not already seen above
        for update in ride_updates:
            if update['status'] == 'COMPLETED':
                if update['request_id'] not in completed_ride_ids:
                    stats['completed_rides'] += 1
                    stats['total_fare'] += update['estimated_fare']
                    completed_ride_ids.add(update['request_id'])
            elif update['status'] == 'CANCELLED':
                stats['cancelled_rides'] += 1

        # Save data batch
        if ride_requests or driver_updates:
            saved_files = self.save_data_batch(ride_requests, driver_updates)
            if saved_files:
                stats['saved_files'].extend(saved_files.values())

        # Track requests per simulated hour
        hour_key = self.current_time.strftime('%Y-%m-%d %H:00')
        requests_by_hour = stats['ride_requests_by_hour']
        requests_by_hour[hour_key] = requests_by_hour.get(hour_key, 0) + len(ride_requests)

        # Print current status if verbose (only on whole 5-minute marks)
        if verbose and self.current_time.minute % 5 == 0 and self.current_time.second == 0:
            active_rides = len(self.active_rides)
            available_drivers = sum(1 for d in self.drivers.values() if d['status'] == 'AVAILABLE')
            busy_drivers = sum(1 for d in self.drivers.values() if d['status'] in ['EN_ROUTE_TO_PICKUP', 'WITH_PASSENGER'])

            print(f"\n[{self.current_time}] Simulation status:")
            print(f"  Active rides: {active_rides}")
            print(f"  Available drivers: {available_drivers}")
            print(f"  Busy drivers: {busy_drivers}")
            print(f"  Total requests so far: {stats['total_requests']}")
            print(f"  Completed rides: {stats['completed_rides']}")
            print(f"  Cancelled rides: {stats['cancelled_rides']}")
            print(f"  Total fare collected: ${stats['total_fare']:.2f}")

        # Advance time
        self.current_time += datetime.timedelta(seconds=time_step_seconds)

    # Calculate average wait time and ride duration
    if wait_times:
        stats['avg_wait_time'] = sum(wait_times) / len(wait_times)

    if ride_durations:
        stats['avg_ride_duration'] = sum(ride_durations) / len(ride_durations)

    # Print summary
    print("\n" + "="*50)
    print(f"Simulation completed: {start_time} to {end_time}")
    print(f"Total duration: {duration_minutes} minutes")
    print(f"Total ride requests: {stats['total_requests']}")
    print(f"Completed rides: {stats['completed_rides']}")
    print(f"Cancelled rides: {stats['cancelled_rides']}")
    print(f"Total fare collected: ${stats['total_fare']:.2f}")
    print(f"Average wait time: {stats['avg_wait_time']:.2f} minutes")
    print(f"Average ride duration: {stats['avg_ride_duration']:.2f} minutes")
    print(f"Generated {len(stats['saved_files'])} data files")
    print("="*50)

    return stats

# Add the method to the RideHailingDataGenerator class
# (run_simulation is a plain module-level function; assigning it as a class
# attribute makes it available as a bound method on every instance)
RideHailingDataGenerator.run_simulation = run_simulation


## Step 9: Create the main execution script


In [9]:
# Step 9: Create the main execution script (Jupyter Notebook version)

import json
import os
import datetime  # Import the whole module

# Configuration parameters (modify these as needed)
config = {
    'num_drivers': 1000,
    'num_users': 5000,
    'start_time': '2023-03-01 08:00:00',
    'base_request_rate': 5
}

# Simulation parameters
duration_minutes = 720  # 720 simulated minutes (12 hours); a long run gives more completed rides
time_step_seconds = 60  # one loop iteration per simulated minute -> 720 iterations
output_dir = 'simulation_results'

# Create output directory
os.makedirs(output_dir, exist_ok=True)

# Initialize the generator
print("Initializing ride-hailing simulation...")
generator = RideHailingDataGenerator(config)

# Run the simulation
# NOTE: every iteration may write up to four files (JSON and AVRO for ride
# requests and for driver status) under data/, and prints progress lines.
print(f"\nRunning simulation for {duration_minutes} minutes...")
stats = generator.run_simulation(
    duration_minutes=duration_minutes,
    time_step_seconds=time_step_seconds,
    verbose=True
)

# Save simulation results
# 'timestamp' below is the wall-clock time of this run, not the simulated clock
results_file = os.path.join(output_dir, 'simulation_results.json')
with open(results_file, 'w') as f:
    json.dump({
        'config': config,
        'stats': {k: v for k, v in stats.items() if k != 'saved_files'},  # Remove file paths from JSON
        'timestamp': datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    }, f, indent=2)

print(f"\nSimulation results saved to {results_file}")

# Generate summary report (plain-text version of the same statistics)
summary_file = os.path.join(output_dir, 'simulation_summary.txt')
with open(summary_file, 'w') as f:
    f.write("RIDE-HAILING SIMULATION SUMMARY\n")
    f.write("=" * 40 + "\n\n")

    f.write("Configuration:\n")
    f.write(f"  Drivers: {config['num_drivers']}\n")
    f.write(f"  Users: {config['num_users']}\n")
    f.write(f"  Start time: {config['start_time']}\n")
    f.write(f"  Base request rate: {config['base_request_rate']} per minute\n")
    f.write(f"  Duration: {duration_minutes} minutes\n\n")

    f.write("Results:\n")
    f.write(f"  Total ride requests: {stats['total_requests']}\n")
    f.write(f"  Completed rides: {stats['completed_rides']}\n")
    f.write(f"  Cancelled rides: {stats['cancelled_rides']}\n")
    f.write(f"  Total fare collected: ${stats['total_fare']:.2f}\n")
    f.write(f"  Average wait time: {stats['avg_wait_time']:.2f} minutes\n")
    f.write(f"  Average ride duration: {stats['avg_ride_duration']:.2f} minutes\n\n")

    # One line per simulated hour, in the order the hours were first seen
    f.write("Hourly request distribution:\n")
    for hour, count in stats['ride_requests_by_hour'].items():
        f.write(f"  {hour}: {count} requests\n")

print(f"Summary report saved to {summary_file}")

# Print final message
print("\nSimulation completed successfully!")
print(f"Generated {len(stats['saved_files'])} data files")
print(f"Processed {stats['total_requests']} ride requests")
print(f"Completed {stats['completed_rides']} rides")
print(f"Total revenue: ${stats['total_fare']:.2f}")

Initializing ride-hailing simulation...
Initialized RideHailingDataGenerator with 1000 drivers and 5000 users

Running simulation for 720 minutes...
Starting simulation at 2023-03-01 08:00:00 for 720 minutes
Initial state: 1000 drivers, 5000 users
Generated 10 ride requests at 2023-03-01 08:00:00
Matched driver D-da435b9f to ride R-67e25e17-246c-4909-addd-765a8017d3f9 (distance: 0.93 km)
Matched driver D-60cd452b to ride R-da7753a9-e8fd-4558-93f3-ea550c3c6cf1 (distance: 0.12 km)
Matched driver D-5e87bff9 to ride R-23a3cd63-f25c-4d54-9964-718d399762f2 (distance: 0.85 km)
Matched driver D-14dac7a6 to ride R-42afe3c1-f2da-48bf-97b6-f2150b24a5c0 (distance: 0.20 km)
Matched driver D-490e6a5e to ride R-5bcbe575-cc75-4b1f-87a2-d7d6aab45ac7 (distance: 1.50 km)
Matched driver D-b905bd6c to ride R-274fd9f7-75f1-4411-a9db-0cff8791bc13 (distance: 0.10 km)
Matched driver D-dd87bd2a to ride R-56493468-43fe-4a4d-b291-730619034578 (distance: 1.11 km)
Matched driver D-17742a9a to ride R-2ec30495-ceae-4