# Backward algorithm

(It is recommended to first go through the forward-algorithm notebook, which introduces the notation and the logic behind these algorithms.)

This is a recursive algorithm that, like the forward algorithm, computes $$ p(x_1,...,x_T)$$ in time polynomial in $T$.

It uses the quantity $\beta(t,i)$ in a backward-recursive way, with initial value $$ \beta(t=T, i)=1\,, \forall i$$ Note that the "initial" value occurs at $t=T$ in this algorithm. The backward recursion reads $$ \beta(t, i) = \sum_{j=1}^M A(i,j)B(j, x_{t+1})\beta(t+1, j)\,, \quad t = T-1,...,1$$
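For a single, fixed observation sequence the recursion can be written with plain matrix-vector products. The following is a minimal sketch, not the implementation used further down in this notebook: the function name `backward_betas` and the convention that observations are given as 0-based column indices of $B$ are assumptions made for the illustration.

```python
import numpy as np

def backward_betas(A, B, xs):
    """Return an array of shape (T, M); row k holds beta(t=k+1, :).

    xs is assumed to be ordered as [x_1, ..., x_T] and to contain
    0-based column indices of B (an assumption of this sketch).
    """
    A = np.asarray(A, dtype=float)
    B = np.asarray(B, dtype=float)
    T, M = len(xs), A.shape[0]
    betas = np.ones((T, M))  # last row is the initial value beta(T, i) = 1
    for k in range(T - 2, -1, -1):
        # row k from row k+1: sum_j A(i, j) * B(j, next observation) * beta_next(j)
        betas[k] = A @ (B[:, xs[k + 1]] * betas[k + 1])
    return betas

A = np.array([[0.25, 0.25, 0.5], [0.4, 0.2, 0.4], [0.75, 0.2, 0.05]])
B = np.array([[0.1, 0.9], [0.35, 0.65], [0.0, 1.0]])

# two observations, both equal to the first x-state
betas = backward_betas(A, B, [0, 0])
print(betas)
```

Each step costs one $M \times M$ matrix-vector product, so the whole table is filled in $O(T M^2)$ operations.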

This makes $\beta$ equal to $$ \beta(t,i) = p(x_{t+1},...,x_T| z_t = i) $$ and $$ p(x_1,...,x_T) = \sum_{z_1=1}^M \pi(z_1) B(z_1, x_1) \beta(t=1, z_1)$$
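To see why the recursion produces this conditional probability, one can marginalise over $z_{t+1}$ and use the conditional-independence structure of the hidden Markov model: given $z_{t+1}$, the observations $x_{t+1},...,x_T$ do not depend on $z_t$, and $x_{t+1}$ does not depend on the later observations. A sketch of the step, under these standard assumptions:

$$\begin{aligned}
p(x_{t+1},...,x_T| z_t = i) &= \sum_{j=1}^M p(x_{t+1},...,x_T, z_{t+1}=j| z_t = i) \\
&= \sum_{j=1}^M p(z_{t+1}=j| z_t = i)\, p(x_{t+1}| z_{t+1}=j)\, p(x_{t+2},...,x_T| z_{t+1}=j) \\
&= \sum_{j=1}^M A(i,j)\, B(j, x_{t+1})\, \beta(t+1, j)
\end{aligned}$$

The same factorisation applied at $t=1$, $p(x_1,...,x_T) = \sum_{z_1} p(z_1)\, p(x_1| z_1)\, p(x_2,...,x_T| z_1)$, gives the expression for $p(x_1,...,x_T)$ above.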

Recall that the expression for $p(x_1,...,x_T)$ can be written in terms of the transition matrix $A_{ij} = p(z_{t+1}=j| z_t =i)$, the emission matrix $B_{jk} = p(x_t=k| z_t = j)$ and the initial probabilities $\pi_j = p(z_1=j)$: $$ p(x_1,...,x_T) = \sum_{z_1=1}^M ...\sum_{z_T=1}^M \pi_{z_1} B_{z_1, x_1} \prod_{t=1}^{T-1} A_{z_t, z_{t+1}} B_{z_{t+1}, x_{t+1}}$$ but this explicit computation involves $M^T$ terms, so it scales exponentially with $T$.
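The explicit sum can be sketched directly, which makes the exponential cost visible: the loop below visits all $M^T$ hidden paths. This is only an illustration for very short sequences; `brute_force_probability` is a hypothetical helper, and the observations are assumed to be 0-based column indices of $B$ ordered as $[x_1,...,x_T]$.

```python
import itertools
import numpy as np

def brute_force_probability(pi, A, B, xs):
    """Sum the joint p(x_1..x_T, z_1..z_T) over all M**T hidden paths."""
    pi = np.asarray(pi, dtype=float).ravel()
    A = np.asarray(A, dtype=float)
    B = np.asarray(B, dtype=float)
    M, T = A.shape[0], len(xs)
    total = 0.0
    for zs in itertools.product(range(M), repeat=T):
        # pi_{z_1} B_{z_1, x_1} times the product of A_{z_t, z_{t+1}} B_{z_{t+1}, x_{t+1}}
        term = pi[zs[0]] * B[zs[0], xs[0]]
        for t in range(T - 1):
            term *= A[zs[t], zs[t + 1]] * B[zs[t + 1], xs[t + 1]]
        total += term
    return total

pi = np.array([0.1, 0.1, 0.8])
A = np.array([[0.25, 0.25, 0.5], [0.4, 0.2, 0.4], [0.75, 0.2, 0.05]])
B = np.array([[0.1, 0.9], [0.35, 0.65], [0.0, 1.0]])

# probability that both observations equal the first x-state
p = brute_force_probability(pi, A, B, [0, 0])
print(p)  # approximately 0.004975
```

With $M=3$ the loop already visits $3^{20} \approx 3.5 \times 10^9$ paths for $T=20$, which is why the recursive formulation matters.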

In [5]:
import numpy as np
from typing import List, Union

In [6]:
pi = np.array([0.1, 0.1, 0.8]).reshape(3,1) # 3x1
B = np.array([[0.1, 0.9],[0.35, 0.65],[0., 1.]]) # 3x2
A = np.array([[0.25, 0.25, 0.5], [0.4, 0.2, 0.4], [0.75, 0.2, 0.05]]) #3x3

In [44]:
# assume there exist matrices A and B and also vector \pi
# dim(A) = M x M, dim(B) = M x K, and dim(\pi) = M x 1
# for p(x1, ..., xT), the xs are fixed. This boils down to picking a column of the resulting matrix
def beta_initial(A_dim: int) -> np.ndarray:
    '''
    Computes the value of beta associated to initial time t=T

    Parameters
        A_dim: int
            dimension of matrix A (namely, M in the above notation)

    Return
        np.ndarray: initial value of beta (namely, beta(t=T)), a column vector of shape (M, 1)
    '''
    return np.ones(shape=(A_dim, 1)) # column vector of ones (one component per each z-state)

def beta_recurrent(
                    A: Union[List, np.ndarray], 
                    B: Union[List, np.ndarray], 
                    sequence_length: int, 
                    beta_ini: np.ndarray
                    ) -> List[np.ndarray]:
    '''
    Computes the list of betas following the (recursive) backward algorithm

    Parameters
        A:  Union[List, np.ndarray]
            time-independent matrix of transition probabilities p(z_{t+1}|z_t), with dimension MxM
        B:  Union[List, np.ndarray]
            time-independent matrix of emission probabilities p(x_t|z_t) for each x- and z-states, with dimension MxK
        sequence_length: int
            The length T of the input sequence of xs whose joint probability we want to compute
        beta_ini: np.ndarray
            The value of beta(t=T, z_T), with dimension Mx1
    Return
        List[np.ndarray]: list of betas of the form 
        [beta(t=1, z_1),..., beta(t=T-1, z_{T-1}), beta(t=T, z_T)]
        (note that each beta has a different dimensionality as a tensor:
        beta(t) has shape (M, K,...,K, 1) with T-t axes of length K, ordered as x_{t+1},...,x_T)
    '''
    A = np.array(A)
    B = np.array(B)
    assert isinstance(sequence_length, int), 'the length of the sequence must be an integer number'
    assert beta_ini.shape[0] == A.shape[0], 'the length of the first axis of beta_ini must be the rows of A'

    # initialize betas list
    betas_list = [0]*sequence_length
    betas_list[-1] = beta_ini 
    for t_ in reversed(range(1,sequence_length)):
        # ellipsis ... stands for "any untouched extra dimension"
        # betas_list[k] stores beta(t=k+1) because Python indexes from 0,
        # so betas_list[t_-1] (beta at time t_) is built from betas_list[t_]
        # (beta at time t_+1). reversed(range(1,sequence_length)) runs t_
        # from sequence_length-1 down to 1; the last entry already holds beta_ini.
        # The new axis s (the state of x_{t_+1}) is placed right after the z axis
        betas_list[t_-1] = np.einsum('ij,js,j...->is...', A, B, betas_list[t_])
        # note that index associated to z is first index of beta
        # (this is so also for beta_ini)
    return betas_list

def get_general_probability_from_beta(
                                      beta: np.ndarray, 
                                      pi: np.ndarray, 
                                      B:np.ndarray
                                      ) -> np.ndarray:
    '''
    Computes the probability of a generic sequence of x states given a beta 
    (this beta is supposed to correspond to beta(t=1, z_1))

    Parameters:
        beta: np.ndarray
            A tensor with dimensions (M, K, K,..., K, 1)
        pi: np.ndarray
            A vector with dimension Mx1
        B: np.ndarray
            A matrix with dimension MxK
    Returns:
        np.ndarray:
            A tensor with dimension (K, K,...,K)
    '''
    # compute sum_z1 pi(z1) B(z1,x1) beta(t=1,z1)
    # returns probability for arbitrary set of x
    # (note the ordering: the axes j... of the result correspond to x_1, x_2, ..., x_T)
    # pi and beta_ini are both Mx1, so pi carries a dummy axis s and beta a
    # trailing dummy axis l, both of length 1. Leaving s and l out of the
    # output subscripts "sums" over them, which removes them.
    # Writing beta as 'i...l' (rather than 'i...ml') gives the same result
    # for T>=2 and also covers T=1, where beta is just Mx1
    return np.einsum('is,ij,i...l->j...', pi, B, beta)

def get_probability_sequence(
                            pi: Union[List, np.ndarray], 
                            A: Union[List, np.ndarray], 
                            B: Union[List, np.ndarray], 
                            xs_sequence: Union[List, np.ndarray]
                            ) -> float:
    ''' 
    Computes the probability of a specific sequence of x states

    Parameters:
        pi: Union[List, np.ndarray]
            Initial vector of probabilities (i.e. p(z_1)) for each possible state of z_1
        A:  Union[List, np.ndarray]
            time-independent matrix of transition probabilities p(z_{t+1}|z_t), with dimension MxM
        B:  Union[List, np.ndarray]
            time-independent matrix of emission probabilities p(x_t|z_t) for each x- and z-states, with dimension MxK
        xs_sequence: Union[List, np.ndarray]
            Full list of x-states for which to compute the probability,
            with states labelled 1,...,K.
            The order must be [x_1, x_2,..., x_T]

    Returns:
        float: The probability of the input sequence.
    '''
    # xs_sequence must be ordered as [x_1, x_2,..., x_T]
    A = np.array(A)
    B = np.array(B)
    xs_sequence = np.array(xs_sequence)

    assert A.shape[0] == A.shape[1], 'A must be a square matrix'
    assert A.shape[0] == B.shape[0], 'number of rows of A and B must be the same'
    
    pi = np.array(pi).reshape(-1, 1)  # reshape(-1, 1) also works when pi is passed as a plain list
    assert pi.shape[0] == B.shape[0], 'number of rows of pi and B must be the same'
    # x_t states are labelled from 1 to K (a 0 would silently wrap around to index -1 below)
    assert ((xs_sequence >= 1) & (xs_sequence <= B.shape[1])).all(), 'states in the input sequence must lie between 1 and the number of columns of B'
    
    T_ = len(xs_sequence)
    M_ = B.shape[0]
    K_ = B.shape[1]

    # compute initial state
    beta_ini = beta_initial(A.shape[0])
    # get sequence of betas for arbitrary xs
    beta_list = beta_recurrent(A, B, T_, beta_ini)
    # get general probability for first beta from list
    # dim(general_prob) = (K,K,...K)
    general_prob = get_general_probability_from_beta(beta_list[0], pi, B)
    # indices of general_prob start at 0 but x states are labelled 1,...,K;
    # that is why we use xs_sequence-1.
    # pick the x-index for each K-dimension (axes are ordered x_1,...,x_T)
    return general_prob[tuple(xs_sequence-1)]

In [42]:
beta_l = beta_recurrent(A, B, 2, beta_initial(A.shape[0]))

In [29]:
beta_l[-1].shape

(3, 1)

In [43]:
get_general_probability_from_beta(beta_l[0], pi, B)

array([[0.004975, 0.040025],
       [0.133275, 0.821725]])
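The four entries form the joint distribution $p(x_1, x_2)$, with rows indexed by $x_1$ and columns by $x_2$. A quick sanity check, sketched below with the printed values copied by hand: the table should sum to one, and summing out $x_2$ should recover $p(x_1) = \sum_{z_1} \pi(z_1) B(z_1, x_1)$.

```python
import numpy as np

pi = np.array([0.1, 0.1, 0.8])
B = np.array([[0.1, 0.9], [0.35, 0.65], [0.0, 1.0]])

# values copied from the output above: rows are x_1, columns are x_2
joint = np.array([[0.004975, 0.040025],
                  [0.133275, 0.821725]])

total = joint.sum()              # should be 1 up to rounding
marginal_x1 = joint.sum(axis=1)  # p(x_1), obtained by summing out x_2
expected_x1 = pi @ B             # sum_z pi(z) B(z, x_1)

print(total, marginal_x1, expected_x1)
```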

In [47]:
get_probability_sequence(pi, A, B, list(reversed([1,2,1,1,2,2])))

0.0012056062545468752

The above number coincides with the result that we get by applying the forward algorithm (see notebook on forward algorithm). Note that the input list of xs is reversed with respect to the one passed to the forward algorithm: the function above indexes the probability tensor as $(x_1,...,x_T)$, so it takes the sequence in the order $[x_1,...,x_T]$.
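For an independent check that does not require the other notebook, the forward recursion can be sketched for a single fixed sequence. This is a minimal illustration, not the forward-notebook implementation: `forward_probability` is a hypothetical name, $\alpha(t,i)$ is assumed to be $p(x_1,...,x_t, z_t=i)$, and the sequence is passed in the natural order $[x_1,...,x_T]$, i.e. the list `[2, 2, 1, 1, 2, 1]` that the reversed call above produces.

```python
import numpy as np

def forward_probability(pi, A, B, xs):
    """p(x_1, ..., x_T) via the forward recursion for one fixed sequence.

    xs is assumed to be ordered [x_1, ..., x_T] with states labelled 1..K.
    """
    pi = np.asarray(pi, dtype=float).ravel()
    A = np.asarray(A, dtype=float)
    B = np.asarray(B, dtype=float)
    obs = np.asarray(xs) - 1           # shift labels 1..K to 0-based columns of B
    alpha = pi * B[:, obs[0]]          # alpha(1, i) = pi(i) B(i, x_1)
    for x in obs[1:]:
        # alpha(t+1, j) = B(j, x_{t+1}) * sum_i alpha(t, i) A(i, j)
        alpha = (alpha @ A) * B[:, x]
    return float(alpha.sum())

pi = np.array([0.1, 0.1, 0.8])
A = np.array([[0.25, 0.25, 0.5], [0.4, 0.2, 0.4], [0.75, 0.2, 0.05]])
B = np.array([[0.1, 0.9], [0.35, 0.65], [0.0, 1.0]])

p = forward_probability(pi, A, B, [2, 2, 1, 1, 2, 1])
print(p)  # approximately 0.0012056
```

Because it only tracks one vector of length $M$, this version costs $O(T M^2)$ and avoids building the $K^T$-entry tensor over all possible sequences.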