# ДЗ № 13, Волжина Лена

Реализуйте алгоритмы Витерби и Forward-Backward для задачи о кривой монете. [Задание](https://compscicenter.ru/learning/assignments/27578/)

![Viterbi](hw13_Viterbi.png)

In [1]:
from collections import defaultdict

class Solver(object):
    def __init__(self, a, pi, b):
        self.a = a
        self.b = b
        self.pi = pi
        self.states = self.a.keys()
    
    def process(self, open_states):
        raise NotImplementedError

In [2]:
class ViterbiSolver(Solver):
    def move_forward(self, open_states):
        n_steps = len(open_states)
        # [t][j] = [step][state]
        deltas, sources = defaultdict(dict), defaultdict(dict)
        
        # init deltas:
        for state in self.states:
            deltas[0][state] = (self.pi[state] * self.b[state][open_states[0]])
        
        # process all the rest open states:
        for step in range(1, n_steps):
            open_state = open_states[step]
            for state in self.states:
                max_prob, max_source = None, None
                for source in self.states:
                    prob = (deltas[step - 1][source] * self.a[source][state] * 
                            self.b[state][open_state])
                    if max_prob is None or max_prob < prob:
                        max_prob, max_source = prob, source
                deltas[step][state] = max_prob
                sources[step][state] = max_source
                
        return deltas, sources
                
    def move_backward(self, deltas, sources):
        n_steps = len(deltas.keys())
        states = [max(self.states, key=deltas[n_steps - 1].get)]
        for step in range(n_steps - 2, -1, -1):
            state = sources[step + 1][states[-1]]
            states.append(state)
        return list(reversed(states))
                
    def process(self, open_states):
        deltas, sources = self.move_forward(open_states)
        states = self.move_backward(deltas, sources)
        return states

In [3]:
a = {'+': {'+': 0.8, '-': 0.2}, '-': {'-': 0.7, '+': 0.3}}
pi = {'+': 0.5, '-': 0.5}
b = {'+': {0: 0.5, 1: 0.5}, '-': {0: 0.9, 1: 0.1}}    # 0 = орел, 1 = решка

open_states = [0, 1, 0, 1, 0, 1, 0, 0, 0, 0, 0, 0]

v_solver = ViterbiSolver(a, pi, b)
print(v_solver.process(open_states))
# v_solver.move_forward(open_states)

['+', '+', '+', '+', '+', '+', '-', '-', '-', '-', '-', '-']


In [4]:
# https://en.wikipedia.org/wiki/Viterbi_algorithm#Example
states = ('Healthy', 'Fever')
observations = ('normal', 'cold', 'dizzy')
start_probability = {'Healthy': 0.6, 'Fever': 0.4}
transition_probability = { 
    'Healthy' : {'Healthy': 0.7, 'Fever': 0.3},
    'Fever' : {'Healthy': 0.4, 'Fever': 0.6}
}
emission_probability = {
    'Healthy' : {'normal': 0.5, 'cold': 0.4, 'dizzy': 0.1},
    'Fever' : {'normal': 0.1, 'cold': 0.3, 'dizzy': 0.6}
}

v_solver1 = ViterbiSolver(transition_probability, start_probability, emission_probability)
print('Result:', v_solver1.process(observations))
# v_solver1.move_forward(observations)

Result: ['Healthy', 'Healthy', 'Fever']


# Forward - Backward

In [15]:
class FBSolver(Solver):
    def calculate_alphas_betas(self, open_states):
        n_steps = len(open_states)
        # [t][j] = [step][state]
        alphas, betas = defaultdict(dict), defaultdict(dict)
        
        # init alphas
        for state in self.states:
            alphas[0][state] = (self.pi[state] * self.b[state][open_states[0]])
    
        # calculate alphas
        for step in range(1, n_steps):
            open_state = open_states[step]
            for state in self.states:
                open_prob = self.b[state][open_state]
                prob = sum(alphas[step - 1][source] * self.a[source][state] * open_prob
                           for source in self.states)
                alphas[step][state] = prob
        
        # init betas
        for state in self.states:
            betas[n_steps][state] = 1    # mmm....
        
        # calculate betas
        for step in range(n_steps - 1, -1, -1):
            open_state = open_states[step]         # ошибка в слайдах
            for state in self.states:
                open_prob = self.b[state][open_state]
                prob = sum(betas[step + 1][dest] * self.a[state][dest] * open_prob #self.b[dest][open_state]
                           for dest in self.states)
                betas[step][state] = prob
        
        p_fwd = sum(alphas[len(open_states) - 1][state] for state in self.states)
        p_bwd = sum(betas[0][state] * self.pi[state] for state in self.states)
        print('probabilities:', p_fwd, p_bwd)
        return alphas, betas            
    
    def process(self, open_states):
        alphas, betas = self.calculate_alphas_betas(open_states)
        norm = sum(alphas[len(open_states) - 1].values())
        result = []
        for step in range(len(open_states)):
            probs = {}
            for state in self.states:
                probs[state] = alphas[step][state] * betas[step][state] / norm
            result.append(probs)
        
        result = [{state: alphas[step][state] * betas[step][state] for state in self.states}
                  for step in range(len(open_states))]
        return result
        

In [16]:
a = {'+': {'+': 0.8, '-': 0.2}, '-': {'-': 0.7, '+': 0.3}}
pi = {'+': 0.5, '-': 0.5}
b = {'+': {0: 0.5, 1: 0.5}, '-': {0: 0.9, 1: 0.1}}    # 0 = орел, 1 = решка

open_states = [0, 1, 0, 1, 0, 1, 0, 0, 0, 0, 0, 0]

fb_solver = FBSolver(a, pi, b)
#fb_solver.process(open_states)

#fb_solver.calculate_alphas_betas(open_states)
res = fb_solver.process(open_states)
print(res, '\n', [sum(r.values()) for r in res])
fb_solver.calculate_alphas_betas(open_states)

probabilities: 0.0008441922410745069 0.000844192241074507
[{'-': 0.00036763587671213014, '+': 0.0002178539668082923}, {'-': 1.4844170694170889e-05, '+': 0.00034787526706639904}, {'-': 0.00021336617449890994, '+': 0.00030355935692674804}, {'-': 1.0914402365206479e-05, '+': 0.0003675241087112211}, {'-': 0.00021651255324111243, '+': 0.0003018113687366355}, {'-': 1.5641296285403982e-05, '+': 0.0003438896391102336}, {'-': 0.00039514379066272104, '+': 0.00020257179239129743}, {'-': 0.0005055308020037426, '+': 0.00014124567497961871}, {'-': 0.0005508716789240136, '+': 0.00011605629891280154}, {'-': 0.0005635901862967409, '+': 0.00010899046148350855}, {'-': 0.0005527814700609106, '+': 0.00011499530383674762}, {'-': 0.0005107160971394333, '+': 0.00013836495545979058}] 
 [0.0005854898435204225, 0.00036271943776056993, 0.0005169255314256579, 0.0003784385110764276, 0.000518323921977748, 0.00035953093539563757, 0.0005977155830540184, 0.0006467764769833614, 0.0006669279778368151, 0.00067258064778024

(defaultdict(dict,
             {0: {'+': 0.25, '-': 0.45},
              1: {'+': 0.1675, '-': 0.036500000000000005},
              2: {'+': 0.07247500000000001, '-': 0.05314500000000001},
              3: {'+': 0.03696175000000001, '-': 0.005169650000000001},
              4: {'+': 0.015560147500000005, '-': 0.009909994500000003},
              5: {'+': 0.0077105581750000025, '-': 0.0010049025650000004},
              6: {'+': 0.003234958654750001, '-': 0.0020209890874500007},
              7: {'+': 0.0015971318250175006, '-': 0.0018555156829485006},
              8: {'+': 0.0009171800824492754, '-': 0.0014564586087607054},
              9: {'+': 0.000585340824293816, '-': 0.001082661338360114},
              10: {'+': 0.0003965355304715435, '-': 0.0007874379915397586},
              11: {'+': 0.00027672991091958117, '-': 0.0005674623301549258}}),
 defaultdict(dict,
             {0: {'+': 0.0008714158672331692, '-': 0.0008169686149158448},
              1: {'+': 0.002076867266068054,

In [None]:
observations = ('normal', 'cold', 'dizzy')
 
start_probability = {'Healthy': 0.6, 'Fever': 0.4}
 
transition_probability = {
   'Healthy' : {'Healthy': 0.69, 'Fever': 0.3, 'E': 0.01},
   'Fever' : {'Healthy': 0.4, 'Fever': 0.59, 'E': 0.01},
   }
 
emission_probability = {
   'Healthy' : {'normal': 0.5, 'cold': 0.4, 'dizzy': 0.1},
   'Fever' : {'normal': 0.1, 'cold': 0.3, 'dizzy': 0.6},
   }

fb_solver_1 = FBSolver(transition_probability, start_probability, emission_probability)
fb_solver_1.process(observations)

In [None]:
fb_solver_1 = FBSolver(transition_probability, start_probability, emission_probability)
fb_solver_1.calculate_alphas_betas(observations)
