In [None]:
!apt-get update
!apt-get install -y libopenmpi-dev
!pip install mpi4py

0% [Working]            Hit:1 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu2204/x86_64  InRelease
Hit:2 https://cloud.r-project.org/bin/linux/ubuntu jammy-cran40/ InRelease
Hit:3 http://security.ubuntu.com/ubuntu jammy-security InRelease
Hit:4 http://archive.ubuntu.com/ubuntu jammy InRelease
Hit:5 http://archive.ubuntu.com/ubuntu jammy-updates InRelease
Hit:6 https://r2u.stat.illinois.edu/ubuntu jammy InRelease
Hit:7 http://archive.ubuntu.com/ubuntu jammy-backports InRelease
Hit:8 https://ppa.launchpadcontent.net/deadsnakes/ppa/ubuntu jammy InRelease
Hit:9 https://ppa.launchpadcontent.net/graphics-drivers/ppa/ubuntu jammy InRelease
Hit:10 https://ppa.launchpadcontent.net/ubuntugis/ppa/ubuntu jammy InRelease
Reading package lists... Done
W: Skipping acquire of configured file 'main/source/Sources' as repository 'https://r2u.stat.illinois.edu/ubuntu jammy InRelease' does not seem to provide it (sources.list entry misspelt?)
Reading package lists... Done
Building depe

In [None]:
%%writefile /content/mpi4py_examples.py
"""
MPI4PY: Distributed-Memory Parallelization Examples
--------------------------------------------------
This file contains multiple examples demonstrating distributed-memory parallelization
using MPI4PY, inspired by the examples in Universidad Politecnica de Yucatan's report.

To run any example:
    mpiexec -n <number_of_processes> python mpi4py_examples.py --example <example_number>

Examples:
1. Hello World - Basic MPI initialization and rank identification
2. Array Math - Distributed array computation with workload partitioning
3. Monte Carlo Simulation - Lennard-Jones particle simulation
4. Image Spectrogram - Parallel FFT processing of images
5. Matrix-Vector Product - Distributed matrix-vector multiplication

Author: Krishna
Date: April 8, 2025
"""

import argparse
import numpy as np
import time
import random
import math
from mpi4py import MPI
import matplotlib.pyplot as plt
from scipy.ndimage import gaussian_filter


def parse_arguments():
    """Parse command line arguments to select which example to run."""
    parser = argparse.ArgumentParser(description='MPI4PY Examples')
    parser.add_argument('--example', type=int, default=1, choices=[1, 2, 3, 4, 5],
                        help='Example number to run (1-5)')
    parser.add_argument('--size', type=int, default=10000000,
                        help='Problem size for relevant examples')
    parser.add_argument('--iterations', type=int, default=10,
                        help='Number of iterations for relevant examples')
    return parser.parse_args()


# ======================================================================
# Example 1: Hello World with MPI
# ======================================================================
def example1_hello_world():
    """
    A simple example showing MPI initialization and process rank identification.
    Each process prints its rank and the total number of processes.
    """
    comm = MPI.COMM_WORLD
    size = comm.Get_size()
    rank = comm.Get_rank()

    # Each process prints its rank
    print(f"Hello World from rank {rank} of {size}")

    # Barrier to ensure all processes complete before proceeding
    comm.Barrier()

    # For demonstration, have rank 0 print a summary
    if rank == 0:
        print(f"\nCompleted Example 1: Hello World with {size} processes")

    return


# ======================================================================
# Example 2: Array Math with Timing and Workload Partitioning
# ======================================================================
def example2_array_math(N=10000000):
    """
    Demonstrates distributed array computation with workload partitioning.
    Each process handles a portion of two large arrays, computes element-wise
    addition, and collectively calculates the average.

    Args:
        N: Size of arrays to process
    """
    comm = MPI.COMM_WORLD
    size = comm.Get_size()
    rank = comm.Get_rank()

    start_time = MPI.Wtime()

    # Determine workload distribution
    workloads = [N // size for _ in range(size)]
    # Distribute remainder to first few processes
    for i in range(N % size):
        workloads[i] += 1

    # Calculate start and end indices for this process
    my_start = sum(workloads[:rank])
    my_end = my_start + workloads[rank]

    if rank == 0:
        print(f"Processing array of size {N} with {size} processes")
        print(f"Workload distribution: {workloads}")

    # Memory optimization: allocate only what this process needs
    a = np.ones(workloads[rank], dtype=np.float64)
    b = np.zeros(workloads[rank], dtype=np.float64)

    # Initialize array b with a pattern
    for i in range(workloads[rank]):
        b[i] = 1.0 + (i + my_start)

    # Perform element-wise addition
    computation_start = MPI.Wtime()
    a += b  # Vectorized operation
    computation_time = MPI.Wtime() - computation_start

    # Compute local sum and prepare for reduction
    local_sum = np.array([np.sum(a)], dtype=np.float64)
    total_sum = np.zeros(1, dtype=np.float64)

    # Perform reduction to get total sum on rank 0
    reduction_start = MPI.Wtime()
    comm.Reduce([local_sum, MPI.DOUBLE], [total_sum, MPI.DOUBLE], op=MPI.SUM, root=0)
    reduction_time = MPI.Wtime() - reduction_start

    # Calculate and print results on rank 0
    if rank == 0:
        average = total_sum[0] / N
        end_time = MPI.Wtime()
        total_time = end_time - start_time

        print(f"\nResults:")
        print(f"Average value: {average}")
        print(f"Expected average: {(N + 3) / 2}")  # Mathematical verification
        print(f"\nPerformance:")
        print(f"Total time: {total_time:.6f} seconds")
        print(f"Computation time: {computation_time:.6f} seconds")
        print(f"Reduction time: {reduction_time:.6f} seconds")
        print(f"Completed Example 2: Array Math with {size} processes")

    return


# ======================================================================
# Example 3: Monte Carlo Simulation of Lennard-Jones Particles
# ======================================================================
def lennard_jones_potential(r_squared):
    """
    Calculate Lennard-Jones potential energy between two particles.

    Args:
        r_squared: Square of the distance between particles

    Returns:
        Potential energy value
    """
    if r_squared == 0:
        return 0.0

    # Calculate 1/r^6 and 1/r^12 terms
    r6_inv = 1.0 / (r_squared**3)
    r12_inv = r6_inv * r6_inv

    # LJ potential: 4*epsilon*[(sigma/r)^12 - (sigma/r)^6]
    # With epsilon=1 and sigma=1 for simplicity
    return 4.0 * (r12_inv - r6_inv)


def minimum_image_distance(pos_i, pos_j, box_length):
    """
    Calculate minimum image distance between two particles
    under periodic boundary conditions.

    Args:
        pos_i: Position of first particle
        pos_j: Position of second particle
        box_length: Length of the cubic simulation box

    Returns:
        Square of the minimum image distance
    """
    diff = pos_i - pos_j

    # Apply periodic boundary conditions
    diff = diff - box_length * np.round(diff / box_length)

    # Return squared distance
    return np.sum(diff**2)


def get_particle_energy(coordinates, box_length, i_particle, cutoff2, comm):
    """
    Calculate total interaction energy for a single particle with all others,
    distributed across MPI processes.

    Args:
        coordinates: Array of particle positions
        box_length: Length of the cubic simulation box
        i_particle: Index of the particle to calculate energy for
        cutoff2: Square of the cutoff distance for interactions
        comm: MPI communicator

    Returns:
        Total energy for the particle
    """
    rank = comm.Get_rank()
    size = comm.Get_size()

    # Each process calculates a subset of interactions
    e_total = 0.0
    i_position = coordinates[i_particle]
    particle_count = len(coordinates)

    # Distributed loop: each process handles a subset of particles
    for j_particle in range(rank, particle_count, size):
        if i_particle != j_particle:
            j_position = coordinates[j_particle]
            rij2 = minimum_image_distance(i_position, j_position, box_length)

            # Apply cutoff for efficiency
            if rij2 < cutoff2:
                e_total += lennard_jones_potential(rij2)

    # Combine partial results using Allreduce
    e_single = np.array([e_total], dtype=np.float64)
    e_summed = np.zeros(1, dtype=np.float64)
    comm.Allreduce([e_single, MPI.DOUBLE], [e_summed, MPI.DOUBLE], op=MPI.SUM)

    return e_summed[0]


def example3_monte_carlo(iterations=1000, n_particles=100):
    """
    Monte Carlo simulation of Lennard-Jones particles in a periodic box.
    Each process helps compute energies in parallel.

    Args:
        iterations: Number of Monte Carlo steps
        n_particles: Number of particles in the simulation
    """
    comm = MPI.COMM_WORLD
    size = comm.Get_size()
    rank = comm.Get_rank()

    # Simulation parameters
    box_length = 10.0
    cutoff = 3.0
    cutoff2 = cutoff * cutoff
    temperature = 1.0
    max_displacement = 0.1

    # Initialize random number generator with same seed on all processes
    seed = 42
    random.seed(seed)
    np.random.seed(seed)

    # Initialize particle coordinates randomly - same on all processes
    if rank == 0:
        coordinates = np.random.uniform(0, box_length, (n_particles, 3))
    else:
        coordinates = np.zeros((n_particles, 3))

    # Broadcast initial coordinates to all processes
    comm.Bcast([coordinates, MPI.DOUBLE], root=0)

    # Calculate initial total energy
    if rank == 0:
        print(f"Starting Monte Carlo simulation with {n_particles} particles")
        print(f"Box length: {box_length}, Temperature: {temperature}")
        print(f"Running on {size} processes for {iterations} iterations")

    # Calculate initial system energy
    total_energy = 0.0
    for i in range(n_particles):
        energy_i = get_particle_energy(coordinates, box_length, i, cutoff2, comm)
        total_energy += energy_i

    # Divide by 2 to avoid double counting pair interactions
    total_energy /= 2.0

    # Set up timing
    start_time = MPI.Wtime()
    energy_time = 0.0
    decision_time = 0.0

    # Main Monte Carlo loop
    accepted_moves = 0

    for step in range(1, iterations + 1):
        # Each process needs the same random particle and displacement
        if rank == 0:
            # Select random particle
            i_particle = random.randint(0, n_particles - 1)
            # Generate random displacement
            displacement = np.random.uniform(-max_displacement, max_displacement, 3)
            random_data = np.array([i_particle, displacement[0], displacement[1], displacement[2]])
        else:
            random_data = np.zeros(4)

        # Broadcast random choice to all processes
        comm.Bcast([random_data, MPI.DOUBLE], root=0)
        i_particle = int(random_data[0])
        displacement = random_data[1:4]

        # Calculate energy before move
        energy_start = MPI.Wtime()
        old_energy = get_particle_energy(coordinates, box_length, i_particle, cutoff2, comm)

        # Make temporary move
        old_position = coordinates[i_particle].copy()
        coordinates[i_particle] += displacement

        # Apply periodic boundary conditions
        coordinates[i_particle] = coordinates[i_particle] % box_length

        # Calculate energy after move
        new_energy = get_particle_energy(coordinates, box_length, i_particle, cutoff2, comm)
        energy_time += MPI.Wtime() - energy_start

        # Accept or reject move using Metropolis criterion
        decision_start = MPI.Wtime()
        delta_energy = new_energy - old_energy

        # Metropolis acceptance criterion
        if delta_energy <= 0 or random.random() < math.exp(-delta_energy / temperature):
            # Accept move
            accepted_moves += 1
            total_energy += delta_energy
        else:
            # Reject move - revert to old position
            coordinates[i_particle] = old_position

        decision_time += MPI.Wtime() - decision_start

        # Print progress and current energy
        if rank == 0 and step % 100 == 0:
            print(f"Step {step}: Energy = {total_energy:.6f}, "
                  f"Acceptance rate = {accepted_moves / step:.2f}")

    end_time = MPI.Wtime()
    total_time = end_time - start_time

    # Print final results
    if rank == 0:
        print(f"\nMonte Carlo simulation completed:")
        print(f"Final energy: {total_energy:.6f}")
        print(f"Acceptance rate: {accepted_moves / iterations:.4f}")
        print(f"\nPerformance:")
        print(f"Total time: {total_time:.6f} seconds")
        print(f"Energy calculation time: {energy_time:.6f} seconds ({energy_time/total_time*100:.1f}%)")
        print(f"Decision time: {decision_time:.6f} seconds ({decision_time/total_time*100:.1f}%)")
        print(f"Completed Example 3: Monte Carlo with {size} processes")

    return


# ======================================================================
# Example 4: Image Spectrogram - Parallel FFT Processing
# ======================================================================
def generate_sample_images(n_images=100, image_size=64):
    """
    Generate synthetic images for the spectrogram example.

    Args:
        n_images: Number of images to generate
        image_size: Size of each square image

    Returns:
        Array of images with shape (n_images, image_size, image_size)
    """
    # Create array to hold all images
    images = np.zeros((n_images, image_size, image_size))

    # Generate images with different patterns
    for i in range(n_images):
        # Pattern type based on image index
        pattern_type = i % 4

        if pattern_type == 0:
            # Sinusoidal pattern
            x = np.linspace(0, 6*np.pi, image_size)
            y = np.linspace(0, 6*np.pi, image_size)
            X, Y = np.meshgrid(x, y)
            freq = 1.0 + (i % 10) / 10.0
            images[i] = np.sin(X * freq) * np.cos(Y * freq)

        elif pattern_type == 1:
            # Circular pattern
            x = np.linspace(-1, 1, image_size)
            y = np.linspace(-1, 1, image_size)
            X, Y = np.meshgrid(x, y)
            R = np.sqrt(X**2 + Y**2)
            radius = 0.3 + (i % 8) / 20.0
            images[i] = (R < radius).astype(float)

        elif pattern_type == 2:
            # Diagonal stripes
            x = np.linspace(0, 1, image_size)
            y = np.linspace(0, 1, image_size)
            X, Y = np.meshgrid(x, y)
            freq = 5 + (i % 10)
            images[i] = np.sin(freq * (X + Y))

        else:
            # Random noise with varying frequency content
            noise_scale = 1.0 + (i % 6) / 2.0
            images[i] = np.random.randn(image_size, image_size)
            # Smooth with Gaussian filter of varying width
            images[i] = gaussian_filter(images[i], sigma=noise_scale)

    return images


def example4_image_spectrogram(n_images=100, image_size=64):
    """
    Compute and visualize the average 2D spectrogram of multiple images
    using parallel FFT processing.

    Args:
        n_images: Number of images to process
        image_size: Size of each square image
    """
    comm = MPI.COMM_WORLD
    size = comm.Get_size()
    rank = comm.Get_rank()

    # Setup timing
    start_time = MPI.Wtime()

    if rank == 0:
        print(f"Processing {n_images} images of size {image_size}x{image_size}")
        print(f"Using {size} MPI processes for parallel FFT computation")

        # Generate or load sample images
        # In real application, these would be loaded from a file
        images = generate_sample_images(n_images, image_size)
    else:
        # Other ranks don't need the full images yet
        images = None

    # Broadcast number of images to all processes
    if rank == 0:
        n_images_data = np.array([n_images], dtype=np.int32)
    else:
        n_images_data = np.zeros(1, dtype=np.int32)

    comm.Bcast([n_images_data, MPI.INT], root=0)
    n_images = n_images_data[0]

    # Initialize local spectrogram accumulator
    local_spectrogram = np.zeros((image_size, image_size), dtype=np.float64)

    # Determine which images this rank will process
    my_images = []
    for i in range(n_images):
        if i % size == rank:
            my_images.append(i)

    # Process assigned images
    for img_idx in my_images:
        # Receive image from rank 0 if needed
        if rank == 0:
            img = images[img_idx]
        else:
            img = np.zeros((image_size, image_size), dtype=np.float64)
            comm.Recv([img, MPI.DOUBLE], source=0, tag=img_idx)

        # Compute 2D FFT
        img_fft = np.fft.fft2(img)

        # Shift zero frequency to center
        img_fft_shifted = np.fft.fftshift(img_fft)

        # Compute magnitude and add to local spectrogram
        local_spectrogram += np.abs(img_fft_shifted)

    # Non-rank 0 processes need to send their images
    if rank == 0:
        for i in range(n_images):
            if i % size != 0:  # Not processed by rank 0
                target_rank = i % size
                comm.Send([images[i], MPI.DOUBLE], dest=target_rank, tag=i)

    # Reduce local spectrograms to create global spectrogram on rank 0
    if rank == 0:
        global_spectrogram = np.zeros_like(local_spectrogram)
    else:
        global_spectrogram = None

    comm.Reduce([local_spectrogram, MPI.DOUBLE],
                [global_spectrogram, MPI.DOUBLE],
                op=MPI.SUM, root=0)

    # Process and display results on rank 0
    if rank == 0:
        # Normalize by number of images
        global_spectrogram /= n_images

        # Convert to log scale for better visualization
        log_spectrogram = np.log(1 + global_spectrogram)

        # Calculate total time
        total_time = MPI.Wtime() - start_time

        print(f"\nSpectrogram computation completed:")
        print(f"Processed {n_images} images in {total_time:.6f} seconds")
        print(f"Processing rate: {n_images / total_time:.2f} images per second")

        # Display the spectrogram
        plt.figure(figsize=(10, 8))
        plt.imshow(log_spectrogram, cmap='viridis')
        plt.colorbar(label='Log magnitude')
        plt.title('Average Image Spectrogram (Log Scale)')
        plt.xlabel('Frequency (x)')
        plt.ylabel('Frequency (y)')

        # Save the figure instead of displaying for non-GUI environments
        plt.savefig('/content/spectrogram.png')
        print("Spectrogram saved as '/content/spectrogram.png'")

        print(f"Completed Example 4: Image Spectrogram with {size} processes")

    return


# ======================================================================
# Example 5: Matrix-Vector Product - Iterative Solver Simulation
# ======================================================================
def example5_matrix_vector_product(vector_size=10000, iterations=20):
    """
    Perform distributed matrix-vector multiplication iteratively.
    Each process computes a slice of the matrix-vector product, and
    all processes share the updated vector after each iteration.

    Args:
        vector_size: Size of the vector
        iterations: Number of iterations to perform
    """
    comm = MPI.COMM_WORLD
    size = comm.Get_size()
    rank = comm.Get_rank()

    # Ensure vector size is divisible by number of processes
    if vector_size % size != 0:
        if rank == 0:
            print(f"Warning: Vector size {vector_size} not divisible by {size} processes")
            print(f"Adjusting vector size to {vector_size + (size - vector_size % size)}")
        vector_size += (size - vector_size % size)

    # Calculate local vector size and offset
    local_size = vector_size // size
    offset = rank * local_size

    # Initialize vector [0, 0, 0, ..., 0] with first element as 1.0
    # This is a simple test case where value should shift by one position each iteration
    if rank == 0:
        v = np.zeros(vector_size, dtype=np.float64)
        v[0] = 1.0
        print(f"Starting matrix-vector product with vector size {vector_size}")
        print(f"Running {iterations} iterations on {size} processes")
    else:
        v = np.zeros(vector_size, dtype=np.float64)

    # Broadcast initial vector to all processes
    comm.Bcast([v, MPI.DOUBLE], root=0)

    # Allocate local vector chunk
    local_v = np.zeros(local_size, dtype=np.float64)
    new_local_v = np.zeros(local_size, dtype=np.float64)

    # Start timer
    start_time = MPI.Wtime()

    # Define the matrix structure
    # For this example, we use a "shift" matrix where each row i has 1.0 at position i-1
    # This matrix shifts the vector by one element per iteration

    # Main iteration loop
    for iter in range(iterations):
        # Copy global vector to local chunk for easier access
        local_v = v[offset:offset+local_size].copy()

        # Matrix-vector product for local slice
        for i in range(local_size):
            # Global index of this row
            global_i = offset + i

            # For the "shift" matrix, set v[i] = v[i-1]
            if global_i > 0:
                new_local_v[i] = v[global_i-1]
            else:
                # First element becomes zero (or could wrap around)
                new_local_v[i] = 0.0

        # Gather all chunks of the new vector
        # First, all processes send their local results to all others
        comm.Allgather([new_local_v, MPI.DOUBLE], [v, MPI.DOUBLE])

        # Optionally print state for debugging (only for small vectors)
        if vector_size <= 20 and rank == 0:
            print(f"Iteration {iter+1}: {v[:20]}")

    # Calculate elapsed time
    end_time = MPI.Wtime()
    elapsed = end_time - start_time

    # Print results
    if rank == 0:
        # For large vectors, just show beginning and end
        if vector_size > 20:
            print("\nFinal vector (first 10 elements):")
            print(v[:10])
        else:
            print("\nFinal vector:")
            print(v)

        print(f"\nMatrix-vector product completed:")
        print(f"{iterations} iterations of size {vector_size} in {elapsed:.2f} seconds")
        print(f"Performance: {iterations / elapsed:.2f} iterations per second")
        print(f"Completed Example 5: Matrix-Vector Product with {size} processes")

    return


# ======================================================================
# Main function to run selected example
# ======================================================================
def main():
    """Main function to run selected example based on command line arguments."""
    try:
        args = parse_arguments()

        # Run selected example
        if args.example == 1:
            example1_hello_world()
        elif args.example == 2:
            example2_array_math(args.size)
        elif args.example == 3:
            example3_monte_carlo(args.iterations * 100, 100)
        elif args.example == 4:
            example4_image_spectrogram(100, 64)
        elif args.example == 5:
            example5_matrix_vector_product(args.size // 1000, args.iterations)
        else:
            print(f"Invalid example number: {args.example}")
    except Exception as e:
        # Si hay un error al analizar los argumentos, ejecutar ejemplo 1 por defecto
        print(f"Error parsing arguments: {e}")
        print("Running default example (Hello World)")
        example1_hello_world()


if __name__ == "__main__":
    main()

Overwriting /content/mpi4py_examples.py


In [None]:
%%writefile /content/test_example1.py
from mpi4py_examples import example1_hello_world

# Ejecutar el ejemplo Hello World
example1_hello_world()

Overwriting /content/test_example1.py


In [None]:
%%writefile /content/test_example2.py
from mpi4py_examples import example2_array_math

# Ejecutar con un tamaño reducido para que termine más rápido
example2_array_math(1000000)

Overwriting /content/test_example2.py


In [None]:
%%writefile /content/test_example3.py
from mpi4py_examples import example3_monte_carlo

# Ejecutar con valores reducidos para que termine más rápido
example3_monte_carlo(iterations=200, n_particles=50)

Overwriting /content/test_example3.py


In [None]:
%%writefile /content/test_example4.py
from mpi4py_examples import example4_image_spectrogram

# Ejecutar con valores reducidos
example4_image_spectrogram(n_images=20, image_size=32)

Overwriting /content/test_example4.py


In [None]:
%%writefile /content/test_example5.py
from mpi4py_examples import example5_matrix_vector_product

# Ejecutar con valores reducidos
example5_matrix_vector_product(vector_size=1000, iterations=10)

Overwriting /content/test_example5.py


In [None]:
!python /content/test_example1.py

Hello World from rank 0 of 1

Completed Example 1: Hello World with 1 processes


In [None]:
!python /content/test_example2.py

Processing array of size 1000000 with 1 processes
Workload distribution: [1000000]

Results:
Average value: 500001.5
Expected average: 500001.5

Performance:
Total time: 0.183877 seconds
Computation time: 0.001552 seconds
Reduction time: 0.000135 seconds
Completed Example 2: Array Math with 1 processes


In [None]:
!python /content/test_example3.py

Starting Monte Carlo simulation with 50 particles
Box length: 10.0, Temperature: 1.0
Running on 1 processes for 200 iterations
Step 100: Energy = 246.875750, Acceptance rate = 0.94
Step 200: Energy = 18.194883, Acceptance rate = 0.93

Monte Carlo simulation completed:
Final energy: 18.194883
Acceptance rate: 0.9300

Performance:
Total time: 0.213577 seconds
Energy calculation time: 0.209847 seconds (98.3%)
Decision time: 0.000369 seconds (0.2%)
Completed Example 3: Monte Carlo with 1 processes


In [None]:
!python /content/test_example4.py

Processing 20 images of size 32x32
Using 1 MPI processes for parallel FFT computation

Spectrogram computation completed:
Processed 20 images in 0.007947 seconds
Processing rate: 2516.77 images per second
Spectrogram saved as '/content/spectrogram.png'
Completed Example 4: Image Spectrogram with 1 processes


In [None]:
!python /content/test_example5.py

Starting matrix-vector product with vector size 1000
Running 10 iterations on 1 processes

Final vector (first 10 elements):
[0. 0. 0. 0. 0. 0. 0. 0. 0. 0.]

Matrix-vector product completed:
10 iterations of size 1000 in 0.00 seconds
Performance: 2068.04 iterations per second
Completed Example 5: Matrix-Vector Product with 1 processes
