In [1]:
import numpy as np
import pandas as pd
import pickle
import time

In [27]:
class HMM:
    """
    Order 1 Hidden Markov Model
    Attributes
    ----------
    A : numpy.ndarray
        State transition probability matrix
    B: numpy.ndarray
        Output emission probability matrix with shape(N, number of output types)
    pi: numpy.ndarray
        Initial state probablity vector
    Common Variables
    ----------------
    obs_seq : list of int
        list of observations (represented as ints corresponding to output
        indexes in B) in order of appearance
    T : int
        number of observations in an observation sequence
    N : int
        number of states
    """

    def __init__(self, A, B, pi):        
        if np.any(A<0) | np.any(B<0) | np.any(pi<0) | np.any(A>1) | np.any(B>1) | np.any(pi>1):
            return("Les paramètres initiaux ne sont pas corrects (des probabilités sont comprises entre 0 et 1)")
        
        self.A = A
        self.B = B
        self.pi = pi
        

    def forward(self, Y):
        N = self.A.shape[0]
        T = len(Y)

        alpha = np.zeros([N,T])
        alpha[:,0] = self.pi * self.B[:, Y[0]]

        for t in range(1,T):
            for i in range(N):
                alpha[i,t] = self.B[i, Y[t]] * np.sum(alpha[:,t-1] * self.A[:,i])

        return alpha

    def backward(self, Y):
        N = self.A.shape[0]
        T = len(Y)

        beta = np.zeros([N,T])
        beta[:,-1] = 1

        for t in range(T-2,-1,-1):
            for i in range(N):
                beta[i,t] = np.sum(beta[:,t+1] * self.A[i,:] * self.B[:, Y[t+1]])

        return beta

    def baum_welch_train(self, Y):
        N = self.A.shape[0]
        T = len(Y)

        # Effectuons les etapes de forward et backward
        alpha = self.forward(Y)
        beta = self.backward(Y)

        # Stockons P( Y | theta )
        Y_proba = np.sum(alpha[:,-1])

        # Calculons gamma
        gamma = alpha * beta / Y_proba
        
        # Calculons xi
        xi = np.zeros([T-1, N, N])
        for t in range(T-1):
            xi[t,:,:] = (alpha[:,t] * self.A.T).T * beta[:, t+1] * self.B[:,Y[t+1]] / Y_proba


            
            
        self.pi = gamma[:,0]
        self.A = np.sum(xi,axis = 0)/np.sum(gamma[:,:-1],axis = 1)
        
        # Problème d'indices ici
        for i in range(N):
            for t1 in range(T):
                self.B[i,t1] = 0
                ind = np.argwhere(Y == t1)
                self.B += gamma[i,ind]
                self.B[i,t1]/= np.sum(gamma[i,:])
        
        def viterbi(self, Y):
            """
            Returns
            -------
            V : numpy.ndarray
            V [s][t] = Maximum probability of an observation sequence ending
                       at time 't' with final state 's'
            prev : numpy.ndarray
            Contains a pointer to the previous state at t-1 that maximizes
            V[state][t]
            """
            N = self.A.shape[0]
            T = len(Y)
            prev = np.zeros((T - 1, N), dtype=int)

            # DP matrix containing max likelihood of state at a given time
            V = np.zeros((N, T))
            V[:,0] = self.pi * self.B[:,Y[0]]

            for t in range(1, T):
                for n in range(N):
                    seq_probs = V[:,t-1] * self.A[:,n] * self.B[n, Y[t]]
                    prev[t-1,n] = np.argmax(seq_probs)
                    V[n,t] = np.max(seq_probs)

            return V, prev

In [3]:
path = 'typos-data/'

# Données avec 10% de typos
train10 = pickle.load(open(path+'train10.pkl', 'rb'))
test10 = pickle.load(open(path+'test10.pkl', 'rb'))

# Données avec 20% de typos
train20 = pickle.load(open(path+'train20.pkl', 'rb'))
test20 = pickle.load(open(path+'test20.pkl', 'rb'))

train = train10
test = test10

tot = len(train + test)
print ("Nombre de phrases totales = " + str(tot))
print ("Nombre de phrases de train = " + str(len(train)))
print ("Nombre de phrases de test  = " + str(len(test)))

Nombre de phrases totales = 30558
Nombre de phrases de train = 29057
Nombre de phrases de test  = 1501


In [4]:
temp = [[letter[0] for letter in word] for word in train10]
texte = [letter for word in temp for letter in word]
texte[:17]

['b',
 'y',
 't',
 'h',
 'e',
 'i',
 'r',
 'o',
 'w',
 'n',
 'a',
 'c',
 'v',
 'o',
 'u',
 'n',
 't']

In [5]:
alphabet = {'a':0,
            'b':1,
           'c':2,
           'd':3,
           'e':4,
           'f':5,
           'g':6,
           'h':7,
           'i':8,
           'j':9,
           'k':10,
           'l':11,
           'm':12,
           'n':13,
           'o':14,
           'p':15,
           'q':16,
           'r':17,
           's':18,
           't':19,
           'u':20,
           'v':21,
           'w':22,
           'x':23,
           'y':24,
           'z':25}

In [15]:
dic_len = len(np.unique(texte))
texte_len = len(texte)
print(dic_len)
print(texte_len)

26
143168


In [16]:
A = np.ones([dic_len,dic_len])/dic_len
B = np.ones([dic_len,texte_len])/dic_len
pi = np.ones(dic_len)/dic_len

In [17]:
train_input = [alphabet[letter] for letter in texte]
train_input[:10]

[1, 24, 19, 7, 4, 8, 17, 14, 22, 13]

In [None]:
hmm = HMM(A=A, 
          B=B,
          pi = pi)

hmm.baum_welch_train(train_input)