In [2]:
import numpy as np
from numpy.linalg import matrix_power
import random as rand 


def my_print(x):
    m=0
    for u in x.ravel() : 
        m=max(m,len(("%.5f" %u).rstrip("0").rstrip(".").lstrip("0")))
    def form(u) :
        return " "*(m-len(("%.5f" %u).rstrip("0").rstrip(".").lstrip("0")))+(("%.5f" %u).rstrip("0").rstrip(".").lstrip("0"))
    np.set_printoptions(linewidth=130,formatter={"float" : lambda u:form(u)})
    print(x)
    np.set_printoptions()

<p><center><h1 style="border:solid;height:45px;padding-top:5px;border-radius:10px;width:400px;color:#AAAAFF;font-size:30px">Markov Models</h1></center></p>

A Markov model is specified by a square $N\times N$ matrix $A$, all of whose entries are non-negative, and all of whose columns sum to $1$.

The matrix $A$ is seen as a transition matrix between a set of states $\{0,\dots, N-1\}$ where the entry at the intersection of the $i$-th row and the $j$-th column is 
$$a_{i,j}=P(\ S_{n+1}=i \ \vert\ S_{n} =j\ )$$

Then the matrix $P=A^k$ has for entries 
$$p_{i,j}=P(\ S_{n+k}=i \ \vert\ S_{n} =j\ ) $$

Let us first write a function that furnishes a sequence of observations of the states that follows the laws above :


In [3]:

def MM_to_observations(A,init,size):
    # A is the transition matrix ( size = N x N )
    # init is the initial state ( integer between 0 and N-1 )
    # size is the size of the sample (integer >0)

    answer=[init]
    N=A.shape[0]
    X=list(range(N))
    for i in range(size) :
        distr=A[:,answer[i]]
        answer.append(rand.choices(X,weights=distr)[0])
    return answer


In [4]:
def test_MM_to_observations():
    A=np.array([[.3,.1,.3],[.2,.3,.2],[.5,.6,.5]])
    print("Our transition matrix is")
    my_print(A)
    print("\nHere is a sample of 15 observations starting at the state 0 :")
    print (MM_to_observations(A,0,15))
    print("\nIf we take a longer sequence, say of 200000 states, we can make the following statistics :")
    t=MM_to_observations(A,1,200000)
    proportions=[]
    for i in range(3) :
        proportions.append(sum([1 for x in t if x==i])/len(t))
        print ("   -> proportion of {}'s : {}".format(i,proportions[i]))
    print("\nThese proportions are to be compared to the value of A^k for k large (here k=100) :")
    my_print(matrix_power(A,100))

test_MM_to_observations()

Our transition matrix is
[[.3 .1 .3]
 [.2 .3 .2]
 [.5 .6 .5]]

Here is a sample of 15 observations starting at the state 0 :
[0, 2, 2, 2, 0, 1, 2, 2, 2, 1, 2, 2, 1, 2, 2, 1]

If we take a longer sequence, say of 200000 states, we can make the following statistics :
   -> proportion of 0's : 0.25495372523137383
   -> proportion of 1's : 0.22158889205553972
   -> proportion of 2's : 0.5234573827130864

These proportions are to be compared to the value of A^k for k large (here k=100) :
[[.25556 .25556 .25556]
 [.22222 .22222 .22222]
 [.52222 .52222 .52222]]


Now let us go in the other direction : we take a (long enough) sequence of observed states, postulated to have been obtain as a sample of a Markov model, as above. What is the matrix $A$ of this Markov Model ?

In [5]:

def observations_to_MM(N,obs):
    #obs is a sequence of integers in the range 0...N-1
    
    answer=np.zeros([N,N])
    for i in range(len(obs)-1) :
        answer[obs[i+1],obs[i]]+=1
    for i in range(N):
        answer[:,i]/=np.sum(answer[:,i])
    return answer

In [6]:
def test_observations_to_MM():
    a=np.array([[.3,.1,.3],[.2,.3,.2],[.5,.6,.5]])
    t=MM_to_observations(a,1,20000)
    new_a=observations_to_MM(3,t)
    print("We compute a sample of 20000 observations from a MM with transition matrix ")
    my_print(a)
    print ("\nThen we estimate the transition matrix from this sample :")
    my_print(new_a)
    print("\nQuite close !!!")

test_observations_to_MM()

We compute a sample of 20000 observations from a MM with transition matrix 
[[.3 .1 .3]
 [.2 .3 .2]
 [.5 .6 .5]]

Then we estimate the transition matrix from this sample :
[[.31025 .09243 .30075]
 [ .2056 .30463  .1935]
 [.48415 .60294 .50575]]

Quite close !!!


<p><center><h1 style="border:solid;height:45px;padding-top:5px;border-radius:10px;width:400px;color:#AAAAFF;font-size:30px">Hidden Markov Models</h1></center></p>

Now we assume there exists a Markov model  with transition matrix $A$ whose $N$ states are unobservable, and that this model emits obvervable signals ranging from $0$ to $M-1$ via an emission matrix $B$ of size $M\times N$ (see the lesson).


The hidden states are written $(S_0,S_1,\dots ,S_n,...)$. The observed signals are written $(O_0,O_1,\dots ,O_n,...)$.

Then the entry at the intersection of the $i$-th row and the $j$-th column of $B$ is 
$$b_{i,j}=P(\ O_n = i\ \vert\ S_n=j)$$

Note that if we write $C=B\times A^k$ (which is an $M\times N$ matrix again) then
$$c_{i,j}=P(\ O_{n+k} = i\ \vert\ S_n=j)$$

Let us first assume $A$ and $B$ are known, and make some experiments.

In [7]:
def HMM_to_sample(A,B,init,size) :
    # A is the transition matrix (NxN)
    # B is the emission matrix (MxN)
    # init is the starting state (an integer in [0 ... N-1])

    hidden = MM_to_observations(A,init,size)
    obs = []
    M=B.shape[0]
    X=list(range(M))
    for i in range(size) :
        distr=B[:,hidden[i]]
        obs.append(rand.choices(X,weights=distr)[0])
    return hidden , obs

In [8]:
def test_HMM_to_sample(show=True) :
    A=np.array([[.3,.1,.2],[.2,.3,.3],[.5,.6,.5]])
    B=np.array([[.1,.4,.9],[.9,.6,.1]])
    if show : print("3 hidden states with transition matrix")
    if show :my_print(A)
    if show : print("\n2 observable states with emission matrix")
    if show :my_print(B)
    if show : print("\nhere is a sample")
    hid,obs=HMM_to_sample(A,B,0,15)
    top_row=    "  hidden :    "
    middle_row1="              "
    middle_row2="              "
    bottom_row= "observed :    "
    for i in range(15) :
        top_row+=    "{} -> ".format(hid[i])
        middle_row1+="|    "
        middle_row2+="v    "
        bottom_row+= "{}    ".format(obs[i])
    to_be_printed=top_row[:-3]+"\n"+middle_row1+"\n"+middle_row2+"\n"+bottom_row
    if show : print(to_be_printed)
    return A,B,hid,obs,to_be_printed

A,B,hid,obs,to_be_printed=test_HMM_to_sample()    

3 hidden states with transition matrix
[[.3 .1 .2]
 [.2 .3 .3]
 [.5 .6 .5]]

2 observable states with emission matrix
[[.1 .4 .9]
 [.9 .6 .1]]

here is a sample
  hidden :    0 -> 2 -> 0 -> 2 -> 2 -> 0 -> 0 -> 2 -> 0 -> 0 -> 0 -> 2 -> 2 -> 1 -> 2 
              |    |    |    |    |    |    |    |    |    |    |    |    |    |    |    
              v    v    v    v    v    v    v    v    v    v    v    v    v    v    v    
observed :    1    0    1    0    1    1    1    1    1    1    1    0    0    0    0    


<p><center><h1 style="border:solid;height:38px;padding-top:5px;border-radius:10px;width:400px;color:#DDAAFF;font-size:20px">The derived Markov model </h1></center></p>

Given such a hidden Markov model $\mathcal H = (A,B,\text{init})$ we can derive a simple Markov model corresponding to the observed states in two ways :
- by taking a long sample of observations and using our function observations_to_MM 
- by directly computing $P(\ [O_{s+1}=j]\ \vert\  [O_{s}=i]\ \wedge\ \mathcal H )$.

Let us first compute the transition probability (to avoid dependency to \text{init} we will assume $s$ is large enough)
$$\begin{array}{ccl}
&&P(\ [O_{s+1}=j]\ \vert\  [O_{s}=i]\ \wedge\ \mathcal H )\\
&&\\
&=&\sum_{k} P(\ [O_{s+1}=j]\ \wedge\ [S_s=k]\ \vert\ [O_{s}=i] \wedge \mathcal H )\\
&&\\
&=&\sum_{k} P(\ [O_{s+1}=j] \ \vert\ [S_s=k]\ \wedge\ [O_{s}=i] \wedge \mathcal H )\times P(\ [S_{s}=k] \ \vert\  [O_{s}=i] \wedge \mathcal H )\\
&&\\
&=&\sum_{k} P(\ [O_{s+1}=j] \ \vert\ [S_s=k]\  \wedge \mathcal H )\times P(\ [O_{s}=i] \ \vert\ [S_{s}=k] \wedge \mathcal H )\frac{P([S_{s}=k]\vert \mathcal H)}{P([O_{s}=i]\vert \mathcal H)}\\
&&\\
&=&\frac 1{P([O_{s}=i]\vert \mathcal H)}\sum_k c_{j,k}b_{i,k}P([S_{s}=k]\vert \mathcal H)
\end{array} $$
Now we just have to compute the values $P([S_{s}=k]\vert \mathcal H)$ but as we have seen above, these are contained in $A^{\infty}$, and the values $P([O_{s}=j]\vert \mathcal H)$ but these are containes in $B*A^\infty$.

Let's compare the result of these two methods.

In [9]:
def HMM_to_MM(A,B) :
    A_infinity=matrix_power(A,100)
    C=np.matmul(B,A)
    M=B.shape[0]
    N=B.shape[1]
    D=np.transpose(np.array([A_infinity[:,0] for _ in range(M)]) * B)
    E=np.matmul(C,D)
    F=np.matmul(B,A_infinity)[:,0]
    E=E/np.array([F for _ in range(M)])
    return E

In [10]:
def test_HMM_to_MM() :
    A=np.array([[.3,.1,.2],[.2,.3,.3],[.5,.6,.5]])
    B=np.array([[.1,.4,.9],[.9,.6,.1]])
    print("3 hidden states with transition matrix")
    my_print(A)
    print("\n2 observable states with emission matrix")
    my_print(B)
    print("\nHere are the transition probabilities between the observed states (100000 observed states):")
    print("\n -> via observations_to_MM :")
    print(observations_to_MM(2,HMM_to_sample(A,B,0,100000)[1]))
    print("\n -> via HMM_to_HM :")
    print(HMM_to_MM(A,B))

test_HMM_to_MM()

3 hidden states with transition matrix
[[.3 .1 .2]
 [.2 .3 .3]
 [.5 .6 .5]]

2 observable states with emission matrix
[[.1 .4 .9]
 [.9 .6 .1]]

Here are the transition probabilities between the observed states (100000 observed states):

 -> via observations_to_MM :


[[0.60418146 0.60764083]
 [0.39581854 0.39235917]]

 -> via HMM_to_HM :
[[0.60387037 0.61117143]
 [0.39612963 0.38882857]]


So the two methods give the same results. The resulting Markov model is called the **derived Markov model**. As we will see (and as a short observation makes seem natural), the knowledge of the hidden Markov Model (even when the hidden states are left unobserved) allow a better prediction of the observation to come than the derived MM.

**This means that when a MM is derived from a HMM, it may be worthwhile to identify this HMM. This is our goal.**

<p><center><h1 style="border:solid;height:38px;padding-top:5px;border-radius:10px;width:400px;color:#DDAAFF;font-size:20px">Computing P(Observations) </h1></center></p>


Let us assume we are given a HMM : $A$ and $B$ are known. We also have an idea of the initial distribution on the hidden states, i.e. we have a vector $$\text{hid}_0=\begin{bmatrix} P(S_0=0)\\ \vdots \\ P(S_0=N-1)\end{bmatrix}$$

Let $\mathcal{H}=(A,B,\text{hid}_0)$ be our knowledge.

We want to compute 

$$P(\ \text{obs}\ \vert\ \mathcal H\ )= P(\ [O_0=k_0]\ \wedge\ [O_1=k_1]  \ \wedge\  \cdots \ \wedge\   [O_n=k_n] \ \vert\ \mathcal{H}\ )$$

We do so by induction, introducing 
$$\alpha_n (i)= P(\ [O_0=k_0] \ \wedge\   \cdots \ \wedge\   [O_n=k_n] \ \wedge\   [S_{n}=i]\ \vert\ \mathcal{H}\ )$$
and 
$$\alpha_n = \begin{bmatrix} \alpha_n(0)& \cdots & \alpha_n(N-1)\end{bmatrix}$$

Then 
$$\alpha_0(i) = P(\ [O_0=k_0] \ \wedge\   [S_{0}=i]\ \vert\ \mathcal{H}\ ) = P(\ [O_0=k_0]\  \vert\ [S_{0}=i]\ \wedge\   \mathcal{H})\times P([S_{0}=i]\ \vert\ \mathcal{H}\ ) = b_{k_0,i}\times \text{hid}_0(i) $$
Which we can write in matrix notation as
$$\alpha_0=e_{k_0}\cdot B\cdot\text{hid}_0$$

Now $$\begin{array}{ccl}
\alpha_1(i)&=& P(O_0=k_0 \ \wedge\   O_1=k_1\ \wedge\  S_{1}=i\ \vert\ \mathcal{H})\\
 &&\\
 &=& P([O_0=k_0] \ \wedge\   [[S_0=0\ \vee\ \dots\ \vee\ S_0=N-1]]\ \wedge\ [O_1=k_1]\ \wedge\  [S_{1}=i]\ \vert\ \mathcal{H})\\
 &&\\
 &=&\sum_{j=0}^{N-1}P([O_0=k_0] \ \wedge\   [S_0=j]\ \wedge\ [O_1=k_1]\ \wedge\  [S_{1}=i]\ \vert\ \mathcal{H})
 \end{array}$$
 and
 $$\begin{array}{ccl}
 &&P([O_0=k_0] \ \wedge\   [S_0=j]\ \wedge\ [O_1=k_1]\ \wedge\  [S_{1}=i]\ \vert\ \mathcal{H})\\
 &=&\\
 &=&P([O_1=k_1]\ \vert\ [O_0=k_0] \ \wedge\   [S_0=j]\ \wedge\  [S_{1}=i] \ \wedge\ \mathcal{H})\times P([O_0=k_0] \ \wedge\   [S_0=j]\ \wedge\  [S_{1}=i])\\
 &=&\\
 &=&P([O_1=k_1]\ \vert\  [S_{1}=i] \ \wedge\ \mathcal{H})\times P([O_0=k_0] \ \wedge\   [S_0=j]\ \wedge\  [S_{1}=i]\ \vert\ \mathcal{H})\\
 &&\\
& =& b_{k_1,i} P([S_{1}=i]\ \vert\ [O_0=k_0] \ \wedge\   [S_0=j]\ \wedge\ \mathcal{H})\\
&&\\
&=& b_{k_1,i} a_{i,j}P([O_0=k_0] \ \wedge\   [S_0=j]\ \wedge\ \mathcal{H})\\
&&\\
& =& b_{k_1,i} a_{i,j}\alpha_0(j) 
\end{array}$$
so in numpy notation, we get the following formula :
$$\alpha_1 = B[k_1,:]*\text{matmul}(A,\alpha_0)$$
and similarly
$$\alpha_{s+1} = B[k_{s+1},:]*\text{matmul}(A,\alpha_s)$$

The implementation of this recursion is often called the **forward algorithm**.

In [11]:
def alpha_pass(a,b,init,obs) :
    # computes alpha_k = P( Obs[:k] and s_k | (a,b,init) )
    alphas=np.zeros([a.shape[0],len(obs)])
    alpha=(b[obs[0],:].ravel())*(init.ravel())
    alphas[:,0]=alpha
    for i in range(1,len(obs)) :
        alpha= b[obs[i],:]*np.matmul(a,alpha) 
        alphas[:,i]=alpha
    return alphas

def forward(a,b,init,obs) :
    return np.sum(alpha_pass(a,b,init,obs), axis=0)

In [12]:
def test_alpha_pass():
    A,B,hid,obs,to_be_printed1=test_HMM_to_sample(show=False)
    print("\n We continue with the previous example :")
    print(to_be_printed1)
    print("""\nLets use alpha_pass on this sample of observations (so the hidden states are assumed to be unobserved) with a uniform initial distribution on the hidden states. We get :\n""")
    init=np.array([.33,.33,.34])
    alphas=alpha_pass(A,B,init,obs)
    my_print(alphas)
    print("\nIf we sum up the columns we get the (result of the forward function which is the list of) probabilities of observing the sequence up to the corresponding rank :\n")
    my_print(np.sum(alphas,axis=0))

test_alpha_pass()


 We continue with the previous example :
  hidden :    0 -> 2 -> 1 -> 2 -> 1 -> 2 -> 2 -> 0 -> 2 -> 2 -> 1 -> 1 -> 1 -> 2 -> 2 
              |    |    |    |    |    |    |    |    |    |    |    |    |    |    |    
              v    v    v    v    v    v    v    v    v    v    v    v    v    v    v    
observed :    1    0    1    0    1    0    0    1    0    0    1    1    1    0    0    

Lets use alpha_pass on this sample of observations (so the hidden states are assumed to be unobserved) with a uniform initial distribution on the hidden states. We get :

[[  .297 .01157 .05382 .00251 .01306 .00061 .00035 .00189 .00009 .00005 .00028 .00012 .00005              ]
 [  .198  .0516 .05673 .01309 .01387 .00319 .00225 .00204 .00047 .00033  .0003  .0001 .00004 .00001 .00001]
 [  .034 .25587 .01647 .06227 .00402 .01518 .00883 .00059 .00222 .00129 .00009 .00004 .00001 .00005 .00003]]

If we sum up the columns we get the (result of the forward function which is the list of) probabilities

We note that the sequence above decreases exponentially fast, which will be a concern when we will use longer samples, but for now, let's forget about it.

We can now compare the results of the forward function with the 3 possible initial states :

In [13]:
def forward_from_possible_inits(A,B,obs) :
    N=A.shape[0]
    inits=np.diag(np.ones(N))
    return np.array([forward(A,B,inits[:,i],obs) for i in range(N)])

def test_forward_from_possible_inits() :
    A,B,hid,obs,to_be_printed1=test_HMM_to_sample(show=False)
    my_print(forward_from_possible_inits(A,B,obs))

test_forward_from_possible_inits()

[[    .9   .504 .20169 .12353 .04911 .03011 .01814 .01097 .00664 .00401 .00159 .00097 .00059 .00035 .00021]
 [    .6   .402 .15924 .09772 .03884 .02382 .01435 .00868 .00525 .00317 .00125 .00077 .00046 .00028 .00017]
 [    .1   .059 .02329 .01429 .00568 .00348  .0021 .00127 .00077 .00046 .00018 .00011 .00007 .00004 .00002]]


So in this example it is clear that the most probable initial state was $S_0=0$ (and indeed, it was our starting hidden value). Can we also guess what is, say, the most probable fifth state ?

<p><center><h1 style="border:solid;height:38px;padding-top:5px;border-radius:10px;width:400px;color:#DDAAFF;font-size:20px">Uncovering the hidden states </h1></center></p>

We will compute $$\gamma_k(i)=P(S_k=i \ \vert\ \text{obs}\ \wedge\ \mathcal H)$$

We have : $$P(S_k=k \ \vert\ \text{obs}\ \wedge\ \mathcal H)=\frac{P(S_k=k\ \wedge\ \text{obs}\ \vert\ \mathcal H)}{P(\text{obs}\ \vert\ \mathcal H)} $$
and 
$$P(S_k=k\ \wedge\ \text{obs}\ \vert\ \mathcal H)=P(\text{obs}_0 \wedge \dots \wedge \text{obs}_k \wedge [S_k=i] \wedge \text{obs}_{k+1}\wedge\dots\wedge\text{obs}_{n}\vert \mathcal H) =P(\text{obs}_0 \wedge \dots \wedge \text{obs}_k \wedge [S_k=i] \vert \mathcal H)\times P(\text{obs}_{k+1} \wedge \dots \wedge \text{obs}_n\  \vert\ [S_k=i] \wedge \mathcal H) $$
(note that it is important here to know that once you know $S_k$, you don't get more information about $O_{k+1}$ if you assume $O_k$, $O_{k-1}$, ... and $S_{k-1}$, $S_{k-2}$ ... are known.)

So writing 
$$\beta_k(i)=P(\text{obs}_{k+1} \wedge \dots \wedge \text{obs}_n\  \vert\ [S_k=i] \wedge \mathcal H)$$
we get
$$P(S_k=k \ \vert\ \text{obs}\ \wedge\ \mathcal H)=\frac{\alpha_k(i)\beta_k(i)}{P(\text{obs}\ \vert\ \mathcal H)}$$
Since the $\alpha$'s are computed (via the alpha_pass function ), and so is $P(\text{obs}\ \vert\ \mathcal H)$ (via the forward function) we are left with computing the $\beta$'s

Once again we will use an induction, but this time starting from the end : indeed, we know 
$$\beta_n(i)=P(O_n=k_n\vert s_{n-1}=i)=(BA)_{k_n,i}$$
and just as we did for the alphas, we can derive the induction formula
$$\beta_{s}(i)=\sum_j b_{k_{s+1},j}a_{j,i}\beta_{s+1}(j)$$
which we can rewrite in numpy notation as
$$\beta_s =\text{matmul}(\ B[k_{s+1},j]*\beta_{s+1}\ ,\ A\ )$$ 


In [14]:
def beta_pass(a,b,obs) :
    
    T=len(obs)
    betas=np.zeros([a.shape[0],T])
    beta=np.matmul(b,a)[obs[T-1],:]
    betas[:,T-2]=beta
    for i in range(1,T-1) :
        beta=np.matmul(b[obs[T-i-1],:]*beta,a)
        betas[:,T-i-2]=beta
    betas[:,T-1]=np.ones(a.shape[0])   #Coherent with the formulas, allows to have an array of the same size as the alphas
    return betas

Now we compute the gammas

In [15]:

def gamma_pass(a,b,init,obs):
    alphas=alpha_pass(a,b,init,obs) 
    betas=beta_pass(a,b,obs)
    tot=np.sum(alphas[:,-1])
    gammas=alphas*betas/tot
    return gammas

In [16]:
def test_gammas() :
    A,B,hid,obs,to_be_printed1=test_HMM_to_sample(show=False)
    print(to_be_printed1)
    my_print(beta_pass(A,B,obs))

    print("\nChoosing an initial distribution of [.33, .33, .34] our gammas are")
    init=np.array([.33,.33,.34])
    gammas=gamma_pass(A,B,init,obs)
    my_print(gammas)
    print("\nSumming up the columns we obtain")
    my_print(np.sum(gammas,axis=0))
    print("which is reassuring : up to now, there's no evidence our code is flawed !")

test_gammas()

  hidden :    0 -> 2 -> 2 -> 2 -> 1 -> 1 -> 1 -> 2 -> 2 -> 2 -> 1 -> 1 -> 1 -> 2 -> 0 
              |    |    |    |    |    |    |    |    |    |    |    |    |    |    |    
              v    v    v    v    v    v    v    v    v    v    v    v    v    v    v    
observed :    1    0    0    0    0    1    1    0    0    0    0    1    0    0    1    
[[.00015 .00024  .0004 .00067 .00137 .00332 .00709 .01173 .01939 .03268 .06208 .13276  .2241    .44      1]
 [.00017 .00029 .00048 .00079 .00096 .00263 .00855 .01413 .02335 .03878 .04919 .15985  .2654    .33      1]
 [.00015 .00026 .00042  .0007 .00123 .00319 .00754 .01247 .02062 .03402 .05973 .14113  .2329    .41      1]]

Choosing an initial distribution of [.33, .33, .34] our gammas are
[[ .5195 .03346 .02861  .0285 .03517 .46331 .41982 .03118 .02846 .02838 .03261 .37957 .03017 .03444 .41767]
 [.41727  .1798 .21781 .21796 .16009 .39635 .44274 .18566 .21781 .21807 .16736 .49339 .18767 .16455 .45081]
 [.06323 .78674 .75359 .75354 .804

Finally, we can "uncover" the hidden states :

In [17]:
def most_probable(A,B,init,obs) :
    gammas=gamma_pass(A,B,init,obs)
    return np.argmax( gammas, axis=0)

In [18]:
def test_most_probable():
    A,B,hid,obs,to_be_printed1=test_HMM_to_sample(show=False)
    print(to_be_printed1)
    print("\nIn this situation, with an initial distribution of [.33, .33, .34], the most probable hidden states are :")
    print(most_probable(A,B,np.array([.33,.33,.34]),obs))
    print("\nLet us estimate the average of good answers on 100 trys :")
    good=0
    N=100
    for i in range(N) :
        A,B,hid,obs,to_be_printed1=test_HMM_to_sample(show=False)
        most=most_probable(A,B,np.array([.33,.33,.34]),obs)
        good+=sum([1 for i in range(len(most)) if hid[i]==most[i]])
    print("average score : ", good/(len(hid)*N))
test_most_probable()

  hidden :    0 -> 2 -> 2 -> 0 -> 0 -> 1 -> 2 -> 2 -> 2 -> 1 -> 2 -> 2 -> 0 -> 2 -> 2 
              |    |    |    |    |    |    |    |    |    |    |    |    |    |    |    
              v    v    v    v    v    v    v    v    v    v    v    v    v    v    v    
observed :    1    0    0    1    1    0    0    0    0    1    0    0    1    0    0    

In this situation, with an initial distribution of [.33, .33, .34], the most probable hidden states are :
[0 2 2 0 1 2 2 2 2 1 2 2 1 2 2]

Let us estimate the average of good answers on 100 trys :
average score :  0.626875


<p><center><h1 style="border:solid;height:38px;padding-top:5px;border-radius:10px;width:400px;color:#DDAAFF;font-size:20px">Estimating  B from obs </h1></center></p>

Another way to check our implementations so far are flawless comes from the following remark :

As the size of the sample goes large, the ratio
$$\frac{\displaystyle\sum_{s\ :\ O_s=i} \gamma_s(j)}{\displaystyle\sum_{s} \gamma_s(j)}$$
should converge to $b_{i,j}$.

Also, this will be useful later : if we start with a given set of observations and a random HMM $\mathcal H_0=(A_0,B_0,\text{init}_0)$, then $\mathcal H_1=(A_0,B_1,\text{init}_0)$ will be a better model (where $B_1$ is the evaluated $B$).

In [19]:
def gammas_and_obs_to_B(N,M,gammas,obs):
    sumgammas=np.sum(gammas,axis=1)
    answer=np.zeros([M,N])
    for i in range(M) :
        mask=np.array([o==i for o in obs ])*1.0
        answer[i,:]=(np.sum(gammas*mask, axis=1)/sumgammas)
    return answer

def test_gammas_and_obs_to_B():
    A=np.array([[.3,.1,.2],[.2,.3,.3],[.5,.6,.5]])
    B=np.array([[.1,.4,.9],[.9,.6,.1]])
    print("3 hidden states with transition matrix")
    my_print(A)
    print("\n2 observable states with emission matrix")
    my_print(B)
    hid,obs=HMM_to_sample(A,B,0,1000)
    gammas=gamma_pass(A,B,np.array([1,0,0]),obs)
    print("\nEstimated B from 1000 observations :")
    print(gammas_and_obs_to_B(3,2,gammas,obs))

test_gammas_and_obs_to_B()

3 hidden states with transition matrix
[[.3 .1 .2]
 [.2 .3 .3]
 [.5 .6 .5]]

2 observable states with emission matrix
[[.1 .4 .9]
 [.9 .6 .1]]

Estimated B from 1000 observations :
[[0.08934776 0.36885024 0.88813168]
 [0.91065224 0.63114976 0.11186832]]


<p><center><h1 style="border:solid;height:38px;padding-top:5px;border-radius:10px;width:400px;color:#DDAAFF;font-size:20px">Estimating A from obs </h1></center></p>

Now we want to do the same with $A$. Let us write $\lambda_s(i,j)=P([S_s=j]\ \wedge \ [S_{s+1}=i] \vert \text{obs} \wedge \mathcal H)$.

Then just as in the function observations_to_MM we see that 
$$\frac{\sum_s \lambda_s(i,j)}{\sum_s \gamma_s(i)}$$
should converge to $a_{i,j}$ as the sample goes large.

Now computations like the ones we did to find a formula for $\alpha$ give :
$$\lambda_s(i,j)=\frac{\beta_{s+1}(i)\times b_{O_{s+1},i}\times a_{i,j}\times \alpha_s(j)}{P(\ \text{obs}\ \vert\ \mathcal H\ )}$$


In [20]:
def lambda_pass(A,B,init,obs) :
    N=A.shape[0]
    T=len(obs)
    alphas=alpha_pass(A,B,init,obs)
    betas=beta_pass(A,B,obs)
    lambdas=np.zeros([N,N,T-1])
    for s in range(T-1) :
        lambdas[:,:,s]=np.transpose((betas[:,s+1]*B[obs[s+1],:])*np.transpose(A*alphas[:,s]))
    probobs=np.sum(alphas[:,-1])
    return lambdas/probobs

def test_lambdas():
    A=np.array([[.3,.1,.2],[.2,.3,.3],[.5,.6,.5]])
    B=np.array([[.1,.4,.9],[.9,.6,.1]])
    print("3 hidden states with transition matrix")
    my_print(A)
    print("\n2 observable states with emission matrix")
    my_print(B)
    hid,obs=HMM_to_sample(A,B,0,30)
    lambdas=lambda_pass(A,B,np.array([0,0,1]),obs)
    print("\n Computing the lambdas, and then suming the (square) slices gives")
    print(np.sum(np.sum(lambdas,axis=0),axis=0))
    print("\n Once again, everything looks all right !")

test_lambdas()



3 hidden states with transition matrix
[[.3 .1 .2]
 [.2 .3 .3]
 [.5 .6 .5]]

2 observable states with emission matrix
[[.1 .4 .9]
 [.9 .6 .1]]

 Computing the lambdas, and then suming the (square) slices gives
[1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1.]

 Once again, everything looks all right !


Now let's estimate $A$ from the lambdas :

In [21]:
def lambdas_to_A(A,B,init,obs) :
    lambdas=lambda_pass(A,B,init,obs)
    gammas=gamma_pass(A,B,init,obs)[:,:-1]
    sumlambd=np.sum(lambdas,axis=2)
    sumgam=np.sum(gammas,1)
    return sumlambd/sumgam, gammas

def test_lambdas_to_A():
    A=np.array([[.3,.1,.2],[.2,.3,.3],[.5,.6,.5]])
    B=np.array([[.1,.4,.9],[.9,.6,.1]])
    print("3 hidden states with transition matrix")
    my_print(A)
    print("\n2 observable states with emission matrix")
    my_print(B)
    init=np.array([0,0,1])
    hid,obs=HMM_to_sample(A,B,0,1000)
    print(r"here is the estimated transition matrix A_est")
    estA,gammas=lambdas_to_A(A,B,init,obs)
    print(estA)

test_lambdas_to_A()


3 hidden states with transition matrix
[[.3 .1 .2]
 [.2 .3 .3]
 [.5 .6 .5]]

2 observable states with emission matrix
[[.1 .4 .9]
 [.9 .6 .1]]
here is the estimated transition matrix A_est
[[0.28644895 0.09465508 0.19535963]
 [0.19555206 0.29283796 0.301387  ]
 [0.517999   0.61250695 0.50325336]]


<p><center><h1 style="border:solid;height:38px;padding-top:5px;border-radius:10px;width:400px;color:#DDAAFF;font-size:20px">The Baum-Welch algorithm</h1></center></p>

Finally we have all the tools to implement the Baum-Welch algorithm.

As we have seen, given a HMM $\mathcal H=(A,B,init)$, we are able, given a large sample of observations, to estimate the matrices $A$ and $B$ and the initial state init.

In other words, given a very very very large sample of observations, $\mathcal H$ is a fixed point of the application
$$\mathcal H\mapsto \text{Estimation} (\mathcal H\ \vert\ obs\wedge \mathcal H)$$

The idea of the Baum-Welch algorithm is that this fixed point should be attractrive (we won't prove this here) so that choosing a random initial HMM $\mathcal H_0$ the sequence defined by
$$\mathcal H_{n+1}=\text{Estimation} (\mathcal H_n\ \vert\ obs\wedge \mathcal H_n)$$
should converge to $\mathcal H$.

In [22]:
def Baum_Welch(A,B,init,obs,n_iter) :
    N=A.shape[0]
    M=B.shape[0]
    #A=(np.ones([N,N])+2*np.diag(np.ones(N)))/(N+2)
    #B=np.ones([M,N])/M
    #init=np.ones(N)/N
    for _ in range(n_iter) :
        A,gammas=lambdas_to_A(A,B,init,obs)
        B=gammas_and_obs_to_B(N,M,gammas,obs[:-1])
        init=gammas[:,0]
        #print (A,"\n\n",B,"\n\n",init,"\n\n")
    return A,B,init

def test_Baum_Welch_1():
    A=np.array([[.3,.1,.2],[.2,.3,.3],[.5,.6,.5]])
    B=np.array([[.1,.4,.9],[.9,.6,.1]])
    print("3 hidden states with transition matrix")
    my_print(A)
    print("\n2 observable states with emission matrix")
    my_print(B)
    init=np.array([0,0,1])
    hid,obs=HMM_to_sample(A,B,0,1000)
    A=np.array([[.4,.1,.2],[.2,.3,.3],[.4,.6,.5]])
    B=np.array([[.1,.4,.8],[.9,.6,.2]])
    estA,estB,estinit=Baum_Welch(A,B,init,obs,1000)
    print(estA,"\n\n",estB,"\n\n",estinit)

def test_Baum_Welch_3():
    A=np.array([[.7,.3],[.3,.7],])
    B=np.array([[.4,.9,],[.6,.1]])
    print("2 hidden states with transition matrix")
    my_print(A)
    print("\n2 observable states with emission matrix")
    my_print(B)
    print("\nDerived MM :")
    my_print(HMM_to_MM(A,B))
    init=np.array([0,1])
    hid,obs=HMM_to_sample(A,B,0,500)
    A=np.array([[.6,.2],[.4,.8],])
    B=np.array([[.3,.8],[.7,.2]])
    print("starting with A,B, init :")
    print(A,"\n\n",B,"\n\n",init)
    
    print("after 100 Baum-Welch steps :\n")
    estA,estB,estinit=Baum_Welch(A,B,init,obs,100)
    
    print("\nEstimated A,B, init:")
    print(estA,"\n\n",estB,"\n\n",estinit)
    print("\nDerived MM :")
    my_print(HMM_to_MM(estA,estB))
    
    print("\nafter 1000 more Baum-Welch steps :")
    estA,estB,estinit=Baum_Welch(estA,estB,estinit,obs,1000)
    
    print("\nEstimated A,B, init:")
    print(estA,"\n\n",estB,"\n\n",estinit)
    print("\nDerived MM :")
    my_print(HMM_to_MM(estA,estB))
    
    

test_Baum_Welch_3()

2 hidden states with transition matrix
[[.7 .3]
 [.3 .7]]

2 observable states with emission matrix
[[.4 .9]
 [.6 .1]]

Derived MM :
[[.68846 .57857]
 [.31154 .42143]]
starting with A,B, init :
[[0.6 0.2]
 [0.4 0.8]] 

 [[0.3 0.8]
 [0.7 0.2]] 

 [0 1]
after 100 Baum-Welch steps :




Estimated A,B, init:
[[0.8451203  0.09869343]
 [0.1548797  0.90130657]] 

 [[0.42389696 0.78460756]
 [0.57610304 0.21539244]] 

 [0. 1.]

Derived MM :
[[.68005 .57932]
 [.31995 .42068]]

after 1000 more Baum-Welch steps :

Estimated A,B, init:
[[0.8901438  0.05260891]
 [0.1098562  0.94739109]] 

 [[0.41809826 0.75161399]
 [0.58190174 0.24838601]] 

 [0. 1.]

Derived MM :
[[.67531 .58638]
 [.32469 .41362]]


<p><center><h1 style="border:solid;height:38px;padding-top:5px;border-radius:10px;width:400px;color:#DDAAFF;font-size:20px">Normalized Baum-Welch algorithm</h1></center></p>

To be able to consider longer sequences of observations we modify slightly the computations inside the BW algorithm, by normalizing the alphas and the betas, which is finally provably transparent.

In [28]:
def Baum_Welch_Norm_step(a,b,init,obs) :
    print (obs)
    N=a.shape[0]
    M=b.shape[0]
    T=len(obs)
    cs=[]

    # compute the alphas
    alphas=np.zeros([a.shape[0],len(obs)])
    alpha=(b[obs[0],:].ravel())*(init.ravel())
    c=np.sum(alpha)
    cs.append(1/c)
    alpha/=c
    alphas[:,0]=alpha
    for i in range(1,len(obs)) :
        alpha= b[obs[i],:]*np.matmul(a,alpha) 
        c=np.sum(alpha)
        cs.append(1/c)
        alpha/=c
        alphas[:,i]=alpha
    

    # Compute the betas
    
    betas=np.zeros([a.shape[0],T])
    beta=np.ones(a.shape[0])*cs[T-1]
    betas[:,T-1]=beta
    for i in range(1,T-1) :
        beta=np.matmul(b[obs[T-i],:]*beta,a)
        beta*=cs[T-i-1]
        betas[:,T-i-1]=beta

    
    #Compute the lambdas
    lambdas=np.zeros([N,N,T-1])
    for s in range(T-1) :
        lambdas[:,:,s]=np.transpose((betas[:,s+1]*b[obs[s+1],:])*np.transpose(a*alphas[:,s]))


    #Compute the gammas
    gammas=np.sum(lambdas, axis=0)

    #Compute new B 
    sumgammas=np.sum(gammas,axis=1)
    new_B=np.zeros([M,N])
    for i in range(M) :
        mask=np.array([o==i for o in obs[:-1] ])*1.0
        new_B[i,:]=(np.sum(gammas*mask, axis=1)/sumgammas)
    
    #Compute new A
    sumlambd=np.sum(lambdas,axis=2)
    new_A=sumlambd/sumgammas
    
    #Compute new init
    new_init=gammas[:,0]

    #Compute log likehood
    log_lik=-1*np.sum(np.log(np.array(cs)))

    return new_A,new_B,new_init,log_lik



In [30]:
api_calls=[]

with open("sample_analysis_data.txt", "r") as f:
    L = f.readlines()
    for l in L :
        api_calls.append(l.split(" "))
    
merged_api_calls = []
for api_call in api_calls :
    merged_api_calls+=api_call

s=set(merged_api_calls)
dict_api_calls = dict()
for api_call,i in zip(list(s),range(len(list(s)))):
    dict_api_calls[api_call]=i

def train_HMM_on_seq_of_api_calls(line_number,n_hidden):
    line_as_numbers=np.array([dict_api_calls[x] for x in api_calls[line_number]])
    est_A=np.array([[.6,.2],[.4,.8],])
    est_B=np.random.randn(2*202).reshape((202,2))
    est_B=np.exp(est_B)
    est_init=np.array([1,0])

    for _ in range(10) :
        est_A,est_B,est_init,log_lik=Baum_Welch_Norm_step(est_A,est_B,est_init,line_as_numbers)
        if _%20==0 :
            print(_, " : " ,log_lik)

train_HMM_on_seq_of_api_calls(0,2)

[153 200  81  48  11 108 189  20 189  60  60  44 142  12  11 108 189  11 108 189  11 108 189  81  60  60  11 108 189  60 146 146
  60  44 142  12 173 200  67  95 184 126 184 173 200   9  98  45  27]
0  :  -2.230909143982432
[153 200  81  48  11 108 189  20 189  60  60  44 142  12  11 108 189  11 108 189  11 108 189  81  60  60  11 108 189  60 146 146
  60  44 142  12 173 200  67  95 184 126 184 173 200   9  98  45  27]
[153 200  81  48  11 108 189  20 189  60  60  44 142  12  11 108 189  11 108 189  11 108 189  81  60  60  11 108 189  60 146 146
  60  44 142  12 173 200  67  95 184 126 184 173 200   9  98  45  27]
[153 200  81  48  11 108 189  20 189  60  60  44 142  12  11 108 189  11 108 189  11 108 189  81  60  60  11 108 189  60 146 146
  60  44 142  12 173 200  67  95 184 126 184 173 200   9  98  45  27]
[153 200  81  48  11 108 189  20 189  60  60  44 142  12  11 108 189  11 108 189  11 108 189  81  60  60  11 108 189  60 146 146
  60  44 142  12 173 200  67  95 184 126 184 173 2

  cs.append(1/c)
  alpha/=c
  beta=np.matmul(b[obs[T-i],:]*beta,a)
  lambdas[:,:,s]=np.transpose((betas[:,s+1]*b[obs[s+1],:])*np.transpose(a*alphas[:,s]))


In [25]:
def test_Baum_Welch_norm():
    A=np.array([[.7,.3],[.3,.7],])
    B=np.array([[.4,.9,],[.6,.1]])
    print("2 hidden states with transition matrix")
    my_print(A)
    print("\n2 observable states with emission matrix")
    my_print(B)
    print("\nDerived MM :")
    my_print(HMM_to_MM(A,B))
    init=np.array([0,1])
    hid,obs=HMM_to_sample(A,B,0,10000)
    A=np.array([[.6,.2],[.4,.8],])
    B=np.array([[.3,.8],[.7,.2]])
    print("starting with A,B, init :")
    print(A,"\n\n",B,"\n\n",init)
    
    estA,estB,estinit = A,B,init
    print("let's perform 1000 Baum-Welch steps :\n")
    for _ in range(1000) :
        estA,estB,estinit,log_lik=Baum_Welch_Norm_step(estA,estB,estinit,obs)
        if _%20==0 :
            print(_, " : " ,log_lik)
              
    print("\nEstimated A,B, init:")
    print(estA,"\n\n",estB,"\n\n",estinit)
    print("\nDerived MM :")
    my_print(HMM_to_MM(estA,estB))
    
    
    
    

test_Baum_Welch_norm()

2 hidden states with transition matrix
[[.7 .3]
 [.3 .7]]

2 observable states with emission matrix
[[.4 .9]
 [.6 .1]]

Derived MM :
[[.68846 .57857]
 [.31154 .42143]]
starting with A,B, init :
[[0.6 0.2]
 [0.4 0.8]] 

 [[0.3 0.8]
 [0.7 0.2]] 

 [0 1]
let's perform 1000 Baum-Welch steps :

0  :  -6408.921139171808
20  :  -6402.39334860554
40  :  -6402.305048381193
60  :  -6402.238774360075
80  :  -6402.187519175388
100  :  -6402.146808045495
120  :  -6402.113709352505
140  :  -6402.086258416671
160  :  -6402.063108872944
180  :  -6402.043317255717
200  :  -6402.026207323153
220  :  -6402.011283135373
240  :  -6401.998172506434
260  :  -6401.986589694684
280  :  -6401.976310457141
300  :  -6401.967155152601
320  :  -6401.958977144952
340  :  -6401.951654733661
360  :  -6401.945085454932
380  :  -6401.939181991924
400  :  -6401.933869188169
420  :  -6401.929081825547
440  :  -6401.924762938366
460  :  -6401.920862508551
480  :  -6401.917336435576


KeyboardInterrupt: 