# Exercise sheet 10 - Parallelisation

# Exercise 1 - Rigged dice

Create a rigged dice function that 25% of the time returns the number 6. The rest of the time it returns the integers 1,2,3,4,5 uniformly.
Test your function by calling it **one billion times** (10^9) and checking that 6 is returned between 249 and 251 million times (inclusive). You do not need to check that the numbers 1 to 5 are returned uniformly or randomly, but you do need to check that your function returns integers in the range 1-6 (inclusive). **Time** how long it takes to run the script.

Now attempt to **parallelise the task with a method of your own choosing** and time how long it takes once more. How does this compare to the previous *un-optimised* run?


In [24]:
import multiprocessing
import random
import threading
import time

import numpy as np


In [25]:
start_time = time.time()

def rigged_dice():
    choices = [1, 2, 3, 4, 5, 6]
    probabilities = [0.15, 0.15, 0.15, 0.15, 0.15, 0.25]
    return np.random.choice(choices, p=probabilities)
print(rigged_dice())

# Quick sanity check with 100k trials; the window below is scaled down from the 10^9 case
trials = 100000
test = [rigged_dice() for i in range(trials)]
six_count = test.count(6)

if 24700 <= six_count <= 25300:
    print(f"Test passed! The number 6 appeared {six_count} times.")
else:
    print(f"Test failed! The number 6 appeared {six_count} times.")

print(f"--- {time.time() - start_time} seconds 100k trials ---")


6
Test passed! The number 6 appeared 25186 times.
--- 0.7112221717834473 seconds 100k trials ---
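
Calling `np.random.choice` once per roll is dominated by Python call overhead. As a rough sketch (not part of the original solution), the same distribution can be drawn in bulk with a NumPy `Generator`; the helper name `rigged_dice_batch` and the fixed seed are assumptions made for illustration.

```python
import numpy as np

def rigged_dice_batch(n_rolls, rng):
    """Draw n_rolls rigged dice values at once (hypothetical helper)."""
    # Start with uniform values 1-5, then overwrite about 25% of them with 6
    rolls = rng.integers(1, 6, size=n_rolls)  # upper bound is exclusive
    is_six = rng.random(n_rolls) < 0.25
    rolls[is_six] = 6
    return rolls

rng = np.random.default_rng(12345)  # fixed seed only to make the sketch repeatable
rolls = rigged_dice_batch(1_000_000, rng)
six_fraction = np.mean(rolls == 6)
print(f"min={rolls.min()}, max={rolls.max()}, fraction of sixes={six_fraction:.4f}")
```

For the full 10^9 rolls the batch would need to be processed in chunks, since a single array of that size may not fit comfortably in memory.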


In [26]:
def rigged_dice():
    if random.random() < 0.25:
        return 6
    else:
        return random.randint(1, 5)

def worker(num_iterations):
    count_6 = 0
    count_valid = 0
    for _ in range(num_iterations):
        result = rigged_dice()
        if 1 <= result <= 6:
            count_valid += 1
            if result == 6:
                count_6 += 1
        else:
            raise ValueError(f"something went wrong: {result}")
    return (count_6, count_valid)

def main():
    TOTAL_ROLLS = 10**9
    NUM_PROCESSES = multiprocessing.cpu_count()
    CHUNK_SIZE = TOTAL_ROLLS // NUM_PROCESSES

    print(f"number of processes: {NUM_PROCESSES}")

    start_time = time.time()

    with multiprocessing.Pool(processes=NUM_PROCESSES) as pool:
        # Create a list of tasks
        tasks = [CHUNK_SIZE] * NUM_PROCESSES
        # Handle any remaining rolls
        remaining = TOTAL_ROLLS % NUM_PROCESSES
        if remaining:
            tasks.append(remaining)
        
        # Map the worker function to the pool
        results = pool.map(worker, tasks)

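    total_6s = sum(result[0] for result in results)
    # Every roll must have been validated by a worker as lying in the range 1-6
    total_valid = sum(result[1] for result in results)
    assert total_valid == TOTAL_ROLLS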

    end_time = time.time()
    elapsed_time = end_time - start_time

    print(f"Total rolls: {TOTAL_ROLLS}")
    print(f"Number of 6s rolled: {total_6s}")
    print(f"Elapsed time: {elapsed_time:.2f} seconds")
    # Check if 6s are within the expected range
    expected_min = 249_000_000
    expected_max = 251_000_000
    if expected_min <= total_6s <= expected_max:
        print(f"Number of 6s is within the expected range: {expected_min} - {expected_max}")
    else:
        print(f"Number of 6s is outside the expected range: {total_6s}")

if __name__ == "__main__":
    main()


number of processes: 12
Total rolls: 1000000000
Number of 6s rolled: 250000929
Elapsed time: 47.69 seconds
Number of 6s is within the expected range: 249000000 - 251000000
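
As a rough check on how strict the 249-251 million window is, the number of sixes can be modelled as a binomial count, assuming independent rolls with $p = 0.25$:

$$ \mu = Np = 10^9 \cdot 0.25 = 2.5 \times 10^8, \qquad \sigma = \sqrt{Np(1-p)} = \sqrt{10^9 \cdot 0.25 \cdot 0.75} \approx 1.37 \times 10^4 $$

The allowed deviation of $10^6$ is therefore about $73\sigma$, so a correct implementation should essentially never fail the check. The observed count of 250000929 lies less than $0.1\sigma$ from the mean.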


# Exercise 2 - Calculate $\pi$

Using the **DSMC method**, calculate the value of **$\pi$**.


**Approach:**
In order to do this, create a 2-dimensional domain (defined by the coordinates $x_{min}, x_{max}, y_{min}, y_{max}$) and launch a number $P$ of particles at random locations within it. Check which particles lie inside a circle centred in the domain with radius $\frac{x_{max}-x_{min}}{2}$, where $x_{min}, x_{max}$ are the x-limits of your 2D domain.

Get your value for $\pi$ by using the following formula:
$\pi = \frac{4 \cdot n_{inside}}{P},$ where $n_{inside}$ is the number of particles inside the circle and $P$ is the total number of particles.
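
The formula follows from comparing areas, assuming a square domain of side $2r$ with the circle inscribed in it and particles distributed uniformly:

$$ \frac{n_{inside}}{P} \approx \frac{A_{circle}}{A_{square}} = \frac{\pi r^2}{(2r)^2} = \frac{\pi}{4} \quad \Rightarrow \quad \pi \approx \frac{4 \cdot n_{inside}}{P} $$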

Play around with the number of particles. 

a) Try to improve this task by making use of threading (you can use either the **_thread** or **threading** module). What are your findings, is the script running faster?

b) Now try to improve the running time of the code by employing the **multiprocessing** module. Are there any differences as compared to threading?

In [27]:
def estimate_pi(total_particles=100_000_000, x_min=0, x_max=1, y_min=0, y_max=1):
    # Circle inscribed in the domain (assumes a square domain, as in the exercise)
    radius = (x_max - x_min) / 2
    center_x = (x_min + x_max) / 2
    center_y = (y_min + y_max) / 2

    start_time = time.time()

    x = np.random.uniform(x_min, x_max, total_particles)
    y = np.random.uniform(y_min, y_max, total_particles)
    distances_squared = (x - center_x)**2 + (y - center_y)**2
    n_inside = np.sum(distances_squared <= radius**2)
    pi_estimate = (4 * n_inside) / total_particles

    elapsed_time = time.time() - start_time

    return total_particles, pi_estimate, elapsed_time

if __name__ == "__main__":
    particles, pi, elapsed = estimate_pi()
    print(f"Particles: {particles}")
    print(f"Estimated π: {pi}")
    print(f"Time elapsed: {elapsed:.2f} seconds")


Particles: 100000000
Estimated π: 3.14135396
Time elapsed: 1.75 seconds


### (A)

In [None]:
def estimate_pi_threaded(total_particles=100_000_000, num_threads=6, x_min=0, x_max=1, y_min=0, y_max=1):
    radius = (x_max - x_min) / 2
    center_x = (x_min + x_max) / 2
    center_y = (y_min + y_max) / 2

    particles_per_thread = total_particles // num_threads
    remaining_particles = total_particles % num_threads

    counts = []
    lock = threading.Lock()

    def worker(n_particles):
        # counts is only mutated (never rebound), so no nonlocal declaration is needed
        x = np.random.uniform(x_min, x_max, n_particles)
        y = np.random.uniform(y_min, y_max, n_particles)
        distances_squared = (x - center_x)**2 + (y - center_y)**2
        n_inside = np.sum(distances_squared <= radius**2)
        # Serialise appends to the shared list
        with lock:
            counts.append(n_inside)

    threads = []
    start_time = time.time()

    # Create and start the threads
    for i in range(num_threads):
        # Distribute remaining particles to the last thread
        if i == num_threads - 1:
            n_particles = particles_per_thread + remaining_particles
        else:
            n_particles = particles_per_thread
        thread = threading.Thread(target=worker, args=(n_particles,))
        threads.append(thread)
        thread.start()

    # Wait for all threads to finish
    for thread in threads:
        thread.join()


    total_inside = sum(counts)
    pi_estimate = (4 * total_inside) / total_particles
    elapsed_time = time.time() - start_time

    return total_particles, pi_estimate, elapsed_time

if __name__ == "__main__":
    PARTICLES, PI, TIME_ELAPSED = estimate_pi_threaded()
    print(f"Particles: {PARTICLES}")
    print(f"Estimated π: {PI}")
    print(f"Time elapsed: {TIME_ELAPSED:.2f} seconds")


Particles: 100000000
Estimated π: 3.14167308
Time elapsed: 1.17 seconds


### (B)

In [None]:
def count_inside_circle(n_particles, x_min, x_max, y_min, y_max):
    radius = (x_max - x_min) / 2
    center_x = (x_min + x_max) / 2
    center_y = (y_min + y_max) / 2

    # Fresh generator seeded from OS entropy: with the "fork" start method, workers
    # would otherwise inherit identical copies of NumPy's global random state
    rng = np.random.default_rng()
    x = rng.uniform(x_min, x_max, n_particles)
    y = rng.uniform(y_min, y_max, n_particles)
    distances_squared = (x - center_x)**2 + (y - center_y)**2
    return np.sum(distances_squared <= radius**2)

def estimate_pi_multiprocessing(total_particles=100_000_000, num_processes=None, x_min=0, x_max=1, y_min=0, y_max=1):
    if num_processes is None:
        num_processes = multiprocessing.cpu_count()

    particles_per_process = total_particles // num_processes
    remaining_particles = total_particles % num_processes

    # list with the number of particles for each process
    tasks = [particles_per_process] * num_processes
    if remaining_particles:
        tasks[-1] += remaining_particles  # Add the remainder to the last process

    start_time = time.time()

    with multiprocessing.Pool(processes=num_processes) as pool:
        results = pool.starmap(
            count_inside_circle,
            [(n, x_min, x_max, y_min, y_max) for n in tasks]
        )

    total_inside = sum(results)
    pi_estimate = (4 * total_inside) / total_particles
    elapsed_time = time.time() - start_time

    return total_particles, pi_estimate, elapsed_time

if __name__ == "__main__":
    PARTICLES, PI, TIME_ELAPSED = estimate_pi_multiprocessing()
    print(f"Particles: {PARTICLES}")
    print(f"Estimated π: {PI}")
    print(f"Time elapsed: {TIME_ELAPSED:.2f} seconds")

Particles: 100000000
Estimated π: 3.14120812
Time elapsed: 0.73 seconds
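
When a Monte Carlo run is split across workers, each worker should draw from its own independent random stream. One way to arrange this is sketched below using NumPy's `SeedSequence.spawn`; the helper `count_inside_unit_circle` and the seed value are illustrative assumptions, and the workers are run one after another here to keep the sketch short.

```python
import numpy as np

def count_inside_unit_circle(n_particles, seed_seq):
    """Count points inside the circle inscribed in the unit square (hypothetical helper)."""
    rng = np.random.default_rng(seed_seq)  # independent stream for this worker
    x = rng.uniform(0.0, 1.0, n_particles)
    y = rng.uniform(0.0, 1.0, n_particles)
    return int(np.sum((x - 0.5)**2 + (y - 0.5)**2 <= 0.25))

# One parent seed, one child seed sequence per worker
child_seeds = np.random.SeedSequence(2024).spawn(4)
tasks = [(250_000, seed) for seed in child_seeds]

# The same (n, seed) tuples could be handed to pool.starmap instead of this loop
counts = [count_inside_unit_circle(n, seed) for n, seed in tasks]
pi_estimate = 4 * sum(counts) / sum(n for n, _ in tasks)
print(f"Estimated pi: {pi_estimate:.4f}")
```

A fixed parent seed also makes the parallel result reproducible, which is not the case when every worker seeds itself from OS entropy.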


# Exercise 3 - Why do we parallelise?

Make a one slide presentation of some of the things you consider important when it comes to parallelising code (in Python) or why it's not important, based on the things discussed during lecture, found in documentation, online forums, experimenting with your code etc.

Keep it short and to the point, no minimum time requirement. You can do it as a bunch of comments as part of your code or as a simple ASCII file, anything goes as long as the points are presented in a coherent manner.

## Parallelizing Code in Python

### **1. Task Nature**
- **CPU-bound:** 
  - **Use:** `multiprocessing` to leverage multiple cores.
  - **Reason:** Bypasses the Global Interpreter Lock (GIL).
- **I/O-bound:** 
  - **Use:** `threading` or `asyncio` for concurrent operations.
  - **Reason:** Efficiently handles waiting times without heavy CPU usage.

### **2. Global Interpreter Lock (GIL)**
- **Impact:** 
  - Only one thread can execute Python bytecode at a time, so threads do not speed up CPU-bound pure-Python code.
- **Solution:** 
  - **Multiprocessing:** Creates separate processes, each with its own Python interpreter.

### **3. Overhead and Complexity**
- **Process/Thread Management:** 
  - **Overhead:** Increased memory and time costs for creating and managing threads/processes.
  - **Best Practice:** Parallelize only when tasks are substantial enough to offset overhead.
- **Debugging Challenges:** 
  - More complex to debug due to potential race conditions and deadlocks.

### **4. Data Sharing and Synchronization**
- **Multiprocessing:** 
  - **Memory:** Separate memory spaces.
  - **Communication:** Use inter-process communication (IPC) mechanisms like queues or pipes.
- **Threading:** 
  - **Memory:** Shared memory space.
  - **Synchronization:** Requires locks or other synchronization primitives to prevent data corruption.
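
The shared-memory case can be sketched with the standard library alone. In the hypothetical example below, four threads update one shared counter under a `threading.Lock`. The same pattern would not work across processes without IPC, because each process has its own copy of the variable.

```python
import threading

counter = 0
lock = threading.Lock()

def add_many(n):
    """Increment the shared counter n times (illustrative worker)."""
    global counter
    for _ in range(n):
        # The read-modify-write below is not atomic, so it is guarded by the lock
        with lock:
            counter += 1

threads = [threading.Thread(target=add_many, args=(10_000,)) for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(counter)  # 4 threads x 10_000 increments
```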

### **5. Scalability**
- **Multiprocessing:** 
  - Scales well with the number of CPU cores.
- **Threading:** 
  - More suited for handling multiple I/O tasks rather than CPU-intensive operations.

### **6. Libraries and Tools**
- **High-Level APIs:** 
  - `concurrent.futures` offers `ThreadPoolExecutor` and `ProcessPoolExecutor` for simplified parallelism.
- **NumPy and Other Libraries:** 
  - Some operations release the GIL, allowing true parallelism even with threading (consistent with Exercise 2a, where the threaded NumPy run took 1.17 s versus 1.75 s serially).
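
As a minimal sketch of the `concurrent.futures` interface, the example below maps a hypothetical function `count_hits` over several chunks with a `ThreadPoolExecutor`. Because the worker is pure Python, the threads are not expected to run faster than a plain loop; swapping in `ProcessPoolExecutor` (under an `if __name__ == "__main__":` guard) uses the same `map` call.

```python
import random
from concurrent.futures import ThreadPoolExecutor

def count_hits(args):
    """Count random points inside the unit quarter circle (illustrative worker)."""
    n_points, seed = args
    rng = random.Random(seed)  # private generator per task
    hits = 0
    for _ in range(n_points):
        x, y = rng.random(), rng.random()
        if x * x + y * y <= 1.0:
            hits += 1
    return hits

chunks = [(50_000, seed) for seed in range(4)]  # seeds chosen arbitrarily

with ThreadPoolExecutor(max_workers=4) as executor:
    results = list(executor.map(count_hits, chunks))

pi_estimate = 4 * sum(results) / sum(n for n, _ in chunks)
print(f"Estimated pi: {pi_estimate:.3f}")
```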

---

## **When to Parallelize**
- **Independent Tasks:** Tasks that don't rely on each other's results.
- **Performance Gains:** When the computational benefits outweigh the overhead.
- **Scalable Workloads:** Suitable for applications that can scale with more cores.

## **When Not to Parallelize**
- **Simple or Quick Tasks:** Overhead may negate any performance improvements.
- **Tightly Coupled Tasks:** High interdependency can lead to complex synchronization issues.
- **Memory Constraints:** Limited memory may not support multiple processes effectively.

---

## **Conclusion**
Parallelizing code in Python can lead to significant performance improvements, especially for CPU-bound and large-scale tasks. However, it's essential to assess the nature of the task, understand the implications of the GIL, and weigh the added complexity against the potential benefits. Utilizing the right tools and libraries can simplify the process and help achieve efficient parallelism.

---
