In [63]:
import numpy as np
import sympy as sp

# Define the normal distribution PDF
def normal_pdf(x, mean, var):
    return (1 / np.sqrt(2 * np.pi * var)) * np.exp(- (x - mean) ** 2 / (2 * var))

# Initialize parameters
np.random.seed(0)  # For reproducibility
n = 1000  # Number of data points
G = 3  # Number of components

# Mixture weights (initial guess)
pi = np.array([0.33, 0.33, 0.34])

# Means and variances for each component (initial guess) drawn from a positive normal distribution
means = np.random.normal(0, 1, G)
variances = np.abs(np.random.normal(0, 1, G))

# Generate synthetic data
true_means = [1.5, 3, 4.5]
true_variances = [1, 0.25, 0.36]
true_pi = [0.45, 0.26, 0.29]

# Assign data points to components
z = np.random.choice(G, size=n, p=true_pi)
y = np.array([np.random.normal(true_means[zi], np.sqrt(true_variances[zi])) for zi in z])

def e_step(y, pi, means, variances):
    n = len(y)
    G = len(pi)
    responsibilities = np.zeros((n, G))

    for i in range(n):
        for g in range(G):
            responsibilities[i, g] = pi[g] * normal_pdf(y[i], means[g], variances[g])
        responsibilities[i, :] /= np.sum(responsibilities[i, :])
    
    return responsibilities

def m_step(y, responsibilities):
    n, G = responsibilities.shape
    N_k = np.sum(responsibilities, axis=0)
    
    pi_new = N_k / n
    means_new = np.sum(responsibilities * y[:, np.newaxis], axis=0) / N_k
    variances_new = np.zeros(G)
    
    for g in range(G):
        variances_new[g] = np.sum(responsibilities[:, g] * (y - means_new[g])**2) / N_k[g]
    
    return pi_new, means_new, variances_new

def calculate_expected_complete_log_likelihood(y, responsibilities, means, variances, pi):
    n, G = responsibilities.shape
    log_likelihood = 0
    
    for i in range(n):
        for g in range(G):
            log_likelihood += responsibilities[i, g] * (np.log(pi[g]) + np.log(normal_pdf(y[i], means[g], variances[g])))
    
    return log_likelihood

def em_algorithm(y, pi, means, variances, max_iter=100, tol=1e-6):
    # Print the initial parameters
    print(f"Initial pi = {pi}")
    print(f"Initial means = {means}")
    print(f"Initial variances = {variances}")
    for iteration in range(max_iter):
        # E-Step
        responsibilities = e_step(y, pi, means, variances)
        
        # Calculate the expected complete log-likelihood
        log_likelihood = calculate_expected_complete_log_likelihood(y, responsibilities, means, variances, pi)
        if iteration % 10 == 0:
            print(f"Iteration {iteration}: Log-Likelihood = {log_likelihood}")
        
            # check if log-likelihood is improved at least by 5 percent
            if iteration > 0 and (log_likelihood - log_likelihood_prev) / np.abs(log_likelihood_prev) < -0.05:
                # print the improvment rate
                print(f"Improvement rate = {(log_likelihood - log_likelihood_prev) / np.abs(log_likelihood_prev)}")
                
        
        log_likelihood_prev = log_likelihood
        # M-Step
        pi_new, means_new, variances_new = m_step(y, responsibilities)
        
        # Check for convergence
        if np.allclose(pi, pi_new, atol=tol) and np.allclose(means, means_new, atol=tol) and np.allclose(variances, variances_new, atol=tol):
            print(f"Converged after {iteration} iterations")
            # print the converged parameters
            print(f"pi = {pi_new}")
            print(f"means = {means_new}")
            print(f"variances = {variances_new}")
            break
        
        pi, means, variances = pi_new, means_new, variances_new
    
    return pi, means, variances, responsibilities

# Run the EM algorithm
pi_est, means_est, variances_est, responsibilities_est = em_algorithm(y, pi, means, variances, max_iter=1000)


Initial pi = [0.33 0.33 0.34]
Initial means = [1.76405235 0.40015721 0.97873798]
Initial variances = [2.2408932  1.86755799 0.97727788]
Iteration 0: Log-Likelihood = -3266.579099343927
Iteration 10: Log-Likelihood = -2409.2214826353297
Iteration 20: Log-Likelihood = -2377.4324818959367
Iteration 30: Log-Likelihood = -2376.8694994274974
Iteration 40: Log-Likelihood = -2380.1648494097135
Iteration 50: Log-Likelihood = -2384.5879230728483
Iteration 60: Log-Likelihood = -2388.076182417474
Iteration 70: Log-Likelihood = -2388.495702547733
Iteration 80: Log-Likelihood = -2382.121412942661
Iteration 90: Log-Likelihood = -2359.2596615120397
Iteration 100: Log-Likelihood = -2293.5990713997885
Iteration 110: Log-Likelihood = -2184.8598251432795
Iteration 120: Log-Likelihood = -2122.211973038834
Iteration 130: Log-Likelihood = -2088.7053539622357
Iteration 140: Log-Likelihood = -2070.210675112268
Iteration 150: Log-Likelihood = -2064.6127001476625
Iteration 160: Log-Likelihood = -2065.60584173637

In [66]:
# Print sort initial and estimated parameters
print('-----------------------------------')
print(f"Initial pi = {np.sort(pi)}")
print(f"Initial means = {np.sort(means)}")
print(f"Initial variances = {np.sort(variances)}")
# Print estimated parameters
print('-----------------------------------')
print(f"Estimated mixture weights: {np.sort(pi_est)}")
print(f"Estimated means: {np.sort(means_est)}")
print(f"Estimated variances: {np.sort(variances_est)}")
print('-----------------------------------')
print(f"True mixture weights: {np.sort(true_pi)}")
print(f"True means: {np.sort(true_means)}")
print(f"True variances: {np.sort(true_variances)}")

-----------------------------------
Initial pi = [0.33 0.33 0.34]
Initial means = [0.40015721 0.97873798 1.76405235]
Initial variances = [0.97727788 1.86755799 2.2408932 ]
-----------------------------------
Estimated mixture weights: [0.257917   0.34645448 0.39562852]
Estimated means: [1.38669275 2.9793012  4.57873216]
Estimated variances: [0.2706809  0.32159385 0.73134055]
-----------------------------------
True mixture weights: [0.26 0.29 0.45]
True means: [1.5 3.  4.5]
True variances: [0.25 0.36 1.  ]


In [68]:
means = np.random.normal(0, 1, G)
variances = np.abs(np.random.normal(0, 1, G))
tol = 1e-6
# Manually calculate the responsibilities
# logging
resp_dict = {}
log_likelihood_dict = {}

for iteration in range(100):
        # E-Step
        responsibilities = e_step(y, pi, means, variances)
        
        # Calculate the expected complete log-likelihood
        log_likelihood = calculate_expected_complete_log_likelihood(y, responsibilities, means, variances, pi)
        if iteration % 10 == 0:
            print(f"Iteration {iteration}: Log-Likelihood = {log_likelihood}")
        
            # check if log-likelihood is improved at least by 5 percent
            if iteration > 0 and (log_likelihood - log_likelihood_prev) / np.abs(log_likelihood_prev) < -0.05:
                # print the improvment rate
                print(f"Improvement rate = {(log_likelihood - log_likelihood_prev) / np.abs(log_likelihood_prev)}")
                
        
        log_likelihood_prev = log_likelihood
        # M-Step
        pi_new, means_new, variances_new = m_step(y, responsibilities)
        
        # Check for convergence
        if np.allclose(pi, pi_new, atol=tol) and np.allclose(means, means_new, atol=tol) and np.allclose(variances, variances_new, atol=tol):
            print(f"Converged after {iteration} iterations")
            # print the converged parameters
            print(f"pi = {pi_new}")
            print(f"means = {means_new}")
            print(f"variances = {variances_new}")
            break
        
        pi, means, variances = pi_new, means_new, variances_new

Iteration 0: Log-Likelihood = -5953.014927521345
Iteration 10: Log-Likelihood = -2129.5893905976045
Iteration 20: Log-Likelihood = -2153.922766183279
Iteration 30: Log-Likelihood = -2174.8955020871113
Iteration 40: Log-Likelihood = -2194.3266114534085
Iteration 50: Log-Likelihood = -2211.945342159297
Iteration 60: Log-Likelihood = -2226.0333200816285
Iteration 70: Log-Likelihood = -2233.6621150031588
Iteration 80: Log-Likelihood = -2234.726655462529
Iteration 90: Log-Likelihood = -2231.433391791867


In [69]:
responsibilities

array([[7.00984197e-01, 2.37579552e-01, 6.14362519e-02],
       [8.63562193e-03, 4.24857572e-15, 9.91364378e-01],
       [3.35172113e-02, 1.57830004e-11, 9.66482789e-01],
       ...,
       [1.32545968e-01, 5.21021378e-08, 8.67453979e-01],
       [5.87806498e-01, 3.80322817e-01, 3.18706849e-02],
       [5.01647779e-04, 4.17024595e-23, 9.99498352e-01]])

In [70]:
responsibilities.shape

(1000, 3)

In [76]:
import numpy as np
import sympy as sp

# Define the normal distribution PDF using sympy
def normal_pdf_sympy(x, mean, var):
    return (1 / sp.sqrt(2 * sp.pi * var)) * sp.exp(- (x - mean)**2 / (2 * var))

# Define the normal distribution PDF using numpy
def normal_pdf_numpy(x, mean, var):
    return (1 / np.sqrt(2 * np.pi * var)) * np.exp(- (x - mean) ** 2 / (2 * var))

# Initialize parameters
np.random.seed(0)  # For reproducibility
n = 100  # Number of data points
G = 3  # Number of components

# Mixture weights (initial guess)
pi = np.array([0.33, 0.33, 0.34])

# Means and variances for each component (initial guess)
means = np.array([1.0, 3.0, 5.0])
variances = np.array([1.0, 1.0, 1.0])

# Generate synthetic data
true_means = [1.5, 3, 4.5]
true_variances = [1, 0.25, 0.36]
true_pi = [0.33, 0.33, 0.34]

# Assign data points to components
z = np.random.choice(G, size=n, p=true_pi)
y = np.array([np.random.normal(true_means[zi], np.sqrt(true_variances[zi])) for zi in z])

# E-Step: Calculate responsibilities
def e_step(y, pi, means, variances):
    n = len(y)
    G = len(pi)
    responsibilities = np.zeros((n, G))

    for i in range(n):
        for g in range(G):
            responsibilities[i, g] = pi[g] * normal_pdf_numpy(y[i], means[g], variances[g])
        responsibilities[i, :] /= np.sum(responsibilities[i, :])  # Normalize responsibilities
    
    return responsibilities

# Calculate expected complete log-likelihood
def calculate_expected_complete_log_likelihood(y, responsibilities, means, variances, pi):
    n, G = responsibilities.shape
    log_likelihood = 0
    
    for i in range(n):
        for g in range(G):
            log_likelihood += responsibilities[i, g] * (np.log(pi[g]) + np.log(normal_pdf_numpy(y[i], means[g], variances[g])))
    
    return log_likelihood

# Symbolic differentiation using sympy
def symbolic_m_step(y, responsibilities):
    n, G = responsibilities.shape
    N_k = np.sum(responsibilities, axis=0)  # Effective number of points assigned to each component
    
    # Initialize new parameters
    pi_new = N_k / n
    
    # Define symbols for mean and variance
    mu, sigma2, x = sp.symbols('mu sigma2 x')
    
    # Log of the normal PDF using sympy
    log_normal_pdf = sp.log((1 / sp.sqrt(2 * sp.pi * sigma2)) * sp.exp(- (x - mu)**2 / (2 * sigma2)))
    
    means_new = np.zeros(G)
    variances_new = np.zeros(G)
    
    for g in range(G):
        # Calculate means
        Q_mu = sum(responsibilities[i, g] * log_normal_pdf.subs({x: y[i], sigma2: variances[g]}) for i in range(n))
        dQ_mu = sp.diff(Q_mu, mu)
        means_new[g] = float(sp.solve(dQ_mu, mu)[0])
        
        # Calculate variances
        Q_sigma2 = sum(responsibilities[i, g] * log_normal_pdf.subs({x: y[i], mu: means_new[g]}) for i in range(n))
        dQ_sigma2 = sp.diff(Q_sigma2, sigma2)
        variances_new[g] = float(sp.solve(dQ_sigma2, sigma2)[0])
    
    return pi_new, means_new, variances_new

# EM Algorithm
def em_algorithm(y, pi, means, variances, max_iter=100, tol=1e-6):
    for iteration in range(max_iter):
        # E-Step
        responsibilities = e_step(y, pi, means, variances)
        
        # Calculate the expected complete log-likelihood
        log_likelihood = calculate_expected_complete_log_likelihood(y, responsibilities, means, variances, pi)
        print(f"Iteration {iteration}: Log-Likelihood = {log_likelihood}")
        
        # M-Step using symbolic differentiation
        pi_new, means_new, variances_new = symbolic_m_step(y, responsibilities)
        
        # Check for convergence
        if np.allclose(pi, pi_new, atol=tol) and np.allclose(means, means_new, atol=tol) and np.allclose(variances, variances_new, atol=tol):
            break
        
        # Update parameters for the next iteration
        pi, means, variances = pi_new, means_new, variances_new
    
    return pi, means, variances, responsibilities

# Run the EM algorithm
pi_est, means_est, variances_est, responsibilities_est = em_algorithm(y, pi, means, variances)

# Print estimated parameters
print(f"Estimated mixture weights: {pi_est}")
print(f"Estimated means: {means_est}")
print(f"Estimated variances: {variances_est}")


Iteration 0: Log-Likelihood = -245.28506402383815
