## Kuhn-Munkres 算法代码实现
### 准备工作
- 导入相关包

In [1]:
import sys
import copy
from typing import Union, NewType, Sequence, Tuple, Optional, Callable
import numpy as np


- 接下来，定义类型
    - 分别定义实数和矩阵
    - 实数由整型和浮点型构成
    - 矩阵类型是数字的二位序列
- 补充知识：Sequence 序列是Python中的一种基本数据结构概念，具有有序性、支持索引访问、支持切片、可迭代的特性
- 常见的Sequence类型包括：
```python
# 列表（可变）
list_example = [1, 2, 3]

# 元组（不可变）
tuple_exmaple = (1, 2, 3)

# 字符串（不可变）
string_exmaple = "123"
```

In [2]:
AnyNum = NewType('AnyNum', Union[int, float]) # 定义实数类型
Matrix = NewType('Matrix', Sequence[Sequence[AnyNum]]) # 定义矩阵类型

- 定义了两个重要的类，用来标记矩阵中不允许配对的位置
- 创建唯一标记的简单类
- 这个类中什么也不用做（pass语句），但是创建了这个类的唯一实例DISALLOWED用于表示不允许配对的位置
- 使用类实例，而不是简单的数值或者字符串，这样做的好处有：
    - 确保这个标记是唯一的，不会与矩阵中的其他值混淆
    - 不会被误用于数学运算

In [3]:
class DISALLOWED_OBJ(object):
    pass
DISALLOWED = DISALLOWED_OBJ()
DISALLOWED_PRINTVALUE = 'D'

- 接下来，定义一个异常类

In [4]:
class UnsolveableMatrix(Exception):
    pass


## 算法实现
- 效用函数$$u_c(x_i)=\frac{x_i}{\sum^{N}_{n=1}x_n}\sum^{N}_{n=1}p_n-\epsilon \sigma_i x_i$$
### 对于下方代码的解释：
- `-> float`表示这个函数返回的是一个浮点数值
- `np.ndarray`是numpy的数组类型，可以存储多个数值

In [5]:
def calculate_utility(x_i: float, x: np.ndarray, p: np.ndarray, sigma_i: float, epsilon: float) -> float:
    """
    计算效用函数 u_c(x_i) = (x_i / sum(x_n)) * (sum(p_n) - epsilon * sigma_i * x_i)

    参数:
    x_i: float - 当前的 x 值
    x: np.ndarray - 所有 x 值的数组
    p: np.ndarray - p_n 值的数组
    sigma_i: float - σi 值
    epsilon: float - ε 值

    返回:
    float: 计算得到的效用值
    """
    # 计算分母部分（所有 x 的和）
    x_sum = np.sum(x)

    # 计算第一个系数 (x_i / sum(x_n))
    coefficient = x_i / x_sum

    # 计算括号内的部分 (sum(p_n) - epsilon * sigma_i * x_i)
    p_sum = np.sum(p)
    bracket_term = p_sum - epsilon * sigma_i * x_i

    # 计算最终结果
    utility = coefficient * bracket_term

    return utility

### 构建Munkres类
- 构建Munkres算法类，实现算法中的每一个步骤

In [6]:
class Munkres:


    def __init__(self):
        """
        类的构造函数，在创建类的新实例时自动调用
        """
        self.C = None # 成本矩阵
        self.row_covered = [] # 记录已覆盖的行
        self.col_covered = [] # 记录已覆盖的列
        self.n = 0  # 矩阵维度
        self.Z0_r = 0  # 当前位置的行坐标
        self.Z0_c = 0  # 当前位置的列坐标
        self.marked = None  # 标记矩阵
        self.path = None  # 路径记录


- `[pad_value]`创建一个只含填充值的列表
- `* (total_rows - row_len)`将这个列表重复指定列数
- `+=`将生成的列表添加到`new_row`末尾

In [7]:

    def pad_matrix(self, matrix: Matrix, pad_value: int = 0)-> Matrix:

        """
        self表示这是类的方法
        pad_value用于填充矩阵的值，默认为0
        -> Matrix：表示返回一个矩阵
        """
        max_colums = 0
        total_rows = len(matrix)

        for row in matrix:
            max_colums = max(max_colums, len(row))

        total_rows = max(total_rows, max_colums)
        """
        Matrix = NewType('Matrix', Sequence[Sequence[AnyNum]]) # 定义矩阵类型
        请注意matrix是这么定义的
        """
        new_matrix = []
        for row in matrix:
            row_len = len(row)
            new_row = row[:]
            if total_rows > row_len:
                new_row += [pad_value] * (total_rows - row_len)
            new_matrix += [new_row]

        """
        填充额外的行,新填充的额外的行中全是0（填充元素值）
        """
        while len(new_matrix) < total_rows:
            new_matrix += [pad_value] * total_rows

        return new_matrix

### 计算

In [8]:
    def compute(self, cost_matrix: Matrix) -> Sequence[Tuple[int,int]]:


        """
        匈牙利算法的主要计算方法，用于找出最优的分配方案
        """

        self.C = self.pad_matrix(cost_matrix)  # 填充成方阵
        self.n = len(self.C)                   # 获取矩阵维度
        self.original_length = len(cost_matrix) # 保存原始矩阵的长度
        self.original_width = len(cost_matrix[0]) # 保存原始矩阵的宽度


        # 初始化覆盖状态数组
        self.row_covered = [False for i in range(self.n)]  # 行覆盖状态
        self.col_covered = [False for i in range(self.n)]  # 列覆盖状态


        # 初始化其他必要变量
        self.Z0_r = 0   # 当前位置的行坐标
        self.Z0_c = 0   # 当前位置的列坐标
        self.path = self.__make_matrix(self.n * 2, 0)    # 路径矩阵
        self.marked = self.__make_matrix(self.n, 0)      # 标记矩阵

        # 定义算法步骤
        steps = {
            1: self.__step1,
            2: self.__step2,
            3: self.__step3,
            4: self.__step4,
            5: self.__step5,
            6: self.__step6
        }

        # 主循环
        while not done:
            try:
                func = steps[step]
                step = func()
            except KeyError:
                done = True

In [9]:
    def __copy_matrix(self, matrix: Matrix) -> Matrix:
        """Return an exact copy of the supplied matrix"""
        return copy.deepcopy(matrix)

In [12]:
    def __make_matrix(self, n: int, val: AnyNum) -> Matrix:

        matrix = []
        for i in range(n):
            # [val for j in range(n)] 创建一个长度为n的列表，每个元素都是val
            matrix += [[val for j in range(n)]]
        return matrix

In [13]:
def __step1(self) -> int:
    """
    匈牙利算法的第一步：行归约
    对矩阵的每一行进行处理，找到每行的最小元素，并将该行的所有元素减去这个最小值

    返回值:
    int: 返回2，表示下一步应该执行步骤2

    异常:
    UnsolvableMatrix: 当某一行全部是DISALLOWED值时抛出，表示矩阵无解
    """
    # 获取成本矩阵和矩阵维度
    C = self.C
    n = self.n

    # 遍历矩阵的每一行
    for i in range(n):
        # 找出当前行中所有非DISALLOWED的值
        # DISALLOWED表示该位置不允许分配
        vals = [x for x in self.C[i] if x is not DISALLOWED]

        # 如果当前行没有有效值（全是DISALLOWED）
        if len(vals) == 0:
            # 抛出异常，表示矩阵无解
            raise UnsolveableMatrix(
                "Row {0} is entirely DISALLOWED.".format(i)
            )

        # 获取当前行中的最小值
        minval = min(vals)

        # 遍历当前行的每个元素
        for j in range(n):
            # 如果当前元素不是DISALLOWED
            if self.C[i][j] is not DISALLOWED:
                # 将当前元素减去该行的最小值
                # 这样可以确保每行至少有一个0
                self.C[i][j] -= minval

    # 返回2，表示接下来执行步骤2
    return 2

"""
使用示例：
假设初始矩阵为：
[
    [4, 2, 3],
    [1, 5, 3],
    [2, 4, 2]
]

执行步骤1后的矩阵将变为：
[
    [2, 0, 1],  # 减去最小值2
    [0, 4, 2],  # 减去最小值1
    [0, 2, 0]   # 减去最小值2
]

这样处理的目的是：
1. 确保每行至少有一个0
2. 为寻找最优匹配做准备
3. 保持相对成本差异不变
"""

'\n使用示例：\n假设初始矩阵为：\n[\n    [4, 2, 3],\n    [1, 5, 3],\n    [2, 4, 2]\n]\n\n执行步骤1后的矩阵将变为：\n[\n    [2, 0, 1],  # 减去最小值2\n    [0, 4, 2],  # 减去最小值1\n    [0, 2, 0]   # 减去最小值2\n]\n\n这样处理的目的是：\n1. 确保每行至少有一个0\n2. 为寻找最优匹配做准备\n3. 保持相对成本差异不变\n'