In [None]:
from typing import Tuple, Literal
import pandas as pd
import numpy as np
from scipy.spatial import distance
from scipy.optimize import linear_sum_assignment
from pathlib import Path
import os
import ast

from datetime import datetime
from datetime import timedelta

from concurrent.futures import ProcessPoolExecutor
from tqdm import tqdm
import psutil
import platform
import shutil
import subprocess
import importlib

In [None]:
import warnings
# Ignore every FutureWarning for the rest of the session.
# NOTE(review): this is a global filter, so it also hides deprecation notices
# about behaviour that will change in later library releases - confirm which
# warning motivated it and whether the root cause can be fixed instead.
warnings.simplefilter(action='ignore', category=FutureWarning)

In [None]:
#from google.colab import drive
#drive.mount('/content/drive')

In [None]:
def get_gpu_info():
    """Return the name and total memory of every GPU reported by ``nvidia-smi``.

    Returns:
        str: The stripped ``nvidia-smi`` output (``name, memory.total`` in
        headerless CSV form), or ``"No GPU detected"`` when the tool cannot be
        started, exits with an error, or prints bytes that are not UTF-8.
    """
    command = [
        "nvidia-smi",
        "--query-gpu=name,memory.total",
        "--format=csv,noheader",
    ]
    try:
        # An argument list (no shell=True) avoids spawning a shell for a fixed
        # command; a missing binary then surfaces as FileNotFoundError.
        raw_output = subprocess.check_output(command)
        return raw_output.decode("utf-8").strip()
    except (OSError, subprocess.SubprocessError, UnicodeDecodeError):
        # Only the expected failures are treated as "no GPU"; KeyboardInterrupt
        # and SystemExit are deliberately allowed to propagate.
        return "No GPU detected"

def get_package_version(pkg_name):
    """Return the ``__version__`` string of an importable package.

    Args:
        pkg_name (str): Importable module name, e.g. ``"numpy"``.

    Returns:
        The module's ``__version__`` attribute, or ``"Not installed"`` when the
        module cannot be imported or does not define ``__version__``.
    """
    try:
        return importlib.import_module(pkg_name).__version__
    except Exception:
        # Best-effort report: any import failure or a missing __version__ is
        # shown as "Not installed". Unlike a bare ``except:``, this lets
        # KeyboardInterrupt and SystemExit propagate.
        return "Not installed"

def system_report():
    """Print a short summary of the runtime hardware and key library versions.

    Reports the Python version, logical CPU count, total RAM, capacity of the
    filesystem mounted at ``/``, the GPU description from ``get_gpu_info`` and
    the versions of numpy, pandas and scipy from ``get_package_version``.
    """
    bytes_per_gib = 1024**3
    ram_gib = round(psutil.virtual_memory().total / bytes_per_gib, 2)
    disk_gib = round(shutil.disk_usage('/').total / bytes_per_gib, 2)

    hardware_lines = (
        "=== Google Colab Runtime Specs ===",
        f"Python version: {platform.python_version()}",
        f"CPU cores (vCPUs): {psutil.cpu_count(logical=True)}",
        f"Total RAM: {ram_gib} GB",
        f"Disk capacity: {disk_gib} GB",
        f"GPU: {get_gpu_info()}",
        "\n=== Key Libraries ===",
    )
    for line in hardware_lines:
        print(line)

    for library in ("numpy", "pandas", "scipy"):
        print(f"{library}: {get_package_version(library)}")

    print("==================================")

# Print the runtime/hardware summary when this cell is executed.
system_report()

In [None]:
class Simulation:
    """Simulate passenger-driver matching under one of several assignment strategies.

    The batched strategies ("base", "passenger_priority", "driver_priority")
    advance a clock one time unit at a time and, at every step, solve a
    minimum-cost matching between idle drivers and waiting passengers.
    The "nearest_neighbor" strategy replays arrival/leave events in time order
    and greedily pairs the single closest feasible driver and passenger after
    each arrival.

    Status values written by the simulation: "Waiting" (passengers), "Idle"
    (drivers), "Assigned", "Cancelled", "Abandoned" and "Completed".

    Distances are city-block distances between the ``loc`` entries and are
    compared against ``max_dist * 1000`` and ``willing_distance * 1000``.
    NOTE(review): this implies ``loc`` is in metres while ``max_dist`` and
    ``willing_distance`` are in kilometres - confirm against the data generator.
    """

    # Strategies executed by the time-stepped (batched) loop.
    _BATCH_STRATEGIES = ("base", "passenger_priority", "driver_priority")
    # Strategy executed by the event-driven loop.
    _EVENT_STRATEGY = "nearest_neighbor"
    # Cost given to infeasible driver/passenger pairs in the min-cost matching.
    # NOTE(review): the penalty is finite, so for very large batches the solver
    # is not strictly guaranteed to maximise the number of feasible pairs.
    _INFEASIBLE_COST = 1e6

    def __init__(self, max_dist: float, sim_duration: int, passengers_df: pd.DataFrame, drivers_df: pd.DataFrame,
                 assignment_strategy: Literal["base", "passenger_priority", "driver_priority", "nearest_neighbor"]):
        """
        Args:
            max_dist (float): Maximum feasible assignment distance (km).
            sim_duration (int): Total simulation duration (time units).
            passengers_df (pd.DataFrame): Passenger dataframe. It is copied, so
                the caller's frame is never modified.
            drivers_df (pd.DataFrame): Driver dataframe. It is copied as well.
            assignment_strategy: Matching strategy, one of "base",
                "passenger_priority", "driver_priority" or "nearest_neighbor".

        Raises:
            ValueError: If ``assignment_strategy`` is not a known strategy.
                Previously an unknown name silently ran a simulation in which
                nobody was ever matched.
        """
        known_strategies = self._BATCH_STRATEGIES + (self._EVENT_STRATEGY,)
        if assignment_strategy not in known_strategies:
            raise ValueError(
                f"Unknown assignment_strategy {assignment_strategy!r}; "
                f"expected one of {known_strategies}"
            )

        self.time_now = 0
        self.max_dist = max_dist
        self.sim_duration = sim_duration
        self.passengers_df = self._copy_with_object_status(passengers_df)
        self.drivers_df = self._copy_with_object_status(drivers_df)
        self.assignment_strategy = assignment_strategy

    @staticmethod
    def _copy_with_object_status(frame: pd.DataFrame) -> pd.DataFrame:
        """Copy ``frame`` and make its ``status`` column (if any) object-typed.

        An empty status column can arrive with a non-object dtype (e.g. all-NaN
        floats); casting up front lets the string labels be written without
        relying on pandas' deprecated implicit dtype upcasting.
        """
        frame = frame.copy()
        if "status" in frame.columns and frame["status"].dtype != object:
            frame["status"] = frame["status"].astype(object)
        return frame

    def get_waiting_passengers(self) -> pd.DataFrame:
        """Return all passengers whose status is "Waiting"."""
        return self.passengers_df.loc[self.passengers_df["status"] == "Waiting"]

    def get_idle_drivers(self) -> pd.DataFrame:
        """Return all drivers whose status is "Idle"."""
        return self.drivers_df.loc[self.drivers_df["status"] == "Idle"]

    def assign(self) -> None:
        """Run one batched assignment round for the configured strategy.

        Does nothing when there are no idle drivers or no waiting passengers,
        and nothing for the event-driven "nearest_neighbor" strategy, which
        performs its matching inside ``_simulate_event_driven_nn``.
        """
        idle_drivers = self.get_idle_drivers()
        waiting_passengers = self.get_waiting_passengers()

        if len(idle_drivers) == 0 or len(waiting_passengers) == 0:
            return

        if self.assignment_strategy == "base":
            self._assign_base(idle_drivers, waiting_passengers)
        elif self.assignment_strategy == "passenger_priority":
            self._assign_passenger_priority(idle_drivers, waiting_passengers)
        elif self.assignment_strategy == "driver_priority":
            self._assign_driver_priority(idle_drivers, waiting_passengers)

    def _assign_closest(self, drivers: pd.DataFrame, passengers: pd.DataFrame, current_time: int) -> None:
        """Greedy nearest-neighbor step: match the single closest feasible pair.

        Args:
            drivers (pd.DataFrame): Candidate drivers.
            passengers (pd.DataFrame): Candidate passengers.
            current_time (int): Event time recorded as the assignment time.
        """
        if drivers.empty or passengers.empty:
            return

        mat = distance.cdist(list(drivers["loc"]), list(passengers["loc"]), metric="cityblock")
        # Pairs farther apart than the limit can never be selected by argmin.
        mat[mat > self.max_dist * 1000] = np.inf

        if not np.isfinite(mat).any():
            return

        driver_pos, passenger_pos = np.unravel_index(mat.argmin(), mat.shape)
        dist = mat[driver_pos, passenger_pos]

        driver_id = drivers["id"].iloc[driver_pos]
        passenger_id = passengers["id"].iloc[passenger_pos]

        self.passengers_df.loc[
            self.passengers_df["id"] == passenger_id,
            ["assignment_time", "assignment_distance", "assigned_driver", "status"],
        ] = [current_time, dist, driver_id, "Assigned"]
        self.drivers_df.loc[
            self.drivers_df["id"] == driver_id,
            ["assignment_time", "assigned_passenger", "status"],
        ] = [current_time, passenger_id, "Assigned"]

    def _assign_base(self, idle_drivers: pd.DataFrame, waiting_passengers: pd.DataFrame) -> None:
        """Match drivers to passengers without any priority consideration.

        Args:
            idle_drivers (pd.DataFrame): Available drivers.
            waiting_passengers (pd.DataFrame): Waiting passengers.
        """
        self._assign_min_cost(idle_drivers, waiting_passengers)

    def _assign_passenger_priority(self, idle_drivers: pd.DataFrame, waiting_passengers: pd.DataFrame) -> None:
        """Match priority passengers (priority == 1) first, then priority == 0.

        Args:
            idle_drivers (pd.DataFrame): Available drivers.
            waiting_passengers (pd.DataFrame): Waiting passengers.
        """
        priority = waiting_passengers[waiting_passengers["priority"] == 1]
        if not priority.empty:
            self._assign_min_cost(idle_drivers, priority)
            # _assign_min_cost rebuilds and re-indexes the master frames, so
            # the slices taken before the call are stale and must be re-read.
            idle_drivers = self.get_idle_drivers()
            waiting_passengers = self.get_waiting_passengers()

        non_priority = waiting_passengers[waiting_passengers["priority"] == 0]
        if not non_priority.empty and not idle_drivers.empty:
            self._assign_min_cost(idle_drivers, non_priority)

    def _assign_driver_priority(self, idle_drivers: pd.DataFrame, waiting_passengers: pd.DataFrame) -> None:
        """Match priority drivers (priority == 1) first, then priority == 0.

        Args:
            idle_drivers (pd.DataFrame): Available drivers.
            waiting_passengers (pd.DataFrame): Waiting passengers.
        """
        priority = idle_drivers[idle_drivers["priority"] == 1]
        if not priority.empty:
            self._assign_min_cost(priority, waiting_passengers)
            # Master frames were re-indexed by the call above; refresh slices.
            waiting_passengers = self.get_waiting_passengers()
            idle_drivers = self.get_idle_drivers()

        non_priority = idle_drivers[idle_drivers["priority"] == 0]
        if not non_priority.empty and not waiting_passengers.empty:
            self._assign_min_cost(non_priority, waiting_passengers)

    def _assign_min_cost(self, drivers: pd.DataFrame, passengers: pd.DataFrame) -> None:
        """Match drivers and passengers by minimum total distance.

        Uses ``scipy.optimize.linear_sum_assignment`` on the city-block
        distance matrix; pairs farther apart than ``max_dist * 1000`` are
        priced at ``_INFEASIBLE_COST`` and never recorded as assignments.

        ``drivers`` and ``passengers`` must be slices of ``self.drivers_df`` /
        ``self.passengers_df`` carrying their current index labels. The rows
        are removed from the master frames, updated, and appended back at the
        end, after which both master indexes are reset. Row order is therefore
        not stable across calls; identify rows by ``id``.

        Args:
            drivers (pd.DataFrame): Drivers to match.
            passengers (pd.DataFrame): Passengers to match.
        """
        # cdist cannot handle an empty point list; nothing to match anyway.
        if drivers.empty or passengers.empty:
            return

        self.drivers_df = self.drivers_df.drop(index=drivers.index)
        self.passengers_df = self.passengers_df.drop(index=passengers.index)

        # After the reset, positional indices from the solver equal the labels.
        drivers = drivers.reset_index(drop=True)
        passengers = passengers.reset_index(drop=True)

        raw = distance.cdist(list(drivers["loc"]), list(passengers["loc"]), metric="cityblock")
        feasible = raw <= self.max_dist * 1000
        cost = np.where(feasible, raw, self._INFEASIBLE_COST)

        if feasible.any():
            driver_pos, passenger_pos = linear_sum_assignment(cost, maximize=False)
            # The solver always pairs min(n_drivers, n_passengers) rows; keep
            # only pairs that are actually within range. An explicit mask is
            # used so feasibility never depends on the size of the penalty.
            within_range = feasible[driver_pos, passenger_pos]
            driver_idx = driver_pos[within_range].tolist()
            passenger_idx = passenger_pos[within_range].tolist()
            pair_distances = raw[driver_idx, passenger_idx].tolist()

            drivers.loc[driver_idx, "assignment_time"] = self.time_now
            drivers.loc[driver_idx, "assigned_passenger"] = list(passengers.loc[passenger_idx, "id"])
            drivers.loc[driver_idx, "status"] = "Assigned"

            passengers.loc[passenger_idx, "assignment_distance"] = pair_distances
            passengers.loc[passenger_idx, "assignment_time"] = self.time_now
            passengers.loc[passenger_idx, "assigned_driver"] = list(drivers.loc[driver_idx, "id"])
            passengers.loc[passenger_idx, "status"] = "Assigned"

        self.drivers_df = pd.concat([self.drivers_df, drivers], axis=0).reset_index(drop=True)
        self.passengers_df = pd.concat([self.passengers_df, passengers], axis=0).reset_index(drop=True)

    def simulate(self) -> Tuple[pd.DataFrame, pd.DataFrame]:
        """Run the simulation with the configured strategy.

        "nearest_neighbor" uses the event-driven loop; every other strategy
        uses the time-stepped batching loop.

        Returns:
            Tuple[pd.DataFrame, pd.DataFrame]: ``(passengers_df, drivers_df)``
            after the run.
        """
        if self.assignment_strategy == self._EVENT_STRATEGY:
            return self._simulate_event_driven_nn()
        return self._simulate_time_step()

    def _simulate_time_step(self) -> Tuple[pd.DataFrame, pd.DataFrame]:
        """Run the batched simulation from the current time through ``sim_duration``.

        Each step activates arrivals, cancels agents whose
        ``waits_until_matching`` equals the previous time step and who are
        still unmatched, runs one assignment round and finalizes the matches.

        Returns:
            Tuple[pd.DataFrame, pd.DataFrame]: ``(passengers_df, drivers_df)``.
        """
        while self.time_now <= self.sim_duration:
            previous_time = self.time_now - 1

            # Arrivals: anyone whose arrival time falls in (t - 1, t].
            self.passengers_df.loc[
                (self.passengers_df["arrival_time"] > previous_time)
                & (self.passengers_df["arrival_time"] <= self.time_now), "status"] = "Waiting"

            self.drivers_df.loc[
                (self.drivers_df["arrival_time"] > previous_time)
                & (self.drivers_df["arrival_time"] <= self.time_now), "status"] = "Idle"

            # Cancellations: still unmatched when their waiting limit passed.
            self.passengers_df.loc[
                (self.passengers_df["waits_until_matching"] == previous_time)
                & (self.passengers_df["status"] == "Waiting"), "status"] = "Cancelled"

            self.drivers_df.loc[
                (self.drivers_df["waits_until_matching"] == previous_time)
                & (self.drivers_df["status"] == "Idle"), "status"] = "Cancelled"

            self.assign()
            self._finalize_assignments()
            self.time_now += 1

        # Previously this method fell off the end and returned None, so
        # simulate() disagreed with its annotation for every batched strategy.
        return self.passengers_df, self.drivers_df

    def _simulate_event_driven_nn(self) -> Tuple[pd.DataFrame, pd.DataFrame]:
        """Run the sequential, event-driven nearest-neighbor simulation.

        Every agent contributes an arrival event (at ``arrival_time``) and a
        leave event (at ``waits_until_matching``). Events are processed in
        order of time, ties broken by the event-type string; one greedy match
        is attempted after every arrival.

        Returns:
            Tuple[pd.DataFrame, pd.DataFrame]: ``(passengers_df, drivers_df)``.
        """
        events = []
        for arrival, leave, passenger_id in zip(
            self.passengers_df["arrival_time"],
            self.passengers_df["waits_until_matching"],
            self.passengers_df["id"],
        ):
            events.append((arrival, "passenger_arrival", passenger_id))
            events.append((leave, "passenger_leave", passenger_id))
        for arrival, leave, driver_id in zip(
            self.drivers_df["arrival_time"],
            self.drivers_df["waits_until_matching"],
            self.drivers_df["id"],
        ):
            events.append((arrival, "driver_arrival", driver_id))
            events.append((leave, "driver_leave", driver_id))

        # Stable sort: events equal in (time, type) keep their row order.
        events.sort(key=lambda event: (event[0], event[1]))

        for current_time, event_type, entity_id in events:
            self.time_now = current_time

            if event_type == "passenger_arrival":
                # Activates every not-yet-seen passenger arriving at this time.
                self.passengers_df.loc[
                    (self.passengers_df["arrival_time"] == current_time)
                    & (self.passengers_df["status"].isna()), "status"] = "Waiting"
                self._assign_closest(self.get_idle_drivers(), self.get_waiting_passengers(), current_time)

            elif event_type == "driver_arrival":
                self.drivers_df.loc[
                    (self.drivers_df["arrival_time"] == current_time)
                    & (self.drivers_df["status"].isna()), "status"] = "Idle"
                self._assign_closest(self.get_idle_drivers(), self.get_waiting_passengers(), current_time)

            elif event_type == "passenger_leave":
                self.passengers_df.loc[
                    (self.passengers_df["id"] == entity_id)
                    & (self.passengers_df["status"] == "Waiting"), "status"] = "Cancelled"

            elif event_type == "driver_leave":
                self.drivers_df.loc[
                    (self.drivers_df["id"] == entity_id)
                    & (self.drivers_df["status"] == "Idle"), "status"] = "Cancelled"

        self._finalize_assignments()
        return self.passengers_df, self.drivers_df

    def _finalize_assignments(self) -> None:
        """Resolve every "Assigned" passenger into "Abandoned" or "Completed".

        A match is abandoned when ``willing_distance * 1000`` is smaller than
        the assignment distance and completed otherwise; the assigned driver
        receives the same status. Rows for which neither comparison holds
        (e.g. a missing ``willing_distance``) stay "Assigned".
        """
        assigned = self.passengers_df["status"] == "Assigned"
        willing = self.passengers_df["willing_distance"] * 1000
        assigned_dist = self.passengers_df["assignment_distance"]

        # The two masks are disjoint, so both can be computed up front.
        abandoned_mask = assigned & (willing < assigned_dist)
        completed_mask = assigned & (willing >= assigned_dist)

        abandoned_driver_ids = self.passengers_df.loc[abandoned_mask, "assigned_driver"]
        completed_driver_ids = self.passengers_df.loc[completed_mask, "assigned_driver"]

        self.passengers_df.loc[abandoned_mask, "status"] = "Abandoned"
        self.drivers_df.loc[self.drivers_df["id"].isin(abandoned_driver_ids), "status"] = "Abandoned"

        self.passengers_df.loc[completed_mask, "status"] = "Completed"
        self.drivers_df.loc[self.drivers_df["id"].isin(completed_driver_ids), "status"] = "Completed"


In [None]:
def load_data(i: int, j: int) -> tuple[pd.DataFrame, pd.DataFrame]:
    """
    Read the passenger and driver input CSVs of one experiment replication.

    The files are looked up in ``BASE_DIR`` as ``pdf_<i>_<j>.csv`` and
    ``ddf_<i>_<j>.csv`` with both indices zero-padded to three digits. The
    ``loc`` column of each file is parsed with ``ast.literal_eval``.

    Args:
        i (int): Experiment parameter set index.
        j (int): Replication index.

    Returns:
        tuple[pd.DataFrame, pd.DataFrame]: ``(passengers_df, drivers_df)``.

    Raises:
        FileNotFoundError: If either input file does not exist.
    """
    suffix = f"{i:03d}_{j:03d}"
    pdf_path = BASE_DIR / f"pdf_{suffix}.csv"
    ddf_path = BASE_DIR / f"ddf_{suffix}.csv"

    both_present = pdf_path.exists() and ddf_path.exists()
    if not both_present:
        raise FileNotFoundError(f"Missing input data: {pdf_path}, {ddf_path}")

    loc_parser = {"loc": ast.literal_eval}
    return (
        pd.read_csv(pdf_path, converters=loc_parser),
        pd.read_csv(ddf_path, converters=loc_parser),
    )


def run_simulation(scenario: str, max_dist: int, sim_duration: int, passengers_df: pd.DataFrame, drivers_df: pd.DataFrame) -> tuple[pd.DataFrame, pd.DataFrame]:
    """
    Build a ``Simulation`` for one scenario, run it and print its wall-clock time.

    Nothing is written to disk here; the caller decides what to do with the
    returned frames.

    Args:
        scenario (str): Assignment strategy ("nearest_neighbor", "base", "driver_priority", "passenger_priority").
        max_dist (int): Maximum distance threshold for matching.
        sim_duration (int): Simulation duration in time periods.
        passengers_df (pd.DataFrame): Input passengers dataframe.
        drivers_df (pd.DataFrame): Input drivers dataframe.

    Returns:
        tuple[pd.DataFrame, pd.DataFrame]: The simulation's passenger and
        driver frames after the run.
    """
    # The timer deliberately covers construction as well as the run itself.
    started_at = datetime.now()
    simulation = Simulation(max_dist, sim_duration, passengers_df, drivers_df, scenario)
    simulation.simulate()
    elapsed = datetime.now() - started_at
    print(f"[{scenario}] runtime:", elapsed)

    return simulation.passengers_df, simulation.drivers_df


def _already_exists(outdir: Path, exp_name: str) -> bool:
    """Return True only if both result CSVs for ``exp_name`` exist in ``outdir``."""
    for filename in (f"result_pdf_{exp_name}.csv", f"result_ddf_{exp_name}.csv"):
        if not (outdir / filename).exists():
            return False
    return True


# Priority levels to simulate; each selects the input column
# ``priority_<level>`` and the output folder ``<side>_priority_<level>``.
_PRIORITY_LEVELS = (0.05, 0.1, 0.2, 0.3)

# Columns handed to a priority run. The "priority" entry is a placeholder that
# is replaced by the level-specific column name when selecting from the input.
_DRIVER_PRIORITY_COLUMNS = (
    "id", "arrival_time", "loc", "waits_until_matching", "priority",
    "status", "assignment_time", "assigned_passenger",
)
_PASSENGER_PRIORITY_COLUMNS = (
    "id", "arrival_time", "loc", "waits_until_matching", "willing_distance", "priority",
    "status", "assignment_time", "assignment_distance", "assigned_driver",
)


def _save_results(outdir: Path, exp_name: str, passengers_results: pd.DataFrame, drivers_results: pd.DataFrame) -> None:
    """Write both result frames of one run into ``outdir`` (created if needed)."""
    outdir.mkdir(parents=True, exist_ok=True)
    passengers_results.to_csv(outdir / f"result_pdf_{exp_name}.csv", index=False)
    drivers_results.to_csv(outdir / f"result_ddf_{exp_name}.csv", index=False)


def _select_priority_view(df: pd.DataFrame, template: tuple, priority_col_name: str) -> pd.DataFrame:
    """Select the template columns from ``df`` with ``priority_col_name`` exposed as "priority"."""
    columns = [priority_col_name if name == "priority" else name for name in template]
    return df.loc[:, columns].rename(columns={priority_col_name: "priority"})


def _run_plain_scenario(scenario: str, exp_name: str, max_dist: int, sim_duration: int,
                        passengers_df: pd.DataFrame, drivers_df: pd.DataFrame) -> None:
    """Run a scenario without priorities and save it under ``BASE_DIR / scenario``, unless already done."""
    outdir = BASE_DIR / scenario
    if _already_exists(outdir, exp_name):
        print(f"Skipping {scenario} for {exp_name}")
        return

    passengers_results, drivers_results = run_simulation(scenario, max_dist, sim_duration, passengers_df, drivers_df)
    _save_results(outdir, exp_name, passengers_results, drivers_results)


def _run_priority_scenario(side: str, priority_level: float, exp_name: str, max_dist: int, sim_duration: int,
                           passengers_df: pd.DataFrame, drivers_df: pd.DataFrame) -> None:
    """Run one ``<side>_priority`` scenario (side is "driver" or "passenger") for one level, unless already done."""
    scenario = f"{side}_priority"
    outdir = BASE_DIR / f"{scenario}_{priority_level}"
    if _already_exists(outdir, exp_name):
        print(f"Skipping {scenario} {priority_level} for {exp_name}")
        return

    priority_col_name = f"priority_{priority_level}"
    restore_name = {"priority": priority_col_name}

    if side == "driver":
        priority_drivers = _select_priority_view(drivers_df, _DRIVER_PRIORITY_COLUMNS, priority_col_name)
        passengers_results, drivers_results = run_simulation(scenario, max_dist, sim_duration, passengers_df, priority_drivers)
        # Give the column its level-specific name back before saving.
        drivers_results = drivers_results.rename(columns=restore_name)
    else:
        priority_passengers = _select_priority_view(passengers_df, _PASSENGER_PRIORITY_COLUMNS, priority_col_name)
        passengers_results, drivers_results = run_simulation(scenario, max_dist, sim_duration, priority_passengers, drivers_df)
        passengers_results = passengers_results.rename(columns=restore_name)

    _save_results(outdir, exp_name, passengers_results, drivers_results)


def worker(i: int, j: int, max_dist: int = 5, sim_duration: int = 500, num_experiments: int=108) -> None:
    """
    Run every scenario for a single (i, j) experiment and save the results.

    Scenarios run in this order: "base", "driver_priority" for each priority
    level, "passenger_priority" for each priority level, "nearest_neighbor".
    A scenario is skipped when both of its result files already exist.

    Args:
        i (int): Experiment index.
        j (int): Replication index.
        max_dist (int): Maximum distance threshold.
        sim_duration (int): Simulation duration.
        num_experiments (int): Unused; kept so existing callers keep working.
    """
    print(f"\n=== Running experiment {i:03d}_{j:03d} ===")
    passengers_df, drivers_df = load_data(i, j)
    exp_name = f"{i:03d}_{j:03d}"

    # Batched matching without priorities
    _run_plain_scenario("base", exp_name, max_dist, sim_duration, passengers_df, drivers_df)

    # Driver priority simulations, then passenger priority simulations
    for side in ("driver", "passenger"):
        for priority_level in _PRIORITY_LEVELS:
            _run_priority_scenario(side, priority_level, exp_name, max_dist, sim_duration, passengers_df, drivers_df)

    # Nearest neighbor
    _run_plain_scenario("nearest_neighbor", exp_name, max_dist, sim_duration, passengers_df, drivers_df)

In [None]:
def run_worker(args):
    """Unpack one ``(i, j, max_dist, sim_duration)`` task tuple and pass it to ``worker``.

    Exists so that ``executor.map`` can hand over each task as a single argument.
    """
    exp_index, rep_index, dist_limit, duration = args
    worker(exp_index, rep_index, dist_limit, duration)


def run_all(max_dist: int = 5, sim_duration: int = 500, num_experiments: int = 108,
            max_workers: int = 4, replications: tuple = (1, 2, 3)) -> None:
    """
    Run all experiments in parallel worker processes.

    One task is created for every experiment index ``1..num_experiments``
    combined with every replication index, and each task is executed by
    ``run_worker``.

    Args:
        max_dist (int): Maximum distance threshold.
        sim_duration (int): Simulation duration.
        num_experiments (int): Number of experiments.
        max_workers (int): Number of worker processes. Defaults to 4, the
            value that used to be hard-coded.
        replications (tuple): Replication indices to run for every
            experiment. Defaults to (1, 2, 3), as before.
    """
    tasks = [
        (i, j, max_dist, sim_duration)
        for i in range(1, num_experiments + 1)
        for j in replications
    ]

    with ProcessPoolExecutor(max_workers=max_workers) as executor:
        # list() drains the lazy map iterator so the progress bar advances and
        # any exception raised inside a worker is re-raised here.
        list(tqdm(executor.map(run_worker, tasks), total=len(tasks)))

In [None]:
# Root folder for the experiment files: load_data() reads the input CSVs from
# it and worker() writes one result sub-folder per scenario beneath it.
# Joining with "" leaves the path equal to the current working directory.
# NOTE(review): the empty string looks like a placeholder for a data
# sub-directory - confirm the intended location.
BASE_DIR = Path.cwd() / ""

In [None]:
# Launch the full experiment grid: experiments 1..108, each with the
# replications defined in run_all(). Scenarios whose result files already
# exist are skipped inside worker(), so this cell can be re-run after an
# interruption.
run_all(max_dist=5, sim_duration=500, num_experiments=108)