tutorial from https://transport-systems.imperial.ac.uk/tf/60008_21/n2_5_hungarian_algorithm.html#:~:text=The%20algorithm%20has%20two%20stages,has%20the%20cost_matrix%20as%20input.

In [1]:
import numpy as np
import copy

max_int = 100
n = 5
# cost_matrix = np.random.randint(max_int, size=(n,n))
cost_matrix = [[24, 60, 61,  2,  4],
 [38, 89, 30, 51, 93],
 [ 6, 48, 32, 70, 48],
 [39, 30,  0, 59,  1],
 [3, 43, 89, 56, 50]]

cost_matrix = np.array(cost_matrix)

print(cost_matrix)

[[24 60 61  2  4]
 [38 89 30 51 93]
 [ 6 48 32 70 48]
 [39 30  0 59  1]
 [ 3 43 89 56 50]]


In [2]:
# import scipy's linear_sum_assignment
from scipy.optimize import linear_sum_assignment

# execute the assignment
scp_assignment = linear_sum_assignment(cost_matrix)

print('scp_assignment: ', scp_assignment)
# find the total cost
scp_total = 0
for i in range(len(scp_assignment[0])):
    scp_total += cost_matrix[scp_assignment[0][i], scp_assignment[1][i]]
print('scp_totat: ', scp_total)

scp_assignment:  (array([0, 1, 2, 3, 4]), array([3, 2, 0, 4, 1]))
scp_totat:  82




# Part 2 - Clean scipy implementation and figure
In this section, we implement a neater version of Part 1. We still use scipy to find the assignment, but we also produce readable results and a figure to accompany the final solution. The structure is separated into 4 functions:

1. run_assignment - the master function - it runs the linear_sum_assignment code and calls the suplementary functions to make the results more readable.
2. clean_assignment - scipy returns two arrays as a solution, here we combine then into a single array containing each assignment pair.
3. calc_costs - calculates the total assignment costs.



In [3]:
def calc_costs(cost_matrix, assignment):
    total = 0
    for a in assignment:
        total += cost_matrix[a[0], a[1]]
    return total

def clean_assignment(row, columns):
    assignments = []
    # create pairs
    text = "The final assignment is "
    for i in range(len(row)):
        assignments.append([row[i], columns[i]])
        if i > 0:
            text += ", "
        text += f"({row[i]}, {columns[i]})"
    print(text)
    return assignments

    
def run_assignment(cost_matrix):
    row,columns = linear_sum_assignment(cost_matrix)
    assignments = clean_assignment(row, columns)
    total_cost = calc_costs(cost_matrix, assignments)
    print(f"The total cost of the assignment is {total_cost}.")

run_assignment(cost_matrix)

The final assignment is (0, 3), (1, 2), (2, 0), (3, 4), (4, 1)
The total cost of the assignment is 82.


# Part 3 - Full implementation of the Hungarian Algorithm
This section presents a step-by-step implementation of the algorithm.

The algorithm has two stages, first we find the minimum value at each row, and then subtract that value to every element of the row. Then we repeat the process using the columns instead. We define this process in the function hungarian_step that has the cost_matrix as input.

In [4]:
def hungarian_step(mat): 
    #The for-loop iterates through every column in the matrix so we subtract this value to every element of the row
    for row_num in range(mat.shape[0]): 
        mat[row_num] = mat[row_num] - np.min(mat[row_num])
    print(mat)
    #We repeat the process for the columns
    for col_num in range(mat.shape[1]): 
        mat[:,col_num] = mat[:,col_num] - np.min(mat[:,col_num])
    
    return mat

In [5]:
hung_matrix = hungarian_step(cost_matrix.copy())
print(hung_matrix)

[[22 58 59  0  2]
 [ 8 59  0 21 63]
 [ 0 42 26 64 42]
 [39 30  0 59  1]
 [ 0 40 86 53 47]]
[[22 28 59  0  1]
 [ 8 29  0 21 62]
 [ 0 12 26 64 41]
 [39  0  0 59  0]
 [ 0 10 86 53 46]]


The next step, while easy to carry out visually, becomes more difficult to code. We need to find the row containing the least number of zeros first.

The first step to do this is to define a function that finds the minimum number of rows to mark that contain a zero value, let's call this min_zeros. Let's assume that the matrix being input is boolean with True where 0 existed and False where non-zero.

Now, mark the column and row as False and repeat, saving the information where the last zero value was retrieved.

By repeating this process, we collect all zeros in the matrix.

In [6]:
def min_zeros(zero_mat, mark_zero, verbose=False):
    if verbose:
        print('finding the min_zeros rows')
    # min_row = [number of zeros, row index number]
    min_row = [99999, -1]

    for row_num in range(zero_mat.shape[0]): 
        if np.sum(zero_mat[row_num] == True) > 0 and min_row[0] > np.sum(zero_mat[row_num] == True):
            min_row = [np.sum(zero_mat[row_num] == True), row_num]

    # Marked the specific row and column as False
    zero_index = np.where(zero_mat[min_row[1]] == True)[0][0]
    mark_zero.append((min_row[1], zero_index))
    zero_mat[min_row[1], :] = False
    zero_mat[:, zero_index] = False
    if verbose:
        print('zero_mat: \n', zero_mat)
        print('marked_zero: \n', marked_zero)
        

In [7]:
cur_mat = hung_matrix
zero_bool_mat = (cur_mat == 0)
zero_bool_mat_copy = zero_bool_mat.copy()

marked_zero = []
print('zero_bool_mat_copy: \n', zero_bool_mat_copy)
print('marked_zero: \n', marked_zero)
while (True in zero_bool_mat_copy):
    print('*'*10)
    min_zeros(zero_bool_mat_copy, marked_zero)
    print('zero_bool_mat_copy: \n', zero_bool_mat_copy)
    print('marked_zero: \n', marked_zero)
        

zero_bool_mat_copy: 
 [[False False False  True False]
 [False False  True False False]
 [ True False False False False]
 [False  True  True False  True]
 [ True False False False False]]
marked_zero: 
 []
**********
zero_bool_mat_copy: 
 [[False False False False False]
 [False False  True False False]
 [ True False False False False]
 [False  True  True False  True]
 [ True False False False False]]
marked_zero: 
 [(0, 3)]
**********
zero_bool_mat_copy: 
 [[False False False False False]
 [False False False False False]
 [ True False False False False]
 [False  True False False  True]
 [ True False False False False]]
marked_zero: 
 [(0, 3), (1, 2)]
**********
zero_bool_mat_copy: 
 [[False False False False False]
 [False False False False False]
 [False False False False False]
 [False  True False False  True]
 [False False False False False]]
marked_zero: 
 [(0, 3), (1, 2), (2, 0)]
**********
zero_bool_mat_copy: 
 [[False False False False False]
 [False False False False False]
 [

Now that we extracted the zeros in the matrix, we can mark the rows and columns to determine whether the hungarian algorithm is complete.



In [21]:
def mark_matrix(mat, verbose=False):
    if verbose:
        print('marking the matrix')
    #Transform the matrix to boolean matrix(0 = True, others = False)
    cur_mat = mat
    zero_bool_mat = (cur_mat == 0)
    zero_bool_mat_copy = zero_bool_mat.copy()
    if verbose:
        print('the bool matrix: \n', zero_bool_mat_copy)

    #Recording possible answer positions by marked_zero
    marked_zero = []
    while (True in zero_bool_mat_copy):
        min_zeros(zero_bool_mat_copy, marked_zero, verbose=verbose)

    #Recording the row and column indexes seperately.
    marked_zero_row = []
    marked_zero_col = []
    for i in range(len(marked_zero)):
        marked_zero_row.append(marked_zero[i][0])
        marked_zero_col.append(marked_zero[i][1])
    if verbose:
        print('marked_zero_row: ', marked_zero_row)
        print('marked_zero_col: ', marked_zero_col)
    
    # collect rows not containing zeros
    non_marked_row = list(set(range(cur_mat.shape[0])) - set(marked_zero_row))
    if verbose:
        print('non_marked_row: ', non_marked_row)
    
    # mark columns with zeros
    marked_cols = []
    check_switch = True
    while check_switch:
        check_switch = False
        for i in range(len(non_marked_row)):
            row_array = zero_bool_mat[non_marked_row[i], :]
            for j in range(row_array.shape[0]):
                if row_array[j] == True and j not in marked_cols:

                    marked_cols.append(j)
                    check_switch = True

        for row_num, col_num in marked_zero:
            if row_num not in non_marked_row and col_num in marked_cols:
                
                non_marked_row.append(row_num)
                check_switch = True
    
    # mark rows with zeros
    marked_rows = list(set(range(mat.shape[0])) - set(non_marked_row))
    
    return(marked_zero, marked_rows, marked_cols)

In [22]:
marked_zero, marked_rows, marked_cols = mark_matrix(hung_matrix.copy(), verbose=True)
print(marked_zero, marked_rows, marked_cols)

marking the matrix
the bool matrix: 
 [[False False False  True False]
 [False False  True False False]
 [ True False False False False]
 [False  True  True False  True]
 [ True False False False False]]
finding the min_zeros rows
zero_mat: 
 [[False False False False False]
 [False False  True False False]
 [ True False False False False]
 [False  True  True False  True]
 [ True False False False False]]
marked_zero: 
 [(0, 3), (1, 2), (2, 0), (3, 1)]
finding the min_zeros rows
zero_mat: 
 [[False False False False False]
 [False False False False False]
 [ True False False False False]
 [False  True False False  True]
 [ True False False False False]]
marked_zero: 
 [(0, 3), (1, 2), (2, 0), (3, 1)]
finding the min_zeros rows
zero_mat: 
 [[False False False False False]
 [False False False False False]
 [False False False False False]
 [False  True False False  True]
 [False False False False False]]
marked_zero: 
 [(0, 3), (1, 2), (2, 0), (3, 1)]
finding the min_zeros rows
zero_mat: 

In the case where marked rows and columns do not add up to  𝑛
 , we need to adjust the matrix to continue the search.

In [23]:
def adjust_matrix(mat, cover_rows, cover_cols, verbose=False):
    if verbose:
        print("adjusting the matrix")
    cur_mat = mat
    non_zero_element = []
    
    # find the minimum value of an element not in a marked column/row 
    for row in range(len(cur_mat)):
        if row not in cover_rows:
            for i in range(len(cur_mat[row])):
                if i not in cover_cols:
                    non_zero_element.append(cur_mat[row][i])
    
    min_num = min(non_zero_element)

    # substract to all values not in a marked row/column
    for row in range(len(cur_mat)):
        if row not in cover_rows:
            for i in range(len(cur_mat[row])):
                if i not in cover_cols:
                    cur_mat[row, i] = cur_mat[row, i] - min_num
    # add to all values in marked rows/column
    for row in range(len(cover_rows)):  
        for col in range(len(cover_cols)):
            cur_mat[cover_rows[row], cover_cols[col]] = cur_mat[cover_rows[row], cover_cols[col]] + min_num
    
    if verbose:
        print('adjusted matrix = ', cur_mat)
    return cur_mat

In [24]:
adjust_matrix(hung_matrix.copy(), marked_rows, marked_cols )

array([[32, 28, 59,  0,  1],
       [18, 29,  0, 21, 62],
       [ 0,  2, 16, 54, 31],
       [49,  0,  0, 59,  0],
       [ 0,  0, 76, 43, 36]])

In [27]:
def hungarian_algorithm(cost_matrix, verbose=False):
    
    n = cost_matrix.shape[0]
    cur_mat = copy.deepcopy(cost_matrix)
    
    cur_mat = hungarian_step(cur_mat)
    
    count_zero_lines = 0
        
    while count_zero_lines < n:
        if verbose:
            print('*'*10)
        ans_pos, marked_rows, marked_cols = mark_matrix(cur_mat, verbose=verbose)
        count_zero_lines = len(marked_rows) + len(marked_cols)
        if verbose:
            print('count_zero_lines: ', count_zero_lines)

        if count_zero_lines < n:
            cur_mat = adjust_matrix(cur_mat, marked_rows, marked_cols)
    
    return ans_pos

In [28]:
assignment = hungarian_algorithm(cost_matrix, verbose=True)
print(f"The final assignment is: {assignment}")
print(cost_matrix)

[[22 58 59  0  2]
 [ 8 59  0 21 63]
 [ 0 42 26 64 42]
 [39 30  0 59  1]
 [ 0 40 86 53 47]]
**********
marking the matrix
the bool matrix: 
 [[False False False  True False]
 [False False  True False False]
 [ True False False False False]
 [False  True  True False  True]
 [ True False False False False]]
finding the min_zeros rows
zero_mat: 
 [[False False False False False]
 [False False  True False False]
 [ True False False False False]
 [False  True  True False  True]
 [ True False False False False]]
marked_zero: 
 [(0, 3), (1, 2), (2, 0), (3, 1)]
finding the min_zeros rows
zero_mat: 
 [[False False False False False]
 [False False False False False]
 [ True False False False False]
 [False  True False False  True]
 [ True False False False False]]
marked_zero: 
 [(0, 3), (1, 2), (2, 0), (3, 1)]
finding the min_zeros rows
zero_mat: 
 [[False False False False False]
 [False False False False False]
 [False False False False False]
 [False  True False False  True]
 [False False Fal

In [29]:
total = 0
for i in range(len(assignment)):
    total += cost_matrix[assignment[i][0], assignment[i][1]]
print(f"The total cost of the assignment is {total}")

The total cost of the assignment is 82
