In [1]:
# ice-cream example

states = ('HOT', 'COLD')
symbols = ('1', '2', '3')

start_prob = {
    'HOT': 0.8,
    'COLD': 0.2
}

trans_prob = {
    'HOT': { 'HOT': 0.6, 'COLD': 0.4 },
    'COLD': { 'HOT': 0.5, 'COLD': 0.5 },
}

emit_prob = {
    'HOT': { '1': 0.2, '2': 0.4, '3': 0.4 },
    'COLD': { '1': 0.5, '2': 0.4, '3': 0.1 },
}

sequence = ['3', '1', '3']

In [None]:
sequences = [
    (['HOT', 'HOT', 'COLD', 'COLD', 'COLD'], ['3', '2', '2', '1', '1']),
    (['COLD', 'COLD', 'COLD', 'COLD', 'HOT'], ['1', '2', '1', '1', '2']),
    (['COLD', 'HOT', 'COLD', 'HOT', 'COLD'], ['1', '2', '2', '3', '1']),
    (['HOT', 'HOT', 'HOT', 'HOT', 'COLD'], ['3', '2', '3', '3', '1']),
    (['HOT', 'COLD', 'COLD', 'HOT', 'COLD'], ['2', '1', '1', '3', '1']),
]

In [2]:
from math import log

def _normalize_prob(prob, item_set):
    result = {}
    if prob is None:
        number = len(item_set)
        for item in item_set:
            result[item] = 1.0 / number
    else:
        prob_sum = 0.0
        for item in item_set:
            prob_sum += prob.get(item, 0)
            
        if prob_sum > 0:
            for item in item_set:
                result[item] = prob.get(item, 0) / prob_sum
        else:
            for item in item_set:
                result[item] = 0
                
    return result

def _normalize_prob_two_dim(prob, item_set1, item_set2):
    result = {}
    if prob is None:
        for item in item_set1:
            result[itme] = _normalize_prob(None, item_set2)
    else:
        for item in item_set1:
            result[item] = _normalize_prob(prob.get(item), item_set2)
            
    return result

def _count(item, count):
    if item not in count:
        count[item] = 0
    count[item] += 1
    
def _count_two_dim(item1, item2, count):
    if item1 not in count:
        count[item1] = {}
    _count(item2, count[item1])
    
def _get_init_model(sequences):
    symbol_count = {}
    state_count = {}
    state_symbol_count = {}
    state_start_count = {}
    state_trans_count = {}
    
    for state_list, symbol_list in sequences:
        pre_state = None
        for state, symbol in zip(state_list, symbol_list):
            _count(state, state_count)
            _count(symbol, symbol_count)
            _count_two_dim(state, symbol, state_symbol_count)
            if pre_state is None:
                _count(state, state_start_count)
            else:
                _count_two_dim(pre_state, state, state_trans_count)
            pre_state = state
            
    return Model(state_count.keys(), symbol_count.keys(),
                state_start_count, state_trans_count, state_symbol_count)

def train(sequences, delta=0.0001, smoothing=0):
    """
    Use the given sequences to train a HMM model.
    This method is an implementation of the `EM algorithm
    <http://en.wikipedia.org/wiki/Expectation%E2%80%93maximization_algorithm>`_.
    
    The `delta` argument (which is defaults to 0.0001) specifies that the
    learning algorithm will stop when the difference of the log-likelihood
    between two consecutive iterations is less than delta.
    
    The `smoothing` argument is used to avoid zero probability,
    see :py:meth:`~hmm.Model.learn`.
    """
    
    model = _get_init_model(sequences)
    length = len(sequences)
    
    old_likelihood = 0
    for _, symbol_list in sequences:
        old_likelihood += np.log(model.evaluate(symbol_list))
        
    old_likelihood /= length
    
    while True:
        new_likelihood = 0
        for _, symbol_list in sequences:
            model.learn(symbol_list, smoothing)
            new_likelihood += np.log(model.evaluate(symbol_list))
            
            new_likelihood /= length
            
        if abs(new_likelihood - old_likelihood) < delta:
            break
            
        old_likelihood = new_likelihood
        
    return model

In [None]:
# class Model
    # init(self, states, symbols, start_prob, trans_prob, emit_prob)
    # _forward(self, sequence)
    # _backward(self, sequence)
    # evaluate(self, sequence)
    # decode(self, sequence)
    # learn(self, sequence, smoothing=0)

In [3]:
class Model(object):
    
    def __init__(self, states, symbols, start_prob=None, trans_prob=None, emit_prob=None):
        self._states = set(states)
        self._symbols = set(symbols)
        self._start_prob = _normalize_prob(start_prob, self._states)
        self._trans_prob = _normalize_prob_two_dim(trans_prob, self._states, self._states)
        self._emit_prob = _normalize_prob_two_dim(emit_prob, self._states, self._symbols)
        
    def __repr__(self):
        return "{name}({_states}, {_symbols}, {_start_prob}, {_trans_prob}, {_emit_prob})" \
            .format(name=self.__class__.__name__, **self.__dict__)
        
    def _forward(self, sequence):
        length = len(sequence)
        if length == 0:
            return []
        
        alpha = [{}]
        for state in self._states:
            alpha[0][state] = self._start_prob[state] * self._emit_prob[state][sequence[0]]
            
        for t in range(1, length):
            alpha.append({})
            for state_j in self._states:
                prob_sum = 0
                for state_i in self._states:
                    prob_sum += alpha[t-1][state_i] * self._trans_prob[state_i][state_j] \
                    * self._emit_prob[state_j][sequence[t]]
                alpha[t][state_j] = prob_sum
        
        return alpha
    
    def _backward(self, sequence):
        length = len(sequence)
        if length == 0:
            return []
        
        beta = [{}]
        for state in self._states:
            beta[0][state] = 1
            
        for t in range(length-1, 0, -1):
            beta.insert(0, {})
            for state_i in self._states:
                prob_sum = 0
                for state_j in self._states:
                    prob_sum += self._trans_prob[state_i][state_j] * \
                                self._emit_prob[state_j][sequence[t]] * beta[1][state_j]
                beta[0][state_i] = prob_sum
                
        return beta
    
    def evaluate(self, sequence):
        length = len(sequence)
        if length == 0:
            return 0
        
        prob_sum = 0
        alpha = self._forward(sequence)
        for state in alpha[length-1]:
            prob_sum += alpha[length-1][state]
            
        return prob_sum
    
    def decode(self, sequence):
        length = len(sequence)
        if length == 0:
            return []
        
        viterbi = {}
        for state in self._states:
            viterbi[state] = self._start_prob[state] * self._emit_prob[state][sequence[0]]
        
        # make path {"state_j":"state_i"}
        path = []
        for t in range(1, length):
            viterbi_tmp = {}
            prev_state = {}
            for state_j in self._states:
                max_prob = 0
                max_state = None
                for state_i in self._states:
                    prob = viterbi[state_i] * self._trans_prob[state_i][state_j]
                    if prob > max_prob:
                        max_prob = prob
                        max_state = state_i
                viterbi_tmp[state_j] = max_prob * self._emit_prob[state_j][sequence[t]]
                prev_state[state_j] = max_state
            viterbi = viterbi_tmp
            path.append(prev_state)
        print("path: ", path)
        print("the last viterbi ", viterbi)

        # select the last time state
        max_prob = 0
        max_state = None
        for state in self._states:
            if viterbi[state] > max_prob:
                max_prob = viterbi[state]
                max_state = state
        
        # backtrace
        result = [max_state]
        for t in range(length-1, 0, -1):
            max_state = path[t-1][max_state]
            result.insert(0, max_state)
            
        return result
    
    def learn(self, sequence, smoothing=0):
        length = len(sequence)
        alpha = self._forward(sequence)
        beta = self._backward(sequence)
        
        gamma = []
        for t in range(length):
            prob_sum = 0
            gamma.append({})
            for state in self._states:
                prob = alpha[t][state] * beta[t][state]
                gamma[t][state] = prob
                prob_sum += prob
                
            if prob_sum == 0:
                continue
                
            for state in self._states:
                gamma[t][state] /= prob_sum
                
        xi = []
        for t in range(length-1):
            prob_sum = 0
            xi.append({})
            for state_i in self._states:
                xi[t][state_i] = {}
                for state_j in self._states:
                    prob = alpha[t][state_i] * beta[t+1][state_j] \
                        * self._trans_prob[state_i][state_j] \
                        * self._emit_prob[state_j][sequence[t+1]]
                    xi[t][state_i][state_j] = prob
                    prob_sum += prob
                    
            if prob_sum ==0:
                continue
                    
            for state_i in self._states:
                for state_j in self._states:
                    xi[t][state_i][state_j] /= prob_sum
        
        states_number = len(self._states)
        symbols_number = len(self._symbols)
        for state in self._states:
            # update start probability
            self._start_prob[state] = \
                (smoothing + gamma[0][state]) / (1 + states_number * smoothing)
            
            # update transition probability
            gamma_sum = 0
            for t in range(length-1):
                gamma_sum += gamma[t][state]
            
            if gamma_sum > 0:
                denominator = gamma_sum + states_number * smoothing
                for state_j in self._states:
                    xi_sum = 0
                    for t in range(length-1):
                        xi_sum += xi[t][state][state_j]
                    self._trans_prob[state][state_j] = (smoothing + xi_sum) / denominator
            else:
                for state_j in self._states:
                    self._trans_prob[state][state_j] = 0
        
            # update emission probability
            gamma_sum += gamma[length-1][state]
            emit_gamma_prob = {}
            for symbol in self._symbols:
                emit_gamma_prob[symbol] = 0
                
            for t in range(length):
                emit_gamma_prob[sequence[t]] += gamma[t][state]
                
            if gamma_sum > 0:
                denominator = gamma_sum + symbols_number * smoothing
                for symbol in self._symbols:
                    self._emit_prob[state][symbol] = \
                        (smoothing + emit_gamma_prob[symbol]) / denominator
            else:
                for symbol in self._symbols:
                    self._emit_prob[state][symbol] = 0            

In [None]:
# model = train(sequences, delta=0.9)

In [4]:
model = Model(states, symbols, start_prob, trans_prob, emit_prob)

In [7]:
model._start_prob

{'COLD': 0.2, 'HOT': 0.8}

In [6]:
model._trans_prob

{'COLD': {'COLD': 0.5, 'HOT': 0.5}, 'HOT': {'COLD': 0.4, 'HOT': 0.6}}

In [5]:
model._emit_prob

{'COLD': {'1': 0.5, '2': 0.4, '3': 0.1}, 'HOT': {'1': 0.2, '2': 0.4, '3': 0.4}}

In [8]:
model.decode(sequence)

path:  [{'HOT': 'HOT', 'COLD': 'HOT'}, {'HOT': 'COLD', 'COLD': 'COLD'}]
the last viterbi  {'HOT': 0.012800000000000004, 'COLD': 0.003200000000000001}


['HOT', 'COLD', 'HOT']

In [9]:
model.learn(sequence)

In [10]:
model._start_prob

{'COLD': 0.06337091240109236, 'HOT': 0.9366290875989077}

In [11]:
model._trans_prob

{'COLD': {'COLD': 0.2465897166841553, 'HOT': 0.7534102833158446},
 'HOT': {'COLD': 0.4627994955863806, 'HOT': 0.537200504413619}}

In [12]:
model._emit_prob

{'COLD': {'1': 0.7149962695846804, '2': 0.0, '3': 0.28500373041531957},
 'HOT': {'1': 0.18375568551007146, '2': 0.0, '3': 0.8162443144899284}}

In [13]:
model.decode(sequence)

path:  [{'HOT': 'HOT', 'COLD': 'HOT'}, {'HOT': 'COLD', 'COLD': 'COLD'}]
the last viterbi  {'HOT': 0.1555736982952833, 'COLD': 0.01777910804547891}


['HOT', 'COLD', 'HOT']