Matt Dula

Collaborators: Cory Hilton, Anton Schlegel

# Sharing Work Between Processors

In this workbook, we'll learn how Python's `multiprocessing` module can be used to share our computational workload among multiple CPU cores using processes. Many processes working in tandem should (ideally) improve the performance of our code.

We'll start with a basic introduction to `multiprocessing`'s most-used object, the `Pool` class. This class automatically creates a group (or pool, if you will) of processes and splits work between them. The workbook will then walk you through the thought process for applying `Pool` to a problem before asking you to do it on your own.

The final part of this workbook will have you use `Pool` in a [batch job](https://docs.icer.msu.edu/Job_Script_and_Job_Submission/) on the HPCC. If you aren't comfortable using the HPCC's command line, I suggest you [review this tutorial](https://d2l.msu.edu/d2l/le/content/1988819/viewContent/14776592/View).

The examples in this workbook are modified from [High Performance Python by Gorelick & Ozsvald](https://github.com/mynameisfiber/high_performance_python_2e/tree/master/09_multiprocessing).


## Process Investigation

In this first exercise, we'll play with `multiprocessing`'s `Pool.map()` to see how it functions. 

First, we create an instance of the `Pool` object and define the number of processes we want to use.
It's best practice to explicitly set this number. By default, `Pool` will set the number of processes
to the number of available cores. On a shared system like the HPCC, this can easily mean your program
will interfere with other users! We can use the context manager keyword `with` to ensure our processes
are properly ended after use.

The `Pool.map()` method applies a function to an iterable object. In the example below, we use
`list(arange(10))` to construct a list of numbers. The 10 items in this list will be distributed
among four processors. **The items are not guaranteed to be distributed the same way every time.**
An example scenario is shown in the diagram below:

![Diagram of 10 list items spread among 4 processes](pool_block_diagram.png)

For the example code below, we apply the function `get_info` to each item in the list.
This function returns some identifying information about the process running it such as its process ID and
name. The process ID is how the operating system identifies our process, while the name is internal
to Python. Our function will also return the data that was handed to the process, so we can see how `map()`
divides the work.

Run the following code block and then answer the exercises.

In [1]:
import multiprocessing

def get_info(data):
    
    this_process = multiprocessing.current_process()
    
    info = {"proc_id" : this_process.pid,
            "proc_name" : this_process.name,
            "proc_data" : data}
    
    return info

parent_process = multiprocessing.current_process()
print("Parent process is named", parent_process.name, "with PID", parent_process.pid)

all_data = list(range(10))
print("The data to be distributed is:", all_data)

with multiprocessing.Pool(processes = 4) as p:
    # Pool.map accepts a function to apply
    # and an iterable object of data to share
    all_info = p.map(func = get_info, iterable = all_data)

# Print information about the processes in a nice way
print("Information from child processes:")
for entry in all_info:
    print('\t',entry)

Parent process is named MainProcess with PID 2106401
The data to be distributed is: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
Information from child processes:
	 {'proc_id': 2106419, 'proc_name': 'ForkPoolWorker-1', 'proc_data': 0}
	 {'proc_id': 2106420, 'proc_name': 'ForkPoolWorker-2', 'proc_data': 1}
	 {'proc_id': 2106421, 'proc_name': 'ForkPoolWorker-3', 'proc_data': 2}
	 {'proc_id': 2106419, 'proc_name': 'ForkPoolWorker-1', 'proc_data': 3}
	 {'proc_id': 2106420, 'proc_name': 'ForkPoolWorker-2', 'proc_data': 4}
	 {'proc_id': 2106419, 'proc_name': 'ForkPoolWorker-1', 'proc_data': 5}
	 {'proc_id': 2106419, 'proc_name': 'ForkPoolWorker-1', 'proc_data': 6}
	 {'proc_id': 2106420, 'proc_name': 'ForkPoolWorker-2', 'proc_data': 7}
	 {'proc_id': 2106421, 'proc_name': 'ForkPoolWorker-3', 'proc_data': 8}
	 {'proc_id': 2106419, 'proc_name': 'ForkPoolWorker-1', 'proc_data': 9}


In [2]:
parent_process = multiprocessing.current_process()
print("Parent process is named", parent_process.name, "with PID", parent_process.pid)

all_data = [5,5,5,5]
print("The data to be distributed is:", all_data)

with multiprocessing.Pool(processes = 4) as p:
    # Pool.map accepts a function to apply
    # and an iterable object of data to share
    all_info = p.map(func = get_info, iterable = all_data)

# Print information about the processes in a nice way
print("Information from child processes:")
for entry in all_info:
    print('\t',entry)

Parent process is named MainProcess with PID 2106401
The data to be distributed is: [5, 5, 5, 5]
Information from child processes:
	 {'proc_id': 2106426, 'proc_name': 'ForkPoolWorker-5', 'proc_data': 5}
	 {'proc_id': 2106427, 'proc_name': 'ForkPoolWorker-6', 'proc_data': 5}
	 {'proc_id': 2106428, 'proc_name': 'ForkPoolWorker-7', 'proc_data': 5}
	 {'proc_id': 2106426, 'proc_name': 'ForkPoolWorker-5', 'proc_data': 5}


---
### Exercises

1. What kind of object is `all_info`? That is, what data structure does `Pool.map()`  use to collect the values returned by each process?

`all_info` is a list object (in this case, a list of dicts).

2. How many distinct **child** processes are there in `all_info`? Does this align with expectations?

There appear to be 4 unique child processes in `all_info`, which aligns with expectations, because we have allocated four cores and instantiated the `Pool` function with four processes.

3. How many pieces of data does each process handle? Considering the total amount of data distributed, is the work shared evenly?

Each process handles 3 pieces of data except for PID 1315917, which handles one piece of data. Therefore, the work is not shared completely evenly.

4. If we were to instead set `all_data = [5, 5, 5, 5]`, how many dictionaries would you expect to be inside `all_info`? What would be each processes' value for `proc_data`? **Don't run any code yet.**

I would expect four dictionaries inside `all_info`, one for each entry of the data list. Each process should probably have a value of 5, since all items are the same.

5. Perform the above code change. Do the results match your expectations?

Yes, the results matched my expectations exactly.

---
## How to Implement Pool

The general process for implementing `Pool.map()` requires us to think about three things:

1. The set of objects we wish to distribute
2. Encapsulating our desired operation into a function
3. How to combine the results from each process into a final answer.

For the following walkthrough, we'll be distributing runs of a Monte Carlo algorithm; that is, each process will run its own copy of the algorithm. Broadly speaking, Monte Carlo algorithms rely on random sampling to generate a result. We'll be using a Monte Carlo method to approximate the value of $\pi$.

### About the Algorithm

Imagine we have a square with sides of length $2r$. Inside this square we can draw a circle with radius $r$. Since the area of the square is $(2r)^2$ and the area of the circle is $\pi r^2$, the ratio of the two gives us $\pi$:

$$
\frac{Area\ of\ Circle}{Area\ of\ Square} = \frac{\pi r^2}{4r^2} = \frac{\pi}{4}
$$

We can estimate the area of these two shapes by randomly selecting points, say within $-1 \le x \le 1$ and $-1 \le y \le 1$. By construction, all of the points will fall inside a square with $r=1$, so the area of the square is approximately the total number of points we have sampled. Then, we can count the number of points that fall inside the circle $x^2 + y^2 \le 1$. This number is approximately the area of the circle. Therefore,

$$
\frac{Number\ of\ Points\ in\ Circle}{Total\ Number\ of\ Points} \approx \frac{\pi}{4}
$$

An example of this procedure is shown below [[source](https://miro.medium.com/v2/resize:fit:960/1*N8AWxYj3s2WrNrddinkcvQ.png)]. Points inside the circle are colored red, while points outside the circle (but still inside the square) are dark blue.

<div>
<img src="monte_carlo_pi.png" alt="Demonstration of the Monte Carlo method for estimating pi" width="70%">
</div>

To achieve a more accurate estimate of $\pi$, we need to increase the number of points we sample. One tweak we can make to improve the efficiency of our sampling is to **only sample the upper right quadrant** ($0 \le x \le 1$ and $0 \le y \le 1$). The math to approximate $\pi$ remains the same, but our random points now more efficiently cover the area of the quadrant.

It is this variation that is implemented below. The following exercises will walk you through the process of adapting this code for use with `Pool.map()`. Each process will run its own Monte Carlo estimate, and the results can be combined as

$$
\pi = 4 \times \frac{\sum_{N_{procs}} Number\ of\ Points\ in\ Circle}{\sum_{N_{procs}} Total\ Number\ of\ Points}
$$

### Seeding our Random Number Generator

A computer can never generate truly random numbers. Instead, special algorithms are used to generate a series of 
_pseudo_ random numbers. These numbers are usually sufficiently "random enough" for most purposes.
The algorithms take a single input value known as the "seed"; if the seed is known,
the series of numbers generated is entirely deterministic. When testing algorithms that require (pseudo) random numbers,
it can be helpful to fix the seed to a particular value. In the code below, we'll use the process ID of the
process running this Jupyter Notebook as our seed. That way, your seed will be different from your neighbors but
will stay consistent as long as you don't close the Jupyter server.

In [3]:
import numpy as np
import os

In [4]:
%%timeit
n_samples = int(1e8) # "e" notation defaults to float
seed = os.getpid() # another way of getting process ID

# Newer versions of NumPy recommend using a Generator object
# See https://numpy.org/doc/stable/reference/random/index.html#random-quick-start
rng = np.random.default_rng(seed) # use PID as seed
x = rng.uniform(0, 1, n_samples)
y = rng.uniform(0, 1, n_samples)
inside_circle = (x**2 + y**2) <= 1 # return array of True and False
n_inside_circle = np.sum(inside_circle) # count all the Trues (aka 1s)

pi_estimate = 4 * n_inside_circle/n_samples
# print(f"With {n_samples} samples, pi is {pi_estimate}")

3.23 s ± 621 μs per loop (mean ± std. dev. of 7 runs, 1 loop each)


---
### Exercises

1. First, time how long the above code takes to run. You may want to remove the `print` statement.

Code takes approximately 3.3 seconds to run


2. Convert the above algorithm into a function called `sample_quarter_circle`. It should accept a number of samples as input, and return the number of samples inside the (quarter) circle. Test that your function can approximate pi.

Note: Why don't I have you return the estimate of pi? Because it is easier take the ratio *after* combining the counts from each Monte Carlo run.

In [5]:
def sample_quarter_circle(n_samples: int):
    seed = os.getpid()
    
    rng = np.random.default_rng(seed) # use PID as seed
    x = rng.uniform(0, 1, n_samples)
    y = rng.uniform(0, 1, n_samples)
    inside_circle = (x**2 + y**2) <= 1 # return array of True and False
    n_inside_circle = np.sum(inside_circle)
    
    return n_inside_circle

3. When parallelizing code, its best to take a moment to consider your approach. Use this question as an opportunity to consider how you want your parallel code to be designed.

    Each process will run its own instance of `sample_quarter_circle`. We therefore need to specify the number of samples each process should draw. The number of samples used is important to keep track of because it is directly tied to the accuracy of our estimate. There are two ways we might go about this:

    Option 1. We could specify the total number of samples that should be drawn across *all* processes. Then we can divide by the number of processes to find the number of samples per process.
    
    Option 2. We could specify the number of samples per process. Then, the *total* number of samples changes as the number of processes change.

    Each approach has a drawback. Identify these drawbacks. You are free to adopt whichever approach you decide moving forward.

Option 1 would fix the number of total samples, theoretically also fixing our accuracy, but this would result in speedup proportional to the number of cores we choose to use, as it is not a fixed _per process_ number. Option 2 would fix the _per process_ number of samples, causing the runtime to remain constant, but we achieve better accuracy as the number of cores increases. I like the idea of improved speed, so I will choose option 1.

4. `Pool.map()` requires two arguments: `func` and `iterable`. Your function `sample_quarter_circle` fulfills the `func` requirement. Each process will then be given an item from `iterable` and apply `func` to that item. For simplicity, `iterable` can be a Python list. This list should then fulfill the following requirements:

    a. Each element in the list must be a reasonable argument for `sample_quarter_circle`
    
    b. We only need to run the Monte Carlo once per process, so the length of the list should match the number of processes. 

    Considering what was discussed in problem 3 above, write a code snippet that constructs a list matching the above requirements.

In [10]:
num_samples_total = 1e8
num_processes = 4
sample_iterable = [int(num_samples_total / num_processes)] * num_processes

5. The final piece to consider is: how to handle the data returned by `Pool.map()`? Based on your investigation in the first part of this workbook and the structure of `sample_quarter_circle`, write a plan for finding pi from the data returned by `Pool.map()`.

Since the function will be performed on each value in the list, representing a value of `num_samples`, we will be returned 4 estimates of pi, so we can then average these estimates to obtain a better estimate.

6. You should have all the pieces you need to write a parallelized Monte Carlo estimator for pi. Assemble all the pieces and run your code. Don't forget to properly account for the total number of samples across all processes based on the method you picked from question 3.

In [11]:
num_samples_total = 1e8
num_processes = 4
sample_iterable = [int(num_samples_total / num_processes)] * num_processes

with multiprocessing.Pool(processes = num_processes) as p:
    n_in_circle_list = p.map(func = sample_quarter_circle, iterable = sample_iterable)

print(f"Pi estimate: {4 * np.sum(np.array(n_in_circle_list)) / num_samples_total}")

Pi estimate: 3.141648


7. How long does your parallel version take to run? What factor of speedup is this?

In [12]:
%%timeit
num_samples_total = 1e8
num_processes = 4
sample_iterable = [int(num_samples_total / num_processes)] * num_processes

with multiprocessing.Pool(processes = num_processes) as p:
    n_in_circle_list = p.map(func = sample_quarter_circle, iterable = sample_iterable)

873 ms ± 3.32 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


The code takes 873 ms to run. This is a 3.8x factor of speedup, almost the 4x theoretically predicted by the increased number of processors (probably less than 4x due to the additional work required to start and end processes, along with data transfer).

---

## Implementing Pool on Your Own

The previous exercises walked you through the necessary considerations for implementing `Pool.map()`. Now, you'll apply it to the code below, which checks for prime numbers in a given range. Remember to think about these three items:

1. The set of objects we wish to distribute
2. Encapsulating our desired operation into a function
3. How to combine the results from each process into a final answer.

In [13]:
import itertools
import math

def check_prime(n):
    if n % 2 == 0:
        return False
    for i in range(3, int(math.sqrt(n)) + 1, 2):
        if n % i == 0:
            return False
    return True

number_range = list(range(100000000, 101000000))

are_primes = [check_prime(n) for n in number_range]

# itertools is a built-in Python module full of helpful functions
# "compress" takes two iterables as an argument. The second iterable
# must contain booleans. Items in the first iterable are returned
# or discarded based on the booleans in the second.
primes = list(itertools.compress(number_range, are_primes))

print(len(primes))
print(primes[:5])
print(primes[-5:])

54208
[100000007, 100000037, 100000039, 100000049, 100000073]
[100999939, 100999949, 100999979, 100999981, 100999993]


---
### Exercises

1. Based on the code above, rewrite the prime number search using `Pool.map()` and 4 processes. Remember to check your results!

In [14]:
number_range = list(range(100000000, 101000000))

with multiprocessing.Pool(processes = num_processes) as p:
    are_primes = p.map(func = check_prime, iterable = number_range)

primes = list(itertools.compress(number_range, are_primes))

print(len(primes))
print(primes[:5])
print(primes[-5:])

54208
[100000007, 100000037, 100000039, 100000049, 100000073]
[100999939, 100999949, 100999979, 100999981, 100999993]


2. Time both the original prime number search and your parallel version. What factor of speedup did you achieve?

In [15]:
%%timeit
number_range = list(range(100000000, 101000000))
are_primes = [check_prime(n) for n in number_range]
primes = list(itertools.compress(number_range, are_primes))

24.5 s ± 19 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


In [16]:
%%timeit
number_range = list(range(100000000, 101000000))
with multiprocessing.Pool(processes = num_processes) as p:
    are_primes = p.map(func = check_prime, iterable = number_range)

primes = list(itertools.compress(number_range, are_primes))

6.06 s ± 30.5 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


The original method took an average of 24.5 seconds, and the new method took an average of 6.06 seconds; this is a 4.04x factor of speedup

3. Adjust the number of processes used by `Pool` and time the changes. Try both decreasing the number of processes and increasing them. How does the time change, and why? Remember that we only requested 4 CPU cores.

In [17]:
%%timeit
number_range = list(range(100000000, 101000000))
with multiprocessing.Pool(processes = 2) as p:
    are_primes = p.map(func = check_prime, iterable = number_range)

primes = list(itertools.compress(number_range, are_primes))

11.9 s ± 110 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


In [18]:
%%timeit
number_range = list(range(100000000, 101000000))
with multiprocessing.Pool(processes = 6) as p:
    are_primes = p.map(func = check_prime, iterable = number_range)

primes = list(itertools.compress(number_range, are_primes))

6.28 s ± 139 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


When decreasing the number below 4 (in this case, 2), we create 2 parallel process, and thus do not take full advantage of the number of cores requested; this leads us to only about a 2x speedup. When the number of processes is ABOVE the number of cores, we cannot actually execute the requested processes in parallel, since each core can only one run process at once, leading us to actually only lose time by creating more processes, and gaining the speedup of only 4x parallelization.

---
## Using `Pool` in Batch Jobs

So far, we've been running parallel code interactively, here in this Jupyter Notebook. While great for development, running code interactively is often impractical for large research workloads. In this section, you'll adapt one of the `Pool` examples you've used previously to run via a batch job on the HPCC. 

The HPCC uses software called SLURM to schedule and run jobs based on the resources requested by each job and the available resources on the HPCC. Jobs are submitted using **batch files** which describe both the desired resources and the actual code to be run.

For the following exercises, you'll need to create some additional files. Make sure these files are in the same directory as this notebook so they can be added to the git repository and pushed to GitHub for submission.


---
### Exercises


1. Create an empty file called `pool_example.sb` and open it for editing. We'll use this file to describe our batch job, starting with the resources.

    The very first line of our batch file **must** be `#!/bin/bash`. This allows the lines to be interpreted correctly.

    Then, at minimum, we need to specify the amount of time we expect our job to run, the amount of memory, and the number of "tasks" (how SLURM refers to processes). Let's ask for 1 hour, 4 GB of memory, and 4 tasks. Our time and memory requests will likely be more than we really need, but they are small enough that our job won't sit in the queue for too long. The other alternative is asking for too *few* resources and having our jobs die prematurely.

    Resource requests are made using lines that start with `#SBATCH` like as follows: 

    ```
    #SBATCH --time=01:00:00
    #SBATCH --mem=4GB
    #SBATCH --ntasks=4
    ```

    Copy and paste this to the top of `pool_example.sb`

2. Next, we need to tell SLURM which commands to run. This includes activating our Python environment, changing to the desired directory, and then running our Python script. 

    If you installed Anaconda following the directions in the [ICER documentation](), your Python environment is loaded using the following:

    ```
    module swap Python Conda/3
    conda activate hpc-python
    ```

    You'll then want to include a line that uses the `cd` command to change to this directory. SLURM jobs start in your home directory by default.

    Finally, let's add the necessary command for running a Python script called `pool_example.py`, which we'll create in the next step: 
    
    `python pool_example.py`
    
Note: if you are used to running programs with `srun`, do **not** use that command here. In brief, `srun` is intended for programs that use a different approach to parallelism.


3. Copy the **very first `Pool` example** and paste it into a file called `pool_example.py` file. 

    Currently, we have hardcoded the desired number of processes. Keeping our Python code consistent with the resources we request in our batch file can be troublesome if we keep things as they are. Fortunately, SLURM stores many of our requests as [environment variables](https://docs.icer.msu.edu/Slurm_Environment_Variables/) that can then be accessed through Python.

    Add the following to your `pool_example.py`:

    ```python
    import os
    slurm_procs = int(os.environ["SLURM_NTASKS"]) # env variables are strings
    ```

    Then, adjust `Pool` to use the value of `slurm_procs`.

4. With your completed Python and batch scripts, run `sbatch pool_example.sb` on the command line to submit your job to SLURM's queue. You'll see a job ID displayed upon submission. Any values printed out by your program will be saved to a file who's name follows the pattern `slurm-<jobID>.out`. Check that Python ran in parallel correctly by looking at this file.

Note: You can check the status of your job by running `squeue -u <username>`. If your job has already completed, it won't show up in the list.

5. Make sure to add your batch script (`.sb` file), Python script (`.py` file), and SLURM log (`.out` file) to this git repository with `git add`. You'll need to include these files when you hand in this week's assignment!

---
## For Next Class

In next week's workbook, we'll be using the parallelized version of the prime number search that you wrote in the section Implementing Pool on Your Own.