## Latent Class Analysis 
- Uses conjugate priors

### Author: Joey Jingze

In [None]:
import numpy as np
import time
from tqdm.notebook import tqdm,trange

# Generate synthetic data

In [None]:
def gen_data(num_class = 4, num_feature = 7, num_object = 100000):
    mu = np.ones(num_class) # the dirichlet prior alpha
    alpha, beta = 1,1  # the beta prior

    pi = np.random.dirichlet(mu)
    p = np.random.beta(alpha, beta, size = (num_class, num_feature))
    c = np.random.choice(num_class, p = pi, size = num_object) # the true class labels

    y = np.zeros((num_object, num_feature)) # the observations

    for i in range(num_class):
        idx = c==i
        y[idx,:] = np.random.binomial(1, p[i,:], size = (idx.sum(),num_feature))
    return y, c

In [None]:
# very fast
num_class = 3
num_feature = 5
num_object = 1000
s = time.time()
y,c_groundtruth = gen_data(num_class, num_feature, num_object)
print(time.time()-s)

In [None]:
y.shape, c_groundtruth.shape

In [None]:
gt = (c_groundtruth==0).sum(), (c_groundtruth==1).sum(), (c_groundtruth==2).sum()
gt / np.sum(gt)

# Gibbs

In [None]:
num_iter = 3000

In [None]:
# allocate storage
Pi = np.zeros((num_iter, num_class))    # Pi
Pjk = np.zeros((num_iter, num_class, num_feature))  # Pjk
Cij = np.zeros((num_iter, num_object, num_class))    # Cij
Cij_prob = np.zeros((num_iter, num_object, num_class)) # Cij.pr


In [None]:
# initialize 
mu = np.ones(num_class) # the dirichlet prior alpha
Pi_current = np.random.dirichlet(mu)  # pi.t1
alpha, beta = np.ones((num_class, num_feature)), np.ones((num_class, num_feature))   # the beta prior
Pjk_current = np.random.beta(alpha, beta) # pjk.t1

Cij_current = np.zeros((num_object, num_class)) # Cl.t1
Cij_prob_current = np.zeros((num_object, num_class)) # Clp.t1


In [None]:
# one step iteration

# update Cij and C
for i in range(num_object):
    temp = Pi_current * np.prod(Pjk_current**y[i,:] * (1-Pjk_current)**(1-y[i,:]), axis=1)
    Cij_prob_current[i,:] = temp/temp.sum()
    Cij_current[i,:] = np.random.multinomial(1, Cij_prob_current[i,:])

# update Pi and Pjk
Pi_current = np.random.dirichlet(mu + Cij_current.sum(axis = 0))

temp_alpha = alpha.copy() 
temp_beta = beta.copy()

# update Pjk 
for j in range(num_class):
    for k in range(num_feature):
        temp_alpha[j,k] += np.sum(y[:,k]*Cij_current[:,j])
        temp_beta[j,k] += np.sum((1-y[:,k])*Cij_current[:,j])

Pjk_current = np.random.beta(temp_alpha, temp_beta) 



# Gibbs iterations

In [None]:
def Gibbs_step(Pi_current, Pjk_current, Cij_prob_current, Cij_current):
    # update Cij and C
    for i in range(num_object):
        temp = Pi_current * np.prod(Pjk_current**y[i,:] * (1-Pjk_current)**(1-y[i,:]), axis=1)
        Cij_prob_current[i,:] = temp/temp.sum()
        Cij_current[i,:] = np.random.multinomial(1, Cij_prob_current[i,:])

    # update Pi
    Pi_current = np.random.dirichlet(mu + Cij_current.sum(axis = 0))

    temp_alpha = alpha.copy() 
    temp_beta = beta.copy() 

    # update Pjk
    for j in range(num_class):
        for k in range(num_feature):
            temp_alpha[j,k] += np.sum(y[:,k]*Cij_current[:,j])
            temp_beta[j,k] += np.sum((1-y[:,k])*Cij_current[:,j])

    Pjk_current = np.random.beta(temp_alpha, temp_beta) 
    
    return Pi_current, Pjk_current, Cij_prob_current, Cij_current

In [None]:
num_iter = 3000
# allocate storage
Pi = np.zeros((num_iter, num_class))    # Pi
Pjk = np.zeros((num_iter, num_class, num_feature))  # Pjk
Cij = np.zeros((num_iter, num_object, num_class))    # Cij
Cij_prob = np.zeros((num_iter, num_object, num_class)) # Cij.pr

# initialize 
mu = np.ones(num_class) # the dirichlet prior alpha
Pi_current = np.random.dirichlet(mu)  # pi.t1
alpha, beta = np.ones((num_class, num_feature)), np.ones((num_class, num_feature))   # the beta prior
Pjk_current = np.random.beta(alpha, beta, size = (num_class, num_feature)) # pjk.t1

Cij_prob_current = np.zeros((num_object, num_class)) # Clp.t1
Cij_current = np.zeros((num_object, num_class)) # Cl.t1

# start iteration
for i in trange(num_iter):
    Pi_current, Pjk_current, Cij_prob_current, Cij_current = Gibbs_step(Pi_current, Pjk_current, Cij_prob_current, Cij_current)
    Pi[i,:] = Pi_current
    Pjk[i,:,:] = Pjk_current
    Cij_prob[i,:,:] = Cij_prob_current    
    Cij[i,:,:] = Cij_current


In [None]:
print(Pi[-5:,:])

In [None]:
print(Pjk[-5:,:,:])

In [None]:
a = Cij[-1,:,:].sum(axis=0)

In [None]:
a / a.sum()

In [None]:
# another run
# the position switched!
Cij[-1,:,:].sum(axis=0)