#  Viterbi

In [1]:
import numpy as np
import pandas as pd
import math


def format_prob(state, transition, transition_states, emission, emission_states):
    # input prob as list of lists and states as list of states
    # output: pandas dataframe with states as indices
    df_state = pd.DataFrame(state, columns = transition_states)
    df_trans = pd.DataFrame(transition, index = transition_states, columns = transition_states)
    df_em = pd.DataFrame(emission, index = transition_states, columns = emission_states,)
    return df_state, df_trans, df_em

def viterbi(state, transition, transition_states, emission, emission_states, seq):
    # input: list of state, transition, and emission prob as lists
    # state should be in format: [[p1, p2, ...]]
    # transition indices: list of states
    state, transition, emission = format_prob(state, transition, transition_states, emission, emission_states)
    
    if transition.shape[1]!= emission.shape[0]: 
        # num rows of trans. states must equal num col. of emission states
        print("Number of rows in transition state must equal number of columns in emission states", file=sys.stderr)

    path = []
    seq = [*seq]
    for s,i in zip(seq, range(len(seq))):
        if i == 0: # if first in seq, use state prob.
            prob1 = {}
            for st in state:
                prob1[st] = float(state[st] * emission[s][st]) 
            t_state = max(prob1, key=prob1.get)
            path.append(t_state) # add max probability to path
            prev_prob = prob1[path[0]]
            v_prob = math.log2(prev_prob)
            prev_state = t_state
        else:
            prob = {}           
            for t in transition:                    
                prob[t] = prev_prob * transition[prev_state][t] * emission[s][t]
            x = max(prob, key=prob.get)
            path.append(x)
            prev_prob = prob[x]
            print(x, prev_prob, prob, path)
            prev_state = x
            v_prob += math.log2(prev_prob)
    return path, v_prob
    
answer = " HHHLLLLLL"
emission_states = ["normal", "cold", "dizzy"]
transition_states = ["Healthy", "Fever"]
state = [[0.6, 0.4]]
transition = [[0.7,0.3], [0.4,0.6]] 
emission = [[0.5,0.3,0.1], [0.1,0.3,0.6]]
seq = ["normal", "cold", "dizzy"]
viterbi(state, transition, transition_states, emission, emission_states, seq)




Healthy 0.063 {'Healthy': 0.063, 'Fever': 0.036} ['Healthy', 'Healthy']
Fever 0.01512 {'Healthy': 0.00441, 'Fever': 0.01512} ['Healthy', 'Healthy', 'Fever']


(['Healthy', 'Healthy', 'Fever'], -11.772868005544115)

In [2]:
state = [[0.5, 0.5]]
transition = [[0.5,0.4], [0.4,0.6]] 
transition_states = ['H', 'L']
emission = [[0.2,0.3,0.3,0.2], [0.3,0.2,0.2,0.3]] # hidden states, row: A,C,G,T, col: H, L
emission_states = ['A', 'C', 'G', 'T']

seq = 'GGCACTGAA'
format_prob(state, transition, transition_states, emission, emission_states)

(     H    L
 0  0.5  0.5,
      H    L
 H  0.5  0.4
 L  0.4  0.6,
      A    C    G    T
 H  0.2  0.3  0.3  0.2
 L  0.3  0.2  0.2  0.3)

In [3]:
# with log

import numpy as np
import pandas as pd
import math

state = [[0.5, 0.5]]
transition = [[0.5,0.4], [0.4,0.6]] 
transition_states = ['H', 'L']
emission = [[0.2,0.3,0.3,0.2], [0.3,0.2,0.2,0.3]] # hidden states, row: A,C,G,T, col: H, L
emission_states = ['A', 'C', 'G', 'T']

seq = 'GGCACTGAA'
seq = [*seq]

def format_prob(state, transition, transition_states, emission, emission_states):
    # input prob as list of lists and states as list of states
    # output: pandas dataframe with states as indices
    df_state = pd.DataFrame(state, columns = transition_states)
    df_trans = pd.DataFrame(transition, index = transition_states, columns = transition_states)
    df_em = pd.DataFrame(emission, index = transition_states, columns = emission_states,)
    return df_state, df_trans, df_em

def viterbi(state, transition, transition_states, emission, emission_states, seq):
    # input: list of state, transition, and emission prob as lists
    # state should be in format: [[p1, p2, ...]]
    # transition indices: list of states
    state, transition, emission = format_prob(state, transition, transition_states, emission, emission_states)
    
    if transition.shape[1]!= emission.shape[0]: 
        # num rows of trans. states must equal num col. of emission states
        print("Number of rows in transition state must equal number of columns in emission states", file=sys.stderr)

    path = []
    for s,i in zip(seq, range(len(seq))):
        if i == 0: # if first in seq, use state prob.
            prob1 = {}
            for st in state:
                prob1[st] = float(state[st] * emission[s][st]) 
            t_state = max(prob1, key=prob1.get)
            path.append(t_state) # add max probability to path
            prev_prob = math.log2(prob1[path[0]])
            prev_state = t_state
        else:
            prob = {}           
            for t in transition:                    
                prob[t] = prev_prob +  math.log2(transition[prev_state][t] * emission[s][t])
            x = max(prob, key=prob.get)
            path.append(x)
            prev_prob = prob[x]
            prev_state = x
    return path

viterbi(state, transition, transition_states, emission, emission_states, seq)


['H', 'H', 'H', 'L', 'H', 'L', 'H', 'L', 'L']

In [4]:
# with log

import numpy as np
import pandas as pd
import math

def format_prob(state, transition, transition_states, emission, emission_states):
    # input prob as list of lists and states as list of states
    # output: pandas dataframe with states as indices
    df_state = pd.DataFrame(state, columns = transition_states)
    df_trans = pd.DataFrame(transition, index = transition_states, columns = transition_states)
    df_em = pd.DataFrame(emission, index = transition_states, columns = emission_states,)
    return df_state, df_trans, df_em

def viterbi(state, transition, transition_states, emission, emission_states, seq, log=0):
    # input: list of state, transition, and emission prob as lists
    # state should be in format: [[p1, p2, ...]]
    # transition indices: list of states
    state, transition, emission = format_prob(state, transition, transition_states, emission, emission_states)
    
    if transition.shape[1]!= emission.shape[0]: 
        # num rows of trans. states must equal num col. of emission states
        print("Number of rows in transition state must equal number of columns in emission states", file=sys.stderr)
    probability = 1
    path = []
    if type(seq) == str:
        seq = [*seq]
    for s,i in zip(seq, range(len(seq))):
        if i == 0: # if first in seq, use state prob.
            prob1 = {}
            for st in state:
                prob1[st] = float(state[st] * emission[s][st]) 
            t_state = max(prob1, key=prob1.get)
            path.append(t_state) # add max probability to path
            if log == 1:
                prev_prob = math.log2(prob1[path[0]])
            else:
                prev_prob = prob1[path[0]]
            prev_state = t_state
        else:
            prob = {}           
            for t in transition:      
                if log == 1:
                    prob[t] = prev_prob +  math.log2(transition[prev_state][t] * emission[s][t])
                else:
                    prob[t] = prev_prob * (transition[prev_state][t] * emission[s][t])
            x = max(prob, key=prob.get)
            path.append(x)
            prev_prob = prob[x]
            prev_state = x
    return path, prev_prob


"""
answer
$ python viterbi_example.py
         0          1          2
Healthy: 0.30000 0.08400 0.00588
Fever: 0.04000 0.02700 0.01512
The steps of states are Healthy Healthy Fever with highest probability of 0.01512
"""

emission_states = ["normal", "cold", "dizzy"]
transition_states = ["Healthy", "Fever"]
state = [[0.6, 0.4]]
transition = [[0.7,0.3], [0.4,0.6]] 
emission = [[0.5,0.3,0.1], [0.1,0.3,0.6]]
seq = ["normal", "cold", "dizzy"]
viterbi(state, transition, transition_states, emission, emission_states, seq)


(['Healthy', 'Healthy', 'Fever'], 0.01512)

# pHMM

In [4]:
# profile Hidden Markov Model
# 1. get transition and emission probabilities
import numpy as np
from collections import Counter
import pandas as pd
import math

def parse_file(filename):
    with open(filename, 'r') as f:
        # read file into numpy array (rows: array of the sequence)
        lines = f.readlines()
        seq = [] 
        for line in lines:
            if not line.startswith('>') and line != "": 
                seq.append(np.array(list(line.strip())))
        seq = np.array(seq)
    return seq

def probabilities(seq):
    # def all the match, insert, and delete states
    # get transition and emission probabilities
    states = ['M'] # this is the begin state (not in seq)
    for i, col in enumerate(seq.T):
        x = Counter(col)
        if '-' in x:
            gaps = x['-']
        else:
            gaps = 0
        nuc = sum(x.values()) - gaps
        if i == 0:
            if nuc > gaps:        
                states.append('M') # if there are more nucleotides than '-' then it's a match, else insertion (for the first col)
            else:
                states.append('I')
        else:
            if nuc > gaps:        
                states.append('M')
            elif states[i] == 'M': # if previous state was a match, and there are more gaps than nuc, next state is deletion (M->D)
                states.append('D')
            else:
                # states[i+1] = ['I']
                states.append('I')
    return states
    transition = {}
    emission = {}

seq = parse_file('sample.txt')
prob = probabilities(seq)
print(seq)
print(len(seq[0]))

[['A' 'G' '-' '-' '-' 'C']
 ['A' '-' 'A' '-' '-' 'C']
 ['A' 'T' '-' 'G' 'A' '-']
 ['-' '-' 'A' 'A' 'A' 'C']
 ['A' 'G' '-' '-' '-' 'C']]
6


In [5]:
def get_states(seq):
    # marked: mark columns with match or mismatch
    marked = ["M"]
    for i, col in enumerate(seq.T):
        x = Counter(col) 
        if '-' in x:
            gaps = x['-']
        else:
            gaps = 0
        nuc = sum(x.values()) - gaps
        if nuc > gaps:       
            marked.append('M')
        else:
            marked.append('I')

    states = []
    for row, mark in zip(seq.T, marked[1:]):
        col_states = []
        for col in row:
            if col != "-" and mark == "M": # nuc in M state = M
                col_states.append("M")
            elif col == "-" and mark == "M": # gap in M state = D
                col_states.append("D")
            elif col != "-" and mark == "I": # nuc in I state = I
                col_states.append("I")
            else:                            # ignore gaps in I state
                col_states.append("-") 
        states.append(np.array(col_states)) # problem is that states is appending by col.  need to flip .T
    states = np.array(states).T
    
    
    return marked, states,
marked, states = get_states(seq)
marked, states, seq

(['M', 'M', 'M', 'I', 'I', 'I', 'M'],
 array([['M', 'M', '-', '-', '-', 'M'],
        ['M', 'D', 'I', '-', '-', 'M'],
        ['M', 'M', '-', 'I', 'I', 'D'],
        ['D', 'D', 'I', 'I', 'I', 'M'],
        ['M', 'M', '-', '-', '-', 'M']], dtype='<U1'),
 array([['A', 'G', '-', '-', '-', 'C'],
        ['A', '-', 'A', '-', '-', 'C'],
        ['A', 'T', '-', 'G', 'A', '-'],
        ['-', '-', 'A', 'A', 'A', 'C'],
        ['A', 'G', '-', '-', '-', 'C']], dtype='<U1'))

In [7]:
x = np.array([['M', 'M', '-', '-', '-', 'M'],
        ['M', 'D', 'I', '-', '-', 'M'],
        ['M', 'M', '-', 'I', 'I', 'D'],
        ['D', 'D', 'I', 'I', 'I', 'M'],
        ['M', 'M', '-', '-', '-', 'M']])
x

array([['M', 'M', '-', '-', '-', 'M'],
       ['M', 'D', 'I', '-', '-', 'M'],
       ['M', 'M', '-', 'I', 'I', 'D'],
       ['D', 'D', 'I', 'I', 'I', 'M'],
       ['M', 'M', '-', '-', '-', 'M']], dtype='<U1')

In [18]:
def transition(states, seq, marked):
    transition_counts = [] # length = # of match states
    #freq_t = {ind: {} for ind, key in enumerate(marked)}
    freq_t = {}
    matches = 0
    match_states = [ind for ind, ele in enumerate(marked) if ele == "M"] # gets indices of match states
    match_states[-1] = match_states[-1]-1 # b/c length is +1 of seq, so last index will be out of index
    state_prob = {}
    for ind, m in enumerate(match_states[:-1]):
        match_before = match_states[m] # index col of match state before insert states
        match_after = match_states[m+1] 
        # state probability: M->state in first column
        if m == match_states[0]:
            for row in states.T[0]:
                entry = ("M{}".format(row))
                state_prob[entry] = state_prob.get(entry, 0) + 1
        # insert state(s)
        elif match_states[m]+1 != match_states[m+1]: 
            insert_nuc = seq.T[m:match_states[m+1]] # gets all the seq col. that are in the insert state
            insert_nuc = insert_nuc != "-" # bool array of all the nuc
            for ind_r, row_i in enumerate(insert_nuc.T): 
                if np.any(row_i != False): # contains nuc. in insert col
                    if  states[ind_r][match_before-1] == "M":
                        freq_t["MI"] = freq_t.get("MI", 0) + 1
                    if states[ind_r][match_after] == "M": # I->M
                        freq_t["IM"] = freq_t.get("IM", 0) + 1
                    elif states[ind_r][match_after] == "D": # I->D
                        freq_t["ID"] = freq_t.get("ID", 0) + 1 
                    # I->I
                    for ind_c, col_c in enumerate(row_i[1:]):
                        if row_i[ind_c-1] == True and col_c == True:
                            freq_t["II"] = freq_t.get("II", 0) + 1 
                else:
                    if states[ind_r][match_before-1] == "M" and states[ind_r][match_after] == "M":
                        freq_t["MM"] = freq_t.get("MM", 0) + 1                     

         # match state
        for j, row in enumerate(states.T[m]): # Iterate down col at corresponding index to match state in marked
            if m == 0: # if the first col, assume the previous state was a match (the begin state)
                entry = ("M{}".format(row))
                freq_t[entry] = freq_t.get(entry, 0) + 1
            else:
                if states.T[m-1][j] == "-" or row == "-": # ignore gaps in insert state
                    continue
                entry = ("{}{}".format(states.T[m-1][j],row))
                freq_t[entry] = freq_t.get(entry, 0) + 1
        transition_counts.append(freq_t)
        freq_t = {}
    # if last column, assume transition is to match state?
    for j, row in enumerate(states.T[-1]):
        entry = ("{}{}".format(row,"M"))
        freq_t[entry] = freq_t.get(entry, 0) + 1
    transition_counts.append(freq_t)
    freq_t = {}
    # get probability of each starting state
    for key, value in state_prob.items():
        
        state_prob[key] = value/sum(state_prob.values())
    """
    # consolidate counts
    trans_counts = {}
    for i in transition_counts:
        for key, value in i.items():
            trans_counts[key] = trans_counts.get(key, 0) + value
    # get totals of all the transitions from a state (M,D,I)
    totals_t = {"M":0, "D":0, "I":0}
    for i in transition_counts: 
        for key, value in i.items():
            totals_t[key[0]] = totals_t.get(key[0], 0) + value
    # get probability of each transition(ie MM, MD, MI)
    prob_t = {}    
    for key, value in trans_counts.items():
        prob_t[key] = value/totals_t[key[0]] # laplace plus one rule maybe
    """
    # get probability of state transitions for each position in sequence
    prob_t = []
    temp = {}
    for trans_p in transition_counts:
        tot = sum(trans_p.values())
        for key, value in trans_p.items():
            temp[key] = value/tot
        prob_t.append(temp)
        temp = {}
    return state_prob, transition_counts, prob_t


state_prob, transition_counts, prob_t = transition(states, seq, marked)
print("state: {}, \n transition_counts: {}, \n prob_t: {}".format(state_prob, transition_counts, prob_t))

state: {'MM': 0.8, 'MD': 0.5555555555555556}, 
 transition_counts: [{'MM': 4, 'MD': 1}, {'MM': 3, 'MD': 1, 'DD': 1}, {'MM': 2, 'IM': 2, 'MI': 1, 'ID': 1, 'II': 3, 'DI': 2}, {'MM': 4, 'DM': 1}], 
 prob_t: [{'MM': 0.8, 'MD': 0.2}, {'MM': 0.6, 'MD': 0.2, 'DD': 0.2}, {'MM': 0.18181818181818182, 'IM': 0.18181818181818182, 'MI': 0.09090909090909091, 'ID': 0.09090909090909091, 'II': 0.2727272727272727, 'DI': 0.18181818181818182}, {'MM': 0.8, 'DM': 0.2}]


In [None]:
t_count =  [{'MM': 4, 'MD': 1}, {'MM': 3, 'MD': 1, 'DD': 1}, {'MM': 2, 'IM': 2, 'MI': 1, 'ID': 1, 'II': 3, 'DI': 2}, {'MM': 4, 'DM': 1}], 
for i in t_count:
    tot_m = sum(i.values()) if i.keys()[0][0] == "M" else 0

In [19]:
def emission(states, seq, marked):
    # get emission probabilities
    # emission prob.
    freq = []
    temp = {'M':{}, 'I':{}}
    nuc = {'A':0, 'C':0, 'G':0, 'T':0}
    match_states = [ind for ind, ele in enumerate(marked) if ele == "M"] # gets indices of match states

    for m in match_states[:-1]:
        match_before = match_states[m] # index col of match state before insert states
        match_after = match_states[m+1]        
        # insert state(s)
        if match_states[m]+1 != match_states[m+1]: 
            insert_nuc = seq.T[m:match_states[m+1]-1] # gets all the seq col. that are in the insert state
            for col_i in insert_nuc:
                for row_i in col_i:
                    if row_i != '-':
                        nuc[row_i] += 1
            temp['I'] = dict(Counter(temp['I']) + Counter(nuc))
            freq.append(temp)
            temp = {'M':{}, 'I':{}}
            nuc = {'A':0, 'C':0, 'G':0, 'T':0}
        # match state
        else:
            for col_m in seq.T[m]:
                for row_m in col_m:
                    if row_m != '-':
                        nuc[row_m] += 1
            temp['M'] = dict(Counter(temp['M']) + Counter(nuc))
            freq.append(temp)
            temp = {'M':{}, 'I':{}}
            nuc = {'A':0, 'C':0, 'G':0, 'T':0}
    for col in seq.T[-1]:
        for row in col:
            if row != '-':
                nuc[row] += 1
    if marked[-1] == 'M':
         temp['M'] = dict(Counter(nuc))
    else:
        temp['I'] = dict(Counter(nuc))
    freq.append(temp)
    
    # get emission probabilities
    emission_prob = []
    temp_t = {}
    temp_e = {}
    for emis_p in freq:
        tot = 0
        temp_t = {}
        temp_e = {}
        for val in emis_p.values():
            tot += sum(val.values())
        for t_key, value in emis_p.items():
            for e_key, e_val in value.items():
                temp_e[e_key] = e_val/tot
            temp_t[t_key] = temp_e        
        emission_prob.append(temp_t)    
    return emission_prob
emission_prob = emission(states,seq,marked)
emission_prob

[{'M': {'A': 1.0}, 'I': {'A': 1.0}},
 {'M': {'G': 0.6666666666666666, 'T': 0.3333333333333333},
  'I': {'G': 0.6666666666666666, 'T': 0.3333333333333333}},
 {'M': {'A': 0.8333333333333334, 'G': 0.16666666666666666},
  'I': {'A': 0.8333333333333334, 'G': 0.16666666666666666}},
 {'M': {'A': 0.0, 'C': 1.0, 'G': 0.0, 'T': 0.0},
  'I': {'A': 0.0, 'C': 1.0, 'G': 0.0, 'T': 0.0}}]

In [20]:
def format_input(state_prob, prob_t, prob_e):
    # format input into list of transition and emission states and probabilities as lists and list of lists
    # ex: output: 
    # emission_states = ["normal", "cold", "dizzy"]
    # transition_states = ["Healthy", "Fever"]
    # state = [[0.6, 0.4]]
    # transition = [[0.7,0.3], [0.4,0.6]] 
    # emission = [[0.5,0.3,0.1], [0.1,0.3,0.6]]
    # seq = ["normal", "cold", "dizzy"]
    
    transition_states = ["M", "D", "I"]
    transition = pd.DataFrame(columns=transition_states, index=transition_states)
    # row is the state it came from, col is the state it's going to
    for key, value in prob_t.items():
        transition[key[0]][key[1]] = value   
    emission_states = ["A", "C", "G", "T"]
    emission = pd.DataFrame(columns=emission_states, index=["M", "I"])
    for key, value in prob_e.items():
        for key1, val1 in value.items():
            emission.loc[key] = prob_e[key][key1]
    state = [list(state_prob.values())]
    df_state = pd.DataFrame(state, columns = ['M', 'D'])
    
    return df_state, transition, emission


In [22]:
consensus = ""
for col in seq.T:
    most_freq = Counter(col).most_common(1)[0][0]
    if most_freq != "-":
        consensus += most_freq
print(consensus)

AGC


In [None]:
def consensus(seq):
    consensus = ""
    for col in seq.T:
        most_freq = Counter(col).most_common(1)[0][0]
        if most_freq != "-":
            consensus += most_freq
    return consensus

In [47]:
# find mutations
# 1. loop through match states
# 2. compare seq to consensus: use transition and emission probabilities to calc prob of mutation
# 3. if prob of mutation is greater than 0.5, then it's a mutation
def find_mutations(seq, prob_e):
    marked, states = get_states(seq)
    mutations = np.zeros(states.shape)
    consensus = consensus(seq)
    i = -1
    for ind_m, m in enumerate(marked[1:]):
        if m == "M": 
            i += 1 # index of match state in consensus)
            for ind_n, n in enumerate(seq.T[ind_m]): # iterate down column of seq at index match state
                if n != consensus[i] and n != '-':
                    mutations[ind_n][ind_m] = prob_e[i]['M'][n]
    return mutations

mutations = find_mutations(seq, prob_e)
print(mutations)

[[0.         0.         0.         0.         0.         0.        ]
 [0.         0.         0.         0.         0.         0.        ]
 [0.         0.33333333 0.         0.         0.         0.        ]
 [0.         0.         0.         0.         0.         0.        ]
 [0.         0.         0.         0.         0.         0.        ]]


In [None]:
from pHMM.py import *
hmm = pHMM.pHMM(pHMM_test.txt)
hmm.filename

# junk

In [None]:
marked, states = get_states(seq)
transition_counts, prob_t = transition(states, seq, marked)
emission_counts = emission(states,seq, marked)
states, prob_t, emission_counts, seq

ValueError: too many values to unpack (expected 2)

In [None]:
def transition(states, seq, marked):
    transition_counts = [] # length = # of match states
    freq_t = {}
    matches = 0
    match_states = [ind for ind, ele in enumerate(marked) if ele == "M"] # gets indices of match states
    match_states[-1] = match_states[-1]-1 # b/c length is +1 of seq, so last index will be out of index
    state_prob = {}
    for m in match_states[:-1]:
        match_before = match_states[m] # index col of match state before insert states
        match_after = match_states[m+1] 
        # state probability: M->state in first column
        if m == match_states[0]:
            for row in states.T[0]:
                entry = ("M{}".format(row))
                state_prob[entry] = state_prob.get(entry, 0) + 1
            
        # insert state(s)
        elif match_states[m]+1 != match_states[m+1]: 
            insert_nuc = seq.T[m:match_states[m+1]] # gets all the seq col. that are in the insert state
            insert_nuc = insert_nuc != "-" # bool array of all the nuc
            for ind_r, row_i in enumerate(insert_nuc.T): 
                if np.any(row_i != False): # contains nuc. in insert col
                    if  states[ind_r][match_before-1] == "M":
                        freq_t["MI"] = freq_t.get("MI", 0) + 1
                    if states[ind_r][match_after] == "M": # I->M
                        freq_t["IM"] = freq_t.get("IM", 0) + 1
                    elif states[ind_r][match_after] == "D": # I->D
                        freq_t["ID"] = freq_t.get("ID", 0) + 1 
                    # I->I
                    for ind_c, col_c in enumerate(row_i[1:]):
                        if row_i[ind_c-1] == True and col_c == True:
                            freq_t["II"] = freq_t.get("II", 0) + 1 
                else:
                    if states[ind_r][match_before-1] == "M" and states[ind_r][match_after] == "M":
                        freq_t["MM"] = freq_t.get("MM", 0) + 1                     

         # match state
        for j, row in enumerate(states.T[m]): # Iterate down col at corresponding index to match state in marked
            if m == 0: # if the first col, assume the previous state was a match (the begin state)
                entry = ("M{}".format(row))
                freq_t[entry] = freq_t.get(entry, 0) + 1
            else:
                if states.T[m-1][j] == "-" or row == "-": # ignore gaps in insert state
                    continue
                entry = ("{}{}".format(states.T[m-1][j],row))
                freq_t[entry] = freq_t.get(entry, 0) + 1
        transition_counts.append(freq_t)
        freq_t = {}
    # if last column, assume transition is to match state?
    for j, row in enumerate(states.T[-1]):
        entry = ("{}{}".format(row,"M"))
        freq_t[entry] = freq_t.get(entry, 0) + 1
    transition_counts.append(freq_t)
    freq_t = {}
    for key, value in state_prob.items():
        state_prob[key] = value/sum(state_prob.values())
    # consolidate counts
    trans_counts = {}
    for i in transition_counts:
        for key, value in i.items():
            trans_counts[key] = trans_counts.get(key, 0) + value
    # get totals of all the transitions from a state (M,D,I)
    totals_t = {"M":0, "D":0, "I":0}
    for i in transition_counts: 
        for key, value in i.items():
            totals_t[key[0]] = totals_t.get(key[0], 0) + value
    # get probability of each transition(ie MM, MD, MI)
    prob_t = {}    
    for key, value in trans_counts.items():
        prob_t[key] = value/totals_t[key[0]] # laplace plus one rule maybe
    return state_prob, transition_counts, prob_t
    
state_prob, transition_counts, prob_t = transition(states, seq, marked)


In [None]:
import pandas as pd

transition_states = ["M", "D", "I"]
transition = pd.DataFrame(columns=transition_states, index=transition_states)
for key, value in prob_t.items():
        transition[key[0]][key[1]] = value
print(transition)

        M     D         I
M  0.8125  0.25  0.333333
D   0.125  0.25  0.166667
I  0.0625   0.5       0.5


In [None]:
state

[[0.6, 0.4]]

In [None]:
df_state = pd.DataFrame(state, columns = transition_states)
df_state

Unnamed: 0,H,L
0,0.5,0.5


In [3]:
import itertools 
import numpy as np
import pandas as pd

state = [[0.5, 0.5]]
transition = [[0.5,0.4], [0.4,0.6]] 
transition_states = ['H', 'L']
emission = [[0.2,0.3,0.3,0.2], [0.3,0.2,0.2,0.3]] # hidden states, row: A,C,G,T, col: H, L
emission_states = ['A', 'C', 'G', 'T']
seq = 'GGCACTGAA'

def format_prob(state, transition, transition_states, emission, emission_states):
    # input prob as list of lists and states as list of states
    # output: pandas dataframe with states as indices
    df_state = pd.DataFrame(state, columns = transition_states)
    df_trans = pd.DataFrame(transition, index = transition_states, columns = transition_states)
    df_em = pd.DataFrame(emission, index = transition_states, columns = emission_states,)
    return df_state, df_trans, df_em

state, transition, emission = format_prob(state, transition, transition_states, emission, emission_states)
 


In [7]:
all_path = dict.fromkeys(list(itertools.product(transition_states, repeat=len(seq))), 1)
all_path

{('H', 'H', 'H', 'H', 'H', 'H', 'H', 'H', 'H'): 1,
 ('H', 'H', 'H', 'H', 'H', 'H', 'H', 'H', 'L'): 1,
 ('H', 'H', 'H', 'H', 'H', 'H', 'H', 'L', 'H'): 1,
 ('H', 'H', 'H', 'H', 'H', 'H', 'H', 'L', 'L'): 1,
 ('H', 'H', 'H', 'H', 'H', 'H', 'L', 'H', 'H'): 1,
 ('H', 'H', 'H', 'H', 'H', 'H', 'L', 'H', 'L'): 1,
 ('H', 'H', 'H', 'H', 'H', 'H', 'L', 'L', 'H'): 1,
 ('H', 'H', 'H', 'H', 'H', 'H', 'L', 'L', 'L'): 1,
 ('H', 'H', 'H', 'H', 'H', 'L', 'H', 'H', 'H'): 1,
 ('H', 'H', 'H', 'H', 'H', 'L', 'H', 'H', 'L'): 1,
 ('H', 'H', 'H', 'H', 'H', 'L', 'H', 'L', 'H'): 1,
 ('H', 'H', 'H', 'H', 'H', 'L', 'H', 'L', 'L'): 1,
 ('H', 'H', 'H', 'H', 'H', 'L', 'L', 'H', 'H'): 1,
 ('H', 'H', 'H', 'H', 'H', 'L', 'L', 'H', 'L'): 1,
 ('H', 'H', 'H', 'H', 'H', 'L', 'L', 'L', 'H'): 1,
 ('H', 'H', 'H', 'H', 'H', 'L', 'L', 'L', 'L'): 1,
 ('H', 'H', 'H', 'H', 'L', 'H', 'H', 'H', 'H'): 1,
 ('H', 'H', 'H', 'H', 'L', 'H', 'H', 'H', 'L'): 1,
 ('H', 'H', 'H', 'H', 'L', 'H', 'H', 'L', 'H'): 1,
 ('H', 'H', 'H', 'H', 'L', 'H',

# Genetic Algorithm

In [4]:
import random
import argparse
import math
import gzip

def count_kmers(filename, kmer_len, threshold):
    # count kmers of all introns in file
    # return list of all kmers in 2 lists (proximal/distal) and their counts
    with gzip.open(filename, "rt") as f:
        lines = f.readlines()
        proximal = {}
        distal = {}
        for line in lines:
            trans_id, start, end, strand, exp, seq = line.strip().split()
            start = int(start)
            end = int(end)
            # add all kmers in seq to proximal/distal list based on position of kmer
            for i in range(len(seq)-kmer_len+1):
                if (start + i) < threshold:
                    kmer = seq[i:i+kmer_len]
                    if kmer not in proximal: proximal[kmer] = 1
                    else: proximal[kmer] += 1
                else:
                    if kmer not in distal: distal[kmer] = 1
                    else: distal[kmer] += 1    
    return proximal, distal
filename = '../datacore/project_imeter/at_ime_master.txt.gz'
kmer_len = 4
threshold = 400
proximal, distal = count_kmers(filename, kmer_len, threshold)
p_total = sum(proximal.values())
d_total = sum(distal.values())

# get list of kmers with high expression values
# calculate nucleotide composition of high expression kmers




In [20]:
GENES = "ACGT"
class Individual():
    def __init__(self, proximal, distal, kmer_len, chr_len):
        self.kmer_len = kmer_len
        self.chr_len = chr_len
        self.chromosome = self.create_genome()
        self.proximal = proximal
        self.distal = distal 
        self.p_total = sum(proximal.values())
        self.d_total = sum(distal.values())
        self.fitness = self.calc_fitness()        

    def create_genome(self):
        chromosome = "".join(random.choice(GENES) for _ in range(self.chr_len))
        return chromosome
    
    def calc_fitness(self):
        fitness = 0
        for i in range(len(self.chromosome)-self.kmer_len+1):
            kmer = self.chromosome[i:i+self.kmer_len]
            if kmer not in self.proximal: 
                prob_p = 1 / self.p_total
            else:
                prob_p = self.proximal[kmer] / self.p_total
            if kmer not in self.distal: 
                prob_d = 1 / self.d_total
            else:
                prob_d = self.distal[kmer] / self.d_total
            fitness += math.log2(prob_p/prob_d) 
        return fitness 

    def mate(self, partner, operator = "opc", mutation_rate=0.01):
        offspring1 = Individual(self.proximal, self.distal, self.kmer_len, self.chr_len)
        offspring2 = Individual(self.proximal, self.distal, self.kmer_len, self.chr_len)
        cutoff = random.randint(0, len(self.chromosome))
        offspring = ""
        if operator == "opc":
            offspring1.chromosome = self.chromosome[:cutoff] + partner.chromosome[cutoff:]
            offspring2.chromosome = partner.chromosome[:cutoff] + self.chromosome[cutoff:]
        elif operator == "upc":
            for i in range(self.chr_len):
                if random.randint(0, 1):
                    offspring1.chromosome += self.chromosome[i]
                    offspring2.chromosome += partner.chromosome[i]
                else:
                    offspring1.chromosome += partner.chromosome[i]
                    offspring2.chromosome += self.chromosome[i]
        # mutation
        for i in range(len(offspring1.chromosome)):
            if random.randint(0, 100) > mutation_rate:
                offspring1.chromosome = offspring1.chromosome[:i] + random.choice(GENES) + offspring1.chromosome[i+1:]
            if random.randint(0, 100) > mutation_rate:
                offspring2.chromosome = offspring2.chromosome[:i] + random.choice(GENES) + offspring2.chromosome[i+1:]

        offspring1.fitness = offspring1.calc_fitness()
        offspring2.fitness = offspring2.calc_fitness()
        
        return offspring1, offspring2
        
    def __repr__(self):
        return f"Chromosome: {self.chromosome}, Fitness: {self.fitness:.4g}"
def gc_content(seq):
    s = [*seq]
    return (s.count("G") + s.count("C")) / len(s)
  
x = Individual(proximal, distal, kmer_len, 6)
y = Individual(proximal, distal, kmer_len, 6)
o1, o2 = x.mate(y)
print(o1, o2)
print(x)
(1-abs(gc_content(x.chromosome)-0.35))**3

Chromosome: CCCAAC, Fitness: 2.853 Chromosome: GCAGGC, Fitness: -3.5
Chromosome: GGCGAT, Fitness: 3.177


0.3190787037037037

In [30]:
class Individual():
    def __init__(self, proximal, distal, kmer_len, chr_len):
        self.kmer_len = kmer_len
        self.chr_len = chr_len
        self.chromosome = self.create_genome()
        self.proximal = proximal
        self.distal = distal 
        self.p_total = sum(proximal.values())
        self.d_total = sum(distal.values())
        self.fitness = self.calc_fitness()        

    def create_genome(self):
        chromosome = "".join(random.choice(GENES) for _ in range(self.chr_len))
        return chromosome
    
    def calc_fitness(self):
        fitness = 0
        for i in range(len(self.chromosome)-self.kmer_len+1):
            kmer = self.chromosome[i:i+self.kmer_len]
            if kmer not in self.proximal: 
                prob_p = 1 / self.p_total
            else:
                prob_p = self.proximal[kmer] / self.p_total
            if kmer not in self.distal: 
                prob_d = 1 / self.d_total
            else:
                prob_d = self.distal[kmer] / self.d_total
            fitness += math.log2(prob_p/prob_d) 
        return fitness 

    def mate(self, partner, operator = "opc", mutation_rate=0.01):
        offspring = Individual(self.proximal, self.distal, self.kmer_len, self.chr_len)
        cutoff = random.randint(0, len(self.chromosome))
        if operator == "opc":
            offspring.chromosome = self.chromosome[:cutoff] + partner.chromosome[cutoff:]
        elif operator == "upc":
            for i in range(self.chr_len):
                if random.randint(0, 1):
                    offspring.chromosome += self.chromosome[i]
                else:
                    offspring.chromosome += partner.chromosome[i]
        # mutation
        for i in range(len(offspring.chromosome)):
            if random.randint(0, 100) > mutation_rate:
                offspring.chromosome = offspring.chromosome[:i] + random.choice(GENES) + offspring.chromosome[i+1:]
            
        offspring.fitness = offspring.calc_fitness()        
        return offspring
        
    def __repr__(self):
        return f"Chromosome: {self.chromosome}, Fitness: {self.fitness:.4g}"


In [59]:
class Population():
    def __init__(self, proximal, distal, kmer_len, chr_len, gen_size, gen_len):
        self.gen_size = gen_size
        self.gen_len = gen_len
        self.kmer_len = kmer_len
        self.chr_len = chr_len
        self.proximal = proximal
        self.distal = distal 
        self.individuals = []

    def populate(self):
        for i in range(self.gen_size):
            self.individuals.append(Individual(self.proximal, self.distal, self.kmer_len, self.chr_len))
            
    def select_parents(self, individuals, k):
        # Can't use Fitness Proportionate Selection b/c negative fitness values (log odds)
        # Tournament selection
        pop = individuals.copy()
        select = []
        for i in range(2):
            turn = []
            for j in range(k):
                turn.append(pop[random.randint(0, len(pop)-1)])
            parent = max(turn, key = lambda x: x.fitness)
            pop.remove(parent)
            select.append(parent)
        return select
    
    def selection(self, proportion, k):
        # Fitness Based Selection
        for i in range(int(len(self.individuals)*proportion)):
            self.individuals.remove(min(self.individuals, key = lambda x: x.fitness))
            parents = self.select_parents(self.individuals, k)
            o = parents[0].mate(parents[1])
            self.individuals.append(o)
    def evolve(self):
        for i in range(self.gen_len):
            self.selection(0.5, 2)
            print(f"Generation {i+1}: {sum([i.fitness for i in self.individuals])}")

    def __repr__(self):
        return f"Individuals: {self.individuals}"
    
x = Population(proximal, distal, 4, 6, 10, 100)
x.populate()
x.evolve()

# sorted_indiv = sorted(x.individuals, key = lambda x: x.fitness, reverse=True)
# print(x.individuals, sum([i.fitness for i in x.individuals]))
# x.selection(0.5, 2)
# print(x.individuals, sum([i.fitness for i in x.individuals]))



Generation 1: 49.977778560580006
Generation 2: 55.70478364689321
Generation 3: 55.09371557636653
Generation 4: 58.16691226859525
Generation 5: 58.854055513481214
Generation 6: 54.413983432922336
Generation 7: 57.515409478240166
Generation 8: 60.03725117480729
Generation 9: 55.520842960705096
Generation 10: 58.081556328175175
Generation 11: 57.06225182537514
Generation 12: 59.39979689400115
Generation 13: 61.006987399251415
Generation 14: 60.65094276080863
Generation 15: 61.058112078594355
Generation 16: 59.725798070864045
Generation 17: 60.81932233960912
Generation 18: 62.01556554629775
Generation 19: 60.15611354970741
Generation 20: 62.12297709521267
Generation 21: 60.27452930784515
Generation 22: 63.47444271493602
Generation 23: 61.31728124304542
Generation 24: 62.706370306900375
Generation 25: 60.95997265299562
Generation 26: 64.77239303602838
Generation 27: 63.37130391542556
Generation 28: 63.237375509978875
Generation 29: 65.08227541271312
Generation 30: 63.729570247075074
Generat

# Stochastic Viterbi

In [2]:
import math
import gzip
import json

In [3]:
file = "./tests/1pct_elegans.fa"
with open(file, "rt") as f:
    lines = f.readlines()
    for line in lines[:10]:
        print(line)
        

>I 1..150725

GCCTAAGCCTAAGCCTAAGCCTAAGCCTAAGCCTAAGCCTAAGCCTAAGC

CTAAGCCTAAGCCTAAGCCTAAGCCTAAGCCTAAGCCTAAGCCTAAGCCT

AAGCCTAAGCCTAAGCCTAAGCCTAAGCCTAAGCCTAAGCCTAAGCCTAA

GCCTAAGCCTAAGCCTAAGCCTAAGCCTAAGCCTAAGCCTAAGCCTAAGC

CTAAGCCTAAGCCTAAGCCTAAGCCTAAGCCTAAGCCTAAGCCTAAGCCT

AAGCCTAAGCCTAAGCCTAAGCCTAAGCCTAAGCCTAAGCCTAAGCCTAA

GCCTAAGCCTAAGCCTAAGCCTAAGCCTAAGCCTAAGCCTAAGCCTAAGC

CTAAGCCTAAGCCTAAGCCTAAGCCTAAGCCTAAGCCTAAGCCTAAGCCT

AAGCCTAAGCCTAAGCCTAAGCCTAAGCCTAAAAAATTGAGATAAGAAAA



In [4]:
file = "./tests/1pct_elegans.gff3"
with open(file, "rt") as f:
    lines = f.readlines()
    start = []
    end = []
    states = []
    for line in lines:
        states.append(line.strip().split()[2])
        start.append(int(line.strip().split()[3]))
        end.append(int(line.strip().split()[4]))

# get unique states 
transition_states = list(set(states))
transition_states

['gene',
 'pre_miRNA',
 'miRNA',
 'exon',
 'possible_base_call_error',
 'ncRNA',
 'three_prime_UTR',
 'nc_primary_transcript',
 'mRNA',
 'pseudogenic_transcript',
 'CDS',
 'tRNA',
 'circular_ncRNA',
 'piRNA',
 'base_call_error_correction',
 'five_prime_UTR',
 'intron',
 'snoRNA',
 'pseudogenic_tRNA',
 'transposable_element',
 'transcript_region',
 'transcription_end_site']

In [5]:
state = {}
transition = {}
emission = {}

with open("./tests/stoch_hmm.json") as f:
    x = json.load(f)
    for i in range(x['states']):
        temp = x['state'][i]
        state[temp['name']] = temp['init']
        transition[temp['name']] = temp['transition']
        emission[temp['name']] = temp['emission']

state, transition, emission



({'rain': 0.5, 'no_rain': 0.5},
 {'rain': {'rain': 0.7, 'no_rain': 0.3},
  'no_rain': {'rain': 0.3, 'no_rain': 0.7}},
 {'rain': {'umbrella': 0.9, 'no_umbrella': 0.1},
  'no_rain': {'umbrella': 0.2, 'no_umbrella': 0.8}})

In [6]:
def viterbi(transition_file, seq, log=1):
    state = {}
    transition = {}
    emission = {}
    with open(transition_file) as f:
        x = json.load(f)
        for i in range(x['states']):
            temp = x['state'][i]
            state[temp['name']] = temp['init']
            transition[temp['name']] = temp['transition']
            emission[temp['name']] = temp['emission']
        if len(transition) != len(emission):
            raise ValueError("Transition and emission matrices must be the same length")
        probability = 1
        path = []
        if type(seq) == str:
            seq = [*seq]
        for s,i in zip(seq, range(len(seq))):
            if i == 0: # if first in seq, use state prob.
                prob1 = {}
                for st in state:
                    prob1[st] = float(state[st] * emission[st][s]) 
                t_state = max(prob1, key=prob1.get)
                path.append(t_state) # add max probability to path
                if log == 1:
                    prev_prob = math.log2(prob1[path[0]])
                else:
                    prev_prob = prob1[path[0]]
                prev_state = t_state
            else:
                prob = {}           
                for t in transition:      
                    if log == 1:
                        prob[t] = prev_prob +  math.log2(transition[prev_state][t] * emission[t][s])
                    else:
                        prob[t] = prev_prob * (transition[prev_state][t] * emission[t][s])
                x = max(prob, key=prob.get)
                path.append(x)
                prev_prob = prob[x]
                prev_state = x
    return path, prev_prob
transition_file = "./tests/stoch_hmm.json"
viterbi(transition_file, "ACGT")


FileNotFoundError: [Errno 2] No such file or directory: './stoch_hmm.json'

In [None]:
seq = [*'ACGT']
emission = {'AT': {'A': 0.3, 'C': 0.2, 'G': 0.2, 'T': 0.3},
  'NT': {'A': 0.25, 'C': 0.25, 'G': 0.25, 'T': 0.25},
  'GC': {'A': 0.2, 'C': 0.3, 'G': 0.3, 'T': 0.2}}

emission['AT']['A']

0.3

In [None]:
def baum_welch(seq_file, gff_file):
    with open(seq_file, "rt") as f:
        seqs = f.readlines()[1].strip()
    with open(gff_file, "rt") as f:
        lines = f.readlines()
        gff = {}
        start = []
        end = []
        states = []
        for line in lines:
            if line.strip().split()[0] not in gff:
                gff[line.strip().split()[0]] = {"states": [], "start": [], "end": []}
            gff[line.strip().split()[0]]["states"].append(line.strip().split()[2])
            gff[line.strip().split()[0]]["start"].append(int(line.strip().split()[3]))
            gff[line.strip().split()[0]]["end"].append(int(line.strip().split()[4]))
        # get unique states
    transition_states = list(set(states))
    for seq in seqs:
        seq = [*seq]

    

In [None]:
gff_file = "./tests/1pct_elegans.gff3"
with open(gff_file, "rt") as f:
    lines = f.readlines()
    gff = {}
    for line in lines:
        if line.strip().split()[0] not in gff:
            gff[line.strip().split()[0]] = {"states": [], "start": [], "end": []}
        gff[line.strip().split()[0]]["states"].append(line.strip().split()[2])
        gff[line.strip().split()[0]]["start"].append(int(line.strip().split()[3]))
        gff[line.strip().split()[0]]["end"].append(int(line.strip().split()[4]))
    # get unique states
states = []
for i in gff:
    states += gff[i]["states"]
transition_states = list(set(states))
transition_states


['pre_miRNA',
 'gene',
 'pseudogenic_transcript',
 'nc_primary_transcript',
 'pseudogenic_tRNA',
 'ncRNA',
 'base_call_error_correction',
 'snoRNA',
 'circular_ncRNA',
 'possible_base_call_error',
 'miRNA',
 'CDS',
 'exon',
 'transcript_region',
 'intron',
 'transposable_element',
 'three_prime_UTR',
 'five_prime_UTR',
 'transcription_end_site',
 'mRNA',
 'tRNA',
 'piRNA']

In [None]:
import gzip 

with open("../datacore/project_rloop/235.fa") as f:
    lines = f.readlines()
    for line in lines[:10]:
        print(line.strip())


>seq-1
ACACGTGACCACTGGCGTACCTAAGGGCCAGGGAATGCTGCTGACATGATGGGGGTGGCGAGGCCAAGGTCCCA
>seq-2
AAATACCTGCACAGTATGTATGATAAATGCATATGATAAAGTAAAAAAAAAAATAGCACACACTGAAAGAAAGCCAACAGAAGAGGGCACTGGGCATGGGCCAGGGAGGGCAAGAATTGGGATGGGGACATGGAGGAGC
>seq-3
GTCTGGACTTCGCTGTGAAAGACACAGGCCCTCATGTACGTCCAGGATGCGGTGACAGCGAGGCTTGCAGGAGACAGGTCCCTGCTGTGTGGGGGTGAAGCTGGAGGCAAGATGATGCCTGGAGCTAAGAGATGGTCACAGGAAATCCGGCAAGAATTAACGAGGACTGGACAGTGACAGGCAGGCCAAAGAGTGAGAGGAACTTCACTGGCAAGAGCCAACAGGGCTTGTTGATTTAGGAGAGGAGACAAAGGACTGAGGGGTTTGGGGGCTGGGGCCTGGGAGGGTGGAGAAGCCACTGTCTGCAT
>seq-4
AGCACAGGTGCCTGCTGAGTCCCATAGACCCACTAATGGCCCACCAGGACCAACCAGG
>seq-5
AGAGAACGTGCTGACTGCGCTCGTTTGGAAAGGCCTCATGGCCAAAGG


In [None]:
transition_file = "./stoch_hmm.json"
state = {}
transition = {}
emission = {}
with open(transition_file) as f:
    x = json.load(f)
    for i in range(x['states']):
        temp = x['state'][i]
        state[temp['name']] = temp['init']
        transition[temp['name']] = temp['transition']
        emission[temp['name']] = temp['emission']

emission

{'exon': {'A': 0.3, 'C': 0.2, 'G': 0.2, 'T': 0.3},
 'intron': {'A': 0.25, 'C': 0.25, 'G': 0.25, 'T': 0.25},
 'utr': {'A': 0.2, 'C': 0.3, 'G': 0.3, 'T': 0.2}}

In [None]:
def stochastic_viterbi(json_file, seq, log=1):
    state = {}
    transition = {}
    emission = {}
    with open("./stoch_hmm.json") as f:
        x = json.load(f)
        for i in range(x['states']):
            temp = x['state'][i]
            state[temp['name']] = temp['init']
            transition[temp['name']] = temp['transition']
            emission[temp['name']] = temp['emission']
        # set illegal transition to zero probability
        for trans in transition:
            if set(transition.keys()) != set(transition[trans].keys()):
                # check for values in transition.keys that are not in transition[trans].keys() and add to transition[trans].keys()
                for i in set(transition.keys()) - set(transition[trans].keys()):
                    transition[trans][i] = 0
    if type(seq) == str:
        seq = [*seq]
    # Calculate forward probability
    forward = dict((key, []) for key in transition.keys())
    for o in seq:
        for trans in transition:
            if o == seq[0]:
                if emission[trans][o] == 0 or state[trans] == 0: 
                    fw = 0
                else:
                    fw = math.log10(emission[trans][o]) + math.log10(state[trans])
            else:
                fw = 0
                for prev in transition[trans]:
                    if transition[prev][trans] != 0:
                        fw += math.exp(forward[prev][-1] + math.log10(transition[prev][trans]))
                fw = math.log10(fw) + math.log10(emission[trans][o])
            forward[trans].append(fw)


In [None]:
state = {}
transition = {}
emission = {}

with open("./stoch_hmm.json") as f:
    x = json.load(f)
    for i in range(x['states']):
        temp = x['state'][i]
        state[temp['name']] = temp['init']
        transition[temp['name']] = temp['transition']
        emission[temp['name']] = temp['emission']
    # set illegal transition to zero probability
    for trans in transition:
        if set(transition.keys()) != set(transition[trans].keys()):
            # check for values in transition.keys that are not in transition[trans].keys() and add to transition[trans].keys()
            for i in set(transition.keys()) - set(transition[trans].keys()):
                transition[trans][i] = 0


state, transition, emission


seq = "ACAC"
path = []
if type(seq) == str:
    seq = [*seq]
# Calculate forward probability
forward = dict((key, []) for key in transition.keys())
for o in seq:
    for trans in transition:
        fw = 0
        if o == seq[0]:
            if not(emission[trans][o] == 0 or state[trans] == 0):
                fw = math.log10(emission[trans][o]) + math.log10(state[trans])
        else:
            for prev in transition[trans]:
                if transition[prev][trans] != 0:
                    fw += math.exp(forward[prev][-1] + math.log10(transition[prev][trans]))
            fw = math.log10(fw) + math.log10(emission[trans][o])
        forward[trans].append(fw)

# backward probability
backward = dict((key, []) for key in transition.keys())
for o in seq[::-1]:
    for trans in transition:
        if o == seq[-1]:
            pass
        

forward

{'exon': [0, -0.6011446912330125, 0, -0.6011446912330125],
 'intron': [-0.6020599913279624,
  -0.8117139066635318,
  -0.6020599913279624,
  -0.8117139066635318],
 'utr': [0, -0.5011246726930683, 0, -0.5011246726930683]}

### Scaling factor (Normalize)

In [1]:
import math
import gzip
import json

In [2]:
# Normalize vectors
state = {}
transition = {}
emission = {}

with open("./tests/stoch_hmm.json") as f:
    x = json.load(f)
    for i in range(x['states']):
        temp = x['state'][i]
        state[temp['name']] = temp['init']
        transition[temp['name']] = temp['transition']
        emission[temp['name']] = temp['emission']
    # set illegal transition to zero probability
    for trans in transition:
        if set(transition.keys()) != set(transition[trans].keys()):
            # check for values in transition.keys that are not in transition[trans].keys() and add to transition[trans].keys()
            for i in set(transition.keys()) - set(transition[trans].keys()):
                transition[trans][i] = 0


state, transition, emission




({'rain': 0.5, 'no_rain': 0.5},
 {'rain': {'rain': 0.7, 'no_rain': 0.3},
  'no_rain': {'rain': 0.3, 'no_rain': 0.7}},
 {'rain': {'umbrella': 0.9, 'no_umbrella': 0.1},
  'no_rain': {'umbrella': 0.2, 'no_umbrella': 0.8}})

In [3]:
# create list of size seq
seq = "umbrella, umbrella, no_umbrella, umbrella, umbrella"
path = []
seq = seq.split(", ")
# Calculate forward probability
forward = list(dict((key, 0) for key in transition.keys()) for i in range(len(seq)))
for ind, o in enumerate(seq):
    for trans in transition:
        fw = 0
        if ind == 0:
            if not(emission[trans][o] == 0 or state[trans] == 0):
                fw = emission[trans][o] * state[trans]
        else:
            for prev in transition[trans]:
                if transition[prev][trans] != 0:
                    fw += forward[ind-1][trans] * transition[prev][trans] * emission[prev][o]
        forward[ind][trans] = fw
    # normalize
    tot = sum(forward[ind].values())
    for t in forward[ind]: 
        forward[ind][t] /= tot
forward.insert(0, dict((key, state[key]) for key in transition.keys()))


forward

[{'rain': 0.5, 'no_rain': 0.5},
 {'rain': 0.8181818181818181, 'no_rain': 0.18181818181818182},
 {'rain': 0.883357041251778, 'no_rain': 0.1166429587482219},
 {'rain': 0.7991614429822741, 'no_rain': 0.20083855701772588},
 {'rain': 0.8700720584642331, 'no_rain': 0.12992794153576687},
 {'rain': 0.9184993701915494, 'no_rain': 0.0815006298084507}]

In [4]:
transition

{'rain': {'rain': 0.7, 'no_rain': 0.3},
 'no_rain': {'rain': 0.3, 'no_rain': 0.7}}

In [5]:
def stochastic_viterbi(json_file, seq, log=1):
    state = {}
    transition = {}
    emission = {}
    with open(json_file) as f:
        x = json.load(f)
        for i in range(x['states']):
            temp = x['state'][i]
            state[temp['name']] = temp['init']
            transition[temp['name']] = temp['transition']
            emission[temp['name']] = temp['emission']
        # set illegal transition to zero probability
        for trans in transition:
            if set(transition.keys()) != set(transition[trans].keys()):
                # check for values in transition.keys that are not in transition[trans].keys() and add to transition[trans].keys()
                for i in set(transition.keys()) - set(transition[trans].keys()):
                    transition[trans][i] = 0
    if type(seq) == str:
        seq = seq.split(", ")
    # Calculate forward probability
    # forward = dict((key, []) for key in transition.keys())
    # for o in seq:
    #     for trans in transition:
    #         if o == seq[0]:
    #             if emission[trans][o] == 0 or state[trans] == 0: 
    #                 fw = 0
    #             else:
    #                 fw = math.log10(emission[trans][o]) + math.log10(state[trans])
    #         else:
    #             fw = 0
    #             for prev in transition[trans]:
    #                 if transition[prev][trans] != 0:
    #                     fw += math.exp(forward[prev][-1] + math.log10(transition[prev][trans]))
    #             fw = math.log10(fw) + math.log10(emission[trans][o])
    #         forward[trans].append(fw)
    forward = list(dict((key, 0) for key in transition.keys()) for i in range(len(seq)))
    for ind, o in enumerate(seq):
        for trans in transition:
            fw = 0
            if ind == 0:
                if not(emission[trans][o] == 0 or state[trans] == 0):
                    fw = emission[trans][o] * state[trans]
            else:
                for prev in transition[trans]:
                    if transition[prev][trans] != 0:
                        print(trans, prev, forward[ind-1][prev], transition[prev][trans], emission[trans][o])
                        fw += forward[ind-1][prev] * transition[prev][trans] * emission[trans][o]
            forward[ind][trans] = fw
        # normalize
        tot = sum(forward[ind].values())
        for t in forward[ind]: 
            forward[ind][t] /= tot
    return forward

json_file = "./tests/stoch_hmm.json"
seq = "umbrella, umbrella, no_umbrella, umbrella, umbrella"
f = stochastic_viterbi(json_file, seq)
f

rain rain 0.8181818181818181 0.7 0.9
rain no_rain 0.18181818181818182 0.3 0.9
no_rain rain 0.8181818181818181 0.3 0.2
no_rain no_rain 0.18181818181818182 0.7 0.2
rain rain 0.883357041251778 0.7 0.1
rain no_rain 0.1166429587482219 0.3 0.1
no_rain rain 0.883357041251778 0.3 0.8
no_rain no_rain 0.1166429587482219 0.7 0.8
rain rain 0.19066793972352525 0.7 0.9
rain no_rain 0.8093320602764748 0.3 0.9
no_rain rain 0.19066793972352525 0.3 0.2
no_rain no_rain 0.8093320602764748 0.7 0.2
rain rain 0.730794004584982 0.7 0.9
rain no_rain 0.26920599541501794 0.3 0.9
no_rain rain 0.730794004584982 0.3 0.2
no_rain no_rain 0.26920599541501794 0.7 0.2


[{'rain': 0.8181818181818181, 'no_rain': 0.18181818181818182},
 {'rain': 0.883357041251778, 'no_rain': 0.1166429587482219},
 {'rain': 0.19066793972352525, 'no_rain': 0.8093320602764748},
 {'rain': 0.730794004584982, 'no_rain': 0.26920599541501794},
 {'rain': 0.8673388895754847, 'no_rain': 0.13266111042451528}]

In [6]:

seq = "umbrella, umbrella, no_umbrella, umbrella, umbrella"
seq = seq.split(", ")
vals = [0.8182, 0.181, 0.8834, 0.1166, 0.1907, 0.8093, 0.7308, 0.2692, 0.8673, 0.1327]
correct = list(dict((key, vals[i*2+j]) for j, key in enumerate(["rain", "no_rain"])) for i in range(len(seq)))
for i, j in zip(f, correct):
    for k in i:
        print(i, j)


{'rain': 0.8181818181818181, 'no_rain': 0.18181818181818182} {'rain': 0.8182, 'no_rain': 0.181}
{'rain': 0.8181818181818181, 'no_rain': 0.18181818181818182} {'rain': 0.8182, 'no_rain': 0.181}
{'rain': 0.883357041251778, 'no_rain': 0.1166429587482219} {'rain': 0.8834, 'no_rain': 0.1166}
{'rain': 0.883357041251778, 'no_rain': 0.1166429587482219} {'rain': 0.8834, 'no_rain': 0.1166}
{'rain': 0.19066793972352525, 'no_rain': 0.8093320602764748} {'rain': 0.1907, 'no_rain': 0.8093}
{'rain': 0.19066793972352525, 'no_rain': 0.8093320602764748} {'rain': 0.1907, 'no_rain': 0.8093}
{'rain': 0.730794004584982, 'no_rain': 0.26920599541501794} {'rain': 0.7308, 'no_rain': 0.2692}
{'rain': 0.730794004584982, 'no_rain': 0.26920599541501794} {'rain': 0.7308, 'no_rain': 0.2692}
{'rain': 0.8673388895754847, 'no_rain': 0.13266111042451528} {'rain': 0.8673, 'no_rain': 0.1327}
{'rain': 0.8673388895754847, 'no_rain': 0.13266111042451528} {'rain': 0.8673, 'no_rain': 0.1327}


In [7]:
vals = [0.8182, 0.181, 0.8834, 0.1166, 0.1907, 0.8093, 0.7308, 0.2692, 0.8673, 0.1327]
correct = list(dict((key, vals[i*2+j]) for j, key in enumerate(["rain", "no_rain"])) for i in range(len(seq)))
correct

[{'rain': 0.8182, 'no_rain': 0.181},
 {'rain': 0.8834, 'no_rain': 0.1166},
 {'rain': 0.1907, 'no_rain': 0.8093},
 {'rain': 0.7308, 'no_rain': 0.2692},
 {'rain': 0.8673, 'no_rain': 0.1327}]

In [15]:
### Backward Probability ###
seq = "umbrella, umbrella, no_umbrella, umbrella, umbrella"
seq = seq.split(", ")
smoothed = list(dict((key, 0) for key in transition.keys()) for i in range(len(seq)))
backward = list(dict((key, 0) for key in transition.keys()) for i in range(len(seq)))
backward.append(dict((key, 1) for key in transition.keys()))
smoothed.insert(0, dict((key, backward[0][key] * forward[0][key]) for key in transition.keys()))
for o, ind in zip(seq[::-1], reversed(range(len(seq)))):
    for trans in transition:
        bw = 0
        for next in transition[trans]:
            if transition[next][trans] != 0:
                # print(ind, o, trans, next, backward[ind+1][next], transition[trans][next], emission[next][o])
                bw += backward[ind+1][next] * transition[next][trans] * emission[next][o]
        backward[ind][trans] = bw
    # normalize
    tot = sum(backward[ind].values())
    for t in backward[ind]:
        backward[ind][t] /= tot
backward


4 no_rain 0.37272727272727274 0.12992794153576687
3 no_rain 0.34665718349928876 0.20083855701772588
2 no_rain 0.6237328241105898 0.1166429587482219
1 no_rain 0.4076823981660072 0.18181818181818182
0 no_rain 0.3530644441698061 0.5


([{'rain': 0.6469355558301939, 'no_rain': 0.3530644441698061},
  {'rain': 0.5923176018339928, 'no_rain': 0.4076823981660072},
  {'rain': 0.37626717588941005, 'no_rain': 0.6237328241105898},
  {'rain': 0.6533428165007112, 'no_rain': 0.34665718349928876},
  {'rain': 0.6272727272727272, 'no_rain': 0.37272727272727274},
  {'rain': 1, 'no_rain': 1}],
 [{'rain': 0.0, 'no_rain': 0.17653222208490305},
  {'rain': 0, 'no_rain': 0.07412407239381949},
  {'rain': 0, 'no_rain': 0.07275404207264348},
  {'rain': 0, 'no_rain': 0.06962212851382617},
  {'rain': 0, 'no_rain': 0.048427687299694926},
  {'rain': 0, 'no_rain': 0}])

In [16]:
vals = [1.0, 1.0, 0.6273, 0.3727, 0.6533, 0.3467, 0.3763, 0.6237, 0.5923, 0.4077, 0.6469, 0.3531]
correct = list(dict((key, vals[i*2+j]) for j, key in enumerate(["no_rain", "rain"])) for i in range(len(seq)+1))
correct.reverse()
correct

[{'no_rain': 0.6469, 'rain': 0.3531},
 {'no_rain': 0.5923, 'rain': 0.4077},
 {'no_rain': 0.3763, 'rain': 0.6237},
 {'no_rain': 0.6533, 'rain': 0.3467},
 {'no_rain': 0.6273, 'rain': 0.3727},
 {'no_rain': 1.0, 'rain': 1.0}]

In [19]:
### Smoothed Values ###
smoothed = list(dict((key, 0) for key in transition.keys()) for i in range(len(seq)))
smoothed.insert(0, dict((key, backward[0][key] * forward[0][key]) for key in transition.keys()))
for ind in range(len(forward)):
    for trans in transition.keys():
        smoothed[ind][trans] = backward[ind][trans] * forward[ind][trans]
    # normalize
    tot = sum(backward[ind].values())
    for t in backward[ind]:
        backward[ind][t] /= tot
smoothed

[{'rain': 0.32346777791509695, 'no_rain': 0.17653222208490305},
 {'rain': 0.4846234924096305, 'no_rain': 0.07412407239381949},
 {'rain': 0.33237825921383163, 'no_rain': 0.07275404207264348},
 {'rain': 0.5221263879968115, 'no_rain': 0.06962212851382617},
 {'rain': 0.5457724730366552, 'no_rain': 0.048427687299694926},
 {'rain': 0.9184993701915494, 'no_rain': 0.0815006298084507}]

### Traceback

In [10]:
import stochastic_viterbi as sv
json_file = "./tests/stoch_hmm.json"
seq = "umbrella, umbrella, no_umbrella, umbrella, umbrella"
sv.stochastic_viterbi(json_file, seq, log=0)


([{'rain': 0.5, 'no_rain': 0.5},
  {'rain': 0.8181818181818181, 'no_rain': 0.18181818181818182},
  {'rain': 0.883357041251778, 'no_rain': 0.1166429587482219},
  {'rain': 0.19066793972352525, 'no_rain': 0.8093320602764748},
  {'rain': 0.730794004584982, 'no_rain': 0.26920599541501794},
  {'rain': 0.8673388895754847, 'no_rain': 0.13266111042451528}],
 [{'rain': 0.6469355558301939, 'no_rain': 0.3530644441698061},
  {'rain': 0.5923176018339928, 'no_rain': 0.4076823981660072},
  {'rain': 0.37626717588941005, 'no_rain': 0.6237328241105898},
  {'rain': 0.6533428165007112, 'no_rain': 0.34665718349928876},
  {'rain': 0.6272727272727272, 'no_rain': 0.37272727272727274},
  {'rain': 1, 'no_rain': 1}],
 [{'rain': 0.19881051652268797, 'no_rain': 0.10850064411920954},
  {'rain': 0.22456823748297028, 'no_rain': 0.034348133248295144},
  {'rain': 0.11389274375820754, 'no_rain': 0.024929902126428292},
  {'rain': 0.07961253155910467, 'no_rain': 0.1793038391721608},
  {'rain': 0.5042478631636376, 'no_rain'

In [11]:
traceback = 10
for t in traceback:
    

SyntaxError: incomplete input (2645289944.py, line 3)