In [1]:
import sys

# Local checkout directories appended to the import search path, in this order.
# These are absolute, machine-specific locations; presumably they are what makes
# `pomegranate` and `simulation_for_gibbs` importable below -- TODO confirm.
for _repo_dir in (
    r"C:\Repos\pomegranate",
    r"C:/Repos/WhoCell/",
    r"C:/Repos/WhoCell/who_cell/",
    r"C:/Repos/WhoCell/who_cell/models/",
    r"C:/Repos/WhoCell/who_cell/simulation/",
):
    sys.path.append(_repo_dir)

In [2]:
import numpy as np
import pandas as pd
import itertools
from collections import Counter
from scipy.stats import binom

import pomegranate
from pomegranate import *


from simulation_for_gibbs import Simulator_for_Gibbs


## build datasets for test

### build model

In [3]:
# Simulation parameters used by the cells below.
n_of_states = 10  # number of states; range(n_of_states) is passed to build_pome_model
sigma = 0.1  # one copy per state is passed to build_pome_model -- presumably the emission std; TODO confirm
N=20  # length of the sampled sequence (passed as `length` to model.sample)
d = 5  # forwarded to Simulator_for_Gibbs and build_pome_model; its meaning is not visible in this notebook
P_C = 1  # success probability of the binomial that draws how many points are kept as observations

In [4]:
# Build the simulator and ask it for the model bundle. Later cells read the keys
# 'model', 'start_probabilites', 'transition_matrix_sparse' and
# 'state_to_distrbution_mapping' from the returned `pome_res`.
sfg = Simulator_for_Gibbs(N, d, n_of_states)
pome_res = sfg.build_pome_model(
    N,
    d,
    list(range(n_of_states)),
    [sigma] * n_of_states,  # the same sigma for every state
    is_acyclic=True,
)

### sample example

In [5]:
# Draw one sequence of length N together with the path that produced it.
full_seq_res = pome_res['model'].sample(path=True, length=N)

# Element 0 holds the observations, element 1 the sampled path.
full_seq_obs = full_seq_res[0]
# The first path entry is dropped -- presumably the model's start state,
# which emits no observation; TODO confirm.
full_seq_states = [path_state.name for path_state in full_seq_res[1]][1:]

In [6]:
# Number of observed points ~ Binomial(len(full_seq_obs), P_C), floored at 2.
binom_dist = binom(len(full_seq_obs), P_C)
# rvs(1) returns a 1-element array. Unwrap it to a plain int so that n_of_obs
# has the same type whichever branch wins (previously an ndarray or the int 2).
n_of_obs = max(int(binom_dist.rvs(1)[0]), 2)

# Keep n_of_obs points of the full trajectory.
few_seq_res = sfg._sample_n_points_from_traj(full_seq_obs, n_of_obs)

few_seq_obs = few_seq_res[0]  # the retained observations
W = few_seq_res[1]  # used below as indices into the full sequence
few_seq_states = [full_seq_states[w] for w in W]

## build the algorithm (Viterbi decoding)

In [7]:
# Pull the pieces the decoder needs out of the model bundle.
start_p = pome_res['start_probabilites']
trans_p_sparse = pome_res['transition_matrix_sparse']
state_to_distrbution_mapping = pome_res['state_to_distrbution_mapping']

# The state names are the keys of the start-probability table.
states = list(start_p.keys())

In [21]:
def calc_emission_probability(st, obs):
    """Return the probability of `obs` under the distribution mapped to state `st`.

    The distribution is looked up in the notebook-level
    `state_to_distrbution_mapping` and its `probability` method is called.
    """
    emission_dist = state_to_distrbution_mapping[st]
    return emission_dist.probability(obs)

def calc_transition_probability(_f,_t,transitions_matrix,P_C) :
    """Return the probability of moving from state `_f` to state `_t`.

    Parameters
    ----------
    _f, _t : hashable
        Source and target state keys.
    transitions_matrix : mapping of mapping
        Sparse table: ``transitions_matrix[_f][_t]`` holds the probability of
        the transition; a missing `_t` entry means the transition is impossible.
    P_C : float
        Currently unused. The intent was to adjust the transitions for the
        prior of seeing an observation, but that is not implemented here (TODO).

    Returns
    -------
    The stored probability, or 0 when `_f` has no entry for `_t`.
    """
    # A stray bare `new_transition` expression used to sit here; it raised
    # NameError on a fresh kernel and had no effect otherwise.
    outgoing = transitions_matrix[_f]
    if _t in outgoing.keys():
        return outgoing[_t]
    return 0

def extrect_best_path(V,observed_length) :
    """Backtrack through a Viterbi trellis and return the best state path.

    Parameters
    ----------
    V : list of dict
        One dict per time step, mapping each state to
        ``{"prob": score, "prev": predecessor_state}``.
    observed_length : int
        Unused; kept so existing callers keep working.

    Returns
    -------
    list
        One state per time step, first to last. Empty when `V` is empty or its
        last step holds no states.
    """
    if not V or not V[-1]:
        return []

    # max() returns the first maximal entry, which matches the earlier strict
    # ">" scan whenever some score is positive. Unlike that scan it still picks
    # a state when every score is 0.0 (e.g. after underflow); previously that
    # left the end state as None and backtracking failed with KeyError.
    best_st, best_entry = max(V[-1].items(), key=lambda item: item[1]["prob"])
    max_prob = best_entry["prob"]

    # Walk the "prev" pointers from the last step back to the first, collecting
    # states in reverse order, then flip once at the end.
    opt = [best_st]
    previous = best_st
    for t in range(len(V) - 1, 0, -1):
        previous = V[t][previous]["prev"]
        opt.append(previous)
    opt.reverse()

    print(" with highest probability of %s" % max_prob)
    return opt

def update_first_step(start_p, few_seq_obs):
    """Create the trellis with its first column filled in.

    For every state in the notebook-level `states`, the score is the state's
    start probability times the emission probability of the first observation.
    The first column has no predecessor, so "prev" is None.

    Returns a one-element list holding that column.
    """
    first_column = {
        st: {
            "prob": start_p[st] * calc_emission_probability(st, few_seq_obs[0]),
            "prev": None,
        }
        for st in states
    }
    return [first_column]

def calculate_transition_cost(st,prev_st,_V,obs,transitions_matrix=None):
    """Score reaching `st` from `prev_st` while emitting `obs`.

    The score is the product of three factors, in this order:
    the score of `prev_st` in the previous trellis column `_V`, the transition
    probability `prev_st -> st`, and the emission probability of `obs` in `st`.

    Parameters
    ----------
    st, prev_st : hashable
        Target and source state keys.
    _V : dict
        Previous trellis column, mapping state -> {"prob": ..., "prev": ...}.
    obs :
        The observation emitted at the current step.
    transitions_matrix : mapping of mapping, optional
        Sparse transition table. When omitted, the notebook-level
        `trans_p_sparse` is used. This used to be read from a global named
        `transitions_matrix`, which no visible cell defines.
        NOTE(review): confirm `trans_p_sparse` is the intended table.
    """
    if transitions_matrix is None:
        transitions_matrix = trans_p_sparse

    prev_score = _V[prev_st]["prob"]
    transition = calc_transition_probability(prev_st,st,transitions_matrix,P_C)
    emission = calc_emission_probability(st,obs)
    # Same left-to-right multiplication order as before.
    return prev_score * transition * emission
        
def return_optimal_transition(st, _V, states, obs):
    """Find the predecessor of `st` with the highest transition cost.

    Every state in `states` is tried as the predecessor. A candidate replaces
    the current best only when its cost is strictly greater, so the first of
    several equal candidates wins. If no candidate scores above 0.0 the result
    is ``(states[0], 0.0)``.

    Returns a ``(best_predecessor, best_cost)`` tuple.
    """
    best_prev, best_cost = states[0], 0.0
    for candidate in states:
        candidate_cost = calculate_transition_cost(st, candidate, _V, obs)
        if not candidate_cost > best_cost:
            continue
        best_prev, best_cost = candidate, candidate_cost
    return best_prev, best_cost

def ml_path_viterbi(start_p, seq, states):
    """Decode the most probable state path for the observation sequence `seq`.

    The first trellis column comes from `update_first_step`. Each later column
    stores, per state, the best predecessor and its score as returned by
    `return_optimal_transition`. The path is then read back with
    `extrect_best_path`.
    """
    trellis = update_first_step(start_p, seq)

    for step, observation in enumerate(seq):
        if step > 0:  # step 0 is already covered by the first column
            column = {}
            trellis.append(column)
            for state in states:
                best_prev, best_score = return_optimal_transition(
                    state, trellis[step - 1], states, observation
                )
                column[state] = {"prob": best_score, "prev": best_prev}

    return extrect_best_path(trellis, len(seq))

# Keep the decoded path in `opt`: the results-table cell further down iterates
# over `opt`, which was otherwise never assigned at notebook level.
opt = ml_path_viterbi(start_p,few_seq_obs,states)
opt

 with highest probability of 3.1305270702913908


['(1, 0.1)',
 '(3, 0.1)',
 '(4, 0.1)',
 '(9, 0.1)',
 '(7, 0.1)',
 '(1, 0.1)',
 '(2, 0.1)',
 '(3, 0.1)',
 '(9, 0.1)',
 '(3, 0.1)',
 '(9, 0.1)',
 '(3, 0.1)',
 '(0, 0.1)',
 '(5, 0.1)',
 '(7, 0.1)',
 '(1, 0.1)',
 '(3, 0.1)',
 '(0, 0.1)',
 '(3, 0.1)',
 '(0, 0.1)']

In [22]:
full_seq_states

['(1, 0.1)',
 '(3, 0.1)',
 '(4, 0.1)',
 '(9, 0.1)',
 '(7, 0.1)',
 '(1, 0.1)',
 '(2, 0.1)',
 '(3, 0.1)',
 '(9, 0.1)',
 '(3, 0.1)',
 '(9, 0.1)',
 '(3, 0.1)',
 '(0, 0.1)',
 '(5, 0.1)',
 '(7, 0.1)',
 '(1, 0.1)',
 '(3, 0.1)',
 '(0, 0.1)',
 '(3, 0.1)',
 '(0, 0.1)']

In [None]:
# Build one row per predicted state and compare it with the sampled truth.
# Expects `opt` to hold the predicted state path.
results_list = []
for i, predicted_state in enumerate(opt):
    was_observed = i in W
    has_wc0 = '_wc0' in predicted_state
    w = 'obs' if was_observed else 'not'
    # Success: an observed index whose prediction lacks '_wc0', or an
    # unobserved index whose prediction contains it.
    sucsses = was_observed != has_wc0
    results_list.append(
        (i, predicted_state, full_seq_states[i], w, full_seq_obs[i], sucsses)
    )

results_df = pd.DataFrame(
    data=results_list,
    columns=['idx', "predicted_state", 'known_state', 'W', 'obs', 'sucsses'],
).set_index('idx')
results_df
    
    

In [None]:
# Decode the full observation sequence with the model's own viterbi() and list
# the `.name` of the second item of every path entry -- presumably a reference
# to compare against the hand-written decoder above; TODO confirm.
[s[1].name for s in pome_res['model'].viterbi(full_seq_obs)[1]]

In [None]:
# Print each retained observation with its index in W and its true state.
for idx, observation in enumerate(few_seq_obs):
    print((observation, W[idx], few_seq_states[idx]))

In [None]:
# NOTE(review): `pr_v` is not defined in any cell visible in this notebook, so
# this check raises NameError on a fresh kernel unless it is created elsewhere.
# Confirm where it comes from, or remove this cell.
pr_v['k'] == len(few_seq_obs)

In [29]:
# Probability used by the next cell, which prints p * (1 - p) ** x for x = 0..4.
p=0.01

In [31]:
# Print p * (1 - p) ** k for k = 0..4.
for exponent in range(5):
    print(p * (1 - p) ** exponent)
    

0.01
0.0099
0.009801
0.00970299
0.0096059601
