In [1]:
import numpy as np
from typing import Dict, Tuple, List
from copy import copy
from collections import defaultdict
from heapq import *

In [2]:
class Network:
    def __init__(
        self,
        nodes: List[object],
        edges: List[Tuple[object, object]],
        capacities: List[int],
        costs: List[int],
        supplies: List[int] 
    ) -> None:
        
        M = 9999
        self.U = int(np.max(np.array(capacities)))
        
        self.E = [
            *edges,
            *[(i, -1) for i in nodes],
            *[(-2, i) for i in nodes],
            (-1, -2)
        ]
        self.V = [*nodes, -1, -2]
        self.u = {
            **dict(zip(edges, capacities)),
            **{(i, -1): M for i in nodes},
            **{(-2, i): M for i in nodes},
            (-1, -2): M
        }
            
        self.c = {
            **dict(zip(edges, costs)),
            **{(i, -1): 0 for i in nodes},
            **{(-2, i): 0 for i in nodes},
            (-1, -2): M
        }
        
        self.b = {**dict(zip(nodes, supplies)), -1: 0, -2: 0}

In [3]:
# Adapted from https://gist.github.com/kachayev/5990802


def dijkstra(
    s: object,
    t: object,
    adj: Dict[Tuple[object, object], int]
) -> Tuple[Dict[object, int], List[Tuple[object, object]]]:
    '''
    Computes the shortest path distances from [s] to all other nodes 
    and the shortest path from [s] to [t] on the graph with edges/weights 
    given by the entries of [adj]. Implementation of Dijkstra's shortest
    path algorithm for graphs with non-negative edge costs.
    
    Args:
        s: Start node
        t: End node (for path)
        adj: Dictionary containing edges and their respective weights, with
            adj[(i, j)] = w_{ij} for nodes i and j.
    '''
    def _format_path(lst):
        return [(lst[i], lst[i+1]) for i in range(len(lst) - 1)]
            
            
    # Generate underlying adjacency lists
    adjacency = defaultdict(list)
    for (i, j), c in adj.items():
        adjacency[i].append((j, c))

    # Initialize queue, distances
    queue, seen, distances = [(0, s, [])], set(), {s: 0}
    
    while queue:
        cost, v1, path = heappop(queue)
        if v1 not in seen:
            seen.add(v1)
            path = [*path, v1]
            if v1 == t: 
                out_path = _format_path(path)

            for v2, c in adjacency.get(v1, ()):
                if v2 in seen: 
                    continue
                prev = distances.get(v2, None)
                nxt = cost + c
                if prev is None or nxt < prev:
                    distances[v2] = nxt
                    heappush(queue, (nxt, v2, path))
    return distances, out_path

In [22]:
def reduced_cost(
    N: Network,
    u_f: Dict[Tuple[object, object], int],
    p: Dict[object, int]
) -> Dict[Tuple[object, object], int]:
    '''
    Computes reduced costs of the edges in the residual graph [u_f] with respect to edge
    costs [N.c] and node potentials [p].
    
    Args:
        N: Network representing the problem input
        u_f: Dictionary encoding the residual graph w.r.t the current flow
        p: Current node potentials
        
    Returns:
        A dictionary which gives the reduced cost for each edge (u, v) according to
        c_p[(u, v)] = c[(u, v)] + p[u] - p[v].
    '''
    reduced_costs = {}
    for (u, v) in u_f.keys():
        if (u, v) in N.c:
            reduced_costs[(u, v)] = N.c[(u, v)] + p[u] - p[v]
        else:
            reduced_costs[(u, v)] = -N.c[(v, u)] + p[u] - p[v]
    return reduced_costs
    
def excess_nodes(
    N: Network,
    f: Dict[Tuple[object, object], int],
    K: int
) -> Tuple[List, List]:
    '''
    Compute nodes in the network [N] where flow conservation is violated by at least [K] units
    for the flow [f]. 
    
    Args:
        N: Network object representing the problem input
        f: Potentially infeasible flow
        K: Scaling parameter
    
    Returns:
        A tuple consisting of a list of nodes where net flow in is greater than [K]
        and a list of nodes where the net flow in is less than -[K].
    '''
    def _excess(N, f) -> Dict[object, int]:
        # Initialize excess to be supply
        excess = {v: N.b[v] for v in N.V}
        for (u, v), val in f.items():
            excess[u] -= val
            excess[v] += val

        return excess
    
    e_f = _excess(N, f)
    S_f = [i for (i, val) in e_f.items() if val >= K]
    T_f = [i for (i, val) in e_f.items() if val <= -K]
    return S_f, T_f

def update_potentials(
    p: Dict[object, int],
    distances: Dict[object, int]
) -> None:
    '''
    Updates node potentials [p] with the shortest path distances in
    [distances] according to p[i] <- p[i] + distances[i].
    
    Args:
        p: Previous node potentials
        distances: Shortest path distance to each node in graph from a node with
            surplus above the current scaling threshold
        
    '''
    for i in p.keys():
        p[i] += distances[i]
                   
def saturate_edges(
    f: Dict[Tuple[object, object], int],
    u_f: Dict[Tuple[object, object], int],
    edges: List[Tuple[object, object]]
) -> None:
    '''
    Updates the flow [f] and residual graph [u_f] by saturating
    all edges in [edges].
    
    Args:
        u_f: The residual graph for the current flow
        f: The current flow
        edges: List of edges to saturate
        
    '''
    for (i, j) in edges:
        if (i, j) in f:
            f[(i, j)] = N.u[(i, j)]              # Saturate foward edge
            u_f[(i, j)] = 0                      # Zero forward residual edge
            u_f[(j, i)] = N.u[(i, j)]            # Saturate backward residual edge
        else:
            f[(i, j)] = 0                        # Zero forward edge
            u_f[(i, j)] = N.u[(j, i)]            # Saturate forward residual edge
            u_f[(j, i)] = 0                      # Zero backward residual edge
        
def saturate_neg_cost_admissible(
    c_p: Dict[Tuple[object, object], int],
    f: Dict[Tuple[object, object], int],
    u_f: Dict[Tuple[object, object], int],
    K: int
) -> None:
    '''
    Updates the current flow [f] and residual graph [u_f] by
    saturating all edges with residual capacity of at least [K]
    and negative reduced cost [c_p] to preserve invariants in the
    algorithm.
    
    Args:
        c_p: Current reduced costs
        u_f: The residual graph for the current flow
        f: The current flow
        K: Scaling parameter
    '''
    neg_cost_admissible = [
        (i, j)
        for (i, j), u in u_f.items() 
        if u >= K and c_p[(i, j)] < 0
    ]
    
    saturate_edges(f, u_f, neg_cost_admissible)
    
def augment_flow_along_path(
    P: List[Tuple[object, object]],
    f: Dict[Tuple[object, object], int],
    u_f: Dict[Tuple[object, object], int],
    K: int
) -> None:
    ''' 
    Updates the current flow [f] and residual graph [u_f] by
    pushing [K] units of flow along the directed path P.
    
    Args:
        P: Path of edges to push flow
        f: Current flow
        c_f: Current residual graph
        K: Scaling parameter
    
    '''
    for (i, j) in P:
        if (i, j) in f:
            f[(i, j)] += K
            u_f[(i, j)] -= K
            u_f[(j, i)] = u_f.get((j, i), 0) + K
        else:
            f[(j, i)] -= K
            u_f[(j, i)] -= K
            u_f[(i, j)] = u_f.get((i, j), 0) + K
    
def capacity_scaling(
    N: Network,
    p: Dict[object, int]
) -> Tuple[Dict[Tuple[object, object], int], Dict[object, int]]:
    '''
    Primal-dual algorithm for computing a minimum-cost flow for the 
    network [N] starting from dual-feasible node potentials [p].
    
    Args:
        N: Flow network encoding the problem input
        p: Initial node potentials for warm start
        
    Returns:
        Minimum cost flow and corresponding optimal node potentials.
    '''
    
    # Init zero flow and potentials
    f = {e: 0 for e in N.E}
    K = N.U
    u_f = copy(N.u)
    iters = 0
    
    while K >= 1:
        # Compute reduced costs w.r.t potentials p
        c_p = reduced_cost(N, u_f, p)

        # Saturate admissible edges with negative reduced cost
        saturate_neg_cost_admissible(c_p, f, u_f, K)
   
        # Compute new admissible edges, and surplus/deficit nodes above scaling threshold
        S_f, T_f = excess_nodes(N, f, K)  
        
        while len(S_f) > 0 and len(T_f) > 0:
            s = S_f[0]
            t = T_f[0]
            
            adj = {e: c for (e, c) in c_p.items() if u_f[e] >= K}
            D, P = dijkstra(s, t, adj)
            update_potentials(p, D)
            augment_flow_along_path(P, f, u_f, K)
            c_p = reduced_cost(N, u_f, p)
            S_f, T_f = excess_nodes(N, f, K) 
            iters += 1

        K //= 2
        
    print(f"Number of flow updates: {iters}")
    return f, p

In [25]:
edges = [(0,1), (0,2), (1,2), (1,3), (1,4), (2,3), (2,4), (3,4), (4,2)]
capacities = np.array([15, 8, 20, 4, 10, 15, 4, 20, 5])
costs = np.array([4, 4, 2, 2, 6, 1, 3, 2, 3])
supplies = [20, 0, 0, -5, -15]
nodes = [0, 1, 2, 3, 4]
N = Network(nodes, edges, capacities, costs, supplies)     


p = {v: 0 for v in N.V}
f, p = capacity_scaling(N, p)

Number of flow updates: 7


In [26]:
f, p = capacity_scaling(N, p)

Number of flow updates: 7


In [19]:
def feasibility_check(N, f):
    assert np.all(np.array([len(excess_nodes(N, f, 0.00001)[i]) == 0 for i in [0,1]]))
    assert np.all(np.array(list(f.values())) >= 0)
    assert np.all(np.array(list(f.values())) <= np.array(list(N.u.values())))
    
def optimality_check(N, f, p):
    primal = np.sum([f[(i,j)]*N.c[(i,j)] for (i,j) in N.E])
    dual = -np.sum([p[i] * N.b[i] for i in N.V]) - np.sum([N.u[(i,j)] * max(0, p[j] - p[i] - N.c[(i,j)]) for (i,j) in N.E])
    assert primal == dual
    return primal

In [20]:
feasibility_check(N, f)

In [21]:
optimality_check(N, f, p)

150