In [1]:
import math
import os
import time
from dataclasses import dataclass
from typing import List, Tuple

from ortools.constraint_solver import pywrapcp, routing_enums_pb2


# ---------- Data model ----------

@dataclass
class VRPTWInstance:
    name: str
    num_vehicles: int
    capacity: int
    coords: List[Tuple[float, float]]  # index 0 is depot
    demands: List[int]                 # same indexing
    ready_times: List[int]
    due_times: List[int]
    service_times: List[int]
    depot_index: int = 0               # depot is customer 0


# ---------- Parser for Solomon / Gehring & Homberger VRPTW ----------

def parse_vrptw_instance(path: str) -> VRPTWInstance:
    """
    Parse VRPTW instance in the classic format, e.g.:

        VEHICLE
        NUMBER     CAPACITY
          50          200

        CUSTOMER
        CUST NO.  XCOORD.    YCOORD.    DEMAND   READY TIME  DUE DATE   SERVICE TIME

            0      70         70          0          0       1351          0
            1      33         78         20        750        809         90
            ...

    Returns a VRPTWInstance with 0-based indexing (0 is depot).
    """
    with open(path, "r") as f:
        lines = [ln.rstrip("\n") for ln in f]

    name = os.path.basename(path)

    num_vehicles = None
    capacity = None

    in_vehicle_section = False
    in_customer_section = False
    header_seen = False
    customer_lines: List[str] = []

    for line in lines:
        stripped = line.strip()
        if not stripped:
            continue

        # Detect sections
        if stripped.upper().startswith("VEHICLE"):
            in_vehicle_section = True
            in_customer_section = False
            header_seen = False
            continue

        if stripped.upper().startswith("CUSTOMER"):
            in_vehicle_section = False
            in_customer_section = True
            header_seen = False
            continue

        if in_vehicle_section:
            # Expect a line "NUMBER     CAPACITY" then a line with the actual numbers
            if not header_seen:
                # first non-empty line after "VEHICLE" is header
                header_seen = True
                continue
            else:
                # next non-empty line should contain two integers: num_vehicles capacity
                parts = stripped.split()
                if len(parts) >= 2 and parts[0].isdigit():
                    num_vehicles = int(parts[0])
                    capacity = int(parts[1])
                    in_vehicle_section = False  # done
                continue

        if in_customer_section:
            if not header_seen:
                # skip the header line "CUST NO. XCOORD ..."
                header_seen = True
                continue
            # remaining non-empty lines are customers until EOF
            customer_lines.append(stripped)

    if num_vehicles is None or capacity is None:
        raise ValueError("Failed to parse VEHICLE section (number / capacity).")

    # Parse customer lines
    # Each line: id x y demand ready due service
    records = {}
    for line in customer_lines:
        parts = line.split()
        if len(parts) < 7:
            continue  # ignore malformed lines
        cust_id = int(parts[0])
        x = float(parts[1])
        y = float(parts[2])
        demand = int(parts[3])
        ready = int(parts[4])
        due = int(parts[5])
        service = int(parts[6])
        records[cust_id] = (x, y, demand, ready, due, service)

    if 0 not in records:
        raise ValueError("Depot (customer 0) not found in CUSTOMER section.")

    max_id = max(records.keys())
    n = max_id + 1  # assuming ids are 0..max_id

    coords = [(0.0, 0.0)] * n
    demands = [0] * n
    ready_times = [0] * n
    due_times = [0] * n
    service_times = [0] * n

    for cid in range(n):
        if cid not in records:
            raise ValueError(f"Missing customer id {cid} in instance file.")
        x, y, demand, ready, due, service = records[cid]
        coords[cid] = (x, y)
        demands[cid] = demand
        ready_times[cid] = ready
        due_times[cid] = due
        service_times[cid] = service

    depot_index = 0

    total_demand = sum(demands)
    print(f"Loaded instance {name}")
    print(f"  customers (incl. depot): {n}")
    print(f"  vehicles                : {num_vehicles}")
    print(f"  capacity per vehicle    : {capacity}")
    print(f"  total demand            : {total_demand}")
    print(f"  max demand              : {max(demands)}")
    print(f"  total capacity          : {num_vehicles * capacity}")
    if max(demands) > capacity:
        raise ValueError("Infeasible: some customer demand exceeds vehicle capacity.")
    if total_demand > num_vehicles * capacity:
        print("Warning: total demand > total vehicle capacity; "
              "standard VRPTW would be infeasible unless dropping is allowed.")

    return VRPTWInstance(
        name=name,
        num_vehicles=num_vehicles,
        capacity=capacity,
        coords=coords,
        demands=demands,
        ready_times=ready_times,
        due_times=due_times,
        service_times=service_times,
        depot_index=depot_index,
    )


# ---------- Time / distance matrix ----------

def build_time_matrix(coords: List[Tuple[float, float]]) -> List[List[int]]:
    """
    Build a travel time matrix from coordinates using Euclidean distance,
    rounded to nearest integer. Units are arbitrary but must be consistent
    with time windows and service times. 
    """
    n = len(coords)
    mat = [[0] * n for _ in range(n)]
    for i in range(n):
        x_i, y_i = coords[i]
        for j in range(i + 1, n):
            x_j, y_j = coords[j]
            d = math.hypot(x_i - x_j, y_i - y_j)
            t = int(d + 0.5)  # round to nearest integer
            mat[i][j] = t
            mat[j][i] = t
    return mat


# ---------- OR-Tools VRPTW model ----------

def solve_vrptw_with_ortools(instance: VRPTWInstance, time_limit_seconds: int = 60):
    """
    VRPTW model following OR-Tools' VRPTW example:

    - Time dimension for cumulative travel + service time, with time windows.
    - Capacity dimension for demands. 
    """
    time_matrix = build_time_matrix(instance.coords)

    data = {
        "time_matrix": time_matrix,
        "demands": instance.demands,
        "vehicle_capacities": [instance.capacity] * instance.num_vehicles,
        "num_vehicles": instance.num_vehicles,
        "depot": instance.depot_index,
        "time_windows": list(zip(instance.ready_times, instance.due_times)),
        "service_times": instance.service_times,
    }

    num_locations = len(time_matrix)

    # Index manager & routing model
    manager = pywrapcp.RoutingIndexManager(
        num_locations,
        data["num_vehicles"],
        data["depot"],
    )
    routing = pywrapcp.RoutingModel(manager)

    # Transit callback: travel time + service time at 'from' node
    def time_callback(from_index, to_index):
        from_node = manager.IndexToNode(from_index)
        to_node = manager.IndexToNode(to_index)
        travel_time = data["time_matrix"][from_node][to_node]
        service_time = data["service_times"][from_node]
        return travel_time + service_time

    transit_callback_index = routing.RegisterTransitCallback(time_callback)
    routing.SetArcCostEvaluatorOfAllVehicles(transit_callback_index)

    # Capacity (load) callback & dimension (CVRP-style) 
    def demand_callback(from_index):
        from_node = manager.IndexToNode(from_index)
        return data["demands"][from_node]

    demand_callback_index = routing.RegisterUnaryTransitCallback(demand_callback)
    routing.AddDimensionWithVehicleCapacity(
        demand_callback_index,
        0,  # no slack
        data["vehicle_capacities"],
        True,  # start cumul at 0
        "Capacity",
    )

    # Time dimension: cumulative time with windows 
    max_due = max(instance.due_times)
    max_travel = max(max(row) for row in time_matrix) if num_locations > 1 else 0
    max_service = max(instance.service_times) if instance.service_times else 0
    horizon = max_due + max_travel + max_service

    routing.AddDimension(
        transit_callback_index,
        horizon,   # maximum waiting (slack) allowed
        horizon,   # maximum time per vehicle
        False,     # don't force start at 0; we'll set TWs
        "Time",
    )
    time_dimension = routing.GetDimensionOrDie("Time")

    # Apply time windows to all locations (including depot)
    for node in range(num_locations):
        index = manager.NodeToIndex(node)
        start, end = data["time_windows"][node]
        time_dimension.CumulVar(index).SetRange(start, end)

    # Force all vehicles to start within depot TW as well
    for v in range(data["num_vehicles"]):
        start_index = routing.Start(v)
        depot_start, depot_end = data["time_windows"][data["depot"]]
        time_dimension.CumulVar(start_index).SetRange(depot_start, depot_end)

    # Optionally, minimize maximum route time
    time_dimension.SetGlobalSpanCostCoefficient(1)

    # Search parameters (OR-Tools recommended strategies) 
    search_parameters = pywrapcp.DefaultRoutingSearchParameters()
    search_parameters.first_solution_strategy = (
        routing_enums_pb2.FirstSolutionStrategy.PARALLEL_CHEAPEST_INSERTION
    )
    search_parameters.local_search_metaheuristic = (
        routing_enums_pb2.LocalSearchMetaheuristic.GUIDED_LOCAL_SEARCH
    )
    search_parameters.time_limit.FromSeconds(time_limit_seconds)
    search_parameters.log_search = True

    print(
        f"\nSolving instance {instance.name} "
        f"({num_locations - 1} customers, "
        f"{instance.num_vehicles} vehicles, Q={instance.capacity})"
    )
    start_time = time.perf_counter()
    solution = routing.SolveWithParameters(search_parameters)
    wall_time = time.perf_counter() - start_time

    if solution is None:
        status = routing.status()
        status_name = routing_enums_pb2.RoutingModelStatus.Name(status)
        print(f"No solution found. Routing status: {status_name}")
        print(f"Wall time: {wall_time:.3f} seconds")
        return

    print_solution(instance, data, manager, routing, solution, wall_time)


# ---------- Pretty-print solution ----------

def print_solution(
    instance: VRPTWInstance,
    data,
    manager: pywrapcp.RoutingIndexManager,
    routing: pywrapcp.RoutingModel,
    solution: pywrapcp.Assignment,
    wall_time: float,
):
    print("\n========== Solution ==========")
    print(f"Instance: {instance.name}")
    print(f"Objective (total time): {solution.ObjectiveValue()}")
    print(f"Wall time: {wall_time:.3f} seconds\n")

    time_dimension = routing.GetDimensionOrDie("Time")
    capacity_dimension = routing.GetDimensionOrDie("Capacity")

    total_time = 0
    total_load = 0

    for vehicle_id in range(data["num_vehicles"]):
        index = routing.Start(vehicle_id)
        if routing.IsEnd(solution.Value(routing.NextVar(index))):
            # Vehicle unused (goes depot -> depot only)
            continue

        print(f"Route for vehicle {vehicle_id}:")
        route_load = 0
        route_str = ""
        while not routing.IsEnd(index):
            node = manager.IndexToNode(index)
            load = solution.Value(capacity_dimension.CumulVar(index))
            time_var = time_dimension.CumulVar(index)
            t = solution.Value(time_var)
            route_str += f" {node} (load={load}, time={t}) ->"
            route_load = load
            index = solution.Value(routing.NextVar(index))

        # End at depot
        node = manager.IndexToNode(index)
        load = solution.Value(capacity_dimension.CumulVar(index))
        t = solution.Value(time_dimension.CumulVar(index))
        route_str += f" {node} (load={load}, time={t})"
        route_time = t - solution.Value(
            time_dimension.CumulVar(routing.Start(vehicle_id))
        )

        print(route_str)
        print(f"  Route load: {route_load}")
        print(f"  Route time: {route_time}\n")

        total_time += route_time
        total_load += route_load

    print("========== Summary ==========")
    print(f"Total route time (sum): {total_time}")
    print(f"Total delivered load  : {total_load}")
    print("=============================\n")


In [2]:
instance = parse_vrptw_instance("Vrp-Set-HG/C1_2_1.txt")
sol = solve_vrptw_with_ortools(
    instance,
    time_limit_seconds=60,
)

Loaded instance C1_2_1.txt
  customers (incl. depot): 201
  vehicles                : 50
  capacity per vehicle    : 200
  total demand            : 3530
  max demand              : 40
  total capacity          : 10000

Solving instance C1_2_1.txt (200 customers, 50 vehicles, Q=200)

Instance: C1_2_1.txt
Objective (total time): 22055
Wall time: 60.006 seconds

Route for vehicle 0:
 0 (load=0, time=17) -> 177 (load=0, time=104) -> 3 (load=10, time=200) -> 88 (load=40, time=292) -> 8 (load=70, time=386) -> 186 (load=110, time=479) -> 127 (load=130, time=572) -> 98 (load=140, time=670) -> 157 (load=150, time=765) -> 56 (load=160, time=1130) -> 0 (load=180, time=1284)
  Route load: 160
  Route time: 1267

Route for vehicle 1:
 0 (load=0, time=17) -> 45 (load=0, time=36) -> 178 (load=10, time=131) -> 27 (load=20, time=225) -> 173 (load=50, time=318) -> 154 (load=60, time=411) -> 24 (load=70, time=505) -> 61 (load=80, time=599) -> 100 (load=100, time=691) -> 64 (load=120, time=782) -> 179 (l