In [None]:
def RevCom(seq):
    '''
    Returns the reverse complement of a dna sequence
    '''
    com_dict = {'A': 'T',
                'T': 'A',
                'G': 'C',
                'C': 'G',
                'N': 'N'}
    revcom = ''.join([com_dict.get(base, 'N') for base in reversed(seq)])   # (memory) seq returns an iterator to return sequence for long reads; creating a reverse string uses additional mem
    return revcom


def genome_index_minimizer(genome_sequence, k_stride, k_size, include_revcom=True, window_size=5):
    
    kmer_list = []
    rev_kmer_list = []
    minimizer_list = []
    
    for i in range(0, len(genome_sequence)-k_size+1, k_stride):
        kmer = genome_sequence[i:i+k_size]
        kmer_list.append(kmer)

        if include_revcom:
            rev_kmer_list.append(RevCom(kmer))
        
        if i>=window_size:
            minimizer = min(kmer_list[i-window_size:i])
            minimizer_list.append((i, minimizer, 1))

            if include_revcom:
                minimizer_list.append((i, minimizer, -1))

    return minimizer_list


test_sequence =  "CTAGGTTACTAGAAGTTTTGCGACGTTCTAAGAGTTGGACGAAATGTTTCGCGACCTAGGATGAGGTCGCCCTAGAAAATAGATTTCTGCTACTCTCCTCATAAGCAGTCCGGTGTATCGAAAGTACAAGACTAGCCTTGCTAGCAACCGCGGGCTGGGAGCCTAAGGCATCACTCAAGATACAGGCTCGGTAACGTACGCTCTAGCCATCTAACTATCCCCTATGTCTTATAGGGACCTACGTTATCTGCCTGTCGAACCATAGGATTCGCATCAGCGCGCAGGCTTGGGTCGAGATAAAATCTCCAGTGCCCAAGACCACGGGCGCTCGGCGTCTTGGCTAATCCCCGTACATGTTGTTATAAATAATCAGTAGAAACTCTGTGTTAGAGGGTGGAGTGACCTTAAATCAAGGACGATATTAATCGGAAGGAGTATTCAACGTGATGAAGTCGCAGGGTTGACGTGGGAATGGTGCTTCTGTCCAAACAGGTTAGGGTATAACGCCGGAACCGTCCCCCAAGCGTACAGGGTGGGCTTTGCAACGACTTCCGAGTCCAAAGACTCGCTGTTTTCGAAATTTGCGCTCAAGGGCGAGTATTGAACCAGGCTTACGCCCAAGTACGTAGCAAGGTGACTCAAACAGAGTACATCCTGCCCGCGTTTCGTATGAATCAAGTTAGAAGTTATGGAACATAATAACATGTGGATGGCCAGTGGTCGGTTGTTACACGCCTGCCGCAACGTTGAAAGTCCCGGATTAGACTGGCAGGATCTATGCCGTGAGACCCGTTATGCTCCATTACCGTCAGTGGGTCACAGCTTGTTGTGGACTGGATTGCCATTCTCTGAGTGTATTACGGAGGCGGCCGCACGGGTCCCATATAAACCAGTCATAGCTTACCTGACTGTACTTAGAAATGTGGCTTCGCCTTTGCCCACGCACCTGATCGCTCCTCGTTTGCTTTTAAGGACCGGACGAACTACAGAGCATTGGAAG"
for i in genome_index_minimizer(test_sequence,1,20):
    print(i)


(5, 'AGGTTACTAGAAGTTTTGCG', 1)
(5, 'AGGTTACTAGAAGTTTTGCG', -1)
(6, 'AGGTTACTAGAAGTTTTGCG', 1)
(6, 'AGGTTACTAGAAGTTTTGCG', -1)
(7, 'AGGTTACTAGAAGTTTTGCG', 1)
(7, 'AGGTTACTAGAAGTTTTGCG', -1)
(8, 'ACTAGAAGTTTTGCGACGTT', 1)
(8, 'ACTAGAAGTTTTGCGACGTT', -1)
(9, 'ACTAGAAGTTTTGCGACGTT', 1)
(9, 'ACTAGAAGTTTTGCGACGTT', -1)
(10, 'ACTAGAAGTTTTGCGACGTT', 1)
(10, 'ACTAGAAGTTTTGCGACGTT', -1)
(11, 'ACTAGAAGTTTTGCGACGTT', 1)
(11, 'ACTAGAAGTTTTGCGACGTT', -1)
(12, 'ACTAGAAGTTTTGCGACGTT', 1)
(12, 'ACTAGAAGTTTTGCGACGTT', -1)
(13, 'AAGTTTTGCGACGTTCTAAG', 1)
(13, 'AAGTTTTGCGACGTTCTAAG', -1)
(14, 'AAGTTTTGCGACGTTCTAAG', 1)
(14, 'AAGTTTTGCGACGTTCTAAG', -1)
(15, 'AAGTTTTGCGACGTTCTAAG', 1)
(15, 'AAGTTTTGCGACGTTCTAAG', -1)
(16, 'AAGTTTTGCGACGTTCTAAG', 1)
(16, 'AAGTTTTGCGACGTTCTAAG', -1)
(17, 'AAGTTTTGCGACGTTCTAAG', 1)
(17, 'AAGTTTTGCGACGTTCTAAG', -1)
(18, 'AGTTTTGCGACGTTCTAAGA', 1)
(18, 'AGTTTTGCGACGTTCTAAGA', -1)
(19, 'GTTTTGCGACGTTCTAAGAG', 1)
(19, 'GTTTTGCGACGTTCTAAGAG', -1)
(20, 'GCGACGTTCTAAGAGTTGGA', 1)
(20

In [143]:
# genome_test = "ACTTGATCTAGGACGAGCAGCTCTATCGAGCATCTAGC"
# read_test = "GACGAGCAGCTCTATCG"

genome_test = "AAAAAAAAGACGAGCAAAAAAAAAAAAAAAAAAA"
read_test = "GACGAGC"

# genome_test = "ATCTAGC"
# read_test = "CTA"


In [148]:
def banded_sw_align(seq1, seq2, half_bandwidth, gap_penalty=-1, match_score=1, mismatch_score=-1):
    '''
    Function banded_sq_align performs a banded smith waterman local alignment of two input DNA strings
        INPUT
            seq1: A DNA string to be aligned
            seq2: The other DNA string to be aligned
            gap_penalty: integer value for the gap penalty
            match_score: integer value for a match score
            mismatch_score: integer value for a mismatch score
            half_bandwidth: integer value specifying how far out from the main diagonal to fill in the DP table. Total bandwidth is 2*half_bandwidth+1
        OUTPUT
            seqs[0]: The aligned DNA string for seq1
            seqs[1]: The aligned DNA string for seq2
            max_pointer
        FUNCTION
            banded_sw_align first fills in a smith waterman DP table according to the smith waterman algorithm rules
            banded_sw_align constrains the filling to only the columns half_bandwidth away from the central diagonal
            banded_sw_align then calls traceback, which starts from an optimal value found in the DP table and uses the traceback_dp table to reconstruct the optimal alignments for seq1 and seq2
    '''
    '''
    Initialize Values and DP Table
    '''
    n, m = len(seq1), len(seq2) # Extract the lengths of seq1 and seq2

    # Create traceback_dp and match_mistmatch_dp tables containing n+1 rows as in standard SW
    # However, banded tables will only contain 2 * half_bandwidth + 1 rows
    # This will only include the diagonal region of width half_bandwidth around the central diagonal
    # 2 * half_bandwidth to include both side of the diagonal
    # + 1 to include the main diagonal cell itself

    traceback_dp = [["end"] * (2 * half_bandwidth + 1) for _ in range(n+1)] # Create a DP table with n+1 rows and 2 * half_bandwidth * 1 columns
    max_val_dp = [[0] * (2 * half_bandwidth + 1) for _ in range(n+1)] # Create a DP table with n+1 rows and 2 * half_bandwidth * 1 columns

    '''
    Define function specific functions
    '''
    def score(a, b): # Simple scoring function
        return match_score if a == b else mismatch_score
    

    def traceback(traceback_dp, i, j_band): # Function that traces back from the optimal alignment value and constructs alignment strings
        seq_1_align = '' # Initialize strings
        seq_2_align = ''
       
        while True: #and 0 <= j_band <= 2 * half_bandwidth: # Set up while loop break condition to end when i hits zero or the j_band goes out of the banded region
            
            if i <= 0:
                print("reached last index")
                break

            if j_band <0 or j_band > 2* half_bandwidth:
                print("bandwidth exceeded!")
                break

            if traceback_dp[i][j_band] == "end": # End local alignment when an end pointer is hit
                print("end found!")
                break

            j = j_band - half_bandwidth + i # Convert j_band to j to allow for indexing into seq2, see how j is converted to j_band in dp filling below
            
            if traceback_dp[i][j_band] == "seq1_gap": # if a gap in seq1 is found, then append a gap to aligned seq1 and append the curret seq2 char to aligned seq2
                seq_1_align = "_" + seq_1_align
                seq_2_align = seq2[j - 1] + seq_2_align
                j_band -= 1 # Move to j_band - 1
            elif traceback_dp[i][j_band] == "match_mismatch": # if a match/mismatch is found, append the current char for seq1 and seq2 to their aligned sequences
                seq_1_align = seq1[i - 1] + seq_1_align
                seq_2_align = seq2[j - 1] + seq_2_align
                i -= 1 # Move back diagonally
                # Diagonal move cancels out the shift to j_band from moving up a row, so j_band is not updated
            elif traceback_dp[i][j_band] == "seq2_gap":  # if a gap in seq2 is found, then append a gap to aligned seq 2 and append the curret seq1 char to aligned seq1
                seq_1_align = seq1[i - 1] + seq_1_align
                seq_2_align = "_" + seq_2_align
                i -= 1 # Move to i - 1
                j_band += 1 # Moving up a row results in a right shift of the j_band index

        return seq_1_align, seq_2_align # Returned the constructed alignments
    
    '''
    Dyanmic table creation
    '''
    max_pointer = float('-inf') # Initiate value to hold largest value found in max_val_dp
    max_index = (0,0) # Initiate tuple for holding where the maximum index is found in max_val_dp

    for i in range(1, n +1): # Iterates over rows, or positions in seq1
        start_j = max(1, i - half_bandwidth) # start of j range set to leftmost position within band at row i, min is one for first row
        end_j = min(m, i + half_bandwidth) # end of j range set to rightmost position within band at row i, max at m so index does not exceed seq2 length
        for j in range(start_j, end_j + 1): # For the current row, iterate across possible j positions
            j_band = j-i+half_bandwidth # Convert the j value to j_band, which will actually allow access within the row as each row is in range 0, 2 * half_bandwidth

            if 0 <= j_band <= 2 * half_bandwidth: # Check to see if previous j position is within the bandwidth
                # Previous j_band value will just be the same j_band for a diagonal move
                match_mismatch = max_val_dp[i - 1][j_band] + score(seq1[i - 1], seq2[j - 1]) # If so, calculate match_mismatch value
            else: 
                match_mismatch = float('-inf')  # If the previous j position is out of the bandwidth, set to -inf

            if 0 <= j_band - 1 <= 2 * half_bandwidth: # Check to see if previous j position is within the bandwidth
                # A move to the left column is just j_band - 1
                seq1_gap = max_val_dp[i][j_band - 1] + gap_penalty # If so, calculate the seq1_gap value
            else:
                seq1_gap = float('-inf') # If the previous j position is out of the bandwidth, set to -inf

            if 0 <= j_band + 1 <= 2 * half_bandwidth: # Check to see if previous j position is within the bandwidth, no movement in j so tested value is just j_band
                # A move up a row results in j_band + 1
                seq2_gap = max_val_dp[i - 1][j_band+1] + gap_penalty # If so, calculate the seq2_gap value
            else:
                seq2_gap = float('-inf') # If the previous j position is out of the bandwidth, set to -inf

            max_val = max(match_mismatch, seq1_gap, seq2_gap, 0) # Find the move that gives the maximum score with a floor of zero
            max_val_dp[i][j_band] = max_val # Set the current position in max_val_dp to be that maximum score

            if max_val > max_pointer: # Update the maximum value holder across the loop to keep track of what and where it is
                max_pointer = max_val
                max_index = (i, j_band)
            
            if max_val == match_mismatch: # Set pointer stored in traceback depending on which condition gave the max
                traceback_dp[i][j_band] = "match_mismatch"
            elif max_val == seq1_gap:
                traceback_dp[i][j_band] = "seq1_gap"
            elif max_val == seq2_gap:
                traceback_dp[i][j_band] = "seq2_gap"
            else:
                traceback_dp[i][j_band] = "end"

    seqs = traceback(traceback_dp, max_index[0], max_index[1]) # Call traceback to construct alignment sequences
            
    return seqs[0], seqs[1], max_pointer # Returned seq1 alignment, seq2 alignment, and the optimal alignment score

read_aln, genome_aln, score = banded_sw_align(read_test,genome_test,8)


print("Read sequence:", read_test)
print("len:", len(read_test))
print("Genome sequence:", genome_test)
print("len", len(genome_test))
print("\n")
print("genome alignment:", genome_aln)
print("read alignment:", read_aln)
print("alignment lenghts:", len(genome_aln), len(read_aln))



reached last index
Read sequence: GACGAGC
len: 7
Genome sequence: AAAAAAAAGACGAGCAAAAAAAAAAAAAAAAAAA
len 34


genome alignment: GACGAGC
read alignment: GACGAGC
alignment lenghts: 7 7


genome length = 38
min bandwidth is 23

genome length = 34
min band is 23

genome length = 30
min band is 15

actually it seems like the min viable half bandwidth is a function of how many non matching bases are in front of the match region
4 bases, half bandwidth = 4
8 bases, half bandwidth = 8

In [84]:
def sw_align(seq1, seq2, gap_penalty, match_score, mismatch_score):
    '''
    Initialize values
    '''
    # DP tables are initialized to hold score and traceback information
    traceback_dp = [["end"] * (len(seq2) + 1) for _ in range(len(seq1) + 1)] # This table will keep backtracking pointers for reconstructing the sequence
    max_val_dp = [[0] * (len(seq2) + 1) for _ in range(len(seq1) + 1)] # This table will be for the best alignment ending with a match/mismatch
    
    '''
    Define function specific functions
    '''
    def score(a,b, match_score=match_score, mismatch_score=mismatch_score): # Simple scoring function
        if a == b:
            return match_score

        else:
            return mismatch_score


    def traceback(traceback_dp, i, j): # Function that traces back from the optimal alignment value and constructs alignment strings
        seq_1_align = '' # Initialize strings
        seq_2_align = ''
        while i > 0 or j > 0: # Set up while loop break condition to end when eithe index is zero
            if traceback_dp[i][j] == "end": # Base case, end local alignment when an end pointer is hit
                i = 0 # Update indices to break loop
                j = 0
                continue
            else:
                if traceback_dp[i][j]=="seq1_gap": # if a gap in seq1 is found, then append a gap to aligned seq1 and append the curret seq2 char to aligned seq2
                    seq_1_align = "_" + seq_1_align 
                    seq_2_align = seq2[j-1] + seq_2_align
                    j -= 1 # Move to j-1
                    continue
                elif traceback_dp[i][j]=="match_mismatch": # if a match/mismatch is found, append the current char for seq1 and seq2 to their aligned sequences
                    seq_1_align = seq1[i-1] + seq_1_align
                    seq_2_align = seq2[j-1] + seq_2_align
                    i -= 1 # Move back diagonally
                    j -= 1
                    continue
                elif traceback_dp[i][j]=="seq2_gap": # if a gap in seq2 is found, then append a gap to aligned seq 2 and append the curret seq1 char to aligned seq1
                    seq_1_align = seq1[i-1] + seq_1_align
                    seq_2_align = "_" + seq_2_align 
                    i -= 1 # Move to i - 1

        return seq_1_align, seq_2_align # Return aligned sequences
    
    ''' 
    Dynamic table creation, iterate through both strings and fill in DP tables
    '''
    max_pointer = float('-inf') # Initiate value to hold largest value found in table
    max_index = (0,0) # Initiate tuple for holding where the maximum index is found

    # Set dimension of tables to be 1 larger in length than the string for both strings
    # Iterate through every character in both input strings
    for i in range(1,len(seq1)+1): 
        for j in range(1,len(seq2)+1): # Range starting at 1 as array construction pre initializes all base cases, i=0 j=0, to 0
        
            # Calculate score for a match/mismatch
            match_mismatch = max_val_dp[i-1][j-1] + score(seq1[i-1],seq2[j-1])

            # Calculate score for gap in seq1
            seq1_gap = max_val_dp[i][j-1] + gap_penalty
            
            # Calculate score for gap in seq2
            seq2_gap = max_val_dp[i-1][j] + gap_penalty
          
            # Fill the match/mismatch table with the max value of the current index across all three possibilies with a floor of zero
            max_val = max(match_mismatch,seq1_gap,seq2_gap,0)
            max_val_dp[i][j] = max_val

            if max_val > max_pointer: # Update the maximum value holder across the loop to keep track of what and where it is
                max_pointer = max_val
                max_index = (i,j)

            if max_val == match_mismatch: # Set pointer stored in traceback depending on which condition gave the max
                traceback_dp[i][j] = "match_mismatch"
            elif max_val == seq1_gap:
                traceback_dp[i][j] = "seq1_gap"
            elif max_val == seq2_gap:
                traceback_dp[i][j] = "seq2_gap"
            elif max_val == 0:
                traceback_dp[i][j] = "end"

    '''
    Generate optimal SW local alignment from DP table and format function outputs
    '''

    seqs = traceback(traceback_dp, max_index[0], max_index[1])

    return seqs[0],seqs[1],max_pointer # Return seq1 alignment, seq2 alignment, maximum value

print(sw_align(genome_test, read_test,-1,1,-1))

('GACGAGC', 'GACGAGC', 7)


In [2]:
import numpy as np
from numba import njit, prange

In [33]:
@njit(cache=True)
def traceback_numba(seq1_array, seq2_array, traceback_dp, n, m, half_bandwidth, max_pos):
    i, j = max_pos

    END = 0
    MATCH_MISMATCH = 1
    GAP_IN_SEQ1 = 2  # Insertion in seq1 (gap in seq2)
    GAP_IN_SEQ2 = 3  # Deletion in seq1 (gap in seq1)

    max_pos_len = i + j
    aligned_seq1 = np.empty(max_pos_len, dtype=np.uint8)
    aligned_seq2 = np.empty(max_pos_len, dtype=np.uint8)
    index = max_pos_len - 1

    while i > 0 and j > 0:
        j_band = j - i + half_bandwidth
        if j_band < 0 or j_band >= 2 * half_bandwidth + 1:
            break

        traceback_code = traceback_dp[i, j_band]

        if traceback_code == END:
            break
        elif traceback_code == MATCH_MISMATCH:
            aligned_seq1[index] = seq1_array[i - 1]
            aligned_seq2[index] = seq2_array[j - 1]
            i -= 1
            j -= 1
        elif traceback_code == GAP_IN_SEQ1:
            aligned_seq1[index] = ord('_')
            aligned_seq2[index] = seq2_array[j - 1]
            j -= 1
        elif traceback_code == GAP_IN_SEQ2:
            aligned_seq1[index] = seq1_array[i -1]
            aligned_seq2[index] = ord('_')
        else:
            break
        index -= 1

    rev_ind = index + 1
    aligned_seq1 = aligned_seq1[rev_ind::]
    aligned_seq2 = aligned_seq2[rev_ind::]

    return aligned_seq1, aligned_seq2


@njit(cache=True)
def banded_sw_align_numba(seq1_array, seq2_array, n, m, half_bandwidth, gap_penalty, match_score, mismatch_score):
    max_val_dp = np.zeros((n + 1, 2 * half_bandwidth + 1), dtype=np.int32)
    traceback_dp = np.zeros((n + 1, 2 * half_bandwidth + 1), dtype=np.int8)

    END = 0
    MATCH_MISMATCH = 1
    GAP_IN_SEQ1 = 2  # Insertion in seq1 (gap in seq2)
    GAP_IN_SEQ2 = 3  # Deletion in seq1 (gap in seq1)

    # Initialize the DP table
    for i in range(1, n + 1):
        j_band = half_bandwidth - i + 0  # Since j = 0 for the first column
        if 0 <= j_band <= 2 * half_bandwidth:
            max_val_dp[i, j_band] = 0
            traceback_dp[i, j_band] = END 
        else:
            break

    for j in range(1, m + 1):
        i = 0
        j_band = j - i + half_bandwidth
        if 0 <= j_band <= 2 * half_bandwidth:
            max_val_dp[0, j_band] = 0
            traceback_dp[0, j_band] = END 
        else:
            break

    # Sets (0,0) value in hypothetical table to zero
    max_val_dp[0, half_bandwidth] = 0
    traceback_dp[0, half_bandwidth] = END

    max_score = 0
    max_pos = (0,0)

    # Fill the DP table
    for i in range(1, n + 1):
        start_j = max(1, i - half_bandwidth)
        end_j = min(m, i + half_bandwidth)

        for j in range(start_j, end_j + 1):
            j_band = j - i + half_bandwidth

            # Diagonal move (match/mismatch)
            if 0 <= j_band <= 2 * half_bandwidth:
                if seq1_array[i - 1] == seq2_array[j - 1]:
                    score = match_score
                else:
                    score = mismatch_score
                match_mismatch = max_val_dp[i - 1, j_band] + score
            else:
                match_mismatch = float('-inf')

            # Left move (gap in seq1)
            if 0 <= j_band - 1 <= 2 * half_bandwidth:
                seq1_gap = max_val_dp[i, j_band - 1] + gap_penalty
            else:
                seq1_gap = float('-inf')

            # Up move (gap in seq2)
            if 0 <= j_band + 1 <= 2 * half_bandwidth:
                seq2_gap = max_val_dp[i - 1, j_band + 1] + gap_penalty
            else:
                seq2_gap = float('-inf')

            # Choose the maximum score
            max_val = max(match_mismatch, seq1_gap, seq2_gap, 0)
            max_val_dp[i, j_band] = max_val

            if max_val == 0:
                traceback_dp[i, j_band] = END
            elif max_val == match_mismatch:
                traceback_dp[i, j_band] = MATCH_MISMATCH
            elif max_val == seq1_gap:
                traceback_dp[i, j_band] = GAP_IN_SEQ1
            elif max_val == seq2_gap:
                traceback_dp[i, j_band] = GAP_IN_SEQ2

            if max_val > max_score:
                max_score = max_val
                max_pos = (i ,j)

    return max_score, max_pos, traceback_dp

In [4]:
njit(cache = True)
def sw_align_numba(seq1_array, seq2_array, gap_penalty, match_score, mismatch_score):
    '''
    Function sw_align_numba uses the numba package to implement just-in-time compilation for creating the smith-waterman dynamic programming table
    Compilation occurs at runtime in the first function call and is cached for all subsequent calls (cache = True)

        INPUTS
            -seq1_array: numpy array of characters 


    '''
    n = len(seq1_array)
    m = len(seq2_array)

    max_val_dp = np.zeros((n + 1, m + 1), dtype=np.int32)
    traceback_dp = np.zeros((n + 1, m + 1), dtype=np.int8)

    MATCH_MISTMATCH = 1
    GAP_IN_SEQ1 = 2
    GAP_IN_SEQ2 = 3
    END = 0

    max_score = 0
    max_pos = (0,0)

    for i in range(1, n+1):
        seq1_i = seq1_array[i-1]
        for j in range(1, m + 1):
            seq2_j = seq2_array[j-1]

            if seq1_i == seq2_j:
                score = match_score
            else:
                score = mismatch_score

            match_mismatch = max_val_dp[i - 1, j - 1] + score
            seq1_gap = max_val_dp[i, j - 1] + gap_penalty
            seq2_gap = max_val_dp[i - 1, j] + gap_penalty
            max_val = max(match_mismatch, seq1_gap, seq2_gap, 0)

            max_val_dp[i, j] = max_val

            if max_val == 0:
                traceback_dp[i, j] = END
            elif max_val == match_mismatch:
                traceback_dp[i, j] = MATCH_MISTMATCH
            elif max_val == seq1_gap:
                traceback_dp[i, j] = GAP_IN_SEQ1
            elif max_val == seq2_gap:
                traceback_dp[i, j] = GAP_IN_SEQ2

            if max_val > max_score:
                max_score = max_val
                max_pos = (i ,j)

    
    return max_score, max_pos, traceback_dp
        
def traceback(seq1_array, seq2_array, traceback_dp, max_pos):
    aligned_seq1 = []
    aligned_seq2 = []
    i, j = max_pos

    MATCH_MISTMATCH = 1
    GAP_IN_SEQ1 = 2
    GAP_IN_SEQ2 = 3
    END = 0

    while i > 0 and j > 0:
        traceback_code = traceback_dp[i, j]
        if traceback_code == END:
            break
        elif traceback_code == MATCH_MISTMATCH:
            aligned_seq1.append(chr(seq1_array[i - 1]))
            aligned_seq2.append(chr(seq2_array[j - 1]))
            i -= 1
            j -= 1
        elif traceback_code == GAP_IN_SEQ1:
            aligned_seq1.append('_')
            aligned_seq2.append(chr(seq2_array[j - 1]))
            j -= 1
        elif traceback_code == GAP_IN_SEQ2:
            aligned_seq1.append(chr(seq1_array[i - 1]))
            aligned_seq2.append('_')
            i -= 1

    aligned_seq1 = ''.join(aligned_seq1[::-1])
    aligned_seq2 = ''.join(aligned_seq2[::-1])

    return aligned_seq1, aligned_seq2

In [42]:
test_seq_genome = "ATTACAT"
test_seq_read = "TTACA"
seq1_array = np.frombuffer(test_seq_genome.encode('ascii'), dtype=np.uint8)
seq2_array = np.frombuffer(test_seq_read.encode('ascii'), dtype=np.uint8)
half_bandwidth = int(max(len(seq1_array), len(seq2_array))/2)

max_score, max_pos, traceback_dp = sw_align_numba(test_seq_genome, test_seq_read, -2, 4, -1)
aln1, aln2 = traceback(seq1_array, seq2_array, traceback_dp, max_pos)
print(aln1)
print(aln2)

TTACA
TTACA


In [43]:

seq1_array = np.frombuffer(test_seq_genome.encode('ascii'), dtype=np.uint8)
seq2_array = np.frombuffer(test_seq_read.encode('ascii'), dtype=np.uint8)

max_score, max_pos, traceback_dp = banded_sw_align_numba(seq1_array, seq2_array,len(seq1_array), len(seq2_array), half_bandwidth, -2, 4, -1)
aln1, aln2 = traceback_numba(seq1_array, seq2_array, traceback_dp, len(seq1_array), len(seq2_array), half_bandwidth, max_pos)
aligned_seq1 = ''.join(map(chr, aln1.tolist()))
aligned_seq2 = ''.join(map(chr, aln2.tolist()))

print(aligned_seq1)
print(aligned_seq2)
print(traceback_dp)

TTACA
TTACA
[[0 0 0 0 0 0 0]
 [0 0 0 0 0 1 2]
 [0 0 1 1 2 1 1]
 [0 1 1 2 2 1 0]
 [3 3 1 2 1 0 0]
 [3 3 1 2 0 0 0]
 [1 3 1 0 0 0 0]
 [3 3 0 0 0 0 0]]
