In [4]:
import numpy as np
from typing import Tuple

class NeedlemanWunsch:
    """ Class for NeedlemanWunsch Alignment

    Parameters:
        sub_matrix_file: str
            Path/filename of substitution matrix
        gap_open: float
            Gap opening penalty
        gap_extend: float
            Gap extension penalty

    Attributes:
        seqA_align: str
            seqA alignment
        seqB_align: str
            seqB alignment
        alignment_score: float
            Score of alignment from algorithm
        gap_open: float
            Gap opening penalty
        gap_extend: float
            Gap extension penalty
    """
    def __init__(self, sub_matrix_file: str, gap_open: float, gap_extend: float):
        # Init alignment and gap matrices
        self._align_matrix = None
        self._gapA_matrix = None
        self._gapB_matrix = None

        # Init matrices for backtrace procedure
        self._back = None
        self._back_A = None
        self._back_B = None

        # Init alignment_score
        self.alignment_score = 0

        # Init empty alignment attributes
        self.seqA_align = ""
        self.seqB_align = ""

        # Init empty sequences
        self._seqA = ""
        self._seqB = ""

        # Setting gap open and gap extension penalties
        self.gap_open = gap_open
        assert gap_open < 0, "Gap opening penalty must be negative."
        self.gap_extend = gap_extend
        assert gap_extend < 0, "Gap extension penalty must be negative."

        # Generating substitution matrix
        self.sub_dict = self._read_sub_matrix(sub_matrix_file) # substitution dictionary

    def _read_sub_matrix(self, sub_matrix_file):
        """
        DO NOT MODIFY THIS METHOD! IT IS ALREADY COMPLETE!

        This function reads in a scoring matrix from any matrix like file.
        Where there is a line of the residues followed by substitution matrix.
        This file also saves the alphabet list attribute.

        Parameters:
            sub_matrix_file: str
                Name (and associated path if not in current working directory)
                of the matrix file that contains the scoring matrix.

        Returns:
            dict_sub: dict
                Substitution matrix dictionary with tuple of the two residues as
                the key and score as value e.g. {('A', 'A'): 4} or {('A', 'D'): -8}
        """
        with open(sub_matrix_file, 'r') as f:
            dict_sub = {}  # Dictionary for storing scores from sub matrix
            residue_list = []  # For storing residue list
            start = False  # trigger for reading in score values
            res_2 = 0  # used for generating substitution matrix
            # reading file line by line
            for line_num, line in enumerate(f):
                # Reading in residue list
                if '#' not in line.strip() and start is False:
                    residue_list = [k for k in line.strip().upper().split(' ') if k != '']
                    start = True
                # Generating substitution scoring dictionary
                elif start is True and res_2 < len(residue_list):
                    line = [k for k in line.strip().split(' ') if k != '']
                    # reading in line by line to create substitution dictionary
                    assert len(residue_list) == len(line), "Score line should be same length as residue list"
                    for res_1 in range(len(line)):
                        dict_sub[(residue_list[res_1], residue_list[res_2])] = float(line[res_1])
                    res_2 += 1
                elif start is True and res_2 == len(residue_list):
                    break
        return dict_sub

    def align(self, seqA: str, seqB: str) -> Tuple[float, str, str]:
        """
        TODO

        This function performs global sequence alignment of two strings
        using the Needleman-Wunsch Algorithm

        Parameters:
        	seqA: str
         		the first string to be aligned
         	seqB: str
         		the second string to be aligned with seqA

        Returns:
         	(alignment score, seqA alignment, seqB alignment) : Tuple[float, str, str]
         		the score and corresponding strings for the alignment of seqA and seqB
        """
        # Resetting alignment in case method is called more than once
        self.seqA_align = ""
        self.seqB_align = ""

        # Resetting alignment score in case method is called more than once
        self.alignment_score = 0

        # Initializing sequences for use in backtrace method
        self._seqA = seqA
        self._seqB = seqB

        # TODO: Initialize matrix private attributes for use in alignment

        gap_penalty = self.gap_open

        sub_dict = self.sub_dict
        # create matrices for alignment scores, gaps, and backtracing

        # Determine length of each sequence and store in n or m
        m = len(seqA)
        n = len(seqB)

        # Generate matrix of zeros to store scores
        self._align_matrix = np.zeros((n + 1, m + 1))


        # TODO: Implement global alignment here

        for i in range(0, n + 1):
            self._align_matrix[i][0] = gap_penalty * i

        for j in range(0, m + 1):
            self._align_matrix[0][j] = gap_penalty * j

        for i in range(0, n + 1):
            for j in range(0, m + 1):
                match = self._align_matrix[i][j] + sub_dict[(seqA[i - 1], seqB[j - 1])]
                delete = self._align_matrix[i][j + 1] + gap_penalty
                insert = self._align_matrix[i + 1][j] + gap_penalty

                self._align_matrix[i][j] = max(match, delete, insert)

        return self._backtrace()

    def _backtrace(self) -> Tuple[float, str, str]:
        """
        TODO

        This function traces back through the back matrix created with the
        align function in order to return the final alignment score and strings.

        Parameters:
        	None

        Returns:
         	(alignment score, seqA alignment, seqB alignment) : Tuple[float, str, str]
         		the score and corresponding strings for the alignment of seqA and seqB
        """
        seqA = self._seqA
        seqB = self._seqB
        sub_dict = self.sub_dict
        alignA = self.seqA_align
        alignB = self.seqB_align

        gap_penalty = self.gap_open

        i = len(seqA)
        j = len(seqB)

        while i > 0 and j > 0:  # end touching the top or the left edge
            score_current = self._align_matrix[i][j]
            score_diagonal = self._align_matrix[i - 1][j - 1]
            score_up = self._align_matrix[i][j - 1]
            score_left = self._align_matrix[i - 1][j]

            # Check to figure out which cell the current score was calculated from,
            # then update i and j to correspond to that cell.
            if score_current == score_diagonal + sub_dict[(seqA[j - 1], seqB[i - 1])]:
                alignA += seqA[j - 1]
                alignB += seqB[i - 1]
                i -= 1
                j -= 1
            elif score_current == score_up + gap_penalty:
                alignA += seqA[j - 1]
                alignB += '-'
                j -= 1
            elif score_current == score_left + gap_penalty:
                alignA += '-'
                alignB += seqB[i - 1]
                i -= 1

        # Finish tracing up to the top left cell
        while i > 0:
            alignA += seqA[i - 1]
            alignB += '-'
            j -= 1
        while j > 0:
            alignA += '-'
            alignB += seqB[j - 1]
            i -= 1

        # Traversed the score matrix from the bottom right so the two sequences will be reversed.
        # Reverse the sequences.
        alignA = alignA[::-1]
        alignB = alignB[::-1]

        # Update seq alignment values
        self.seqA_align = alignA
        self.seqB_align = alignB

        self.alignment_score = self._align_matrix.sum()

        return (self.alignment_score, self.seqA_align, self.seqB_align)


def read_fasta(fasta_file: str) -> Tuple[str, str]:
    """
    DO NOT MODIFY THIS FUNCTION! IT IS ALREADY COMPLETE!

    This function reads in a FASTA file and returns the associated
    string of characters (residues or nucleotides) and the header.
    This function assumes a single protein or nucleotide sequence
    per fasta file and will only read in the first sequence in the
    file if multiple are provided.

    Parameters:
        fasta_file: str
            name (and associated path if not in current working directory)
            of the Fasta file.

    Returns:
        seq: str
            String of characters from FASTA file
        header: str
            Fasta header
    """
    assert fasta_file.endswith(".fa"), "Fasta file must be a fasta file with the suffix .fa"
    with open(fasta_file) as f:
        seq = ""  # initializing sequence
        first_header = True
        for line in f:
            is_header = line.strip().startswith(">")
            # Reading in the first header
            if is_header and first_header:
                header = line.strip()  # reading in fasta header
                first_header = False
            # Reading in the sequence line by line
            elif not is_header:
                seq += line.strip().upper()  # generating full sequence
            # Breaking if more than one header is provided in the fasta file
            elif is_header and not first_header:
                break
    return seq, header

In [75]:
alg = NeedlemanWunsch("substitution_matrices/BLOSUM62.mat", -10.0, -1.0)

In [201]:
seq1, _ = read_fasta("./data/test_seq3.fa")
seq2, _ = read_fasta("./data/test_seq4.fa")
n = len(seq1) # 10
m = len(seq2) # 7
gap_open_penalty = -10
gap_ext_penalty = -1

# Initialize empty matrices
align_mat = np.zeros((m + 1, n + 1)) # rows vs col
score_mat = np.zeros((m + 1, n + 1))
gapA_mat = np.zeros((m + 1, n + 1))
gapB_mat = np.zeros((m + 1, n + 1))

for i in range(1, m + 1):
    align_mat[i][0] = -np.inf
    gapA_mat[i][0] = gap_open_penalty + ((gap_ext_penalty) * i)
    gapB_mat[i][0] = -np.inf
    
for j in range(1, n + 1):
    align_mat[0][j] = -np.inf
    gapA_mat[0][j] = -np.inf
    gapB_mat[0][j] = gap_open_penalty + ((gap_ext_penalty) * j)

for i in range(1, m + 1):
    for j in range(1, n + 1):
        align_mat[i][j] = max(align_mat[i - 1][j - 1] + alg.sub_dict[(seq1[j-1], seq2[i-1])], gapA_mat[i-1][j-1] + alg.sub_dict[(seq1[j-1], seq2[i-1])], gapB_mat[i-1][j-1] + alg.sub_dict[(seq1[j-1], seq2[i-1])])
        gapA_mat[i][j] = max((align_mat[i - 1][j] + gap_open_penalty + gap_ext_penalty), (gapA_mat[i-1][j] + (gap_ext_penalty)))
        gapB_mat[i][j] = max((align_mat[i][j - 1] + gap_open_penalty + gap_ext_penalty), (gapB_mat[i][j-1] + (gap_ext_penalty)))
        score_mat[i][j] = max(align_mat[i][j], gapA_mat[i][j], gapB_mat[i][j])
print(np.max(score_mat))

17.0


In [202]:
print(score_mat)

[[  0.   0.   0.   0.   0.   0.   0.   0.   0.   0.   0.]
 [  0.   5.  -6.  -7.  -8.  -9. -10. -11. -12. -13. -14.]
 [  0.  -6.   4.  -7.  -7.  -3. -10. -11. -10. -11. -14.]
 [  0.  -7.  -7.   5.  -6.  -7.   1.  -8. -10. -11. -12.]
 [  0.  -8.  -8.  -4.   2.  -9.  -5.   5.  -6.  -7.  -8.]
 [  0.  -9.  -9.  -7.  -4.   3.  -8.  -6.  10.  -1.  -2.]
 [  0. -10. -10.  -8.   1.  -4.   0.  -7.  -1.  10.  -1.]
 [  0. -11. -11.  -9. -10.   0.  -7.  -3.  -2.  -1.  17.]]


In [203]:
print(align_mat)

[[  0. -inf -inf -inf -inf -inf -inf -inf -inf -inf -inf]
 [-inf   5. -12. -11. -15. -14. -13. -15. -18. -19. -21.]
 [-inf -11.   4.  -8.  -7.  -3. -11. -13. -10. -11. -14.]
 [-inf -10.  -7.   5. -10.  -9.   1.  -8. -13. -12. -14.]
 [-inf -12.  -8.  -4.   2.  -9.  -5.   5. -11. -13. -14.]
 [-inf -15.  -9. -11.  -4.   3. -11.  -8.  10.  -1.  -9.]
 [-inf -17. -11. -12.   1.  -4.   0. -11.  -6.  10.  -3.]
 [-inf -18. -11. -12. -10.   0.  -7.  -3.  -9.  -3.  17.]]


In [204]:
# Fill in back matrix for backtracing
traceback_mat = np.ones((m + 1, n + 1)) * -np.inf
#best_score = max(align_mat[i][j], gapA_mat[i][j], gapB_mat[i][j])

for i in range(1, m + 1):
    for j in range(1, n + 1):
        # A match is represented in the back matrix as a 0
        if score_mat[i][j] == align_mat[i][j]:
            traceback_mat[i][j] = 0
        # A gap in seqA is represented in the back matrix as a -1
        elif score_mat[i][j] == gapA_mat[i][j]:
            traceback_mat[i][j] = -1
        # A gap in seqB is represented in the back matrix as a 1
        else:
            traceback_mat[i][j] = 1

In [205]:
print(traceback_mat)

[[-inf -inf -inf -inf -inf -inf -inf -inf -inf -inf -inf]
 [-inf   0.   1.   1.   1.   1.   1.   1.   1.   1.   1.]
 [-inf  -1.   0.   1.   0.   0.   1.   1.   0.   0.   0.]
 [-inf  -1.   0.   0.   1.   1.   0.   0.   1.   1.   1.]
 [-inf  -1.   0.   0.   0.   0.   0.   0.   1.   1.   1.]
 [-inf  -1.   0.  -1.   0.   0.   1.  -1.   0.   0.   1.]
 [-inf  -1.  -1.  -1.   0.   0.   0.  -1.  -1.   0.   1.]
 [-inf  -1.   0.  -1.   0.   0.   0.   0.  -1.  -1.   0.]]


In [206]:
print(seq1)
print(seq2)

MAVHQLIRRP
MQLIRHP


In [207]:
seqA_align = ""
seqB_align = ""

while i > 0 and j > 0:
    back_step = traceback_mat[i][j]
    if back_step == 0:
        seqA_align += seq1[j-1]
        seqB_align += seq2[i-1]
        i -= 1
        j -= 1
    elif back_step == -1:
        seqA_align += "-"
        seqB_align = seq2[i -1]
        i-=1
    elif back_step == 1:
        seqA_align += seq1[j - 1]
        seqB_align += "-"
        j -= 1

if i > 0 and j == 0:
    while i > 0:
        seqA_align += '-'
        seqB_align += seq2[j-1]
        i -= 1
if i == 0 and j > 0:
    while j > 0:
        seqA_align += seq1[j - 1]
        seqB_align += '-'
        i -= 1
    
seqA_align = seqA_align[::-1]
seqB_align = seqB_align[::-1]

In [208]:
print(seqA_align)
print(seqB_align)

MAVHQLIRRP
M---QLIRHP


''