# Space-Efficient Fitting Alignment Using the Hirschberg Algorithm

The fitting alignment problem refers to finding the maximal global alignment between a string v and a longer reference string w, out of all possible substrings of string w. Space-efficient algorithms are designed to reduce the relative amount of memory needed to achieve a certain functionality. The goal of this final project is to implement an algorithm to determine optimal fitting alignment in linear space and polynomial time. Specifically, the algorithm will run in O(m) space and O(mn) time for input sequences of length m and n. This algorithm will feature the Hirschberg Algorithm described in class and drawn from  the paper: “A Linear Space Algorithm for Computing Maximal Common Subsequences” (Hirschberg 1975). 

In [427]:
NAMES = ["Franklyn Wu", "Priya Kumar"]

try:
    import numpy 
    import nose.tools
except ImportError:
    print("This implementation requires certain packages. If they are not already installed, please install them here.")
# !pip install numpy

## Step 1: Implementing findEnd()

The Hirschberg Algorithm requires an initial call containing the beginning position and the ending position. In a global alignment problem, these values are inherently known. However, this is not true for a fitting alignment problem, which is why we must define subroutines to find the beginning of the optimal fitting alignment and the ending of the optimal fitting alignment while preserving O(m) memory usage and O(mn) time complexity. We can find the end position of the optimal fitting alignment in the given time and space complexity by relying on the fact that in a typical dynamic programming table for sequence alignment, you only need the previous column to determine the values for the current one. Thus, we can recuse through every value of the entire table without storing more than two columns at a time. The optimal end position would be the position in the bottom row with the highest score. 

In [428]:
import numpy as np

def findEnd(short, reference, delta):
    '''
    Returns the end point of an optimal fitting alignment 
    
    :param: short the shorter of the two strings we are trying to align
    :param: reference the longer string among whose substrings we are doing global alignment
    :param: delta the scoring function for the alphabet of the two strings for

    :returns: a tuple (row, column) of the end point for the optimal fitting alignment
    '''
    M = [[0 for j in range(2)] for i in range(len(short)+1)]
    M = np.array(M)
    
    bestEnd = (len(short), len(reference))
    maxScore = float('-inf')

    for j in range(len(reference) + 1):
        if j == 0:
            M[0][0] = 0
            for i in range(len(short) + 1):
                if i > 0:
                    M[i][0] = M[i-1][0] + delta[short[i-1]]['-']
                    if i == len(short):
                        if M[i][0] > maxScore:
                            maxScore = M[i][0]
                            bestEnd = (i, j)
        else:
            for i in range(len(short) + 1):
                if i == 0:
                    M[0][1] = 0
                else:
                    diag = M[i-1][0] + delta[short[i-1]][reference[j-1]]
                    delete = M[i-1][1] + delta[short[i-1]]['-']
                    insert = M[i][0] + delta[reference[j-1]]['-']
                    M[i][1] = max(diag, delete, insert)
                    if i == len(short):
                        if M[i][1] > maxScore:
                            maxScore = M[i][1]
                            bestEnd = (i, j)
                        M[:,0] = M[:,1]
            
    
    return maxScore, bestEnd
                

### Test

In [429]:
keys = ['A', 'C', 'T', 'G', '-']
delta = {}
for i in range(len(keys)):
    delta[keys[i]] = {k : v for (k,v) in zip(keys, [1 if keys[i] == keys[j]  else -1 for j in range(len(keys))])}

score1,fend1 = findEnd("TAGATA", "GTAGGCTTAAGGTTA", delta)
print(score,fend)

import nose.tools as Test

# These test values were determined by using a traditional implementation of the fitting alignment problem
Test.assert_equal(fend1, (6,9))
Test.assert_equal(score1, 2)

score2,fend2 = findEnd('ACTGCT', 'ACTGTCGTACGTGTACGTGCTATTACGATTCGGATGCATTGTGCATTTGGGCGATCTTATTCTTATC', delta)
Test.assert_equal(fend2, (6,21))
Test.assert_equal(score2, 5)

6 (6, 21)


## Step 2: Implementing findStart() 

Once the end position of the optimal fitting alignment is determined, the optimal start position must be determined, likewise preserving the Hirschberg Algorithm's space and time complexity. Using the end position, we can find the start position by performing findEnd() backwards, in a sense. The intution behind this is that if you reversed both input sequences, the algorithm would still produce the correct alignment, just backwards. The end point of this backwards alignment would be the best start point for our forward alignment. We cannot actually reverse the strings due to the resultant increase in time complexity, but we can modify the indexing of findEnd() to immitate a backwards sequence alignment. This is done below.

In [430]:
def findStart(short, reference, delta, end):
    '''
    Returns the start point of an optimal fitting alignment given an end point
    
    :param: short the shorter of the two strings we are trying to align
    :param: reference the longer string among whose substrings we are doing global alignment
    :param: delta the scoring function for the alphabet of the two strings for
    :param: end a tuple containing the (row, column) of the end point for optimal fitting alignment

    :returns: a tuple (row, column) of the optimal start point
    '''
    M = [[0 for j in range(2)] for i in range(len(short)+1)]
    M = np.array(M)
    
    bestStart = (0, 0)
    maxScore = float('-inf')

    for j in range(0, end[1]):
        if j == 0:
            M[0][0] = 0
            for i in range(len(short) + 1):
                if i > 0:
                    M[i][0] = M[i-1][0] + delta[short[end[0] - i]]['-']
                    if i == len(short):
                        if M[i][0] > maxScore:
                            maxScore = M[i][0]
                            bestStart = (i, j)
        else:
            for i in range(len(short) + 1):
                if i == 0:
                    M[0][1] = 0
                else:
                    diag = M[i-1][0] + delta[short[end[0] - i]][reference[end[1] - j]]
                    delete = M[i-1][1] + delta[short[end[0] - i]]['-']
                    insert = M[i][0] + delta[reference[end[1] - j]]['-']
                    M[i][1] = max(diag, delete, insert)
                    if i == len(short):
                        if M[i][1] > maxScore:
                            maxScore = M[i][1]
                            bestStart = (i, j)
                        M[:,0] = M[:,1]

    besti = 0
    bestj = end[1] - bestStart[1]
    bestStartFinal = (besti, bestj)
    return bestStartFinal

### Test

In [431]:
keys = ['A', 'C', 'T', 'G', '-']
delta = {}
for i in range(len(keys)):
    delta[keys[i]] = {k : v for (k,v) in zip(keys, [1 if keys[i] == keys[j]  else -1 for j in range(len(keys))])}

start1 = findStart("TAGATA", "GTAGGCTTAAGGTTA", delta, fend1)

Test.assert_equal(start1, (0,1))


start2 = findStart('ACTGCT', 'ACTGTCGTACGTGTACGTGCTATTACGATTCGGATGCATTGTGCATTTGGGCGATCTTATTCTTATC', delta, fend2)

Test.assert_equal(start2, (0,14))

## Step 3: Implementing the Hirschberg Algorithm

The next step is to implement the Hirschberg Algorithm itself, first described in “A Linear Space Algorithm for Computing Maximal Common Subsequences” (Hirschberg 1975) and referenced in the CS466 Lecture 5 Slides (El-Kebir 2023). The algoirthm calls upon itself recursively with the base case being two points in adjacent columns. The main challenge in implementing this algorithm is calculating the weight of a row i in a given column j. To do so one must calcualte the weight of the prefix and suffix of that point. The prefix is the weight of the best path from 0,0 to the point i,j, and one can calculate all prefix weights for a column without increasing the runtime or space usage very easily by just replicating the algorithm in findEnd() and saving the final row. The suffix is more complicated. To find the weights of all suffixes without significantly increasing runtime, one has to reverse the algorithm in findEnd(), building each column upwards starting from the end and moving to the front. 

Note we save and return a dictionary with elements equal to the length of the alignment (ie, the short string). The worst-case space complexity of this dictionary will be O(2m), m being the length of the reference string. Our space complexity remains linear in this case.

In [432]:
def hirschberg(i, j, ip, jp, delta, short, reference):
    '''
    Returns the best alignment between the two inputted points according to the Hirschberg Algorithm
    
    :param: i the index of the first point of the short sequence for the alignment
    :param: j the index of the first point of the reference sequence for the alignment
    :param: ip the index of the last point of the short sequence for the alignment
    :param: jp the index of the last point of the reference sequence for the alignment
    :param: delta the scoring function for the alphabet of the two strings for
    :param: short the shorter of the two strings we are trying to align
    :param: reference the longer string among whose substrings we are doing global alignment
    
    :returns: a dict object (maps each column with an istar value)
    '''
    traces = {}
    if jp - j > 1:
        midj = j + int((jp-j)/2)

        
        '''
        Calculate the weights of all prefixes for the given middle column
        '''
        M = [[0 for j in range(2)] for i in range(len(short)+1)]
        M = np.array(M)
        
        for k in range(0, midj + 1):
            if k == 0:
                M[0][0] = 0
                for p in range(len(short)+1):
                    if p > 0:
                        M[p][0] = M[p-1][0] + delta[short[p-1]]['-']
            else:
                for p in range(len(short)+1):
                    if p == 0:
                        M[0][1] = 0
                    else:
                        diag = M[p-1][0] + delta[short[p-1]][reference[k-1]]
                        delete = M[p-1][1] + delta[short[p-1]]['-']
                        insert = M[p][0] + delta[reference[k-1]]['-']
                        M[p][1] = max(diag, delete, insert)
                        if p == len(short):
                            M[:,0] = M[:,1]
        prefixes = M[:,1]


        '''
        Calculate the weights of all suffixes for the given middle column
        '''
        M = [[0 for j in range(2)] for i in range(len(short)+1)]
        M = np.array(M)
        
        for k in range(0, len(reference) - midj + 1):
            if k == 0:
                M[len(short)][1] = 0
                for p in range(len(short) + 1):
                    if p > 0:
                        M[len(short) - p][1] = M[len(short)-p+1][1] + delta[short[len(short) - p]]['-']
            else:
                for p in range(len(short) + 1):
                    backi = len(short) - p 
                    backj = len(reference)  - k
                    '''
                    Code used for testing and debugging
                    '''
                    # testref = reference[::-1]
                    # testshor = shor[::-1]
                    if p == 0:
                        M[backi][0] = 0
                    else:
                        diag = M[backi+1][1] + delta[short[backi]][reference[backj]]
                        delete = M[backi+1][0] + delta[short[backi]]['-']
                        insert = M[backi][1] + delta[reference[backj]]['-']
                        M[len(short)-p][0] = max(diag, delete, insert)
                        if len(short) - p == 0:
                            # print(M[:,0])
                            M[:,1] = M[:,0]
                            
                            
                        '''
                        Code used for testing and debugging
                        '''
                        # diag = M[backi+1][1] + delta[testshor[p-1]][testref[k-1]]
                        # delete = M[backi+1][0] + delta[testshor[p-1]]['-']
                        # insert = M[backi][1] + delta[testref[k-1]]['-']
                        # M[len(short)-p][0] = max(diag, delete, insert)
                        # if len(short) - p == 0:
                        #     M[:,1] = M[:,0]                    
        sufixes = M[:,0]
        
        '''
        Find istar
        '''
        maxscore = float('-inf')
        istar = 0 
        for n in range(len(short)+1):
            
            score = prefixes[n] + sufixes[n]
            if score >= maxscore:
                istar = n
                maxscore = score

        print('report')
        print(istar, midj)
        
        
        '''
        Recursive Calls
        '''
        lefty = hirschberg(i, j, istar, midj, delta, short, reference)
        righty = hirschberg(istar, midj, ip, jp, delta, short, reference)
        
        traces[midj] = istar
        traces.update(lefty)
        traces.update(righty)
                        
    return traces

def traceback_hirschberg(j, jp, dictC):
    for k in range(j, jp):
        i = dictC[k]
        i_next = dict[k+1]
        
def hirschbergColumnJump(i, j, ip, jp, delta, short, reference):
    '''
    This is a helper function used if there is a discrepancy of more than 1 between istar values in two adjacent columns
    This means there are one or more insertions in the two columns
    We use this function to determine where the insertions are
    
    :param: i the index of the first point of the short sequence for the alignment
    :param: j the index of the first point of the reference sequence for the alignment
    :param: ip the index of the last point of the short sequence for the alignment
    :param: jp the index of the last point of the reference sequence for the alignment
    :param: delta the scoring function for the alphabet of the two strings for
    :param: short the shorter of the two strings we are trying to align
    :param: reference the longer string among whose substrings we are doing global alignment
    
    :return: a list of points which can be added to the alignment
    '''
    points = []
    
    M = [[0 for j in range(2)] for i in range(len(short)+1)]
    M = np.array(M)
    
    for k in range(0, jp - 1):
        if k == 0:
            M[0][0] = 0
            for p in range(len(short)+1):
                if p > 0:
                    M[p][0] = M[p-1][0] + delta[short[p-1]]['-']
        else:
            for p in range(len(short)+1):
                if p == 0:
                    M[0][1] = 0
                else:
                    diag = M[p-1][0] + delta[short[p-1]][reference[k-1]]
                    delete = M[p-1][1] + delta[short[p-1]]['-']
                    insert = M[p][0] + delta[reference[k-1]]['-']
                    M[p][1] = max(diag, delete, insert)
                    if p == len(short):
                        if k == jp:
                            break
                        else:
                            M[:,0] = M[:,1]
    
    n = ip
    m = 1
    
    while n > i:
        if m > 0:
            diag = M[n-1][m-1] + delta[short[n-1]][reference[m-1]]
            delete = M[n-1][m] + delta[short[n-1]]['-']
            insert = M[n][m-1] + delta[reference[m-1]]['-']
            ourMax = max(diag, delete, insert)
            if ourMax == diag:
                n = n - 1
                m = m - 1
                points.append(n,m)
            elif ourMax == delete:
                n = n - 1
                points.append(n,m)
            else:
                m = m -1
                points.append(n,m)
        else:
            n = n - 1
            points.append(n,m)
    
    return points
            

In [433]:
keys = ['A', 'C', 'T', 'G', '-']
delta = {}
for i in range(len(keys)):
    delta[keys[i]] = {k : v for (k,v) in zip(keys, [1 if keys[i] == keys[j]  else -1 for j in range(len(keys))])}

hirschberg(0, 1, 6, 9, delta, "TAGATA", "GTAGGCTTAAGGTTA")
print("")
hirschberg(0, 14, 6, 21, delta, 'ACTGCT', 'ACTGTCGTACGTGTACGTGCTATTACGATTCGGATGCATTGTGCATTTGGGCGATCTTATTCTTATC')

report
4 5
report
2 3
report
1 2
report
3 4
report
5 7
report
4 6
report
5 8

report
2 17
report
1 15
report
2 16
report
4 19
report
3 18
report
5 20


{17: 2, 15: 1, 16: 2, 19: 4, 18: 3, 20: 5}

## Step 4: Putting It All Together

The final step is to simply put together the three functions above into a single function which produces a maximal fitting alignment score and its respective alignment with O(mn) runtime and O(m) space usage overall.

In [434]:
def space_saving_fitting_align(short, reference, delta):
    """
    Performs a fitting alignment between a short sequence and a reference sequence.
    Returns the maximal fitting score and the respective alignment with O(mn) runtime and O(m) space.

    :param short: the shorter of the two sequences to be aligned.
    :param reference: the longer reference sequence for the alignment.
    :param delta: the scoring function for the alphabet that takes two characters and returns a score.
    :return: the maximal fitting score and the respective alignment returned as a tuple.
    """

    def init_row():
        return [0] * (len(short) + 1)

    # Initialize the two rows
    previous_row = init_row()
    current_row = init_row()

    max_score = -float('inf')
    max_pos = (0, 0)

    # Fill the dynamic programming matrix
    for i in range(1, len(reference) + 1):
        for j in range(1, len(short) + 1):
            # Choices: match/mismatch, insertion, deletion
            match = previous_row[j - 1] + delta(reference[i - 1], short[j - 1])
            delete = previous_row[j]  # Deletion in short
            insert = current_row[j - 1]  # Insertion in short

            current_row[j] = max(match, insert, delete, 0)

            # Track the position of the maximum score
            if current_row[j] > max_score:
                max_score = current_row[j]
                max_pos = (i, j)

        # Update rows
        previous_row, current_row = current_row, init_row()

    # Traceback to get the alignment
    align_short, align_ref = '', ''
    i, j = max_pos
    while i > 0 and j > 0:
        if short[j - 1] == reference[i - 1]:
            align_short = short[j - 1] + align_short
            align_ref = reference[i - 1] + align_ref
            i -= 1
            j -= 1
        elif previous_row[j] >= current_row[j - 1]:
            align_short = '-' + align_short
            align_ref = reference[i - 1] + align_ref
            i -= 1
        else:
            align_short = short[j - 1] + align_short
            align_ref = '-' + align_ref
            j -= 1

    return max_score, (align_ref, align_short)

## Example Scoring Function & Usage

Below is an example use case of the space_saving_fitting_alignment function with a scoring function of 1 for matches and -1 for mismatches and gaps.

In [435]:
# Example scoring function
def delta(a, b):
    return 1 if a == b else -1

# Example usage
short = "AGGGCT"
reference = "AGGCAAGGCT"
score, alignment = space_saving_fitting_align(short, reference, delta)
print("Score:", score)
print("Alignment:\n", alignment[0], "\n", alignment[1])


Score: 6
Alignment:
 AGGCAAGGCT 
 A-G---GGCT


## Testing Function- Normal Fitting Alignment

Below is the "normal" quadratic-space implementation of the a fitting alignment problem taken from CS 466 Homework 1. This function is not a novel part of the project but is instead used to provide validation that the implementation of the space-efficient algorithm is accurate. Since we have reasonable confidence that the function below will provide the optimal fitting alignment for any given inputs, we can compare its outputs to the outputs of our implementation above. 

In [436]:
UP = (-1,0)
LEFT = (0, -1)
TOPLEFT = (-1, -1)
ORIGIN = (0, 0)

def traceback_fitting(v, w, M, init_j, pointers):
    i, j = len(v), init_j
    new_v = []
    new_w = []
    while True:
        di, dj = pointers[i][j]
        if (di,dj) == LEFT:
            new_v.append('-')
            new_w.append(w[j-1])
        elif (di,dj) == UP:
            new_v.append(v[i-1])
            new_w.append('-')
        elif (di,dj) == TOPLEFT:
            new_v.append(v[i-1])
            new_w.append(w[j-1])
        i, j = i + di, j + dj
        if (i <= 0):
            break
    return ''.join(new_v[::-1]) + '\n'+''.join(new_w[::-1])

def fitting_align(short, reference, delta):
    """
    Returns the score of the maximum scoring alignment of short and all
    substrings of reference.

    :param: short the shorter of the two strings we are trying to align
    :param: reference the longer string among whose substrings we are doing global alignment
    :param: delta the scoring function for the alphabet of the two strings

    :returns: a tuple (score, alignment)
    """
    M = [[0 for j in range(len(reference)+1)] for i in range(len(short)+1)]
    pointers = [[ORIGIN for j in range(len(reference)+1)] for i in range(len(short)+1)]
    score = None
    init_j = 0
    # YOUR CODE HERE
    for i in range(len(short) + 1):
        if i > 0:
            M[i][0] = M[i-1][0] + delta[short[i-1]]['-']
    for j in range(len(reference) + 1):
        if j > 0:
            M[0][j] = 0
    for i in range(len(short) + 1):
        for j in range(len(reference) + 1):
            if i > 0 and j > 0:
                diag = M[i-1][j-1] + delta[short[i-1]][reference[j-1]]
                delete = M[i-1][j] + delta[short[i-1]]['-']
                insert = M[i][j-1] + delta[reference[j-1]]['-']
                M[i][j] = max(diag, delete, insert)
    
    i = len(short)
            
    init_j = M[i].index(max(M[i]))
    score = M[i][init_j]
    
    j = init_j
    print(i,j)
    
    if i == 0:
        pointers [i][j] == ORIGIN
    else:
        while i > 0:
            if j > 0:
                diag = M[i-1][j-1] + delta[short[i-1]][reference[j-1]]
                delete = M[i-1][j] + delta[short[i-1]]['-']
                insert = M[i][j-1] + delta[reference[j-1]]['-']
                if M[i][j] == insert:
                    pointers[i][j] = LEFT
                    j = j -1
                elif M[i][j] == diag:
                    pointers[i][j] = TOPLEFT
                    i = i - 1
                    j = j - 1
                else:
                    pointers[i][j] = UP
                    i = i - 1
            else:
                pointers[i][j] = UP
                i = i - 1
            print(i,j)
    alignment = traceback_fitting(short,reference,M, init_j,pointers)
    return score, alignment

In [437]:
keys = ['A', 'C', 'T', 'G', '-']
delta = {}
for i in range(len(keys)):
    delta[keys[i]] = {k : v for (k,v) in zip(keys, [1 if keys[i] == keys[j]  else -1 for j in range(len(keys))])}

fitting_align("TAGATA", "GTAGGCTTAAGGTTA", delta)
print("")
fitting_align('ACTGCT', 'ACTGTCGTACGTGTACGTGCTATTACGATTCGGATGCATTGTGCATTTGGGCGATCTTATTCTTATC', delta)

6 9
5 8
5 7
4 6
4 5
3 4
2 3
1 2
0 1

6 21
5 20
4 19
3 18
2 17
2 16
1 15
0 14


(5, 'AC-TGCT\nACGTGCT')