<a href="https://colab.research.google.com/github/sanosenx86/si-s-e-fivefivetwotwo/blob/lab3/5522_lab3.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

#Part 1: 
###Implement the Viterbi algorithm for HMMs for Eisner's Ice Cream Problem (predict whether each day is hot or cold based on the number of ice creams eaten).  Remember that the Viterbi algorithm computes the most likely sequence for an input.

**1.1** import packages

In [0]:
import io
import pandas as pd
import numpy as np
np.set_printoptions(suppress=True)

**Warning: High concentration of abusive object-oriented programming here.**

In [0]:
class HMM1:
  @staticmethod
  def get_prob_range(prob):
    p=prob.copy()
    for i in range(1, p.shape[0]):
      p[i]+=p[i-1]
    return p

  def __init__(self, hidden_states, trans_prob, emit_prob, start_prob, stop_prob = None):
    self.states = hidden_states
    self.trans_p = trans_prob
    self.emit_p = emit_prob
    self.start_p = start_prob
    self.stop_p = stop_prob
    self._log_trans_p = np.log(trans_prob)
    self._log_emit_p = np.log(emit_prob)
    self._log_start_p = np.log(start_prob)
    self._log_stop_p = np.log(stop_prob) if stop_prob is not None else None

  ##--------------------For Part 1--------------------##
  def __viterbi_compute(self, obs):
    obs = obs.ravel()[np.flatnonzero(obs)]-1
    max_p = np.zeros((len(obs), len(self.states)))
    path = np.zeros((len(self.states), len(obs)))

    #initialize
    max_p[0] = (self._log_start_p+self._log_emit_p[obs[0]]) 
    path[:,0] = range(len(self.states))

    #recursive
    for t in range(1, len(obs)):
      newpath=0*path
      for y in range(len(self.states)):
        newprob = max_p[t-1] + self._log_trans_p[:, y] + self._log_emit_p[obs[t], y]
        max_p[t, y] = np.max(newprob)
        newpath[y] = path[np.argmax(newprob)]
        newpath[y, t] = y
      path = newpath
    
    #stop prob
    if self._log_stop_p is not None:
      max_p[-1] += self._log_stop_p

    best = np.argmax(max_p[-1])
    return max_p[-1, best], path[best], [self.states[int(i)] for i in path[best]]
  
  def viterbi(self, obs):
    probs = []
    indices = []
    hiddens = []
    for l in obs:
      res = self.__viterbi_compute(l)
      probs.append(res[0])
      indices.append(res[1])
      hiddens.append(res[2])
    self.viterbi=indices
    return np.exp(probs), indices, hiddens

**1.2** preprocess dataframe

In [0]:
tp_df = pd.read_csv('https://raw.githubusercontent.com/sanosenx86/5522sp20public/master/lab3data/transitionProbs.csv')
print(tp_df.shape)
tp_df.head()

(3, 4)


Unnamed: 0,P(x|...),C,H,START
0,C,0.86,0.07,0.5
1,H,0.07,0.86,0.5
2,STOP,0.07,0.07,0.0


In [0]:
tp=tp_df.values
start_prob=tp[:2, 3].astype('float64')
stop_prob=tp[2, 1:3].astype('float64')
tp=tp[:2, 1:3].astype('float64')

display(tp, start_prob, stop_prob)

array([[0.86, 0.07],
       [0.07, 0.86]])

array([0.5, 0.5])

array([0.07, 0.07])

In [0]:
op_df = pd.read_csv('https://raw.githubusercontent.com/sanosenx86/5522sp20public/master/lab3data/observationProbs.csv')
print(op_df.shape)
op_df.head()

(3, 3)


Unnamed: 0,P(x|...),C,H
0,1,0.6407,0.0002
1,2,0.1481,0.5341
2,3,0.2122,0.4657


In [0]:
op=op_df.values
op=op[:, 1:].astype('float64')
display(op)

array([[0.6407, 0.0002],
       [0.1481, 0.5341],
       [0.2122, 0.4657]])

In [0]:
td_df = pd.read_csv('https://raw.githubusercontent.com/sanosenx86/5522sp20public/master/lab3data/testData.csv')
print(td_df.shape)
td_df.head()

(10, 6)


Unnamed: 0,SeqNumber,Obs1,Obs2,Obs3,Obs4,Obs5
0,1,2,3,3,2,3
1,2,2,3,2,2,0
2,3,3,1,3,3,1
3,4,2,1,1,0,0
4,5,1,1,1,2,3


In [0]:
td=td_df.values
td=td[:, 1:].astype('int')
display(td)

array([[2, 3, 3, 2, 3],
       [2, 3, 2, 2, 0],
       [3, 1, 3, 3, 1],
       [2, 1, 1, 0, 0],
       [1, 1, 1, 2, 3],
       [1, 3, 1, 1, 0],
       [3, 2, 3, 0, 0],
       [2, 2, 1, 1, 0],
       [1, 3, 2, 3, 3],
       [1, 1, 1, 0, 0]])

**1.3** implement viterbi algorithm. Use log probability to avoid underflow.

**1.4** test sequences from testdata.

In [0]:
states=['C','H']
model = HMM1(states, tp, op, start_prob, stop_prob)
path = model.viterbi(td)
display(path[0], path[2])

array([0.0005516 , 0.00157956, 0.00007509, 0.00157373, 0.00015824,
       0.00124243, 0.00299847, 0.00021219, 0.00005386, 0.00680815])

[['H', 'H', 'H', 'H', 'H'],
 ['H', 'H', 'H', 'H'],
 ['C', 'C', 'C', 'C', 'C'],
 ['C', 'C', 'C'],
 ['C', 'C', 'C', 'C', 'C'],
 ['C', 'C', 'C', 'C'],
 ['H', 'H', 'H'],
 ['H', 'H', 'C', 'C'],
 ['C', 'H', 'H', 'H', 'H'],
 ['C', 'C', 'C']]

#Part 2:  
###Using the same network, implement likelihood sampling for approximate inference.  For any test sequence, sample complete sequences of the hidden states n times, where n can range from 10 to 100000 samples. The goal is to approximate the likelihood of all possible sequences.

In [0]:
class HMM2(HMM1):

  ##--------------------For Part 2--------------------##
  def __generate_sequence(self, obs):
    obs = obs.ravel()[np.flatnonzero(obs)]-1
    obsn = len(obs)
    ns = len(self.states)
    rnd = np.random.rand(obsn)
    seq = []
    sprange = self.get_prob_range(self.start_p)
    tprange = self.get_prob_range(self.trans_p)

    #initialize
    for i in range(ns):
      if rnd[0] < sprange[i]:
        seq += [i]
        break

    #transmit
    for n in range(1, obsn):
      tp = tprange[:, seq[n-1]] #trans_prob from last state
      for i in range(ns):
        if rnd[n] < tp[i]:
          seq += [i]
          break
    
    sseq = [self.states[i] for i in seq]
    weight = self.__compute_weight(seq, obs)
    return seq, sseq, weight

  def __compute_weight(self, seq, obs):
    w = np.sum([self._log_emit_p[obs[i], int(seq[i])] for i in range(len(seq))])
    return w

  def __sample_one(self, obs, trials):
    table=dict()
    
    for i in range(trials):
      seq=self.__generate_sequence(obs)
      id=''.join(map(str,seq[0]))
      if id not in table: table[id]=0
      table[id] += np.exp(seq[2])
    
    beststr = max(table, key=table.get)
    bestseq = list(map(int, beststr))
    return bestseq, [self.states[j] for j in bestseq]

  def sampling(self, obs, trials=10):
    seq = []
    sseq = []
    for o in obs:
      res = self.__sample_one(o, trials)
      seq += [res[0]]
      sseq += [res[1]]
    return seq, sseq

  def __convergent_trials(self, obs, gold, min_sample, max_sample):
      best_sseq=[]
      best_error=np.Inf
      best_time=0
      for i in range(min_sample, max_sample+1):
        if best_error==0: break #Converged now.
        s, ss = self.__sample_one(obs, i)
        err=np.count_nonzero(s-gold)
        if err<=best_error: #don't use max here because it goes too large.
          best_sseq = ss
          best_error = err
          best_time = i
      return [best_sseq, best_error, best_time]

  def convergent(self, obs, min_sample=10, max_sample=100000):
    ctable=[] #sequence, weight, error, convergent time
    for o in range(obs.shape[0]):
      ctable += [list(self.__convergent_trials(obs[o], self.viterbi[o], min_sample, max_sample))]
    return ctable

  def eval_seq(self, obs, seq):
    all_weight=[]
    for o in range(obs.shape[0]):
      ob=obs[o]
      ob=ob.ravel()[np.flatnonzero(ob)]-1
      all_weight+=[self.__compute_weight(eval[o], ob)]
    return np.exp(all_weight)

In this part, the weight of each sequence is solely by the product of the emit probability because the transition probability is embedded from the sampling process -- the possibility to generate each sequence is described by the transition probability.

In [0]:
#tp_normalized
tpn = tp/tp.sum(0)
seq = HMM2(states, tpn, op, start_prob)
path = seq.viterbi(td)
display(path[2])

[['H', 'H', 'H', 'H', 'H'],
 ['H', 'H', 'H', 'H'],
 ['C', 'C', 'C', 'C', 'C'],
 ['C', 'C', 'C'],
 ['C', 'C', 'C', 'C', 'C'],
 ['C', 'C', 'C', 'C'],
 ['H', 'H', 'H'],
 ['H', 'H', 'C', 'C'],
 ['C', 'H', 'H', 'H', 'H'],
 ['C', 'C', 'C']]

**2.1** Judge by the overall weight which the most likely weather sequence would be.  Does the best string match your Viterbi answer?

In [0]:
display(seq.sampling(td, 100)[1])

[['H', 'H', 'H', 'H', 'H'],
 ['H', 'H', 'H', 'H'],
 ['C', 'C', 'C', 'C', 'C'],
 ['C', 'C', 'C'],
 ['C', 'C', 'C', 'C', 'C'],
 ['C', 'C', 'C', 'C'],
 ['H', 'H', 'H'],
 ['C', 'C', 'C', 'C'],
 ['C', 'H', 'H', 'H', 'H'],
 ['C', 'C', 'C']]

Yes. If sample number is big enough, it eventually converges to the viterbi answer.

**2.2** Assuming the Viterbi sequence is "correct", how long (how many samples) does it take the sampler to converge so that you get the highest match between samples and the Viterbi sequence?

In [0]:
display([(i[0], i[2]) for i in seq.convergent(td)])

[(['H', 'H', 'H', 'H', 'H'], 10),
 (['H', 'H', 'H', 'H'], 10),
 (['C', 'C', 'C', 'C', 'C'], 10),
 (['C', 'C', 'C'], 10),
 (['C', 'C', 'C', 'C', 'C'], 10),
 (['C', 'C', 'C', 'C'], 10),
 (['H', 'H', 'H'], 10),
 (['H', 'H', 'C', 'C'], 11),
 (['C', 'H', 'H', 'H', 'H'], 15),
 (['C', 'C', 'C'], 10)]

Usually it takes ~10 samples to converge to viterbi solution.

#Part 3:
###Implement Forward-Backward algorithm.

In [0]:
class HMM3(HMM1):
  
  ##--------------------For Bonus--------------------##
  def forward(self, obs):
    obs = obs.ravel()[np.flatnonzero(obs)]-1
    p = np.zeros((len(obs), len(self.states)))

    #initialize
    p[0]= np.multiply(self.start_p, self.emit_p[int(obs[0])])

    #compute
    for t in range(1, len(obs)):
      for i in range(len(self.states)):
        p[t,i] = np.dot(p[t-1], self.trans_p[i]) * self.emit_p[int(obs[t]), i]
    
    # if self.stop_p is not None:
    #   for i in range(len(self.states)):
    #     p = np.append(p, np.zeros((1,len(states))), axis=0)
    #     p[-1,i] = np.dot(p[-2].squeeze(), self.stop_p)

    return np.sum(p[-1]), p

  def backward(self, obs):
    obs = obs.ravel()[np.flatnonzero(obs)]-1
    p = np.zeros((len(obs), len(self.states)))

    #initialize
    p[-1,:] = 1

    #compute
    for t in reversed(range(len(obs)-1)):
      for i in range(len(self.states)):
        p[t,i]=np.sum(p[t+1]*self.trans_p[i]*self.emit_p[obs[t+1]])

    prob = np.sum(self.start_p*p[0]*self.emit_p[int(obs[0])])

    return prob, p

  def forward_backward(self, obs):
    for o in obs:
      print(o, self.forward(o)[0], self.backward(o)[0])

In [0]:
states2 = ['C', 'H']

fbmodel = HMM3(states, tpn, op, start_prob)
fbmodel.forward_backward(td)

[2 3 3 2 3] 0.011725192152318572 0.011725192152318572
[2 3 2 2 0] 0.030404240035234335 0.030404240035234346
[3 1 3 3 1] 0.0017943698358089577 0.0017943698358089582
[2 1 1 0 0] 0.03362698651738017 0.03362698651738018
[1 1 1 2 3] 0.005581655289316001 0.0055816552893160004
[1 3 1 1 0] 0.02238946940670452 0.022389469406704512
[3 2 3 0 0] 0.05722914495150521 0.05722914495150521
[2 2 1 1 0] 0.008459860910637784 0.008459860910637784
[1 3 2 3 3] 0.0021199306558590957 0.0021199306558590957
[1 1 1 0 0] 0.11245743037605936 0.11245743037605936


In [0]:
states3 = ['C', 'H', 'W']

tp3 = np.array([[0.8, 0.1, 0.1],
                [0.1, 0.8, 0.1],
                [0.1, 0.1, 0.8]])

#tp3 = np.ones((3,3))/3.0

op3 = np.array([[0.6, 0.1, 0.2],
                [0.3, 0.2, 0.7],
                [0.1, 0.7, 0.1]])
#op3 = np.ones((3,3))/3.0

sp3 = np.array([0.4, 0.4, 0.2])
#sp3 = np.ones(3)/3.0

fbmodel = HMM3(states3, tp3, op3, sp3)
fbmodel.forward_backward(td)

[2 3 3 2 3] 0.0043244952 0.0043244952
[2 3 2 2 0] 0.012142820000000002 0.012142820000000002
[3 1 3 3 1] 0.0017939696 0.0017939696000000005
[2 1 1 0 0] 0.04362 0.043620000000000006
[1 1 1 2 3] 0.004257696000000001 0.004257696
[1 3 1 1 0] 0.00745 0.007450000000000002
[3 2 3 0 0] 0.035031999999999994 0.035032
[2 2 1 1 0] 0.016125 0.016125000000000004
[1 3 2 3 3] 0.003141420000000001 0.0031414200000000002
[1 1 1 0 0] 0.06492 0.06492


Forward and backward algorithms always return the same result if the probability data is correct.

In [0]:
a=range(10)
print(list(a[:-1]))

[0, 1, 2, 3, 4, 5, 6, 7, 8]
