In [21]:
import numpy as np

In [22]:
#the log odds of one amino acid being substituted by another
blosum50 = np.loadtxt("blosum50.txt", dtype = 'i')

In [23]:
#map the characters with their indices
blosum_index_map = dict(zip(['A','R','N','D','C','Q','E','G','H','I','L','K',
                'M','F','P','S','T','W','Y','V'],
                  [i for i in range(len(blosum50))]))

#given the bases to consider swapping, get the blosum log probability score of swapping them
#by looking up the corresponding index of the chars and then accessing that index of the matrix 
def get_blosum_score(char1, char2):
    i = blosum_index_map[char1]
    j = blosum_index_map[char2]
    return blosum50[i][j]

In [24]:
from termcolor import colored

In [25]:
#assumes that the blosum50 field is accessible in its environment
def needleman_wunsch(str2, str1):
    
    #our penalty score
    d = -8
    
    #str1 is the chars on the "left" of the matrix
    #str2 is the chars along the "top" of the matrix
    
    #initialize an empty matrix
    height = len(str1) + 1
    width = len(str2) + 1
    
    #explicitly object data type so that we can store tuples
    matrix = np.zeros((height, width), dtype = "object")
    
    #initialization step, mark the cells at the sides of the matrix
    #as incrementing by the penalty, and mark the "arrow" directions all as north/west
    count = 0
    for i in range(width):
        matrix[0][i] = (count,"w")
        count += d
    count = 0
    for i in range(height):
        matrix[i][0] = (count, "n")
        count +=d
    
    for i in range(1, height):
        
        for j in range(1, width):
            
            char1 = str1[i - 1]
            char2 = str2[j - 1]
            score = get_blosum_score(char2, char1)
            
            #direct implementation of the dynamic programming step
            matrix[i][j] = max([
                #each cell contains a tuple of score and direction so extract the score
                (matrix[i-1][j-1][0] + score,"nw"),
                (matrix[i-1][j][0] + d,"n"),
                (matrix[i][j-1][0] + d,"w")
            ], key = lambda tup: tup[0])
        
    #F matrix has been generated, now need to perform backwards algorithm
    #start at the bottom right of the matrix
    i = height - 1
    j = width - 1
    
    #path of cell transitions we will build up
    path = []
    
    #the 2 string substitution visualizations we will build up
    str1sub = ""
    str2sub = ""
    
    while True:
        
        #end condition, we have retraced the path
        if i == 0 and j == 0:
            break
        
        #the direction we need to go in
        dr = matrix[i][j][1]
        
        #chars stayed the same
        if dr == "nw":
            path.append((i,j))
            str1sub += str1[i - 1]
            str2sub += str2[j - 1]
            i -= 1
            j -= 1
            continue
        
        #eliminate char from str2
        if dr == "n":
            path.append((i,j))
            str1sub += str1[i - 1]
            str2sub += '-'
            i -= 1
            continue
        
        #eliminate char from str1
        if dr == "w":
            path.append((i,j))
            str1sub += '-'
            str2sub += str2[j - 1]
            j -= 1
    
    path.append((0,0))
    
    print("\nIndices of path found starting from bottom right:\n\n" + str(path) + "\n")
    
    print("\nString comparison, dashes represent if a character was eliminated:\n\n")
    #reverse the strings as they have been built up backwards
    print(str2sub[::-1] + "\n")
    print(str1sub[::-1] + "\n\n")
    
    print("Solution visualization on F matrix:\n")
    
    for i in range(height):
        
        for j in range(width):
            
            if((i,j) in path):
                print(colored(matrix[i][j][0], "red"), end = '')
            else:
                print(matrix[i][j][0], end = '')
        
        #print a new line
        print()        

In [26]:
needleman_wunsch("HEAGAWGHEE", "PAWHEAE")


Indices of path found starting from bottom right:

[(7, 10), (6, 9), (5, 9), (4, 8), (3, 7), (3, 6), (2, 5), (1, 4), (1, 3), (0, 2), (0, 1), (0, 0)]


String comparison, dashes represent if a character was eliminated:


HEAGAWGHE-E

--P-AW-HEAE


Solution visualization on F matrix:

[31m0[0m[31m-8[0m[31m-16[0m-24-32-40-48-56-64-72-80
-8-2-9[31m-17[0m[31m-25[0m-33-41-49-57-65-73
-16-10-3-4-12[31m-20[0m-28-36-44-52-60
-24-18-11-6-7-15[31m-5[0m[31m-13[0m-21-29-37
-32-14-18-13-8-9-13-7[31m-3[0m-11-19
-40-22-8-16-16-9-12-15-7[31m3[0m-5
-48-30-16-3-11-11-12-12-15[31m-5[0m2
-56-38-24-11-6-12-14-15-12-9[31m1[0m


In [27]:
needleman_wunsch("SALPQPTTPVSSFTSGSMLGRTDTALTNTYSAL", "PSPTMEAVTSVEASTASHPHSTSSYFATTYYHLY")


Indices of path found starting from bottom right:

[(34, 33), (33, 33), (32, 32), (31, 31), (30, 30), (29, 29), (28, 28), (27, 27), (26, 26), (25, 25), (24, 24), (23, 23), (22, 22), (21, 21), (20, 20), (19, 19), (18, 18), (17, 17), (16, 16), (15, 15), (14, 14), (13, 13), (13, 12), (12, 11), (11, 10), (10, 9), (9, 8), (8, 7), (7, 6), (6, 5), (5, 4), (4, 3), (3, 2), (2, 1), (1, 0), (0, 0)]


String comparison, dashes represent if a character was eliminated:


-SALPQPTTPVSSFTSGSMLGRTDTALTNTYSAL-

PSPTMEAVTSVEA-STASHPHSTSSYFATTYYHLY


Solution visualization on F matrix:

[31m0[0m-8-16-24-32-40-48-56-64-72-80-88-96-104-112-120-128-136-144-152-160-168-176-184-192-200-208-216-224-232-240-248-256-264
[31m-8[0m-1-9-17-14-22-30-38-46-54-62-70-78-86-94-102-110-118-126-134-142-150-158-166-174-182-190-198-206-214-222-230-238-246
-16[31m-3[0m0-8-16-14-22-28-36-44-52-57-65-73-81-89-97-105-113-121-129-137-145-153-161-169-177-185-193-201-209-217-225-233
-24-11[31m-4[0m-42-6-4-12-20-26-34-42-50

In [28]:
#assumes that the blosum50 field is accessible in its environment
def smith_waterman(str2, str1):
    
    #our penalty score
    d = -8
    
    #str1 is the chars along the "left" of the matrix
    #str2 is the chars along the "right" of the matrix
    
    #initialize an empty matrix
    height = len(str1) + 1
    width = len(str2) + 1
    
    #explicitly object data type so that we can store tuples
    matrix = np.zeros((height, width), dtype = "object")
    
    #the cells at the edges are now marked as zeroes
    for i in range(width):
        matrix[0][i] = (0,"zero")
    for i in range(height):
        matrix[i][0] = (0, "zero")
    
    #keep track of the max seen 
    max_seen = 0
    #also keep track of its index
    max_index = (0,0)
    
    for i in range(1, height):
        
        for j in range(1, width):
            
            char1 = str1[i - 1]
            char2 = str2[j - 1]
            score = get_blosum_score(char2, char1)
            
            #direct implementation of the dynamic programming step
            matrix[i][j] = max([
                #the new case added for smith waterman
                (0, "zero"),
                #each cell contains a tuple of score and direction so extract the score
                (matrix[i-1][j-1][0] + score,"nw"),
                (matrix[i-1][j][0] + d,"n"),
                (matrix[i][j-1][0] + d,"w")
            ], key = lambda tup: tup[0])
            
            #keep track of the biggest cell value found as this
            #will now be our starting point
            if(matrix[i][j][0] > max_seen):
                max_seen = matrix[i][j][0]
                max_index = (i,j)
        
    #we are now starting at the index containing the max value 
    #instead of the bottom right of the matrix
    i = max_index[0]
    j = max_index[1]
    
    #path of cell transitions we will build up
    path = []
    
    #the 2 string substitution visualizations we will build up
    str1sub = ""
    str2sub = ""
    
    while True:
        
        #the direction we need to go in (or a zero)
        dr = matrix[i][j][1]
        
        #termination condition, the path stops at 0
        if dr == "zero":
            break
            
        #chars stayed the same
        if dr == "nw":
            path.append((i,j))
            str1sub += str1[i - 1]
            str2sub += str1[j - 1]
            i -= 1
            j -= 1
            continue
        
        #eliminate char from the string along the "top"
        if dr == "n":
            path.append((i,j))
            str1sub += str1[i - 1]
            str2sub += '-'
            i -= 1
            continue
        
        #eliminate char from the string along the "left"
        if dr == "w":
            path.append((i,j))
            str1sub += '-'
            str2sub += str2[j - 1]
            j -= 1
    
    #we finished at indices i,j therefore it must be a 0 and the final
    #node in the path, so add it to the path 
    path.append((i,j))
    
    print("\nIndices of path found starting from maximum:\n\n" + str(path) + "\n")
    
    print("\nLocal match substituted strings, dashes represent if a character was eliminated:")
    
    #reverse the strings as they have been built up backwards
    print(str2sub[::-1] + "\n")
    print(str1sub[::-1] + "\n\n")
    
    print("Solution visualization on F matrix:\n")
    
    for i in range(height):
        
        for j in range(width):
            
            if((i,j) in path):
                print(colored(matrix[i][j][0], "red")," ", end = '')
            else:
                print(matrix[i][j][0]," ", end = '')
        
        #print a new line
        print()        

In [29]:
smith_waterman("HEAGAWGHEE", "PAWHEAE")

IndexError: string index out of range

In [None]:
smith_waterman(
    "MQNSHSGVNQLGGVFVNGRPLPDSTRQKIVELAHSGARPCDISRILQVSNGCVSKILGRY",
    "TDDECHSGVNQLGGVFVGGRPLPDSTRQKIVELAHSGARPCDISRI"
)

In [31]:
class State:
    
    def __init__(self, emission_probabilities):
        self.emission_probabilities = emission_probabilities
        
    def set_change_probabilities(self, state_change_dict):
        self.state_change_dict = state_change_dict

## Question 3

In [32]:
#encode the information about the automaton
AT_rich = State({'A' : 0.2698, 'T' : 0.3237, 'C' : 0.2080, 'G' : 0.1985})
AT_rich.set_change_probabilities({
    #stay in the same state
    "loop" : 0.9998,
    "CG_rich" : 0.0002
})

CG_rich = State({'A' : 0.2459, 'T' : 0.2079, 'C' : 0.2478, 'G' : 0.2984})
CG_rich.set_change_probabilities({
    #stay in the same state
    "loop" : 0.9997,
    "AT_rich" : 0.0003
})

In [33]:
AT_rich.__dict__

{'emission_probabilities': {'A': 0.2698, 'T': 0.3237, 'C': 0.208, 'G': 0.1985},
 'state_change_dict': {'loop': 0.9998, 'CG_rich': 0.0002}}

In [34]:
CG_rich.__dict__

{'emission_probabilities': {'A': 0.2459,
  'T': 0.2079,
  'C': 0.2478,
  'G': 0.2984},
 'state_change_dict': {'loop': 0.9997, 'AT_rich': 0.0003}}

In [35]:
#run the HMM and generate a sequence up to the specified length, 
#returning a list of string and state pair tuples
def generate_HMM_sequence(length):
    
    
    

SyntaxError: unexpected EOF while parsing (<ipython-input-35-95962e510e40>, line 6)

In [36]:
import os

In [37]:
os.listdir()

['phaseLambda.fasta',
 'weeks 3-5.ipynb',
 'blosum50.txt',
 'O96791.fasta',
 'P63015.fasta',
 '.ipynb_checkpoints',
 '3CYASRFB114-Alignment.txt']

In [38]:
nuc_bases = ['A', 'T', 'C', 'G']

In [39]:
string = open("phaseLambda.fasta", 'r').read().replace('\n', '')

In [40]:
#the length of the phase lambda is 48502 so this is how large
#the HMM generated sequence will be
len(string)

48502

I found the most elegant way to model the HMM was using mutual recursion, where the 2 functions calling one another represents a change in state, and when n_remaining reaches 0 then the recursive stack that has been built up is returned (which will ultimately be a list of tuples of the string sections generated by each state, paired with a string stating the state that generated the section)

In [41]:
def run_HMM():
        
    #start in either state with equal chance
    if np.random.choice([True, False], p = [0.5,0.5]):
        return generate_ATrich_region(48502, [])
    else:
        return generate_CGrich_region(48502, [])

In [42]:
def generate_ATrich_region(n_remaining, accumulator):
    
    print("entered AT state, accumulator length:", len(accumulator))
    
    #emission probabilities for each of the nucleotide bases respectively
    #when in this state
    emission_probs = [0.2698, 0.3237, 0.2080, 0.1985]
    
    #a continuous section that will be built up whilst in this state
    section_str = ""
    
    #loop until we change into the other state by chance 
    while True:
        
        #whole recursive call finished and can be returned
        if n_remaining == 0:
            accumulator.append((section_str, "AT_rich"))
            print("Fully finished now in AT state")
            return accumulator
        
        section_str += np.random.choice(nuc_bases, p = emission_probs)
        n_remaining -= 1
        
        #on each iteration we may by chance finish in this state
        #and go into the other state
        if np.random.choice([True, False], p = [0.0002, 0.9998]):
            break
    
    print("finished in AT state, string length:",len(section_str))
    accumulator.append((section_str, "AT_rich"))
    return generate_CGrich_region(n_remaining, accumulator)

In [43]:
def generate_CGrich_region(n_remaining, accumulator):
    
    print("entered CG state, accumulator length:", len(accumulator))
    
    #emission probabilities for each of the nucleotide bases respectively
    #when in this state
    emission_probs = [0.2459, 0.2079, 0.2478, 0.2984]
    
    #a continuous section that will be built up whilst in this state
    section_str = ""
    
    #loop until we change into the other state by chance
    while True:
        
        #whole recursive call finished and can be returned
        if n_remaining == 0:
            accumulator.append((section_str, "CG_rich"))
            print("Fully finished now in CG state")
            return accumulator
    
        section_str += np.random.choice(nuc_bases, p = emission_probs)
        n_remaining -= 1
        
        #on each iteration we may by chance finish in this state
        #and go into the other state
        if np.random.choice([True, False], p = [0.0002, 0.9998]):
            break
    
    print("finished in CG state, string length:",len(section_str))
    accumulator.append((section_str, "CG_rich"))
    return generate_ATrich_region(n_remaining, accumulator)        

In [44]:
result = run_HMM()

entered AT state, accumulator length: 0
finished in AT state, string length: 2417
entered CG state, accumulator length: 1
finished in CG state, string length: 5386
entered AT state, accumulator length: 2
finished in AT state, string length: 1370
entered CG state, accumulator length: 3
finished in CG state, string length: 715
entered AT state, accumulator length: 4
finished in AT state, string length: 5445
entered CG state, accumulator length: 5
finished in CG state, string length: 3494
entered AT state, accumulator length: 6
finished in AT state, string length: 2137
entered CG state, accumulator length: 7
finished in CG state, string length: 5043
entered AT state, accumulator length: 8
finished in AT state, string length: 5295
entered CG state, accumulator length: 9
finished in CG state, string length: 5611
entered AT state, accumulator length: 10
finished in AT state, string length: 5329
entered CG state, accumulator length: 11
finished in CG state, string length: 3
entered AT state, 

In [45]:
result

[('TAAGATGTCTGAGCCAGGGTATCACAGCTTTCACTGTGAAACTGCCGGGTATAATGAAATTGTACTTTTTCATATCTAGGCTTAAGCGATCTCAGAAATACTCGGAGCTAGCTTTATAAATATTAGAATGTTCTGTCCAGTCTATCATGTACGCATATACTGATGTACCTTGTATTGGCTTCCTTCATCTCTACAATATAGGATCGTCGTCGCCATAAAGATCAAATTTAGAAATCCTTTTGCTGGTTAGTCCTTTCGTGGTATGATCACGAACATGAATTGGTAGCAAGGCATTGCCATCTCTATGGAATCAAAATCAGAAAAATAAGCCCTCAATTTGTTCATTGTTCACTCTTTTTCGCAGTTCATTAGATGCTATCATTACTCTAACTCTACATGACTTTGATTTGTCAATAGACGGGATAAATCACCCTGCTAAGTTATCTCGGTTTATCGTATATATTTATCTCAGATATAACTCAAGACTTCGGCAATTATTGGGTTTCCTTCCTAGCGTGGCGCAATTTAACACATACAATGTCGGACTAAGTGTACGATTAAGTTGCATATTTCTATATTGGCAACCCCGTAGTTACTAACCAACGCGTTTGGCAGAAGTTATTTTCGAAATAATGAAACGTTTACGACTATTATCGTATTCGTACGAAGGACGTCAAGTTAGTTGTTCGTCGGTTATTCTGCGTGAACTTGTGAGAATATTCCTAAAATGTCTCAACCAACCTCTTACTATGAAGCCAGTTTACCTACCACAATAGTTGTTGTTATTATTTTTCGGGCTTGGTCTTATCTAACTGTTTCGGGAGCTTATTATTTTTATATTAAGGTAGTAATCATAATCTGGCCGACAAAGCTCCAACATAAAATATTGGCATCTTAGGCGCCTTTTATAGTGCCCTTGGTTTCTCTGGTAAAATGATACGGTAACGGTAGTCGTAACCACGGACTGCCTGGATTTTCAAATCTAAGGAACCGCTAT

In [102]:
sum(list(map(lambda x: len(x[0]), result)))

48502

Now lets match the states and nucleotide base letters with index digits so that we can place them into our A and B matrices

we will set AT_rich, CG_rich as 0 and 1 respecitvely

and A, T, C, G as 0, 1, 2 and 3 respectively:

In [65]:
#takes as input the sequence of observations, which is just one large string
def viterbi_decode(observations): #obersvation sequence is often denotes as Y
    
    #Aij is the probability of tranistioning from state si to state sj
    A = [[0.9998, 0.0002],[0.9997, 0.0003]]
    
    #Bij stores the probability of observing oj from state si
    B = [[0.2698, 0.3237, 0.2080, 0.1985], [0.2459, 0.2079, 0.2478, 0.2984]]
    
    #all possible observations, (O)
    observation_space = ['A', 'G', 'C', 'T']
    
    #all possible states that can be entered (S)
    state_space = ["AT_rich", "CG_rich"]
    
    #equal chance of starting in either state (PI)
    initial_probs = [0.5,0.5]
    
    #T is the length of the observation sequence, so T = len(observations)
    T = len(observations)
    
    #K is the cardinality of the set of possible states that can be entered (S), so K = 2
    K = len(state_space)
    
    #N is the cardinality of the set of possible observations, so N = 4
    N = len(observation_space)
    
    #replace with whatever you want to be your data type here
    T1 = np.zeros((K, T), dtype = "int32")
    T2 = np.zeros((K, T), dtype = "int32")
    
    for index, state in enumerate(state_space):
        T1[i, 1] = initial_probs[i]
        T2[i, 1] = 0
        
    
    return A

In [66]:
viterbi_decode([1,2,3])

T1 value: [[0 0 0]
 [0 0 0]]
T2 value: [[0 0 0]
 [0 0 0]]


[[0.9998, 0.0002], [0.9997, 0.0003]]