In [None]:
#use jurafsky as a guide

In [1]:
import numpy as np

In [2]:
class HMM:
    def __init__(self, states, observations, trans, emis, initial):
        '''
        states: list of states (even though we are using np.array)
        
        observations: list of observations
        
        number_states: number of states (numpy shape: states.shape[0] gives length of array, since its a list)
        
        number_observations: number of observations (gives length of array, since its a list)
        
        trans: numpy arrary, matrix of transmission probabilities. Probability of moving from state i (row) to state j (column)
        
        emis: numpy array, matrix of emission probabilities. Probability of seeing observation o (column) from state s (row)
        
        initial: numpy array, array of all the starting probabilities. 
        '''
        self.states = np.array(states)
        self.observations = np.array(observations)
        self.number_states = self.states.shape[0]
        self.number_observations = self.observations.shape[0]
        self. trans = np.array(trans)
        self.emis = np.array(emis)
        self.initial = np.array(initial)
    
    def get_obs(self, o):
        #gets the index of the observation
       
        return np.argwhere(self.observations == o).flatten().item()
        #np.argwhere goes through each index and finds where its equal to o
        #.flatten() concatenates it into a single array
        #.item() turns the argwhere().flatten() to an int
    
    def forward_alg(self, obs):
        #observations: numpy array of observations of length T
        
        T = len(obs) # len(observations) == np.array(observations).shape[0]
        alpha = np.zeros((self.number_states, T)) #creates a number_states x T matrix of zeros 
        #alphas is forward probability matrix
        
        #initialize
        o_0 = self.get_obs(obs[0]) #gets the index (from list of observation space) of first observation in seq
        alpha[:, 0] = initial * emis[:, o_0] #the first column for each row of the forward prob matrix is the initial prob * the emission prob of seeing that obs from the initial state
        
        #loop through every observation in observation sequence
        for t in range(1, T): #we do range(1, T) because we already initialized it
            o_t = self.get_obs(obs[t])
            alpha[:, t] = alpha[:, t-1].dot(self.trans) * self.emis[:, o_t] #forward algo is prev forward prob (alpha[:, t-1]) * trans prob from previous state i ito current state q
        
        #sum the probabilities 
        prob = alpha[:, T-1].sum() # sums the last column column of each row (each row corresponds to a starting state and colums are obs sequence)
        
        return prob, alpha
    
    def likelihood(self, obs):
        #forward_alg return two things: prob and alpha (matrix), and we may just want the prob of the sequence
        prob, alpha = self.forward_alg(obs)
        
        return prob
    
    def backward_alg(self, obs):
        #observations: numpy array of observations of length T
        T = len(obs)
        beta = np.zeros((self.number_states, T))
        
        #initialize
        beta[:, T-1] = 1
        
        #loop through the observation sequence backwards
        for t in range(T-2, -1, -1): #range(start, stop, step)
            o_t1 = self.get_obs(obs[t+1])
            beta[:, t] = self.trans.dot(self.emis[:, o_t1] * beta[:, t+1])

        
        o_0 = self.get_obs(obs[0])
        prob = self.initial.dot(self.emis[:, o_0] * beta[:, 0])
        #prob = (self.initial * beta[:, 0]).dot(self.emis[:, o_0])

        return prob, beta
    
    def viterbi(self, obs):
        #viterbi: v_t(j) =max_(1≤i≤N−1)v_t−1(i) * a_ij * b_j(o_t)
        #Decoding:  Given as input an HMMλ= (A,B)and a sequence of observations O=o1,o2,...,oT, find the most probable sequence of states Q=q1q2q3...qT.
        T = len(obs)
        v = np.zeros((self.number_states, T))
        prev = np.zeros((T-1, self.number_states))
        
        #initialize viterbi matrix and observation
        o_0 = self.get_obs(obs[0])
        v[:, 0] = self.initial * emis[:, o_0]
        
        #loop through obs and each state
        for t in range(1, T):
            for n in range(self.number_states):
                o_t = self.get_obs(obs[t])
                seq_prob = v[:, t-1] * self.trans[:, n] * self.emis[n, o_t]
                prev[t-1, n] = np.argmax(seq_prob)
                v[n, t] = np.max(seq_prob)
                
        last_state = int(np.argmax(v[:, T-1]))
        path = []
        path.append(last_state)
        #path_idx = 1
        
        for t in range(T-2, -1, -1):
            path.append(prev[t, last_state])
            last_state = int(prev[t, last_state])
            #path_idx += 1
        
        path = list(reversed(path))
        prob = v[:, T-1].max()
        #path = self.states[v.argmax(axis=0)]
        
        return path, prob, v, prev
    
    def baum_welch(self, obs, iterations=1):
        #HMM training to learn the trans and emis probabilities given an observation sequence 
        #aka forward-backward algorithm
        
        T = len(obs)
        
        #expectation step (E-step)
        f_prob, alpha = self.forward_alg(obs)
        b_prob, beta = self.backward_alg(obs)
        
        #see pg 13 & 14 of https://web.stanford.edu/~jurafsky/slp3/A.pdf
        gamma = alpha * beta / (alpha * beta).sum(axis=0)
        #xi will have three axes bc xi is the probability of being in state i at time t and state j at time t+1
        #we need 3 axes because: 1) we need one for state i. 2) one for state j. 3) one for t
        xi = np.zeros((self.number_states, self.number_states, T-1))
        
        for t in range(T-1):
            o_t1 = self.get_obs(obs[t+1])
            demon = np.dot(np.dot(alpha[:, t], self.trans) * self.emis[:, o_t1], beta[:, t+1])
            for i in range(self.number_states):      
                numer = alpha[i, t] * self.trans[i, :] * self.emis[:, o_t1] * beta[:, t+1]
                xi[i, :, t] = numer / demon
                
        #maximization step (M-step)
        
        self.initial = gamma[:, 0]
        self.trans = xi.sum(axis=2) / gamma[:, :-1].sum(axis=1).reshape(-1, 1)
        for i, o in enumerate(self.observations):
            idx = np.argwhere(obs == o).flatten()
            self.emis[:, i] = gamma[:, idx].sum(axis=1) / gamma.sum(axis=1)
    

In [None]:
"""
update the current emis and trans (so self.emis and self.trans - what im doing now)

or

create new emis and trans for only buam-welch, use those for calc, and return them as emis and trans prob using
just that sequence
"""

Testing

In [3]:
states = ['hot', 'cold'] 
observation_space = np.array([1, 2, 3])

initial = np.array([0.8, 0.2])
trans = np.array([[0.6, 0.4], [0.5, 0.5]])
emis = [[0.2, 0.4, 0.4], [0.5, 0.4, 0.1]]
emis = np.array(emis)
hmm = HMM(states, observation_space, trans, emis, initial)


In [4]:
obs = np.array([1, 2, 3, 2, 2, 1, 2])

Viterbi

In [5]:
path, prob, v, prev = hmm.viterbi(obs)

print('Path: {}'.format(path))
print()
print('Probability: {}'.format(prob))
print()
print('Viterbi matrix: {}'.format(v))
print()
print('Prev: {}'.format(prev))

Path: [0.0, 0.0, 0.0, 0.0, 0.0, 1.0, 0]

Probability: 2.1233664000000008e-05

Viterbi matrix: [[1.6000000e-01 3.8400000e-02 9.2160000e-03 2.2118400e-03 5.3084160e-04
  6.3700992e-05 2.1233664e-05]
 [1.0000000e-01 2.5600000e-02 1.5360000e-03 1.4745600e-03 3.5389440e-04
  1.0616832e-04 2.1233664e-05]]

Prev: [[0. 0.]
 [0. 0.]
 [0. 0.]
 [0. 0.]
 [0. 0.]
 [1. 1.]]


In [6]:
path, alpha = hmm.forward_alg(obs)
alpha

array([[0.16      , 0.0584    , 0.023136  , 0.00647584, 0.00247919,
        0.00049362, 0.00031569],
       [0.1       , 0.0456    , 0.004616  , 0.00462496, 0.00196113,
        0.00098612, 0.0002762 ]])

Baum-Welch

In [7]:
print("before learning")
print()
print("initial:\n", hmm.initial)
print()
print("transmission:\n", hmm.trans)
print()
print("emission:\n", hmm.emis)

before learning

initial:
 [0.8 0.2]

transmission:
 [[0.6 0.4]
 [0.5 0.5]]

emission:
 [[0.2 0.4 0.4]
 [0.5 0.4 0.1]]


In [88]:
learn_obs = np.array([3, 2, 1, 1, 2, 3, 3])
hmm.baum_welch(obs, iterations=1)

In [89]:
print("after learning")
print()
print("learning obs: ", learn_obs)
print()
print("initial:\n", hmm.initial)
print()
print("transmission:\n", hmm.trans)
print()
print("emission:\n", hmm.emis)

after learning

learning obs:  [3 2 1 1 2 3 3]

initial:
 [0.65756278 0.34243722]

transmission:
 [[0.51895955 0.48104045]
 [0.77911806 0.22088194]]

emission:
 [[0.31412435 0.46498682 0.22088882]
 [0.2394082  0.7449199  0.0156719 ]]


Misc.

In [None]:
states = ['hot', 'cold'] 
observation_space = np.array([1, 2, 3])

initial = np.array([.6, .4])
trans = np.array([[0.6, 0.4], [0.5, 0.5]])
emis = [[0.2, 0.4, 0.7], [0.5, 0.3, 0.1]]
emis = np.array(emis)

In [None]:
def get_obs(o):
        #gets the index of the observation
        return np.argwhere(observation_space == o).flatten().item()
        
T = len(obs)
beta = np.zeros((len(states), T))

# initialization

beta[:, T-1] = 1
#print(beta)


for t in range(T-2, -1, -1): 
            o_t1 = get_obs(obs[t+1])
            beta[:, t] = trans.dot(emis[:, o_t1] * beta[:, t+1])

#print(beta)
            
o_0 = get_obs(obs[0])
prob = (initial * beta[:, 0]).dot(emis[:, o_0]) 
prob2 = initial.dot(emis[:, o_0] * beta[:, 0])
print(prob)
print(prob2)

#0.0010070623740799996

In [None]:
states = ['hot', 'cold'] 
observation_space = np.array([1, 2, 3])

initial = np.array([.6, .4])
trans = np.array([[0.6, 0.4], [0.5, 0.5]])
emis = [[0.2, 0.4, 0.7], [0.5, 0.3, 0.1]]
emis = np.array(emis)

In [None]:
print(obs)
o_0 = get_obs(obs[0])
o_0

In [None]:
b = []
b.append(0)
t = 5
for i in range(t-2, -1, -1):
    b.append(i)
    
b

In [None]:
'''print(b)
list(reversed(b))'''

In [None]:
x = emis * emis
print(x)
print()
y = x.sum(axis=0)
print(y)
print()
print(x[:, 1])
print()
print(x[:, :-1].sum(axis=1).reshape(-1, 1))


In [None]:
q = np.random.rand(3, 3, 3)
print(q[0][0])
print(q)



In [None]:
gamma = [[0, 0] for i in range(5)]
gamma = np.array(gamma)
gamma

In [None]:
range(3)