In [1]:
import matplotlib.pyplot as plt
from scipy.stats import norm
import numpy as np
import matplotlib

# Render with the non-interactive Agg backend: the figures in this cell are
# written to disk with savefig rather than shown on screen.
matplotlib.use('agg')

def generate_gaussian_samples(mu: list, sigma: list, n_per_component: int = 1000, seed: int = 0) -> np.ndarray:
    """Draw a pooled 1-D sample from several Gaussian distributions.

    Args:
        mu: Mean of each Gaussian.
        sigma: Standard deviation of each Gaussian; must have as many entries as ``mu``.
        n_per_component: Number of draws taken from each Gaussian (default 1000).
        seed: Value used to seed NumPy's global random generator (default 0).

    Returns:
        A 1-D array of length ``n_per_component * len(mu)``; the draws of
        component ``i`` occupy the slice ``[i * n, (i + 1) * n)``.

    Raises:
        ValueError: If ``mu`` and ``sigma`` differ in length.

    Side effects:
        Reseeds ``np.random``, prints the sample shape, and draws a
        density-normalised histogram on pyplot's current figure.
    """
    # zip() would silently stop at the shorter list and leave zero-filled slots behind.
    if len(mu) != len(sigma):
        raise ValueError(
            f"mu and sigma must have the same length, got {len(mu)} and {len(sigma)}"
        )

    samples = np.zeros(n_per_component * len(mu))
    # The legacy global seed is kept so the default call reproduces the same draws as before.
    np.random.seed(seed)
    for i, (mean, std) in enumerate(zip(mu, sigma)):
        start = i * n_per_component
        samples[start:start + n_per_component] = np.random.normal(mean, std, n_per_component)
    print('Sample size:', np.shape(samples))
    plt.hist(samples, bins=70, density=True)
    # density=True normalises the bars to a probability density, not a count.
    plt.ylabel('Probability density')
    print('\n')
    return samples

def initialize_parameters(n: int, seed) -> tuple:
    """Build the starting guess for an n-component Gaussian mixture.

    The mixing weights start out uniform (a Python list holding ``1 / n`` n
    times). After seeding NumPy's global generator with ``seed``, the means are
    taken as twice a uniform draw from ``np.random.rand`` and the standard
    deviations as a plain ``np.random.rand`` draw.

    Returns:
        The tuple ``(pis, mus, sigmas)``.
    """
    equal_share = 1 / n
    mixing_weights = [equal_share] * n

    np.random.seed(seed)
    starting_means = np.random.rand(n) * 2
    starting_stds = np.random.rand(n)

    return mixing_weights, starting_means, starting_stds

def expectation_step(data: np.ndarray, mu: np.ndarray, sigma: np.ndarray, pi: np.ndarray) -> tuple:
    """Perform the E-step of the EM algorithm, calculating responsibilities for each component.

    The weighted densities ``pi[c] * N(x | mu[c], sigma[c])`` are evaluated in
    log space and shifted by each row's maximum before exponentiating. A sample
    that lies far from every component therefore still gets a valid
    responsibility row instead of the 0/0 = NaN that results when all
    linear-space densities underflow to zero.

    Args:
        data: 1-D array of observations.
        mu: Component means, one per component.
        sigma: Component standard deviations, one per component.
        pi: Current mixing weights, one per component.

    Returns:
        ``(r, m_c, pi)`` where ``r`` has shape ``(len(data), n_components)`` and
        each row sums to one, ``m_c`` is the column sum of ``r`` (the effective
        number of samples per component), and ``pi`` is ``m_c`` normalised to
        sum to one (the updated mixing weights).
    """
    data = np.asarray(data, dtype=float)
    mu = np.asarray(mu, dtype=float)
    sigma = np.asarray(sigma, dtype=float)

    # A zero mixing weight legitimately maps to log(0) = -inf; silence the warning.
    with np.errstate(divide='ignore'):
        log_pi = np.log(np.asarray(pi, dtype=float))

    # Broadcasting (N, 1) against (K,) yields the (N, K) table of log weighted densities.
    log_weighted = log_pi + norm.logpdf(data.reshape(-1, 1), loc=mu, scale=sigma)

    # After subtracting the row maximum the largest term is exp(0) = 1, so the
    # normalising sum below can never underflow to zero.
    row_max = log_weighted.max(axis=1, keepdims=True)
    r = np.exp(log_weighted - row_max)
    r /= r.sum(axis=1, keepdims=True)

    m_c = r.sum(axis=0)
    pi = m_c / m_c.sum()
    return r, m_c, pi

def maximization_step(data: np.ndarray, r: np.ndarray, m_c: np.ndarray) -> tuple:
    """Run the M-step: re-estimate each component's mean and standard deviation.

    Both estimates are responsibility-weighted: the mean is the weighted
    average of the data, and the standard deviation is the square root of the
    weighted mean squared deviation from that new mean. ``m_c`` supplies the
    per-component normaliser.

    Returns:
        The tuple ``(mu, sigma)``, one entry per column of ``r``.
    """
    # View the data as a column so it broadcasts across the component axis of r.
    column = data.reshape(-1, 1)

    weighted_total = (column * r).sum(axis=0)
    mu = weighted_total / m_c

    squared_deviation = (column - mu) ** 2
    weighted_variance = (r * squared_deviation).sum(axis=0) / m_c
    sigma = np.sqrt(weighted_variance)

    return mu, sigma

def compute_aic(data: np.ndarray, mu: np.ndarray, sigma: np.ndarray, pi: np.ndarray) -> float:
    """Compute the Akaike Information Criterion for model selection.

    ``AIC = 2 * k - 2 * log_likelihood`` with ``k = 3 * len(mu)`` (one mean, one
    standard deviation and one mixing weight counted per component).

    The mixture log-likelihood is evaluated with a log-sum-exp in log space, so
    samples far from every component contribute a large negative but finite
    term instead of ``log(0) = -inf``.

    Args:
        data: Observations; flattened before use.
        mu: Component means.
        sigma: Component standard deviations.
        pi: Component mixing weights.

    Returns:
        The AIC value (lower is better).
    """
    k = 3 * len(mu)  # number of parameters (each mu, sigma, and pi counts)

    data = np.asarray(data, dtype=float).ravel()
    mu_col = np.asarray(mu, dtype=float).reshape(-1, 1)
    sigma_col = np.asarray(sigma, dtype=float).reshape(-1, 1)

    # log(0) for a zero weight or a fully underflowed sample is expected; keep it quiet.
    with np.errstate(divide='ignore'):
        log_pi = np.log(np.asarray(pi, dtype=float)).reshape(-1, 1)
        # (K, N) table of log(pi_j * N(x_n | mu_j, sigma_j)).
        log_terms = log_pi + norm.logpdf(data, loc=mu_col, scale=sigma_col)

        # Shift by the per-sample maximum so at least one exponent is exp(0) = 1.
        peak = log_terms.max(axis=0)
        # A non-finite peak (e.g. every term is -inf) must not be subtracted: -inf - -inf is NaN.
        peak = np.where(np.isfinite(peak), peak, 0.0)
        log_density = peak + np.log(np.exp(log_terms - peak).sum(axis=0))

    log_likelihood = log_density.sum()
    aic = 2 * k - 2 * log_likelihood
    return aic

def plot_gaussian_mixtures(data: np.ndarray, mu: np.ndarray, sigma: np.ndarray, pi: np.ndarray, iteration,
                           output_dir: str = '2d_images'):
    """Plot the data histogram with the weighted component densities and save it as a PNG.

    Args:
        data: 1-D array of observations.
        mu: Component means.
        sigma: Component standard deviations.
        pi: Component mixing weights; one curve is drawn per entry.
        iteration: Zero-based EM iteration index, shown in the title.
        output_dir: Directory that receives the image; created if missing.

    The file is written as ``<output_dir>/<iteration + 1>.png`` and the figure is
    closed afterwards, even if drawing or saving fails.
    """
    import os  # local import: this cell's import block does not bring in os

    # savefig does not create missing directories.
    os.makedirs(output_dir, exist_ok=True)

    data = np.asarray(data)
    fig = plt.figure(figsize=(9, 6.5))
    try:
        ax0 = fig.add_subplot(111)
        # ndarray reductions instead of the Python builtins, which iterate element by element.
        x = np.linspace(data.min(), data.max(), num=300)
        ax0.hist(data, bins=50, alpha=0.2, density=True, label="Data Histogram")
        for i in range(len(pi)):
            y = pi[i] * norm(loc=mu[i], scale=sigma[i]).pdf(x)
            # Label by component index so the legend distinguishes the curves
            # (the iteration number already appears in the title).
            ax0.plot(x, y, label=f'Component {i + 1}')
        ax0.set_title(f'Gaussian Mixtures at Iteration {iteration}')
        ax0.legend()
        fig.savefig(os.path.join(output_dir, f'{iteration+1}.png'))
    finally:
        # One figure is created per iteration; always release it.
        plt.close(fig)

def run_em_algorithm(data: np.ndarray, max_iter: int, pi: list, mu: list, sigma: list, threshold: float = 0.01):
    """Iterate EM on the mixture, plotting every iteration and tracking the AIC.

    Each pass runs the E-step, the M-step, computes the AIC, and saves a plot
    of the current fit. The parameters are printed on every eighth iteration
    and on the last permitted one. The loop stops early once the AIC has
    dropped by less than ``threshold`` on three consecutive iterations.

    Args:
        data: Observations to fit.
        max_iter: Upper bound on the number of EM iterations.
        pi: Initial mixing weights.
        mu: Initial component means.
        sigma: Initial component standard deviations.
        threshold: Minimum AIC decrease that counts as an improvement.

    Returns:
        The list of AIC values, one per completed iteration.
    """
    aic_history = []
    last_aic = float('inf')
    stalled_rounds = 0
    final_iteration = max_iter - 1

    for iteration in range(max_iter):
        r, m_c, pi = expectation_step(data, mu, sigma, pi)
        mu, sigma = maximization_step(data, r, m_c)
        current_aic = compute_aic(data, mu, sigma, pi)
        aic_history.append(current_aic)
        plot_gaussian_mixtures(data, mu, sigma, pi, iteration)

        # Periodic progress report.
        if iteration == final_iteration or iteration % 8 == 0:
            print(f'iteration: {iteration}')
            for tag, value in (("AIC:", current_aic), ("pis:", pi), ("mus:", mu), ("sigmas:", sigma)):
                print(tag, value)
            print()

        gain = last_aic - current_aic
        last_aic = current_aic

        # Keep the "<" form: a NaN gain compares False and so resets the counter.
        if not (gain < threshold):
            stalled_rounds = 0
            continue

        stalled_rounds += 1
        if stalled_rounds >= 3:
            print(f"Terminating early: AIC improvement less than {threshold} for three consecutive iterations.")
            return aic_history

    print(f'Algorithm did not converge in {max_iter} iterations')
    return aic_history

def main():
    """Sample from five Gaussians, fit a five-component mixture with EM, and save the AIC curve."""
    component_count = 5
    iteration_cap = 200
    rng_seed = 0

    # Ground-truth parameters of the distributions the data are drawn from.
    true_means = [-1.1, -0.8, -0.2, 0.6, 0.95]
    true_stds = [0.100, 0.316, 0.13, 0.200, 0.224]
    samples = generate_gaussian_samples(true_means, true_stds)

    # Random starting point for the fit.
    start_pis, start_mus, start_sigmas = initialize_parameters(component_count, rng_seed)
    aic_list = run_em_algorithm(
        samples, iteration_cap, start_pis, start_mus, start_sigmas, threshold=0.1
    )

    fig, ax0 = plt.subplots(figsize=(12, 5))
    ax0.plot(aic_list)
    ax0.set_ylabel('AIC value')
    fig.savefig('2d_images/AIC.png')


if __name__ == "__main__":
    main()


Sample size: (5000,)


iteration: 0
AIC: 11461.650746835068
pis: [0.14909152 0.05191944 0.25852743 0.38991225 0.15054937]
mus: [ 0.25132113  0.93594935 -0.34209979 -0.53250858  0.65851017]
sigmas: [0.66812368 0.2817511  0.73981573 0.68381436 0.36132985]

iteration: 8
AIC: 10419.271372877121
pis: [0.09280522 0.10015381 0.19402046 0.3814086  0.23161191]
mus: [ 0.14042578  0.9206442  -0.44171495 -0.80886867  0.76896408]
sigmas: [0.60340707 0.25337545 0.51590285 0.392023   0.24240259]

iteration: 16
AIC: 9952.050895861536
pis: [0.09696179 0.1069495  0.22218687 0.33651428 0.23738755]
mus: [ 0.01944573  0.94695085 -0.29663512 -0.97899222  0.76381502]
sigmas: [0.48529708 0.25915003 0.37852753 0.26448666 0.23627702]

iteration: 24
AIC: 9111.318186097638
pis: [0.07187493 0.1151963  0.24280039 0.3113402  0.25878819]
mus: [-0.12318742  0.95754394 -0.2844298  -1.06098124  0.72284074]
sigmas: [0.57554989 0.25762134 0.19530293 0.15342285 0.24131779]

iteration: 32
AIC: 9075.78858345555
pis: [0.06655

In [2]:
import imageio.v2 as imageio
import os

def create_gif(image_folder, output_path, duration, last_frame_duration, max_iterations):
    """Create a GIF from numbered PNG frames, holding the last frame for longer.

    Frames named ``<n>.png`` are collected in ascending numeric order for
    ``n`` in ``0..max_iterations`` inclusive; numbers with no file are skipped.

    Args:
        image_folder: Directory containing the ``<n>.png`` frames.
        output_path: Path of the GIF to write.
        duration: Per-frame duration passed through to ``imageio.mimsave``.
        last_frame_duration: Extra time the final frame is held, realised by
            appending ``int(last_frame_duration / duration)`` copies of it.
        max_iterations: Highest frame number to look for.

    Raises:
        FileNotFoundError: If no matching frame exists in ``image_folder``.
    """
    # Set membership is O(1); the directory listing is scanned only once.
    available = set(os.listdir(image_folder))

    images = []
    # Inclusive upper bound: the plotting cell saves iteration i as '<i + 1>.png',
    # so the final frame of a full run is '<max_iterations>.png'.
    for i in range(max_iterations + 1):
        file_name = f'{i}.png'
        if file_name in available:
            images.append(imageio.imread(os.path.join(image_folder, file_name)))

    if not images:
        raise FileNotFoundError(
            f"No '<n>.png' frames with n in 0..{max_iterations} found in {image_folder!r}"
        )

    # Append the last image additional times to extend its display time.
    extended_frames = int(last_frame_duration / duration)
    images.extend([images[-1]] * extended_frames)

    imageio.mimsave(output_path, images, duration=duration)

def main():
    """Assemble the saved iteration plots into 'output.gif'."""
    create_gif(
        image_folder='2d_images',    # folder containing the frames
        output_path='output.gif',    # GIF file to write
        duration=0.05,               # duration of each frame in the GIF
        last_frame_duration=0.5,     # extended duration for the last frame
        max_iterations=200,
    )


if __name__ == "__main__":
    main()


In [3]:
import cv2
import os

def create_video(image_folder, output_path, frame_duration, max_iterations, last_frame_duration=0.5):
    """Create an MP4 video from numbered PNG frames, holding the last frame for longer.

    Frames named ``<n>.png`` are used in ascending numeric order for ``n`` in
    ``0..max_iterations`` inclusive; numbers with no file are skipped. The
    video size is taken from the first frame.

    Args:
        image_folder: Directory containing the ``<n>.png`` frames.
        output_path: Path of the video file to write.
        frame_duration: Seconds per frame; the frame rate is ``1 / frame_duration``.
        max_iterations: Highest frame number to look for.
        last_frame_duration: Extra seconds the final frame is held (default 0.5).

    Raises:
        FileNotFoundError: If no matching frame exists in ``image_folder``.
        ValueError: If a frame file cannot be decoded by ``cv2.imread``.
        RuntimeError: If the video writer cannot be opened for ``output_path``.
    """
    available = set(os.listdir(image_folder))
    # Inclusive upper bound: the plotting cell saves iteration i as '<i + 1>.png',
    # so the final frame of a full run is '<max_iterations>.png'.
    frame_paths = [
        os.path.join(image_folder, f'{i}.png')
        for i in range(max_iterations + 1)
        if f'{i}.png' in available
    ]
    if not frame_paths:
        raise FileNotFoundError(
            f"No '<n>.png' frames with n in 0..{max_iterations} found in {image_folder!r}"
        )

    # Determine the width and height from the first image.
    first_frame = cv2.imread(frame_paths[0])
    if first_frame is None:
        raise ValueError(f"Could not decode image {frame_paths[0]!r}")
    height, width = first_frame.shape[:2]

    # Define the codec and create the VideoWriter object.
    fourcc = cv2.VideoWriter_fourcc(*'mp4v')  # or 'XVID'
    video = cv2.VideoWriter(output_path, fourcc, 1 / frame_duration, (width, height))
    try:
        if not video.isOpened():
            raise RuntimeError(f"Could not open video writer for {output_path!r}")

        last_frame = first_frame
        for path in frame_paths:
            current = cv2.imread(path)
            if current is None:
                raise ValueError(f"Could not decode image {path!r}")
            video.write(current)
            last_frame = current

        # Hold the LAST frame written, not the first one that was read for sizing.
        for _ in range(int(last_frame_duration / frame_duration)):
            video.write(last_frame)
    finally:
        # Always finalise the file. cv2.destroyAllWindows() is intentionally not
        # called: this function opens no window, and the call can raise on
        # OpenCV builds without GUI support before the writer is released.
        video.release()

def main():
    """Assemble the saved iteration plots into 'output_video.mp4'."""
    create_video(
        image_folder='2d_images',         # folder containing the frames
        output_path='output_video.mp4',   # video file to write
        frame_duration=0.05,              # duration of each frame in the video
        max_iterations=200,
    )


if __name__ == "__main__":
    main()
