In [1]:
import numpy as np

In [2]:
def forward_HMMs(O, A, B, pi):
    
    # O là chuỗi quan sát (observations)
    # A là ma trận xác suất chuyển trạng thái (transition_probability_matrix)
    # B là ma trận xác suất bộc phát (emission_probabilities) 
    # pi là phân phối xác suất khởi đầu
    
    N = A.shape[0] # số trạng thái có thể xảy ra
    T = O.shape[0] # độ dài của chuỗi quan sát
    
    a = np.zeros((T, N)) # Khởi tạo ma trận với a[t][j] là xác suất ở trạng thái ẩn j sau t dấu hiệu quan sát được đầu tiên
    
    a[0, :] = pi[:]*B[:, O[0]] # Khởi tạo các giá trị của trạng thái t = 0               
        
    for t in range (1, T):
        for j in range (0, N):
            a[t, j] = np.dot(a[t-1, :], A[:, j]) * B[j, O[t]] # thao tác cơ sở
            
    forward_prob = np.sum(a[T-1, :]) # độ hợp lý của chuỗi quan sát
    
    return forward_prob

In [3]:
# Đây là hàm trả về xác suất xảy ra quan sát khi biết trạng thái
def probOfObservationGivenState(B, index_observation, state, o, Q):
    # B là ma trận xác suất bộc phát (emission_probability_matrix)
    # o là tập các quan sát ban đầu đã biết
    # Q là tập trạng thái ban đầu đã biết
    # observation và state dùng để tính xác suất xảy ra quan sát observation ở trạng thái state
    index_state = Q.index(state)
    return B[index_state, index_observation]

def viterbi_HMMs(O, A, B, pi, Q, o):
    # O là chuỗi quan sát đầu vào (observations)
    # A là ma trận xác suất chuyển đổi trạng thái (transition_probability_matrix)
    # pi là phân phối xác suất khởi đầu (initial_distribution)
    # B là ma trận xác suất bộc phát (emission_probability_matrix)
    # Q là tập các trạng thái đã cho
    # o là tập quan sát đã cho
    hidden_states = [] # chuỗi trạng thái cần tìm


    prev_state = -1
    for index, observation in enumerate(O):
        index_observation = o.index(O[index])   
        delta = []
        for s in range(len(Q)):
            if index == 0:
                transition_prob = pi[s]
            else:
                transition_prob = A[prev_state, s]

            emission_prob = probOfObservationGivenState(B, index_observation, Q[s], o, Q) 
            delta.append(emission_prob*transition_prob)
            
        delta_max = max(delta)
        prev_state = delta.index(delta_max)
        hidden_states.append(Q[prev_state])
    
    return hidden_states

In [11]:
def forward_backward(O, A, B, pi): 
    N = A.shape[0] # Số trạng thái 
    T = O.shape[0] # Số dữ liệu quan sát được 

  # Tính ma trận alpha 
    alpha = np.zeros((T, N))  
    alpha[0, :] = pi[:] * B [:, O[0]]
    for t in range(1, T): 
        for j in range(N): 
            for i in range(N): 
                alpha[t, j] += alpha[t - 1, i] * A[i, j] * B[j, O[t]]
  
  # Tính ma trận beta 
    beta = np.zeros((T, N)) 
    beta[-1] = np.ones(N) 
    for t in range(T - 2, -1, -1): 
        for i in range(N): 
            for j in range(N):  
                beta[t, i] += A[i, j] * B[j, O[t + 1]] * beta[t + 1, j]
    return alpha, beta 

def baum_welch(O, num_states, num_obs, num_iter): 
  # Khởi tạo các giá trị ngẫu nhiên cho ma trận A, B 
    A = np.random.rand(num_states, num_states)
    B = np.random.rand(num_states, num_obs)
    A /= np.sum(A, 1).reshape(-1, 1)
    B /= np.sum(B, 1).reshape(-1, 1)
    pi = np.ones(num_states)
    pi /= np.sum(pi)  

    N = A.shape[0] # Số trạng thái 
    T = O.shape[0] # Số dữ liệu quan sát được 
    M = B.shape[1] # Số dấu hiệu 

    forward_prob = -1 # Biến lưu lại forward probability 

    for iter in range(num_iter): 

        # Tính ma trận alpha, beta
        alpha, beta = forward_backward(O, A, B, pi) 

        # E-step 
        # Tính ma trận xi 
        xi = np.zeros((N, N, T - 1))
        for t in range(T - 1): 
            denominator = np.dot(alpha[t].T, beta[t]) 
            for i in range(N): 
                numerator = alpha[t, i] * A[i, :] * B[:, O[t + 1]] * beta[t + 1] 
                xi[i, :, t] = numerator / denominator 
        
        # Tính ma trận gamma 
        mult_ab = alpha * beta
        gamma = mult_ab / np.sum(mult_ab, axis = 1).reshape((T, 1))

        # M-step
        # Cập nhật ma trận A 
        numerator_A = np.sum(xi, axis = 2) 
        denominator_A = np.sum(numerator_A, axis = 1).reshape((N, 1))
        A = numerator_A / denominator_A

        # Cập nhật ma trận B 
        denominator_B = np.sum(gamma, axis = 0)
        for i in range(M): 
            B[:, i] = np.sum(gamma[O == i, :], axis = 0) / denominator_B

        forward_prob = np.sum(alpha[-1]) 
    return A, B, forward_prob 

# Ánh xạ các trạng thái và dấu hiệu sang số 
def cvIndexSO(S, O, Q, o): 
    rS = np.array([Q.index(_) for _ in S])
    rO = np.array([o.index(_) for _ in O])
    return rS, rO

# Phần  cài đặt
Câu 2:

a)

Theo đề bài ta có, người chơi không biết anh Huy đã tung viên nào, chỉ biết lần tung đó ra mặt nào nên dựa vào phần lý thuyết về mô hình Markov ẩn dễ dàng ta nhận ra rằng:

* Loại xúc xắc anh Huy tung chính là trạng thái (state) và nó là dữ kiện ẩn. Do đó, gọi $q_t \in \{Imbalanced, Balanced\}$ là loại xúc xắc mà anh Huy tung tại thời điểm $t$ với $q_t = Balanced$ chính là **viên xúc xắc cân bằng** và ngược lại là **viên xúc xắc lỗi**.
* Số xuất hiện trên mặt con xúc xắc chính là quan sát (observation). Gọi $O_t \in \{1, 2, 3, 4, 5, 6\}$ là số xuất hiện trên mặt con xúc xắc tại thời điểm $t$ với $O_t = i\ (1 \leq i \leq 6)$ chính là tại xuất hiện mặt số $i$.
* Gọi A là ma trận xác suất chuyển trạng thái với hệ số $a_{ij}$ đặc trưng cho xác suất chuyển từ trạng thái $i\ (i\in \{Imbalanced,Balanced\})$ sang trạng thái $j\ (j\in\{Imbalanced, Balanced\})$ (chẳng hạn chuyển từ trạng thái Balanced $\rightarrow$ Balanced, Balanced $\rightarrow$ Imbalanced... ):
$$A = \begin{pmatrix}0.3 & 0.7 \\ 0.2 &  0.8 \end{pmatrix} $$    
* Gọi B là ma trận phân bố xác suất với hệ số $b_{ij}$ đặc trưng cho xác suất quan sát được quan sát $j\ \in \{1, 2, 3, 4, 5, 6\}$ trong trạng thái $i \in \{Imbalanced, Balanced\}$, dựa vào dữ kiện đề bài ta có:
$$ B = \begin{pmatrix}0.1 &  0.1 & 0.1 & 0.1 & 0.1 & 0.5 \\ \frac{1}{6} & \frac{1}{6} & \frac{1}{6} & \frac{1}{6} & \frac{1}{6} & \frac{1}{6} \end{pmatrix}$$

b) 

Ta sẽ phát sinh ngẫu nhiên một chuỗi quan sát có độ dài $T = 100$ theo đúng mô tả trên như sau:

In [5]:
Q = ['Imbalanced', 'Balanced'] 
o = [1, 2, 3, 4, 5, 6]
A = np.array(([.3, .7], [.2, .8]))
B = np.array(([0.1, 0.1, 0.1, 0.1, 0.1, 0.5], [1/6, 1/6, 1/6, 1/6, 1/6, 1/6]))
pi = np.array([.8, .2]) # phân phối xác suất khởi đầu đối với các trạng thái ẩn 
length = 100 # độ dài chuỗi quan sát

In [6]:
#Hàm phát sinh chuỗi quan sát có độ dài n
def GeneratingASequenceHMMsOfLengthN(length, Q, o, A, pi, B, verbose = False):
    
    
    O = []# Chuỗi quan sát với độ dài length
    S = []# Trạng thái của quan sát tương ứng
    
    pos_Q = np.arange(len(Q))
    pos_o = np.arange(len(o))
    
    #Quan sát và trạng thái ban đầu
    random_pos_S = np.random.choice(pos_Q, p = pi) # Sinh ngẫu nhiên trạng thái ban đầu với xác suất p = pi
    random_pos_O = np.random.choice(pos_o, p = B[random_pos_S, :]) # Sinh quan sát ở trạng thái S[0] tại thời điểm ban đầu với xác suất p = xác suất quan sát được các quan sát ở trạng thái S[0]
    S.append(Q[random_pos_S])     
    O.append(o[random_pos_O]) 
    if verbose:
        print(S[0], O[0])
    
    for i in range(1, length):
        random_pos_S = np.random.choice(pos_Q, p = A[random_pos_S, :])
        random_pos_O = np.random.choice(pos_o, p = B[random_pos_S, :])
        S.append(Q[random_pos_S]) # Sinh ngẫu nhiên trạng thái i với xác suất p = xác suất chuyển từ trạng thái S[i-1]
        O.append(o[random_pos_O]) # Sinh quan sát ở trạng thái S[0] tại thời điểm i với xác suất p = xác suất quan sát được các quan sát ở trạng thái S[i]
        if verbose:
            print(S[i], O[i])
    return S, O

In [7]:
iters = 1000
acc_arr = np.empty(iters, dtype = 'float32') # mảng lưu lại các kết quả độ chính xác mỗi lần làm lại thí nghiệm

In [8]:
for i in range(iters):
    S, O = GeneratingASequenceHMMsOfLengthN(length, Q, o, A, pi, B, verbose = False)
    hidden_states = viterbi_HMMs(O, A, B, pi, Q, o)
    n = len(S)
    cnt = 0
    for j in range(n):
        if hidden_states[j] == S[j]:
            cnt = cnt + 1
    acc_arr[i] = cnt/length

In [9]:
print(acc_arr)

print(np.mean(acc_arr))

print(np.std(acc_arr))

[0.79 0.79 0.84 0.79 0.72 0.79 0.8  0.77 0.83 0.78 0.75 0.75 0.78 0.77
 0.76 0.85 0.75 0.81 0.78 0.73 0.73 0.77 0.73 0.85 0.77 0.76 0.71 0.78
 0.79 0.81 0.75 0.8  0.78 0.76 0.79 0.7  0.78 0.79 0.85 0.75 0.72 0.74
 0.8  0.72 0.83 0.82 0.71 0.82 0.83 0.84 0.85 0.78 0.72 0.78 0.73 0.74
 0.76 0.76 0.8  0.76 0.86 0.85 0.77 0.81 0.79 0.83 0.78 0.84 0.84 0.65
 0.83 0.73 0.81 0.69 0.8  0.69 0.69 0.83 0.79 0.83 0.85 0.63 0.77 0.71
 0.74 0.78 0.73 0.73 0.75 0.77 0.79 0.75 0.79 0.8  0.85 0.71 0.83 0.76
 0.72 0.78 0.76 0.82 0.84 0.79 0.72 0.77 0.82 0.85 0.75 0.74 0.77 0.78
 0.78 0.76 0.78 0.72 0.77 0.83 0.79 0.72 0.68 0.88 0.84 0.78 0.77 0.78
 0.8  0.81 0.72 0.79 0.79 0.8  0.8  0.77 0.81 0.72 0.79 0.91 0.81 0.76
 0.83 0.69 0.8  0.82 0.77 0.79 0.86 0.76 0.8  0.78 0.8  0.69 0.68 0.8
 0.75 0.76 0.84 0.81 0.73 0.72 0.77 0.63 0.76 0.8  0.82 0.79 0.81 0.87
 0.73 0.77 0.75 0.81 0.72 0.7  0.75 0.81 0.77 0.85 0.81 0.79 0.75 0.76
 0.77 0.8  0.81 0.74 0.81 0.77 0.75 0.79 0.68 0.69 0.76 0.78 0.78 0.83
 0.81 0

# Nhận xét
Sau 1000 lần lặp lại thí nghiệm ta ghi nhận lại số liệu ta có thể đánh giá rằng mô hình dự đoán này là khá tốt (ổn định) vì:

* Độ chính xác của nó và ta tính được độ chính xác trung bình là 0.77756 tức là đâu đó tiệm cận 0.8. 
* Có độ lệch chuẩn là 0.05 tương đối thấp.



In [12]:
nTry = 5 
num_states = B.shape[0] 
num_obs = B.shape[1]
np.set_printoptions(suppress=True)
for iTry in range(nTry): 
    S, O = GeneratingASequenceHMMsOfLengthN(100, Q, o, A, pi, B, verbose = False) 
    S, O = cvIndexSO(S, O, Q, o)
    for i in range(3): 
        A, B, forward_prob = baum_welch(O, num_states, num_obs, 100)
        print(f'Try {iTry + 1}.{i + 1}')
        print(f'A matrix\n{A}')
        print(f'B matrix\n{B}')
        print(f'forward_prob = {forward_prob}\n')

Try 1.1
A matrix
[[0.05154961 0.94845039]
 [0.763004   0.236996  ]]
B matrix
[[0.00024093 0.26757906 0.25798431 0.17754408 0.14067648 0.15597512]
 [0.30782743 0.05453753 0.09856551 0.10951795 0.17568877 0.25386281]]
forward_prob = 6.022179402322201e-77

Try 1.2
A matrix
[[0.08060017 0.91939983]
 [0.77518324 0.22481676]]
B matrix
[[0.00017376 0.26928925 0.2536213  0.18092003 0.13958829 0.15640736]
 [0.31446667 0.0485238  0.09886558 0.10519042 0.1773637  0.25558983]]
forward_prob = 5.860395961516386e-77

Try 1.3
A matrix
[[0.23673967 0.76326033]
 [0.94513857 0.05486143]]
B matrix
[[0.30854264 0.05389337 0.09870206 0.10887579 0.17588785 0.25409827]
 [0.00000646 0.26792402 0.25748345 0.17818979 0.14050541 0.15589087]]
forward_prob = 6.012514534539675e-77

Try 2.1
A matrix
[[0.18544068 0.81455932]
 [0.80338684 0.19661316]]
B matrix
[[0.09420607 0.34308075 0.24223549 0.10431339 0.19237736 0.02378694]
 [0.18497882 0.         0.05940626 0.19487341 0.14802095 0.41272056]]
forward_prob = 5.16248

# Nhận xét
Sau 5 lần lặp lại việc tạo ngẫu nhiên chuỗi quan sát ngẫu nhiên, với mỗi chuỗi quan sát cho chạy 3 lần thuật toán Baum Welch với ma trận A, B được khởi tạo khác nhau ta thấy: 

* Với mỗi chuỗi quan sát ngẫu nhiên kết quả cho ra là khác nhau mặc dù các chuỗi quan sát đấy đều được khởi tạo từ các ma trận xác suất biết trước. 
* Với mỗi lần chạy cho cùng một chuỗi quan sát, kết quả cho ra có một vài sự khác biệt. Do đó ta có thể thấy dù Baum Welch là một thuật toán học không cần giám sát tuy nhiên sự khởi tạo của ma trận A (trasition matrix) và ma trận B (emission matrix) là quan trọng và nó ảnh hưởng đến kết quả đầu ra. 
