In [None]:
import numpy as np
import matplotlib.pyplot as plt
from scipy.stats import norm, multivariate_normal


In [None]:

# Generate simple synthetic data: a mixture of two 1D Gaussians
np.random.seed(100)
n_samples = 300
w_true = [0.7, 0.3]  # True mixture weights
mu_true = [0, 5]
sigma_true = [1, 1]

n_samples_component_1 = int(n_samples * w_true[0])
n_samples_component_2 = n_samples - n_samples_component_1

data = np.concatenate([
    np.random.normal(mu_true[0], sigma_true[0], n_samples_component_1),
    np.random.normal(mu_true[1], sigma_true[1], n_samples_component_2)
])


In [None]:
plt.hist(data);

In [None]:
# Initialize parameters
K = 2
np.random.seed(42)
mu = np.random.choice(data, K)
sigma = np.full(K, np.std(data))
pi = np.full(K, 1 / K)  # start with equal weights 
log_likelihoods = []


# Gaussian PDF
def gaussian_pdf(x, mu, sigma):
    return norm.pdf(x, loc=mu, scale=sigma)




In [None]:
# EM algorithm
n_iterations = 20
for iteration in range(n_iterations):
    # === E-Step ===
    gamma = np.zeros((len(data), K))
    for k in range(K):
        gamma[:, k] = ###
    gamma /= gamma.sum(axis=1, keepdims=True)  # Normalize
    

    # === M-Step ===
    N_k = gamma.sum(axis=0)
    for k in range(K):
        mu[k] = ###
        sigma[k] = ###
        pi[k] = ###

    # Log-likelihood
    likelihood = np.zeros((len(data), K))
    for k in range(K):
        likelihood[:, k] = pi[k] * gaussian_pdf(data, mu[k], sigma[k])
    log_likelihood = np.sum(np.log(likelihood.sum(axis=1)))
    log_likelihoods.append(log_likelihood)

    print(f"Iteration {iteration+1}")
    print(f"  Means: {mu}")
    print(f"  Stddevs: {sigma}")
    print(f"  Mixing coefficients: {pi}")
    print(f"  Log-likelihood: {log_likelihood:.2f}")
    print("-" * 40)



In [None]:

x = np.linspace(min(data) - 1, max(data) + 1, 1000)
final_pdf = np.zeros_like(x)
for k in range(K):
    final_pdf += pi[k] * norm.pdf(x, mu[k], sigma[k])

plt.hist(data, bins=30, density=True, alpha=0.6, label='Data histogram')
plt.plot(x, final_pdf, label='Fitted GMM', linewidth=2)
for k in range(K):
    plt.plot(x, pi[k] * norm.pdf(x, mu[k], sigma[k]), '--', label=f'Component {k+1}')
plt.title("1D GMM Fit Using EM")
plt.legend()
plt.show()

## 2D Example

In [None]:
np.random.seed(1)
n_samples = 300

# True parameters
mu_true = [np.array([0, 0]), np.array([4, 4])]
cov_true = [np.array([[1, 0.5], [0.5, 1]]), np.array([[1, -0.3], [-0.3, 1]])]
pi_true = [0.7, 0.3]

n1 = int(n_samples * pi_true[0])
n2 = n_samples - n1

data = np.vstack([
    np.random.multivariate_normal(mu_true[0], cov_true[0], n1),
    np.random.multivariate_normal(mu_true[1], cov_true[1], n2)
])



In [None]:
plt.scatter(data[:,0], data[:,1]);

In [None]:

# Initialize parameters
np.random.seed(42)
gamma = np.random.dirichlet(alpha=[1]*K, size=len(data))  # Each row sums to 1

# Compute initial parameters from random responsibilities
N_k = gamma.sum(axis=0)
mu = np.array([
    (gamma[:, k][:, np.newaxis] * data).sum(axis=0) / N_k[k]
    for k in range(K)
])
cov = []
for k in range(K):
    diff = data - mu[k]
    weighted_cov = (gamma[:, k][:, np.newaxis, np.newaxis] * 
                    np.einsum('ni,nj->nij', diff, diff)).sum(axis=0) / N_k[k]
    cov.append(weighted_cov)
pi = N_k / len(data)

# Store this initial gamma
responsibilities_over_time = [gamma.copy()]



In [None]:

# EM algorithm
n_iterations = 40
for iteration in range(n_iterations):
    # E-step
    gamma = np.zeros((len(data), K))
    for k in range(K):
        gamma[:, k] = pi[k] * multivariate_normal.pdf(data, mean=mu[k], cov=cov[k])
    gamma /= gamma.sum(axis=1, keepdims=True)
    responsibilities_over_time.append(gamma.copy())

    # M-step
    N_k = gamma.sum(axis=0)
    for k in range(K):
        mu[k] = (gamma[:, k][:, np.newaxis] * data).sum(axis=0) / N_k[k]
        diff = data - mu[k]
        cov[k] = (gamma[:, k][:, np.newaxis, np.newaxis] * 
                  np.einsum('ni,nj->nij', diff, diff)).sum(axis=0) / N_k[k]
        pi[k] = N_k[k] / len(data)



In [None]:
selected_iterations = list(range(1, n_iterations, 4))  # [0, 2, 4, ..., 18]
fig, axes = plt.subplots(2, 5, figsize=(18, 7))

for ax, it in zip(axes.ravel(), selected_iterations):
    gamma = responsibilities_over_time[it]
    color = np.stack([
        gamma[:, 0],       # Red channel → responsibility for component 1
        np.zeros_like(gamma[:, 0]),  # Green channel = 0
        gamma[:, 1]        # Blue channel → responsibility for component 2
    ], axis=1)
    ax.scatter(data[:, 0], data[:, 1], color=color, s=20, edgecolor='k', alpha=0.8)
    ax.set_title(f"Iteration {it + 1}")
    ax.set_xlim(-4, 8)
    ax.set_ylim(-4, 8)
    ax.set_aspect('equal')
    ax.set_xticks([])
    ax.set_yticks([])

plt.suptitle("Responsiblities over EM Iterations", fontsize=18)
plt.tight_layout(rect=[0, 0, 1, 0.95])
plt.show()