In [1]:
import copy
import numpy as np
import pandas as pd

# Week 1: Introduction to Read Mapping

In [2]:
'''
Code Challenge: Solve the Trie Construction Problem.
Input: A collection of strings Patterns.
Output: The adjacency list corresponding to Trie(Patterns), in the following format. If Trie(Patterns) has n nodes, first label the
     root with 0 and then label the remaining nodes with the integers 1 through n - 1 in any order you like. Each edge of the
     adjacency list of Trie(Patterns) will be encoded by a triple: the first two members of the triple must be the integers labeling the
     initial and terminal nodes of the edge, respectively; the third member of the triple must be the symbol labeling the edge.
'''

def Strings_2_Trie(strings):
    if type(strings) == str:
        strings = strings.split('\n')
    
    added_strings = dict()
    trie   = dict()
    
    # add the first string in to trie
    string = strings[0]
    for position in range(len(string)):
        base           = string[position]
        trie[position] = {position + 1 : base}
        max_label     =  position + 2
    added_strings[strings[0]] = len(strings[0]) - 1
    
    for string in strings[1:]:
        label_delta   = 0
        start_position = 0
        jump_position  = 0
        
        # find the longest prefix of string which already in trie
        max_len = 0
        prefix_string = ''
        for added_string in added_strings.keys():
            for i in range(1, len(added_string) + 1):
                s = added_string[:i]
                if s == string[:len(s)]:
                    if i > max_len:
                        max_len       = i
                        prefix_string = s
               
        current_node = 0
        if prefix_string != '':
            for i in range(len(prefix_string)):
                for node, base in trie[current_node].items():
                    if base == prefix_string[i]:
                        current_node  = node
                        jump_position = node
                        break
                        
        # start to add the base which do not exist in trie                
        start_position = len(prefix_string)
        
        # jump_position is to control if the new string have any prefix in trie
        # label_delta is to contral the new node label
        
        for position in range(start_position, len(string)):
            base  = string[position]
            label = position + label_delta + jump_position - start_position
            
            if label in trie:
                
                if (label + 1) in trie[label]:
                    
                    if trie[label][label + 1] == base:
                        continue
                        
                    else:
                        trie[label][max_label] = base
                        if (max_label - label != 1):
                            label_delta = max_label - label - 1
                        max_label   = max_label + 1
                        if position == len(string) - 1:
                            added_strings[string] = position + label_delta + jump_position - start_position + 1
                else:
                    trie[label] = {max_label : base}
                    if (max_label - label != 1):
                        label_delta = max_label - label - 1
                    max_label   = max_label + 1
                    if position == len(string) - 1:
                        added_strings[string] = position + label_delta + jump_position - start_position + 1
            else:
                trie[label] = {max_label : base}
                if (max_label - label != 1):
                    label_delta = max_label - label - 1
                max_label   = max_label + 1
                if position == len(string) - 1:
                    added_strings[string] = position + label_delta + jump_position - start_position + 1
                    
    return(trie)

# Test
strings = '''ATAGA
ATC
GAT
ATCIO
GATUYEWQ
GAAT'''

graph = Strings_2_Trie(strings)
for key1, values1 in graph.items():
    for key2, value2 in values1.items():    
        print(str(key1) + '->' + str(key2) + ':' + value2)

0->1:A
0->7:G
1->2:T
2->3:A
2->6:C
3->4:G
4->5:A
7->8:A
8->9:T
8->17:A
6->10:I
10->11:O
9->12:U
12->13:Y
13->14:E
14->15:W
15->16:Q
17->18:T


In [3]:
'''
Code Challenge: Implement TrieMatching to solve the Multiple Pattern Matching Problem.
Input: A string Text and a collection of strings Patterns.
Output: All starting positions in Text where a string from Patterns appears as a substring.
'''

def Prefix_Trie_Matching(text, trie):
    node  = 0
    index = 0

    while index != len(text):
        base = text[index]
        if node in trie:
            check = 0
            for n, b in trie[node].items():
                if b == base:
                    node  = n
                    index = index + 1
                    check = 1
                    break
            if check == 0:
                return(0)      
        else: 
            return(1)
        
    return(1)

def Trie_Matching(text, patterns):
    if type(patterns) == str:
        trie = Strings_2_Trie(patterns)
    
    positions = []
    for i in range(len(text) - 1):
        check = Prefix_Trie_Matching(text[i:], trie)
        if check == 1:
            positions.append(i)
    
    return(positions)

# Test
text = 'AATCGGGTTCAATCGGGGT'
patterns = '''ATCG
GGGT'''

Trie_Matching(text, patterns)

[1, 4, 11, 15]

In [4]:
'''
Code Challenge: Solve the Suffix Tree Construction Problem.
Input: A string Text.
Output: The edge labels of SuffixTree(Text). You may return these strings in any order.
'''

def Suffix_Trie_Construction(text):
    trie      = {}
    positions = {}
    new_node  = 1
    
    for i in range(len(text)):
        current_node = 0
        for j in range(i, len(text)):
            current_symbol = text[j]
            
            check = 0
            if current_node in trie:
                for node, symbol_position in trie[current_node].items():
                    if symbol_position[0] == current_symbol:
                        current_node = node
                        check        = 1
                        break            
            if check == 0:
                if current_node in trie:
                    trie[current_node][new_node] = [current_symbol, j]
                    #current_node = new_node
                    #new_node     = new_node + 1
                else:
                    trie[current_node] = {new_node : [current_symbol, j]}
                current_node = new_node
                new_node     = new_node + 1
                
        if current_node not in trie:
            positions[current_node] = i
    
    return(trie, positions)
                

def CountInOut(graph):
    in_count  = {}
    out_count = {}
    
    for key_1, values_1 in graph.items():
        out_count[key_1] = len(values_1)
        
        for key_2, values_2 in values_1.items():
            in_count[key_2] = 1
            
    in_count[0] = 0
    for key in in_count.keys():
        if key not in out_count:
            out_count[key] = 0
            
    return(in_count, out_count)

def Maximal_NonBranching_Paths(graph):
    paths = []
    
    in_count, out_count = CountInOut(graph)
    
    for node in in_count.keys():
        if (in_count.get(node) != 1) | (out_count.get(node) != 1):
            if out_count[node] > 0:
                for n, base in graph[node].items():
                    non_branching_path = base[0]
                    next_node = n
                    
                    while (in_count[next_node] == out_count[next_node] == 1):
                        non_branching_path = non_branching_path + list(graph[next_node].values())[0][0]
                        next_node          = list(graph[next_node].keys())[0]
                    paths.append(non_branching_path)
    return(paths)
# Test
text = 'ATATCGTTTTATCGTT$'

graph, positions = Suffix_Trie_Construction(text)
Maximal_NonBranching_Paths(graph)

['ATCGTT',
 'CGTT',
 'T',
 '$',
 'ATCGTTTTATCGTT$',
 'CGTT',
 'T',
 'ATCGTT$',
 '$',
 'TTATCGTT$',
 '$',
 'TTATCGTT$',
 '$',
 'TTATCGTT$',
 '$',
 'TTATCGTT$',
 '$',
 'TTATCGTT$',
 '$',
 'TATCGTT$',
 'ATCGTT$',
 'AT',
 'T',
 'CGTT',
 'GTT',
 '$']

In [5]:
def Modified_Suffix_Trie_Construction(text):
    trie      = {}
    positions = {}
    new_node  = 1
    
    for i in range(len(text)):
        current_node = 0
        for j in range(i, len(text)):
            current_symbol = text[j]
            
            check = 0
            if current_node in trie:
                for node, symbol_position in trie[current_node].items():
                    if symbol_position[0] == current_symbol:
                        current_node = node
                        check        = 1
                        break            
            if check == 0:
                if current_node in trie:
                    trie[current_node][new_node] = [current_symbol, j]

                else:
                    trie[current_node] = {new_node : [current_symbol, j]}
                current_node = new_node
                new_node     = new_node + 1
                
        if current_node not in trie:
            positions[current_node] = i
    
    paths = []
    
    in_count, out_count = CountInOut(trie)
    
    for node in in_count.keys():
        if (in_count.get(node) != 1) | (out_count.get(node) != 1):
            if out_count[node] > 0:
                
                for n, base in trie[node].items():
                    non_branching_path = base[0]
                    next_node = n
                    
                    while (in_count[next_node] == out_count[next_node] == 1):
                        non_branching_path = non_branching_path + list(trie[next_node].values())[0][0]
                        next_node_tmp      = list(trie[next_node].keys())[0]
                        length             = list(trie[next_node].values())[0][1]
                        trie.pop(next_node)
                        next_node          = next_node_tmp

                    if n!= next_node:
                        if n in trie:
                            trie[n][next_node] = [non_branching_path, base[1],length + 1 - base[1]]
                        else:
                            trie[n] = {next_node : [non_branching_path, base[1],length + 1 - base[1]]}

                    paths.append(non_branching_path)

    tmp_trie = {}
    
    for node_1, key_1 in trie.items():
        if len(key_1) != 1: 
            tmp_trie[node_1] = copy.deepcopy(key_1)

    for node_1, key_1 in tmp_trie.items():
        for node_2, key_2 in key_1.items():

            if node_2 in trie:
                if len(trie[node_2]) == 1:
                    trie[node_1].pop(node_2)
                    trie[node_1][list(trie[node_2].keys())[0]] = list(trie[node_2].values())[0]
                    trie.pop(node_2)
    
    return(trie)

In [6]:
'''
Longest Repeat Problem: Find the longest repeat in a string.
Input: A string Text.
Output: A longest substring of Text that appears in Text more than once.

Code Challenge: Solve the Longest Repeat Problem. (Multiple solutions may exist, in which case you may return any one.)
'''


def Longest_Repeat_in_String(string):
    trie = Modified_Suffix_Trie_Construction(string + '$')

    end_nodes      = []
    backward_graph = {}
    repeat_strings = []
    
    # find all the node have at least two sub-node, and make a backward graph for them
    for key_1, value_1 in trie.items():
        for key_2,value_2 in value_1.items():
            if key_2 in trie:
                if len(trie[key_2]) > 1:
                    end_nodes.append(key_2)
                    backward_graph[key_2] = [key_1, value_2[0]]
    
    # backtrack
    for node in backward_graph.keys():
        repeat_string = ''
        while node != 0:
            repeat_string = backward_graph[node][1] + repeat_string
            node          = backward_graph[node][0]
            repeat_strings.append(repeat_string)
    
    # find the longest repeat string
    max_len = -1
    for string in repeat_strings:
        string_len = len(string)
        if string_len > max_len:
            max_len        = string_len
            longest_string = string


    return(longest_string)

#Test
string = 'ATATCGTTTTATCGTT'

Longest_Repeat_in_String(string)

'TATCGTT'

In [7]:
'''
Longest Shared Substring Problem: Find the longest substring shared by two strings.
Input: Strings Text1 and Text2.
Output: The longest substring that occurs in both Text1 and Text2.

Code Challenge: Solve the Longest Shared Substring Problem. (Multiple solutions may exist, in which case you may return any one.)
'''

def Longest_Shared_Substring(string_1, string_2):
    text = string_1 + '#' + string_2 + '$'
    trie = Modified_Suffix_Trie_Construction(text)
    in_count, out_count = CountInOut(trie)
    classification = {} # 1 for string_1, 2 for string_2, 3 for both
    
    # classify each node from leaf
    while len(in_count) != len(classification):
        for key_1, value_1 in trie.items():
            if key_1 not in classification:
                nodes_class = []
                for key_2, value_2 in value_1.items():
                    if '$' in value_2[0]:
                        if '#' in value_2[0]:
                            classification[key_2] = 1
                        else:
                            classification[key_2] = 2
                    nodes_class.append(classification.get(key_2, -1))
                if -1 not in nodes_class:
                    if 3 in nodes_class:
                        classification[key_1] = 3
                        
                    elif (1 in nodes_class) & (2 in nodes_class):
                        classification[key_1] = 3
                        
                    else:
                        classification[key_1] = nodes_class[0]
    
    # make a backward graph for shared_edge, which if good to construct shared path
    shared_edge_back = {}
    for key_1, values_1 in trie.items():
        if classification[key_1] == 3:
            for key_2, values_2 in values_1.items():
                if classification[key_2] == 3:
                    shared_edge_back[key_2] = [key_1, values_2[0]]
    
    # find all shared path
    shared_paths = []
    for key in shared_edge_back.keys():
        shared_path = ''
        node = key
        while node != 0:
            pattern = shared_edge_back[node][1]
            shared_path = pattern + shared_path
            node = shared_edge_back[node][0]
            
        shared_paths.append(shared_path)
            
    # find the longest shared path
    max_len = -1
    for string in shared_paths:
        string_len = len(string)
        if string_len > max_len:
            max_len        = string_len
            longest_string = string
            
    return(longest_string)

# Test
string_1 = 'TCGGTAGATTGCGCCCACTC'
string_2 = 'AGGGGCTCGCAGTGTAAGAA'

Longest_Shared_Substring(string_1, string_2)

'TCG'

In [8]:
'''
Shortest Non-Shared Substring Problem: Find the shortest substring of one string that does not appear in another string.
Input: Strings Text1 and Text2.
Output: The shortest substring of Text1 that does not appear in Text2.

Code Challenge: Solve the Shortest Non-Shared Substring Problem. (Multiple solutions may exist, in which case you may return any one.)
'''

def Shortest_Non_Shared_Substring(string_1, string_2):
    
    trie = Suffix_Trie_Construction(string_2)[0]
    min_len = float('Inf')

    for i in range(len(string_1)):
        string     = string_1[i:]
        tmp_string = copy.deepcopy(string)
        
        node   = 0
        length = 0
        check  = 1

        while (node in trie) & (check == 1):
            check = 0

            for key, value in trie[node].items():

                if len(tmp_string) > 0:

                    if value[0] == tmp_string[0]:
                        length = length + 1
                        node   = key
                        tmp_string = tmp_string[1:]
                        check = 1
                    
        if len(string) > length:
            if length < min_len:
                min_len = length
                non_shared_string = string[ : length + 1 ]
            
                
    return(non_shared_string)

# Test
string_1 = 'CCAAGCTGCTAGAGG'
string_2 = 'CATGCTGGGCTGGCT'

Shortest_Non_Shared_Substring(string_1, string_2)

'CC'

# Week 2: The Burrows-Wheeler Transform

In [9]:
'''
Suffix Array Construction Problem: Construct the suffix array of a string.
Input: A string Text.
Output: SuffixArray(Text).

Code Challenge: Solve the Suffix Array Construction Problem.
'''

def Suffix_Array_Construction(string):
    char_indexs = []
    indexs      = []
    for i in range(len(string)):
        char_indexs.append([string[i:], i])
        
    char_indexs.sort()
    
    for char_index in char_indexs:
        indexs.append(char_index[1])
        
    return(indexs)

string = 'AACGATAGCGGTAGA$'

Suffix_Array_Construction(string)

[15, 14, 0, 1, 12, 6, 4, 2, 8, 13, 3, 7, 9, 10, 11, 5]

In [10]:
'''
Burrows-Wheeler Transform Construction Problem: Construct the Burrows-Wheeler transform of a string.
Input: A string Text.
Output: BWT(Text).

Code Challenge: Solve the Burrows-Wheeler Transform Construction Problem.
'''

def BWT_Construction(string):
    string_rotation = []
    BWT = ''
    
    for i in range(len(string)):
        string = string[-1] + string[:-1]
        string_rotation.append(string)
    
    string_rotation.sort()
    
    for characters in string_rotation:
        BWT = BWT + characters[-1]
    
    return(BWT)

# Test
string = 'GCGTGCCTGGTCA$'

BWT_Construction(string)

'ACTGGCT$TGCGGC'

In [11]:
'''
Inverse Burrows-Wheeler Transform Problem: Reconstruct a string from its Burrows-Wheeler transform.
Input: A string Transform (with a single "$" symbol).
Output: The string Text such that BWT(Text) = Transform.

Code Challenge: Solve the Inverse Burrows-Wheeler Transform Problem.
'''

def Inverse_BWT(BWT_string):

    BWT_string = list(BWT_string)
    first_col  = copy.deepcopy(BWT_string)
    first_col.sort()
    
    inverse_BWT_list = []

    for base in "ACGT":
        count = 0
        for index in range(len(BWT_string)):
            if BWT_string[index] == base:
                BWT_string[index] = base + str(count)
                count = count + 1
    
    for base in "ACGT":
        count = 0
        for index in range(len(first_col)):
            if first_col[index] == base:
                first_col[index] = base + str(count)
                count = count + 1
                
    char = '$'
    inverse_BWT_list.append(char)

    while len(inverse_BWT_list) != len(BWT_string):
        index = BWT_string.index(char)
        char  = first_col[index]
        inverse_BWT_list.append(char)

    inverse_BWT_list = inverse_BWT_list[1:] + list(inverse_BWT_list[0])
    inverse_BWT_list = ''.join(inverse_BWT_list)
    inverse_BWT = ''
    for char in inverse_BWT_list:
        if not char.isdigit():
            inverse_BWT = inverse_BWT + char
    
    return(inverse_BWT)

# Test
BWT_string = 'TTCCTAACG$A'

Inverse_BWT(BWT_string)

'TACATCACGT$'

In [12]:
'''
Code Challenge: Implement BWMatching.
Input: A string BWT(Text), followed by a collection of Patterns.
Output: A list of integers, where the i-th integer corresponds to the number of substring matches of the i-th member of Patterns in Text.
'''

def BW_Matching(BWT_string, patterns):
    
    if type(patterns) == str:
        patterns = patterns.split(' ')
  
    last_col = list(BWT_string)
    first_col  = copy.deepcopy(last_col)
    first_col.sort()

    for base in "ACGT":
        count = 0
        for index in range(len(last_col)):
            if last_col[index] == base:
                last_col[index] = base + str(count)
                count = count + 1

    for base in "ACGT":
        count = 0
        for index in range(len(first_col)):
            if first_col[index] == base:
                first_col[index] = base + str(count)
                count = count + 1

    n_matchs = []
    
    for pattern in patterns:

        top    = 0
        bottom = len(last_col) - 1
        
        while True:

            if len(pattern) > 0:
                symbol  = pattern[-1]
                pattern = pattern[:-1]

                top_index = float('Inf')
                for index in range(top, bottom + 1):
                    current_symbol = last_col[index][0]

                    if current_symbol == symbol:
                        if index < top_index:
                            top_index = index
                        bottom_index = index
                        
                if top_index == float('Inf'): # if there is no match
                    n_matchs.append(0)
                    break
                    
                else: 
                    top    = first_col.index(last_col[top_index])
                    bottom = first_col.index(last_col[bottom_index])
            else:
                mathchs = bottom - top + 1
                n_matchs.append(mathchs)
                break
                
    return(n_matchs)

# Test
BWT_string = 'TCCTCTATGAGATCCTATTCTATGAAACCTTCA$GACCAAAATTCTCCGGC'
patterns = 'CCT CAC GAG CAG ATC'

BW_Matching(BWT_string, patterns)

[2, 1, 1, 0, 1]

# Week 3: Speeding Up Burrows-Wheeler Read Mapping

In [13]:
def Inverse_BWT_number(BWT_string):

    BWT_string = list(BWT_string)
    first_col  = copy.deepcopy(BWT_string)
    first_col.sort()
    
    inverse_BWT_list = []

    for base in "ACGT":
        count = 0
        for index in range(len(BWT_string)):
            if BWT_string[index] == base:
                BWT_string[index] = base + str(count)
                count = count + 1
    
    for base in "ACGT":
        count = 0
        for index in range(len(first_col)):
            if first_col[index] == base:
                first_col[index] = base + str(count)
                count = count + 1
                
    char = '$'
    inverse_BWT_list.append(char)

    while len(inverse_BWT_list) != len(BWT_string):
        index = BWT_string.index(char)
        char  = first_col[index]
        inverse_BWT_list.append(char)

    inverse_BWT_list = inverse_BWT_list[1:] + list(inverse_BWT_list[0])
    
    return(inverse_BWT_list)

In [14]:
'''
Code Challenge: Solve the Multiple Pattern Matching Problem.
Input: A string Text followed by a collection of strings Patterns.
Output: All starting positions in Text where a string from Patterns appears as a substring.
'''

def BW_Matching_position(string, patterns):
    
    if type(patterns) == str:
        patterns = patterns.split(' ')
    
    string = string + '$'
    BWT_string = BWT_Construction(string)

    inverse_BWT_list = Inverse_BWT_number(BWT_string)
    
    last_col  = list(BWT_string)
    first_col = copy.deepcopy(last_col)
    first_col.sort()
    
    inverse_BWT_list = Inverse_BWT_number(BWT_string)
    
    for base in "ACGT":
        count = 0
        for index in range(len(last_col)):
            if last_col[index] == base:
                last_col[index] = base + str(count)
                count = count + 1

    for base in "ACGT":
        count = 0
        for index in range(len(first_col)):
            if first_col[index] == base:
                first_col[index] = base + str(count)
                count = count + 1

    positions = []
    
    for pattern in patterns:

        top    = 0
        bottom = len(last_col) - 1
        
        while True:

            if len(pattern) > 0:
                symbol  = pattern[-1]
                pattern = pattern[:-1]

                top_index = float('Inf')
                for index in range(top, bottom + 1):
                    current_symbol = last_col[index][0]

                    if current_symbol == symbol:
                        if index < top_index:
                            top_index = index
                        bottom_index = index
                        
                if top_index == float('Inf'): # if there is no match

                    break
                    
                else: 
                    top    = first_col.index(last_col[top_index])
                    bottom = first_col.index(last_col[bottom_index])
            else:
                
                for i in range(top, bottom + 1):
                    
                    position = inverse_BWT_list.index(first_col[i])
                    positions.append(position)
                    
                break
                
    return(positions)


# Test
string = 'AATCGGGTTCAATCGGGGT'
patterns = 'ATCG GGGT'

BW_Matching_position(string, patterns)

[11, 1, 15, 4]

In [15]:
'''
Code Challenge: Solve the Multiple Approximate Pattern Matching Problem.
Input: A string Text, followed by a collection of strings Patterns, and an integer d.
Output: All positions where one of the strings in Patterns appears as a substring of Text with at most d mismatches.
'''

def BW_Matching_d_Mismatch(string, patterns, d):
    
    if type(patterns) == str:
        patterns = patterns.split(' ')

    string = string + '$'
    BWT_string = BWT_Construction(string)

    inverse_BWT_list = Inverse_BWT_number(BWT_string)

    last_col = list(BWT_string)
    first_col  = copy.deepcopy(last_col)
    first_col.sort()

    inverse_BWT_list = Inverse_BWT_number(BWT_string)

    for base in "ACGT":
        count = 0
        for index in range(len(last_col)):
            if last_col[index] == base:
                last_col[index] = base + str(count)
                count = count + 1

    for base in "ACGT":
        count = 0
        for index in range(len(first_col)):
            if first_col[index] == base:
                first_col[index] = base + str(count)
                count = count + 1

    positions = []

    for pattern in patterns:

        candidate_first_col = list(range(len(first_col)))

        n_mistake = {}
        
        for candidate in candidate_first_col:
            n_mistake[candidate] = 0

        while True:

            if len(pattern) > 1:

                symbol  = pattern[-1]
                pattern = pattern[:-1]

                next_candidate_first_col = []
                next_n_mistake           = {}

                for index in candidate_first_col:

                    current_symbol = first_col[index][0]

                    if current_symbol == symbol:
                        next_index = first_col.index(last_col[index])
                        next_candidate_first_col.append(next_index)
                        next_n_mistake[next_index] = int(n_mistake[index])

                    else:

                        n_mistake[index] = n_mistake[index] + 1
                        if n_mistake[index] <= d:
                            next_index = first_col.index(last_col[index])
                            next_candidate_first_col.append(next_index)
                            next_n_mistake[next_index] = int(n_mistake[index])

                candidate_first_col = next_candidate_first_col
                n_mistake           = next_n_mistake

            elif len(pattern) == 1:
                candidate_first_col = next_candidate_first_col
                n_mistake           = next_n_mistake

                symbol  = pattern[-1]
                pattern = pattern[:-1]

                next_candidate_first_col = []
                next_n_mistake           = {}

                for index in candidate_first_col:

                    current_symbol = first_col[index][0]

                    if current_symbol == symbol:

                        next_candidate_first_col.append(index)
                        next_n_mistake[next_index] = int(n_mistake[index])

                    else:

                        n_mistake[index] = n_mistake[index] + 1
                        if n_mistake[index] <= d:

                            next_candidate_first_col.append(index)
                            next_n_mistake[next_index] = int(n_mistake[index])

            else:

                for i in next_candidate_first_col:
                    
                    position = inverse_BWT_list.index(first_col[i])
                    positions.append(position)
                    
                break

    return(positions)

# Test
string   = 'ACATGCTACTTT'
patterns = 'ATT GCC GCTA TATT'
d = 1

BW_Matching_d_Mismatch(string, patterns, d)

[2, 9, 8, 7, 4, 4, 6]

# Week 4: Introduction to Hidden Markov Models

In [16]:
'''
CODE CHALLENGE: Solve the Probability of a Hidden Path Problem.
Given: A hidden path π followed by the states States and transition matrix Transition of an HMM
     (Σ, States, Transition, Emission).
Return: The probability of this path, Pr(π).

Note: You may assume that transitions from the initial state occur with equal probability.
'''

def Probability_of_Hidden_Path(hidden_path, states, transition_matrix):
    
    if type(states) == str:
        states = states.split(' ')

    transition_matrix = transition_matrix.split('\n')

    for i in range(len(transition_matrix)):
        transition_matrix[i] = transition_matrix[i].split('\t')
        transition_matrix[i] = list(map(float, transition_matrix[i]))

    transition_matrix = pd.DataFrame(transition_matrix, columns = states, index = states)

    probability = 0.5
    for i in range(len(hidden_path) - 1):
        current_state = hidden_path[i]
        next_state    = hidden_path[i + 1]
        probability   = probability * transition_matrix.at[current_state,next_state]

    return(probability)

# Test
hidden_path = 'ABABBBAAAA'
states = 'A B'
transition_matrix = '''0.377	0.623
0.26	0.74'''

Probability_of_Hidden_Path(hidden_path, states, transition_matrix)

0.00038492869175467582

In [17]:
'''
CODE CHALLENGE: Solve the Probability of an Outcome Given a Hidden Path Problem.
Input: A string x, followed by the alphabet from which x was constructed, followed by
     a hidden path π, followed by the states States and emission matrix Emission of an HMM
     (Σ, States, Transition, Emission).
Output: The conditional probability Pr(x|π) that x will be emitted given that the HMM
     follows the hidden path π.

Note: You may assume that transitions from the initial state occur with equal probability.
'''

def Probability_of_x_by_Hidden_Path(string_x, xs, hidden_path, states, states_emission_matrix):
    
    if type(xs) == str:
        xs = xs.split(' ')
        
    if type(states) == str:
        states = states.split(' ')
        
    states_emission_matrix = states_emission_matrix.split('\n')

    for i in range(len(states_emission_matrix)):
        states_emission_matrix[i] = states_emission_matrix[i].split('\t')
        states_emission_matrix[i] = list(map(float, states_emission_matrix[i]))
        
    states_emission_matrix = pd.DataFrame(states_emission_matrix, columns = xs, index = states)
    
    probability = 1
    for i in range(len(string_x)):
        x     = string_x[i]
        state = hidden_path[i]

        probability = probability * states_emission_matrix.at[state, x]
    
    return(probability)

# Test
string_x = 'zzzyxyyzzx'
xs = 'x y z'
hidden_path = 'BAAAAAAAAA'
states = 'A B'
states_emission_matrix = '''0.176	0.596	0.228
0.225	0.572	0.203'''

Probability_of_x_by_Hidden_Path(string_x, xs, hidden_path, states, states_emission_matrix)

3.5974895474624624e-06

In [18]:
'''
CODE CHALLENGE: Implement the Viterbi algorithm solving the Decoding Problem.
Input: A string x, followed by the alphabet from which x was constructed,
     followed by the states States, transition matrix Transition, and emission matrix
     Emission of an HMM (Σ, States, Transition, Emission).
Output: A path that maximizes the (unconditional) probability Pr(x, π) over all possible paths π.

Note: You may assume that transitions from the initial state occur with equal probability.
'''

def Viterbi_algorithm(string_x, xs, states, transition_matrix, states_emission_matrix):
    
    # formating all parameters
    if type(xs) == str:
        xs = xs.split(' ')
        
    if type(states) == str:
        states = states.split(' ')
    
    transition_matrix      = transition_matrix.split('\n')
    for i in range(len(transition_matrix)):
        transition_matrix[i] = transition_matrix[i].split('\t')
        transition_matrix[i] = list(map(float, transition_matrix[i]))
        
    transition_matrix = pd.DataFrame(transition_matrix, columns = states, index = states)
    
    states_emission_matrix = states_emission_matrix.split('\n')
    for i in range(len(states_emission_matrix)):
        states_emission_matrix[i] = states_emission_matrix[i].split('\t')
        states_emission_matrix[i] = list(map(float, states_emission_matrix[i]))
        
    states_emission_matrix = pd.DataFrame(states_emission_matrix, columns = xs, index = states)
    
    probability_matrix = np.full([len(states), len(string_x)], 0, float)
    probability_matrix = pd.DataFrame(probability_matrix, columns = list(range(len(string_x))), index = states)
    
    # initailize matrix
    for state in states:
        probability_matrix.at[state, 0] = 0.5 * states_emission_matrix.at[state, string_x[0]]
    
    # dynanmic programing
    for string_index in range(1, len(string_x)):
        emission = string_x[string_index]
        
        for state in states:
            max_prob = -float('Inf')
            
            for state_index in range(len(states)):
                prob = probability_matrix.at[states[state_index], string_index - 1] * transition_matrix.at[states[state_index], state] * states_emission_matrix.at[state, emission]
                if prob > max_prob:
                    max_prob = prob
                    
            probability_matrix.at[state, string_index] = max_prob
    
    # find max probability
    max_prob = -float('Inf')
    for state in states:
        if probability_matrix.at[state, len(string_x) - 1] > max_prob:
            max_prob  = probability_matrix.at[state, len(string_x) - 1]
            max_state = state
    
    # backtrack
    hidden_path = max_state
    for string_index in range(len(string_x) - 1, 0, -1):
        emission = string_x[string_index]

        for state in states:
            if probability_matrix.at[max_state, string_index] == probability_matrix.at[state, string_index - 1] * transition_matrix.at[state, max_state] * states_emission_matrix.at[max_state, emission]:
                max_state = state

                break

        hidden_path = max_state + hidden_path
 
    return(hidden_path)

# Test
string_x = 'zxxxxyzzxyxyxyzxzzxzzzyzzxxxzxxyyyzxyxzyxyxyzyyyyzzyyyyzzxzxzyzzzzyxzxxxyxxxxyyzyyzyyyxzzzzyzxyzzyyy'
xs = 'x y z'
states = 'A B'

transition_matrix = '''0.634	0.366
0.387	0.613'''

states_emission_matrix = '''0.532	0.226	0.241
0.457	0.192	0.351'''

Viterbi_algorithm(string_x, xs, states, transition_matrix, states_emission_matrix)

'AAAAAAAAAAAAAABBBBBBBBBBBAAAAAAAAAAAAAAAAAAAAAAAAAAAAAABBBBBBBBBBBAAAAAAAAAAAAAAAAAAAAABBBBBBBBBBAAA'

In [19]:
'''
CODE CHALLENGE: Solve the Outcome Likelihood Problem.
Input: A string x, followed by the alphabet from which x was constructed,
     followed by the states States, transition matrix Transition, and emission matrix
     Emission of an HMM (Σ, States, Transition, Emission).
Output: The probability Pr(x) that the HMM emits x.

Note: You may assume that transitions from the initial state occur with equal probability.
'''

def probability_of_HMM_emits(string_x, xs, states, transition_matrix, states_emission_matrix):
    # formating all parameters
    if type(xs) == str:
        xs = xs.split(' ')
        
    if type(states) == str:
        states = states.split(' ')
    
    transition_matrix      = transition_matrix.split('\n')
    for i in range(len(transition_matrix)):
        transition_matrix[i] = transition_matrix[i].split('\t')
        transition_matrix[i] = list(map(float, transition_matrix[i]))
        
    transition_matrix = pd.DataFrame(transition_matrix, columns = states, index = states)
    
    states_emission_matrix = states_emission_matrix.split('\n')
    for i in range(len(states_emission_matrix)):
        states_emission_matrix[i] = states_emission_matrix[i].split('\t')
        states_emission_matrix[i] = list(map(float, states_emission_matrix[i]))
        
    states_emission_matrix = pd.DataFrame(states_emission_matrix, columns = xs, index = states)
    
    probability_matrix = np.full([len(states), len(string_x)], 0, float)
    probability_matrix = pd.DataFrame(probability_matrix, columns = list(range(len(string_x))), index = states)
    
    # initailize matrix
    for state in states:
        probability_matrix.at[state, 0] = 0.5 * states_emission_matrix.at[state, string_x[0]]
    
    # fill the matrix
    for string_index in range(1, len(string_x)):
        emission = string_x[string_index]
        
        for state in states:
            sum_prob = 0
            
            for state_index in range(len(states)):
                prob = probability_matrix.at[states[state_index], string_index - 1] * transition_matrix.at[states[state_index], state] * states_emission_matrix.at[state, emission]
                sum_prob = sum_prob + prob
                    
            probability_matrix.at[state, string_index] = sum_prob
    
    probability = sum(probability_matrix[len(string_x) - 1])
    
    return(probability)

# Test
string_x = 'zyzzyyyzzzxxzxxzzxxxxxxzyyyyxzzyxzxzxyxyzyyxzxzxxzxzzzzzzzyxxyzzzzzxxzxyxyzxyyyzxxyzxzxzyyyzyxzzxxzz'
xs = 'x y z'
states = 'A B'

transition_matrix = '''0.597	0.403
0.268	0.732'''

states_emission_matrix = '''0.427	0.268	0.305
0.226	0.32	0.454'''

probability_of_HMM_emits(string_x, xs, states, transition_matrix, states_emission_matrix)

7.04227143648133e-48

# Week 5: Profile HMMs for Sequence Alignment

In [20]:
'''
CODE CHALLENGE: Solve the Profile HMM Problem.
Input: A threshold θ, followed by an alphabet Σ, followed by a multiple alignment
     Alignment whose strings are formed from Σ.
Output: The transition matrix followed by the emission matrix of HMM(Alignment, θ).
'''

def HMM_profile(threshold, alphabet, alignment):
    
    # formatting inputs
    if type(alphabet) == str:
        alphabet = alphabet.split(' ')

    if type(alignment) == str:
        alignment = alignment.split('\n')

    for i in range(len(alignment)):
        alignment[i] = list(alignment[i])
    alignment = np.array(alignment)

    # find insertion columns by threshold
    insertion_col = []
    for j in range(alignment.shape[1]):
        col   = alignment[:, j]
        count = 0
        for i in range(alignment.shape[0]):
            if alignment[i, j] == '-':
                count = count + 1
        if (count / alignment.shape[0]) >= threshold:
            insertion_col.append(j)

    n_M = len(alignment[0]) - len(insertion_col)
    
    # build transition_matrix and emission_matrix
    transition_col = ['S', 'I0']
    for n in range(1, n_M + 1):
        for node in ['M', 'D', 'I']:
            transition_col.append(node + str(n))
    transition_col.append('E')

    transition_matrix = np.full([len(transition_col) , len(transition_col)], 0, float)
    transition_matrix = pd.DataFrame(transition_matrix, columns = transition_col, index = transition_col)

    emission_matrix = np.full([len(transition_col), len(alphabet)], 0, float)
    emission_matrix = pd.DataFrame(emission_matrix, columns = alphabet, index = transition_col)

    # fill transition_matrix and emission_matrix by count
    for string in alignment:

        states = []
        M_count = 0

        for i in range(len(string)):
            base = string[i]
            if i in insertion_col:
                if base != '-':
                    state = 'I' + str(M_count)
                    states.append(state)
                    emission_matrix.at[state, string[i]] = emission_matrix.at[state, string[i]] + 1

            elif base != '-':
                M_count = M_count + 1
                state   = 'M' + str(M_count)
                states.append(state)
                emission_matrix.at[state, string[i]] = emission_matrix.at[state, string[i]] + 1

            else:
                M_count = M_count + 1
                state   = 'D' + str(M_count)
                states.append(state)

        current_state = 'S'

        for state in states:
            transition_matrix.at[current_state, state] = transition_matrix.at[current_state, state] + 1
            current_state = state

        transition_matrix.at[current_state, 'E'] = transition_matrix.at[current_state, 'E'] + 1

    # normalize transition_matrix and emission_matrix    
    for i in range(len(transition_col)):
        if sum(transition_matrix.iloc[i]) != 0:
            transition_matrix.iloc[i] = transition_matrix.iloc[i] / sum(transition_matrix.iloc[i])

        if sum(emission_matrix.iloc[i]) != 0:
            emission_matrix.iloc[i] = emission_matrix.iloc[i] / sum(emission_matrix.iloc[i])
            
    return(transition_matrix, emission_matrix)

# Test
threshold = 0.289
alphabet  = 'A B C D E'
alignment = '''DCDABACED
DCCA--CA-
DCDAB-CA-
BCDA---A-
BC-ABE-AE'''

transition_matrix, emission_matrix = HMM_profile(threshold, alphabet, alignment)
print(transition_matrix.to_string())
print(emission_matrix.to_string())

      S   I0   M1   D1   I1   M2   D2   I2   M3   D3   I3   M4   D4   I4   M5   D5   I5    E
S   0.0  0.0  1.0  0.0  0.0  0.0  0.0  0.0  0.0  0.0  0.0  0.0  0.0  0.0  0.0  0.0  0.0  0.0
I0  0.0  0.0  0.0  0.0  0.0  0.0  0.0  0.0  0.0  0.0  0.0  0.0  0.0  0.0  0.0  0.0  0.0  0.0
M1  0.0  0.0  0.0  0.0  0.0  1.0  0.0  0.0  0.0  0.0  0.0  0.0  0.0  0.0  0.0  0.0  0.0  0.0
D1  0.0  0.0  0.0  0.0  0.0  0.0  0.0  0.0  0.0  0.0  0.0  0.0  0.0  0.0  0.0  0.0  0.0  0.0
I1  0.0  0.0  0.0  0.0  0.0  0.0  0.0  0.0  0.0  0.0  0.0  0.0  0.0  0.0  0.0  0.0  0.0  0.0
M2  0.0  0.0  0.0  0.0  0.0  0.0  0.0  0.0  0.8  0.2  0.0  0.0  0.0  0.0  0.0  0.0  0.0  0.0
D2  0.0  0.0  0.0  0.0  0.0  0.0  0.0  0.0  0.0  0.0  0.0  0.0  0.0  0.0  0.0  0.0  0.0  0.0
I2  0.0  0.0  0.0  0.0  0.0  0.0  0.0  0.0  0.0  0.0  0.0  0.0  0.0  0.0  0.0  0.0  0.0  0.0
M3  0.0  0.0  0.0  0.0  0.0  0.0  0.0  0.0  0.0  0.0  0.0  1.0  0.0  0.0  0.0  0.0  0.0  0.0
D3  0.0  0.0  0.0  0.0  0.0  0.0  0.0  0.0  0.0  0.0  0.0  1.0  0.0  0

In [21]:
'''
CODE CHALLENGE: Solve the Profile HMM with Pseudocounts Problem.
Input: A threshold θ and a pseudocount σ, followed by an alphabet Σ, followed by a
     multiple alignment Alignment whose strings are formed from Σ.
Output: The transition and emission matrices of HMM(Alignment, θ, σ).
'''

def HMM_profile_pseudocount(threshold, pseudocount, alphabet, alignment):
    
    transition_matrix, emission_matrix = HMM_profile(threshold, alphabet, alignment)
    transition_col = list(transition_matrix.columns)
    n_M = 0
    for state in transition_col:
        if state[0] == 'M':
            n_M = n_M + 1
    
    # pseudocount
    transition_matrix.iloc[0,1:4] = transition_matrix.iloc[0,1:4] / sum(transition_matrix.iloc[0,1:3]) + pseudocount
    transition_matrix.iloc[0,1:4] = transition_matrix.iloc[0,1:4] / sum(transition_matrix.iloc[0,1:4])
    
    for i in range(1, len(transition_col[:-1])):
        state = transition_col[i]
        if sum(transition_matrix.iloc[i]) != 0 :
            transition_matrix.iloc[i] = transition_matrix.iloc[i] / sum(transition_matrix.iloc[i])
        
        state_I = 'I' + str(state[1:])
        state_M = 'M' + str(int(state[1:]) + 1)
        state_D = 'D' + str(int(state[1:]) + 1)

        if int(state[1:]) < n_M:
            transition_matrix.at[state, state_I] = transition_matrix.at[state, state_I] + pseudocount
            transition_matrix.at[state, state_M] = transition_matrix.at[state, state_M] + pseudocount
            transition_matrix.at[state, state_D] = transition_matrix.at[state, state_D] + pseudocount
        else:
            transition_matrix.at[state, state_I] = transition_matrix.at[state, state_I] + pseudocount
            transition_matrix.at[state, 'E']     = transition_matrix.at[state, 'E']     + pseudocount

        transition_matrix.iloc[i] = transition_matrix.iloc[i] / sum(transition_matrix.iloc[i])
        
        if state[0] != 'D':
            if sum(emission_matrix.iloc[i]) != 0:
                emission_matrix.iloc[i] = (emission_matrix.iloc[i]  / sum(emission_matrix.iloc[i])) + pseudocount
            else:
                emission_matrix.iloc[i] = emission_matrix.iloc[i] + pseudocount
                
            emission_matrix.iloc[i] = emission_matrix.iloc[i]  / sum(emission_matrix.iloc[i])
            
    return(transition_matrix, emission_matrix)

# Test
threshold   = 0.358
pseudocount = 0.01
alphabet  = 'A B C D E'
alignment = '''A-A
ADA
ACA
A-C
-EA
D-A'''

transition_matrix, emission_matrix = HMM_profile_pseudocount(threshold, pseudocount, alphabet, alignment)
print(transition_matrix.to_string())
print(emission_matrix.to_string())

      S        I0        M1        D1        I1        M2        D2        I2         E
S   0.0  0.008130  0.821138  0.170732  0.000000  0.000000  0.000000  0.000000  0.000000
I0  0.0  0.333333  0.333333  0.333333  0.000000  0.000000  0.000000  0.000000  0.000000
M1  0.0  0.000000  0.000000  0.000000  0.398058  0.592233  0.009709  0.000000  0.000000
D1  0.0  0.000000  0.000000  0.000000  0.980583  0.009709  0.009709  0.000000  0.000000
I1  0.0  0.000000  0.000000  0.000000  0.009709  0.980583  0.009709  0.000000  0.000000
M2  0.0  0.000000  0.000000  0.000000  0.000000  0.000000  0.000000  0.009804  0.990196
D2  0.0  0.000000  0.000000  0.000000  0.000000  0.000000  0.000000  0.500000  0.500000
I2  0.0  0.000000  0.000000  0.000000  0.000000  0.000000  0.000000  0.500000  0.500000
E   0.0  0.000000  0.000000  0.000000  0.000000  0.000000  0.000000  0.000000  0.000000
           A         B         C         D         E
S   0.000000  0.000000  0.000000  0.000000  0.000000
I0  0.200000  

In [22]:
'''
CODE CHALLENGE: Solve the Sequence Alignment with Profile HMM Problem.
Input: A string x followed by a threshold θ and a pseudocount σ, followed by an
     alphabet Σ, followed by a multiple alignment Alignment whose strings are formed from Σ. 
Output: An optimal hidden path emitting x in HMM(Alignment, θ, σ).
'''
def HMM_sequence_alignment(string, threshold, pseudocount, alphabet, alignment):
    
    string = list(string)
    transition_matrix, states_emission_matrix = HMM_profile_pseudocount(threshold, pseudocount, alphabet, alignment)
    transition_col = list(transition_matrix.columns)
    
    # give emission a number to avoid replication
    for emission in list(set(string)):
        count = 0
        for i in range(len(string)):
            if string[i] == emission:
                string[i] = string[i] + str(count)
                count     = count + 1
    
    
    probability_matrix = np.full([len(transition_col), len(string) + 1], 0, float)
    probability_matrix = pd.DataFrame(probability_matrix, index = transition_col, columns = ['0'] + list(string))

    # fill the first colum
    probability_matrix.at['S', '0'] = 1
    for state in transition_col:
        if state[0] != 'D' :
            prob =  probability_matrix.at['S', '0'] * transition_matrix.at['S', state] * states_emission_matrix.at[state, string[0][0]]
        else:
            prob = -float('Inf')
            current_D_index = transition_col.index(state)
            for prev_state_index in range(current_D_index):
                prev_state = transition_col[prev_state_index]
                prob_D = probability_matrix.at['S', string[0]] * transition_matrix.at['S', state]

                if prob_D > prob:
                    prob = prob_D
            
        probability_matrix.at[state, string[0]] = prob

    # dynamic programing
    for string_index in range(1, len(string)):
        emission = string[string_index]
        for state in transition_col:           
            max_prob = -float('Inf')
            if state[0] != 'D' :
                for prev_state in transition_col:
                    prob =  probability_matrix.at[prev_state, string[string_index - 1]] * transition_matrix.at[prev_state, state] * states_emission_matrix.at[state, emission[0]]
                    if prob > max_prob:
                        max_prob = prob
            else:
                current_D_index = transition_col.index(state)
                for prev_state_index in range(current_D_index):
                    prev_state = transition_col[prev_state_index]
                    prob = probability_matrix.at[prev_state, emission] * transition_matrix.at[prev_state, state]

                    if prob > max_prob:
                        max_prob = prob

            probability_matrix.at[state, emission] = max_prob

    # backtrack
    last_emission = string[-1]
    max_prob = -float('Inf')
    for state in transition_col:
        prob = probability_matrix.at[state, last_emission] * transition_matrix.at[state, 'E']
        if prob > max_prob:
            max_prob   = prob
            last_state = state
    
    
    path = [last_state]
    string = string[:-1]
    string = ['0'] + string
    while last_state != 'S':
        if last_state[0] != 'D':
            for prev_state in transition_col:
                prob =  probability_matrix.at[prev_state, string[-1]] * transition_matrix.at[prev_state, last_state] * states_emission_matrix.at[last_state, last_emission[0]]
                if prob == probability_matrix.at[last_state, last_emission]:
                    path.append(prev_state)
                    last_state    = prev_state
                    last_emission = string[-1]
                    string        = string[:-1]
                    break
        else:
            current_D_index = transition_col.index(last_state)
            for prev_state_index in range(current_D_index):
                prev_state = transition_col[prev_state_index]
                prob =  probability_matrix.at[prev_state, last_emission] * transition_matrix.at[prev_state, last_state] 
                if prob == probability_matrix.at[last_state, last_emission]:
                    path.append(prev_state)
                    last_state = prev_state
                    break

    path = path[::-1]
    path = path[1:]

    return(path)

# Test
string = 'AEFDFDC'
threshold   = 0.4
pseudocount = 0.01
alphabet  = 'A B C D E F'
alignment = '''ACDEFACADF
AFDA---CCF
A--EFD-FDC
ACAEF--A-C
ADDEFAAADF'''

HMM_sequence_alignment(string, threshold, pseudocount, alphabet, alignment)

['M1', 'I1', 'M2', 'M3', 'D4', 'D5', 'M6', 'M7', 'M8']

In [23]:
'''
CODE CHALLENGE: Solve the HMM Parameter Estimation Problem.
Input: A string x of symbols emitted from an HMM, followed by the HMM's alphabet Σ,
     followed by a path π, followed by the collection of states of the HMM.
Output: A transition matrix Transition followed by an emission matrix Emission that maximize
     Pr(x, π) over all possible transition and emission matrices.
'''

def HMM_parameter_estimation(string, alphabet, hidden_path, states):
    if type(alphabet) == str:
        alphabet = alphabet.split(' ')
    if type(states) == str:
        states   = states.split(' ')

    transition_matrix = np.full([len(states), len(states)], 0, float)
    transition_matrix = pd.DataFrame(transition_matrix, index = states, columns = states)
    
    state_emission_matrix = np.full([len(states), len(alphabet)], 0, float)
    state_emission_matrix = pd.DataFrame(state_emission_matrix, index = states, columns = alphabet)
    
    for i in range(len(hidden_path) - 1):
        current_state = hidden_path[i]
        next_state    = hidden_path[i + 1]
        transition_matrix.at[current_state, next_state] = transition_matrix.at[current_state, next_state] + 1
    
    for state, emission in zip (hidden_path, string):
        state_emission_matrix.at[state, emission] = state_emission_matrix.at[state, emission] + 1
        
    for i in range(len(states)):
        if sum(transition_matrix.iloc[i]) == 0:
            transition_matrix.iloc[i] = transition_matrix.iloc[i] + 1
        if sum(state_emission_matrix.iloc[i]) == 0:
            state_emission_matrix.iloc[i] = state_emission_matrix.iloc[i] + 1
                
        transition_matrix.iloc[i] = transition_matrix.iloc[i]         / sum(transition_matrix.iloc[i])
        state_emission_matrix.iloc[i] = state_emission_matrix.iloc[i] / sum(state_emission_matrix.iloc[i])
    
    return(transition_matrix, state_emission_matrix)

# Test
string = 'yyzzzyzzyxyxzxyyzzzzyxyxxyyxyxzzyyyzyyyyyxxzzxxyxyxxxzxxxxxxzyzxxxzzxzyzxxxzxzxxxxzyzyxxyxzxxxxxxyxx'
alphabet = 'x y z'
hidden_path = 'BABACBABACAACBBBBCBBBCCACCABAABCAAACCACCBBBCBCBCCABBBCAABBCCABBCCBAABABACCCAACCAAABACCBCAABBCCACCABC'
states = 'A B C'

transition_matrix, state_emission_matrix = HMM_parameter_estimation(string, alphabet, hidden_path, states)
print(transition_matrix.to_string())
print(state_emission_matrix.to_string())

          A         B         C
A  0.312500  0.375000  0.312500
B  0.272727  0.363636  0.363636
C  0.382353  0.235294  0.382353
          x         y         z
A  0.343750  0.406250  0.250000
B  0.454545  0.242424  0.303030
C  0.542857  0.200000  0.257143


In [24]:
'''
CODE CHALLENGE: Implement Viterbi learning for estimating the parameters of an HMM.
Input: A number of iterations j, followed by a string x of symbols emitted by an HMM,
     followed by the HMM's alphabet Σ, followed by the HMM's states, followed by initial transition
     and emission matrices for the HMM.
Output: Emission and transition matrices resulting from applying Viterbi learning for j iterations.
'''

def matrix_2_hidden_path(string, transition_matrix, state_emission_matrix, states):
    
    prob_matrix = np.full([len(states), len(string)], 0, float)
    for state_i in range(len(states)):
        state = states[state_i]
        prob_matrix[state_i,0] = (1 / prob_matrix.shape[0]) * state_emission_matrix.at[state, string[0]]
    
    # dynamic programing
    for string_index in range(1, len(string)):
        current_emission = string[string_index]

        for state_index in range(len(states)):
            state = states[state_index]
            max_prob = -float('Inf')
            
            for prev_state_index in range(len(states)):
                prev_state = states[prev_state_index]
                prob =  prob_matrix[prev_state_index, string_index - 1] * transition_matrix.at[prev_state, state] * state_emission_matrix.at[state, current_emission]
                if prob > max_prob:
                    max_prob = prob
            prob_matrix[state_index, string_index] = max_prob
    
    # find the last state
    current_prob  = max(prob_matrix[:,-1])
    state_index   = list(prob_matrix[:,-1]).index(current_prob)
    current_state = states[state_index]
    string_index  = len(string) - 1
    hidden_path   = current_state
    
    # backtrack
    while string_index != 0:
        current_emission = string[string_index]
        for prev_state_index in range(len(states)):
            prev_state = states[prev_state_index]
            prob =  prob_matrix[prev_state_index, string_index - 1] * transition_matrix.at[prev_state, current_state] * state_emission_matrix.at[current_state, current_emission]
            if prob == current_prob:
                current_state = prev_state
                hidden_path   = current_state + hidden_path
                current_prob  = prob_matrix[prev_state_index, string_index - 1]
                string_index  = string_index - 1

    return(hidden_path)

def Viterbi_Learning(iterations, string, alphabet, states, initial_transition_matrix, initial_state_emission_matrix):
    alphabet = alphabet.split(' ')
    states   = states.split(' ')
    
    initial_transition_matrix = initial_transition_matrix.split('\n')
    for i in range(len(initial_transition_matrix)):
        initial_transition_matrix[i] = initial_transition_matrix[i].split('\t')
        initial_transition_matrix[i] = list(map(float, initial_transition_matrix[i]))
    initial_transition_matrix = pd.DataFrame(initial_transition_matrix, columns = states, index = states)
    
    initial_state_emission_matrix = initial_state_emission_matrix.split('\n')
    for i in range(len(initial_state_emission_matrix)):
        initial_state_emission_matrix[i] = initial_state_emission_matrix[i].split('\t')
        initial_state_emission_matrix[i] = list(map(float, initial_state_emission_matrix[i]))
    initial_state_emission_matrix = pd.DataFrame(initial_state_emission_matrix, columns = alphabet, index = states)
    
    transition_matrix, state_emission_matrix = initial_transition_matrix, initial_state_emission_matrix
    
    for i in range(iterations):
        hidden_path = matrix_2_hidden_path(string, transition_matrix, state_emission_matrix, states)
        transition_matrix, state_emission_matrix = HMM_parameter_estimation(string, alphabet, hidden_path, states)

    return(transition_matrix, state_emission_matrix)

iterations = 100
string = 'zzzyyxyzzzxyxzxxzzyxxzzzzzzyzxyzxxxzxxxzyzzxzxzzzxyzyyyxxxxzxyyyyyxzzzyxyzzxxzxxzxyxyxyzxzxzxzyxyzzz'
alphabet = 'x y z'
states = 'A B'
initial_transition_matrix = '''0.436	0.564
0.953	0.047'''
initial_state_emission_matrix = '''0.367	0.248	0.385
0.401	0.361	0.238'''

transition_matrix, state_emission_matrix = Viterbi_Learning(iterations, string, alphabet, states, initial_transition_matrix, initial_state_emission_matrix)
print(transition_matrix.to_string())
print(state_emission_matrix.to_string())

     A    B
A  0.0  1.0
B  1.0  0.0
      x     y     z
A  0.36  0.14  0.50
B  0.34  0.34  0.32


In [25]:
'''
CODE CHALLENGE: Solve the Soft Decoding Problem.
Input: A string x, followed by the alphabet Σ from which x was constructed,
     followed by the states States, transition matrix Transition, and emission matrix
     Emission of an HMM (Σ, States, Transition, Emission).
Output: An |x| x |States| matrix whose (i, k)-th element holds the conditional probability Pr(πi = k|x).
'''

def Prob_Forward(string, alphabet, states, transition_matrix, state_emission_matrix):
    prob_matrix = np.full([len(states), len(string)], 0, float)
    for state_i in range(len(states)):
        state = states[state_i]
        prob_matrix[state_i,0] = (1 / prob_matrix.shape[0]) * state_emission_matrix.at[state, string[0]]
    
    # dynamic programing
    for string_index in range(1, len(string)):
        current_emission = string[string_index]

        for state_index in range(len(states)):
            state = states[state_index]
            sum_prob = 0
            for prev_state_index in range(len(states)):
                prev_state = states[prev_state_index]
                prob =  prob_matrix[prev_state_index, string_index - 1] * transition_matrix.at[prev_state, state] * state_emission_matrix.at[state, current_emission]
                sum_prob = sum_prob + prob
            prob_matrix[state_index, string_index] = sum_prob
            
    return(prob_matrix)

def Prob_Backward(string, alphabet, states, transition_matrix, state_emission_matrix):
    prob_matrix = np.full([len(states), len(string)], 0, float)
    backward_string = string[::-1]
   
    for state_i in range(len(states)):
        prob_matrix[state_i,0] = 1 

    # dynamic programing
    for string_index in range(1, len(backward_string)):
        current_emission = backward_string[string_index - 1]

        for state_index in range(len(states)):
            state = states[state_index]
            sum_prob = 0
            for prev_state_index in range(len(states)):
                prev_state = states[prev_state_index]
                prob =  prob_matrix[prev_state_index, string_index - 1] * transition_matrix.at[state, prev_state] * state_emission_matrix.at[prev_state, current_emission]
                sum_prob = sum_prob + prob
            prob_matrix[state_index, string_index] = sum_prob

    prob_matrix = np.flip(prob_matrix, 1)

    return(prob_matrix)

def HMM_Soft_Decoding(string, alphabet, states, transition_matrix, state_emission_matrix):
    alphabet = alphabet.split(' ')
    states   = states.split(' ')
    
    transition_matrix = transition_matrix.split('\n')
    for i in range(len(transition_matrix)):
        transition_matrix[i] = transition_matrix[i].split('\t')
        transition_matrix[i] = list(map(float, transition_matrix[i]))
    transition_matrix = pd.DataFrame(transition_matrix, columns = states, index = states)
    
    state_emission_matrix = state_emission_matrix.split('\n')
    for i in range(len(state_emission_matrix)):
        state_emission_matrix[i] = state_emission_matrix[i].split('\t')
        state_emission_matrix[i] = list(map(float, state_emission_matrix[i]))
    state_emission_matrix = pd.DataFrame(state_emission_matrix, columns = alphabet, index = states)
    
    #
    forward_prob_matrix  = Prob_Forward( string, alphabet, states, transition_matrix, state_emission_matrix)
    backward_prob_matrix = Prob_Backward(string, alphabet, states, transition_matrix, state_emission_matrix)
    
    prob_forwad_sink = sum(forward_prob_matrix[:, -1])

    prob_matrix = np.full([len(states), len(string)], 0, float)
    for i in range(len(string)):

        for k_index in range(len(states)):
            prob_forward_k_i  = forward_prob_matrix[k_index, i]
            prob_backward_k_i = backward_prob_matrix[k_index, i]
            
            prob_matrix[k_index, i] = prob_forward_k_i * prob_backward_k_i / prob_forwad_sink
            
        #prob_matrix[:, i] = prob_matrix[:, i] / sum(prob_matrix[:, i])
        
    return(prob_matrix)

# Test
string = 'xyyzxzyxyy'
alphabet = 'x y z'
states = 'A B C D'
transition_matrix = '''0.401	0.009	0.195	0.396
0.375	0.237	0.269	0.119
0.283	0.25	0.259	0.207
0.108	0.529	0.107	0.256'''
state_emission_matrix = '''0.414	0.335	0.251
0.233	0.172	0.596
0.284	0.355	0.361
0.028	0.638	0.334'''

result = HMM_Soft_Decoding(string, alphabet, states, transition_matrix, state_emission_matrix)
for i in result.T:
    print(i)

[ 0.50026303  0.21136773  0.26618273  0.02218652]
[ 0.36476398  0.05299999  0.19091136  0.39132467]
[ 0.15107976  0.12510092  0.1553372   0.56848212]
[ 0.12970033  0.53591832  0.15417091  0.18021044]
[ 0.44135846  0.26281336  0.26732999  0.0284982 ]
[ 0.30312847  0.22129287  0.23387362  0.24170503]
[ 0.27885518  0.15358489  0.21385046  0.35370948]
[ 0.50876005  0.26895644  0.19754636  0.02473715]
[ 0.36952669  0.05783934  0.19779839  0.37483558]
[ 0.2230663   0.13556465  0.16584314  0.47552591]


In [26]:
'''
CODE CHALLENGE: Implement Baum-Welch Learning.
Input: A sequence of emitted symbols x = x1 . . . xn in an alphabet A, generated by a k-state HMM with
 unknown transition and emission probabilities, initial Transition and Emission matrices and a number of iterations I.
Output: A matrix of transition probabilities Transition and a matrix of emission probabilities Emission that
 maximizes Pr(x,π) over all possible transition and emission matrices and over all hidden paths π.
'''

def HMM_Soft_Decoding_hidden(string, alphabet, states, transition_matrix, state_emission_matrix):

    forward_prob_matrix  = Prob_Forward( string, alphabet, states, transition_matrix, state_emission_matrix)
    backward_prob_matrix = Prob_Backward(string, alphabet, states, transition_matrix, state_emission_matrix)
    
    prob_forwad_sink = sum(forward_prob_matrix[:, -1])

    node_prob_matrix = np.full([len(states), len(string)], 0, float)
    for i in range(len(string)):

        for k_index in range(len(states)):
            prob_forward_k_i  = forward_prob_matrix[k_index, i]
            prob_backward_k_i = backward_prob_matrix[k_index, i]
            
            node_prob_matrix[k_index, i] = prob_forward_k_i * prob_backward_k_i / prob_forwad_sink
        
    edge_prob_matrix = np.full([len(states), len(states), len(string)], 0, float)
    for k1 in range(len(states)):
        for k2 in range(len(states)):
            for i in range(len(string) - 1):
                edge_prob_matrix[k1, k2, i] = forward_prob_matrix[k1, i] * transition_matrix.at[states[k1], states[k2]] * state_emission_matrix.at[states[k2], string[i + 1]] * backward_prob_matrix[k2, i + 1] / prob_forwad_sink
    
    return(node_prob_matrix, edge_prob_matrix)

def Estimate_Parameters_Matrix(string, alphabet, states, node_prob_matrix, edge_prob_matrix):
    transition_matrix = np.full([len(states), len(states)], 0, float)
    transition_matrix = pd.DataFrame(transition_matrix, index = states, columns = states)
    
    state_emission_matrix = np.full([len(states), len(alphabet)], 0, float)
    state_emission_matrix = pd.DataFrame(state_emission_matrix, index = states, columns = alphabet)
    
    for state_i_1 in range(len(states)):
        state_1 = states[state_i_1]
        for state_i_2 in range(len(states)):
            state_2 = states[state_i_2]
            transition_matrix.at[state_1, state_2] = sum(edge_prob_matrix[state_i_1, state_i_2, :])
            
    for state_i in range(len(states)):
        state = states[state_i]
        for emission_i in range(len(string)):
            emission = string[emission_i]
            state_emission_matrix.at[state, emission] = state_emission_matrix.at[state, emission] + node_prob_matrix[state_i, emission_i]
    
    for i in range(len(states)):
        if sum(transition_matrix.iloc[i]) == 0:
            transition_matrix.iloc[i] = transition_matrix.iloc[i] + 1
        if sum(state_emission_matrix.iloc[i]) == 0:
            state_emission_matrix.iloc[i] = state_emission_matrix.iloc[i] + 1
                
        transition_matrix.iloc[i]     = transition_matrix.iloc[i]     / sum(transition_matrix.iloc[i])
        state_emission_matrix.iloc[i] = state_emission_matrix.iloc[i] / sum(state_emission_matrix.iloc[i])
        
    return(transition_matrix, state_emission_matrix)

def Baum_Welch_Learning(iterations, string, alphabet, states, initial_transition_matrix, initial_state_emission_matrix):
    alphabet = alphabet.split(' ')
    states   = states.split(' ')
    
    initial_transition_matrix = initial_transition_matrix.split('\n')
    for i in range(len(initial_transition_matrix)):
        initial_transition_matrix[i] = initial_transition_matrix[i].split('\t')
        initial_transition_matrix[i] = list(map(float, initial_transition_matrix[i]))
    initial_transition_matrix = pd.DataFrame(initial_transition_matrix, columns = states, index = states)
    
    initial_state_emission_matrix = initial_state_emission_matrix.split('\n')
    for i in range(len(initial_state_emission_matrix)):
        initial_state_emission_matrix[i] = initial_state_emission_matrix[i].split('\t')
        initial_state_emission_matrix[i] = list(map(float, initial_state_emission_matrix[i]))
    initial_state_emission_matrix = pd.DataFrame(initial_state_emission_matrix, columns = alphabet, index = states)
    
    transition_matrix, state_emission_matrix = initial_transition_matrix, initial_state_emission_matrix
    
    for i in range(iterations):
        node_prob_matrix, edge_prob_matrix = HMM_Soft_Decoding_hidden(string, alphabet, states, transition_matrix, state_emission_matrix)
        transition_matrix, state_emission_matrix = Estimate_Parameters_Matrix(string, alphabet, states, node_prob_matrix, edge_prob_matrix)
    
    return(transition_matrix, state_emission_matrix)

iterations = 10
string = 'xzyyzyzyxy'
alphabet = 'x y z'
states = 'A B'
initial_transition_matrix = '''0.019	0.981
0.668	0.332'''
initial_state_emission_matrix = '''0.175	0.003	0.821
0.196	0.512	0.293'''

transition_matrix, state_emission_matrix = Baum_Welch_Learning(iterations, string, alphabet, states, initial_transition_matrix, initial_state_emission_matrix)
print(transition_matrix.to_string())
print(state_emission_matrix.to_string())

          A         B
A  0.000007  0.999993
B  0.785779  0.214221
          x             y             z
A  0.242465  3.421964e-14  7.575351e-01
B  0.172156  8.278437e-01  1.004567e-09
