## 2. 对偶单纯形法

本notebook对应[这一篇blog](https://hanqiu92.github.io/blogs/2020/LP_dual_simplex_solver_2_202003/)中的内容，主要包括对对偶单纯形法（DS）的基本实现。

首先是一些准备工作：导入相关的计算工具包、从文件util_lec_2.py中读取上一个notebook中完成的类：问题类和解类。

In [1]:
import numpy as np
import scipy.sparse as sp
import scipy.sparse.linalg as splinalg
from scipy.sparse import csc_matrix,csr_matrix,coo_matrix
from util_lec_2 import *

### DS单步迭代的实现

在这一小节中，我们根据blog中介绍的算法流程来实现DS的单步迭代。

下面的代码块中包含了以下模块：
* SolveStatus类: Enum类，对迭代步的求解状态进行管理
* Basis: 对基$(B,L,U)$的下标元素进行封装管理，提高代码可读性
* step: DS单步迭代流程
* ratio_test: 迭代步中的ratio test部分，由于代码量较多，单独形成一个函数

具体的实现逻辑可以参考下面代码中的注释。需要特别注意的是，在下面的代码中，一些变量的命名与blog中存在区别。具体来说，为了方便索引，我们用idxI,idxJ,idxBI,idxNJ来指代$i_B$,$j_N$,$i$,$j$；这也可以理解为我们考虑基于基$(B,L,U)$中顺序的下标$i$,$j$,$B_i$,$N_j$。为了符合Py的命名规范，变量lam_grad,s_grad,xB_grad分别对应$\delta \lambda$,$\delta s$,$\delta x_B$，即单位变动值；而包含delta的相关变量则对应$\Delta$，即实际变动值。

In [2]:
@unique
class SolveStatus(Enum):
    '''
    求解状态，每步DS迭代后返回，用于判断后续动作
    '''
    ONGOING = 0 ## 继续迭代
    OPT = 1 ## 最优
    PRIMAL_INFEAS = 2 ## 原始不可行
    DUAL_INFEAS = 3 ## 对偶不可行
    OTHER = -2
    ERR = -1
    
class Basis(object):
    '''
    对B和N=L \cup U的元素进行管理
    '''
    def __init__(self,idxB,m):
        self.idxB = idxB.copy()
        self.boolN = np.ones((m,),dtype=bool)
        self.boolN[self.idxB] = False
        self.idxN = np.where(self.boolN)[0]

def step(problem,sol,basis):
    '''
    进行一次对偶单纯形法的迭代
    '''
    ## step 0: 做一些计算准备
    n,m = problem.A.shape
    B = problem.A[:,basis.idxB] ## A_B
    
    ## step 1: pricing, 选出离开下标idxBI = idxB[idxI], 并计算相应对偶变量的单位变化量
    idxB = basis.idxB
    xB = sol.x[idxB]
    primal_inf = np.minimum(xB - problem.l[idxB],0) + np.maximum(xB - problem.u[idxB],0)
    bool_primal_inf = (primal_inf > problem.primal_upper_bound_tol[idxB]) | \
                      (primal_inf < problem.primal_lower_bound_tol[idxB])
    if not np.any(bool_primal_inf):
        ## 原始解可行，因此达到最优
        return SolveStatus.OPT,problem,sol,basis

    ## 否则，根据DSE规则选取离开下标idxBI，并保存相应信息
    ## DSE weight_i = |A_B^{-T}e_i|_2^2        
    DSE_weights = np.zeros((n,))
    for idx in range(n):
        e = np.zeros((n,))
        e[idx] = 1
        DSE_weights[idx] = np.sum(np.square(splinalg.spsolve(B.T,e)))
    idxI = np.argmax(np.square(primal_inf)/DSE_weights)
    idxBI = idxB[idxI]
    primal_gap = primal_inf[idxI]
    dual_grad = abs(primal_gap) ## 原始变量的不可行程度正是对偶问题的梯度

    bool_to_lower_bound = sol.x[idxBI] <= problem.l[idxBI]
    direcDualI = 1 if bool_to_lower_bound else -1 ## 原始变量的移动方向
    
    ## 计算对偶变量的单位变化量
    sB_grad0 = np.zeros((n,))
    sB_grad0[idxI] = 1
    lam_grad0 = splinalg.spsolve(B.T,sB_grad0) ## A_B^{-T}e_i
    s_grad0 = problem.A.T.dot(lam_grad0) ## A^TA_B^{-T}e_i
    if direcDualI == -1:
        lam_grad = lam_grad0
        s_grad = -s_grad0
    else:
        lam_grad = -lam_grad0
        s_grad = s_grad0

    ## step 2: ratio test, 选出进入下标idxNJ = idxN[idxJ]
    status_inner,idxJ,idxNJ,alpha_dual,flip_list = ratio_test(problem,sol,basis,s_grad,dual_grad)
    if status_inner == SolveStatus.PRIMAL_INFEAS:
        return SolveStatus.PRIMAL_INFEAS,problem,sol,basis

    ## step 3: 更新结果
    ## 更新对偶变量
    sol.lam += alpha_dual * lam_grad
    sol.s += alpha_dual * s_grad
    
    ## 更新原始变量
    aNJ = problem.A[:,idxNJ] ## A_j
    xB_grad0 = splinalg.spsolve(B,aNJ) ## A_B^{-1}A_j
    xB_grad = -xB_grad0
    if len(flip_list) > 0:
        ## 对x_N进行翻转
        idx_flip_to_lower = flip_list[sol.sign[flip_list] == VarStatus.AT_UPPER_BOUND.value]
        idx_flip_to_upper = flip_list[sol.sign[flip_list] == VarStatus.AT_LOWER_BOUND.value]
        sol.x[idx_flip_to_lower] = problem.l[idx_flip_to_lower]
        sol.x[idx_flip_to_upper] = problem.u[idx_flip_to_upper]
        sol.sign[idx_flip_to_lower] = VarStatus.AT_LOWER_BOUND.value
        sol.sign[idx_flip_to_upper] = VarStatus.AT_UPPER_BOUND.value
        ## 根据翻转的x_N，更新x_B
        delta_x_flipped = np.zeros((m,))
        delta_x_flipped[idx_flip_to_lower] = -problem.bounds_gap[idx_flip_to_lower]
        delta_x_flipped[idx_flip_to_upper] = problem.bounds_gap[idx_flip_to_upper]
        delta_b_flipped = problem.A.dot(delta_x_flipped)
        delta_xB = - splinalg.spsolve(B,delta_b_flipped)
        sol.x[basis.idxB] += delta_xB
        delta_xBI = delta_xB[idxI]
    else:
        delta_xBI = 0

    ## 然后，计算原始步长，并更新x_j和x_B
    alpha_primal = (-primal_gap - delta_xBI) / xB_grad[idxI]
    sol.x[basis.idxB] += alpha_primal * xB_grad
    sol.x[idxBI] = problem.l[idxBI] if bool_to_lower_bound else problem.u[idxBI]
    sol.sign[idxBI] = VarStatus.AT_LOWER_BOUND.value if bool_to_lower_bound else VarStatus.AT_UPPER_BOUND.value
    sol.x[idxNJ] += alpha_primal
    sol.sign[idxNJ] = VarStatus.OTHER.value ## 进入B

    ## 更新基
    basis.idxB[idxI] = idxNJ
    basis.idxN[idxJ] = idxBI
    basis.boolN[idxBI] = True
    basis.boolN[idxNJ] = False
    sol.s[basis.idxB] = 0

    return SolveStatus.ONGOING,problem,sol,basis

def ratio_test(problem,sol,basis,s_grad,dual_grad):
    idxN,boolN = basis.idxN,basis.boolN 
    
    ## 统计可能会约束对偶步长的对偶变量的下标
    idxL_bounded = np.where((sol.sign == VarStatus.AT_LOWER_BOUND.value) & (s_grad < 0))[0] ## 处于下界，要求s>=0
    idxU_bounded = np.where((sol.sign == VarStatus.AT_UPPER_BOUND.value) & (s_grad > 0))[0] ## 处于上界，要求s<=0
    idxF = np.where((sol.sign == VarStatus.OTHER.value) & boolN)[0] ## free变量，要求s==0
    idxF_bounded = idxF[(np.abs(s_grad[idxF]) > 0)]
    elems_bounded = np.concatenate([idxL_bounded,idxU_bounded,idxF_bounded])

    if len(elems_bounded) == 0:
        ## 没有变量可以约束对偶步长，因此对偶步长可以无限大，从而对偶目标无界/原始解不可行
        status,idxJ,idxNJ,alpha_dual,flip_list = SolveStatus.PRIMAL_INFEAS,-1,-1,0,[]
        return status,idxJ,idxNJ,alpha_dual,flip_list

    ## 针对可能约束对偶步长的变量，进一步判断其是否可以做bound flip；如果可行，则进行相关计算
    bool_not_both_bounded = problem.bool_not_both_bounded[elems_bounded]
    s_grad_bounded = s_grad[elems_bounded]
    ## 计算bound filp对对偶梯度的影响
    s_grad_abs_bounded = np.abs(s_grad_bounded)
    dual_grad_delta_flipped = problem.bounds_gap[elems_bounded] * s_grad_abs_bounded
    if (np.sum(dual_grad_delta_flipped) <= dual_grad) and (not np.any(bool_not_both_bounded)):
        ## 如果所有约束变量都可以做bound flip，而且flip完对偶的梯度仍是正数，则对偶目标无界/原始解不可行
        status,idxJ,idxNJ,delta_dual,flip_list = SolveStatus.PRIMAL_INFEAS,-1,-1,0,[]
        return status,idxJ,idxNJ,delta_dual,flip_list

    ## 计算每个约束变量对应bound flip的临界对偶步长
    alpha_dual_allowed = - sol.s[elems_bounded] / s_grad_bounded
    
    ## 通过不可flip的变量进一步筛选
    if np.any(bool_not_both_bounded):
        alpha_dual_ub = np.min(alpha_dual_allowed[bool_not_both_bounded])
        ## 找到对应最小步长的不可flip变量之前的所有变量
        idxs_remain = np.where(alpha_dual_allowed <= alpha_dual_ub + DUAL_TOL)[0] 
    else:
        ## 考虑全部变量
        idxs_remain = np.arange(len(elems_bounded),dtype=int)

    ## 通过alpha_dual_allowed对各变量进行排序
    idxs_remain = idxs_remain[np.argsort(alpha_dual_allowed[idxs_remain])]
    ## 做线搜索，寻找临界的变量
    dual_grad_remain = dual_grad
    for idx_pivot in idxs_remain:
        dual_grad_remain -= dual_grad_delta_flipped[idx_pivot]
        if dual_grad_remain < 0:
            break
    
    ## 整理结果
    idxNJ = elems_bounded[idx_pivot]
    alpha_dual = alpha_dual_allowed[idx_pivot]
    bool_flip = (alpha_dual_allowed < alpha_dual)
    flip_list = elems_bounded[bool_flip]
    idxJ = np.where(idxN == idxNJ)[0]
    return SolveStatus.ONGOING,idxJ,idxNJ,alpha_dual,flip_list

下面，我们实现一个简单的求解流程，来对上述代码进行测试。

In [3]:
from util import *
import time
import traceback
import glob
np.set_printoptions(suppress=False,precision=4)

def test_w_bound(problem_size,solve_func):    
    evaluator = Evaluator()
    
    fnames = sorted(glob.glob('netlib/*.SIF'))
    for fname in fnames:
        model_name = fname.split('/')[-1].split('.')[0]

        A_dict,b_dict,sense_dict,c_dict,l_dict,u_dict,m,n,row_key,col_key = read_mps(fname)
        A,b,sense,c,l,u = dicts_to_computable(A_dict,b_dict,sense_dict,c_dict,l_dict,u_dict,m,n)

        ## 为了保证对偶可行，将无界改为比较大的上下界
        bound = 1e4
        if np.all(l <= -INF):
            l_new = np.full((m,),-bound)
        else:
            l_new = np.maximum(l,np.min(l[l>-INF])-bound)
        if np.all(u >= INF):
            u_new = np.full((m,),bound)
        else:
            u_new = np.minimum(u,np.max(u[u<INF])+bound)
        bool_all_eq = np.all(sense == 0)

        ## 筛选比较小的问题来进行测试
        if A.nnz < problem_size and bool_all_eq and np.all(l_new <= u_new):
            evaluator.reset(A,b,sense,c,l,u)
            print('Problem name: {}, size:({},{}).'.format(model_name,A.shape,A.nnz))

            l_dict_new = dict([(i,l_new[i]) for i in range(m)])
            u_dict_new = dict([(i,u_new[i]) for i in range(m)])
            try:
                tt = time.time()
                x_pulp,lam_pulp,status_pulp = solve_pulp(A_dict,b_dict,sense_dict,c_dict,l_dict_new,u_dict_new,
                                                         m,n,msg=1)
                time_pulp = time.time() - tt
                print(f"Eval: {evaluator.eval_str(x_pulp)} Status: {status_pulp}. Elapsed time: {time_pulp:.3f}.\n")

                tt = time.time()
                status_ds,sol_ds,basis_ds = solve_func(A,b,sense,c,l_new,u_new)
                x_ds = sol_ds.x[:m]
                time_ds = time.time() - tt
                print(f"Eval: {evaluator.eval_str(x_ds)} Status: {status_ds}. Elapsed time: {time_ds:.3f}.\n")

            except Exception as e:
                print(repr(e))
                print(traceback.print_exc())

In [4]:
def solve(A_raw,b_raw,sense_raw,c_raw,l_raw,u_raw):
    '''
    基于step迭代步的DS求解流程
    '''
    ## 读取数据
    A,b,sense,c,l,u = A_raw.copy(),b_raw.copy(),sense_raw.copy(),c_raw.copy(),l_raw.copy(),u_raw.copy()
    n,m = A.shape
    ## 加上逻辑变量，保证A是满秩的；否则会出现数值问题
    c = np.concatenate([c,np.zeros((n,))])
    l = np.concatenate([l,np.zeros((n,))])
    u = np.concatenate([u,np.zeros((n,))])
    A = sp.hstack([A,sp.eye(n)],format='csc')
    n,m = A.shape

    ## 初始化
    problem = Problem(A,b,c,l,u)
    idxB = np.arange(m-n,m,1,dtype=int)
    basis = Basis(idxB,m)
    
    B = problem.A[:,idxB]
    lam = splinalg.spsolve(B.T,problem.c[idxB])
    s = problem.c - problem.A.T.dot(lam)
    s[idxB] = 0

    boolN = basis.boolN
    sign = np.full((m,),VarStatus.OTHER.value,dtype=int)
    sign[boolN & (s < 0)] = VarStatus.AT_UPPER_BOUND.value
    sign[boolN & (s >= 0)] = VarStatus.AT_LOWER_BOUND.value

    x = np.zeros((m,))
    x[sign == VarStatus.AT_LOWER_BOUND.value] = problem.l[sign == VarStatus.AT_LOWER_BOUND.value]
    x[sign == VarStatus.AT_UPPER_BOUND.value] = problem.u[sign == VarStatus.AT_UPPER_BOUND.value]
    x[idxB] = splinalg.spsolve(B,problem.b - problem.A.dot(x))
    sol = Solution(x,lam,s,sign)

    ## 开始DS迭代流程
    start_time = time.time()
    count = 0
    while True:
        status,problem,sol,basis = step(problem,sol,basis) ## 做一步迭代
        count += 1
        header = '{} '.format(count)
        
        ## 每隔一定迭代步数观察效果
        if ((count % 100 == 0 and count > 0) and (status == SolveStatus.ONGOING)):
            problem.check_sol_status(sol,print_func=print,print_header=header)

        ## 如果最优或者无解，abort
        if status != SolveStatus.ONGOING:
            problem.check_sol_status(sol,print_func=print,print_header=header)
            return status,sol,basis

        ## 限制迭代时长和次数
        if time.time() - start_time > 9.0e2 or count > 1e4:
            print('out of time / iterations.')
            problem.check_sol_status(sol,print_func=print,print_header=header)
            return SolveStatus.OTHER,sol,basis

In [5]:
test_w_bound(5000,solve)

Problem name: BANDM, size:((305, 472),2494).
Eval: con inf=9.5659e-05,var inf=0.0000e+00,obj=-1.5863e+02. Status: 1. Elapsed time: 0.051.

100   Obj Primal -1.2234e+06 Dual -1.2234e+06  Primal Inf 2.4241e+06 (230)
162   Obj Primal -5.7584e+05 Dual -5.7584e+05  Primal Inf 2.7332e+06 (222)
Eval: con inf=2.2583e+06,var inf=1.6345e+05,obj=-5.7584e+05. Status: SolveStatus.PRIMAL_INFEAS. Elapsed time: 20.896.

Problem name: GROW7, size:((140, 301),2612).
Eval: con inf=2.0063e-01,var inf=2.9632e-10,obj=-4.7788e+07. Status: 1. Elapsed time: 0.039.

100   Obj Primal -5.3512e+07 Dual -5.3512e+07  Primal Inf 1.1641e+07 (99)
200   Obj Primal -4.9523e+07 Dual -4.9523e+07  Primal Inf 2.2535e+06 (76)
300   Obj Primal -4.8176e+07 Dual -4.8176e+07  Primal Inf 4.1330e+06 (70)
400   Obj Primal -4.7905e+07 Dual -4.7905e+07  Primal Inf 1.3303e+07 (67)
471   Obj Primal -4.7788e+07 Dual -4.7788e+07
Eval: con inf=9.3237e-08,var inf=0.0000e+00,obj=-4.7788e+07. Status: SolveStatus.OPT. Elapsed time: 36.109.

Pr

通过上述测试我们可以看出，虽然本小节中的DS实现能够执行，但存在两个比较严重的问题：
1. 结果可能不正确（对于问题BANDM）；
2. 计算速度慢。

在下一小节中，我们先尝试解决第二个问题。

### 线性方程求解效率优化

通过应用%prun、%lprun等profile工具可以发现，在前一小节的实现中，每步DS迭代中计算时间开销最大的部分是线性方程$Ax=y$的求解。因此，在这一小节中，我们将介绍如何利用DS算法的性质来优化这部分计算的效率，并通过测试来说明这一效率优化的重要性。

对于线性方程$Ax=y$，一种常用的做法是先对矩阵$A$做LU分解（其中，$L$是一个下三角矩阵，而$U$是一个上三角矩阵），然后计算$x=U^{-1}L^{-1}y$。（前面用到的scipy.sparse.linalg.spsolve函数就采用了这种做法。）这种做法的好处在于，$L$和$U$都是三角阵，因此$L^{-1}y$和$U^{-1}z$可以高效求解；而且，如果矩阵$A$保持不变的话，LU分解的结果可以复用用于多组线性方程的求解中。然而，LU分解的计算开销是比较大的。注意到DS算法过程具有以下两个性质：
1. 每步迭代中我们需要对同一个矩阵$A_B$求解多个线性方程$A_Bx=y$和$A_B^Tx=y$；
2. 每步迭代后$A_B$的变化很小。

因此，如果在线性方程求解中能利用已有的LU分解结果和每步迭代中$A_B$变化的信息，而不是简单地重新做LU分解，则应能提高计算效率。具体来说，在每步迭代后，$A_B$的第$i_B$列（也是$A$的第$i$列）被$A$的第$j$列替代；用矩阵计算的方法表示，则有
$$
\begin{aligned}
A_B & \gets A_B - A_Be_{i_B}e_i^T + A_{j}e_i^T \\
& = A_B(I + (\delta x_B-e_i)e_i^T),
\end{aligned}\tag{1}\label{eq:forward_update}
$$ 其中$e_i$是$e_{i_B}$的简写（与blog中保持一致）。换句话说，矩阵$A_B$的更新可以看成是乘上一个只有第$i_B$列非$e_i$的矩阵。这一形式的好处是，$A_B$的逆的更新也可以很方便的表示：
$$
\begin{aligned}
A_B^{-1} & \gets (I + (\delta x_B-e_i)e_i^T)^{-1} A_B^{-1} \\
& = (I + \eta e_i^T) A_B^{-1},
\end{aligned}\tag{2}\label{eq:backward_update}
$$ 其中$\eta_{i_B}=1/(\delta x_B)_{i_B}-1; \eta_l = -(\delta x_B)_l/(\delta x_B)_{i_B}, \ \forall l\neq i_B$。基于这种表示，在迭代步$k$进行线性方程$A_B^{(k)}x=y$的求解时，相比于直接解$x=(A_B^{(k)})^{-1}y$，我们可以解$x=\prod_{k'=k-1}^{1}E_{k'}(A_B^{(1)})^{-1}y$，其中$E_{k'} = (I + \eta^{(k')}(e_i^{(k')})^T)$。同理，对于问题$(A_B^{(k)})^Tx=y$，我们可以解$x=(A_B^{(1)})^{-T}\prod_{k'=1}^{k-1}E_{k'}^Ty$。这种方法虽然在每次求解中增加了矩阵$E_{k}$乘积的计算量，但省下了LU分解的计算开销，从整体上看是更高效的。

上述思路在文献中称为product form of inverse（PFI），是一种经典做法；但目前主流求解器已经不采用这个思路了，而是使用一种称为LU update的思路：在迭代过程中直接对LU分解进行更新。LU update的优点在于能够更好地利用矩阵的稀疏性降低计算成本，而且每次求解的计算量是一定的；相比之下，PFI每次求解的计算量随着迭代步$k$的增长而增长，而且在矩阵$E_{k}$的乘积计算中无法利用$A$的稀疏性。另外，在PFI中，如果在一个迭代步中$(\delta x_B)_{i_B}$很小，那么数值误差会导致$\eta$很不稳定；而LU update可以更好地识别并解决这个问题。不过，LU update是一种局限于单纯形法的技术，因此目前通用的线性方程求解工具包中普遍没有支持这种技术；而直接用脚本语言进行实现无法有效利用其高效率，因此在本系列中暂不做尝试。A. Koberstein在其[博士论文](https://www.researchgate.net/profile/Achim_Koberstein/publication/35632487_The_dual_simplex_method_techniques_for_a_fast_and_stable_implementation_Elektronische_Ressource/links/0a85e52ef5144e9031000000/The-dual-simplex-method-techniques-for-a-fast-and-stable-implementation-Elektronische-Ressource.pdf)的第5章中对LU update进行了简单介绍，有兴趣的读者可以进行参考。一些其他的参考资料包括：
* LUSOL: Sparse LU for Ax = b. [[link1]](https://stanford.edu/group/SOL/software/lusol/) [[link2]](https://github.com/nwh/lusol)
* Hyper-sparsity: Hall, J. A., & McKinnon, K. I. (2005). Hyper-sparsity in the revised simplex method and how to exploit it. Computational Optimization and Applications, 32(3), 259-283. [[link]](http://www.optimization-online.org/DB_FILE/2000/11/234.pdf)

对于DSE，我们同样可以基于迭代公式(1-2)进行计算上的优化。在DSE的过程中，我们需要对$\forall l\in\{1,\cdots,n\}$计算$\beta_l = |\rho_l|_2^2=\rho_l^{T}\rho_l$，其中$\rho_l$表示$A_B^{-T}e_l$；注意到$\delta\lambda_0=A_B^{-T}e_i=\rho_{i_B}$。根据公式(1-2)，
$$
\rho_l \gets A_B^{-T} (I + e_i\eta^T) e_l  = \rho_l + \eta_l \rho_{i_B} = \rho_l + \eta_l \delta\lambda_0 
$$

因此我们可以推得以下更新方式
$$
\begin{aligned}
\beta_{i_B} & \gets \frac{1}{(\delta x_B)_{i_B}^2}\beta_{i_B}; \\
\beta_l & \gets \beta_l + \frac{(\delta x_B)_l^2}{(\delta x_B)_{i_B}^2} \beta_i - 2\frac{(\delta x_B)_l}{(\delta x_B)_{i_B}}\rho_l^T \delta\lambda_0  \\
& = \beta_l + \frac{(\delta x_B)_l^2}{(\delta x_B)_{i_B}^2} \beta_i - 2\frac{(\delta x_B)_l}{(\delta x_B)_{i_B}}\tau_l, \ \forall l \neq i_B, \\
\end{aligned}\tag{3}
$$

其中$\tau=A_B^{-1}\delta \lambda_0$可以通过线性方程求解来计算。

下面，我们对上述思路给出代码实现。具体来说，我们考虑以下实现方案：
1. Basis类在管理基$(B,L,U)$的下标元素的基础上，进一步负责所有的线性代数相关的计算（相乘/方程求解/LU分解更新）。
2. 我们引入DualSimplex类对上面DS算法的各部分（step、ratio_test、solve）进行封装，从而实现更好的对迭代步的管理（目前，仅考虑重新进行LU分解的触发）。

In [6]:
import numpy as np
import scipy.sparse as sp
import scipy.sparse.linalg as splinalg
from scipy.sparse import csc_matrix,csr_matrix,coo_matrix
from scikits import umfpack
from scikits.umfpack import UMFPACK_A,UMFPACK_At,UMFPACK_Aat

@unique
class LinearSolver(Enum):
    '''
    用于对线性方程求解/LU分解工具的选用，默认是UMFPACK；
    计算实验表明UMFPACK比SUPERLU快20%以上
    '''
    UMFPACK = 0
    SUPERLU = 1

LINEAR_SOLVER_TYPE = LinearSolver.UMFPACK

class BasisPlus(object):
    '''
    在上面Basis类的基础上，进一步引入线性代数相关的计算（相乘/方程求解/LU分解更新）
    '''
    def __init__(self,A,AT=None):
        self.A = A
        if AT is None:
            self.AT = self.A.T
        else:
            self.AT = AT
        self.n,self.m = self.A.shape

        self.idxB,self.idxN,self.boolN = None,None,None
        self.DSE_weights = None ## DSE权重
        self.invB = None ## LU 分解结果
        self.etas = [] ## PFI的连乘部分
        self.eta_count = 0

    def copy(self):
        basis_new = Basis(self.A,self.AT)
        basis_new.invB = self.invB
        basis_new.eta_count = self.eta_count
        basis_new.idxB = self.idxB.copy()
        basis_new.idxN = self.idxN.copy()
        basis_new.boolN = self.boolN.copy()
        basis_new.DSE_weights = self.DSE_weights.copy()
        basis_new.etas = self.etas.copy()
        return basis_new
    
    def reset_basis_idx(self,idxB):
        self.idxB = idxB.copy()
        self.boolN = np.ones((self.m,),dtype=bool)
        self.boolN[self.idxB] = False
        self.idxN = np.where(self.boolN)[0]
        self.lu_factorize()

    def lu_update(self,eta):
        ## 在PFI中，只需要增加\eta向量和对应的位置i_B即可
        self.etas += [eta]
        self.eta_count += 1

    def lu_factorize(self):
        ## 重新做LU分解
        self.B = self.A[:,self.idxB]
        self.etas = []
        self.eta_count = 0

        if LINEAR_SOLVER_TYPE == LinearSolver.SUPERLU:
            self.invB = splinalg.splu(self.B)
        else:
            self.invB = umfpack.splu(self.B)

    def get_col(self,idx):
        ## 对从矩阵A中获取列idx的封装
        ## 需要A是CSC格式的
        idx_start,idx_end = self.A.indptr[idx],self.A.indptr[idx+1]
        data_,row_ = self.A.data[idx_start:idx_end],self.A.indices[idx_start:idx_end]
        col = np.zeros((self.n,))
        col[row_] = data_
        return col

    def get_elem_vec(self,idx,if_transpose=False):
        ## 对获取单位向量e的封装
        if if_transpose:
            e = np.zeros((self.n,))
        else:
            e = np.zeros((self.m,))
        e[idx] = 1
        return e

    def solve(self,y,if_transpose=False):
        ## 基于PFI的线性方程求解
        if if_transpose:
            ## A_B^T x = y
            y_ = y.copy()
            for eta in self.etas[::-1]:
                y_[eta[0]] += np.dot(eta[1],y_)
            if LINEAR_SOLVER_TYPE == LinearSolver.SUPERLU:
                x = self.invB.solve(y_,trans='T')
            else:
                x = self.invB.umf.solve(UMFPACK_Aat, self.invB._A, y_, autoTranspose=True)
        else:
            ## A_B x = y
            if LINEAR_SOLVER_TYPE == LinearSolver.SUPERLU:
                x = self.invB.solve(y)
            else:
                x = self.invB.umf.solve(UMFPACK_A, self.invB._A, y, autoTranspose=True)
            for eta in self.etas:
                x += x[eta[0]] * eta[1]
        return x

    def dot(self,x,if_transpose=False):
        ## 矩阵与向量的相乘
        if if_transpose:
            ## y = A^T x
            return self.AT._mul_vector(x)
        else:
            ## y = A x
            return self.A._mul_vector(x)

    def get_DSE_weight(self,idx):
        ## 计算idx行的DSE权重|A_B^{-T} e_{idx}|_2^2
        e = self.get_elem_vec(idx,if_transpose=True)
        return np.sum(np.square(self.solve(e,if_transpose=True)))

    def init_DSE_weights(self):
        self.DSE_weights = np.ones((self.n,))

    def reset_DSE_weights(self):
        ## 重新计算DSE权重
        self.DSE_weights = np.array([self.get_DSE_weight(i) for i in range(self.n)])

    def update_DSE_weights(self,idxI,xB_grad0,tau,betaI0):
        ## 通过迭代的方式更新DSE权重，参考公式(3)
        alpha_j = xB_grad0[idxI]
        betaI = betaI0 / alpha_j / alpha_j
        self.DSE_weights += xB_grad0 * (xB_grad0 * betaI - 2 / alpha_j * tau)
        self.DSE_weights[idxI] = betaI
        self.DSE_weights = np.maximum(self.DSE_weights,1e-4)

    def get_DSE_weights(self):
        return self.DSE_weights

In [7]:
class DualSimplexSolver(object):
    def __init__(self):
        ## 保存迭代过程中的信息，用于进行控制流处理
        self.global_info = {'count':0,'start_time':time.time()}

    def _pricing(self,problem,sol,basis):
        idxB = basis.idxB
        xB = sol.x[idxB]
        primal_inf = np.minimum(xB - problem.l[idxB],0) + np.maximum(xB - problem.u[idxB],0)
        bool_primal_inf = (primal_inf > problem.primal_upper_bound_tol[idxB]) | \
                          (primal_inf < problem.primal_lower_bound_tol[idxB])

        if not np.any(bool_primal_inf):
            ## 原始解可行，因此达到最优
            return SolveStatus.OPT,-1,-1,0

        ## 否则，根据DSE规则选取离开下标idxBI，并保存相应信息
        ## DSE weight已经保存在basis中
        idxI = np.argmax(np.square(primal_inf)/basis.DSE_weights)
        idxBI = idxB[idxI]
        primal_gap = primal_inf[idxI]
        return SolveStatus.ONGOING,idxI,idxBI,primal_gap

    def _ratio_test(self,problem,sol,basis,s_grad,dual_grad):
        return ratio_test(problem,sol,basis,s_grad,dual_grad)
    
    def _step(self,problem,sol,basis):
        count = self.global_info.get('count',0)
        header = '{} '.format(count)

        ## step 1: pricing, 选出离开下标idxBI = idxB[idxI], 并计算相应对偶变量的单位变化量
        status_inner,idxI,idxBI,primal_gap = self._pricing(problem,sol,basis)
        if status_inner == SolveStatus.OPT:
            return SolveStatus.OPT,problem,sol,basis
        dual_grad = abs(primal_gap) ## 原始变量的不可行程度正是对偶问题的梯度

        bool_to_lower_bound = sol.x[idxBI] <= problem.l[idxBI]
        direcDualI = 1 if bool_to_lower_bound else -1 ## 原始变量的移动方向
        
        ## 计算对偶变量的单位变化量
        sB_grad0 = basis.get_elem_vec(idxI,if_transpose=True) ## A_B^{-T}e_I
        lam_grad0 = basis.solve(sB_grad0,if_transpose=True) ## A_B^{-T}e_I
        s_grad0 = basis.dot(lam_grad0,if_transpose=True) ## A^TA_B^{-T}e_I
        if direcDualI == -1:
            lam_grad = lam_grad0
            s_grad = -s_grad0
        else:
            lam_grad = -lam_grad0
            s_grad = s_grad0

        ## step 2: ratio test, 选出进入下标idxNJ = idxN[idxJ]
        status_inner,idxJ,idxNJ,alpha_dual,flip_list = self._ratio_test(problem,sol,basis,s_grad,dual_grad)
        if status_inner == SolveStatus.PRIMAL_INFEAS:
            return SolveStatus.PRIMAL_INFEAS,problem,sol,basis

        ## step 3: 更新结果
        
        aNJ = basis.get_col(idxNJ) ## A_j
        xB_grad0 = basis.solve(aNJ,if_transpose=False) ## A_B^{-1}A_j
        xB_grad = - xB_grad0
        betaI = np.dot(lam_grad0,lam_grad0)
        tau = basis.solve(lam_grad0,if_transpose=False)
        
        ## 校核数值稳定性，在这一个notebook中只做评估而不进行处理
        if True:
            ## 校核通过\delta s和\delta x_B计算得到的alpha = e_I^T A_B^{-1} a_{NJ}的一致性
            err_pivot = s_grad0[idxNJ] + xB_grad[idxI]
            if abs(err_pivot) > PRIMAL_TOL * (1 + abs(xB_grad[idxI])):
                print('{}  WARN err FTRAN/BTRAN pivot consistency {:.4e}.'.format(header,err_pivot))
            ## 校核DSE权重的准确性
            err_dse = betaI - basis.DSE_weights[idxI]
            if abs(err_dse) > PIVOT_TOL:
                print('{}  WARN err DSE accuracy {:.4e}.'.format(header,err_dse))
        
        ## 更新对偶变量
        sol.lam += alpha_dual * lam_grad
        sol.s += alpha_dual * s_grad

        ## 更新原始变量  
        if len(flip_list) > 0:
            ## 对x_N进行翻转
            idx_flip_to_lower = flip_list[sol.sign[flip_list] == VarStatus.AT_UPPER_BOUND.value]
            idx_flip_to_upper = flip_list[sol.sign[flip_list] == VarStatus.AT_LOWER_BOUND.value]
            sol.x[idx_flip_to_lower] = problem.l[idx_flip_to_lower]
            sol.x[idx_flip_to_upper] = problem.u[idx_flip_to_upper]
            sol.sign[idx_flip_to_lower] = VarStatus.AT_LOWER_BOUND.value
            sol.sign[idx_flip_to_upper] = VarStatus.AT_UPPER_BOUND.value
            ## 根据翻转的x_N，更新x_B
            delta_x_flipped = np.zeros((basis.m,))
            delta_x_flipped[idx_flip_to_lower] = -problem.bounds_gap[idx_flip_to_lower]
            delta_x_flipped[idx_flip_to_upper] = problem.bounds_gap[idx_flip_to_upper]
            delta_b_flipped = basis.dot(delta_x_flipped,if_transpose=False)
            delta_xB = - basis.solve(delta_b_flipped,if_transpose=False)
            sol.x[basis.idxB] += delta_xB
            delta_xBI = delta_xB[idxI]
        else:
            delta_xBI = 0

        ## 然后，计算原始步长，并更新x_j和x_B
        alpha_primal = (-primal_gap - delta_xBI) / xB_grad[idxI]
        sol.x[basis.idxB] += alpha_primal * xB_grad
        sol.x[idxBI] = problem.l[idxBI] if bool_to_lower_bound else problem.u[idxBI]
        sol.sign[idxBI] = VarStatus.AT_LOWER_BOUND.value if bool_to_lower_bound else VarStatus.AT_UPPER_BOUND.value
        sol.x[idxNJ] += alpha_primal
        sol.sign[idxNJ] = VarStatus.OTHER.value ## 进入B

        ## 更新基
        basis.idxB[idxI] = idxNJ
        basis.idxN[idxJ] = idxBI
        basis.boolN[idxBI] = True
        basis.boolN[idxNJ] = False
        ## 更新PFI和DSE信息
        eta_vec = -xB_grad0 / xB_grad0[idxI]
        eta_vec[idxI] += 1 / xB_grad0[idxI]
        eta = (idxI,eta_vec)
        basis.lu_update(eta=eta)
        basis.update_DSE_weights(idxI,xB_grad0,tau,betaI)        
        sol.s[basis.idxB] = 0

        return SolveStatus.ONGOING,problem,sol,basis


    def compute_sol_from_basis(self,problem,basis,sign=None):
        '''
        给定一组基，计算对应的解。如果sign没有给出，则这组基是狭义基，不在B中的元素的L/U属性将按照对偶变量s的符号给出
        '''
        idxB,boolN,m = basis.idxB,basis.boolN,basis.m
        
        ## A^T \lambda + s = c, s_B = 0
        lam = basis.solve(problem.c[idxB],if_transpose=True)
        s = problem.c - basis.dot(lam,if_transpose=True)
        s[idxB] = 0
        if sign is None:
            sign = VarStatus.OTHER.value * np.ones((m,),dtype=int)
            sign[boolN & (s < 0)] = VarStatus.AT_UPPER_BOUND.value
            sign[boolN & (s >= 0)] = VarStatus.AT_LOWER_BOUND.value

        ## A_B x_B + A_L x_L + A_U x_U = b, x_L = l_L, x_U = u_U
        x = np.zeros((m,))
        x[sign == VarStatus.AT_LOWER_BOUND.value] = problem.l[sign == VarStatus.AT_LOWER_BOUND.value]
        x[sign == VarStatus.AT_UPPER_BOUND.value] = problem.u[sign == VarStatus.AT_UPPER_BOUND.value]
        x[idxB] = basis.solve(problem.b - basis.dot(x,if_transpose=False),if_transpose=False)

        sol = Solution(x,lam,s,sign)
        return sol

    def refactorize(self,problem,sol,basis):
        '''
        重新做LU分解并计算解，降低数值误差
        '''
        try:
            basis.lu_factorize()
        except Exception as e:
            print(e)
            return problem,sol,basis
        if sol is not None:
            sol = self.compute_sol_from_basis(problem,basis,sign=sol.sign)
        else:
            sol = self.compute_sol_from_basis(problem,basis)
        return problem,sol,basis
    
    def _solve(self,problem,sol,basis):
        '''
        对迭代步之间进行管理
        '''
        count = self.global_info.get('count',0)
        start_time = self.global_info.get('start_time',time.time())
        while True:
            if count % 5000 == 0 and count > 0:
                print('resetting the DSE weights!')
                basis.reset_DSE_weights() ## DSE更新
            if basis.eta_count % 20 == 0 and count > 0:
                basis.lu_factorize() ## LU分解
                
            status,problem,sol,basis = self._step(problem,sol,basis) ## 做一步迭代
            count += 1
            self.global_info['count'] = count
            header = '{} '.format(count)

            ## 每隔一定迭代步数观察效果
            if ((count % 1000 == 0 and count > 0) and (status == SolveStatus.ONGOING)):
                problem.check_sol_status(sol,print_func=print,print_header=header)

            ## 如果最优或者无解，abort
            if status != SolveStatus.ONGOING:
                problem.check_sol_status(sol,print_func=print,print_header=header)
                return status,problem,sol,basis

            ## 限制迭代时长和次数
            if time.time() - start_time > 9.0e2 or count > 1e4:
                print('out of time / iterations.')
                problem.check_sol_status(sol,print_func=print,print_header=header)
                return SolveStatus.OTHER,problem,sol,basis
            
    def solve(self,A_raw,b_raw,sense_raw,c_raw,l_raw,u_raw):
        '''
        主求解入口
        '''
        self.global_info = {'count':0,'start_time':time.time()}

        ## 读取数据
        A,b,sense,c,l,u = A_raw.copy(),b_raw.copy(),sense_raw.copy(),c_raw.copy(),l_raw.copy(),u_raw.copy()
        n,m = A.shape
        ## 加上逻辑变量，保证A是满秩的；否则会出现数值问题
        c = np.concatenate([c,np.zeros((n,))])
        l = np.concatenate([l,np.zeros((n,))])
        u = np.concatenate([u,np.zeros((n,))])
        A = sp.hstack([A,sp.eye(n)],format='csc')

        ## 初始化
        problem = Problem(A,b,c,l,u)
        n,m = A.shape
        idxB = np.arange(m-n,m,1,dtype=int)
        basis = BasisPlus(A)
        basis.init_DSE_weights()
        basis.reset_basis_idx(idxB)
        problem,sol,basis = self.refactorize(problem,sol=None,basis=basis)

        ## 开始DS迭代流程
        status,problem,sol,basis = self._solve(problem,sol,basis)

        return status,sol,basis

下面，对新的代码实现进行校核。

In [8]:
ds = DualSimplexSolver()
# test_w_bound(5000,ds.solve)
test_w_bound(10000,ds.solve)

Problem name: BANDM, size:((305, 472),2494).
Eval: con inf=9.5659e-05,var inf=0.0000e+00,obj=-1.5863e+02. Status: 1. Elapsed time: 0.046.

162   Obj Primal -7.2056e+05 Dual -7.2056e+05  Primal Inf 2.1287e+06 (213)
Eval: con inf=1.7871e+06,var inf=2.6871e+05,obj=-7.2056e+05. Status: SolveStatus.PRIMAL_INFEAS. Elapsed time: 0.087.

Problem name: FIT1P, size:((627, 1677),9868).
Eval: con inf=1.9123e-04,var inf=0.0000e+00,obj=9.1464e+03. Status: 1. Elapsed time: 0.142.

764   WARN err DSE accuracy -9.7774e-05.
822   WARN err DSE accuracy -7.6640e-05.
872   Obj Primal 9.1464e+03 Dual 9.1464e+03
Eval: con inf=1.2612e-08,var inf=0.0000e+00,obj=9.1464e+03. Status: SolveStatus.OPT. Elapsed time: 0.612.

Problem name: GROW15, size:((300, 645),5620).
Eval: con inf=4.5275e-01,var inf=3.8815e-10,obj=-1.0687e+08. Status: 1. Elapsed time: 0.075.

1000   Obj Primal -1.0762e+08 Dual -1.0762e+08  Primal Inf 4.8591e+07 (147)
2000   Obj Primal -1.0687e+08 Dual -1.0687e+08  Primal Inf 1.6404e+07 (67)
2218 

  econd = 1.0 / self.info[UMFPACK_RCOND]


580   WARN err DSE accuracy -3.3283e+06.
586   WARN err DSE accuracy 2.4933e+01.
587   WARN err DSE accuracy -4.1321e+01.
589   WARN err DSE accuracy 2.9012e+02.
590   WARN err FTRAN/BTRAN pivot consistency 2.0068e+00.
590   WARN err DSE accuracy 4.7616e+01.
591   WARN err DSE accuracy 3.5245e+03.
592   WARN err FTRAN/BTRAN pivot consistency -5.8647e+00.
592   WARN err DSE accuracy 3.7629e+03.
593   WARN err DSE accuracy 1.4699e+04.
598   WARN err DSE accuracy -3.8176e+02.
599   WARN err FTRAN/BTRAN pivot consistency -6.5876e+01.
599   WARN err DSE accuracy 6.5005e+04.
601   Obj Primal 6.4000e+02 Dual 7.2000e+01  Primal Inf 3.5616e+19 (597)  Con Inf 1.7706e+04 (324)
Eval: con inf=1.8629e+19,var inf=1.5805e+19,obj=6.4000e+02. Status: SolveStatus.PRIMAL_INFEAS. Elapsed time: 0.405.

Problem name: SCSD1, size:((77, 760),2388).
Eval: con inf=1.9562e-08,var inf=0.0000e+00,obj=8.6667e+00. Status: 1. Elapsed time: 0.043.

119   Obj Primal 8.6667e+00 Dual 8.6667e+00
Eval: con inf=5.8984e-10,va

根据上面的测试结果可以看出，在优化线性代数相关计算后，DS算法的速度显著上升。然而，迭代步中的校核显示，这些优化可能会引发数值问题，从而导致计算结果不准确。在本系列的后续文章/代码中，我将进一步讨论解决这些新问题的方法。