Authors: Wenbo Fu (679744457), Bingyan Liu(668046518)

Both authors contribute equally

In [1]:
import numpy as np

# Part I

In [2]:
data = np.loadtxt('faithful.dat',skiprows = 1)

In [3]:
X = data[:,1:].T

In [4]:
p, n = X.shape

In [5]:
X.shape

(2, 272)

In [6]:
def estep(X, pi, Sigma, mu):
# X: (p,n)
# pi: (G,)
# Sigma: (p,p)
# mu: (p,G)

#return
#prob.T (n,G)
    p, n = X.shape
    G = pi.shape
    deltax = X[:,np.newaxis,:]-mu[:, :, np.newaxis] #(p,G,n)
    U, S, V = np.linalg.svd(Sigma)
    D = np.diag(1/np.sqrt(S))
    deltax1 = np.tensordot(D@U.T,deltax,axes=(1, 0))
    exponent = -1/2*np.sum(deltax1**2,axis=0)
    N = 1/np.sqrt(np.linalg.det(Sigma)*(2*np.pi)**p)*np.exp(exponent) #(G,n)
    prob = np.diag(pi)@N / np.sum(np.diag(pi)@N,axis = 0)
    
    return prob.T    

In [7]:
def mstep(X,prob):
# X: (p,n)
# prob: (n,G)

#return
#pi: (G)
#mu: (p,G)
#Sigma: (p,p)
    p, n = X.shape
    pi = np.sum(prob,axis = 0)/n
    mu = X @ prob / np.sum(prob,axis = 0)[np.newaxis,:] #(p,G)
    deltax = X[:,np.newaxis,:]-mu[:, :, np.newaxis] #(p,G,n)
    deltax1 = deltax * np.sqrt(prob.T)[np.newaxis,:,:]
    Sigma = np.tensordot(deltax1,deltax1,axes=([1,2],[1,2]))/n
    return pi, mu, Sigma

In [8]:
def loglik(X,pi,Sigma,mu):
# X: (p,n)
# pi: (G,)
# Sigma: (p,p)
# mu: (p,G)  

    p, n = X.shape
    G = pi.shape
    deltax = X[:,np.newaxis,:]-mu[:, :, np.newaxis] #(p,G,n)
    U, S, V = np.linalg.svd(Sigma)
    D = np.diag(1/np.sqrt(S))
    deltax1 = np.tensordot(D@U.T,deltax,axes=(1, 0))
    exponent = -1/2*np.sum(deltax1**2,axis=0)
    N = 1/np.sqrt(np.linalg.det(Sigma)*(2*np.pi)**p)*np.exp(exponent) #(G,n)
    logp = np.sum(np.log(pi @ N))
    
    return logp

In [9]:
def myEM(X,G,pi,Sigma,mu,itmax):
    
    for i in range(itmax):
        prob = estep(X, pi, Sigma, mu)
        pi, mu, Sigma = mstep(X,prob)
    
    logp = loglik(X,pi,Sigma,mu)
    return pi, mu, Sigma, logp    

In [10]:
itmax = 20
G=2
p1 = 10/n
p2 = (n-10)/n
pi = np.array([p1,p2])

mu1 = np.mean(X[:,:10],axis = 1)
mu2 = np.mean(X[:,10:],axis = 1)
mu = np.vstack((mu1, mu2)).T

deltax1 = X[:,:10] - mu1[:,np.newaxis]
deltax2 = X[:,10:] - mu2[:,np.newaxis]

Sigma = 1/n*(deltax1@deltax1.T+deltax2@deltax2.T)

pi, mu, Sigma, logp = myEM(X,G,pi,Sigma,mu,itmax)

In [11]:
pi

array([0.04297883, 0.95702117])

In [12]:
mu

array([[ 3.49564188,  3.48743016],
       [76.79789154, 70.63205853]])

In [13]:
Sigma

array([[  1.29793612,  13.92433626],
       [ 13.92433626, 182.58009247]])

In [14]:
logp

-1289.5693549424104

In [15]:
itmax = 20
G=3

p1 = 10/n
p2 = 20/n
p3 = (n-30)/n
pi = np.array([p1,p2,p3])

mu1 = np.mean(X[:,:10],axis = 1)
mu2 = np.mean(X[:,10:30],axis = 1)
mu3 = np.mean(X[:,30:],axis = 1)
mu = np.vstack((mu1, mu2, mu3)).T

deltax1 = X[:,:10] - mu1[:,np.newaxis]
deltax2 = X[:,10:30] - mu2[:,np.newaxis]
deltax3 = X[:,30:] - mu3[:,np.newaxis]

Sigma = 1/n*(deltax1@deltax1.T+deltax2@deltax2.T+deltax3@deltax3.T)

pi, mu, Sigma, logp = myEM(X,G,pi,Sigma,mu,itmax)

In [16]:
pi

array([0.04363422, 0.07718656, 0.87917922])

In [17]:
mu

array([[ 3.51006918,  2.81616674,  3.54564083],
       [77.10563811, 63.35752634, 71.25084801]])

In [18]:
Sigma

array([[  1.26015772,  13.51153756],
       [ 13.51153756, 177.96419105]])

In [19]:
logp

-1289.3509588627348

# Part II

In [20]:
data = np.loadtxt('coding4_part2_data.txt')

In [21]:
X = data.astype(int)

In [22]:
def BWonestep(X,w,A,B,mx,mz):
    
# X: (T)
# w: (mz)
# A: (mz,mz)
# B: (mz,mx)

#return
# A: (mz,mz)
# B: (mz,mx)

    T = len(X)
    alpha = np.zeros((T,mz))
    beta = np.ones((T,mz))
    alpha[0,:] = w*B[:,X[0]-1]
    for i in range(1, T):
        alpha[i,:]=(alpha[i-1,:]@A)*B[:,X[i]-1]
        beta[T-1-i,:] = A@np.diag(beta[T-i,:])@B[:,X[T-i]-1]
    gamma = np.zeros((T-1,mz,mz))
    gamma_t = np.zeros((T,mz))
    for i in range(T-1):
        gamma[i,:,:] = np.diag(alpha[i,:])@A@np.diag(beta[i+1,:])@np.diag(B[:,X[i+1]-1])
#    gamma_t[:-1,:] = np.sum(gamma,axis = 2)
#    gamma_t[-1,:] = np.sum(gamma[T-2,:,:],axis = 1)
    gamma_t[1:,:] = np.sum(gamma,axis = 1)
    gamma_t[0,:] = np.sum(gamma[0,:,:],axis = 1)
    
    A = np.diag(1/np.sum(gamma,axis=(0,2)))@np.sum(gamma,axis=0)
    for i in range(mx):
        gamma_ti = gamma_t[np.where(X==i+1)[0],:]
        B[:,i] = np.sum(gamma_ti,axis=0)/np.sum(gamma_t,axis=0)
        
    return A, B
    

In [23]:
def myBW(X,w,A,B,mx,mz,itmax):
# X: (T)
# w: (mz)
# A: (mz,mz)
# B: (mz,mx)

#return
# A: (mz,mz)
# B: (mz,mx)
    for i in range(itmax):
        A, B = BWonestep(X,w,A,B,mx,mz)
    return A, B

In [24]:
mx = 3
mz = 2
w = np.array([0.5,0.5])
A = np.array([[0.5,0.5],[0.5,0.5]])
B = np.array([[1/9,3/9,5/9],[1/6,2/6,3/6]])
itmax = 100

In [25]:
A, B= myBW(X,w,A,B,mx,mz,itmax)

In [26]:
A

array([[0.49793938, 0.50206062],
       [0.44883431, 0.55116569]])

In [27]:
B

array([[0.22159897, 0.20266127, 0.57573976],
       [0.34175148, 0.17866665, 0.47958186]])

In [28]:
def myViterbi(X,w,A,B,mx,mz):
    
# X: (T)
# w: (mz)
# A: (mz,mz)
# B: (mz,mx)

#return
# Z: (T)
    T = len(X)
    delta = np.zeros((T,mz))
    delta[0,:] = w*B[:,X[0]-1]
    for i in range(1,T):
        delta[i,:]=np.max(delta[i-1,:]*A.T,axis = 1)*B[:,X[i]-1]
    Z = np.zeros(T).astype(int)
    Z[-1] = np.argmax(delta[-1,:]) + 1
    for i in range(1,T):
        Z[T-i-1] = np.argmax(delta[T-i-1,:]*A[:,Z[T-i]-1])+1
    return Z

In [29]:
Z = myViterbi(X,w,A,B,mx,mz)

In [30]:
file_path = 'Coding4_part2_Z.txt'
with open(file_path, "r") as file:
    lines = file.read().splitlines()
data=[]
for line in lines:
    data += line.split()
Z1 = np.array(data).astype(int)

In [31]:
np.array_equal(Z,Z1)

True