# Exploring the Impact of Evaluation Order on Edit Distance Algorithms

Removing data dependencies in the Wagner-Fischer, Needleman-Wunsch, Smith-Waterman, and Gotoh dynamic programming algorithms to explain the hardware-accelerated variants in StringZilla.

## Levenshtein Distance

Levenshtein edit distance is one of the most broadly studied string similarity metrics.
It is defined as the minimum number of single-character insertions, deletions, and substitutions required to change one string into another.
The Levenshtein distance between two strings is calculated using dynamic programming algorithms, such as the Wagner-Fischer algorithm, and its variations for Bioinformatics:

- Needleman-Wunsch for global alignment with substitution matrices, 
- Smith-Waterman for local alignment with substitution matrices, 
- Gotoh for different penalties for gap opening and extensions.

Given the shared nature of these algorithms, the same tricks can be applied to all of them to improve their performance.
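To make the shared structure concrete, here is a minimal sketch of the common recurrence as a memoized recursion with pluggable costs. The names `alignment_cost`, `substitution_cost`, and `gap_cost` are illustrative assumptions, not part of StringZilla, and the recurrence is written as a cost minimization, whereas Needleman-Wunsch is usually presented as a score maximization. With unit costs it reduces to the Levenshtein distance.

```python
from functools import lru_cache
from typing import Callable

def alignment_cost(
    s1: str, s2: str,
    substitution_cost: Callable[[str, str], int] = lambda a, b: int(a != b),
    gap_cost: int = 1,
) -> int:
    # Hypothetical helper: global alignment cost with a single linear gap penalty.
    @lru_cache(maxsize=None)
    def cost(i: int, j: int) -> int:
        # Cost of aligning the first `i` characters of `s1` with the first `j` of `s2`
        if i == 0:
            return j * gap_cost
        if j == 0:
            return i * gap_cost
        return min(
            cost(i - 1, j) + gap_cost,   # Deletion
            cost(i, j - 1) + gap_cost,   # Insertion
            cost(i - 1, j - 1) + substitution_cost(s1[i - 1], s2[j - 1]),
        )

    return cost(len(s1), len(s2))

# Unit costs give the plain Levenshtein distance
assert alignment_cost("kitten", "sitting") == 3
```

The recursion is only practical for short inputs, but it shows that the algorithms differ in their cost functions rather than in the shape of their data dependencies.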

## Wagner-Fischer Algorithm

The Wagner-Fischer algorithm, in its most naive form, has a time and space complexity of $O(NM)$, where $N$ and $M$ are the lengths of the two strings being compared.
A rectangular matrix of size $(N+1) \times (M+1)$ is created to store the edit distances between all prefixes of the two strings.
The first column and the first row are, naturally, initialized with $\{0, 1, 2, \ldots, N\}$ and $\{0, 1, 2, \ldots, M\}$ respectively.

In [None]:
from typing import Tuple
import numpy as np # NumPy for matrices

def wagner_fisher(s1: str, s2: str) -> Tuple[int, np.ndarray]:
    # Create a matrix of size (len(s1)+1) x (len(s2)+1)
    matrix = np.zeros((len(s1) + 1, len(s2) + 1), dtype=int)

    # Initialize the first column and first row of the matrix
    for i in range(len(s1) + 1):
        matrix[i, 0] = i
    for j in range(len(s2) + 1):
        matrix[0, j] = j

    # Compute Levenshtein distance
    for i in range(1, len(s1) + 1):
        for j in range(1, len(s2) + 1):
            substitution_cost = s1[i - 1] != s2[j - 1]
            matrix[i, j] = min(
                matrix[i - 1, j] + 1,                      #? Deletion cost
                matrix[i, j - 1] + 1,                      #? Insertion cost
                matrix[i - 1, j - 1] + substitution_cost,  #? Substitution cost
            )

    # The distance will be placed in the bottom right corner of the matrix
    return matrix[len(s1), len(s2)], matrix

In [None]:
s1 = "kiten"
s2 = "katerinas"
distance_wf, matrix_wf = wagner_fisher(s1, s2)
s1, s2, f"{distance_wf = }", matrix_wf

This algorithm is almost never recommended for practical use, as it has a quadratic space complexity.
It's easy to see that the space complexity can be reduced to $O(\min(N, M))$ by only storing the last two rows of the matrix, but we want to keep the entire matrix as a reference to allow debugging and visualization.
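For reference, a minimal sketch of that two-row variant could look as follows. The function name `wagner_fisher_two_rows` is an assumption made for this illustration. It relies on the Levenshtein distance being symmetric, so the inputs can be swapped to keep the rows as short as possible.

```python
def wagner_fisher_two_rows(s1: str, s2: str) -> int:
    # Walk the rows along the longer string, so every row has min(N, M) + 1 entries
    if len(s1) < len(s2):
        s1, s2 = s2, s1

    previous = list(range(len(s2) + 1))  # Row 0 of the full matrix
    for i in range(1, len(s1) + 1):
        current = [i] + [0] * len(s2)    # Column 0 of row `i` equals `i`
        for j in range(1, len(s2) + 1):
            substitution_cost = s1[i - 1] != s2[j - 1]
            current[j] = min(
                previous[j] + 1,                      # Deletion cost
                current[j - 1] + 1,                   # Insertion cost
                previous[j - 1] + substitution_cost,  # Substitution cost
            )
        previous = current

    # The last entry of the last row is the bottom right corner of the full matrix
    return previous[-1]
```

Note that the inner loop still reads `current[j - 1]`, so the dependency within a row remains.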

To feel safer while designing our alternative traversal algorithm, let's define an extraction function that returns the values of a given skewed diagonal.

In [None]:
def get_skewed_diagonal(matrix: np.ndarray, index: int):
    flipped_matrix = np.fliplr(matrix)
    return np.flip(np.diag(flipped_matrix, k= matrix.shape[1] - index - 1))

# Let's test this function right away.
matrix = np.array([
    [1, 2, 3],
    [4, 5, 6],
    [7, 8, 9]])
assert np.all(get_skewed_diagonal(matrix, 2) == [7, 5, 3])
assert np.all(get_skewed_diagonal(matrix, 1) == [4, 2])
assert np.all(get_skewed_diagonal(matrix, 4) == [9])

# Now let's test it on a non-square matrix.
matrix = np.array([
    [1, 2, 3],
    [4, 5, 6]])
assert np.all(get_skewed_diagonal(matrix, 0) == [1])
assert np.all(get_skewed_diagonal(matrix, 1) == [4, 2])
assert np.all(get_skewed_diagonal(matrix, 2) == [5, 3])
assert np.all(get_skewed_diagonal(matrix, 3) == [6])

In [None]:
get_skewed_diagonal(matrix_wf, 10)

## Diagonal Evaluation Order

Accelerating this exact algorithm with SIMD instructions isn't trivial, as the `matrix[i, j]` value has a dependency on the `matrix[i, j - 1]` value.
So we can't brute-force accelerate the inner loop.
Instead, we can show that we can evaluate the matrix in a different order, and still get the same result.
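The reason the reordering works can be checked mechanically: all three neighbors a cell depends on lie on the two preceding skewed diagonals, so cells on the same diagonal never depend on each other. The sketch below uses a hypothetical helper, `skewed_diagonal_cells`, and an arbitrary 4 x 6 matrix shape chosen only for illustration.

```python
from typing import List, Tuple

def skewed_diagonal_cells(rows: int, cols: int, index: int) -> List[Tuple[int, int]]:
    """Cells (i, j) with i + j == index, listed from bottom-left to top-right."""
    i_max = min(index, rows - 1)
    i_min = max(0, index - (cols - 1))
    return [(i, index - i) for i in range(i_max, i_min - 1, -1)]

rows, cols = 4, 6  # Assumed shape: the DP matrix for 3- and 5-letter inputs
for index in range(2, rows + cols - 1):
    for i, j in skewed_diagonal_cells(rows, cols, index):
        if i == 0 or j == 0:
            continue  # The first row and column are pre-initialized
        dependencies = [(i - 1, j), (i, j - 1), (i - 1, j - 1)]
        # Every dependency sits on one of the two preceding skewed diagonals
        assert all(a + b in (index - 1, index - 2) for a, b in dependencies)
```

Since no cell reads another cell of its own diagonal, a whole diagonal can in principle be computed with element-wise vector operations.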

![Skewed Diagonals Evaluation Order](https://mathworld.wolfram.com/images/eps-svg/SkewDiagonal_1000.svg)

But before complicating things too much, let's start with a simple case - when both strings have identical lengths and the DP matrix has a square shape.

In [None]:
from typing import Optional

def square_skewed_diagonals(
    s1: str, s2: str, 
    verbose: bool = False, 
    baseline: Optional[np.ndarray] = None) -> Tuple[int, np.ndarray]:

    assert len(s1) == len(s2), "First define an algo for square matrices!"
    # Create a matrix of size (len(s1)+1) x (len(s2)+1)
    matrix = np.zeros((len(s1) + 1, len(s2) + 1), dtype=int)
    matrix[:, :] = 99

    # Initialize the first column and first row of the matrix
    for i in range(len(s1) + 1):
        matrix[i, 0] = i
    for j in range(len(s2) + 1):
        matrix[0, j] = j

    # Number of rows and columns in the square matrix.
    n = len(s1) + 1
    
    # Number of diagonals and skewed diagonals in the square matrix of size (n x n).
    diagonals_count = 2 * n - 1
    
    # Populate the matrix in 2 separate loops: for the top left triangle and for the bottom right triangle.
    for skew_diagonal_index in range(2, n):
        skew_diagonal_length = skew_diagonal_index + 1
        for offset_within_diagonal in range(1, skew_diagonal_length - 1):
            # If we haven't passed the main skew diagonal yet, 
            # then we have to skip the first and the last operation,
            # as those are already pre-populated and form the first column 
            # and the first row of the Levenshtein matrix respectively.
            i = skew_diagonal_index - offset_within_diagonal
            j = offset_within_diagonal
            if verbose:
                print(f"top left triangle: {skew_diagonal_index=}, {skew_diagonal_length=}, {i=}, {j=}")
            substitution_cost = s1[i - 1] != s2[j - 1]
            matrix[i, j] = min(
                matrix[i - 1, j] + 1,                      #? Deletion cost
                matrix[i, j - 1] + 1,                      #? Insertion cost
                matrix[i - 1, j - 1] + substitution_cost,  #? Substitution cost
            )
            
            if baseline is not None:
                assert matrix[i, j] == baseline[i, j], f"{matrix[i, j]} != {baseline[i, j]} at {i=}, {j=}"
            
    # Now the bottom right triangle of the matrix.
    for skew_diagonal_index in range(n, diagonals_count):
        skew_diagonal_length = 2 * n - skew_diagonal_index - 1
        for offset_within_diagonal in range(skew_diagonal_length):
            i = n - offset_within_diagonal - 1
            j = skew_diagonal_index - n + offset_within_diagonal + 1
            if verbose:
                print(f"bottom right triangle: {skew_diagonal_index=}, {skew_diagonal_length=}, {i=}, {j=}")
            substitution_cost = s1[i - 1] != s2[j - 1]
            matrix[i, j] = min(
                matrix[i - 1, j] + 1,                      #? Deletion cost
                matrix[i, j - 1] + 1,                      #? Insertion cost
                matrix[i - 1, j - 1] + substitution_cost,  #? Substitution cost
            )
            
            if baseline is not None:
                assert matrix[i, j] == baseline[i, j], f"{matrix[i, j]} != {baseline[i, j]} at {i=}, {j=}"

    # Similarly, the distance will be placed in the bottom right corner of the matrix
    return matrix[len(s1), len(s2)], matrix

Let's generate some random strings and make sure we produce the right result.

In [None]:
import random
for _ in range(10):
    s1 = ''.join(random.choices("abc", k=50))
    s2 = ''.join(random.choices("abc", k=50))
    distance_wf, matrix_wf = wagner_fisher(s1, s2)
    distance_sd, matrix_sd = square_skewed_diagonals(s1, s2, baseline=matrix_wf)
    assert distance_wf == distance_sd, f"{distance_wf = } != {distance_sd = }"
    assert np.all(matrix_wf == matrix_sd), f"{matrix_wf = }\n{matrix_sd = }"

## Vectorizing the Skewed Diagonals Algorithm

Going further, we can avoid storing the whole matrix and keep only three diagonals at a time.
Of those three, the longest never exceeds `n` entries.
The other two always hold at most `n-1` entries.
Let's try vectorizing different parts of our algorithm, validating it against the output of the naive algorithm for 2 strings: `"BCDE"` and `"FKPU"`.

In [None]:
s1 = "BCDE"
s2 = "FKPU"
distance_wf, matrix_wf = wagner_fisher(s1, s2)
s1, s2, f"{distance_wf = }", matrix_wf

Replacing the numbers with letter labels and annotating with a header row and column for `"BCDE"` and `"FKPU"`:

|       |     | **B** | **C** | **D** | **E** |
| ----- | --- | ----- | ----- | ----- | ----- |
|       | a   | b     | c     | d     | e     |
| **F** | f   | g     | h     | i     | j     |
| **K** | k   | l     | m     | n     | o     |
| **P** | p   | q     | r     | s     | t     |
| **U** | u   | v     | w     | x     | y     |

At any point we will be working with 3 diagonals:

- `previous` set to `[a]` at start
- `current` set to `[f, b]` at start
- `following` set to `[k, g, c]` at start

In [None]:
assert len(s1) == len(s2), "First define an algo for square matrices!"
# Number of rows and columns in the square matrix.
n = len(s1) + 1

following = np.zeros(n, dtype=np.uint) # let's assume we are computing the main skew diagonal: [u, q, m, i, e]
current = np.zeros(n, dtype=np.uint) # will contain: [p, l, h, d]
previous = np.zeros(n, dtype=np.uint) # will contain: [k, g, c]

# Initialize the first two diagonals.
# The `previous` would contain the values [a].
# The `current` would contain the values [f, b]. 
previous[0] = 0
current[0:2] = 1
previous, current, following

Now we can rewrite the first nested loop for the upper-left triangle of the matrix in NumPy primitives, using its `np.minimum` function to calculate the minimum of three values.

In [None]:
# To evaluate every subsequent entry:
next_diagonal_index = 2
while next_diagonal_index < n:
    next_skew_diagonal_length = next_diagonal_index + 1

    old_substitution_costs = previous[:next_skew_diagonal_length - 2]
    added_substitution_costs = [s1[next_diagonal_index - i - 2] != s2[i] for i in range(next_skew_diagonal_length - 2)]
    substitution_costs = old_substitution_costs + added_substitution_costs

    following[1:next_skew_diagonal_length - 1] = np.minimum(current[1:next_skew_diagonal_length - 1] + 1, current[:next_skew_diagonal_length - 2] + 1) # Insertions or deletions
    following[1:next_skew_diagonal_length - 1] = np.minimum(following[1:next_skew_diagonal_length - 1], substitution_costs) # Substitutions
    following[0] = next_diagonal_index
    following[next_skew_diagonal_length - 1] = next_diagonal_index
    assert np.all(following[:next_skew_diagonal_length] == get_skewed_diagonal(matrix_wf, next_diagonal_index))
    
    previous[:] = current[:]
    current[:] = following[:]
    next_diagonal_index += 1

previous, current, following # Log the state

By now we've scanned through the upper-left triangle of the matrix, where each subsequent iteration results in a longer diagonal.
From now onwards, the diagonals will be shrinking.
Instead of adding a value equal to the skewed diagonal index on both ends, we will be cropping those boundary values out.

In [None]:
while next_diagonal_index < 2 * n - 1:
    next_skew_diagonal_length = 2 * n - 1 - next_diagonal_index
    old_substitution_costs = previous[:next_skew_diagonal_length]
    added_substitution_costs = [s1[len(s1) - i - 1] != s2[next_diagonal_index - n + i] for i in range(next_skew_diagonal_length)]
    substitution_costs = old_substitution_costs + added_substitution_costs
    
    following[:next_skew_diagonal_length] = np.minimum(current[:next_skew_diagonal_length] + 1, current[1 : next_skew_diagonal_length + 1] + 1) # Insertions or deletions
    following[:next_skew_diagonal_length] = np.minimum(following[:next_skew_diagonal_length], substitution_costs) # Substitutions
    assert np.all(following[:next_skew_diagonal_length] == get_skewed_diagonal(matrix_wf, next_diagonal_index)), f"\n{following[:next_skew_diagonal_length]} not equal to \n{get_skewed_diagonal(matrix_wf, next_diagonal_index)}"
    
    previous[:next_skew_diagonal_length] = current[1:next_skew_diagonal_length + 1]
    current[:next_skew_diagonal_length] = following[:next_skew_diagonal_length]
    next_diagonal_index += 1

previous, current, following # Log the state

In [None]:
assert distance_wf == following[0], f"{distance_wf = } != {following[0] = }"

## Generalizing to Non-Square Matrices

Let's imagine 2 inputs of length 3 and 5: `"KPU"` and `"BCDEF"`:

|       |     | **B** | **C** | **D** | **E** | **F** |
| ----- | --- | ----- | ----- | ----- | ----- | ----- |
|       | a   | b     | c     | d     | e     | f     |
| **K** | g   | h     | i     | j     | k     | l     |
| **P** | m   | n     | o     | p     | q     | r     |
| **U** | s   | t     | u     | v     | w     | x     |

At any point we will be working with 3 diagonals:

- `previous` set to `[a]` at start
- `current` set to `[g, b]` at start
- `next` set to `[m, h, c]` at start

After three more cycles:

- `previous` set to `[s, n, i, d]`
- `current` set to `[t, o, j, e]`
- `next` set to `[u, p, k, f]`
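In the non-square case the diagonals first grow, then stay at a fixed length, and finally shrink. A small sketch of that length schedule, using an assumed helper name `skewed_diagonal_lengths`, reproduces the 4 x 6 example above, where the three diagonals `[s, n, i, d]`, `[t, o, j, e]`, and `[u, p, k, f]` all have 4 entries.

```python
from typing import List

def skewed_diagonal_lengths(shorter_dim: int, longer_dim: int) -> List[int]:
    # Hypothetical helper: the length of every skewed diagonal, in traversal order
    assert shorter_dim <= longer_dim, "Pass the smaller dimension first"
    diagonals_count = shorter_dim + longer_dim - 1
    return [
        min(index + 1, shorter_dim, diagonals_count - index)
        for index in range(diagonals_count)
    ]

# "KPU" vs "BCDEF" gives a 4 x 6 matrix with 9 skewed diagonals
lengths = skewed_diagonal_lengths(4, 6)
assert lengths == [1, 2, 3, 4, 4, 4, 3, 2, 1]
```

No diagonal is longer than the smaller dimension, which is why the rolling buffers only need as many entries as the shorter string has characters, plus one.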


In [None]:
from typing import Optional

def skewed_diagonals(
    s1: str, s2: str, 
    verbose: bool = False, 
    baseline: Optional[np.ndarray] = None) -> Tuple[int, np.ndarray]:
    
    shorter, longer = (s1, s2) if len(s1) <= len(s2) else (s2, s1)    
    if baseline is not None and len(s1) > len(s2):
        baseline = baseline.T
    shorter_dim = len(shorter) + 1
    longer_dim = len(longer) + 1
    if verbose:
        print(f"{shorter=}, {longer=}, {shorter_dim=}, {longer_dim=}")
    
    # Create a matrix of size (shorter_dim) x (longer_dim)
    matrix = np.zeros((shorter_dim, longer_dim), dtype=int)
    matrix[:, :] = longer_dim + 1 # or +inf 

    # Initialize the first column and first row of the matrix
    for i in range(shorter_dim):
        matrix[i, 0] = i
    for j in range(longer_dim):
        matrix[0, j] = j

    # Let's say we are dealing with 3 and 5 letter words.
    # The matrix will have size 4 x 6, parameterized as (shorter_dim x longer_dim).
    # It will have:
    # - 4 diagonals of increasing length, at positions: 0, 1, 2, 3.
    # - 2 diagonals of fixed length, at positions: 4, 5.
    # - 3 diagonals of decreasing length, at positions: 6, 7, 8.
    diagonals_count = shorter_dim + longer_dim - 1

    # Same as with square matrices, the 0th diagonal contains - just one element - zero - skipping it.
    # Same as with square matrices, the 1st diagonal contains the values 1 and 1 - skipping it.
    # Now let's handle the rest of the upper-left triangle.
    for skew_diagonal_index in range(2, shorter_dim):
        skew_diagonal_length = (skew_diagonal_index + 1)
        for offset_within_diagonal in range(1, skew_diagonal_length - 1): # ! Skip the first column & row
            # If we haven't passed the main skew diagonal yet, 
            # then we have to skip the first and the last operation,
            # as those are already pre-populated and form the first column 
            # and the first row of the Levenshtein matrix respectively.
            i = skew_diagonal_index - offset_within_diagonal
            j = offset_within_diagonal
            if verbose:
                print(f"top left triangle: {skew_diagonal_index=}, {skew_diagonal_length=}, {i=}, {j=}")
            shorter_char = shorter[i - 1]
            longer_char = longer[j - 1]
            substitution_cost = shorter_char != longer_char
            matrix[i, j] = min(
                matrix[i - 1, j] + 1,                      # ? Deletion cost
                matrix[i, j - 1] + 1,                      # ? Insertion cost
                matrix[i - 1, j - 1] + substitution_cost,  # ? Substitution cost
            )
            
            if baseline is not None:
                assert matrix[i, j] == baseline[i, j], f"{matrix[i, j]} != {baseline[i, j]} at {i=}, {j=}"
                            
    # Now let's handle the anti-diagonal band of the matrix, between the top and bottom-right triangles.        
    for skew_diagonal_index in range(shorter_dim, longer_dim):
        skew_diagonal_length = shorter_dim
        for offset_within_diagonal in range(skew_diagonal_length - 1): # ! Skip the first row
            i = shorter_dim - offset_within_diagonal - 1
            j = skew_diagonal_index - shorter_dim + offset_within_diagonal + 1
            if verbose:
                print(f"anti-band: {skew_diagonal_index=}, {skew_diagonal_length=}, {i=}, {j=}")
            shorter_char = shorter[i - 1]
            longer_char = longer[j - 1]
            substitution_cost = shorter_char != longer_char
            matrix[i, j] = min(
                matrix[i - 1, j] + 1,                      # ? Deletion cost
                matrix[i, j - 1] + 1,                      # ? Insertion cost
                matrix[i - 1, j - 1] + substitution_cost,  # ? Substitution cost
            )
            
            if baseline is not None:
                assert matrix[i, j] == baseline[i, j], f"{matrix[i, j]} != {baseline[i, j]} at {i=}, {j=}"
    
    # Now let's handle the bottom right triangle.
    for skew_diagonal_index in range(longer_dim, diagonals_count):
        skew_diagonal_length = diagonals_count - skew_diagonal_index
        for offset_within_diagonal in range(skew_diagonal_length):
            i = shorter_dim - offset_within_diagonal - 1
            j = skew_diagonal_index - shorter_dim + offset_within_diagonal + 1
            if verbose:
                print(f"bottom right triangle: {skew_diagonal_index=}, {skew_diagonal_length=}, {i=}, {j=}")
            assert (i - 1) >= 0 and (i - 1) < len(shorter), f"{i = }"
            assert (j - 1) >= 0 and (j - 1) < len(longer), f"{j = }"
            shorter_char = shorter[i - 1]
            longer_char = longer[j - 1]
            substitution_cost = shorter_char != longer_char
            matrix[i, j] = min(
                matrix[i - 1, j] + 1,                      # ? Deletion cost
                matrix[i, j - 1] + 1,                      # ? Insertion cost
                matrix[i - 1, j - 1] + substitution_cost,  # ? Substitution cost
            )
            
            if baseline is not None:
                assert matrix[i, j] == baseline[i, j], f"{matrix[i, j]} != {baseline[i, j]} at {i=}, {j=}"

    # Return the Levenshtein distance
    distance = matrix[len(shorter), len(longer)]
    if len(s1) > len(s2):
        matrix = matrix.T
    return distance, matrix

In [None]:
import random
for _ in range(100):
    len1 = random.randint(1, 50)
    len2 = random.randint(1, 50)
    s1 = ''.join(random.choices("abc", k=len1))
    s2 = ''.join(random.choices("abc", k=len2))
    distance_wf, matrix_wf = wagner_fisher(s1, s2)
    distance_sd, matrix_sd = skewed_diagonals(s1, s2, baseline=matrix_wf, verbose=False)
    assert distance_wf == distance_sd, f"{distance_wf = } != {distance_sd = }"
    assert np.all(matrix_wf == matrix_sd), f"{matrix_wf = }\n{matrix_sd = }"

In [None]:
s1 = "listeners"
s2 = "silents"
distance_wf, matrix_wf = wagner_fisher(s1, s2)
distance_sd, matrix_sd = skewed_diagonals(s1, s2, baseline=matrix_wf)
s1, s2, f"{distance_sd = }", matrix_sd

## Reversing the Input

One of the issues with vectorizing this algorithm is the traversal order of the shorter string.
It's different from the longer string and different from the natural traversal order of the loop.
To make the indexing simpler, we can pre-reverse the shorter string.
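To see why reversing helps, note that along a skewed diagonal the row index decreases while the column index increases. Once the shorter string is reversed, both strings are read in increasing order, so the substitution costs of a whole diagonal become an element-wise comparison of two contiguous slices. The sketch below illustrates this with a hypothetical helper, `diagonal_substitution_costs`, and covers only the interior cells, excluding the pre-initialized first row and column.

```python
import numpy as np

def diagonal_substitution_costs(shorter: str, longer: str, index: int) -> np.ndarray:
    """Substitution costs for the interior cells of one skewed diagonal, bottom-left first."""
    shorter_reversed = np.array(list(shorter[::-1]))
    longer_chars = np.array(list(longer))

    i_max = min(index - 1, len(shorter))  # Lowest interior row on this diagonal
    i_min = max(1, index - len(longer))   # Highest interior row on this diagonal
    count = max(0, i_max - i_min + 1)

    # Both slices advance left to right, unlike `shorter[i - 1]` with a decreasing `i`
    start_in_shorter = len(shorter) - i_max
    start_in_longer = index - i_max - 1
    return (
        shorter_reversed[start_in_shorter:start_in_shorter + count]
        != longer_chars[start_in_longer:start_in_longer + count]
    )
```

Two contiguous forward reads are the access pattern that maps most directly onto vector loads.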

In [None]:
from typing import Optional

def skewed_diagonals_reversed(
    s1: str, s2: str, 
    verbose: bool = False, 
    baseline: Optional[np.ndarray] = None) -> Tuple[int, np.ndarray]:
    
    shorter, longer = (s1, s2) if len(s1) <= len(s2) else (s2, s1)    
    if baseline is not None and len(s1) > len(s2):
        baseline = baseline.T
    shorter_dim = len(shorter) + 1
    longer_dim = len(longer) + 1
    if verbose:
        print(f"{shorter=}, {longer=}, {shorter_dim=}, {longer_dim=}")
    
    # Create a matrix of size (shorter_dim) x (longer_dim)
    matrix = np.zeros((shorter_dim, longer_dim), dtype=int)
    matrix[:, :] = longer_dim + 1 # or +inf 

    # Initialize the first column and first row of the matrix
    for i in range(shorter_dim):
        matrix[i, 0] = i
    for j in range(longer_dim):
        matrix[0, j] = j

    # Let's say we are dealing with 3 and 5 letter words.
    # The matrix will have size 4 x 6, parameterized as (shorter_dim x longer_dim).
    # It will have:
    # - 4 diagonals of increasing length, at positions: 0, 1, 2, 3.
    # - 2 diagonals of fixed length, at positions: 4, 5.
    # - 3 diagonals of decreasing length, at positions: 6, 7, 8.
    diagonals_count = shorter_dim + longer_dim - 1
    shorter_reversed = "".join(reversed(shorter))

    # In reality, we need to keep only 3 diagonals to produce the same score in the end.
    previous_distances = np.zeros(shorter_dim, dtype=np.uint)
    current_distances = np.zeros(shorter_dim, dtype=np.uint)
    next_distances = np.zeros(shorter_dim, dtype=np.uint)
    temporary_distances = np.zeros(shorter_dim, dtype=np.uint)
    previous_distances[0] = 0
    current_distances[0] = current_distances[1] = 1

    # Same as with square matrices, the 0th diagonal contains - just one element - zero - skipping it.
    # Same as with square matrices, the 1st diagonal contains the values 1 and 1 - skipping it.
    # Now let's handle the rest of the upper-left triangle.
    for skew_diagonal_index in range(2, shorter_dim):
        skew_diagonal_length = (skew_diagonal_index + 1)
        for offset_in_diagonal in range(1, skew_diagonal_length - 1): # ! Skip the left column & top row
            # If we haven't passed the main skew diagonal yet, 
            # then we have to skip the first and the last operation,
            # as those are already pre-populated and form the first column 
            # and the first row of the Levenshtein matrix respectively.
            i = skew_diagonal_index - offset_in_diagonal
            j = offset_in_diagonal
            if verbose:
                print(f"top left triangle: {skew_diagonal_index=}, {skew_diagonal_length=}, {i=}, {j=}")
            shorter_char = shorter_reversed[len(shorter) - i]
            longer_char = longer[j - 1]
            substitution_cost = shorter_char != longer_char
            matrix[i, j] = min(
                matrix[i - 1, j] + 1,                      # ? Deletion cost
                matrix[i, j - 1] + 1,                      # ? Insertion cost
                matrix[i - 1, j - 1] + substitution_cost,  # ? Substitution cost
            )

            # ? For reproducibility let's also try doing the same only using the info in the 3 diagonals
            next_distances[offset_in_diagonal] = min(
                current_distances[offset_in_diagonal - 1] + 1,
                current_distances[offset_in_diagonal] + 1,
                previous_distances[offset_in_diagonal - 1] + substitution_cost,
            )
            
            if baseline is not None:
                assert matrix[i, j] == baseline[i, j], f"{matrix[i, j]} != {baseline[i, j]} at {i=}, {j=}"
            
        next_distances[0] = next_distances[skew_diagonal_length-1] = skew_diagonal_index
        
        # ? Let's validate the contents of the diagonal
        skew_diagonal_expected = get_skewed_diagonal(matrix, skew_diagonal_index)
        assert len(skew_diagonal_expected) == skew_diagonal_length
        assert (skew_diagonal_expected == next_distances[:skew_diagonal_length]).all(), f"diagonal:{skew_diagonal_index}\nexpected:{skew_diagonal_expected}\nproduced:{next_distances[:skew_diagonal_length]}"
        temporary_distances[:] = previous_distances[:]
        previous_distances[:] = current_distances[:]
        current_distances[:] = next_distances[:]
        next_distances[:] = temporary_distances[:]

    # Now let's handle the anti-diagonal band of the matrix, between the top and bottom-right triangles.        
    for skew_diagonal_index in range(shorter_dim, longer_dim):
        skew_diagonal_length = shorter_dim
        for offset_in_diagonal in range(skew_diagonal_length - 1): # ! Skip the top row
            i = shorter_dim - offset_in_diagonal - 1
            j = skew_diagonal_index - shorter_dim + offset_in_diagonal + 1
            if verbose:
                print(f"anti-band: {skew_diagonal_index=}, {skew_diagonal_length=}, {i=}, {j=}")
            shorter_char = shorter_reversed[len(shorter) - i]
            longer_char = longer[j - 1]
            substitution_cost = shorter_char != longer_char
            matrix[i, j] = min(
                matrix[i - 1, j] + 1,                      # ? Deletion cost
                matrix[i, j - 1] + 1,                      # ? Insertion cost
                matrix[i - 1, j - 1] + substitution_cost,  # ? Substitution cost
            )
            
            if baseline is not None:
                assert matrix[i, j] == baseline[i, j], f"{matrix[i, j]} != {baseline[i, j]} at {i=}, {j=}"
    
            # ? For reproducibility let's also try doing the same only using the info in the 3 diagonals
            next_distances[offset_in_diagonal] = min(
                current_distances[offset_in_diagonal] + 1,
                current_distances[offset_in_diagonal + 1] + 1,
                previous_distances[offset_in_diagonal] + substitution_cost,
            )
            
        next_distances[shorter_dim-1] = skew_diagonal_index
        
        # ? Let's validate the contents of the diagonal
        skew_diagonal_expected = get_skewed_diagonal(matrix, skew_diagonal_index)
        assert len(skew_diagonal_expected) == skew_diagonal_length
        assert (skew_diagonal_expected == next_distances[:skew_diagonal_length]).all(), f"diagonal:{skew_diagonal_index}\nexpected:{skew_diagonal_expected}\nproduced:{next_distances[:skew_diagonal_length]}"
        temporary_distances[:] = previous_distances[:]
        previous_distances[:-1] = current_distances[1:] # ! Note we shift here
        current_distances[:] = next_distances[:]
        next_distances[:] = temporary_distances[:]
    
    # Now let's handle the bottom right triangle.
    for skew_diagonal_index in range(longer_dim, diagonals_count):
        skew_diagonal_length = diagonals_count - skew_diagonal_index
        for offset_in_diagonal in range(skew_diagonal_length):
            i = shorter_dim - offset_in_diagonal - 1
            j = skew_diagonal_index - shorter_dim + offset_in_diagonal + 1
            if verbose:
                print(f"bottom right triangle: {skew_diagonal_index=}, {skew_diagonal_length=}, {i=}, {j=}")
            assert (i - 1) >= 0 and (i - 1) < len(shorter), f"{i = }"
            assert (j - 1) >= 0 and (j - 1) < len(longer), f"{j = }"
            shorter_char = shorter_reversed[len(shorter) - i]
            longer_char = longer[j - 1]
            if verbose:
                print(f"{shorter_char=}, {longer_char=}")
            substitution_cost = shorter_char != longer_char
            matrix[i, j] = min(
                matrix[i - 1, j] + 1,                      # ? Deletion cost
                matrix[i, j - 1] + 1,                      # ? Insertion cost
                matrix[i - 1, j - 1] + substitution_cost,  # ? Substitution cost
            )
            
            if baseline is not None:
                assert matrix[i, j] == baseline[i, j], f"{matrix[i, j]} != {baseline[i, j]} at {i=}, {j=}"

            # ? For reproducibility let's also try doing the same only using the info in the 3 diagonals
            next_distances[offset_in_diagonal] = min(
                current_distances[offset_in_diagonal] + 1,
                current_distances[offset_in_diagonal + 1] + 1,
                previous_distances[offset_in_diagonal] + substitution_cost,
            )

        # ? Let's validate the contents of the diagonal
        skew_diagonal_expected = get_skewed_diagonal(matrix, skew_diagonal_index)
        assert len(skew_diagonal_expected) == skew_diagonal_length
        assert (skew_diagonal_expected == next_distances[:skew_diagonal_length]).all(), f"diagonal:{skew_diagonal_index}\nexpected:{skew_diagonal_expected}\nproduced:{next_distances[:skew_diagonal_length]}"
        temporary_distances[:] = previous_distances[:]
        previous_distances[:-1] = current_distances[1:] # ! Note we shift here
        current_distances[:] = next_distances[:]
        next_distances[:] = temporary_distances[:]

    # Return the Levenshtein distance
    distance_from_matrix = matrix[len(shorter), len(longer)]
    distance_from_diagonal = current_distances[0]
    assert distance_from_diagonal == distance_from_matrix
    if len(s1) > len(s2):
        matrix = matrix.T
    return distance_from_matrix, matrix

In [None]:
import random
for _ in range(100):
    len1 = random.randint(1, 50)
    len2 = random.randint(1, 50)
    s1 = ''.join(random.choices("abc", k=len1))
    s2 = ''.join(random.choices("abc", k=len2))
    distance_wf, matrix_wf = wagner_fisher(s1, s2)
    distance_sd, matrix_sd = skewed_diagonals_reversed(s1, s2, baseline=matrix_wf, verbose=False)
    assert distance_wf == distance_sd, f"{distance_wf = } != {distance_sd = }"
    assert np.all(matrix_wf == matrix_sd), f"{matrix_wf = }\n{matrix_sd = }"

In [None]:
s1 = "listeners"
s2 = "silents"
distance_wf, matrix_wf = wagner_fisher(s1, s2)
distance_sd, matrix_sd = skewed_diagonals_reversed(s1, s2, baseline=matrix_wf)
s1, s2, f"{distance_wf = }", f"{distance_sd = }", matrix_sd

In [None]:
s1 = "atca"
s2 = "ctactcaccc"
distance_wf, matrix_wf = wagner_fisher(s1, s2)
distance_sd, matrix_sd = skewed_diagonals_reversed(s1, s2, baseline=matrix_wf)
s1, s2, f"{distance_wf = }", f"{distance_sd = }", matrix_sd

## Shift-less with Reverse Order

In [None]:
from typing import Optional

def skewed_diagonals_reversed(
    s1: str, s2: str, 
    verbose: bool = False, 
    baseline: Optional[np.ndarray] = None) -> Tuple[int, np.ndarray]:
    
    shorter, longer = (s1, s2) if len(s1) <= len(s2) else (s2, s1)    
    if baseline is not None and len(s1) > len(s2):
        baseline = baseline.T
    shorter_dim = len(shorter) + 1
    longer_dim = len(longer) + 1
    if verbose:
        print(f"{shorter=}, {longer=}, {shorter_dim=}, {longer_dim=}")
    
    # Create a matrix of size (shorter_dim) x (longer_dim)
    matrix = np.zeros((shorter_dim, longer_dim), dtype=int)
    matrix[:, :] = longer_dim + 1 # or +inf 

    # Initialize the first column and first row of the matrix
    for i in range(shorter_dim):
        matrix[i, 0] = i
    for j in range(longer_dim):
        matrix[0, j] = j

    # Let's say we are dealing with 3 and 5 letter words.
    # The matrix will have size 4 x 6, parameterized as (shorter_dim x longer_dim).
    # It will have:
    # - 4 diagonals of increasing length, at positions: 0, 1, 2, 3.
    # - 2 diagonals of fixed length, at positions: 4, 5.
    # - 3 diagonals of decreasing length, at positions: 6, 7, 8.
    diagonals_count = shorter_dim + longer_dim - 1
    shorter_reversed = "".join(reversed(shorter))

    # In reality, we need to keep only 3 diagonals to produce the same score in the end.
    previous_distances = np.zeros(shorter_dim, dtype=np.uint)
    current_distances = np.zeros(shorter_dim, dtype=np.uint)
    next_distances = np.zeros(shorter_dim, dtype=np.uint)
    temporary_distances = np.zeros(shorter_dim, dtype=np.uint)
    previous_distances[0] = 0
    current_distances[0] = current_distances[1] = 1

    # Same as with square matrices, the 0th diagonal contains - just one element - zero - skipping it.
    # Same as with square matrices, the 1st diagonal contains the values 1 and 1 - skipping it.
    # Now let's handle the rest of the upper-left triangle.
    for skew_diagonal_index in range(2, shorter_dim):
        skew_diagonal_length = (skew_diagonal_index + 1)
        for offset_in_diagonal in range(1, skew_diagonal_length - 1): # ! Skip the left column & top row
            # If we haven't passed the main skew diagonal yet, 
            # then we have to skip the first and the last operation,
            # as those are already pre-populated and form the first column 
            # and the first row of the Levenshtein matrix respectively.
            i = skew_diagonal_index - offset_in_diagonal
            j = offset_in_diagonal
            if verbose:
                print(f"top left triangle: {skew_diagonal_index=}, {skew_diagonal_length=}, {i=}, {j=}")
            shorter_char = shorter_reversed[len(shorter) - i]
            longer_char = longer[j - 1]
            substitution_cost = shorter_char != longer_char
            matrix[i, j] = min(
                matrix[i - 1, j] + 1,                      # ? Deletion cost
                matrix[i, j - 1] + 1,                      # ? Insertion cost
                matrix[i - 1, j - 1] + substitution_cost,  # ? Substitution cost
            )

            # ? For reproducibility let's also try doing the same only using the info in the 3 diagonals
            next_distances[offset_in_diagonal] = min(
                current_distances[offset_in_diagonal - 1] + 1,
                current_distances[offset_in_diagonal] + 1,
                previous_distances[offset_in_diagonal - 1] + substitution_cost,
            )
            
            if baseline is not None:
                assert matrix[i, j] == baseline[i, j], f"{matrix[i, j]} != {baseline[i, j]} at {i=}, {j=}"
            
        next_distances[0] = next_distances[skew_diagonal_length-1] = skew_diagonal_index
        
        # ? Let's validate the contents of the diagonal
        skew_diagonal_expected = get_skewed_diagonal(matrix, skew_diagonal_index)
        assert len(skew_diagonal_expected) == skew_diagonal_length
        assert (skew_diagonal_expected == next_distances[:skew_diagonal_length]).all(), f"diagonal:{skew_diagonal_index}\nexpected:{skew_diagonal_expected}\nproduced:{next_distances[:skew_diagonal_length]}"
        temporary_distances[:] = previous_distances[:]
        previous_distances[:] = current_distances[:]
        current_distances[:] = next_distances[:]
        next_distances[:] = temporary_distances[:]

    # Now let's handle the anti-diagonal band of the matrix, between the top and bottom-right triangles.        
    for skew_diagonal_index in range(shorter_dim, longer_dim):
        skew_diagonal_length = shorter_dim
        for offset_in_diagonal in range(skew_diagonal_length - 1): # ! Skip the top row
            i = shorter_dim - offset_in_diagonal - 1
            j = skew_diagonal_index - shorter_dim + offset_in_diagonal + 1
            if verbose:
                print(f"anti-band: {skew_diagonal_index=}, {skew_diagonal_length=}, {i=}, {j=}")
            shorter_char = shorter_reversed[len(shorter) - i]
            longer_char = longer[j - 1]
            substitution_cost = shorter_char != longer_char
            matrix[i, j] = min(
                matrix[i - 1, j] + 1,                      # ? Deletion cost
                matrix[i, j - 1] + 1,                      # ? Insertion cost
                matrix[i - 1, j - 1] + substitution_cost,  # ? Substitution cost
            )
            
            if baseline is not None:
                assert matrix[i, j] == baseline[i, j], f"{matrix[i, j]} != {baseline[i, j]} at {i=}, {j=}"
    
            # ? For reproducibility let's also try doing the same only using the info in the 3 diagonals
            next_distances[offset_in_diagonal] = min(
                current_distances[offset_in_diagonal] + 1,
                current_distances[offset_in_diagonal + 1] + 1,
                previous_distances[offset_in_diagonal] + substitution_cost,
            )
            
        next_distances[shorter_dim-1] = skew_diagonal_index
        
        # ? Let's validate the contents of the diagonal
        skew_diagonal_expected = get_skewed_diagonal(matrix, skew_diagonal_index)
        assert len(skew_diagonal_expected) == skew_diagonal_length
        assert (skew_diagonal_expected == next_distances[:skew_diagonal_length]).all(), f"diagonal:{skew_diagonal_index}\nexpected:{skew_diagonal_expected}\nproduced:{next_distances[:skew_diagonal_length]}"
        temporary_distances[:] = previous_distances[:]
        previous_distances[:-1] = current_distances[1:] # ! Note we shift here
        current_distances[:] = next_distances[:]
        next_distances[:] = temporary_distances[:]
    
    # Now let's handle the bottom right triangle.
    for skew_diagonal_index in range(longer_dim, diagonals_count):
        skew_diagonal_length = diagonals_count - skew_diagonal_index
        for offset_in_diagonal in range(skew_diagonal_length):
            i = shorter_dim - offset_in_diagonal - 1
            j = skew_diagonal_index - shorter_dim + offset_in_diagonal + 1
            if verbose:
                print(f"bottom right triangle: {skew_diagonal_index=}, {skew_diagonal_length=}, {i=}, {j=}")
            assert (i - 1) >= 0 and (i - 1) < len(shorter), f"{i = }"
            assert (j - 1) >= 0 and (j - 1) < len(longer), f"{j = }"
            shorter_char = shorter_reversed[len(shorter) - i]
            longer_char = longer[j - 1]
            if verbose:
                print(f"{shorter_char=}, {longer_char=}")
            substitution_cost = shorter_char != longer_char
            matrix[i, j] = min(
                matrix[i - 1, j] + 1,                      # ? Deletion cost
                matrix[i, j - 1] + 1,                      # ? Insertion cost
                matrix[i - 1, j - 1] + substitution_cost,  # ? Substitution cost
            )
            
            if baseline is not None:
                assert matrix[i, j] == baseline[i, j], f"{matrix[i, j]} != {baseline[i, j]} at {i=}, {j=}"

            # ? For reproducibility let's also try doing the same only using the info in the 3 diagonals
            next_distances[offset_in_diagonal] = min(
                current_distances[offset_in_diagonal] + 1,
                current_distances[offset_in_diagonal + 1] + 1,
                previous_distances[offset_in_diagonal] + substitution_cost,
            )

        # ? Let's validate the contents of the diagonal
        skew_diagonal_expected = get_skewed_diagonal(matrix, skew_diagonal_index)
        assert len(skew_diagonal_expected) == skew_diagonal_length
        assert (skew_diagonal_expected == next_distances[:skew_diagonal_length]).all(), f"diagonal:{skew_diagonal_index}\nexpected:{skew_diagonal_expected}\nproduced:{next_distances[:skew_diagonal_length]}"
        temporary_distances[:] = previous_distances[:]
        previous_distances[:-1] = current_distances[1:] # ! Note we shift here
        current_distances[:] = next_distances[:]
        next_distances[:] = temporary_distances[:]

    # Return the Levenshtein distance
    distance_from_matrix = matrix[len(shorter), len(longer)]
    distance_from_diagonal = current_distances[0]
    assert distance_from_diagonal == distance_from_matrix
    if len(s1) > len(s2):
        matrix = matrix.T
    return distance_from_matrix, matrix
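
The index arithmetic above hides the reason for reversing the shorter string: along a skewed diagonal the row `i` shrinks while the column `j` grows, so `shorter_reversed[len(shorter) - i]` and `longer[j - 1]` both advance in the same direction. The following is a minimal sketch of that observation; the helper name `diagonal_mismatches` is hypothetical and not part of the notebook or of StringZilla. It shows that the substitution costs of a whole diagonal come from comparing two forward slices of equal length.

```python
from typing import List


def diagonal_mismatches(shorter: str, longer: str, skew_diagonal_index: int) -> List[bool]:
    """Substitution costs for the inner cells of one skewed diagonal, ordered by growing column `j`."""
    shorter_reversed = shorter[::-1]
    # Inner cells satisfy `i + j == skew_diagonal_index`, with `1 <= i <= len(shorter)` and `1 <= j <= len(longer)`
    first_j = max(1, skew_diagonal_index - len(shorter))
    last_j = min(len(longer), skew_diagonal_index - 1)
    count = last_j - first_j + 1
    if count <= 0:
        return []
    # With `i = skew_diagonal_index - j`, the index `len(shorter) - i` grows together with `j`,
    # so both operands are plain forward slices and no per-cell index math is needed.
    start = len(shorter) - skew_diagonal_index + first_j
    shorter_slice = shorter_reversed[start:start + count]
    longer_slice = longer[first_j - 1:last_j]
    return [a != b for a, b in zip(shorter_slice, longer_slice)]


print(diagonal_mismatches("abc", "abxcd", 4))  # cells (3, 1), (2, 2), (1, 3)
```

Two contiguous slices that are compared element by element are the access pattern that vectorized comparisons expect, which is what makes the reversed layout attractive.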

## Bounding the Error

The algorithm can be optimized further for "bounded" edit distances, where the maximum allowed number of edits is known in advance.
In such cases, we only need to evaluate a band around the main diagonal and can skip the rest of the matrix.
For a bound $k$, that band consists of the $2k+1$ diagonals closest to the main one, i.e. the cells with $|i - j| \le k$: reaching any cell outside of it already takes more than $k$ insertions or deletions.
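
The banding idea is easiest to see in the plain row-by-row formulation, before mixing it with skewed diagonals. Below is a minimal sketch under stated assumptions: the function name `bounded_levenshtein` and the convention of returning `None` when the bound is exceeded are illustrative choices, not the notebook's or StringZilla's API. For readability each row is allocated at full width, but only the cells with $|i - j| \le k$ are evaluated.

```python
from typing import Optional


def bounded_levenshtein(s1: str, s2: str, bound: int) -> Optional[int]:
    """Return the Levenshtein distance if it does not exceed `bound`, otherwise `None`."""
    n, m = len(s1), len(s2)
    if abs(n - m) > bound:
        return None  # The length difference alone requires more than `bound` edits
    too_far = bound + 1  # Any value above the bound plays the role of +inf
    previous_row = [j if j <= bound else too_far for j in range(m + 1)]
    for i in range(1, n + 1):
        # Cells outside the band keep the `too_far` placeholder
        current_row = [too_far] * (m + 1)
        if i <= bound:
            current_row[0] = i
        first_j = max(1, i - bound)
        last_j = min(m, i + bound)
        for j in range(first_j, last_j + 1):
            substitution_cost = s1[i - 1] != s2[j - 1]
            current_row[j] = min(
                previous_row[j] + 1,                      # Deletion
                current_row[j - 1] + 1,                   # Insertion
                previous_row[j - 1] + substitution_cost,  # Substitution
                too_far,                                  # Clamp, so values never grow past the placeholder
            )
        previous_row = current_row
    return previous_row[m] if previous_row[m] <= bound else None


print(bounded_levenshtein("kitten", "sitting", bound=3))
print(bounded_levenshtein("kitten", "sitting", bound=2))
```

Clamping to `bound + 1` keeps every stored value small, so a narrow integer type would be enough for the cells when the bound is small.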

In [None]:
from typing import Optional


def bounded_skewed_diagonals(
    s1: str,
    s2: str,
    verbose: bool = False,
    bound: Optional[int] = None,
    baseline: Optional[np.ndarray] = None,
) -> Tuple[int, np.ndarray]:

    shorter, longer = (s1, s2) if len(s1) <= len(s2) else (s2, s1)
    if baseline is not None and len(s1) > len(s2):
        baseline = baseline.T  # Keep the reference matrix aligned with (shorter x longer)
    shorter_dim = len(shorter) + 1
    longer_dim = len(longer) + 1
    if verbose:
        print(f"{shorter=}, {longer=}, {shorter_dim=}, {longer_dim=}")

    # Create a matrix of size (shorter_dim) x (longer_dim)
    matrix = np.zeros((shorter_dim, longer_dim), dtype=int)
    matrix[:, :] = np.iinfo(matrix.dtype).max // 2  # "Infinity" that can still absorb a +1 without overflowing

    # Initialize the first column and first row of the matrix
    for i in range(shorter_dim):
        matrix[i, 0] = i
    for j in range(longer_dim):
        matrix[0, j] = j

    # Let's say we are dealing with 3 and 5 letter words.
    # The matrix will have size 4 x 6, parameterized as (shorter_dim x longer_dim).
    # It will have:
    # - 4 diagonals of increasing length, at positions: 0, 1, 2, 3.
    # - 2 diagonals of fixed length, at positions: 4, 5.
    # - 3 diagonals of decreasing length, at positions: 6, 7, 8.
    diagonals_count = shorter_dim + longer_dim - 1

    # Same as with square matrices, the 0th diagonal contains just one element, zero, so we skip it.
    # Same as with square matrices, the 1st diagonal contains the values 1 and 1, so we skip it too.
    # In the unbounded case, the upper-left triangle will have `shorter_dim` rows and columns.
    # In the bounded case, it will have `min(bound, shorter_dim)` rows and columns.
    upper_triangle_dim = min(bound, shorter_dim) if bound is not None else shorter_dim
    for skew_diagonal_index in range(2, upper_triangle_dim):
        skew_diagonal_length = skew_diagonal_index + 1
        for offset_within_diagonal in range(
            1, skew_diagonal_length - 1
        ):  #! Skip the first column & row
            # If we haven't passed the main skew diagonal yet,
            # then we have to skip the first and the last operation,
            # as those are already pre-populated and form the first column
            # and the first row of the Levenshtein matrix respectively.
            i = skew_diagonal_index - offset_within_diagonal
            j = offset_within_diagonal
            if verbose:
                print(
                    f"top left triangle: {skew_diagonal_index=}, {skew_diagonal_length=}, {i=}, {j=}"
                )
            shorter_char = shorter[i - 1]
            longer_char = longer[j - 1]
            substitution_cost = shorter_char != longer_char
            matrix[i, j] = min(
                matrix[i - 1, j] + 1,  # ? Deletion cost
                matrix[i, j - 1] + 1,  # ? Insertion cost
                matrix[i - 1, j - 1] + substitution_cost,  # ? Substitution cost
            )
            
            # Validation checks:
            if baseline is not None:
                assert (
                    matrix[i, j] == baseline[i, j]
                ), f"{matrix[i, j]} != {baseline[i, j]} at {i=}, {j=}"

    # Now let's handle the anti-diagonal band of the matrix, between the top and bottom-right triangles.
    # In the unbounded case, we will enumerate diagonal indices from `shorter_dim` to `longer_dim`.
    # In the bounded case, we go through the same range of diagonal indices.
    for skew_diagonal_index in range(shorter_dim, longer_dim):
        skew_diagonal_length = shorter_dim
        for offset_within_diagonal in range(
            skew_diagonal_length - 1
        ):  #! Skip the first row
            i = shorter_dim - offset_within_diagonal - 1
            j = skew_diagonal_index - shorter_dim + offset_within_diagonal + 1
            if verbose:
                print(
                    f"anti-band: {skew_diagonal_index=}, {skew_diagonal_length=}, {i=}, {j=}"
                )
            shorter_char = shorter[i - 1]
            longer_char = longer[j - 1]
            substitution_cost = shorter_char != longer_char
            matrix[i, j] = min(
                matrix[i - 1, j] + 1,  # ? Deletion cost
                matrix[i, j - 1] + 1,  # ? Insertion cost
                matrix[i - 1, j - 1] + substitution_cost,  # ? Substitution cost
            )

            if baseline is not None:
                assert (
                    matrix[i, j] == baseline[i, j]
                ), f"{matrix[i, j]} != {baseline[i, j]} at {i=}, {j=}"

    # Now let's handle the bottom right triangle.
    for skew_diagonal_index in range(longer_dim, diagonals_count):
        skew_diagonal_length = diagonals_count - skew_diagonal_index
        for offset_within_diagonal in range(skew_diagonal_length):
            i = shorter_dim - offset_within_diagonal - 1
            j = skew_diagonal_index - shorter_dim + offset_within_diagonal + 1
            if verbose:
                print(
                    f"bottom right triangle: {skew_diagonal_index=}, {skew_diagonal_length=}, {i=}, {j=}"
                )
            assert (i - 1) >= 0 and (i - 1) < len(shorter), f"{i = }"
            assert (j - 1) >= 0 and (j - 1) < len(longer), f"{j = }"
            shorter_char = shorter[i - 1]
            longer_char = longer[j - 1]
            substitution_cost = shorter_char != longer_char
            matrix[i, j] = min(
                matrix[i - 1, j] + 1,  # ? Deletion cost
                matrix[i, j - 1] + 1,  # ? Insertion cost
                matrix[i - 1, j - 1] + substitution_cost,  # ? Substitution cost
            )

            if baseline is not None:
                assert (
                    matrix[i, j] == baseline[i, j]
                ), f"{matrix[i, j]} != {baseline[i, j]} at {i=}, {j=}"

    # Return the Levenshtein distance
    distance = matrix[len(shorter), len(longer)]
    if len(s1) > len(s2):
        matrix = matrix.T
    return distance, matrix

## Putting Everything Together
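
The pieces from the previous sections (three rolling diagonals, a reversed operand, and NumPy slices in place of the inner loop) can be combined into one compact function. The following is a minimal standalone sketch, not the StringZilla implementation: the name `antidiagonal_levenshtein`, the choice to reverse the second string, and the choice to index every diagonal by its row `i` (which removes the need to shift buffers between iterations) are assumptions made for illustration.

```python
import numpy as np


def antidiagonal_levenshtein(a: str, b: str) -> int:
    len_a, len_b = len(a), len(b)
    if len_a == 0 or len_b == 0:
        return len_a + len_b
    a_chars = np.array(list(a))
    b_reversed = np.array(list(b[::-1]))

    # Each buffer stores one skewed diagonal; slot `i` holds the cell (i, d - i)
    previous = np.zeros(len_a + 1, dtype=np.int64)   # Diagonal d - 2, starts as diagonal 0: a single zero
    current = np.zeros(len_a + 1, dtype=np.int64)    # Diagonal d - 1, starts as diagonal 1
    following = np.zeros(len_a + 1, dtype=np.int64)  # Diagonal d, the one being computed
    current[0] = current[1] = 1

    for d in range(2, len_a + len_b + 1):
        # Rows of the inner cells on this diagonal: 1 <= i <= len_a and 1 <= d - i <= len_b
        lo = max(1, d - len_b)
        hi = min(len_a, d - 1)
        # `a[i - 1]` against `b[d - i - 1]`: thanks to the reversal, both are forward slices
        mismatches = a_chars[lo - 1:hi] != b_reversed[len_b - d + lo:len_b - d + hi + 1]
        following[lo:hi + 1] = np.minimum(
            np.minimum(current[lo - 1:hi], current[lo:hi + 1]) + 1,  # Deletion or insertion
            previous[lo - 1:hi] + mismatches,                        # Substitution
        )
        # Border cells of the Levenshtein matrix: first column and first row
        if d <= len_a:
            following[d] = d
        if d <= len_b:
            following[0] = d
        previous, current, following = current, following, previous
    return int(current[len_a])


print(antidiagonal_levenshtein("kitten", "sitting"))
```

Because all cells of one diagonal depend only on the two preceding diagonals, the three `np.minimum` operands never overlap with the slice being written.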

In [None]:
def vectorized_skewed_diagonals(
    s1: str, s2: str, 
    verbose: bool = False, 
    baseline: Optional[np.ndarray] = None) -> Tuple[int, np.ndarray]:
    
    shorter, longer = (s1, s2) if len(s1) <= len(s2) else (s2, s1)    
    if baseline is not None and len(s1) > len(s2):
        baseline = baseline.T  # Keep the reference matrix aligned with (shorter x longer)
    shorter_dim = len(shorter) + 1
    longer_dim = len(longer) + 1
    if verbose:
        print(f"{shorter=}, {longer=}, {shorter_dim=}, {longer_dim=}")
    
    # Create a matrix of size (shorter_dim) x (longer_dim)
    matrix = np.zeros((shorter_dim, longer_dim), dtype=int)
    matrix[:, :] = longer_dim + 1 # or +inf 

    # Initialize the first column and first row of the matrix
    for i in range(shorter_dim):
        matrix[i, 0] = i
    for j in range(longer_dim):
        matrix[0, j] = j

    # Let's say we are dealing with 3 and 5 letter words.
    # The matrix will have size 4 x 6, parameterized as (shorter_dim x longer_dim).
    # It will have:
    # - 4 diagonals of increasing length, at positions: 0, 1, 2, 3.
    # - 2 diagonals of fixed length, at positions: 4, 5.
    # - 3 diagonals of decreasing length, at positions: 6, 7, 8.
    diagonals_count = shorter_dim + longer_dim - 1

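    # Same as with square matrices, the 0th diagonal contains just one element, zero, so we skip it.
    # Same as with square matrices, the 1st diagonal contains the values 1 and 1, so we skip it too.
    # The three rolling buffers hold the two most recent skewed diagonals and the one being computed.
    previous = np.zeros(shorter_dim, dtype=int)
    current = np.zeros(shorter_dim, dtype=int)
    following = np.zeros(shorter_dim, dtype=int)
    previous[0] = 0
    current[0] = current[1] = 1
    # Now let's handle the rest of the upper-left triangle.
    next_diagonal_index = 2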
    while next_diagonal_index < shorter_dim:
        next_skew_diagonal_length = next_diagonal_index + 1

        # Substitution candidates: the cells two diagonals back, plus 1 wherever the characters differ
        diagonal_predecessors = previous[:next_skew_diagonal_length - 2]
        mismatches = np.array([shorter[next_diagonal_index - offset_within_diagonal - 2] != longer[offset_within_diagonal] for offset_within_diagonal in range(next_skew_diagonal_length - 2)], dtype=int)
        substitution_costs = diagonal_predecessors + mismatches

        following[1:next_skew_diagonal_length - 1] = np.minimum(current[1:next_skew_diagonal_length - 1] + 1, current[:next_skew_diagonal_length - 2] + 1) # Insertions or deletions
        following[1:next_skew_diagonal_length - 1] = np.minimum(following[1:next_skew_diagonal_length - 1], substitution_costs) # Substitutions
        following[0] = next_diagonal_index
        following[next_skew_diagonal_length - 1] = next_diagonal_index
        if baseline is not None:
            assert np.all(following[:next_skew_diagonal_length] == get_skewed_diagonal(baseline, next_diagonal_index))
        
        previous[:] = current[:]
        current[:] = following[:]
        next_diagonal_index += 1
                        
    # Now let's handle the anti-diagonal band of the matrix, between the top and bottom-right triangles.        
    while next_diagonal_index < longer_dim:
        next_skew_diagonal_length = shorter_dim
    
        ...