
# 带容量限制的车辆路径问题（CVRP）多行程解决方案

 ### 导入必要的库

In [2]:
%pip install ortools

Defaulting to user installation because normal site-packages is not writeable
Collecting ortools
  Downloading ortools-9.12.4544-cp39-cp39-macosx_11_0_arm64.whl.metadata (3.1 kB)
Collecting protobuf<5.30,>=5.29.3 (from ortools)
  Downloading protobuf-5.29.3-cp38-abi3-macosx_10_9_universal2.whl.metadata (592 bytes)
Collecting immutabledict>=3.0.0 (from ortools)
  Downloading immutabledict-4.2.1-py3-none-any.whl.metadata (3.5 kB)
Downloading ortools-9.12.4544-cp39-cp39-macosx_11_0_arm64.whl (18.3 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m18.3/18.3 MB[0m [31m5.3 MB/s[0m eta [36m0:00:00[0ma [36m0:00:01[0m
[?25hDownloading immutabledict-4.2.1-py3-none-any.whl (4.7 kB)
Downloading protobuf-5.29.3-cp38-abi3-macosx_10_9_universal2.whl (417 kB)
Installing collected packages: protobuf, immutabledict, ortools
  Attempting uninstall: protobuf
    Found existing installation: protobuf 5.28.3
    Uninstalling protobuf-5.28.3:
      Successfully uninstalled protobuf-5.

In [None]:
from functools import partial
from ortools.constraint_solver import pywrapcp
from ortools.constraint_solver import routing_enums_pb2

### 数据模型定义
定义问题所需的基础数据，包括：
- 车辆容量
- 各节点坐标
- 需求量和时间窗口
- 车辆参数等

In [None]:
def create_data_model():
    """创建问题数据模型"""
    data = {}
    # 基础参数设置
    _capacity = 15
    data["vehicle_capacity"] = _capacity
    data["num_vehicles"] = 3
    data["vehicle_max_distance"] = 10_000
    data["vehicle_max_time"] = 1_500
    data["vehicle_speed"] = 5 * 60 / 3.6  # 5km/h转换为米/分钟
    data["depot"] = 0
    
    # 节点坐标（区块单位）
    _locations = [
        (4, 4), (4, 4), (4, 4), (4, 4), (4, 4), (4, 4),  # 仓库和补给站
        (2, 0), (8, 0), (0, 1), (1, 1), (5, 2),          # 客户位置
        (7, 2), (3, 3), (6, 3), (5, 5), (8, 5), 
        (1, 6), (2, 6), (3, 7), (6, 7), (0, 8), (7, 8)
    ]
    
    # 转换为实际米数（假设每个区块114m x 80m）
    data["locations"] = [(l[0]*114, l[1]*80) for l in _locations]
    data["num_locations"] = len(data["locations"])
    
    # 各节点需求（正数为客户需求，负数为补给站容量）
    data["demands"] = [
        0, -_capacity, -_capacity, -_capacity, -_capacity, -_capacity,
        3, 3, 3, 4, 3, 4, 8, 8, 3, 3, 3, 3, 4, 4, 8, 8
    ]
  
    return data


### 距离计算模块
#### 实现曼哈顿距离计算和距离评估函数
本例选择曼哈顿距离是为了：
- 模拟城市道路网络的网格化特性；
- 简化计算并避免浮点运算；
- 适应问题对路径顺序优化的核心需求。

1. 实际道路网络的模拟
城市交通特性：在真实的城市环境中，车辆通常需要沿着网格状的道路系统行驶（如纽约曼哈顿的街道布局），不能直接穿越建筑物。这种场景下，曼哈顿距离（即两点在网格坐标系上的横向和纵向距离之和）更符合实际行驶路径。代码中的场景设定：在示例代码中，节点坐标是基于街区（Block）定义的（例如 (4, 4) 表示第4行第4列的街区），且明确提到使用曼哈顿街区模型（114m x 80m 的街区）。这种布局天然适合曼哈顿距离。

2. 计算效率与简单性
- 整数运算：曼哈顿距离的计算公式为 |x1 - x2| + |y1 - y2|，仅涉及加减法和绝对值运算，计算速度快，适合需要频繁计算距离的优化问题。
- 避免浮点数：欧几里得距离需要平方根运算（√((x1 - x2)^2 + (y1 - y2)^2)），会引入浮点数计算。在需要整数运算或离散化处理的场景中（如某些路由算法实现），曼哈顿距离更友好。

In [5]:
def manhattan_distance(pos1, pos2):
    """计算两点间的曼哈顿距离"""
    return abs(pos1[0]-pos2[0]) + abs(pos1[1]-pos2[1])

def create_distance_evaluator(data):
    """创建距离评估函数"""
    _distances = {}
    size = data["num_locations"]
    
    # 预计算所有节点间距离
    for from_node in range(size):
        _distances[from_node] = {}
        for to_node in range(size):
            if from_node == to_node:
                _distances[from_node][to_node] = 0
            # 防止直接往返于仓库/补给站之间
            elif from_node < 6 and to_node < 6:
                _distances[from_node][to_node] = data["vehicle_max_distance"]
            else:
                _distances[from_node][to_node] = manhattan_distance(
                    data["locations"][from_node], 
                    data["locations"][to_node]
                )

    def distance_evaluator(manager, from_idx, to_idx):
        """返回节点间的距离"""
        from_node = manager.IndexToNode(from_idx)
        to_node = manager.IndexToNode(to_idx)
        return _distances[from_node][to_node]
    
    return distance_evaluator


### 距离约束模块
#### 添加车辆最大行驶距离约束

In [12]:
def add_distance_dimension(routing, manager, data, distance_eval_idx):
    """添加距离维度约束"""
    distance_dimension_name = "Distance"
    routing.AddDimension(
        distance_eval_idx,
        0,
        data["vehicle_max_distance"],
        True,
        distance_dimension_name
    )
    distance_dimension = routing.GetDimensionOrDie(distance_dimension_name)
    distance_dimension.SetGlobalSpanCostCoefficient(100)


### 容量约束模块
#### 处理车辆容量限制和补给站逻辑

In [6]:
def create_demand_evaluator(data):
    """创建需求评估函数"""
    _demands = data["demands"]
    
    def demand_evaluator(manager, node_idx):
        return _demands[manager.IndexToNode(node_idx)]
    
    return demand_evaluator

def add_capacity_constraints(routing, manager, data, demand_eval_idx):
    """添加容量约束"""
    capacity = data["vehicle_capacity"]
    routing.AddDimension(
        demand_eval_idx,
        capacity,  # 车辆最大容量
        capacity,  # 车辆初始容量
        True,  # 强制初始容量为0
        "Capacity"
    )
    
    # 设置补给站的松弛变量
    capacity_dim = routing.GetDimensionOrDie("Capacity")
    for node in range(1, 6):  # 补给站节点
        node_idx = manager.NodeToIndex(node)
        routing.AddDisjunction([node_idx], 0)  # 允许跳过补给站
        
    for node in range(6, data["num_locations"]):  # 客户节点
        node_idx = manager.NodeToIndex(node)
        capacity_dim.SlackVar(node_idx).SetValue(0)  # 无松弛
        routing.AddDisjunction([node_idx], 100000)  # 跳过的高成本

### 时间约束模块
#### 处理时间窗口和行程时间约束

In [9]:
def create_time_evaluator(data):
    """创建时间评估函数"""
    def service_time(node):
        """计算服务时间"""
        return abs(data["demands"][node]) * data["time_per_demand_unit"]
    
    def travel_time(from_node, to_node):
        """计算行程时间"""
        if from_node == to_node:
            return 0
        dist = manhattan_distance(
            data["locations"][from_node],
            data["locations"][to_node]
        )
        return dist / data["vehicle_speed"]  # 时间（分钟）
    
    # 预计算总时间（服务时间 + 行程时间）
    _total_time = {}
    size = data["num_locations"]
    for from_node in range(size):
        _total_time[from_node] = {}
        for to_node in range(size):
            _total_time[from_node][to_node] = int(
                service_time(from_node) + travel_time(from_node, to_node)
            )
    
    def time_evaluator(manager, from_idx, to_idx):
        from_node = manager.IndexToNode(from_idx)
        to_node = manager.IndexToNode(to_idx)
        return _total_time[from_node][to_node]
    
    return time_evaluator

def add_time_window_constraints(routing, manager, data, time_eval_idx):
    """添加时间窗口约束"""
    routing.AddDimension(
        time_eval_idx,
        data["vehicle_max_time"],  # 允许最大等待时间
        data["vehicle_max_time"],  # 最大行程时间
        False,  # 不强制初始时间为0
        "Time"
    )
    
    time_dim = routing.GetDimensionOrDie("Time")
    # 为每个节点添加时间窗口
    for loc_idx, (start, end) in enumerate(data["time_windows"]):
        if loc_idx == 0:  # 仓库节点
            continue
        node_idx = manager.NodeToIndex(loc_idx)
        time_dim.CumulVar(node_idx).SetRange(start, end)
        
    # 为车辆设置初始时间窗口
    for veh_id in range(data["num_vehicles"]):
        start_idx = routing.Start(veh_id)
        time_dim.CumulVar(start_idx).SetRange(*data["time_windows"][0])


### 结果输出模块
#### 格式化输出解决方案

In [18]:
from matplotlib import animation, pyplot as plt
import numpy as np


def animate_CVRP_solution(routes, node_coords, save=False, filename='cvrp_animation.mp4'):
    fig, ax = plt.subplots(figsize=(10, 8))

    # Plot the nodes
    for i, (x, y) in enumerate(node_coords):
        ax.scatter(x, y, c='blue' if i == 0 else 'red', zorder=2)
        ax.text(x, y, f'{i}', fontsize=9, ha='right')

    # Prepare the routes with depot at start and end
    vehicle_routes = []
    vehicle_cumulative_distances = []
    total_route_lengths = []
    for route in routes:
        full_route = [0] + route + [0]
        vehicle_routes.append(full_route)

        # Compute cumulative distances
        distances = [0.0]
        for i in range(1, len(full_route)):
            node_a = full_route[i - 1]
            node_b = full_route[i]
            coord_a = node_coords[node_a]
            coord_b = node_coords[node_b]
            dist = np.hypot(coord_b[0] - coord_a[0], coord_b[1] - coord_a[1])
            distances.append(distances[-1] + dist)
        vehicle_cumulative_distances.append(distances)
        total_route_lengths.append(distances[-1])

    # Colors for vehicles
    colors = ['b', 'g', 'r', 'c', 'm', 'y', 'k']

    # Prepare data for animation
    lines = []
    markers = []
    for vehicle, route in enumerate(vehicle_routes):
        color = colors[vehicle % len(colors)]
        line, = ax.plot([], [], c=color, label=f'Vehicle {vehicle + 1}', lw=2)
        lines.append(line)
        # Add a marker to represent the vehicle
        marker, = ax.plot([], [], marker='o', c=color, markersize=8)
        markers.append(marker)

    # Set plot limits
    x_coords, y_coords = zip(*node_coords)
    margin = 10
    ax.set_xlim(min(x_coords) - margin, max(x_coords) + margin)
    ax.set_ylim(min(y_coords) - margin, max(y_coords) + margin)

    ax.set_xlabel('X Coordinate')
    ax.set_ylabel('Y Coordinate')
    ax.set_title('Vehicle Routing Problem Solution')
    # ax.legend()
    ax.grid(True)

    # Total number of frames in the animation
    total_frames = 200

    # Function to update the animation at each frame
    def update(frame):
        t = frame / total_frames  # Normalized time from 0 to 1
        for i, line in enumerate(lines):
            route = vehicle_routes[i]
            cumulative_distances = vehicle_cumulative_distances[i]
            total_length = total_route_lengths[i]
            elapsed_distance = t * total_length

            # Find current segment
            idx = np.searchsorted(cumulative_distances, elapsed_distance, side='right') - 1
            if idx >= len(route) - 1:
                idx = len(route) - 2
                fraction = 1.0
            else:
                # Fraction along the segment
                fraction = ((elapsed_distance - cumulative_distances[idx]) /
                            (cumulative_distances[idx + 1] - cumulative_distances[idx]))

            node_a = route[idx]
            node_b = route[idx + 1]
            coord_a = node_coords[node_a]
            coord_b = node_coords[node_b]
            x = coord_a[0] + fraction * (coord_b[0] - coord_a[0])
            y = coord_a[1] + fraction * (coord_b[1] - coord_a[1])

            # Update line data
            route_nodes = route[:idx + 1]
            x_coords_line = [node_coords[node][0] for node in route_nodes]
            y_coords_line = [node_coords[node][1] for node in route_nodes]
            x_coords_line.append(x)
            y_coords_line.append(y)
            line.set_data(x_coords_line, y_coords_line)

            # Update marker position
            markers[i].set_data([x], [y])
        return lines + markers

    # Create the animation
    ani = animation.FuncAnimation(fig, update, frames=total_frames + 1, interval=50, blit=True, repeat=False)

    # Save or display the animation
    if save:
        # Determine writer based on file extension
        ext = filename.split('.')[-1]
        if ext.lower() == 'mp4':
            # Try to use ffmpeg writer
            try:
                Writer = animation.writers['ffmpeg']
                writer = Writer(fps=20, metadata=dict(artist='Me'), bitrate=1800)
            except KeyError:
                raise RuntimeError(
                    "ffmpeg writer is not available on your system. Please install ffmpeg or choose a different file format.")
        elif ext.lower() == 'gif':
            # Use PillowWriter or ImageMagick
            try:
                from matplotlib.animation import PillowWriter
                writer = PillowWriter(fps=20)
            except ImportError:
                raise RuntimeError(
                    "PillowWriter is not available. Please install Pillow or choose a different file format.")
        else:
            raise ValueError(f"Unsupported file extension: .{ext}. Please use .mp4 or .gif.")

        ani.save(filename, writer=writer)
    else:
        plt.show()



In [19]:
# def print_solution(data, manager, routing, solution):
#     """打印解决方案"""
#     print(f"总目标值: {solution.ObjectiveValue()}")
    
#     # 获取各维度信息
#     capacity_dim = routing.GetDimensionOrDie("Capacity")
#     time_dim = routing.GetDimensionOrDie("Time")
#     distance_dim = routing.GetDimensionOrDie("Distance")
    
#     # 统计未访问节点
#     dropped = []
#     for node in range(6, routing.nodes()):
#         if solution.Value(routing.NextVar(node)) == node:
#             dropped.append(node)
#     print(f"未访问客户节点: {dropped}")
    
#     # 逐辆车输出路线详情
#     total_distance = 0
#     total_load = 0
#     total_time = 0
#     routes = []

    
#     for veh_id in range(data["num_vehicles"]):
#         if not routing.IsVehicleUsed(solution, veh_id):
#             continue
            
#         route = []
#         index = routing.Start(veh_id)
#         route_load = 0
#         while not routing.IsEnd(index):
#             node = manager.IndexToNode(index)
#             load = solution.Min(capacity_dim.CumulVar(index))
#             time_var = time_dim.CumulVar(index)
#             route.append(
#                 f"{node} 载重({load}) "
#                 f"时间({solution.Min(time_var)},{solution.Max(time_var)})"
#             )
#             prev_index = index
#             index = solution.Value(routing.NextVar(index))
#             route_load += max(0, capacity_dim.GetTransitValue(prev_index, index, veh_id))
            
#         # 路线统计
#         time_var = time_dim.CumulVar(index)
#         distance = solution.Value(distance_dim.CumulVar(index))
#         total_distance += distance
#         total_load += route_load
#         total_time += solution.Min(time_var)
        
#         # 打印单辆车路线
#         print(f"\n车辆 {veh_id} 路线:")
#         print(" ->\n".join(route))
#         print(f"行驶距离: {distance}m")
#         print(f"载货量: {route_load}")
#         print(f"总时间: {solution.Min(time_var)}min")
    
#     # 打印全局统计
#     print("\n全局统计:")
#     print(f"总行驶距离: {total_distance}m")
#     print(f"总载货量: {total_load}")
#     print(f"总耗时: {total_time}min")
#     animate_CVRP_solution(routes, node_coords)

def print_solution(data, manager, routing, solution, node_coords):
    """打印解决方案"""
    print(f"总目标值: {solution.ObjectiveValue()}")

    # 获取各维度信息
    capacity_dim = routing.GetDimensionOrDie("Capacity")
    time_dim = routing.GetDimensionOrDie("Time")
    distance_dim = routing.GetDimensionOrDie("Distance")

    # 统计未访问节点
    dropped = []
    for node in range(6, routing.nodes()):
        if solution.Value(routing.NextVar(node)) == node:
            dropped.append(node)
    print(f"未访问客户节点: {dropped}")

    # 逐辆车输出路线详情
    total_distance = 0
    total_load = 0
    total_time = 0
    routes = []

    for veh_id in range(data["num_vehicles"]):
        if not routing.IsVehicleUsed(solution, veh_id):
            continue

        route = []
        index = routing.Start(veh_id)
        route_load = 0
        while not routing.IsEnd(index):
            node = manager.IndexToNode(index)
            load = solution.Min(capacity_dim.CumulVar(index))
            time_var = time_dim.CumulVar(index)
            route.append(
                f"{node} 载重({load}) "
                f"时间({solution.Min(time_var)},{solution.Max(time_var)})"
            )
            prev_index = index
            index = solution.Value(routing.NextVar(index))
            route_load += max(0, capacity_dim.GetTransitValue(prev_index, index, veh_id))

        # 路线统计
        time_var = time_dim.CumulVar(index)
        distance = solution.Value(distance_dim.CumulVar(index))
        total_distance += distance
        total_load += route_load
        total_time += solution.Min(time_var)

        # 提取路线节点
        route_nodes = []
        index = routing.Start(veh_id)
        while not routing.IsEnd(index):
            node = manager.IndexToNode(index)
            route_nodes.append(node)
            index = solution.Value(routing.NextVar(index))
        routes.append(route_nodes)

        # 打印单辆车路线
        print(f"\n车辆 {veh_id} 路线:")
        print(" ->\n".join(route))
        print(f"行驶距离: {distance}m")
        print(f"载货量: {route_load}")
        print(f"总时间: {solution.Min(time_var)}min")

    # 打印全局统计
    print("\n全局统计:")
    print(f"总行驶距离: {total_distance}m")
    print(f"总载货量: {total_load}")
    print(f"总耗时: {total_time}min")

    # 绘制动画
    animate_CVRP_solution(routes, node_coords)

### 主求解流程
#### 整合各模块进行问题求解

In [20]:
# 初始化模型
data = create_data_model()
node_coords = [(0, 0)] * len(data['distance_matrix'])

# 创建路由管理器
manager = pywrapcp.RoutingIndexManager(
    data["num_locations"],
    data["num_vehicles"],
    data["depot"]
)

# 创建路由模型
routing = pywrapcp.RoutingModel(manager)

# 注册距离评估函数
distance_eval_idx = routing.RegisterTransitCallback(
    partial(create_distance_evaluator(data), manager)
)
routing.SetArcCostEvaluatorOfAllVehicles(distance_eval_idx)

# 添加距离约束
add_distance_dimension(routing, manager, data, distance_eval_idx)

# 添加容量约束
demand_eval_idx = routing.RegisterUnaryTransitCallback(
    partial(create_demand_evaluator(data), manager)
)
add_capacity_constraints(routing, manager, data, demand_eval_idx)

# 添加时间约束
time_eval_idx = routing.RegisterTransitCallback(
    partial(create_time_evaluator(data), manager)
)
add_time_window_constraints(routing, manager, data, time_eval_idx)

# 配置求解参数
search_params = pywrapcp.DefaultRoutingSearchParameters()
search_params.first_solution_strategy = (
    routing_enums_pb2.FirstSolutionStrategy.PATH_CHEAPEST_ARC
)
search_params.local_search_metaheuristic = (
    routing_enums_pb2.LocalSearchMetaheuristic.GUIDED_LOCAL_SEARCH
)
search_params.time_limit.FromSeconds(5)

# 执行求解
solution = routing.SolveWithParameters(search_params)

KeyError: 'distance_matrix'

In [14]:
if solution:
    print_solution(data, manager, routing, solution,node_coords)
else:
    print("未找到可行解决方案")

总目标值: 272744
未访问客户节点: []

车辆 0 路线:
0 载重(0) 时间(0,0) ->
14 载重(0) 时间(2,138) ->
21 载重(3) 时间(30,158) ->
19 载重(11) 时间(72,200) ->
5 载重(15) 时间(97,429) ->
12 载重(0) 时间(174,506) ->
9 载重(8) 时间(218,550) ->
6 载重(12) 时间(240,850)
行驶距离: 2648m
载货量: 30
总时间: 261min

车辆 1 路线:
0 载重(0) 时间(0,0) ->
13 载重(0) 时间(10,200) ->
3 载重(8) 时间(53,446) ->
8 载重(0) 时间(136,529) ->
20 载重(3) 时间(157,550) ->
16 载重(11) 时间(200,950)
行驶距离: 2648m
载货量: 22
总时间: 221min

车辆 2 路线:
0 载重(0) 时间(0,0) ->
17 载重(0) 时间(5,150) ->
18 载重(3) 时间(22,250) ->
2 载重(7) 时间(46,482) ->
15 载重(0) 时间(127,563) ->
7 载重(3) 时间(146,582) ->
11 载重(6) 时间(164,600) ->
10 载重(10) 时间(186,800)
行驶距离: 2648m
载货量: 20
总时间: 204min

全局统计:
总行驶距离: 7944m
总载货量: 72
总耗时: 686min
